Commit 2446c5c6 authored by Ben Walker's avatar Ben Walker Committed by Darek Stojaczyk
Browse files

thread: Keep caches of message objects on the thread object.



Change-Id: I2b34d0c44fb2c4d3ec5d9e4b3c22bfce53543ea1
Signed-off-by: default avatarBen Walker <benjamin.walker@intel.com>
Reviewed-on: https://review.gerrithub.io/c/442774


Tested-by: default avatarSPDK CI Jenkins <sys_sgci@intel.com>
Reviewed-by: default avatarJim Harris <james.r.harris@intel.com>
Reviewed-by: default avatarShuhei Matsumoto <shuhei.matsumoto.xt@hitachi.com>
parent c8d956fd
Loading
Loading
Loading
Loading
+64 −7
Original line number Diff line number Diff line
@@ -77,8 +77,11 @@ static TAILQ_HEAD(, io_device) g_io_devices = TAILQ_HEAD_INITIALIZER(g_io_device
struct spdk_msg {
	spdk_msg_fn		fn;
	void			*arg;

	SLIST_ENTRY(spdk_msg)	link;
};

#define SPDK_MSG_MEMPOOL_CACHE_SIZE	1024
static struct spdk_mempool *g_spdk_msg_mempool = NULL;

enum spdk_poller_state {
@@ -123,6 +126,9 @@ struct spdk_thread {
	TAILQ_HEAD(timer_pollers_head, spdk_poller) timer_pollers;

	struct spdk_ring		*messages;

	SLIST_HEAD(, spdk_msg)		msg_cache;
	size_t				msg_cache_count;
};

static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads);
@@ -160,7 +166,7 @@ spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn)
	g_spdk_msg_mempool = spdk_mempool_create(mempool_name,
			     262144 - 1, /* Power of 2 minus 1 is optimal for memory consumption */
			     sizeof(struct spdk_msg),
			     SPDK_MEMPOOL_DEFAULT_CACHE_SIZE,
			     0, /* No cache. We do our own. */
			     SPDK_ENV_SOCKET_ID_ANY);

	if (!g_spdk_msg_mempool) {
@@ -188,6 +194,8 @@ struct spdk_thread *
spdk_thread_create(const char *name)
{
	struct spdk_thread *thread;
	struct spdk_msg *msgs[SPDK_MSG_MEMPOOL_CACHE_SIZE];
	int rc, i;

	thread = calloc(1, sizeof(*thread));
	if (!thread) {
@@ -198,6 +206,8 @@ spdk_thread_create(const char *name)
	TAILQ_INIT(&thread->io_channels);
	TAILQ_INIT(&thread->active_pollers);
	TAILQ_INIT(&thread->timer_pollers);
	SLIST_INIT(&thread->msg_cache);
	thread->msg_cache_count = 0;

	thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY);
	if (!thread->messages) {
@@ -206,6 +216,17 @@ spdk_thread_create(const char *name)
		return NULL;
	}

	/* Fill the local message pool cache. */
	rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE);
	if (rc == 0) {
		/* If we can't populate the cache it's ok. The cache will get filled
		 * up organically as messages are passed to the thread. */
		for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) {
			SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link);
			thread->msg_cache_count++;
		}
	}

	if (name) {
		_set_thread_name(name);
		thread->name = strdup(name);
@@ -237,6 +258,7 @@ void
spdk_thread_exit(struct spdk_thread *thread)
{
	struct spdk_io_channel *ch;
	struct spdk_msg *msg;

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Freeing thread %s\n", thread->name);

@@ -257,6 +279,19 @@ spdk_thread_exit(struct spdk_thread *thread)

	free(thread->name);

	msg = SLIST_FIRST(&thread->msg_cache);
	while (msg != NULL) {
		SLIST_REMOVE_HEAD(&thread->msg_cache, link);

		assert(thread->msg_cache_count > 0);
		thread->msg_cache_count--;
		spdk_mempool_put(g_spdk_msg_mempool, msg);

		msg = SLIST_FIRST(&thread->msg_cache);
	}

	assert(thread->msg_cache_count == 0);

	if (thread->messages) {
		spdk_ring_free(thread->messages);
	}
@@ -295,9 +330,16 @@ _spdk_msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs)

		assert(msg != NULL);
		msg->fn(msg->arg);
	}

	spdk_mempool_put_bulk(g_spdk_msg_mempool, messages, count);
		if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) {
			/* Insert the messages at the head. We want to re-use the hot
			 * ones. */
			SLIST_INSERT_HEAD(&thread->msg_cache, msg, link);
			thread->msg_cache_count++;
		} else {
			spdk_mempool_put(g_spdk_msg_mempool, msg);
		}
	}

	return count;
}
@@ -452,6 +494,7 @@ spdk_thread_get_name(const struct spdk_thread *thread)
void
spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx)
{
	struct spdk_thread *local_thread;
	struct spdk_msg *msg;
	int rc;

@@ -460,11 +503,25 @@ spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx
		return;
	}

	local_thread = _get_thread();

	msg = NULL;
	if (local_thread != NULL) {
		if (local_thread->msg_cache_count > 0) {
			msg = SLIST_FIRST(&local_thread->msg_cache);
			assert(msg != NULL);
			SLIST_REMOVE_HEAD(&local_thread->msg_cache, link);
			local_thread->msg_cache_count--;
		}
	}

	if (msg == NULL) {
		msg = spdk_mempool_get(g_spdk_msg_mempool);
		if (!msg) {
			assert(false);
			return;
		}
	}

	msg->fn = fn;
	msg->arg = ctx;