Commit 4b7fdec4 authored by Jim Harris's avatar Jim Harris
Browse files

thread: add struct iobuf_node



iobuf_node represents a pool of buffers allocated from a NUMA node's
memory. Currently only one NUMA node is supported, but this will be
extended in future patches to support multiple NUMA nodes when
configured.

Signed-off-by: default avatarJim Harris <jim.harris@samsung.com>
Change-Id: I7fdcf79aa57e555b3d35164c2097becfef6f8a71
Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/24518


Reviewed-by: default avatarShuhei Matsumoto <smatsumoto@nvidia.com>
Community-CI: Mellanox Build Bot
Reviewed-by: default avatarAleksey Marchuk <alexeymar@nvidia.com>
Tested-by: default avatarSPDK CI Jenkins <sys_sgci@intel.com>
Reviewed-by: default avatarKonrad Sztyber <konrad.sztyber@intel.com>
Reviewed-by: default avatarBen Walker <ben@nvidia.com>
Community-CI: Broadcom CI <spdk-ci.pdl@broadcom.com>
Community-CI: Community CI Samsung <spdk.community.ci.samsung@gmail.com>
parent eba178bf
Loading
Loading
Loading
Loading
+49 −44
Original line number Diff line number Diff line
@@ -40,11 +40,15 @@ struct iobuf_module {
	TAILQ_ENTRY(iobuf_module)	tailq;
};

struct iobuf {
struct iobuf_node {
	struct spdk_ring		*small_pool;
	struct spdk_ring		*large_pool;
	void				*small_pool_base;
	void				*large_pool_base;
};

struct iobuf {
	struct iobuf_node		node;
	struct spdk_iobuf_opts		opts;
	TAILQ_HEAD(, iobuf_module)	modules;
	spdk_iobuf_finish_cb		finish_cb;
@@ -53,10 +57,7 @@ struct iobuf {

static struct iobuf g_iobuf = {
	.modules = TAILQ_HEAD_INITIALIZER(g_iobuf.modules),
	.small_pool = NULL,
	.large_pool = NULL,
	.small_pool_base = NULL,
	.large_pool_base = NULL,
	.node = {},
	.opts = {
		.small_pool_count = IOBUF_DEFAULT_SMALL_POOL_SIZE,
		.large_pool_count = IOBUF_DEFAULT_LARGE_POOL_SIZE,
@@ -96,6 +97,7 @@ int
spdk_iobuf_initialize(void)
{
	struct spdk_iobuf_opts *opts = &g_iobuf.opts;
	struct iobuf_node *node = &g_iobuf.node;
	int rc = 0;
	uint64_t i;
	struct spdk_iobuf_buffer *buf;
@@ -104,46 +106,46 @@ spdk_iobuf_initialize(void)
	opts->small_bufsize = SPDK_ALIGN_CEIL(opts->small_bufsize, IOBUF_ALIGNMENT);
	opts->large_bufsize = SPDK_ALIGN_CEIL(opts->large_bufsize, IOBUF_ALIGNMENT);

	g_iobuf.small_pool = spdk_ring_create(SPDK_RING_TYPE_MP_MC, opts->small_pool_count,
	node->small_pool = spdk_ring_create(SPDK_RING_TYPE_MP_MC, opts->small_pool_count,
					    SPDK_ENV_NUMA_ID_ANY);
	if (!g_iobuf.small_pool) {
	if (!node->small_pool) {
		SPDK_ERRLOG("Failed to create small iobuf pool\n");
		rc = -ENOMEM;
		goto error;
	}

	g_iobuf.small_pool_base = spdk_malloc(opts->small_bufsize * opts->small_pool_count, IOBUF_ALIGNMENT,
	node->small_pool_base = spdk_malloc(opts->small_bufsize * opts->small_pool_count, IOBUF_ALIGNMENT,
					    NULL, SPDK_ENV_NUMA_ID_ANY, SPDK_MALLOC_DMA);
	if (g_iobuf.small_pool_base == NULL) {
	if (node->small_pool_base == NULL) {
		SPDK_ERRLOG("Unable to allocate requested small iobuf pool size\n");
		rc = -ENOMEM;
		goto error;
	}

	g_iobuf.large_pool = spdk_ring_create(SPDK_RING_TYPE_MP_MC, opts->large_pool_count,
	node->large_pool = spdk_ring_create(SPDK_RING_TYPE_MP_MC, opts->large_pool_count,
					    SPDK_ENV_NUMA_ID_ANY);
	if (!g_iobuf.large_pool) {
	if (!node->large_pool) {
		SPDK_ERRLOG("Failed to create large iobuf pool\n");
		rc = -ENOMEM;
		goto error;
	}

	g_iobuf.large_pool_base = spdk_malloc(opts->large_bufsize * opts->large_pool_count, IOBUF_ALIGNMENT,
	node->large_pool_base = spdk_malloc(opts->large_bufsize * opts->large_pool_count, IOBUF_ALIGNMENT,
					    NULL, SPDK_ENV_NUMA_ID_ANY, SPDK_MALLOC_DMA);
	if (g_iobuf.large_pool_base == NULL) {
	if (node->large_pool_base == NULL) {
		SPDK_ERRLOG("Unable to allocate requested large iobuf pool size\n");
		rc = -ENOMEM;
		goto error;
	}

	for (i = 0; i < opts->small_pool_count; i++) {
		buf = g_iobuf.small_pool_base + i * opts->small_bufsize;
		spdk_ring_enqueue(g_iobuf.small_pool, (void **)&buf, 1, NULL);
		buf = node->small_pool_base + i * opts->small_bufsize;
		spdk_ring_enqueue(node->small_pool, (void **)&buf, 1, NULL);
	}

	for (i = 0; i < opts->large_pool_count; i++) {
		buf = g_iobuf.large_pool_base + i * opts->large_bufsize;
		spdk_ring_enqueue(g_iobuf.large_pool, (void **)&buf, 1, NULL);
		buf = node->large_pool_base + i * opts->large_bufsize;
		spdk_ring_enqueue(node->large_pool, (void **)&buf, 1, NULL);
	}

	spdk_io_device_register(&g_iobuf, iobuf_channel_create_cb, iobuf_channel_destroy_cb,
@@ -152,10 +154,10 @@ spdk_iobuf_initialize(void)

	return 0;
error:
	spdk_free(g_iobuf.small_pool_base);
	spdk_ring_free(g_iobuf.small_pool);
	spdk_free(g_iobuf.large_pool_base);
	spdk_ring_free(g_iobuf.large_pool);
	spdk_free(node->small_pool_base);
	spdk_ring_free(node->small_pool);
	spdk_free(node->large_pool_base);
	spdk_ring_free(node->large_pool);

	return rc;
}
@@ -164,6 +166,7 @@ static void
iobuf_unregister_cb(void *io_device)
{
	struct iobuf_module *module;
	struct iobuf_node *node = &g_iobuf.node;

	while (!TAILQ_EMPTY(&g_iobuf.modules)) {
		module = TAILQ_FIRST(&g_iobuf.modules);
@@ -172,25 +175,25 @@ iobuf_unregister_cb(void *io_device)
		free(module);
	}

	if (spdk_ring_count(g_iobuf.small_pool) != g_iobuf.opts.small_pool_count) {
	if (spdk_ring_count(node->small_pool) != g_iobuf.opts.small_pool_count) {
		SPDK_ERRLOG("small iobuf pool count is %zu, expected %"PRIu64"\n",
			    spdk_ring_count(g_iobuf.small_pool), g_iobuf.opts.small_pool_count);
			    spdk_ring_count(node->small_pool), g_iobuf.opts.small_pool_count);
	}

	if (spdk_ring_count(g_iobuf.large_pool) != g_iobuf.opts.large_pool_count) {
	if (spdk_ring_count(node->large_pool) != g_iobuf.opts.large_pool_count) {
		SPDK_ERRLOG("large iobuf pool count is %zu, expected %"PRIu64"\n",
			    spdk_ring_count(g_iobuf.large_pool), g_iobuf.opts.large_pool_count);
			    spdk_ring_count(node->large_pool), g_iobuf.opts.large_pool_count);
	}

	spdk_free(g_iobuf.small_pool_base);
	g_iobuf.small_pool_base = NULL;
	spdk_ring_free(g_iobuf.small_pool);
	g_iobuf.small_pool = NULL;
	spdk_free(node->small_pool_base);
	node->small_pool_base = NULL;
	spdk_ring_free(node->small_pool);
	node->small_pool = NULL;

	spdk_free(g_iobuf.large_pool_base);
	g_iobuf.large_pool_base = NULL;
	spdk_ring_free(g_iobuf.large_pool);
	g_iobuf.large_pool = NULL;
	spdk_free(node->large_pool_base);
	node->large_pool_base = NULL;
	spdk_ring_free(node->large_pool);
	node->large_pool = NULL;

	if (g_iobuf.finish_cb != NULL) {
		g_iobuf.finish_cb(g_iobuf.finish_arg);
@@ -307,6 +310,7 @@ spdk_iobuf_channel_init(struct spdk_iobuf_channel *ch, const char *name,
	struct iobuf_module *module;
	struct spdk_iobuf_buffer *buf;
	struct spdk_iobuf_node_cache *cache;
	struct iobuf_node *node = &g_iobuf.node;
	uint32_t i;

	TAILQ_FOREACH(module, &g_iobuf.modules, tailq) {
@@ -346,8 +350,8 @@ spdk_iobuf_channel_init(struct spdk_iobuf_channel *ch, const char *name,

	cache->small.queue = &iobuf_ch->small_queue;
	cache->large.queue = &iobuf_ch->large_queue;
	cache->small.pool = g_iobuf.small_pool;
	cache->large.pool = g_iobuf.large_pool;
	cache->small.pool = node->small_pool;
	cache->large.pool = node->large_pool;
	cache->small.bufsize = g_iobuf.opts.small_bufsize;
	cache->large.bufsize = g_iobuf.opts.large_bufsize;
	cache->small.cache_size = small_cache_size;
@@ -359,7 +363,7 @@ spdk_iobuf_channel_init(struct spdk_iobuf_channel *ch, const char *name,
	STAILQ_INIT(&cache->large.cache);

	for (i = 0; i < small_cache_size; ++i) {
		if (spdk_ring_dequeue(g_iobuf.small_pool, (void **)&buf, 1) == 0) {
		if (spdk_ring_dequeue(node->small_pool, (void **)&buf, 1) == 0) {
			SPDK_ERRLOG("Failed to populate '%s' iobuf small buffer cache at %d/%d entries. "
				    "You may need to increase spdk_iobuf_opts.small_pool_count (%"PRIu64")\n",
				    name, i, small_cache_size, g_iobuf.opts.small_pool_count);
@@ -371,7 +375,7 @@ spdk_iobuf_channel_init(struct spdk_iobuf_channel *ch, const char *name,
		cache->small.cache_count++;
	}
	for (i = 0; i < large_cache_size; ++i) {
		if (spdk_ring_dequeue(g_iobuf.large_pool, (void **)&buf, 1) == 0) {
		if (spdk_ring_dequeue(node->large_pool, (void **)&buf, 1) == 0) {
			SPDK_ERRLOG("Failed to populate '%s' iobuf large buffer cache at %d/%d entries. "
				    "You may need to increase spdk_iobuf_opts.large_pool_count (%"PRIu64")\n",
				    name, i, large_cache_size, g_iobuf.opts.large_pool_count);
@@ -397,6 +401,7 @@ spdk_iobuf_channel_fini(struct spdk_iobuf_channel *ch)
	struct spdk_iobuf_buffer *buf;
	struct iobuf_channel *iobuf_ch;
	struct spdk_iobuf_node_cache *cache;
	struct iobuf_node *node = &g_iobuf.node;
	uint32_t i;

	cache = &ch->cache;
@@ -413,13 +418,13 @@ spdk_iobuf_channel_fini(struct spdk_iobuf_channel *ch)
	while (!STAILQ_EMPTY(&cache->small.cache)) {
		buf = STAILQ_FIRST(&cache->small.cache);
		STAILQ_REMOVE_HEAD(&cache->small.cache, stailq);
		spdk_ring_enqueue(g_iobuf.small_pool, (void **)&buf, 1, NULL);
		spdk_ring_enqueue(node->small_pool, (void **)&buf, 1, NULL);
		cache->small.cache_count--;
	}
	while (!STAILQ_EMPTY(&cache->large.cache)) {
		buf = STAILQ_FIRST(&cache->large.cache);
		STAILQ_REMOVE_HEAD(&cache->large.cache, stailq);
		spdk_ring_enqueue(g_iobuf.large_pool, (void **)&buf, 1, NULL);
		spdk_ring_enqueue(node->large_pool, (void **)&buf, 1, NULL);
		cache->large.cache_count--;
	}

@@ -529,7 +534,7 @@ spdk_iobuf_entry_abort(struct spdk_iobuf_channel *ch, struct spdk_iobuf_entry *e

	cache = &ch->cache;

	if (len <= ch->cache.small.bufsize) {
	if (len <= cache->small.bufsize) {
		pool = &cache->small;
	} else {
		assert(len <= cache->large.bufsize);