Unverified Commit ff308686 authored by Nugine's avatar Nugine
Browse files

s3s: refactor streams

parent a71189ad
Loading
Loading
Loading
Loading
+32 −0
Original line number Diff line number Diff line
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;

use aws_smithy_http::body::SdkBody;

use futures::Stream;
use hyper::body::HttpBody;

pub fn s3s_body_into_sdk_body(body: s3s::Body) -> SdkBody {
    SdkBody::from_dyn(body.boxed())
}

pub fn sdk_body_into_s3s_body(body: SdkBody) -> s3s::Body {
    s3s::Body::from(Box::pin(Wrapper(body)) as s3s::stream::DynByteStream)
}

struct Wrapper(SdkBody);

impl Stream for Wrapper {
    type Item = Result<bytes::Bytes, s3s::StdError>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        hyper::body::HttpBody::poll_data(Pin::new(&mut self.0), cx)
    }
}

impl s3s::stream::ByteStream for Wrapper {
    fn remaining_length(&self) -> s3s::stream::RemainingLength {
        hyper::body::HttpBody::size_hint(&self.0).into()
    }
}
+6 −6
Original line number Diff line number Diff line
@@ -43,23 +43,23 @@ impl Service<Request<SdkBody>> for Connector {

    fn call(&mut self, req: Request<SdkBody>) -> Self::Future {
        let req = convert_input(req);
        let mut service = self.0.clone();
        Box::pin(async move { convert_output(service.call(req).await) })
        let service = self.0.clone();
        Box::pin(async move { convert_output(service.as_ref().call(req).await) })
    }
}

fn convert_input(mut req: Request<SdkBody>) -> Request<hyper::Body> {
fn convert_input(mut req: Request<SdkBody>) -> Request<s3s::Body> {
    if req.headers().contains_key(HOST).not() {
        let host = auto_host_header(req.uri());
        req.headers_mut().insert(HOST, host);
    }

    req.map(|sdk_body| hyper::Body::wrap_stream(ByteStream::from(sdk_body)))
    req.map(|sdk_body| s3s::Body::from(hyper::Body::wrap_stream(ByteStream::from(sdk_body))))
}

fn convert_output(result: S3Result<Response<hyper::Body>>) -> Result<Response<SdkBody>, ConnectorError> {
fn convert_output(result: S3Result<Response<s3s::Body>>) -> Result<Response<SdkBody>, ConnectorError> {
    match result {
        Ok(res) => Ok(res.map(SdkBody::from)),
        Ok(res) => Ok(res.map(|s3s_body| SdkBody::from(hyper::Body::from(s3s_body)))),
        Err(e) => Err(on_err(e)),
    }
}
+151 −0
Original line number Diff line number Diff line
use super::*;

use crate::body::s3s_body_into_sdk_body;
use crate::body::sdk_body_into_s3s_body;

use std::collections::HashMap;
use std::convert::Infallible;

macro_rules! identity_impl {
    ($($ty:ty),+) => {
        $(
            impl AwsConversion for $ty {
                type Target = $ty;
                type Error = Infallible;

                #[inline(always)]
                fn try_from_aws(x: Self::Target) -> Result<Self, Self::Error> {
                    Ok(x)
                }

                #[inline(always)]
                fn try_into_aws(x: Self) -> Result<Self::Target, Self::Error> {
                    Ok(x)
                }
            }
        )+
    };
}

identity_impl!(bool, i32, i64, String, HashMap<String, String>);

impl<T: AwsConversion> AwsConversion for Option<T> {
    type Target = Option<T::Target>;
    type Error = T::Error;

    fn try_from_aws(x: Self::Target) -> Result<Self, Self::Error> {
        x.map(try_from_aws).transpose()
    }

    fn try_into_aws(x: Self) -> Result<Self::Target, Self::Error> {
        x.map(try_into_aws).transpose()
    }
}

impl<T: AwsConversion> AwsConversion for Vec<T> {
    type Target = Vec<T::Target>;
    type Error = T::Error;

    fn try_from_aws(x: Self::Target) -> Result<Self, Self::Error> {
        x.into_iter().map(try_from_aws).collect()
    }

    fn try_into_aws(x: Self) -> Result<Self::Target, Self::Error> {
        x.into_iter().map(try_into_aws).collect()
    }
}

impl AwsConversion for s3s::dto::Timestamp {
    type Target = aws_sdk_s3::types::DateTime;
    type Error = S3Error;

    fn try_from_aws(x: Self::Target) -> S3Result<Self> {
        use aws_smithy_types_convert::date_time::DateTimeExt;
        Ok(Self::from(x.to_time().map_err(S3Error::internal_error)?))
    }

    fn try_into_aws(x: Self) -> S3Result<Self::Target> {
        use aws_smithy_types_convert::date_time::DateTimeExt;
        Ok(aws_sdk_s3::types::DateTime::from_time(x.into()))
    }
}

impl AwsConversion for s3s::dto::ContentType {
    type Target = String;
    type Error = S3Error;

    fn try_from_aws(x: Self::Target) -> S3Result<Self> {
        x.parse::<Self>().map_err(S3Error::internal_error)
    }

    fn try_into_aws(x: Self) -> S3Result<Self::Target> {
        Ok(x.to_string())
    }
}

impl AwsConversion for s3s::dto::CopySource {
    type Target = String;
    type Error = S3Error;

    fn try_from_aws(x: Self::Target) -> S3Result<Self> {
        Self::parse(x.as_str()).map_err(S3Error::internal_error)
    }

    fn try_into_aws(x: Self) -> S3Result<Self::Target> {
        Ok(x.format_to_string())
    }
}

impl AwsConversion for s3s::dto::Range {
    type Target = String;
    type Error = S3Error;

    fn try_from_aws(x: Self::Target) -> S3Result<Self> {
        Self::parse(x.as_str()).map_err(S3Error::internal_error)
    }

    fn try_into_aws(x: Self) -> S3Result<Self::Target> {
        Ok(x.format_to_string())
    }
}

impl AwsConversion for s3s::dto::Event {
    type Target = aws_sdk_s3::model::Event;
    type Error = Infallible;

    fn try_from_aws(x: Self::Target) -> Result<Self, Self::Error> {
        Ok(Self::from(x.as_str().to_owned()))
    }

    fn try_into_aws(x: Self) -> Result<Self::Target, Self::Error> {
        Ok(Self::Target::from(x))
    }
}

impl AwsConversion for s3s::dto::StreamingBlob {
    type Target = aws_sdk_s3::types::ByteStream;
    type Error = Infallible;

    fn try_from_aws(x: Self::Target) -> Result<Self, Self::Error> {
        // ByteStream -> SdkBody -> s3s::Body -> StreamingBlob
        Ok(sdk_body_into_s3s_body(x.into_inner()).into())
    }

    fn try_into_aws(x: Self) -> Result<Self::Target, Self::Error> {
        // StreamingBlob -> s3s::Body -> SdkBody -> ByteStream
        Ok(s3s_body_into_sdk_body(x.into()).into())
    }
}

impl AwsConversion for s3s::dto::Body {
    type Target = aws_sdk_s3::types::Blob;
    type Error = Infallible;

    fn try_from_aws(x: Self::Target) -> Result<Self, Self::Error> {
        Ok(x.into_inner().into())
    }

    fn try_into_aws(x: Self) -> Result<Self::Target, Self::Error> {
        Ok(Self::Target::new(x))
    }
}
+2 −144
Original line number Diff line number Diff line
mod builtin;
mod generated;

use s3s::s3_error;
use s3s::{S3Error, S3Result};

use std::collections::HashMap;
use std::convert::Infallible;


pub trait AwsConversion: Sized {
    type Target;
@@ -32,145 +32,3 @@ where
        None => Err(s3_error!(InternalError, "missing field: {}", field_name)),
    }
}

macro_rules! identity_impl {
    ($($ty:ty),+) => {
        $(
            impl AwsConversion for $ty {
                type Target = $ty;
                type Error = Infallible;

                #[inline(always)]
                fn try_from_aws(x: Self::Target) -> Result<Self, Self::Error> {
                    Ok(x)
                }

                #[inline(always)]
                fn try_into_aws(x: Self) -> Result<Self::Target, Self::Error> {
                    Ok(x)
                }
            }
        )+
    };
}

identity_impl!(bool, i32, i64, String, HashMap<String, String>);

impl<T: AwsConversion> AwsConversion for Option<T> {
    type Target = Option<T::Target>;
    type Error = T::Error;

    fn try_from_aws(x: Self::Target) -> Result<Self, Self::Error> {
        x.map(try_from_aws).transpose()
    }

    fn try_into_aws(x: Self) -> Result<Self::Target, Self::Error> {
        x.map(try_into_aws).transpose()
    }
}

impl<T: AwsConversion> AwsConversion for Vec<T> {
    type Target = Vec<T::Target>;
    type Error = T::Error;

    fn try_from_aws(x: Self::Target) -> Result<Self, Self::Error> {
        x.into_iter().map(try_from_aws).collect()
    }

    fn try_into_aws(x: Self) -> Result<Self::Target, Self::Error> {
        x.into_iter().map(try_into_aws).collect()
    }
}

impl AwsConversion for s3s::dto::Timestamp {
    type Target = aws_sdk_s3::types::DateTime;
    type Error = S3Error;

    fn try_from_aws(x: Self::Target) -> S3Result<Self> {
        use aws_smithy_types_convert::date_time::DateTimeExt;
        Ok(Self::from(x.to_time().map_err(S3Error::internal_error)?))
    }

    fn try_into_aws(x: Self) -> S3Result<Self::Target> {
        use aws_smithy_types_convert::date_time::DateTimeExt;
        Ok(aws_sdk_s3::types::DateTime::from_time(x.into()))
    }
}

impl AwsConversion for s3s::dto::ContentType {
    type Target = String;
    type Error = S3Error;

    fn try_from_aws(x: Self::Target) -> S3Result<Self> {
        x.parse::<Self>().map_err(S3Error::internal_error)
    }

    fn try_into_aws(x: Self) -> S3Result<Self::Target> {
        Ok(x.to_string())
    }
}

impl AwsConversion for s3s::dto::CopySource {
    type Target = String;
    type Error = S3Error;

    fn try_from_aws(x: Self::Target) -> S3Result<Self> {
        Self::parse(x.as_str()).map_err(S3Error::internal_error)
    }

    fn try_into_aws(x: Self) -> S3Result<Self::Target> {
        Ok(x.format_to_string())
    }
}

impl AwsConversion for s3s::dto::Range {
    type Target = String;
    type Error = S3Error;

    fn try_from_aws(x: Self::Target) -> S3Result<Self> {
        Self::parse(x.as_str()).map_err(S3Error::internal_error)
    }

    fn try_into_aws(x: Self) -> S3Result<Self::Target> {
        Ok(x.format_to_string())
    }
}

impl AwsConversion for s3s::dto::Event {
    type Target = aws_sdk_s3::model::Event;
    type Error = Infallible;

    fn try_from_aws(x: Self::Target) -> Result<Self, Self::Error> {
        Ok(Self::from(x.as_str().to_owned()))
    }

    fn try_into_aws(x: Self) -> Result<Self::Target, Self::Error> {
        Ok(Self::Target::from(x))
    }
}

impl AwsConversion for s3s::dto::StreamingBlob {
    type Target = aws_sdk_s3::types::ByteStream;
    type Error = Infallible;

    fn try_from_aws(x: Self::Target) -> Result<Self, Self::Error> {
        Ok(Self::wrap(x))
    }

    fn try_into_aws(x: Self) -> Result<Self::Target, Self::Error> {
        Ok(hyper::Body::wrap_stream(x.0).into())
    }
}

impl AwsConversion for s3s::dto::Body {
    type Target = aws_sdk_s3::types::Blob;
    type Error = Infallible;

    fn try_from_aws(x: Self::Target) -> Result<Self, Self::Error> {
        Ok(x.into_inner().into())
    }

    fn try_into_aws(x: Self) -> Result<Self::Target, Self::Error> {
        Ok(Self::Target::new(x))
    }
}
+2 −0
Original line number Diff line number Diff line
@@ -5,6 +5,8 @@
    clippy::cargo, //
)]

mod body;

pub mod conv;

mod connector;
Loading