Commit 6ade44c2 authored by Maciej Szwed, committed by Tomasz Zawadzki
Browse files

event: Implement new scheduler



This scheduler groups idle threads on the first available
core and balances busy threads across the other cores.

Change-Id: Ia0425c767dc3da2a66a9d82a20a0012fac83163c
Signed-off-by: Vitaliy Mysak <vitaliy.mysak@intel.com>
Signed-off-by: Maciej Szwed <maciej.szwed@intel.com>
Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/3901


Community-CI: Broadcom CI
Tested-by: SPDK CI Jenkins <sys_sgci@intel.com>
Reviewed-by: Tomasz Zawadzki <tomasz.zawadzki@intel.com>
Reviewed-by: Jim Harris <james.r.harris@intel.com>
Reviewed-by: Paul Luse <paul.e.luse@intel.com>
parent 80a13be9
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -41,7 +41,7 @@ CFLAGS += $(ENV_CFLAGS)

LIBNAME = event
C_SRCS = app.c reactor.c rpc.c subsystem.c json_config.c log_rpc.c \
	 app_rpc.c subsystem_rpc.c scheduler_static.c
	 app_rpc.c subsystem_rpc.c scheduler_static.c scheduler_dynamic.c

ifeq ($(OS),Linux)
C_SRCS += gscheduler.c dpdk_governor.c
+134 −0
Original line number Diff line number Diff line
/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"
#include "spdk/likely.h"
#include "spdk/event.h"
#include "spdk/log.h"
#include "spdk/env.h"

#include "spdk_internal/thread.h"
#include "spdk_internal/event.h"

/* Round-robin cursor used by balance(): the next lcore that will be offered to
 * a busy thread. SPDK_ENV_LCORE_ID_ANY means the cursor has to be started from
 * the first core. */
static uint32_t g_next_lcore = SPDK_ENV_LCORE_ID_ANY;
/* Core that init() was called on; balance() gathers idle threads on it. */
static uint32_t g_main_lcore;

/* Load (in percent) reported by _get_thread_load() for a thread that has no
 * previous sample yet. */
#define SCHEDULER_THREAD_BUSY 100

/*
 * Compute how busy a thread was since the previous call, as a percentage.
 *
 * The load is derived from the difference between snapshot_stats and
 * last_stats. In every case last_stats is advanced to the current snapshot,
 * so the next call measures a fresh window.
 *
 * lw_thread - thread whose statistics are evaluated and updated.
 *
 * Returns SCHEDULER_THREAD_BUSY when there is no previous sample (last_stats
 * is all zero), 0 when no ticks were accounted in the window, and
 * busy * 100 / (busy + idle) otherwise.
 */
static uint8_t
_get_thread_load(struct spdk_lw_thread *lw_thread)
{
	uint64_t busy, idle, total;

	if (lw_thread->last_stats.busy_tsc == 0 && lw_thread->last_stats.idle_tsc == 0) {
		/* First sample: there is nothing to compare against yet, so record
		 * the baseline and report the thread as fully busy. */
		lw_thread->last_stats.busy_tsc = lw_thread->snapshot_stats.busy_tsc;
		lw_thread->last_stats.idle_tsc = lw_thread->snapshot_stats.idle_tsc;
		return SCHEDULER_THREAD_BUSY;
	}

	busy = lw_thread->snapshot_stats.busy_tsc - lw_thread->last_stats.busy_tsc;
	idle = lw_thread->snapshot_stats.idle_tsc - lw_thread->last_stats.idle_tsc;

	lw_thread->last_stats.busy_tsc = lw_thread->snapshot_stats.busy_tsc;
	lw_thread->last_stats.idle_tsc = lw_thread->snapshot_stats.idle_tsc;

	total = busy + idle;
	if (total == 0) {
		/* No ticks were accounted since the previous sample. Report the
		 * thread as idle instead of dividing by zero. */
		return 0;
	}

	/* return percentage of time thread was busy */
	return busy * 100 / total;
}

/*
 * Scheduler init callback.
 *
 * Records the core this function is invoked on as the main core, i.e. the
 * core balance() moves idle threads to. The governor argument is not used.
 *
 * Always returns 0.
 */
static int
init(struct spdk_governor *governor)
{
	uint32_t current_lcore = spdk_env_get_current_core();

	g_main_lcore = current_lcore;
	return 0;
}

/*
 * Scheduler balance callback: pick a target core for every thread.
 *
 * For each thread listed in cores_info the decision is stored in
 * lw_thread->new_lcore:
 *  - a thread whose load is below the idle limit is sent to the main core;
 *  - a busy thread that is not on the main core stays where it is;
 *  - a busy thread on the main core is handed to the next core, in round-robin
 *    order (tracked by g_next_lcore across calls), that its cpumask allows.
 *    If no core matches, it stays on its current core.
 *
 * cores_info  - per-core thread lists, indexed by lcore id.
 * cores_count - not used by this scheduler.
 * governor    - not used by this scheduler.
 *
 * NOTE(review): idle threads are sent to the main core without consulting
 * their cpumask - confirm this is intended.
 */
static void
balance(struct spdk_scheduler_core_info *cores_info, int cores_count,
	struct spdk_governor *governor)
{
	/* Threads below this load percentage are treated as idle. */
	const uint8_t idle_load_limit = 50;
	/* Upper bound on round-robin candidates tried per thread; it does not
	 * change while balancing, so read it once. */
	const uint32_t core_count = spdk_env_get_core_count();
	struct spdk_lw_thread *lw_thread;
	struct spdk_scheduler_core_info *core;
	struct spdk_cpuset *cpumask;
	uint32_t target_lcore;
	uint32_t i, j, k;

	/* Distribute active threads across cores and move idle threads
	 * to the main core */
	SPDK_ENV_FOREACH_CORE(i) {
		core = &cores_info[i];
		for (j = 0; j < core->threads_count; j++) {
			lw_thread = core->threads[j];
			/* Default decision: leave the thread where it is. */
			lw_thread->new_lcore = lw_thread->lcore;

			if (_get_thread_load(lw_thread) < idle_load_limit) {
				/* Idle thread: park it on the main core and
				 * continue searching for active threads */
				lw_thread->new_lcore = g_main_lcore;
				continue;
			}

			if (i != g_main_lcore) {
				/* Do not move active thread if it is not on the main core */
				continue;
			}

			/* The cpumask is only needed for busy threads leaving the main core. */
			cpumask = spdk_thread_get_cpumask(spdk_thread_get_from_ctx(lw_thread));

			/* Find a suitable reactor */
			for (k = 0; k < core_count; k++) {
				if (g_next_lcore == SPDK_ENV_LCORE_ID_ANY) {
					g_next_lcore = spdk_env_get_first_core();
				}

				/* Advance the cursor even when the candidate is rejected so
				 * the next thread starts from the following core. */
				target_lcore = g_next_lcore;
				g_next_lcore = spdk_env_get_next_core(g_next_lcore);

				if (spdk_cpuset_get_cpu(cpumask, target_lcore)) {
					lw_thread->new_lcore = target_lcore;
					break;
				}
			}
		}
	}
}

/* Descriptor of the "dynamic" scheduler: wires the callbacks defined in this
 * file to the scheduler interface. No deinit callback is provided. */
static struct spdk_scheduler scheduler_dynamic = {
	.name = "dynamic",
	.init = init,
	.deinit = NULL,
	.balance = balance,
};

/* Register the descriptor under its name ("dynamic"). */
SPDK_SCHEDULER_REGISTER(scheduler_dynamic);
+165 −0
Original line number Diff line number Diff line
@@ -38,6 +38,7 @@
#include "event/reactor.c"
#include "spdk_internal/thread.h"
#include "event/scheduler_static.c"
#include "event/scheduler_dynamic.c"

static void
test_create_reactor(void)
@@ -445,6 +446,169 @@ test_reactor_stats(void)
	MOCK_CLEAR(spdk_env_get_current_core);
}

/*
 * Exercise the "dynamic" scheduler with three cores and three threads.
 *
 * Phase 1: every thread reports a low load, so after one scheduling round all
 *          threads are expected on core 0 and cores 1 and 2 are expected empty.
 * Phase 2: every thread is made busy, so after another scheduling round each
 *          of the three cores is expected to hold at least one thread.
 */
static void
test_scheduler(void)
{
	struct spdk_cpuset cpuset = {};
	struct spdk_thread *thread[3];
	struct spdk_lw_thread *lw_thread;
	struct spdk_reactor *reactor;
	struct spdk_poller *busy, *idle;
	int i;

	MOCK_SET(spdk_env_get_current_core, 0);

	allocate_cores(3);

	CU_ASSERT(spdk_reactors_init() == 0);

	_spdk_scheduler_set("dynamic");

	for (i = 0; i < 3; i++) {
		spdk_cpuset_set_cpu(&g_reactor_core_mask, i, true);
	}
	g_next_core = 0;

	/* Create threads. */
	/* The cpuset is not cleared between iterations, so the mask passed for
	 * thread i contains cores 0..i. */
	for (i = 0; i < 3; i++) {
		spdk_cpuset_set_cpu(&cpuset, i, true);
		thread[i] = spdk_thread_create(NULL, &cpuset);
		CU_ASSERT(thread[i] != NULL);
	}

	/* Process the pending event on each reactor; afterwards every reactor
	 * is expected to own a thread. */
	for (i = 0; i < 3; i++) {
		reactor = spdk_reactor_get(i);
		CU_ASSERT(reactor != NULL);
		MOCK_SET(spdk_env_get_current_core, i);
		CU_ASSERT(event_queue_run_batch(reactor) == 1);
		CU_ASSERT(!TAILQ_EMPTY(&reactor->threads));
	}

	g_reactor_state = SPDK_REACTOR_STATE_RUNNING;

	MOCK_SET(spdk_env_get_current_core, 0);

	/* Init threads stats (low load) */
	for (i = 0; i < 3; i++) {
		spdk_set_thread(thread[i]);
		busy = spdk_poller_register(poller_run_busy, (void *)10, 0);
		idle = spdk_poller_register(poller_run_idle, (void *)90, 0);
		reactor = spdk_reactor_get(i);
		CU_ASSERT(reactor != NULL);
		reactor->tsc_last = 100;
		_reactor_run(reactor);
		spdk_poller_unregister(&busy);
		spdk_poller_unregister(&idle);

		/* Update last stats so that we don't have to call scheduler twice */
		/* Non-zero last_stats make _get_thread_load() skip its first-sample
		 * path, which would otherwise report the thread as fully busy. */
		lw_thread = spdk_thread_get_ctx(thread[i]);
		lw_thread->last_stats.busy_tsc = UINT32_MAX;
		lw_thread->last_stats.idle_tsc = UINT32_MAX;
	}

	reactor = spdk_reactor_get(0);
	CU_ASSERT(reactor != NULL);

	/* Start the first scheduling round. */
	_reactors_scheduler_gather_metrics(NULL, NULL);

	/* Gather metrics for all cores */
	/* One event is expected on core 1, then core 2, then core 0. */
	reactor = spdk_reactor_get(1);
	CU_ASSERT(reactor != NULL);
	MOCK_SET(spdk_env_get_current_core, 1);
	CU_ASSERT(event_queue_run_batch(reactor) == 1);
	reactor = spdk_reactor_get(2);
	CU_ASSERT(reactor != NULL);
	MOCK_SET(spdk_env_get_current_core, 2);
	CU_ASSERT(event_queue_run_batch(reactor) == 1);
	reactor = spdk_reactor_get(0);
	CU_ASSERT(reactor != NULL);
	MOCK_SET(spdk_env_get_current_core, 0);
	CU_ASSERT(event_queue_run_batch(reactor) == 1);

	/* Threads were idle, so all of them should be placed on core 0 */
	for (i = 0; i < 3; i++) {
		reactor = spdk_reactor_get(i);
		CU_ASSERT(reactor != NULL);
		_reactor_run(reactor);
	}

	/* 2 threads should be scheduled to core 0 */
	/* presumably the threads coming from cores 1 and 2; the one already on
	 * core 0 needs no event */
	reactor = spdk_reactor_get(0);
	CU_ASSERT(reactor != NULL);
	MOCK_SET(spdk_env_get_current_core, 0);
	CU_ASSERT(event_queue_run_batch(reactor) == 2);

	/* Core 0 must now hold threads while cores 1 and 2 must be empty. */
	reactor = spdk_reactor_get(0);
	CU_ASSERT(reactor != NULL);
	CU_ASSERT(!TAILQ_EMPTY(&reactor->threads));
	reactor = spdk_reactor_get(1);
	CU_ASSERT(reactor != NULL);
	CU_ASSERT(TAILQ_EMPTY(&reactor->threads));
	reactor = spdk_reactor_get(2);
	CU_ASSERT(reactor != NULL);
	CU_ASSERT(TAILQ_EMPTY(&reactor->threads));

	/* Make threads busy */
	reactor = spdk_reactor_get(0);
	CU_ASSERT(reactor != NULL);
	reactor->tsc_last = 100;

	/* All threads are driven through reactor 0 here; only a busy poller is
	 * registered on each of them. */
	for (i = 0; i < 3; i++) {
		spdk_set_thread(thread[i]);
		busy = spdk_poller_register(poller_run_busy, (void *)100, 0);
		_reactor_run(reactor);
		spdk_poller_unregister(&busy);
	}

	/* Run scheduler again, this time all threads are busy */
	_reactors_scheduler_gather_metrics(NULL, NULL);

	/* Gather metrics for all cores */
	reactor = spdk_reactor_get(1);
	CU_ASSERT(reactor != NULL);
	MOCK_SET(spdk_env_get_current_core, 1);
	CU_ASSERT(event_queue_run_batch(reactor) == 1);
	reactor = spdk_reactor_get(2);
	CU_ASSERT(reactor != NULL);
	MOCK_SET(spdk_env_get_current_core, 2);
	CU_ASSERT(event_queue_run_batch(reactor) == 1);
	reactor = spdk_reactor_get(0);
	CU_ASSERT(reactor != NULL);
	MOCK_SET(spdk_env_get_current_core, 0);
	CU_ASSERT(event_queue_run_batch(reactor) == 1);

	/* Threads were busy, so they should be distributed evenly across cores */
	for (i = 0; i < 3; i++) {
		MOCK_SET(spdk_env_get_current_core, i);
		reactor = spdk_reactor_get(i);
		CU_ASSERT(reactor != NULL);
		_reactor_run(reactor);
	}

	/* Every core is expected to hold at least one thread now. */
	for (i = 0; i < 3; i++) {
		reactor = spdk_reactor_get(i);
		CU_ASSERT(reactor != NULL);
		CU_ASSERT(!TAILQ_EMPTY(&reactor->threads));
	}

	g_reactor_state = SPDK_REACTOR_STATE_INITIALIZED;

	/* Destroy threads */
	for (i = 0; i < 3; i++) {
		reactor = spdk_reactor_get(i);
		CU_ASSERT(reactor != NULL);
		reactor_run(reactor);
	}

	spdk_set_thread(NULL);

	MOCK_CLEAR(spdk_env_get_current_core);

	spdk_reactors_fini();

	free_cores();
}

int
main(int argc, char **argv)
{
@@ -463,6 +627,7 @@ main(int argc, char **argv)
	CU_ADD_TEST(suite, test_reschedule_thread);
	CU_ADD_TEST(suite, test_for_each_reactor);
	CU_ADD_TEST(suite, test_reactor_stats);
	CU_ADD_TEST(suite, test_scheduler);

	CU_basic_set_mode(CU_BRM_VERBOSE);
	CU_basic_run_tests();