Unverified Commit b78367cb authored by Declan Kelly's avatar Declan Kelly Committed by GitHub
Browse files

Record TCP connection local socket address in metadata (#3286)

## Motivation and Context
I want to use this field to uniquely identify TCP connection based on
their `local_addr` + `remote_addr`.

See https://github.com/awslabs/aws-sdk-rust/issues/990 for additional
motivation for this change.

## Description
- Add a new optional `local_addr` field in the `ConnectionMetadata`
struct.
- Transfer the `local_addr` `SocketAddress` from the `hyper::HttpInfo`
to the `ConnectionMetadata` field.
- Add to the `trace-serialize` example program so that it will print out
the capture connection values.

## Testing
`cargo test` in `rust-runtime/aws-smithy-runtime-api` and
`aws-smithy-runtime`.

Also ran:
```
thedeck@c889f3b04fb0 examples % cargo run --example trace-serialize
    Finished dev [unoptimized + debuginfo] target(s) in 0.13s
     Running `/Users/thedeck/repos/github/declanvk/smithy-rs/target/debug/examples/trace-serialize`
2023-12-06T00:13:15.605555Z  INFO lazy_load_identity: aws_smithy_runtime::client::identity::cache::lazy: identity cache miss occurred; added new identity (took Ok(296µs))
2023-12-06T00:13:15.608344Z  INFO trace_serialize: Response received: response=Response { status: StatusCode(200), headers: Headers { headers: {"content-type": HeaderValue { _private: "application/json" }, "content-length": HeaderValue { _private: "17" }, "date": HeaderValue { _private: "Wed, 06 Dec 2023 00:13:15 GMT" }} }, body: SdkBody { inner: BoxBody, retryable: false }, extensions: Extensions }
2023-12-06T00:13:15.608388Z  INFO trace_serialize: Captured connection info remote_addr=Some(127.0.0.1:13734) local_addr=Some(127.0.0.1:50199)
2023-12-06T00:13:15.608511Z  INFO trace_serialize: Response received POKEMON_SERVICE_URL=http://localhost:13734

 response=GetServerStatisticsOutput { calls_count: 0 }
```

You can see the log line with "Captured connection info" contains the
`remote_addr` and the `local_addr` fields.

## Checklist
- [x] I have updated `CHANGELOG.next.toml` if I made changes to the AWS
SDK, generated SDK code, or SDK runtime crates

----

_By submitting this pull request, I confirm that you can use, modify,
copy, and redistribute this contribution, under the terms of your
choice._

Co-authored-by: default avatarDeclan Kelly <thedeck@amazon.com>
parent 8df5ac85
Loading
Loading
Loading
Loading
+6 −0
Original line number Original line Diff line number Diff line
@@ -73,3 +73,9 @@ message = "Fix documentation and examples on HyperConnector and HyperClientBuild
references = ["smithy-rs#3282"]
references = ["smithy-rs#3282"]
meta = { "breaking" = false, "tada" = false, "bug" = false, "target" = "client" }
meta = { "breaking" = false, "tada" = false, "bug" = false, "target" = "client" }
author = "jdisanti"
author = "jdisanti"

[[smithy-rs]]
message = "Expose local socket address from ConnectionMetadata."
references = ["aws-sdk-rust#990"]
meta = { "breaking" = false, "tada" = false, "bug" = false, "target" = "client" }
author = "declanvk"
+14 −1
Original line number Original line Diff line number Diff line
@@ -2,6 +2,7 @@
 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 * SPDX-License-Identifier: Apache-2.0
 * SPDX-License-Identifier: Apache-2.0
 */
 */
use aws_smithy_runtime::client::http::connection_poisoning::CaptureSmithyConnection;
/// This example demonstrates how an interceptor can be written to trace what is being
/// This example demonstrates how an interceptor can be written to trace what is being
/// serialized / deserialized on the wire.
/// serialized / deserialized on the wire.
///
///
@@ -59,7 +60,7 @@ impl Intercept for WireFormatInterceptor {
        &self,
        &self,
        context: &BeforeDeserializationInterceptorContextRef<'_>,
        context: &BeforeDeserializationInterceptorContextRef<'_>,
        _runtime_components: &RuntimeComponents,
        _runtime_components: &RuntimeComponents,
        _cfg: &mut ConfigBag,
        cfg: &mut ConfigBag,
    ) -> Result<(), BoxError> {
    ) -> Result<(), BoxError> {
        // Get the response type from the context.
        // Get the response type from the context.
        let response = context.response();
        let response = context.response();
@@ -70,6 +71,18 @@ impl Intercept for WireFormatInterceptor {
            tracing::error!(?response);
            tracing::error!(?response);
        }
        }


        // Print the connection information
        let captured_connection = cfg.load::<CaptureSmithyConnection>().cloned();
        if let Some(captured_connection) = captured_connection.and_then(|conn| conn.get()) {
            tracing::info!(
                remote_addr = ?captured_connection.remote_addr(),
                local_addr = ?captured_connection.local_addr(),
                "Captured connection info"
            );
        } else {
            tracing::warn!("Connection info is missing!");
        }

        Ok(())
        Ok(())
    }
    }
}
}
+204 −0
Original line number Original line Diff line number Diff line
@@ -14,6 +14,7 @@ use std::sync::Arc;
pub struct ConnectionMetadata {
pub struct ConnectionMetadata {
    is_proxied: bool,
    is_proxied: bool,
    remote_addr: Option<SocketAddr>,
    remote_addr: Option<SocketAddr>,
    local_addr: Option<SocketAddr>,
    poison_fn: Arc<dyn Fn() + Send + Sync>,
    poison_fn: Arc<dyn Fn() + Send + Sync>,
}
}


@@ -25,6 +26,10 @@ impl ConnectionMetadata {
    }
    }


    /// Create a new [`ConnectionMetadata`].
    /// Create a new [`ConnectionMetadata`].
    #[deprecated(
        since = "1.1.0",
        note = "`ConnectionMetadata::new` is deprecated in favour of `ConnectionMetadata::builder`."
    )]
    pub fn new(
    pub fn new(
        is_proxied: bool,
        is_proxied: bool,
        remote_addr: Option<SocketAddr>,
        remote_addr: Option<SocketAddr>,
@@ -33,14 +38,26 @@ impl ConnectionMetadata {
        Self {
        Self {
            is_proxied,
            is_proxied,
            remote_addr,
            remote_addr,
            // need to use builder to set this field
            local_addr: None,
            poison_fn: Arc::new(poison),
            poison_fn: Arc::new(poison),
        }
        }
    }
    }


    /// Builder for this connection metadata
    pub fn builder() -> ConnectionMetadataBuilder {
        ConnectionMetadataBuilder::new()
    }

    /// Get the remote address for this connection, if one is set.
    /// Get the remote address for this connection, if one is set.
    pub fn remote_addr(&self) -> Option<SocketAddr> {
    pub fn remote_addr(&self) -> Option<SocketAddr> {
        self.remote_addr
        self.remote_addr
    }
    }

    /// Get the local address for this connection, if one is set.
    pub fn local_addr(&self) -> Option<SocketAddr> {
        self.local_addr
    }
}
}


impl Debug for ConnectionMetadata {
impl Debug for ConnectionMetadata {
@@ -48,6 +65,193 @@ impl Debug for ConnectionMetadata {
        f.debug_struct("SmithyConnection")
        f.debug_struct("SmithyConnection")
            .field("is_proxied", &self.is_proxied)
            .field("is_proxied", &self.is_proxied)
            .field("remote_addr", &self.remote_addr)
            .field("remote_addr", &self.remote_addr)
            .field("local_addr", &self.local_addr)
            .finish()
    }
}

/// Builder type that is used to construct a [`ConnectionMetadata`] value.
#[derive(Default)]
pub struct ConnectionMetadataBuilder {
    is_proxied: Option<bool>,
    remote_addr: Option<SocketAddr>,
    local_addr: Option<SocketAddr>,
    poison_fn: Option<Arc<dyn Fn() + Send + Sync>>,
}

impl Debug for ConnectionMetadataBuilder {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ConnectionMetadataBuilder")
            .field("is_proxied", &self.is_proxied)
            .field("remote_addr", &self.remote_addr)
            .field("local_addr", &self.local_addr)
            .finish()
            .finish()
    }
    }
}
}

impl ConnectionMetadataBuilder {
    /// Creates a new builder.
    pub fn new() -> Self {
        Self::default()
    }

    /// Set whether or not the associated connection is to an HTTP proxy.
    pub fn proxied(mut self, proxied: bool) -> Self {
        self.set_proxied(Some(proxied));
        self
    }

    /// Set whether or not the associated connection is to an HTTP proxy.
    pub fn set_proxied(&mut self, proxied: Option<bool>) -> &mut Self {
        self.is_proxied = proxied;
        self
    }

    /// Set the remote address of the connection used.
    pub fn remote_addr(mut self, remote_addr: SocketAddr) -> Self {
        self.set_remote_addr(Some(remote_addr));
        self
    }

    /// Set the remote address of the connection used.
    pub fn set_remote_addr(&mut self, remote_addr: Option<SocketAddr>) -> &mut Self {
        self.remote_addr = remote_addr;
        self
    }

    /// Set the local address of the connection used.
    pub fn local_addr(mut self, local_addr: SocketAddr) -> Self {
        self.set_local_addr(Some(local_addr));
        self
    }

    /// Set the local address of the connection used.
    pub fn set_local_addr(&mut self, local_addr: Option<SocketAddr>) -> &mut Self {
        self.local_addr = local_addr;
        self
    }

    /// Set a closure which will poison the associated connection.
    ///
    /// A poisoned connection will not be reused for subsequent requests by the pool
    pub fn poison_fn(mut self, poison_fn: impl Fn() + Send + Sync + 'static) -> Self {
        self.set_poison_fn(Some(poison_fn));
        self
    }

    /// Set a closure which will poison the associated connection.
    ///
    /// A poisoned connection will not be reused for subsequent requests by the pool
    pub fn set_poison_fn(
        &mut self,
        poison_fn: Option<impl Fn() + Send + Sync + 'static>,
    ) -> &mut Self {
        self.poison_fn =
            poison_fn.map(|poison_fn| Arc::new(poison_fn) as Arc<dyn Fn() + Send + Sync>);
        self
    }

    /// Build a [`ConnectionMetadata`] value.
    ///
    /// # Panics
    ///
    /// If either the `is_proxied` or `poison_fn` has not been set, then this method will panic
    pub fn build(self) -> ConnectionMetadata {
        ConnectionMetadata {
            is_proxied: self
                .is_proxied
                .expect("is_proxied should be set for ConnectionMetadata"),
            remote_addr: self.remote_addr,
            local_addr: self.local_addr,
            poison_fn: self
                .poison_fn
                .expect("poison_fn should be set for ConnectionMetadata"),
        }
    }
}

#[cfg(test)]
mod tests {
    use std::{
        net::{IpAddr, Ipv6Addr},
        sync::Mutex,
    };

    use super::*;

    const TEST_SOCKET_ADDR: SocketAddr = SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 100);

    #[test]
    #[should_panic]
    fn builder_panic_missing_proxied() {
        ConnectionMetadataBuilder::new()
            .poison_fn(|| {})
            .local_addr(TEST_SOCKET_ADDR)
            .remote_addr(TEST_SOCKET_ADDR)
            .build();
    }

    #[test]
    #[should_panic]
    fn builder_panic_missing_poison_fn() {
        ConnectionMetadataBuilder::new()
            .proxied(true)
            .local_addr(TEST_SOCKET_ADDR)
            .remote_addr(TEST_SOCKET_ADDR)
            .build();
    }

    #[test]
    fn builder_all_fields_successful() {
        let mutable_flag = Arc::new(Mutex::new(false));

        let connection_metadata = ConnectionMetadataBuilder::new()
            .proxied(true)
            .local_addr(TEST_SOCKET_ADDR)
            .remote_addr(TEST_SOCKET_ADDR)
            .poison_fn({
                let mutable_flag = Arc::clone(&mutable_flag);
                move || {
                    let mut guard = mutable_flag.lock().unwrap();
                    *guard = !*guard;
                }
            })
            .build();

        assert_eq!(connection_metadata.is_proxied, true);
        assert_eq!(connection_metadata.remote_addr(), Some(TEST_SOCKET_ADDR));
        assert_eq!(connection_metadata.local_addr(), Some(TEST_SOCKET_ADDR));
        assert_eq!(*mutable_flag.lock().unwrap(), false);
        connection_metadata.poison();
        assert_eq!(*mutable_flag.lock().unwrap(), true);
    }

    #[test]
    fn builder_optional_fields_translate() {
        let metadata1 = ConnectionMetadataBuilder::new()
            .proxied(true)
            .poison_fn(|| {})
            .build();

        assert_eq!(metadata1.local_addr(), None);
        assert_eq!(metadata1.remote_addr(), None);

        let metadata2 = ConnectionMetadataBuilder::new()
            .proxied(true)
            .poison_fn(|| {})
            .local_addr(TEST_SOCKET_ADDR)
            .build();

        assert_eq!(metadata2.local_addr(), Some(TEST_SOCKET_ADDR));
        assert_eq!(metadata2.remote_addr(), None);

        let metadata3 = ConnectionMetadataBuilder::new()
            .proxied(true)
            .poison_fn(|| {})
            .remote_addr(TEST_SOCKET_ADDR)
            .build();

        assert_eq!(metadata3.local_addr(), None);
        assert_eq!(metadata3.remote_addr(), Some(TEST_SOCKET_ADDR));
    }
}
+8 −2
Original line number Original line Diff line number Diff line
@@ -100,7 +100,6 @@ impl Intercept for ConnectionPoisoningInterceptor {
type LoaderFn = dyn Fn() -> Option<ConnectionMetadata> + Send + Sync;
type LoaderFn = dyn Fn() -> Option<ConnectionMetadata> + Send + Sync;


/// State for a middleware that will monitor and manage connections.
/// State for a middleware that will monitor and manage connections.
#[allow(missing_debug_implementations)]
#[derive(Clone, Default)]
#[derive(Clone, Default)]
pub struct CaptureSmithyConnection {
pub struct CaptureSmithyConnection {
    loader: Arc<Mutex<Option<Box<LoaderFn>>>>,
    loader: Arc<Mutex<Option<Box<LoaderFn>>>>,
@@ -154,7 +153,14 @@ mod test {
        let retriever = CaptureSmithyConnection::new();
        let retriever = CaptureSmithyConnection::new();
        let retriever_clone = retriever.clone();
        let retriever_clone = retriever.clone();
        assert!(retriever.get().is_none());
        assert!(retriever.get().is_none());
        retriever.set_connection_retriever(|| Some(ConnectionMetadata::new(true, None, || {})));
        retriever.set_connection_retriever(|| {
            Some(
                ConnectionMetadata::builder()
                    .proxied(true)
                    .poison_fn(|| {})
                    .build(),
            )
        });


        assert!(retriever.get().is_some());
        assert!(retriever.get().is_some());
        assert!(retriever_clone.get().is_some());
        assert!(retriever_clone.get().is_some());
+11 −6
Original line number Original line Diff line number Diff line
@@ -287,14 +287,19 @@ fn extract_smithy_connection(capture_conn: &CaptureConnection) -> Option<Connect
        let mut extensions = Extensions::new();
        let mut extensions = Extensions::new();
        conn.get_extras(&mut extensions);
        conn.get_extras(&mut extensions);
        let http_info = extensions.get::<HttpInfo>();
        let http_info = extensions.get::<HttpInfo>();
        let smithy_connection = ConnectionMetadata::new(
        let mut builder = ConnectionMetadata::builder()
            conn.is_proxied(),
            .proxied(conn.is_proxied())
            http_info.map(|info| info.remote_addr()),
            .poison_fn(move || match capture_conn.connection_metadata().as_ref() {
            move || match capture_conn.connection_metadata().as_ref() {
                Some(conn) => conn.poison(),
                Some(conn) => conn.poison(),
                None => tracing::trace!("no connection existed to poison"),
                None => tracing::trace!("no connection existed to poison"),
            },
            });
        );

        builder
            .set_local_addr(http_info.map(|info| info.local_addr()))
            .set_remote_addr(http_info.map(|info| info.remote_addr()));

        let smithy_connection = builder.build();

        Some(smithy_connection)
        Some(smithy_connection)
    } else {
    } else {
        None
        None