Unverified Commit 723d2b18 authored by Nugine's avatar Nugine
Browse files

s3s-aws: proxy: transform_body

parent 96a97fcb
Loading
Loading
Loading
Loading
+3 −1
Original line number Diff line number Diff line
@@ -43,7 +43,9 @@ pub fn codegen(ops: &Operations, rust_types: &RustTypes, g: &mut Codegen) {
                    s => s,
                };

                if field.option_type {
                if field.type_ == "StreamingBlob" {
                    g.ln(f!("b = b.set_{aws_field_name}(Some(transform_body(input.{s3s_field_name}).await));"));
                } else if field.option_type {
                    g.ln(f!("b = b.set_{aws_field_name}(try_into_aws(input.{s3s_field_name})?);"));
                } else {
                    g.ln(f!("b = b.set_{aws_field_name}(Some(try_into_aws(input.{s3s_field_name})?));"));
+1 −0
Original line number Diff line number Diff line
@@ -15,6 +15,7 @@ aws-sdk-s3 = "0.23.0"
aws-smithy-http = "0.53.1"
aws-smithy-types = "0.53.1"
aws-smithy-types-convert = { version = "0.53.1", features = ["convert-time"] }
bytes = "1.3.0"
futures = { version = "0.3.25", default-features = false, features = ["std"] }
hyper = "0.14.23"
s3s = { version = "0.2.0-dev", path = "../s3s" }
+1 −1
Original line number Diff line number Diff line
@@ -158,7 +158,7 @@ impl AwsConversion for s3s::dto::StreamingBlob {
    }

    fn try_into_aws(x: Self) -> Result<Self::Target, Self::Error> {
        Ok(hyper::Body::wrap_stream(x).into())
        Ok(hyper::Body::wrap_stream(x.0).into())
    }
}

+3 −3
Original line number Diff line number Diff line
@@ -1797,7 +1797,7 @@ impl S3 for Proxy {
        debug!(?input);
        let mut b = self.0.put_object();
        b = b.set_acl(try_into_aws(input.acl)?);
        b = b.set_body(try_into_aws(input.body)?);
        b = b.set_body(Some(transform_body(input.body).await));
        b = b.set_bucket(Some(try_into_aws(input.bucket)?));
        b = b.set_bucket_key_enabled(Some(try_into_aws(input.bucket_key_enabled)?));
        b = b.set_cache_control(try_into_aws(input.cache_control)?);
@@ -2023,7 +2023,7 @@ impl S3 for Proxy {
    async fn upload_part(&self, input: s3s::dto::UploadPartInput) -> S3Result<s3s::dto::UploadPartOutput> {
        debug!(?input);
        let mut b = self.0.upload_part();
        b = b.set_body(try_into_aws(input.body)?);
        b = b.set_body(Some(transform_body(input.body).await));
        b = b.set_bucket(Some(try_into_aws(input.bucket)?));
        b = b.set_checksum_algorithm(try_into_aws(input.checksum_algorithm)?);
        b = b.set_checksum_crc32(try_into_aws(input.checksum_crc32)?);
@@ -2093,7 +2093,7 @@ impl S3 for Proxy {
        debug!(?input);
        let mut b = self.0.write_get_object_response();
        b = b.set_accept_ranges(try_into_aws(input.accept_ranges)?);
        b = b.set_body(try_into_aws(input.body)?);
        b = b.set_body(Some(transform_body(input.body).await));
        b = b.set_bucket_key_enabled(Some(try_into_aws(input.bucket_key_enabled)?));
        b = b.set_cache_control(try_into_aws(input.cache_control)?);
        b = b.set_checksum_crc32(try_into_aws(input.checksum_crc32)?);
+45 −5
Original line number Diff line number Diff line
@@ -27,12 +27,52 @@ macro_rules! wrap_sdk_error {

mod generated;

use aws_sdk_s3::Client;
pub struct Proxy(aws_sdk_s3::Client);

pub struct Proxy(Client);
impl From<aws_sdk_s3::Client> for Proxy {
    fn from(value: aws_sdk_s3::Client) -> Self {
        Self(value)
    }
}

async fn transform_body(body: Option<s3s::dto::StreamingBlob>) -> aws_sdk_s3::types::ByteStream {
    use aws_smithy_http::body::SdkBody;
    use futures::Stream;

    const AGGREGATION_THRESHOLD: usize = 8 * 1024;

    match body {
        None => SdkBody::empty().into(),
        Some(stream) => {
            let can_aggregate = match stream.size_hint() {
                (_, Some(upper)) => upper <= AGGREGATION_THRESHOLD,
                _ => false,
            };
            if can_aggregate {
                return aggregate(stream).await.into();
            }
            SdkBody::from(hyper::Body::wrap_stream(stream.0)).into()
        }
    }
}

impl From<Client> for Proxy {
    fn from(val: Client) -> Self {
        Self(val)
async fn aggregate(stream: s3s::dto::StreamingBlob) -> aws_smithy_http::body::SdkBody {
    use aws_smithy_http::body::SdkBody;
    use bytes::BufMut;
    use futures::TryStreamExt;
    use std::future::ready;

    let result: Result<Vec<bytes::Bytes>, _> = stream.try_collect().await;
    match result {
        Ok(buf) => {
            let mut vec: Vec<u8> = Vec::with_capacity(buf.iter().map(|b| b.len()).sum());
            buf.into_iter().for_each(|bytes| vec.put(bytes));
            tracing::debug!(len=?vec.len(), "aggregated body");
            SdkBody::from(vec)
        }
        Err(err) => {
            let stream = futures::stream::once(ready(<Result<bytes::Bytes, _>>::Err(err)));
            SdkBody::from(hyper::Body::wrap_stream(stream))
        }
    }
}