Unverified Commit e03c75c5 authored by John DiSanti's avatar John DiSanti Committed by GitHub
Browse files

Write an S3 throughput benchmark (#2747)

## Motivation and Context
This PR adds an S3 throughput benchmark for upload, download, multipart
upload across multiple threads, and multipart download across multiple
threads.

----

_By submitting this pull request, I confirm that you can use, modify,
copy, and redistribute this contribution, under the terms of your
choice._
parent 7ed51b21
Loading
Loading
Loading
Loading
+32 −0
Original line number Diff line number Diff line
S3 Benchmark
============

This directory contains an S3 benchmark that measures throughput when using the AWS Rust SDK to put and get objects to/from S3.
The `benchmark/` directory has the Rust benchmark code, and `infrastructure/` contains the CDK infrastructure to stand up
a `c5n.18xlarge` EC2 instance, compile/run the benchmark, and upload the results to S3.

Example of running the `get-object-multipart` benchmark in local dev:

```bash
cargo run -- --bench get-object-multipart --fs disk --part-size-bytes 5242880 --size-bytes 6000000 --bucket my-test-bucket --region us-west-2 --profile my-aws-credentials-profile
```

On Linux, the `--fs` option can be given either `disk` or `tmpfs` (for an in-memory filesystem), while on other OSes, only `disk` is available.

In addition to `get-object-multipart`, there are `put-object-multipart`, `put-object`, and `get-object`. All of these take the
same CLI arguments, although `--part-size-bytes` is unused by `put-object` and `get-object`.

To run the actual benchmark, it must be deployed via CDK from the `infrastructure/` directory:

```bash
npm install
npm run build
npx cdk bootstrap --profile my-aws-credentials-profile
npx cdk synthesize --profile my-aws-credentials-profile
npx cdk deploy --profile my-aws-credentials-profile
```

The `lib/infrastructure-stack.ts` file defines the actual CloudFormation stack that creates the EC2 instance.
This instance is configured to run the `assets/init_instance.sh` and `assets/run_benchmark.sh` scripts on start-up.
It's also configured for SSH access via a key pair named "S3BenchmarkKeyPair". This key pair has to be created manually
before deploying the CDK stack.
+1803 −0

File added.

Preview size limit exceeded, changes collapsed.

+17 −0
Original line number Diff line number Diff line
[package]
name = "benchmark"
version = "0.1.0"
authors = ["AWS Rust SDK Team <aws-sdk-rust@amazon.com>", "John DiSanti <jdisanti@amazon.com>"]
description = "S3 benchmark"
edition = "2021"
license = "Apache-2.0"
repository = "https://github.com/awslabs/smithy-rs"
publish = false

[dependencies]
aws-config = "0.55.3"
aws-sdk-s3 = "0.28.0"
aws-smithy-http = "0.55.3"
clap = { version = "4.3.2", default-features = false, features = ["derive", "std", "help"] }
tokio = { version = "1.28.2", features = ["full"] }
tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
+90 −0
Original line number Diff line number Diff line
/*
 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 * SPDX-License-Identifier: Apache-2.0
 */

use std::fmt;
use std::time;

/// Number of bytes in one gigabyte (2^30); used to convert object sizes for reporting.
const ONE_GIGABYTE: u64 = 1024 * 1024 * 1024;

/// Collects the latency of every benchmark iteration for a single object size.
#[derive(Debug)]
pub struct Latencies {
    /// Size in bytes of the object that each measured operation transferred.
    object_size_bytes: u64,
    /// Recorded latencies in seconds, in the order they were pushed.
    raw_values: Vec<f64>,
}

impl Latencies {
    /// Creates an empty collection for operations on an object of `object_size_bytes` bytes.
    pub fn new(object_size_bytes: u64) -> Self {
        let raw_values = Vec::new();
        Self {
            object_size_bytes,
            raw_values,
        }
    }

    /// Records one measurement, stored as fractional seconds.
    pub fn push(&mut self, value: time::Duration) {
        let seconds = value.as_secs_f64();
        self.raw_values.push(seconds);
    }

    /// Returns the mean of the squared deviations of `values` from `average`
    /// (i.e. the standard deviation squared).
    fn variance(values: &[f64], average: f64) -> f64 {
        let squared_deviations = values.iter().map(|sample| (sample - average).powi(2));
        let total: f64 = squared_deviations.sum();
        total / values.len() as f64
    }
}

/// Renders a multi-line report: the raw latencies, their average and variance, the
/// object size in gigabits, and the throughput derived from the average and from the
/// lowest latency.
///
/// The divisions are not guarded, so a collection without any samples produces
/// non-finite numbers in the report.
impl fmt::Display for Latencies {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let samples = &self.raw_values;

        // bytes -> gigabytes -> gigabits
        let object_size_gigabytes = self.object_size_bytes as f64 / ONE_GIGABYTE as f64;
        let object_size_gigabits = object_size_gigabytes * 8f64;

        let average_latency = samples.iter().sum::<f64>() / samples.len() as f64;
        let lowest_latency = samples.iter().copied().fold(f64::INFINITY, f64::min);
        // Value is in seconds squared; the printed label is kept as-is for output stability.
        let variance = Self::variance(samples, average_latency);

        let average_throughput = object_size_gigabits / average_latency;
        // The fastest iteration yields the highest throughput.
        let highest_throughput = object_size_gigabits / lowest_latency;

        writeln!(f, "Latency values (s): {:?}", samples)?;
        writeln!(f, "Average latency (s): {average_latency}")?;
        writeln!(f, "Latency variance (s): {variance}")?;
        writeln!(f, "Object size (Gigabits): {object_size_gigabits}")?;
        writeln!(f, "Average throughput (Gbps): {}", average_throughput)?;
        writeln!(
            f,
            "Highest average throughput (Gbps): {}",
            highest_throughput
        )
    }
}

#[cfg(test)]
mod tests {
    use super::{Latencies, ONE_GIGABYTE};

    /// Renders eight fixed latencies for a 30 GiB object and compares the report
    /// against its known-good text, line by line.
    #[test]
    fn test_display() {
        let samples = vec![
            33.261f64, 41.114, 33.014, 32.97, 34.138, 33.972, 33.001, 34.12,
        ];
        let latencies = Latencies {
            object_size_bytes: 30 * ONE_GIGABYTE,
            raw_values: samples,
        };

        let expected = [
            "Latency values (s): [33.261, 41.114, 33.014, 32.97, 34.138, 33.972, 33.001, 34.12]\n",
            "Average latency (s): 34.448750000000004\n",
            "Latency variance (s): 6.576178687499994\n",
            "Object size (Gigabits): 240\n",
            "Average throughput (Gbps): 6.966871076599295\n",
            "Highest average throughput (Gbps): 7.279344858962694\n",
        ]
        .concat();

        assert_eq!(expected, latencies.to_string());
    }
}
+220 −0
Original line number Diff line number Diff line
/*
 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 * SPDX-License-Identifier: Apache-2.0
 */

use crate::latencies::Latencies;
use crate::multipart_get::get_object_multipart;
use crate::multipart_put::put_object_multipart;
use aws_sdk_s3 as s3;
use clap::Parser as _;
use s3::error::DisplayErrorContext;
use s3::primitives::ByteStream;
use std::error::Error as StdError;
use std::path::Path;
use std::path::PathBuf;
use std::process;
use std::time;

mod latencies;
mod multipart_get;
mod multipart_put;

/// Boxed dynamic error that is `Send + Sync`; the error type returned by the
/// benchmark functions in this file.
pub type BoxError = Box<dyn StdError + Send + Sync>;

/// Name shared by the S3 object key and the local test file used by the benchmarks.
pub const BENCH_KEY: &str = "s3_bench_file";

/// Local filesystem on which the benchmark's test file is placed (the `--fs` option).
///
/// The variant descriptions are doc comments rather than plain comments so that
/// clap's `ValueEnum` derive can show them next to the possible values in `--help`.
#[derive(Copy, Clone, Debug, clap::ValueEnum)]
pub enum Fs {
    /// Use tmpfs
    #[cfg(target_os = "linux")]
    Tmpfs,
    /// Use the disk
    Disk,
}

// Selects which benchmark `main` dispatches to (the `--bench` option).
#[derive(Copy, Clone, Debug, clap::ValueEnum)]
pub enum Bench {
    // Runs `benchmark_put_object`.
    PutObject,
    // Runs `benchmark_get_object`.
    GetObject,
    // Runs `benchmark_put_object_multipart`.
    PutObjectMultipart,
    // Runs `benchmark_get_object_multipart`.
    GetObjectMultipart,
}

// Command-line arguments of the benchmark binary, parsed by clap in `main`.
// The `///` comments on the fields below are kept verbatim because they double as
// the help text of the corresponding options.
#[derive(Debug, clap::Parser)]
#[command()]
pub struct Args {
    /// Which benchmark to run.
    #[arg(long)]
    bench: Bench,

    /// Local FS type to use.
    #[arg(long)]
    fs: Fs,

    /// Size of the object to benchmark with.
    #[arg(long)]
    size_bytes: u64,

    /// S3 bucket to test against.
    #[arg(long)]
    bucket: String,

    /// AWS region to use. Defaults to us-east-1.
    #[arg(long, default_value = "us-east-1")]
    region: String,

    /// AWS credentials profile to use.
    #[arg(long)]
    profile: Option<String>,

    /// Part size for multipart benchmarks. Defaults to 8 MiB.
    // 8_388_608 = 8 * 1024 * 1024 bytes.
    #[arg(long, default_value_t = 8_388_608)]
    part_size_bytes: u64,

    /// Number of benchmark iterations to perform.
    #[arg(long, default_value_t = 8)]
    iterations: usize,

    /// Number of concurrent uploads/downloads to perform.
    // Not read anywhere in this file; presumably consumed by the multipart modules.
    #[arg(long, default_value_t = 4)]
    concurrency: usize,
}

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt::init();

    let args = Args::parse();
    let config = {
        let mut loader =
            aws_config::from_env().region(s3::config::Region::new(args.region.clone()));
        if let Some(profile) = args.profile.as_ref() {
            loader = loader.profile_name(profile);
        }
        loader.load().await
    };
    let client = s3::Client::new(&config);

    let result = match args.bench {
        Bench::PutObject => benchmark_put_object(&client, &args).await,
        Bench::GetObject => benchmark_get_object(&client, &args).await,
        Bench::PutObjectMultipart => benchmark_put_object_multipart(&client, &args).await,
        Bench::GetObjectMultipart => benchmark_get_object_multipart(&client, &args).await,
    };
    match result {
        Ok(latencies) => {
            println!("benchmark succeeded");
            println!("=============== {:?} Result ================", args.bench);
            println!("{latencies}");
            println!("==========================================================");
        }
        Err(err) => {
            println!("benchmark failed: {}", DisplayErrorContext(err.as_ref()));
            process::exit(1);
        }
    }
}

/// Expands to the shared body of every `benchmark_*` function.
///
/// The expansion creates the local test file, awaits `$setup` once, then awaits
/// `$operation` `$args.iterations` times. Only the `$operation` call is inside the
/// timed region. It evaluates to `Ok(Latencies)` and uses `?`, so it must be
/// invoked inside an `async fn` returning `Result<Latencies, BoxError>`.
///
/// Both `$setup` and `$operation` are called as `(client, args, &path)`.
macro_rules! benchmark {
    ($client:ident, $args:ident, setup => $setup:expr, operation => $operation:expr) => {{
        let path = generate_test_file($args)?;
        $setup($client, $args, &path).await?;

        let mut results = Latencies::new($args.size_bytes);
        for iteration in 0..$args.iterations {
            let started_at = time::Instant::now();
            $operation($client, $args, &path).await?;
            let elapsed = started_at.elapsed();

            results.push(elapsed);
            println!(
                "finished iteration {iteration} in {} seconds",
                elapsed.as_secs_f64()
            );
        }

        Ok(results)
    }};
}

/// Benchmarks `put_object`: no setup step, then repeated uploads of the generated
/// test file. Returns the collected latencies or the first error encountered.
async fn benchmark_put_object(client: &s3::Client, args: &Args) -> Result<Latencies, BoxError> {
    benchmark!(client, args, setup => no_setup, operation => put_object)
}

/// Benchmarks a plain `GetObject` download. The setup step uploads the test file
/// with `put_object_intelligent`; each iteration then fetches the object and
/// writes its body to the local test file path.
async fn benchmark_get_object(client: &s3::Client, args: &Args) -> Result<Latencies, BoxError> {
    /// Fetches `BENCH_KEY` from the configured bucket and copies the body into `path`.
    async fn download_to_file(
        client: &s3::Client,
        args: &Args,
        path: &Path,
    ) -> Result<(), BoxError> {
        let request = client.get_object().bucket(&args.bucket).key(BENCH_KEY);
        let response = request.send().await?;

        let mut reader = response.body.into_async_read();
        let mut destination = tokio::fs::File::create(path).await?;
        tokio::io::copy(&mut reader, &mut destination).await?;
        Ok(())
    }

    benchmark!(client, args, setup => put_object_intelligent, operation => download_to_file)
}

/// Benchmarks `put_object_multipart` (from the `multipart_put` module): no setup
/// step, then repeated multipart uploads of the generated test file.
async fn benchmark_put_object_multipart(
    client: &s3::Client,
    args: &Args,
) -> Result<Latencies, BoxError> {
    benchmark!(client, args, setup => no_setup, operation => put_object_multipart)
}

/// Benchmarks `get_object_multipart` (from the `multipart_get` module). The setup
/// step uploads the test file with `put_object_intelligent` so there is an object
/// to download in each iteration.
async fn benchmark_get_object_multipart(
    client: &s3::Client,
    args: &Args,
) -> Result<Latencies, BoxError> {
    benchmark!(client, args, setup => put_object_intelligent, operation => get_object_multipart)
}

fn generate_test_file(args: &Args) -> Result<PathBuf, BoxError> {
    let path = match args.fs {
        Fs::Disk => format!("/tmp/{BENCH_KEY}").into(),
        #[cfg(target_os = "linux")]
        Fs::Tmpfs => {
            if !PathBuf::from("/dev/shm").exists() {
                return Err("tmpfs not available on this machine".into());
            }
            format!("/dev/shm/{BENCH_KEY}").into()
        }
    };

    process::Command::new("truncate")
        .arg("-s")
        .arg(format!("{}", args.size_bytes))
        .arg(&path)
        .output()?;

    Ok(path)
}

/// Setup step that does nothing; passed to `benchmark!` by the benchmarks that
/// need no preparation. All arguments are ignored and it always returns `Ok(())`.
async fn no_setup(_client: &s3::Client, _args: &Args, _path: &Path) -> Result<(), BoxError> {
    Ok(())
}

/// Uploads the file at `path`, choosing the upload strategy from the object size:
/// a single `put_object` call when the object is no larger than one part, and
/// `put_object_multipart` when it is strictly larger.
async fn put_object_intelligent(
    client: &s3::Client,
    args: &Args,
    path: &Path,
) -> Result<(), BoxError> {
    let fits_in_one_part = args.size_bytes <= args.part_size_bytes;
    if fits_in_one_part {
        put_object(client, args, path).await
    } else {
        put_object_multipart(client, args, path).await
    }
}

/// Uploads the file at `path` to `args.bucket` under `BENCH_KEY` with a single
/// `PutObject` request whose body is a `ByteStream` created from the file path.
async fn put_object(client: &s3::Client, args: &Args, path: &Path) -> Result<(), BoxError> {
    let body = ByteStream::from_path(path).await?;

    let request = client
        .put_object()
        .bucket(&args.bucket)
        .key(BENCH_KEY)
        .body(body);
    request.send().await?;

    Ok(())
}
Loading