Commit 36df38c0 authored by Konrad Sztyber, committed by Tomasz Zawadzki
Browse files

thread: cache a number of iobuf buffers on each channel



Users can now specify a number of small/large buffers to be cached on
each iobuf channel.  Previously, we relied on the cache of the
underlying spdk_mempool, which has per-core caches. However, since iobuf
channels are tied to a module and an SPDK thread, each module and each
thread is now guaranteed to have a number of buffers available, so it
won't be starved by other modules/threads.

Signed-off-by: Konrad Sztyber <konrad.sztyber@intel.com>
Change-Id: I1e29fe29f78a13de371ab21d3e40bf55fbc9c639
Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/15634


Reviewed-by: Aleksey Marchuk <alexeymar@nvidia.com>
Reviewed-by: Shuhei Matsumoto <smatsumoto@nvidia.com>
Reviewed-by: Jim Harris <james.r.harris@intel.com>
Reviewed-by: Ben Walker <benjamin.walker@intel.com>
Tested-by: SPDK CI Jenkins <sys_sgci@intel.com>
Community-CI: Mellanox Build Bot
parent 3aceb2da
Loading
Loading
Loading
Loading
+43 −8
Original line number Diff line number Diff line
@@ -16,8 +16,10 @@
#endif

#include "spdk/stdinc.h"
#include "spdk/assert.h"
#include "spdk/cpuset.h"
#include "spdk/env.h"
#include "spdk/util.h"

#ifdef __cplusplus
extern "C" {
@@ -973,11 +975,27 @@ struct spdk_iobuf_entry {
	STAILQ_ENTRY(spdk_iobuf_entry)	stailq;
};

/* Each pool element reserves a cache-line-sized header in front of the data
 * region; pointers handed out by spdk_iobuf_get() point this many bytes past
 * the start of the allocation.
 */
#define SPDK_IOBUF_DATA_OFFSET SPDK_CACHE_LINE_SIZE

/* Intrusive list link overlaid on a buffer's header while the buffer sits in
 * a per-channel cache.
 */
struct spdk_iobuf_buffer {
	STAILQ_ENTRY(spdk_iobuf_buffer)	stailq;
};

/* The list link must fit inside the reserved header area */
SPDK_STATIC_ASSERT(sizeof(struct spdk_iobuf_buffer) <= SPDK_IOBUF_DATA_OFFSET,
		   "Invalid data offset");

/* Wait queue of spdk_iobuf_entry requests and cache list of idle buffers */
typedef STAILQ_HEAD(, spdk_iobuf_entry) spdk_iobuf_entry_stailq_t;
typedef STAILQ_HEAD(, spdk_iobuf_buffer) spdk_iobuf_buffer_stailq_t;

struct spdk_iobuf_pool {
	/** Buffer pool */
	struct spdk_mempool		*pool;
	/** Buffer cache */
	spdk_iobuf_buffer_stailq_t	cache;
	/** Number of elements in the cache */
	uint32_t			cache_count;
	/** Size of the cache */
	uint32_t			cache_size;
	/** Buffer wait queue */
	spdk_iobuf_entry_stailq_t	*queue;
	/** Buffer size */
@@ -1105,7 +1123,7 @@ spdk_iobuf_get(struct spdk_iobuf_channel *ch, uint64_t len,
	       struct spdk_iobuf_entry *entry, spdk_iobuf_get_cb cb_fn)
{
	struct spdk_iobuf_pool *pool;
	void *buf;
	struct spdk_iobuf_buffer *buf;

	assert(spdk_io_channel_get_thread(ch->parent) == spdk_get_thread());
	if (len <= ch->small.bufsize) {
@@ -1115,14 +1133,23 @@ spdk_iobuf_get(struct spdk_iobuf_channel *ch, uint64_t len,
		pool = &ch->large;
	}

	buf = spdk_mempool_get(pool->pool);
	buf = STAILQ_FIRST(&pool->cache);
	if (buf) {
		STAILQ_REMOVE_HEAD(&pool->cache, stailq);
		assert(pool->cache_count > 0);
		pool->cache_count--;
	} else {
		buf = (struct spdk_iobuf_buffer *)spdk_mempool_get(pool->pool);
		if (!buf) {
			STAILQ_INSERT_TAIL(pool->queue, entry, stailq);
			entry->module = ch->module;
			entry->cb_fn = cb_fn;

			return NULL;
		}
	}

	return buf;
	return (char *)buf + SPDK_IOBUF_DATA_OFFSET;
}

/**
@@ -1137,6 +1164,7 @@ static inline void
spdk_iobuf_put(struct spdk_iobuf_channel *ch, void *buf, uint64_t len)
{
	struct spdk_iobuf_entry *entry;
	struct spdk_iobuf_buffer *iobuf_buf;
	struct spdk_iobuf_pool *pool;

	assert(spdk_io_channel_get_thread(ch->parent) == spdk_get_thread());
@@ -1147,7 +1175,14 @@ spdk_iobuf_put(struct spdk_iobuf_channel *ch, void *buf, uint64_t len)
	}

	if (STAILQ_EMPTY(pool->queue)) {
		spdk_mempool_put(pool->pool, buf);
		iobuf_buf = (struct spdk_iobuf_buffer *)((char *)buf - SPDK_IOBUF_DATA_OFFSET);

		if (pool->cache_count < pool->cache_size) {
			STAILQ_INSERT_HEAD(&pool->cache, iobuf_buf, stailq);
			pool->cache_count++;
		} else {
			spdk_mempool_put(pool->pool, iobuf_buf);
		}
	} else {
		entry = STAILQ_FIRST(pool->queue);
		STAILQ_REMOVE_HEAD(pool->queue, stailq);
+55 −14
Original line number Diff line number Diff line
@@ -2940,15 +2940,10 @@ int
spdk_iobuf_initialize(void)
{
	struct spdk_iobuf_opts *opts = &g_iobuf.opts;
	int cache_size, rc = 0;
	int rc = 0;

	/**
	 * Ensure no more than half of the total buffers end up local caches, by using
	 * spdk_env_get_core_count() to determine how many local caches we need to account for.
	 */
	cache_size = opts->small_pool_count / (2 * spdk_env_get_core_count());
	g_iobuf.small_pool = spdk_mempool_create("iobuf_small_pool", opts->small_pool_count,
			     opts->small_bufsize, cache_size,
			     opts->small_bufsize + SPDK_IOBUF_DATA_OFFSET, 0,
			     SPDK_ENV_SOCKET_ID_ANY);
	if (!g_iobuf.small_pool) {
		SPDK_ERRLOG("Failed to create small iobuf pool\n");
@@ -2956,9 +2951,8 @@ spdk_iobuf_initialize(void)
		goto error;
	}

	cache_size = opts->large_pool_count / (2 * spdk_env_get_core_count());
	g_iobuf.large_pool = spdk_mempool_create("iobuf_large_pool", opts->large_pool_count,
			     opts->large_bufsize, cache_size,
			     opts->large_bufsize + SPDK_IOBUF_DATA_OFFSET, 0,
			     SPDK_ENV_SOCKET_ID_ANY);
	if (!g_iobuf.large_pool) {
		SPDK_ERRLOG("Failed to create large iobuf pool\n");
@@ -3056,11 +3050,8 @@ spdk_iobuf_channel_init(struct spdk_iobuf_channel *ch, const char *name,
	struct spdk_io_channel *ioch;
	struct iobuf_channel *iobuf_ch;
	struct iobuf_module *module;

	if (small_cache_size != 0 || large_cache_size != 0) {
		SPDK_ERRLOG("iobuf cache is currently unsupported\n");
		return -EINVAL;
	}
	struct spdk_iobuf_buffer *buf;
	uint32_t i;

	TAILQ_FOREACH(module, &g_iobuf.modules, tailq) {
		if (strcmp(name, module->name) == 0) {
@@ -3089,14 +3080,47 @@ spdk_iobuf_channel_init(struct spdk_iobuf_channel *ch, const char *name,
	ch->large.bufsize = g_iobuf.opts.large_bufsize;
	ch->parent = ioch;
	ch->module = module;
	ch->small.cache_size = small_cache_size;
	ch->large.cache_size = large_cache_size;
	ch->small.cache_count = 0;
	ch->large.cache_count = 0;

	STAILQ_INIT(&ch->small.cache);
	STAILQ_INIT(&ch->large.cache);

	for (i = 0; i < small_cache_size; ++i) {
		buf = spdk_mempool_get(g_iobuf.small_pool);
		if (buf == NULL) {
			SPDK_ERRLOG("Failed to populate iobuf small buffer cache. "
				    "You may need to increase spdk_iobuf_opts.small_pool_count\n");
			goto error;
		}
		STAILQ_INSERT_TAIL(&ch->small.cache, buf, stailq);
		ch->small.cache_count++;
	}
	for (i = 0; i < large_cache_size; ++i) {
		buf = spdk_mempool_get(g_iobuf.large_pool);
		if (buf == NULL) {
			SPDK_ERRLOG("Failed to populate iobuf large buffer cache. "
				    "You may need to increase spdk_iobuf_opts.large_pool_count\n");
			goto error;
		}
		STAILQ_INSERT_TAIL(&ch->large.cache, buf, stailq);
		ch->large.cache_count++;
	}

	return 0;
error:
	spdk_iobuf_channel_fini(ch);

	return -ENOMEM;
}

void
spdk_iobuf_channel_fini(struct spdk_iobuf_channel *ch)
{
	struct spdk_iobuf_entry *entry __attribute__((unused));
	struct spdk_iobuf_buffer *buf;

	/* Make sure none of the wait queue entries are coming from this module */
	STAILQ_FOREACH(entry, ch->small.queue, stailq) {
@@ -3106,6 +3130,23 @@ spdk_iobuf_channel_fini(struct spdk_iobuf_channel *ch)
		assert(entry->module != ch->module);
	}

	/* Release cached buffers back to the pool */
	while (!STAILQ_EMPTY(&ch->small.cache)) {
		buf = STAILQ_FIRST(&ch->small.cache);
		STAILQ_REMOVE_HEAD(&ch->small.cache, stailq);
		spdk_mempool_put(ch->small.pool, buf);
		ch->small.cache_count--;
	}
	while (!STAILQ_EMPTY(&ch->large.cache)) {
		buf = STAILQ_FIRST(&ch->large.cache);
		STAILQ_REMOVE_HEAD(&ch->large.cache, stailq);
		spdk_mempool_put(ch->large.pool, buf);
		ch->large.cache_count--;
	}

	assert(ch->small.cache_count == 0);
	assert(ch->large.cache_count == 0);

	spdk_put_io_channel(ch->parent);
	ch->parent = NULL;
}
+159 −0
Original line number Diff line number Diff line
@@ -2366,6 +2366,164 @@ iobuf(void)
	free_cores();
}

/* Verify the per-channel iobuf buffer cache: channel creation must fail when
 * the cache cannot be filled from the pool, cached buffers must go back to
 * the pool on channel destruction, and gets/puts must prefer the channel's
 * own cache while still servicing queued waiters first.
 */
static void
iobuf_cache(void)
{
	/* Deliberately tiny pools (4 small + 4 large buffers total) so that both
	 * the caches and the shared pools can be exhausted with a few requests.
	 */
	struct spdk_iobuf_opts opts = {
		.small_pool_count = 4,
		.large_pool_count = 4,
		.small_bufsize = 2,
		.large_bufsize = 4,
	};
	struct spdk_iobuf_channel iobuf_ch[2];
	struct ut_iobuf_entry *entry;
	struct ut_iobuf_entry mod0_entries[] = {
		{ .thread_id = 0, .module = "ut_module0", },
		{ .thread_id = 0, .module = "ut_module0", },
		{ .thread_id = 0, .module = "ut_module0", },
		{ .thread_id = 0, .module = "ut_module0", },
	};
	struct ut_iobuf_entry mod1_entries[] = {
		{ .thread_id = 0, .module = "ut_module1", },
		{ .thread_id = 0, .module = "ut_module1", },
	};
	int rc, finish = 0;
	uint32_t i, j, bufsize;

	allocate_cores(1);
	allocate_threads(1);

	set_thread(0);

	/* We cannot use spdk_iobuf_set_opts(), as it won't allow us to use such small pools */
	g_iobuf.opts = opts;
	rc = spdk_iobuf_initialize();
	CU_ASSERT_EQUAL(rc, 0);

	rc = spdk_iobuf_register_module("ut_module0");
	CU_ASSERT_EQUAL(rc, 0);

	rc = spdk_iobuf_register_module("ut_module1");
	CU_ASSERT_EQUAL(rc, 0);

	/* First check that channel initialization fails when it's not possible to fill in the cache
	 * from the pool.
	 */
	rc = spdk_iobuf_channel_init(&iobuf_ch[0], "ut_module0", 5, 1);
	CU_ASSERT_EQUAL(rc, -ENOMEM);
	rc = spdk_iobuf_channel_init(&iobuf_ch[0], "ut_module0", 1, 5);
	CU_ASSERT_EQUAL(rc, -ENOMEM);

	/* A cache that exactly drains each pool succeeds, but a second channel then
	 * has no buffers left to claim.
	 */
	rc = spdk_iobuf_channel_init(&iobuf_ch[0], "ut_module0", 4, 4);
	CU_ASSERT_EQUAL(rc, 0);
	rc = spdk_iobuf_channel_init(&iobuf_ch[1], "ut_module1", 4, 4);
	CU_ASSERT_EQUAL(rc, -ENOMEM);

	spdk_iobuf_channel_fini(&iobuf_ch[0]);
	poll_threads();

	/* Initialize one channel with cache, acquire buffers, and check that a second one can be
	 * created once the buffers acquired from the first one are returned to the pool
	 */
	rc = spdk_iobuf_channel_init(&iobuf_ch[0], "ut_module0", 2, 2);
	CU_ASSERT_EQUAL(rc, 0);

	/* len=4 hits the large pool: take the 2 cached buffers plus 1 straight
	 * from the pool, leaving a single large buffer in the pool.
	 */
	for (i = 0; i < 3; ++i) {
		mod0_entries[i].buf = spdk_iobuf_get(&iobuf_ch[0], 4, &mod0_entries[i].iobuf,
						     ut_iobuf_get_buf_cb);
		CU_ASSERT_PTR_NOT_NULL(mod0_entries[i].buf);
	}

	/* It should be able to create a channel with a single entry in the cache */
	rc = spdk_iobuf_channel_init(&iobuf_ch[1], "ut_module1", 2, 1);
	CU_ASSERT_EQUAL(rc, 0);
	spdk_iobuf_channel_fini(&iobuf_ch[1]);
	poll_threads();

	/* But not with two entries */
	rc = spdk_iobuf_channel_init(&iobuf_ch[1], "ut_module1", 2, 2);
	CU_ASSERT_EQUAL(rc, -ENOMEM);

	/* Puts refill iobuf_ch[0]'s own cache (cache_size = 2) before the pool, so
	 * the first two returns still leave the pool too empty for a 2-buffer cache.
	 */
	for (i = 0; i < 2; ++i) {
		spdk_iobuf_put(&iobuf_ch[0], mod0_entries[i].buf, 4);
		rc = spdk_iobuf_channel_init(&iobuf_ch[1], "ut_module1", 2, 2);
		CU_ASSERT_EQUAL(rc, -ENOMEM);
	}

	spdk_iobuf_put(&iobuf_ch[0], mod0_entries[2].buf, 4);

	/* The last buffer should be released back to the pool, so we should be able to create a new
	 * channel
	 */
	rc = spdk_iobuf_channel_init(&iobuf_ch[1], "ut_module1", 2, 2);
	CU_ASSERT_EQUAL(rc, 0);

	spdk_iobuf_channel_fini(&iobuf_ch[0]);
	spdk_iobuf_channel_fini(&iobuf_ch[1]);
	poll_threads();

	/* Check that the pool is only used when the cache is empty and that the cache guarantees a
	 * certain set of buffers
	 */
	rc = spdk_iobuf_channel_init(&iobuf_ch[0], "ut_module0", 2, 2);
	CU_ASSERT_EQUAL(rc, 0);
	rc = spdk_iobuf_channel_init(&iobuf_ch[1], "ut_module1", 1, 1);
	CU_ASSERT_EQUAL(rc, 0);

	/* Run the same scenario once against the small pool (bufsize 2) and once
	 * against the large pool (bufsize 4).
	 */
	uint32_t buffer_sizes[] = { 2, 4 };
	for (i = 0; i < SPDK_COUNTOF(buffer_sizes); ++i) {
		bufsize = buffer_sizes[i];

		/* ch[0]'s cache holds 2 buffers, so the third get dips into the pool */
		for (j = 0; j < 3; ++j) {
			entry = &mod0_entries[j];
			entry->buf = spdk_iobuf_get(&iobuf_ch[0], bufsize, &entry->iobuf,
						    ut_iobuf_get_buf_cb);
			CU_ASSERT_PTR_NOT_NULL(entry->buf);
		}

		mod1_entries[0].buf = spdk_iobuf_get(&iobuf_ch[1], bufsize, &mod1_entries[0].iobuf,
						     ut_iobuf_get_buf_cb);
		CU_ASSERT_PTR_NOT_NULL(mod1_entries[0].buf);

		/* The whole pool is exhausted now */
		mod1_entries[1].buf = spdk_iobuf_get(&iobuf_ch[1], bufsize, &mod1_entries[1].iobuf,
						     ut_iobuf_get_buf_cb);
		CU_ASSERT_PTR_NULL(mod1_entries[1].buf);
		mod0_entries[3].buf = spdk_iobuf_get(&iobuf_ch[0], bufsize, &mod0_entries[3].iobuf,
						     ut_iobuf_get_buf_cb);
		CU_ASSERT_PTR_NULL(mod0_entries[3].buf);

		/* If there are outstanding requests waiting for a buffer, they should have priority
		 * over filling in the cache, even if they're from different modules.
		 */
		spdk_iobuf_put(&iobuf_ch[0], mod0_entries[2].buf, bufsize);
		/* Also make sure the queue is FIFO and doesn't care about which module requested
		 * and which module released the buffer.  mod1_entries[1] queued first, so its
		 * callback (ut_iobuf_get_buf_cb — presumably sets ->buf; confirm against the
		 * helper's definition) fires first.
		 */
		CU_ASSERT_PTR_NOT_NULL(mod1_entries[1].buf);
		CU_ASSERT_PTR_NULL(mod0_entries[3].buf);

		/* Return the buffers back */
		spdk_iobuf_entry_abort(&iobuf_ch[0], &mod0_entries[3].iobuf, bufsize);
		for (j = 0; j < 2; ++j) {
			spdk_iobuf_put(&iobuf_ch[0], mod0_entries[j].buf, bufsize);
			spdk_iobuf_put(&iobuf_ch[1], mod1_entries[j].buf, bufsize);
		}
	}

	spdk_iobuf_channel_fini(&iobuf_ch[0]);
	spdk_iobuf_channel_fini(&iobuf_ch[1]);
	poll_threads();

	spdk_iobuf_finish(ut_iobuf_finish_cb, &finish);
	poll_threads();

	CU_ASSERT_EQUAL(finish, 1);

	free_threads();
	free_cores();
}

int
main(int argc, char **argv)
{
@@ -2396,6 +2554,7 @@ main(int argc, char **argv)
	CU_ADD_TEST(suite, io_device_lookup);
	CU_ADD_TEST(suite, spdk_spin);
	CU_ADD_TEST(suite, iobuf);
	CU_ADD_TEST(suite, iobuf_cache);

	CU_basic_set_mode(CU_BRM_VERBOSE);
	CU_basic_run_tests();