Commit 42491fb8 authored by Ben Walker's avatar Ben Walker Committed by Daniel Verkamp
Browse files

env: Add wrappers for a lockless ring



Change-Id: I9679e4bcfc10e38672d1851f7e9f16b6adf7de9b
Signed-off-by: default avatarBen Walker <benjamin.walker@intel.com>
parent 1ad673a9
Loading
Loading
Loading
Loading
+31 −0
Original line number Diff line number Diff line
@@ -227,6 +227,37 @@ uint64_t spdk_get_ticks_hz(void);
 */
void spdk_delay_us(unsigned int us);

struct spdk_ring;

enum spdk_ring_type {
	SPDK_RING_TYPE_MP_SC,		/* Multi-producer, single-consumer */
};

/**
 * Create a ring
 */
struct spdk_ring *spdk_ring_create(enum spdk_ring_type type, size_t count,
				   size_t ele_size, int socket_id);

/**
 * Free the ring
 */
void spdk_ring_free(struct spdk_ring *ring);

/**
 * Queue the array of objects (with length count) on the ring.
 *
 * Return the number of objects enqueued.
 */
size_t spdk_ring_enqueue(struct spdk_ring *ring, void **objs, size_t count);

/**
 * Dequeue count objects from the ring into the array objs.
 *
 * Return the number of objects dequeued.
 */
size_t spdk_ring_dequeue(struct spdk_ring *ring, void **objs, size_t count);

#define SPDK_VTOPHYS_ERROR	(0xFFFFFFFFFFFFFFFFULL)

uint64_t spdk_vtophys(void *buf);
+49 −0
Original line number Diff line number Diff line
@@ -258,3 +258,52 @@ spdk_call_unaffinitized(void *cb(void *arg), void *arg)

	return ret;
}

struct spdk_ring *
spdk_ring_create(enum spdk_ring_type type, size_t count, size_t ele_size, int socket_id)
{
	char ring_name[64];
	static uint32_t ring_num = 0;

	if (type != SPDK_RING_TYPE_MP_SC) {
		return NULL;
	}

	snprintf(ring_name, sizeof(ring_name), "spdk_ring_%u",
		 __sync_fetch_and_add(&ring_num, 1));

	return (struct spdk_ring *)rte_ring_create(ring_name, count, socket_id, RING_F_SC_DEQ);
}

void
spdk_ring_free(struct spdk_ring *ring)
{
	rte_ring_free((struct rte_ring *)ring);
}

size_t
spdk_ring_enqueue(struct spdk_ring *ring, void **objs, size_t count)
{
	int rc;
#if RTE_VERSION < RTE_VERSION_NUM(17, 5, 0, 0)
	rc = rte_ring_mp_enqueue_bulk((struct rte_ring *)ring, objs, count);
#else
	rc = rte_ring_mp_enqueue_bulk((struct rte_ring *)ring, objs, count, NULL);
#endif

	if (rc == 0) {
		return count;
	}

	return 0;
}

size_t
spdk_ring_dequeue(struct spdk_ring *ring, void **objs, size_t count)
{
#if RTE_VERSION < RTE_VERSION_NUM(17, 5, 0, 0)
	return rte_ring_sc_dequeue_burst((struct rte_ring *)ring, objs, count);
#else
	return rte_ring_sc_dequeue_burst((struct rte_ring *)ring, objs, count, NULL);
#endif
}
+12 −17
Original line number Diff line number Diff line
@@ -44,8 +44,8 @@
#endif

#include <rte_config.h>
#include <rte_version.h>
#include <rte_ring.h>
#include <rte_launch.h>
#include <rte_lcore.h>

#include "spdk/log.h"
#include "spdk/io_channel.h"
@@ -111,7 +111,7 @@ struct spdk_reactor {
	 */
	TAILQ_HEAD(timer_pollers_head, spdk_poller)	timer_pollers;

	struct rte_ring					*events;
	struct spdk_ring				*events;

	/* Pointer to the per-socket g_spdk_event_mempool for this reactor. */
	struct spdk_mempool				*event_mempool;
@@ -165,8 +165,8 @@ spdk_event_call(struct spdk_event *event)
	reactor = spdk_reactor_get(event->lcore);

	assert(reactor->events != NULL);
	rc = rte_ring_mp_enqueue(reactor->events, event);
	if (rc != 0) {
	rc = spdk_ring_enqueue(reactor->events, (void **)&event, 1);
	if (rc != 1) {
		assert(false);
	}
}
@@ -179,17 +179,14 @@ _spdk_event_queue_run_batch(struct spdk_reactor *reactor)

#ifdef DEBUG
	/*
	 * rte_ring_dequeue_burst() fills events and returns how many entries it wrote,
	 * spdk_ring_dequeue() fills events and returns how many entries it wrote,
	 * so we will never actually read uninitialized data from events, but just to be sure
	 * (and to silence a static analyzer false positive), initialize the array to NULL pointers.
	 */
	memset(events, 0, sizeof(events));
#endif
#if RTE_VERSION < RTE_VERSION_NUM(17,5,0,0)
	count = rte_ring_sc_dequeue_burst(reactor->events, events, SPDK_EVENT_BATCH_SIZE);
#else
	count = rte_ring_sc_dequeue_burst(reactor->events, events, SPDK_EVENT_BATCH_SIZE, NULL);
#endif

	count = spdk_ring_dequeue(reactor->events, events, SPDK_EVENT_BATCH_SIZE);
	if (count == 0) {
		return 0;
	}
@@ -402,8 +399,6 @@ _spdk_reactor_run(void *arg)
static void
spdk_reactor_construct(struct spdk_reactor *reactor, uint32_t lcore, uint64_t max_delay_us)
{
	char	ring_name[64];

	reactor->lcore = lcore;
	reactor->socket_id = spdk_env_get_socket_id(lcore);
	assert(reactor->socket_id < SPDK_MAX_SOCKET);
@@ -412,9 +407,9 @@ spdk_reactor_construct(struct spdk_reactor *reactor, uint32_t lcore, uint64_t ma
	TAILQ_INIT(&reactor->active_pollers);
	TAILQ_INIT(&reactor->timer_pollers);

	snprintf(ring_name, sizeof(ring_name) - 1, "spdk_event_queue_%u", lcore);
	reactor->events =
		rte_ring_create(ring_name, 65536, reactor->socket_id, RING_F_SC_DEQ);
	reactor->events = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536,
					   sizeof(struct event *),
					   reactor->socket_id);
	assert(reactor->events != NULL);

	reactor->event_mempool = g_spdk_event_mempool[reactor->socket_id];
@@ -615,7 +610,7 @@ spdk_reactors_fini(void)
	SPDK_ENV_FOREACH_CORE(i) {
		reactor = spdk_reactor_get(i);
		if (reactor->events != NULL) {
			rte_ring_free(reactor->events);
			spdk_ring_free(reactor->events);
		}
	}

+0 −1
Original line number Diff line number Diff line
@@ -34,7 +34,6 @@
#include "spdk/stdinc.h"

#include <rte_config.h>
#include <rte_ring.h>
#include <rte_lcore.h>

#include "spdk/env.h"