Commit baf3e12c authored by Shuhei Matsumoto, committed by Tomasz Zawadzki
Browse files

lib/vhost: Replace poll group per session by thread per controller



Currently each controller is associated with one of the poll groups.
For each controller, all sessions are associated with the poll
groups with which the corresponding controller is associated.

A vhost poll group does not run any polling loop of its own, yet its
usage was very complex.

Association of controller with poll group is done based on the
specified cpumask, and poll group is created per CPU core.

This is the same as the association of a thread with a CPU core.

So this patch replaces the per-session poll group with a
per-controller thread.

Signed-off-by: Shuhei Matsumoto <shuhei.matsumoto.xt@hitachi.com>
Change-Id: Ifa1e136caae11959f7b097b06a22910bc2169b30
Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/1146


Tested-by: SPDK CI Jenkins <sys_sgci@intel.com>
Reviewed-by: Ben Walker <benjamin.walker@intel.com>
Reviewed-by: Changpeng Liu <changpeng.liu@intel.com>
Reviewed-by: Darek Stojaczyk <dariusz.stojaczyk@intel.com>
parent d07e392f
Loading
Loading
Loading
Loading
+49 −139
Original line number Diff line number Diff line
@@ -43,8 +43,6 @@

#include "spdk_internal/memory.h"

/* File-scope list of vhost poll groups; entries are linked through their `tailq` member. */
static TAILQ_HEAD(, vhost_poll_group) g_poll_groups = TAILQ_HEAD_INITIALIZER(g_poll_groups);

/* Path to folder where character device will be created. Can be set by user. */
static char dev_dirname[PATH_MAX] = "";

@@ -590,6 +588,15 @@ vhost_parse_core_mask(const char *mask, struct spdk_cpuset *cpumask)
	return 0;
}

/*
 * Message callback that asks the SPDK thread it is running on to exit.
 * It is delivered with spdk_thread_send_msg() to a controller's thread.
 *
 * \param arg1 unused
 */
static void
vhost_dev_thread_exit(void *arg1)
{
	struct spdk_thread *self = spdk_get_thread();
	/* Only read by the assert below, hence the unused attribute. */
	int status __attribute__((unused)) = spdk_thread_exit(self);

	assert(status == 0);
}

int
vhost_dev_register(struct spdk_vhost_dev *vdev, const char *name, const char *mask_str,
		   const struct spdk_vhost_dev_backend *backend)
@@ -628,7 +635,13 @@ vhost_dev_register(struct spdk_vhost_dev *vdev, const char *name, const char *ma
		goto out;
	}

	spdk_cpuset_copy(&vdev->cpumask, &cpumask);
	vdev->thread = spdk_thread_create(vdev->name, &cpumask);
	if (vdev->thread == NULL) {
		SPDK_ERRLOG("Failed to create thread for vhost controller %s.\n", name);
		rc = -EIO;
		goto out;
	}

	vdev->registered = true;
	vdev->backend = backend;
	TAILQ_INIT(&vdev->vsessions);
@@ -638,6 +651,7 @@ vhost_dev_register(struct spdk_vhost_dev *vdev, const char *name, const char *ma

	if (vhost_register_unix_socket(path, name, vdev->virtio_features, vdev->disabled_features,
				       vdev->protocol_features)) {
		spdk_thread_send_msg(vdev->thread, vhost_dev_thread_exit, NULL);
		rc = -EIO;
		goto out;
	}
@@ -670,6 +684,8 @@ vhost_dev_unregister(struct spdk_vhost_dev *vdev)

	SPDK_INFOLOG(SPDK_LOG_VHOST, "Controller %s: removed\n", vdev->name);

	spdk_thread_send_msg(vdev->thread, vhost_dev_thread_exit, NULL);

	free(vdev->name);
	free(vdev->path);
	TAILQ_REMOVE(&g_vhost_devices, vdev, tailq);
@@ -701,51 +717,7 @@ const struct spdk_cpuset *
spdk_vhost_dev_get_cpumask(struct spdk_vhost_dev *vdev)
{
	assert(vdev != NULL);
	return &vdev->cpumask;
}

struct vhost_poll_group *
vhost_get_poll_group(struct spdk_cpuset *cpumask)
{
	struct spdk_cpuset tmp_cpuset = {};
	struct vhost_poll_group *pg, *selected_pg;
	uint32_t min_ctrlrs;

	min_ctrlrs = INT_MAX;
	selected_pg = TAILQ_FIRST(&g_poll_groups);

	TAILQ_FOREACH(pg, &g_poll_groups, tailq) {
		spdk_cpuset_copy(&tmp_cpuset, cpumask);
		spdk_cpuset_and(&tmp_cpuset, spdk_thread_get_cpumask(pg->thread));

		/* ignore threads which could be relocated to a non-masked cpu. */
		if (!spdk_cpuset_equal(&tmp_cpuset, spdk_thread_get_cpumask(pg->thread))) {
			continue;
		}

		if (pg->ref < min_ctrlrs) {
			selected_pg = pg;
			min_ctrlrs = pg->ref;
		}
	}

	assert(selected_pg != NULL);
	return selected_pg;
}

/*
 * Find the poll group whose thread is the calling SPDK thread.
 *
 * \return the matching entry of g_poll_groups, or NULL if there is none
 */
static struct vhost_poll_group *
_get_current_poll_group(void)
{
	struct vhost_poll_group *pg;
	struct spdk_thread *cur_thread = spdk_get_thread();

	TAILQ_FOREACH(pg, &g_poll_groups, tailq) {
		if (pg->thread == cur_thread) {
			return pg;
		}
	}

	return NULL;
	/* NOTE(review): the statement below is unreachable and uses `vdev`, which is
	 * not declared in this function. Presumably it is the replacement return of
	 * spdk_vhost_dev_get_cpumask() shown inline by the diff view -- confirm.
	 */
	return spdk_thread_get_cpumask(vdev->thread);
}

static void
@@ -775,10 +747,6 @@ vhost_session_start_done(struct spdk_vhost_session *vsession, int response)
{
	if (response == 0) {
		vsession->started = true;
		vsession->poll_group = _get_current_poll_group();
		assert(vsession->poll_group != NULL);
		assert(vsession->poll_group->ref < UINT_MAX);
		vsession->poll_group->ref++;

		assert(vsession->vdev->active_session_num < UINT32_MAX);
		vsession->vdev->active_session_num++;
@@ -792,10 +760,6 @@ vhost_session_stop_done(struct spdk_vhost_session *vsession, int response)
{
	if (response == 0) {
		vsession->started = false;
		assert(vsession->poll_group != NULL);
		assert(vsession->poll_group->ref > 0);
		vsession->poll_group->ref--;
		vsession->poll_group = NULL;

		assert(vsession->vdev->active_session_num > 0);
		vsession->vdev->active_session_num--;
@@ -821,18 +785,18 @@ vhost_event_cb(void *arg1)
}

int
vhost_session_send_event(struct vhost_poll_group *pg,
			 struct spdk_vhost_session *vsession,
vhost_session_send_event(struct spdk_vhost_session *vsession,
			 spdk_vhost_session_fn cb_fn, unsigned timeout_sec,
			 const char *errmsg)
{
	struct vhost_session_fn_ctx ev_ctx = {0};
	struct spdk_vhost_dev *vdev = vsession->vdev;

	ev_ctx.vdev = vsession->vdev;
	ev_ctx.vdev = vdev;
	ev_ctx.vsession_id = vsession->id;
	ev_ctx.cb_fn = cb_fn;

	spdk_thread_send_msg(pg->thread, vhost_event_cb, &ev_ctx);
	spdk_thread_send_msg(vdev->thread, vhost_event_cb, &ev_ctx);

	pthread_mutex_unlock(&g_vhost_mutex);
	wait_for_semaphore(timeout_sec, errmsg);
@@ -888,18 +852,6 @@ foreach_session_continue_cb(void *arg1)
		goto out_unlock_continue;
	}

	if (vsession->started && vsession->poll_group->thread != spdk_get_thread()) {
		/* if session has been relocated to other thread, it is no longer thread-safe
		 * to access its contents here. Even though we're running under the global
		 * vhost mutex, the session itself (and its pollers) are not. We need to chase
		 * the session thread as many times as necessary.
		 */
		spdk_thread_send_msg(vsession->poll_group->thread,
				     foreach_session_continue_cb, arg1);
		pthread_mutex_unlock(&g_vhost_mutex);
		return;
	}

	rc = ctx->cb_fn(vdev, vsession, ctx->user_ctx);
	if (rc < 0) {
		pthread_mutex_unlock(&g_vhost_mutex);
@@ -933,7 +885,7 @@ foreach_session_continue(struct vhost_session_fn_ctx *ev_ctx,

	if (vsession != NULL) {
		ev_ctx->vsession_id = vsession->id;
		spdk_thread_send_msg(vsession->poll_group->thread,
		spdk_thread_send_msg(vdev->thread,
				     foreach_session_continue_cb, ev_ctx);
	} else {
		ev_ctx->vsession_id = UINT32_MAX;
@@ -1248,7 +1200,6 @@ vhost_new_connection_cb(int vid, const char *ifname)
		free(vsession);
		return -1;
	}
	vsession->poll_group = NULL;
	vsession->started = false;
	vsession->initialized = false;
	vsession->next_stats_check_time = 0;
@@ -1305,58 +1256,6 @@ spdk_vhost_unlock(void)
	pthread_mutex_unlock(&g_vhost_mutex);
}

static void
vhost_create_poll_group_done(void *ctx)
{
	spdk_vhost_init_cb init_cb = ctx;
	int ret;

	if (TAILQ_EMPTY(&g_poll_groups)) {
		/* No threads? Iteration failed? */
		init_cb(-ECHILD);
		return;
	}

	ret = vhost_scsi_controller_construct();
	if (ret != 0) {
		SPDK_ERRLOG("Cannot construct vhost controllers\n");
		goto out;
	}

	ret = vhost_blk_controller_construct();
	if (ret != 0) {
		SPDK_ERRLOG("Cannot construct vhost block controllers\n");
		goto out;
	}

#ifdef SPDK_CONFIG_VHOST_INTERNAL_LIB
	ret = vhost_nvme_controller_construct();
	if (ret != 0) {
		SPDK_ERRLOG("Cannot construct vhost NVMe controllers\n");
		goto out;
	}
#endif

out:
	init_cb(ret);
}

/*
 * Allocate a poll group for the calling SPDK thread and append it to
 * g_poll_groups. On allocation failure the application is stopped with
 * -ENOMEM.
 *
 * \param ctx unused
 */
static void
vhost_create_poll_group(void *ctx)
{
	struct vhost_poll_group *group = calloc(1, sizeof(*group));

	if (group == NULL) {
		SPDK_ERRLOG("Not enough memory to allocate poll groups\n");
		spdk_app_stop(-ENOMEM);
		return;
	}

	/* calloc() zeroed the entry, so its reference count starts at 0. */
	group->thread = spdk_get_thread();
	TAILQ_INSERT_TAIL(&g_poll_groups, group, tailq);
}

void
spdk_vhost_init(spdk_vhost_init_cb init_cb)
{
@@ -1370,7 +1269,7 @@ spdk_vhost_init(spdk_vhost_init_cb init_cb)
		if (getcwd(dev_dirname, sizeof(dev_dirname) - 1) == NULL) {
			SPDK_ERRLOG("getcwd failed (%d): %s\n", errno, spdk_strerror(errno));
			ret = -1;
			goto err_out;
			goto out;
		}

		len = strlen(dev_dirname);
@@ -1380,19 +1279,34 @@ spdk_vhost_init(spdk_vhost_init_cb init_cb)
		}
	}


	ret = sem_init(&g_dpdk_sem, 0, 0);
	if (ret != 0) {
		SPDK_ERRLOG("Failed to initialize semaphore for rte_vhost pthread.\n");
		ret = -1;
		goto err_out;
		goto out;
	}

	spdk_for_each_thread(vhost_create_poll_group,
			     init_cb,
			     vhost_create_poll_group_done);
	return;
err_out:
	ret = vhost_scsi_controller_construct();
	if (ret != 0) {
		SPDK_ERRLOG("Cannot construct vhost controllers\n");
		goto out;
	}

	ret = vhost_blk_controller_construct();
	if (ret != 0) {
		SPDK_ERRLOG("Cannot construct vhost block controllers\n");
		goto out;
	}

#ifdef SPDK_CONFIG_VHOST_INTERNAL_LIB
	ret = vhost_nvme_controller_construct();
	if (ret != 0) {
		SPDK_ERRLOG("Cannot construct vhost NVMe controllers\n");
		goto out;
	}
#endif

out:
	init_cb(ret);
}

@@ -1400,7 +1314,6 @@ static void
_spdk_vhost_fini(void *arg1)
{
	struct spdk_vhost_dev *vdev, *tmp;
	struct vhost_poll_group *pg, *tpg;

	spdk_vhost_lock();
	vdev = spdk_vhost_dev_next(NULL);
@@ -1414,10 +1327,7 @@ _spdk_vhost_fini(void *arg1)

	/* All devices are removed now. */
	sem_destroy(&g_dpdk_sem);
	TAILQ_FOREACH_SAFE(pg, &g_poll_groups, tailq, tpg) {
		TAILQ_REMOVE(&g_poll_groups, pg, tailq);
		free(pg);
	}

	g_fini_cpl_cb();
}

+5 −7
Original line number Diff line number Diff line
@@ -799,10 +799,7 @@ out:
/*
 * Start a vhost-blk session.
 *
 * Dispatches vhost_blk_start_cb through vhost_session_send_event() with a
 * 3 second timeout. The poll group is no longer selected here:
 * vhost_session_send_event() takes only the session.
 *
 * \param vsession session to start
 * \return the value returned by vhost_session_send_event()
 */
static int
vhost_blk_start(struct spdk_vhost_session *vsession)
{
	return vhost_session_send_event(vsession, vhost_blk_start_cb,
					3, "start session");
}

@@ -857,8 +854,8 @@ vhost_blk_stop_cb(struct spdk_vhost_dev *vdev,
/*
 * Stop a vhost-blk session.
 *
 * Dispatches vhost_blk_stop_cb through vhost_session_send_event() with a
 * 3 second timeout.
 *
 * \param vsession session to stop
 * \return the value returned by vhost_session_send_event()
 */
static int
vhost_blk_stop(struct spdk_vhost_session *vsession)
{
	return vhost_session_send_event(vsession, vhost_blk_stop_cb,
					3, "stop session");
}

static void
@@ -900,7 +897,8 @@ vhost_blk_write_config_json(struct spdk_vhost_dev *vdev, struct spdk_json_write_
	spdk_json_write_named_object_begin(w, "params");
	spdk_json_write_named_string(w, "ctrlr", vdev->name);
	spdk_json_write_named_string(w, "dev_name", spdk_bdev_get_name(bvdev->bdev));
	spdk_json_write_named_string(w, "cpumask", spdk_cpuset_fmt(&vdev->cpumask));
	spdk_json_write_named_string(w, "cpumask",
				     spdk_cpuset_fmt(spdk_thread_get_cpumask(vdev->thread)));
	spdk_json_write_named_bool(w, "readonly", bvdev->readonly);
	spdk_json_write_object_end(w);

+2 −14
Original line number Diff line number Diff line
@@ -79,12 +79,6 @@
#define SPDK_VHOST_DISABLED_FEATURES ((1ULL << VIRTIO_RING_F_EVENT_IDX) | \
	(1ULL << VIRTIO_F_NOTIFY_ON_EMPTY))

/* A poll group: one SPDK thread plus a count of the sessions using it. */
struct vhost_poll_group {
	/* SPDK thread that session events for this poll group are sent to. */
	struct spdk_thread *thread;
	/* Reference count: incremented when a session start completes, decremented
	 * when a session stop completes. */
	unsigned ref;
	/* Linkage for the list of poll groups. */
	TAILQ_ENTRY(vhost_poll_group) tailq;
};

typedef struct rte_vhost_resubmit_desc spdk_vhost_resubmit_desc;
typedef struct rte_vhost_resubmit_info spdk_vhost_resubmit_info;

@@ -123,8 +117,6 @@ struct spdk_vhost_session {
	/* Unique session name. */
	char *name;

	struct vhost_poll_group *poll_group;

	bool initialized;
	bool started;
	bool needs_restart;
@@ -157,7 +149,7 @@ struct spdk_vhost_dev {
	char *name;
	char *path;

	struct spdk_cpuset cpumask;
	struct spdk_thread *thread;
	bool registered;

	uint64_t virtio_features;
@@ -351,7 +343,6 @@ void vhost_dev_foreach_session(struct spdk_vhost_dev *dev,
 * will unlock for the time it's waiting. It's meant to be called only
 * from start/stop session callbacks.
 *
 * \param pg designated session's poll group
 * \param vsession vhost session
 * \param cb_fn the function to call. The void *arg parameter in cb_fn
 * is always NULL.
@@ -360,8 +351,7 @@ void vhost_dev_foreach_session(struct spdk_vhost_dev *dev,
 * \param errmsg error message to print once the timeout expires
 * \return return the code passed to spdk_vhost_session_event_done().
 */
int vhost_session_send_event(struct vhost_poll_group *pg,
			     struct spdk_vhost_session *vsession,
int vhost_session_send_event(struct spdk_vhost_session *vsession,
			     spdk_vhost_session_fn cb_fn, unsigned timeout_sec,
			     const char *errmsg);

@@ -402,8 +392,6 @@ int vhost_driver_unregister(const char *path);
int vhost_get_mem_table(int vid, struct rte_vhost_memory **mem);
int vhost_get_negotiated_features(int vid, uint64_t *negotiated_features);

struct vhost_poll_group *vhost_get_poll_group(struct spdk_cpuset *cpumask);

int remove_vhost_controller(struct spdk_vhost_dev *vdev);

#ifdef SPDK_CONFIG_VHOST_INTERNAL_LIB
+2 −5
Original line number Diff line number Diff line
@@ -1120,15 +1120,12 @@ out:
/*
 * Start a vhost-NVMe session.
 *
 * Refuses to start when the device already has an active session; otherwise
 * dispatches spdk_vhost_nvme_start_cb through vhost_session_send_event() with
 * a 3 second timeout. The call uses the session-only signature of
 * vhost_session_send_event(); no poll group is looked up here.
 *
 * \param vsession session to start
 * \return -1 if another session is already active, otherwise the value
 * returned by vhost_session_send_event()
 */
static int
spdk_vhost_nvme_start(struct spdk_vhost_session *vsession)
{
	if (vsession->vdev->active_session_num > 0) {
		/* We're trying to start a second session */
		SPDK_ERRLOG("Vhost-NVMe devices can support only one simultaneous connection.\n");
		return -1;
	}

	return vhost_session_send_event(vsession, spdk_vhost_nvme_start_cb,
					3, "start session");
}
@@ -1215,8 +1212,8 @@ spdk_vhost_nvme_stop_cb(struct spdk_vhost_dev *vdev,
/*
 * Stop a vhost-NVMe session.
 *
 * Dispatches spdk_vhost_nvme_stop_cb through vhost_session_send_event() with
 * a 3 second timeout. The timeout message names the stop operation so that a
 * stalled stop is not reported as a failed start.
 *
 * \param vsession session to stop
 * \return the value returned by vhost_session_send_event()
 */
static int
spdk_vhost_nvme_stop(struct spdk_vhost_session *vsession)
{
	return vhost_session_send_event(vsession, spdk_vhost_nvme_stop_cb,
					3, "stop session");
}

static void
+2 −1
Original line number Diff line number Diff line
@@ -372,7 +372,8 @@ _spdk_rpc_get_vhost_controller(struct spdk_json_write_ctx *w, struct spdk_vhost_
	spdk_json_write_object_begin(w);

	spdk_json_write_named_string(w, "ctrlr", spdk_vhost_dev_get_name(vdev));
	spdk_json_write_named_string_fmt(w, "cpumask", "0x%s", spdk_cpuset_fmt(&vdev->cpumask));
	spdk_json_write_named_string_fmt(w, "cpumask", "0x%s",
					 spdk_cpuset_fmt(spdk_thread_get_cpumask(vdev->thread)));
	spdk_json_write_named_uint32(w, "delay_base_us", delay_base_us);
	spdk_json_write_named_uint32(w, "iops_threshold", iops_threshold);
	spdk_json_write_named_string(w, "socket", vdev->path);
Loading