Commit d84a88c1 authored by Jim Harris, committed by Tomasz Zawadzki
Browse files

bdev: add base infrastructure for locked lba ranges



This adds new internal APIs bdev_lock_lba_range and
bdev_unlock_lba_range.  To start, these APIs will
manage dissemination of lock/unlock requests to all
existing channels for a given bdev.  This does not
yet interact at all with any I/O sent to the channel.

Future patches will check new I/O to see if they
are trying to write to a range that is locked.  Future
patches will also ensure we do not have overlapping
ranges active at once.

Signed-off-by: Jim Harris <james.r.harris@intel.com>
Change-Id: I6d5b1cc84b41a7adc2a3c5791c766bb77376581f
Reviewed-on: https://review.gerrithub.io/c/spdk/spdk/+/478225


Tested-by: SPDK CI Jenkins <sys_sgci@intel.com>
Community-CI: Broadcom SPDK FC-NVMe CI <spdk-ci.pdl@broadcom.com>
Reviewed-by: Shuhei Matsumoto <shuhei.matsumoto.xt@hitachi.com>
Reviewed-by: Paul Luse <paul.e.luse@intel.com>
Reviewed-by: Maciej Szwed <maciej.szwed@intel.com>
Reviewed-by: Ben Walker <benjamin.walker@intel.com>
parent b87080ef
Loading
Loading
Loading
Loading
+214 −0
Original line number Diff line number Diff line
@@ -124,6 +124,8 @@ static struct spdk_bdev_mgr g_bdev_mgr = {
	.mutex = PTHREAD_MUTEX_INITIALIZER,
};

typedef void (*lock_range_cb)(void *ctx, int status);

struct lba_range {
	uint64_t			offset;
	uint64_t			length;
@@ -131,6 +133,8 @@ struct lba_range {
	TAILQ_ENTRY(lba_range)		tailq;
};

typedef TAILQ_HEAD(, lba_range) lba_range_tailq_t;

static struct spdk_bdev_opts	g_bdev_opts = {
	.bdev_io_pool_size = SPDK_BDEV_IO_POOL_SIZE,
	.bdev_io_cache_size = SPDK_BDEV_IO_CACHE_SIZE,
@@ -282,6 +286,8 @@ struct spdk_bdev_channel {
#endif

	bdev_io_tailq_t		queued_resets;

	lba_range_tailq_t	locked_ranges;
};

struct media_event_entry {
@@ -2102,6 +2108,13 @@ static void
bdev_channel_destroy_resource(struct spdk_bdev_channel *ch)
{
	struct spdk_bdev_shared_resource *shared_resource;
	struct lba_range *range;

	while (!TAILQ_EMPTY(&ch->locked_ranges)) {
		range = TAILQ_FIRST(&ch->locked_ranges);
		TAILQ_REMOVE(&ch->locked_ranges, range, tailq);
		free(range);
	}

	spdk_put_io_channel(ch->channel);

@@ -2358,6 +2371,7 @@ bdev_channel_create(void *io_device, void *ctx_buf)
	ch->stat.ticks_rate = spdk_get_ticks_hz();
	ch->io_outstanding = 0;
	TAILQ_INIT(&ch->queued_resets);
	TAILQ_INIT(&ch->locked_ranges);
	ch->flags = 0;
	ch->shared_resource = shared_resource;

@@ -5651,6 +5665,206 @@ spdk_bdev_notify_media_management(struct spdk_bdev *bdev)
	pthread_mutex_unlock(&bdev->internal.mutex);
}

struct locked_lba_range_ctx {
	struct lba_range		range;
	struct spdk_bdev		*bdev;
	struct lba_range		*owner_range;
	struct spdk_poller		*poller;
	lock_range_cb			cb_fn;
	void				*cb_arg;
};

/* Completion callback for the cleanup pass that runs after a lock attempt
 * failed.  The iteration status here is always 0 (the cleanup itself cannot
 * fail), so report the original lock failure (-ENOMEM) to the caller instead.
 */
static void
bdev_lock_error_cleanup_cb(struct spdk_io_channel_iter *i, int status)
{
	struct locked_lba_range_ctx *lock_ctx = spdk_io_channel_iter_get_ctx(i);

	lock_ctx->cb_fn(lock_ctx->cb_arg, -ENOMEM);
	free(lock_ctx);
}

static void
bdev_unlock_lba_range_get_channel(struct spdk_io_channel_iter *i);

/* Completion callback for the per-channel lock iteration.  Either every
 * channel now holds a copy of the range, or at least one channel reported
 * -ENOMEM and all successfully-allocated copies must be torn down again.
 */
static void
bdev_lock_lba_range_cb(struct spdk_io_channel_iter *i, int status)
{
	struct locked_lba_range_ctx *lock_ctx = spdk_io_channel_iter_get_ctx(i);

	if (status == -ENOMEM) {
		/* One of the channels could not allocate its range object.
		 * Walk the channels again and remove any copies that were
		 * allocated successfully before reporting the error to the
		 * caller.  The unlock iteration performs exactly that
		 * removal, so reuse it here with the cleanup completion.
		 */
		spdk_for_each_channel(__bdev_to_io_dev(lock_ctx->bdev),
				      bdev_unlock_lba_range_get_channel, lock_ctx,
				      bdev_lock_error_cleanup_cb);
		return;
	}

	/* All channels have locked this range.  Stamp the locking channel's
	 * copy with its owner_ch so that channel will know it is allowed to
	 * write to this range.
	 */
	lock_ctx->owner_range->owner_ch = lock_ctx->range.owner_ch;
	lock_ctx->cb_fn(lock_ctx->cb_arg, status);
	free(lock_ctx);
}

/* Per-channel step of the lock iteration: give this channel its own private
 * copy of the range being locked.  A failed allocation aborts the iteration
 * with -ENOMEM and triggers cleanup in bdev_lock_lba_range_cb.
 */
static void
bdev_lock_lba_range_get_channel(struct spdk_io_channel_iter *i)
{
	struct spdk_io_channel *io_ch = spdk_io_channel_iter_get_channel(i);
	struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(io_ch);
	struct locked_lba_range_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
	struct lba_range *new_range;

	new_range = calloc(1, sizeof(*new_range));
	if (new_range == NULL) {
		spdk_for_each_channel_continue(i, -ENOMEM);
		return;
	}

	new_range->offset = ctx->range.offset;
	new_range->length = ctx->range.length;
	if (bdev_ch == ctx->range.owner_ch) {
		/* This is the copy belonging to the channel that requested
		 * the lock.  Remember it in the ctx so its owner_ch can be
		 * set easily once the lock is finally acquired.
		 */
		ctx->owner_range = new_range;
	}
	TAILQ_INSERT_TAIL(&bdev_ch->locked_ranges, new_range, tailq);
	spdk_for_each_channel_continue(i, 0);
}

/* Kick off the lock operation described by ctx: iterate all of the bdev's
 * channels, adding a copy of the range to each one, then finish in
 * bdev_lock_lba_range_cb.
 */
static void
bdev_lock_lba_range_ctx(struct spdk_bdev *bdev, struct locked_lba_range_ctx *ctx)
{
	/* We will add a copy of this range to each channel now. */
	spdk_for_each_channel(__bdev_to_io_dev(bdev), bdev_lock_lba_range_get_channel, ctx,
			      bdev_lock_lba_range_cb);
}

int
bdev_lock_lba_range(struct spdk_bdev_desc *desc, struct spdk_io_channel *_ch,
		    uint64_t offset, uint64_t length,
		    lock_range_cb cb_fn, void *cb_arg);

/* Lock [offset, offset + length) on behalf of the channel backing _ch.
 * Asynchronous: cb_fn(cb_arg, status) fires once every channel has recorded
 * the range.  Returns 0 on successful start, -ENOMEM if the context cannot
 * be allocated.
 */
int
bdev_lock_lba_range(struct spdk_bdev_desc *desc, struct spdk_io_channel *_ch,
		    uint64_t offset, uint64_t length,
		    lock_range_cb cb_fn, void *cb_arg)
{
	struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc);
	struct locked_lba_range_ctx *lock_ctx;

	lock_ctx = calloc(1, sizeof(*lock_ctx));
	if (lock_ctx == NULL) {
		return -ENOMEM;
	}

	lock_ctx->bdev = bdev;
	lock_ctx->range.offset = offset;
	lock_ctx->range.length = length;
	lock_ctx->range.owner_ch = spdk_io_channel_get_ctx(_ch);
	lock_ctx->cb_fn = cb_fn;
	lock_ctx->cb_arg = cb_arg;

	bdev_lock_lba_range_ctx(bdev, lock_ctx);
	return 0;
}

/* Completion callback for the per-channel unlock iteration: report the
 * iteration status to the caller and release the operation context.
 */
static void
bdev_unlock_lba_range_cb(struct spdk_io_channel_iter *i, int status)
{
	struct locked_lba_range_ctx *unlock_ctx = spdk_io_channel_iter_get_ctx(i);

	unlock_ctx->cb_fn(unlock_ctx->cb_arg, status);
	free(unlock_ctx);
}

/* Per-channel step of the unlock iteration: remove and free this channel's
 * copy of the range, matched by exact offset and length.
 */
static void
bdev_unlock_lba_range_get_channel(struct spdk_io_channel_iter *i)
{
	struct spdk_io_channel *io_ch = spdk_io_channel_iter_get_channel(i);
	struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(io_ch);
	struct locked_lba_range_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
	struct lba_range *range;

	TAILQ_FOREACH(range, &bdev_ch->locked_ranges, tailq) {
		if (range->offset == ctx->range.offset &&
		    range->length == ctx->range.length) {
			TAILQ_REMOVE(&bdev_ch->locked_ranges, range, tailq);
			free(range);
			break;
		}
	}

	/* Not finding the range here is legal, for two reasons, so no
	 * assert:
	 * (1) a channel created concurrently with an unlock never received
	 *     a copy of the range in the first place;
	 * (2) this function doubles as the cleanup path when a later
	 *     allocation failed during locking, in which case some channels
	 *     never got their copy.
	 */

	spdk_for_each_channel_continue(i, 0);
}

int
bdev_unlock_lba_range(struct spdk_bdev_desc *desc, struct spdk_io_channel *_ch,
		      uint64_t offset, uint64_t length,
		      lock_range_cb cb_fn, void *cb_arg);

/* Unlock [offset, offset + length) previously locked through _ch.  The range
 * must match a held lock exactly, and _ch must be the owner.  Asynchronous:
 * cb_fn(cb_arg, status) fires once every channel has dropped its copy.
 * Returns 0 on successful start, -EINVAL if no matching lock is held by this
 * channel, -ENOMEM on allocation failure.
 */
int
bdev_unlock_lba_range(struct spdk_bdev_desc *desc, struct spdk_io_channel *_ch,
		      uint64_t offset, uint64_t length,
		      lock_range_cb cb_fn, void *cb_arg)
{
	struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc);
	struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
	struct locked_lba_range_ctx *ctx;
	struct lba_range *range;
	struct lba_range *match = NULL;

	/* The unlocking channel must itself own a lock whose offset and
	 * length match the request exactly.
	 */
	TAILQ_FOREACH(range, &ch->locked_ranges, tailq) {
		if (range->owner_ch == ch &&
		    range->offset == offset && range->length == length) {
			match = range;
			break;
		}
	}

	if (match == NULL) {
		return -EINVAL;
	}

	ctx = calloc(1, sizeof(*ctx));
	if (ctx == NULL) {
		return -ENOMEM;
	}

	ctx->range.offset = offset;
	ctx->range.length = length;
	ctx->range.owner_ch = ch;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	spdk_for_each_channel(__bdev_to_io_dev(bdev), bdev_unlock_lba_range_get_channel, ctx,
			      bdev_unlock_lba_range_cb);
	return 0;
}

SPDK_LOG_REGISTER_COMPONENT("bdev", SPDK_LOG_BDEV)

SPDK_TRACE_REGISTER_FN(bdev_trace, "bdev", TRACE_GROUP_BDEV)
+72 −1
Original line number Diff line number Diff line
@@ -2586,6 +2586,76 @@ lba_range_overlap(void)
	CU_ASSERT(!bdev_lba_range_overlapped(&r1, &r2));
}

static bool g_lock_lba_range_done;
static bool g_unlock_lba_range_done;

/* Lock completion callback for the unit test: records only that the lock
 * operation completed.  The status argument is ignored here.
 */
static void
lock_lba_range_done(void *ctx, int status)
{
	g_lock_lba_range_done = true;
}

/* Unlock completion callback for the unit test: records only that the unlock
 * operation completed.  The status argument is ignored here.
 */
static void
unlock_lba_range_done(void *ctx, int status)
{
	g_unlock_lba_range_done = true;
}

/* Verify the basic lock/unlock flow on a single channel: a lock results in a
 * matching entry on the channel's locked_ranges list with the channel as
 * owner, a mismatched unlock is rejected with -EINVAL, and an exact unlock
 * empties the list again.
 */
static void
lock_lba_range_check_ranges(void)
{
	struct spdk_bdev *bdev;
	struct spdk_bdev_desc *desc = NULL;
	struct spdk_io_channel *io_ch;
	struct spdk_bdev_channel *channel;
	struct lba_range *range;
	int ctx1;
	int rc;

	spdk_bdev_initialize(bdev_init_cb, NULL);

	bdev = allocate_bdev("bdev0");

	rc = spdk_bdev_open(bdev, true, NULL, NULL, &desc);
	CU_ASSERT(rc == 0);
	CU_ASSERT(desc != NULL);
	io_ch = spdk_bdev_get_io_channel(desc);
	CU_ASSERT(io_ch != NULL);
	channel = spdk_io_channel_get_ctx(io_ch);

	/* Lock LBAs [20, 30); completion is asynchronous, so poll until the
	 * done callback has fired. */
	g_lock_lba_range_done = false;
	rc = bdev_lock_lba_range(desc, io_ch, 20, 10, lock_lba_range_done, &ctx1);
	CU_ASSERT(rc == 0);
	poll_threads();

	/* The channel's list should now hold exactly this range, owned by
	 * the locking channel. */
	CU_ASSERT(g_lock_lba_range_done == true);
	range = TAILQ_FIRST(&channel->locked_ranges);
	SPDK_CU_ASSERT_FATAL(range != NULL);
	CU_ASSERT(range->offset == 20);
	CU_ASSERT(range->length == 10);
	CU_ASSERT(range->owner_ch == channel);

	/* Unlocks must exactly match a lock. */
	g_unlock_lba_range_done = false;
	rc = bdev_unlock_lba_range(desc, io_ch, 20, 1, unlock_lba_range_done, &ctx1);
	CU_ASSERT(rc == -EINVAL);
	CU_ASSERT(g_unlock_lba_range_done == false);

	/* An exact-match unlock succeeds and removes the range. */
	rc = bdev_unlock_lba_range(desc, io_ch, 20, 10, unlock_lba_range_done, &ctx1);
	CU_ASSERT(rc == 0);
	spdk_delay_us(100);
	poll_threads();

	CU_ASSERT(g_unlock_lba_range_done == true);
	CU_ASSERT(TAILQ_EMPTY(&channel->locked_ranges));

	spdk_put_io_channel(io_ch);
	spdk_bdev_close(desc);
	free_bdev(bdev);
	spdk_bdev_finish(bdev_fini_cb, NULL);
	poll_threads();
}

int
main(int argc, char **argv)
{
@@ -2622,7 +2692,8 @@ main(int argc, char **argv)
		CU_add_test(suite, "bdev_close_while_hotremove", bdev_close_while_hotremove) == NULL ||
		CU_add_test(suite, "bdev_open_ext", bdev_open_ext) == NULL ||
		CU_add_test(suite, "bdev_set_io_timeout", bdev_set_io_timeout) == NULL ||
		CU_add_test(suite, "lba_range_overlap", lba_range_overlap) == NULL
		CU_add_test(suite, "lba_range_overlap", lba_range_overlap) == NULL ||
		CU_add_test(suite, "lock_lba_range_check_ranges", lock_lba_range_check_ranges) == NULL
	) {
		CU_cleanup_registry();
		return CU_get_error();