Commit ebad2567 authored by JinYu's avatar JinYu Committed by Tomasz Zawadzki
Browse files

nvmf_example: initlize the thread layer



Change-Id: Ic00e7f5b557b657f18b0b510987c682363be1da8
Signed-off-by: default avatarJinYu <jin.yu@intel.com>
Reviewed-on: https://review.gerrithub.io/c/spdk/spdk/+/468656


Reviewed-by: default avatarGangCao <gang.cao@intel.com>
Reviewed-by: default avatarShuhei Matsumoto <shuhei.matsumoto.xt@hitachi.com>
Reviewed-by: default avatarChangpeng Liu <changpeng.liu@intel.com>
Tested-by: default avatarSPDK CI Jenkins <sys_sgci@intel.com>
parent b9793e68
Loading
Loading
Loading
Loading
+199 −0
Original line number Diff line number Diff line
@@ -35,9 +35,28 @@
#include "spdk/env.h"
#include "spdk/event.h"
#include "spdk/string.h"
#include "spdk/thread.h"

static const char *g_rpc_addr = SPDK_DEFAULT_RPC_ADDR;

struct nvmf_lw_thread {
	TAILQ_ENTRY(nvmf_lw_thread) link;
};

struct nvmf_reactor {
	uint32_t core;
	pthread_mutex_t mutex;

	TAILQ_HEAD(, nvmf_lw_thread)	threads;
	TAILQ_ENTRY(nvmf_reactor)	link;
};
TAILQ_HEAD(, nvmf_reactor) g_reactors = TAILQ_HEAD_INITIALIZER(g_reactors);

static struct nvmf_reactor *g_master_reactor = NULL;
static struct nvmf_reactor *g_next_reactor = NULL;
static pthread_mutex_t g_mutex = PTHREAD_MUTEX_INITIALIZER;
static bool g_reactors_exit = false;

static void
usage(char *program_name)
{
@@ -94,6 +113,178 @@ parse_args(int argc, char **argv, struct spdk_env_opts *opts)
	return 0;
}

static int
nvmf_reactor_run(void *arg)
{
	struct nvmf_reactor *nvmf_reactor = arg;
	struct nvmf_lw_thread *lw_thread, *tmp;
	struct spdk_thread *thread;

	/* foreach all the lightweight threads in this nvmf_reactor */
	do {
		pthread_mutex_lock(&nvmf_reactor->mutex);
		TAILQ_FOREACH_SAFE(lw_thread, &nvmf_reactor->threads, link, tmp) {
			thread = spdk_thread_get_from_ctx(lw_thread);

			spdk_thread_poll(thread, 0, 0);
		}
		pthread_mutex_unlock(&nvmf_reactor->mutex);
	} while (!g_reactors_exit);

	/* free all the lightweight threads */
	pthread_mutex_lock(&nvmf_reactor->mutex);
	TAILQ_FOREACH_SAFE(lw_thread, &nvmf_reactor->threads, link, tmp) {
		thread = spdk_thread_get_from_ctx(lw_thread);
		TAILQ_REMOVE(&nvmf_reactor->threads, lw_thread, link);
		spdk_set_thread(thread);
		spdk_thread_exit(thread);
		spdk_thread_destroy(thread);
	}
	pthread_mutex_unlock(&nvmf_reactor->mutex);

	return 0;
}

static int
nvmf_schedule_spdk_thread(struct spdk_thread *thread)
{
	struct nvmf_reactor *nvmf_reactor;
	struct nvmf_lw_thread *lw_thread;
	struct spdk_cpuset *cpumask;
	uint32_t i;

	/* Lightweight threads may have a requested cpumask.
	 * This is a request only - the scheduler does not have to honor it.
	 * For this scheduler implementation, each reactor is pinned to
	 * a particular core so honoring the request is reasonably easy.
	 */
	cpumask = spdk_thread_get_cpumask(thread);

	lw_thread = spdk_thread_get_ctx(thread);
	assert(lw_thread != NULL);
	memset(lw_thread, 0, sizeof(*lw_thread));

	/* assign lightweight threads to nvmf reactor(core)
	 * Here we use the mutex.The way the actual SPDK event framework
	 * solves this is by using internal rings for messages between reactors
	 */
	for (i = 0; i < spdk_env_get_core_count(); i++) {
		pthread_mutex_lock(&g_mutex);
		if (g_next_reactor == NULL) {
			g_next_reactor = TAILQ_FIRST(&g_reactors);
		}
		nvmf_reactor = g_next_reactor;
		g_next_reactor = TAILQ_NEXT(g_next_reactor, link);
		pthread_mutex_unlock(&g_mutex);

		/* each spdk_thread has the core affinity */
		if (spdk_cpuset_get_cpu(cpumask, nvmf_reactor->core)) {
			pthread_mutex_lock(&nvmf_reactor->mutex);
			TAILQ_INSERT_TAIL(&nvmf_reactor->threads, lw_thread, link);
			pthread_mutex_unlock(&nvmf_reactor->mutex);
			break;
		}
	}

	if (i == spdk_env_get_core_count()) {
		fprintf(stderr, "failed to schedule spdk thread\n");
		return -1;
	}
	return 0;
}

static int
nvmf_init_threads(void)
{
	int rc;
	uint32_t i;
	char thread_name[32];
	struct nvmf_reactor *nvmf_reactor;
	struct spdk_thread *thread;
	struct spdk_cpuset cpumask;
	uint32_t master_core = spdk_env_get_current_core();

	/* Whenever SPDK creates a new lightweight thread it will call
	 * nvmf_schedule_spdk_thread asking for the application to begin
	 * polling it via spdk_thread_poll(). Each lightweight thread in
	 * SPDK optionally allocates extra memory to be used by the application
	 * framework. The size of the extra memory allocated is the second parameter.
	 */
	spdk_thread_lib_init(nvmf_schedule_spdk_thread, sizeof(struct nvmf_lw_thread));

	/* Spawn one system thread per CPU core. The system thread is called a reactor.
	 * SPDK will spawn lightweight threads that must be mapped to reactors in
	 * nvmf_schedule_spdk_thread. Using a single system thread per CPU core is a
	 * choice unique to this application. SPDK itself does not require this specific
	 * threading model. For example, another viable threading model would be
	 * dynamically scheduling the lightweight threads onto a thread pool using a
	 * work queue.
	 */
	SPDK_ENV_FOREACH_CORE(i) {
		nvmf_reactor = calloc(1, sizeof(struct nvmf_reactor));
		if (!nvmf_reactor) {
			fprintf(stderr, "failed to alloc nvmf reactor\n");
			rc = -ENOMEM;
			goto err_exit;
		}

		nvmf_reactor->core = i;
		pthread_mutex_init(&nvmf_reactor->mutex, NULL);
		TAILQ_INIT(&nvmf_reactor->threads);
		TAILQ_INSERT_TAIL(&g_reactors, nvmf_reactor, link);

		if (i == master_core) {
			g_master_reactor = nvmf_reactor;
			g_next_reactor = g_master_reactor;
		} else {
			rc = spdk_env_thread_launch_pinned(i,
							   nvmf_reactor_run,
							   nvmf_reactor);
			if (rc) {
				fprintf(stderr, "failed to pin reactor launch\n");
				goto err_exit;
			}
		}
	}

	/* Some SPDK libraries assume that there is at least some number of lightweight
	 * threads that exist from the beginning of time. That assumption is currently
	 * being removed from the SPDK libraries, but until that work is completed spawn
	 * one lightweight thread per reactor here.
	 */
	SPDK_ENV_FOREACH_CORE(i) {
		spdk_cpuset_zero(&cpumask);
		spdk_cpuset_set_cpu(&cpumask, i, true);
		snprintf(thread_name, sizeof(thread_name), "spdk_thread_%u", i);
		thread = spdk_thread_create(thread_name, &cpumask);
		if (!thread) {
			fprintf(stderr, "failed to create spdk thread\n");
			return -1;
		}
	}

	fprintf(stdout, "nvmf threads initlize successfully\n");
	return 0;

err_exit:
	return rc;
}

static void
nvmf_destroy_threads(void)
{
	struct nvmf_reactor *nvmf_reactor, *tmp;

	TAILQ_FOREACH_SAFE(nvmf_reactor, &g_reactors, link, tmp) {
		pthread_mutex_destroy(&nvmf_reactor->mutex);
		free(nvmf_reactor);
	}

	pthread_mutex_destroy(&g_mutex);
	spdk_thread_lib_fini();
	fprintf(stdout, "nvmf threads destroy successfully\n");
}

int main(int argc, char **argv)
{
	int rc;
@@ -112,5 +303,13 @@ int main(int argc, char **argv)
		return -EINVAL;
	}

	/* Initialize the threads */
	rc = nvmf_init_threads();
	assert(rc == 0);

	g_reactors_exit = true;
	nvmf_reactor_run(g_master_reactor);
	spdk_env_thread_wait_all();
	nvmf_destroy_threads();
	return rc;
}