Unverified Commit 303d99b6 authored by Zelda Hessler's avatar Zelda Hessler Committed by GitHub
Browse files

Fix various small issues with the orchestrator (#2736)

## Motivation and Context
<!--- Why is this change required? What problem does it solve? -->
<!--- If it fixes an open issue, please link to the issue here -->
This change fixes many of the smaller issues I ran into during the
implementation of standard retries for the orchestrator. Merges from
`main` were getting difficult with my [other
PR](https://github.com/awslabs/smithy-rs/pull/2725) so I'm breaking
things up.

## Description
<!--- Describe your changes in detail -->
- when orchestrator attempt timeout occurs, error is now set in context
- update test connection to allow defining connection events with
optional latency simulation update orchestrator attempt loop to track
iteration count
- set request attempts from the attempt loop
- add comment explaining "rewind" step of making request attempts add
`doesnt_matter` method to `TypeErasedBox`, useful when testing update
tests to use the new `TypeErasedBox::doesnt_matter` method
- add more doc comments
- add `set_subsec_nanos` method to `DateTime`.
- I added this to make it easier to string-format a datetime that didn't
include the nanos.
- fix Invocation ID interceptor not inserting the expected header update
input type for `OperationError::other` to be more user-friendly
- add `test-util` feature to `aws-smithy-runtime-api`
- add `test-util` feature to `aws-runtime`
- fix presigining inlineable to pull in tower dep during codegen

## Testing
<!--- Please describe in detail how you tested your changes -->
<!--- Include details of your testing environment, and the tests you ran
to -->
<!--- see how your change affects other areas of the code, etc. -->
tests have been updated where necessary
----

_By submitting this pull request, I confirm that you can use, modify,
copy, and redistribute this contribution, under the terms of your
choice._
parent 0cba3d89
Loading
Loading
Loading
Loading
+5 −2
Original line number Original line Diff line number Diff line
@@ -9,17 +9,18 @@ repository = "https://github.com/awslabs/smithy-rs"


[features]
[features]
event-stream = ["dep:aws-smithy-eventstream", "aws-sigv4/sign-eventstream"]
event-stream = ["dep:aws-smithy-eventstream", "aws-sigv4/sign-eventstream"]
test-util = []


[dependencies]
[dependencies]
aws-credential-types = { path = "../aws-credential-types" }
aws-credential-types = { path = "../aws-credential-types" }
aws-http = { path = "../aws-http" }
aws-http = { path = "../aws-http" }
aws-sigv4 = { path = "../aws-sigv4" }
aws-sigv4 = { path = "../aws-sigv4" }
aws-smithy-async = { path = "../../../rust-runtime/aws-smithy-async" }
aws-smithy-eventstream = { path = "../../../rust-runtime/aws-smithy-eventstream", optional = true }
aws-smithy-eventstream = { path = "../../../rust-runtime/aws-smithy-eventstream", optional = true }
aws-smithy-http = { path = "../../../rust-runtime/aws-smithy-http" }
aws-smithy-http = { path = "../../../rust-runtime/aws-smithy-http" }
aws-smithy-runtime = { path = "../../../rust-runtime/aws-smithy-runtime" }
aws-smithy-runtime = { path = "../../../rust-runtime/aws-smithy-runtime" }
aws-smithy-runtime-api = { path = "../../../rust-runtime/aws-smithy-runtime-api" }
aws-smithy-runtime-api = { path = "../../../rust-runtime/aws-smithy-runtime-api" }
aws-smithy-types = { path = "../../../rust-runtime/aws-smithy-types" }
aws-smithy-types = { path = "../../../rust-runtime/aws-smithy-types" }
aws-smithy-async = { path = "../../../rust-runtime/aws-smithy-async" }
aws-types = { path = "../aws-types" }
aws-types = { path = "../aws-types" }
http = "0.2.3"
http = "0.2.3"
percent-encoding = "2.1.0"
percent-encoding = "2.1.0"
@@ -28,8 +29,10 @@ uuid = { version = "1", features = ["v4", "fast-rng"] }


[dev-dependencies]
[dev-dependencies]
aws-credential-types = { path = "../aws-credential-types", features = ["test-util"] }
aws-credential-types = { path = "../aws-credential-types", features = ["test-util"] }
aws-smithy-protocol-test = { path = "../../../rust-runtime/aws-smithy-protocol-test" }
aws-smithy-async = { path = "../../../rust-runtime/aws-smithy-async", features = ["test-util"] }
aws-smithy-async = { path = "../../../rust-runtime/aws-smithy-async", features = ["test-util"] }
aws-smithy-types = { path = "../../../rust-runtime/aws-smithy-types", features = ["test-util"] }
aws-smithy-protocol-test = { path = "../../../rust-runtime/aws-smithy-protocol-test" }
aws-smithy-runtime-api = { path = "../../../rust-runtime/aws-smithy-runtime-api", features = ["test-util"] }
proptest = "1"
proptest = "1"
serde = { version = "1", features = ["derive"]}
serde = { version = "1", features = ["derive"]}
serde_json = "1"
serde_json = "1"
+127 −45
Original line number Original line Diff line number Diff line
@@ -9,17 +9,26 @@ use aws_smithy_runtime_api::client::interceptors::{
};
};
use aws_smithy_types::config_bag::ConfigBag;
use aws_smithy_types::config_bag::ConfigBag;
use http::{HeaderName, HeaderValue};
use http::{HeaderName, HeaderValue};
use std::fmt::Debug;
use uuid::Uuid;
use uuid::Uuid;


#[cfg(feature = "test-util")]
pub use test_util::{NoInvocationIdGenerator, PredefinedInvocationIdGenerator};

#[allow(clippy::declare_interior_mutable_const)] // we will never mutate this
#[allow(clippy::declare_interior_mutable_const)] // we will never mutate this
const AMZ_SDK_INVOCATION_ID: HeaderName = HeaderName::from_static("amz-sdk-invocation-id");
const AMZ_SDK_INVOCATION_ID: HeaderName = HeaderName::from_static("amz-sdk-invocation-id");


/// A generator for returning new invocation IDs on demand.
pub trait InvocationIdGenerator: Debug + Send + Sync {
    /// Call this function to receive a new [`InvocationId`] or an error explaining why one couldn't
    /// be provided.
    fn generate(&self) -> Result<Option<InvocationId>, BoxError>;
}

/// This interceptor generates a UUID and attaches it to all request attempts made as part of this operation.
/// This interceptor generates a UUID and attaches it to all request attempts made as part of this operation.
#[non_exhaustive]
#[non_exhaustive]
#[derive(Debug)]
#[derive(Debug, Default)]
pub struct InvocationIdInterceptor {
pub struct InvocationIdInterceptor {}
    id: InvocationId,
}


impl InvocationIdInterceptor {
impl InvocationIdInterceptor {
    /// Creates a new `InvocationIdInterceptor`
    /// Creates a new `InvocationIdInterceptor`
@@ -28,39 +37,50 @@ impl InvocationIdInterceptor {
    }
    }
}
}


impl Default for InvocationIdInterceptor {
    fn default() -> Self {
        Self {
            id: InvocationId::from_uuid(),
        }
    }
}

impl Interceptor for InvocationIdInterceptor {
impl Interceptor for InvocationIdInterceptor {
    fn modify_before_retry_loop(
    fn modify_before_retry_loop(
        &self,
        &self,
        context: &mut BeforeTransmitInterceptorContextMut<'_>,
        _ctx: &mut BeforeTransmitInterceptorContextMut<'_>,
        _cfg: &mut ConfigBag,
        cfg: &mut ConfigBag,
    ) -> Result<(), BoxError> {
    ) -> Result<(), BoxError> {
        let headers = context.request_mut().headers_mut();
        let id = cfg
        let id = _cfg.get::<InvocationId>().unwrap_or(&self.id);
            .get::<Box<dyn InvocationIdGenerator>>()
            .map(|gen| gen.generate())
            .transpose()?
            .flatten();
        cfg.put::<InvocationId>(id.unwrap_or_default());

        Ok(())
    }

    fn modify_before_transmit(
        &self,
        ctx: &mut BeforeTransmitInterceptorContextMut<'_>,
        cfg: &mut ConfigBag,
    ) -> Result<(), BoxError> {
        let headers = ctx.request_mut().headers_mut();
        let id = cfg
            .get::<InvocationId>()
            .ok_or("Expected an InvocationId in the ConfigBag but none was present")?;
        headers.append(AMZ_SDK_INVOCATION_ID, id.0.clone());
        headers.append(AMZ_SDK_INVOCATION_ID, id.0.clone());
        Ok(())
        Ok(())
    }
    }
}
}


/// InvocationId provides a consistent ID across retries
/// InvocationId provides a consistent ID across retries
#[derive(Debug)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct InvocationId(HeaderValue);
pub struct InvocationId(HeaderValue);

impl InvocationId {
impl InvocationId {
    /// A test invocation id to allow deterministic requests
    /// Create a new, random, invocation ID.
    pub fn for_tests() -> Self {
    pub fn new() -> Self {
        InvocationId(HeaderValue::from_static(
        Self::default()
            "00000000-0000-4000-8000-000000000000",
    }
        ))
}
}


    fn from_uuid() -> Self {
/// Defaults to a random UUID.
impl Default for InvocationId {
    fn default() -> Self {
        let id = Uuid::new_v4();
        let id = Uuid::new_v4();
        let id = id
        let id = id
            .to_string()
            .to_string()
@@ -70,45 +90,107 @@ impl InvocationId {
    }
    }
}
}


#[cfg(feature = "test-util")]
mod test_util {
    use super::*;
    use std::sync::{Arc, Mutex};

    impl InvocationId {
        /// Create a new invocation ID from a `&'static str`.
        pub fn new_from_str(uuid: &'static str) -> Self {
            InvocationId(HeaderValue::from_static(uuid))
        }
    }

    /// A "generator" that returns [`InvocationId`]s from a predefined list.
    #[derive(Debug)]
    pub struct PredefinedInvocationIdGenerator {
        pre_generated_ids: Arc<Mutex<Vec<InvocationId>>>,
    }

    impl PredefinedInvocationIdGenerator {
        /// Given a `Vec<InvocationId>`, create a new [`PredefinedInvocationIdGenerator`].
        pub fn new(mut invocation_ids: Vec<InvocationId>) -> Self {
            // We're going to pop ids off of the end of the list, so we need to reverse the list or else
            // we'll be popping the ids in reverse order, confusing the poor test writer.
            invocation_ids.reverse();

            Self {
                pre_generated_ids: Arc::new(Mutex::new(invocation_ids)),
            }
        }
    }

    impl InvocationIdGenerator for PredefinedInvocationIdGenerator {
        fn generate(&self) -> Result<Option<InvocationId>, BoxError> {
            Ok(Some(
                self.pre_generated_ids
                    .lock()
                    .expect("this will never be under contention")
                    .pop()
                    .expect("testers will provide enough invocation IDs"),
            ))
        }
    }

    /// A "generator" that always returns `None`.
    #[derive(Debug, Default)]
    pub struct NoInvocationIdGenerator;

    impl NoInvocationIdGenerator {
        /// Create a new [`NoInvocationIdGenerator`].
        pub fn new() -> Self {
            Self::default()
        }
    }

    impl InvocationIdGenerator for NoInvocationIdGenerator {
        fn generate(&self) -> Result<Option<InvocationId>, BoxError> {
            Ok(None)
        }
    }
}

#[cfg(test)]
#[cfg(test)]
mod tests {
mod tests {
    use crate::invocation_id::InvocationIdInterceptor;
    use crate::invocation_id::{InvocationId, InvocationIdInterceptor};
    use aws_smithy_http::body::SdkBody;
    use aws_smithy_http::body::SdkBody;
    use aws_smithy_runtime_api::client::interceptors::{Interceptor, InterceptorContext};
    use aws_smithy_runtime_api::client::interceptors::{
        BeforeTransmitInterceptorContextMut, Interceptor, InterceptorContext,
    };
    use aws_smithy_types::config_bag::ConfigBag;
    use aws_smithy_types::config_bag::ConfigBag;
    use aws_smithy_types::type_erasure::TypedBox;
    use aws_smithy_types::type_erasure::TypeErasedBox;
    use http::HeaderValue;
    use http::HeaderValue;


    fn expect_header<'a>(context: &'a InterceptorContext, header_name: &str) -> &'a HeaderValue {
    fn expect_header<'a>(
        context
        context: &'a BeforeTransmitInterceptorContextMut<'_>,
            .request()
        header_name: &str,
            .expect("request is set")
    ) -> &'a HeaderValue {
            .headers()
        context.request().headers().get(header_name).unwrap()
            .get(header_name)
            .unwrap()
    }
    }


    #[test]
    #[test]
    fn test_id_is_generated_and_set() {
    fn test_id_is_generated_and_set() {
        let mut context = InterceptorContext::new(TypedBox::new("doesntmatter").erase());
        let mut ctx = InterceptorContext::new(TypeErasedBox::doesnt_matter());
        context.enter_serialization_phase();
        ctx.enter_serialization_phase();
        context.set_request(http::Request::builder().body(SdkBody::empty()).unwrap());
        ctx.set_request(http::Request::builder().body(SdkBody::empty()).unwrap());
        let _ = context.take_input();
        let _ = ctx.take_input();
        context.enter_before_transmit_phase();
        ctx.enter_before_transmit_phase();


        let mut config = ConfigBag::base();
        let mut cfg = ConfigBag::base();
        let interceptor = InvocationIdInterceptor::new();
        let interceptor = InvocationIdInterceptor::new();
        let mut ctx = Into::into(&mut context);
        let mut ctx = Into::into(&mut ctx);
        interceptor
        interceptor
            .modify_before_signing(&mut ctx, &mut config)
            .modify_before_retry_loop(&mut ctx, &mut cfg)
            .unwrap();
            .unwrap();
        interceptor
        interceptor
            .modify_before_retry_loop(&mut ctx, &mut config)
            .modify_before_transmit(&mut ctx, &mut cfg)
            .unwrap();
            .unwrap();


        let header = expect_header(&context, "amz-sdk-invocation-id");
        let expected = cfg.get::<InvocationId>().expect("invocation ID was set");
        assert_eq!(&interceptor.id.0, header);
        let header = expect_header(&ctx, "amz-sdk-invocation-id");
        assert_eq!(expected.0, header, "the invocation ID in the config bag must match the invocation ID in the request header");
        // UUID should include 32 chars and 4 dashes
        // UUID should include 32 chars and 4 dashes
        assert_eq!(interceptor.id.0.len(), 36);
        assert_eq!(header.len(), 36);
    }
    }
}
}
+2 −2
Original line number Original line Diff line number Diff line
@@ -75,7 +75,7 @@ mod tests {
    use aws_smithy_http::body::SdkBody;
    use aws_smithy_http::body::SdkBody;
    use aws_smithy_protocol_test::{assert_ok, validate_headers};
    use aws_smithy_protocol_test::{assert_ok, validate_headers};
    use aws_smithy_runtime_api::client::interceptors::InterceptorContext;
    use aws_smithy_runtime_api::client::interceptors::InterceptorContext;
    use aws_smithy_types::type_erasure::TypedBox;
    use aws_smithy_types::type_erasure::TypeErasedBox;
    use aws_types::os_shim_internal::Env;
    use aws_types::os_shim_internal::Env;
    use http::HeaderValue;
    use http::HeaderValue;
    use proptest::{prelude::*, proptest};
    use proptest::{prelude::*, proptest};
@@ -148,7 +148,7 @@ mod tests {
            request = request.header(name, value);
            request = request.header(name, value);
        }
        }
        let request = request.body(SdkBody::empty()).expect("must be valid");
        let request = request.body(SdkBody::empty()).expect("must be valid");
        let mut context = InterceptorContext::new(TypedBox::new("doesntmatter").erase());
        let mut context = InterceptorContext::new(TypeErasedBox::doesnt_matter());
        context.enter_serialization_phase();
        context.enter_serialization_phase();
        context.set_request(request);
        context.set_request(request);
        let _ = context.take_input();
        let _ = context.take_input();
+18 −11
Original line number Original line Diff line number Diff line
@@ -3,10 +3,11 @@
 * SPDX-License-Identifier: Apache-2.0
 * SPDX-License-Identifier: Apache-2.0
 */
 */


use aws_smithy_runtime::client::orchestrator::interceptors::{RequestAttempts, ServiceClockSkew};
use aws_smithy_runtime::client::orchestrator::interceptors::ServiceClockSkew;
use aws_smithy_runtime_api::client::interceptors::{
use aws_smithy_runtime_api::client::interceptors::{
    BeforeTransmitInterceptorContextMut, BoxError, Interceptor,
    BeforeTransmitInterceptorContextMut, BoxError, Interceptor,
};
};
use aws_smithy_runtime_api::client::request_attempts::RequestAttempts;
use aws_smithy_types::config_bag::ConfigBag;
use aws_smithy_types::config_bag::ConfigBag;
use aws_smithy_types::date_time::Format;
use aws_smithy_types::date_time::Format;
use aws_smithy_types::retry::RetryConfig;
use aws_smithy_types::retry::RetryConfig;
@@ -44,7 +45,7 @@ impl RequestInfoInterceptor {
        let request_attempts = cfg
        let request_attempts = cfg
            .get::<RequestAttempts>()
            .get::<RequestAttempts>()
            .map(|r_a| r_a.attempts())
            .map(|r_a| r_a.attempts())
            .unwrap_or(1);
            .unwrap_or(0);
        let request_attempts = request_attempts.to_string();
        let request_attempts = request_attempts.to_string();
        Some((Cow::Borrowed("attempt"), Cow::Owned(request_attempts)))
        Some((Cow::Borrowed("attempt"), Cow::Owned(request_attempts)))
    }
    }
@@ -68,11 +69,19 @@ impl RequestInfoInterceptor {
        let estimated_skew: Duration = cfg.get::<ServiceClockSkew>().cloned()?.into();
        let estimated_skew: Duration = cfg.get::<ServiceClockSkew>().cloned()?.into();
        let current_time = SystemTime::now();
        let current_time = SystemTime::now();
        let ttl = current_time.checked_add(socket_read + estimated_skew)?;
        let ttl = current_time.checked_add(socket_read + estimated_skew)?;
        let timestamp = DateTime::from(ttl);
        let mut timestamp = DateTime::from(ttl);
        let formatted_timestamp = timestamp
        // Set subsec_nanos to 0 so that the formatted `DateTime` won't have fractional seconds.
        timestamp.set_subsec_nanos(0);
        let mut formatted_timestamp = timestamp
            .fmt(Format::DateTime)
            .fmt(Format::DateTime)
            .expect("the resulting DateTime will always be valid");
            .expect("the resulting DateTime will always be valid");


        // Remove dashes and colons
        formatted_timestamp = formatted_timestamp
            .chars()
            .filter(|&c| c != '-' && c != ':')
            .collect();

        Some((Cow::Borrowed("ttl"), Cow::Owned(formatted_timestamp)))
        Some((Cow::Borrowed("ttl"), Cow::Owned(formatted_timestamp)))
    }
    }
}
}
@@ -84,13 +93,13 @@ impl Interceptor for RequestInfoInterceptor {
        cfg: &mut ConfigBag,
        cfg: &mut ConfigBag,
    ) -> Result<(), BoxError> {
    ) -> Result<(), BoxError> {
        let mut pairs = RequestPairs::new();
        let mut pairs = RequestPairs::new();
        if let Some(pair) = self.build_attempts_pair(cfg) {
        if let Some(pair) = self.build_ttl_pair(cfg) {
            pairs = pairs.with_pair(pair);
            pairs = pairs.with_pair(pair);
        }
        }
        if let Some(pair) = self.build_max_attempts_pair(cfg) {
        if let Some(pair) = self.build_attempts_pair(cfg) {
            pairs = pairs.with_pair(pair);
            pairs = pairs.with_pair(pair);
        }
        }
        if let Some(pair) = self.build_ttl_pair(cfg) {
        if let Some(pair) = self.build_max_attempts_pair(cfg) {
            pairs = pairs.with_pair(pair);
            pairs = pairs.with_pair(pair);
        }
        }


@@ -156,12 +165,11 @@ mod tests {
    use super::RequestInfoInterceptor;
    use super::RequestInfoInterceptor;
    use crate::request_info::RequestPairs;
    use crate::request_info::RequestPairs;
    use aws_smithy_http::body::SdkBody;
    use aws_smithy_http::body::SdkBody;
    use aws_smithy_runtime::client::orchestrator::interceptors::RequestAttempts;
    use aws_smithy_runtime_api::client::interceptors::{Interceptor, InterceptorContext};
    use aws_smithy_runtime_api::client::interceptors::{Interceptor, InterceptorContext};
    use aws_smithy_types::config_bag::ConfigBag;
    use aws_smithy_types::config_bag::ConfigBag;
    use aws_smithy_types::retry::RetryConfig;
    use aws_smithy_types::retry::RetryConfig;
    use aws_smithy_types::timeout::TimeoutConfig;
    use aws_smithy_types::timeout::TimeoutConfig;
    use aws_smithy_types::type_erasure::TypedBox;
    use aws_smithy_types::type_erasure::TypeErasedBox;
    use http::HeaderValue;
    use http::HeaderValue;
    use std::time::Duration;
    use std::time::Duration;


@@ -178,7 +186,7 @@ mod tests {


    #[test]
    #[test]
    fn test_request_pairs_for_initial_attempt() {
    fn test_request_pairs_for_initial_attempt() {
        let mut context = InterceptorContext::new(TypedBox::new("doesntmatter").erase());
        let mut context = InterceptorContext::new(TypeErasedBox::doesnt_matter());
        context.enter_serialization_phase();
        context.enter_serialization_phase();
        context.set_request(http::Request::builder().body(SdkBody::empty()).unwrap());
        context.set_request(http::Request::builder().body(SdkBody::empty()).unwrap());


@@ -189,7 +197,6 @@ mod tests {
                .read_timeout(Duration::from_secs(30))
                .read_timeout(Duration::from_secs(30))
                .build(),
                .build(),
        );
        );
        config.put(RequestAttempts::new());


        let _ = context.take_input();
        let _ = context.take_input();
        context.enter_before_transmit_phase();
        context.enter_before_transmit_phase();
+2 −2
Original line number Original line Diff line number Diff line
@@ -112,7 +112,7 @@ mod tests {
    use aws_smithy_runtime_api::client::interceptors::{Interceptor, InterceptorContext};
    use aws_smithy_runtime_api::client::interceptors::{Interceptor, InterceptorContext};
    use aws_smithy_types::config_bag::ConfigBag;
    use aws_smithy_types::config_bag::ConfigBag;
    use aws_smithy_types::error::display::DisplayErrorContext;
    use aws_smithy_types::error::display::DisplayErrorContext;
    use aws_smithy_types::type_erasure::TypedBox;
    use aws_smithy_types::type_erasure::TypeErasedBox;


    fn expect_header<'a>(context: &'a InterceptorContext, header_name: &str) -> &'a str {
    fn expect_header<'a>(context: &'a InterceptorContext, header_name: &str) -> &'a str {
        context
        context
@@ -126,7 +126,7 @@ mod tests {
    }
    }


    fn context() -> InterceptorContext {
    fn context() -> InterceptorContext {
        let mut context = InterceptorContext::new(TypedBox::new("doesntmatter").erase());
        let mut context = InterceptorContext::new(TypeErasedBox::doesnt_matter());
        context.enter_serialization_phase();
        context.enter_serialization_phase();
        context.set_request(http::Request::builder().body(SdkBody::empty()).unwrap());
        context.set_request(http::Request::builder().body(SdkBody::empty()).unwrap());
        let _ = context.take_input();
        let _ = context.take_input();
Loading