Commit 158dc947 authored by Seth Howell's avatar Seth Howell Committed by Jim Harris
Browse files

rdma: Make sure we don't submit too many WRs



Before, the number of WRs and the number of RDMA requests were linked by
a constant multiple. This is no longer the case so we need to make sure
that we don't overshoot the limit of WRs for the qpair.

Change-Id: I0eac75e96c25d78d0656e4b22747f15902acdab7
Signed-off-by: default avatarSeth Howell <seth.howell@intel.com>
Reviewed-on: https://review.gerrithub.io/c/439573


Tested-by: default avatarSPDK CI Jenkins <sys_sgci@intel.com>
Reviewed-by: default avatarBen Walker <benjamin.walker@intel.com>
Reviewed-by: default avatarJim Harris <james.r.harris@intel.com>
parent 6ee1f99b
Loading
Loading
Loading
Loading
+51 −4
Original line number Diff line number Diff line
@@ -264,6 +264,19 @@ struct spdk_nvmf_rdma_qpair {
	/* The maximum number of active RDMA READ and ATOMIC operations at one time */
	uint16_t				max_read_depth;

	/* The maximum number of RDMA send operations at one time */
	uint32_t				max_send_depth;

	/* The current number of outstanding WRs from this qpair's
	 * recv queue. Should not exceed device->attr.max_queue_depth.
	 */
	uint16_t				current_recv_depth;

	/* The current number of posted WRs from this qpair's
	 * send queue. Should not exceed max_send_depth.
	 */
	uint32_t				current_send_depth;

	/* The current number of active RDMA READ operations */
	uint16_t				current_read_depth;

@@ -752,6 +765,8 @@ spdk_nvmf_rdma_qpair_initialize(struct spdk_nvmf_qpair *qpair)

	rpoller->required_num_wr = required_num_wr;

	rqpair->max_send_depth = spdk_min((uint32_t)(rqpair->max_queue_depth * 2 + 1),
					  ibv_init_attr.cap.max_send_wr);
	rqpair->max_send_sge = spdk_min(NVMF_DEFAULT_TX_SGE, ibv_init_attr.cap.max_send_sge);
	rqpair->max_recv_sge = spdk_min(NVMF_DEFAULT_RX_SGE, ibv_init_attr.cap.max_recv_sge);
	spdk_trace_record(TRACE_RDMA_QP_CREATE, 0, 0, (uintptr_t)rqpair->cm_id, 0);
@@ -814,6 +829,7 @@ spdk_nvmf_rdma_qpair_initialize(struct spdk_nvmf_qpair *qpair)
		rqpair->state_cntr[i] = 0;
	}

	rqpair->current_recv_depth = rqpair->max_queue_depth;
	for (i = 0; i < rqpair->max_queue_depth; i++) {
		struct ibv_recv_wr *bad_wr = NULL;

@@ -844,6 +860,7 @@ spdk_nvmf_rdma_qpair_initialize(struct spdk_nvmf_qpair *qpair)
		rdma_recv->wr.sg_list = rdma_recv->sgl;

		rc = ibv_post_recv(rqpair->cm_id->qp, &rdma_recv->wr, &bad_wr);
		rqpair->current_recv_depth--;
		if (rc) {
			SPDK_ERRLOG("Unable to post capsule for RDMA RECV\n");
			spdk_nvmf_rdma_qpair_destroy(rqpair);
@@ -903,6 +920,7 @@ request_transfer_in(struct spdk_nvmf_request *req)
	rqpair = SPDK_CONTAINEROF(qpair, struct spdk_nvmf_rdma_qpair, qpair);

	assert(req->xfer == SPDK_NVME_DATA_HOST_TO_CONTROLLER);
	assert(rdma_req != NULL);

	SPDK_DEBUGLOG(SPDK_LOG_RDMA, "RDMA READ POSTED. Request: %p Connection: %p\n", req, qpair);

@@ -912,6 +930,7 @@ request_transfer_in(struct spdk_nvmf_request *req)
		return -1;
	}
	rqpair->current_read_depth += rdma_req->num_outstanding_data_wr;
	rqpair->current_send_depth += rdma_req->num_outstanding_data_wr;
	return 0;
}

@@ -950,6 +969,7 @@ request_transfer_out(struct spdk_nvmf_request *req, int *data_posted)
		return rc;
	}
	rdma_req->recv = NULL;
	rqpair->current_recv_depth--;

	/* Build the response which consists of an optional
	 * RDMA WRITE to transfer data, plus an RDMA SEND
@@ -972,6 +992,8 @@ request_transfer_out(struct spdk_nvmf_request *req, int *data_posted)
		SPDK_ERRLOG("Unable to send response capsule\n");
		return rc;
	}
	/* +1 for the rsp wr */
	rqpair->current_send_depth += rdma_req->num_outstanding_data_wr + 1;

	return 0;
}
@@ -1538,9 +1560,9 @@ spdk_nvmf_rdma_request_process(struct spdk_nvmf_rdma_transport *rtransport,
			}

			if (rdma_req->req.xfer == SPDK_NVME_DATA_HOST_TO_CONTROLLER) {

				if (rqpair->current_read_depth >= rqpair->max_read_depth) {
					/* Read operation queue is full, need to wait */
				if (rqpair->current_send_depth + rdma_req->num_outstanding_data_wr > rqpair->max_send_depth
				    || rqpair->current_read_depth + rdma_req->num_outstanding_data_wr > rqpair->max_read_depth) {
					/* We can only have so many WRs outstanding. we have to wait until some finish. */
					break;
				}
				rc = request_transfer_in(&rdma_req->req);
@@ -1556,6 +1578,13 @@ spdk_nvmf_rdma_request_process(struct spdk_nvmf_rdma_transport *rtransport,
				/* The data transfer will be kicked off from
				 * RDMA_REQUEST_STATE_READY_TO_COMPLETE state.
				 */
				if ((rqpair->current_send_depth + rdma_req->num_outstanding_data_wr + 1) >
				    rqpair->max_send_depth) {
					/* We can only have so many WRs outstanding. we have to wait until some finish.
					* +1 since each request has an additional wr in the resp.
					* Check the recv queue since we have one in the recv as well */
					break;
				}
				spdk_nvmf_rdma_request_set_state(rdma_req,
								 RDMA_REQUEST_STATE_READY_TO_COMPLETE);
			} else {
@@ -2644,6 +2673,7 @@ spdk_nvmf_rdma_close_qpair(struct spdk_nvmf_qpair *qpair)
		assert(false);
		return;
	}
	rqpair->current_recv_depth--;

	rqpair->drain_send_wr.type = RDMA_WR_TYPE_DRAIN_SEND;
	send_wr.wr_id = (uintptr_t)&rqpair->drain_send_wr;
@@ -2654,6 +2684,7 @@ spdk_nvmf_rdma_close_qpair(struct spdk_nvmf_qpair *qpair)
		assert(false);
		return;
	}
	rqpair->current_send_depth++;
}

#ifdef DEBUG
@@ -2706,6 +2737,7 @@ spdk_nvmf_rdma_poller_poll(struct spdk_nvmf_rdma_transport *rtransport,
				/* We're going to attempt an error recovery, so force the request into
				 * the completed state. */
				spdk_nvmf_rdma_request_set_state(rdma_req, RDMA_REQUEST_STATE_COMPLETED);
				rqpair->current_send_depth--;
				spdk_nvmf_rdma_request_process(rtransport, rdma_req);
				break;
			case RDMA_WR_TYPE_RECV:
@@ -2715,6 +2747,9 @@ spdk_nvmf_rdma_poller_poll(struct spdk_nvmf_rdma_transport *rtransport,
				/* Dump this into the incoming queue. This gets cleaned up when
				 * the queue pair disconnects or recovers. */
				TAILQ_INSERT_TAIL(&rqpair->incoming_queue, rdma_recv, link);
				rqpair->current_recv_depth++;

				/* Don't worry about responding to recv overflow, we are disconnecting anyways */
				break;
			case RDMA_WR_TYPE_DATA:
				/* If the data transfer fails still force the queue into the error state,
@@ -2733,12 +2768,15 @@ spdk_nvmf_rdma_poller_poll(struct spdk_nvmf_rdma_transport *rtransport,
						spdk_nvmf_rdma_request_set_state(rdma_req, RDMA_REQUEST_STATE_COMPLETED);
					}
				}
				rqpair->current_send_depth--;
				break;
			case RDMA_WR_TYPE_DRAIN_RECV:
				rqpair = SPDK_CONTAINEROF(rdma_wr, struct spdk_nvmf_rdma_qpair, drain_recv_wr);
				assert(rqpair->disconnect_flags & RDMA_QP_DISCONNECTING);
				SPDK_DEBUGLOG(SPDK_LOG_RDMA, "Drained QP RECV %u (%p)\n", rqpair->qpair.qid, rqpair);
				rqpair->disconnect_flags |= RDMA_QP_RECV_DRAINED;
				rqpair->current_recv_depth++;
				/* Don't worry about responding to recv overflow, we are disconnecting anyways */
				if (rqpair->disconnect_flags & RDMA_QP_SEND_DRAINED) {
					spdk_nvmf_rdma_qpair_process_pending(rtransport, rqpair, true);
					spdk_nvmf_rdma_qpair_destroy(rqpair);
@@ -2750,6 +2788,7 @@ spdk_nvmf_rdma_poller_poll(struct spdk_nvmf_rdma_transport *rtransport,
				assert(rqpair->disconnect_flags & RDMA_QP_DISCONNECTING);
				SPDK_DEBUGLOG(SPDK_LOG_RDMA, "Drained QP SEND %u (%p)\n", rqpair->qpair.qid, rqpair);
				rqpair->disconnect_flags |= RDMA_QP_SEND_DRAINED;
				rqpair->current_send_depth--;
				if (rqpair->disconnect_flags & RDMA_QP_RECV_DRAINED) {
					spdk_nvmf_rdma_qpair_process_pending(rtransport, rqpair, true);
					spdk_nvmf_rdma_qpair_destroy(rqpair);
@@ -2777,6 +2816,7 @@ spdk_nvmf_rdma_poller_poll(struct spdk_nvmf_rdma_transport *rtransport,
			assert(spdk_nvmf_rdma_req_is_completing(rdma_req));

			spdk_nvmf_rdma_request_set_state(rdma_req, RDMA_REQUEST_STATE_COMPLETED);
			rqpair->current_send_depth--;
			spdk_nvmf_rdma_request_process(rtransport, rdma_req);

			count++;
@@ -2789,6 +2829,7 @@ spdk_nvmf_rdma_poller_poll(struct spdk_nvmf_rdma_transport *rtransport,
			assert(rdma_wr->type == RDMA_WR_TYPE_DATA);
			rdma_req = SPDK_CONTAINEROF(rdma_wr, struct spdk_nvmf_rdma_request, data.rdma_wr);
			rqpair = SPDK_CONTAINEROF(rdma_req->req.qpair, struct spdk_nvmf_rdma_qpair, qpair);
			rqpair->current_send_depth--;

			/* Try to process other queued requests */
			spdk_nvmf_rdma_qpair_process_pending(rtransport, rqpair, false);
@@ -2798,6 +2839,7 @@ spdk_nvmf_rdma_poller_poll(struct spdk_nvmf_rdma_transport *rtransport,
			assert(rdma_wr->type == RDMA_WR_TYPE_DATA);
			rdma_req = SPDK_CONTAINEROF(rdma_wr, struct spdk_nvmf_rdma_request, data.rdma_wr);
			rqpair = SPDK_CONTAINEROF(rdma_req->req.qpair, struct spdk_nvmf_rdma_qpair, qpair);
			rqpair->current_send_depth--;

			assert(rdma_req->state == RDMA_REQUEST_STATE_TRANSFERRING_HOST_TO_CONTROLLER);
			/* wait for all outstanding reads associated with the same rdma_req to complete before proceeding. */
@@ -2817,7 +2859,12 @@ spdk_nvmf_rdma_poller_poll(struct spdk_nvmf_rdma_transport *rtransport,
			assert(rdma_wr->type == RDMA_WR_TYPE_RECV);
			rdma_recv = SPDK_CONTAINEROF(rdma_wr, struct spdk_nvmf_rdma_recv, rdma_wr);
			rqpair = rdma_recv->qpair;

			/* The qpair should not send more requests than are allowed per qpair. */
			if (rqpair->current_recv_depth >= rqpair->max_queue_depth) {
				spdk_nvmf_qpair_disconnect(&rqpair->qpair, NULL, NULL);
			} else {
				rqpair->current_recv_depth++;
			}
			TAILQ_INSERT_TAIL(&rqpair->incoming_queue, rdma_recv, link);
			/* Try to process other queued requests */
			spdk_nvmf_rdma_qpair_process_pending(rtransport, rqpair, false);