Commit 1db3a037 authored by Ben Walker

nvmf/rdma: Do not assume acks are ordered.



The RDMA protocol this module uses is strictly ordered,
which means messages are delivered in exactly the order
they are sent. However, we have detected a number of
cases where the acknowledgements for those messages
arrive out of order. This patch attempts to handle
those cases.

Separate the data required to post a recv from the
data required to send a response. If a recv arrives
when no response object is available, queue the
recv.
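
In outline, the fix needs only two lists and a pairing step. Below is a
minimal, self-contained sketch of that pattern; the names (recv_ctx,
request_ctx, pair_incoming, and so on) are simplified stand-ins for
illustration, not SPDK's actual types or functions:

    #include <stddef.h>
    #include <sys/queue.h>

    /* Hypothetical stand-ins for spdk_nvmf_rdma_recv and
     * spdk_nvmf_rdma_request. */
    struct recv_ctx {
    	TAILQ_ENTRY(recv_ctx) link;
    };

    struct request_ctx {
    	struct recv_ctx *recv;	/* paired at dispatch time, not statically */
    	TAILQ_ENTRY(request_ctx) link;
    };

    TAILQ_HEAD(recv_list, recv_ctx);
    TAILQ_HEAD(req_list, request_ctx);

    /* Pair queued receives with free request objects until one of the
     * two lists runs dry. */
    static void
    pair_incoming(struct recv_list *incoming, struct req_list *free_reqs)
    {
    	struct recv_ctx *recv;
    	struct request_ctx *req;

    	while ((recv = TAILQ_FIRST(incoming)) != NULL &&
    	       (req = TAILQ_FIRST(free_reqs)) != NULL) {
    		TAILQ_REMOVE(incoming, recv, link);
    		TAILQ_REMOVE(free_reqs, req, link);
    		req->recv = recv;
    		/* ... parse and execute the command held in recv ... */
    	}
    }

    /* Recv completion: never assume a request object is free; queue the
     * receive and attempt pairing. */
    static void
    on_recv_complete(struct recv_list *incoming, struct req_list *free_reqs,
    		 struct recv_ctx *recv)
    {
    	TAILQ_INSERT_TAIL(incoming, recv, link);
    	pair_incoming(incoming, free_reqs);
    }

    /* Send (response) completion: the request object is free again, so a
     * receive whose ack arrived early can now make progress. */
    static void
    on_send_complete(struct recv_list *incoming, struct req_list *free_reqs,
    		 struct request_ctx *req)
    {
    	req->recv = NULL;
    	TAILQ_INSERT_TAIL(free_reqs, req, link);
    	pair_incoming(incoming, free_reqs);
    }

The incoming_queue/free_queue pair and process_incoming_queue() in the
diff below implement the same idea against the real spdk_nvmf_rdma_*
types.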

Change-Id: I2d6f2f8636b820d0c746505e5a5e3d3442ce5ba4
Signed-off-by: Ben Walker <benjamin.walker@intel.com>
parent 4a745801
Diffstat: +208 −75
@@ -66,16 +66,31 @@ struct spdk_nvmf_rdma_buf {
 	SLIST_ENTRY(spdk_nvmf_rdma_buf) link;
 };
 
-struct spdk_nvmf_rdma_request {
-	struct spdk_nvmf_request		req;
+/* This structure holds commands as they are received off the wire.
+ * It must be dynamically paired with a full request object
+ * (spdk_nvmf_rdma_request) to service a request. It is separate
+ * from the request because RDMA does not appear to order
+ * completions, so occasionally we'll get a new incoming
+ * command when there aren't any free request objects.
+ */
+struct spdk_nvmf_rdma_recv {
+	struct ibv_recv_wr		wr;
+	struct ibv_sge			sgl[NVMF_DEFAULT_RX_SGE];
 
-	/* In Capsule data buffer */
+	/* In-capsule data buffer */
 	uint8_t				*buf;
 
-	struct {
-		struct ibv_recv_wr		wr;
-		struct ibv_sge			sgl[NVMF_DEFAULT_RX_SGE];
-	} cmd;
+	TAILQ_ENTRY(spdk_nvmf_rdma_recv) link;
+
+#ifdef DEBUG
+	bool				in_use;
+#endif
+};
+
+struct spdk_nvmf_rdma_request {
+	struct spdk_nvmf_request		req;
+
+	struct spdk_nvmf_rdma_recv		*recv;
 
 	struct {
 		struct	ibv_send_wr		wr;
@@ -111,6 +126,12 @@ struct spdk_nvmf_rdma_conn {
 	/* The number of RDMA READ and WRITE requests that are outstanding */
 	uint16_t				cur_rdma_rw_depth;
 
+	/* Receives that are waiting for a request object */
+	TAILQ_HEAD(, spdk_nvmf_rdma_recv)	incoming_queue;
+
+	/* Requests that are not in use */
+	TAILQ_HEAD(, spdk_nvmf_rdma_request)	free_queue;
+
 	/* Requests that are waiting to obtain a data buffer */
 	TAILQ_HEAD(, spdk_nvmf_rdma_request)	pending_data_buf_queue;

@@ -120,6 +141,9 @@ struct spdk_nvmf_rdma_conn {
 	/* Array of size "max_queue_depth" containing RDMA requests. */
 	struct spdk_nvmf_rdma_request		*reqs;
 
+	/* Array of size "max_queue_depth" containing RDMA recvs. */
+	struct spdk_nvmf_rdma_recv		*recvs;
+
 	/* Array of size "max_queue_depth" containing 64 byte capsules
 	 * used for receive.
 	 */
@@ -203,7 +227,8 @@ get_rdma_sess(struct spdk_nvmf_session *sess)
 			session));
 }
 
-static int nvmf_post_rdma_recv(struct spdk_nvmf_request *req);
+static int nvmf_post_rdma_recv(struct spdk_nvmf_rdma_conn *rdma_conn,
+			       struct spdk_nvmf_rdma_recv *rdma_recv);
 
 static void
 spdk_nvmf_rdma_conn_destroy(struct spdk_nvmf_rdma_conn *rdma_conn)
@@ -245,6 +270,7 @@ spdk_nvmf_rdma_conn_create(struct rdma_cm_id *id, struct ibv_comp_channel *chann
 	struct spdk_nvmf_conn		*conn;
 	int				rc, i;
 	struct ibv_qp_init_attr		attr;
+	struct spdk_nvmf_rdma_recv	*rdma_recv;
 	struct spdk_nvmf_rdma_request	*rdma_req;
 
 	rdma_conn = calloc(1, sizeof(struct spdk_nvmf_rdma_conn));
@@ -255,6 +281,8 @@ spdk_nvmf_rdma_conn_create(struct rdma_cm_id *id, struct ibv_comp_channel *chann

 	rdma_conn->max_queue_depth = max_queue_depth;
 	rdma_conn->max_rw_depth = max_rw_depth;
+	TAILQ_INIT(&rdma_conn->incoming_queue);
+	TAILQ_INIT(&rdma_conn->free_queue);
 	TAILQ_INIT(&rdma_conn->pending_data_buf_queue);
 	TAILQ_INIT(&rdma_conn->pending_rdma_rw_queue);

@@ -300,13 +328,15 @@ spdk_nvmf_rdma_conn_create(struct rdma_cm_id *id, struct ibv_comp_channel *chann
 	SPDK_TRACELOG(SPDK_TRACE_RDMA, "New RDMA Connection: %p\n", conn);
 
 	rdma_conn->reqs = calloc(max_queue_depth, sizeof(*rdma_conn->reqs));
+	rdma_conn->recvs = calloc(max_queue_depth, sizeof(*rdma_conn->recvs));
 	rdma_conn->cmds = spdk_zmalloc(max_queue_depth * sizeof(*rdma_conn->cmds),
 				       0x1000, NULL);
 	rdma_conn->cpls = spdk_zmalloc(max_queue_depth * sizeof(*rdma_conn->cpls),
 				       0x1000, NULL);
 	rdma_conn->bufs = spdk_zmalloc(max_queue_depth * g_rdma.in_capsule_data_size,
 				       0x1000, NULL);
-	if (!rdma_conn->reqs || !rdma_conn->cmds || !rdma_conn->cpls || !rdma_conn->bufs) {
+	if (!rdma_conn->reqs || !rdma_conn->recvs || !rdma_conn->cmds ||
+	    !rdma_conn->cpls || !rdma_conn->bufs) {
 		SPDK_ERRLOG("Unable to allocate sufficient memory for RDMA queue.\n");
 		spdk_nvmf_rdma_conn_destroy(rdma_conn);
 		return NULL;
@@ -335,30 +365,38 @@ spdk_nvmf_rdma_conn_create(struct rdma_cm_id *id, struct ibv_comp_channel *chann
 		      rdma_conn->bufs, max_queue_depth * g_rdma.in_capsule_data_size, rdma_conn->bufs_mr->lkey);
 
 	for (i = 0; i < max_queue_depth; i++) {
-		rdma_req = &rdma_conn->reqs[i];
-		rdma_req->buf = (void *)((uintptr_t)rdma_conn->bufs + (i * g_rdma.in_capsule_data_size));
-		rdma_req->req.conn = &rdma_conn->conn;
+		rdma_recv = &rdma_conn->recvs[i];
 
 		/* Set up memory to receive commands */
-		rdma_req->req.cmd = &rdma_conn->cmds[i];
+		rdma_recv->buf = (void *)((uintptr_t)rdma_conn->bufs + (i * g_rdma.in_capsule_data_size));
 
-		rdma_req->cmd.sgl[0].addr = (uintptr_t)&rdma_conn->cmds[i];
-		rdma_req->cmd.sgl[0].length = sizeof(rdma_conn->cmds[i]);
-		rdma_req->cmd.sgl[0].lkey = rdma_conn->cmds_mr->lkey;
+		rdma_recv->sgl[0].addr = (uintptr_t)&rdma_conn->cmds[i];
+		rdma_recv->sgl[0].length = sizeof(rdma_conn->cmds[i]);
+		rdma_recv->sgl[0].lkey = rdma_conn->cmds_mr->lkey;
 
-		rdma_req->cmd.sgl[1].addr = (uintptr_t)rdma_req->buf;
-		rdma_req->cmd.sgl[1].length = g_rdma.in_capsule_data_size;
-		rdma_req->cmd.sgl[1].lkey = rdma_conn->bufs_mr->lkey;
+		rdma_recv->sgl[1].addr = (uintptr_t)rdma_recv->buf;
+		rdma_recv->sgl[1].length = g_rdma.in_capsule_data_size;
+		rdma_recv->sgl[1].lkey = rdma_conn->bufs_mr->lkey;
 
-		rdma_req->cmd.wr.wr_id = (uintptr_t)rdma_req;
-		rdma_req->cmd.wr.sg_list = rdma_req->cmd.sgl;
-		rdma_req->cmd.wr.num_sge = SPDK_COUNTOF(rdma_req->cmd.sgl);
+		rdma_recv->wr.wr_id = (uintptr_t)rdma_recv;
+		rdma_recv->wr.sg_list = rdma_recv->sgl;
+		rdma_recv->wr.num_sge = SPDK_COUNTOF(rdma_recv->sgl);
+#ifdef DEBUG
+		rdma_recv->in_use = false;
+#endif
 
-		if (nvmf_post_rdma_recv(&rdma_req->req)) {
+		if (nvmf_post_rdma_recv(rdma_conn, rdma_recv)) {
 			SPDK_ERRLOG("Unable to post capsule for RDMA RECV\n");
 			spdk_nvmf_rdma_conn_destroy(rdma_conn);
 			return NULL;
 		}
+	}
+
+	for (i = 0; i < max_queue_depth; i++) {
+		rdma_req = &rdma_conn->reqs[i];
 
+		rdma_req->req.conn = &rdma_conn->conn;
+		rdma_req->req.cmd = NULL;
+
 		/* Set up memory to send responses */
 		rdma_req->req.rsp = &rdma_conn->cpls[i];
@@ -380,6 +418,8 @@ spdk_nvmf_rdma_conn_create(struct rdma_cm_id *id, struct ibv_comp_channel *chann
 		rdma_req->data.wr.send_flags = IBV_SEND_SIGNALED;
 		rdma_req->data.wr.sg_list = rdma_req->data.sgl;
 		rdma_req->data.wr.num_sge = SPDK_COUNTOF(rdma_req->data.sgl);
+
+		TAILQ_INSERT_TAIL(&rdma_conn->free_queue, rdma_req, link);
 	}
 
 	return rdma_conn;
@@ -468,17 +508,15 @@ nvmf_post_rdma_write(struct spdk_nvmf_request *req)
 }
 
 static int
-nvmf_post_rdma_recv(struct spdk_nvmf_request *req)
+nvmf_post_rdma_recv(struct spdk_nvmf_rdma_conn *rdma_conn,
+		    struct spdk_nvmf_rdma_recv *rdma_recv)
 {
 	struct ibv_recv_wr *bad_wr = NULL;
-	struct spdk_nvmf_conn *conn = req->conn;
-	struct spdk_nvmf_rdma_conn *rdma_conn = get_rdma_conn(conn);
-	struct spdk_nvmf_rdma_request *rdma_req = get_rdma_req(req);
 	int rc;
 
-	SPDK_TRACELOG(SPDK_TRACE_RDMA, "RDMA RECV POSTED. Request: %p Connection: %p\n", req, conn);
+	SPDK_TRACELOG(SPDK_TRACE_RDMA, "RDMA RECV POSTED. Recv: %p Connection: %p\n", rdma_recv, rdma_conn);
 
-	rc = ibv_post_recv(rdma_conn->cm_id->qp, &rdma_req->cmd.wr, &bad_wr);
+	rc = ibv_post_recv(rdma_conn->cm_id->qp, &rdma_recv->wr, &bad_wr);
 	if (rc) {
 		SPDK_ERRLOG("Failure posting rdma recv, rc = 0x%x\n", rc);
 	}
@@ -559,6 +597,8 @@ spdk_nvmf_rdma_request_send_completion(struct spdk_nvmf_request *req)
 {
 	int rc;
 	struct spdk_nvmf_conn		*conn = req->conn;
+	struct spdk_nvmf_rdma_conn	*rdma_conn = get_rdma_conn(conn);
+	struct spdk_nvmf_rdma_request	*rdma_req = get_rdma_req(req);
 	struct spdk_nvme_cpl		*rsp = &req->rsp->nvme_cpl;
 	struct spdk_nvmf_rdma_session	*rdma_sess;
 	struct spdk_nvmf_rdma_buf	*buf;
@@ -582,11 +622,17 @@ spdk_nvmf_rdma_request_send_completion(struct spdk_nvmf_request *req)
 	rsp->sqhd = conn->sq_head;
 
 	/* Post the capsule to the recv buffer */
-	rc = nvmf_post_rdma_recv(req);
+	assert(rdma_req->recv != NULL);
+#ifdef DEBUG
+	assert(rdma_req->recv->in_use == true);
+	rdma_req->recv->in_use = false;
+#endif
+	rc = nvmf_post_rdma_recv(rdma_conn, rdma_req->recv);
 	if (rc) {
 		SPDK_ERRLOG("Unable to re-post rx descriptor\n");
 		return rc;
 	}
+	rdma_req->recv = NULL;
 
 	/* Send the completion */
 	rc = nvmf_post_rdma_send(req);
@@ -856,7 +902,7 @@ spdk_nvmf_request_prep_data(struct spdk_nvmf_request *req)
 		} else {
 			/* Use the in capsule data buffer, even though this isn't in capsule data */
 			SPDK_TRACELOG(SPDK_TRACE_RDMA, "Request using in capsule buffer for non-capsule data\n");
-			req->data = rdma_req->buf;
+			req->data = rdma_req->recv->buf;
 		}
 		if (req->xfer == SPDK_NVME_DATA_HOST_TO_CONTROLLER) {
 			return SPDK_NVMF_REQUEST_PREP_PENDING_DATA;
@@ -891,7 +937,7 @@ spdk_nvmf_request_prep_data(struct spdk_nvmf_request *req)
 			return SPDK_NVMF_REQUEST_PREP_READY;
 		}
 
-		req->data = rdma_req->buf + offset;
+		req->data = rdma_req->recv->buf + offset;
 		req->length = sgl->unkeyed.length;
 		return SPDK_NVMF_REQUEST_PREP_READY;
 	}
@@ -1366,6 +1412,104 @@ spdk_nvmf_rdma_close_conn(struct spdk_nvmf_conn *conn)
 	spdk_nvmf_rdma_conn_destroy(get_rdma_conn(conn));
 }
 
+static int
+process_incoming_queue(struct spdk_nvmf_rdma_conn *rdma_conn)
+{
+	struct spdk_nvmf_rdma_recv	*rdma_recv, *tmp;
+	struct spdk_nvmf_rdma_request	*rdma_req;
+	struct spdk_nvmf_request	*req;
+	int rc, count;
+	bool error = false;
+
+	count = 0;
+	TAILQ_FOREACH_SAFE(rdma_recv, &rdma_conn->incoming_queue, link, tmp) {
+		rdma_req = TAILQ_FIRST(&rdma_conn->free_queue);
+		if (rdma_req == NULL) {
+			/* Need to wait for more SEND completions */
+			break;
+		}
+		TAILQ_REMOVE(&rdma_conn->free_queue, rdma_req, link);
+		TAILQ_REMOVE(&rdma_conn->incoming_queue, rdma_recv, link);
+		rdma_req->recv = rdma_recv;
+		req = &rdma_req->req;
+
+		/* The first element of the SGL is the NVMe command */
+		req->cmd = (union nvmf_h2c_msg *)rdma_recv->sgl[0].addr;
+
+		spdk_trace_record(TRACE_NVMF_IO_START, 0, 0, (uint64_t)req, 0);
+
+		memset(req->rsp, 0, sizeof(*req->rsp));
+		rc = spdk_nvmf_request_prep_data(req);
+		switch (rc) {
+		case SPDK_NVMF_REQUEST_PREP_READY:
+			SPDK_TRACELOG(SPDK_TRACE_RDMA, "Request %p is ready for execution\n", req);
+			/* Data is immediately available */
+			rc = spdk_nvmf_request_exec(req);
+			if (rc < 0) {
+				error = true;
+				continue;
+			}
+			count++;
+			break;
+		case SPDK_NVMF_REQUEST_PREP_PENDING_BUFFER:
+			SPDK_TRACELOG(SPDK_TRACE_RDMA, "Request %p needs data buffer\n", req);
+			TAILQ_INSERT_TAIL(&rdma_conn->pending_data_buf_queue, rdma_req, link);
+			break;
+		case SPDK_NVMF_REQUEST_PREP_PENDING_DATA:
+			SPDK_TRACELOG(SPDK_TRACE_RDMA, "Request %p needs data transfer\n", req);
+			rc = spdk_nvmf_rdma_request_transfer_data(req);
+			if (rc < 0) {
+				error = true;
+				continue;
+			}
+			break;
+		case SPDK_NVMF_REQUEST_PREP_ERROR:
+			spdk_nvmf_request_complete(req);
+			break;
+		}
+	}
+
+	if (error) {
+		return -1;
+	}
+
+	return count;
+}
+
+static struct spdk_nvmf_rdma_request *
+get_rdma_req_from_wc(struct spdk_nvmf_rdma_conn *rdma_conn,
+		     struct ibv_wc *wc)
+{
+	struct spdk_nvmf_rdma_request *rdma_req;
+
+	rdma_req = (struct spdk_nvmf_rdma_request *)wc->wr_id;
+	assert(rdma_req != NULL);
+	assert(rdma_req - rdma_conn->reqs >= 0);
+	assert(rdma_req - rdma_conn->reqs < (ptrdiff_t)rdma_conn->max_queue_depth);
+
+	return rdma_req;
+}
+
+static struct spdk_nvmf_rdma_recv *
+get_rdma_recv_from_wc(struct spdk_nvmf_rdma_conn *rdma_conn,
+		      struct ibv_wc *wc)
+{
+	struct spdk_nvmf_rdma_recv *rdma_recv;
+
+	assert(wc->byte_len >= sizeof(struct spdk_nvmf_capsule_cmd));
+
+	rdma_recv = (struct spdk_nvmf_rdma_recv *)wc->wr_id;
+	assert(rdma_recv != NULL);
+	assert(rdma_recv - rdma_conn->recvs >= 0);
+	assert(rdma_recv - rdma_conn->recvs < (ptrdiff_t)rdma_conn->max_queue_depth);
+#ifdef DEBUG
+	assert(rdma_recv->in_use == false);
+	rdma_recv->in_use = true;
+#endif
+
+	return rdma_recv;
+}
+
 /* Returns the number of times that spdk_nvmf_request_exec was called,
  * or -1 on error.
  */
@@ -1375,6 +1519,7 @@ spdk_nvmf_rdma_poll(struct spdk_nvmf_conn *conn)
 	struct ibv_wc wc[32];
 	struct spdk_nvmf_rdma_conn *rdma_conn = get_rdma_conn(conn);
 	struct spdk_nvmf_rdma_request *rdma_req;
+	struct spdk_nvmf_rdma_recv    *rdma_recv;
 	struct spdk_nvmf_request *req;
 	int reaped, i, rc;
 	int count = 0;
@@ -1397,22 +1542,29 @@ spdk_nvmf_rdma_poll(struct spdk_nvmf_conn *conn)
 			continue;
 		}
 
-		rdma_req = (struct spdk_nvmf_rdma_request *)wc[i].wr_id;
-		assert(rdma_req != NULL);
-		assert(rdma_req - rdma_conn->reqs >= 0);
-		assert(rdma_req - rdma_conn->reqs < (ptrdiff_t)rdma_conn->max_queue_depth);
-		req = &rdma_req->req;
-
 		switch (wc[i].opcode) {
 		case IBV_WC_SEND:
+			rdma_req = get_rdma_req_from_wc(rdma_conn, &wc[i]);
+			req = &rdma_req->req;
+
 			assert(rdma_conn->cur_queue_depth > 0);
 			SPDK_TRACELOG(SPDK_TRACE_RDMA,
 				      "RDMA SEND Complete. Request: %p Connection: %p Outstanding I/O: %d\n",
 				      req, conn, rdma_conn->cur_queue_depth - 1);
 			rdma_conn->cur_queue_depth--;
+			TAILQ_INSERT_TAIL(&rdma_conn->free_queue, rdma_req, link);
+			rc = process_incoming_queue(rdma_conn);
+			if (rc < 0) {
+				error = true;
+				continue;
+			}
+			count += rc;
 			break;
 
 		case IBV_WC_RDMA_WRITE:
+			rdma_req = get_rdma_req_from_wc(rdma_conn, &wc[i]);
+			req = &rdma_req->req;
+
 			SPDK_TRACELOG(SPDK_TRACE_RDMA, "RDMA WRITE Complete. Request: %p Connection: %p\n",
 				      req, conn);
 			spdk_trace_record(TRACE_RDMA_WRITE_COMPLETE, 0, 0, (uint64_t)req, 0);
@@ -1433,6 +1585,9 @@ spdk_nvmf_rdma_poll(struct spdk_nvmf_conn *conn)
 			break;
 
 		case IBV_WC_RDMA_READ:
+			rdma_req = get_rdma_req_from_wc(rdma_conn, &wc[i]);
+			req = &rdma_req->req;
+
 			SPDK_TRACELOG(SPDK_TRACE_RDMA, "RDMA READ Complete. Request: %p Connection: %p\n",
 				      req, conn);
 			spdk_trace_record(TRACE_RDMA_READ_COMPLETE, 0, 0, (uint64_t)req, 0);
@@ -1454,47 +1609,25 @@ spdk_nvmf_rdma_poll(struct spdk_nvmf_conn *conn)
 			break;
 
 		case IBV_WC_RECV:
-			if (wc[i].byte_len < sizeof(struct spdk_nvmf_capsule_cmd)) {
-				SPDK_ERRLOG("recv length %u less than capsule header\n", wc[i].byte_len);
-				error = true;
-				continue;
-			}
+			rdma_recv = get_rdma_recv_from_wc(rdma_conn, &wc[i]);
 
 			rdma_conn->cur_queue_depth++;
-			SPDK_TRACELOG(SPDK_TRACE_RDMA,
-				      "RDMA RECV Complete. Request: %p Connection: %p Outstanding I/O: %d\n",
-				      req, conn, rdma_conn->cur_queue_depth);
-			spdk_trace_record(TRACE_NVMF_IO_START, 0, 0, (uint64_t)req, 0);
-
-			memset(req->rsp, 0, sizeof(*req->rsp));
-			rc = spdk_nvmf_request_prep_data(req);
-			switch (rc) {
-			case SPDK_NVMF_REQUEST_PREP_READY:
-				SPDK_TRACELOG(SPDK_TRACE_RDMA, "Request %p is ready for execution\n", req);
-				/* Data is immediately available */
-				rc = spdk_nvmf_request_exec(req);
-				if (rc < 0) {
-					error = true;
-					continue;
-				}
-				count++;
-				break;
-			case SPDK_NVMF_REQUEST_PREP_PENDING_BUFFER:
-				SPDK_TRACELOG(SPDK_TRACE_RDMA, "Request %p needs data buffer\n", req);
-				TAILQ_INSERT_TAIL(&rdma_conn->pending_data_buf_queue, rdma_req, link);
-				break;
-			case SPDK_NVMF_REQUEST_PREP_PENDING_DATA:
-				SPDK_TRACELOG(SPDK_TRACE_RDMA, "Request %p needs data transfer\n", req);
-				rc = spdk_nvmf_rdma_request_transfer_data(req);
-				if (rc < 0) {
-					error = true;
-					continue;
-				}
-				break;
-			case SPDK_NVMF_REQUEST_PREP_ERROR:
-				spdk_nvmf_request_complete(req);
-				break;
-			}
+			if (rdma_conn->cur_queue_depth > rdma_conn->max_queue_depth) {
+				SPDK_TRACELOG(SPDK_TRACE_RDMA,
+					      "Temporarily exceeded maximum queue depth (%u). Queueing.\n",
+					      rdma_conn->cur_queue_depth);
+			}
+			SPDK_TRACELOG(SPDK_TRACE_RDMA,
+				      "RDMA RECV Complete. Recv: %p Connection: %p Outstanding I/O: %d\n",
+				      rdma_recv, conn, rdma_conn->cur_queue_depth);
+
+			TAILQ_INSERT_TAIL(&rdma_conn->incoming_queue, rdma_recv, link);
+			rc = process_incoming_queue(rdma_conn);
+			if (rc < 0) {
+				error = true;
+				continue;
+			}
+			count += rc;
 			break;
 
 		default: