Unverified commit c34135ff authored by Copilot, committed by GitHub
Browse files

fix(s3s-fs): fix ListObjectsV2 response fields causing OpenDAL hang and...


fix(s3s-fs): fix ListObjectsV2 response fields causing OpenDAL hang and improve implementation (#351)

* Initial plan

* Add test case to reproduce OpenDAL list hang issue

Co-authored-by: Nugine <30099658+Nugine@users.noreply.github.com>

* Fix ListObjectsV2 KeyCount, MaxKeys, and IsTruncated fields

- KeyCount now includes both Contents and CommonPrefixes
- MaxKeys uses input parameter or defaults to 1000 (AWS default)
- IsTruncated is set to false (no pagination support)

This fixes OpenDAL hanging when listing directories, as it was
interpreting missing IsTruncated as needing to paginate.

Co-authored-by: Nugine <30099658+Nugine@users.noreply.github.com>

* Add is_truncated to ListObjects v1 and format code

Co-authored-by: Nugine <30099658+Nugine@users.noreply.github.com>

* Refactor list_objects_v2: extract helper, enforce max_keys, improve efficiency

- Extract list_objects_recursive helper to reduce code duplication
- Properly enforce max_keys limit on combined results
- Correctly set is_truncated based on whether results were limited
- Interleave Contents and CommonPrefixes in sorted order
- More efficient filtering using retain() instead of iterator chains
- Add test for max_keys enforcement

Co-authored-by: Nugine <30099658+Nugine@users.noreply.github.com>

* clippy fix

* style fix

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: Nugine <30099658+Nugine@users.noreply.github.com>
Co-authored-by: Nugine <nugine@foxmail.com>
parent 9deb9e4c
Loading
Loading
Loading
Loading
+101 −59
Original line number Diff line number Diff line
@@ -351,6 +351,7 @@ impl S3 for FileSystem {
            name: v2.name,
            prefix: v2.prefix,
            max_keys: v2.max_keys,
            is_truncated: v2.is_truncated,
            ..Default::default()
        }))
    }
@@ -366,86 +367,87 @@ impl S3 for FileSystem {

        let delimiter = input.delimiter.as_deref();
        let prefix = input.prefix.as_deref().unwrap_or("").trim_start_matches('/');
        let max_keys = input.max_keys.unwrap_or(1000);

        // Collect all matching objects and common prefixes
        let mut objects: Vec<Object> = default();
        let mut common_prefixes = std::collections::BTreeSet::new();

        if let Some(delimiter) = delimiter {
            // When delimiter is provided, only list immediate contents (non-recursive)
            self.list_objects_with_delimiter(&path, prefix, delimiter, &mut objects, &mut common_prefixes)
                .await?;
        } else {
            // When no delimiter, do recursive listing (current behavior)
            let mut dir_queue: VecDeque<PathBuf> = default();
            dir_queue.push_back(path.clone());
            let prefix_is_empty = prefix.is_empty();

            while let Some(dir) = dir_queue.pop_front() {
                let mut iter = try_!(fs::read_dir(dir).await);
                while let Some(entry) = try_!(iter.next_entry().await) {
                    let file_type = try_!(entry.file_type().await);
                    if file_type.is_dir() {
                        dir_queue.push_back(entry.path());
                    } else {
                        let file_path = entry.path();
                        let key = try_!(file_path.strip_prefix(&path));
                        let Some(key_str) = normalize_path(key, "/") else {
                            continue;
                        };

                        if !prefix_is_empty && !key_str.starts_with(prefix) {
                            continue;
                        }

                        let metadata = try_!(entry.metadata().await);
                        let last_modified = Timestamp::from(try_!(metadata.modified()));
                        let size = metadata.len();

                        let object = Object {
                            key: Some(key_str),
                            last_modified: Some(last_modified),
                            size: Some(try_!(i64::try_from(size))),
                            ..Default::default()
                        };
                        objects.push(object);
                    }
                }
            }
            self.list_objects_recursive(&path, prefix, &mut objects).await?;
        }

        // Sort before filtering and limiting
        objects.sort_by(|lhs, rhs| {
            let lhs_key = lhs.key.as_deref().unwrap_or("");
            let rhs_key = rhs.key.as_deref().unwrap_or("");
            lhs_key.cmp(rhs_key)
        });

        let objects = if let Some(marker) = &input.start_after {
            objects
                .into_iter()
                .skip_while(|n| n.key.as_deref().unwrap_or("") <= marker.as_str())
                .collect()
        } else {
            objects
        };
        // Apply start_after filter if provided
        if let Some(marker) = &input.start_after {
            objects.retain(|obj| obj.key.as_deref().unwrap_or("") > marker.as_str());
        }

        let common_prefixes_list = if common_prefixes.is_empty() {
            None
        } else {
            Some(
                common_prefixes
        // Convert common_prefixes to sorted list
        let common_prefixes_list: Vec<CommonPrefix> = common_prefixes
            .into_iter()
            .map(|prefix| CommonPrefix { prefix: Some(prefix) })
                    .collect(),
            )
        };
            .collect();

        // Limit results to max_keys by interleaving objects and common_prefixes
        let mut result_objects = Vec::new();
        let mut result_prefixes = Vec::new();
        let mut total_count = 0;
        let max_keys_usize = usize::try_from(max_keys).unwrap_or(1000);

        let mut obj_idx = 0;
        let mut prefix_idx = 0;

        while total_count < max_keys_usize {
            let obj_key = objects.get(obj_idx).and_then(|o| o.key.as_deref());
            let prefix_key = common_prefixes_list.get(prefix_idx).and_then(|p| p.prefix.as_deref());

            match (obj_key, prefix_key) {
                (Some(ok), Some(pk)) => {
                    if ok < pk {
                        result_objects.push(objects[obj_idx].clone());
                        obj_idx += 1;
                    } else {
                        result_prefixes.push(common_prefixes_list[prefix_idx].clone());
                        prefix_idx += 1;
                    }
                    total_count += 1;
                }
                (Some(_), None) => {
                    result_objects.push(objects[obj_idx].clone());
                    obj_idx += 1;
                    total_count += 1;
                }
                (None, Some(_)) => {
                    result_prefixes.push(common_prefixes_list[prefix_idx].clone());
                    prefix_idx += 1;
                    total_count += 1;
                }
                (None, None) => break,
            }
        }

        let key_count = try_!(i32::try_from(objects.len()));
        let is_truncated = obj_idx < objects.len() || prefix_idx < common_prefixes_list.len();
        let key_count = try_!(i32::try_from(total_count));

        let contents = result_objects.is_empty().not().then_some(result_objects);
        let common_prefixes = result_prefixes.is_empty().not().then_some(result_prefixes);

        let output = ListObjectsV2Output {
            key_count: Some(key_count),
            max_keys: Some(key_count),
            contents: Some(objects),
            common_prefixes: common_prefixes_list,
            max_keys: Some(max_keys),
            is_truncated: Some(is_truncated),
            contents,
            common_prefixes,
            delimiter: input.delimiter,
            encoding_type: input.encoding_type,
            name: Some(input.bucket),
@@ -900,6 +902,46 @@ impl S3 for FileSystem {
}

impl FileSystem {
    /// Walks `bucket_root` breadth-first and appends an `Object` entry for
    /// every regular file whose normalized key starts with `prefix`.
    ///
    /// Keys are produced by stripping `bucket_root` from each file path and
    /// joining components with `/`; paths that fail normalization are skipped.
    async fn list_objects_recursive(&self, bucket_root: &Path, prefix: &str, objects: &mut Vec<Object>) -> S3Result<()> {
        let match_all = prefix.is_empty();
        let mut pending: VecDeque<PathBuf> = default();
        pending.push_back(bucket_root.to_owned());

        while let Some(current_dir) = pending.pop_front() {
            let mut entries = try_!(fs::read_dir(current_dir).await);
            while let Some(entry) = try_!(entries.next_entry().await) {
                let kind = try_!(entry.file_type().await);
                if kind.is_dir() {
                    // Queue subdirectories for a later pass instead of recursing.
                    pending.push_back(entry.path());
                    continue;
                }

                let entry_path = entry.path();
                let relative = try_!(entry_path.strip_prefix(bucket_root));
                let Some(key) = normalize_path(relative, "/") else {
                    continue;
                };

                if !match_all && !key.starts_with(prefix) {
                    continue;
                }

                let meta = try_!(entry.metadata().await);
                let modified = Timestamp::from(try_!(meta.modified()));
                let byte_len = try_!(i64::try_from(meta.len()));

                objects.push(Object {
                    key: Some(key),
                    last_modified: Some(modified),
                    size: Some(byte_len),
                    ..Default::default()
                });
            }
        }

        Ok(())
    }

    async fn list_objects_with_delimiter(
        &self,
        bucket_root: &Path,
+44 −0
Original line number Diff line number Diff line
@@ -339,6 +339,50 @@ async fn test_list_objects_v1_with_prefixes() -> Result<()> {
    Ok(())
}

#[tokio::test]
#[tracing::instrument]
async fn test_list_objects_v2_max_keys() -> Result<()> {
    let client = Client::new(config());
    let bucket_name = format!("test-max-keys-{}", Uuid::new_v4());
    let bucket = bucket_name.as_str();
    create_bucket(&client, bucket).await?;

    // Seed the bucket with 10 small objects so truncation can be observed.
    let body = "test";
    for idx in 0..10 {
        client
            .put_object()
            .bucket(bucket)
            .key(format!("file{idx:02}.txt"))
            .body(ByteStream::from_static(body.as_bytes()))
            .send()
            .await?;
    }

    // max_keys below the object count: listing must be capped and truncated.
    let result = client.list_objects_v2().bucket(bucket).max_keys(5).send().await;
    let response = log_and_unwrap!(result);

    let keys: Vec<_> = response.contents().iter().filter_map(|obj| obj.key()).collect();
    assert_eq!(keys.len(), 5, "Expected 5 objects, got {}", keys.len());
    assert_eq!(response.key_count(), Some(5));
    assert_eq!(response.max_keys(), Some(5));
    assert_eq!(response.is_truncated(), Some(true), "Should be truncated");

    // max_keys above the object count: everything comes back, not truncated.
    let result = client.list_objects_v2().bucket(bucket).max_keys(20).send().await;
    let response = log_and_unwrap!(result);

    let keys: Vec<_> = response.contents().iter().filter_map(|obj| obj.key()).collect();
    assert_eq!(keys.len(), 10, "Expected 10 objects, got {}", keys.len());
    assert_eq!(response.key_count(), Some(10));
    assert_eq!(response.max_keys(), Some(20));
    assert_eq!(response.is_truncated(), Some(false), "Should not be truncated");

    Ok(())
}

#[tokio::test]
#[tracing::instrument]
async fn test_single_object() -> Result<()> {
+16 −0
Original line number Diff line number Diff line
@@ -247,6 +247,22 @@ async fn test_list() -> Result<()> {
    Ok(())
}

#[tokio::test]
#[tracing::instrument]
async fn test_write_and_list_root() -> Result<()> {
    let _guard = serial().await;
    ensure_server_ready().await;

    // Write a key at the bucket root, then list "/" — per the commit message
    // this listing path previously made OpenDAL hang.
    let operator = create_operator();
    log_and_unwrap!(operator.write("a", "test").await);
    log_and_unwrap!(operator.list("/").await);

    // Remove the test object so later cases see a clean state.
    log_and_unwrap!(operator.delete("a").await);

    Ok(())
}

#[tokio::test]
#[tracing::instrument]
async fn test_delete_non_existent() -> Result<()> {