Commit 1ae601b5 authored by Ben Walker, committed by Tomasz Zawadzki
Browse files

sock/posix: Avoid extra readv calls after draining recv_pipe



Move from a single flag indicating that the socket is on the
pending_events list to two flags - pipe_has_data and socket_has_data. If
either flag is true, the socket is on the socks_with_data list.

This is necessary to track enough state to avoid doing extra recv()
system calls.

Change-Id: I65e5701dccb0a5bade19f266f164f26706b110d4
Signed-off-by: Ben Walker <benjamin.walker@intel.com>
Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/7595


Tested-by: SPDK CI Jenkins <sys_sgci@intel.com>
Community-CI: Mellanox Build Bot
Reviewed-by: Jim Harris <james.r.harris@intel.com>
Reviewed-by: Aleksey Marchuk <alexeymar@mellanox.com>
parent 4e9adb3b
Loading
Loading
Loading
Loading
+84 −66
Original line number Diff line number Diff line
@@ -68,7 +68,8 @@ struct spdk_posix_sock {
	struct spdk_pipe	*recv_pipe;
	void			*recv_buf;
	int			recv_buf_sz;
	bool			pending_events;
	bool			pipe_has_data;
	bool			socket_has_data;
	bool			zcopy;

	int			placement_id;
@@ -76,12 +77,12 @@ struct spdk_posix_sock {
	TAILQ_ENTRY(spdk_posix_sock)	link;
};

TAILQ_HEAD(spdk_pending_events_list, spdk_posix_sock);
TAILQ_HEAD(spdk_has_data_list, spdk_posix_sock);

struct spdk_posix_sock_group_impl {
	struct spdk_sock_group_impl	base;
	int				fd;
	struct spdk_pending_events_list	pending_events;
	struct spdk_has_data_list	socks_with_data;
	int				placement_id;
};

@@ -904,13 +905,16 @@ posix_sock_recv_from_pipe(struct spdk_posix_sock *sock, struct iovec *diov, int

	spdk_pipe_reader_advance(sock->recv_pipe, bytes);

	/* If we drained the pipe, take it off the pending_events list. The socket may still have data buffered
	 * in the kernel to receive, but this will be handled on the next poll call when we get the same EPOLLIN
	 * event again. */
	if (sock->base.group_impl && spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
	/* If we drained the pipe, mark it appropriately */
	if (spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
		assert(sock->pipe_has_data == true);

		group = __posix_group_impl(sock->base.group_impl);
		TAILQ_REMOVE(&group->pending_events, sock, link);
		sock->pending_events = false;
		if (group && !sock->socket_has_data) {
			TAILQ_REMOVE(&group->socks_with_data, sock, link);
		}

		sock->pipe_has_data = false;
	}

	return bytes;
@@ -920,34 +924,46 @@ static inline ssize_t
posix_sock_read(struct spdk_posix_sock *sock)
{
	struct iovec iov[2];
	int bytes;
	int bytes_avail, bytes_recvd;
	struct spdk_posix_sock_group_impl *group;

	bytes = spdk_pipe_writer_get_buffer(sock->recv_pipe, sock->recv_buf_sz, iov);

	if (bytes > 0) {
		bytes = readv(sock->fd, iov, 2);
		if (bytes > 0) {
			spdk_pipe_writer_advance(sock->recv_pipe, bytes);

			/* For normal operation, this function is called in response to an EPOLLIN
			 * event, which already placed the socket onto the pending_events list.
			 * But between polls the user may repeatedly call posix_sock_read
			 * and if they clear the pipe on one of those earlier calls, the
			 * socket will be removed from the pending_events list. In that case,
			 * if we now found more data, put it back on.
			 * This essentially never happens in practice because the application
			 * will stop trying to receive and wait for the next EPOLLIN event, but
			 * for correctness let's handle it. */
			if (!sock->pending_events && sock->base.group_impl) {
	bytes_avail = spdk_pipe_writer_get_buffer(sock->recv_pipe, sock->recv_buf_sz, iov);

	if (bytes_avail <= 0) {
		return bytes_avail;
	}

	bytes_recvd = readv(sock->fd, iov, 2);

	assert(sock->pipe_has_data == false);

	if (bytes_recvd <= 0) {
		/* Errors count as draining the socket data */
		if (sock->base.group_impl && sock->socket_has_data) {
			group = __posix_group_impl(sock->base.group_impl);
				TAILQ_INSERT_TAIL(&group->pending_events, sock, link);
				sock->pending_events = true;
			TAILQ_REMOVE(&group->socks_with_data, sock, link);
		}

		sock->socket_has_data = false;

		return bytes_recvd;
	}

	spdk_pipe_writer_advance(sock->recv_pipe, bytes_recvd);

#if DEBUG
	if (sock->base.group_impl) {
		assert(sock->socket_has_data == true);
	}
#endif

	return bytes;
	sock->pipe_has_data = true;
	if (bytes_recvd < bytes_avail) {
		/* We drained the kernel socket entirely. */
		sock->socket_has_data = false;
	}

	return bytes_recvd;
}

static ssize_t
@@ -959,26 +975,26 @@ posix_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
	size_t len;

	if (sock->recv_pipe == NULL) {
		if (group && sock->pending_events) {
			sock->pending_events = false;
			TAILQ_REMOVE(&group->pending_events, sock, link);
		assert(sock->pipe_has_data == false);
		if (group && sock->socket_has_data) {
			sock->socket_has_data = false;
			TAILQ_REMOVE(&group->socks_with_data, sock, link);
		}
		return readv(sock->fd, iov, iovcnt);
	}

	/* If the socket is not in a group, we must assume it always has
	 * data waiting for us because it is not epolled */
	if (!sock->pipe_has_data && (group == NULL || sock->socket_has_data)) {
		/* If the user is receiving a sufficiently large amount of data,
		 * receive directly to their buffers. */
		len = 0;
		for (i = 0; i < iovcnt; i++) {
			len += iov[i].iov_len;
		}

	if (spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
		/* If the user is receiving a sufficiently large amount of data,
		 * receive directly to their buffers. */
		if (len >= MIN_SOCK_PIPE_SIZE) {
			if (group && sock->pending_events) {
				sock->pending_events = false;
				TAILQ_REMOVE(&group->pending_events, sock, link);
			}
			/* TODO: Should this detect if kernel socket is drained? */
			return readv(sock->fd, iov, iovcnt);
		}

@@ -1160,7 +1176,7 @@ posix_sock_group_impl_create(void)
	}

	group_impl->fd = fd;
	TAILQ_INIT(&group_impl->pending_events);
	TAILQ_INIT(&group_impl->socks_with_data);
	group_impl->placement_id = -1;

	if (g_spdk_posix_sock_impl_opts.enable_placement_id == PLACEMENT_CPU) {
@@ -1256,9 +1272,9 @@ posix_sock_group_impl_add_sock(struct spdk_sock_group_impl *_group, struct spdk_
	/* switched from another polling group due to scheduling */
	if (spdk_unlikely(sock->recv_pipe != NULL  &&
			  (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0))) {
		assert(sock->pending_events == false);
		sock->pending_events = true;
		TAILQ_INSERT_TAIL(&group->pending_events, sock, link);
		sock->pipe_has_data = true;
		sock->socket_has_data = false;
		TAILQ_INSERT_TAIL(&group->socks_with_data, sock, link);
	}

	if (g_spdk_posix_sock_impl_opts.enable_placement_id == PLACEMENT_MARK) {
@@ -1281,9 +1297,10 @@ posix_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group, struct sp
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	int rc;

	if (sock->pending_events) {
		TAILQ_REMOVE(&group->pending_events, sock, link);
		sock->pending_events = false;
	if (sock->pipe_has_data || sock->socket_has_data) {
		TAILQ_REMOVE(&group->socks_with_data, sock, link);
		sock->pipe_has_data = false;
		sock->socket_has_data = false;
	}

	if (sock->placement_id != -1) {
@@ -1362,7 +1379,7 @@ posix_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events,
	 */
	int last_placement_id = -1;

	TAILQ_FOREACH(psock, &group->pending_events, link) {
	TAILQ_FOREACH(psock, &group->socks_with_data, link) {
		if (psock->zcopy && psock->placement_id >= 0 &&
		    psock->placement_id != last_placement_id) {
			struct pollfd pfd = {psock->fd, POLLIN | POLLERR, 0};
@@ -1433,16 +1450,16 @@ posix_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events,
		psock = __posix_sock(sock);
#endif

		/* If the socket does not already have recv pending, add it now */
		if (!psock->pending_events) {
			psock->pending_events = true;
			TAILQ_INSERT_TAIL(&group->pending_events, psock, link);
		/* If the socket is not already in the list, add it now */
		if (!psock->socket_has_data && !psock->pipe_has_data) {
			TAILQ_INSERT_TAIL(&group->socks_with_data, psock, link);
		}
		psock->socket_has_data = true;
	}

	num_events = 0;

	TAILQ_FOREACH_SAFE(psock, &group->pending_events, link, ptmp) {
	TAILQ_FOREACH_SAFE(psock, &group->socks_with_data, link, ptmp) {
		if (num_events == max_events) {
			break;
		}
@@ -1450,15 +1467,16 @@ posix_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events,
		/* If the socket's cb_fn is NULL, just remove it from the
		 * list and do not add it to socks array */
		if (spdk_unlikely(psock->base.cb_fn == NULL)) {
			psock->pending_events = false;
			TAILQ_REMOVE(&group->pending_events, psock, link);
			psock->socket_has_data = false;
			psock->pipe_has_data = false;
			TAILQ_REMOVE(&group->socks_with_data, psock, link);
			continue;
		}

		socks[num_events++] = &psock->base;
	}

	/* Cycle the pending_events list so that each time we poll things aren't
	/* Cycle the has_data list so that each time we poll things aren't
	 * in the same order. Say we have 6 sockets in the list, named as follows:
	 * A B C D E F
	 * And all 6 sockets had epoll events, but max_events is only 3. That means
@@ -1473,9 +1491,9 @@ posix_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events,

		/* Capture pointers to the elements we need */
		pd = psock;
		pc = TAILQ_PREV(pd, spdk_pending_events_list, link);
		pa = TAILQ_FIRST(&group->pending_events);
		pf = TAILQ_LAST(&group->pending_events, spdk_pending_events_list);
		pc = TAILQ_PREV(pd, spdk_has_data_list, link);
		pa = TAILQ_FIRST(&group->socks_with_data);
		pf = TAILQ_LAST(&group->socks_with_data, spdk_has_data_list);

		/* Break the link between C and D */
		pc->link.tqe_next = NULL;
@@ -1486,8 +1504,8 @@ posix_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events,
		pa->link.tqe_prev = &pf->link.tqe_next;

		/* Fix up the list first/last pointers */
		group->pending_events.tqh_first = pd;
		group->pending_events.tqh_last = &pc->link.tqe_next;
		group->socks_with_data.tqh_first = pd;
		group->socks_with_data.tqh_last = &pc->link.tqe_next;
	}

	return num_events;