Commit eba178bf authored by Jim Harris's avatar Jim Harris
Browse files

thread: add spdk_iobuf_node_cache



We also rename spdk_iobuf_pool to spdk_iobuf_pool_cache.

This makes it more clear that the iobuf_channel has a cache of
buffers per NUMA node. Currently there is only one node, but
future changes will extend this to support multiple NUMA nodes
when configured.

Signed-off-by: default avatarJim Harris <jim.harris@samsung.com>
Change-Id: Id403a089a0de943bd3717e40aba156cbb2368cab
Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/24517


Tested-by: default avatarSPDK CI Jenkins <sys_sgci@intel.com>
Community-CI: Broadcom CI <spdk-ci.pdl@broadcom.com>
Community-CI: Mellanox Build Bot
Community-CI: Community CI Samsung <spdk.community.ci.samsung@gmail.com>
Reviewed-by: default avatarBen Walker <ben@nvidia.com>
Reviewed-by: default avatarAleksey Marchuk <alexeymar@nvidia.com>
Reviewed-by: default avatarKonrad Sztyber <konrad.sztyber@intel.com>
parent f2017d2e
Loading
Loading
Loading
Loading
+10 −5
Original line number Diff line number Diff line
@@ -1084,7 +1084,7 @@ struct spdk_iobuf_buffer {
typedef STAILQ_HEAD(, spdk_iobuf_entry) spdk_iobuf_entry_stailq_t;
typedef STAILQ_HEAD(, spdk_iobuf_buffer) spdk_iobuf_buffer_stailq_t;

struct spdk_iobuf_pool {
struct spdk_iobuf_pool_cache {
	/** Buffer pool */
	struct spdk_ring		*pool;
	/** Buffer cache */
@@ -1101,12 +1101,17 @@ struct spdk_iobuf_pool {
	struct spdk_iobuf_pool_stats	stats;
};

struct spdk_iobuf_node_cache {
	/** Small buffer memory pool cache */
	struct spdk_iobuf_pool_cache	small;
	/** Large buffer memory pool cache */
	struct spdk_iobuf_pool_cache	large;
};

/** iobuf channel */
struct spdk_iobuf_channel {
	/** Small buffer memory pool */
	struct spdk_iobuf_pool		small;
	/** Large buffer memory pool */
	struct spdk_iobuf_pool		large;
	/* Buffer cache */
	struct spdk_iobuf_node_cache	cache;
	/** Module pointer */
	const void			*module;
	/** Parent IO channel */
+1 −1
Original line number Diff line number Diff line
@@ -1821,7 +1821,7 @@ bdev_io_get_buf(struct spdk_bdev_io *bdev_io, uint64_t len)
	mgmt_ch = bdev_io->internal.ch->shared_resource->mgmt_ch;
	max_len = bdev_io_get_max_buf_len(bdev_io, len);

	if (spdk_unlikely(max_len > mgmt_ch->iobuf.large.bufsize)) {
	if (spdk_unlikely(max_len > mgmt_ch->iobuf.cache.large.bufsize)) {
		SPDK_ERRLOG("Length %" PRIu64 " is larger than allowed\n", max_len);
		bdev_io_get_buf_complete(bdev_io, false);
		return;
+2 −2
Original line number Diff line number Diff line
@@ -6,8 +6,8 @@
SPDK_ROOT_DIR := $(abspath $(CURDIR)/../..)
include $(SPDK_ROOT_DIR)/mk/spdk.common.mk

SO_VER := 19
SO_MINOR := 1
SO_VER := 20
SO_MINOR := 0

C_SRCS = ctrlr.c ctrlr_discovery.c ctrlr_bdev.c \
	 subsystem.c nvmf.c nvmf_rpc.c transport.c tcp.c \
+75 −52
Original line number Diff line number Diff line
@@ -306,6 +306,7 @@ spdk_iobuf_channel_init(struct spdk_iobuf_channel *ch, const char *name,
	struct iobuf_channel *iobuf_ch;
	struct iobuf_module *module;
	struct spdk_iobuf_buffer *buf;
	struct spdk_iobuf_node_cache *cache;
	uint32_t i;

	TAILQ_FOREACH(module, &g_iobuf.modules, tailq) {
@@ -339,21 +340,23 @@ spdk_iobuf_channel_init(struct spdk_iobuf_channel *ch, const char *name,
		goto error;
	}

	ch->small.queue = &iobuf_ch->small_queue;
	ch->large.queue = &iobuf_ch->large_queue;
	ch->small.pool = g_iobuf.small_pool;
	ch->large.pool = g_iobuf.large_pool;
	ch->small.bufsize = g_iobuf.opts.small_bufsize;
	ch->large.bufsize = g_iobuf.opts.large_bufsize;
	ch->parent = ioch;
	ch->module = module;
	ch->small.cache_size = small_cache_size;
	ch->large.cache_size = large_cache_size;
	ch->small.cache_count = 0;
	ch->large.cache_count = 0;

	STAILQ_INIT(&ch->small.cache);
	STAILQ_INIT(&ch->large.cache);
	cache = &ch->cache;

	cache->small.queue = &iobuf_ch->small_queue;
	cache->large.queue = &iobuf_ch->large_queue;
	cache->small.pool = g_iobuf.small_pool;
	cache->large.pool = g_iobuf.large_pool;
	cache->small.bufsize = g_iobuf.opts.small_bufsize;
	cache->large.bufsize = g_iobuf.opts.large_bufsize;
	cache->small.cache_size = small_cache_size;
	cache->large.cache_size = large_cache_size;
	cache->small.cache_count = 0;
	cache->large.cache_count = 0;

	STAILQ_INIT(&cache->small.cache);
	STAILQ_INIT(&cache->large.cache);

	for (i = 0; i < small_cache_size; ++i) {
		if (spdk_ring_dequeue(g_iobuf.small_pool, (void **)&buf, 1) == 0) {
@@ -364,8 +367,8 @@ spdk_iobuf_channel_init(struct spdk_iobuf_channel *ch, const char *name,
				    "this value.\n");
			goto error;
		}
		STAILQ_INSERT_TAIL(&ch->small.cache, buf, stailq);
		ch->small.cache_count++;
		STAILQ_INSERT_TAIL(&cache->small.cache, buf, stailq);
		cache->small.cache_count++;
	}
	for (i = 0; i < large_cache_size; ++i) {
		if (spdk_ring_dequeue(g_iobuf.large_pool, (void **)&buf, 1) == 0) {
@@ -376,8 +379,8 @@ spdk_iobuf_channel_init(struct spdk_iobuf_channel *ch, const char *name,
				    "this value.\n");
			goto error;
		}
		STAILQ_INSERT_TAIL(&ch->large.cache, buf, stailq);
		ch->large.cache_count++;
		STAILQ_INSERT_TAIL(&cache->large.cache, buf, stailq);
		cache->large.cache_count++;
	}

	return 0;
@@ -393,32 +396,35 @@ spdk_iobuf_channel_fini(struct spdk_iobuf_channel *ch)
	struct spdk_iobuf_entry *entry __attribute__((unused));
	struct spdk_iobuf_buffer *buf;
	struct iobuf_channel *iobuf_ch;
	struct spdk_iobuf_node_cache *cache;
	uint32_t i;

	cache = &ch->cache;

	/* Make sure none of the wait queue entries are coming from this module */
	STAILQ_FOREACH(entry, ch->small.queue, stailq) {
	STAILQ_FOREACH(entry, cache->small.queue, stailq) {
		assert(entry->module != ch->module);
	}
	STAILQ_FOREACH(entry, ch->large.queue, stailq) {
	STAILQ_FOREACH(entry, cache->large.queue, stailq) {
		assert(entry->module != ch->module);
	}

	/* Release cached buffers back to the pool */
	while (!STAILQ_EMPTY(&ch->small.cache)) {
		buf = STAILQ_FIRST(&ch->small.cache);
		STAILQ_REMOVE_HEAD(&ch->small.cache, stailq);
	while (!STAILQ_EMPTY(&cache->small.cache)) {
		buf = STAILQ_FIRST(&cache->small.cache);
		STAILQ_REMOVE_HEAD(&cache->small.cache, stailq);
		spdk_ring_enqueue(g_iobuf.small_pool, (void **)&buf, 1, NULL);
		ch->small.cache_count--;
		cache->small.cache_count--;
	}
	while (!STAILQ_EMPTY(&ch->large.cache)) {
		buf = STAILQ_FIRST(&ch->large.cache);
		STAILQ_REMOVE_HEAD(&ch->large.cache, stailq);
	while (!STAILQ_EMPTY(&cache->large.cache)) {
		buf = STAILQ_FIRST(&cache->large.cache);
		STAILQ_REMOVE_HEAD(&cache->large.cache, stailq);
		spdk_ring_enqueue(g_iobuf.large_pool, (void **)&buf, 1, NULL);
		ch->large.cache_count--;
		cache->large.cache_count--;
	}

	assert(ch->small.cache_count == 0);
	assert(ch->large.cache_count == 0);
	assert(cache->small.cache_count == 0);
	assert(cache->large.cache_count == 0);

	iobuf_ch = spdk_io_channel_get_ctx(ch->parent);
	for (i = 0; i < IOBUF_MAX_CHANNELS; ++i) {
@@ -477,7 +483,7 @@ spdk_iobuf_unregister_module(const char *name)
}

static int
iobuf_pool_for_each_entry(struct spdk_iobuf_channel *ch, struct spdk_iobuf_pool *pool,
iobuf_pool_for_each_entry(struct spdk_iobuf_channel *ch, struct spdk_iobuf_pool_cache *pool,
			  spdk_iobuf_for_each_entry_fn cb_fn, void *cb_ctx)
{
	struct spdk_iobuf_entry *entry, *tmp;
@@ -502,26 +508,32 @@ int
spdk_iobuf_for_each_entry(struct spdk_iobuf_channel *ch,
			  spdk_iobuf_for_each_entry_fn cb_fn, void *cb_ctx)
{
	struct spdk_iobuf_node_cache *cache;
	int rc;

	rc = iobuf_pool_for_each_entry(ch, &ch->small, cb_fn, cb_ctx);
	cache = &ch->cache;

	rc = iobuf_pool_for_each_entry(ch, &cache->small, cb_fn, cb_ctx);
	if (rc != 0) {
		return rc;
	}
	return iobuf_pool_for_each_entry(ch, &ch->large, cb_fn, cb_ctx);
	return iobuf_pool_for_each_entry(ch, &cache->large, cb_fn, cb_ctx);
}

void
spdk_iobuf_entry_abort(struct spdk_iobuf_channel *ch, struct spdk_iobuf_entry *entry,
		       uint64_t len)
{
	struct spdk_iobuf_pool *pool;
	struct spdk_iobuf_node_cache *cache;
	struct spdk_iobuf_pool_cache *pool;

	cache = &ch->cache;

	if (len <= ch->small.bufsize) {
		pool = &ch->small;
	if (len <= ch->cache.small.bufsize) {
		pool = &cache->small;
	} else {
		assert(len <= ch->large.bufsize);
		pool = &ch->large;
		assert(len <= cache->large.bufsize);
		pool = &cache->large;
	}

	STAILQ_REMOVE(pool->queue, entry, spdk_iobuf_entry, stailq);
@@ -533,15 +545,18 @@ void *
spdk_iobuf_get(struct spdk_iobuf_channel *ch, uint64_t len,
	       struct spdk_iobuf_entry *entry, spdk_iobuf_get_cb cb_fn)
{
	struct spdk_iobuf_pool *pool;
	struct spdk_iobuf_node_cache *cache;
	struct spdk_iobuf_pool_cache *pool;
	void *buf;

	cache = &ch->cache;

	assert(spdk_io_channel_get_thread(ch->parent) == spdk_get_thread());
	if (len <= ch->small.bufsize) {
		pool = &ch->small;
	if (len <= cache->small.bufsize) {
		pool = &cache->small;
	} else {
		assert(len <= ch->large.bufsize);
		pool = &ch->large;
		assert(len <= cache->large.bufsize);
		pool = &cache->large;
	}

	buf = (void *)STAILQ_FIRST(&pool->cache);
@@ -586,14 +601,17 @@ spdk_iobuf_put(struct spdk_iobuf_channel *ch, void *buf, uint64_t len)
{
	struct spdk_iobuf_entry *entry;
	struct spdk_iobuf_buffer *iobuf_buf;
	struct spdk_iobuf_pool *pool;
	struct spdk_iobuf_node_cache *cache;
	struct spdk_iobuf_pool_cache *pool;
	size_t sz;

	cache = &ch->cache;

	assert(spdk_io_channel_get_thread(ch->parent) == spdk_get_thread());
	if (len <= ch->small.bufsize) {
		pool = &ch->small;
	if (len <= cache->small.bufsize) {
		pool = &cache->small;
	} else {
		pool = &ch->large;
		pool = &cache->large;
	}

	if (STAILQ_EMPTY(pool->queue)) {
@@ -666,12 +684,17 @@ iobuf_get_channel_stats(struct spdk_io_channel_iter *iter)
			it = &ctx->modules[i];
			module = (struct iobuf_module *)channel->module;
			if (strcmp(it->module, module->name) == 0) {
				it->small_pool.cache += channel->small.stats.cache;
				it->small_pool.main += channel->small.stats.main;
				it->small_pool.retry += channel->small.stats.retry;
				it->large_pool.cache += channel->large.stats.cache;
				it->large_pool.main += channel->large.stats.main;
				it->large_pool.retry += channel->large.stats.retry;
				struct spdk_iobuf_pool_cache *cache;

				cache = &channel->cache.small;
				it->small_pool.cache += cache->stats.cache;
				it->small_pool.main += cache->stats.main;
				it->small_pool.retry += cache->stats.retry;

				cache = &channel->cache.large;
				it->large_pool.cache += cache->stats.cache;
				it->large_pool.main += cache->stats.main;
				it->large_pool.retry += cache->stats.retry;
				break;
			}
		}
+20 −10
Original line number Diff line number Diff line
@@ -54,11 +54,15 @@ int
spdk_iobuf_channel_init(struct spdk_iobuf_channel *ch, const char *name,
			uint32_t small_cache_size, uint32_t large_cache_size)
{
	struct spdk_iobuf_node_cache *cache;

	cache = &ch->cache;

	STAILQ_INIT(&g_iobuf_entries);
	ch->small.cache_count = small_cache_size;
	ch->small.cache_size = small_cache_size;
	ch->large.cache_count = large_cache_size;
	ch->large.cache_size = large_cache_size;
	cache->small.cache_count = small_cache_size;
	cache->small.cache_size = small_cache_size;
	cache->large.cache_count = large_cache_size;
	cache->large.cache_size = large_cache_size;
	return 0;
}

@@ -67,17 +71,20 @@ void *
spdk_iobuf_get(struct spdk_iobuf_channel *ch, uint64_t len,
	       struct spdk_iobuf_entry *entry, spdk_iobuf_get_cb cb_fn)
{
	struct spdk_iobuf_pool *pool;
	struct spdk_iobuf_node_cache *cache;
	struct spdk_iobuf_pool_cache *pool;
	uint32_t *count;
	void *buf;

	HANDLE_RETURN_MOCK(spdk_iobuf_get);

	cache = &ch->cache;

	if (len > g_iobuf.opts.small_bufsize) {
		pool = &ch->large;
		pool = &cache->large;
		count = &g_iobuf.large_pool_count;
	} else {
		pool = &ch->small;
		pool = &cache->small;
		count = &g_iobuf.small_pool_count;
	}

@@ -107,14 +114,17 @@ void
spdk_iobuf_put(struct spdk_iobuf_channel *ch, void *buf, uint64_t len)
{
	struct spdk_iobuf_entry *entry;
	struct spdk_iobuf_pool *pool;
	struct spdk_iobuf_node_cache *cache;
	struct spdk_iobuf_pool_cache *pool;
	uint32_t *count;

	cache = &ch->cache;

	if (len > g_iobuf.opts.small_bufsize) {
		pool = &ch->large;
		pool = &cache->large;
		count = &g_iobuf.large_pool_count;
	} else {
		pool = &ch->small;
		pool = &cache->small;
		count = &g_iobuf.small_pool_count;
	}

Loading