Commit 56d702ab authored by Konrad Sztyber's avatar Konrad Sztyber Committed by Tomasz Zawadzki
Browse files

lib/blob: use bs_allocate_and_copy_cluster() in inflate



Using `bs_allocate_and_copy_cluster()` instead of a zero-length write
makes it possible to inflate/decouple snapshots, as the writes would
fail with -EPERM, because the snapshots are marked as read-only.

Additionally, zero-length non-vector requests are now completed
immediately.  It makes it consistent with the vector path (which already
does that) and allows us to use the zero-length reads as a context for
cluster copy.

Fixes #2028.

Signed-off-by: default avatarKonrad Sztyber <konrad.sztyber@intel.com>
Change-Id: Ib7fdee352972ecf808833aa179820d85cfab7eed
Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/8918


Community-CI: Mellanox Build Bot
Community-CI: Broadcom CI <spdk-ci.pdl@broadcom.com>
Tested-by: default avatarSPDK CI Jenkins <sys_sgci@intel.com>
Reviewed-by: default avatarBen Walker <benjamin.walker@intel.com>
Reviewed-by: default avatarTomasz Zawadzki <tomasz.zawadzki@intel.com>
parent 95a59652
Loading
Loading
Loading
Loading
+24 −4
Original line number Diff line number Diff line
@@ -2746,6 +2746,11 @@ blob_request_submit_op(struct spdk_blob *blob, struct spdk_io_channel *_channel,
		return;
	}

	if (length == 0) {
		cb_fn(cb_arg, 0);
		return;
	}

	if (offset + length > bs_cluster_to_lba(blob->bs, blob->active.num_clusters)) {
		cb_fn(cb_arg, -EINVAL);
		return;
@@ -6112,7 +6117,10 @@ bs_inflate_blob_done(struct spdk_clone_snapshot_ctx *ctx)
		_blob->back_bs_dev = bs_create_zeroes_dev();
	}

	/* Temporarily override md_ro flag for MD modification */
	_blob->md_ro = false;
	_blob->state = SPDK_BLOB_STATE_DIRTY;

	spdk_blob_sync_md(_blob, bs_clone_snapshot_origblob_cleanup, ctx);
}

@@ -6143,6 +6151,8 @@ bs_inflate_blob_touch_next(void *cb_arg, int bserrno)
{
	struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
	struct spdk_blob *_blob = ctx->original.blob;
	struct spdk_bs_cpl cpl;
	spdk_bs_user_op_t *op;
	uint64_t offset;

	if (bserrno != 0) {
@@ -6159,12 +6169,22 @@ bs_inflate_blob_touch_next(void *cb_arg, int bserrno)
	if (ctx->cluster < _blob->active.num_clusters) {
		offset = bs_cluster_to_lba(_blob->bs, ctx->cluster);

		/* We may safely increment a cluster before write */
		/* We may safely increment a cluster before copying */
		ctx->cluster++;

		/* Use zero length write to touch a cluster */
		spdk_blob_io_write(_blob, ctx->channel, NULL, offset, 0,
				   bs_inflate_blob_touch_next, ctx);
		/* Use a dummy 0B read as a context for cluster copy */
		cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
		cpl.u.blob_basic.cb_fn = bs_inflate_blob_touch_next;
		cpl.u.blob_basic.cb_arg = ctx;

		op = bs_user_op_alloc(ctx->channel, &cpl, SPDK_BLOB_READ, _blob,
				      NULL, 0, offset, 0);
		if (!op) {
			bs_clone_snapshot_origblob_cleanup(ctx, -ENOMEM);
			return;
		}

		bs_allocate_and_copy_cluster(_blob, ctx->channel, offset, op);
	} else {
		bs_inflate_blob_done(ctx);
	}
+73 −0
Original line number Diff line number Diff line
@@ -6874,6 +6874,78 @@ blob_persist_test(void)
	poll_threads();
}

static void
blob_decouple_snapshot(void)
{
	struct spdk_blob_store *bs = g_bs;
	struct spdk_blob_opts opts;
	struct spdk_blob *blob, *snapshot1, *snapshot2;
	struct spdk_io_channel *channel;
	spdk_blob_id blobid, snapshotid;
	uint64_t cluster;

	channel = spdk_bs_alloc_io_channel(bs);
	SPDK_CU_ASSERT_FATAL(channel != NULL);

	ut_spdk_blob_opts_init(&opts);
	opts.num_clusters = 10;
	opts.thin_provision = false;

	blob = ut_blob_create_and_open(bs, &opts);
	blobid = spdk_blob_get_id(blob);

	/* Create first snapshot */
	CU_ASSERT_EQUAL(_get_snapshots_count(bs), 0);
	spdk_bs_create_snapshot(bs, blobid, NULL, blob_op_with_id_complete, NULL);
	poll_threads();
	CU_ASSERT(g_bserrno == 0);
	CU_ASSERT(g_blobid != SPDK_BLOBID_INVALID);
	CU_ASSERT_EQUAL(_get_snapshots_count(bs), 1);
	snapshotid = g_blobid;

	spdk_bs_open_blob(bs, snapshotid, blob_op_with_handle_complete, NULL);
	poll_threads();
	CU_ASSERT(g_bserrno == 0);
	SPDK_CU_ASSERT_FATAL(g_blob != NULL);
	snapshot1 = g_blob;

	/* Create the second one */
	CU_ASSERT_EQUAL(_get_snapshots_count(bs), 1);
	spdk_bs_create_snapshot(bs, blobid, NULL, blob_op_with_id_complete, NULL);
	poll_threads();
	CU_ASSERT(g_bserrno == 0);
	CU_ASSERT(g_blobid != SPDK_BLOBID_INVALID);
	CU_ASSERT_EQUAL(_get_snapshots_count(bs), 2);
	snapshotid = g_blobid;

	spdk_bs_open_blob(bs, snapshotid, blob_op_with_handle_complete, NULL);
	poll_threads();
	CU_ASSERT(g_bserrno == 0);
	SPDK_CU_ASSERT_FATAL(g_blob != NULL);
	snapshot2 = g_blob;
	CU_ASSERT_EQUAL(spdk_blob_get_parent_snapshot(bs, snapshot2->id), snapshot1->id);

	/* Now decouple the second snapshot forcing it to copy the written clusters */
	spdk_bs_blob_decouple_parent(bs, channel, snapshot2->id, blob_op_complete, NULL);
	poll_threads();
	CU_ASSERT(g_bserrno == 0);

	/* Verify that the snapshot has been decoupled and that the clusters have been copied */
	CU_ASSERT_EQUAL(spdk_blob_get_parent_snapshot(bs, snapshot2->id), SPDK_BLOBID_INVALID);
	for (cluster = 0; cluster < snapshot2->active.num_clusters; ++cluster) {
		CU_ASSERT_NOT_EQUAL(snapshot2->active.clusters[cluster], 0);
		CU_ASSERT_NOT_EQUAL(snapshot2->active.clusters[cluster],
				    snapshot1->active.clusters[cluster]);
	}

	spdk_bs_free_io_channel(channel);

	ut_blob_close_and_delete(bs, snapshot2);
	ut_blob_close_and_delete(bs, snapshot1);
	ut_blob_close_and_delete(bs, blob);
	poll_threads();
}

static void
suite_bs_setup(void)
{
@@ -7043,6 +7115,7 @@ int main(int argc, char **argv)
	CU_ADD_TEST(suite, blob_io_unit_compatiblity);
	CU_ADD_TEST(suite_bs, blob_simultaneous_operations);
	CU_ADD_TEST(suite_bs, blob_persist_test);
	CU_ADD_TEST(suite_bs, blob_decouple_snapshot);

	allocate_threads(2);
	set_thread(0);
+4 −2
Original line number Diff line number Diff line
@@ -157,8 +157,10 @@ dev_read(struct spdk_bs_dev *dev, struct spdk_io_channel *channel, void *payload
		length = lba_count * dev->blocklen;
		SPDK_CU_ASSERT_FATAL(offset + length <= DEV_BUFFER_SIZE);

		if (length > 0) {
			memcpy(payload, &g_dev_buffer[offset], length);
			g_dev_read_bytes += length;
		}
	} else {
		g_power_failure_rc = -EIO;
	}