Commit 1b6ed1ff authored by Jim Harris's avatar Jim Harris
Browse files

ublk: remove extra pthreads from ctrl uring processing



We make a few changes here to enable this:

1) Set IORING_SETUP_SQPOLL on the control ring.
   Otherwise when UBLK_START_DEV is submitted it
   will be processed in the context of the system
   call itself, resulting the kernel block layer
   submitting reads to the new device which blocks
   the thread - meaning the system call may never
   return.
2) Save the cmd_op in each sqe, along with the
   spdk_ublk_dev pointer.
3) Add a poller to poll the ctrl ring.  The poller
   can get the ublk and cmd_op from the cqe to
   know which ctrl operation completed and take
   next steps as necessary.

Signed-off-by: default avatarJim Harris <james.r.harris@intel.com>
Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/16404

 (master)

(cherry picked from commit 57c27f02)
Change-Id: Ia0e51a4ff74781c85967c54969fbfc67a0d3f115
Signed-off-by: default avatarKrzysztof Karas <krzysztof.karas@intel.com>
Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/16478


Reviewed-by: default avatarJim Harris <james.r.harris@intel.com>
Reviewed-by: default avatarTomasz Zawadzki <tomasz.zawadzki@intel.com>
Tested-by: default avatarSPDK CI Jenkins <sys_sgci@intel.com>
parent 3d302700
Loading
Loading
Loading
Loading
+86 −87
Original line number Diff line number Diff line
@@ -43,7 +43,9 @@ struct ublk_thread_ctx;
static void ublk_submit_bdev_io(struct ublk_queue *q, uint16_t tag);
static void ublk_dev_queue_fini(struct ublk_queue *q);
static int ublk_poll(void *arg);
static int finish_start(struct spdk_ublk_dev *ublk);
static void ublk_set_params(struct spdk_ublk_dev *ublk);
static void ublk_finish_start(struct spdk_ublk_dev *ublk);
static void ublk_delete_dev(struct spdk_ublk_dev *ublk);

struct ublk_io {
	void			*payload;
@@ -91,14 +93,14 @@ struct spdk_ublk_dev {
	struct spdk_mempool	*io_buf_pool;
	struct ublk_queue	queues[UBLK_DEV_MAX_QUEUES];

	/* Synchronize ublk_start_kernel pthread and ublk_stop */
	volatile int8_t		pending_ublk_pthread;
	struct spdk_poller	*retry_poller;
	int			retry_count;
	uint32_t		q_deinit_num;
	volatile bool		is_closing;
	ublk_del_cb		del_cb;
	void			*cb_arg;
	uint32_t		last_cmd_op;
	uint32_t		ctrl_ops_in_progress;

	TAILQ_ENTRY(spdk_ublk_dev) tailq;
};
@@ -116,6 +118,8 @@ struct ublk_tgt {
	spdk_ublk_fini_cb	cb_fn;
	void			*cb_arg;
	struct io_uring		ctrl_ring;
	struct spdk_poller	*ctrl_poller;
	uint32_t		ctrl_ops_in_progress;
	struct ublk_thread_ctx	thread_ctx[UBLK_THREAD_MAX];
};

@@ -186,13 +190,54 @@ spdk_ublk_init(void)
	}
}

static int
ublk_ctrl_poller(void *arg)
{
	struct io_uring *ring = &g_ublk_tgt.ctrl_ring;
	struct spdk_ublk_dev *ublk;
	struct io_uring_cqe *cqe;
	const int max = 8;
	int i, count = 0, rc;

	for (i = 0; i < max; i++) {
		rc = io_uring_peek_cqe(ring, &cqe);
		if (rc == -EAGAIN) {
			break;
		}

		assert(cqe != NULL);
		g_ublk_tgt.ctrl_ops_in_progress--;
		if (g_ublk_tgt.ctrl_ops_in_progress == 0) {
			spdk_poller_unregister(&g_ublk_tgt.ctrl_poller);
		}
		ublk = (struct spdk_ublk_dev *)cqe->user_data;
		ublk->ctrl_ops_in_progress--;
		switch (ublk->last_cmd_op) {
		case UBLK_CMD_ADD_DEV:
			ublk_set_params(ublk);
			break;
		case UBLK_CMD_SET_PARAMS:
			ublk_finish_start(ublk);
			break;
		case UBLK_CMD_DEL_DEV:
			ublk_delete_dev(ublk);
			break;
		default:
			break;
		}
		io_uring_cqe_seen(ring, cqe);
		count++;
	}

	return count > 0 ? SPDK_POLLER_BUSY : SPDK_POLLER_IDLE;
}

static int
ublk_ctrl_cmd(struct spdk_ublk_dev *ublk, uint32_t cmd_op, void *args)
{
	uint32_t dev_id = ublk->ublk_id;
	int rc = -EINVAL;
	struct io_uring_sqe *sqe;
	struct io_uring_cqe *cqe;
	struct ublksrv_ctrl_cmd *cmd;
	int *ublk_pid;
	struct ublksrv_ctrl_dev_info *dev_info;
@@ -209,6 +254,7 @@ ublk_ctrl_cmd(struct spdk_ublk_dev *ublk, uint32_t cmd_op, void *args)
	sqe->ioprio = 0;
	cmd->dev_id = dev_id;
	cmd->queue_id = -1;
	ublk->last_cmd_op = cmd_op;

	switch (cmd_op) {
	case UBLK_CMD_START_DEV:
@@ -234,22 +280,21 @@ ublk_ctrl_cmd(struct spdk_ublk_dev *ublk, uint32_t cmd_op, void *args)
		return -EINVAL;
	}
	ublk_set_sqe_cmd_op(sqe, cmd_op);
	io_uring_sqe_set_data(sqe, cmd);
	io_uring_sqe_set_data(sqe, ublk);

	rc = io_uring_submit(&g_ublk_tgt.ctrl_ring);
	if (rc < 0) {
		SPDK_ERRLOG("uring submit rc %d\n", rc);
		return rc;
	if (g_ublk_tgt.ctrl_ops_in_progress == 0) {
		g_ublk_tgt.ctrl_poller = SPDK_POLLER_REGISTER(ublk_ctrl_poller, NULL, 10);
	}

	rc = io_uring_wait_cqe(&g_ublk_tgt.ctrl_ring, &cqe);
	rc = io_uring_submit(&g_ublk_tgt.ctrl_ring);
	if (rc < 0) {
		SPDK_ERRLOG("wait cqe: %s\n", strerror(-rc));
		SPDK_ERRLOG("uring submit rc %d\n", rc);
		return rc;
	}
	io_uring_cqe_seen(&g_ublk_tgt.ctrl_ring, cqe);
	g_ublk_tgt.ctrl_ops_in_progress++;
	ublk->ctrl_ops_in_progress++;

	return cqe->res;
	return 0;
}

static int
@@ -274,7 +319,8 @@ ublk_open(void)
		return -rc;
	}

	rc = ublk_setup_ring(UBLK_CTRL_RING_DEPTH, &g_ublk_tgt.ctrl_ring, IORING_SETUP_SQE128);
	rc = ublk_setup_ring(UBLK_CTRL_RING_DEPTH, &g_ublk_tgt.ctrl_ring,
			     IORING_SETUP_SQE128 | IORING_SETUP_SQPOLL);
	if (rc < 0) {
		SPDK_ERRLOG("UBLK ctrl queue_init: %s\n", spdk_strerror(-rc));
		close(g_ublk_tgt.ctrl_fd);
@@ -403,84 +449,33 @@ ublk_thread_exit(void *args)
	}
}

static void *
_ublk_start_kernel(void *arg)
static int
ublk_start_kernel(void *arg)
{
	struct spdk_ublk_dev *ublk = arg;
	int rc;

	SPDK_DEBUGLOG(ublk, "Enter start pthread for ublk %d\n", ublk->ublk_id);
	assert(ublk->dev_info.ublksrv_pid == getpid());
	rc = ublk_ctrl_cmd(ublk, UBLK_CMD_START_DEV, &ublk->dev_info.ublksrv_pid);
	if (rc < 0) {
		SPDK_ERRLOG("start dev %d failed, rc %s\n", ublk->ublk_id,
			    spdk_strerror(-rc));
	}
	__atomic_fetch_sub(&ublk->pending_ublk_pthread, 1, __ATOMIC_RELAXED);
	SPDK_DEBUGLOG(ublk, "Exit start pthread for ublk %d\n", ublk->ublk_id);

	pthread_exit(NULL);
	return rc;
}

static int
ublk_start_kernel(struct spdk_ublk_dev *ublk)
{
	pthread_t tid;
	int rc;

	__atomic_fetch_add(&ublk->pending_ublk_pthread, 1, __ATOMIC_RELAXED);
	/*
	 * Commands UBLK_CMD_START_DEV will block the reactor while some io are
	 * transfered to SPDK for processing, which leads hanging. To avoid that,
	 * a pthread needs to be created. It's also the same scenario for UBLK_CMD_STOP_DEV.
	 */
	rc = pthread_create(&tid, NULL, _ublk_start_kernel, ublk);
	if (rc != 0) {
		__atomic_fetch_sub(&ublk->pending_ublk_pthread, 1, __ATOMIC_RELAXED);
		SPDK_ERRLOG("could not create thread: %s\n", spdk_strerror(rc));
		return -rc;
	}
	rc = pthread_detach(tid);
	assert(rc == 0);

	return 0;
}

static void *
_ublk_stop_kernel(void *arg)
ublk_stop_kernel(struct spdk_ublk_dev *ublk)
{
	struct spdk_ublk_dev *ublk = arg;
	int rc;

	SPDK_DEBUGLOG(ublk, "Enter stop pthread for ublk %d\n", ublk->ublk_id);

	rc = ublk_ctrl_cmd(ublk, UBLK_CMD_STOP_DEV, NULL);
	if (rc < 0) {
		SPDK_ERRLOG("stop dev %d failed\n", ublk->ublk_id);
	}
	__atomic_fetch_sub(&ublk->pending_ublk_pthread, 1, __ATOMIC_RELAXED);
	SPDK_DEBUGLOG(ublk, "Exit stop pthread for ublk %d\n", ublk->ublk_id);

	pthread_exit(NULL);
}

static int
ublk_stop_kernel(struct spdk_ublk_dev *ublk)
{
	pthread_t tid;
	int rc;

	__atomic_fetch_add(&ublk->pending_ublk_pthread, 1, __ATOMIC_RELAXED);
	rc = pthread_create(&tid, NULL, _ublk_stop_kernel, ublk);
	if (rc != 0) {
		__atomic_fetch_sub(&ublk->pending_ublk_pthread, 1, __ATOMIC_RELAXED);
		SPDK_ERRLOG("could not create thread: %s\n", spdk_strerror(rc));
		return -rc;
	}
	rc = pthread_detach(tid);
	assert(rc == 0);

	return 0;
	return rc;
}

static int
@@ -506,6 +501,7 @@ _ublk_fini(void *args)

	/* Check if all ublks closed */
	if (TAILQ_EMPTY(&g_ublk_bdevs)) {
		spdk_poller_unregister(&g_ublk_tgt.ctrl_poller);
		close(g_ublk_tgt.ctrl_ring.ring_fd);
		close(g_ublk_tgt.ctrl_fd);
		g_ublk_tgt.ctrl_ring.ring_fd = 0;
@@ -701,7 +697,11 @@ ublk_close_dev_done(void *arg)
	if (rc < 0) {
		SPDK_ERRLOG("delete dev %d failed\n", ublk->ublk_id);
	}
}

static void
ublk_delete_dev(struct spdk_ublk_dev *ublk)
{
	if (ublk->bdev_desc) {
		spdk_bdev_close(ublk->bdev_desc);
		ublk->bdev_desc = NULL;
@@ -722,8 +722,7 @@ _ublk_try_close_dev(void *arg)
	struct spdk_ublk_dev *ublk = arg;

	assert(spdk_get_thread() == ublk->app_thread);
	/* Continue the stop procedure after the exit of ublk_start_kernel pthread */
	if (__atomic_load_n(&ublk->pending_ublk_pthread, __ATOMIC_RELAXED) > 0) {
	if (ublk->ctrl_ops_in_progress > 0) {
		if (ublk->retry_poller == NULL) {
			ublk->retry_count = UBLK_STOP_BUSY_WAITING_MS * 1000ULL / UBLK_BUSY_POLLING_INTERVAL_US;
			ublk->retry_poller = SPDK_POLLER_REGISTER(_ublk_try_close_dev, ublk,
@@ -733,7 +732,7 @@ _ublk_try_close_dev(void *arg)
		if (ublk->retry_count-- > 0) {
			return SPDK_POLLER_BUSY;
		}
		SPDK_ERRLOG("Failed to wait for returning of ublk pthread.\n");
		SPDK_ERRLOG("Timeout on ctrl op completion.\n");
	}
	spdk_poller_unregister(&ublk->retry_poller);
	/* Update queue deinit number */
@@ -1184,20 +1183,22 @@ _ublk_start_disk(struct spdk_ublk_dev *ublk)
	rc = ublk_ctrl_cmd(ublk, UBLK_CMD_ADD_DEV, &ublk->dev_info);
	if (rc < 0) {
		SPDK_ERRLOG("UBLK can't add dev %d, rc %s\n", ublk->ublk_id, spdk_strerror(-rc));
		goto err;
	}

	return rc;
}

static void
ublk_set_params(struct spdk_ublk_dev *ublk)
{
	int rc;

	ublk->dev_params.len = sizeof(struct ublk_params);
	rc = ublk_ctrl_cmd(ublk, UBLK_CMD_SET_PARAMS, &ublk->dev_params);
	if (rc < 0) {
		SPDK_ERRLOG("UBLK can't set params for dev %d, rc %s\n", ublk->ublk_id, spdk_strerror(-rc));
		goto err;
		_ublk_try_close_dev(ublk);
	}

	return 0;

err:
	return rc;
}

/* Set ublk device parameters based on bdev */
@@ -1386,15 +1387,15 @@ ublk_start_disk(const char *bdev_name, uint32_t ublk_id,
		goto err;
	}

	return finish_start(ublk);
	return 0;

err:
	_ublk_try_close_dev(ublk);
	return rc;
}

static int
finish_start(struct spdk_ublk_dev *ublk)
static void
ublk_finish_start(struct spdk_ublk_dev *ublk)
{
	int			rc;
	uint32_t		q_id, i;
@@ -1419,8 +1420,7 @@ finish_start(struct spdk_ublk_dev *ublk)
		}
	}

	rc = ublk_start_kernel(ublk);
	if (rc != 0) {
	if (ublk_start_kernel(ublk) != 0) {
		goto err;
	}

@@ -1435,11 +1435,10 @@ finish_start(struct spdk_ublk_dev *ublk)
		}
	}

	return 0;
	return;

err:
	_ublk_try_close_dev(ublk);
	return rc;
}

SPDK_LOG_REGISTER_COMPONENT(ublk)