Commit 12759ab5 authored by Ben Walker's avatar Ben Walker Committed by Jim Harris
Browse files

event: The reactor now contains a list of threads



It iterates over the list and polls each one. However,
in practice the list still contains just one thread for
now.

Change-Id: I9bac7eb5ebf9b4edc6409caaf26747470b65e336
Signed-off-by: default avatarBen Walker <benjamin.walker@intel.com>
Reviewed-on: https://review.gerrithub.io/c/440763


Tested-by: default avatarSPDK CI Jenkins <sys_sgci@intel.com>
Reviewed-by: default avatarJim Harris <james.r.harris@intel.com>
Reviewed-by: default avatarShuhei Matsumoto <shuhei.matsumoto.xt@hitachi.com>
parent 4e7bb83e
Loading
Loading
Loading
Loading
+41 −8
Original line number Diff line number Diff line
@@ -53,10 +53,17 @@ enum spdk_reactor_state {
	SPDK_REACTOR_STATE_SHUTDOWN = 4,
};

struct spdk_lw_thread {
	TAILQ_ENTRY(spdk_lw_thread)	link;
};

struct spdk_reactor {
	/* Logical core number for this reactor. */
	uint32_t					lcore;

	/* Lightweight threads running on this reactor */
	TAILQ_HEAD(, spdk_lw_thread)			threads;

	/* Poller for get the rusage for the reactor. */
	struct spdk_poller				*rusage_poller;

@@ -204,29 +211,47 @@ static int
_spdk_reactor_run(void *arg)
{
	struct spdk_reactor	*reactor = arg;
	struct spdk_thread	*thread;
	struct spdk_thread	*orig_thread, *thread;
	uint64_t		last_rusage = 0;
	struct spdk_lw_thread	*lw_thread, *tmp;
	char			thread_name[32];

	snprintf(thread_name, sizeof(thread_name), "reactor_%u", reactor->lcore);
	thread = spdk_thread_create(thread_name);
	if (!thread) {
	orig_thread = spdk_thread_create(thread_name);
	if (!orig_thread) {
		return -1;
	}

	lw_thread = (struct spdk_lw_thread *)spdk_thread_get_ctx(orig_thread);
	if (!lw_thread) {
		spdk_thread_exit(orig_thread);
		return -ENOMEM;
	}

	TAILQ_INSERT_TAIL(&reactor->threads, lw_thread, link);

	SPDK_NOTICELOG("Reactor started on core %u\n", reactor->lcore);

	while (1) {
		uint64_t now;

		/* For each loop through the reactor, capture the time. This time
		 * is used for all threads. */
		now = spdk_get_ticks();

		TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) {
			thread = spdk_thread_get_from_ctx(lw_thread);

			_spdk_event_queue_run_batch(reactor, thread);

		spdk_thread_poll(thread, 0, 0);
			spdk_thread_poll(thread, 0, now);
		}

		if (g_reactor_state != SPDK_REACTOR_STATE_RUNNING) {
			break;
		}

		if (g_context_switch_monitor_enabled) {
			uint64_t now = spdk_get_ticks();

			if ((last_rusage + CONTEXT_SWITCH_MONITOR_PERIOD) < now) {
				get_rusage(reactor);
				last_rusage = now;
@@ -234,6 +259,12 @@ _spdk_reactor_run(void *arg)
		}
	}

	lw_thread = spdk_thread_get_ctx(orig_thread);
	TAILQ_REMOVE(&reactor->threads, lw_thread, link);
	assert(TAILQ_EMPTY(&reactor->threads));

	spdk_thread_exit(orig_thread);

	return 0;
}

@@ -242,6 +273,8 @@ spdk_reactor_construct(struct spdk_reactor *reactor, uint32_t lcore)
{
	reactor->lcore = lcore;

	TAILQ_INIT(&reactor->threads);

	reactor->events = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY);
	assert(reactor->events != NULL);
}
@@ -343,7 +376,7 @@ spdk_reactors_init(void)

	memset(g_reactors, 0, (last_core + 1) * sizeof(struct spdk_reactor));

	spdk_thread_lib_init(NULL, 0);
	spdk_thread_lib_init(NULL, sizeof(struct spdk_lw_thread));

	SPDK_ENV_FOREACH_CORE(i) {
		reactor = spdk_reactor_get(i);