Unverified Commit 7cf97ceb authored by Nugine's avatar Nugine
Browse files

s3s-aws: proxy: remove transform_body

parent 9411d3e7
Loading
Loading
Loading
Loading
+1 −3
Original line number Diff line number Diff line
@@ -43,9 +43,7 @@ pub fn codegen(ops: &Operations, rust_types: &RustTypes, g: &mut Codegen) {
                    s => s,
                };

                if field.type_ == "StreamingBlob" {
                    g.ln(f!("b = b.set_{aws_field_name}(Some(transform_body(input.{s3s_field_name}).await));"));
                } else if field.option_type {
                if field.option_type {
                    g.ln(f!("b = b.set_{aws_field_name}(try_into_aws(input.{s3s_field_name})?);"));
                } else {
                    g.ln(f!("b = b.set_{aws_field_name}(Some(try_into_aws(input.{s3s_field_name})?));"));
+3 −3
Original line number Diff line number Diff line
@@ -1797,7 +1797,7 @@ impl S3 for Proxy {
        debug!(?input);
        let mut b = self.0.put_object();
        b = b.set_acl(try_into_aws(input.acl)?);
        b = b.set_body(Some(transform_body(input.body).await));
        b = b.set_body(try_into_aws(input.body)?);
        b = b.set_bucket(Some(try_into_aws(input.bucket)?));
        b = b.set_bucket_key_enabled(Some(try_into_aws(input.bucket_key_enabled)?));
        b = b.set_cache_control(try_into_aws(input.cache_control)?);
@@ -2023,7 +2023,7 @@ impl S3 for Proxy {
    async fn upload_part(&self, input: s3s::dto::UploadPartInput) -> S3Result<s3s::dto::UploadPartOutput> {
        debug!(?input);
        let mut b = self.0.upload_part();
        b = b.set_body(Some(transform_body(input.body).await));
        b = b.set_body(try_into_aws(input.body)?);
        b = b.set_bucket(Some(try_into_aws(input.bucket)?));
        b = b.set_checksum_algorithm(try_into_aws(input.checksum_algorithm)?);
        b = b.set_checksum_crc32(try_into_aws(input.checksum_crc32)?);
@@ -2093,7 +2093,7 @@ impl S3 for Proxy {
        debug!(?input);
        let mut b = self.0.write_get_object_response();
        b = b.set_accept_ranges(try_into_aws(input.accept_ranges)?);
        b = b.set_body(Some(transform_body(input.body).await));
        b = b.set_body(try_into_aws(input.body)?);
        b = b.set_bucket_key_enabled(Some(try_into_aws(input.bucket_key_enabled)?));
        b = b.set_cache_control(try_into_aws(input.cache_control)?);
        b = b.set_checksum_crc32(try_into_aws(input.checksum_crc32)?);
+0 −42
Original line number Diff line number Diff line
@@ -35,45 +35,3 @@ impl From<aws_sdk_s3::Client> for Proxy {
        Self(value)
    }
}

async fn transform_body(body: Option<s3s::dto::StreamingBlob>) -> aws_sdk_s3::types::ByteStream {
    use aws_smithy_http::body::SdkBody;
    use futures::Stream;

    const AGGREGATION_THRESHOLD: usize = 8 * 1024;

    match body {
        None => SdkBody::empty().into(),
        Some(stream) => {
            let can_aggregate = match stream.size_hint() {
                (_, Some(upper)) => upper <= AGGREGATION_THRESHOLD,
                _ => false,
            };
            if can_aggregate {
                return aggregate(stream).await.into();
            }
            SdkBody::from(hyper::Body::wrap_stream(stream.0)).into()
        }
    }
}

async fn aggregate(stream: s3s::dto::StreamingBlob) -> aws_smithy_http::body::SdkBody {
    use aws_smithy_http::body::SdkBody;
    use bytes::BufMut;
    use futures::TryStreamExt;
    use std::future::ready;

    let result: Result<Vec<bytes::Bytes>, _> = stream.try_collect().await;
    match result {
        Ok(buf) => {
            let mut vec: Vec<u8> = Vec::with_capacity(buf.iter().map(|b| b.len()).sum());
            buf.into_iter().for_each(|bytes| vec.put(bytes));
            tracing::debug!(len=?vec.len(), "aggregated body");
            SdkBody::from(vec)
        }
        Err(err) => {
            let stream = futures::stream::once(ready(<Result<bytes::Bytes, _>>::Err(err)));
            SdkBody::from(hyper::Body::wrap_stream(stream))
        }
    }
}