Unverified Commit d8a7d999 authored by Russell Cohen's avatar Russell Cohen Committed by GitHub
Browse files

Fix bug in default HTTP Connector provided by aws-config (#2471)



* Fix bug in default HTTP Connector provided by aws-config

* Fix clippy lint

---------

Co-authored-by: default avatarJohn DiSanti <jdisanti@amazon.com>
parent 0e173721
Loading
Loading
Loading
Loading
+15 −0
Original line number Diff line number Diff line
@@ -338,3 +338,18 @@ message = "Update MSRV to 1.66.1"
references = ["smithy-rs#2467"]
meta = { "breaking" = true, "tada" = true, "bug" = false, "target" = "all" }
author = "Velfi"

###############

[[aws-sdk-rust]]
message = """Default connector provided by `aws-config` now respects `ConnectorSettings`.

Previously, it used the timeout settings provided by aws-config. A test from @Oliboy50 has been incorporated to verify this behavior.

**Behavior Change**: Prior to this change, the Hyper client would be shared between all service clients. After this change, each service client will use its own Hyper Client.
To revert to the previous behavior, set `HttpConnector::Prebuilt` on `SdkConfig::http_connector`.
"""
references = ["smithy-rs#2471", "smithy-rs#2333", "smithy-rs#2151"]
meta = { "breaking" = false, "tada" = false, "bug" = true }
author = "rcoh"
+1 −0
Original line number Diff line number Diff line
@@ -35,6 +35,7 @@ pub fn default_connector(
    settings: &ConnectorSettings,
    sleep: Option<Arc<dyn AsyncSleep>>,
) -> Option<DynConnector> {
    tracing::trace!(settings = ?settings, sleep = ?sleep, "creating a new connector");
    let hyper = base(settings, sleep).build(aws_smithy_client::conns::https());
    Some(DynConnector::new(hyper))
}
+4 −7
Original line number Diff line number Diff line
@@ -154,7 +154,7 @@ mod loader {
    use aws_credential_types::cache::CredentialsCache;
    use aws_credential_types::provider::{ProvideCredentials, SharedCredentialsProvider};
    use aws_smithy_async::rt::sleep::{default_async_sleep, AsyncSleep};
    use aws_smithy_client::http_connector::{ConnectorSettings, HttpConnector};
    use aws_smithy_client::http_connector::HttpConnector;
    use aws_smithy_types::retry::RetryConfig;
    use aws_smithy_types::timeout::TimeoutConfig;
    use aws_types::app_name::AppName;
@@ -570,12 +570,9 @@ mod loader {
                    .await
            };

            let http_connector = self.http_connector.unwrap_or_else(|| {
                HttpConnector::Prebuilt(default_connector(
                    &ConnectorSettings::from_timeout_config(&timeout_config),
                    sleep_impl.clone(),
                ))
            });
            let http_connector = self
                .http_connector
                .unwrap_or_else(|| HttpConnector::ConnectorFn(Arc::new(default_connector)));

            let credentials_cache = self.credentials_cache.unwrap_or_else(|| {
                let mut builder = CredentialsCache::lazy_builder().time_source(conf.time_source());
+99 −0
Original line number Diff line number Diff line
/*
 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 * SPDX-License-Identifier: Apache-2.0
 */

use aws_credential_types::provider::SharedCredentialsProvider;
use aws_credential_types::Credentials;
use aws_smithy_async::rt::sleep::{AsyncSleep, TokioSleep};

use aws_smithy_client::http_connector::{ConnectorSettings, HttpConnector};
use aws_smithy_client::test_connection;

use aws_smithy_http::result::SdkError;
use aws_smithy_types::timeout::TimeoutConfig;
use aws_types::region::Region;
use aws_types::SdkConfig;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::time::Instant;

/// Verify that `make_connector_fn` isn't called per request
#[tokio::test]
async fn make_connector_fn_test() {
    let sentinel = Arc::new(AtomicUsize::new(0));
    let connector_sentinel = sentinel.clone();
    let connector_with_counter = HttpConnector::ConnectorFn(Arc::new(
        move |_settings: &ConnectorSettings, _sleep: Option<Arc<dyn AsyncSleep>>| {
            connector_sentinel.fetch_add(1, Ordering::Relaxed);
            Some(test_connection::infallible_connection_fn(|_req| {
                http::Response::builder().status(200).body("ok!").unwrap()
            }))
        },
    ));
    let sdk_config = SdkConfig::builder()
        .http_connector(connector_with_counter)
        .credentials_provider(SharedCredentialsProvider::new(Credentials::for_tests()))
        .region(Region::from_static("us-east-1"))
        .build();
    let client = aws_sdk_s3::Client::new(&sdk_config);
    assert_eq!(sentinel.load(Ordering::Relaxed), 1);
    for _ in 0..10 {
        let _ = client
            .get_object()
            .bucket("foo")
            .key("bar")
            .send()
            .await
            .expect("test connector replies with 200");
    }
    assert_eq!(sentinel.load(Ordering::Relaxed), 1);
    // but creating another client creates another connector
    let _client_2 = aws_sdk_s3::Client::new(&sdk_config);
    assert_eq!(sentinel.load(Ordering::Relaxed), 2);
}

/// Use a 5 second operation timeout on SdkConfig and a 0ms connect timeout on the service config
#[tokio::test]
async fn timeouts_can_be_set_by_service() {
    let sdk_config = SdkConfig::builder()
        .credentials_provider(SharedCredentialsProvider::new(Credentials::for_tests()))
        .region(Region::from_static("us-east-1"))
        .sleep_impl(Arc::new(TokioSleep::new()))
        .timeout_config(
            TimeoutConfig::builder()
                .operation_timeout(Duration::from_secs(5))
                .build(),
        )
        // ip that
        .endpoint_url(
            // Emulate a connect timeout error by hitting an unroutable IP
            "http://172.255.255.0:18104",
        )
        .build();
    let config = aws_sdk_s3::config::Builder::from(&sdk_config)
        .timeout_config(
            TimeoutConfig::builder()
                .connect_timeout(Duration::from_secs(0))
                .build(),
        )
        .build();
    let client = aws_sdk_s3::Client::from_conf(config);
    let start = Instant::now();
    let err = client
        .get_object()
        .key("foo")
        .bucket("bar")
        .send()
        .await
        .expect_err("unroutable IP should timeout");
    match err {
        SdkError::DispatchFailure(err) => assert!(err.is_timeout()),
        // if the connect timeout is not respected, this times out after 1 second because of the operation timeout with `SdkError::Timeout`
        _other => panic!("unexpected error: {:?}", _other),
    }
    // there should be a 0ms timeout, we gotta set some stuff up. Just want to make sure
    // it's shorter than the 5 second timeout if the test is broken
    assert!(start.elapsed() < Duration::from_millis(500));
}
+59 −0
Original line number Diff line number Diff line
@@ -7,6 +7,7 @@
// TODO(docs)
#![allow(missing_docs)]

use std::fmt::{Debug, Formatter};
use std::future::Ready;
use std::ops::Deref;
use std::sync::{Arc, Mutex};
@@ -16,6 +17,7 @@ use http::header::{HeaderName, CONTENT_TYPE};
use http::Request;
use tokio::sync::oneshot;

use crate::erase::DynConnector;
use aws_smithy_http::body::SdkBody;
use aws_smithy_http::result::ConnectorError;
use aws_smithy_protocol_test::{assert_ok, validate_body, MediaType};
@@ -271,6 +273,63 @@ where
    }
}

/// Create a DynConnector from `Fn(http:Request) -> http::Response`
///
/// # Examples
///
/// ```rust
/// use aws_smithy_client::test_connection::infallible_connection_fn;
/// let connector = infallible_connection_fn(|_req|http::Response::builder().status(200).body("OK!").unwrap());
/// ```
pub fn infallible_connection_fn<B>(
    f: impl Fn(http::Request<SdkBody>) -> http::Response<B> + Send + Sync + 'static,
) -> DynConnector
where
    B: Into<SdkBody>,
{
    ConnectionFn::infallible(f)
}

#[derive(Clone)]
struct ConnectionFn {
    #[allow(clippy::type_complexity)]
    response: Arc<
        dyn Fn(http::Request<SdkBody>) -> Result<http::Response<SdkBody>, ConnectorError>
            + Send
            + Sync,
    >,
}

impl Debug for ConnectionFn {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ConnectionFn").finish()
    }
}

impl ConnectionFn {
    fn infallible<B: Into<SdkBody>>(
        f: impl Fn(http::Request<SdkBody>) -> http::Response<B> + Send + Sync + 'static,
    ) -> DynConnector {
        DynConnector::new(Self {
            response: Arc::new(move |request| Ok(f(request).map(|b| b.into()))),
        })
    }
}

impl tower::Service<http::Request<SdkBody>> for ConnectionFn {
    type Response = http::Response<SdkBody>;
    type Error = ConnectorError;
    type Future = Ready<Result<Self::Response, Self::Error>>;

    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, req: Request<SdkBody>) -> Self::Future {
        std::future::ready((self.response)(req))
    }
}

/// [`wire_mock`] contains utilities for mocking at the socket level
///
/// Other tools in this module actually operate at the `http::Request` / `http::Response` level. This
Loading