Loading crates/s3s-fs/src/fs.rs +45 −6 Original line number Diff line number Diff line use crate::error::*; use crate::utils::hex; use s3s::auth::Credentials; use s3s::dto; use std::env; Loading @@ -13,6 +14,7 @@ use tokio::io::AsyncReadExt; use md5::{Digest, Md5}; use path_absolutize::Absolutize; use uuid::Uuid; #[derive(Debug)] pub struct FileSystem { Loading @@ -25,27 +27,28 @@ impl FileSystem { Ok(Self { root }) } pub(crate) fn resolve_abs_path(&self, path: impl AsRef<Path>) -> Result<PathBuf> { Ok(path.as_ref().absolutize_virtually(&self.root)?.into_owned()) } /// resolve object path under the virtual root pub(crate) fn get_object_path(&self, bucket: &str, key: &str) -> Result<PathBuf> { let dir = Path::new(&bucket); let file_path = Path::new(&key); let ans = dir.join(file_path).absolutize_virtually(&self.root)?.into(); Ok(ans) self.resolve_abs_path(dir.join(file_path)) } /// resolve bucket path under the virtual root pub(crate) fn get_bucket_path(&self, bucket: &str) -> Result<PathBuf> { let dir = Path::new(&bucket); let ans = dir.absolutize_virtually(&self.root)?.into(); Ok(ans) self.resolve_abs_path(dir) } /// resolve metadata path under the virtual root (custom format) pub(crate) fn get_metadata_path(&self, bucket: &str, key: &str) -> Result<PathBuf> { let encode = |s: &str| base64_simd::URL_SAFE_NO_PAD.encode_to_string(s); let file_path = format!(".bucket-{}.object-{}.metadata.json", encode(bucket), encode(key)); let ans = Path::new(&file_path).absolutize_virtually(&self.root)?.into(); Ok(ans) self.resolve_abs_path(file_path) } /// load metadata from fs Loading Loading @@ -82,4 +85,40 @@ impl FileSystem { } Ok(hex(md5_hash.finalize())) } fn get_upload_info_path(&self, upload_id: &Uuid) -> Result<PathBuf> { self.resolve_abs_path(format!(".upload-{upload_id}.json")) } pub(crate) async fn create_upload_id(&self, cred: Option<&Credentials>) -> Result<Uuid> { let upload_id = Uuid::new_v4(); let upload_info_path = self.get_upload_info_path(&upload_id)?; let ak: Option<&str> = cred.map(|c| c.access_key.as_str()); let content = serde_json::to_vec(&ak)?; fs::write(&upload_info_path, &content).await?; Ok(upload_id) } pub(crate) async fn verify_upload_id(&self, cred: Option<&Credentials>, upload_id: &Uuid) -> Result<bool> { let upload_info_path = self.get_upload_info_path(upload_id)?; if upload_info_path.exists().not() { return Ok(false); } let content = fs::read(&upload_info_path).await?; let ak: Option<String> = serde_json::from_slice(&content)?; Ok(ak.as_deref() == cred.map(|c| c.access_key.as_str())) } pub(crate) async fn delete_upload_id(&self, upload_id: &Uuid) -> Result<()> { let upload_info_path = self.get_upload_info_path(upload_id)?; if upload_info_path.exists() { fs::remove_file(&upload_info_path).await?; } Ok(()) } } crates/s3s-fs/src/s3.rs +15 −14 Original line number Diff line number Diff line Loading @@ -11,7 +11,6 @@ use std::collections::VecDeque; use std::io; use std::ops::Neg; use std::ops::Not; use std::path::Path; use std::path::PathBuf; use tokio::fs; Loading @@ -22,9 +21,9 @@ use tokio_util::io::ReaderStream; use futures::TryStreamExt; use md5::{Digest, Md5}; use numeric_cast::NumericCast; use path_absolutize::Absolutize; use rust_utils::default::default; use tracing::debug; use uuid::Uuid; #[async_trait::async_trait] impl S3 for FileSystem { Loading Loading @@ -426,12 +425,12 @@ impl S3 for FileSystem { req: S3Request<CreateMultipartUploadInput>, ) -> S3Result<S3Response<CreateMultipartUploadOutput>> { let input = req.input; let upload_id = uuid::Uuid::new_v4().to_string(); let upload_id = self.create_upload_id(req.credentials.as_ref()).await?; let output = CreateMultipartUploadOutput { bucket: Some(input.bucket), key: Some(input.key), upload_id: Some(upload_id), upload_id: Some(upload_id.to_string()), ..Default::default() }; Loading @@ -449,12 +448,12 @@ impl S3 for FileSystem { let body = body.ok_or_else(|| s3_error!(IncompleteBody))?; if uuid::Uuid::parse_str(&upload_id).is_err() { return Err(s3_error!(InvalidRequest)); let upload_id = Uuid::parse_str(&upload_id).map_err(|_| s3_error!(InvalidRequest))?; if self.verify_upload_id(req.credentials.as_ref(), &upload_id).await?.not() { return Err(s3_error!(AccessDenied)); } let file_path_str = format!(".upload_id-{upload_id}.part-{part_number}"); let file_path = try_!(Path::new(&file_path_str).absolutize_virtually(&self.root)); let file_path = self.resolve_abs_path(format!(".upload_id-{upload_id}.part-{part_number}"))?; let mut md5_hash = Md5::new(); let stream = body.inspect_ok(|bytes| md5_hash.update(bytes.as_ref())); Loading Loading @@ -489,6 +488,13 @@ impl S3 for FileSystem { let Some(multipart_upload) = multipart_upload else { return Err(s3_error!(InvalidPart)) }; let upload_id = Uuid::parse_str(&upload_id).map_err(|_| s3_error!(InvalidRequest))?; if self.verify_upload_id(req.credentials.as_ref(), &upload_id).await?.not() { return Err(s3_error!(AccessDenied)); } self.delete_upload_id(&upload_id).await?; let object_path = self.get_object_path(&bucket, &key)?; let file = try_!(fs::File::create(&object_path).await); let mut writer = BufWriter::new(file); Loading @@ -501,12 +507,7 @@ impl S3 for FileSystem { return Err(s3_error!(InvalidRequest, "invalid part order")); } if uuid::Uuid::parse_str(&upload_id).is_err() { return Err(s3_error!(InvalidRequest)); } let part_path_str = format!(".upload_id-{upload_id}.part-{part_number}"); let part_path = try_!(Path::new(&part_path_str).absolutize_virtually(&self.root)); let part_path = self.resolve_abs_path(format!(".upload_id-{upload_id}.part-{part_number}"))?; let mut reader = try_!(fs::File::open(&part_path).await); let size = try_!(tokio::io::copy(&mut reader, &mut writer).await); Loading Loading
crates/s3s-fs/src/fs.rs +45 −6 Original line number Diff line number Diff line use crate::error::*; use crate::utils::hex; use s3s::auth::Credentials; use s3s::dto; use std::env; Loading @@ -13,6 +14,7 @@ use tokio::io::AsyncReadExt; use md5::{Digest, Md5}; use path_absolutize::Absolutize; use uuid::Uuid; #[derive(Debug)] pub struct FileSystem { Loading @@ -25,27 +27,28 @@ impl FileSystem { Ok(Self { root }) } pub(crate) fn resolve_abs_path(&self, path: impl AsRef<Path>) -> Result<PathBuf> { Ok(path.as_ref().absolutize_virtually(&self.root)?.into_owned()) } /// resolve object path under the virtual root pub(crate) fn get_object_path(&self, bucket: &str, key: &str) -> Result<PathBuf> { let dir = Path::new(&bucket); let file_path = Path::new(&key); let ans = dir.join(file_path).absolutize_virtually(&self.root)?.into(); Ok(ans) self.resolve_abs_path(dir.join(file_path)) } /// resolve bucket path under the virtual root pub(crate) fn get_bucket_path(&self, bucket: &str) -> Result<PathBuf> { let dir = Path::new(&bucket); let ans = dir.absolutize_virtually(&self.root)?.into(); Ok(ans) self.resolve_abs_path(dir) } /// resolve metadata path under the virtual root (custom format) pub(crate) fn get_metadata_path(&self, bucket: &str, key: &str) -> Result<PathBuf> { let encode = |s: &str| base64_simd::URL_SAFE_NO_PAD.encode_to_string(s); let file_path = format!(".bucket-{}.object-{}.metadata.json", encode(bucket), encode(key)); let ans = Path::new(&file_path).absolutize_virtually(&self.root)?.into(); Ok(ans) self.resolve_abs_path(file_path) } /// load metadata from fs Loading Loading @@ -82,4 +85,40 @@ impl FileSystem { } Ok(hex(md5_hash.finalize())) } fn get_upload_info_path(&self, upload_id: &Uuid) -> Result<PathBuf> { self.resolve_abs_path(format!(".upload-{upload_id}.json")) } pub(crate) async fn create_upload_id(&self, cred: Option<&Credentials>) -> Result<Uuid> { let upload_id = Uuid::new_v4(); let upload_info_path = self.get_upload_info_path(&upload_id)?; let ak: Option<&str> = cred.map(|c| c.access_key.as_str()); let content = serde_json::to_vec(&ak)?; fs::write(&upload_info_path, &content).await?; Ok(upload_id) } pub(crate) async fn verify_upload_id(&self, cred: Option<&Credentials>, upload_id: &Uuid) -> Result<bool> { let upload_info_path = self.get_upload_info_path(upload_id)?; if upload_info_path.exists().not() { return Ok(false); } let content = fs::read(&upload_info_path).await?; let ak: Option<String> = serde_json::from_slice(&content)?; Ok(ak.as_deref() == cred.map(|c| c.access_key.as_str())) } pub(crate) async fn delete_upload_id(&self, upload_id: &Uuid) -> Result<()> { let upload_info_path = self.get_upload_info_path(upload_id)?; if upload_info_path.exists() { fs::remove_file(&upload_info_path).await?; } Ok(()) } }
crates/s3s-fs/src/s3.rs +15 −14 Original line number Diff line number Diff line Loading @@ -11,7 +11,6 @@ use std::collections::VecDeque; use std::io; use std::ops::Neg; use std::ops::Not; use std::path::Path; use std::path::PathBuf; use tokio::fs; Loading @@ -22,9 +21,9 @@ use tokio_util::io::ReaderStream; use futures::TryStreamExt; use md5::{Digest, Md5}; use numeric_cast::NumericCast; use path_absolutize::Absolutize; use rust_utils::default::default; use tracing::debug; use uuid::Uuid; #[async_trait::async_trait] impl S3 for FileSystem { Loading Loading @@ -426,12 +425,12 @@ impl S3 for FileSystem { req: S3Request<CreateMultipartUploadInput>, ) -> S3Result<S3Response<CreateMultipartUploadOutput>> { let input = req.input; let upload_id = uuid::Uuid::new_v4().to_string(); let upload_id = self.create_upload_id(req.credentials.as_ref()).await?; let output = CreateMultipartUploadOutput { bucket: Some(input.bucket), key: Some(input.key), upload_id: Some(upload_id), upload_id: Some(upload_id.to_string()), ..Default::default() }; Loading @@ -449,12 +448,12 @@ impl S3 for FileSystem { let body = body.ok_or_else(|| s3_error!(IncompleteBody))?; if uuid::Uuid::parse_str(&upload_id).is_err() { return Err(s3_error!(InvalidRequest)); let upload_id = Uuid::parse_str(&upload_id).map_err(|_| s3_error!(InvalidRequest))?; if self.verify_upload_id(req.credentials.as_ref(), &upload_id).await?.not() { return Err(s3_error!(AccessDenied)); } let file_path_str = format!(".upload_id-{upload_id}.part-{part_number}"); let file_path = try_!(Path::new(&file_path_str).absolutize_virtually(&self.root)); let file_path = self.resolve_abs_path(format!(".upload_id-{upload_id}.part-{part_number}"))?; let mut md5_hash = Md5::new(); let stream = body.inspect_ok(|bytes| md5_hash.update(bytes.as_ref())); Loading Loading @@ -489,6 +488,13 @@ impl S3 for FileSystem { let Some(multipart_upload) = multipart_upload else { return Err(s3_error!(InvalidPart)) }; let upload_id = Uuid::parse_str(&upload_id).map_err(|_| s3_error!(InvalidRequest))?; if self.verify_upload_id(req.credentials.as_ref(), &upload_id).await?.not() { return Err(s3_error!(AccessDenied)); } self.delete_upload_id(&upload_id).await?; let object_path = self.get_object_path(&bucket, &key)?; let file = try_!(fs::File::create(&object_path).await); let mut writer = BufWriter::new(file); Loading @@ -501,12 +507,7 @@ impl S3 for FileSystem { return Err(s3_error!(InvalidRequest, "invalid part order")); } if uuid::Uuid::parse_str(&upload_id).is_err() { return Err(s3_error!(InvalidRequest)); } let part_path_str = format!(".upload_id-{upload_id}.part-{part_number}"); let part_path = try_!(Path::new(&part_path_str).absolutize_virtually(&self.root)); let part_path = self.resolve_abs_path(format!(".upload_id-{upload_id}.part-{part_number}"))?; let mut reader = try_!(fs::File::open(&part_path).await); let size = try_!(tokio::io::copy(&mut reader, &mut writer).await); Loading