Commit 35fdf2dd authored by Konrad Sztyber, committed by Tomasz Zawadzki

lib/ftl: write buffer batches

This patch adds the ftl_batch structure, which describes a single batch
of data to be written to the disk.  A batch comprises multiple write
buffer entries (the exact number depends on the write unit size of the
underlying device).

Additionally, ftl_get_next_batch(), a function responsible for filling
a batch, was added.  It iterates over the available IO channels and
dequeues submitted write buffer entries until the batch is complete.
The IO channel queue is shifted each time the function is called, so
that dequeuing starts from a different IO channel on every call, which
guarantees that all channels are treated fairly.

Change-Id: Ie61d8c6cb51d5c5175540c975447bc27590c5cb4
Signed-off-by: Konrad Sztyber <konrad.sztyber@intel.com>
Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/905
Tested-by: SPDK CI Jenkins <sys_sgci@intel.com>
Reviewed-by: Wojciech Malikowski <wojciech.malikowski@intel.com>
Reviewed-by: Jim Harris <james.r.harris@intel.com>
Reviewed-by: Shuhei Matsumoto <shuhei.matsumoto.xt@hitachi.com>
parent 1e8c9a4d
+88 −3
@@ -193,9 +193,7 @@ ftl_acquire_wbuf_entry(struct ftl_io_channel *io_channel, int io_flags)
	return entry;
}

-void ftl_release_wbuf_entry(struct ftl_wbuf_entry *entry);
-
-void
+static void
ftl_release_wbuf_entry(struct ftl_wbuf_entry *entry)
{
	struct ftl_io_channel *io_channel = entry->ioch;
@@ -207,6 +205,93 @@ ftl_release_wbuf_entry(struct ftl_wbuf_entry *entry)
	spdk_ring_enqueue(io_channel->free_queue, (void **)&entry, 1, NULL);
}

struct ftl_batch *ftl_get_next_batch(struct spdk_ftl_dev *dev);

struct ftl_batch *
ftl_get_next_batch(struct spdk_ftl_dev *dev)
{
	struct ftl_batch *batch = dev->current_batch;
	struct ftl_io_channel *ioch;
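	/*
	 * Entries are dequeued in chunks of up to FTL_DEQUEUE_ENTRIES onto the
	 * stack array below; filling a batch can take several chunks when
	 * xfer_size exceeds this limit.
	 */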
#define FTL_DEQUEUE_ENTRIES 128
	struct ftl_wbuf_entry *entries[FTL_DEQUEUE_ENTRIES];
	TAILQ_HEAD(, ftl_io_channel) ioch_queue;
	size_t i, num_dequeued, num_remaining;

	if (batch == NULL) {
		batch = TAILQ_FIRST(&dev->free_batches);
		if (spdk_unlikely(batch == NULL)) {
			return NULL;
		}

		assert(TAILQ_EMPTY(&batch->entries));
		assert(batch->num_entries == 0);
		TAILQ_REMOVE(&dev->free_batches, batch, tailq);
	}

	/*
	 * Keep shifting the queue to ensure fairness in IO channel selection.  Each time
	 * ftl_get_next_batch() is called, we're starting to dequeue write buffer entries from a
	 * different IO channel.
	 */
	TAILQ_INIT(&ioch_queue);
	while (!TAILQ_EMPTY(&dev->ioch_queue)) {
		ioch = TAILQ_FIRST(&dev->ioch_queue);
		TAILQ_REMOVE(&dev->ioch_queue, ioch, tailq);
		TAILQ_INSERT_TAIL(&ioch_queue, ioch, tailq);

		num_remaining = dev->xfer_size - batch->num_entries;
		while (num_remaining > 0) {
			num_dequeued = spdk_ring_dequeue(ioch->submit_queue, (void **)entries,
							 spdk_min(num_remaining,
									 FTL_DEQUEUE_ENTRIES));
			if (num_dequeued == 0) {
				break;
			}

			for (i = 0; i < num_dequeued; ++i) {
				batch->iov[batch->num_entries + i].iov_base = entries[i]->payload;
				batch->iov[batch->num_entries + i].iov_len = FTL_BLOCK_SIZE;
				TAILQ_INSERT_TAIL(&batch->entries, entries[i], tailq);
			}

			batch->num_entries += num_dequeued;
			num_remaining -= num_dequeued;
		}

		if (num_remaining == 0) {
			break;
		}
	}

	TAILQ_CONCAT(&dev->ioch_queue, &ioch_queue, tailq);

	if (batch->num_entries == dev->xfer_size) {
		dev->current_batch = NULL;
	} else {
		dev->current_batch = batch;
		batch = NULL;
	}

	return batch;
}

void ftl_release_batch(struct spdk_ftl_dev *dev, struct ftl_batch *batch);

void
ftl_release_batch(struct spdk_ftl_dev *dev, struct ftl_batch *batch)
{
	struct ftl_wbuf_entry *entry;

	while (!TAILQ_EMPTY(&batch->entries)) {
		entry = TAILQ_FIRST(&batch->entries);
		TAILQ_REMOVE(&batch->entries, entry, tailq);
		ftl_release_wbuf_entry(entry);
	}

	batch->num_entries = 0;
	TAILQ_INSERT_TAIL(&dev->free_batches, batch, tailq);
}
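
/*
 * Hypothetical usage (not part of this patch): a writer thread would pair
 * the two helpers roughly as follows, with submit_batch_to_bdev() standing
 * in for the actual submission path:
 *
 *	struct ftl_batch *batch = ftl_get_next_batch(dev);
 *	if (batch != NULL) {
 *		submit_batch_to_bdev(dev, batch);	(writes batch->iov, which
 *			holds xfer_size iovecs of FTL_BLOCK_SIZE bytes each)
 *		ftl_release_batch(dev, batch);		(once the write completes)
 *	}
 */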

static void
ftl_io_cmpl_cb(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
+20 −0
@@ -107,6 +107,17 @@ struct ftl_nv_cache {
	pthread_spinlock_t			lock;
};

struct ftl_batch {
	/* Queue of write buffer entries, can reach up to xfer_size entries */
	TAILQ_HEAD(, ftl_wbuf_entry)		entries;
	/* Number of entries in the queue above */
	uint32_t				num_entries;
	/* Index within spdk_ftl_dev.batch_array */
	uint32_t				index;
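	/* Array of iovecs (one per entry), pointing into spdk_ftl_dev.iov_buf */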
	struct iovec				*iov;
	TAILQ_ENTRY(ftl_batch)			tailq;
};

struct spdk_ftl_dev {
	/* Device instance */
	struct spdk_uuid			uuid;
@@ -209,6 +220,15 @@ struct spdk_ftl_dev {
	TAILQ_HEAD(, ftl_io_channel)		ioch_queue;
	uint64_t				num_io_channels;

	/* Write buffer batches */
#define FTL_BATCH_COUNT 4096
	struct ftl_batch			batch_array[FTL_BATCH_COUNT];
	/* Iovec buffer used by batches */
	struct iovec				*iov_buf;
	/* Batch currently being filled */
	struct ftl_batch			*current_batch;
	TAILQ_HEAD(, ftl_batch)			free_batches;

	/* Devices' list */
	STAILQ_ENTRY(spdk_ftl_dev)		stailq;
};
+21 −0
@@ -1195,13 +1195,33 @@ ftl_io_channel_destroy_cb(void *io_device, void *ctx)
static int
ftl_dev_init_io_channel(struct spdk_ftl_dev *dev)
{
	struct ftl_batch *batch;
	uint32_t i;

	dev->ioch_array = calloc(dev->conf.max_io_channels, sizeof(*dev->ioch_array));
	if (!dev->ioch_array) {
		SPDK_ERRLOG("Failed to allocate IO channel array\n");
		return -1;
	}

	dev->iov_buf = calloc(FTL_BATCH_COUNT, dev->xfer_size * sizeof(struct iovec));
	if (!dev->iov_buf) {
		SPDK_ERRLOG("Failed to allocate iovec buffer\n");
		return -1;
	}

	TAILQ_INIT(&dev->free_batches);
	TAILQ_INIT(&dev->ioch_queue);

	for (i = 0; i < FTL_BATCH_COUNT; ++i) {
		batch = &dev->batch_array[i];
		batch->iov = &dev->iov_buf[i * dev->xfer_size];
		batch->num_entries = 0;
		batch->index = i;
		TAILQ_INIT(&batch->entries);
		TAILQ_INSERT_TAIL(&dev->free_batches, batch, tailq);
	}

	dev->num_io_channels = 0;

	spdk_io_device_register(dev, ftl_io_channel_create_cb, ftl_io_channel_destroy_cb,
@@ -1345,6 +1365,7 @@ ftl_dev_free_sync(struct spdk_ftl_dev *dev)

	assert(dev->num_io_channels == 0);
	free(dev->ioch_array);
	free(dev->iov_buf);
	free(dev->name);
	free(dev->bands);
	free(dev->l2p);
+1 −0
@@ -154,6 +154,7 @@ struct ftl_wbuf_entry {
	bool					valid;
	/* Lock that protects the entry from being evicted from the L2P */
	pthread_spinlock_t			lock;
	TAILQ_ENTRY(ftl_wbuf_entry)		tailq;
};

struct ftl_io_channel {
+129 −8
@@ -94,7 +94,7 @@ channel_destroy_cb(void *io_device, void *ctx)
{}

static struct spdk_ftl_dev *
-setup_device(uint32_t num_threads)
+setup_device(uint32_t num_threads, uint32_t xfer_size)
{
	struct spdk_ftl_dev *dev;
	struct _ftl_io_channel *_ioch;
@@ -121,6 +121,7 @@ setup_device(uint32_t num_threads)
	SPDK_CU_ASSERT_FATAL(ioch->io_pool != NULL);

	dev->conf = g_default_conf;
	dev->xfer_size = xfer_size;
	dev->base_bdev_desc = (struct spdk_bdev_desc *)0xdeadbeef;
	spdk_io_device_register(dev->base_bdev_desc, channel_create_cb, channel_destroy_cb, 0, NULL);

@@ -144,6 +145,7 @@ free_device(struct spdk_ftl_dev *dev)
	free_threads();

	free(dev->ioch_array);
	free(dev->iov_buf);
	free(dev->ioch);
	free(dev);
}
@@ -183,7 +185,7 @@ test_completion(void)
	int req, status = 0;
	size_t pool_size;

-	dev = setup_device(1);
+	dev = setup_device(1, 16);
	ioch = ftl_io_channel_get_ctx(dev->ioch);
	pool_size = spdk_mempool_count(ioch->io_pool);

@@ -225,7 +227,7 @@ test_alloc_free(void)
	int parent_status = -1;
	size_t pool_size;

-	dev = setup_device(1);
+	dev = setup_device(1, 16);
	ioch = ftl_io_channel_get_ctx(dev->ioch);
	pool_size = spdk_mempool_count(ioch->io_pool);

@@ -271,7 +273,7 @@ test_child_requests(void)
	int status[MAX_CHILDREN + 1], i;
	size_t pool_size;

-	dev = setup_device(1);
+	dev = setup_device(1, 16);
	ioch = ftl_io_channel_get_ctx(dev->ioch);
	pool_size = spdk_mempool_count(ioch->io_pool);

@@ -371,7 +373,7 @@ test_child_status(void)
	int parent_status, child_status[2];
	size_t pool_size, i;

-	dev = setup_device(1);
+	dev = setup_device(1, 16);
	ioch = ftl_io_channel_get_ctx(dev->ioch);
	pool_size = spdk_mempool_count(ioch->io_pool);

@@ -458,7 +460,7 @@ test_multi_generation(void)
	size_t pool_size;
	int i, j;

-	dev = setup_device(1);
+	dev = setup_device(1, 16);
	ioch = ftl_io_channel_get_ctx(dev->ioch);
	pool_size = spdk_mempool_count(ioch->io_pool);

@@ -583,7 +585,7 @@ test_io_channel_create(void)
	struct ftl_io_channel *ftl_ioch;
	uint32_t ioch_idx;

-	dev = setup_device(g_default_conf.max_io_channels + 1);
+	dev = setup_device(g_default_conf.max_io_channels + 1, 16);

	ioch = spdk_get_io_channel(dev);
	CU_ASSERT(ioch != NULL);
@@ -656,7 +658,7 @@ test_acquire_entry(void)
	uint32_t num_entries, num_io_channels = 2;
	uint32_t ioch_idx, entry_idx, tmp_idx;

-	dev = setup_device(num_io_channels);
+	dev = setup_device(num_io_channels, 16);

	num_entries = dev->conf.rwb_size / FTL_BLOCK_SIZE;
	entries = calloc(num_entries * num_io_channels, sizeof(*entries));
@@ -798,6 +800,123 @@ test_acquire_entry(void)
	free_device(dev);
}

static void
test_submit_batch(void)
{
	struct spdk_ftl_dev *dev;
	struct spdk_io_channel **_ioch_array;
	struct ftl_io_channel **ioch_array;
	struct ftl_wbuf_entry *entry;
	struct ftl_batch *batch;
	uint32_t num_io_channels = 16;
	uint32_t ioch_idx, tmp_idx, entry_idx;
	uint64_t ioch_bitmap;
	size_t num_entries;

	dev = setup_device(num_io_channels, num_io_channels);

	_ioch_array = calloc(num_io_channels, sizeof(*_ioch_array));
	SPDK_CU_ASSERT_FATAL(_ioch_array != NULL);
	ioch_array = calloc(num_io_channels, sizeof(*ioch_array));
	SPDK_CU_ASSERT_FATAL(ioch_array != NULL);

	for (ioch_idx = 0; ioch_idx < num_io_channels; ++ioch_idx) {
		set_thread(ioch_idx);
		_ioch_array[ioch_idx] = spdk_get_io_channel(dev);
		SPDK_CU_ASSERT_FATAL(_ioch_array[ioch_idx] != NULL);
		ioch_array[ioch_idx] = ftl_io_channel_get_ctx(_ioch_array[ioch_idx]);
		poll_threads();
	}

	/* Make sure the IO channels are not starved and entries are popped in RR fashion */
	for (ioch_idx = 0; ioch_idx < num_io_channels; ++ioch_idx) {
		set_thread(ioch_idx);

		for (entry_idx = 0; entry_idx < dev->xfer_size; ++entry_idx) {
			entry = ftl_acquire_wbuf_entry(ioch_array[ioch_idx], 0);
			SPDK_CU_ASSERT_FATAL(entry != NULL);

			num_entries = spdk_ring_enqueue(ioch_array[ioch_idx]->submit_queue,
							(void **)&entry, 1, NULL);
			CU_ASSERT(num_entries == 1);
		}
	}

	for (ioch_idx = 0; ioch_idx < num_io_channels; ++ioch_idx) {
		for (tmp_idx = 0; tmp_idx < ioch_idx; ++tmp_idx) {
			set_thread(tmp_idx);

			while (spdk_ring_count(ioch_array[tmp_idx]->submit_queue) < dev->xfer_size) {
				entry = ftl_acquire_wbuf_entry(ioch_array[tmp_idx], 0);
				SPDK_CU_ASSERT_FATAL(entry != NULL);

				num_entries = spdk_ring_enqueue(ioch_array[tmp_idx]->submit_queue,
								(void **)&entry, 1, NULL);
				CU_ASSERT(num_entries == 1);
			}
		}

		set_thread(ioch_idx);

		batch = ftl_get_next_batch(dev);
		SPDK_CU_ASSERT_FATAL(batch != NULL);

		TAILQ_FOREACH(entry, &batch->entries, tailq) {
			CU_ASSERT(entry->ioch == ioch_array[ioch_idx]);
		}

		ftl_release_batch(dev, batch);

		CU_ASSERT(spdk_ring_count(ioch_array[ioch_idx]->free_queue) ==
			  ioch_array[ioch_idx]->num_entries);
	}

	for (ioch_idx = 0; ioch_idx < num_io_channels - 1; ++ioch_idx) {
		batch = ftl_get_next_batch(dev);
		SPDK_CU_ASSERT_FATAL(batch != NULL);
		ftl_release_batch(dev, batch);
	}

	/* Make sure the batch can be built from entries from any IO channel */
	for (ioch_idx = 0; ioch_idx < num_io_channels; ++ioch_idx) {
		set_thread(ioch_idx);
		entry = ftl_acquire_wbuf_entry(ioch_array[ioch_idx], 0);
		SPDK_CU_ASSERT_FATAL(entry != NULL);

		num_entries = spdk_ring_enqueue(ioch_array[ioch_idx]->submit_queue,
						(void **)&entry, 1, NULL);
		CU_ASSERT(num_entries == 1);
	}

	batch = ftl_get_next_batch(dev);
	SPDK_CU_ASSERT_FATAL(batch != NULL);

	ioch_bitmap = 0;
	TAILQ_FOREACH(entry, &batch->entries, tailq) {
		ioch_bitmap |= 1 << entry->ioch->index;
	}

	for (ioch_idx = 0; ioch_idx < num_io_channels; ++ioch_idx) {
		CU_ASSERT((ioch_bitmap & (1 << ioch_array[ioch_idx]->index)) != 0);
	}
	ftl_release_batch(dev, batch);

	for (ioch_idx = 0; ioch_idx < num_io_channels; ++ioch_idx) {
		CU_ASSERT(spdk_ring_count(ioch_array[ioch_idx]->free_queue) ==
			  ioch_array[ioch_idx]->num_entries);
	}

	for (ioch_idx = 0; ioch_idx < num_io_channels; ++ioch_idx) {
		set_thread(ioch_idx);
		spdk_put_io_channel(_ioch_array[ioch_idx]);
	}
	poll_threads();

	free(_ioch_array);
	free(ioch_array);
	free_device(dev);
}

int
main(int argc, char **argv)
{
@@ -829,6 +948,8 @@ main(int argc, char **argv)
			       test_io_channel_create) == NULL
		|| CU_add_test(suite, "test_acquire_entry",
			       test_acquire_entry) == NULL
		|| CU_add_test(suite, "test_submit_batch",
			       test_submit_batch) == NULL

	) {
		CU_cleanup_registry();