Commit 0cecfcb1 authored by paul luse's avatar paul luse Committed by Jim Harris
Browse files

examples/accel/perf: replace mempool of tasks with lists of tasks



For performance.  Also allocate the accurate number of tasks instead
of just blindly doubling to account for batching.

Signed-off-by: default avatarpaul luse <paul.e.luse@intel.com>
Change-Id: I4e5accfd97477f908368f16d3fc1d8f47896a4d6
Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/3321


Community-CI: Mellanox Build Bot
Reviewed-by: default avatarJim Harris <james.r.harris@intel.com>
Reviewed-by: default avatarBen Walker <benjamin.walker@intel.com>
Tested-by: default avatarSPDK CI Jenkins <sys_sgci@intel.com>
parent f7d25cf7
Loading
Loading
Loading
Loading
+43 −26
Original line number Diff line number Diff line
@@ -39,6 +39,7 @@
#include "spdk/string.h"
#include "spdk/accel_engine.h"
#include "spdk/crc32.h"
#include "spdk/util.h"

#define DATA_PATTERN 0x5a
#define ALIGN_4K 0x1000
@@ -59,6 +60,7 @@ static struct worker_thread *g_workers = NULL;
static int g_num_workers = 0;
static pthread_mutex_t g_workers_lock = PTHREAD_MUTEX_INITIALIZER;
uint64_t g_capabilites;
struct ap_task;

struct worker_thread {
	struct spdk_io_channel		*ch;
@@ -66,7 +68,7 @@ struct worker_thread {
	uint64_t			xfer_failed;
	uint64_t			injected_miscompares;
	uint64_t			current_queue_depth;
	struct spdk_mempool		*task_pool;
	TAILQ_HEAD(, ap_task)		tasks;
	struct worker_thread		*next;
	unsigned			core;
	struct spdk_thread		*thread;
@@ -82,6 +84,7 @@ struct ap_task {
	struct worker_thread	*worker;
	int			status;
	int			expected_status; /* used for compare */
	TAILQ_ENTRY(ap_task)	link;
};

static void
@@ -170,8 +173,13 @@ static void
unregister_worker(void *arg1)
{
	struct worker_thread *worker = arg1;
	struct ap_task *task;

	spdk_mempool_free(worker->task_pool);
	while (!TAILQ_EMPTY(&worker->tasks)) {
		task = TAILQ_FIRST(&worker->tasks);
		TAILQ_REMOVE(&worker->tasks, task, link);
		free(task);
	}
	spdk_put_io_channel(worker->ch);
	pthread_mutex_lock(&g_workers_lock);
	assert(g_num_workers >= 1);
@@ -307,7 +315,7 @@ _accel_done(void *arg1)
		if (g_workload_selection == ACCEL_DUALCAST) {
			spdk_free(task->dst2);
		}
		spdk_mempool_put(worker->task_pool, task);
		TAILQ_INSERT_TAIL(&worker->tasks, task, link);
	}
}

@@ -318,7 +326,7 @@ batch_done(void *cb_arg, int status)
	struct worker_thread *worker = task->worker;

	worker->current_queue_depth--;
	spdk_mempool_put(worker->task_pool, task);
	TAILQ_INSERT_TAIL(&worker->tasks, task, link);
}

static int
@@ -486,9 +494,8 @@ static void
_init_thread(void *arg1)
{
	struct worker_thread *worker;
	char task_pool_name[30];
	struct ap_task *task;
	int i, rc, max_per_batch, batch_count;
	int i, rc, max_per_batch, batch_count, num_tasks;
	int remaining = g_queue_depth;
	struct spdk_accel_batch *batch, *new_batch;

@@ -503,16 +510,19 @@ _init_thread(void *arg1)
	worker->next = g_workers;
	worker->ch = spdk_accel_engine_get_io_channel();

	snprintf(task_pool_name, sizeof(task_pool_name), "task_pool_%d", g_num_workers);
	worker->task_pool = spdk_mempool_create(task_pool_name,
						g_queue_depth * 2,
						sizeof(struct ap_task),
						SPDK_MEMPOOL_DEFAULT_CACHE_SIZE,
						SPDK_ENV_SOCKET_ID_ANY);
	if (!worker->task_pool) {
		fprintf(stderr, "Could not allocate buffer pool.\n");
		free(worker);
	max_per_batch = spdk_accel_batch_get_max(worker->ch);
	assert(max_per_batch > 0);
	num_tasks = g_queue_depth + spdk_divide_round_up(g_queue_depth, max_per_batch);

	TAILQ_INIT(&worker->tasks);
	for (i = 0; i < num_tasks; i++) {
		task = calloc(1, sizeof(struct ap_task));
		if (task == NULL) {
			fprintf(stderr, "Could not allocate task.\n");
			return;
			/* TODO cleanup */
		}
		TAILQ_INSERT_TAIL(&worker->tasks, task, link);
	}

	/* Register a poller that will stop the worker at time elapsed */
@@ -527,9 +537,6 @@ _init_thread(void *arg1)
	/* Batching is only possible if there is at least 2 operations. */
	if (g_queue_depth > 1) {

		/* Selected engine supports batching and we have enough, so do it. */
		max_per_batch = spdk_accel_batch_get_max(worker->ch);

		/* Outter loop sets up each batch command, inner loop populates the
		 * batch descriptors.
		 */
@@ -543,8 +550,10 @@ _init_thread(void *arg1)
			batch_count = 0;

			do {
				task = spdk_mempool_get(worker->task_pool);
				if (!task) {
				if (!TAILQ_EMPTY(&worker->tasks)) {
					task = TAILQ_FIRST(&worker->tasks);
					TAILQ_REMOVE(&worker->tasks, task, link);
				} else {
					fprintf(stderr, "Unable to get accel_task\n");
					goto error;
				}
@@ -566,8 +575,10 @@ _init_thread(void *arg1)
			} while (batch_count < max_per_batch && remaining > 0);

			/* Now send the batch command. */
			task = spdk_mempool_get(worker->task_pool);
			if (!task) {
			if (!TAILQ_EMPTY(&worker->tasks)) {
				task = TAILQ_FIRST(&worker->tasks);
				TAILQ_REMOVE(&worker->tasks, task, link);
			} else {
				fprintf(stderr, "Unable to get accel_task\n");
				goto error;
			}
@@ -593,8 +604,10 @@ _init_thread(void *arg1)
	 */
	for (i = 0; i < remaining; i++) {

		task = spdk_mempool_get(worker->task_pool);
		if (!task) {
		if (!TAILQ_EMPTY(&worker->tasks)) {
			task = TAILQ_FIRST(&worker->tasks);
			TAILQ_REMOVE(&worker->tasks, task, link);
		} else {
			fprintf(stderr, "Unable to get accel_task\n");
			goto error;
		}
@@ -610,7 +623,11 @@ _init_thread(void *arg1)
error:
	/* TODO clean exit */
	raise(SIGINT);
	spdk_mempool_free(worker->task_pool);
	while (!TAILQ_EMPTY(&worker->tasks)) {
		task = TAILQ_FIRST(&worker->tasks);
		TAILQ_REMOVE(&worker->tasks, task, link);
		free(task);
	}
	free(worker);
	spdk_app_stop(-1);
}