Commit 179ed697 authored by Jim Harris
Browse files

blob: add readv/writev support



Most of the work here revolves around having to split
an I/O that spans a cluster boundary.  In this case
we need to allocate a separate iov array, and then
issue each sub-I/O serially, copying the relevant
subset of the original iov array.

Signed-off-by: Jim Harris <james.r.harris@intel.com>
Change-Id: I0d46b3f832245900d109ee6c78cc6d49cf96428b

Reviewed-on: https://review.gerrithub.io/374880


Reviewed-by: Daniel Verkamp <daniel.verkamp@intel.com>
Tested-by: SPDK Automated Test System <sys_sgsw@intel.com>
Reviewed-by: Ben Walker <benjamin.walker@intel.com>
parent 26e9b6ea
Loading
Loading
Loading
Loading
+4 −0
Original line number Diff line number Diff line
@@ -45,6 +45,10 @@ additional clarity when constructing spdk_mempools. Previously, -1 could be
passed and the library would choose a reasonable default, but this new value
makes it explicit that the default is being used.

### Blobstore

spdk_bs_io_readv_blob() and spdk_bs_io_writev_blob() were added to enable
scattered payloads.

## v17.07: Build system improvements, userspace vhost-blk target, and GPT bdev

+20 −1
Original line number Diff line number Diff line
@@ -118,6 +118,16 @@ struct spdk_bs_dev {
		      uint64_t lba, uint32_t lba_count,
		      struct spdk_bs_dev_cb_args *cb_args);

	void (*readv)(struct spdk_bs_dev *dev, struct spdk_io_channel *channel,
		      struct iovec *iov, int iovcnt,
		      uint64_t lba, uint32_t lba_count,
		      struct spdk_bs_dev_cb_args *cb_args);

	void (*writev)(struct spdk_bs_dev *dev, struct spdk_io_channel *channel,
		       struct iovec *iov, int iovcnt,
		       uint64_t lba, uint32_t lba_count,
		       struct spdk_bs_dev_cb_args *cb_args);

	void (*flush)(struct spdk_bs_dev *dev, struct spdk_io_channel *channel,
		      struct spdk_bs_dev_cb_args *cb_args);

@@ -233,12 +243,21 @@ void spdk_bs_io_write_blob(struct spdk_blob *blob, struct spdk_io_channel *chann
			   void *payload, uint64_t offset, uint64_t length,
			   spdk_blob_op_complete cb_fn, void *cb_arg);


/*
 * Read data from a blob into the buffer at payload.
 * Offset is in pages from the beginning of the blob.
 * NOTE(review): length is presumably expressed in pages as well - confirm against the
 * implementation.  cb_fn/cb_arg are the completion callback and its argument.
 */
void spdk_bs_io_read_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
			  void *payload, uint64_t offset, uint64_t length,
			  spdk_blob_op_complete cb_fn, void *cb_arg);

/*
 * Write data to a blob from a scattered payload described by the iov array of iovcnt
 * entries.  Offset and length are both in pages; offset is relative to the beginning of
 * the blob.  A zero length completes immediately with status 0, and a range extending
 * past the blob's allocated clusters completes with -EINVAL.  cb_fn is called with
 * cb_arg and the resulting status.
 */
void spdk_bs_io_writev_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
			    struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
			    spdk_blob_op_complete cb_fn, void *cb_arg);

/*
 * Read data from a blob into a scattered payload described by the iov array of iovcnt
 * entries.  Offset and length are both in pages; offset is relative to the beginning of
 * the blob.  A zero length completes immediately with status 0, and a range extending
 * past the blob's allocated clusters completes with -EINVAL.  cb_fn is called with
 * cb_arg and the resulting status.
 */
void spdk_bs_io_readv_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
			   struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
			   spdk_blob_op_complete cb_fn, void *cb_arg);

/* Iterate through all blobs */
void spdk_bs_md_iter_first(struct spdk_blob_store *bs,
			   spdk_blob_op_with_handle_complete cb_fn, void *cb_arg);
+2 −0
Original line number Diff line number Diff line
@@ -102,6 +102,8 @@
	}

/* declare wrapper protos (alphabetically please) here */
DECLARE_WRAPPER(calloc, void *, (size_t nmemb, size_t size));

DECLARE_WRAPPER(pthread_mutex_init, int,
		(pthread_mutex_t *mtx, const pthread_mutexattr_t *attr));

+35 −0
Original line number Diff line number Diff line
@@ -103,6 +103,39 @@ bdev_blob_write(struct spdk_bs_dev *dev, struct spdk_io_channel *channel, void *
	}
}


/*
 * Blobstore back-end readv: submit a scattered read to the underlying bdev.
 *
 * lba and lba_count are in units of the bdev's block size and are converted to the byte
 * offset and byte length handed to spdk_bdev_readv().  bdev_blob_io_complete is passed as
 * the completion callback with cb_args as its argument.  If submission itself fails,
 * cb_args->cb_fn is invoked right away with the returned error code.
 */
static void
bdev_blob_readv(struct spdk_bs_dev *dev, struct spdk_io_channel *channel,
		struct iovec *iov, int iovcnt,
		uint64_t lba, uint32_t lba_count, struct spdk_bs_dev_cb_args *cb_args)
{
	struct spdk_bdev *bdev = __get_bdev(dev);
	uint32_t block_size = spdk_bdev_get_block_size(bdev);
	uint64_t offset_bytes = lba * block_size;
	/*
	 * Widen before multiplying: lba_count and block_size are both 32-bit, so their
	 * product would otherwise wrap for transfers of 4 GiB or more.
	 */
	uint64_t num_bytes = (uint64_t)lba_count * block_size;
	int rc;

	rc = spdk_bdev_readv(__get_desc(dev), channel, iov, iovcnt, offset_bytes,
			     num_bytes, bdev_blob_io_complete, cb_args);
	if (rc) {
		cb_args->cb_fn(cb_args->channel, cb_args->cb_arg, rc);
	}
}

/*
 * Blobstore back-end writev: submit a scattered write to the underlying bdev.
 *
 * lba and lba_count are in units of the bdev's block size and are converted to the byte
 * offset and byte length handed to spdk_bdev_writev().  bdev_blob_io_complete is passed as
 * the completion callback with cb_args as its argument.  If submission itself fails,
 * cb_args->cb_fn is invoked right away with the returned error code.
 */
static void
bdev_blob_writev(struct spdk_bs_dev *dev, struct spdk_io_channel *channel,
		 struct iovec *iov, int iovcnt,
		 uint64_t lba, uint32_t lba_count, struct spdk_bs_dev_cb_args *cb_args)
{
	struct spdk_bdev *bdev = __get_bdev(dev);
	uint32_t block_size = spdk_bdev_get_block_size(bdev);
	uint64_t offset_bytes = lba * block_size;
	/*
	 * Widen before multiplying: lba_count and block_size are both 32-bit, so their
	 * product would otherwise wrap for transfers of 4 GiB or more.
	 */
	uint64_t num_bytes = (uint64_t)lba_count * block_size;
	int rc;

	rc = spdk_bdev_writev(__get_desc(dev), channel, iov, iovcnt, offset_bytes,
			      num_bytes, bdev_blob_io_complete, cb_args);
	if (rc) {
		cb_args->cb_fn(cb_args->channel, cb_args->cb_arg, rc);
	}
}

static void
bdev_blob_unmap(struct spdk_bs_dev *dev, struct spdk_io_channel *channel, uint64_t lba,
		uint32_t lba_count, struct spdk_bs_dev_cb_args *cb_args)
@@ -171,6 +204,8 @@ spdk_bdev_create_bs_dev(struct spdk_bdev *bdev)
	b->bs_dev.destroy = bdev_blob_destroy;
	b->bs_dev.read = bdev_blob_read;
	b->bs_dev.write = bdev_blob_write;
	b->bs_dev.readv = bdev_blob_readv;
	b->bs_dev.writev = bdev_blob_writev;
	b->bs_dev.unmap = bdev_blob_unmap;

	return &b->bs_dev;
+177 −0
Original line number Diff line number Diff line
@@ -38,6 +38,7 @@
#include "spdk/queue.h"
#include "spdk/io_channel.h"
#include "spdk/bit_array.h"
#include "spdk/likely.h"

#include "spdk_internal/log.h"

@@ -1062,6 +1063,168 @@ _spdk_blob_request_submit_rw(struct spdk_blob *blob, struct spdk_io_channel *_ch
	spdk_bs_batch_close(batch);
}

/*
 * Progress state for a readv/writev request that crosses a cluster boundary and is
 * therefore issued as a series of smaller I/Os.  Allocated with room for iovcnt
 * trailing iovec entries.
 */
struct rw_iov_ctx {
	/* Blob the request targets. */
	struct spdk_blob *blob;
	/* true for readv, false for writev. */
	bool read;
	/* Number of entries in orig_iov (and capacity of the trailing iov array). */
	int iovcnt;
	/* Caller's iov array; referenced, not copied. */
	struct iovec *orig_iov;
	/* Blob page offset at which the next sub-I/O starts. */
	uint64_t page_offset;
	/* Pages still to be issued. */
	uint64_t pages_remaining;
	/* Pages already issued; used to locate the current position in orig_iov. */
	uint64_t pages_done;
	/* Scratch iov array rebuilt for each sub-I/O (C99 flexible array member). */
	struct iovec iov[];
};

/*
 * Completion callback for a readv/writev that fit inside a single cluster and was issued
 * as one I/O: finishes the sequence with that I/O's status.  That path passes NULL as the
 * callback argument (no split context is allocated), which the assert checks.
 */
static void
_spdk_rw_iov_done(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	assert(cb_arg == NULL);
	spdk_bs_sequence_finish(seq, bserrno);
}

/*
 * Issue the next sub-I/O of a readv/writev that was split at cluster boundaries.
 *
 * Invoked once directly to start a split request and afterwards as the completion callback
 * of each sub-I/O.  cb_arg is the struct rw_iov_ctx tracking progress.  The context is freed
 * and the sequence finished when all pages have been issued or a sub-I/O reports an error.
 * If the caller's iov array describes fewer bytes than the request needs, the sequence is
 * finished with -EINVAL rather than walking past the end of that array.
 */
static void
_spdk_rw_iov_split_next(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct rw_iov_ctx *ctx = cb_arg;
	struct iovec *iov, *orig_iov;
	int iovcnt, orig_idx;
	size_t orig_iovoff;
	uint64_t lba;
	uint64_t page_count, pages_to_boundary;
	uint32_t lba_count;
	uint64_t byte_count;

	if (bserrno != 0 || ctx->pages_remaining == 0) {
		free(ctx);
		spdk_bs_sequence_finish(seq, bserrno);
		return;
	}

	/* This sub-I/O covers whatever is left, capped at the end of the current cluster. */
	pages_to_boundary = _spdk_bs_num_pages_to_cluster_boundary(ctx->blob, ctx->page_offset);
	page_count = spdk_min(ctx->pages_remaining, pages_to_boundary);
	lba = _spdk_bs_blob_page_to_lba(ctx->blob, ctx->page_offset);
	lba_count = _spdk_bs_page_to_lba(ctx->blob->bs, page_count);

	/*
	 * Find the entry (orig_idx) and the byte offset within it (orig_iovoff) of the original
	 *  iov array that correspond to the current position in the request.  byte_count is the
	 *  number of already-issued bytes that still have to be skipped to get there.
	 */
	byte_count = ctx->pages_done * sizeof(struct spdk_blob_md_page);
	orig_idx = 0;
	orig_iovoff = 0;
	while (byte_count > 0 && orig_idx < ctx->iovcnt) {
		orig_iov = &ctx->orig_iov[orig_idx];
		if (byte_count >= orig_iov->iov_len) {
			byte_count -= orig_iov->iov_len;
			orig_idx++;
		} else {
			orig_iovoff = byte_count;
			byte_count = 0;
		}
	}

	if (byte_count > 0) {
		/* The original iov array ended before reaching the current position. */
		free(ctx);
		spdk_bs_sequence_finish(seq, -EINVAL);
		return;
	}

	/*
	 * Build an iov array for the next I/O in the sequence.  byte_count will keep track of how many
	 *  bytes of this next I/O remain to be accounted for in the new iov array.
	 */
	byte_count = page_count * sizeof(struct spdk_blob_md_page);
	iov = &ctx->iov[0];
	iovcnt = 0;
	while (byte_count > 0) {
		if (orig_idx >= ctx->iovcnt) {
			/* The original iov array is too short to cover this sub-I/O. */
			free(ctx);
			spdk_bs_sequence_finish(seq, -EINVAL);
			return;
		}

		orig_iov = &ctx->orig_iov[orig_idx];
		iov->iov_len = spdk_min(byte_count, orig_iov->iov_len - orig_iovoff);
		/* Cast so the offset is applied in bytes without relying on void * arithmetic. */
		iov->iov_base = (char *)orig_iov->iov_base + orig_iovoff;
		byte_count -= iov->iov_len;
		/* Only the first entry of a sub-I/O can start part-way through an original entry. */
		orig_iovoff = 0;
		orig_idx++;
		iov++;
		iovcnt++;
	}

	ctx->page_offset += page_count;
	ctx->pages_done += page_count;
	ctx->pages_remaining -= page_count;
	iov = &ctx->iov[0];

	/* Re-enter this function when the sub-I/O completes to issue the next one. */
	if (ctx->read) {
		spdk_bs_sequence_readv(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_split_next, ctx);
	} else {
		spdk_bs_sequence_writev(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_split_next, ctx);
	}
}

/*
 * Common implementation of spdk_bs_io_readv_blob() and spdk_bs_io_writev_blob().
 *
 * offset and length are in pages relative to the start of the blob; iov/iovcnt describe the
 * scattered payload; read selects readv (true) or writev (false).  cb_fn is called with
 * cb_arg and:
 *   0        immediately, if length is 0;
 *   -EINVAL  if iov is NULL, iovcnt is not positive, or the range extends past the blob's
 *            allocated clusters;
 *   -ENOMEM  if the sequence or the split context cannot be allocated;
 *   otherwise the status with which the I/O sequence finishes.
 */
static void
_spdk_blob_request_submit_rw_iov(struct spdk_blob *blob, struct spdk_io_channel *_channel,
				 struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
				 spdk_blob_op_complete cb_fn, void *cb_arg, bool read)
{
	spdk_bs_sequence_t		*seq;
	struct spdk_bs_cpl		cpl;
	uint64_t			blob_pages;

	assert(blob != NULL);

	if (length == 0) {
		cb_fn(cb_arg, 0);
		return;
	}

	/* A non-empty request needs at least one iov entry to describe its payload. */
	if (iov == NULL || iovcnt <= 0) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	/*
	 * Range check written so that it cannot be defeated by offset + length wrapping
	 *  around the 64-bit range.
	 */
	blob_pages = blob->active.num_clusters * blob->bs->pages_per_cluster;
	if (offset > blob_pages || length > blob_pages - offset) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
	cpl.u.blob_basic.cb_fn = cb_fn;
	cpl.u.blob_basic.cb_arg = cb_arg;

	/*
	 * For now, we implement readv/writev using a sequence (instead of a batch) to account for having
	 *  to split a request that spans a cluster boundary.  For I/O that do not span a cluster boundary,
	 *  there will be no noticeable difference compared to using a batch.  For I/O that do span a cluster
	 *  boundary, the target LBAs (after blob offset to LBA translation) may not be contiguous, so we need
	 *  to allocate a separate iov array and split the I/O such that none of the resulting
	 *  smaller I/O cross a cluster boundary.  These smaller I/O will be issued in sequence (not in parallel)
	 *  but since this case happens very infrequently, any performance impact will be negligible.
	 *
	 * This could be optimized in the future to allocate a big enough iov array to account for all of the iovs
	 *  for all of the smaller I/Os, pre-build all of the iov arrays for the smaller I/Os, then issue them
	 *  in a batch.  That would also require creating an intermediate spdk_bs_cpl that would get called
	 *  when the batch was completed, to allow for freeing the memory for the iov arrays.
	 */
	seq = spdk_bs_sequence_start(_channel, &cpl);
	if (!seq) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	if (spdk_likely(length <= _spdk_bs_num_pages_to_cluster_boundary(blob, offset))) {
		/* Common case: the request stays within one cluster, so issue it as a single I/O. */
		uint64_t lba = _spdk_bs_blob_page_to_lba(blob, offset);
		uint32_t lba_count = _spdk_bs_page_to_lba(blob->bs, length);

		if (read) {
			spdk_bs_sequence_readv(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_done, NULL);
		} else {
			spdk_bs_sequence_writev(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_done, NULL);
		}
	} else {
		struct rw_iov_ctx *ctx;

		/* The context carries a scratch iov array with as many entries as the caller's. */
		ctx = calloc(1, sizeof(struct rw_iov_ctx) + (size_t)iovcnt * sizeof(struct iovec));
		if (ctx == NULL) {
			spdk_bs_sequence_finish(seq, -ENOMEM);
			return;
		}

		ctx->blob = blob;
		ctx->read = read;
		ctx->orig_iov = iov;
		ctx->iovcnt = iovcnt;
		ctx->page_offset = offset;
		ctx->pages_remaining = length;
		ctx->pages_done = 0;

		/* Kick off the first sub-I/O; each completion issues the next one. */
		_spdk_rw_iov_split_next(seq, ctx, 0);
	}
}

static struct spdk_blob *
_spdk_blob_lookup(struct spdk_blob_store *bs, spdk_blob_id blobid)
{
@@ -2167,6 +2330,20 @@ void spdk_bs_io_read_blob(struct spdk_blob *blob, struct spdk_io_channel *channe
	_spdk_blob_request_submit_rw(blob, channel, payload, offset, length, cb_fn, cb_arg, true);
}

/*
 * Public entry point for scattered writes to a blob.  Forwards all arguments unchanged to
 * _spdk_blob_request_submit_rw_iov() with the read flag set to false.
 */
void spdk_bs_io_writev_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
			    struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
			    spdk_blob_op_complete cb_fn, void *cb_arg)
{
	_spdk_blob_request_submit_rw_iov(blob, channel, iov, iovcnt, offset, length, cb_fn, cb_arg, false);
}

/*
 * Public entry point for scattered reads from a blob.  Forwards all arguments unchanged to
 * _spdk_blob_request_submit_rw_iov() with the read flag set to true.
 */
void spdk_bs_io_readv_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
			   struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
			   spdk_blob_op_complete cb_fn, void *cb_arg)
{
	_spdk_blob_request_submit_rw_iov(blob, channel, iov, iovcnt, offset, length, cb_fn, cb_arg, true);
}

struct spdk_bs_iter_ctx {
	int64_t page_num;
	struct spdk_blob_store *bs;
Loading