Commit 70165ced authored by paul luse's avatar paul luse Committed by Jim Harris
Browse files

bdev/compress: queue up compression operations if we can't submit



The compressdev enqueue call will return the number of ops that
it was able to accept.  By design this will always be 1 for our
implementation so check that to make sure.

If it is 0 then we queue the compression operation up and try
again after something is dequeued in the poller.

We will also queue if we can't get an mbuf or op from our pools.

Change-Id: I7285749b4a599d1ee265e3c3e16073f25c4d7469
Signed-off-by: default avatarpaul luse <paul.e.luse@intel.com>
Reviewed-on: https://review.gerrithub.io/c/spdk/spdk/+/451686


Tested-by: default avatarSPDK CI Jenkins <sys_sgci@intel.com>
Reviewed-by: default avatarJim Harris <james.r.harris@intel.com>
Reviewed-by: default avatarShuhei Matsumoto <shuhei.matsumoto.xt@hitachi.com>
parent 122cc72c
Loading
Loading
Loading
Loading
+63 −7
Original line number Diff line number Diff line
@@ -96,6 +96,18 @@ struct comp_device_qp {
static TAILQ_HEAD(, comp_device_qp) g_comp_device_qp = TAILQ_HEAD_INITIALIZER(g_comp_device_qp);
static pthread_mutex_t g_comp_device_qp_lock = PTHREAD_MUTEX_INITIALIZER;

/* For queueing up compression operations that we can't submit for some reason */
struct vbdev_comp_op {
	struct spdk_reduce_backing_dev	*backing_dev;
	struct iovec			*src_iovs;
	int				src_iovcnt;
	struct iovec			*dst_iovs;
	int				dst_iovcnt;
	bool				compress;
	void				*cb_arg;
	TAILQ_ENTRY(vbdev_comp_op)	link;
};

/* List of virtual bdevs and associated info for each. */
struct vbdev_compress {
	struct spdk_bdev		*base_bdev;	/* the thing we're attaching to */
@@ -115,6 +127,7 @@ struct vbdev_compress {
	struct spdk_reduce_vol		*vol;		/* the reduce volume */
	spdk_delete_compress_complete	delete_cb_fn;
	void				*delete_cb_arg;
	TAILQ_HEAD(, vbdev_comp_op)	queued_comp_ops;
	TAILQ_ENTRY(vbdev_compress)	link;
};
static TAILQ_HEAD(, vbdev_compress) g_vbdev_comp = TAILQ_HEAD_INITIALIZER(g_vbdev_comp);
@@ -395,27 +408,27 @@ _compress_operation(struct spdk_reduce_backing_dev *backing_dev, struct iovec *s
	struct iovec *current_dst_iov = NULL;
	int iov_index;
	int rc = 0;
	struct vbdev_comp_op *op_to_queue;
	int i;

	assert(src_iovcnt < MAX_MBUFS_PER_OP);

	comp_op = rte_comp_op_alloc(g_comp_op_mp);
	if (!comp_op) {
		SPDK_ERRLOG("trying to get a comp op!\n");
		return -ENOMEM;
		goto error_get_op;
	}

	/* get an mbuf per iov, src and dst */
	rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, (struct rte_mbuf **)&src_mbufs[0], src_iovcnt);
	if (rc) {
		SPDK_ERRLOG("ERROR trying to get src_mbufs!\n");
		return -ENOMEM;
		goto error_get_src;
	}

	rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, (struct rte_mbuf **)&dst_mbufs[0], dst_iovcnt);
	if (rc) {
		SPDK_ERRLOG("ERROR trying to get dst_mbufs!\n");
		return -ENOMEM;
		goto error_get_dst;
	}

@@ -472,8 +485,13 @@ _compress_operation(struct spdk_reduce_backing_dev *backing_dev, struct iovec *s
		comp_op->private_xform = comp_bdev->device_qp->device->decomp_xform;
	}

	rte_compressdev_enqueue_burst(cdev_id, 0, &comp_op, 1);
	return rc;
	rc = rte_compressdev_enqueue_burst(cdev_id, 0, &comp_op, 1);
	assert(rc <= 1);

	/* We always expect 1 got queued, if 0 then we need to queue it up. */
	if (rc == 1) {
		return 0;
	}

	/* Error cleanup paths. */
	for (i = 0; i < dst_iovcnt; i++) {
@@ -485,7 +503,23 @@ error_get_dst:
	}
error_get_src:
	rte_comp_op_free(comp_op);
	return rc;
error_get_op:
	op_to_queue = calloc(1, sizeof(struct vbdev_comp_op));
	if (op_to_queue == NULL) {
		SPDK_ERRLOG("unable to alocate operation for queueing.\n");
		return -ENOMEM;
	}
	op_to_queue->backing_dev = backing_dev;
	op_to_queue->src_iovs = src_iovs;
	op_to_queue->src_iovcnt = src_iovcnt;
	op_to_queue->dst_iovs = dst_iovs;
	op_to_queue->dst_iovcnt = dst_iovcnt;
	op_to_queue->compress = compress;
	op_to_queue->cb_arg = cb_arg;
	TAILQ_INSERT_TAIL(&comp_bdev->queued_comp_ops,
			  op_to_queue,
			  link);
	return 0;
}

/* Poller for the DPDK compression driver. */
@@ -497,12 +531,14 @@ comp_dev_poller(void *args)
	struct rte_comp_op *deq_ops;
	uint16_t num_deq;
	struct spdk_reduce_vol_cb_args *reduce_args;
	struct vbdev_comp_op *op_to_resubmit;
	int rc;

	num_deq = rte_compressdev_dequeue_burst(cdev_id, 0, &deq_ops, 1);

	if (num_deq > 0) {
		reduce_args = (struct spdk_reduce_vol_cb_args *)deq_ops->m_src->userdata;

		assert(num_deq == 1);
		if (deq_ops->status != RTE_COMP_OP_STATUS_SUCCESS) {
			SPDK_ERRLOG("deque status %u\n", deq_ops->status);

@@ -526,6 +562,23 @@ comp_dev_poller(void *args)
		rte_pktmbuf_free(deq_ops->m_src);
		rte_pktmbuf_free(deq_ops->m_dst);
		rte_comp_op_free(deq_ops);

		/* Check if there are any pending comp ops to process */
		while (!TAILQ_EMPTY(&comp_bdev->queued_comp_ops)) {
			op_to_resubmit = TAILQ_FIRST(&comp_bdev->queued_comp_ops);
			rc = _compress_operation(op_to_resubmit->backing_dev,
						 op_to_resubmit->src_iovs,
						 op_to_resubmit->src_iovcnt,
						 op_to_resubmit->dst_iovs,
						 op_to_resubmit->dst_iovcnt,
						 op_to_resubmit->compress,
						 op_to_resubmit->cb_arg);
			if (rc == 0) {
				TAILQ_REMOVE(&comp_bdev->queued_comp_ops, op_to_resubmit, link);
				free(op_to_resubmit);
			}
			break;
		}
	}
	return 0;
}
@@ -1053,6 +1106,9 @@ comp_bdev_ch_create_cb(void *io_device, void *ctx_buf)
	/* We use this queue to track outstanding IO in our lyaer. */
	TAILQ_INIT(&comp_bdev->pending_comp_ios);

	/* We use this to queue up compression operations as needed. */
	TAILQ_INIT(&comp_bdev->queued_comp_ops);

	/* Now set the reduce channel if it's not already set. */
	pthread_mutex_lock(&comp_bdev->reduce_lock);
	if (comp_bdev->ch_count == 0) {