From 7541fe7fc8d85c878ead28406f970dc8440ef81a Mon Sep 17 00:00:00 2001 From: Erlend Langseth <3rlendhl@gmail.com> Date: Tue, 2 Jan 2024 18:36:00 +0100 Subject: [PATCH] Stream implementation (wrapper) for PaginationStream (#3299) ## Motivation and Context https://github.com/awslabs/aws-sdk-rust/discussions/995 ## Description I tried to implement futures::Stream for a wrapper struct around `PaginationStream`. I am unsure if I did it in the best way. After fighting with the borrow checker for a while I decided to try `Arc>` - is this the way to go or does there exist a better way? Even then, does the code look correct? I used it in my project and my integration tests do pass but I am not 100% that these tests will catch any error in paginated ListObjectsV2. I would appreciate any feedback so far. ## Testing In progress while waiting for feedback on code ## Checklist - [x] I have updated `CHANGELOG.next.toml` if I made changes to the smithy-rs codegen or runtime crates _By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice._ --------- Co-authored-by: Russell Cohen Co-authored-by: Russell Cohen --- CHANGELOG.next.toml | 28 +++++++++ .../src/future/pagination_stream.rs | 7 +++ .../aws-smithy-types-convert/Cargo.toml | 3 + .../external-types.toml | 2 + .../aws-smithy-types-convert/src/lib.rs | 3 + .../aws-smithy-types-convert/src/stream.rs | 62 +++++++++++++++++++ 6 files changed, 105 insertions(+) create mode 100644 rust-runtime/aws-smithy-types-convert/src/stream.rs diff --git a/CHANGELOG.next.toml b/CHANGELOG.next.toml index b0d8f9316..8a50fded0 100644 --- a/CHANGELOG.next.toml +++ b/CHANGELOG.next.toml @@ -23,6 +23,34 @@ references = ["smithy-rs#3300", "aws-sdk-rust#977"] meta = { "breaking" = false, "tada" = true, "bug" = false } author = "rcoh" +[[smithy-rs]] +message = """ Add `PaginationStreamExt` extension trait to `aws-smithy-types-convert` behind the `convert-streams` feature. This makes it possible to treat a paginator as a [`futures_core::Stream`](https://docs.rs/futures-core/latest/futures_core/stream/trait.Stream.html), allowing customers to use stream combinators like [`map`](https://docs.rs/tokio-stream/latest/tokio_stream/trait.StreamExt.html#method.map) and [`filter`](https://docs.rs/tokio-stream/latest/tokio_stream/trait.StreamExt.html#method.filter). + +Example: + +```rust +use aws_smithy_types_convert::stream::PaginationStreamExt +let stream = s3_client.list_objects_v2().bucket("...").into_paginator().send().into_stream_03x(); +``` +""" +references = ["smithy-rs#3299"] +meta = { "breaking" = false, "tada" = false, "bug" = false, "target" = "client"} +author = "Ploppz" + +[[aws-sdk-rust]] +message = """ Add `PaginationStreamExt` extension trait to `aws-smithy-types-convert` behind the `convert-streams` feature. This makes it possible to treat a paginator as a [`futures_core::Stream`](https://docs.rs/futures-core/latest/futures_core/stream/trait.Stream.html), allowing customers to use stream combinators like [`map`](https://docs.rs/tokio-stream/latest/tokio_stream/trait.StreamExt.html#method.map) and [`filter`](https://docs.rs/tokio-stream/latest/tokio_stream/trait.StreamExt.html#method.filter). + +Example: + +```rust +use aws_smithy_types_convert::stream::PaginationStreamExt +let stream = s3_client.list_objects_v2().bucket("...").into_paginator().send().into_stream_03x(); +``` +""" +references = ["smithy-rs#3299"] +meta = { "breaking" = false, "tada" = false, "bug" = false } +author = "Ploppz" + [[smithy-rs]] message = "Serialize 0/false in query parameters, and ignore actual default value during serialization instead of just 0/false. See [changelog discussion](https://github.com/smithy-lang/smithy-rs/discussions/3312) for details." references = ["smithy-rs#3252", "smithy-rs#3312"] diff --git a/rust-runtime/aws-smithy-async/src/future/pagination_stream.rs b/rust-runtime/aws-smithy-async/src/future/pagination_stream.rs index a5454108a..d8a0a8999 100644 --- a/rust-runtime/aws-smithy-async/src/future/pagination_stream.rs +++ b/rust-runtime/aws-smithy-async/src/future/pagination_stream.rs @@ -8,6 +8,8 @@ use crate::future::pagination_stream::collect::sealed::Collectable; use std::future::Future; use std::pin::Pin; +use std::task::{Context, Poll}; + pub mod collect; pub mod fn_stream; use fn_stream::FnStream; @@ -60,6 +62,11 @@ impl PaginationStream { self.0.next().await } + /// Poll an item from the stream + pub fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.0).poll_next(cx) + } + /// Consumes this stream and gathers elements into a collection. pub async fn collect>(self) -> T { self.0.collect().await diff --git a/rust-runtime/aws-smithy-types-convert/Cargo.toml b/rust-runtime/aws-smithy-types-convert/Cargo.toml index 306747b62..3b9888999 100644 --- a/rust-runtime/aws-smithy-types-convert/Cargo.toml +++ b/rust-runtime/aws-smithy-types-convert/Cargo.toml @@ -10,11 +10,14 @@ repository = "https://github.com/smithy-lang/smithy-rs" [features] convert-chrono = ["aws-smithy-types", "chrono"] convert-time = ["aws-smithy-types", "time"] +convert-streams = ["aws-smithy-async", "futures-core"] [dependencies] aws-smithy-types = { path = "../aws-smithy-types", optional = true } +aws-smithy-async = {path = "../aws-smithy-async", optional = true} chrono = { version = "0.4.26", optional = true, default-features = false, features = ["std"] } time = { version = "0.3.4", optional = true } +futures-core = { version = "0.3.0", optional = true } [package.metadata.docs.rs] all-features = true diff --git a/rust-runtime/aws-smithy-types-convert/external-types.toml b/rust-runtime/aws-smithy-types-convert/external-types.toml index aca8b6962..ab9b0fe6a 100644 --- a/rust-runtime/aws-smithy-types-convert/external-types.toml +++ b/rust-runtime/aws-smithy-types-convert/external-types.toml @@ -4,4 +4,6 @@ allowed_external_types = [ "chrono::offset::fixed::FixedOffset", "chrono::offset::utc::Utc", "time::offset_date_time::OffsetDateTime", + "aws_smithy_async::future::pagination_stream::PaginationStream", + "futures_core::stream::Stream", ] diff --git a/rust-runtime/aws-smithy-types-convert/src/lib.rs b/rust-runtime/aws-smithy-types-convert/src/lib.rs index bd7ae8a78..1283c0f53 100644 --- a/rust-runtime/aws-smithy-types-convert/src/lib.rs +++ b/rust-runtime/aws-smithy-types-convert/src/lib.rs @@ -18,3 +18,6 @@ #[cfg(any(feature = "convert-time", feature = "convert-chrono"))] pub mod date_time; + +#[cfg(feature = "convert-streams")] +pub mod stream; diff --git a/rust-runtime/aws-smithy-types-convert/src/stream.rs b/rust-runtime/aws-smithy-types-convert/src/stream.rs new file mode 100644 index 000000000..3dbf5a789 --- /dev/null +++ b/rust-runtime/aws-smithy-types-convert/src/stream.rs @@ -0,0 +1,62 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +//! Conversions from Stream-like structs to implementors of `futures::Stream` + +use futures_core::Stream; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use aws_smithy_async::future::pagination_stream::PaginationStream; + +/// Stream implementor wrapping `PaginationStream` +pub struct PaginationStreamImplStream { + pagination_stream: PaginationStream, +} + +impl PaginationStreamImplStream { + /// Create a new Stream object wrapping a `PaginationStream` + pub fn new(pagination_stream: PaginationStream) -> Self { + PaginationStreamImplStream { pagination_stream } + } +} + +impl Stream for PaginationStreamImplStream { + type Item = Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.pagination_stream.poll_next(cx) + } +} + +/// Trait to convert PaginationStream into implementor of `Stream` +pub trait PaginationStreamExt { + /// Convert PaginationStream into implementor of `Stream` + /// + /// # Example + /// ```no_run + /// # use aws_smithy_async::future::pagination_stream::PaginationStream; + /// use aws_smithy_types_convert::stream::PaginationStreamExt; + /// // Assuming you have obtained a pagination stream, by something like: + /// // ``` + /// // let pagination_stream = s3_client + /// // .list_objects_v2() + /// // .bucket(bucket) + /// // .into_paginator() + /// // .send(); + /// // ``` + /// # let pagination_stream: PaginationStream = unimplemented!(); + /// let futures_stream = pagination_stream.into_stream_03x(); + /// ``` + fn into_stream_03x(self) -> PaginationStreamImplStream; +} + +impl PaginationStreamExt for PaginationStream { + fn into_stream_03x(self) -> PaginationStreamImplStream { + PaginationStreamImplStream { + pagination_stream: self, + } + } +} -- GitLab