Commit 2b79861d authored by Ben Walker's avatar Ben Walker
Browse files

nvmf: Make spdk_nvmf_tgt_listen asynchronous



This was internally asynchronous already, but make it
explicitly asynchronous so other code can properly
wait on the operation to complete.

This fixes an intermittent CI crash.

Change-Id: I81c9b19673566047dcffa94796236ca9fd7fa7d0
Signed-off-by: default avatarBen Walker <benjamin.walker@intel.com>
Reviewed-on: https://review.gerrithub.io/406226


Tested-by: default avatarSPDK Automated Test System <sys_sgsw@intel.com>
Reviewed-by: default avatarDaniel Verkamp <daniel.verkamp@intel.com>
Reviewed-by: default avatarJim Harris <james.r.harris@intel.com>
parent 9770ee78
Loading
Loading
Loading
Loading
+10 −3
Original line number Diff line number Diff line
@@ -90,6 +90,8 @@ struct spdk_nvmf_tgt *spdk_nvmf_tgt_create(struct spdk_nvmf_tgt_opts *opts);
 */
void spdk_nvmf_tgt_destroy(struct spdk_nvmf_tgt *tgt);

typedef void (*spdk_nvmf_tgt_listen_done_fn)(void *ctx, int status);

/**
 * Begin accepting new connections at the address provided.
 *
@@ -99,11 +101,16 @@ void spdk_nvmf_tgt_destroy(struct spdk_nvmf_tgt *tgt);
 *
 * \param tgt The target associated with this listen address.
 * \param trid The address to listen at.
 * \param cb_fn A callback that will be called once the target is listening
 * \param cb_arg A context argument passed to cb_fn.
 *
 * \return 0 on success, or negated errno on failure.
 * \return void. The callback status argument will be 0 on success
 *	   or a negated errno on failure.
 */
int spdk_nvmf_tgt_listen(struct spdk_nvmf_tgt *tgt,
			 struct spdk_nvme_transport_id *trid);
void spdk_nvmf_tgt_listen(struct spdk_nvmf_tgt *tgt,
			  struct spdk_nvme_transport_id *trid,
			  spdk_nvmf_tgt_listen_done_fn cb_fn,
			  void *cb_arg);

typedef void (*new_qpair_fn)(struct spdk_nvmf_qpair *qpair);

+11 −6
Original line number Diff line number Diff line
@@ -128,6 +128,16 @@ spdk_nvmf_parse_nvmf_tgt(void)
	return 0;
}

static void
spdk_nvmf_tgt_listen_done(void *cb_arg, int status)
{
	/* TODO: Config parsing should wait for this operation to finish. */

	if (status) {
		SPDK_ERRLOG("Failed to listen on transport address\n");
	}
}

static int
spdk_nvmf_parse_subsystem(struct spdk_conf_section *sp)
{
@@ -284,12 +294,7 @@ spdk_nvmf_parse_subsystem(struct spdk_conf_section *sp)
		snprintf(trid.trsvcid, sizeof(trid.trsvcid), "%s", port);
		free(address_dup);

		ret = spdk_nvmf_tgt_listen(g_spdk_nvmf_tgt, &trid);
		if (ret) {
			SPDK_ERRLOG("Failed to listen on transport %s address %s\n",
				    transport, address);
			continue;
		}
		spdk_nvmf_tgt_listen(g_spdk_nvmf_tgt, &trid, spdk_nvmf_tgt_listen_done, NULL);

		spdk_nvmf_subsystem_add_listener(subsystem, &trid);
	}
+158 −59
Original line number Diff line number Diff line
@@ -538,6 +538,7 @@ struct rpc_subsystem {
static void
free_rpc_subsystem(struct rpc_subsystem *req)
{
	if (req) {
		free(req->mode);
		free(req->nqn);
		free(req->serial_number);
@@ -545,6 +546,8 @@ free_rpc_subsystem(struct rpc_subsystem *req)
		free_rpc_listen_addresses(&req->listen_addresses);
		free_rpc_hosts(&req->hosts);
	}
	free(req);
}

static void
spdk_rpc_nvmf_subsystem_started(struct spdk_nvmf_subsystem *subsystem,
@@ -574,19 +577,80 @@ static const struct spdk_json_object_decoder rpc_subsystem_decoders[] = {
	{"max_namespaces", offsetof(struct rpc_subsystem, num_ns), spdk_json_decode_uint32, true},
};

struct subsystem_listen_ctx {
	struct rpc_subsystem *req;
	struct spdk_nvmf_subsystem *subsystem;
	struct spdk_jsonrpc_request *request;

	uint32_t idx;
};

static void
spdk_rpc_construct_subsystem_listen_done(void *cb_arg, int status)
{
	struct subsystem_listen_ctx *ctx = cb_arg;
	struct rpc_listen_address *addr;
	struct spdk_nvme_transport_id trid = {0};

	if (status) {
		goto invalid;
	}

	addr = &ctx->req->listen_addresses.addresses[ctx->idx];
	if (rpc_listen_address_to_trid(addr, &trid)) {
		goto invalid;
	}

	spdk_nvmf_subsystem_add_listener(ctx->subsystem, &trid);

	ctx->idx++;

	if (ctx->idx < ctx->req->listen_addresses.num_listen_address) {
		addr = &ctx->req->listen_addresses.addresses[ctx->idx];

		if (rpc_listen_address_to_trid(addr, &trid)) {
			goto invalid;
		}

		spdk_nvmf_tgt_listen(g_spdk_nvmf_tgt, &trid, spdk_rpc_construct_subsystem_listen_done, ctx);
		return;
	}

	spdk_nvmf_subsystem_start(ctx->subsystem,
				  spdk_rpc_nvmf_subsystem_started,
				  ctx->request);

	free_rpc_subsystem(ctx->req);
	free(ctx);

	return;

invalid:
	spdk_nvmf_subsystem_destroy(ctx->subsystem);
	spdk_jsonrpc_send_error_response(ctx->request, SPDK_JSONRPC_ERROR_INVALID_PARAMS,
					 "Invalid parameters");
	free_rpc_subsystem(ctx->req);
	free(ctx);
}

static void
spdk_rpc_construct_nvmf_subsystem(struct spdk_jsonrpc_request *request,
				  const struct spdk_json_val *params)
{
	struct rpc_subsystem req = {};
	struct rpc_subsystem *req;
	struct spdk_nvmf_subsystem *subsystem;
	size_t i;

	req.core = -1;	/* Explicitly set the core as the uninitialized value */
	req = calloc(1, sizeof(*req));
	if (!req) {
		goto invalid;
	}

	req->core = -1;	/* Explicitly set the core as the uninitialized value */

	if (spdk_json_decode_object(params, rpc_subsystem_decoders,
				    SPDK_COUNTOF(rpc_subsystem_decoders),
				    &req)) {
				    req)) {
		SPDK_ERRLOG("spdk_json_decode_object failed\n");
		goto invalid;
	}
@@ -594,10 +658,10 @@ spdk_rpc_construct_nvmf_subsystem(struct spdk_jsonrpc_request *request,
	/* Mode is no longer a valid parameter, but print out a nice
	 * message if it exists to inform users.
	 */
	if (req.mode) {
	if (req->mode) {
		SPDK_NOTICELOG("Mode present in the construct NVMe-oF subsystem RPC.\n"
			       "Mode was removed as a valid parameter.\n");
		if (strcasecmp(req.mode, "Virtual") == 0) {
		if (strcasecmp(req->mode, "Virtual") == 0) {
			SPDK_NOTICELOG("Your mode value is 'Virtual' which is now the only possible mode.\n"
				       "Your RPC will work as expected.\n");
		} else {
@@ -609,46 +673,31 @@ spdk_rpc_construct_nvmf_subsystem(struct spdk_jsonrpc_request *request,
	/* Core is no longer a valid parameter, but print out a nice
	 * message if it exists to inform users.
	 */
	if (req.core != -1) {
	if (req->core != -1) {
		SPDK_NOTICELOG("Core present in the construct NVMe-oF subsystem RPC.\n"
			       "Core was removed as an option. Subsystems can now run on all available cores.\n");
		SPDK_NOTICELOG("Ignoring it and continuing.\n");
	}

	subsystem = spdk_nvmf_subsystem_create(g_spdk_nvmf_tgt, req.nqn, SPDK_NVMF_SUBTYPE_NVME,
					       req.num_ns);
	subsystem = spdk_nvmf_subsystem_create(g_spdk_nvmf_tgt, req->nqn, SPDK_NVMF_SUBTYPE_NVME,
					       req->num_ns);
	if (!subsystem) {
		goto invalid;
	}

	if (spdk_nvmf_subsystem_set_sn(subsystem, req.serial_number)) {
		SPDK_ERRLOG("Subsystem %s: invalid serial number '%s'\n", req.nqn, req.serial_number);
	if (spdk_nvmf_subsystem_set_sn(subsystem, req->serial_number)) {
		SPDK_ERRLOG("Subsystem %s: invalid serial number '%s'\n", req->nqn, req->serial_number);
		goto invalid;
	}

	for (i = 0; i < req.hosts.num_hosts; i++) {
		spdk_nvmf_subsystem_add_host(subsystem, req.hosts.hosts[i]);
	for (i = 0; i < req->hosts.num_hosts; i++) {
		spdk_nvmf_subsystem_add_host(subsystem, req->hosts.hosts[i]);
	}

	spdk_nvmf_subsystem_set_allow_any_host(subsystem, req.allow_any_host);

	for (i = 0; i < req.listen_addresses.num_listen_address; i++) {
		struct rpc_listen_address *addr = &req.listen_addresses.addresses[i];
		struct spdk_nvme_transport_id trid = {0};
	spdk_nvmf_subsystem_set_allow_any_host(subsystem, req->allow_any_host);

		if (rpc_listen_address_to_trid(addr, &trid)) {
			goto invalid;
		}

		if (spdk_nvmf_tgt_listen(g_spdk_nvmf_tgt, &trid)) {
			goto invalid;
		}

		spdk_nvmf_subsystem_add_listener(subsystem, &trid);
	}

	for (i = 0; i < req.namespaces.num_ns; i++) {
		struct spdk_nvmf_ns_params *ns_params = &req.namespaces.ns_params[i];
	for (i = 0; i < req->namespaces.num_ns; i++) {
		struct spdk_nvmf_ns_params *ns_params = &req->namespaces.ns_params[i];
		struct spdk_bdev *bdev;
		struct spdk_nvmf_ns_opts ns_opts;

@@ -675,7 +724,36 @@ spdk_rpc_construct_nvmf_subsystem(struct spdk_jsonrpc_request *request,
		}
	}

	free_rpc_subsystem(&req);
	if (req->listen_addresses.num_listen_address > 0) {
		struct rpc_listen_address *addr;
		struct spdk_nvme_transport_id trid = {0};
		struct subsystem_listen_ctx *ctx;

		ctx = calloc(1, sizeof(*ctx));
		if (!ctx) {
			spdk_nvmf_subsystem_destroy(subsystem);
			spdk_jsonrpc_send_error_response(request, SPDK_JSONRPC_ERROR_INTERNAL_ERROR, "No Memory");
			free_rpc_subsystem(req);
			return;
		}

		ctx->req = req;
		ctx->subsystem = subsystem;
		ctx->request = request;
		ctx->idx = 0;

		addr = &req->listen_addresses.addresses[0];

		if (rpc_listen_address_to_trid(addr, &trid)) {
			free(ctx);
			goto invalid;
		}

		spdk_nvmf_tgt_listen(g_spdk_nvmf_tgt, &trid, spdk_rpc_construct_subsystem_listen_done, ctx);
		return;
	}

	free_rpc_subsystem(req);

	spdk_nvmf_subsystem_start(subsystem,
				  spdk_rpc_nvmf_subsystem_started,
@@ -685,7 +763,7 @@ spdk_rpc_construct_nvmf_subsystem(struct spdk_jsonrpc_request *request,

invalid:
	spdk_jsonrpc_send_error_response(request, SPDK_JSONRPC_ERROR_INVALID_PARAMS, "Invalid parameters");
	free_rpc_subsystem(&req);
	free_rpc_subsystem(req);
}
SPDK_RPC_REGISTER("construct_nvmf_subsystem", spdk_rpc_construct_nvmf_subsystem)

@@ -766,6 +844,7 @@ enum nvmf_rpc_listen_op {

struct nvmf_rpc_listener_ctx {
	char				*nqn;
	struct spdk_nvmf_subsystem	*subsystem;
	struct rpc_listen_address	address;

	struct spdk_jsonrpc_request	*request;
@@ -814,42 +893,58 @@ nvmf_rpc_listen_resumed(struct spdk_nvmf_subsystem *subsystem,
}

static void
nvmf_rpc_listen_paused(struct spdk_nvmf_subsystem *subsystem,
		       void *cb_arg, int status)
nvmf_rpc_tgt_listen(void *cb_arg, int status)
{
	struct nvmf_rpc_listener_ctx *ctx = cb_arg;

	if (ctx->op == NVMF_RPC_LISTEN_ADD) {
		if (spdk_nvmf_tgt_listen(g_spdk_nvmf_tgt, &ctx->trid)) {
			SPDK_ERRLOG("Unable to add listener.\n");
			goto invalid;
	if (status) {
		spdk_jsonrpc_send_error_response(ctx->request, SPDK_JSONRPC_ERROR_INVALID_PARAMS,
						 "Invalid parameters");
		ctx->response_sent = true;
	} else {
		if (spdk_nvmf_subsystem_add_listener(ctx->subsystem, &ctx->trid)) {
			spdk_jsonrpc_send_error_response(ctx->request, SPDK_JSONRPC_ERROR_INVALID_PARAMS,
							 "Invalid parameters");
			ctx->response_sent = true;
		}
	}

		if (spdk_nvmf_subsystem_add_listener(subsystem, &ctx->trid)) {
			goto invalid;
	if (spdk_nvmf_subsystem_resume(ctx->subsystem, nvmf_rpc_listen_resumed, ctx)) {
		if (!ctx->response_sent) {
			spdk_jsonrpc_send_error_response(ctx->request, SPDK_JSONRPC_ERROR_INTERNAL_ERROR, "Internal error");
		}
		nvmf_rpc_listener_ctx_free(ctx);
		/* Can't really do anything to recover here - subsystem will remain paused. */
	}
}

static void
nvmf_rpc_listen_paused(struct spdk_nvmf_subsystem *subsystem,
		       void *cb_arg, int status)
{
	struct nvmf_rpc_listener_ctx *ctx = cb_arg;

	if (ctx->op == NVMF_RPC_LISTEN_ADD) {
		spdk_nvmf_tgt_listen(g_spdk_nvmf_tgt, &ctx->trid, nvmf_rpc_tgt_listen, ctx);
		return;
	} else if (ctx->op == NVMF_RPC_LISTEN_REMOVE) {
		if (spdk_nvmf_subsystem_remove_listener(subsystem, &ctx->trid)) {
			SPDK_ERRLOG("Unable to remove listener.\n");
			goto invalid;
			spdk_jsonrpc_send_error_response(ctx->request, SPDK_JSONRPC_ERROR_INVALID_PARAMS,
							 "Invalid parameters");
			ctx->response_sent = true;
		}
	} else {
		goto invalid;
	}
	if (spdk_nvmf_subsystem_resume(subsystem, nvmf_rpc_listen_resumed, ctx)) {
		spdk_jsonrpc_send_error_response(ctx->request, SPDK_JSONRPC_ERROR_INTERNAL_ERROR, "Internal error");
		nvmf_rpc_listener_ctx_free(ctx);
		return;
	}
	return;

invalid:
		spdk_jsonrpc_send_error_response(ctx->request, SPDK_JSONRPC_ERROR_INVALID_PARAMS,
						 "Invalid parameters");
		ctx->response_sent = true;
	}

	if (spdk_nvmf_subsystem_resume(subsystem, nvmf_rpc_listen_resumed, ctx)) {
		SPDK_ERRLOG("Failed to resume subsystem\n");
		if (!ctx->response_sent) {
			spdk_jsonrpc_send_error_response(ctx->request, SPDK_JSONRPC_ERROR_INTERNAL_ERROR, "Internal error");
		}
		nvmf_rpc_listener_ctx_free(ctx);
		/* Can't really do anything to recover here - subsystem will remain paused. */
	}
}
@@ -886,6 +981,8 @@ nvmf_rpc_subsystem_add_listener(struct spdk_jsonrpc_request *request,
		return;
	}

	ctx->subsystem = subsystem;

	if (rpc_listen_address_to_trid(&ctx->address, &ctx->trid)) {
		spdk_jsonrpc_send_error_response(ctx->request, SPDK_JSONRPC_ERROR_INVALID_PARAMS,
						 "Invalid parameters");
@@ -935,6 +1032,8 @@ nvmf_rpc_subsystem_remove_listener(struct spdk_jsonrpc_request *request,
		return;
	}

	ctx->subsystem = subsystem;

	if (rpc_listen_address_to_trid(&ctx->address, &ctx->trid)) {
		spdk_jsonrpc_send_error_response(ctx->request, SPDK_JSONRPC_ERROR_INVALID_PARAMS,
						 "Invalid parameters");
+37 −24
Original line number Diff line number Diff line
@@ -215,14 +215,20 @@ spdk_nvmf_tgt_destroy(struct spdk_nvmf_tgt *tgt)
}

struct spdk_nvmf_tgt_listen_ctx {
	struct spdk_nvmf_tgt *tgt;
	struct spdk_nvmf_transport *transport;
	struct spdk_nvme_transport_id trid;

	spdk_nvmf_tgt_listen_done_fn cb_fn;
	void *cb_arg;
};

static void
spdk_nvmf_tgt_listen_done(struct spdk_io_channel_iter *i, int status)
{
	void *ctx = spdk_io_channel_iter_get_ctx(i);
	struct spdk_nvmf_tgt_listen_ctx *ctx = spdk_io_channel_iter_get_ctx(i);

	ctx->cb_fn(ctx->cb_arg, status);

	free(ctx);
}
@@ -239,53 +245,60 @@ spdk_nvmf_tgt_listen_add_transport(struct spdk_io_channel_iter *i)
	spdk_for_each_channel_continue(i, rc);
}

int
void
spdk_nvmf_tgt_listen(struct spdk_nvmf_tgt *tgt,
		     struct spdk_nvme_transport_id *trid)
		     struct spdk_nvme_transport_id *trid,
		     spdk_nvmf_tgt_listen_done_fn cb_fn,
		     void *cb_arg)
{
	struct spdk_nvmf_transport *transport;
	int rc;
	bool propagate = false;

	transport = spdk_nvmf_tgt_get_transport(tgt, trid->trtype);
	if (!transport) {
		struct spdk_nvmf_tgt_listen_ctx *ctx;

		transport = spdk_nvmf_transport_create(tgt, trid->trtype);
		if (!transport) {
			SPDK_ERRLOG("Transport initialization failed\n");
			return -EINVAL;
			cb_fn(cb_arg, -EINVAL);
			return;
		}
		TAILQ_INSERT_TAIL(&tgt->transports, transport, link);

		propagate = true;
	}

	rc = spdk_nvmf_transport_listen(transport, trid);
	if (rc < 0) {
		SPDK_ERRLOG("Unable to listen on address '%s'\n", trid->traddr);
		cb_fn(cb_arg, rc);
		return;
	}

	tgt->discovery_genctr++;

	if (propagate) {
		struct spdk_nvmf_tgt_listen_ctx *ctx;

		ctx = calloc(1, sizeof(*ctx));
		if (!ctx) {
			return -ENOMEM;
			cb_fn(cb_arg, -ENOMEM);
			return;
		}

		ctx->trid = *trid;
		ctx->tgt = tgt;
		ctx->transport = transport;
		ctx->trid = *trid;
		ctx->cb_fn = cb_fn;
		ctx->cb_arg = cb_arg;

		/* Send a message to each poll group to notify it that a new transport
		 * is available.
		 * TODO: This call does not currently allow the user to wait for these
		 * messages to propagate. It also does not protect against two calls
		 * to this function overlapping
		 */
		spdk_for_each_channel(tgt,
				      spdk_nvmf_tgt_listen_add_transport,
				      ctx,
				      spdk_nvmf_tgt_listen_done);
	} else {
		cb_fn(cb_arg, 0);
	}

	rc = spdk_nvmf_transport_listen(transport, trid);
	if (rc < 0) {
		SPDK_ERRLOG("Unable to listen on address '%s'\n", trid->traddr);
		return rc;
	}

	tgt->discovery_genctr++;

	return 0;
}

struct spdk_nvmf_subsystem *