From 78cad9e371835981ca77b11f9b4db5f2f8947886 Mon Sep 17 00:00:00 2001 From: Russell Cohen Date: Tue, 11 Jan 2022 15:05:35 -0500 Subject: [PATCH] Assorted canary Improvements (#1061) * Assorted canary improvements 1. Add a timeout for individual canaries 2. Add an Ec2 canary. 3. Add functionality to enable/disable canaries based on SDK version * Refactor page size --- tools/ci-cdk/canary-lambda/src/canary.rs | 68 +++++++++++++------ tools/ci-cdk/canary-lambda/src/main.rs | 62 +++++++++++++++-- .../canary-lambda/src/paginator_canary.rs | 59 ++++++++++++++++ tools/ci-cdk/canary-lambda/src/s3_canary.rs | 8 ++- .../canary-lambda/src/transcribe_canary.rs | 9 +++ .../ci-cdk/canary-lambda/write-cargo-toml.py | 17 ++++- 6 files changed, 192 insertions(+), 31 deletions(-) create mode 100644 tools/ci-cdk/canary-lambda/src/paginator_canary.rs diff --git a/tools/ci-cdk/canary-lambda/src/canary.rs b/tools/ci-cdk/canary-lambda/src/canary.rs index bbabd4f9f..02c3e8d4f 100644 --- a/tools/ci-cdk/canary-lambda/src/canary.rs +++ b/tools/ci-cdk/canary-lambda/src/canary.rs @@ -3,7 +3,9 @@ * SPDX-License-Identifier: Apache-2.0. */ +use crate::paginator_canary; use crate::{s3_canary, transcribe_canary}; +use aws_sdk_ec2 as ec2; use aws_sdk_s3 as s3; use aws_sdk_transcribestreaming as transcribe; use std::env; @@ -12,31 +14,41 @@ use std::future::Future; use std::pin::Pin; use tracing::{info_span, Instrument}; +#[macro_export] +macro_rules! mk_canary { + ($name: expr, $run_canary: expr) => { + pub(crate) fn mk_canary( + clients: &Clients, + env: &CanaryEnv, + ) -> Option<(&'static str, crate::canary::CanaryFuture)> { + Some(($name, Box::pin($run_canary(clients, env)))) + } + }; +} + pub fn get_canaries_to_run(clients: Clients, env: CanaryEnv) -> Vec<(&'static str, CanaryFuture)> { - vec![ - ( - "s3_canary", - Box::pin( - s3_canary::s3_canary(clients.s3, env.s3_bucket_name) - .instrument(info_span!("s3_canary")), - ), - ), - ( - "transcribe_canary", - Box::pin( - transcribe_canary::transcribe_canary( - clients.transcribe, - env.expected_transcribe_result, - ) - .instrument(info_span!("transcribe_canary")), - ), - ), - ] + let canaries = vec![ + paginator_canary::mk_canary(&clients, &env), + s3_canary::mk_canary(&clients, &env), + transcribe_canary::mk_canary(&clients, &env), + ]; + + canaries + .into_iter() + .flatten() + .map(|(name, fut)| { + ( + name, + Box::pin(fut.instrument(info_span!("run_canary", name = name))) as _, + ) + }) + .collect() } #[derive(Clone)] pub struct Clients { pub s3: s3::Client, + pub ec2: ec2::Client, pub transcribe: transcribe::Client, } @@ -44,6 +56,7 @@ impl Clients { pub async fn initialize() -> Self { let config = aws_config::load_from_env().await; Self { + ec2: ec2::Client::new(&config), s3: s3::Client::new(&config), transcribe: transcribe::Client::new(&config), } @@ -51,8 +64,10 @@ impl Clients { } pub struct CanaryEnv { - s3_bucket_name: String, - expected_transcribe_result: String, + pub(crate) s3_bucket_name: String, + pub(crate) expected_transcribe_result: String, + #[allow(dead_code)] + pub(crate) page_size: usize, } impl fmt::Debug for CanaryEnv { @@ -77,11 +92,20 @@ impl CanaryEnv { // This is an environment variable so that the code doesn't need to be changed if // Amazon Transcribe starts returning different output for the same audio. let expected_transcribe_result = env::var("CANARY_EXPECTED_TRANSCRIBE_RESULT") - .expect("CANARY_EXPECTED_TRANSCRIBE_RESULT must be set"); + .unwrap_or_else(|_| { + "Good day to you transcribe. This is Polly talking to you from the Rust ST K." + .to_string() + }); + + let page_size = env::var("PAGE_SIZE") + .map(|ps| ps.parse::()) + .unwrap_or_else(|_| Ok(16)) + .expect("invalid page size"); Self { s3_bucket_name, expected_transcribe_result, + page_size, } } } diff --git a/tools/ci-cdk/canary-lambda/src/main.rs b/tools/ci-cdk/canary-lambda/src/main.rs index f22b36107..4381e654e 100644 --- a/tools/ci-cdk/canary-lambda/src/main.rs +++ b/tools/ci-cdk/canary-lambda/src/main.rs @@ -7,22 +7,69 @@ use canary::{get_canaries_to_run, CanaryEnv}; use lambda_runtime::{Context as LambdaContext, Error}; use serde_json::{json, Value}; use std::collections::BTreeMap; +use std::env; use std::future::Future; use std::pin::Pin; +use std::time::Duration; use tokio::task::JoinHandle; +use tokio::time::timeout; use tracing::info; +/// Conditionally include the module based on the $version feature gate +/// +/// When the module is not included, an `mk_canary` function will be generated that returns `None`. +macro_rules! canary_module { + ($name: ident, since: $version: expr) => { + #[cfg(feature = $version)] + mod $name; + + #[cfg(not(feature = $version))] + mod $name { + pub(crate) fn mk_canary( + _clients: &crate::canary::Clients, + _env: &crate::canary::CanaryEnv, + ) -> Option<(&'static str, crate::canary::CanaryFuture)> { + tracing::warn!(concat!( + stringify!($name), + " is disabled because it is not supported by this version of the SDK." + )); + None + } + } + }; +} + mod canary; + mod s3_canary; +canary_module!(paginator_canary, since: "v0.4.1"); mod transcribe_canary; #[tokio::main] async fn main() -> Result<(), Error> { tracing_subscriber::fmt::init(); - + let local = env::args().any(|arg| arg == "--local"); let main_handler = LambdaMain::new().await; - lambda_runtime::run(main_handler).await?; - Ok(()) + if local { + let result = lambda_main(main_handler.clients).await?; + if result + .as_object() + .expect("is object") + .get_key_value("result") + .expect("exists") + .1 + .as_str() + .expect("is str") + == "success" + { + Ok(()) + } else { + Err(format!("canary failed: {:?}", result).into()) + } + } else { + lambda_runtime::run(main_handler).await?; + Ok(()) + } } // Enables us to keep the clients alive between successive Lambda executions. @@ -77,11 +124,12 @@ async fn lambda_main(clients: canary::Clients) -> Result { } async fn canary_result(handle: JoinHandle>) -> Result<(), String> { - match handle.await { - Ok(result) => match result { + match timeout(Duration::from_secs(20), handle).await { + Err(_timeout) => Err(format!("canary timed out")), + Ok(Ok(result)) => match result { Ok(_) => Ok(()), - Err(err) => Err(err.to_string()), + Err(err) => Err(format!("{:?}", err)), }, - Err(err) => Err(err.to_string()), + Ok(Err(err)) => Err(err.to_string()), } } diff --git a/tools/ci-cdk/canary-lambda/src/paginator_canary.rs b/tools/ci-cdk/canary-lambda/src/paginator_canary.rs new file mode 100644 index 000000000..046763434 --- /dev/null +++ b/tools/ci-cdk/canary-lambda/src/paginator_canary.rs @@ -0,0 +1,59 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +use crate::canary::Clients; + +use crate::mk_canary; +use anyhow::{bail, Context}; + +use aws_sdk_ec2 as ec2; +use aws_sdk_ec2::model::InstanceType; +use std::env; + +use crate::CanaryEnv; +use tokio_stream::StreamExt; + +mk_canary!("ec2_paginator", |clients: &Clients, env: &CanaryEnv| { + paginator_canary(clients.ec2.clone(), env.page_size) +}); + +pub async fn paginator_canary(client: ec2::Client, page_size: usize) -> anyhow::Result<()> { + let mut history = client + .describe_spot_price_history() + .instance_types(InstanceType::M1Medium) + .into_paginator() + .page_size(page_size as i32) + .send(); + + let mut num_pages = 0; + while let Some(page) = history.try_next().await? { + let items_in_page = page.spot_price_history.unwrap_or_default().len(); + if items_in_page > page_size as usize { + bail!( + "failed to retrieve results of correct page size (expected {}, got {})", + page_size, + items_in_page + ) + } + num_pages += 1; + } + if dbg!(num_pages) < 2 { + bail!("should be ~60 of pages of results") + } + + Ok(()) +} + +#[cfg(test)] +mod test { + use crate::paginator_canary::paginator_canary; + + #[tokio::test] + async fn test_paginator() { + let conf = aws_config::load_from_env().await; + let client = aws_sdk_ec2::Client::new(&conf); + paginator_canary(client).await.unwrap() + } +} diff --git a/tools/ci-cdk/canary-lambda/src/s3_canary.rs b/tools/ci-cdk/canary-lambda/src/s3_canary.rs index 8f9185461..499be8d09 100644 --- a/tools/ci-cdk/canary-lambda/src/s3_canary.rs +++ b/tools/ci-cdk/canary-lambda/src/s3_canary.rs @@ -3,13 +3,19 @@ * SPDX-License-Identifier: Apache-2.0. */ -use crate::canary::CanaryError; +use crate::canary::{CanaryError, Clients}; +use crate::{mk_canary, CanaryEnv}; use anyhow::Context; use aws_sdk_s3 as s3; use uuid::Uuid; const METADATA_TEST_VALUE: &str = "some value"; +mk_canary!("s3", |clients: &Clients, env: &CanaryEnv| s3_canary( + clients.s3.clone(), + env.s3_bucket_name.clone() +)); + pub async fn s3_canary(client: s3::Client, s3_bucket_name: String) -> anyhow::Result<()> { use s3::{error::GetObjectError, error::GetObjectErrorKind, SdkError}; let test_key = Uuid::new_v4().as_u128().to_string(); diff --git a/tools/ci-cdk/canary-lambda/src/transcribe_canary.rs b/tools/ci-cdk/canary-lambda/src/transcribe_canary.rs index f1f0b93d2..3caec39ff 100644 --- a/tools/ci-cdk/canary-lambda/src/transcribe_canary.rs +++ b/tools/ci-cdk/canary-lambda/src/transcribe_canary.rs @@ -4,6 +4,7 @@ */ use crate::canary::CanaryError; +use crate::mk_canary; use async_stream::stream; use aws_sdk_transcribestreaming as transcribe; use bytes::BufMut; @@ -13,6 +14,14 @@ use transcribe::model::{ use transcribe::Blob; const CHUNK_SIZE: usize = 8192; +use crate::canary::{CanaryEnv, Clients}; + +mk_canary!("transcribe_canary", |client: &Clients, env: &CanaryEnv| { + transcribe_canary( + client.transcribe.clone(), + env.expected_transcribe_result.clone(), + ) +}); pub async fn transcribe_canary( client: transcribe::Client, diff --git a/tools/ci-cdk/canary-lambda/write-cargo-toml.py b/tools/ci-cdk/canary-lambda/write-cargo-toml.py index 662cf107d..a5da52e00 100755 --- a/tools/ci-cdk/canary-lambda/write-cargo-toml.py +++ b/tools/ci-cdk/canary-lambda/write-cargo-toml.py @@ -32,6 +32,7 @@ anyhow = "1" async-stream = "0.3" bytes = "1" hound = "3.4" +async-trait = "0.1" lambda_runtime = "0.4" serde_json = "1" thiserror = "1" @@ -39,8 +40,13 @@ tokio = { version = "1", features = ["full"] } tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["fmt"] } uuid = { version = "0.8", features = ["v4"] } +tokio-stream = "0" """ +notable_versions = [ + # first version to add support for paginators + "0.4.1" +] def main(): args = Args() @@ -51,9 +57,18 @@ def main(): args.path, args.sdk_version), file=file) print(format_dependency("aws-sdk-s3", args.path, args.sdk_version), file=file) + print(format_dependency("aws-sdk-ec2", + args.path, args.sdk_version), file=file) print(format_dependency("aws-sdk-transcribestreaming", args.path, args.sdk_version), file=file) - + print("[features]", file=file) + for version in notable_versions: + print(f'"v{version}" = []', file=file) + enabled = ', '.join(enabled_versions(args.sdk_version)) + print(f'default = [{enabled}]', file=file) + +def enabled_versions(sdk_version): + return [f'"v{version}"' for version in notable_versions if version.split('.') <= sdk_version.split('.')] def format_dependency(crate, path, version): if path is None: -- GitLab