Commit 63732d88 authored by Seth Howell, committed by Tomasz Zawadzki

lib/nvme: split cq completion processing to its own function.



This creates a separation between processing a qpair and processing a
completion queue, since a completion queue can be shared across multiple qpairs.
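
The shape of that split, as a minimal standalone sketch (hypothetical helper names, not the actual SPDK functions; only ibv_poll_cq and the ibv_wc fields are the real verbs API, and the SPDK-specific request dispatch and bookkeeping are elided):

#include <errno.h>
#include <infiniband/verbs.h>

#define BATCH 32

/* CQ-level step, shaped like the new nvme_rdma_cq_process_completions():
 * drain up to 'budget' work completions from a CQ that several qpairs may
 * share.  Returns <0 if the poll itself fails, else the number reaped. */
static int
poll_cq_once(struct ibv_cq *cq, int budget)
{
	struct ibv_wc wc[BATCH];
	int rc, i, reaped = 0;

	rc = ibv_poll_cq(cq, budget < BATCH ? budget : BATCH, wc);
	if (rc < 0) {
		return -ECANCELED;	/* provider-level poll failure */
	}

	for (i = 0; i < rc; i++) {
		/* wc[i].wr_id points back at the owning qpair's request, so a
		 * shared CQ can dispatch each completion to the right qpair. */
		if (wc[i].status != IBV_WC_SUCCESS) {
			continue;	/* fail only the owning qpair, keep draining */
		}
		reaped++;
	}

	return reaped;
}

/* Qpair-level wrapper, shaped like nvme_rdma_qpair_process_completions():
 * loop the CQ-level step until the budget is met or the CQ runs dry. */
static int
process_completions(struct ibv_cq *cq, int max_completions)
{
	int done = 0, rc;

	do {
		rc = poll_cq_once(cq, max_completions - done);
		if (rc < 0) {
			return rc;	/* hard failure while polling */
		} else if (rc == 0) {
			break;		/* CQ ran out of completions */
		}
		done += rc;
	} while (done < max_completions);

	return done;
}

Because the CQ-level step recovers the owning qpair from wc.wr_id rather than taking a qpair argument, the same drain loop can later serve a completion queue shared by many qpairs.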

Signed-off-by: Seth Howell <seth.howell@intel.com>
Change-Id: I111dd16ec4327854f232988a96891a65813f00e1
Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/1166


Community-CI: Mellanox Build Bot
Tested-by: SPDK CI Jenkins <sys_sgci@intel.com>
Reviewed-by: Aleksey Marchuk <alexeymar@mellanox.com>
Reviewed-by: Shuhei Matsumoto <shuhei.matsumoto.xt@hitachi.com>
parent 7b8964c5
+130 −88
@@ -173,6 +173,8 @@ struct nvme_rdma_qpair {
 
 	bool					delay_cmd_submit;
 
+	uint32_t				num_completions;
+
 	/* Parallel arrays of response buffers + response SGLs of size num_entries */
 	struct ibv_sge				*rsp_sgls;
 	struct spdk_nvme_rdma_rsp		*rsps;
@@ -2068,69 +2070,62 @@ nvme_rdma_request_ready(struct nvme_rdma_qpair *rqpair, struct spdk_nvme_rdma_re
 
 #define MAX_COMPLETIONS_PER_POLL 128
 
+static void
+nvme_rdma_fail_qpair(struct spdk_nvme_qpair *qpair, int failure_reason)
+{
+	if (failure_reason == IBV_WC_RETRY_EXC_ERR) {
+		qpair->transport_failure_reason = SPDK_NVME_QPAIR_FAILURE_REMOTE;
+	} else if (qpair->transport_failure_reason == SPDK_NVME_QPAIR_FAILURE_NONE) {
+		qpair->transport_failure_reason = SPDK_NVME_QPAIR_FAILURE_UNKNOWN;
+	}
+
+	nvme_ctrlr_disconnect_qpair(qpair);
+}
+
 static int
-nvme_rdma_qpair_process_completions(struct spdk_nvme_qpair *qpair,
-				    uint32_t max_completions)
+nvme_rdma_cq_process_completions(struct ibv_cq *cq, uint32_t batch_size)
 {
-	struct nvme_rdma_qpair		*rqpair = nvme_rdma_qpair(qpair);
 	struct ibv_wc			wc[MAX_COMPLETIONS_PER_POLL];
-	int				i, rc = 0, batch_size;
-	uint32_t			reaped = 0;
-	struct ibv_cq			*cq;
+	struct nvme_rdma_qpair		*rqpair;
 	struct spdk_nvme_rdma_req	*rdma_req;
 	struct spdk_nvme_rdma_rsp	*rdma_rsp;
 	struct nvme_rdma_wr		*rdma_wr;
-	struct nvme_rdma_ctrlr		*rctrlr;
-
-	if (max_completions == 0) {
-		max_completions = rqpair->num_entries;
-	} else {
-		max_completions = spdk_min(max_completions, rqpair->num_entries);
-	}
-
-	if (nvme_qpair_is_admin_queue(&rqpair->qpair)) {
-		rctrlr = nvme_rdma_ctrlr(rqpair->qpair.ctrlr);
-		nvme_rdma_poll_events(rctrlr);
-	}
-	nvme_rdma_qpair_process_cm_event(rqpair);
-
-	if (spdk_unlikely(qpair->transport_failure_reason != SPDK_NVME_QPAIR_FAILURE_NONE)) {
-		goto fail;
-	}
-
-	cq = rqpair->cq;
+	uint32_t			reaped = 0;
+	int				completion_rc = 0;
+	int				rc, i;
 
-	do {
-		batch_size = spdk_min((max_completions - reaped),
-				      MAX_COMPLETIONS_PER_POLL);
 	rc = ibv_poll_cq(cq, batch_size, wc);
 	if (rc < 0) {
 		SPDK_ERRLOG("Error polling CQ! (%d): %s\n",
 			    errno, spdk_strerror(errno));
-			goto fail;
+		return -ECANCELED;
 	} else if (rc == 0) {
-			/* Ran out of completions */
-			break;
+		return 0;
 	}
 
 	for (i = 0; i < rc; i++) {
 		rdma_wr = (struct nvme_rdma_wr *)wc[i].wr_id;
+		switch (rdma_wr->type) {
+		case RDMA_WR_TYPE_RECV:
+			rdma_rsp = SPDK_CONTAINEROF(rdma_wr, struct spdk_nvme_rdma_rsp, rdma_wr);
+			rqpair = rdma_rsp->rqpair;
+
 			if (wc[i].status) {
 				SPDK_ERRLOG("CQ error on Queue Pair %p, Response Index %lu (%d): %s\n",
-					    qpair, wc[i].wr_id, wc[i].status, ibv_wc_status_str(wc[i].status));
-				goto fail;
+					    rqpair, wc[i].wr_id, wc[i].status, ibv_wc_status_str(wc[i].status));
+				nvme_rdma_fail_qpair(&rqpair->qpair, 0);
+				completion_rc = -ENXIO;
+				continue;
 			}
 
-			switch (rdma_wr->type) {
-			case RDMA_WR_TYPE_RECV:
-				rdma_rsp = SPDK_CONTAINEROF(rdma_wr, struct spdk_nvme_rdma_rsp, rdma_wr);
 			SPDK_DEBUGLOG(SPDK_LOG_NVME, "CQ recv completion\n");
 
 			if (wc[i].byte_len < sizeof(struct spdk_nvme_cpl)) {
 				SPDK_ERRLOG("recv length %u less than expected response size\n", wc[i].byte_len);
-					goto fail;
+				nvme_rdma_fail_qpair(&rqpair->qpair, 0);
+				completion_rc = -ENXIO;
+				continue;
 			}
+
 			rdma_req = &rqpair->rdma_reqs[rdma_rsp->cpl.cid];
 			rdma_req->completion_flags |= NVME_RDMA_RECV_COMPLETED;
 			rdma_req->rsp_idx = rdma_rsp->idx;
@@ -2138,63 +2133,110 @@ nvme_rdma_qpair_process_completions(struct spdk_nvme_qpair *qpair,
 			if ((rdma_req->completion_flags & NVME_RDMA_SEND_COMPLETED) != 0) {
 				if (spdk_unlikely(nvme_rdma_request_ready(rqpair, rdma_req))) {
 					SPDK_ERRLOG("Unable to re-post rx descriptor\n");
-						goto fail;
+					nvme_rdma_fail_qpair(&rqpair->qpair, 0);
+					completion_rc = -ENXIO;
+					continue;
 				}
 				reaped++;
+				rqpair->num_completions++;
 			}
 			break;
 
 		case RDMA_WR_TYPE_SEND:
 			rdma_req = SPDK_CONTAINEROF(rdma_wr, struct spdk_nvme_rdma_req, rdma_wr);
+			rqpair = nvme_rdma_qpair(rdma_req->req->qpair);
 			rdma_req->completion_flags |= NVME_RDMA_SEND_COMPLETED;
 
+			if (wc[i].status) {
+				SPDK_ERRLOG("CQ error on Queue Pair %p, Response Index %lu (%d): %s\n",
+					    rqpair, wc[i].wr_id, wc[i].status, ibv_wc_status_str(wc[i].status));
+				nvme_rdma_fail_qpair(&rqpair->qpair, 0);
+				completion_rc = -ENXIO;
+				continue;
+			}
+
 			if ((rdma_req->completion_flags & NVME_RDMA_RECV_COMPLETED) != 0) {
 				if (spdk_unlikely(nvme_rdma_request_ready(rqpair, rdma_req))) {
 					SPDK_ERRLOG("Unable to re-post rx descriptor\n");
-						goto fail;
+					nvme_rdma_fail_qpair(&rqpair->qpair, 0);
+					completion_rc = -ENXIO;
+					continue;
 				}
 				reaped++;
+				rqpair->num_completions++;
 			}
 			break;
 
 		default:
 			SPDK_ERRLOG("Received an unexpected opcode on the CQ: %d\n", rdma_wr->type);
-				goto fail;
+			return -ECANCELED;
 		}
 	}
-	} while (reaped < max_completions);
 
-	if (spdk_unlikely(nvme_rdma_qpair_submit_sends(rqpair) ||
-			  nvme_rdma_qpair_submit_recvs(rqpair))) {
-		goto fail;
+	if (completion_rc) {
+		return completion_rc;
 	}
 
-	if (spdk_unlikely(rqpair->qpair.ctrlr->timeout_enabled)) {
-		nvme_rdma_qpair_check_timeout(qpair);
+	return reaped;
 }
 
-	return reaped;
+static int
+nvme_rdma_qpair_process_completions(struct spdk_nvme_qpair *qpair,
+				    uint32_t max_completions)
+{
+	struct nvme_rdma_qpair		*rqpair = nvme_rdma_qpair(qpair);
+	int				rc = 0, batch_size;
+	struct ibv_cq			*cq;
+	struct nvme_rdma_ctrlr		*rctrlr;
+
-fail:
-	/*
-	 * Since admin queues take the ctrlr_lock before entering this function,
-	 * we can call nvme_transport_ctrlr_disconnect_qpair. For other qpairs we need
-	 * to call the generic function which will take the lock for us.
-	 */
-	if (rc == IBV_WC_RETRY_EXC_ERR) {
-		qpair->transport_failure_reason = SPDK_NVME_QPAIR_FAILURE_REMOTE;
-	} else if (qpair->transport_failure_reason == SPDK_NVME_QPAIR_FAILURE_NONE) {
-		qpair->transport_failure_reason = SPDK_NVME_QPAIR_FAILURE_UNKNOWN;
+	if (max_completions == 0) {
+		max_completions = rqpair->num_entries;
+	} else {
+		max_completions = spdk_min(max_completions, rqpair->num_entries);
 	}
 
-	if (nvme_qpair_is_admin_queue(qpair)) {
-		nvme_transport_ctrlr_disconnect_qpair(qpair->ctrlr, qpair);
-	} else {
-		nvme_ctrlr_disconnect_qpair(qpair);
+	if (nvme_qpair_is_admin_queue(&rqpair->qpair)) {
+		rctrlr = nvme_rdma_ctrlr(rqpair->qpair.ctrlr);
+		nvme_rdma_poll_events(rctrlr);
 	}
+	nvme_rdma_qpair_process_cm_event(rqpair);
 
+	if (spdk_unlikely(qpair->transport_failure_reason != SPDK_NVME_QPAIR_FAILURE_NONE)) {
+		nvme_rdma_fail_qpair(qpair, 0);
 		return -ENXIO;
 	}
+
+	cq = rqpair->cq;
+
+	rqpair->num_completions = 0;
+	do {
+		batch_size = spdk_min((max_completions - rqpair->num_completions), MAX_COMPLETIONS_PER_POLL);
+		rc = nvme_rdma_cq_process_completions(cq, batch_size);
+
+		if (rc == 0) {
+			break;
+			/* Handle the case where we fail to poll the cq. */
+		} else if (rc == -ECANCELED) {
+			nvme_rdma_fail_qpair(qpair, 0);
+			return -ENXIO;
+		} else if (rc == -ENXIO) {
+			return rc;
+		}
+	} while (rqpair->num_completions < max_completions);
+
+	if (spdk_unlikely(nvme_rdma_qpair_submit_sends(rqpair) ||
+			  nvme_rdma_qpair_submit_recvs(rqpair))) {
+		nvme_rdma_fail_qpair(qpair, 0);
+		return -ENXIO;
+	}
+
+	if (spdk_unlikely(rqpair->qpair.ctrlr->timeout_enabled)) {
+		nvme_rdma_qpair_check_timeout(qpair);
+	}
+
+	return rqpair->num_completions;
+}
 
 static uint32_t
 nvme_rdma_ctrlr_get_max_xfer_size(struct spdk_nvme_ctrlr *ctrlr)
 {