Commit 27e85f52 authored by Ben Walker, committed by Tomasz Zawadzki
Browse files

accel_perf: Add compress and decompress support



Compression and decompression both start with an uncompressed file,
provided using the new -l command line option. The file is then
compressed in chunks according to the specified block size. Each
operation works on one chunk.

Change-Id: Ia7d3853627d938f73e6aa3ee09fccd11d9bca706
Signed-off-by: Ben Walker <benjamin.walker@intel.com>
Signed-off-by: Paul Luse <paul.e.luser@intel.com>
Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/14681


Tested-by: SPDK CI Jenkins <sys_sgci@intel.com>
Reviewed-by: Aleksey Marchuk <alexeymar@nvidia.com>
Reviewed-by: Jim Harris <james.r.harris@intel.com>
parent f43678c1
Loading
Loading
Loading
Loading
+340 −6
Original line number Diff line number Diff line
@@ -36,9 +36,26 @@ static const char *g_workload_type = NULL;
static enum accel_opcode g_workload_selection;
static struct worker_thread *g_workers = NULL;
static int g_num_workers = 0;
static char *g_cd_file_in_name = NULL;
static pthread_mutex_t g_workers_lock = PTHREAD_MUTEX_INITIALIZER;
static struct spdk_app_opts g_opts = {};

/*
 * One chunk ("segment") of the input file used by the compress and
 * decompress workloads.  Segments are built once during preparation
 * (accel_perf_prep_process_seg) and linked on g_compress_segs; tasks then
 * cycle through that list while the benchmark runs.
 */
struct ap_compress_seg {
	/* DMA buffer holding this chunk exactly as it was read from the input file. */
	void		*uncompressed_data;
	/* Number of bytes of file data stored in uncompressed_data. */
	uint32_t	uncompressed_len;
	/* iovecs describing uncompressed_data; only allocated for the compress workload. */
	struct iovec	*uncompressed_iovs;
	uint32_t	uncompressed_iovcnt;

	/* DMA buffer that receives the output of the preparation-time compress. */
	void		*compressed_data;
	/*
	 * Starts out as the capacity of compressed_data and is handed to
	 * spdk_accel_submit_compress() as the output-size location, so after the
	 * preparation compress completes it is presumably the compressed size
	 * (it is what the compressed iovecs are built from).
	 */
	uint32_t	compressed_len;
	/* iovecs describing compressed_data; only allocated for the decompress workload. */
	struct iovec	*compressed_iovs;
	uint32_t	compressed_iovcnt;

	/* Linkage on the global g_compress_segs list. */
	STAILQ_ENTRY(ap_compress_seg)	link;
};

static STAILQ_HEAD(, ap_compress_seg) g_compress_segs = STAILQ_HEAD_INITIALIZER(g_compress_segs);

struct worker_thread;
static void accel_done(void *ref, int status);

@@ -56,6 +73,8 @@ struct ap_task {
	void			*dst;
	void			*dst2;
	uint32_t		crc_dst;
	uint32_t		compressed_sz;
	struct ap_compress_seg *cur_seg;
	struct worker_thread	*worker;
	int			expected_status; /* used for the compare operation */
	TAILQ_ENTRY(ap_task)	link;
@@ -109,6 +128,9 @@ dump_user_config(void)
	}
	printf("vector count    %u\n", g_chained_count);
	printf("Module:         %s\n", module_name);
	if (g_workload_selection == ACCEL_OPC_COMPRESS || g_workload_selection == ACCEL_OPC_DECOMPRESS) {
		printf("File Name:      %s\n", g_cd_file_in_name);
	}
	printf("Queue depth:    %u\n", g_queue_depth);
	printf("Allocate depth: %u\n", g_allocate_depth);
	printf("# threads/core: %u\n", g_threads_per_core);
@@ -125,9 +147,10 @@ usage(void)
	printf("\t[-C for supported workloads, use this value to configure the io vector size to test (default 1)\n");
	printf("\t[-T number of threads per core\n");
	printf("\t[-n number of channels]\n");
	printf("\t[-o transfer size in bytes]\n");
	printf("\t[-o transfer size in bytes (default: 4KiB. For compress/decompress, 0 means the input file size)]\n");
	printf("\t[-t time in seconds]\n");
	printf("\t[-w workload type must be one of these: copy, fill, crc32c, copy_crc32c, compare, dualcast\n");
	printf("\t[-w workload type must be one of these: copy, fill, crc32c, copy_crc32c, compare, compress, decompress, dualcast\n");
	printf("\t[-l for compress/decompress workloads, name of uncompressed input file\n");
	printf("\t[-s for crc32c workload, use this seed value (default 0)\n");
	printf("\t[-P for compare workload, percentage of operations that should miscompare (percent, default 0)\n");
	printf("\t[-f for fill workload, use this BYTE value (default 255)\n");
@@ -169,6 +192,9 @@ parse_args(int argc, char *argv)
	case 'C':
		g_chained_count = argval;
		break;
	case 'l':
		g_cd_file_in_name = optarg;
		break;
	case 'f':
		g_fill_pattern = (uint8_t)argval;
		break;
@@ -207,6 +233,10 @@ parse_args(int argc, char *argv)
			g_workload_selection = ACCEL_OPC_COMPARE;
		} else if (!strcmp(g_workload_type, "dualcast")) {
			g_workload_selection = ACCEL_OPC_DUALCAST;
		} else if (!strcmp(g_workload_type, "compress")) {
			g_workload_selection = ACCEL_OPC_COMPRESS;
		} else if (!strcmp(g_workload_type, "decompress")) {
			g_workload_selection = ACCEL_OPC_DECOMPRESS;
		} else {
			usage();
			return 1;
@@ -239,6 +269,29 @@ unregister_worker(void *arg1)
	pthread_mutex_unlock(&g_workers_lock);
}

/*
 * Describe the contiguous buffer `buf` of `sz` bytes with `iovcnt` iovec
 * entries written into `iovs`.
 *
 * Every entry gets spdk_divide_round_up(sz, iovcnt) bytes, except that an
 * entry is clamped to whatever is still left of the buffer.  The asserts
 * require that no entry ends up empty and that the entries together cover
 * the buffer exactly.
 */
static void
accel_perf_construct_iovs(void *buf, uint64_t sz, struct iovec *iovs, uint32_t iovcnt)
{
	uint8_t *cursor = buf;
	uint64_t chunk = spdk_divide_round_up(sz, iovcnt);
	uint32_t idx = 0;

	while (idx < iovcnt) {
		/* Never hand out more than what remains of the buffer. */
		chunk = spdk_min(chunk, sz);
		assert(chunk > 0);

		iovs[idx].iov_base = cursor;
		iovs[idx].iov_len = chunk;

		cursor += chunk;
		sz -= chunk;
		idx++;
	}

	/* The whole buffer must have been handed out. */
	assert(sz == 0);
}

static int
_get_task_data_bufs(struct ap_task *task)
{
@@ -253,7 +306,11 @@ _get_task_data_bufs(struct ap_task *task)
		align = ALIGN_4K;
	}

	if (g_workload_selection == ACCEL_OPC_CRC32C || g_workload_selection == ACCEL_OPC_COPY_CRC32C) {
	if (g_workload_selection == ACCEL_OPC_COMPRESS ||
	    g_workload_selection == ACCEL_OPC_DECOMPRESS) {
		task->cur_seg = STAILQ_FIRST(&g_compress_segs);
	} else if (g_workload_selection == ACCEL_OPC_CRC32C ||
		   g_workload_selection == ACCEL_OPC_COPY_CRC32C) {
		assert(g_chained_count > 0);
		task->src_iovcnt = g_chained_count;
		task->src_iovs = calloc(task->src_iovcnt, sizeof(struct iovec));
@@ -303,6 +360,16 @@ _get_task_data_bufs(struct ap_task *task)
		} else {
			memset(task->dst, ~DATA_PATTERN, dst_buff_len);
		}

		if (g_workload_selection == ACCEL_OPC_DECOMPRESS) {
			task->dst_iovs = calloc(g_chained_count, sizeof(struct iovec));
			if (!task->dst_iovs) {
				fprintf(stderr, "cannot allocate task->dst_iovs for task=%p\n", task);
				return -ENOMEM;
			}
			task->dst_iovcnt = g_chained_count;
			accel_perf_construct_iovs(task->dst, dst_buff_len, task->dst_iovs, task->dst_iovcnt);
		}
	}

	/* For dualcast 2 buffers are needed for the operation.  */
@@ -379,6 +446,18 @@ _submit_single(struct worker_thread *worker, struct ap_task *task)
		rc = spdk_accel_submit_dualcast(worker->ch, task->dst, task->dst2,
						task->src, g_xfer_size_bytes, flags, accel_done, task);
		break;
	case ACCEL_OPC_COMPRESS:
		task->src_iovs = task->cur_seg->uncompressed_iovs;
		task->src_iovcnt = task->cur_seg->uncompressed_iovcnt;
		rc = spdk_accel_submit_compress(worker->ch, task->dst, g_xfer_size_bytes, task->src_iovs,
						task->src_iovcnt, &task->compressed_sz, flags, accel_done, task);
		break;
	case ACCEL_OPC_DECOMPRESS:
		task->src_iovs = task->cur_seg->compressed_iovs;
		task->src_iovcnt = task->cur_seg->compressed_iovcnt;
		rc = spdk_accel_submit_decompress(worker->ch, task->dst_iovs, task->dst_iovcnt, task->src_iovs,
						  task->src_iovcnt, flags, accel_done, task);
		break;
	default:
		assert(false);
		break;
@@ -396,7 +475,10 @@ _free_task_buffers(struct ap_task *task)
{
	uint32_t i;

	if (g_workload_selection == ACCEL_OPC_CRC32C || g_workload_selection == ACCEL_OPC_COPY_CRC32C) {
	if (g_workload_selection == ACCEL_OPC_DECOMPRESS) {
		free(task->dst_iovs);
	} else if (g_workload_selection == ACCEL_OPC_CRC32C ||
		   g_workload_selection == ACCEL_OPC_COPY_CRC32C) {
		if (task->src_iovs) {
			for (i = 0; i < task->src_iovcnt; i++) {
				if (task->src_iovs[i].iov_base) {
@@ -493,12 +575,28 @@ accel_done(void *arg1, int status)
			break;
		case ACCEL_OPC_COMPARE:
			break;
		case ACCEL_OPC_COMPRESS:
			break;
		case ACCEL_OPC_DECOMPRESS:
			if (memcmp(task->dst, task->cur_seg->uncompressed_data, task->cur_seg->uncompressed_len)) {
				SPDK_NOTICELOG("Data miscompare on decompression\n");
				worker->xfer_failed++;
			}
			break;
		default:
			assert(false);
			break;
		}
	}

	if (worker->workload == ACCEL_OPC_COMPRESS || g_workload_selection == ACCEL_OPC_DECOMPRESS) {
		/* Advance the task to the next segment */
		task->cur_seg = STAILQ_NEXT(task->cur_seg, link);
		if (task->cur_seg == NULL) {
			task->cur_seg = STAILQ_FIRST(&g_compress_segs);
		}
	}

	if (task->expected_status == -EILSEQ) {
		assert(status != 0);
		worker->injected_miscompares++;
@@ -715,6 +813,239 @@ accel_perf_start(void *arg1)
	}
}

static void
accel_perf_free_compress_segs(void)
{
	struct ap_compress_seg *seg, *tmp;

	STAILQ_FOREACH_SAFE(seg, &g_compress_segs, link, tmp) {
		free(seg->uncompressed_iovs);
		free(seg->compressed_iovs);
		spdk_dma_free(seg->compressed_data);
		spdk_dma_free(seg->uncompressed_data);
		STAILQ_REMOVE_HEAD(&g_compress_segs, link);
		free(seg);
	}
}

/*
 * State carried through the preparation phase, in which the input file is
 * read and compressed one segment at a time before the benchmark starts.
 */
struct accel_perf_prep_ctx {
	/* Input file opened by accel_perf_prep(). */
	FILE			*file;
	/* Bytes of the input file not yet turned into a segment. */
	long			remaining;
	/* Accel channel used to submit the preparation compress operations. */
	struct spdk_io_channel	*ch;
	/* Segment whose compress operation was most recently submitted. */
	struct ap_compress_seg	*cur_seg;
};

static void accel_perf_prep_process_seg(struct accel_perf_prep_ctx *ctx);

/*
 * Completion callback for the preparation-time compress of one segment.
 *
 * ref    - the struct accel_perf_prep_ctx passed at submission.
 * status - completion status of the compress operation; non-zero is failure.
 *
 * On success the segment is finished (compressed iovecs are built for the
 * decompress workload), appended to g_compress_segs, and the next segment is
 * processed.  On any failure the in-flight segment and the whole preparation
 * context are released and the application is stopped.
 */
static void
accel_perf_prep_process_seg_cpl(void *ref, int status)
{
	struct accel_perf_prep_ctx *ctx = ref;
	struct ap_compress_seg *seg = ctx->cur_seg;
	int rc;

	if (status != 0) {
		fprintf(stderr, "error (%d) on initial compress completion\n", status);
		rc = -status;
		goto error;
	}

	if (g_workload_selection == ACCEL_OPC_DECOMPRESS) {
		seg->compressed_iovs = calloc(g_chained_count, sizeof(struct iovec));
		if (seg->compressed_iovs == NULL) {
			fprintf(stderr, "unable to allocate iovec\n");
			rc = -ENOMEM;
			goto error;
		}
		seg->compressed_iovcnt = g_chained_count;

		accel_perf_construct_iovs(seg->compressed_data, seg->compressed_len, seg->compressed_iovs,
					  seg->compressed_iovcnt);
	}

	STAILQ_INSERT_TAIL(&g_compress_segs, seg, link);
	ctx->remaining -= seg->uncompressed_len;

	accel_perf_prep_process_seg(ctx);
	return;

error:
	/*
	 * The segment is not on g_compress_segs yet, so nobody else will free it.
	 * uncompressed_iovs is set for the compress workload before submission
	 * and must be released here too; it is NULL otherwise.
	 */
	free(seg->uncompressed_iovs);
	spdk_dma_free(seg->compressed_data);
	spdk_dma_free(seg->uncompressed_data);
	free(seg);
	spdk_put_io_channel(ctx->ch);
	fclose(ctx->file);
	free(ctx);
	spdk_app_stop(rc);
}

/*
 * Prepare the next segment of the input file.
 *
 * Reads up to g_xfer_size_bytes from ctx->file into a fresh DMA buffer,
 * allocates a same-sized destination buffer, and submits a compress
 * operation whose completion (accel_perf_prep_process_seg_cpl) links the
 * segment and calls back into this function.  When no bytes remain, the
 * preparation context is released and the benchmark is started.
 *
 * On any failure every resource owned by the current segment and the
 * context is released and the application is stopped with a negative errno.
 */
static void
accel_perf_prep_process_seg(struct accel_perf_prep_ctx *ctx)
{
	struct ap_compress_seg *seg;
	size_t sz, sz_read;
	void *ubuf, *cbuf;
	struct iovec iov[1];
	int rc;

	if (ctx->remaining == 0) {
		spdk_put_io_channel(ctx->ch);
		fclose(ctx->file);
		free(ctx);
		accel_perf_start(NULL);
		return;
	}

	sz = spdk_min(ctx->remaining, g_xfer_size_bytes);
	if (sz > UINT32_MAX) {
		/* Segment lengths are stored in 32-bit fields. */
		fprintf(stderr, "segment size is too large\n");
		rc = -EFBIG;
		goto error;
	}

	ubuf = spdk_dma_zmalloc(sz, ALIGN_4K, NULL);
	if (!ubuf) {
		fprintf(stderr, "unable to allocate uncompress buffer\n");
		rc = -ENOMEM;
		goto error;
	}

	cbuf = spdk_dma_malloc(sz, ALIGN_4K, NULL);
	if (!cbuf) {
		fprintf(stderr, "unable to allocate compress buffer\n");
		rc = -ENOMEM;
		goto error_ubuf;
	}

	seg = calloc(1, sizeof(*seg));
	if (!seg) {
		fprintf(stderr, "unable to allocate comp/decomp segment\n");
		rc = -ENOMEM;
		goto error_cbuf;
	}

	errno = 0;
	sz_read = fread(ubuf, sizeof(uint8_t), sz, ctx->file);
	if (sz_read != sz) {
		/*
		 * Capture errno before anything else can overwrite it.  A short
		 * read at end-of-file leaves errno at 0, which must not be
		 * reported as success.
		 */
		rc = (errno != 0) ? -errno : -EIO;
		fprintf(stderr, "unable to read input file\n");
		goto error_seg;
	}

	if (g_workload_selection == ACCEL_OPC_COMPRESS) {
		seg->uncompressed_iovs = calloc(g_chained_count, sizeof(struct iovec));
		if (seg->uncompressed_iovs == NULL) {
			fprintf(stderr, "unable to allocate iovec\n");
			rc = -ENOMEM;
			goto error_seg;
		}
		seg->uncompressed_iovcnt = g_chained_count;
		accel_perf_construct_iovs(ubuf, sz, seg->uncompressed_iovs, seg->uncompressed_iovcnt);
	}

	seg->uncompressed_data = ubuf;
	seg->uncompressed_len = sz;
	seg->compressed_data = cbuf;
	seg->compressed_len = sz;

	ctx->cur_seg = seg;
	iov[0].iov_base = seg->uncompressed_data;
	iov[0].iov_len = seg->uncompressed_len;
	/* Note that anytime a call is made to spdk_accel_submit_compress() there's a chance
	 * it will fail with -ENOMEM in the event that the destination buffer is not large enough
	 * to hold the compressed data.  This example app simply uses the same size as the input
	 * buffer which will work for example purposes but when using the API in your application
	 * be sure to allocate enough room in the destination buffer for cases where the data is
	 * not compressible, the addition of header information will cause it to be larger than the
	 * original input.
	 */
	rc = spdk_accel_submit_compress(ctx->ch, seg->compressed_data, seg->compressed_len, iov, 1,
					&seg->compressed_len, 0, accel_perf_prep_process_seg_cpl, ctx);
	if (rc < 0) {
		fprintf(stderr, "error (%d) on initial compress submission\n", rc);
		/* Submission failed, so the completion callback will never free the segment. */
		goto error_seg;
	}

	return;

error_seg:
	/* seg was calloc'ed, so uncompressed_iovs is NULL unless it was allocated above. */
	free(seg->uncompressed_iovs);
	free(seg);
error_cbuf:
	spdk_dma_free(cbuf);
error_ubuf:
	spdk_dma_free(ubuf);
error:
	spdk_put_io_channel(ctx->ch);
	fclose(ctx->file);
	free(ctx);
	spdk_app_stop(rc);
}

/*
 * Application start callback.
 *
 * For every workload other than compress/decompress this goes straight to
 * accel_perf_start().  For compress/decompress it validates the options,
 * opens the input file, determines its size, obtains an accel channel and
 * kicks off segment preparation; accel_perf_start() is then invoked once
 * all segments are ready.
 *
 * arg1 - forwarded unchanged to accel_perf_start() for the other workloads.
 *
 * On failure the application is stopped with a negative errno.
 */
static void
accel_perf_prep(void *arg1)
{
	struct accel_perf_prep_ctx *ctx;
	long file_size;
	int rc = 0;

	if (g_workload_selection != ACCEL_OPC_COMPRESS &&
	    g_workload_selection != ACCEL_OPC_DECOMPRESS) {
		accel_perf_start(arg1);
		return;
	}

	if (g_cd_file_in_name == NULL) {
		fprintf(stdout, "A filename is required.\n");
		rc = -EINVAL;
		goto error_end;
	}

	if (g_workload_selection == ACCEL_OPC_COMPRESS && g_verify) {
		fprintf(stdout, "\nCompression does not support the verify option, aborting.\n");
		rc = -ENOTSUP;
		goto error_end;
	}

	printf("Preparing input file...\n");

	ctx = calloc(1, sizeof(*ctx));
	if (ctx == NULL) {
		rc = -ENOMEM;
		goto error_end;
	}

	ctx->file = fopen(g_cd_file_in_name, "r");
	if (ctx->file == NULL) {
		/* Save errno before fprintf() gets a chance to change it. */
		rc = -errno;
		fprintf(stderr, "Could not open file %s.\n", g_cd_file_in_name);
		goto error_ctx;
	}

	/* Determine the file size by seeking to the end, then rewind for reading. */
	if (fseek(ctx->file, 0L, SEEK_END) != 0) {
		rc = -errno;
		fprintf(stderr, "Could not determine size of file %s.\n", g_cd_file_in_name);
		goto error_file;
	}

	file_size = ftell(ctx->file);
	if (file_size < 0 || fseek(ctx->file, 0L, SEEK_SET) != 0) {
		rc = -errno;
		fprintf(stderr, "Could not determine size of file %s.\n", g_cd_file_in_name);
		goto error_file;
	}

	if (file_size == 0) {
		/*
		 * An empty file would produce no segments, and the workload
		 * dereferences the first segment of the list for every task.
		 */
		fprintf(stderr, "Input file %s is empty.\n", g_cd_file_in_name);
		rc = -EINVAL;
		goto error_file;
	}
	ctx->remaining = file_size;

	ctx->ch = spdk_accel_get_io_channel();
	if (ctx->ch == NULL) {
		rc = -EAGAIN;
		goto error_file;
	}

	if (g_xfer_size_bytes == 0) {
		/* size of 0 means "file at a time" */
		g_xfer_size_bytes = ctx->remaining;
	}

	accel_perf_prep_process_seg(ctx);
	return;

error_file:
	fclose(ctx->file);
error_ctx:
	free(ctx);
error_end:
	spdk_app_stop(rc);
}

int
main(int argc, char **argv)
{
@@ -724,7 +1055,7 @@ main(int argc, char **argv)
	spdk_app_opts_init(&g_opts, sizeof(g_opts));
	g_opts.name = "accel_perf";
	g_opts.reactor_mask = "0x1";
	if (spdk_app_parse_args(argc, argv, &g_opts, "a:C:o:q:t:yw:P:f:T:", NULL, parse_args,
	if (spdk_app_parse_args(argc, argv, &g_opts, "a:C:o:q:t:yw:P:f:T:l:", NULL, parse_args,
				usage) != SPDK_APP_PARSE_ARGS_SUCCESS) {
		g_rc = -1;
		goto cleanup;
@@ -735,6 +1066,8 @@ main(int argc, char **argv)
	    (g_workload_selection != ACCEL_OPC_CRC32C) &&
	    (g_workload_selection != ACCEL_OPC_COPY_CRC32C) &&
	    (g_workload_selection != ACCEL_OPC_COMPARE) &&
	    (g_workload_selection != ACCEL_OPC_COMPRESS) &&
	    (g_workload_selection != ACCEL_OPC_DECOMPRESS) &&
	    (g_workload_selection != ACCEL_OPC_DUALCAST)) {
		usage();
		g_rc = -1;
@@ -759,7 +1092,7 @@ main(int argc, char **argv)
		goto cleanup;
	}

	g_rc = spdk_app_start(&g_opts, accel_perf_start, NULL);
	g_rc = spdk_app_start(&g_opts, accel_perf_prep, NULL);
	if (g_rc) {
		SPDK_ERRLOG("ERROR starting application\n");
	}
@@ -773,6 +1106,7 @@ main(int argc, char **argv)
		worker = tmp;
	}
cleanup:
	accel_perf_free_compress_segs();
	spdk_app_fini();
	return g_rc;
}
+10 −0
Original line number Diff line number Diff line
@@ -14,3 +14,13 @@ run_test "accel_copy_crc32c" $SPDK_EXAMPLE_DIR/accel_perf -t 1 -w copy_crc32c -y
run_test "accel_copy_crc32c_C2" $SPDK_EXAMPLE_DIR/accel_perf -t 1 -w copy_crc32c -y -C 2
run_test "accel_dualcast" $SPDK_EXAMPLE_DIR/accel_perf -t 1 -w dualcast -y
run_test "accel_compare" $SPDK_EXAMPLE_DIR/accel_perf -t 1 -w compare -y
# Compress/decompress workloads need ISA-L, so only run them when SPDK was
# configured with it.  Every test feeds the same uncompressed input file
# ($testdir/bib); "-o 0" selects a transfer size equal to the whole file.
if [[ $CONFIG_ISAL == y ]]; then
	run_test "accel_comp" $SPDK_EXAMPLE_DIR/accel_perf -t 1 -w compress -l $testdir/bib
	run_test "accel_decomp" $SPDK_EXAMPLE_DIR/accel_perf -t 1 -w decompress -l $testdir/bib -y
	run_test "accel_decomp_full" $SPDK_EXAMPLE_DIR/accel_perf -t 1 -w decompress -l $testdir/bib -y -o 0
	run_test "accel_decomp_mcore" $SPDK_EXAMPLE_DIR/accel_perf -t 1 -w decompress -l $testdir/bib -y -m 0xf
	run_test "accel_decomp_full_mcore" $SPDK_EXAMPLE_DIR/accel_perf -t 1 -w decompress -l $testdir/bib -y -o 0 -m 0xf
	run_test "accel_decomp_mthread" $SPDK_EXAMPLE_DIR/accel_perf -t 1 -w decompress -l $testdir/bib -y -T 2
	run_test "accel_decomp_full_mthread" $SPDK_EXAMPLE_DIR/accel_perf -t 1 -w decompress -l $testdir/bib -y -o 0 -T 2
fi

test/accel/bib

0 → 100644
+6280 −0

File added.

Preview size limit exceeded, changes collapsed.