Commit ccdc0b61 authored by Darek Stojaczyk's avatar Darek Stojaczyk Committed by Ben Walker
Browse files

vhost: operate on poll groups instead of lcores



With all the pieces in place we can finally remove
the legacy cross thread messages from vhost.

We replace spdk_vhost_allocate_reactor() with
spdk_vhost_get_poll_group(). The returned poll_group
has to be passed to spdk_vhost_session_send_event(),
where it will be assigned to the session. After the
session it started, that poll group will be used for
all the internal vhost cross-thread messaging.

Change-Id: I17f13d3cc6e2b64e4b614c3ceb1eddb31056669b
Signed-off-by: default avatarDarek Stojaczyk <dariusz.stojaczyk@intel.com>
Reviewed-on: https://review.gerrithub.io/c/spdk/spdk/+/452207


Tested-by: default avatarSPDK CI Jenkins <sys_sgci@intel.com>
Reviewed-by: default avatarShuhei Matsumoto <shuhei.matsumoto.xt@hitachi.com>
Reviewed-by: default avatarBen Walker <benjamin.walker@intel.com>
parent 74019868
Loading
Loading
Loading
Loading
+48 −55
Original line number Diff line number Diff line
@@ -45,12 +45,14 @@

struct vhost_poll_group {
	struct spdk_thread *thread;
	unsigned ref;
	TAILQ_ENTRY(vhost_poll_group) tailq;
};

static TAILQ_HEAD(, vhost_poll_group) g_poll_groups = TAILQ_HEAD_INITIALIZER(g_poll_groups);

static uint32_t *g_num_ctrlrs;
/* Temporary cpuset for poll group assignment */
static struct spdk_cpuset *g_tmp_cpuset;

/* Path to folder where character device will be created. Can be set by user. */
static char dev_dirname[PATH_MAX] = "";
@@ -65,7 +67,7 @@ struct spdk_vhost_session_fn_ctx {
	/** ID of the session to send event to. */
	uint32_t vsession_id;

	/** User callback function to be executed on given lcore. */
	/** User callback function to be executed on given thread. */
	spdk_vhost_session_fn cb_fn;

	/** Semaphore used to signal that event is done. */
@@ -603,12 +605,6 @@ spdk_vhost_session_mem_unregister(struct spdk_vhost_session *vsession)

}

void
spdk_vhost_free_reactor(uint32_t lcore)
{
	g_num_ctrlrs[lcore]--;
}

struct spdk_vhost_dev *
spdk_vhost_dev_next(struct spdk_vhost_dev *vdev)
{
@@ -856,28 +852,41 @@ spdk_vhost_dev_get_cpumask(struct spdk_vhost_dev *vdev)
	return vdev->cpumask;
}

uint32_t
spdk_vhost_allocate_reactor(struct spdk_cpuset *cpumask)
struct vhost_poll_group *
spdk_vhost_get_poll_group(struct spdk_cpuset *cpumask)
{
	uint32_t i, selected_core;
	struct vhost_poll_group *pg, *selected_pg;
	uint32_t min_ctrlrs;

	min_ctrlrs = INT_MAX;
	selected_core = spdk_env_get_first_core();
	selected_pg = TAILQ_FIRST(&g_poll_groups);

	TAILQ_FOREACH(pg, &g_poll_groups, tailq) {
		spdk_cpuset_copy(g_tmp_cpuset, cpumask);
		spdk_cpuset_and(g_tmp_cpuset, spdk_thread_get_cpumask(pg->thread));

	SPDK_ENV_FOREACH_CORE(i) {
		if (!spdk_cpuset_get_cpu(cpumask, i)) {
		/* ignore threads which could be relocated to a non-masked cpu. */
		if (!spdk_cpuset_equal(g_tmp_cpuset, spdk_thread_get_cpumask(pg->thread))) {
			continue;
		}

		if (g_num_ctrlrs[i] < min_ctrlrs) {
			selected_core = i;
			min_ctrlrs = g_num_ctrlrs[i];
		if (pg->ref < min_ctrlrs) {
			selected_pg = pg;
			min_ctrlrs = pg->ref;
		}
	}

	assert(selected_pg != NULL);
	assert(selected_pg->ref < UINT_MAX);
	selected_pg->ref++;
	return selected_pg;
}

	g_num_ctrlrs[selected_core]++;
	return selected_core;
void
spdk_vhost_put_poll_group(struct vhost_poll_group *pg)
{
	assert(pg->ref > 0);
	pg->ref--;
}

static void
@@ -912,16 +921,13 @@ spdk_vhost_session_stop_done(struct spdk_vhost_session *vsession, int response)
}

static void
spdk_vhost_event_cb(void *arg1, void *arg2)
spdk_vhost_event_cb(void *arg1)
{
	struct spdk_vhost_session_fn_ctx *ctx = arg1;
	struct spdk_vhost_session *vsession;
	struct spdk_event *ev;

	if (pthread_mutex_trylock(&g_spdk_vhost_mutex) != 0) {
		ev = spdk_event_allocate(spdk_env_get_current_core(),
					 spdk_vhost_event_cb, arg1, NULL);
		spdk_event_call(ev);
		spdk_thread_send_msg(spdk_get_thread(), spdk_vhost_event_cb, arg1);
		return;
	}

@@ -935,18 +941,16 @@ static void spdk_vhost_external_event_foreach_continue(struct spdk_vhost_dev *vd
		spdk_vhost_session_fn fn, void *arg);

static void
spdk_vhost_event_async_foreach_fn(void *arg1, void *arg2)
spdk_vhost_event_async_foreach_fn(void *arg1)
{
	struct spdk_vhost_session_fn_ctx *ctx = arg1;
	struct spdk_vhost_session *vsession = NULL;
	struct spdk_vhost_dev *vdev = ctx->vdev;
	struct spdk_event *ev;
	int rc;

	if (pthread_mutex_trylock(&g_spdk_vhost_mutex) != 0) {
		ev = spdk_event_allocate(spdk_env_get_current_core(),
					 spdk_vhost_event_async_foreach_fn, arg1, NULL);
		spdk_event_call(ev);
		spdk_thread_send_msg(spdk_get_thread(),
				     spdk_vhost_event_async_foreach_fn, arg1);
		return;
	}

@@ -958,16 +962,15 @@ spdk_vhost_event_async_foreach_fn(void *arg1, void *arg2)
		goto out_unlock_continue;
	}

	if (vsession->started &&
	    (uint32_t)vsession->lcore != spdk_env_get_current_core()) {
		/* if session has been relocated to other core, it is no longer thread-safe

	if (vsession->started && vsession->poll_group->thread != spdk_get_thread()) {
		/* if session has been relocated to other thread, it is no longer thread-safe
		 * to access its contents here. Even though we're running under the global
		 * vhost mutex, the session itself (and its pollers) are not. We need to chase
		 * the session thread as many times as necessary.
		 */
		ev = spdk_event_allocate(vsession->lcore,
					 spdk_vhost_event_async_foreach_fn, arg1, NULL);
		spdk_event_call(ev);
		spdk_thread_send_msg(vsession->poll_group->thread,
				     spdk_vhost_event_async_foreach_fn, arg1);
		pthread_mutex_unlock(&g_spdk_vhost_mutex);
		return;
	}
@@ -986,12 +989,12 @@ out_unlock:
}

int
spdk_vhost_session_send_event(int32_t lcore, struct spdk_vhost_session *vsession,
spdk_vhost_session_send_event(struct vhost_poll_group *pg,
			      struct spdk_vhost_session *vsession,
			      spdk_vhost_session_fn cb_fn, unsigned timeout_sec,
			      const char *errmsg)
{
	struct spdk_vhost_session_fn_ctx ev_ctx = {0};
	struct spdk_event *ev;
	struct timespec timeout;
	int rc;

@@ -1005,11 +1008,9 @@ spdk_vhost_session_send_event(int32_t lcore, struct spdk_vhost_session *vsession
	ev_ctx.vsession_id = vsession->id;
	ev_ctx.cb_fn = cb_fn;

	vsession->lcore = lcore;
	vsession->poll_group = pg;
	vsession->event_ctx = &ev_ctx;
	ev = spdk_event_allocate(lcore, spdk_vhost_event_cb, &ev_ctx, NULL);
	assert(ev);
	spdk_event_call(ev);
	spdk_thread_send_msg(pg->thread, spdk_vhost_event_cb, &ev_ctx);
	pthread_mutex_unlock(&g_spdk_vhost_mutex);

	clock_gettime(CLOCK_REALTIME, &timeout);
@@ -1033,7 +1034,6 @@ spdk_vhost_event_async_send_foreach_continue(struct spdk_vhost_session *vsession
{
	struct spdk_vhost_dev *vdev = vsession->vdev;
	struct spdk_vhost_session_fn_ctx *ev_ctx;
	struct spdk_event *ev;

	ev_ctx = calloc(1, sizeof(*ev_ctx));
	if (ev_ctx == NULL) {
@@ -1047,11 +1047,8 @@ spdk_vhost_event_async_send_foreach_continue(struct spdk_vhost_session *vsession
	ev_ctx->cb_fn = cb_fn;
	ev_ctx->user_ctx = arg;

	ev = spdk_event_allocate(vsession->lcore,
				 spdk_vhost_event_async_foreach_fn, ev_ctx, NULL);
	assert(ev);
	spdk_event_call(ev);

	spdk_thread_send_msg(vsession->poll_group->thread,
			     spdk_vhost_event_async_foreach_fn, ev_ctx);
	return 0;
}

@@ -1333,7 +1330,7 @@ new_connection(int vid)
	vsession->vdev = vdev;
	vsession->id = vdev->vsessions_num++;
	vsession->vid = vid;
	vsession->lcore = -1;
	vsession->poll_group = NULL;
	vsession->started = false;
	vsession->initialized = false;
	vsession->next_stats_check_time = 0;
@@ -1489,7 +1486,6 @@ vhost_create_poll_group(void *ctx)
void
spdk_vhost_init(spdk_vhost_init_cb init_cb)
{
	uint32_t last_core;
	size_t len;
	int ret;

@@ -1507,11 +1503,8 @@ spdk_vhost_init(spdk_vhost_init_cb init_cb)
		}
	}

	last_core = spdk_env_get_last_core();
	g_num_ctrlrs = calloc(last_core + 1, sizeof(uint32_t));
	if (!g_num_ctrlrs) {
		SPDK_ERRLOG("Could not allocate array size=%u for g_num_ctrlrs\n",
			    last_core + 1);
	g_tmp_cpuset = spdk_cpuset_alloc();
	if (g_tmp_cpuset == NULL) {
		ret = -1;
		goto err_out;
	}
@@ -1541,7 +1534,7 @@ _spdk_vhost_fini(void *arg1)
	spdk_vhost_unlock();

	/* All devices are removed now. */
	free(g_num_ctrlrs);
	spdk_cpuset_free(g_tmp_cpuset);
	TAILQ_FOREACH_SAFE(pg, &g_poll_groups, tailq, tpg) {
		TAILQ_REMOVE(&g_poll_groups, pg, tailq);
		free(pg);
+5 −5
Original line number Diff line number Diff line
@@ -730,15 +730,15 @@ out:
static int
spdk_vhost_blk_start(struct spdk_vhost_session *vsession)
{
	int32_t lcore;
	struct vhost_poll_group *pg;
	int rc;

	lcore = spdk_vhost_allocate_reactor(vsession->vdev->cpumask);
	rc = spdk_vhost_session_send_event(lcore, vsession, spdk_vhost_blk_start_cb,
	pg = spdk_vhost_get_poll_group(vsession->vdev->cpumask);
	rc = spdk_vhost_session_send_event(pg, vsession, spdk_vhost_blk_start_cb,
					   3, "start session");

	if (rc != 0) {
		spdk_vhost_free_reactor(lcore);
		spdk_vhost_put_poll_group(pg);
	}

	return rc;
@@ -804,7 +804,7 @@ err:
static int
spdk_vhost_blk_stop(struct spdk_vhost_session *vsession)
{
	return spdk_vhost_session_send_event(vsession->lcore, vsession,
	return spdk_vhost_session_send_event(vsession->poll_group, vsession,
					     spdk_vhost_blk_stop_cb, 3, "stop session");
}

+8 −5
Original line number Diff line number Diff line
@@ -94,6 +94,8 @@
#define SPDK_VHOST_DISABLED_FEATURES ((1ULL << VIRTIO_RING_F_EVENT_IDX) | \
	(1ULL << VIRTIO_F_NOTIFY_ON_EMPTY))

struct vhost_poll_group;

struct spdk_vhost_virtqueue {
	struct rte_vhost_vring vring;
	uint16_t last_avail_idx;
@@ -126,7 +128,7 @@ struct spdk_vhost_session {
	/* Unique session ID. */
	unsigned id;

	int32_t lcore;
	struct vhost_poll_group *poll_group;

	bool initialized;
	bool started;
@@ -330,7 +332,7 @@ void spdk_vhost_dev_foreach_session(struct spdk_vhost_dev *dev,
 * will unlock for the time it's waiting. It's meant to be called only
 * from start/stop session callbacks.
 *
 * \param lcore target session's lcore
 * \param pg designated session's poll group
 * \param vsession vhost session
 * \param cb_fn the function to call. The void *arg parameter in cb_fn
 * is always NULL.
@@ -339,7 +341,8 @@ void spdk_vhost_dev_foreach_session(struct spdk_vhost_dev *dev,
 * \param errmsg error message to print once the timeout expires
 * \return return the code passed to spdk_vhost_session_event_done().
 */
int spdk_vhost_session_send_event(int32_t lcore, struct spdk_vhost_session *vsession,
int spdk_vhost_session_send_event(struct vhost_poll_group *pg,
				  struct spdk_vhost_session *vsession,
				  spdk_vhost_session_fn cb_fn, unsigned timeout_sec,
				  const char *errmsg);

@@ -376,8 +379,8 @@ struct spdk_vhost_session *spdk_vhost_session_find_by_vid(int vid);
void spdk_vhost_session_install_rte_compat_hooks(struct spdk_vhost_session *vsession);
void spdk_vhost_dev_install_rte_compat_hooks(struct spdk_vhost_dev *vdev);

void spdk_vhost_free_reactor(uint32_t lcore);
uint32_t spdk_vhost_allocate_reactor(struct spdk_cpuset *cpumask);
struct vhost_poll_group *spdk_vhost_get_poll_group(struct spdk_cpuset *cpumask);
void spdk_vhost_put_poll_group(struct vhost_poll_group *pg);

int spdk_remove_vhost_controller(struct spdk_vhost_dev *vdev);

+5 −5
Original line number Diff line number Diff line
@@ -1113,7 +1113,7 @@ spdk_vhost_nvme_start_cb(struct spdk_vhost_dev *vdev,
static int
spdk_vhost_nvme_start(struct spdk_vhost_session *vsession)
{
	int32_t lcore;
	struct vhost_poll_group *pg;
	int rc;

	if (vsession->vdev->active_session_num > 0) {
@@ -1122,12 +1122,12 @@ spdk_vhost_nvme_start(struct spdk_vhost_session *vsession)
		return -1;
	}

	lcore = spdk_vhost_allocate_reactor(vsession->vdev->cpumask);
	rc = spdk_vhost_session_send_event(lcore, vsession, spdk_vhost_nvme_start_cb,
	pg = spdk_vhost_get_poll_group(vsession->vdev->cpumask);
	rc = spdk_vhost_session_send_event(pg, vsession, spdk_vhost_nvme_start_cb,
					   3, "start session");

	if (rc != 0) {
		spdk_vhost_free_reactor(lcore);
		spdk_vhost_put_poll_group(pg);
	}

	return rc;
@@ -1215,7 +1215,7 @@ spdk_vhost_nvme_stop_cb(struct spdk_vhost_dev *vdev,
static int
spdk_vhost_nvme_stop(struct spdk_vhost_session *vsession)
{
	return spdk_vhost_session_send_event(vsession->lcore, vsession,
	return spdk_vhost_session_send_event(vsession->poll_group, vsession,
					     spdk_vhost_nvme_stop_cb, 3, "start session");
}

+9 −9
Original line number Diff line number Diff line
@@ -96,8 +96,8 @@ struct spdk_vhost_scsi_dev {
	struct spdk_vhost_dev vdev;
	struct spdk_scsi_dev_vhost_state scsi_dev_state[SPDK_VHOST_SCSI_CTRLR_MAX_DEVS];

	/* The CPU chosen to poll I/O of all active vhost sessions */
	int32_t lcore;
	/* The poll group for all active vhost sessions of this device */
	struct vhost_poll_group *poll_group;
};

/** Context for a SCSI target in a vhost session */
@@ -1376,15 +1376,15 @@ spdk_vhost_scsi_start(struct spdk_vhost_session *vsession)
	svsession->svdev = svdev;

	if (svdev->vdev.active_session_num == 0) {
		svdev->lcore = spdk_vhost_allocate_reactor(svdev->vdev.cpumask);
		svdev->poll_group = spdk_vhost_get_poll_group(svdev->vdev.cpumask);
	}

	rc = spdk_vhost_session_send_event(svdev->lcore, vsession, spdk_vhost_scsi_start_cb,
	rc = spdk_vhost_session_send_event(svdev->poll_group, vsession, spdk_vhost_scsi_start_cb,
					   3, "start session");
	if (rc != 0) {
		if (svdev->vdev.active_session_num == 0) {
			spdk_vhost_free_reactor(svdev->lcore);
			svdev->lcore = -1;
			spdk_vhost_put_poll_group(svdev->poll_group);
			svdev->poll_group = NULL;
		}
	}

@@ -1484,15 +1484,15 @@ spdk_vhost_scsi_stop(struct spdk_vhost_session *vsession)
		SPDK_ERRLOG("Trying to stop non-scsi session as a scsi one.\n");
		return -1;
	}
	rc = spdk_vhost_session_send_event(vsession->lcore, vsession,
	rc = spdk_vhost_session_send_event(vsession->poll_group, vsession,
					   spdk_vhost_scsi_stop_cb, 3, "stop session");
	if (rc != 0) {
		return rc;
	}

	if (vsession->vdev->active_session_num == 0) {
		spdk_vhost_free_reactor(svsession->svdev->lcore);
		svsession->svdev->lcore = -1;
		spdk_vhost_put_poll_group(svsession->svdev->poll_group);
		svsession->svdev->poll_group = NULL;
	}
	return 0;
}
Loading