Commit 2cffc800 authored by Vitaliy Mysak's avatar Vitaliy Mysak Committed by Tomasz Zawadzki
Browse files

event: add rebalancing infrastructure



Features:
- does not delay thread creation
- does delay thread deletion
- singlethreaded, but can be made multithreaded.
  By being singlethreaded, we don't waste time,
  because reactors are not paused during rescheduling,
  but we do make statistics less up to date

Change-Id: Ie5a7e8569bc32b3fd4bb887804dfbc3f5c2ea858
Signed-off-by: default avatarVitaliy Mysak <vitaliy.mysak@intel.com>
Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/3899


Reviewed-by: default avatarTomasz Zawadzki <tomasz.zawadzki@intel.com>
Reviewed-by: default avatarJim Harris <james.r.harris@intel.com>
Reviewed-by: default avatarBen Walker <benjamin.walker@intel.com>
Tested-by: default avatarSPDK CI Jenkins <sys_sgci@intel.com>
parent 7148f333
Loading
Loading
Loading
Loading
+164 −18
Original line number Diff line number Diff line
@@ -66,7 +66,9 @@ TAILQ_HEAD(, spdk_scheduler) g_scheduler_list
	= TAILQ_HEAD_INITIALIZER(g_scheduler_list);

static struct spdk_scheduler *g_scheduler;
static struct spdk_reactor *g_scheduling_reactor;
static uint32_t g_scheduler_period;
static struct spdk_scheduler_core_info *g_core_infos = NULL;

static int reactor_interrupt_init(struct spdk_reactor *reactor);
static void reactor_interrupt_fini(struct spdk_reactor *reactor);
@@ -197,6 +199,14 @@ spdk_reactors_init(void)
		return -1;
	}

	g_core_infos = calloc(last_core + 1, sizeof(*g_core_infos));
	if (g_core_infos == NULL) {
		SPDK_ERRLOG("Could not allocate memory for g_core_infos\n");
		spdk_mempool_free(g_spdk_event_mempool);
		free(g_reactors);
		return -ENOMEM;
	}

	memset(g_reactors, 0, (last_core + 1) * sizeof(struct spdk_reactor));

	spdk_thread_lib_init_ext(reactor_thread_op, reactor_thread_op_supported,
@@ -238,12 +248,18 @@ spdk_reactors_fini(void)
		if (reactor->interrupt_mode) {
			reactor_interrupt_fini(reactor);
		}

		if (g_core_infos != NULL) {
			free(g_core_infos[i].threads);
		}
	}

	spdk_mempool_free(g_spdk_event_mempool);

	free(g_reactors);
	g_reactors = NULL;
	free(g_core_infos);
	g_core_infos = NULL;
}

struct spdk_event *
@@ -423,6 +439,115 @@ _set_thread_name(const char *thread_name)
#endif
}

static void
_init_thread_stats(struct spdk_reactor *reactor, struct spdk_lw_thread *lw_thread)
{
	struct spdk_thread *thread = spdk_thread_get_from_ctx(lw_thread);

	lw_thread->lcore = reactor->lcore;

	spdk_set_thread(thread);
	spdk_thread_get_stats(&lw_thread->current_stats);
}

static void
_threads_reschedule(struct spdk_scheduler_core_info *cores_info)
{
	struct spdk_scheduler_core_info *core;
	struct spdk_lw_thread *lw_thread;
	uint32_t i, j;

	SPDK_ENV_FOREACH_CORE(i) {
		core = &cores_info[i];
		for (j = 0; j < core->threads_count; j++) {
			lw_thread = core->threads[j];
			if (lw_thread->lcore != lw_thread->new_lcore) {
				_spdk_lw_thread_set_core(lw_thread, lw_thread->new_lcore);
			}
		}
	}
}

static void
_reactors_scheduler_fini(void *arg1, void *arg2)
{
	struct spdk_reactor *reactor;
	uint32_t last_core;
	uint32_t i;

	if (g_reactor_state == SPDK_REACTOR_STATE_RUNNING) {
		last_core = spdk_env_get_last_core();
		g_scheduler->balance(g_core_infos, last_core + 1);

		/* Reschedule based on the balancing output */
		_threads_reschedule(g_core_infos);

		SPDK_ENV_FOREACH_CORE(i) {
			reactor = spdk_reactor_get(i);
			reactor->flags.is_scheduling = false;
		}
	}
}

/* Phase 1 of thread scheduling is to gather metrics on the existing threads */
static void
_reactors_scheduler_gather_metrics(void *arg1, void *arg2)
{
	struct spdk_scheduler_core_info *core_info;
	struct spdk_lw_thread *lw_thread;
	struct spdk_reactor *reactor;
	struct spdk_event *evt;
	uint32_t next_core;
	uint32_t i;

	reactor = spdk_reactor_get(spdk_env_get_current_core());
	reactor->flags.is_scheduling = true;
	core_info = &g_core_infos[reactor->lcore];
	core_info->lcore = reactor->lcore;
	core_info->core_idle_tsc = reactor->idle_tsc;
	core_info->core_busy_tsc = reactor->busy_tsc;

	SPDK_DEBUGLOG(reactor, "Gathering metrics on %u\n", reactor->lcore);

	free(core_info->threads);
	core_info->threads = NULL;

	i = 0;

	TAILQ_FOREACH(lw_thread, &reactor->threads, link) {
		_init_thread_stats(reactor, lw_thread);
		i++;
	}

	core_info->threads_count = i;

	if (core_info->threads_count > 0) {
		core_info->threads = calloc(core_info->threads_count, sizeof(struct spdk_lw_thread *));

		i = 0;
		TAILQ_FOREACH(lw_thread, &reactor->threads, link) {
			core_info->threads[i] = lw_thread;
			i++;
		}
	}

	next_core = spdk_env_get_next_core(reactor->lcore);
	if (next_core == UINT32_MAX) {
		next_core = spdk_env_get_first_core();
	}

	/* If we've looped back around to the scheduler thread, move to the next phase */
	if (next_core == g_scheduling_reactor->lcore) {
		/* Phase 2 of scheduling is rebalancing - deciding which threads to move where */
		evt = spdk_event_allocate(next_core, _reactors_scheduler_fini, NULL, NULL);
		spdk_event_call(evt);
		return;
	}

	evt = spdk_event_allocate(next_core, _reactors_scheduler_gather_metrics, NULL, NULL);
	spdk_event_call(evt);
}

static int _reactor_schedule_thread(struct spdk_thread *thread);
static uint64_t g_rusage_period;

@@ -448,6 +573,7 @@ reactor_post_process_lw_thread(struct spdk_reactor *reactor, struct spdk_lw_thre

	if (spdk_unlikely(spdk_thread_is_exited(thread) &&
			  spdk_thread_is_idle(thread))) {
		if (reactor->flags.is_scheduling == false) {
			TAILQ_REMOVE(&reactor->threads, lw_thread, link);
			assert(reactor->thread_count > 0);
			reactor->thread_count--;
@@ -459,6 +585,7 @@ reactor_post_process_lw_thread(struct spdk_reactor *reactor, struct spdk_lw_thre
			spdk_thread_destroy(thread);
			return true;
		}
	}

	return false;
}
@@ -513,6 +640,7 @@ reactor_run(void *arg)
	struct spdk_thread	*thread;
	struct spdk_lw_thread	*lw_thread, *tmp;
	char			thread_name[32];
	uint64_t		last_sched = 0;

	SPDK_NOTICELOG("Reactor started on core %u\n", reactor->lcore);

@@ -531,6 +659,14 @@ reactor_run(void *arg)
			_reactor_run(reactor);
		}

		if (spdk_unlikely((reactor->tsc_last - last_sched) > g_scheduler_period &&
				  reactor == g_scheduling_reactor &&
				  !reactor->flags.is_scheduling &&
				  g_scheduler->balance)) {
			last_sched = reactor->tsc_last;
			_reactors_scheduler_gather_metrics(NULL, NULL);
		}

		if (g_reactor_state != SPDK_REACTOR_STATE_RUNNING) {
			break;
		}
@@ -629,6 +765,7 @@ spdk_reactors_start(void)
	/* Start the master reactor */
	reactor = spdk_reactor_get(current_core);
	assert(reactor != NULL);
	g_scheduling_reactor = reactor;
	reactor_run(reactor);

	spdk_env_thread_wait_all();
@@ -712,9 +849,11 @@ _reactor_schedule_thread(struct spdk_thread *thread)

	lw_thread = spdk_thread_get_ctx(thread);
	assert(lw_thread != NULL);
	core = lw_thread->lcore;
	memset(lw_thread, 0, sizeof(*lw_thread));

	pthread_mutex_lock(&g_scheduler_mtx);
	if (core == SPDK_ENV_LCORE_ID_ANY) {
		for (i = 0; i < spdk_env_get_core_count(); i++) {
			if (g_next_core > spdk_env_get_last_core()) {
				g_next_core = spdk_env_get_first_core();
@@ -723,10 +862,13 @@ _reactor_schedule_thread(struct spdk_thread *thread)
			g_next_core = spdk_env_get_next_core(g_next_core);

			if (spdk_cpuset_get_cpu(cpumask, core)) {
			evt = spdk_event_allocate(core, _schedule_thread, lw_thread, NULL);
				break;
			}
		}
	}

	evt = spdk_event_allocate(core, _schedule_thread, lw_thread, NULL);

	pthread_mutex_unlock(&g_scheduler_mtx);

	assert(evt != NULL);
@@ -770,8 +912,12 @@ _reactor_request_thread_reschedule(struct spdk_thread *thread)
static int
reactor_thread_op(struct spdk_thread *thread, enum spdk_thread_op op)
{
	struct spdk_lw_thread *lw_thread;

	switch (op) {
	case SPDK_THREAD_OP_NEW:
		lw_thread = spdk_thread_get_ctx(thread);
		lw_thread->lcore = SPDK_ENV_LCORE_ID_ANY;
		return _reactor_schedule_thread(thread);
	case SPDK_THREAD_OP_RESCHED:
		_reactor_request_thread_reschedule(thread);