Commit fa457d63 authored by Shuhei Matsumoto, committed by Tomasz Zawadzki

nvme_rdma: Add list to poll_group to do submits only for active qpairs



To improve scalability, the poll_group now processes submits only for
active qpairs.

Add num_outstanding_reqs to nvme_rdma_qpair and an active_qpairs list
to nvme_rdma_poll_group. When num_outstanding_reqs becomes non-zero,
add the qpair to the active_qpairs list. However, we have to ensure
resiliency: even if num_outstanding_reqs drops back to zero, the qpair
may still have queued I/Os. To avoid missing them, remove a qpair from
the active_qpairs list only when num_outstanding_reqs is zero and
there are no queued I/Os.

As in a previous patch, we use the prev pointer of the list entry to
check whether a qpair is on the active list.
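
The trick behind that membership check is worth spelling out: TAILQ_REMOVE()
from sys/queue.h leaves the entry's tqe_prev pointer stale rather than
NULLing it, so the code clears it manually and can then treat
tqe_prev == NULL as "not on the list". Below is a minimal, self-contained
sketch of the pattern under illustrative names (struct qpair,
submit_request(), process_submits(), and has_queued_reqs are stand-ins,
not SPDK code):

#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <sys/queue.h>

/* Illustrative stand-in for struct nvme_rdma_qpair. */
struct qpair {
	uint32_t		num_outstanding_reqs;
	bool			has_queued_reqs;	/* stands in for !STAILQ_EMPTY(&qpair->queued_req) */
	TAILQ_ENTRY(qpair)	link_active;
};

TAILQ_HEAD(active_list, qpair);

/* Submit path: a qpair whose tqe_prev is NULL is not on the active list,
 * so insert it on the first outstanding request. */
static void
submit_request(struct active_list *active, struct qpair *q)
{
	if (q->link_active.tqe_prev == NULL) {
		TAILQ_INSERT_TAIL(active, q, link_active);
	}
	q->num_outstanding_reqs++;
}

/* Poll path: leave the active list only when nothing is outstanding
 * and nothing is queued, so queued I/Os are never missed. */
static void
process_submits(struct active_list *active, struct qpair *q)
{
	assert(q->link_active.tqe_prev != NULL);

	if (q->num_outstanding_reqs == 0 && !q->has_queued_reqs) {
		TAILQ_REMOVE(active, q, link_active);
		/* TAILQ_REMOVE leaves tqe_prev stale; clear it manually so it
		 * keeps working as the "is this qpair on the list?" flag. */
		q->link_active.tqe_prev = NULL;
	}
}

int
main(void)
{
	struct active_list active = TAILQ_HEAD_INITIALIZER(active);
	struct qpair q = {0};			/* tqe_prev == NULL: not active */

	submit_request(&active, &q);		/* first request: joins the list */
	q.num_outstanding_reqs--;		/* the request completes */
	process_submits(&active, &q);		/* idle, nothing queued: leaves the list */
	printf("active: %s\n", q.link_active.tqe_prev ? "yes" : "no");
	return 0;
}

Reusing tqe_prev this way avoids adding a separate membership flag to the
qpair structure; the cost is remembering to clear the pointer at every
removal site, which is why both removal paths in the diff below do it.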

Signed-off-by: Shuhei Matsumoto <smatsumoto@nvidia.com>
Change-Id: Ic9cbf2718554623267d4bc7c5364514ce66d5cc0
Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/18850


Reviewed-by: Ben Walker <ben@nvidia.com>
Reviewed-by: Aleksey Marchuk <alexeymar@nvidia.com>
Tested-by: SPDK CI Jenkins <sys_sgci@intel.com>
parent b659997e
lib/nvme/nvme_rdma.c +41 −6
@@ -154,6 +154,7 @@ struct nvme_rdma_poll_group {
 	STAILQ_HEAD(, nvme_rdma_poller)			pollers;
 	uint32_t					num_pollers;
 	TAILQ_HEAD(, nvme_rdma_qpair)			connecting_qpairs;
+	TAILQ_HEAD(, nvme_rdma_qpair)			active_qpairs;
 };
 
 enum nvme_rdma_qpair_state {
@@ -210,6 +211,7 @@ struct nvme_rdma_qpair {
 	bool					delay_cmd_submit;
 
 	uint32_t				num_completions;
+	uint32_t				num_outstanding_reqs;
 
 	struct nvme_rdma_rsps			*rsps;

@@ -229,6 +231,8 @@ struct nvme_rdma_qpair {
 	/* Count of outstanding send objects */
 	uint16_t				current_num_sends;
 
+	TAILQ_ENTRY(nvme_rdma_qpair)		link_active;
+
 	/* Placed at the end of the struct since it is not used frequently */
 	struct rdma_cm_event			*evt;
 	struct nvme_rdma_poller			*poller;
@@ -454,6 +458,9 @@ nvme_rdma_req_complete(struct spdk_nvme_rdma_req *rdma_req,
 		spdk_nvme_qpair_print_completion(qpair, rsp);
 	}
 
+	assert(rqpair->num_outstanding_reqs > 0);
+	rqpair->num_outstanding_reqs--;
+
 	TAILQ_REMOVE(&rqpair->outstanding_reqs, rdma_req, link);
 
 	nvme_complete_request(req->cb_fn, req->cb_arg, qpair, req, rsp);
@@ -2296,6 +2303,7 @@ nvme_rdma_qpair_submit_request(struct spdk_nvme_qpair *qpair,
 	struct nvme_rdma_qpair *rqpair;
 	struct spdk_nvme_rdma_req *rdma_req;
 	struct ibv_send_wr *wr;
+	struct nvme_rdma_poll_group *group;
 
 	rqpair = nvme_rdma_qpair(qpair);
 	assert(rqpair != NULL);
@@ -2318,6 +2326,12 @@ nvme_rdma_qpair_submit_request(struct spdk_nvme_qpair *qpair,

 	TAILQ_INSERT_TAIL(&rqpair->outstanding_reqs, rdma_req, link);
 
+	if (!rqpair->link_active.tqe_prev && qpair->poll_group) {
+		group = nvme_rdma_poll_group(qpair->poll_group);
+		TAILQ_INSERT_TAIL(&group->active_qpairs, rqpair, link_active);
+	}
+	rqpair->num_outstanding_reqs++;
+
 	assert(rqpair->current_num_sends < rqpair->num_entries);
 	rqpair->current_num_sends++;

@@ -3047,6 +3061,7 @@ nvme_rdma_poll_group_create(void)

 	STAILQ_INIT(&group->pollers);
 	TAILQ_INIT(&group->connecting_qpairs);
+	TAILQ_INIT(&group->active_qpairs);
 	return &group->group;
 }

@@ -3060,7 +3075,7 @@ static int
 nvme_rdma_poll_group_disconnect_qpair(struct spdk_nvme_qpair *qpair)
 {
 	struct nvme_rdma_qpair *rqpair = nvme_rdma_qpair(qpair);
-	struct nvme_rdma_poll_group *group = nvme_rdma_poll_group(qpair->poll_group);;
+	struct nvme_rdma_poll_group *group = nvme_rdma_poll_group(qpair->poll_group);
 
 	if (rqpair->link_connecting.tqe_prev) {
 		TAILQ_REMOVE(&group->connecting_qpairs, rqpair, link_connecting);
@@ -3084,15 +3099,27 @@ static int
 nvme_rdma_poll_group_remove(struct spdk_nvme_transport_poll_group *tgroup,
 			    struct spdk_nvme_qpair *qpair)
 {
+	struct nvme_rdma_qpair *rqpair = nvme_rdma_qpair(qpair);
+	struct nvme_rdma_poll_group *group = nvme_rdma_poll_group(qpair->poll_group);
+
+	if (rqpair->link_active.tqe_prev) {
+		TAILQ_REMOVE(&group->active_qpairs, rqpair, link_active);
+		rqpair->link_active.tqe_prev = NULL;
+	}
+
 	return 0;
 }
 
 static inline void
-nvme_rdma_qpair_process_submits(struct spdk_nvme_qpair *qpair)
+nvme_rdma_qpair_process_submits(struct nvme_rdma_poll_group *group,
+				struct nvme_rdma_qpair *rqpair)
 {
-	struct nvme_rdma_qpair *rqpair = nvme_rdma_qpair(qpair);
+	struct spdk_nvme_qpair	*qpair = &rqpair->qpair;
 
+	assert(rqpair->link_active.tqe_prev != NULL);
+
-	if (spdk_unlikely(rqpair->state <= NVME_RDMA_QPAIR_STATE_INITIALIZING)) {
+	if (spdk_unlikely(rqpair->state <= NVME_RDMA_QPAIR_STATE_INITIALIZING ||
+			  rqpair->state >= NVME_RDMA_QPAIR_STATE_EXITING)) {
 		return;
 	}
@@ -3108,6 +3135,14 @@ nvme_rdma_qpair_process_submits(struct spdk_nvme_qpair *qpair)
 		nvme_qpair_resubmit_requests(qpair, rqpair->num_completions);
 		rqpair->num_completions = 0;
 	}
+
+	if (rqpair->num_outstanding_reqs == 0 && STAILQ_EMPTY(&qpair->queued_req)) {
+		TAILQ_REMOVE(&group->active_qpairs, rqpair, link_active);
+		/* We use prev pointer to check if qpair is in active list or not.
+		 * TAILQ_REMOVE doesn't do it. So, we do it manually.
+		 */
+		rqpair->link_active.tqe_prev = NULL;
+	}
 }
 
 static int64_t
@@ -3202,8 +3237,8 @@ nvme_rdma_poll_group_process_completions(struct spdk_nvme_transport_poll_group *
 		}
 	}
 
-	STAILQ_FOREACH_SAFE(qpair, &tgroup->connected_qpairs, poll_group_stailq, tmp_qpair) {
-		nvme_rdma_qpair_process_submits(qpair);
+	TAILQ_FOREACH_SAFE(rqpair, &group->active_qpairs, link_active, tmp_rqpair) {
+		nvme_rdma_qpair_process_submits(group, rqpair);
 	}
 
 	return rc2 != 0 ? rc2 : total_completions;