Commit ac9a1a83 authored by paul luse's avatar paul luse Committed by Tomasz Zawadzki
Browse files

examples/accel_perf: refactor task mgmt in prep for batching changes



* change how tasks are allocated and freed (simplifcation)
* added helper for getting and freeting a single task
* minor drive-by in chaning function parms for _submit_tasks()

Note that the task pool is used to manage tasks and their data
buffers.  It is fully allocated and populated before the first IO
is sent and tasks are never retired, they are re-used so they are
not removed from the list except for error or exit cleanup.

Signed-off-by: default avatarpaul luse <paul.e.luse@intel.com>
Change-Id: I5fea5ef8c989df6310f15b2c9bb4e8aef9bd3d3b
Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/5487


Tested-by: default avatarSPDK CI Jenkins <sys_sgci@intel.com>
Reviewed-by: default avatarShuhei Matsumoto <shuhei.matsumoto.xt@hitachi.com>
Reviewed-by: default avatarJim Harris <james.r.harris@intel.com>
Reviewed-by: default avatarZiye Yang <ziye.yang@intel.com>
parent 2600cd92
Loading
Loading
Loading
Loading
+61 −62
Original line number Diff line number Diff line
@@ -80,13 +80,14 @@ struct worker_thread {
	uint64_t			xfer_failed;
	uint64_t			injected_miscompares;
	uint64_t			current_queue_depth;
	TAILQ_HEAD(, ap_task)		tasks;
	TAILQ_HEAD(, ap_task)		tasks_pool;
	struct worker_thread		*next;
	unsigned			core;
	struct spdk_thread		*thread;
	bool				is_draining;
	struct spdk_poller		*is_draining_poller;
	struct spdk_poller		*stop_poller;
	void				*task_base;
};

static void
@@ -175,13 +176,8 @@ static void
unregister_worker(void *arg1)
{
	struct worker_thread *worker = arg1;
	struct ap_task *task;

	while (!TAILQ_EMPTY(&worker->tasks)) {
		task = TAILQ_FIRST(&worker->tasks);
		TAILQ_REMOVE(&worker->tasks, task, link);
		free(task);
	}
	free(worker->task_base);
	spdk_put_io_channel(worker->ch);
	pthread_mutex_lock(&g_workers_lock);
	assert(g_num_workers >= 1);
@@ -241,20 +237,34 @@ _get_task_data_bufs(struct ap_task *task)
	return 0;
}

inline static struct ap_task *
_get_task(struct worker_thread *worker)
{
	struct ap_task *task;

	if (!TAILQ_EMPTY(&worker->tasks_pool)) {
		task = TAILQ_FIRST(&worker->tasks_pool);
		TAILQ_REMOVE(&worker->tasks_pool, task, link);
	} else {
		fprintf(stderr, "Unable to get ap_task\n");
		return NULL;
	}

	task->worker = worker;
	task->worker->current_queue_depth++;
	return task;
}

static void accel_done(void *ref, int status);

static void
_submit_single(void *arg1, void *arg2)
_submit_single(struct worker_thread *worker, struct ap_task *task)
{
	struct worker_thread *worker = arg1;
	struct ap_task *task = arg2;
	int random_num;
	int rc = 0;

	assert(worker);

	task->worker = worker;
	task->worker->current_queue_depth++;
	switch (g_workload_selection) {
	case ACCEL_COPY:
		rc = spdk_accel_submit_copy(worker->ch, task->dst, task->src,
@@ -297,6 +307,16 @@ _submit_single(void *arg1, void *arg2)
	}
}

static void
_free_task(struct ap_task *task)
{
	spdk_dma_free(task->src);
	spdk_dma_free(task->dst);
	if (g_workload_selection == ACCEL_DUALCAST) {
		spdk_dma_free(task->dst2);
	}
}

static void
_accel_done(void *arg1)
{
@@ -360,13 +380,7 @@ _accel_done(void *arg1)

	if (!worker->is_draining) {
		_submit_single(worker, task);
	} else {
		spdk_free(task->src);
		spdk_free(task->dst);
		if (g_workload_selection == ACCEL_DUALCAST) {
			spdk_free(task->dst2);
		}
		TAILQ_INSERT_TAIL(&worker->tasks, task, link);
		worker->current_queue_depth++;
	}
}

@@ -377,7 +391,6 @@ batch_done(void *cb_arg, int status)
	struct worker_thread *worker = task->worker;

	worker->current_queue_depth--;
	TAILQ_INSERT_TAIL(&worker->tasks, task, link);
}

static int
@@ -425,10 +438,15 @@ static int
_check_draining(void *arg)
{
	struct worker_thread *worker = arg;
	struct ap_task *task;

	assert(worker);

	if (worker->current_queue_depth == 0) {
		while ((task = TAILQ_FIRST(&worker->tasks_pool))) {
			TAILQ_REMOVE(&worker->tasks_pool, task, link);
			_free_task(task);
		}
		spdk_poller_unregister(&worker->is_draining_poller);
		unregister_worker(worker);
	}
@@ -516,15 +534,21 @@ _init_thread(void *arg1)
	assert(max_per_batch > 0);
	num_tasks = g_queue_depth + spdk_divide_round_up(g_queue_depth, max_per_batch);

	TAILQ_INIT(&worker->tasks);
	TAILQ_INIT(&worker->tasks_pool);
	worker->task_base = calloc(num_tasks, sizeof(struct ap_task));
	if (worker->task_base == NULL) {
		fprintf(stderr, "Could not allocate task base.\n");
		goto error;
	}

	task = worker->task_base;
	for (i = 0; i < num_tasks; i++) {
		task = calloc(1, sizeof(struct ap_task));
		if (task == NULL) {
			fprintf(stderr, "Could not allocate task.\n");
			return;
			/* TODO cleanup */
		TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link);
		if (_get_task_data_bufs(task)) {
			fprintf(stderr, "Unable to get data bufs\n");
			goto error;
		}
		TAILQ_INSERT_TAIL(&worker->tasks, task, link);
		task++;
	}

	/* Register a poller that will stop the worker at time elapsed */
@@ -552,18 +576,8 @@ _init_thread(void *arg1)
			batch_count = 0;

			do {
				if (!TAILQ_EMPTY(&worker->tasks)) {
					task = TAILQ_FIRST(&worker->tasks);
					TAILQ_REMOVE(&worker->tasks, task, link);
				} else {
					fprintf(stderr, "Unable to get accel_task\n");
					goto error;
				}
				task->worker = worker;
				task->worker->current_queue_depth++;

				if (_get_task_data_bufs(task)) {
					fprintf(stderr, "Unable to get data bufs\n");
				task = _get_task(worker);
				if (task == NULL) {
					goto error;
				}

@@ -577,15 +591,10 @@ _init_thread(void *arg1)
			} while (batch_count < max_per_batch && remaining > 0);

			/* Now send the batch command. */
			if (!TAILQ_EMPTY(&worker->tasks)) {
				task = TAILQ_FIRST(&worker->tasks);
				TAILQ_REMOVE(&worker->tasks, task, link);
			} else {
				fprintf(stderr, "Unable to get accel_task\n");
			task = _get_task(worker);
			if (task == NULL) {
				goto error;
			}
			task->worker = worker;
			task->worker->current_queue_depth++;

			rc = spdk_accel_batch_submit(worker->ch, batch, batch_done, task);
			if (rc) {
@@ -606,16 +615,8 @@ _init_thread(void *arg1)
	 */
	for (i = 0; i < remaining; i++) {

		if (!TAILQ_EMPTY(&worker->tasks)) {
			task = TAILQ_FIRST(&worker->tasks);
			TAILQ_REMOVE(&worker->tasks, task, link);
		} else {
			fprintf(stderr, "Unable to get accel_task\n");
			goto error;
		}

		if (_get_task_data_bufs(task)) {
			fprintf(stderr, "Unable to get data bufs\n");
		task = _get_task(worker);
		if (task == NULL) {
			goto error;
		}

@@ -623,13 +624,11 @@ _init_thread(void *arg1)
	}
	return;
error:
	/* TODO clean exit */
	raise(SIGINT);
	while (!TAILQ_EMPTY(&worker->tasks)) {
		task = TAILQ_FIRST(&worker->tasks);
		TAILQ_REMOVE(&worker->tasks, task, link);
		free(task);
	while ((task = TAILQ_FIRST(&worker->tasks_pool))) {
		TAILQ_REMOVE(&worker->tasks_pool, task, link);
		_free_task(task);
	}
	free(worker->task_base);
	free(worker);
	spdk_app_stop(-1);
}