Commit 3aceb2da authored by Konrad Sztyber's avatar Konrad Sztyber Committed by Tomasz Zawadzki
Browse files

thread: introduce iobuf buffer pools



The idea behind "iobuf" is to have a single place for allocating data
buffers across different libraries.  That way, each library won't need
to allocate its own mempools, therefore decreasing the memory footprint
of the whole application.

There are two reasons for putting this kind of functionality in the thread
library.  Firstly, the code is pretty small, so it doesn't make sense to
create a new library.  Secondly, it relies on the IO channel abstraction,
so users will need to pull in the thread library anyway.

It's very much inspired by the way the bdev layer handles data buffers (much
of the code was directly copied over).  There are two global mempools,
one for small and one for large buffers, and per-thread queues that hold
requests waiting for a buffer.  The main difference is that we also need
to track which module requested a buffer in order to allow users to
iterate over its pending requests.

The usage is fairly simple:

```
/* Embed spdk_iobuf_channel into an existing IO channel */
struct foo_channel {
	...
	struct spdk_iobuf_channel iobuf;
};

/* Embed spdk_iobuf_entry into objects that will request buffers */
struct foo_object {
	...
	struct spdk_iobuf_entry entry;
};

/* Register the module as iobuf user */
spdk_iobuf_register_module("foo");

/* Initialize iobuf channel in foo_channel's create cb */
spdk_iobuf_channel_init(&foo_channel->iobuf, "foo", 0, 0);

/* Finally, request a buffer... */
buf = spdk_iobuf_get(&foo_channel->iobuf, length,
		     &foo_object.entry, buf_get_cb);

...

/* ...and release it */
spdk_iobuf_put(&foo_channel->iobuf, buf, length);

```

Signed-off-by: default avatarKonrad Sztyber <konrad.sztyber@intel.com>
Change-Id: Ifaa6934c03ed6587ddba972198e606921bd85008
Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/15326


Tested-by: default avatarSPDK CI Jenkins <sys_sgci@intel.com>
Community-CI: Mellanox Build Bot
Reviewed-by: default avatarShuhei Matsumoto <smatsumoto@nvidia.com>
Reviewed-by: default avatarAleksey Marchuk <alexeymar@nvidia.com>
Reviewed-by: default avatarJim Harris <james.r.harris@intel.com>
parent 7019dbbd
Loading
Loading
Loading
Loading
+205 −0
Original line number Diff line number Diff line
@@ -17,6 +17,7 @@

#include "spdk/stdinc.h"
#include "spdk/cpuset.h"
#include "spdk/env.h"

#ifdef __cplusplus
extern "C" {
@@ -950,6 +951,210 @@ void spdk_spin_unlock(struct spdk_spinlock *sspin);
 */
bool spdk_spin_held(struct spdk_spinlock *sspin);

/** Global iobuf pool options, consumed by `spdk_iobuf_initialize()` */
struct spdk_iobuf_opts {
	/** Maximum number of small buffers */
	uint64_t small_pool_count;
	/** Maximum number of large buffers */
	uint64_t large_pool_count;
	/** Size of a single small buffer */
	uint32_t small_bufsize;
	/** Size of a single large buffer */
	uint32_t large_bufsize;
};

struct spdk_iobuf_entry;

/** Callback executed once a buffer becomes available for a queued request */
typedef void (*spdk_iobuf_get_cb)(struct spdk_iobuf_entry *entry, void *buf);

/** iobuf queue entry, embedded in objects that wait for a buffer */
struct spdk_iobuf_entry {
	/** Callback to execute when a buffer is handed to this entry */
	spdk_iobuf_get_cb		cb_fn;
	/** Module that requested the buffer; used to filter per-module iteration */
	const void			*module;
	STAILQ_ENTRY(spdk_iobuf_entry)	stailq;
};

typedef STAILQ_HEAD(, spdk_iobuf_entry) spdk_iobuf_entry_stailq_t;

/** Per-channel view of one of the global buffer pools (small or large) */
struct spdk_iobuf_pool {
	/** Buffer pool */
	struct spdk_mempool		*pool;
	/** Buffer wait queue; points into the per-thread iobuf IO channel context */
	spdk_iobuf_entry_stailq_t	*queue;
	/** Buffer size */
	uint32_t			bufsize;
};

/** iobuf channel; embed in an existing IO channel and initialize via `spdk_iobuf_channel_init()` */
struct spdk_iobuf_channel {
	/** Small buffer memory pool */
	struct spdk_iobuf_pool		small;
	/** Large buffer memory pool */
	struct spdk_iobuf_pool		large;
	/** Module pointer, identifying the registered module that owns this channel */
	const void			*module;
	/** Parent IO channel */
	struct spdk_io_channel		*parent;
};

/**
 * Initialize and allocate iobuf pools.
 *
 * Pool sizes are taken from the options configured via `spdk_iobuf_set_opts()` (or the built-in
 * defaults if it was never called).
 *
 * \return 0 on success, negative errno otherwise.
 */
int spdk_iobuf_initialize(void);

/** Callback executed once `spdk_iobuf_finish()` has completed the clean up */
typedef void (*spdk_iobuf_finish_cb)(void *cb_arg);

/**
 * Clean up and free iobuf pools.
 *
 * \param cb_fn Callback to be executed once the clean up is completed.
 * \param cb_arg Callback argument.
 */
void spdk_iobuf_finish(spdk_iobuf_finish_cb cb_fn, void *cb_arg);

/**
 * Set iobuf options.  These options will be used during `spdk_iobuf_initialize()`.
 *
 * \param opts Options describing the size of the pools to reserve.
 *
 * \return 0 on success, negative errno otherwise.
 */
int spdk_iobuf_set_opts(const struct spdk_iobuf_opts *opts);

/**
 * Get iobuf options.
 *
 * \param opts Options to fill in.
 */
void spdk_iobuf_get_opts(struct spdk_iobuf_opts *opts);

/**
 * Register a module as an iobuf pool user.  Only registered users can request buffers from the
 * iobuf pool.
 *
 * \param name Name of the module.
 *
 * \return 0 on success, negative errno otherwise.
 */
int spdk_iobuf_register_module(const char *name);

/**
 * Initialize an iobuf channel.
 *
 * \param ch iobuf channel to initialize.
 * \param name Name of the module registered via `spdk_iobuf_register_module()`.
 * \param small_cache_size Number of small buffers to be cached by this channel.
 * \param large_cache_size Number of large buffers to be cached by this channel.
 *
 * \return 0 on success, negative errno otherwise.
 */
int spdk_iobuf_channel_init(struct spdk_iobuf_channel *ch, const char *name,
			    uint32_t small_cache_size, uint32_t large_cache_size);

/**
 * Release resources tied to an iobuf channel.
 *
 * \param ch iobuf channel.
 */
void spdk_iobuf_channel_fini(struct spdk_iobuf_channel *ch);

/** Callback executed per wait-queue entry during `spdk_iobuf_for_each_entry()` */
typedef int (*spdk_iobuf_for_each_entry_fn)(struct spdk_iobuf_channel *ch,
		struct spdk_iobuf_entry *entry, void *ctx);

/**
 * Iterate over all entries on a given queue and execute a callback on those that were requested
 * using `ch`.  The iteration is stopped if the callback returns non-zero status.
 *
 * \param ch iobuf channel to iterate over.
 * \param pool Pool to iterate over (`small` or `large`).
 * \param cb_fn Callback to execute on each entry on the queue that was requested using `ch`.
 * \param cb_ctx Argument passed to `cb_fn`.
 *
 * \return 0 if all callbacks returned 0, otherwise the first non-zero status (which stopped the
 *         iteration).
 */
int spdk_iobuf_for_each_entry(struct spdk_iobuf_channel *ch, struct spdk_iobuf_pool *pool,
			      spdk_iobuf_for_each_entry_fn cb_fn, void *cb_ctx);

/**
 * Abort an outstanding request waiting for a buffer.
 *
 * \param ch iobuf channel on which the entry is waiting.
 * \param entry Entry to remove from the wait queue.
 * \param len Length of the requested buffer (must be the exact same value as specified in
 *            `spdk_iobuf_get()`).
 */
void spdk_iobuf_entry_abort(struct spdk_iobuf_channel *ch, struct spdk_iobuf_entry *entry,
			    uint64_t len);

/**
 * Get a buffer from the iobuf pool.  If no buffers are available, the request is queued until a
 * buffer is released.
 *
 * Must be called from the thread that owns the parent IO channel.
 *
 * \param ch iobuf channel.
 * \param len Length of the buffer to retrieve.  The user is responsible for making sure the length
 *            doesn't exceed large_bufsize.
 * \param entry Wait queue entry.
 * \param cb_fn Callback to be executed once a buffer becomes available.  If a buffer is available
 *              immediately, it will NOT be executed.
 *
 * \return pointer to a buffer or NULL if no buffers are currently available.
 */
static inline void *
spdk_iobuf_get(struct spdk_iobuf_channel *ch, uint64_t len,
	       struct spdk_iobuf_entry *entry, spdk_iobuf_get_cb cb_fn)
{
	struct spdk_iobuf_pool *pool;
	void *buf;

	/* The queues aren't locked, so all accesses must come from the owning thread */
	assert(spdk_io_channel_get_thread(ch->parent) == spdk_get_thread());
	/* Pick a pool based on the requested length */
	if (len <= ch->small.bufsize) {
		pool = &ch->small;
	} else {
		assert(len <= ch->large.bufsize);
		pool = &ch->large;
	}

	buf = spdk_mempool_get(pool->pool);
	if (!buf) {
		/* Pool exhausted: queue the request; cb_fn fires from spdk_iobuf_put() */
		STAILQ_INSERT_TAIL(pool->queue, entry, stailq);
		entry->module = ch->module;
		entry->cb_fn = cb_fn;
	}

	return buf;
}

/**
 * Release a buffer back to the iobuf pool.  If there are outstanding requests waiting for a buffer,
 * this buffer will be passed to one of them.
 *
 * Must be called from the thread that owns the parent IO channel.
 *
 * \param ch iobuf channel.
 * \param buf Buffer to release.
 * \param len Length of the buffer (must be the exact same value as specified in `spdk_iobuf_get()`).
 */
static inline void
spdk_iobuf_put(struct spdk_iobuf_channel *ch, void *buf, uint64_t len)
{
	struct spdk_iobuf_entry *entry;
	struct spdk_iobuf_pool *pool;

	/* The queues aren't locked, so all accesses must come from the owning thread */
	assert(spdk_io_channel_get_thread(ch->parent) == spdk_get_thread());
	/* Pick the pool the same way spdk_iobuf_get() does, so buffers go back where
	 * they came from */
	if (len <= ch->small.bufsize) {
		pool = &ch->small;
	} else {
		/* Mirror spdk_iobuf_get()'s contract check: len must fit the large pool */
		assert(len <= ch->large.bufsize);
		pool = &ch->large;
	}

	if (STAILQ_EMPTY(pool->queue)) {
		spdk_mempool_put(pool->pool, buf);
	} else {
		/* Hand the buffer directly to the oldest waiter instead of returning it
		 * to the pool */
		entry = STAILQ_FIRST(pool->queue);
		STAILQ_REMOVE_HEAD(pool->queue, stailq);
		entry->cb_fn(entry, buf);
	}
}

#ifdef __cplusplus
}
#endif
+9 −0
Original line number Diff line number Diff line
@@ -62,6 +62,15 @@
	spdk_spin_lock;
	spdk_spin_unlock;
	spdk_spin_held;
	spdk_iobuf_initialize;
	spdk_iobuf_finish;
	spdk_iobuf_set_opts;
	spdk_iobuf_get_opts;
	spdk_iobuf_channel_init;
	spdk_iobuf_channel_fini;
	spdk_iobuf_register_module;
	spdk_iobuf_for_each_entry;
	spdk_iobuf_entry_abort;

	# internal functions in spdk_internal/thread.h
	spdk_poller_get_name;
+296 −0
Original line number Diff line number Diff line
@@ -7,6 +7,7 @@
#include "spdk/stdinc.h"

#include "spdk/env.h"
#include "spdk/bdev.h"
#include "spdk/likely.h"
#include "spdk/queue.h"
#include "spdk/string.h"
@@ -32,6 +33,13 @@
#define SPDK_THREAD_EXIT_TIMEOUT_SEC	5
#define SPDK_MAX_POLLER_NAME_LEN	256
#define SPDK_MAX_THREAD_NAME_LEN	256
#define IOBUF_MIN_SMALL_POOL_SIZE	8191
#define IOBUF_MIN_LARGE_POOL_SIZE	1023
#define IOBUF_ALIGNMENT			512
#define IOBUF_MIN_SMALL_BUFSIZE		(SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_SMALL_BUF_MAX_SIZE) + \
					 IOBUF_ALIGNMENT)
#define IOBUF_MIN_LARGE_BUFSIZE		(SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_LARGE_BUF_MAX_SIZE) + \
					 IOBUF_ALIGNMENT)

static struct spdk_thread *g_app_thread;

@@ -230,6 +238,35 @@ struct io_device {
	bool				unregistered;
};

/* Per-thread iobuf IO channel context: wait queues for buffer requests */
struct iobuf_channel {
	spdk_iobuf_entry_stailq_t small_queue;
	spdk_iobuf_entry_stailq_t large_queue;
};

/* A registered iobuf user, see spdk_iobuf_register_module() */
struct iobuf_module {
	char				*name;
	TAILQ_ENTRY(iobuf_module)	tailq;
};

/* Global iobuf state: the two buffer pools, options, and registered modules */
struct iobuf {
	struct spdk_mempool		*small_pool;
	struct spdk_mempool		*large_pool;
	struct spdk_iobuf_opts		opts;
	TAILQ_HEAD(, iobuf_module)	modules;
	/* Completion callback stashed by spdk_iobuf_finish() */
	spdk_iobuf_finish_cb		finish_cb;
	void				*finish_arg;
};

static struct iobuf g_iobuf = {
	.modules = TAILQ_HEAD_INITIALIZER(g_iobuf.modules),
	/* Defaults sized to cover the bdev layer's minimum requirements */
	.opts = {
		.small_pool_count = IOBUF_MIN_SMALL_POOL_SIZE,
		.large_pool_count = IOBUF_MIN_LARGE_POOL_SIZE,
		.small_bufsize = IOBUF_MIN_SMALL_BUFSIZE,
		.large_bufsize = IOBUF_MIN_LARGE_BUFSIZE,
	},
};

static RB_HEAD(io_device_tree, io_device) g_io_devices = RB_INITIALIZER(g_io_devices);

static int
@@ -2879,4 +2916,263 @@ spdk_spin_held(struct spdk_spinlock *sspin)
	return sspin->thread == thread;
}

/* io_device create callback: set up the per-thread buffer wait queues. */
static int
iobuf_channel_create_cb(void *io_device, void *ctx)
{
	struct iobuf_channel *iobuf_ch = ctx;

	STAILQ_INIT(&iobuf_ch->small_queue);
	STAILQ_INIT(&iobuf_ch->large_queue);

	return 0;
}

/* io_device destroy callback: the queues must be empty by now (all waiters
 * either got a buffer or were aborted).  The unused attribute silences the
 * warning in NDEBUG builds, where the asserts compile away. */
static void
iobuf_channel_destroy_cb(void *io_device, void *ctx)
{
	struct iobuf_channel *ch __attribute__((unused)) = ctx;

	assert(STAILQ_EMPTY(&ch->small_queue));
	assert(STAILQ_EMPTY(&ch->large_queue));
}

int
spdk_iobuf_initialize(void)
{
	struct spdk_iobuf_opts *opts = &g_iobuf.opts;
	int cache_size, rc = 0;

	/**
	 * Ensure no more than half of the total buffers end up local caches, by using
	 * spdk_env_get_core_count() to determine how many local caches we need to account for.
	 */
	cache_size = opts->small_pool_count / (2 * spdk_env_get_core_count());
	g_iobuf.small_pool = spdk_mempool_create("iobuf_small_pool", opts->small_pool_count,
			     opts->small_bufsize, cache_size,
			     SPDK_ENV_SOCKET_ID_ANY);
	if (!g_iobuf.small_pool) {
		SPDK_ERRLOG("Failed to create small iobuf pool\n");
		rc = -ENOMEM;
		goto error;
	}

	cache_size = opts->large_pool_count / (2 * spdk_env_get_core_count());
	g_iobuf.large_pool = spdk_mempool_create("iobuf_large_pool", opts->large_pool_count,
			     opts->large_bufsize, cache_size,
			     SPDK_ENV_SOCKET_ID_ANY);
	if (!g_iobuf.large_pool) {
		SPDK_ERRLOG("Failed to create large iobuf pool\n");
		rc = -ENOMEM;
		goto error;
	}

	spdk_io_device_register(&g_iobuf, iobuf_channel_create_cb, iobuf_channel_destroy_cb,
				sizeof(struct iobuf_channel), "iobuf");

	return 0;
error:
	spdk_mempool_free(g_iobuf.small_pool);
	return rc;
}

/* Final step of spdk_iobuf_finish(): runs once the io_device is fully
 * unregistered.  Frees the module registry and both pools, then invokes the
 * user's completion callback. */
static void
iobuf_unregister_cb(void *io_device)
{
	struct iobuf_module *module;

	/* Drain the module registry, releasing each entry */
	for (module = TAILQ_FIRST(&g_iobuf.modules); module != NULL;
	     module = TAILQ_FIRST(&g_iobuf.modules)) {
		TAILQ_REMOVE(&g_iobuf.modules, module, tailq);
		free(module->name);
		free(module);
	}

	/* All buffers should be back in the pools by now; log any leaks */
	if (spdk_mempool_count(g_iobuf.small_pool) != g_iobuf.opts.small_pool_count) {
		SPDK_ERRLOG("small iobuf pool count is %zu, expected %"PRIu64"\n",
			    spdk_mempool_count(g_iobuf.small_pool), g_iobuf.opts.small_pool_count);
	}

	if (spdk_mempool_count(g_iobuf.large_pool) != g_iobuf.opts.large_pool_count) {
		SPDK_ERRLOG("large iobuf pool count is %zu, expected %"PRIu64"\n",
			    spdk_mempool_count(g_iobuf.large_pool), g_iobuf.opts.large_pool_count);
	}

	spdk_mempool_free(g_iobuf.small_pool);
	spdk_mempool_free(g_iobuf.large_pool);

	if (g_iobuf.finish_cb != NULL) {
		g_iobuf.finish_cb(g_iobuf.finish_arg);
	}
}

/* Tear down the iobuf library.  The callback is stashed and fires from
 * iobuf_unregister_cb() once the io_device unregistration completes. */
void
spdk_iobuf_finish(spdk_iobuf_finish_cb cb_fn, void *cb_arg)
{
	g_iobuf.finish_cb = cb_fn;
	g_iobuf.finish_arg = cb_arg;

	spdk_io_device_unregister(&g_iobuf, iobuf_unregister_cb);
}

/* Validate and store iobuf options; takes effect at spdk_iobuf_initialize().
 * Each option is checked against the minimum required by the bdev layer. */
int
spdk_iobuf_set_opts(const struct spdk_iobuf_opts *opts)
{
	if (opts->small_pool_count < IOBUF_MIN_SMALL_POOL_SIZE) {
		SPDK_ERRLOG("small_pool_count must be at least %" PRIu32 "\n",
			    IOBUF_MIN_SMALL_POOL_SIZE);
		return -EINVAL;
	}
	if (opts->large_pool_count < IOBUF_MIN_LARGE_POOL_SIZE) {
		SPDK_ERRLOG("large_pool_count must be at least %" PRIu32 "\n",
			    IOBUF_MIN_LARGE_POOL_SIZE);
		return -EINVAL;
	}
	if (opts->small_bufsize < IOBUF_MIN_SMALL_BUFSIZE) {
		SPDK_ERRLOG("small_bufsize must be at least %" PRIu32 "\n",
			    IOBUF_MIN_SMALL_BUFSIZE);
		return -EINVAL;
	}
	if (opts->large_bufsize < IOBUF_MIN_LARGE_BUFSIZE) {
		SPDK_ERRLOG("large_bufsize must be at least %" PRIu32 "\n",
			    IOBUF_MIN_LARGE_BUFSIZE);
		return -EINVAL;
	}

	/* All checks passed: copy the whole option set */
	g_iobuf.opts = *opts;

	return 0;
}

void
spdk_iobuf_get_opts(struct spdk_iobuf_opts *opts)
{
	*opts = g_iobuf.opts;
}

/* Bind an spdk_iobuf_channel to this thread's iobuf IO channel.
 * `name` must have been registered via spdk_iobuf_register_module().
 * Returns 0 on success, -EINVAL for unsupported cache sizes, -ENODEV for an
 * unknown module, -ENOMEM if the IO channel can't be obtained. */
int
spdk_iobuf_channel_init(struct spdk_iobuf_channel *ch, const char *name,
			uint32_t small_cache_size, uint32_t large_cache_size)
{
	struct spdk_io_channel *ioch;
	struct iobuf_channel *iobuf_ch;
	struct iobuf_module *module;

	/* Per-channel buffer caches haven't been implemented yet */
	if (small_cache_size != 0 || large_cache_size != 0) {
		SPDK_ERRLOG("iobuf cache is currently unsupported\n");
		return -EINVAL;
	}

	/* Look the module up by name; NULL after the loop means it was never registered */
	TAILQ_FOREACH(module, &g_iobuf.modules, tailq) {
		if (strcmp(module->name, name) == 0) {
			break;
		}
	}
	if (module == NULL) {
		SPDK_ERRLOG("Couldn't find iobuf module: '%s'\n", name);
		return -ENODEV;
	}

	ioch = spdk_get_io_channel(&g_iobuf);
	if (ioch == NULL) {
		SPDK_ERRLOG("Couldn't get iobuf IO channel\n");
		return -ENOMEM;
	}

	iobuf_ch = spdk_io_channel_get_ctx(ioch);

	/* Wire the channel up to the global pools and this thread's wait queues */
	ch->parent = ioch;
	ch->module = module;
	ch->small.queue = &iobuf_ch->small_queue;
	ch->small.pool = g_iobuf.small_pool;
	ch->small.bufsize = g_iobuf.opts.small_bufsize;
	ch->large.queue = &iobuf_ch->large_queue;
	ch->large.pool = g_iobuf.large_pool;
	ch->large.bufsize = g_iobuf.opts.large_bufsize;

	return 0;
}

/* Release an iobuf channel.  The module must have no requests still waiting
 * on either queue; the assert loops compile away in NDEBUG builds (hence the
 * unused attribute on `entry`). */
void
spdk_iobuf_channel_fini(struct spdk_iobuf_channel *ch)
{
	struct spdk_iobuf_entry *entry __attribute__((unused));

	/* Make sure none of the wait queue entries are coming from this module */
	STAILQ_FOREACH(entry, ch->small.queue, stailq) {
		assert(entry->module != ch->module);
	}
	STAILQ_FOREACH(entry, ch->large.queue, stailq) {
		assert(entry->module != ch->module);
	}

	spdk_put_io_channel(ch->parent);
	ch->parent = NULL;
}

/* Register a module as an iobuf user.  Returns 0 on success, -EEXIST if the
 * name is already registered, -ENOMEM on allocation failure. */
int
spdk_iobuf_register_module(const char *name)
{
	struct iobuf_module *module;
	char *name_copy;

	/* Names must be unique */
	TAILQ_FOREACH(module, &g_iobuf.modules, tailq) {
		if (strcmp(module->name, name) == 0) {
			return -EEXIST;
		}
	}

	name_copy = strdup(name);
	if (name_copy == NULL) {
		return -ENOMEM;
	}

	module = calloc(1, sizeof(*module));
	if (module == NULL) {
		free(name_copy);
		return -ENOMEM;
	}

	module->name = name_copy;
	TAILQ_INSERT_TAIL(&g_iobuf.modules, module, tailq);

	return 0;
}

/* Walk a pool's wait queue, invoking cb_fn on each entry owned by ch's module.
 * Stops and returns the first non-zero callback status; 0 otherwise.  The SAFE
 * variant lets cb_fn remove the current entry (e.g. via spdk_iobuf_entry_abort()). */
int
spdk_iobuf_for_each_entry(struct spdk_iobuf_channel *ch, struct spdk_iobuf_pool *pool,
			  spdk_iobuf_for_each_entry_fn cb_fn, void *cb_ctx)
{
	struct spdk_iobuf_entry *cur, *next;
	int rc;

	STAILQ_FOREACH_SAFE(cur, pool->queue, stailq, next) {
		/* Skip entries queued by other modules sharing this thread's queue */
		if (cur->module != ch->module) {
			continue;
		}

		rc = cb_fn(ch, cur, cb_ctx);
		if (rc != 0) {
			return rc;
		}
	}

	return 0;
}

/* Remove a waiting entry from its queue.  len must equal the value passed to
 * spdk_iobuf_get(), so that the same pool (and thus queue) is selected. */
void
spdk_iobuf_entry_abort(struct spdk_iobuf_channel *ch, struct spdk_iobuf_entry *entry,
		       uint64_t len)
{
	struct spdk_iobuf_pool *pool;

	/* len must fit at least one of the pools */
	assert(len <= ch->small.bufsize || len <= ch->large.bufsize);
	pool = len <= ch->small.bufsize ? &ch->small : &ch->large;

	STAILQ_REMOVE(pool->queue, entry, spdk_iobuf_entry, stailq);
}

SPDK_LOG_REGISTER_COMPONENT(thread)
+454 −0

File changed.

Preview size limit exceeded, changes collapsed.