Unverified Commit cff9779e authored by Michael Rossberg's avatar Michael Rossberg Committed by GitHub
Browse files

store multipart metadata (#129)

parent 2ff56c28
Loading
Loading
Loading
Loading
+20 −6
Original line number Diff line number Diff line
@@ -74,9 +74,10 @@ impl FileSystem {
    }

    /// resolve metadata path under the virtual root (custom format)
    pub(crate) fn get_metadata_path(&self, bucket: &str, key: &str) -> Result<PathBuf> {
    pub(crate) fn get_metadata_path(&self, bucket: &str, key: &str, upload_id: Option<Uuid>) -> Result<PathBuf> {
        let encode = |s: &str| base64_simd::URL_SAFE_NO_PAD.encode_to_string(s);
        let file_path = format!(".bucket-{}.object-{}.metadata.json", encode(bucket), encode(key));
        let u_ext = upload_id.map(|u| format!(".upload-{u}")).unwrap_or_default();
        let file_path = format!(".bucket-{}.object-{}{u_ext}.metadata.json", encode(bucket), encode(key));
        self.resolve_abs_path(file_path)
    }

@@ -87,8 +88,8 @@ impl FileSystem {
    }

    /// load metadata from fs
    pub(crate) async fn load_metadata(&self, bucket: &str, key: &str) -> Result<Option<dto::Metadata>> {
        let path = self.get_metadata_path(bucket, key)?;
    pub(crate) async fn load_metadata(&self, bucket: &str, key: &str, upload_id: Option<Uuid>) -> Result<Option<dto::Metadata>> {
        let path = self.get_metadata_path(bucket, key, upload_id)?;
        if path.exists().not() {
            return Ok(None);
        }
@@ -98,13 +99,26 @@ impl FileSystem {
    }

    /// save metadata to fs
    pub(crate) async fn save_metadata(&self, bucket: &str, key: &str, metadata: &dto::Metadata) -> Result<()> {
        let path = self.get_metadata_path(bucket, key)?;
    pub(crate) async fn save_metadata(
        &self,
        bucket: &str,
        key: &str,
        metadata: &dto::Metadata,
        upload_id: Option<Uuid>,
    ) -> Result<()> {
        let path = self.get_metadata_path(bucket, key, upload_id)?;
        let content = serde_json::to_vec(metadata)?;
        fs::write(&path, &content).await?;
        Ok(())
    }

    /// remove metadata from fs
    pub(crate) fn delete_metadata(&self, bucket: &str, key: &str, upload_id: Option<Uuid>) -> Result<()> {
        let path = self.get_metadata_path(bucket, key, upload_id)?;
        std::fs::remove_file(path)?;
        Ok(())
    }

    pub(crate) async fn load_internal_info(&self, bucket: &str, key: &str) -> Result<Option<InternalInfo>> {
        let path = self.get_internal_info_path(bucket, key)?;
        if path.exists().not() {
+17 −5
Original line number Diff line number Diff line
@@ -100,9 +100,9 @@ impl S3 for FileSystem {

        debug!(from = %src_path.display(), to = %dst_path.display(), "copy file");

        let src_metadata_path = self.get_metadata_path(bucket, key)?;
        let src_metadata_path = self.get_metadata_path(bucket, key, None)?;
        if src_metadata_path.exists() {
            let dst_metadata_path = self.get_metadata_path(&input.bucket, &input.key)?;
            let dst_metadata_path = self.get_metadata_path(&input.bucket, &input.key, None)?;
            let _ = try_!(fs::copy(src_metadata_path, dst_metadata_path).await);
        }

@@ -225,7 +225,7 @@ impl S3 for FileSystem {

        let body = bytes_stream(ReaderStream::with_capacity(file, 4096), content_length_usize);

        let object_metadata = self.load_metadata(&input.bucket, &input.key).await?;
        let object_metadata = self.load_metadata(&input.bucket, &input.key, None).await?;

        let md5_sum = self.get_md5_sum(&input.bucket, &input.key).await?;
        let e_tag = format!("\"{md5_sum}\"");
@@ -277,7 +277,7 @@ impl S3 for FileSystem {
        let last_modified = Timestamp::from(try_!(file_metadata.modified()));
        let file_len = file_metadata.len();

        let object_metadata = self.load_metadata(&input.bucket, &input.key).await?;
        let object_metadata = self.load_metadata(&input.bucket, &input.key, None).await?;

        // TODO: detect content type
        let content_type = mime::APPLICATION_OCTET_STREAM;
@@ -504,7 +504,7 @@ impl S3 for FileSystem {
        debug!(path = %object_path.display(), ?size, %md5_sum, ?checksum, "write file");

        if let Some(ref metadata) = metadata {
            self.save_metadata(&bucket, &key, metadata).await?;
            self.save_metadata(&bucket, &key, metadata, None).await?;
        }

        let mut info: InternalInfo = default();
@@ -532,6 +532,11 @@ impl S3 for FileSystem {
        let input = req.input;
        let upload_id = self.create_upload_id(req.credentials.as_ref()).await?;

        if let Some(ref metadata) = input.metadata {
            self.save_metadata(&input.bucket, &input.key, metadata, Some(upload_id))
                .await?;
        }

        let output = CreateMultipartUploadOutput {
            bucket: Some(input.bucket),
            key: Some(input.key),
@@ -715,6 +720,11 @@ impl S3 for FileSystem {

        self.delete_upload_id(&upload_id).await?;

        if let Ok(Some(metadata)) = self.load_metadata(&bucket, &key, Some(upload_id)).await {
            self.save_metadata(&bucket, &key, &metadata, None).await?;
            let _ = self.delete_metadata(&bucket, &key, Some(upload_id));
        }

        let object_path = self.get_object_path(&bucket, &key)?;
        let mut file_writer = self.prepare_file_write(&object_path).await?;

@@ -764,6 +774,8 @@ impl S3 for FileSystem {
            return Err(s3_error!(AccessDenied));
        }

        let _ = self.delete_metadata(&bucket, &key, Some(upload_id));

        let prefix = format!(".upload_id-{upload_id}");
        let mut iter = try_!(fs::read_dir(&self.root).await);
        while let Some(entry) = try_!(iter.next_entry().await) {