Commit b60ae8d6 authored by Jim Harris's avatar Jim Harris
Browse files

thread: create helper functions for iobuf_channel_init/free and abort



We break up the init process into two steps per node - init and populate.
init returns void because it cannot fail. This prepares for init'ing all
channel_nodes first, before we populate them. This will make it easier
to unwind when there is an allocation failure, because we can safely
iterate all of the nodes, since they have all been initialized, even if
they weren't populated.

Signed-off-by: default avatarJim Harris <jim.harris@samsung.com>
Change-Id: I9dc02a63aed8131316465f0441557864582468c7
Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/24541


Community-CI: Mellanox Build Bot
Reviewed-by: default avatarKonrad Sztyber <konrad.sztyber@intel.com>
Community-CI: Community CI Samsung <spdk.community.ci.samsung@gmail.com>
Reviewed-by: default avatarAleksey Marchuk <alexeymar@nvidia.com>
Community-CI: Broadcom CI <spdk-ci.pdl@broadcom.com>
Reviewed-by: default avatarBen Walker <ben@nvidia.com>
Tested-by: default avatarSPDK CI Jenkins <sys_sgci@intel.com>
parent 24d8985c
Loading
Loading
Loading
Loading
+94 −58
Original line number Diff line number Diff line
@@ -343,6 +343,66 @@ spdk_iobuf_get_opts(struct spdk_iobuf_opts *opts, size_t opts_size)
	SPDK_STATIC_ASSERT(sizeof(struct spdk_iobuf_opts) == 40, "Incorrect size");
}

static void
iobuf_channel_node_init(struct spdk_iobuf_channel *ch, struct iobuf_channel *iobuf_ch,
			uint32_t socket_id, uint32_t small_cache_size, uint32_t large_cache_size)
{
	struct iobuf_node *node = &g_iobuf.node[socket_id];
	struct spdk_iobuf_node_cache *cache = &ch->cache[socket_id];
	struct iobuf_channel_node *ch_node = &iobuf_ch->node[socket_id];

	cache->small.queue = &ch_node->small_queue;
	cache->large.queue = &ch_node->large_queue;
	cache->small.pool = node->small_pool;
	cache->large.pool = node->large_pool;
	cache->small.bufsize = g_iobuf.opts.small_bufsize;
	cache->large.bufsize = g_iobuf.opts.large_bufsize;
	cache->small.cache_size = small_cache_size;
	cache->large.cache_size = large_cache_size;
	cache->small.cache_count = 0;
	cache->large.cache_count = 0;

	STAILQ_INIT(&cache->small.cache);
	STAILQ_INIT(&cache->large.cache);
}

static int
iobuf_channel_node_populate(struct spdk_iobuf_channel *ch, const char *name, uint32_t socket_id)
{
	struct iobuf_node *node = &g_iobuf.node[socket_id];
	struct spdk_iobuf_node_cache *cache = &ch->cache[socket_id];
	uint32_t small_cache_size = cache->small.cache_size;
	uint32_t large_cache_size = cache->large.cache_size;
	struct spdk_iobuf_buffer *buf;
	uint32_t i;

	for (i = 0; i < small_cache_size; ++i) {
		if (spdk_ring_dequeue(node->small_pool, (void **)&buf, 1) == 0) {
			SPDK_ERRLOG("Failed to populate '%s' iobuf small buffer cache at %d/%d entries. "
				    "You may need to increase spdk_iobuf_opts.small_pool_count (%"PRIu64")\n",
				    name, i, small_cache_size, g_iobuf.opts.small_pool_count);
			SPDK_ERRLOG("See scripts/calc-iobuf.py for guidance on how to calculate "
				    "this value.\n");
			return -ENOMEM;
		}
		STAILQ_INSERT_TAIL(&cache->small.cache, buf, stailq);
		cache->small.cache_count++;
	}
	for (i = 0; i < large_cache_size; ++i) {
		if (spdk_ring_dequeue(node->large_pool, (void **)&buf, 1) == 0) {
			SPDK_ERRLOG("Failed to populate '%s' iobuf large buffer cache at %d/%d entries. "
				    "You may need to increase spdk_iobuf_opts.large_pool_count (%"PRIu64")\n",
				    name, i, large_cache_size, g_iobuf.opts.large_pool_count);
			SPDK_ERRLOG("See scripts/calc-iobuf.py for guidance on how to calculate "
				    "this value.\n");
			return -ENOMEM;
		}
		STAILQ_INSERT_TAIL(&cache->large.cache, buf, stailq);
		cache->large.cache_count++;
	}

	return 0;
}

int
spdk_iobuf_channel_init(struct spdk_iobuf_channel *ch, const char *name,
@@ -351,11 +411,8 @@ spdk_iobuf_channel_init(struct spdk_iobuf_channel *ch, const char *name,
	struct spdk_io_channel *ioch;
	struct iobuf_channel *iobuf_ch;
	struct iobuf_module *module;
	struct spdk_iobuf_buffer *buf;
	struct spdk_iobuf_node_cache *cache;
	struct iobuf_node *node = &g_iobuf.node[0];
	struct iobuf_channel_node *ch_node;
	uint32_t i;
	int rc;

	TAILQ_FOREACH(module, &g_iobuf.modules, tailq) {
		if (strcmp(name, module->name) == 0) {
@@ -385,71 +442,32 @@ spdk_iobuf_channel_init(struct spdk_iobuf_channel *ch, const char *name,

	if (i == IOBUF_MAX_CHANNELS) {
		SPDK_ERRLOG("Max number of iobuf channels (%" PRIu32 ") exceeded.\n", i);
		rc = -ENOMEM;
		goto error;
	}

	ch->parent = ioch;
	ch->module = module;
	cache = &ch->cache[0];
	ch_node = &iobuf_ch->node[0];

	cache->small.queue = &ch_node->small_queue;
	cache->large.queue = &ch_node->large_queue;
	cache->small.pool = node->small_pool;
	cache->large.pool = node->large_pool;
	cache->small.bufsize = g_iobuf.opts.small_bufsize;
	cache->large.bufsize = g_iobuf.opts.large_bufsize;
	cache->small.cache_size = small_cache_size;
	cache->large.cache_size = large_cache_size;
	cache->small.cache_count = 0;
	cache->large.cache_count = 0;

	STAILQ_INIT(&cache->small.cache);
	STAILQ_INIT(&cache->large.cache);

	for (i = 0; i < small_cache_size; ++i) {
		if (spdk_ring_dequeue(node->small_pool, (void **)&buf, 1) == 0) {
			SPDK_ERRLOG("Failed to populate '%s' iobuf small buffer cache at %d/%d entries. "
				    "You may need to increase spdk_iobuf_opts.small_pool_count (%"PRIu64")\n",
				    name, i, small_cache_size, g_iobuf.opts.small_pool_count);
			SPDK_ERRLOG("See scripts/calc-iobuf.py for guidance on how to calculate "
				    "this value.\n");
			goto error;
		}
		STAILQ_INSERT_TAIL(&cache->small.cache, buf, stailq);
		cache->small.cache_count++;
	}
	for (i = 0; i < large_cache_size; ++i) {
		if (spdk_ring_dequeue(node->large_pool, (void **)&buf, 1) == 0) {
			SPDK_ERRLOG("Failed to populate '%s' iobuf large buffer cache at %d/%d entries. "
				    "You may need to increase spdk_iobuf_opts.large_pool_count (%"PRIu64")\n",
				    name, i, large_cache_size, g_iobuf.opts.large_pool_count);
			SPDK_ERRLOG("See scripts/calc-iobuf.py for guidance on how to calculate "
				    "this value.\n");
	iobuf_channel_node_init(ch, iobuf_ch, 0, small_cache_size, large_cache_size);
	rc = iobuf_channel_node_populate(ch, name, 0);
	if (rc) {
		goto error;
	}
		STAILQ_INSERT_TAIL(&cache->large.cache, buf, stailq);
		cache->large.cache_count++;
	}

	return 0;
error:
	spdk_iobuf_channel_fini(ch);

	return -ENOMEM;
	return rc;
}

void
spdk_iobuf_channel_fini(struct spdk_iobuf_channel *ch)
static void
iobuf_channel_node_fini(struct spdk_iobuf_channel *ch, uint32_t socket_id)
{
	struct spdk_iobuf_node_cache *cache = &ch->cache[socket_id];
	struct iobuf_node *node = &g_iobuf.node[socket_id];
	struct spdk_iobuf_entry *entry __attribute__((unused));
	struct spdk_iobuf_buffer *buf;
	struct iobuf_channel *iobuf_ch;
	struct spdk_iobuf_node_cache *cache;
	struct iobuf_node *node = &g_iobuf.node[0];
	uint32_t i;

	cache = &ch->cache[0];

	/* Make sure none of the wait queue entries are coming from this module */
	STAILQ_FOREACH(entry, cache->small.queue, stailq) {
@@ -475,6 +493,15 @@ spdk_iobuf_channel_fini(struct spdk_iobuf_channel *ch)

	assert(cache->small.cache_count == 0);
	assert(cache->large.cache_count == 0);
}

void
spdk_iobuf_channel_fini(struct spdk_iobuf_channel *ch)
{
	struct iobuf_channel *iobuf_ch;
	uint32_t i;

	iobuf_channel_node_fini(ch, 0);

	iobuf_ch = spdk_io_channel_get_ctx(ch->parent);
	for (i = 0; i < IOBUF_MAX_CHANNELS; ++i) {
@@ -570,15 +597,15 @@ spdk_iobuf_for_each_entry(struct spdk_iobuf_channel *ch,
	return iobuf_pool_for_each_entry(ch, &cache->large, cb_fn, cb_ctx);
}

void
spdk_iobuf_entry_abort(struct spdk_iobuf_channel *ch, struct spdk_iobuf_entry *entry,
		       uint64_t len)
static bool
iobuf_entry_abort_node(struct spdk_iobuf_channel *ch, uint32_t socket_id,
		       struct spdk_iobuf_entry *entry, uint64_t len)
{
	struct spdk_iobuf_node_cache *cache;
	struct spdk_iobuf_pool_cache *pool;
	struct spdk_iobuf_entry *e;

	cache = &ch->cache[0];
	cache = &ch->cache[socket_id];

	if (len <= cache->small.bufsize) {
		pool = &cache->small;
@@ -590,9 +617,18 @@ spdk_iobuf_entry_abort(struct spdk_iobuf_channel *ch, struct spdk_iobuf_entry *e
	STAILQ_FOREACH(e, pool->queue, stailq) {
		if (e == entry) {
			STAILQ_REMOVE(pool->queue, entry, spdk_iobuf_entry, stailq);
			return;
			return true;
		}
	}

	return false;
}

void
spdk_iobuf_entry_abort(struct spdk_iobuf_channel *ch, struct spdk_iobuf_entry *entry,
		       uint64_t len)
{
	iobuf_entry_abort_node(ch, 0, entry, len);
}

#define IOBUF_BATCH_SIZE 32