Unverified Commit 5b4515c4 authored by Russell Cohen's avatar Russell Cohen Committed by GitHub
Browse files

Implement Tower Shims for SDK middleware (#187)

* Implement Tower Shims for SDK middleware

This commit implements 3 pieces of tower middleware:
1. A tower service supporting smithy_http::middleware::MapRequest
2. A tower service to dispatch operation::Request
3. A tower service to dispatch & parse the results of `operation::Operation`

* Fix clippy lints

* Go back to default debug implementation

* Check docs in rust-runtime test script

* Make Dispatch layer non-exhaustive

* Derive default for DispatchLayer
parent cb50262c
Loading
Loading
Loading
Loading
+20 −0
Original line number Diff line number Diff line
[package]
name = "smithy-http-tower"
version = "0.1.0"
authors = ["Russell Cohen <rcoh@amazon.com>"]
edition = "2018"
description = "Tower compatible shims for Smithy middleware"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
smithy-http = { path = "../smithy-http" }
tower = { version = "0.4.4" }
pin-project = "1"
http = "0.2.3"
bytes = "1"
http-body = "0.4.0"

[dev-dependencies]
tower = { version = "0.4.4", features = ["util"] }
tokio = { version = "1", features = ["full"]}
+87 −0
Original line number Diff line number Diff line
/*
 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 * SPDX-License-Identifier: Apache-2.0.
 */

use crate::SendOperationError;
use pin_project::pin_project;
use smithy_http::body::SdkBody;
use smithy_http::operation;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use tower::{BoxError, Layer, Service};

#[pin_project]
pub struct DispatchFuture<F> {
    #[pin]
    f: F,
}

/// Connects Operation driven middleware to an HTTP implementation.
///
/// It will also wrap the error type in OperationError to enable operation middleware
/// reporting specific errors
#[derive(Clone)]
pub struct DispatchService<S> {
    inner: S,
}

impl<F, T, E> Future for DispatchFuture<F>
where
    F: Future<Output = Result<T, E>>,
    E: Into<BoxError>,
{
    type Output = Result<T, SendOperationError>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();
        this.f
            .poll(cx)
            .map_err(|e| SendOperationError::RequestDispatchError(e.into()))
    }
}

impl<S> Service<operation::Request> for DispatchService<S>
where
    S: Service<http::Request<SdkBody>>,
    S::Error: Into<BoxError>,
{
    type Response = S::Response;
    type Error = SendOperationError;
    type Future = DispatchFuture<S::Future>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.inner
            .poll_ready(cx)
            .map_err(|e| SendOperationError::RequestDispatchError(e.into()))
    }

    fn call(&mut self, req: operation::Request) -> Self::Future {
        let (req, _property_bag) = req.into_parts();
        DispatchFuture {
            f: self.inner.call(req),
        }
    }
}

#[derive(Clone, Default)]
#[non_exhaustive]
pub struct DispatchLayer;

impl DispatchLayer {
    pub fn new() -> Self {
        DispatchLayer
    }
}

impl<S> Layer<S> for DispatchLayer
where
    S: Service<http::Request<SdkBody>>,
{
    type Service = DispatchService<S>;

    fn layer(&self, inner: S) -> Self::Service {
        DispatchService { inner }
    }
}
+109 −0
Original line number Diff line number Diff line
pub mod dispatch;
pub mod map_request;
pub mod parse_response;

use smithy_http::result::SdkError;
use tower::BoxError;

/// An Error Occurred During the process of sending an Operation
///
/// The variants are split to enable the final [SdkError](`smithy_http::result::SdkError`) to differentiate
/// between two types of errors:
/// 1. [`RequestConstructionError`](SendOperationError::RequestConstructionError): Errors where the
///    SDK never attempted to dispatch the underlying `http::Request`. These represent errors that
///    occurred during the request construction pipeline. These generally stem from configuration issues.
/// 2. [`RequestDispatchError`](SendOperationError::RequestDispatchError): Errors where the inner
///    tower service failed (eg. because the hostname couldn't be resolved, connection errors,
///    socket hangup etc.). In this case, we don't know how much of the request was _actually_ sent
///    to the client. We only know that we never got back an `http::Response` (and instead got an error).
///
/// `SendOperationError` is currently defined only in `smithy-http-tower` because it may be removed
/// or replaced with `SdkError` in the future.
///
/// `SendOperationError` MAY be moved to a private module in the future.
#[derive(Debug)]
pub enum SendOperationError {
    /// The request could not be constructed
    ///
    /// These errors usually stem from configuration issues (eg. no region, bad credential provider, etc.)
    RequestConstructionError(BoxError),

    /// The request could not be dispatched
    RequestDispatchError(BoxError),
}

/// Convert a `SendOperationError` into an `SdkError`
impl<E, B> From<SendOperationError> for SdkError<E, B> {
    fn from(err: SendOperationError) -> Self {
        match err {
            SendOperationError::RequestDispatchError(e) => {
                smithy_http::result::SdkError::DispatchFailure(e)
            }
            SendOperationError::RequestConstructionError(e) => {
                smithy_http::result::SdkError::ConstructionFailure(e)
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use crate::dispatch::DispatchLayer;
    use crate::map_request::MapRequestLayer;
    use crate::parse_response::ParseResponseLayer;
    use bytes::Bytes;
    use http::Response;
    use smithy_http::body::SdkBody;
    use smithy_http::middleware::MapRequest;
    use smithy_http::operation;
    use smithy_http::operation::{Operation, Request};
    use smithy_http::response::ParseStrictResponse;
    use std::convert::{Infallible, TryInto};
    use tower::{service_fn, Service, ServiceBuilder};

    /// Creates a stubbed service stack and runs it to validate that all the types line up &
    /// everything is properly wired
    #[tokio::test]
    async fn service_stack() {
        #[derive(Clone)]
        struct AddHeader;
        impl MapRequest for AddHeader {
            type Error = Infallible;
            fn apply(&self, request: Request) -> Result<Request, Self::Error> {
                request.augment(|mut req, _| {
                    req.headers_mut()
                        .insert("X-Test", "Value".try_into().unwrap());
                    Ok(req)
                })
            }
        }

        struct TestParseResponse;
        impl ParseStrictResponse for TestParseResponse {
            type Output = Result<String, Infallible>;

            fn parse(&self, _response: &Response<Bytes>) -> Self::Output {
                Ok("OK".to_string())
            }
        }

        let http_layer = service_fn(|_request: http::Request<SdkBody>| async move {
            if _request.headers().contains_key("X-Test") {
                Ok(http::Response::new(SdkBody::from("ok")))
            } else {
                Err("header not set")
            }
        });

        let mut svc = ServiceBuilder::new()
            .layer(ParseResponseLayer::<TestParseResponse>::new())
            .layer(MapRequestLayer::for_mapper(AddHeader))
            .layer(DispatchLayer)
            .service(http_layer);
        let req = http::Request::new(SdkBody::from("hello"));
        let req = operation::Request::new(req);
        let req = Operation::new(req, TestParseResponse);
        let resp = svc.call(req).await.expect("Response should succeed");
        assert_eq!(resp.parsed, "OK".to_string())
    }
}
+89 −0
Original line number Diff line number Diff line
/*
 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 * SPDX-License-Identifier: Apache-2.0.
 */

use crate::SendOperationError;
use pin_project::pin_project;
use smithy_http::middleware::MapRequest;
use smithy_http::operation;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use tower::{Layer, Service};

#[derive(Clone)]
/// Tower service for [`MapRequest`](smithy_http::middleware::MapRequest)
pub struct MapRequestService<S, M> {
    inner: S,
    mapper: M,
}

pub struct MapRequestLayer<M> {
    mapper: M,
}

impl<M: MapRequest + Clone> MapRequestLayer<M> {
    pub fn for_mapper(mapper: M) -> Self {
        MapRequestLayer { mapper }
    }
}

impl<S, M> Layer<S> for MapRequestLayer<M>
where
    M: Clone,
{
    type Service = MapRequestService<S, M>;

    fn layer(&self, inner: S) -> Self::Service {
        MapRequestService {
            inner,
            mapper: self.mapper.clone(),
        }
    }
}

#[pin_project(project = EnumProj)]
pub enum MapRequestFuture<F, E> {
    Inner(#[pin] F),
    Ready(Option<E>),
}

impl<O, F, E> Future for MapRequestFuture<F, E>
where
    F: Future<Output = Result<O, E>>,
{
    type Output = Result<O, E>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match self.project() {
            EnumProj::Ready(e) => Poll::Ready(Err(e.take().unwrap())),
            EnumProj::Inner(f) => f.poll(cx),
        }
    }
}

impl<S, M> Service<operation::Request> for MapRequestService<S, M>
where
    S: Service<operation::Request, Error = SendOperationError>,
    M: MapRequest,
{
    type Response = S::Response;
    type Error = S::Error;
    type Future = MapRequestFuture<S::Future, S::Error>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.inner.poll_ready(cx)
    }

    fn call(&mut self, req: operation::Request) -> Self::Future {
        match self
            .mapper
            .apply(req)
            .map_err(|e| SendOperationError::RequestConstructionError(e.into()))
        {
            Err(e) => MapRequestFuture::Ready(Some(e)),
            Ok(req) => MapRequestFuture::Inner(self.inner.call(req)),
        }
    }
}
+95 −0
Original line number Diff line number Diff line
/*
 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 * SPDX-License-Identifier: Apache-2.0.
 */

use crate::SendOperationError;
use bytes::Bytes;
use smithy_http::middleware::load_response;
use smithy_http::operation;
use smithy_http::operation::Operation;
use smithy_http::response::ParseHttpResponse;
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};
use tower::{BoxError, Layer, Service};

/// `ParseResponseService` dispatches [`Operation`](smithy_http::operation::Operation)s and parses them.
///
/// `ParseResponseService` is intended to wrap a `DispatchService` which will handle the interface between
/// services that operate on [`operation::Request`](operation::Request) and services that operate
/// on [`http::Request`](http::Request).
#[derive(Clone)]
pub struct ParseResponseService<S, O> {
    inner: S,
    _output_type: PhantomData<O>,
}

#[derive(Default)]
pub struct ParseResponseLayer<O> {
    _output_type: PhantomData<O>,
}

/// `ParseResponseLayer` dispatches [`Operation`](smithy_http::operation::Operation)s and parses them.
impl<O> ParseResponseLayer<O> {
    pub fn new() -> Self {
        ParseResponseLayer {
            _output_type: Default::default(),
        }
    }
}

impl<S, O> Layer<S> for ParseResponseLayer<O>
where
    S: Service<operation::Request>,
{
    type Service = ParseResponseService<S, O>;

    fn layer(&self, inner: S) -> Self::Service {
        ParseResponseService {
            inner,
            _output_type: Default::default(),
        }
    }
}

type BoxedResultFuture<T, E> = Pin<Box<dyn Future<Output = Result<T, E>>>>;

/// ParseResponseService
///
/// Generic Parameter Listing:
/// `S`: The inner service
/// `O`: The type of the response parser whose output type is `Result<T, E>`
/// `T`: The happy path return of the response parser
/// `E`: The error path return of the response parser
/// `B`: The HTTP Body type returned by the inner service
/// `R`: The type of the retry policy
impl<S, O, T, E, B, R> tower::Service<operation::Operation<O, R>> for ParseResponseService<S, O>
where
    S: Service<operation::Request, Response = http::Response<B>, Error = SendOperationError>,
    S::Future: 'static,
    B: http_body::Body + Unpin + From<Bytes> + 'static,
    B::Error: Into<BoxError>,
    O: ParseHttpResponse<B, Output = Result<T, E>> + 'static,
{
    type Response = smithy_http::result::SdkSuccess<T, B>;
    type Error = smithy_http::result::SdkError<E, B>;
    type Future = BoxedResultFuture<Self::Response, Self::Error>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.inner.poll_ready(cx).map_err(|err| err.into())
    }

    fn call(&mut self, req: Operation<O, R>) -> Self::Future {
        let (req, handler) = req.into_request_response();
        let resp = self.inner.call(req);
        let fut = async move {
            match resp.await {
                Err(e) => Err(e.into()),
                Ok(resp) => load_response(resp, &handler).await,
            }
        };
        Box::pin(fut)
    }
}
Loading