Commit 581e1bb5 authored by Alexey Marchuk, committed by Tomasz Zawadzki

nvme/rdma: Wait for completions of both RDMA RECV and SEND

In some situations the completion of RDMA_RECV may arrive before the
completion of RDMA_SEND, which can lead to the bug described in #1292.
To avoid such situations, complete the nvme_request only when both
RDMA_RECV and RDMA_SEND completions have been received.

Add a new field to spdk_nvme_rdma_req that stores the response index;
it is used to complete the nvme request when RDMA_RECV completes
before RDMA_SEND. Repost RDMA_RECV only once both RDMA_SEND and
RDMA_RECV have completed.

Side changes: change the type of spdk_nvme_rdma_req::id to uint16_t
and repack struct nvme_rdma_qpair.
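
The heart of the fix is an order-independent completion check: each
request carries a SEND flag and a RECV flag, and is released only when
the second of the two completions arrives. Below is a minimal
standalone sketch of that pattern; demo_req, on_completion, and the
DEMO_* flags are illustrative stand-ins for the SPDK types, not the
real implementation.

    #include <assert.h>
    #include <stdint.h>
    #include <stdio.h>

    #define DEMO_SEND_COMPLETED (1u << 0)
    #define DEMO_RECV_COMPLETED (1u << 1)

    /* Illustrative stand-in for spdk_nvme_rdma_req. */
    struct demo_req {
            uint16_t id;
            uint16_t completion_flags;
            uint16_t rsp_idx; /* recorded when the RECV completion arrives */
    };

    /* Set one flag; finish the request only when both flags are present. */
    static void
    on_completion(struct demo_req *req, uint16_t flag)
    {
            req->completion_flags |= flag;
            if (req->completion_flags ==
                (DEMO_SEND_COMPLETED | DEMO_RECV_COMPLETED)) {
                    printf("req %u complete, repost recv slot %u\n",
                           (unsigned)req->id, (unsigned)req->rsp_idx);
                    req->completion_flags = 0; /* back to the free list */
            }
    }

    int
    main(void)
    {
            struct demo_req req = { .id = 7, .completion_flags = 0, .rsp_idx = 3 };

            /* RECV first, then SEND: the request completes on the SEND event. */
            on_completion(&req, DEMO_RECV_COMPLETED);
            assert(req.completion_flags == DEMO_RECV_COMPLETED);
            on_completion(&req, DEMO_SEND_COMPLETED);
            assert(req.completion_flags == 0);
            return 0;
    }

Because the check is symmetric, the poll loop below can run the same
ready-check from both the IBV_WC_RECV and IBV_WC_SEND branches,
whichever completion the CQ delivers first.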

Fixes #1292

Change-Id: Ie51fbbba425acf37c306c5af031479bc9de08955
Signed-off-by: Alexey Marchuk <alexeymar@mellanox.com>
Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/1770
Reviewed-by: Shuhei Matsumoto <shuhei.matsumoto.xt@hitachi.com>
Reviewed-by: Ben Walker <benjamin.walker@intel.com>
Reviewed-by: <dongx.yi@intel.com>
Tested-by: SPDK CI Jenkins <sys_sgci@intel.com>
parent b96186ae
+48 −45
@@ -154,13 +154,14 @@ struct nvme_rdma_qpair {
 
 	uint16_t				num_entries;
 
-	bool					delay_cmd_submit;
-
 	/* Parallel arrays of response buffers + response SGLs of size num_entries */
 	struct ibv_sge				*rsp_sgls;
 	struct spdk_nvme_cpl			*rsps;
 
 	struct ibv_recv_wr			*rsp_recv_wrs;
 
+	bool					delay_cmd_submit;
 	struct spdk_nvme_send_wr_list		sends_to_post;
 	struct spdk_nvme_recv_wr_list		recvs_to_post;

@@ -185,10 +186,19 @@ struct nvme_rdma_qpair {
 	struct rdma_cm_event			*evt;
 };
 
-struct spdk_nvme_rdma_req {
-	int					id;
+enum NVME_RDMA_COMPLETION_FLAGS {
+	NVME_RDMA_SEND_COMPLETED = 1u << 0,
+	NVME_RDMA_RECV_COMPLETED = 1u << 1,
+};
 
-	bool					request_ready_to_put;
+struct spdk_nvme_rdma_req {
+	uint16_t				id;
+	uint16_t				completion_flags: 2;
+	uint16_t				reserved: 14;
+	/* if completion of RDMA_RECV received before RDMA_SEND, we will complete nvme request
+	 * during processing of RDMA_SEND. To complete the request we must know the index
+	 * of nvme_cpl received in RDMA_RECV, so store it in this field */
+	uint16_t				rsp_idx;
 
 	struct ibv_send_wr			send_wr;

@@ -260,7 +270,7 @@ nvme_rdma_req_get(struct nvme_rdma_qpair *rqpair)
 static void
 nvme_rdma_req_put(struct nvme_rdma_qpair *rqpair, struct spdk_nvme_rdma_req *rdma_req)
 {
-	rdma_req->request_ready_to_put = false;
+	rdma_req->completion_flags = 0;
 	TAILQ_REMOVE(&rqpair->outstanding_reqs, rdma_req, link);
 	TAILQ_INSERT_HEAD(&rqpair->free_reqs, rdma_req, link);
 }
@@ -684,7 +694,8 @@ fail:
 static int
 nvme_rdma_register_rsps(struct nvme_rdma_qpair *rqpair)
 {
-	int i, rc;
+	uint16_t i;
+	int rc;
 
 	rqpair->rsp_mr = rdma_reg_msgs(rqpair->cm_id, rqpair->rsps,
 				       rqpair->num_entries * sizeof(*rqpair->rsps));
@@ -750,7 +761,7 @@ nvme_rdma_free_reqs(struct nvme_rdma_qpair *rqpair)
 static int
 nvme_rdma_alloc_reqs(struct nvme_rdma_qpair *rqpair)
 {
-	int i;
+	uint16_t i;
 
 	rqpair->rdma_reqs = calloc(rqpair->num_entries, sizeof(struct spdk_nvme_rdma_req));
 	if (rqpair->rdma_reqs == NULL) {
@@ -820,35 +831,6 @@ fail:
 	return -ENOMEM;
 }
 
-static int
-nvme_rdma_recv(struct nvme_rdma_qpair *rqpair, uint64_t rsp_idx, int *reaped)
-{
-	struct spdk_nvme_rdma_req *rdma_req;
-	struct spdk_nvme_cpl *rsp;
-	struct nvme_request *req;
-
-	assert(rsp_idx < rqpair->num_entries);
-	rsp = &rqpair->rsps[rsp_idx];
-	rdma_req = &rqpair->rdma_reqs[rsp->cid];
-
-	req = rdma_req->req;
-	nvme_rdma_req_complete(req, rsp);
-
-	if (rdma_req->request_ready_to_put) {
-		(*reaped)++;
-		nvme_rdma_req_put(rqpair, rdma_req);
-	} else {
-		rdma_req->request_ready_to_put = true;
-	}
-
-	if (nvme_rdma_post_recv(rqpair, rsp_idx)) {
-		SPDK_ERRLOG("Unable to re-post rx descriptor\n");
-		return -1;
-	}
-
-	return 0;
-}
-
 static int
 nvme_rdma_resolve_addr(struct nvme_rdma_qpair *rqpair,
 		       struct sockaddr *src_addr,
@@ -1979,6 +1961,14 @@ nvme_rdma_qpair_check_timeout(struct spdk_nvme_qpair *qpair)
 	}
 }
 
+static inline int
+nvme_rdma_request_ready(struct nvme_rdma_qpair *rqpair, struct spdk_nvme_rdma_req *rdma_req)
+{
+	nvme_rdma_req_complete(rdma_req->req, &rqpair->rsps[rdma_req->rsp_idx]);
+	nvme_rdma_req_put(rqpair, rdma_req);
+	return nvme_rdma_post_recv(rqpair, rdma_req->rsp_idx);
+}
+
 #define MAX_COMPLETIONS_PER_POLL 128
 
 static int
@@ -1988,10 +1978,12 @@ nvme_rdma_qpair_process_completions(struct spdk_nvme_qpair *qpair,
 	struct nvme_rdma_qpair		*rqpair = nvme_rdma_qpair(qpair);
 	struct ibv_wc			wc[MAX_COMPLETIONS_PER_POLL];
 	int				i, rc = 0, batch_size;
-	uint32_t			reaped;
+	uint32_t			reaped = 0;
+	uint16_t			rsp_idx;
 	struct ibv_cq			*cq;
 	struct spdk_nvme_rdma_req	*rdma_req;
 	struct nvme_rdma_ctrlr		*rctrlr;
+	struct spdk_nvme_cpl		*rsp;
 
 	if (max_completions == 0) {
 		max_completions = rqpair->num_entries;
@@ -2011,7 +2003,6 @@ nvme_rdma_qpair_process_completions(struct spdk_nvme_qpair *qpair,
 
 	cq = rqpair->cq;
 
-	reaped = 0;
 	do {
 		batch_size = spdk_min((max_completions - reaped),
 				      MAX_COMPLETIONS_PER_POLL);
@@ -2041,20 +2032,32 @@ nvme_rdma_qpair_process_completions(struct spdk_nvme_qpair *qpair,
 					goto fail;
 				}
 
-				if (nvme_rdma_recv(rqpair, wc[i].wr_id, &reaped)) {
-					SPDK_ERRLOG("nvme_rdma_recv processing failure\n");
-					goto fail;
-				}
+				assert(wc[i].wr_id < rqpair->num_entries);
+				rsp_idx = (uint16_t)wc[i].wr_id;
+				rsp = &rqpair->rsps[rsp_idx];
+				rdma_req = &rqpair->rdma_reqs[rsp->cid];
+				rdma_req->completion_flags |= NVME_RDMA_RECV_COMPLETED;
+				rdma_req->rsp_idx = rsp_idx;
+
+				if ((rdma_req->completion_flags & NVME_RDMA_SEND_COMPLETED) != 0) {
+					if (spdk_unlikely(nvme_rdma_request_ready(rqpair, rdma_req))) {
+						SPDK_ERRLOG("Unable to re-post rx descriptor\n");
+						goto fail;
+					}
+					reaped++;
+				}
 				break;
 
 			case IBV_WC_SEND:
 				rdma_req = (struct spdk_nvme_rdma_req *)wc[i].wr_id;
+				rdma_req->completion_flags |= NVME_RDMA_SEND_COMPLETED;
 
-				if (rdma_req->request_ready_to_put) {
+				if ((rdma_req->completion_flags & NVME_RDMA_RECV_COMPLETED) != 0) {
+					if (spdk_unlikely(nvme_rdma_request_ready(rqpair, rdma_req))) {
+						SPDK_ERRLOG("Unable to re-post rx descriptor\n");
+						goto fail;
+					}
 					reaped++;
-					nvme_rdma_req_put(rqpair, rdma_req);
-				} else {
-					rdma_req->request_ready_to_put = true;
-				}
+				}
 				break;
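
A note on the repacked request metadata shown above: id becomes
uint16_t and the two completion flags share a 2-bit field, so the
bookkeeping added by this fix introduces no extra padding. The sketch
below checks that layout; the field order is copied from the diff,
while the 6-byte size is what typical GCC/Clang LP64 ABIs produce,
not something the commit itself guarantees.

    #include <stdint.h>

    /* Field order copied from the new spdk_nvme_rdma_req header fields. */
    struct req_header {
            uint16_t id;
            uint16_t completion_flags : 2;
            uint16_t reserved : 14;
            uint16_t rsp_idx;
    };

    /* The two bit-fields share one uint16_t unit on common ABIs. */
    _Static_assert(sizeof(struct req_header) == 6, "expected tight packing");

    int
    main(void)
    {
            return 0;
    }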