Commit 6f95c325 authored by Ziye Yang's avatar Ziye Yang Committed by Jim Harris
Browse files

lib/nvmf: Create a shared buffer waiting list



The previous implementation allocated data buffers unfairly, because
each queue pair kept its own waiting list. This patch fixes that by
moving the waiting list into a shared management channel.

With this patch, a limited number of buffers can support
a high I/O depth.

Change-Id: I0e7a073c0b4539090218aa461d50620287bb4b63
Signed-off-by: Ziye Yang <optimistyzy@gmail.com>
Reviewed-on: https://review.gerrithub.io/382528


Tested-by: SPDK Automated Test System <sys_sgsw@intel.com>
Reviewed-by: Daniel Verkamp <daniel.verkamp@intel.com>
Reviewed-by: Jim Harris <james.r.harris@intel.com>
parent ab8a0460
Loading
Loading
Loading
Loading
+47 −9
Original line number Diff line number Diff line
@@ -166,9 +166,6 @@ struct spdk_nvmf_rdma_qpair {
	/* Requests that are not in use */
	TAILQ_HEAD(, spdk_nvmf_rdma_request)	free_queue;

	/* Requests that are waiting to obtain a data buffer */
	TAILQ_HEAD(, spdk_nvmf_rdma_request)	pending_data_buf_queue;

	/* Requests that are waiting to perform an RDMA READ or WRITE */
	TAILQ_HEAD(, spdk_nvmf_rdma_request)	pending_rdma_rw_queue;

@@ -198,6 +195,10 @@ struct spdk_nvmf_rdma_qpair {

	TAILQ_ENTRY(spdk_nvmf_rdma_qpair)	link;
	TAILQ_ENTRY(spdk_nvmf_rdma_qpair)	pending_link;

	/* Mgmt channel */
	struct spdk_io_channel			*mgmt_channel;
	struct spdk_nvmf_rdma_mgmt_channel	*ch;
};

/* List of RDMA connections that have not yet received a CONNECT capsule */
@@ -254,6 +255,30 @@ struct spdk_nvmf_rdma_transport {
	TAILQ_HEAD(, spdk_nvmf_rdma_port)	ports;
};

/*
 * Context buffer of the management I/O channel registered for the RDMA
 * transport. A qpair reaches it through its `ch` pointer; qpairs that
 * obtain the same channel therefore queue on the same waiting list.
 */
struct spdk_nvmf_rdma_mgmt_channel {
	/* Requests that are waiting to obtain a data buffer.
	 * A request is appended when it enters RDMA_REQUEST_STATE_NEED_BUFFER,
	 * and only the request at the head of the list tries to get a buffer. */
	TAILQ_HEAD(, spdk_nvmf_rdma_request)	pending_data_buf_queue;
};

/*
 * Channel-create callback for the RDMA management channel.
 *
 * io_device is not used. ctx_buf is the channel's context buffer, which is
 * treated as a struct spdk_nvmf_rdma_mgmt_channel; its list of requests
 * waiting for a data buffer is initialised to empty.
 *
 * Always returns 0.
 */
static int
spdk_nvmf_rdma_mgmt_channel_create(void *io_device, void *ctx_buf)
{
	struct spdk_nvmf_rdma_mgmt_channel *mgmt_ch;

	mgmt_ch = ctx_buf;

	/* Start with nobody waiting for a buffer. */
	TAILQ_INIT(&mgmt_ch->pending_data_buf_queue);

	return 0;
}

/*
 * Channel-destroy callback for the RDMA management channel.
 *
 * io_device is not used. ctx_buf is the channel's context buffer, which is
 * treated as a struct spdk_nvmf_rdma_mgmt_channel. Nothing is released
 * here; the function only reports an error if requests are still queued
 * for a data buffer when the channel goes away.
 */
static void
spdk_nvmf_rdma_mgmt_channel_destroy(void *io_device, void *ctx_buf)
{
	struct spdk_nvmf_rdma_mgmt_channel *mgmt_ch = ctx_buf;

	/* Expected case: every waiting request has already been drained. */
	if (TAILQ_EMPTY(&mgmt_ch->pending_data_buf_queue)) {
		return;
	}

	SPDK_ERRLOG("Pending I/O list wasn't empty on channel destruction\n");
}

static void
spdk_nvmf_rdma_qpair_destroy(struct spdk_nvmf_rdma_qpair *rdma_qpair)
{
@@ -278,6 +303,7 @@ spdk_nvmf_rdma_qpair_destroy(struct spdk_nvmf_rdma_qpair *rdma_qpair)
		ibv_destroy_cq(rdma_qpair->cq);
	}

	spdk_put_io_channel(rdma_qpair->mgmt_channel);
	/* Free all memory */
	spdk_dma_free(rdma_qpair->cmds);
	spdk_dma_free(rdma_qpair->cpls);
@@ -314,7 +340,6 @@ spdk_nvmf_rdma_qpair_create(struct spdk_nvmf_transport *transport,
	rdma_qpair->max_rw_depth = max_rw_depth;
	TAILQ_INIT(&rdma_qpair->incoming_queue);
	TAILQ_INIT(&rdma_qpair->free_queue);
	TAILQ_INIT(&rdma_qpair->pending_data_buf_queue);
	TAILQ_INIT(&rdma_qpair->pending_rdma_rw_queue);

	rdma_qpair->cq = ibv_create_cq(id->verbs, max_queue_depth * 3, rdma_qpair, NULL, 0);
@@ -658,6 +683,14 @@ nvmf_rdma_connect(struct spdk_nvmf_transport *transport, struct rdma_cm_event *e
	 * is received. */
	TAILQ_INSERT_TAIL(&g_pending_conns, rdma_qpair, pending_link);

	rdma_qpair->mgmt_channel = spdk_get_io_channel(rtransport);
	if (!rdma_qpair->mgmt_channel) {
		goto err2;
	}

	rdma_qpair->ch = spdk_io_channel_get_ctx(rdma_qpair->mgmt_channel);
	assert(rdma_qpair->ch != NULL);

	return 0;

err2:
@@ -967,12 +1000,12 @@ spdk_nvmf_rdma_request_process(struct spdk_nvmf_rdma_transport *rtransport,
			}

			rdma_req->state = RDMA_REQUEST_STATE_NEED_BUFFER;
			TAILQ_INSERT_TAIL(&rqpair->pending_data_buf_queue, rdma_req, link);
			TAILQ_INSERT_TAIL(&rqpair->ch->pending_data_buf_queue, rdma_req, link);
			break;
		case RDMA_REQUEST_STATE_NEED_BUFFER:
			assert(rdma_req->req.xfer != SPDK_NVME_DATA_NONE);

			if (rdma_req != TAILQ_FIRST(&rqpair->pending_data_buf_queue)) {
			if (rdma_req != TAILQ_FIRST(&rqpair->ch->pending_data_buf_queue)) {
				/* This request needs to wait in line to obtain a buffer */
				break;
			}
@@ -980,7 +1013,7 @@ spdk_nvmf_rdma_request_process(struct spdk_nvmf_rdma_transport *rtransport,
			/* Try to get a data buffer */
			rc = spdk_nvmf_rdma_request_parse_sgl(rtransport, device, rdma_req);
			if (rc < 0) {
				TAILQ_REMOVE(&rqpair->pending_data_buf_queue, rdma_req, link);
				TAILQ_REMOVE(&rqpair->ch->pending_data_buf_queue, rdma_req, link);
				rsp->status.sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR;
				rdma_req->state = RDMA_REQUEST_STATE_READY_TO_COMPLETE;
				break;
@@ -991,7 +1024,7 @@ spdk_nvmf_rdma_request_process(struct spdk_nvmf_rdma_transport *rtransport,
				break;
			}

			TAILQ_REMOVE(&rqpair->pending_data_buf_queue, rdma_req, link);
			TAILQ_REMOVE(&rqpair->ch->pending_data_buf_queue, rdma_req, link);

			/* If data is transferring from host to controller and the data didn't
			 * arrive using in capsule data, we need to do a transfer from the host.
@@ -1141,6 +1174,10 @@ spdk_nvmf_rdma_create(struct spdk_nvmf_tgt *tgt)
		return NULL;
	}

	spdk_io_device_register(rtransport, spdk_nvmf_rdma_mgmt_channel_create,
				spdk_nvmf_rdma_mgmt_channel_destroy,
				sizeof(struct spdk_nvmf_rdma_mgmt_channel));

	contexts = rdma_get_devices(NULL);
	i = 0;
	rc = 0;
@@ -1212,6 +1249,7 @@ spdk_nvmf_rdma_destroy(struct spdk_nvmf_transport *transport)
	}

	spdk_mempool_free(rtransport->data_buf_pool);
	spdk_io_device_unregister(rtransport, NULL);
	free(rtransport);

	return 0;
@@ -1627,7 +1665,7 @@ spdk_nvmf_rdma_qpair_process_pending(struct spdk_nvmf_rdma_transport *rtransport
	}

	/* The second highest priority is I/O waiting on memory buffers. */
	TAILQ_FOREACH_SAFE(rdma_req, &rqpair->pending_data_buf_queue, link, req_tmp) {
	TAILQ_FOREACH_SAFE(rdma_req, &rqpair->ch->pending_data_buf_queue, link, req_tmp) {
		if (spdk_nvmf_rdma_request_process(rtransport, rdma_req) == false) {
			break;
		}