Commit 10cb404a authored by Konrad Sztyber's avatar Konrad Sztyber
Browse files

bdev/compress: limit operations performed on reduce_thread



Previously, it was possible to execute spdk_bdev_io_get_buf() on a
different thread than the one that the IO was submitted on, which is
unsafe.  Now, the buffers are always allocated on the correct thread and
we do spdk_thread_send_msg() only for the reduce library functions.

Signed-off-by: default avatarKonrad Sztyber <konrad.sztyber@intel.com>
Change-Id: I50bf9aa3054073636204ec3a147d464f194a8c8f
Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/15667


Reviewed-by: default avatarShuhei Matsumoto <smatsumoto@nvidia.com>
Reviewed-by: default avatarAleksey Marchuk <alexeymar@nvidia.com>
Reviewed-by: default avatarJim Harris <james.r.harris@intel.com>
Tested-by: default avatarSPDK CI Jenkins <sys_sgci@intel.com>
Community-CI: Mellanox Build Bot
parent 467809f8
Loading
Loading
Loading
Loading
+37 −54
Original line number Diff line number Diff line
@@ -748,6 +748,31 @@ _comp_reduce_decompress(struct spdk_reduce_backing_dev *dev,
	}
}

static void
_comp_submit_write(void *ctx)
{
	struct spdk_bdev_io *bdev_io = ctx;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);

	spdk_reduce_vol_writev(comp_bdev->vol, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt,
			       bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
			       reduce_rw_blocks_cb, bdev_io);
}

static void
_comp_submit_read(void *ctx)
{
	struct spdk_bdev_io *bdev_io = ctx;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);

	spdk_reduce_vol_readv(comp_bdev->vol, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt,
			      bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
			      reduce_rw_blocks_cb, bdev_io);
}


/* Callback for getting a buf from the bdev pool in the event that the caller passed
 * in NULL, we need to own the buffer so it doesn't get freed by another vbdev module
 * beneath us before we're done with it.
@@ -764,33 +789,22 @@ comp_read_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, b
		return;
	}

	spdk_reduce_vol_readv(comp_bdev->vol, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt,
			      bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
			      reduce_rw_blocks_cb, bdev_io);
	spdk_thread_exec_msg(comp_bdev->reduce_thread, _comp_submit_read, bdev_io);
}

/* scheduled for completion on IO thread */
static void
_complete_other_io(void *arg)
{
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)arg;
	if (io_ctx->status == 0) {
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_SUCCESS);
	} else {
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}

/* scheduled for submission on reduce thread */
/* Called when someone above submits IO to this vbdev. */
static void
_comp_bdev_io_submit(void *arg)
vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	struct spdk_bdev_io *bdev_io = arg;
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	struct spdk_io_channel *ch = spdk_io_channel_from_ctx(io_ctx->comp_ch);
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);
	struct spdk_thread *orig_thread;
	struct comp_io_channel *comp_ch = spdk_io_channel_get_ctx(ch);

	memset(io_ctx, 0, sizeof(struct comp_bdev_io));
	io_ctx->comp_bdev = comp_bdev;
	io_ctx->comp_ch = comp_ch;
	io_ctx->orig_io = bdev_io;

	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
@@ -798,9 +812,7 @@ _comp_bdev_io_submit(void *arg)
				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
		return;
	case SPDK_BDEV_IO_TYPE_WRITE:
		spdk_reduce_vol_writev(comp_bdev->vol, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt,
				       bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
				       reduce_rw_blocks_cb, bdev_io);
		spdk_thread_exec_msg(comp_bdev->reduce_thread, _comp_submit_write, bdev_io);
		return;
	/* TODO support RESET in future patch in the series */
	case SPDK_BDEV_IO_TYPE_RESET:
@@ -809,37 +821,8 @@ _comp_bdev_io_submit(void *arg)
	case SPDK_BDEV_IO_TYPE_FLUSH:
	default:
		SPDK_ERRLOG("Unknown I/O type %d\n", bdev_io->type);
		io_ctx->status = -ENOTSUP;

		/* Complete this on the orig IO thread. */
		orig_thread = spdk_io_channel_get_thread(ch);
		if (orig_thread != spdk_get_thread()) {
			spdk_thread_send_msg(orig_thread, _complete_other_io, io_ctx);
		} else {
			_complete_other_io(io_ctx);
		}
	}
}

/* Called when someone above submits IO to this vbdev. */
static void
vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);
	struct comp_io_channel *comp_ch = spdk_io_channel_get_ctx(ch);

	memset(io_ctx, 0, sizeof(struct comp_bdev_io));
	io_ctx->comp_bdev = comp_bdev;
	io_ctx->comp_ch = comp_ch;
	io_ctx->orig_io = bdev_io;

	/* Send this request to the reduce_thread if that's not what we're on. */
	if (spdk_get_thread() != comp_bdev->reduce_thread) {
		spdk_thread_send_msg(comp_bdev->reduce_thread, _comp_bdev_io_submit, bdev_io);
	} else {
		_comp_bdev_io_submit(bdev_io);
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_FAILED);
		break;
	}
}