Commit 921bf342 authored by Daniel Verkamp's avatar Daniel Verkamp
Browse files

event: replace poller rte_ring with TAILQ



The rte_ring used for pollers is already single-producer and
single-consumer, so it is not providing any thread safety guarantees.
ALl modifications to the active_pollers ring are done from the core that
is running the reactor (via events).  This means the rte_ring can be
replaced with a simpler intrusive linked list.

This simplifies the removal of pollers in the middle of the list and
avoids extra allocations for the ring.

Change-Id: Ica149b7a1668a8af1e6ca8f741c48f2217f6f9bf
Signed-off-by: default avatarDaniel Verkamp <daniel.verkamp@intel.com>
parent bd4ac74e
Loading
Loading
Loading
Loading
+4 −11
Original line number Diff line number Diff line
@@ -103,19 +103,12 @@ typedef void (*spdk_poller_fn)(void *arg);
 * \brief A poller is a function that is repeatedly called on an lcore.
 */
struct spdk_poller {
	TAILQ_ENTRY(spdk_poller)	tailq;
	uint32_t			lcore;
	spdk_poller_fn			fn;
	void				*arg;
};

#define SPDK_POLLER_RING_SIZE		4096

/*
 * -1 accounts for the empty slot needed to differentiate between ring empty
 *  and ring full.
 */
#define SPDK_MAX_POLLERS_PER_CORE	(SPDK_POLLER_RING_SIZE - 1)

typedef void (*spdk_app_shutdown_cb)(void);
typedef void (*spdk_sighandler_t)(int);

+9 −30
Original line number Diff line number Diff line
@@ -76,7 +76,7 @@ struct spdk_reactor {
	 *  of the ring, executes it, then puts it back at the tail of
	 *  the ring.
	 */
	struct rte_ring			*active_pollers;
	TAILQ_HEAD(, spdk_poller)	active_pollers;

	struct rte_ring			*events;
};
@@ -253,8 +253,7 @@ static int
_spdk_reactor_run(void *arg)
{
	struct spdk_reactor	*reactor = arg;
	struct spdk_poller	*poller = NULL;
	int			rc;
	struct spdk_poller	*poller;

	set_reactor_thread_name();
	SPDK_NOTICELOG("waiting for work item to arrive...\n");
@@ -264,14 +263,11 @@ _spdk_reactor_run(void *arg)

		rte_timer_manage();

		if (rte_ring_dequeue(reactor->active_pollers, (void **)&poller) == 0) {
		poller = TAILQ_FIRST(&reactor->active_pollers);
		if (poller) {
			TAILQ_REMOVE(&reactor->active_pollers, poller, tailq);
			poller->fn(poller->arg);
			rc = rte_ring_enqueue(reactor->active_pollers,
					      (void *)poller);
			if (rc != 0) {
				SPDK_ERRLOG("poller could not be enqueued\n");
				exit(EXIT_FAILURE);
			}
			TAILQ_INSERT_TAIL(&reactor->active_pollers, poller, tailq);
		}

		if (g_reactor_state != SPDK_REACTOR_STATE_RUNNING) {
@@ -289,10 +285,7 @@ spdk_reactor_construct(struct spdk_reactor *reactor, uint32_t lcore)

	reactor->lcore = lcore;

	snprintf(ring_name, sizeof(ring_name), "spdk_active_pollers_%d", lcore);
	reactor->active_pollers =
		rte_ring_create(ring_name, SPDK_POLLER_RING_SIZE, rte_lcore_to_socket_id(lcore),
				RING_F_SP_ENQ | RING_F_SC_DEQ);
	TAILQ_INIT(&reactor->active_pollers);

	snprintf(ring_name, sizeof(ring_name) - 1, "spdk_event_queue_%u", lcore);
	reactor->events =
@@ -530,7 +523,7 @@ _spdk_event_add_poller(spdk_event_t event)

	poller->lcore = reactor->lcore;

	rte_ring_enqueue(reactor->active_pollers, (void *)poller);
	TAILQ_INSERT_TAIL(&reactor->active_pollers, poller, tailq);

	if (next) {
		spdk_event_call(next);
@@ -555,22 +548,8 @@ _spdk_event_remove_poller(spdk_event_t event)
	struct spdk_reactor *reactor = spdk_event_get_arg1(event);
	struct spdk_poller *poller = spdk_event_get_arg2(event);
	struct spdk_event *next = spdk_event_get_next(event);
	struct spdk_poller *tmp = NULL;
	uint32_t i;
	int rc;

	/* Loop over all pollers, without breaking early, so that
	 * the list of pollers stays in the same order. */
	for (i = 0; i < rte_ring_count(reactor->active_pollers); i++) {
		rte_ring_dequeue(reactor->active_pollers, (void **)&tmp);
		if (tmp != poller) {
			rc = rte_ring_enqueue(reactor->active_pollers, (void *)tmp);
			if (rc != 0) {
				SPDK_ERRLOG("poller could not be enqueued\n");
				exit(EXIT_FAILURE);
			}
		}
	}
	TAILQ_REMOVE(&reactor->active_pollers, poller, tailq);

	if (next) {
		spdk_event_call(next);