Unverified Commit abd01157 authored by Russell Cohen's avatar Russell Cohen Committed by GitHub
Browse files

Update the S3 benchmark for correctness and performance (#2838)

## Motivation and Context
Gain a better understanding of S3 performance with the Rust SDK

## Description
Updates the S3 benchmark to:
1. Add verify step (untimed)
2. Improve performance by reading and writing from the disk concurrently
3. Add support for using multiple clients simultaneously
4. Fix correctness issues & simplify
5. Add timeouts on the parts

## Testing
Ran the benchmark locally and on a c5n.18xlarge


----

_By submitting this pull request, I confirm that you can use, modify,
copy, and redistribute this contribution, under the terms of your
choice._
parent 655684b6
Loading
Loading
Loading
Loading
+23 −8
Original line number Diff line number Diff line
@@ -17,6 +17,17 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "41ed9a86bf92ae6580e0a31281f65a1b1d867c0cc68d5346e2ae128dddfa6a7d"

[[package]]
name = "async-trait"
version = "0.1.70"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "79fa67157abdfd688a259b6648808757db9347af834624f27ec646da976aee5d"
dependencies = [
 "proc-macro2",
 "quote",
 "syn",
]

[[package]]
name = "autocfg"
version = "1.1.0"
@@ -404,11 +415,15 @@ dependencies = [
name = "benchmark"
version = "0.1.0"
dependencies = [
 "async-trait",
 "aws-config",
 "aws-sdk-s3",
 "aws-smithy-client",
 "aws-smithy-http",
 "clap",
 "hyper",
 "tokio",
 "tracing",
 "tracing-subscriber",
]

@@ -735,9 +750,9 @@ checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421"

[[package]]
name = "hyper"
version = "0.14.26"
version = "0.14.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab302d72a6f11a3b910431ff93aae7e773078c769f0a3ef15fb9ec692ed147d4"
checksum = "ffb1cfd654a8219eaef89881fdb3bb3b1cdc5fa75ded05d6933b2b382e395468"
dependencies = [
 "bytes",
 "futures-channel",
@@ -1005,18 +1020,18 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"

[[package]]
name = "proc-macro2"
version = "1.0.59"
version = "1.0.63"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6aeca18b86b413c660b781aa319e4e2648a3e6f9eadc9b47e9038e6fe9f3451b"
checksum = "7b368fba921b0dce7e60f5e04ec15e565b3303972b42bcfde1d0713b881959eb"
dependencies = [
 "unicode-ident",
]

[[package]]
name = "quote"
version = "1.0.28"
version = "1.0.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b9ab9c7eadfd8df19006f1cf1a4aed13540ed5cbc047010ece5826e10825488"
checksum = "573015e8ab27661678357f27dc26460738fd2b6c86e46f386fde94cb5d913105"
dependencies = [
 "proc-macro2",
]
@@ -1264,9 +1279,9 @@ checksum = "81cdd64d312baedb58e21336b31bc043b77e01cc99033ce76ef539f78e965ebc"

[[package]]
name = "syn"
version = "2.0.18"
version = "2.0.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32d41677bcbe24c20c52e7c70b0d8db04134c5d1066bf98662e2871ad200ea3e"
checksum = "59fb7d6d8281a51045d62b8eb3a7d1ce347b76f312af50cd3dc0af39c87c1737"
dependencies = [
 "proc-macro2",
 "quote",
+4 −0
Original line number Diff line number Diff line
@@ -12,6 +12,10 @@ publish = false
aws-config = "0.55.3"
aws-sdk-s3 = "0.28.0"
aws-smithy-http = "0.55.3"
aws-smithy-client= { version = "0.55.3", features = ["client-hyper"] }
clap = { version = "4.3.2", default-features = false, features = ["derive", "std", "help"] }
tokio = { version = "1.28.2", features = ["full"] }
tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
tracing = "0.1"
async-trait = "0.1.68"
hyper = { version = "0.14.27", features = ["client"] }
+49 −0
Original line number Diff line number Diff line
/*
 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 * SPDX-License-Identifier: Apache-2.0
 */

use crate::{verify, Args, BoxError};
use async_trait::async_trait;
use aws_config::SdkConfig;
use aws_sdk_s3::Client;
use std::path::{Path, PathBuf};

/// Outcome of one download benchmark run: the two files that the verify
/// step compares against each other.
pub(crate) struct GetTestResult {
    /// Path of the reference file the download is checked against.
    pub(crate) expected: PathBuf,
    /// Path of the file that the download step reported writing.
    pub(crate) actual: PathBuf,
}

/// A download ("get") benchmark scenario.
///
/// An implementation supplies [`GetBenchmark::prepare`] and
/// [`GetBenchmark::do_get`]; the provided [`GetBenchmark::do_bench`] and
/// [`GetBenchmark::verify`] methods wrap them with path bookkeeping and a
/// comparison of the downloaded file against the reference file.
#[async_trait]
pub(crate) trait GetBenchmark {
    /// State produced by [`GetBenchmark::prepare`] and passed by value into
    /// the download step. It must be `Send` because the generated futures are
    /// boxed as `Send` by `async_trait`.
    type Setup: Send;

    /// Builds the state a single run needs from the shared SDK configuration.
    async fn prepare(&self, conf: &SdkConfig) -> Self::Setup;

    /// Performs the download itself.
    ///
    /// Implementations are expected to write the object to `target_path` and
    /// return the path of the file holding the downloaded data.
    ///
    /// # Errors
    /// Returns any error raised while downloading or writing the file.
    async fn do_get(
        &self,
        state: Self::Setup,
        target_path: &Path,
        args: &Args,
    ) -> Result<PathBuf, BoxError>;

    /// Runs one download and pairs its output with the reference file.
    ///
    /// The download destination is `expected_path` with its extension
    /// replaced by `downloaded`, so the reference file is never overwritten.
    ///
    /// # Errors
    /// Propagates whatever [`GetBenchmark::do_get`] fails with.
    async fn do_bench(
        &self,
        state: Self::Setup,
        args: &Args,
        expected_path: &Path,
    ) -> Result<GetTestResult, BoxError> {
        let download_target = expected_path.with_extension("downloaded");
        let actual = self.do_get(state, &download_target, args).await?;
        let expected = expected_path.to_path_buf();
        Ok(GetTestResult { expected, actual })
    }

    /// Checks a finished run by diffing the downloaded file against the
    /// reference file.
    ///
    /// The default implementation ignores `_client` and `_args`; they are
    /// part of the signature for implementations that override this method.
    ///
    /// # Errors
    /// Returns the error reported by `verify::diff`.
    async fn verify(
        &self,
        _client: &Client,
        _args: &Args,
        result: GetTestResult,
    ) -> Result<(), BoxError> {
        let GetTestResult { expected, actual } = result;
        verify::diff(&actual, &expected).await
    }
}
+1 −1
Original line number Diff line number Diff line
@@ -6,7 +6,7 @@
use std::fmt;
use std::time;

const ONE_GIGABYTE: u64 = 1024 * 1024 * 1024;
const ONE_GIGABYTE: u64 = 1000 * 1000 * 1000;

#[derive(Debug)]
pub struct Latencies {
+120 −35
Original line number Diff line number Diff line
@@ -4,9 +4,11 @@
 */

use crate::latencies::Latencies;
use crate::multipart_get::get_object_multipart;
use crate::multipart_put::put_object_multipart;
use crate::multipart_put::{put_object_multipart, PutObjectMultipart};
use async_trait::async_trait;
use aws_config::SdkConfig;
use aws_sdk_s3 as s3;
use aws_sdk_s3::Client;
use clap::Parser as _;
use s3::error::DisplayErrorContext;
use s3::primitives::ByteStream;
@@ -14,16 +16,24 @@ use std::error::Error as StdError;
use std::path::Path;
use std::path::PathBuf;
use std::process;
use std::process::{Command, Stdio};
use std::time;

mod get_test;
mod latencies;
mod multipart_get;
mod multipart_put;
mod put_test;
mod verify;

pub type BoxError = Box<dyn StdError + Send + Sync>;

pub const BENCH_KEY: &str = "s3_bench_file";

use crate::get_test::GetBenchmark;
use crate::put_test::PutBenchmark;
use tracing::Instrument;

#[derive(Copy, Clone, Debug, clap::ValueEnum)]
pub enum Fs {
    #[cfg(target_os = "linux")]
@@ -41,7 +51,7 @@ pub enum Bench {
    GetObjectMultipart,
}

#[derive(Debug, clap::Parser)]
#[derive(Debug, Clone, clap::Parser)]
#[command()]
pub struct Args {
    /// Which benchmark to run.
@@ -79,6 +89,9 @@ pub struct Args {
    /// Number of concurrent uploads/downloads to perform.
    #[arg(long, default_value_t = 4)]
    concurrency: usize,

    #[arg(long, default_value_t = 1000)]
    part_upload_timeout_millis: u64,
}

#[tokio::main]
@@ -94,13 +107,12 @@ async fn main() {
        }
        loader.load().await
    };
    let client = s3::Client::new(&config);

    let result = match args.bench {
        Bench::PutObject => benchmark_put_object(&client, &args).await,
        Bench::GetObject => benchmark_get_object(&client, &args).await,
        Bench::PutObjectMultipart => benchmark_put_object_multipart(&client, &args).await,
        Bench::GetObjectMultipart => benchmark_get_object_multipart(&client, &args).await,
        Bench::PutObject => benchmark_put_object(&config, &args).await,
        Bench::GetObject => benchmark_get_object(&config, &args).await,
        Bench::PutObjectMultipart => benchmark_put_object_multipart(&config, &args).await,
        Bench::GetObjectMultipart => benchmark_get_object_multipart(&config, &args).await,
    };
    match result {
        Ok(latencies) => {
@@ -117,15 +129,31 @@ async fn main() {
}

macro_rules! benchmark {
    ($client:ident, $args:ident, setup => $setup:expr, operation => $operation:expr) => {{
    ($sdk_config:ident, $args:ident, setup => $setup:expr, operation => $operation:expr) => {{
        #[allow(unused)]
        use crate::get_test::GetBenchmark;
        #[allow(unused)]
        use crate::put_test::PutBenchmark;
        println!("setting up...");
        let test_file_path = generate_test_file($args)?;
        $setup($client, $args, &test_file_path).await?;
        let setup_client = aws_sdk_s3::Client::new(&$sdk_config);
        $setup(&setup_client, $args, &test_file_path).await?;
        println!("setup complete");

        let mut latencies = Latencies::new($args.size_bytes);
        for i in 0..$args.iterations {
            let span = tracing::info_span!("run operation");
            let bench = $operation;
            let client = bench.prepare($sdk_config).await;
            let start = time::Instant::now();
            $operation($client, $args, &test_file_path).await?;
            let result = bench
                .do_bench(client, $args, &test_file_path)
                .instrument(span)
                .await?;
            let latency = start.elapsed();
            if let Err(e) = bench.verify(&setup_client, $args, result).await {
                println!("benchmark did not finish correctly: {}", e);
            }
            latencies.push(latency);
            println!(
                "finished iteration {i} in {} seconds",
@@ -137,38 +165,79 @@ macro_rules! benchmark {
    }};
}

async fn benchmark_put_object(client: &s3::Client, args: &Args) -> Result<Latencies, BoxError> {
    benchmark!(client, args, setup => no_setup, operation => put_object)
async fn benchmark_put_object(conf: &SdkConfig, args: &Args) -> Result<Latencies, BoxError> {
    struct PutObject;
    #[async_trait]
    impl PutBenchmark for PutObject {
        type Setup = Client;

        async fn prepare(&self, conf: &SdkConfig) -> Self::Setup {
            Client::new(conf)
        }

async fn benchmark_get_object(client: &s3::Client, args: &Args) -> Result<Latencies, BoxError> {
    async fn operation(client: &s3::Client, args: &Args, path: &Path) -> Result<(), BoxError> {
        let output = client
        async fn do_put(
            &self,
            state: Self::Setup,
            target_key: &str,
            local_file: &Path,
            args: &Args,
        ) -> Result<(), BoxError> {
            state
                .put_object()
                .bucket(&args.bucket)
                .key(target_key)
                .body(ByteStream::from_path(local_file).await?)
                .send()
                .await?;
            Ok(())
        }
    }
    benchmark!(conf, args, setup => no_setup, operation => PutObject)
}

async fn benchmark_get_object(client: &SdkConfig, args: &Args) -> Result<Latencies, BoxError> {
    struct GetObject;
    #[async_trait]
    impl GetBenchmark for GetObject {
        type Setup = Client;

        async fn prepare(&self, conf: &SdkConfig) -> Self::Setup {
            Client::new(&conf)
        }

        async fn do_get(
            &self,
            state: Self::Setup,
            target_path: &Path,
            args: &Args,
        ) -> Result<PathBuf, BoxError> {
            let output = state
                .get_object()
                .bucket(&args.bucket)
                .key(BENCH_KEY)
                .send()
                .await?;
            let mut body = output.body.into_async_read();
        let mut file = tokio::fs::File::create(path).await?;
            let mut file = tokio::fs::File::create(target_path).await?;
            tokio::io::copy(&mut body, &mut file).await?;
        Ok(())
            Ok(target_path.to_path_buf())
        }
    }
    benchmark!(client, args, setup => put_object_intelligent, operation => operation)
    benchmark!(client, args, setup => put_object_intelligent, operation => GetObject)
}

async fn benchmark_put_object_multipart(
    client: &s3::Client,
    conf: &SdkConfig,
    args: &Args,
) -> Result<Latencies, BoxError> {
    benchmark!(client, args, setup => no_setup, operation => put_object_multipart)
    benchmark!(conf, args, setup => no_setup, operation => PutObjectMultipart)
}

async fn benchmark_get_object_multipart(
    client: &s3::Client,
    config: &SdkConfig,
    args: &Args,
) -> Result<Latencies, BoxError> {
    benchmark!(client, args, setup => put_object_intelligent, operation => get_object_multipart)
    benchmark!(config, args, setup => put_object_intelligent, operation => multipart_get::GetObjectMultipart::new())
}

fn generate_test_file(args: &Args) -> Result<PathBuf, BoxError> {
@@ -183,11 +252,27 @@ fn generate_test_file(args: &Args) -> Result<PathBuf, BoxError> {
        }
    };

    process::Command::new("truncate")
        .arg("-s")
    let mut yes_process = Command::new("yes")
        .arg("01234567890abcdefghijklmnopqrstuvwxyz")
        .stdout(Stdio::piped())
        .spawn()?;

    let mut head_process = Command::new("head")
        .arg("-c")
        .arg(format!("{}", args.size_bytes))
        .arg(&path)
        .output()?;
        .stdin(yes_process.stdout.take().unwrap())
        .stdout(Stdio::piped())
        .spawn()?;

    let mut file = std::fs::File::create(&path)?;
    head_process.stdout.as_mut().unwrap();
    std::io::copy(&mut head_process.stdout.take().unwrap(), &mut file)?;

    let exit_status = head_process.wait()?;

    if !exit_status.success() {
        Err("failed to generate temp file")?
    }

    Ok(path)
}
@@ -202,7 +287,7 @@ async fn put_object_intelligent(
    path: &Path,
) -> Result<(), BoxError> {
    if args.size_bytes > args.part_size_bytes {
        put_object_multipart(client, args, path).await
        put_object_multipart(&[client.clone()], args, BENCH_KEY, path).await
    } else {
        put_object(client, args, path).await
    }
Loading