Unverified Commit b089118a authored by Liam Perlaki's avatar Liam Perlaki Committed by GitHub
Browse files

use GracefulShutdown util (#152)

parent 030c22cf
Loading
Loading
Loading
Loading
+3 −2
Original line number Diff line number Diff line
@@ -28,8 +28,9 @@ clap = { version = "4.3.21", optional = true, features = ["derive"] }
crc32c = "0.6.4"
futures = "0.3.28"
hex-simd = "0.8.0"
hyper-util = { version = "0.1.3", optional = true, features = [
    "server",
hyper-util = { version = "0.1.5", optional = true, features = [
    "server-auto",
    "server-graceful",
    "http1",
    "http2",
    "tokio",
+35 −21
Original line number Diff line number Diff line
@@ -116,30 +116,44 @@ async fn run(opt: Opt) -> Result {

    let hyper_service = service.into_shared();

    let connection = ConnBuilder::new(TokioExecutor::new());
    let http_server = ConnBuilder::new(TokioExecutor::new());
    let graceful = hyper_util::server::graceful::GracefulShutdown::new();

    let mut ctrl_c = std::pin::pin!(tokio::signal::ctrl_c());

    info!("server is running at http://{local_addr}");

    let server = async move {
    loop {
            let (socket, _) = match listener.accept().await {
                Ok(ok) => ok,
        let (socket, _) = tokio::select! {
            res =  listener.accept() => {
                match res {
                    Ok(conn) => conn,
                    Err(err) => {
                        tracing::error!("error accepting connection: {err}");
                        continue;
                    }
                }
            }
            _ = ctrl_c.as_mut() => {
                break;
            }
        };
            let service = hyper_service.clone();
            let conn = connection.clone();

        let conn = http_server.serve_connection(TokioIo::new(socket), hyper_service.clone());
        let conn = graceful.watch(conn.into_owned());
        tokio::spawn(async move {
                let _ = conn.serve_connection(TokioIo::new(socket), service).await;
            let _ = conn.await;
        });
    }
    };

    let task = tokio::spawn(server);
    info!("server is running at http://{local_addr}");

    tokio::signal::ctrl_c().await?;
    task.abort();
    tokio::select! {
        () = graceful.shutdown() => {
             tracing::debug!("Gracefully shutdown!");
        },
        () = tokio::time::sleep(std::time::Duration::from_secs(10)) => {
             tracing::debug!("Waited 10 seconds for graceful shutdown, aborting...");
        }
    }

    info!("server is stopped");
    Ok(())
+3 −2
Original line number Diff line number Diff line
@@ -13,8 +13,9 @@ aws-config = { version = "1.1.7", default-features = false, features = [
aws-credential-types = "1.1.7"
aws-sdk-s3 = "1.17.0"
clap = { version = "4.3.21", features = ["derive"] }
hyper-util = { version = "0.1.3", features = [
    "server",
hyper-util = { version = "0.1.5", features = [
    "server-auto",
    "server-graceful",
    "http1",
    "http2",
    "tokio",
+37 −21
Original line number Diff line number Diff line
@@ -77,31 +77,47 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {

    let hyper_service = service.into_shared();

    let connection = ConnBuilder::new(TokioExecutor::new());
    let http_server = ConnBuilder::new(TokioExecutor::new());
    let graceful = hyper_util::server::graceful::GracefulShutdown::new();

    let mut ctrl_c = std::pin::pin!(tokio::signal::ctrl_c());

    info!("server is running at http://{}:{}/", opt.host, opt.port);
    info!("server is forwarding requests to {}", opt.endpoint_url);

    let server = async move {
    loop {
            let (socket, _) = match listener.accept().await {
                Ok(ok) => ok,
        let (socket, _) = tokio::select! {
            res =  listener.accept() => {
                match res {
                    Ok(conn) => conn,
                    Err(err) => {
                        tracing::error!("error accepting connection: {err}");
                        continue;
                    }
                }
            }
            _ = ctrl_c.as_mut() => {
                break;
            }
        };
            let service = hyper_service.clone();
            let conn = connection.clone();

        let conn = http_server.serve_connection(TokioIo::new(socket), hyper_service.clone());
        let conn = graceful.watch(conn.into_owned());
        tokio::spawn(async move {
                let _ = conn.serve_connection(TokioIo::new(socket), service).await;
            let _ = conn.await;
        });
    }
    };

    info!("server is running at http://{}:{}/", opt.host, opt.port);
    info!("server is forwarding requests to {}", opt.endpoint_url);
    let task = tokio::spawn(server);
    tokio::select! {
        () = graceful.shutdown() => {
             tracing::debug!("Gracefully shutdown!");
        },
        () = tokio::time::sleep(std::time::Duration::from_secs(10)) => {
             tracing::debug!("Waited 10 seconds for graceful shutdown, aborting...");
        }
    }

    tokio::signal::ctrl_c().await?;
    task.abort();
    info!("server is stopped");

    Ok(())
}