From 3073a0a2d19c57755f2ab43abca65048d8d0ee76 Mon Sep 17 00:00:00 2001 From: Tanguy Le Barzic Date: Thu, 21 Apr 2022 16:37:17 +0200 Subject: [PATCH] Allow to specify a read buffer initial capacity when creating ByteStream from a file (#1238) * Allow to specify a read buffer initial capacity when creating ByteStream from a file The behaviour of the existing ByteStream::from_file / ByteStream::from_path is unchanged (using a default buffer capacity of 4k, which corresponds to Tokio's ReaderStream default buffer capacity). Using higher buffer sizes can result in a large reduction in CPU during S3 uploads, at the cost of memory increase. * Rename len to file_size This makes the distinction with the buffer size clearer * Use a builder to specify advanced options to create a ByteStream * Improved comments * Specify the unit to use for PathBodyBuilder.with_file_size * Improved comments following review * Rename PathBodyBuilder to FsBuilder * Renaming in FsBuilder - renames `with_buffer_size` to `buffer_size` - renames `with_file_size` to `file_size` - renames `byte_stream` to `build` * Make PathBody private * Updated API for FsBuilder * Document panic behavior of ByteStream::build * Document ByteStream::read_from * Move ByteStream::read_from No functional change * Update rust-runtime/aws-smithy-http/src/byte_stream.rs a -> an Co-authored-by: Russell Cohen Co-authored-by: Russell Cohen --- .../aws-smithy-http/src/byte_stream.rs | 128 ++++++++++--- .../src/byte_stream/bytestream_util.rs | 169 ++++++++++++++++-- 2 files changed, 264 insertions(+), 33 deletions(-) diff --git a/rust-runtime/aws-smithy-http/src/byte_stream.rs b/rust-runtime/aws-smithy-http/src/byte_stream.rs index d92937ba1..446c4731c 100644 --- a/rust-runtime/aws-smithy-http/src/byte_stream.rs +++ b/rust-runtime/aws-smithy-http/src/byte_stream.rs @@ -95,6 +95,30 @@ //! } //! # } //! ``` +//! +//! If you want more control over how the file is read, such as specifying the size of the buffer used to read the file +//! or the length of the file, use an [`FsBuilder`](crate::byte_stream::FsBuilder). +//! +//! ```no_run +//! # #[cfg(feature = "rt-tokio")] +//! # { +//! use aws_smithy_http::byte_stream::ByteStream; +//! use std::path::Path; +//! struct GetObjectInput { +//! body: ByteStream +//! } +//! +//! async fn bytestream_from_file() -> GetObjectInput { +//! let bytestream = ByteStream::read_from().path("docs/some-large-file.csv") +//! .buffer_size(32_784) +//! .file_size(123_456) +//! .build() +//! .await +//! .expect("valid path"); +//! GetObjectInput { body: bytestream } +//! } +//! # } +//! ``` use crate::body::SdkBody; use bytes::Buf; @@ -111,6 +135,9 @@ use std::task::{Context, Poll}; #[cfg(feature = "rt-tokio")] mod bytestream_util; +#[cfg(feature = "rt-tokio")] +pub use self::bytestream_util::FsBuilder; + /// Stream of binary data /// /// `ByteStream` wraps a stream of binary data for ease of use. @@ -239,6 +266,33 @@ impl ByteStream { self.0.collect().await.map_err(|err| Error(err)) } + /// Returns a [`FsBuilder`](crate::byte_stream::FsBuilder), allowing you to build a `ByteStream` with + /// full control over how the file is read (eg. specifying the length of the file or the size of the buffer used to read the file). + /// ```no_run + /// # #[cfg(feature = "rt-tokio")] + /// # { + /// use aws_smithy_http::byte_stream::ByteStream; + /// + /// async fn bytestream_from_file() -> ByteStream { + /// let bytestream = ByteStream::read_from() + /// .path("docs/some-large-file.csv") + /// // Specify the size of the buffer used to read the file (in bytes, default is 4096) + /// .buffer_size(32_784) + /// // Specify the length of the file used (skips an additional call to retrieve the size) + /// .file_size(123_456) + /// .build() + /// .await + /// .expect("valid path"); + /// bytestream + /// } + /// # } + /// ``` + #[cfg(feature = "rt-tokio")] + #[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio")))] + pub fn read_from() -> FsBuilder { + FsBuilder::new() + } + /// Create a ByteStream that streams data from the filesystem /// /// This function creates a retryable ByteStream for a given `path`. The returned ByteStream @@ -251,6 +305,10 @@ impl ByteStream { /// /// Furthermore, a partial write MAY seek in the file and resume from the previous location. /// + /// Note: If you want more control, such as specifying the size of the buffer used to read the file + /// or the length of the file, use a [`FsBuilder`](crate::byte_stream::FsBuilder) as returned + /// from `ByteStream::read_from` + /// /// # Examples /// ```no_run /// use aws_smithy_http::byte_stream::ByteStream; @@ -262,36 +320,21 @@ impl ByteStream { #[cfg(feature = "rt-tokio")] #[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio")))] pub async fn from_path(path: impl AsRef) -> Result { - let path = path.as_ref(); - let path_buf = path.to_path_buf(); - let sz = tokio::fs::metadata(path) - .await - .map_err(|err| Error(err.into()))? - .len(); - let body_loader = move || { - SdkBody::from_dyn(http_body::combinators::BoxBody::new( - bytestream_util::PathBody::from_path(path_buf.as_path(), sz), - )) - }; - Ok(ByteStream::new(SdkBody::retryable(body_loader))) + FsBuilder::new().path(path).build().await } /// Create a ByteStream from a file /// /// NOTE: This will NOT result in a retryable ByteStream. For a ByteStream that can be retried in the case of /// upstream failures, use [`ByteStream::from_path`](ByteStream::from_path) + #[deprecated( + since = "0.40.0", + note = "Prefer the more extensible ByteStream::read_from() API" + )] #[cfg(feature = "rt-tokio")] #[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio")))] pub async fn from_file(file: tokio::fs::File) -> Result { - let sz = file - .metadata() - .await - .map_err(|err| Error(err.into()))? - .len(); - let body = SdkBody::from_dyn(http_body::combinators::BoxBody::new( - bytestream_util::PathBody::from_file(file, sz), - )); - Ok(ByteStream::new(body)) + FsBuilder::new().file(file).build().await } } @@ -535,4 +578,47 @@ mod tests { Ok(()) } + + #[cfg(feature = "rt-tokio")] + #[tokio::test] + async fn path_based_bytestreams_with_builder() -> Result<(), Box> { + use super::ByteStream; + use bytes::Buf; + use http_body::Body; + use std::io::Write; + use tempfile::NamedTempFile; + let mut file = NamedTempFile::new()?; + + for i in 0..10000 { + writeln!(file, "Brian was here. Briefly. {}", i)?; + } + let body = ByteStream::read_from() + .path(&file) + .buffer_size(16384) + // This isn't the right file length - one shouldn't do this in real code + .file_size(200) + .build() + .await? + .into_inner(); + + // assert that the file length specified size is used as size hint + assert_eq!(body.size_hint().exact(), Some(200)); + + let mut body1 = body.try_clone().expect("retryable bodies are cloneable"); + // read a little bit from one of the clones + let some_data = body1 + .data() + .await + .expect("should have some data") + .expect("read should not fail"); + // The size of one read should be equal to that of the buffer size + assert_eq!(some_data.len(), 16384); + + assert_eq!( + ByteStream::new(body1).collect().await?.remaining(), + 298890 - some_data.len() + ); + + Ok(()) + } } diff --git a/rust-runtime/aws-smithy-http/src/byte_stream/bytestream_util.rs b/rust-runtime/aws-smithy-http/src/byte_stream/bytestream_util.rs index a1b304d94..a087ab928 100644 --- a/rust-runtime/aws-smithy-http/src/byte_stream/bytestream_util.rs +++ b/rust-runtime/aws-smithy-http/src/byte_stream/bytestream_util.rs @@ -8,35 +8,179 @@ use futures_core::{ready, Stream}; use http::HeaderMap; use http_body::{Body, SizeHint}; use std::future::Future; -use std::path::{Path, PathBuf}; +use std::path::PathBuf; use std::pin::Pin; use std::task::{Context, Poll}; use tokio::fs::File; use tokio::io; use tokio_util::io::ReaderStream; +use crate::body::SdkBody; + +use super::{ByteStream, Error}; + +// 4KB corresponds to the default buffer size used by Tokio's ReaderStream +const DEFAULT_BUFFER_SIZE: usize = 4096; + /// An HTTP Body designed to wrap files /// /// PathBody is a three-phase HTTP body designed to wrap files with three specific features: /// 1. The underlying file is wrapped with StreamReader to implement HTTP body /// 2. It can be constructed directly from a path so it's easy to use during retries /// 3. Provide size hint -pub struct PathBody { +struct PathBody { state: State, - len: u64, + file_size: u64, + buffer_size: usize, } impl PathBody { - pub fn from_path(path: &Path, len: u64) -> Self { + fn from_path(path_buf: PathBuf, file_size: u64, buffer_size: usize) -> Self { PathBody { - state: State::Unloaded(path.to_path_buf()), - len, + state: State::Unloaded(path_buf), + file_size, + buffer_size, } } - pub fn from_file(file: File, len: u64) -> Self { + fn from_file(file: File, file_size: u64, buffer_size: usize) -> Self { PathBody { - state: State::Loaded(ReaderStream::new(file)), - len, + state: State::Loaded(ReaderStream::with_capacity(file, buffer_size)), + file_size, + buffer_size, + } + } +} + +/// Builder for creating [`ByteStreams`](crate::byte_stream::ByteStream) from a file/path, with full control over advanced options. +/// +/// Example usage: +/// ```no_run +/// # #[cfg(feature = "rt-tokio")] +/// # { +/// use aws_smithy_http::byte_stream::ByteStream; +/// use std::path::Path; +/// struct GetObjectInput { +/// body: ByteStream +/// } +/// +/// async fn bytestream_from_file() -> GetObjectInput { +/// let bytestream = ByteStream::read_from() +/// .path("docs/some-large-file.csv") +/// // Specify the size of the buffer used to read the file (in bytes, default is 4096) +/// .buffer_size(32_784) +/// // Specify the length of the file used (skips an additional call to retrieve the size) +/// .file_size(123_456) +/// .build() +/// .await +/// .expect("valid path"); +/// GetObjectInput { body: bytestream } +/// } +/// # } +/// ``` +pub struct FsBuilder { + file: Option, + path: Option, + file_size: Option, + buffer_size: usize, +} + +impl Default for FsBuilder { + fn default() -> Self { + Self::new() + } +} + +impl FsBuilder { + /// Create a new [`FsBuilder`] (using a default read buffer of 4096 bytes). + /// + /// You must then call either [`file`](FsBuilder::file) or [`path`](FsBuilder::path) to specify what to read from. + pub fn new() -> Self { + FsBuilder { + file: None, + path: None, + file_size: None, + buffer_size: DEFAULT_BUFFER_SIZE, + } + } + + /// Sets the path to read from. + /// + /// NOTE: The resulting ByteStream (after calling [build](FsBuilder::build)) will be retryable. + /// The returned ByteStream will provide a size hint when used as an HTTP body. + /// If the request fails, the read will begin again by reloading the file handle. + pub fn path(mut self, path: impl AsRef) -> Self { + self.path = Some(path.as_ref().to_path_buf()); + self + } + + /// Sets the file to read from. + /// + /// NOTE: The resulting ByteStream (after calling [build](FsBuilder::build)) will not be a retryable ByteStream. + /// For a ByteStream that can be retried in the case of upstream failures, use [`FsBuilder::path`](FsBuilder::path). + pub fn file(mut self, file: tokio::fs::File) -> Self { + self.file = Some(file); + self + } + + /// Specify the length of the file to read (in bytes). + /// + /// By pre-specifying the length of the file, this API skips an additional call to retrieve the size from file-system metadata. + pub fn file_size(mut self, file_size: u64) -> Self { + self.file_size = Some(file_size); + self + } + + /// Specify the size of the buffer used to read the file (in bytes). + /// + /// Increasing the read buffer capacity to higher values than the default (4096 bytes) can result in a large reduction + /// in CPU usage, at the cost of memory increase. + pub fn buffer_size(mut self, buffer_size: usize) -> Self { + self.buffer_size = buffer_size; + self + } + + /// Returns a [`ByteStream`](crate::byte_stream::ByteStream) from this builder. + /// NOTE: If both [`file`](FsBuilder::file) and [`path`](FsBuilder::path) have been called for this FsBuilder, `build` will + /// read from the path specified by [`path`](FsBuilder::path). + /// + /// # Panics + /// + /// Panics if neither of the `file` or`path` setters were called. + pub async fn build(self) -> Result { + let buffer_size = self.buffer_size; + + if let Some(path) = self.path { + let path_buf = path.to_path_buf(); + let file_size = self.file_size.unwrap_or( + tokio::fs::metadata(path) + .await + .map_err(|err| Error(err.into()))? + .len(), + ); + + let body_loader = move || { + SdkBody::from_dyn(http_body::combinators::BoxBody::new(PathBody::from_path( + path_buf.clone(), + file_size, + buffer_size, + ))) + }; + Ok(ByteStream::new(SdkBody::retryable(body_loader))) + } else if let Some(file) = self.file { + let file_size = self.file_size.unwrap_or( + file.metadata() + .await + .map_err(|err| Error(err.into()))? + .len(), + ); + + let body = SdkBody::from_dyn(http_body::combinators::BoxBody::new( + PathBody::from_file(file, file_size, buffer_size), + )); + + Ok(ByteStream::new(body)) + } else { + panic!("FsBuilder constructed without a file or a path") } } } @@ -67,7 +211,8 @@ impl Body for PathBody { State::Loading(ref mut future) => { match ready!(Pin::new(future).poll(cx)) { Ok(file) => { - self.state = State::Loaded(ReaderStream::new(file)); + self.state = + State::Loaded(ReaderStream::with_capacity(file, self.buffer_size)); } Err(e) => return Poll::Ready(Some(Err(e.into()))), }; @@ -92,10 +237,10 @@ impl Body for PathBody { fn is_end_stream(&self) -> bool { // fast path end-stream for empty files - self.len == 0 + self.file_size == 0 } fn size_hint(&self) -> SizeHint { - SizeHint::with_exact(self.len) + SizeHint::with_exact(self.file_size) } } -- GitLab