Unverified Commit 7b69f211 authored by Adam Cimarosti's avatar Adam Cimarosti Committed by GitHub
Browse files

s3s-fs: fix incomplete uploads by writing via a temp file (#116)

parent aed0cd1e
Loading
Loading
Loading
Loading
+79 −2
Original line number Diff line number Diff line
@@ -7,10 +7,11 @@ use s3s::dto;
use std::env;
use std::ops::Not;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};

use tokio::fs;
use tokio::fs::File;
use tokio::io::AsyncReadExt;
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufWriter};

use md5::{Digest, Md5};
use path_absolutize::Absolutize;
@@ -19,14 +20,35 @@ use uuid::Uuid;
/// Filesystem-backed S3 service rooted at `root`; object keys map to paths
/// beneath the root directory.
#[derive(Debug)]
pub struct FileSystem {
    // Canonicalized root directory; all object/temp paths resolve beneath it.
    pub(crate) root: PathBuf,
    // Monotonic counter used to generate unique temp-file names for
    // in-progress writes (see `prepare_file_write`).
    tmp_file_counter: AtomicU64,
}

pub(crate) type InternalInfo = serde_json::Map<String, serde_json::Value>;

/// Remove leftover temporary upload files (`.tmp.*.internal.part`) from a
/// previous run of the server.
///
/// A missing `root` directory is not an error — there is simply nothing to
/// clean. Any other I/O failure is propagated.
fn clean_old_tmp_files(root: &Path) -> std::io::Result<()> {
    let dir = match std::fs::read_dir(root) {
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(()),
        other => other?,
    };
    for dirent in dir {
        let dirent = dirent?;
        let name = dirent.file_name();
        // Non-UTF-8 names can never match the all-ASCII temp-file pattern.
        if let Some(name) = name.to_str() {
            // Naming scheme established by `FileSystem::prepare_file_write`.
            if name.starts_with(".tmp.") && name.ends_with(".internal.part") {
                std::fs::remove_file(dirent.path())?;
            }
        }
    }
    Ok(())
}

impl FileSystem {
    pub fn new(root: impl AsRef<Path>) -> Result<Self> {
        let root = env::current_dir()?.join(root).canonicalize()?;
        Ok(Self { root })
        clean_old_tmp_files(&root)?;
        let tmp_file_counter = AtomicU64::new(0);
        Ok(Self { root, tmp_file_counter })
    }

    pub(crate) fn resolve_abs_path(&self, path: impl AsRef<Path>) -> Result<PathBuf> {
@@ -146,4 +168,59 @@ impl FileSystem {
        }
        Ok(())
    }

    /// Begin an atomic write of the object `bucket`/`key`.
    ///
    /// Data is streamed into a uniquely-named temporary file in the root
    /// directory; it only becomes visible at the final object path once
    /// `FileWriter::done` renames it into place.
    pub(crate) async fn prepare_file_write(&self, bucket: &str, key: &str) -> Result<FileWriter> {
        // Resolve the destination up front so path errors surface before any I/O.
        let final_path = Some(self.get_object_path(bucket, key)?);

        // The counter keeps concurrent writers from colliding on the temp name;
        // the `.tmp.*.internal.part` pattern is what `clean_old_tmp_files` sweeps.
        let counter = self.tmp_file_counter.fetch_add(1, Ordering::SeqCst);
        let tmp_path = self.resolve_abs_path(format!(".tmp.{}.internal.part", counter))?;

        let writer = BufWriter::new(File::create(&tmp_path).await?);
        Ok(FileWriter {
            tmp_path,
            final_path,
            writer,
            clean_tmp: true,
        })
    }
}

/// An in-progress atomic object write: bytes accumulate in `tmp_path` and are
/// renamed to `final_path` when the write completes successfully.
pub(crate) struct FileWriter {
    // Temporary file currently receiving the data.
    tmp_path: PathBuf,
    // Destination path; `Some` until `done` takes it on success.
    final_path: Option<PathBuf>,
    // Buffered writer over the temp file.
    writer: BufWriter<File>,
    // When true, `Drop` deletes the temp file (the write was abandoned
    // before `done` completed).
    clean_tmp: bool,
}

impl FileWriter {
    /// Path of the temporary file currently receiving data.
    pub(crate) fn tmp_path(&self) -> &Path {
        &self.tmp_path
    }

    /// Final destination path of the object.
    ///
    /// # Panics
    /// Panics if the path has already been taken; `done` consumes `self`, so
    /// this cannot happen from safe callers.
    pub(crate) fn final_path(&self) -> &Path {
        self.final_path.as_ref().unwrap()
    }

    /// Mutable access to the buffered writer, for streaming data in.
    pub(crate) fn writer(&mut self) -> &mut BufWriter<File> {
        &mut self.writer
    }

    /// Finish the write: flush buffered data, then atomically move the temp
    /// file into its final location. Returns the final path.
    pub(crate) async fn done(mut self) -> Result<PathBuf> {
        // Ensure everything buffered reaches the OS before the rename —
        // tokio's `BufWriter` does not flush on drop, so without this any
        // bytes still in the buffer would be silently lost.
        self.writer.flush().await?;

        if let Some(final_dir_path) = self.final_path().parent() {
            fs::create_dir_all(&final_dir_path).await?;
        }

        fs::rename(&self.tmp_path, &self.final_path()).await?;
        // The temp file no longer exists under its old name; disarm the
        // cleanup in `Drop`.
        self.clean_tmp = false;
        Ok(self.final_path.take().unwrap())
    }
}

impl Drop for FileWriter {
    // Best-effort cleanup: if the write was abandoned before `done`
    // completed, remove the orphaned temp file.
    fn drop(&mut self) {
        if !self.clean_tmp {
            return;
        }
        // Errors are ignored: drop must not panic, and the file may
        // already be gone.
        let _ = std::fs::remove_file(&self.tmp_path);
    }
}
+7 −13
Original line number Diff line number Diff line
@@ -466,10 +466,7 @@ impl S3 for FileSystem {
            return Ok(S3Response::new(output));
        }

        let object_path = self.get_object_path(&bucket, &key)?;
        if let Some(dir_path) = object_path.parent() {
            try_!(fs::create_dir_all(&dir_path).await);
        }
        let mut file_writer = self.prepare_file_write(&bucket, &key).await?;

        let mut md5_hash = Md5::new();
        let stream = body.inspect_ok(|bytes| {
@@ -477,10 +474,9 @@ impl S3 for FileSystem {
            checksum.update(bytes.as_ref());
        });

        let file = try_!(fs::File::create(&object_path).await);
        let mut writer = BufWriter::new(file);
        let size = copy_bytes(stream, file_writer.writer()).await?;
        let object_path = file_writer.done().await?;

        let size = copy_bytes(stream, &mut writer).await?;
        let md5_sum = hex(md5_hash.finalize());

        let checksum = checksum.finalize();
@@ -711,9 +707,7 @@ impl S3 for FileSystem {

        self.delete_upload_id(&upload_id).await?;

        let object_path = self.get_object_path(&bucket, &key)?;
        let file = try_!(fs::File::create(&object_path).await);
        let mut writer = BufWriter::new(file);
        let mut file_writer = self.prepare_file_write(&bucket, &key).await?;

        let mut cnt: i32 = 0;
        for part in multipart_upload.parts.into_iter().flatten() {
@@ -726,12 +720,12 @@ impl S3 for FileSystem {
            let part_path = self.resolve_abs_path(format!(".upload_id-{upload_id}.part-{part_number}"))?;

            let mut reader = try_!(fs::File::open(&part_path).await);
            let size = try_!(tokio::io::copy(&mut reader, &mut writer).await);
            let size = try_!(tokio::io::copy(&mut reader, &mut file_writer.writer()).await);

            debug!(from = %part_path.display(), to = %object_path.display(), ?size, "write file");
            debug!(from = %part_path.display(), tmp = %file_writer.tmp_path().display(), to = %file_writer.final_path().display(), ?size, "write file");
            try_!(fs::remove_file(&part_path).await);
        }
        drop(writer);
        let object_path = file_writer.done().await?;

        let file_size = try_!(fs::metadata(&object_path).await).len();
        let md5_sum = self.get_md5_sum(&bucket, &key).await?;