Commit 3d6305b9 authored by paul luse's avatar paul luse Committed by Jim Harris
Browse files

bdev/compress: fix issues with multi-thread



Submission logic was incorrect and completion logic was not
present yet.  This passes now with multi-thread bdevio test.

Signed-off-by: default avatarpaul luse <paul.e.luse@intel.com>
Change-Id: Ia8ed1c123be511240d93503a2c5e501ccad445bf
Reviewed-on: https://review.gerrithub.io/c/spdk/spdk/+/455018


Tested-by: default avatarSPDK CI Jenkins <sys_sgci@intel.com>
Reviewed-by: default avatarBen Walker <benjamin.walker@intel.com>
Reviewed-by: default avatarShuhei Matsumoto <shuhei.matsumoto.xt@hitachi.com>
parent 4397e387
Loading
Loading
Loading
Loading
+73 −30
Original line number Diff line number Diff line
@@ -145,6 +145,7 @@ struct comp_bdev_io {
	struct spdk_bdev_io_wait_entry	bdev_io_wait;		/* for bdev_io_wait */
	struct spdk_bdev_io		*orig_io;		/* the original IO */
	struct spdk_io_channel		*ch;			/* for resubmission */
	int				status;			/* save for completion on orig thread */
};

/* Shared mempools between all devices on this system */
@@ -375,19 +376,38 @@ error_create_mbuf:
	return rc;
}

/* for completing rw requests on the orig IO thread. */
static void
_spdk_reduce_rw_blocks_cb(void *arg)
{
	struct comp_bdev_io *io_ctx = arg;

	if (io_ctx->status == 0) {
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_SUCCESS);
	} else {
		SPDK_ERRLOG("status %d on operation from reduce API\n", io_ctx->status);
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}

/* Completion callback for r/w that were issued via reducelib. */
static void
spdk_reduce_rw_blocks_cb(void *arg, int reduce_errno)
{
	struct spdk_bdev_io *bdev_io = arg;
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	struct spdk_io_channel *ch = spdk_io_channel_from_ctx(io_ctx->comp_ch);

	/* TODO: need to decide which error codes are bdev_io success vs failure;
	 * example examine calls reading metadata */
	if (reduce_errno == 0) {
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS);

	io_ctx->status = reduce_errno;

	/* Send this request to the orig IO thread. */
	if (spdk_io_channel_get_thread(ch) != spdk_get_thread()) {
		spdk_thread_send_msg(spdk_io_channel_get_thread(ch), _spdk_reduce_rw_blocks_cb, io_ctx);
	} else {
		SPDK_ERRLOG("ERROR %d on operation from reduce API\n", reduce_errno);
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
		_spdk_reduce_rw_blocks_cb(io_ctx);
	}
}

@@ -630,55 +650,48 @@ comp_read_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, b
			      spdk_reduce_rw_blocks_cb, bdev_io);
}

/* scheduled for completion on IO thread */
static void
_spdk_bdev_io_submit(void *arg)
_complete_other_io(void *arg)
{
	struct comp_bdev_io *io_ctx = arg;

	vbdev_compress_submit_request(spdk_io_channel_from_ctx(io_ctx->comp_ch), io_ctx->orig_io);
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)arg;
	if (io_ctx->status == 0) {
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_SUCCESS);
	} else {
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}

/* Called when someone above submits IO to this vbdev. */
/* scheduled for submission on reduce thread */
static void
vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
_spdk_bdev_io_submit(void *arg)
{
	struct spdk_bdev_io *bdev_io = arg;
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	struct spdk_io_channel *ch = spdk_io_channel_from_ctx(io_ctx->comp_ch);
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);
	struct comp_io_channel *comp_ch = spdk_io_channel_get_ctx(ch);
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	int rc = 0;

	memset(io_ctx, 0, sizeof(struct comp_bdev_io));
	io_ctx->comp_bdev = comp_bdev;
	io_ctx->comp_ch = comp_ch;
	io_ctx->orig_io = bdev_io;

	/* Send this request to the reduce_thread. */
	if (spdk_io_channel_get_thread(spdk_bdev_io_get_io_channel(bdev_io))
	    != comp_bdev->reduce_thread) {
		spdk_thread_send_msg(comp_bdev->reduce_thread, _spdk_bdev_io_submit, io_ctx);
		return;
	}

	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
		spdk_bdev_io_get_buf(bdev_io, comp_read_get_buf_cb,
				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
		break;
		return;
	case SPDK_BDEV_IO_TYPE_WRITE:
		spdk_reduce_vol_writev(comp_bdev->vol, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt,
				       bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
				       spdk_reduce_rw_blocks_cb, bdev_io);
		break;
		return;
	/* TODO in future patch in the series */
	case SPDK_BDEV_IO_TYPE_RESET:
		break;
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
	case SPDK_BDEV_IO_TYPE_UNMAP:
	case SPDK_BDEV_IO_TYPE_FLUSH:
	case SPDK_BDEV_IO_TYPE_RESET:
	default:
		SPDK_ERRLOG("Unknown I/O type %d\n", bdev_io->type);
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
		return;
		rc = -EINVAL;
	}

	if (rc) {
@@ -686,10 +699,40 @@ vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *b
			SPDK_ERRLOG("No memory, start to queue io for compress.\n");
			io_ctx->ch = ch;
			vbdev_compress_queue_io(bdev_io);
			return;
		} else {
			SPDK_ERRLOG("on bdev_io submission!\n");
			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
			io_ctx->status = rc;
		}
	}

	/* Complete this on the orig IO thread. */
	if (spdk_io_channel_get_thread(ch) != spdk_get_thread()) {
		spdk_thread_send_msg(spdk_io_channel_get_thread(ch), _complete_other_io, io_ctx);
	} else {
		_complete_other_io(io_ctx);
	}
}

/* Called when someone above submits IO to this vbdev. */
static void
vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);
	struct comp_io_channel *comp_ch = spdk_io_channel_get_ctx(ch);

	memset(io_ctx, 0, sizeof(struct comp_bdev_io));
	io_ctx->comp_bdev = comp_bdev;
	io_ctx->comp_ch = comp_ch;
	io_ctx->orig_io = bdev_io;

	/* Send this request to the reduce_thread if that's not what we're on. */
	if (spdk_io_channel_get_thread(ch) != comp_bdev->reduce_thread) {
		spdk_thread_send_msg(comp_bdev->reduce_thread, _spdk_bdev_io_submit, bdev_io);
	} else {
		_spdk_bdev_io_submit(bdev_io);
	}
}