Commit 183f37e8 authored by Jim Harris's avatar Jim Harris Committed by Ben Walker
Browse files

bdev: do not reuse bdev_io when splitting write_zeroes



Now that we split on I/O boundaries, that code needs to
be able to use the bdev_io split* members to track
what is left to submit.  This means that the write_zeroes
code cannot submit the parent bdev_io as the child bdev_io,
since the I/O boundary code will overwrite the write_zeroes
split accounting.

Signed-off-by: default avatarJim Harris <james.r.harris@intel.com>
Change-Id: I9316b59267508f60799766fc4f1ea05a4b3e5d9e

Reviewed-on: https://review.gerrithub.io/423404


Reviewed-by: default avatarSeth Howell <seth.howell5141@gmail.com>
Reviewed-by: default avatarBen Walker <benjamin.walker@intel.com>
Reviewed-by: default avatarShuhei Matsumoto <shuhei.matsumoto.xt@hitachi.com>
Reviewed-by: default avatarChangpeng Liu <changpeng.liu@intel.com>
Tested-by: default avatarSPDK CI Jenkins <sys_sgci@intel.com>
Chandler-Test-Pool: SPDK Automated Test System <sys_sgsw@intel.com>
parent d38b3d28
Loading
Loading
Loading
Loading
+56 −53
Original line number Diff line number Diff line
@@ -259,7 +259,9 @@ struct spdk_bdev_iostat_ctx {
#define __bdev_to_io_dev(bdev)		(((char *)bdev) + 1)
#define __bdev_from_io_dev(io_dev)	((struct spdk_bdev *)(((char *)io_dev) - 1))

static void spdk_bdev_write_zeroes_split(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg);
static void _spdk_bdev_write_zero_buffer_done(struct spdk_bdev_io *bdev_io, bool success,
		void *cb_arg);
static void _spdk_bdev_write_zero_buffer_next(void *_bdev_io);

void
spdk_bdev_get_opts(struct spdk_bdev_opts *opts)
@@ -2236,8 +2238,6 @@ spdk_bdev_write_zeroes_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channe
	struct spdk_bdev *bdev = desc->bdev;
	struct spdk_bdev_io *bdev_io;
	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
	uint64_t len;
	bool split_request = false;

	if (!desc->write) {
		return -EBADF;
@@ -2253,47 +2253,26 @@ spdk_bdev_write_zeroes_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channe
		return -ENOMEM;
	}

	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE_ZEROES;
	bdev_io->internal.ch = channel;
	bdev_io->internal.desc = desc;
	bdev_io->u.bdev.offset_blocks = offset_blocks;

	if (_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) {
		bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE_ZEROES;
	bdev_io->u.bdev.num_blocks = num_blocks;
		bdev_io->u.bdev.iovs = NULL;
		bdev_io->u.bdev.iovcnt = 0;
	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);

	if (_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) {
		spdk_bdev_io_submit(bdev_io);
		return 0;
	} else if (_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE)) {
		assert(spdk_bdev_get_block_size(bdev) <= ZERO_BUFFER_SIZE);

		len = spdk_bdev_get_block_size(bdev) * num_blocks;

		if (len > ZERO_BUFFER_SIZE) {
			split_request = true;
			len = ZERO_BUFFER_SIZE;
		}

		bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
		bdev_io->u.bdev.iovs = &bdev_io->iov;
		bdev_io->u.bdev.iovs[0].iov_base = g_bdev_mgr.zero_buffer;
		bdev_io->u.bdev.iovs[0].iov_len = len;
		bdev_io->u.bdev.iovcnt = 1;
		bdev_io->u.bdev.num_blocks = len / spdk_bdev_get_block_size(bdev);
		bdev_io->u.bdev.split_remaining_num_blocks = num_blocks - bdev_io->u.bdev.num_blocks;
		bdev_io->u.bdev.split_current_offset_blocks = offset_blocks + bdev_io->u.bdev.num_blocks;
		bdev_io->u.bdev.split_remaining_num_blocks = num_blocks;
		bdev_io->u.bdev.split_current_offset_blocks = offset_blocks;
		_spdk_bdev_write_zero_buffer_next(bdev_io);
		return 0;
	} else {
		spdk_bdev_free_io(bdev_io);
		return -ENOTSUP;
	}

	if (split_request) {
		bdev_io->u.bdev.stored_user_cb = cb;
		spdk_bdev_io_init(bdev_io, bdev, cb_arg, spdk_bdev_write_zeroes_split);
	} else {
		spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
	}
	spdk_bdev_io_submit(bdev_io);
	return 0;
}

int
@@ -3442,33 +3421,57 @@ spdk_bdev_module_list_find(const char *name)
}

static void
spdk_bdev_write_zeroes_split(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
_spdk_bdev_write_zero_buffer_next(void *_bdev_io)
{
	uint64_t len;
	struct spdk_bdev_io *bdev_io = _bdev_io;
	uint64_t num_bytes, num_blocks;
	int rc;

	if (!success) {
		bdev_io->internal.cb = bdev_io->u.bdev.stored_user_cb;
		_spdk_bdev_io_complete(bdev_io);
		return;
	num_bytes = spdk_min(spdk_bdev_get_block_size(bdev_io->bdev) *
			     bdev_io->u.bdev.split_remaining_num_blocks,
			     ZERO_BUFFER_SIZE);
	num_blocks = num_bytes / spdk_bdev_get_block_size(bdev_io->bdev);

	rc = spdk_bdev_write_blocks(bdev_io->internal.desc,
				    spdk_io_channel_from_ctx(bdev_io->internal.ch),
				    g_bdev_mgr.zero_buffer,
				    bdev_io->u.bdev.split_current_offset_blocks, num_blocks,
				    _spdk_bdev_write_zero_buffer_done, bdev_io);
	if (rc == 0) {
		bdev_io->u.bdev.split_remaining_num_blocks -= num_blocks;
		bdev_io->u.bdev.split_current_offset_blocks += num_blocks;
	} else if (rc == -ENOMEM) {
		bdev_io->internal.waitq_entry.bdev = bdev_io->bdev;
		bdev_io->internal.waitq_entry.cb_fn = _spdk_bdev_write_zero_buffer_next;
		bdev_io->internal.waitq_entry.cb_arg = bdev_io;
		spdk_bdev_queue_io_wait(bdev_io->bdev, spdk_io_channel_from_ctx(bdev_io->internal.ch),
					&bdev_io->internal.waitq_entry);
	} else {
		/* This should never happen. */
		assert(false);
		bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
		bdev_io->internal.cb(bdev_io, SPDK_BDEV_IO_STATUS_FAILED, bdev_io->internal.caller_ctx);
	}
}

	/* no need to perform the error checking from write_zeroes_blocks because this request already passed those checks. */
	len = spdk_min(spdk_bdev_get_block_size(bdev_io->bdev) * bdev_io->u.bdev.split_remaining_num_blocks,
		       ZERO_BUFFER_SIZE);
static void
_spdk_bdev_write_zero_buffer_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	struct spdk_bdev_io *parent_io = cb_arg;

	bdev_io->u.bdev.offset_blocks = bdev_io->u.bdev.split_current_offset_blocks;
	bdev_io->u.bdev.iovs[0].iov_len = len;
	bdev_io->u.bdev.num_blocks = len / spdk_bdev_get_block_size(bdev_io->bdev);
	bdev_io->u.bdev.split_remaining_num_blocks -= bdev_io->u.bdev.num_blocks;
	bdev_io->u.bdev.split_current_offset_blocks += bdev_io->u.bdev.num_blocks;
	if (!success) {
		parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
		parent_io->internal.cb(parent_io, SPDK_BDEV_IO_STATUS_FAILED, parent_io->internal.caller_ctx);
		return;
	}

	/* if this round completes the i/o, change the callback to be the original user callback */
	if (bdev_io->u.bdev.split_remaining_num_blocks == 0) {
		spdk_bdev_io_init(bdev_io, bdev_io->bdev, cb_arg, bdev_io->u.bdev.stored_user_cb);
	} else {
		spdk_bdev_io_init(bdev_io, bdev_io->bdev, cb_arg, spdk_bdev_write_zeroes_split);
	if (parent_io->u.bdev.split_remaining_num_blocks == 0) {
		parent_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS;
		parent_io->internal.cb(parent_io, SPDK_BDEV_IO_STATUS_SUCCESS, parent_io->internal.caller_ctx);
		return;
	}
	spdk_bdev_io_submit(bdev_io);

	_spdk_bdev_write_zero_buffer_next(parent_io);
}

struct set_qos_limit_ctx {