Commit 52bbb267 authored by Daniel Verkamp's avatar Daniel Verkamp Committed by Ben Walker
Browse files

event: process events in batches



Since we are usually going to be removing multiple events from the queue
at once, use the DPDK burst dequeue interface to improve efficiency.

Also rework the event queue runner to always process a fixed maximum
number of events per timeslice for simplicity.  This removes the
rte_ring_count() call from the hot path and improves fairness between
events and pollers.

Now that events are dequeued in bulk, we can also put the event objects
back into the mempool in bulk.  Add an env wrapper around
rte_mempool_put_bulk() and use it to free all of the events at once.

Basic performance benchmark using test/lib/event/event/event -t 10
is improved: previously ~40 million events per second, now ~46 million
events per second.

Change-Id: I432e8a48774a087eec2be3a64c38c339608af42a
Signed-off-by: default avatarDaniel Verkamp <daniel.verkamp@intel.com>
parent e3580da1
Loading
Loading
Loading
Loading
+6 −0
Original line number Diff line number Diff line
@@ -128,6 +128,12 @@ spdk_mempool_get(struct spdk_mempool *mp);
void
spdk_mempool_put(struct spdk_mempool *mp, void *ele);

/**
 * Put multiple elements back into the memory pool.
 */
void
spdk_mempool_put_bulk(struct spdk_mempool *mp, void *const *ele_arr, size_t count);


/**
 * Return true if the calling process is primary process
+1 −1
Original line number Diff line number Diff line
@@ -220,7 +220,7 @@ void spdk_event_call(spdk_event_t event);
#define spdk_event_get_arg2(event)	(event)->arg2

/* TODO: This is only used by tests and should be made private */
uint32_t spdk_event_queue_run_all(uint32_t lcore);
uint32_t spdk_event_queue_run_batch(uint32_t lcore);

/**
 * \brief Register a poller on the given lcore.
+6 −0
Original line number Diff line number Diff line
@@ -158,6 +158,12 @@ spdk_mempool_put(struct spdk_mempool *mp, void *ele)
	rte_mempool_put((struct rte_mempool *)mp, ele);
}

void
spdk_mempool_put_bulk(struct spdk_mempool *mp, void *const *ele_arr, size_t count)
{
	rte_mempool_put_bulk((struct rte_mempool *)mp, ele_arr, count);
}

bool
spdk_process_is_primary(void)
{
+20 −49
Original line number Diff line number Diff line
@@ -60,6 +60,8 @@

#define SPDK_REACTOR_SPIN_TIME_US	1

#define SPDK_EVENT_BATCH_SIZE		8

enum spdk_poller_state {
	/* The poller is registered with a reactor but not currently executing its fn. */
	SPDK_POLLER_STATE_WAITING,
@@ -144,7 +146,8 @@ spdk_event_allocate(uint32_t lcore, spdk_event_fn fn, void *arg1, void *arg2,
		    spdk_event_t next)
{
	struct spdk_event *event = NULL;
	uint8_t socket_id = rte_lcore_to_socket_id(lcore);
	unsigned socket_id = rte_lcore_to_socket_id(lcore);

	assert(socket_id < SPDK_MAX_SOCKET);

	event = spdk_mempool_get(g_spdk_event_mempool[socket_id]);
@@ -162,15 +165,6 @@ spdk_event_allocate(uint32_t lcore, spdk_event_fn fn, void *arg1, void *arg2,
	return event;
}

static void
spdk_event_free(uint32_t lcore, struct spdk_event *event)
{
	uint8_t socket_id = rte_lcore_to_socket_id(lcore);
	assert(socket_id < SPDK_MAX_SOCKET);

	spdk_mempool_put(g_spdk_event_mempool[socket_id], (void *)event);
}

void
spdk_event_call(spdk_event_t event)
{
@@ -186,55 +180,32 @@ spdk_event_call(spdk_event_t event)
	}
}

static uint32_t
spdk_event_queue_count(uint32_t lcore)
uint32_t
spdk_event_queue_run_batch(uint32_t lcore)
{
	struct spdk_reactor *reactor;
	unsigned socket_id;
	unsigned count, i;
	void *events[SPDK_EVENT_BATCH_SIZE];

	reactor = spdk_reactor_get(lcore);
	assert(reactor->events != NULL);

	if (reactor->events == NULL) {
		return 0;
	}
	socket_id = rte_lcore_to_socket_id(lcore);
	assert(socket_id < SPDK_MAX_SOCKET);

	return rte_ring_count(reactor->events);
	count = rte_ring_dequeue_burst(reactor->events, events, SPDK_EVENT_BATCH_SIZE);
	if (count == 0) {
		return 0;
	}

static void
spdk_event_queue_run_single(uint32_t lcore)
{
	struct spdk_event *event = NULL;
	struct spdk_reactor *reactor;
	int rc;

	reactor = spdk_reactor_get(lcore);

	assert(reactor->events != NULL);
	rc = rte_ring_dequeue(reactor->events, (void **)&event);

	if ((rc != 0) || event == NULL) {
		return;
	}
	for (i = 0; i < count; i++) {
		struct spdk_event *event = events[i];

		event->fn(event);
	spdk_event_free(lcore, event);
	}

static void
spdk_event_queue_run(uint32_t lcore, uint32_t count)
{
	while (count--) {
		spdk_event_queue_run_single(lcore);
	}
}

uint32_t
spdk_event_queue_run_all(uint32_t lcore)
{
	uint32_t count;

	count = spdk_event_queue_count(lcore);
	spdk_event_queue_run(lcore, count);
	spdk_mempool_put_bulk(g_spdk_event_mempool[socket_id], events, count);

	return count;
}
@@ -344,7 +315,7 @@ _spdk_reactor_run(void *arg)
	last_action = spdk_get_ticks();

	while (1) {
		event_count = spdk_event_queue_run_all(rte_lcore_id());
		event_count = spdk_event_queue_run_batch(rte_lcore_id());
		if (event_count > 0) {
			last_action = spdk_get_ticks();
		}
+1 −1
Original line number Diff line number Diff line
@@ -82,7 +82,7 @@ event_work_fn(void *arg)

	while (1) {

		spdk_event_queue_run_all(rte_lcore_id());
		spdk_event_queue_run_batch(rte_lcore_id());

		if (spdk_get_ticks() > tsc_end) {
			break;