Unverified Commit 43aa13bc authored by Nugine's avatar Nugine Committed by GitHub
Browse files

s3s-fs checksum support (#76)

* s3s-fs: internal info

* s3s-fs: checksum

* deps: reduce features
parent ed6aac48
Loading
Loading
Loading
Loading
+7 −2
Original line number Diff line number Diff line
@@ -22,9 +22,12 @@ base64-simd = "0.8.0"
bytes = "1.4.0"
chrono = { version = "0.4.26", default-features = false, features = ["std", "clock"] }
clap = { version = "4.3.17", optional = true, features = ["derive"] }
crc32c = "0.6.4"
crc32fast = "1.3.2"
digest = "0.10.7"
futures = "0.3.28"
hex-simd = "0.8.0"
hyper = { version = "0.14.27", optional = true, features = ["full"] }
hyper = { version = "0.14.27", optional = true, features = ["http1", "http2", "server", "stream", "runtime"] }
md-5 = "0.10.5"
mime = "0.3.17"
nugine-rust-utils = "0.3.1"
@@ -32,6 +35,8 @@ numeric_cast = "0.2.1"
path-absolutize = "3.1.0"
s3s = { version = "0.6.2-dev", path = "../s3s" }
serde_json = "1.0.103"
sha1 = "0.10.5"
sha2 = "0.10.7"
thiserror = "1.0.43"
time = "0.3.23"
tokio = { version = "1.29.1", features = ["fs", "io-util"] }
@@ -44,7 +49,7 @@ uuid = { version = "1.4.1", features = ["v4"] }

[dev-dependencies]
anyhow = { version = "1.0.72", features = ["backtrace"] }
aws-config = "0.56.0"
aws-config = { version = "0.56.0", no-default-features = true }
aws-credential-types = { version = "0.56.0", features = ["test-util"] }
aws-sdk-s3 = "0.29.0"
once_cell = "1.18.0"
+89 −0
Original line number Diff line number Diff line
use std::hash::Hasher;

use digest::Digest;
use numeric_cast::TruncatingCast;
use rust_utils::default::default;

use crate::fs::InternalInfo;

#[derive(Default)]
pub struct ChecksumCalculator {
    pub crc32: Option<crc32fast::Hasher>,
    pub crc32c: Option<crc32c::Crc32cHasher>,
    pub sha1: Option<sha1::Sha1>,
    pub sha256: Option<sha2::Sha256>,
}

impl ChecksumCalculator {
    pub fn update(&mut self, data: &[u8]) {
        if let Some(crc32) = &mut self.crc32 {
            crc32.update(data);
        }
        if let Some(crc32c) = &mut self.crc32c {
            crc32c.write(data);
        }
        if let Some(sha1) = &mut self.sha1 {
            sha1.update(data);
        }
        if let Some(sha256) = &mut self.sha256 {
            sha256.update(data);
        }
    }

    pub fn finalize(self) -> s3s::dto::Checksum {
        let mut ans: s3s::dto::Checksum = default();
        if let Some(crc32) = self.crc32 {
            let sum = crc32.finalize().to_be_bytes();
            ans.checksum_crc32 = Some(base64(&sum));
        }
        if let Some(crc32c) = self.crc32c {
            let sum = crc32c.finish().truncating_cast::<u32>().to_be_bytes();
            ans.checksum_crc32c = Some(base64(&sum));
        }
        if let Some(sha1) = self.sha1 {
            let sum = sha1.finalize();
            ans.checksum_sha1 = Some(base64(sum.as_ref()));
        }
        if let Some(sha256) = self.sha256 {
            let sum = sha256.finalize();
            ans.checksum_sha256 = Some(base64(sum.as_ref()));
        }
        ans
    }
}

fn base64(input: &[u8]) -> String {
    base64_simd::STANDARD.encode_to_string(input)
}

pub fn modify_internal_info(info: &mut serde_json::Map<String, serde_json::Value>, checksum: &s3s::dto::Checksum) {
    if let Some(checksum_crc32) = &checksum.checksum_crc32 {
        info.insert("checksum_crc32".to_owned(), serde_json::Value::String(checksum_crc32.clone()));
    }
    if let Some(checksum_crc32c) = &checksum.checksum_crc32c {
        info.insert("checksum_crc32c".to_owned(), serde_json::Value::String(checksum_crc32c.clone()));
    }
    if let Some(checksum_sha1) = &checksum.checksum_sha1 {
        info.insert("checksum_sha1".to_owned(), serde_json::Value::String(checksum_sha1.clone()));
    }
    if let Some(checksum_sha256) = &checksum.checksum_sha256 {
        info.insert("checksum_sha256".to_owned(), serde_json::Value::String(checksum_sha256.clone()));
    }
}

pub fn from_internal_info(info: &InternalInfo) -> s3s::dto::Checksum {
    let mut ans: s3s::dto::Checksum = default();
    if let Some(checksum_crc32) = info.get("checksum_crc32") {
        ans.checksum_crc32 = Some(checksum_crc32.as_str().unwrap().to_owned());
    }
    if let Some(checksum_crc32c) = info.get("checksum_crc32c") {
        ans.checksum_crc32c = Some(checksum_crc32c.as_str().unwrap().to_owned());
    }
    if let Some(checksum_sha1) = info.get("checksum_sha1") {
        ans.checksum_sha1 = Some(checksum_sha1.as_str().unwrap().to_owned());
    }
    if let Some(checksum_sha256) = info.get("checksum_sha256") {
        ans.checksum_sha256 = Some(checksum_sha256.as_str().unwrap().to_owned());
    }
    ans
}
+25 −0
Original line number Diff line number Diff line
@@ -21,6 +21,8 @@ pub struct FileSystem {
    pub(crate) root: PathBuf,
}

pub(crate) type InternalInfo = serde_json::Map<String, serde_json::Value>;

impl FileSystem {
    pub fn new(root: impl AsRef<Path>) -> Result<Self> {
        let root = env::current_dir()?.join(root).canonicalize()?;
@@ -51,6 +53,12 @@ impl FileSystem {
        self.resolve_abs_path(file_path)
    }

    pub(crate) fn get_internal_info_path(&self, bucket: &str, key: &str) -> Result<PathBuf> {
        let encode = |s: &str| base64_simd::URL_SAFE_NO_PAD.encode_to_string(s);
        let file_path = format!(".bucket-{}.object-{}.internal.json", encode(bucket), encode(key));
        self.resolve_abs_path(file_path)
    }

    /// load metadata from fs
    pub(crate) async fn load_metadata(&self, bucket: &str, key: &str) -> Result<Option<dto::Metadata>> {
        let path = self.get_metadata_path(bucket, key)?;
@@ -70,6 +78,23 @@ impl FileSystem {
        Ok(())
    }

    pub(crate) async fn load_internal_info(&self, bucket: &str, key: &str) -> Result<Option<InternalInfo>> {
        let path = self.get_internal_info_path(bucket, key)?;
        if path.exists().not() {
            return Ok(None);
        }
        let content = fs::read(&path).await?;
        let map = serde_json::from_slice(&content)?;
        Ok(Some(map))
    }

    pub(crate) async fn save_internal_info(&self, bucket: &str, key: &str, info: &InternalInfo) -> Result<()> {
        let path = self.get_internal_info_path(bucket, key)?;
        let content = serde_json::to_vec(info)?;
        fs::write(&path, &content).await?;
        Ok(())
    }

    /// get md5 sum
    pub(crate) async fn get_md5_sum(&self, bucket: &str, key: &str) -> Result<String> {
        let object_path = self.get_object_path(bucket, key)?;
+2 −0
Original line number Diff line number Diff line
@@ -8,11 +8,13 @@
    clippy::wildcard_imports,
    clippy::missing_errors_doc, // TODO: docs
    clippy::let_underscore_untyped,
    clippy::module_name_repetitions,
)]

#[macro_use]
mod error;

mod checksum;
mod fs;
mod s3;
mod utils;
+57 −4
Original line number Diff line number Diff line
use crate::fs::FileSystem;
use crate::fs::InternalInfo;
use crate::utils::*;

use s3s::dto::*;
@@ -198,13 +199,24 @@ impl S3 for FileSystem {
        let object_metadata = self.load_metadata(&input.bucket, &input.key).await?;

        let md5_sum = self.get_md5_sum(&input.bucket, &input.key).await?;
        let e_tag = format!("\"{md5_sum}\"");

        let info = self.load_internal_info(&input.bucket, &input.key).await?;
        let checksum = match &info {
            Some(info) => crate::checksum::from_internal_info(info),
            None => default(),
        };

        let output = GetObjectOutput {
            body: Some(StreamingBlob::wrap(body)),
            content_length: content_length_i64,
            last_modified: Some(last_modified),
            metadata: object_metadata,
            e_tag: Some(format!("\"{md5_sum}\"")),
            e_tag: Some(e_tag),
            checksum_crc32: checksum.checksum_crc32,
            checksum_crc32c: checksum.checksum_crc32c,
            checksum_sha1: checksum.checksum_sha1,
            checksum_sha256: checksum.checksum_sha256,
            ..Default::default()
        };
        Ok(S3Response::new(output))
@@ -388,6 +400,20 @@ impl S3 for FileSystem {

        let Some(body) = body else { return Err(s3_error!(IncompleteBody)) };

        let mut checksum: crate::checksum::ChecksumCalculator = default();
        if input.checksum_crc32.is_some() {
            checksum.crc32 = Some(default());
        }
        if input.checksum_crc32c.is_some() {
            checksum.crc32c = Some(default());
        }
        if input.checksum_sha1.is_some() {
            checksum.sha1 = Some(default());
        }
        if input.checksum_sha256.is_some() {
            checksum.sha256 = Some(default());
        }

        if key.ends_with('/') {
            if let Some(len) = content_length {
                if len > 0 {
@@ -406,7 +432,10 @@ impl S3 for FileSystem {
        }

        let mut md5_hash = Md5::new();
        let stream = body.inspect_ok(|bytes| md5_hash.update(bytes.as_ref()));
        let stream = body.inspect_ok(|bytes| {
            md5_hash.update(bytes.as_ref());
            checksum.update(bytes.as_ref());
        });

        let file = try_!(fs::File::create(&object_path).await);
        let mut writer = BufWriter::new(file);
@@ -414,14 +443,38 @@ impl S3 for FileSystem {
        let size = copy_bytes(stream, &mut writer).await?;
        let md5_sum = hex(md5_hash.finalize());

        debug!(path = %object_path.display(), ?size, %md5_sum, "write file");
        let checksum = checksum.finalize();
        if checksum.checksum_crc32 != input.checksum_crc32 {
            return Err(s3_error!(BadDigest, "checksum_crc32 mismatch"));
        }
        if checksum.checksum_crc32c != input.checksum_crc32c {
            return Err(s3_error!(BadDigest, "checksum_crc32c mismatch"));
        }
        if checksum.checksum_sha1 != input.checksum_sha1 {
            return Err(s3_error!(BadDigest, "checksum_sha1 mismatch"));
        }
        if checksum.checksum_sha256 != input.checksum_sha256 {
            return Err(s3_error!(BadDigest, "checksum_sha256 mismatch"));
        }

        debug!(path = %object_path.display(), ?size, %md5_sum, ?checksum, "write file");

        if let Some(ref metadata) = metadata {
            self.save_metadata(&bucket, &key, metadata).await?;
        }

        let mut info: InternalInfo = default();
        crate::checksum::modify_internal_info(&mut info, &checksum);
        self.save_internal_info(&bucket, &key, &info).await?;

        let e_tag = format!("\"{md5_sum}\"");

        let output = PutObjectOutput {
            e_tag: Some(format!("\"{md5_sum}\"")),
            e_tag: Some(e_tag),
            checksum_crc32: checksum.checksum_crc32,
            checksum_crc32c: checksum.checksum_crc32c,
            checksum_sha1: checksum.checksum_sha1,
            checksum_sha256: checksum.checksum_sha256,
            ..Default::default()
        };
        Ok(S3Response::new(output))
Loading