Unverified Commit 1e803494 authored by ysaito1001's avatar ysaito1001 Committed by GitHub
Browse files

Make `TransferredBytes` be the top of the list in `BinLabel` (#3871)

## Motivation and Context
https://github.com/awslabs/aws-sdk-rust/issues/1202

## Description
The issue above demonstrated the incorrect
[BinLabel](https://github.com/smithy-lang/smithy-rs/blob/07fe426697cc30ad613568902528c305f953deb1/rust-runtime/aws-smithy-runtime/src/client/http/body/minimum_throughput/throughput.rs#L100-L119)
ordering in
[LogBuffer](https://github.com/smithy-lang/smithy-rs/blob/07fe426697cc30ad613568902528c305f953deb1/rust-runtime/aws-smithy-runtime/src/client/http/body/minimum_throughput/throughput.rs#L173-L183),
the underlying data structure we use for stall stream protection.

The following trace logs are generated from executing the reproduction
steps in the issue above. In the file labeled "no_sleep," we have
commented out
`std::thread::sleep(std::time::Duration::from_millis(120));` from the
reproducer so the updated code can be tested as the happy path.


[s3_throughput_min_repro_no_sleep.log](https://github.com/user-attachments/files/17299373/s3_throughput_min_repro_no_sleep.log)


[s3_throughput_min_repro_with_sleep.log](https://github.com/user-attachments/files/17299447/s3_throughput_min_repro_with_sleep.log)

In both files, it’s important to note that `Bin`s assigned
`TransferredBytes` can be overwritten by `Pending` due to
[`ThroughputLogs::push`](https://github.com/smithy-lang/smithy-rs/blob/07fe426697cc30ad613568902528c305f953deb1/rust-runtime/aws-smithy-runtime/src/client/http/body/minimum_throughput/throughput.rs#L346).
Once a `Bin` is labeled as `Pending`, it cannot be re-labeled.

When this occurs, the only way to avoid the stall stream protection
check going into the grace period is for time to advance beyond the
current `Bin`'s resolution, the `LogBuffer` pushes a new `Bin` during
[`catch_up`](https://github.com/smithy-lang/smithy-rs/blob/07fe426697cc30ad613568902528c305f953deb1/rust-runtime/aws-smithy-runtime/src/client/http/body/minimum_throughput/throughput.rs#L355)
, and this new `Bin` hopefully [gets
assigned](https://github.com/smithy-lang/smithy-rs/blob/07fe426697cc30ad613568902528c305f953deb1/rust-runtime/aws-smithy-runtime/src/client/http/body/minimum_throughput/http_body_0_4_x.rs#L78-L79)
a `TransferredBytes`. However, this new `Bin` could also be overwritten
by Pending in a subsequent call to
[`MinimumThroughputDownloadBody::poll_data`](https://github.com/smithy-lang/smithy-rs/blob/07fe426697cc30ad613568902528c305f953deb1/rust-runtime/aws-smithy-runtime/src/client/http/body/minimum_throughput/http_body_0_4_x.rs#L78-L79),
which can trigger the the grace period if the overall `LogBuffer` looks
like it's violated the stall stream protection check.

The reproducer without sleep does not fail the stall stream protection
obviously because the execution completes way before the grace period
ends, but more importantly because the execution periodically assigns
new `TransferredBytes` `Bin`s in the throughput logs. This effectively
resets the grace period for the stall stream protection (search for
`throughput recovered; exiting grace period` in the
`s3_throughput_min_repro_no_sleep.log`). However, with sleep, `Bin`s
labeled as `TransferredBytes` are frequently (and almost immediately)
overwritten by `Pending`. This results in the execution being unable to
exit the grace period, ultimately leading to a stall stream protection
error.

To resolve this, we make `TransferredBytes` be the top priority in
`BinLabel`. This means once a new `Bin` has earned `TransferredBytes`,
it's green for that time resolution and that it should not be revoked by
`Pending` overwriting it to make it look like no bytes transferred
during that time.

## Testing
- Existing tests in CI
- Added unit tests for `BinLabel` ordering and for `ThroughputLogs`
- Passed the customer's reproduction step
- To confirm the stall stream protection for download still works, I
switched off WiFi while running the customer's reproducer (with sleep)
and it successfully failed with the stall stream protection error:
```
---- s3_throughput_min_repro stdout ----
2024-10-08T23:29:24.999477Z DEBUG aws_smithy_runtime::client::http::body::minimum_throughput::http_body_0_4_x: current throughput: 0 B/s is below minimum: 1 B/s
2024-10-08T23:29:24.999513Z TRACE aws_smithy_runtime::client::http::body::minimum_throughput::http_body_0_4_x: received poll pending
2024-10-08T23:29:24.999530Z DEBUG aws_smithy_runtime::client::http::body::minimum_throughput::http_body_0_4_x: current throughput: 0 B/s is below minimum: 1 B/s
2024-10-08T23:29:25.081811Z TRACE aws_smithy_runtime::client::http::body::minimum_throughput::http_body_0_4_x: received poll pending
2024-10-08T23:29:25.081938Z DEBUG aws_smithy_runtime::client::http::body::minimum_throughput::http_body_0_4_x: current throughput: 0 B/s is below minimum: 1 B/s
test s3_throughput_min_repro ... FAILED
...
called `Result::unwrap()` on an `Err` value: Custom { kind: Other, error: Error { kind: StreamingError(ThroughputBelowMinimum { expected: Throughput { bytes_read: 1, per_time_elapsed: 1s }, actual: Throughput { bytes_read: 0, per_time_elapsed: 1s } }) } }
```

## Checklist
<!--- If a checkbox below is not applicable, then please DELETE it
rather than leaving it unchecked -->
- [x] For changes to the smithy-rs codegen or runtime crates, I have
created a changelog entry Markdown file in the `.changelog` directory,
specifying "client," "server," or both in the `applies_to` key.
- [x] For changes to the AWS SDK, generated SDK code, or SDK runtime
crates, I have created a changelog entry Markdown file in the
`.changelog` directory, specifying "aws-sdk-rust" in the `applies_to`
key.

----

_By submitting this pull request, I confirm that you can use, modify,
copy, and redistribute this contribution, under the terms of your
choice._
parent 07fe4266
Loading
Loading
Loading
Loading
+14 −0
Original line number Diff line number Diff line
---
applies_to:
- client
- aws-sdk-rust
authors:
- ysaito1001
references:
- smithy-rs#3871
- aws-sdk-rust#1202
breaking: false
new_feature: false
bug_fix: true
---
Fix minimum throughput detection for downloads to avoid incorrectly raising an error while the user is consuming data at a slow but steady pace.
+28 −28
Original line number Diff line number Diff line
@@ -203,7 +203,7 @@ dependencies = [
 "aws-smithy-async 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
 "aws-smithy-eventstream 0.60.5 (registry+https://github.com/rust-lang/crates.io-index)",
 "aws-smithy-http 0.60.11 (registry+https://github.com/rust-lang/crates.io-index)",
 "aws-smithy-runtime 1.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
 "aws-smithy-runtime 1.7.1",
 "aws-smithy-runtime-api 1.7.2 (registry+https://github.com/rust-lang/crates.io-index)",
 "aws-smithy-types 1.2.7 (registry+https://github.com/rust-lang/crates.io-index)",
 "aws-types",
@@ -233,7 +233,7 @@ dependencies = [
 "aws-smithy-eventstream 0.60.5 (registry+https://github.com/rust-lang/crates.io-index)",
 "aws-smithy-http 0.60.11 (registry+https://github.com/rust-lang/crates.io-index)",
 "aws-smithy-json 0.60.7 (registry+https://github.com/rust-lang/crates.io-index)",
 "aws-smithy-runtime 1.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
 "aws-smithy-runtime 1.7.1",
 "aws-smithy-runtime-api 1.7.2 (registry+https://github.com/rust-lang/crates.io-index)",
 "aws-smithy-types 1.2.7 (registry+https://github.com/rust-lang/crates.io-index)",
 "aws-smithy-xml 0.60.9 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -410,7 +410,7 @@ name = "aws-smithy-experimental"
version = "0.1.4"
dependencies = [
 "aws-smithy-async 1.2.1",
 "aws-smithy-runtime 1.7.1",
 "aws-smithy-runtime 1.7.2",
 "aws-smithy-runtime-api 1.7.2",
 "aws-smithy-types 1.2.7",
 "h2 0.4.6",
@@ -580,9 +580,11 @@ dependencies = [
[[package]]
name = "aws-smithy-protocol-test"
version = "0.62.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "495c940cd5c7232ac3f0945ff559096deadd2fc73e4418a0e98fe5836788bb39"
dependencies = [
 "assert-json-diff",
 "aws-smithy-runtime-api 1.7.2",
 "aws-smithy-runtime-api 1.7.2 (registry+https://github.com/rust-lang/crates.io-index)",
 "base64-simd",
 "cbor-diag",
 "http 0.2.12",
@@ -596,19 +598,17 @@ dependencies = [

[[package]]
name = "aws-smithy-protocol-test"
version = "0.62.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "495c940cd5c7232ac3f0945ff559096deadd2fc73e4418a0e98fe5836788bb39"
version = "0.63.0"
dependencies = [
 "assert-json-diff",
 "aws-smithy-runtime-api 1.7.2 (registry+https://github.com/rust-lang/crates.io-index)",
 "aws-smithy-runtime-api 1.7.2",
 "base64-simd",
 "cbor-diag",
 "ciborium",
 "http 0.2.12",
 "pretty_assertions",
 "regex-lite",
 "roxmltree",
 "serde_cbor",
 "serde_json",
 "thiserror",
]
@@ -624,19 +624,18 @@ dependencies = [
[[package]]
name = "aws-smithy-runtime"
version = "1.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d1ce695746394772e7000b39fe073095db6d45a862d0767dd5ad0ac0d7f8eb87"
dependencies = [
 "approx",
 "aws-smithy-async 1.2.1",
 "aws-smithy-http 0.60.11",
 "aws-smithy-async 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
 "aws-smithy-http 0.60.11 (registry+https://github.com/rust-lang/crates.io-index)",
 "aws-smithy-protocol-test 0.62.0",
 "aws-smithy-runtime-api 1.7.2",
 "aws-smithy-types 1.2.7",
 "aws-smithy-runtime-api 1.7.2 (registry+https://github.com/rust-lang/crates.io-index)",
 "aws-smithy-types 1.2.7 (registry+https://github.com/rust-lang/crates.io-index)",
 "bytes",
 "fastrand",
 "futures-util",
 "h2 0.3.26",
 "http 0.2.12",
 "http 1.1.0",
 "http-body 0.4.6",
 "http-body 1.0.1",
 "httparse",
@@ -646,31 +645,30 @@ dependencies = [
 "once_cell",
 "pin-project-lite",
 "pin-utils",
 "pretty_assertions",
 "rustls 0.21.12",
 "serde",
 "serde_json",
 "tokio",
 "tracing",
 "tracing-subscriber",
 "tracing-test",
]

[[package]]
name = "aws-smithy-runtime"
version = "1.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d1ce695746394772e7000b39fe073095db6d45a862d0767dd5ad0ac0d7f8eb87"
version = "1.7.2"
dependencies = [
 "aws-smithy-async 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
 "aws-smithy-http 0.60.11 (registry+https://github.com/rust-lang/crates.io-index)",
 "aws-smithy-protocol-test 0.62.0 (registry+https://github.com/rust-lang/crates.io-index)",
 "aws-smithy-runtime-api 1.7.2 (registry+https://github.com/rust-lang/crates.io-index)",
 "aws-smithy-types 1.2.7 (registry+https://github.com/rust-lang/crates.io-index)",
 "approx",
 "aws-smithy-async 1.2.1",
 "aws-smithy-http 0.60.11",
 "aws-smithy-protocol-test 0.63.0",
 "aws-smithy-runtime-api 1.7.2",
 "aws-smithy-types 1.2.7",
 "bytes",
 "fastrand",
 "futures-util",
 "h2 0.3.26",
 "http 0.2.12",
 "http 1.1.0",
 "http-body 0.4.6",
 "http-body 1.0.1",
 "httparse",
@@ -680,12 +678,14 @@ dependencies = [
 "once_cell",
 "pin-project-lite",
 "pin-utils",
 "pretty_assertions",
 "rustls 0.21.12",
 "serde",
 "serde_json",
 "tokio",
 "tracing",
 "tracing-subscriber",
 "tracing-test",
]

[[package]]
@@ -809,7 +809,7 @@ dependencies = [
name = "aws-smithy-xml"
version = "0.60.9"
dependencies = [
 "aws-smithy-protocol-test 0.62.0",
 "aws-smithy-protocol-test 0.63.0",
 "base64 0.13.1",
 "proptest",
 "xmlparser",
@@ -1991,7 +1991,7 @@ dependencies = [
 "aws-smithy-compression",
 "aws-smithy-http 0.60.11",
 "aws-smithy-json 0.60.7",
 "aws-smithy-runtime 1.7.1",
 "aws-smithy-runtime 1.7.2",
 "aws-smithy-runtime-api 1.7.2",
 "aws-smithy-types 1.2.7",
 "aws-smithy-xml 0.60.9",
+1 −1
Original line number Diff line number Diff line
[package]
name = "aws-smithy-runtime"
version = "1.7.1"
version = "1.7.2"
authors = ["AWS Rust SDK Team <aws-sdk-rust@amazon.com>", "Zelda Hessler <zhessler@amazon.com>"]
description = "The new smithy runtime crate"
edition = "2021"
+38 −9
Original line number Diff line number Diff line
@@ -101,7 +101,7 @@ impl From<(u64, Duration)> for Throughput {
#[derive(Copy, Clone, Debug, Ord, PartialOrd, Eq, PartialEq)]
enum BinLabel {
    // IMPORTANT: The order of these enums matters since it represents their priority:
    // Pending > TransferredBytes > NoPolling > Empty
    // TransferredBytes > Pending > NoPolling > Empty
    //
    /// There is no data in this bin.
    Empty,
@@ -109,13 +109,11 @@ enum BinLabel {
    /// No polling took place during this bin.
    NoPolling,

    /// This many bytes were transferred during this bin.
    TransferredBytes,

    /// The user/remote was not providing/consuming data fast enough during this bin.
    ///
    /// The number is the number of bytes transferred, if this replaced TransferredBytes.
    Pending,

    /// This many bytes were transferred during this bin.
    TransferredBytes,
}

/// Represents a bin (or a cell) in a linear grid that represents a small chunk of time.
@@ -139,8 +137,8 @@ impl Bin {

    fn merge(&mut self, other: Bin) -> &mut Self {
        // Assign values based on this priority order (highest priority higher up):
        //   1. Pending
        //   2. TransferredBytes
        //   1. TransferredBytes
        //   2. Pending
        //   3. NoPolling
        //   4. Empty
        self.label = if other.label > self.label {
@@ -410,6 +408,14 @@ mod test {
    use super::*;
    use std::time::Duration;

    #[test]
    fn test_log_buffer_bin_label_priority() {
        use BinLabel::*;
        assert!(Empty < NoPolling);
        assert!(NoPolling < Pending);
        assert!(Pending < TransferredBytes);
    }

    #[test]
    fn test_throughput_eq() {
        let t1 = Throughput::new(1, Duration::from_secs(1));
@@ -521,7 +527,7 @@ mod test {
        assert_eq!(ThroughputReport::NoPolling, report);
    }

    // Transferred bytes MUST take priority over pending
    // Transferred bytes MUST take priority over pending when reporting throughput
    #[test]
    fn mixed_bag_mostly_pending() {
        let start = SystemTime::UNIX_EPOCH;
@@ -571,4 +577,27 @@ mod test {

        tl.push_pending(t0);
    }

    #[test]
    fn test_label_transferred_bytes_should_not_be_overwritten_by_pending() {
        let start = SystemTime::UNIX_EPOCH;
        // Each `Bin`'s resolution is 100ms (1s / BIN_COUNT), where `BIN_COUNT` is 10
        let mut logs = ThroughputLogs::new(Duration::from_secs(1), start);

        // push `TransferredBytes` and then `Pending` in the same first `Bin`
        logs.push_bytes_transferred(start + Duration::from_millis(10), 10);
        logs.push_pending(start + Duration::from_millis(20));

        let BinCounts {
            empty,
            no_polling,
            transferred,
            pending,
        } = logs.buffer.counts();

        assert_eq!(9, empty);
        assert_eq!(0, no_polling);
        assert_eq!(1, transferred); // `transferred` should still be there
        assert_eq!(0, pending); // while `pending` should cease to exist, failing to overwrite `transferred`
    }
}