Commit 6f4e0c95 authored by Shuhei Matsumoto, committed by Tomasz Zawadzki
Browse files

thread: Don't move to EXITED if for_each_channel is in progress



There was a race between spdk_thread_exit() and spdk_for_each_channel().
An SPDK thread should not move to EXITED until spdk_for_each_channel()
completes, especially if that SPDK thread is the caller.
Realize this by adding a counter to the spdk_thread structure and using it.
spdk_for_each_thread() has the same issue. Hence, fix
spdk_for_each_thread() together.
Verify this by simple unit test cases.

Fixes #3201

Signed-off-by: Shuhei Matsumoto <smatsumoto@nvidia.com>
Change-Id: If6be50cf3204d13f71596bbbb84f4de0f9adfcec
Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/20788


Reviewed-by: Konrad Sztyber <konrad.sztyber@intel.com>
Community-CI: Mellanox Build Bot
Tested-by: SPDK CI Jenkins <sys_sgci@intel.com>
Reviewed-by: Jim Harris <jim.harris@samsung.com>
parent df6a4149
Loading
Loading
Loading
Loading
+27 −2
Original line number Diff line number Diff line
@@ -133,6 +133,7 @@ struct spdk_thread {
	uint64_t			next_poller_id;
	enum spdk_thread_state		state;
	int				pending_unregister_count;
	uint32_t			for_each_count;

	RB_HEAD(io_channel_tree, spdk_io_channel)	io_channels;
	TAILQ_ENTRY(spdk_thread)			tailq;
@@ -635,6 +636,12 @@ thread_exit(struct spdk_thread *thread, uint64_t now)
		return;
	}

	if (thread->for_each_count > 0) {
		SPDK_INFOLOG(thread, "thread %s is still executing %u for_each_channels/threads\n",
			     thread->name, thread->for_each_count);
		return;
	}

	TAILQ_FOREACH(poller, &thread->active_pollers, tailq) {
		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
			SPDK_INFOLOG(thread,
@@ -1968,6 +1975,18 @@ struct call_thread {
	spdk_msg_fn cpl;
};

/*
 * Last hop of a thread iteration: the message handler that _on_thread() sends
 * to the originating thread once no threads remain to visit.
 *
 * Drops the for_each_count reference that spdk_for_each_thread() took on the
 * originating thread, then invokes the caller's completion callback and
 * releases the iteration context.
 *
 * The counter is released before the completion runs, and the context is
 * freed only afterwards because the callback argument is read from it.
 *
 * ctx: the struct call_thread describing this iteration; freed here.
 */
static void
_back_to_orig_thread(void *ctx)
{
	struct call_thread *ct = ctx;

	/* An iteration that reaches this point must still be accounted for. */
	assert(ct->orig_thread->for_each_count > 0);
	ct->orig_thread->for_each_count -= 1;

	(*ct->cpl)(ct->ctx);

	free(ct);
}

static void
_on_thread(void *ctx)
{
@@ -1988,8 +2007,7 @@ _on_thread(void *ctx)
	if (!ct->cur_thread) {
		SPDK_DEBUGLOG(thread, "Completed thread iteration\n");

		rc = spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx);
		free(ctx);
		rc = spdk_thread_send_msg(ct->orig_thread, _back_to_orig_thread, ctx);
	} else {
		SPDK_DEBUGLOG(thread, "Continuing thread iteration to %s\n",
			      ct->cur_thread->name);
@@ -2026,6 +2044,8 @@ spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl)
	}
	ct->orig_thread = thread;

	ct->orig_thread->for_each_count++;

	pthread_mutex_lock(&g_devlist_mutex);
	ct->cur_thread = TAILQ_FIRST(&g_threads);
	pthread_mutex_unlock(&g_devlist_mutex);
@@ -2504,6 +2524,9 @@ _call_completion(void *ctx)
{
	struct spdk_io_channel_iter *i = ctx;

	assert(i->orig_thread->for_each_count > 0);
	i->orig_thread->for_each_count--;

	if (i->cpl != NULL) {
		i->cpl(i, i->status);
	}
@@ -2554,6 +2577,8 @@ spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx,
	i->cpl = cpl;
	i->orig_thread = _get_thread();

	i->orig_thread->for_each_count++;

	pthread_mutex_lock(&g_devlist_mutex);
	i->dev = io_device_get(io_device);
	if (i->dev == NULL) {
+104 −0
Original line number Diff line number Diff line
@@ -1913,6 +1913,108 @@ spdk_spin(void)
	g_spin_abort_fn = __posix_abort;
}

/*
 * Unit test: calling spdk_thread_exit() right after spdk_for_each_channel() on
 * the same thread must not let that thread reach EXITED before the channel
 * iteration has completed.
 *
 * Thread 0 registers the io_device and starts the iteration; threads 1 and 2
 * each hold one channel of that device. The test checks thread 0's
 * for_each_count and state before and after the threads are polled.
 */
static void
for_each_channel_and_thread_exit_race(void)
{
	struct spdk_io_channel *ch1, *ch2;
	struct spdk_thread *thread0;
	int ch_count = 0;
	int msg_count = 0;

	/*
	 * &ch_count is used as the io_device handle. The assertions below expect
	 * ch_count to follow the number of live channels (presumably maintained
	 * by channel_create/channel_destroy - confirm against those helpers).
	 */
	allocate_threads(3);
	set_thread(0);
	spdk_io_device_register(&ch_count, channel_create, channel_destroy, sizeof(int), NULL);
	set_thread(1);
	ch1 = spdk_get_io_channel(&ch_count);
	set_thread(2);
	ch2 = spdk_get_io_channel(&ch_count);
	CU_ASSERT(ch_count == 2);

	/*
	 * Test one race condition between spdk_thread_exit() and spdk_for_each_channel().
	 *
	 * thread 0 does not have io_channel and calls spdk_thread_exit() immediately
	 * after spdk_for_each_channel(). In this case, thread 0 should exit after
	 * spdk_for_each_channel() completes.
	 */

	set_thread(0);
	thread0 = spdk_get_thread();

	CU_ASSERT(thread0->for_each_count == 0);

	/*
	 * Start the iteration. Nothing has been polled yet, so no callback is
	 * expected to have run; the iteration is only recorded as outstanding
	 * on thread 0.
	 */
	spdk_for_each_channel(&ch_count, channel_msg, &msg_count, channel_cpl);
	CU_ASSERT(msg_count == 0);
	CU_ASSERT(thread0->for_each_count == 1);
	CU_ASSERT(thread0->state == SPDK_THREAD_STATE_RUNNING);

	/* Request exit while the iteration is outstanding: EXITING, not yet EXITED. */
	spdk_thread_exit(thread0);
	CU_ASSERT(thread0->state == SPDK_THREAD_STATE_EXITING);

	/*
	 * Polling lets the iteration finish; only then may thread 0 become EXITED.
	 * msg_count == 3 is presumably one channel_msg per channel (2) plus one
	 * channel_cpl - confirm against those helpers.
	 */
	poll_threads();
	CU_ASSERT(msg_count == 3);
	CU_ASSERT(thread0->for_each_count == 0);
	CU_ASSERT(thread0->state == SPDK_THREAD_STATE_EXITED);

	/*
	 * Cleanup. Releasing the channels is expected to take effect only after
	 * polling: ch_count stays at 2 until poll_threads() has run.
	 */
	set_thread(1);
	spdk_put_io_channel(ch1);
	CU_ASSERT(ch_count == 2);
	set_thread(2);
	spdk_put_io_channel(ch2);
	CU_ASSERT(ch_count == 2);
	poll_threads();
	CU_ASSERT(ch_count == 0);

	spdk_io_device_unregister(&ch_count, NULL);
	poll_threads();

	free_threads();
}

/*
 * Unit test: calling spdk_thread_exit() right after spdk_for_each_thread() on
 * the same thread must not let that thread reach EXITED before the thread
 * iteration has completed.
 *
 * for_each_cb is passed both as the per-thread function and as the completion
 * callback, with &count as its argument; the assertions expect count to grow
 * by one per invocation (presumably for_each_cb increments it - confirm
 * against that helper).
 */
static void
for_each_thread_and_thread_exit_race(void)
{
	struct spdk_thread *thread0;
	int count = 0;
	int i;

	allocate_threads(3);
	set_thread(0);
	thread0 = spdk_get_thread();

	/* Even if thread 0 starts exiting, spdk_for_each_thread() should complete normally
	 * and then thread 0 should be moved to EXITED.
	 */

	/* Starting the iteration records it as outstanding on the calling thread. */
	spdk_for_each_thread(for_each_cb, &count, for_each_cb);
	CU_ASSERT(thread0->for_each_count == 1);
	CU_ASSERT(thread0->state == SPDK_THREAD_STATE_RUNNING);

	/* Request exit while the iteration is outstanding: EXITING, not yet EXITED. */
	spdk_thread_exit(thread0);
	CU_ASSERT(thread0->state == SPDK_THREAD_STATE_EXITING);

	/* We have not polled thread 0 yet, so count should be 0 */
	CU_ASSERT(count == 0);

	/* Poll each thread to verify the message is passed to each */
	for (i = 0; i < 3; i++) {
		poll_thread(i);
		CU_ASSERT(count == (i + 1));
	}

	/*
	 * After each thread is called, the completion calls it
	 * one more time.
	 */
	poll_thread(0);
	CU_ASSERT(count == 4);

	/* Only once the completion has run may thread 0 finish exiting. */
	CU_ASSERT(thread0->for_each_count == 0);
	CU_ASSERT(thread0->state == SPDK_THREAD_STATE_EXITED);

	free_threads();
}

int
main(int argc, char **argv)
{
@@ -1941,6 +2043,8 @@ main(int argc, char **argv)
	CU_ADD_TEST(suite, multi_timed_pollers_have_same_expiration);
	CU_ADD_TEST(suite, io_device_lookup);
	CU_ADD_TEST(suite, spdk_spin);
	CU_ADD_TEST(suite, for_each_channel_and_thread_exit_race);
	CU_ADD_TEST(suite, for_each_thread_and_thread_exit_race);

	num_failures = spdk_ut_run_tests(argc, argv, NULL);
	CU_cleanup_registry();