Unverified Commit 1ca52459 authored by Zelda Hessler's avatar Zelda Hessler Committed by GitHub
Browse files

Add a test ensuring streamed response will error if `EOF` is received early (#1906)

* add: test ensuring streamed response will error if eof is received early
formatting: sort s3 integration test cargo toml deps
update: CHANGELOG.next.toml

* formatting: run cargo fmt
remove: commented out logging init
add: comment for CRLF check
update: read Ok(0) case to say note that it's unreachable and why that's the case
update: debug messages

* update: test to use OS-assigned port instead of 3000
parent fb631b4f
Loading
Loading
Loading
Loading
+12 −0
Original line number Diff line number Diff line
@@ -17,12 +17,24 @@ references = ["smithy-rs#1847"]
meta = { "breaking" = false, "tada" = false, "bug" = true }
author = "crisidev"

[[aws-sdk-rust]]
message = "Add test to exercise excluded headers in aws-sigv4."
references = ["smithy-rs#1890"]
meta = { "breaking" = false, "tada" = false, "bug" = false }
author = "ysaito1001"

[[aws-sdk-rust]]
message = "Support Sigv4 signature generation on PowerPC 32 and 64 bit. This architecture cannot compile `ring`, so the implementation has been updated to rely on `hamc` + `sha2` to achive the same result with broader platform compatibility and higher performance. We also updated the CI which is now running as many tests as possible against i686 and PowerPC 32 and 64 bit."
references = ["smithy-rs#1847"]
meta = { "breaking" = true, "tada" = false, "bug" = true }
author = "crisidev"

[[aws-sdk-rust]]
message = "Add test ensuring that a response will error if the response body returns an EOF before the entire body has been read."
references = ["smithy-rs#1801"]
meta = { "breaking" = false, "tada" = false, "bug" = false }
author = "Velfi"

[[smithy-rs]]
message = "Replace bool with enum for a function parameter of `label::fmt_string`."
references = ["smithy-rs#1875"]
+3 −3
Original line number Diff line number Diff line
@@ -11,7 +11,6 @@ edition = "2021"
async-std = "1.12.0"
aws-config = { path = "../../build/aws-sdk/sdk/aws-config" }
aws-http = { path = "../../build/aws-sdk/sdk/aws-http" }
aws-types = { path = "../../build/aws-sdk/sdk/aws-types" }
aws-sdk-s3 = { path = "../../build/aws-sdk/sdk/s3" }
aws-sdk-sts = { path = "../../build/aws-sdk/sdk/sts" }
aws-smithy-async = { path = "../../build/aws-sdk/sdk/aws-smithy-async", features = ["rt-tokio"] }
@@ -19,14 +18,15 @@ aws-smithy-client = { path = "../../build/aws-sdk/sdk/aws-smithy-client", featur
aws-smithy-http = { path = "../../build/aws-sdk/sdk/aws-smithy-http" }
aws-smithy-protocol-test = { path = "../../build/aws-sdk/sdk/aws-smithy-protocol-test" }
aws-smithy-types = { path = "../../build/aws-sdk/sdk/aws-smithy-types" }
aws-types = { path = "../../build/aws-sdk/sdk/aws-types" }
bytes = "1"
bytes-utils = "0.1.2"
http = "0.2.3"
http-body = "0.4.5"
hyper = "0.14.12"
serde_json = "1"
tempfile = "3"
smol = "1.2"
tempfile = "3"
tokio = { version = "1.8.4", features = ["full", "test-util"] }
tracing = "0.1.37"
tracing-subscriber = { version = "0.3.15", features = ["env-filter"] }
tracing = "0.1.0"
+140 −0
Original line number Diff line number Diff line
/*
 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 * SPDX-License-Identifier: Apache-2.0
 */

use aws_sdk_s3::{Credentials, Endpoint, Region};
use bytes::BytesMut;
use std::future::Future;
use std::net::SocketAddr;
use std::time::Duration;
use tracing::debug;

// test will hang forever with the default (single-threaded) test executor
#[tokio::test(flavor = "multi_thread")]
#[should_panic(
    expected = "error reading a body from connection: end of file before message length reached"
)]
async fn test_streaming_response_fails_when_eof_comes_before_content_length_reached() {
    // We spawn a faulty server that will close the connection after
    // writing half of the response body.
    let (server, server_addr) = start_faulty_server().await;
    let _ = tokio::spawn(server);

    let creds = Credentials::new(
        "ANOTREAL",
        "notrealrnrELgWzOk3IfjzDKtFBhDby",
        Some("notarealsessiontoken".to_string()),
        None,
        "test",
    );

    let conf = aws_sdk_s3::Config::builder()
        .credentials_provider(creds)
        .region(Region::new("us-east-1"))
        .endpoint_resolver(Endpoint::immutable(
            format!("http://{server_addr}").parse().expect("valid URI"),
        ))
        .build();

    let client = aws_sdk_s3::client::Client::from_conf(conf);

    // This will succeed b/c the head of the response is fine.
    let res = client
        .get_object()
        .bucket("some-test-bucket")
        .key("test.txt")
        .send()
        .await
        .unwrap();

    // Should panic here when the body is read with an "UnexpectedEof" error
    if let Err(e) = res.body.collect().await {
        panic!("{e}")
    }
}

async fn start_faulty_server() -> (impl Future<Output = ()>, SocketAddr) {
    use tokio::net::{TcpListener, TcpStream};
    use tokio::time::sleep;

    let listener = TcpListener::bind("0.0.0.0:0")
        .await
        .expect("socket is free");
    let bind_addr = listener.local_addr().unwrap();

    async fn process_socket(socket: TcpStream) {
        let mut buf = BytesMut::new();
        let response: &[u8] = br#"HTTP/1.1 200 OK
x-amz-request-id: 4B4NGF0EAWN0GE63
content-length: 12
etag: 3e25960a79dbc69b674cd4ec67a72c62
content-type: application/octet-stream
server: AmazonS3
content-encoding:
last-modified: Tue, 21 Jun 2022 16:29:14 GMT
date: Tue, 21 Jun 2022 16:29:23 GMT
x-amz-id-2: kPl+IVVZAwsN8ePUyQJZ40WD9dzaqtr4eNESArqE68GSKtVvuvCTDe+SxhTT+JTUqXB1HL4OxNM=
accept-ranges: bytes

Hello"#;
        let mut time_to_respond = false;

        loop {
            match socket.try_read_buf(&mut buf) {
                Ok(0) => {
                    unreachable!(
                        "The connection will be closed before this branch is ever reached"
                    );
                }
                Ok(n) => {
                    debug!("read {n} bytes from the socket");
                    if let Ok(s) = std::str::from_utf8(&buf) {
                        debug!("buf currently looks like:\n{s:?}");
                    }

                    // Check for CRLF to see if we've received the entire HTTP request.
                    if buf.ends_with(b"\r\n\r\n") {
                        time_to_respond = true;
                    }
                }
                Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
                    debug!("reading would block, sleeping for 1ms and then trying again");
                    sleep(Duration::from_millis(1)).await;
                }
                Err(e) => {
                    panic!("{e}")
                }
            }

            if socket.writable().await.is_ok() {
                if time_to_respond {
                    // The content length is 12 but we'll only write 5 bytes
                    socket.try_write(&response).unwrap();
                    // We break from the R/W loop after sending a partial response in order to
                    // close the connection early.
                    debug!("faulty server has written partial response, now closing connection");
                    break;
                }
            }
        }
    }

    let fut = async move {
        loop {
            let (socket, addr) = listener
                .accept()
                .await
                .expect("listener can accept new connections");
            debug!("server received new connection from {addr:?}");
            let start = std::time::Instant::now();
            process_socket(socket).await;
            debug!(
                "connection to {addr:?} closed after {:.02?}",
                start.elapsed()
            );
        }
    };

    (fut, bind_addr)
}