diff --git a/rust-runtime/smithy-http-tower/Cargo.toml b/rust-runtime/smithy-http-tower/Cargo.toml new file mode 100644 index 0000000000000000000000000000000000000000..9f3c796e1a7e7a6cfa307eb3f57f7267addf2425 --- /dev/null +++ b/rust-runtime/smithy-http-tower/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "smithy-http-tower" +version = "0.1.0" +authors = ["Russell Cohen "] +edition = "2018" +description = "Tower compatible shims for Smithy middleware" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +smithy-http = { path = "../smithy-http" } +tower = { version = "0.4.4" } +pin-project = "1" +http = "0.2.3" +bytes = "1" +http-body = "0.4.0" + +[dev-dependencies] +tower = { version = "0.4.4", features = ["util"] } +tokio = { version = "1", features = ["full"]} diff --git a/rust-runtime/smithy-http-tower/src/dispatch.rs b/rust-runtime/smithy-http-tower/src/dispatch.rs new file mode 100644 index 0000000000000000000000000000000000000000..c9f82f5754ccea36bfba3067abb12c76fc50a8f2 --- /dev/null +++ b/rust-runtime/smithy-http-tower/src/dispatch.rs @@ -0,0 +1,87 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +use crate::SendOperationError; +use pin_project::pin_project; +use smithy_http::body::SdkBody; +use smithy_http::operation; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tower::{BoxError, Layer, Service}; + +#[pin_project] +pub struct DispatchFuture { + #[pin] + f: F, +} + +/// Connects Operation driven middleware to an HTTP implementation. +/// +/// It will also wrap the error type in OperationError to enable operation middleware +/// reporting specific errors +#[derive(Clone)] +pub struct DispatchService { + inner: S, +} + +impl Future for DispatchFuture +where + F: Future>, + E: Into, +{ + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + this.f + .poll(cx) + .map_err(|e| SendOperationError::RequestDispatchError(e.into())) + } +} + +impl Service for DispatchService +where + S: Service>, + S::Error: Into, +{ + type Response = S::Response; + type Error = SendOperationError; + type Future = DispatchFuture; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner + .poll_ready(cx) + .map_err(|e| SendOperationError::RequestDispatchError(e.into())) + } + + fn call(&mut self, req: operation::Request) -> Self::Future { + let (req, _property_bag) = req.into_parts(); + DispatchFuture { + f: self.inner.call(req), + } + } +} + +#[derive(Clone, Default)] +#[non_exhaustive] +pub struct DispatchLayer; + +impl DispatchLayer { + pub fn new() -> Self { + DispatchLayer + } +} + +impl Layer for DispatchLayer +where + S: Service>, +{ + type Service = DispatchService; + + fn layer(&self, inner: S) -> Self::Service { + DispatchService { inner } + } +} diff --git a/rust-runtime/smithy-http-tower/src/lib.rs b/rust-runtime/smithy-http-tower/src/lib.rs new file mode 100644 index 0000000000000000000000000000000000000000..f2896a780cc2ce6d4815624d129df58850ce5489 --- /dev/null +++ b/rust-runtime/smithy-http-tower/src/lib.rs @@ -0,0 +1,109 @@ +pub mod dispatch; +pub mod map_request; +pub mod parse_response; + +use smithy_http::result::SdkError; +use tower::BoxError; + +/// An Error Occurred During the process of sending an Operation +/// +/// The variants are split to enable the final [SdkError](`smithy_http::result::SdkError`) to differentiate +/// between two types of errors: +/// 1. [`RequestConstructionError`](SendOperationError::RequestConstructionError): Errors where the +/// SDK never attempted to dispatch the underlying `http::Request`. These represent errors that +/// occurred during the request construction pipeline. These generally stem from configuration issues. +/// 2. [`RequestDispatchError`](SendOperationError::RequestDispatchError): Errors where the inner +/// tower service failed (eg. because the hostname couldn't be resolved, connection errors, +/// socket hangup etc.). In this case, we don't know how much of the request was _actually_ sent +/// to the client. We only know that we never got back an `http::Response` (and instead got an error). +/// +/// `SendOperationError` is currently defined only in `smithy-http-tower` because it may be removed +/// or replaced with `SdkError` in the future. +/// +/// `SendOperationError` MAY be moved to a private module in the future. +#[derive(Debug)] +pub enum SendOperationError { + /// The request could not be constructed + /// + /// These errors usually stem from configuration issues (eg. no region, bad credential provider, etc.) + RequestConstructionError(BoxError), + + /// The request could not be dispatched + RequestDispatchError(BoxError), +} + +/// Convert a `SendOperationError` into an `SdkError` +impl From for SdkError { + fn from(err: SendOperationError) -> Self { + match err { + SendOperationError::RequestDispatchError(e) => { + smithy_http::result::SdkError::DispatchFailure(e) + } + SendOperationError::RequestConstructionError(e) => { + smithy_http::result::SdkError::ConstructionFailure(e) + } + } + } +} + +#[cfg(test)] +mod tests { + use crate::dispatch::DispatchLayer; + use crate::map_request::MapRequestLayer; + use crate::parse_response::ParseResponseLayer; + use bytes::Bytes; + use http::Response; + use smithy_http::body::SdkBody; + use smithy_http::middleware::MapRequest; + use smithy_http::operation; + use smithy_http::operation::{Operation, Request}; + use smithy_http::response::ParseStrictResponse; + use std::convert::{Infallible, TryInto}; + use tower::{service_fn, Service, ServiceBuilder}; + + /// Creates a stubbed service stack and runs it to validate that all the types line up & + /// everything is properly wired + #[tokio::test] + async fn service_stack() { + #[derive(Clone)] + struct AddHeader; + impl MapRequest for AddHeader { + type Error = Infallible; + fn apply(&self, request: Request) -> Result { + request.augment(|mut req, _| { + req.headers_mut() + .insert("X-Test", "Value".try_into().unwrap()); + Ok(req) + }) + } + } + + struct TestParseResponse; + impl ParseStrictResponse for TestParseResponse { + type Output = Result; + + fn parse(&self, _response: &Response) -> Self::Output { + Ok("OK".to_string()) + } + } + + let http_layer = service_fn(|_request: http::Request| async move { + if _request.headers().contains_key("X-Test") { + Ok(http::Response::new(SdkBody::from("ok"))) + } else { + Err("header not set") + } + }); + + let mut svc = ServiceBuilder::new() + .layer(ParseResponseLayer::::new()) + .layer(MapRequestLayer::for_mapper(AddHeader)) + .layer(DispatchLayer) + .service(http_layer); + let req = http::Request::new(SdkBody::from("hello")); + let req = operation::Request::new(req); + let req = Operation::new(req, TestParseResponse); + let resp = svc.call(req).await.expect("Response should succeed"); + assert_eq!(resp.parsed, "OK".to_string()) + } +} diff --git a/rust-runtime/smithy-http-tower/src/map_request.rs b/rust-runtime/smithy-http-tower/src/map_request.rs new file mode 100644 index 0000000000000000000000000000000000000000..5f741e3f230640bb26ad49efc7c3e672d5189faa --- /dev/null +++ b/rust-runtime/smithy-http-tower/src/map_request.rs @@ -0,0 +1,89 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +use crate::SendOperationError; +use pin_project::pin_project; +use smithy_http::middleware::MapRequest; +use smithy_http::operation; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tower::{Layer, Service}; + +#[derive(Clone)] +/// Tower service for [`MapRequest`](smithy_http::middleware::MapRequest) +pub struct MapRequestService { + inner: S, + mapper: M, +} + +pub struct MapRequestLayer { + mapper: M, +} + +impl MapRequestLayer { + pub fn for_mapper(mapper: M) -> Self { + MapRequestLayer { mapper } + } +} + +impl Layer for MapRequestLayer +where + M: Clone, +{ + type Service = MapRequestService; + + fn layer(&self, inner: S) -> Self::Service { + MapRequestService { + inner, + mapper: self.mapper.clone(), + } + } +} + +#[pin_project(project = EnumProj)] +pub enum MapRequestFuture { + Inner(#[pin] F), + Ready(Option), +} + +impl Future for MapRequestFuture +where + F: Future>, +{ + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match self.project() { + EnumProj::Ready(e) => Poll::Ready(Err(e.take().unwrap())), + EnumProj::Inner(f) => f.poll(cx), + } + } +} + +impl Service for MapRequestService +where + S: Service, + M: MapRequest, +{ + type Response = S::Response; + type Error = S::Error; + type Future = MapRequestFuture; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, req: operation::Request) -> Self::Future { + match self + .mapper + .apply(req) + .map_err(|e| SendOperationError::RequestConstructionError(e.into())) + { + Err(e) => MapRequestFuture::Ready(Some(e)), + Ok(req) => MapRequestFuture::Inner(self.inner.call(req)), + } + } +} diff --git a/rust-runtime/smithy-http-tower/src/parse_response.rs b/rust-runtime/smithy-http-tower/src/parse_response.rs new file mode 100644 index 0000000000000000000000000000000000000000..7c65437ae81e81f0304e57ac1560c9af78be7e94 --- /dev/null +++ b/rust-runtime/smithy-http-tower/src/parse_response.rs @@ -0,0 +1,95 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +use crate::SendOperationError; +use bytes::Bytes; +use smithy_http::middleware::load_response; +use smithy_http::operation; +use smithy_http::operation::Operation; +use smithy_http::response::ParseHttpResponse; +use std::future::Future; +use std::marker::PhantomData; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tower::{BoxError, Layer, Service}; + +/// `ParseResponseService` dispatches [`Operation`](smithy_http::operation::Operation)s and parses them. +/// +/// `ParseResponseService` is intended to wrap a `DispatchService` which will handle the interface between +/// services that operate on [`operation::Request`](operation::Request) and services that operate +/// on [`http::Request`](http::Request). +#[derive(Clone)] +pub struct ParseResponseService { + inner: S, + _output_type: PhantomData, +} + +#[derive(Default)] +pub struct ParseResponseLayer { + _output_type: PhantomData, +} + +/// `ParseResponseLayer` dispatches [`Operation`](smithy_http::operation::Operation)s and parses them. +impl ParseResponseLayer { + pub fn new() -> Self { + ParseResponseLayer { + _output_type: Default::default(), + } + } +} + +impl Layer for ParseResponseLayer +where + S: Service, +{ + type Service = ParseResponseService; + + fn layer(&self, inner: S) -> Self::Service { + ParseResponseService { + inner, + _output_type: Default::default(), + } + } +} + +type BoxedResultFuture = Pin>>>; + +/// ParseResponseService +/// +/// Generic Parameter Listing: +/// `S`: The inner service +/// `O`: The type of the response parser whose output type is `Result` +/// `T`: The happy path return of the response parser +/// `E`: The error path return of the response parser +/// `B`: The HTTP Body type returned by the inner service +/// `R`: The type of the retry policy +impl tower::Service> for ParseResponseService +where + S: Service, Error = SendOperationError>, + S::Future: 'static, + B: http_body::Body + Unpin + From + 'static, + B::Error: Into, + O: ParseHttpResponse> + 'static, +{ + type Response = smithy_http::result::SdkSuccess; + type Error = smithy_http::result::SdkError; + type Future = BoxedResultFuture; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx).map_err(|err| err.into()) + } + + fn call(&mut self, req: Operation) -> Self::Future { + let (req, handler) = req.into_request_response(); + let resp = self.inner.call(req); + let fut = async move { + match resp.await { + Err(e) => Err(e.into()), + Ok(resp) => load_response(resp, &handler).await, + } + }; + Box::pin(fut) + } +} diff --git a/rust-runtime/smithy-http/src/body.rs b/rust-runtime/smithy-http/src/body.rs index dea1386d92d7fb3369a15848e72679bc2d18a8cc..da6690e065bc0a623c7fdca9ce69efb0f4dc3ca4 100644 --- a/rust-runtime/smithy-http/src/body.rs +++ b/rust-runtime/smithy-http/src/body.rs @@ -19,6 +19,8 @@ type BodyError = Box; /// /// TODO: Consider renaming to simply `Body`, although I'm concerned about naming headaches /// between hyper::Body and our Body +/// TODO: Once we add streaming bodies, we will need a custom debug implementation +#[derive(Debug)] pub enum SdkBody { Once(Option), // TODO: tokio::sync::mpsc based streaming body diff --git a/rust-runtime/smithy-http/src/operation.rs b/rust-runtime/smithy-http/src/operation.rs index 2df4e5cb4ffea916e2cd6727b24ec79399f68a4d..3414b2c38b7f2eb44b8986125947f2acf4d612a4 100644 --- a/rust-runtime/smithy-http/src/operation.rs +++ b/rust-runtime/smithy-http/src/operation.rs @@ -9,11 +9,13 @@ pub struct Operation { _retry_policy: R, } -impl Operation { +impl Operation { pub fn into_request_response(self) -> (Request, H) { (self.request, self.response_handler) } +} +impl Operation { pub fn new(request: Request, response_handler: H) -> Self { Operation { request, diff --git a/rust-runtime/test.sh b/rust-runtime/test.sh index 1f5cd64f859da7dcce336f414091dc33e118a20d..356f3dde00cedf9943b8740a18db7c48c4673052 100755 --- a/rust-runtime/test.sh +++ b/rust-runtime/test.sh @@ -14,5 +14,6 @@ do (cd "$crate" && cargo fmt -- --check) (cd "$crate" && cargo clippy -- -D warnings) (cd "$crate" && cargo test) + (cd "$crate" && RUSTDOCFLAGS="-D warnings" cargo doc --no-deps) fi done