Commit ff87d29c authored by Ben Walker's avatar Ben Walker Committed by Jim Harris
Browse files

io_channel: Add mechanism to call a function on each channel



For a given I/O device, call a user function on the correct
thread for each channel.

Change-Id: I568a443184ac834c80c5e36b80aa9b6f8bb0ac99
Signed-off-by: default avatarBen Walker <benjamin.walker@intel.com>
Reviewed-on: https://review.gerrithub.io/362256


Tested-by: default avatarSPDK Automated Test System <sys_sgsw@intel.com>
Reviewed-by: default avatarJim Harris <james.r.harris@intel.com>
parent a30ffb98
Loading
Loading
Loading
Loading
+17 −0
Original line number Diff line number Diff line
@@ -52,6 +52,10 @@ typedef void (*spdk_thread_pass_msg)(spdk_thread_fn fn, void *ctx,
typedef int (*spdk_io_channel_create_cb)(void *io_device, void *ctx_buf);
typedef void (*spdk_io_channel_destroy_cb)(void *io_device, void *ctx_buf);

typedef void (*spdk_channel_msg)(void *io_device, struct spdk_io_channel *ch,
				 void *ctx);
typedef void (*spdk_channel_for_each_cpl)(void *io_device, void *ctx);

/**
 * \brief Initializes the calling thread for I/O channel allocation.
 *
@@ -136,4 +140,17 @@ void spdk_put_io_channel(struct spdk_io_channel *ch);
 */
void *spdk_io_channel_get_ctx(struct spdk_io_channel *ch);

/**
 * \brief Call 'fn' on each channel associated with io_device. This happens
 * asynchronously, so fn may be called after spdk_for_each_channel returns.
 * 'fn' will be called on the correct thread for each channel. 'fn' will be
 * called for each channel serially, such that two calls to 'fn' will not
 * overlap in time.
 *
 * Once 'fn' has been called on each channel, 'cpl' will be called
 * on the thread that spdk_for_each_channel was initially called from.
 */
void spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx,
			   spdk_channel_for_each_cpl cpl);

#endif /* SPDK_IO_CHANNEL_H_ */
+86 −0
Original line number Diff line number Diff line
@@ -295,3 +295,89 @@ spdk_io_channel_get_ctx(struct spdk_io_channel *ch)
{
	return (uint8_t *)ch + sizeof(*ch);
}

struct call_channel {
	void *io_device;
	spdk_channel_msg fn;
	void *ctx;

	struct spdk_thread *cur_thread;
	struct spdk_io_channel *cur_ch;

	struct spdk_thread *orig_thread;
	spdk_channel_for_each_cpl cpl;
};

static void
_call_channel(void *ctx)
{
	struct call_channel *ch_ctx = ctx;
	struct spdk_thread *thread;
	struct spdk_io_channel *ch;

	thread = ch_ctx->cur_thread;
	ch = ch_ctx->cur_ch;

	ch_ctx->fn(ch_ctx->io_device, ch, ch_ctx->ctx);

	pthread_mutex_lock(&g_devlist_mutex);
	thread = TAILQ_NEXT(thread, tailq);
	while (thread) {
		TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
			if (ch->io_device == ch_ctx->io_device) {
				ch_ctx->cur_thread = thread;
				ch_ctx->cur_ch = ch;
				pthread_mutex_unlock(&g_devlist_mutex);
				spdk_thread_send_msg(thread, _call_channel, ch_ctx);
				return;
			}
		}
		thread = TAILQ_NEXT(thread, tailq);
	}

	pthread_mutex_unlock(&g_devlist_mutex);

	ch_ctx->cpl(ch_ctx->io_device, ch_ctx->ctx);
	free(ch_ctx);
}

void
spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx,
		      spdk_channel_for_each_cpl cpl)
{
	struct spdk_thread *thread;
	struct spdk_io_channel *ch;
	struct call_channel *ch_ctx;

	ch_ctx = calloc(1, sizeof(*ch_ctx));
	if (!ch_ctx) {
		SPDK_ERRLOG("Unable to allocate context\n");
		return;
	}

	ch_ctx->io_device = io_device;
	ch_ctx->fn = fn;
	ch_ctx->ctx = ctx;
	ch_ctx->cpl = cpl;

	pthread_mutex_lock(&g_devlist_mutex);
	ch_ctx->orig_thread = _get_thread();

	TAILQ_FOREACH(thread, &g_threads, tailq) {
		TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
			if (ch->io_device == io_device) {
				ch_ctx->cur_thread = thread;
				ch_ctx->cur_ch = ch;
				pthread_mutex_unlock(&g_devlist_mutex);
				spdk_thread_send_msg(thread, _call_channel, ch_ctx);
				return;
			}
		}
	}

	free(ch_ctx);

	pthread_mutex_unlock(&g_devlist_mutex);

	cpl(io_device, ctx);
}