Unverified Commit 7541fe7f authored by Erlend Langseth's avatar Erlend Langseth Committed by GitHub
Browse files

Stream implementation (wrapper) for PaginationStream (#3299)

## Motivation and Context
https://github.com/awslabs/aws-sdk-rust/discussions/995



## Description
I tried to implement futures::Stream for a wrapper struct around
`PaginationStream`. I am unsure if I did it in the best way. After
fighting with the borrow checker for a while I decided to try
`Arc<Mutex<_>>` - is this the way to go or does there exist a better
way?
Even then, does the code look correct? I used it in my project and my
integration tests do pass but I am not 100% that these tests will catch
any error in paginated ListObjectsV2.

I would appreciate any feedback so far.

## Testing
In progress while waiting for feedback on code

## Checklist
- [x] I have updated `CHANGELOG.next.toml` if I made changes to the
smithy-rs codegen or runtime crates


_By submitting this pull request, I confirm that you can use, modify,
copy, and redistribute this contribution, under the terms of your
choice._

---------

Co-authored-by: default avatarRussell Cohen <rcoh@amazon.com>
Co-authored-by: default avatarRussell Cohen <russell.r.cohen@gmail.com>
parent 3ea59928
Loading
Loading
Loading
Loading
+28 −0
Original line number Diff line number Diff line
@@ -23,6 +23,34 @@ references = ["smithy-rs#3300", "aws-sdk-rust#977"]
meta = { "breaking" = false, "tada" = true, "bug" = false }
author = "rcoh"

[[smithy-rs]]
message = """ Add `PaginationStreamExt` extension trait to `aws-smithy-types-convert` behind the `convert-streams` feature. This makes it possible to treat a paginator as a [`futures_core::Stream`](https://docs.rs/futures-core/latest/futures_core/stream/trait.Stream.html), allowing customers to use stream combinators like [`map`](https://docs.rs/tokio-stream/latest/tokio_stream/trait.StreamExt.html#method.map) and [`filter`](https://docs.rs/tokio-stream/latest/tokio_stream/trait.StreamExt.html#method.filter).

Example:

```rust
use aws_smithy_types_convert::stream::PaginationStreamExt
let stream = s3_client.list_objects_v2().bucket("...").into_paginator().send().into_stream_03x();
```
"""
references = ["smithy-rs#3299"]
meta = { "breaking" = false, "tada" = false, "bug" = false, "target" = "client"}
author = "Ploppz"

[[aws-sdk-rust]]
message = """ Add `PaginationStreamExt` extension trait to `aws-smithy-types-convert` behind the `convert-streams` feature. This makes it possible to treat a paginator as a [`futures_core::Stream`](https://docs.rs/futures-core/latest/futures_core/stream/trait.Stream.html), allowing customers to use stream combinators like [`map`](https://docs.rs/tokio-stream/latest/tokio_stream/trait.StreamExt.html#method.map) and [`filter`](https://docs.rs/tokio-stream/latest/tokio_stream/trait.StreamExt.html#method.filter).

Example:

```rust
use aws_smithy_types_convert::stream::PaginationStreamExt
let stream = s3_client.list_objects_v2().bucket("...").into_paginator().send().into_stream_03x();
```
"""
references = ["smithy-rs#3299"]
meta = { "breaking" = false, "tada" = false, "bug" = false }
author = "Ploppz"

[[smithy-rs]]
message = "Serialize 0/false in query parameters, and ignore actual default value during serialization instead of just 0/false. See [changelog discussion](https://github.com/smithy-lang/smithy-rs/discussions/3312) for details."
references = ["smithy-rs#3252", "smithy-rs#3312"]
+7 −0
Original line number Diff line number Diff line
@@ -8,6 +8,8 @@
use crate::future::pagination_stream::collect::sealed::Collectable;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

pub mod collect;
pub mod fn_stream;
use fn_stream::FnStream;
@@ -60,6 +62,11 @@ impl<Item> PaginationStream<Item> {
        self.0.next().await
    }

    /// Poll an item from the stream
    pub fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Item>> {
        Pin::new(&mut self.0).poll_next(cx)
    }

    /// Consumes this stream and gathers elements into a collection.
    pub async fn collect<T: Collectable<Item>>(self) -> T {
        self.0.collect().await
+3 −0
Original line number Diff line number Diff line
@@ -10,11 +10,14 @@ repository = "https://github.com/smithy-lang/smithy-rs"
[features]
convert-chrono = ["aws-smithy-types", "chrono"]
convert-time = ["aws-smithy-types", "time"]
convert-streams = ["aws-smithy-async", "futures-core"]

[dependencies]
aws-smithy-types = { path = "../aws-smithy-types", optional = true }
aws-smithy-async = {path = "../aws-smithy-async", optional = true}
chrono = { version = "0.4.26", optional = true, default-features = false, features = ["std"] }
time = { version = "0.3.4", optional = true }
futures-core = { version = "0.3.0", optional = true }

[package.metadata.docs.rs]
all-features = true
+2 −0
Original line number Diff line number Diff line
@@ -4,4 +4,6 @@ allowed_external_types = [
    "chrono::offset::fixed::FixedOffset",
    "chrono::offset::utc::Utc",
    "time::offset_date_time::OffsetDateTime",
    "aws_smithy_async::future::pagination_stream::PaginationStream",
    "futures_core::stream::Stream",
]
+3 −0
Original line number Diff line number Diff line
@@ -18,3 +18,6 @@

#[cfg(any(feature = "convert-time", feature = "convert-chrono"))]
pub mod date_time;

#[cfg(feature = "convert-streams")]
pub mod stream;
Loading