Commit 383b1173 authored by Jim Harris's avatar Jim Harris
Browse files

reduce: implement read and write with data



For write operations, copy data to req->buf and write
to disk.

If chunk already specified in logical map, read the
chunk first into req->buf, and overwrite with data
specified by the write operation.

If chunk not specified in logical map, fill logical
blocks not specified by the write operation with
zeroes.

For read operations, read chunk into req->buf first,
then copy relevant data into the buffers specified
by the read operations.

These operations are all functional, but have room
for future improvement.  For example, this patch
will issue a separate backing read/write operation
for each backing block in the chunk - this could be
optimized to coalesce operations where the backing
blocks are contiguous.

While here, clean up freeing bufspace in one of
the error paths - this needs to be freed using
spdk_dma_free instead.

Signed-off-by: default avatarJim Harris <james.r.harris@intel.com>
Change-Id: I6dbf4fc9a8fdf0f5424b1f1f9178c79891c96d0d

Reviewed-on: https://review.gerrithub.io/434116


Tested-by: default avatarSPDK CI Jenkins <sys_sgci@intel.com>
Chandler-Test-Pool: SPDK Automated Test System <sys_sgsw@intel.com>
Reviewed-by: default avatarShuhei Matsumoto <shuhei.matsumoto.xt@hitachi.com>
parent 84f8b633
Loading
Loading
Loading
Loading
+18 −0
Original line number Diff line number Diff line
@@ -171,6 +171,24 @@ void spdk_reduce_vol_unload(struct spdk_reduce_vol *vol,
			    spdk_reduce_vol_op_complete cb_fn,
			    void *cb_arg);

/**
 * Read data from a libreduce compressed volume.
 *
 * This function will only read from logical blocks on the compressed volume that
 * fall within the same chunk.
 *
 * \param vol Volume to read data.
 * \param iov iovec array describing the data to be read
 * \param iovcnt Number of elements in the iovec array
 * \param offset Offset (in logical blocks) to read the data on the compressed volume
 * \param length Length (in logical blocks) of the data to read
 * \param cb_fn Callback function to signal completion of the readv operation.
 * \param cb_arg Argument to pass to the callback function.
 */
void spdk_reduce_vol_readv(struct spdk_reduce_vol *vol,
			   struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
			   spdk_reduce_vol_op_complete cb_fn, void *cb_arg);

/**
 * Write data to a libreduce compressed volume.
 *
+215 −6
Original line number Diff line number Diff line
@@ -76,9 +76,12 @@ struct spdk_reduce_pm_file {

struct spdk_reduce_vol_request {
	uint8_t					*buf;
	struct iovec				*buf_iov;
	struct iovec				*iov;
	struct spdk_reduce_vol			*vol;
	int					reduce_errno;
	int					iovcnt;
	int					num_backing_ops;
	uint64_t				offset;
	uint64_t				length;
	uint64_t				chunk_map_index;
@@ -86,11 +89,13 @@ struct spdk_reduce_vol_request {
	spdk_reduce_vol_op_complete		cb_fn;
	void					*cb_arg;
	TAILQ_ENTRY(spdk_reduce_vol_request)	tailq;
	struct spdk_reduce_vol_cb_args		backing_cb_args;
};

struct spdk_reduce_vol {
	struct spdk_reduce_vol_params		params;
	uint32_t				backing_io_units_per_chunk;
	uint32_t				backing_lba_per_io_unit;
	uint32_t				logical_blocks_per_chunk;
	struct spdk_reduce_pm_file		pm_file;
	struct spdk_reduce_backing_dev		*backing_dev;
@@ -105,6 +110,7 @@ struct spdk_reduce_vol {
	struct spdk_reduce_vol_request		*request_mem;
	TAILQ_HEAD(, spdk_reduce_vol_request)	requests;
	uint8_t					*bufspace;
	struct iovec				*buf_iov_mem;
};

/*
@@ -278,13 +284,22 @@ _allocate_vol_requests(struct spdk_reduce_vol *vol)

	vol->request_mem = calloc(REDUCE_NUM_VOL_REQUESTS, sizeof(*req));
	if (vol->request_mem == NULL) {
		free(vol->bufspace);
		spdk_dma_free(vol->bufspace);
		return -ENOMEM;
	}

	vol->buf_iov_mem = calloc(REDUCE_NUM_VOL_REQUESTS,
				  sizeof(struct iovec) * vol->backing_io_units_per_chunk);
	if (vol->buf_iov_mem == NULL) {
		free(vol->request_mem);
		spdk_dma_free(vol->bufspace);
		return -ENOMEM;
	}

	for (i = 0; i < REDUCE_NUM_VOL_REQUESTS; i++) {
		req = &vol->request_mem[i];
		TAILQ_INSERT_HEAD(&vol->requests, req, tailq);
		req->buf_iov = &vol->buf_iov_mem[i * vol->backing_io_units_per_chunk];
		req->buf = vol->bufspace + i * vol->params.chunk_size;
	}

@@ -304,6 +319,7 @@ _init_load_cleanup(struct spdk_reduce_vol *vol, struct reduce_init_load_ctx *ctx
		spdk_bit_array_free(&vol->allocated_chunk_maps);
		spdk_bit_array_free(&vol->allocated_backing_io_units);
		free(vol->request_mem);
		free(vol->buf_iov_mem);
		spdk_dma_free(vol->bufspace);
		free(vol);
	}
@@ -481,6 +497,7 @@ spdk_reduce_vol_init(struct spdk_reduce_vol_params *params,

	vol->backing_io_units_per_chunk = params->chunk_size / params->backing_io_unit_size;
	vol->logical_blocks_per_chunk = params->chunk_size / params->logical_block_size;
	vol->backing_lba_per_io_unit = params->backing_io_unit_size / backing_dev->blocklen;
	memcpy(&vol->params, params, sizeof(*params));

	rc = _allocate_bit_arrays(vol);
@@ -545,6 +562,7 @@ _load_read_super_and_path_cpl(void *cb_arg, int reduce_errno)
	memcpy(&vol->params, &vol->backing_super->params, sizeof(vol->params));
	vol->backing_io_units_per_chunk = vol->params.chunk_size / vol->params.backing_io_unit_size;
	vol->logical_blocks_per_chunk = vol->params.chunk_size / vol->params.logical_block_size;
	vol->backing_lba_per_io_unit = vol->params.backing_io_unit_size / vol->backing_dev->blocklen;

	rc = _allocate_bit_arrays(vol);
	if (rc != 0) {
@@ -703,7 +721,16 @@ _write_complete_req(void *_req, int reduce_errno)
	uint32_t i;

	if (reduce_errno != 0) {
		_reduce_vol_complete_req(req, reduce_errno);
		req->reduce_errno = reduce_errno;
	}

	assert(req->num_backing_ops > 0);
	if (--req->num_backing_ops > 0) {
		return;
	}

	if (req->reduce_errno != 0) {
		_reduce_vol_complete_req(req, req->reduce_errno);
		return;
	}

@@ -737,6 +764,30 @@ _write_complete_req(void *_req, int reduce_errno)
	_reduce_vol_complete_req(req, 0);
}

static void
_issue_backing_ops(struct spdk_reduce_vol_request *req, struct spdk_reduce_vol *vol,
		   reduce_request_fn next_fn, bool is_write)
{
	struct iovec *unit_iov;
	uint64_t lba;
	uint32_t unit;

	/* One backing I/O is issued per backing io unit in the chunk; next_fn
	 * is invoked once per completion, and the completion handlers count
	 * num_backing_ops down to know when the whole chunk is done.
	 */
	req->num_backing_ops = vol->backing_io_units_per_chunk;
	req->backing_cb_args.cb_fn = next_fn;
	req->backing_cb_args.cb_arg = req;

	for (unit = 0; unit < vol->backing_io_units_per_chunk; unit++) {
		unit_iov = &req->buf_iov[unit];
		unit_iov->iov_base = req->buf + unit * vol->params.backing_io_unit_size;
		unit_iov->iov_len = vol->params.backing_io_unit_size;
		lba = req->chunk[unit] * vol->backing_lba_per_io_unit;
		if (is_write) {
			vol->backing_dev->writev(vol->backing_dev, unit_iov, 1, lba,
						 vol->backing_lba_per_io_unit,
						 &req->backing_cb_args);
		} else {
			vol->backing_dev->readv(vol->backing_dev, unit_iov, 1, lba,
						vol->backing_lba_per_io_unit,
						&req->backing_cb_args);
		}
	}
}

static void
_reduce_vol_write_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
{
@@ -762,26 +813,154 @@ _reduce_vol_write_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn n
		spdk_bit_array_set(vol->allocated_backing_io_units, req->chunk[i]);
	}

	next_fn(req, 0);
	_issue_backing_ops(req, vol, next_fn, true /* write */);
}

static void
_write_read_done(void *_req, int reduce_errno)
{
	struct spdk_reduce_vol_request *req = _req;
	uint64_t chunk_offset;
	uint8_t *buf;
	int i;

	if (reduce_errno != 0) {
		_reduce_vol_complete_req(req, reduce_errno);
		req->reduce_errno = reduce_errno;
	}

	assert(req->num_backing_ops > 0);
	if (--req->num_backing_ops > 0) {
		return;
	}

	if (req->reduce_errno != 0) {
		_reduce_vol_complete_req(req, req->reduce_errno);
		return;
	}

	chunk_offset = req->offset % req->vol->logical_blocks_per_chunk;
	buf = req->buf + chunk_offset * req->vol->params.logical_block_size;
	for (i = 0; i < req->iovcnt; i++) {
		memcpy(buf, req->iov[i].iov_base, req->iov[i].iov_len);
		buf += req->iov[i].iov_len;
	}

	_reduce_vol_write_chunk(req, _write_complete_req);
}

static void
_read_read_done(void *_req, int reduce_errno)
{
	struct spdk_reduce_vol_request *req = _req;
	uint64_t chunk_offset;
	uint8_t *src;
	int iov_idx;

	/* Remember the first failure; the request only completes after every
	 * outstanding backing op has called back.
	 */
	if (reduce_errno != 0) {
		req->reduce_errno = reduce_errno;
	}

	assert(req->num_backing_ops > 0);
	if (--req->num_backing_ops > 0) {
		/* Still waiting on other backing reads for this chunk. */
		return;
	}

	if (req->reduce_errno != 0) {
		_reduce_vol_complete_req(req, req->reduce_errno);
		return;
	}

	/* Whole chunk is now in req->buf; copy the requested logical blocks
	 * out into the caller's iovecs.
	 */
	chunk_offset = req->offset % req->vol->logical_blocks_per_chunk;
	src = req->buf + chunk_offset * req->vol->params.logical_block_size;
	for (iov_idx = 0; iov_idx < req->iovcnt; iov_idx++) {
		memcpy(req->iov[iov_idx].iov_base, src, req->iov[iov_idx].iov_len);
		src += req->iov[iov_idx].iov_len;
	}
	_reduce_vol_complete_req(req, 0);
}

static void
_reduce_vol_read_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
{
	next_fn(req, 0);
	struct spdk_reduce_vol *vol = req->vol;
	uint64_t chunk;

	chunk = req->offset / vol->logical_blocks_per_chunk;
	req->chunk_map_index = vol->pm_logical_map[chunk];
	assert(req->chunk_map_index != UINT32_MAX);

	req->chunk = _reduce_vol_get_chunk_map(vol, req->chunk_map_index);
	_issue_backing_ops(req, vol, next_fn, false /* read */);
}

static bool
_iov_array_is_valid(struct spdk_reduce_vol *vol, struct iovec *iov, int iovcnt,
		    uint64_t length)
{
	uint64_t total_bytes = 0;
	int idx = 0;

	/* The iovec array must describe exactly `length` logical blocks. */
	while (idx < iovcnt) {
		total_bytes += iov[idx].iov_len;
		idx++;
	}

	return total_bytes == length * vol->params.logical_block_size;
}

void
spdk_reduce_vol_readv(struct spdk_reduce_vol *vol,
		      struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
		      spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
{
	struct spdk_reduce_vol_request *req;
	uint64_t logical_map_index;
	int i;

	/* A zero-length read completes immediately with success. */
	if (length == 0) {
		cb_fn(cb_arg, 0);
		return;
	}

	/* Reads may not cross a chunk boundary. */
	if (_request_spans_chunk_boundary(vol, offset, length)) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	/* The iovecs must add up to exactly `length` logical blocks. */
	if (!_iov_array_is_valid(vol, iov, iovcnt, length)) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	logical_map_index = offset / vol->logical_blocks_per_chunk;
	if (vol->pm_logical_map[logical_map_index] == REDUCE_EMPTY_MAP_ENTRY) {
		/*
		 * This chunk hasn't been allocated.  So treat the data as all
		 * zeroes for this chunk - do the memset and immediately complete
		 * the operation.
		 */
		for (i = 0; i < iovcnt; i++) {
			memset(iov[i].iov_base, 0, iov[i].iov_len);
		}
		cb_fn(cb_arg, 0);
		return;
	}

	/* Pull a free request from the pool; fail with -ENOMEM if exhausted. */
	req = TAILQ_FIRST(&vol->requests);
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}
	TAILQ_REMOVE(&vol->requests, req, tailq);

	req->vol = vol;
	req->iov = iov;
	req->iovcnt = iovcnt;
	req->offset = offset;
	req->length = length;
	req->cb_fn = cb_fn;
	req->cb_arg = cb_arg;

	_reduce_vol_read_chunk(req, _read_read_done);
}

void
@@ -790,13 +969,26 @@ spdk_reduce_vol_writev(struct spdk_reduce_vol *vol,
		       spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
{
	struct spdk_reduce_vol_request *req;
	uint64_t chunk;
	uint64_t chunk, chunk_offset;
	uint32_t lbsize, lb_per_chunk;
	int i;
	uint8_t *buf;

	if (length == 0) {
		cb_fn(cb_arg, 0);
		return;
	}

	if (_request_spans_chunk_boundary(vol, offset, length)) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	if (!_iov_array_is_valid(vol, iov, iovcnt, length)) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	req = TAILQ_FIRST(&vol->requests);
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
@@ -822,6 +1014,23 @@ spdk_reduce_vol_writev(struct spdk_reduce_vol *vol,
		return;
	}

	buf = req->buf;
	lbsize = vol->params.logical_block_size;
	lb_per_chunk = vol->logical_blocks_per_chunk;
	/* Note: we must zero out parts of req->buf not specified by this write operation. */
	chunk_offset = offset % lb_per_chunk;
	if (chunk_offset != 0) {
		memset(buf, 0, chunk_offset * lbsize);
		buf += chunk_offset * lbsize;
	}
	for (i = 0; i < iovcnt; i++) {
		memcpy(buf, iov[i].iov_base, iov[i].iov_len);
		buf += iov[i].iov_len;
	}
	chunk_offset += length;
	if (chunk_offset != lb_per_chunk) {
		memset(buf, 0, (lb_per_chunk - chunk_offset) * lbsize);
	}
	_reduce_vol_write_chunk(req, _write_complete_req);
}

+82 −1
Original line number Diff line number Diff line
@@ -489,6 +489,12 @@ write_cb(void *arg, int reduce_errno)
	g_reduce_errno = reduce_errno;
}

static void
read_cb(void *arg, int reduce_errno)
{
	g_reduce_errno = reduce_errno;
}

static void
write_maps(void)
{
@@ -575,6 +581,80 @@ write_maps(void)
	backing_dev_destroy(&backing_dev);
}


/* Unit test: write a small pattern to one chunk, then read every logical
 * block of that chunk back and verify written blocks contain the pattern
 * while untouched blocks read as zeroes.  Finally unload and reload the
 * volume to verify the parameters persist.
 */
static void
read_write(void)
{
	struct spdk_reduce_vol_params params = {};
	struct spdk_reduce_backing_dev backing_dev = {};
	struct iovec iov;
	char buf[16 * 1024]; /* chunk size */
	char compare_buf[16 * 1024];
	uint32_t i;

	params.vol_size = 1024 * 1024; /* 1MB */
	params.chunk_size = 16 * 1024;
	params.backing_io_unit_size = 4096;
	params.logical_block_size = 512;
	spdk_uuid_generate(&params.uuid);

	backing_dev_init(&backing_dev, &params);

	g_vol = NULL;
	g_reduce_errno = -1;
	spdk_reduce_vol_init(&params, &backing_dev, TEST_MD_PATH, init_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);
	SPDK_CU_ASSERT_FATAL(g_vol != NULL);

	/* Write 0xAA to 2 512-byte logical blocks, starting at LBA 2. */
	memset(buf, 0xAA, 2 * params.logical_block_size);
	iov.iov_base = buf;
	iov.iov_len = 2 * params.logical_block_size;
	g_reduce_errno = -1;
	spdk_reduce_vol_writev(g_vol, &iov, 1, 2, 2, write_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);

	/* Read each logical block of the chunk individually.  buf is
	 * pre-filled with 0xFF so a read that silently does nothing would
	 * fail both comparisons below.
	 */
	memset(compare_buf, 0xAA, sizeof(compare_buf));
	for (i = 0; i < params.chunk_size / params.logical_block_size; i++) {
		memset(buf, 0xFF, params.logical_block_size);
		iov.iov_base = buf;
		iov.iov_len = params.logical_block_size;
		g_reduce_errno = -1;
		spdk_reduce_vol_readv(g_vol, &iov, 1, i, 1, read_cb, NULL);
		CU_ASSERT(g_reduce_errno == 0);

		switch (i) {
		case 2:
		case 3:
			/* These LBAs were written with 0xAA above. */
			CU_ASSERT(memcmp(buf, compare_buf, params.logical_block_size) == 0);
			break;
		default:
			/* Unwritten blocks in the chunk must read as zeroes. */
			CU_ASSERT(spdk_mem_all_zero(buf, params.logical_block_size));
			break;
		}
	}

	g_reduce_errno = -1;
	spdk_reduce_vol_unload(g_vol, unload_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);

	/* Reload the volume from the backing device and confirm the
	 * persisted parameters match what we initialized with.
	 */
	g_vol = NULL;
	g_reduce_errno = -1;
	spdk_reduce_vol_load(&backing_dev, load_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);
	SPDK_CU_ASSERT_FATAL(g_vol != NULL);
	CU_ASSERT(g_vol->params.vol_size == params.vol_size);
	CU_ASSERT(g_vol->params.chunk_size == params.chunk_size);
	CU_ASSERT(g_vol->params.backing_io_unit_size == params.backing_io_unit_size);

	g_reduce_errno = -1;
	spdk_reduce_vol_unload(g_vol, unload_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);

	persistent_pm_buf_destroy();
	backing_dev_destroy(&backing_dev);
}

int
main(int argc, char **argv)
{
@@ -598,7 +678,8 @@ main(int argc, char **argv)
		CU_add_test(suite, "init_md", init_md) == NULL ||
		CU_add_test(suite, "init_backing_dev", init_backing_dev) == NULL ||
		CU_add_test(suite, "load", load) == NULL ||
		CU_add_test(suite, "write_maps", write_maps) == NULL
		CU_add_test(suite, "write_maps", write_maps) == NULL ||
		CU_add_test(suite, "read_write", read_write) == NULL
	) {
		CU_cleanup_registry();
		return CU_get_error();