Commit 3c423f40 authored by Ben Walker

nvmf: Turn RDMA req processing into a state machine



Formalize a state machine around request processing.
The state is advanced by calling spdk_nvmf_rdma_request_process().

This clarifies the implementation considerably and
cleans up a few corner cases. Unfortunately, the diff
is also enormous and there does not appear to be a
way to reduce it.
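
The pattern is a loop that re-evaluates the request's state until it
stops changing and reports whether any progress was made; completion
handlers set the next state and call spdk_nvmf_rdma_request_process()
again. The sketch below illustrates just that driver loop using toy
types (toy_request, toy_state) invented for the example; it is a
simplified illustration, not the actual rdma.c code, which is in the
diff.

#include <stdbool.h>
#include <stdio.h>

/* Toy state set modeled loosely on enum spdk_nvmf_rdma_request_state. */
enum toy_state {
	TOY_STATE_NEW,
	TOY_STATE_READY_TO_EXECUTE,
	TOY_STATE_EXECUTING,	/* left only by external code (I/O completion) */
	TOY_STATE_EXECUTED,
	TOY_STATE_COMPLETED,
};

struct toy_request {
	enum toy_state state;
};

/* Advance the request through as many states as possible in one call and
 * report whether any forward progress was made. */
static bool
toy_request_process(struct toy_request *req)
{
	enum toy_state prev_state;
	bool progress = false;

	do {
		prev_state = req->state;

		switch (req->state) {
		case TOY_STATE_NEW:
			/* Stand-in for buffer/transfer setup. */
			req->state = TOY_STATE_READY_TO_EXECUTE;
			break;
		case TOY_STATE_READY_TO_EXECUTE:
			/* Stand-in for submitting the I/O to the block device. */
			req->state = TOY_STATE_EXECUTING;
			break;
		case TOY_STATE_EXECUTED:
			/* Stand-in for queueing the completion back to the host. */
			req->state = TOY_STATE_COMPLETED;
			break;
		case TOY_STATE_EXECUTING:
		case TOY_STATE_COMPLETED:
			/* Nothing to do until external code changes the state. */
			break;
		}

		if (req->state != prev_state) {
			progress = true;
		}
	} while (req->state != prev_state);

	return progress;
}

int
main(void)
{
	struct toy_request req = { .state = TOY_STATE_NEW };

	/* Runs NEW -> READY_TO_EXECUTE -> EXECUTING back to back, then stops
	 * because EXECUTING can only be left by external code. */
	toy_request_process(&req);

	/* An I/O completion handler would set the next state... */
	req.state = TOY_STATE_EXECUTED;

	/* ...and re-drive the machine, which now advances to COMPLETED. */
	toy_request_process(&req);

	printf("final state: %d (COMPLETED is %d)\n", req.state, TOY_STATE_COMPLETED);
	return 0;
}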

Change-Id: I5741da24bcffc1aef367ebfe3dd1f589c5746901
Signed-off-by: Ben Walker <benjamin.walker@intel.com>
Reviewed-on: https://review.gerrithub.io/374540


Reviewed-by: Jim Harris <james.r.harris@intel.com>
Tested-by: SPDK Automated Test System <sys_sgsw@intel.com>
parent 1ff5f4ab
+261 −273
@@ -59,6 +59,48 @@
#define NVMF_DEFAULT_TX_SGE		1
#define NVMF_DEFAULT_RX_SGE		2

enum spdk_nvmf_rdma_request_state {
	/* The request is not currently in use */
	RDMA_REQUEST_STATE_FREE = 0,

	/* Initial state when request first received */
	RDMA_REQUEST_STATE_NEW,

	/* The request is queued until a data buffer is available. */
	RDMA_REQUEST_STATE_NEED_BUFFER,

	/* The request is waiting on RDMA queue depth availability
	 * to transfer data from the host to the controller.
	 */
	RDMA_REQUEST_STATE_TRANSFER_PENDING_HOST_TO_CONTROLLER,

	/* The request is currently transferring data from the host to the controller. */
	RDMA_REQUEST_STATE_TRANSFERRING_HOST_TO_CONTROLLER,

	/* The request is ready to execute at the block device */
	RDMA_REQUEST_STATE_READY_TO_EXECUTE,

	/* The request is currently executing at the block device */
	RDMA_REQUEST_STATE_EXECUTING,

	/* The request finished executing at the block device */
	RDMA_REQUEST_STATE_EXECUTED,

	/* The request is waiting on RDMA queue depth availability
	 * to transfer data from the controller to the host.
	 */
	RDMA_REQUEST_STATE_TRANSFER_PENDING_CONTROLLER_TO_HOST,

	/* The request is ready to send a completion */
	RDMA_REQUEST_STATE_READY_TO_COMPLETE,

	/* The request currently has a completion outstanding */
	RDMA_REQUEST_STATE_COMPLETING,

	/* The request completed and can be marked free. */
	RDMA_REQUEST_STATE_COMPLETED,
};

/* This structure holds commands as they are received off the wire.
 * It must be dynamically paired with a full request object
 * (spdk_nvmf_rdma_request) to service a request. It is separate
@@ -80,6 +122,8 @@ struct spdk_nvmf_rdma_request {
	struct spdk_nvmf_request		req;
	bool					data_from_pool;

	enum spdk_nvmf_rdma_request_state	state;

	struct spdk_nvmf_rdma_recv		*recv;

	struct {
@@ -495,35 +539,6 @@ request_transfer_out(struct spdk_nvmf_request *req)
	return rc;
}

static int
spdk_nvmf_rdma_request_transfer_data(struct spdk_nvmf_request *req)
{
	struct spdk_nvmf_rdma_request	*rdma_req;
	struct spdk_nvmf_qpair		*qpair;
	struct spdk_nvmf_rdma_qpair	*rdma_qpair;

	qpair = req->qpair;
	rdma_req = SPDK_CONTAINEROF(req, struct spdk_nvmf_rdma_request, req);
	rdma_qpair = SPDK_CONTAINEROF(qpair, struct spdk_nvmf_rdma_qpair, qpair);

	if (req->xfer == SPDK_NVME_DATA_NONE) {
		/* If no data transfer, this can bypass the queue */
		return request_transfer_out(req);
	}

	if (rdma_qpair->cur_rdma_rw_depth < rdma_qpair->max_rw_depth) {
		if (req->xfer == SPDK_NVME_DATA_CONTROLLER_TO_HOST) {
			return request_transfer_out(req);
		} else if (req->xfer == SPDK_NVME_DATA_HOST_TO_CONTROLLER) {
			return request_transfer_in(req);
		}
	} else {
		TAILQ_INSERT_TAIL(&rdma_qpair->pending_rdma_rw_queue, rdma_req, link);
	}

	return 0;
}

static int
nvmf_rdma_connect(struct spdk_nvmf_transport *transport, struct rdma_cm_event *event)
{
@@ -657,6 +672,7 @@ nvmf_rdma_disconnect(struct rdma_cm_event *evt)
	struct spdk_nvmf_ctrlr		*ctrlr;
	struct spdk_nvmf_subsystem	*subsystem;
	struct spdk_nvmf_rdma_qpair 	*rdma_qpair;
	struct spdk_nvmf_rdma_qpair	*r, *t;

	if (evt->id == NULL) {
		SPDK_ERRLOG("disconnect request: missing cm_id\n");
@@ -673,11 +689,23 @@ nvmf_rdma_disconnect(struct rdma_cm_event *evt)

	rdma_qpair = SPDK_CONTAINEROF(qpair, struct spdk_nvmf_rdma_qpair, qpair);

	/* The connection may still be in this pending list when a disconnect
	 * event arrives. Search for it and remove it if it is found.
	 */
	TAILQ_FOREACH_SAFE(r, &g_pending_conns, link, t) {
		if (r == rdma_qpair) {
			SPDK_TRACELOG(SPDK_TRACE_RDMA, "Received disconnect for qpair %p before first SEND ack\n",
				      rdma_qpair);
			TAILQ_REMOVE(&g_pending_conns, rdma_qpair, link);
			break;
		}
	}

	ctrlr = qpair->ctrlr;
	if (ctrlr == NULL) {
		/* No ctrlr has been established yet. That means the qpair
		 * must be in the pending connections list. Remove it. */
		TAILQ_REMOVE(&g_pending_conns, rdma_qpair, link);
		/* No ctrlr has been established yet, so destroy
		 * the connection immediately.
		 */
		spdk_nvmf_rdma_qpair_destroy(rdma_qpair);
		return 0;
	}
@@ -710,13 +738,6 @@ static const char *CM_EVENT_STR[] = {
};
#endif /* DEBUG */

typedef enum _spdk_nvmf_request_prep_type {
	SPDK_NVMF_REQUEST_PREP_ERROR = -1,
	SPDK_NVMF_REQUEST_PREP_READY = 0,
	SPDK_NVMF_REQUEST_PREP_PENDING_BUFFER = 1,
	SPDK_NVMF_REQUEST_PREP_PENDING_DATA = 2,
} spdk_nvmf_request_prep_type;

static int
spdk_nvmf_rdma_mem_notify(void *cb_ctx, struct spdk_mem_map *map,
			  enum spdk_mem_map_notify_action action,
@@ -884,102 +905,172 @@ spdk_nvmf_rdma_request_parse_sgl(struct spdk_nvmf_rdma_transport *rtransport,
	return -1;
}

static spdk_nvmf_request_prep_type
spdk_nvmf_request_prep_data(struct spdk_nvmf_request *req)
static bool
spdk_nvmf_rdma_request_process(struct spdk_nvmf_rdma_transport *rtransport,
			       struct spdk_nvmf_rdma_request *rdma_req)
{

	struct spdk_nvmf_rdma_request		*rdma_req;
	struct spdk_nvmf_rdma_transport		*rtransport;
	struct spdk_nvmf_rdma_qpair	*rqpair;
	struct spdk_nvmf_rdma_device	*device;
	struct spdk_nvme_cpl		*rsp = &rdma_req->req.rsp->nvme_cpl;
	int				rc;
	struct spdk_nvmf_rdma_recv	*rdma_recv;
	enum spdk_nvmf_rdma_request_state prev_state;
	bool				progress = false;

	rdma_req = SPDK_CONTAINEROF(req, struct spdk_nvmf_rdma_request, req);
	rqpair = SPDK_CONTAINEROF(rdma_req->req.qpair, struct spdk_nvmf_rdma_qpair, qpair);
	device = rqpair->port->device;

	assert(rdma_req->state != RDMA_REQUEST_STATE_FREE);

	req->length = 0;
	req->data = NULL;
	/* The loop here is to allow for several back-to-back state changes. */
	do {
		prev_state = rdma_req->state;

	req->xfer = spdk_nvmf_rdma_request_get_xfer(rdma_req);
	if (req->xfer == SPDK_NVME_DATA_NONE) {
		return SPDK_NVMF_REQUEST_PREP_READY;
		SPDK_TRACELOG(SPDK_TRACE_RDMA, "Request %p entering state %d\n", rdma_req, prev_state);

		switch (rdma_req->state) {
		case RDMA_REQUEST_STATE_FREE:
			/* Some external code must kick a request into RDMA_REQUEST_STATE_NEW
			 * to escape this state. */
			break;
		case RDMA_REQUEST_STATE_NEW:
			rqpair->cur_queue_depth++;
			rdma_recv = rdma_req->recv;

			/* The first element of the SGL is the NVMe command */
			rdma_req->req.cmd = (union nvmf_h2c_msg *)rdma_recv->sgl[0].addr;
			memset(rdma_req->req.rsp, 0, sizeof(*rdma_req->req.rsp));

			TAILQ_REMOVE(&rqpair->incoming_queue, rdma_recv, link);
			TAILQ_REMOVE(&rqpair->free_queue, rdma_req, link);

			/* The next state transition depends on the data transfer needs of this request. */
			rdma_req->req.xfer = spdk_nvmf_rdma_request_get_xfer(rdma_req);

			/* If no data to transfer, ready to execute. */
			if (rdma_req->req.xfer == SPDK_NVME_DATA_NONE) {
				rdma_req->state = RDMA_REQUEST_STATE_READY_TO_EXECUTE;
				break;
			}

			rdma_req->state = RDMA_REQUEST_STATE_NEED_BUFFER;
			TAILQ_INSERT_TAIL(&rqpair->pending_data_buf_queue, rdma_req, link);
			break;
		case RDMA_REQUEST_STATE_NEED_BUFFER:
			assert(rdma_req->req.xfer != SPDK_NVME_DATA_NONE);

			if (rdma_req != TAILQ_FIRST(&rqpair->pending_data_buf_queue)) {
				/* This request needs to wait in line to obtain a buffer */
				break;
			}

	rtransport = SPDK_CONTAINEROF(req->qpair->transport, struct spdk_nvmf_rdma_transport, transport);
	device = SPDK_CONTAINEROF(req->qpair, struct spdk_nvmf_rdma_qpair, qpair)->port->device;
			TAILQ_REMOVE(&rqpair->pending_data_buf_queue, rdma_req, link);

			/* Try to get a data buffer */
			rc = spdk_nvmf_rdma_request_parse_sgl(rtransport, device, rdma_req);
			if (rc < 0) {
		return SPDK_NVMF_REQUEST_PREP_ERROR;
				rsp->status.sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR;
				rdma_req->state = RDMA_REQUEST_STATE_READY_TO_COMPLETE;
				break;
			}

	if (!req->data) {
		return SPDK_NVMF_REQUEST_PREP_PENDING_BUFFER;
			if (!rdma_req->req.data) {
				/* No buffers available. Put this request back at the head of
				 * the queue. */
				TAILQ_INSERT_HEAD(&rqpair->pending_data_buf_queue, rdma_req, link);
				break;
			}

			/* If data is transferring from host to controller and the data didn't
			 * arrive using in capsule data, we need to do a transfer from the host.
			 */
	if (req->xfer == SPDK_NVME_DATA_HOST_TO_CONTROLLER && rdma_req->data_from_pool) {
		return SPDK_NVMF_REQUEST_PREP_PENDING_DATA;
	}

	return SPDK_NVMF_REQUEST_PREP_READY;
			if (rdma_req->req.xfer == SPDK_NVME_DATA_HOST_TO_CONTROLLER && rdma_req->data_from_pool) {
				rdma_req->state = RDMA_REQUEST_STATE_TRANSFER_PENDING_HOST_TO_CONTROLLER;
				TAILQ_INSERT_TAIL(&rqpair->pending_rdma_rw_queue, rdma_req, link);
				break;
			}

static int
spdk_nvmf_rdma_handle_pending_rdma_rw(struct spdk_nvmf_qpair *qpair)
{
	struct spdk_nvmf_rdma_qpair		*rdma_qpair;
	struct spdk_nvmf_rdma_transport		*rtransport;
	struct spdk_nvmf_rdma_request		*rdma_req, *tmp;
	int 					rc;
	int 					count = 0;

	rdma_qpair = SPDK_CONTAINEROF(qpair, struct spdk_nvmf_rdma_qpair, qpair);
	rtransport = SPDK_CONTAINEROF(qpair->transport, struct spdk_nvmf_rdma_transport, transport);

	/* First, try to assign free data buffers to requests that need one */
	if (qpair->ctrlr) {
		TAILQ_FOREACH_SAFE(rdma_req, &rdma_qpair->pending_data_buf_queue, link, tmp) {
			assert(rdma_req->req.data == NULL);
			rdma_req->req.data = spdk_mempool_get(rtransport->data_buf_pool);
			if (!rdma_req->req.data) {
			rdma_req->state = RDMA_REQUEST_STATE_READY_TO_EXECUTE;
			break;
		case RDMA_REQUEST_STATE_TRANSFER_PENDING_HOST_TO_CONTROLLER:
			if (rdma_req != TAILQ_FIRST(&rqpair->pending_rdma_rw_queue)) {
				/* This request needs to wait in line to perform RDMA */
				break;
			}
			rdma_req->data.sgl[0].addr = (uintptr_t)rdma_req->req.data;
			rdma_req->data.sgl[0].lkey = ((struct ibv_mr *)spdk_mem_map_translate(rdma_qpair->port->device->map,
						      (uint64_t)rdma_req->req.data))->lkey;
			rdma_req->data_from_pool = true;
			TAILQ_REMOVE(&rdma_qpair->pending_data_buf_queue, rdma_req, link);
			if (rdma_req->req.xfer == SPDK_NVME_DATA_HOST_TO_CONTROLLER) {
				TAILQ_INSERT_TAIL(&rdma_qpair->pending_rdma_rw_queue, rdma_req, link);
			} else {
				rc = spdk_nvmf_request_exec(&rdma_req->req);
				if (rc < 0) {
					return -1;
				}
				count++;

			if (rqpair->cur_rdma_rw_depth < rqpair->max_rw_depth) {
				TAILQ_REMOVE(&rqpair->pending_rdma_rw_queue, rdma_req, link);
				rdma_req->state = RDMA_REQUEST_STATE_TRANSFERRING_HOST_TO_CONTROLLER;
				rc = request_transfer_in(&rdma_req->req);
				if (rc) {
					rsp->status.sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR;
					rdma_req->state = RDMA_REQUEST_STATE_READY_TO_COMPLETE;
				}
			}
			break;
		case RDMA_REQUEST_STATE_TRANSFERRING_HOST_TO_CONTROLLER:
			/* Some external code must kick a request into RDMA_REQUEST_STATE_READY_TO_EXECUTE
			 * to escape this state. */
			break;
		case RDMA_REQUEST_STATE_READY_TO_EXECUTE:
			rdma_req->state = RDMA_REQUEST_STATE_EXECUTING;
			spdk_nvmf_request_exec(&rdma_req->req);
			break;
		case RDMA_REQUEST_STATE_EXECUTING:
			/* Some external code must kick a request into RDMA_REQUEST_STATE_EXECUTED
			 * to escape this state. */
			break;
		case RDMA_REQUEST_STATE_EXECUTED:
			if (rdma_req->req.xfer == SPDK_NVME_DATA_CONTROLLER_TO_HOST) {
				rdma_req->state = RDMA_REQUEST_STATE_TRANSFER_PENDING_CONTROLLER_TO_HOST;
				TAILQ_INSERT_TAIL(&rqpair->pending_rdma_rw_queue, rdma_req, link);
			} else {
				rdma_req->state = RDMA_REQUEST_STATE_READY_TO_COMPLETE;
			}

	/* Try to initiate RDMA Reads or Writes on requests that have data buffers */
	while (rdma_qpair->cur_rdma_rw_depth < rdma_qpair->max_rw_depth) {
		rdma_req = TAILQ_FIRST(&rdma_qpair->pending_rdma_rw_queue);
		if (spdk_unlikely(!rdma_req)) {
			break;
		case RDMA_REQUEST_STATE_TRANSFER_PENDING_CONTROLLER_TO_HOST:
			if (rdma_req != TAILQ_FIRST(&rqpair->pending_rdma_rw_queue)) {
				/* This request needs to wait in line to perform RDMA */
				break;
			}

		TAILQ_REMOVE(&rdma_qpair->pending_rdma_rw_queue, rdma_req, link);
			if (rqpair->cur_rdma_rw_depth < rqpair->max_rw_depth) {
				rdma_req->state = RDMA_REQUEST_STATE_READY_TO_COMPLETE;
				TAILQ_REMOVE(&rqpair->pending_rdma_rw_queue, rdma_req, link);
			}
			break;
		case RDMA_REQUEST_STATE_READY_TO_COMPLETE:
			rdma_req->state = RDMA_REQUEST_STATE_COMPLETING;

		SPDK_TRACELOG(SPDK_TRACE_RDMA, "Submitting previously queued for RDMA R/W request %p\n", rdma_req);
			rc = request_transfer_out(&rdma_req->req);
			assert(rc == 0); /* No good way to handle this currently */
			break;
		case RDMA_REQUEST_STATE_COMPLETING:
			/* Some external code must kick a request into RDMA_REQUEST_STATE_COMPLETED
			 * to escape this state. */
			break;
		case RDMA_REQUEST_STATE_COMPLETED:
			assert(rqpair->cur_queue_depth > 0);
			rqpair->cur_queue_depth--;

		rc = spdk_nvmf_rdma_request_transfer_data(&rdma_req->req);
		if (rc) {
			return -1;
			if (rdma_req->data_from_pool) {
				/* Put the buffer back in the pool */
				spdk_mempool_put(rtransport->data_buf_pool, rdma_req->req.data);
				rdma_req->data_from_pool = false;
			}
			rdma_req->req.length = 0;
			rdma_req->req.data = NULL;
			rdma_req->state = RDMA_REQUEST_STATE_FREE;
			TAILQ_INSERT_TAIL(&rqpair->free_queue, rdma_req, link);
			break;
		}

	return count;
		if (rdma_req->state != prev_state) {
			progress = true;
		}
	} while (rdma_req->state != prev_state);

	return progress;
}

/* Public API callbacks begin here */
@@ -1417,36 +1508,14 @@ spdk_nvmf_rdma_poll_group_remove(struct spdk_nvmf_poll_group *group,
static int
spdk_nvmf_rdma_request_complete(struct spdk_nvmf_request *req)
{
	struct spdk_nvme_cpl *rsp = &req->rsp->nvme_cpl;
	int rc;

	if (rsp->status.sc == SPDK_NVME_SC_SUCCESS &&
	    req->xfer == SPDK_NVME_DATA_CONTROLLER_TO_HOST) {
		rc = spdk_nvmf_rdma_request_transfer_data(req);
	} else {
		rc = request_transfer_out(req);
	}

	return rc;
}

static void
request_release_buffer(struct spdk_nvmf_request *req)
{
	struct spdk_nvmf_rdma_request	*rdma_req;
	struct spdk_nvmf_qpair		*qpair = req->qpair;
	struct spdk_nvmf_rdma_transport	*rtransport;
	struct spdk_nvmf_rdma_transport	*rtransport = SPDK_CONTAINEROF(req->qpair->transport,
			struct spdk_nvmf_rdma_transport, transport);
	struct spdk_nvmf_rdma_request	*rdma_req = SPDK_CONTAINEROF(req, struct spdk_nvmf_rdma_request, req);

	rdma_req = SPDK_CONTAINEROF(req, struct spdk_nvmf_rdma_request, req);
	rtransport = SPDK_CONTAINEROF(qpair->transport, struct spdk_nvmf_rdma_transport, transport);
	rdma_req->state = RDMA_REQUEST_STATE_EXECUTED;
	spdk_nvmf_rdma_request_process(rtransport, rdma_req);

	if (rdma_req->data_from_pool) {
		/* Put the buffer back in the pool */
		spdk_mempool_put(rtransport->data_buf_pool, req->data);
		req->data = NULL;
		req->length = 0;
		rdma_req->data_from_pool = false;
	}
	return 0;
}

static void
@@ -1455,68 +1524,41 @@ spdk_nvmf_rdma_close_qpair(struct spdk_nvmf_qpair *qpair)
	spdk_nvmf_rdma_qpair_destroy(SPDK_CONTAINEROF(qpair, struct spdk_nvmf_rdma_qpair, qpair));
}

static int
process_incoming_queue(struct spdk_nvmf_rdma_qpair *rdma_qpair)
static void
spdk_nvmf_rdma_qpair_process_pending(struct spdk_nvmf_rdma_transport *rtransport,
				     struct spdk_nvmf_rdma_qpair *rqpair)
{
	struct spdk_nvmf_rdma_recv	*rdma_recv, *tmp;
	struct spdk_nvmf_rdma_request	*rdma_req;
	struct spdk_nvmf_request	*req;
	int rc, count;
	bool error = false;
	struct spdk_nvmf_rdma_recv	*rdma_recv, *recv_tmp;
	struct spdk_nvmf_rdma_request	*rdma_req, *req_tmp;

	count = 0;
	TAILQ_FOREACH_SAFE(rdma_recv, &rdma_qpair->incoming_queue, link, tmp) {
		rdma_req = TAILQ_FIRST(&rdma_qpair->free_queue);
		if (rdma_req == NULL) {
			/* Need to wait for more SEND completions */
	/* We process I/O in the pending_rdma_rw queue at the highest priority. */
	TAILQ_FOREACH_SAFE(rdma_req, &rqpair->pending_rdma_rw_queue, link, req_tmp) {
		if (spdk_nvmf_rdma_request_process(rtransport, rdma_req) == false) {
			break;
		}
		TAILQ_REMOVE(&rdma_qpair->free_queue, rdma_req, link);
		TAILQ_REMOVE(&rdma_qpair->incoming_queue, rdma_recv, link);
		rdma_req->recv = rdma_recv;
		req = &rdma_req->req;

		/* The first element of the SGL is the NVMe command */
		req->cmd = (union nvmf_h2c_msg *)rdma_recv->sgl[0].addr;

		spdk_trace_record(TRACE_NVMF_IO_START, 0, 0, (uint64_t)req, 0);

		memset(req->rsp, 0, sizeof(*req->rsp));
		rc = spdk_nvmf_request_prep_data(req);
		switch (rc) {
		case SPDK_NVMF_REQUEST_PREP_READY:
			SPDK_TRACELOG(SPDK_TRACE_RDMA, "Request %p is ready for execution\n", req);
			/* Data is immediately available */
			rc = spdk_nvmf_request_exec(req);
			if (rc < 0) {
				error = true;
				continue;
			}
			count++;
			break;
		case SPDK_NVMF_REQUEST_PREP_PENDING_BUFFER:
			SPDK_TRACELOG(SPDK_TRACE_RDMA, "Request %p needs data buffer\n", req);
			TAILQ_INSERT_TAIL(&rdma_qpair->pending_data_buf_queue, rdma_req, link);
			break;
		case SPDK_NVMF_REQUEST_PREP_PENDING_DATA:
			SPDK_TRACELOG(SPDK_TRACE_RDMA, "Request %p needs data transfer\n", req);
			rc = spdk_nvmf_rdma_request_transfer_data(req);
			if (rc < 0) {
				error = true;
				continue;
	}
			break;
		case SPDK_NVMF_REQUEST_PREP_ERROR:
			spdk_nvmf_request_complete(req);

	/* The second highest priority is I/O waiting on memory buffers. */
	TAILQ_FOREACH_SAFE(rdma_req, &rqpair->pending_data_buf_queue, link, req_tmp) {
		if (spdk_nvmf_rdma_request_process(rtransport, rdma_req) == false) {
			break;
		}
	}

	if (error) {
		return -1;
	/* The lowest priority is processing newly received commands */
	TAILQ_FOREACH_SAFE(rdma_recv, &rqpair->incoming_queue, link, recv_tmp) {
		rdma_req = TAILQ_FIRST(&rqpair->free_queue);
		if (rdma_req == NULL) {
			/* Need to wait for more SEND completions */
			break;
		}

	return count;
		rdma_req->recv = rdma_recv;
		rdma_req->state = RDMA_REQUEST_STATE_NEW;
		if (spdk_nvmf_rdma_request_process(rtransport, rdma_req) == false) {
			break;
		}
	}
}

static struct spdk_nvmf_rdma_request *
@@ -1549,38 +1591,35 @@ get_rdma_recv_from_wc(struct spdk_nvmf_rdma_qpair *rdma_qpair,
	return rdma_recv;
}

/* Returns the number of times that spdk_nvmf_request_exec was called,
 * or -1 on error.
 */
static int
spdk_nvmf_rdma_poll(struct spdk_nvmf_qpair *qpair)
{
	struct ibv_wc wc[32];
	struct spdk_nvmf_rdma_transport *rtransport;
	struct spdk_nvmf_rdma_qpair	*rdma_qpair;
	struct spdk_nvmf_rdma_request	*rdma_req;
	struct spdk_nvmf_rdma_recv	*rdma_recv;
	struct spdk_nvmf_request *req;
	int reaped, i, rc;
	int reaped, i;
	int count = 0;
	bool error = false;
	char buf[64];

	rtransport = SPDK_CONTAINEROF(qpair->transport, struct spdk_nvmf_rdma_transport, transport);
	rdma_qpair = SPDK_CONTAINEROF(qpair, struct spdk_nvmf_rdma_qpair, qpair);

	/* Poll for completing operations. */
	rc = ibv_poll_cq(rdma_qpair->cq, 32, wc);
	if (rc < 0) {
	reaped = ibv_poll_cq(rdma_qpair->cq, 32, wc);
	if (reaped < 0) {
		spdk_strerror_r(errno, buf, sizeof(buf));
		SPDK_ERRLOG("Error polling CQ! (%d): %s\n",
			    errno, buf);
		return -1;
	}

	reaped = rc;
	for (i = 0; i < reaped; i++) {
		if (wc[i].status) {
			SPDK_ERRLOG("CQ error on Connection %p, Request 0x%lu (%d): %s\n",
				    qpair, wc[i].wr_id, wc[i].status, ibv_wc_status_str(wc[i].status));
			SPDK_ERRLOG("CQ error on CQ %p, Request 0x%lu (%d): %s\n",
				    rdma_qpair->cq, wc[i].wr_id, wc[i].status, ibv_wc_status_str(wc[i].status));
			error = true;
			continue;
		}
@@ -1588,100 +1627,49 @@ spdk_nvmf_rdma_poll(struct spdk_nvmf_qpair *qpair)
		switch (wc[i].opcode) {
		case IBV_WC_SEND:
			rdma_req = get_rdma_req_from_wc(rdma_qpair, &wc[i]);
			req = &rdma_req->req;

			assert(rdma_qpair->cur_queue_depth > 0);
			SPDK_TRACELOG(SPDK_TRACE_RDMA,
				      "RDMA SEND Complete. Request: %p Connection: %p Outstanding I/O: %d\n",
				      req, qpair, rdma_qpair->cur_queue_depth - 1);
			rdma_qpair->cur_queue_depth--;
			assert(rdma_req->state == RDMA_REQUEST_STATE_COMPLETING);
			rdma_req->state = RDMA_REQUEST_STATE_COMPLETED;

			/* The request may still own a data buffer. Release it */
			request_release_buffer(req);
			spdk_nvmf_rdma_request_process(rtransport, rdma_req);

			/* Put the request back on the free list */
			TAILQ_INSERT_TAIL(&rdma_qpair->free_queue, rdma_req, link);
			count++;

			/* Try to process queued incoming requests */
			rc = process_incoming_queue(rdma_qpair);
			if (rc < 0) {
				error = true;
				continue;
			}
			count += rc;
			/* Try to process other queued requests */
			spdk_nvmf_rdma_qpair_process_pending(rtransport, rdma_qpair);
			break;

		case IBV_WC_RDMA_WRITE:
			rdma_req = get_rdma_req_from_wc(rdma_qpair, &wc[i]);
			req = &rdma_req->req;

			SPDK_TRACELOG(SPDK_TRACE_RDMA, "RDMA WRITE Complete. Request: %p Connection: %p\n",
				      req, qpair);
			spdk_trace_record(TRACE_RDMA_WRITE_COMPLETE, 0, 0, (uint64_t)req, 0);

			/* Now that the write has completed, the data buffer can be released */
			request_release_buffer(req);

			rdma_qpair->cur_rdma_rw_depth--;

			/* Since an RDMA R/W operation completed, try to submit from the pending list. */
			rc = spdk_nvmf_rdma_handle_pending_rdma_rw(qpair);
			if (rc < 0) {
				error = true;
				continue;
			}
			count += rc;
			/* Try to process other queued requests */
			spdk_nvmf_rdma_qpair_process_pending(rtransport, rdma_qpair);
			break;

		case IBV_WC_RDMA_READ:
			rdma_req = get_rdma_req_from_wc(rdma_qpair, &wc[i]);
			req = &rdma_req->req;

			SPDK_TRACELOG(SPDK_TRACE_RDMA, "RDMA READ Complete. Request: %p Connection: %p\n",
				      req, qpair);
			spdk_trace_record(TRACE_RDMA_READ_COMPLETE, 0, 0, (uint64_t)req, 0);
			rc = spdk_nvmf_request_exec(req);
			if (rc) {
				error = true;
				continue;
			}
			count++;

			/* Since an RDMA R/W operation completed, try to submit from the pending list. */
			assert(rdma_req->state == RDMA_REQUEST_STATE_TRANSFERRING_HOST_TO_CONTROLLER);
			rdma_qpair->cur_rdma_rw_depth--;
			rc = spdk_nvmf_rdma_handle_pending_rdma_rw(qpair);
			if (rc < 0) {
				error = true;
				continue;
			}
			count += rc;
			rdma_req->state = RDMA_REQUEST_STATE_READY_TO_EXECUTE;

			spdk_nvmf_rdma_request_process(rtransport, rdma_req);

			/* Try to process other queued requests */
			spdk_nvmf_rdma_qpair_process_pending(rtransport, rdma_qpair);
			break;

		case IBV_WC_RECV:
			rdma_recv = get_rdma_recv_from_wc(rdma_qpair, &wc[i]);

			rdma_qpair->cur_queue_depth++;
			if (rdma_qpair->cur_queue_depth > rdma_qpair->max_queue_depth) {
				SPDK_TRACELOG(SPDK_TRACE_RDMA,
					      "Temporarily exceeded maximum queue depth (%u). Queueing.\n",
					      rdma_qpair->cur_queue_depth);
			}
			SPDK_TRACELOG(SPDK_TRACE_RDMA,
				      "RDMA RECV Complete. Recv: %p Connection: %p Outstanding I/O: %d\n",
				      rdma_recv, qpair, rdma_qpair->cur_queue_depth);

			TAILQ_INSERT_TAIL(&rdma_qpair->incoming_queue, rdma_recv, link);
			rc = process_incoming_queue(rdma_qpair);
			if (rc < 0) {
				error = true;
				continue;
			}
			count += rc;

			/* Try to process other queued requests */
			spdk_nvmf_rdma_qpair_process_pending(rtransport, rdma_qpair);
			break;

		default:
			SPDK_ERRLOG("Received an unknown opcode on the CQ: %d\n", wc[i].opcode);
			error = true;
			continue;
		}
	}