Commit dadb9485, authored by Ben Walker, committed by Jim Harris
Browse files

bdev/aio: Reap completions from userspace if supported



Change-Id: I30d9cc619df2fddb870ed7bf187f14cd44376d19
Signed-off-by: Ben Walker <benjamin.walker@intel.com>
Reviewed-on: https://review.gerrithub.io/c/443468


Tested-by: SPDK CI Jenkins <sys_sgci@intel.com>
Reviewed-by: Jim Harris <james.r.harris@intel.com>
Reviewed-by: Shuhei Matsumoto <shuhei.matsumoto.xt@hitachi.com>
parent 5e5acd3d
Loading
Loading
Loading
Loading
+81 −6
Original line number Diff line number Diff line
@@ -35,10 +35,12 @@

#include "spdk/stdinc.h"

#include "spdk/barrier.h"
#include "spdk/bdev.h"
#include "spdk/conf.h"
#include "spdk/env.h"
#include "spdk/fd.h"
#include "spdk/likely.h"
#include "spdk/thread.h"
#include "spdk/json.h"
#include "spdk/util.h"
@@ -75,6 +77,21 @@ struct file_disk {
	bool			block_size_override;
};

/* For user space reaping of completions.
 *
 * Header of the completion ring that bdev_user_io_getevents() reads by
 * casting an io_context_t to a pointer to this structure. The field order
 * and widths must therefore match whatever actually lives at that address.
 * NOTE(review): presumably this mirrors the Linux kernel's struct aio_ring -
 * confirm against the kernel sources before changing any field.
 */
struct spdk_aio_ring {
	/* Not referenced by the code in this file's reaping path. */
	uint32_t id;
	/* Number of event slots in the ring; used as the wrap-around modulus. */
	uint32_t size;
	/* Index of the next event to consume; advanced by the user space reaper. */
	uint32_t head;
	/* Index one past the last available event; only read by the reaper. */
	uint32_t tail;

	/* Must equal SPDK_AIO_RING_VERSION for user space reaping to be used. */
	uint32_t version;
	/* Not referenced by the code in this file's reaping path. */
	uint32_t compat_features;
	/* Must be zero for user space reaping to be used. */
	uint32_t incompat_features;
	/* Byte offset from the start of the ring to the io_event array. */
	uint32_t header_length;
};

/* Expected value of spdk_aio_ring.version; any other value forces the
 * io_getevents() fallback. */
#define SPDK_AIO_RING_VERSION	0xa10a10a1

static int bdev_aio_initialize(void);
static void bdev_aio_fini(void);
static void aio_free_disk(struct file_disk *fdisk);
@@ -230,6 +247,69 @@ bdev_aio_destruct(void *ctx)
	return rc;
}

/*
 * Reap completed AIO events, avoiding a system call when the completion
 * ring can be read directly.
 *
 * io_ctx is reinterpreted as a pointer to a struct spdk_aio_ring. If the
 * ring's version does not match SPDK_AIO_RING_VERSION, or it advertises any
 * incompatible feature, this falls back to io_getevents() with min_nr of 0
 * and a zero timeout. Otherwise up to max events are copied from the ring
 * into uevents and ring->head is advanced past them.
 *
 * io_ctx:  AIO context to reap from.
 * max:     maximum number of events to copy; uevents must have room for them.
 * uevents: destination array for the reaped events.
 *
 * Returns the number of events copied into uevents, or, on the fallback
 * path, whatever io_getevents() returns.
 */
static int
bdev_user_io_getevents(io_context_t io_ctx, unsigned int max, struct io_event *uevents)
{
	uint32_t head, tail, count;
	struct spdk_aio_ring *ring;
	struct timespec timeout;
	struct io_event *kevents;

	/* NOTE(review): assumes the io_context_t value is the address of the
	 * ring header - confirm against the kernel AIO ABI. */
	ring = (struct spdk_aio_ring *)io_ctx;

	if (spdk_unlikely(ring->version != SPDK_AIO_RING_VERSION || ring->incompat_features != 0)) {
		/* Ring layout is not one we understand; poll via the library call
		 * with a zero timeout instead. */
		timeout.tv_sec = 0;
		timeout.tv_nsec = 0;

		return io_getevents(io_ctx, 0, max, uevents, &timeout);
	}

	/* Read the current state out of the ring */
	head = ring->head;
	tail = ring->tail;

	/* This memory barrier is required to prevent the loads above
	 * from being re-ordered with stores to the events array
	 * potentially occurring on other threads. */
	spdk_smp_rmb();

	/* Calculate how many items are in the circular ring */
	count = tail - head;
	if (tail < head) {
		/* tail has wrapped around; the unsigned subtraction above
		 * underflowed, and adding size brings it back into range. */
		count += ring->size;
	}

	/* Reduce the count to the limit provided by the user */
	count = spdk_min(max, count);

	/* Grab the memory location of the event array */
	kevents = (struct io_event *)((uintptr_t)ring + ring->header_length);

	/* Copy the events out of the ring. */
	if ((head + count) <= ring->size) {
		/* Only one copy is required */
		memcpy(uevents, &kevents[head], count * sizeof(struct io_event));
	} else {
		/* Number of events between head and the end of the array. */
		uint32_t first_part = ring->size - head;
		/* Two copies are required */
		memcpy(uevents, &kevents[head], first_part * sizeof(struct io_event));
		memcpy(&uevents[first_part], &kevents[0], (count - first_part) * sizeof(struct io_event));
	}

	/* Update the head pointer. On x86, stores will not be reordered with older loads,
	 * so the copies out of the event array will always be complete prior to this
	 * update becoming visible. On other architectures this is not guaranteed, so
	 * add a barrier. */
#if defined(__i386__) || defined(__x86_64__)
	spdk_compiler_barrier();
#else
	spdk_mb();
#endif
	/* NOTE(review): head is stored even when count is 0, and a ring->size of 0
	 * would divide by zero here - presumably a valid ring never has size 0;
	 * confirm. */
	ring->head = (head + count) % ring->size;

	return count;
}

static int
bdev_aio_group_poll(void *arg)
{
@@ -237,14 +317,9 @@ bdev_aio_group_poll(void *arg)
	int nr, i = 0;
	enum spdk_bdev_io_status status;
	struct bdev_aio_task *aio_task;
	struct timespec timeout;
	struct io_event events[SPDK_AIO_QUEUE_DEPTH];

	timeout.tv_sec = 0;
	timeout.tv_nsec = 0;

	nr = io_getevents(group_ch->io_ctx, 0, SPDK_AIO_QUEUE_DEPTH,
			  events, &timeout);
	nr = bdev_user_io_getevents(group_ch->io_ctx, SPDK_AIO_QUEUE_DEPTH, events);

	if (nr < 0) {
		return -1;