Commit 9b518c79 authored by Changpeng Liu's avatar Changpeng Liu Committed by Jim Harris
Browse files

ublk: optimize no sqe in ctrl uring case



Kernel `ublk_drv` driver can support up to 64 block devices by
default, we can read this parameter when starting ublk target,
then we can add this check in SPDK and use it as number of
control ring entries, because for each ublk device, the control
command is executed one by one, then we don't need to worry
about no sqe case for ctrl uring.

Change-Id: Id9334c99abac3424192a769adc8568c4b6827049
Signed-off-by: default avatarChangpeng Liu <changpeng.liu@intel.com>
Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/18071


Community-CI: Mellanox Build Bot
Reviewed-by: default avatarJim Harris <james.r.harris@intel.com>
Tested-by: default avatarSPDK CI Jenkins <sys_sgci@intel.com>
Reviewed-by: default avatarKonrad Sztyber <konrad.sztyber@intel.com>
parent 93573400
Loading
Loading
Loading
Loading
+60 −32
Original line number Diff line number Diff line
@@ -25,7 +25,6 @@
#define UBLK_BLK_CDEV					"/dev/ublkc"

#define LINUX_SECTOR_SHIFT				9
#define UBLK_CTRL_RING_DEPTH		32
#define UBLK_POLL_GROUP_MAX				128
#define UBLK_IO_MAX_BYTES				SPDK_BDEV_LARGE_BUF_MAX_SIZE
#define UBLK_DEV_MAX_QUEUES				32
@@ -34,6 +33,8 @@
#define UBLK_STOP_BUSY_WAITING_MS			10000
#define UBLK_BUSY_POLLING_INTERVAL_US			20000
#define UBLK_DEFAULT_CTRL_URING_POLLING_INTERVAL_US	1000
/* By default, kernel ublk_drv driver can support up to 64 block devices */
#define UBLK_DEFAULT_MAX_SUPPORTED_DEVS			64

#define UBLK_IOBUF_SMALL_CACHE_SIZE			128
#define UBLK_IOBUF_LARGE_CACHE_SIZE			32
@@ -43,6 +44,7 @@

static uint32_t g_num_ublk_poll_groups = 0;
static uint32_t g_next_ublk_poll_group = 0;
static uint32_t g_ublks_max = UBLK_DEFAULT_MAX_SUPPORTED_DEVS;
static struct spdk_cpuset g_core_mask;

struct ublk_queue;
@@ -152,7 +154,7 @@ struct ublk_tgt {
	struct spdk_poller	*ctrl_poller;
	uint32_t		ctrl_ops_in_progress;
	struct ublk_poll_group	poll_group[UBLK_POLL_GROUP_MAX];
	TAILQ_HEAD(, spdk_ublk_dev)	ctrl_wait_tailq;
	uint32_t		num_ublk_devs;
};

static TAILQ_HEAD(, spdk_ublk_dev) g_ublk_devs = TAILQ_HEAD_INITIALIZER(g_ublk_devs);
@@ -246,13 +248,6 @@ ublk_ctrl_poller(void *arg)
			ublk->next_state_fn(ublk);
		}
		io_uring_cqe_seen(ring, cqe);
		ublk = TAILQ_FIRST(&g_ublk_tgt.ctrl_wait_tailq);
		if (ublk != NULL) {
			TAILQ_REMOVE(&g_ublk_tgt.ctrl_wait_tailq, ublk, wait_tailq);
			UBLK_DEBUGLOG(ublk, "resubmit ctrl cmd\n");
			rc = ublk_ctrl_cmd(ublk, ublk->ctrl_cmd_op);
			assert(rc == 0);
		}
		count++;
	}

@@ -272,13 +267,11 @@ ublk_ctrl_cmd(struct spdk_ublk_dev *ublk, uint32_t cmd_op)
	ublk->ctrl_cmd_op = cmd_op;
	sqe = io_uring_get_sqe(&g_ublk_tgt.ctrl_ring);
	if (!sqe) {
		/* The cmd_op was saved in the ublk, to ensure we use the
		 * correct cmd_op when it later gets resubmitted.
		 */
		UBLK_DEBUGLOG(ublk, "queue ctrl cmd\n");
		TAILQ_INSERT_TAIL(&g_ublk_tgt.ctrl_wait_tailq, ublk, wait_tailq);
		return 0;
		SPDK_ERRLOG("No available sqe in ctrl ring\n");
		assert(false);
		return -ENOENT;
	}

	cmd = (struct ublksrv_ctrl_cmd *)ublk_get_sqe_cmd(sqe);
	sqe->fd = g_ublk_tgt.ctrl_fd;
	sqe->opcode = IORING_OP_URING_CMD;
@@ -335,10 +328,31 @@ ublk_queue_cmd_buf_sz(uint32_t q_depth)
	return (size + page_sz - 1) & ~(page_sz - 1);
}

static int
ublk_get_max_support_devs(void)
{
	FILE *file;
	char str[128];

	file = fopen("/sys/module/ublk_drv/parameters/ublks_max", "r");
	if (!file) {
		return -ENOENT;
	}

	if (!fgets(str, sizeof(str), file)) {
		fclose(file);
		return -EINVAL;
	}
	fclose(file);

	spdk_str_chomp(str);
	return spdk_strtol(str, 10);
}

static int
ublk_open(void)
{
	int rc;
	int rc, ublks_max;

	g_ublk_tgt.ctrl_fd = open(UBLK_CTRL_DEV, O_RDWR);
	if (g_ublk_tgt.ctrl_fd < 0) {
@@ -347,10 +361,17 @@ ublk_open(void)
		return -rc;
	}

	ublks_max = ublk_get_max_support_devs();
	if (ublks_max > 0) {
		g_ublks_max = ublks_max;
	}

	/* We need to set SQPOLL for kernels 6.1 and earlier, since they would not defer ublk ctrl
	 * ring processing to a workqueue.  Ctrl ring processing is minimal, so SQPOLL is fine.
	 * All the commands sent via control uring for a ublk device is executed one by one, so use
	 * ublks_max * 2 as the number of uring entries is enough.
	 */
	rc = ublk_setup_ring(UBLK_CTRL_RING_DEPTH, &g_ublk_tgt.ctrl_ring,
	rc = ublk_setup_ring(g_ublks_max * 2, &g_ublk_tgt.ctrl_ring,
			     IORING_SETUP_SQE128 | IORING_SETUP_SQPOLL);
	if (rc < 0) {
		SPDK_ERRLOG("UBLK ctrl queue_init: %s\n", spdk_strerror(-rc));
@@ -430,8 +451,6 @@ ublk_create_target(const char *cpumask_str)
		return -EBUSY;
	}

	TAILQ_INIT(&g_ublk_tgt.ctrl_wait_tailq);

	rc = ublk_parse_core_mask(cpumask_str);
	if (rc != 0) {
		return rc;
@@ -672,6 +691,7 @@ ublk_dev_list_register(struct spdk_ublk_dev *ublk)

	UBLK_DEBUGLOG(ublk, "add to tailq\n");
	TAILQ_INSERT_TAIL(&g_ublk_devs, ublk, tailq);
	g_ublk_tgt.num_ublk_devs++;

	return 0;
}
@@ -687,6 +707,8 @@ ublk_dev_list_unregister(struct spdk_ublk_dev *ublk)
	if (ublk_dev_find_by_id(ublk->ublk_id)) {
		UBLK_DEBUGLOG(ublk, "remove from tailq\n");
		TAILQ_REMOVE(&g_ublk_devs, ublk, tailq);
		assert(g_ublk_tgt.num_ublk_devs);
		g_ublk_tgt.num_ublk_devs--;
		return;
	}

@@ -1502,6 +1524,12 @@ ublk_start_disk(const char *bdev_name, uint32_t ublk_id,
		SPDK_DEBUGLOG(ublk, "ublk id %d is in use.\n", ublk_id);
		return -EBUSY;
	}

	if (g_ublk_tgt.num_ublk_devs >= g_ublks_max) {
		SPDK_DEBUGLOG(ublk, "Reached maximum number of supported devices: %u\n", g_ublks_max);
		return -ENOTSUP;
	}

	ublk = calloc(1, sizeof(*ublk));
	if (ublk == NULL) {
		return -ENOMEM;