Commit 533470b2 authored by Konrad Sztyber's avatar Konrad Sztyber
Browse files

bdev/rbd: limit operations performed on main thread



Previously, it was possible to execute spdk_bdev_io_get_buf() on a
different thread than the one that the IO was submitted on, which is
unsafe.  Now, the buffers are always allocated on the correct thread and
we do spdk_thread_send_msg() only for the rbd library functions.

Signed-off-by: default avatarKonrad Sztyber <konrad.sztyber@intel.com>
Change-Id: I07be5a4d0bd1463e0b7cf1aaee146edc575df387
Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/15671


Tested-by: default avatarSPDK CI Jenkins <sys_sgci@intel.com>
Community-CI: Mellanox Build Bot
Reviewed-by: default avatarShuhei Matsumoto <smatsumoto@nvidia.com>
Reviewed-by: default avatarAleksey Marchuk <alexeymar@nvidia.com>
Reviewed-by: default avatarJim Harris <james.r.harris@intel.com>
parent 10cb404a
Loading
Loading
Loading
Loading
+27 −34
Original line number Diff line number Diff line
@@ -396,7 +396,7 @@ bdev_rbd_finish_aiocb(rbd_completion_t cb, void *arg)
}

static void
bdev_rbd_start_aio(struct bdev_rbd *disk, struct spdk_bdev_io *bdev_io,
_bdev_rbd_start_aio(struct bdev_rbd *disk, struct spdk_bdev_io *bdev_io,
		    struct iovec *iov, int iovcnt, uint64_t offset, size_t len)
{
	int ret;
@@ -441,6 +441,20 @@ err:
	bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
}

static void
bdev_rbd_start_aio(void *ctx)
{
	struct spdk_bdev_io *bdev_io = ctx;
	struct bdev_rbd *disk = (struct bdev_rbd *)bdev_io->bdev->ctxt;

	_bdev_rbd_start_aio(disk,
			    bdev_io,
			    bdev_io->u.bdev.iovs,
			    bdev_io->u.bdev.iovcnt,
			    bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen,
			    bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
}

static int bdev_rbd_library_init(void);
static void bdev_rbd_library_fini(void);

@@ -496,8 +510,11 @@ bdev_rbd_reset_timer(void *arg)
}

static void
bdev_rbd_reset(struct bdev_rbd *disk, struct spdk_bdev_io *bdev_io)
bdev_rbd_reset(void *ctx)
{
	struct spdk_bdev_io *bdev_io = ctx;
	struct bdev_rbd *disk = (struct bdev_rbd *)bdev_io->bdev->ctxt;

	/*
	 * HACK: Since librbd doesn't provide any way to cancel outstanding aio, just kick off a
	 * poller to wait for in-flight I/O to complete.
@@ -580,20 +597,17 @@ bdev_rbd_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
		return;
	}

	bdev_rbd_start_aio(disk,
			   bdev_io,
			   bdev_io->u.bdev.iovs,
			   bdev_io->u.bdev.iovcnt,
			   bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen,
			   bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
	spdk_thread_exec_msg(disk->main_td, bdev_rbd_start_aio, bdev_io);
}

static void
_bdev_rbd_submit_request(void *ctx)
bdev_rbd_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	struct spdk_bdev_io *bdev_io = ctx;
	struct spdk_thread *submit_td = spdk_io_channel_get_thread(ch);
	struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
	struct bdev_rbd *disk = (struct bdev_rbd *)bdev_io->bdev->ctxt;

	rbd_io->submit_td = submit_td;
	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
		spdk_bdev_io_get_buf(bdev_io, bdev_rbd_get_buf_cb,
@@ -604,17 +618,11 @@ _bdev_rbd_submit_request(void *ctx)
	case SPDK_BDEV_IO_TYPE_UNMAP:
	case SPDK_BDEV_IO_TYPE_FLUSH:
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
		bdev_rbd_start_aio(disk,
				   bdev_io,
				   bdev_io->u.bdev.iovs,
				   bdev_io->u.bdev.iovcnt,
				   bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen,
				   bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
		spdk_thread_exec_msg(disk->main_td, bdev_rbd_start_aio, bdev_io);
		break;

	case SPDK_BDEV_IO_TYPE_RESET:
		bdev_rbd_reset((struct bdev_rbd *)bdev_io->bdev->ctxt,
			       bdev_io);
		spdk_thread_exec_msg(disk->main_td, bdev_rbd_reset, bdev_io);
		break;

	default:
@@ -624,21 +632,6 @@ _bdev_rbd_submit_request(void *ctx)
	}
}

static void
bdev_rbd_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	struct spdk_thread *submit_td = spdk_io_channel_get_thread(ch);
	struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
	struct bdev_rbd *disk = (struct bdev_rbd *)bdev_io->bdev->ctxt;

	rbd_io->submit_td = submit_td;
	if (disk->main_td != submit_td) {
		spdk_thread_send_msg(disk->main_td, _bdev_rbd_submit_request, bdev_io);
	} else {
		_bdev_rbd_submit_request(bdev_io);
	}
}

static bool
bdev_rbd_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
{