Commit 7b641c75 authored by paul luse's avatar paul luse Committed by Jim Harris
Browse files

bdev/compress: change how we manage mbufs



In working on a future patch to queue compress operations, I found
some corruptions in our mbuf pool stemming from an inconsistency
in using mbuf macros when constructing a compress operation and
how we allocated and freed both the pool and the mbufs.

This also fixes an issue that I hadn't addressed yet with freeing
chained mbufs.  Use of the mbuf macro to free them instead of the
generic spdk functions takes care of that for us.

Change-Id: I7b29a0d740ab745574698c721d575d8a735ad5cb
Signed-off-by: default avatarpaul luse <paul.e.luse@intel.com>
Reviewed-on: https://review.gerrithub.io/c/spdk/spdk/+/453028


Tested-by: default avatarSPDK CI Jenkins <sys_sgci@intel.com>
Reviewed-by: default avatarJim Harris <james.r.harris@intel.com>
Reviewed-by: default avatarShuhei Matsumoto <shuhei.matsumoto.xt@hitachi.com>
parent f123e841
Loading
Loading
Loading
Loading
+39 −21
Original line number Diff line number Diff line
@@ -135,8 +135,9 @@ struct comp_bdev_io {
};

/* Shared mempools between all devices on this system */
static struct spdk_mempool *g_mbuf_mp = NULL;			/* mbuf mempool */
static struct rte_mempool *g_mbuf_mp = NULL;			/* mbuf mempool */
static struct rte_mempool *g_comp_op_mp = NULL;			/* comp operations, must be rte* mempool */
static struct rte_mbuf_ext_shared_info g_shinfo = {};		/* used by DPDK mbuf macros */

static void vbdev_compress_examine(struct spdk_bdev *bdev);
static void vbdev_compress_claim(struct vbdev_compress *comp_bdev);
@@ -145,6 +146,15 @@ struct vbdev_compress *_prepare_for_load_init(struct spdk_bdev *bdev);
static void vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io);
static void comp_bdev_ch_destroy_cb(void *io_device, void *ctx_buf);

/* Dummy function used by DPDK to free ext attached buffers
 * to mbufs, we free them ourselves but this callback has to
 * be here.
 */
static void
shinfo_free_cb(void *arg1, void *arg2)
{
}

/* Called by vbdev_init_compress_drivers() to init each discovered compression device */
static int
create_compress_dev(uint8_t index, uint16_t num_lcores)
@@ -311,9 +321,8 @@ vbdev_init_compress_drivers(void)
		return -EINVAL;
	}

	g_mbuf_mp = spdk_mempool_create("comp_mbuf_mp", NUM_MBUFS, sizeof(struct rte_mbuf),
					SPDK_MEMPOOL_DEFAULT_CACHE_SIZE,
					SPDK_ENV_SOCKET_ID_ANY);
	g_mbuf_mp = rte_pktmbuf_pool_create("comp_mbuf_mp", NUM_MBUFS, POOL_CACHE_SIZE,
					    sizeof(struct rte_mbuf), 0, rte_socket_id());
	if (g_mbuf_mp == NULL) {
		SPDK_ERRLOG("Cannot create mbuf pool\n");
		rc = -ENOMEM;
@@ -336,6 +345,8 @@ vbdev_init_compress_drivers(void)
		}
	}

	g_shinfo.free_cb = shinfo_free_cb;

	return 0;

	/* Error cleanup paths. */
@@ -346,7 +357,7 @@ error_create_compress_devs:
	}
error_create_op:
error_create_mbuf:
	spdk_mempool_free(g_mbuf_mp);
	rte_mempool_free(g_mbuf_mp);

	return rc;
}
@@ -382,9 +393,12 @@ comp_dev_poller(void *args)
		}
		reduce_args->cb_fn(reduce_args->cb_arg, deq_ops->produced);

		/* Now bulk free both mbufs and the compress operation. */
		spdk_mempool_put(g_mbuf_mp, deq_ops->m_src);
		spdk_mempool_put(g_mbuf_mp, deq_ops->m_dst);
		/* Now free both mbufs and the compress operation. The rte_pktmbuf_free()
		 * call takes care of freeing all of the mbufs in the chain back to their
		 * original pool.
		 */
		rte_pktmbuf_free(deq_ops->m_src);
		rte_pktmbuf_free(deq_ops->m_dst);
		rte_comp_op_free(deq_ops);
	}
	return 0;
@@ -423,8 +437,7 @@ _compress_operation(struct spdk_reduce_backing_dev *backing_dev, struct iovec *s
	struct iovec *current_dst_iov = NULL;
	int iov_index;
	int rc = 0;
	struct rte_mbuf_ext_shared_info shinfo_src;
	struct rte_mbuf_ext_shared_info shinfo_dst;
	int i;

	assert(src_iovcnt < MAX_MBUFS_PER_OP);
	comp_op = rte_comp_op_alloc(g_comp_op_mp);
@@ -434,17 +447,17 @@ _compress_operation(struct spdk_reduce_backing_dev *backing_dev, struct iovec *s
	}

	/* get an mbuf per iov, src and dst */
	rc = spdk_mempool_get_bulk(g_mbuf_mp, (void **)&src_mbufs[0], src_iovcnt);
	rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, (struct rte_mbuf **)&src_mbufs[0], src_iovcnt);
	if (rc) {
		SPDK_ERRLOG("trying to get src_mbufs!\n");
		rc = -ENOMEM;
		SPDK_ERRLOG("ERROR trying to get src_mbufs!\n");
		return -ENOMEM;
		goto error_get_src;

	}
	rc = spdk_mempool_get_bulk(g_mbuf_mp, (void **)&dst_mbufs[0], dst_iovcnt);

	rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, (struct rte_mbuf **)&dst_mbufs[0], dst_iovcnt);
	if (rc) {
		SPDK_ERRLOG("trying to get dst_mbufs!\n");
		rc = -ENOMEM;
		SPDK_ERRLOG("ERROR trying to get dst_mbufs!\n");
		return -ENOMEM;
		goto error_get_dst;
	}

@@ -464,7 +477,7 @@ _compress_operation(struct spdk_reduce_backing_dev *backing_dev, struct iovec *s
					  current_src_iov,
					  spdk_vtophys((void *)current_src_iov, NULL),
					  src_iovs[iov_index].iov_len,
					  &shinfo_src);
					  &g_shinfo);
		rte_pktmbuf_append(src_mbufs[iov_index], src_iovs[iov_index].iov_len);

		if (iov_index > 0) {
@@ -485,7 +498,7 @@ _compress_operation(struct spdk_reduce_backing_dev *backing_dev, struct iovec *s
					  current_dst_iov,
					  spdk_vtophys((void *)current_dst_iov, NULL),
					  dst_iovs[iov_index].iov_len,
					  &shinfo_dst);
					  &g_shinfo);
		rte_pktmbuf_append(dst_mbufs[iov_index], dst_iovs[iov_index].iov_len);

		if (iov_index > 0) {
@@ -505,8 +518,13 @@ _compress_operation(struct spdk_reduce_backing_dev *backing_dev, struct iovec *s
	return rc;

	/* Error cleanup paths. */
	for (i = 0; i < dst_iovcnt; i++) {
		rte_pktmbuf_free((struct rte_mbuf *)&dst_mbufs[i]);
	}
error_get_dst:
	spdk_mempool_put_bulk(g_mbuf_mp, (void **)&src_mbufs[0], src_iovcnt);
	for (i = 0; i < src_iovcnt; i++) {
		rte_pktmbuf_free((struct rte_mbuf *)&src_mbufs[i]);
	}
error_get_src:
	rte_comp_op_free(comp_op);
	return rc;
@@ -1149,7 +1167,7 @@ vbdev_compress_finish(void)
	pthread_mutex_destroy(&g_comp_device_qp_lock);

	rte_mempool_free(g_comp_op_mp);
	spdk_mempool_free(g_mbuf_mp);
	rte_mempool_free(g_mbuf_mp);
}

/* During init we'll be asked how much memory we'd like passed to us
+19 −0
Original line number Diff line number Diff line
@@ -109,6 +109,22 @@ mock_rte_comp_op_pool_create(const char *name, unsigned int nb_elts,
	return NULL;
}

void mock_rte_pktmbuf_free(struct rte_mbuf *m);
#define rte_pktmbuf_free mock_rte_pktmbuf_free
void mock_rte_pktmbuf_free(struct rte_mbuf *m)
{
}

static int ut_rte_pktmbuf_alloc_bulk = 0;
int mock_rte_pktmbuf_alloc_bulk(struct rte_mempool *pool, struct rte_mbuf **mbufs,
				unsigned count);
#define rte_pktmbuf_alloc_bulk mock_rte_pktmbuf_alloc_bulk
int mock_rte_pktmbuf_alloc_bulk(struct rte_mempool *pool, struct rte_mbuf **mbufs,
				unsigned count)
{
	return ut_rte_pktmbuf_alloc_bulk;
}

#include "bdev/compress/vbdev_compress.c"

/* SPDK stubs */
@@ -160,6 +176,9 @@ DEFINE_STUB(rte_compressdev_enqueue_burst, uint16_t,
	    (uint8_t dev_id, uint16_t qp_id, struct rte_comp_op **ops, uint16_t nb_ops), 0);
DEFINE_STUB_V(rte_comp_op_free, (struct rte_comp_op *op));
DEFINE_STUB(rte_comp_op_alloc, struct rte_comp_op *, (struct rte_mempool *mempool), NULL);
DEFINE_STUB(rte_pktmbuf_pool_create, struct rte_mempool *, (const char *name, unsigned n,
		unsigned cache_size, uint16_t priv_size,
		uint16_t data_room_size, int socket_id), NULL);

struct spdk_bdev_io *g_bdev_io;
struct spdk_io_channel *g_io_ch;