Unverified Commit b43e7c55 authored by Nugine's avatar Nugine
Browse files

s3s: http: fix set_event_stream_body

parent e4831a00
Loading
Loading
Loading
Loading
+2 −1
Original line number Diff line number Diff line
@@ -11,7 +11,8 @@ pub fn from_aws(src: AwsSelectObjectContentEventStream) -> s3s::dto::SelectObjec
    s3s::dto::SelectObjectContentEventStream::new(AsyncStream::new(|mut y| async move {
        loop {
            let recv = SyncFuture::new(src.get_mut().recv());
            match recv.await {
            let ans = recv.await;
            match ans {
                Ok(Some(ev)) => y.yield_(crate::conv::try_from_aws(ev)).await,
                Ok(None) => break,
                Err(err) => y.yield_err(wrap_sdk_error!(err)).await,
+3 −1
Original line number Diff line number Diff line
@@ -64,7 +64,9 @@ impl Stream for Wrapper {
    type Item = Result<Bytes, StdError>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        match ready!(Pin::new(&mut self.0).poll_next(cx)) {
        let item = ready!(Pin::new(&mut self.0).poll_next(cx));
        debug!(?item, "SelectObjectContentEventStream");
        match item {
            Some(ev) => Poll::Ready(Some(event_into_bytes(ev).map_err(|e| Box::new(e) as StdError))),
            None => Poll::Ready(None),
        }
+2 −0
Original line number Diff line number Diff line
@@ -107,6 +107,8 @@ pub fn set_stream_body(res: &mut Response, stream: StreamingBlob) {

pub fn set_event_stream_body(res: &mut Response, stream: SelectObjectContentEventStream) {
    *res.body_mut() = Body::from(stream.into_byte_stream());
    res.headers_mut()
        .insert(hyper::header::TRANSFER_ENCODING, HeaderValue::from_static("chunked"));
}

pub fn add_opt_metadata(res: &mut Response, metadata: Option<Metadata>) -> S3Result {