Commit b7ad9426 authored by Ben Walker's avatar Ben Walker Committed by Tomasz Zawadzki
Browse files

sock: Add an asynchronous writev



Add spdk_sock_writev_async for performing asynchronous writes to
sockets. The user of this call is responsible for allocating their own
spdk_sock_request structures to pass to this call.

spdk_sock_writev_async will not return EAGAIN and will instead leave the
requests queued until they are fully sent or aborted due to socket
error.

Change-Id: Idf3239e65d26a3024e578122c23e4fb8f95e241b
Signed-off-by: default avatarBen Walker <benjamin.walker@intel.com>
Reviewed-on: https://review.gerrithub.io/c/spdk/spdk/+/470523


Tested-by: default avatarSPDK CI Jenkins <sys_sgci@intel.com>
Community-CI: Broadcom SPDK FC-NVMe CI <spdk-ci.pdl@broadcom.com>
Community-CI: SPDK CI Jenkins <sys_sgci@intel.com>
Reviewed-by: default avatarJim Harris <james.r.harris@intel.com>
Reviewed-by: default avatarShuhei Matsumoto <shuhei.matsumoto.xt@hitachi.com>
parent 4af2b9bf
Loading
Loading
Loading
Loading
+7 −0
Original line number Diff line number Diff line
@@ -2,6 +2,13 @@

## v20.01: (Upcoming Release)

### sock

Added spdk_sock_writev_async for performing asynchronous writes to sockets. This call will
never return EAGAIN, instead queueing internally until the data has all been sent. This can
simplify many code flows that create pollers to continue attempting to flush writes
on sockets.

### isa-l

Updated ISA-L submodule to commit f3993f5c0b6911 which includes implementation and
+41 −0
Original line number Diff line number Diff line
@@ -40,6 +40,8 @@

#include "spdk/stdinc.h"

#include "spdk/queue.h"

#ifdef __cplusplus
extern "C" {
#endif
@@ -47,6 +49,36 @@ extern "C" {
struct spdk_sock;
struct spdk_sock_group;

/**
 * Anywhere this struct is used, an iovec array is assumed to
 * immediately follow the last member in memory, without any
 * padding.
 *
 * A simpler implementation would be to place a 0-length array
 * of struct iovec at the end of this request. However, embedding
 * a structure that ends with a variable length array inside of
 * another structure is a GNU C extension and not standard.
 */
struct spdk_sock_request {
	/* When the request is completed, this callback will be called.
	 * err will be 0 on success or a negated errno value on failure. */
	void	(*cb_fn)(void *cb_arg, int err);
	void				*cb_arg;

	/**
	 * These fields are used by the socket layer and should not be modified
	 */
	struct __sock_request_internal {
		TAILQ_ENTRY(spdk_sock_request)	link;
		unsigned int			offset;
	} internal;

	int				iovcnt;
	/* struct iovec			iov[]; */
};

#define SPDK_SOCK_REQUEST_IOV(req, i) ((struct iovec *)(((uint8_t *)req + sizeof(struct spdk_sock_request)) + (sizeof(struct iovec) * i)))

/**
 * Get client and server addresses of the given socket.
 *
@@ -126,6 +158,15 @@ ssize_t spdk_sock_recv(struct spdk_sock *sock, void *buf, size_t len);
 */
ssize_t spdk_sock_writev(struct spdk_sock *sock, struct iovec *iov, int iovcnt);

/**
 * Write data to the given socket asynchronously, calling
 * the provided callback when the data has been written.
 *
 * \param sock Socket to write to.
 * \param req The write request to submit.
 */
void spdk_sock_writev_async(struct spdk_sock *sock, struct spdk_sock_request *req);

/**
 * Read message from the given socket to the I/O vector array.
 *
+79 −0
Original line number Diff line number Diff line
@@ -50,10 +50,20 @@ extern "C" {

struct spdk_sock {
	struct spdk_net_impl		*net_impl;
	int				cb_cnt;
	spdk_sock_cb			cb_fn;
	void				*cb_arg;
	struct spdk_sock_group_impl	*group_impl;
	TAILQ_ENTRY(spdk_sock)		link;

	int				max_iovcnt;
	TAILQ_HEAD(, spdk_sock_request)	queued_reqs;
	int				queued_iovcnt;

	struct {
		uint8_t		closed		: 1;
		uint8_t		reserved	: 7;
	} flags;
};

struct spdk_sock_group {
@@ -80,6 +90,8 @@ struct spdk_net_impl {
	ssize_t (*readv)(struct spdk_sock *sock, struct iovec *iov, int iovcnt);
	ssize_t (*writev)(struct spdk_sock *sock, struct iovec *iov, int iovcnt);

	void (*writev_async)(struct spdk_sock *sock, struct spdk_sock_request *req);

	int (*set_recvlowat)(struct spdk_sock *sock, int nbytes);
	int (*set_recvbuf)(struct spdk_sock *sock, int sz);
	int (*set_sendbuf)(struct spdk_sock *sock, int sz);
@@ -108,6 +120,73 @@ static void __attribute__((constructor)) net_impl_register_##name(void) \
	spdk_net_impl_register(impl); \
}

static inline void
spdk_sock_request_queue(struct spdk_sock *sock, struct spdk_sock_request *req)
{
	TAILQ_INSERT_TAIL(&sock->queued_reqs, req, internal.link);
	sock->queued_iovcnt += req->iovcnt;
}

static inline int
spdk_sock_request_put(struct spdk_sock *sock, struct spdk_sock_request *req, int err)
{
	bool closed;
	int rc = 0;

	TAILQ_REMOVE(&sock->queued_reqs, req, internal.link);
	assert(sock->queued_iovcnt >= req->iovcnt);
	sock->queued_iovcnt -= req->iovcnt;

	closed = sock->flags.closed;
	sock->cb_cnt++;
	req->cb_fn(req->cb_arg, err);
	assert(sock->cb_cnt > 0);
	sock->cb_cnt--;

	if (sock->cb_cnt == 0 && !closed && sock->flags.closed) {
		/* The user closed the socket in response to a callback above. */
		rc = -1;
		spdk_sock_close(&sock);
	}

	return rc;
}

static inline int
spdk_sock_abort_requests(struct spdk_sock *sock)
{
	struct spdk_sock_request *req;
	bool closed;
	int rc = 0;

	closed = sock->flags.closed;
	sock->cb_cnt++;

	req = TAILQ_FIRST(&sock->queued_reqs);
	while (req) {
		TAILQ_REMOVE(&sock->queued_reqs, req, internal.link);

		assert(sock->queued_iovcnt >= req->iovcnt);
		sock->queued_iovcnt -= req->iovcnt;

		req->cb_fn(req->cb_arg, -ECANCELED);

		req = TAILQ_FIRST(&sock->queued_reqs);
	}
	assert(sock->cb_cnt > 0);
	sock->cb_cnt--;

	assert(TAILQ_EMPTY(&sock->queued_reqs));

	if (sock->cb_cnt == 0 && !closed && sock->flags.closed) {
		/* The user closed the socket in response to a callback above. */
		rc = -1;
		spdk_sock_close(&sock);
	}

	return rc;
}

#ifdef __cplusplus
}
#endif
+46 −0
Original line number Diff line number Diff line
@@ -178,6 +178,7 @@ spdk_sock_connect(const char *ip, int port)
		sock = impl->connect(ip, port);
		if (sock != NULL) {
			sock->net_impl = impl;
			TAILQ_INIT(&sock->queued_reqs);
			return sock;
		}
	}
@@ -195,6 +196,8 @@ spdk_sock_listen(const char *ip, int port)
		sock = impl->listen(ip, port);
		if (sock != NULL) {
			sock->net_impl = impl;
			/* Don't need to initialize the request queues for listen
			 * sockets. */
			return sock;
		}
	}
@@ -210,6 +213,7 @@ spdk_sock_accept(struct spdk_sock *sock)
	new_sock = sock->net_impl->accept(sock);
	if (new_sock != NULL) {
		new_sock->net_impl = sock->net_impl;
		TAILQ_INIT(&new_sock->queued_reqs);
	}

	return new_sock;
@@ -232,6 +236,15 @@ spdk_sock_close(struct spdk_sock **_sock)
		return -1;
	}

	sock->flags.closed = true;

	if (sock->cb_cnt > 0) {
		/* Let the callback unwind before destroying the socket */
		return 0;
	}

	spdk_sock_abort_requests(sock);

	rc = sock->net_impl->close(sock);
	if (rc == 0) {
		*_sock = NULL;
@@ -248,6 +261,11 @@ spdk_sock_recv(struct spdk_sock *sock, void *buf, size_t len)
		return -1;
	}

	if (sock->flags.closed) {
		errno = EBADF;
		return -1;
	}

	return sock->net_impl->recv(sock, buf, len);
}

@@ -259,6 +277,11 @@ spdk_sock_readv(struct spdk_sock *sock, struct iovec *iov, int iovcnt)
		return -1;
	}

	if (sock->flags.closed) {
		errno = EBADF;
		return -1;
	}

	return sock->net_impl->readv(sock, iov, iovcnt);
}

@@ -270,9 +293,32 @@ spdk_sock_writev(struct spdk_sock *sock, struct iovec *iov, int iovcnt)
		return -1;
	}

	if (sock->flags.closed) {
		errno = EBADF;
		return -1;
	}

	return sock->net_impl->writev(sock, iov, iovcnt);
}

void
spdk_sock_writev_async(struct spdk_sock *sock, struct spdk_sock_request *req)
{
	assert(req->cb_fn != NULL);

	if (sock == NULL) {
		req->cb_fn(req->cb_arg, -EBADF);
		return;
	}

	if (sock->flags.closed) {
		req->cb_fn(req->cb_arg, -EBADF);
		return;
	}

	sock->net_impl->writev_async(sock, req);
}

int
spdk_sock_set_recvlowat(struct spdk_sock *sock, int nbytes)
{
+159 −4
Original line number Diff line number Diff line
@@ -47,6 +47,7 @@
#define PORTNUMLEN 32
#define SO_RCVBUF_SIZE (2 * 1024 * 1024)
#define SO_SNDBUF_SIZE (2 * 1024 * 1024)
#define IOV_BATCH_SIZE 64

struct spdk_posix_sock {
	struct spdk_sock	base;
@@ -433,6 +434,108 @@ spdk_posix_sock_close(struct spdk_sock *_sock)
	return 0;
}

static int
_sock_flush(struct spdk_sock *sock)
{
	struct spdk_posix_sock *psock = __posix_sock(sock);
	struct iovec iovs[IOV_BATCH_SIZE];
	int iovcnt;
	int retval;
	struct spdk_sock_request *req;
	int i;
	ssize_t rc;
	unsigned int offset;
	size_t len;

	/* Can't flush from within a callback or we end up with recursive calls */
	if (sock->cb_cnt > 0) {
		return 0;
	}

	/* Gather an iov */
	iovcnt = 0;
	req = TAILQ_FIRST(&sock->queued_reqs);
	while (req) {
		offset = req->internal.offset;

		for (i = 0; i < req->iovcnt; i++) {
			/* Consume any offset first */
			if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) {
				offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
				continue;
			}

			iovs[iovcnt].iov_base = SPDK_SOCK_REQUEST_IOV(req, i)->iov_base + offset;
			iovs[iovcnt].iov_len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset;
			iovcnt++;

			offset = 0;

			if (iovcnt >= IOV_BATCH_SIZE) {
				break;
			}
		}

		if (iovcnt >= IOV_BATCH_SIZE) {
			break;
		}

		req = TAILQ_NEXT(req, internal.link);
	}

	if (iovcnt == 0) {
		return 0;
	}

	/* Perform the vectored write */
	rc = writev(psock->fd, iovs, iovcnt);
	if (rc <= 0) {
		if (errno == EAGAIN || errno == EWOULDBLOCK) {
			return 0;
		}
		return rc;
	}

	/* Consume the requests that were actually written */
	req = TAILQ_FIRST(&sock->queued_reqs);
	while (req) {
		offset = req->internal.offset;

		for (i = 0; i < req->iovcnt; i++) {
			/* Advance by the offset first */
			if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) {
				offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
				continue;
			}

			/* Calculate the remaining length of this element */
			len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset;

			if (len > (size_t)rc) {
				/* This element was partially sent. */
				req->internal.offset += rc;
				return 0;
			}

			offset = 0;
			req->internal.offset += len;
			rc -= len;
		}

		/* Handled a full request. */
		req->internal.offset = 0;
		retval = spdk_sock_request_put(sock, req, 0);

		if (rc == 0 || retval) {
			break;
		}

		req = TAILQ_FIRST(&sock->queued_reqs);
	}

	return 0;
}

static ssize_t
spdk_posix_sock_recv(struct spdk_sock *_sock, void *buf, size_t len)
{
@@ -453,10 +556,45 @@ static ssize_t
spdk_posix_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
{
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	int rc;

	/* In order to process a writev, we need to flush any asynchronous writes
	 * first. */
	rc = _sock_flush(_sock);
	if (rc < 0) {
		return rc;
	}

	if (!TAILQ_EMPTY(&_sock->queued_reqs)) {
		/* We weren't able to flush all requests */
		errno = EAGAIN;
		return -1;
	}

	return writev(sock->fd, iov, iovcnt);
}

static void
spdk_posix_sock_writev_async(struct spdk_sock *sock, struct spdk_sock_request *req)
{
	int rc;

	spdk_sock_request_queue(sock, req);

	if (sock->group_impl == NULL) {
		spdk_sock_request_put(sock, req, -ENOTSUP);
		return;
	}

	/* If there are a sufficient number queued, just flush them out immediately. */
	if (sock->queued_iovcnt >= IOV_BATCH_SIZE) {
		rc = _sock_flush(sock);
		if (rc) {
			spdk_sock_abort_requests(sock);
		}
	}
}

static int
spdk_posix_sock_set_recvlowat(struct spdk_sock *_sock, int nbytes)
{
@@ -632,6 +770,7 @@ spdk_posix_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group, stru
	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	int rc;

#if defined(__linux__)
	struct epoll_event event;

@@ -649,6 +788,9 @@ spdk_posix_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group, stru
		errno = event.data;
	}
#endif

	spdk_sock_abort_requests(_sock);

	return rc;
}

@@ -657,16 +799,28 @@ spdk_posix_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_eve
				struct spdk_sock **socks)
{
	struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
	int num_events, i;

	struct spdk_sock *sock, *tmp;
	int num_events, i, rc;
#if defined(__linux__)
	struct epoll_event events[MAX_EVENTS_PER_POLL];

	num_events = epoll_wait(group->fd, events, max_events, 0);
#elif defined(__FreeBSD__)
	struct kevent events[MAX_EVENTS_PER_POLL];
	struct timespec ts = {0};
#endif

	/* This must be a TAILQ_FOREACH_SAFE because while flushing,
	 * a completion callback could remove the sock from the
	 * group. */
	TAILQ_FOREACH_SAFE(sock, &_group->socks, link, tmp) {
		rc = _sock_flush(sock);
		if (rc) {
			spdk_sock_abort_requests(sock);
		}
	}

#if defined(__linux__)
	num_events = epoll_wait(group->fd, events, max_events, 0);
#elif defined(__FreeBSD__)
	num_events = kevent(group->fd, NULL, 0, events, max_events, &ts);
#endif

@@ -706,6 +860,7 @@ static struct spdk_net_impl g_posix_net_impl = {
	.recv		= spdk_posix_sock_recv,
	.readv		= spdk_posix_sock_readv,
	.writev		= spdk_posix_sock_writev,
	.writev_async	= spdk_posix_sock_writev_async,
	.set_recvlowat	= spdk_posix_sock_set_recvlowat,
	.set_recvbuf	= spdk_posix_sock_set_recvbuf,
	.set_sendbuf	= spdk_posix_sock_set_sendbuf,
Loading