diff --git a/core/layers/foyer/src/chunked.rs b/core/layers/foyer/src/chunked.rs
new file mode 100644
index 000000000000..506931655deb
--- /dev/null
+++ b/core/layers/foyer/src/chunked.rs
@@ -0,0 +1,414 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use opendal_core::Buffer;
+use opendal_core::Result;
+use opendal_core::raw::Access;
+use opendal_core::raw::BytesContentRange;
+use opendal_core::raw::BytesRange;
+use opendal_core::raw::OpRead;
+use opendal_core::raw::OpStat;
+use opendal_core::raw::RpRead;
+use opendal_core::raw::oio::Read;
+
+use crate::CachedBuffer;
+use crate::CachedChunkMetadata;
+use crate::FoyerKey;
+use crate::FoyerValue;
+use crate::Inner;
+
+/// ChunkedReader reads objects in fixed-size chunks, caching each chunk independently.
+///
+/// This enables efficient partial reads of large objects by only fetching and caching
+/// the chunks that cover the requested range.
+pub struct ChunkedReader<A: Access> {
+    inner: Arc<Inner<A>>,
+    chunk_size: usize,
+}
+
+impl<A: Access> ChunkedReader<A> {
+    pub fn new(inner: Arc<Inner<A>>, chunk_size: usize) -> Self {
+        Self { inner, chunk_size }
+    }
+
+    /// Read data from cache or underlying storage using chunked caching.
+    ///
+    /// The algorithm:
+    /// 1. Fetch object metadata (cached) to determine content_length
+    /// 2. Compute which chunks cover the requested range
+    /// 3. Fetch each chunk (from cache or backend)
+    /// 4. Assemble and slice the result to match the requested range
+    pub async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Buffer)> {
+        let version = args.version().map(|v| v.to_string());
+
+        // Extract range info from args
+        let range_start = args.range().offset();
+        let range_size = args.range().size();
+
+        // Step 1: Fetch metadata
+        let metadata = self.fetch_metadata(path, version.clone()).await?;
+        let object_size = metadata.content_length;
+
+        // Step 2: Compute chunks needed
+        let chunks =
+            split_range_into_chunks(range_start, range_size, self.chunk_size, object_size);
+
+        if chunks.is_empty() {
+            let rp = RpRead::new().with_size(Some(0));
+            return Ok((rp, Buffer::new()));
+        }
+
+        // Step 3: Fetch all needed chunks and assemble
+        let mut result_bufs = Vec::with_capacity(chunks.len());
+        let mut total_len = 0u64;
+
+        for chunk_info in &chunks {
+            let chunk_data = self
+                .fetch_chunk(path, version.clone(), chunk_info.chunk_index, object_size)
+                .await?;
+
+            // Slice the chunk to get only the needed portion
+            let end =
+                (chunk_info.offset_in_chunk + chunk_info.length_in_chunk).min(chunk_data.len());
+            let sliced = chunk_data.slice(chunk_info.offset_in_chunk..end);
+            total_len += sliced.len() as u64;
+            result_bufs.push(sliced);
+        }
+
+        // Step 4: Assemble result
+        let range_end = range_start + total_len;
+        let range = BytesContentRange::default()
+            .with_range(range_start, range_end.saturating_sub(1))
+            .with_size(object_size);
+
+        // Buffer implements Iterator, so flatten() extracts all Bytes chunks,
+        // then collect() uses FromIterator to create a non-contiguous Buffer.
+        let buffer: Buffer = result_bufs.into_iter().flatten().collect();
+
+        let rp = RpRead::new()
+            .with_size(Some(buffer.len() as u64))
+            .with_range(Some(range));
+
+        Ok((rp, buffer))
+    }
+
+    /// Fetch object metadata from cache or backend.
+    async fn fetch_metadata(
+        &self,
+        path: &str,
+        version: Option<String>,
+    ) -> Result<CachedChunkMetadata> {
+        let key = FoyerKey::ChunkMetadata {
+            path: path.to_string(),
+            chunk_size: self.chunk_size,
+            version: version.clone(),
+        };
+
+        // Try cache first
+        if let Ok(Some(entry)) = self.inner.cache.get(&key).await {
+            if let FoyerValue::ChunkMetadata(cached) = entry.value() {
+                return Ok(cached.clone());
+            }
+            // Cache entry is the wrong variant, fall through to re-fetch
+        }
+
+        // Cache miss: fetch from backend
+        let metadata = self
+            .inner
+            .accessor
+            .stat(path, OpStat::default())
+            .await?
+            .into_metadata();
+
+        let cached = CachedChunkMetadata {
+            content_length: metadata.content_length(),
+            version: metadata.version().map(|v| v.to_string()),
+            etag: metadata.etag().map(|v| v.to_string()),
+        };
+
+        // Insert into cache
+        self.inner
+            .cache
+            .insert(key, FoyerValue::ChunkMetadata(cached.clone()));
+
+        Ok(cached)
+    }
+
+    /// Fetch a single chunk from cache or backend.
+    ///
+    /// Uses a simple get/insert pattern: check the cache first; on a miss, read from
+    /// the backend and insert the chunk into the cache.
+    async fn fetch_chunk(
+        &self,
+        path: &str,
+        version: Option<String>,
+        chunk_index: u64,
+        object_size: u64,
+    ) -> Result<Buffer> {
+        let key = FoyerKey::Chunk {
+            path: path.to_string(),
+            chunk_size: self.chunk_size,
+            chunk_index,
+            version: version.clone(),
+        };
+
+        // Try cache first
+        if let Ok(Some(entry)) = self.inner.cache.get(&key).await {
+            if let FoyerValue::Buffer(cached) = entry.value() {
+                return Ok(cached.0.clone());
+            }
+        }
+
+        // Cache miss: fetch from backend
+        let chunk_start = chunk_index * self.chunk_size as u64;
+        let chunk_end = ((chunk_index + 1) * self.chunk_size as u64).min(object_size);
+        let range = BytesRange::new(chunk_start, Some(chunk_end - chunk_start));
+
+        let (_, mut reader) = self
+            .inner
+            .accessor
+            .read(path, OpRead::default().with_range(range))
+            .await?;
+        let buffer = reader.read_all().await?;
+
+        // Insert into cache
+        self.inner
+            .cache
+            .insert(key, FoyerValue::Buffer(CachedBuffer(buffer.clone())));
+
+        Ok(buffer)
+    }
+}
+
+/// Information about a chunk needed to satisfy a read request.
+#[derive(Debug, Clone, PartialEq, Eq)]
+struct ChunkInfo {
+    /// The index of this chunk (0-based)
+    chunk_index: u64,
+    /// Offset within this chunk where the requested data starts
+    offset_in_chunk: usize,
+    /// Length of data to read from this chunk
+    length_in_chunk: usize,
+}
+
+/// Split a range into individual chunk read operations.
+///
+/// # Arguments
+/// * `range_start` - Start of the requested range (inclusive)
+/// * `range_size` - Size of the requested range, or None for "read to end"
+/// * `chunk_size` - Size of each chunk
+/// * `object_size` - Total size of the object
+///
+/// # Returns
+/// A vector of `ChunkInfo` describing each chunk that needs to be fetched
+/// and how to slice the data from each chunk.
+fn split_range_into_chunks(
+    range_start: u64,
+    range_size: Option<u64>,
+    chunk_size: usize,
+    object_size: u64,
+) -> Vec<ChunkInfo> {
+    if object_size == 0 {
+        return vec![];
+    }
+
+    let chunk_size_u64 = chunk_size as u64;
+    let range_end = match range_size {
+        Some(size) => (range_start + size).min(object_size),
+        None => object_size,
+    };
+
+    if range_start >= range_end {
+        return vec![];
+    }
+
+    let first_chunk_index = range_start / chunk_size_u64;
+    let last_chunk_index = (range_end - 1) / chunk_size_u64;
+
+    let mut chunks = Vec::with_capacity((last_chunk_index - first_chunk_index + 1) as usize);
+
+    for chunk_index in first_chunk_index..=last_chunk_index {
+        let chunk_start = chunk_index * chunk_size_u64;
+        let chunk_end = ((chunk_index + 1) * chunk_size_u64).min(object_size);
+
+        // Calculate the intersection of [range_start, range_end) with [chunk_start, chunk_end)
+        let read_start = range_start.max(chunk_start);
+        let read_end = range_end.min(chunk_end);
+
+        if read_start < read_end {
+            chunks.push(ChunkInfo {
+                chunk_index,
+                offset_in_chunk: (read_start - chunk_start) as usize,
+                length_in_chunk: (read_end - read_start) as usize,
+            });
+        }
+    }
+
+    chunks
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_split_range_single_chunk() {
+        // Read within a single chunk
+        let chunks = split_range_into_chunks(100, Some(200), 1024, 4096);
+        assert_eq!(chunks.len(), 1);
+        assert_eq!(chunks[0].chunk_index, 0);
+        assert_eq!(chunks[0].offset_in_chunk, 100);
+        assert_eq!(chunks[0].length_in_chunk, 200);
+    }
+
+    #[test]
+    fn test_split_range_multiple_chunks() {
+        // Read spanning 3 chunks
+        let chunks = split_range_into_chunks(500, Some(2000), 1024, 4096);
+        assert_eq!(chunks.len(), 3);
+
+        // First chunk: 500-1024 (524 bytes)
+        assert_eq!(chunks[0].chunk_index, 0);
+        assert_eq!(chunks[0].offset_in_chunk, 500);
+        assert_eq!(chunks[0].length_in_chunk, 524);
+
+        // Second chunk: 1024-2048 (1024 bytes)
+        assert_eq!(chunks[1].chunk_index, 1);
+        assert_eq!(chunks[1].offset_in_chunk, 0);
+        assert_eq!(chunks[1].length_in_chunk, 1024);
+
+        // Third chunk: 2048-2500 (452 bytes)
+        assert_eq!(chunks[2].chunk_index, 2);
+        assert_eq!(chunks[2].offset_in_chunk, 0);
+        assert_eq!(chunks[2].length_in_chunk, 452);
+    }
+
+    #[test]
+    fn test_split_range_read_to_end() {
+        // Read from offset to end of object
+        let chunks = split_range_into_chunks(1500, None, 1024, 3000);
+        assert_eq!(chunks.len(), 2);
+
+        // First chunk: 1500-2048 (548 bytes)
+        assert_eq!(chunks[0].chunk_index, 1);
+        assert_eq!(chunks[0].offset_in_chunk, 476);
+        assert_eq!(chunks[0].length_in_chunk, 548);
+
+        // Second chunk: 2048-3000 (952 bytes)
+        assert_eq!(chunks[1].chunk_index, 2);
+        assert_eq!(chunks[1].offset_in_chunk, 0);
+        assert_eq!(chunks[1].length_in_chunk, 952);
+    }
+
+    #[test]
+    fn test_split_range_entire_object() {
+        // Read entire object
+        let chunks = split_range_into_chunks(0, None, 1024, 2500);
+        assert_eq!(chunks.len(), 3);
+
+        assert_eq!(chunks[0].chunk_index, 0);
+        assert_eq!(chunks[0].offset_in_chunk, 0);
+        assert_eq!(chunks[0].length_in_chunk, 1024);
+
+        assert_eq!(chunks[1].chunk_index, 1);
+        assert_eq!(chunks[1].offset_in_chunk, 0);
+        assert_eq!(chunks[1].length_in_chunk, 1024);
+
+        assert_eq!(chunks[2].chunk_index, 2);
+        assert_eq!(chunks[2].offset_in_chunk, 0);
+        assert_eq!(chunks[2].length_in_chunk, 452);
+    }
+
+    #[test]
+    fn test_split_range_empty_object() {
+        let chunks = split_range_into_chunks(0, None, 1024, 0);
+        assert!(chunks.is_empty());
+    }
+
+    #[test]
+    fn test_split_range_empty_range() {
+        //
Zero-length range + let chunks = split_range_into_chunks(100, Some(0), 1024, 4096); + assert!(chunks.is_empty()); + + // Start equals end + let chunks = split_range_into_chunks(1024, Some(0), 1024, 4096); + assert!(chunks.is_empty()); + } + + #[test] + fn test_split_range_beyond_object() { + // Range starts beyond object + let chunks = split_range_into_chunks(5000, Some(100), 1024, 4096); + assert!(chunks.is_empty()); + + // Range partially beyond object + let chunks = split_range_into_chunks(4000, Some(500), 1024, 4096); + assert_eq!(chunks.len(), 1); + assert_eq!(chunks[0].chunk_index, 3); + assert_eq!(chunks[0].offset_in_chunk, 928); + assert_eq!(chunks[0].length_in_chunk, 96); + } + + #[test] + fn test_split_range_exact_chunk_boundaries() { + // Read exactly one full chunk + let chunks = split_range_into_chunks(1024, Some(1024), 1024, 4096); + assert_eq!(chunks.len(), 1); + assert_eq!(chunks[0].chunk_index, 1); + assert_eq!(chunks[0].offset_in_chunk, 0); + assert_eq!(chunks[0].length_in_chunk, 1024); + + // Read exactly two full chunks + let chunks = split_range_into_chunks(0, Some(2048), 1024, 4096); + assert_eq!(chunks.len(), 2); + assert_eq!(chunks[0].length_in_chunk, 1024); + assert_eq!(chunks[1].length_in_chunk, 1024); + } + + #[test] + fn test_split_range_object_not_chunk_aligned() { + // Object size is not a multiple of chunk size + let chunks = split_range_into_chunks(0, None, 1024, 1500); + assert_eq!(chunks.len(), 2); + + assert_eq!(chunks[0].chunk_index, 0); + assert_eq!(chunks[0].length_in_chunk, 1024); + + assert_eq!(chunks[1].chunk_index, 1); + assert_eq!(chunks[1].offset_in_chunk, 0); + assert_eq!(chunks[1].length_in_chunk, 476); // Last chunk is smaller + } + + #[test] + fn test_split_range_last_partial_chunk() { + // Read that ends in the middle of the last partial chunk + let chunks = split_range_into_chunks(1000, Some(400), 1024, 1500); + assert_eq!(chunks.len(), 2); + + // First chunk: 1000-1024 + assert_eq!(chunks[0].chunk_index, 0); + assert_eq!(chunks[0].offset_in_chunk, 1000); + assert_eq!(chunks[0].length_in_chunk, 24); + + // Second chunk: 1024-1400 + assert_eq!(chunks[1].chunk_index, 1); + assert_eq!(chunks[1].offset_in_chunk, 0); + assert_eq!(chunks[1].length_in_chunk, 376); + } +} diff --git a/core/layers/foyer/src/deleter.rs b/core/layers/foyer/src/deleter.rs index 1de6b662ea3a..de4a6ffea060 100644 --- a/core/layers/foyer/src/deleter.rs +++ b/core/layers/foyer/src/deleter.rs @@ -29,14 +29,17 @@ pub struct Deleter { pub(crate) deleter: A::Deleter, pub(crate) keys: Vec, pub(crate) inner: Arc>, + skip_cache: bool, } impl Deleter { pub(crate) fn new(deleter: A::Deleter, inner: Arc>) -> Self { + let skip_cache = inner.chunk_size.is_some(); Self { deleter, keys: vec![], inner, + skip_cache, } } } @@ -44,16 +47,20 @@ impl Deleter { impl oio::Delete for Deleter { async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> { self.deleter.delete(path, args.clone()).await?; - self.keys.push(FoyerKey { - path: path.to_string(), - version: args.version().map(|v| v.to_string()), - }); + if !self.skip_cache { + self.keys.push(FoyerKey::Full { + path: path.to_string(), + version: args.version().map(|v| v.to_string()), + }); + } Ok(()) } async fn close(&mut self) -> Result<()> { - for key in &self.keys { - self.inner.cache.remove(key); + if !self.skip_cache { + for key in &self.keys { + self.inner.cache.remove(key); + } } self.deleter.close().await } diff --git a/core/layers/foyer/src/full.rs b/core/layers/foyer/src/full.rs index 95772d01214d..1a17e60dd442 100644 
--- a/core/layers/foyer/src/full.rs +++ b/core/layers/foyer/src/full.rs @@ -29,6 +29,7 @@ use opendal_core::raw::OpStat; use opendal_core::raw::RpRead; use opendal_core::raw::oio::Read; +use crate::CachedBuffer; use crate::FoyerKey; use crate::FoyerValue; use crate::Inner; @@ -70,7 +71,7 @@ impl FullReader { .inner .cache .fetch( - FoyerKey { + FoyerKey::Full { path: path_str.clone(), version: version.clone(), }, @@ -103,7 +104,7 @@ impl FullReader { .map_err(FoyerError::other)?; let buffer = reader.read_all().await.map_err(FoyerError::other)?; - Ok(FoyerValue(buffer)) + Ok(FoyerValue::Buffer(CachedBuffer(buffer))) } }, ) @@ -112,25 +113,53 @@ impl FullReader { // If got entry from cache, slice it to the requested range. If it's larger than size_limit, // we'll simply forward the request to the underlying accessor with user's given range. match result { - Ok(entry) => { - let end = range_end.unwrap_or(entry.len() as u64); - let range = BytesContentRange::default() - .with_range(range_start, end - 1) - .with_size(entry.len() as _); - let buffer = entry.slice(range_start as usize..end as usize); - let rp = RpRead::new() - .with_size(Some(buffer.len() as _)) - .with_range(Some(range)); - Ok((rp, buffer)) - } + Ok(entry) => match entry.value() { + FoyerValue::Buffer(cached) => { + let full_buffer = cached.0.clone(); + let full_size = full_buffer.len() as u64; + let end = range_end.unwrap_or(full_size); + let sliced = full_buffer.slice(range_start as usize..end as usize); + let rp = make_rp_read(range_start, sliced.len() as u64, Some(full_size)); + return Ok((rp, sliced)); + } + _ => { + // fallback to underlying accessor if cached value mismatch, this should not + // happen, but if it does, let's simply regard it as a cache miss and fetch the + // data again. + } + }, Err(e) => match e.downcast::() { Ok(_) => { - let (rp, mut reader) = self.inner.accessor.read(path, original_args).await?; - let buffer = reader.read_all().await?; - Ok((rp, buffer)) + // fallback to underlying accessor if object size is too big } - Err(e) => Err(extract_err(e)), + Err(e) => return Err(extract_err(e)), }, - } + }; + + let (_, mut reader) = self.inner.accessor.read(path, original_args).await?; + let buffer = reader.read_all().await?; + let rp = make_rp_read(range_start, buffer.len() as u64, None); + Ok((rp, buffer)) } } + +/// Build RpRead with range information. +/// +/// - `range_start`: Start offset of the returned data +/// - `buffer_len`: Length of the returned buffer +/// - `total_size`: Total object size if known (for cache hit), None for fallback path +fn make_rp_read(range_start: u64, buffer_len: u64, total_size: Option) -> RpRead { + if buffer_len == 0 { + return RpRead::new().with_size(Some(0)); + } + + let range_end = range_start + buffer_len - 1; + let mut range = BytesContentRange::default().with_range(range_start, range_end); + if let Some(size) = total_size { + range = range.with_size(size); + } + + RpRead::new() + .with_size(Some(buffer_len)) + .with_range(Some(range)) +} diff --git a/core/layers/foyer/src/lib.rs b/core/layers/foyer/src/lib.rs index 06b9c389eb07..8635a5a26da8 100644 --- a/core/layers/foyer/src/lib.rs +++ b/core/layers/foyer/src/lib.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. 
+mod chunked;
 mod deleter;
 mod error;
 mod full;
@@ -22,11 +23,11 @@ mod writer;
 
 use std::{
     future::Future,
-    ops::{Bound, Deref, Range, RangeBounds},
+    ops::{Bound, Range, RangeBounds},
    sync::Arc,
 };
 
-use foyer::{Code, CodeError, HybridCache};
+use foyer::HybridCache;
 use opendal_core::raw::*;
 use opendal_core::*;
@@ -42,47 +43,73 @@ pub use writer::Writer;
 /// - If a version is given, the object is cached under that versioned key.
 /// - If version is not supplied, the object is cached exactly as returned by the backend,
 ///   We do NOT interpret `None` as "latest" and we do not promote it to any other version.
+///
+/// # Variants
+///
+/// - `Full`: Caches the entire object (used in non-chunked mode)
+/// - `ChunkMetadata`: Caches object metadata for chunked mode (content_length, version, etag)
+/// - `Chunk`: Caches a specific chunk of the object
 #[derive(Debug, Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
-pub struct FoyerKey {
-    pub path: String,
-    pub version: Option<String>,
+pub enum FoyerKey {
+    /// Cache key for the entire object (non-chunked mode)
+    Full {
+        path: String,
+        version: Option<String>,
+    },
+    /// Cache key for object metadata (chunked mode)
+    ChunkMetadata {
+        path: String,
+        chunk_size: usize,
+        version: Option<String>,
+    },
+    /// Cache key for a specific chunk of the object
+    Chunk {
+        path: String,
+        chunk_size: usize,
+        chunk_index: u64,
+        version: Option<String>,
+    },
 }
 
-/// [`FoyerValue`] is a wrapper around `Buffer` that implements the `Code` trait.
-#[derive(Debug)]
-pub struct FoyerValue(pub Buffer);
+/// [`FoyerValue`] is a value type for the foyer cache that can store either
+/// a raw buffer or cached metadata.
+#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
+pub enum FoyerValue {
+    /// Raw buffer data (used for full objects and chunks)
+    Buffer(CachedBuffer),
+    /// Cached metadata for chunked mode
+    ChunkMetadata(CachedChunkMetadata),
+}
 
-impl Deref for FoyerValue {
-    type Target = Buffer;
+/// A serializable wrapper around [`Buffer`].
+#[derive(Debug, Clone)]
+pub struct CachedBuffer(pub Buffer);
 
-    fn deref(&self) -> &Self::Target {
-        &self.0
+impl serde::Serialize for CachedBuffer {
+    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
+    where
+        S: serde::Serializer,
+    {
+        serde::Serialize::serialize(&self.0.to_vec(), serializer)
    }
 }
 
-impl Code for FoyerValue {
-    fn encode(&self, writer: &mut impl std::io::Write) -> std::result::Result<(), CodeError> {
-        let len = self.0.len() as u64;
-        writer.write_all(&len.to_le_bytes())?;
-        std::io::copy(&mut self.0.clone(), writer)?;
-        Ok(())
-    }
-
-    fn decode(reader: &mut impl std::io::Read) -> std::result::Result<Self, CodeError>
+impl<'de> serde::Deserialize<'de> for CachedBuffer {
+    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
     where
-        Self: Sized,
+        D: serde::Deserializer<'de>,
    {
-        let mut len_bytes = [0u8; 8];
-        reader.read_exact(&mut len_bytes)?;
-        let len = u64::from_le_bytes(len_bytes) as usize;
-        let mut buffer = vec![0u8; len];
-        reader.read_exact(&mut buffer[..len])?;
-        Ok(FoyerValue(buffer.into()))
+        let vec = Vec::<u8>::deserialize(deserializer)?;
+        Ok(CachedBuffer(Buffer::from(vec)))
    }
+}
 
-    fn estimated_size(&self) -> usize {
-        8 + self.0.len()
-    }
+/// Cached metadata for an object in chunked mode.
+#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
+pub struct CachedChunkMetadata {
+    pub content_length: u64,
+    pub version: Option<String>,
+    pub etag: Option<String>,
 }
 
 /// Hybrid cache layer for OpenDAL that uses [foyer](https://github.com/foyer-rs/foyer) for caching.
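To make the new keying scheme concrete, here is a small self-contained sketch (not part of the patch) of how one ranged read maps onto these variants: a single `ChunkMetadata` lookup plus one `Chunk` lookup per chunk that the range touches. The `CacheKey` enum and `keys_for_read` helper below are illustrative stand-ins that mirror `FoyerKey` and the index arithmetic used by `ChunkedReader`, not the real types.

```rust
/// Illustrative stand-in mirroring the `FoyerKey` variants above (not the real type).
#[derive(Debug, PartialEq, Eq)]
enum CacheKey {
    ChunkMetadata { path: String, chunk_size: usize, version: Option<String> },
    Chunk { path: String, chunk_size: usize, chunk_index: u64, version: Option<String> },
}

/// Keys a chunked read of `[start, start + len)` on `path` would consult:
/// one metadata key, then one key per chunk overlapping the range.
fn keys_for_read(path: &str, start: u64, len: u64, chunk_size: usize) -> Vec<CacheKey> {
    assert!(len > 0, "sketch assumes a non-empty range");
    let cs = chunk_size as u64;
    let mut keys = vec![CacheKey::ChunkMetadata {
        path: path.to_string(),
        chunk_size,
        version: None,
    }];
    // Same index arithmetic as `split_range_into_chunks`: first and last chunk touched.
    let (first, last) = (start / cs, (start + len - 1) / cs);
    for chunk_index in first..=last {
        keys.push(CacheKey::Chunk {
            path: path.to_string(),
            chunk_size,
            chunk_index,
            version: None,
        });
    }
    keys
}

fn main() {
    // A read of bytes 500..2500 with 1 KiB chunks touches chunks 0, 1, and 2.
    for key in keys_for_read("large.bin", 500, 2000, 1024) {
        println!("{key:?}");
    }
}
```

Because `chunk_size` is embedded in both chunked variants, readers configured with different chunk sizes address disjoint key spaces instead of reinterpreting each other's chunks.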
@@ -93,6 +120,18 @@ impl Code for FoyerValue { /// - `delete`: [`FoyerLayer`] will remove the data from the foyer hybrid cache regardless of whether the service's delete operation is successful. /// - Other operations: [`FoyerLayer`] will not cache the results of other operations, such as `list`, `copy`, `rename`, etc. They will be passed through to the underlying accessor without caching. /// +/// # Caching Modes +/// +/// ## Full Mode (default) +/// Caches the entire object as a single cache entry. Best for small to medium-sized objects. +/// +/// ## Chunked Mode +/// When `chunk_size` is set via [`with_chunk_size`](FoyerLayer::with_chunk_size), objects are cached +/// in fixed-size chunks. This improves cache efficiency for large files with partial reads: +/// - Only the chunks covering the requested range are fetched and cached +/// - Subsequent reads of overlapping ranges reuse cached chunks +/// - Each chunk is cached independently, allowing fine-grained cache eviction +/// /// # Examples /// /// ```no_run @@ -108,8 +147,14 @@ impl Code for FoyerValue { /// .build() /// .await?; /// +/// // Full mode (default) /// let op = Operator::new(Memory::default())? -/// .layer(FoyerLayer::new(cache)) +/// .layer(FoyerLayer::new(cache.clone())) +/// .finish(); +/// +/// // Chunked mode with 4MB chunks +/// let op_chunked = Operator::new(Memory::default())? +/// .layer(FoyerLayer::new(cache).with_chunk_size(4 * 1024 * 1024)) /// .finish(); /// # Ok(()) /// # } @@ -122,6 +167,7 @@ impl Code for FoyerValue { pub struct FoyerLayer { cache: HybridCache, size_limit: Range, + chunk_size: Option, } impl FoyerLayer { @@ -130,6 +176,7 @@ impl FoyerLayer { FoyerLayer { cache, size_limit: 0..usize::MAX, + chunk_size: None, } } @@ -150,6 +197,25 @@ impl FoyerLayer { self.size_limit = start..end; self } + + /// Enables chunked caching mode with the specified chunk size. + /// + /// When enabled, objects are cached in fixed-size chunks rather than as whole objects. + /// This is beneficial for: + /// - Large files where only portions are typically read + /// - Random access patterns within files + /// - Reducing memory pressure by caching only needed chunks + /// + /// # Arguments + /// * `chunk_size` - The size of each chunk in bytes. Recommended: 1MB - 16MB. 
+ /// + /// # Note + /// - The `size_limit` setting is ignored in chunked mode + /// - Writer and Deleter still operate in full mode for consistency + pub fn with_chunk_size(mut self, chunk_size: usize) -> Self { + self.chunk_size = Some(chunk_size); + self + } } impl Layer for FoyerLayer { @@ -162,6 +228,7 @@ impl Layer for FoyerLayer { accessor, cache, size_limit: self.size_limit.clone(), + chunk_size: self.chunk_size, }), } } @@ -172,6 +239,7 @@ pub(crate) struct Inner { pub(crate) accessor: A, pub(crate) cache: HybridCache, pub(crate) size_limit: Range, + pub(crate) chunk_size: Option, } #[derive(Debug)] @@ -195,9 +263,18 @@ impl LayeredAccess for FoyerAccessor { } async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { - full::FullReader::new(self.inner.clone(), self.inner.size_limit.clone()) - .read(path, args) - .await + match self.inner.chunk_size { + None => { + full::FullReader::new(self.inner.clone(), self.inner.size_limit.clone()) + .read(path, args) + .await + } + Some(chunk_size) => { + chunked::ChunkedReader::new(self.inner.clone(), chunk_size) + .read(path, args) + .await + } + } } fn write( @@ -232,8 +309,8 @@ impl LayeredAccess for FoyerAccessor { #[cfg(test)] mod tests { use foyer::{ - DirectFsDeviceOptions, Engine, Error as FoyerError, HybridCacheBuilder, LargeEngineOptions, - RecoverMode, + Code, DirectFsDeviceOptions, Engine, Error as FoyerError, HybridCacheBuilder, + LargeEngineOptions, RecoverMode, }; use opendal_core::{Operator, services::Memory}; use size::consts::MiB; @@ -369,6 +446,268 @@ mod tests { assert_eq!(read_small_range.to_vec(), small_data[0..1024]); } + #[tokio::test] + async fn test_full_fetch_size_too_large_fallback() { + let dir = tempfile::tempdir().unwrap(); + + let cache = HybridCacheBuilder::new() + .memory(1024 * 1024) + .with_shards(1) + .storage(Engine::Large(LargeEngineOptions::new())) + .with_device_options( + DirectFsDeviceOptions::new(dir.path()) + .with_capacity(16 * MiB as usize) + .with_file_size(MiB as usize), + ) + .with_recover_mode(RecoverMode::None) + .build() + .await + .unwrap(); + + // Size limit: 1KB to 5KB - files outside this range trigger FetchSizeTooLarge + let op = Operator::new(Memory::default()) + .unwrap() + .layer(FoyerLayer::new(cache.clone()).with_size_limit(1024..5 * 1024)) + .finish(); + + // Large file exceeding upper limit + let large_data: Vec = (0..10 * 1024).map(|i| (i % 256) as u8).collect(); + op.write("large.bin", large_data.clone()).await.unwrap(); + + // Full read should work via FetchSizeTooLarge fallback + let read_full = op.read("large.bin").await.unwrap(); + assert_eq!(read_full.len(), large_data.len()); + assert_eq!(read_full.to_vec(), large_data); + + // Range read on large file should also work + let read_range = op.read_with("large.bin").range(1000..5000).await.unwrap(); + assert_eq!(read_range.len(), 4000); + assert_eq!(read_range.to_vec(), large_data[1000..5000]); + + // Verify cache was NOT populated for large file + let key = FoyerKey::Full { + path: "large.bin".to_string(), + version: None, + }; + let cached = cache.get(&key).await; + assert!( + cached.is_err() || cached.unwrap().is_none(), + "large file should not be cached" + ); + } + + #[tokio::test] + async fn test_full_below_minimum_size_limit() { + let dir = tempfile::tempdir().unwrap(); + + let cache = HybridCacheBuilder::new() + .memory(1024 * 1024) + .with_shards(1) + .storage(Engine::Large(LargeEngineOptions::new())) + .with_device_options( + DirectFsDeviceOptions::new(dir.path()) + .with_capacity(16 * 
MiB as usize) + .with_file_size(MiB as usize), + ) + .with_recover_mode(RecoverMode::None) + .build() + .await + .unwrap(); + + // Size limit: 1KB to 10KB - files smaller than 1KB trigger FetchSizeTooLarge + let op = Operator::new(Memory::default()) + .unwrap() + .layer(FoyerLayer::new(cache.clone()).with_size_limit(1024..10 * 1024)) + .finish(); + + // Tiny file below minimum limit + let tiny_data = vec![42u8; 500]; + op.write("tiny.bin", tiny_data.clone()).await.unwrap(); + + // Should still be readable via fallback + let read_data = op.read("tiny.bin").await.unwrap(); + assert_eq!(read_data.to_vec(), tiny_data); + + // Range read should also work + let read_range = op.read_with("tiny.bin").range(100..300).await.unwrap(); + assert_eq!(read_range.len(), 200); + assert_eq!(read_range.to_vec(), tiny_data[100..300]); + + // Verify cache was NOT populated + let key = FoyerKey::Full { + path: "tiny.bin".to_string(), + version: None, + }; + let cached = cache.get(&key).await; + assert!( + cached.is_err() || cached.unwrap().is_none(), + "tiny file should not be cached" + ); + } + + #[tokio::test] + async fn test_full_at_size_limit_boundaries() { + let dir = tempfile::tempdir().unwrap(); + + let cache = HybridCacheBuilder::new() + .memory(1024 * 1024) + .with_shards(1) + .storage(Engine::Large(LargeEngineOptions::new())) + .with_device_options( + DirectFsDeviceOptions::new(dir.path()) + .with_capacity(16 * MiB as usize) + .with_file_size(MiB as usize), + ) + .with_recover_mode(RecoverMode::None) + .build() + .await + .unwrap(); + + // Size limit: exactly 1024..2048 (1KB to 2KB exclusive) + let op = Operator::new(Memory::default()) + .unwrap() + .layer(FoyerLayer::new(cache.clone()).with_size_limit(1024..2048)) + .finish(); + + // File at exactly lower bound (1024 bytes) - should be cached + let at_lower = vec![1u8; 1024]; + op.write("at_lower.bin", at_lower.clone()).await.unwrap(); + let read = op.read("at_lower.bin").await.unwrap(); + assert_eq!(read.to_vec(), at_lower); + + // File just below lower bound (1023 bytes) - should NOT be cached + let below_lower = vec![2u8; 1023]; + op.write("below_lower.bin", below_lower.clone()) + .await + .unwrap(); + let read = op.read("below_lower.bin").await.unwrap(); + assert_eq!(read.to_vec(), below_lower); + + // File just below upper bound (2047 bytes) - should be cached + let below_upper = vec![3u8; 2047]; + op.write("below_upper.bin", below_upper.clone()) + .await + .unwrap(); + let read = op.read("below_upper.bin").await.unwrap(); + assert_eq!(read.to_vec(), below_upper); + + // File at exactly upper bound (2048 bytes) - should NOT be cached (exclusive) + let at_upper = vec![4u8; 2048]; + op.write("at_upper.bin", at_upper.clone()).await.unwrap(); + let read = op.read("at_upper.bin").await.unwrap(); + assert_eq!(read.to_vec(), at_upper); + + // Verify caching behavior + let key_at_lower = FoyerKey::Full { + path: "at_lower.bin".to_string(), + version: None, + }; + let key_below_lower = FoyerKey::Full { + path: "below_lower.bin".to_string(), + version: None, + }; + let key_below_upper = FoyerKey::Full { + path: "below_upper.bin".to_string(), + version: None, + }; + let key_at_upper = FoyerKey::Full { + path: "at_upper.bin".to_string(), + version: None, + }; + + // at_lower and below_upper should be cached + assert!(cache.get(&key_at_lower).await.unwrap().is_some()); + assert!(cache.get(&key_below_upper).await.unwrap().is_some()); + + // below_lower and at_upper should NOT be cached + let cached = cache.get(&key_below_lower).await; + 
assert!(cached.is_err() || cached.unwrap().is_none()); + let cached = cache.get(&key_at_upper).await; + assert!(cached.is_err() || cached.unwrap().is_none()); + } + + #[tokio::test] + async fn test_full_range_read_from_cache() { + let dir = tempfile::tempdir().unwrap(); + + let cache = HybridCacheBuilder::new() + .memory(1024 * 1024) + .with_shards(1) + .storage(Engine::Large(LargeEngineOptions::new())) + .with_device_options( + DirectFsDeviceOptions::new(dir.path()) + .with_capacity(16 * MiB as usize) + .with_file_size(MiB as usize), + ) + .with_recover_mode(RecoverMode::None) + .build() + .await + .unwrap(); + + let op = Operator::new(Memory::default()) + .unwrap() + .layer(FoyerLayer::new(cache.clone())) + .finish(); + + let data: Vec = (0..5000).map(|i| (i % 256) as u8).collect(); + op.write("range_test.bin", data.clone()).await.unwrap(); + + // First read populates cache + let _ = op.read("range_test.bin").await.unwrap(); + + // Various range reads from cache + let read1 = op.read_with("range_test.bin").range(0..100).await.unwrap(); + assert_eq!(read1.to_vec(), data[0..100]); + + let read2 = op + .read_with("range_test.bin") + .range(1000..2000) + .await + .unwrap(); + assert_eq!(read2.to_vec(), data[1000..2000]); + + let read3 = op + .read_with("range_test.bin") + .range(4500..5000) + .await + .unwrap(); + assert_eq!(read3.to_vec(), data[4500..5000]); + + // Read to end (no explicit end) + let read4 = op.read_with("range_test.bin").range(4000..).await.unwrap(); + assert_eq!(read4.to_vec(), data[4000..]); + } + + #[tokio::test] + async fn test_full_empty_file() { + let dir = tempfile::tempdir().unwrap(); + + let cache = HybridCacheBuilder::new() + .memory(1024 * 1024) + .with_shards(1) + .storage(Engine::Large(LargeEngineOptions::new())) + .with_device_options( + DirectFsDeviceOptions::new(dir.path()) + .with_capacity(16 * MiB as usize) + .with_file_size(MiB as usize), + ) + .with_recover_mode(RecoverMode::None) + .build() + .await + .unwrap(); + + // Size limit starting from 0 to include empty files + let op = Operator::new(Memory::default()) + .unwrap() + .layer(FoyerLayer::new(cache.clone()).with_size_limit(0..1024)) + .finish(); + + op.write("empty.bin", Vec::::new()).await.unwrap(); + + let read = op.read("empty.bin").await.unwrap(); + assert_eq!(read.len(), 0); + } + #[test] fn test_error() { let e = Error::new(ErrorKind::NotFound, "not found"); @@ -379,12 +718,12 @@ mod tests { #[test] fn test_foyer_key_version_none_vs_empty() { - let key_none = FoyerKey { + let key_none = FoyerKey::Full { path: "test/path".to_string(), version: None, }; - let key_empty = FoyerKey { + let key_empty = FoyerKey::Full { path: "test/path".to_string(), version: Some("".to_string()), }; @@ -410,35 +749,59 @@ mod tests { fn test_foyer_key_serde() { use std::io::Cursor; - let test_cases = vec![ - FoyerKey { + let test_cases: Vec = vec![ + FoyerKey::Full { path: "simple".to_string(), version: None, }, - FoyerKey { + FoyerKey::Full { path: "with/slash/path".to_string(), version: None, }, - FoyerKey { + FoyerKey::Full { path: "versioned".to_string(), version: Some("v1.0.0".to_string()), }, - FoyerKey { + FoyerKey::Full { path: "empty-version".to_string(), version: Some("".to_string()), }, - FoyerKey { + FoyerKey::Full { path: "".to_string(), version: None, }, - FoyerKey { + FoyerKey::Full { path: "unicode/θ·―εΎ„/πŸš€".to_string(), version: Some("η‰ˆζœ¬-1".to_string()), }, - FoyerKey { + FoyerKey::Full { path: "long/".to_string().repeat(100), version: Some("long-version-".to_string().repeat(50)), }, + // 
Test Metadata variant + FoyerKey::ChunkMetadata { + path: "meta/path".to_string(), + chunk_size: 1024, + version: None, + }, + FoyerKey::ChunkMetadata { + path: "meta/versioned".to_string(), + chunk_size: 4096, + version: Some("v2".to_string()), + }, + // Test Chunk variant + FoyerKey::Chunk { + path: "chunk/path".to_string(), + chunk_size: 1024, + chunk_index: 0, + version: None, + }, + FoyerKey::Chunk { + path: "chunk/versioned".to_string(), + chunk_size: 4096, + chunk_index: 42, + version: Some("v3".to_string()), + }, ]; for original in test_cases { @@ -456,4 +819,603 @@ mod tests { ); } } + + #[tokio::test] + async fn test_chunked_single_chunk() { + let dir = tempfile::tempdir().unwrap(); + let cache = HybridCacheBuilder::new() + .memory(1024 * 1024) + .with_shards(1) + .storage(Engine::Large(LargeEngineOptions::new())) + .with_device_options( + DirectFsDeviceOptions::new(dir.path()) + .with_capacity(16 * MiB as usize) + .with_file_size(MiB as usize), + ) + .with_recover_mode(RecoverMode::None) + .build() + .await + .unwrap(); + + let op = Operator::new(Memory::default()) + .unwrap() + .layer(FoyerLayer::new(cache).with_chunk_size(1024)) + .finish(); + + let data = vec![42u8; 500]; + op.write("test.bin", data.clone()).await.unwrap(); + + // Read within single chunk + let read_data = op.read_with("test.bin").range(100..300).await.unwrap(); + assert_eq!(read_data.len(), 200); + assert_eq!(read_data.to_vec(), data[100..300]); + } + + #[tokio::test] + async fn test_chunked_multiple_chunks() { + let dir = tempfile::tempdir().unwrap(); + let cache = HybridCacheBuilder::new() + .memory(1024 * 1024) + .with_shards(1) + .storage(Engine::Large(LargeEngineOptions::new())) + .with_device_options( + DirectFsDeviceOptions::new(dir.path()) + .with_capacity(16 * MiB as usize) + .with_file_size(MiB as usize), + ) + .with_recover_mode(RecoverMode::None) + .build() + .await + .unwrap(); + + let op = Operator::new(Memory::default()) + .unwrap() + .layer(FoyerLayer::new(cache).with_chunk_size(1024)) + .finish(); + + // 3KB file across 3 chunks + let data: Vec = (0..3000).map(|i| (i % 256) as u8).collect(); + op.write("large.bin", data.clone()).await.unwrap(); + + // Read spanning multiple chunks (byte 500 to 2500 covers 3 chunks) + let read_data = op.read_with("large.bin").range(500..2500).await.unwrap(); + assert_eq!(read_data.len(), 2000); + assert_eq!(read_data.to_vec(), data[500..2500]); + } + + #[tokio::test] + async fn test_chunked_full_read() { + let dir = tempfile::tempdir().unwrap(); + let cache = HybridCacheBuilder::new() + .memory(1024 * 1024) + .with_shards(1) + .storage(Engine::Large(LargeEngineOptions::new())) + .with_device_options( + DirectFsDeviceOptions::new(dir.path()) + .with_capacity(16 * MiB as usize) + .with_file_size(MiB as usize), + ) + .with_recover_mode(RecoverMode::None) + .build() + .await + .unwrap(); + + let op = Operator::new(Memory::default()) + .unwrap() + .layer(FoyerLayer::new(cache).with_chunk_size(1024)) + .finish(); + + let data = vec![123u8; 2500]; + op.write("full.bin", data.clone()).await.unwrap(); + + // Read entire file + let read_data = op.read("full.bin").await.unwrap(); + assert_eq!(read_data.len(), 2500); + assert_eq!(read_data.to_vec(), data); + } + + #[tokio::test] + async fn test_chunked_last_partial_chunk() { + let dir = tempfile::tempdir().unwrap(); + let cache = HybridCacheBuilder::new() + .memory(1024 * 1024) + .with_shards(1) + .storage(Engine::Large(LargeEngineOptions::new())) + .with_device_options( + DirectFsDeviceOptions::new(dir.path()) + 
.with_capacity(16 * MiB as usize) + .with_file_size(MiB as usize), + ) + .with_recover_mode(RecoverMode::None) + .build() + .await + .unwrap(); + + let op = Operator::new(Memory::default()) + .unwrap() + .layer(FoyerLayer::new(cache).with_chunk_size(1024)) + .finish(); + + // 1500 bytes = 1 full chunk + 476 bytes + let data: Vec = (0..1500).map(|i| (i % 256) as u8).collect(); + op.write("partial.bin", data.clone()).await.unwrap(); + + // Read from last partial chunk + let read_data = op.read_with("partial.bin").range(1200..1500).await.unwrap(); + assert_eq!(read_data.len(), 300); + assert_eq!(read_data.to_vec(), data[1200..1500]); + } + + #[tokio::test] + async fn test_chunked_cache_hit() { + let dir = tempfile::tempdir().unwrap(); + let cache = HybridCacheBuilder::new() + .memory(1024 * 1024) + .with_shards(1) + .storage(Engine::Large(LargeEngineOptions::new())) + .with_device_options( + DirectFsDeviceOptions::new(dir.path()) + .with_capacity(16 * MiB as usize) + .with_file_size(MiB as usize), + ) + .with_recover_mode(RecoverMode::None) + .build() + .await + .unwrap(); + + let op = Operator::new(Memory::default()) + .unwrap() + .layer(FoyerLayer::new(cache.clone()).with_chunk_size(1024)) + .finish(); + + let data = vec![99u8; 3000]; + op.write("cached.bin", data.clone()).await.unwrap(); + + // First read to populate cache + let read1 = op.read_with("cached.bin").range(1000..2000).await.unwrap(); + assert_eq!(read1.to_vec(), data[1000..2000]); + + // Second read should hit cache for overlapping chunks + let read2 = op.read_with("cached.bin").range(1500..2500).await.unwrap(); + assert_eq!(read2.to_vec(), data[1500..2500]); + } + + #[tokio::test] + async fn test_chunked_empty_file() { + let dir = tempfile::tempdir().unwrap(); + let cache = HybridCacheBuilder::new() + .memory(1024 * 1024) + .with_shards(1) + .storage(Engine::Large(LargeEngineOptions::new())) + .with_device_options( + DirectFsDeviceOptions::new(dir.path()) + .with_capacity(16 * MiB as usize) + .with_file_size(MiB as usize), + ) + .with_recover_mode(RecoverMode::None) + .build() + .await + .unwrap(); + + let op = Operator::new(Memory::default()) + .unwrap() + .layer(FoyerLayer::new(cache).with_chunk_size(1024)) + .finish(); + + op.write("empty.bin", Vec::::new()).await.unwrap(); + + let read_data = op.read("empty.bin").await.unwrap(); + assert_eq!(read_data.len(), 0); + } + + #[tokio::test] + async fn test_chunked_exact_boundaries() { + let dir = tempfile::tempdir().unwrap(); + let cache = HybridCacheBuilder::new() + .memory(1024 * 1024) + .with_shards(1) + .storage(Engine::Large(LargeEngineOptions::new())) + .with_device_options( + DirectFsDeviceOptions::new(dir.path()) + .with_capacity(16 * MiB as usize) + .with_file_size(MiB as usize), + ) + .with_recover_mode(RecoverMode::None) + .build() + .await + .unwrap(); + + let op = Operator::new(Memory::default()) + .unwrap() + .layer(FoyerLayer::new(cache).with_chunk_size(1024)) + .finish(); + + let data = vec![77u8; 3072]; // Exactly 3 chunks + op.write("aligned.bin", data.clone()).await.unwrap(); + + // Read exactly one chunk + let read_data = op.read_with("aligned.bin").range(1024..2048).await.unwrap(); + assert_eq!(read_data.len(), 1024); + assert_eq!(read_data.to_vec(), data[1024..2048]); + } + + #[tokio::test] + async fn test_chunked_metadata_caching() { + let dir = tempfile::tempdir().unwrap(); + let cache = HybridCacheBuilder::new() + .memory(1024 * 1024) + .with_shards(1) + .storage(Engine::Large(LargeEngineOptions::new())) + .with_device_options( + 
DirectFsDeviceOptions::new(dir.path()) + .with_capacity(16 * MiB as usize) + .with_file_size(MiB as usize), + ) + .with_recover_mode(RecoverMode::None) + .build() + .await + .unwrap(); + + let op = Operator::new(Memory::default()) + .unwrap() + .layer(FoyerLayer::new(cache.clone()).with_chunk_size(1024)) + .finish(); + + let data = vec![42u8; 5000]; + op.write("meta.bin", data.clone()).await.unwrap(); + + // First read to cache metadata and chunks + let read1 = op.read_with("meta.bin").range(0..1000).await.unwrap(); + assert_eq!(read1.len(), 1000); + + // Clear only data cache (metadata should remain if properly keyed) + // Second read from different range - metadata should be cached + let read2 = op.read_with("meta.bin").range(3000..4000).await.unwrap(); + assert_eq!(read2.len(), 1000); + assert_eq!(read2.to_vec(), data[3000..4000]); + } + + #[tokio::test] + async fn test_chunked_different_chunk_sizes() { + let dir = tempfile::tempdir().unwrap(); + let cache = HybridCacheBuilder::new() + .memory(2 * 1024 * 1024) + .with_shards(1) + .storage(Engine::Large(LargeEngineOptions::new())) + .with_device_options( + DirectFsDeviceOptions::new(dir.path()) + .with_capacity(16 * MiB as usize) + .with_file_size(MiB as usize), + ) + .with_recover_mode(RecoverMode::None) + .build() + .await + .unwrap(); + + let data = vec![88u8; 10000]; + + // Test with 1KB chunks + let op1 = Operator::new(Memory::default()) + .unwrap() + .layer(FoyerLayer::new(cache.clone()).with_chunk_size(1024)) + .finish(); + op1.write("chunk_size_test.bin", data.clone()) + .await + .unwrap(); + let read1 = op1 + .read_with("chunk_size_test.bin") + .range(2000..4000) + .await + .unwrap(); + assert_eq!(read1.len(), 2000); + + // Test with 2KB chunks - should create different cache keys + let op2 = Operator::new(Memory::default()) + .unwrap() + .layer(FoyerLayer::new(cache.clone()).with_chunk_size(2048)) + .finish(); + op2.write("chunk_size_test2.bin", data.clone()) + .await + .unwrap(); + let read2 = op2 + .read_with("chunk_size_test2.bin") + .range(2000..4000) + .await + .unwrap(); + assert_eq!(read2.len(), 2000); + assert_eq!(read2.to_vec(), data[2000..4000]); + } + + #[tokio::test] + async fn test_chunked_cache_persistence() { + let dir = tempfile::tempdir().unwrap(); + let cache = HybridCacheBuilder::new() + .memory(1024 * 1024) + .with_shards(1) + .storage(Engine::Large(LargeEngineOptions::new())) + .with_device_options( + DirectFsDeviceOptions::new(dir.path()) + .with_capacity(16 * MiB as usize) + .with_file_size(MiB as usize), + ) + .with_recover_mode(RecoverMode::None) + .build() + .await + .unwrap(); + + let op = Operator::new(Memory::default()) + .unwrap() + .layer(FoyerLayer::new(cache.clone()).with_chunk_size(1024)) + .finish(); + + let data: Vec = (0..5000).map(|i| (i % 256) as u8).collect(); + op.write("persist.bin", data.clone()).await.unwrap(); + + // Read to populate cache + let read1 = op.read_with("persist.bin").range(1000..3000).await.unwrap(); + assert_eq!(read1.to_vec(), data[1000..3000]); + + // Verify cache hit by reading again (overlapping range) + let read2 = op.read_with("persist.bin").range(1500..2500).await.unwrap(); + assert_eq!(read2.to_vec(), data[1500..2500]); + + // Non-overlapping range should fetch new chunks + let read3 = op.read_with("persist.bin").range(4000..5000).await.unwrap(); + assert_eq!(read3.to_vec(), data[4000..5000]); + } + + #[tokio::test] + async fn test_chunked_boundary_reads() { + let dir = tempfile::tempdir().unwrap(); + let cache = HybridCacheBuilder::new() + .memory(1024 * 1024) 
+ .with_shards(1) + .storage(Engine::Large(LargeEngineOptions::new())) + .with_device_options( + DirectFsDeviceOptions::new(dir.path()) + .with_capacity(16 * MiB as usize) + .with_file_size(MiB as usize), + ) + .with_recover_mode(RecoverMode::None) + .build() + .await + .unwrap(); + + let op = Operator::new(Memory::default()) + .unwrap() + .layer(FoyerLayer::new(cache).with_chunk_size(1024)) + .finish(); + + let data: Vec = (0..4096).map(|i| (i % 256) as u8).collect(); + op.write("boundary.bin", data.clone()).await.unwrap(); + + // Read from exact chunk start + let read1 = op + .read_with("boundary.bin") + .range(1024..1524) + .await + .unwrap(); + assert_eq!(read1.to_vec(), data[1024..1524]); + + // Read to exact chunk end + let read2 = op.read_with("boundary.bin").range(500..1024).await.unwrap(); + assert_eq!(read2.to_vec(), data[500..1024]); + + // Read across chunk boundary + let read3 = op + .read_with("boundary.bin") + .range(1020..1030) + .await + .unwrap(); + assert_eq!(read3.to_vec(), data[1020..1030]); + } + + #[tokio::test] + async fn test_chunked_large_file() { + let dir = tempfile::tempdir().unwrap(); + let cache = HybridCacheBuilder::new() + .memory(4 * 1024 * 1024) + .with_shards(1) + .storage(Engine::Large(LargeEngineOptions::new())) + .with_device_options( + DirectFsDeviceOptions::new(dir.path()) + .with_capacity(32 * MiB as usize) + .with_file_size(MiB as usize), + ) + .with_recover_mode(RecoverMode::None) + .build() + .await + .unwrap(); + + let op = Operator::new(Memory::default()) + .unwrap() + .layer(FoyerLayer::new(cache).with_chunk_size(4096)) + .finish(); + + // 100KB file - 25 chunks + let data: Vec = (0..100_000).map(|i| (i % 256) as u8).collect(); + op.write("large.bin", data.clone()).await.unwrap(); + + // Read various sections + let read1 = op + .read_with("large.bin") + .range(10_000..20_000) + .await + .unwrap(); + assert_eq!(read1.len(), 10_000); + assert_eq!(read1.to_vec(), data[10_000..20_000]); + + let read2 = op + .read_with("large.bin") + .range(50_000..60_000) + .await + .unwrap(); + assert_eq!(read2.len(), 10_000); + assert_eq!(read2.to_vec(), data[50_000..60_000]); + + // Read entire file + let read_all = op.read("large.bin").await.unwrap(); + assert_eq!(read_all.len(), 100_000); + assert_eq!(read_all.to_vec(), data); + } + + #[tokio::test] + async fn test_chunked_sequential_reads() { + let dir = tempfile::tempdir().unwrap(); + let cache = HybridCacheBuilder::new() + .memory(1024 * 1024) + .with_shards(1) + .storage(Engine::Large(LargeEngineOptions::new())) + .with_device_options( + DirectFsDeviceOptions::new(dir.path()) + .with_capacity(16 * MiB as usize) + .with_file_size(MiB as usize), + ) + .with_recover_mode(RecoverMode::None) + .build() + .await + .unwrap(); + + let op = Operator::new(Memory::default()) + .unwrap() + .layer(FoyerLayer::new(cache).with_chunk_size(1024)) + .finish(); + + let data: Vec = (0..10_000).map(|i| (i % 256) as u8).collect(); + op.write("sequential.bin", data.clone()).await.unwrap(); + + // Simulate sequential reading pattern + for offset in (0..10_000).step_by(500) { + let end = (offset + 500).min(data.len()); + let read_data = op + .read_with("sequential.bin") + .range(offset as u64..end as u64) + .await + .unwrap(); + assert_eq!(read_data.to_vec(), data[offset..end]); + } + } + + #[tokio::test] + async fn test_chunked_random_access() { + let dir = tempfile::tempdir().unwrap(); + let cache = HybridCacheBuilder::new() + .memory(2 * 1024 * 1024) + .with_shards(1) + .storage(Engine::Large(LargeEngineOptions::new())) + 
.with_device_options( + DirectFsDeviceOptions::new(dir.path()) + .with_capacity(16 * MiB as usize) + .with_file_size(MiB as usize), + ) + .with_recover_mode(RecoverMode::None) + .build() + .await + .unwrap(); + + let op = Operator::new(Memory::default()) + .unwrap() + .layer(FoyerLayer::new(cache).with_chunk_size(1024)) + .finish(); + + let data: Vec = (0..20_000).map(|i| (i % 256) as u8).collect(); + op.write("random.bin", data.clone()).await.unwrap(); + + // Random access pattern + let ranges = [ + (5000, 5500), + (15000, 15200), + (1000, 2000), + (18000, 19000), + (500, 1500), + ]; + + for (start, end) in ranges { + let read_data = op.read_with("random.bin").range(start..end).await.unwrap(); + assert_eq!(read_data.to_vec(), data[start as usize..end as usize]); + } + } + + #[tokio::test] + async fn test_chunked_with_clear_cache() { + let dir = tempfile::tempdir().unwrap(); + let cache = HybridCacheBuilder::new() + .memory(1024 * 1024) + .with_shards(1) + .storage(Engine::Large(LargeEngineOptions::new())) + .with_device_options( + DirectFsDeviceOptions::new(dir.path()) + .with_capacity(16 * MiB as usize) + .with_file_size(MiB as usize), + ) + .with_recover_mode(RecoverMode::None) + .build() + .await + .unwrap(); + + let op = Operator::new(Memory::default()) + .unwrap() + .layer(FoyerLayer::new(cache.clone()).with_chunk_size(1024)) + .finish(); + + let data = vec![99u8; 5000]; + op.write("clear_test.bin", data.clone()).await.unwrap(); + + // First read to populate cache + let read1 = op + .read_with("clear_test.bin") + .range(1000..2000) + .await + .unwrap(); + assert_eq!(read1.to_vec(), data[1000..2000]); + + // Clear cache + cache.clear().await.unwrap(); + + // Read again - should still work (fetch from backend) + let read2 = op + .read_with("clear_test.bin") + .range(1000..2000) + .await + .unwrap(); + assert_eq!(read2.to_vec(), data[1000..2000]); + } + + #[tokio::test] + async fn test_chunked_single_byte_reads() { + let dir = tempfile::tempdir().unwrap(); + let cache = HybridCacheBuilder::new() + .memory(1024 * 1024) + .with_shards(1) + .storage(Engine::Large(LargeEngineOptions::new())) + .with_device_options( + DirectFsDeviceOptions::new(dir.path()) + .with_capacity(16 * MiB as usize) + .with_file_size(MiB as usize), + ) + .with_recover_mode(RecoverMode::None) + .build() + .await + .unwrap(); + + let op = Operator::new(Memory::default()) + .unwrap() + .layer(FoyerLayer::new(cache).with_chunk_size(1024)) + .finish(); + + let data: Vec = (0..3000).map(|i| (i % 256) as u8).collect(); + op.write("single_byte.bin", data.clone()).await.unwrap(); + + // Read single bytes at various positions + let positions = [0, 1023, 1024, 2047, 2048, 2999]; + for pos in positions { + let read_data = op + .read_with("single_byte.bin") + .range(pos..pos + 1) + .await + .unwrap(); + assert_eq!(read_data.len(), 1); + assert_eq!(read_data.to_vec()[0], data[pos as usize]); + } + } } diff --git a/core/layers/foyer/src/writer.rs b/core/layers/foyer/src/writer.rs index fd505f174232..d74d95ed5604 100644 --- a/core/layers/foyer/src/writer.rs +++ b/core/layers/foyer/src/writer.rs @@ -23,6 +23,7 @@ use opendal_core::Result; use opendal_core::raw::Access; use opendal_core::raw::oio; +use crate::CachedBuffer; use crate::FoyerKey; use crate::FoyerValue; use crate::Inner; @@ -43,22 +44,24 @@ impl Writer { inner: Arc>, size_limit: std::ops::Range, ) -> Self { + // In chunked mode, skip caching entirely + let skip_cache = inner.chunk_size.is_some(); Self { w, buf: oio::QueueBuf::new(), path, inner, size_limit, - skip_cache: 
false,
+            skip_cache,
         }
     }
 }
 
 impl<A: Access> oio::Write for Writer<A> {
     async fn write(&mut self, bs: Buffer) -> Result<()> {
-        if self.size_limit.contains(&(self.buf.len() + bs.len())) {
+        // Only buffer if not in chunked mode and within size limit
+        if !self.skip_cache && self.size_limit.contains(&(self.buf.len() + bs.len())) {
             self.buf.push(bs.clone());
-            self.skip_cache = false;
         } else {
             self.buf.clear();
             self.skip_cache = true;
@@ -69,13 +72,14 @@ impl<A: Access> oio::Write for Writer<A> {
     async fn close(&mut self) -> Result<Metadata> {
         let buffer = self.buf.clone().collect();
         let metadata = self.w.close().await?;
+        // Only cache in full mode (not chunked mode)
         if !self.skip_cache {
             self.inner.cache.insert(
-                FoyerKey {
+                FoyerKey::Full {
                     path: self.path.clone(),
                     version: metadata.version().map(|v| v.to_string()),
                 },
-                FoyerValue(buffer),
+                FoyerValue::Buffer(CachedBuffer(buffer)),
             );
         }
         Ok(metadata)
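For reference, a minimal end-to-end sketch of the chunked mode, assembled from the doc example and the tests above. It assumes `FoyerLayer` is imported from this layer crate and uses the same `foyer`, `opendal_core`, `tokio`, and `tempfile` dependencies the tests rely on; the cache configuration mirrors the test setup rather than production-tuned values.

```rust
use foyer::{DirectFsDeviceOptions, Engine, HybridCacheBuilder, LargeEngineOptions, RecoverMode};
use opendal_core::{Operator, services::Memory};
// `FoyerLayer` comes from this layer crate; adjust the import to however the crate is published.

#[tokio::main]
async fn main() {
    let dir = tempfile::tempdir().unwrap();
    let cache = HybridCacheBuilder::new()
        .memory(1024 * 1024)
        .with_shards(1)
        .storage(Engine::Large(LargeEngineOptions::new()))
        .with_device_options(
            DirectFsDeviceOptions::new(dir.path())
                .with_capacity(16 * 1024 * 1024)
                .with_file_size(1024 * 1024),
        )
        .with_recover_mode(RecoverMode::None)
        .build()
        .await
        .unwrap();

    // Chunked mode: objects are cached as independent 1 KiB chunks.
    let op = Operator::new(Memory::default())
        .unwrap()
        .layer(FoyerLayer::new(cache).with_chunk_size(1024))
        .finish();

    op.write("large.bin", vec![7u8; 3000]).await.unwrap();

    // Bytes 500..2500 span chunks 0, 1, and 2; only those chunks are fetched and cached.
    let part = op.read_with("large.bin").range(500..2500).await.unwrap();
    assert_eq!(part.len(), 2000);
}
```

As the Writer and Deleter changes above show, writes and deletes bypass the cache entirely when a chunk size is configured, so chunked entries are populated only on the read path.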