Commit 5686278b authored by Vladislav Fedyaev's avatar Vladislav Fedyaev Committed by Tomasz Zawadzki
Browse files

lib/thread: Faster spdk_for_each_channel implementation



Current implementation performs poorly on big number of threads, because
it needs to iterate through all of them to find the ones with channels
opened to current bdev. This implementation stores such threads in
RB-tree, dramaticaly increasing performance. This is important, because
spdk_for_each_channel requires g_devlist_mutex locked.

Change-Id: If73cc279c87a70cc3b7ae915d0863e6c81a1f83d
Signed-off-by: default avatarVladislav Fedyaev <a37206@gmail.com>
Reviewed-on: https://review.spdk.io/c/spdk/spdk/+/26121


Tested-by: default avatarSPDK Automated Test System <spdkbot@gmail.com>
Reviewed-by: default avatarShuhei Matsumoto <smatsumoto@nvidia.com>
Reviewed-by: default avatarJim Harris <jim.harris@nvidia.com>
Community-CI: Mellanox Build Bot
Reviewed-by: default avatarVasilii Ivanov <iwanovvvasilij@gmail.com>
parent f4cdd081
Loading
Loading
Loading
Loading
+71 −20
Original line number Diff line number Diff line
@@ -257,6 +257,12 @@ spin_abort g_spin_abort_fn = __posix_abort;
#define SPIN_ASSERT_RETURN(cond, err, ret)	SPIN_ASSERT_IMPL(cond, err, , return ret)
#define SPIN_ASSERT(cond, err)			SPIN_ASSERT_IMPL(cond, err, ,)

struct thread_link {
	struct spdk_thread *thread;
	uint64_t id;
	RB_ENTRY(thread_link)	node;
};

struct io_device {
	void				*io_device;
	char				name[SPDK_MAX_DEVICE_NAME_LEN + 1];
@@ -272,8 +278,17 @@ struct io_device {

	bool				pending_unregister;
	bool				unregistered;
	RB_HEAD(thread_link_tree, thread_link) threads;
};

static inline int
thread_link_compare(struct thread_link *tl1, struct thread_link *tl2)
{
	return (tl1->id < tl2->id ? -1 : tl1->id > tl2->id);
}

RB_GENERATE_STATIC(thread_link_tree, thread_link, node, thread_link_compare);

static RB_HEAD(io_device_tree, io_device) g_io_devices = RB_INITIALIZER(g_io_devices);

static int
@@ -2212,6 +2227,7 @@ spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb,
	dev->for_each_count = 0;
	dev->unregistered = false;
	dev->refcnt = 0;
	RB_INIT(&dev->threads);

	SPDK_DEBUGLOG(thread, "Registering io_device %s (%p) on thread %s\n",
		      dev->name, dev->io_device, thread->name);
@@ -2346,6 +2362,7 @@ struct spdk_io_channel *
spdk_get_io_channel(void *io_device)
{
	struct spdk_io_channel *ch;
	struct thread_link *thr_link;
	struct spdk_thread *thread;
	struct io_device *dev;
	int rc;
@@ -2396,6 +2413,14 @@ spdk_get_io_channel(void *io_device)
		return NULL;
	}

	thr_link = calloc(1, sizeof(struct thread_link));
	if (thr_link == NULL) {
		free(ch);
		SPDK_ERRLOG("could not calloc thread_link\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	ch->dev = dev;
	ch->destroy_cb = dev->destroy_cb;
	ch->thread = thread;
@@ -2408,6 +2433,12 @@ spdk_get_io_channel(void *io_device)

	dev->refcnt++;

	thr_link->thread = thread;
	thr_link->id = thread->id;
	if (RB_INSERT(thread_link_tree, &dev->threads, thr_link)) {
		assert(false);
	}

	pthread_mutex_unlock(&g_devlist_mutex);

	rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch));
@@ -2416,6 +2447,8 @@ spdk_get_io_channel(void *io_device)
		RB_REMOVE(io_channel_tree, &ch->thread->io_channels, ch);
		dev->refcnt--;
		free(ch);
		RB_REMOVE(thread_link_tree, &dev->threads, thr_link);
		free(thr_link);
		SPDK_ERRLOG("could not create io_channel for io_device %s (%p): %s (rc=%d)\n",
			    dev->name, io_device, spdk_strerror(-rc), rc);
		if (dev->unregistered && dev->refcnt == 0) {
@@ -2439,6 +2472,7 @@ put_io_channel(void *arg)
	struct spdk_io_channel *ch = arg;
	bool do_remove_dev = true;
	struct spdk_thread *thread;
	struct thread_link *thr_link, *ptmp;

	thread = spdk_get_thread();
	if (!thread) {
@@ -2466,6 +2500,13 @@ put_io_channel(void *arg)

	pthread_mutex_lock(&g_devlist_mutex);
	RB_REMOVE(io_channel_tree, &ch->thread->io_channels, ch);
	RB_FOREACH_SAFE(thr_link, thread_link_tree, &ch->dev->threads, ptmp) {
		if (thr_link->thread == thread) {
			RB_REMOVE(thread_link_tree, &ch->dev->threads, thr_link);
			free(thr_link);
			break;
		}
	}
	pthread_mutex_unlock(&g_devlist_mutex);

	/* Don't hold the devlist mutex while the destroy_cb is called. */
@@ -2626,9 +2667,8 @@ void
spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx,
		      spdk_channel_for_each_cpl cpl)
{
	struct spdk_thread *thread;
	struct spdk_io_channel *ch;
	struct spdk_io_channel_iter *i;
	struct thread_link *thr_link;
	int rc __attribute__((unused));

	i = calloc(1, sizeof(*i));
@@ -2664,17 +2704,19 @@ spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx,
		goto end;
	}

	TAILQ_FOREACH(thread, &g_threads, tailq) {
		ch = thread_get_io_channel(thread, i->dev);
		if (ch != NULL) {
			ch->dev->for_each_count++;
			i->cur_thread = thread;
			i->ch = ch;
			rc = spdk_thread_send_msg(thread, _call_channel, i);
	thr_link = RB_MIN(thread_link_tree, &i->dev->threads);
	if (thr_link != NULL) {
		i->dev->for_each_count++;
		i->cur_thread = thr_link->thread;
		i->ch = thread_get_io_channel(thr_link->thread, i->dev); /* must exist, thread is in tail queue */
		rc = spdk_thread_send_msg(i->cur_thread, _call_channel, i);
		if (rc == 0) {
			pthread_mutex_unlock(&g_devlist_mutex);
			assert(rc == 0);
			return;
		}
		SPDK_ERRLOG("spdk_for_each_channel(): can't continue iteration on thread '%s', spdk_thread_send_msg() rc = %d\n",
			    thr_link->thread->name, rc);
		assert(false);
	}

end:
@@ -2694,11 +2736,20 @@ __pending_unregister(void *arg)
	spdk_io_device_unregister(dev->io_device, dev->unregister_cb);
}

static struct spdk_thread *
io_dev_get_next_thread(struct io_device *dev, struct spdk_thread *thread)
{
	struct thread_link find = {}, *res;

	find.id = thread->id + 1;
	res = RB_NFIND(thread_link_tree, &dev->threads, &find);
	return res ? res->thread : NULL;
}

void
spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status)
{
	struct spdk_thread *thread;
	struct spdk_io_channel *ch;
	struct io_device *dev;
	int rc __attribute__((unused));

@@ -2712,18 +2763,18 @@ spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status)
		goto end;
	}

	thread = TAILQ_NEXT(i->cur_thread, tailq);
	while (thread) {
		ch = thread_get_io_channel(thread, dev);
		if (ch != NULL) {
	thread = io_dev_get_next_thread(i->dev, i->cur_thread);
	if (thread != NULL) {
		i->cur_thread = thread;
			i->ch = ch;
			rc = spdk_thread_send_msg(thread, _call_channel, i);
		i->ch = thread_get_io_channel(thread, dev);
		rc = spdk_thread_send_msg(i->cur_thread, _call_channel, i);
		if (rc == 0) {
			pthread_mutex_unlock(&g_devlist_mutex);
			assert(rc == 0);
			return;
		}
		thread = TAILQ_NEXT(thread, tailq);
		SPDK_ERRLOG("spdk_for_each_channel(): can't continue iteration on thread '%s', spdk_thread_send_msg() rc = %d\n",
			    thread->name, rc);
		assert(false);
	}

end: