Commit 11e25ed2 authored by Ziye Yang, committed by Tomasz Zawadzki
Browse files

uring: Add support for internally buffered reads



Do large reads from the socket and buffer into a pipe.

Signed-off-by: Ziye Yang <ziye.yang@intel.com>
Change-Id: Ibce74917a1cd7248ff1a325204c0d338cc6a3470
Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/1349


Community-CI: Mellanox Build Bot
Tested-by: SPDK CI Jenkins <sys_sgci@intel.com>
Reviewed-by: Shuhei Matsumoto <shuhei.matsumoto.xt@hitachi.com>
Reviewed-by: Ben Walker <benjamin.walker@intel.com>
parent 74e5708b
Loading
Loading
Loading
Loading
+220 −15
Original line number Diff line number Diff line
@@ -40,6 +40,7 @@
#include "spdk/barrier.h"
#include "spdk/likely.h"
#include "spdk/log.h"
#include "spdk/pipe.h"
#include "spdk/sock.h"
#include "spdk/string.h"
#include "spdk/util.h"
@@ -82,6 +83,11 @@ struct spdk_uring_sock {
	struct spdk_uring_task			write_task;
	struct spdk_uring_task			pollin_task;
	int					outstanding_io;
	struct spdk_pipe			*recv_pipe;
	void					*recv_buf;
	int					recv_buf_sz;
	bool					pending_recv;
	TAILQ_ENTRY(spdk_uring_sock)		link;
};

struct spdk_uring_sock_group_impl {
@@ -90,6 +96,7 @@ struct spdk_uring_sock_group_impl {
	uint32_t				io_inflight;
	uint32_t				io_queued;
	uint32_t				io_avail;
	TAILQ_HEAD(, spdk_uring_sock)		pending_recv;
};

/* Yields a struct iovec pointer to the memory that starts immediately after the
 * struct spdk_sock_request located at req. The argument is parenthesized so that
 * expression arguments (e.g. `reqs + 1`) are cast as a whole. */
#define SPDK_URING_SOCK_REQUEST_IOV(req) ((struct iovec *)((uint8_t *)(req) + sizeof(struct spdk_sock_request)))
@@ -202,6 +209,70 @@ enum spdk_uring_sock_create_type {
	SPDK_SOCK_CREATE_CONNECT,
};

/*
 * Create, resize, or release the socket's internal receive pipe.
 *
 * sock: socket whose recv_pipe / recv_buf / recv_buf_sz are updated.
 * sz:   requested pipe capacity in bytes; 0 releases the pipe.
 *
 * When a pipe already exists, its buffered data is copied into the new pipe
 * before the old pipe and buffer are freed.
 *
 * Returns 0 on success (including when sz already matches the current size),
 * -ENOMEM when the buffer or pipe cannot be allocated, and -EINVAL when sz is
 * negative or the old pipe holds more data than the new size can take.
 */
static int
spdk_uring_sock_alloc_pipe(struct spdk_uring_sock *sock, int sz)
{
	uint8_t *new_buf;
	struct spdk_pipe *new_pipe;
	struct iovec siov[2];
	struct iovec diov[2];
	int sbytes;
	ssize_t bytes;

	/* A negative size would otherwise flow into the allocation size below. */
	if (sz < 0) {
		return -EINVAL;
	}

	if (sock->recv_buf_sz == sz) {
		return 0;
	}

	/* If the new size is 0, just free the pipe */
	if (sz == 0) {
		spdk_pipe_destroy(sock->recv_pipe);
		free(sock->recv_buf);
		sock->recv_pipe = NULL;
		sock->recv_buf = NULL;
		/* Forget the old size too; otherwise a later request for that same
		 * size would hit the early return above and leave the socket
		 * without a pipe. */
		sock->recv_buf_sz = 0;
		return 0;
	}

	/* Round up to next 64 byte multiple */
	new_buf = calloc(SPDK_ALIGN_CEIL(sz + 1, 64), sizeof(uint8_t));
	if (!new_buf) {
		SPDK_ERRLOG("socket recv buf allocation failed\n");
		return -ENOMEM;
	}

	/* The pipe is created over sz + 1 bytes of the buffer.
	 * NOTE(review): presumably spdk_pipe keeps one byte unused to tell a full
	 * pipe from an empty one - confirm against spdk/pipe.h. */
	new_pipe = spdk_pipe_create(new_buf, sz + 1);
	if (new_pipe == NULL) {
		SPDK_ERRLOG("socket pipe allocation failed\n");
		free(new_buf);
		return -ENOMEM;
	}

	if (sock->recv_pipe != NULL) {
		/* Pull all of the data out of the old pipe */
		sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
		if (sbytes > sz) {
			/* Too much data to fit into the new pipe size */
			spdk_pipe_destroy(new_pipe);
			free(new_buf);
			return -EINVAL;
		}

		sbytes = spdk_pipe_writer_get_buffer(new_pipe, sz, diov);
		assert(sbytes == sz);

		bytes = spdk_iovcpy(siov, 2, diov, 2);
		spdk_pipe_writer_advance(new_pipe, bytes);

		spdk_pipe_destroy(sock->recv_pipe);
		free(sock->recv_buf);
	}

	sock->recv_buf_sz = sz;
	sock->recv_buf = new_buf;
	sock->recv_pipe = new_pipe;

	return 0;
}

static int
spdk_uring_sock_set_recvbuf(struct spdk_sock *_sock, int sz)
{
@@ -210,6 +281,16 @@ spdk_uring_sock_set_recvbuf(struct spdk_sock *_sock, int sz)

	assert(sock != NULL);

#ifndef __aarch64__
	/* On ARM systems, this buffering does not help. Skip it. */
	/* The size of the pipe is purely derived from benchmarks. It seems to work well. */
	rc = spdk_uring_sock_alloc_pipe(sock, sz);
	if (rc) {
		SPDK_ERRLOG("unable to allocate sufficient recvbuf with sz=%d on sock=%p\n", sz, _sock);
		return rc;
	}
#endif

	if (sz < SO_RCVBUF_SIZE) {
		sz = SO_RCVBUF_SIZE;
	}
@@ -254,7 +335,6 @@ _spdk_uring_sock_alloc(int fd)
	}

	sock->fd = fd;

	return sock;
}

@@ -469,6 +549,9 @@ spdk_uring_sock_close(struct spdk_sock *_sock)

	assert(TAILQ_EMPTY(&_sock->pending_reqs));
	assert(sock->group == NULL);

	spdk_pipe_destroy(sock->recv_pipe);
	free(sock->recv_buf);
	rc = close(sock->fd);
	if (rc == 0) {
		free(sock);
@@ -478,21 +561,110 @@ spdk_uring_sock_close(struct spdk_sock *_sock)
}

static ssize_t
spdk_uring_sock_recv(struct spdk_sock *_sock, void *buf, size_t len)
spdk_uring_sock_recv_from_pipe(struct spdk_uring_sock *sock, struct iovec *diov, int diovcnt)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct iovec siov[2];
	int sbytes;
	ssize_t bytes;
	struct spdk_uring_sock_group_impl *group;

	return recv(sock->fd, buf, len, MSG_DONTWAIT);
	sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
	if (sbytes < 0) {
		errno = EINVAL;
		return -1;
	} else if (sbytes == 0) {
		errno = EAGAIN;
		return -1;
	}

	bytes = spdk_iovcpy(siov, 2, diov, diovcnt);

	if (bytes == 0) {
		/* The only way this happens is if diov is 0 length */
		errno = EINVAL;
		return -1;
	}

	spdk_pipe_reader_advance(sock->recv_pipe, bytes);

	/* If we drained the pipe, take it off the level-triggered list */
	if (sock->base.group_impl && spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
		group = __uring_group_impl(sock->base.group_impl);
		TAILQ_REMOVE(&group->pending_recv, sock, link);
		sock->pending_recv = false;
	}

	return bytes;
}

/*
 * Fill the socket's receive pipe with a single readv() from the socket fd.
 *
 * Asks the pipe for up to recv_buf_sz bytes of writable space and, when some
 * is available, reads into it. On a successful read the pipe's write position
 * is advanced and, if the socket belongs to a group and is not already queued,
 * the socket is appended to the group's pending_recv list.
 *
 * Returns the readv() result (bytes read, 0, or -1 with errno set by readv),
 * or the non-positive value from spdk_pipe_writer_get_buffer() when the pipe
 * offers no writable space.
 */
static inline ssize_t
_spdk_uring_sock_read(struct spdk_uring_sock *sock)
{
	struct iovec iov[2];
	/* ssize_t so the readv() result is not narrowed before being returned. */
	ssize_t bytes;
	struct spdk_uring_sock_group_impl *group;

	bytes = spdk_pipe_writer_get_buffer(sock->recv_pipe, sock->recv_buf_sz, iov);

	if (bytes > 0) {
		bytes = readv(sock->fd, iov, 2);
		if (bytes > 0) {
			spdk_pipe_writer_advance(sock->recv_pipe, bytes);
			/* Skip the insert when the socket is already on the list (for
			 * example queued by the POLLIN handler but not yet dispatched);
			 * inserting the same element twice corrupts the TAILQ. */
			if (sock->base.group_impl && !sock->pending_recv) {
				group = __uring_group_impl(sock->base.group_impl);
				TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
				sock->pending_recv = true;
			}
		}
	}

	return bytes;
}

/*
 * Scatter-read from the socket, going through the internal receive pipe when
 * one is allocated.
 *
 * Without a pipe this is a plain readv() on the fd. With a pipe:
 *  - buffered data, if any, is served from the pipe;
 *  - otherwise a request of at least 1024 bytes in total is read straight
 *    into the caller's buffers;
 *  - otherwise the pipe is refilled from the socket first and the request is
 *    then served from the pipe.
 *
 * Returns whatever the chosen path returns: a byte count, 0, or -1 with errno.
 */
static ssize_t
spdk_uring_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	size_t total = 0;
	int idx, filled;

	/* No internal buffering configured for this socket. */
	if (sock->recv_pipe == NULL) {
		return readv(sock->fd, iov, iovcnt);
	}

	/* Total number of bytes the caller is asking for. */
	for (idx = 0; idx < iovcnt; idx++) {
		total += iov[idx].iov_len;
	}

	/* Data is already buffered: hand it out before touching the socket. */
	if (spdk_pipe_reader_bytes_available(sock->recv_pipe) != 0) {
		return spdk_uring_sock_recv_from_pipe(sock, iov, iovcnt);
	}

	/* A sufficiently large request goes directly into the user's buffers. */
	if (total >= 1024) {
		return readv(sock->fd, iov, iovcnt);
	}

	/* Small request: do one big read into the pipe, then serve from it. */
	filled = _spdk_uring_sock_read(sock);
	if (filled <= 0) {
		return filled;
	}

	return spdk_uring_sock_recv_from_pipe(sock, iov, iovcnt);
}

/*
 * Receive up to len bytes into buf by wrapping the buffer in a single iovec
 * and delegating to spdk_uring_sock_readv(); returns that call's result.
 */
static ssize_t
spdk_uring_sock_recv(struct spdk_sock *sock, void *buf, size_t len)
{
	struct iovec single = {
		.iov_base = buf,
		.iov_len = len,
	};

	return spdk_uring_sock_readv(sock, &single, 1);
}

static ssize_t
spdk_uring_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
{
@@ -649,7 +821,8 @@ _sock_prep_pollin(struct spdk_sock *_sock)
	struct spdk_uring_task *task = &sock->pollin_task;
	struct io_uring_sqe *sqe;

	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) {
	/* Do not prepare pollin event */
	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS || sock->pending_recv) {
		return;
	}

@@ -663,18 +836,17 @@ _sock_prep_pollin(struct spdk_sock *_sock)
}

static int
spdk_sock_uring_group_reap(struct io_uring *ring, int max,
spdk_sock_uring_group_reap(struct spdk_uring_sock_group_impl *group, int max, int max_read_events,
			   struct spdk_sock **socks)
{
	int i, count, ret;
	struct io_uring_cqe *cqe;
	struct spdk_uring_sock *sock;
	struct spdk_uring_sock *sock, *tmp;
	struct spdk_uring_task *task;
	int status;

	count = 0;
	for (i = 0; i < max; i++) {
		ret = io_uring_peek_cqe(ring, &cqe);
		ret = io_uring_peek_cqe(&group->uring, &cqe);
		if (ret != 0) {
			break;
		}
@@ -688,10 +860,11 @@ spdk_sock_uring_group_reap(struct io_uring *ring, int max,
		sock = task->sock;
		assert(sock != NULL);
		assert(sock->group != NULL);
		assert(sock->group == group);
		sock->group->io_inflight--;
		sock->group->io_avail++;
		status = cqe->res;
		io_uring_cqe_seen(ring, cqe);
		io_uring_cqe_seen(&group->uring, cqe);

		task->status = SPDK_URING_SOCK_TASK_NOT_IN_USE;

@@ -705,8 +878,9 @@ spdk_sock_uring_group_reap(struct io_uring *ring, int max,
		case SPDK_SOCK_TASK_POLLIN:
			if ((status & POLLIN) == POLLIN) {
				if ((socks != NULL) && (sock->base.cb_fn != NULL)) {
					socks[count] = &sock->base;
					count++;
					assert(sock->pending_recv == false);
					sock->pending_recv = true;
					TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
				}
			} else {
				SPDK_UNREACHABLE();
@@ -736,6 +910,29 @@ spdk_sock_uring_group_reap(struct io_uring *ring, int max,
		}
	}

	count = 0;
	TAILQ_FOREACH_SAFE(sock, &group->pending_recv, link, tmp) {
		if (count == max_read_events) {
			break;
		}

		socks[count++] = &sock->base;
	}

	/* Cycle the pending_recv list so that each time we poll things aren't
	 * in the same order. */
	for (i = 0; i < count; i++) {
		sock = __uring_sock(socks[i]);

		TAILQ_REMOVE(&group->pending_recv, sock, link);

		if (sock->recv_pipe == NULL || spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
			sock->pending_recv = false;
		} else {
			TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
		}
	}

	return count;
}

@@ -928,6 +1125,8 @@ spdk_uring_sock_group_impl_create(void)
		return NULL;
	}

	TAILQ_INIT(&group_impl->pending_recv);

	return &group_impl->base;
}

@@ -953,6 +1152,7 @@ spdk_uring_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group,
				       struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);

	if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
		sock->outstanding_io++;
@@ -962,6 +1162,12 @@ spdk_uring_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group,
		sock->outstanding_io++;
	}

	if ((sock->recv_pipe != NULL) &&
	    spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0) {
		TAILQ_REMOVE(&group->pending_recv, sock, link);
		sock->pending_recv = false;
	}

	if (!sock->outstanding_io) {
		sock->group = NULL;
	}
@@ -1001,8 +1207,7 @@ spdk_uring_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_eve
	count = 0;
	to_complete = group->io_inflight;
	if (to_complete > 0) {
		to_complete = spdk_min(to_complete, max_events);
		count = spdk_sock_uring_group_reap(&group->uring, to_complete, socks);
		count = spdk_sock_uring_group_reap(group, to_complete, max_events, socks);
	}

	return count;