Commit 0680c7a2 authored by Yash Raj Singh's avatar Yash Raj Singh Committed by Tomasz Zawadzki
Browse files

lib/nvme: create single thread to process admin commands to cuse device



Earlier, we used to create multiple threads to process admin commands
to cuse devices: one for each NVMe disk and one for each of its
namespaces. This resulted in large number of threads in machines with
multiple NVMe disks. Moreover most of these threads largely stay idle
because they are only responsible to handle admin commands on that cuse
device and are not in the IO hot path. Therefore it makes sense to
consolidate all these threads into a single thread which handles all
the commands. This patch does exactly the same by creating a
'cuse_thread' to receive and process admin commands to all the attached
cuse devices.

Note that we have created two lists, one with pending session's to be
polled on and other with actively polling cuse sessions. The reason for
creating two lists is to avoid taking any locks in the poller loop of
cuse_thread(). Whenever a new session is to be polled on we add the
session to 'g_pending_device_head' and generate an eventfd event by
writing on 'g_cuse_thread_msg_fd'. This event is observed by the
cuse_thread which then takes 'g_pending_device_mtx' lock and moves the
session from 'g_pending_device_head' to 'g_active_device_head' list. In
this way we end up taking lock only when we need to add a new session
to be polled on. We use 'g_cuse_session_fdgrp' to poll on the fd's of
all fuse sessions in 'g_active_device_head' list.

List of field modifications in cuse_device:

1. pthread_t tid: This field is removed as we will now have a single
   thread polling for all admin commands.

2. bool force_exit: Older code ensured that we invoke
   fuse_session_reset() before cuse_lowlevel_teardown() by invoking
   pthread_join() on that cuse_thread's tid during force destruction of
   cuse device. This order is important because
   cuse_lowlevel_teardown() frees the fuse_session memory on which
   fuse_session_reset() operates. If this order is not maintained we
   could get us into use after free error. To strictly follow the order
   we have added a bool field 'force_exit' in cuse_device struct. When
   this bool is set to true the onus to call cuse_lowlevel_teardown()
   is on the cuse_thread where we maintain this order. Also this field
   signals the cuse_thread to free the cuse_device since the session
   exit has been done by the spdk library itself.

3. int fuse_efd: We would need the fd related to fuse session to remove
   it from fd group after the fuse session has exited. Also, we can't
   use fuse_session_fd() once the session has exited because the
   function definition states that it will return an undefined value.
   Hence we store the fd beforehand.

4. TAILQ_ENTRY(cuse_device) cuse_thread_tailq: Since we are storing the
   same struct in two different lists we need a different tail link for
   each of the lists. The two lists are 'g_ctrlr_ctx_head' and
   'g_pending_device_head' or 'g_active_device_head' depending on
   whether we have started polling on that session.

[Test Fix] We also fix test issues opened up by this patch. They are -

1. In test_cuse_update() we invoked nvme_cuse_start() directly which
   led to failures because helper data structures for cuse_thread were
   not initialised. This initialisation now take place in the first
   call to spdk_nvme_cuse_register(). In the test code we now invoke
   the register function rather than directly starting the cuse device.

2. In the test_nvme_cuse_stop() we created the spdk controller and
   multiple cuse devices associated with it. Then we stopped them by
   invoking nvme_cuse_stop(). Here we expected all the space allocated
   for cuse devices to be freed by the stop call but this isn't the
   case now. Since we have not registered the devices by invoking
   spdk_nvme_cuse_register() we don't poll for them in the cuse thread,
   which is now responsible for freeing the cuse_device memory. We fix
   this by registering the controller with spdk before stopping it.
   Also we now spin on g_device_fdgrp before ending the test to ensure
   that all resources of the cuse_thread have been freed.

Change-Id: I0c1f5d57841ef670ba407cf4f08c3bbbd1bcf78a
Signed-off-by: default avatarYash Raj Singh <yash.rajsingh@nutanix.com>
Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/21593


Tested-by: default avatarSPDK CI Jenkins <sys_sgci@intel.com>
Reviewed-by: default avatarTomasz Zawadzki <tomasz.zawadzki@intel.com>
Reviewed-by: default avatarJim Harris <jim.harris@samsung.com>
Reviewed-by: default avatarVasilii Ivanov <ivanov.vas@xinnor.io>
parent 80287ba0
Loading
Loading
Loading
Loading
+173 −51
Original line number Diff line number Diff line
@@ -3,7 +3,9 @@
 *   All rights reserved.
 */
#include "spdk/stdinc.h"
#include "spdk/string.h"
#include "spdk/config.h"
#include "spdk/fd_group.h"
#include "spdk/log.h"
#include "spdk/nvme.h"

@@ -19,6 +21,7 @@
#include "nvme_cuse.h"

struct cuse_device {
	bool				force_exit;
	char				dev_name[128];
	uint32_t			index;
	int				claim_fd;
@@ -27,19 +30,28 @@ struct cuse_device {
	struct spdk_nvme_ctrlr		*ctrlr;		/**< NVMe controller */
	uint32_t			nsid;		/**< NVMe name space id, or 0 */

	pthread_t			tid;
	struct fuse_session		*session;
	int				fuse_efd;

	struct cuse_device		*ctrlr_device;
	TAILQ_HEAD(, cuse_device)	ns_devices;

	TAILQ_ENTRY(cuse_device)	tailq;
	TAILQ_ENTRY(cuse_device)	cuse_thread_tailq;
};

static pthread_mutex_t g_cuse_mtx = PTHREAD_MUTEX_INITIALIZER;
static TAILQ_HEAD(, cuse_device) g_ctrlr_ctx_head = TAILQ_HEAD_INITIALIZER(g_ctrlr_ctx_head);
static struct spdk_bit_array *g_ctrlr_started;

static pthread_mutex_t g_pending_device_mtx = PTHREAD_MUTEX_INITIALIZER;
static struct spdk_fd_group *g_device_fdgrp;
static int g_cuse_thread_msg_fd;
static TAILQ_HEAD(, cuse_device) g_pending_device_head = TAILQ_HEAD_INITIALIZER(
			g_pending_device_head);
static TAILQ_HEAD(, cuse_device) g_active_device_head = TAILQ_HEAD_INITIALIZER(
			g_active_device_head);

struct cuse_io_ctx {
	struct spdk_nvme_cmd		nvme_cmd;
	enum spdk_nvme_data_transfer	data_transfer;
@@ -907,36 +919,102 @@ cuse_session_create(struct cuse_device *cuse_device)
		return -1;
	}
	SPDK_NOTICELOG("fuse session for device %s created\n", cuse_device->dev_name);
	cuse_device->fuse_efd = fuse_session_fd(cuse_device->session);

	pthread_mutex_lock(&g_pending_device_mtx);
	TAILQ_INSERT_TAIL(&g_pending_device_head, cuse_device, cuse_thread_tailq);
	if (eventfd_write(g_cuse_thread_msg_fd, 1) != 0) {
		TAILQ_REMOVE(&g_pending_device_head, cuse_device, cuse_thread_tailq);
		pthread_mutex_unlock(&g_pending_device_mtx);
		SPDK_ERRLOG("eventfd_write failed: (%s).\n", spdk_strerror(errno));
		return -errno;
	}
	pthread_mutex_unlock(&g_pending_device_mtx);
	return 0;
}

static void *
cuse_thread(void *arg)
static int
process_cuse_event(void *arg)
{
	struct cuse_device *cuse_device = arg;
	int rc;
	struct fuse_session *session = arg;
	struct fuse_buf buf = { .mem = NULL };
	struct pollfd fds;
	int rc = fuse_session_receive_buf(session, &buf);

	if (rc > 0) {
		fuse_session_process_buf(session, &buf);
	}
	free(buf.mem);
	return 0;
}

static int
cuse_thread_add_session(void *arg)
{
	struct cuse_device *cuse_device, *tmp;
	int ret;
	eventfd_t val;

	eventfd_read(g_cuse_thread_msg_fd, &val);

	pthread_mutex_lock(&g_pending_device_mtx);
	TAILQ_FOREACH_SAFE(cuse_device, &g_pending_device_head, cuse_thread_tailq, tmp) {
		ret = spdk_fd_group_add(g_device_fdgrp, cuse_device->fuse_efd, process_cuse_event,
					cuse_device->session, cuse_device->dev_name);
		if (ret < 0) {
			SPDK_ERRLOG("Failed to add fd %d: (%s).\n", cuse_device->fuse_efd,
				    spdk_strerror(-ret));
			TAILQ_REMOVE(&g_pending_device_head, cuse_device, cuse_thread_tailq);
			free(cuse_device);
			assert(false);
		}
	}
	TAILQ_CONCAT(&g_active_device_head, &g_pending_device_head, cuse_thread_tailq);
	pthread_mutex_unlock(&g_pending_device_mtx);
	return 0;
}

static void *
cuse_thread(void *unused)
{
	struct cuse_device *cuse_device, *tmp;
	int timeout_msecs = 500;
	bool retry;

	spdk_unaffinitize_thread();

	/* Receive and process fuse requests */
	fds.fd = fuse_session_fd(cuse_device->session);
	fds.events = POLLIN;
	while (!fuse_session_exited(cuse_device->session)) {
		rc = poll(&fds, 1, timeout_msecs);
		if (rc <= 0) {
			continue;
	do {
		retry = false;
		spdk_fd_group_wait(g_device_fdgrp, timeout_msecs);
		while (!TAILQ_EMPTY(&g_active_device_head)) {
			TAILQ_FOREACH_SAFE(cuse_device, &g_active_device_head, cuse_thread_tailq, tmp) {
				if (fuse_session_exited(cuse_device->session)) {
					spdk_fd_group_remove(g_device_fdgrp, cuse_device->fuse_efd);
					fuse_session_reset(cuse_device->session);
					TAILQ_REMOVE(&g_active_device_head, cuse_device, cuse_thread_tailq);
					if (cuse_device->force_exit) {
						cuse_lowlevel_teardown(cuse_device->session);
						free(cuse_device);
					}
		rc = fuse_session_receive_buf(cuse_device->session, &buf);
		if (rc > 0) {
			fuse_session_process_buf(cuse_device->session, &buf);
				}
			}
	free(buf.mem);
	fuse_session_reset(cuse_device->session);
	pthread_exit(NULL);
			/* Receive and process fuse event and new cuse device addition requests. */
			spdk_fd_group_wait(g_device_fdgrp, timeout_msecs);
		}
		pthread_mutex_lock(&g_cuse_mtx);
		if (!TAILQ_EMPTY(&g_pending_device_head)) {
			pthread_mutex_unlock(&g_cuse_mtx);
			/* Retry as we have some cuse devices pending to be polled on. */
			retry = true;
		}
	} while (retry);

	spdk_fd_group_remove(g_device_fdgrp, g_cuse_thread_msg_fd);
	close(g_cuse_thread_msg_fd);
	spdk_fd_group_destroy(g_device_fdgrp);
	g_device_fdgrp = NULL;
	pthread_mutex_unlock(&g_cuse_mtx);
	SPDK_NOTICELOG("Cuse thread exited.\n");
	return NULL;
}

static struct cuse_device *nvme_cuse_get_cuse_ns_device(struct spdk_nvme_ctrlr *ctrlr,
@@ -949,7 +1027,7 @@ static struct cuse_device *nvme_cuse_get_cuse_ns_device(struct spdk_nvme_ctrlr *
static int
cuse_nvme_ns_start(struct cuse_device *ctrlr_device, uint32_t nsid)
{
	struct cuse_device *ns_device;
	struct cuse_device *ns_device = NULL;
	int rv;

	ns_device = nvme_cuse_get_cuse_ns_device(ctrlr_device->ctrlr, nsid);
@@ -969,37 +1047,33 @@ cuse_nvme_ns_start(struct cuse_device *ctrlr_device, uint32_t nsid)
		      ctrlr_device->dev_name, ns_device->nsid);
	if (rv < 0) {
		SPDK_ERRLOG("Device name too long.\n");
		free(ns_device);
		return -ENAMETOOLONG;
		rv = -ENAMETOOLONG;
		goto free_device;
	}

	rv = cuse_session_create(ns_device);
	if (rv != 0) {
		free(ns_device);
		return rv;
	}
	rv = pthread_create(&ns_device->tid, NULL, cuse_thread, ns_device);
	if (rv != 0) {
		SPDK_ERRLOG("pthread_create failed\n");
		free(ns_device);
		return -rv;
		goto free_device;
	}

	TAILQ_INSERT_TAIL(&ctrlr_device->ns_devices, ns_device, tailq);

	return 0;

free_device:
	free(ns_device);
	return rv;
}

static void
cuse_nvme_ns_stop(struct cuse_device *ctrlr_device, struct cuse_device *ns_device)
{
	if (ns_device->session != NULL) {
		fuse_session_exit(ns_device->session);
	}
	pthread_join(ns_device->tid, NULL);
	TAILQ_REMOVE(&ctrlr_device->ns_devices, ns_device, tailq);
	/* ns_device will be freed by cuse_thread */
	if (ns_device->session != NULL) {
		cuse_lowlevel_teardown(ns_device->session);
		ns_device->force_exit = true;
		fuse_session_exit(ns_device->session);
	}
	free(ns_device);
}

static int
@@ -1075,18 +1149,16 @@ cuse_nvme_ctrlr_stop(struct cuse_device *ctrlr_device)

	assert(TAILQ_EMPTY(&ctrlr_device->ns_devices));

	fuse_session_exit(ctrlr_device->session);
	pthread_join(ctrlr_device->tid, NULL);
	TAILQ_REMOVE(&g_ctrlr_ctx_head, ctrlr_device, tailq);
	spdk_bit_array_clear(g_ctrlr_started, ctrlr_device->index);
	if (spdk_bit_array_count_set(g_ctrlr_started) == 0) {
		spdk_bit_array_free(&g_ctrlr_started);
	}
	nvme_cuse_unclaim(ctrlr_device);
	if (ctrlr_device->session != NULL) {
		cuse_lowlevel_teardown(ctrlr_device->session);
	}
	free(ctrlr_device);

	TAILQ_REMOVE(&g_ctrlr_ctx_head, ctrlr_device, tailq);
	/* ctrlr_device will be freed by cuse_thread */
	ctrlr_device->force_exit = true;
	fuse_session_exit(ctrlr_device->session);
}

static int
@@ -1187,13 +1259,6 @@ nvme_cuse_start(struct spdk_nvme_ctrlr *ctrlr)
		goto clear_and_free;
	}

	rv = pthread_create(&ctrlr_device->tid, NULL, cuse_thread, ctrlr_device);
	if (rv != 0) {
		SPDK_ERRLOG("pthread_create failed\n");
		rv = -rv;
		goto clear_and_free;
	}

	TAILQ_INSERT_TAIL(&g_ctrlr_ctx_head, ctrlr_device, tailq);

	TAILQ_INIT(&ctrlr_device->ns_devices);
@@ -1298,6 +1363,54 @@ static struct nvme_io_msg_producer cuse_nvme_io_msg_producer = {
	.update = nvme_cuse_update,
};

static int
start_cuse_thread(void)
{
	int rc = 0;
	pthread_t tid;

	rc = spdk_fd_group_create(&g_device_fdgrp);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to create fd group: (%s).\n", spdk_strerror(-rc));
		return rc;
	}

	g_cuse_thread_msg_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
	if (g_cuse_thread_msg_fd < 0) {
		SPDK_ERRLOG("Failed to create eventfd: (%s).\n", spdk_strerror(errno));
		rc = -errno;
		goto destroy_fd_group;
	}

	rc = SPDK_FD_GROUP_ADD(g_device_fdgrp, g_cuse_thread_msg_fd,
			       cuse_thread_add_session, NULL);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to add fd %d: %s.\n", g_cuse_thread_msg_fd,
			    spdk_strerror(-rc));
		goto close_and_destroy_fd;
	}

	rc = pthread_create(&tid, NULL, cuse_thread, NULL);
	if (rc != 0) {
		SPDK_ERRLOG("pthread_create failed\n");
		rc = -rc;
		goto remove_close_and_destroy_fd;
	}
	pthread_detach(tid);
	pthread_setname_np(tid, "cuse_thread");
	SPDK_NOTICELOG("Successfully started cuse thread to poll for admin commands\n");
	return rc;

remove_close_and_destroy_fd:
	spdk_fd_group_remove(g_device_fdgrp, g_cuse_thread_msg_fd);
close_and_destroy_fd:
	close(g_cuse_thread_msg_fd);
destroy_fd_group:
	spdk_fd_group_destroy(g_device_fdgrp);
	g_device_fdgrp = NULL;
	return rc;
}

int
spdk_nvme_cuse_register(struct spdk_nvme_ctrlr *ctrlr)
{
@@ -1315,6 +1428,15 @@ spdk_nvme_cuse_register(struct spdk_nvme_ctrlr *ctrlr)

	pthread_mutex_lock(&g_cuse_mtx);

	if (g_device_fdgrp == NULL) {
		rc = start_cuse_thread();
		if (rc < 0) {
			SPDK_ERRLOG("Failed to start cuse thread to poll for admin commands\n");
			pthread_mutex_unlock(&g_cuse_mtx);
			return rc;
		}
	}

	rc = nvme_cuse_start(ctrlr);
	if (rc) {
		nvme_io_msg_ctrlr_unregister(ctrlr, &cuse_nvme_io_msg_producer);
+1 −1
Original line number Diff line number Diff line
@@ -138,7 +138,7 @@ test_cuse_update(void)
	int rc;
	struct spdk_nvme_ctrlr	ctrlr = {};

	rc = nvme_cuse_start(&ctrlr);
	rc = spdk_nvme_cuse_register(&ctrlr);
	CU_ASSERT(rc == 0);

	g_active_num_ns = 4;
+53 −21
Original line number Diff line number Diff line
@@ -53,11 +53,58 @@ DEFINE_STUB(spdk_nvme_ctrlr_is_active_ns, bool,
	    (struct spdk_nvme_ctrlr *ctrlr, uint32_t nsid), true);

DEFINE_STUB(fuse_reply_err, int, (fuse_req_t req, int err), 0);
DEFINE_STUB_V(fuse_session_exit, (struct fuse_session *se));
DEFINE_STUB(pthread_join, int, (pthread_t tid, void **val), 0);

DEFINE_STUB_V(nvme_ctrlr_update_namespaces, (struct spdk_nvme_ctrlr *ctrlr));

DEFINE_STUB_V(fuse_session_reset, (struct fuse_session *session));

struct fuse_session {
	int exited;
	int fd;
};

int
fuse_session_fd(struct fuse_session *session)
{
	return session->fd;
}

void
fuse_session_exit(struct fuse_session *session)
{
	session->exited = 1;
}

int
fuse_session_exited(struct fuse_session *session)
{
	return session->exited;
}

struct fuse_session *
cuse_lowlevel_setup(int argc, char *argv[], const struct cuse_info *ci,
		    const struct cuse_lowlevel_ops *clop, int *multithreaded, void *userdata)
{
	struct fuse_session *fuse_session = calloc(1, sizeof(struct fuse_session));
	if (fuse_session == NULL) {
		return NULL;
	}
	fuse_session->fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
	if (fuse_session->fd < 0) {
		free(fuse_session);
		return NULL;
	}
	return fuse_session;
}

void
cuse_lowlevel_teardown(struct fuse_session	*fuse_session)
{
	close(fuse_session->fd);
	free(fuse_session);
}

static int
nvme_ns_cmp(struct spdk_nvme_ns *ns1, struct spdk_nvme_ns *ns2)
{
@@ -496,33 +543,18 @@ test_cuse_nvme_reset(void)
static void
test_nvme_cuse_stop(void)
{
	int rc;
	struct spdk_nvme_ctrlr ctrlr = {};
	struct cuse_device *ctrlr_device = NULL;
	struct cuse_device *ns_dev1, *ns_dev2;
	ctrlr.cdata.nn = 2;

	/* Allocate memory for nvme_cuse_stop() to free. */
	ctrlr_device = calloc(1, sizeof(struct cuse_device));
	SPDK_CU_ASSERT_FATAL(ctrlr_device != NULL);

	TAILQ_INIT(&ctrlr_device->ns_devices);
	ns_dev1 = calloc(1, sizeof(struct cuse_device));
	SPDK_CU_ASSERT_FATAL(ns_dev1 != NULL);
	ns_dev2 = calloc(1, sizeof(struct cuse_device));
	SPDK_CU_ASSERT_FATAL(ns_dev2 != NULL);

	g_ctrlr_started = spdk_bit_array_create(128);
	SPDK_CU_ASSERT_FATAL(g_ctrlr_started != NULL);

	TAILQ_INSERT_TAIL(&ctrlr_device->ns_devices, ns_dev1, tailq);
	TAILQ_INSERT_TAIL(&ctrlr_device->ns_devices, ns_dev2, tailq);
	ctrlr.cdata.nn = 2;
	ctrlr_device->ctrlr = &ctrlr;
	pthread_mutex_init(&g_cuse_mtx, NULL);
	TAILQ_INSERT_TAIL(&g_ctrlr_ctx_head, ctrlr_device, tailq);
	rc = spdk_nvme_cuse_register(&ctrlr);
	CU_ASSERT(rc == 0);

	nvme_cuse_stop(&ctrlr);
	CU_ASSERT(g_ctrlr_started == NULL);
	CU_ASSERT(TAILQ_EMPTY(&g_ctrlr_ctx_head));
	while (g_device_fdgrp != NULL);
}

static void