Commit df107f80 authored by Ben Walker's avatar Ben Walker Committed by Jim Harris
Browse files

thread: iobuf no longer uses an spdk_mempool



Use an spdk_ring instead. This will allow for some later optimizations
and simplifications.

Change-Id: I8d256998e3cab7cf10ff8d5c7c7be48d4230b376
Signed-off-by: default avatarBen Walker <benjamin.walker@intel.com>
Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/16278


Tested-by: default avatarSPDK CI Jenkins <sys_sgci@intel.com>
Reviewed-by: default avatarAleksey Marchuk <alexeymar@nvidia.com>
Community-CI: Mellanox Build Bot
Reviewed-by: default avatarJim Harris <james.r.harris@intel.com>
parent 0a638e9d
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -981,7 +981,7 @@ typedef STAILQ_HEAD(, spdk_iobuf_buffer) spdk_iobuf_buffer_stailq_t;

struct spdk_iobuf_pool {
	/** Buffer pool */
	struct spdk_mempool		*pool;
	struct spdk_ring		*pool;
	/** Buffer cache */
	spdk_iobuf_buffer_stailq_t	cache;
	/** Number of elements in the cache */
+64 −22
Original line number Diff line number Diff line
@@ -32,8 +32,10 @@ struct iobuf_module {
};

struct iobuf {
	struct spdk_mempool		*small_pool;
	struct spdk_mempool		*large_pool;
	struct spdk_ring		*small_pool;
	struct spdk_ring		*large_pool;
	void				*small_pool_base;
	void				*large_pool_base;
	struct spdk_iobuf_opts		opts;
	TAILQ_HEAD(, iobuf_module)	modules;
	spdk_iobuf_finish_cb		finish_cb;
@@ -42,6 +44,10 @@ struct iobuf {

static struct iobuf g_iobuf = {
	.modules = TAILQ_HEAD_INITIALIZER(g_iobuf.modules),
	.small_pool = NULL,
	.large_pool = NULL,
	.small_pool_base = NULL,
	.large_pool_base = NULL,
	.opts = {
		.small_pool_count = IOBUF_MIN_SMALL_POOL_SIZE,
		.large_pool_count = IOBUF_MIN_LARGE_POOL_SIZE,
@@ -75,29 +81,61 @@ spdk_iobuf_initialize(void)
{
	struct spdk_iobuf_opts *opts = &g_iobuf.opts;
	int rc = 0;
	uint64_t i;
	struct spdk_iobuf_buffer *buf;

	g_iobuf.small_pool = spdk_mempool_create("iobuf_small_pool", opts->small_pool_count,
			     opts->small_bufsize, 0, SPDK_ENV_SOCKET_ID_ANY);
	g_iobuf.small_pool = spdk_ring_create(SPDK_RING_TYPE_MP_MC, opts->small_pool_count,
					      SPDK_ENV_SOCKET_ID_ANY);
	if (!g_iobuf.small_pool) {
		SPDK_ERRLOG("Failed to create small iobuf pool\n");
		rc = -ENOMEM;
		goto error;
	}

	g_iobuf.large_pool = spdk_mempool_create("iobuf_large_pool", opts->large_pool_count,
			     opts->large_bufsize, 0, SPDK_ENV_SOCKET_ID_ANY);
	g_iobuf.small_pool_base = spdk_malloc(opts->small_bufsize * opts->small_pool_count, 0, NULL,
					      SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
	if (g_iobuf.small_pool_base == NULL) {
		SPDK_ERRLOG("Unable to allocate requested small iobuf pool size\n");
		rc = -ENOMEM;
		goto error;
	}

	g_iobuf.large_pool = spdk_ring_create(SPDK_RING_TYPE_MP_MC, opts->large_pool_count,
					      SPDK_ENV_SOCKET_ID_ANY);
	if (!g_iobuf.large_pool) {
		SPDK_ERRLOG("Failed to create large iobuf pool\n");
		rc = -ENOMEM;
		goto error;
	}

	g_iobuf.large_pool_base = spdk_malloc(opts->large_bufsize * opts->large_pool_count, 0, NULL,
					      SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
	if (g_iobuf.large_pool_base == NULL) {
		SPDK_ERRLOG("Unable to allocate requested large iobuf pool size\n");
		rc = -ENOMEM;
		goto error;
	}

	for (i = 0; i < opts->small_pool_count; i++) {
		buf = g_iobuf.small_pool_base + i * opts->small_bufsize;
		spdk_ring_enqueue(g_iobuf.small_pool, (void **)&buf, 1, NULL);
	}

	for (i = 0; i < opts->large_pool_count; i++) {
		buf = g_iobuf.large_pool_base + i * opts->large_bufsize;
		spdk_ring_enqueue(g_iobuf.large_pool, (void **)&buf, 1, NULL);
	}

	spdk_io_device_register(&g_iobuf, iobuf_channel_create_cb, iobuf_channel_destroy_cb,
				sizeof(struct iobuf_channel), "iobuf");

	return 0;
error:
	spdk_mempool_free(g_iobuf.small_pool);
	spdk_free(g_iobuf.small_pool_base);
	spdk_ring_free(g_iobuf.small_pool);
	spdk_free(g_iobuf.large_pool_base);
	spdk_ring_free(g_iobuf.large_pool);

	return rc;
}

@@ -113,18 +151,25 @@ iobuf_unregister_cb(void *io_device)
		free(module);
	}

	if (spdk_mempool_count(g_iobuf.small_pool) != g_iobuf.opts.small_pool_count) {
	if (spdk_ring_count(g_iobuf.small_pool) != g_iobuf.opts.small_pool_count) {
		SPDK_ERRLOG("small iobuf pool count is %zu, expected %"PRIu64"\n",
			    spdk_mempool_count(g_iobuf.small_pool), g_iobuf.opts.small_pool_count);
			    spdk_ring_count(g_iobuf.small_pool), g_iobuf.opts.small_pool_count);
	}

	if (spdk_mempool_count(g_iobuf.large_pool) != g_iobuf.opts.large_pool_count) {
	if (spdk_ring_count(g_iobuf.large_pool) != g_iobuf.opts.large_pool_count) {
		SPDK_ERRLOG("large iobuf pool count is %zu, expected %"PRIu64"\n",
			    spdk_mempool_count(g_iobuf.large_pool), g_iobuf.opts.large_pool_count);
			    spdk_ring_count(g_iobuf.large_pool), g_iobuf.opts.large_pool_count);
	}

	spdk_mempool_free(g_iobuf.small_pool);
	spdk_mempool_free(g_iobuf.large_pool);
	spdk_free(g_iobuf.small_pool_base);
	g_iobuf.small_pool_base = NULL;
	spdk_ring_free(g_iobuf.small_pool);
	g_iobuf.small_pool = NULL;

	spdk_free(g_iobuf.large_pool_base);
	g_iobuf.large_pool_base = NULL;
	spdk_ring_free(g_iobuf.large_pool);
	g_iobuf.large_pool = NULL;

	if (g_iobuf.finish_cb != NULL) {
		g_iobuf.finish_cb(g_iobuf.finish_arg);
@@ -221,8 +266,7 @@ spdk_iobuf_channel_init(struct spdk_iobuf_channel *ch, const char *name,
	STAILQ_INIT(&ch->large.cache);

	for (i = 0; i < small_cache_size; ++i) {
		buf = spdk_mempool_get(g_iobuf.small_pool);
		if (buf == NULL) {
		if (spdk_ring_dequeue(g_iobuf.small_pool, (void **)&buf, 1) == 0) {
			SPDK_ERRLOG("Failed to populate iobuf small buffer cache. "
				    "You may need to increase spdk_iobuf_opts.small_pool_count\n");
			goto error;
@@ -231,8 +275,7 @@ spdk_iobuf_channel_init(struct spdk_iobuf_channel *ch, const char *name,
		ch->small.cache_count++;
	}
	for (i = 0; i < large_cache_size; ++i) {
		buf = spdk_mempool_get(g_iobuf.large_pool);
		if (buf == NULL) {
		if (spdk_ring_dequeue(g_iobuf.large_pool, (void **)&buf, 1) == 0) {
			SPDK_ERRLOG("Failed to populate iobuf large buffer cache. "
				    "You may need to increase spdk_iobuf_opts.large_pool_count\n");
			goto error;
@@ -266,13 +309,13 @@ spdk_iobuf_channel_fini(struct spdk_iobuf_channel *ch)
	while (!STAILQ_EMPTY(&ch->small.cache)) {
		buf = STAILQ_FIRST(&ch->small.cache);
		STAILQ_REMOVE_HEAD(&ch->small.cache, stailq);
		spdk_mempool_put(ch->small.pool, buf);
		spdk_ring_enqueue(g_iobuf.small_pool, (void **)&buf, 1, NULL);
		ch->small.cache_count--;
	}
	while (!STAILQ_EMPTY(&ch->large.cache)) {
		buf = STAILQ_FIRST(&ch->large.cache);
		STAILQ_REMOVE_HEAD(&ch->large.cache, stailq);
		spdk_mempool_put(ch->large.pool, buf);
		spdk_ring_enqueue(g_iobuf.large_pool, (void **)&buf, 1, NULL);
		ch->large.cache_count--;
	}

@@ -369,8 +412,7 @@ spdk_iobuf_get(struct spdk_iobuf_channel *ch, uint64_t len,
		assert(pool->cache_count > 0);
		pool->cache_count--;
	} else {
		buf = spdk_mempool_get(pool->pool);
		if (!buf) {
		if (spdk_ring_dequeue(pool->pool, (void **)&buf, 1) == 0) {
			STAILQ_INSERT_TAIL(pool->queue, entry, stailq);
			entry->module = ch->module;
			entry->cb_fn = cb_fn;
@@ -400,7 +442,7 @@ spdk_iobuf_put(struct spdk_iobuf_channel *ch, void *buf, uint64_t len)
			STAILQ_INSERT_HEAD(&pool->cache, (struct spdk_iobuf_buffer *)buf, stailq);
			pool->cache_count++;
		} else {
			spdk_mempool_put(pool->pool, buf);
			spdk_ring_enqueue(pool->pool, (void **)&buf, 1, NULL);
		}
	} else {
		entry = STAILQ_FIRST(pool->queue);