Commit f1e8c9ce authored by Ben Walker's avatar Ben Walker Committed by Jim Harris
Browse files

thread: iobuf now grabs from the central pool in batches



This is much more efficient, but it is a slight behavior change. See the
unit test change in this patch.

Change-Id: I5eaa49a5bf7a0131d59925806c47aaf03b7c207e
Signed-off-by: default avatarBen Walker <benjamin.walker@intel.com>
Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/16280


Tested-by: default avatarSPDK CI Jenkins <sys_sgci@intel.com>
Reviewed-by: default avatarAleksey Marchuk <alexeymar@nvidia.com>
Community-CI: Mellanox Build Bot
Reviewed-by: default avatarJim Harris <james.r.harris@intel.com>
parent 825fce32
Loading
Loading
Loading
Loading
+44 −5
Original line number Diff line number Diff line
@@ -402,6 +402,8 @@ spdk_iobuf_entry_abort(struct spdk_iobuf_channel *ch, struct spdk_iobuf_entry *e
	STAILQ_REMOVE(pool->queue, entry, spdk_iobuf_entry, stailq);
}

#define IOBUF_BATCH_SIZE 32

void *
spdk_iobuf_get(struct spdk_iobuf_channel *ch, uint64_t len,
	       struct spdk_iobuf_entry *entry, spdk_iobuf_get_cb cb_fn)
@@ -423,13 +425,27 @@ spdk_iobuf_get(struct spdk_iobuf_channel *ch, uint64_t len,
		assert(pool->cache_count > 0);
		pool->cache_count--;
	} else {
		if (spdk_ring_dequeue(pool->pool, (void **)&buf, 1) == 0) {
		struct spdk_iobuf_buffer *bufs[IOBUF_BATCH_SIZE];
		size_t sz, i;

		/* If we're going to dequeue, we may as well dequeue a batch. */
		sz = spdk_ring_dequeue(pool->pool, (void **)bufs, spdk_min(IOBUF_BATCH_SIZE,
				       spdk_max(pool->cache_size, 1)));
		if (sz == 0) {
			STAILQ_INSERT_TAIL(pool->queue, entry, stailq);
			entry->module = ch->module;
			entry->cb_fn = cb_fn;

			return NULL;
		}

		for (i = 0; i < (sz - 1); i++) {
			STAILQ_INSERT_HEAD(&pool->cache, bufs[i], stailq);
			pool->cache_count++;
		}

		/* The last one is the one we'll return */
		buf = bufs[i];
	}

	return (char *)buf;
@@ -439,7 +455,9 @@ void
spdk_iobuf_put(struct spdk_iobuf_channel *ch, void *buf, uint64_t len)
{
	struct spdk_iobuf_entry *entry;
	struct spdk_iobuf_buffer *iobuf_buf;
	struct spdk_iobuf_pool *pool;
	size_t sz;

	assert(spdk_io_channel_get_thread(ch->parent) == spdk_get_thread());
	if (len <= ch->small.bufsize) {
@@ -449,11 +467,32 @@ spdk_iobuf_put(struct spdk_iobuf_channel *ch, void *buf, uint64_t len)
	}

	if (STAILQ_EMPTY(pool->queue)) {
		if (pool->cache_count < pool->cache_size) {
			STAILQ_INSERT_HEAD(&pool->cache, (struct spdk_iobuf_buffer *)buf, stailq);
			pool->cache_count++;
		} else {
		if (pool->cache_size == 0) {
			spdk_ring_enqueue(pool->pool, (void **)&buf, 1, NULL);
			return;
		}

		iobuf_buf = (struct spdk_iobuf_buffer *)buf;

		STAILQ_INSERT_HEAD(&pool->cache, iobuf_buf, stailq);
		pool->cache_count++;

		/* The cache size may exceed the configured amount. We always dequeue from the
		 * central pool in batches of known size, so wait until at least a batch
		 * has been returned to actually return the buffers to the central pool. */
		sz = spdk_min(IOBUF_BATCH_SIZE, pool->cache_size);
		if (pool->cache_count >= pool->cache_size + sz) {
			struct spdk_iobuf_buffer *bufs[IOBUF_BATCH_SIZE];
			size_t i;

			for (i = 0; i < sz; i++) {
				bufs[i] = STAILQ_FIRST(&pool->cache);
				STAILQ_REMOVE_HEAD(&pool->cache, stailq);
				assert(pool->cache_count > 0);
				pool->cache_count--;
			}

			spdk_ring_enqueue(pool->pool, (void **)bufs, sz, NULL);
		}
	} else {
		entry = STAILQ_FIRST(pool->queue);
+4 −15
Original line number Diff line number Diff line
@@ -536,24 +536,13 @@ iobuf_cache(void)
		CU_ASSERT_PTR_NOT_NULL(mod0_entries[i].buf);
	}

	/* It should be able to create a channel with a single entry in the cache */
	rc = spdk_iobuf_channel_init(&iobuf_ch[1], "ut_module1", 2, 1);
	CU_ASSERT_EQUAL(rc, 0);
	spdk_iobuf_channel_fini(&iobuf_ch[1]);
	poll_threads();

	/* But not with two entries */
	rc = spdk_iobuf_channel_init(&iobuf_ch[1], "ut_module1", 2, 2);
	CU_ASSERT_EQUAL(rc, -ENOMEM);

	for (i = 0; i < 2; ++i) {
	/* The channels can be temporarily greedy, holding more buffers than their configured cache
	 * size. We can only guarantee that we can create a channel if all outstanding buffers
	 * have been returned. */
	for (i = 0; i < 3; ++i) {
		spdk_iobuf_put(&iobuf_ch[0], mod0_entries[i].buf, LARGE_BUFSIZE);
		rc = spdk_iobuf_channel_init(&iobuf_ch[1], "ut_module1", 2, 2);
		CU_ASSERT_EQUAL(rc, -ENOMEM);
	}

	spdk_iobuf_put(&iobuf_ch[0], mod0_entries[2].buf, LARGE_BUFSIZE);

	/* The last buffer should be released back to the pool, so we should be able to create a new
	 * channel
	 */