Commit e1e73440 authored by Ziye Yang's avatar Ziye Yang Committed by Tomasz Zawadzki
Browse files

bdev/rbd: open image on only one spdk_thread



Purpose: Only Open image on one spdk_thread
due to the limitation of librbd module in order to
eliminate the lock overhead among different
spdk_threads on operating on the same image.

Change-Id: I64c62e8ae1c3324b92cfd953b44ec08af6688530
Signed-off-by: default avatarZiye Yang <ziye.yang@intel.com>
Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/6812


Reviewed-by: default avatarShuhei Matsumoto <shuhei.matsumoto.xt@hitachi.com>
Reviewed-by: default avatarGangCao <gang.cao@intel.com>
Reviewed-by: default avatarXiaodong Liu <xiaodong.liu@intel.com>
Reviewed-by: default avatarAleksey Marchuk <alexeymar@mellanox.com>
Community-CI: Broadcom CI <spdk-ci.pdl@broadcom.com>
Community-CI: Mellanox Build Bot
Tested-by: default avatarSPDK CI Jenkins <sys_sgci@intel.com>
parent cc0d05b4
Loading
Loading
Loading
Loading
+230 −92
Original line number Diff line number Diff line
@@ -62,10 +62,22 @@ struct bdev_rbd {
	char *user_id;
	char *pool_name;
	char **config;

	rados_t cluster;
	rados_t *cluster_p;
	char *cluster_name;

	rados_ioctx_t io_ctx;
	rbd_image_t image;
	int pfd;

	rbd_image_info_t info;
	pthread_mutex_t mutex;
	struct spdk_thread *main_td;
	uint32_t ch_count;
	struct bdev_rbd_group_channel *group_ch;

	bool deferred_free;
	TAILQ_ENTRY(bdev_rbd) tailq;
	struct spdk_poller *reset_timer;
	struct spdk_bdev_io *reset_bdev_io;
@@ -77,14 +89,12 @@ struct bdev_rbd_group_channel {
};

struct bdev_rbd_io_channel {
	rados_ioctx_t io_ctx;
	int pfd;
	rbd_image_t image;
	struct bdev_rbd *disk;
	struct bdev_rbd_group_channel *group_ch;
};

struct bdev_rbd_io {
	struct	spdk_thread *submit_td;
	enum	spdk_bdev_io_status status;
	size_t	total_len;
};

@@ -156,6 +166,10 @@ bdev_rbd_free(struct bdev_rbd *rbd)
	free(rbd->pool_name);
	bdev_rbd_free_config(rbd->config);

	if (rbd->io_ctx) {
		rados_ioctx_destroy(rbd->io_ctx);
	}

	if (rbd->cluster_name) {
		bdev_rbd_put_cluster(&rbd->cluster_p);
		free(rbd->cluster_name);
@@ -163,6 +177,7 @@ bdev_rbd_free(struct bdev_rbd *rbd)
		rados_shutdown(rbd->cluster);
	}

	pthread_mutex_destroy(&rbd->mutex);
	free(rbd);
}

@@ -300,12 +315,37 @@ bdev_rbd_cluster_handle(void *arg)
	return ret;
}

static void *
bdev_rbd_init_context(void *arg)
{
	struct bdev_rbd *rbd = arg;
	int rc;

	if (rados_ioctx_create(*(rbd->cluster_p), rbd->pool_name, &rbd->io_ctx) < 0) {
		SPDK_ERRLOG("Failed to create ioctx on rbd=%p\n", rbd);
		return NULL;
	}

	rc = rbd_open(rbd->io_ctx, rbd->rbd_name, &rbd->image, NULL);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to open specified rbd device\n");
		return NULL;
	}

	rc = rbd_stat(rbd->image, &rbd->info, sizeof(rbd->info));
	rbd_close(rbd->image);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to stat specified rbd device\n");
		return NULL;
	}

	return arg;
}

static int
bdev_rbd_init(struct bdev_rbd *rbd)
{
	int ret = 0;
	rados_ioctx_t io_ctx = NULL;
	rbd_image_t image = NULL;

	if (!rbd->cluster_name) {
		rbd->cluster_p = &rbd->cluster;
@@ -324,25 +364,10 @@ bdev_rbd_init(struct bdev_rbd *rbd)
		}
	}

	ret = rados_ioctx_create(*(rbd->cluster_p), rbd->pool_name, &io_ctx);
	if (ret < 0) {
		SPDK_ERRLOG("Failed to create ioctx\n");
		return -1;
	}

	ret = rbd_open(io_ctx, rbd->rbd_name, &image, NULL);
	if (ret < 0) {
		SPDK_ERRLOG("Failed to open specified rbd device\n");
		goto end;
	}
	ret = rbd_stat(image, &rbd->info, sizeof(rbd->info));
	rbd_close(image);
	if (ret < 0) {
		SPDK_ERRLOG("Failed to stat specified rbd device\n");
	if (spdk_call_unaffinitized(bdev_rbd_init_context, rbd) == NULL) {
		SPDK_ERRLOG("Cannot init rbd context for rbd=%p\n", rbd);
	}

end:
	rados_ioctx_destroy(io_ctx);
	return ret;
}

@@ -360,14 +385,34 @@ bdev_rbd_finish_aiocb(rbd_completion_t cb, void *arg)
}

static void
bdev_rbd_start_aio(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
_bdev_rbd_io_complete(void *_rbd_io)
{
	struct bdev_rbd_io *rbd_io = _rbd_io;

	spdk_bdev_io_complete(spdk_bdev_io_from_ctx(rbd_io), rbd_io->status);
}

static void
bdev_rbd_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status)
{
	struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;

	rbd_io->status = status;
	if (rbd_io->submit_td != NULL) {
		spdk_thread_send_msg(rbd_io->submit_td, _bdev_rbd_io_complete, rbd_io);
	} else {
		_bdev_rbd_io_complete(rbd_io);
	}
}

static void
bdev_rbd_start_aio(struct bdev_rbd *disk, struct spdk_bdev_io *bdev_io,
		   struct iovec *iov, int iovcnt, uint64_t offset, size_t len)
{
	struct bdev_rbd_io_channel *rbdio_ch = spdk_io_channel_get_ctx(ch);
	int ret;
	rbd_completion_t comp;
	struct bdev_rbd_io *rbd_io;
	rbd_image_t image = rbdio_ch->image;
	rbd_image_t image = disk->image;

	ret = rbd_aio_create_completion(bdev_io, bdev_rbd_finish_aiocb,
					&comp);
@@ -401,7 +446,7 @@ bdev_rbd_start_aio(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
	return;

err:
	spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
	bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
}

static int bdev_rbd_library_init(void);
@@ -432,7 +477,7 @@ bdev_rbd_reset_timer(void *arg)
	 * TODO: This should check if any I/O is still in flight before completing the reset.
	 * For now, just complete after the timer expires.
	 */
	spdk_bdev_io_complete(disk->reset_bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS);
	bdev_rbd_io_complete(disk->reset_bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS);
	spdk_poller_unregister(&disk->reset_timer);
	disk->reset_bdev_io = NULL;

@@ -458,8 +503,16 @@ bdev_rbd_free_cb(void *io_device)

	assert(rbd != NULL);

	pthread_mutex_lock(&rbd->mutex);

	if (rbd->ch_count != 0) {
		rbd->deferred_free = true;
		pthread_mutex_unlock(&rbd->mutex);
	} else {
		pthread_mutex_unlock(&rbd->mutex);
		bdev_rbd_free((struct bdev_rbd *)rbd);
	}
}

static int
bdev_rbd_destruct(void *ctx)
@@ -475,12 +528,14 @@ static void
bdev_rbd_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
		    bool success)
{
	struct bdev_rbd *disk = (struct bdev_rbd *)bdev_io->bdev->ctxt;

	if (!success) {
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
		bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
		return;
	}

	bdev_rbd_start_aio(ch,
	bdev_rbd_start_aio(disk,
			   bdev_io,
			   bdev_io->u.bdev.iovs,
			   bdev_io->u.bdev.iovcnt,
@@ -489,8 +544,11 @@ bdev_rbd_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
}

static void
bdev_rbd_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
_bdev_rbd_submit_request(void *ctx)
{
	struct spdk_bdev_io *bdev_io = ctx;
	struct bdev_rbd *disk = (struct bdev_rbd *)bdev_io->bdev->ctxt;

	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
		spdk_bdev_io_get_buf(bdev_io, bdev_rbd_get_buf_cb,
@@ -499,7 +557,7 @@ bdev_rbd_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io

	case SPDK_BDEV_IO_TYPE_WRITE:
	case SPDK_BDEV_IO_TYPE_FLUSH:
		bdev_rbd_start_aio(ch,
		bdev_rbd_start_aio(disk,
				   bdev_io,
				   bdev_io->u.bdev.iovs,
				   bdev_io->u.bdev.iovcnt,
@@ -514,11 +572,27 @@ bdev_rbd_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io

	default:
		SPDK_ERRLOG("Unsupported IO type =%d\n", bdev_io->type);
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
		bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
		break;
	}
}

static void
bdev_rbd_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	struct spdk_thread *submit_td = spdk_io_channel_get_thread(ch);
	struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
	struct bdev_rbd *disk = (struct bdev_rbd *)bdev_io->bdev->ctxt;

	if (disk->main_td != submit_td) {
		rbd_io->submit_td = submit_td;
		spdk_thread_send_msg(disk->main_td, _bdev_rbd_submit_request, bdev_io);
	} else {
		rbd_io->submit_td = NULL;
		_bdev_rbd_submit_request(bdev_io);
	}
}

static bool
bdev_rbd_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
{
@@ -535,7 +609,7 @@ bdev_rbd_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
}

static void
bdev_rbd_io_poll(struct bdev_rbd_io_channel *ch)
bdev_rbd_io_poll(struct bdev_rbd *disk)
{
	int i, io_status, rc;
	rbd_completion_t comps[SPDK_RBD_QUEUE_DEPTH];
@@ -543,7 +617,7 @@ bdev_rbd_io_poll(struct bdev_rbd_io_channel *ch)
	struct bdev_rbd_io *rbd_io;
	enum spdk_bdev_io_status bio_status;

	rc = rbd_poll_io_events(ch->image, comps, SPDK_RBD_QUEUE_DEPTH);
	rc = rbd_poll_io_events(disk->image, comps, SPDK_RBD_QUEUE_DEPTH);
	for (i = 0; i < rc; i++) {
		bdev_io = rbd_aio_get_arg(comps[i]);
		rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
@@ -563,49 +637,46 @@ bdev_rbd_io_poll(struct bdev_rbd_io_channel *ch)

		rbd_aio_release(comps[i]);

		spdk_bdev_io_complete(bdev_io, bio_status);
		bdev_rbd_io_complete(bdev_io, bio_status);
	}
}

static void
bdev_rbd_free_channel(struct bdev_rbd_io_channel *ch)
bdev_rbd_free_channel_resources(struct bdev_rbd *disk)
{
	if (!ch) {
		return;
	}
	int rc;

	if (ch->image) {
		bdev_rbd_exit(ch->image);
	assert(disk != NULL);
	assert(disk->main_td == spdk_get_thread());
	assert(disk->ch_count == 0);
	assert(disk->group_ch != NULL);
	rc = epoll_ctl(disk->group_ch->epoll_fd, EPOLL_CTL_DEL,
		       disk->pfd, NULL);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to remove fd on disk=%p from the polling group=%p\n",
			    disk, disk->group_ch);
	}
	spdk_put_io_channel(spdk_io_channel_from_ctx(disk->group_ch));

	if (ch->io_ctx) {
		rados_ioctx_destroy(ch->io_ctx);
	if (disk->image) {
		bdev_rbd_exit(disk->image);
	}

	if (ch->pfd >= 0) {
		close(ch->pfd);
	if (disk->pfd >= 0) {
		close(disk->pfd);
	}

	if (ch->group_ch) {
		spdk_put_io_channel(spdk_io_channel_from_ctx(ch->group_ch));
	}
	disk->main_td = NULL;
	disk->group_ch = NULL;
}

static void *
bdev_rbd_handle(void *arg)
{
	struct bdev_rbd_io_channel *ch = arg;
	struct bdev_rbd *disk = arg;
	void *ret = arg;

	assert(ch->disk->cluster_p != NULL);

	if (rados_ioctx_create(*(ch->disk->cluster_p), ch->disk->pool_name, &ch->io_ctx) < 0) {
		SPDK_ERRLOG("Failed to create ioctx\n");
		ret = NULL;
		return ret;
	}

	if (rbd_open(ch->io_ctx, ch->disk->rbd_name, &ch->image, NULL) < 0) {
	if (rbd_open(disk->io_ctx, disk->rbd_name, &disk->image, NULL) < 0) {
		SPDK_ERRLOG("Failed to open specified rbd device\n");
		ret = NULL;
	}
@@ -614,67 +685,126 @@ bdev_rbd_handle(void *arg)
}

static int
bdev_rbd_create_cb(void *io_device, void *ctx_buf)
_bdev_rbd_create_cb(struct bdev_rbd *disk)
{
	struct bdev_rbd_io_channel *ch = ctx_buf;
	int ret;
	struct epoll_event event;
	struct epoll_event event = {};

	ch->disk = io_device;
	ch->image = NULL;
	ch->io_ctx = NULL;
	ch->pfd = -1;
	disk->group_ch = spdk_io_channel_get_ctx(spdk_get_io_channel(&rbd_if));
	assert(disk->group_ch != NULL);
	event.events = EPOLLIN;
	event.data.ptr = disk;

	if (spdk_call_unaffinitized(bdev_rbd_handle, ch) == NULL) {
	if (spdk_call_unaffinitized(bdev_rbd_handle, disk) == NULL) {
		goto err;
	}

	ch->pfd = eventfd(0, EFD_NONBLOCK);
	if (ch->pfd < 0) {
	disk->pfd = eventfd(0, EFD_NONBLOCK);
	if (disk->pfd < 0) {
		SPDK_ERRLOG("Failed to get eventfd\n");
		goto err;
	}

	ret = rbd_set_image_notification(ch->image, ch->pfd, EVENT_TYPE_EVENTFD);
	ret = rbd_set_image_notification(disk->image, disk->pfd, EVENT_TYPE_EVENTFD);
	if (ret < 0) {
		SPDK_ERRLOG("Failed to set rbd image notification\n");
		goto err;
	}

	ch->group_ch = spdk_io_channel_get_ctx(spdk_get_io_channel(&rbd_if));
	assert(ch->group_ch != NULL);
	memset(&event, 0, sizeof(event));
	event.events = EPOLLIN;
	event.data.ptr = ch;

	ret = epoll_ctl(ch->group_ch->epoll_fd, EPOLL_CTL_ADD, ch->pfd, &event);
	ret = epoll_ctl(disk->group_ch->epoll_fd, EPOLL_CTL_ADD, disk->pfd, &event);
	if (ret < 0) {
		SPDK_ERRLOG("Failed to add the fd of ch(%p) to the epoll group from group_ch=%p\n", ch,
			    ch->group_ch);
		SPDK_ERRLOG("Failed to add the fd of disk=%p to the epoll group from group_ch=%p\n", disk,
			    disk->group_ch);
		goto err;
	}

	return 0;

err:
	bdev_rbd_free_channel(ch);
	bdev_rbd_free_channel_resources(disk);
	return -1;
}

static void
bdev_rbd_destroy_cb(void *io_device, void *ctx_buf)
static int
bdev_rbd_create_cb(void *io_device, void *ctx_buf)
{
	struct bdev_rbd_io_channel *io_channel = ctx_buf;
	struct bdev_rbd_io_channel *ch = ctx_buf;
	struct bdev_rbd *disk = io_device;
	int rc;

	rc = epoll_ctl(io_channel->group_ch->epoll_fd, EPOLL_CTL_DEL,
		       io_channel->pfd, NULL);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to remove fd on io_channel=%p from the polling group=%p\n",
			    io_channel, io_channel->group_ch);
	ch->disk = disk;
	pthread_mutex_lock(&disk->mutex);
	if (disk->ch_count == 0) {
		assert(disk->main_td == NULL);
		rc = _bdev_rbd_create_cb(disk);
		if (rc) {
			SPDK_ERRLOG("Cannot create channel for disk=%p\n", disk);
			pthread_mutex_unlock(&disk->mutex);
			return rc;
		}

		disk->main_td = spdk_get_thread();
	}

	disk->ch_count++;
	pthread_mutex_unlock(&disk->mutex);

	return 0;
}

	bdev_rbd_free_channel(io_channel);
static void
_bdev_rbd_destroy_cb(void *ctx)
{
	struct bdev_rbd *disk = ctx;
	bool deferred_free;

	pthread_mutex_lock(&disk->mutex);
	assert(disk->ch_count > 0);
	disk->ch_count--;

	if (disk->ch_count > 0) {
		/* A new channel was created between when message was sent and this function executed */
		pthread_mutex_unlock(&disk->mutex);
		return;
	}

	bdev_rbd_free_channel_resources(disk);

	deferred_free = disk->deferred_free;
	pthread_mutex_unlock(&disk->mutex);

	/* Need to free rbd structure if there is deferred_free case
	 * by the bdev_rbd_destruct function */
	if (deferred_free) {
		bdev_rbd_free(disk);
	}
}

static void
bdev_rbd_destroy_cb(void *io_device, void *ctx_buf)
{
	struct bdev_rbd *disk = io_device;
	struct spdk_thread *thread;

	pthread_mutex_lock(&disk->mutex);
	assert(disk->ch_count > 0);
	disk->ch_count--;
	if (disk->ch_count == 0) {
		assert(disk->main_td != NULL);
		if (disk->main_td != spdk_get_thread()) {
			/* The final channel was destroyed on a different thread
			 * than where the first channel was created. Pass a message
			 * to the main thread to unregister the poller. */
			disk->ch_count++;
			thread = disk->main_td;
			pthread_mutex_unlock(&disk->mutex);
			spdk_thread_send_msg(thread, _bdev_rbd_destroy_cb, disk);
			return;
		}

		bdev_rbd_free_channel_resources(disk);
	}
	pthread_mutex_unlock(&disk->mutex);
}

static struct spdk_io_channel *
@@ -1051,6 +1181,14 @@ bdev_rbd_create(struct spdk_bdev **bdev, const char *name, const char *user_id,
		return -ENOMEM;
	}

	ret = pthread_mutex_init(&rbd->mutex, NULL);
	if (ret) {
		SPDK_ERRLOG("Cannot init mutex on rbd=%p\n", rbd->disk.name);
		free(rbd);
		return ret;
	}

	rbd->pfd = -1;
	rbd->rbd_name = strdup(rbd_name);
	if (!rbd->rbd_name) {
		bdev_rbd_free(rbd);
@@ -1161,7 +1299,7 @@ bdev_rbd_resize(struct spdk_bdev *bdev, const uint64_t new_size_in_mb)
	rbd_io_ch = spdk_io_channel_get_ctx(ch);
	new_size_in_byte = new_size_in_mb * 1024 * 1024;

	rc = rbd_resize(rbd_io_ch->image, new_size_in_byte);
	rc = rbd_resize(rbd_io_ch->disk->image, new_size_in_byte);
	spdk_put_io_channel(ch);
	if (rc != 0) {
		SPDK_ERRLOG("failed to resize the ceph bdev.\n");
@@ -1191,7 +1329,7 @@ bdev_rbd_group_poll(void *arg)
	}

	for (i = 0; i < num_events; i++) {
		bdev_rbd_io_poll((struct bdev_rbd_io_channel *)events[i].data.ptr);
		bdev_rbd_io_poll((struct bdev_rbd *)events[i].data.ptr);
	}

	return SPDK_POLLER_BUSY;