Unverified Commit 83e34cd1 authored by Liam Perlaki's avatar Liam Perlaki Committed by GitHub
Browse files

upgrade hyper and http-body to 1.x (#134)



* upgrade hyper and http-body to 1.x

* add BoxBody variant

* discard non-data frames in Body Stream impl

---------

Co-authored-by: default avatarLiam Perlaki <liam@perlaki.org>
parent 3640ecfd
Loading
Loading
Loading
Loading
+5 −9
Original line number Diff line number Diff line
@@ -11,15 +11,11 @@ categories = ["web-programming", "web-programming::http-server"]

[dependencies]
async-trait = "0.1.73"
aws-sdk-s3 = "1.12.0"
aws-smithy-http = { version = "0.60.2", features = ["event-stream"] }
aws-smithy-runtime-api = { version = "1.1.2", features = ["client", "http-02x"] }
aws-smithy-types = { version = "1.1.2", features = ["http-body-0-4-x"] }
aws-smithy-types-convert = { version = "0.60.2", features = ["convert-time"] }
bytes = "1.4.0"
futures = { version = "0.3.28", default-features = false, features = ["std"] }
http-body = "0.4.5"
hyper = "0.14.27"
aws-sdk-s3 = "1.17.0"
aws-smithy-runtime-api = { version = "1.2.0", features = ["client", "http-1x"] }
aws-smithy-types = { version = "1.1.8", features = ["http-body-1-x"] }
aws-smithy-types-convert = { version = "0.60.8", features = ["convert-time"] }
hyper = "1.1.0"
s3s = { version = "0.10.0-dev", path = "../s3s" }
sync_wrapper = "0.1.2"
tracing = "0.1.37"
+2 −25
Original line number Diff line number Diff line
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;

use aws_smithy_types::body::SdkBody;

use futures::Stream;
use hyper::body::HttpBody;

pub fn s3s_body_into_sdk_body(body: s3s::Body) -> SdkBody {
    SdkBody::from_body_0_4(body.boxed())
    SdkBody::from_body_1_x(body)
}

pub fn sdk_body_into_s3s_body(body: SdkBody) -> s3s::Body {
    s3s::Body::from(Box::pin(Wrapper(body)) as s3s::stream::DynByteStream)
}

struct Wrapper(SdkBody);

impl Stream for Wrapper {
    type Item = Result<bytes::Bytes, s3s::StdError>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        hyper::body::HttpBody::poll_data(Pin::new(&mut self.0), cx)
    }
}

impl s3s::stream::ByteStream for Wrapper {
    fn remaining_length(&self) -> s3s::stream::RemainingLength {
        hyper::body::HttpBody::size_hint(&self.0).into()
    }
    s3s::Body::http_body(body)
}
+1 −1
Original line number Diff line number Diff line
@@ -56,7 +56,7 @@ impl HttpConnector for Connector {
}

fn convert_input(req: AwsHttpRequest) -> Result<Request<s3s::Body>, ConnectorError> {
    let mut req = req.try_into_http02x().map_err(on_err)?;
    let mut req = req.try_into_http1x().map_err(on_err)?;

    if req.headers().contains_key(HOST).not() {
        let host = auto_host_header(req.uri());
+16 −4
Original line number Diff line number Diff line
@@ -14,20 +14,29 @@ name = "s3s-fs"
required-features = ["binary"]

[features]
binary = ["tokio/full", "dep:clap", "dep:tracing-subscriber", "dep:hyper"]
binary = ["tokio/full", "dep:clap", "dep:tracing-subscriber", "dep:hyper-util"]

[dependencies]
async-trait = "0.1.73"
base64-simd = "0.8.0"
bytes = "1.4.0"
chrono = { version = "0.4.26", default-features = false, features = ["std", "clock"] }
chrono = { version = "0.4.26", default-features = false, features = [
    "std",
    "clock",
] }
clap = { version = "4.3.21", optional = true, features = ["derive"] }
crc32c = "0.6.4"
crc32fast = "1.3.2"
digest = "0.10.7"
futures = "0.3.28"
hex-simd = "0.8.0"
hyper = { version = "0.14.27", optional = true, features = ["http1", "http2", "server", "stream", "runtime"] }

hyper-util = { version = "0.1.3", optional = true, features = [
    "server",
    "http1",
    "http2",
    "tokio",
] }
md-5 = "0.10.5"
mime = "0.3.17"
nugine-rust-utils = "0.3.1"
@@ -43,7 +52,10 @@ tokio = { version = "1.31.0", features = ["fs", "io-util"] }
tokio-util = { version = "0.7.8", features = ["io"] }
tracing = "0.1.37"
tracing-error = "0.2.0"
tracing-subscriber = { version = "0.3.17", optional = true, features = ["env-filter", "time"] }
tracing-subscriber = { version = "0.3.17", optional = true, features = [
    "env-filter",
    "time",
] }
transform-stream = "0.3.0"
uuid = { version = "1.4.1", features = ["v4"] }

+30 −9
Original line number Diff line number Diff line
@@ -8,13 +8,16 @@ use s3s::auth::SimpleAuth;
use s3s::service::S3ServiceBuilder;

use std::io::IsTerminal;
use std::net::TcpListener;
use std::path::PathBuf;

use tokio::net::TcpListener;

use clap::{CommandFactory, Parser};
use hyper::server::Server;
use tracing::info;

use hyper_util::rt::{TokioExecutor, TokioIo};
use hyper_util::server::conn::auto::Builder as ConnBuilder;

#[derive(Debug, Parser)]
#[command(version)]
struct Opt {
@@ -108,18 +111,36 @@ async fn run(opt: Opt) -> Result {
    };

    // Run server
    let listener = TcpListener::bind((opt.host.as_str(), opt.port))?;
    let listener = TcpListener::bind((opt.host.as_str(), opt.port)).await?;
    let local_addr = listener.local_addr()?;

    let server = Server::from_tcp(listener)?.serve(service.into_shared().into_make_service());
    let hyper_service = service.into_shared();

    let connection = ConnBuilder::new(TokioExecutor::new());

    let server = async move {
        loop {
            let (socket, _) = match listener.accept().await {
                Ok(ok) => ok,
                Err(err) => {
                    tracing::error!("error accepting connection: {err}");
                    continue;
                }
            };
            let service = hyper_service.clone();
            let conn = connection.clone();
            tokio::spawn(async move {
                let _ = conn.serve_connection(TokioIo::new(socket), service).await;
            });
        }
    };

    let task = tokio::spawn(server);
    info!("server is running at http://{local_addr}");
    server.with_graceful_shutdown(shutdown_signal()).await?;

    tokio::signal::ctrl_c().await?;
    task.abort();

    info!("server is stopped");
    Ok(())
}

async fn shutdown_signal() {
    let _ = tokio::signal::ctrl_c().await;
}
Loading