Commit 309f7791 authored by Jim Harris's avatar Jim Harris
Browse files

reduce: plumb basic compress/decompress callbacks



The unit tests don't really try to compress anything
yet, but this at least gets the pipeline in place.

Signed-off-by: default avatarJim Harris <james.r.harris@intel.com>
Change-Id: Ic413850b30e4d9631f3ece2bab40d9026225e5b2

Reviewed-on: https://review.gerrithub.io/c/spdk/spdk/+/449097


Tested-by: default avatarSPDK CI Jenkins <sys_sgci@intel.com>
Reviewed-by: default avatarChangpeng Liu <changpeng.liu@intel.com>
Reviewed-by: default avatarPaul Luse <paul.e.luse@intel.com>
Reviewed-by: default avatarShuhei Matsumoto <shuhei.matsumoto.xt@hitachi.com>
parent 72da9b75
Loading
Loading
Loading
Loading
+22 −0
Original line number Diff line number Diff line
@@ -88,6 +88,18 @@ typedef void (*spdk_reduce_vol_op_with_handle_complete)(void *ctx,
		struct spdk_reduce_vol *vol,
		int reduce_errno);

/**
 * Defines function type for callback functions called when backing_dev
 *  operations are complete.
 *
 * \param cb_arg Callback argument
 * \param reduce_errno Completion status of backing_dev operation
 *		       Negative values indicate negated errno value
 *		       0 indicates successful readv/writev/unmap operation
 *		       Positive value indicates successful compress/decompress
 *		       operations; number indicates number of bytes written to
 *		       destination iovs
 */
typedef void (*spdk_reduce_dev_cpl)(void *cb_arg, int reduce_errno);

struct spdk_reduce_vol_cb_args {
@@ -105,6 +117,16 @@ struct spdk_reduce_backing_dev {
	void (*unmap)(struct spdk_reduce_backing_dev *dev,
		      uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args);

	void (*compress)(struct spdk_reduce_backing_dev *dev,
			 struct iovec *src_iov, int src_iovcnt,
			 struct iovec *dst_iov, int dst_iovcnt,
			 struct spdk_reduce_vol_cb_args *args);

	void (*decompress)(struct spdk_reduce_backing_dev *dev,
			   struct iovec *src_iov, int src_iovcnt,
			   struct iovec *dst_iov, int dst_iovcnt,
			   struct spdk_reduce_vol_cb_args *args);

	uint64_t	blockcnt;
	uint32_t	blocklen;
};
+109 −20
Original line number Diff line number Diff line
@@ -950,14 +950,14 @@ _issue_backing_ops(struct spdk_reduce_vol_request *req, struct spdk_reduce_vol *
	req->backing_cb_args.cb_fn = next_fn;
	req->backing_cb_args.cb_arg = req;
	for (i = 0; i < vol->backing_io_units_per_chunk; i++) {
		req->decomp_buf_iov[i].iov_base = req->decomp_buf + i * vol->params.backing_io_unit_size;
		req->decomp_buf_iov[i].iov_len = vol->params.backing_io_unit_size;
		req->comp_buf_iov[i].iov_base = req->comp_buf + i * vol->params.backing_io_unit_size;
		req->comp_buf_iov[i].iov_len = vol->params.backing_io_unit_size;
		if (is_write) {
			vol->backing_dev->writev(vol->backing_dev, &req->decomp_buf_iov[i], 1,
			vol->backing_dev->writev(vol->backing_dev, &req->comp_buf_iov[i], 1,
						 req->chunk->io_unit_index[i] * vol->backing_lba_per_io_unit,
						 vol->backing_lba_per_io_unit, &req->backing_cb_args);
		} else {
			vol->backing_dev->readv(vol->backing_dev, &req->decomp_buf_iov[i], 1,
			vol->backing_dev->readv(vol->backing_dev, &req->comp_buf_iov[i], 1,
						req->chunk->io_unit_index[i] * vol->backing_lba_per_io_unit,
						vol->backing_lba_per_io_unit, &req->backing_cb_args);
		}
@@ -965,7 +965,8 @@ _issue_backing_ops(struct spdk_reduce_vol_request *req, struct spdk_reduce_vol *
}

static void
_reduce_vol_write_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
_reduce_vol_write_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn,
			uint32_t compressed_size)
{
	struct spdk_reduce_vol *vol = req->vol;
	uint32_t i;
@@ -979,7 +980,7 @@ _reduce_vol_write_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn n
	spdk_bit_array_set(vol->allocated_chunk_maps, req->chunk_map_index);

	req->chunk = _reduce_vol_get_chunk_map(vol, req->chunk_map_index);
	req->chunk->compressed_size = vol->params.chunk_size;
	req->chunk->compressed_size = compressed_size;

	for (i = 0; i < vol->backing_io_units_per_chunk; i++) {
		req->chunk->io_unit_index[i] = spdk_bit_array_find_first_clear(vol->allocated_backing_io_units, 0);
@@ -993,14 +994,93 @@ _reduce_vol_write_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn n
	_issue_backing_ops(req, vol, next_fn, true /* write */);
}


static void
_write_read_done(void *_req, int reduce_errno)
_write_compress_done(void *_req, int reduce_errno)
{
	struct spdk_reduce_vol_request *req = _req;

	/* Negative reduce_errno indicates failure for compression operations. */
	if (reduce_errno < 0) {
		_reduce_vol_complete_req(req, reduce_errno);
		return;
	}

	/* Positive reduce_errno indicates number of bytes in compressed buffer. */
	_reduce_vol_write_chunk(req, _write_write_done, (uint32_t)reduce_errno);
}

static void
_reduce_vol_compress_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
{
	struct spdk_reduce_vol *vol = req->vol;

	req->backing_cb_args.cb_fn = next_fn;
	req->backing_cb_args.cb_arg = req;
	req->comp_buf_iov[0].iov_base = req->comp_buf;
	req->comp_buf_iov[0].iov_len = vol->params.chunk_size;
	req->decomp_buf_iov[0].iov_base = req->decomp_buf;
	req->decomp_buf_iov[0].iov_len = vol->params.chunk_size;
	vol->backing_dev->compress(vol->backing_dev,
				   req->decomp_buf_iov, 1, req->comp_buf_iov, 1,
				   &req->backing_cb_args);
}

static void
_reduce_vol_decompress_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
{
	struct spdk_reduce_vol *vol = req->vol;

	req->backing_cb_args.cb_fn = next_fn;
	req->backing_cb_args.cb_arg = req;
	req->comp_buf_iov[0].iov_base = req->comp_buf;
	req->comp_buf_iov[0].iov_len = vol->params.chunk_size;
	req->decomp_buf_iov[0].iov_base = req->decomp_buf;
	req->decomp_buf_iov[0].iov_len = vol->params.chunk_size;
	vol->backing_dev->decompress(vol->backing_dev,
				     req->comp_buf_iov, 1, req->decomp_buf_iov, 1,
				     &req->backing_cb_args);
}

static void
_write_decompress_done(void *_req, int reduce_errno)
{
	struct spdk_reduce_vol_request *req = _req;
	struct spdk_reduce_vol *vol = req->vol;
	uint64_t chunk_offset;
	uint8_t *buf;
	int i;

	/* Negative reduce_errno indicates failure for compression operations. */
	if (reduce_errno < 0) {
		_reduce_vol_complete_req(req, reduce_errno);
		return;
	}

	/* Positive reduce_errno indicates number of bytes in decompressed
	 *  buffer.  This should equal the chunk size - otherwise that's another
	 *  type of failure.
	 */
	if ((uint32_t)reduce_errno != vol->params.chunk_size) {
		_reduce_vol_complete_req(req, -EIO);
		return;
	}

	chunk_offset = req->offset % vol->logical_blocks_per_chunk;
	buf = req->decomp_buf + chunk_offset * vol->params.logical_block_size;
	for (i = 0; i < req->iovcnt; i++) {
		memcpy(buf, req->iov[i].iov_base, req->iov[i].iov_len);
		buf += req->iov[i].iov_len;
	}

	_reduce_vol_compress_chunk(req, _write_compress_done);
}

static void
_write_read_done(void *_req, int reduce_errno)
{
	struct spdk_reduce_vol_request *req = _req;

	if (reduce_errno != 0) {
		req->reduce_errno = reduce_errno;
	}
@@ -1015,26 +1095,35 @@ _write_read_done(void *_req, int reduce_errno)
		return;
	}

	chunk_offset = req->offset % req->vol->logical_blocks_per_chunk;
	buf = req->decomp_buf + chunk_offset * req->vol->params.logical_block_size;
	for (i = 0; i < req->iovcnt; i++) {
		memcpy(buf, req->iov[i].iov_base, req->iov[i].iov_len);
		buf += req->iov[i].iov_len;
	}

	_reduce_vol_write_chunk(req, _write_write_done);
	_reduce_vol_decompress_chunk(req, _write_decompress_done);
}

static void
_read_complete_req(void *_req, int reduce_errno)
_read_decompress_done(void *_req, int reduce_errno)
{
	struct spdk_reduce_vol_request *req = _req;
	struct spdk_reduce_vol *vol = req->vol;
	uint64_t chunk_offset;
	uint8_t *buf;
	int i;

	chunk_offset = req->offset % req->vol->logical_blocks_per_chunk;
	buf = req->decomp_buf + chunk_offset * req->vol->params.logical_block_size;
	/* Negative reduce_errno indicates failure for compression operations. */
	if (reduce_errno < 0) {
		_reduce_vol_complete_req(req, reduce_errno);
		return;
	}

	/* Positive reduce_errno indicates number of bytes in decompressed
	 *  buffer.  This should equal the chunk size - otherwise that's another
	 *  type of failure.
	 */
	if ((uint32_t)reduce_errno != vol->params.chunk_size) {
		_reduce_vol_complete_req(req, -EIO);
		return;
	}

	chunk_offset = req->offset % vol->logical_blocks_per_chunk;
	buf = req->decomp_buf + chunk_offset * vol->params.logical_block_size;
	for (i = 0; i < req->iovcnt; i++) {
		memcpy(req->iov[i].iov_base, buf, req->iov[i].iov_len);
		buf += req->iov[i].iov_len;
@@ -1061,7 +1150,7 @@ _read_read_done(void *_req, int reduce_errno)
		return;
	}

	_read_complete_req(req, 0);
	_reduce_vol_decompress_chunk(req, _read_decompress_done);
}

static void
@@ -1213,7 +1302,7 @@ _start_writev_request(struct spdk_reduce_vol_request *req)
	if (chunk_offset != lb_per_chunk) {
		memset(buf, 0, (lb_per_chunk - chunk_offset) * lbsize);
	}
	_reduce_vol_write_chunk(req, _write_write_done);
	_reduce_vol_compress_chunk(req, _write_compress_done);
}

void
+28 −0
Original line number Diff line number Diff line
@@ -389,6 +389,32 @@ backing_dev_io_execute(uint32_t count)
	}
}

static void
backing_dev_compress(struct spdk_reduce_backing_dev *backing_dev,
		     struct iovec *src_iov, int src_iovcnt,
		     struct iovec *dst_iov, int dst_iovcnt,
		     struct spdk_reduce_vol_cb_args *args)
{
	CU_ASSERT(src_iovcnt == 1);
	CU_ASSERT(dst_iovcnt == 1);
	CU_ASSERT(src_iov[0].iov_len == dst_iov[0].iov_len);
	memcpy(dst_iov[0].iov_base, src_iov[0].iov_base, src_iov[0].iov_len);
	args->cb_fn(args->cb_arg, src_iov[0].iov_len);
}

static void
backing_dev_decompress(struct spdk_reduce_backing_dev *backing_dev,
		       struct iovec *src_iov, int src_iovcnt,
		       struct iovec *dst_iov, int dst_iovcnt,
		       struct spdk_reduce_vol_cb_args *args)
{
	CU_ASSERT(src_iovcnt == 1);
	CU_ASSERT(dst_iovcnt == 1);
	CU_ASSERT(src_iov[0].iov_len == dst_iov[0].iov_len);
	memcpy(dst_iov[0].iov_base, src_iov[0].iov_base, src_iov[0].iov_len);
	args->cb_fn(args->cb_arg, src_iov[0].iov_len);
}

static void
backing_dev_destroy(struct spdk_reduce_backing_dev *backing_dev)
{
@@ -411,6 +437,8 @@ backing_dev_init(struct spdk_reduce_backing_dev *backing_dev, struct spdk_reduce
	backing_dev->readv = backing_dev_readv;
	backing_dev->writev = backing_dev_writev;
	backing_dev->unmap = backing_dev_unmap;
	backing_dev->compress = backing_dev_compress;
	backing_dev->decompress = backing_dev_decompress;

	g_backing_dev_buf = calloc(1, size);
	SPDK_CU_ASSERT_FATAL(g_backing_dev_buf != NULL);