Unverified Commit 78cad9e3 authored by Russell Cohen's avatar Russell Cohen Committed by GitHub
Browse files

Assorted canary Improvements (#1061)

* Assorted canary improvements

1. Add a timeout for individual canaries
2. Add an EC2 canary.
3. Add functionality to enable/disable canaries based on SDK version

* Refactor page size
parent db5aac4b
Loading
Loading
Loading
Loading
+46 −22
Original line number Diff line number Diff line
@@ -3,7 +3,9 @@
 * SPDX-License-Identifier: Apache-2.0.
 */

use crate::paginator_canary;
use crate::{s3_canary, transcribe_canary};
use aws_sdk_ec2 as ec2;
use aws_sdk_s3 as s3;
use aws_sdk_transcribestreaming as transcribe;
use std::env;
@@ -12,31 +14,41 @@ use std::future::Future;
use std::pin::Pin;
use tracing::{info_span, Instrument};

/// Generates a module-local `mk_canary` factory function for a canary.
///
/// * `$name` - expression used as the canary's name; it must evaluate to `&'static str`.
/// * `$run_canary` - callable taking `(&Clients, &CanaryEnv)` and returning the future
///   that runs the canary.
///
/// The generated function always returns `Some((name, boxed_future))`. Every type in the
/// expansion is named through a `$crate::canary::` path, so the invoking module does not
/// have to import `Clients`, `CanaryEnv`, or `CanaryFuture` for the expansion to compile.
#[macro_export]
macro_rules! mk_canary {
    ($name: expr, $run_canary: expr) => {
        pub(crate) fn mk_canary(
            clients: &$crate::canary::Clients,
            env: &$crate::canary::CanaryEnv,
        ) -> Option<(&'static str, $crate::canary::CanaryFuture)> {
            Some(($name, Box::pin($run_canary(clients, env))))
        }
    };
}

/// Builds the list of `(name, future)` pairs for the canaries that should run.
///
/// Each canary module contributes an `Option` through its `mk_canary` function; `None`
/// entries are dropped by `flatten()`, and every remaining future is wrapped in a
/// `run_canary` tracing span that carries the canary's name.
// NOTE(review): this listing shows the removed and the added lines of the change
// together (the old literal `vec![...]` of hand-built tuples and the new `mk_canary`
// pipeline) — confirm which lines belong to the final version before editing.
pub fn get_canaries_to_run(clients: Clients, env: CanaryEnv) -> Vec<(&'static str, CanaryFuture)> {
    vec![
        (
            "s3_canary",
            Box::pin(
                s3_canary::s3_canary(clients.s3, env.s3_bucket_name)
                    .instrument(info_span!("s3_canary")),
            ),
        ),
    let canaries = vec![
        paginator_canary::mk_canary(&clients, &env),
        s3_canary::mk_canary(&clients, &env),
        transcribe_canary::mk_canary(&clients, &env),
    ];

    canaries
        .into_iter()
        .flatten()
        .map(|(name, fut)| {
            (
            "transcribe_canary",
            Box::pin(
                transcribe_canary::transcribe_canary(
                    clients.transcribe,
                    env.expected_transcribe_result,
                name,
                Box::pin(fut.instrument(info_span!("run_canary", name = name))) as _,
            )
                .instrument(info_span!("transcribe_canary")),
            ),
        ),
    ]
        })
        .collect()
}

/// SDK clients shared by the canaries.
///
/// Derives `Clone`; the canary factories clone the individual client they need.
#[derive(Clone)]
pub struct Clients {
    /// Amazon S3 client (`aws_sdk_s3`).
    pub s3: s3::Client,
    /// Amazon EC2 client (`aws_sdk_ec2`).
    pub ec2: ec2::Client,
    /// Amazon Transcribe streaming client (`aws_sdk_transcribestreaming`).
    pub transcribe: transcribe::Client,
}

@@ -44,6 +56,7 @@ impl Clients {
    pub async fn initialize() -> Self {
        let config = aws_config::load_from_env().await;
        Self {
            ec2: ec2::Client::new(&config),
            s3: s3::Client::new(&config),
            transcribe: transcribe::Client::new(&config),
        }
@@ -51,8 +64,10 @@ impl Clients {
}

/// Environment-derived settings consumed by the canaries.
// NOTE(review): the two un-prefixed fields and the `pub(crate)` fields below look like
// the before/after forms of the same declarations as rendered by the diff — confirm.
pub struct CanaryEnv {
    s3_bucket_name: String,
    expected_transcribe_result: String,
    /// Name of the S3 bucket handed to the S3 canary.
    pub(crate) s3_bucket_name: String,
    /// Transcript text handed to the Transcribe canary.
    pub(crate) expected_transcribe_result: String,
    // `page_size` is only read by the paginator canary, which is declared through the
    // feature-gated `canary_module!` macro; presumably the lint is silenced for builds
    // where that module is compiled out — confirm.
    #[allow(dead_code)]
    /// Page size handed to the EC2 paginator canary.
    pub(crate) page_size: usize,
}

impl fmt::Debug for CanaryEnv {
@@ -77,11 +92,20 @@ impl CanaryEnv {
        // This is an environment variable so that the code doesn't need to be changed if
        // Amazon Transcribe starts returning different output for the same audio.
        let expected_transcribe_result = env::var("CANARY_EXPECTED_TRANSCRIBE_RESULT")
            .expect("CANARY_EXPECTED_TRANSCRIBE_RESULT must be set");
            .unwrap_or_else(|_| {
                "Good day to you transcribe. This is Polly talking to you from the Rust ST K."
                    .to_string()
            });

        let page_size = env::var("PAGE_SIZE")
            .map(|ps| ps.parse::<usize>())
            .unwrap_or_else(|_| Ok(16))
            .expect("invalid page size");

        Self {
            s3_bucket_name,
            expected_transcribe_result,
            page_size,
        }
    }
}
+55 −7
Original line number Diff line number Diff line
@@ -7,23 +7,70 @@ use canary::{get_canaries_to_run, CanaryEnv};
use lambda_runtime::{Context as LambdaContext, Error};
use serde_json::{json, Value};
use std::collections::BTreeMap;
use std::env;
use std::future::Future;
use std::pin::Pin;
use std::time::Duration;
use tokio::task::JoinHandle;
use tokio::time::timeout;
use tracing::info;

/// Declares a canary module that is only compiled in when the `$enabled_since` feature
/// is turned on.
///
/// With the feature enabled this expands to a plain `mod` declaration. With it disabled,
/// a stand-in module of the same name is emitted whose `mk_canary` logs a warning and
/// returns `None`, so callers can invoke `mk_canary` unconditionally.
macro_rules! canary_module {
    ($module: ident, since: $enabled_since: expr) => {
        // Stand-in used when the SDK version feature is absent.
        #[cfg(not(feature = $enabled_since))]
        mod $module {
            pub(crate) fn mk_canary(
                _clients: &crate::canary::Clients,
                _env: &crate::canary::CanaryEnv,
            ) -> Option<(&'static str, crate::canary::CanaryFuture)> {
                tracing::warn!(concat!(
                    stringify!($module),
                    " is disabled because it is not supported by this version of the SDK."
                ));
                None
            }
        }

        // The real module, compiled only when the feature is present.
        #[cfg(feature = $enabled_since)]
        mod $module;
    };
}

mod canary;

mod s3_canary;
canary_module!(paginator_canary, since: "v0.4.1");
mod transcribe_canary;

#[tokio::main]
async fn main() -> Result<(), Error> {
    tracing_subscriber::fmt::init();

    let local = env::args().any(|arg| arg == "--local");
    let main_handler = LambdaMain::new().await;
    if local {
        let result = lambda_main(main_handler.clients).await?;
        if result
            .as_object()
            .expect("is object")
            .get_key_value("result")
            .expect("exists")
            .1
            .as_str()
            .expect("is str")
            == "success"
        {
            Ok(())
        } else {
            Err(format!("canary failed: {:?}", result).into())
        }
    } else {
        lambda_runtime::run(main_handler).await?;
        Ok(())
    }
}

// Enables us to keep the clients alive between successive Lambda executions.
// Not because we need to for this use-case, but to demonstrate how to.
@@ -77,11 +124,12 @@ async fn lambda_main(clients: canary::Clients) -> Result<Value, Error> {
}

/// Converts the outcome of a spawned canary task into `Ok(())` or an error message.
///
/// The outer layers distinguish a timeout (20 seconds) and a failure to join the task
/// from the canary's own `anyhow::Result`, whose error is rendered into the message.
// NOTE(review): both the pre-change (`handle.await`) and the post-change
// (`timeout(...)`) match arms appear below, as rendered by the diff — confirm which
// lines belong to the final version before editing.
// NOTE(review): on timeout the `JoinHandle` is dropped; presumably that leaves the
// spawned task running rather than cancelling it — verify against tokio's docs.
async fn canary_result(handle: JoinHandle<anyhow::Result<()>>) -> Result<(), String> {
    match handle.await {
        Ok(result) => match result {
    match timeout(Duration::from_secs(20), handle).await {
        Err(_timeout) => Err(format!("canary timed out")),
        Ok(Ok(result)) => match result {
            Ok(_) => Ok(()),
            Err(err) => Err(err.to_string()),
            Err(err) => Err(format!("{:?}", err)),
        },
        Err(err) => Err(err.to_string()),
        Ok(Err(err)) => Err(err.to_string()),
    }
}
+59 −0
Original line number Diff line number Diff line
/*
 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 * SPDX-License-Identifier: Apache-2.0.
 */

use crate::canary::Clients;

use crate::mk_canary;
use anyhow::{bail, Context};

use aws_sdk_ec2 as ec2;
use aws_sdk_ec2::model::InstanceType;
use std::env;

use crate::CanaryEnv;
use tokio_stream::StreamExt;

// Registers the EC2 paginator canary under the name "ec2_paginator".
mk_canary!("ec2_paginator", |sdk: &Clients, settings: &CanaryEnv| {
    let ec2_client = sdk.ec2.clone();
    paginator_canary(ec2_client, settings.page_size)
});

pub async fn paginator_canary(client: ec2::Client, page_size: usize) -> anyhow::Result<()> {
    let mut history = client
        .describe_spot_price_history()
        .instance_types(InstanceType::M1Medium)
        .into_paginator()
        .page_size(page_size as i32)
        .send();

    let mut num_pages = 0;
    while let Some(page) = history.try_next().await? {
        let items_in_page = page.spot_price_history.unwrap_or_default().len();
        if items_in_page > page_size as usize {
            bail!(
                "failed to retrieve results of correct page size (expected {}, got {})",
                page_size,
                items_in_page
            )
        }
        num_pages += 1;
    }
    if dbg!(num_pages) < 2 {
        bail!("should be ~60 of pages of results")
    }

    Ok(())
}

#[cfg(test)]
mod test {
    use crate::paginator_canary::paginator_canary;

    /// Runs the paginator canary with a client built from the ambient AWS environment
    /// configuration.
    #[tokio::test]
    async fn test_paginator() {
        let conf = aws_config::load_from_env().await;
        let client = aws_sdk_ec2::Client::new(&conf);
        // `paginator_canary` takes the page size as its second argument; 16 is the same
        // value `CanaryEnv` falls back to when `PAGE_SIZE` is unset.
        paginator_canary(client, 16).await.unwrap()
    }
}
+7 −1
Original line number Diff line number Diff line
@@ -3,13 +3,19 @@
 * SPDX-License-Identifier: Apache-2.0.
 */

use crate::canary::CanaryError;
use crate::canary::{CanaryError, Clients};
use crate::{mk_canary, CanaryEnv};
use anyhow::Context;
use aws_sdk_s3 as s3;
use uuid::Uuid;

const METADATA_TEST_VALUE: &str = "some   value";

// Registers the S3 canary under the name "s3".
mk_canary!("s3", |sdk: &Clients, settings: &CanaryEnv| {
    let s3_client = sdk.s3.clone();
    let bucket_name = settings.s3_bucket_name.clone();
    s3_canary(s3_client, bucket_name)
});

pub async fn s3_canary(client: s3::Client, s3_bucket_name: String) -> anyhow::Result<()> {
    use s3::{error::GetObjectError, error::GetObjectErrorKind, SdkError};
    let test_key = Uuid::new_v4().as_u128().to_string();
+9 −0
Original line number Diff line number Diff line
@@ -4,6 +4,7 @@
 */

use crate::canary::CanaryError;
use crate::mk_canary;
use async_stream::stream;
use aws_sdk_transcribestreaming as transcribe;
use bytes::BufMut;
@@ -13,6 +14,14 @@ use transcribe::model::{
use transcribe::Blob;

const CHUNK_SIZE: usize = 8192;
use crate::canary::{CanaryEnv, Clients};

// Registers the Transcribe canary under the name "transcribe_canary".
mk_canary!("transcribe_canary", |sdk: &Clients, settings: &CanaryEnv| {
    let transcribe_client = sdk.transcribe.clone();
    let expected_text = settings.expected_transcribe_result.clone();
    transcribe_canary(transcribe_client, expected_text)
});

pub async fn transcribe_canary(
    client: transcribe::Client,
Loading