Unverified Commit 2c20f0b2 authored by Kefu Chai, committed by GitHub
Browse files

fix(s3s/http): prevent unbounded memory allocation in POST object (#370) (#390)



Add defense-in-depth limits to multipart/form-data parsing for POST object
to prevent DoS attacks via oversized uploads:

- MAX_FORM_FIELD_SIZE: 1 MB per field (matching MinIO)
- MAX_FORM_FIELDS_SIZE: 20 MB total for all form fields
- MAX_FORM_PARTS: 1000 maximum parts
- MAX_POST_OBJECT_FILE_SIZE: 5 GB for file content (matching S3 single PUT limit)

Why buffering instead of streaming:

A pure streaming solution using FileStream was initially attempted, but it
breaks compatibility with s3s-proxy and other consumers that use the AWS SDK.
The AWS SDK requires a known Content-Length to compute SHA-256 checksums for
SigV4 request signing. Since FileStream returns an unknown remaining length,
the SDK throws UnsizedRequestBody errors.

MinIO can accept streaming POST uploads directly because its HTTP server
parses multipart boundaries at the protocol level without needing size upfront.
However, when requests are proxied through s3s-proxy using the AWS SDK, the
SDK's cryptographic signing algorithm requires the body size before transmission.

The buffering approach with size limits provides:
1. Protection against unbounded memory allocation (DoS mitigation)
2. Compatibility with AWS SDK-based consumers (s3s-proxy, etc.)
3. Known content length for downstream request signing

Fixes #370

Signed-off-by: Kefu Chai <tchaikov@gmail.com>
parent fd19e678
Loading
Loading
Loading
Loading
+2 −1
Original line number Diff line number Diff line
@@ -609,7 +609,8 @@ fn codegen_op_http_de_multipart(op: &Operation, rust_types: &RustTypes) {
        "",
        "let vec_stream = req.s3ext.vec_stream.take().expect(\"missing vec stream\");",
        "",
        "let content_length = i64::try_from(vec_stream.exact_remaining_length()).map_err(|e|s3_error!(e, InvalidArgument, \"content-length overflow\"))?;",
        "let content_length = i64::try_from(vec_stream.exact_remaining_length())",
        "    .map_err(|e| s3_error!(e, InvalidArgument, \"content-length overflow\"))?;",
        "let content_length = (content_length != 0).then_some(content_length);",
        "",
        "let body: Option<StreamingBlob> = Some(StreamingBlob::new(vec_stream));",
+195 −2
Original line number Diff line number Diff line
@@ -16,6 +16,23 @@ use hyper::body::Bytes;
use memchr::memchr_iter;
use transform_stream::{AsyncTryStream, Yielder};

/// Maximum size per form field (1 MB)
/// This prevents `DoS` attacks via oversized individual fields
/// (the 1 MB cap matches MinIO's per-field limit)
const MAX_FORM_FIELD_SIZE: usize = 1024 * 1024;

/// Maximum total size for all form fields combined (20 MB)
/// This prevents `DoS` attacks via accumulation of many fields;
/// it also bounds the multipart parsing buffer itself
const MAX_FORM_FIELDS_SIZE: usize = 20 * 1024 * 1024;

/// Maximum number of parts in multipart form
/// This prevents `DoS` attacks via excessive part count
const MAX_FORM_PARTS: usize = 1000;

/// Maximum file size for POST object (5 GB - S3 limit for single PUT)
/// This prevents `DoS` attacks via oversized file uploads
/// Note: S3 has a 5GB limit for single PUT object, so this is a reasonable default
pub const MAX_POST_OBJECT_FILE_SIZE: u64 = 5 * 1024 * 1024 * 1024;

/// Form file
#[derive(Debug)]
pub struct File {
@@ -68,6 +85,32 @@ pub enum MultipartError {
    Underlying(StdError),
    #[error("MultipartError: InvalidFormat")]
    InvalidFormat,
    #[error("MultipartError: FieldTooLarge: field size {0} bytes exceeds limit of {1} bytes")]
    FieldTooLarge(usize, usize),
    #[error("MultipartError: TotalSizeTooLarge: total form fields size {0} bytes exceeds limit of {1} bytes")]
    TotalSizeTooLarge(usize, usize),
    #[error("MultipartError: TooManyParts: part count {0} exceeds limit of {1}")]
    TooManyParts(usize, usize),
    #[error("MultipartError: FileTooLarge: file size {0} bytes exceeds limit of {1} bytes")]
    FileTooLarge(u64, u64),
}

/// Aggregates a file stream into a `Vec<Bytes>` with a size limit.
///
/// Drains `stream` chunk by chunk, tracking the running byte total.
/// Returns [`MultipartError::FileTooLarge`] as soon as the total exceeds
/// `max_size`, or [`MultipartError::Underlying`] if the stream itself fails.
pub async fn aggregate_file_stream_limited(mut stream: FileStream, max_size: u64) -> Result<Vec<Bytes>, MultipartError> {
    use futures::stream::StreamExt;

    let mut chunks = Vec::new();
    let mut seen: u64 = 0;

    while let Some(next) = stream.next().await {
        let chunk = match next {
            Ok(bytes) => bytes,
            Err(e) => return Err(MultipartError::Underlying(Box::new(e))),
        };
        // saturating add: a hostile stream cannot overflow the counter
        seen = seen.saturating_add(chunk.len() as u64);
        if seen > max_size {
            return Err(MultipartError::FileTooLarge(seen, max_size));
        }
        chunks.push(chunk);
    }
    Ok(chunks)
}

/// transform multipart
@@ -90,17 +133,29 @@ where
    };

    let mut fields = Vec::new();
    let mut total_fields_size: usize = 0;
    let mut parts_count: usize = 0;

    loop {
        // copy bytes to buf
        match body.as_mut().next().await {
            None => return Err(MultipartError::InvalidFormat),
            Some(Err(e)) => return Err(MultipartError::Underlying(e)),
            Some(Ok(bytes)) => buf.extend_from_slice(&bytes),
            Some(Ok(bytes)) => {
                // Check if adding these bytes would exceed reasonable buffer size
                // We use MAX_FORM_FIELDS_SIZE as the limit for the parsing buffer
                if buf.len().saturating_add(bytes.len()) > MAX_FORM_FIELDS_SIZE {
                    return Err(MultipartError::TotalSizeTooLarge(
                        buf.len().saturating_add(bytes.len()),
                        MAX_FORM_FIELDS_SIZE,
                    ));
                }
                buf.extend_from_slice(&bytes);
            }
        }

        // try to parse
        match try_parse(body, pat, &buf, &mut fields, boundary) {
        match try_parse(body, pat, &buf, &mut fields, boundary, &mut total_fields_size, &mut parts_count) {
            Err((b, p)) => {
                body = b;
                pat = p;
@@ -118,6 +173,8 @@ fn try_parse<S>(
    buf: &'_ [u8],
    fields: &'_ mut Vec<(String, String)>,
    boundary: &'_ [u8],
    total_fields_size: &'_ mut usize,
    parts_count: &'_ mut usize,
) -> Result<Result<Multipart, MultipartError>, (Pin<Box<S>>, Box<[u8]>)>
where
    S: Stream<Item = Result<Bytes, StdError>> + Send + Sync + 'static,
@@ -126,6 +183,9 @@ where
    let pat_without_crlf = &pat[..pat.len().wrapping_sub(2)];

    fields.clear();
    // Reset counters since we're re-parsing from scratch
    *total_fields_size = 0;
    *parts_count = 0;

    let mut lines = CrlfLines { slice: buf };

@@ -152,6 +212,12 @@ where

    let mut headers = [httparse::EMPTY_HEADER; 2];
    loop {
        // Check parts count limit
        *parts_count += 1;
        if *parts_count > MAX_FORM_PARTS {
            return Ok(Err(MultipartError::TooManyParts(*parts_count, MAX_FORM_PARTS)));
        }

        let (idx, parsed_headers) = match httparse::parse_headers(lines.slice, &mut headers) {
            Ok(httparse::Status::Complete(ans)) => ans,
            Ok(_) => return Err((body, pat)),
@@ -184,6 +250,17 @@ where
                        #[allow(clippy::indexing_slicing)]
                        let b = &b[..b.len().saturating_sub(2)];

                        // Check per-field size limit
                        if b.len() > MAX_FORM_FIELD_SIZE {
                            return Ok(Err(MultipartError::FieldTooLarge(b.len(), MAX_FORM_FIELD_SIZE)));
                        }

                        // Check total fields size limit
                        *total_fields_size = total_fields_size.saturating_add(b.len());
                        if *total_fields_size > MAX_FORM_FIELDS_SIZE {
                            return Ok(Err(MultipartError::TotalSizeTooLarge(*total_fields_size, MAX_FORM_FIELDS_SIZE)));
                        }

                        match std::str::from_utf8(b) {
                            Err(_) => return Ok(Err(MultipartError::InvalidFormat)),
                            Ok(s) => s,
@@ -668,4 +745,120 @@ mod tests {
            assert_eq!(file_bytes, file_content);
        }
    }

    #[tokio::test]
    async fn test_field_too_large() {
        let boundary = "boundary123";

        // A value just past MAX_FORM_FIELD_SIZE (1 MB) must be rejected,
        // either by the per-field check or by the buffer-size check.
        let oversized = "x".repeat(MAX_FORM_FIELD_SIZE + 1000);

        let parts = [
            Bytes::from(format!("--{boundary}\r\n")),
            Bytes::from("Content-Disposition: form-data; name=\"large_field\"\r\n\r\n"),
            Bytes::from(oversized),
            Bytes::from(format!("\r\n--{boundary}--\r\n")),
        ];

        let stream = futures::stream::iter(parts.into_iter().map(Ok::<_, StdError>));

        let outcome = transform_multipart(stream, boundary.as_bytes()).await;
        // Either error variant is acceptable - both indicate the field/buffer is too large
        assert!(outcome.is_err(), "Should fail when field exceeds limits");
    }

    #[tokio::test]
    async fn test_total_size_too_large() {
        let boundary = "boundary123";

        // 21 fields of 1 MB each total 21 MB, past the 20 MB aggregate cap.
        let field_value = "x".repeat(MAX_FORM_FIELD_SIZE);

        let mut chunks: Vec<String> = (0..21)
            .flat_map(|i| {
                [
                    format!("--{boundary}\r\n"),
                    format!("Content-Disposition: form-data; name=\"field{i}\"\r\n\r\n"),
                    field_value.clone(),
                    "\r\n".to_string(),
                ]
            })
            .collect();
        chunks.push(format!("--{boundary}--\r\n"));

        let stream = futures::stream::iter(chunks.into_iter().map(|s| Ok::<_, StdError>(Bytes::from(s))));

        match transform_multipart(stream, boundary.as_bytes()).await {
            Err(MultipartError::TotalSizeTooLarge(size, limit)) => {
                assert_eq!(limit, MAX_FORM_FIELDS_SIZE);
                assert!(size > MAX_FORM_FIELDS_SIZE);
            }
            _ => panic!("Expected TotalSizeTooLarge error"),
        }
    }

    #[tokio::test]
    async fn test_too_many_parts() {
        let boundary = "boundary123";

        // One part past MAX_FORM_PARTS (1000) must trip the part-count guard.
        let mut chunks = Vec::new();
        for i in 0..=MAX_FORM_PARTS {
            chunks.extend([
                format!("--{boundary}\r\n"),
                format!("Content-Disposition: form-data; name=\"field{i}\"\r\n\r\n"),
                "value".to_string(),
                "\r\n".to_string(),
            ]);
        }
        chunks.push(format!("--{boundary}--\r\n"));

        let stream = futures::stream::iter(chunks.into_iter().map(|s| Ok::<_, StdError>(Bytes::from(s))));

        match transform_multipart(stream, boundary.as_bytes()).await {
            Err(MultipartError::TooManyParts(count, limit)) => {
                assert_eq!(limit, MAX_FORM_PARTS);
                assert!(count > MAX_FORM_PARTS);
            }
            _ => panic!("Expected TooManyParts error"),
        }
    }

    #[tokio::test]
    async fn test_limits_within_bounds() {
        let boundary = "boundary123";

        // 10 small (100-byte) fields plus one file: comfortably under every limit.
        let field_count = 10;

        let mut chunks = Vec::new();
        for i in 0..field_count {
            chunks.extend([
                format!("--{boundary}\r\n"),
                format!("Content-Disposition: form-data; name=\"field{i}\"\r\n\r\n"),
                "x".repeat(100),
                "\r\n".to_string(),
            ]);
        }

        // Trailing file part followed by the closing boundary.
        chunks.extend([
            format!("--{boundary}\r\n"),
            "Content-Disposition: form-data; name=\"file\"; filename=\"test.txt\"\r\n".to_string(),
            "Content-Type: text/plain\r\n\r\n".to_string(),
            "file content".to_string(),
            format!("\r\n--{boundary}--\r\n"),
        ]);

        let stream = futures::stream::iter(chunks.into_iter().map(|s| Ok::<_, StdError>(Bytes::from(s))));

        let result = transform_multipart(stream, boundary.as_bytes()).await;
        assert!(result.is_ok(), "Should succeed when within limits");

        let multipart = result.unwrap();
        assert_eq!(multipart.fields().len(), field_count);
        assert!(multipart.file.stream.is_some());
    }
}
+6 −4
Original line number Diff line number Diff line
@@ -31,8 +31,6 @@ use crate::path::{ParseS3PathError, S3Path};
use crate::protocol::S3Request;
use crate::route::S3Route;
use crate::s3_trait::S3;
use crate::stream::VecByteStream;
use crate::stream::aggregate_unlimited;
use crate::validation::{AwsNameValidation, NameValidation};

use std::mem;
@@ -383,8 +381,12 @@ async fn prepare(req: &mut Request, ccx: &CallContext<'_>) -> S3Result<Prepare>
                        // POST object
                        debug!(?multipart);
                        let file_stream = multipart.take_file_stream().expect("missing file stream");
                        let vec_bytes = aggregate_unlimited(file_stream).await.map_err(S3Error::internal_error)?;
                        let vec_stream = VecByteStream::new(vec_bytes);
                        // Aggregate file stream with size limit to get known length
                        // This is required because downstream handlers (like s3s-proxy) need content-length
                        let vec_bytes = http::aggregate_file_stream_limited(file_stream, http::MAX_POST_OBJECT_FILE_SIZE)
                            .await
                            .map_err(|e| invalid_request!(e, "failed to read file stream"))?;
                        let vec_stream = crate::stream::VecByteStream::new(vec_bytes);
                        req.s3ext.vec_stream = Some(vec_stream);
                        break 'resolve (&PutObject as &'static dyn Operation, false);
                    }
+2 −2
Original line number Diff line number Diff line
@@ -210,8 +210,8 @@ mod tests {

        // In case the futures are made too large accidentally
        assert!(output_size(&crate::ops::call) <= 1600);
        assert!(output_size(&S3Service::call) <= 2900);
        assert!(output_size(&S3Service::call_owned) <= 3200);
        assert!(output_size(&S3Service::call) <= 3000);
        assert!(output_size(&S3Service::call_owned) <= 3300);
    }

    // Test validation functionality
+1 −14
Original line number Diff line number Diff line
@@ -6,7 +6,7 @@ use std::pin::Pin;
use std::task::{Context, Poll};

use bytes::Bytes;
use futures::{Stream, StreamExt, pin_mut};
use futures::Stream;

pub trait ByteStream: Stream {
    fn remaining_length(&self) -> RemainingLength {
@@ -132,19 +132,6 @@ where
    }
}

// FIXME: unbounded memory allocation
/// Drains `stream` into a `Vec<Bytes>` with no size cap.
///
/// NOTE(review): every chunk is buffered in memory, so an arbitrarily large
/// stream grows the vector without bound — callers handling untrusted input
/// should use a size-limited aggregation instead.
pub(crate) async fn aggregate_unlimited<S, E>(stream: S) -> Result<Vec<Bytes>, E>
where
    S: ByteStream<Item = Result<Bytes, E>>,
{
    let mut vec = Vec::new();
    pin_mut!(stream);
    while let Some(result) = stream.next().await {
        // propagate the first stream error to the caller unchanged
        vec.push(result?);
    }
    Ok(vec)
}

pub(crate) struct VecByteStream {
    queue: VecDeque<Bytes>,
    remaining_bytes: usize,