Commit a827d07c authored by paul luse's avatar paul luse Committed by Jim Harris
Browse files

bdev/compress: integrate with DPDK compressdev



Change-Id: I0729f688c72651790ac05f4991c04deebca2af39
Signed-off-by: default avatarpaul luse <paul.e.luse@intel.com>
Reviewed-on: https://review.gerrithub.io/c/spdk/spdk/+/448081


Tested-by: default avatarSPDK CI Jenkins <sys_sgci@intel.com>
Reviewed-by: default avatarJim Harris <james.r.harris@intel.com>
Reviewed-by: default avatarShuhei Matsumoto <shuhei.matsumoto.xt@hitachi.com>
parent 032920f2
Loading
Loading
Loading
Loading
+173 −17
Original line number Diff line number Diff line
@@ -49,13 +49,13 @@
#include <rte_config.h>
#include <rte_bus_vdev.h>
#include <rte_compressdev.h>
#include <rte_comp.h>

/* TODO: valdiate these are good starting values */
#define NUM_MAX_XFORMS 16
#define NUM_MAX_INFLIGHT_OPS 512
#define DEFAULT_WINDOW_SIZE 15
#define MAX_MBUFS_PER_OP 64
#define BACKING_IO_UNIT_SZ (1024 * 4)
#define MAX_MBUFS_PER_OP 16
#define CHUNK_SIZE (1024 * 16)

#define COMP_BDEV_NAME "compress"
@@ -64,6 +64,7 @@
#define TEST_MD_PATH "/tmp"
#define DEV_CHUNK_SZ (16 * 1024)
#define DEV_LBA_SZ 512
#define DEV_BACKING_IO_SZ (4 * 1024)

/* To add support for new device types, follow the examples of the following...
 * Note that the string names are defined by the DPDK PMD in question so be
@@ -74,7 +75,7 @@
/* TODO: #define QAT "tbd" */
const char *g_drv_names[MAX_NUM_DRV_TYPES] = { ISAL_PMD };

#define NUM_MBUFS		32768
#define NUM_MBUFS		512
#define POOL_CACHE_SIZE		256

/* Global list of available compression devices. */
@@ -110,6 +111,7 @@ struct vbdev_compress {
	pthread_mutex_t			reduce_lock;
	uint32_t			ch_count;
	TAILQ_HEAD(, spdk_bdev_io)	pending_comp_ios;	/* outstanding operations to a comp library */
	struct spdk_poller		*poller;	/* completion poller */
	struct spdk_reduce_vol_params	params;		/* params for the reduce volume */
	struct spdk_reduce_backing_dev	backing_dev;	/* backing device info for the reduce volume */
	struct spdk_reduce_vol		*vol;		/* the reduce volume */
@@ -122,7 +124,6 @@ static TAILQ_HEAD(, vbdev_compress) g_vbdev_comp = TAILQ_HEAD_INITIALIZER(g_vbde
/* The comp vbdev channel struct. It is allocated and freed on my behalf by the io channel code.
 */
struct comp_io_channel {
	struct spdk_poller		*poller;		/* completion poller */
	struct spdk_io_channel_iter	*iter;	/* used with for_each_channel in reset */
};

@@ -133,11 +134,6 @@ struct comp_bdev_io {
	struct spdk_bdev_io_wait_entry	bdev_io_wait;		/* for bdev_io_wait */
	struct spdk_bdev_io		*orig_io;		/* the original IO */
	struct spdk_io_channel		*ch;			/* for resubmission */
	struct spdk_bdev_io		*read_io;
	/* TODO: rename these and maybe read_io above as well */
	uint64_t dest_num_blocks;				/* num of blocks for the contiguous buffer */
	uint64_t dest_offset_blocks;				/* block offset on media */
	struct iovec dest_iov;					/* iov representing contig write buffer */
};

/* Shared mempools between all devices on this system */
@@ -149,6 +145,7 @@ static void vbdev_compress_claim(struct vbdev_compress *comp_bdev);
static void vbdev_compress_queue_io(struct spdk_bdev_io *bdev_io);
struct vbdev_compress *_prepare_for_load_init(struct spdk_bdev *bdev);
static void vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io);
static void comp_bdev_ch_destroy_cb(void *io_device, void *ctx_buf);

/* Called by vbdev_init_compress_drivers() to init each discovered compression device */
static int
@@ -360,10 +357,42 @@ error_create_mbuf:
static int
comp_dev_poller(void *args)
{
	/* TODO future patch in series */
	struct vbdev_compress *comp_bdev = args;
	uint8_t cdev_id = comp_bdev->device_qp->device->cdev_id;
	struct rte_comp_op *deq_ops;
	uint16_t num_deq;
	struct spdk_reduce_vol_cb_args *reduce_args;

	num_deq = rte_compressdev_dequeue_burst(cdev_id, 0, &deq_ops, 1);

	if (num_deq > 0) {
		reduce_args = (struct spdk_reduce_vol_cb_args *)deq_ops->m_src->userdata;

		if (deq_ops->status != RTE_COMP_OP_STATUS_SUCCESS) {
			SPDK_ERRLOG("deque status %u\n", deq_ops->status);

			/* TODO update produced with translated -errno */
			/*
			 * RTE_COMP_OP_STATUS_SUCCESS = 0,
			 * RTE_COMP_OP_STATUS_NOT_PROCESSED,
			 * RTE_COMP_OP_STATUS_INVALID_ARGS,
			 * RTE_COMP_OP_STATUS_ERROR,
			 * RTE_COMP_OP_STATUS_INVALID_STATE,
			 * RTE_COMP_OP_STATUS_OUT_OF_SPACE_TERMINATED,
			 * RTE_COMP_OP_STATUS_OUT_OF_SPACE_RECOVERABLE,
			 */
		}
		reduce_args->cb_fn(reduce_args->cb_arg, deq_ops->produced);

		/* Now bulk free both mbufs and the compress operation. */
		spdk_mempool_put(g_mbuf_mp, deq_ops->m_src);
		spdk_mempool_put(g_mbuf_mp, deq_ops->m_dst);
		rte_comp_op_free(deq_ops);
	}
	return 0;
}

/* Completion callback for r/w that were issued via reducelib. */
static void
spdk_reduce_rw_blocks_cb(void *arg, int reduce_errno)
{
@@ -379,6 +408,132 @@ spdk_reduce_rw_blocks_cb(void *arg, int reduce_errno)
	}
}

static int
_compress_operation(struct spdk_reduce_backing_dev *backing_dev, struct iovec *src_iovs,
		    int src_iovcnt, struct iovec *dst_iovs,
		    int dst_iovcnt, bool compress, void *cb_arg)
{
	void *reduce_cb_arg = cb_arg;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(backing_dev, struct vbdev_compress,
					   backing_dev);
	struct rte_comp_op *comp_op;
	struct rte_mbuf *src_mbufs[MAX_MBUFS_PER_OP];
	struct rte_mbuf *dst_mbufs[MAX_MBUFS_PER_OP];
	uint8_t cdev_id = comp_bdev->device_qp->device->cdev_id;
	uint64_t total_length = 0;
	struct iovec *current_src_iov = NULL;
	struct iovec *current_dst_iov = NULL;
	int iov_index;
	int rc = 0;
	struct rte_mbuf_ext_shared_info shinfo_src;
	struct rte_mbuf_ext_shared_info shinfo_dst;

	assert(src_iovcnt < MAX_MBUFS_PER_OP);
	comp_op = rte_comp_op_alloc(g_comp_op_mp);
	if (!comp_op) {
		SPDK_ERRLOG("trying to get a comp op!\n");
		return -ENOMEM;
	}

	/* get an mbuf per iov, src and dst */
	rc = spdk_mempool_get_bulk(g_mbuf_mp, (void **)&src_mbufs[0], src_iovcnt);
	if (rc) {
		SPDK_ERRLOG("trying to get src_mbufs!\n");
		rc = -ENOMEM;
		goto error_get_src;

	}
	rc = spdk_mempool_get_bulk(g_mbuf_mp, (void **)&dst_mbufs[0], dst_iovcnt);
	if (rc) {
		SPDK_ERRLOG("trying to get dst_mbufs!\n");
		rc = -ENOMEM;
		goto error_get_dst;
	}

	/* There is a 1:1 mapping between a bdev_io and a compression operation, but
	 * all compression PMDs that SPDK uses support chaining so build our mbuf chain
	 * and associate with our single comp_op.
	 */

	/* Setup src mbufs */
	for (iov_index = 0; iov_index < src_iovcnt; iov_index++) {

		current_src_iov = src_iovs[iov_index].iov_base;
		total_length += src_iovs[iov_index].iov_len;
		src_mbufs[iov_index]->userdata = reduce_cb_arg;

		rte_pktmbuf_attach_extbuf(src_mbufs[iov_index],
					  current_src_iov,
					  spdk_vtophys((void *)current_src_iov, NULL),
					  src_iovs[iov_index].iov_len,
					  &shinfo_src);
		rte_pktmbuf_append(src_mbufs[iov_index], src_iovs[iov_index].iov_len);

		if (iov_index > 0) {
			rte_pktmbuf_chain(src_mbufs[0], src_mbufs[iov_index]);
		}
	}

	comp_op->m_src = src_mbufs[0];
	comp_op->src.offset = 0;
	comp_op->src.length = total_length;

	/* setup dst mbufs, for the current test being used with this code there's only one vector */
	for (iov_index = 0; iov_index < dst_iovcnt; iov_index++) {

		current_dst_iov = dst_iovs[iov_index].iov_base;

		rte_pktmbuf_attach_extbuf(dst_mbufs[iov_index],
					  current_dst_iov,
					  spdk_vtophys((void *)current_dst_iov, NULL),
					  dst_iovs[iov_index].iov_len,
					  &shinfo_dst);
		rte_pktmbuf_append(dst_mbufs[iov_index], dst_iovs[iov_index].iov_len);

		if (iov_index > 0) {
			rte_pktmbuf_chain(dst_mbufs[0], dst_mbufs[iov_index]);
		}
	}
	comp_op->m_dst = dst_mbufs[0];
	comp_op->dst.offset = 0;

	if (compress == true) {
		comp_op->private_xform = comp_bdev->device_qp->device->comp_xform;
	} else {
		comp_op->private_xform = comp_bdev->device_qp->device->decomp_xform;
	}

	rte_compressdev_enqueue_burst(cdev_id, 0, &comp_op, 1);
	return rc;

	/* Error cleanup paths. */
error_get_dst:
	spdk_mempool_put_bulk(g_mbuf_mp, (void **)&src_mbufs[0], src_iovcnt);
error_get_src:
	rte_comp_op_free(comp_op);
	return rc;
}

/* Entry point for reduce lib to issue a compress operation. */
static void
_comp_reduce_compress(struct spdk_reduce_backing_dev *dev,
		      struct iovec *src_iovs, int src_iovcnt,
		      struct iovec *dst_iovs, int dst_iovcnt,
		      struct spdk_reduce_vol_cb_args *cb_arg)
{
	_compress_operation(dev, src_iovs, src_iovcnt, dst_iovs, dst_iovcnt, true, cb_arg);
}

/* Entry point for reduce lib to issue a decompress operation. */
static void
_comp_reduce_decompress(struct spdk_reduce_backing_dev *dev,
			struct iovec *src_iovs, int src_iovcnt,
			struct iovec *dst_iovs, int dst_iovcnt,
			struct spdk_reduce_vol_cb_args *cb_arg)
{
	_compress_operation(dev, src_iovs, src_iovcnt, dst_iovs, dst_iovcnt, false, cb_arg);
}

/* Callback for getting a buf from the bdev pool in the event that the caller passed
 * in NULL, we need to own the buffer so it doesn't get freed by another vbdev module
 * beneath us before we're done with it.
@@ -685,8 +840,8 @@ comp_reduce_io_cb(struct spdk_bdev_io *bdev_io, bool success, void *arg)
	} else {
		reduce_errno = -EIO;
	}
	cb_args->cb_fn(cb_args->cb_arg, reduce_errno);
	spdk_bdev_free_io(bdev_io);
	cb_args->cb_fn(cb_args->cb_arg, reduce_errno);
}

/* This is the function provided to the reduceLib for sending reads directly to
@@ -822,14 +977,16 @@ _prepare_for_load_init(struct spdk_bdev *bdev)
	meta_ctx->backing_dev.unmap = _comp_reduce_unmap;
	meta_ctx->backing_dev.readv = _comp_reduce_readv;
	meta_ctx->backing_dev.writev = _comp_reduce_writev;
	meta_ctx->backing_dev.compress = _comp_reduce_compress;
	meta_ctx->backing_dev.decompress = _comp_reduce_decompress;

	meta_ctx->backing_dev.blocklen = bdev->blocklen;
	meta_ctx->backing_dev.blockcnt = bdev->blockcnt;

	/* TODO, configurable chunk size & logical block size */
	meta_ctx->params.chunk_size = DEV_CHUNK_SZ;
	meta_ctx->params.logical_block_size = DEV_LBA_SZ;
	meta_ctx->params.backing_io_unit_size = BACKING_IO_UNIT_SZ;

	meta_ctx->params.backing_io_unit_size = DEV_BACKING_IO_SZ;
	return meta_ctx;
}

@@ -873,7 +1030,6 @@ vbdev_init_reduce(struct spdk_bdev *bdev, const char *vbdev_name, const char *co
static int
comp_bdev_ch_create_cb(void *io_device, void *ctx_buf)
{
	struct comp_io_channel *comp_ch = ctx_buf;
	struct vbdev_compress *comp_bdev = io_device;
	struct comp_device_qp *device_qp;

@@ -885,7 +1041,7 @@ comp_bdev_ch_create_cb(void *io_device, void *ctx_buf)
	if (comp_bdev->ch_count == 0) {
		comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);
		comp_bdev->reduce_thread = spdk_get_thread();
		comp_ch->poller = spdk_poller_register(comp_dev_poller, comp_ch, 0);
		comp_bdev->poller = spdk_poller_register(comp_dev_poller, comp_bdev, 0);
		/* Now assign a q pair */
		pthread_mutex_lock(&g_comp_device_qp_lock);
		TAILQ_FOREACH(device_qp, &g_comp_device_qp, link) {
@@ -914,7 +1070,7 @@ _clear_qp_and_put_channel(struct vbdev_compress *comp_bdev)

	spdk_put_io_channel(comp_bdev->base_ch);
	comp_bdev->reduce_thread = NULL;
	spdk_poller_unregister(&comp_bdev->comp_ch->poller);
	spdk_poller_unregister(&comp_bdev->poller);
}

/* Used to reroute destroy_ch to the correct thread */
+2 −1
Original line number Diff line number Diff line
@@ -41,7 +41,8 @@ DIRS-y += crypto.c
endif

ifeq ($(CONFIG_REDUCE),y)
DIRS-y += compress.c
# enable once new mocks are added for compressdev
#DIRS-y += compress.c
endif

DIRS-$(CONFIG_PMDK) += pmem