Commit 5c016026 authored by Ziye Yang's avatar Ziye Yang Committed by Tomasz Zawadzki
Browse files

bdev/rbd: Revise bdev_rbd_create rpc function.



Revise bdev_rbd_create rpc call to add an optional
parameter "--cluster-name", e.g., "--cluster-name Rados".

Then users can create a rbd bdev with registered
Rados Cluster. This shared strategy can be used to
remove the thread creation overhead if multiple rbds
are connected to the same Ceph cluster.

Signed-off-by: default avatarZiye Yang <ziye.yang@intel.com>
Change-Id: Ide5800f8fc6b2074805272a59731c666fe279b9a
Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/7584


Community-CI: Broadcom CI
Community-CI: Mellanox Build Bot
Reviewed-by: default avatarChangpeng Liu <changpeng.liu@intel.com>
Reviewed-by: default avatarShuhei Matsumoto <shuhei.matsumoto.xt@hitachi.com>
Tested-by: default avatarSPDK CI Jenkins <sys_sgci@intel.com>
parent 4e4d865f
Loading
Loading
Loading
Loading
+3 −0
Original line number Diff line number Diff line
@@ -23,6 +23,9 @@ New RPC `bdev_rbd_register_cluster` and `bdev_rbd_unregister_cluster` was added,
and delete the rados object cluster, then users can choose the cluster to create related rbd
device.

Revised `bdev_rbd_create` parameter, it allows to use an optional parameter --cluster-name
to create a rbd bdev with  an already registered Rados Cluster Object.

## v21.04:

### accel
+32 −0
Original line number Diff line number Diff line
@@ -3421,6 +3421,7 @@ pool_name | Required | string | Pool name
rbd_name                | Required | string      | Image name
block_size              | Required | number      | Block size
config                  | Optional | string map  | Explicit librados configuration
cluster_name            | Optional | string      | Rados cluster object name created in this module.

If no config is specified, Ceph configuration files must exist with
all relevant settings for accessing the pool. If a config map is
@@ -3432,6 +3433,10 @@ secret key stored in Ceph keyrings) are enough.
When accessing the image as some user other than "admin" (the
default), the "user_id" has to be set.

If provided with cluster_name option, it will use the Rados cluster object
referenced by the name (created by bdev_rbd_register_cluster RPC) and ignores
"user_id + config" combination to create its own Rados cluster.

### Result

Name of newly created bdev.
@@ -3468,6 +3473,33 @@ response:
}
~~~

Example request with `cluster_name`:

~~
{
  "params": {
    "pool_name": "rbd",
    "rbd_name": "foo",
    "block_size": 4096,
    "cluster_name": "rbd_cluster"
  },
  "jsonrpc": "2.0",
  "method": "bdev_rbd_create",
  "id": 1
}
~~

Example response:

~~
response:
{
  "jsonrpc": "2.0",
  "id": 1,
  "result": "Ceph0"
}
~~

## bdev_rbd_delete {#rpc_bdev_rbd_delete}

Delete @ref bdev_config_rbd bdev
+145 −14
Original line number Diff line number Diff line
@@ -63,6 +63,8 @@ struct bdev_rbd {
	char *pool_name;
	char **config;
	rados_t cluster;
	rados_t *cluster_p;
	char *cluster_name;
	rbd_image_info_t info;
	TAILQ_ENTRY(bdev_rbd) tailq;
	struct spdk_poller *reset_timer;
@@ -113,14 +115,39 @@ bdev_rbd_cluster_free(struct bdev_rbd_cluster *entry)
}

static void
bdev_rbd_free(struct bdev_rbd *rbd)
bdev_rbd_put_cluster(rados_t **cluster)
{
	if (!rbd) {
	struct bdev_rbd_cluster *entry;

	assert(cluster != NULL);

	/* No need go through the map if *cluster equals to NULL */
	if (*cluster == NULL) {
		return;
	}

	if (rbd->cluster) {
		rados_shutdown(rbd->cluster);
	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		if (*cluster != &entry->cluster) {
			continue;
		}

		assert(entry->ref > 0);
		entry->ref--;
		*cluster = NULL;
		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return;
	}

	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
	SPDK_ERRLOG("Cannot find the entry for cluster=%p\n", cluster);
}

static void
bdev_rbd_free(struct bdev_rbd *rbd)
{
	if (!rbd) {
		return;
	}

	free(rbd->disk.name);
@@ -128,6 +155,14 @@ bdev_rbd_free(struct bdev_rbd *rbd)
	free(rbd->user_id);
	free(rbd->pool_name);
	bdev_rbd_free_config(rbd->config);

	if (rbd->cluster_name) {
		bdev_rbd_put_cluster(&rbd->cluster_p);
		free(rbd->cluster_name);
	} else if (rbd->cluster) {
		rados_shutdown(rbd->cluster);
	}

	free(rbd);
}

@@ -209,11 +244,49 @@ bdev_rados_cluster_init(const char *user_id, const char *const *config,
	return 0;
}

static int
bdev_rbd_get_cluster(const char *cluster_name, rados_t **cluster)
{
	struct bdev_rbd_cluster *entry;

	if (cluster == NULL) {
		SPDK_ERRLOG("cluster should not be NULL\n");
		return -1;
	}

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		if (strncmp(cluster_name, entry->name, strlen(entry->name)) == 0) {
			entry->ref++;
			*cluster = &entry->cluster;
			pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
			return 0;
		}
	}

	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
	return -1;
}

static int
bdev_rbd_shared_cluster_init(const char *cluster_name, rados_t **cluster)
{
	int ret;

	ret = bdev_rbd_get_cluster(cluster_name, cluster);
	if (ret < 0) {
		SPDK_ERRLOG("Failed to create rados_t struct\n");
		return -1;
	}

	return ret;
}

static void *
bdev_rbd_cluster_handle(void *arg)
{
	struct bdev_rbd *rbd = arg;
	void *ret = arg;
	struct bdev_rbd *rbd = arg;
	int rc;

	rc = bdev_rados_cluster_init(rbd->user_id, (const char *const *)rbd->config,
@@ -234,14 +307,24 @@ bdev_rbd_init(struct bdev_rbd *rbd)
	rados_ioctx_t io_ctx = NULL;
	rbd_image_t image = NULL;

	if (!rbd->cluster_name) {
		rbd->cluster_p = &rbd->cluster;
		/* Cluster should be created in non-SPDK thread to avoid conflict between
		 * Rados and SPDK thread */
		if (spdk_call_unaffinitized(bdev_rbd_cluster_handle, rbd) == NULL) {
			SPDK_ERRLOG("Cannot create the rados object on rbd=%p\n", rbd);
			return -1;
		}
	} else {
		ret = bdev_rbd_shared_cluster_init(rbd->cluster_name, &rbd->cluster_p);
		if (ret < 0) {
			SPDK_ERRLOG("Failed to create rados object for rbd =%p on cluster_name=%s\n",
				    rbd, rbd->cluster_name);
			return -1;
		}
	}

	ret = rados_ioctx_create(rbd->cluster, rbd->pool_name, &io_ctx);
	ret = rados_ioctx_create(*(rbd->cluster_p), rbd->pool_name, &io_ctx);
	if (ret < 0) {
		SPDK_ERRLOG("Failed to create ioctx\n");
		return -1;
@@ -514,9 +597,9 @@ bdev_rbd_handle(void *arg)
	struct bdev_rbd_io_channel *ch = arg;
	void *ret = arg;

	assert(ch->disk->cluster != NULL);
	assert(ch->disk->cluster_p != NULL);

	if (rados_ioctx_create(ch->disk->cluster, ch->disk->pool_name, &ch->io_ctx) < 0) {
	if (rados_ioctx_create(*(ch->disk->cluster_p), ch->disk->pool_name, &ch->io_ctx) < 0) {
		SPDK_ERRLOG("Failed to create ioctx\n");
		ret = NULL;
		return ret;
@@ -602,6 +685,40 @@ bdev_rbd_get_io_channel(void *ctx)
	return spdk_get_io_channel(rbd_bdev);
}

static void
bdev_rbd_cluster_dump_entry(const char *cluster_name, struct spdk_json_write_ctx *w)
{
	struct bdev_rbd_cluster *entry;

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		if (strncmp(cluster_name, entry->name, strlen(entry->name))) {
			continue;
		}
		if (entry->user_id) {
			spdk_json_write_named_string(w, "user_id", entry->user_id);
		}

		if (entry->config_param) {
			char **config_entry = entry->config_param;

			spdk_json_write_named_object_begin(w, "config_param");
			while (*config_entry) {
				spdk_json_write_named_string(w, config_entry[0], config_entry[1]);
				config_entry += 2;
			}
			spdk_json_write_object_end(w);
		} else if (entry->config_file) {
			spdk_json_write_named_string(w, "config_file", entry->config_file);
		}

		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return;
	}

	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
}

static int
bdev_rbd_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
{
@@ -613,6 +730,11 @@ bdev_rbd_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)

	spdk_json_write_named_string(w, "rbd_name", rbd_bdev->rbd_name);

	if (rbd_bdev->cluster_name) {
		bdev_rbd_cluster_dump_entry(rbd_bdev->cluster_name, w);
		goto end;
	}

	if (rbd_bdev->user_id) {
		spdk_json_write_named_string(w, "user_id", rbd_bdev->user_id);
	}
@@ -628,6 +750,7 @@ bdev_rbd_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
		spdk_json_write_object_end(w);
	}

end:
	spdk_json_write_object_end(w);

	return 0;
@@ -842,7 +965,8 @@ bdev_rbd_create(struct spdk_bdev **bdev, const char *name, const char *user_id,
		const char *pool_name,
		const char *const *config,
		const char *rbd_name,
		uint32_t block_size)
		uint32_t block_size,
		const char *cluster_name)
{
	struct bdev_rbd *rbd;
	int ret;
@@ -871,6 +995,13 @@ bdev_rbd_create(struct spdk_bdev **bdev, const char *name, const char *user_id,
		}
	}

	if (cluster_name) {
		rbd->cluster_name = strdup(cluster_name);
		if (!rbd->cluster_name) {
			bdev_rbd_free(rbd);
			return -ENOMEM;
		}
	}
	rbd->pool_name = strdup(pool_name);
	if (!rbd->pool_name) {
		bdev_rbd_free(rbd);
+1 −1
Original line number Diff line number Diff line
@@ -53,7 +53,7 @@ typedef void (*spdk_delete_rbd_complete)(void *cb_arg, int bdeverrno);
int bdev_rbd_create(struct spdk_bdev **bdev, const char *name, const char *user_id,
		    const char *pool_name,
		    const char *const *config,
		    const char *rbd_name, uint32_t block_size);
		    const char *rbd_name, uint32_t block_size, const char *cluster_name);
/**
 * Delete rbd bdev.
 *
+5 −2
Original line number Diff line number Diff line
@@ -44,6 +44,7 @@ struct rpc_create_rbd {
	char *rbd_name;
	uint32_t block_size;
	char **config;
	char *cluster_name;
};

static void
@@ -54,6 +55,7 @@ free_rpc_create_rbd(struct rpc_create_rbd *req)
	free(req->pool_name);
	free(req->rbd_name);
	bdev_rbd_free_config(req->config);
	free(req->cluster_name);
}

static int
@@ -104,7 +106,8 @@ static const struct spdk_json_object_decoder rpc_create_rbd_decoders[] = {
	{"pool_name", offsetof(struct rpc_create_rbd, pool_name), spdk_json_decode_string},
	{"rbd_name", offsetof(struct rpc_create_rbd, rbd_name), spdk_json_decode_string},
	{"block_size", offsetof(struct rpc_create_rbd, block_size), spdk_json_decode_uint32},
	{"config", offsetof(struct rpc_create_rbd, config), bdev_rbd_decode_config, true}
	{"config", offsetof(struct rpc_create_rbd, config), bdev_rbd_decode_config, true},
	{"cluster_name", offsetof(struct rpc_create_rbd, cluster_name), spdk_json_decode_string, true}
};

static void
@@ -128,7 +131,7 @@ rpc_bdev_rbd_create(struct spdk_jsonrpc_request *request,
	rc = bdev_rbd_create(&bdev, req.name, req.user_id, req.pool_name,
			     (const char *const *)req.config,
			     req.rbd_name,
			     req.block_size);
			     req.block_size, req.cluster_name);
	if (rc) {
		spdk_jsonrpc_send_error_response(request, rc, spdk_strerror(-rc));
		goto cleanup;
Loading