Unverified Commit d5a315fa authored by Zelda Hessler's avatar Zelda Hessler Committed by GitHub
Browse files

Adaptive Retries 1/2 (rate limiter) (#2773)

## Motivation and Context
<!--- Why is this change required? What problem does it solve? -->
<!--- If it fixes an open issue, please link to the issue here -->
#2190

## Description
<!--- Describe your changes in detail -->
this PR adds the rate limiter necessary for orchestrator support of
adaptive retries. I'll integrate the rate limiter with the retry policy
as part of a separate PR following this one. I wanted to keep these
small and easy to review.

This implementation is based on the Go v2 SDK's implementation.

## Testing
<!--- Please describe in detail how you tested your changes -->
<!--- Include details of your testing environment, and the tests you ran
to -->
<!--- see how your change affects other areas of the code, etc. -->
tests are included
----

_By submitting this pull request, I confirm that you can use, modify,
copy, and redistribute this contribution, under the terms of your
choice._
parent 934129cf
Loading
Loading
Loading
Loading
+45 −3
Original line number Diff line number Diff line
@@ -69,6 +69,40 @@ impl ControlledSleep {
    }
}

/// A sleep implementation where calls to [`AsyncSleep::sleep`] will complete instantly.
///
/// Create a [`InstantSleep`] with [`instant_time_and_sleep`]
#[derive(Debug, Clone)]
pub struct InstantSleep {
    log: Arc<Mutex<Vec<Duration>>>,
}

impl AsyncSleep for InstantSleep {
    fn sleep(&self, duration: Duration) -> Sleep {
        let log = self.log.clone();
        Sleep::new(async move {
            log.lock().unwrap().push(duration);
        })
    }
}

impl InstantSleep {
    /// Given a shared log for sleep durations, create a new `InstantSleep`.
    pub fn new(log: Arc<Mutex<Vec<Duration>>>) -> Self {
        Self { log }
    }

    /// Return the sleep durations that were logged by this `InstantSleep`.
    pub fn logs(&self) -> Vec<Duration> {
        self.log.lock().unwrap().iter().cloned().collect()
    }

    /// Return the total sleep duration that was logged by this `InstantSleep`.
    pub fn total_duration(&self) -> Duration {
        self.log.lock().unwrap().iter().sum()
    }
}

/// Guard returned from [`SleepGate::expect_sleep`]
///
/// # Examples
@@ -112,9 +146,9 @@ impl CapturedSleep<'_> {
    /// ```rust
    /// use std::time::Duration;
    /// use aws_smithy_async::rt::sleep::AsyncSleep;
    /// fn do_something(sleep: &dyn AsyncSleep) {
    /// async fn do_something(sleep: &dyn AsyncSleep) {
    ///   println!("before sleep");
    ///   sleep.sleep(Duration::from_secs(1));
    ///   sleep.sleep(Duration::from_secs(1)).await;
    ///   println!("after sleep");
    /// }
    /// ```
@@ -194,6 +228,14 @@ pub fn controlled_time_and_sleep(
    (ManualTimeSource { start_time, log }, sleep, gate)
}

/// Returns a duo of tools to test interactions with time. Sleeps will end instantly, but the
/// desired length of the sleeps will be recorded for later verification.
pub fn instant_time_and_sleep(start_time: SystemTime) -> (ManualTimeSource, InstantSleep) {
    let log = Arc::new(Mutex::new(vec![]));
    let sleep = InstantSleep::new(log.clone());
    (ManualTimeSource { start_time, log }, sleep)
}

impl TimeSource for SystemTime {
    fn now(&self) -> SystemTime {
        *self
@@ -249,7 +291,7 @@ mod test {

        let guard = gate.expect_sleep().await;
        assert_eq!(progress.load(Ordering::Acquire), 2);
        assert_eq!(task.is_finished(), false);
        assert!(!task.is_finished(), "task should not be finished");
        guard.allow_progress();
        timeout(Duration::from_secs(1), task)
            .await
+2 −0
Original line number Diff line number Diff line
@@ -15,3 +15,5 @@

/// Smithy runtime for client orchestration.
pub mod client;

pub mod macros;
+165 −0
Original line number Diff line number Diff line
/*
 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 * SPDX-License-Identifier: Apache-2.0
 */

//! Various utility macros to aid runtime crate writers.

/// Define a new builder struct, along with a method to create it, and setters.
///
/// ## Examples
///
/// The builder macro takes a list of field definitions, each with four elements:
/// ```txt
/// set_optional_field, optional_field,   String,     "An optional field which may or may not be set when `.build()` is called.",
/// ^ The setter name,  ^ The field name, ^ The type, ^ The documentation for the field and setter methods.
/// ```
///
/// The following example creates a new builder struct, along with a method to create it, and setters
/// for a struct `MyConfig` with three fields:
///
/// ```
/// use std::collections::HashMap;
/// use std::sync::{Arc, Mutex};
/// use aws_smithy_runtime_api::{builder, builder_methods, builder_struct};
///
/// struct MyConfig {
///     optional_field: Option<String>,
///     optional_field_with_a_default: f64,
///     required_field_with_no_default: Arc<Mutex<HashMap<String, String>>>,
/// }
///
/// impl MyConfig {
///     pub fn builder() -> Builder {
///         Builder::new()
///     }
/// }
///
/// builder!(
///     set_optional_field, optional_field, String, "An optional field which may or may not be set when `.build()` is called.",
///     set_optional_field_with_a_default, optional_field_with_a_default, f64, "An optional field that will default to `f64::MAX` if it's unset when `.build()` is called.",
///     set_required_field_with_no_default, required_field_with_no_default, HashMap<String, String>, "A required field that will cause the builder to panic if it's unset when `.build()` is called."
/// );
///
/// impl Builder {
///     fn build(self) -> MyConfig {
///         MyConfig {
///             optional_field: self.optional_field,
///             optional_field_with_a_default: self.optional_field_with_a_default.unwrap_or(f64::MAX),
///             required_field_with_no_default: Arc::new(Mutex::new(
///                 self.required_field_with_no_default.expect("'required_field_with_no_default' is required")
///             )),
///        }
///    }
/// }
/// ```
///
/// In this example, the result of macro expansion would look like this:
///
/// ```
/// # use std::collections::HashMap;
/// # use std::sync::{Arc, Mutex};
/// #[derive(Clone, Debug, Default)]
/// pub struct Builder {
///     #[doc = "An optional field which may or may not be set when `.build()` is called."]
///     optional_field: Option<String>,
///     #[doc = "An optional field that will default to `f64::MAX` if it's unset when `.build()` is called."]
///     optional_field_with_a_default: Option<f64>,
///     #[doc = "A required field that will cause the builder to panic if it's unset when `.build()` is called."]
///     required_field_with_no_default: Option<HashMap<String, String>>,
/// }
///
/// impl Builder {
///     pub fn new() -> Self {
///         Builder::default()
///     }
///
///     #[doc = "An optional field which may or may not be set when `.build()` is called."]
///     pub fn set_optional_field(&mut self, optional_field: Option<String>) -> &mut Self {
///         self.optional_field = optional_field;
///         self
///     }
///
///     #[doc = "An optional field which may or may not be set when `.build()` is called."]
///     pub fn optional_field(mut self, optional_field: String) -> Self {
///         self.optional_field = Some(optional_field);
///         self
///     }
///
///     #[doc = "An optional field that will default to `f64::MAX` if it's unset when `.build()` is called."]
///     pub fn set_optional_field_with_a_default(&mut self, optional_field_with_a_default: Option<f64>) -> &mut Self {
///         self.optional_field_with_a_default = optional_field_with_a_default;
///         self
///     }
///
///     #[doc = "An optional field that will default to `f64::MAX` if it's unset when `.build()` is called."]
///     pub fn optional_field_with_a_default(mut self, optional_field_with_a_default: f64) -> Self {
///         self.optional_field_with_a_default = Some(optional_field_with_a_default);
///         self
///     }
///
///     #[doc = "A required field that will cause the builder to panic if it's unset when `.build()` is called."]
///     pub fn set_required_field_with_no_default(&mut self, required_field_with_no_default: Option<HashMap<String, String>>) -> &mut Self {
///         self.required_field_with_no_default = required_field_with_no_default;
///         self
///     }
///
///     #[doc = "A required field that will cause the builder to panic if it's unset when `.build()` is called."]
///     pub fn required_field_with_no_default(mut self, required_field_with_no_default: HashMap<String, String>) -> Self {
///         self.required_field_with_no_default = Some(required_field_with_no_default);
///         self
///     }
/// }
/// ```
#[macro_export]
macro_rules! builder {
    ($($tt:tt)+) => {
        builder_struct!($($tt)+);

        impl Builder {
            pub fn new() -> Self {
                Builder::default()
            }

            builder_methods!($($tt)+);
        }
    }
}

/// Define a new builder struct, its fields, and their docs. This macro is intended to be called
/// by the `builder!` macro and should not be called directly.
#[macro_export]
macro_rules! builder_struct {
    ($($_setter_name:ident, $field_name:ident, $ty:ty, $doc:literal $(,)?)+) => {
        #[derive(Clone, Debug, Default)]
        pub struct Builder {
            $(
            #[doc = $doc]
            $field_name: Option<$ty>,
            )+
        }
    }
}

/// Define setter methods for a builder struct. Must be called from within an `impl` block. This
/// macro is intended to be called by the `builder!` macro and should not be called directly.
#[macro_export]
macro_rules! builder_methods {
    ($fn_name:ident, $arg_name:ident, $ty:ty, $doc:literal, $($tail:tt)+) => {
        builder_methods!($fn_name, $arg_name, $ty, $doc);
        builder_methods!($($tail)+);
    };
    ($fn_name:ident, $arg_name:ident, $ty:ty, $doc:literal) => {
        #[doc = $doc]
        pub fn $fn_name(&mut self, $arg_name: Option<$ty>) -> &mut Self {
            self.$arg_name = $arg_name;
            self
        }

        #[doc = $doc]
        pub fn $arg_name(mut self, $arg_name: $ty) -> Self {
            self.$arg_name = Some($arg_name);
            self
        }
    };
}
+1 −0
Original line number Diff line number Diff line
@@ -31,6 +31,7 @@ tracing = "0.1.37"
fastrand = "1.4"

[dev-dependencies]
approx = "0.5.1"
aws-smithy-async = { path = "../aws-smithy-async", features = ["rt-tokio", "test-util"] }
aws-smithy-runtime-api = { path = "../aws-smithy-runtime-api", features = ["test-util"] }
aws-smithy-types = { path = "../aws-smithy-types", features = ["test-util"] }
+6 −0
Original line number Diff line number Diff line
@@ -5,3 +5,9 @@

pub mod classifier;
pub mod strategy;

mod client_rate_limiter;
mod token_bucket;

pub use client_rate_limiter::ClientRateLimiterRuntimePlugin;
pub use token_bucket::TokenBucketRuntimePlugin;
Loading