Commit 09895428 authored by Krzysztof Karas's avatar Krzysztof Karas Committed by Tomasz Zawadzki
Browse files

lib/init: manage multiple RPC servers



RPC library now encapsulates RPC server specific data, so it is
possible to run multiple servers at the same time. To utilize that
in init poll each server that was launched through this library and
tie registering/unregistering of the poller to the existence of at
least one RPC server.

Modifications to existing APIs' behavior:
 * spdk_rpc_initialize() launches a new server instead of
   overwriting existing one,
 * spdk_rpc_finish() stops all existing servers instead of
   stopping a single, global server instance,

Addition of new API:
 * spdk_rpc_server_finish() will stop a server listening
   on a given address (if it exists).

Change-Id: I9c794e31f7ee27eb5607925b6c7a8446387670b7
Signed-off-by: default avatarKrzysztof Karas <krzysztof.karas@intel.com>
Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/21080


Reviewed-by: default avatarJim Harris <jim.harris@samsung.com>
Tested-by: default avatarSPDK CI Jenkins <sys_sgci@intel.com>
Reviewed-by: default avatarTomasz Zawadzki <tomasz.zawadzki@intel.com>
Community-CI: Mellanox Build Bot
parent 4d505568
Loading
Loading
Loading
Loading
+12 −3
Original line number Diff line number Diff line
@@ -42,8 +42,10 @@ struct spdk_rpc_opts {
SPDK_STATIC_ASSERT(sizeof(struct spdk_rpc_opts) == 24, "Incorrect size");

/**
 * Create the SPDK JSON-RPC server and listen at the provided address. The RPC server is optional and is
 * independent of subsystem initialization. The RPC server can be started and stopped at any time.
 * Create SPDK JSON-RPC server listening at provided address and start polling it for connections.
 *
 * The RPC server is optional and is independent of subsystem initialization.
 * The RPC server can be started and stopped at any time.
 *
 * \param listen_addr Path to a unix domain socket to listen on
 * \param opts Options for JSON-RPC server initialization. If NULL, default values are used.
@@ -54,10 +56,17 @@ int spdk_rpc_initialize(const char *listen_addr,
			const struct spdk_rpc_opts *opts);

/**
 * Shut down the SPDK JSON-RPC target
 * Stop SPDK JSON-RPC servers and stop polling for new connections on all addresses.
 */
void spdk_rpc_finish(void);

/**
 * Stop SPDK JSON-RPC server and stop polling for new connections on provided address.
 *
 * \param listen_addr Path to a unix domain socket.
 */
void spdk_rpc_server_finish(const char *listen_addr);

typedef void (*spdk_subsystem_init_fn)(int rc, void *ctx);

/**
+85 −8
Original line number Diff line number Diff line
@@ -15,10 +15,24 @@

static struct spdk_poller *g_rpc_poller = NULL;

struct init_rpc_server {
	struct spdk_rpc_server *server;
	char listen_addr[sizeof(((struct sockaddr_un *)0)->sun_path)];
	STAILQ_ENTRY(init_rpc_server) link;
};

static STAILQ_HEAD(, init_rpc_server) g_init_rpc_servers = STAILQ_HEAD_INITIALIZER(
			g_init_rpc_servers);

static int
rpc_subsystem_poll(void *arg)
rpc_subsystem_poll_servers(void *arg)
{
	spdk_rpc_accept();
	struct init_rpc_server *init_server;

	STAILQ_FOREACH(init_server, &g_init_rpc_servers, link) {
		spdk_rpc_server_accept(init_server->server);
	}

	return SPDK_POLLER_BUSY;
}

@@ -87,15 +101,32 @@ rpc_set_spdk_log_opts(const struct spdk_rpc_opts *_opts)
	rpc_opts_get_default(&opts, sizeof(opts));
	if (_opts != NULL) {
		rpc_opts_copy(&opts, _opts, _opts->size);
	} else if (!STAILQ_EMPTY(&g_init_rpc_servers)) {
		return;
	}

	spdk_jsonrpc_set_log_file(opts.log_file);
	spdk_jsonrpc_set_log_level(opts.log_level);
}

static struct init_rpc_server *
get_server_by_addr(const char *listen_addr)
{
	struct init_rpc_server *init_server;

	STAILQ_FOREACH(init_server, &g_init_rpc_servers, link) {
		if (strcmp(listen_addr, init_server->listen_addr) == 0) {
			return init_server;
		}
	}

	return NULL;
}

int
spdk_rpc_initialize(const char *listen_addr, const struct spdk_rpc_opts *opts)
{
	struct init_rpc_server *init_server;
	int rc;

	if (listen_addr == NULL) {
@@ -108,10 +139,30 @@ spdk_rpc_initialize(const char *listen_addr, const struct spdk_rpc_opts *opts)
		return rc;
	}

	if (get_server_by_addr(listen_addr) != NULL) {
		SPDK_ERRLOG("Socket listen_addr already in use\n");
		return -EADDRINUSE;
	}

	init_server = calloc(1, sizeof(struct init_rpc_server));
	if (init_server == NULL) {
		SPDK_ERRLOG("Unable to allocate init RPC server\n");
		return -ENOMEM;
	}

	rc = snprintf(init_server->listen_addr, sizeof(init_server->listen_addr), "%s",
		      listen_addr);
	if (rc < 0) {
		SPDK_ERRLOG("Unable to copy listen address %s\n", listen_addr);
		free(init_server);
		return -EINVAL;
	}

	/* Listen on the requested address */
	rc = spdk_rpc_listen(listen_addr);
	if (rc != 0) {
	init_server->server = spdk_rpc_server_listen(listen_addr);
	if (init_server->server == NULL) {
		SPDK_ERRLOG("Unable to start RPC service at %s\n", listen_addr);
		free(init_server);
		/* TODO: Eventually, treat this as an error. But it historically has not
		 * been and many tests rely on this gracefully failing. */
		return 0;
@@ -119,15 +170,41 @@ spdk_rpc_initialize(const char *listen_addr, const struct spdk_rpc_opts *opts)

	rpc_set_spdk_log_opts(opts);

	STAILQ_INSERT_TAIL(&g_init_rpc_servers, init_server, link);
	if (g_rpc_poller == NULL) {
		/* Register a poller to periodically check for RPCs */
	g_rpc_poller = SPDK_POLLER_REGISTER(rpc_subsystem_poll, NULL, RPC_SELECT_INTERVAL);
		g_rpc_poller = SPDK_POLLER_REGISTER(rpc_subsystem_poll_servers, NULL, RPC_SELECT_INTERVAL);
	}

	return 0;
}

void
spdk_rpc_finish(void)
spdk_rpc_server_finish(const char *listen_addr)
{
	spdk_rpc_close();
	struct init_rpc_server *init_server;

	init_server = get_server_by_addr(listen_addr);
	if (!init_server) {
		SPDK_ERRLOG("No server listening on provided address: %s\n", listen_addr);
		return;
	}

	spdk_rpc_server_close(init_server->server);
	STAILQ_REMOVE(&g_init_rpc_servers, init_server, init_rpc_server, link);
	free(init_server);

	if (STAILQ_EMPTY(&g_init_rpc_servers)) {
		spdk_poller_unregister(&g_rpc_poller);
	}
}

void
spdk_rpc_finish(void)
{
	struct init_rpc_server *init_server, *tmp;

	STAILQ_FOREACH_SAFE(init_server, &g_init_rpc_servers, link, tmp) {
		spdk_rpc_server_finish(init_server->listen_addr);
	}
}
+1 −0
Original line number Diff line number Diff line
@@ -12,6 +12,7 @@

	spdk_rpc_initialize;
	spdk_rpc_finish;
	spdk_rpc_server_finish;

	local: *;
};
+1 −0
Original line number Diff line number Diff line
@@ -25,6 +25,7 @@ DEFINE_STUB(spdk_rpc_initialize, int, (const char *listen_addr,
				       const struct spdk_rpc_opts *opts), 0);
DEFINE_STUB_V(spdk_rpc_set_allowlist, (const char **rpc_allowlist));
DEFINE_STUB_V(spdk_rpc_finish, (void));
DEFINE_STUB_V(spdk_rpc_server_finish, (const char *listen_addr));
DEFINE_STUB_V(spdk_subsystem_init_from_json_config, (const char *json_config_file,
		const char *rpc_addr,
		spdk_subsystem_init_fn cb_fn, void *cb_arg, bool stop_on_error));
+1 −1
Original line number Diff line number Diff line
@@ -6,7 +6,7 @@
SPDK_ROOT_DIR := $(abspath $(CURDIR)/../../../..)
include $(SPDK_ROOT_DIR)/mk/spdk.common.mk

DIRS-y = subsystem.c
DIRS-y = subsystem.c rpc.c

.PHONY: all clean $(DIRS-y)

Loading