From 5121ec19172b03a1330176921dd97fa24c33a254 Mon Sep 17 00:00:00 2001 From: Swarali Joshi Date: Tue, 16 Dec 2025 11:08:00 +0530 Subject: [PATCH] HBASE-29348 Fixed NPE issue due to HStoreFile getting closed during race condition --- .../regionserver/DefaultStoreFileManager.java | 27 ++++- .../hadoop/hbase/regionserver/HStoreFile.java | 12 +++ .../hadoop/hbase/regionserver/TestHStore.java | 98 +++++++++++++++++++ 3 files changed, 134 insertions(+), 3 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java index 920a490daa2a..7832e47568cf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java @@ -179,9 +179,30 @@ public void addCompactionResults(Collection<HStoreFile> newCompactedfiles, // Let a background thread close the actual reader on these compacted files and also // ensure to evict the blocks from block cache so that they are no longer in // cache - newCompactedfiles.forEach(HStoreFile::markCompactedAway); - compactedfiles = ImmutableList.sortedCopyOf(storeFileComparator, - Iterables.concat(compactedfiles, newCompactedfiles)); + List<HStoreFile> filesToClose = new ArrayList<>(newCompactedfiles); + try { + HStoreFile.increaseStoreFilesRefeCount(newCompactedfiles); + newCompactedfiles.forEach(hStoreFile -> { + StoreFileReader reader = hStoreFile.getReader(); + try { + if (reader == null) { + hStoreFile.initReader(); + } else { + filesToClose.remove(hStoreFile); + } + } catch (IOException e) { + LOG.warn("Couldn't initialize reader for " + hStoreFile, e); + throw new RuntimeException(e); + } finally { + hStoreFile.markCompactedAway(); + } + }); + compactedfiles = ImmutableList.sortedCopyOf(storeFileComparator, + Iterables.concat(compactedfiles, newCompactedfiles)); + } finally { 
HStoreFile.decreaseStoreFilesRefeCount(newCompactedfiles); + filesToClose.forEach(HStoreFile::closeStoreFile); + } } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java index a7df71f460e4..111669dcef91 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java @@ -591,6 +591,18 @@ public synchronized void closeStoreFile(boolean evictOnClose) throws IOException } } + public synchronized void closeStoreFile() { + try { + boolean evictOnClose = cacheConf != null ? cacheConf.shouldEvictOnClose() : true; + if (this.initialReader != null) { + this.initialReader.close(evictOnClose); + this.initialReader = null; + } + } catch (IOException e) { + LOG.warn("failed to close reader", e); + } + } + /** * Delete this file */ diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java index 179297bd873f..713f1640d0e5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java @@ -95,6 +95,7 @@ import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionInfoBuilder; import org.apache.hadoop.hbase.client.Scan; @@ -1775,6 +1776,103 @@ public void testReclaimChunkWhenScaning() throws IOException { } } + + @Test + public void testReaderIsClosedOnlyAfterCompactionComplete() throws Exception { + HBaseTestingUtility util = new HBaseTestingUtility(); + 
util.startMiniCluster(1); + + try { + Configuration conf = util.getConfiguration(); + + byte[] FAMILY = Bytes.toBytes("f"); + byte[] Q = Bytes.toBytes("q"); + TableName TABLE = TableName.valueOf("race_test"); + + TableDescriptor td = TableDescriptorBuilder.newBuilder(TABLE) + .setColumnFamily( + ColumnFamilyDescriptorBuilder.newBuilder(FAMILY) + .setMaxVersions(1) + .build()) + .build(); + + util.getAdmin().createTable(td); + + HRegion region = + util.getMiniHBaseCluster().getRegions(TABLE).get(0); + + HStore store = region.getStore(FAMILY); + + for (int i = 0; i < 4; i++) { + Put p = new Put(Bytes.toBytes("row-" + i)); + p.addColumn(FAMILY, Q, Bytes.toBytes(i)); + region.put(p); + region.flush(true); // force store file + } + + // sanity check + assertEquals(4, store.getStorefilesCount()); + + DefaultStoreFileManager sfm = + (DefaultStoreFileManager) store.getStoreEngine().getStoreFileManager(); + + HStoreFile victim = + sfm.getStoreFiles().iterator().next(); + + AtomicReference<Throwable> failure = new AtomicReference<>(); + CountDownLatch start = new CountDownLatch(1); + CountDownLatch done = new CountDownLatch(2); + + Thread remover = new Thread(() -> { + try { + start.await(); + victim.closeStoreFile(true); // async-style close + } catch (Throwable t) { + failure.set(t); + } finally { + done.countDown(); + } + }); + + Thread adder = new Thread(() -> { + try { + start.await(); + sfm.addCompactionResults( + Collections.singletonList(victim), + new ArrayList<>()); + } catch (Throwable t) { + failure.set(t); + } finally { + done.countDown(); + } + }); + + remover.start(); + adder.start(); + start.countDown(); + + assertTrue(done.await(60, TimeUnit.SECONDS)); + + if (failure.get() != null) { + throw new AssertionError( + "Race caused failure (this is the bug you fixed)", + failure.get()); + } + + Collection<HStoreFile> finalFiles = sfm.getStoreFiles(); + + assertFalse("Old file must be gone", + finalFiles.contains(victim)); + + assertEquals("File count stable after compaction", + 3, 
finalFiles.size()); + + } finally { + util.shutdownMiniCluster(); + } + } + + /** * If there are two running InMemoryFlushRunnable, the later InMemoryFlushRunnable may change the * versionedList. And the first InMemoryFlushRunnable will use the chagned versionedList to remove