Commit 7ef33c86 authored by Ben Walker, committed by Tomasz Zawadzki
Browse files

sock/posix: Zero copy send



If available, automatically use MSG_ZEROCOPY when sending on sockets.
Storage workloads contain sufficient data transfer sizes that this is
always a performance improvement, regardless of workload.

Change-Id: I14429d78c22ad3bc036aec13c9fce6453e899c92
Signed-off-by: Ben Walker <benjamin.walker@intel.com>
Reviewed-on: https://review.gerrithub.io/c/spdk/spdk/+/471752


Tested-by: SPDK CI Jenkins <sys_sgci@intel.com>
Reviewed-by: Shuhei Matsumoto <shuhei.matsumoto.xt@hitachi.com>
Reviewed-by: Jim Harris <james.r.harris@intel.com>
Reviewed-by: Or Gerlitz <gerlitz.or@gmail.com>
parent a02207d7
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -70,7 +70,7 @@ struct spdk_sock_request {
	 */
	struct __sock_request_internal {
		TAILQ_ENTRY(spdk_sock_request)	link;
		unsigned int			offset;
		uint32_t			offset;
	} internal;

	int				iovcnt;
+2 −0
Original line number Diff line number Diff line
@@ -148,6 +148,8 @@ spdk_sock_request_put(struct spdk_sock *sock, struct spdk_sock_request *req, int

	TAILQ_REMOVE(&sock->pending_reqs, req, internal.link);

	req->internal.offset = 0;

	closed = sock->flags.closed;
	sock->cb_cnt++;
	req->cb_fn(req->cb_arg, err);
+141 −12
Original line number Diff line number Diff line
@@ -35,6 +35,7 @@

#if defined(__linux__)
#include <sys/epoll.h>
#include <linux/errqueue.h>
#elif defined(__FreeBSD__)
#include <sys/event.h>
#endif
@@ -49,9 +50,16 @@
#define SO_SNDBUF_SIZE (2 * 1024 * 1024)
#define IOV_BATCH_SIZE 64

#if defined(SO_ZEROCOPY) && defined(MSG_ZEROCOPY)
#define SPDK_ZEROCOPY
#endif

struct spdk_posix_sock {
	struct spdk_sock	base;
	int			fd;

	uint32_t		sendmsg_idx;
	bool			zcopy;
};

struct spdk_posix_sock_group_impl {
@@ -212,6 +220,9 @@ _spdk_posix_sock_alloc(int fd)
{
	struct spdk_posix_sock *sock;
	int rc;
#ifdef SPDK_ZEROCOPY
	int flag;
#endif

	sock = calloc(1, sizeof(*sock));
	if (sock == NULL) {
@@ -231,6 +242,15 @@ _spdk_posix_sock_alloc(int fd)
		/* Not fatal */
	}

#ifdef SPDK_ZEROCOPY
	/* Try to turn on zero copy sends */
	flag = 1;
	rc = setsockopt(sock->fd, SOL_SOCKET, SO_ZEROCOPY, &flag, sizeof(flag));
	if (rc == 0) {
		sock->zcopy = true;
	}
#endif

	return sock;
}

@@ -365,6 +385,11 @@ retry:
		return NULL;
	}

	/* Disable zero copy for client sockets until support is added */
	if (type == SPDK_SOCK_CREATE_CONNECT) {
		sock->zcopy = false;
	}

	return &sock->base;
}

@@ -424,6 +449,8 @@ spdk_posix_sock_close(struct spdk_sock *_sock)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);

	assert(TAILQ_EMPTY(&_sock->pending_reqs));

	/* If the socket fails to close, the best choice is to
	 * leak the fd but continue to free the rest of the sock
	 * memory. */
@@ -434,6 +461,79 @@ spdk_posix_sock_close(struct spdk_sock *_sock)
	return 0;
}

#ifdef SPDK_ZEROCOPY
/* Drain zero-copy send completions from the socket's error queue and
 * complete the corresponding pending requests.
 *
 * Each sendmsg(MSG_ZEROCOPY) call is assigned a kernel-side 32-bit
 * counter value; completions arrive on the MSG_ERRQUEUE as a
 * sock_extended_err with origin SO_EE_ORIGIN_ZEROCOPY carrying an
 * inclusive range [ee_info, ee_data] of completed counter values.
 * Requests on sock->pending_reqs stored their sendmsg index in
 * internal.offset (see _sock_flush), which is matched here.
 */
static void
_sock_check_zcopy(struct spdk_sock *sock)
{
	struct spdk_posix_sock *psock = __posix_sock(sock);
	struct msghdr msgh = {};
	/* Room for one cmsghdr carrying a sock_extended_err. CMSG_SPACE
	 * accounts for the alignment padding the kernel may insert. */
	uint8_t buf[CMSG_SPACE(sizeof(struct sock_extended_err))];
	ssize_t rc;
	struct sock_extended_err *serr;
	struct cmsghdr *cm;
	uint32_t idx;
	struct spdk_sock_request *req, *treq;
	bool found;

	while (true) {
		/* recvmsg() overwrites msg_controllen with the length of the
		 * control data actually received, so the msghdr must be
		 * re-initialized on every iteration - not just once before
		 * the loop. */
		msgh.msg_control = buf;
		msgh.msg_controllen = sizeof(buf);

		rc = recvmsg(psock->fd, &msgh, MSG_ERRQUEUE);

		if (rc < 0) {
			if (errno == EWOULDBLOCK || errno == EAGAIN) {
				/* Error queue fully drained. */
				return;
			}

			if (!TAILQ_EMPTY(&sock->pending_reqs)) {
				SPDK_ERRLOG("Attempting to receive from ERRQUEUE yielded error, but pending list still has orphaned entries\n");
			} else {
				SPDK_WARNLOG("Recvmsg yielded an error!\n");
			}
			return;
		}

		cm = CMSG_FIRSTHDR(&msgh);
		/* CMSG_FIRSTHDR may return NULL if no (or truncated) control
		 * data was received; dereferencing it unchecked would crash. */
		if (cm == NULL || cm->cmsg_level != SOL_IP || cm->cmsg_type != IP_RECVERR) {
			SPDK_WARNLOG("Unexpected cmsg level or type!\n");
			return;
		}

		serr = (struct sock_extended_err *)CMSG_DATA(cm);
		if (serr->ee_errno != 0 || serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) {
			SPDK_WARNLOG("Unexpected extended error origin\n");
			return;
		}

		/* Most of the time, the pending_reqs array is in the exact
		 * order we need such that all of the requests to complete are
		 * in order, in the front. It is guaranteed that all requests
		 * belonging to the same sendmsg call are sequential, so once
		 * we encounter one match we can stop looping as soon as a
		 * non-match is found.
		 */
		for (idx = serr->ee_info; idx <= serr->ee_data; idx++) {
			found = false;
			TAILQ_FOREACH_SAFE(req, &sock->pending_reqs, internal.link, treq) {
				if (req->internal.offset == idx) {
					found = true;

					/* A negative return means the socket's
					 * callback closed it; stop processing. */
					rc = spdk_sock_request_put(sock, req, 0);
					if (rc < 0) {
						return;
					}

				} else if (found) {
					break;
				}
			}

		}
	}
}
#endif

static int
_sock_flush(struct spdk_sock *sock)
{
@@ -492,7 +592,14 @@ _sock_flush(struct spdk_sock *sock)
	/* Perform the vectored write */
	msg.msg_iov = iovs;
	msg.msg_iovlen = iovcnt;
#ifdef SPDK_ZEROCOPY
	if (psock->zcopy) {
		flags = MSG_ZEROCOPY;
	} else
#endif
	{
		flags = 0;
	}
	rc = sendmsg(psock->fd, &msg, flags);
	if (rc <= 0) {
		if (errno == EAGAIN || errno == EWOULDBLOCK) {
@@ -501,6 +608,8 @@ _sock_flush(struct spdk_sock *sock)
		return rc;
	}

	psock->sendmsg_idx++;

	/* Consume the requests that were actually written */
	req = TAILQ_FIRST(&sock->queued_reqs);
	while (req) {
@@ -528,14 +637,23 @@ _sock_flush(struct spdk_sock *sock)
		}

		/* Handled a full request. */
		req->internal.offset = 0;
		spdk_sock_request_pend(sock, req);

		if (!psock->zcopy) {
			/* The sendmsg syscall above isn't currently asynchronous,
			* so it's already done. */
			retval = spdk_sock_request_put(sock, req, 0);
			if (retval) {
				break;
			}
		} else {
			/* Re-use the offset field to hold the sendmsg call index. The
			 * index is 0 based, so subtract one here because we've already
			 * incremented above. */
			req->internal.offset = psock->sendmsg_idx - 1;
		}

		if (rc == 0 || retval) {
		if (rc == 0) {
			break;
		}

@@ -759,7 +877,8 @@ spdk_posix_sock_group_impl_add_sock(struct spdk_sock_group_impl *_group, struct
	struct epoll_event event;

	memset(&event, 0, sizeof(event));
	event.events = EPOLLIN;
	/* EPOLLERR is always on even if we don't set it, but be explicit for clarity */
	event.events = EPOLLIN | EPOLLERR;
	event.data.ptr = sock;

	rc = epoll_ctl(group->fd, EPOLL_CTL_ADD, sock->fd, &event);
@@ -810,7 +929,7 @@ spdk_posix_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_eve
{
	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
	struct spdk_sock *sock, *tmp;
	int num_events, i, rc;
	int num_events, i, j, rc;
#if defined(__linux__)
	struct epoll_event events[MAX_EVENTS_PER_POLL];
#elif defined(__FreeBSD__)
@@ -838,15 +957,25 @@ spdk_posix_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_eve
		return -1;
	}

	for (i = 0; i < num_events; i++) {
	for (i = 0, j = 0; i < num_events; i++) {
#if defined(__linux__)
		socks[i] = events[i].data.ptr;
#ifdef SPDK_ZEROCOPY
		if (events[i].events & EPOLLERR) {
			_sock_check_zcopy(events[i].data.ptr);
		}
#endif

		if (events[i].events & EPOLLIN) {
			socks[j++] = events[i].data.ptr;
		}

#elif defined(__FreeBSD__)
		socks[i] = events[i].udata;
		socks[j++] = events[i].udata;
#endif

	}

	return num_events;
	return j;
}

static int
+6 −0
Original line number Diff line number Diff line
@@ -785,6 +785,12 @@ _sock_close(const char *ip, int port, char *impl_name)
	/* Poll the socket so the writev_async's send. The first one's
	 * callback will close the socket. */
	spdk_sock_group_poll(group);
	if (ctx.called == false) {
		/* Sometimes the zerocopy completion isn't posted immediately. Delay slightly
		* and poll one more time. */
		usleep(1000);
		spdk_sock_group_poll(group);
	}
	CU_ASSERT(ctx.called == true);
	CU_ASSERT(cb_arg2 == true);