Commit dfb7f70f authored by Ben Walker's avatar Ben Walker Committed by Jim Harris
Browse files

fio_plugin: Use new threading API



This plugin now only needs to periodically call
spdk_thread_poll() on each thread to function.

Change-Id: Ib87fe314b71b8689510ff142d4ab6acb20660283
Signed-off-by: default avatarBen Walker <benjamin.walker@intel.com>
Reviewed-on: https://review.gerrithub.io/419030


Chandler-Test-Pool: SPDK Automated Test System <sys_sgsw@intel.com>
Tested-by: default avatarSPDK CI Jenkins <sys_sgci@intel.com>
Reviewed-by: default avatarChangpeng Liu <changpeng.liu@intel.com>
Reviewed-by: default avatarShuhei Matsumoto <shuhei.matsumoto.xt@hitachi.com>
Reviewed-by: default avatarDarek Stojaczyk <dariusz.stojaczyk@intel.com>
parent 3032fe53
Loading
Loading
Loading
Loading
+7 −119
Original line number Diff line number Diff line
@@ -54,21 +54,6 @@ struct spdk_fio_options {
	bool mem_single_seg;
};

/* Used to pass messages between fio threads */
struct spdk_fio_msg {
	spdk_thread_fn	cb_fn;
	void		*cb_arg;
};

/* A polling function */
struct spdk_fio_poller {
	spdk_poller_fn		cb_fn;
	void			*cb_arg;
	uint64_t		period_microseconds;

	TAILQ_ENTRY(spdk_fio_poller)	link;
};

struct spdk_fio_request {
	struct io_u		*io;
	struct thread_data	*td;
@@ -85,9 +70,7 @@ struct spdk_fio_target {
struct spdk_fio_thread {
	struct thread_data		*td; /* fio thread context */
	struct spdk_thread		*thread; /* spdk thread context */
	struct spdk_ring		*ring; /* ring for passing messages to this thread */
	uint64_t			timeout; /* polling timeout */
	TAILQ_HEAD(, spdk_fio_poller)	pollers; /* list of registered pollers on this thread */

	TAILQ_HEAD(, spdk_fio_target)	targets;

@@ -105,78 +88,12 @@ static size_t spdk_fio_poll_thread(struct spdk_fio_thread *fio_thread);
/* Default polling timeout (us) */
#define SPDK_FIO_POLLING_TIMEOUT 1000000UL

static void
spdk_fio_send_msg(spdk_thread_fn fn, void *ctx, void *thread_ctx)
{
	struct spdk_fio_thread *thread = thread_ctx;
	struct spdk_fio_msg *msg;
	size_t count;

	msg = calloc(1, sizeof(*msg));
	assert(msg != NULL);

	msg->cb_fn = fn;
	msg->cb_arg = ctx;

	count = spdk_ring_enqueue(thread->ring, (void **)&msg, 1);
	if (count != 1) {
		SPDK_ERRLOG("Unable to send message to thread %p. rc: %lu\n", thread, count);
	}
}

static void
spdk_fio_bdev_init_done(void *cb_arg, int rc)
{
	*(bool *)cb_arg = true;
}

static struct spdk_poller *
spdk_fio_start_poller(void *thread_ctx,
		      spdk_poller_fn fn,
		      void *arg,
		      uint64_t period_microseconds)
{
	struct spdk_fio_thread *fio_thread = thread_ctx;
	struct spdk_fio_poller *fio_poller;

	fio_poller = calloc(1, sizeof(*fio_poller));
	if (!fio_poller) {
		SPDK_ERRLOG("Unable to allocate poller\n");
		return NULL;
	}

	fio_poller->cb_fn = fn;
	fio_poller->cb_arg = arg;
	fio_poller->period_microseconds = period_microseconds;
	fio_thread->timeout = spdk_min(fio_thread->timeout, period_microseconds);

	TAILQ_INSERT_TAIL(&fio_thread->pollers, fio_poller, link);

	return (struct spdk_poller *)fio_poller;
}

static void
spdk_fio_stop_poller(struct spdk_poller *poller, void *thread_ctx)
{
	struct spdk_fio_poller *fio_poller;
	struct spdk_fio_thread *fio_thread = thread_ctx;
	uint64_t timeout = SPDK_FIO_POLLING_TIMEOUT;

	fio_poller = (struct spdk_fio_poller *)poller;

	TAILQ_REMOVE(&fio_thread->pollers, fio_poller, link);

	if (fio_thread->timeout == fio_poller->period_microseconds) {
		TAILQ_FOREACH(fio_poller, &fio_thread->pollers, link) {
			timeout = spdk_min(timeout, fio_poller->period_microseconds);
		}

		fio_thread->timeout = timeout;
	}

	free(fio_poller);
}

static int
spdk_fio_init_thread(struct thread_data *td)
{
@@ -191,27 +108,13 @@ spdk_fio_init_thread(struct thread_data *td)
	fio_thread->td = td;
	td->io_ops_data = fio_thread;

	fio_thread->ring = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 4096, SPDK_ENV_SOCKET_ID_ANY);
	if (!fio_thread->ring) {
		SPDK_ERRLOG("failed to allocate ring\n");
		free(fio_thread);
		return -1;
	}

	fio_thread->thread = spdk_allocate_thread(spdk_fio_send_msg,
			     spdk_fio_start_poller,
			     spdk_fio_stop_poller,
			     fio_thread,
			     "fio_thread");
	fio_thread->thread = spdk_allocate_thread(NULL, NULL, NULL, NULL, "fio_thread");
	if (!fio_thread->thread) {
		spdk_ring_free(fio_thread->ring);
		free(fio_thread);
		SPDK_ERRLOG("failed to allocate thread\n");
		return -1;
	}

	TAILQ_INIT(&fio_thread->pollers);

	fio_thread->iocq_size = td->o.iodepth;
	fio_thread->iocq = calloc(fio_thread->iocq_size, sizeof(struct io_u *));
	assert(fio_thread->iocq != NULL);
@@ -238,7 +141,6 @@ spdk_fio_cleanup_thread(struct spdk_fio_thread *fio_thread)
	while (spdk_fio_poll_thread(fio_thread) > 0) {}

	spdk_free_thread();
	spdk_ring_free(fio_thread->ring);
	free(fio_thread->iocq);
	free(fio_thread);
}
@@ -327,6 +229,8 @@ spdk_init_thread_poll(void *arg)
	}
	spdk_unaffinitize_thread();

	spdk_thread_lib_init();

	/* Create an SPDK thread temporarily */
	rc = spdk_fio_init_thread(&td);
	if (rc < 0) {
@@ -352,9 +256,7 @@ spdk_init_thread_poll(void *arg)
	 * Continue polling until there are no more events.
	 * This handles any final events posted by pollers.
	 */
	do {
		count = spdk_fio_poll_thread(fio_thread);
	} while (count > 0);
	while (spdk_fio_poll_thread(fio_thread) > 0) {};

	/* Set condition variable */
	pthread_mutex_lock(&g_init_mtx);
@@ -689,23 +591,7 @@ spdk_fio_event(struct thread_data *td, int event)
static size_t
spdk_fio_poll_thread(struct spdk_fio_thread *fio_thread)
{
	struct spdk_fio_msg *msg;
	struct spdk_fio_poller *p, *tmp;
	size_t count;

	/* Process new events */
	count = spdk_ring_dequeue(fio_thread->ring, (void **)&msg, 1);
	if (count > 0) {
		msg->cb_fn(msg->cb_arg);
		free(msg);
	}

	/* Call all pollers */
	TAILQ_FOREACH_SAFE(p, &fio_thread->pollers, link, tmp) {
		p->cb_fn(p->cb_arg);
	}

	return count;
	return spdk_thread_poll(fio_thread->thread, 0);
}

static int
@@ -825,6 +711,8 @@ spdk_fio_finish_env(void)
	pthread_cond_signal(&g_init_cond);
	pthread_mutex_unlock(&g_init_mtx);
	pthread_join(g_init_thread_id, NULL);

	spdk_thread_lib_fini();
}

static void fio_exit spdk_fio_unregister(void)