Commit 32d7c91c authored by Jim Harris's avatar Jim Harris
Browse files

bdev: add spdk_bdev_queue_io_wait()



This function is intended to be used when an spdk_bdev
I/O operation (such as spdk_bdev_write or spdk_bdev_write_blocks)
fails due to spdk_bdev_io buffer exhaustion.  The caller
can queue an spdk_bdev_io_wait structure on the calling thread
which will be invoked when an spdk_bdev_io buffer is available.

While here, turn off error messages in bdev.c related to
spdk_bdev_io pool exhaustion, since we now have an API designed
to gracefully recover from it.

Also modify bdevperf as an example of how to use this new API.

Signed-off-by: default avatarJim Harris <james.r.harris@intel.com>
Change-Id: Ia55f6582dc5a8d6d8bcc74689fd846d742324510

Reviewed-on: https://review.gerrithub.io/415074


Tested-by: default avatarSPDK Automated Test System <sys_sgsw@intel.com>
Reviewed-by: default avatarDaniel Verkamp <daniel.verkamp@intel.com>
Reviewed-by: default avatarZiye Yang <optimistyzy@gmail.com>
Reviewed-by: default avatarBen Walker <benjamin.walker@intel.com>
parent 714776c0
Loading
Loading
Loading
Loading
+48 −0
Original line number Diff line number Diff line
@@ -44,6 +44,7 @@
#include "spdk/scsi_spec.h"
#include "spdk/nvme_spec.h"
#include "spdk/json.h"
#include "spdk/queue.h"

#ifdef __cplusplus
extern "C" {
@@ -889,6 +890,53 @@ int spdk_bdev_nvme_io_passthru_md(struct spdk_bdev_desc *bdev_desc,
 */
void spdk_bdev_free_io(struct spdk_bdev_io *bdev_io);

/**
 * Block device I/O wait callback
 *
 * Callback function to notify when an spdk_bdev_io structure is available
 * to satisfy a call to one of the @ref bdev_io_submit_functions.
 */
typedef void (*spdk_bdev_io_wait_cb)(void *cb_arg);

/**
 * Structure to register a callback when an spdk_bdev_io becomes available.
 */
struct spdk_bdev_io_wait_entry {
	struct spdk_bdev			*bdev;
	spdk_bdev_io_wait_cb			cb_fn;
	void					*cb_arg;
	TAILQ_ENTRY(spdk_bdev_io_wait_entry)	link;
};

/**
 * Add an entry into the calling thread's queue to be notified when an
 * spdk_bdev_io becomes available.
 *
 * When one of the @ref bdev_io_submit_functions returns -ENOMEM, it means
 * the spdk_bdev_io buffer pool has no available buffers. This function may
 * be called to register a callback to be notified when a buffer becomes
 * available on the calling thread.
 *
 * The callback function will always be called on the same thread as this
 * function was called.
 *
 * This function must only be called immediately after one of the
 * @ref bdev_io_submit_functions returns -ENOMEM.
 *
 * \param bdev Block device.  The block device that the caller will submit
 *             an I/O to when the callback is invoked.  Must match the bdev
 *             member in the entry parameter.
 * \param ch I/O channel. Obtained by calling spdk_bdev_get_io_channel().
 * \param entry Data structure allocated by the caller specifying the callback
 *              function and argument.
 *
 * \return 0 on success.
 *         -EINVAL if bdev parameter does not match bdev member in entry
 *         -EINVAL if an spdk_bdev_io structure was available on this thread.
 */
int spdk_bdev_queue_io_wait(struct spdk_bdev *bdev, struct spdk_io_channel *ch,
			    struct spdk_bdev_io_wait_entry *entry);

/**
 * Return I/O statistics for this channel.
 *
+39 −16
Original line number Diff line number Diff line
@@ -166,6 +166,7 @@ struct spdk_bdev_mgmt_channel {
	uint32_t	bdev_io_cache_size;

	TAILQ_HEAD(, spdk_bdev_shared_resource)	shared_resources;
	TAILQ_HEAD(, spdk_bdev_io_wait_entry)	io_wait_queue;
};

/*
@@ -541,6 +542,7 @@ spdk_bdev_mgmt_channel_create(void *io_device, void *ctx_buf)
	}

	TAILQ_INIT(&ch->shared_resources);
	TAILQ_INIT(&ch->io_wait_queue);

	return 0;
}
@@ -924,12 +926,14 @@ spdk_bdev_get_io(struct spdk_bdev_channel *channel)
		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link);
		ch->per_thread_cache_count--;
	} else if (spdk_unlikely(!TAILQ_EMPTY(&ch->io_wait_queue))) {
		/*
		 * Don't try to look for bdev_ios in the global pool if there are
		 * waiters on bdev_ios - we don't want this caller to jump the line.
		 */
		bdev_io = NULL;
	} else {
		bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
		if (!bdev_io) {
			SPDK_ERRLOG("Unable to get spdk_bdev_io\n");
			return NULL;
		}
	}

	return bdev_io;
@@ -950,7 +954,16 @@ spdk_bdev_free_io(struct spdk_bdev_io *bdev_io)
	if (ch->per_thread_cache_count < ch->bdev_io_cache_size) {
		ch->per_thread_cache_count++;
		STAILQ_INSERT_TAIL(&ch->per_thread_cache, bdev_io, internal.buf_link);
		while (ch->per_thread_cache_count > 0 && !TAILQ_EMPTY(&ch->io_wait_queue)) {
			struct spdk_bdev_io_wait_entry *entry;

			entry = TAILQ_FIRST(&ch->io_wait_queue);
			TAILQ_REMOVE(&ch->io_wait_queue, entry, link);
			entry->cb_fn(entry->cb_arg);
		}
	} else {
		/* We should never have a full cache with entries on the io wait queue. */
		assert(TAILQ_EMPTY(&ch->io_wait_queue));
		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
	}
}
@@ -1650,7 +1663,6 @@ spdk_bdev_read_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,

	bdev_io = spdk_bdev_get_io(channel);
	if (!bdev_io) {
		SPDK_ERRLOG("spdk_bdev_io memory allocation failed duing read\n");
		return -ENOMEM;
	}

@@ -1698,7 +1710,6 @@ int spdk_bdev_readv_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *

	bdev_io = spdk_bdev_get_io(channel);
	if (!bdev_io) {
		SPDK_ERRLOG("spdk_bdev_io memory allocation failed duing read\n");
		return -ENOMEM;
	}

@@ -1747,7 +1758,6 @@ spdk_bdev_write_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,

	bdev_io = spdk_bdev_get_io(channel);
	if (!bdev_io) {
		SPDK_ERRLOG("bdev_io memory allocation failed duing write\n");
		return -ENOMEM;
	}

@@ -1800,7 +1810,6 @@ spdk_bdev_writev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,

	bdev_io = spdk_bdev_get_io(channel);
	if (!bdev_io) {
		SPDK_ERRLOG("bdev_io memory allocation failed duing writev\n");
		return -ENOMEM;
	}

@@ -1852,7 +1861,6 @@ spdk_bdev_write_zeroes_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channe
	bdev_io = spdk_bdev_get_io(channel);

	if (!bdev_io) {
		SPDK_ERRLOG("bdev_io memory allocation failed duing write_zeroes\n");
		return -ENOMEM;
	}

@@ -1933,7 +1941,6 @@ spdk_bdev_unmap_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,

	bdev_io = spdk_bdev_get_io(channel);
	if (!bdev_io) {
		SPDK_ERRLOG("bdev_io memory allocation failed duing unmap\n");
		return -ENOMEM;
	}

@@ -1984,7 +1991,6 @@ spdk_bdev_flush_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,

	bdev_io = spdk_bdev_get_io(channel);
	if (!bdev_io) {
		SPDK_ERRLOG("bdev_io memory allocation failed duing flush\n");
		return -ENOMEM;
	}

@@ -2091,7 +2097,6 @@ spdk_bdev_reset(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,

	bdev_io = spdk_bdev_get_io(channel);
	if (!bdev_io) {
		SPDK_ERRLOG("bdev_io memory allocation failed duing reset\n");
		return -ENOMEM;
	}

@@ -2186,7 +2191,6 @@ spdk_bdev_nvme_admin_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channe

	bdev_io = spdk_bdev_get_io(channel);
	if (!bdev_io) {
		SPDK_ERRLOG("bdev_io memory allocation failed during nvme_admin_passthru\n");
		return -ENOMEM;
	}

@@ -2224,7 +2228,6 @@ spdk_bdev_nvme_io_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *

	bdev_io = spdk_bdev_get_io(channel);
	if (!bdev_io) {
		SPDK_ERRLOG("bdev_io memory allocation failed during nvme_admin_passthru\n");
		return -ENOMEM;
	}

@@ -2262,7 +2265,6 @@ spdk_bdev_nvme_io_passthru_md(struct spdk_bdev_desc *desc, struct spdk_io_channe

	bdev_io = spdk_bdev_get_io(channel);
	if (!bdev_io) {
		SPDK_ERRLOG("bdev_io memory allocation failed during nvme_admin_passthru\n");
		return -ENOMEM;
	}

@@ -2280,6 +2282,27 @@ spdk_bdev_nvme_io_passthru_md(struct spdk_bdev_desc *desc, struct spdk_io_channe
	return 0;
}

int
spdk_bdev_queue_io_wait(struct spdk_bdev *bdev, struct spdk_io_channel *ch,
			struct spdk_bdev_io_wait_entry *entry)
{
	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
	struct spdk_bdev_mgmt_channel *mgmt_ch = channel->shared_resource->mgmt_ch;

	if (bdev != entry->bdev) {
		SPDK_ERRLOG("bdevs do not match\n");
		return -EINVAL;
	}

	if (mgmt_ch->per_thread_cache_count > 0) {
		SPDK_ERRLOG("Cannot queue io_wait if spdk_bdev_io available in per-thread cache\n");
		return -EINVAL;
	}

	TAILQ_INSERT_TAIL(&mgmt_ch->io_wait_queue, entry, link);
	return 0;
}

static void
_spdk_bdev_ch_retry_io(struct spdk_bdev_channel *bdev_ch)
{
+23 −6
Original line number Diff line number Diff line
@@ -51,6 +51,7 @@ struct bdevperf_task {
	uint64_t			offset_blocks;
	enum spdk_bdev_io_type		io_type;
	TAILQ_ENTRY(bdevperf_task)	link;
	struct spdk_bdev_io_wait_entry	bdev_io_wait;
};

static int g_io_size = 0;
@@ -330,8 +331,7 @@ bdevperf_complete(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
}

static void
bdevperf_verify_write_complete(struct spdk_bdev_io *bdev_io, bool success,
			       void *cb_arg)
bdevperf_verify_submit_read(void *cb_arg)
{
	struct io_target	*target;
	struct bdevperf_task	*task = cb_arg;
@@ -342,14 +342,24 @@ bdevperf_verify_write_complete(struct spdk_bdev_io *bdev_io, bool success,
	/* Read the data back in */
	rc = spdk_bdev_read_blocks(target->bdev_desc, target->ch, NULL, task->offset_blocks,
				   target->io_size_blocks, bdevperf_complete, task);
	if (rc) {
	if (rc == -ENOMEM) {
		task->bdev_io_wait.bdev = target->bdev;
		task->bdev_io_wait.cb_fn = bdevperf_verify_submit_read;
		task->bdev_io_wait.cb_arg = task;
		spdk_bdev_queue_io_wait(target->bdev, target->ch, &task->bdev_io_wait);
	} else if (rc != 0) {
		printf("Failed to submit read: %d\n", rc);
		target->is_draining = true;
		g_run_failed = true;
		return;
	}
}

static void
bdevperf_verify_write_complete(struct spdk_bdev_io *bdev_io, bool success,
			       void *cb_arg)
{
	spdk_bdev_free_io(bdev_io);
	bdevperf_verify_submit_read(cb_arg);
}

static __thread unsigned int seed = 0;
@@ -390,8 +400,9 @@ bdevperf_prep_task(struct bdevperf_task *task)
}

static void
bdevperf_submit_task(struct bdevperf_task *task)
bdevperf_submit_task(void *arg)
{
	struct bdevperf_task	*task = arg;
	struct io_target	*target = task->target;
	struct spdk_bdev_desc	*desc;
	struct spdk_io_channel	*ch;
@@ -427,7 +438,13 @@ bdevperf_submit_task(struct bdevperf_task *task)
		break;
	}

	if (rc) {
	if (rc == -ENOMEM) {
		task->bdev_io_wait.bdev = target->bdev;
		task->bdev_io_wait.cb_fn = bdevperf_submit_task;
		task->bdev_io_wait.cb_arg = task;
		spdk_bdev_queue_io_wait(target->bdev, ch, &task->bdev_io_wait);
		return;
	} else if (rc != 0) {
		printf("Failed to submit bdev_io: %d\n", rc);
		target->is_draining = true;
		g_run_failed = true;
+201 −1
Original line number Diff line number Diff line
@@ -77,19 +77,113 @@ stub_destruct(void *ctx)
	return 0;
}

struct bdev_ut_channel {
	TAILQ_HEAD(, spdk_bdev_io)	outstanding_io;
	uint32_t			outstanding_io_count;
};

static uint32_t g_bdev_ut_io_device;
static struct bdev_ut_channel *g_bdev_ut_channel;

static void
stub_submit_request(struct spdk_io_channel *_ch, struct spdk_bdev_io *bdev_io)
{
	struct bdev_ut_channel *ch = spdk_io_channel_get_ctx(_ch);

	TAILQ_INSERT_TAIL(&ch->outstanding_io, bdev_io, module_link);
	ch->outstanding_io_count++;
}

static uint32_t
stub_complete_io(uint32_t num_to_complete)
{
	struct bdev_ut_channel *ch = g_bdev_ut_channel;
	struct spdk_bdev_io *bdev_io;
	uint32_t num_completed = 0;

	while (num_completed < num_to_complete) {
		if (TAILQ_EMPTY(&ch->outstanding_io)) {
			break;
		}
		bdev_io = TAILQ_FIRST(&ch->outstanding_io);
		TAILQ_REMOVE(&ch->outstanding_io, bdev_io, module_link);
		ch->outstanding_io_count--;
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS);
		num_completed++;
	}

	return num_completed;
}

static struct spdk_io_channel *
bdev_ut_get_io_channel(void *ctx)
{
	return spdk_get_io_channel(&g_bdev_ut_io_device);
}

static struct spdk_bdev_fn_table fn_table = {
	.destruct = stub_destruct,
	.submit_request = stub_submit_request,
	.get_io_channel = bdev_ut_get_io_channel,
};

static int
bdev_ut_create_ch(void *io_device, void *ctx_buf)
{
	struct bdev_ut_channel *ch = ctx_buf;

	CU_ASSERT(g_bdev_ut_channel == NULL);
	g_bdev_ut_channel = ch;

	TAILQ_INIT(&ch->outstanding_io);
	ch->outstanding_io_count = 0;
	return 0;
}

static void
bdev_ut_destroy_ch(void *io_device, void *ctx_buf)
{
	CU_ASSERT(g_bdev_ut_channel != NULL);
	g_bdev_ut_channel = NULL;
}

static int
bdev_ut_module_init(void)
{
	spdk_io_device_register(&g_bdev_ut_io_device, bdev_ut_create_ch, bdev_ut_destroy_ch,
				sizeof(struct bdev_ut_channel));
	return 0;
}

static void
bdev_ut_module_fini(void)
{
}

struct spdk_bdev_module bdev_ut_if = {
	.name = "bdev_ut",
	.module_init = bdev_ut_module_init,
	.module_fini = bdev_ut_module_fini,
};

static void vbdev_ut_examine(struct spdk_bdev *bdev);

static int
vbdev_ut_module_init(void)
{
	return 0;
}

static void
vbdev_ut_module_fini(void)
{
}

struct spdk_bdev_module vbdev_ut_if = {
	.name = "vbdev_ut",
	.examine = vbdev_ut_examine,
	.module_init = vbdev_ut_module_init,
	.module_fini = vbdev_ut_module_fini,
};

SPDK_BDEV_MODULE_REGISTER(&bdev_ut_if)
@@ -152,6 +246,7 @@ allocate_bdev(char *name)
	bdev->name = name;
	bdev->fn_table = &fn_table;
	bdev->module = &bdev_ut_if;
	bdev->blockcnt = 1;

	rc = spdk_bdev_register(bdev);
	CU_ASSERT(rc == 0);
@@ -543,6 +638,110 @@ alias_add_del_test(void)
	free(bdev[1]);
}

static void
io_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	spdk_bdev_free_io(bdev_io);
}

static void
bdev_init_cb(void *arg, int rc)
{
	CU_ASSERT(rc == 0);
}

struct bdev_ut_io_wait_entry {
	struct spdk_bdev_io_wait_entry	entry;
	struct spdk_io_channel		*io_ch;
	struct spdk_bdev_desc		*desc;
	bool				submitted;
};

static void
io_wait_cb(void *arg)
{
	struct bdev_ut_io_wait_entry *entry = arg;
	int rc;

	rc = spdk_bdev_read_blocks(entry->desc, entry->io_ch, NULL, 0, 1, io_done, NULL);
	CU_ASSERT(rc == 0);
	entry->submitted = true;
}

static void
bdev_io_wait_test(void)
{
	struct spdk_bdev *bdev;
	struct spdk_bdev_desc *desc;
	struct spdk_io_channel *io_ch;
	struct spdk_bdev_opts bdev_opts = {
		.bdev_io_pool_size = 4,
		.bdev_io_cache_size = 2,
	};
	struct bdev_ut_io_wait_entry io_wait_entry;
	struct bdev_ut_io_wait_entry io_wait_entry2;
	int rc;

	rc = spdk_bdev_set_opts(&bdev_opts);
	CU_ASSERT(rc == 0);
	spdk_bdev_initialize(bdev_init_cb, NULL);

	bdev = allocate_bdev("bdev0");

	rc = spdk_bdev_open(bdev, true, NULL, NULL, &desc);
	CU_ASSERT(rc == 0);
	CU_ASSERT(desc != NULL);
	io_ch = spdk_bdev_get_io_channel(desc);
	CU_ASSERT(io_ch != NULL);

	rc = spdk_bdev_read_blocks(desc, io_ch, NULL, 0, 1, io_done, NULL);
	CU_ASSERT(rc == 0);
	rc = spdk_bdev_read_blocks(desc, io_ch, NULL, 0, 1, io_done, NULL);
	CU_ASSERT(rc == 0);
	rc = spdk_bdev_read_blocks(desc, io_ch, NULL, 0, 1, io_done, NULL);
	CU_ASSERT(rc == 0);
	rc = spdk_bdev_read_blocks(desc, io_ch, NULL, 0, 1, io_done, NULL);
	CU_ASSERT(rc == 0);
	CU_ASSERT(g_bdev_ut_channel->outstanding_io_count == 4);

	rc = spdk_bdev_read_blocks(desc, io_ch, NULL, 0, 1, io_done, NULL);
	CU_ASSERT(rc == -ENOMEM);

	io_wait_entry.entry.bdev = bdev;
	io_wait_entry.entry.cb_fn = io_wait_cb;
	io_wait_entry.entry.cb_arg = &io_wait_entry;
	io_wait_entry.io_ch = io_ch;
	io_wait_entry.desc = desc;
	io_wait_entry.submitted = false;
	/* Cannot use the same io_wait_entry for two different calls. */
	memcpy(&io_wait_entry2, &io_wait_entry, sizeof(io_wait_entry));
	io_wait_entry2.entry.cb_arg = &io_wait_entry2;

	/* Queue two I/O waits. */
	rc = spdk_bdev_queue_io_wait(bdev, io_ch, &io_wait_entry.entry);
	CU_ASSERT(rc == 0);
	CU_ASSERT(io_wait_entry.submitted == false);
	rc = spdk_bdev_queue_io_wait(bdev, io_ch, &io_wait_entry2.entry);
	CU_ASSERT(rc == 0);
	CU_ASSERT(io_wait_entry2.submitted == false);

	stub_complete_io(1);
	CU_ASSERT(g_bdev_ut_channel->outstanding_io_count == 4);
	CU_ASSERT(io_wait_entry.submitted == true);
	CU_ASSERT(io_wait_entry2.submitted == false);

	stub_complete_io(1);
	CU_ASSERT(g_bdev_ut_channel->outstanding_io_count == 4);
	CU_ASSERT(io_wait_entry2.submitted == true);

	stub_complete_io(4);
	CU_ASSERT(g_bdev_ut_channel->outstanding_io_count == 0);

	spdk_put_io_channel(io_ch);
	spdk_bdev_close(desc);
	free_bdev(bdev);
}

int
main(int argc, char **argv)
{
@@ -565,7 +764,8 @@ main(int argc, char **argv)
		CU_add_test(suite, "io_valid", io_valid_test) == NULL ||
		CU_add_test(suite, "open_write", open_write_test) == NULL ||
		CU_add_test(suite, "alias_add_del", alias_add_del_test) == NULL ||
		CU_add_test(suite, "get_device_stat", get_device_stat_test) == NULL
		CU_add_test(suite, "get_device_stat", get_device_stat_test) == NULL ||
		CU_add_test(suite, "bdev_io_wait", bdev_io_wait_test) == NULL
	) {
		CU_cleanup_registry();
		return CU_get_error();