Commit 199e7390 authored by Maciej Szwed's avatar Maciej Szwed Committed by Daniel Verkamp
Browse files

lvol: Lvol store tasting



This patch introduces lvol store and lvols parameters saving
on persistent memories and loading it from saved data on app start.

Signed-off-by: default avatarMaciej Szwed <maciej.szwed@intel.com>
Change-Id: Ia63f0cf3d6365d59f31c5f0a1724636bfe73b5b8
Reviewed-on: https://review.gerrithub.io/375764


Tested-by: default avatarSPDK Automated Test System <sys_sgsw@intel.com>
Reviewed-by: default avatarJim Harris <james.r.harris@intel.com>
Reviewed-by: default avatarDaniel Verkamp <daniel.verkamp@intel.com>
parent ee5c4f6f
Loading
Loading
Loading
Loading
+3 −1
Original line number Diff line number Diff line
@@ -125,7 +125,9 @@ if [ $SPDK_TEST_VHOST -eq 1 ]; then
	run_test ./test/vhost/spdk_vhost.sh --integrity-blk
	run_test ./test/vhost/spdk_vhost.sh --integrity
	run_test ./test/vhost/spdk_vhost.sh --integrity-lvol-scsi
	run_test ./test/vhost/spdk_vhost.sh --integrity-lvol-blk
	# Disable for now, until we can properly destroy lvol store from scsi test
	#  before starting blk test.
	#run_test ./test/vhost/spdk_vhost.sh --integrity-lvol-blk
	run_test ./test/lvol/lvol.sh --test-cases=1,2,3,5,6,7,10,11,12,13,16,17,21,22,23
	timing_exit vhost
fi
+4 −0
Original line number Diff line number Diff line
@@ -72,5 +72,9 @@ struct spdk_io_channel *spdk_lvol_get_io_channel(struct spdk_lvol *lvol);
struct lvol_store_bdev *vbdev_get_lvs_bdev_by_lvs(struct spdk_lvol_store *lvs_orig);
struct spdk_lvol *vbdev_get_lvol_by_name(const char *name);
struct spdk_lvol_store *vbdev_get_lvol_store_by_uuid(uuid_t uuid);
void spdk_lvs_load(struct spdk_bs_dev *bs_dev, spdk_lvs_op_with_handle_complete cb_fn,
		   void *cb_arg);
void spdk_lvol_open(struct spdk_lvol *lvol, spdk_lvol_op_with_handle_complete cb_fn, void *cb_arg);


#endif  /* SPDK_LVOL_H */
+2 −0
Original line number Diff line number Diff line
@@ -76,6 +76,8 @@ struct spdk_lvol_store {
	uuid_t				uuid;
	struct spdk_lvs_req		*destruct_req;
	uint64_t			total_blocks;
	int				lvol_count;
	int				lvols_opened;
	TAILQ_HEAD(, spdk_lvol)		lvols;
};

+124 −3
Original line number Diff line number Diff line
@@ -35,6 +35,7 @@
#include "spdk/rpc.h"
#include "spdk_internal/bdev.h"
#include "spdk_internal/log.h"
#include "spdk/string.h"

#include "vbdev_lvol.h"

@@ -286,7 +287,22 @@ vbdev_get_lvol_by_name(const char *name)
static void
_vbdev_lvol_close_cb(void *cb_arg, int lvserrno)
{
	SPDK_INFOLOG(SPDK_TRACE_VBDEV_LVOL, "Lvol closed\n");
	struct spdk_lvol_store *lvs;

	if (cb_arg == NULL) {
		/*
		 * This close cb is from unload/destruct - so do not continue to check
		 *  the lvol open counts.
		 */
		return;
	}

	lvs = cb_arg;

	if (lvs->lvols_opened >= lvs->lvol_count) {
		SPDK_INFOLOG(SPDK_TRACE_VBDEV_LVOL, "Opening lvols finished\n");
		spdk_bdev_module_examine_done(SPDK_GET_BDEV_MODULE(lvol));
	}
}

static void
@@ -646,12 +662,117 @@ vbdev_lvs_get_ctx_size(void)
	return sizeof(struct lvol_task);
}

static void
_vbdev_lvs_examine_finish(void *cb_arg, struct spdk_lvol *lvol, int lvolerrno)
{
	struct spdk_lvol_store *lvs = cb_arg;
	struct spdk_bdev *bdev;

	if (lvolerrno != 0) {
		SPDK_ERRLOG("Error opening lvol %s\n", lvol->name);
		TAILQ_REMOVE(&lvs->lvols, lvol, link);
		lvs->lvol_count--;
		free(lvol->name);
		free(lvol);
		goto end;
	}

	bdev = _create_lvol_disk(lvol);
	if (bdev == NULL) {
		SPDK_ERRLOG("Cannot create bdev for lvol %s\n", lvol->name);
		TAILQ_REMOVE(&lvs->lvols, lvol, link);
		lvs->lvol_count--;
		spdk_bs_md_close_blob(&lvol->blob, _vbdev_lvol_close_cb, lvs);
		SPDK_INFOLOG(SPDK_TRACE_VBDEV_LVOL, "Opening lvol %s failed\n", lvol->name);
		free(lvol->name);
		free(lvol);
		return;
	}

	lvol->bdev = bdev;
	lvs->lvols_opened++;
	SPDK_INFOLOG(SPDK_TRACE_VBDEV_LVOL, "Opening lvol %s succeeded\n", lvol->name);

end:

	if (lvs->lvols_opened >= lvs->lvol_count) {
		SPDK_INFOLOG(SPDK_TRACE_VBDEV_LVOL, "Opening lvols finished\n");
		spdk_bdev_module_examine_done(SPDK_GET_BDEV_MODULE(lvol));
	}
}

static void
_vbdev_lvs_examine_cb(void *arg, struct spdk_lvol_store *lvol_store, int lvserrno)
{
	struct lvol_store_bdev *lvs_bdev;
	struct spdk_lvs_with_handle_req *req = (struct spdk_lvs_with_handle_req *)arg;
	struct spdk_lvol *lvol, *tmp;

	if (lvserrno != 0) {
		SPDK_INFOLOG(SPDK_TRACE_VBDEV_LVOL, "Lvol store not found on %s\n", req->base_bdev->name);
		spdk_bdev_module_examine_done(SPDK_GET_BDEV_MODULE(lvol));
		goto end;
	}

	lvserrno = spdk_bs_bdev_claim(lvol_store->bs_dev, SPDK_GET_BDEV_MODULE(lvol));
	if (lvserrno != 0) {
		SPDK_INFOLOG(SPDK_TRACE_VBDEV_LVOL, "Lvol store base bdev already claimed by another bdev\n");
		lvol_store->bs_dev->destroy(lvol_store->bs_dev);
		spdk_bdev_module_examine_done(SPDK_GET_BDEV_MODULE(lvol));
		goto end;
	}

	lvs_bdev = calloc(1, sizeof(*lvs_bdev));
	if (!lvs_bdev) {
		SPDK_ERRLOG("Cannot alloc memory for lvs_bdev\n");
		spdk_bdev_module_examine_done(SPDK_GET_BDEV_MODULE(lvol));
		goto end;
	}

	lvs_bdev->lvs = lvol_store;
	lvs_bdev->bdev = req->base_bdev;

	TAILQ_INSERT_TAIL(&g_spdk_lvol_pairs, lvs_bdev, lvol_stores);

	SPDK_INFOLOG(SPDK_TRACE_VBDEV_LVOL, "Lvol store found on %s - begin parsing\n",
		     req->base_bdev->name);

	lvol_store->lvols_opened = 0;

	/* Open all lvols */
	TAILQ_FOREACH_SAFE(lvol, &lvol_store->lvols, link, tmp) {
		spdk_lvol_open(lvol, _vbdev_lvs_examine_finish, lvol_store);
	}

end:
	free(req);
}

static void
vbdev_lvs_examine(struct spdk_bdev *bdev)
{
	struct spdk_bs_dev *bs_dev;
	struct spdk_lvs_with_handle_req *req;

	req = calloc(1, sizeof(*req));
	if (req == NULL) {
		spdk_bdev_module_examine_done(SPDK_GET_BDEV_MODULE(lvol));
		SPDK_ERRLOG("Cannot alloc memory for vbdev lvol store request pointer\n");
		return;
	}

	bs_dev = spdk_bdev_create_bs_dev(bdev, vbdev_lvs_hotremove_cb, bdev);
	if (!bs_dev) {
		SPDK_ERRLOG("Cannot create bs dev\n");
		spdk_bdev_module_examine_done(SPDK_GET_BDEV_MODULE(lvol));
		free(req);
		return;
	}

	req->base_bdev = bdev;

	spdk_lvs_load(bs_dev, _vbdev_lvs_examine_cb, req);
}
SPDK_BDEV_MODULE_REGISTER(lvol, vbdev_lvs_init, vbdev_lvs_fini, NULL, vbdev_lvs_get_ctx_size,
			  vbdev_lvs_examine)
SPDK_LOG_REGISTER_TRACE_FLAG("vbdev_lvol", SPDK_TRACE_VBDEV_LVOL);
+266 −1
Original line number Diff line number Diff line
@@ -35,6 +35,7 @@
#include "spdk_internal/log.h"
#include "spdk/string.h"
#include "spdk/io_channel.h"
#include "spdk/blob_bdev.h"

/* Length of string returned from uuid_unparse() */
#define UUID_STRING_LEN 37
@@ -47,6 +48,268 @@ divide_round_up(size_t num, size_t divisor)
	return (num + divisor - 1) / divisor;
}

static void
_spdk_lvol_open_cb(void *cb_arg, struct spdk_blob *blob, int lvolerrno)
{
	struct spdk_lvol_with_handle_req *req = cb_arg;
	struct spdk_lvol *lvol = req->lvol;

	if (lvolerrno != 0) {
		SPDK_INFOLOG(SPDK_TRACE_LVOL, "Failed to open lvol %s\n", lvol->name);
		goto end;
	}

	lvol->blob = blob;
end:
	req->cb_fn(req->cb_arg, lvol, lvolerrno);
	free(req);
}

void
spdk_lvol_open(struct spdk_lvol *lvol, spdk_lvol_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_lvol_with_handle_req *req;

	assert(cb_fn != NULL);

	if (lvol == NULL) {
		SPDK_ERRLOG("lvol does not exist\n");
		cb_fn(cb_arg, NULL, -ENODEV);
		return;
	}

	req = calloc(1, sizeof(*req));
	if (req == NULL) {
		SPDK_ERRLOG("Cannot alloc memory for request structure\n");
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	req->cb_fn = cb_fn;
	req->cb_arg = cb_arg;
	req->lvol = lvol;

	spdk_bs_md_open_blob(lvol->lvol_store->blobstore, lvol->blob_id, _spdk_lvol_open_cb, req);
}


static void
_spdk_load_next_lvol(void *cb_arg, struct spdk_blob *blob, int lvolerrno)
{
	struct spdk_lvs_with_handle_req *req = cb_arg;
	struct spdk_lvol_store *lvs = req->lvol_store;
	struct spdk_blob_store *bs = lvs->blobstore;
	struct spdk_lvol *lvol, *tmp;
	spdk_blob_id blob_id;
	char uuid[UUID_STRING_LEN];

	if (lvolerrno == -ENOENT) {
		/* Finished iterating */
		req->cb_fn(req->cb_arg, lvs, 0);
		free(req);
		return;
	} else if (lvolerrno < 0) {
		TAILQ_FOREACH_SAFE(lvol, &lvs->lvols, link, tmp) {
			TAILQ_REMOVE(&lvs->lvols, lvol, link);
			free(lvol->name);
			free(lvol);
		}
		SPDK_ERRLOG("Failed to fetch blobs list\n");
		req->cb_fn(req->cb_arg, lvs, lvolerrno);
		free(req);
		return;
	}

	blob_id = spdk_blob_get_id(blob);

	if (blob_id == lvs->super_blob_id) {
		SPDK_INFOLOG(SPDK_TRACE_LVOL, "found superblob %"PRIu64"\n", (uint64_t)blob_id);
		spdk_bs_md_iter_next(bs, &blob, _spdk_load_next_lvol, req);
		return;
	}

	lvol = calloc(1, sizeof(*lvol));
	if (!lvol) {
		SPDK_ERRLOG("Cannot alloc memory for lvol base pointer\n");
		req->cb_fn(req->cb_arg, lvs, -ENOMEM);
		free(req);
		return;
	}

	lvol->blob = blob;
	lvol->blob_id = blob_id;
	lvol->lvol_store = lvs;
	lvol->num_clusters = spdk_blob_get_num_clusters(blob);
	lvol->close_only = false;
	uuid_unparse(lvol->lvol_store->uuid, uuid);
	lvol->name = spdk_sprintf_alloc("%s_%"PRIu64, uuid, (uint64_t)blob_id);
	if (!lvol->name) {
		SPDK_ERRLOG("Cannot assign lvol name\n");
		req->cb_fn(req->cb_arg, lvs, -ENOMEM);
		free(req);
		free(lvol);
		return;
	}

	TAILQ_INSERT_TAIL(&lvs->lvols, lvol, link);

	lvs->lvol_count++;

	SPDK_INFOLOG(SPDK_TRACE_LVOL, "added lvol %s\n", lvol->name);

	spdk_bs_md_iter_next(bs, &blob, _spdk_load_next_lvol, req);
}

static void
_spdk_bs_unload_with_error_cb(void *cb_arg, int lvolerrno)
{
	struct spdk_lvs_with_handle_req *req = (struct spdk_lvs_with_handle_req *)cb_arg;

	req->cb_fn(req->cb_arg, NULL, -ENODEV);
	free(req);
}

static void
_spdk_close_super_cb(void *cb_arg, int lvolerrno)
{
	struct spdk_lvs_with_handle_req *req = (struct spdk_lvs_with_handle_req *)cb_arg;
	struct spdk_lvol_store *lvs = req->lvol_store;
	struct spdk_blob_store *bs = lvs->blobstore;

	if (lvolerrno != 0) {
		SPDK_INFOLOG(SPDK_TRACE_LVOL, "Could not close super blob\n");
		free(lvs);
		spdk_bs_unload(bs, _spdk_bs_unload_with_error_cb, req);
		return;
	}

	/* Start loading lvols */
	spdk_bs_md_iter_first(lvs->blobstore, _spdk_load_next_lvol, req);
}

static void
_spdk_close_super_blob_with_error_cb(void *cb_arg, int lvolerrno)
{
	struct spdk_lvs_with_handle_req *req = (struct spdk_lvs_with_handle_req *)cb_arg;
	struct spdk_lvol_store *lvs = req->lvol_store;
	struct spdk_blob_store *bs = lvs->blobstore;

	free(lvs);

	spdk_bs_unload(bs, _spdk_bs_unload_with_error_cb, req);
}

static void
_spdk_lvs_read_uuid(void *cb_arg, struct spdk_blob *blob, int lvolerrno)
{
	struct spdk_lvs_with_handle_req *req = (struct spdk_lvs_with_handle_req *)cb_arg;
	struct spdk_lvol_store *lvs = req->lvol_store;
	struct spdk_blob_store *bs = lvs->blobstore;
	const char *attr;
	size_t value_len;
	int rc;

	if (lvolerrno != 0) {
		SPDK_INFOLOG(SPDK_TRACE_LVOL, "Could not open super blob\n");
		free(lvs);
		spdk_bs_unload(bs, _spdk_bs_unload_with_error_cb, req);
		return;
	}

	rc = spdk_bs_md_get_xattr_value(blob, "uuid", (const void **)&attr, &value_len);
	if (rc != 0 || value_len != UUID_STRING_LEN || attr[UUID_STRING_LEN - 1] != '\0') {
		SPDK_INFOLOG(SPDK_TRACE_LVOL, "missing or incorrect UUID\n");
		spdk_bs_md_close_blob(&blob, _spdk_close_super_blob_with_error_cb, req);
		return;
	}

	if (uuid_parse(attr, lvs->uuid)) {
		SPDK_INFOLOG(SPDK_TRACE_LVOL, "incorrect UUID '%s'\n", attr);
		spdk_bs_md_close_blob(&blob, _spdk_close_super_blob_with_error_cb, req);
		return;
	}

	lvs->super_blob_id = spdk_blob_get_id(blob);

	spdk_bs_md_close_blob(&blob, _spdk_close_super_cb, req);
}

static void
_spdk_lvs_open_super(void *cb_arg, spdk_blob_id blobid, int lvolerrno)
{
	struct spdk_lvs_with_handle_req *req = (struct spdk_lvs_with_handle_req *)cb_arg;
	struct spdk_lvol_store *lvs = req->lvol_store;
	struct spdk_blob_store *bs = lvs->blobstore;

	if (lvolerrno != 0) {
		SPDK_INFOLOG(SPDK_TRACE_LVOL, "Super blob not found\n");
		free(lvs);
		spdk_bs_unload(bs, _spdk_bs_unload_with_error_cb, req);
		return;
	}

	spdk_bs_md_open_blob(bs, blobid, _spdk_lvs_read_uuid, req);
}

static void
_spdk_lvs_load_cb(void *cb_arg, struct spdk_blob_store *bs, int lvolerrno)
{
	struct spdk_lvs_with_handle_req *req = (struct spdk_lvs_with_handle_req *)cb_arg;
	struct spdk_lvol_store *lvs;

	if (lvolerrno != 0) {
		req->cb_fn(req->cb_arg, NULL, lvolerrno);
		free(req);
		return;
	}

	lvs = calloc(1, sizeof(*lvs));
	if (lvs == NULL) {
		SPDK_ERRLOG("Cannot alloc memory for lvol store\n");
		spdk_bs_unload(bs, _spdk_bs_unload_with_error_cb, req);
		return;
	}

	lvs->blobstore = bs;
	lvs->bs_dev = req->bs_dev;
	TAILQ_INIT(&lvs->lvols);

	req->lvol_store = lvs;

	spdk_bs_get_super(bs, _spdk_lvs_open_super, req);
}

void
spdk_lvs_load(struct spdk_bs_dev *bs_dev, spdk_lvs_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_lvs_with_handle_req *req;
	struct spdk_bs_opts opts = {};

	assert(cb_fn != NULL);

	if (bs_dev == NULL) {
		SPDK_ERRLOG("Blobstore device does not exist\n");
		cb_fn(cb_arg, NULL, -ENODEV);
		return;
	}

	req = calloc(1, sizeof(*req));
	if (req == NULL) {
		SPDK_ERRLOG("Cannot alloc memory for request structure\n");
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	req->cb_fn = cb_fn;
	req->cb_arg = cb_arg;
	req->bs_dev = bs_dev;

	spdk_bs_opts_init(&opts);
	strncpy(opts.bstype.bstype, "LVOLSTORE", SPDK_BLOBSTORE_TYPE_LENGTH);

	spdk_bs_load(bs_dev, &opts, _spdk_lvs_load_cb, req);
}

static void
_spdk_super_create_close_cb(void *cb_arg, int lvolerrno)
{
@@ -192,7 +455,7 @@ spdk_lvs_init(struct spdk_bs_dev *bs_dev, struct spdk_lvs_opts *o,
{
	struct spdk_lvol_store *lvs;
	struct spdk_lvs_with_handle_req *lvs_req;
	struct spdk_bs_opts opts;
	struct spdk_bs_opts opts = {};

	if (bs_dev == NULL) {
		SPDK_ERRLOG("Blobstore device does not exist\n");
@@ -227,6 +490,8 @@ spdk_lvs_init(struct spdk_bs_dev *bs_dev, struct spdk_lvs_opts *o,
	lvs_req->lvol_store = lvs;
	lvs->bs_dev = bs_dev;

	strncpy(opts.bstype.bstype, "LVOLSTORE", SPDK_BLOBSTORE_TYPE_LENGTH);

	SPDK_INFOLOG(SPDK_TRACE_LVOL, "Initializing lvol store\n");
	spdk_bs_init(bs_dev, &opts, _spdk_lvs_init_cb, lvs_req);

Loading