Commit 784b3b89 authored by Ben Walker's avatar Ben Walker Committed by Jim Harris
Browse files

sock: Add a zero copy receive interface



This is not implemented by any sock module just yet.

The basic concept is that the user provides buffers to a socket group.
These get filled in with the next portions of the stream on each socket.
The user then calls spdk_sock_recv_next() to get the pointer to the
buffer holding the next part of the stream. When the user is done, they
can put the buffer back to the group to be used again.

The provided buffers are held in a pool on the socket group.
Implementations can request a buffer from the pool or set up
a notification any time a buffer is returned.

Change-Id: I236fca43c6e3e11aafeb58caba0e4798654124be
Signed-off-by: default avatarBen Walker <benjamin.walker@intel.com>
Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/16105


Tested-by: default avatarSPDK CI Jenkins <sys_sgci@intel.com>
Reviewed-by: default avatarKonrad Sztyber <konrad.sztyber@intel.com>
Community-CI: Mellanox Build Bot
Reviewed-by: default avatarJim Harris <james.r.harris@intel.com>
parent 95a367d6
Loading
Loading
Loading
Loading
+38 −0
Original line number Diff line number Diff line
@@ -365,6 +365,31 @@ void spdk_sock_writev_async(struct spdk_sock *sock, struct spdk_sock_request *re
 */
ssize_t spdk_sock_readv(struct spdk_sock *sock, struct iovec *iov, int iovcnt);

/**
 * Receive the next portion of the stream from the socket.
 *
 * A buffer provided to this socket's group's pool using
 * spdk_sock_group_provide_buf() will contain the data and be
 * returned in *buf.
 *
 * Note that the amount of data in buf is determined entirely by
 * the sock layer. You cannot request to receive only a limited
 * amount here. You simply get whatever the next portion of the stream
 * is, as determined by the sock module. You can place an upper limit
 * on the size of the buffer since these buffers are originally
 * provided to the group through spdk_sock_group_provide_buf().
 *
 * This code path will only work if the recvbuf is disabled. To disable
 * the recvbuf, call spdk_sock_set_recvbuf with a size of 0.
 *
 * \param sock Socket to receive from.
 * \param buf Populated with the next portion of the stream
 * \param ctx Returned context pointer from when the buffer was provided.
 *
 * \return On success, the length of the buffer placed into buf, On failure, -1 with errno set.
 */
int spdk_sock_recv_next(struct spdk_sock *sock, void **buf, void **ctx);

/**
 * Read message from the given socket asynchronously, calling the provided callback when the whole
 * buffer is filled or an error is encountered.  Only a single read request can be active at a time
@@ -481,6 +506,19 @@ int spdk_sock_group_add_sock(struct spdk_sock_group *group, struct spdk_sock *so
 */
int spdk_sock_group_remove_sock(struct spdk_sock_group *group, struct spdk_sock *sock);

/**
 * Provides a buffer to the group to be used in its receive pool.
 * See spdk_sock_recv_next() for more details.
 *
 * \param group Socket group.
 * \param buf Pointer the buffer provided.
 * \param len Length of the buffer.
 * \param ctx Pointer that will be returned in spdk_sock_recv_next()
 *
 * \return 0 on success, -1 on failure.
 */
int spdk_sock_group_provide_buf(struct spdk_sock_group *group, void *buf, size_t len, void *ctx);

/**
 * Poll incoming events for each registered socket.
 *
+10 −0
Original line number Diff line number Diff line
@@ -48,8 +48,15 @@ struct spdk_sock {
	struct spdk_sock_impl_opts	impl_opts;
};

struct spdk_sock_group_provided_buf {
	size_t						len;
	void						*ctx;
	STAILQ_ENTRY(spdk_sock_group_provided_buf)	link;
};

struct spdk_sock_group {
	STAILQ_HEAD(, spdk_sock_group_impl)	group_impls;
	STAILQ_HEAD(, spdk_sock_group_provided_buf) pool;
	void					*ctx;
};

@@ -79,6 +86,7 @@ struct spdk_net_impl {
	ssize_t (*readv)(struct spdk_sock *sock, struct iovec *iov, int iovcnt);
	ssize_t (*writev)(struct spdk_sock *sock, struct iovec *iov, int iovcnt);

	int (*recv_next)(struct spdk_sock *sock, void **buf, void **ctx);
	void (*writev_async)(struct spdk_sock *sock, struct spdk_sock_request *req);
	void (*readv_async)(struct spdk_sock *sock, struct spdk_sock_request *req);
	int (*flush)(struct spdk_sock *sock);
@@ -114,6 +122,8 @@ static void __attribute__((constructor)) net_impl_register_##name(void) \
	spdk_net_impl_register(impl, priority); \
}

size_t spdk_sock_group_get_buf(struct spdk_sock_group *group, void **buf, void **ctx);

static inline void
spdk_sock_request_queue(struct spdk_sock *sock, struct spdk_sock_request *req)
{
+48 −0
Original line number Diff line number Diff line
@@ -512,6 +512,22 @@ spdk_sock_writev_async(struct spdk_sock *sock, struct spdk_sock_request *req)
	sock->net_impl->writev_async(sock, req);
}

int
spdk_sock_recv_next(struct spdk_sock *sock, void **buf, void **ctx)
{
	if (sock == NULL || sock->flags.closed) {
		errno = EBADF;
		return -1;
	}

	if (sock->group_impl == NULL) {
		errno = ENOTSUP;
		return -1;
	}

	return sock->net_impl->recv_next(sock, buf, ctx);
}

int
spdk_sock_flush(struct spdk_sock *sock)
{
@@ -572,6 +588,7 @@ spdk_sock_group_create(void *ctx)
	}

	STAILQ_INIT(&group->group_impls);
	STAILQ_INIT(&group->pool);

	STAILQ_FOREACH_FROM(impl, &g_net_impls, link) {
		group_impl = impl->group_impl_create();
@@ -662,6 +679,37 @@ spdk_sock_group_remove_sock(struct spdk_sock_group *group, struct spdk_sock *soc
	return rc;
}

int
spdk_sock_group_provide_buf(struct spdk_sock_group *group, void *buf, size_t len, void *ctx)
{
	struct spdk_sock_group_provided_buf *provided;

	provided = (struct spdk_sock_group_provided_buf *)buf;

	provided->len = len;
	provided->ctx = ctx;
	STAILQ_INSERT_HEAD(&group->pool, provided, link);

	return 0;
}

size_t
spdk_sock_group_get_buf(struct spdk_sock_group *group, void **buf, void **ctx)
{
	struct spdk_sock_group_provided_buf *provided;

	provided = STAILQ_FIRST(&group->pool);
	if (provided == NULL) {
		*buf = NULL;
		return 0;
	}
	STAILQ_REMOVE_HEAD(&group->pool, link);

	*buf = provided;
	*ctx = provided->ctx;
	return provided->len;
}

int
spdk_sock_group_poll(struct spdk_sock_group *group)
{
+3 −0
Original line number Diff line number Diff line
@@ -12,6 +12,7 @@
	spdk_sock_close;
	spdk_sock_flush;
	spdk_sock_recv;
	spdk_sock_recv_next;
	spdk_sock_writev;
	spdk_sock_writev_async;
	spdk_sock_readv;
@@ -26,6 +27,7 @@
	spdk_sock_group_get_ctx;
	spdk_sock_group_add_sock;
	spdk_sock_group_remove_sock;
	spdk_sock_group_provide_buf;
	spdk_sock_group_poll;
	spdk_sock_group_poll_count;
	spdk_sock_group_close;
@@ -37,6 +39,7 @@

	# internal function in spdk_internal/sock.h
	spdk_net_impl_register;
	spdk_sock_group_get_buf;
	spdk_sock_map_insert;
	spdk_sock_map_release;
	spdk_sock_map_lookup;
+9 −0
Original line number Diff line number Diff line
@@ -1538,6 +1538,13 @@ posix_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
	}
}

static int
posix_sock_recv_next(struct spdk_sock *sock, void **buf, void **ctx)
{
	errno = ENOTSUP;
	return -1;
}

static void
posix_sock_writev_async(struct spdk_sock *sock, struct spdk_sock_request *req)
{
@@ -2036,6 +2043,7 @@ static struct spdk_net_impl g_posix_net_impl = {
	.readv		= posix_sock_readv,
	.readv_async	= posix_sock_readv_async,
	.writev		= posix_sock_writev,
	.recv_next	= posix_sock_recv_next,
	.writev_async	= posix_sock_writev_async,
	.flush		= posix_sock_flush,
	.set_recvlowat	= posix_sock_set_recvlowat,
@@ -2084,6 +2092,7 @@ static struct spdk_net_impl g_ssl_net_impl = {
	.recv		= posix_sock_recv,
	.readv		= posix_sock_readv,
	.writev		= posix_sock_writev,
	.recv_next	= posix_sock_recv_next,
	.writev_async	= posix_sock_writev_async,
	.flush		= posix_sock_flush,
	.set_recvlowat	= posix_sock_set_recvlowat,
Loading