Unverified Commit 386ded88 authored by Zelda Hessler's avatar Zelda Hessler Committed by GitHub
Browse files

add connection poisoning support to the orchestrator (#2824)



This PR updates the orchestrator to support connection poisoning for the
`hyper` connector. It also renames many usages of "connection" to
"connector" in order to make talking about these concepts less
confusing. In short:

```
/// A "connector" manages one or more "connections", handles connection timeouts, re-establishes
/// connections, etc.
///
/// "Connections" refers to the actual transport layer implementation of the connector.
/// By default, the orchestrator uses a connector provided by `hyper`.
```

One possibly controversial decision I made is that `reconnect_mode` is
now a standalone configurable field, rather that a sub-field of
`RetryConfig`. I think we should get together as a team and discuss what
kinds of things we want to allow users to do when configuring
connections.

My thoughts on this are that we should either:

- **A.** Make connection-related settings their own types instead of
including them within other config types
- **B.** Make a single config struct for all connector-related stuff/use
the preëxisting `ConnectorSettings` struct.

Personally, I'm partial to A.

----

_By submitting this pull request, I confirm that you can use, modify,
copy, and redistribute this contribution, under the terms of your
choice._

---------

Co-authored-by: default avatarJohn DiSanti <jdisanti@amazon.com>
parent 71b401f5
Loading
Loading
Loading
Loading
+7 −4
Original line number Diff line number Diff line
@@ -15,6 +15,7 @@ import software.amazon.smithy.rust.codegen.core.rustlang.rustTemplate
import software.amazon.smithy.rust.codegen.core.rustlang.writable
import software.amazon.smithy.rust.codegen.core.smithy.RuntimeConfig
import software.amazon.smithy.rust.codegen.core.smithy.RuntimeType
import software.amazon.smithy.rust.codegen.core.util.letIf

class RetryClassifierDecorator : ClientCodegenDecorator {
    override val name: String = "RetryPolicy"
@@ -25,11 +26,13 @@ class RetryClassifierDecorator : ClientCodegenDecorator {
        operation: OperationShape,
        baseCustomizations: List<OperationCustomization>,
    ): List<OperationCustomization> =
        baseCustomizations + RetryClassifierFeature(codegenContext.runtimeConfig) + OperationRetryClassifiersFeature(
        (baseCustomizations + RetryClassifierFeature(codegenContext.runtimeConfig)).letIf(codegenContext.smithyRuntimeMode.generateOrchestrator) {
            it + OperationRetryClassifiersFeature(
                codegenContext,
                operation,
            )
        }
}

class RetryClassifierFeature(private val runtimeConfig: RuntimeConfig) : OperationCustomization() {
    override fun retryType(): RuntimeType =
+0 −1
Original line number Diff line number Diff line
@@ -52,7 +52,6 @@ class TimestreamDecorator : ClientCodegenDecorator {
            Visibility.PUBLIC,
            CargoDependency.Tokio.copy(scope = DependencyScope.Compile, features = setOf("sync")),
        )
        val runtimeMode = codegenContext.smithyRuntimeMode
        rustCrate.lib {
            // helper function to resolve an endpoint given a base client
            rustTemplate(
+4 −4
Original line number Diff line number Diff line
@@ -12,8 +12,8 @@ publish = false

[dev-dependencies]
async-std = "1.12.0"
aws-credential-types = { path = "../../build/aws-sdk/sdk/aws-credential-types", features = ["test-util"] }
aws-config = { path = "../../build/aws-sdk/sdk/aws-config" }
aws-credential-types = { path = "../../build/aws-sdk/sdk/aws-credential-types", features = ["test-util"] }
aws-http = { path = "../../build/aws-sdk/sdk/aws-http" }
aws-sdk-s3 = { path = "../../build/aws-sdk/sdk/s3" }
aws-sdk-sts = { path = "../../build/aws-sdk/sdk/sts" }
@@ -36,9 +36,9 @@ serde_json = "1"
smol = "1.2"
tempfile = "3"
tokio = { version = "1.23.1", features = ["macros", "test-util", "rt-multi-thread"] }
# If you're writing a test with this, take heed! `no-env-filter` means you'll be capturing
# logs from everything that speaks, so be specific with your asserts.
tracing-test = { version = "0.2.4", features = ["no-env-filter"] }
tracing = "0.1.37"
tracing-appender = "0.2.2"
tracing-subscriber = { version = "0.3.15", features = ["env-filter", "json"] }
# If you're writing a test with this, take heed! `no-env-filter` means you'll be capturing
# logs from everything that speaks, so be specific with your asserts.
tracing-test = { version = "0.2.4", features = ["no-env-filter"] }
+14 −16
Original line number Diff line number Diff line
@@ -3,20 +3,16 @@
 * SPDX-License-Identifier: Apache-2.0
 */

use aws_credential_types::provider::SharedCredentialsProvider;
use aws_credential_types::Credentials;
use aws_smithy_async::rt::sleep::{SharedAsyncSleep, TokioSleep};
use aws_sdk_s3::config::retry::{ReconnectMode, RetryConfig};
use aws_sdk_s3::config::{Credentials, Region, SharedAsyncSleep};
use aws_smithy_async::rt::sleep::TokioSleep;
use aws_smithy_client::test_connection::wire_mock::{
    check_matches, ReplayedEvent, WireLevelTestConnection,
};
use aws_smithy_client::{ev, match_events};
use aws_smithy_types::retry::{ReconnectMode, RetryConfig};
use aws_types::region::Region;
use aws_types::SdkConfig;

#[tokio::test]
/// test that disabling reconnects on retry config disables them for the client
async fn disable_reconnects() {
async fn test_disable_reconnect_on_503() {
    let mock = WireLevelTestConnection::spinup(vec![
        ReplayedEvent::status(503),
        ReplayedEvent::status(503),
@@ -24,9 +20,9 @@ async fn disable_reconnects() {
    ])
    .await;

    let sdk_config = SdkConfig::builder()
    let config = aws_sdk_s3::Config::builder()
        .region(Region::from_static("us-east-2"))
        .credentials_provider(SharedCredentialsProvider::new(Credentials::for_tests()))
        .credentials_provider(Credentials::for_tests())
        .sleep_impl(SharedAsyncSleep::new(TokioSleep::new()))
        .endpoint_url(mock.endpoint_url())
        .http_connector(mock.http_connector())
@@ -34,7 +30,7 @@ async fn disable_reconnects() {
            RetryConfig::standard().with_reconnect_mode(ReconnectMode::ReuseAllConnections),
        )
        .build();
    let client = aws_sdk_s3::Client::new(&sdk_config);
    let client = aws_sdk_s3::Client::from_conf(config);
    let resp = client
        .get_object()
        .bucket("bucket")
@@ -56,7 +52,7 @@ async fn disable_reconnects() {
}

#[tokio::test]
async fn reconnect_on_503() {
async fn test_enabling_reconnect_on_503() {
    let mock = WireLevelTestConnection::spinup(vec![
        ReplayedEvent::status(503),
        ReplayedEvent::status(503),
@@ -64,15 +60,17 @@ async fn reconnect_on_503() {
    ])
    .await;

    let sdk_config = SdkConfig::builder()
    let config = aws_sdk_s3::Config::builder()
        .region(Region::from_static("us-east-2"))
        .credentials_provider(SharedCredentialsProvider::new(Credentials::for_tests()))
        .credentials_provider(Credentials::for_tests())
        .sleep_impl(SharedAsyncSleep::new(TokioSleep::new()))
        .endpoint_url(mock.endpoint_url())
        .http_connector(mock.http_connector())
        .retry_config(RetryConfig::standard())
        .retry_config(
            RetryConfig::standard().with_reconnect_mode(ReconnectMode::ReconnectOnTransientError),
        )
        .build();
    let client = aws_sdk_s3::Client::new(&sdk_config);
    let client = aws_sdk_s3::Client::from_conf(config);
    let resp = client
        .get_object()
        .bucket("bucket")
+37 −0
Original line number Diff line number Diff line
/*
 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 * SPDX-License-Identifier: Apache-2.0
 */

package software.amazon.smithy.rust.codegen.client.smithy.customizations

import software.amazon.smithy.rust.codegen.client.smithy.ClientCodegenContext
import software.amazon.smithy.rust.codegen.client.smithy.generators.ServiceRuntimePluginCustomization
import software.amazon.smithy.rust.codegen.client.smithy.generators.ServiceRuntimePluginSection
import software.amazon.smithy.rust.codegen.core.rustlang.Writable
import software.amazon.smithy.rust.codegen.core.rustlang.rust
import software.amazon.smithy.rust.codegen.core.rustlang.writable
import software.amazon.smithy.rust.codegen.core.smithy.RuntimeType.Companion.smithyRuntime

class ConnectionPoisoningRuntimePluginCustomization(
    codegenContext: ClientCodegenContext,
) : ServiceRuntimePluginCustomization() {
    private val runtimeConfig = codegenContext.runtimeConfig

    override fun section(section: ServiceRuntimePluginSection): Writable = writable {
        when (section) {
            is ServiceRuntimePluginSection.RegisterInterceptor -> {
                // This interceptor assumes that a compatible Connector is set. Otherwise, connection poisoning
                // won't work and an error message will be logged.
                section.registerInterceptor(runtimeConfig, this) {
                    rust(
                        "#T::new()",
                        smithyRuntime(runtimeConfig).resolve("client::connectors::connection_poisoning::ConnectionPoisoningInterceptor"),
                    )
                }
            }

            else -> emptySection
        }
    }
}
Loading