Commit a137b9af authored by Jim Harris's avatar Jim Harris Committed by Daniel Verkamp
Browse files

blob: queue sync requests if one already in progress



For any given blob, if an spdk_blob_sync_md() operation
is already in progress, queue additional spdk_blob_sync_md()
operations until the previous one completes.

This ensures proper ordering of writing metadata to
disk.

Signed-off-by: default avatarJim Harris <james.r.harris@intel.com>
Change-Id: I2051e8cb5b8d1a033ec1238cb4811232110aa0f4

Reviewed-on: https://review.gerrithub.io/401257


Tested-by: default avatarSPDK Automated Test System <sys_sgsw@intel.com>
Reviewed-by: default avatarDaniel Verkamp <daniel.verkamp@intel.com>
Reviewed-by: default avatarChangpeng Liu <changpeng.liu@intel.com>
Reviewed-by: default avatarShuhei Matsumoto <shuhei.matsumoto.xt@hitachi.com>
parent 9022a59e
Loading
Loading
Loading
Loading
+76 −13
Original line number Diff line number Diff line
@@ -58,6 +58,7 @@ static int _spdk_blob_set_xattr(struct spdk_blob *blob, const char *name, const
static int _spdk_blob_get_xattr_value(struct spdk_blob *blob, const char *name,
				      const void **value, size_t *value_len, bool internal);
static int _spdk_blob_remove_xattr(struct spdk_blob *blob, const char *name, bool internal);
static struct spdk_blob *_spdk_blob_lookup(struct spdk_blob_store *bs, spdk_blob_id blobid);

static void
_spdk_blob_verify_md_op(struct spdk_blob *blob)
@@ -199,10 +200,39 @@ _spdk_xattrs_free(struct spdk_xattr_tailq *xattrs)
	}
}

struct spdk_blob_persist_ctx {
	struct spdk_blob			*blob;
	struct spdk_blob_md_page		*pages;
	uint64_t				idx;
	spdk_bs_sequence_t			*seq;
	spdk_bs_sequence_cpl			cb_fn;
	void					*cb_arg;
	TAILQ_ENTRY(spdk_blob_persist_ctx)	link;
};

static void
_spdk_blob_free(struct spdk_blob *blob)
{
	struct spdk_blob_store *bs;
	struct spdk_bs_channel *bs_channel;
	struct spdk_blob_persist_ctx *ctx, *tmp;

	assert(blob != NULL);
	bs = blob->bs;
	bs_channel = spdk_io_channel_get_ctx(bs->md_channel);

	/*
	 * If the blob was freed, there should be no remaining queued persist
	 *  requests.  But just to make sure, add an assert if any are found.
	 */
	TAILQ_FOREACH_SAFE(ctx, &bs_channel->queued_blob_persists, link, tmp) {
		if (ctx->blob == blob) {
			TAILQ_REMOVE(&bs_channel->queued_blob_persists, ctx, link);
			spdk_dma_free(ctx->pages);
			free(ctx);
			assert(false);
		}
	}

	free(blob->active.clusters);
	free(blob->clean.clusters);
@@ -946,24 +976,18 @@ _spdk_blob_load(spdk_bs_sequence_t *seq, struct spdk_blob *blob,
				  _spdk_blob_load_cpl, ctx);
}

struct spdk_blob_persist_ctx {
	struct spdk_blob		*blob;

	struct spdk_blob_md_page	*pages;

	uint64_t			idx;

	spdk_bs_sequence_t		*seq;
	spdk_bs_sequence_cpl		cb_fn;
	void				*cb_arg;
};
static void _spdk_blob_persist_start(struct spdk_blob_persist_ctx *ctx);

static void
_spdk_blob_persist_complete(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx	*ctx = cb_arg;
	struct spdk_blob_persist_ctx	*ctx = cb_arg, *tmp;
	struct spdk_blob		*blob = ctx->blob;
	struct spdk_blob_store		*bs = blob->bs;
	struct spdk_bs_channel		*bs_channel = spdk_io_channel_get_ctx(bs->md_channel);
	spdk_blob_id			blobid = blob->id;

	assert(blob->persist_in_progress == true);
	if (bserrno == 0) {
		_spdk_blob_mark_clean(blob);
	}
@@ -974,6 +998,37 @@ _spdk_blob_persist_complete(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
	/* Free the memory */
	spdk_dma_free(ctx->pages);
	free(ctx);

	/*
	 * Sometimes the callback function we just executed will have freed
	 *  the blob (create and delete calls for example).
	 *  So we may not access the blob after the callback function in that
	 *  case.  Determine if the blob was deleted by looking up the blob
	 *  by its blobid.  If it cannot be found, we know it was freed and we
	 *  just return immediately.
	 */
	blob = _spdk_blob_lookup(bs, blobid);

	if (blob == NULL) {
		return;
	}

	blob->persist_in_progress = false;

	TAILQ_FOREACH_SAFE(ctx, &bs_channel->queued_blob_persists, link, tmp) {
		if (ctx->blob != blob) {
			continue;
		}
		TAILQ_REMOVE(&bs_channel->queued_blob_persists, ctx, link);
		if (blob->state == SPDK_BLOB_STATE_CLEAN) {
			ctx->cb_fn(seq, ctx->cb_arg, 0);
			spdk_dma_free(ctx->pages);
			free(ctx);
		} else {
			_spdk_blob_persist_start(ctx);
			break;
		}
	}
}

static void
@@ -1275,6 +1330,9 @@ _spdk_blob_persist_start(struct spdk_blob_persist_ctx *ctx)
	uint32_t page_num;
	int rc;

	assert(blob->persist_in_progress == false);
	blob->persist_in_progress = true;

	if (blob->active.num_pages == 0) {
		/* This is the signal that the blob should be deleted.
		 * Immediately jump to the clean up routine. */
@@ -1360,8 +1418,12 @@ _spdk_blob_persist(spdk_bs_sequence_t *seq, struct spdk_blob *blob,
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	if (spdk_unlikely(blob->persist_in_progress)) {
		TAILQ_INSERT_TAIL(&seq->channel->queued_blob_persists, ctx, link);
	} else {
		_spdk_blob_persist_start(ctx);
	}
}

struct spdk_blob_copy_cluster_ctx {
	struct spdk_blob *blob;
@@ -1974,6 +2036,7 @@ _spdk_bs_channel_create(void *io_device, void *ctx_buf)
	}

	TAILQ_INIT(&channel->need_cluster_alloc);
	TAILQ_INIT(&channel->queued_blob_persists);

	return 0;
}
+3 −1
Original line number Diff line number Diff line
@@ -122,6 +122,7 @@ struct spdk_blob {
	struct spdk_blob_mut_data	clean;
	struct spdk_blob_mut_data	active;

	bool		persist_in_progress;
	bool		invalid;
	bool		data_ro;
	bool		md_ro;
@@ -183,6 +184,7 @@ struct spdk_bs_channel {
	struct spdk_io_channel		*dev_channel;

	TAILQ_HEAD(, spdk_bs_request_set)	need_cluster_alloc;
	TAILQ_HEAD(, spdk_blob_persist_ctx)	queued_blob_persists;
};

/** operation type */
+88 −3
Original line number Diff line number Diff line
@@ -46,6 +46,7 @@
struct spdk_blob_store *g_bs;
spdk_blob_id g_blobid;
struct spdk_blob *g_blob;
int g_blob_op_complete_count;
int g_bserrno;
struct spdk_xattr_names *g_names;
int g_done;
@@ -135,7 +136,6 @@ _bs_send_msg(spdk_thread_fn fn, void *ctx, void *thread_ctx)
	}
}

#if 0
static void
_bs_flush_scheduler(void)
{
@@ -148,7 +148,6 @@ _bs_flush_scheduler(void)
		free(ops);
	}
}
#endif

static void
bs_op_complete(void *cb_arg, int bserrno)
@@ -168,6 +167,7 @@ static void
blob_op_complete(void *cb_arg, int bserrno)
{
	g_bserrno = bserrno;
	g_blob_op_complete_count++;
}

static void
@@ -3285,7 +3285,91 @@ bs_load_iter(void)
	spdk_bs_unload(g_bs, bs_op_complete, NULL);
	CU_ASSERT(g_bserrno == 0);
	g_bs = NULL;
}

static void
blob_persist(void)
{
	struct spdk_bs_dev *dev;
	struct spdk_blob *blob;
	struct spdk_blob_opts opts;
	struct spdk_bs_channel *bs_channel;
	spdk_blob_id blobid;
	int rc;

	dev = init_dev();

	spdk_bs_init(dev, NULL, bs_op_with_handle_complete, NULL);
	CU_ASSERT(g_bserrno == 0);
	SPDK_CU_ASSERT_FATAL(g_bs != NULL);
	bs_channel = spdk_io_channel_get_ctx(g_bs->md_channel);

	spdk_blob_opts_init(&opts);

	spdk_bs_create_blob_ext(g_bs, &opts, blob_op_with_id_complete, NULL);
	CU_ASSERT(g_bserrno == 0);
	CU_ASSERT(g_blobid != SPDK_BLOBID_INVALID);
	blobid = g_blobid;

	spdk_bs_open_blob(g_bs, blobid, blob_op_with_handle_complete, NULL);
	CU_ASSERT(g_bserrno == 0);
	SPDK_CU_ASSERT_FATAL(g_blob != NULL);
	blob = g_blob;

	CU_ASSERT(blob->state == SPDK_BLOB_STATE_CLEAN);
	rc = spdk_blob_resize(blob, 5);
	CU_ASSERT(rc == 0);
	CU_ASSERT(blob->state == SPDK_BLOB_STATE_DIRTY);
	CU_ASSERT(blob->persist_in_progress == false);

	g_scheduler_delay = true;

	g_blob_op_complete_count = 0;
	spdk_blob_sync_md(blob, blob_op_complete, NULL);
	/* scheduler is delayed, so blob_op_complete should not have been called yet. */
	CU_ASSERT(g_blob_op_complete_count == 0);
	CU_ASSERT(blob->state == SPDK_BLOB_STATE_CLEAN);
	CU_ASSERT(blob->persist_in_progress == true);

	/* Dirty the blob's metadata again. */
	rc = spdk_blob_set_xattr(blob, "test", "foo", strlen("foo") + 1);
	CU_ASSERT(rc == 0);
	CU_ASSERT(blob->state == SPDK_BLOB_STATE_DIRTY);

	spdk_blob_sync_md(blob, blob_op_complete, NULL);
	/* scheduler is delayed, so blob_op_complete should not have been called yet. */
	CU_ASSERT(g_blob_op_complete_count == 0);
	/*
	 * The blob's metadata state should still be dirty.  We have not started persisting
	 *  the xattr change yet, because the sync operation for the resize operation is
	 *  still in progress.  Instead we should see a persist operation queued to execute
	 *  after the first one is done.
	 */
	CU_ASSERT(blob->state == SPDK_BLOB_STATE_DIRTY);
	CU_ASSERT(blob->persist_in_progress == true);
	CU_ASSERT(!TAILQ_EMPTY(&bs_channel->queued_blob_persists));

	/*
	 * Start another sync.  This just stresses the spdk_blob_persist_complete logic to
	 *  make sure it processes all of the persist operations when multiple ones are
	 *  queued.
	 */
	spdk_blob_sync_md(blob, blob_op_complete, NULL);
	CU_ASSERT(g_blob_op_complete_count == 0);
	CU_ASSERT(blob->state == SPDK_BLOB_STATE_DIRTY);
	CU_ASSERT(blob->persist_in_progress == true);
	CU_ASSERT(!TAILQ_EMPTY(&bs_channel->queued_blob_persists));

	_bs_flush_scheduler();
	CU_ASSERT(g_blob_op_complete_count == 3);
	CU_ASSERT(blob->state == SPDK_BLOB_STATE_CLEAN);
	CU_ASSERT(blob->persist_in_progress == false);
	CU_ASSERT(TAILQ_EMPTY(&bs_channel->queued_blob_persists));
	CU_ASSERT(g_bserrno == 0);

	spdk_blob_close(blob, blob_op_complete, NULL);
	CU_ASSERT(g_bserrno == 0);
	g_scheduler_delay = false;
}

int main(int argc, char **argv)
@@ -3342,7 +3426,8 @@ int main(int argc, char **argv)
		CU_add_test(suite, "blob_insert_cluster_msg", blob_insert_cluster_msg) == NULL ||
		CU_add_test(suite, "blob_thin_prov_rw", blob_thin_prov_rw) == NULL ||
		CU_add_test(suite, "blob_thin_prov_rw_iov", blob_thin_prov_rw_iov) == NULL ||
		CU_add_test(suite, "bs_load_iter", bs_load_iter) == NULL
		CU_add_test(suite, "bs_load_iter", bs_load_iter) == NULL ||
		CU_add_test(suite, "blob_persist", blob_persist) == NULL
	) {
		CU_cleanup_registry();
		return CU_get_error();