Unverified Commit 0f908065 authored by John DiSanti's avatar John DiSanti Committed by GitHub
Browse files

Remove `CaptureSmithyConnectionWrapper` (#3045)

_By submitting this pull request, I confirm that you can use, modify,
copy, and redistribute this contribution, under the terms of your
choice._
parent e61fb6f0
Loading
Loading
Loading
Loading
+1 −55
Original line number Diff line number Diff line
@@ -7,7 +7,7 @@

use std::fmt::{Debug, Formatter};
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use std::sync::Arc;

/// Metadata that tracks the state of an active connection.
#[derive(Clone)]
@@ -51,57 +51,3 @@ impl Debug for ConnectionMetadata {
            .finish()
    }
}

type LoaderFn = dyn Fn() -> Option<ConnectionMetadata> + Send + Sync;

/// State for a middleware that will monitor and manage connections.
#[allow(missing_debug_implementations)]
#[derive(Clone, Default)]
pub struct CaptureSmithyConnection {
    loader: Arc<Mutex<Option<Box<LoaderFn>>>>,
}

impl CaptureSmithyConnection {
    /// Create a new connection monitor.
    pub fn new() -> Self {
        Self {
            loader: Default::default(),
        }
    }

    /// Set the retriever that will capture the `hyper` connection.
    pub fn set_connection_retriever<F>(&self, f: F)
    where
        F: Fn() -> Option<ConnectionMetadata> + Send + Sync + 'static,
    {
        *self.loader.lock().unwrap() = Some(Box::new(f));
    }

    /// Get the associated connection metadata.
    pub fn get(&self) -> Option<ConnectionMetadata> {
        match self.loader.lock().unwrap().as_ref() {
            Some(loader) => loader(),
            None => {
                tracing::debug!("no loader was set on the CaptureSmithyConnection");
                None
            }
        }
    }
}

#[cfg(test)]
mod test {
    use crate::connection::{CaptureSmithyConnection, ConnectionMetadata};

    #[test]
    #[allow(clippy::redundant_clone)]
    fn retrieve_connection_metadata() {
        let retriever = CaptureSmithyConnection::new();
        let retriever_clone = retriever.clone();
        assert!(retriever.get().is_none());
        retriever.set_connection_retriever(|| Some(ConnectionMetadata::new(true, None, || {})));

        assert!(retriever.get().is_some());
        assert!(retriever_clone.get().is_some());
    }
}
+47 −26
Original line number Diff line number Diff line
@@ -3,7 +3,7 @@
 * SPDX-License-Identifier: Apache-2.0
 */

use aws_smithy_http::connection::{CaptureSmithyConnection, ConnectionMetadata};
use aws_smithy_http::connection::ConnectionMetadata;
use aws_smithy_runtime_api::box_error::BoxError;
use aws_smithy_runtime_api::client::interceptors::context::{
    AfterDeserializationInterceptorContextRef, BeforeTransmitInterceptorContextMut,
@@ -14,6 +14,7 @@ use aws_smithy_runtime_api::client::runtime_components::RuntimeComponents;
use aws_smithy_types::config_bag::{ConfigBag, Storable, StoreReplace};
use aws_smithy_types::retry::{ErrorKind, ReconnectMode, RetryConfig};
use std::fmt;
use std::sync::{Arc, Mutex};
use tracing::{debug, error};

/// An interceptor for poisoning connections in response to certain events.
@@ -52,11 +53,11 @@ impl Interceptor for ConnectionPoisoningInterceptor {
        _runtime_components: &RuntimeComponents,
        cfg: &mut ConfigBag,
    ) -> Result<(), BoxError> {
        let capture_smithy_connection = CaptureSmithyConnectionWrapper::new();
        let capture_smithy_connection = CaptureSmithyConnection::new();
        context
            .request_mut()
            .extensions_mut()
            .insert(capture_smithy_connection.clone_inner());
            .insert(capture_smithy_connection.clone());
        cfg.interceptor_state().store_put(capture_smithy_connection);

        Ok(())
@@ -72,7 +73,7 @@ impl Interceptor for ConnectionPoisoningInterceptor {
            .load::<RetryConfig>()
            .map(RetryConfig::reconnect_mode)
            .unwrap_or(ReconnectMode::ReconnectOnTransientError);
        let captured_connection = cfg.load::<CaptureSmithyConnectionWrapper>().cloned();
        let captured_connection = cfg.load::<CaptureSmithyConnection>().cloned();
        let retry_classifiers = runtime_components
            .retry_classifiers()
            .ok_or("retry classifiers are required for connection poisoning to work")?;
@@ -101,46 +102,66 @@ impl Interceptor for ConnectionPoisoningInterceptor {
    }
}

// TODO(enableNewSmithyRuntimeCleanup): A storable wrapper won't be needed anymore once we absorb aws_smithy_http into the new runtime crate.
/// A wrapper around CaptureSmithyConnection that implements `Storable` so that it can be added to the `ConfigBag`.
type LoaderFn = dyn Fn() -> Option<ConnectionMetadata> + Send + Sync;

/// State for a middleware that will monitor and manage connections.
#[allow(missing_debug_implementations)]
#[derive(Clone, Default)]
pub struct CaptureSmithyConnectionWrapper {
    inner: CaptureSmithyConnection,
pub struct CaptureSmithyConnection {
    loader: Arc<Mutex<Option<Box<LoaderFn>>>>,
}

impl CaptureSmithyConnectionWrapper {
    /// Creates a new `CaptureSmithyConnectionWrapper`.
impl CaptureSmithyConnection {
    /// Create a new connection monitor.
    pub fn new() -> Self {
        Self {
            inner: CaptureSmithyConnection::new(),
            loader: Default::default(),
        }
    }

    /// Returns a reference to the inner `CaptureSmithyConnection`.
    pub fn clone_inner(&self) -> CaptureSmithyConnection {
        self.inner.clone()
    /// Set the retriever that will capture the `hyper` connection.
    pub fn set_connection_retriever<F>(&self, f: F)
    where
        F: Fn() -> Option<ConnectionMetadata> + Send + Sync + 'static,
    {
        *self.loader.lock().unwrap() = Some(Box::new(f));
    }

    /// Returns the captured connection metadata, if any.
    /// Get the associated connection metadata.
    pub fn get(&self) -> Option<ConnectionMetadata> {
        self.inner.get()
        match self.loader.lock().unwrap().as_ref() {
            Some(loader) => loader(),
            None => {
                tracing::debug!("no loader was set on the CaptureSmithyConnection");
                None
            }
        }
    }
}

    /// Sets the connection retriever function.
    pub fn set_connection_retriever<F>(&self, f: F)
    where
        F: Fn() -> Option<ConnectionMetadata> + Send + Sync + 'static,
    {
        self.inner.set_connection_retriever(f)
impl fmt::Debug for CaptureSmithyConnection {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "CaptureSmithyConnection")
    }
}

impl Storable for CaptureSmithyConnectionWrapper {
impl Storable for CaptureSmithyConnection {
    type Storer = StoreReplace<Self>;
}

impl fmt::Debug for CaptureSmithyConnectionWrapper {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "CaptureSmithyConnectionWrapper")
#[cfg(test)]
mod test {
    use super::*;

    #[test]
    #[allow(clippy::redundant_clone)]
    fn retrieve_connection_metadata() {
        let retriever = CaptureSmithyConnection::new();
        let retriever_clone = retriever.clone();
        assert!(retriever.get().is_none());
        retriever.set_connection_retriever(|| Some(ConnectionMetadata::new(true, None, || {})));

        assert!(retriever.get().is_some());
        assert!(retriever_clone.get().is_some());
    }
}
+2 −1
Original line number Diff line number Diff line
@@ -3,10 +3,11 @@
 * SPDX-License-Identifier: Apache-2.0
 */

use crate::client::http::connection_poisoning::CaptureSmithyConnection;
use aws_smithy_async::future::timeout::TimedOutError;
use aws_smithy_async::rt::sleep::{default_async_sleep, AsyncSleep, SharedAsyncSleep};
use aws_smithy_http::body::SdkBody;
use aws_smithy_http::connection::{CaptureSmithyConnection, ConnectionMetadata};
use aws_smithy_http::connection::ConnectionMetadata;
use aws_smithy_http::result::ConnectorError;
use aws_smithy_runtime_api::box_error::BoxError;
use aws_smithy_runtime_api::client::http::{