Unverified Commit d48acae8 authored by John DiSanti's avatar John DiSanti Committed by GitHub
Browse files

Implement an identity cache in aws-smithy-runtime (#3062)

This PR adds a `ResolveCachedIdentity` trait, and adapts the existing
LazyCredentialsCache implementation to it. It does NOT make this cache
used yet, as that will be done as a separate PR to keep code review
smaller.

Notable differences from the credentials cache:
- Supports multiple different identity types and identity resolvers
- Applies cache partitioning to the `SharedIdentityResolver` so that
operation config overrides of the identity resolver will result in the
correct identity being resolved.

----

_By submitting this pull request, I confirm that you can use, modify,
copy, and redistribute this contribution, under the terms of your
choice._
parent 6dceb8c0
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -464,7 +464,7 @@ mod tests {
                .unwrap();
            let token = identity.data::<Token>().unwrap().clone();
            assert_eq!(value, token.token());
            assert_eq!(time(expires_at), *identity.expiration().unwrap());
            assert_eq!(time(expires_at), identity.expiration().unwrap());
        }

        async fn expect_expired_token_err(&self) {
+1 −1
Original line number Diff line number Diff line
@@ -81,7 +81,7 @@ impl SigV4aSigner {
        request_timestamp: SystemTime,
    ) -> Result<v4a::SigningParams<'a, SigningSettings>, SigV4SigningError> {
        if let Some(expires_in) = settings.expires_in {
            if let Some(&identity_expiration) = identity.expiration() {
            if let Some(identity_expiration) = identity.expiration() {
                let presigned_expires_time = request_timestamp + expires_in;
                if presigned_expires_time > identity_expiration {
                    tracing::warn!(EXPIRATION_WARNING);
+99 −6
Original line number Diff line number Diff line
@@ -11,6 +11,7 @@ use aws_smithy_types::config_bag::ConfigBag;
use std::any::Any;
use std::fmt;
use std::fmt::Debug;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::SystemTime;

@@ -22,6 +23,72 @@ new_type_future! {
    pub struct IdentityFuture<'a, Identity, BoxError>;
}

static NEXT_CACHE_PARTITION: AtomicUsize = AtomicUsize::new(0);

/// Cache partition key for identity caching.
///
/// Identities need cache partitioning because a single identity cache is used across
/// multiple identity providers across multiple auth schemes. In addition, a single auth scheme
/// may have many different identity providers due to operation-level config overrides.
///
/// This partition _must_ be respected when retrieving from the identity cache and _should_
/// be part of the cache key.
///
/// Calling [`IdentityCachePartition::new`] will create a new globally unique cache partition key,
/// and the [`SharedIdentityResolver`] will automatically create and store a partion on construction.
/// Thus, every configured identity resolver will be assigned a unique partition.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash, Ord, PartialOrd)]
pub struct IdentityCachePartition(usize);

impl IdentityCachePartition {
    /// Create a new globally unique cache partition key.
    pub fn new() -> Self {
        Self(NEXT_CACHE_PARTITION.fetch_add(1, Ordering::Relaxed))
    }

    /// Helper for unit tests to create an identity cache partition with a known value.
    #[cfg(feature = "test-util")]
    pub fn new_for_tests(value: usize) -> IdentityCachePartition {
        Self(value)
    }
}

/// Caching resolver for identities.
pub trait ResolveCachedIdentity: fmt::Debug + Send + Sync {
    /// Returns a cached identity, or resolves an identity and caches it if its not already cached.
    fn resolve_cached_identity<'a>(
        &'a self,
        resolver: SharedIdentityResolver,
        runtime_components: &'a RuntimeComponents,
        config_bag: &'a ConfigBag,
    ) -> IdentityFuture<'a>;
}

/// Shared identity cache.
#[derive(Clone, Debug)]
pub struct SharedIdentityCache(Arc<dyn ResolveCachedIdentity>);

impl SharedIdentityCache {
    /// Creates a new [`SharedIdentityCache`] from the given cache implementation.
    pub fn new(cache: impl ResolveCachedIdentity + 'static) -> Self {
        Self(Arc::new(cache))
    }
}

impl ResolveCachedIdentity for SharedIdentityCache {
    fn resolve_cached_identity<'a>(
        &'a self,
        resolver: SharedIdentityResolver,
        runtime_components: &'a RuntimeComponents,
        config_bag: &'a ConfigBag,
    ) -> IdentityFuture<'a> {
        self.0
            .resolve_cached_identity(resolver, runtime_components, config_bag)
    }
}

impl_shared_conversions!(convert SharedIdentityCache from ResolveCachedIdentity using SharedIdentityCache::new);

#[deprecated(note = "Renamed to ResolveIdentity.")]
pub use ResolveIdentity as IdentityResolver;

@@ -43,16 +110,42 @@ pub trait ResolveIdentity: Send + Sync + Debug {
        runtime_components: &'a RuntimeComponents,
        config_bag: &'a ConfigBag,
    ) -> IdentityFuture<'a>;

    /// Returns a fallback identity.
    ///
    /// This method should be used as a fallback plan, i.e., when a call to `resolve_identity`
    /// is interrupted by a timeout and its future fails to complete.
    ///
    /// The fallback identity should be set aside and ready to be returned
    /// immediately. Therefore, a new identity should NOT be fetched
    /// within this method, which might cause a long-running operation.
    fn fallback_on_interrupt(&self) -> Option<Identity> {
        None
    }
}

/// Container for a shared identity resolver.
#[derive(Clone, Debug)]
pub struct SharedIdentityResolver(Arc<dyn ResolveIdentity>);
pub struct SharedIdentityResolver {
    inner: Arc<dyn ResolveIdentity>,
    cache_partition: IdentityCachePartition,
}

impl SharedIdentityResolver {
    /// Creates a new [`SharedIdentityResolver`] from the given resolver.
    pub fn new(resolver: impl ResolveIdentity + 'static) -> Self {
        Self(Arc::new(resolver))
        Self {
            inner: Arc::new(resolver),
            cache_partition: IdentityCachePartition::new(),
        }
    }

    /// Returns the globally unique cache partition key for this identity resolver.
    ///
    /// See the [`IdentityCachePartition`] docs for more information on what this is used for
    /// and why.
    pub fn cache_partition(&self) -> IdentityCachePartition {
        self.cache_partition
    }
}

@@ -62,7 +155,7 @@ impl ResolveIdentity for SharedIdentityResolver {
        runtime_components: &'a RuntimeComponents,
        config_bag: &'a ConfigBag,
    ) -> IdentityFuture<'a> {
        self.0.resolve_identity(runtime_components, config_bag)
        self.inner.resolve_identity(runtime_components, config_bag)
    }
}

@@ -136,8 +229,8 @@ impl Identity {
    }

    /// Returns the expiration time for this identity, if any.
    pub fn expiration(&self) -> Option<&SystemTime> {
        self.expiration.as_ref()
    pub fn expiration(&self) -> Option<SystemTime> {
        self.expiration
    }
}

@@ -181,6 +274,6 @@ mod tests {

        assert_eq!("foo", identity.data::<MyIdentityData>().unwrap().first);
        assert_eq!("bar", identity.data::<MyIdentityData>().unwrap().last);
        assert_eq!(Some(&expiration), identity.expiration());
        assert_eq!(Some(expiration), identity.expiration());
    }
}
+1 −1
Original line number Diff line number Diff line
@@ -608,7 +608,7 @@ impl RuntimeComponentsBuilder {
        impl ResolveIdentity for FakeIdentityResolver {
            fn resolve_identity<'a>(
                &'a self,
                _: &RuntimeComponents,
                _: &'a RuntimeComponents,
                _: &'a ConfigBag,
            ) -> IdentityFuture<'a> {
                unreachable!("fake identity resolver must be overridden for this test")
+1 −1
Original line number Diff line number Diff line
@@ -47,7 +47,7 @@ approx = "0.5.1"
aws-smithy-async = { path = "../aws-smithy-async", features = ["rt-tokio", "test-util"] }
aws-smithy-runtime-api = { path = "../aws-smithy-runtime-api", features = ["test-util"] }
aws-smithy-types = { path = "../aws-smithy-types", features = ["test-util"] }
tokio = { version = "1.25", features = ["macros", "rt", "test-util"] }
tokio = { version = "1.25", features = ["macros", "rt", "rt-multi-thread", "test-util"] }
tracing-subscriber = { version = "0.3.16", features = ["env-filter"] }
tracing-test = "0.2.1"

Loading