Commit dfae1900 authored by Yifan Bian's avatar Yifan Bian Committed by Jim Harris
Browse files

ublk: enable UBLK_F_NEED_GET_DATA



Use UBLK_F_NEED_GET_DATA to allow our ublk server to
allocate buffers on demand rather than pre-allocation.
This allows us to generally use a global mempool to
satisfy all of the buffer allocations across all
devices.

Upcoming patches will actually switch this global
mempool to iobuf instead.  Using iobufs will
require more surgery to ublk, so using the global
mempool as a stopgap will help with breaking up
the series for easier reviews.

Change-Id: I2bbd5b47b535c3039f741bde48a5d83dd2818cf3
Signed-off-by: default avatarYifan Bian <yifan.bian@intel.com>
Signed-off-by: default avatarJim Harris <james.r.harris@intel.com>
Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/16692


Reviewed-by: default avatarKonrad Sztyber <konrad.sztyber@intel.com>
Community-CI: Mellanox Build Bot
Tested-by: default avatarSPDK CI Jenkins <sys_sgci@intel.com>
Reviewed-by: default avatarXiaodong Liu <xiaodong.liu@intel.com>
parent c54a7a27
Loading
Loading
Loading
Loading
+59 −28
Original line number Diff line number Diff line
@@ -40,6 +40,7 @@
static uint32_t g_num_ublk_threads = 0;
static uint32_t g_queue_thread_id = 0;
static struct spdk_cpuset g_core_mask;
struct spdk_mempool *g_io_buf_pool;

struct ublk_queue;
struct ublk_thread_ctx;
@@ -66,6 +67,7 @@ __attribute__((unused)) = {
struct ublk_io {
	void			*payload;
	void			*mpool_entry;
	bool			need_data;
	uint32_t		payload_size;
	uint32_t		cmd_op;
	int32_t			result;
@@ -106,8 +108,6 @@ struct spdk_ublk_dev {
	uint32_t		ublk_id;
	uint32_t		num_queues;
	uint32_t		queue_depth;

	struct spdk_mempool	*io_buf_pool;
	struct ublk_queue	queues[UBLK_DEV_MAX_QUEUES];

	struct spdk_poller	*retry_poller;
@@ -430,8 +430,20 @@ ublk_create_target(const char *cpumask_str)
		return rc;
	}

	/* Add 4096 to account for alignment. */
	g_io_buf_pool = spdk_mempool_create("ublk_io_pool",
					    UBLK_DEV_MAX_QUEUE_DEPTH * 5,
					    UBLK_IO_MAX_BYTES + 4096,
					    0,
					    SPDK_ENV_SOCKET_ID_ANY);
	if (!g_io_buf_pool) {
		SPDK_ERRLOG("could not allocate ublk_io_buf pool\n");
		return -ENOMEM;
	}

	rc = ublk_open();
	if (rc != 0) {
		spdk_mempool_free(g_io_buf_pool);
		SPDK_ERRLOG("Fail to open UBLK, error=%s\n", spdk_strerror(-rc));
		return rc;
	}
@@ -461,6 +473,8 @@ _ublk_fini_done(void *args)
	g_queue_thread_id = 0;
	g_ublk_tgt.is_destroying = false;
	g_ublk_tgt.active = false;
	spdk_mempool_free(g_io_buf_pool);
	g_io_buf_pool = NULL;
	if (g_ublk_tgt.cb_fn) {
		g_ublk_tgt.cb_fn(g_ublk_tgt.cb_arg);
		g_ublk_tgt.cb_fn = NULL;
@@ -808,7 +822,7 @@ ublk_io_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	struct ublk_io	*io = cb_arg;
	struct ublk_queue *q = io->q;
	int res;
	int res, tag;

	if (success) {
		res = io->result;
@@ -817,9 +831,11 @@ ublk_io_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
	}

	ublk_mark_io_done(io, res);
	tag = (int)(io - q->ios);
	q->ios[tag].need_data = false;

	SPDK_DEBUGLOG(ublk_io, "(qid %d tag %d res %d)\n",
		      q->q_id, (int)(io - q->ios), res);
		      q->q_id, tag, res);
	TAILQ_REMOVE(&q->inflight_io_list, io, tailq);
	TAILQ_INSERT_TAIL(&q->completed_io_list, io, tailq);

@@ -860,7 +876,7 @@ ublk_io_get_buffer(struct ublk_io *io)
{
	void *buf;

	buf = spdk_mempool_get(io->q->dev->io_buf_pool);
	buf = spdk_mempool_get(g_io_buf_pool);
	io->mpool_entry = buf;
	io->payload = (void *)(uintptr_t)SPDK_ALIGN_CEIL((uintptr_t)buf, 4096ULL);
}
@@ -869,7 +885,7 @@ static void
ublk_io_put_buffer(struct ublk_io *io)
{
	if (io->payload) {
		spdk_mempool_put(io->q->dev->io_buf_pool, io->mpool_entry);
		spdk_mempool_put(g_io_buf_pool, io->mpool_entry);
		io->mpool_entry = NULL;
		io->payload = NULL;
	}
@@ -899,7 +915,8 @@ ublk_submit_bdev_io(struct ublk_queue *q, uint16_t tag)
	io->result = num_blocks * spdk_bdev_get_data_block_size(ublk->bdev);
	switch (ublk_op) {
	case UBLK_IO_OP_READ:
		rc = spdk_bdev_read_blocks(desc, ch, payload, offset_blocks, num_blocks, ublk_io_done, io);
		ublk_io_get_buffer(io);
		rc = spdk_bdev_read_blocks(desc, ch, io->payload, offset_blocks, num_blocks, ublk_io_done, io);
		break;
	case UBLK_IO_OP_WRITE:
		rc = spdk_bdev_write_blocks(desc, ch, payload, offset_blocks, num_blocks, ublk_io_done, io);
@@ -976,6 +993,7 @@ ublksrv_queue_io_cmd(struct ublk_queue *q,
static int
ublk_io_xmit(struct ublk_queue *q)
{
	TAILQ_HEAD(, ublk_io) buffer_free_list;
	int rc = 0, count = 0, tag;
	struct ublk_io *io;

@@ -983,6 +1001,7 @@ ublk_io_xmit(struct ublk_queue *q)
		return 0;
	}

	TAILQ_INIT(&buffer_free_list);
	while (!TAILQ_EMPTY(&q->completed_io_list)) {
		io = TAILQ_FIRST(&q->completed_io_list);
		tag = io - io->q->ios;
@@ -993,6 +1012,9 @@ ublk_io_xmit(struct ublk_queue *q)
		 * taken to work around a scan-build use-after-free mischaracterization.
		 */
		TAILQ_REMOVE(&q->completed_io_list, io, tailq);
		if (!io->need_data) {
			TAILQ_INSERT_TAIL(&buffer_free_list, io, tailq);
		}
		ublksrv_queue_io_cmd(q, io, tag);
		count++;
	}
@@ -1003,6 +1025,20 @@ ublk_io_xmit(struct ublk_queue *q)
		assert(false);
	}

	/* Note: for READ io, ublk will always copy the data out of
	 * the buffers in the io_uring_submit context.  Since we
	 * are not using SQPOLL for IO rings, we can safely free
	 * those IO buffers here.  This design doesn't seem ideal,
	 * but it's what's possible since there is no discrete
	 * COMMIT_REQ operation.  That will need to change in the
	 * future should we ever want to support async copy
	 * operations.
	 */
	while (!TAILQ_EMPTY(&buffer_free_list)) {
		io = TAILQ_FIRST(&buffer_free_list);
		TAILQ_REMOVE(&buffer_free_list, io, tailq);
		ublk_io_put_buffer(io);
	}
	return rc;
}

@@ -1042,10 +1078,11 @@ ublk_io_recv(struct ublk_queue *q)
		if (cqe->res == UBLK_IO_RES_OK) {
			ublk_submit_bdev_io(q, tag);
		} else if (cqe->res == UBLK_IO_RES_NEED_GET_DATA) {
			ublk_io_get_buffer(io);
			io->need_data = true;
			ublk_mark_io_get_data(io);
			TAILQ_REMOVE(&q->inflight_io_list, io, tailq);
			TAILQ_INSERT_TAIL(&q->completed_io_list, io, tailq);

		} else {
			if (cqe->res != UBLK_IO_RES_ABORT) {
				SPDK_ERRLOG("ublk received error io: res %d qid %d tag %u cmd_op %u\n",
@@ -1191,16 +1228,28 @@ ublk_dev_queue_fini(struct ublk_queue *q)
static void
ublk_dev_queue_io_init(struct ublk_queue *q)
{
	struct ublk_io *io;
	uint32_t i;
	int rc __attribute__((unused));

	/* submit all io commands to ublk driver */
	for (i = 0; i < q->q_depth; i++) {
		ublksrv_queue_io_cmd(q, &q->ios[i], i);
		io = &q->ios[i];
		/* Some older kernels require a buffer to get posted, even
		 * when NEED_GET_DATA has been specified.  So allocate the
		 * buffers temporarily, then free them after the io_uring_submit.
		 */
		ublk_io_get_buffer(io);
		ublksrv_queue_io_cmd(q, io, i);
	}

	rc = io_uring_submit(&q->ring);
	assert(rc == (int)q->q_depth);
	for (i = 0; i < q->q_depth; i++) {
		io = &q->ios[i];
		assert(io->payload != NULL);
		ublk_io_put_buffer(io);
	}
}

static void
@@ -1238,7 +1287,7 @@ ublk_info_param_init(struct spdk_ublk_dev *ublk)
		.dev_id = ublk->ublk_id,
		.max_io_buf_bytes = UBLK_IO_MAX_BYTES,
		.ublksrv_pid = getpid(),
		.flags = UBLK_F_URING_CMD_COMP_IN_TASK,
		.flags = UBLK_F_NEED_GET_DATA | UBLK_F_URING_CMD_COMP_IN_TASK,
	};
	struct ublk_params uparams = {
		.types = UBLK_PARAM_TYPE_BASIC,
@@ -1287,32 +1336,15 @@ ublk_ios_fini(struct spdk_ublk_dev *ublk)
		free(q->ios);
		q->ios = NULL;
	}

	spdk_mempool_free(ublk->io_buf_pool);
}

static int
ublk_ios_init(struct spdk_ublk_dev *ublk)
{
	char mempool_name[32];
	int rc;
	uint32_t i, j;
	struct ublk_queue *q;

	snprintf(mempool_name, sizeof(mempool_name), "ublk_io_buf_pool_%d", ublk->ublk_id);

	/* Create a mempool to allocate buf for each io */
	ublk->io_buf_pool = spdk_mempool_create(mempool_name,
						ublk->num_queues * ublk->queue_depth,
						UBLK_IO_MAX_BYTES + 4096,
						SPDK_MEMPOOL_DEFAULT_CACHE_SIZE,
						SPDK_ENV_SOCKET_ID_ANY);
	if (ublk->io_buf_pool == NULL) {
		rc = -ENOMEM;
		SPDK_ERRLOG("could not allocate ublk_io_buf pool\n");
		return rc;
	}

	for (i = 0; i < ublk->num_queues; i++) {
		q = &ublk->queues[i];

@@ -1329,7 +1361,6 @@ ublk_ios_init(struct spdk_ublk_dev *ublk)
		}
		for (j = 0; j < q->q_depth; j++) {
			q->ios[j].q = q;
			ublk_io_get_buffer(&q->ios[j]);
		}
	}