Commit 08d36a55 authored by Ben Walker's avatar Ben Walker Committed by Jim Harris
Browse files

thread: Make the thread internally manage pollers



The user of the thread library is now only responsible for
periodically calling spdk_thread_poll. Pollers are handled
internally.

In order to avoid changing all of the unit tests, the ability
to provide function pointers to change the behavior of
the poller registration is still in the code. This should
only be used from tests until they are all converted.

Change-Id: Ie2c00ce1d57ca3710ed2c469cd711924768e23ef
Signed-off-by: default avatarBen Walker <benjamin.walker@intel.com>
Reviewed-on: https://review.gerrithub.io/417784


Chandler-Test-Pool: SPDK Automated Test System <sys_sgsw@intel.com>
Tested-by: default avatarSPDK CI Jenkins <sys_sgci@intel.com>
Reviewed-by: default avatarJim Harris <james.r.harris@intel.com>
Reviewed-by: default avatarChangpeng Liu <changpeng.liu@intel.com>
Reviewed-by: default avatarZiye Yang <optimistyzy@gmail.com>
Reviewed-by: default avatarShuhei Matsumoto <shuhei.matsumoto.xt@hitachi.com>
parent 4e3cc6e1
Loading
Loading
Loading
Loading
+25 −1
Original line number Diff line number Diff line
@@ -182,8 +182,11 @@ void spdk_thread_lib_fini(void);
 * pointer (spdk_thread_fn) that must be called on the same thread that spdk_allocate_thread
 * was called from.
 * \param start_poller_fn Function to be called to start a poller for the thread.
 * DEPRECATED. Only used in tests. Pass NULL for this parameter.
 * \param stop_poller_fn Function to be called to stop a poller for the thread.
 * \param thread_ctx Context that will be passed to fn, start_poller_fn and spdk_stop_poller.
 * DEPRECATED. Only used in tests. Pass NULL for this parameter.
 * \param thread_ctx Context that will be passed to msg_fn, start_poller_fn, and stop_poller_fn.
 * DEPRECATED. Only used in tests. Pass NULL for this parameter.
 * \param name Human-readable name for the thread; can be retrieved with spdk_thread_get_name().
 * The string is copied, so the pointed-to data only needs to be valid during the
 * spdk_allocate_thread() call. May be NULL to specify no name.
@@ -204,6 +207,27 @@ struct spdk_thread *spdk_allocate_thread(spdk_thread_pass_msg msg_fn,
 */
void spdk_free_thread(void);

/**
 * Perform one iteration worth of processing on the thread. This currently only
 * executes pollers: at most one active (non-timed) poller and at most one
 * expired timed poller are run per call.
 *
 * \param thread The thread to process
 *
 * \return 1 if work was done. 0 if no work was done. -1 if unknown.
 * The value is the return code of the active poller that ran (0 if none),
 * replaced by the timed poller's return code when that one is greater.
 */
int spdk_thread_poll(struct spdk_thread *thread);

/**
 * Return the tick value at which the next timed poller is scheduled to
 * expire. This is an absolute tick count on the same timebase as
 * spdk_get_ticks(), not a delta from the current time. Timed pollers are
 * pollers for which period_microseconds is greater than 0.
 *
 * \param thread The thread to check poller expiration times on
 *
 * \return Tick value of the earliest timed poller expiration. If no timed
 * pollers, return 0.
 */
uint64_t spdk_thread_next_poller_expiration(struct spdk_thread *thread);

/**
 * Get count of allocated threads.
 */
+16 −209
Original line number Diff line number Diff line
@@ -46,30 +46,6 @@

#define SPDK_EVENT_BATCH_SIZE		8

/*
 * Lifecycle state of a poller. The reactor loop marks a poller RUNNING around
 * the call to its fn; _spdk_reactor_stop_poller() uses that to decide whether
 * the poller can be freed immediately or must be freed by the reactor loop.
 */
enum spdk_poller_state {
	/* The poller is registered with a reactor but not currently executing its fn. */
	SPDK_POLLER_STATE_WAITING,

	/* The poller is currently running its fn. */
	SPDK_POLLER_STATE_RUNNING,

	/* The poller was unregistered during the execution of its fn. */
	SPDK_POLLER_STATE_UNREGISTERED,
};

/* A single poller registered on a reactor. */
struct spdk_poller {
	/* Linkage in either the reactor's active_pollers or timer_pollers list. */
	TAILQ_ENTRY(spdk_poller)	tailq;
	/* Core of the reactor this poller was started on. */
	uint32_t			lcore;

	/* Current state of the poller; should only be accessed from the poller's thread. */
	enum spdk_poller_state		state;

	/* Period converted to ticks; 0 means the poller lives on active_pollers. */
	uint64_t			period_ticks;
	/* Tick value at which a timed poller is next due to run. */
	uint64_t			next_run_tick;
	/* Callback and its argument; invoked as fn(arg). */
	spdk_poller_fn			fn;
	void				*arg;
};

enum spdk_reactor_state {
	SPDK_REACTOR_STATE_INVALID = 0,
	SPDK_REACTOR_STATE_INITIALIZED = 1,
@@ -96,19 +72,6 @@ struct spdk_reactor {
	/* The last known rusage values */
	struct rusage					rusage;

	/*
	 * Contains pollers actively running on this reactor.  Pollers
	 *  are run round-robin. The reactor takes one poller from the head
	 *  of the ring, executes it, then puts it back at the tail of
	 *  the ring.
	 */
	TAILQ_HEAD(, spdk_poller)			active_pollers;

	/**
	 * Contains pollers running on this reactor with a periodic timer.
	 */
	TAILQ_HEAD(timer_pollers_head, spdk_poller)	timer_pollers;

	struct spdk_ring				*events;

	/* Pointer to the per-socket g_spdk_event_mempool for this reactor. */
@@ -231,99 +194,6 @@ _spdk_reactor_send_msg(spdk_thread_fn fn, void *ctx, void *thread_ctx)
	spdk_event_call(event);
}

/*
 * Schedule a timed poller on a reactor.
 *
 * Sets the poller's next expiration to now + period_ticks and links it into
 * reactor->timer_pollers, which is kept sorted by ascending next_run_tick.
 * Pollers with an equal expiration keep their insertion order.
 */
static void
_spdk_poller_insert_timer(struct spdk_reactor *reactor, struct spdk_poller *poller, uint64_t now)
{
	struct spdk_poller *prev;

	poller->next_run_tick = now + poller->period_ticks;

	/*
	 * Scan from the tail for the last poller that expires no later than
	 * the new one; prev is NULL afterwards if there is no such poller.
	 */
	TAILQ_FOREACH_REVERSE(prev, &reactor->timer_pollers, timer_pollers_head, tailq) {
		if (prev->next_run_tick <= poller->next_run_tick) {
			break;
		}
	}

	if (prev != NULL) {
		TAILQ_INSERT_AFTER(&reactor->timer_pollers, prev, poller, tailq);
	} else {
		/* Nothing expires earlier, so this poller becomes the new head. */
		TAILQ_INSERT_HEAD(&reactor->timer_pollers, poller, tailq);
	}
}

/*
 * Allocate a poller and register it with the reactor passed as thread_ctx.
 *
 * A non-zero period_microseconds is converted to ticks; if the resulting
 * period is non-zero the poller is placed on the sorted timer list, otherwise
 * it is appended to the active (run every iteration) list.
 *
 * Returns the new poller, or NULL if allocation fails.
 */
static struct spdk_poller *
_spdk_reactor_start_poller(void *thread_ctx,
			   spdk_poller_fn fn,
			   void *arg,
			   uint64_t period_microseconds)
{
	struct spdk_reactor *reactor = thread_ctx;
	struct spdk_poller *poller;

	poller = calloc(1, sizeof(*poller));
	if (poller == NULL) {
		SPDK_ERRLOG("Poller memory allocation failed\n");
		return NULL;
	}

	poller->lcore = reactor->lcore;
	poller->state = SPDK_POLLER_STATE_WAITING;
	poller->fn = fn;
	poller->arg = arg;
	poller->period_ticks = 0;

	if (period_microseconds != 0) {
		uint64_t hz = spdk_get_ticks_hz();
		uint64_t whole_seconds = period_microseconds / SPDK_SEC_TO_USEC;
		uint64_t leftover_usec = period_microseconds % SPDK_SEC_TO_USEC;

		/* Convert seconds and sub-second remainder separately. */
		poller->period_ticks = hz * whole_seconds + (hz * leftover_usec) / SPDK_SEC_TO_USEC;
	}

	if (poller->period_ticks == 0) {
		TAILQ_INSERT_TAIL(&reactor->active_pollers, poller, tailq);
	} else {
		_spdk_poller_insert_timer(reactor, poller, spdk_get_ticks());
	}

	return poller;
}

/*
 * Unregister a poller from the reactor passed as thread_ctx.
 *
 * If the poller is currently executing its fn, it is only marked as
 * unregistered and the reactor loop frees it once fn returns. Otherwise it is
 * unlinked from whichever list it is on and freed here.
 */
static void
_spdk_reactor_stop_poller(struct spdk_poller *poller, void *thread_ctx)
{
	struct spdk_reactor *reactor = thread_ctx;

	assert(poller->lcore == spdk_env_get_current_core());

	if (poller->state == SPDK_POLLER_STATE_RUNNING) {
		/* Called from within the poller's own fn: defer the free to the reactor loop. */
		poller->state = SPDK_POLLER_STATE_UNREGISTERED;
		return;
	}

	/* Not running, so it is still linked on one of the reactor's lists. */
	if (poller->period_ticks) {
		TAILQ_REMOVE(&reactor->timer_pollers, poller, tailq);
	} else {
		TAILQ_REMOVE(&reactor->active_pollers, poller, tailq);
	}

	free(poller);
}

static int
get_rusage(void *arg)
{
@@ -446,32 +316,11 @@ spdk_reactor_get_tsc_stats(struct spdk_reactor_tsc_stats *tsc_stats, uint32_t co
	return 0;
}

/**
 *
 * \brief This is the main function of the reactor thread.
 *
 * \code
 *
 * while (1)
 *	if (events to run)
 *		dequeue and run a batch of events
 *
 *	if (active pollers)
 *		run the first poller in the list and move it to the back
 *
 *	if (first timer poller has expired)
 *		run the first timer poller and reinsert it in the timer list
 *
 *	if (no action taken and sleep enabled)
 *		sleep until next timer poller is scheduled to expire
 * \endcode
 *
 */
static int
_spdk_reactor_run(void *arg)
{
	struct spdk_reactor	*reactor = arg;
	struct spdk_poller	*poller;
	struct spdk_thread	*thread;
	uint32_t		event_count;
	uint64_t		now;
	uint64_t		sleep_cycles;
@@ -480,10 +329,8 @@ _spdk_reactor_run(void *arg)
	char			thread_name[32];

	snprintf(thread_name, sizeof(thread_name), "reactor_%u", reactor->lcore);
	if (spdk_allocate_thread(_spdk_reactor_send_msg,
				 _spdk_reactor_start_poller,
				 _spdk_reactor_stop_poller,
				 reactor, thread_name) == NULL) {
	thread = spdk_allocate_thread(_spdk_reactor_send_msg, NULL, NULL, reactor, thread_name);
	if (!thread) {
		return -1;
	}
	SPDK_NOTICELOG("Reactor started on core %u on socket %u\n", reactor->lcore,
@@ -507,68 +354,31 @@ _spdk_reactor_run(void *arg)
			took_action = true;
		}

		poller = TAILQ_FIRST(&reactor->active_pollers);
		if (poller) {
			TAILQ_REMOVE(&reactor->active_pollers, poller, tailq);
			poller->state = SPDK_POLLER_STATE_RUNNING;
			rc = poller->fn(poller->arg);
			now = spdk_get_ticks();
			spdk_reactor_add_tsc_stats(reactor, rc, now);
			if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
				free(poller);
			} else {
				poller->state = SPDK_POLLER_STATE_WAITING;
				TAILQ_INSERT_TAIL(&reactor->active_pollers, poller, tailq);
			}
			took_action = true;
		}

		poller = TAILQ_FIRST(&reactor->timer_pollers);
		if (poller) {
			if (took_action == false) {
				now = spdk_get_ticks();
			}

			if (now >= poller->next_run_tick) {
				uint64_t tmp_timer_tsc;

				TAILQ_REMOVE(&reactor->timer_pollers, poller, tailq);
				poller->state = SPDK_POLLER_STATE_RUNNING;
				rc = poller->fn(poller->arg);
				/* Save the tsc value from before poller->fn was executed. We want to
				 * use the current time for idle/busy tsc value accounting, but want to
				 * use the older time to reinsert to the timer poller below. */
				tmp_timer_tsc = now;
		rc = spdk_thread_poll(thread);
		if (rc != 0) {
			now = spdk_get_ticks();
			spdk_reactor_add_tsc_stats(reactor, rc, now);
				if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
					free(poller);
				} else {
					poller->state = SPDK_POLLER_STATE_WAITING;
					_spdk_poller_insert_timer(reactor, poller, tmp_timer_tsc);
				}
			took_action = true;
		}
		}

		/* Determine if the thread can sleep */
		if (sleep_cycles && !took_action) {
			uint64_t next_run_tick;

			now = spdk_get_ticks();
			sleep_us = reactor->max_delay_us;
			next_run_tick = spdk_thread_next_poller_expiration(thread);

			poller = TAILQ_FIRST(&reactor->timer_pollers);
			if (poller) {
			/* There are timers registered, so don't sleep beyond
			 * when the next timer should fire */
				if (poller->next_run_tick < (now + sleep_cycles)) {
					if (poller->next_run_tick <= now) {
			if (next_run_tick > 0 && next_run_tick < (now + sleep_cycles)) {
				if (next_run_tick <= now) {
					sleep_us = 0;
				} else {
						sleep_us = ((poller->next_run_tick - now) *
					sleep_us = ((next_run_tick - now) *
						    SPDK_SEC_TO_USEC) / spdk_get_ticks_hz();
				}
			}
			}

			if (sleep_us > 0) {
				usleep(sleep_us);
@@ -593,9 +403,6 @@ spdk_reactor_construct(struct spdk_reactor *reactor, uint32_t lcore, uint64_t ma
	assert(reactor->socket_id < SPDK_MAX_SOCKET);
	reactor->max_delay_us = max_delay_us;

	TAILQ_INIT(&reactor->active_pollers);
	TAILQ_INIT(&reactor->timer_pollers);

	reactor->events = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, reactor->socket_id);
	if (!reactor->events) {
		SPDK_NOTICELOG("Ring creation failed on preferred socket %d. Try other sockets.\n",
+191 −9
Original line number Diff line number Diff line
@@ -33,8 +33,11 @@

#include "spdk/stdinc.h"

#include "spdk/env.h"
#include "spdk/queue.h"
#include "spdk/string.h"
#include "spdk/thread.h"
#include "spdk/util.h"

#include "spdk_internal/log.h"

@@ -66,6 +69,29 @@ struct io_device {

static TAILQ_HEAD(, io_device) g_io_devices = TAILQ_HEAD_INITIALIZER(g_io_devices);

/*
 * Lifecycle state of a poller. spdk_thread_poll() marks a poller RUNNING
 * around the call to its fn; spdk_poller_unregister() uses that to decide
 * whether the poller can be freed immediately or must be freed by the poll
 * loop after fn returns.
 */
enum spdk_poller_state {
	/* The poller is registered with a thread but not currently executing its fn. */
	SPDK_POLLER_STATE_WAITING,

	/* The poller is currently running its fn. */
	SPDK_POLLER_STATE_RUNNING,

	/* The poller was unregistered during the execution of its fn. */
	SPDK_POLLER_STATE_UNREGISTERED,
};

/* A single poller registered on an spdk_thread. */
struct spdk_poller {
	/* Linkage in either the thread's active_pollers or timer_pollers list. */
	TAILQ_ENTRY(spdk_poller)	tailq;

	/* Current state of the poller; should only be accessed from the poller's thread. */
	enum spdk_poller_state		state;

	/* Period converted to ticks; 0 means the poller lives on active_pollers. */
	uint64_t			period_ticks;
	/* Tick value at which a timed poller is next due to run. */
	uint64_t			next_run_tick;
	/* Callback and its argument; invoked as fn(arg). */
	spdk_poller_fn			fn;
	void				*arg;
};

struct spdk_thread {
	pthread_t			thread_id;
	spdk_thread_pass_msg		msg_fn;
@@ -75,6 +101,19 @@ struct spdk_thread {
	TAILQ_HEAD(, spdk_io_channel)	io_channels;
	TAILQ_ENTRY(spdk_thread)	tailq;
	char				*name;

	/*
	 * Contains pollers actively running on this thread.  Pollers
	 *  are run round-robin. The thread takes one poller from the head
	 *  of the ring, executes it, then puts it back at the tail of
	 *  the ring.
	 */
	TAILQ_HEAD(, spdk_poller)	active_pollers;

	/**
	 * Contains pollers running on this thread with a periodic timer.
	 */
	TAILQ_HEAD(timer_pollers_head, spdk_poller) timer_pollers;
};

static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads);
@@ -138,6 +177,12 @@ spdk_allocate_thread(spdk_thread_pass_msg msg_fn,
		return NULL;
	}

	if ((start_poller_fn != NULL && stop_poller_fn == NULL) ||
	    (start_poller_fn == NULL && stop_poller_fn != NULL)) {
		SPDK_ERRLOG("start_poller_fn and stop_poller_fn must either both be NULL or both be non-NULL\n");
		return NULL;
	}

	thread = calloc(1, sizeof(*thread));
	if (!thread) {
		SPDK_ERRLOG("Unable to allocate memory for thread\n");
@@ -152,6 +197,10 @@ spdk_allocate_thread(spdk_thread_pass_msg msg_fn,
	thread->thread_ctx = thread_ctx;
	TAILQ_INIT(&thread->io_channels);
	TAILQ_INSERT_TAIL(&g_threads, thread, tailq);

	TAILQ_INIT(&thread->active_pollers);
	TAILQ_INIT(&thread->timer_pollers);

	g_thread_count++;
	if (name) {
		_set_thread_name(name);
@@ -192,6 +241,99 @@ spdk_free_thread(void)
	pthread_mutex_unlock(&g_devlist_mutex);
}

/*
 * Schedule a timed poller on a thread.
 *
 * Sets the poller's next expiration to now + period_ticks and links it into
 * thread->timer_pollers, which is kept sorted by ascending next_run_tick.
 * Pollers with an equal expiration keep their insertion order.
 */
static void
_spdk_poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now)
{
	struct spdk_poller *prev;
	uint64_t expiration = now + poller->period_ticks;

	poller->next_run_tick = expiration;

	/*
	 * Scan from the tail for the last poller that expires no later than
	 * the new one; prev is NULL afterwards if there is no such poller.
	 */
	TAILQ_FOREACH_REVERSE(prev, &thread->timer_pollers, timer_pollers_head, tailq) {
		if (prev->next_run_tick <= expiration) {
			break;
		}
	}

	if (prev == NULL) {
		/* Nothing expires earlier, so this poller becomes the new head. */
		TAILQ_INSERT_HEAD(&thread->timer_pollers, poller, tailq);
	} else {
		TAILQ_INSERT_AFTER(&thread->timer_pollers, prev, poller, tailq);
	}
}

/*
 * Run one polling iteration on the given thread.
 *
 * At most one active poller (the head of active_pollers, which is then moved
 * to the tail) and at most one timed poller (the head of timer_pollers, if it
 * has expired) are executed. A poller that unregistered itself while running
 * is freed here instead of being requeued.
 *
 * Returns the active poller's return code (0 if none ran), replaced by the
 * timed poller's return code when that one is greater.
 */
int
spdk_thread_poll(struct spdk_thread *thread)
{
	struct spdk_poller *poller;
	int rc = 0;

	poller = TAILQ_FIRST(&thread->active_pollers);
	if (poller) {
		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		poller->state = SPDK_POLLER_STATE_RUNNING;
		rc = poller->fn(poller->arg);

#ifdef DEBUG
		/* Log before the poller may be freed so a freed pointer is never used. */
		if (rc == -1) {
			SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Poller %p returned -1\n", poller);
		}
#endif

		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			free(poller);
		} else {
			poller->state = SPDK_POLLER_STATE_WAITING;
			TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
		}
	}

	poller = TAILQ_FIRST(&thread->timer_pollers);
	if (poller) {
		uint64_t now = spdk_get_ticks();

		if (now >= poller->next_run_tick) {
			int timer_rc = 0;

			TAILQ_REMOVE(&thread->timer_pollers, poller, tailq);
			poller->state = SPDK_POLLER_STATE_RUNNING;
			timer_rc = poller->fn(poller->arg);

#ifdef DEBUG
			/* Log before the poller may be freed so a freed pointer is never used. */
			if (timer_rc == -1) {
				SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Timed poller %p returned -1\n", poller);
			}
#endif

			if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
				free(poller);
			} else {
				poller->state = SPDK_POLLER_STATE_WAITING;
				/*
				 * Reschedule relative to the tick sampled before fn ran, so
				 * the time spent inside fn does not push the period out.
				 */
				_spdk_poller_insert_timer(thread, poller, now);
			}

			if (timer_rc > rc) {
				rc = timer_rc;
			}
		}
	}

	return rc;
}

/*
 * Report when the earliest timed poller on the thread is due.
 *
 * Returns the next_run_tick of the head of timer_pollers (the list is kept
 * sorted, so the head expires first), or 0 when no timed poller is registered.
 */
uint64_t
spdk_thread_next_poller_expiration(struct spdk_thread *thread)
{
	struct spdk_poller *earliest = TAILQ_FIRST(&thread->timer_pollers);

	if (earliest == NULL) {
		return 0;
	}

	return earliest->next_run_tick;
}

uint32_t
spdk_thread_get_count(void)
{
@@ -240,6 +382,7 @@ spdk_poller_register(spdk_poller_fn fn,
{
	struct spdk_thread *thread;
	struct spdk_poller *poller;
	uint64_t quotient, remainder, ticks;

	thread = spdk_get_thread();
	if (!thread) {
@@ -247,19 +390,36 @@ spdk_poller_register(spdk_poller_fn fn,
		return NULL;
	}

	if (!thread->start_poller_fn || !thread->stop_poller_fn) {
		SPDK_ERRLOG("No related functions to start requested poller\n");
		assert(false);
		return NULL;
	if (thread->start_poller_fn) {
		return thread->start_poller_fn(thread->thread_ctx, fn, arg, period_microseconds);
	}

	poller = thread->start_poller_fn(thread->thread_ctx, fn, arg, period_microseconds);
	if (!poller) {
		SPDK_ERRLOG("Unable to start requested poller\n");
		assert(false);
	poller = calloc(1, sizeof(*poller));
	if (poller == NULL) {
		SPDK_ERRLOG("Poller memory allocation failed\n");
		return NULL;
	}

	poller->state = SPDK_POLLER_STATE_WAITING;
	poller->fn = fn;
	poller->arg = arg;

	if (period_microseconds) {
		quotient = period_microseconds / SPDK_SEC_TO_USEC;
		remainder = period_microseconds % SPDK_SEC_TO_USEC;
		ticks = spdk_get_ticks_hz();

		poller->period_ticks = ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC;
	} else {
		poller->period_ticks = 0;
	}

	if (poller->period_ticks) {
		_spdk_poller_insert_timer(thread, poller, spdk_get_ticks());
	} else {
		TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
	}

	return poller;
}

@@ -277,9 +437,31 @@ spdk_poller_unregister(struct spdk_poller **ppoller)
	*ppoller = NULL;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return;
	}

	if (thread) {
	if (thread->stop_poller_fn) {
		thread->stop_poller_fn(poller, thread->thread_ctx);
		return;
	}

	if (poller->state == SPDK_POLLER_STATE_RUNNING) {
		/*
		 * We are being called from the poller_fn, so set the state to unregistered
		 * and let the thread poll loop free the poller.
		 */
		poller->state = SPDK_POLLER_STATE_UNREGISTERED;
	} else {
		/* Poller is not running currently, so just free it. */
		if (poller->period_ticks) {
			TAILQ_REMOVE(&thread->timer_pollers, poller, tailq);
		} else {
			TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		}

		free(poller);
	}
}