Unverified Commit 3b95b970 authored by ysaito1001's avatar ysaito1001 Committed by GitHub
Browse files

Port customizable operation to orchestrator (#2706)

## Motivation and Context
Port [Customizable
Operation](https://github.com/awslabs/smithy-rs/pull/1647) to
orchestrator

## Description
This PR implements `CustomizableOperation` in the orchestrator. Just
like the counterpart in the middleware, it is created when the
`customize` method (in the orchestrator mode) on a fluent builder is
called. The `customize` method in the orchestrator could technically be
made a synchronous method because there is no need to create an
operation, which requires `async`, therefore making the `customize`
method in the middleware `async`. However, during the transition from
the middleware to the orchestrator, the integration tests
([example](https://github.com/awslabs/smithy-rs/blob/31c152d9af53afb9a5e6edf9df3def57931b9c1e/aws/sdk/integration-tests/s3/tests/signing-it.rs#L36)

)
need to be able to run in both modes. For this reason, the `customize`
method in the orchestrator is temporarily marked as `async`.

Regarding methods defined on the new `CustomizableOperation`, they
include `mutate_request` and `map_request` from the counterpart in the
middleware. However, it did not port `map_operation` because there is no
operation to map on. Most use cases for `map_operation` is put things in
a property bag. The new `CustomizableOperation` provides an
`interceptor` method to accomplish the same, i.e putting things in a
config bag.

Finally, for integration tests to run in both modes, the code gen emits
the implementation of the `customize` method differently depending on
the active Smithy runtime mode, similar to what the implementation of
`send` method does.

## Testing
Added one `sra-test` for mutating a request.

----

_By submitting this pull request, I confirm that you can use, modify,
copy, and redistribute this contribution, under the terms of your
choice._

---------

Co-authored-by: default avatarYuki Saito <awsaito@amazon.com>
Co-authored-by: default avatarJohn DiSanti <jdisanti@amazon.com>
parent 33e1a67b
Loading
Loading
Loading
Loading
+52 −6
Original line number Diff line number Diff line
@@ -40,11 +40,12 @@ async fn operation_interceptor_test() {
    let resp = dbg!(
        client
            .list_objects_v2()
            .config_override(
                aws_sdk_s3::Config::builder().interceptor(util::TestUserAgentInterceptor)
            )
            .bucket("test-bucket")
            .prefix("prefix~")
            .customize()
            .await
            .unwrap()
            .interceptor(util::TestUserAgentInterceptor)
            .send_orchestrator_with_plugin(Some(fixup))
            .await
    );
@@ -106,11 +107,56 @@ async fn interceptor_priority() {
    let resp = dbg!(
        client
            .list_objects_v2()
            .config_override(aws_sdk_s3::Config::builder().interceptor(
                RequestTimeAdvanceInterceptor(Duration::from_secs(1624036048))
            ))
            .bucket("test-bucket")
            .prefix("prefix~")
            .customize()
            .await
            .unwrap()
            .interceptor(RequestTimeAdvanceInterceptor(Duration::from_secs(
                1624036048
            )))
            .send_orchestrator_with_plugin(Some(fixup))
            .await
    );
    let resp = resp.expect("valid e2e test");
    assert_eq!(resp.name(), Some("test-bucket"));
    conn.full_validate(MediaType::Xml).await.expect("success")
}

#[tokio::test]
async fn set_test_user_agent_through_request_mutation() {
    let conn = dvr::ReplayingConnection::from_file(LIST_BUCKETS_PATH).unwrap();

    let config = aws_sdk_s3::Config::builder()
        .credentials_provider(Credentials::for_tests())
        .region(Region::new("us-east-1"))
        .http_connector(DynConnector::new(conn.clone()))
        .build();
    let client = Client::from_conf(config);
    let fixup = util::FixupPlugin {
        timestamp: UNIX_EPOCH + Duration::from_secs(1624036048),
    };

    let resp = dbg!(
        client
            .list_objects_v2()
            .bucket("test-bucket")
            .prefix("prefix~")
            .customize()
            .await
            .unwrap()
            .mutate_request(|request| {
                request.headers_mut()
                    .insert(
                        http::HeaderName::from_static("user-agent"),
                        http::HeaderValue::from_str("aws-sdk-rust/0.123.test os/windows/XPSP3 lang/rust/1.50.0").unwrap(),
                    );
                request.headers_mut()
                    .insert(
                        http::HeaderName::from_static("x-amz-user-agent"),
                        http::HeaderValue::from_str("aws-sdk-rust/0.123.test api/test-service/0.123 os/windows/XPSP3 lang/rust/1.50.0").unwrap(),
                    );
            })
            .send_orchestrator_with_plugin(Some(fixup))
            .await
    );
+4 −3
Original line number Diff line number Diff line
@@ -68,9 +68,12 @@ async fn three_retries_and_then_success() {
    let resp = dbg!(
        client
            .list_objects_v2()
            .config_override(aws_sdk_s3::Config::builder().force_path_style(false))
            .bucket("test-bucket")
            .prefix("prefix~")
            .customize()
            .await
            .unwrap()
            .config_override(aws_sdk_s3::Config::builder().force_path_style(false))
            .send_orchestrator_with_plugin(Some(fixup))
            .await
    );
@@ -152,7 +155,6 @@ async fn three_retries_and_then_success() {
//     let resp = dbg!(
//         client
//             .list_objects_v2()
//             .config_override(aws_sdk_s3::Config::builder().force_path_style(false))
//             .bucket("test-bucket")
//             .prefix("prefix~")
//             .send_v2_with_plugin(Some(fixup))
@@ -244,7 +246,6 @@ async fn three_retries_and_then_success() {
//     let resp = dbg!(
//         client
//             .list_objects_v2()
//             .config_override(aws_sdk_s3::Config::builder().force_path_style(false))
//             .bucket("test-bucket")
//             .prefix("prefix~")
//             .send_v2_with_plugin(Some(fixup))
+142 −1
Original line number Diff line number Diff line
@@ -5,6 +5,7 @@

package software.amazon.smithy.rust.codegen.client.smithy.generators.client

import software.amazon.smithy.model.shapes.OperationShape
import software.amazon.smithy.rust.codegen.client.smithy.ClientCodegenContext
import software.amazon.smithy.rust.codegen.client.smithy.ClientRustModule
import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency
@@ -16,13 +17,14 @@ import software.amazon.smithy.rust.codegen.core.smithy.RuntimeConfig
import software.amazon.smithy.rust.codegen.core.smithy.RuntimeType
import software.amazon.smithy.rust.codegen.core.smithy.RuntimeType.Companion.preludeScope
import software.amazon.smithy.rust.codegen.core.smithy.RustCrate
import software.amazon.smithy.rust.codegen.core.util.outputShape

/**
 * Generates the code required to add the `.customize()` function to the
 * fluent client builders.
 */
class CustomizableOperationGenerator(
    codegenContext: ClientCodegenContext,
    private val codegenContext: ClientCodegenContext,
    private val generics: FluentClientGenerics,
) {
    private val runtimeConfig = codegenContext.runtimeConfig
@@ -126,6 +128,145 @@ class CustomizableOperationGenerator(
            *codegenScope,
        )
    }

    fun renderForOrchestrator(writer: RustWriter, operation: OperationShape) {
        val symbolProvider = codegenContext.symbolProvider
        val model = codegenContext.model

        val builderName = operation.fluentBuilderType(symbolProvider).name
        val outputType = symbolProvider.toSymbol(operation.outputShape(model))
        val errorType = symbolProvider.symbolForOperationError(operation)

        val codegenScope = arrayOf(
            *preludeScope,
            "HttpResponse" to RuntimeType.smithyRuntimeApi(runtimeConfig)
                .resolve("client::orchestrator::HttpResponse"),
            "Interceptor" to RuntimeType.smithyRuntimeApi(runtimeConfig)
                .resolve("client::interceptors::Interceptor"),
            "MapRequestInterceptor" to RuntimeType.smithyRuntime(runtimeConfig)
                .resolve("client::interceptor::MapRequestInterceptor"),
            "MutateRequestInterceptor" to RuntimeType.smithyRuntime(runtimeConfig)
                .resolve("client::interceptor::MutateRequestInterceptor"),
            "OperationError" to errorType,
            "OperationOutput" to outputType,
            "RuntimePlugin" to RuntimeType.runtimePlugin(runtimeConfig),
            "SdkBody" to RuntimeType.sdkBody(runtimeConfig),
            "SdkError" to RuntimeType.sdkError(runtimeConfig),
            "SharedInterceptor" to RuntimeType.smithyRuntimeApi(runtimeConfig)
                .resolve("client::interceptors::SharedInterceptor"),
        )

        writer.rustTemplate(
            """
            /// A wrapper type for [`$builderName`]($builderName) that allows for configuring a single
            /// operation invocation.
            pub struct CustomizableOperation {
                pub(crate) fluent_builder: $builderName,
                pub(crate) config_override: #{Option}<crate::config::Builder>,
                pub(crate) interceptors: Vec<#{SharedInterceptor}>,
            }

            impl CustomizableOperation {
                /// Adds an [`Interceptor`](#{Interceptor}) that runs at specific stages of the request execution pipeline.
                ///
                /// Note that interceptors can also be added to `CustomizableOperation` by `config_override`,
                /// `map_request`, and `mutate_request` (the last two are implemented via interceptors under the hood).
                /// The order in which those user-specified operation interceptors are invoked should not be relied upon
                /// as it is an implementation detail.
                pub fn interceptor(mut self, interceptor: impl #{Interceptor} + #{Send} + #{Sync} + 'static) -> Self {
                    self.interceptors.push(#{SharedInterceptor}::new(interceptor));
                    self
                }

                /// Allows for customizing the operation's request.
                pub fn map_request<F, E>(mut self, f: F) -> Self
                where
                    F: #{Fn}(&mut http::Request<#{SdkBody}>) -> #{Result}<(), E>
                        + #{Send}
                        + #{Sync}
                        + 'static,
                    E: ::std::error::Error + #{Send} + #{Sync} + 'static,
                {
                    self.interceptors.push(
                        #{SharedInterceptor}::new(
                            #{MapRequestInterceptor}::new(f),
                        ),
                    );
                    self
                }

                /// Convenience for `map_request` where infallible direct mutation of request is acceptable.
                pub fn mutate_request<F>(mut self, f: F) -> Self
                where
                    F: #{Fn}(&mut http::Request<#{SdkBody}>) + #{Send} + #{Sync} + 'static,
                {
                    self.interceptors.push(
                        #{SharedInterceptor}::new(
                            #{MutateRequestInterceptor}::new(f),
                        ),
                    );
                    self
                }

                /// Overrides config for a single operation invocation.
                ///
                /// `config_override` is applied to the operation configuration level.
                /// The fields in the builder that are `Some` override those applied to the service
                /// configuration level. For instance,
                ///
                /// Config A     overridden by    Config B          ==        Config C
                /// field_1: None,                field_1: Some(v2),          field_1: Some(v2),
                /// field_2: Some(v1),            field_2: Some(v2),          field_2: Some(v2),
                /// field_3: Some(v1),            field_3: None,              field_3: Some(v1),
                pub fn config_override(
                    mut self,
                    config_override: impl #{Into}<crate::config::Builder>,
                ) -> Self {
                    self.config_override = Some(config_override.into());
                    self
                }

                /// Sends the request and returns the response.
                pub async fn send(
                    self
                ) -> #{Result}<
                    #{OperationOutput},
                    #{SdkError}<
                        #{OperationError},
                        #{HttpResponse}
                    >
                > {
                    self.send_orchestrator_with_plugin(#{Option}::<#{Box}<dyn #{RuntimePlugin} + #{Send} + #{Sync}>>::None)
                        .await
                }

                ##[doc(hidden)]
                // TODO(enableNewSmithyRuntime): Delete when unused
                /// Equivalent to [`Self::send`] but adds a final runtime plugin to shim missing behavior
                pub async fn send_orchestrator_with_plugin(
                    self,
                    final_plugin: #{Option}<impl #{RuntimePlugin} + #{Send} + #{Sync} + 'static>
                ) -> #{Result}<#{OperationOutput}, #{SdkError}<#{OperationError}, #{HttpResponse}>> {
                    let mut config_override = if let Some(config_override) = self.config_override {
                        config_override
                    } else {
                        crate::config::Builder::new()
                    };

                    self.interceptors.into_iter().for_each(|interceptor| {
                        config_override.add_interceptor(interceptor);
                    });

                    self.fluent_builder
                        .config_override(config_override)
                        .send_orchestrator_with_plugin(final_plugin)
                        .await
                }
            }
            """,
            *codegenScope,
        )
    }
}

fun renderCustomizableOperationSend(runtimeConfig: RuntimeConfig, generics: FluentClientGenerics, writer: RustWriter) {
+38 −27
Original line number Diff line number Diff line
@@ -84,13 +84,17 @@ class FluentClientGenerator(
    fun render(crate: RustCrate) {
        renderFluentClient(crate)

        val customizableOperationGenerator = CustomizableOperationGenerator(codegenContext, generics)
        operations.forEach { operation ->
            crate.withModule(symbolProvider.moduleForBuilder(operation)) {
                renderFluentBuilder(operation)
                if (codegenContext.smithyRuntimeMode.generateOrchestrator) {
                    customizableOperationGenerator.renderForOrchestrator(this, operation)
                }
            }
        }

        CustomizableOperationGenerator(codegenContext, generics).render(crate)
        customizableOperationGenerator.render(crate)
    }

    private fun renderFluentClient(crate: RustCrate) {
@@ -307,9 +311,9 @@ class FluentClientGenerator(
            )
            rustTemplate(
                """
                /// Consume this builder, creating a customizable operation that can be modified before being
                /// sent. The operation's inner [http::Request] can be modified as well.
                pub async fn customize(self) -> #{Result}<
                // This function will go away in the near future. Do not rely on it.
                ##[doc(hidden)]
                pub async fn customize_middleware(self) -> #{Result}<
                    #{CustomizableOperation}#{customizable_op_type_params:W},
                    #{SdkError}<#{OperationError}>
                > #{send_bounds:W} {
@@ -349,6 +353,15 @@ class FluentClientGenerator(
                    #{send_bounds:W} {
                        self.send_middleware().await
                    }

                    /// Consumes this builder, creating a customizable operation that can be modified before being
                    /// sent. The operation's inner [http::Request] can be modified as well.
                    pub async fn customize(self) -> #{Result}<
                        #{CustomizableOperation}#{customizable_op_type_params:W},
                        #{SdkError}<#{OperationError}>
                    > #{send_bounds:W} {
                        self.customize_middleware().await
                    }
                    """,
                    *middlewareScope,
                )
@@ -357,6 +370,8 @@ class FluentClientGenerator(
            if (smithyRuntimeMode.generateOrchestrator) {
                val orchestratorScope = arrayOf(
                    *preludeScope,
                    "CustomizableOperation" to symbolProvider.moduleForBuilder(operation).toType()
                        .resolve("CustomizableOperation"),
                    "HttpResponse" to RuntimeType.smithyRuntimeApi(runtimeConfig)
                        .resolve("client::orchestrator::HttpResponse"),
                    "OperationError" to errorType,
@@ -382,10 +397,10 @@ class FluentClientGenerator(
                    pub async fn send_orchestrator_with_plugin(self, final_plugin: #{Option}<impl #{RuntimePlugin} + #{Send} + #{Sync} + 'static>) -> #{Result}<#{OperationOutput}, #{SdkError}<#{OperationError}, #{HttpResponse}>> {
                        let mut runtime_plugins = #{RuntimePlugins}::new()
                            .with_client_plugin(crate::config::ServiceRuntimePlugin::new(self.handle.clone()));
                        runtime_plugins = runtime_plugins.with_operation_plugin(#{Operation}::new());
                        if let Some(config_override) = self.config_override {
                            runtime_plugins = runtime_plugins.with_operation_plugin(config_override);
                        }
                        runtime_plugins = runtime_plugins.with_operation_plugin(#{Operation}::new());
                        if let Some(final_plugin) = final_plugin {
                            runtime_plugins = runtime_plugins.with_client_plugin(final_plugin);
                        }
@@ -402,6 +417,12 @@ class FluentClientGenerator(
                            })?;
                        #{Ok}(#{TypedBox}::<#{OperationOutput}>::assume_from(output).expect("correct output type").unwrap())
                    }

                    ##[doc(hidden)]
                    // TODO(enableNewSmithyRuntime): Remove `async` once we switch to orchestrator
                    pub async fn customize_orchestrator(self) -> #{CustomizableOperation} {
                        #{CustomizableOperation} { fluent_builder: self, config_override: None, interceptors: vec![] }
                    }
                    """,
                    *orchestratorScope,
                )
@@ -419,6 +440,16 @@ class FluentClientGenerator(
                        pub async fn send(self) -> #{Result}<#{OperationOutput}, #{SdkError}<#{OperationError}, #{HttpResponse}>> {
                            self.send_orchestrator().await
                        }

                        /// Consumes this builder, creating a customizable operation that can be modified before being
                        /// sent.
                        // TODO(enableNewSmithyRuntime): Remove `async` and `Result` once we switch to orchestrator
                        pub async fn customize(self) -> #{Result}<
                            #{CustomizableOperation},
                            #{SdkError}<#{OperationError}>
                        > {
                            #{Ok}(self.customize_orchestrator().await)
                        }
                        """,
                        *orchestratorScope,
                    )
@@ -426,17 +457,7 @@ class FluentClientGenerator(

                rustTemplate(
                    """
                    /// Sets the `config_override` for the builder.
                    ///
                    /// `config_override` is applied to the operation configuration level.
                    /// The fields in the builder that are `Some` override those applied to the service
                    /// configuration level. For instance,
                    ///
                    /// Config A     overridden by    Config B          ==        Config C
                    /// field_1: None,                field_1: Some(v2),          field_1: Some(v2),
                    /// field_2: Some(v1),            field_2: Some(v2),          field_2: Some(v2),
                    /// field_3: Some(v1),            field_3: None,              field_3: Some(v1),
                    pub fn config_override(
                    pub(crate) fn config_override(
                        mut self,
                        config_override: impl Into<crate::config::Builder>,
                    ) -> Self {
@@ -444,17 +465,7 @@ class FluentClientGenerator(
                        self
                    }

                    /// Sets the `config_override` for the builder.
                    ///
                    /// `config_override` is applied to the operation configuration level.
                    /// The fields in the builder that are `Some` override those applied to the service
                    /// configuration level. For instance,
                    ///
                    /// Config A     overridden by    Config B          ==        Config C
                    /// field_1: None,                field_1: Some(v2),          field_1: Some(v2),
                    /// field_2: Some(v1),            field_2: Some(v2),          field_2: Some(v2),
                    /// field_3: Some(v1),            field_3: None,              field_3: Some(v1),
                    pub fn set_config_override(
                    pub(crate) fn set_config_override(
                        &mut self,
                        config_override: Option<crate::config::Builder>,
                    ) -> &mut Self {
+3 −0
Original line number Diff line number Diff line
@@ -29,3 +29,6 @@ pub mod runtime_plugin;

/// Smithy identity used by auth and signing.
pub mod identity;

/// Interceptors for Smithy clients.
pub mod interceptor;
Loading