Commit 814a2dd9 authored by Ben Walker's avatar Ben Walker Committed by Jim Harris
Browse files

fio_plugin: Perform initialization and teardown on consistent thread



Change-Id: I2798f2b0c5a7fe210f03b4e1477fd04f480febb3
Signed-off-by: default avatarBen Walker <benjamin.walker@intel.com>
Reviewed-on: https://review.gerrithub.io/431843


Tested-by: default avatarSPDK CI Jenkins <sys_sgci@intel.com>
Chandler-Test-Pool: SPDK Automated Test System <sys_sgsw@intel.com>
Reviewed-by: default avatarShuhei Matsumoto <shuhei.matsumoto.xt@hitachi.com>
Reviewed-by: default avatarJim Harris <james.r.harris@intel.com>
parent 68b9d583
Loading
Loading
Loading
Loading
+83 −78
Original line number Diff line number Diff line
@@ -94,8 +94,6 @@ struct spdk_fio_thread {
	unsigned int		iocq_size;	/* number of iocq entries allocated */
};

static struct spdk_fio_thread *g_init_thread = NULL;
static pthread_t g_init_thread_id = 0;
static bool g_spdk_env_initialized = false;

static int spdk_fio_init(struct thread_data *td);
@@ -234,49 +232,29 @@ spdk_fio_module_finish_done(void *cb_arg)
	*(bool *)cb_arg = true;
}

static pthread_t g_init_thread_id = 0;
static pthread_mutex_t g_init_mtx = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t g_init_cond = PTHREAD_COND_INITIALIZER;

static void *
spdk_init_thread_poll(void *arg)
{
	struct spdk_fio_thread *thread = arg;
	int oldstate;
	int rc;

	/* Loop until the thread is cancelled */
	while (true) {
		rc = pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &oldstate);
		if (rc != 0) {
			SPDK_ERRLOG("Unable to set cancel state disabled on g_init_thread (%d): %s\n",
				    rc, spdk_strerror(rc));
		}

		spdk_fio_poll_thread(thread);

		rc = pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate);
		if (rc != 0) {
			SPDK_ERRLOG("Unable to set cancel state enabled on g_init_thread (%d): %s\n",
				    rc, spdk_strerror(rc));
		}

		/* This is a pthread cancellation point and cannot be removed. */
		sleep(1);
	}

	return NULL;
}

static int
spdk_fio_init_env(struct thread_data *td)
{
	struct spdk_fio_options		*eo = arg;
	struct spdk_fio_thread		*fio_thread;
	struct spdk_fio_options		*eo;
	bool				done = false;
	int				rc;
	struct spdk_conf		*config;
	struct spdk_env_opts		opts;
	bool				done;
	int				rc;
	size_t				count;
	struct timespec			ts;
	struct thread_data		td = {};

	/* Create a dummy thread data for use on the initialization thread. */
	td.o.iodepth = 32;
	td.eo = eo;

	/* Parse the SPDK configuration file */
	eo = td->eo;
	eo = arg;
	if (!eo->conf || !strlen(eo->conf)) {
		SPDK_ERRLOG("No configuration file provided\n");
		rc = EINVAL;
@@ -322,18 +300,19 @@ spdk_fio_init_env(struct thread_data *td)
	spdk_unaffinitize_thread();

	/* Create an SPDK thread temporarily */
	rc = spdk_fio_init_thread(td);
	rc = spdk_fio_init_thread(&td);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to create initialization thread\n");
		goto err_exit;
	}

	g_init_thread = fio_thread = td->io_ops_data;
	fio_thread = td.io_ops_data;

	/* Initialize the copy engine */
	spdk_copy_engine_initialize();

	/* Initialize the bdev layer */
	done = false;
	spdk_bdev_initialize(spdk_fio_bdev_init_done, &done);

	/* First, poll until initialization is done. */
@@ -349,21 +328,76 @@ spdk_fio_init_env(struct thread_data *td)
		count = spdk_fio_poll_thread(fio_thread);
	} while (count > 0);

	/* Set condition variable */
	pthread_mutex_lock(&g_init_mtx);
	pthread_cond_signal(&g_init_cond);

	while (true) {
		spdk_fio_poll_thread(fio_thread);

		clock_gettime(CLOCK_REALTIME, &ts);
		ts.tv_sec += 1;
		rc = pthread_cond_timedwait(&g_init_cond, &g_init_mtx, &ts);

		if (rc != ETIMEDOUT) {
			break;
		}
	}

	pthread_mutex_unlock(&g_init_mtx);

	done = false;
	spdk_bdev_finish(spdk_fio_module_finish_done, &done);

	do {
		spdk_fio_poll_thread(fio_thread);
	} while (!done);

	do {
		count = spdk_fio_poll_thread(fio_thread);
	} while (count > 0);

	done = false;
	spdk_copy_engine_finish(spdk_fio_module_finish_done, &done);

	do {
		spdk_fio_poll_thread(fio_thread);
	} while (!done);

	do {
		count = spdk_fio_poll_thread(fio_thread);
	} while (count > 0);

	spdk_fio_cleanup_thread(fio_thread);

	pthread_exit(NULL);

err_exit:
	exit(rc);
	return NULL;
}

static int
spdk_fio_init_env(struct thread_data *td)
{
	int rc;

	/*
	 * Spawn a thread to continue polling this thread
	 * occasionally.
	 * Spawn a thread to handle initialization operations and to poll things
	 * like the admin queues periodically.
	 */

	rc = pthread_create(&g_init_thread_id, NULL, &spdk_init_thread_poll, fio_thread);
	rc = pthread_create(&g_init_thread_id, NULL, &spdk_init_thread_poll, td->eo);
	if (rc != 0) {
		SPDK_ERRLOG("Unable to spawn thread to poll admin queue. It won't be polled.\n");
	}

	return 0;
	/* Wait for background thread to advance past the initialization */
	pthread_mutex_lock(&g_init_mtx);
	pthread_cond_wait(&g_init_cond, &g_init_mtx);
	pthread_mutex_unlock(&g_init_mtx);

err_exit:
	exit(rc);
	return -1;
	return 0;
}

/* Called for each thread to fill in the 'real_file_size' member for
@@ -742,41 +776,12 @@ static void fio_init spdk_fio_register(void)
static void
spdk_fio_finish_env(void)
{
	struct spdk_fio_thread		*fio_thread;
	bool				done = false;
	size_t				count;

	/* the same thread that called spdk_fio_init_env */
	fio_thread = g_init_thread;

	if (pthread_cancel(g_init_thread_id) == 0) {
	pthread_mutex_lock(&g_init_mtx);
	pthread_cond_signal(&g_init_cond);
	pthread_mutex_unlock(&g_init_mtx);
	pthread_join(g_init_thread_id, NULL);
}

	spdk_bdev_finish(spdk_fio_module_finish_done, &done);

	do {
		spdk_fio_poll_thread(fio_thread);
	} while (!done);

	do {
		count = spdk_fio_poll_thread(fio_thread);
	} while (count > 0);

	done = false;
	spdk_copy_engine_finish(spdk_fio_module_finish_done, &done);

	do {
		spdk_fio_poll_thread(fio_thread);
	} while (!done);

	do {
		count = spdk_fio_poll_thread(fio_thread);
	} while (count > 0);

	spdk_fio_cleanup_thread(fio_thread);
}

static void fio_exit spdk_fio_unregister(void)
{
	if (g_spdk_env_initialized) {