Commit 257bd5f6 authored by Ben Walker's avatar Ben Walker Committed by Konrad Sztyber
Browse files

sock/uring: Implement recv next



This is a moderately efficient implementation that submits asynchronous
reads using pooled memory in the kernel.

Future implementations can use fixed buffers and multi-shot recv, plus
true kernel zero copy support, to improve.

Change-Id: Ife3ab3c3d7236231ba5329b0d18c6ba0e44a08f8
Signed-off-by: default avatarBen Walker <benjamin.walker@intel.com>
Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/16108


Tested-by: default avatarSPDK CI Jenkins <sys_sgci@intel.com>
Community-CI: Mellanox Build Bot
Reviewed-by: default avatarKonrad Sztyber <konrad.sztyber@intel.com>
Reviewed-by: default avatarJim Harris <james.r.harris@intel.com>
parent 7a50a6bc
Loading
Loading
Loading
Loading
+274 −10
Original line number Diff line number Diff line
@@ -72,6 +72,8 @@ struct spdk_uring_sock {
	int					fd;
	uint32_t				sendmsg_idx;
	struct spdk_uring_sock_group_impl	*group;
	STAILQ_HEAD(, spdk_uring_buf_tracker)	recv_stream;
	size_t					recv_offset;
	struct spdk_uring_task			write_task;
	struct spdk_uring_task			errqueue_task;
	struct spdk_uring_task			read_task;
@@ -91,6 +93,15 @@ struct spdk_uring_sock {

TAILQ_HEAD(pending_recv_list, spdk_uring_sock);

struct spdk_uring_buf_tracker {
	void					*buf;
	size_t					buflen;
	size_t					len;
	void					*ctx;
	int					id;
	STAILQ_ENTRY(spdk_uring_buf_tracker)	link;
};

struct spdk_uring_sock_group_impl {
	struct spdk_sock_group_impl		base;
	struct io_uring				uring;
@@ -98,6 +109,11 @@ struct spdk_uring_sock_group_impl {
	uint32_t				io_queued;
	uint32_t				io_avail;
	struct pending_recv_list		pending_recv;

	struct io_uring_buf_ring		*buf_ring;
	uint32_t				buf_ring_count;
	struct spdk_uring_buf_tracker		*trackers;
	STAILQ_HEAD(, spdk_uring_buf_tracker)	free_trackers;
};

static struct spdk_sock_impl_opts g_spdk_uring_sock_impl_opts = {
@@ -428,6 +444,8 @@ uring_sock_alloc(int fd, struct spdk_sock_impl_opts *impl_opts, bool enable_zero
	sock->fd = fd;
	memcpy(&sock->base.impl_opts, impl_opts, sizeof(*impl_opts));

	STAILQ_INIT(&sock->recv_stream);

#if defined(__linux__)
	flag = 1;

@@ -814,16 +832,127 @@ static int
uring_sock_recv_next(struct spdk_sock *_sock, void **_buf, void **ctx)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_sock_group_impl *group;
	struct spdk_uring_buf_tracker *tr;

	if (sock->connection_status < 0) {
		errno = -sock->connection_status;
		return -1;
	}

	if (sock->recv_pipe != NULL) {
		errno = ENOTSUP;
		return -1;
	}

	group = __uring_group_impl(_sock->group_impl);

	tr = STAILQ_FIRST(&sock->recv_stream);
	if (tr == NULL) {
		if (sock->group->buf_ring_count > 0) {
			/* There are buffers posted, but data hasn't arrived. */
			errno = EAGAIN;
		} else {
			/* There are no buffers posted, so this won't ever
			 * make forward progress. */
			errno = ENOBUFS;
		}
		return -1;
	}
	assert(sock->pending_recv == true);
	assert(tr->buf != NULL);

	*_buf = tr->buf + sock->recv_offset;
	*ctx = tr->ctx;

	STAILQ_REMOVE_HEAD(&sock->recv_stream, link);
	STAILQ_INSERT_HEAD(&group->free_trackers, tr, link);

	if (STAILQ_EMPTY(&sock->recv_stream)) {
		sock->pending_recv = false;
		TAILQ_REMOVE(&group->pending_recv, sock, link);
	}

	return tr->len - sock->recv_offset;
}

static ssize_t
uring_sock_readv_no_pipe(struct spdk_sock *_sock, struct iovec *iovs, int iovcnt)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_buf_tracker *tr;
	struct iovec iov;
	ssize_t total, len;
	int i;

	if (sock->connection_status < 0) {
		errno = -sock->connection_status;
		return -1;
	}

	errno = -ENOTSUP;
	if (_sock->group_impl == NULL) {
		/* If not in a group just read from the socket the regular way. */
		return sock_readv(sock->fd, iovs, iovcnt);
	}

	if (STAILQ_EMPTY(&sock->recv_stream)) {
		if (sock->group->buf_ring_count == 0) {
			/* If the user hasn't posted any buffers, read from the socket
			 * directly. */

			if (sock->pending_recv) {
				sock->pending_recv = false;
				TAILQ_REMOVE(&(__uring_group_impl(_sock->group_impl))->pending_recv, sock, link);
			}

			return sock_readv(sock->fd, iovs, iovcnt);
		}

		errno = EAGAIN;
		return -1;
	}

	total = 0;
	for (i = 0; i < iovcnt; i++) {
		/* Copy to stack so we can change it */
		iov = iovs[i];

		tr = STAILQ_FIRST(&sock->recv_stream);
		while (tr != NULL) {
			len = spdk_min(iov.iov_len, tr->len - sock->recv_offset);
			memcpy(iov.iov_base, tr->buf + sock->recv_offset, len);

			total += len;
			sock->recv_offset += len;
			iov.iov_base += len;
			iov.iov_len -= len;

			if (sock->recv_offset == tr->len) {
				sock->recv_offset = 0;
				STAILQ_REMOVE_HEAD(&sock->recv_stream, link);
				STAILQ_INSERT_HEAD(&sock->group->free_trackers, tr, link);
				spdk_sock_group_provide_buf(sock->group->base.group, tr->buf, tr->buflen, tr->ctx);
				tr = STAILQ_FIRST(&sock->recv_stream);
			}

			if (iov.iov_len == 0) {
				break;
			}
		}
	}

	if (STAILQ_EMPTY(&sock->recv_stream)) {
		struct spdk_uring_sock_group_impl *group;

		group = __uring_group_impl(_sock->group_impl);
		sock->pending_recv = false;
		TAILQ_REMOVE(&group->pending_recv, sock, link);
	}

	assert(total > 0);
	return total;
}

static ssize_t
uring_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
{
@@ -837,7 +966,7 @@ uring_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
	}

	if (sock->recv_pipe == NULL) {
		return sock_readv(sock->fd, iov, iovcnt);
		return uring_sock_readv_no_pipe(_sock, iov, iovcnt);
	}

	len = 0;
@@ -1131,9 +1260,6 @@ _sock_prep_read(struct spdk_sock *_sock)

	sqe = io_uring_get_sqe(&sock->group->uring);
	io_uring_prep_recv(sqe, sock->fd, NULL, URING_MAX_RECV_SIZE, 0);
	/* Because we're using buffer select but have no logic to actually
	 * post buffers, the recv will complete with ENOBUFS whenever there
	 * is data on the socket, turning it into a POLLIN operation. */
	sqe->buf_group = URING_BUF_GROUP_ID;
	sqe->flags |= IOSQE_BUFFER_SELECT;
	io_uring_sqe_set_data(sqe, task);
@@ -1168,7 +1294,7 @@ sock_uring_group_reap(struct spdk_uring_sock_group_impl *group, int max, int max
	struct io_uring_cqe *cqe;
	struct spdk_uring_sock *sock, *tmp;
	struct spdk_uring_task *task;
	int status;
	int status, bid, flags;
	bool is_zcopy;

	for (i = 0; i < max; i++) {
@@ -1190,6 +1316,7 @@ sock_uring_group_reap(struct spdk_uring_sock_group_impl *group, int max, int max
		sock->group->io_inflight--;
		sock->group->io_avail++;
		status = cqe->res;
		flags = cqe->flags;
		io_uring_cqe_seen(&group->uring, cqe);

		task->status = SPDK_URING_SOCK_TASK_NOT_IN_USE;
@@ -1203,7 +1330,8 @@ sock_uring_group_reap(struct spdk_uring_sock_group_impl *group, int max, int max
			} else if (status == -ECANCELED) {
				continue;
			} else if (status == -ENOBUFS) {
				/* There's data in the socket. */
				/* There's data in the socket but the user hasn't provided any buffers.
				 * We need to notify the user that the socket has data pending. */
				if (sock->base.cb_fn != NULL &&
				    sock->pending_recv == false) {
					sock->pending_recv = true;
@@ -1222,8 +1350,28 @@ sock_uring_group_reap(struct spdk_uring_sock_group_impl *group, int max, int max
					TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
				}
			} else {
				/* This shouldn't happen. We've never provided any buffers. */
				assert(false);
				struct spdk_uring_buf_tracker *tracker;

				assert((flags & IORING_CQE_F_BUFFER) != 0);

				bid = flags >> IORING_CQE_BUFFER_SHIFT;
				tracker = &group->trackers[bid];

				assert(tracker->buf != NULL);
				assert(tracker->len != 0);

				/* Append this data to the stream */
				tracker->len = status;
				STAILQ_INSERT_TAIL(&sock->recv_stream, tracker, link);
				assert(group->buf_ring_count > 0);
				group->buf_ring_count--;

				if (sock->base.cb_fn != NULL &&
				    sock->pending_recv == false) {
					sock->pending_recv = true;
					TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
				}

				_sock_prep_read(&sock->base);
			}
			break;
@@ -1480,6 +1628,68 @@ uring_sock_group_impl_get_optimal(struct spdk_sock *_sock, struct spdk_sock_grou
	return NULL;
}

static int
uring_sock_group_impl_buf_pool_free(struct spdk_uring_sock_group_impl *group_impl)
{
	if (group_impl->buf_ring) {
		io_uring_unregister_buf_ring(&group_impl->uring, URING_BUF_GROUP_ID);
		free(group_impl->buf_ring);
	}

	free(group_impl->trackers);

	return 0;
}

static int
uring_sock_group_impl_buf_pool_alloc(struct spdk_uring_sock_group_impl *group_impl)
{
	struct io_uring_buf_reg buf_reg = {};
	struct io_uring_buf_ring *buf_ring;
	int i, rc;

	rc = posix_memalign((void **)&buf_ring, 0x1000, URING_BUF_POOL_SIZE * sizeof(struct io_uring_buf));
	if (rc != 0) {
		/* posix_memalign returns positive errno values */
		return -rc;
	}

	buf_reg.ring_addr = (unsigned long long)buf_ring;
	buf_reg.ring_entries = URING_BUF_POOL_SIZE;
	buf_reg.bgid = URING_BUF_GROUP_ID;

	rc = io_uring_register_buf_ring(&group_impl->uring, &buf_reg, 0);
	if (rc != 0) {
		free(buf_ring);
		return rc;
	}

	group_impl->buf_ring = buf_ring;
	io_uring_buf_ring_init(group_impl->buf_ring);
	group_impl->buf_ring_count = 0;

	group_impl->trackers = calloc(URING_BUF_POOL_SIZE, sizeof(struct spdk_uring_buf_tracker));
	if (group_impl->trackers == NULL) {
		uring_sock_group_impl_buf_pool_free(group_impl);
		return -ENOMEM;
	}

	STAILQ_INIT(&group_impl->free_trackers);

	for (i = 0; i < URING_BUF_POOL_SIZE; i++) {
		struct spdk_uring_buf_tracker *tracker = &group_impl->trackers[i];

		tracker->buf = NULL;
		tracker->len = 0;
		tracker->ctx = NULL;
		tracker->id = i;

		STAILQ_INSERT_TAIL(&group_impl->free_trackers, tracker, link);
	}

	return 0;
}

static struct spdk_sock_group_impl *
uring_sock_group_impl_create(void)
{
@@ -1501,6 +1711,14 @@ uring_sock_group_impl_create(void)

	TAILQ_INIT(&group_impl->pending_recv);

	if (uring_sock_group_impl_buf_pool_alloc(group_impl) < 0) {
		SPDK_ERRLOG("Failed to create buffer ring. Your kernel is likely not new enough. "
			    "Please switch to the POSIX sock implementation instead.\n");
		io_uring_queue_exit(&group_impl->uring);
		free(group_impl);
		return NULL;
	}

	if (g_spdk_uring_sock_impl_opts.enable_placement_id == PLACEMENT_CPU) {
		spdk_sock_map_insert(&g_map, spdk_env_get_current_core(), &group_impl->base);
	}
@@ -1558,6 +1776,42 @@ uring_sock_group_impl_add_sock(struct spdk_sock_group_impl *_group,
	return 0;
}

static void
uring_sock_group_populate_buf_ring(struct spdk_uring_sock_group_impl *group)
{
	struct spdk_uring_buf_tracker *tracker;
	int count, mask;

	if (g_spdk_uring_sock_impl_opts.enable_recv_pipe) {
		/* If recv_pipe is enabled, we do not post buffers. */
		return;
	}

	/* Try to re-populate the io_uring's buffer pool using user-provided buffers */
	tracker = STAILQ_FIRST(&group->free_trackers);
	count = 0;
	mask = io_uring_buf_ring_mask(URING_BUF_POOL_SIZE);
	while (tracker != NULL) {
		tracker->buflen = spdk_sock_group_get_buf(group->base.group, &tracker->buf, &tracker->ctx);
		if (tracker->buflen == 0) {
			break;
		}

		assert(tracker->buf != NULL);
		STAILQ_REMOVE_HEAD(&group->free_trackers, link);
		assert(STAILQ_FIRST(&group->free_trackers) != tracker);

		io_uring_buf_ring_add(group->buf_ring, tracker->buf, tracker->buflen, tracker->id, mask, count);
		count++;
		tracker = STAILQ_FIRST(&group->free_trackers);
	}

	if (count > 0) {
		group->buf_ring_count += count;
		io_uring_buf_ring_advance(group->buf_ring, count);
	}
}

static int
uring_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events,
			   struct spdk_sock **socks)
@@ -1578,6 +1832,9 @@ uring_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events,
		}
	}

	/* Try to re-populate the io_uring's buffer pool using user-provided buffers */
	uring_sock_group_populate_buf_ring(group);

	to_submit = group->io_queued;

	/* For network I/O, it cannot be set with O_DIRECT, so we do not need to call spdk_io_uring_enter */
@@ -1652,6 +1909,11 @@ uring_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group,
	}
	assert(sock->pending_recv == false);

	/* We have no way to handle this case. We could let the user read this
	 * buffer, but the buffer came from a group and we have lost the association
	 * to that so we couldn't release it. */
	assert(STAILQ_EMPTY(&sock->recv_stream));

	if (sock->placement_id != -1) {
		spdk_sock_map_release(&g_map, sock->placement_id);
	}
@@ -1673,6 +1935,8 @@ uring_sock_group_impl_close(struct spdk_sock_group_impl *_group)
	assert(group->io_inflight == 0);
	assert(group->io_avail == SPDK_SOCK_GROUP_QUEUE_DEPTH);

	uring_sock_group_impl_buf_pool_free(group);

	io_uring_queue_exit(&group->uring);

	if (g_spdk_uring_sock_impl_opts.enable_placement_id == PLACEMENT_CPU) {
+4 −0
Original line number Diff line number Diff line
@@ -29,6 +29,10 @@ DEFINE_STUB(__io_uring_get_cqe, int, (struct io_uring *ring, struct io_uring_cqe
DEFINE_STUB(io_uring_submit, int, (struct io_uring *ring), 0);
DEFINE_STUB(io_uring_queue_init, int, (unsigned entries, struct io_uring *ring, unsigned flags), 0);
DEFINE_STUB_V(io_uring_queue_exit, (struct io_uring *ring));
DEFINE_STUB(spdk_sock_group_provide_buf, int, (struct spdk_sock_group *group, void *buf,
		size_t len, void *ctx), 0);
DEFINE_STUB(spdk_sock_group_get_buf, size_t, (struct spdk_sock_group *group, void **buf,
		void **ctx), 0);

static void
_req_cb(void *cb_arg, int len)