Unverified Commit aaaf97e9 authored by Russell Cohen's avatar Russell Cohen Committed by GitHub
Browse files

Sdkbody redux (#327)

* Use SdkBody instead of hyper::Body

As part of the path to supporting streaming request & response bodies, we need to remove the need to have a generic request/response body. This commit hides hyper::Body inside of SdkBody & also introduces a dynamic alternative to enable supporting any HTTP Body implementation.

* Delete ResponseBody & Update usages

Now that SDK body has a visible "Bytes" variant, ResponseBody is no longer needed as a bridge to expose debug information about responses.

* Rename _cx => cx
parent 25ea4a97
Loading
Loading
Loading
Loading
+2 −2
Original line number Diff line number Diff line
@@ -93,7 +93,7 @@ where
#[cfg(test)]
mod test {
    use crate::AwsErrorRetryPolicy;
    use smithy_http::middleware::ResponseBody;
    use smithy_http::body::SdkBody;
    use smithy_http::result::{SdkError, SdkSuccess};
    use smithy_http::retry::ClassifyResponse;
    use smithy_types::retry::{ErrorKind, ProvideErrorKind, RetryKind};
@@ -131,7 +131,7 @@ mod test {
    ) -> Result<SdkSuccess<()>, SdkError<E>> {
        Err(SdkError::ServiceError {
            err,
            raw: raw.map(|b| ResponseBody::from_static(b)),
            raw: raw.map(|b| SdkBody::from(b)),
        })
    }

+16 −11
Original line number Diff line number Diff line
@@ -8,7 +8,7 @@ use http::Request;
use hyper::client::ResponseFuture;
use hyper::Response;
use smithy_http::body::SdkBody;
use std::future::{Future, Ready};
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use tower::Service;
@@ -107,7 +107,7 @@ pub trait HttpService: Send + Sync {
    fn call(
        &mut self,
        req: http::Request<SdkBody>,
    ) -> Pin<Box<dyn Future<Output = Result<http::Response<hyper::Body>, BoxError>> + Send>>;
    ) -> Pin<Box<dyn Future<Output = Result<http::Response<SdkBody>, BoxError>> + Send>>;

    /// Return a Boxed-clone of this service
    ///
@@ -121,7 +121,7 @@ pub trait HttpService: Send + Sync {
/// This is to facilitate ease of use for people using `Standard::Dyn`
impl<S> HttpService for S
where
    S: Service<http::Request<SdkBody>, Response = http::Response<hyper::Body>>
    S: Service<http::Request<SdkBody>, Response = http::Response<SdkBody>>
        + Send
        + Sync
        + Clone
@@ -136,9 +136,13 @@ where
    fn call(
        &mut self,
        req: Request<SdkBody>,
    ) -> Pin<Box<dyn Future<Output = Result<Response<hyper::Body>, BoxError>> + Send>> {
    ) -> Pin<Box<dyn Future<Output = Result<Response<SdkBody>, BoxError>> + Send>> {
        let fut = Service::call(self, req);
        let fut = async move { fut.await.map_err(|err| err.into()) };
        let fut = async move {
            fut.await
                .map(|res| res.map(SdkBody::from))
                .map_err(|err| err.into())
        };
        Box::pin(fut)
    }

@@ -148,7 +152,7 @@ where
}

impl tower::Service<http::Request<SdkBody>> for Standard {
    type Response = http::Response<hyper::Body>;
    type Response = http::Response<SdkBody>;
    type Error = BoxError;
    type Future = StandardFuture;

@@ -181,17 +185,18 @@ impl tower::Service<http::Request<SdkBody>> for Standard {
#[pin_project::pin_project(project = FutProj)]
pub enum StandardFuture {
    Https(#[pin] ResponseFuture),
    TestConn(#[pin] Ready<Result<http::Response<hyper::Body>, BoxError>>),
    Dyn(#[pin] Pin<Box<dyn Future<Output = Result<http::Response<hyper::Body>, BoxError>> + Send>>),
    Dyn(#[pin] Pin<Box<dyn Future<Output = Result<http::Response<SdkBody>, BoxError>> + Send>>),
}

impl Future for StandardFuture {
    type Output = Result<http::Response<hyper::Body>, BoxError>;
    type Output = Result<http::Response<SdkBody>, BoxError>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match self.project() {
            FutProj::TestConn(ready_fut) => ready_fut.poll(cx),
            FutProj::Https(fut) => fut.poll(cx).map_err(|err| err.into()),
            FutProj::Https(fut) => fut
                .poll(cx)
                .map(|resp| resp.map(|res| res.map(SdkBody::from)))
                .map_err(|err| err.into()),
            FutProj::Dyn(dyn_fut) => dyn_fut.poll(cx),
        }
    }
+3 −6
Original line number Diff line number Diff line
@@ -89,10 +89,7 @@ impl Client<Standard> {

impl<S> Client<S>
where
    S: Service<http::Request<SdkBody>, Response = http::Response<hyper::Body>>
        + Send
        + Clone
        + 'static,
    S: Service<http::Request<SdkBody>, Response = http::Response<SdkBody>> + Send + Clone + 'static,
    S::Error: Into<BoxError> + Send + Sync + 'static,
    S::Future: Send + 'static,
{
@@ -102,7 +99,7 @@ where
    /// access the raw response use `call_raw`.
    pub async fn call<O, T, E, Retry>(&self, input: Operation<O, Retry>) -> Result<T, SdkError<E>>
    where
        O: ParseHttpResponse<hyper::Body, Output = Result<T, E>> + Send + Sync + Clone + 'static,
        O: ParseHttpResponse<SdkBody, Output = Result<T, E>> + Send + Sync + Clone + 'static,
        E: Error + ProvideErrorKind,
        Retry: ClassifyResponse<SdkSuccess<T>, SdkError<E>>,
    {
@@ -118,7 +115,7 @@ where
        input: Operation<O, Retry>,
    ) -> Result<SdkSuccess<R>, SdkError<E>>
    where
        O: ParseHttpResponse<hyper::Body, Output = Result<R, E>> + Send + Sync + Clone + 'static,
        O: ParseHttpResponse<SdkBody, Output = Result<R, E>> + Send + Sync + Clone + 'static,
        E: Error + ProvideErrorKind,
        Retry: ClassifyResponse<SdkSuccess<R>, SdkError<E>>,
    {
+2 −18
Original line number Diff line number Diff line
@@ -104,7 +104,7 @@ impl<B> TestConnection<B> {
}

impl<B: Into<hyper::Body>> tower::Service<http::Request<SdkBody>> for TestConnection<B> {
    type Response = http::Response<hyper::Body>;
    type Response = http::Response<SdkBody>;
    type Error = BoxError;
    type Future = Ready<Result<Self::Response, Self::Error>>;

@@ -119,7 +119,7 @@ impl<B: Into<hyper::Body>> tower::Service<http::Request<SdkBody>> for TestConnec
                .lock()
                .unwrap()
                .push(ValidateRequest { actual, expected });
            std::future::ready(Ok(resp.map(|body| body.into())))
            std::future::ready(Ok(resp.map(|body| SdkBody::from(body.into()))))
        } else {
            std::future::ready(Err("No more data".into()))
        }
@@ -130,22 +130,6 @@ impl<B: Into<hyper::Body>> tower::Service<http::Request<SdkBody>> for TestConnec
mod tests {
    use crate::test_connection::TestConnection;
    use crate::{conn, Client};
    use smithy_http::body::SdkBody;
    use tower::BoxError;

    /// Validate that the `TestConnection` meets the required trait bounds to be used with a aws-hyper service
    #[test]
    fn meets_trait_bounds() {
        fn check() -> impl tower::Service<
            http::Request<SdkBody>,
            Response = http::Response<hyper::Body>,
            Error = BoxError,
            Future = impl Send,
        > + Clone {
            TestConnection::<String>::new(vec![])
        }
        let _ = check();
    }

    fn is_send_sync<T: Send + Sync>(_: T) {}

+7 −0
Original line number Diff line number Diff line
@@ -94,6 +94,13 @@ fn test_operation() -> Operation<TestOperationParser, AwsErrorRetryPolicy> {
    Operation::new(req, TestOperationParser).with_retry_policy(AwsErrorRetryPolicy::new())
}

#[cfg(any(feature = "native-tls", feature = "rustls"))]
#[test]
fn test_default_client() {
    let client = Client::https();
    let _ = client.call(test_operation());
}

#[tokio::test]
async fn e2e_test() {
    let expected_req = http::Request::builder()
Loading