Commit e12c82ed authored by Jim Harris's avatar Jim Harris
Browse files

ublk: add initial iobuf support



The initial support has a few areas that will get
cleaned up in future patches.

* This patch will do same as before and always
  allocate a large buffer, even if the required
  size is a lot smaller.  Later patches will
  specify the size so that we can utilize the
  iobuf small buffers instead of always using large
  ones.
* iobuf channel failure doesn't result in an error
  being properly propagated back to the caller. But
  poller registration failure wasn't propagating
  errors back either, so at least for this patch the
  behavior is similar.  Upcoming patches will
  properly report errors in setting up a poll group.

Signed-off-by: default avatarJim Harris <james.r.harris@intel.com>
Change-Id: I32fbf1204f099f85851a8d747750be7b26bebc4e
Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/17919


Community-CI: Mellanox Build Bot
Tested-by: default avatarSPDK CI Jenkins <sys_sgci@intel.com>
Reviewed-by: default avatarChangpeng Liu <changpeng.liu@intel.com>
Reviewed-by: default avatarKonrad Sztyber <konrad.sztyber@intel.com>
parent 7f02ef10
Loading
Loading
Loading
Loading
+68 −30
Original line number Diff line number Diff line
@@ -34,13 +34,15 @@
#define UBLK_STOP_BUSY_WAITING_MS	10000
#define UBLK_BUSY_POLLING_INTERVAL_US	20000

#define UBLK_IOBUF_SMALL_CACHE_SIZE	128
#define UBLK_IOBUF_LARGE_CACHE_SIZE	32

#define UBLK_DEBUGLOG(ublk, format, ...) \
	SPDK_DEBUGLOG(ublk, "ublk%d: " format, ublk->ublk_id, ##__VA_ARGS__);

static uint32_t g_num_ublk_poll_groups = 0;
static uint32_t g_next_ublk_poll_group = 0;
static struct spdk_cpuset g_core_mask;
struct spdk_mempool *g_io_buf_pool;

struct ublk_queue;
struct ublk_poll_group;
@@ -82,6 +84,7 @@ struct ublk_io {
	struct ublk_queue	*q;
	/* for bdev io_wait */
	struct spdk_bdev_io_wait_entry bdev_io_wait;
	struct spdk_iobuf_entry	iobuf;

	TAILQ_ENTRY(ublk_io)	tailq;
};
@@ -136,6 +139,7 @@ struct spdk_ublk_dev {
struct ublk_poll_group {
	struct spdk_thread		*ublk_thread;
	struct spdk_poller		*ublk_poller;
	struct spdk_iobuf_channel	iobuf_ch;
	TAILQ_HEAD(, ublk_queue)	queue_list;
};

@@ -400,10 +404,16 @@ static void
ublk_poller_register(void *args)
{
	struct ublk_poll_group *poll_group = args;
	int rc;

	assert(spdk_get_thread() == poll_group->ublk_thread);
	TAILQ_INIT(&poll_group->queue_list);
	poll_group->ublk_poller = SPDK_POLLER_REGISTER(ublk_poll, poll_group, 0);
	rc = spdk_iobuf_channel_init(&poll_group->iobuf_ch, "ublk",
				     UBLK_IOBUF_SMALL_CACHE_SIZE, UBLK_IOBUF_LARGE_CACHE_SIZE);
	if (rc != 0) {
		assert(false);
	}
}

int
@@ -426,24 +436,14 @@ ublk_create_target(const char *cpumask_str)
		return rc;
	}

	/* Add 4096 to account for alignment. */
	g_io_buf_pool = spdk_mempool_create("ublk_io_pool",
					    UBLK_DEV_MAX_QUEUE_DEPTH * 5,
					    UBLK_IO_MAX_BYTES + 4096,
					    0,
					    SPDK_ENV_SOCKET_ID_ANY);
	if (!g_io_buf_pool) {
		SPDK_ERRLOG("could not allocate ublk_io_buf pool\n");
		return -ENOMEM;
	}

	rc = ublk_open();
	if (rc != 0) {
		spdk_mempool_free(g_io_buf_pool);
		SPDK_ERRLOG("Fail to open UBLK, error=%s\n", spdk_strerror(-rc));
		return rc;
	}

	spdk_iobuf_register_module("ublk");

	SPDK_ENV_FOREACH_CORE(i) {
		if (!spdk_cpuset_get_cpu(&g_core_mask, i)) {
			continue;
@@ -454,6 +454,7 @@ ublk_create_target(const char *cpumask_str)
		spdk_thread_send_msg(poll_group->ublk_thread, ublk_poller_register, poll_group);
		g_num_ublk_poll_groups++;
	}

	g_ublk_tgt.active = true;
	SPDK_NOTICELOG("UBLK target created successfully\n");

@@ -468,8 +469,6 @@ _ublk_fini_done(void *args)
	g_next_ublk_poll_group = 0;
	g_ublk_tgt.is_destroying = false;
	g_ublk_tgt.active = false;
	spdk_mempool_free(g_io_buf_pool);
	g_io_buf_pool = NULL;
	if (g_ublk_tgt.cb_fn) {
		g_ublk_tgt.cb_fn(g_ublk_tgt.cb_arg);
		g_ublk_tgt.cb_fn = NULL;
@@ -486,6 +485,7 @@ ublk_thread_exit(void *args)
	for (i = 0; i < g_num_ublk_poll_groups; i++) {
		if (g_ublk_tgt.poll_group[i].ublk_thread == ublk_thread) {
			spdk_poller_unregister(&g_ublk_tgt.poll_group[i].ublk_poller);
			spdk_iobuf_channel_fini(&g_ublk_tgt.poll_group[i].iobuf_ch);
			spdk_thread_exit(ublk_thread);
		}
	}
@@ -849,28 +849,33 @@ ublk_queue_io(struct ublk_io *io)
}

static void
ublk_io_get_buffer_cb(struct ublk_io *io, void *buf)
ublk_io_get_buffer_cb(struct spdk_iobuf_entry *iobuf, void *buf)
{
	struct ublk_io *io = SPDK_CONTAINEROF(iobuf, struct ublk_io, iobuf);

	io->mpool_entry = buf;
	io->payload = (void *)(uintptr_t)SPDK_ALIGN_CEIL((uintptr_t)buf, 4096ULL);
	io->get_buf_cb(io);
}

static void
ublk_io_get_buffer(struct ublk_io *io, ublk_get_buf_cb get_buf_cb)
ublk_io_get_buffer(struct ublk_io *io, struct spdk_iobuf_channel *iobuf_ch,
		   ublk_get_buf_cb get_buf_cb)
{
	void *buf;

	io->get_buf_cb = get_buf_cb;
	buf = spdk_mempool_get(g_io_buf_pool);
	ublk_io_get_buffer_cb(io, buf);
	buf = spdk_iobuf_get(iobuf_ch, UBLK_IO_MAX_BYTES, &io->iobuf, ublk_io_get_buffer_cb);
	if (buf != NULL) {
		ublk_io_get_buffer_cb(&io->iobuf, buf);
	}
}

static void
ublk_io_put_buffer(struct ublk_io *io)
ublk_io_put_buffer(struct ublk_io *io, struct spdk_iobuf_channel *iobuf_ch)
{
	if (io->payload) {
		spdk_mempool_put(g_io_buf_pool, io->mpool_entry);
		spdk_iobuf_put(iobuf_ch, io->mpool_entry, UBLK_IO_MAX_BYTES);
		io->mpool_entry = NULL;
		io->payload = NULL;
	}
@@ -905,6 +910,7 @@ ublk_submit_bdev_io(struct ublk_queue *q, uint16_t tag)
	struct ublk_io *io = &q->ios[tag];
	struct spdk_bdev_desc *desc = io->bdev_desc;
	struct spdk_io_channel *ch = io->bdev_ch;
	struct spdk_iobuf_channel *iobuf_ch = &q->poll_group->iobuf_ch;
	uint64_t offset_blocks, num_blocks;
	uint8_t ublk_op;
	int rc = 0;
@@ -917,7 +923,7 @@ ublk_submit_bdev_io(struct ublk_queue *q, uint16_t tag)
	io->result = num_blocks * spdk_bdev_get_data_block_size(ublk->bdev);
	switch (ublk_op) {
	case UBLK_IO_OP_READ:
		ublk_io_get_buffer(io, read_get_buffer_done);
		ublk_io_get_buffer(io, iobuf_ch, read_get_buffer_done);
		return;
	case UBLK_IO_OP_WRITE:
		assert((void *)iod->addr == io->payload);
@@ -996,6 +1002,7 @@ static int
ublk_io_xmit(struct ublk_queue *q)
{
	TAILQ_HEAD(, ublk_io) buffer_free_list;
	struct spdk_iobuf_channel *iobuf_ch;
	int rc = 0, count = 0, tag;
	struct ublk_io *io;

@@ -1036,10 +1043,11 @@ ublk_io_xmit(struct ublk_queue *q)
	 * future should we ever want to support async copy
	 * operations.
	 */
	iobuf_ch = &q->poll_group->iobuf_ch;
	while (!TAILQ_EMPTY(&buffer_free_list)) {
		io = TAILQ_FIRST(&buffer_free_list);
		TAILQ_REMOVE(&buffer_free_list, io, tailq);
		ublk_io_put_buffer(io);
		ublk_io_put_buffer(io, iobuf_ch);
	}
	return rc;
}
@@ -1061,12 +1069,14 @@ ublk_io_recv(struct ublk_queue *q)
	int fetch, count = 0;
	struct ublk_io *io;
	struct spdk_ublk_dev *dev = q->dev;
	struct spdk_iobuf_channel *iobuf_ch;
	unsigned __attribute__((unused)) cmd_op;

	if (q->cmd_inflight == 0) {
		return 0;
	}

	iobuf_ch = &q->poll_group->iobuf_ch;
	io_uring_for_each_cqe(&q->ring, head, cqe) {
		tag = user_data_to_tag(cqe->user_data);
		cmd_op = user_data_to_op(cqe->user_data);
@@ -1089,7 +1099,7 @@ ublk_io_recv(struct ublk_queue *q)
		if (cqe->res == UBLK_IO_RES_OK) {
			ublk_submit_bdev_io(q, tag);
		} else if (cqe->res == UBLK_IO_RES_NEED_GET_DATA) {
			ublk_io_get_buffer(io, write_get_buffer_done);
			ublk_io_get_buffer(io, iobuf_ch, write_get_buffer_done);
		} else {
			if (cqe->res != UBLK_IO_RES_ABORT) {
				SPDK_ERRLOG("ublk received error io: res %d qid %d tag %u cmd_op %u\n",
@@ -1334,11 +1344,33 @@ ublk_info_param_init(struct spdk_ublk_dev *ublk)
	ublk->dev_params = uparams;
}

static void
_ublk_free_dev(void *arg)
{
	struct spdk_ublk_dev *ublk = arg;

	ublk_free_dev(ublk);
}

static void
free_buffers(void *arg)
{
	struct ublk_queue *q = arg;
	uint32_t i;

	for (i = 0; i < q->q_depth; i++) {
		ublk_io_put_buffer(&q->ios[i], &q->poll_group->iobuf_ch);
	}
	free(q->ios);
	q->ios = NULL;
	spdk_thread_send_msg(spdk_thread_get_app_thread(), _ublk_free_dev, q->dev);
}

static void
ublk_free_dev(struct spdk_ublk_dev *ublk)
{
	struct ublk_queue *q;
	uint32_t i, q_idx;
	uint32_t q_idx;

	for (q_idx = 0; q_idx < ublk->num_queues; q_idx++) {
		q = &ublk->queues[q_idx];
@@ -1348,13 +1380,19 @@ ublk_free_dev(struct spdk_ublk_dev *ublk)
			continue;
		}

		for (i = 0; i < q->q_depth; i++) {
			ublk_io_put_buffer(&q->ios[i]);
		}
		free(q->ios);
		q->ios = NULL;
		/* We found a queue that has an ios array that may have buffers
		 * that need to be freed.  Send a message to the queue's thread
		 * so it can free the buffers back to that thread's iobuf channel.
		 * When it's done, it will set q->ios to NULL and send a message
		 * back to this function to continue.
		 */
		spdk_thread_send_msg(q->poll_group->ublk_thread, free_buffers, q);
		return;
	}

	/* All of the buffers associated with the queues have been freed, so now
	 * continue with releasing resources for the rest of the ublk device.
	 */
	if (ublk->bdev_desc) {
		spdk_bdev_close(ublk->bdev_desc);
		ublk->bdev_desc = NULL;