Unverified Commit 3b8f69c1 authored by John DiSanti's avatar John DiSanti Committed by GitHub
Browse files

Add incomplete Event Stream support with working Amazon Transcribe example (#653)

* Add incomplete Event Stream support with working Amazon Transcribe example

* Make the raw response in SdkError generic

* Fix XmlBindingTraitSerializerGeneratorTest

* Make the build aware of the SMITHYRS_EXPERIMENTAL_EVENTSTREAM switch

* Fix SigV4SigningCustomizationTest

* Update changelog

* Fix build when SMITHYRS_EXPERIMENTAL_EVENTSTREAM is not set

* Add initial unit test for EventStreamUnmarshallerGenerator

* Add event header unmarshalling support

* Don't pull in event stream dependencies by default

* Only add event stream signer to config for services that need it

* Move event stream inlineables into smithy-eventstream

* Fix some clippy lints

* Transform event stream unions

* Fix crash in SigV4SigningDecorator

* Add test for unmarshalling errors

* Incorporate CR feedback
parent b119782a
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -3,6 +3,7 @@ vNext (Month Day, Year)

**New this week**

- (When complete) Add Event Stream support (#653, #xyz)
- (When complete) Add profile file provider for region (#594, #xyz)

v0.21 (August 19th, 2021)
+5 −0
Original line number Diff line number Diff line
@@ -7,12 +7,17 @@ license = "Apache-2.0"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[features]
sign-eventstream = ["smithy-eventstream", "aws-sigv4/sign-eventstream"]
default = []

[dependencies]
http = "0.2.2"
aws-sigv4 = { path = "../aws-sigv4" }
aws-auth = { path = "../aws-auth" }
aws-types = { path = "../aws-types" }
smithy-http = { path = "../../../rust-runtime/smithy-http" }
smithy-eventstream = { path = "../../../rust-runtime/smithy-eventstream", optional = true }
# Trying this out as an experiment. thiserror can be removed and replaced with hand written error
# implementations and it is not a breaking change.
thiserror = "1"
+129 −0
Original line number Diff line number Diff line
/*
 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 * SPDX-License-Identifier: Apache-2.0.
 */

use crate::middleware::Signature;
use aws_auth::Credentials;
use aws_sigv4::event_stream::sign_message;
use aws_sigv4::SigningParams;
use aws_types::region::SigningRegion;
use aws_types::SigningService;
use smithy_eventstream::frame::{Message, SignMessage, SignMessageError};
use smithy_http::property_bag::PropertyBag;
use std::sync::{Arc, Mutex, MutexGuard};
use std::time::SystemTime;

/// Event Stream SigV4 signing implementation.
#[derive(Debug)]
pub struct SigV4Signer {
    properties: Arc<Mutex<PropertyBag>>,
    last_signature: Option<String>,
}

impl SigV4Signer {
    pub fn new(properties: Arc<Mutex<PropertyBag>>) -> Self {
        Self {
            properties,
            last_signature: None,
        }
    }
}

impl SignMessage for SigV4Signer {
    fn sign(&mut self, message: Message) -> Result<Message, SignMessageError> {
        let properties = PropertyAccessor(self.properties.lock().unwrap());
        if self.last_signature.is_none() {
            // The Signature property should exist in the property bag for all Event Stream requests.
            self.last_signature = Some(properties.expect::<Signature>().as_ref().into())
        }

        // Every single one of these values would have been retrieved during the initial request,
        // so we can safely assume they all exist in the property bag at this point.
        let credentials = properties.expect::<Credentials>();
        let region = properties.expect::<SigningRegion>();
        let signing_service = properties.expect::<SigningService>();
        let time = properties
            .get::<SystemTime>()
            .copied()
            .unwrap_or_else(SystemTime::now);
        let params = SigningParams {
            access_key: credentials.access_key_id(),
            secret_key: credentials.secret_access_key(),
            security_token: credentials.session_token(),
            region: region.as_ref(),
            service_name: signing_service.as_ref(),
            date_time: time.into(),
            settings: (),
        };

        let (signed_message, signature) =
            sign_message(&message, self.last_signature.as_ref().unwrap(), &params).into_parts();
        self.last_signature = Some(signature);

        Ok(signed_message)
    }
}

// TODO(EventStream): Make a new type around `Arc<Mutex<PropertyBag>>` called `SharedPropertyBag`
// and abstract the mutex away entirely.
struct PropertyAccessor<'a>(MutexGuard<'a, PropertyBag>);

impl<'a> PropertyAccessor<'a> {
    fn get<T: Send + Sync + 'static>(&self) -> Option<&T> {
        self.0.get::<T>()
    }

    fn expect<T: Send + Sync + 'static>(&self) -> &T {
        self.get::<T>()
            .expect("property should have been inserted into property bag via middleware")
    }
}

#[cfg(test)]
mod tests {
    use crate::event_stream::SigV4Signer;
    use crate::middleware::Signature;
    use aws_auth::Credentials;
    use aws_types::region::Region;
    use aws_types::region::SigningRegion;
    use aws_types::SigningService;
    use smithy_eventstream::frame::{HeaderValue, Message, SignMessage};
    use smithy_http::property_bag::PropertyBag;
    use std::sync::{Arc, Mutex};
    use std::time::{Duration, UNIX_EPOCH};

    #[test]
    fn sign_message() {
        let region = Region::new("us-east-1");
        let mut properties = PropertyBag::new();
        properties.insert(region.clone());
        properties.insert(UNIX_EPOCH + Duration::new(1611160427, 0));
        properties.insert(SigningService::from_static("transcribe"));
        properties.insert(Credentials::from_keys("AKIAfoo", "bar", None));
        properties.insert(SigningRegion::from(region));
        properties.insert(Signature::new("initial-signature".into()));

        let mut signer = SigV4Signer::new(Arc::new(Mutex::new(properties)));
        let mut signatures = Vec::new();
        for _ in 0..5 {
            let signed = signer
                .sign(Message::new(&b"identical message"[..]))
                .unwrap();
            if let HeaderValue::ByteArray(signature) = signed
                .headers()
                .iter()
                .find(|h| h.name().as_str() == ":chunk-signature")
                .unwrap()
                .value()
            {
                signatures.push(signature.clone());
            } else {
                panic!("failed to get the :chunk-signature")
            }
        }
        for i in 1..signatures.len() {
            assert_ne!(signatures[i - 1], signatures[i]);
        }
    }
}
+3 −0
Original line number Diff line number Diff line
@@ -7,5 +7,8 @@
//!
//! In the future, additional signature algorithms can be enabled as Cargo Features.

#[cfg(feature = "sign-eventstream")]
pub mod event_stream;

pub mod middleware;
pub mod signer;
+3 −1
Original line number Diff line number Diff line
@@ -24,8 +24,10 @@ impl Signature {
    pub fn new(signature: String) -> Self {
        Self(signature)
    }
}

    pub fn as_str(&self) -> &str {
impl AsRef<str> for Signature {
    fn as_ref(&self) -> &str {
        &self.0
    }
}
Loading