Commit 84f8b633 authored by Jim Harris's avatar Jim Harris
Browse files

reduce: start implementation of spdk_reduce_vol_writev



Focus of this patch is adding the foundations of manipulating
the chunk_map and backing_block bit arrays and persisting
chunk maps and the logical map to the pm_file.

No data is written to the backing device yet.  That will come
in later patches.  This also does not rebuild the bit arrays
from the pm_file during spdk_reduce_vol_load - that will also come
in a later patch.

Signed-off-by: default avatarJim Harris <james.r.harris@intel.com>
Change-Id: Ib5336dbe907c253e545704471de8c4e812bbc157

Reviewed-on: https://review.gerrithub.io/434115


Tested-by: default avatarSPDK CI Jenkins <sys_sgci@intel.com>
Chandler-Test-Pool: SPDK Automated Test System <sys_sgsw@intel.com>
Reviewed-by: default avatarShuhei Matsumoto <shuhei.matsumoto.xt@hitachi.com>
Reviewed-by: default avatarChangpeng Liu <changpeng.liu@intel.com>
parent 3213962e
Loading
Loading
Loading
Loading
+18 −0
Original line number Diff line number Diff line
@@ -171,4 +171,22 @@ void spdk_reduce_vol_unload(struct spdk_reduce_vol *vol,
			    spdk_reduce_vol_op_complete cb_fn,
			    void *cb_arg);

/**
 * Write data to a libreduce compressed volume.
 *
 * This function will only write to logical blocks on the compressed volume that
 * fall within the same chunk.
 *
 * \param vol Volume to write data.
 * \param iov iovec array describing the data to be written
 * \param iovcnt Number of elements in the iovec array
 * \param offset Offset (in logical blocks) to write the data on the compressed volume
 * \param length Length (in logical blocks) of the data to write
 * \param cb_fn Callback function to signal completion of the writev operation.
 * \param cb_arg Argument to pass to the callback function.
 */
void spdk_reduce_vol_writev(struct spdk_reduce_vol *vol,
			    struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
			    spdk_reduce_vol_op_complete cb_fn, void *cb_arg);

#endif /* SPDK_REDUCE_H_ */
+167 −0
Original line number Diff line number Diff line
@@ -76,6 +76,15 @@ struct spdk_reduce_pm_file {

/* State for a single in-flight I/O request on a compressed volume.  Requests
 * are taken from and returned to the volume's free-request TAILQ.
 */
struct spdk_reduce_vol_request {
	uint8_t					*buf;	/* data buffer (not yet used for backing I/O in this patch) */
	struct iovec				*iov;	/* caller-provided iovec array */
	struct spdk_reduce_vol			*vol;	/* volume this request operates on */
	int					iovcnt;	/* number of elements in iov */
	uint64_t				offset;	/* I/O offset, in logical blocks */
	uint64_t				length;	/* I/O length, in logical blocks */
	uint64_t				chunk_map_index;	/* index of the chunk map allocated for this write */
	uint64_t				*chunk;	/* pointer into pm_chunk_maps for chunk_map_index */
	spdk_reduce_vol_op_complete		cb_fn;	/* caller's completion callback */
	void					*cb_arg;	/* argument passed to cb_fn */
	TAILQ_ENTRY(spdk_reduce_vol_request)	tailq;	/* linkage on the volume's free-request list */
};

@@ -157,6 +166,12 @@ _get_pm_total_chunks_size(uint64_t vol_size, uint64_t chunk_size, uint64_t backi
	return divide_round_up(total_chunks_size, REDUCE_PM_SIZE_ALIGNMENT) * REDUCE_PM_SIZE_ALIGNMENT;
}

/* Return a pointer to the first entry of the chunk map at the given index
 * within the volume's persistent-memory chunk map region.
 */
static uint64_t *
_reduce_vol_get_chunk_map(struct spdk_reduce_vol *vol, uint64_t chunk_map_index)
{
	uint64_t entries_per_map = vol->backing_io_units_per_chunk;

	return &vol->pm_chunk_maps[chunk_map_index * entries_per_map];
}

static int
_validate_vol_params(struct spdk_reduce_vol_params *params)
{
@@ -658,4 +673,156 @@ spdk_reduce_vol_unload(struct spdk_reduce_vol *vol,
	cb_fn(cb_arg, 0);
}

/* Return true if the logical block range [offset, offset + length) crosses
 * a chunk boundary on this volume.
 */
static bool
_request_spans_chunk_boundary(struct spdk_reduce_vol *vol, uint64_t offset, uint64_t length)
{
	uint64_t first_chunk = offset / vol->logical_blocks_per_chunk;
	uint64_t last_chunk = (offset + length - 1) / vol->logical_blocks_per_chunk;

	return first_chunk != last_chunk;
}

typedef void (*reduce_request_fn)(void *_req, int reduce_errno);

/* Complete a request: invoke the caller's completion callback with the given
 * errno, then return the request structure to the volume's free-request list.
 */
static void
_reduce_vol_complete_req(struct spdk_reduce_vol_request *req, int reduce_errno)
{
	req->cb_fn(req->cb_arg, reduce_errno);
	TAILQ_INSERT_HEAD(&req->vol->requests, req, tailq);
}

/* Final stage of a write request.  Releases the chunk map and backing io units
 * previously referenced by this chunk (if any), persists the new chunk map,
 * then updates and persists the logical map entry before completing the request.
 */
static void
_write_complete_req(void *_req, int reduce_errno)
{
	struct spdk_reduce_vol_request *req = _req;
	struct spdk_reduce_vol *vol = req->vol;
	uint64_t logical_map_index, old_chunk_map_index;
	uint64_t *old_chunk;
	uint32_t i;

	if (reduce_errno != 0) {
		_reduce_vol_complete_req(req, reduce_errno);
		return;
	}

	logical_map_index = req->offset / vol->logical_blocks_per_chunk;

	old_chunk_map_index = vol->pm_logical_map[logical_map_index];
	if (old_chunk_map_index != REDUCE_EMPTY_MAP_ENTRY) {
		/* Only tear down the old chunk map if one actually exists.  On the
		 * first write to a chunk the logical map entry is
		 * REDUCE_EMPTY_MAP_ENTRY and using it to index pm_chunk_maps (or
		 * to clear a bit) would be out of bounds.
		 */
		old_chunk = _reduce_vol_get_chunk_map(vol, old_chunk_map_index);
		for (i = 0; i < vol->backing_io_units_per_chunk; i++) {
			if (old_chunk[i] == REDUCE_EMPTY_MAP_ENTRY) {
				break;
			}
			assert(spdk_bit_array_get(vol->allocated_backing_io_units, old_chunk[i]) == true);
			spdk_bit_array_clear(vol->allocated_backing_io_units, old_chunk[i]);
			old_chunk[i] = REDUCE_EMPTY_MAP_ENTRY;
		}
		spdk_bit_array_clear(vol->allocated_chunk_maps, old_chunk_map_index);
	}

	/*
	 * We don't need to persist the clearing of the old chunk map here.  The old chunk map
	 * becomes invalid after we update the logical map, since the old chunk map will no
	 * longer have a reference to it in the logical map.
	 */

	/* Persist the new chunk map.  This must be persisted before we update the logical map. */
	_reduce_persist(vol, req->chunk, sizeof(uint64_t) * vol->backing_io_units_per_chunk);

	vol->pm_logical_map[logical_map_index] = req->chunk_map_index;

	_reduce_persist(vol, &vol->pm_logical_map[logical_map_index], sizeof(uint64_t));

	_reduce_vol_complete_req(req, 0);
}

/* Allocate a fresh chunk map and a backing io unit for each slot in it, record
 * the allocations in the volume's bit arrays, then invoke next_fn.  No data is
 * written to the backing device at this stage.
 */
static void
_reduce_vol_write_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
{
	struct spdk_reduce_vol *vol = req->vol;
	uint32_t io_unit;

	req->chunk_map_index = spdk_bit_array_find_first_clear(vol->allocated_chunk_maps, 0);
	/* TODO: fail if no chunk map found - but really this should not happen if we
	 * size the number of requests similarly to number of extra chunk maps
	 */
	assert(req->chunk_map_index != UINT32_MAX);
	spdk_bit_array_set(vol->allocated_chunk_maps, req->chunk_map_index);

	req->chunk = _reduce_vol_get_chunk_map(vol, req->chunk_map_index);

	for (io_unit = 0; io_unit < vol->backing_io_units_per_chunk; io_unit++) {
		req->chunk[io_unit] = spdk_bit_array_find_first_clear(vol->allocated_backing_io_units, 0);
		/* TODO: fail if no backing block found - but really this should also not
		 * happen (see comment above).
		 */
		assert(req->chunk[io_unit] != UINT32_MAX);
		spdk_bit_array_set(vol->allocated_backing_io_units, req->chunk[io_unit]);
	}

	next_fn(req, 0);
}

/* Completion callback for the read-modify-write path: once the old chunk has
 * been read, proceed to writing the new chunk; on error, complete the request.
 */
static void
_write_read_done(void *_req, int reduce_errno)
{
	struct spdk_reduce_vol_request *req = _req;

	if (reduce_errno == 0) {
		_reduce_vol_write_chunk(req, _write_complete_req);
	} else {
		_reduce_vol_complete_req(req, reduce_errno);
	}
}

/* Read the existing chunk referenced by this request.
 *
 * Currently a stub: it invokes next_fn immediately without reading any data
 * from the backing device.  Actual backing-device reads will be added in a
 * later patch (see commit message).
 */
static void
_reduce_vol_read_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
{
	next_fn(req, 0);
}

/* Write data to a compressed volume.  The request must not span a chunk
 * boundary; spanning requests fail with -EINVAL.  If no free request structure
 * is available the operation fails with -ENOMEM.  Overwrites of an existing
 * chunk go through a read-modify-write path; writes to an unmapped chunk
 * allocate a new chunk directly.
 */
void
spdk_reduce_vol_writev(struct spdk_reduce_vol *vol,
		       struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
		       spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
{
	struct spdk_reduce_vol_request *req;
	uint64_t logical_map_index;

	if (_request_spans_chunk_boundary(vol, offset, length)) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	req = TAILQ_FIRST(&vol->requests);
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}
	TAILQ_REMOVE(&vol->requests, req, tailq);

	req->vol = vol;
	req->iov = iov;
	req->iovcnt = iovcnt;
	req->offset = offset;
	req->length = length;
	req->cb_fn = cb_fn;
	req->cb_arg = cb_arg;

	logical_map_index = offset / vol->logical_blocks_per_chunk;
	if (vol->pm_logical_map[logical_map_index] == REDUCE_EMPTY_MAP_ENTRY) {
		/* Chunk is unmapped - allocate and write a new chunk directly. */
		_reduce_vol_write_chunk(req, _write_complete_req);
	} else {
		/* Read old chunk, then overwrite with data from this write operation.
		 * TODO: bypass reading old chunk if this write operation overwrites
		 * the entire chunk.
		 */
		_reduce_vol_read_chunk(req, _write_read_done);
	}
}

SPDK_LOG_REGISTER_COMPONENT("reduce", SPDK_LOG_REDUCE)
+108 −1
Original line number Diff line number Diff line
@@ -469,6 +469,112 @@ load(void)
	backing_dev_destroy(&backing_dev);
}

/* Look up the chunk map index recorded in the logical map for the chunk that
 * contains the given logical block offset.
 */
static uint64_t
_vol_get_chunk_map_index(struct spdk_reduce_vol *vol, uint64_t offset)
{
	return vol->pm_logical_map[offset / vol->logical_blocks_per_chunk];
}

/* Return a pointer to the chunk map at the given index within the volume's
 * persistent-memory chunk map region.
 */
static uint64_t *
_vol_get_chunk_map(struct spdk_reduce_vol *vol, uint64_t chunk_map_index)
{
	uint64_t map_offset = chunk_map_index * vol->backing_io_units_per_chunk;

	return vol->pm_chunk_maps + map_offset;
}

/* Completion callback for writev calls in these tests; records the result in
 * the global g_reduce_errno for the test body to assert on.
 */
static void
write_cb(void *arg, int reduce_errno)
{
	g_reduce_errno = reduce_errno;
}

/* Verify that spdk_reduce_vol_writev maintains the chunk map and backing io
 * unit bit arrays and the logical map correctly across an initial write to a
 * chunk and a subsequent overwrite of the same chunk, and that the volume can
 * be unloaded and reloaded afterwards.
 */
static void
write_maps(void)
{
	struct spdk_reduce_vol_params params = {};
	struct spdk_reduce_backing_dev backing_dev = {};
	struct iovec iov;
	char buf[16 * 1024]; /* chunk size */
	uint32_t i;
	uint64_t old_chunk0_map_index, new_chunk0_map_index;
	uint64_t *old_chunk0_map, *new_chunk0_map;

	params.vol_size = 1024 * 1024; /* 1MB */
	params.chunk_size = 16 * 1024;
	params.backing_io_unit_size = 4096;
	params.logical_block_size = 512;
	spdk_uuid_generate(&params.uuid);

	backing_dev_init(&backing_dev, &params);

	g_vol = NULL;
	g_reduce_errno = -1;
	spdk_reduce_vol_init(&params, &backing_dev, TEST_MD_PATH, init_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);
	SPDK_CU_ASSERT_FATAL(g_vol != NULL);

	/* A freshly initialized volume must have every logical map entry empty. */
	for (i = 0; i < g_vol->params.vol_size / g_vol->params.chunk_size; i++) {
		CU_ASSERT(_vol_get_chunk_map_index(g_vol, i) == REDUCE_EMPTY_MAP_ENTRY);
	}

	/* Write one logical block to chunk 0. */
	iov.iov_base = buf;
	iov.iov_len = params.logical_block_size;
	g_reduce_errno = -1;
	spdk_reduce_vol_writev(g_vol, &iov, 1, 0, 1, write_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);

	/* Chunk 0 must now have an allocated chunk map... */
	old_chunk0_map_index = _vol_get_chunk_map_index(g_vol, 0);
	CU_ASSERT(old_chunk0_map_index != REDUCE_EMPTY_MAP_ENTRY);
	CU_ASSERT(spdk_bit_array_get(g_vol->allocated_chunk_maps, old_chunk0_map_index) == true);

	/* ...whose entries all reference allocated backing io units. */
	old_chunk0_map = _vol_get_chunk_map(g_vol, old_chunk0_map_index);
	for (i = 0; i < g_vol->backing_io_units_per_chunk; i++) {
		CU_ASSERT(old_chunk0_map[i] != REDUCE_EMPTY_MAP_ENTRY);
		CU_ASSERT(spdk_bit_array_get(g_vol->allocated_backing_io_units, old_chunk0_map[i]) == true);
	}

	/* Overwrite the same logical block. */
	g_reduce_errno = -1;
	spdk_reduce_vol_writev(g_vol, &iov, 1, 0, 1, write_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);

	/* The overwrite must allocate a new chunk map and free the old one. */
	new_chunk0_map_index = _vol_get_chunk_map_index(g_vol, 0);
	CU_ASSERT(new_chunk0_map_index != REDUCE_EMPTY_MAP_ENTRY);
	CU_ASSERT(new_chunk0_map_index != old_chunk0_map_index);
	CU_ASSERT(spdk_bit_array_get(g_vol->allocated_chunk_maps, new_chunk0_map_index) == true);
	CU_ASSERT(spdk_bit_array_get(g_vol->allocated_chunk_maps, old_chunk0_map_index) == false);

	/* The old chunk's backing io units must have been released. */
	for (i = 0; i < g_vol->backing_io_units_per_chunk; i++) {
		CU_ASSERT(spdk_bit_array_get(g_vol->allocated_backing_io_units, old_chunk0_map[i]) == false);
	}

	/* The new chunk map's entries must all reference allocated io units. */
	new_chunk0_map = _vol_get_chunk_map(g_vol, new_chunk0_map_index);
	for (i = 0; i < g_vol->backing_io_units_per_chunk; i++) {
		CU_ASSERT(new_chunk0_map[i] != REDUCE_EMPTY_MAP_ENTRY);
		CU_ASSERT(spdk_bit_array_get(g_vol->allocated_backing_io_units, new_chunk0_map[i]) == true);
	}

	g_reduce_errno = -1;
	spdk_reduce_vol_unload(g_vol, unload_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);

	/* Reload from the persisted metadata and confirm the volume parameters. */
	g_vol = NULL;
	g_reduce_errno = -1;
	spdk_reduce_vol_load(&backing_dev, load_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);
	SPDK_CU_ASSERT_FATAL(g_vol != NULL);
	CU_ASSERT(g_vol->params.vol_size == params.vol_size);
	CU_ASSERT(g_vol->params.chunk_size == params.chunk_size);
	CU_ASSERT(g_vol->params.backing_io_unit_size == params.backing_io_unit_size);

	g_reduce_errno = -1;
	spdk_reduce_vol_unload(g_vol, unload_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);

	persistent_pm_buf_destroy();
	backing_dev_destroy(&backing_dev);
}

int
main(int argc, char **argv)
{
@@ -491,7 +597,8 @@ main(int argc, char **argv)
		CU_add_test(suite, "init_failure", init_failure) == NULL ||
		CU_add_test(suite, "init_md", init_md) == NULL ||
		CU_add_test(suite, "init_backing_dev", init_backing_dev) == NULL ||
		CU_add_test(suite, "load", load) == NULL
		CU_add_test(suite, "load", load) == NULL ||
		CU_add_test(suite, "write_maps", write_maps) == NULL
	) {
		CU_cleanup_registry();
		return CU_get_error();