Commit 29e8013d authored by Tomasz Zawadzki's avatar Tomasz Zawadzki Committed by Daniel Verkamp
Browse files

blobstore: process split operation sequentially



Previously, when an operation on the blobstore spanned multiple
clusters, it was split into multiple operations processed in a batch.

This made it possible to max out the channel's ops when using a large
enough operation. It was encountered when issuing an unmap for a whole device.

Now a single large operation is processed sequentially, making
it take at most one op from the channel at a time.

Fixes #251
Fixes #250

Change-Id: I132309877ba3b2d7217332daf1025fb5f9ee74d0
Signed-off-by: default avatarTomasz Zawadzki <tomasz.zawadzki@intel.com>
Reviewed-on: https://review.gerrithub.io/403306


Reviewed-by: default avatarJim Harris <james.r.harris@intel.com>
Reviewed-by: default avatarShuhei Matsumoto <shuhei.matsumoto.xt@hitachi.com>
Tested-by: default avatarSPDK Automated Test System <sys_sgsw@intel.com>
Reviewed-by: default avatarDaniel Verkamp <daniel.verkamp@intel.com>
parent a8d3ac0c
Loading
Loading
Loading
Loading
+83 −36
Original line number Diff line number Diff line
@@ -1549,59 +1549,106 @@ _spdk_blob_calculate_lba_and_lba_count(struct spdk_blob *blob, uint64_t page, ui
	}
}

/* State carried across the sequential sub-operations of a split blob request.
 * One sub-operation is issued per chunk up to the next cluster boundary;
 * the context is freed when the whole request finishes or fails. */
struct op_split_ctx {
	struct spdk_blob *blob;			/* Target blob of the whole request. */
	struct spdk_io_channel *channel;	/* Channel all sub-operations are issued on. */
	uint64_t page_offset;			/* Page offset of the next sub-operation. */
	uint64_t pages_remaining;		/* Pages still left to process. */
	void *curr_payload;			/* Payload position for the next read/write. */
	enum spdk_blob_op_type op_type;		/* READ/WRITE/UNMAP/WRITE_ZEROES. */
	spdk_bs_sequence_t *seq;		/* Sequence finished once all pieces complete. */
};

static void
_spdk_blob_request_submit_op_split(struct spdk_io_channel *ch, struct spdk_blob *blob,
				   void *payload, uint64_t offset, uint64_t length,
				   spdk_blob_op_complete cb_fn, void *cb_arg, enum spdk_blob_op_type op_type)
_spdk_blob_request_submit_op_split_next(void *cb_arg, int bserrno)
{
	spdk_bs_batch_t		*batch;
	struct spdk_bs_cpl	cpl;
	struct op_split_ctx	*ctx = cb_arg;
	struct spdk_blob	*blob = ctx->blob;
	struct spdk_io_channel	*ch = ctx->channel;
	enum spdk_blob_op_type	op_type = ctx->op_type;
	uint8_t			*buf = ctx->curr_payload;
	uint64_t		offset = ctx->page_offset;
	uint64_t		length = ctx->pages_remaining;
	uint64_t		op_length;
	uint8_t			*buf;

	assert(blob != NULL);

	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
	cpl.u.blob_basic.cb_fn = cb_fn;
	cpl.u.blob_basic.cb_arg = cb_arg;

	batch = spdk_bs_batch_open(ch, &cpl);
	if (!batch) {
		cb_fn(cb_arg, -ENOMEM);
	if (bserrno != 0 || ctx->pages_remaining == 0) {
		spdk_bs_sequence_finish(ctx->seq, bserrno);
		free(ctx);
		return;
	}

	buf = payload;
	while (length > 0) {
	op_length = spdk_min(length, _spdk_bs_num_pages_to_cluster_boundary(blob, offset));

	/* Update length and payload for next operation */
	ctx->pages_remaining -= op_length;
	ctx->page_offset += op_length;
	if (op_type == SPDK_BLOB_WRITE || op_type == SPDK_BLOB_READ) {
		ctx->curr_payload += op_length;
	}

	switch (op_type) {
	case SPDK_BLOB_READ:
			spdk_bs_batch_read_blob(batch, blob, buf, offset, op_length);
		spdk_blob_io_read(blob, ch, buf, offset, op_length,
				  _spdk_blob_request_submit_op_split_next, ctx);
		break;
	case SPDK_BLOB_WRITE:
			spdk_bs_batch_write_blob(batch, blob, buf, offset, op_length);
		spdk_blob_io_write(blob, ch, buf, offset, op_length,
				   _spdk_blob_request_submit_op_split_next, ctx);
		break;
	case SPDK_BLOB_UNMAP:
			spdk_bs_batch_unmap_blob(batch, blob, offset, op_length);
		spdk_blob_io_unmap(blob, ch, offset, op_length,
				   _spdk_blob_request_submit_op_split_next, ctx);
		break;
	case SPDK_BLOB_WRITE_ZEROES:
			spdk_bs_batch_write_zeroes_blob(batch, blob, offset, op_length);
		spdk_blob_io_write_zeroes(blob, ch, offset, op_length,
					  _spdk_blob_request_submit_op_split_next, ctx);
		break;
	case SPDK_BLOB_READV:
	case SPDK_BLOB_WRITEV:
		SPDK_ERRLOG("readv/write not valid for %s\n", __func__);
		spdk_bs_sequence_finish(ctx->seq, -EINVAL);
		free(ctx);
		break;
	}
}

		length -= op_length;
		offset += op_length;
		if (op_type == SPDK_BLOB_WRITE || op_type == SPDK_BLOB_READ) {
			buf += op_length * SPDK_BS_PAGE_SIZE;
static void
_spdk_blob_request_submit_op_split(struct spdk_io_channel *ch, struct spdk_blob *blob,
				   void *payload, uint64_t offset, uint64_t length,
				   spdk_blob_op_complete cb_fn, void *cb_arg, enum spdk_blob_op_type op_type)
{
	struct op_split_ctx *ctx;
	spdk_bs_sequence_t *seq;
	struct spdk_bs_cpl cpl;

	assert(blob != NULL);

	ctx = calloc(1, sizeof(struct op_split_ctx));
	if (ctx == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
	cpl.u.blob_basic.cb_fn = cb_fn;
	cpl.u.blob_basic.cb_arg = cb_arg;

	seq = spdk_bs_sequence_start(ch, &cpl);
	if (!seq) {
		free(ctx);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	spdk_bs_batch_close(batch);
	ctx->blob = blob;
	ctx->channel = ch;
	ctx->curr_payload = payload;
	ctx->page_offset = offset;
	ctx->pages_remaining = length;
	ctx->op_type = op_type;
	ctx->seq = seq;

	_spdk_blob_request_submit_op_split_next(ctx, 0);
}

static void