Commit 1c05f3fb authored by Changpeng Liu's avatar Changpeng Liu Committed by Tomasz Zawadzki
Browse files

vhost-blk: enable multiple threads poll group support

The patch set is largely based on Jaylyn Ren's https://review.spdk.io/gerrit/c/spdk/spdk/+/16067
and https://review.spdk.io/gerrit/c/spdk/spdk/+/16068

 patches to
enable multiple threads support for vhost-blk, here I enable multiple
theads based on vhost session, this will make the code more readable
and easy to maintain.

For vhost-blk device using single core, it's same as before, if the
device is created with multiple core masks, we will start one thread
per core for the device, this is helpful when no competition on these
cores, >= 2 IO queues per thread is recommended in practice.

SPDK already has the CI test(lvol) case to create vhost-blk device
with the same core mask as vhost application, then for this case,
multiple threads are created for this vhost-blk device.

Signed-off-by: default avatarJaylyn Ren <Jaylyn.Ren@arm.com>
Signed-off-by: default avatarChangpeng Liu <changpeng.liu@intel.com>
Change-Id: I7cff622722824dfad780359a7af259d359a20f4b
Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/22616


Community-CI: Mellanox Build Bot
Reviewed-by: default avatarTomasz Zawadzki <tomasz.zawadzki@intel.com>
Tested-by: default avatarSPDK CI Jenkins <sys_sgci@intel.com>
Reviewed-by: default avatarJim Harris <jim.harris@samsung.com>
parent cc7971b6
Loading
Loading
Loading
Loading
+374 −93
Original line number Diff line number Diff line
@@ -58,15 +58,45 @@ struct spdk_vhost_blk_dev {
	const struct spdk_virtio_blk_transport_ops *ops;

	bool readonly;
	/* Next poll group index to be assigned */
	uint32_t next_pg_index;
};

struct vhost_user_pg_vq_info {
	struct vhost_user_poll_group *pg;
	struct spdk_vhost_virtqueue *vq;
	struct spdk_vhost_session *vsession;

	TAILQ_ENTRY(vhost_user_pg_vq_info) link;
};

struct vhost_user_poll_group {
	struct spdk_vhost_dev *vdev;
	struct spdk_vhost_session *vsession;

	struct spdk_thread *thread;
	struct spdk_poller *requestq_poller;
	struct spdk_io_channel *io_channel;

	int task_cnt;

	TAILQ_HEAD(, vhost_user_pg_vq_info) vqs;

	struct spdk_poller *stop_poller;
	uint32_t stop_retry_count;
};

struct spdk_vhost_blk_session {
	/* The parent session must be the very first field in this struct */
	struct spdk_vhost_session vsession;
	struct spdk_vhost_blk_dev *bvdev;
	struct spdk_poller *requestq_poller;
	struct spdk_io_channel *io_channel;
	struct spdk_poller *stop_poller;

	struct spdk_thread *thread;
	struct vhost_user_poll_group *poll_groups;
	uint32_t num_poll_groups;

	uint32_t num_stopped_poll_groups;
};

/* forward declaration */
@@ -75,13 +105,16 @@ static const struct spdk_vhost_dev_backend vhost_blk_device_backend;
static void vhost_user_blk_request_finish(uint8_t status, struct spdk_vhost_blk_task *task,
		void *cb_arg);

static void session_stop_poll_groups(struct spdk_vhost_blk_session *bvsession);

static int
vhost_user_process_blk_request(struct spdk_vhost_user_blk_task *user_task)
{
	struct spdk_vhost_blk_session *bvsession = user_task->bvsession;
	struct spdk_vhost_dev *vdev = &bvsession->bvdev->vdev;
	struct vhost_user_poll_group *pg = (struct vhost_user_poll_group *)user_task->vq->poll_group;

	return virtio_blk_process_request(vdev, bvsession->io_channel, &user_task->blk_task,
	return virtio_blk_process_request(vdev, pg->io_channel, &user_task->blk_task,
					  vhost_user_blk_request_finish, NULL);
}

@@ -120,14 +153,20 @@ to_blk_session(struct spdk_vhost_session *vsession)
static inline void
blk_task_inc_task_cnt(struct spdk_vhost_user_blk_task *task)
{
	task->bvsession->vsession.task_cnt++;
	struct spdk_vhost_virtqueue *vq = task->vq;
	struct vhost_user_poll_group *pg = (struct vhost_user_poll_group *)vq->poll_group;

	pg->task_cnt++;
}

static inline void
blk_task_dec_task_cnt(struct spdk_vhost_user_blk_task *task)
{
	assert(task->bvsession->vsession.task_cnt > 0);
	task->bvsession->vsession.task_cnt--;
	struct spdk_vhost_virtqueue *vq = task->vq;
	struct vhost_user_poll_group *pg = (struct vhost_user_poll_group *)vq->poll_group;

	assert(pg->task_cnt > 0);
	pg->task_cnt--;
}

static void
@@ -940,13 +979,15 @@ vdev_vq_worker(void *arg)
static int
vdev_worker(void *arg)
{
	struct spdk_vhost_blk_session *bvsession = arg;
	struct spdk_vhost_session *vsession = &bvsession->vsession;
	uint16_t q_idx;
	struct vhost_user_poll_group *pg = arg;
	struct vhost_user_pg_vq_info *vq_info;
	struct spdk_vhost_virtqueue *vq;
	int rc = 0;

	for (q_idx = 0; q_idx < vsession->max_queues; q_idx++) {
		rc += _vdev_vq_worker(&vsession->virtqueue[q_idx]);
	TAILQ_FOREACH(vq_info, &pg->vqs, link) {
		vq = vq_info->vq;
		assert(vq->poll_group == pg);
		rc = _vdev_vq_worker(vq);
	}

	return rc > 0 ? SPDK_POLLER_BUSY : SPDK_POLLER_IDLE;
@@ -1021,6 +1062,8 @@ _no_bdev_vdev_vq_worker(struct spdk_vhost_virtqueue *vq)
{
	struct spdk_vhost_session *vsession = vq->vsession;
	struct spdk_vhost_blk_session *bvsession = to_blk_session(vsession);
	struct vhost_user_poll_group *pg = (struct vhost_user_poll_group *)vq->poll_group;

	bool packed_ring;

	packed_ring = vq->packed.packed_ring;
@@ -1032,9 +1075,9 @@ _no_bdev_vdev_vq_worker(struct spdk_vhost_virtqueue *vq)

	vhost_session_vq_used_signal(vq);

	if (vsession->task_cnt == 0 && bvsession->io_channel) {
		vhost_blk_put_io_channel(bvsession->io_channel);
		bvsession->io_channel = NULL;
	if (pg->task_cnt == 0 && pg->io_channel) {
		vhost_blk_put_io_channel(pg->io_channel);
		pg->io_channel = NULL;
	}

	return SPDK_POLLER_BUSY;
@@ -1051,41 +1094,38 @@ no_bdev_vdev_vq_worker(void *arg)
static int
no_bdev_vdev_worker(void *arg)
{
	struct spdk_vhost_blk_session *bvsession = arg;
	struct spdk_vhost_session *vsession = &bvsession->vsession;
	uint16_t q_idx;
	struct vhost_user_poll_group *pg = arg;
	struct vhost_user_pg_vq_info *vq_info;
	int rc = 0;

	for (q_idx = 0; q_idx < vsession->max_queues; q_idx++) {
		_no_bdev_vdev_vq_worker(&vsession->virtqueue[q_idx]);
	TAILQ_FOREACH(vq_info, &pg->vqs, link) {
		rc = _no_bdev_vdev_vq_worker(vq_info->vq);
	}

	return SPDK_POLLER_BUSY;
	return rc > 0 ? SPDK_POLLER_BUSY : SPDK_POLLER_IDLE;
}

static void
vhost_blk_session_unregister_interrupts(struct spdk_vhost_blk_session *bvsession)
vhost_blk_pg_unregister_interrupts(struct vhost_user_poll_group *pg)
{
	struct spdk_vhost_session *vsession = &bvsession->vsession;
	struct vhost_user_pg_vq_info *vq_info;
	struct spdk_vhost_virtqueue *vq;
	int i;

	SPDK_DEBUGLOG(vhost_blk, "unregister virtqueues interrupt\n");
	for (i = 0; i < vsession->max_queues; i++) {
		vq = &vsession->virtqueue[i];
	TAILQ_FOREACH(vq_info, &pg->vqs, link) {
		vq = vq_info->vq;
		if (vq->intr == NULL) {
			break;
		}

		SPDK_DEBUGLOG(vhost_blk, "unregister vq[%d]'s kickfd is %d\n",
			      i, vq->vring.kickfd);
			      vq->vring_idx, vq->vring.kickfd);
		spdk_interrupt_unregister(&vq->intr);
	}
}

static void
_vhost_blk_vq_register_interrupt(void *arg)
vhost_blk_vq_register_interrupt(struct spdk_vhost_virtqueue *vq)
{
	struct spdk_vhost_virtqueue *vq = arg;
	struct spdk_vhost_session *vsession = vq->vsession;
	struct spdk_vhost_blk_dev *bvdev =  to_blk_dev(vsession->vdev);

@@ -1104,28 +1144,91 @@ _vhost_blk_vq_register_interrupt(void *arg)
	}
}

static void
add_vq_to_poll_group(void *arg)
{
	struct vhost_user_pg_vq_info *vq_info = arg;
	struct vhost_user_poll_group *pg = vq_info->pg;

	SPDK_DEBUGLOG(vhost_blk, "%s: vring %u is added to pg %p, thread %s, lcore %u\n",
		      pg->vsession->name,
		      vq_info->vq->vring_idx, pg, spdk_thread_get_name(spdk_get_thread()), spdk_env_get_current_core());

	TAILQ_INSERT_TAIL(&pg->vqs, vq_info, link);

	if (spdk_interrupt_mode_is_enabled()) {
		vhost_blk_vq_register_interrupt(vq_info->vq);
	}
}

static struct vhost_user_poll_group *
get_optimal_poll_group(struct spdk_vhost_blk_session *bvsession)
{
	struct vhost_user_poll_group *pg;
	struct spdk_vhost_blk_dev *bvdev;

	if (bvsession->bvdev == NULL) {
		return NULL;
	}

	/* round robin */
	bvdev = bvsession->bvdev;
	if (bvdev->next_pg_index >= bvsession->num_poll_groups) {
		bvdev->next_pg_index = 0;
	}

	pg = &bvsession->poll_groups[bvdev->next_pg_index];
	bvdev->next_pg_index++;

	return pg;
}

static int
vhost_blk_vq_enable(struct spdk_vhost_session *vsession, struct spdk_vhost_virtqueue *vq)
{
	if (spdk_interrupt_mode_is_enabled()) {
		spdk_thread_send_msg(vsession->vdev->thread, _vhost_blk_vq_register_interrupt, vq);
	struct spdk_vhost_blk_session *bvsession = to_blk_session(vsession);
	struct spdk_vhost_dev *vdev;
	struct spdk_vhost_user_dev *user_dev;
	struct vhost_user_pg_vq_info *vq_info;

	vdev = vsession->vdev;
	user_dev = to_user_dev(vdev);

	SPDK_DEBUGLOG(vhost_blk, "%s: enable vq %u\n", vsession->name, vq->vring_idx);

	pthread_mutex_lock(&user_dev->lock);
	if (vsession->started || vsession->starting) {
		pthread_mutex_unlock(&user_dev->lock);
		vq_info = calloc(1, sizeof(*vq_info));
		if (!vq_info) {
			SPDK_ERRLOG("Failed to allocate vq_info\n");
			return -ENOMEM;
		}
		vq_info->vq = vq;
		vq_info->pg = get_optimal_poll_group(bvsession);
		if (vq_info->pg == NULL) {
			free(vq_info);
			return -EFAULT;
		}
		vq->poll_group = (void *)vq_info->pg;
		spdk_thread_send_msg(vq_info->pg->thread, add_vq_to_poll_group, vq_info);
		return 0;
	}
	pthread_mutex_unlock(&user_dev->lock);

	return 0;
}

static int
vhost_blk_session_register_no_bdev_interrupts(struct spdk_vhost_blk_session *bvsession)
vhost_blk_pg_register_no_bdev_interrupts(struct vhost_user_poll_group *pg)
{
	struct spdk_vhost_session *vsession = &bvsession->vsession;
	struct spdk_vhost_virtqueue *vq = NULL;
	int i;
	struct vhost_user_pg_vq_info *vq_info;
	struct spdk_vhost_virtqueue *vq;

	SPDK_DEBUGLOG(vhost_blk, "Register virtqueues interrupt\n");
	for (i = 0; i < vsession->max_queues; i++) {
		vq = &vsession->virtqueue[i];
	TAILQ_FOREACH(vq_info, &pg->vqs, link) {
		vq = vq_info->vq;
		SPDK_DEBUGLOG(vhost_blk, "Register vq[%d]'s kickfd is %d\n",
			      i, vq->vring.kickfd);
			      vq->vring_idx, vq->vring.kickfd);
		vq->intr = spdk_interrupt_register(vq->vring.kickfd, no_bdev_vdev_vq_worker, vq,
						   "no_bdev_vdev_vq_worker");
		if (vq->intr == NULL) {
@@ -1137,7 +1240,7 @@ vhost_blk_session_register_no_bdev_interrupts(struct spdk_vhost_blk_session *bvs
	return 0;

err:
	vhost_blk_session_unregister_interrupts(bvsession);
	vhost_blk_pg_unregister_interrupts(pg);
	return -1;
}

@@ -1187,29 +1290,44 @@ vhost_user_blk_resize_cb(struct spdk_vhost_dev *vdev, bdev_event_cb_complete cb,
				       cb, cb_arg);
}

static int
vhost_user_session_bdev_remove_cb(struct spdk_vhost_dev *vdev,
				  struct spdk_vhost_session *vsession,
				  void *ctx)
static void
_vhost_user_session_bdev_remove_cb(void *arg)
{
	struct spdk_vhost_blk_session *bvsession;
	struct vhost_user_poll_group *pg = arg;
	struct spdk_vhost_session *vsession = pg->vsession;
	struct spdk_vhost_blk_session *bvsession = to_blk_session(vsession);
	int rc;

	bvsession = to_blk_session(vsession);
	if (bvsession->requestq_poller) {
		spdk_poller_unregister(&bvsession->requestq_poller);
	if (pg->requestq_poller == NULL) {
		return;
	}

	spdk_poller_unregister(&pg->requestq_poller);
	if (spdk_interrupt_mode_is_enabled()) {
			vhost_blk_session_unregister_interrupts(bvsession);
			rc = vhost_blk_session_register_no_bdev_interrupts(bvsession);
		vhost_blk_pg_unregister_interrupts(pg);
		rc = vhost_blk_pg_register_no_bdev_interrupts(pg);
		if (rc) {
				SPDK_ERRLOG("%s: Interrupt register failed\n", vsession->name);
				return rc;
			SPDK_ERRLOG("Interrupt register failed\n");
			return;
		}
	}

		bvsession->requestq_poller = SPDK_POLLER_REGISTER(no_bdev_vdev_worker, bvsession, 0);
		spdk_poller_register_interrupt(bvsession->requestq_poller, vhost_blk_poller_set_interrupt_mode,
					       bvsession);
	pg->requestq_poller = SPDK_POLLER_REGISTER(no_bdev_vdev_worker, pg, 0);
	spdk_poller_register_interrupt(pg->requestq_poller, vhost_blk_poller_set_interrupt_mode, bvsession);
}

static int
vhost_user_session_bdev_remove_cb(struct spdk_vhost_dev *vdev,
				  struct spdk_vhost_session *vsession,
				  void *ctx)
{
	struct spdk_vhost_blk_session *bvsession = to_blk_session(vsession);
	struct vhost_user_poll_group *pg;
	uint32_t i;

	for (i = 0; i < bvsession->num_poll_groups; i++) {
		pg = &bvsession->poll_groups[i];
		spdk_thread_send_msg(pg->thread, _vhost_user_session_bdev_remove_cb, pg);
	}

	return 0;
@@ -1328,6 +1446,124 @@ alloc_vq_task_pool(struct spdk_vhost_session *vsession, uint16_t qid)
	return 0;
}

static void
session_start_poll_group(void *args)
{
	struct vhost_user_pg_vq_info *vq_info;
	struct vhost_user_poll_group *pg = args;
	struct spdk_vhost_blk_dev *bvdev = to_blk_dev(pg->vdev);
	struct spdk_vhost_blk_session *bvsession = to_blk_session(pg->vsession);

	if (bvdev->bdev) {
		pg->io_channel = vhost_blk_get_io_channel(pg->vdev);
		SPDK_DEBUGLOG(vhost_blk, "%s: pg %p, pg io channel %p, thread %s, lcore %u\n",
			      bvsession->vsession.name, pg,
			      pg->io_channel, spdk_thread_get_name(spdk_get_thread()), spdk_env_get_current_core());
		if (!pg->io_channel) {
			SPDK_ERRLOG("%s: I/O channel allocation failed\n", bvsession->vsession.name);
			return;
		}
	}

	if (spdk_interrupt_mode_is_enabled()) {
		TAILQ_FOREACH(vq_info, &pg->vqs, link) {
			vhost_blk_vq_register_interrupt(vq_info->vq);
		}
	}

	if (bvdev->bdev) {
		pg->requestq_poller = SPDK_POLLER_REGISTER(vdev_worker, pg, 0);
	} else {
		pg->requestq_poller = SPDK_POLLER_REGISTER(no_bdev_vdev_worker, pg, 0);
	}
	SPDK_INFOLOG(vhost, "%s: poller started on lcore %d\n",
		     bvsession->vsession.name, spdk_env_get_current_core());

	spdk_poller_register_interrupt(pg->requestq_poller, vhost_blk_poller_set_interrupt_mode, bvsession);
}

static int
session_start_poll_groups(struct spdk_vhost_dev *vdev, struct spdk_vhost_session *vsession)
{
	struct spdk_vhost_blk_session *bvsession = to_blk_session(vsession);
	struct vhost_user_poll_group *pg;
	struct vhost_user_pg_vq_info *vq_info;
	struct spdk_cpuset *cpumask;
	char thread_name[128];
	uint32_t i, index = 0;
	int rc = 0;

	bvsession->thread = vdev->thread;
	cpumask = spdk_thread_get_cpumask(vdev->thread);
	/* If no cpumask is input by user, we still start one thread for the device */
	if (vdev->use_default_cpumask) {
		bvsession->num_poll_groups = 1;
	} else {
		bvsession->num_poll_groups = spdk_cpuset_count(cpumask);
	}
	bvsession->poll_groups = calloc(bvsession->num_poll_groups, sizeof(struct vhost_user_poll_group));
	if (!bvsession->poll_groups) {
		SPDK_ERRLOG("Failed to allocate poll groups\n");
		return -ENOMEM;
	}

	for (i = 0; i < bvsession->num_poll_groups; i++) {
		pg = &bvsession->poll_groups[i];
		TAILQ_INIT(&pg->vqs);
	}

	for (i = 0; i < vsession->max_queues; i++) {
		vq_info = calloc(1, sizeof(*vq_info));
		if (!vq_info) {
			SPDK_ERRLOG("Failed to allocate vq_info\n");
			rc = -ENOMEM;
			goto err;
		}
		vq_info->vq = &vsession->virtqueue[i];
		vq_info->vsession = vsession;

		pg = get_optimal_poll_group(bvsession);
		if (pg == NULL) {
			free(vq_info);
			rc = -EFAULT;
			goto err;
		}
		vq_info->pg = pg;
		vq_info->vq->poll_group = pg;

		SPDK_DEBUGLOG(vhost_blk, "%s: vring %u is added to pg %p\n", vsession->name, i, pg);
		TAILQ_INSERT_TAIL(&pg->vqs, vq_info, link);
	}

	SPDK_ENV_FOREACH_CORE(i) {
		if (!spdk_cpuset_get_cpu(cpumask, i)) {
			continue;
		}

		snprintf(thread_name, sizeof(thread_name), "%s.%u_%u", vdev->name, vsession->vid, i);
		pg = &bvsession->poll_groups[index];
		pg->vdev = vdev;
		pg->vsession = vsession;
		pg->thread = spdk_thread_create(thread_name, cpumask);
		if (!pg->thread) {
			SPDK_ERRLOG("Failed to create %s session %d poll groups\n", vdev->name, vsession->vid);
			rc = -EFAULT;
			goto err;
		}
		spdk_thread_send_msg(pg->thread, session_start_poll_group, pg);
		index++;
		if (index == bvsession->num_poll_groups) {
			break;
		}
	}

	return 0;

err:
	session_stop_poll_groups(bvsession);
	return rc;
}

static int
vhost_blk_start(struct spdk_vhost_dev *vdev,
		struct spdk_vhost_session *vsession, void *unused)
@@ -1337,8 +1573,8 @@ vhost_blk_start(struct spdk_vhost_dev *vdev,
	int i;

	/* return if start is already in progress */
	if (bvsession->requestq_poller) {
		SPDK_INFOLOG(vhost, "%s: start in progress\n", vsession->name);
	if (vsession->started || vsession->starting) {
		SPDK_INFOLOG(vhost, "%s: is starting or started\n", vsession->name);
		return -EINPROGRESS;
	}

@@ -1357,27 +1593,84 @@ vhost_blk_start(struct spdk_vhost_dev *vdev,
	assert(bvdev != NULL);
	bvsession->bvdev = bvdev;

	if (bvdev->bdev) {
		bvsession->io_channel = vhost_blk_get_io_channel(vdev);
		if (!bvsession->io_channel) {
			free_task_pool(bvsession);
			SPDK_ERRLOG("%s: I/O channel allocation failed\n", vsession->name);
			return -1;
	return session_start_poll_groups(vdev, vsession);
}

static void
session_stop_poll_group_done(void *arg)
{
	struct spdk_vhost_blk_session *bvession = arg;

	bvession->num_stopped_poll_groups++;
}

	if (bvdev->bdev) {
		bvsession->requestq_poller = SPDK_POLLER_REGISTER(vdev_worker, bvsession, 0);
	} else {
		bvsession->requestq_poller = SPDK_POLLER_REGISTER(no_bdev_vdev_worker, bvsession, 0);
static int
pg_stop_poller_cb(void *args)
{
	struct vhost_user_poll_group *pg = args;
	struct spdk_vhost_blk_session *bvsession;
	struct vhost_user_pg_vq_info *vq_info, *tmp;

	if (!pg->task_cnt) {
		TAILQ_FOREACH_SAFE(vq_info, &pg->vqs, link, tmp) {
			TAILQ_REMOVE(&pg->vqs, vq_info, link);
			vq_info->vq->next_event_time = 0;
			vhost_vq_used_signal(pg->vsession, vq_info->vq);
			free(vq_info);
		}
		goto done;
	}

	pg->stop_retry_count--;
	if (pg->stop_retry_count) {
		return SPDK_POLLER_IDLE;
	}
	SPDK_INFOLOG(vhost, "%s: started poller on lcore %d\n",
		     vsession->name, spdk_env_get_current_core());

	spdk_poller_register_interrupt(bvsession->requestq_poller, vhost_blk_poller_set_interrupt_mode,
				       bvsession);
done:
	SPDK_INFOLOG(vhost, "%s: stopping poller on lcore %d\n",
		     pg->vsession->name, spdk_env_get_current_core());

	return 0;
	spdk_poller_unregister(&pg->stop_poller);
	if (pg->io_channel) {
		vhost_blk_put_io_channel(pg->io_channel);
		pg->io_channel = NULL;
	}

	bvsession = to_blk_session(pg->vsession);
	spdk_thread_exit(pg->thread);
	spdk_thread_send_msg(bvsession->thread, session_stop_poll_group_done, bvsession);

	return SPDK_POLLER_BUSY;
}

static void
session_stop_poll_group(void *args)
{
	struct vhost_user_poll_group *pg = args;

	spdk_poller_unregister(&pg->requestq_poller);
	vhost_blk_pg_unregister_interrupts(pg);

	/* Timeout value should be less than SPDK_VHOST_SESSION_STOP_RETRY_TIMEOUT_IN_SEC */
	pg->stop_retry_count = (SPDK_VHOST_SESSION_STOP_TIMEOUT_IN_SEC * 1000 *
				1000) / SPDK_VHOST_SESSION_STOP_RETRY_PERIOD_IN_US;
	pg->stop_poller = SPDK_POLLER_REGISTER(pg_stop_poller_cb, pg,
					       SPDK_VHOST_SESSION_STOP_RETRY_PERIOD_IN_US);
}

static void
session_stop_poll_groups(struct spdk_vhost_blk_session *bvsession)
{
	uint32_t i;
	struct vhost_user_poll_group *pg;

	bvsession->num_stopped_poll_groups = 0;
	for (i = 0; i < bvsession->num_poll_groups; i++) {
		pg = &bvsession->poll_groups[i];
		if (pg->thread) {
			spdk_thread_send_msg(pg->thread, session_stop_poll_group, pg);
		}
	}
}

static int
@@ -1386,14 +1679,14 @@ destroy_session_poller_cb(void *arg)
	struct spdk_vhost_blk_session *bvsession = arg;
	struct spdk_vhost_session *vsession = &bvsession->vsession;
	struct spdk_vhost_user_dev *user_dev = to_user_dev(vsession->vdev);
	int i;

	if (vsession->task_cnt > 0 || (pthread_mutex_trylock(&user_dev->lock) != 0)) {
	if ((bvsession->num_stopped_poll_groups != bvsession->num_poll_groups) ||
	    (pthread_mutex_trylock(&user_dev->lock) != 0)) {
		assert(vsession->stop_retry_count > 0);
		vsession->stop_retry_count--;
		if (vsession->stop_retry_count == 0) {
			SPDK_ERRLOG("%s: Timedout when destroy session (task_cnt %d)\n", vsession->name,
				    vsession->task_cnt);
			SPDK_ERRLOG("%s: Timedout when destroy session (number of stopped pg %d)\n", vsession->name,
				    bvsession->num_stopped_poll_groups);
			spdk_poller_unregister(&bvsession->stop_poller);
			vhost_user_session_stop_done(vsession, -ETIMEDOUT);
		}
@@ -1401,19 +1694,8 @@ destroy_session_poller_cb(void *arg)
		return SPDK_POLLER_BUSY;
	}

	for (i = 0; i < vsession->max_queues; i++) {
		vsession->virtqueue[i].next_event_time = 0;
		vhost_vq_used_signal(vsession, &vsession->virtqueue[i]);
	}

	SPDK_INFOLOG(vhost, "%s: stopping poller on lcore %d\n",
		     vsession->name, spdk_env_get_current_core());

	if (bvsession->io_channel) {
		vhost_blk_put_io_channel(bvsession->io_channel);
		bvsession->io_channel = NULL;
	}

	SPDK_DEBUGLOG(vhost_blk, "%s: session stoppped\n", vsession->name);
	free(bvsession->poll_groups);
	free_task_pool(bvsession);
	spdk_poller_unregister(&bvsession->stop_poller);
	vhost_user_session_stop_done(vsession, 0);
@@ -1433,8 +1715,7 @@ vhost_blk_stop(struct spdk_vhost_dev *vdev,
		return -EINPROGRESS;
	}

	spdk_poller_unregister(&bvsession->requestq_poller);
	vhost_blk_session_unregister_interrupts(bvsession);
	session_stop_poll_groups(bvsession);

	bvsession->vsession.stop_retry_count = (SPDK_VHOST_SESSION_STOP_RETRY_TIMEOUT_IN_SEC * 1000 *
						1000) / SPDK_VHOST_SESSION_STOP_RETRY_PERIOD_IN_US;
+2 −0
Original line number Diff line number Diff line
@@ -111,6 +111,8 @@ struct spdk_vhost_virtqueue {
	uint32_t vring_idx;

	struct spdk_vhost_session *vsession;
	/* Polling group private thread context */
	void *poll_group;

	struct spdk_interrupt *intr;
} __attribute((aligned(SPDK_CACHE_LINE_SIZE)));