From e33e22c48e5bccafe481d778d8b323c974370a58 Mon Sep 17 00:00:00 2001 From: jiucheng Date: Mon, 12 Jan 2026 19:50:05 +0800 Subject: [PATCH 1/2] fix spark-2 --- .../spark/shuffle/celeborn/HashBasedShuffleWriter.java | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java index 7121ccdf954..fe2578fb270 100644 --- a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java +++ b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java @@ -54,7 +54,7 @@ import org.apache.celeborn.common.CelebornConf; @Private -public class HashBasedShuffleWriter extends ShuffleWriter { +public class HashBasedShuffleWriter extends ShuffeWriter { private static final Logger logger = LoggerFactory.getLogger(HashBasedShuffleWriter.class); @@ -332,11 +332,6 @@ private void cleanupPusher() throws IOException { } private void close() throws IOException, InterruptedException { - // here we wait for all the in-flight batches to return which sent by dataPusher thread - dataPusher.waitOnTermination(); - sendBufferPool.returnPushTaskQueue(dataPusher.getAndResetIdleQueue()); - shuffleClient.prepareForMergeData(shuffleId, mapId, encodedAttemptId); - // merge and push residual data to reduce network traffic // NB: since dataPusher thread have no in-flight data at this point, // we now push merged data by task thread will not introduce any contention @@ -369,6 +364,8 @@ private void close() throws IOException, InterruptedException { sendOffsets = null; long waitStartTime = System.nanoTime(); + dataPusher.waitOnTermination(); + sendBufferPool.returnPushTaskQueue(dataPusher.getAndResetIdleQueue()); shuffleClient.mapperEnd(shuffleId, mapId, encodedAttemptId, numMappers, numPartitions); writeMetrics.incWriteTime(System.nanoTime() - waitStartTime); From 4e909f963eab1181faa1a1c3ccdb4889aa6afc0b Mon Sep 17 00:00:00 2001 From: jiucheng Date: Mon, 12 Jan 2026 19:56:37 +0800 Subject: [PATCH 2/2] fix --- .../apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java index fe2578fb270..de7d00b4e47 100644 --- a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java +++ b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java @@ -54,7 +54,7 @@ import org.apache.celeborn.common.CelebornConf; @Private -public class HashBasedShuffleWriter extends ShuffeWriter { +public class HashBasedShuffleWriter extends ShuffleWriter { private static final Logger logger = LoggerFactory.getLogger(HashBasedShuffleWriter.class);