Commit f536eb58 authored by Jim Harris's avatar Jim Harris
Browse files

blob: turn non-iov splitting into additional blob calls



For I/O that do not span a cluster boundary, just issue
a single batch command to underlying block device.

For I/O that do span a cluster boundary, issue a batch
command for each against the blob (not the block device)
for each cluster accessed by the I/O.

This is all in preparation for upcoming patches which
enable thin provisioning and hence cluster allocation
in the I/O path.  It will simplify implementation of
the cluster allocation path since now that code only
needs to be concerned with a single allocation at once.

Splitting for readv/writev will be handled in a
later patch.

Signed-off-by: default avatarJim Harris <james.r.harris@intel.com>
Change-Id: Ia2341abbda599dace3357c4eec06ab6602ef81a8

Reviewed-on: https://review.gerrithub.io/395027


Tested-by: default avatarSPDK Automated Test System <sys_sgsw@intel.com>
Reviewed-by: default avatarBen Walker <benjamin.walker@intel.com>
Reviewed-by: default avatarMaciej Szwed <maciej.szwed@intel.com>
Reviewed-by: default avatarTomasz Zawadzki <tomasz.zawadzki@intel.com>
Reviewed-by: default avatarDaniel Verkamp <daniel.verkamp@intel.com>
parent 96d11441
Loading
Loading
Loading
Loading
+50 −22
Original line number Diff line number Diff line
@@ -1194,47 +1194,70 @@ _spdk_blob_persist(spdk_bs_sequence_t *seq, struct spdk_blob_data *blob,
}

static void
_spdk_blob_request_submit_op_impl(spdk_bs_batch_t *batch, struct spdk_blob_data *blob,
_spdk_blob_request_submit_op_split(spdk_bs_batch_t *batch, struct spdk_blob *_blob,
				   void *payload, uint64_t offset, uint64_t length,
				   spdk_blob_op_complete cb_fn, void *cb_arg, enum spdk_blob_op_type op_type)
{
	uint64_t			lba;
	uint32_t			lba_count;
	struct spdk_blob_data	*blob = __blob_to_data(_blob);
	uint64_t		op_length;
	uint8_t			*buf;
	uint64_t			page;

	assert(blob != NULL);

	length = _spdk_bs_page_to_lba(blob->bs, length);
	page = offset;
	buf = payload;
	while (length > 0) {
		lba = _spdk_bs_blob_page_to_lba(blob, page);
		lba_count = spdk_min(length,
				     _spdk_bs_page_to_lba(blob->bs,
						     _spdk_bs_num_pages_to_cluster_boundary(blob, page)));
		op_length = spdk_min(length, _spdk_bs_num_pages_to_cluster_boundary(blob, offset));

		switch (op_type) {
		case SPDK_BLOB_READ:
			spdk_bs_batch_read_dev(batch, buf, lba, lba_count);
			spdk_bs_batch_read_blob(batch, _blob, buf, offset, op_length);
			break;
		case SPDK_BLOB_WRITE:
			spdk_bs_batch_write_dev(batch, buf, lba, lba_count);
			spdk_bs_batch_write_blob(batch, _blob, buf, offset, op_length);
			break;
		case SPDK_BLOB_UNMAP:
			spdk_bs_batch_unmap_dev(batch, lba, lba_count);
			spdk_bs_batch_unmap_blob(batch, _blob, offset, op_length);
			break;
		case SPDK_BLOB_WRITE_ZEROES:
			spdk_bs_batch_write_zeroes_dev(batch, lba, lba_count);
			spdk_bs_batch_write_zeroes_blob(batch, _blob, offset, op_length);
			break;
		}

		length -= lba_count;
		page += _spdk_bs_lba_to_page(blob->bs, lba_count);
		length -= op_length;
		offset += op_length;
		if (op_type == SPDK_BLOB_WRITE || op_type == SPDK_BLOB_READ) {
			buf += _spdk_bs_lba_to_byte(blob->bs, lba_count);
			buf += op_length * SPDK_BS_PAGE_SIZE;
		}
	}
}

static void
_spdk_blob_request_submit_op_single(spdk_bs_batch_t *batch, struct spdk_blob_data *blob,
				    void *payload, uint64_t offset, uint64_t length,
				    spdk_blob_op_complete cb_fn, void *cb_arg, enum spdk_blob_op_type op_type)
{
	uint64_t lba;
	uint32_t lba_count;

	assert(blob != NULL);

	lba = _spdk_bs_blob_page_to_lba(blob, offset);
	lba_count = _spdk_bs_page_to_lba(blob->bs, length);

	switch (op_type) {
	case SPDK_BLOB_READ:
		spdk_bs_batch_read_dev(batch, payload, lba, lba_count);
		break;
	case SPDK_BLOB_WRITE:
		spdk_bs_batch_write_dev(batch, payload, lba, lba_count);
		break;
	case SPDK_BLOB_UNMAP:
		spdk_bs_batch_unmap_dev(batch, lba, lba_count);
		break;
	case SPDK_BLOB_WRITE_ZEROES:
		spdk_bs_batch_write_zeroes_dev(batch, lba, lba_count);
		break;
	}
}

static void
@@ -1268,8 +1291,13 @@ _spdk_blob_request_submit_op(struct spdk_blob *_blob, struct spdk_io_channel *_c
		return;
	}

	_spdk_blob_request_submit_op_impl(batch, blob, payload, offset, length,
	if (length <= _spdk_bs_num_pages_to_cluster_boundary(blob, offset)) {
		_spdk_blob_request_submit_op_single(batch, blob, payload, offset, length,
						    cb_fn, cb_arg, op_type);
	} else {
		_spdk_blob_request_submit_op_split(batch, _blob, payload, offset, length,
						   cb_fn, cb_arg, op_type);
	}

	spdk_bs_batch_close(batch);
}