Unverified Commit 7d1d35c9 authored by John DiSanti's avatar John DiSanti Committed by GitHub
Browse files

Fix Timestream in the orchestrator (#2846)

This PR fixes Timestream in the orchestrator implementation.

----

_By submitting this pull request, I confirm that you can use, modify,
copy, and redistribute this contribution, under the terms of your
choice._
parent 29997665
Loading
Loading
Loading
Loading
+6 −6
Original line number Diff line number Diff line
@@ -127,7 +127,7 @@ author = "rcoh"
message = "Bump dependency on `lambda_http` by `aws-smithy-http-server` to 0.8.0. This version of `aws-smithy-http-server` is only guaranteed to be compatible with 0.8.0, or semver-compatible versions of 0.8.0 of the `lambda_http` crate. It will not work with versions prior to 0.8.0 _at runtime_, making requests to your smithy-rs service unroutable, so please make sure you're running your service in a compatible configuration"
author = "david-perez"
references = ["smithy-rs#2676", "smithy-rs#2685"]
meta = { "breaking" = true, "tada" = false, "bug" = false }
meta = { "breaking" = true, "tada" = false, "bug" = false, target = "server" }

[[smithy-rs]]
message = """Remove `PollError` from an operations `Service::Error`.
@@ -141,9 +141,9 @@ meta = { "breaking" = true, "tada" = false, "bug" = false, "target" = "server" }
author = "hlbarber"

[[aws-sdk-rust]]
message = "The SDK has added support for timestreamwrite and timestreamquery. Support for these services is considered experimental at this time. In order to use these services, you MUST call `.enable_endpoint_discovery()` on the `Client` after construction."
message = "The SDK has added support for timestreamwrite and timestreamquery. Support for these services is considered experimental at this time. In order to use these services, you MUST call `.with_endpoint_discovery_enabled()` on the `Client` after construction."
meta = { "breaking" = false, "tada" = true, "bug" = false }
references = ["smithy-rs#2707", "aws-sdk-rust#114"]
references = ["smithy-rs#2707", "aws-sdk-rust#114", "smithy-rs#2846"]
author = "rcoh"

[[smithy-rs]]
@@ -197,7 +197,7 @@ filter_by_operation_id(plugin, |id| id.absolute() != "namespace#name");
"""
author = "82marbag"
references = ["smithy-rs#2678"]
meta = { "breaking" = true, "tada" = false, "bug" = false }
meta = { "breaking" = true, "tada" = false, "bug" = false, target = "server" }

[[smithy-rs]]
message = "The occurrences of `Arc<dyn ResolveEndpoint>` have now been replaced with `SharedEndpointResolver` in public APIs."
@@ -532,7 +532,7 @@ let scoped_plugin = Scoped::new::<SomeScope>(plugin);

"""
references = ["smithy-rs#2740", "smithy-rs#2759", "smithy-rs#2779", "smithy-rs#2827"]
meta = { "breaking" = true, "tada" = true, "bug" = false }
meta = { "breaking" = true, "tada" = true, "bug" = false, target = "server" }
author = "hlbarber"

[[smithy-rs]]
@@ -608,7 +608,7 @@ let plugin = plugin_from_operation_fn(map);
```
"""
references = ["smithy-rs#2740", "smithy-rs#2759", "smithy-rs#2779"]
meta = { "breaking" = true, "tada" = false, "bug" = false }
meta = { "breaking" = true, "tada" = false, "bug" = false, target = "server" }
author = "hlbarber"

[[smithy-rs]]
+3 −0
Original line number Diff line number Diff line
@@ -39,6 +39,7 @@ impl ReloadEndpoint {
    pub async fn reload_once(&self) {
        match (self.loader)().await {
            Ok((endpoint, expiry)) => {
                tracing::debug!("caching resolved endpoint: {:?}", (&endpoint, &expiry));
                *self.endpoint.lock().unwrap() = Some(ExpiringEndpoint { endpoint, expiry })
            }
            Err(err) => *self.error.lock().unwrap() = Some(err),
@@ -128,6 +129,7 @@ where
        sleep,
        time,
    };
    tracing::debug!("populating initial endpoint discovery cache");
    reloader.reload_once().await;
    // if we didn't successfully get an endpoint, bail out so the client knows
    // configuration failed to work
@@ -137,6 +139,7 @@ where

impl EndpointCache {
    fn resolve_endpoint(&self) -> aws_smithy_http::endpoint::Result {
        tracing::trace!("resolving endpoint from endpoint discovery cache");
        self.endpoint
            .lock()
            .unwrap()
+50 −33
Original line number Diff line number Diff line
@@ -6,6 +6,7 @@
package software.amazon.smithy.rustsdk.customize.timestream

import software.amazon.smithy.rust.codegen.client.smithy.ClientCodegenContext
import software.amazon.smithy.rust.codegen.client.smithy.ClientRustModule
import software.amazon.smithy.rust.codegen.client.smithy.customize.ClientCodegenDecorator
import software.amazon.smithy.rust.codegen.client.smithy.endpoint.Types
import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency
@@ -14,10 +15,10 @@ import software.amazon.smithy.rust.codegen.core.rustlang.Visibility
import software.amazon.smithy.rust.codegen.core.rustlang.rustTemplate
import software.amazon.smithy.rust.codegen.core.rustlang.toType
import software.amazon.smithy.rust.codegen.core.smithy.RuntimeType
import software.amazon.smithy.rust.codegen.core.smithy.RuntimeType.Companion.preludeScope
import software.amazon.smithy.rust.codegen.core.smithy.RustCrate
import software.amazon.smithy.rust.codegen.core.smithy.customize.AdHocCustomization
import software.amazon.smithy.rust.codegen.core.smithy.customize.adhocCustomization
import software.amazon.smithy.rust.codegen.core.util.letIf
import software.amazon.smithy.rustsdk.AwsCargoDependency
import software.amazon.smithy.rustsdk.DocSection
import software.amazon.smithy.rustsdk.InlineAwsDependency
@@ -25,21 +26,25 @@ import software.amazon.smithy.rustsdk.InlineAwsDependency
/**
 * This decorator does two things:
 * 1. Adds the `endpoint_discovery` inlineable
 * 2. Adds a `enable_endpoint_discovery` method on client that returns a wrapped client with endpoint discovery enabled
 * 2. Adds a `with_endpoint_discovery_enabled` method on client that returns a wrapped client with endpoint discovery enabled
 */
class TimestreamDecorator : ClientCodegenDecorator {
    override val name: String = "Timestream"
    override val order: Byte = -1

    override fun extraSections(codegenContext: ClientCodegenContext): List<AdHocCustomization> {
        return listOf(
    private fun applies(codegenContext: ClientCodegenContext): Boolean =
        codegenContext.smithyRuntimeMode.defaultToOrchestrator

    override fun extraSections(codegenContext: ClientCodegenContext): List<AdHocCustomization> =
        emptyList<AdHocCustomization>().letIf(applies(codegenContext)) {
            listOf(
                adhocCustomization<DocSection.CreateClient> {
                    addDependency(AwsCargoDependency.awsConfig(codegenContext.runtimeConfig).toDevDependency())
                    rustTemplate(
                        """
                        let config = aws_config::load_from_env().await;
                    // You MUST call `enable_endpoint_discovery` to produce a working client for this service.
                    let ${it.clientName} = ${it.crateName}::Client::new(&config).enable_endpoint_discovery().await;
                        // You MUST call `with_endpoint_discovery_enabled` to produce a working client for this service.
                        let ${it.clientName} = ${it.crateName}::Client::new(&config).with_endpoint_discovery_enabled().await;
                        """.replaceIndent(it.indent),
                    )
                },
@@ -47,12 +52,17 @@ class TimestreamDecorator : ClientCodegenDecorator {
        }

    override fun extras(codegenContext: ClientCodegenContext, rustCrate: RustCrate) {
        if (!applies(codegenContext)) {
            return
        }

        val endpointDiscovery = InlineAwsDependency.forRustFile(
            "endpoint_discovery",
            Visibility.PUBLIC,
            CargoDependency.Tokio.copy(scope = DependencyScope.Compile, features = setOf("sync")),
            CargoDependency.smithyAsync(codegenContext.runtimeConfig).toDevDependency().withFeature("test-util"),
        )
        rustCrate.lib {
        rustCrate.withModule(ClientRustModule.client) {
            // helper function to resolve an endpoint given a base client
            rustTemplate(
                """
@@ -76,33 +86,40 @@ class TimestreamDecorator : ClientCodegenDecorator {
                    /// Enable endpoint discovery for this client
                    ///
                    /// This method MUST be called to construct a working client.
                    pub async fn enable_endpoint_discovery(self) -> #{Result}<(Self, #{endpoint_discovery}::ReloadEndpoint), #{ResolveEndpointError}> {
                        let mut new_conf = self.conf().clone();
                        let sleep = self.conf().sleep_impl().expect("sleep impl must be provided");
                        let time = self.conf().time_source().expect("time source must be provided");
                    pub async fn with_endpoint_discovery_enabled(self) -> #{Result}<(Self, #{endpoint_discovery}::ReloadEndpoint), #{ResolveEndpointError}> {
                        let handle = self.handle.clone();

                        // The original client without endpoint discover gets moved into the endpoint discovery
                        // resolver since calls to DescribeEndpoint without discovery need to be made.
                        let client_without_discovery = self;
                        let (resolver, reloader) = #{endpoint_discovery}::create_cache(
                            move || {
                                let client = self.clone();
                                let client = client_without_discovery.clone();
                                async move { resolve_endpoint(&client).await }
                            },
                            sleep,
                            time
                        )
                        .await?;
                        new_conf.endpoint_resolver = #{SharedEndpointResolver}::new(resolver);
                        Ok((Self::from_conf(new_conf), reloader))
                            handle.conf.sleep_impl()
                                .expect("endpoint discovery requires the client config to have a sleep impl"),
                            handle.conf.time_source()
                                .expect("endpoint discovery requires the client config to have a time source"),
                        ).await?;

                        let client_with_discovery = crate::Client::from_conf(
                            handle.conf.to_builder()
                                    .endpoint_resolver(#{SharedEndpointResolver}::new(resolver))
                                    .build()
                        );
                        Ok((client_with_discovery, reloader))
                    }
                }
                """,
                "endpoint_discovery" to endpointDiscovery.toType(),
                "SystemTime" to RuntimeType.std.resolve("time::SystemTime"),
                *RuntimeType.preludeScope,
                "Arc" to RuntimeType.Arc,
                "Duration" to RuntimeType.std.resolve("time::Duration"),
                "SharedEndpointResolver" to RuntimeType.smithyHttp(codegenContext.runtimeConfig)
                    .resolve("endpoint::SharedEndpointResolver"),
                "SystemTimeSource" to RuntimeType.smithyAsync(codegenContext.runtimeConfig)
                    .resolve("time::SystemTimeSource"),
                "SystemTime" to RuntimeType.std.resolve("time::SystemTime"),
                "endpoint_discovery" to endpointDiscovery.toType(),
                *Types(codegenContext.runtimeConfig).toArray(),
                *preludeScope,
            )
        }
    }
+18 −0
Original line number Diff line number Diff line
/*
 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 * SPDX-License-Identifier: Apache-2.0
 */

#[cfg(aws_sdk_orchestrator_mode)]
#[tokio::test]
async fn test_config_to_builder() {
    use aws_sdk_s3::config::AppName;

    let config = aws_config::load_from_env().await;
    let config = aws_sdk_s3::Config::new(&config);
    // should not panic
    let _ = config
        .to_builder()
        .app_name(AppName::new("SomeAppName").unwrap())
        .build();
}
+3 −2
Original line number Diff line number Diff line
@@ -13,8 +13,9 @@ publish = false
[dev-dependencies]
aws-credential-types = { path = "../../build/aws-sdk/sdk/aws-credential-types", features = ["test-util"] }
aws-sdk-timestreamquery = { path = "../../build/aws-sdk/sdk/timestreamquery" }
tokio = { version = "1.23.1", features = ["full", "test-util"] }
aws-smithy-client = { path = "../../build/aws-sdk/sdk/aws-smithy-client", features = ["test-util"] }
aws-smithy-async = { path = "../../build/aws-sdk/sdk/aws-smithy-async", features = ["test-util"] }
aws-smithy-client = { path = "../../build/aws-sdk/sdk/aws-smithy-client", features = ["test-util"] }
aws-smithy-runtime = { path = "../../build/aws-sdk/sdk/aws-smithy-runtime", features = ["test-util"] }
aws-types = { path = "../../build/aws-sdk/sdk/aws-types" }
tokio = { version = "1.23.1", features = ["full", "test-util"] }
tracing-subscriber = "0.3.17"
Loading