Commit 9ec598d1 authored by Konrad Sztyber's avatar Konrad Sztyber Committed by Tomasz Zawadzki
Browse files

lib/thread: poller pause / resume



Added spdk_poller_(pause|resume) that allow a poller to be paused and
then resumed at a later point.  These functions come in handy in cases
when a poller is known to be idle until a certain event occurs.

Change-Id: I7f21c80eb9ac4e8e1cf24d66f99da5687aafe358
Signed-off-by: default avatarKonrad Sztyber <konrad.sztyber@intel.com>
Reviewed-on: https://review.gerrithub.io/c/spdk/spdk/+/477920


Reviewed-by: default avatarShuhei Matsumoto <shuhei.matsumoto.xt@hitachi.com>
Reviewed-by: default avatarJim Harris <james.r.harris@intel.com>
Tested-by: default avatarSPDK CI Jenkins <sys_sgci@intel.com>
parent f4576dec
Loading
Loading
Loading
Loading
+20 −0
Original line number Diff line number Diff line
@@ -417,6 +417,26 @@ struct spdk_poller *spdk_poller_register(spdk_poller_fn fn,
 */
void spdk_poller_unregister(struct spdk_poller **ppoller);

/**
 * Pause a poller on the current thread.
 *
 * The poller is not run until it is resumed with spdk_poller_resume().  It is
 * perfectly fine to pause an already paused poller.
 *
 * \param poller The poller to pause.
 */
void spdk_poller_pause(struct spdk_poller *poller);

/**
 * Resume a poller on the current thread.
 *
 * Resumes a poller paused with spdk_poller_pause().  It is perfectly fine to
 * resume an unpaused poller.
 *
 * \param poller The poller to resume.
 */
void spdk_poller_resume(struct spdk_poller *poller);

/**
 * Register the opaque io_device context as an I/O device.
 *
+139 −13
Original line number Diff line number Diff line
@@ -88,6 +88,16 @@ enum spdk_poller_state {

	/* The poller was unregistered during the execution of its fn. */
	SPDK_POLLER_STATE_UNREGISTERED,

	/* The poller is in the process of being paused.  It will be paused
	 * during the next time it's supposed to be executed.
	 */
	SPDK_POLLER_STATE_PAUSING,

	/* The poller is registered but currently paused.  It's on the
	 * paused_pollers list.
	 */
	SPDK_POLLER_STATE_PAUSED,
};

struct spdk_poller {
@@ -127,6 +137,13 @@ struct spdk_thread {
	 */
	TAILQ_HEAD(timer_pollers_head, spdk_poller)	timer_pollers;

	/*
	 * Contains paused pollers.  Pollers on this queue are waiting until
	 * they are resumed (in which case they're put onto the active/timer
	 * queues) or unregistered.
	 */
	TAILQ_HEAD(paused_pollers_head, spdk_poller)	paused_pollers;

	struct spdk_ring		*messages;

	SLIST_HEAD(, spdk_msg)		msg_cache;
@@ -202,7 +219,7 @@ _free_thread(struct spdk_thread *thread)
	}

	TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, ptmp) {
		if (poller->state == SPDK_POLLER_STATE_WAITING) {
		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
			SPDK_WARNLOG("poller %p still registered at thread exit\n",
				     poller);
		}
@@ -213,7 +230,7 @@ _free_thread(struct spdk_thread *thread)


	TAILQ_FOREACH_SAFE(poller, &thread->timer_pollers, tailq, ptmp) {
		if (poller->state == SPDK_POLLER_STATE_WAITING) {
		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
			SPDK_WARNLOG("poller %p still registered at thread exit\n",
				     poller);
		}
@@ -222,6 +239,12 @@ _free_thread(struct spdk_thread *thread)
		free(poller);
	}

	TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, ptmp) {
		SPDK_WARNLOG("poller %p still registered at thread exit\n", poller);
		TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
		free(poller);
	}

	pthread_mutex_lock(&g_devlist_mutex);
	assert(g_thread_count > 0);
	g_thread_count--;
@@ -267,6 +290,7 @@ spdk_thread_create(const char *name, struct spdk_cpuset *cpumask)
	TAILQ_INIT(&thread->io_channels);
	TAILQ_INIT(&thread->active_pollers);
	TAILQ_INIT(&thread->timer_pollers);
	TAILQ_INIT(&thread->paused_pollers);
	SLIST_INIT(&thread->msg_cache);
	thread->msg_cache_count = 0;

@@ -444,6 +468,16 @@ _spdk_poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller
	TAILQ_INSERT_HEAD(&thread->timer_pollers, poller, tailq);
}

static void
_spdk_thread_insert_poller(struct spdk_thread *thread, struct spdk_poller *poller)
{
	if (poller->period_ticks) {
		_spdk_poller_insert_timer(thread, poller, spdk_get_ticks());
	} else {
		TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
	}
}

int
spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now)
{
@@ -476,6 +510,11 @@ spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now)
			TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
			free(poller);
			continue;
		} else if (poller->state == SPDK_POLLER_STATE_PAUSING) {
			TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
			TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
			poller->state = SPDK_POLLER_STATE_PAUSED;
			continue;
		}

		poller->state = SPDK_POLLER_STATE_RUNNING;
@@ -485,9 +524,9 @@ spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now)
			TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
			free(poller);
			continue;
		}

		} else if (poller->state != SPDK_POLLER_STATE_PAUSED) {
			poller->state = SPDK_POLLER_STATE_WAITING;
		}

#ifdef DEBUG
		if (poller_rc == -1) {
@@ -512,6 +551,11 @@ spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now)
			TAILQ_REMOVE(&thread->timer_pollers, poller, tailq);
			free(poller);
			continue;
		} else if (poller->state == SPDK_POLLER_STATE_PAUSING) {
			TAILQ_REMOVE(&thread->timer_pollers, poller, tailq);
			TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
			poller->state = SPDK_POLLER_STATE_PAUSED;
			continue;
		}

		if (now < poller->next_run_tick) {
@@ -530,7 +574,7 @@ spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now)
		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			TAILQ_REMOVE(&thread->timer_pollers, poller, tailq);
			free(poller);
		} else {
		} else if (poller->state != SPDK_POLLER_STATE_PAUSED) {
			poller->state = SPDK_POLLER_STATE_WAITING;
			TAILQ_REMOVE(&thread->timer_pollers, poller, tailq);
			_spdk_poller_insert_timer(thread, poller, now);
@@ -574,8 +618,8 @@ spdk_thread_has_active_pollers(struct spdk_thread *thread)
	return !TAILQ_EMPTY(&thread->active_pollers);
}

bool
spdk_thread_has_pollers(struct spdk_thread *thread)
static bool
_spdk_thread_has_unpaused_pollers(struct spdk_thread *thread)
{
	if (TAILQ_EMPTY(&thread->active_pollers) &&
	    TAILQ_EMPTY(&thread->timer_pollers)) {
@@ -585,11 +629,22 @@ spdk_thread_has_pollers(struct spdk_thread *thread)
	return true;
}

bool
spdk_thread_has_pollers(struct spdk_thread *thread)
{
	if (!_spdk_thread_has_unpaused_pollers(thread) &&
	    TAILQ_EMPTY(&thread->paused_pollers)) {
		return false;
	}

	return true;
}

bool
spdk_thread_is_idle(struct spdk_thread *thread)
{
	if (spdk_ring_count(thread->messages) ||
	    spdk_thread_has_pollers(thread)) {
	    _spdk_thread_has_unpaused_pollers(thread)) {
		return false;
	}

@@ -723,11 +778,7 @@ spdk_poller_register(spdk_poller_fn fn,
		poller->period_ticks = 0;
	}

	if (poller->period_ticks) {
		_spdk_poller_insert_timer(thread, poller, spdk_get_ticks());
	} else {
		TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
	}
	_spdk_thread_insert_poller(thread, poller);

	return poller;
}
@@ -751,12 +802,87 @@ spdk_poller_unregister(struct spdk_poller **ppoller)
		return;
	}

	/* If the poller was paused, put it on the active_pollers list so that
	 * its unregistration can be processed by spdk_thread_poll().
	 */
	if (poller->state == SPDK_POLLER_STATE_PAUSED) {
		TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
		TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
		poller->period_ticks = 0;
	}

	/* Simply set the state to unregistered. The poller will get cleaned up
	 * in a subsequent call to spdk_thread_poll().
	 */
	poller->state = SPDK_POLLER_STATE_UNREGISTERED;
}

void
spdk_poller_pause(struct spdk_poller *poller)
{
	struct spdk_thread *thread;

	if (poller->state == SPDK_POLLER_STATE_PAUSED ||
	    poller->state == SPDK_POLLER_STATE_PAUSING) {
		return;
	}

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return;
	}

	/* If a poller is paused from within itself, we can immediately move it
	 * on the paused_pollers list.  Otherwise we just set its state to
	 * SPDK_POLLER_STATE_PAUSING and let spdk_thread_poll() move it.  It
	 * allows a poller to be paused from another one's context without
	 * breaking the TAILQ_FOREACH_REVERSE_SAFE iteration.
	 */
	if (poller->state != SPDK_POLLER_STATE_RUNNING) {
		poller->state = SPDK_POLLER_STATE_PAUSING;
	} else {
		if (poller->period_ticks > 0) {
			TAILQ_REMOVE(&thread->timer_pollers, poller, tailq);
		} else {
			TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		}

		TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
		poller->state = SPDK_POLLER_STATE_PAUSED;
	}
}

void
spdk_poller_resume(struct spdk_poller *poller)
{
	struct spdk_thread *thread;

	if (poller->state != SPDK_POLLER_STATE_PAUSED &&
	    poller->state != SPDK_POLLER_STATE_PAUSING) {
		return;
	}

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return;
	}

	/* If a poller is paused it has to be removed from the paused pollers
	 * list and put on the active / timer list depending on its
	 * period_ticks.  If a poller is still in the process of being paused,
	 * we just need to flip its state back to waiting, as it's already on
	 * the appropriate list.
	 */
	if (poller->state == SPDK_POLLER_STATE_PAUSED) {
		TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
		_spdk_thread_insert_poller(thread, poller);
	}

	poller->state = SPDK_POLLER_STATE_WAITING;
}

struct call_thread {
	struct spdk_thread *cur_thread;
	spdk_msg_fn fn;
+164 −0
Original line number Diff line number Diff line
@@ -179,6 +179,169 @@ thread_poller(void)
	free_threads();
}

struct poller_ctx {
	struct spdk_poller	*poller;
	bool			run;
};

static int
poller_run_pause(void *ctx)
{
	struct poller_ctx *poller_ctx = ctx;

	poller_ctx->run = true;
	spdk_poller_pause(poller_ctx->poller);

	return 0;
}

static void
poller_msg_pause_cb(void *ctx)
{
	struct spdk_poller *poller = ctx;

	spdk_poller_pause(poller);
}

static void
poller_msg_resume_cb(void *ctx)
{
	struct spdk_poller *poller = ctx;

	spdk_poller_resume(poller);
}

static void
poller_pause(void)
{
	struct poller_ctx poller_ctx = {};
	unsigned int delay[] = { 0, 1000 };
	unsigned int i;

	allocate_threads(1);
	set_thread(0);

	/* Register a poller that pauses itself */
	poller_ctx.poller = spdk_poller_register(poller_run_pause, &poller_ctx, 0);
	CU_ASSERT_PTR_NOT_NULL(poller_ctx.poller);

	poller_ctx.run = false;
	poll_threads();
	CU_ASSERT_EQUAL(poller_ctx.run, true);

	poller_ctx.run = false;
	poll_threads();
	CU_ASSERT_EQUAL(poller_ctx.run, false);

	spdk_poller_unregister(&poller_ctx.poller);
	CU_ASSERT_PTR_NULL(poller_ctx.poller);

	/* Verify that resuming an unpaused poller doesn't do anything */
	poller_ctx.poller = spdk_poller_register(poller_run_done, &poller_ctx.run, 0);
	CU_ASSERT_PTR_NOT_NULL(poller_ctx.poller);

	spdk_poller_resume(poller_ctx.poller);

	poller_ctx.run = false;
	poll_threads();
	CU_ASSERT_EQUAL(poller_ctx.run, true);

	/* Verify that pausing the same poller twice works too */
	spdk_poller_pause(poller_ctx.poller);

	poller_ctx.run = false;
	poll_threads();
	CU_ASSERT_EQUAL(poller_ctx.run, false);

	spdk_poller_pause(poller_ctx.poller);
	poll_threads();
	CU_ASSERT_EQUAL(poller_ctx.run, false);

	spdk_poller_resume(poller_ctx.poller);
	poll_threads();
	CU_ASSERT_EQUAL(poller_ctx.run, true);

	/* Verify that a poller is run when it's resumed immediately after pausing */
	poller_ctx.run = false;
	spdk_poller_pause(poller_ctx.poller);
	spdk_poller_resume(poller_ctx.poller);
	poll_threads();
	CU_ASSERT_EQUAL(poller_ctx.run, true);

	spdk_poller_unregister(&poller_ctx.poller);
	CU_ASSERT_PTR_NULL(poller_ctx.poller);

	/* Poll the thread to make sure the previous poller gets unregistered */
	poll_threads();
	CU_ASSERT_EQUAL(spdk_thread_has_pollers(spdk_get_thread()), false);

	/* Verify that it's possible to unregister a paused poller */
	poller_ctx.poller = spdk_poller_register(poller_run_done, &poller_ctx.run, 0);
	CU_ASSERT_PTR_NOT_NULL(poller_ctx.poller);

	poller_ctx.run = false;
	poll_threads();
	CU_ASSERT_EQUAL(poller_ctx.run, true);

	spdk_poller_pause(poller_ctx.poller);

	poller_ctx.run = false;
	poll_threads();
	CU_ASSERT_EQUAL(poller_ctx.run, false);

	spdk_poller_unregister(&poller_ctx.poller);

	poll_threads();
	CU_ASSERT_EQUAL(poller_ctx.run, false);
	CU_ASSERT_EQUAL(spdk_thread_has_pollers(spdk_get_thread()), false);

	/* Register pollers with 0 and 1000us wait time and pause/resume them */
	for (i = 0; i < SPDK_COUNTOF(delay); ++i) {
		poller_ctx.poller = spdk_poller_register(poller_run_done, &poller_ctx.run, delay[i]);
		CU_ASSERT_PTR_NOT_NULL(poller_ctx.poller);

		spdk_delay_us(delay[i]);
		poller_ctx.run = false;
		poll_threads();
		CU_ASSERT_EQUAL(poller_ctx.run, true);

		spdk_poller_pause(poller_ctx.poller);

		spdk_delay_us(delay[i]);
		poller_ctx.run = false;
		poll_threads();
		CU_ASSERT_EQUAL(poller_ctx.run, false);

		spdk_poller_resume(poller_ctx.poller);

		spdk_delay_us(delay[i]);
		poll_threads();
		CU_ASSERT_EQUAL(poller_ctx.run, true);

		/* Verify that the poller can be paused/resumed from spdk_thread_send_msg */
		spdk_thread_send_msg(spdk_get_thread(), poller_msg_pause_cb, poller_ctx.poller);

		spdk_delay_us(delay[i]);
		poller_ctx.run = false;
		poll_threads();
		CU_ASSERT_EQUAL(poller_ctx.run, false);

		spdk_thread_send_msg(spdk_get_thread(), poller_msg_resume_cb, poller_ctx.poller);

		poll_threads();
		if (delay[i] > 0) {
			spdk_delay_us(delay[i]);
			poll_threads();
		}
		CU_ASSERT_EQUAL(poller_ctx.run, true);

		spdk_poller_unregister(&poller_ctx.poller);
		CU_ASSERT_PTR_NULL(poller_ctx.poller);
	}

	free_threads();
}

static void
for_each_cb(void *ctx)
{
@@ -579,6 +742,7 @@ main(int argc, char **argv)
		CU_add_test(suite, "thread_alloc", thread_alloc) == NULL ||
		CU_add_test(suite, "thread_send_msg", thread_send_msg) == NULL ||
		CU_add_test(suite, "thread_poller", thread_poller) == NULL ||
		CU_add_test(suite, "poller_pause", poller_pause) == NULL ||
		CU_add_test(suite, "thread_for_each", thread_for_each) == NULL ||
		CU_add_test(suite, "for_each_channel_remove", for_each_channel_remove) == NULL ||
		CU_add_test(suite, "for_each_channel_unreg", for_each_channel_unreg) == NULL ||