Commit 4e003b67 authored by Daniel Verkamp's avatar Daniel Verkamp
Browse files

jsonrpc: allow asynchronous request completion



Move the per-connection send buffer into each request, and allow a
connection to have a queue of responses ready to be sent.

Change-Id: If6b2151691c4cd76f3cf7cde0cdd8f20cac77ceb
Signed-off-by: default avatarDaniel Verkamp <daniel.verkamp@intel.com>
Reviewed-on: https://review.gerrithub.io/368470


Tested-by: default avatarSPDK Automated Test System <sys_sgsw@intel.com>
Reviewed-by: default avatarJim Harris <james.r.harris@intel.com>
Reviewed-by: default avatarBen Walker <benjamin.walker@intel.com>
parent 977fd230
Loading
Loading
Loading
Loading
+12 −3
Original line number Diff line number Diff line
@@ -36,6 +36,7 @@

#include "spdk/stdinc.h"

#include "spdk/env.h"
#include "spdk/jsonrpc.h"

#include "spdk_internal/log.h"
@@ -52,16 +53,22 @@ struct spdk_jsonrpc_request {
	/* Copy of request id value */
	struct spdk_json_val id;
	uint8_t id_data[SPDK_JSONRPC_ID_MAX_LEN];

	size_t send_len;
	size_t send_offset;
	uint8_t send_buf[SPDK_JSONRPC_SEND_BUF_SIZE];
};

struct spdk_jsonrpc_server_conn {
	struct spdk_jsonrpc_server *server;
	int sockfd;
	bool closed;
	struct spdk_json_val values[SPDK_JSONRPC_MAX_VALUES];
	size_t recv_len;
	uint8_t recv_buf[SPDK_JSONRPC_RECV_BUF_SIZE];
	size_t send_len;
	uint8_t send_buf[SPDK_JSONRPC_SEND_BUF_SIZE];
	uint32_t outstanding_requests;
	struct spdk_ring *send_queue;
	struct spdk_jsonrpc_request *send_request;
};

struct spdk_jsonrpc_server {
@@ -73,13 +80,15 @@ struct spdk_jsonrpc_server {
};

/* jsonrpc_server_tcp */
int spdk_jsonrpc_server_write_cb(void *cb_ctx, const void *data, size_t size);
void spdk_jsonrpc_server_handle_request(struct spdk_jsonrpc_request *request,
					const struct spdk_json_val *method,
					const struct spdk_json_val *params);
void spdk_jsonrpc_server_handle_error(struct spdk_jsonrpc_request *request, int error);
void spdk_jsonrpc_server_send_response(struct spdk_jsonrpc_server_conn *conn,
				       struct spdk_jsonrpc_request *request);

/* jsonrpc_server */
int spdk_jsonrpc_parse_request(struct spdk_jsonrpc_server_conn *conn, void *json, size_t size);
void spdk_jsonrpc_free_request(struct spdk_jsonrpc_request *request);

#endif
+39 −13
Original line number Diff line number Diff line
@@ -136,10 +136,14 @@ spdk_jsonrpc_parse_request(struct spdk_jsonrpc_server_conn *conn, void *json, si
		return -1;
	}

	conn->outstanding_requests++;

	request->conn = conn;
	request->id.start = request->id_data;
	request->id.len = 0;
	request->id.type = SPDK_JSON_VAL_INVALID;
	request->send_offset = 0;
	request->send_len = 0;

	if (rc < 0 || rc > SPDK_JSONRPC_MAX_VALUES) {
		SPDK_TRACELOG(SPDK_TRACE_RPC, "JSON parse error\n");
@@ -176,12 +180,28 @@ spdk_jsonrpc_parse_request(struct spdk_jsonrpc_server_conn *conn, void *json, si
	return end - json;
}

static int
spdk_jsonrpc_server_write_cb(void *cb_ctx, const void *data, size_t size)
{
	struct spdk_jsonrpc_request *request = cb_ctx;

	if (SPDK_JSONRPC_SEND_BUF_SIZE - request->send_len < size) {
		SPDK_ERRLOG("Not enough space in send buf\n");
		return -1;
	}

	memcpy(request->send_buf + request->send_len, data, size);
	request->send_len += size;

	return 0;
}

static struct spdk_json_write_ctx *
begin_response(struct spdk_jsonrpc_server_conn *conn, const struct spdk_json_val *id)
begin_response(struct spdk_jsonrpc_request *request)
{
	struct spdk_json_write_ctx *w;

	w = spdk_json_write_begin(spdk_jsonrpc_server_write_cb, conn, 0);
	w = spdk_json_write_begin(spdk_jsonrpc_server_write_cb, request, 0);
	if (w == NULL) {
		return NULL;
	}
@@ -191,17 +211,25 @@ begin_response(struct spdk_jsonrpc_server_conn *conn, const struct spdk_json_val
	spdk_json_write_string(w, "2.0");

	spdk_json_write_name(w, "id");
	spdk_json_write_val(w, id);
	spdk_json_write_val(w, &request->id);

	return w;
}

static void
end_response(struct spdk_jsonrpc_server_conn *conn, struct spdk_json_write_ctx *w)
end_response(struct spdk_jsonrpc_request *request, struct spdk_json_write_ctx *w)
{
	spdk_json_write_object_end(w);
	spdk_json_write_end(w);
	spdk_jsonrpc_server_write_cb(conn, "\n", 1);
	spdk_jsonrpc_server_write_cb(request, "\n", 1);
	spdk_jsonrpc_server_send_response(request->conn, request);
}

void
spdk_jsonrpc_free_request(struct spdk_jsonrpc_request *request)
{
	request->conn->outstanding_requests--;
	free(request);
}

struct spdk_json_write_ctx *
@@ -211,13 +239,13 @@ spdk_jsonrpc_begin_result(struct spdk_jsonrpc_request *request)

	if (request->id.type == SPDK_JSON_VAL_INVALID) {
		/* Notification - no response required */
		free(request);
		spdk_jsonrpc_free_request(request);
		return NULL;
	}

	w = begin_response(request->conn, &request->id);
	w = begin_response(request);
	if (w == NULL) {
		free(request);
		spdk_jsonrpc_free_request(request);
		return NULL;
	}

@@ -231,8 +259,7 @@ spdk_jsonrpc_end_result(struct spdk_jsonrpc_request *request, struct spdk_json_w
{
	assert(w != NULL);

	end_response(request->conn, w);
	free(request);
	end_response(request, w);
}

void
@@ -246,7 +273,7 @@ spdk_jsonrpc_send_error_response(struct spdk_jsonrpc_request *request,
		request->id.type = SPDK_JSON_VAL_NULL;
	}

	w = begin_response(request->conn, &request->id);
	w = begin_response(request);
	if (w == NULL) {
		free(request);
		return;
@@ -260,8 +287,7 @@ spdk_jsonrpc_send_error_response(struct spdk_jsonrpc_request *request,
	spdk_json_write_string(w, msg);
	spdk_json_write_object_end(w);

	end_response(request->conn, w);
	free(request);
	end_response(request, w);
}

SPDK_LOG_REGISTER_TRACE_FLAG("rpc", SPDK_TRACE_RPC)
+92 −39
Original line number Diff line number Diff line
@@ -115,6 +115,8 @@ spdk_jsonrpc_server_conn_remove(struct spdk_jsonrpc_server_conn *conn)

	close(conn->sockfd);

	spdk_ring_free(conn->send_queue);

	/* Swap conn with the last entry in conns */
	server->conns[conn_idx] = server->conns[server->num_conns - 1];
	server->num_conns--;
@@ -134,8 +136,16 @@ spdk_jsonrpc_server_accept(struct spdk_jsonrpc_server *server)
		conn = &server->conns[conn_idx];
		conn->server = server;
		conn->sockfd = rc;
		conn->closed = false;
		conn->recv_len = 0;
		conn->send_len = 0;
		conn->outstanding_requests = 0;
		conn->send_request = NULL;
		conn->send_queue = spdk_ring_create(SPDK_RING_TYPE_SP_SC, 128, SPDK_ENV_SOCKET_ID_ANY);
		if (conn->send_queue == NULL) {
			SPDK_ERRLOG("send_queue allocation failed\n");
			close(conn->sockfd);
			return -1;
		}

		nonblock = 1;
		rc = ioctl(conn->sockfd, FIONBIO, &nonblock);
@@ -162,22 +172,6 @@ spdk_jsonrpc_server_accept(struct spdk_jsonrpc_server *server)
	return -1;
}

int
spdk_jsonrpc_server_write_cb(void *cb_ctx, const void *data, size_t size)
{
	struct spdk_jsonrpc_server_conn *conn = cb_ctx;

	if (SPDK_JSONRPC_SEND_BUF_SIZE - conn->send_len < size) {
		SPDK_ERRLOG("Not enough space in send buf\n");
		return -1;
	}

	memcpy(conn->send_buf + conn->send_len, data, size);
	conn->send_len += size;

	return 0;
}

void
spdk_jsonrpc_server_handle_request(struct spdk_jsonrpc_request *request,
				   const struct spdk_json_val *method, const struct spdk_json_val *params)
@@ -261,12 +255,39 @@ spdk_jsonrpc_server_conn_recv(struct spdk_jsonrpc_server_conn *conn)
	return 0;
}

void
spdk_jsonrpc_server_send_response(struct spdk_jsonrpc_server_conn *conn,
				  struct spdk_jsonrpc_request *request)
{
	/* Queue the response to be sent */
	spdk_ring_enqueue(conn->send_queue, (void **)&request, 1);
}

static int
spdk_jsonrpc_server_conn_send(struct spdk_jsonrpc_server_conn *conn)
{
	struct spdk_jsonrpc_request *request;
	ssize_t rc;

	rc = send(conn->sockfd, conn->send_buf, conn->send_len, 0);
more:
	if (conn->outstanding_requests == 0) {
		return 0;
	}

	if (conn->send_request == NULL) {
		if (spdk_ring_dequeue(conn->send_queue, (void **)&conn->send_request, 1) != 1) {
			return 0;
		}
	}

	request = conn->send_request;
	if (request == NULL) {
		/* Nothing to send right now */
		return 0;
	}

	rc = send(conn->sockfd, request->send_buf + request->send_offset,
		  request->send_len, 0);
	if (rc < 0) {
		if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
			return 0;
@@ -281,7 +302,18 @@ spdk_jsonrpc_server_conn_send(struct spdk_jsonrpc_server_conn *conn)
		return -1;
	}

	conn->send_len -= rc;
	request->send_offset += rc;
	request->send_len -= rc;

	if (request->send_len == 0) {
		/*
		 * Full response has been sent.
		 * Free it and set send_request to NULL to move on to the next queued response.
		 */
		conn->send_request = NULL;
		spdk_jsonrpc_free_request(request);
		goto more;
	}

	return 0;
}
@@ -321,31 +353,52 @@ spdk_jsonrpc_server_poll(struct spdk_jsonrpc_server *server)
	for (i = 0; i < server->num_conns; i++) {
		pfd = &server->pollfds[i + 1];
		conn = &server->conns[i];
		if (conn->send_len) {

		if (conn->closed) {
			struct spdk_jsonrpc_request *request;

			/*
			 * If there is any data to send, keep sending it until the send buffer
			 *  is empty.  Each response should be allowed the full send buffer, so
			 *  don't accept any new requests until the previous response is sent out.
			 * The client closed the connection, but there may still be requests
			 * outstanding; we have no way to cancel outstanding requests, so wait until
			 * each outstanding request sends a response (which will be discarded, since
			 * the connection is closed).
			 */

			if (conn->send_request) {
				spdk_jsonrpc_free_request(conn->send_request);
				conn->send_request = NULL;
			}

			while (spdk_ring_dequeue(conn->send_queue, (void **)&request, 1) == 1) {
				spdk_jsonrpc_free_request(request);
			}

			if (conn->outstanding_requests == 0) {
				SPDK_TRACELOG(SPDK_TRACE_RPC, "all outstanding requests completed\n");
				spdk_jsonrpc_server_conn_remove(conn);
			}

			continue;
		}

		if (pfd->revents & POLLOUT) {
			rc = spdk_jsonrpc_server_conn_send(conn);
			if (rc != 0) {
				SPDK_TRACELOG(SPDK_TRACE_RPC, "closing conn due to send failure\n");
					spdk_jsonrpc_server_conn_remove(conn);
				conn->closed = true;
				continue;
			}
		}
		} else {
			/*
			 * No data to send - we can receive a new request.
			 */

		if (pfd->revents & POLLIN) {
			rc = spdk_jsonrpc_server_conn_recv(conn);
			if (rc != 0) {
				SPDK_TRACELOG(SPDK_TRACE_RPC, "closing conn due to recv failure\n");
					spdk_jsonrpc_server_conn_remove(conn);
				}
				conn->closed = true;
				continue;
			}
		}

		pfd->revents = 0;
	}

+3 −3
Original line number Diff line number Diff line
@@ -208,11 +208,11 @@ spdk_jsonrpc_server_handle_request(struct spdk_jsonrpc_request *request,
	ut_handle(request, 0, method, params);
}

int
spdk_jsonrpc_server_write_cb(void *cb_ctx, const void *data, size_t size)
void
spdk_jsonrpc_server_send_response(struct spdk_jsonrpc_server_conn *conn,
				  struct spdk_jsonrpc_request *request)
{
	/* TODO */
	return -1;
}

static void