diff --git a/src/shard.rs b/src/shard.rs index 774bb9b..2bdb3bc 100644 --- a/src/shard.rs +++ b/src/shard.rs @@ -1050,10 +1050,16 @@ impl< Ok(()) } + /// Upserts a placeholder, optionally validating existing resident values + /// + /// Returns: + /// - `Ok((token, value))` if a valid resident was found + /// - `Err((placeholder, is_new))` where `is_new` indicates if this is a newly created placeholder pub fn upsert_placeholder( &mut self, hash: u64, key: &Q, + validator: &mut impl FnMut(&Val) -> bool, ) -> Result<(Token, &Val), (Plh, bool)> where Q: Hash + Equivalent + ToOwned + ?Sized, @@ -1063,6 +1069,40 @@ impl< let (entry, _) = self.entries.get_mut(idx).unwrap(); match entry { Entry::Resident(resident) => { + if !validator(&resident.value) { + let old_state = resident.state; + let old_key = &resident.key; + let old_value = &resident.value; + let weight = self.weighter.weight(old_key, old_value); + + let shared = Plh::new(hash, idx); + *entry = Entry::Placeholder(Placeholder { + key: key.to_owned(), + hot: old_state, + shared: shared.clone(), + }); + + match old_state { + ResidentState::Hot => { + self.num_hot -= 1; + self.weight_hot -= weight; + if weight != 0 { + self.hot_head = self.entries.unlink(idx); + } + } + ResidentState::Cold => { + self.num_cold -= 1; + self.weight_cold -= weight; + if weight != 0 { + self.cold_head = self.entries.unlink(idx); + } + } + } + + record_miss_mut!(self); + return Err((shared, false)); // false = replaced existing + } + if *resident.referenced.get_mut() < MAX_F { *resident.referenced.get_mut() += 1; } diff --git a/src/sync.rs b/src/sync.rs index ae96daa..6ce6c74 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -435,14 +435,43 @@ impl< &'a self, key: &Q, ) -> Result> + where + Q: Hash + Equivalent + ToOwned + ?Sized, + { + self.get_validate_value_or_guard_async(key, |_| true).await + } + + /// Gets an item from the cache with key `key`, applying `validation` to determine whether + /// the value is 'live'. + /// + /// If the corresponding value isn't present in the cache or fails validation, this functions + /// returns a guard that can be used to insert the value once it's computed. + /// While the returned guard is alive, other calls with the same key using the + /// `get_value_guard` or `get_or_insert` family of functions will wait until the guard + /// is dropped or the value is inserted. + pub async fn get_validate_value_or_guard_async<'a, Q>( + &'a self, + key: &Q, + mut validation: impl FnMut(&Val) -> bool + Unpin, + ) -> Result> where Q: Hash + Equivalent + ToOwned + ?Sized, { let (shard, hash) = self.shard_for(key).unwrap(); - if let Some(v) = shard.read().get(hash, key) { - return Ok(v.clone()); + + // Try fast path with read lock first + { + let reader = shard.read(); + if let Some(v) = reader.get(hash, key) { + if validation(v) { + return Ok(v.clone()); + } + // Validation failed, fall through to JoinFuture + } + // No entry found or validation failed, let JoinFuture handle everything } - JoinFuture::new(&self.lifecycle, shard, hash, key).await + + JoinFuture::new(&self.lifecycle, shard, hash, key, validation).await } /// Gets or inserts an item in the cache with key `key`. diff --git a/src/sync_placeholder.rs b/src/sync_placeholder.rs index de1c162..680a450 100644 --- a/src/sync_placeholder.rs +++ b/src/sync_placeholder.rs @@ -224,7 +224,7 @@ impl< Q: Hash + Equivalent + ToOwned + ?Sized, { let mut shard_guard = shard.write(); - let shared = match shard_guard.upsert_placeholder(hash, key) { + let shared = match shard_guard.upsert_placeholder(hash, key, &mut |_| true) { Ok((_, v)) => return GuardResult::Value(v.clone()), Err((shared, true)) => { return GuardResult::Guard(Self::start_loading(lifecycle, shard, shared)); @@ -413,11 +413,12 @@ impl std::fmt::Debug for PlaceholderGuard<'_, Key, Val, We, } /// Future that results in an Ok(Value) or Err(Guard) -pub struct JoinFuture<'a, 'b, Q: ?Sized, Key, Val, We, B, L> { +pub struct JoinFuture<'a, 'b, Q: ?Sized, Key, Val, We, B, L, F: FnMut(&'a Val) -> bool> { lifecycle: &'a L, shard: &'a RwLock>>, state: JoinFutureState<'b, Q, Val>, notified: AtomicBool, + validation: F, } enum JoinFutureState<'b, Q: ?Sized, Val> { @@ -432,18 +433,22 @@ enum JoinFutureState<'b, Q: ?Sized, Val> { Done, } -impl<'a, 'b, Q: ?Sized, Key, Val, We, B, L> JoinFuture<'a, 'b, Q, Key, Val, We, B, L> { +impl<'a, 'b, Q: ?Sized, Key, Val, We, B, L, F: FnMut(&'a Val) -> bool> + JoinFuture<'a, 'b, Q, Key, Val, We, B, L, F> +{ pub fn new( lifecycle: &'a L, shard: &'a RwLock>>, hash: u64, key: &'b Q, - ) -> JoinFuture<'a, 'b, Q, Key, Val, We, B, L> { + validation: F, + ) -> JoinFuture<'a, 'b, Q, Key, Val, We, B, L, F> { Self { lifecycle, shard, state: JoinFutureState::Created { hash, key }, notified: Default::default(), + validation, } } @@ -480,7 +485,10 @@ impl<'a, 'b, Q: ?Sized, Key, Val, We, B, L> JoinFuture<'a, 'b, Q, Key, Val, We, } } -impl Drop for JoinFuture<'_, '_, Q, Key, Val, We, B, L> { +impl<'a, Q: ?Sized, Key, Val, We, B, L, F> Drop for JoinFuture<'a, '_, Q, Key, Val, We, B, L, F> +where + F: FnMut(&'a Val) -> bool, +{ #[inline] fn drop(&mut self) { if matches!(self.state, JoinFutureState::Pending { .. }) { @@ -497,7 +505,8 @@ impl< We: Weighter, B: BuildHasher, L: Lifecycle, - > Future for JoinFuture<'a, '_, Q, Key, Val, We, B, L> + F: FnMut(&Val) -> bool + Unpin, + > Future for JoinFuture<'a, '_, Q, Key, Val, We, B, L, F> { type Output = Result>; @@ -509,7 +518,7 @@ impl< JoinFutureState::Created { hash, key } => { debug_assert!(!this.notified.load(Ordering::Acquire)); let mut shard_guard = shard.write(); - match shard_guard.upsert_placeholder(*hash, *key) { + match shard_guard.upsert_placeholder(*hash, *key, &mut this.validation) { Ok((_, v)) => { this.state = JoinFutureState::Done; Poll::Ready(Ok(v.clone())) diff --git a/src/unsync.rs b/src/unsync.rs index f65cc14..b2929f6 100644 --- a/src/unsync.rs +++ b/src/unsync.rs @@ -249,7 +249,10 @@ impl, B: BuildHasher, L: Lifecycle + ToOwned + ?Sized, { - let idx = match self.shard.upsert_placeholder(self.shard.hash(key), key) { + let idx = match self + .shard + .upsert_placeholder(self.shard.hash(key), key, &mut |_| true) + { Ok((idx, _)) => idx, Err((plh, _)) => { let v = with()?; @@ -275,7 +278,10 @@ impl, B: BuildHasher, L: Lifecycle + ToOwned + ?Sized, { - let idx = match self.shard.upsert_placeholder(self.shard.hash(key), key) { + let idx = match self + .shard + .upsert_placeholder(self.shard.hash(key), key, &mut |_| true) + { Ok((idx, _)) => idx, Err((plh, _)) => { let v = with()?; @@ -297,7 +303,10 @@ impl, B: BuildHasher, L: Lifecycle + ToOwned + ?Sized, { // TODO: this could be using a simpler entry API - match self.shard.upsert_placeholder(self.shard.hash(key), key) { + match self + .shard + .upsert_placeholder(self.shard.hash(key), key, &mut |_| true) + { Ok((_, v)) => unsafe { // Rustc gets insanely confused about returning from mut borrows // Safety: v has the same lifetime as self @@ -323,7 +332,10 @@ impl, B: BuildHasher, L: Lifecycle + ToOwned + ?Sized, { // TODO: this could be using a simpler entry API - match self.shard.upsert_placeholder(self.shard.hash(key), key) { + match self + .shard + .upsert_placeholder(self.shard.hash(key), key, &mut |_| true) + { Ok((idx, _)) => Ok(self.shard.peek_token_mut(idx).map(RefMut)), Err((placeholder, _)) => Err(Guard { cache: self,