Unverified Commit 18738745 authored by ysaito1001's avatar ysaito1001 Committed by GitHub
Browse files

Add integration test for stall stream protection based on aws-sdk-rust#1202 (#3874)

## Motivation and Context
A follow-up on #3871, responding to [the review
feedback](https://github.com/smithy-lang/smithy-rs/pull/3871#pullrequestreview-2357786934)

## Testing
- Also confirmed that reverting the change in the above PR (so that
`BinLabel::Pending` becomes the top of the list) failed the integration
test added to this PR, as expected.
```
2024-10-10T19:06:56.417686Z TRACE aws_smithy_runtime::client::http::body::minimum_throughput::http_body_0_4_x: received poll pending
2024-10-10T19:06:56.417694Z DEBUG aws_smithy_runtime::client::http::body::minimum_throughput::http_body_0_4_x: current throughput: 0 B/s is below minimum: 1 B/s
thread 'user_polls_pending_followed_by_data_for_every_bin_in_throughput_logs' panicked at aws-smithy-runtime/tests/stalled_stream_download.rs:252:10:
response MUST NOT timeout: ThroughputBelowMinimum { expected: Throughput { bytes_read: 1, per_time_elapsed: 1s }, actual: Throughput { bytes_read: 0, per_time_elapsed: 1s } }
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace


failures:
    user_polls_pending_followed_by_data_for_every_bin_in_throughput_logs
```

----

_By submitting this pull request, I confirm that you can use, modify,
copy, and redistribute this contribution, under the terms of your
choice._
parent e6b154b0
Loading
Loading
Loading
Loading
+3 −3
Original line number Diff line number Diff line
@@ -410,7 +410,7 @@ name = "aws-smithy-experimental"
version = "0.1.4"
dependencies = [
 "aws-smithy-async 1.2.1",
 "aws-smithy-runtime 1.7.2",
 "aws-smithy-runtime 1.7.3",
 "aws-smithy-runtime-api 1.7.2",
 "aws-smithy-types 1.2.7",
 "h2 0.4.6",
@@ -655,7 +655,7 @@ dependencies = [

[[package]]
name = "aws-smithy-runtime"
version = "1.7.2"
version = "1.7.3"
dependencies = [
 "approx",
 "aws-smithy-async 1.2.1",
@@ -1991,7 +1991,7 @@ dependencies = [
 "aws-smithy-compression",
 "aws-smithy-http 0.60.11",
 "aws-smithy-json 0.60.7",
 "aws-smithy-runtime 1.7.2",
 "aws-smithy-runtime 1.7.3",
 "aws-smithy-runtime-api 1.7.2",
 "aws-smithy-types 1.2.7",
 "aws-smithy-xml 0.60.9",
+1 −1
Original line number Diff line number Diff line
[package]
name = "aws-smithy-runtime"
version = "1.7.2"
version = "1.7.3"
authors = ["AWS Rust SDK Team <aws-sdk-rust@amazon.com>", "Zelda Hessler <zhessler@amazon.com>"]
description = "The new smithy runtime crate"
edition = "2021"
+76 −1
Original line number Diff line number Diff line
@@ -6,6 +6,8 @@
#![cfg(all(feature = "client", feature = "test-util"))]

use std::time::Duration;
use tokio::sync::mpsc::channel;
use tokio::sync::Barrier;

#[macro_use]
mod stalled_stream_common;
@@ -194,10 +196,66 @@ async fn user_downloads_data_too_slowly() {
    result.expect("response MUST NOT timeout");
}

/// Scenario: Derived from the reproduction steps in https://github.com/awslabs/aws-sdk-rust/issues/1202.
/// Expected: MUST NOT timeout.
#[tokio::test]
async fn user_polls_pending_followed_by_data_for_every_bin_in_throughput_logs() {
    let _logs = show_test_logs();

    let (time, sleep) = tick_advance_time_and_sleep();
    let (server, response_sender) = channel_server();
    let op = operation(server, time.clone(), sleep);

    let (tx_server, mut rx_server) = channel(1);
    let (tx_client, rx_client) = channel(1);

    let server = tokio::spawn(async move {
        for _ in 1..100 {
            // Block until a signal has been received
            let _ = rx_server.recv().await;
            if response_sender.send(NEAT_DATA).await.is_err() {
                // The client has shut down due to a minimum throughput detection error
                break;
            }
        }
        drop(response_sender);
    });

    let _ticker = tokio::spawn({
        async move {
            // Each `Bin` has a time resolution of 100ms. In every iteration, the client will go first, yielding
            // a `Poll::Pending` in the first half of the allotted time. The server will then take its turn in the
            // second half to generate data, allowing the client to yield a `Poll::Ready` immediately after.
            // This creates a consistent pattern in throughput logs: within each 100ms interval, a newly created `Bin`
            // will be assigned a `BinLabel::Pending`, followed by an attempt to assign `BinLabel::TransferredBytes` to
            // the same `Bin`.
            loop {
                tick!(time, Duration::from_millis(50));
                // We don't `unwrap` here since it will eventually fail when the client shuts down due to the minimum
                // throughput detection error.
                let _ = tx_client.send(()).await;
                tick!(time, Duration::from_millis(50));
                // We don't `unwrap` here since it will eventually fail when the server exits due to the client shutting
                // down due to a minimum throughput detection error.
                let _ = tx_server.send(()).await;
            }
        }
    });

    let response_body = op.invoke(()).await.expect("initial success");
    let result = tokio::spawn(consume_on_signal(rx_client, response_body));
    server.await.unwrap();

    result
        .await
        .expect("no panics")
        .expect("response MUST NOT timeout");
}

use download_test_tools::*;
use tokio::sync::Barrier;
mod download_test_tools {
    use crate::stalled_stream_common::*;
    use tokio::sync::mpsc::Receiver;

    fn response(body: SdkBody) -> HttpResponse {
        HttpResponse::try_from(
@@ -307,4 +365,21 @@ mod download_test_tools {
        }
        Ok(())
    }

    /// A client that allows us to control when data is consumed by sending a signal to `rx`.
    pub async fn consume_on_signal(mut rx: Receiver<()>, body: SdkBody) -> Result<(), BoxError> {
        // Wait to start polling until a signal has been received
        let _ = rx.recv().await;
        pin_mut!(body);
        while let Some(result) = poll_fn(|cx| body.as_mut().poll_data(cx)).await {
            if let Err(err) = result {
                return Err(err);
            } else {
                info!("consumed bytes from the response body");
                // Block until a signal has been received
                let _ = rx.recv().await;
            }
        }
        Ok(())
    }
}