Unverified Commit 3b8e0eeb authored by Zelda Hessler's avatar Zelda Hessler Committed by GitHub
Browse files

Add simple request concurrency tests to S3 integration test suite. (#2061)

* add: multi-threaded concurrency test for S3 client
* add: single-threaded concurrency test for S3 client
parent 6eca6a87
Loading
Loading
Loading
Loading
+6 −0
Original line number Diff line number Diff line
@@ -13,14 +13,17 @@ import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency.Compani
import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency.Companion.AsyncStream
import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency.Companion.BytesUtils
import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency.Companion.Criterion
import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency.Companion.FastRand
import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency.Companion.FuturesCore
import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency.Companion.FuturesUtil
import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency.Companion.HdrHistogram
import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency.Companion.Hound
import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency.Companion.SerdeJson
import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency.Companion.Smol
import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency.Companion.TempFile
import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency.Companion.Tokio
import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency.Companion.Tracing
import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency.Companion.TracingAppender
import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency.Companion.TracingSubscriber
import software.amazon.smithy.rust.codegen.core.rustlang.DependencyScope
import software.amazon.smithy.rust.codegen.core.rustlang.Writable
@@ -117,7 +120,10 @@ class S3TestDependencies : LibRsCustomization() {
        writable {
            addDependency(AsyncStd)
            addDependency(BytesUtils)
            addDependency(FastRand)
            addDependency(HdrHistogram)
            addDependency(Smol)
            addDependency(TempFile)
            addDependency(TracingAppender)
        }
}
+5 −1
Original line number Diff line number Diff line
@@ -21,6 +21,9 @@ aws-smithy-types = { path = "../../build/aws-sdk/sdk/aws-smithy-types" }
aws-types = { path = "../../build/aws-sdk/sdk/aws-types" }
bytes = "1"
bytes-utils = "0.1.2"
fastrand = "1.8.0"
futures-util = "0.3.25"
hdrhistogram = "7.5.2"
http = "0.2.3"
http-body = "0.4.5"
hyper = "0.14.12"
@@ -29,4 +32,5 @@ smol = "1.2"
tempfile = "3"
tokio = { version = "1.8.4", features = ["macros", "test-util", "rt-multi-thread"] }
tracing = "0.1.37"
tracing-subscriber = { version = "0.3.15", features = ["env-filter"] }
tracing-appender = "0.2.2"
tracing-subscriber = { version = "0.3.15", features = ["env-filter", "json"] }
+262 −0
Original line number Diff line number Diff line
/*
 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 * SPDX-License-Identifier: Apache-2.0
 */

use std::future::Future;
use std::iter::repeat_with;
use std::net::SocketAddr;
use std::sync::Arc;

use aws_sdk_s3::Client;
use aws_smithy_http::endpoint::Endpoint;
use aws_smithy_types::timeout::TimeoutConfig;
use aws_types::credentials::SharedCredentialsProvider;
use aws_types::region::Region;
use aws_types::{Credentials, SdkConfig};
use bytes::BytesMut;
use futures_util::future;
use hdrhistogram::sync::SyncHistogram;
use hdrhistogram::Histogram;
use tokio::sync::Semaphore;
use tokio::time::{Duration, Instant};
use tracing::debug;

// Total number of `PutObject` requests issued by a single run of `test_concurrency`.
const TASK_COUNT: usize = 10_000;
// Size, in bytes, of the randomly generated body sent with each request.
// Larger requests take longer to send, which means we'll consume more network resources per
// request, which means we can't support as many concurrent connections to S3.
const TASK_PAYLOAD_LENGTH: usize = 10_000;
// Maximum number of requests allowed to be in flight at once (enforced with a semaphore).
// At 130 and above, the dummy-server tests will fail with a `ConnectorError` from `hyper`. I've seen:
// - ConnectorError { kind: Io, source: hyper::Error(Canceled, hyper::Error(Io, Os { code: 54, kind: ConnectionReset, message: "Connection reset by peer" })) }
// - ConnectorError { kind: Io, source: hyper::Error(BodyWrite, Os { code: 32, kind: BrokenPipe, message: "Broken pipe" }) }
// These errors don't necessarily occur when actually running against S3 with concurrency levels
// above 129. You can test it for yourself by running the ignored
// `test_concurrency_on_multi_thread_against_s3` test in this file.
const CONCURRENCY_LIMIT: usize = 129;

#[tokio::test(flavor = "multi_thread")]
async fn test_concurrency_on_multi_thread_against_dummy_server() {
    let (server, server_addr) = start_agreeable_server().await;
    let _ = tokio::spawn(server);
    let sdk_config = SdkConfig::builder()
        .credentials_provider(SharedCredentialsProvider::new(Credentials::new(
            "ANOTREAL",
            "notrealrnrELgWzOk3IfjzDKtFBhDby",
            Some("notarealsessiontoken".to_string()),
            None,
            "test",
        )))
        .region(Region::new("us-east-1"))
        .endpoint_resolver(
            Endpoint::immutable(format!("http://{server_addr}")).expect("valid endpoint"),
        )
        .build();

    test_concurrency(sdk_config).await;
}

#[tokio::test(flavor = "current_thread")]
async fn test_concurrency_on_single_thread_against_dummy_server() {
    let (server, server_addr) = start_agreeable_server().await;
    let _ = tokio::spawn(server);
    let sdk_config = SdkConfig::builder()
        .credentials_provider(SharedCredentialsProvider::new(Credentials::new(
            "ANOTREAL",
            "notrealrnrELgWzOk3IfjzDKtFBhDby",
            Some("notarealsessiontoken".to_string()),
            None,
            "test",
        )))
        .region(Region::new("us-east-1"))
        .endpoint_resolver(
            Endpoint::immutable(format!("http://{server_addr}")).expect("valid endpoint"),
        )
        .build();

    test_concurrency(sdk_config).await;
}

/// Runs the concurrency scenario against the real S3 service on the multi-threaded runtime.
///
/// Configuration (credentials, region, ...) is loaded from the environment; only the connect
/// and read timeouts are overridden, each to 30 seconds.
#[ignore = "this test runs against S3 and requires credentials"]
#[tokio::test(flavor = "multi_thread")]
async fn test_concurrency_on_multi_thread_against_s3() {
    let thirty_seconds = Duration::from_secs(30);
    let timeouts = TimeoutConfig::builder()
        .connect_timeout(thirty_seconds)
        .read_timeout(thirty_seconds)
        .build();

    let config_loader = aws_config::from_env().timeout_config(timeouts);
    let sdk_config = config_loader.load().await;

    test_concurrency(sdk_config).await;
}

/// Phase of the dummy server's per-connection handler.
#[derive(Clone, Copy)]
enum State {
    /// Still reading the incoming request from the socket.
    Listening,
    /// The full request has been received; the canned response is being written.
    Speaking,
}

/// Starts a minimal HTTP/1.1 server used as a stand-in for S3.
///
/// This server is agreeable because it always replies with `OK`: once a complete request
/// (headers plus a body matching its `content-length`) has been read, it writes an empty
/// `200 OK` response and closes the connection.
///
/// Returns the accept loop as a future, which does nothing until the caller spawns or polls it,
/// together with the socket address the listener is bound to.
async fn start_agreeable_server() -> (impl Future<Output = ()>, SocketAddr) {
    use tokio::net::{TcpListener, TcpStream};

    // Bind to the loopback interface only: the returned address is used verbatim as the endpoint
    // to connect to, and a test server has no reason to listen on every network interface.
    let listener = TcpListener::bind("127.0.0.1:0")
        .await
        .expect("socket is free");
    let bind_addr = listener.local_addr().unwrap();

    /// Reads one request from `tcp_stream` and answers it with an empty `200 OK`.
    async fn handle_tcp_stream(tcp_stream: TcpStream) {
        let mut buf = BytesMut::new();
        let mut state = State::Listening;

        let response: &[u8] = b"HTTP/1.1 200 OK\r\n\r\n";
        let mut bytes_written = 0;

        loop {
            match state {
                State::Listening => {
                    // Wait for readiness instead of polling the socket on a timer.
                    tcp_stream
                        .readable()
                        .await
                        .expect("stream can be polled for read readiness");
                    match tcp_stream.try_read_buf(&mut buf) {
                        // EOF: the peer hung up before sending a complete request. Without this
                        // arm the loop would spin forever without ever yielding to the runtime.
                        Ok(0) => return,
                        Ok(_) => {
                            // The request is complete once the body we've received so far is as
                            // long as the `content-length` header says it should be.
                            let s = String::from_utf8_lossy(&buf);
                            if let Some(content_length) = discern_content_length(&s) {
                                if let Some(body_length) = discern_body_length(&s) {
                                    if body_length == content_length {
                                        state = State::Speaking;
                                    }
                                }
                            }
                        }
                        Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
                            // The readiness event was a false positive; wait for the next one.
                            continue;
                        }
                        Err(err) => {
                            panic!("{}", err)
                        }
                    }
                }
                State::Speaking => {
                    tcp_stream
                        .writable()
                        .await
                        .expect("stream can be polled for write readiness");
                    // Only send what hasn't been written yet, so that a partial write is
                    // continued rather than restarted from the beginning of the response.
                    match tcp_stream.try_write(&response[bytes_written..]) {
                        Ok(n) => {
                            bytes_written += n;
                            if bytes_written == response.len() {
                                break;
                            }
                        }
                        Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
                            // The readiness event was a false positive; wait for the next one.
                            continue;
                        }
                        Err(err) => {
                            panic!("{}", err)
                        }
                    }
                }
            }
        }
    }

    let fut = async move {
        loop {
            let (tcp_stream, _addr) = listener
                .accept()
                .await
                .expect("listener can accept new connections");
            // Connections are served one at a time: the next `accept` only happens after the
            // current handler has returned.
            handle_tcp_stream(tcp_stream).await;
        }
    };

    (fut, bind_addr)
}

/// Extracts the value of the `content-length` header from a (possibly partial) HTTP/1.1 request.
///
/// Only the header section (everything before the first blank line) is inspected, the header
/// name is matched case-insensitively and in full, and whitespace around the value is ignored.
///
/// Returns `None` when no `content-length` header is present or its value isn't a valid `usize`.
fn discern_content_length(s: &str) -> Option<usize> {
    // Never look for headers inside the body; `split` always yields at least one item.
    let head = s.split("\r\n\r\n").next().unwrap_or(s);

    head.split("\r\n")
        // break each header line into its name and value; the request line has no colon
        .filter_map(|line| line.split_once(':'))
        // match the whole header name so that e.g. `x-amz-decoded-content-length` is skipped
        .find(|(name, _)| name.trim().eq_ignore_ascii_case("content-length"))
        // attempt to parse the numeric part of the header as a usize
        .and_then(|(_, value)| value.trim().parse().ok())
}

/// Returns how many bytes of body have been received so far for an HTTP/1.1 request.
///
/// The body is everything after the first double CRLF, which terminates the header section.
/// It is counted verbatim: a body may itself contain CRLFs or end in whitespace, and those
/// bytes are part of the length announced by `content-length`.
///
/// Returns `None` if the request doesn't have a double CRLF yet, i.e. the headers haven't been
/// fully read.
fn discern_body_length(s: &str) -> Option<usize> {
    s.split_once("\r\n\r\n").map(|(_headers, body)| body.len())
}

/// Issues `TASK_COUNT` `PutObject` requests through a client built from `sdk_config`, keeping at
/// most `CONCURRENCY_LIMIT` of them in flight at once, and logs the observed request latencies.
///
/// Panics if any request fails.
async fn test_concurrency(sdk_config: SdkConfig) {
    let client = Client::new(&sdk_config);

    // Latencies are recorded in nanoseconds; the histogram is bounded at one hour.
    let max_latency_nanos = Duration::from_secs(60 * 60).as_nanos() as u64;
    let histogram = Histogram::new_with_bounds(1, max_latency_nanos, 3)
        .unwrap()
        .into_sync();

    debug!("creating futures");
    // Every task must obtain a permit before sending, which caps the number of requests that
    // are in flight at any moment at <CONCURRENCY_LIMIT>.
    let semaphore = Arc::new(Semaphore::new(CONCURRENCY_LIMIT));
    let futures = (0..TASK_COUNT).map(|task_index| {
        let task_client = client.clone();
        let object_key = format!("concurrency/test_object_{:05}", task_index);
        let payload: Vec<_> = repeat_with(fastrand::alphanumeric)
            .take(TASK_PAYLOAD_LENGTH)
            .map(|c| c as u8)
            .collect();
        let request = task_client
            .put_object()
            .bucket("your-test-bucket-here")
            .key(object_key)
            .body(payload.into())
            .send();
        // each task owns its own handle to the semaphore and its own recorder
        let task_semaphore = semaphore.clone();
        let mut recorder = histogram.recorder();

        // The request is only awaited once a permit has been granted. Without this throttle
        // we'd quickly get rate-limited by S3.
        async move {
            let permit = task_semaphore
                .acquire()
                .await
                .expect("we'll get one if we wait long enough");
            let started_at = Instant::now();
            let output = request.await.expect("request should succeed");
            let elapsed_nanos = started_at.elapsed().as_nanos() as u64;
            recorder.saturating_record(elapsed_nanos);
            drop(permit);
            output
        }
    });

    debug!("joining futures");
    let outputs: Vec<_> = future::join_all(futures).await;
    // Assert we ran all the tasks
    assert_eq!(TASK_COUNT, outputs.len());

    let nanos_per_second = Duration::from_secs(1).as_nanos() as f64;
    display_metrics("Request Latency", histogram, "s", nanos_per_second);
}

/// Logs a summary (mean, p50, p90, p99 and max) of histogram `h` at debug level.
///
/// `name` labels the output, every value is divided by `scale` before being printed, and
/// `unit` is appended to each printed value.
fn display_metrics(name: &str, mut h: SyncHistogram<u64>, unit: &str, scale: f64) {
    // Refreshing is required or else we won't see any results at all
    h.refresh();

    let sample_count = h.len();
    debug!("displaying {} results from {name} histogram", sample_count);

    let mean = h.mean() / scale;
    let p50 = h.value_at_quantile(0.5) as f64 / scale;
    let p90 = h.value_at_quantile(0.9) as f64 / scale;
    let p99 = h.value_at_quantile(0.99) as f64 / scale;
    let max = h.max() as f64 / scale;

    debug!(
        "{name}\n\
        \tmean:\t{:.1}{unit},\n\
        \tp50:\t{:.1}{unit},\n\
        \tp90:\t{:.1}{unit},\n\
        \tp99:\t{:.1}{unit},\n\
        \tmax:\t{:.1}{unit}",
        mean,
        p50,
        p90,
        p99,
        max,
    );
}
+12 −6
Original line number Diff line number Diff line
@@ -191,7 +191,7 @@ data class CargoDependency(
        val Url: CargoDependency = CargoDependency("url", CratesIo("2.3.1"))
        val Bytes: CargoDependency = CargoDependency("bytes", CratesIo("1.0.0"))
        val BytesUtils: CargoDependency = CargoDependency("bytes-utils", CratesIo("0.1.0"))
        val FastRand: CargoDependency = CargoDependency("fastrand", CratesIo("1.0.0"))
        val FastRand: CargoDependency = CargoDependency("fastrand", CratesIo("1.8.0"))
        val Hex: CargoDependency = CargoDependency("hex", CratesIo("0.4.3"))
        val Http: CargoDependency = CargoDependency("http", CratesIo("0.2.0"))
        val HttpBody: CargoDependency = CargoDependency("http-body", CratesIo("0.4.4"))
@@ -210,21 +210,27 @@ data class CargoDependency(
        val AsyncStd: CargoDependency = CargoDependency("async-std", CratesIo("1.12.0"), DependencyScope.Dev)
        val AsyncStream: CargoDependency = CargoDependency("async-stream", CratesIo("0.3.0"), DependencyScope.Dev)
        val Criterion: CargoDependency = CargoDependency("criterion", CratesIo("0.4.0"), DependencyScope.Dev)
        val FuturesCore: CargoDependency = CargoDependency("futures-core", CratesIo("0.3.0"), DependencyScope.Dev)
        val FuturesUtil: CargoDependency = CargoDependency("futures-util", CratesIo("0.3.0"), DependencyScope.Dev)
        val FuturesCore: CargoDependency = CargoDependency("futures-core", CratesIo("0.3.25"), DependencyScope.Dev)
        val FuturesUtil: CargoDependency = CargoDependency("futures-util", CratesIo("0.3.25"), DependencyScope.Dev)
        val HdrHistogram: CargoDependency = CargoDependency("hdrhistogram", CratesIo("7.5.2"), DependencyScope.Dev)
        val Hound: CargoDependency = CargoDependency("hound", CratesIo("3.4.0"), DependencyScope.Dev)
        val PrettyAssertions: CargoDependency =
            CargoDependency("pretty_assertions", CratesIo("1.0.0"), DependencyScope.Dev)
            CargoDependency("pretty_assertions", CratesIo("1.3.0"), DependencyScope.Dev)
        val SerdeJson: CargoDependency = CargoDependency("serde_json", CratesIo("1.0.0"), DependencyScope.Dev)
        val Smol: CargoDependency = CargoDependency("smol", CratesIo("1.2.0"), DependencyScope.Dev)
        val TempFile: CargoDependency = CargoDependency("tempfile", CratesIo("3.2.0"), DependencyScope.Dev)
        val Tokio: CargoDependency =
            CargoDependency("tokio", CratesIo("1.8.4"), DependencyScope.Dev, features = setOf("macros", "test-util", "rt-multi-thread"))
        val TracingAppender: CargoDependency = CargoDependency(
            "tracing-appender",
            CratesIo("0.2.2"),
            DependencyScope.Dev,
        )
        val TracingSubscriber: CargoDependency = CargoDependency(
            "tracing-subscriber",
            CratesIo("0.3.15"),
            CratesIo("0.3.16"),
            DependencyScope.Dev,
            features = setOf("env-filter"),
            features = setOf("env-filter", "json"),
        )

        fun smithyAsync(runtimeConfig: RuntimeConfig) = runtimeConfig.runtimeCrate("async")