From f03019083545f10147b8e61288d5215a55207960 Mon Sep 17 00:00:00 2001 From: pirDOL Date: Tue, 30 Dec 2025 19:10:48 +0800 Subject: [PATCH] feat(dfget): add support for digest-manifest (#1561) Signed-off-by: pirDOL --- dragonfly-client-core/src/error/mod.rs | 8 + dragonfly-client/src/bin/dfget/main.rs | 555 ++++++++++++++++++++++++- 2 files changed, 555 insertions(+), 8 deletions(-) diff --git a/dragonfly-client-core/src/error/mod.rs b/dragonfly-client-core/src/error/mod.rs index 889ae1075..568b09690 100644 --- a/dragonfly-client-core/src/error/mod.rs +++ b/dragonfly-client-core/src/error/mod.rs @@ -238,6 +238,14 @@ pub enum DFError { #[error("max number of files to download exceeded: {0}")] MaxDownloadFilesExceeded(usize), + /// DigestManifestParseFailed is the error for digest manifest parsing failure. + #[error("digest manifest {filepath} parse failed: {error}")] + DigestManifestParseFailed { filepath: String, error: String }, + + /// MissingDigestEntry is the error when a non-directory entry has no corresponding digest. + #[error("missing digest for entry: {entry_url}")] + MissingDigestEntry { entry_url: String }, + /// Unsupported is the error for unsupported. #[error("unsupported {0}")] Unsupported(String), diff --git a/dragonfly-client/src/bin/dfget/main.rs b/dragonfly-client/src/bin/dfget/main.rs index 731c50f58..226a0d4c2 100644 --- a/dragonfly-client/src/bin/dfget/main.rs +++ b/dragonfly-client/src/bin/dfget/main.rs @@ -13,7 +13,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - use bytesize::ByteSize; use clap::Parser; use dragonfly_api::common::v2::{Download, Hdfs, ObjectStorage, TaskType}; @@ -31,15 +30,16 @@ use dragonfly_client_config::{self, dfdaemon, dfget}; use dragonfly_client_core::error::{ErrorType, OrErr}; use dragonfly_client_core::{Error, Result}; use dragonfly_client_util::{ - fs::fallocate, http::header_vec_to_hashmap, + digest::Algorithm, fs::fallocate, http::header_vec_to_hashmap, http::query_params::default_proxy_rule_filtered_query_params, }; + use glob::Pattern; use indicatif::{MultiProgress, ProgressBar, ProgressDrawTarget, ProgressState, ProgressStyle}; use local_ip_address::local_ip; use path_absolutize::*; use percent_encoding::percent_decode_str; -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::path::{Component, Path, PathBuf}; use std::str::FromStr; use std::sync::Arc; @@ -83,6 +83,13 @@ Examples: # Download a file from Tencent Cloud Object Storage Service(COS). $ dfget cos:/// -O /tmp/file.txt --storage-access-key-id= --storage-access-key-secret= --storage-endpoint= + + # Download directory with digest verification + $ dfget -O /path/to/output --digest-manifest /path/to/checksums.txt https://example.com/files/ + + # The digest manifest file can be downloaded from the same source + $ dfget -O /tmp/checksums.txt https://example.com/files/checksums.txt + $ dfget -O /path/to/output --digest-manifest /tmp/checksums.txt https://example.com/files/ "#; #[derive(Debug, Parser, Clone)] @@ -163,6 +170,13 @@ struct Args { )] digest: Option, + #[arg( + long = "digest-manifest", + required = false, + help = "Verify the integrity of the downloaded files in directory using the specified digest manifest file. The file format should be compatible with sha256sum output: : . If the digest manifest file format is invalid or file cannot be found in the directory, dfget will fail without triggering download." + )] + digest_manifest: Option, + #[arg( short = 'p', long = "priority", @@ -640,6 +654,120 @@ async fn run(mut args: Args, dfdaemon_download_client: DfdaemonDownloadClient) - download(args, progress_bar, dfdaemon_download_client).await } +/// Parses a digest manifest file and returns a mapping from file paths to their digests. +/// +/// The digest manifest file format should be compatible with sha256sum output: +/// : +/// If the file path in manifest is invalid or duplicate, the parsing will fail and return error. +async fn parse_digest_manifest(url: &Url, manifest_path: &Path) -> Result> { + let content = fs::read_to_string(manifest_path).await.map_err(|err| { + Error::DigestManifestParseFailed { + filepath: manifest_path.to_string_lossy().to_string(), + error: err.to_string(), + } + })?; + + let mut digests = HashMap::new(); + + for (line_num, line) in content.lines().enumerate() { + let line = line.trim(); + if line.is_empty() { + continue; + } + + // Parse line: : + let (digest_part, filepath_part) = match line.split_once(char::is_whitespace) { + Some((digest, filepath)) if !filepath.trim().is_empty() => (digest, filepath), + Some(_) | None => { + return Err(Error::DigestManifestParseFailed { + filepath: manifest_path.to_string_lossy().to_string(), + error: format!( + "invalid digest manifest format at line {}: expected ': '", + line_num + ), + }); + } + }; + + let mut filepath = filepath_part.trim().to_string(); + + if !digest_part.contains(':') { + return Err(Error::DigestManifestParseFailed { + filepath: manifest_path.to_string_lossy().to_string(), + error: format!( + "invalid digest format at line {}: expected ':'", + line_num + ), + }); + } + + // Validate the digest format using the existing Algorithm::from_str + let (algorithm_str, _digest) = digest_part.split_once(':').unwrap(); + if Algorithm::from_str(algorithm_str).is_err() { + return Err(Error::DigestManifestParseFailed { + filepath: manifest_path.to_string_lossy().to_string(), + error: format!( + "unsupported digest algorithm '{}' at line {}", + algorithm_str, line_num + ), + }); + } + + // Remove trailing '/' from URL if present + let base_url = url.as_str().trim_end_matches('/'); + + // Combine base URL and filepath to form the key + // For absolute paths, use the full absolute path + // For relative paths, combine with base URL after removing './' prefix + let key = if filepath.starts_with('/') { + // Absolute path - use as is + filepath + } else { + // Relative path - remove './' prefix if present and combine with base URL + if filepath.starts_with("./") { + filepath = filepath[2..].to_string(); + } + format!("{}/{}", base_url, filepath) + }; + + digests.insert(key, digest_part.to_string()); + } + + if digests.is_empty() { + Err(Error::DigestManifestParseFailed { + filepath: manifest_path.to_string_lossy().to_string(), + error: "digest manifest file is empty or contains no valid entries".to_string(), + }) + } else { + info!( + "parsed {} digests from manifest {}", + digests.len(), + manifest_path.display() + ); + Ok(digests) + } +} + +/// Checks that all file entries have corresponding digests in the manifest. +/// Note: The caller must ensure that all entries in `file_entries` are non-directory files (is_dir = false). +fn check_every_file_entry_has_a_digest( + file_entries: &[DirEntry], + digests: &HashMap, +) -> Result<()> { + for entry in file_entries { + // Check if the entry URL exists in digests hashmap using exact match + if !digests.contains_key(&entry.url) { + return Err(Error::MissingDigestEntry { + entry_url: entry.url.clone(), + }); + } else { + info!("found digest for file: {}", entry.url); + } + } + + Ok(()) +} + /// Downloads all files in a directory from various storage backends (object storage, HDFS, etc.). /// /// This function handles directory-based downloads by recursively fetching all entries @@ -681,12 +809,27 @@ async fn download_dir(args: Args, download_client: DfdaemonDownloadClient) -> Re return Ok(()); } + // Filter file entries (non-directories) for reuse in subsequent operations + let file_entries: Vec = entries + .iter() + .filter(|entry| !entry.is_dir) + .cloned() + .collect(); + // If the actual file count is greater than the max_files, then reject the downloading. - let count = entries.iter().filter(|entry| !entry.is_dir).count(); - if count > args.max_files { - return Err(Error::MaxDownloadFilesExceeded(count)); + if file_entries.len() > args.max_files { + return Err(Error::MaxDownloadFilesExceeded(file_entries.len())); } + // If digest_manifest is provided, parse and verify digest entries after checking file count limits + let digest_manifest = if let Some(ref digest_manifest_path) = args.digest_manifest { + let digest_manifest = parse_digest_manifest(&args.url, digest_manifest_path).await?; + check_every_file_entry_has_a_digest(&file_entries, &digest_manifest)?; + Some(digest_manifest) + } else { + None + }; + // Initialize the multi progress bar. let multi_progress_bar = if args.no_progress { let multi_progress = MultiProgress::new(); @@ -713,8 +856,16 @@ async fn download_dir(args: Args, download_client: DfdaemonDownloadClient) -> Re })?; } else { let mut entry_args = args.clone(); - entry_args.output = make_output_by_entry(args.url.clone(), &args.output, entry)?; - entry_args.url = entry_url; + entry_args.output = + make_output_by_entry(args.url.clone(), &args.output, entry.clone())?; + entry_args.url = entry_url.clone(); + + // If digest_manifest was provided, set the digest for this specific file + if let Some(ref digest_manifest) = digest_manifest { + if let Some(digest) = digest_manifest.get(&entry.url) { + entry_args.digest = Some(digest.clone()); + } + } let progress_bar = multi_progress_bar.add(ProgressBar::new(0)); let download_client = download_client.clone(); @@ -1822,4 +1973,392 @@ mod tests { .collect::>() ); } + + // Parse digest manifest tests with new function signature and async support + #[tokio::test] + async fn should_parse_digest_manifest_success() { + use std::io::Write; + let temp_dir = tempfile::tempdir().unwrap(); + let manifest_path = temp_dir.path().join("checksums.txt"); + + let url = Url::parse("http://example.com/files/").unwrap(); + + let manifest_content = r#"sha256:f1972cdafdcfe4d06288dfc31e49c86c877c4d530062d05860ec4460cf5697f3 ./file1.txt +sha256:0a2da6af39432ffaf2b8b2bf3516c54f9333c2286c9b4f3f6b4801712985dfa5 ./subdir/file2.txt"#; + + let mut file = std::fs::File::create(&manifest_path).unwrap(); + file.write_all(manifest_content.as_bytes()).unwrap(); + + let result = parse_digest_manifest(&url, &manifest_path).await; + assert!(result.is_ok()); + + let digests = result.unwrap(); + assert_eq!(digests.len(), 2); + assert_eq!( + digests.get("http://example.com/files/file1.txt"), + Some( + &"sha256:f1972cdafdcfe4d06288dfc31e49c86c877c4d530062d05860ec4460cf5697f3" + .to_string() + ) + ); + assert_eq!( + digests.get("http://example.com/files/subdir/file2.txt"), + Some( + &"sha256:0a2da6af39432ffaf2b8b2bf3516c54f9333c2286c9b4f3f6b4801712985dfa5" + .to_string() + ) + ); + } + + #[tokio::test] + async fn should_parse_digest_manifest_path_processing() { + use std::io::Write; + let temp_dir = tempfile::tempdir().unwrap(); + let manifest_path = temp_dir.path().join("path_processing.txt"); + let url = Url::parse("http://example.com/base/").unwrap(); + + let manifest_content = r#"sha256:test1 ./file1.txt +sha256:test2 file2.txt +sha256:test3 subdir/file3.txt"#; + + let mut file = std::fs::File::create(&manifest_path).unwrap(); + file.write_all(manifest_content.as_bytes()).unwrap(); + + let result = parse_digest_manifest(&url, &manifest_path).await; + assert!(result.is_ok()); + + let digests = result.unwrap(); + assert_eq!(digests.len(), 3); + assert!(digests.contains_key("http://example.com/base/file1.txt")); + assert!(digests.contains_key("http://example.com/base/file2.txt")); + assert!(digests.contains_key("http://example.com/base/subdir/file3.txt")); + } + + #[tokio::test] + async fn should_parse_digest_manifest_url_trailing_slash() { + use std::io::Write; + let temp_dir = tempfile::tempdir().unwrap(); + let manifest_path = temp_dir.path().join("trailing_slash.txt"); + let url = Url::parse("http://example.com/base/").unwrap(); // URL has trailing slash + + let manifest_content = "sha256:test1 file.txt"; + + let mut file = std::fs::File::create(&manifest_path).unwrap(); + file.write_all(manifest_content.as_bytes()).unwrap(); + + let result = parse_digest_manifest(&url, &manifest_path).await; + assert!(result.is_ok()); + + let digests = result.unwrap(); + assert_eq!(digests.len(), 1); + // URL trailing slash should be preserved in key generation + assert!(digests.contains_key("http://example.com/base/file.txt")); + } + + #[tokio::test] + async fn should_parse_digest_manifest_missing_filepath() { + use std::io::Write; + let temp_dir = tempfile::tempdir().unwrap(); + let manifest_path = temp_dir.path().join("invalid_checksums.txt"); + let url = Url::parse("http://example.com/files/").unwrap(); + + // Invalid format - missing filepath + let manifest_content = + "sha256:1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef"; + + let mut file = std::fs::File::create(&manifest_path).unwrap(); + file.write_all(manifest_content.as_bytes()).unwrap(); + + let result = parse_digest_manifest(&url, &manifest_path).await; + assert!(result.is_err()); + + // Verify the error message is about invalid format + if let Err(Error::DigestManifestParseFailed { filepath: _, error }) = result { + assert!(error.contains("invalid digest manifest format")); + assert!(error.contains("expected ': '")); + } else { + panic!("Expected DigestManifestParseFailed error"); + } + } + + #[tokio::test] + async fn should_parse_digest_manifest_empty_filepath() { + use std::io::Write; + let temp_dir = tempfile::tempdir().unwrap(); + let manifest_path = temp_dir.path().join("empty_filepath.txt"); + let url = Url::parse("http://example.com/files/").unwrap(); + + let manifest_content = + "sha256:1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef "; + + let mut file = std::fs::File::create(&manifest_path).unwrap(); + file.write_all(manifest_content.as_bytes()).unwrap(); + + let result = parse_digest_manifest(&url, &manifest_path).await; + assert!(result.is_err()); + + // Verify the error message is about invalid format + if let Err(Error::DigestManifestParseFailed { filepath: _, error }) = result { + assert!(error.contains("invalid digest manifest format")); + assert!(error.contains("expected ': '")); + } else { + panic!("Expected DigestManifestParseFailed error"); + } + } + + #[tokio::test] + async fn should_parse_digest_manifest_empty_file() { + use std::io::Write; + let temp_dir = tempfile::tempdir().unwrap(); + let manifest_path = temp_dir.path().join("empty_checksums.txt"); + let url = Url::parse("http://example.com/files/").unwrap(); + + let manifest_content = ""; + + let mut file = std::fs::File::create(&manifest_path).unwrap(); + file.write_all(manifest_content.as_bytes()).unwrap(); + + let result = parse_digest_manifest(&url, &manifest_path).await; + assert!(result.is_err()); + } + + #[tokio::test] + async fn should_parse_digest_manifest_missing_colon() { + use std::io::Write; + let temp_dir = tempfile::tempdir().unwrap(); + let manifest_path = temp_dir.path().join("missing_colon.txt"); + let url = Url::parse("http://example.com/files/").unwrap(); + + // Missing colon in digest format + let manifest_content = + "sha2561234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef ./file.txt"; + + let mut file = std::fs::File::create(&manifest_path).unwrap(); + file.write_all(manifest_content.as_bytes()).unwrap(); + + let result = parse_digest_manifest(&url, &manifest_path).await; + assert!(result.is_err()); + } + + #[tokio::test] + async fn should_parse_digest_manifest_unsupported_algorithm() { + use std::io::Write; + let temp_dir = tempfile::tempdir().unwrap(); + let manifest_path = temp_dir.path().join("invalid_algo.txt"); + let url = Url::parse("http://example.com/files/").unwrap(); + + // Invalid algorithm format + let manifest_content = "invalid-algorithm:1234567890abcdef ./file.txt"; + + let mut file = std::fs::File::create(&manifest_path).unwrap(); + file.write_all(manifest_content.as_bytes()).unwrap(); + + let result = parse_digest_manifest(&url, &manifest_path).await; + assert!(result.is_err()); + } + + #[tokio::test] + async fn should_parse_digest_manifest_file_not_found() { + // Test file that doesn't exist + let temp_dir = tempfile::tempdir().unwrap(); + let manifest_path = temp_dir.path().join("non_existent_file.txt"); + let url = Url::parse("http://example.com/files/").unwrap(); + + let result = parse_digest_manifest(&url, &manifest_path).await; + assert!(result.is_err()); + } + + #[test] + fn should_parse_digest_manifest_validate_directory_digests() { + let entries = [ + DirEntry { + url: "http://example.com/files/file1.txt".to_string(), + content_length: 100, + is_dir: false, + }, + DirEntry { + url: "http://example.com/files/subdir/file2.txt".to_string(), + content_length: 200, + is_dir: false, + }, + ]; + + let digests: HashMap = vec![ + ( + "http://example.com/files/file1.txt".to_string(), + "sha256:test1".to_string(), + ), + ( + "http://example.com/files/subdir/file2.txt".to_string(), + "sha256:test2".to_string(), + ), + ] + .into_iter() + .collect(); + + // Note: Now we need to pass only file entries (non-directories) + let file_entries: Vec = entries + .iter() + .filter(|entry| !entry.is_dir) + .cloned() + .collect(); + let result = check_every_file_entry_has_a_digest(&file_entries, &digests); + assert!( + result.is_ok(), + "validation failed with error: {:?}", + result.err() + ); + } + + #[test] + fn should_parse_digest_manifest_fail_validation_when_missing() { + let entries = [ + DirEntry { + url: "http://example.com/files/file1.txt".to_string(), + content_length: 100, + is_dir: false, + }, + DirEntry { + url: "http://example.com/files/file2.txt".to_string(), // Missing in digests + content_length: 200, + is_dir: false, + }, + ]; + + let digests: HashMap = vec![ + ("./file1.txt".to_string(), "sha256:test1".to_string()), + // file2.txt digest is missing + ] + .into_iter() + .collect(); + + // Now we need to pass only file entries (non-directories) to match the function's new signature + // Filter to only file entries as required by the function + let file_entries: Vec = entries + .iter() + .filter(|entry| !entry.is_dir) + .cloned() + .collect(); + let result = check_every_file_entry_has_a_digest(&file_entries, &digests); + assert!(result.is_err()); + } + + #[test] + fn should_parse_digest_manifest_missing_colon_in_digest() { + use std::io::Write; + let temp_dir = tempfile::tempdir().unwrap(); + let manifest_path = temp_dir.path().join("missing_colon.txt"); + + // Missing colon in digest format + let manifest_content = + "sha2561234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef ./file.txt"; + + let mut file = std::fs::File::create(&manifest_path).unwrap(); + file.write_all(manifest_content.as_bytes()).unwrap(); + } + + #[tokio::test] + async fn should_parse_digest_manifest_different_path_formats() { + use std::io::Write; + let temp_dir = tempfile::tempdir().unwrap(); + let manifest_path = temp_dir.path().join("different_paths.txt"); + let url = Url::parse("http://example.com/files/").unwrap(); + + // Test different path formats: absolute, relative with ./, relative without ./, with dots + let manifest_content = r#"sha256:test1 /absolute/path/file1.txt +sha256:test2 ./relative/path/file2.txt +sha256:test3 relative/path/file3.txt +sha256:test4 .hidden/file4.txt"#; + + let mut file = std::fs::File::create(&manifest_path).unwrap(); + file.write_all(manifest_content.as_bytes()).unwrap(); + + let result = parse_digest_manifest(&url, &manifest_path).await; + assert!(result.is_ok()); + + let digests = result.unwrap(); + assert_eq!(digests.len(), 4); + // Absolute paths remain unchanged, relative paths get URL prefix + assert!(digests.contains_key("/absolute/path/file1.txt")); + assert!(digests.contains_key("http://example.com/files/relative/path/file2.txt")); + assert!(digests.contains_key("http://example.com/files/relative/path/file3.txt")); + assert!(digests.contains_key("http://example.com/files/.hidden/file4.txt")); + } + + #[test] + fn should_check_every_file_entry_has_a_digest_success_case() { + // Test success path: all entry URLs exist in digests + let entries = vec![ + DirEntry { + url: "http://example.com/file1.txt".to_string(), + content_length: 100, + is_dir: false, + }, + DirEntry { + url: "http://example.com/path/to/file2.txt".to_string(), + content_length: 200, + is_dir: false, + }, + ]; + + let digests: HashMap = vec![ + ( + "http://example.com/file1.txt".to_string(), + "sha256:digest1".to_string(), + ), + ( + "http://example.com/path/to/file2.txt".to_string(), + "sha256:digest2".to_string(), + ), + ] + .into_iter() + .collect(); + + assert!(check_every_file_entry_has_a_digest(&entries, &digests).is_ok()); + } + + #[test] + fn should_check_every_file_entry_has_a_digest_failure_case() { + // Test failure path: one entry URL is missing in digests + let entries = vec![ + DirEntry { + url: "http://example.com/found.txt".to_string(), + content_length: 100, + is_dir: false, + }, + DirEntry { + url: "http://example.com/missing.txt".to_string(), + content_length: 200, + is_dir: false, + }, + ]; + + let digests: HashMap = vec![ + ( + "http://example.com/found.txt".to_string(), + "sha256:digest1".to_string(), + ), + // missing.txt digest is intentionally omitted + ] + .into_iter() + .collect(); + + let result = check_every_file_entry_has_a_digest(&entries, &digests); + assert!(result.is_err()); + if let Error::MissingDigestEntry { entry_url } = result.unwrap_err() { + assert_eq!(entry_url, "http://example.com/missing.txt"); + } + } + + #[test] + fn should_check_every_file_entry_has_a_digest_empty_entries() { + // Test empty entries path: no files to check + let empty_entries: Vec = vec![]; + let digests: HashMap = + vec![("file1.txt".to_string(), "sha256:digest1".to_string())] + .into_iter() + .collect(); + + assert!(check_every_file_entry_has_a_digest(&empty_entries, &digests).is_ok()); + } }