Commit 3c981508 authored by Ben Walker's avatar Ben Walker Committed by Jim Harris
Browse files

thread: Update unit test to use new threading logic



No longer override the send_msg implementation.

This updates ut_multithread.c, which has minor ripple
effects into bdev_ut.c. But that unit test is otherwise
unchanged.

Change-Id: I2fd30a1010bdff0a810d376d985ab1b8a2b22fb9
Signed-off-by: default avatarBen Walker <benjamin.walker@intel.com>
Reviewed-on: https://review.gerrithub.io/424262


Tested-by: default avatarSPDK CI Jenkins <sys_sgci@intel.com>
Reviewed-by: default avatarShuhei Matsumoto <shuhei.matsumoto.xt@hitachi.com>
Reviewed-by: default avatarJim Harris <james.r.harris@intel.com>
parent d761ddbf
Loading
Loading
Loading
Loading
+13 −143
Original line number Diff line number Diff line
@@ -35,15 +35,14 @@
#include "spdk/thread.h"
#include "spdk_internal/mock.h"

#include "common/lib/test_env.c"

static uint32_t g_ut_num_threads;
static uint64_t g_current_time_in_us = 0;

int allocate_threads(int num_threads);
void free_threads(void);
void poll_threads(void);
int poll_thread(uintptr_t thread_id);
void increment_time(uint64_t time_in_us);
void reset_time(void);
bool poll_thread(uintptr_t thread_id);

struct ut_msg {
	spdk_thread_fn		fn;
@@ -54,62 +53,10 @@ struct ut_msg {
struct ut_thread {
	struct spdk_thread	*thread;
	struct spdk_io_channel	*ch;
	TAILQ_HEAD(, ut_msg)	msgs;
	TAILQ_HEAD(, ut_poller)	pollers;
};

struct ut_thread *g_ut_threads;

struct ut_poller {
	spdk_poller_fn		fn;
	void			*arg;
	TAILQ_ENTRY(ut_poller)	tailq;
	uint64_t		period_us;
	uint64_t		next_expiration_in_us;
};

static void
__send_msg(spdk_thread_fn fn, void *ctx, void *thread_ctx)
{
	struct ut_thread *thread = thread_ctx;
	struct ut_msg *msg;

	msg = calloc(1, sizeof(*msg));
	SPDK_CU_ASSERT_FATAL(msg != NULL);

	msg->fn = fn;
	msg->ctx = ctx;
	TAILQ_INSERT_TAIL(&thread->msgs, msg, link);
}

static struct spdk_poller *
__start_poller(void *thread_ctx, spdk_poller_fn fn, void *arg, uint64_t period_microseconds)
{
	struct ut_thread *thread = thread_ctx;
	struct ut_poller *poller = calloc(1, sizeof(struct ut_poller));

	SPDK_CU_ASSERT_FATAL(poller != NULL);

	poller->fn = fn;
	poller->arg = arg;
	poller->period_us = period_microseconds;
	poller->next_expiration_in_us = g_current_time_in_us + poller->period_us;

	TAILQ_INSERT_TAIL(&thread->pollers, poller, tailq);

	return (struct spdk_poller *)poller;
}

static void
__stop_poller(struct spdk_poller *poller, void *thread_ctx)
{
	struct ut_thread *thread = thread_ctx;

	TAILQ_REMOVE(&thread->pollers, (struct ut_poller *)poller, tailq);

	free(poller);
}

#define INVALID_THREAD 0x1000

static uintptr_t g_thread_id = INVALID_THREAD;
@@ -138,13 +85,9 @@ allocate_threads(int num_threads)

	for (i = 0; i < g_ut_num_threads; i++) {
		set_thread(i);
		spdk_allocate_thread(__send_msg, __start_poller, __stop_poller,
				     &g_ut_threads[i], NULL);
		thread = spdk_get_thread();
		thread = spdk_allocate_thread(NULL, NULL, NULL, NULL, NULL);
		SPDK_CU_ASSERT_FATAL(thread != NULL);
		g_ut_threads[i].thread = thread;
		TAILQ_INIT(&g_ut_threads[i].msgs);
		TAILQ_INIT(&g_ut_threads[i].pollers);
	}

	set_thread(INVALID_THREAD);
@@ -166,51 +109,12 @@ free_threads(void)
	g_ut_threads = NULL;
}

void
increment_time(uint64_t time_in_us)
{
	g_current_time_in_us += time_in_us;
	spdk_delay_us(time_in_us);
}

static void
reset_pollers(void)
{
	uint32_t		i = 0;
	struct ut_thread	*thread = NULL;
	struct ut_poller	*poller = NULL;
	uintptr_t		original_thread_id = g_thread_id;

	CU_ASSERT(g_current_time_in_us == 0);

	for (i = 0; i < g_ut_num_threads; i++) {
		set_thread(i);
		thread = &g_ut_threads[i];

		TAILQ_FOREACH(poller, &thread->pollers, tailq) {
			poller->next_expiration_in_us = g_current_time_in_us + poller->period_us;
		}
	}

	set_thread(original_thread_id);
}

void
reset_time(void)
{
	g_current_time_in_us = 0;
	reset_pollers();
}

int
bool
poll_thread(uintptr_t thread_id)
{
	int count = 0;
	bool busy = false;
	struct ut_thread *thread = &g_ut_threads[thread_id];
	struct ut_msg *msg;
	struct ut_poller *poller;
	uintptr_t original_thread_id;
	TAILQ_HEAD(, ut_poller)	tmp_pollers;

	CU_ASSERT(thread_id != (uintptr_t)INVALID_THREAD);
	CU_ASSERT(thread_id < g_ut_num_threads);
@@ -218,60 +122,26 @@ poll_thread(uintptr_t thread_id)
	original_thread_id = g_thread_id;
	set_thread(thread_id);

	while (!TAILQ_EMPTY(&thread->msgs)) {
		msg = TAILQ_FIRST(&thread->msgs);
		TAILQ_REMOVE(&thread->msgs, msg, link);

		msg->fn(msg->ctx);
		count++;
		free(msg);
	while (spdk_thread_poll(thread->thread, 0) > 0) {
		busy = true;
	}

	TAILQ_INIT(&tmp_pollers);

	while (!TAILQ_EMPTY(&thread->pollers)) {
		poller = TAILQ_FIRST(&thread->pollers);
		TAILQ_REMOVE(&thread->pollers, poller, tailq);

		if (g_current_time_in_us >= poller->next_expiration_in_us) {
			if (poller->fn) {
				poller->fn(poller->arg);
			}

			if (poller->period_us == 0) {
				break;
			} else {
				poller->next_expiration_in_us += poller->period_us;
			}
		}

		TAILQ_INSERT_TAIL(&tmp_pollers, poller, tailq);
	}

	TAILQ_SWAP(&tmp_pollers, &thread->pollers, ut_poller, tailq);

	set_thread(original_thread_id);

	return count;
	return busy;
}

void
poll_threads(void)
{
	bool msg_processed;
	uint32_t i, count;

	while (true) {
		msg_processed = false;
		bool busy = false;

		for (i = 0; i < g_ut_num_threads; i++) {
			count = poll_thread(i);
			if (count > 0) {
				msg_processed = true;
			}
		for (uint32_t i = 0; i < g_ut_num_threads; i++) {
			busy = busy || poll_thread(i);
		}

		if (!msg_processed) {
		if (!busy) {
			break;
		}
	}
+4 −5
Original line number Diff line number Diff line
@@ -33,7 +33,6 @@

#include "spdk_cunit.h"

#include "common/lib/test_env.c"
#include "common/lib/ut_multithread.c"
#include "unit/lib/json_mock.c"

@@ -720,7 +719,7 @@ io_during_qos_queue(void)
	int rc;

	setup_test();
	reset_time();
	MOCK_SET(spdk_get_ticks, 0);

	/* Enable QoS */
	bdev = &g_bdev.bdev;
@@ -776,7 +775,7 @@ io_during_qos_queue(void)
	}

	/* Advance in time by a millisecond */
	increment_time(1000);
	spdk_delay_us(1000);

	/* Complete more I/O */
	poll_threads();
@@ -810,7 +809,7 @@ io_during_qos_reset(void)
	int rc;

	setup_test();
	reset_time();
	MOCK_SET(spdk_get_ticks, 0);

	/* Enable QoS */
	bdev = &g_bdev.bdev;
@@ -1158,7 +1157,7 @@ qos_dynamic_enable(void)
	int status, second_status, rc, i;

	setup_test();
	reset_time();
	MOCK_SET(spdk_get_ticks, 0);

	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
		limits[i] = UINT64_MAX;
+14 −14
Original line number Diff line number Diff line
@@ -36,15 +36,8 @@
#include "spdk_cunit.h"

#include "thread/thread.c"
#include "common/lib/test_env.c"
#include "common/lib/ut_multithread.c"

static void
_send_msg(spdk_thread_fn fn, void *ctx, void *thread_ctx)
{
	fn(ctx);
}

static void
thread_alloc(void)
{
@@ -116,7 +109,7 @@ thread_poller(void)
	allocate_threads(1);

	set_thread(0);
	reset_time();
	MOCK_SET(spdk_get_ticks, 0);
	/* Register a poller with no-wait time and test execution */
	poller = spdk_poller_register(poller_run_done, &poller_run, 0);
	CU_ASSERT(poller != NULL);
@@ -135,16 +128,15 @@ thread_poller(void)
	poll_threads();
	CU_ASSERT(poller_run == false);

	increment_time(1000);
	spdk_delay_us(1000);
	poll_threads();
	CU_ASSERT(poller_run == true);

	reset_time();
	poller_run = false;
	poll_threads();
	CU_ASSERT(poller_run == false);

	increment_time(1000);
	spdk_delay_us(1000);
	poll_threads();
	CU_ASSERT(poller_run == true);

@@ -246,6 +238,7 @@ for_each_channel_remove(void)
	 */
	set_thread(0);
	spdk_put_io_channel(ch0);
	poll_threads();
	spdk_for_each_channel(&io_target, channel_msg, &count, channel_cpl);
	poll_threads();

@@ -350,7 +343,7 @@ thread_name(void)
	const char *name;

	/* Create thread with no name, which automatically generates one */
	spdk_allocate_thread(_send_msg, NULL, NULL, NULL, NULL);
	spdk_allocate_thread(NULL, NULL, NULL, NULL, NULL);
	thread = spdk_get_thread();
	SPDK_CU_ASSERT_FATAL(thread != NULL);
	name = spdk_thread_get_name(thread);
@@ -358,7 +351,7 @@ thread_name(void)
	spdk_free_thread();

	/* Create thread named "test_thread" */
	spdk_allocate_thread(_send_msg, NULL, NULL, NULL, "test_thread");
	spdk_allocate_thread(NULL, NULL, NULL, NULL, "test_thread");
	thread = spdk_get_thread();
	SPDK_CU_ASSERT_FATAL(thread != NULL);
	name = spdk_thread_get_name(thread);
@@ -414,10 +407,12 @@ destroy_cb_2(void *io_device, void *ctx_buf)
static void
channel(void)
{
	struct spdk_thread *thread;
	struct spdk_io_channel *ch1, *ch2;
	void *ctx;

	spdk_allocate_thread(_send_msg, NULL, NULL, NULL, "thread0");
	thread = spdk_allocate_thread(NULL, NULL, NULL, NULL, "thread0");
	SPDK_CU_ASSERT_FATAL(thread != NULL);
	spdk_io_device_register(&device1, create_cb_1, destroy_cb_1, sizeof(ctx1), NULL);
	spdk_io_device_register(&device2, create_cb_2, destroy_cb_2, sizeof(ctx2), NULL);

@@ -434,6 +429,7 @@ channel(void)

	g_destroy_cb_calls = 0;
	spdk_put_io_channel(ch2);
	while (spdk_thread_poll(thread, 0) > 0) {}
	CU_ASSERT(g_destroy_cb_calls == 0);

	g_create_cb_calls = 0;
@@ -447,17 +443,21 @@ channel(void)

	g_destroy_cb_calls = 0;
	spdk_put_io_channel(ch1);
	while (spdk_thread_poll(thread, 0) > 0) {}
	CU_ASSERT(g_destroy_cb_calls == 1);

	g_destroy_cb_calls = 0;
	spdk_put_io_channel(ch2);
	while (spdk_thread_poll(thread, 0) > 0) {}
	CU_ASSERT(g_destroy_cb_calls == 1);

	ch1 = spdk_get_io_channel(&device3);
	CU_ASSERT(ch1 == NULL);

	spdk_io_device_unregister(&device1, NULL);
	while (spdk_thread_poll(thread, 0) > 0) {}
	spdk_io_device_unregister(&device2, NULL);
	while (spdk_thread_poll(thread, 0) > 0) {}
	CU_ASSERT(TAILQ_EMPTY(&g_io_devices));
	spdk_free_thread();
	CU_ASSERT(TAILQ_EMPTY(&g_threads));