Loading Cargo.lock +4 −0 Original line number Diff line number Diff line Loading @@ -2839,6 +2839,10 @@ dependencies = [ "aws-credential-types", "aws-sdk-s3", "aws-sdk-sts", "bytes", "futures", "http-body 1.0.1", "http-body-util", "s3s-test", "tracing", ] Loading crates/s3s-e2e/Cargo.toml +4 −0 Original line number Diff line number Diff line Loading @@ -19,6 +19,10 @@ tracing = "0.1.41" aws-credential-types = "1.2.6" aws-sdk-s3 = "1.105.0" aws-sdk-sts = { version = "1.76.0", features = ["behavior-version-latest"] } http-body-util = "0.1.3" futures = { version = "0.3.31", default-features = false } bytes = "1.10.1" http-body = "1.0.1" [dependencies.aws-config] version = "1.8.6" Loading crates/s3s-e2e/src/basic.rs +98 −0 Original line number Diff line number Diff line Loading @@ -9,6 +9,11 @@ use s3s_test::TestSuite; use s3s_test::tcx::TestContext; use aws_sdk_s3::primitives::ByteStream; use aws_sdk_s3::primitives::SdkBody; use bytes::Bytes; use futures::StreamExt as _; use http_body_util::StreamBody; pub fn register(tcx: &mut TestContext) { case!(tcx, Basic, Essential, test_list_buckets); Loading @@ -19,6 +24,7 @@ pub fn register(tcx: &mut TestContext) { case!(tcx, Basic, Put, test_put_object_tiny); case!(tcx, Basic, Put, test_put_object_with_metadata); case!(tcx, Basic, Put, test_put_object_larger); case!(tcx, Basic, Put, test_put_object_with_checksum_algorithm); case!(tcx, Basic, Copy, test_copy_object); } Loading Loading @@ -374,6 +380,61 @@ impl Put { Ok(()) } async fn test_put_object_with_checksum_algorithm(self: Arc<Self>) -> Result { use aws_sdk_s3::types::ChecksumAlgorithm; use aws_sdk_s3::types::ChecksumMode; let s3 = &self.s3; let bucket = self.bucket.as_str(); let key = "with-checksum-trailer"; let body = { let bytes = Bytes::from_static(&[b'a'; 1024]); let stream = futures::stream::repeat_with(move || { let frame = http_body::Frame::data(bytes.clone()); Ok::<_, std::io::Error>(frame) }); let body = WithSizeHint::new(StreamBody::new(stream.take(70)), 70 * 1024); ByteStream::new(SdkBody::from_body_1_x(body)) }; let put_resp = s3 .put_object() .bucket(bucket) .key(key) .checksum_algorithm(ChecksumAlgorithm::Crc32) .body(body) .send() .await?; let put_crc32 = put_resp .checksum_crc32() .expect("PUT should return checksum when checksum_algorithm is used"); let resp = s3 .get_object() .bucket(bucket) .key(key) .checksum_mode(ChecksumMode::Enabled) .send() .await?; let get_crc32 = resp .checksum_crc32() .expect("GET should return checksum when checksum_mode is enabled and full object is returned") .to_owned(); let body = resp.body.collect().await?; let body = String::from_utf8(body.to_vec())?; assert_eq!(body, "a".repeat(70 * 1024)); assert_eq!(get_crc32, put_crc32); Ok(()) } } // Copy test fixture Loading Loading @@ -463,3 +524,40 @@ impl Copy { Ok(()) } } struct WithSizeHint<T> { inner: T, size_hint: usize, } impl<T> WithSizeHint<T> { pub fn new(inner: T, size_hint: usize) -> Self { Self { inner, size_hint } } } impl<T> http_body::Body for WithSizeHint<T> where T: http_body::Body + Unpin, { type Data = T::Data; type Error = T::Error; fn poll_frame( self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll<Option<std::result::Result<http_body::Frame<Self::Data>, Self::Error>>> { let this = self.get_mut(); std::pin::Pin::new(&mut this.inner).poll_frame(cx) } fn is_end_stream(&self) -> bool { self.inner.is_end_stream() } fn size_hint(&self) -> http_body::SizeHint { let mut hint = self.inner.size_hint(); hint.set_exact(self.size_hint as u64); hint } } Loading
Cargo.lock +4 −0 Original line number Diff line number Diff line Loading @@ -2839,6 +2839,10 @@ dependencies = [ "aws-credential-types", "aws-sdk-s3", "aws-sdk-sts", "bytes", "futures", "http-body 1.0.1", "http-body-util", "s3s-test", "tracing", ] Loading
crates/s3s-e2e/Cargo.toml +4 −0 Original line number Diff line number Diff line Loading @@ -19,6 +19,10 @@ tracing = "0.1.41" aws-credential-types = "1.2.6" aws-sdk-s3 = "1.105.0" aws-sdk-sts = { version = "1.76.0", features = ["behavior-version-latest"] } http-body-util = "0.1.3" futures = { version = "0.3.31", default-features = false } bytes = "1.10.1" http-body = "1.0.1" [dependencies.aws-config] version = "1.8.6" Loading
crates/s3s-e2e/src/basic.rs +98 −0 Original line number Diff line number Diff line Loading @@ -9,6 +9,11 @@ use s3s_test::TestSuite; use s3s_test::tcx::TestContext; use aws_sdk_s3::primitives::ByteStream; use aws_sdk_s3::primitives::SdkBody; use bytes::Bytes; use futures::StreamExt as _; use http_body_util::StreamBody; pub fn register(tcx: &mut TestContext) { case!(tcx, Basic, Essential, test_list_buckets); Loading @@ -19,6 +24,7 @@ pub fn register(tcx: &mut TestContext) { case!(tcx, Basic, Put, test_put_object_tiny); case!(tcx, Basic, Put, test_put_object_with_metadata); case!(tcx, Basic, Put, test_put_object_larger); case!(tcx, Basic, Put, test_put_object_with_checksum_algorithm); case!(tcx, Basic, Copy, test_copy_object); } Loading Loading @@ -374,6 +380,61 @@ impl Put { Ok(()) } async fn test_put_object_with_checksum_algorithm(self: Arc<Self>) -> Result { use aws_sdk_s3::types::ChecksumAlgorithm; use aws_sdk_s3::types::ChecksumMode; let s3 = &self.s3; let bucket = self.bucket.as_str(); let key = "with-checksum-trailer"; let body = { let bytes = Bytes::from_static(&[b'a'; 1024]); let stream = futures::stream::repeat_with(move || { let frame = http_body::Frame::data(bytes.clone()); Ok::<_, std::io::Error>(frame) }); let body = WithSizeHint::new(StreamBody::new(stream.take(70)), 70 * 1024); ByteStream::new(SdkBody::from_body_1_x(body)) }; let put_resp = s3 .put_object() .bucket(bucket) .key(key) .checksum_algorithm(ChecksumAlgorithm::Crc32) .body(body) .send() .await?; let put_crc32 = put_resp .checksum_crc32() .expect("PUT should return checksum when checksum_algorithm is used"); let resp = s3 .get_object() .bucket(bucket) .key(key) .checksum_mode(ChecksumMode::Enabled) .send() .await?; let get_crc32 = resp .checksum_crc32() .expect("GET should return checksum when checksum_mode is enabled and full object is returned") .to_owned(); let body = resp.body.collect().await?; let body = String::from_utf8(body.to_vec())?; assert_eq!(body, "a".repeat(70 * 1024)); assert_eq!(get_crc32, put_crc32); Ok(()) } } // Copy test fixture Loading Loading @@ -463,3 +524,40 @@ impl Copy { Ok(()) } } struct WithSizeHint<T> { inner: T, size_hint: usize, } impl<T> WithSizeHint<T> { pub fn new(inner: T, size_hint: usize) -> Self { Self { inner, size_hint } } } impl<T> http_body::Body for WithSizeHint<T> where T: http_body::Body + Unpin, { type Data = T::Data; type Error = T::Error; fn poll_frame( self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll<Option<std::result::Result<http_body::Frame<Self::Data>, Self::Error>>> { let this = self.get_mut(); std::pin::Pin::new(&mut this.inner).poll_frame(cx) } fn is_end_stream(&self) -> bool { self.inner.is_end_stream() } fn size_hint(&self) -> http_body::SizeHint { let mut hint = self.inner.size_hint(); hint.set_exact(self.size_hint as u64); hint } }