Commit 5c8df8ce authored by Nugine's avatar Nugine
Browse files

feat(s3s/http): support UnsyncBoxBody

parent 96fa0a2f
Loading
Loading
Loading
Loading
+31 −0
Original line number Diff line number Diff line
@@ -6,6 +6,7 @@ use crate::stream::RemainingLength;
use std::fmt;
use std::mem;
use std::pin::Pin;
use std::sync::Mutex;
use std::task::Context;
use std::task::Poll;

@@ -15,6 +16,7 @@ use futures::Stream;
use http_body::Frame;

type BoxBody = http_body_util::combinators::BoxBody<Bytes, StdError>;
type UnsyncBoxBody = http_body_util::combinators::UnsyncBoxBody<Bytes, StdError>;

pin_project_lite::pin_project! {
    #[derive(Default)]
@@ -41,6 +43,10 @@ pin_project_lite::pin_project! {
            #[pin]
            inner: BoxBody,
        },
        UnsyncBoxBody {
            #[pin]
            inner: Mutex<UnsyncBoxBody>,
        },
        DynStream {
            #[pin]
            inner: DynByteStream
@@ -84,6 +90,19 @@ impl Body {
            },
        }
    }

    #[must_use]
    pub fn http_body_unsync<B>(body: B) -> Self
    where
        B: http_body::Body<Data = Bytes> + Send + 'static,
        StdError: From<B::Error>,
    {
        Self {
            kind: Kind::UnsyncBoxBody {
                inner: Mutex::new(UnsyncBoxBody::new(http_body_util::BodyExt::map_err(body, From::from))),
            },
        }
    }
}

impl From<Bytes> for Body {
@@ -144,6 +163,11 @@ impl http_body::Body for Body {
                http_body::Body::poll_frame(inner, cx)
                //
            }
            KindProj::UnsyncBoxBody { inner } => {
                let mut inner = inner.lock().unwrap();
                http_body::Body::poll_frame(Pin::new(&mut *inner), cx)
                //
            }
            KindProj::DynStream { inner } => {
                Stream::poll_next(inner, cx).map_ok(Frame::data)
                //
@@ -157,6 +181,7 @@ impl http_body::Body for Body {
            Kind::Once { inner } => inner.is_empty(),
            Kind::Hyper { inner } => http_body::Body::is_end_stream(inner),
            Kind::BoxBody { inner } => http_body::Body::is_end_stream(inner),
            Kind::UnsyncBoxBody { inner } => inner.lock().unwrap().is_end_stream(),
            Kind::DynStream { inner } => inner.remaining_length().exact() == Some(0),
        }
    }
@@ -167,6 +192,7 @@ impl http_body::Body for Body {
            Kind::Once { inner } => http_body::SizeHint::with_exact(inner.len() as u64),
            Kind::Hyper { inner } => http_body::Body::size_hint(inner),
            Kind::BoxBody { inner } => http_body::Body::size_hint(inner),
            Kind::UnsyncBoxBody { inner } => inner.lock().unwrap().size_hint(),
            Kind::DynStream { inner } => inner.remaining_length().into(),
        }
    }
@@ -195,6 +221,7 @@ impl ByteStream for Body {
            Kind::Once { inner } => RemainingLength::new_exact(inner.len()),
            Kind::Hyper { inner } => http_body::Body::size_hint(inner).into(),
            Kind::BoxBody { inner } => http_body::Body::size_hint(inner).into(),
            Kind::UnsyncBoxBody { inner } => http_body::Body::size_hint(&*inner.lock().unwrap()).into(),
            Kind::DynStream { inner } => inner.remaining_length(),
        }
    }
@@ -215,6 +242,10 @@ impl fmt::Debug for Body {
                d.field("body", &"{..}");
                d.field("remaining_length", &http_body::Body::size_hint(inner));
            }
            Kind::UnsyncBoxBody { inner } => {
                d.field("body", &"{..}");
                d.field("remaining_length", &http_body::Body::size_hint(&*inner.lock().unwrap()));
            }
            Kind::DynStream { inner } => {
                d.field("dyn_stream", &"{..}");
                d.field("remaining_length", &inner.remaining_length());