Commit 1738550b authored by Ziye Yang's avatar Ziye Yang Committed by Tomasz Zawadzki
Browse files

bdev/rbd: Share the rados_ioctx_t if images are from same pool



For rbd images belonging to the same rados pool,
we can share the rados_ioctx_t structure, then it
can save the librados related resource in client side.

Because there is a configuration parameter `librados_thread_count`
which is reponsible for io_context_pool. Then if
we can share the same rados_ioctx_t if we have the same rados
pool under shared rados_t. It can save the client resources when
there are multiple images in the same pool.

Change-Id: I7f71191d7dd72eec359840a5767da74ff6e4ec92
Signed-off-by: default avatarZiye Yang <ziye.yang@intel.com>
Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/18107


Community-CI: Mellanox Build Bot
Tested-by: default avatarSPDK CI Jenkins <sys_sgci@intel.com>
Reviewed-by: default avatarJim Harris <james.r.harris@intel.com>
Reviewed-by: default avatarChangpeng Liu <changpeng.liu@intel.com>
parent d9603a2f
Loading
Loading
Loading
Loading
+95 −5
Original line number Diff line number Diff line
@@ -23,6 +23,17 @@

static int bdev_rbd_count = 0;

struct bdev_rbd_pool_ctx {
	rados_t *cluster_p;
	char *name;
	rados_ioctx_t io_ctx;
	uint32_t ref;
	STAILQ_ENTRY(bdev_rbd_pool_ctx) link;
};

static STAILQ_HEAD(, bdev_rbd_pool_ctx) g_map_bdev_rbd_pool_ctx = STAILQ_HEAD_INITIALIZER(
			g_map_bdev_rbd_pool_ctx);

struct bdev_rbd {
	struct spdk_bdev disk;
	char *rbd_name;
@@ -34,7 +45,7 @@ struct bdev_rbd {
	rados_t *cluster_p;
	char *cluster_name;

	rados_ioctx_t io_ctx;
	struct bdev_rbd_pool_ctx *ctx;
	rbd_image_t image;

	rbd_image_info_t info;
@@ -116,6 +127,22 @@ bdev_rbd_put_cluster(rados_t **cluster)
	SPDK_ERRLOG("Cannot find the entry for cluster=%p\n", cluster);
}

static void
bdev_rbd_put_pool_ctx(struct bdev_rbd_pool_ctx *entry)
{
	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	assert(entry != NULL);
	assert(entry->ref > 0);
	entry->ref--;
	if (entry->ref == 0) {
		STAILQ_REMOVE(&g_map_bdev_rbd_pool_ctx, entry, bdev_rbd_pool_ctx, link);
		rados_ioctx_destroy(entry->io_ctx);
		free(entry->name);
		free(entry);
	}
}

static void
bdev_rbd_free(struct bdev_rbd *rbd)
{
@@ -134,8 +161,12 @@ bdev_rbd_free(struct bdev_rbd *rbd)
	free(rbd->pool_name);
	bdev_rbd_free_config(rbd->config);

	if (rbd->io_ctx) {
		rados_ioctx_destroy(rbd->io_ctx);
	/* When rbd is destructed by bdev_rbd_destruct, it will not enter here
	 * because the ctx will already freed by bdev_rbd_free_cb in async manner.
	 * This path only happens during the rbd initialization procedure of rbd */
	if (rbd->ctx) {
		bdev_rbd_put_pool_ctx(rbd->ctx);
		rbd->ctx = NULL;
	}

	if (rbd->cluster_name) {
@@ -285,18 +316,69 @@ bdev_rbd_cluster_handle(void *arg)
	return ret;
}

static int
bdev_rbd_get_pool_ctx(rados_t *cluster_p, const char *name,  struct bdev_rbd_pool_ctx **ctx)
{
	struct bdev_rbd_pool_ctx *entry;

	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	if (name == NULL || ctx == NULL) {
		return -1;
	}

	STAILQ_FOREACH(entry, &g_map_bdev_rbd_pool_ctx, link) {
		if (strcmp(name, entry->name) == 0 && cluster_p == entry->cluster_p) {
			entry->ref++;
			*ctx = entry;
			return 0;
		}
	}

	entry = calloc(1, sizeof(*entry));
	if (!entry) {
		SPDK_ERRLOG("Cannot allocate an entry for name=%s\n", name);
		return -1;
	}

	entry->name = strdup(name);
	if (entry->name == NULL) {
		SPDK_ERRLOG("Failed to allocate the name =%s space on entry =%p\n", name, entry);
		goto err_handle;
	}

	if (rados_ioctx_create(*cluster_p, name, &entry->io_ctx) < 0) {
		goto err_handle1;
	}

	entry->cluster_p = cluster_p;
	entry->ref = 1;
	*ctx = entry;
	STAILQ_INSERT_TAIL(&g_map_bdev_rbd_pool_ctx, entry, link);

	return 0;

err_handle1:
	free(entry->name);
err_handle:
	free(entry);

	return -1;
}

static void *
bdev_rbd_init_context(void *arg)
{
	struct bdev_rbd *rbd = arg;
	int rc;

	if (rados_ioctx_create(*(rbd->cluster_p), rbd->pool_name, &rbd->io_ctx) < 0) {
	if (bdev_rbd_get_pool_ctx(rbd->cluster_p, rbd->pool_name, &rbd->ctx) < 0) {
		SPDK_ERRLOG("Failed to create ioctx on rbd=%p\n", rbd);
		return NULL;
	}

	rc = rbd_open(rbd->io_ctx, rbd->rbd_name, &rbd->image, NULL);
	assert(rbd->ctx != NULL);
	rc = rbd_open(rbd->ctx->io_ctx, rbd->rbd_name, &rbd->image, NULL);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to open specified rbd device\n");
		return NULL;
@@ -565,6 +647,14 @@ bdev_rbd_free_cb(void *io_device)
{
	struct bdev_rbd *rbd = io_device;

	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	/* free the ctx */
	if (rbd->ctx) {
		bdev_rbd_put_pool_ctx(rbd->ctx);
		rbd->ctx = NULL;
	}

	/* The io device has been unregistered.  Send a message back to the
	 * original thread that started the destruct operation, so that the
	 * bdev unregister callback is invoked on the same thread that started