Commit ecc436fc authored by Ben Walker, committed by Jim Harris

nvmf: Remove global g_rdma



The transport API now allows for multiple transport
objects, so allocate them on demand instead of using
a single global.

Change-Id: I5dd35f287fe7312e6185c75ae75e2488ec8cc78e
Signed-off-by: Ben Walker <benjamin.walker@intel.com>
Reviewed-on: https://review.gerrithub.io/371990
Reviewed-by: Daniel Verkamp <daniel.verkamp@intel.com>
Tested-by: SPDK Automated Test System <sys_sgsw@intel.com>
Reviewed-by: Jim Harris <james.r.harris@intel.com>
parent baa936a1
+87 −68
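The mechanism the diff below relies on throughout is the container-of pattern: the RDMA transport embeds the generic struct spdk_nvmf_transport as a member of struct spdk_nvmf_rdma_transport, and every callback that is handed the generic pointer recovers the containing object with SPDK_CONTAINEROF. The following is a minimal, self-contained sketch of that pattern with simplified names and a hand-rolled containerof macro standing in for SPDK_CONTAINEROF; it is illustrative only, not SPDK source:

```c
/* Sketch of the embed-and-recover pattern used by this commit.
 * "containerof" here is a simplified stand-in for SPDK_CONTAINEROF. */
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>

#define containerof(ptr, type, member) \
	((type *)((char *)(ptr) - offsetof(type, member)))

struct transport {
	const char *name;
};

struct rdma_transport {
	struct transport	transport;	/* embedded generic part */
	int			max_queue_depth;
	int			max_io_size;
};

/* Callbacks only receive the generic pointer, as in the transport API,
 * and recover the RDMA-specific object from it. */
static void print_limits(struct transport *t)
{
	struct rdma_transport *rt = containerof(t, struct rdma_transport, transport);

	printf("%s: max_queue_depth=%d max_io_size=%d\n",
	       t->name, rt->max_queue_depth, rt->max_io_size);
}

int main(void)
{
	/* Allocate a transport object on demand instead of using one global. */
	struct rdma_transport *rt = calloc(1, sizeof(*rt));

	if (rt == NULL) {
		return 1;
	}
	rt->transport.name = "rdma";
	rt->max_queue_depth = 128;
	rt->max_io_size = 131072;

	print_limits(&rt->transport);
	free(rt);
	return 0;
}
```

Because the generic part is embedded rather than pointed to, allocating one rtransport per spdk_nvmf_rdma_create() call gives each transport instance its own lock, listen list, and size limits, which is what allows the global g_rdma to be removed.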
@@ -186,7 +186,9 @@ struct spdk_nvmf_rdma_listen_addr {
	TAILQ_ENTRY(spdk_nvmf_rdma_listen_addr)	link;
};

-struct spdk_nvmf_rdma {
+struct spdk_nvmf_rdma_transport {
+	struct spdk_nvmf_transport	transport;

	struct rdma_event_channel	*event_channel;

	pthread_mutex_t 		lock;
@@ -198,11 +200,6 @@ struct spdk_nvmf_rdma {
	TAILQ_HEAD(, spdk_nvmf_rdma_listen_addr)	listen_addrs;
};

-static struct spdk_nvmf_rdma g_rdma = {
-	.lock = PTHREAD_MUTEX_INITIALIZER,
-	.listen_addrs = TAILQ_HEAD_INITIALIZER(g_rdma.listen_addrs),
-};

static void
spdk_nvmf_rdma_qpair_destroy(struct spdk_nvmf_rdma_qpair *rdma_qpair)
{
@@ -240,6 +237,7 @@ spdk_nvmf_rdma_qpair_create(struct spdk_nvmf_transport *transport,
			    struct rdma_cm_id *id, struct ibv_comp_channel *channel,
			    uint16_t max_queue_depth, uint16_t max_rw_depth, uint32_t subsystem_id)
{
+	struct spdk_nvmf_rdma_transport *rtransport;
	struct spdk_nvmf_rdma_qpair	*rdma_qpair;
	struct spdk_nvmf_qpair		*qpair;
	int				rc, i;
@@ -247,6 +245,8 @@ spdk_nvmf_rdma_qpair_create(struct spdk_nvmf_transport *transport,
	struct spdk_nvmf_rdma_recv	*rdma_recv;
	struct spdk_nvmf_rdma_request	*rdma_req;

+	rtransport = SPDK_CONTAINEROF(transport, struct spdk_nvmf_rdma_transport, transport);

	rdma_qpair = calloc(1, sizeof(struct spdk_nvmf_rdma_qpair));
	if (rdma_qpair == NULL) {
		SPDK_ERRLOG("Could not allocate new connection.\n");
@@ -301,7 +301,7 @@ spdk_nvmf_rdma_qpair_create(struct spdk_nvmf_transport *transport,
					    0x1000, NULL);
	rdma_qpair->cpls = spdk_dma_zmalloc(max_queue_depth * sizeof(*rdma_qpair->cpls),
					    0x1000, NULL);
-	rdma_qpair->bufs = spdk_dma_zmalloc(max_queue_depth * g_rdma.in_capsule_data_size,
+	rdma_qpair->bufs = spdk_dma_zmalloc(max_queue_depth * rtransport->in_capsule_data_size,
					    0x1000, NULL);
	if (!rdma_qpair->reqs || !rdma_qpair->recvs || !rdma_qpair->cmds ||
	    !rdma_qpair->cpls || !rdma_qpair->bufs) {
@@ -317,7 +317,7 @@ spdk_nvmf_rdma_qpair_create(struct spdk_nvmf_transport *transport,
					 max_queue_depth * sizeof(*rdma_qpair->cpls),
					 0);
	rdma_qpair->bufs_mr = ibv_reg_mr(id->pd, rdma_qpair->bufs,
-					 max_queue_depth * g_rdma.in_capsule_data_size,
+					 max_queue_depth * rtransport->in_capsule_data_size,
					 IBV_ACCESS_LOCAL_WRITE |
					 IBV_ACCESS_REMOTE_WRITE);
	if (!rdma_qpair->cmds_mr || !rdma_qpair->cpls_mr || !rdma_qpair->bufs_mr) {
@@ -330,7 +330,7 @@ spdk_nvmf_rdma_qpair_create(struct spdk_nvmf_transport *transport,
	SPDK_TRACELOG(SPDK_TRACE_RDMA, "Completion Array: %p Length: %lx LKey: %x\n",
		      rdma_qpair->cpls, max_queue_depth * sizeof(*rdma_qpair->cpls), rdma_qpair->cpls_mr->lkey);
	SPDK_TRACELOG(SPDK_TRACE_RDMA, "In Capsule Data Array: %p Length: %x LKey: %x\n",
-		      rdma_qpair->bufs, max_queue_depth * g_rdma.in_capsule_data_size, rdma_qpair->bufs_mr->lkey);
+		      rdma_qpair->bufs, max_queue_depth * rtransport->in_capsule_data_size, rdma_qpair->bufs_mr->lkey);

	for (i = 0; i < max_queue_depth; i++) {
		struct ibv_recv_wr *bad_wr = NULL;
@@ -338,14 +338,14 @@ spdk_nvmf_rdma_qpair_create(struct spdk_nvmf_transport *transport,
		rdma_recv = &rdma_qpair->recvs[i];

		/* Set up memory to receive commands */
-		rdma_recv->buf = (void *)((uintptr_t)rdma_qpair->bufs + (i * g_rdma.in_capsule_data_size));
+		rdma_recv->buf = (void *)((uintptr_t)rdma_qpair->bufs + (i * rtransport->in_capsule_data_size));

		rdma_recv->sgl[0].addr = (uintptr_t)&rdma_qpair->cmds[i];
		rdma_recv->sgl[0].length = sizeof(rdma_qpair->cmds[i]);
		rdma_recv->sgl[0].lkey = rdma_qpair->cmds_mr->lkey;

		rdma_recv->sgl[1].addr = (uintptr_t)rdma_recv->buf;
-		rdma_recv->sgl[1].length = g_rdma.in_capsule_data_size;
+		rdma_recv->sgl[1].length = rtransport->in_capsule_data_size;
		rdma_recv->sgl[1].lkey = rdma_qpair->bufs_mr->lkey;

		rdma_recv->wr.wr_id = (uintptr_t)rdma_recv;
@@ -528,6 +528,7 @@ spdk_nvmf_rdma_request_transfer_data(struct spdk_nvmf_request *req)
static int
nvmf_rdma_connect(struct spdk_nvmf_transport *transport, struct rdma_cm_event *event)
{
+	struct spdk_nvmf_rdma_transport *rtransport;
	struct spdk_nvmf_rdma_qpair	*rdma_qpair = NULL;
	struct spdk_nvmf_rdma_listen_addr *addr;
	struct rdma_conn_param		*rdma_param = NULL;
@@ -540,6 +541,8 @@ nvmf_rdma_connect(struct spdk_nvmf_transport *transport, struct rdma_cm_event *e
	uint32_t			subsystem_id = 0;
	int 				rc;

+	rtransport = SPDK_CONTAINEROF(transport, struct spdk_nvmf_rdma_transport, transport);

	if (event->id == NULL) {
		SPDK_ERRLOG("connect request: missing cm_id\n");
		goto err0;
@@ -572,9 +575,9 @@ nvmf_rdma_connect(struct spdk_nvmf_transport *transport, struct rdma_cm_event *e
	SPDK_TRACELOG(SPDK_TRACE_RDMA, "Calculating Queue Depth\n");

	/* Start with the maximum queue depth allowed by the target */
-	max_queue_depth = g_rdma.max_queue_depth;
-	max_rw_depth = g_rdma.max_queue_depth;
-	SPDK_TRACELOG(SPDK_TRACE_RDMA, "Target Max Queue Depth: %d\n", g_rdma.max_queue_depth);
+	max_queue_depth = rtransport->max_queue_depth;
+	max_rw_depth = rtransport->max_queue_depth;
+	SPDK_TRACELOG(SPDK_TRACE_RDMA, "Target Max Queue Depth: %d\n", rtransport->max_queue_depth);

	/* Next check the local NIC's hardware limitations */
	SPDK_TRACELOG(SPDK_TRACE_RDMA,
@@ -723,6 +726,7 @@ spdk_nvmf_request_prep_data(struct spdk_nvmf_request *req)
	struct spdk_nvmf_rdma_request		*rdma_req;
	struct spdk_nvmf_rdma_poll_group	*rgroup;
	struct spdk_nvme_sgl_descriptor		*sgl;
+	struct spdk_nvmf_rdma_transport		*rtransport;

	cmd = &req->cmd->nvme_cmd;
	rsp = &req->rsp->nvme_cpl;
@@ -754,14 +758,15 @@ spdk_nvmf_request_prep_data(struct spdk_nvmf_request *req)
		return SPDK_NVMF_REQUEST_PREP_READY;
	}

+	rtransport = SPDK_CONTAINEROF(req->qpair->transport, struct spdk_nvmf_rdma_transport, transport);
	sgl = &cmd->dptr.sgl1;

	if (sgl->generic.type == SPDK_NVME_SGL_TYPE_KEYED_DATA_BLOCK &&
	    (sgl->keyed.subtype == SPDK_NVME_SGL_SUBTYPE_ADDRESS ||
	     sgl->keyed.subtype == SPDK_NVME_SGL_SUBTYPE_INVALIDATE_KEY)) {
-		if (sgl->keyed.length > g_rdma.max_io_size) {
+		if (sgl->keyed.length > rtransport->max_io_size) {
			SPDK_ERRLOG("SGL length 0x%x exceeds max io size 0x%x\n",
-				    sgl->keyed.length, g_rdma.max_io_size);
+				    sgl->keyed.length, rtransport->max_io_size);
			rsp->status.sc = SPDK_NVME_SC_DATA_SGL_LENGTH_INVALID;
			return SPDK_NVMF_REQUEST_PREP_ERROR;
		}
@@ -782,7 +787,7 @@ spdk_nvmf_request_prep_data(struct spdk_nvmf_request *req)
			 */
			assert(cmd->opc == SPDK_NVME_OPC_FABRIC);
			assert(req->xfer == SPDK_NVME_DATA_HOST_TO_CONTROLLER);
-			assert(req->length <= g_rdma.in_capsule_data_size);
+			assert(req->length <= rtransport->in_capsule_data_size);

			/* Use the in capsule data buffer, even though this isn't in capsule data. */
			SPDK_TRACELOG(SPDK_TRACE_RDMA, "Request using in capsule buffer for non-capsule data\n");
@@ -817,7 +822,7 @@ spdk_nvmf_request_prep_data(struct spdk_nvmf_request *req)
	} else if (sgl->generic.type == SPDK_NVME_SGL_TYPE_DATA_BLOCK &&
		   sgl->unkeyed.subtype == SPDK_NVME_SGL_SUBTYPE_OFFSET) {
		uint64_t offset = sgl->address;
-		uint32_t max_len = g_rdma.in_capsule_data_size;
+		uint32_t max_len = rtransport->in_capsule_data_size;

		SPDK_TRACELOG(SPDK_TRACE_NVMF, "In-capsule data: offset 0x%" PRIx64 ", length 0x%x\n",
			      offset, sgl->unkeyed.length);
@@ -915,55 +920,55 @@ static struct spdk_nvmf_transport *
spdk_nvmf_rdma_create(struct spdk_nvmf_tgt *tgt)
{
	int rc;
-	struct spdk_nvmf_transport *transport;
+	struct spdk_nvmf_rdma_transport *rtransport;

-	transport = calloc(1, sizeof(*transport));
-	if (!transport) {
+	rtransport = calloc(1, sizeof(*rtransport));
+	if (!rtransport) {
		return NULL;
	}

-	transport->tgt = tgt;
-	transport->ops = &spdk_nvmf_transport_rdma;
+	pthread_mutex_init(&rtransport->lock, NULL);
+	TAILQ_INIT(&rtransport->listen_addrs);
+
+	rtransport->transport.tgt = tgt;
+	rtransport->transport.ops = &spdk_nvmf_transport_rdma;

	SPDK_NOTICELOG("*** RDMA Transport Init ***\n");

-	pthread_mutex_lock(&g_rdma.lock);
-	g_rdma.max_queue_depth = tgt->max_queue_depth;
-	g_rdma.max_io_size = tgt->max_io_size;
-	g_rdma.in_capsule_data_size = tgt->in_capsule_data_size;
+	rtransport->max_queue_depth = tgt->max_queue_depth;
+	rtransport->max_io_size = tgt->max_io_size;
+	rtransport->in_capsule_data_size = tgt->in_capsule_data_size;

-	g_rdma.event_channel = rdma_create_event_channel();
-	if (g_rdma.event_channel == NULL) {
+	rtransport->event_channel = rdma_create_event_channel();
+	if (rtransport->event_channel == NULL) {
		SPDK_ERRLOG("rdma_create_event_channel() failed, %s\n", strerror(errno));
-		free(transport);
-		pthread_mutex_unlock(&g_rdma.lock);
+		free(rtransport);
		return NULL;
	}

-	rc = fcntl(g_rdma.event_channel->fd, F_SETFL, O_NONBLOCK);
+	rc = fcntl(rtransport->event_channel->fd, F_SETFL, O_NONBLOCK);
	if (rc < 0) {
		SPDK_ERRLOG("fcntl to set fd to non-blocking failed\n");
-		free(transport);
-		pthread_mutex_unlock(&g_rdma.lock);
+		free(rtransport);
		return NULL;
	}

-	pthread_mutex_unlock(&g_rdma.lock);
-	return transport;
+	return &rtransport->transport;
}

static int
spdk_nvmf_rdma_destroy(struct spdk_nvmf_transport *transport)
{
-	pthread_mutex_lock(&g_rdma.lock);
+	struct spdk_nvmf_rdma_transport *rtransport;

-	assert(TAILQ_EMPTY(&g_rdma.listen_addrs));
-	if (g_rdma.event_channel != NULL) {
-		rdma_destroy_event_channel(g_rdma.event_channel);
+	rtransport = SPDK_CONTAINEROF(transport, struct spdk_nvmf_rdma_transport, transport);
+
+	assert(TAILQ_EMPTY(&rtransport->listen_addrs));
+	if (rtransport->event_channel != NULL) {
+		rdma_destroy_event_channel(rtransport->event_channel);
	}
-	pthread_mutex_unlock(&g_rdma.lock);

-	free(transport);
+	free(rtransport);

	return 0;
}
@@ -972,10 +977,13 @@ static int
spdk_nvmf_rdma_listen(struct spdk_nvmf_transport *transport,
		      const struct spdk_nvme_transport_id *trid)
{
+	struct spdk_nvmf_rdma_transport *rtransport;
	struct spdk_nvmf_rdma_listen_addr *addr_tmp, *addr;
	struct sockaddr_in saddr;
	int rc;

+	rtransport = SPDK_CONTAINEROF(transport, struct spdk_nvmf_rdma_transport, transport);

	addr = calloc(1, sizeof(*addr));
	if (!addr) {
		return -ENOMEM;
@@ -989,23 +997,23 @@ spdk_nvmf_rdma_listen(struct spdk_nvmf_transport *transport,
	snprintf(addr->trid.traddr, sizeof(addr->trid.traddr), "%s", trid->traddr);
	snprintf(addr->trid.trsvcid, sizeof(addr->trid.trsvcid), "%s", trid->trsvcid);

-	pthread_mutex_lock(&g_rdma.lock);
-	assert(g_rdma.event_channel != NULL);
-	TAILQ_FOREACH(addr_tmp, &g_rdma.listen_addrs, link) {
+	pthread_mutex_lock(&rtransport->lock);
+	assert(rtransport->event_channel != NULL);
+	TAILQ_FOREACH(addr_tmp, &rtransport->listen_addrs, link) {
		if (spdk_nvme_transport_id_compare(&addr_tmp->trid, &addr->trid) == 0) {
			addr_tmp->ref++;
			free(addr);
			/* Already listening at this address */
-			pthread_mutex_unlock(&g_rdma.lock);
+			pthread_mutex_unlock(&rtransport->lock);
			return 0;
		}
	}

-	rc = rdma_create_id(g_rdma.event_channel, &addr->id, addr, RDMA_PS_TCP);
+	rc = rdma_create_id(rtransport->event_channel, &addr->id, addr, RDMA_PS_TCP);
	if (rc < 0) {
		SPDK_ERRLOG("rdma_create_id() failed\n");
		free(addr);
-		pthread_mutex_unlock(&g_rdma.lock);
+		pthread_mutex_unlock(&rtransport->lock);
		return rc;
	}

@@ -1018,7 +1026,7 @@ spdk_nvmf_rdma_listen(struct spdk_nvmf_transport *transport,
		SPDK_ERRLOG("rdma_bind_addr() failed\n");
		rdma_destroy_id(addr->id);
		free(addr);
-		pthread_mutex_unlock(&g_rdma.lock);
+		pthread_mutex_unlock(&rtransport->lock);
		return rc;
	}

@@ -1027,7 +1035,7 @@ spdk_nvmf_rdma_listen(struct spdk_nvmf_transport *transport,
		SPDK_ERRLOG("Failed to query RDMA device attributes.\n");
		rdma_destroy_id(addr->id);
		free(addr);
-		pthread_mutex_unlock(&g_rdma.lock);
+		pthread_mutex_unlock(&rtransport->lock);
		return rc;
	}

@@ -1036,7 +1044,7 @@ spdk_nvmf_rdma_listen(struct spdk_nvmf_transport *transport,
		SPDK_ERRLOG("Failed to create completion channel\n");
		rdma_destroy_id(addr->id);
		free(addr);
-		pthread_mutex_unlock(&g_rdma.lock);
+		pthread_mutex_unlock(&rtransport->lock);
		return rc;
	}
	SPDK_TRACELOG(SPDK_TRACE_RDMA, "For listen id %p with context %p, created completion channel %p\n",
@@ -1048,7 +1056,7 @@ spdk_nvmf_rdma_listen(struct spdk_nvmf_transport *transport,
		ibv_destroy_comp_channel(addr->comp_channel);
		rdma_destroy_id(addr->id);
		free(addr);
-		pthread_mutex_unlock(&g_rdma.lock);
+		pthread_mutex_unlock(&rtransport->lock);
		return rc;
	}

@@ -1058,7 +1066,7 @@ spdk_nvmf_rdma_listen(struct spdk_nvmf_transport *transport,
		ibv_destroy_comp_channel(addr->comp_channel);
		rdma_destroy_id(addr->id);
		free(addr);
-		pthread_mutex_unlock(&g_rdma.lock);
+		pthread_mutex_unlock(&rtransport->lock);
		return rc;
	}

@@ -1067,8 +1075,8 @@ spdk_nvmf_rdma_listen(struct spdk_nvmf_transport *transport,

	addr->ref = 1;

-	TAILQ_INSERT_TAIL(&g_rdma.listen_addrs, addr, link);
-	pthread_mutex_unlock(&g_rdma.lock);
+	TAILQ_INSERT_TAIL(&rtransport->listen_addrs, addr, link);
+	pthread_mutex_unlock(&rtransport->lock);

	return 0;
}
@@ -1077,9 +1085,12 @@ static int
spdk_nvmf_rdma_stop_listen(struct spdk_nvmf_transport *transport,
			   const struct spdk_nvme_transport_id *_trid)
{
+	struct spdk_nvmf_rdma_transport *rtransport;
	struct spdk_nvmf_rdma_listen_addr *addr, *tmp;
	struct spdk_nvme_transport_id trid = {};

+	rtransport = SPDK_CONTAINEROF(transport, struct spdk_nvmf_rdma_transport, transport);

	/* Selectively copy the trid. Things like NQN don't matter here - that
	 * mapping is enforced elsewhere.
	 */
@@ -1088,13 +1099,13 @@ spdk_nvmf_rdma_stop_listen(struct spdk_nvmf_transport *transport,
	snprintf(trid.traddr, sizeof(addr->trid.traddr), "%s", _trid->traddr);
	snprintf(trid.trsvcid, sizeof(addr->trid.trsvcid), "%s", _trid->trsvcid);

-	pthread_mutex_lock(&g_rdma.lock);
-	TAILQ_FOREACH_SAFE(addr, &g_rdma.listen_addrs, link, tmp) {
+	pthread_mutex_lock(&rtransport->lock);
+	TAILQ_FOREACH_SAFE(addr, &rtransport->listen_addrs, link, tmp) {
		if (spdk_nvme_transport_id_compare(&addr->trid, &trid) == 0) {
			assert(addr->ref > 0);
			addr->ref--;
			if (addr->ref == 0) {
-				TAILQ_REMOVE(&g_rdma.listen_addrs, addr, link);
+				TAILQ_REMOVE(&rtransport->listen_addrs, addr, link);
				ibv_destroy_comp_channel(addr->comp_channel);
				rdma_destroy_id(addr->id);
				free(addr);
@@ -1103,7 +1114,7 @@ spdk_nvmf_rdma_stop_listen(struct spdk_nvmf_transport *transport,
		}
	}

-	pthread_mutex_unlock(&g_rdma.lock);
+	pthread_mutex_unlock(&rtransport->lock);
	return 0;
}

@@ -1113,11 +1124,14 @@ spdk_nvmf_rdma_poll(struct spdk_nvmf_qpair *qpair);
static void
spdk_nvmf_rdma_accept(struct spdk_nvmf_transport *transport)
{
+	struct spdk_nvmf_rdma_transport *rtransport;
	struct rdma_cm_event		*event;
	int				rc;
	struct spdk_nvmf_rdma_qpair	*rdma_qpair, *tmp;

-	if (g_rdma.event_channel == NULL) {
+	rtransport = SPDK_CONTAINEROF(transport, struct spdk_nvmf_rdma_transport, transport);
+
+	if (rtransport->event_channel == NULL) {
		return;
	}

@@ -1136,7 +1150,7 @@ spdk_nvmf_rdma_accept(struct spdk_nvmf_transport *transport)
	}

	while (1) {
-		rc = rdma_get_cm_event(g_rdma.event_channel, &event);
+		rc = rdma_get_cm_event(rtransport->event_channel, &event);
		if (rc == 0) {
			SPDK_TRACELOG(SPDK_TRACE_RDMA, "Acceptor Event: %s\n", CM_EVENT_STR[event->event]);

@@ -1195,10 +1209,13 @@ spdk_nvmf_rdma_discover(struct spdk_nvmf_transport *transport,
static struct spdk_nvmf_poll_group *
spdk_nvmf_rdma_poll_group_create(struct spdk_nvmf_transport *transport)
{
+	struct spdk_nvmf_rdma_transport *rtransport;
	struct spdk_nvmf_rdma_poll_group	*rgroup;
	int				i;
	struct spdk_nvmf_rdma_buf	*buf;

+	rtransport = SPDK_CONTAINEROF(transport, struct spdk_nvmf_rdma_transport, transport);

	rgroup = calloc(1, sizeof(*rgroup));
	if (!rgroup) {
		return NULL;
@@ -1207,18 +1224,18 @@ spdk_nvmf_rdma_poll_group_create(struct spdk_nvmf_transport *transport)
	/* TODO: Make the number of elements in this pool configurable. For now, one full queue
	 *       worth seems reasonable.
	 */
-	rgroup->buf = spdk_dma_zmalloc(g_rdma.max_queue_depth * g_rdma.max_io_size,
+	rgroup->buf = spdk_dma_zmalloc(rtransport->max_queue_depth * rtransport->max_io_size,
				       0x20000, NULL);
	if (!rgroup->buf) {
		SPDK_ERRLOG("Large buffer pool allocation failed (%d x %d)\n",
-			    g_rdma.max_queue_depth, g_rdma.max_io_size);
+			    rtransport->max_queue_depth, rtransport->max_io_size);
		free(rgroup);
		return NULL;
	}

	SLIST_INIT(&rgroup->data_buf_pool);
-	for (i = 0; i < g_rdma.max_queue_depth; i++) {
-		buf = (struct spdk_nvmf_rdma_buf *)(rgroup->buf + (i * g_rdma.max_io_size));
+	for (i = 0; i < rtransport->max_queue_depth; i++) {
+		buf = (struct spdk_nvmf_rdma_buf *)(rgroup->buf + (i * rtransport->max_io_size));
		SLIST_INSERT_HEAD(&rgroup->data_buf_pool, buf, link);
	}

@@ -1247,9 +1264,11 @@ spdk_nvmf_rdma_poll_group_add(struct spdk_nvmf_poll_group *group,
{
	struct spdk_nvmf_rdma_poll_group	*rgroup;
	struct spdk_nvmf_rdma_qpair 		*rdma_qpair;
+	struct spdk_nvmf_rdma_transport		*rtransport;

	rgroup = SPDK_CONTAINEROF(group, struct spdk_nvmf_rdma_poll_group, group);
	rdma_qpair = SPDK_CONTAINEROF(qpair, struct spdk_nvmf_rdma_qpair, qpair);
+	rtransport = SPDK_CONTAINEROF(group->transport, struct spdk_nvmf_rdma_transport, transport);

	if (rgroup->verbs != NULL) {
		if (rgroup->verbs != rdma_qpair->cm_id->verbs) {
@@ -1264,19 +1283,19 @@ spdk_nvmf_rdma_poll_group_add(struct spdk_nvmf_poll_group *group,

	rgroup->verbs = rdma_qpair->cm_id->verbs;
	rgroup->buf_mr = ibv_reg_mr(rdma_qpair->cm_id->pd, rgroup->buf,
-				    g_rdma.max_queue_depth * g_rdma.max_io_size,
+				    rtransport->max_queue_depth * rtransport->max_io_size,
				    IBV_ACCESS_LOCAL_WRITE |
				    IBV_ACCESS_REMOTE_WRITE);
	if (!rgroup->buf_mr) {
		SPDK_ERRLOG("Large buffer pool registration failed (%d x %d)\n",
-			    g_rdma.max_queue_depth, g_rdma.max_io_size);
+			    rtransport->max_queue_depth, rtransport->max_io_size);
		spdk_dma_free(rgroup->buf);
		free(rgroup);
		return -1;
	}

	SPDK_TRACELOG(SPDK_TRACE_RDMA, "Controller session Shared Data Pool: %p Length: %x LKey: %x\n",
-		      rgroup->buf,  g_rdma.max_queue_depth * g_rdma.max_io_size, rgroup->buf_mr->lkey);
+		      rgroup->buf,  rtransport->max_queue_depth * rtransport->max_io_size, rgroup->buf_mr->lkey);

	return 0;
}