Commit e18eaee2 authored by Ziye Yang's avatar Ziye Yang Committed by Tomasz Zawadzki
Browse files

bdev/rbd: Add cluster register/unregister RPC support



This patch is used to add two rpc calls:
bdev_rbd_register_cluster
bdev_rbd_unregister_cluster

Then in the next patch, rbd bdev constructed on the same cluster object
can share the common Rados_t structure in order to remove the thread creation
overhead and improve the scalability.

Signed-off-by: Ziye Yang <ziye.yang@intel.com>
Change-Id: I898cc4ffabb8e6721ba5bef099cbf948c64d2c98
Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/7551


Community-CI: Broadcom CI
Community-CI: Mellanox Build Bot
Tested-by: SPDK CI Jenkins <sys_sgci@intel.com>
Reviewed-by: Changpeng Liu <changpeng.liu@intel.com>
Reviewed-by: Shuhei Matsumoto <shuhei.matsumoto.xt@hitachi.com>
parent fb68d4e9
Loading
Loading
Loading
Loading
+6 −0
Original line number Diff line number Diff line
@@ -15,6 +15,12 @@ Added `min_cntlid` and `max_cntlid` to `nvmf_create_subsystem` to limit the cont

Added a new function `spdk_nvme_ns_cmd_copy` to submit a Simple Copy Command to a Namespace.

### rpc

New RPCs `bdev_rbd_register_cluster` and `bdev_rbd_unregister_cluster` were added. They allow users
to create and delete a Rados cluster object, and then choose that cluster when creating related rbd
devices.

## v21.04:

### accel
+102 −0
Original line number Diff line number Diff line
@@ -3303,6 +3303,108 @@ Example response:
}
~~~

## bdev_rbd_register_cluster {#rpc_bdev_rbd_register_cluster}

This method is available only if SPDK was built with Ceph RBD support.

### Parameters

Name                    | Optional | Type        | Description
----------------------- | -------- | ----------- | -----------
name                    | Required | string      | Registered Rados cluster object name
user_id                 | Optional | string      | Ceph ID (i.e. admin, not client.admin)
config_param            | Optional | string map  | Explicit librados configuration
config_file             | Optional | string      | File path of librados configuration file

This RPC registers a Rados cluster object handle that is known only
to the rbd module. It uses user_id + config_param or user_id + config_file to
identify a Rados cluster object.

If no config_param is specified, Ceph configuration files must exist with
all relevant settings for accessing the Ceph cluster. If a config map is
passed, the configuration files are ignored and instead all key/value
pairs are passed to rados_conf_set to configure cluster access. In
practice, "mon_host" (= list of monitor address+port) and "key" (= the
secret key stored in Ceph keyrings) are enough.

When accessing the Ceph cluster as some user other than "admin" (the
default), the "user_id" has to be set.

### Result

Name of newly created Rados cluster object.

### Example

Example request with `key` from `/etc/ceph/ceph.client.admin.keyring`:

~~~
{
  "params": {
    "name": "rbd_cluster",
    "config_param": {
      "mon_host": "192.168.7.1:6789,192.168.7.2:6789",
      "key": "AQDwf8db7zR1GRAA5k7NKXjS5S5V4mntwUDnGQ=="
    }
  },
  "jsonrpc": "2.0",
  "method": "bdev_rbd_register_cluster",
  "id": 1
}
~~~

Example response:

~~~
{
  "jsonrpc": "2.0",
  "id": 1,
  "result": "rbd_cluster"
}
~~~

## bdev_rbd_unregister_cluster {#rpc_bdev_rbd_unregister_cluster}

This method is available only if SPDK was built with Ceph RBD support.
If an rbd bdev is still using this cluster, the unregistration operation
will fail.

### Result

`true` if Rados cluster object with provided name was deleted; otherwise an error response is returned.

### Parameters

Name                    | Optional | Type        | Description
----------------------- | -------- | ----------- | -------------------------
name                    | Required | string      | Rados cluster object name

### Example

Example request:

~~~
{
  "params": {
    "name": "rbd_cluster"
  },
  "jsonrpc": "2.0",
  "method": "bdev_rbd_unregister_cluster",
  "id": 1
}
~~~

Example response:

~~~
{
  "jsonrpc": "2.0",
  "id": 1,
  "result": true
}
~~~

## bdev_rbd_create {#rpc_bdev_rbd_create}

Create @ref bdev_config_rbd bdev
+187 −0
Original line number Diff line number Diff line
@@ -86,6 +86,32 @@ struct bdev_rbd_io {
	size_t	total_len;
};

/* One registered Rados cluster; entries are linked on g_map_bdev_rbd_cluster. */
struct bdev_rbd_cluster {
	/* Name the cluster was registered under; compared against on lookup. */
	char *name;
	/* Optional user id saved at registration; NULL when none was given. */
	char *user_id;
	/* Optional copy of the explicit configuration made with bdev_rbd_dup_config()
	 * (presumably the same NULL-terminated key, value layout as the input). */
	char **config_param;
	/* Optional configuration file path; only saved when no config_param was given. */
	char *config_file;
	/* librados handle filled in by rados_create(). */
	rados_t cluster;
	/* Usage counter; unregistering is refused while it is non-zero. */
	uint32_t ref;
	/* Linkage for the g_map_bdev_rbd_cluster list. */
	STAILQ_ENTRY(bdev_rbd_cluster) link;
};

/* List of all registered Rados clusters. The register and unregister paths
 * take g_map_bdev_rbd_cluster_mutex before walking or modifying it. */
static STAILQ_HEAD(, bdev_rbd_cluster) g_map_bdev_rbd_cluster = STAILQ_HEAD_INITIALIZER(
			g_map_bdev_rbd_cluster);
static pthread_mutex_t g_map_bdev_rbd_cluster_mutex = PTHREAD_MUTEX_INITIALIZER;

/*
 * Release a cluster entry together with every string and the config array it
 * owns. The rados handle itself is not touched here; callers shut it down
 * separately when needed.
 */
static void
bdev_rbd_cluster_free(struct bdev_rbd_cluster *entry)
{
	assert(entry != NULL);

	/* The owned members are independent of each other, so the release
	 * order does not matter as long as the entry itself goes last. */
	free(entry->name);
	free(entry->user_id);
	free(entry->config_file);
	bdev_rbd_free_config(entry->config_param);

	free(entry);
}

static void
bdev_rbd_free(struct bdev_rbd *rbd)
{
@@ -650,6 +676,167 @@ static const struct spdk_bdev_fn_table rbd_fn_table = {
	.write_config_json	= bdev_rbd_write_config_json,
};

/*
 * Create, configure and connect a Rados cluster handle, then add it to
 * g_map_bdev_rbd_cluster under the given name.
 *
 * name          Unique name of the new entry; must not be NULL.
 * user_id       Optional id passed to rados_create(); may be NULL.
 * config_param  Optional NULL-terminated array of consecutive key, value
 *               strings applied with rados_conf_set(). Takes priority over
 *               config_file.
 * config_file   Optional configuration file path passed to
 *               rados_conf_read_file() when config_param is NULL. It may be
 *               NULL as well, in which case NULL is passed through to librados.
 *
 * Returns 0 on success and -1 on any failure (missing or duplicate name,
 * allocation failure, or a failing librados call). The registry mutex is held
 * for the whole operation, so the duplicate check and the insertion are atomic
 * with respect to other register/unregister calls.
 */
static int
rbd_register_cluster(const char *name, const char *user_id, const char *const *config_param,
		     const char *config_file)
{
	struct bdev_rbd_cluster *entry;
	int rc;

	if (name == NULL) {
		SPDK_ERRLOG("Cluster name must be specified\n");
		return -1;
	}

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		/* Compare whole names: a length-limited compare would treat any name
		 * that merely starts with an existing name as a duplicate. */
		if (strcmp(name, entry->name) == 0) {
			SPDK_ERRLOG("Cluster name=%s already exists\n", name);
			pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
			return -1;
		}
	}

	entry = calloc(1, sizeof(*entry));
	if (!entry) {
		SPDK_ERRLOG("Cannot allocate an entry for name=%s\n", name);
		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return -1;
	}

	entry->name = strdup(name);
	if (entry->name == NULL) {
		SPDK_ERRLOG("Failed to save the name =%s on entry =%p\n", name, entry);
		goto err_handle;
	}

	if (user_id) {
		entry->user_id = strdup(user_id);
		if (entry->user_id == NULL) {
			SPDK_ERRLOG("Failed to save the str =%s on entry =%p\n", user_id, entry);
			goto err_handle;
		}
	}

	/* The first priority is the config_param, then we use the config_file */
	if (config_param) {
		entry->config_param = bdev_rbd_dup_config(config_param);
		if (entry->config_param == NULL) {
			SPDK_ERRLOG("Failed to save the config_param=%p on entry = %p\n", config_param, entry);
			goto err_handle;
		}
	} else if (config_file) {
		entry->config_file = strdup(config_file);
		if (entry->config_file == NULL) {
			SPDK_ERRLOG("Failed to save the config_file=%s on entry = %p\n", config_file, entry);
			goto err_handle;
		}
	}

	rc = rados_create(&entry->cluster, user_id);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to create rados_t struct\n");
		goto err_handle;
	}

	/* From here on the rados handle exists, so every failure path has to
	 * shut it down before the entry is freed. */
	if (config_param) {
		const char *const *config_entry = config_param;
		while (*config_entry) {
			rc = rados_conf_set(entry->cluster, config_entry[0], config_entry[1]);
			if (rc < 0) {
				SPDK_ERRLOG("Failed to set %s = %s\n", config_entry[0], config_entry[1]);
				rados_shutdown(entry->cluster);
				goto err_handle;
			}
			config_entry += 2;
		}
	} else {
		rc = rados_conf_read_file(entry->cluster, entry->config_file);
		if (rc < 0) {
			SPDK_ERRLOG("Failed to read conf file\n");
			rados_shutdown(entry->cluster);
			goto err_handle;
		}
	}

	rc = rados_connect(entry->cluster);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to connect to rbd_pool on cluster=%p\n", entry->cluster);
		rados_shutdown(entry->cluster);
		goto err_handle;
	}

	STAILQ_INSERT_TAIL(&g_map_bdev_rbd_cluster, entry, link);
	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);

	return 0;

err_handle:
	bdev_rbd_cluster_free(entry);
	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
	return -1;
}

/*
 * Remove the registered cluster called name from g_map_bdev_rbd_cluster, shut
 * down its rados handle and free the entry.
 *
 * Returns 0 when the cluster was removed. Returns -1 when name is NULL, when
 * no cluster with exactly that name is registered, or when the cluster is
 * still in use (its ref counter is non-zero).
 */
int
bdev_rbd_unregister_cluster(const char *name)
{
	struct bdev_rbd_cluster *entry;
	int rc = 0;

	if (name == NULL) {
		return -1;
	}

	pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
	STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
		/* Match on the whole name; a length-limited compare could pick an
		 * entry whose name is only a prefix of the requested one. */
		if (strcmp(name, entry->name) != 0) {
			continue;
		}

		if (entry->ref == 0) {
			STAILQ_REMOVE(&g_map_bdev_rbd_cluster, entry, bdev_rbd_cluster, link);
			rados_shutdown(entry->cluster);
			bdev_rbd_cluster_free(entry);
		} else {
			SPDK_ERRLOG("Cluster with name=%s is still used and we cannot delete it\n",
				    entry->name);
			rc = -1;
		}

		/* The entry may have been freed above, so leave the loop right away. */
		pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
		return rc;
	}

	pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);

	SPDK_ERRLOG("Could not find the cluster name =%s\n", name);

	return -1;
}

/*
 * Callback adapter for rbd_register_cluster(): unpacks the
 * cluster_register_info passed as arg and reports the outcome through the
 * returned pointer. Returns arg itself on success and NULL on failure.
 */
static void *
_bdev_rbd_register_cluster(void *arg)
{
	struct cluster_register_info *info = arg;

	if (rbd_register_cluster((const char *)info->name, (const char *)info->user_id,
				 (const char *const *)info->config_param,
				 (const char *)info->config_file) != 0) {
		return NULL;
	}

	return arg;
}

/*
 * Register a Rados cluster described by info.
 *
 * Returns 0 on success and -1 when the registration failed.
 */
int
bdev_rbd_register_cluster(struct cluster_register_info *info)
{
	void *registered;

	assert(info != NULL);

	/* The Rados cluster is created through spdk_call_unaffinitized(), i.e. not
	 * as a regular SPDK thread, to avoid CPU resource contention. */
	registered = spdk_call_unaffinitized(_bdev_rbd_register_cluster, info);

	return (registered != NULL) ? 0 : -1;
}

int
bdev_rbd_create(struct spdk_bdev **bdev, const char *name, const char *user_id,
		const char *pool_name,
+21 −0
Original line number Diff line number Diff line
@@ -38,6 +38,13 @@

#include "spdk/bdev.h"

/* Parameters describing a Rados cluster to register; passed to
 * bdev_rbd_register_cluster(). */
struct cluster_register_info {
	/* Name to register the cluster under. */
	char *name;
	/* Optional user id; may be NULL. */
	char *user_id;
	/* Optional explicit configuration as an array of strings, released with
	 * bdev_rbd_free_config(); may be NULL. */
	char **config_param;
	/* Optional configuration file path; may be NULL. */
	char *config_file;
};

/* Release a configuration array such as the config_param member of
 * struct cluster_register_info. */
void bdev_rbd_free_config(char **config);
/* Duplicate a configuration array. Callers treat a NULL result as failure;
 * the copy is presumably released with bdev_rbd_free_config(). */
char **bdev_rbd_dup_config(const char *const *config);

@@ -65,4 +72,18 @@ void bdev_rbd_delete(struct spdk_bdev *bdev, spdk_delete_rbd_complete cb_fn,
 */
int bdev_rbd_resize(struct spdk_bdev *bdev, const uint64_t new_size_in_mb);

/**
 * Create a Rados cluster.
 *
 * \param info the info to register the Rados cluster object; must not be NULL.
 *
 * \return 0 on success, -1 on failure.
 */
int bdev_rbd_register_cluster(struct cluster_register_info *info);

/**
 * Delete a registered cluster.
 *
 * \param name the name of the cluster to be deleted.
 *
 * \return 0 on success, -1 if name is NULL, no such cluster is registered,
 * or the cluster is still in use.
 */
int bdev_rbd_unregister_cluster(const char *name);

#endif /* SPDK_BDEV_RBD_H */
+89 −0
Original line number Diff line number Diff line
@@ -244,3 +244,92 @@ cleanup:
	free_rpc_bdev_rbd_resize(&req);
}
SPDK_RPC_REGISTER("bdev_rbd_resize", rpc_bdev_rbd_resize, SPDK_RPC_RUNTIME)

/*
 * Release everything the JSON decoders may have allocated inside req.
 * The structure itself is owned by the caller and is not freed.
 */
static void
free_rpc_register_cluster(struct cluster_register_info *req)
{
	/* Members are independent allocations; release order is irrelevant. */
	bdev_rbd_free_config(req->config_param);
	free(req->config_file);
	free(req->user_id);
	free(req->name);
}

/*
 * JSON decoders for the parameters of bdev_rbd_register_cluster.
 *
 * "name" is required: the registration code dereferences it unconditionally.
 * "config_file" is a plain string (it is stored in a char * and released with
 * free()), so it is decoded with spdk_json_decode_string. Only "config_param"
 * is a key/value object and goes through bdev_rbd_decode_config.
 */
static const struct spdk_json_object_decoder rpc_register_cluster_decoders[] = {
	{"name", offsetof(struct cluster_register_info, name), spdk_json_decode_string},
	{"user_id", offsetof(struct cluster_register_info, user_id), spdk_json_decode_string, true},
	{"config_param", offsetof(struct cluster_register_info, config_param), bdev_rbd_decode_config, true},
	{"config_file", offsetof(struct cluster_register_info, config_file), spdk_json_decode_string, true}
};

/*
 * RPC handler for "bdev_rbd_register_cluster".
 *
 * Decodes the request parameters, registers the cluster and answers with the
 * cluster name on success. A decode failure or a registration failure is
 * answered with an error response. The decoded parameters are released on
 * every path.
 */
static void
rpc_bdev_rbd_register_cluster(struct spdk_jsonrpc_request *request,
			      const struct spdk_json_val *params)
{
	struct cluster_register_info req = {};
	struct spdk_json_write_ctx *w;
	int rc;

	if (spdk_json_decode_object(params, rpc_register_cluster_decoders,
				    SPDK_COUNTOF(rpc_register_cluster_decoders),
				    &req) != 0) {
		SPDK_DEBUGLOG(bdev_rbd, "spdk_json_decode_object failed\n");
		spdk_jsonrpc_send_error_response(request, SPDK_JSONRPC_ERROR_INTERNAL_ERROR,
						 "spdk_json_decode_object failed");
		/* A failed decode may still have filled in some members. */
		free_rpc_register_cluster(&req);
		return;
	}

	rc = bdev_rbd_register_cluster(&req);
	if (rc != 0) {
		spdk_jsonrpc_send_error_response(request, rc, spdk_strerror(-rc));
	} else {
		w = spdk_jsonrpc_begin_result(request);
		spdk_json_write_string(w, req.name);
		spdk_jsonrpc_end_result(request, w);
	}

	free_rpc_register_cluster(&req);
}
SPDK_RPC_REGISTER("bdev_rbd_register_cluster", rpc_bdev_rbd_register_cluster, SPDK_RPC_RUNTIME)

/* Decoded parameters of the bdev_rbd_unregister_cluster RPC. */
struct rpc_bdev_rbd_unregister_cluster {
	/* Name of the cluster to unregister; released with free(). */
	char *name;
};

/* Release the name string held by req; req itself is not freed. */
static void
free_rpc_bdev_cluster_unregister(struct rpc_bdev_rbd_unregister_cluster *req)
{
	free(req->name);
}

/* JSON decoders for bdev_rbd_unregister_cluster: a single, required "name" string. */
static const struct spdk_json_object_decoder rpc_bdev_rbd_unregister_cluster_decoders[] = {
	{
		"name",
		offsetof(struct rpc_bdev_rbd_unregister_cluster, name),
		spdk_json_decode_string,
		false
	},
};

/*
 * RPC handler for "bdev_rbd_unregister_cluster".
 *
 * Decodes the cluster name, unregisters the cluster and answers with a boolean
 * true on success. A decode failure or an unregister failure is answered with
 * an error response. The decoded name is released on every path.
 */
static void
rpc_bdev_rbd_unregister_cluster(struct spdk_jsonrpc_request *request,
				const struct spdk_json_val *params)
{
	struct rpc_bdev_rbd_unregister_cluster req = {NULL};
	int rc;

	if (spdk_json_decode_object(params, rpc_bdev_rbd_unregister_cluster_decoders,
				    SPDK_COUNTOF(rpc_bdev_rbd_unregister_cluster_decoders),
				    &req) != 0) {
		spdk_jsonrpc_send_error_response(request, SPDK_JSONRPC_ERROR_INTERNAL_ERROR,
						 "spdk_json_decode_object failed");
		free_rpc_bdev_cluster_unregister(&req);
		return;
	}

	rc = bdev_rbd_unregister_cluster(req.name);
	if (rc != 0) {
		spdk_jsonrpc_send_error_response(request, rc, spdk_strerror(-rc));
	} else {
		spdk_jsonrpc_send_bool_response(request, true);
	}

	free_rpc_bdev_cluster_unregister(&req);
}
SPDK_RPC_REGISTER("bdev_rbd_unregister_cluster", rpc_bdev_rbd_unregister_cluster, SPDK_RPC_RUNTIME)
Loading