From ad05cb7f9925bc43fd173504ccfb40d1d80cc0a5 Mon Sep 17 00:00:00 2001 From: Li Yazhou Date: Thu, 29 Jan 2026 22:23:21 +0800 Subject: [PATCH 01/13] add impl about chunked cache --- core/layers/foyer/src/cached_metadata.rs | 29 ++ core/layers/foyer/src/chunk_utils.rs | 309 ++++++++++++++++++++ core/layers/foyer/src/chunked.rs | 201 +++++++++++++ core/layers/foyer/src/deleter.rs | 2 +- core/layers/foyer/src/full.rs | 2 +- core/layers/foyer/src/lib.rs | 352 +++++++++++++++++++++-- core/layers/foyer/src/writer.rs | 2 +- 7 files changed, 877 insertions(+), 20 deletions(-) create mode 100644 core/layers/foyer/src/cached_metadata.rs create mode 100644 core/layers/foyer/src/chunk_utils.rs create mode 100644 core/layers/foyer/src/chunked.rs diff --git a/core/layers/foyer/src/cached_metadata.rs b/core/layers/foyer/src/cached_metadata.rs new file mode 100644 index 000000000000..9e9664f0e7e5 --- /dev/null +++ b/core/layers/foyer/src/cached_metadata.rs @@ -0,0 +1,29 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +/// Cached metadata for an object in chunked mode. +/// +/// This stores essential information needed for chunk-based reading: +/// - `content_length`: Total size of the object +/// - `version`: Object version (if versioning is enabled) +/// - `etag`: Entity tag for cache validation +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct CachedMetadata { + pub content_length: u64, + pub version: Option, + pub etag: Option, +} diff --git a/core/layers/foyer/src/chunk_utils.rs b/core/layers/foyer/src/chunk_utils.rs new file mode 100644 index 000000000000..2a39c218ba64 --- /dev/null +++ b/core/layers/foyer/src/chunk_utils.rs @@ -0,0 +1,309 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +/// Information about a chunk needed to satisfy a read request. 
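+///
+/// For example, with 1024-byte chunks over a 4096-byte object, a read of
+/// bytes 500..2500 is covered by three `ChunkInfo` values:
+///
+/// ```text
+/// ChunkInfo { chunk_index: 0, offset_in_chunk: 500, length_in_chunk: 524 }
+/// ChunkInfo { chunk_index: 1, offset_in_chunk: 0,   length_in_chunk: 1024 }
+/// ChunkInfo { chunk_index: 2, offset_in_chunk: 0,   length_in_chunk: 452 }
+/// ```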
+#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ChunkInfo { + /// The index of this chunk (0-based) + pub chunk_index: u64, + /// Offset within this chunk where the requested data starts + pub offset_in_chunk: usize, + /// Length of data to read from this chunk + pub length_in_chunk: usize, +} + +/// Align a range to chunk boundaries. +/// +/// Returns the aligned (start, end) positions that cover the original range. +/// The end position is exclusive. +/// +/// # Arguments +/// * `range_start` - Start of the requested range +/// * `range_end` - End of the requested range (exclusive) +/// * `chunk_size` - Size of each chunk +/// * `object_size` - Total size of the object +/// +/// # Returns +/// (aligned_start, aligned_end) - Both aligned to chunk boundaries, clamped to object_size +#[allow(dead_code)] +pub fn align_range(range_start: u64, range_end: u64, chunk_size: usize, object_size: u64) -> (u64, u64) { + let chunk_size = chunk_size as u64; + + // Align start down to chunk boundary + let aligned_start = (range_start / chunk_size) * chunk_size; + + // Align end up to chunk boundary, but don't exceed object size + let aligned_end = if range_end == 0 { + 0 + } else { + let chunks_needed = range_end.div_ceil(chunk_size); + (chunks_needed * chunk_size).min(object_size) + }; + + (aligned_start, aligned_end) +} + +/// Split a range into individual chunk read operations. +/// +/// # Arguments +/// * `range_start` - Start of the requested range (inclusive) +/// * `range_size` - Size of the requested range, or None for "read to end" +/// * `chunk_size` - Size of each chunk +/// * `object_size` - Total size of the object +/// +/// # Returns +/// A vector of `ChunkInfo` describing each chunk that needs to be fetched +/// and how to slice the data from each chunk. 
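+///
+/// # Example
+///
+/// A sketch of the expected output for a read-to-end request
+/// (`range_size = None`), which is clamped to the object size:
+///
+/// ```ignore
+/// // 3000-byte object, 1024-byte chunks, read everything from offset 1500.
+/// let chunks = split_range_into_chunks(1500, None, 1024, 3000);
+/// assert_eq!(chunks.len(), 2);
+/// // Chunk 1 contributes bytes 1500..2048, chunk 2 contributes bytes 2048..3000.
+/// assert_eq!((chunks[0].chunk_index, chunks[0].offset_in_chunk, chunks[0].length_in_chunk), (1, 476, 548));
+/// assert_eq!((chunks[1].chunk_index, chunks[1].offset_in_chunk, chunks[1].length_in_chunk), (2, 0, 952));
+/// ```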
+pub fn split_range_into_chunks( + range_start: u64, + range_size: Option, + chunk_size: usize, + object_size: u64, +) -> Vec { + if object_size == 0 { + return vec![]; + } + + let chunk_size_u64 = chunk_size as u64; + let range_end = match range_size { + Some(size) => (range_start + size).min(object_size), + None => object_size, + }; + + if range_start >= range_end { + return vec![]; + } + + let first_chunk_index = range_start / chunk_size_u64; + let last_chunk_index = (range_end - 1) / chunk_size_u64; + + let mut chunks = Vec::with_capacity((last_chunk_index - first_chunk_index + 1) as usize); + + for chunk_index in first_chunk_index..=last_chunk_index { + let chunk_start = chunk_index * chunk_size_u64; + let chunk_end = ((chunk_index + 1) * chunk_size_u64).min(object_size); + + // Calculate the intersection of [range_start, range_end) with [chunk_start, chunk_end) + let read_start = range_start.max(chunk_start); + let read_end = range_end.min(chunk_end); + + if read_start < read_end { + chunks.push(ChunkInfo { + chunk_index, + offset_in_chunk: (read_start - chunk_start) as usize, + length_in_chunk: (read_end - read_start) as usize, + }); + } + } + + chunks +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_align_range_basic() { + // Range within a single chunk + let (start, end) = align_range(100, 200, 1024, 4096); + assert_eq!(start, 0); + assert_eq!(end, 1024); + + // Range spanning multiple chunks + let (start, end) = align_range(1000, 3000, 1024, 4096); + assert_eq!(start, 0); + assert_eq!(end, 3072); + } + + #[test] + fn test_align_range_at_boundaries() { + // Start at chunk boundary + let (start, end) = align_range(1024, 2000, 1024, 4096); + assert_eq!(start, 1024); + assert_eq!(end, 2048); + + // End at chunk boundary + let (start, end) = align_range(100, 2048, 1024, 4096); + assert_eq!(start, 0); + assert_eq!(end, 2048); + + // Both at boundaries + let (start, end) = align_range(1024, 2048, 1024, 4096); + assert_eq!(start, 1024); + assert_eq!(end, 2048); + } + + #[test] + fn test_align_range_clamp_to_object_size() { + // Range extends beyond object + let (start, end) = align_range(3000, 5000, 1024, 4096); + assert_eq!(start, 2048); + assert_eq!(end, 4096); + + // Object smaller than chunk + let (start, end) = align_range(0, 500, 1024, 500); + assert_eq!(start, 0); + assert_eq!(end, 500); + } + + #[test] + fn test_split_range_single_chunk() { + // Read within a single chunk + let chunks = split_range_into_chunks(100, Some(200), 1024, 4096); + assert_eq!(chunks.len(), 1); + assert_eq!(chunks[0].chunk_index, 0); + assert_eq!(chunks[0].offset_in_chunk, 100); + assert_eq!(chunks[0].length_in_chunk, 200); + } + + #[test] + fn test_split_range_multiple_chunks() { + // Read spanning 3 chunks + let chunks = split_range_into_chunks(500, Some(2000), 1024, 4096); + assert_eq!(chunks.len(), 3); + + // First chunk: 500-1024 (524 bytes) + assert_eq!(chunks[0].chunk_index, 0); + assert_eq!(chunks[0].offset_in_chunk, 500); + assert_eq!(chunks[0].length_in_chunk, 524); + + // Second chunk: 1024-2048 (1024 bytes) + assert_eq!(chunks[1].chunk_index, 1); + assert_eq!(chunks[1].offset_in_chunk, 0); + assert_eq!(chunks[1].length_in_chunk, 1024); + + // Third chunk: 2048-2500 (452 bytes) + assert_eq!(chunks[2].chunk_index, 2); + assert_eq!(chunks[2].offset_in_chunk, 0); + assert_eq!(chunks[2].length_in_chunk, 452); + } + + #[test] + fn test_split_range_read_to_end() { + // Read from offset to end of object + let chunks = split_range_into_chunks(1500, None, 1024, 3000); + 
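+        // 1500 / 1024 = 1 and (3000 - 1) / 1024 = 2, so only chunks 1 and 2 are
+        // touched; within chunk 1 the read starts at offset 1500 - 1024 = 476.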
assert_eq!(chunks.len(), 2); + + // First chunk: 1500-2048 (548 bytes) + assert_eq!(chunks[0].chunk_index, 1); + assert_eq!(chunks[0].offset_in_chunk, 476); + assert_eq!(chunks[0].length_in_chunk, 548); + + // Second chunk: 2048-3000 (952 bytes) + assert_eq!(chunks[1].chunk_index, 2); + assert_eq!(chunks[1].offset_in_chunk, 0); + assert_eq!(chunks[1].length_in_chunk, 952); + } + + #[test] + fn test_split_range_entire_object() { + // Read entire object + let chunks = split_range_into_chunks(0, None, 1024, 2500); + assert_eq!(chunks.len(), 3); + + assert_eq!(chunks[0].chunk_index, 0); + assert_eq!(chunks[0].offset_in_chunk, 0); + assert_eq!(chunks[0].length_in_chunk, 1024); + + assert_eq!(chunks[1].chunk_index, 1); + assert_eq!(chunks[1].offset_in_chunk, 0); + assert_eq!(chunks[1].length_in_chunk, 1024); + + assert_eq!(chunks[2].chunk_index, 2); + assert_eq!(chunks[2].offset_in_chunk, 0); + assert_eq!(chunks[2].length_in_chunk, 452); + } + + #[test] + fn test_split_range_empty_object() { + let chunks = split_range_into_chunks(0, None, 1024, 0); + assert!(chunks.is_empty()); + } + + #[test] + fn test_split_range_empty_range() { + // Zero-length range + let chunks = split_range_into_chunks(100, Some(0), 1024, 4096); + assert!(chunks.is_empty()); + + // Start equals end + let chunks = split_range_into_chunks(1024, Some(0), 1024, 4096); + assert!(chunks.is_empty()); + } + + #[test] + fn test_split_range_beyond_object() { + // Range starts beyond object + let chunks = split_range_into_chunks(5000, Some(100), 1024, 4096); + assert!(chunks.is_empty()); + + // Range partially beyond object + let chunks = split_range_into_chunks(4000, Some(500), 1024, 4096); + assert_eq!(chunks.len(), 1); + assert_eq!(chunks[0].chunk_index, 3); + assert_eq!(chunks[0].offset_in_chunk, 928); + assert_eq!(chunks[0].length_in_chunk, 96); + } + + #[test] + fn test_split_range_exact_chunk_boundaries() { + // Read exactly one full chunk + let chunks = split_range_into_chunks(1024, Some(1024), 1024, 4096); + assert_eq!(chunks.len(), 1); + assert_eq!(chunks[0].chunk_index, 1); + assert_eq!(chunks[0].offset_in_chunk, 0); + assert_eq!(chunks[0].length_in_chunk, 1024); + + // Read exactly two full chunks + let chunks = split_range_into_chunks(0, Some(2048), 1024, 4096); + assert_eq!(chunks.len(), 2); + assert_eq!(chunks[0].length_in_chunk, 1024); + assert_eq!(chunks[1].length_in_chunk, 1024); + } + + #[test] + fn test_split_range_object_not_chunk_aligned() { + // Object size is not a multiple of chunk size + let chunks = split_range_into_chunks(0, None, 1024, 1500); + assert_eq!(chunks.len(), 2); + + assert_eq!(chunks[0].chunk_index, 0); + assert_eq!(chunks[0].length_in_chunk, 1024); + + assert_eq!(chunks[1].chunk_index, 1); + assert_eq!(chunks[1].offset_in_chunk, 0); + assert_eq!(chunks[1].length_in_chunk, 476); // Last chunk is smaller + } + + #[test] + fn test_split_range_last_partial_chunk() { + // Read that ends in the middle of the last partial chunk + let chunks = split_range_into_chunks(1000, Some(400), 1024, 1500); + assert_eq!(chunks.len(), 2); + + // First chunk: 1000-1024 + assert_eq!(chunks[0].chunk_index, 0); + assert_eq!(chunks[0].offset_in_chunk, 1000); + assert_eq!(chunks[0].length_in_chunk, 24); + + // Second chunk: 1024-1400 + assert_eq!(chunks[1].chunk_index, 1); + assert_eq!(chunks[1].offset_in_chunk, 0); + assert_eq!(chunks[1].length_in_chunk, 376); + } +} diff --git a/core/layers/foyer/src/chunked.rs b/core/layers/foyer/src/chunked.rs new file mode 100644 index 000000000000..edce3ebfd407 --- /dev/null +++ 
b/core/layers/foyer/src/chunked.rs
@@ -0,0 +1,201 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use opendal_core::Buffer;
+use opendal_core::Result;
+use opendal_core::raw::Access;
+use opendal_core::raw::BytesContentRange;
+use opendal_core::raw::BytesRange;
+use opendal_core::raw::OpRead;
+use opendal_core::raw::OpStat;
+use opendal_core::raw::RpRead;
+use opendal_core::raw::oio::Read;
+
+use crate::FoyerKey;
+use crate::FoyerValue;
+use crate::Inner;
+use crate::cached_metadata::CachedMetadata;
+use crate::chunk_utils::split_range_into_chunks;
+
+/// ChunkedReader reads objects in fixed-size chunks, caching each chunk independently.
+///
+/// This enables efficient partial reads of large objects by only fetching and caching
+/// the chunks that cover the requested range.
+pub struct ChunkedReader<A: Access> {
+    inner: Arc<Inner<A>>,
+    chunk_size: usize,
+}
+
+impl<A: Access> ChunkedReader<A> {
+    pub fn new(inner: Arc<Inner<A>>, chunk_size: usize) -> Self {
+        Self { inner, chunk_size }
+    }
+
+    /// Read data from cache or underlying storage using chunked caching.
+    ///
+    /// The algorithm:
+    /// 1. Fetch object metadata (cached) to determine content_length
+    /// 2. Compute which chunks cover the requested range
+    /// 3. Fetch each chunk (from cache or backend)
+    /// 4.
Assemble and slice the result to match the requested range + pub async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Buffer)> { + let version = args.version().map(|v| v.to_string()); + + // Extract range info from args + let range_start = args.range().offset(); + let range_size = args.range().size(); + + // Step 1: Fetch metadata + let metadata = self.fetch_metadata(path, version.clone()).await?; + let object_size = metadata.content_length; + + // Step 2: Compute chunks needed + let chunks = split_range_into_chunks(range_start, range_size, self.chunk_size, object_size); + + if chunks.is_empty() { + let rp = RpRead::new().with_size(Some(0)); + return Ok((rp, Buffer::new())); + } + + // Step 3: Fetch all needed chunks and assemble + let mut result_bufs = Vec::with_capacity(chunks.len()); + let mut total_len = 0u64; + + for chunk_info in &chunks { + let chunk_data = self + .fetch_chunk(path, version.clone(), chunk_info.chunk_index, object_size) + .await?; + + // Slice the chunk to get only the needed portion + let end = (chunk_info.offset_in_chunk + chunk_info.length_in_chunk).min(chunk_data.len()); + let sliced = chunk_data.slice(chunk_info.offset_in_chunk..end); + total_len += sliced.len() as u64; + result_bufs.push(sliced); + } + + // Step 4: Assemble result + let range_end = range_start + total_len; + let range = BytesContentRange::default() + .with_range(range_start, range_end.saturating_sub(1)) + .with_size(object_size); + + let buffer: Buffer = result_bufs + .into_iter() + .flat_map(|b| b.to_bytes()) + .collect::>() + .into(); + + let rp = RpRead::new() + .with_size(Some(buffer.len() as u64)) + .with_range(Some(range)); + + Ok((rp, buffer)) + } + + /// Fetch object metadata from cache or backend. + /// + /// Uses simple get/insert pattern: check cache first, on miss stat from backend + /// and insert into cache. + async fn fetch_metadata( + &self, + path: &str, + version: Option, + ) -> Result { + let key = FoyerKey::Metadata { + path: path.to_string(), + chunk_size: self.chunk_size, + version: version.clone(), + }; + + // Try cache first + if let Ok(Some(entry)) = self.inner.cache.get(&key).await { + let bytes = entry.value().0.to_vec(); + if let Ok(cached) = bincode::deserialize::(&bytes) { + return Ok(cached); + } + // Deserialization failed - cache entry is corrupted, fall through to re-fetch + } + + // Cache miss - fetch from backend + let metadata = self + .inner + .accessor + .stat(path, OpStat::default()) + .await? + .into_metadata(); + + let cached = CachedMetadata { + content_length: metadata.content_length(), + version: metadata.version().map(|v| v.to_string()), + etag: metadata.etag().map(|v| v.to_string()), + }; + + // Serialize and insert into cache + if let Ok(encoded) = bincode::serialize(&cached) { + self.inner + .cache + .insert(key, FoyerValue(Buffer::from(encoded))); + } + + Ok(cached) + } + + /// Fetch a single chunk from cache or backend. + /// + /// Uses simple get/insert pattern: check cache first, on miss read from backend + /// and insert into cache. 
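+    ///
+    /// The byte range requested from the backend for chunk `i` is
+    /// `[i * chunk_size, min((i + 1) * chunk_size, object_size))`, so the last
+    /// chunk of an object may be shorter than `chunk_size`.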
+ async fn fetch_chunk( + &self, + path: &str, + version: Option, + chunk_index: u64, + object_size: u64, + ) -> Result { + let key = FoyerKey::Chunk { + path: path.to_string(), + chunk_size: self.chunk_size, + chunk_index, + version: version.clone(), + }; + + // Try cache first + if let Ok(Some(entry)) = self.inner.cache.get(&key).await { + return Ok(entry.value().0.clone()); + } + + // Cache miss - fetch from backend + let chunk_start = chunk_index * self.chunk_size as u64; + let chunk_end = ((chunk_index + 1) * self.chunk_size as u64).min(object_size); + let range = BytesRange::new(chunk_start, Some(chunk_end - chunk_start)); + + let (_, mut reader) = self + .inner + .accessor + .read(path, OpRead::default().with_range(range)) + .await?; + let buffer = reader.read_all().await?; + + // Insert into cache + self.inner + .cache + .insert(key, FoyerValue(buffer.clone())); + + Ok(buffer) + } +} diff --git a/core/layers/foyer/src/deleter.rs b/core/layers/foyer/src/deleter.rs index 1de6b662ea3a..99fe8b176334 100644 --- a/core/layers/foyer/src/deleter.rs +++ b/core/layers/foyer/src/deleter.rs @@ -44,7 +44,7 @@ impl Deleter { impl oio::Delete for Deleter { async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> { self.deleter.delete(path, args.clone()).await?; - self.keys.push(FoyerKey { + self.keys.push(FoyerKey::Full { path: path.to_string(), version: args.version().map(|v| v.to_string()), }); diff --git a/core/layers/foyer/src/full.rs b/core/layers/foyer/src/full.rs index 95772d01214d..9b04067605ff 100644 --- a/core/layers/foyer/src/full.rs +++ b/core/layers/foyer/src/full.rs @@ -70,7 +70,7 @@ impl FullReader { .inner .cache .fetch( - FoyerKey { + FoyerKey::Full { path: path_str.clone(), version: version.clone(), }, diff --git a/core/layers/foyer/src/lib.rs b/core/layers/foyer/src/lib.rs index 06b9c389eb07..344ed68e3550 100644 --- a/core/layers/foyer/src/lib.rs +++ b/core/layers/foyer/src/lib.rs @@ -15,6 +15,9 @@ // specific language governing permissions and limitations // under the License. +mod cached_metadata; +mod chunk_utils; +mod chunked; mod deleter; mod error; mod full; @@ -42,10 +45,32 @@ pub use writer::Writer; /// - If a version is given, the object is cached under that versioned key. /// - If version is not supplied, the object is cached exactly as returned by the backend, /// We do NOT interpret `None` as "latest" and we do not promote it to any other version. +/// +/// # Variants +/// +/// - `Full`: Caches the entire object (used in non-chunked mode) +/// - `Metadata`: Caches object metadata for chunked mode (content_length, version, etag) +/// - `Chunk`: Caches a specific chunk of the object #[derive(Debug, Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)] -pub struct FoyerKey { - pub path: String, - pub version: Option, +pub enum FoyerKey { + /// Cache key for the entire object (non-chunked mode) + Full { + path: String, + version: Option, + }, + /// Cache key for object metadata (chunked mode) + Metadata { + path: String, + chunk_size: usize, + version: Option, + }, + /// Cache key for a specific chunk of the object + Chunk { + path: String, + chunk_size: usize, + chunk_index: u64, + version: Option, + }, } /// [`FoyerValue`] is a wrapper around `Buffer` that implements the `Code` trait. @@ -93,6 +118,18 @@ impl Code for FoyerValue { /// - `delete`: [`FoyerLayer`] will remove the data from the foyer hybrid cache regardless of whether the service's delete operation is successful. 
/// - Other operations: [`FoyerLayer`] will not cache the results of other operations, such as `list`, `copy`, `rename`, etc. They will be passed through to the underlying accessor without caching. /// +/// # Caching Modes +/// +/// ## Full Mode (default) +/// Caches the entire object as a single cache entry. Best for small to medium-sized objects. +/// +/// ## Chunked Mode +/// When `chunk_size` is set via [`with_chunk_size`](FoyerLayer::with_chunk_size), objects are cached +/// in fixed-size chunks. This improves cache efficiency for large files with partial reads: +/// - Only the chunks covering the requested range are fetched and cached +/// - Subsequent reads of overlapping ranges reuse cached chunks +/// - Each chunk is cached independently, allowing fine-grained cache eviction +/// /// # Examples /// /// ```no_run @@ -108,8 +145,14 @@ impl Code for FoyerValue { /// .build() /// .await?; /// +/// // Full mode (default) /// let op = Operator::new(Memory::default())? -/// .layer(FoyerLayer::new(cache)) +/// .layer(FoyerLayer::new(cache.clone())) +/// .finish(); +/// +/// // Chunked mode with 4MB chunks +/// let op_chunked = Operator::new(Memory::default())? +/// .layer(FoyerLayer::new(cache).with_chunk_size(4 * 1024 * 1024)) /// .finish(); /// # Ok(()) /// # } @@ -122,6 +165,7 @@ impl Code for FoyerValue { pub struct FoyerLayer { cache: HybridCache, size_limit: Range, + chunk_size: Option, } impl FoyerLayer { @@ -130,6 +174,7 @@ impl FoyerLayer { FoyerLayer { cache, size_limit: 0..usize::MAX, + chunk_size: None, } } @@ -150,6 +195,25 @@ impl FoyerLayer { self.size_limit = start..end; self } + + /// Enables chunked caching mode with the specified chunk size. + /// + /// When enabled, objects are cached in fixed-size chunks rather than as whole objects. + /// This is beneficial for: + /// - Large files where only portions are typically read + /// - Random access patterns within files + /// - Reducing memory pressure by caching only needed chunks + /// + /// # Arguments + /// * `chunk_size` - The size of each chunk in bytes. Recommended: 1MB - 16MB. 
+ /// + /// # Note + /// - The `size_limit` setting is ignored in chunked mode + /// - Writer and Deleter still operate in full mode for consistency + pub fn with_chunk_size(mut self, chunk_size: usize) -> Self { + self.chunk_size = Some(chunk_size); + self + } } impl Layer for FoyerLayer { @@ -162,6 +226,7 @@ impl Layer for FoyerLayer { accessor, cache, size_limit: self.size_limit.clone(), + chunk_size: self.chunk_size, }), } } @@ -172,6 +237,7 @@ pub(crate) struct Inner { pub(crate) accessor: A, pub(crate) cache: HybridCache, pub(crate) size_limit: Range, + pub(crate) chunk_size: Option, } #[derive(Debug)] @@ -195,9 +261,18 @@ impl LayeredAccess for FoyerAccessor { } async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { - full::FullReader::new(self.inner.clone(), self.inner.size_limit.clone()) - .read(path, args) - .await + match self.inner.chunk_size { + None => { + full::FullReader::new(self.inner.clone(), self.inner.size_limit.clone()) + .read(path, args) + .await + } + Some(chunk_size) => { + chunked::ChunkedReader::new(self.inner.clone(), chunk_size) + .read(path, args) + .await + } + } } fn write( @@ -379,12 +454,12 @@ mod tests { #[test] fn test_foyer_key_version_none_vs_empty() { - let key_none = FoyerKey { + let key_none = FoyerKey::Full { path: "test/path".to_string(), version: None, }; - let key_empty = FoyerKey { + let key_empty = FoyerKey::Full { path: "test/path".to_string(), version: Some("".to_string()), }; @@ -410,35 +485,59 @@ mod tests { fn test_foyer_key_serde() { use std::io::Cursor; - let test_cases = vec![ - FoyerKey { + let test_cases: Vec = vec![ + FoyerKey::Full { path: "simple".to_string(), version: None, }, - FoyerKey { + FoyerKey::Full { path: "with/slash/path".to_string(), version: None, }, - FoyerKey { + FoyerKey::Full { path: "versioned".to_string(), version: Some("v1.0.0".to_string()), }, - FoyerKey { + FoyerKey::Full { path: "empty-version".to_string(), version: Some("".to_string()), }, - FoyerKey { + FoyerKey::Full { path: "".to_string(), version: None, }, - FoyerKey { + FoyerKey::Full { path: "unicode/θ·―εΎ„/πŸš€".to_string(), version: Some("η‰ˆζœ¬-1".to_string()), }, - FoyerKey { + FoyerKey::Full { path: "long/".to_string().repeat(100), version: Some("long-version-".to_string().repeat(50)), }, + // Test Metadata variant + FoyerKey::Metadata { + path: "meta/path".to_string(), + chunk_size: 1024, + version: None, + }, + FoyerKey::Metadata { + path: "meta/versioned".to_string(), + chunk_size: 4096, + version: Some("v2".to_string()), + }, + // Test Chunk variant + FoyerKey::Chunk { + path: "chunk/path".to_string(), + chunk_size: 1024, + chunk_index: 0, + version: None, + }, + FoyerKey::Chunk { + path: "chunk/versioned".to_string(), + chunk_size: 4096, + chunk_index: 42, + version: Some("v3".to_string()), + }, ]; for original in test_cases { @@ -456,4 +555,223 @@ mod tests { ); } } + + #[tokio::test] + async fn test_chunked_single_chunk() { + let dir = tempfile::tempdir().unwrap(); + let cache = HybridCacheBuilder::new() + .memory(1024 * 1024) + .with_shards(1) + .storage(Engine::Large(LargeEngineOptions::new())) + .with_device_options( + DirectFsDeviceOptions::new(dir.path()) + .with_capacity(16 * MiB as usize) + .with_file_size(MiB as usize), + ) + .with_recover_mode(RecoverMode::None) + .build() + .await + .unwrap(); + + let op = Operator::new(Memory::default()) + .unwrap() + .layer(FoyerLayer::new(cache).with_chunk_size(1024)) + .finish(); + + let data = vec![42u8; 500]; + op.write("test.bin", data.clone()).await.unwrap(); 
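+
+        // 500 bytes with a 1024-byte chunk size fits entirely in chunk 0: the
+        // ranged read below stats the object once, fetches chunk 0 from the
+        // backend, and inserts both the metadata and the chunk into the cache.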
+ + // Read within single chunk + let read_data = op.read_with("test.bin").range(100..300).await.unwrap(); + assert_eq!(read_data.len(), 200); + assert_eq!(read_data.to_vec(), data[100..300]); + } + + #[tokio::test] + async fn test_chunked_multiple_chunks() { + let dir = tempfile::tempdir().unwrap(); + let cache = HybridCacheBuilder::new() + .memory(1024 * 1024) + .with_shards(1) + .storage(Engine::Large(LargeEngineOptions::new())) + .with_device_options( + DirectFsDeviceOptions::new(dir.path()) + .with_capacity(16 * MiB as usize) + .with_file_size(MiB as usize), + ) + .with_recover_mode(RecoverMode::None) + .build() + .await + .unwrap(); + + let op = Operator::new(Memory::default()) + .unwrap() + .layer(FoyerLayer::new(cache).with_chunk_size(1024)) + .finish(); + + // 3KB file across 3 chunks + let data: Vec = (0..3000).map(|i| (i % 256) as u8).collect(); + op.write("large.bin", data.clone()).await.unwrap(); + + // Read spanning multiple chunks (byte 500 to 2500 covers 3 chunks) + let read_data = op.read_with("large.bin").range(500..2500).await.unwrap(); + assert_eq!(read_data.len(), 2000); + assert_eq!(read_data.to_vec(), data[500..2500]); + } + + #[tokio::test] + async fn test_chunked_full_read() { + let dir = tempfile::tempdir().unwrap(); + let cache = HybridCacheBuilder::new() + .memory(1024 * 1024) + .with_shards(1) + .storage(Engine::Large(LargeEngineOptions::new())) + .with_device_options( + DirectFsDeviceOptions::new(dir.path()) + .with_capacity(16 * MiB as usize) + .with_file_size(MiB as usize), + ) + .with_recover_mode(RecoverMode::None) + .build() + .await + .unwrap(); + + let op = Operator::new(Memory::default()) + .unwrap() + .layer(FoyerLayer::new(cache).with_chunk_size(1024)) + .finish(); + + let data = vec![123u8; 2500]; + op.write("full.bin", data.clone()).await.unwrap(); + + // Read entire file + let read_data = op.read("full.bin").await.unwrap(); + assert_eq!(read_data.len(), 2500); + assert_eq!(read_data.to_vec(), data); + } + + #[tokio::test] + async fn test_chunked_last_partial_chunk() { + let dir = tempfile::tempdir().unwrap(); + let cache = HybridCacheBuilder::new() + .memory(1024 * 1024) + .with_shards(1) + .storage(Engine::Large(LargeEngineOptions::new())) + .with_device_options( + DirectFsDeviceOptions::new(dir.path()) + .with_capacity(16 * MiB as usize) + .with_file_size(MiB as usize), + ) + .with_recover_mode(RecoverMode::None) + .build() + .await + .unwrap(); + + let op = Operator::new(Memory::default()) + .unwrap() + .layer(FoyerLayer::new(cache).with_chunk_size(1024)) + .finish(); + + // 1500 bytes = 1 full chunk + 476 bytes + let data: Vec = (0..1500).map(|i| (i % 256) as u8).collect(); + op.write("partial.bin", data.clone()).await.unwrap(); + + // Read from last partial chunk + let read_data = op.read_with("partial.bin").range(1200..1500).await.unwrap(); + assert_eq!(read_data.len(), 300); + assert_eq!(read_data.to_vec(), data[1200..1500]); + } + + #[tokio::test] + async fn test_chunked_cache_hit() { + let dir = tempfile::tempdir().unwrap(); + let cache = HybridCacheBuilder::new() + .memory(1024 * 1024) + .with_shards(1) + .storage(Engine::Large(LargeEngineOptions::new())) + .with_device_options( + DirectFsDeviceOptions::new(dir.path()) + .with_capacity(16 * MiB as usize) + .with_file_size(MiB as usize), + ) + .with_recover_mode(RecoverMode::None) + .build() + .await + .unwrap(); + + let op = Operator::new(Memory::default()) + .unwrap() + .layer(FoyerLayer::new(cache.clone()).with_chunk_size(1024)) + .finish(); + + let data = vec![99u8; 3000]; + 
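+        // 3000 bytes with a 1024-byte chunk size spans chunks 0, 1 and 2. The
+        // two overlapping reads below both need chunk 1 (bytes 1024..2048), so
+        // the second read is served from the cache for that chunk.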
op.write("cached.bin", data.clone()).await.unwrap(); + + // First read to populate cache + let read1 = op.read_with("cached.bin").range(1000..2000).await.unwrap(); + assert_eq!(read1.to_vec(), data[1000..2000]); + + // Second read should hit cache for overlapping chunks + let read2 = op.read_with("cached.bin").range(1500..2500).await.unwrap(); + assert_eq!(read2.to_vec(), data[1500..2500]); + } + + #[tokio::test] + async fn test_chunked_empty_file() { + let dir = tempfile::tempdir().unwrap(); + let cache = HybridCacheBuilder::new() + .memory(1024 * 1024) + .with_shards(1) + .storage(Engine::Large(LargeEngineOptions::new())) + .with_device_options( + DirectFsDeviceOptions::new(dir.path()) + .with_capacity(16 * MiB as usize) + .with_file_size(MiB as usize), + ) + .with_recover_mode(RecoverMode::None) + .build() + .await + .unwrap(); + + let op = Operator::new(Memory::default()) + .unwrap() + .layer(FoyerLayer::new(cache).with_chunk_size(1024)) + .finish(); + + op.write("empty.bin", Vec::::new()).await.unwrap(); + + let read_data = op.read("empty.bin").await.unwrap(); + assert_eq!(read_data.len(), 0); + } + + #[tokio::test] + async fn test_chunked_exact_boundaries() { + let dir = tempfile::tempdir().unwrap(); + let cache = HybridCacheBuilder::new() + .memory(1024 * 1024) + .with_shards(1) + .storage(Engine::Large(LargeEngineOptions::new())) + .with_device_options( + DirectFsDeviceOptions::new(dir.path()) + .with_capacity(16 * MiB as usize) + .with_file_size(MiB as usize), + ) + .with_recover_mode(RecoverMode::None) + .build() + .await + .unwrap(); + + let op = Operator::new(Memory::default()) + .unwrap() + .layer(FoyerLayer::new(cache).with_chunk_size(1024)) + .finish(); + + let data = vec![77u8; 3072]; // Exactly 3 chunks + op.write("aligned.bin", data.clone()).await.unwrap(); + + // Read exactly one chunk + let read_data = op.read_with("aligned.bin").range(1024..2048).await.unwrap(); + assert_eq!(read_data.len(), 1024); + assert_eq!(read_data.to_vec(), data[1024..2048]); + } } diff --git a/core/layers/foyer/src/writer.rs b/core/layers/foyer/src/writer.rs index fd505f174232..fc6e9b4ef96f 100644 --- a/core/layers/foyer/src/writer.rs +++ b/core/layers/foyer/src/writer.rs @@ -71,7 +71,7 @@ impl oio::Write for Writer { let metadata = self.w.close().await?; if !self.skip_cache { self.inner.cache.insert( - FoyerKey { + FoyerKey::Full { path: self.path.clone(), version: metadata.version().map(|v| v.to_string()), }, From cbfc5264cdbe30a03e65d6c723c7b25e0def4f99 Mon Sep 17 00:00:00 2001 From: Li Yazhou Date: Thu, 29 Jan 2026 22:31:00 +0800 Subject: [PATCH 02/13] merge the chunked logic into chunked.rs --- core/layers/foyer/src/cached_metadata.rs | 29 --- core/layers/foyer/src/chunk_utils.rs | 309 ----------------------- core/layers/foyer/src/chunked.rs | 308 +++++++++++++++++++++- core/layers/foyer/src/lib.rs | 2 - 4 files changed, 306 insertions(+), 342 deletions(-) delete mode 100644 core/layers/foyer/src/cached_metadata.rs delete mode 100644 core/layers/foyer/src/chunk_utils.rs diff --git a/core/layers/foyer/src/cached_metadata.rs b/core/layers/foyer/src/cached_metadata.rs deleted file mode 100644 index 9e9664f0e7e5..000000000000 --- a/core/layers/foyer/src/cached_metadata.rs +++ /dev/null @@ -1,29 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. 
The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -/// Cached metadata for an object in chunked mode. -/// -/// This stores essential information needed for chunk-based reading: -/// - `content_length`: Total size of the object -/// - `version`: Object version (if versioning is enabled) -/// - `etag`: Entity tag for cache validation -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub struct CachedMetadata { - pub content_length: u64, - pub version: Option, - pub etag: Option, -} diff --git a/core/layers/foyer/src/chunk_utils.rs b/core/layers/foyer/src/chunk_utils.rs deleted file mode 100644 index 2a39c218ba64..000000000000 --- a/core/layers/foyer/src/chunk_utils.rs +++ /dev/null @@ -1,309 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -/// Information about a chunk needed to satisfy a read request. -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct ChunkInfo { - /// The index of this chunk (0-based) - pub chunk_index: u64, - /// Offset within this chunk where the requested data starts - pub offset_in_chunk: usize, - /// Length of data to read from this chunk - pub length_in_chunk: usize, -} - -/// Align a range to chunk boundaries. -/// -/// Returns the aligned (start, end) positions that cover the original range. -/// The end position is exclusive. -/// -/// # Arguments -/// * `range_start` - Start of the requested range -/// * `range_end` - End of the requested range (exclusive) -/// * `chunk_size` - Size of each chunk -/// * `object_size` - Total size of the object -/// -/// # Returns -/// (aligned_start, aligned_end) - Both aligned to chunk boundaries, clamped to object_size -#[allow(dead_code)] -pub fn align_range(range_start: u64, range_end: u64, chunk_size: usize, object_size: u64) -> (u64, u64) { - let chunk_size = chunk_size as u64; - - // Align start down to chunk boundary - let aligned_start = (range_start / chunk_size) * chunk_size; - - // Align end up to chunk boundary, but don't exceed object size - let aligned_end = if range_end == 0 { - 0 - } else { - let chunks_needed = range_end.div_ceil(chunk_size); - (chunks_needed * chunk_size).min(object_size) - }; - - (aligned_start, aligned_end) -} - -/// Split a range into individual chunk read operations. 
-/// -/// # Arguments -/// * `range_start` - Start of the requested range (inclusive) -/// * `range_size` - Size of the requested range, or None for "read to end" -/// * `chunk_size` - Size of each chunk -/// * `object_size` - Total size of the object -/// -/// # Returns -/// A vector of `ChunkInfo` describing each chunk that needs to be fetched -/// and how to slice the data from each chunk. -pub fn split_range_into_chunks( - range_start: u64, - range_size: Option, - chunk_size: usize, - object_size: u64, -) -> Vec { - if object_size == 0 { - return vec![]; - } - - let chunk_size_u64 = chunk_size as u64; - let range_end = match range_size { - Some(size) => (range_start + size).min(object_size), - None => object_size, - }; - - if range_start >= range_end { - return vec![]; - } - - let first_chunk_index = range_start / chunk_size_u64; - let last_chunk_index = (range_end - 1) / chunk_size_u64; - - let mut chunks = Vec::with_capacity((last_chunk_index - first_chunk_index + 1) as usize); - - for chunk_index in first_chunk_index..=last_chunk_index { - let chunk_start = chunk_index * chunk_size_u64; - let chunk_end = ((chunk_index + 1) * chunk_size_u64).min(object_size); - - // Calculate the intersection of [range_start, range_end) with [chunk_start, chunk_end) - let read_start = range_start.max(chunk_start); - let read_end = range_end.min(chunk_end); - - if read_start < read_end { - chunks.push(ChunkInfo { - chunk_index, - offset_in_chunk: (read_start - chunk_start) as usize, - length_in_chunk: (read_end - read_start) as usize, - }); - } - } - - chunks -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_align_range_basic() { - // Range within a single chunk - let (start, end) = align_range(100, 200, 1024, 4096); - assert_eq!(start, 0); - assert_eq!(end, 1024); - - // Range spanning multiple chunks - let (start, end) = align_range(1000, 3000, 1024, 4096); - assert_eq!(start, 0); - assert_eq!(end, 3072); - } - - #[test] - fn test_align_range_at_boundaries() { - // Start at chunk boundary - let (start, end) = align_range(1024, 2000, 1024, 4096); - assert_eq!(start, 1024); - assert_eq!(end, 2048); - - // End at chunk boundary - let (start, end) = align_range(100, 2048, 1024, 4096); - assert_eq!(start, 0); - assert_eq!(end, 2048); - - // Both at boundaries - let (start, end) = align_range(1024, 2048, 1024, 4096); - assert_eq!(start, 1024); - assert_eq!(end, 2048); - } - - #[test] - fn test_align_range_clamp_to_object_size() { - // Range extends beyond object - let (start, end) = align_range(3000, 5000, 1024, 4096); - assert_eq!(start, 2048); - assert_eq!(end, 4096); - - // Object smaller than chunk - let (start, end) = align_range(0, 500, 1024, 500); - assert_eq!(start, 0); - assert_eq!(end, 500); - } - - #[test] - fn test_split_range_single_chunk() { - // Read within a single chunk - let chunks = split_range_into_chunks(100, Some(200), 1024, 4096); - assert_eq!(chunks.len(), 1); - assert_eq!(chunks[0].chunk_index, 0); - assert_eq!(chunks[0].offset_in_chunk, 100); - assert_eq!(chunks[0].length_in_chunk, 200); - } - - #[test] - fn test_split_range_multiple_chunks() { - // Read spanning 3 chunks - let chunks = split_range_into_chunks(500, Some(2000), 1024, 4096); - assert_eq!(chunks.len(), 3); - - // First chunk: 500-1024 (524 bytes) - assert_eq!(chunks[0].chunk_index, 0); - assert_eq!(chunks[0].offset_in_chunk, 500); - assert_eq!(chunks[0].length_in_chunk, 524); - - // Second chunk: 1024-2048 (1024 bytes) - assert_eq!(chunks[1].chunk_index, 1); - 
assert_eq!(chunks[1].offset_in_chunk, 0); - assert_eq!(chunks[1].length_in_chunk, 1024); - - // Third chunk: 2048-2500 (452 bytes) - assert_eq!(chunks[2].chunk_index, 2); - assert_eq!(chunks[2].offset_in_chunk, 0); - assert_eq!(chunks[2].length_in_chunk, 452); - } - - #[test] - fn test_split_range_read_to_end() { - // Read from offset to end of object - let chunks = split_range_into_chunks(1500, None, 1024, 3000); - assert_eq!(chunks.len(), 2); - - // First chunk: 1500-2048 (548 bytes) - assert_eq!(chunks[0].chunk_index, 1); - assert_eq!(chunks[0].offset_in_chunk, 476); - assert_eq!(chunks[0].length_in_chunk, 548); - - // Second chunk: 2048-3000 (952 bytes) - assert_eq!(chunks[1].chunk_index, 2); - assert_eq!(chunks[1].offset_in_chunk, 0); - assert_eq!(chunks[1].length_in_chunk, 952); - } - - #[test] - fn test_split_range_entire_object() { - // Read entire object - let chunks = split_range_into_chunks(0, None, 1024, 2500); - assert_eq!(chunks.len(), 3); - - assert_eq!(chunks[0].chunk_index, 0); - assert_eq!(chunks[0].offset_in_chunk, 0); - assert_eq!(chunks[0].length_in_chunk, 1024); - - assert_eq!(chunks[1].chunk_index, 1); - assert_eq!(chunks[1].offset_in_chunk, 0); - assert_eq!(chunks[1].length_in_chunk, 1024); - - assert_eq!(chunks[2].chunk_index, 2); - assert_eq!(chunks[2].offset_in_chunk, 0); - assert_eq!(chunks[2].length_in_chunk, 452); - } - - #[test] - fn test_split_range_empty_object() { - let chunks = split_range_into_chunks(0, None, 1024, 0); - assert!(chunks.is_empty()); - } - - #[test] - fn test_split_range_empty_range() { - // Zero-length range - let chunks = split_range_into_chunks(100, Some(0), 1024, 4096); - assert!(chunks.is_empty()); - - // Start equals end - let chunks = split_range_into_chunks(1024, Some(0), 1024, 4096); - assert!(chunks.is_empty()); - } - - #[test] - fn test_split_range_beyond_object() { - // Range starts beyond object - let chunks = split_range_into_chunks(5000, Some(100), 1024, 4096); - assert!(chunks.is_empty()); - - // Range partially beyond object - let chunks = split_range_into_chunks(4000, Some(500), 1024, 4096); - assert_eq!(chunks.len(), 1); - assert_eq!(chunks[0].chunk_index, 3); - assert_eq!(chunks[0].offset_in_chunk, 928); - assert_eq!(chunks[0].length_in_chunk, 96); - } - - #[test] - fn test_split_range_exact_chunk_boundaries() { - // Read exactly one full chunk - let chunks = split_range_into_chunks(1024, Some(1024), 1024, 4096); - assert_eq!(chunks.len(), 1); - assert_eq!(chunks[0].chunk_index, 1); - assert_eq!(chunks[0].offset_in_chunk, 0); - assert_eq!(chunks[0].length_in_chunk, 1024); - - // Read exactly two full chunks - let chunks = split_range_into_chunks(0, Some(2048), 1024, 4096); - assert_eq!(chunks.len(), 2); - assert_eq!(chunks[0].length_in_chunk, 1024); - assert_eq!(chunks[1].length_in_chunk, 1024); - } - - #[test] - fn test_split_range_object_not_chunk_aligned() { - // Object size is not a multiple of chunk size - let chunks = split_range_into_chunks(0, None, 1024, 1500); - assert_eq!(chunks.len(), 2); - - assert_eq!(chunks[0].chunk_index, 0); - assert_eq!(chunks[0].length_in_chunk, 1024); - - assert_eq!(chunks[1].chunk_index, 1); - assert_eq!(chunks[1].offset_in_chunk, 0); - assert_eq!(chunks[1].length_in_chunk, 476); // Last chunk is smaller - } - - #[test] - fn test_split_range_last_partial_chunk() { - // Read that ends in the middle of the last partial chunk - let chunks = split_range_into_chunks(1000, Some(400), 1024, 1500); - assert_eq!(chunks.len(), 2); - - // First chunk: 1000-1024 - 
assert_eq!(chunks[0].chunk_index, 0); - assert_eq!(chunks[0].offset_in_chunk, 1000); - assert_eq!(chunks[0].length_in_chunk, 24); - - // Second chunk: 1024-1400 - assert_eq!(chunks[1].chunk_index, 1); - assert_eq!(chunks[1].offset_in_chunk, 0); - assert_eq!(chunks[1].length_in_chunk, 376); - } -} diff --git a/core/layers/foyer/src/chunked.rs b/core/layers/foyer/src/chunked.rs index edce3ebfd407..900519e460fe 100644 --- a/core/layers/foyer/src/chunked.rs +++ b/core/layers/foyer/src/chunked.rs @@ -30,8 +30,19 @@ use opendal_core::raw::oio::Read; use crate::FoyerKey; use crate::FoyerValue; use crate::Inner; -use crate::cached_metadata::CachedMetadata; -use crate::chunk_utils::split_range_into_chunks; + +/// Cached metadata for an object in chunked mode. +/// +/// This stores essential information needed for chunk-based reading: +/// - `content_length`: Total size of the object +/// - `version`: Object version (if versioning is enabled) +/// - `etag`: Entity tag for cache validation +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +struct CachedMetadata { + content_length: u64, + version: Option, + etag: Option, +} /// ChunkedReader reads objects in fixed-size chunks, caching each chunk independently. /// @@ -199,3 +210,296 @@ impl ChunkedReader { Ok(buffer) } } + +/// Information about a chunk needed to satisfy a read request. +#[derive(Debug, Clone, PartialEq, Eq)] +struct ChunkInfo { + /// The index of this chunk (0-based) + chunk_index: u64, + /// Offset within this chunk where the requested data starts + offset_in_chunk: usize, + /// Length of data to read from this chunk + length_in_chunk: usize, +} + +/// Align a range to chunk boundaries. +/// +/// Returns the aligned (start, end) positions that cover the original range. +/// The end position is exclusive. +/// +/// # Arguments +/// * `range_start` - Start of the requested range +/// * `range_end` - End of the requested range (exclusive) +/// * `chunk_size` - Size of each chunk +/// * `object_size` - Total size of the object +/// +/// # Returns +/// (aligned_start, aligned_end) - Both aligned to chunk boundaries, clamped to object_size +#[allow(dead_code)] +fn align_range(range_start: u64, range_end: u64, chunk_size: usize, object_size: u64) -> (u64, u64) { + let chunk_size = chunk_size as u64; + + // Align start down to chunk boundary + let aligned_start = (range_start / chunk_size) * chunk_size; + + // Align end up to chunk boundary, but don't exceed object size + let aligned_end = if range_end == 0 { + 0 + } else { + let chunks_needed = range_end.div_ceil(chunk_size); + (chunks_needed * chunk_size).min(object_size) + }; + + (aligned_start, aligned_end) +} + +/// Split a range into individual chunk read operations. +/// +/// # Arguments +/// * `range_start` - Start of the requested range (inclusive) +/// * `range_size` - Size of the requested range, or None for "read to end" +/// * `chunk_size` - Size of each chunk +/// * `object_size` - Total size of the object +/// +/// # Returns +/// A vector of `ChunkInfo` describing each chunk that needs to be fetched +/// and how to slice the data from each chunk. 
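+///
+/// Empty objects, zero-length requests, and requests that start at or beyond
+/// `object_size` all yield an empty vector, so callers can treat an empty
+/// result as "nothing to read".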
+fn split_range_into_chunks( + range_start: u64, + range_size: Option, + chunk_size: usize, + object_size: u64, +) -> Vec { + if object_size == 0 { + return vec![]; + } + + let chunk_size_u64 = chunk_size as u64; + let range_end = match range_size { + Some(size) => (range_start + size).min(object_size), + None => object_size, + }; + + if range_start >= range_end { + return vec![]; + } + + let first_chunk_index = range_start / chunk_size_u64; + let last_chunk_index = (range_end - 1) / chunk_size_u64; + + let mut chunks = Vec::with_capacity((last_chunk_index - first_chunk_index + 1) as usize); + + for chunk_index in first_chunk_index..=last_chunk_index { + let chunk_start = chunk_index * chunk_size_u64; + let chunk_end = ((chunk_index + 1) * chunk_size_u64).min(object_size); + + // Calculate the intersection of [range_start, range_end) with [chunk_start, chunk_end) + let read_start = range_start.max(chunk_start); + let read_end = range_end.min(chunk_end); + + if read_start < read_end { + chunks.push(ChunkInfo { + chunk_index, + offset_in_chunk: (read_start - chunk_start) as usize, + length_in_chunk: (read_end - read_start) as usize, + }); + } + } + + chunks +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_align_range_basic() { + // Range within a single chunk + let (start, end) = align_range(100, 200, 1024, 4096); + assert_eq!(start, 0); + assert_eq!(end, 1024); + + // Range spanning multiple chunks + let (start, end) = align_range(1000, 3000, 1024, 4096); + assert_eq!(start, 0); + assert_eq!(end, 3072); + } + + #[test] + fn test_align_range_at_boundaries() { + // Start at chunk boundary + let (start, end) = align_range(1024, 2000, 1024, 4096); + assert_eq!(start, 1024); + assert_eq!(end, 2048); + + // End at chunk boundary + let (start, end) = align_range(100, 2048, 1024, 4096); + assert_eq!(start, 0); + assert_eq!(end, 2048); + + // Both at boundaries + let (start, end) = align_range(1024, 2048, 1024, 4096); + assert_eq!(start, 1024); + assert_eq!(end, 2048); + } + + #[test] + fn test_align_range_clamp_to_object_size() { + // Range extends beyond object + let (start, end) = align_range(3000, 5000, 1024, 4096); + assert_eq!(start, 2048); + assert_eq!(end, 4096); + + // Object smaller than chunk + let (start, end) = align_range(0, 500, 1024, 500); + assert_eq!(start, 0); + assert_eq!(end, 500); + } + + #[test] + fn test_split_range_single_chunk() { + // Read within a single chunk + let chunks = split_range_into_chunks(100, Some(200), 1024, 4096); + assert_eq!(chunks.len(), 1); + assert_eq!(chunks[0].chunk_index, 0); + assert_eq!(chunks[0].offset_in_chunk, 100); + assert_eq!(chunks[0].length_in_chunk, 200); + } + + #[test] + fn test_split_range_multiple_chunks() { + // Read spanning 3 chunks + let chunks = split_range_into_chunks(500, Some(2000), 1024, 4096); + assert_eq!(chunks.len(), 3); + + // First chunk: 500-1024 (524 bytes) + assert_eq!(chunks[0].chunk_index, 0); + assert_eq!(chunks[0].offset_in_chunk, 500); + assert_eq!(chunks[0].length_in_chunk, 524); + + // Second chunk: 1024-2048 (1024 bytes) + assert_eq!(chunks[1].chunk_index, 1); + assert_eq!(chunks[1].offset_in_chunk, 0); + assert_eq!(chunks[1].length_in_chunk, 1024); + + // Third chunk: 2048-2500 (452 bytes) + assert_eq!(chunks[2].chunk_index, 2); + assert_eq!(chunks[2].offset_in_chunk, 0); + assert_eq!(chunks[2].length_in_chunk, 452); + } + + #[test] + fn test_split_range_read_to_end() { + // Read from offset to end of object + let chunks = split_range_into_chunks(1500, None, 1024, 3000); + 
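+        // 1500 / 1024 = 1 and (3000 - 1) / 1024 = 2, so only chunks 1 and 2 are
+        // touched; within chunk 1 the read starts at offset 1500 - 1024 = 476.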
assert_eq!(chunks.len(), 2); + + // First chunk: 1500-2048 (548 bytes) + assert_eq!(chunks[0].chunk_index, 1); + assert_eq!(chunks[0].offset_in_chunk, 476); + assert_eq!(chunks[0].length_in_chunk, 548); + + // Second chunk: 2048-3000 (952 bytes) + assert_eq!(chunks[1].chunk_index, 2); + assert_eq!(chunks[1].offset_in_chunk, 0); + assert_eq!(chunks[1].length_in_chunk, 952); + } + + #[test] + fn test_split_range_entire_object() { + // Read entire object + let chunks = split_range_into_chunks(0, None, 1024, 2500); + assert_eq!(chunks.len(), 3); + + assert_eq!(chunks[0].chunk_index, 0); + assert_eq!(chunks[0].offset_in_chunk, 0); + assert_eq!(chunks[0].length_in_chunk, 1024); + + assert_eq!(chunks[1].chunk_index, 1); + assert_eq!(chunks[1].offset_in_chunk, 0); + assert_eq!(chunks[1].length_in_chunk, 1024); + + assert_eq!(chunks[2].chunk_index, 2); + assert_eq!(chunks[2].offset_in_chunk, 0); + assert_eq!(chunks[2].length_in_chunk, 452); + } + + #[test] + fn test_split_range_empty_object() { + let chunks = split_range_into_chunks(0, None, 1024, 0); + assert!(chunks.is_empty()); + } + + #[test] + fn test_split_range_empty_range() { + // Zero-length range + let chunks = split_range_into_chunks(100, Some(0), 1024, 4096); + assert!(chunks.is_empty()); + + // Start equals end + let chunks = split_range_into_chunks(1024, Some(0), 1024, 4096); + assert!(chunks.is_empty()); + } + + #[test] + fn test_split_range_beyond_object() { + // Range starts beyond object + let chunks = split_range_into_chunks(5000, Some(100), 1024, 4096); + assert!(chunks.is_empty()); + + // Range partially beyond object + let chunks = split_range_into_chunks(4000, Some(500), 1024, 4096); + assert_eq!(chunks.len(), 1); + assert_eq!(chunks[0].chunk_index, 3); + assert_eq!(chunks[0].offset_in_chunk, 928); + assert_eq!(chunks[0].length_in_chunk, 96); + } + + #[test] + fn test_split_range_exact_chunk_boundaries() { + // Read exactly one full chunk + let chunks = split_range_into_chunks(1024, Some(1024), 1024, 4096); + assert_eq!(chunks.len(), 1); + assert_eq!(chunks[0].chunk_index, 1); + assert_eq!(chunks[0].offset_in_chunk, 0); + assert_eq!(chunks[0].length_in_chunk, 1024); + + // Read exactly two full chunks + let chunks = split_range_into_chunks(0, Some(2048), 1024, 4096); + assert_eq!(chunks.len(), 2); + assert_eq!(chunks[0].length_in_chunk, 1024); + assert_eq!(chunks[1].length_in_chunk, 1024); + } + + #[test] + fn test_split_range_object_not_chunk_aligned() { + // Object size is not a multiple of chunk size + let chunks = split_range_into_chunks(0, None, 1024, 1500); + assert_eq!(chunks.len(), 2); + + assert_eq!(chunks[0].chunk_index, 0); + assert_eq!(chunks[0].length_in_chunk, 1024); + + assert_eq!(chunks[1].chunk_index, 1); + assert_eq!(chunks[1].offset_in_chunk, 0); + assert_eq!(chunks[1].length_in_chunk, 476); // Last chunk is smaller + } + + #[test] + fn test_split_range_last_partial_chunk() { + // Read that ends in the middle of the last partial chunk + let chunks = split_range_into_chunks(1000, Some(400), 1024, 1500); + assert_eq!(chunks.len(), 2); + + // First chunk: 1000-1024 + assert_eq!(chunks[0].chunk_index, 0); + assert_eq!(chunks[0].offset_in_chunk, 1000); + assert_eq!(chunks[0].length_in_chunk, 24); + + // Second chunk: 1024-1400 + assert_eq!(chunks[1].chunk_index, 1); + assert_eq!(chunks[1].offset_in_chunk, 0); + assert_eq!(chunks[1].length_in_chunk, 376); + } +} diff --git a/core/layers/foyer/src/lib.rs b/core/layers/foyer/src/lib.rs index 344ed68e3550..6173ac6a50c9 100644 --- a/core/layers/foyer/src/lib.rs +++ 
b/core/layers/foyer/src/lib.rs @@ -15,8 +15,6 @@ // specific language governing permissions and limitations // under the License. -mod cached_metadata; -mod chunk_utils; mod chunked; mod deleter; mod error; From 8abda435c222c8d1f574e539789a9d1a809c43f0 Mon Sep 17 00:00:00 2001 From: Li Yazhou Date: Thu, 29 Jan 2026 22:40:57 +0800 Subject: [PATCH 03/13] add todo while tune the format --- core/layers/foyer/src/chunked.rs | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/core/layers/foyer/src/chunked.rs b/core/layers/foyer/src/chunked.rs index 900519e460fe..ee51b8b71989 100644 --- a/core/layers/foyer/src/chunked.rs +++ b/core/layers/foyer/src/chunked.rs @@ -94,7 +94,8 @@ impl ChunkedReader { .await?; // Slice the chunk to get only the needed portion - let end = (chunk_info.offset_in_chunk + chunk_info.length_in_chunk).min(chunk_data.len()); + let end = + (chunk_info.offset_in_chunk + chunk_info.length_in_chunk).min(chunk_data.len()); let sliced = chunk_data.slice(chunk_info.offset_in_chunk..end); total_len += sliced.len() as u64; result_bufs.push(sliced); @@ -106,6 +107,7 @@ impl ChunkedReader { .with_range(range_start, range_end.saturating_sub(1)) .with_size(object_size); + // TODO: use non-contiguous buffer let buffer: Buffer = result_bufs .into_iter() .flat_map(|b| b.to_bytes()) @@ -123,11 +125,7 @@ impl ChunkedReader { /// /// Uses simple get/insert pattern: check cache first, on miss stat from backend /// and insert into cache. - async fn fetch_metadata( - &self, - path: &str, - version: Option, - ) -> Result { + async fn fetch_metadata(&self, path: &str, version: Option) -> Result { let key = FoyerKey::Metadata { path: path.to_string(), chunk_size: self.chunk_size, @@ -203,9 +201,7 @@ impl ChunkedReader { let buffer = reader.read_all().await?; // Insert into cache - self.inner - .cache - .insert(key, FoyerValue(buffer.clone())); + self.inner.cache.insert(key, FoyerValue(buffer.clone())); Ok(buffer) } @@ -236,7 +232,12 @@ struct ChunkInfo { /// # Returns /// (aligned_start, aligned_end) - Both aligned to chunk boundaries, clamped to object_size #[allow(dead_code)] -fn align_range(range_start: u64, range_end: u64, chunk_size: usize, object_size: u64) -> (u64, u64) { +fn align_range( + range_start: u64, + range_end: u64, + chunk_size: usize, + object_size: u64, +) -> (u64, u64) { let chunk_size = chunk_size as u64; // Align start down to chunk boundary From 6d529ea69934f054d18cf650e3abcd23db594a49 Mon Sep 17 00:00:00 2001 From: Li Yazhou Date: Thu, 29 Jan 2026 22:46:11 +0800 Subject: [PATCH 04/13] add integration tests --- core/layers/foyer/src/lib.rs | 348 +++++++++++++++++++++++++++++++++++ 1 file changed, 348 insertions(+) diff --git a/core/layers/foyer/src/lib.rs b/core/layers/foyer/src/lib.rs index 6173ac6a50c9..3ba5f1b01ac6 100644 --- a/core/layers/foyer/src/lib.rs +++ b/core/layers/foyer/src/lib.rs @@ -772,4 +772,352 @@ mod tests { assert_eq!(read_data.len(), 1024); assert_eq!(read_data.to_vec(), data[1024..2048]); } + + #[tokio::test] + async fn test_chunked_metadata_caching() { + let dir = tempfile::tempdir().unwrap(); + let cache = HybridCacheBuilder::new() + .memory(1024 * 1024) + .with_shards(1) + .storage(Engine::Large(LargeEngineOptions::new())) + .with_device_options( + DirectFsDeviceOptions::new(dir.path()) + .with_capacity(16 * MiB as usize) + .with_file_size(MiB as usize), + ) + .with_recover_mode(RecoverMode::None) + .build() + .await + .unwrap(); + + let op = Operator::new(Memory::default()) + .unwrap() + 
.layer(FoyerLayer::new(cache.clone()).with_chunk_size(1024)) + .finish(); + + let data = vec![42u8; 5000]; + op.write("meta.bin", data.clone()).await.unwrap(); + + // First read to cache metadata and chunks + let read1 = op.read_with("meta.bin").range(0..1000).await.unwrap(); + assert_eq!(read1.len(), 1000); + + // Clear only data cache (metadata should remain if properly keyed) + // Second read from different range - metadata should be cached + let read2 = op.read_with("meta.bin").range(3000..4000).await.unwrap(); + assert_eq!(read2.len(), 1000); + assert_eq!(read2.to_vec(), data[3000..4000]); + } + + #[tokio::test] + async fn test_chunked_different_chunk_sizes() { + let dir = tempfile::tempdir().unwrap(); + let cache = HybridCacheBuilder::new() + .memory(2 * 1024 * 1024) + .with_shards(1) + .storage(Engine::Large(LargeEngineOptions::new())) + .with_device_options( + DirectFsDeviceOptions::new(dir.path()) + .with_capacity(16 * MiB as usize) + .with_file_size(MiB as usize), + ) + .with_recover_mode(RecoverMode::None) + .build() + .await + .unwrap(); + + let data = vec![88u8; 10000]; + + // Test with 1KB chunks + let op1 = Operator::new(Memory::default()) + .unwrap() + .layer(FoyerLayer::new(cache.clone()).with_chunk_size(1024)) + .finish(); + op1.write("chunk_size_test.bin", data.clone()).await.unwrap(); + let read1 = op1.read_with("chunk_size_test.bin").range(2000..4000).await.unwrap(); + assert_eq!(read1.len(), 2000); + + // Test with 2KB chunks - should create different cache keys + let op2 = Operator::new(Memory::default()) + .unwrap() + .layer(FoyerLayer::new(cache.clone()).with_chunk_size(2048)) + .finish(); + op2.write("chunk_size_test2.bin", data.clone()).await.unwrap(); + let read2 = op2.read_with("chunk_size_test2.bin").range(2000..4000).await.unwrap(); + assert_eq!(read2.len(), 2000); + assert_eq!(read2.to_vec(), data[2000..4000]); + } + + #[tokio::test] + async fn test_chunked_cache_persistence() { + let dir = tempfile::tempdir().unwrap(); + let cache = HybridCacheBuilder::new() + .memory(1024 * 1024) + .with_shards(1) + .storage(Engine::Large(LargeEngineOptions::new())) + .with_device_options( + DirectFsDeviceOptions::new(dir.path()) + .with_capacity(16 * MiB as usize) + .with_file_size(MiB as usize), + ) + .with_recover_mode(RecoverMode::None) + .build() + .await + .unwrap(); + + let op = Operator::new(Memory::default()) + .unwrap() + .layer(FoyerLayer::new(cache.clone()).with_chunk_size(1024)) + .finish(); + + let data: Vec = (0..5000).map(|i| (i % 256) as u8).collect(); + op.write("persist.bin", data.clone()).await.unwrap(); + + // Read to populate cache + let read1 = op.read_with("persist.bin").range(1000..3000).await.unwrap(); + assert_eq!(read1.to_vec(), data[1000..3000]); + + // Verify cache hit by reading again (overlapping range) + let read2 = op.read_with("persist.bin").range(1500..2500).await.unwrap(); + assert_eq!(read2.to_vec(), data[1500..2500]); + + // Non-overlapping range should fetch new chunks + let read3 = op.read_with("persist.bin").range(4000..5000).await.unwrap(); + assert_eq!(read3.to_vec(), data[4000..5000]); + } + + #[tokio::test] + async fn test_chunked_boundary_reads() { + let dir = tempfile::tempdir().unwrap(); + let cache = HybridCacheBuilder::new() + .memory(1024 * 1024) + .with_shards(1) + .storage(Engine::Large(LargeEngineOptions::new())) + .with_device_options( + DirectFsDeviceOptions::new(dir.path()) + .with_capacity(16 * MiB as usize) + .with_file_size(MiB as usize), + ) + .with_recover_mode(RecoverMode::None) + .build() + .await + 
.unwrap(); + + let op = Operator::new(Memory::default()) + .unwrap() + .layer(FoyerLayer::new(cache).with_chunk_size(1024)) + .finish(); + + let data: Vec = (0..4096).map(|i| (i % 256) as u8).collect(); + op.write("boundary.bin", data.clone()).await.unwrap(); + + // Read from exact chunk start + let read1 = op.read_with("boundary.bin").range(1024..1524).await.unwrap(); + assert_eq!(read1.to_vec(), data[1024..1524]); + + // Read to exact chunk end + let read2 = op.read_with("boundary.bin").range(500..1024).await.unwrap(); + assert_eq!(read2.to_vec(), data[500..1024]); + + // Read across chunk boundary + let read3 = op.read_with("boundary.bin").range(1020..1030).await.unwrap(); + assert_eq!(read3.to_vec(), data[1020..1030]); + } + + #[tokio::test] + async fn test_chunked_large_file() { + let dir = tempfile::tempdir().unwrap(); + let cache = HybridCacheBuilder::new() + .memory(4 * 1024 * 1024) + .with_shards(1) + .storage(Engine::Large(LargeEngineOptions::new())) + .with_device_options( + DirectFsDeviceOptions::new(dir.path()) + .with_capacity(32 * MiB as usize) + .with_file_size(MiB as usize), + ) + .with_recover_mode(RecoverMode::None) + .build() + .await + .unwrap(); + + let op = Operator::new(Memory::default()) + .unwrap() + .layer(FoyerLayer::new(cache).with_chunk_size(4096)) + .finish(); + + // 100KB file - 25 chunks + let data: Vec = (0..100_000).map(|i| (i % 256) as u8).collect(); + op.write("large.bin", data.clone()).await.unwrap(); + + // Read various sections + let read1 = op.read_with("large.bin").range(10_000..20_000).await.unwrap(); + assert_eq!(read1.len(), 10_000); + assert_eq!(read1.to_vec(), data[10_000..20_000]); + + let read2 = op.read_with("large.bin").range(50_000..60_000).await.unwrap(); + assert_eq!(read2.len(), 10_000); + assert_eq!(read2.to_vec(), data[50_000..60_000]); + + // Read entire file + let read_all = op.read("large.bin").await.unwrap(); + assert_eq!(read_all.len(), 100_000); + assert_eq!(read_all.to_vec(), data); + } + + #[tokio::test] + async fn test_chunked_sequential_reads() { + let dir = tempfile::tempdir().unwrap(); + let cache = HybridCacheBuilder::new() + .memory(1024 * 1024) + .with_shards(1) + .storage(Engine::Large(LargeEngineOptions::new())) + .with_device_options( + DirectFsDeviceOptions::new(dir.path()) + .with_capacity(16 * MiB as usize) + .with_file_size(MiB as usize), + ) + .with_recover_mode(RecoverMode::None) + .build() + .await + .unwrap(); + + let op = Operator::new(Memory::default()) + .unwrap() + .layer(FoyerLayer::new(cache).with_chunk_size(1024)) + .finish(); + + let data: Vec = (0..10_000).map(|i| (i % 256) as u8).collect(); + op.write("sequential.bin", data.clone()).await.unwrap(); + + // Simulate sequential reading pattern + for offset in (0..10_000).step_by(500) { + let end = (offset + 500).min(data.len()); + let read_data = op + .read_with("sequential.bin") + .range(offset as u64..end as u64) + .await + .unwrap(); + assert_eq!(read_data.to_vec(), data[offset..end]); + } + } + + #[tokio::test] + async fn test_chunked_random_access() { + let dir = tempfile::tempdir().unwrap(); + let cache = HybridCacheBuilder::new() + .memory(2 * 1024 * 1024) + .with_shards(1) + .storage(Engine::Large(LargeEngineOptions::new())) + .with_device_options( + DirectFsDeviceOptions::new(dir.path()) + .with_capacity(16 * MiB as usize) + .with_file_size(MiB as usize), + ) + .with_recover_mode(RecoverMode::None) + .build() + .await + .unwrap(); + + let op = Operator::new(Memory::default()) + .unwrap() + .layer(FoyerLayer::new(cache).with_chunk_size(1024)) 
+ .finish(); + + let data: Vec = (0..20_000).map(|i| (i % 256) as u8).collect(); + op.write("random.bin", data.clone()).await.unwrap(); + + // Random access pattern + let ranges = [ + (5000, 5500), + (15000, 15200), + (1000, 2000), + (18000, 19000), + (500, 1500), + ]; + + for (start, end) in ranges { + let read_data = op + .read_with("random.bin") + .range(start..end) + .await + .unwrap(); + assert_eq!(read_data.to_vec(), data[start as usize..end as usize]); + } + } + + #[tokio::test] + async fn test_chunked_with_clear_cache() { + let dir = tempfile::tempdir().unwrap(); + let cache = HybridCacheBuilder::new() + .memory(1024 * 1024) + .with_shards(1) + .storage(Engine::Large(LargeEngineOptions::new())) + .with_device_options( + DirectFsDeviceOptions::new(dir.path()) + .with_capacity(16 * MiB as usize) + .with_file_size(MiB as usize), + ) + .with_recover_mode(RecoverMode::None) + .build() + .await + .unwrap(); + + let op = Operator::new(Memory::default()) + .unwrap() + .layer(FoyerLayer::new(cache.clone()).with_chunk_size(1024)) + .finish(); + + let data = vec![99u8; 5000]; + op.write("clear_test.bin", data.clone()).await.unwrap(); + + // First read to populate cache + let read1 = op.read_with("clear_test.bin").range(1000..2000).await.unwrap(); + assert_eq!(read1.to_vec(), data[1000..2000]); + + // Clear cache + cache.clear().await.unwrap(); + + // Read again - should still work (fetch from backend) + let read2 = op.read_with("clear_test.bin").range(1000..2000).await.unwrap(); + assert_eq!(read2.to_vec(), data[1000..2000]); + } + + #[tokio::test] + async fn test_chunked_single_byte_reads() { + let dir = tempfile::tempdir().unwrap(); + let cache = HybridCacheBuilder::new() + .memory(1024 * 1024) + .with_shards(1) + .storage(Engine::Large(LargeEngineOptions::new())) + .with_device_options( + DirectFsDeviceOptions::new(dir.path()) + .with_capacity(16 * MiB as usize) + .with_file_size(MiB as usize), + ) + .with_recover_mode(RecoverMode::None) + .build() + .await + .unwrap(); + + let op = Operator::new(Memory::default()) + .unwrap() + .layer(FoyerLayer::new(cache).with_chunk_size(1024)) + .finish(); + + let data: Vec = (0..3000).map(|i| (i % 256) as u8).collect(); + op.write("single_byte.bin", data.clone()).await.unwrap(); + + // Read single bytes at various positions + let positions = [0, 1023, 1024, 2047, 2048, 2999]; + for pos in positions { + let read_data = op + .read_with("single_byte.bin") + .range(pos..pos + 1) + .await + .unwrap(); + assert_eq!(read_data.len(), 1); + assert_eq!(read_data.to_vec()[0], data[pos as usize]); + } + } } From 69adc29a4e428763c3240d624f5aea5094aebb2e Mon Sep 17 00:00:00 2001 From: Li Yazhou Date: Thu, 29 Jan 2026 23:25:16 +0800 Subject: [PATCH 05/13] skip cache for writer in chunked mode --- core/layers/foyer/src/writer.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/core/layers/foyer/src/writer.rs b/core/layers/foyer/src/writer.rs index fc6e9b4ef96f..85d04cdc29e3 100644 --- a/core/layers/foyer/src/writer.rs +++ b/core/layers/foyer/src/writer.rs @@ -43,22 +43,24 @@ impl Writer { inner: Arc>, size_limit: std::ops::Range, ) -> Self { + // In chunked mode, skip caching entirely + let skip_cache = inner.chunk_size.is_some(); Self { w, buf: oio::QueueBuf::new(), path, inner, size_limit, - skip_cache: false, + skip_cache, } } } impl oio::Write for Writer { async fn write(&mut self, bs: Buffer) -> Result<()> { - if self.size_limit.contains(&(self.buf.len() + bs.len())) { + // Only buffer if not in chunked mode and within size limit + if 
!self.skip_cache && self.size_limit.contains(&(self.buf.len() + bs.len())) { self.buf.push(bs.clone()); - self.skip_cache = false; } else { self.buf.clear(); self.skip_cache = true; @@ -69,6 +71,7 @@ impl oio::Write for Writer { async fn close(&mut self) -> Result { let buffer = self.buf.clone().collect(); let metadata = self.w.close().await?; + // Only cache in full mode (not chunked mode) if !self.skip_cache { self.inner.cache.insert( FoyerKey::Full { From 84055aac798b6d277871c74766c47c050ff49215 Mon Sep 17 00:00:00 2001 From: Li Yazhou Date: Thu, 29 Jan 2026 23:26:07 +0800 Subject: [PATCH 06/13] rename Metadata into ChunkMetadata --- core/layers/foyer/src/chunked.rs | 2 +- core/layers/foyer/src/lib.rs | 68 +++++++++++++++++++++++--------- 2 files changed, 51 insertions(+), 19 deletions(-) diff --git a/core/layers/foyer/src/chunked.rs b/core/layers/foyer/src/chunked.rs index ee51b8b71989..4227b43cda55 100644 --- a/core/layers/foyer/src/chunked.rs +++ b/core/layers/foyer/src/chunked.rs @@ -126,7 +126,7 @@ impl ChunkedReader { /// Uses simple get/insert pattern: check cache first, on miss stat from backend /// and insert into cache. async fn fetch_metadata(&self, path: &str, version: Option) -> Result { - let key = FoyerKey::Metadata { + let key = FoyerKey::ChunkMetadata { path: path.to_string(), chunk_size: self.chunk_size, version: version.clone(), diff --git a/core/layers/foyer/src/lib.rs b/core/layers/foyer/src/lib.rs index 3ba5f1b01ac6..5503b507a8ec 100644 --- a/core/layers/foyer/src/lib.rs +++ b/core/layers/foyer/src/lib.rs @@ -57,7 +57,7 @@ pub enum FoyerKey { version: Option, }, /// Cache key for object metadata (chunked mode) - Metadata { + ChunkMetadata { path: String, chunk_size: usize, version: Option, @@ -513,12 +513,12 @@ mod tests { version: Some("long-version-".to_string().repeat(50)), }, // Test Metadata variant - FoyerKey::Metadata { + FoyerKey::ChunkMetadata { path: "meta/path".to_string(), chunk_size: 1024, version: None, }, - FoyerKey::Metadata { + FoyerKey::ChunkMetadata { path: "meta/versioned".to_string(), chunk_size: 4096, version: Some("v2".to_string()), @@ -833,8 +833,14 @@ mod tests { .unwrap() .layer(FoyerLayer::new(cache.clone()).with_chunk_size(1024)) .finish(); - op1.write("chunk_size_test.bin", data.clone()).await.unwrap(); - let read1 = op1.read_with("chunk_size_test.bin").range(2000..4000).await.unwrap(); + op1.write("chunk_size_test.bin", data.clone()) + .await + .unwrap(); + let read1 = op1 + .read_with("chunk_size_test.bin") + .range(2000..4000) + .await + .unwrap(); assert_eq!(read1.len(), 2000); // Test with 2KB chunks - should create different cache keys @@ -842,8 +848,14 @@ mod tests { .unwrap() .layer(FoyerLayer::new(cache.clone()).with_chunk_size(2048)) .finish(); - op2.write("chunk_size_test2.bin", data.clone()).await.unwrap(); - let read2 = op2.read_with("chunk_size_test2.bin").range(2000..4000).await.unwrap(); + op2.write("chunk_size_test2.bin", data.clone()) + .await + .unwrap(); + let read2 = op2 + .read_with("chunk_size_test2.bin") + .range(2000..4000) + .await + .unwrap(); assert_eq!(read2.len(), 2000); assert_eq!(read2.to_vec(), data[2000..4000]); } @@ -912,7 +924,11 @@ mod tests { op.write("boundary.bin", data.clone()).await.unwrap(); // Read from exact chunk start - let read1 = op.read_with("boundary.bin").range(1024..1524).await.unwrap(); + let read1 = op + .read_with("boundary.bin") + .range(1024..1524) + .await + .unwrap(); assert_eq!(read1.to_vec(), data[1024..1524]); // Read to exact chunk end @@ -920,7 +936,11 @@ mod tests { 
assert_eq!(read2.to_vec(), data[500..1024]); // Read across chunk boundary - let read3 = op.read_with("boundary.bin").range(1020..1030).await.unwrap(); + let read3 = op + .read_with("boundary.bin") + .range(1020..1030) + .await + .unwrap(); assert_eq!(read3.to_vec(), data[1020..1030]); } @@ -951,11 +971,19 @@ mod tests { op.write("large.bin", data.clone()).await.unwrap(); // Read various sections - let read1 = op.read_with("large.bin").range(10_000..20_000).await.unwrap(); + let read1 = op + .read_with("large.bin") + .range(10_000..20_000) + .await + .unwrap(); assert_eq!(read1.len(), 10_000); assert_eq!(read1.to_vec(), data[10_000..20_000]); - let read2 = op.read_with("large.bin").range(50_000..60_000).await.unwrap(); + let read2 = op + .read_with("large.bin") + .range(50_000..60_000) + .await + .unwrap(); assert_eq!(read2.len(), 10_000); assert_eq!(read2.to_vec(), data[50_000..60_000]); @@ -1037,11 +1065,7 @@ mod tests { ]; for (start, end) in ranges { - let read_data = op - .read_with("random.bin") - .range(start..end) - .await - .unwrap(); + let read_data = op.read_with("random.bin").range(start..end).await.unwrap(); assert_eq!(read_data.to_vec(), data[start as usize..end as usize]); } } @@ -1072,14 +1096,22 @@ mod tests { op.write("clear_test.bin", data.clone()).await.unwrap(); // First read to populate cache - let read1 = op.read_with("clear_test.bin").range(1000..2000).await.unwrap(); + let read1 = op + .read_with("clear_test.bin") + .range(1000..2000) + .await + .unwrap(); assert_eq!(read1.to_vec(), data[1000..2000]); // Clear cache cache.clear().await.unwrap(); // Read again - should still work (fetch from backend) - let read2 = op.read_with("clear_test.bin").range(1000..2000).await.unwrap(); + let read2 = op + .read_with("clear_test.bin") + .range(1000..2000) + .await + .unwrap(); assert_eq!(read2.to_vec(), data[1000..2000]); } From 5b892cfa1feb763464eb7e40e764fb1105bc72fb Mon Sep 17 00:00:00 2001 From: Li Yazhou Date: Thu, 29 Jan 2026 23:27:57 +0800 Subject: [PATCH 07/13] skip_cache in deleter.rs --- core/layers/foyer/src/deleter.rs | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/core/layers/foyer/src/deleter.rs b/core/layers/foyer/src/deleter.rs index 99fe8b176334..de4a6ffea060 100644 --- a/core/layers/foyer/src/deleter.rs +++ b/core/layers/foyer/src/deleter.rs @@ -29,14 +29,17 @@ pub struct Deleter { pub(crate) deleter: A::Deleter, pub(crate) keys: Vec, pub(crate) inner: Arc>, + skip_cache: bool, } impl Deleter { pub(crate) fn new(deleter: A::Deleter, inner: Arc>) -> Self { + let skip_cache = inner.chunk_size.is_some(); Self { deleter, keys: vec![], inner, + skip_cache, } } } @@ -44,16 +47,20 @@ impl Deleter { impl oio::Delete for Deleter { async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> { self.deleter.delete(path, args.clone()).await?; - self.keys.push(FoyerKey::Full { - path: path.to_string(), - version: args.version().map(|v| v.to_string()), - }); + if !self.skip_cache { + self.keys.push(FoyerKey::Full { + path: path.to_string(), + version: args.version().map(|v| v.to_string()), + }); + } Ok(()) } async fn close(&mut self) -> Result<()> { - for key in &self.keys { - self.inner.cache.remove(key); + if !self.skip_cache { + for key in &self.keys { + self.inner.cache.remove(key); + } } self.deleter.close().await } From 1754e76ea01cdc1f0476061ded3d3eadba8d9ac2 Mon Sep 17 00:00:00 2001 From: Li Yazhou Date: Thu, 29 Jan 2026 23:33:51 +0800 Subject: [PATCH 08/13] fix typo in comment --- core/layers/foyer/src/lib.rs | 2 +- 
1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/layers/foyer/src/lib.rs b/core/layers/foyer/src/lib.rs index 5503b507a8ec..fd547e0cde02 100644 --- a/core/layers/foyer/src/lib.rs +++ b/core/layers/foyer/src/lib.rs @@ -47,7 +47,7 @@ pub use writer::Writer; /// # Variants /// /// - `Full`: Caches the entire object (used in non-chunked mode) -/// - `Metadata`: Caches object metadata for chunked mode (content_length, version, etag) +/// - `ChunkMetadata`: Caches object metadata for chunked mode (content_length, version, etag) /// - `Chunk`: Caches a specific chunk of the object #[derive(Debug, Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)] pub enum FoyerKey { From cea7ac21ce2dadaf41c346e8f6a66ac4e8297707 Mon Sep 17 00:00:00 2001 From: Li Yazhou Date: Fri, 30 Jan 2026 21:19:57 +0800 Subject: [PATCH 09/13] fix TODO in using non-contiguous buffer to avoid memory copy. --- core/layers/foyer/src/chunked.rs | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/core/layers/foyer/src/chunked.rs b/core/layers/foyer/src/chunked.rs index 4227b43cda55..df0acf76b5ca 100644 --- a/core/layers/foyer/src/chunked.rs +++ b/core/layers/foyer/src/chunked.rs @@ -107,12 +107,10 @@ impl ChunkedReader { .with_range(range_start, range_end.saturating_sub(1)) .with_size(object_size); - // TODO: use non-contiguous buffer - let buffer: Buffer = result_bufs - .into_iter() - .flat_map(|b| b.to_bytes()) - .collect::>() - .into(); + // Use non-contiguous buffer to avoid memory copy. + // Buffer implements Iterator, so flatten() extracts all Bytes chunks, + // then collect() uses FromIterator to create a non-contiguous Buffer. + let buffer: Buffer = result_bufs.into_iter().flatten().collect(); let rp = RpRead::new() .with_size(Some(buffer.len() as u64)) From 21a42fddcb763dda67b3752612ab27461a2bd151 Mon Sep 17 00:00:00 2001 From: Li Yazhou Date: Fri, 30 Jan 2026 21:51:14 +0800 Subject: [PATCH 10/13] refactor: let FoyerValue to be an enum to avoid deserialization when the data is cached in memory --- core/layers/foyer/src/chunked.rs | 51 +++++++++++-------------- core/layers/foyer/src/full.rs | 45 +++++++++++++--------- core/layers/foyer/src/lib.rs | 64 +++++++++++++++++--------------- core/layers/foyer/src/writer.rs | 3 +- 4 files changed, 84 insertions(+), 79 deletions(-) diff --git a/core/layers/foyer/src/chunked.rs b/core/layers/foyer/src/chunked.rs index df0acf76b5ca..0dd94adb019c 100644 --- a/core/layers/foyer/src/chunked.rs +++ b/core/layers/foyer/src/chunked.rs @@ -27,23 +27,12 @@ use opendal_core::raw::OpStat; use opendal_core::raw::RpRead; use opendal_core::raw::oio::Read; +use crate::CachedBuffer; +use crate::CachedChunkMetadata; use crate::FoyerKey; use crate::FoyerValue; use crate::Inner; -/// Cached metadata for an object in chunked mode. -/// -/// This stores essential information needed for chunk-based reading: -/// - `content_length`: Total size of the object -/// - `version`: Object version (if versioning is enabled) -/// - `etag`: Entity tag for cache validation -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -struct CachedMetadata { - content_length: u64, - version: Option, - etag: Option, -} - /// ChunkedReader reads objects in fixed-size chunks, caching each chunk independently. /// /// This enables efficient partial reads of large objects by only fetching and caching @@ -120,10 +109,11 @@ impl ChunkedReader { } /// Fetch object metadata from cache or backend. 
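// Editor's note: a minimal, hypothetical sketch of the zero-copy assembly adopted in
// PATCH 09 above. It relies only on what the patch's own comment states: that
// `opendal_core::Buffer` iterates over its underlying `Bytes` segments and implements
// `FromIterator<Bytes>`. The name `assemble_chunks` is illustrative, not from the patch.
use opendal_core::Buffer;

fn assemble_chunks(chunk_slices: Vec<Buffer>) -> Buffer {
    // Flatten each per-chunk Buffer into its Bytes segments and collect them back into
    // one Buffer; the segments are reused as-is, so no payload bytes are copied.
    chunk_slices.into_iter().flatten().collect()
}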
- /// - /// Uses simple get/insert pattern: check cache first, on miss stat from backend - /// and insert into cache. - async fn fetch_metadata(&self, path: &str, version: Option) -> Result { + async fn fetch_metadata( + &self, + path: &str, + version: Option, + ) -> Result { let key = FoyerKey::ChunkMetadata { path: path.to_string(), chunk_size: self.chunk_size, @@ -132,11 +122,10 @@ impl ChunkedReader { // Try cache first if let Ok(Some(entry)) = self.inner.cache.get(&key).await { - let bytes = entry.value().0.to_vec(); - if let Ok(cached) = bincode::deserialize::(&bytes) { - return Ok(cached); + if let FoyerValue::ChunkMetadata(cached) = entry.value() { + return Ok(cached.clone()); } - // Deserialization failed - cache entry is corrupted, fall through to re-fetch + // Cache entry is wrong variant, fall through to re-fetch } // Cache miss - fetch from backend @@ -147,18 +136,16 @@ impl ChunkedReader { .await? .into_metadata(); - let cached = CachedMetadata { + let cached = CachedChunkMetadata { content_length: metadata.content_length(), version: metadata.version().map(|v| v.to_string()), etag: metadata.etag().map(|v| v.to_string()), }; - // Serialize and insert into cache - if let Ok(encoded) = bincode::serialize(&cached) { - self.inner - .cache - .insert(key, FoyerValue(Buffer::from(encoded))); - } + // Insert into cache + self.inner + .cache + .insert(key, FoyerValue::ChunkMetadata(cached.clone())); Ok(cached) } @@ -183,7 +170,9 @@ impl ChunkedReader { // Try cache first if let Ok(Some(entry)) = self.inner.cache.get(&key).await { - return Ok(entry.value().0.clone()); + if let FoyerValue::Buffer(cached) = entry.value() { + return Ok(cached.0.clone()); + } } // Cache miss - fetch from backend @@ -199,7 +188,9 @@ impl ChunkedReader { let buffer = reader.read_all().await?; // Insert into cache - self.inner.cache.insert(key, FoyerValue(buffer.clone())); + self.inner + .cache + .insert(key, FoyerValue::Buffer(CachedBuffer(buffer.clone()))); Ok(buffer) } diff --git a/core/layers/foyer/src/full.rs b/core/layers/foyer/src/full.rs index 9b04067605ff..2868d5c00826 100644 --- a/core/layers/foyer/src/full.rs +++ b/core/layers/foyer/src/full.rs @@ -29,6 +29,7 @@ use opendal_core::raw::OpStat; use opendal_core::raw::RpRead; use opendal_core::raw::oio::Read; +use crate::CachedBuffer; use crate::FoyerKey; use crate::FoyerValue; use crate::Inner; @@ -103,7 +104,7 @@ impl FullReader { .map_err(FoyerError::other)?; let buffer = reader.read_all().await.map_err(FoyerError::other)?; - Ok(FoyerValue(buffer)) + Ok(FoyerValue::Buffer(CachedBuffer(buffer))) } }, ) @@ -111,26 +112,34 @@ impl FullReader { // If got entry from cache, slice it to the requested range. If it's larger than size_limit, // we'll simply forward the request to the underlying accessor with user's given range. - match result { - Ok(entry) => { - let end = range_end.unwrap_or(entry.len() as u64); - let range = BytesContentRange::default() - .with_range(range_start, end - 1) - .with_size(entry.len() as _); - let buffer = entry.slice(range_start as usize..end as usize); - let rp = RpRead::new() - .with_size(Some(buffer.len() as _)) - .with_range(Some(range)); - Ok((rp, buffer)) - } + let buffer = match result { + Ok(entry) => match entry.value() { + FoyerValue::Buffer(cached) => cached.0.clone(), + _ => { + // fallback to underlying accessor if cache miss + let (_, mut reader) = + self.inner.accessor.read(path, original_args).await?; + reader.read_all().await? 
+ } + }, Err(e) => match e.downcast::() { Ok(_) => { - let (rp, mut reader) = self.inner.accessor.read(path, original_args).await?; - let buffer = reader.read_all().await?; - Ok((rp, buffer)) + // fallback to underlying accessor if object size is too big + let (_, mut reader) = self.inner.accessor.read(path, original_args).await?; + reader.read_all().await? } - Err(e) => Err(extract_err(e)), + Err(e) => return Err(extract_err(e)), }, - } + }; + + let end = range_end.unwrap_or(buffer.len() as u64); + let range = BytesContentRange::default() + .with_range(range_start, end - 1) + .with_size(buffer.len() as _); + let sliced = buffer.slice(range_start as usize..end as usize); + let rp = RpRead::new() + .with_size(Some(sliced.len() as _)) + .with_range(Some(range)); + Ok((rp, sliced)) } } diff --git a/core/layers/foyer/src/lib.rs b/core/layers/foyer/src/lib.rs index fd547e0cde02..f250c880b9b9 100644 --- a/core/layers/foyer/src/lib.rs +++ b/core/layers/foyer/src/lib.rs @@ -23,11 +23,11 @@ mod writer; use std::{ future::Future, - ops::{Bound, Deref, Range, RangeBounds}, + ops::{Bound, Range, RangeBounds}, sync::Arc, }; -use foyer::{Code, CodeError, HybridCache}; +use foyer::HybridCache; use opendal_core::raw::*; use opendal_core::*; @@ -71,41 +71,45 @@ pub enum FoyerKey { }, } -/// [`FoyerValue`] is a wrapper around `Buffer` that implements the `Code` trait. -#[derive(Debug)] -pub struct FoyerValue(pub Buffer); +/// [`FoyerValue`] is a value type for the foyer cache that can store either +/// a raw buffer or cached metadata. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub enum FoyerValue { + /// Raw buffer data (used for full objects and chunks) + Buffer(CachedBuffer), + /// Cached metadata for chunked mode + ChunkMetadata(CachedChunkMetadata), +} -impl Deref for FoyerValue { - type Target = Buffer; +/// A serializable wrapper around [`Buffer`]. +#[derive(Debug, Clone)] +pub struct CachedBuffer(pub Buffer); - fn deref(&self) -> &Self::Target { - &self.0 +impl serde::Serialize for CachedBuffer { + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + serde::Serialize::serialize(&self.0.to_vec(), serializer) } } -impl Code for FoyerValue { - fn encode(&self, writer: &mut impl std::io::Write) -> std::result::Result<(), CodeError> { - let len = self.0.len() as u64; - writer.write_all(&len.to_le_bytes())?; - std::io::copy(&mut self.0.clone(), writer)?; - Ok(()) - } - - fn decode(reader: &mut impl std::io::Read) -> std::result::Result +impl<'de> serde::Deserialize<'de> for CachedBuffer { + fn deserialize(deserializer: D) -> std::result::Result where - Self: Sized, + D: serde::Deserializer<'de>, { - let mut len_bytes = [0u8; 8]; - reader.read_exact(&mut len_bytes)?; - let len = u64::from_le_bytes(len_bytes) as usize; - let mut buffer = vec![0u8; len]; - reader.read_exact(&mut buffer[..len])?; - Ok(FoyerValue(buffer.into())) + let vec = Vec::::deserialize(deserializer)?; + Ok(CachedBuffer(Buffer::from(vec))) } +} - fn estimated_size(&self) -> usize { - 8 + self.0.len() - } +/// Cached metadata for an object in chunked mode. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct CachedChunkMetadata { + pub content_length: u64, + pub version: Option, + pub etag: Option, } /// Hybrid cache layer for OpenDAL that uses [foyer](https://github.com/foyer-rs/foyer) for caching. 
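// Editor's note: a minimal sketch, under the assumptions below, of why PATCH 10 turns
// FoyerValue into an enum. With foyer's HybridCache, an in-memory hit hands back the
// stored FoyerValue directly, so a reader only matches on the variant; the serde impls
// above are exercised only when foyer moves an entry to the disk tier. `lookup_buffer`
// and its signature are illustrative, not part of the patch; it assumes the FoyerKey,
// FoyerValue and CachedBuffer types defined above and foyer's async `get` returning
// `Result<Option<CacheEntry<..>>>` as used elsewhere in this series.
use foyer::HybridCache;
use opendal_core::Buffer;

async fn lookup_buffer(
    cache: &HybridCache<FoyerKey, FoyerValue>,
    key: &FoyerKey,
) -> Option<Buffer> {
    let entry = cache.get(key).await.ok()??;
    match entry.value() {
        // In-memory hit: clone the (cheaply cloneable) Buffer out of the variant;
        // no serialization round-trip is involved.
        FoyerValue::Buffer(CachedBuffer(buf)) => Some(buf.clone()),
        // A metadata value under a data key should not happen; treat it as a miss.
        _ => None,
    }
}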
@@ -305,8 +309,8 @@ impl LayeredAccess for FoyerAccessor { #[cfg(test)] mod tests { use foyer::{ - DirectFsDeviceOptions, Engine, Error as FoyerError, HybridCacheBuilder, LargeEngineOptions, - RecoverMode, + Code, DirectFsDeviceOptions, Engine, Error as FoyerError, HybridCacheBuilder, + LargeEngineOptions, RecoverMode, }; use opendal_core::{Operator, services::Memory}; use size::consts::MiB; diff --git a/core/layers/foyer/src/writer.rs b/core/layers/foyer/src/writer.rs index 85d04cdc29e3..d74d95ed5604 100644 --- a/core/layers/foyer/src/writer.rs +++ b/core/layers/foyer/src/writer.rs @@ -23,6 +23,7 @@ use opendal_core::Result; use opendal_core::raw::Access; use opendal_core::raw::oio; +use crate::CachedBuffer; use crate::FoyerKey; use crate::FoyerValue; use crate::Inner; @@ -78,7 +79,7 @@ impl oio::Write for Writer { path: self.path.clone(), version: metadata.version().map(|v| v.to_string()), }, - FoyerValue(buffer), + FoyerValue::Buffer(CachedBuffer(buffer)), ); } Ok(metadata) From fbbd8a2bfe35c1521023365ffae083066a633291 Mon Sep 17 00:00:00 2001 From: Li Yazhou Date: Fri, 30 Jan 2026 22:08:06 +0800 Subject: [PATCH 11/13] add more test case for the full mode --- core/layers/foyer/src/full.rs | 54 ++++--- core/layers/foyer/src/lib.rs | 266 ++++++++++++++++++++++++++++++++++ 2 files changed, 303 insertions(+), 17 deletions(-) diff --git a/core/layers/foyer/src/full.rs b/core/layers/foyer/src/full.rs index 2868d5c00826..a8e7c0cbbc04 100644 --- a/core/layers/foyer/src/full.rs +++ b/core/layers/foyer/src/full.rs @@ -112,34 +112,54 @@ impl FullReader { // If got entry from cache, slice it to the requested range. If it's larger than size_limit, // we'll simply forward the request to the underlying accessor with user's given range. - let buffer = match result { + match result { Ok(entry) => match entry.value() { - FoyerValue::Buffer(cached) => cached.0.clone(), + FoyerValue::Buffer(cached) => { + let full_buffer = cached.0.clone(); + let full_size = full_buffer.len() as u64; + let end = range_end.unwrap_or(full_size); + let sliced = full_buffer.slice(range_start as usize..end as usize); + let rp = make_rp_read(range_start, sliced.len() as u64, Some(full_size)); + return Ok((rp, sliced)); + } _ => { - // fallback to underlying accessor if cache miss - let (_, mut reader) = - self.inner.accessor.read(path, original_args).await?; - reader.read_all().await? + // fallback to underlying accessor if cached value missmatch, this should not + // happen, but if it does, let's simply regard it as a cache miss and fetch the + // data again. } }, Err(e) => match e.downcast::() { Ok(_) => { // fallback to underlying accessor if object size is too big - let (_, mut reader) = self.inner.accessor.read(path, original_args).await?; - reader.read_all().await? } Err(e) => return Err(extract_err(e)), }, }; - let end = range_end.unwrap_or(buffer.len() as u64); - let range = BytesContentRange::default() - .with_range(range_start, end - 1) - .with_size(buffer.len() as _); - let sliced = buffer.slice(range_start as usize..end as usize); - let rp = RpRead::new() - .with_size(Some(sliced.len() as _)) - .with_range(Some(range)); - Ok((rp, sliced)) + let (_, mut reader) = self.inner.accessor.read(path, original_args).await?; + let buffer = reader.read_all().await?; + let rp = make_rp_read(range_start, buffer.len() as u64, None); + Ok((rp, buffer)) + } +} + +/// Build RpRead with range information. 
+/// +/// - `range_start`: Start offset of the returned data +/// - `buffer_len`: Length of the returned buffer +/// - `total_size`: Total object size if known (for cache hit), None for fallback path +fn make_rp_read(range_start: u64, buffer_len: u64, total_size: Option) -> RpRead { + if buffer_len == 0 { + return RpRead::new().with_size(Some(0)); } + + let range_end = range_start + buffer_len - 1; + let mut range = BytesContentRange::default().with_range(range_start, range_end); + if let Some(size) = total_size { + range = range.with_size(size); + } + + RpRead::new() + .with_size(Some(buffer_len)) + .with_range(Some(range)) } diff --git a/core/layers/foyer/src/lib.rs b/core/layers/foyer/src/lib.rs index f250c880b9b9..955e85d2c4f3 100644 --- a/core/layers/foyer/src/lib.rs +++ b/core/layers/foyer/src/lib.rs @@ -446,6 +446,272 @@ mod tests { assert_eq!(read_small_range.to_vec(), small_data[0..1024]); } + #[tokio::test] + async fn test_full_fetch_size_too_large_fallback() { + let dir = tempfile::tempdir().unwrap(); + + let cache = HybridCacheBuilder::new() + .memory(1024 * 1024) + .with_shards(1) + .storage(Engine::Large(LargeEngineOptions::new())) + .with_device_options( + DirectFsDeviceOptions::new(dir.path()) + .with_capacity(16 * MiB as usize) + .with_file_size(MiB as usize), + ) + .with_recover_mode(RecoverMode::None) + .build() + .await + .unwrap(); + + // Size limit: 1KB to 5KB - files outside this range trigger FetchSizeTooLarge + let op = Operator::new(Memory::default()) + .unwrap() + .layer(FoyerLayer::new(cache.clone()).with_size_limit(1024..5 * 1024)) + .finish(); + + // Large file exceeding upper limit + let large_data: Vec = (0..10 * 1024).map(|i| (i % 256) as u8).collect(); + op.write("large.bin", large_data.clone()).await.unwrap(); + + // Full read should work via FetchSizeTooLarge fallback + let read_full = op.read("large.bin").await.unwrap(); + assert_eq!(read_full.len(), large_data.len()); + assert_eq!(read_full.to_vec(), large_data); + + // Range read on large file should also work + let read_range = op.read_with("large.bin").range(1000..5000).await.unwrap(); + assert_eq!(read_range.len(), 4000); + assert_eq!(read_range.to_vec(), large_data[1000..5000]); + + // Verify cache was NOT populated for large file + let key = FoyerKey::Full { + path: "large.bin".to_string(), + version: None, + }; + let cached = cache.get(&key).await; + assert!( + cached.is_err() || cached.unwrap().is_none(), + "large file should not be cached" + ); + } + + #[tokio::test] + async fn test_full_below_minimum_size_limit() { + let dir = tempfile::tempdir().unwrap(); + + let cache = HybridCacheBuilder::new() + .memory(1024 * 1024) + .with_shards(1) + .storage(Engine::Large(LargeEngineOptions::new())) + .with_device_options( + DirectFsDeviceOptions::new(dir.path()) + .with_capacity(16 * MiB as usize) + .with_file_size(MiB as usize), + ) + .with_recover_mode(RecoverMode::None) + .build() + .await + .unwrap(); + + // Size limit: 1KB to 10KB - files smaller than 1KB trigger FetchSizeTooLarge + let op = Operator::new(Memory::default()) + .unwrap() + .layer(FoyerLayer::new(cache.clone()).with_size_limit(1024..10 * 1024)) + .finish(); + + // Tiny file below minimum limit + let tiny_data = vec![42u8; 500]; + op.write("tiny.bin", tiny_data.clone()).await.unwrap(); + + // Should still be readable via fallback + let read_data = op.read("tiny.bin").await.unwrap(); + assert_eq!(read_data.to_vec(), tiny_data); + + // Range read should also work + let read_range = 
op.read_with("tiny.bin").range(100..300).await.unwrap(); + assert_eq!(read_range.len(), 200); + assert_eq!(read_range.to_vec(), tiny_data[100..300]); + + // Verify cache was NOT populated + let key = FoyerKey::Full { + path: "tiny.bin".to_string(), + version: None, + }; + let cached = cache.get(&key).await; + assert!( + cached.is_err() || cached.unwrap().is_none(), + "tiny file should not be cached" + ); + } + + #[tokio::test] + async fn test_full_at_size_limit_boundaries() { + let dir = tempfile::tempdir().unwrap(); + + let cache = HybridCacheBuilder::new() + .memory(1024 * 1024) + .with_shards(1) + .storage(Engine::Large(LargeEngineOptions::new())) + .with_device_options( + DirectFsDeviceOptions::new(dir.path()) + .with_capacity(16 * MiB as usize) + .with_file_size(MiB as usize), + ) + .with_recover_mode(RecoverMode::None) + .build() + .await + .unwrap(); + + // Size limit: exactly 1024..2048 (1KB to 2KB exclusive) + let op = Operator::new(Memory::default()) + .unwrap() + .layer(FoyerLayer::new(cache.clone()).with_size_limit(1024..2048)) + .finish(); + + // File at exactly lower bound (1024 bytes) - should be cached + let at_lower = vec![1u8; 1024]; + op.write("at_lower.bin", at_lower.clone()).await.unwrap(); + let read = op.read("at_lower.bin").await.unwrap(); + assert_eq!(read.to_vec(), at_lower); + + // File just below lower bound (1023 bytes) - should NOT be cached + let below_lower = vec![2u8; 1023]; + op.write("below_lower.bin", below_lower.clone()) + .await + .unwrap(); + let read = op.read("below_lower.bin").await.unwrap(); + assert_eq!(read.to_vec(), below_lower); + + // File just below upper bound (2047 bytes) - should be cached + let below_upper = vec![3u8; 2047]; + op.write("below_upper.bin", below_upper.clone()) + .await + .unwrap(); + let read = op.read("below_upper.bin").await.unwrap(); + assert_eq!(read.to_vec(), below_upper); + + // File at exactly upper bound (2048 bytes) - should NOT be cached (exclusive) + let at_upper = vec![4u8; 2048]; + op.write("at_upper.bin", at_upper.clone()).await.unwrap(); + let read = op.read("at_upper.bin").await.unwrap(); + assert_eq!(read.to_vec(), at_upper); + + // Verify caching behavior + let key_at_lower = FoyerKey::Full { + path: "at_lower.bin".to_string(), + version: None, + }; + let key_below_lower = FoyerKey::Full { + path: "below_lower.bin".to_string(), + version: None, + }; + let key_below_upper = FoyerKey::Full { + path: "below_upper.bin".to_string(), + version: None, + }; + let key_at_upper = FoyerKey::Full { + path: "at_upper.bin".to_string(), + version: None, + }; + + // at_lower and below_upper should be cached + assert!(cache.get(&key_at_lower).await.unwrap().is_some()); + assert!(cache.get(&key_below_upper).await.unwrap().is_some()); + + // below_lower and at_upper should NOT be cached + let cached = cache.get(&key_below_lower).await; + assert!(cached.is_err() || cached.unwrap().is_none()); + let cached = cache.get(&key_at_upper).await; + assert!(cached.is_err() || cached.unwrap().is_none()); + } + + #[tokio::test] + async fn test_full_range_read_from_cache() { + let dir = tempfile::tempdir().unwrap(); + + let cache = HybridCacheBuilder::new() + .memory(1024 * 1024) + .with_shards(1) + .storage(Engine::Large(LargeEngineOptions::new())) + .with_device_options( + DirectFsDeviceOptions::new(dir.path()) + .with_capacity(16 * MiB as usize) + .with_file_size(MiB as usize), + ) + .with_recover_mode(RecoverMode::None) + .build() + .await + .unwrap(); + + let op = Operator::new(Memory::default()) + .unwrap() + 
.layer(FoyerLayer::new(cache.clone())) + .finish(); + + let data: Vec = (0..5000).map(|i| (i % 256) as u8).collect(); + op.write("range_test.bin", data.clone()).await.unwrap(); + + // First read populates cache + let _ = op.read("range_test.bin").await.unwrap(); + + // Various range reads from cache + let read1 = op.read_with("range_test.bin").range(0..100).await.unwrap(); + assert_eq!(read1.to_vec(), data[0..100]); + + let read2 = op + .read_with("range_test.bin") + .range(1000..2000) + .await + .unwrap(); + assert_eq!(read2.to_vec(), data[1000..2000]); + + let read3 = op + .read_with("range_test.bin") + .range(4500..5000) + .await + .unwrap(); + assert_eq!(read3.to_vec(), data[4500..5000]); + + // Read to end (no explicit end) + let read4 = op + .read_with("range_test.bin") + .range(4000..) + .await + .unwrap(); + assert_eq!(read4.to_vec(), data[4000..]); + } + + #[tokio::test] + async fn test_full_empty_file() { + let dir = tempfile::tempdir().unwrap(); + + let cache = HybridCacheBuilder::new() + .memory(1024 * 1024) + .with_shards(1) + .storage(Engine::Large(LargeEngineOptions::new())) + .with_device_options( + DirectFsDeviceOptions::new(dir.path()) + .with_capacity(16 * MiB as usize) + .with_file_size(MiB as usize), + ) + .with_recover_mode(RecoverMode::None) + .build() + .await + .unwrap(); + + // Size limit starting from 0 to include empty files + let op = Operator::new(Memory::default()) + .unwrap() + .layer(FoyerLayer::new(cache.clone()).with_size_limit(0..1024)) + .finish(); + + op.write("empty.bin", Vec::::new()).await.unwrap(); + + let read = op.read("empty.bin").await.unwrap(); + assert_eq!(read.len(), 0); + } + #[test] fn test_error() { let e = Error::new(ErrorKind::NotFound, "not found"); From 258ce054c24b39a4a20355556c4b9f28fd25978e Mon Sep 17 00:00:00 2001 From: Li Yazhou Date: Fri, 30 Jan 2026 22:22:15 +0800 Subject: [PATCH 12/13] cleanup unused code --- core/layers/foyer/src/chunked.rs | 81 -------------------------------- core/layers/foyer/src/lib.rs | 6 +-- 2 files changed, 1 insertion(+), 86 deletions(-) diff --git a/core/layers/foyer/src/chunked.rs b/core/layers/foyer/src/chunked.rs index 0dd94adb019c..506931655deb 100644 --- a/core/layers/foyer/src/chunked.rs +++ b/core/layers/foyer/src/chunked.rs @@ -96,7 +96,6 @@ impl ChunkedReader { .with_range(range_start, range_end.saturating_sub(1)) .with_size(object_size); - // Use non-contiguous buffer to avoid memory copy. // Buffer implements Iterator, so flatten() extracts all Bytes chunks, // then collect() uses FromIterator to create a non-contiguous Buffer. let buffer: Buffer = result_bufs.into_iter().flatten().collect(); @@ -207,42 +206,6 @@ struct ChunkInfo { length_in_chunk: usize, } -/// Align a range to chunk boundaries. -/// -/// Returns the aligned (start, end) positions that cover the original range. -/// The end position is exclusive. 
-/// -/// # Arguments -/// * `range_start` - Start of the requested range -/// * `range_end` - End of the requested range (exclusive) -/// * `chunk_size` - Size of each chunk -/// * `object_size` - Total size of the object -/// -/// # Returns -/// (aligned_start, aligned_end) - Both aligned to chunk boundaries, clamped to object_size -#[allow(dead_code)] -fn align_range( - range_start: u64, - range_end: u64, - chunk_size: usize, - object_size: u64, -) -> (u64, u64) { - let chunk_size = chunk_size as u64; - - // Align start down to chunk boundary - let aligned_start = (range_start / chunk_size) * chunk_size; - - // Align end up to chunk boundary, but don't exceed object size - let aligned_end = if range_end == 0 { - 0 - } else { - let chunks_needed = range_end.div_ceil(chunk_size); - (chunks_needed * chunk_size).min(object_size) - }; - - (aligned_start, aligned_end) -} - /// Split a range into individual chunk read operations. /// /// # Arguments @@ -303,50 +266,6 @@ fn split_range_into_chunks( mod tests { use super::*; - #[test] - fn test_align_range_basic() { - // Range within a single chunk - let (start, end) = align_range(100, 200, 1024, 4096); - assert_eq!(start, 0); - assert_eq!(end, 1024); - - // Range spanning multiple chunks - let (start, end) = align_range(1000, 3000, 1024, 4096); - assert_eq!(start, 0); - assert_eq!(end, 3072); - } - - #[test] - fn test_align_range_at_boundaries() { - // Start at chunk boundary - let (start, end) = align_range(1024, 2000, 1024, 4096); - assert_eq!(start, 1024); - assert_eq!(end, 2048); - - // End at chunk boundary - let (start, end) = align_range(100, 2048, 1024, 4096); - assert_eq!(start, 0); - assert_eq!(end, 2048); - - // Both at boundaries - let (start, end) = align_range(1024, 2048, 1024, 4096); - assert_eq!(start, 1024); - assert_eq!(end, 2048); - } - - #[test] - fn test_align_range_clamp_to_object_size() { - // Range extends beyond object - let (start, end) = align_range(3000, 5000, 1024, 4096); - assert_eq!(start, 2048); - assert_eq!(end, 4096); - - // Object smaller than chunk - let (start, end) = align_range(0, 500, 1024, 500); - assert_eq!(start, 0); - assert_eq!(end, 500); - } - #[test] fn test_split_range_single_chunk() { // Read within a single chunk diff --git a/core/layers/foyer/src/lib.rs b/core/layers/foyer/src/lib.rs index 955e85d2c4f3..8635a5a26da8 100644 --- a/core/layers/foyer/src/lib.rs +++ b/core/layers/foyer/src/lib.rs @@ -674,11 +674,7 @@ mod tests { assert_eq!(read3.to_vec(), data[4500..5000]); // Read to end (no explicit end) - let read4 = op - .read_with("range_test.bin") - .range(4000..) - .await - .unwrap(); + let read4 = op.read_with("range_test.bin").range(4000..).await.unwrap(); assert_eq!(read4.to_vec(), data[4000..]); } From fdeaec99c8a2c95e59293393f5b7c9e193a31254 Mon Sep 17 00:00:00 2001 From: Li Yazhou Date: Fri, 30 Jan 2026 22:22:53 +0800 Subject: [PATCH 13/13] fix typo --- core/layers/foyer/src/full.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/layers/foyer/src/full.rs b/core/layers/foyer/src/full.rs index a8e7c0cbbc04..1a17e60dd442 100644 --- a/core/layers/foyer/src/full.rs +++ b/core/layers/foyer/src/full.rs @@ -123,7 +123,7 @@ impl FullReader { return Ok((rp, sliced)); } _ => { - // fallback to underlying accessor if cached value missmatch, this should not + // fallback to underlying accessor if cached value mismatch, this should not // happen, but if it does, let's simply regard it as a cache miss and fetch the // data again. }
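// Editor's note: a small, self-contained sketch of the caching gate that
// test_full_at_size_limit_boundaries above exercises. The layer's size limit is a plain
// std::ops::Range, so the check is half-open: an object exactly at the lower bound is
// cached, one exactly at the upper bound is not. `should_cache` is an illustrative
// helper, not a function from this series; the real gate lives in the writer/reader
// paths via `size_limit.contains(..)`.
fn should_cache(size_limit: &std::ops::Range<usize>, object_len: usize) -> bool {
    // Range::contains is inclusive of the start and exclusive of the end.
    size_limit.contains(&object_len)
}

#[test]
fn size_limit_gate_is_half_open() {
    let limit = 1024..2048;
    assert!(should_cache(&limit, 1024)); // at lower bound: cached
    assert!(should_cache(&limit, 2047)); // just below upper bound: cached
    assert!(!should_cache(&limit, 1023)); // below lower bound: falls back, not cached
    assert!(!should_cache(&limit, 2048)); // at upper bound (exclusive): not cached
}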