Commit cc27c1ab authored by Mike Gerdts's avatar Mike Gerdts Committed by Tomasz Zawadzki
Browse files

blobstore: missing lock leads to md page race



Many parts of blobstore.c seem to have been written with the assumption that
blob creation, deletion, etc. all happen on the md thread. This
assumption would allow modification of the bs->used_md_pages and
bs->used_clusters bit arrays without holding a lock. Placing
"assert(spdk_get_thread() == bs->md_thread)" in bs_claim_md_page() and
bs_claim_cluster() shows that each of these functions is called on other
threads due to writes to thin provisioned volumes.

This problem was first seen in the wild with this failed assertion:

  bs_claim_md_page: Assertion
     `spdk_bit_array_get(bs->used_md_pages, page) == false' failed.

This commit adds "assert(spdk_spin_held(&bs->used_lock))" in those
places where bs->used_md_pages and bs->used_clusters are modified, then
holds bs->used_lock in the places needed to satisfy these assertions.

Signed-off-by: default avatarMike Gerdts <mgerdts@nvidia.com>
Change-Id: I0523dd343ec490d994352932b2a73379a80e36f4
Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/15953


Tested-by: default avatarSPDK CI Jenkins <sys_sgci@intel.com>
Reviewed-by: default avatarAleksey Marchuk <alexeymar@nvidia.com>
Reviewed-by: default avatarJim Harris <james.r.harris@intel.com>
Reviewed-by: default avatarShuhei Matsumoto <smatsumoto@nvidia.com>
Community-CI: Mellanox Build Bot
parent 67c7e858
Loading
Loading
Loading
Loading
+45 −7
Original line number Diff line number Diff line
@@ -73,6 +73,7 @@ bs_get_snapshot_entry(struct spdk_blob_store *bs, spdk_blob_id blobid)
static void
bs_claim_md_page(struct spdk_blob_store *bs, uint32_t page)
{
	assert(spdk_spin_held(&bs->used_lock));
	assert(page < spdk_bit_array_capacity(bs->used_md_pages));
	assert(spdk_bit_array_get(bs->used_md_pages, page) == false);

@@ -82,6 +83,7 @@ bs_claim_md_page(struct spdk_blob_store *bs, uint32_t page)
static void
bs_release_md_page(struct spdk_blob_store *bs, uint32_t page)
{
	assert(spdk_spin_held(&bs->used_lock));
	assert(page < spdk_bit_array_capacity(bs->used_md_pages));
	assert(spdk_bit_array_get(bs->used_md_pages, page) == true);

@@ -93,6 +95,8 @@ bs_claim_cluster(struct spdk_blob_store *bs)
{
	uint32_t cluster_num;

	assert(spdk_spin_held(&bs->used_lock));

	cluster_num = spdk_bit_pool_allocate_bit(bs->used_clusters);
	if (cluster_num == UINT32_MAX) {
		return UINT32_MAX;
@@ -107,6 +111,7 @@ bs_claim_cluster(struct spdk_blob_store *bs)
static void
bs_release_cluster(struct spdk_blob_store *bs, uint32_t cluster_num)
{
	assert(spdk_spin_held(&bs->used_lock));
	assert(cluster_num < spdk_bit_pool_capacity(bs->used_clusters));
	assert(spdk_bit_pool_is_allocated(bs->used_clusters, cluster_num) == true);
	assert(bs->num_free_clusters < bs->total_clusters);
@@ -138,6 +143,8 @@ bs_allocate_cluster(struct spdk_blob *blob, uint32_t cluster_num,
{
	uint32_t *extent_page = 0;

	assert(spdk_spin_held(&blob->bs->used_lock));

	*cluster = bs_claim_cluster(blob->bs);
	if (*cluster == UINT32_MAX) {
		/* No more free clusters. Cannot satisfy the request */
@@ -1666,6 +1673,8 @@ blob_persist_clear_extents_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrn
		return;
	}

	spdk_spin_lock(&bs->used_lock);

	/* Release all extent_pages that were truncated */
	for (i = blob->active.num_extent_pages; i < blob->active.extent_pages_array_size; i++) {
		/* Nothing to release if it was not allocated */
@@ -1674,6 +1683,8 @@ blob_persist_clear_extents_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrn
		}
	}

	spdk_spin_unlock(&bs->used_lock);

	if (blob->active.num_extent_pages == 0) {
		free(blob->active.extent_pages);
		blob->active.extent_pages = NULL;
@@ -1832,6 +1843,8 @@ blob_persist_zero_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
		return;
	}

	spdk_spin_lock(&bs->used_lock);

	/* This loop starts at 1 because the first page is special and handled
	 * below. The pages (except the first) are never written in place,
	 * so any pages in the clean list must be zeroed.
@@ -1847,6 +1860,8 @@ blob_persist_zero_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
		bs_release_md_page(bs, page_num);
	}

	spdk_spin_unlock(&bs->used_lock);

	/* Move on to clearing clusters */
	blob_persist_clear_clusters(seq, ctx);
}
@@ -1999,8 +2014,14 @@ blob_resize(struct spdk_blob *blob, uint64_t sz)
		current_num_ep = spdk_divide_round_up(num_clusters, SPDK_EXTENTS_PER_EP);
	}

	/* Check first that we have enough clusters and md pages before we start claiming them. */
	assert(!spdk_spin_held(&bs->used_lock));

	/* Check first that we have enough clusters and md pages before we start claiming them.
	 * bs->used_lock is held to ensure that clusters we think are free are still free when we go
	 * to claim them later in this function.
	 */
	if (sz > num_clusters && spdk_blob_is_thin_provisioned(blob) == false) {
		spdk_spin_lock(&bs->used_lock);
		if ((sz - num_clusters) > bs->num_free_clusters) {
			rc = -ENOSPC;
			goto out;
@@ -2049,12 +2070,10 @@ blob_resize(struct spdk_blob *blob, uint64_t sz)
	if (spdk_blob_is_thin_provisioned(blob) == false) {
		cluster = 0;
		lfmd = 0;
		spdk_spin_lock(&blob->bs->used_lock);
		for (i = num_clusters; i < sz; i++) {
			bs_allocate_cluster(blob, i, &cluster, &lfmd, true);
			lfmd++;
		}
		spdk_spin_unlock(&blob->bs->used_lock);
	}

	blob->active.num_clusters = sz;
@@ -2062,6 +2081,10 @@ blob_resize(struct spdk_blob *blob, uint64_t sz)

	rc = 0;
out:
	if (spdk_spin_held(&bs->used_lock)) {
		spdk_spin_unlock(&bs->used_lock);
	}

	return rc;
}

@@ -2093,14 +2116,17 @@ blob_persist_generate_new_md(struct spdk_blob_persist_ctx *ctx)
	}
	blob->active.pages = tmp;

	/* Assign this metadata to pages. This requires two passes -
	 * one to verify that there are enough pages and a second
	 * to actually claim them. */
	/* Assign this metadata to pages. This requires two passes - one to verify that there are
	 * enough pages and a second to actually claim them. The used_lock is held across
	 * both passes to ensure things don't change in the middle.
	 */
	spdk_spin_lock(&bs->used_lock);
	page_num = 0;
	/* Note that this loop starts at one. The first page location is fixed by the blobid. */
	for (i = 1; i < blob->active.num_pages; i++) {
		page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num);
		if (page_num == UINT32_MAX) {
			spdk_spin_unlock(&bs->used_lock);
			blob_persist_complete(seq, ctx, -ENOMEM);
			return;
		}
@@ -2119,6 +2145,7 @@ blob_persist_generate_new_md(struct spdk_blob_persist_ctx *ctx)
		SPDK_DEBUGLOG(blob, "Claiming page %u for blob %" PRIu64 "\n", page_num, blob->id);
		page_num++;
	}
	spdk_spin_unlock(&bs->used_lock);
	ctx->pages[i - 1].crc = blob_md_page_calc_crc(&ctx->pages[i - 1]);
	/* Start writing the metadata from last page to first */
	blob->state = SPDK_BLOB_STATE_CLEAN;
@@ -2351,10 +2378,10 @@ blob_insert_cluster_cpl(void *cb_arg, int bserrno)
		}
		spdk_spin_lock(&ctx->blob->bs->used_lock);
		bs_release_cluster(ctx->blob->bs, ctx->new_cluster);
		spdk_spin_unlock(&ctx->blob->bs->used_lock);
		if (ctx->new_extent_page != 0) {
			bs_release_md_page(ctx->blob->bs, ctx->new_extent_page);
		}
		spdk_spin_unlock(&ctx->blob->bs->used_lock);
	}

	bs_sequence_finish(ctx->seq, bserrno);
@@ -4341,7 +4368,9 @@ bs_load_replay_md_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
	page = ctx->page;
	if (bs_load_cur_md_page_valid(ctx) == true) {
		if (page->sequence_num == 0 || ctx->in_page_chain == true) {
			spdk_spin_lock(&ctx->bs->used_lock);
			bs_claim_md_page(ctx->bs, page_num);
			spdk_spin_unlock(&ctx->bs->used_lock);
			if (page->sequence_num == 0) {
				SPDK_NOTICELOG("Recover: blob %" PRIu32 "\n", page_num);
				spdk_bit_array_set(ctx->bs->used_blobids, page_num);
@@ -5667,8 +5696,10 @@ bs_create_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
	uint32_t page_idx = bs_blobid_to_page(blob->id);

	if (bserrno != 0) {
		spdk_spin_lock(&blob->bs->used_lock);
		spdk_bit_array_clear(blob->bs->used_blobids, page_idx);
		bs_release_md_page(blob->bs, page_idx);
		spdk_spin_unlock(&blob->bs->used_lock);
	}

	blob_free(blob);
@@ -5748,13 +5779,16 @@ bs_create_blob(struct spdk_blob_store *bs,

	assert(spdk_get_thread() == bs->md_thread);

	spdk_spin_lock(&bs->used_lock);
	page_idx = spdk_bit_array_find_first_clear(bs->used_md_pages, 0);
	if (page_idx == UINT32_MAX) {
		spdk_spin_unlock(&bs->used_lock);
		cb_fn(cb_arg, 0, -ENOMEM);
		return;
	}
	spdk_bit_array_set(bs->used_blobids, page_idx);
	bs_claim_md_page(bs, page_idx);
	spdk_spin_unlock(&bs->used_lock);

	id = bs_page_to_blobid(page_idx);

@@ -5819,8 +5853,10 @@ error:
	if (blob != NULL) {
		blob_free(blob);
	}
	spdk_spin_lock(&bs->used_lock);
	spdk_bit_array_clear(bs->used_blobids, page_idx);
	bs_release_md_page(bs, page_idx);
	spdk_spin_unlock(&bs->used_lock);
	cb_fn(cb_arg, 0, rc);
}

@@ -7528,8 +7564,10 @@ blob_insert_cluster_msg(void *arg)
		 * different cluster in the same extent page. In such case proceed with
		 * updating the existing extent page, but release the additional one. */
		if (ctx->extent_page != 0) {
			spdk_spin_lock(&ctx->blob->bs->used_lock);
			assert(spdk_bit_array_get(ctx->blob->bs->used_md_pages, ctx->extent_page) == true);
			bs_release_md_page(ctx->blob->bs, ctx->extent_page);
			spdk_spin_unlock(&ctx->blob->bs->used_lock);
			ctx->extent_page = 0;
		}
		/* Extent page already allocated.
+2 −0
Original line number Diff line number Diff line
@@ -4028,8 +4028,10 @@ blob_insert_cluster_msg_test(void)
	/* Specify cluster_num to allocate and new_cluster will be returned to insert on md_thread.
	 * This is to simulate behaviour when cluster is allocated after blob creation.
	 * Such as _spdk_bs_allocate_and_copy_cluster(). */
	spdk_spin_lock(&bs->used_lock);
	bs_allocate_cluster(blob, cluster_num, &new_cluster, &extent_page, false);
	CU_ASSERT(blob->active.clusters[cluster_num] == 0);
	spdk_spin_unlock(&bs->used_lock);

	blob_insert_cluster_on_md_thread(blob, cluster_num, new_cluster, extent_page, &page,
					 blob_op_complete, NULL);