Commit f108bc4f authored by Konrad Sztyber's avatar Konrad Sztyber Committed by Tomasz Zawadzki
Browse files

iobuf: prioritize already waiting entries



If an iobuf entry was waiting for a buffer, requested another one in its
iobuf_get_cb, and got placed back onto the wait queue, it'll now be put
at the head of that queue.

This ensures that requests requiring multiple iobufs to execute will be
processed in the order they originally requested the buffers, which
should also ensure that we won't end up with a deadlock.

Signed-off-by: default avatarKonrad Sztyber <konrad.sztyber@intel.com>
Change-Id: I58d4a23bd5e73fc84d92a261f7a09c808e48992e
Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/24237


Reviewed-by: default avatarJacek Kalwas <jacek.kalwas@intel.com>
Community-CI: Mellanox Build Bot
Reviewed-by: default avatarKrzysztof Goreczny <krzysztof.goreczny@dell.com>
Reviewed-by: default avatarTomasz Zawadzki <tomasz.zawadzki@intel.com>
Tested-by: default avatarSPDK CI Jenkins <sys_sgci@intel.com>
Reviewed-by: default avatarJim Harris <jim.harris@samsung.com>
parent 188cdd86
Loading
Loading
Loading
Loading
+4 −0
Original line number Diff line number Diff line
@@ -615,6 +615,10 @@ spdk_iobuf_put(struct spdk_iobuf_channel *ch, void *buf, uint64_t len)
		entry = STAILQ_FIRST(pool->queue);
		STAILQ_REMOVE_HEAD(pool->queue, stailq);
		entry->cb_fn(entry, buf);
		if (spdk_unlikely(entry == STAILQ_LAST(pool->queue, spdk_iobuf_entry, stailq))) {
			STAILQ_REMOVE(pool->queue, entry, spdk_iobuf_entry, stailq);
			STAILQ_INSERT_HEAD(pool->queue, entry, stailq);
		}
	}
}

+106 −0
Original line number Diff line number Diff line
@@ -16,6 +16,7 @@ struct ut_iobuf_entry {
	struct spdk_iobuf_channel	*ioch;
	struct spdk_iobuf_entry		iobuf;
	void				*buf;
	void				*buf2;
	uint32_t			thread_id;
	const char			*module;
};
@@ -615,6 +616,110 @@ iobuf_cache(void)
	free_cores();
}

static void
ut_iobuf_get_buf2_cb(struct spdk_iobuf_entry *entry, void *buf)
{
	struct ut_iobuf_entry *ut_entry = SPDK_CONTAINEROF(entry, struct ut_iobuf_entry, iobuf);

	CU_ASSERT_PTR_NOT_NULL(ut_entry->buf);
	CU_ASSERT_PTR_NULL(ut_entry->buf2);

	ut_entry->buf2 = buf;
}

static void
ut_iobuf_get_buf1_cb(struct spdk_iobuf_entry *entry, void *buf)
{
	struct ut_iobuf_entry *ut_entry = SPDK_CONTAINEROF(entry, struct ut_iobuf_entry, iobuf);
	void *buf2;

	CU_ASSERT_PTR_NULL(ut_entry->buf);
	CU_ASSERT_PTR_NULL(ut_entry->buf2);
	ut_entry->buf = buf;

	buf2 = spdk_iobuf_get(ut_entry->ioch, SMALL_BUFSIZE, &ut_entry->iobuf,
			      ut_iobuf_get_buf2_cb);
	CU_ASSERT_PTR_NULL(buf2);
}

static void
iobuf_priority(void)
{
	struct spdk_iobuf_opts opts = {
		.small_pool_count = 2,
		.large_pool_count = 2,
		.small_bufsize = SMALL_BUFSIZE,
		.large_bufsize = LARGE_BUFSIZE,
	};
	struct ut_iobuf_entry entries[4] = {};
	struct spdk_iobuf_channel iobuf_ch;
	int rc, finish = 0;
	uint32_t i;

	allocate_cores(1);
	allocate_threads(1);

	set_thread(0);

	/* We cannot use spdk_iobuf_set_opts(), as it won't allow us to use such small pools */
	g_iobuf.opts = opts;
	rc = spdk_iobuf_initialize();
	CU_ASSERT_EQUAL(rc, 0);

	rc = spdk_iobuf_register_module("ut_module");
	CU_ASSERT_EQUAL(rc, 0);
	rc = spdk_iobuf_channel_init(&iobuf_ch, "ut_module", 0, 0);
	CU_ASSERT_EQUAL(rc, 0);

	for (i = 0; i < SPDK_COUNTOF(entries); ++i) {
		entries[i].ioch = &iobuf_ch;
	}

	/* Check that requests for an iobuf called from within the iobuf_get_cb are prioritized */
	entries[0].buf = spdk_iobuf_get(&iobuf_ch, SMALL_BUFSIZE, NULL, NULL);
	CU_ASSERT_PTR_NOT_NULL(entries[0].buf);
	entries[1].buf = spdk_iobuf_get(&iobuf_ch, SMALL_BUFSIZE, NULL, NULL);
	CU_ASSERT_PTR_NOT_NULL(entries[1].buf);

	/* Try to acquire two iobufs twice */
	entries[2].buf = spdk_iobuf_get(&iobuf_ch, SMALL_BUFSIZE, &entries[2].iobuf,
					ut_iobuf_get_buf1_cb);
	CU_ASSERT_PTR_NULL(entries[2].buf);
	entries[3].buf = spdk_iobuf_get(&iobuf_ch, SMALL_BUFSIZE, &entries[3].iobuf,
					ut_iobuf_get_buf1_cb);
	CU_ASSERT_PTR_NULL(entries[3].buf);

	/* Return one of the iobufs - the first entry on the wait queue should get it */
	spdk_iobuf_put(&iobuf_ch, entries[0].buf, SMALL_BUFSIZE);
	CU_ASSERT_PTR_NOT_NULL(entries[2].buf);
	CU_ASSERT_PTR_NULL(entries[3].buf);

	/* Return the second one, this time the same entry should get it, because it requested
	 * inside its iobuf_get_cb */
	spdk_iobuf_put(&iobuf_ch, entries[1].buf, SMALL_BUFSIZE);
	CU_ASSERT_PTR_NOT_NULL(entries[2].buf2);
	CU_ASSERT_PTR_NULL(entries[3].buf);

	/* Release it again, now the last entry should finally get it */
	spdk_iobuf_put(&iobuf_ch, entries[2].buf, SMALL_BUFSIZE);
	spdk_iobuf_put(&iobuf_ch, entries[2].buf2, SMALL_BUFSIZE);
	CU_ASSERT_PTR_NOT_NULL(entries[3].buf);
	CU_ASSERT_PTR_NOT_NULL(entries[3].buf2);
	spdk_iobuf_put(&iobuf_ch, entries[3].buf, SMALL_BUFSIZE);
	spdk_iobuf_put(&iobuf_ch, entries[3].buf2, SMALL_BUFSIZE);

	spdk_iobuf_channel_fini(&iobuf_ch);
	poll_threads();

	spdk_iobuf_finish(ut_iobuf_finish_cb, &finish);
	poll_threads();

	CU_ASSERT_EQUAL(finish, 1);

	free_threads();
	free_cores();
}

int
main(int argc, char **argv)
{
@@ -626,6 +731,7 @@ main(int argc, char **argv)
	suite = CU_add_suite("io_channel", NULL, NULL);
	CU_ADD_TEST(suite, iobuf);
	CU_ADD_TEST(suite, iobuf_cache);
	CU_ADD_TEST(suite, iobuf_priority);

	num_failures = spdk_ut_run_tests(argc, argv, NULL);
	CU_cleanup_registry();