Unverified Commit 75d50dd1 authored by Zelda Hessler's avatar Zelda Hessler Committed by GitHub
Browse files

feature: conversion method for `ByteStream` into `AsyncRead` implementor (#1391)

* feature: conversion method for ByteStream into AsyncRead implementor
add: impl From<byte_stream::Error> for std::io::Error
add: CHANGELOG.next.toml entries

* fix: unused dep

* add: docs
parent 5620922d
Loading
Loading
Loading
Loading
+12 −0
Original line number Diff line number Diff line
@@ -16,3 +16,15 @@ message = "Add ability to sign a request with all headers, or to change which he
references = ["smithy-rs#1381"]
meta = { "breaking" = false, "tada" = true, "bug" = false }
author = "alonlud"

 [[aws-sdk-rust]]
 message = "Add method `ByteStream::into_async_read`. This makes it easy to convert `ByteStream`s into a struct implementing `tokio:io::AsyncRead`. Available on **crate feature** `rt-tokio` only."
 references = ["smithy-rs#1390"]
 meta = { "breaking" = false, "tada" = true, "bug" = false }
 author = "Velfi"

 [[smithy-rs]]
 message = "Add method `ByteStream::into_async_read`. This makes it easy to convert `ByteStream`s into a struct implementing `tokio:io::AsyncRead`. Available on **crate feature** `rt-tokio` only."
 references = ["smithy-rs#1390"]
 meta = { "breaking" = false, "tada" = true, "bug" = false }
 author = "Velfi"
+71 −3
Original line number Diff line number Diff line
@@ -149,6 +149,7 @@ pub use self::bytestream_util::FsBuilder;
///
/// `ByteStream` provides two primary mechanisms for accessing the data:
/// 1. With `.collect()`:
///
///     [`.collect()`](crate::byte_stream::ByteStream::collect) reads the complete ByteStream into memory and stores it in `AggregatedBytes`,
///     a non-contiguous ByteBuffer.
///     ```no_run
@@ -165,7 +166,7 @@ pub use self::bytestream_util::FsBuilder;
///     ```
/// 2. Via [`impl Stream`](futures_core::Stream):
///
///     _Note: An import of `StreamExt` is required to use `try_next()`._
///     _Note: An import of `StreamExt` is required to use `.try_next()`._
///
///     For use-cases where holding the entire ByteStream in memory is unnecessary, use the
///     `Stream` implementation:
@@ -193,6 +194,29 @@ pub use self::bytestream_util::FsBuilder;
///     }
///     ```
///
/// 3. Via [`.into_async_read()`](crate::byte_stream::ByteStream::into_async_read):
///
///     _Note: The `rt-tokio` feature must be active to use `.into_async_read()`._
///
///     It's possible to convert a `ByteStream` into a struct that implements [`tokio::io::AsyncRead`](tokio::io::AsyncRead).
///     Then, you can use pre-existing tools like [`tokio::io::BufReader`](tokio::io::BufReader):
///     ```no_run
///     use aws_smithy_http::byte_stream::ByteStream;
///     use aws_smithy_http::body::SdkBody;
///     use tokio::io::{AsyncBufReadExt, BufReader};
///     #[cfg(feature = "rt-tokio")]
///     async fn example() -> std::io::Result<()> {
///        let stream = ByteStream::new(SdkBody::from("hello!\nThis is some data"));
///        // Wrap the stream in a BufReader
///        let buf_reader = BufReader::new(stream.into_async_read());
///        let mut lines = buf_reader.lines();
///        assert_eq!(lines.next_line().await?, Some("hello!".to_owned()));
///        assert_eq!(lines.next_line().await?, Some("This is some data".to_owned()));
///        assert_eq!(lines.next_line().await?, None);
///        Ok(())
///     }
///     ```
///
/// ## Getting data into a ByteStream
/// ByteStreams can be created in one of three ways:
/// 1. **From in-memory binary data**: ByteStreams created from in-memory data are always retryable. Data
@@ -347,6 +371,27 @@ impl ByteStream {
        self.0.with_body_callback(body_callback);
        self
    }

    #[cfg(feature = "rt-tokio")]
    /// Convert this `ByteStream` into a struct that implements [`AsyncRead`](tokio::io::AsyncRead).
    ///
    /// # Example
    ///
    /// ```rust
    /// use tokio::io::{BufReader, AsyncBufReadExt};
    /// use aws_smithy_http::byte_stream::ByteStream;
    ///
    /// # async fn dox(my_bytestream: ByteStream) -> std::io::Result<()> {
    /// let mut lines =  BufReader::new(my_bytestream.into_async_read()).lines();
    /// while let Some(line) = lines.next_line().await? {
    ///   // Do something line by line
    /// }
    /// # Ok(())
    /// # }
    /// ```
    pub fn into_async_read(self) -> impl tokio::io::AsyncRead {
        tokio_util::io::StreamReader::new(self)
    }
}

impl Default for ByteStream {
@@ -403,6 +448,12 @@ impl StdError for Error {
    }
}

impl From<Error> for std::io::Error {
    fn from(err: Error) -> Self {
        std::io::Error::new(std::io::ErrorKind::Other, err)
    }
}

impl futures_core::stream::Stream for ByteStream {
    type Item = Result<Bytes, Error>;

@@ -597,4 +648,21 @@ mod tests {

        Ok(())
    }

    #[cfg(feature = "rt-tokio")]
    #[tokio::test]
    async fn bytestream_into_async_read() {
        use super::ByteStream;
        use tokio::io::AsyncBufReadExt;

        let byte_stream = ByteStream::from_static(b"data 1\ndata 2\ndata 3");
        let async_buf_read = tokio::io::BufReader::new(byte_stream.into_async_read());

        let mut lines = async_buf_read.lines();

        assert_eq!(lines.next_line().await.unwrap(), Some("data 1".to_owned()));
        assert_eq!(lines.next_line().await.unwrap(), Some("data 2".to_owned()));
        assert_eq!(lines.next_line().await.unwrap(), Some("data 3".to_owned()));
        assert_eq!(lines.next_line().await.unwrap(), None);
    }
}