Unverified Commit 7d38b516 authored by Copilot's avatar Copilot Committed by GitHub
Browse files

Fix: Preserve standard object attributes in s3s-fs (#420)



* Initial plan

* Add test case demonstrating Content-Encoding issue

Co-authored-by: default avatarNugine <30099658+Nugine@users.noreply.github.com>

* Add comprehensive investigation report for Content-Encoding issue

Co-authored-by: default avatarNugine <30099658+Nugine@users.noreply.github.com>

* Address code review feedback: improve line number accuracy and test assertions

Co-authored-by: default avatarNugine <30099658+Nugine@users.noreply.github.com>

* Add note about line numbers being version-specific

Co-authored-by: default avatarNugine <30099658+Nugine@users.noreply.github.com>

* Changes before error encountered

Co-authored-by: default avatarNugine <30099658+Nugine@users.noreply.github.com>

* Fix CI issues: suppress dead_code warnings and fix doc URL formatting

Co-authored-by: default avatarNugine <30099658+Nugine@users.noreply.github.com>

* Remove dead code and merge with main branch

- Removed deprecated load_metadata and save_metadata functions
- Merged main branch resolving conflicts with new tests
- Added if_none_match parameter handling for If-None-Match: * support
- All tests pass including Content-Encoding preservation

Co-authored-by: default avatarNugine <30099658+Nugine@users.noreply.github.com>

* Address code review feedback

- Fixed error handling in set_expires_timestamp to properly handle format errors
- Used consistent closure pattern for get_expires_timestamp calls
- Updated outdated comment in test to reflect that attributes are now preserved
- Added test_multipart_with_attributes to verify attributes are preserved through multipart uploads
- Suppressed clippy::redundant_closure_for_method_calls to maintain code consistency

Co-authored-by: default avatarNugine <30099658+Nugine@users.noreply.github.com>

---------

Co-authored-by: default avatarcopilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: default avatarNugine <30099658+Nugine@users.noreply.github.com>
Co-authored-by: default avatarNugine <nugine@foxmail.com>
parent a048b9e0
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -2979,6 +2979,7 @@ dependencies = [
 "path-absolutize",
 "s3s",
 "s3s-aws",
 "serde",
 "serde_json",
 "std-next",
 "thiserror",
+1 −0
Original line number Diff line number Diff line
@@ -44,6 +44,7 @@ std-next = "0.1.9"
numeric_cast = "0.3.0"
path-absolutize = "3.1.1"
s3s = { version = "0.12.0-rc.5", path = "../s3s" }
serde = { version = "1.0.228", features = ["derive"] }
serde_json = "1.0.145"
thiserror = "2.0.17"
time = "0.3.44"
+70 −8
Original line number Diff line number Diff line
@@ -27,6 +27,50 @@ pub struct FileSystem {

pub(crate) type InternalInfo = serde_json::Map<String, serde_json::Value>;

/// Stores standard object attributes alongside user metadata.
///
/// Persisted as JSON in the per-object metadata file; `skip_serializing_if`
/// keeps absent attributes out of the stored document so the on-disk format
/// stays compact and backward-compatible with the old metadata-only layout.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub(crate) struct ObjectAttributes {
    /// User-defined metadata (x-amz-meta-*)
    #[serde(skip_serializing_if = "Option::is_none")]
    pub user_metadata: Option<dto::Metadata>,

    /// Standard object attributes
    /// `Content-Encoding` header value (e.g. "gzip", "br").
    #[serde(skip_serializing_if = "Option::is_none")]
    pub content_encoding: Option<String>,
    /// `Content-Type` header value.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub content_type: Option<String>,
    /// `Content-Disposition` header value.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub content_disposition: Option<String>,
    /// `Content-Language` header value.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub content_language: Option<String>,
    /// `Cache-Control` header value.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cache_control: Option<String>,
    /// `Expires` header, stored as a formatted DateTime string
    /// (see `set_expires_timestamp` / `get_expires_timestamp`).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub expires: Option<String>,
    /// `x-amz-website-redirect-location` header value.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub website_redirect_location: Option<String>,
}

impl ObjectAttributes {
    /// Formats an optional timestamp as a `DateTime` string and stores it in `expires`.
    ///
    /// Formatting failures are swallowed: on error (or when `expires` is `None`)
    /// the stored value becomes `None`, matching the best-effort nature of
    /// attribute persistence.
    pub fn set_expires_timestamp(&mut self, expires: Option<dto::Timestamp>) {
        self.expires = expires.and_then(|ts| {
            let mut out = Vec::new();
            ts.format(dto::TimestampFormat::DateTime, &mut out)
                .ok()
                .map(|()| String::from_utf8_lossy(&out).into_owned())
        });
    }

    /// Parses the stored `expires` string back into a timestamp.
    ///
    /// Returns `None` when no value is stored or when parsing fails.
    pub fn get_expires_timestamp(&self) -> Option<dto::Timestamp> {
        let raw = self.expires.as_deref()?;
        dto::Timestamp::parse(dto::TimestampFormat::DateTime, raw).ok()
    }
}

fn clean_old_tmp_files(root: &Path) -> std::io::Result<()> {
    let entries = match std::fs::read_dir(root) {
        Ok(entries) => Ok(entries),
@@ -88,27 +132,45 @@ impl FileSystem {
        self.resolve_abs_path(file_path)
    }

    /// load metadata from fs
    pub(crate) async fn load_metadata(&self, bucket: &str, key: &str, upload_id: Option<Uuid>) -> Result<Option<dto::Metadata>> {
    /// load object attributes from fs (with backward compatibility)
    pub(crate) async fn load_object_attributes(
        &self,
        bucket: &str,
        key: &str,
        upload_id: Option<Uuid>,
    ) -> Result<Option<ObjectAttributes>> {
        let path = self.get_metadata_path(bucket, key, upload_id)?;
        if path.exists().not() {
            return Ok(None);
        }
        let content = fs::read(&path).await?;
        let map = serde_json::from_slice(&content)?;
        Ok(Some(map))

        // Try to deserialize as ObjectAttributes first (new format)
        if let Ok(attrs) = serde_json::from_slice::<ObjectAttributes>(&content) {
            return Ok(Some(attrs));
        }

        // Fall back to old format (just user metadata)
        if let Ok(user_metadata) = serde_json::from_slice::<dto::Metadata>(&content) {
            return Ok(Some(ObjectAttributes {
                user_metadata: Some(user_metadata),
                ..Default::default()
            }));
        }

        Ok(None)
    }

    /// save metadata to fs
    pub(crate) async fn save_metadata(
    /// save object attributes to fs
    pub(crate) async fn save_object_attributes(
        &self,
        bucket: &str,
        key: &str,
        metadata: &dto::Metadata,
        attrs: &ObjectAttributes,
        upload_id: Option<Uuid>,
    ) -> Result<()> {
        let path = self.get_metadata_path(bucket, key, upload_id)?;
        let content = serde_json::to_vec(metadata)?;
        let content = serde_json::to_vec(attrs)?;
        let mut file_writer = self.prepare_file_write(&path).await?;
        file_writer.writer().write_all(&content).await?;
        file_writer.writer().flush().await?;
+60 −17
Original line number Diff line number Diff line
@@ -232,7 +232,7 @@ impl S3 for FileSystem {

        let body = bytes_stream(ReaderStream::with_capacity(file, 4096), content_length_usize);

        let object_metadata = self.load_metadata(&input.bucket, &input.key, None).await?;
        let obj_attrs = self.load_object_attributes(&input.bucket, &input.key, None).await?;

        let md5_sum = self.get_md5_sum(&input.bucket, &input.key).await?;

@@ -244,12 +244,20 @@ impl S3 for FileSystem {
            _ => default(),
        };

        #[allow(clippy::redundant_closure_for_method_calls)]
        let output = GetObjectOutput {
            body: Some(StreamingBlob::wrap(body)),
            content_length: Some(content_length_i64),
            content_range,
            last_modified: Some(last_modified),
            metadata: object_metadata,
            metadata: obj_attrs.as_ref().and_then(|a| a.user_metadata.clone()),
            content_encoding: obj_attrs.as_ref().and_then(|a| a.content_encoding.clone()),
            content_type: obj_attrs.as_ref().and_then(|a| a.content_type.clone()),
            content_disposition: obj_attrs.as_ref().and_then(|a| a.content_disposition.clone()),
            content_language: obj_attrs.as_ref().and_then(|a| a.content_language.clone()),
            cache_control: obj_attrs.as_ref().and_then(|a| a.cache_control.clone()),
            expires: obj_attrs.as_ref().and_then(|a| a.get_expires_timestamp()),
            website_redirect_location: obj_attrs.as_ref().and_then(|a| a.website_redirect_location.clone()),
            e_tag: Some(ETag::Strong(md5_sum)),
            checksum_crc32: checksum.checksum_crc32,
            checksum_crc32c: checksum.checksum_crc32c,
@@ -286,16 +294,20 @@ impl S3 for FileSystem {
        let last_modified = Timestamp::from(try_!(file_metadata.modified()));
        let file_len = file_metadata.len();

        let object_metadata = self.load_metadata(&input.bucket, &input.key, None).await?;

        // TODO: detect content type
        let content_type = ContentType::from("application/octet-stream");
        let obj_attrs = self.load_object_attributes(&input.bucket, &input.key, None).await?;

        #[allow(clippy::redundant_closure_for_method_calls)]
        let output = HeadObjectOutput {
            content_length: Some(try_!(i64::try_from(file_len))),
            content_type: Some(content_type),
            content_type: obj_attrs.as_ref().and_then(|a| a.content_type.clone()),
            content_encoding: obj_attrs.as_ref().and_then(|a| a.content_encoding.clone()),
            content_disposition: obj_attrs.as_ref().and_then(|a| a.content_disposition.clone()),
            content_language: obj_attrs.as_ref().and_then(|a| a.content_language.clone()),
            cache_control: obj_attrs.as_ref().and_then(|a| a.cache_control.clone()),
            expires: obj_attrs.as_ref().and_then(|a| a.get_expires_timestamp()),
            website_redirect_location: obj_attrs.as_ref().and_then(|a| a.website_redirect_location.clone()),
            last_modified: Some(last_modified),
            metadata: object_metadata,
            metadata: obj_attrs.as_ref().and_then(|a| a.user_metadata.clone()),
            ..Default::default()
        };
        Ok(S3Response::new(output))
@@ -459,6 +471,8 @@ impl S3 for FileSystem {

    #[tracing::instrument]
    async fn put_object(&self, req: S3Request<PutObjectInput>) -> S3Result<S3Response<PutObjectOutput>> {
        use crate::fs::ObjectAttributes;

        let mut input = req.input;
        if let Some(ref storage_class) = input.storage_class {
            let is_valid = ["STANDARD", "REDUCED_REDUNDANCY"].contains(&storage_class.as_str());
@@ -474,6 +488,13 @@ impl S3 for FileSystem {
            metadata,
            content_length,
            content_md5,
            content_encoding,
            content_type,
            content_disposition,
            content_language,
            cache_control,
            expires,
            website_redirect_location,
            if_none_match,
            ..
        } = input;
@@ -599,9 +620,19 @@ impl S3 for FileSystem {

        debug!(path = %object_path.display(), ?size, %md5_sum, ?checksum, "write file");

        if let Some(ref metadata) = metadata {
            self.save_metadata(&bucket, &key, metadata, None).await?;
        }
        // Save object attributes (including user metadata and standard attributes)
        let mut obj_attrs = ObjectAttributes {
            user_metadata: metadata,
            content_encoding,
            content_type,
            content_disposition,
            content_language,
            cache_control,
            expires: None,
            website_redirect_location,
        };
        obj_attrs.set_expires_timestamp(expires);
        self.save_object_attributes(&bucket, &key, &obj_attrs, None).await?;

        let mut info: InternalInfo = default();
        crate::checksum::modify_internal_info(&mut info, &checksum);
@@ -624,13 +655,25 @@ impl S3 for FileSystem {
        &self,
        req: S3Request<CreateMultipartUploadInput>,
    ) -> S3Result<S3Response<CreateMultipartUploadOutput>> {
        use crate::fs::ObjectAttributes;

        let input = req.input;
        let upload_id = self.create_upload_id(req.credentials.as_ref()).await?;

        if let Some(ref metadata) = input.metadata {
            self.save_metadata(&input.bucket, &input.key, metadata, Some(upload_id))
        // Save object attributes (including user metadata and standard attributes)
        let mut obj_attrs = ObjectAttributes {
            user_metadata: input.metadata,
            content_encoding: input.content_encoding,
            content_type: input.content_type,
            content_disposition: input.content_disposition,
            content_language: input.content_language,
            cache_control: input.cache_control,
            expires: None,
            website_redirect_location: input.website_redirect_location,
        };
        obj_attrs.set_expires_timestamp(input.expires);
        self.save_object_attributes(&input.bucket, &input.key, &obj_attrs, Some(upload_id))
            .await?;
        }

        let output = CreateMultipartUploadOutput {
            bucket: Some(input.bucket),
@@ -822,8 +865,8 @@ impl S3 for FileSystem {

        self.delete_upload_id(&upload_id).await?;

        if let Ok(Some(metadata)) = self.load_metadata(&bucket, &key, Some(upload_id)).await {
            self.save_metadata(&bucket, &key, &metadata, None).await?;
        if let Ok(Some(attrs)) = self.load_object_attributes(&bucket, &key, Some(upload_id)).await {
            self.save_object_attributes(&bucket, &key, &attrs, None).await?;
            let _ = self.delete_metadata(&bucket, &key, Some(upload_id));
        }

+172 −0
Original line number Diff line number Diff line
@@ -793,6 +793,178 @@ async fn test_default_bucket_validation() -> Result<()> {
    Ok(())
}

/// Test that demonstrates the Content-Encoding preservation issue
/// Related: <https://github.com/rustfs/rustfs/issues/1062>
#[tokio::test]
#[tracing::instrument]
async fn test_content_encoding_preservation() -> Result<()> {
    let _guard = serial().await;

    let client = Client::new(config());
    let bucket_name = format!("test-content-encoding-{}", Uuid::new_v4());
    let bucket = bucket_name.as_str();
    let key = "compressed.json";

    // Simulated Brotli-compressed JSON content
    let payload: &'static [u8] = b"compressed data here";

    create_bucket(&client, bucket).await?;

    // Upload an object carrying Content-Encoding plus other standard attributes.
    client
        .put_object()
        .bucket(bucket)
        .key(key)
        .body(ByteStream::from_static(payload))
        .content_encoding("br") // Brotli compression
        .content_type("application/json")
        .content_disposition("attachment; filename=\"data.json\"")
        .cache_control("max-age=3600")
        .send()
        .await?;
    debug!("Uploaded object with Content-Encoding: br");

    // GetObject must echo back every stored standard attribute.
    let got = client.get_object().bucket(bucket).key(key).send().await?;
    debug!("Retrieved object:");
    debug!("  Content-Encoding: {:?}", got.content_encoding());
    debug!("  Content-Type: {:?}", got.content_type());
    debug!("  Content-Disposition: {:?}", got.content_disposition());
    debug!("  Cache-Control: {:?}", got.cache_control());
    assert_eq!(got.content_encoding(), Some("br"));
    assert_eq!(got.content_type(), Some("application/json"));
    assert_eq!(got.content_disposition(), Some("attachment; filename=\"data.json\""));
    assert_eq!(got.cache_control(), Some("max-age=3600"));

    // HeadObject must report the same stored attributes.
    let head = client.head_object().bucket(bucket).key(key).send().await?;
    debug!("HeadObject result:");
    debug!("  Content-Encoding: {:?}", head.content_encoding());
    debug!("  Content-Type: {:?}", head.content_type());
    assert_eq!(head.content_encoding(), Some("br"));
    assert_eq!(head.content_type(), Some("application/json"));

    // Clean up the test object and bucket.
    delete_object(&client, bucket, key).await?;
    delete_bucket(&client, bucket).await?;

    Ok(())
}

/// Test that standard object attributes are preserved through multipart uploads
#[tokio::test]
#[tracing::instrument]
async fn test_multipart_with_attributes() -> Result<()> {
    let _guard = serial().await;

    let client = Client::new(config());
    let bucket_name = format!("test-multipart-attrs-{}", Uuid::new_v4());
    let bucket = bucket_name.as_str();
    let key = "multipart-with-attrs.json";

    create_bucket(&client, bucket).await?;

    // Start a multipart upload carrying the standard attributes up front.
    let created = client
        .create_multipart_upload()
        .bucket(bucket)
        .key(key)
        .content_encoding("gzip")
        .content_type("application/json")
        .content_disposition("attachment; filename=\"data.json\"")
        .cache_control("public, max-age=7200")
        .send()
        .await?;
    let upload_id = created.upload_id.unwrap();
    let upload_id = upload_id.as_str();

    // Upload a single part and remember its ETag for completion.
    let part_number = 1;
    let part_ans = client
        .upload_part()
        .bucket(bucket)
        .key(key)
        .upload_id(upload_id)
        .body(ByteStream::from_static(b"part1 content"))
        .part_number(part_number)
        .send()
        .await?;
    let upload_parts = vec![
        CompletedPart::builder()
            .e_tag(part_ans.e_tag.unwrap_or_default())
            .part_number(part_number)
            .build(),
    ];

    // Finish the multipart upload.
    let upload = CompletedMultipartUpload::builder().set_parts(Some(upload_parts)).build();
    client
        .complete_multipart_upload()
        .bucket(bucket)
        .key(key)
        .multipart_upload(upload)
        .upload_id(upload_id)
        .send()
        .await?;

    // The assembled object must expose the attributes given at creation time.
    let got = client.get_object().bucket(bucket).key(key).send().await?;
    debug!("Retrieved multipart object:");
    debug!("  Content-Encoding: {:?}", got.content_encoding());
    debug!("  Content-Type: {:?}", got.content_type());
    debug!("  Content-Disposition: {:?}", got.content_disposition());
    debug!("  Cache-Control: {:?}", got.cache_control());
    assert_eq!(got.content_encoding(), Some("gzip"));
    assert_eq!(got.content_type(), Some("application/json"));
    assert_eq!(got.content_disposition(), Some("attachment; filename=\"data.json\""));
    assert_eq!(got.cache_control(), Some("public, max-age=7200"));

    // HeadObject must agree with GetObject.
    let head = client.head_object().bucket(bucket).key(key).send().await?;
    assert_eq!(head.content_encoding(), Some("gzip"));
    assert_eq!(head.content_type(), Some("application/json"));

    // Clean up the test object and bucket.
    delete_object(&client, bucket, key).await?;
    delete_bucket(&client, bucket).await?;

    Ok(())
}

#[tokio::test]
#[tracing::instrument]
async fn test_sts_assume_role_not_implemented() -> Result<()> {