Commit fdec444a authored by Philipp Skadorov's avatar Philipp Skadorov Committed by Jim Harris
Browse files

nvmf/rdma: track requests in any state



Requests that are being put into IBV context are lost when
IBV QP breaks and its SQ drains.

In order to track NVMf/RDMA requests, RDMA QP has been
reworked to track requests at any state with queues of
requests for each state.

This allowed to get rid of a few intermediate queues and
request counters.

A couple of states has been added to track outbound requests
with and without data. They will be used by QP recovery for
freeing resources assigned to outstanding requests.

Change-Id: Ie84207325c38e5bb2c247cd6dcddb82dfad0d503
Signed-off-by: default avatarPhilipp Skadorov <philipp.skadorov@wdc.com>
Reviewed-on: https://review.gerrithub.io/416878


Tested-by: default avatarSPDK Automated Test System <sys_sgsw@intel.com>
Reviewed-by: default avatarBen Walker <benjamin.walker@intel.com>
Reviewed-by: default avatarJim Harris <james.r.harris@intel.com>
parent 75646dbe
Loading
Loading
Loading
Loading
+156 −121
Original line number Diff line number Diff line
@@ -78,9 +78,9 @@ enum spdk_nvmf_rdma_request_state {
	RDMA_REQUEST_STATE_NEED_BUFFER,

	/* The request is waiting on RDMA queue depth availability
	 * to transfer data from the host to the controller.
	 * to transfer data between the host and the controller.
	 */
	RDMA_REQUEST_STATE_TRANSFER_PENDING_HOST_TO_CONTROLLER,
	RDMA_REQUEST_STATE_DATA_TRANSFER_PENDING,

	/* The request is currently transferring data from the host to the controller. */
	RDMA_REQUEST_STATE_TRANSFERRING_HOST_TO_CONTROLLER,
@@ -94,19 +94,22 @@ enum spdk_nvmf_rdma_request_state {
	/* The request finished executing at the block device */
	RDMA_REQUEST_STATE_EXECUTED,

	/* The request is waiting on RDMA queue depth availability
	 * to transfer data from the controller to the host.
	 */
	RDMA_REQUEST_STATE_TRANSFER_PENDING_CONTROLLER_TO_HOST,

	/* The request is ready to send a completion */
	RDMA_REQUEST_STATE_READY_TO_COMPLETE,

	/* The request currently has a completion outstanding */
	/* The request is currently transferring data from the controller to the host. */
	RDMA_REQUEST_STATE_TRANSFERRING_CONTROLLER_TO_HOST,

	/* The request currently has an outstanding completion without an
	 * associated data transfer.
	 */
	RDMA_REQUEST_STATE_COMPLETING,

	/* The request completed and can be marked free. */
	RDMA_REQUEST_STATE_COMPLETED,

	/* Terminator */
	RDMA_REQUEST_NUM_STATES,
};

#define OBJECT_NVMF_RDMA_IO				0x40
@@ -114,13 +117,13 @@ enum spdk_nvmf_rdma_request_state {
#define									TRACE_GROUP_NVMF_RDMA 0x4
#define TRACE_RDMA_REQUEST_STATE_NEW					SPDK_TPOINT_ID(TRACE_GROUP_NVMF_RDMA, 0x0)
#define TRACE_RDMA_REQUEST_STATE_NEED_BUFFER				SPDK_TPOINT_ID(TRACE_GROUP_NVMF_RDMA, 0x1)
#define TRACE_RDMA_REQUEST_STATE_TRANSFER_PENDING_HOST_TO_CONTROLLER	SPDK_TPOINT_ID(TRACE_GROUP_NVMF_RDMA, 0x2)
#define TRACE_RDMA_REQUEST_STATE_DATA_TRANSFER_PENDING			SPDK_TPOINT_ID(TRACE_GROUP_NVMF_RDMA, 0x2)
#define TRACE_RDMA_REQUEST_STATE_TRANSFERRING_HOST_TO_CONTROLLER	SPDK_TPOINT_ID(TRACE_GROUP_NVMF_RDMA, 0x3)
#define TRACE_RDMA_REQUEST_STATE_READY_TO_EXECUTE			SPDK_TPOINT_ID(TRACE_GROUP_NVMF_RDMA, 0x4)
#define TRACE_RDMA_REQUEST_STATE_EXECUTING				SPDK_TPOINT_ID(TRACE_GROUP_NVMF_RDMA, 0x5)
#define TRACE_RDMA_REQUEST_STATE_EXECUTED				SPDK_TPOINT_ID(TRACE_GROUP_NVMF_RDMA, 0x6)
#define TRACE_RDMA_REQUEST_STATE_TRANSFER_PENDING_CONTROLLER_TO_HOST	SPDK_TPOINT_ID(TRACE_GROUP_NVMF_RDMA, 0x7)
#define TRACE_RDMA_REQUEST_STATE_READY_TO_COMPLETE			SPDK_TPOINT_ID(TRACE_GROUP_NVMF_RDMA, 0x8)
#define TRACE_RDMA_REQUEST_STATE_READY_TO_COMPLETE			SPDK_TPOINT_ID(TRACE_GROUP_NVMF_RDMA, 0x7)
#define TRACE_RDMA_REQUEST_STATE_TRANSFERRING_CONTROLLER_TO_HOST	SPDK_TPOINT_ID(TRACE_GROUP_NVMF_RDMA, 0x8)
#define TRACE_RDMA_REQUEST_STATE_COMPLETING				SPDK_TPOINT_ID(TRACE_GROUP_NVMF_RDMA, 0x9)
#define TRACE_RDMA_REQUEST_STATE_COMPLETED				SPDK_TPOINT_ID(TRACE_GROUP_NVMF_RDMA, 0xA)

@@ -134,7 +137,7 @@ SPDK_TRACE_REGISTER_FN(nvmf_trace)
					TRACE_RDMA_REQUEST_STATE_NEED_BUFFER,
					OWNER_NONE, OBJECT_NVMF_RDMA_IO, 0, 0, 0, "");
	spdk_trace_register_description("RDMA_REQ_TX_PENDING_H_TO_C", "",
					TRACE_RDMA_REQUEST_STATE_TRANSFER_PENDING_HOST_TO_CONTROLLER,
					TRACE_RDMA_REQUEST_STATE_DATA_TRANSFER_PENDING,
					OWNER_NONE, OBJECT_NVMF_RDMA_IO, 0, 0, 0, "");
	spdk_trace_register_description("RDMA_REQ_TX_H_TO_C", "",
					TRACE_RDMA_REQUEST_STATE_TRANSFERRING_HOST_TO_CONTROLLER,
@@ -148,13 +151,13 @@ SPDK_TRACE_REGISTER_FN(nvmf_trace)
	spdk_trace_register_description("RDMA_REQ_EXECUTED", "",
					TRACE_RDMA_REQUEST_STATE_EXECUTED,
					OWNER_NONE, OBJECT_NVMF_RDMA_IO, 0, 0, 0, "");
	spdk_trace_register_description("RDMA_REQ_TX_PENDING_C_TO_H", "",
					TRACE_RDMA_REQUEST_STATE_TRANSFER_PENDING_CONTROLLER_TO_HOST,
					OWNER_NONE, OBJECT_NVMF_RDMA_IO, 0, 0, 0, "");
	spdk_trace_register_description("RDMA_REQ_RDY_TO_COMPLETE", "",
					TRACE_RDMA_REQUEST_STATE_READY_TO_COMPLETE,
					OWNER_NONE, OBJECT_NVMF_RDMA_IO, 0, 0, 0, "");
	spdk_trace_register_description("RDMA_REQ_COMPLETING", "",
	spdk_trace_register_description("RDMA_REQ_COMPLETING_CONTROLLER_TO_HOST", "",
					TRACE_RDMA_REQUEST_STATE_TRANSFERRING_CONTROLLER_TO_HOST,
					OWNER_NONE, OBJECT_NVMF_RDMA_IO, 0, 0, 0, "");
	spdk_trace_register_description("RDMA_REQ_COMPLETING_INCAPSULE", "",
					TRACE_RDMA_REQUEST_STATE_COMPLETING,
					OWNER_NONE, OBJECT_NVMF_RDMA_IO, 0, 0, 0, "");
	spdk_trace_register_description("RDMA_REQ_COMPLETED", "",
@@ -201,6 +204,7 @@ struct spdk_nvmf_rdma_request {
	} data;

	TAILQ_ENTRY(spdk_nvmf_rdma_request)	link;
	TAILQ_ENTRY(spdk_nvmf_rdma_request)	state_link;
};

struct spdk_nvmf_rdma_qpair {
@@ -217,23 +221,14 @@ struct spdk_nvmf_rdma_qpair {
	/* The maximum number of active RDMA READ and WRITE operations at one time */
	uint16_t				max_rw_depth;

	/* The current number of I/O outstanding on this connection. This number
	 * includes all I/O from the time the capsule is first received until it is
	 * completed.
	 */
	uint16_t				cur_queue_depth;

	/* The number of RDMA READ and WRITE requests that are outstanding */
	uint16_t				cur_rdma_rw_depth;

	/* Receives that are waiting for a request object */
	TAILQ_HEAD(, spdk_nvmf_rdma_recv)	incoming_queue;

	/* Requests that are not in use */
	TAILQ_HEAD(, spdk_nvmf_rdma_request)	free_queue;
	/* Queues to track the requests in all states */
	TAILQ_HEAD(, spdk_nvmf_rdma_request)	state_queue[RDMA_REQUEST_NUM_STATES];

	/* Requests that are waiting to perform an RDMA READ or WRITE */
	TAILQ_HEAD(, spdk_nvmf_rdma_request)	pending_rdma_rw_queue;
	/* Number of requests in each state */
	uint32_t				state_cntr[RDMA_REQUEST_NUM_STATES];

	/* Array of size "max_queue_depth" containing RDMA requests. */
	struct spdk_nvmf_rdma_request		*reqs;
@@ -260,7 +255,6 @@ struct spdk_nvmf_rdma_qpair {
	struct ibv_mr				*bufs_mr;

	TAILQ_ENTRY(spdk_nvmf_rdma_qpair)	link;
	TAILQ_ENTRY(spdk_nvmf_rdma_qpair)	pending_link;

	/* Mgmt channel */
	struct spdk_io_channel			*mgmt_channel;
@@ -330,6 +324,21 @@ struct spdk_nvmf_rdma_mgmt_channel {
	TAILQ_HEAD(, spdk_nvmf_rdma_request)	pending_data_buf_queue;
};

static void spdk_nvmf_rdma_request_set_state(struct spdk_nvmf_rdma_request *rdma_req,
		enum spdk_nvmf_rdma_request_state	state)
{
	struct spdk_nvmf_qpair		*qpair;
	struct spdk_nvmf_rdma_qpair	*rqpair;

	qpair = rdma_req->req.qpair;
	rqpair = SPDK_CONTAINEROF(qpair, struct spdk_nvmf_rdma_qpair, qpair);
	TAILQ_REMOVE(&rqpair->state_queue[rdma_req->state], rdma_req, state_link);
	rqpair->state_cntr[rdma_req->state]--;
	rdma_req->state = state;
	TAILQ_INSERT_TAIL(&rqpair->state_queue[rdma_req->state], rdma_req, state_link);
	rqpair->state_cntr[rdma_req->state]++;
}

static int
spdk_nvmf_rdma_mgmt_channel_create(void *io_device, void *ctx_buf)
{
@@ -467,6 +476,12 @@ spdk_nvmf_rdma_qpair_initialize(struct spdk_nvmf_qpair *qpair)
			      rqpair->bufs, rqpair->max_queue_depth * rtransport->in_capsule_data_size, rqpair->bufs_mr->lkey);
	}

	/* Initialise request state queues and counters of the queue pair */
	for (i = RDMA_REQUEST_STATE_FREE; i < RDMA_REQUEST_NUM_STATES; i++) {
		TAILQ_INIT(&rqpair->state_queue[i]);
		rqpair->state_cntr[i] = 0;
	}

	for (i = 0; i < rqpair->max_queue_depth; i++) {
		struct ibv_recv_wr *bad_wr = NULL;

@@ -528,7 +543,10 @@ spdk_nvmf_rdma_qpair_initialize(struct spdk_nvmf_qpair *qpair)
		rdma_req->data.wr.sg_list = rdma_req->data.sgl;
		rdma_req->data.wr.num_sge = SPDK_COUNTOF(rdma_req->data.sgl);

		TAILQ_INSERT_TAIL(&rqpair->free_queue, rdma_req, link);
		/* Initialize request state to FREE */
		rdma_req->state = RDMA_REQUEST_STATE_FREE;
		TAILQ_INSERT_TAIL(&rqpair->state_queue[rdma_req->state], rdma_req, state_link);
		rqpair->state_cntr[rdma_req->state]++;
	}

	return 0;
@@ -549,8 +567,6 @@ request_transfer_in(struct spdk_nvmf_request *req)

	assert(req->xfer == SPDK_NVME_DATA_HOST_TO_CONTROLLER);

	rqpair->cur_rdma_rw_depth++;

	SPDK_DEBUGLOG(SPDK_LOG_RDMA, "RDMA READ POSTED. Request: %p Connection: %p\n", req, qpair);

	rdma_req->data.wr.opcode = IBV_WR_RDMA_READ;
@@ -558,19 +574,13 @@ request_transfer_in(struct spdk_nvmf_request *req)
	rc = ibv_post_send(rqpair->cm_id->qp, &rdma_req->data.wr, &bad_wr);
	if (rc) {
		SPDK_ERRLOG("Unable to transfer data from host to target\n");

		/* Decrement r/w counter back since data transfer
		 * has not started.
		 */
		rqpair->cur_rdma_rw_depth--;
		return -1;
	}

	return 0;
}

static int
request_transfer_out(struct spdk_nvmf_request *req)
request_transfer_out(struct spdk_nvmf_request *req, int *data_posted)
{
	int				rc;
	struct spdk_nvmf_rdma_request	*rdma_req;
@@ -580,6 +590,7 @@ request_transfer_out(struct spdk_nvmf_request *req)
	struct ibv_recv_wr		*bad_recv_wr = NULL;
	struct ibv_send_wr		*send_wr, *bad_send_wr = NULL;

	*data_posted = 0;
	qpair = req->qpair;
	rsp = &req->rsp->nvme_cpl;
	rdma_req = SPDK_CONTAINEROF(req, struct spdk_nvmf_rdma_request, req);
@@ -614,10 +625,10 @@ request_transfer_out(struct spdk_nvmf_request *req)
	    req->xfer == SPDK_NVME_DATA_CONTROLLER_TO_HOST) {
		SPDK_DEBUGLOG(SPDK_LOG_RDMA, "RDMA WRITE POSTED. Request: %p Connection: %p\n", req, qpair);

		rqpair->cur_rdma_rw_depth++;
		rdma_req->data.wr.opcode = IBV_WR_RDMA_WRITE;

		rdma_req->data.wr.next = send_wr;
		*data_posted = 1;
		send_wr = &rdma_req->data.wr;
	}

@@ -627,13 +638,6 @@ request_transfer_out(struct spdk_nvmf_request *req)
	rc = ibv_post_send(rqpair->cm_id->qp, send_wr, &bad_send_wr);
	if (rc) {
		SPDK_ERRLOG("Unable to send response capsule\n");

		if (rdma_req->data.wr.opcode == IBV_WR_RDMA_WRITE) {
			/* Decrement r/w counter back since data transfer
			 * has not started.
			 */
			rqpair->cur_rdma_rw_depth--;
		}
	}

	return rc;
@@ -768,9 +772,6 @@ nvmf_rdma_connect(struct spdk_nvmf_transport *transport, struct rdma_cm_event *e
	rqpair->cm_id = event->id;
	rqpair->qpair.transport = transport;
	TAILQ_INIT(&rqpair->incoming_queue);
	TAILQ_INIT(&rqpair->free_queue);
	TAILQ_INIT(&rqpair->pending_rdma_rw_queue);

	event->id->context = &rqpair->qpair;

	cb_fn(&rqpair->qpair);
@@ -1046,6 +1047,20 @@ spdk_nvmf_rdma_request_parse_sgl(struct spdk_nvmf_rdma_transport *rtransport,
	return -1;
}

static int
spdk_nvmf_rdma_cur_rw_depth(struct spdk_nvmf_rdma_qpair *rqpair)
{
	return rqpair->state_cntr[RDMA_REQUEST_STATE_TRANSFERRING_HOST_TO_CONTROLLER] +
	       rqpair->state_cntr[RDMA_REQUEST_STATE_TRANSFERRING_CONTROLLER_TO_HOST];
}

static int
spdk_nvmf_rdma_cur_queue_depth(struct spdk_nvmf_rdma_qpair *rqpair)
{
	return rqpair->max_queue_depth -
	       rqpair->state_cntr[RDMA_REQUEST_STATE_FREE];
}

static bool
spdk_nvmf_rdma_request_process(struct spdk_nvmf_rdma_transport *rtransport,
			       struct spdk_nvmf_rdma_request *rdma_req)
@@ -1057,6 +1072,8 @@ spdk_nvmf_rdma_request_process(struct spdk_nvmf_rdma_transport *rtransport,
	struct spdk_nvmf_rdma_recv	*rdma_recv;
	enum spdk_nvmf_rdma_request_state prev_state;
	bool				progress = false;
	int				data_posted;
	int				cur_rdma_rw_depth;

	rqpair = SPDK_CONTAINEROF(rdma_req->req.qpair, struct spdk_nvmf_rdma_qpair, qpair);
	device = rqpair->port->device;
@@ -1076,8 +1093,6 @@ spdk_nvmf_rdma_request_process(struct spdk_nvmf_rdma_transport *rtransport,
			break;
		case RDMA_REQUEST_STATE_NEW:
			spdk_trace_record(TRACE_RDMA_REQUEST_STATE_NEW, 0, 0, (uintptr_t)rdma_req, 0);

			rqpair->cur_queue_depth++;
			rdma_recv = rdma_req->recv;

			/* The first element of the SGL is the NVMe command */
@@ -1085,18 +1100,17 @@ spdk_nvmf_rdma_request_process(struct spdk_nvmf_rdma_transport *rtransport,
			memset(rdma_req->req.rsp, 0, sizeof(*rdma_req->req.rsp));

			TAILQ_REMOVE(&rqpair->incoming_queue, rdma_recv, link);
			TAILQ_REMOVE(&rqpair->free_queue, rdma_req, link);

			/* The next state transition depends on the data transfer needs of this request. */
			rdma_req->req.xfer = spdk_nvmf_rdma_request_get_xfer(rdma_req);

			/* If no data to transfer, ready to execute. */
			if (rdma_req->req.xfer == SPDK_NVME_DATA_NONE) {
				rdma_req->state = RDMA_REQUEST_STATE_READY_TO_EXECUTE;
				spdk_nvmf_rdma_request_set_state(rdma_req, RDMA_REQUEST_STATE_READY_TO_EXECUTE);
				break;
			}

			rdma_req->state = RDMA_REQUEST_STATE_NEED_BUFFER;
			spdk_nvmf_rdma_request_set_state(rdma_req, RDMA_REQUEST_STATE_NEED_BUFFER);
			TAILQ_INSERT_TAIL(&rqpair->ch->pending_data_buf_queue, rdma_req, link);
			break;
		case RDMA_REQUEST_STATE_NEED_BUFFER:
@@ -1114,7 +1128,7 @@ spdk_nvmf_rdma_request_process(struct spdk_nvmf_rdma_transport *rtransport,
			if (rc < 0) {
				TAILQ_REMOVE(&rqpair->ch->pending_data_buf_queue, rdma_req, link);
				rsp->status.sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR;
				rdma_req->state = RDMA_REQUEST_STATE_READY_TO_COMPLETE;
				spdk_nvmf_rdma_request_set_state(rdma_req, RDMA_REQUEST_STATE_READY_TO_COMPLETE);
				break;
			}

@@ -1129,30 +1143,47 @@ spdk_nvmf_rdma_request_process(struct spdk_nvmf_rdma_transport *rtransport,
			 * arrive using in capsule data, we need to do a transfer from the host.
			 */
			if (rdma_req->req.xfer == SPDK_NVME_DATA_HOST_TO_CONTROLLER && rdma_req->data_from_pool) {
				rdma_req->state = RDMA_REQUEST_STATE_TRANSFER_PENDING_HOST_TO_CONTROLLER;
				TAILQ_INSERT_TAIL(&rqpair->pending_rdma_rw_queue, rdma_req, link);
				spdk_nvmf_rdma_request_set_state(rdma_req, RDMA_REQUEST_STATE_DATA_TRANSFER_PENDING);
				break;
			}

			rdma_req->state = RDMA_REQUEST_STATE_READY_TO_EXECUTE;
			spdk_nvmf_rdma_request_set_state(rdma_req, RDMA_REQUEST_STATE_READY_TO_EXECUTE);
			break;
		case RDMA_REQUEST_STATE_TRANSFER_PENDING_HOST_TO_CONTROLLER:
			spdk_trace_record(TRACE_RDMA_REQUEST_STATE_TRANSFER_PENDING_HOST_TO_CONTROLLER, 0, 0,
		case RDMA_REQUEST_STATE_DATA_TRANSFER_PENDING:
			spdk_trace_record(TRACE_RDMA_REQUEST_STATE_DATA_TRANSFER_PENDING, 0, 0,
					  (uintptr_t)rdma_req, 0);

			if (rdma_req != TAILQ_FIRST(&rqpair->pending_rdma_rw_queue)) {
			if (rdma_req != TAILQ_FIRST(&rqpair->state_queue[RDMA_REQUEST_STATE_DATA_TRANSFER_PENDING])) {
				/* This request needs to wait in line to perform RDMA */
				break;
			}
			cur_rdma_rw_depth = spdk_nvmf_rdma_cur_rw_depth(rqpair);

			if (cur_rdma_rw_depth >= rqpair->max_rw_depth) {
				/* R/W queue is full, need to wait */
				break;
			}

			if (rqpair->cur_rdma_rw_depth < rqpair->max_rw_depth) {
				TAILQ_REMOVE(&rqpair->pending_rdma_rw_queue, rdma_req, link);
				rdma_req->state = RDMA_REQUEST_STATE_TRANSFERRING_HOST_TO_CONTROLLER;
			if (rdma_req->req.xfer == SPDK_NVME_DATA_HOST_TO_CONTROLLER) {
				rc = request_transfer_in(&rdma_req->req);
				if (rc) {
				if (!rc) {
					spdk_nvmf_rdma_request_set_state(rdma_req,
									 RDMA_REQUEST_STATE_TRANSFERRING_HOST_TO_CONTROLLER);
				} else {
					rsp->status.sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR;
					rdma_req->state = RDMA_REQUEST_STATE_READY_TO_COMPLETE;
					spdk_nvmf_rdma_request_set_state(rdma_req,
									 RDMA_REQUEST_STATE_READY_TO_COMPLETE);
				}
			} else if (rdma_req->req.xfer == SPDK_NVME_DATA_CONTROLLER_TO_HOST) {
				/* The data transfer will be kicked off from
				 * RDMA_REQUEST_STATE_READY_TO_COMPLETE state.
				 */
				spdk_nvmf_rdma_request_set_state(rdma_req,
								 RDMA_REQUEST_STATE_READY_TO_COMPLETE);
			} else {
				SPDK_ERRLOG("Cannot perform data transfer, unknown state: %u\n",
					    rdma_req->req.xfer);
				assert(0);
			}
			break;
		case RDMA_REQUEST_STATE_TRANSFERRING_HOST_TO_CONTROLLER:
@@ -1163,7 +1194,7 @@ spdk_nvmf_rdma_request_process(struct spdk_nvmf_rdma_transport *rtransport,
			break;
		case RDMA_REQUEST_STATE_READY_TO_EXECUTE:
			spdk_trace_record(TRACE_RDMA_REQUEST_STATE_READY_TO_EXECUTE, 0, 0, (uintptr_t)rdma_req, 0);
			rdma_req->state = RDMA_REQUEST_STATE_EXECUTING;
			spdk_nvmf_rdma_request_set_state(rdma_req, RDMA_REQUEST_STATE_EXECUTING);
			spdk_nvmf_request_exec(&rdma_req->req);
			break;
		case RDMA_REQUEST_STATE_EXECUTING:
@@ -1174,31 +1205,26 @@ spdk_nvmf_rdma_request_process(struct spdk_nvmf_rdma_transport *rtransport,
		case RDMA_REQUEST_STATE_EXECUTED:
			spdk_trace_record(TRACE_RDMA_REQUEST_STATE_EXECUTED, 0, 0, (uintptr_t)rdma_req, 0);
			if (rdma_req->req.xfer == SPDK_NVME_DATA_CONTROLLER_TO_HOST) {
				rdma_req->state = RDMA_REQUEST_STATE_TRANSFER_PENDING_CONTROLLER_TO_HOST;
				TAILQ_INSERT_TAIL(&rqpair->pending_rdma_rw_queue, rdma_req, link);
				spdk_nvmf_rdma_request_set_state(rdma_req, RDMA_REQUEST_STATE_DATA_TRANSFER_PENDING);
			} else {
				rdma_req->state = RDMA_REQUEST_STATE_READY_TO_COMPLETE;
			}
			break;
		case RDMA_REQUEST_STATE_TRANSFER_PENDING_CONTROLLER_TO_HOST:
			spdk_trace_record(TRACE_RDMA_REQUEST_STATE_TRANSFER_PENDING_CONTROLLER_TO_HOST, 0, 0,
					  (uintptr_t)rdma_req, 0);
			if (rdma_req != TAILQ_FIRST(&rqpair->pending_rdma_rw_queue)) {
				/* This request needs to wait in line to perform RDMA */
				break;
			}

			if (rqpair->cur_rdma_rw_depth < rqpair->max_rw_depth) {
				rdma_req->state = RDMA_REQUEST_STATE_READY_TO_COMPLETE;
				TAILQ_REMOVE(&rqpair->pending_rdma_rw_queue, rdma_req, link);
				spdk_nvmf_rdma_request_set_state(rdma_req, RDMA_REQUEST_STATE_READY_TO_COMPLETE);
			}
			break;
		case RDMA_REQUEST_STATE_READY_TO_COMPLETE:
			spdk_trace_record(TRACE_RDMA_REQUEST_STATE_READY_TO_COMPLETE, 0, 0, (uintptr_t)rdma_req, 0);
			rdma_req->state = RDMA_REQUEST_STATE_COMPLETING;

			rc = request_transfer_out(&rdma_req->req);
			rc = request_transfer_out(&rdma_req->req, &data_posted);
			assert(rc == 0); /* No good way to handle this currently */
			spdk_nvmf_rdma_request_set_state(rdma_req,
							 data_posted ?
							 RDMA_REQUEST_STATE_TRANSFERRING_CONTROLLER_TO_HOST :
							 RDMA_REQUEST_STATE_COMPLETING);
			break;
		case RDMA_REQUEST_STATE_TRANSFERRING_CONTROLLER_TO_HOST:
			spdk_trace_record(TRACE_RDMA_REQUEST_STATE_TRANSFERRING_CONTROLLER_TO_HOST, 0, 0,
					  (uintptr_t)rdma_req,
					  0);
			/* Some external code must kick a request into RDMA_REQUEST_STATE_COMPLETED
			 * to escape this state. */
			break;
		case RDMA_REQUEST_STATE_COMPLETING:
			spdk_trace_record(TRACE_RDMA_REQUEST_STATE_COMPLETING, 0, 0, (uintptr_t)rdma_req, 0);
@@ -1207,8 +1233,6 @@ spdk_nvmf_rdma_request_process(struct spdk_nvmf_rdma_transport *rtransport,
			break;
		case RDMA_REQUEST_STATE_COMPLETED:
			spdk_trace_record(TRACE_RDMA_REQUEST_STATE_COMPLETED, 0, 0, (uintptr_t)rdma_req, 0);
			assert(rqpair->cur_queue_depth > 0);
			rqpair->cur_queue_depth--;

			if (rdma_req->data_from_pool) {
				/* Put the buffer/s back in the pool */
@@ -1222,8 +1246,11 @@ spdk_nvmf_rdma_request_process(struct spdk_nvmf_rdma_transport *rtransport,
			rdma_req->req.length = 0;
			rdma_req->req.iovcnt = 0;
			rdma_req->req.data = NULL;
			rdma_req->state = RDMA_REQUEST_STATE_FREE;
			TAILQ_INSERT_TAIL(&rqpair->free_queue, rdma_req, link);
			spdk_nvmf_rdma_request_set_state(rdma_req, RDMA_REQUEST_STATE_FREE);
			break;
		case RDMA_REQUEST_NUM_STATES:
		default:
			assert(0);
			break;
		}

@@ -1683,6 +1710,22 @@ spdk_nvmf_process_cm_event(struct spdk_nvmf_transport *transport, new_qpair_fn c
	}
}

static bool
spdk_nvmf_rdma_qpair_is_idle(struct spdk_nvmf_qpair *qpair)
{
	int cur_queue_depth, cur_rdma_rw_depth;
	struct spdk_nvmf_rdma_qpair *rqpair;

	rqpair = SPDK_CONTAINEROF(qpair, struct spdk_nvmf_rdma_qpair, qpair);
	cur_queue_depth = spdk_nvmf_rdma_cur_queue_depth(rqpair);
	cur_rdma_rw_depth = spdk_nvmf_rdma_cur_rw_depth(rqpair);

	if (cur_queue_depth == 0 && cur_rdma_rw_depth == 0) {
		return true;
	}
	return false;
}

static void
spdk_nvmf_process_ib_event(struct spdk_nvmf_rdma_device *device)
{
@@ -1949,7 +1992,7 @@ spdk_nvmf_rdma_request_complete(struct spdk_nvmf_request *req)
			struct spdk_nvmf_rdma_transport, transport);
	struct spdk_nvmf_rdma_request	*rdma_req = SPDK_CONTAINEROF(req, struct spdk_nvmf_rdma_request, req);

	rdma_req->state = RDMA_REQUEST_STATE_EXECUTED;
	spdk_nvmf_rdma_request_set_state(rdma_req, RDMA_REQUEST_STATE_EXECUTED);
	spdk_nvmf_rdma_request_process(rtransport, rdma_req);

	return 0;
@@ -1968,15 +2011,17 @@ spdk_nvmf_rdma_qpair_process_pending(struct spdk_nvmf_rdma_transport *rtransport
	struct spdk_nvmf_rdma_recv	*rdma_recv, *recv_tmp;
	struct spdk_nvmf_rdma_request	*rdma_req, *req_tmp;

	/* We process I/O in the pending_rdma_rw queue at the highest priority. */
	TAILQ_FOREACH_SAFE(rdma_req, &rqpair->pending_rdma_rw_queue, link, req_tmp) {
	/* We process I/O in the data transfer pending queue at the highest priority. */
	TAILQ_FOREACH_SAFE(rdma_req, &rqpair->state_queue[RDMA_REQUEST_STATE_DATA_TRANSFER_PENDING],
			   state_link, req_tmp) {
		if (spdk_nvmf_rdma_request_process(rtransport, rdma_req) == false) {
			break;
		}
	}

	/* The second highest priority is I/O waiting on memory buffers. */
	TAILQ_FOREACH_SAFE(rdma_req, &rqpair->ch->pending_data_buf_queue, link, req_tmp) {
	TAILQ_FOREACH_SAFE(rdma_req, &rqpair->ch->pending_data_buf_queue, link,
			   req_tmp) {
		if (spdk_nvmf_rdma_request_process(rtransport, rdma_req) == false) {
			break;
		}
@@ -1984,14 +2029,13 @@ spdk_nvmf_rdma_qpair_process_pending(struct spdk_nvmf_rdma_transport *rtransport

	/* The lowest priority is processing newly received commands */
	TAILQ_FOREACH_SAFE(rdma_recv, &rqpair->incoming_queue, link, recv_tmp) {
		rdma_req = TAILQ_FIRST(&rqpair->free_queue);
		if (rdma_req == NULL) {
			/* Need to wait for more SEND completions */
		if (TAILQ_EMPTY(&rqpair->state_queue[RDMA_REQUEST_STATE_FREE])) {
			break;
		}

		rdma_req = TAILQ_FIRST(&rqpair->state_queue[RDMA_REQUEST_STATE_FREE]);
		rdma_req->recv = rdma_recv;
		rdma_req->state = RDMA_REQUEST_STATE_NEW;
		spdk_nvmf_rdma_request_set_state(rdma_req, RDMA_REQUEST_STATE_NEW);
		if (spdk_nvmf_rdma_request_process(rtransport, rdma_req) == false) {
			break;
		}
@@ -2037,6 +2081,15 @@ get_rdma_recv_from_wc(struct ibv_wc *wc)
	return rdma_recv;
}

#ifdef DEBUG
static int
spdk_nvmf_rdma_req_is_completing(struct spdk_nvmf_rdma_request *rdma_req)
{
	return rdma_req->state == RDMA_REQUEST_STATE_TRANSFERRING_CONTROLLER_TO_HOST ||
	       rdma_req->state == RDMA_REQUEST_STATE_COMPLETING;
}
#endif

static int
spdk_nvmf_rdma_poller_poll(struct spdk_nvmf_rdma_transport *rtransport,
			   struct spdk_nvmf_rdma_poller *rpoller)
@@ -2070,9 +2123,8 @@ spdk_nvmf_rdma_poller_poll(struct spdk_nvmf_rdma_transport *rtransport,
			rdma_req = get_rdma_req_from_wc(&wc[i]);
			rqpair = SPDK_CONTAINEROF(rdma_req->req.qpair, struct spdk_nvmf_rdma_qpair, qpair);

			assert(rdma_req->state == RDMA_REQUEST_STATE_COMPLETING);
			rdma_req->state = RDMA_REQUEST_STATE_COMPLETED;

			assert(spdk_nvmf_rdma_req_is_completing(rdma_req));
			spdk_nvmf_rdma_request_set_state(rdma_req, RDMA_REQUEST_STATE_COMPLETED);
			spdk_nvmf_rdma_request_process(rtransport, rdma_req);

			count++;
@@ -2085,8 +2137,6 @@ spdk_nvmf_rdma_poller_poll(struct spdk_nvmf_rdma_transport *rtransport,
			rdma_req = get_rdma_req_from_wc(&wc[i]);
			rqpair = SPDK_CONTAINEROF(rdma_req->req.qpair, struct spdk_nvmf_rdma_qpair, qpair);

			rqpair->cur_rdma_rw_depth--;

			/* Try to process other queued requests */
			spdk_nvmf_rdma_qpair_process_pending(rtransport, rqpair);
			break;
@@ -2096,9 +2146,7 @@ spdk_nvmf_rdma_poller_poll(struct spdk_nvmf_rdma_transport *rtransport,
			rqpair = SPDK_CONTAINEROF(rdma_req->req.qpair, struct spdk_nvmf_rdma_qpair, qpair);

			assert(rdma_req->state == RDMA_REQUEST_STATE_TRANSFERRING_HOST_TO_CONTROLLER);
			rqpair->cur_rdma_rw_depth--;
			rdma_req->state = RDMA_REQUEST_STATE_READY_TO_EXECUTE;

			spdk_nvmf_rdma_request_set_state(rdma_req, RDMA_REQUEST_STATE_READY_TO_EXECUTE);
			spdk_nvmf_rdma_request_process(rtransport, rdma_req);

			/* Try to process other queued requests */
@@ -2151,19 +2199,6 @@ spdk_nvmf_rdma_poll_group_poll(struct spdk_nvmf_transport_poll_group *group)
	return count;
}

static bool
spdk_nvmf_rdma_qpair_is_idle(struct spdk_nvmf_qpair *qpair)
{
	struct spdk_nvmf_rdma_qpair *rqpair;

	rqpair = SPDK_CONTAINEROF(qpair, struct spdk_nvmf_rdma_qpair, qpair);

	if (rqpair->cur_queue_depth == 0 && rqpair->cur_rdma_rw_depth == 0) {
		return true;
	}
	return false;
}

const struct spdk_nvmf_transport_ops spdk_nvmf_transport_rdma = {
	.type = SPDK_NVME_TRANSPORT_RDMA,
	.create = spdk_nvmf_rdma_create,