Commit 422eab71 authored by Pawel Wodkowski's avatar Pawel Wodkowski Committed by Jim Harris
Browse files

bdev: enable IO vector operations



This patch enables vector operation for bdev drivers aio, malloc and
nvme.
The rbd driver still handles only one vector.

Change-Id: I5f401527c2717011ecc21116363bbb722e804112
Signed-off-by: Pawel Wodkowski <pawelx.wodkowski@intel.com>
parent 8646b06a
Loading
Loading
Loading
Loading
+15 −4
Original line number Diff line number Diff line
@@ -189,11 +189,17 @@ struct spdk_bdev_io {
			/** The unaligned rbuf originally allocated. */
			void *buf_unaligned;

			/** For single buffer cases, pointer to the aligned data buffer.  */
			void *buf;
			/** For basic read case, use our own iovec element. */
			struct iovec iov;

			/** For SG buffer cases, array of iovecs to transfer. */
			struct iovec *iovs;

			/** For single buffer cases, size of the data buffer. */
			uint64_t nbytes;
			/** For SG buffer cases, number of iovecs in iovec array. */
			int iovcnt;

			/** For SG buffer cases, total size of data to be transferred. */
			size_t len;

			/** Starting offset (in bytes) of the blockdev for this I/O. */
			uint64_t offset;
@@ -279,6 +285,11 @@ bool spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type
struct spdk_bdev_io *spdk_bdev_read(struct spdk_bdev *bdev, struct spdk_io_channel *ch,
				    void *buf, uint64_t offset, uint64_t nbytes,
				    spdk_bdev_io_completion_cb cb, void *cb_arg);
struct spdk_bdev_io *
spdk_bdev_readv(struct spdk_bdev *bdev, struct spdk_io_channel *ch,
		struct iovec *iov, int iovcnt,
		uint64_t offset, uint64_t nbytes,
		spdk_bdev_io_completion_cb cb, void *cb_arg);
struct spdk_bdev_io *spdk_bdev_write(struct spdk_bdev *bdev, struct spdk_io_channel *ch,
				     void *buf, uint64_t offset, uint64_t nbytes,
				     spdk_bdev_io_completion_cb cb, void *cb_arg);
+16 −14
Original line number Diff line number Diff line
@@ -98,8 +98,9 @@ blockdev_aio_close(struct file_disk *disk)
}

static int64_t
blockdev_aio_read(struct file_disk *fdisk, struct spdk_io_channel *ch,
		  struct blockdev_aio_task *aio_task, void *buf, uint64_t nbytes, uint64_t offset)
blockdev_aio_readv(struct file_disk *fdisk, struct spdk_io_channel *ch,
		   struct blockdev_aio_task *aio_task,
		   struct iovec *iov, int iovcnt, uint64_t nbytes, uint64_t offset)
{
	struct iocb *iocb = &aio_task->iocb;
	struct blockdev_aio_io_channel *aio_ch = spdk_io_channel_get_ctx(ch);
@@ -107,15 +108,15 @@ blockdev_aio_read(struct file_disk *fdisk, struct spdk_io_channel *ch,

	iocb->aio_fildes = fdisk->fd;
	iocb->aio_reqprio = 0;
	iocb->aio_lio_opcode = IO_CMD_PREAD;
	iocb->u.c.buf = buf;
	iocb->u.c.nbytes = nbytes;
	iocb->u.c.offset = offset;
	iocb->aio_lio_opcode = IO_CMD_PREADV;
	iocb->u.v.vec = iov;
	iocb->u.v.nr = iovcnt;
	iocb->u.v.offset = offset;
	iocb->data = aio_task;
	aio_task->len = nbytes;

	SPDK_TRACELOG(SPDK_TRACE_AIO, "read from %p of size %lu to off: %#lx\n",
		      buf, nbytes, offset);
	SPDK_TRACELOG(SPDK_TRACE_AIO, "read %d iovs size %lu to off: %#lx\n",
		      iovcnt, nbytes, offset);

	rc = io_submit(aio_ch->io_ctx, 1, &iocb);
	if (rc < 0) {
@@ -245,12 +246,13 @@ static void blockdev_aio_get_rbuf_cb(struct spdk_bdev_io *bdev_io)
{
	int ret = 0;

	ret = blockdev_aio_read((struct file_disk *)bdev_io->ctx,
	ret = blockdev_aio_readv((struct file_disk *)bdev_io->ctx,
				 bdev_io->ch,
				 (struct blockdev_aio_task *)bdev_io->driver_ctx,
				bdev_io->u.read.buf,
				bdev_io->u.read.nbytes,
				bdev_io->u.read.offset);
				 bdev_io->u.read.iovs,
				 bdev_io->u.read.iovcnt,
				 bdev_io->u.read.offset,
				 bdev_io->u.read.len);

	if (ret < 0) {
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
+65 −6
Original line number Diff line number Diff line
@@ -114,8 +114,11 @@ spdk_bdev_io_set_rbuf(struct spdk_bdev_io *bdev_io, void *buf)
{
	assert(bdev_io->get_rbuf_cb != NULL);
	assert(buf != NULL);
	assert(bdev_io->u.read.iovs != NULL);

	bdev_io->u.read.buf_unaligned = buf;
	bdev_io->u.read.buf = (void *)((unsigned long)((char *)buf + 512) & ~511UL);
	bdev_io->u.read.iovs[0].iov_base = (void *)((unsigned long)((char *)buf + 512) & ~511UL);
	bdev_io->u.read.iovs[0].iov_len = bdev_io->u.read.len;
	bdev_io->u.read.put_rbuf = true;
	bdev_io->get_rbuf_cb(bdev_io);
}
@@ -129,7 +132,9 @@ spdk_bdev_io_put_rbuf(struct spdk_bdev_io *bdev_io)
	need_rbuf_tailq_t *tailq;
	uint64_t length;

	length = bdev_io->u.read.nbytes;
	assert(bdev_io->u.read.iovcnt == 1);

	length = bdev_io->u.read.len;
	buf = bdev_io->u.read.buf_unaligned;

	if (length <= SPDK_BDEV_SMALL_RBUF_MAX_SIZE) {
@@ -362,7 +367,7 @@ spdk_bdev_put_io(struct spdk_bdev_io *bdev_io)
static void
_spdk_bdev_io_get_rbuf(struct spdk_bdev_io *bdev_io)
{
	uint64_t len = bdev_io->u.read.nbytes;
	uint64_t len = bdev_io->u.read.len;
	struct rte_mempool *pool;
	need_rbuf_tailq_t *tailq;
	int rc;
@@ -545,9 +550,62 @@ spdk_bdev_read(struct spdk_bdev *bdev, struct spdk_io_channel *ch,

	bdev_io->ch = ch;
	bdev_io->type = SPDK_BDEV_IO_TYPE_READ;
	bdev_io->u.read.buf = buf;
	bdev_io->u.read.nbytes = nbytes;
	bdev_io->u.read.iov.iov_base = buf;
	bdev_io->u.read.iov.iov_len = nbytes;
	bdev_io->u.read.iovs = &bdev_io->u.read.iov;
	bdev_io->u.read.iovcnt = 1;
	bdev_io->u.read.len = nbytes;
	bdev_io->u.read.offset = offset;
	bdev_io->u.read.put_rbuf = false;
	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);

	rc = spdk_bdev_io_submit(bdev_io);
	if (rc < 0) {
		spdk_bdev_put_io(bdev_io);
		return NULL;
	}

	return bdev_io;
}

struct spdk_bdev_io *
spdk_bdev_readv(struct spdk_bdev *bdev, struct spdk_io_channel *ch,
		struct iovec *iov, int iovcnt,
		uint64_t offset, uint64_t nbytes,
		spdk_bdev_io_completion_cb cb, void *cb_arg)
{
	struct spdk_bdev_io *bdev_io;
	int rc;

	/* Return failure if nbytes is not a multiple of bdev->blocklen */
	if (nbytes % bdev->blocklen) {
		return NULL;
	}

	/* Return failure if offset + nbytes is less than offset; indicates there
	 * has been an overflow and hence the offset has been wrapped around */
	if ((offset + nbytes) < offset) {
		return NULL;
	}

	/* Return failure if offset + nbytes exceeds the size of the blockdev */
	if ((offset + nbytes) > (bdev->blockcnt * bdev->blocklen)) {
		return NULL;
	}

	bdev_io = spdk_bdev_get_io();
	if (!bdev_io) {
		SPDK_ERRLOG("spdk_bdev_io memory allocation failed duing read\n");
		return NULL;
	}

	bdev_io->ch = ch;
	bdev_io->type = SPDK_BDEV_IO_TYPE_READ;
	bdev_io->u.read.iovs = iov;
	bdev_io->u.read.iovcnt = iovcnt;
	bdev_io->u.read.len = nbytes;
	bdev_io->u.read.offset = offset;
	bdev_io->u.read.put_rbuf = false;
	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);

	rc = spdk_bdev_io_submit(bdev_io);
@@ -834,8 +892,9 @@ void
spdk_bdev_io_get_rbuf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_rbuf_cb cb)
{
	assert(cb != NULL);
	assert(bdev_io->u.read.iovs != NULL);

	if (bdev_io->u.read.buf == NULL) {
	if (bdev_io->u.read.iovs[0].iov_base == NULL) {
		bdev_io->get_rbuf_cb = cb;
		_spdk_bdev_io_get_rbuf(bdev_io);
	} else {
+96 −21
Original line number Diff line number Diff line
@@ -70,6 +70,11 @@ malloc_done(void *ref, int status)
	spdk_bdev_io_complete(spdk_bdev_io_from_ctx(cp_task), bdev_status);
}

/*
 * No-op copy-completion callback. The vectored malloc read/write paths
 * submit one copy per iovec and use this callback for every copy except
 * the last; only the final copy uses malloc_done to complete the bdev I/O.
 */
static void
malloc_iov_done_nop(void *ref, int status)
{
	/* Intentionally empty; casts silence unused-parameter warnings. */
	(void)ref;
	(void)status;
}

static struct malloc_disk *g_malloc_disk_head = NULL;

int malloc_disk_count = 0;
@@ -120,16 +125,58 @@ blockdev_malloc_destruct(struct spdk_bdev *bdev)
	return 0;
}

/*
 * Validate that the iovec array covers exactly nbytes of data.
 *
 * Returns 0 when the sum of all iov_len fields equals nbytes with no
 * iovec left over; returns non-zero otherwise (short, long, or a partial
 * final iovec).
 */
static int
blockdev_malloc_check_iov_len(struct iovec *iovs, int iovcnt, size_t nbytes)
{
	size_t remaining = nbytes;
	int idx = 0;

	while (remaining != 0 && idx < iovcnt) {
		if (remaining < iovs[idx].iov_len) {
			/* Final iovec would only be partially consumed. */
			break;
		}

		remaining -= iovs[idx].iov_len;
		idx++;
	}

	/* I/O length must match the total length of all buffers exactly. */
	return remaining != 0 || idx != iovcnt;
}

static int64_t
blockdev_malloc_read(struct malloc_disk *mdisk, struct spdk_io_channel *ch,
blockdev_malloc_readv(struct malloc_disk *mdisk, struct spdk_io_channel *ch,
		      struct copy_task *copy_req,
		     void *buf, uint64_t nbytes, uint64_t offset)
		      struct iovec *iov, int iovcnt, size_t len, uint64_t offset)
{
	SPDK_TRACELOG(SPDK_TRACE_MALLOC, "read %lu bytes from offset %#lx to %p\n",
		      nbytes, offset, buf);
	int64_t res = 0;
	copy_completion_cb completion_cb = malloc_iov_done_nop;
	void *src = mdisk->malloc_buf + offset;
	int i;

	if (blockdev_malloc_check_iov_len(iov, iovcnt, len))
		return -1;

	return spdk_copy_submit(copy_req, ch, buf, mdisk->malloc_buf + offset,
				nbytes, malloc_done);
	SPDK_TRACELOG(SPDK_TRACE_MALLOC, "read %lu bytes from offset %#lx\n",
		      len, offset);

	for (i = 0; i < iovcnt; i++) {
		/*
		 * The copy engine will complete all copy operations in order, so
		 * use a nop callback for the all iov completions before the last one.
		 * Then when the last iov is completed, we will actually complete the
		 * bdev operation back to the caller.
		 */
		if (len == iov[i].iov_len)
			completion_cb = malloc_done;

		res = spdk_copy_submit(copy_req, ch, iov[i].iov_base,
				       src, iov[i].iov_len, completion_cb);
		if (res)
			break;

		src += iov[i].iov_len;
		len -= iov[i].iov_len;
	}

	return res;
}

static int64_t
@@ -137,14 +184,37 @@ blockdev_malloc_writev(struct malloc_disk *mdisk, struct spdk_io_channel *ch,
		       struct copy_task *copy_req,
		       struct iovec *iov, int iovcnt, size_t len, uint64_t offset)
{
	if ((iovcnt != 1) || (iov->iov_len != len))
	int64_t res = 0;
	copy_completion_cb completion_cb = malloc_iov_done_nop;
	void *dst = mdisk->malloc_buf + offset;
	int i;

	if (blockdev_malloc_check_iov_len(iov, iovcnt, len))
		return -1;

	SPDK_TRACELOG(SPDK_TRACE_MALLOC, "wrote %lu bytes to offset %#lx from %p\n",
		      iov->iov_len, offset, iov->iov_base);
	SPDK_TRACELOG(SPDK_TRACE_MALLOC, "wrote %lu bytes to offset %#lx\n",
		      len, offset);

	for (i = 0; i < iovcnt; i++) {
		/*
		 * The copy engine will complete all copy operations in order, so
		 * use a nop callback for the all iov completions before the last one.
		 * Then when the last iov is completed, we will actually complete the
		 * bdev operation back to the caller.
		 */
		if (len == iov[i].iov_len)
			completion_cb = malloc_done;

		res = spdk_copy_submit(copy_req, ch, dst, iov[i].iov_base,
				       iov[i].iov_len, completion_cb);
		if (res)
			break;

		dst += iov[i].iov_len;
		len -= iov[i].iov_len;
	}

	return spdk_copy_submit(copy_req, ch, mdisk->malloc_buf + offset,
				iov->iov_base, len, malloc_done);
	return res;
}

static int
@@ -198,19 +268,24 @@ static int _blockdev_malloc_submit_request(struct spdk_bdev_io *bdev_io)
{
	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
		if (bdev_io->u.read.buf == NULL) {
			bdev_io->u.read.buf = ((struct malloc_disk *)bdev_io->ctx)->malloc_buf +
		if (bdev_io->u.read.iovs[0].iov_base == NULL) {
			assert(bdev_io->u.read.iovcnt == 1);
			bdev_io->u.read.iovs[0].iov_base =
				((struct malloc_disk *)bdev_io->ctx)->malloc_buf +
				bdev_io->u.read.offset;
			bdev_io->u.read.iovs[0].iov_len = bdev_io->u.read.len;
			bdev_io->u.read.put_rbuf = false;
			spdk_bdev_io_complete(spdk_bdev_io_from_ctx(bdev_io->driver_ctx),
					      SPDK_BDEV_IO_STATUS_SUCCESS);
			return 0;
		}

		return blockdev_malloc_read((struct malloc_disk *)bdev_io->ctx,
		return blockdev_malloc_readv((struct malloc_disk *)bdev_io->ctx,
					     bdev_io->ch,
					     (struct copy_task *)bdev_io->driver_ctx,
					    bdev_io->u.read.buf,
					    bdev_io->u.read.nbytes,
					     bdev_io->u.read.iovs,
					     bdev_io->u.read.iovcnt,
					     bdev_io->u.read.len,
					     bdev_io->u.read.offset);

	case SPDK_BDEV_IO_TYPE_WRITE:
+87 −29
Original line number Diff line number Diff line
@@ -90,7 +90,17 @@ struct nvme_io_channel {

#define NVME_DEFAULT_MAX_UNMAP_BDESC_COUNT	1
struct nvme_blockio {
	int	reserved;
	/** array of iovecs to transfer. */
	struct iovec *iovs;

	/** Number of iovecs in iovs array. */
	int iovcnt;

	/** Current iovec position. */
	int iovpos;

	/** Offset in current iovec. */
	uint32_t iov_offset;
};

enum data_direction {
@@ -122,9 +132,10 @@ static void nvme_ctrlr_initialize_blockdevs(struct spdk_nvme_ctrlr *ctrlr,
		int bdev_per_ns, int ctrlr_id);
static int nvme_library_init(void);
static void nvme_library_fini(void);
int nvme_queue_cmd(struct nvme_blockdev *bdev, struct spdk_nvme_qpair *qpair,
static int nvme_queue_cmd(struct nvme_blockdev *bdev, struct spdk_nvme_qpair *qpair,
			  struct nvme_blockio *bio,
		   int direction, void *buf, uint64_t nbytes, uint64_t offset);
			  int direction, struct iovec *iov, int iovcnt, uint64_t nbytes,
			  uint64_t offset);

static int
nvme_get_ctx_size(void)
@@ -136,17 +147,18 @@ SPDK_BDEV_MODULE_REGISTER(nvme_library_init, NULL, blockdev_nvme_get_spdk_runnin
			  nvme_get_ctx_size)

static int64_t
blockdev_nvme_read(struct nvme_blockdev *nbdev, struct spdk_io_channel *ch,
blockdev_nvme_readv(struct nvme_blockdev *nbdev, struct spdk_io_channel *ch,
		    struct nvme_blockio *bio,
		   void *buf, uint64_t nbytes, uint64_t offset)
		    struct iovec *iov, int iovcnt, uint64_t nbytes, uint64_t offset)
{
	struct nvme_io_channel *nvme_ch = spdk_io_channel_get_ctx(ch);
	int64_t rc;

	SPDK_TRACELOG(SPDK_TRACE_BDEV_NVME, "read %lu bytes with offset %#lx to %p\n",
		      nbytes, offset, buf);
	SPDK_TRACELOG(SPDK_TRACE_BDEV_NVME, "read %lu bytes with offset %#lx\n",
		      nbytes, offset);

	rc = nvme_queue_cmd(nbdev, nvme_ch->qpair, bio, BDEV_DISK_READ, buf, nbytes, offset);
	rc = nvme_queue_cmd(nbdev, nvme_ch->qpair, bio, BDEV_DISK_READ,
			    iov, iovcnt, nbytes, offset);
	if (rc < 0)
		return -1;

@@ -161,18 +173,15 @@ blockdev_nvme_writev(struct nvme_blockdev *nbdev, struct spdk_io_channel *ch,
	struct nvme_io_channel *nvme_ch = spdk_io_channel_get_ctx(ch);
	int64_t rc;

	if ((iovcnt != 1) || (iov->iov_len != len))
		return -1;
	SPDK_TRACELOG(SPDK_TRACE_BDEV_NVME, "write %lu bytes with offset %#lx\n",
		      len, offset);

	SPDK_TRACELOG(SPDK_TRACE_BDEV_NVME, "write %lu bytes with offset %#lx from %p\n",
		      iov->iov_len, offset, iov->iov_base);

	rc = nvme_queue_cmd(nbdev, nvme_ch->qpair, bio, BDEV_DISK_WRITE, (void *)iov->iov_base,
			    iov->iov_len, offset);
	rc = nvme_queue_cmd(nbdev, nvme_ch->qpair, bio, BDEV_DISK_WRITE,
			    iov, iovcnt, len, offset);
	if (rc < 0)
		return -1;

	return iov->iov_len;
	return len;
}

static void
@@ -224,11 +233,12 @@ static void blockdev_nvme_get_rbuf_cb(struct spdk_bdev_io *bdev_io)
{
	int ret;

	ret = blockdev_nvme_read((struct nvme_blockdev *)bdev_io->ctx,
	ret = blockdev_nvme_readv((struct nvme_blockdev *)bdev_io->ctx,
				  bdev_io->ch,
				  (struct nvme_blockio *)bdev_io->driver_ctx,
				 bdev_io->u.read.buf,
				 bdev_io->u.read.nbytes,
				  bdev_io->u.read.iovs,
				  bdev_io->u.read.iovcnt,
				  bdev_io->u.read.len,
				  bdev_io->u.read.offset);

	if (ret < 0) {
@@ -609,10 +619,51 @@ queued_done(void *ref, const struct spdk_nvme_cpl *cpl)
	spdk_bdev_io_complete(spdk_bdev_io_from_ctx(bio), status);
}

/*
 * SGL reset callback for the NVMe command layer: position the per-I/O
 * cursor (iovpos/iov_offset) so that the next SGE fetch begins
 * sgl_offset bytes into the overall iovec list.
 */
static void
queued_reset_sgl(void *ref, uint32_t sgl_offset)
{
	struct nvme_blockio *bio = ref;

	bio->iov_offset = sgl_offset;
	bio->iovpos = 0;
	while (bio->iovpos < bio->iovcnt) {
		struct iovec *cur = &bio->iovs[bio->iovpos];

		/* Stop once the remaining offset lands inside this iovec. */
		if (bio->iov_offset < cur->iov_len)
			break;

		bio->iov_offset -= cur->iov_len;
		bio->iovpos++;
	}
}

/*
 * SGE fetch callback for the NVMe command layer: return the physical
 * address and length of the next scatter-gather element, advancing the
 * iovec cursor. Any offset established by queued_reset_sgl is applied
 * to the first element only, then cleared.
 */
static int
queued_next_sge(void *ref, uint64_t *address, uint32_t *length)
{
	struct nvme_blockio *bio = ref;
	struct iovec *cur;
	uint64_t addr;
	uint32_t len;

	assert(bio->iovpos < bio->iovcnt);

	cur = &bio->iovs[bio->iovpos];
	bio->iovpos++;

	addr = spdk_vtophys(cur->iov_base);
	len = cur->iov_len;

	if (bio->iov_offset != 0) {
		assert(bio->iov_offset <= cur->iov_len);
		addr += bio->iov_offset;
		len -= bio->iov_offset;
		bio->iov_offset = 0;
	}

	*address = addr;
	*length = len;

	return 0;
}

int
nvme_queue_cmd(struct nvme_blockdev *bdev, struct spdk_nvme_qpair *qpair,
	       struct nvme_blockio *bio,
	       int direction, void *buf, uint64_t nbytes, uint64_t offset)
	       int direction, struct iovec *iov, int iovcnt, uint64_t nbytes,
	       uint64_t offset)
{
	uint32_t ss = spdk_nvme_ns_get_sector_size(bdev->ns);
	uint32_t lba_count;
@@ -628,12 +679,19 @@ nvme_queue_cmd(struct nvme_blockdev *bdev, struct spdk_nvme_qpair *qpair,

	lba_count = nbytes / ss;

	bio->iovs = iov;
	bio->iovcnt = iovcnt;
	bio->iovpos = 0;
	bio->iov_offset = 0;

	if (direction == BDEV_DISK_READ) {
		rc = spdk_nvme_ns_cmd_read(bdev->ns, qpair, buf, next_lba,
					   lba_count, queued_done, bio, 0);
		rc = spdk_nvme_ns_cmd_readv(bdev->ns, qpair, next_lba,
					    lba_count, queued_done, bio, 0,
					    queued_reset_sgl, queued_next_sge);
	} else {
		rc = spdk_nvme_ns_cmd_write(bdev->ns, qpair, buf, next_lba,
					    lba_count, queued_done, bio, 0);
		rc = spdk_nvme_ns_cmd_writev(bdev->ns, qpair, next_lba,
					     lba_count, queued_done, bio, 0,
					     queued_reset_sgl, queued_next_sge);
	}

	if (rc != 0) {
Loading