Commit eb984889 authored by Ben Walker's avatar Ben Walker Committed by Tomasz Zawadzki
Browse files

sock/posix: Internally buffer reads.



Do large reads from the socket and buffer into a pipe.

Change-Id: I0a1bc5b356cb8c403048ab3b22a159332f085e6b
Signed-off-by: default avatarBen Walker <benjamin.walker@intel.com>
Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/447


Tested-by: default avatarSPDK CI Jenkins <sys_sgci@intel.com>
Reviewed-by: default avatarShuhei Matsumoto <shuhei.matsumoto.xt@hitachi.com>
Reviewed-by: default avatarJim Harris <james.r.harris@intel.com>
parent ea65bf61
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -107,7 +107,7 @@ DEPDIRS-accel_ioat := log ioat conf thread $(JSON_LIBS) accel
DEPDIRS-env_dpdk_rpc := log $(JSON_LIBS)

# module/sock
DEPDIRS-sock_posix := log sock
DEPDIRS-sock_posix := log sock util
DEPDIRS-sock_vpp := log sock util thread

# module/bdev
+228 −12
Original line number Diff line number Diff line
@@ -41,7 +41,9 @@
#endif

#include "spdk/log.h"
#include "spdk/pipe.h"
#include "spdk/sock.h"
#include "spdk/util.h"
#include "spdk_internal/sock.h"

#define MAX_TMPBUF 1024
@@ -60,11 +62,19 @@ struct spdk_posix_sock {

	uint32_t		sendmsg_idx;
	bool			zcopy;

	struct spdk_pipe	*recv_pipe;
	void			*recv_buf;
	int			recv_buf_sz;
	bool			pending_recv;

	TAILQ_ENTRY(spdk_posix_sock)	link;
};

struct spdk_posix_sock_group_impl {
	struct spdk_sock_group_impl	base;
	int				fd;
	TAILQ_HEAD(, spdk_posix_sock)	pending_recv;
};

static int
@@ -175,6 +185,70 @@ enum spdk_posix_sock_create_type {
	SPDK_SOCK_CREATE_CONNECT,
};

static int
spdk_posix_sock_alloc_pipe(struct spdk_posix_sock *sock, int sz)
{
	uint8_t *new_buf;
	struct spdk_pipe *new_pipe;
	struct iovec siov[2];
	struct iovec diov[2];
	int sbytes;
	ssize_t bytes;

	if (sock->recv_buf_sz == sz) {
		return 0;
	}

	/* If the new size is 0, just free the pipe */
	if (sz == 0) {
		spdk_pipe_destroy(sock->recv_pipe);
		free(sock->recv_buf);
		sock->recv_pipe = NULL;
		sock->recv_buf = NULL;
		return 0;
	}

	/* Round up to next 64 byte multiple */
	new_buf = calloc(((sz + 1) >> 6) << 6, sizeof(uint8_t));
	if (!new_buf) {
		SPDK_ERRLOG("socket recv buf allocation failed\n");
		return -ENOMEM;
	}

	new_pipe = spdk_pipe_create(new_buf, sz + 1);
	if (new_pipe == NULL) {
		SPDK_ERRLOG("socket pipe allocation failed\n");
		free(new_buf);
		return -ENOMEM;
	}

	if (sock->recv_pipe != NULL) {
		/* Pull all of the data out of the old pipe */
		sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
		if (sbytes > sz) {
			/* Too much data to fit into the new pipe size */
			spdk_pipe_destroy(new_pipe);
			free(new_buf);
			return -EINVAL;
		}

		sbytes = spdk_pipe_writer_get_buffer(new_pipe, sz, diov);
		assert(sbytes == sz);

		bytes = spdk_iovcpy(siov, 2, diov, 2);
		spdk_pipe_writer_advance(new_pipe, bytes);

		spdk_pipe_destroy(sock->recv_pipe);
		free(sock->recv_buf);
	}

	sock->recv_buf_sz = sz;
	sock->recv_buf = new_buf;
	sock->recv_pipe = new_pipe;

	return 0;
}

static int
spdk_posix_sock_set_recvbuf(struct spdk_sock *_sock, int sz)
{
@@ -219,8 +293,9 @@ static struct spdk_posix_sock *
_spdk_posix_sock_alloc(int fd)
{
	struct spdk_posix_sock *sock;
	int rc;
#ifdef SPDK_ZEROCOPY
	int flag, rc;
	int flag;
#endif

	sock = calloc(1, sizeof(*sock));
@@ -231,6 +306,16 @@ _spdk_posix_sock_alloc(int fd)

	sock->fd = fd;

#ifndef __aarch64__
	/* On ARM systems, this buffering does not help. Skip it. */
	/* The size of the pipe is purely derived from benchmarks. It seems to work well. */
	rc = spdk_posix_sock_alloc_pipe(sock, 8192);
	if (rc) {
		SPDK_ERRLOG("unable to allocate sufficient recvbuf\n");
		free(sock);
		return NULL;
	}
#endif

#ifdef SPDK_ZEROCOPY
	/* Try to turn on zero copy sends */
@@ -459,6 +544,8 @@ spdk_posix_sock_close(struct spdk_sock *_sock)
	 * memory. */
	close(sock->fd);

	spdk_pipe_destroy(sock->recv_pipe);
	free(sock->recv_buf);
	free(sock);

	return 0;
@@ -675,21 +762,110 @@ spdk_posix_sock_flush(struct spdk_sock *_sock)
}

static ssize_t
spdk_posix_sock_recv(struct spdk_sock *_sock, void *buf, size_t len)
spdk_posix_sock_recv_from_pipe(struct spdk_posix_sock *sock, struct iovec *diov, int diovcnt)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	struct iovec siov[2];
	int sbytes;
	ssize_t bytes;
	struct spdk_posix_sock_group_impl *group;

	sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
	if (sbytes < 0) {
		errno = EINVAL;
		return -1;
	} else if (sbytes == 0) {
		errno = EAGAIN;
		return -1;
	}

	bytes = spdk_iovcpy(siov, 2, diov, diovcnt);

	if (bytes == 0) {
		/* The only way this happens is if diov is 0 length */
		errno = EINVAL;
		return -1;
	}

	spdk_pipe_reader_advance(sock->recv_pipe, bytes);

	/* If we drained the pipe, take it off the level-triggered list */
	if (sock->base.group_impl && spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
		group = __posix_group_impl(sock->base.group_impl);
		TAILQ_REMOVE(&group->pending_recv, sock, link);
		sock->pending_recv = false;
	}

	return bytes;
}

	return recv(sock->fd, buf, len, MSG_DONTWAIT);
static inline ssize_t
_spdk_posix_sock_read(struct spdk_posix_sock *sock)
{
	struct iovec iov[2];
	int bytes;
	struct spdk_posix_sock_group_impl *group;

	bytes = spdk_pipe_writer_get_buffer(sock->recv_pipe, sock->recv_buf_sz, iov);

	if (bytes > 0) {
		bytes = readv(sock->fd, iov, 2);
		if (bytes > 0) {
			spdk_pipe_writer_advance(sock->recv_pipe, bytes);
			if (sock->base.group_impl) {
				group = __posix_group_impl(sock->base.group_impl);
				TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
				sock->pending_recv = true;
			}
		}
	}

	return bytes;
}

static ssize_t
spdk_posix_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	int rc, i;
	size_t len;

	if (sock->recv_pipe == NULL) {
		return readv(sock->fd, iov, iovcnt);
	}

	len = 0;
	for (i = 0; i < iovcnt; i++) {
		len += iov[i].iov_len;
	}

	if (spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
		/* If the user is receiving a sufficiently large amount of data,
		 * receive directly to their buffers. */
		if (len >= 1024) {
			return readv(sock->fd, iov, iovcnt);
		}

		/* Otherwise, do a big read into our pipe */
		rc = _spdk_posix_sock_read(sock);
		if (rc <= 0) {
			return rc;
		}
	}

	return spdk_posix_sock_recv_from_pipe(sock, iov, iovcnt);
}

static ssize_t
spdk_posix_sock_recv(struct spdk_sock *sock, void *buf, size_t len)
{
	struct iovec iov[1];

	iov[0].iov_base = buf;
	iov[0].iov_len = len;

	return spdk_posix_sock_readv(sock, iov, 1);
}

static ssize_t
spdk_posix_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
{
@@ -867,6 +1043,7 @@ spdk_posix_sock_group_impl_create(void)
	}

	group_impl->fd = fd;
	TAILQ_INIT(&group_impl->pending_recv);

	return &group_impl->base;
}
@@ -905,6 +1082,13 @@ spdk_posix_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group, stru
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	int rc;

	if (sock->recv_pipe != NULL) {
		if (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0) {
			TAILQ_REMOVE(&group->pending_recv, sock, link);
			sock->pending_recv = false;
		}
	}

#if defined(__linux__)
	struct epoll_event event;

@@ -934,7 +1118,8 @@ spdk_posix_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_eve
{
	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
	struct spdk_sock *sock, *tmp;
	int num_events, i, j, rc;
	int num_events, i, rc;
	struct spdk_posix_sock *psock, *ptmp;
#if defined(__linux__)
	struct epoll_event events[MAX_EVENTS_PER_POLL];
#elif defined(__FreeBSD__)
@@ -962,9 +1147,10 @@ spdk_posix_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_eve
		return -1;
	}

	for (i = 0, j = 0; i < num_events; i++) {
	for (i = 0; i < num_events; i++) {
#if defined(__linux__)
		sock = events[i].data.ptr;
		psock = __posix_sock(sock);

#ifdef SPDK_ZEROCOPY
		if (events[i].events & EPOLLERR) {
@@ -977,18 +1163,48 @@ spdk_posix_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_eve
			}
		}
#endif

		if (events[i].events & EPOLLIN) {
			socks[j++] = sock;
		if ((events[i].events & EPOLLIN) == 0) {
			continue;
		}

#elif defined(__FreeBSD__)
		socks[j++] = events[i].udata;
		sock = events[i].udata;
		psock = __posix_sock(sock);
#endif

		/* If the socket does not already have recv pending, add it now */
		if (!psock->pending_recv) {
			psock->pending_recv = true;
			TAILQ_INSERT_TAIL(&group->pending_recv, psock, link);
		}
	}

	num_events = 0;

	TAILQ_FOREACH_SAFE(psock, &group->pending_recv, link, ptmp) {
		if (num_events == max_events) {
			break;
		}

		socks[num_events++] = &psock->base;
	}

	/* Cycle the pending_recv list so that each time we poll things aren't
	 * in the same order. */
	for (i = 0; i < num_events; i++) {
		psock = __posix_sock(socks[i]);

		TAILQ_REMOVE(&group->pending_recv, psock, link);

		if (psock->recv_pipe != NULL || spdk_pipe_reader_bytes_available(psock->recv_pipe) == 0) {
			psock->pending_recv = false;
		} else {
			TAILQ_INSERT_TAIL(&group->pending_recv, psock, link);
		}

	}

	return j;
	return num_events;
}

static int