Commit 33eac886 authored by paul luse's avatar paul luse Committed by Jim Harris
Browse files

lib/idxd: refactor batching for increased performance



And to eliminate an artificial constraint on # of user descriptors.
The main idea here was to move from a single ring that covered all
user descriptors to a pre-allocated ring per pre-allocated batch.

In addition, the other major change here is in how we poll for
completions.  We used to poll the batch rings then the main ring.
Now when commands are prepared their completion address is added to
a per channel list and the poller simply runs through that list
not caring which ring the completion address belongs too. This
simplifies the completion logic considerably and will avoid
polling locations that can't potentially have a completion.

Some minor rework was included as well, mainly getting rid of the
ring_ctrl struct as it didn't serve much of a purpose anyway and
with how things are setup now its easier to read with all the
elements in the channel struct.

Also, a change that came in while this was WIP needed a few fixes
to function correctly.  Addressed those and moved them to a
helper function so we have one point of control for xlations.

Added support for NOP in cases where a batch is submitted with
only 1 descriptor.

Signed-off-by: default avatarpaul luse <paul.e.luse@intel.com>
Change-Id: Ie201b28118823100e908e0d1b08e7c10bb8fa9e7
Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/3654


Tested-by: default avatarSPDK CI Jenkins <sys_sgci@intel.com>
Community-CI: Mellanox Build Bot
Reviewed-by: default avatarJim Harris <james.r.harris@intel.com>
Reviewed-by: default avatarBen Walker <benjamin.walker@intel.com>
parent 17bb748a
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -34,7 +34,7 @@
SPDK_ROOT_DIR := $(abspath $(CURDIR)/../..)
include $(SPDK_ROOT_DIR)/mk/spdk.common.mk

SO_VER := 2
SO_VER := 3
SO_MINOR := 0

C_SRCS = idxd.c
+329 −329

File changed.

Preview size limit exceeded, changes collapsed.

+52 −32
Original line number Diff line number Diff line
@@ -59,25 +59,37 @@ static inline void movdir64b(void *dst, const void *src)
#define IDXD_REGISTER_TIMEOUT_US		50
#define IDXD_DRAIN_TIMEOUT_US			500000

/* TODO: make some of these RPC selectable */
#define WQ_MODE_DEDICATED	1
#define LOG2_WQ_MAX_BATCH	8  /* 2^8 = 256 */

/* TODO: consider setting the max per batch limit via RPC. */

/* The following sets up a max desc count per batch of 16 */
#define LOG2_WQ_MAX_BATCH	4  /* 2^4 = 16 */
#define DESC_PER_BATCH		(1 << LOG2_WQ_MAX_BATCH)
/* We decide how many batches we want to support based on what max queue
 * depth makes sense resource wise. There is a small price to pay with
 * larger numbers wrt polling for completions.
 */
#define NUM_BATCHES_PER_CHANNEL	0x400
#define MIN_USER_DESC_COUNT	2

#define LOG2_WQ_MAX_XFER	30 /* 2^30 = 1073741824 */
#define WQCFG_NUM_DWORDS	8
#define WQ_PRIORITY_1		1
#define IDXD_MAX_QUEUES		64

#define TOTAL_USER_DESC		(1 << LOG2_WQ_MAX_BATCH)
#define DESC_PER_BATCH		16 /* TODO maybe make this a startup RPC */
#define NUM_BATCHES		(TOTAL_USER_DESC / DESC_PER_BATCH)
#define MIN_USER_DESC_COUNT	2

/* Each pre-allocated batch structure goes on a per channel list and
 * contains the memory for both user descriptors. The array of comp_ctx
 * holds the list of completion contexts that we will add to the list
 * used by the poller.  The list is updated when the batch is submitted.
 */
struct idxd_batch {
	uint32_t			batch_desc_index;
	uint32_t			batch_num;
	uint32_t			cur_index;
	uint32_t			start_index;
	struct idxd_hw_desc		*user_desc;
	struct idxd_comp		*user_completions;
	uint32_t			remaining;
	bool				submitted;
	void				*comp_ctx[DESC_PER_BATCH];
	uint8_t				index;
	TAILQ_ENTRY(idxd_batch)		link;
};

@@ -90,40 +102,44 @@ struct device_config {
	uint16_t	total_engines;
};

struct idxd_ring_control {
	void				*portal;
struct idxd_comp ;

struct spdk_idxd_io_channel {
	struct spdk_idxd_device		*idxd;
	/* The portal is the address that we write descriptors to for submission. */
	void				*portal;
	uint16_t			ring_size;

	/*
	 * Rings for this channel, one for descriptors and one
	 * for completions, share the same index. Batch descriptors
	 * are managed independently from data descriptors.
	 * Descriptors and completions share the same index. User descriptors
	 * (those included in a batch) are managed independently from data descriptors
	 * and are located in the batch structure.
	 */
	struct idxd_hw_desc		*desc;
	struct idxd_comp		*completions;
	struct idxd_hw_desc		*user_desc;
	struct idxd_comp		*user_completions;

	/* Current list of oustanding completion addresses to poll. */
	TAILQ_HEAD(, idxd_comp)		comp_ctx_oustanding;

	/*
	 * We use one bit array to track ring slots for both
	 * desc and completions.
	 *
	 * TODO: We can get rid of the bit array and just use a uint
	 * to manage flow control as the current implementation saves
	 * enough info in comp_ctx that it doesn't need the index. Keeping
	 * the bit arrays for now as (a) they provide some extra debug benefit
	 * until we have silicon and (b) they may still be needed depending on
	 * polling implementation experiments that we need to run with real silicon.
	 */
	struct spdk_bit_array		*ring_slots;
	uint32_t			max_ring_slots;

	/*
	 * We use a separate bit array to track ring slots for
	 * descriptors submitted via the user in a batch.
	 */
	struct spdk_bit_array		*user_ring_slots;
};
	/* Lists of batches, free and in use. */
	TAILQ_HEAD(, idxd_batch)	batch_pool;
	TAILQ_HEAD(, idxd_batch)	batches;

struct spdk_idxd_io_channel {
	struct spdk_idxd_device		*idxd;
	struct idxd_ring_control	ring_ctrl;
	TAILQ_HEAD(, idxd_batch)	batch_pool; /* free batches */
	TAILQ_HEAD(, idxd_batch)	batches; /* in use batches */
	void				*batch_base;
};

struct pci_dev_id {
@@ -154,9 +170,13 @@ struct idxd_comp {
	void				*cb_arg;
	spdk_idxd_req_cb		cb_fn;
	struct idxd_batch		*batch;
	uint64_t			pad2;
} __attribute__((packed));
SPDK_STATIC_ASSERT(sizeof(struct idxd_comp) == 64, "size mismatch");
	bool				batch_op;
	struct idxd_hw_desc		*desc;
	uint32_t			index;
	char				pad[3];
	TAILQ_ENTRY(idxd_comp)		link;
};
SPDK_STATIC_ASSERT(sizeof(struct idxd_comp) == 96, "size mismatch");

struct idxd_wq {
	struct spdk_idxd_device		*idxd;
+23 −11
Original line number Diff line number Diff line
@@ -113,6 +113,7 @@ _process_single_task(struct spdk_io_channel *ch, struct spdk_accel_task *task)
{
	struct idxd_io_channel *chan = spdk_io_channel_get_ctx(ch);
	int rc = 0;
	uint8_t fill_pattern = (uint8_t)task->fill_pattern;

	switch (task->op_code) {
	case ACCEL_OPCODE_MEMMOVE:
@@ -126,6 +127,7 @@ _process_single_task(struct spdk_io_channel *ch, struct spdk_accel_task *task)
		rc = spdk_idxd_submit_compare(chan->chan, task->src, task->src2, task->nbytes, idxd_done, task);
		break;
	case ACCEL_OPCODE_MEMFILL:
		memset(&task->fill_pattern, fill_pattern, sizeof(uint64_t));
		rc = spdk_idxd_submit_fill(chan->chan, task->dst, task->fill_pattern, task->nbytes, idxd_done,
					   task);
		break;
@@ -148,6 +150,7 @@ idxd_submit_tasks(struct spdk_io_channel *ch, struct spdk_accel_task *first_task
	struct idxd_io_channel *chan = spdk_io_channel_get_ctx(ch);
	struct spdk_accel_task *task, *tmp, *batch_task;
	struct idxd_batch *idxd_batch;
	uint8_t fill_pattern;
	TAILQ_HEAD(, spdk_accel_task) batch_tasks;
	int rc = 0;
	uint32_t task_count = 0;
@@ -178,15 +181,26 @@ idxd_submit_tasks(struct spdk_io_channel *ch, struct spdk_accel_task *first_task
		return 0;
	}

	/* TODO: The DSA batch interface has performance tradeoffs that we need to measure with real
	 * silicon.  It's unclear which workloads will benefit from batching and with what sizes. Or
	 * if there are some cases where batching is more beneficial on the completion side by turning
	 * off completion notifications for elements within the batch. Need to run some experiments where
	 * we use the batching interface versus not to help provide guidance on how to use these batching
	 * API.  Here below is one such place, currently those batching using the framework will end up
	 * also using the DSA batch interface.  We could send each of these operations as single commands
	 * to the low level library.
	 */

	/* More than one task, create IDXD batch(es). */
	do {
		idxd_batch = spdk_idxd_batch_create(chan->chan);
		task_count = 0;
		if (idxd_batch == NULL) {
			/* Queue them all and try again later */
			goto queue_tasks;
		}

		task_count = 0;

		/* Keep track of each batch's tasks in case we need to cancel. */
		TAILQ_INIT(&batch_tasks);
		do {
@@ -204,6 +218,8 @@ idxd_submit_tasks(struct spdk_io_channel *ch, struct spdk_accel_task *first_task
								  task->nbytes, idxd_done, task);
				break;
			case ACCEL_OPCODE_MEMFILL:
				fill_pattern = (uint8_t)task->fill_pattern;
				memset(&task->fill_pattern, fill_pattern, sizeof(uint64_t));
				rc = spdk_idxd_batch_prep_fill(chan->chan, idxd_batch, task->dst, task->fill_pattern,
							       task->nbytes, idxd_done, task);
				break;
@@ -232,25 +248,21 @@ idxd_submit_tasks(struct spdk_io_channel *ch, struct spdk_accel_task *first_task
		if (!TAILQ_EMPTY(&batch_tasks)) {
			rc = spdk_idxd_batch_submit(chan->chan, idxd_batch, NULL, NULL);

			/* If we can't submit the batch, just destroy it and queue up all the operations
			 * from the latest batch and try again later. If this list was from an accel_fw batch,
			 * all of the batch info is still associated with the tasks that we're about to
			 * queue up so nothing is lost.
			 */
			if (rc) {
				/* Cancel the batch, requeue the items in the batch adn then
				 * any tasks that still hadn't been processed yet.
				 */
				spdk_idxd_batch_cancel(chan->chan, idxd_batch);

				while (!TAILQ_EMPTY(&batch_tasks)) {
					batch_task = TAILQ_FIRST(&batch_tasks);
					TAILQ_REMOVE(&batch_tasks, batch_task, link);
					TAILQ_INSERT_TAIL(&chan->queued_tasks, batch_task, link);
				}
				rc = 0;
				goto queue_tasks;
			}
		} else {
			/* the last batch task list was empty so all tasks had their cb_fn called. */
			rc = 0;
		}
	} while (task && rc == 0);
	} while (task);

	return 0;

+9 −9
Original line number Diff line number Diff line
@@ -95,7 +95,7 @@ test_idxd_wq_config(void)
{
	struct spdk_idxd_device idxd = {};
	union idxd_wqcfg wqcfg = {};
	uint32_t expected[8] = {0x10, 0, 0x11, 0x11e, 0, 0, 0x40000000, 0};
	uint32_t expected[8] = {0x10, 0, 0x11, 0x9e, 0, 0, 0x40000000, 0};
	uint32_t wq_size;
	int rc, i, j;

@@ -260,18 +260,18 @@ test_spdk_idxd_reconfigure_chan(void)
	uint32_t test_ring_size = 8;
	uint32_t num_channels = 2;

	chan.ring_ctrl.ring_slots = spdk_bit_array_create(test_ring_size);
	chan.ring_ctrl.ring_size = test_ring_size;
	chan.ring_ctrl.completions = spdk_zmalloc(test_ring_size * sizeof(struct idxd_hw_desc), 0, NULL,
	chan.ring_slots = spdk_bit_array_create(test_ring_size);
	chan.ring_size = test_ring_size;
	chan.completions = spdk_zmalloc(test_ring_size * sizeof(struct idxd_hw_desc), 0, NULL,
					SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
	SPDK_CU_ASSERT_FATAL(chan.ring_ctrl.completions != NULL);
	SPDK_CU_ASSERT_FATAL(chan.completions != NULL);

	rc = spdk_idxd_reconfigure_chan(&chan, num_channels);
	CU_ASSERT(rc == 0);
	CU_ASSERT(chan.ring_ctrl.max_ring_slots == test_ring_size / num_channels);
	CU_ASSERT(chan.max_ring_slots == test_ring_size / num_channels);

	spdk_bit_array_free(&chan.ring_ctrl.ring_slots);
	spdk_free(chan.ring_ctrl.completions);
	spdk_bit_array_free(&chan.ring_slots);
	spdk_free(chan.completions);
	return 0;
}