Commit 1e3c9893 authored by Michal Rozegnal's avatar Michal Rozegnal Committed by Tomasz Zawadzki
Browse files

test/fused_ordering: add multiple queue support



Fused ordering test used only single IO queue on a single core. Added
support for test launch on multiple cores with one IO queue for each
core.

Signed-off-by: default avatarMichal Rozegnal <michal.rozegnal@intel.com>
Change-Id: I8fe66a2e5806ba967079e562836a04e74354680c
Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/18468


Tested-by: default avatarSPDK CI Jenkins <sys_sgci@intel.com>
Reviewed-by: default avatarTomasz Zawadzki <tomasz.zawadzki@intel.com>
Reviewed-by: default avatarKonrad Sztyber <konrad.sztyber@intel.com>
parent 167ad6fb
Loading
Loading
Loading
Loading
+210 −42
Original line number Diff line number Diff line
@@ -9,55 +9,150 @@
#include "spdk/env.h"
#include "spdk/string.h"
#include "spdk/log.h"
#include "spdk/util.h"
#include "spdk/likely.h"

#define WRITE_BLOCKS 128
#define FUSED_BLOCKS 1
#define NO_WRITE_CMDS 8

struct worker_thread {
	TAILQ_ENTRY(worker_thread) link;
	unsigned lcore;
	void *cw_buf;
	void *large_buf;
	struct spdk_nvme_qpair *qpair;
	uint32_t poll_count;
	uint32_t outstanding;
	int status;
};

static struct spdk_nvme_ctrlr *g_ctrlr;
static struct spdk_nvme_ns *g_ns;
static struct spdk_nvme_qpair *g_qpair;
static struct spdk_nvme_transport_id g_trid = {};
static uint32_t g_outstanding;
static uint32_t g_num_workers = 0;
static TAILQ_HEAD(, worker_thread) g_workers = TAILQ_HEAD_INITIALIZER(g_workers);


static void
io_complete(void *arg, const struct spdk_nvme_cpl *cpl)
{
	struct worker_thread *worker = arg;

	if (spdk_nvme_cpl_is_error(cpl)) {
		spdk_nvme_print_completion(spdk_nvme_qpair_get_id(g_qpair),
		spdk_nvme_print_completion(spdk_nvme_qpair_get_id(worker->qpair),
					   (struct spdk_nvme_cpl *)cpl);
		exit(1);
	}

	g_outstanding--;
	worker->outstanding--;
}

#define WRITE_BLOCKS 128
#define FUSED_BLOCKS 1
static int
register_workers(void)
{
	uint32_t i;
	struct worker_thread *worker;

	SPDK_ENV_FOREACH_CORE(i) {
		worker = calloc(1, sizeof(*worker));
		if (worker == NULL) {
			fprintf(stderr, "Unable to allocate worker\n");
			return -1;
		}

		worker->lcore = i;
		TAILQ_INSERT_TAIL(&g_workers, worker, link);
		g_num_workers++;
	}

	return 0;
}

static void
fused_ordering(uint32_t poll_count)
unregister_workers(void)
{
	void *cw_buf, *large_buf;
	int rc;
	int i;
	struct worker_thread *worker, *tmp_worker;

	g_qpair = spdk_nvme_ctrlr_alloc_io_qpair(g_ctrlr, NULL, 0);
	if (g_qpair == NULL) {
		printf("ERROR: spdk_nvme_ctrlr_alloc_io_qpair() failed\n");
		exit(1);
	/* Free namespace context and worker thread */
	TAILQ_FOREACH_SAFE(worker, &g_workers, link, tmp_worker) {
		TAILQ_REMOVE(&g_workers, worker, link);
		free(worker);
	}
}

	cw_buf = spdk_zmalloc(FUSED_BLOCKS * 4096, 0x1000, NULL, SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
static unsigned
init_workers(void)
{
	void *cw_buf = NULL, *large_buf = NULL;
	struct worker_thread *worker;
	int rc = 0;

	assert(g_num_workers);

	cw_buf = spdk_zmalloc(FUSED_BLOCKS * 4096, 0x1000, NULL, SPDK_ENV_SOCKET_ID_ANY,
			      SPDK_MALLOC_DMA);
	if (cw_buf == NULL) {
		printf("ERROR: buffer allocation failed\n");
		return;
		printf("ERROR: buffer allocation failed.\n");
		rc = -1;
		goto error;
	}

	large_buf = spdk_zmalloc(WRITE_BLOCKS * 4096, 0x1000, NULL, SPDK_ENV_SOCKET_ID_ANY,
				 SPDK_MALLOC_DMA);
	if (large_buf == NULL) {
		printf("ERROR: buffer allocation failed\n");
		return;
		printf("ERROR: buffer allocation failed.\n");
		rc = -1;
		goto error;
	}

	/* Free namespace context and worker thread */
	TAILQ_FOREACH(worker, &g_workers, link) {
		worker->qpair = spdk_nvme_ctrlr_alloc_io_qpair(g_ctrlr, NULL, 0);
		if (worker->qpair == NULL) {
			printf("ERROR: spdk_nvme_ctrlr_alloc_io_qpair() failed.\n");
			rc = -1;
			goto error;
		}
		worker->cw_buf = cw_buf;
		worker->large_buf = large_buf;
	}
	goto exit;

error:
	TAILQ_FOREACH(worker, &g_workers, link) {
		spdk_nvme_ctrlr_free_io_qpair(worker->qpair);
	}
	spdk_free(large_buf);
	spdk_free(cw_buf);
exit:
	return rc;
}

static void
fini_workers(void)
{
	void *cw_buf = NULL, *large_buf = NULL;
	struct worker_thread *worker;

	TAILQ_FOREACH(worker, &g_workers, link) {
		spdk_nvme_ctrlr_free_io_qpair(worker->qpair);
		cw_buf = worker->cw_buf;
		large_buf = worker->large_buf;
	}

	/* Issue a bunch of relatively large writes - big enough that the data will not fit
	spdk_free(large_buf);
	spdk_free(cw_buf);
}

static int
fused_ordering(void *arg)
{
	struct worker_thread *worker = (struct worker_thread *)arg;
	uint32_t i;
	uint32_t rc = 0;

	/* Issue relatively large writes - big enough that the data will not fit
	 * in-capsule - followed by the compare command. Then poll the completion queue a number of
	 * times matching the poll_count variable. This adds a variable amount of delay between
	 * the compare and the subsequent fused write submission.
@@ -66,41 +161,59 @@ fused_ordering(uint32_t poll_count)
	 * the host, that request could get sent to the target layer between the two fused commands. This
	 * variable delay would eventually induce this condition before the fix.
	 */
	for (i = 0; i < 8; i++) {
		rc = spdk_nvme_ns_cmd_write(g_ns, g_qpair, large_buf, 0, WRITE_BLOCKS, io_complete, NULL, 0);
	/* Submit 8 write commands per queue */
	for (i = 0; i < NO_WRITE_CMDS; i++) {
		rc = spdk_nvme_ns_cmd_write(g_ns, worker->qpair, worker->large_buf,
					    0,
					    WRITE_BLOCKS, io_complete,
					    worker,
					    0);
		if (rc != 0) {
			fprintf(stderr, "starting write I/O failed\n");
			exit(1);
			goto out;
		}
		g_outstanding++;

		worker->outstanding++;
	}

	rc = spdk_nvme_ns_cmd_compare(g_ns, g_qpair, cw_buf, 0, FUSED_BLOCKS, io_complete, NULL,
	/* Submit first fuse command, per queue */
	rc = spdk_nvme_ns_cmd_compare(g_ns, worker->qpair, worker->cw_buf,
				      0,
				      FUSED_BLOCKS, io_complete,
				      worker,
				      SPDK_NVME_IO_FLAGS_FUSE_FIRST);
	if (rc != 0) {
		fprintf(stderr, "starting compare I/O failed\n");
		exit(1);
		goto out;
	}
	g_outstanding++;
	while (poll_count--) {
		spdk_nvme_qpair_process_completions(g_qpair, 0);

	worker->outstanding++;

	/* Process completions */
	while (worker->poll_count-- > 0) {
		spdk_nvme_qpair_process_completions(worker->qpair, 0);
	}

	rc = spdk_nvme_ns_cmd_write(g_ns, g_qpair, cw_buf, 0, FUSED_BLOCKS, io_complete, NULL,
	/* Submit second fuse command, one per queue */
	rc = spdk_nvme_ns_cmd_write(g_ns, worker->qpair, worker->cw_buf, 0,
				    FUSED_BLOCKS, io_complete,
				    worker,
				    SPDK_NVME_IO_FLAGS_FUSE_SECOND);
	if (rc != 0) {
		fprintf(stderr, "starting write I/O failed\n");
		exit(1);
		goto out;
	}
	g_outstanding++;

	while (g_outstanding) {
		spdk_nvme_qpair_process_completions(g_qpair, 0);
	worker->outstanding++;

	/* Process completions */
	while (worker->outstanding > 0) {
		spdk_nvme_qpair_process_completions(worker->qpair, 0);
	}

	spdk_nvme_ctrlr_free_io_qpair(g_qpair);
	spdk_free(cw_buf);
	spdk_free(large_buf);
out:
	worker->status = rc;
	return rc;
}

static void
@@ -114,6 +227,7 @@ usage(const char *program_name)
	printf("\t[-L enable debug logging]\n");
#else
	printf("\t[-L enable debug logging (flag disabled, must reconfigure with --enable-debug)]\n");
	printf("\t[-c core mask]\n");
#endif
}

@@ -122,7 +236,7 @@ parse_args(int argc, char **argv, struct spdk_env_opts *env_opts)
{
	int op, rc;

	while ((op = getopt(argc, argv, "r:L:")) != -1) {
	while ((op = getopt(argc, argv, "r:L:q:c:")) != -1) {
		switch (op) {
		case 'r':
			if (spdk_nvme_transport_id_parse(&g_trid, optarg) != 0) {
@@ -141,6 +255,10 @@ parse_args(int argc, char **argv, struct spdk_env_opts *env_opts)
			spdk_log_set_print_level(SPDK_LOG_DEBUG);
#endif
			break;
		case 'c':
			env_opts->core_mask = optarg;
			break;

		default:
			usage(argv[0]);
			return 1;
@@ -157,6 +275,10 @@ main(int argc, char **argv)
	struct spdk_env_opts opts;
	struct spdk_nvme_ctrlr_opts ctrlr_opts;
	int nsid;
	const struct spdk_nvme_ctrlr_opts *ctrlr_opts_actual;
	uint32_t ctrlr_io_queues;
	uint32_t main_core;
	struct worker_thread *main_worker = NULL, *worker = NULL;

	spdk_env_opts_init(&opts);
	spdk_log_set_print_level(SPDK_LOG_NOTICE);
@@ -171,6 +293,11 @@ main(int argc, char **argv)
		return 1;
	}

	if (register_workers() != 0) {
		rc = -1;
		goto exit;
	}

	spdk_nvme_ctrlr_get_default_ctrlr_opts(&ctrlr_opts, sizeof(ctrlr_opts));
	ctrlr_opts.keep_alive_timeout_ms = 60 * 1000;
	g_ctrlr = spdk_nvme_connect(&g_trid, &ctrlr_opts, sizeof(ctrlr_opts));
@@ -192,12 +319,53 @@ main(int argc, char **argv)
	printf("  Namespace ID: %d size: %juGB\n", spdk_nvme_ns_get_id(g_ns),
	       spdk_nvme_ns_get_size(g_ns) / 1000000000);

	ctrlr_opts_actual = spdk_nvme_ctrlr_get_opts(g_ctrlr);
	ctrlr_io_queues = ctrlr_opts_actual->num_io_queues;

	/* One qpair per core */
	if (g_num_workers > ctrlr_io_queues) {
		printf("ERROR: Number of IO queues requested %d more then ctrlr caps %d.\n", g_num_workers,
		       ctrlr_io_queues);
		rc = -1;
		goto exit;
	}

	rc = init_workers();
	if (rc) {
		printf("ERROR: Workers initialization failed.\n");
		goto exit;
	}

	for (i = 0; i < 1024; i++) {
		printf("fused_ordering(%d)\n", i);
		fused_ordering(i);
		main_core = spdk_env_get_current_core();
		TAILQ_FOREACH(worker, &g_workers, link) {
			worker->poll_count = i;
			if (worker->lcore != main_core) {
				spdk_env_thread_launch_pinned(worker->lcore, fused_ordering, worker);
			} else {
				main_worker = worker;
			}
		}

		if (main_worker != NULL) {
			fused_ordering(main_worker);
		}

		spdk_env_thread_wait_all();

		TAILQ_FOREACH(worker, &g_workers, link) {
			if (spdk_unlikely(worker->status != 0)) {
				SPDK_ERRLOG("Iteration of fused ordering(%d) failed.\n", i - 1);
				rc = -1;
				goto exit;
			}
		}
	}

exit:
	fini_workers();
	unregister_workers();
	spdk_nvme_detach(g_ctrlr);
	spdk_env_fini();
	return rc;