Unverified Commit 5c0e2941 authored by John DiSanti's avatar John DiSanti Committed by GitHub
Browse files

Add full Event Stream marshalling support and working S3 example (#667)

* Add generic error support to event stream unmarshallers

* Validate content-type in event stream unmarshaller

* Implement header marshalling and test

* Introduce SharedPropertyBag

* Send empty end frame

* Add `try_recv_initial()` to Receiver

* Set up Event Stream approval list and shrink default service list

* Add support and example for S3 SelectObjectContent

* Improve Event Stream error ergonomics

* Update changelog

* Fix CI

* Split generic error parsing in `Protocol`

* Incorporate CR feedback

* Make raw message available when failng to unmarshall into event or error
parent dbe0dc0e
Loading
Loading
Loading
Loading
+2 −0
Original line number Diff line number Diff line
@@ -4,12 +4,14 @@ vNext (Month Day, Year)
- `ProvideRegion` is now asynchronous. Code that called `provider.region()` must be changed to `provider.region().await`.
- `<awsservice>::Config::from_env()` is now also asynchronous because it must load a region
- `<awsservice>::Config::builder()` will **not** load a default region unspecified. A region must be specified directly.
- `Request` and `Response` in `smithy_http::operation` now use `SharedPropertyBag` instead of `Arc<Mutex<PropertyBag>>`. Use the `acquire` and `acquire_mut` methods to get a reference to the underlying `PropertyBag` to access properties. (#667)

**New this week**

- (When complete) Add Event Stream support (#653, #xyz)
- (When complete) Add profile file provider for region (#594, #xyz)
- Improve documentation on collection-aware builders (#664)
- Add support for Transcribe `StartStreamTranscription` and S3 `SelectObjectContent` operations (#667)


v0.21 (August 19th, 2021)
+7 −6
Original line number Diff line number Diff line
@@ -3,6 +3,8 @@
 * SPDX-License-Identifier: Apache-2.0.
 */

use http::{HeaderMap, HeaderValue};

const EXTENDED_REQUEST_ID: &str = "s3_extended_request_id";

pub trait ErrorExt {
@@ -15,13 +17,12 @@ impl ErrorExt for smithy_types::Error {
    }
}

pub fn parse_extended_error<B>(
pub fn parse_extended_error(
    error: smithy_types::Error,
    response: &http::Response<B>,
    headers: &HeaderMap<HeaderValue>,
) -> smithy_types::Error {
    let mut builder = error.into_builder();
    let host_id = response
        .headers()
    let host_id = headers
        .get("x-amz-id-2")
        .and_then(|header_value| header_value.to_str().ok());
    if let Some(host_id) = host_id {
@@ -49,7 +50,7 @@ mod test {
            .request_id("456")
            .build();

        let error = parse_extended_error(error, &resp);
        let error = parse_extended_error(error, resp.headers());
        assert_eq!(
            error
                .extended_request_id()
@@ -66,7 +67,7 @@ mod test {
            .request_id("456")
            .build();

        let error = parse_extended_error(error, &resp);
        let error = parse_extended_error(error, resp.headers());
        assert_eq!(error.extended_request_id(), None);
    }
}
+42 −36
Original line number Diff line number Diff line
@@ -5,49 +5,40 @@

use crate::middleware::Signature;
use aws_auth::Credentials;
use aws_sigv4::event_stream::sign_message;
use aws_sigv4::event_stream::{sign_empty_message, sign_message};
use aws_sigv4::SigningParams;
use aws_types::region::SigningRegion;
use aws_types::SigningService;
use smithy_eventstream::frame::{Message, SignMessage, SignMessageError};
use smithy_http::property_bag::PropertyBag;
use std::sync::{Arc, Mutex, MutexGuard};
use smithy_http::property_bag::{PropertyBag, SharedPropertyBag};
use std::time::SystemTime;

/// Event Stream SigV4 signing implementation.
#[derive(Debug)]
pub struct SigV4Signer {
    properties: Arc<Mutex<PropertyBag>>,
    properties: SharedPropertyBag,
    last_signature: Option<String>,
}

impl SigV4Signer {
    pub fn new(properties: Arc<Mutex<PropertyBag>>) -> Self {
    pub fn new(properties: SharedPropertyBag) -> Self {
        Self {
            properties,
            last_signature: None,
        }
    }
}

impl SignMessage for SigV4Signer {
    fn sign(&mut self, message: Message) -> Result<Message, SignMessageError> {
        let properties = PropertyAccessor(self.properties.lock().unwrap());
        if self.last_signature.is_none() {
            // The Signature property should exist in the property bag for all Event Stream requests.
            self.last_signature = Some(properties.expect::<Signature>().as_ref().into())
        }

    fn signing_params(properties: &PropertyBag) -> SigningParams<()> {
        // Every single one of these values would have been retrieved during the initial request,
        // so we can safely assume they all exist in the property bag at this point.
        let credentials = properties.expect::<Credentials>();
        let region = properties.expect::<SigningRegion>();
        let signing_service = properties.expect::<SigningService>();
        let credentials = properties.get::<Credentials>().unwrap();
        let region = properties.get::<SigningRegion>().unwrap();
        let signing_service = properties.get::<SigningService>().unwrap();
        let time = properties
            .get::<SystemTime>()
            .copied()
            .unwrap_or_else(SystemTime::now);
        let params = SigningParams {
        SigningParams {
            access_key: credentials.access_key_id(),
            secret_key: credentials.secret_access_key(),
            security_token: credentials.session_token(),
@@ -55,28 +46,44 @@ impl SignMessage for SigV4Signer {
            service_name: signing_service.as_ref(),
            date_time: time.into(),
            settings: (),
        };

        let (signed_message, signature) =
            sign_message(&message, self.last_signature.as_ref().unwrap(), &params).into_parts();
        self.last_signature = Some(signature);

        Ok(signed_message)
        }
    }
}

// TODO(EventStream): Make a new type around `Arc<Mutex<PropertyBag>>` called `SharedPropertyBag`
// and abstract the mutex away entirely.
struct PropertyAccessor<'a>(MutexGuard<'a, PropertyBag>);
impl SignMessage for SigV4Signer {
    fn sign(&mut self, message: Message) -> Result<Message, SignMessageError> {
        let properties = self.properties.acquire();
        if self.last_signature.is_none() {
            // The Signature property should exist in the property bag for all Event Stream requests.
            self.last_signature = Some(
                properties
                    .get::<Signature>()
                    .expect("property bag contains initial Signature")
                    .as_ref()
                    .into(),
            )
        }

impl<'a> PropertyAccessor<'a> {
    fn get<T: Send + Sync + 'static>(&self) -> Option<&T> {
        self.0.get::<T>()
        let (signed_message, signature) = {
            let params = Self::signing_params(&properties);
            sign_message(&message, self.last_signature.as_ref().unwrap(), &params).into_parts()
        };
        self.last_signature = Some(signature);
        Ok(signed_message)
    }

    fn expect<T: Send + Sync + 'static>(&self) -> &T {
        self.get::<T>()
            .expect("property should have been inserted into property bag via middleware")
    fn sign_empty(&mut self) -> Result<Message, SignMessageError> {
        let properties = self.properties.acquire();
        if self.last_signature.is_none() {
            // The Signature property should exist in the property bag for all Event Stream requests.
            self.last_signature = Some(properties.get::<Signature>().unwrap().as_ref().into())
        }
        let (signed_message, signature) = {
            let params = Self::signing_params(&properties);
            sign_empty_message(self.last_signature.as_ref().unwrap(), &params).into_parts()
        };
        self.last_signature = Some(signature);
        Ok(signed_message)
    }
}

@@ -90,7 +97,6 @@ mod tests {
    use aws_types::SigningService;
    use smithy_eventstream::frame::{HeaderValue, Message, SignMessage};
    use smithy_http::property_bag::PropertyBag;
    use std::sync::{Arc, Mutex};
    use std::time::{Duration, UNIX_EPOCH};

    #[test]
@@ -104,7 +110,7 @@ mod tests {
        properties.insert(SigningRegion::from(region));
        properties.insert(Signature::new("initial-signature".into()));

        let mut signer = SigV4Signer::new(Arc::new(Mutex::new(properties)));
        let mut signer = SigV4Signer::new(properties.into());
        let mut signatures = Vec::new();
        for _ in 0..5 {
            let signed = signer
+2 −1
Original line number Diff line number Diff line
@@ -9,10 +9,11 @@ description = "AWS SigV4 signer"

[features]
sign-http = ["http", "http-body", "percent-encoding", "form_urlencoded"]
sign-eventstream = ["smithy-eventstream"]
sign-eventstream = ["smithy-eventstream", "bytes"]
default = ["sign-http"]

[dependencies]
bytes = { version = "1", optional = true }
chrono = { version = "0.4", default-features = false, features = ["clock", "std"] }
form_urlencoded = { version = "1.0", optional = true }
hex = "0.4"
+28 −8
Original line number Diff line number Diff line
@@ -8,6 +8,7 @@
use crate::date_fmt::{format_date, format_date_time};
use crate::sign::{calculate_signature, generate_signing_key, sha256_hex_string};
use crate::SigningOutput;
use bytes::Bytes;
use chrono::{DateTime, SubsecRound, Utc};
use smithy_eventstream::frame::{write_headers_to, Header, HeaderValue, Message};
use std::io::Write;
@@ -54,6 +55,26 @@ pub fn sign_message<'a>(
    message: &'a Message,
    last_signature: &'a str,
    params: &'a SigningParams<'a>,
) -> SigningOutput<Message> {
    let message_payload = {
        let mut payload = Vec::new();
        message.write_to(&mut payload).unwrap();
        payload
    };
    sign_payload(Some(message_payload), last_signature, params)
}

pub fn sign_empty_message<'a>(
    last_signature: &'a str,
    params: &'a SigningParams<'a>,
) -> SigningOutput<Message> {
    sign_payload(None, last_signature, params)
}

fn sign_payload<'a>(
    message_payload: Option<Vec<u8>>,
    last_signature: &'a str,
    params: &'a SigningParams<'a>,
) -> SigningOutput<Message> {
    // Truncate the sub-seconds up front since the timestamp written to the signed message header
    // needs to exactly match the string formatted timestamp, which doesn't include sub-seconds.
@@ -65,18 +86,17 @@ pub fn sign_message<'a>(
        params.region,
        params.service_name,
    );
    let message_payload = {
        let mut payload = Vec::new();
        message.write_to(&mut payload).unwrap();
        payload
    };
    let string_to_sign =
        calculate_string_to_sign(&message_payload, last_signature, &date_time, params);
    let string_to_sign = calculate_string_to_sign(
        message_payload.as_ref().map(|v| &v[..]).unwrap_or(&[]),
        last_signature,
        &date_time,
        params,
    );
    let signature = calculate_signature(signing_key, &string_to_sign);

    // Generate the signed wrapper event frame
    SigningOutput::new(
        Message::new(message_payload)
        Message::new(message_payload.map(Bytes::from).unwrap_or_else(Bytes::new))
            .add_header(Header::new(
                ":chunk-signature",
                HeaderValue::ByteArray(hex::decode(&signature).unwrap().into()),
Loading