Unverified Commit 3073a0a2 authored by Tanguy Le Barzic's avatar Tanguy Le Barzic Committed by GitHub
Browse files

Allow to specify a read buffer initial capacity when creating ByteStream from a file (#1238)



* Allow to specify a read buffer initial capacity when creating ByteStream from a file

The behaviour of the existing ByteStream::from_file / ByteStream::from_path is unchanged (they use a default buffer capacity of 4 KiB, which corresponds to Tokio's ReaderStream default buffer capacity). Using larger buffer sizes can result in a large reduction in CPU usage during S3 uploads, at the cost of increased memory usage.

* Rename len to file_size

This makes the distinction with the buffer size clearer

* Use a builder to specify advanced options to create a ByteStream

* Improved comments

* Specify the unit to use for PathBodyBuilder.with_file_size

* Improved comments following review

* Rename PathBodyBuilder to FsBuilder

* Renaming in FsBuilder

- renames `with_buffer_size` to `buffer_size`
- renames `with_file_size` to `file_size`
- renames `byte_stream` to `build`

* Make PathBody private

* Updated API for FsBuilder

* Document panic behavior of ByteStream::build

* Document ByteStream::read_from

* Move ByteStream::read_from

No functional change

* Update rust-runtime/aws-smithy-http/src/byte_stream.rs

a -> an

Co-authored-by: Russell Cohen <russell.r.cohen@gmail.com>
Co-authored-by: Russell Cohen <rcoh@amazon.com>
parent 125328da
Loading
Loading
Loading
Loading
+107 −21
Original line number Diff line number Diff line
@@ -95,6 +95,30 @@
//! }
//! # }
//! ```
//!
//! If you want more control over how the file is read, such as specifying the size of the buffer used to read the file
//! or the length of the file, use an [`FsBuilder`](crate::byte_stream::FsBuilder).
//!
//! ```no_run
//! # #[cfg(feature = "rt-tokio")]
//! # {
//! use aws_smithy_http::byte_stream::ByteStream;
//! use std::path::Path;
//! struct GetObjectInput {
//!     body: ByteStream
//! }
//!
//! async fn bytestream_from_file() -> GetObjectInput {
//!     let bytestream = ByteStream::read_from().path("docs/some-large-file.csv")
//!         .buffer_size(32_784)
//!         .file_size(123_456)
//!         .build()
//!         .await
//!         .expect("valid path");
//!     GetObjectInput { body: bytestream }
//! }
//! # }
//! ```

use crate::body::SdkBody;
use bytes::Buf;
@@ -111,6 +135,9 @@ use std::task::{Context, Poll};
#[cfg(feature = "rt-tokio")]
mod bytestream_util;

#[cfg(feature = "rt-tokio")]
pub use self::bytestream_util::FsBuilder;

/// Stream of binary data
///
/// `ByteStream` wraps a stream of binary data for ease of use.
@@ -239,6 +266,33 @@ impl ByteStream {
        self.0.collect().await.map_err(|err| Error(err))
    }

    /// Starts building a `ByteStream` that is backed by a file or a path.
    ///
    /// The returned [`FsBuilder`](crate::byte_stream::FsBuilder) exposes the options that the simpler
    /// constructors do not, e.g. the length of the file and the size of the buffer used to read it.
    ///
    /// # Examples
    /// ```no_run
    /// # #[cfg(feature = "rt-tokio")]
    /// # {
    /// use aws_smithy_http::byte_stream::ByteStream;
    ///
    /// async fn bytestream_from_file() -> ByteStream {
    ///     ByteStream::read_from()
    ///         .path("docs/some-large-file.csv")
    ///         // Size of the read buffer, in bytes (4096 when left unset)
    ///         .buffer_size(32_784)
    ///         // Length of the file, in bytes (reported as the size hint of the body)
    ///         .file_size(123_456)
    ///         .build()
    ///         .await
    ///         .expect("valid path")
    /// }
    /// # }
    /// ```
    #[cfg(feature = "rt-tokio")]
    #[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio")))]
    pub fn read_from() -> FsBuilder {
        // `FsBuilder`'s `Default` impl forwards to `FsBuilder::new()`.
        FsBuilder::default()
    }

    /// Create a ByteStream that streams data from the filesystem
    ///
    /// This function creates a retryable ByteStream for a given `path`. The returned ByteStream
@@ -251,6 +305,10 @@ impl ByteStream {
    ///
    /// Furthermore, a partial write MAY seek in the file and resume from the previous location.
    ///
    /// Note: If you want more control, such as specifying the size of the buffer used to read the file
    /// or the length of the file, use an [`FsBuilder`](crate::byte_stream::FsBuilder) as returned
    /// from [`ByteStream::read_from`](ByteStream::read_from).
    ///
    /// # Examples
    /// ```no_run
    /// use aws_smithy_http::byte_stream::ByteStream;
@@ -262,36 +320,21 @@ impl ByteStream {
    #[cfg(feature = "rt-tokio")]
    #[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio")))]
    pub async fn from_path(path: impl AsRef<std::path::Path>) -> Result<Self, Error> {
        let path = path.as_ref();
        // Owned copy of the path, moved into the loader closure below.
        let path_buf = path.to_path_buf();
        // Look up the file length from filesystem metadata; an I/O failure is wrapped in `Error`.
        let sz = tokio::fs::metadata(path)
            .await
            .map_err(|err| Error(err.into()))?
            .len();
        // Every call of the loader wraps a new `PathBody` for the same path and length.
        let body_loader = move || {
            SdkBody::from_dyn(http_body::combinators::BoxBody::new(
                bytestream_util::PathBody::from_path(path_buf.as_path(), sz),
            ))
        };
        Ok(ByteStream::new(SdkBody::retryable(body_loader)))
        // Builder-based form: same path, every other `FsBuilder` option left at its default.
        FsBuilder::new().path(path).build().await
    }

    /// Create a ByteStream from a file
    ///
    /// NOTE: This will NOT result in a retryable ByteStream. For a ByteStream that can be retried in the case of
    /// upstream failures, use [`ByteStream::from_path`](ByteStream::from_path)
    ///
    /// # Errors
    ///
    /// Returns an `Error` if the metadata of the file cannot be read.
    #[deprecated(
        since = "0.40.0",
        note = "Prefer the more extensible ByteStream::read_from() API"
    )]
    #[cfg(feature = "rt-tokio")]
    #[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio")))]
    pub async fn from_file(file: tokio::fs::File) -> Result<Self, Error> {
        // Look up the file length from the metadata of the open handle.
        let sz = file
            .metadata()
            .await
            .map_err(|err| Error(err.into()))?
            .len();
        // The body is built once around the given handle (no loader closure is involved).
        let body = SdkBody::from_dyn(http_body::combinators::BoxBody::new(
            bytestream_util::PathBody::from_file(file, sz),
        ));
        Ok(ByteStream::new(body))
        // Builder-based form: same file, every other `FsBuilder` option left at its default.
        FsBuilder::new().file(file).build().await
    }
}

@@ -535,4 +578,47 @@ mod tests {

        Ok(())
    }

    /// Checks that a `ByteStream` created through `ByteStream::read_from()` honours the
    /// buffer size and the file size given to the builder.
    #[cfg(feature = "rt-tokio")]
    #[tokio::test]
    async fn path_based_bytestreams_with_builder() -> Result<(), Box<dyn std::error::Error>> {
        use super::ByteStream;
        use bytes::Buf;
        use http_body::Body;
        use std::io::Write;
        use tempfile::NamedTempFile;

        const READ_BUFFER_SIZE: usize = 16384;
        // Deliberately NOT the real length of the file - one shouldn't do this in real code
        const DECLARED_FILE_SIZE: u64 = 200;
        // Total number of bytes written to the temporary file below
        const BYTES_ON_DISK: usize = 298890;

        let mut tmp = NamedTempFile::new()?;
        (0..10000).try_for_each(|i| writeln!(tmp, "Brian was here. Briefly. {}", i))?;

        let sdk_body = ByteStream::read_from()
            .path(&tmp)
            .file_size(DECLARED_FILE_SIZE)
            .buffer_size(READ_BUFFER_SIZE)
            .build()
            .await?
            .into_inner();

        // the size hint must report the declared length rather than the one on disk
        assert_eq!(sdk_body.size_hint().exact(), Some(DECLARED_FILE_SIZE));

        let mut cloned = sdk_body
            .try_clone()
            .expect("retryable bodies are cloneable");
        // pull a single chunk out of the clone
        let first_chunk = cloned
            .data()
            .await
            .expect("should have some data")
            .expect("read should not fail");
        // one read is expected to fill the configured buffer exactly
        assert_eq!(first_chunk.len(), READ_BUFFER_SIZE);

        // whatever is left in the clone is the rest of the file
        let rest = ByteStream::new(cloned).collect().await?;
        assert_eq!(rest.remaining(), BYTES_ON_DISK - first_chunk.len());

        Ok(())
    }
}
+157 −12
Original line number Diff line number Diff line
@@ -8,35 +8,179 @@ use futures_core::{ready, Stream};
use http::HeaderMap;
use http_body::{Body, SizeHint};
use std::future::Future;
use std::path::{Path, PathBuf};
use std::path::PathBuf;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::fs::File;
use tokio::io;
use tokio_util::io::ReaderStream;

use crate::body::SdkBody;

use super::{ByteStream, Error};

// 4KB corresponds to the default buffer size used by Tokio's ReaderStream
const DEFAULT_BUFFER_SIZE: usize = 4096;

/// An HTTP Body designed to wrap files
///
/// PathBody is a three-phase HTTP body (its state is `Unloaded`, `Loading` or `Loaded`) designed to wrap
/// files with three specific features:
/// 1. The underlying file is wrapped with a `ReaderStream` to implement HTTP body
/// 2. It can be constructed directly from a path so it's easy to use during retries
/// 3. It provides a size hint
pub struct PathBody {
struct PathBody {
    // Current phase: a path that is not opened yet, a pending open, or a stream over the open file.
    state: State,
    // Length reported by `size_hint`; a length of zero makes `is_end_stream` return true.
    len: u64,
    file_size: u64,
    // Capacity handed to `ReaderStream::with_capacity` when the file is wrapped.
    buffer_size: usize,
}

impl PathBody {
    /// Creates a body in the `Unloaded` state: only the path is stored at this point.
    pub fn from_path(path: &Path, len: u64) -> Self {
    fn from_path(path_buf: PathBuf, file_size: u64, buffer_size: usize) -> Self {
        PathBody {
            state: State::Unloaded(path.to_path_buf()),
            len,
            state: State::Unloaded(path_buf),
            file_size,
            buffer_size,
        }
    }
    /// Creates a body in the `Loaded` state by wrapping the already opened `file` in a `ReaderStream`.
    pub fn from_file(file: File, len: u64) -> Self {
    fn from_file(file: File, file_size: u64, buffer_size: usize) -> Self {
        PathBody {
            state: State::Loaded(ReaderStream::new(file)),
            len,
            state: State::Loaded(ReaderStream::with_capacity(file, buffer_size)),
            file_size,
            buffer_size,
        }
    }
}

/// Builder for creating [`ByteStreams`](crate::byte_stream::ByteStream) from a file/path, with full control over advanced options.
///
/// Example usage:
/// ```no_run
/// # #[cfg(feature = "rt-tokio")]
/// # {
/// use aws_smithy_http::byte_stream::ByteStream;
/// struct GetObjectInput {
///     body: ByteStream
/// }
///
/// async fn bytestream_from_file() -> GetObjectInput {
///     let bytestream = ByteStream::read_from()
///         .path("docs/some-large-file.csv")
///         // Specify the size of the buffer used to read the file (in bytes, default is 4096)
///         .buffer_size(32_784)
///         // Specify the length of the file (in bytes)
///         .file_size(123_456)
///         .build()
///         .await
///         .expect("valid path");
///     GetObjectInput { body: bytestream }
/// }
/// # }
/// ```
#[derive(Debug)]
pub struct FsBuilder {
    /// Already opened file to read from, if one was provided.
    file: Option<tokio::fs::File>,
    /// Path to read from, if one was provided.
    path: Option<PathBuf>,
    /// Length of the file in bytes, when specified by the caller.
    file_size: Option<u64>,
    /// Size in bytes of the buffer used to read the file.
    buffer_size: usize,
}

impl Default for FsBuilder {
    fn default() -> Self {
        Self::new()
    }
}

impl FsBuilder {
    /// Create a new [`FsBuilder`] (using a default read buffer of 4096 bytes).
    ///
    /// You must then call either [`file`](FsBuilder::file) or [`path`](FsBuilder::path) to specify what to read from.
    pub fn new() -> Self {
        FsBuilder {
            file: None,
            path: None,
            file_size: None,
            buffer_size: DEFAULT_BUFFER_SIZE,
        }
    }

    /// Sets the path to read from.
    ///
    /// NOTE: The resulting ByteStream (after calling [build](FsBuilder::build)) will be retryable.
    /// The returned ByteStream will provide a size hint when used as an HTTP body.
    /// If the request fails, the read will begin again by reloading the file handle.
    pub fn path(mut self, path: impl AsRef<std::path::Path>) -> Self {
        self.path = Some(path.as_ref().to_path_buf());
        self
    }

    /// Sets the file to read from.
    ///
    /// NOTE: The resulting ByteStream (after calling [build](FsBuilder::build)) will not be a retryable ByteStream.
    /// For a ByteStream that can be retried in the case of upstream failures, use [`FsBuilder::path`](FsBuilder::path).
    pub fn file(mut self, file: tokio::fs::File) -> Self {
        self.file = Some(file);
        self
    }

    /// Specify the length of the file to read (in bytes).
    ///
    /// By pre-specifying the length of the file, this API skips an additional call to retrieve the size from file-system metadata.
    /// The given value is used as-is for the size hint of the resulting body.
    pub fn file_size(mut self, file_size: u64) -> Self {
        self.file_size = Some(file_size);
        self
    }

    /// Specify the size of the buffer used to read the file (in bytes).
    ///
    /// Increasing the read buffer capacity to higher values than the default (4096 bytes) can result in a large reduction
    /// in CPU usage, at the cost of memory increase.
    pub fn buffer_size(mut self, buffer_size: usize) -> Self {
        self.buffer_size = buffer_size;
        self
    }

    /// Returns a [`ByteStream`](crate::byte_stream::ByteStream) from this builder.
    ///
    /// NOTE: If both [`file`](FsBuilder::file) and [`path`](FsBuilder::path) have been called for this FsBuilder, `build` will
    /// read from the path specified by [`path`](FsBuilder::path).
    ///
    /// # Errors
    ///
    /// When no [`file_size`](FsBuilder::file_size) was specified, the size is read from the file-system
    /// metadata and an error is returned if that lookup fails. When a file size was specified, no
    /// metadata lookup is performed by `build`.
    ///
    /// # Panics
    ///
    /// Panics if neither of the `file` or `path` setters were called.
    pub async fn build(self) -> Result<ByteStream, Error> {
        let buffer_size = self.buffer_size;

        if let Some(path) = self.path {
            // `unwrap_or(..)` would evaluate (and await) the metadata lookup even when a size was
            // provided, so the fallback is only reached through the `None` arm.
            let file_size = match self.file_size {
                Some(file_size) => file_size,
                None => tokio::fs::metadata(&path)
                    .await
                    .map_err(|err| Error(err.into()))?
                    .len(),
            };

            // The loader hands out a new `PathBody` for the same path every time it is called,
            // hence the clone of the path on each invocation.
            let body_loader = move || {
                SdkBody::from_dyn(http_body::combinators::BoxBody::new(PathBody::from_path(
                    path.clone(),
                    file_size,
                    buffer_size,
                )))
            };
            Ok(ByteStream::new(SdkBody::retryable(body_loader)))
        } else if let Some(file) = self.file {
            // Same as above: only query the metadata when the caller did not provide a size.
            let file_size = match self.file_size {
                Some(file_size) => file_size,
                None => file
                    .metadata()
                    .await
                    .map_err(|err| Error(err.into()))?
                    .len(),
            };

            let body = SdkBody::from_dyn(http_body::combinators::BoxBody::new(
                PathBody::from_file(file, file_size, buffer_size),
            ));

            Ok(ByteStream::new(body))
        } else {
            panic!("FsBuilder constructed without a file or a path")
        }
    }
}
@@ -67,7 +211,8 @@ impl Body for PathBody {
                State::Loading(ref mut future) => {
                    match ready!(Pin::new(future).poll(cx)) {
                        Ok(file) => {
                            self.state = State::Loaded(ReaderStream::new(file));
                            self.state =
                                State::Loaded(ReaderStream::with_capacity(file, self.buffer_size));
                        }
                        Err(e) => return Poll::Ready(Some(Err(e.into()))),
                    };
@@ -92,10 +237,10 @@ impl Body for PathBody {

    /// Reports end-of-stream up front when the recorded file length is zero.
    fn is_end_stream(&self) -> bool {
        // fast path end-stream for empty files
        self.len == 0
        self.file_size == 0
    }

    /// Advertises the recorded file length as the exact size of the body.
    fn size_hint(&self) -> SizeHint {
        SizeHint::with_exact(self.len)
        SizeHint::with_exact(self.file_size)
    }
}