Commit 1717b5d5 authored by Paul Luse's avatar Paul Luse Committed by Jim Harris
Browse files

bdev/compress: add reduce integration



Basic integration, init and load sequences

Change-Id: I12595a251f38168aad15e95275651cb4ce40ea7d
Signed-off-by: default avatarpaul luse <paul.e.luse@intel.com>
Reviewed-on: https://review.gerrithub.io/c/spdk/spdk/+/435764


Tested-by: default avatarSPDK CI Jenkins <sys_sgci@intel.com>
Reviewed-by: default avatarShuhei Matsumoto <shuhei.matsumoto.xt@hitachi.com>
Reviewed-by: default avatarJim Harris <james.r.harris@intel.com>
parent 2f3147c0
Loading
Loading
Loading
Loading
+266 −20
Original line number Diff line number Diff line
@@ -33,6 +33,7 @@

#include "vbdev_compress.h"

#include "spdk/reduce.h"
#include "spdk/stdinc.h"
#include "spdk/rpc.h"
#include "spdk/env.h"
@@ -54,9 +55,14 @@
#define NUM_MAX_INFLIGHT_OPS 512
#define DEFAULT_WINDOW_SIZE 15
#define MAX_MBUFS_PER_OP 64
#define BACKING_IO_UNIT_SZ (1024 * 4)
#define CHUNK_SIZE (1024 * 16)

#define COMP_BDEV_NAME "compress"

/* TODO: need to get this from RPC on create or reduce metadata on load */
#define TEST_MD_PATH "/tmp"

/* To add support for new device types, follow the examples of the following...
 * Note that the string names are defined by the DPDK PMD in question so be
 * sure to use the exact names.
@@ -102,6 +108,9 @@ struct vbdev_compress {
	pthread_mutex_t			reduce_lock;
	uint32_t			ch_count;
	TAILQ_HEAD(, spdk_bdev_io)	pending_comp_ios;	/* outstanding operations to a comp library */
	struct spdk_reduce_vol_params	params;		/* params for the reduce volume */
	struct spdk_reduce_backing_dev	backing_dev;	/* backing device info for the reduce volume */
	struct spdk_reduce_vol		*vol;		/* the reduce volume */
	TAILQ_ENTRY(vbdev_compress)	link;
};
static TAILQ_HEAD(, vbdev_compress) g_vbdev_comp = TAILQ_HEAD_INITIALIZER(g_vbdev_comp);
@@ -134,6 +143,7 @@ static struct rte_mempool *g_comp_op_mp = NULL; /* comp operations, must be rt
static void vbdev_compress_examine(struct spdk_bdev *bdev);
static void vbdev_compress_claim(struct vbdev_compress *comp_bdev);
static void vbdev_compress_queue_io(struct spdk_bdev_io *bdev_io);
struct vbdev_compress *_prepare_for_load_init(struct spdk_bdev *bdev);
static void vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io);

/* Called by vbdev_init_compress_drivers() to init each discovered compression device */
@@ -557,6 +567,205 @@ vbdev_compress_config_json(struct spdk_json_write_ctx *w)
	return 0;
}

/* Callback from reduce for when init is complete. We'll pass the vbdev_comp struct
 * used for initial metadata operations to claim where it will be further filled out
 * and added to the global list.
 */
static void
vbdev_reduce_init_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
{
	struct vbdev_compress *meta_ctx = cb_arg;

	if (reduce_errno == 0) {
		meta_ctx->vol = vol;
		vbdev_compress_claim(meta_ctx);
	} else {
		SPDK_ERRLOG("for vol %s, error %u\n",
			    spdk_bdev_get_name(meta_ctx->base_bdev), reduce_errno);
		spdk_put_io_channel(meta_ctx->base_ch);
		spdk_bdev_close(meta_ctx->base_desc);
		free(meta_ctx);
	}
}

/* Callback for the function used by reduceLib to perform IO to/from the backing device. We just
 * call the callback provided by reduceLib when it called the read/write/unmap function and
 * free the bdev_io.
 */
static void
comp_reduce_io_cb(struct spdk_bdev_io *bdev_io, bool success, void *arg)
{
	struct spdk_reduce_vol_cb_args *cb_args = arg;
	int reduce_errno;

	if (success) {
		reduce_errno = 0;
	} else {
		reduce_errno = -EIO;
	}
	cb_args->cb_fn(cb_args->cb_arg, reduce_errno);
	spdk_bdev_free_io(bdev_io);
}

/* This is the function provided to the reduceLib for sending reads directly to
 * the backing device.
 */
static void
_comp_reduce_readv(struct spdk_reduce_backing_dev *dev, struct iovec *iov, int iovcnt,
		   uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
{
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress,
					   backing_dev);
	int rc;

	rc = spdk_bdev_readv_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
				    iov, iovcnt, lba, lba_count,
				    comp_reduce_io_cb,
				    args);
	if (rc) {
		if (rc == -ENOMEM) {
			SPDK_ERRLOG("No memory, start to queue io.\n");
			/* TODO: there's no bdev_io to queue */
		} else {
			SPDK_ERRLOG("error submitting readv request\n");
		}
		args->cb_fn(args->cb_arg, rc);
	}
}

/* This is the function provided to the reduceLib for sending writes directly to
 * the backing device.
 */
static void
_comp_reduce_writev(struct spdk_reduce_backing_dev *dev, struct iovec *iov, int iovcnt,
		    uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
{
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress,
					   backing_dev);
	int rc;

	rc = spdk_bdev_writev_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
				     iov, iovcnt, lba, lba_count,
				     comp_reduce_io_cb,
				     args);
	if (rc) {
		if (rc == -ENOMEM) {
			SPDK_ERRLOG("No memory, start to queue io.\n");
			/* TODO: there's no bdev_io to queue */
		} else {
			SPDK_ERRLOG("error submitting writev request\n");
		}
		args->cb_fn(args->cb_arg, rc);
	}
}

/* This is the function provided to the reduceLib for sending umaps directly to
 * the backing device.
 */
static void
_comp_reduce_unmap(struct spdk_reduce_backing_dev *dev,
		   uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
{
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress,
					   backing_dev);
	int rc;

	rc = spdk_bdev_unmap_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
				    lba, lba_count,
				    comp_reduce_io_cb,
				    args);

	if (rc) {
		if (rc == -ENOMEM) {
			SPDK_ERRLOG("No memory, start to queue io.\n");
			/* TODO: there's no bdev_io to queue */
		} else {
			SPDK_ERRLOG("error submitting unmap request\n");
		}
		args->cb_fn(args->cb_arg, rc);
	}
}

/* Called when the underlying base bdev goes away. */
static void
vbdev_compress_base_bdev_hotremove_cb(void *ctx)
{
	struct vbdev_compress *comp_bdev, *tmp;
	struct spdk_bdev *bdev_find = ctx;

	TAILQ_FOREACH_SAFE(comp_bdev, &g_vbdev_comp, link, tmp) {
		if (bdev_find == comp_bdev->base_bdev) {
			spdk_bdev_unregister(&comp_bdev->comp_bdev, NULL, NULL);
		}
	}
}

/* TODO: determine which parms we want user configurable, HC for now
 * params.vol_size
 * params.chunk_size
 * compression PMD, algorithm, window size, comp level, etc.
 * TEST_MD_PATH
 */

/* Common function for init and load to allocate and populate the minimal
 * information for reducelib to init or load.
 */
struct vbdev_compress *
_prepare_for_load_init(struct spdk_bdev *bdev)
{
	struct vbdev_compress *meta_ctx;

	meta_ctx = calloc(1, sizeof(struct vbdev_compress));
	if (meta_ctx == NULL) {
		SPDK_ERRLOG("failed to alloc init contexs\n");
		return NULL;
	}

	meta_ctx->base_bdev = bdev;
	meta_ctx->backing_dev.unmap = _comp_reduce_unmap;
	meta_ctx->backing_dev.readv = _comp_reduce_readv;
	meta_ctx->backing_dev.writev = _comp_reduce_writev;
	meta_ctx->backing_dev.blocklen = bdev->blocklen;
	meta_ctx->backing_dev.blockcnt = bdev->blockcnt;

	meta_ctx->params.chunk_size = CHUNK_SIZE;
	meta_ctx->params.backing_io_unit_size = BACKING_IO_UNIT_SZ;
	meta_ctx->params.logical_block_size = meta_ctx->params.backing_io_unit_size;

	return meta_ctx;
}

/* Call reducelib to initialize a new volume */
static void
vbdev_init_reduce(struct spdk_bdev *bdev, const char *vbdev_name, const char *comp_pmd)
{
	struct vbdev_compress *meta_ctx;
	int rc;

	meta_ctx = _prepare_for_load_init(bdev);
	if (meta_ctx == NULL) {
		return;
	}

	rc = spdk_bdev_open(meta_ctx->base_bdev, true, vbdev_compress_base_bdev_hotremove_cb,
			    meta_ctx->base_bdev, &meta_ctx->base_desc);
	if (rc) {
		SPDK_ERRLOG("could not open bdev %s\n", spdk_bdev_get_name(meta_ctx->base_bdev));
		free(meta_ctx);
		return;
	}
	meta_ctx->base_ch = spdk_bdev_get_io_channel(meta_ctx->base_desc);

	/* TODO: we'll want to pass name and compression parms to this
	 * function so they can be persisted.  We'll need to retrieve them
	 * in load.
	 */
	spdk_reduce_vol_init(&meta_ctx->params, &meta_ctx->backing_dev,
			     TEST_MD_PATH,
			     vbdev_reduce_init_cb,
			     meta_ctx);
}

/* We provide this callback for the SPDK channel code to create a channel using
 * the channel struct we provided in our module get_io_channel() entry point. Here
 * we get and save off an underlying base channel of the device below us so that
@@ -657,7 +866,7 @@ create_compress_disk(const char *bdev_name, const char *vbdev_name, const char *
		return -ENODEV;
	}

	/* TODO: vbdev_init_reduce(bdev); goes here */
	vbdev_init_reduce(bdev, vbdev_name, comp_pmd);

	return 0;
}
@@ -670,6 +879,7 @@ vbdev_compress_init(void)
		SPDK_ERRLOG("Error setting up compression devices\n");
		return -EINVAL;
	}

	return 0;
}

@@ -715,28 +925,13 @@ static struct spdk_bdev_module compress_if = {
	.module_init = vbdev_compress_init,
	.config_text = NULL,
	.get_ctx_size = vbdev_compress_get_ctx_size,
	.examine_config = vbdev_compress_examine,
	.examine_disk = vbdev_compress_examine,
	.module_fini = vbdev_compress_finish,
	.config_json = vbdev_compress_config_json
};

SPDK_BDEV_MODULE_REGISTER(compress, &compress_if)

/* Called when the underlying base bdev goes away. */
static void
vbdev_compress_examine_hotremove_cb(void *ctx)
{
	/* TODO */
}

/* Claim isn't called until a future series but removing this function
 * alone generates many other warnings about basic vbdev stuff (function
 * tables, etc..  I think it's better to have all the base functionality
 * in this first patch, I'll remove the pragmas as soon as claim is called
 * which happens after reduce is integrated.
 */
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-function"
static void
vbdev_compress_claim(struct vbdev_compress *comp_bdev)
{
@@ -779,7 +974,7 @@ vbdev_compress_claim(struct vbdev_compress *comp_bdev)

	TAILQ_INSERT_TAIL(&g_vbdev_comp, comp_bdev, link);

	rc = spdk_bdev_open(comp_bdev->base_bdev, true, vbdev_compress_examine_hotremove_cb,
	rc = spdk_bdev_open(comp_bdev->base_bdev, true, vbdev_compress_base_bdev_hotremove_cb,
			    comp_bdev->base_bdev, &comp_bdev->base_desc);
	if (rc) {
		SPDK_ERRLOG("could not open bdev %s\n", spdk_bdev_get_name(comp_bdev->base_bdev));
@@ -804,6 +999,7 @@ vbdev_compress_claim(struct vbdev_compress *comp_bdev)
	}

	SPDK_NOTICELOG("registered io_device and virtual bdev for: %s\n", comp_bdev->comp_bdev.name);
	spdk_bdev_module_examine_done(&compress_if);

	return;
	/* Error cleanup paths. */
@@ -817,9 +1013,11 @@ error_open:
error_drv_name:
	free(comp_bdev->comp_bdev.name);
error_bdev_name:
	spdk_put_io_channel(comp_bdev->base_ch);
	spdk_bdev_close(comp_bdev->base_desc);
	free(comp_bdev);
	spdk_bdev_module_examine_done(&compress_if);
}
#pragma GCC diagnostic pop

void
delete_compress_disk(struct spdk_bdev *bdev, spdk_delete_compress_complete cb_fn, void *cb_arg)
@@ -832,10 +1030,58 @@ delete_compress_disk(struct spdk_bdev *bdev, spdk_delete_compress_complete cb_fn
	spdk_bdev_unregister(bdev, cb_fn, cb_arg);
}

/* Callback from reduce for then load is complete. We'll pass the vbdev_comp struct
 * used for initial metadata operations to claim where it will be further filled out
 * and added to the global list.
 */
static void
vbdev_reduce_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
{
	struct vbdev_compress *meta_ctx = cb_arg;

	if (reduce_errno != 0) {
		/* This error means it is not a compress disk. */
		if (reduce_errno != -EILSEQ) {
			SPDK_ERRLOG("for vol %s, error %u\n",
				    spdk_bdev_get_name(meta_ctx->base_bdev), reduce_errno);
		}
		spdk_put_io_channel(meta_ctx->base_ch);
		spdk_bdev_close(meta_ctx->base_desc);
		free(meta_ctx);
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	meta_ctx->vol = vol;
	vbdev_compress_claim(meta_ctx);
}

/* Examine_disk entry point: will do a metadata load to see if this is ours,
 * and if so will go ahead and claim it.
 */
static void
vbdev_compress_examine(struct spdk_bdev *bdev)
{
	struct vbdev_compress *meta_ctx;
	int rc;

	meta_ctx = _prepare_for_load_init(bdev);
	if (meta_ctx == NULL) {
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	rc = spdk_bdev_open(meta_ctx->base_bdev, false, vbdev_compress_base_bdev_hotremove_cb,
			    meta_ctx->base_bdev, &meta_ctx->base_desc);
	if (rc) {
		SPDK_ERRLOG("could not open bdev %s\n", spdk_bdev_get_name(meta_ctx->base_bdev));
		free(meta_ctx);
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	meta_ctx->base_ch = spdk_bdev_get_io_channel(meta_ctx->base_desc);
	spdk_reduce_vol_load(&meta_ctx->backing_dev, vbdev_reduce_load_cb, meta_ctx);
}

SPDK_LOG_REGISTER_COMPONENT("vbdev_compress", SPDK_LOG_VBDEV_COMPRESS)
+2 −1
Original line number Diff line number Diff line
@@ -45,7 +45,8 @@ BLOCKDEV_MODULES_LIST += ocfenv
endif

ifeq ($(CONFIG_REDUCE),y)
BLOCKDEV_MODULES_LIST += bdev_compress
BLOCKDEV_MODULES_LIST += bdev_compress reduce
SYS_LIBS += -lpmem
endif

ifeq ($(CONFIG_RDMA),y)