Commit d7393e2e authored by Shuhei Matsumoto's avatar Shuhei Matsumoto Committed by Tomasz Zawadzki
Browse files

lib/thread: Stop and reap pending messages after thread is marked as exited



This is a preparation to support voluntary thread termination by
calling spdk_thread_exit().

Previously, the exiting thread had discarded all pending mesasges.

We change this to stop accepting any new message in spdk_thread_send_msg()
and reap pending messages in _spdk_msg_queue_run_batch().

Add unit test case for the new behavior. Adding g_ prefix to global
variables for clarification is done together.

Signed-off-by: default avatarShuhei Matsumoto <shuhei.matsumoto.xt@hitachi.com>
Change-Id: Ida78e7bb1b86357602aea6938dd514897b67edd6
Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/482


Tested-by: default avatarSPDK CI Jenkins <sys_sgci@intel.com>
Reviewed-by: default avatarBen Walker <benjamin.walker@intel.com>
Reviewed-by: default avatarAleksey Marchuk <alexeymar@mellanox.com>
Reviewed-by: default avatarChangpeng Liu <changpeng.liu@intel.com>
parent f2576eb0
Loading
Loading
Loading
Loading
+5 −4
Original line number Diff line number Diff line
@@ -444,10 +444,6 @@ _spdk_msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs)
		} else {
			spdk_mempool_put(g_spdk_msg_mempool, msg);
		}

		if (thread->exit) {
			break;
		}
	}

	return count;
@@ -716,6 +712,11 @@ spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx

	assert(thread != NULL);

	if (spdk_unlikely(thread->exit)) {
		SPDK_ERRLOG("Thread %s is marked as exited.\n", thread->name);
		return -EIO;
	}

	local_thread = _get_thread();

	msg = NULL;
+89 −23
Original line number Diff line number Diff line
@@ -587,12 +587,12 @@ thread_name(void)
	spdk_thread_lib_fini();
}

static uint64_t device1;
static uint64_t device2;
static uint64_t device3;
static uint64_t g_device1;
static uint64_t g_device2;
static uint64_t g_device3;

static uint64_t ctx1 = 0x1111;
static uint64_t ctx2 = 0x2222;
static uint64_t g_ctx1 = 0x1111;
static uint64_t g_ctx2 = 0x2222;

static int g_create_cb_calls = 0;
static int g_destroy_cb_calls = 0;
@@ -600,8 +600,8 @@ static int g_destroy_cb_calls = 0;
static int
create_cb_1(void *io_device, void *ctx_buf)
{
	CU_ASSERT(io_device == &device1);
	*(uint64_t *)ctx_buf = ctx1;
	CU_ASSERT(io_device == &g_device1);
	*(uint64_t *)ctx_buf = g_ctx1;
	g_create_cb_calls++;
	return 0;
}
@@ -609,16 +609,16 @@ create_cb_1(void *io_device, void *ctx_buf)
static void
destroy_cb_1(void *io_device, void *ctx_buf)
{
	CU_ASSERT(io_device == &device1);
	CU_ASSERT(*(uint64_t *)ctx_buf == ctx1);
	CU_ASSERT(io_device == &g_device1);
	CU_ASSERT(*(uint64_t *)ctx_buf == g_ctx1);
	g_destroy_cb_calls++;
}

static int
create_cb_2(void *io_device, void *ctx_buf)
{
	CU_ASSERT(io_device == &device2);
	*(uint64_t *)ctx_buf = ctx2;
	CU_ASSERT(io_device == &g_device2);
	*(uint64_t *)ctx_buf = g_ctx2;
	g_create_cb_calls++;
	return 0;
}
@@ -626,8 +626,8 @@ create_cb_2(void *io_device, void *ctx_buf)
static void
destroy_cb_2(void *io_device, void *ctx_buf)
{
	CU_ASSERT(io_device == &device2);
	CU_ASSERT(*(uint64_t *)ctx_buf == ctx2);
	CU_ASSERT(io_device == &g_device2);
	CU_ASSERT(*(uint64_t *)ctx_buf == g_ctx2);
	g_destroy_cb_calls++;
}

@@ -640,16 +640,16 @@ channel(void)
	allocate_threads(1);
	set_thread(0);

	spdk_io_device_register(&device1, create_cb_1, destroy_cb_1, sizeof(ctx1), NULL);
	spdk_io_device_register(&device2, create_cb_2, destroy_cb_2, sizeof(ctx2), NULL);
	spdk_io_device_register(&g_device1, create_cb_1, destroy_cb_1, sizeof(g_ctx1), NULL);
	spdk_io_device_register(&g_device2, create_cb_2, destroy_cb_2, sizeof(g_ctx2), NULL);

	g_create_cb_calls = 0;
	ch1 = spdk_get_io_channel(&device1);
	ch1 = spdk_get_io_channel(&g_device1);
	CU_ASSERT(g_create_cb_calls == 1);
	SPDK_CU_ASSERT_FATAL(ch1 != NULL);

	g_create_cb_calls = 0;
	ch2 = spdk_get_io_channel(&device1);
	ch2 = spdk_get_io_channel(&g_device1);
	CU_ASSERT(g_create_cb_calls == 0);
	CU_ASSERT(ch1 == ch2);
	SPDK_CU_ASSERT_FATAL(ch2 != NULL);
@@ -660,13 +660,13 @@ channel(void)
	CU_ASSERT(g_destroy_cb_calls == 0);

	g_create_cb_calls = 0;
	ch2 = spdk_get_io_channel(&device2);
	ch2 = spdk_get_io_channel(&g_device2);
	CU_ASSERT(g_create_cb_calls == 1);
	CU_ASSERT(ch1 != ch2);
	SPDK_CU_ASSERT_FATAL(ch2 != NULL);

	ctx = spdk_io_channel_get_ctx(ch2);
	CU_ASSERT(*(uint64_t *)ctx == ctx2);
	CU_ASSERT(*(uint64_t *)ctx == g_ctx2);

	g_destroy_cb_calls = 0;
	spdk_put_io_channel(ch1);
@@ -678,12 +678,12 @@ channel(void)
	poll_threads();
	CU_ASSERT(g_destroy_cb_calls == 1);

	ch1 = spdk_get_io_channel(&device3);
	ch1 = spdk_get_io_channel(&g_device3);
	CU_ASSERT(ch1 == NULL);

	spdk_io_device_unregister(&device1, NULL);
	spdk_io_device_unregister(&g_device1, NULL);
	poll_threads();
	spdk_io_device_unregister(&device2, NULL);
	spdk_io_device_unregister(&g_device2, NULL);
	poll_threads();
	CU_ASSERT(TAILQ_EMPTY(&g_io_devices));
	free_threads();
@@ -745,6 +745,71 @@ channel_destroy_races(void)
	CU_ASSERT(TAILQ_EMPTY(&g_threads));
}

static void
thread_exit(void)
{
	struct spdk_thread *thread;
	struct spdk_io_channel *ch;
	void *ctx;
	bool done1 = false, done2 = false;
	int rc __attribute__((unused));

	allocate_threads(3);

	/* Test all pending messages are reaped for the thread marked as exited. */
	set_thread(0);
	thread = spdk_get_thread();

	/* Sending message to thread 0 will be accepted. */
	set_thread(1);
	rc = spdk_thread_send_msg(thread, send_msg_cb, &done1);
	CU_ASSERT(rc == 0);
	CU_ASSERT(!done1);

	/* Mark thread 0 as exited. */
	set_thread(0);
	spdk_thread_exit(thread);

	/* Sending message to thread 0 will be rejected. */
	set_thread(1);
	rc = spdk_thread_send_msg(thread, send_msg_cb, &done2);
	CU_ASSERT(rc == -EIO);

	/* Thread 0 will reap pending message. */
	poll_thread(0);
	CU_ASSERT(done1 == true);
	CU_ASSERT(done2 == false);

	/* Test releasing I/O channel is reaped even after the thread is marked
	 * as exited.
	 */
	set_thread(2);

	spdk_io_device_register(&g_device1, create_cb_1, destroy_cb_1, sizeof(g_ctx1), NULL);

	g_create_cb_calls = 0;
	ch = spdk_get_io_channel(&g_device1);
	CU_ASSERT(g_create_cb_calls == 1);
	SPDK_CU_ASSERT_FATAL(ch != NULL);

	ctx = spdk_io_channel_get_ctx(ch);
	CU_ASSERT(*(uint64_t *)ctx == g_ctx1);

	g_destroy_cb_calls = 0;
	spdk_put_io_channel(ch);

	thread = spdk_get_thread();
	spdk_thread_exit(thread);

	poll_threads();
	CU_ASSERT(g_destroy_cb_calls == 1);

	spdk_io_device_unregister(&g_device1, NULL);
	poll_threads();

	free_threads();
}

int
main(int argc, char **argv)
{
@@ -771,7 +836,8 @@ main(int argc, char **argv)
		CU_add_test(suite, "for_each_channel_unreg", for_each_channel_unreg) == NULL ||
		CU_add_test(suite, "thread_name", thread_name) == NULL ||
		CU_add_test(suite, "channel", channel) == NULL ||
		CU_add_test(suite, "channel_destroy_races", channel_destroy_races) == NULL
		CU_add_test(suite, "channel_destroy_races", channel_destroy_races) == NULL ||
		CU_add_test(suite, "thread_exit", thread_exit) == NULL
	) {
		CU_cleanup_registry();
		return CU_get_error();