Commit 34ab803d authored by Ziye Yang, committed by Tomasz Zawadzki

sock/uring: Add the MSG zero copy feature

This patch adds the MSG zero copy feature. Though io_uring
supports buffer registration, it only supports
io_uring_prep_write_fixed, meaning a request can use just one
registered buffer. That does not fit our current usage mode.
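
For reference, a minimal sketch of that registered-buffer path (illustrative
only, not part of this patch; the helper name send_with_fixed_buffer is
hypothetical and ring setup is left to the caller):

#include <liburing.h>
#include <sys/uio.h>

/* With buffer registration, sends go through io_uring_prep_write_fixed()
 * and each SQE references exactly one registered buffer by index. */
static int
send_with_fixed_buffer(struct io_uring *ring, int fd, void *buf, unsigned len)
{
	struct iovec reg = { .iov_base = buf, .iov_len = len };
	struct io_uring_sqe *sqe;
	int rc;

	rc = io_uring_register_buffers(ring, &reg, 1);
	if (rc != 0) {
		return rc;
	}

	sqe = io_uring_get_sqe(ring);
	/* The last argument (buf_index) selects the one registered buffer. */
	io_uring_prep_write_fixed(sqe, fd, buf, len, 0, 0);
	return io_uring_submit(ring);
}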

Given this situation, we use the MSG_ZEROCOPY flag with
io_uring_prep_sendmsg instead.
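
In outline, the mechanism looks like this (a minimal standalone sketch, not
SPDK code; the helper names are hypothetical and error handling is trimmed):

#include <liburing.h>
#include <linux/errqueue.h>
#include <netinet/in.h>
#include <sys/socket.h>

/* Enable SO_ZEROCOPY once per socket, then pass MSG_ZEROCOPY on each send. */
static int
send_zcopy(struct io_uring *ring, int fd, struct msghdr *msg)
{
	struct io_uring_sqe *sqe;
	int flag = 1;

	if (setsockopt(fd, SOL_SOCKET, SO_ZEROCOPY, &flag, sizeof(flag)) != 0) {
		return -1; /* no kernel support: fall back to copying sends */
	}

	sqe = io_uring_get_sqe(ring);
	io_uring_prep_sendmsg(sqe, fd, msg, MSG_DONTWAIT | MSG_ZEROCOPY);
	return io_uring_submit(ring);
}

/* The kernel reports completion on the socket error queue: each
 * notification carries the range [ee_info, ee_data] of finished sends. */
static void
reap_zcopy_completions(int fd)
{
	char control[128];
	struct msghdr msg = {0};
	struct cmsghdr *cm;
	struct sock_extended_err *serr;

	msg.msg_control = control;
	msg.msg_controllen = sizeof(control);
	if (recvmsg(fd, &msg, MSG_ERRQUEUE) < 0) {
		return;
	}

	for (cm = CMSG_FIRSTHDR(&msg); cm != NULL; cm = CMSG_NXTHDR(&msg, cm)) {
		if (cm->cmsg_level != SOL_IP || cm->cmsg_type != IP_RECVERR) {
			continue;
		}
		serr = (struct sock_extended_err *)CMSG_DATA(cm);
		if (serr->ee_origin == SO_EE_ORIGIN_ZEROCOPY) {
			/* Buffers for sends ee_info..ee_data may be reused now. */
		}
	}
}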

Furthermore, this new feature depends on the kernel version;
the currently verified version is kernel 5.12-rc3, so it is not
enabled by default.

For example, to enable it on the target side, configure it with
the following RPC:

./scripts/rpc.py sock_impl_set_options -i uring --enable-zerocopy-send-server
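
The patch adds an analogous client-side option as well; assuming rpc.py
exposes it with matching naming, the initiator side would be:

./scripts/rpc.py sock_impl_set_options -i uring --enable-zerocopy-send-client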

Change-Id: Ie7bb828f466362add94891989ddf0950dccd9e80
Signed-off-by: Ziye Yang <ziye.yang@intel.com>
Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/957
Community-CI: Broadcom CI <spdk-ci.pdl@broadcom.com>
Community-CI: Mellanox Build Bot
Tested-by: SPDK CI Jenkins <sys_sgci@intel.com>
Reviewed-by: Aleksey Marchuk <alexeymar@mellanox.com>
Reviewed-by: Ben Walker <benjamin.walker@intel.com>
parent beb6ffd2
@@ -34,6 +34,7 @@
#include "spdk/stdinc.h"
#include "spdk/config.h"

#include <linux/errqueue.h>
#include <sys/epoll.h>
#include <liburing.h>

@@ -51,13 +52,19 @@
#define MAX_TMPBUF 1024
#define PORTNUMLEN 32
#define SPDK_SOCK_GROUP_QUEUE_DEPTH 4096
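/* Size of a control message carrying one sock_extended_err (a MSG_ZEROCOPY completion) */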
#define SPDK_SOCK_CMG_INFO_SIZE (sizeof(struct cmsghdr) + sizeof(struct sock_extended_err))

enum spdk_sock_task_type {
	SPDK_SOCK_TASK_POLLIN = 0,
	SPDK_SOCK_TASK_RECV,
	SPDK_SOCK_TASK_WRITE,
	SPDK_SOCK_TASK_CANCEL,
};

#if defined(SO_ZEROCOPY) && defined(MSG_ZEROCOPY)
#define SPDK_ZEROCOPY
#endif

enum spdk_uring_sock_task_status {
	SPDK_URING_SOCK_TASK_NOT_IN_USE = 0,
	SPDK_URING_SOCK_TASK_IN_PROCESS,
@@ -77,16 +84,21 @@ struct spdk_uring_task {
struct spdk_uring_sock {
	struct spdk_sock			base;
	int					fd;
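	/* Monotonically increasing sendmsg call index; zero copy completions refer back to it */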
	uint32_t				sendmsg_idx;
	struct spdk_uring_sock_group_impl	*group;
	struct spdk_uring_task			write_task;
	struct spdk_uring_task			recv_task;
	struct spdk_uring_task			pollin_task;
	struct spdk_uring_task			cancel_task;
	struct spdk_pipe			*recv_pipe;
	void					*recv_buf;
	int					recv_buf_sz;
	bool					zcopy;
	bool					pending_recv;
	int					zcopy_send_flags;
	int					connection_status;
	int					placement_id;
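	/* Control-message buffer used when reaping zero copy completions from the error queue */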
	uint8_t					buf[SPDK_SOCK_CMG_INFO_SIZE];
	TAILQ_ENTRY(spdk_uring_sock)		link;
};

@@ -107,6 +119,8 @@ static struct spdk_sock_impl_opts g_spdk_uring_sock_impl_opts = {
	.enable_recv_pipe = true,
	.enable_quickack = false,
	.enable_placement_id = PLACEMENT_NONE,
	.enable_zerocopy_send_server = false,
	.enable_zerocopy_send_client = false,
};

static struct spdk_sock_map g_map = {
@@ -346,7 +360,7 @@ uring_sock_set_sendbuf(struct spdk_sock *_sock, int sz)
}

static struct spdk_uring_sock *
uring_sock_alloc(int fd)
uring_sock_alloc(int fd, bool enable_zero_copy)
{
	struct spdk_uring_sock *sock;
#if defined(__linux__)
@@ -374,6 +388,18 @@ uring_sock_alloc(int fd)

	spdk_sock_get_placement_id(sock->fd, g_spdk_uring_sock_impl_opts.enable_placement_id,
				   &sock->placement_id);
#ifdef SPDK_ZEROCOPY
	/* Try to turn on zero copy sends */
	flag = 1;

	if (enable_zero_copy) {
		rc = setsockopt(sock->fd, SOL_SOCKET, SO_ZEROCOPY, &flag, sizeof(flag));
		if (rc == 0) {
			sock->zcopy = true;
			sock->zcopy_send_flags = MSG_ZEROCOPY;
		}
	}
#endif
#endif

	return sock;
@@ -392,6 +418,10 @@ uring_sock_create(const char *ip, int port,
	int fd, flag;
	int val = 1;
	int rc;
	bool enable_zcopy_impl_opts = false;
	bool enable_zcopy_user_opts = true;

	assert(opts != NULL);

	if (ip == NULL) {
		return NULL;
@@ -506,6 +536,7 @@ retry:
				fd = -1;
				break;
			}
			enable_zcopy_impl_opts = g_spdk_uring_sock_impl_opts.enable_zerocopy_send_server;
		} else if (type == SPDK_SOCK_CREATE_CONNECT) {
			rc = connect(fd, res->ai_addr, res->ai_addrlen);
			if (rc != 0) {
@@ -515,6 +546,8 @@ retry:
				fd = -1;
				continue;
			}

			enable_zcopy_impl_opts = g_spdk_uring_sock_impl_opts.enable_zerocopy_send_client;
		}

		flag = fcntl(fd, F_GETFL);
@@ -532,7 +565,8 @@ retry:
		return NULL;
	}

	sock = uring_sock_alloc(fd);
	enable_zcopy_user_opts = opts->zcopy;
	sock = uring_sock_alloc(fd, enable_zcopy_user_opts && enable_zcopy_impl_opts);
	if (sock == NULL) {
		SPDK_ERRLOG("sock allocation failed\n");
		close(fd);
@@ -595,7 +629,7 @@ uring_sock_accept(struct spdk_sock *_sock)
	}
#endif

	new_sock = uring_sock_alloc(fd);
	new_sock = uring_sock_alloc(fd, sock->zcopy);
	if (new_sock == NULL) {
		close(fd);
		return NULL;
@@ -745,11 +779,22 @@ uring_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
static int
sock_complete_reqs(struct spdk_sock *_sock, ssize_t rc)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_sock_request *req;
	int i, retval;
	unsigned int offset;
	size_t len;

	if (sock->zcopy) {
		/* Handle the overflow case: since we use sock->sendmsg_idx - 1 for
		 * req->internal.offset, sendmsg_idx must never be zero. */
		if (spdk_unlikely(sock->sendmsg_idx == UINT32_MAX)) {
			sock->sendmsg_idx = 1;
		} else {
			sock->sendmsg_idx++;
		}
	}

	/* Consume the requests that were actually written */
	req = TAILQ_FIRST(&_sock->queued_reqs);
	while (req) {
@@ -779,10 +824,17 @@ sock_complete_reqs(struct spdk_sock *_sock, ssize_t rc)
		/* Handled a full request. */
		spdk_sock_request_pend(_sock, req);

		if (!sock->zcopy) {
			retval = spdk_sock_request_put(_sock, req, 0);
			if (retval) {
				return retval;
			}
		} else {
			/* Re-use the offset field to hold the sendmsg call index. The
			 * index is 0 based, so subtract one here because we've already
			 * incremented above. */
			req->internal.offset = sock->sendmsg_idx - 1;
		}

		if (rc == 0) {
			break;
@@ -794,6 +846,89 @@ sock_complete_reqs(struct spdk_sock *_sock, ssize_t rc)
	return 0;
}

#ifdef SPDK_ZEROCOPY
static int
_sock_check_zcopy(struct spdk_sock *_sock, int status)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	ssize_t rc;
	struct sock_extended_err *serr;
	struct cmsghdr *cm;
	uint32_t idx;
	struct spdk_sock_request *req, *treq;
	bool found;

	assert(sock->zcopy == true);
	if (spdk_unlikely(status < 0)) {
		if (!TAILQ_EMPTY(&_sock->pending_reqs)) {
			SPDK_ERRLOG("Attempting to receive from ERRQUEUE yielded error, but pending list still has orphaned entries, status = %d\n",
				    status);
		} else {
			SPDK_WARNLOG("Recvmsg yielded an error!\n");
		}
		return 0;
	}

	cm = CMSG_FIRSTHDR(&sock->recv_task.msg);
	if (cm->cmsg_level != SOL_IP || cm->cmsg_type != IP_RECVERR) {
		SPDK_WARNLOG("Unexpected cmsg level or type!\n");
		return 0;
	}

	serr = (struct sock_extended_err *)CMSG_DATA(cm);
	if (serr->ee_errno != 0 || serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) {
		SPDK_WARNLOG("Unexpected extended error origin\n");
		return 0;
	}

	/* Most of the time, the pending_reqs array is in the exact
	 * order we need such that all of the requests to complete are
	 * in order, in the front. It is guaranteed that all requests
	 * belonging to the same sendmsg call are sequential, so once
	 * we encounter one match we can stop looping as soon as a
	 * non-match is found.
	 */
	for (idx = serr->ee_info; idx <= serr->ee_data; idx++) {
		found = false;
		TAILQ_FOREACH_SAFE(req, &_sock->pending_reqs, internal.link, treq) {
			if (req->internal.offset == idx) {
				found = true;
				rc = spdk_sock_request_put(_sock, req, 0);
				if (rc < 0) {
					return rc;
				}

			} else if (found) {
				break;
			}
		}
	}

	return 0;
}

static void
_sock_prep_recv(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_task *task = &sock->recv_task;
	struct io_uring_sqe *sqe;

	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) {
		return;
	}

	assert(sock->group != NULL);
	sock->group->io_queued++;

	sqe = io_uring_get_sqe(&sock->group->uring);
	io_uring_prep_recvmsg(sqe, sock->fd, &task->msg, MSG_ERRQUEUE);
	io_uring_sqe_set_data(sqe, task);
	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
}

#endif

static void
_sock_flush(struct spdk_sock *_sock)
{
@@ -801,6 +936,7 @@ _sock_flush(struct spdk_sock *_sock)
	struct spdk_uring_task *task = &sock->write_task;
	uint32_t iovcnt;
	struct io_uring_sqe *sqe;
	int flags = MSG_DONTWAIT | sock->zcopy_send_flags;

	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) {
		return;
@@ -819,7 +955,7 @@ _sock_flush(struct spdk_sock *_sock)
	sock->group->io_queued++;

	sqe = io_uring_get_sqe(&sock->group->uring);
	io_uring_prep_sendmsg(sqe, sock->fd, &sock->write_task.msg, 0);
	io_uring_prep_sendmsg(sqe, sock->fd, &sock->write_task.msg, flags);
	io_uring_sqe_set_data(sqe, task);
	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
}
@@ -832,7 +968,7 @@ _sock_prep_pollin(struct spdk_sock *_sock)
	struct io_uring_sqe *sqe;

	/* Do not prepare pollin event */
	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS || sock->pending_recv) {
	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS || (sock->pending_recv && !sock->zcopy)) {
		return;
	}

@@ -840,7 +976,7 @@ _sock_prep_pollin(struct spdk_sock *_sock)
	sock->group->io_queued++;

	sqe = io_uring_get_sqe(&sock->group->uring);
	io_uring_prep_poll_add(sqe, sock->fd, POLLIN);
	io_uring_prep_poll_add(sqe, sock->fd, POLLIN | POLLERR);
	io_uring_sqe_set_data(sqe, task);
	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
}
@@ -899,13 +1035,16 @@ sock_uring_group_reap(struct spdk_uring_sock_group_impl *group, int max, int max
		task->status = SPDK_URING_SOCK_TASK_NOT_IN_USE;

		if (spdk_unlikely(status <= 0)) {
			if (status == -EAGAIN || status == -EWOULDBLOCK) {
			if (status == -EAGAIN || status == -EWOULDBLOCK || (status == -ENOBUFS && sock->zcopy)) {
				continue;
			}
		}

		switch (task->type) {
		case SPDK_SOCK_TASK_POLLIN:
			if ((status & POLLERR) == POLLERR) {
				_sock_prep_recv(&sock->base);
			}
			if ((status & POLLIN) == POLLIN) {
				if (sock->base.cb_fn != NULL &&
				    sock->pending_recv == false) {
@@ -915,7 +1054,6 @@ sock_uring_group_reap(struct spdk_uring_sock_group_impl *group, int max, int max
			}
			break;
		case SPDK_SOCK_TASK_WRITE:
			assert(TAILQ_EMPTY(&sock->base.pending_reqs));
			task->last_req = NULL;
			task->iov_cnt = 0;
			if (spdk_unlikely(status < 0)) {
@@ -926,6 +1064,15 @@ sock_uring_group_reap(struct spdk_uring_sock_group_impl *group, int max, int max
			}

			break;
#ifdef SPDK_ZEROCOPY
		case SPDK_SOCK_TASK_RECV:
			if (spdk_unlikely(status == -ECANCELED)) {
				sock->connection_status = status;
				break;
			}
			_sock_check_zcopy(&sock->base, status);
			break;
#endif
		case SPDK_SOCK_TASK_CANCEL:
			/* Do nothing */
			break;
@@ -1002,6 +1149,7 @@ _sock_flush_client(struct spdk_sock *_sock)
	struct iovec iovs[IOV_BATCH_SIZE];
	int iovcnt;
	ssize_t rc;
	int flags = sock->zcopy_send_flags;

	/* Can't flush from within a callback or we end up with recursive calls */
	if (_sock->cb_cnt > 0) {
@@ -1017,7 +1165,7 @@ _sock_flush_client(struct spdk_sock *_sock)
	/* Perform the vectored write */
	msg.msg_iov = iovs;
	msg.msg_iovlen = iovcnt;
	rc = sendmsg(sock->fd, &msg, 0);
	rc = sendmsg(sock->fd, &msg, flags);
	if (rc <= 0) {
		if (errno == EAGAIN || errno == EWOULDBLOCK) {
			return 0;
@@ -1027,6 +1175,12 @@ _sock_flush_client(struct spdk_sock *_sock)

	sock_complete_reqs(_sock, rc);

#ifdef SPDK_ZEROCOPY
	if (sock->zcopy && !TAILQ_EMPTY(&_sock->pending_reqs)) {
		_sock_check_zcopy(_sock, 0);
	}
#endif

	return 0;
}

@@ -1192,6 +1346,11 @@ uring_sock_group_impl_add_sock(struct spdk_sock_group_impl *_group,
	sock->pollin_task.sock = sock;
	sock->pollin_task.type = SPDK_SOCK_TASK_POLLIN;

	sock->recv_task.sock = sock;
	sock->recv_task.type = SPDK_SOCK_TASK_RECV;
	sock->recv_task.msg.msg_control = sock->buf;
	sock->recv_task.msg.msg_controllen = sizeof(sock->buf);

	sock->cancel_task.sock = sock;
	sock->cancel_task.type = SPDK_SOCK_TASK_CANCEL;

@@ -1276,6 +1435,16 @@ uring_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group,
		}
	}

	if (sock->recv_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
		_sock_prep_cancel_task(_sock, &sock->recv_task);
		/* Since spdk_sock_group_remove_sock is not an asynchronous interface,
		 * we can use a while loop here. */
		while ((sock->recv_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) ||
		       (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) {
			uring_sock_group_impl_poll(_group, 32, NULL);
		}
	}

	if (sock->pollin_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
		_sock_prep_cancel_task(_sock, &sock->pollin_task);
		/* Since spdk_sock_group_remove_sock is not an asynchronous interface,
@@ -1285,7 +1454,6 @@ uring_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group,
			uring_sock_group_impl_poll(_group, 32, NULL);
		}
	}

	if (sock->pending_recv) {
		TAILQ_REMOVE(&group->pending_recv, sock, link);
		sock->pending_recv = false;
@@ -1344,6 +1512,8 @@ uring_sock_impl_get_opts(struct spdk_sock_impl_opts *opts, size_t *len)
	GET_FIELD(enable_recv_pipe);
	GET_FIELD(enable_quickack);
	GET_FIELD(enable_placement_id);
	GET_FIELD(enable_zerocopy_send_server);
	GET_FIELD(enable_zerocopy_send_client);

#undef GET_FIELD
#undef FIELD_OK
@@ -1373,6 +1543,8 @@ uring_sock_impl_set_opts(const struct spdk_sock_impl_opts *opts, size_t len)
	SET_FIELD(enable_recv_pipe);
	SET_FIELD(enable_quickack);
	SET_FIELD(enable_placement_id);
	SET_FIELD(enable_zerocopy_send_server);
	SET_FIELD(enable_zerocopy_send_client);

#undef SET_FIELD
#undef FIELD_OK