Commit 5ccf427a authored by Ben Walker, committed by Jim Harris
Browse files

sock/uring: Separate POLLIN from POLLERR tasks



The POLLIN task is going to be associated with a read operation
in an upcoming patch and cannot be bundled with the POLLERR.

The POLLIN task is only submitted if the socket does not already have
pending_recv set, but the POLLERR task is always submitted. This is a
shift toward our new model, in which commands always stay submitted.
However, if a POLLERR completes successfully while the socket is being
removed from a group, an errqueue task could get submitted. So new flags
are added during removal to avoid submitting new commands.

Change-Id: Ib1bb88cb0fa7c5e7237c9e99679bdcb4b4a7e3c7
Signed-off-by: Ben Walker <benjamin.walker@intel.com>
Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/17593


Tested-by: SPDK CI Jenkins <sys_sgci@intel.com>
Reviewed-by: Konrad Sztyber <konrad.sztyber@intel.com>
Reviewed-by: Jim Harris <james.r.harris@intel.com>
Community-CI: Mellanox Build Bot
parent 03233d3d
Loading
Loading
Loading
Loading
+69 −7
Original line number Diff line number Diff line
@@ -29,6 +29,7 @@

enum spdk_sock_task_type {
	SPDK_SOCK_TASK_POLLIN = 0,
	SPDK_SOCK_TASK_POLLERR,
	SPDK_SOCK_TASK_ERRQUEUE,
	SPDK_SOCK_TASK_WRITE,
	SPDK_SOCK_TASK_CANCEL,
@@ -63,12 +64,14 @@ struct spdk_uring_sock {
	struct spdk_uring_task			write_task;
	struct spdk_uring_task			errqueue_task;
	struct spdk_uring_task			pollin_task;
	struct spdk_uring_task			pollerr_task;
	struct spdk_uring_task			cancel_task;
	struct spdk_pipe			*recv_pipe;
	void					*recv_buf;
	int					recv_buf_sz;
	bool					zcopy;
	bool					pending_recv;
	bool					pending_group_remove;
	int					zcopy_send_flags;
	int					connection_status;
	int					placement_id;
@@ -1028,6 +1031,10 @@ _sock_prep_errqueue(struct spdk_sock *_sock)
		return;
	}

	if (sock->pending_group_remove) {
		return;
	}

	assert(sock->group != NULL);
	sock->group->io_queued++;

@@ -1081,6 +1088,33 @@ _sock_flush(struct spdk_sock *_sock)
	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
}

#ifdef SPDK_ZEROCOPY
/*
 * Queue a POLLERR poll request for the socket on its group's io_uring.
 *
 * The request is tagged with the socket's pollerr_task so the completion
 * can be matched back to it. Nothing is queued when:
 *   - the pollerr task is already in process,
 *   - the socket is being removed from its group (pending_group_remove), or
 *   - no submission queue entry is currently available.
 *
 * \param _sock Generic socket handle; must belong to a group.
 */
static void
_sock_prep_pollerr(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_task *task = &sock->pollerr_task;
	struct io_uring_sqe *sqe;

	/* Do not prepare pollerr event: one is already outstanding. */
	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) {
		return;
	}

	/* Do not submit new commands while the socket is leaving its group. */
	if (sock->pending_group_remove) {
		return;
	}

	assert(sock->group != NULL);

	sqe = io_uring_get_sqe(&sock->group->uring);
	if (sqe == NULL) {
		/* No free submission queue entry. Leave the task status and the
		 * group's io_queued counter untouched so a later call can retry. */
		return;
	}

	/* Only account for the request once an SQE has actually been obtained. */
	sock->group->io_queued++;

	io_uring_prep_poll_add(sqe, sock->fd, POLLERR);
	io_uring_sqe_set_data(sqe, task);
	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
}
#endif

static void
_sock_prep_pollin(struct spdk_sock *_sock)
{
@@ -1089,7 +1123,11 @@ _sock_prep_pollin(struct spdk_sock *_sock)
	struct io_uring_sqe *sqe;

	/* Do not prepare pollin event */
	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS || (sock->pending_recv && !sock->zcopy)) {
	if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS || sock->pending_recv) {
		return;
	}

	if (sock->pending_group_remove) {
		return;
	}

@@ -1097,7 +1135,7 @@ _sock_prep_pollin(struct spdk_sock *_sock)
	sock->group->io_queued++;

	sqe = io_uring_get_sqe(&sock->group->uring);
	io_uring_prep_poll_add(sqe, sock->fd, POLLIN | POLLERR);
	io_uring_prep_poll_add(sqe, sock->fd, POLLIN);
	io_uring_sqe_set_data(sqe, task);
	task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
}
@@ -1164,11 +1202,6 @@ sock_uring_group_reap(struct spdk_uring_sock_group_impl *group, int max, int max

		switch (task->type) {
		case SPDK_SOCK_TASK_POLLIN:
#ifdef SPDK_ZEROCOPY
			if ((status & POLLERR) == POLLERR) {
				_sock_prep_errqueue(&sock->base);
			}
#endif
			if ((status & POLLIN) == POLLIN) {
				if (sock->base.cb_fn != NULL &&
				    sock->pending_recv == false) {
@@ -1177,6 +1210,13 @@ sock_uring_group_reap(struct spdk_uring_sock_group_impl *group, int max, int max
				}
			}
			break;
		case SPDK_SOCK_TASK_POLLERR:
#ifdef SPDK_ZEROCOPY
			if ((status & POLLERR) == POLLERR) {
				_sock_prep_errqueue(&sock->base);
			}
#endif
			break;
		case SPDK_SOCK_TASK_WRITE:
			task->last_req = NULL;
			task->iov_cnt = 0;
@@ -1448,6 +1488,9 @@ uring_sock_group_impl_add_sock(struct spdk_sock_group_impl *_group,
	sock->pollin_task.sock = sock;
	sock->pollin_task.type = SPDK_SOCK_TASK_POLLIN;

	sock->pollerr_task.sock = sock;
	sock->pollerr_task.type = SPDK_SOCK_TASK_POLLERR;

	sock->errqueue_task.sock = sock;
	sock->errqueue_task.type = SPDK_SOCK_TASK_ERRQUEUE;
	sock->errqueue_task.msg.msg_control = sock->buf;
@@ -1493,6 +1536,11 @@ uring_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events,
			}
			_sock_flush(_sock);
			_sock_prep_pollin(_sock);
#ifdef SPDK_ZEROCOPY
			if (sock->zcopy) {
				_sock_prep_pollerr(_sock);
			}
#endif
		}
	}

@@ -1527,6 +1575,8 @@ uring_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group,
	struct spdk_uring_sock *sock = __uring_sock(_sock);
	struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);

	sock->pending_group_remove = true;

	if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
		_sock_prep_cancel_task(_sock, &sock->write_task);
		/* Since spdk_sock_group_remove_sock is not asynchronous interface, so
@@ -1547,6 +1597,16 @@ uring_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group,
		}
	}

	if (sock->pollerr_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
		_sock_prep_cancel_task(_sock, &sock->pollerr_task);
		/* Since spdk_sock_group_remove_sock is not asynchronous interface, so
		 * currently can use a while loop here. */
		while ((sock->pollerr_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) ||
		       (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) {
			uring_sock_group_impl_poll(_group, 32, NULL);
		}
	}

	if (sock->errqueue_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
		_sock_prep_cancel_task(_sock, &sock->errqueue_task);
		/* Since spdk_sock_group_remove_sock is not asynchronous interface, so
@@ -1560,6 +1620,7 @@ uring_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group,
	/* Make sure the cancelling the tasks above didn't cause sending new requests */
	assert(sock->write_task.status == SPDK_URING_SOCK_TASK_NOT_IN_USE);
	assert(sock->pollin_task.status == SPDK_URING_SOCK_TASK_NOT_IN_USE);
	assert(sock->pollerr_task.status == SPDK_URING_SOCK_TASK_NOT_IN_USE);
	assert(sock->errqueue_task.status == SPDK_URING_SOCK_TASK_NOT_IN_USE);

	if (sock->pending_recv) {
@@ -1572,6 +1633,7 @@ uring_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group,
		spdk_sock_map_release(&g_map, sock->placement_id);
	}

	sock->pending_group_remove = false;
	sock->group = NULL;
	return 0;
}