Commit 67b0dcfe authored by Seth Howell's avatar Seth Howell Committed by Tomasz Zawadzki
Browse files

nvme_rdma: add tracking for rdma objects in qpair.



Signed-off-by: default avatarSeth Howell <seth.howell@intel.com>
Change-Id: I0b45aed21dc649888bb9d93c5937fb553f35eb27
Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/2568


Tested-by: default avatarSPDK CI Jenkins <sys_sgci@intel.com>
Community-CI: Mellanox Build Bot
Reviewed-by: default avatarShuhei Matsumoto <shuhei.matsumoto.xt@hitachi.com>
Reviewed-by: default avatarAleksey Marchuk <alexeymar@mellanox.com>
parent 8bef6f0b
Loading
Loading
Loading
Loading
+90 −17
Original line number Diff line number Diff line
@@ -93,7 +93,7 @@
 * Number of poller cycles to keep a pointer to destroyed qpairs
 * in the poll group.
 */
#define NVME_RDMA_DESTROYED_QPAIR_EXPIRATION_CYCLES	10
#define NVME_RDMA_DESTROYED_QPAIR_EXPIRATION_CYCLES	50

enum nvme_rdma_wr_type {
	RDMA_WR_TYPE_RECV,
@@ -226,6 +226,10 @@ struct nvme_rdma_qpair {
	TAILQ_HEAD(, spdk_nvme_rdma_req)	free_reqs;
	TAILQ_HEAD(, spdk_nvme_rdma_req)	outstanding_reqs;

	/* Counts of outstanding send and recv objects */
	uint16_t				current_num_recvs;
	uint16_t				current_num_sends;

	/* Placed at the end of the struct since it is not used frequently */
	struct rdma_cm_event			*evt;

@@ -291,6 +295,8 @@ static const char *rdma_cm_event_str[] = {

static LIST_HEAD(, spdk_nvme_rdma_mr_map) g_rdma_mr_maps = LIST_HEAD_INITIALIZER(&g_rdma_mr_maps);
static pthread_mutex_t g_rdma_mr_maps_mutex = PTHREAD_MUTEX_INITIALIZER;
struct nvme_rdma_qpair *nvme_rdma_poll_group_get_qpair_by_id(struct nvme_rdma_poll_group *group,
		uint32_t qp_num);

static inline void *
nvme_rdma_calloc(size_t nmemb, size_t size)
@@ -620,6 +626,8 @@ nvme_rdma_qpair_init(struct nvme_rdma_qpair *rqpair)
	/* ibv_create_qp will change the values in attr.cap. Make sure we store the proper value. */
	rqpair->max_send_sge = spdk_min(NVME_RDMA_DEFAULT_TX_SGE, attr.cap.max_send_sge);
	rqpair->max_recv_sge = spdk_min(NVME_RDMA_DEFAULT_RX_SGE, attr.cap.max_recv_sge);
	rqpair->current_num_recvs = 0;
	rqpair->current_num_sends = 0;

	rctrlr->pd = rqpair->rdma_qp->qp->pd;

@@ -639,6 +647,11 @@ nvme_rdma_qpair_submit_sends(struct nvme_rdma_qpair *rqpair)
	if (spdk_unlikely(rc)) {
		SPDK_ERRLOG("Failed to post WRs on send queue, errno %d (%s), bad_wr %p\n",
			    rc, spdk_strerror(rc), bad_send_wr);
		while (bad_send_wr != NULL) {
			assert(rqpair->current_num_sends > 0);
			rqpair->current_num_sends--;
			bad_send_wr = bad_send_wr->next;
		}
		return rc;
	}

@@ -656,6 +669,11 @@ nvme_rdma_qpair_submit_recvs(struct nvme_rdma_qpair *rqpair)
		if (spdk_unlikely(rc)) {
			SPDK_ERRLOG("Failed to post WRs on receive queue, errno %d (%s), bad_wr %p\n",
				    rc, spdk_strerror(rc), bad_recv_wr);
			while (bad_recv_wr != NULL) {
				assert(rqpair->current_num_sends > 0);
				rqpair->current_num_recvs--;
				bad_recv_wr = bad_recv_wr->next;
			}
			return rc;
		}

@@ -671,6 +689,9 @@ nvme_rdma_qpair_queue_send_wr(struct nvme_rdma_qpair *rqpair, struct ibv_send_wr
{
	assert(wr->next == NULL);

	assert(rqpair->current_num_sends < rqpair->num_entries);

	rqpair->current_num_sends++;
	spdk_rdma_qp_queue_send_wrs(rqpair->rdma_qp, wr);

	if (!rqpair->delay_cmd_submit) {
@@ -685,8 +706,11 @@ nvme_rdma_qpair_queue_send_wr(struct nvme_rdma_qpair *rqpair, struct ibv_send_wr
static inline int
nvme_rdma_qpair_queue_recv_wr(struct nvme_rdma_qpair *rqpair, struct ibv_recv_wr *wr)
{

	assert(wr->next == NULL);
	assert(rqpair->current_num_recvs < rqpair->num_entries);

	rqpair->current_num_recvs++;
	if (rqpair->recvs_to_post.first == NULL) {
		rqpair->recvs_to_post.first = wr;
	} else {
@@ -2141,10 +2165,7 @@ nvme_rdma_conditional_fail_qpair(struct nvme_rdma_qpair *rqpair, struct nvme_rdm
{
	struct nvme_rdma_destroyed_qpair	*qpair_tracker;

	if (!rqpair) {
		return;
	}

	assert(rqpair);
	if (group) {
		STAILQ_FOREACH(qpair_tracker, &group->destroyed_qpairs, link) {
			if (qpair_tracker->destroyed_qpair_tracker == rqpair) {
@@ -2157,7 +2178,8 @@ nvme_rdma_conditional_fail_qpair(struct nvme_rdma_qpair *rqpair, struct nvme_rdm

static int
nvme_rdma_cq_process_completions(struct ibv_cq *cq, uint32_t batch_size,
				 struct nvme_rdma_poll_group *group)
				 struct nvme_rdma_poll_group *group,
				 struct nvme_rdma_qpair *rdma_qpair)
{
	struct ibv_wc			wc[MAX_COMPLETIONS_PER_POLL];
	struct nvme_rdma_qpair		*rqpair;
@@ -2183,6 +2205,8 @@ nvme_rdma_cq_process_completions(struct ibv_cq *cq, uint32_t batch_size,
		case RDMA_WR_TYPE_RECV:
			rdma_rsp = SPDK_CONTAINEROF(rdma_wr, struct spdk_nvme_rdma_rsp, rdma_wr);
			rqpair = rdma_rsp->rqpair;
			assert(rqpair->current_num_recvs > 0);
			rqpair->current_num_recvs--;

			if (wc[i].status) {
				SPDK_ERRLOG("CQ error on Queue Pair %p, Response Index %lu (%d): %s\n",
@@ -2222,8 +2246,14 @@ nvme_rdma_cq_process_completions(struct ibv_cq *cq, uint32_t batch_size,
			/* If we are flushing I/O */
			if (wc[i].status) {
				rqpair = rdma_req->req ? nvme_rdma_qpair(rdma_req->req->qpair) : NULL;
				if (!rqpair) {
					rqpair = rdma_qpair != NULL ? rdma_qpair : nvme_rdma_poll_group_get_qpair_by_id(group,
							wc[i].qp_num);
				}
				assert(rqpair);
				assert(rqpair->current_num_sends > 0);
				rqpair->current_num_sends--;
				nvme_rdma_conditional_fail_qpair(rqpair, group);

				SPDK_ERRLOG("CQ error on Queue Pair %p, Response Index %lu (%d): %s\n",
					    rqpair, wc[i].wr_id, wc[i].status, ibv_wc_status_str(wc[i].status));
				completion_rc = -ENXIO;
@@ -2232,6 +2262,7 @@ nvme_rdma_cq_process_completions(struct ibv_cq *cq, uint32_t batch_size,

			rqpair = nvme_rdma_qpair(rdma_req->req->qpair);
			rdma_req->completion_flags |= NVME_RDMA_SEND_COMPLETED;
			rqpair->current_num_sends--;

			if ((rdma_req->completion_flags & NVME_RDMA_RECV_COMPLETED) != 0) {
				if (spdk_unlikely(nvme_rdma_request_ready(rqpair, rdma_req))) {
@@ -2305,7 +2336,7 @@ nvme_rdma_qpair_process_completions(struct spdk_nvme_qpair *qpair,
	rqpair->num_completions = 0;
	do {
		batch_size = spdk_min((max_completions - rqpair->num_completions), MAX_COMPLETIONS_PER_POLL);
		rc = nvme_rdma_cq_process_completions(cq, batch_size, NULL);
		rc = nvme_rdma_cq_process_completions(cq, batch_size, NULL, rqpair);

		if (rc == 0) {
			break;
@@ -2450,6 +2481,37 @@ nvme_rdma_poll_group_create(void)
	return &group->group;
}

struct nvme_rdma_qpair *
nvme_rdma_poll_group_get_qpair_by_id(struct nvme_rdma_poll_group *group, uint32_t qp_num)
{
	struct spdk_nvme_qpair *qpair;
	struct nvme_rdma_destroyed_qpair *rqpair_tracker;
	struct nvme_rdma_qpair *rqpair;

	STAILQ_FOREACH(qpair, &group->group.disconnected_qpairs, poll_group_stailq) {
		rqpair = nvme_rdma_qpair(qpair);
		if (rqpair->rdma_qp->qp->qp_num == qp_num) {
			return rqpair;
		}
	}

	STAILQ_FOREACH(qpair, &group->group.connected_qpairs, poll_group_stailq) {
		rqpair = nvme_rdma_qpair(qpair);
		if (rqpair->rdma_qp->qp->qp_num == qp_num) {
			return rqpair;
		}
	}

	STAILQ_FOREACH(rqpair_tracker, &group->destroyed_qpairs, link) {
		rqpair = rqpair_tracker->destroyed_qpair_tracker;
		if (rqpair->rdma_qp->qp->qp_num == qp_num) {
			return rqpair;
		}
	}

	return NULL;
}

static int
nvme_rdma_poll_group_connect_qpair(struct spdk_nvme_qpair *qpair)
{
@@ -2536,6 +2598,20 @@ nvme_rdma_poll_group_remove(struct spdk_nvme_transport_poll_group *tgroup,
	return 0;
}

static void
nvme_rdma_poll_group_delete_qpair(struct nvme_rdma_poll_group *group,
				  struct nvme_rdma_destroyed_qpair *qpair_tracker)
{
	struct nvme_rdma_qpair *rqpair = qpair_tracker->destroyed_qpair_tracker;

	rqpair->defer_deletion_to_pg = false;
	if (nvme_qpair_get_state(&rqpair->qpair) == NVME_QPAIR_DESTROYING) {
		nvme_rdma_ctrlr_delete_io_qpair(rqpair->qpair.ctrlr, &rqpair->qpair);
	}
	STAILQ_REMOVE(&group->destroyed_qpairs, qpair_tracker, nvme_rdma_destroyed_qpair, link);
	free(qpair_tracker);
}

static int64_t
nvme_rdma_poll_group_process_completions(struct spdk_nvme_transport_poll_group *tgroup,
		uint32_t completions_per_qpair, spdk_nvme_disconnected_qpair_cb disconnected_qpair_cb)
@@ -2581,7 +2657,7 @@ nvme_rdma_poll_group_process_completions(struct spdk_nvme_transport_poll_group *
		poller_completions = 0;
		do {
			batch_size = spdk_min((completions_per_poller - poller_completions), MAX_COMPLETIONS_PER_POLL);
			rc = nvme_rdma_cq_process_completions(poller->cq, batch_size, group);
			rc = nvme_rdma_cq_process_completions(poller->cq, batch_size, group, NULL);
			if (rc <= 0) {
				if (rc == -ECANCELED) {
					return -EIO;
@@ -2608,18 +2684,15 @@ nvme_rdma_poll_group_process_completions(struct spdk_nvme_transport_poll_group *
	/*
	 * Once a qpair is disconnected, we can still get flushed completions for those disconnected qpairs.
	 * For most pieces of hardware, those requests will complete immediately. However, there are certain
	 * cases where flushed requests will linger.
	 * cases where flushed requests will linger. Default is to destroy qpair after all completions are freed,
	 * but have a fallback for other cases where we don't get all of our completions back.
	 */
	STAILQ_FOREACH_SAFE(qpair_tracker, &group->destroyed_qpairs, link, tmp_qpair_tracker) {
		qpair_tracker->completed_cycles++;
		rqpair = qpair_tracker->destroyed_qpair_tracker;
		if (qpair_tracker->completed_cycles > NVME_RDMA_DESTROYED_QPAIR_EXPIRATION_CYCLES) {
			rqpair->defer_deletion_to_pg = false;
			if (nvme_qpair_get_state(&rqpair->qpair) == NVME_QPAIR_DESTROYING) {
				nvme_rdma_ctrlr_delete_io_qpair(rqpair->qpair.ctrlr, &rqpair->qpair);
			}
			STAILQ_REMOVE(&group->destroyed_qpairs, qpair_tracker, nvme_rdma_destroyed_qpair, link);
			free(qpair_tracker);
		if ((rqpair->current_num_sends == 0 && rqpair->current_num_sends == 0) ||
		    qpair_tracker->completed_cycles > NVME_RDMA_DESTROYED_QPAIR_EXPIRATION_CYCLES) {
			nvme_rdma_poll_group_delete_qpair(group, qpair_tracker);
		}
	}