Commit c41ab41c authored by Daniel Verkamp's avatar Daniel Verkamp Committed by Ben Walker
Browse files

event: add timer-based pollers



Allow pollers to be scheduled to be run periodically every N
microseconds instead of every iteration of the reactor loop.

Change-Id: Iaea3e98965d81044e6dc5ce5f406bcb7a455289e
Signed-off-by: default avatarDaniel Verkamp <daniel.verkamp@intel.com>
parent aeb3d506
Loading
Loading
Loading
Loading
+4 −1
Original line number Diff line number Diff line
@@ -105,6 +105,8 @@ typedef void (*spdk_poller_fn)(void *arg);
struct spdk_poller {
	TAILQ_ENTRY(spdk_poller)	tailq;
	uint32_t			lcore;
	uint64_t			period_ticks;
	uint64_t			next_run_tick;
	spdk_poller_fn			fn;
	void				*arg;
};
@@ -220,7 +222,8 @@ void spdk_event_queue_run_all(uint32_t lcore);
 */
void spdk_poller_register(struct spdk_poller *poller,
			  uint32_t lcore,
			  struct spdk_event *complete);
			  struct spdk_event *complete,
			  uint64_t period_microseconds);

/**
 * \brief Unregister a poller on the given lcore.
+1 −1
Original line number Diff line number Diff line
@@ -448,7 +448,7 @@ spdk_bdev_io_submit(struct spdk_bdev_io *bdev_io)
		if (lcore == 0) {
			lcore = rte_lcore_id();
		}
		spdk_poller_register(&bdev->poller, lcore, NULL);
		spdk_poller_register(&bdev->poller, lcore, NULL, 0);
	}

	if (bdev_io->status == SPDK_BDEV_IO_STATUS_PENDING) {
+72 −9
Original line number Diff line number Diff line
@@ -47,6 +47,7 @@
#endif

#include <rte_config.h>
#include <rte_cycles.h>
#include <rte_debug.h>
#include <rte_mempool.h>
#include <rte_ring.h>
@@ -78,6 +79,11 @@ struct spdk_reactor {
	 */
	TAILQ_HEAD(, spdk_poller)			active_pollers;

	/**
	 * Contains pollers running on this reactor with a periodic timer.
	 */
	TAILQ_HEAD(timer_pollers_head, spdk_poller)	timer_pollers;

	struct rte_ring					*events;
};

@@ -220,6 +226,30 @@ static void set_reactor_thread_name(void)
#endif
}

static void
spdk_poller_insert_timer(struct spdk_reactor *reactor, struct spdk_poller *poller, uint64_t now)
{
	struct spdk_poller *iter;
	uint64_t next_run_tick;

	next_run_tick = now + poller->period_ticks;
	poller->next_run_tick = next_run_tick;

	/*
	 * Insert poller in the reactor's timer_pollers list in sorted order by next scheduled
	 * run time.
	 */
	TAILQ_FOREACH_REVERSE(iter, &reactor->timer_pollers, timer_pollers_head, tailq) {
		if (iter->next_run_tick <= next_run_tick) {
			TAILQ_INSERT_AFTER(&reactor->timer_pollers, iter, poller, tailq);
			return;
		}
	}

	/* No earlier pollers were found, so this poller must be the new head */
	TAILQ_INSERT_HEAD(&reactor->timer_pollers, poller, tailq);
}

/**

\brief This is the main function of the reactor thread.
@@ -270,6 +300,17 @@ _spdk_reactor_run(void *arg)
			TAILQ_INSERT_TAIL(&reactor->active_pollers, poller, tailq);
		}

		poller = TAILQ_FIRST(&reactor->timer_pollers);
		if (poller) {
			uint64_t now = rte_get_timer_cycles();

			if (now >= poller->next_run_tick) {
				TAILQ_REMOVE(&reactor->timer_pollers, poller, tailq);
				poller->fn(poller->arg);
				spdk_poller_insert_timer(reactor, poller, now);
			}
		}

		if (g_reactor_state != SPDK_REACTOR_STATE_RUNNING) {
			break;
		}
@@ -286,6 +327,7 @@ spdk_reactor_construct(struct spdk_reactor *reactor, uint32_t lcore)
	reactor->lcore = lcore;

	TAILQ_INIT(&reactor->active_pollers);
	TAILQ_INIT(&reactor->timer_pollers);

	snprintf(ring_name, sizeof(ring_name) - 1, "spdk_event_queue_%u", lcore);
	reactor->events =
@@ -523,16 +565,20 @@ _spdk_event_add_poller(spdk_event_t event)

	poller->lcore = reactor->lcore;

	if (poller->period_ticks) {
		spdk_poller_insert_timer(reactor, poller, rte_get_timer_cycles());
	} else {
		TAILQ_INSERT_TAIL(&reactor->active_pollers, poller, tailq);
	}

	if (next) {
		spdk_event_call(next);
	}
}

void
spdk_poller_register(struct spdk_poller *poller,
		     uint32_t lcore, spdk_event_t complete)
static void
_spdk_poller_register(struct spdk_poller *poller, uint32_t lcore,
		      struct spdk_event *complete)
{
	struct spdk_reactor *reactor;
	struct spdk_event *event;
@@ -542,6 +588,19 @@ spdk_poller_register(struct spdk_poller *poller,
	spdk_event_call(event);
}

void
spdk_poller_register(struct spdk_poller *poller,
		     uint32_t lcore, struct spdk_event *complete, uint64_t period_microseconds)
{
	if (period_microseconds) {
		poller->period_ticks = (rte_get_timer_hz() * period_microseconds) / 1000000ULL;
	} else {
		poller->period_ticks = 0;
	}

	_spdk_poller_register(poller, lcore, complete);
}

static void
_spdk_event_remove_poller(spdk_event_t event)
{
@@ -549,7 +608,11 @@ _spdk_event_remove_poller(spdk_event_t event)
	struct spdk_poller *poller = spdk_event_get_arg2(event);
	struct spdk_event *next = spdk_event_get_next(event);

	if (poller->period_ticks) {
		TAILQ_REMOVE(&reactor->timer_pollers, poller, tailq);
	} else {
		TAILQ_REMOVE(&reactor->active_pollers, poller, tailq);
	}

	if (next) {
		spdk_event_call(next);
@@ -579,7 +642,7 @@ _spdk_poller_migrate(spdk_event_t event)
	 * because we already set this event up so that it is called
	 * on the new_lcore.
	 */
	spdk_poller_register(poller, rte_lcore_id(), next);
	_spdk_poller_register(poller, rte_lcore_id(), next);
}

void
+1 −1
Original line number Diff line number Diff line
@@ -115,7 +115,7 @@ nvmf_create_subsystem(int num, const char *name,

	subsystem->poller.fn = spdk_nvmf_subsystem_poller;
	subsystem->poller.arg = subsystem;
	spdk_poller_register(&subsystem->poller, lcore, NULL);
	spdk_poller_register(&subsystem->poller, lcore, NULL, 0);

	TAILQ_INSERT_HEAD(&g_subsystems, subsystem, entries);

+1 −1
Original line number Diff line number Diff line
@@ -34,7 +34,7 @@
SPDK_ROOT_DIR := $(abspath $(CURDIR)/../../..)
include $(SPDK_ROOT_DIR)/mk/spdk.common.mk

DIRS-y = event subsystem
DIRS-y = event reactor subsystem

.PHONY: all clean $(DIRS-y)

Loading