Unverified Commit a0801ece authored by Aaron Todd's avatar Aaron Todd Committed by GitHub
Browse files

re-use http checksums on retry attempts (#4200)

## Description
A change was needed for the S3 Flexible Checksum SEP to add guidance for
reusing checksums during retry attempts to prevent data corruption. When
a request fails after checksum calculation is complete, SDKs must save
and reuse the checksum for retry attempts rather than recalculating it.
This prevents inconsistencies when payload content might change between
retries, ensuring data durability in S3.

* Adds a simple cache to the checksum crate that will favor cached
checksums from prior attempts if set
* Adds new integration tests to verify retry behavior and re-use of
checksums

NOTE: If a user were to actually replace file contents between attempts
with a different content length the Rust SDK uses the original content
length set on the ByteStream. This happens early when we create the
bytestream by taking the user provided content length OR calculating it
from the file. We don't ever attempt to recalculate this and I see no
great way of doing this. The result is a client side failure about
stream length mismatch as opposed to sending the request to the server
with the original checksum.

## Checklist
<!--- If a checkbox below is not applicable, then please DELETE it
rather than leaving it unchecked -->
- [x] For changes to the AWS SDK, generated SDK code, or SDK runtime
crates, I have created a changelog entry Markdown file in the
`.changelog` directory, specifying "aws-sdk-rust" in the `applies_to`
key.

----

_By submitting this pull request, I confirm that you can use, modify,
copy, and redistribute this contribution, under the terms of your
choice._
parent fd9ab82b
Loading
Loading
Loading
Loading
+12 −0
Original line number Diff line number Diff line
---
applies_to:
- client
- aws-sdk-rust
authors:
- aajtodd
references: []
breaking: false
new_feature: false
bug_fix: false
---
re-use checksums on retry attempts for enhanced durability
+2 −2
Original line number Diff line number Diff line
@@ -231,7 +231,7 @@ dependencies = [

[[package]]
name = "aws-smithy-checksums"
version = "0.63.4"
version = "0.63.5"
dependencies = [
 "aws-smithy-http",
 "aws-smithy-types",
@@ -341,7 +341,7 @@ dependencies = [

[[package]]
name = "aws-smithy-runtime-api"
version = "1.8.2"
version = "1.8.3"
dependencies = [
 "aws-smithy-async",
 "aws-smithy-types",
+5 −5
Original line number Diff line number Diff line
@@ -50,7 +50,7 @@ checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26"

[[package]]
name = "aws-config"
version = "1.8.1"
version = "1.8.2"
dependencies = [
 "aws-credential-types",
 "aws-runtime",
@@ -117,7 +117,7 @@ dependencies = [

[[package]]
name = "aws-runtime"
version = "1.6.0"
version = "1.5.9"
dependencies = [
 "aws-credential-types",
 "aws-sigv4",
@@ -230,7 +230,7 @@ dependencies = [

[[package]]
name = "aws-smithy-http"
version = "0.62.2"
version = "0.62.1"
dependencies = [
 "aws-smithy-runtime-api",
 "aws-smithy-types",
@@ -318,7 +318,7 @@ dependencies = [

[[package]]
name = "aws-smithy-runtime"
version = "1.8.3"
version = "1.8.4"
dependencies = [
 "aws-smithy-async",
 "aws-smithy-http",
@@ -341,7 +341,7 @@ dependencies = [

[[package]]
name = "aws-smithy-runtime-api"
version = "1.8.1"
version = "1.8.3"
dependencies = [
 "aws-smithy-async",
 "aws-smithy-types",
+95 −22
Original line number Diff line number Diff line
@@ -7,9 +7,11 @@

//! Interceptor for handling Smithy `@httpChecksum` request checksumming with AWS SigV4

use crate::presigning::PresigningMarker;
use aws_runtime::auth::PayloadSigningOverride;
use aws_runtime::content_encoding::header_value::AWS_CHUNKED;
use aws_runtime::content_encoding::{AwsChunkedBody, AwsChunkedBodyOptions};
use aws_smithy_checksums::body::ChecksumCache;
use aws_smithy_checksums::ChecksumAlgorithm;
use aws_smithy_checksums::{body::calculate, http::HttpChecksum};
use aws_smithy_runtime::client::sdk_feature::SmithySdkFeature;
@@ -28,10 +30,11 @@ use aws_smithy_types::error::operation::BuildError;
use http::HeaderValue;
use http_body::Body;
use std::str::FromStr;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::{fmt, mem};

use crate::presigning::PresigningMarker;

/// Errors related to constructing checksum-validated HTTP requests
#[derive(Debug)]
pub(crate) enum Error {
@@ -64,6 +67,8 @@ struct RequestChecksumInterceptorState {
    checksum_algorithm: Option<String>,
    /// This value is set in the model on the `httpChecksum` trait
    request_checksum_required: bool,
    calculate_checksum: Arc<AtomicBool>,
    checksum_cache: ChecksumCache,
}
impl Storable for RequestChecksumInterceptorState {
    type Storer = StoreReplace<Self>;
@@ -150,15 +155,15 @@ where
        layer.store_put(RequestChecksumInterceptorState {
            checksum_algorithm,
            request_checksum_required,
            checksum_cache: ChecksumCache::new(),
            calculate_checksum: Arc::new(AtomicBool::new(false)),
        });
        cfg.push_layer(layer);

        Ok(())
    }

    /// Calculate a checksum and modify the request to include the checksum as a header
    /// (for in-memory request bodies) or a trailer (for streaming request bodies).
    /// Streaming bodies must be sized or this will return an error.
    /// Setup state for calculating checksum and setting UA features
    fn modify_before_retry_loop(
        &self,
        context: &mut BeforeTransmitInterceptorContextMut<'_>,
@@ -207,14 +212,17 @@ where
            _ => true,
        };

        // Calculate the checksum if necessary
        if calculate_checksum {
        // If a checksum override is set in the ConfigBag we use that instead (currently only used by S3Express)
        // If we have made it this far without a checksum being set we set the default (currently Crc32)
        let checksum_algorithm =
            incorporate_custom_default(checksum_algorithm, cfg).unwrap_or_default();

        if calculate_checksum {
            state.calculate_checksum.store(true, Ordering::Release);

            // Set the user-agent metric for the selected checksum algorithm
            // NOTE: We have to do this in modify_before_retry_loop since UA interceptor also runs
            // in modify_before_signing but is registered before this interceptor (client level vs operation level).
            match checksum_algorithm {
                ChecksumAlgorithm::Crc32 => {
                    cfg.interceptor_state()
@@ -244,9 +252,43 @@ where
                        more_info = "Unsupported value of ChecksumAlgorithm detected when setting user-agent metrics",
                        unsupported = ?unsupported),
            }
        }

        Ok(())
    }

    /// Calculate a checksum and modify the request to include the checksum as a header
    /// (for in-memory request bodies) or a trailer (for streaming request bodies).
    /// Streaming bodies must be sized or this will return an error.
    fn modify_before_signing(
        &self,
        context: &mut BeforeTransmitInterceptorContextMut<'_>,
        _runtime_components: &RuntimeComponents,
        cfg: &mut ConfigBag,
    ) -> Result<(), BoxError> {
        let state = cfg
            .load::<RequestChecksumInterceptorState>()
            .expect("set in `read_before_serialization`");

        let checksum_cache = state.checksum_cache.clone();

        let checksum_algorithm = state
            .checksum_algorithm
            .clone()
            .map(|s| ChecksumAlgorithm::from_str(s.as_str()))
            .transpose()?;

        let calculate_checksum = state.calculate_checksum.load(Ordering::SeqCst);

        // Calculate the checksum if necessary
        if calculate_checksum {
            // If a checksum override is set in the ConfigBag we use that instead (currently only used by S3Express)
            // If we have made it this far without a checksum being set we set the default (currently Crc32)
            let checksum_algorithm =
                incorporate_custom_default(checksum_algorithm, cfg).unwrap_or_default();

            let request = context.request_mut();
            add_checksum_for_request_body(request, checksum_algorithm, cfg)?;
            add_checksum_for_request_body(request, checksum_algorithm, checksum_cache, cfg)?;
        }

        Ok(())
@@ -295,6 +337,7 @@ fn incorporate_custom_default(
fn add_checksum_for_request_body(
    request: &mut HttpRequest,
    checksum_algorithm: ChecksumAlgorithm,
    checksum_cache: ChecksumCache,
    cfg: &mut ConfigBag,
) -> Result<(), BoxError> {
    match request.body().bytes() {
@@ -308,9 +351,22 @@ fn add_checksum_for_request_body(
                tracing::debug!("applying {checksum_algorithm:?} of the request body as a header");
                checksum.update(data);

                let calculated_headers = checksum.headers();
                let checksum_headers = if let Some(cached_headers) = checksum_cache.get() {
                    if cached_headers != calculated_headers {
                        tracing::warn!(cached = ?cached_headers, calculated = ?calculated_headers, "calculated checksum differs from cached checksum!");
                    }
                    cached_headers
                } else {
                    checksum_cache.set(calculated_headers.clone());
                    calculated_headers
                };

                for (hdr_name, hdr_value) in checksum_headers.iter() {
                    request
                        .headers_mut()
                    .insert(checksum.header_name(), checksum.header_value());
                        .insert(hdr_name.clone(), hdr_value.clone());
                }
            }
        }
        // Body is streaming: wrap the body so it will emit a checksum as a trailer.
@@ -318,7 +374,11 @@ fn add_checksum_for_request_body(
            tracing::debug!("applying {checksum_algorithm:?} of the request body as a trailer");
            cfg.interceptor_state()
                .store_put(PayloadSigningOverride::StreamingUnsignedPayloadTrailer);
            wrap_streaming_request_body_in_checksum_calculating_body(request, checksum_algorithm)?;
            wrap_streaming_request_body_in_checksum_calculating_body(
                request,
                checksum_algorithm,
                checksum_cache.clone(),
            )?;
        }
    }
    Ok(())
@@ -327,6 +387,7 @@ fn add_checksum_for_request_body(
fn wrap_streaming_request_body_in_checksum_calculating_body(
    request: &mut HttpRequest,
    checksum_algorithm: ChecksumAlgorithm,
    checksum_cache: ChecksumCache,
) -> Result<(), BuildError> {
    let checksum = checksum_algorithm.into_impl();

@@ -347,7 +408,8 @@ fn wrap_streaming_request_body_in_checksum_calculating_body(
        body.map(move |body| {
            let checksum = checksum_algorithm.into_impl();
            let trailer_len = HttpChecksum::size(checksum.as_ref());
            let body = calculate::ChecksumBody::new(body, checksum);
            let body =
                calculate::ChecksumBody::new(body, checksum).with_cache(checksum_cache.clone());
            let aws_chunked_body_options =
                AwsChunkedBodyOptions::new(original_body_size, vec![trailer_len]);

@@ -394,6 +456,7 @@ fn wrap_streaming_request_body_in_checksum_calculating_body(
#[cfg(test)]
mod tests {
    use crate::http_request_checksum::wrap_streaming_request_body_in_checksum_calculating_body;
    use aws_smithy_checksums::body::ChecksumCache;
    use aws_smithy_checksums::ChecksumAlgorithm;
    use aws_smithy_runtime_api::client::orchestrator::HttpRequest;
    use aws_smithy_types::base64;
@@ -417,7 +480,12 @@ mod tests {
        assert!(request.body().try_clone().is_some());

        let checksum_algorithm: ChecksumAlgorithm = "crc32".parse().unwrap();
        wrap_streaming_request_body_in_checksum_calculating_body(&mut request, checksum_algorithm)
        let checksum_cache = ChecksumCache::new();
        wrap_streaming_request_body_in_checksum_calculating_body(
            &mut request,
            checksum_algorithm,
            checksum_cache,
        )
        .unwrap();

        // ensure wrapped SdkBody is retryable
@@ -463,7 +531,12 @@ mod tests {
        // ensure original SdkBody is retryable
        assert!(request.body().try_clone().is_some());

        wrap_streaming_request_body_in_checksum_calculating_body(&mut request, checksum_algorithm)
        let checksum_cache = ChecksumCache::new();
        wrap_streaming_request_body_in_checksum_calculating_body(
            &mut request,
            checksum_algorithm,
            checksum_cache,
        )
        .unwrap();

        // ensure wrapped SdkBody is retryable
+2 −0
Original line number Diff line number Diff line
@@ -21,6 +21,7 @@ import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency.Compani
import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency.Companion.Hound
import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency.Companion.Http1x
import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency.Companion.HttpBody1x
import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency.Companion.HttpBodyUtil
import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency.Companion.SerdeJson
import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency.Companion.Smol
import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency.Companion.TempFile
@@ -186,6 +187,7 @@ class S3TestDependencies(private val runtimeConfig: RuntimeConfig) : LibRsCustom
            addDependency(FuturesUtil.toDevDependency())
            addDependency(HdrHistogram)
            addDependency(HttpBody1x.toDevDependency().copy(optional = false))
            addDependency(HttpBodyUtil.toDevDependency().copy(optional = false))
            addDependency(Smol)
            addDependency(TempFile)
            addDependency(TracingAppender)
Loading