Unverified Commit d42727e3 authored by Caymon Sullivan's avatar Caymon Sullivan Committed by GitHub
Browse files

feat(aws-smithy-types): Impl http-body-1.0 Body for SdkBody (#3380)

## Motivation and Context
A step towards moving to http-1.0
https://github.com/awslabs/aws-sdk-rust/issues/1046



(Russell): This is a minimal implementation of `http-body = 1`. It isn't
maximally efficient since even if we were given a 1x body, we convert it
back and forth first. This is a first step.

## Description
Implements http-body-1.0 Body trait for SdkBody.

## Testing
Regular CI

## Checklist
<!--- If a checkbox below is not applicable, then please DELETE it
rather than leaving it unchecked -->
- [x] I have updated `CHANGELOG.next.toml` if I made changes to the
smithy-rs codegen or runtime crates
- [x] I have updated `CHANGELOG.next.toml` if I made changes to the AWS
SDK, generated SDK code, or SDK runtime crates

----

_By submitting this pull request, I confirm that you can use, modify,
copy, and redistribute this contribution, under the terms of your
choice._

---------

Co-authored-by: default avatarRussell Cohen <rcoh@amazon.com>
Co-authored-by: default avatarRussell Cohen <russell.r.cohen@gmail.com>
Co-authored-by: default avatarJohn DiSanti <john@vinylsquid.com>
Co-authored-by: default avatarJohn DiSanti <jdisanti@amazon.com>
parent d95cc864
Loading
Loading
Loading
Loading
+7 −1
Original line number Diff line number Diff line
@@ -17,7 +17,7 @@ references = ["smithy-rs#3416"]
meta = { "breaking" = false, "bug" = false, "tada" = true }
author = "jackkleeman"

[[aws-sdk-rust]]
[[smithy-rs]]
message = "Added aws-smithy-wasm crate to enable SDK use in WASI compliant environments"
references = ["smithy-rs#2087", "smithy-rs#2520", "smithy-rs#3409", "aws-sdk-rust#59"]
meta = { "breaking" = false, "tada" = true, "bug" = false }
@@ -28,3 +28,9 @@ message = "Added aws-smithy-wasm crate to enable SDK use in WASI compliant envir
references = ["smithy-rs#2087", "smithy-rs#2520", "smithy-rs#3409"]
meta = { "breaking" = false, "tada" = true, "bug" = false, "target" = "client"}
author = "landonxjames"

[[smithy-rs]]
message = "[`SdkBody`](https://docs.rs/aws-smithy-types/latest/aws_smithy_types/body/struct.SdkBody.html) now implements the 1.0 version of the `http_body::Body` trait."
references = ["smithy-rs#3365", "aws-sdk-rust#1046"]
meta = { "breaking" = false, "tada" = true, "bug" = false, "target" = "all" }
author = "cayman-amzn"
 No newline at end of file
+2 −1
Original line number Diff line number Diff line
@@ -13,7 +13,7 @@ repository = "https://github.com/smithy-lang/smithy-rs"
[features]
byte-stream-poll-next = []
http-body-0-4-x = ["dep:http-body-0-4"]
http-body-1-x = ["dep:http-body-1-0", "dep:http-body-util", "dep:http-body-0-4", "dep:http-1x"]
http-body-1-x = ["dep:http-body-1-0", "dep:http-body-util", "dep:http-body-0-4", "dep:http-1x", "dep:hyper-1-0"]
hyper-0-14-x = ["dep:hyper-0-14"]
rt-tokio = [
    "dep:http-body-0-4",
@@ -38,6 +38,7 @@ http-body-0-4 = { package = "http-body", version = "0.4.4", optional = true }
http-body-1-0 = { package = "http-body", version = "1", optional = true }
http-body-util = { version = "0.1.0", optional = true }
hyper-0-14 = { package = "hyper", version = "0.14.26", optional = true }
hyper-1-0 = { package = "hyper", version = "1", optional = true }
itoa = "1.0.0"
num-integer = "0.1.44"
pin-project-lite = "0.2.9"
+2 −4
Original line number Diff line number Diff line
@@ -32,8 +32,6 @@ pin_project! {
    /// For handling responses, the type of the body will be controlled
    /// by the HTTP stack.
    ///
    // TODO(naming): Consider renaming to simply `Body`, although I'm concerned about naming headaches
    // between hyper::Body and our Body
    pub struct SdkBody {
        #[pin]
        inner: Inner,
@@ -191,10 +189,10 @@ impl SdkBody {
        }
    }

    #[cfg(feature = "http-body-0-4-x")]
    #[cfg(any(feature = "http-body-0-4-x", feature = "http-body-1-x",))]
    pub(crate) fn poll_next_trailers(
        self: Pin<&mut Self>,
        #[allow(unused)] cx: &mut Context<'_>,
        cx: &mut Context<'_>,
    ) -> Poll<Result<Option<http::HeaderMap<http::HeaderValue>>, Error>> {
        let this = self.project();
        match this.inner.project() {
+119 −5
Original line number Diff line number Diff line
@@ -23,6 +23,53 @@ impl SdkBody {
    {
        SdkBody::from_body_0_4_internal(Http1toHttp04::new(body.map_err(Into::into)))
    }

    pub(crate) fn poll_data_frame(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<http_body_1_0::Frame<Bytes>, Error>>> {
        match ready!(self.as_mut().poll_next(cx)) {
            // if there's no more data, try to return trailers
            None => match ready!(self.poll_next_trailers(cx)) {
                Ok(Some(trailers)) => Poll::Ready(Some(Ok(http_body_1_0::Frame::trailers(
                    convert_headers_0x_1x(trailers),
                )))),
                Ok(None) => Poll::Ready(None),
                Err(e) => Poll::Ready(Some(Err(e))),
            },
            Some(result) => match result {
                Err(err) => Poll::Ready(Some(Err(err))),
                Ok(bytes) => Poll::Ready(Some(Ok(http_body_1_0::Frame::data(bytes)))),
            },
        }
    }
}

#[cfg(feature = "http-body-1-x")]
impl http_body_1_0::Body for SdkBody {
    type Data = Bytes;
    type Error = Error;

    fn poll_frame(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<http_body_1_0::Frame<Self::Data>, Self::Error>>> {
        self.poll_data_frame(cx)
    }

    fn is_end_stream(&self) -> bool {
        self.is_end_stream()
    }

    fn size_hint(&self) -> http_body_1_0::SizeHint {
        let mut hint = http_body_1_0::SizeHint::default();
        let (lower, upper) = self.bounds_on_remaining_length();
        hint.set_lower(lower);
        if let Some(upper) = upper {
            hint.set_upper(upper);
        }
        hint
    }
}

pin_project! {
@@ -83,7 +130,7 @@ where
        // already read everything
        let this = self.project();
        match this.trailers.take() {
            Some(headers) => Poll::Ready(Ok(Some(convert_header_map(headers)))),
            Some(headers) => Poll::Ready(Ok(Some(convert_headers_1x_0x(headers)))),
            None => Poll::Ready(Ok(None)),
        }
    }
@@ -107,7 +154,7 @@ where
    }
}

fn convert_header_map(input: http_1x::HeaderMap) -> http::HeaderMap {
fn convert_headers_1x_0x(input: http_1x::HeaderMap) -> http::HeaderMap {
    let mut map = http::HeaderMap::with_capacity(input.capacity());
    let mut mem: Option<http_1x::HeaderName> = None;
    for (k, v) in input.into_iter() {
@@ -121,6 +168,20 @@ fn convert_header_map(input: http_1x::HeaderMap) -> http::HeaderMap {
    map
}

fn convert_headers_0x_1x(input: http::HeaderMap) -> http_1x::HeaderMap {
    let mut map = http_1x::HeaderMap::with_capacity(input.capacity());
    let mut mem: Option<http::HeaderName> = None;
    for (k, v) in input.into_iter() {
        let name = k.or_else(|| mem.clone()).unwrap();
        map.append(
            http_1x::HeaderName::from_bytes(name.as_str().as_bytes()).expect("already validated"),
            http_1x::HeaderValue::from_bytes(v.as_bytes()).expect("already validated"),
        );
        mem = Some(name);
    }
    map
}

#[cfg(test)]
mod test {
    use std::collections::VecDeque;
@@ -132,8 +193,9 @@ mod test {
    use http_1x::header::{CONTENT_LENGTH as CL1, CONTENT_TYPE as CT1};
    use http_1x::{HeaderMap, HeaderName, HeaderValue};
    use http_body_1_0::Frame;
    use http_body_util::BodyExt;

    use crate::body::http_body_1_x::convert_header_map;
    use crate::body::http_body_1_x::{convert_headers_1x_0x, Http1toHttp04};
    use crate::body::{Error, SdkBody};
    use crate::byte_stream::ByteStream;

@@ -215,10 +277,46 @@ mod test {
        while let Some(_data) = http_body_0_4::Body::data(&mut body).await {}
        assert_eq!(
            http_body_0_4::Body::trailers(&mut body).await.unwrap(),
            Some(convert_header_map(trailers()))
            Some(convert_headers_1x_0x(trailers()))
        );
    }

    #[tokio::test]
    async fn test_read_trailers_as_1x() {
        let body = TestBody {
            chunks: vec![
                Chunk::Data("123"),
                Chunk::Data("456"),
                Chunk::Data("789"),
                Chunk::Trailers(trailers()),
            ]
            .into(),
        };
        let body = SdkBody::from_body_1_x(body);

        let collected = BodyExt::collect(body).await.expect("should succeed");
        assert_eq!(collected.trailers(), Some(&trailers()));
        assert_eq!(collected.to_bytes().as_ref(), b"123456789");
    }

    #[tokio::test]
    async fn test_trailers_04x_to_1x() {
        let body = TestBody {
            chunks: vec![
                Chunk::Data("123"),
                Chunk::Data("456"),
                Chunk::Data("789"),
                Chunk::Trailers(trailers()),
            ]
            .into(),
        };
        let body = SdkBody::from_body_0_4(Http1toHttp04::new(body));

        let collected = BodyExt::collect(body).await.expect("should succeed");
        assert_eq!(collected.trailers(), Some(&trailers()));
        assert_eq!(collected.to_bytes().as_ref(), b"123456789");
    }

    #[tokio::test]
    async fn test_errors() {
        let body = TestBody {
@@ -235,6 +333,7 @@ mod test {
        let body = ByteStream::new(body);
        body.collect().await.expect_err("body returned an error");
    }

    #[tokio::test]
    async fn test_no_trailers() {
        let body = TestBody {
@@ -262,6 +361,21 @@ mod test {

        expect.insert(CL0, http::HeaderValue::from_static("1234"));

        assert_eq!(convert_header_map(http1_headermap), expect);
        assert_eq!(convert_headers_1x_0x(http1_headermap), expect);
    }

    #[test]
    fn sdkbody_debug_dyn() {
        let body = TestBody {
            chunks: vec![
                Chunk::Data("123"),
                Chunk::Data("456"),
                Chunk::Data("789"),
                Chunk::Trailers(trailers()),
            ]
            .into(),
        };
        let body = SdkBody::from_body_1_x(body);
        assert!(format!("{:?}", body).contains("BoxBody"));
    }
}
+1 −1
Original line number Diff line number Diff line
@@ -248,7 +248,7 @@ pin_project! {
    /// 3. **From an `SdkBody` directly**: For more advanced / custom use cases, a ByteStream can be created directly
    /// from an SdkBody. **When created from an SdkBody, care must be taken to ensure retriability.** An SdkBody is retryable
    /// when constructed from in-memory data or when using [`SdkBody::retryable`](crate::body::SdkBody::retryable).
    ///     ```no_run
    ///     ```ignore
    ///     # use hyper_0_14 as hyper;
    ///     use aws_smithy_types::byte_stream::ByteStream;
    ///     use aws_smithy_types::body::SdkBody;