Commit 3788d696 authored by Ben Walker, committed by Tomasz Zawadzki
Browse files

bdevperf: Eliminate reactors



Now, each bdevperf_job gets its own bdevperf_thread. Scheduling the
jobs onto cores is left to the underlying event framework in the normal
case. In the multi-thread case, cpumasks are set on the jobs' threads to
ensure they're distributed appropriately.

Change-Id: I55f1a44b4262d715954b3a63bf00b8d2321fafca
Signed-off-by: Ben Walker <benjamin.walker@intel.com>
Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/1512


Community-CI: Mellanox Build Bot
Tested-by: SPDK CI Jenkins <sys_sgci@intel.com>
Reviewed-by: Jim Harris <james.r.harris@intel.com>
Reviewed-by: Shuhei Matsumoto <shuhei.matsumoto.xt@hitachi.com>
parent 4b4b3cca
Loading
Loading
Loading
Loading
+147 −256
Original line number Diff line number Diff line
@@ -85,8 +85,6 @@ static const char *g_job_bdev_name;
static bool g_wait_for_tests = false;
static struct spdk_jsonrpc_request *g_request = NULL;
static bool g_multithread_mode = false;
static uint32_t g_core_ordinal = 0;
pthread_mutex_t g_ordinal_lock = PTHREAD_MUTEX_INITIALIZER;

static struct spdk_poller *g_perf_timer = NULL;

@@ -99,7 +97,8 @@ struct bdevperf_job {
	struct spdk_bdev_desc		*bdev_desc;
	struct spdk_io_channel		*ch;
	TAILQ_ENTRY(bdevperf_job)	link;
	struct bdevperf_reactor		*reactor;
	struct spdk_thread		*thread;

	uint64_t			io_completed;
	uint64_t			prev_io_completed;
	double				ema_io_per_second;
@@ -117,31 +116,20 @@ struct bdevperf_job {
	TAILQ_HEAD(, bdevperf_task)	task_list;
};

struct bdevperf_reactor {
	struct spdk_thread		*thread;
	TAILQ_HEAD(, bdevperf_job)	jobs;
	uint32_t			lcore;
	uint32_t			multiplier;
	TAILQ_ENTRY(bdevperf_reactor)	link;
};

struct spdk_bdevperf {
	TAILQ_HEAD(, bdevperf_reactor)	reactors;
	uint32_t			num_reactors;
	TAILQ_HEAD(, bdevperf_job)	jobs;
	uint32_t			running_jobs;
};

static struct spdk_bdevperf g_bdevperf = {
	.reactors = TAILQ_HEAD_INITIALIZER(g_bdevperf.reactors),
	.num_reactors = 0,
	.jobs = TAILQ_HEAD_INITIALIZER(g_bdevperf.jobs),
	.running_jobs = 0,
};

struct bdevperf_reactor *g_next_reactor;

static bool g_performance_dump_active = false;

struct bdevperf_aggregate_stats {
	struct bdevperf_job		*current_job;
	uint64_t			io_time_in_usec;
	uint64_t			ema_period;
	double				total_io_per_second;
@@ -183,8 +171,8 @@ performance_dump_job(struct bdevperf_aggregate_stats *stats, struct bdevperf_job
{
	double io_per_second, mb_per_second;

	printf("\r Thread name: %s\n", spdk_thread_get_name(job->reactor->thread));
	printf("\r Core Mask: 0x%s\n", spdk_cpuset_fmt(spdk_thread_get_cpumask(job->reactor->thread)));
	printf("\r Thread name: %s\n", spdk_thread_get_name(job->thread));
	printf("\r Core Mask: 0x%s\n", spdk_cpuset_fmt(spdk_thread_get_cpumask(job->thread)));

	if (stats->ema_period == 0) {
		io_per_second = get_cma_io_per_second(job, stats->io_time_in_usec);
@@ -290,41 +278,9 @@ verify_data(void *wr_buf, int wr_buf_len, void *rd_buf, int rd_buf_len, int bloc
	return true;
}

static void
_bdevperf_fini_thread_done(struct spdk_io_channel_iter *i, int status)
{
	spdk_io_device_unregister(&g_bdevperf, NULL);

	spdk_app_stop(g_run_rc);
}

static void
_bdevperf_fini_thread(struct spdk_io_channel_iter *i)
{
	struct spdk_io_channel *ch;
	struct bdevperf_reactor *reactor;

	ch = spdk_io_channel_iter_get_channel(i);
	reactor = spdk_io_channel_get_ctx(ch);

	TAILQ_REMOVE(&g_bdevperf.reactors, reactor, link);

	spdk_put_io_channel(ch);

	spdk_for_each_channel_continue(i, 0);
}

static void
bdevperf_fini(void)
{
	spdk_for_each_channel(&g_bdevperf, _bdevperf_fini_thread, NULL,
			      _bdevperf_fini_thread_done);
}

static void
bdevperf_test_done(void *ctx)
{
	struct bdevperf_reactor *reactor;
	struct bdevperf_job *job, *jtmp;
	struct bdevperf_task *task, *ttmp;

@@ -349,9 +305,8 @@ bdevperf_test_done(void *ctx)
		       (double)g_time_in_usec / 1000000);
	}

	TAILQ_FOREACH(reactor, &g_bdevperf.reactors, link) {
		TAILQ_FOREACH_SAFE(job, &reactor->jobs, link, jtmp) {
			TAILQ_REMOVE(&reactor->jobs, job, link);
	TAILQ_FOREACH_SAFE(job, &g_bdevperf.jobs, link, jtmp) {
		TAILQ_REMOVE(&g_bdevperf.jobs, job, link);

		performance_dump_job(&g_stats, job);

@@ -369,7 +324,6 @@ bdevperf_test_done(void *ctx)
		free(job->name);
		free(job);
	}
	}

	printf("\r =====================================================\n");
	printf("\r %-20s: %10.2f IOPS %10.2f MiB/s\n",
@@ -379,7 +333,7 @@ bdevperf_test_done(void *ctx)
	if (g_request && !g_shutdown) {
		rpc_perform_tests_cb();
	} else {
		bdevperf_fini();
		spdk_app_stop(g_run_rc);
	}
}

@@ -864,8 +818,9 @@ reset_job(void *arg)
}

static void
bdevperf_job_run(struct bdevperf_job *job)
bdevperf_job_run(void *ctx)
{
	struct bdevperf_job *job = ctx;
	struct bdevperf_task *task;
	int i;

@@ -886,30 +841,9 @@ bdevperf_job_run(struct bdevperf_job *job)
}

static void
bdevperf_submit_on_reactor(struct spdk_io_channel_iter *i)
{
	struct spdk_io_channel *ch;
	struct bdevperf_reactor *reactor;
	struct bdevperf_job *job;

	ch = spdk_io_channel_iter_get_channel(i);
	reactor = spdk_io_channel_get_ctx(ch);

	/* Submit initial I/O for each block device. Each time one
	 * completes, another will be submitted. */
	TAILQ_FOREACH(job, &reactor->jobs, link) {
		bdevperf_job_run(job);
	}

	spdk_for_each_channel_continue(i, 0);
}

static void
_performance_dump_done(struct spdk_io_channel_iter *i, int status)
_performance_dump_done(void *ctx)
{
	struct bdevperf_aggregate_stats *stats;

	stats = spdk_io_channel_iter_get_ctx(i);
	struct bdevperf_aggregate_stats *stats = ctx;

	printf("\r =====================================================\n");
	printf("\r %-20s: %10.2f IOPS %10.2f MiB/s\n",
@@ -922,29 +856,20 @@ _performance_dump_done(struct spdk_io_channel_iter *i, int status)
}

static void
_performance_dump(struct spdk_io_channel_iter *i)
_performance_dump(void *ctx)
{
	struct bdevperf_aggregate_stats *stats;
	struct spdk_io_channel *ch;
	struct bdevperf_reactor *reactor;
	struct bdevperf_job *job;
	struct bdevperf_aggregate_stats *stats = ctx;

	stats = spdk_io_channel_iter_get_ctx(i);
	ch = spdk_io_channel_iter_get_channel(i);
	reactor = spdk_io_channel_get_ctx(ch);

	if (TAILQ_EMPTY(&reactor->jobs)) {
		goto exit;
	}
	performance_dump_job(stats, stats->current_job);

	TAILQ_FOREACH(job, &reactor->jobs, link) {
		performance_dump_job(stats, job);
	/* This assumes the jobs list is static after start up time.
	 * That's true right now, but if that ever changed this would need a lock. */
	stats->current_job = TAILQ_NEXT(stats->current_job, link);
	if (stats->current_job == NULL) {
		spdk_thread_send_msg(g_master_thread, _performance_dump_done, stats);
	} else {
		spdk_thread_send_msg(stats->current_job->thread, _performance_dump, stats);
	}

	fflush(stdout);

exit:
	spdk_for_each_channel_continue(i, 0);
}

static int
@@ -968,14 +893,25 @@ performance_statistics_thread(void *arg)
	stats->io_time_in_usec = g_show_performance_period_num * g_show_performance_period_in_usec;
	stats->ema_period = g_show_performance_ema_period;

	spdk_for_each_channel(&g_bdevperf, _performance_dump, stats,
			      _performance_dump_done);
	/* Iterate all of the jobs to gather stats
	 * These jobs will not get removed here until a final performance dump is run,
	 * so this should be safe without locking.
	 */
	stats->current_job = TAILQ_FIRST(&g_bdevperf.jobs);
	if (stats->current_job == NULL) {
		spdk_thread_send_msg(g_master_thread, _performance_dump_done, stats);
	} else {
		spdk_thread_send_msg(stats->current_job->thread, _performance_dump, stats);
	}

	return -1;
}

static void
bdevperf_test(void)
{
	struct bdevperf_job *job;

	printf("Running I/O for %" PRIu64 " seconds...\n", g_time_in_usec / 1000000);
	fflush(stdout);

@@ -986,8 +922,11 @@ bdevperf_test(void)
						    g_show_performance_period_in_usec);
	}

	/* Iterate reactors to start all I/O */
	spdk_for_each_channel(&g_bdevperf, bdevperf_submit_on_reactor, NULL, NULL);
	/* Iterate jobs to start all I/O */
	TAILQ_FOREACH(job, &g_bdevperf.jobs, link) {
		g_bdevperf.running_jobs++;
		spdk_thread_send_msg(job->thread, bdevperf_job_run, job);
	}
}

static void
@@ -995,9 +934,6 @@ bdevperf_bdev_removed(void *arg)
{
	struct bdevperf_job *job = arg;

	assert(spdk_io_channel_get_thread(spdk_io_channel_from_ctx(job->reactor)) ==
	       spdk_get_thread());

	bdevperf_job_drain(job);
}

@@ -1006,10 +942,8 @@ static uint32_t g_construct_job_count = 0;
static void
_bdevperf_construct_job_done(void *ctx)
{
	/* Update g_bdevperf.running_jobs on the master thread. */
	g_bdevperf.running_jobs++;

	if (--g_construct_job_count == 0) {

		if (g_run_rc != 0) {
			/* Something failed. */
			bdevperf_test_done(NULL);
@@ -1047,17 +981,27 @@ end:
}

static int
bdevperf_construct_job(struct spdk_bdev *bdev, struct bdevperf_reactor *reactor)
bdevperf_construct_job(struct spdk_bdev *bdev, struct spdk_cpuset *cpumask,
		       uint32_t offset, uint32_t length)
{
	struct bdevperf_job *job;
	struct bdevperf_task *task;
	int block_size, data_block_size;
	int rc;
	int task_num, n;
	char thread_name[32];
	struct spdk_thread *thread;

	/* This function runs on the master thread. */
	assert(g_master_thread == spdk_get_thread());

	snprintf(thread_name, sizeof(thread_name), "%s_%s", spdk_bdev_get_name(bdev),
		 spdk_cpuset_fmt(cpumask));

	/* Create a new thread for the job */
	thread = spdk_thread_create(thread_name, cpumask);
	assert(thread != NULL);

	block_size = spdk_bdev_get_block_size(bdev);
	data_block_size = spdk_bdev_get_data_block_size(bdev);

@@ -1096,13 +1040,15 @@ bdevperf_construct_job(struct spdk_bdev *bdev, struct bdevperf_reactor *reactor)
		job->dif_check_flags |= SPDK_DIF_FLAGS_GUARD_CHECK;
	}

	job->size_in_ios = spdk_bdev_get_num_blocks(bdev) / job->io_size_blocks;
	job->offset_in_ios = 0;

	if (g_multithread_mode) {
		job->size_in_ios = job->size_in_ios / g_bdevperf.num_reactors;
		job->ios_base = reactor->multiplier * job->size_in_ios;
	if (length != 0) {
		/* Use subset of disk */
		job->size_in_ios = length / job->io_size_blocks;
		job->ios_base = offset / job->io_size_blocks;
	} else {
		/* Use whole disk */
		job->size_in_ios = spdk_bdev_get_num_blocks(bdev) / job->io_size_blocks;
		job->ios_base = 0;
	}

@@ -1124,7 +1070,7 @@ bdevperf_construct_job(struct spdk_bdev *bdev, struct bdevperf_reactor *reactor)
		task_num += 1;
	}

	TAILQ_INSERT_TAIL(&reactor->jobs, job, link);
	TAILQ_INSERT_TAIL(&g_bdevperf.jobs, job, link);

	for (n = 0; n < task_num; n++) {
		task = calloc(1, sizeof(struct bdevperf_task));
@@ -1157,11 +1103,11 @@ bdevperf_construct_job(struct spdk_bdev *bdev, struct bdevperf_reactor *reactor)
		TAILQ_INSERT_TAIL(&job->task_list, task, link);
	}

	job->reactor = reactor;
	job->thread = thread;

	g_construct_job_count++;

	rc = spdk_thread_send_msg(reactor->thread, _bdevperf_construct_job, job);
	rc = spdk_thread_send_msg(thread, _bdevperf_construct_job, job);
	assert(rc == 0);

	return rc;
@@ -1171,9 +1117,23 @@ static void
bdevperf_construct_multithread_jobs(void)
{
	struct spdk_bdev *bdev;
	struct bdevperf_reactor *reactor;
	uint32_t i;
	struct spdk_cpuset cpumask;
	uint32_t num_cores;
	uint32_t blocks_per_job;
	uint32_t offset;
	int rc;

	num_cores = 0;
	SPDK_ENV_FOREACH_CORE(i) {
		num_cores++;
	}

	if (num_cores == 0) {
		g_run_rc = -EINVAL;
		return;
	}

	if (g_job_bdev_name != NULL) {
		bdev = spdk_bdev_get_by_name(g_job_bdev_name);
		if (!bdev) {
@@ -1181,24 +1141,40 @@ bdevperf_construct_multithread_jobs(void)
			return;
		}

		/* Build a job for each reactor */
		TAILQ_FOREACH(reactor, &g_bdevperf.reactors, link) {
			rc = bdevperf_construct_job(bdev, reactor);
		blocks_per_job = spdk_bdev_get_num_blocks(bdev) / num_cores;
		offset = 0;

		SPDK_ENV_FOREACH_CORE(i) {
			spdk_cpuset_zero(&cpumask);
			spdk_cpuset_set_cpu(&cpumask, i, true);

			/* Construct the job */
			rc = bdevperf_construct_job(bdev, &cpumask, offset, blocks_per_job);
			if (rc < 0) {
				g_run_rc = rc;
				break;
			}

			offset += blocks_per_job;
		}
	} else {
		bdev = spdk_bdev_first_leaf();
		while (bdev != NULL) {
			/* Build a job for each reactor */
			TAILQ_FOREACH(reactor, &g_bdevperf.reactors, link) {
				rc = bdevperf_construct_job(bdev, reactor);
			blocks_per_job = spdk_bdev_get_num_blocks(bdev) / num_cores;
			offset = 0;

			SPDK_ENV_FOREACH_CORE(i) {
				spdk_cpuset_zero(&cpumask);
				spdk_cpuset_set_cpu(&cpumask, i, true);

				/* Construct the job */
				rc = bdevperf_construct_job(bdev, &cpumask, offset, blocks_per_job);
				if (rc < 0) {
					g_run_rc = rc;
					break;
				}

				offset += blocks_per_job;
			}

			if (g_run_rc != 0) {
@@ -1210,35 +1186,37 @@ bdevperf_construct_multithread_jobs(void)
	}
}


static struct bdevperf_reactor *
get_next_bdevperf_reactor(void)
static uint32_t
_get_next_core(void)
{
	struct bdevperf_reactor *reactor;
	static uint32_t current_core = SPDK_ENV_LCORE_ID_ANY;

	if (g_next_reactor == NULL) {
		g_next_reactor = TAILQ_FIRST(&g_bdevperf.reactors);
		assert(g_next_reactor != NULL);
	if (current_core == SPDK_ENV_LCORE_ID_ANY) {
		current_core = spdk_env_get_first_core();
		return current_core;
	}

	reactor = g_next_reactor;
	g_next_reactor = TAILQ_NEXT(g_next_reactor, link);
	current_core = spdk_env_get_next_core(current_core);
	if (current_core == SPDK_ENV_LCORE_ID_ANY) {
		current_core = spdk_env_get_first_core();
	}

	return reactor;
	return current_core;
}

static void
bdevperf_construct_jobs(void)
{
	struct spdk_bdev *bdev;
	struct bdevperf_reactor *reactor;
	uint32_t lcore;
	struct spdk_cpuset cpumask;
	int rc;

	/* There are two entirely separate modes for allocating jobs. Standard mode
	 * (the default) creates one job per bdev and assigns them to reactors round-robin.
	 * (the default) creates one spdk_thread per bdev and runs the I/O job there.
	 *
	 * The -C flag places bdevperf into "multithread" mode, meaning it creates
	 * one job per bdev per REACTOR.
	 * one spdk_thread per bdev PER CORE, and runs a copy of the job on each.
	 * This runs multiple threads per bdev, effectively.
	 */

@@ -1255,11 +1233,13 @@ bdevperf_construct_jobs(void)
	if (g_job_bdev_name != NULL) {
		bdev = spdk_bdev_get_by_name(g_job_bdev_name);
		if (bdev) {
			/* Select the reactor for this job */
			reactor = get_next_bdevperf_reactor();
			lcore = _get_next_core();

			spdk_cpuset_zero(&cpumask);
			spdk_cpuset_set_cpu(&cpumask, lcore, true);

			/* Construct the job */
			rc = bdevperf_construct_job(bdev, reactor);
			rc = bdevperf_construct_job(bdev, &cpumask, 0, 0);
			if (rc < 0) {
				g_run_rc = rc;
			}
@@ -1268,12 +1248,15 @@ bdevperf_construct_jobs(void)
		}
	} else {
		bdev = spdk_bdev_first_leaf();

		while (bdev != NULL) {
			/* Select the reactor for this job */
			reactor = get_next_bdevperf_reactor();
			lcore = _get_next_core();

			spdk_cpuset_zero(&cpumask);
			spdk_cpuset_set_cpu(&cpumask, lcore, true);

			/* Construct the job */
			rc = bdevperf_construct_job(bdev, reactor);
			rc = bdevperf_construct_job(bdev, &cpumask, 0, 0);
			if (rc < 0) {
				g_run_rc = rc;
				break;
@@ -1295,48 +1278,10 @@ end:
	}
}

static int
bdevperf_reactor_create(void *io_device, void *ctx_buf)
{
	struct bdevperf_reactor *reactor = ctx_buf;

	TAILQ_INIT(&reactor->jobs);
	reactor->lcore = spdk_env_get_current_core();
	pthread_mutex_lock(&g_ordinal_lock);
	reactor->multiplier = g_core_ordinal++;
	pthread_mutex_unlock(&g_ordinal_lock);
	reactor->thread = spdk_get_thread();

	return 0;
}

static void
bdevperf_reactor_destroy(void *io_device, void *ctx_buf)
{
	struct bdevperf_reactor *reactor = ctx_buf;
	struct spdk_io_channel *ch;
	struct spdk_thread *thread;

	ch = spdk_io_channel_from_ctx(reactor);
	thread = spdk_io_channel_get_thread(ch);

	assert(thread == spdk_get_thread());

	spdk_thread_exit(thread);
}

static void
_bdevperf_init_thread_done(void *ctx)
bdevperf_run(void *arg1)
{
	struct bdevperf_reactor *reactor = ctx;

	TAILQ_INSERT_TAIL(&g_bdevperf.reactors, reactor, link);

	assert(g_bdevperf.num_reactors < spdk_env_get_core_count());

	if (++g_bdevperf.num_reactors < spdk_env_get_core_count()) {
		return;
	}
	g_master_thread = spdk_get_thread();

	if (g_wait_for_tests) {
		/* Do not perform any tests until RPC is received */
@@ -1346,46 +1291,6 @@ _bdevperf_init_thread_done(void *ctx)
	bdevperf_construct_jobs();
}

static void
_bdevperf_init_thread(void *ctx)
{
	struct spdk_io_channel *ch;
	struct bdevperf_reactor *reactor;

	ch = spdk_get_io_channel(&g_bdevperf);
	reactor = spdk_io_channel_get_ctx(ch);

	spdk_thread_send_msg(g_master_thread, _bdevperf_init_thread_done, reactor);
}

static void
bdevperf_run(void *arg1)
{
	struct spdk_cpuset tmp_cpumask = {};
	uint32_t i;
	char thread_name[32];
	struct spdk_thread *thread;

	g_master_thread = spdk_get_thread();

	spdk_io_device_register(&g_bdevperf, bdevperf_reactor_create, bdevperf_reactor_destroy,
				sizeof(struct bdevperf_reactor), "bdevperf");

	/* Create threads for CPU cores active for this application, and send a
	 * message to each thread to create a reactor on it.
	 */
	SPDK_ENV_FOREACH_CORE(i) {
		spdk_cpuset_zero(&tmp_cpumask);
		spdk_cpuset_set_cpu(&tmp_cpumask, i, true);
		snprintf(thread_name, sizeof(thread_name), "bdevperf_reactor_%u", i);

		thread = spdk_thread_create(thread_name, &tmp_cpumask);
		assert(thread != NULL);

		spdk_thread_send_msg(thread, _bdevperf_init_thread, NULL);
	}
}

static void
rpc_perform_tests_cb(void)
{
@@ -1428,32 +1333,16 @@ rpc_perform_tests(struct spdk_jsonrpc_request *request, const struct spdk_json_v
SPDK_RPC_REGISTER("perform_tests", rpc_perform_tests, SPDK_RPC_RUNTIME)

static void
bdevperf_stop_io_on_reactor(struct spdk_io_channel_iter *i)
_bdevperf_job_drain(void *ctx)
{
	struct spdk_io_channel *ch;
	struct bdevperf_reactor *reactor;
	struct bdevperf_job *job;

	ch = spdk_io_channel_iter_get_channel(i);
	reactor = spdk_io_channel_get_ctx(ch);

	/* Stop I/O for each block device. */
	TAILQ_FOREACH(job, &reactor->jobs, link) {
		bdevperf_job_drain(job);
	}

	spdk_for_each_channel_continue(i, 0);
	bdevperf_job_drain(ctx);
}

static void
spdk_bdevperf_shutdown_cb(void)
{
	g_shutdown = true;

	if (TAILQ_EMPTY(&g_bdevperf.reactors)) {
		spdk_app_stop(0);
		return;
	}
	struct bdevperf_job *job, *tmp;

	if (g_bdevperf.running_jobs == 0) {
		bdevperf_test_done(NULL);
@@ -1462,8 +1351,10 @@ spdk_bdevperf_shutdown_cb(void)

	g_shutdown_tsc = spdk_get_ticks() - g_shutdown_tsc;

	/* Send events to stop all I/O on each reactor */
	spdk_for_each_channel(&g_bdevperf, bdevperf_stop_io_on_reactor, NULL, NULL);
	/* Iterate jobs to stop all I/O */
	TAILQ_FOREACH_SAFE(job, &g_bdevperf.jobs, link, tmp) {
		spdk_thread_send_msg(job->thread, _bdevperf_job_drain, job);
	}
}

static int