Commit eff5b149 authored by Liu Xiaodong's avatar Liu Xiaodong Committed by Tomasz Zawadzki
Browse files

reactor: add cpuset to indicate intr mode



Each bit of the cpuset indicates whether a reactor
is going to be in interrupt mode.
Each spdk_cpuset is allocated to each reactor. So it
can only be touched by its reactor.

Change-Id: Ic186de341588b701d7471bf09336309d28b1bf4e
Signed-off-by: default avatarLiu Xiaodong <xiaodong.liu@intel.com>
Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/5850


Tested-by: default avatarSPDK CI Jenkins <sys_sgci@intel.com>
Reviewed-by: default avatarChangpeng Liu <changpeng.liu@intel.com>
Reviewed-by: default avatarJim Harris <james.r.harris@intel.com>
Reviewed-by: default avatarPaul Luse <paul.e.luse@intel.com>
Reviewed-by: default avatarShuhei Matsumoto <shuhei.matsumoto.xt@hitachi.com>
Reviewed-by: default avatarTomasz Zawadzki <tomasz.zawadzki@intel.com>
parent 8689f6a3
Loading
Loading
Loading
Loading
+4 −1
Original line number Diff line number Diff line
@@ -97,7 +97,10 @@ struct spdk_reactor {
	uint64_t					busy_tsc;
	uint64_t					idle_tsc;

	bool						interrupt_mode;
	/* Each bit of cpuset indicates whether a reactor probably requires event notification */
	struct spdk_cpuset				notify_cpuset;
	/* Indicate whether this reactor currently runs in interrupt */
	bool						in_interrupt;
	struct spdk_fd_group				*fgrp;
	int						resched_fd;
} __attribute__((aligned(SPDK_CACHE_LINE_SIZE)));
+59 −17
Original line number Diff line number Diff line
@@ -172,6 +172,7 @@ reactor_construct(struct spdk_reactor *reactor, uint32_t lcore)

	TAILQ_INIT(&reactor->threads);
	reactor->thread_count = 0;
	spdk_cpuset_zero(&reactor->notify_cpuset);

	reactor->events = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY);
	if (reactor->events == NULL) {
@@ -179,8 +180,26 @@ reactor_construct(struct spdk_reactor *reactor, uint32_t lcore)
		assert(false);
	}

	/* Always initialize interrupt facilities for reactor */
	if (reactor_interrupt_init(reactor) != 0) {
		/* Reactor interrupt facilities are necessary if seting app to interrupt mode. */
		if (spdk_interrupt_mode_is_enabled()) {
		reactor_interrupt_init(reactor);
			SPDK_ERRLOG("Failed to prepare intr facilities\n");
			assert(false);
		}
		return;
	}

	/* If application runs with full interrupt ability,
	 * all reactors are going to run in interrupt mode.
	 */
	if (spdk_interrupt_mode_is_enabled()) {
		uint32_t i;

		SPDK_ENV_FOREACH_CORE(i) {
			spdk_cpuset_set_cpu(&reactor->notify_cpuset, i, true);
		}
		reactor->in_interrupt = true;
	}
}

@@ -298,9 +317,7 @@ spdk_reactors_fini(void)
			spdk_ring_free(reactor->events);
		}

		if (reactor->interrupt_mode) {
		reactor_interrupt_fini(reactor);
		}

		if (g_core_infos != NULL) {
			free(g_core_infos[i].threads);
@@ -345,6 +362,8 @@ spdk_event_call(struct spdk_event *event)
{
	int rc;
	struct spdk_reactor *reactor;
	struct spdk_reactor *local_reactor = NULL;
	uint32_t current_core = spdk_env_get_current_core();

	reactor = spdk_reactor_get(event->lcore);

@@ -356,7 +375,16 @@ spdk_event_call(struct spdk_event *event)
		assert(false);
	}

	if (reactor->interrupt_mode) {
	if (current_core != SPDK_ENV_LCORE_ID_ANY) {
		local_reactor = spdk_reactor_get(current_core);
	}

	/* If spdk_event_call isn't called on a reactor, always send a notification.
	 * If it is called on a reactor, send a notification if the destination reactor
	 * is indicated in interrupt mode state.
	 */
	if (spdk_unlikely(local_reactor == NULL) ||
	    spdk_unlikely(spdk_cpuset_get_cpu(&local_reactor->notify_cpuset, event->lcore))) {
		uint64_t notify = 1;

		rc = write(reactor->events_fd, &notify, sizeof(notify));
@@ -383,7 +411,8 @@ event_queue_run_batch(struct spdk_reactor *reactor)
	memset(events, 0, sizeof(events));
#endif

	if (reactor->interrupt_mode) {
	/* Operate event notification if this reactor currently runs in interrupt state */
	if (spdk_unlikely(reactor->in_interrupt)) {
		uint64_t notify = 1;
		int rc;

@@ -638,7 +667,8 @@ reactor_post_process_lw_thread(struct spdk_reactor *reactor, struct spdk_lw_thre
		assert(reactor->thread_count > 0);
		reactor->thread_count--;

		if (reactor->interrupt_mode) {
		/* Operate thread intr if running with full interrupt ability */
		if (spdk_interrupt_mode_is_enabled()) {
			efd = spdk_thread_get_interrupt_fd(thread);
			spdk_fd_group_remove(reactor->fgrp, efd);
		}
@@ -653,7 +683,8 @@ reactor_post_process_lw_thread(struct spdk_reactor *reactor, struct spdk_lw_thre
			assert(reactor->thread_count > 0);
			reactor->thread_count--;

			if (reactor->interrupt_mode) {
			/* Operate thread intr if running with full interrupt ability */
			if (spdk_interrupt_mode_is_enabled()) {
				efd = spdk_thread_get_interrupt_fd(thread);
				spdk_fd_group_remove(reactor->fgrp, efd);
			}
@@ -728,7 +759,8 @@ reactor_run(void *arg)
	reactor->tsc_last = spdk_get_ticks();

	while (1) {
		if (spdk_unlikely(reactor->interrupt_mode)) {
		/* Execute interrupt process fn if this reactor currently runs in interrupt state */
		if (spdk_unlikely(reactor->in_interrupt)) {
			reactor_interrupt_run(reactor);
		} else {
			_reactor_run(reactor);
@@ -769,7 +801,9 @@ reactor_run(void *arg)
				TAILQ_REMOVE(&reactor->threads, lw_thread, link);
				assert(reactor->thread_count > 0);
				reactor->thread_count--;
				if (reactor->interrupt_mode) {

				/* Operate thread intr if running with full interrupt ability */
				if (spdk_interrupt_mode_is_enabled()) {
					int efd = spdk_thread_get_interrupt_fd(thread);

					spdk_fd_group_remove(reactor->fgrp, efd);
@@ -851,12 +885,18 @@ spdk_reactors_stop(void *arg1)
	uint32_t i;
	int rc;
	struct spdk_reactor *reactor;
	struct spdk_reactor *local_reactor;
	uint64_t notify = 1;

	g_reactor_state = SPDK_REACTOR_STATE_EXITING;
	local_reactor = spdk_reactor_get(spdk_env_get_current_core());

	if (spdk_interrupt_mode_is_enabled()) {
	SPDK_ENV_FOREACH_CORE(i) {
		/* If spdk_event_call isn't called  on a reactor, always send a notification.
		 * If it is called on a reactor, send a notification if the destination reactor
		 * is indicated in interrupt mode state.
		 */
		if (local_reactor == NULL || spdk_cpuset_get_cpu(&local_reactor->notify_cpuset, i)) {
			reactor = spdk_reactor_get(i);
			assert(reactor != NULL);
			rc = write(reactor->events_fd, &notify, sizeof(notify));
@@ -888,14 +928,14 @@ _schedule_thread(void *arg1, void *arg2)
	int efd;

	current_core = spdk_env_get_current_core();

	reactor = spdk_reactor_get(current_core);
	assert(reactor != NULL);

	TAILQ_INSERT_TAIL(&reactor->threads, lw_thread, link);
	reactor->thread_count++;

	if (reactor->interrupt_mode) {
	/* Operate thread intr if running with full interrupt ability */
	if (spdk_interrupt_mode_is_enabled()) {
		int rc;
		struct spdk_thread *thread;

@@ -975,7 +1015,9 @@ _reactor_request_thread_reschedule(struct spdk_thread *thread)
	current_core = spdk_env_get_current_core();
	reactor = spdk_reactor_get(current_core);
	assert(reactor != NULL);
	if (reactor->interrupt_mode) {

	/* Send a notification if the destination reactor is indicated in intr mode state */
	if (spdk_unlikely(spdk_cpuset_get_cpu(&reactor->notify_cpuset, reactor->lcore))) {
		uint64_t notify = 1;

		if (write(reactor->resched_fd, &notify, sizeof(notify)) < 0) {
@@ -1086,7 +1128,7 @@ reactor_schedule_thread_event(void *arg)
	uint32_t count = 0;
	uint64_t notify = 1;

	assert(reactor->interrupt_mode);
	assert(reactor->in_interrupt);

	if (read(reactor->resched_fd, &notify, sizeof(notify)) < 0) {
		SPDK_ERRLOG("failed to acknowledge reschedule: %s.\n", spdk_strerror(errno));
@@ -1141,11 +1183,11 @@ reactor_interrupt_init(struct spdk_reactor *reactor)
		goto err;
	}

	reactor->interrupt_mode = true;
	return 0;

err:
	spdk_fd_group_destroy(reactor->fgrp);
	reactor->fgrp = NULL;
	return rc;
}
#else
+12 −3
Original line number Diff line number Diff line
@@ -53,6 +53,7 @@ test_create_reactor(void)
	CU_ASSERT(spdk_reactor_get(0) == &reactor);

	spdk_ring_free(reactor.events);
	reactor_interrupt_fini(&reactor);
	g_reactors = NULL;
}

@@ -105,6 +106,8 @@ test_event_call(void)
	evt = spdk_event_allocate(0, ut_event_fn, &test1, &test2);
	CU_ASSERT(evt != NULL);

	MOCK_SET(spdk_env_get_current_core, 0);

	spdk_event_call(evt);

	reactor = spdk_reactor_get(0);
@@ -114,6 +117,8 @@ test_event_call(void)
	CU_ASSERT(test1 == 1);
	CU_ASSERT(test2 == 0xFF);

	MOCK_CLEAR(spdk_env_get_current_core);

	spdk_reactors_fini();

	free_cores();
@@ -138,6 +143,8 @@ test_schedule_thread(void)
	spdk_cpuset_set_cpu(&cpuset, 3, true);
	g_next_core = 4;

	MOCK_SET(spdk_env_get_current_core, 3);

	/* _reactor_schedule_thread() will be called in spdk_thread_create()
	 * at its end because it is passed to SPDK thread library by
	 * spdk_thread_lib_init().
@@ -148,8 +155,6 @@ test_schedule_thread(void)
	reactor = spdk_reactor_get(3);
	CU_ASSERT(reactor != NULL);

	MOCK_SET(spdk_env_get_current_core, 3);

	CU_ASSERT(event_queue_run_batch(reactor) == 1);

	MOCK_CLEAR(spdk_env_get_current_core);
@@ -192,6 +197,7 @@ test_reschedule_thread(void)
	spdk_cpuset_set_cpu(&g_reactor_core_mask, 2, true);
	g_next_core = 0;

	MOCK_SET(spdk_env_get_current_core, 1);
	/* Create and schedule the thread to core 1. */
	spdk_cpuset_set_cpu(&cpuset, 1, true);

@@ -201,7 +207,6 @@ test_reschedule_thread(void)

	reactor = spdk_reactor_get(1);
	CU_ASSERT(reactor != NULL);
	MOCK_SET(spdk_env_get_current_core, 1);

	CU_ASSERT(event_queue_run_batch(reactor) == 1);
	CU_ASSERT(TAILQ_FIRST(&reactor->threads) == lw_thread);
@@ -302,12 +307,15 @@ test_for_each_reactor(void)
	for (i = 0; i < 5; i++) {
		reactor = spdk_reactor_get(i);
		CU_ASSERT(reactor != NULL);
		MOCK_SET(spdk_env_get_current_core, i);

		event_queue_run_batch(reactor);
		CU_ASSERT(count == (i + 1));
		CU_ASSERT(done == false);
		MOCK_CLEAR(spdk_env_get_current_core);
	}

	MOCK_SET(spdk_env_get_current_core, 0);
	/* After each reactor is called, the completion calls it one more time. */
	reactor = spdk_reactor_get(0);
	CU_ASSERT(reactor != NULL);
@@ -315,6 +323,7 @@ test_for_each_reactor(void)
	event_queue_run_batch(reactor);
	CU_ASSERT(count == 6);
	CU_ASSERT(done == true);
	MOCK_CLEAR(spdk_env_get_current_core);

	spdk_reactors_fini();