diff --git a/clustered/ehcache-client/src/main/java/org/ehcache/clustered/client/internal/loaderwriter/writebehind/ClusteredWriteBehind.java b/clustered/ehcache-client/src/main/java/org/ehcache/clustered/client/internal/loaderwriter/writebehind/ClusteredWriteBehind.java index d2b9b32e29..eab41429da 100644 --- a/clustered/ehcache-client/src/main/java/org/ehcache/clustered/client/internal/loaderwriter/writebehind/ClusteredWriteBehind.java +++ b/clustered/ehcache-client/src/main/java/org/ehcache/clustered/client/internal/loaderwriter/writebehind/ClusteredWriteBehind.java @@ -75,10 +75,13 @@ private void process(long hash) { operation); try { if (result != null) { + try{ if (result != currentState.get(key) && !(operation instanceof PutOperation)) { - cacheLoaderWriter.write(result.getKey(), result.getValue()); - } + cacheLoaderWriter.write(result.getKey(), result.getValue()); + } currentState.put(key, result.asOperationExpiringAt(result.expirationTime())); + }catch (Exception ignored){ + } } else { if (currentState.get(key) != null && (operation instanceof RemoveOperation || operation instanceof ConditionalRemoveOperation)) { @@ -95,7 +98,6 @@ private void process(long hash) { for (PutOperation operation : currentState.values()) { builder = builder.add(codec.encode(operation)); } - clusteredWriteBehindStore.replaceAtHead(hash, chain, builder.build()); } } finally { diff --git a/clustered/integration-test/src/test/java/org/ehcache/clustered/writebehind/BasicClusteredWriteBehindTest.java b/clustered/integration-test/src/test/java/org/ehcache/clustered/writebehind/BasicClusteredWriteBehindTest.java index e4fdf94940..98f3307e84 100644 --- a/clustered/integration-test/src/test/java/org/ehcache/clustered/writebehind/BasicClusteredWriteBehindTest.java +++ b/clustered/integration-test/src/test/java/org/ehcache/clustered/writebehind/BasicClusteredWriteBehindTest.java @@ -29,13 +29,19 @@ import java.lang.management.ManagementFactory; import java.lang.management.ThreadInfo; import java.lang.management.ThreadMXBean; +import java.time.Duration; import java.util.Arrays; +import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; import static org.ehcache.testing.StandardCluster.clusterPath; import static org.ehcache.testing.StandardCluster.newCluster; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.notNullValue; +import static org.terracotta.utilities.test.matchers.Eventually.within; public class BasicClusteredWriteBehindTest extends WriteBehindTestBase { @@ -127,4 +133,32 @@ public void testClusteredWriteBehindLoading() throws Exception { doThreadDump = false; } + + @Test(timeout = 120000) + public void testBasicClusteredWriteBehindWithFailure() { + cacheManager.close(); + cacheManager = null; + cacheManager = createCacheManagerWithLoaderWriterWithFailure(CLUSTER.getConnectionURI()); + final Cache localCache = cacheManager.getCache(testName.getMethodName(), Long.class, String.class); + localCache.put(1L, String.valueOf(1)); + localCache.put(2L, String.valueOf(2)); + localCache.put(3L, String.valueOf(3)); + localCache.put(4L, String.valueOf(4)); + + assertThat(() -> localCache.get(1L), within(Duration.ofSeconds(100)).matches(is(nullValue()))); + assertThat(() -> localCache.get(3L), within(Duration.ofSeconds(100)).matches(is(nullValue()))); + + Map> records = getEvenNumberLoaderWriter().getRecords(); + + // Error will be thrown for odd keys, hence only 2 entries are expected here. + assertThat(() -> records.size(), within(Duration.ofSeconds(2)).matches(is(2))); + + // Verify that values with only even entries are present in records. + List keyRecords = records.get(2L); + assertThat(keyRecords.get(0), is("2")); + keyRecords = records.get(4L); + assertThat(keyRecords.get(0), is("4")); + + doThreadDump = false; + } } diff --git a/clustered/integration-test/src/test/java/org/ehcache/clustered/writebehind/EvenNumberLoaderWriter.java b/clustered/integration-test/src/test/java/org/ehcache/clustered/writebehind/EvenNumberLoaderWriter.java new file mode 100644 index 0000000000..0654239954 --- /dev/null +++ b/clustered/integration-test/src/test/java/org/ehcache/clustered/writebehind/EvenNumberLoaderWriter.java @@ -0,0 +1,56 @@ +/* + * Copyright Terracotta, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.ehcache.clustered.writebehind; + +import org.ehcache.spi.loaderwriter.CacheLoaderWriter; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +class EvenNumberLoaderWriter implements CacheLoaderWriter { + + private final Map> records = new HashMap<>(); + + @Override + public synchronized String load(Long key) { + List list = records.get(key); + return list == null ? null : list.get(list.size() - 1); + } + + @Override + public synchronized void write(Long key, String value) throws Exception { + if (Integer.parseInt(key.toString()) % 2 != 0) { + throw new RuntimeException("Only even keys can be inserted"); + } + record(key, value); + } + + @Override + public void delete(Long key) { + } + + private void record(Long key, String value) { + records.computeIfAbsent(key, k -> new ArrayList<>()).add(value); + } + + synchronized Map> getRecords() { + return Collections.unmodifiableMap(records); + } +} diff --git a/clustered/integration-test/src/test/java/org/ehcache/clustered/writebehind/WriteBehindTestBase.java b/clustered/integration-test/src/test/java/org/ehcache/clustered/writebehind/WriteBehindTestBase.java index 894a892d66..b8a7d44f4b 100644 --- a/clustered/integration-test/src/test/java/org/ehcache/clustered/writebehind/WriteBehindTestBase.java +++ b/clustered/integration-test/src/test/java/org/ehcache/clustered/writebehind/WriteBehindTestBase.java @@ -29,6 +29,7 @@ import org.ehcache.config.units.EntryUnit; import org.ehcache.config.units.MemoryUnit; import org.ehcache.core.internal.resilience.ThrowingResilienceStrategy; +import org.ehcache.spi.loaderwriter.CacheLoaderWriter; import org.junit.Before; import org.junit.Rule; import org.junit.rules.TestName; @@ -56,6 +57,7 @@ public class WriteBehindTestBase { public final TestName testName = new TestName(); private RecordingLoaderWriter loaderWriter; + private EvenNumberLoaderWriter evenNumberLoaderWriter; @Before public void setUp() throws Exception { @@ -85,12 +87,16 @@ void assertValue(Cache cache, String value) { } PersistentCacheManager createCacheManager(URI clusterUri) { + return getPersistentCacheManager(clusterUri, loaderWriter); + } + + private PersistentCacheManager getPersistentCacheManager(URI clusterUri, CacheLoaderWriter localLoaderWriter) { CacheConfiguration cacheConfiguration = newCacheConfigurationBuilder(Long.class, String.class, ResourcePoolsBuilder.newResourcePoolsBuilder() - .heap(10, EntryUnit.ENTRIES) - .offheap(1, MemoryUnit.MB) - .with(ClusteredResourcePoolBuilder.clusteredDedicated("primary-server-resource", 2, MemoryUnit.MB))) - .withLoaderWriter(loaderWriter) + .heap(10, EntryUnit.ENTRIES) + .offheap(1, MemoryUnit.MB) + .with(ClusteredResourcePoolBuilder.clusteredDedicated("primary-server-resource", 2, MemoryUnit.MB))) + .withLoaderWriter(localLoaderWriter) .withService(WriteBehindConfigurationBuilder.newUnBatchedWriteBehindConfiguration()) .withResilienceStrategy(new ThrowingResilienceStrategy<>()) .withService(new ClusteredStoreConfiguration(Consistency.STRONG)) @@ -102,4 +108,13 @@ PersistentCacheManager createCacheManager(URI clusterUri) { .withCache(testName.getMethodName(), cacheConfiguration) .build(true); } + + PersistentCacheManager createCacheManagerWithLoaderWriterWithFailure(URI clusterUri) { + evenNumberLoaderWriter = new EvenNumberLoaderWriter(); + return getPersistentCacheManager(clusterUri, evenNumberLoaderWriter); + } + + protected EvenNumberLoaderWriter getEvenNumberLoaderWriter() { + return this.evenNumberLoaderWriter; + } }