Commit 777627e0 authored by Piotr Pelplinski's avatar Piotr Pelplinski Committed by Jim Harris
Browse files

blobstore: add snapshot functionality



This patch adds new feature of blobstore.
New call creates a read-only snapshot of specified blob with provided options.

NOTE:

This patch doesn't cover recovery operation if snapshotting fails. This operation
will be implemented and added later.

Signed-off-by: default avatarPiotr Pelplinski <piotr.pelplinski@intel.com>
Signed-off-by: default avatarTomasz Kulasek <tomaszx.kulasek@intel.com>
Change-Id: I470ca13525638fa6df485d508b3adf71b6b69c0b
Reviewed-on: https://review.gerrithub.io/393935


Reviewed-by: default avatarTomasz Zawadzki <tomasz.zawadzki@intel.com>
Reviewed-by: default avatarJim Harris <james.r.harris@intel.com>
Reviewed-by: default avatarMaciej Szwed <maciej.szwed@intel.com>
Tested-by: default avatarSPDK Automated Test System <sys_sgsw@intel.com>
Reviewed-by: default avatarBen Walker <benjamin.walker@intel.com>
parent 8988543d
Loading
Loading
Loading
Loading
+19 −0
Original line number Diff line number Diff line
@@ -355,6 +355,25 @@ void spdk_bs_create_blob_ext(struct spdk_blob_store *bs, const struct spdk_blob_
void spdk_bs_create_blob(struct spdk_blob_store *bs,
			 spdk_blob_op_with_id_complete cb_fn, void *cb_arg);

/**
 * Create a read-only snapshot of specified blob with provided options.
 * This will automatically sync specified blob.
 *
 * When operation is done, original blob is converted to the thin-provisioned
 * blob with a newly created read-only snapshot set as a backing blob.
 * Structure snapshot_xattrs as well as anything it references (like e.g. names
 * array) must be valid until the completion is called.
 *
 * \param bs blobstore.
 * \param blobid Id of the source blob used to create a snapshot.
 * \param snapshot_xattrs xattrs specified for snapshot.
 * \param cb_fn Called when the operation is complete.
 * \param cb_arg Argument passed to function cb_fn.
 */
void spdk_bs_create_snapshot(struct spdk_blob_store *bs, spdk_blob_id blobid,
			     const struct spdk_blob_xattr_opts *snapshot_xattrs,
			     spdk_blob_op_with_id_complete cb_fn, void *cb_arg);

/**
 * Delete an existing blob from the given blobstore.
 *
+270 −0
Original line number Diff line number Diff line
@@ -41,6 +41,7 @@
#include "spdk/bit_array.h"
#include "spdk/likely.h"

#include "spdk_internal/assert.h"
#include "spdk_internal/log.h"

#include "blobstore.h"
@@ -3573,6 +3574,275 @@ void spdk_bs_create_blob_ext(struct spdk_blob_store *bs, const struct spdk_blob_

/* END spdk_bs_create_blob */

/* START blob_cleanup */

struct spdk_clone_snapshot_ctx {
	struct spdk_bs_cpl      cpl;
	int bserrno;

	struct {
		spdk_blob_id id;
		struct spdk_blob *blob;
	} original;
	struct {
		spdk_blob_id id;
		struct spdk_blob *blob;
	} new;

	/* xattrs specified for snapshot/clones only. They have no impact on
	 * the original blobs xattrs. */
	const struct spdk_blob_xattr_opts *xattrs;
};

static void
_spdk_bs_clone_snapshot_cleanup_finish(void *cb_arg, int bserrno)
{
	struct spdk_clone_snapshot_ctx *ctx = cb_arg;
	struct spdk_bs_cpl *cpl = &ctx->cpl;

	if (bserrno != 0) {
		if (ctx->bserrno != 0) {
			SPDK_ERRLOG("Cleanup error %d\n", bserrno);
		} else {
			ctx->bserrno = bserrno;
		}
	}

	switch (cpl->type) {
	case SPDK_BS_CPL_TYPE_BLOBID:
		cpl->u.blobid.cb_fn(cpl->u.blobid.cb_arg, cpl->u.blobid.blobid, ctx->bserrno);
		break;
	default:
		SPDK_UNREACHABLE();
		break;
	}

	free(ctx);
}

static void
_spdk_bs_clone_snapshot_origblob_cleanup(void *cb_arg, int bserrno)
{
	struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
	struct spdk_blob *origblob = ctx->original.blob;

	if (bserrno != 0) {
		if (ctx->bserrno != 0) {
			SPDK_ERRLOG("Cleanup error %d\n", bserrno);
		} else {
			ctx->bserrno = bserrno;
		}
	}

	ctx->original.id = origblob->id;
	spdk_blob_close(origblob, _spdk_bs_clone_snapshot_cleanup_finish, ctx);
}

static void
_spdk_bs_clone_snapshot_newblob_cleanup(void *cb_arg, int bserrno)
{
	struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
	struct spdk_blob *newblob = ctx->new.blob;

	if (bserrno != 0) {
		if (ctx->bserrno != 0) {
			SPDK_ERRLOG("Cleanup error %d\n", bserrno);
		} else {
			ctx->bserrno = bserrno;
		}
	}

	ctx->new.id = newblob->id;
	spdk_blob_close(newblob, _spdk_bs_clone_snapshot_origblob_cleanup, ctx);
}

/* END blob_cleanup */

/* START spdk_bs_create_snapshot */

static void
_spdk_bs_snapshot_origblob_sync_cpl(void *cb_arg, int bserrno)
{
	struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
	struct spdk_blob *newblob = ctx->new.blob;

	if (bserrno != 0) {
		_spdk_bs_clone_snapshot_newblob_cleanup(ctx, bserrno);
		return;
	}

	/* Remove metadata descriptor SNAPSHOT_IN_PROGRESS */
	bserrno = _spdk_blob_remove_xattr(newblob, SNAPSHOT_IN_PROGRESS, true);
	if (bserrno != 0) {
		_spdk_bs_clone_snapshot_origblob_cleanup(ctx, bserrno);
		return;
	}

	spdk_blob_set_read_only(newblob);

	/* sync snapshot metadata */
	spdk_blob_sync_md(newblob, _spdk_bs_clone_snapshot_origblob_cleanup, cb_arg);
}

static void
_spdk_bs_snapshot_newblob_sync_cpl(void *cb_arg, int bserrno)
{
	struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
	struct spdk_blob *origblob = ctx->original.blob;
	struct spdk_blob *newblob = ctx->new.blob;

	if (bserrno != 0) {
		_spdk_bs_clone_snapshot_newblob_cleanup(ctx, bserrno);
		return;
	}

	/* Set internal xattr for snapshot id */
	bserrno = _spdk_blob_set_xattr(origblob, BLOB_SNAPSHOT, &newblob->id, sizeof(spdk_blob_id), true);
	if (bserrno != 0) {
		_spdk_bs_clone_snapshot_newblob_cleanup(ctx, bserrno);
		return;
	}

	/* Create new back_bs_dev for snapshot */
	origblob->back_bs_dev = spdk_bs_create_blob_bs_dev(newblob);
	if (origblob->back_bs_dev == NULL) {
		_spdk_bs_clone_snapshot_newblob_cleanup(ctx, -EINVAL);
		return;
	}

	/* set clone blob as thin provisioned */
	_spdk_blob_set_thin_provision(origblob);

	/* Zero out origblob cluster map */
	memset(origblob->active.clusters, 0,
	       origblob->active.num_clusters * sizeof(origblob->active.clusters));

	/* sync clone metadata */
	spdk_blob_sync_md(origblob, _spdk_bs_snapshot_origblob_sync_cpl, ctx);
}

static void
_spdk_bs_snapshot_newblob_open_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno)
{
	struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
	struct spdk_blob *origblob = ctx->original.blob;
	struct spdk_blob *newblob = _blob;

	if (bserrno != 0) {
		_spdk_bs_clone_snapshot_origblob_cleanup(ctx, bserrno);
		return;
	}

	ctx->new.blob = newblob;

	/* set new back_bs_dev for snapshot */
	newblob->back_bs_dev = origblob->back_bs_dev;
	/* Set invalid flags from origblob */
	newblob->invalid_flags = origblob->invalid_flags;

	/* Copy cluster map to snapshot */
	memcpy(newblob->active.clusters, origblob->active.clusters,
	       origblob->active.num_clusters * sizeof(origblob->active.clusters));

	/* sync snapshot metadata */
	spdk_blob_sync_md(newblob, _spdk_bs_snapshot_newblob_sync_cpl, ctx);
}

static void
_spdk_bs_snapshot_newblob_create_cpl(void *cb_arg, spdk_blob_id blobid, int bserrno)
{
	struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
	struct spdk_blob *origblob = ctx->original.blob;

	if (bserrno != 0) {
		_spdk_bs_clone_snapshot_origblob_cleanup(ctx, bserrno);
		return;
	}

	ctx->new.id = blobid;
	ctx->cpl.u.blobid.blobid = blobid;

	spdk_bs_open_blob(origblob->bs, ctx->new.id, _spdk_bs_snapshot_newblob_open_cpl, ctx);
}


static void
_spdk_bs_xattr_snapshot(void *arg, const char *name,
			const void **value, size_t *value_len)
{
	assert(strncmp(name, SNAPSHOT_IN_PROGRESS, sizeof(SNAPSHOT_IN_PROGRESS)) == 0);

	struct spdk_blob *blob = (struct spdk_blob *)arg;
	*value = &blob->id;
	*value_len = sizeof(blob->id);
}

static void
_spdk_bs_snapshot_origblob_open_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno)
{
	struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
	struct spdk_blob_opts opts;
	struct spdk_blob_xattr_opts internal_xattrs;
	char *xattrs_names[] = { SNAPSHOT_IN_PROGRESS };

	if (bserrno != 0) {
		_spdk_bs_clone_snapshot_cleanup_finish(ctx, bserrno);
		return;
	}

	ctx->original.blob = _blob;

	if (_blob->data_ro || _blob->md_ro) {
		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Cannot create snapshot from read only blob with id %lu\n",
			      _blob->id);
		_spdk_bs_clone_snapshot_origblob_cleanup(ctx, -EINVAL);
		return;
	}

	spdk_blob_opts_init(&opts);
	_spdk_blob_xattrs_init(&internal_xattrs);

	/* Change the size of new blob to the same as in original blob,
	 * but do not allocate clusters */
	opts.thin_provision = true;
	opts.num_clusters = spdk_blob_get_num_clusters(_blob);

	/* If there are any xattrs specified for snapshot, set them now */
	if (ctx->xattrs) {
		memcpy(&opts.xattrs, ctx->xattrs, sizeof(*ctx->xattrs));
	}
	/* Set internal xattr SNAPSHOT_IN_PROGRESS */
	internal_xattrs.count = 1;
	internal_xattrs.ctx = _blob;
	internal_xattrs.names = xattrs_names;
	internal_xattrs.get_value = _spdk_bs_xattr_snapshot;

	_spdk_bs_create_blob(_blob->bs, &opts, &internal_xattrs,
			     _spdk_bs_snapshot_newblob_create_cpl, ctx);
}

void spdk_bs_create_snapshot(struct spdk_blob_store *bs, spdk_blob_id blobid,
			     const struct spdk_blob_xattr_opts *snapshot_xattrs,
			     spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
{
	struct spdk_clone_snapshot_ctx *ctx = calloc(1, sizeof(*ctx));

	if (!ctx) {
		cb_fn(cb_arg, SPDK_BLOBID_INVALID, -ENOMEM);
		return;
	}
	ctx->cpl.type = SPDK_BS_CPL_TYPE_BLOBID;
	ctx->cpl.u.blobid.cb_fn = cb_fn;
	ctx->cpl.u.blobid.cb_arg = cb_arg;
	ctx->cpl.u.blobid.blobid = SPDK_BLOBID_INVALID;
	ctx->bserrno = 0;
	ctx->original.id = blobid;
	ctx->xattrs = snapshot_xattrs;

	spdk_bs_open_blob(bs, ctx->original.id, _spdk_bs_snapshot_origblob_open_cpl, ctx);
}
/* END spdk_bs_create_snapshot */

/* START spdk_blob_resize */
void
spdk_blob_resize(struct spdk_blob *blob, uint64_t sz, spdk_blob_op_complete cb_fn, void *cb_arg)
+1 −0
Original line number Diff line number Diff line
@@ -198,6 +198,7 @@ enum spdk_blob_op_type {
/* back bs_dev */

#define BLOB_SNAPSHOT "SNAP"
#define SNAPSHOT_IN_PROGRESS "SNAPTMP"

struct spdk_blob_bs_dev {
	struct spdk_bs_dev bs_dev;
+339 −1
Original line number Diff line number Diff line
@@ -35,6 +35,7 @@

#include "spdk_cunit.h"
#include "spdk/blob.h"
#include "spdk/string.h"

#include "common/lib/test_env.c"
#include "../bs_dev_common.c"
@@ -550,6 +551,129 @@ blob_thin_provision(void)
	g_bs = NULL;
}

static void
blob_snapshot(void)
{
	struct spdk_blob_store *bs;
	struct spdk_bs_dev *dev;
	struct spdk_blob *blob;
	struct spdk_blob *snapshot, *snapshot2;
	struct spdk_blob_bs_dev *blob_bs_dev;
	struct spdk_blob_opts opts;
	struct spdk_blob_xattr_opts xattrs;
	spdk_blob_id blobid;
	spdk_blob_id snapshotid;
	const void *value;
	size_t value_len;
	int rc;

	dev = init_dev();

	spdk_bs_init(dev, NULL, bs_op_with_handle_complete, NULL);
	CU_ASSERT(g_bserrno == 0);
	SPDK_CU_ASSERT_FATAL(g_bs != NULL);
	bs = g_bs;

	/* Create blob with 10 clusters */
	spdk_blob_opts_init(&opts);
	opts.num_clusters = 10;

	spdk_bs_create_blob_ext(bs, &opts, blob_op_with_id_complete, NULL);
	CU_ASSERT(g_bserrno == 0);
	CU_ASSERT(g_blobid != SPDK_BLOBID_INVALID);
	blobid = g_blobid;

	spdk_bs_open_blob(bs, blobid, blob_op_with_handle_complete, NULL);
	CU_ASSERT(g_bserrno == 0);
	SPDK_CU_ASSERT_FATAL(g_blob != NULL);
	blob = g_blob;
	CU_ASSERT(spdk_blob_get_num_clusters(blob) == 10)

	/* Create snapshot from blob */
	spdk_bs_create_snapshot(bs, blobid, NULL, blob_op_with_id_complete, NULL);
	CU_ASSERT(g_bserrno == 0);
	CU_ASSERT(g_blobid != SPDK_BLOBID_INVALID);
	snapshotid = g_blobid;

	spdk_bs_open_blob(bs, snapshotid, blob_op_with_handle_complete, NULL);
	CU_ASSERT(g_bserrno == 0);
	SPDK_CU_ASSERT_FATAL(g_blob != NULL);
	snapshot = g_blob;
	CU_ASSERT(snapshot->data_ro == true)
	CU_ASSERT(snapshot->md_ro == true)
	CU_ASSERT(spdk_blob_get_num_clusters(snapshot) == 10)

	CU_ASSERT(spdk_blob_get_num_clusters(blob) == 10)
	CU_ASSERT(blob->invalid_flags & SPDK_BLOB_THIN_PROV);
	CU_ASSERT(spdk_mem_all_zero(blob->active.clusters,
				    blob->active.num_clusters * sizeof(blob->active.clusters[0])));

	/* Try to create snapshot from clone with xattrs */
	xattrs.names = g_xattr_names;
	xattrs.get_value = _get_xattr_value;
	xattrs.count = 3;
	xattrs.ctx = &g_ctx;
	spdk_bs_create_snapshot(bs, blobid, &xattrs, blob_op_with_id_complete, NULL);
	CU_ASSERT(g_bserrno == 0);
	CU_ASSERT(g_blobid != SPDK_BLOBID_INVALID);
	blobid = g_blobid;

	spdk_bs_open_blob(bs, blobid, blob_op_with_handle_complete, NULL);
	CU_ASSERT(g_bserrno == 0);
	SPDK_CU_ASSERT_FATAL(g_blob != NULL);
	snapshot2 = g_blob;
	CU_ASSERT(snapshot2->data_ro == true)
	CU_ASSERT(snapshot2->md_ro == true)
	CU_ASSERT(spdk_blob_get_num_clusters(snapshot2) == 10)

	/* Confirm that blob is backed by snapshot2 and snapshot2 is backed by snapshot */
	CU_ASSERT(snapshot->back_bs_dev == NULL);
	SPDK_CU_ASSERT_FATAL(blob->back_bs_dev != NULL);
	SPDK_CU_ASSERT_FATAL(snapshot2->back_bs_dev != NULL);

	blob_bs_dev = (struct spdk_blob_bs_dev *)blob->back_bs_dev;
	CU_ASSERT(blob_bs_dev->blob == snapshot2);

	blob_bs_dev = (struct spdk_blob_bs_dev *)snapshot2->back_bs_dev;
	CU_ASSERT(blob_bs_dev->blob == snapshot);

	rc = spdk_blob_get_xattr_value(snapshot2, g_xattr_names[0], &value, &value_len);
	CU_ASSERT(rc == 0);
	SPDK_CU_ASSERT_FATAL(value != NULL);
	CU_ASSERT(value_len == strlen(g_xattr_values[0]));
	CU_ASSERT_NSTRING_EQUAL_FATAL(value, g_xattr_values[0], value_len);

	rc = spdk_blob_get_xattr_value(snapshot2, g_xattr_names[1], &value, &value_len);
	CU_ASSERT(rc == 0);
	SPDK_CU_ASSERT_FATAL(value != NULL);
	CU_ASSERT(value_len == strlen(g_xattr_values[1]));
	CU_ASSERT_NSTRING_EQUAL((char *)value, g_xattr_values[1], value_len);

	rc = spdk_blob_get_xattr_value(snapshot2, g_xattr_names[2], &value, &value_len);
	CU_ASSERT(rc == 0);
	SPDK_CU_ASSERT_FATAL(value != NULL);
	CU_ASSERT(value_len == strlen(g_xattr_values[2]));
	CU_ASSERT_NSTRING_EQUAL((char *)value, g_xattr_values[2], value_len);

	/* Try to create snapshot from snapshot */
	spdk_bs_create_snapshot(bs, snapshotid, NULL, blob_op_with_id_complete, NULL);
	CU_ASSERT(g_bserrno == -EINVAL);
	CU_ASSERT(g_blobid == SPDK_BLOBID_INVALID);

	spdk_blob_close(blob, blob_op_complete, NULL);
	CU_ASSERT(g_bserrno == 0);

	spdk_blob_close(snapshot, blob_op_complete, NULL);
	CU_ASSERT(g_bserrno == 0);

	spdk_blob_close(snapshot2, blob_op_complete, NULL);
	CU_ASSERT(g_bserrno == 0);

	spdk_bs_unload(g_bs, bs_op_complete, NULL);
	CU_ASSERT(g_bserrno == 0);
	g_bs = NULL;
}

static void
blob_delete(void)
{
@@ -3293,7 +3417,218 @@ bs_load_iter(void)
	spdk_bs_unload(g_bs, bs_op_complete, NULL);
	CU_ASSERT(g_bserrno == 0);
	g_bs = NULL;
}

static void
blob_snapshot_rw(void)
{
	static const uint8_t zero[10 * 4096] = { 0 };
	struct spdk_blob_store *bs;
	struct spdk_bs_dev *dev;
	struct spdk_blob *blob, *snapshot;
	struct spdk_io_channel *channel;
	struct spdk_blob_opts opts;
	spdk_blob_id blobid, snapshotid;
	uint64_t free_clusters;
	uint8_t payload_read[10 * 4096];
	uint8_t payload_write[10 * 4096];

	dev = init_dev();

	spdk_bs_init(dev, NULL, bs_op_with_handle_complete, NULL);
	CU_ASSERT(g_bserrno == 0);
	SPDK_CU_ASSERT_FATAL(g_bs != NULL);
	bs = g_bs;
	free_clusters = spdk_bs_free_cluster_count(bs);

	channel = spdk_bs_alloc_io_channel(bs);
	CU_ASSERT(channel != NULL);

	spdk_blob_opts_init(&opts);
	opts.thin_provision = true;
	opts.num_clusters = 5;

	spdk_bs_create_blob_ext(bs, &opts, blob_op_with_id_complete, NULL);
	CU_ASSERT(g_bserrno == 0);
	CU_ASSERT(g_blobid != SPDK_BLOBID_INVALID);
	CU_ASSERT(free_clusters == spdk_bs_free_cluster_count(bs));
	blobid = g_blobid;

	spdk_bs_open_blob(bs, blobid, blob_op_with_handle_complete, NULL);
	CU_ASSERT(g_bserrno == 0);
	SPDK_CU_ASSERT_FATAL(g_blob != NULL);
	blob = g_blob;

	CU_ASSERT(spdk_blob_get_num_clusters(blob) == 5);

	memset(payload_read, 0xFF, sizeof(payload_read));
	spdk_blob_io_read(blob, channel, payload_read, 4, 10, blob_op_complete, NULL);
	CU_ASSERT(g_bserrno == 0);
	CU_ASSERT(memcmp(zero, payload_read, 10 * 4096) == 0);

	memset(payload_write, 0xE5, sizeof(payload_write));
	spdk_blob_io_write(blob, channel, payload_write, 4, 10, blob_op_complete, NULL);
	CU_ASSERT(g_bserrno == 0);
	CU_ASSERT(free_clusters != spdk_bs_free_cluster_count(bs));

	/* Create snapshot from blob */
	spdk_bs_create_snapshot(bs, blobid, NULL, blob_op_with_id_complete, NULL);
	CU_ASSERT(g_bserrno == 0);
	CU_ASSERT(g_blobid != SPDK_BLOBID_INVALID);
	snapshotid = g_blobid;

	spdk_bs_open_blob(bs, snapshotid, blob_op_with_handle_complete, NULL);
	CU_ASSERT(g_bserrno == 0);
	SPDK_CU_ASSERT_FATAL(g_blob != NULL);
	snapshot = g_blob;
	CU_ASSERT(snapshot->data_ro == true)
	CU_ASSERT(snapshot->md_ro == true)

	CU_ASSERT(spdk_blob_get_num_clusters(snapshot) == 5)

	memset(payload_write, 0xAA, sizeof(payload_write));
	spdk_blob_io_write(blob, channel, payload_write, 4, 10, blob_op_complete, NULL);
	CU_ASSERT(g_bserrno == 0);
	CU_ASSERT(free_clusters != spdk_bs_free_cluster_count(bs));

	spdk_blob_io_read(blob, channel, payload_read, 4, 10, blob_op_complete, NULL);
	CU_ASSERT(g_bserrno == 0);
	CU_ASSERT(memcmp(payload_write, payload_read, 10 * 4096) == 0);

	/* Data on snapshot should not change after write to clone */
	memset(payload_write, 0xE5, sizeof(payload_write));
	spdk_blob_io_read(snapshot, channel, payload_read, 4, 10, blob_op_complete, NULL);
	CU_ASSERT(g_bserrno == 0);
	CU_ASSERT(memcmp(payload_write, payload_read, 10 * 4096) == 0);

	spdk_blob_close(blob, blob_op_complete, NULL);
	CU_ASSERT(g_bserrno == 0);

	spdk_blob_close(snapshot, blob_op_complete, NULL);
	CU_ASSERT(g_bserrno == 0);

	spdk_bs_delete_blob(bs, blobid, blob_op_complete, NULL);
	CU_ASSERT(g_bserrno == 0);

	spdk_bs_delete_blob(bs, snapshotid, blob_op_complete, NULL);
	CU_ASSERT(g_bserrno == 0);

	spdk_bs_free_io_channel(channel);

	/* Unload the blob store */
	spdk_bs_unload(g_bs, bs_op_complete, NULL);
	CU_ASSERT(g_bserrno == 0);
	g_bs = NULL;
	g_blob = NULL;
	g_blobid = 0;
}

static void
blob_snapshot_rw_iov(void)
{
	static const uint8_t zero[10 * 4096] = { 0 };
	struct spdk_blob_store *bs;
	struct spdk_bs_dev *dev;
	struct spdk_blob *blob, *snapshot;
	struct spdk_io_channel *channel;
	struct spdk_blob_opts opts;
	spdk_blob_id blobid, snapshotid;
	uint64_t free_clusters;
	uint8_t payload_read[10 * 4096];
	uint8_t payload_write[10 * 4096];
	struct iovec iov_read[3];
	struct iovec iov_write[3];

	dev = init_dev();

	spdk_bs_init(dev, NULL, bs_op_with_handle_complete, NULL);
	CU_ASSERT(g_bserrno == 0);
	SPDK_CU_ASSERT_FATAL(g_bs != NULL);
	bs = g_bs;
	free_clusters = spdk_bs_free_cluster_count(bs);

	channel = spdk_bs_alloc_io_channel(bs);
	CU_ASSERT(channel != NULL);

	spdk_blob_opts_init(&opts);
	opts.thin_provision = true;
	opts.num_clusters = 5;

	spdk_bs_create_blob_ext(bs, &opts, blob_op_with_id_complete, NULL);
	CU_ASSERT(g_bserrno == 0);
	CU_ASSERT(g_blobid != SPDK_BLOBID_INVALID);
	CU_ASSERT(free_clusters == spdk_bs_free_cluster_count(bs));
	blobid = g_blobid;

	spdk_bs_open_blob(bs, blobid, blob_op_with_handle_complete, NULL);
	CU_ASSERT(g_bserrno == 0);
	SPDK_CU_ASSERT_FATAL(g_blob != NULL);
	blob = g_blob;

	CU_ASSERT(spdk_blob_get_num_clusters(blob) == 5);

	/* Create snapshot from blob */
	spdk_bs_create_snapshot(bs, blobid, NULL, blob_op_with_id_complete, NULL);
	CU_ASSERT(g_bserrno == 0);
	CU_ASSERT(g_blobid != SPDK_BLOBID_INVALID);
	snapshotid = g_blobid;

	spdk_bs_open_blob(bs, snapshotid, blob_op_with_handle_complete, NULL);
	CU_ASSERT(g_bserrno == 0);
	SPDK_CU_ASSERT_FATAL(g_blob != NULL);
	snapshot = g_blob;
	CU_ASSERT(snapshot->data_ro == true)
	CU_ASSERT(snapshot->md_ro == true)
	CU_ASSERT(spdk_blob_get_num_clusters(snapshot) == 5);

	/* Payload should be all zeros from unallocated clusters */
	memset(payload_read, 0xAA, sizeof(payload_read));
	iov_read[0].iov_base = payload_read;
	iov_read[0].iov_len = 3 * 4096;
	iov_read[1].iov_base = payload_read + 3 * 4096;
	iov_read[1].iov_len = 4 * 4096;
	iov_read[2].iov_base = payload_read + 7 * 4096;
	iov_read[2].iov_len = 3 * 4096;
	spdk_blob_io_readv(blob, channel, iov_read, 3, 250, 10, blob_op_complete, NULL);
	CU_ASSERT(g_bserrno == 0);
	CU_ASSERT(memcmp(zero, payload_read, 10 * 4096) == 0);

	memset(payload_write, 0xE5, sizeof(payload_write));
	iov_write[0].iov_base = payload_write;
	iov_write[0].iov_len = 1 * 4096;
	iov_write[1].iov_base = payload_write + 1 * 4096;
	iov_write[1].iov_len = 5 * 4096;
	iov_write[2].iov_base = payload_write + 6 * 4096;
	iov_write[2].iov_len = 4 * 4096;

	spdk_blob_io_writev(blob, channel, iov_write, 3, 250, 10, blob_op_complete, NULL);
	CU_ASSERT(g_bserrno == 0);

	memset(payload_read, 0xAA, sizeof(payload_read));
	iov_read[0].iov_base = payload_read;
	iov_read[0].iov_len = 3 * 4096;
	iov_read[1].iov_base = payload_read + 3 * 4096;
	iov_read[1].iov_len = 4 * 4096;
	iov_read[2].iov_base = payload_read + 7 * 4096;
	iov_read[2].iov_len = 3 * 4096;
	spdk_blob_io_readv(blob, channel, iov_read, 3, 250, 10, blob_op_complete, NULL);
	CU_ASSERT(g_bserrno == 0);
	CU_ASSERT(memcmp(payload_write, payload_read, 10 * 4096) == 0);

	spdk_blob_close(blob, blob_op_complete, NULL);
	CU_ASSERT(g_bserrno == 0);

	spdk_blob_close(snapshot, blob_op_complete, NULL);
	CU_ASSERT(g_bserrno == 0);

	spdk_bs_free_io_channel(channel);

	/* Unload the blob store */
	spdk_bs_unload(g_bs, bs_op_complete, NULL);
	CU_ASSERT(g_bserrno == 0);
	g_bs = NULL;
	g_blob = NULL;
	g_blobid = 0;
}

int main(int argc, char **argv)
@@ -3317,6 +3652,7 @@ int main(int argc, char **argv)
		CU_add_test(suite, "blob_create", blob_create) == NULL ||
		CU_add_test(suite, "blob_create_internal", blob_create_internal) == NULL ||
		CU_add_test(suite, "blob_thin_provision", blob_thin_provision) == NULL ||
		CU_add_test(suite, "blob_snapshot", blob_snapshot) == NULL ||
		CU_add_test(suite, "blob_delete", blob_delete) == NULL ||
		CU_add_test(suite, "blob_resize", blob_resize) == NULL ||
		CU_add_test(suite, "blob_read_only", blob_read_only) == NULL ||
@@ -3350,7 +3686,9 @@ int main(int argc, char **argv)
		CU_add_test(suite, "blob_insert_cluster_msg", blob_insert_cluster_msg) == NULL ||
		CU_add_test(suite, "blob_thin_prov_rw", blob_thin_prov_rw) == NULL ||
		CU_add_test(suite, "blob_thin_prov_rw_iov", blob_thin_prov_rw_iov) == NULL ||
		CU_add_test(suite, "bs_load_iter", bs_load_iter) == NULL
		CU_add_test(suite, "bs_load_iter", bs_load_iter) == NULL ||
		CU_add_test(suite, "blob_snapshot_rw", blob_snapshot_rw) == NULL ||
		CU_add_test(suite, "blob_snapshot_rw_iov", blob_snapshot_rw_iov) == NULL
	) {
		CU_cleanup_registry();
		return CU_get_error();