Commit b324fa66 authored by ysaito1001's avatar ysaito1001
Browse files

Reduce surface area of public API for `ClientRateLimiterBuilder`

parent 26f35a4b
Loading
Loading
Loading
Loading
+0 −34
Original line number Diff line number Diff line
@@ -108,7 +108,6 @@ class RetryPartitionTest {
                        CargoDependency.smithyRuntimeTestUtil(ctx.runtimeConfig).toType()
                            .resolve("test_util::capture_test_logs::capture_test_logs"),
                    "capture_request" to RuntimeType.captureRequest(ctx.runtimeConfig),
                    "ClientRateLimiter" to RuntimeType.smithyRuntime(ctx.runtimeConfig).resolve("client::retries::ClientRateLimiter"),
                    "ConfigBag" to RuntimeType.configBag(ctx.runtimeConfig),
                    "Intercept" to RuntimeType.intercept(ctx.runtimeConfig),
                    "RetryConfig" to RuntimeType.smithyTypes(ctx.runtimeConfig).resolve("retry::RetryConfig"),
@@ -171,39 +170,6 @@ class RetryPartitionTest {
                        *codegenScope,
                    )
                }

                tokioTest("test_custom_client_rate_limiter") {
                    val moduleName = ctx.moduleUseName()
                    rustTemplate(
                        """
                        use $moduleName::{Client, Config};

                        let (_logs, logs_rx) = #{capture_test_logs}();
                        let (http_client, _) = #{capture_request}(None);
                        let bucket_capacity = 0.5; // Should be less than INITIAL_REQUEST_COST (1.0)
                        let client_config = Config::builder()
                            .retry_partition(#{RetryPartition}::custom("test")
                                .client_rate_limiter(
                                    #{ClientRateLimiter}::builder()
                                        .enable_throttling(true)
                                        .current_bucket_capacity(bucket_capacity)
                                        .build()
                                )
                                .build()
                            )
                            .retry_config(#{RetryConfig}::adaptive())
                            .http_client(http_client)
                            .build();

                        let client = Client::from_conf(client_config);
                        let _ = client.some_operation().send().await;

                        let log_contents = logs_rx.contents();
                        assert!(log_contents.contains("client rate limiter delayed a request"));
                        """,
                        *codegenScope,
                    )
                }
            }
        }
    }
+40 −32
Original line number Diff line number Diff line
@@ -42,7 +42,7 @@ const SCALE_CONSTANT: f64 = 0.4;
/// Rate limiter for adaptive retry.
#[derive(Clone, Debug)]
pub struct ClientRateLimiter {
    inner: Arc<Mutex<Inner>>,
    pub(crate) inner: Arc<Mutex<Inner>>,
}

#[derive(Debug)]
@@ -51,7 +51,7 @@ pub(crate) struct Inner {
    fill_rate: f64,
    /// The maximum capacity allowed in the token bucket.
    max_capacity: f64,
    /// The current capacity of the token bucket.
    /// The current capacity of the token bucket. The minimum this can be is 1.0
    current_capacity: f64,
    /// The last time the token bucket was refilled.
    last_timestamp: Option<f64>,
@@ -245,7 +245,7 @@ pub struct ClientRateLimiterBuilder {
    token_refill_rate: Option<f64>,
    ///The maximum capacity allowed in the token bucket.
    maximum_bucket_capacity: Option<f64>,
    ///The current capacity of the token bucket. The minimum this can be is 1.0
    ///The current capacity of the token bucket.
    current_bucket_capacity: Option<f64>,
    ///The last time the token bucket was refilled.
    time_of_last_refill: Option<f64>,
@@ -278,12 +278,16 @@ impl ClientRateLimiterBuilder {
        self.token_refill_rate = token_refill_rate;
        self
    }
    /// The maximum capacity allowed in the token bucket.
    /// The maximum capacity allowed in the token bucket
    ///
    /// The implementation of [`ClientRateLimiter`] guarantees that `current_capacity` never exceeds this value.
    pub fn maximum_bucket_capacity(mut self, maximum_bucket_capacity: f64) -> Self {
        self.set_maximum_bucket_capacity(Some(maximum_bucket_capacity));
        self
    }
    /// The maximum capacity allowed in the token bucket.
    /// The maximum capacity allowed in the token bucket
    ///
    /// The implementation of [`ClientRateLimiter`] guarantees that `current_capacity` never exceeds this value.
    pub fn set_maximum_bucket_capacity(
        &mut self,
        maximum_bucket_capacity: Option<f64>,
@@ -291,12 +295,16 @@ impl ClientRateLimiterBuilder {
        self.maximum_bucket_capacity = maximum_bucket_capacity;
        self
    }
    /// The current capacity of the token bucket. The minimum this can be is 1.0
    /// The current capacity of the token bucket
    ///
    /// The implementation of [`ClientRateLimiter`] guarantees that this value is always at least `1.0` when it's enabled.
    pub fn current_bucket_capacity(mut self, current_bucket_capacity: f64) -> Self {
        self.set_current_bucket_capacity(Some(current_bucket_capacity));
        self
    }
    /// The current capacity of the token bucket. The minimum this can be is 1.0
    /// The current capacity of the token bucket
    ///
    /// The implementation of [`ClientRateLimiter`] guarantees that this value is always at least `1.0` when it's enabled.
    pub fn set_current_bucket_capacity(
        &mut self,
        current_bucket_capacity: Option<f64>,
@@ -304,13 +312,13 @@ impl ClientRateLimiterBuilder {
        self.current_bucket_capacity = current_bucket_capacity;
        self
    }
    /// The last time the token bucket was refilled.
    pub fn time_of_last_refill(mut self, time_of_last_refill: f64) -> Self {
    // The last time the token bucket was refilled.
    fn time_of_last_refill(mut self, time_of_last_refill: f64) -> Self {
        self.set_time_of_last_refill(Some(time_of_last_refill));
        self
    }
    /// The last time the token bucket was refilled.
    pub fn set_time_of_last_refill(&mut self, time_of_last_refill: Option<f64>) -> &mut Self {
    // The last time the token bucket was refilled.
    fn set_time_of_last_refill(&mut self, time_of_last_refill: Option<f64>) -> &mut Self {
        self.time_of_last_refill = time_of_last_refill;
        self
    }
@@ -327,38 +335,38 @@ impl ClientRateLimiterBuilder {
        self.tokens_retrieved_per_second = tokens_retrieved_per_second;
        self
    }
    /// The last half second time bucket used.
    pub fn previous_time_bucket(mut self, previous_time_bucket: f64) -> Self {
    // The last half second time bucket used.
    fn previous_time_bucket(mut self, previous_time_bucket: f64) -> Self {
        self.set_previous_time_bucket(Some(previous_time_bucket));
        self
    }
    /// The last half second time bucket used.
    pub fn set_previous_time_bucket(&mut self, previous_time_bucket: Option<f64>) -> &mut Self {
    // The last half second time bucket used.
    fn set_previous_time_bucket(&mut self, previous_time_bucket: Option<f64>) -> &mut Self {
        self.previous_time_bucket = previous_time_bucket;
        self
    }
    /// The number of requests seen within the current time bucket.
    pub fn request_count(mut self, request_count: u64) -> Self {
    // The number of requests seen within the current time bucket.
    fn request_count(mut self, request_count: u64) -> Self {
        self.set_request_count(Some(request_count));
        self
    }
    /// The number of requests seen within the current time bucket.
    pub fn set_request_count(&mut self, request_count: Option<u64>) -> &mut Self {
    // The number of requests seen within the current time bucket.
    fn set_request_count(&mut self, request_count: Option<u64>) -> &mut Self {
        self.request_count = request_count;
        self
    }
    /// Boolean indicating if the token bucket is enabled. The token bucket is initially disabled. When a throttling error is encountered it is enabled.
    pub fn enable_throttling(mut self, enable_throttling: bool) -> Self {
    // Boolean indicating if the token bucket is enabled. The token bucket is initially disabled. When a throttling error is encountered it is enabled.
    fn enable_throttling(mut self, enable_throttling: bool) -> Self {
        self.set_enable_throttling(Some(enable_throttling));
        self
    }
    /// Boolean indicating if the token bucket is enabled. The token bucket is initially disabled. When a throttling error is encountered it is enabled.
    pub fn set_enable_throttling(&mut self, enable_throttling: Option<bool>) -> &mut Self {
    // Boolean indicating if the token bucket is enabled. The token bucket is initially disabled. When a throttling error is encountered it is enabled.
    fn set_enable_throttling(&mut self, enable_throttling: Option<bool>) -> &mut Self {
        self.enable_throttling = enable_throttling;
        self
    }
    /// The maximum rate when the client was last throttled.
    pub fn tokens_retrieved_per_second_at_time_of_last_throttle(
    // The maximum rate when the client was last throttled.
    fn tokens_retrieved_per_second_at_time_of_last_throttle(
        mut self,
        tokens_retrieved_per_second_at_time_of_last_throttle: f64,
    ) -> Self {
@@ -367,8 +375,8 @@ impl ClientRateLimiterBuilder {
        ));
        self
    }
    /// The maximum rate when the client was last throttled.
    pub fn set_tokens_retrieved_per_second_at_time_of_last_throttle(
    // The maximum rate when the client was last throttled.
    fn set_tokens_retrieved_per_second_at_time_of_last_throttle(
        &mut self,
        tokens_retrieved_per_second_at_time_of_last_throttle: Option<f64>,
    ) -> &mut Self {
@@ -376,13 +384,13 @@ impl ClientRateLimiterBuilder {
            tokens_retrieved_per_second_at_time_of_last_throttle;
        self
    }
    /// The last time when the client was throttled.
    pub fn time_of_last_throttle(mut self, time_of_last_throttle: f64) -> Self {
    // The last time when the client was throttled.
    fn time_of_last_throttle(mut self, time_of_last_throttle: f64) -> Self {
        self.set_time_of_last_throttle(Some(time_of_last_throttle));
        self
    }
    /// The last time when the client was throttled.
    pub fn set_time_of_last_throttle(&mut self, time_of_last_throttle: Option<f64>) -> &mut Self {
    // The last time when the client was throttled.
    fn set_time_of_last_throttle(&mut self, time_of_last_throttle: Option<f64>) -> &mut Self {
        self.time_of_last_throttle = time_of_last_throttle;
        self
    }
@@ -390,7 +398,7 @@ impl ClientRateLimiterBuilder {
    pub fn build(self) -> ClientRateLimiter {
        ClientRateLimiter {
            inner: Arc::new(Mutex::new(Inner {
                fill_rate: self.token_refill_rate.unwrap_or(MIN_FILL_RATE),
                fill_rate: self.token_refill_rate.unwrap_or_default(),
                max_capacity: self.maximum_bucket_capacity.unwrap_or(f64::MAX),
                current_capacity: self.current_bucket_capacity.unwrap_or_default(),
                last_timestamp: self.time_of_last_refill,
+32 −1
Original line number Diff line number Diff line
@@ -417,6 +417,7 @@ mod tests {
    use std::sync::Mutex;
    use std::time::Duration;

    use aws_smithy_async::time::SystemTimeSource;
    use aws_smithy_runtime_api::client::interceptors::context::{
        Input, InterceptorContext, Output,
    };
@@ -434,7 +435,7 @@ mod tests {
    use aws_smithy_types::retry::{ErrorKind, RetryConfig};

    use super::{calculate_exponential_backoff, StandardRetryStrategy};
    use crate::client::retries::TokenBucket;
    use crate::client::retries::{ClientRateLimiter, RetryPartition, TokenBucket};

    #[test]
    fn no_retry_necessary_for_ok_result() {
@@ -548,6 +549,36 @@ mod tests {
        assert_eq!(ShouldAttempt::YesAfterDelay(MAX_BACKOFF), actual);
    }

    #[test]
    fn should_yield_client_rate_limiter_from_custom_partition() {
        let expected = ClientRateLimiter::builder().token_refill_rate(3.14).build();
        let cfg = ConfigBag::of_layers(vec![
            // Emulate default config layer overriden by a user config layer
            {
                let mut layer = Layer::new("default");
                layer.store_put(RetryPartition::new("default"));
                layer
            },
            {
                let mut layer = Layer::new("user");
                layer.store_put(RetryConfig::adaptive());
                layer.store_put(
                    RetryPartition::custom("user")
                        .client_rate_limiter(expected.clone())
                        .build(),
                );
                layer
            },
        ]);
        let rc = RuntimeComponentsBuilder::for_tests()
            .with_time_source(Some(SystemTimeSource::new()))
            .build()
            .unwrap();
        let actual = StandardRetryStrategy::adaptive_retry_rate_limiter(&rc, &cfg)
            .expect("should yield client rate limiter from custom partition");
        assert!(std::sync::Arc::ptr_eq(&expected.inner, &actual.inner));
    }

    #[allow(dead_code)] // will be unused with `--no-default-features --features client`
    #[derive(Debug)]
    struct PresetReasonRetryClassifier {