Commit b1617bf8 authored by Ben Walker's avatar Ben Walker
Browse files

bdev/rbd: Poller can now handle channel deletion inline



An I/O completion callback may delete the channel, so
restructure the poller to avoid touching channel memory
while triggering completions.

Change-Id: I612f10ff172481084386c9a3056fdd5e2f19e854
Signed-off-by: default avatarBen Walker <benjamin.walker@intel.com>
Reviewed-on: https://review.gerrithub.io/370526


Tested-by: default avatarSPDK Automated Test System <sys_sgsw@intel.com>
Reviewed-by: default avatarDaniel Verkamp <daniel.verkamp@intel.com>
parent 12b53038
Loading
Loading
Loading
Loading
+27 −42
Original line number Diff line number Diff line
@@ -48,13 +48,11 @@

#include "spdk_internal/bdev.h"

#define SPDK_RBD_QUEUE_DEPTH 128

static TAILQ_HEAD(, bdev_rbd) g_rbds = TAILQ_HEAD_INITIALIZER(g_rbds);
static int bdev_rbd_count = 0;

struct bdev_rbd_io {
	rbd_completion_t completion;
};

struct bdev_rbd {
	struct spdk_bdev disk;
	char *rbd_name;
@@ -68,8 +66,6 @@ struct bdev_rbd_io_channel {
	rados_t cluster;
	struct pollfd pfd;
	rbd_image_t image;
	rbd_completion_t *comps;
	uint32_t queue_depth;
	struct bdev_rbd *disk;
	struct spdk_bdev_poller *poller;
};
@@ -171,30 +167,30 @@ bdev_rbd_finish_aiocb(rbd_completion_t cb, void *arg)
}

static int
bdev_rbd_start_aio(rbd_image_t image, struct bdev_rbd_io *cmd,
bdev_rbd_start_aio(rbd_image_t image, struct spdk_bdev_io *bdev_io,
		   void *buf, uint64_t offset, size_t len)
{
	int ret;
	struct spdk_bdev_io *bdev_io = spdk_bdev_io_from_ctx(cmd);
	rbd_completion_t comp;

	ret = rbd_aio_create_completion((void *)cmd, bdev_rbd_finish_aiocb,
					&cmd->completion);
	ret = rbd_aio_create_completion(bdev_io, bdev_rbd_finish_aiocb,
					&comp);
	if (ret < 0) {
		return -1;
	}

	if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
		ret = rbd_aio_read(image, offset, len,
				   buf, cmd->completion);
				   buf, comp);
	} else if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) {
		ret = rbd_aio_write(image, offset, len,
				    buf, cmd->completion);
				    buf, comp);
	} else if (bdev_io->type == SPDK_BDEV_IO_TYPE_FLUSH) {
		ret = rbd_aio_flush(image, cmd->completion);
		ret = rbd_aio_flush(image, comp);
	}

	if (ret < 0) {
		rbd_aio_release(cmd->completion);
		rbd_aio_release(comp);
		return -1;
	}

@@ -207,7 +203,7 @@ static void bdev_rbd_library_fini(void);
static int
bdev_rbd_get_ctx_size(void)
{
	return sizeof(struct bdev_rbd_io);
	return 0;
}

SPDK_BDEV_MODULE_REGISTER(rbd, bdev_rbd_library_init, bdev_rbd_library_fini, NULL,
@@ -215,7 +211,7 @@ SPDK_BDEV_MODULE_REGISTER(rbd, bdev_rbd_library_init, bdev_rbd_library_fini, NUL

static int64_t
bdev_rbd_readv(struct bdev_rbd *disk, struct spdk_io_channel *ch,
	       struct bdev_rbd_io *cmd, struct iovec *iov,
	       struct spdk_bdev_io *bdev_io, struct iovec *iov,
	       int iovcnt, size_t len, uint64_t offset)
{
	struct bdev_rbd_io_channel *rbdio_ch = spdk_io_channel_get_ctx(ch);
@@ -223,12 +219,12 @@ bdev_rbd_readv(struct bdev_rbd *disk, struct spdk_io_channel *ch,
	if (iovcnt != 1 || iov->iov_len != len)
		return -1;

	return bdev_rbd_start_aio(rbdio_ch->image, cmd, iov->iov_base, offset, len);
	return bdev_rbd_start_aio(rbdio_ch->image, bdev_io, iov->iov_base, offset, len);
}

static int64_t
bdev_rbd_writev(struct bdev_rbd *disk, struct spdk_io_channel *ch,
		struct bdev_rbd_io *cmd, struct iovec *iov,
		struct spdk_bdev_io *bdev_io, struct iovec *iov,
		int iovcnt, size_t len, uint64_t offset)
{
	struct bdev_rbd_io_channel *rbdio_ch = spdk_io_channel_get_ctx(ch);
@@ -236,16 +232,16 @@ bdev_rbd_writev(struct bdev_rbd *disk, struct spdk_io_channel *ch,
	if ((iovcnt != 1) || (iov->iov_len != len))
		return -1;

	return bdev_rbd_start_aio(rbdio_ch->image, cmd, (void *)iov->iov_base, offset, len);
	return bdev_rbd_start_aio(rbdio_ch->image, bdev_io, (void *)iov->iov_base, offset, len);
}

static int64_t
bdev_rbd_flush(struct bdev_rbd *disk, struct spdk_io_channel *ch,
	       struct bdev_rbd_io *cmd, uint64_t offset, uint64_t nbytes)
	       struct spdk_bdev_io *bdev_io, uint64_t offset, uint64_t nbytes)
{
	struct bdev_rbd_io_channel *rbdio_ch = spdk_io_channel_get_ctx(ch);

	return bdev_rbd_start_aio(rbdio_ch->image, cmd, NULL, offset, nbytes);
	return bdev_rbd_start_aio(rbdio_ch->image, bdev_io, NULL, offset, nbytes);
}

static int
@@ -260,7 +256,7 @@ static void bdev_rbd_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io

	ret = bdev_rbd_readv(bdev_io->bdev->ctxt,
			     ch,
			     (struct bdev_rbd_io *)bdev_io->driver_ctx,
			     bdev_io,
			     bdev_io->u.read.iovs,
			     bdev_io->u.read.iovcnt,
			     bdev_io->u.read.len,
@@ -281,7 +277,7 @@ static int _bdev_rbd_submit_request(struct spdk_io_channel *ch, struct spdk_bdev
	case SPDK_BDEV_IO_TYPE_WRITE:
		return bdev_rbd_writev((struct bdev_rbd *)bdev_io->bdev->ctxt,
				       ch,
				       (struct bdev_rbd_io *)bdev_io->driver_ctx,
				       bdev_io,
				       bdev_io->u.write.iovs,
				       bdev_io->u.write.iovcnt,
				       bdev_io->u.write.len,
@@ -289,7 +285,7 @@ static int _bdev_rbd_submit_request(struct spdk_io_channel *ch, struct spdk_bdev
	case SPDK_BDEV_IO_TYPE_FLUSH:
		return bdev_rbd_flush((struct bdev_rbd *)bdev_io->bdev->ctxt,
				      ch,
				      (struct bdev_rbd_io *)bdev_io->driver_ctx,
				      bdev_io,
				      bdev_io->u.flush.offset,
				      bdev_io->u.flush.length);
	default:
@@ -323,9 +319,10 @@ static void
bdev_rbd_io_poll(void *arg)
{
	struct bdev_rbd_io_channel *ch = arg;
	struct bdev_rbd_io *req;
	int i, io_status, rc;
	rbd_completion_t comps[SPDK_RBD_QUEUE_DEPTH];
	struct spdk_bdev_io *bdev_io;
	int i, io_status, status, rc;
	enum spdk_bdev_io_status status;

	rc = poll(&ch->pfd, 1, 0);

@@ -334,11 +331,10 @@ bdev_rbd_io_poll(void *arg)
		return;
	}

	rc = rbd_poll_io_events(ch->image, ch->comps, ch->queue_depth);
	rc = rbd_poll_io_events(ch->image, comps, SPDK_RBD_QUEUE_DEPTH);
	for (i = 0; i < rc; i++) {
		req = (struct bdev_rbd_io *)rbd_aio_get_arg(ch->comps[i]);
		bdev_io = spdk_bdev_io_from_ctx(req);
		io_status = rbd_aio_get_return_value(ch->comps[i]);
		bdev_io = rbd_aio_get_arg(comps[i]);
		io_status = rbd_aio_get_return_value(comps[i]);
		if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
			if ((int)bdev_io->u.read.len == io_status) {
				status = SPDK_BDEV_IO_STATUS_SUCCESS;
@@ -353,8 +349,8 @@ bdev_rbd_io_poll(void *arg)
				status = SPDK_BDEV_IO_STATUS_FAILED;
			}
		}
		rbd_aio_release(comps[i]);
		spdk_bdev_io_complete(bdev_io, status);
		rbd_aio_release(req->completion);
	}
}

@@ -377,10 +373,6 @@ bdev_rbd_free_channel(struct bdev_rbd_io_channel *ch)
		rados_shutdown(ch->cluster);
	}

	if (ch->comps) {
		free(ch->comps);
	}

	if (ch->pfd.fd >= 0) {
		close(ch->pfd.fd);
	}
@@ -435,13 +427,6 @@ bdev_rbd_create_cb(void *io_device, void *ctx_buf)
		goto err;
	}

	ch->queue_depth = 128;
	ch->comps = calloc(sizeof(rbd_completion_t), ch->queue_depth);
	if (!ch->comps) {
		SPDK_ERRLOG("Failed to allocate rbd completion array\n");
		goto err;
	}

	spdk_bdev_poller_start(&ch->poller, bdev_rbd_io_poll, ch,
			       spdk_env_get_current_core(), 0);