Commit 5433004e authored by Ben Walker, committed by Tomasz Zawadzki

sock/uring: Flushing a socket not in a group still attempts to send



Attempt to send any queued writes even if the socket is in a group. This
will not work if there is a send task already outstanding.

Change-Id: Icc0b5884e3d247042194ad26b30340ceb824886c
Signed-off-by: Ben Walker <benjamin.walker@intel.com>
Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/15212


Tested-by: SPDK CI Jenkins <sys_sgci@intel.com>
Reviewed-by: Konrad Sztyber <konrad.sztyber@intel.com>
Reviewed-by: Shuhei Matsumoto <smatsumoto@nvidia.com>
parent 567d6b53
+1 −1
@@ -319,7 +319,7 @@ int spdk_sock_close(struct spdk_sock **sock);
 *
 * \param sock Socket to flush.
 *
- * \return 0 on success, -1 on failure.
+ * \return number of bytes sent on success, -1 (with errno set) on failure
 */
int spdk_sock_flush(struct spdk_sock *sock);
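
With this change, a caller can no longer treat every nonzero return as fatal: -1 with errno set to EAGAIN now means "nothing could be sent right now, retry later", while the success path reports progress in bytes. A minimal caller-side sketch of the new convention (the helper name is hypothetical and assumes writes were queued with spdk_sock_writev_async; this is illustrative, not code from the commit):

#include <errno.h>

#include "spdk/sock.h"

/* Hypothetical helper: drain queued writes, treating EAGAIN as retryable. */
static int
drain_queued_writes(struct spdk_sock *sock)
{
	int rc;

	rc = spdk_sock_flush(sock);
	if (rc < 0) {
		if (errno == EAGAIN) {
			/* Socket busy (in a callback, send outstanding, or the
			 * kernel buffer is full): leave the queue intact and
			 * retry on the next poll. */
			return 0;
		}
		return -1;	/* Hard failure. */
	}

	return rc;	/* Bytes handed to the kernel (0 if nothing was queued). */
}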

+12 −9
@@ -1213,14 +1213,15 @@ _sock_flush(struct spdk_sock *sock)
	int retval;
	struct spdk_sock_request *req;
	int i;
-	ssize_t rc;
+	ssize_t rc, sent;
	unsigned int offset;
	size_t len;
	bool is_zcopy = false;

	/* Can't flush from within a callback or we end up with recursive calls */
	if (sock->cb_cnt > 0) {
-		return 0;
+		errno = EAGAIN;
+		return -1;
	}

#ifdef SPDK_ZEROCOPY
@@ -1251,12 +1252,14 @@ _sock_flush(struct spdk_sock *sock)
		rc = sendmsg(psock->fd, &msg, flags);
	}
	if (rc <= 0) {
-		if (errno == EAGAIN || errno == EWOULDBLOCK || (errno == ENOBUFS && psock->zcopy)) {
-			return 0;
+		if (rc == 0 || errno == EAGAIN || errno == EWOULDBLOCK || (errno == ENOBUFS && psock->zcopy)) {
+			errno = EAGAIN;
		}
-		return rc;
+		return -1;
	}

+	sent = rc;
+
	if (is_zcopy) {
		/* Handling overflow case, because we use psock->sendmsg_idx - 1 for the
		 * req->internal.offset, so sendmsg_idx should not be zero  */
@@ -1288,7 +1291,7 @@ _sock_flush(struct spdk_sock *sock)
			if (len > (size_t)rc) {
				/* This element was partially sent. */
				req->internal.offset += rc;
-				return 0;
+				return sent;
			}

			offset = 0;
@@ -1320,7 +1323,7 @@ _sock_flush(struct spdk_sock *sock)
		req = TAILQ_FIRST(&sock->queued_reqs);
	}

-	return 0;
+	return sent;
}

static int
@@ -1532,7 +1535,7 @@ posix_sock_writev_async(struct spdk_sock *sock, struct spdk_sock_request *req)
	/* If there are a sufficient number queued, just flush them out immediately. */
	if (sock->queued_iovcnt >= IOV_BATCH_SIZE) {
		rc = _sock_flush(sock);
-		if (rc) {
+		if (rc < 0 && errno != EAGAIN) {
			spdk_sock_abort_requests(sock);
		}
	}
@@ -1876,7 +1879,7 @@ posix_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events,
	 * group. */
	TAILQ_FOREACH_SAFE(sock, &_group->socks, link, tmp) {
		rc = _sock_flush(sock);
-		if (rc) {
+		if (rc < 0 && errno != EAGAIN) {
			spdk_sock_abort_requests(sock);
		}
	}
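
The posix changes above amount to two ideas: _sock_flush() now reports how many bytes sendmsg() actually accepted, and a partially sent request records its progress in req->internal.offset so the next flush resumes where this one stopped. A stripped-down sketch of that pattern, reduced to one buffer and plain send() (illustrative only, not SPDK internals):

#include <errno.h>
#include <sys/socket.h>
#include <sys/types.h>

/* Flush one buffer non-blockingly, resuming from *offset on each call and
 * returning bytes sent, or -1 with errno == EAGAIN when nothing could be sent. */
static ssize_t
flush_buffer(int fd, const char *buf, size_t len, size_t *offset)
{
	ssize_t rc;

	if (*offset == len) {
		return 0;	/* Nothing left to send. */
	}

	rc = send(fd, buf + *offset, len - *offset, MSG_DONTWAIT);
	if (rc < 0) {
		if (errno == EWOULDBLOCK) {
			errno = EAGAIN;	/* Normalize, as the patch does. */
		}
		return -1;
	}

	/* On a partial send, remember how far we got, mirroring
	 * req->internal.offset += rc in _sock_flush(). */
	*offset += (size_t)rc;
	return rc;
}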
+55 −56
@@ -1271,57 +1271,7 @@ end:
	return count;
}

-static int
-_sock_flush_client(struct spdk_sock *_sock)
-{
-	struct spdk_uring_sock *sock = __uring_sock(_sock);
-	struct msghdr msg = {};
-	struct iovec iovs[IOV_BATCH_SIZE];
-	int iovcnt;
-	ssize_t rc;
-	int flags = sock->zcopy_send_flags;
-	int retval;
-	bool is_zcopy = false;
-
-	/* Can't flush from within a callback or we end up with recursive calls */
-	if (_sock->cb_cnt > 0) {
-		return 0;
-	}
-
-	/* Gather an iov */
-	iovcnt = spdk_sock_prep_reqs(_sock, iovs, 0, NULL, &flags);
-	if (iovcnt == 0) {
-		return 0;
-	}
-
-	/* Perform the vectored write */
-	msg.msg_iov = iovs;
-	msg.msg_iovlen = iovcnt;
-	rc = sendmsg(sock->fd, &msg, flags | MSG_DONTWAIT);
-	if (rc <= 0) {
-		if (errno == EAGAIN || errno == EWOULDBLOCK) {
-			return 0;
-		}
-		return rc;
-	}
-
-#ifdef SPDK_ZEROCOPY
-	is_zcopy = flags & MSG_ZEROCOPY;
-#endif
-	retval = sock_complete_write_reqs(_sock, rc, is_zcopy);
-	if (retval < 0) {
-		/* if the socket is closed, return to avoid heap-use-after-free error */
-		return retval;
-	}
-
-#ifdef SPDK_ZEROCOPY
-	if (sock->zcopy && !TAILQ_EMPTY(&_sock->pending_reqs)) {
-		_sock_check_zcopy(_sock, 0);
-	}
-#endif
-
-	return 0;
-}
+static int uring_sock_flush(struct spdk_sock *_sock);

static void
uring_sock_writev_async(struct spdk_sock *_sock, struct spdk_sock_request *req)
@@ -1338,8 +1288,8 @@ uring_sock_writev_async(struct spdk_sock *_sock, struct spdk_sock_request *req)

	if (!sock->group) {
		if (_sock->queued_iovcnt >= IOV_BATCH_SIZE) {
-			rc = _sock_flush_client(_sock);
-			if (rc) {
+			rc = uring_sock_flush(_sock);
+			if (rc < 0 && errno != EAGAIN) {
				spdk_sock_abort_requests(_sock);
			}
		}
@@ -1645,14 +1595,63 @@ static int
uring_sock_flush(struct spdk_sock *_sock)
{
	struct spdk_uring_sock *sock = __uring_sock(_sock);
+	struct msghdr msg = {};
+	struct iovec iovs[IOV_BATCH_SIZE];
+	int iovcnt;
+	ssize_t rc;
+	int flags = sock->zcopy_send_flags;
+	int retval;
+	bool is_zcopy = false;

-	if (!sock->group) {
-		return _sock_flush_client(_sock);
+	/* Can't flush from within a callback or we end up with recursive calls */
+	if (_sock->cb_cnt > 0) {
+		errno = EAGAIN;
+		return -1;
	}

+	/* Can't flush while a write is already outstanding */
+	if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
+		errno = EAGAIN;
+		return -1;
+	}
+
+	/* Gather an iov */
+	iovcnt = spdk_sock_prep_reqs(_sock, iovs, 0, NULL, &flags);
+	if (iovcnt == 0) {
+		/* Nothing to send */
+		return 0;
+	}
+
+	/* Perform the vectored write */
+	msg.msg_iov = iovs;
+	msg.msg_iovlen = iovcnt;
+	rc = sendmsg(sock->fd, &msg, flags | MSG_DONTWAIT);
+	if (rc <= 0) {
+		if (rc == 0 || errno == EAGAIN || errno == EWOULDBLOCK || (errno == ENOBUFS && sock->zcopy)) {
+			errno = EAGAIN;
+		}
+		return -1;
+	}
+
+#ifdef SPDK_ZEROCOPY
+	is_zcopy = flags & MSG_ZEROCOPY;
+#endif
+	retval = sock_complete_write_reqs(_sock, rc, is_zcopy);
+	if (retval < 0) {
+		/* if the socket is closed, return to avoid heap-use-after-free error */
+		errno = ENOTCONN;
+		return -1;
+	}
+
+#ifdef SPDK_ZEROCOPY
+	if (sock->zcopy && !TAILQ_EMPTY(&_sock->pending_reqs)) {
+		_sock_check_zcopy(_sock, 0);
+	}
+#endif
+
-	return 0;
+	return rc;
}

static struct spdk_net_impl g_uring_net_impl = {
	.name		= "uring",
	.getaddr	= uring_sock_getaddr,
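
Because uring_sock_flush() now reports a busy socket as -1 with errno == EAGAIN instead of silently returning 0 for grouped sockets, callers are expected to retry rather than abort. A hedged sketch of that retry discipline, mirroring the group-poll pattern in the posix diff above (the function name is illustrative; spdk_sock_abort_requests() is the internal helper already used in this commit):

#include <errno.h>

#include "spdk/sock.h"

/* Illustrative poller step: flush a socket, retry on EAGAIN, and abort the
 * queued requests only on a hard error. */
static void
poll_socket_writes(struct spdk_sock *sock)
{
	int rc;

	rc = spdk_sock_flush(sock);
	if (rc < 0 && errno == EAGAIN) {
		/* A send task is still in flight or we are inside a callback;
		 * the queue stays intact for the next poll. */
		return;
	}
	if (rc < 0) {
		spdk_sock_abort_requests(sock);	/* Unrecoverable: fail queued requests. */
	}
}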
+6 −6
@@ -72,7 +72,7 @@ flush(void)
	MOCK_SET(sendmsg, 64);
	cb_arg1 = false;
	rc = _sock_flush(sock);
-	CU_ASSERT(rc == 0);
+	CU_ASSERT(rc == 64);
	CU_ASSERT(cb_arg1 == true);
	CU_ASSERT(TAILQ_EMPTY(&sock->queued_reqs));

@@ -83,7 +83,7 @@ flush(void)
	cb_arg1 = false;
	cb_arg2 = false;
	rc = _sock_flush(sock);
-	CU_ASSERT(rc == 0);
+	CU_ASSERT(rc == 128);
	CU_ASSERT(cb_arg1 == true);
	CU_ASSERT(cb_arg2 == true);
	CU_ASSERT(TAILQ_EMPTY(&sock->queued_reqs));
@@ -95,7 +95,7 @@ flush(void)
	cb_arg1 = false;
	cb_arg2 = false;
	rc = _sock_flush(sock);
-	CU_ASSERT(rc == 0);
+	CU_ASSERT(rc == 64);
	CU_ASSERT(cb_arg1 == true);
	CU_ASSERT(cb_arg2 == false);
	CU_ASSERT(TAILQ_FIRST(&sock->queued_reqs) == req2);
@@ -107,7 +107,7 @@ flush(void)
	MOCK_SET(sendmsg, 10);
	cb_arg1 = false;
	rc = _sock_flush(sock);
-	CU_ASSERT(rc == 0);
+	CU_ASSERT(rc == 10);
	CU_ASSERT(cb_arg1 == false);
	CU_ASSERT(TAILQ_FIRST(&sock->queued_reqs) == req1);

@@ -115,7 +115,7 @@ flush(void)
	MOCK_SET(sendmsg, 24);
	cb_arg1 = false;
	rc = _sock_flush(sock);
-	CU_ASSERT(rc == 0);
+	CU_ASSERT(rc == 24);
	CU_ASSERT(cb_arg1 == false);
	CU_ASSERT(TAILQ_FIRST(&sock->queued_reqs) == req1);

@@ -123,7 +123,7 @@ flush(void)
	MOCK_SET(sendmsg, 30);
	cb_arg1 = false;
	rc = _sock_flush(sock);
-	CU_ASSERT(rc == 0);
+	CU_ASSERT(rc == 30);
	CU_ASSERT(cb_arg1 == true);
	CU_ASSERT(TAILQ_EMPTY(&sock->queued_reqs));

+12 −12
@@ -79,8 +79,8 @@ flush_client(void)
	spdk_sock_request_queue(sock, req1);
	MOCK_SET(sendmsg, 192);
	cb_arg1 = false;
-	rc = _sock_flush_client(sock);
-	CU_ASSERT(rc == 0);
+	rc = uring_sock_flush(sock);
+	CU_ASSERT(rc == 192);
	CU_ASSERT(cb_arg1 == true);
	CU_ASSERT(TAILQ_EMPTY(&sock->queued_reqs));

@@ -90,8 +90,8 @@ flush_client(void)
	MOCK_SET(sendmsg, 256);
	cb_arg1 = false;
	cb_arg2 = false;
-	rc = _sock_flush_client(sock);
-	CU_ASSERT(rc == 0);
+	rc = uring_sock_flush(sock);
+	CU_ASSERT(rc == 256);
	CU_ASSERT(cb_arg1 == true);
	CU_ASSERT(cb_arg2 == true);
	CU_ASSERT(TAILQ_EMPTY(&sock->queued_reqs));
@@ -102,8 +102,8 @@ flush_client(void)
	MOCK_SET(sendmsg, 192);
	cb_arg1 = false;
	cb_arg2 = false;
-	rc = _sock_flush_client(sock);
-	CU_ASSERT(rc == 0);
+	rc = uring_sock_flush(sock);
+	CU_ASSERT(rc == 192);
	CU_ASSERT(cb_arg1 == true);
	CU_ASSERT(cb_arg2 == false);
	CU_ASSERT(TAILQ_FIRST(&sock->queued_reqs) == req2);
@@ -114,24 +114,24 @@ flush_client(void)
	spdk_sock_request_queue(sock, req1);
	MOCK_SET(sendmsg, 10);
	cb_arg1 = false;
-	rc = _sock_flush_client(sock);
-	CU_ASSERT(rc == 0);
+	rc = uring_sock_flush(sock);
+	CU_ASSERT(rc == 10);
	CU_ASSERT(cb_arg1 == false);
	CU_ASSERT(TAILQ_FIRST(&sock->queued_reqs) == req1);

	/* Do a second flush that partial sends again. */
	MOCK_SET(sendmsg, 52);
	cb_arg1 = false;
-	rc = _sock_flush_client(sock);
-	CU_ASSERT(rc == 0);
+	rc = uring_sock_flush(sock);
+	CU_ASSERT(rc == 52);
	CU_ASSERT(cb_arg1 == false);
	CU_ASSERT(TAILQ_FIRST(&sock->queued_reqs) == req1);

	/* Flush the rest of the data */
	MOCK_SET(sendmsg, 130);
	cb_arg1 = false;
-	rc = _sock_flush_client(sock);
-	CU_ASSERT(rc == 0);
+	rc = uring_sock_flush(sock);
+	CU_ASSERT(rc == 130);
	CU_ASSERT(cb_arg1 == true);
	CU_ASSERT(TAILQ_EMPTY(&sock->queued_reqs));