Commit 70db0e1c authored by Ben Walker, committed by Daniel Verkamp
Browse files

nvme/perf: Add support for multiple cores per device.



Intelligently allocate cores and devices to handle
the following cases:

1) Equal cores and devices
2) More cores than devices by using multiple cores per device
3) More devices than cores by using multiple devices from a single core

Change-Id: I3703f5c523268539bd00d399fe104c474a8e8c99
Signed-off-by: Ben Walker <benjamin.walker@intel.com>
parent afed5ba9
Loading
Loading
Loading
Loading
+149 −111
Original line number Diff line number Diff line
@@ -58,6 +58,7 @@
/*
 * Node in a singly linked list of NVMe controllers; g_controllers is the
 * head of that list.
 */
struct ctrlr_entry {
	/* Controller handle; unregister_controllers() passes it to nvme_detach(). */
	struct nvme_controller	*ctrlr;
	/* Next controller in the list, or NULL at the tail. */
	struct ctrlr_entry	*next;
	/* Text buffer for a controller name; how it is filled is not shown here. */
	char			name[1024];
};

enum entry_type {
@@ -76,24 +77,32 @@ struct ns_entry {
#if HAVE_LIBAIO
		struct {
			int			fd;
			io_context_t		ctx;
			struct io_event		*events;
		} aio;
#endif
	} u;

	struct ns_entry		*next;
	uint32_t		io_size_blocks;
	int			io_completed;
	int			current_queue_depth;
	uint64_t		size_in_ios;
	char			name[1024];
};

/*
 * I/O state for one (worker, namespace) pairing. Each worker_thread owns a
 * linked list of these, and the submit/complete/drain paths operate on this
 * context rather than on the shared ns_entry.
 */
struct ns_worker_ctx {
	/* Namespace (NVMe namespace or AIO file) this context issues I/O to. */
	struct ns_entry		*entry;
	/* Number of I/Os finished through this context; bumped in task_complete(). */
	uint64_t		io_completed;
	/* I/Os in flight: incremented on submit, decremented on completion. */
	uint64_t		current_queue_depth;
	/* Next sequential offset, in I/O-sized units; wraps at entry->size_in_ios. */
	uint64_t		offset_in_ios;
	/* Set by drain_io(); once set, completions no longer resubmit. */
	bool			is_draining;
	/*
	 * NOTE(review): nothing in the visible code reads or writes this field;
	 * it may be a removed line of the rendered diff -- confirm against the
	 * actual file.
	 */
	char			name[1024];

#if HAVE_LIBAIO
	/* Completion buffer handed to io_getevents() in aio_check_io(). */
	struct io_event		*events;
	/* AIO context used for both submission and completion polling. */
	io_context_t		ctx;
#endif
	/* Next context owned by the same worker, or NULL. */
	struct ns_worker_ctx	*next;
};

struct perf_task {
	struct ns_entry		*entry;
	struct ns_worker_ctx	*ns_ctx;
	void			*buf;
#if HAVE_LIBAIO
	struct iocb		iocb;
@@ -101,7 +110,7 @@ struct perf_task {
};

/*
 * One worker per lcore, chained from g_workers.
 *
 * NOTE(review): this listing is a rendered diff without +/- markers; the
 * `namespaces` and `ns_ctx` members are the two sides of one changed line,
 * so only one of them is expected in the real struct -- confirm.
 */
struct worker_thread {
	/* Per-worker list of namespace entries (walked via ns_entry->next). */
	struct ns_entry 	*namespaces;
	/* Per-worker list of namespace contexts (walked via ns_worker_ctx->next). */
	struct ns_worker_ctx 	*ns_ctx;
	/* Next worker in the global list, or NULL at the tail. */
	struct worker_thread	*next;
	/* lcore id this worker runs on. */
	unsigned		lcore;
};
@@ -110,8 +119,10 @@ struct rte_mempool *request_mempool;
/* Pool that perf_task objects are taken from on submit and returned to on completion. */
static struct rte_mempool *task_pool;

/* Head of the controller list walked by unregister_controllers(). */
static struct ctrlr_entry *g_controllers = NULL;
/* Head of the global namespace list; register_ns() pushes new entries here. */
static struct ns_entry *g_namespaces = NULL;
/* Number of entries pushed onto g_namespaces. */
static int g_num_namespaces = 0;
/* Head of the worker list; the first worker is the master lcore. */
static struct worker_thread *g_workers = NULL;
/*
 * Cursor for handing namespaces to workers in turn.
 * NOTE(review): register_workers() shows an assignment that no longer sets
 * this variable, so it may be a removed line of the rendered diff -- confirm.
 */
static struct worker_thread *g_current_worker = NULL;
/* Number of workers in the g_workers list. */
static int g_num_workers = 0;

static uint64_t g_tsc_rate;

@@ -131,33 +142,21 @@ task_complete(struct perf_task *task);
/*
 * Describe one NVMe namespace in a freshly allocated ns_entry and record it.
 *
 * NOTE(review): this listing is a rendered diff without +/- markers, so
 * pre-change and post-change statements appear side by side below (both
 * snprintf() calls, and both ways of linking the entry: onto a worker's
 * list and onto the global g_namespaces list). Confirm which statements
 * survive in the real file before relying on any single line.
 */
static void
register_ns(struct nvme_controller *ctrlr, struct pci_device *pci_dev, struct nvme_namespace *ns)
{
	struct worker_thread *worker;
	/* NOTE(review): the malloc() result is used below without a NULL check. */
	struct ns_entry *entry = malloc(sizeof(struct ns_entry));
	const struct nvme_controller_data *cdata = nvme_ctrlr_get_data(ctrlr);

	/* Worker that receives this namespace in the per-worker linking form. */
	worker = g_current_worker;

	entry->type = ENTRY_TYPE_NVME_NS;
	entry->u.nvme.ctrlr = ctrlr;
	entry->u.nvme.ns = ns;
	entry->next = worker->namespaces;
	entry->io_completed = 0;
	entry->current_queue_depth = 0;
	entry->offset_in_ios = 0;
	/* Namespace capacity expressed as a count of g_io_size_bytes-sized I/Os. */
	entry->size_in_ios = nvme_ns_get_size(ns) /
			     g_io_size_bytes;
	/* Size of one I/O expressed in namespace sectors. */
	entry->io_size_blocks = g_io_size_bytes / nvme_ns_get_sector_size(ns);
	entry->is_draining = false;

	/* Name taken from the controller model number only. */
	snprintf(entry->name, sizeof(cdata->mn), "%s", cdata->mn);
	printf("Assigning namespace %s to lcore %u\n", entry->name, worker->lcore);
	worker->namespaces = entry;
	/*
	 * Name built from model and serial number: 20 + " (" + 20 + ")" is
	 * 43 characters, and 44 leaves room for the terminator.
	 */
	snprintf(entry->name, 44, "%-20.20s (%-20.20s)", cdata->mn, cdata->sn);

	/* Advance the worker cursor, wrapping to the head at the end of the list. */
	if (worker->next == NULL) {
		g_current_worker = g_workers;
	} else {
		g_current_worker = worker->next;
	}
	/* Push the entry onto the head of the global namespace list. */
	g_num_namespaces++;
	entry->next = g_namespaces;
	g_namespaces = entry;
}

static void
@@ -181,7 +180,6 @@ register_ctrlr(struct nvme_controller *ctrlr, struct pci_device *pci_dev)
static int
register_aio_file(const char *path)
{
	struct worker_thread *worker;
	struct ns_entry *entry;

	int flags, fd;
@@ -216,36 +214,18 @@ register_aio_file(const char *path)
		return -1;
	}

	worker = g_current_worker;

	entry = malloc(sizeof(struct ns_entry));

	entry->type = ENTRY_TYPE_AIO_FILE;
	entry->u.aio.fd = fd;
	entry->u.aio.ctx = 0;
	if (io_setup(g_queue_depth, &entry->u.aio.ctx) < 0) {
		perror("io_setup");
		return -1;
	}
	entry->u.aio.events = calloc(g_queue_depth, sizeof(struct io_event));
	entry->next = worker->namespaces;
	entry->io_completed = 0;
	entry->current_queue_depth = 0;
	entry->offset_in_ios = 0;
	entry->size_in_ios = size / g_io_size_bytes;
	entry->io_size_blocks = g_io_size_bytes / blklen;
	entry->is_draining = false;

	snprintf(entry->name, sizeof(entry->name), "%s", path);

	printf("Assigning AIO device %s to lcore %u\n", entry->name, worker->lcore);
	worker->namespaces = entry;

	if (worker->next == NULL) {
		g_current_worker = g_workers;
	} else {
		g_current_worker = worker->next;
	}
	g_num_namespaces++;
	entry->next = g_namespaces;
	g_namespaces = entry;

	return 0;
}
@@ -271,7 +251,7 @@ aio_submit(io_context_t aio_ctx, struct iocb *iocb, int fd, enum io_iocb_cmd cmd
}

static void
aio_check_io(struct ns_entry *entry)
aio_check_io(struct ns_worker_ctx *ns_ctx)
{
	int count, i;
	struct timespec timeout;
@@ -279,14 +259,14 @@ aio_check_io(struct ns_entry *entry)
	timeout.tv_sec = 0;
	timeout.tv_nsec = 0;

	count = io_getevents(entry->u.aio.ctx, 1, g_queue_depth, entry->u.aio.events, &timeout);
	count = io_getevents(ns_ctx->ctx, 1, g_queue_depth, ns_ctx->events, &timeout);
	if (count < 0) {
		fprintf(stderr, "io_getevents error\n");
		exit(1);
	}

	for (i = 0; i < count; i++) {
		task_complete(entry->u.aio.events[i].data);
		task_complete(ns_ctx->events[i].data);
	}
}
#endif /* HAVE_LIBAIO */
@@ -302,22 +282,23 @@ static void io_complete(void *ctx, const struct nvme_completion *completion);
static __thread unsigned int seed = 0;

static void
submit_single_io(struct ns_entry *entry)
submit_single_io(struct ns_worker_ctx *ns_ctx)
{
	struct perf_task	*task = NULL;
	uint64_t		offset_in_ios;
	int			rc;
	struct ns_entry		*entry = ns_ctx->entry;

	rte_mempool_get(task_pool, (void **)&task);

	task->entry = entry;
	task->ns_ctx = ns_ctx;

	if (g_is_random) {
		offset_in_ios = rand_r(&seed) % entry->size_in_ios;
	} else {
		offset_in_ios = entry->offset_in_ios++;
		if (entry->offset_in_ios == entry->size_in_ios) {
			entry->offset_in_ios = 0;
		offset_in_ios = ns_ctx->offset_in_ios++;
		if (ns_ctx->offset_in_ios == entry->size_in_ios) {
			ns_ctx->offset_in_ios = 0;
		}
	}

@@ -325,7 +306,7 @@ submit_single_io(struct ns_entry *entry)
	    (g_rw_percentage != 0 && ((rand_r(&seed) % 100) < g_rw_percentage))) {
#if HAVE_LIBAIO
		if (entry->type == ENTRY_TYPE_AIO_FILE) {
			rc = aio_submit(entry->u.aio.ctx, &task->iocb, entry->u.aio.fd, IO_CMD_PREAD, task->buf,
			rc = aio_submit(ns_ctx->ctx, &task->iocb, entry->u.aio.fd, IO_CMD_PREAD, task->buf,
					g_io_size_bytes, offset_in_ios * g_io_size_bytes, task);
		} else
#endif
@@ -336,7 +317,7 @@ submit_single_io(struct ns_entry *entry)
	} else {
#if HAVE_LIBAIO
		if (entry->type == ENTRY_TYPE_AIO_FILE) {
			rc = aio_submit(entry->u.aio.ctx, &task->iocb, entry->u.aio.fd, IO_CMD_PWRITE, task->buf,
			rc = aio_submit(ns_ctx->ctx, &task->iocb, entry->u.aio.fd, IO_CMD_PWRITE, task->buf,
					g_io_size_bytes, offset_in_ios * g_io_size_bytes, task);
		} else
#endif
@@ -350,17 +331,17 @@ submit_single_io(struct ns_entry *entry)
		fprintf(stderr, "starting I/O failed\n");
	}

	entry->current_queue_depth++;
	ns_ctx->current_queue_depth++;
}

static void
task_complete(struct perf_task *task)
{
	struct ns_entry		*entry;
	struct ns_worker_ctx	*ns_ctx;

	entry = task->entry;
	entry->current_queue_depth--;
	entry->io_completed++;
	ns_ctx = task->ns_ctx;
	ns_ctx->current_queue_depth--;
	ns_ctx->io_completed++;

	rte_mempool_put(task_pool, task);

@@ -370,8 +351,8 @@ task_complete(struct perf_task *task)
	 * to complete.  In this case, do not submit a new I/O to replace
	 * the one just completed.
	 */
	if (!entry->is_draining) {
		submit_single_io(entry);
	if (!ns_ctx->is_draining) {
		submit_single_io(ns_ctx);
	}
}

@@ -382,32 +363,32 @@ io_complete(void *ctx, const struct nvme_completion *completion)
}

/*
 * Poll for completed I/O once: AIO-backed entries go through aio_check_io(),
 * everything else through nvme_ctrlr_process_io_completions().
 *
 * NOTE(review): this listing is a rendered diff without +/- markers; each
 * changed line appears twice, first in its ns_entry form and then in its
 * ns_worker_ctx form.
 */
static void
check_io(struct ns_entry *entry)
check_io(struct ns_worker_ctx *ns_ctx)
{
#if HAVE_LIBAIO
	if (entry->type == ENTRY_TYPE_AIO_FILE) {
		aio_check_io(entry);
	if (ns_ctx->entry->type == ENTRY_TYPE_AIO_FILE) {
		aio_check_io(ns_ctx);
	} else
#endif
	{
		/* Completions are processed per controller, not per namespace. */
		nvme_ctrlr_process_io_completions(entry->u.nvme.ctrlr);
		nvme_ctrlr_process_io_completions(ns_ctx->entry->u.nvme.ctrlr);
	}
}

/*
 * Issue queue_depth I/Os back to back via submit_single_io(); a
 * non-positive queue_depth submits nothing.
 *
 * NOTE(review): this listing is a rendered diff without +/- markers; the
 * signature and the call each appear in both their ns_entry and
 * ns_worker_ctx forms.
 */
static void
submit_io(struct ns_entry *entry, int queue_depth)
submit_io(struct ns_worker_ctx *ns_ctx, int queue_depth)
{
	while (queue_depth-- > 0) {
		submit_single_io(entry);
		submit_single_io(ns_ctx);
	}
}

/*
 * Stop resubmission and poll until nothing is in flight.
 *
 * Setting is_draining makes task_complete() skip its replacement submit, so
 * current_queue_depth only falls while check_io() reaps completions.
 *
 * NOTE(review): this listing is a rendered diff without +/- markers; each
 * statement appears in both its ns_entry and ns_worker_ctx forms.
 */
static void
drain_io(struct ns_entry *entry)
drain_io(struct ns_worker_ctx *ns_ctx)
{
	entry->is_draining = true;
	while (entry->current_queue_depth > 0) {
		check_io(entry);
	ns_ctx->is_draining = true;
	while (ns_ctx->current_queue_depth > 0) {
		check_io(ns_ctx);
	}
}

@@ -416,18 +397,17 @@ work_fn(void *arg)
{
	uint64_t tsc_end = rte_get_timer_cycles() + g_time_in_sec * g_tsc_rate;
	struct worker_thread *worker = (struct worker_thread *)arg;
	struct ns_entry *entry = NULL;
	struct ns_worker_ctx *ns_ctx = NULL;

	printf("Starting thread on core %u\n", worker->lcore);

	nvme_register_io_thread();

	/* Submit initial I/O for each namespace. */
	entry = worker->namespaces;
	while (entry != NULL) {

		submit_io(entry, g_queue_depth);
		entry = entry->next;
	ns_ctx = worker->ns_ctx;
	while (ns_ctx != NULL) {
		submit_io(ns_ctx, g_queue_depth);
		ns_ctx = ns_ctx->next;
	}

	while (1) {
@@ -436,10 +416,10 @@ work_fn(void *arg)
		 * I/O will be submitted in the io_complete callback
		 * to replace each I/O that is completed.
		 */
		entry = worker->namespaces;
		while (entry != NULL) {
			check_io(entry);
			entry = entry->next;
		ns_ctx = worker->ns_ctx;
		while (ns_ctx != NULL) {
			check_io(ns_ctx);
			ns_ctx = ns_ctx->next;
		}

		if (rte_get_timer_cycles() > tsc_end) {
@@ -447,10 +427,10 @@ work_fn(void *arg)
		}
	}

	entry = worker->namespaces;
	while (entry != NULL) {
		drain_io(entry);
		entry = entry->next;
	ns_ctx = worker->ns_ctx;
	while (ns_ctx != NULL) {
		drain_io(ns_ctx);
		ns_ctx = ns_ctx->next;
	}

	nvme_unregister_io_thread();
@@ -481,29 +461,28 @@ print_stats(void)
	float io_per_second, mb_per_second;
	float total_io_per_second, total_mb_per_second;
	struct worker_thread	*worker;
	struct ns_worker_ctx	*ns_ctx;

	total_io_per_second = 0;
	total_mb_per_second = 0;

	worker = g_workers;
	while (worker != NULL) {
		struct ns_entry *entry = worker->namespaces;
		while (entry != NULL) {
			io_per_second = (float)entry->io_completed /
					g_time_in_sec;
			mb_per_second = io_per_second * g_io_size_bytes /
					(1024 * 1024);
			printf("%-.20s: %10.2f IO/s %10.2f MB/s on lcore %u\n",
			       entry->name, io_per_second,
			       mb_per_second, worker->lcore);
	while (worker) {
		ns_ctx = worker->ns_ctx;
		while (ns_ctx) {
			io_per_second = (float)ns_ctx->io_completed / g_time_in_sec;
			mb_per_second = io_per_second * g_io_size_bytes / (1024 * 1024);
			printf("%-43.43s from core %u: %10.2f IO/s %10.2f MB/s\n",
			       ns_ctx->entry->name, worker->lcore,
			       io_per_second, mb_per_second);
			total_io_per_second += io_per_second;
			total_mb_per_second += mb_per_second;
			entry = entry->next;
			ns_ctx = ns_ctx->next;
		}
		worker = worker->next;
	}
	printf("=====================================================\n");
	printf("%-20s: %10.2f IO/s %10.2f MB/s\n",
	printf("========================================================\n");
	printf("%-55s: %10.2f IO/s %10.2f MB/s\n",
	       "Total", total_io_per_second, total_mb_per_second);
}

@@ -632,7 +611,8 @@ register_workers(void)
	memset(worker, 0, sizeof(struct worker_thread));
	worker->lcore = rte_get_master_lcore();

	g_workers = g_current_worker = worker;
	g_workers = worker;
	g_num_workers = 1;

	RTE_LCORE_FOREACH_SLAVE(lcore) {
		prev_worker = worker;
@@ -640,6 +620,7 @@ register_workers(void)
		memset(worker, 0, sizeof(struct worker_thread));
		worker->lcore = lcore;
		prev_worker->next = worker;
		g_num_workers++;
	}

	return 0;
@@ -700,6 +681,7 @@ static void
unregister_controllers(void)
{
	struct ctrlr_entry *entry = g_controllers;

	while (entry) {
		struct ctrlr_entry *next = entry->next;
		nvme_detach(entry->ctrlr);
@@ -725,6 +707,54 @@ register_aio_files(int argc, char **argv)
	return 0;
}

/*
 * Pair worker threads with namespaces by building each worker's ns_ctx list.
 *
 * The worker list and the namespace list are walked together, each wrapping
 * back to its head independently, for max(g_num_namespaces, g_num_workers)
 * steps. This yields one context per pair when the counts match, several
 * workers per namespace when workers outnumber namespaces, and several
 * namespaces per worker in the opposite case.
 *
 * Returns 0 on success. Returns -1 when there are no namespaces or workers
 * to pair, or when an allocation or io_setup() fails; contexts linked by
 * earlier iterations stay attached to their workers in that case.
 */
static int
associate_workers_with_ns(void)
{
	struct ns_entry		*entry = g_namespaces;
	struct worker_thread	*worker = g_workers;
	struct ns_worker_ctx	*ns_ctx;
	int			i, count;

	/*
	 * Both lists are dereferenced unconditionally in the loop, and count is
	 * at least g_num_workers, so an empty namespace list must be rejected
	 * here rather than crash on entry->name.
	 */
	if (entry == NULL || worker == NULL) {
		fprintf(stderr, "No namespaces or workers available to associate\n");
		return -1;
	}

	count = g_num_namespaces > g_num_workers ? g_num_namespaces : g_num_workers;

	for (i = 0; i < count; i++) {
		ns_ctx = calloc(1, sizeof(*ns_ctx));
		if (!ns_ctx) {
			return -1;
		}
#if HAVE_LIBAIO
		/* Same "#if" form as the guard around the events/ctx members. */
		ns_ctx->events = calloc(g_queue_depth, sizeof(struct io_event));
		if (!ns_ctx->events) {
			free(ns_ctx);
			return -1;
		}
		ns_ctx->ctx = 0;
		if (io_setup(g_queue_depth, &ns_ctx->ctx) < 0) {
			perror("io_setup");
			free(ns_ctx->events);
			free(ns_ctx);
			return -1;
		}
#endif

		/* lcore is unsigned, so it is printed with %u. */
		printf("Associating %s with lcore %u\n", entry->name, worker->lcore);
		ns_ctx->entry = entry;
		ns_ctx->next = worker->ns_ctx;
		worker->ns_ctx = ns_ctx;

		/* Step both cursors, wrapping whichever list runs out first. */
		worker = worker->next;
		if (worker == NULL) {
			worker = g_workers;
		}

		entry = entry->next;
		if (entry == NULL) {
			entry = g_namespaces;
		}
	}

	return 0;
}

static char *ealargs[] = {
	"perf",
	"-c 0x1", /* This must be the second parameter. It is overwritten by index in main(). */
@@ -769,18 +799,28 @@ int main(int argc, char **argv)

	g_tsc_rate = rte_get_timer_hz();

	register_workers();
	if (register_workers() != 0) {
		return 1;
	}

	if (register_aio_files(argc, argv) != 0) {
		return 1;
	}
	register_controllers();

	if (register_controllers() != 0) {
		return 1;
	}

	if (associate_workers_with_ns() != 0) {
		return 1;
	}

	printf("Initialization complete. Launching workers.\n");

	/* Launch all of the slave workers */
	worker = g_workers->next;
	while (worker != NULL) {
		if (worker->namespaces != NULL) {
		rte_eal_remote_launch(work_fn, worker, worker->lcore);
		}
		worker = worker->next;
	}

@@ -788,11 +828,9 @@ int main(int argc, char **argv)

	worker = g_workers->next;
	while (worker != NULL) {
		if (worker->namespaces != NULL) {
		if (rte_eal_wait_lcore(worker->lcore) < 0) {
			return -1;
		}
		}
		worker = worker->next;
	}