diff --git a/aws/rust-runtime/aws-runtime/Cargo.toml b/aws/rust-runtime/aws-runtime/Cargo.toml index e241ae92e7fb7c2771956a3c15bbcbed546a0a24..08411f4584b8b805c7a9245fe7b4b1ffae5f3cb7 100644 --- a/aws/rust-runtime/aws-runtime/Cargo.toml +++ b/aws/rust-runtime/aws-runtime/Cargo.toml @@ -9,17 +9,18 @@ repository = "https://github.com/awslabs/smithy-rs" [features] event-stream = ["dep:aws-smithy-eventstream", "aws-sigv4/sign-eventstream"] +test-util = [] [dependencies] aws-credential-types = { path = "../aws-credential-types" } aws-http = { path = "../aws-http" } aws-sigv4 = { path = "../aws-sigv4" } +aws-smithy-async = { path = "../../../rust-runtime/aws-smithy-async" } aws-smithy-eventstream = { path = "../../../rust-runtime/aws-smithy-eventstream", optional = true } aws-smithy-http = { path = "../../../rust-runtime/aws-smithy-http" } aws-smithy-runtime = { path = "../../../rust-runtime/aws-smithy-runtime" } aws-smithy-runtime-api = { path = "../../../rust-runtime/aws-smithy-runtime-api" } aws-smithy-types = { path = "../../../rust-runtime/aws-smithy-types" } -aws-smithy-async = { path = "../../../rust-runtime/aws-smithy-async" } aws-types = { path = "../aws-types" } http = "0.2.3" percent-encoding = "2.1.0" @@ -28,8 +29,10 @@ uuid = { version = "1", features = ["v4", "fast-rng"] } [dev-dependencies] aws-credential-types = { path = "../aws-credential-types", features = ["test-util"] } -aws-smithy-protocol-test = { path = "../../../rust-runtime/aws-smithy-protocol-test" } aws-smithy-async = { path = "../../../rust-runtime/aws-smithy-async", features = ["test-util"] } +aws-smithy-types = { path = "../../../rust-runtime/aws-smithy-types", features = ["test-util"] } +aws-smithy-protocol-test = { path = "../../../rust-runtime/aws-smithy-protocol-test" } +aws-smithy-runtime-api = { path = "../../../rust-runtime/aws-smithy-runtime-api", features = ["test-util"] } proptest = "1" serde = { version = "1", features = ["derive"]} serde_json = "1" diff --git a/aws/rust-runtime/aws-runtime/src/invocation_id.rs b/aws/rust-runtime/aws-runtime/src/invocation_id.rs index 899f76d753a0df56a9e90e2465ed2013f2b8d6b9..420cedb73debdf17712f6af1e5aa12ebd68b8e60 100644 --- a/aws/rust-runtime/aws-runtime/src/invocation_id.rs +++ b/aws/rust-runtime/aws-runtime/src/invocation_id.rs @@ -9,17 +9,26 @@ use aws_smithy_runtime_api::client::interceptors::{ }; use aws_smithy_types::config_bag::ConfigBag; use http::{HeaderName, HeaderValue}; +use std::fmt::Debug; use uuid::Uuid; +#[cfg(feature = "test-util")] +pub use test_util::{NoInvocationIdGenerator, PredefinedInvocationIdGenerator}; + #[allow(clippy::declare_interior_mutable_const)] // we will never mutate this const AMZ_SDK_INVOCATION_ID: HeaderName = HeaderName::from_static("amz-sdk-invocation-id"); +/// A generator for returning new invocation IDs on demand. +pub trait InvocationIdGenerator: Debug + Send + Sync { + /// Call this function to receive a new [`InvocationId`] or an error explaining why one couldn't + /// be provided. + fn generate(&self) -> Result, BoxError>; +} + /// This interceptor generates a UUID and attaches it to all request attempts made as part of this operation. #[non_exhaustive] -#[derive(Debug)] -pub struct InvocationIdInterceptor { - id: InvocationId, -} +#[derive(Debug, Default)] +pub struct InvocationIdInterceptor {} impl InvocationIdInterceptor { /// Creates a new `InvocationIdInterceptor` @@ -28,39 +37,50 @@ impl InvocationIdInterceptor { } } -impl Default for InvocationIdInterceptor { - fn default() -> Self { - Self { - id: InvocationId::from_uuid(), - } - } -} - impl Interceptor for InvocationIdInterceptor { fn modify_before_retry_loop( &self, - context: &mut BeforeTransmitInterceptorContextMut<'_>, - _cfg: &mut ConfigBag, + _ctx: &mut BeforeTransmitInterceptorContextMut<'_>, + cfg: &mut ConfigBag, ) -> Result<(), BoxError> { - let headers = context.request_mut().headers_mut(); - let id = _cfg.get::().unwrap_or(&self.id); + let id = cfg + .get::>() + .map(|gen| gen.generate()) + .transpose()? + .flatten(); + cfg.put::(id.unwrap_or_default()); + + Ok(()) + } + + fn modify_before_transmit( + &self, + ctx: &mut BeforeTransmitInterceptorContextMut<'_>, + cfg: &mut ConfigBag, + ) -> Result<(), BoxError> { + let headers = ctx.request_mut().headers_mut(); + let id = cfg + .get::() + .ok_or("Expected an InvocationId in the ConfigBag but none was present")?; headers.append(AMZ_SDK_INVOCATION_ID, id.0.clone()); Ok(()) } } /// InvocationId provides a consistent ID across retries -#[derive(Debug)] +#[derive(Debug, Clone, PartialEq, Eq)] pub struct InvocationId(HeaderValue); + impl InvocationId { - /// A test invocation id to allow deterministic requests - pub fn for_tests() -> Self { - InvocationId(HeaderValue::from_static( - "00000000-0000-4000-8000-000000000000", - )) + /// Create a new, random, invocation ID. + pub fn new() -> Self { + Self::default() } +} - fn from_uuid() -> Self { +/// Defaults to a random UUID. +impl Default for InvocationId { + fn default() -> Self { let id = Uuid::new_v4(); let id = id .to_string() @@ -70,45 +90,107 @@ impl InvocationId { } } +#[cfg(feature = "test-util")] +mod test_util { + use super::*; + use std::sync::{Arc, Mutex}; + + impl InvocationId { + /// Create a new invocation ID from a `&'static str`. + pub fn new_from_str(uuid: &'static str) -> Self { + InvocationId(HeaderValue::from_static(uuid)) + } + } + + /// A "generator" that returns [`InvocationId`]s from a predefined list. + #[derive(Debug)] + pub struct PredefinedInvocationIdGenerator { + pre_generated_ids: Arc>>, + } + + impl PredefinedInvocationIdGenerator { + /// Given a `Vec`, create a new [`PredefinedInvocationIdGenerator`]. + pub fn new(mut invocation_ids: Vec) -> Self { + // We're going to pop ids off of the end of the list, so we need to reverse the list or else + // we'll be popping the ids in reverse order, confusing the poor test writer. + invocation_ids.reverse(); + + Self { + pre_generated_ids: Arc::new(Mutex::new(invocation_ids)), + } + } + } + + impl InvocationIdGenerator for PredefinedInvocationIdGenerator { + fn generate(&self) -> Result, BoxError> { + Ok(Some( + self.pre_generated_ids + .lock() + .expect("this will never be under contention") + .pop() + .expect("testers will provide enough invocation IDs"), + )) + } + } + + /// A "generator" that always returns `None`. + #[derive(Debug, Default)] + pub struct NoInvocationIdGenerator; + + impl NoInvocationIdGenerator { + /// Create a new [`NoInvocationIdGenerator`]. + pub fn new() -> Self { + Self::default() + } + } + + impl InvocationIdGenerator for NoInvocationIdGenerator { + fn generate(&self) -> Result, BoxError> { + Ok(None) + } + } +} + #[cfg(test)] mod tests { - use crate::invocation_id::InvocationIdInterceptor; + use crate::invocation_id::{InvocationId, InvocationIdInterceptor}; use aws_smithy_http::body::SdkBody; - use aws_smithy_runtime_api::client::interceptors::{Interceptor, InterceptorContext}; + use aws_smithy_runtime_api::client::interceptors::{ + BeforeTransmitInterceptorContextMut, Interceptor, InterceptorContext, + }; use aws_smithy_types::config_bag::ConfigBag; - use aws_smithy_types::type_erasure::TypedBox; + use aws_smithy_types::type_erasure::TypeErasedBox; use http::HeaderValue; - fn expect_header<'a>(context: &'a InterceptorContext, header_name: &str) -> &'a HeaderValue { - context - .request() - .expect("request is set") - .headers() - .get(header_name) - .unwrap() + fn expect_header<'a>( + context: &'a BeforeTransmitInterceptorContextMut<'_>, + header_name: &str, + ) -> &'a HeaderValue { + context.request().headers().get(header_name).unwrap() } #[test] fn test_id_is_generated_and_set() { - let mut context = InterceptorContext::new(TypedBox::new("doesntmatter").erase()); - context.enter_serialization_phase(); - context.set_request(http::Request::builder().body(SdkBody::empty()).unwrap()); - let _ = context.take_input(); - context.enter_before_transmit_phase(); + let mut ctx = InterceptorContext::new(TypeErasedBox::doesnt_matter()); + ctx.enter_serialization_phase(); + ctx.set_request(http::Request::builder().body(SdkBody::empty()).unwrap()); + let _ = ctx.take_input(); + ctx.enter_before_transmit_phase(); - let mut config = ConfigBag::base(); + let mut cfg = ConfigBag::base(); let interceptor = InvocationIdInterceptor::new(); - let mut ctx = Into::into(&mut context); + let mut ctx = Into::into(&mut ctx); interceptor - .modify_before_signing(&mut ctx, &mut config) + .modify_before_retry_loop(&mut ctx, &mut cfg) .unwrap(); interceptor - .modify_before_retry_loop(&mut ctx, &mut config) + .modify_before_transmit(&mut ctx, &mut cfg) .unwrap(); - let header = expect_header(&context, "amz-sdk-invocation-id"); - assert_eq!(&interceptor.id.0, header); + let expected = cfg.get::().expect("invocation ID was set"); + let header = expect_header(&ctx, "amz-sdk-invocation-id"); + assert_eq!(expected.0, header, "the invocation ID in the config bag must match the invocation ID in the request header"); // UUID should include 32 chars and 4 dashes - assert_eq!(interceptor.id.0.len(), 36); + assert_eq!(header.len(), 36); } } diff --git a/aws/rust-runtime/aws-runtime/src/recursion_detection.rs b/aws/rust-runtime/aws-runtime/src/recursion_detection.rs index 9437dce76bce44d3196e2af0cea29503280e22a9..d7101e686a588671da164338dcbb5b5d4174b3b8 100644 --- a/aws/rust-runtime/aws-runtime/src/recursion_detection.rs +++ b/aws/rust-runtime/aws-runtime/src/recursion_detection.rs @@ -75,7 +75,7 @@ mod tests { use aws_smithy_http::body::SdkBody; use aws_smithy_protocol_test::{assert_ok, validate_headers}; use aws_smithy_runtime_api::client::interceptors::InterceptorContext; - use aws_smithy_types::type_erasure::TypedBox; + use aws_smithy_types::type_erasure::TypeErasedBox; use aws_types::os_shim_internal::Env; use http::HeaderValue; use proptest::{prelude::*, proptest}; @@ -148,7 +148,7 @@ mod tests { request = request.header(name, value); } let request = request.body(SdkBody::empty()).expect("must be valid"); - let mut context = InterceptorContext::new(TypedBox::new("doesntmatter").erase()); + let mut context = InterceptorContext::new(TypeErasedBox::doesnt_matter()); context.enter_serialization_phase(); context.set_request(request); let _ = context.take_input(); diff --git a/aws/rust-runtime/aws-runtime/src/request_info.rs b/aws/rust-runtime/aws-runtime/src/request_info.rs index 140ac28aeadcc1ad97ac504f1132b4f3642e9a18..0b413105361eb3db23cbaa5d12f4337755271036 100644 --- a/aws/rust-runtime/aws-runtime/src/request_info.rs +++ b/aws/rust-runtime/aws-runtime/src/request_info.rs @@ -3,10 +3,11 @@ * SPDX-License-Identifier: Apache-2.0 */ -use aws_smithy_runtime::client::orchestrator::interceptors::{RequestAttempts, ServiceClockSkew}; +use aws_smithy_runtime::client::orchestrator::interceptors::ServiceClockSkew; use aws_smithy_runtime_api::client::interceptors::{ BeforeTransmitInterceptorContextMut, BoxError, Interceptor, }; +use aws_smithy_runtime_api::client::request_attempts::RequestAttempts; use aws_smithy_types::config_bag::ConfigBag; use aws_smithy_types::date_time::Format; use aws_smithy_types::retry::RetryConfig; @@ -44,7 +45,7 @@ impl RequestInfoInterceptor { let request_attempts = cfg .get::() .map(|r_a| r_a.attempts()) - .unwrap_or(1); + .unwrap_or(0); let request_attempts = request_attempts.to_string(); Some((Cow::Borrowed("attempt"), Cow::Owned(request_attempts))) } @@ -68,11 +69,19 @@ impl RequestInfoInterceptor { let estimated_skew: Duration = cfg.get::().cloned()?.into(); let current_time = SystemTime::now(); let ttl = current_time.checked_add(socket_read + estimated_skew)?; - let timestamp = DateTime::from(ttl); - let formatted_timestamp = timestamp + let mut timestamp = DateTime::from(ttl); + // Set subsec_nanos to 0 so that the formatted `DateTime` won't have fractional seconds. + timestamp.set_subsec_nanos(0); + let mut formatted_timestamp = timestamp .fmt(Format::DateTime) .expect("the resulting DateTime will always be valid"); + // Remove dashes and colons + formatted_timestamp = formatted_timestamp + .chars() + .filter(|&c| c != '-' && c != ':') + .collect(); + Some((Cow::Borrowed("ttl"), Cow::Owned(formatted_timestamp))) } } @@ -84,13 +93,13 @@ impl Interceptor for RequestInfoInterceptor { cfg: &mut ConfigBag, ) -> Result<(), BoxError> { let mut pairs = RequestPairs::new(); - if let Some(pair) = self.build_attempts_pair(cfg) { + if let Some(pair) = self.build_ttl_pair(cfg) { pairs = pairs.with_pair(pair); } - if let Some(pair) = self.build_max_attempts_pair(cfg) { + if let Some(pair) = self.build_attempts_pair(cfg) { pairs = pairs.with_pair(pair); } - if let Some(pair) = self.build_ttl_pair(cfg) { + if let Some(pair) = self.build_max_attempts_pair(cfg) { pairs = pairs.with_pair(pair); } @@ -156,12 +165,11 @@ mod tests { use super::RequestInfoInterceptor; use crate::request_info::RequestPairs; use aws_smithy_http::body::SdkBody; - use aws_smithy_runtime::client::orchestrator::interceptors::RequestAttempts; use aws_smithy_runtime_api::client::interceptors::{Interceptor, InterceptorContext}; use aws_smithy_types::config_bag::ConfigBag; use aws_smithy_types::retry::RetryConfig; use aws_smithy_types::timeout::TimeoutConfig; - use aws_smithy_types::type_erasure::TypedBox; + use aws_smithy_types::type_erasure::TypeErasedBox; use http::HeaderValue; use std::time::Duration; @@ -178,7 +186,7 @@ mod tests { #[test] fn test_request_pairs_for_initial_attempt() { - let mut context = InterceptorContext::new(TypedBox::new("doesntmatter").erase()); + let mut context = InterceptorContext::new(TypeErasedBox::doesnt_matter()); context.enter_serialization_phase(); context.set_request(http::Request::builder().body(SdkBody::empty()).unwrap()); @@ -189,7 +197,6 @@ mod tests { .read_timeout(Duration::from_secs(30)) .build(), ); - config.put(RequestAttempts::new()); let _ = context.take_input(); context.enter_before_transmit_phase(); diff --git a/aws/rust-runtime/aws-runtime/src/user_agent.rs b/aws/rust-runtime/aws-runtime/src/user_agent.rs index 51bace059a611e0b4d9721c1b0a215d0817ed602..52c6cdb4da26afef82fd6118716fc7676bcc1380 100644 --- a/aws/rust-runtime/aws-runtime/src/user_agent.rs +++ b/aws/rust-runtime/aws-runtime/src/user_agent.rs @@ -112,7 +112,7 @@ mod tests { use aws_smithy_runtime_api::client::interceptors::{Interceptor, InterceptorContext}; use aws_smithy_types::config_bag::ConfigBag; use aws_smithy_types::error::display::DisplayErrorContext; - use aws_smithy_types::type_erasure::TypedBox; + use aws_smithy_types::type_erasure::TypeErasedBox; fn expect_header<'a>(context: &'a InterceptorContext, header_name: &str) -> &'a str { context @@ -126,7 +126,7 @@ mod tests { } fn context() -> InterceptorContext { - let mut context = InterceptorContext::new(TypedBox::new("doesntmatter").erase()); + let mut context = InterceptorContext::new(TypeErasedBox::doesnt_matter()); context.enter_serialization_phase(); context.set_request(http::Request::builder().body(SdkBody::empty()).unwrap()); let _ = context.take_input(); diff --git a/aws/sdk-codegen/src/main/kotlin/software/amazon/smithy/rustsdk/AwsCustomizableOperationDecorator.kt b/aws/sdk-codegen/src/main/kotlin/software/amazon/smithy/rustsdk/AwsCustomizableOperationDecorator.kt index 62d3f79368b582aef26ed659fbad60e34ed6250e..1b4243c65cdc4c218f394f5be39f6213189769ed 100644 --- a/aws/sdk-codegen/src/main/kotlin/software/amazon/smithy/rustsdk/AwsCustomizableOperationDecorator.kt +++ b/aws/sdk-codegen/src/main/kotlin/software/amazon/smithy/rustsdk/AwsCustomizableOperationDecorator.kt @@ -33,7 +33,7 @@ class CustomizableOperationTestHelpers(runtimeConfig: RuntimeConfig) : "SharedInterceptor" to RuntimeType.smithyRuntimeApi(runtimeConfig) .resolve("client::interceptors::SharedInterceptor"), "TestParamsSetterInterceptor" to CargoDependency.smithyRuntime(runtimeConfig).withFeature("test-util") - .toType().resolve("client::test_util::interceptor::TestParamsSetterInterceptor"), + .toType().resolve("client::test_util::interceptors::TestParamsSetterInterceptor"), ) override fun section(section: CustomizableOperationSection): Writable = diff --git a/aws/sdk-codegen/src/main/kotlin/software/amazon/smithy/rustsdk/AwsRuntimeType.kt b/aws/sdk-codegen/src/main/kotlin/software/amazon/smithy/rustsdk/AwsRuntimeType.kt index dc2afd1a35a054a7a1821a04c499aed78df5b150..b0ae6a70ebb65f9b59b208cefc7c86f3176f9f3f 100644 --- a/aws/sdk-codegen/src/main/kotlin/software/amazon/smithy/rustsdk/AwsRuntimeType.kt +++ b/aws/sdk-codegen/src/main/kotlin/software/amazon/smithy/rustsdk/AwsRuntimeType.kt @@ -41,8 +41,13 @@ fun RuntimeConfig.awsRoot(): RuntimeCrateLocation { } object AwsRuntimeType { - fun presigning(): RuntimeType = - RuntimeType.forInlineDependency(InlineAwsDependency.forRustFile("presigning", visibility = Visibility.PUBLIC)) + fun presigning(): RuntimeType = RuntimeType.forInlineDependency( + InlineAwsDependency.forRustFile( + "presigning", + visibility = Visibility.PUBLIC, + CargoDependency.Tower, + ), + ) // TODO(enableNewSmithyRuntime): Delete defaultMiddleware and middleware.rs, and remove tower dependency from inlinables, when cleaning up middleware fun RuntimeConfig.defaultMiddleware() = RuntimeType.forInlineDependency( diff --git a/aws/sdk-codegen/src/main/kotlin/software/amazon/smithy/rustsdk/InvocationIdDecorator.kt b/aws/sdk-codegen/src/main/kotlin/software/amazon/smithy/rustsdk/InvocationIdDecorator.kt index c608f1c573f8d85d9ce00fb43b496ce4600e3007..220b57169af6e3a9748acb0cd94d91390355cad2 100644 --- a/aws/sdk-codegen/src/main/kotlin/software/amazon/smithy/rustsdk/InvocationIdDecorator.kt +++ b/aws/sdk-codegen/src/main/kotlin/software/amazon/smithy/rustsdk/InvocationIdDecorator.kt @@ -10,7 +10,7 @@ import software.amazon.smithy.rust.codegen.client.smithy.customize.ClientCodegen import software.amazon.smithy.rust.codegen.client.smithy.generators.ServiceRuntimePluginCustomization import software.amazon.smithy.rust.codegen.client.smithy.generators.ServiceRuntimePluginSection import software.amazon.smithy.rust.codegen.core.rustlang.Writable -import software.amazon.smithy.rust.codegen.core.rustlang.rust +import software.amazon.smithy.rust.codegen.core.rustlang.rustTemplate import software.amazon.smithy.rust.codegen.core.rustlang.writable import software.amazon.smithy.rust.codegen.core.util.letIf @@ -29,15 +29,20 @@ class InvocationIdDecorator : ClientCodegenDecorator { private class InvocationIdRuntimePluginCustomization( private val codegenContext: ClientCodegenContext, ) : ServiceRuntimePluginCustomization() { + private val runtimeConfig = codegenContext.runtimeConfig + private val awsRuntime = AwsRuntimeType.awsRuntime(runtimeConfig) + private val codegenScope = arrayOf( + "InvocationIdInterceptor" to awsRuntime.resolve("invocation_id::InvocationIdInterceptor"), + ) + override fun section(section: ServiceRuntimePluginSection): Writable = writable { - if (section is ServiceRuntimePluginSection.AdditionalConfig) { - section.registerInterceptor(codegenContext.runtimeConfig, this) { - rust( - "#T::new()", - AwsRuntimeType.awsRuntime(codegenContext.runtimeConfig) - .resolve("invocation_id::InvocationIdInterceptor"), - ) + when (section) { + is ServiceRuntimePluginSection.AdditionalConfig -> { + section.registerInterceptor(codegenContext.runtimeConfig, this) { + rustTemplate("#{InvocationIdInterceptor}::new()", *codegenScope) + } } + else -> emptySection } } } diff --git a/aws/sdk-codegen/src/main/kotlin/software/amazon/smithy/rustsdk/RetryInformationHeaderDecorator.kt b/aws/sdk-codegen/src/main/kotlin/software/amazon/smithy/rustsdk/RetryInformationHeaderDecorator.kt index d930693988bd3a5ed2347458dd06facd0a683945..4f8a342e31a241df3068ff503a8997703ca687fd 100644 --- a/aws/sdk-codegen/src/main/kotlin/software/amazon/smithy/rustsdk/RetryInformationHeaderDecorator.kt +++ b/aws/sdk-codegen/src/main/kotlin/software/amazon/smithy/rustsdk/RetryInformationHeaderDecorator.kt @@ -41,11 +41,6 @@ private class AddRetryInformationHeaderInterceptors(codegenContext: ClientCodege rust("#T::new()", smithyRuntime.resolve("client::orchestrator::interceptors::ServiceClockSkewInterceptor")) } - // Track the number of request attempts made. - section.registerInterceptor(runtimeConfig, this) { - rust("#T::new()", smithyRuntime.resolve("client::orchestrator::interceptors::RequestAttemptsInterceptor")) - } - // Add request metadata to outgoing requests. Sets a header. section.registerInterceptor(runtimeConfig, this) { rust("#T::new()", awsRuntime.resolve("request_info::RequestInfoInterceptor")) diff --git a/aws/sdk/integration-tests/transcribestreaming/Cargo.toml b/aws/sdk/integration-tests/transcribestreaming/Cargo.toml index 38b1018cc506e005f6a2e2696f45448c32cf64a9..181ba493cbc0a246befe4f86020126914f4076ee 100644 --- a/aws/sdk/integration-tests/transcribestreaming/Cargo.toml +++ b/aws/sdk/integration-tests/transcribestreaming/Cargo.toml @@ -22,4 +22,5 @@ hound = "3.4.0" http = "0.2.0" serde_json = "1.0.0" tokio = { version = "1.23.1", features = ["full", "test-util"] } +tracing = "0.1" tracing-subscriber = { version = "0.3.15", features = ["env-filter"] } diff --git a/aws/sra-test/integration-tests/aws-sdk-s3/tests/request_information_headers.rs b/aws/sra-test/integration-tests/aws-sdk-s3/tests/request_information_headers.rs index 43e62b4d42d890e437112b3f20c59cbd4ed02208..28a8df6c04e0fd79c472013331b6e4b1111b374d 100644 --- a/aws/sra-test/integration-tests/aws-sdk-s3/tests/request_information_headers.rs +++ b/aws/sra-test/integration-tests/aws-sdk-s3/tests/request_information_headers.rs @@ -13,6 +13,7 @@ use aws_smithy_client::dvr::MediaType; use aws_smithy_client::erase::DynConnector; use aws_smithy_runtime::client::retries::strategy::FixedDelayRetryStrategy; use aws_smithy_runtime_api::client::interceptors::InterceptorRegistrar; +use aws_smithy_runtime_api::client::orchestrator::ConfigBagAccessors; use aws_smithy_runtime_api::client::runtime_plugin::RuntimePlugin; use aws_smithy_types::client::orchestrator::ConfigBagAccessors; use aws_smithy_types::config_bag::ConfigBag; diff --git a/aws/sra-test/integration-tests/aws-sdk-s3/tests/sra_test.rs b/aws/sra-test/integration-tests/aws-sdk-s3/tests/sra_test.rs index 69f89d1a9df6955d246be46f0201d0a9e7e79963..3e6c64be5d0349865eda8f27d4314c423f8ab025 100644 --- a/aws/sra-test/integration-tests/aws-sdk-s3/tests/sra_test.rs +++ b/aws/sra-test/integration-tests/aws-sdk-s3/tests/sra_test.rs @@ -27,9 +27,7 @@ async fn sra_test() { .interceptor(util::TestUserAgentInterceptor) .build(); let client = Client::from_conf(config); - let fixup = util::FixupPlugin { - timestamp: UNIX_EPOCH + Duration::from_secs(1624036048), - }; + let fixup = util::FixupPlugin; let resp = dbg!( client diff --git a/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/generators/ServiceRuntimePluginGenerator.kt b/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/generators/ServiceRuntimePluginGenerator.kt index 1c7904a386d30244c3660eb58f74d0d10200048f..5351a10e56af3d3675436f7ee13355fa2e1a2511 100644 --- a/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/generators/ServiceRuntimePluginGenerator.kt +++ b/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/generators/ServiceRuntimePluginGenerator.kt @@ -146,7 +146,7 @@ class ServiceRuntimePluginGenerator( let sleep_impl = self.handle.conf.sleep_impl(); let timeout_config = self.handle.conf.timeout_config(); - let connector_settings = timeout_config.map(|c| #{ConnectorSettings}::from_timeout_config(c)).unwrap_or_default(); + let connector_settings = timeout_config.map(#{ConnectorSettings}::from_timeout_config).unwrap_or_default(); let connection: #{Box} = #{Box}::new(#{DynConnectorAdapter}::new( // TODO(enableNewSmithyRuntime): Replace the tower-based DynConnector and remove DynConnectorAdapter when deleting the middleware implementation #{require_connector}( diff --git a/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/generators/client/CustomizableOperationGenerator.kt b/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/generators/client/CustomizableOperationGenerator.kt index eab249a9c0e97548c638f8412a055f5874512229..8a64e514c11ad66c637f17598dc28a419a6d4f04 100644 --- a/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/generators/client/CustomizableOperationGenerator.kt +++ b/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/generators/client/CustomizableOperationGenerator.kt @@ -154,9 +154,9 @@ class CustomizableOperationGenerator( "Interceptor" to RuntimeType.smithyRuntimeApi(runtimeConfig) .resolve("client::interceptors::Interceptor"), "MapRequestInterceptor" to RuntimeType.smithyRuntime(runtimeConfig) - .resolve("client::interceptor::MapRequestInterceptor"), + .resolve("client::interceptors::MapRequestInterceptor"), "MutateRequestInterceptor" to RuntimeType.smithyRuntime(runtimeConfig) - .resolve("client::interceptor::MutateRequestInterceptor"), + .resolve("client::interceptors::MutateRequestInterceptor"), "RuntimePlugin" to RuntimeType.runtimePlugin(runtimeConfig), "SendResult" to ClientRustModule.Client.customize.toType() .resolve("internal::SendResult"), diff --git a/codegen-core/common-test-models/pokemon.smithy b/codegen-core/common-test-models/pokemon.smithy index 745f51d93c7cc42ecb8243f3d347da562fee8c1f..014ee61c4163dd58e6b07d632bcf3b8bb6bc3a63 100644 --- a/codegen-core/common-test-models/pokemon.smithy +++ b/codegen-core/common-test-models/pokemon.smithy @@ -162,4 +162,4 @@ structure StreamPokemonRadioOutput { } @streaming -blob StreamingBlob \ No newline at end of file +blob StreamingBlob diff --git a/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/ValidateUnsupportedConstraints.kt b/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/ValidateUnsupportedConstraints.kt index f0f0ee8227f3ac604b0c893f49e46492afd5d866..192facc2ba3f074c7c758d331140a465d0f1c8bd 100644 --- a/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/ValidateUnsupportedConstraints.kt +++ b/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/ValidateUnsupportedConstraints.kt @@ -294,7 +294,7 @@ fun validateUnsupportedConstraints( messages += LogMessage( Level.SEVERE, """ - The `ignoreUnsupportedConstraints` flag in the `codegen` configuration is set to `true`, but it has no + The `ignoreUnsupportedConstraints` flag in the `codegen` configuration is set to `true`, but it has no effect. All the constraint traits used in the model are well-supported, please remove this flag. """.trimIndent().replace("\n", " "), ) diff --git a/codegen-server/src/test/kotlin/software/amazon/smithy/rust/codegen/server/smithy/ValidateUnsupportedConstraintsAreNotUsedTest.kt b/codegen-server/src/test/kotlin/software/amazon/smithy/rust/codegen/server/smithy/ValidateUnsupportedConstraintsAreNotUsedTest.kt index 3c5e62e40fa8d7b867be1bf3df7606ed693728b6..a140322e361d8d6f5d0e627324221790273f7cef 100644 --- a/codegen-server/src/test/kotlin/software/amazon/smithy/rust/codegen/server/smithy/ValidateUnsupportedConstraintsAreNotUsedTest.kt +++ b/codegen-server/src/test/kotlin/software/amazon/smithy/rust/codegen/server/smithy/ValidateUnsupportedConstraintsAreNotUsedTest.kt @@ -259,7 +259,7 @@ internal class ValidateUnsupportedConstraintsAreNotUsedTest { validationResult.shouldAbort shouldBe true validationResult.messages[0].message shouldContain( """ - The `ignoreUnsupportedConstraints` flag in the `codegen` configuration is set to `true`, but it has no + The `ignoreUnsupportedConstraints` flag in the `codegen` configuration is set to `true`, but it has no effect. All the constraint traits used in the model are well-supported, please remove this flag. """.trimIndent().replace("\n", " ") ) diff --git a/rust-runtime/aws-smithy-runtime-api/src/client.rs b/rust-runtime/aws-smithy-runtime-api/src/client.rs index 1c4a97a62bded27ceaba582718764b6c41dc0fdb..5627d8ffc75d150c29f19e8cd7909209f29b0b24 100644 --- a/rust-runtime/aws-smithy-runtime-api/src/client.rs +++ b/rust-runtime/aws-smithy-runtime-api/src/client.rs @@ -24,3 +24,6 @@ pub mod runtime_plugin; /// Smithy auth runtime plugins pub mod auth; + +/// A type to track the number of requests sent by the orchestrator for a given operation. +pub mod request_attempts; diff --git a/rust-runtime/aws-smithy-runtime-api/src/client/interceptors.rs b/rust-runtime/aws-smithy-runtime-api/src/client/interceptors.rs index 1477aa4489df76eec2eda30ff88b2dd39a10018c..12722aef6c6129ce073dee61630c20563cde7e67 100644 --- a/rust-runtime/aws-smithy-runtime-api/src/client/interceptors.rs +++ b/rust-runtime/aws-smithy-runtime-api/src/client/interceptors.rs @@ -612,6 +612,10 @@ impl Deref for SharedInterceptor { pub struct InterceptorRegistrar(Vec); impl InterceptorRegistrar { + /// Register an interceptor with this `InterceptorRegistrar`. + /// + /// When this `InterceptorRegistrar` is passed to an orchestrator, the orchestrator will run the + /// registered interceptor for all the "hooks" that it implements. pub fn register(&mut self, interceptor: SharedInterceptor) { self.0.push(interceptor); } diff --git a/rust-runtime/aws-smithy-runtime-api/src/client/interceptors/context.rs b/rust-runtime/aws-smithy-runtime-api/src/client/interceptors/context.rs index c393f2e766ad52831e864fa144ab7723fb95044e..9b58b816db83efe278985ccbd6f287c3e64e4836 100644 --- a/rust-runtime/aws-smithy-runtime-api/src/client/interceptors/context.rs +++ b/rust-runtime/aws-smithy-runtime-api/src/client/interceptors/context.rs @@ -36,8 +36,7 @@ use aws_smithy_http::result::SdkError; use aws_smithy_types::config_bag::ConfigBag; use aws_smithy_types::type_erasure::{TypeErasedBox, TypeErasedError}; use phase::Phase; -use std::fmt::Debug; -use std::mem; +use std::{fmt, mem}; use tracing::{error, trace}; pub type Input = TypeErasedBox; @@ -56,7 +55,7 @@ type Response = HttpResponse; #[derive(Debug)] pub struct InterceptorContext where - E: Debug, + E: fmt::Debug, { pub(crate) input: Option, pub(crate) output_or_error: Option>>, @@ -84,7 +83,7 @@ impl InterceptorContext { impl InterceptorContext where - E: Debug, + E: fmt::Debug, { /// Decomposes the context into its constituent parts. #[doc(hidden)] @@ -113,7 +112,7 @@ where .. } = self; output_or_error - .expect("output_or_error must always beset before finalize is called.") + .expect("output_or_error must always be set before finalize is called.") .map_err(|error| OrchestratorError::into_sdk_error(error, &phase, response)) } @@ -189,6 +188,7 @@ where /// Advance to the Serialization phase. #[doc(hidden)] pub fn enter_serialization_phase(&mut self) { + trace!("entering \'serialization\' phase"); debug_assert!( self.phase.is_before_serialization(), "called enter_serialization_phase but phase is not before 'serialization'" @@ -199,6 +199,7 @@ where /// Advance to the BeforeTransmit phase. #[doc(hidden)] pub fn enter_before_transmit_phase(&mut self) { + trace!("entering \'before transmit\' phase"); debug_assert!( self.phase.is_serialization(), "called enter_before_transmit_phase but phase is not 'serialization'" @@ -215,13 +216,13 @@ where self.request() .expect("request is set before calling enter_before_transmit_phase"), ); - self.tainted = true; self.phase = Phase::BeforeTransmit; } /// Advance to the Transmit phase. #[doc(hidden)] pub fn enter_transmit_phase(&mut self) { + trace!("entering \'transmit\' phase"); debug_assert!( self.phase.is_before_transmit(), "called enter_transmit_phase but phase is not before transmit" @@ -232,6 +233,7 @@ where /// Advance to the BeforeDeserialization phase. #[doc(hidden)] pub fn enter_before_deserialization_phase(&mut self) { + trace!("entering \'before deserialization\' phase"); debug_assert!( self.phase.is_transmit(), "called enter_before_deserialization_phase but phase is not 'transmit'" @@ -250,6 +252,7 @@ where /// Advance to the Deserialization phase. #[doc(hidden)] pub fn enter_deserialization_phase(&mut self) { + trace!("entering \'deserialization\' phase"); debug_assert!( self.phase.is_before_deserialization(), "called enter_deserialization_phase but phase is not 'before deserialization'" @@ -260,6 +263,7 @@ where /// Advance to the AfterDeserialization phase. #[doc(hidden)] pub fn enter_after_deserialization_phase(&mut self) { + trace!("entering \'after deserialization\' phase"); debug_assert!( self.phase.is_deserialization(), "called enter_after_deserialization_phase but phase is not 'deserialization'" @@ -271,23 +275,45 @@ where self.phase = Phase::AfterDeserialization; } - // Returns false if rewinding isn't possible - pub fn rewind(&mut self, _cfg: &mut ConfigBag) -> bool { - // If before transmit was never touched, then we don't need to rewind - if !self.tainted { - return true; + /// Set the request checkpoint. This should only be called once, right before entering the retry loop. + #[doc(hidden)] + pub fn save_checkpoint(&mut self) { + trace!("saving request checkpoint..."); + self.request_checkpoint = self.request().and_then(try_clone); + match self.request_checkpoint.as_ref() { + Some(_) => trace!("successfully saved request checkpoint"), + None => trace!("failed to save request checkpoint: request body could not be cloned"), + } + } + + /// Returns false if rewinding isn't possible + #[doc(hidden)] + pub fn rewind(&mut self, _cfg: &mut ConfigBag) -> RewindResult { + // If request_checkpoint was never set, but we've already made one attempt, + // then this is not a retryable request + if self.request_checkpoint.is_none() && self.tainted { + return RewindResult::Impossible; } - // If request_checkpoint was never set, then this is not a retryable request - if self.request_checkpoint.is_none() { - return false; + + if !self.tainted { + // The first call to rewind() happens before the request is ever touched, so we don't need + // to clone it then. However, the request must be marked as tainted so that subsequent calls + // to rewind() properly reload the saved request checkpoint. + self.tainted = true; + return RewindResult::Unnecessary; } - // Otherwise, rewind back to the beginning of BeforeTransmit + + // Otherwise, rewind to the saved request checkpoint // TODO(enableNewSmithyRuntime): Also rewind the ConfigBag self.phase = Phase::BeforeTransmit; self.request = try_clone(self.request_checkpoint.as_ref().expect("checked above")); + assert!( + self.request.is_some(), + "if the request wasn't cloneable, then we should have already return from this method." + ); self.response = None; self.output_or_error = None; - true + RewindResult::Occurred } /// Mark this context as failed due to errors during the operation. Any errors already contained @@ -313,6 +339,33 @@ where } } +/// The result of attempting to rewind a request. +#[derive(Debug, PartialEq, Eq, Clone, Copy)] +#[doc(hidden)] +pub enum RewindResult { + /// The request couldn't be rewound because it wasn't cloneable. + Impossible, + /// The request wasn't rewound because it was unnecessary. + Unnecessary, + /// The request was rewound successfully. + Occurred, +} + +impl fmt::Display for RewindResult { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + RewindResult::Impossible => write!( + f, + "The request couldn't be rewound because it wasn't cloneable." + ), + RewindResult::Unnecessary => { + write!(f, "The request wasn't rewound because it was unnecessary.") + } + RewindResult::Occurred => write!(f, "The request was rewound successfully."), + } + } +} + fn try_clone(request: &HttpRequest) -> Option { let cloned_body = request.body().try_clone()?; let mut cloned_request = ::http::Request::builder() @@ -346,7 +399,7 @@ mod tests { "input", context .input() - .map(|i| i.downcast_ref::().unwrap()) + .and_then(|i| i.downcast_ref::()) .unwrap() ); context.input_mut(); @@ -404,7 +457,7 @@ mod tests { "input", context .input() - .map(|i| i.downcast_ref::().unwrap()) + .and_then(|i| i.downcast_ref::()) .unwrap() ); @@ -417,7 +470,8 @@ mod tests { .unwrap(), ); context.enter_before_transmit_phase(); - + context.save_checkpoint(); + assert_eq!(context.rewind(&mut cfg), RewindResult::Unnecessary); // Modify the test header post-checkpoint to simulate modifying the request for signing or a mutating interceptor context.request_mut().unwrap().headers_mut().remove("test"); context.request_mut().unwrap().headers_mut().insert( @@ -437,7 +491,7 @@ mod tests { context.enter_deserialization_phase(); context.set_output_or_error(Err(OrchestratorError::operation(error))); - assert!(context.rewind(&mut cfg)); + assert_eq!(context.rewind(&mut cfg), RewindResult::Occurred); // Now after rewinding, the test header should be its original value assert_eq!( diff --git a/rust-runtime/aws-smithy-runtime-api/src/client/orchestrator/error.rs b/rust-runtime/aws-smithy-runtime-api/src/client/orchestrator/error.rs index 34296dbcacd538907506578f1db1d4d497495efa..56e6b9f4cccaa65b53826afb6669a27254181c25 100644 --- a/rust-runtime/aws-smithy-runtime-api/src/client/orchestrator/error.rs +++ b/rust-runtime/aws-smithy-runtime-api/src/client/orchestrator/error.rs @@ -24,7 +24,8 @@ pub enum OrchestratorError { impl OrchestratorError { /// Create a new `OrchestratorError` from a [`BoxError`]. - pub fn other(err: BoxError) -> Self { + pub fn other(err: impl Into>) -> Self { + let err = err.into(); Self::Other { err } } @@ -130,6 +131,6 @@ where E: Debug + std::error::Error + 'static, { fn from(err: aws_smithy_http::byte_stream::error::Error) -> Self { - Self::other(err.into()) + Self::other(err) } } diff --git a/rust-runtime/aws-smithy-runtime-api/src/client/request_attempts.rs b/rust-runtime/aws-smithy-runtime-api/src/client/request_attempts.rs new file mode 100644 index 0000000000000000000000000000000000000000..3cfe306f10ac62bf2a11cb941daff0927285e27c --- /dev/null +++ b/rust-runtime/aws-smithy-runtime-api/src/client/request_attempts.rs @@ -0,0 +1,26 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +#[derive(Debug, Clone, Copy)] +pub struct RequestAttempts { + attempts: usize, +} + +impl RequestAttempts { + #[cfg(any(feature = "test-util", test))] + pub fn new(attempts: usize) -> Self { + Self { attempts } + } + + pub fn attempts(&self) -> usize { + self.attempts + } +} + +impl From for RequestAttempts { + fn from(attempts: usize) -> Self { + Self { attempts } + } +} diff --git a/rust-runtime/aws-smithy-runtime/Cargo.toml b/rust-runtime/aws-smithy-runtime/Cargo.toml index aadbfcac83d6538e12b4ca955796df55e24fe74b..d1fd93c93b069616262f22b4ad73a8deb77e0f9b 100644 --- a/rust-runtime/aws-smithy-runtime/Cargo.toml +++ b/rust-runtime/aws-smithy-runtime/Cargo.toml @@ -31,6 +31,7 @@ tracing = "0.1.37" [dev-dependencies] aws-smithy-async = { path = "../aws-smithy-async", features = ["rt-tokio", "test-util"] } +aws-smithy-runtime-api = { path = "../aws-smithy-runtime-api", features = ["test-util"] } tokio = { version = "1.25", features = ["macros", "rt", "test-util"] } tracing-subscriber = { version = "0.3.15", features = ["env-filter"] } tracing-test = "0.2.1" diff --git a/rust-runtime/aws-smithy-runtime/external-types.toml b/rust-runtime/aws-smithy-runtime/external-types.toml index 92360e722af0988682f8d44b26f4f84c43016dde..a6398200200254fa5f7fc6414d1d7a2600066b6e 100644 --- a/rust-runtime/aws-smithy-runtime/external-types.toml +++ b/rust-runtime/aws-smithy-runtime/external-types.toml @@ -1,5 +1,6 @@ allowed_external_types = [ "aws_smithy_runtime_api::*", + "aws_smithy_async::*", "aws_smithy_http::*", "aws_smithy_types::*", "aws_smithy_client::erase::DynConnector", diff --git a/rust-runtime/aws-smithy-runtime/src/client.rs b/rust-runtime/aws-smithy-runtime/src/client.rs index 1558db8bfea65979c44a29cf4b2057f4a862ff7c..0c8a1705f7d09e18d98f57a366dccfa127f9b3d2 100644 --- a/rust-runtime/aws-smithy-runtime/src/client.rs +++ b/rust-runtime/aws-smithy-runtime/src/client.rs @@ -32,4 +32,4 @@ pub mod runtime_plugin; pub mod identity; /// Interceptors for Smithy clients. -pub mod interceptor; +pub mod interceptors; diff --git a/rust-runtime/aws-smithy-runtime/src/client/connections/test_connection.rs b/rust-runtime/aws-smithy-runtime/src/client/connections/test_connection.rs index f948daf746084e5ce2939cff4709e6af0ae83113..2ca2db43b12bb26ebafb8702561d0b3fa103946a 100644 --- a/rust-runtime/aws-smithy-runtime/src/client/connections/test_connection.rs +++ b/rust-runtime/aws-smithy-runtime/src/client/connections/test_connection.rs @@ -5,6 +5,7 @@ //! Module with client connectors useful for testing. +use aws_smithy_async::rt::sleep::AsyncSleep; use aws_smithy_http::body::SdkBody; use aws_smithy_http::result::ConnectorError; use aws_smithy_protocol_test::{assert_ok, validate_body, MediaType}; @@ -13,9 +14,9 @@ use aws_smithy_runtime_api::client::orchestrator::{ }; use http::header::{HeaderName, CONTENT_TYPE}; use std::fmt::Debug; -use std::future::ready; use std::ops::Deref; use std::sync::{Arc, Mutex}; +use std::time::Duration; use tokio::sync::oneshot; /// Test Connection to capture a single request @@ -92,7 +93,44 @@ pub fn capture_request( ) } -type ConnectVec = Vec<(HttpRequest, HttpResponse)>; +type ConnectionEvents = Vec; + +#[derive(Debug)] +pub struct ConnectionEvent { + latency: Duration, + req: HttpRequest, + res: HttpResponse, +} + +impl ConnectionEvent { + pub fn new(req: HttpRequest, res: HttpResponse) -> Self { + Self { + res, + req, + latency: Duration::from_secs(0), + } + } + + /// Add simulated latency to this `ConnectionEvent` + pub fn with_latency(mut self, latency: Duration) -> Self { + self.latency = latency; + self + } + + pub fn req(&self) -> &HttpRequest { + &self.req + } + + pub fn res(&self) -> &HttpResponse { + &self.res + } +} + +impl From<(HttpRequest, HttpResponse)> for ConnectionEvent { + fn from((req, res): (HttpRequest, HttpResponse)) -> Self { + Self::new(req, res) + } +} #[derive(Debug)] pub struct ValidateRequest { @@ -101,20 +139,23 @@ pub struct ValidateRequest { } impl ValidateRequest { - pub fn assert_matches(&self, ignore_headers: &[HeaderName]) { + pub fn assert_matches(&self, index: usize, ignore_headers: &[HeaderName]) { let (actual, expected) = (&self.actual, &self.expected); - assert_eq!(actual.uri(), expected.uri()); + assert_eq!( + actual.uri(), + expected.uri(), + "Request #{index} - URI doesn't match expected value" + ); for (name, value) in expected.headers() { if !ignore_headers.contains(name) { let actual_header = actual .headers() .get(name) - .unwrap_or_else(|| panic!("Header {:?} missing", name)); + .unwrap_or_else(|| panic!("Request #{index} - Header {name:?} is missing")); assert_eq!( actual_header.to_str().unwrap(), value.to_str().unwrap(), - "Header mismatch for {:?}", - name + "Request #{index} - Header {name:?} doesn't match expected value", ); } } @@ -132,7 +173,11 @@ impl ValidateRequest { }; match (actual_str, expected_str) { (Ok(actual), Ok(expected)) => assert_ok(validate_body(actual, expected, media_type)), - _ => assert_eq!(actual.body().bytes(), expected.body().bytes()), + _ => assert_eq!( + actual.body().bytes(), + expected.body().bytes(), + "Request #{index} - Body contents didn't match expected value" + ), }; } } @@ -142,28 +187,20 @@ impl ValidateRequest { /// A basic test connection. It will: /// - Respond to requests with a preloaded series of responses /// - Record requests for future examination -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct TestConnection { - data: Arc>, + data: Arc>, requests: Arc>>, -} - -// Need a clone impl that ignores `B` -impl Clone for TestConnection { - fn clone(&self) -> Self { - TestConnection { - data: self.data.clone(), - requests: self.requests.clone(), - } - } + sleep_impl: Arc, } impl TestConnection { - pub fn new(mut data: ConnectVec) -> Self { + pub fn new(mut data: ConnectionEvents, sleep_impl: Arc) -> Self { data.reverse(); TestConnection { data: Arc::new(Mutex::new(data)), requests: Default::default(), + sleep_impl, } } @@ -173,33 +210,40 @@ impl TestConnection { #[track_caller] pub fn assert_requests_match(&self, ignore_headers: &[HeaderName]) { - for req in self.requests().iter() { - req.assert_matches(ignore_headers) + for (i, req) in self.requests().iter().enumerate() { + req.assert_matches(i, ignore_headers) } - let remaining_requests = self.data.lock().unwrap().len(); + let remaining_requests = self.data.lock().unwrap(); + let number_of_remaining_requests = remaining_requests.len(); let actual_requests = self.requests().len(); - assert_eq!( - remaining_requests, 0, - "Expected {} additional requests ({} were made)", - remaining_requests, actual_requests + assert!( + remaining_requests.is_empty(), + "Expected {number_of_remaining_requests} additional requests (only {actual_requests} sent)", ); } } impl Connection for TestConnection { fn call(&self, request: HttpRequest) -> BoxFuture { - // TODO(orchestrator) Validate request - - let res = if let Some((expected, resp)) = self.data.lock().unwrap().pop() { + // TODO(enableNewSmithyRuntime) Validate request + let (res, simulated_latency) = if let Some(event) = self.data.lock().unwrap().pop() { self.requests.lock().unwrap().push(ValidateRequest { - expected, + expected: event.req, actual: request, }); - Ok(resp.map(SdkBody::from)) + + (Ok(event.res.map(SdkBody::from)), event.latency) } else { - Err(ConnectorError::other("No more data".into(), None).into()) + ( + Err(ConnectorError::other("No more data".into(), None).into()), + Duration::from_secs(0), + ) }; - Box::pin(ready(res)) + let sleep = self.sleep_impl.sleep(simulated_latency); + Box::pin(async move { + sleep.await; + res + }) } } diff --git a/rust-runtime/aws-smithy-runtime/src/client/interceptor.rs b/rust-runtime/aws-smithy-runtime/src/client/interceptors.rs similarity index 100% rename from rust-runtime/aws-smithy-runtime/src/client/interceptor.rs rename to rust-runtime/aws-smithy-runtime/src/client/interceptors.rs diff --git a/rust-runtime/aws-smithy-runtime/src/client/orchestrator.rs b/rust-runtime/aws-smithy-runtime/src/client/orchestrator.rs index 42d23e2bcdc6db5318ebcf7511276644fa9cd70a..12abb526ea94472722d58105b319edcd784a9aa1 100644 --- a/rust-runtime/aws-smithy-runtime/src/client/orchestrator.rs +++ b/rust-runtime/aws-smithy-runtime/src/client/orchestrator.rs @@ -10,16 +10,17 @@ use crate::client::timeout::{MaybeTimeout, ProvideMaybeTimeoutConfig, TimeoutKin use aws_smithy_http::body::SdkBody; use aws_smithy_http::byte_stream::ByteStream; use aws_smithy_http::result::SdkError; -use aws_smithy_runtime_api::client::interceptors::context::{Error, Input, Output}; +use aws_smithy_runtime_api::client::interceptors::context::{Error, Input, Output, RewindResult}; use aws_smithy_runtime_api::client::interceptors::{InterceptorContext, Interceptors}; use aws_smithy_runtime_api::client::orchestrator::{ - BoxError, ConfigBagAccessors, HttpResponse, LoadedRequestBody, + BoxError, ConfigBagAccessors, HttpResponse, LoadedRequestBody, OrchestratorError, }; +use aws_smithy_runtime_api::client::request_attempts::RequestAttempts; use aws_smithy_runtime_api::client::retries::ShouldAttempt; use aws_smithy_runtime_api::client::runtime_plugin::RuntimePlugins; use aws_smithy_types::config_bag::ConfigBag; use std::mem; -use tracing::{debug_span, Instrument}; +use tracing::{debug, debug_span, instrument, trace, Instrument}; mod auth; /// Defines types that implement a trait for endpoint resolution @@ -29,6 +30,7 @@ pub mod interceptors; macro_rules! halt { ([$ctx:ident] => $err:expr) => {{ + trace!("encountered orchestrator error, continuing"); $ctx.fail($err.into()); return; }}; @@ -46,6 +48,7 @@ macro_rules! halt_on_err { macro_rules! continue_on_err { ([$ctx:ident] => $expr:expr) => { if let Err(err) = $expr { + trace!("encountered orchestrator error, continuing"); $ctx.fail(err.into()); } }; @@ -82,6 +85,7 @@ pub async fn invoke( /// Apply configuration is responsible for apply runtime plugins to the config bag, as well as running /// `read_before_execution` interceptors. If a failure occurs due to config construction, `invoke` /// will raise it to the user. If an interceptor fails, then `invoke` +#[instrument(skip_all)] fn apply_configuration( ctx: &mut InterceptorContext, cfg: &mut ConfigBag, @@ -97,6 +101,7 @@ fn apply_configuration( Ok(()) } +#[instrument(skip_all)] async fn try_op(ctx: &mut InterceptorContext, cfg: &mut ConfigBag, interceptors: &Interceptors) { // Before serialization halt_on_err!([ctx] => interceptors.read_before_serialization(ctx, cfg)); @@ -129,10 +134,10 @@ async fn try_op(ctx: &mut InterceptorContext, cfg: &mut ConfigBag, interceptors: let retry_strategy = cfg.retry_strategy(); match retry_strategy.should_attempt_initial_request(cfg) { // Yes, let's make a request - Ok(ShouldAttempt::Yes) => { /* Keep going */ } + Ok(ShouldAttempt::Yes) => trace!("retry strategy has OKed initial request"), // No, this request shouldn't be sent Ok(ShouldAttempt::No) => { - let err: BoxError = "The retry strategy indicates that an initial request shouldn't be made, but it did specify why.".into(); + let err: BoxError = "The retry strategy indicates that an initial request shouldn't be made, but it didn't specify why.".into(); halt!([ctx] => err); } // No, we shouldn't make a request because... @@ -142,16 +147,36 @@ async fn try_op(ctx: &mut InterceptorContext, cfg: &mut ConfigBag, interceptors: } } - loop { + // Save a request checkpoint before we make the request. This will allow us to "rewind" + // the request in the case of retry attempts. + ctx.save_checkpoint(); + for i in 0usize.. { + trace!("beginning attempt #{i}"); + // Break from the loop if we can't rewind the request's state. This will always succeed the + // first time, but will fail on subsequent iterations if the request body wasn't retryable. + match ctx.rewind(cfg) { + r @ RewindResult::Impossible => { + debug!("{r}"); + break; + } + r @ RewindResult::Occurred => debug!("{r}"), + r @ RewindResult::Unnecessary => debug!("{r}"), + } + // Track which attempt we're currently on. + cfg.put::(i.into()); let attempt_timeout_config = cfg.maybe_timeout_config(TimeoutKind::OperationAttempt); - async { + let maybe_timeout = async { try_attempt(ctx, cfg, interceptors).await; finally_attempt(ctx, cfg, interceptors).await; Result::<_, SdkError>::Ok(()) } .maybe_timeout_with_config(attempt_timeout_config) .await - .expect("These are infallible; The retry strategy will decide whether to stop or not."); + .map_err(OrchestratorError::other); + + // We continue when encountering a timeout error. The retry classifier will decide what to do with it. + continue_on_err!([ctx] => maybe_timeout); + let retry_strategy = cfg.retry_strategy(); let should_attempt = halt_on_err!([ctx] => retry_strategy.should_attempt_retry(ctx, cfg)); match should_attempt { @@ -159,16 +184,21 @@ async fn try_op(ctx: &mut InterceptorContext, cfg: &mut ConfigBag, interceptors: ShouldAttempt::Yes => continue, // No, this request shouldn't be retried ShouldAttempt::No => { + trace!("this error is not retryable, exiting attempt loop"); break; } - ShouldAttempt::YesAfterDelay(_delay) => { - // TODO(enableNewSmithyRuntime): implement retries with explicit delay - todo!("implement retries with an explicit delay.") + ShouldAttempt::YesAfterDelay(delay) => { + let sleep_impl = halt_on_err!([ctx] => cfg.sleep_impl().ok_or(OrchestratorError::other( + "The retry strategy requested a delay before sending the next request, but no 'async sleep' implementation was set." + ))); + sleep_impl.sleep(delay).await; + continue; } } } } +#[instrument(skip_all)] async fn try_attempt( ctx: &mut InterceptorContext, cfg: &mut ConfigBag, @@ -221,6 +251,7 @@ async fn try_attempt( halt_on_err!([ctx] => interceptors.read_after_deserialization(ctx, cfg)); } +#[instrument(skip_all)] async fn finally_attempt( ctx: &mut InterceptorContext, cfg: &mut ConfigBag, @@ -230,6 +261,7 @@ async fn finally_attempt( continue_on_err!([ctx] => interceptors.read_after_attempt(ctx, cfg)); } +#[instrument(skip_all)] async fn finally_op( ctx: &mut InterceptorContext, cfg: &mut ConfigBag, diff --git a/rust-runtime/aws-smithy-runtime/src/client/orchestrator/interceptors.rs b/rust-runtime/aws-smithy-runtime/src/client/orchestrator/interceptors.rs index 081b9b3bd74eda7317a11eb6b8ff8e09b2fcba54..b9de2daa37bde2de6c7489ee5f0be7c461d03ba3 100644 --- a/rust-runtime/aws-smithy-runtime/src/client/orchestrator/interceptors.rs +++ b/rust-runtime/aws-smithy-runtime/src/client/orchestrator/interceptors.rs @@ -3,8 +3,6 @@ * SPDX-License-Identifier: Apache-2.0 */ -mod request_attempts; mod service_clock_skew; -pub use request_attempts::{RequestAttempts, RequestAttemptsInterceptor}; pub use service_clock_skew::{ServiceClockSkew, ServiceClockSkewInterceptor}; diff --git a/rust-runtime/aws-smithy-runtime/src/client/orchestrator/interceptors/request_attempts.rs b/rust-runtime/aws-smithy-runtime/src/client/orchestrator/interceptors/request_attempts.rs deleted file mode 100644 index c42971cb21b0662e45ad2dbf8d6734b37026a51e..0000000000000000000000000000000000000000 --- a/rust-runtime/aws-smithy-runtime/src/client/orchestrator/interceptors/request_attempts.rs +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * SPDX-License-Identifier: Apache-2.0 - */ - -use aws_smithy_runtime_api::client::interceptors::{ - BeforeTransmitInterceptorContextMut, BoxError, Interceptor, -}; -use aws_smithy_types::config_bag::ConfigBag; - -#[derive(Debug, Clone, Default)] -#[non_exhaustive] -pub struct RequestAttempts { - attempts: u32, -} - -impl RequestAttempts { - pub fn new() -> Self { - Self::default() - } - - // There is no legitimate reason to set this unless you're testing things. - // Therefore, this is only available for tests. - #[cfg(test)] - pub fn new_with_attempts(attempts: u32) -> Self { - Self { attempts } - } - - pub fn attempts(&self) -> u32 { - self.attempts - } - - fn increment(mut self) -> Self { - self.attempts += 1; - self - } -} - -#[derive(Debug, Default)] -#[non_exhaustive] -pub struct RequestAttemptsInterceptor {} - -impl RequestAttemptsInterceptor { - pub fn new() -> Self { - Self::default() - } -} - -impl Interceptor for RequestAttemptsInterceptor { - fn modify_before_retry_loop( - &self, - _ctx: &mut BeforeTransmitInterceptorContextMut<'_>, - cfg: &mut ConfigBag, - ) -> Result<(), BoxError> { - cfg.put(RequestAttempts::new()); - Ok(()) - } - - fn modify_before_transmit( - &self, - _ctx: &mut BeforeTransmitInterceptorContextMut<'_>, - cfg: &mut ConfigBag, - ) -> Result<(), BoxError> { - if let Some(request_attempts) = cfg.get::().cloned() { - cfg.put(request_attempts.increment()); - } - Ok(()) - } -} diff --git a/rust-runtime/aws-smithy-runtime/src/client/retries/strategy/fixed_delay.rs b/rust-runtime/aws-smithy-runtime/src/client/retries/strategy/fixed_delay.rs index 243c0a329817cae9678e289ad8fd0c8a138e18f6..66db997eaef2970ffb8c5e3afeddc99e27907685 100644 --- a/rust-runtime/aws-smithy-runtime/src/client/retries/strategy/fixed_delay.rs +++ b/rust-runtime/aws-smithy-runtime/src/client/retries/strategy/fixed_delay.rs @@ -3,9 +3,9 @@ * SPDX-License-Identifier: Apache-2.0 */ -use crate::client::orchestrator::interceptors::RequestAttempts; use aws_smithy_runtime_api::client::interceptors::InterceptorContext; use aws_smithy_runtime_api::client::orchestrator::BoxError; +use aws_smithy_runtime_api::client::request_attempts::RequestAttempts; use aws_smithy_runtime_api::client::retries::{ ClassifyRetry, RetryClassifiers, RetryReason, RetryStrategy, ShouldAttempt, }; @@ -56,7 +56,7 @@ impl RetryStrategy for FixedDelayRetryStrategy { let request_attempts: &RequestAttempts = cfg .get() .expect("at least one request attempt is made before any retry is attempted"); - if request_attempts.attempts() >= self.max_attempts { + if request_attempts.attempts() >= self.max_attempts as usize { tracing::trace!( attempts = request_attempts.attempts(), max_attempts = self.max_attempts, diff --git a/rust-runtime/aws-smithy-runtime/src/client/test_util.rs b/rust-runtime/aws-smithy-runtime/src/client/test_util.rs index 71afb382e4c046545556db11086c39daaf875aa6..101c971eff38987639cc8ae6658fe7836af263f2 100644 --- a/rust-runtime/aws-smithy-runtime/src/client/test_util.rs +++ b/rust-runtime/aws-smithy-runtime/src/client/test_util.rs @@ -5,5 +5,5 @@ pub mod connector; pub mod deserializer; -pub mod interceptor; +pub mod interceptors; pub mod serializer; diff --git a/rust-runtime/aws-smithy-runtime/src/client/test_util/interceptor.rs b/rust-runtime/aws-smithy-runtime/src/client/test_util/interceptors.rs similarity index 94% rename from rust-runtime/aws-smithy-runtime/src/client/test_util/interceptor.rs rename to rust-runtime/aws-smithy-runtime/src/client/test_util/interceptors.rs index d8d8c6ec32af9492e9df763efb2fb1f7de3d27dc..07681ba274aa73749c6059dbda25f67961cfd42c 100644 --- a/rust-runtime/aws-smithy-runtime/src/client/test_util/interceptor.rs +++ b/rust-runtime/aws-smithy-runtime/src/client/test_util/interceptors.rs @@ -61,12 +61,11 @@ mod tests { ctx.enter_before_transmit_phase(); let mut ctx = Into::into(&mut ctx); let request_time = UNIX_EPOCH + Duration::from_secs(1624036048); - let interceptor = TestParamsSetterInterceptor::new({ - let request_time = request_time.clone(); + let interceptor = TestParamsSetterInterceptor::new( move |_: &mut BeforeTransmitInterceptorContextMut<'_>, cfg: &mut ConfigBag| { cfg.set_request_time(request_time); - } - }); + }, + ); interceptor .modify_before_signing(&mut ctx, &mut cfg) .unwrap(); diff --git a/rust-runtime/aws-smithy-types/Cargo.toml b/rust-runtime/aws-smithy-types/Cargo.toml index 7ebd3ece231e0192759552d9106c69ce8eed234b..aa2462d6d0134c101d7b3dc26df7c2b0bbe7575a 100644 --- a/rust-runtime/aws-smithy-types/Cargo.toml +++ b/rust-runtime/aws-smithy-types/Cargo.toml @@ -7,6 +7,9 @@ edition = "2021" license = "Apache-2.0" repository = "https://github.com/awslabs/smithy-rs" +[features] +test-util = [] + [dependencies] itoa = "1.0.0" num-integer = "0.1.44" diff --git a/rust-runtime/aws-smithy-types/src/config_bag.rs b/rust-runtime/aws-smithy-types/src/config_bag.rs index 0100a99f977db405ec8ee945a4d781147b22500d..f307d927b847219a6a5df2dacd4888beaf389182 100644 --- a/rust-runtime/aws-smithy-types/src/config_bag.rs +++ b/rust-runtime/aws-smithy-types/src/config_bag.rs @@ -275,7 +275,9 @@ impl FrozenConfigBag { } impl ConfigBag { - /// Creates a new config bag "base". Configuration may then be "layered" onto the base by calling + /// Create a new config bag "base". + /// + /// Configuration may then be "layered" onto the base by calling /// [`ConfigBag::store_put`], [`ConfigBag::store_or_unset`], [`ConfigBag::store_append`]. Layers /// of configuration may then be "frozen" (made immutable) by calling [`ConfigBag::freeze`]. pub fn base() -> Self { @@ -495,11 +497,13 @@ pub struct ItemIter<'a, T> { inner: BagIter<'a>, t: PhantomData, } + impl<'a, T> Debug for ItemIter<'a, T> { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!(f, "ItemIter") } } + impl<'a, T: 'a> Iterator for ItemIter<'a, T> where T: Store, diff --git a/rust-runtime/aws-smithy-types/src/date_time/mod.rs b/rust-runtime/aws-smithy-types/src/date_time/mod.rs index fd9555b5e60dd270c46844950199066247345247..84ac29799b09e591c46f7af0a027f0a77622b74a 100644 --- a/rust-runtime/aws-smithy-types/src/date_time/mod.rs +++ b/rust-runtime/aws-smithy-types/src/date_time/mod.rs @@ -176,6 +176,12 @@ impl DateTime { self.seconds } + /// Set the seconds component of this `DateTime`. + pub fn set_seconds(&mut self, seconds: i64) -> &mut Self { + self.seconds = seconds; + self + } + /// Returns the sub-second nanos component of the `DateTime`. /// /// _Note: this does not include the number of seconds since the epoch._ @@ -183,6 +189,12 @@ impl DateTime { self.subsecond_nanos } + /// Set the "sub-second" nanoseconds of this `DateTime`. + pub fn set_subsec_nanos(&mut self, subsec_nanos: u32) -> &mut Self { + self.subsecond_nanos = subsec_nanos; + self + } + /// Converts the `DateTime` to the number of milliseconds since the Unix epoch. /// /// This is fallible since `DateTime` holds more precision than an `i64`, and will diff --git a/rust-runtime/aws-smithy-types/src/type_erasure.rs b/rust-runtime/aws-smithy-types/src/type_erasure.rs index b27d9df3475ce117141f9cfb8ef9b4cea8e1f140..99025d461789cbc3ef3c24427a83cd0e85dd0027 100644 --- a/rust-runtime/aws-smithy-types/src/type_erasure.rs +++ b/rust-runtime/aws-smithy-types/src/type_erasure.rs @@ -106,6 +106,17 @@ pub struct TypeErasedBox { >, } +#[cfg(feature = "test-util")] +impl TypeErasedBox { + /// Often, when testing the orchestrator or its components, it's necessary to provide a + /// `TypeErasedBox` to serve as an `Input` for `invoke`. In cases where the type won't actually + /// be accessed during testing, use this method to generate a `TypeErasedBox` that makes it + /// clear that "for the purpose of this test, the `Input` doesn't matter." + pub fn doesnt_matter() -> Self { + Self::new("doesn't matter") + } +} + impl fmt::Debug for TypeErasedBox { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.write_str("TypeErasedBox:")?; @@ -117,7 +128,7 @@ impl TypeErasedBox { /// Create a new `TypeErasedBox` from `value` of type `T` pub fn new(value: T) -> Self { let debug = |value: &Box, f: &mut fmt::Formatter<'_>| { - fmt::Debug::fmt(value.downcast_ref::().expect("typechecked"), f) + fmt::Debug::fmt(value.downcast_ref::().expect("type-checked"), f) }; Self { field: Box::new(value),