Unverified Commit 64fe38cb authored by Russell Cohen's avatar Russell Cohen Committed by GitHub
Browse files

Add support for file-based ByteSteam (#412)

* Add support for file-based ByteSteam

This commit adds support for creating a `ByteStream` from a path or from
a file using `ReaderStream` as the transport mechanism.

In the long term, the implementation details may change significantly, however the external API,
"from_path", is expected to be stable.

* Inner does not need to be pub crate

* Show doc feature hints

* Ensure SdkBody is Send + Sync
parent 7321b9fc
Loading
Loading
Loading
Loading
+11 −0
Original line number Diff line number Diff line
@@ -5,6 +5,10 @@ authors = ["rcoh@amazon.com"]
edition = "2018"
license = "Apache-2.0"

[features]
bytestream-util = ["tokio/fs", "tokio-util/io"]
default = ["bytestream-util"]

[dependencies]
smithy-types = { path = "../smithy-types" }
bytes = "1"
@@ -21,9 +25,16 @@ hyper = "0.14.5"
# ByteStream internals
bytes-utils = "0.1.1"
futures-core = "0.3.14"
tokio = { version = "1.6", optional = true }
tokio-util = { version = "0.6", optional = true}

[dev-dependencies]
proptest = "1"
base64 = "0.13.0"
tokio = {version = "1.6", features = ["macros", "rt", "fs", "io-util"]}
tokio-stream = "0.1.5"
tempfile = "3.2.0"

[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs"]
+67 −18
Original line number Diff line number Diff line
@@ -8,9 +8,9 @@ use http::{HeaderMap, HeaderValue};
use http_body::{Body, SizeHint};
use pin_project::pin_project;
use std::error::Error as StdError;
use std::fmt;
use std::fmt::{Debug, Formatter};
use std::fmt::{self, Debug, Formatter};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

pub type Error = Box<dyn StdError + Send + Sync>;
@@ -24,8 +24,24 @@ pub type Error = Box<dyn StdError + Send + Sync>;
/// TODO: Consider renaming to simply `Body`, although I'm concerned about naming headaches
/// between hyper::Body and our Body
#[pin_project]
#[derive(Debug)]
pub struct SdkBody(#[pin] Inner);
pub struct SdkBody {
    #[pin]
    inner: Inner,
    /// An optional function to recreate the inner body
    ///
    /// In the event of retry, this function will be called to generate a new body. See
    /// [`try_clone()`](SdkBody::try_clone)
    rebuild: Option<Arc<dyn (Fn() -> Inner) + Send + Sync>>,
}

impl Debug for SdkBody {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        f.debug_struct("SdkBody")
            .field("inner", &self.inner)
            .field("retryable", &self.rebuild.is_some())
            .finish()
    }
}

type BoxBody = http_body::combinators::BoxBody<Bytes, Error>;

@@ -55,22 +71,39 @@ impl Debug for Inner {
impl SdkBody {
    /// Construct an SdkBody from a Boxed implementation of http::Body
    pub fn from_dyn(body: BoxBody) -> Self {
        Self(Inner::Dyn(body))
        Self {
            inner: Inner::Dyn(body),
            rebuild: None,
        }
    }

    pub fn retryable(f: impl Fn() -> SdkBody + Send + Sync + 'static) -> Self {
        let initial = f();
        SdkBody {
            inner: initial.inner,
            rebuild: Some(Arc::new(move || f().inner)),
        }
    }

    pub fn taken() -> Self {
        Self(Inner::Taken)
        Self {
            inner: Inner::Taken,
            rebuild: None,
        }
    }

    pub fn empty() -> Self {
        Self(Inner::Once(None))
        Self {
            inner: Inner::Once(None),
            rebuild: None,
        }
    }

    fn poll_inner(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<Bytes, Error>>> {
        match self.project().0.project() {
        match self.project().inner.project() {
            InnerProj::Once(ref mut opt) => {
                let data = opt.take();
                match data {
@@ -92,7 +125,7 @@ impl SdkBody {
    /// If this SdkBody is NOT streaming, this will return the byte slab
    /// If this SdkBody is streaming, this will return `None`
    pub fn bytes(&self) -> Option<&[u8]> {
        match &self.0 {
        match &self.inner {
            Inner::Once(Some(b)) => Some(&b),
            Inner::Once(None) => Some(&[]),
            _ => None,
@@ -100,10 +133,13 @@ impl SdkBody {
    }

    pub fn try_clone(&self) -> Option<Self> {
        match &self.0 {
            Inner::Once(bytes) => Some(SdkBody(Inner::Once(bytes.clone()))),
            _ => None,
        self.rebuild.as_ref().map(|rebuild| {
            let next = rebuild();
            SdkBody {
                inner: next,
                rebuild: self.rebuild.clone(),
            }
        })
    }

    pub fn content_length(&self) -> Option<u64> {
@@ -119,13 +155,19 @@ impl From<&str> for SdkBody {

impl From<Bytes> for SdkBody {
    fn from(bytes: Bytes) -> Self {
        SdkBody(Inner::Once(Some(bytes)))
        SdkBody {
            inner: Inner::Once(Some(bytes.clone())),
            rebuild: Some(Arc::new(move || Inner::Once(Some(bytes.clone())))),
        }
    }
}

impl From<hyper::Body> for SdkBody {
    fn from(body: hyper::Body) -> Self {
        SdkBody(Inner::Streaming(body))
        SdkBody {
            inner: Inner::Streaming(body),
            rebuild: None,
        }
    }
}

@@ -143,7 +185,7 @@ impl From<String> for SdkBody {

impl From<&[u8]> for SdkBody {
    fn from(data: &[u8]) -> Self {
        SdkBody(Inner::Once(Some(Bytes::copy_from_slice(data))))
        Self::from(Bytes::copy_from_slice(data))
    }
}

@@ -166,7 +208,7 @@ impl http_body::Body for SdkBody {
    }

    fn is_end_stream(&self) -> bool {
        match &self.0 {
        match &self.inner {
            Inner::Once(None) => true,
            Inner::Once(Some(bytes)) => bytes.is_empty(),
            Inner::Streaming(hyper_body) => hyper_body.is_end_stream(),
@@ -176,7 +218,7 @@ impl http_body::Body for SdkBody {
    }

    fn size_hint(&self) -> SizeHint {
        match &self.0 {
        match &self.inner {
            Inner::Once(None) => SizeHint::with_exact(0),
            Inner::Once(Some(bytes)) => SizeHint::with_exact(bytes.len() as u64),
            Inner::Streaming(hyper_body) => hyper_body.size_hint(),
@@ -245,4 +287,11 @@ mod test {
        // actually don't really care what the debug impl is, just that it doesn't crash
        let _ = format!("{:?}", body);
    }

    fn is_send<T: Send + Sync>() {}

    #[test]
    fn sdk_body_is_send() {
        is_send::<SdkBody>()
    }
}
+129 −3
Original line number Diff line number Diff line
@@ -71,19 +71,40 @@
//!     Ok(())
//! }
//! ```
//!
//! ### Create a ByteStream from a file
//! **Note:** This is only available with `bytestream-util` enabled.
//! ```rust
//! use smithy_http::byte_stream::ByteStream;
//! use std::path::Path;
//! struct GetObjectInput {
//!   body: ByteStream
//! }
//!
//! async fn bytestream_from_file() -> GetObjectInput {
//!     let f = Path::new("docs/some-large-file.csv");
//!     let bytestream = ByteStream::from_path(&f).await.expect("valid path");
//!     GetObjectInput { body: bytestream }
//! }
//! ```

use crate::body::SdkBody;
use bytes::Buf;
use bytes::Bytes;
use bytes_utils::SegmentedBuf;
use http_body::combinators::BoxBody;
use http_body::Body;
use pin_project::pin_project;
use std::error::Error as StdError;
use std::fmt::{Debug, Formatter};
use std::io::IoSlice;
use std::path::Path;
use std::pin::Pin;
use std::task::{Context, Poll};

#[cfg(feature = "bytestream-util")]
mod bytestream_util;

/// Stream of binary data
///
/// `ByteStream` wraps a stream of binary data for ease of use.
@@ -132,7 +153,6 @@ use std::task::{Context, Poll};
///     }
///     ```
///
/// `ByteStream`
#[pin_project]
#[derive(Debug)]
pub struct ByteStream(#[pin] Inner<SdkBody>);
@@ -173,6 +193,59 @@ impl ByteStream {
    pub async fn collect(self) -> Result<AggregatedBytes, Error> {
        self.0.collect().await.map_err(|err| Error(err))
    }

    /// Create a ByteStream that streams data from the filesystem
    ///
    /// This function creates a retryable ByteStream for a given `path`. The returned ByteStream
    /// will provide a size hint when used as an HTTP body. If the request fails, the read will
    /// begin again by reloading the file handle.
    ///
    /// ## Warning
    /// The contents of the file MUST not change during retries. The length & checksum of the file
    /// will be cached. If the contents of the file change, the operation will almost certainly fail.
    ///
    /// Furthermore, a partial write MAY seek in the file and resume from the previous location.
    ///
    /// # Example
    /// ```rust
    /// use smithy_http::byte_stream::ByteStream;
    /// use std::path::Path;
    ///  async fn make_bytestream() -> ByteStream {
    ///     ByteStream::from_path(&Path::new("docs/rows.csv")).await.expect("file should be readable")
    /// }
    /// ```
    #[cfg(feature = "bytestream-util")]
    #[cfg_attr(docsrs, doc(cfg(feature = "bytestream-util")))]
    pub async fn from_path(path: &Path) -> Result<Self, Error> {
        let path_buf = path.to_path_buf();
        let sz = tokio::fs::metadata(path)
            .await
            .map_err(|err| Error(err.into()))?
            .len();
        let body_loader = move || {
            SdkBody::from_dyn(BoxBody::new(bytestream_util::PathBody::from_path(
                path_buf.as_path(),
                sz,
            )))
        };
        Ok(ByteStream::new(SdkBody::retryable(body_loader)))
    }

    /// Create a ByteStream from a file
    ///
    /// NOTE: This will NOT result in a retryable ByteStream. For a ByteStream that can be retried in the case of
    /// upstream failures, use [`ByteStream::from_path`](ByteStream::from_path)
    #[cfg(feature = "bytestream-util")]
    #[cfg_attr(docsrs, doc(cfg(feature = "bytestream-util")))]
    pub async fn from_file(file: tokio::fs::File) -> Result<Self, Error> {
        let sz = file
            .metadata()
            .await
            .map_err(|err| Error(err.into()))?
            .len();
        let body = SdkBody::from_dyn(BoxBody::new(bytestream_util::PathBody::from_file(file, sz)));
        Ok(ByteStream::new(body))
    }
}

impl Default for ByteStream {
@@ -189,6 +262,18 @@ impl From<SdkBody> for ByteStream {
    }
}

impl From<Bytes> for ByteStream {
    fn from(input: Bytes) -> Self {
        ByteStream::new(SdkBody::from(input))
    }
}

impl From<Vec<u8>> for ByteStream {
    fn from(input: Vec<u8>) -> Self {
        Self::from(Bytes::from(input))
    }
}

#[derive(Debug)]
pub struct Error(Box<dyn StdError + Send + Sync + 'static>);

@@ -318,8 +403,10 @@ where

#[cfg(test)]
mod tests {
    use crate::byte_stream::Inner;
    use bytes::Bytes;
    use crate::byte_stream::{ByteStream, Inner};
    use bytes::{Buf, Bytes};
    use http_body::Body;
    use std::error::Error;

    #[tokio::test]
    async fn read_from_string_body() {
@@ -348,4 +435,43 @@ mod tests {
            Bytes::from("data 1data 2data 3")
        );
    }

    #[cfg(feature = "bytestream-util")]
    #[tokio::test]
    async fn path_based_bytestreams() -> Result<(), Box<dyn Error>> {
        use std::io::Write;
        use tempfile::NamedTempFile;
        let mut file = NamedTempFile::new()?;

        for i in 0..10000 {
            writeln!(file, "Brian was here. Briefly. {}", i)?;
        }
        let body = ByteStream::from_path(file.path()).await?.into_inner();
        // assert that a valid size hint is immediately ready
        assert_eq!(body.size_hint().exact(), Some(298890));
        let mut body1 = body.try_clone().expect("retryable bodies are cloneable");
        // read a little bit from one of the clones
        let some_data = body1
            .data()
            .await
            .expect("should have some data")
            .expect("read should not fail");
        assert!(!some_data.is_empty());
        // make some more clones
        let body2 = body.try_clone().expect("retryable bodies are cloneable");
        let body3 = body.try_clone().expect("retryable bodies are cloneable");
        let body2 = ByteStream::new(body2).collect().await?.into_bytes();
        let body3 = ByteStream::new(body3).collect().await?.into_bytes();
        assert_eq!(body2, body3);
        assert!(body2.starts_with(b"Brian was here."));
        assert!(body2.ends_with(b"9999\n"));
        assert_eq!(body2.len(), 298890);

        assert_eq!(
            ByteStream::new(body1).collect().await?.remaining(),
            298890 - some_data.len()
        );

        Ok(())
    }
}
+101 −0
Original line number Diff line number Diff line
/*
 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 * SPDX-License-Identifier: Apache-2.0.
 */

use bytes::Bytes;
use futures_core::{ready, Stream};
use http::HeaderMap;
use http_body::{Body, SizeHint};
use std::future::Future;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::fs::File;
use tokio::io;
use tokio_util::io::ReaderStream;

/// An HTTP Body designed to wrap files
///
/// PathBody is a three-phase HTTP body designed to wrap files with three specific features:
/// 1. The underlying file is wrapped with StreamReader to implement HTTP body
/// 2. It can be constructed directly from a path so it's easy to use during retries
/// 3. Provide size hint
pub struct PathBody {
    state: State,
    sz: u64,
}

impl PathBody {
    pub fn from_path(path: &Path, sz: u64) -> Self {
        PathBody {
            state: State::Unloaded(path.to_path_buf()),
            sz,
        }
    }
    pub fn from_file(file: File, sz: u64) -> Self {
        PathBody {
            state: State::Loaded(ReaderStream::new(file)),
            sz,
        }
    }
}

enum State {
    Unloaded(PathBuf),
    Loading(Pin<Box<dyn Future<Output = io::Result<File>> + Send + Sync + 'static>>),
    Loaded(tokio_util::io::ReaderStream<File>),
}

impl Body for PathBody {
    type Data = Bytes;
    type Error = Box<dyn std::error::Error + Send + Sync + 'static>;

    fn poll_data(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
        loop {
            match self.state {
                State::Unloaded(ref path_buf) => {
                    let buf = path_buf.clone();
                    self.state = State::Loading(Box::pin(async move {
                        let file = tokio::fs::File::open(&buf).await?;
                        Ok(file)
                    }));
                }
                State::Loading(ref mut future) => {
                    match ready!(Pin::new(future).poll(cx)) {
                        Ok(file) => {
                            self.state = State::Loaded(ReaderStream::new(file));
                        }
                        Err(e) => return Poll::Ready(Some(Err(e.into()))),
                    };
                }
                State::Loaded(ref mut stream) => {
                    return match ready!(Pin::new(stream).poll_next(cx)) {
                        Some(Ok(bytes)) => Poll::Ready(Some(Ok(bytes))),
                        None => Poll::Ready(None),
                        Some(Err(e)) => Poll::Ready(Some(Err(e.into()))),
                    }
                }
            };
        }
    }

    fn poll_trailers(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
    ) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
        Poll::Ready(Ok(None))
    }

    fn is_end_stream(&self) -> bool {
        // fast path end-stream for empty files
        self.sz == 0
    }

    fn size_hint(&self) -> SizeHint {
        SizeHint::with_exact(self.sz)
    }
}
+2 −0
Original line number Diff line number Diff line
@@ -3,6 +3,8 @@
 * SPDX-License-Identifier: Apache-2.0.
 */

#![cfg_attr(docsrs, feature(doc_cfg))]

pub mod base64;
pub mod body;
pub mod byte_stream;
Loading