From fcb7da65474b276d96b2540a6c43b9a7a1e6a318 Mon Sep 17 00:00:00 2001
From: osipovartem
Date: Mon, 27 Oct 2025 16:28:54 +0300
Subject: [PATCH 1/7] Basic memory + disk spilling

---
 .../src/spill/in_memory_spill_buffer.rs       | 47 ++++++++++++++
 datafusion/physical-plan/src/spill/mod.rs     | 65 ++++++++++++++++++-
 .../physical-plan/src/spill/spill_manager.rs  | 58 ++++++++++++++++-
 3 files changed, 167 insertions(+), 3 deletions(-)
 create mode 100644 datafusion/physical-plan/src/spill/in_memory_spill_buffer.rs

diff --git a/datafusion/physical-plan/src/spill/in_memory_spill_buffer.rs b/datafusion/physical-plan/src/spill/in_memory_spill_buffer.rs
new file mode 100644
index 000000000000..81c8e594af09
--- /dev/null
+++ b/datafusion/physical-plan/src/spill/in_memory_spill_buffer.rs
@@ -0,0 +1,47 @@
+use crate::memory::MemoryStream;
+use crate::spill::spill_manager::GetSlicedSize;
+use arrow::{array::RecordBatch, datatypes::SchemaRef};
+use datafusion_common::Result;
+use datafusion_execution::SendableRecordBatchStream;
+use std::sync::Arc;
+
+#[derive(Debug)]
+pub struct InMemorySpillBuffer {
+    batches: Vec<RecordBatch>,
+    total_bytes: usize,
+}
+
+impl InMemorySpillBuffer {
+    pub fn from_batch(batch: &RecordBatch) -> Result<Self> {
+        Ok(Self {
+            batches: vec![batch.clone()],
+            total_bytes: batch.get_sliced_size()?,
+        })
+    }
+
+    pub fn from_batches(batches: &[RecordBatch]) -> Result<Self> {
+        let mut total_bytes = 0;
+        let mut owned = Vec::with_capacity(batches.len());
+        for b in batches {
+            total_bytes += b.get_sliced_size()?;
+            owned.push(b.clone());
+        }
+        Ok(Self {
+            batches: owned,
+            total_bytes,
+        })
+    }
+
+    /// return FIFO stream of batches
+    pub fn as_stream(
+        self: Arc<Self>,
+        schema: SchemaRef,
+    ) -> Result<SendableRecordBatchStream> {
+        let stream = MemoryStream::try_new(self.batches.clone(), schema, None)?;
+        Ok(Box::pin(stream))
+    }
+
+    pub fn size(&self) -> usize {
+        self.total_bytes
+    }
+}
diff --git a/datafusion/physical-plan/src/spill/mod.rs b/datafusion/physical-plan/src/spill/mod.rs
index fab62bff840f..270b3654b2ba 100644
--- a/datafusion/physical-plan/src/spill/mod.rs
+++ b/datafusion/physical-plan/src/spill/mod.rs
@@ -19,6 +19,7 @@
 
+pub(crate) mod in_memory_spill_buffer;
 pub(crate) mod in_progress_spill_file;
 pub(crate) mod spill_manager;
 
 use std::fs::File;
 use std::io::BufReader;
@@ -376,17 +377,18 @@ mod tests {
     use crate::common::collect;
     use crate::metrics::ExecutionPlanMetricsSet;
     use crate::metrics::SpillMetrics;
-    use crate::spill::spill_manager::SpillManager;
+    use crate::spill::spill_manager::{SpillLocation, SpillManager};
     use crate::test::build_table_i32;
     use arrow::array::{ArrayRef, Float64Array, Int32Array, ListArray, StringArray};
     use arrow::compute::cast;
     use arrow::datatypes::{DataType, Field, Int32Type, Schema};
     use arrow::record_batch::RecordBatch;
     use datafusion_common::Result;
-    use datafusion_execution::runtime_env::RuntimeEnv;
+    use datafusion_execution::memory_pool::{FairSpillPool, MemoryPool};
+    use datafusion_execution::runtime_env::{RuntimeEnv, RuntimeEnvBuilder};
     use futures::StreamExt as _;
     use std::sync::Arc;
 
     #[tokio::test]
     async fn test_batch_spill_and_read() -> Result<()> {
@@ -426,6 +428,65 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_batch_spill_to_memory_and_disk_and_read() -> Result<()> {
+        let schema: SchemaRef = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Int32, false),
+            Field::new("b", DataType::Int32, false),
+        ]));
+
+        let batch1 = RecordBatch::try_new(
+            Arc::clone(&schema),
+            vec![
+                Arc::new(Int32Array::from_iter_values(0..1000)),
+                Arc::new(Int32Array::from_iter_values(1000..2000)),
+            ],
+        )?;
+
+        let batch2 = RecordBatch::try_new(
+            Arc::clone(&schema),
+            vec![
+                Arc::new(Int32Array::from_iter_values(2000..4000)),
+                Arc::new(Int32Array::from_iter_values(4000..6000)),
+            ],
+        )?;
+
+        let num_rows = batch1.num_rows() + batch2.num_rows();
+        let batches = vec![batch1, batch2];
+
+        // Create a small memory pool to simulate memory pressure
+        let memory_limit_bytes = 20 * 1024; // 20 KB
+        let memory_pool: Arc<dyn MemoryPool> =
+            Arc::new(FairSpillPool::new(memory_limit_bytes));
+
+        // Construct SpillManager
+        let env = RuntimeEnvBuilder::new()
+            .with_memory_pool(memory_pool)
+            .build_arc()?;
+        let metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
+        let spill_manager = SpillManager::new(env, metrics, Arc::clone(&schema));
+
+        let results = spill_manager.spill_batches_auto(&batches, "TestAutoSpill")?;
+        assert_eq!(results.len(), 2);
+
+        let mem_count = results
+            .iter()
+            .filter(|r| matches!(r, SpillLocation::Memory(_)))
+            .count();
+        let disk_count = results
+            .iter()
+            .filter(|r| matches!(r, SpillLocation::Disk(_)))
+            .count();
+        assert!(mem_count >= 1);
+        assert!(disk_count >= 1);
+
+        let spilled_rows = spill_manager.metrics.spilled_rows.value();
+        assert_eq!(spilled_rows, num_rows);
+
+        for spill in results {
+            let stream = spill_manager.load_spilled_batch(spill)?;
+            let collected = collect(stream).await?;
+            assert!(!collected.is_empty());
+            assert_eq!(collected[0].schema(), schema);
+        }
+
+        Ok(())
+    }
+
     #[tokio::test]
     async fn test_batch_spill_and_read_dictionary_arrays() -> Result<()> {
         // See https://github.com/apache/datafusion/issues/4658
diff --git a/datafusion/physical-plan/src/spill/spill_manager.rs b/datafusion/physical-plan/src/spill/spill_manager.rs
index ad23bd66a021..1ac6d072f45d 100644
--- a/datafusion/physical-plan/src/spill/spill_manager.rs
+++ b/datafusion/physical-plan/src/spill/spill_manager.rs
@@ -21,15 +21,17 @@ use arrow::array::StringViewArray;
 use arrow::datatypes::SchemaRef;
 use arrow::record_batch::RecordBatch;
 use datafusion_execution::runtime_env::RuntimeEnv;
+use std::slice;
 use std::sync::Arc;
 
-use datafusion_common::{config::SpillCompression, Result};
+use datafusion_common::{config::SpillCompression, DataFusionError, Result};
 use datafusion_execution::disk_manager::RefCountedTempFile;
 use datafusion_execution::SendableRecordBatchStream;
 
 use super::{in_progress_spill_file::InProgressSpillFile, SpillReaderStream};
 use crate::coop::cooperative;
+use crate::spill::in_memory_spill_buffer::InMemorySpillBuffer;
 use crate::{common::spawn_buffered, metrics::SpillMetrics};
 
 /// The `SpillManager` is responsible for the following tasks:
 /// - Reading and writing `RecordBatch`es to raw files based on the provided configurations.
@@ -168,6 +170,47 @@ impl SpillManager {
         Ok(file.map(|f| (f, max_record_batch_size)))
     }
 
+    pub(crate) fn spill_batch_auto(
+        &self,
+        batch: &RecordBatch,
+        request_msg: &str,
+    ) -> Result<SpillLocation> {
+        let size = batch.get_sliced_size()?;
+
+        // check pool limit
+        let used = self.env.memory_pool.reserved();
+        let limit = match self.env.memory_pool.memory_limit() {
+            datafusion_execution::memory_pool::MemoryLimit::Finite(l) => l,
+            _ => usize::MAX,
+        };
+
+        if used + size * 3 / 2 <= limit {
+            let buf = Arc::new(InMemorySpillBuffer::from_batch(batch)?);
+            self.metrics.spilled_bytes.add(size);
+            self.metrics.spilled_rows.add(batch.num_rows());
+            Ok(SpillLocation::Memory(buf))
+        } else {
+            let Some(file) =
+                self.spill_record_batch_and_finish(slice::from_ref(batch), request_msg)?
+            else {
+                return Err(DataFusionError::Execution(
+                    "failed to spill batch to disk".into(),
+                ));
+            };
+            Ok(SpillLocation::Disk(file))
+        }
+    }
+
+    pub fn spill_batches_auto(
+        &self,
+        batches: &[RecordBatch],
+        request_msg: &str,
+    ) -> Result<Vec<SpillLocation>> {
+        let mut result = Vec::with_capacity(batches.len());
+        for batch in batches {
+            result.push(self.spill_batch_auto(batch, request_msg)?);
+        }
+        Ok(result)
+    }
+
     /// Reads a spill file as a stream. The file must be created by the current `SpillManager`.
     /// This method will generate output in FIFO order: the batch appended first
     /// will be read first.
@@ -182,8 +225,25 @@ impl SpillManager {
         Ok(spawn_buffered(stream, self.batch_read_buffer_capacity))
     }
 
+    pub fn load_spilled_batch(
+        &self,
+        spill: SpillLocation,
+    ) -> Result<SendableRecordBatchStream> {
+        match spill {
+            SpillLocation::Memory(buf) => buf.as_stream(Arc::clone(&self.schema)),
+            SpillLocation::Disk(file) => self.read_spill_as_stream(file),
+        }
+    }
+}
+
+/// Where a spilled batch currently lives: buffered in memory or written to a temp file on disk.
+#[derive(Debug)]
+pub enum SpillLocation {
+    Memory(Arc<InMemorySpillBuffer>),
+    Disk(RefCountedTempFile),
 }
 
 pub(crate) trait GetSlicedSize {
     /// Returns the size of the `RecordBatch` when sliced.
     /// Note: if multiple arrays or even a single array share the same data buffers, we may double count each buffer.

From e9441e209334b2eaea468d6a6c8d51a77dae3d47 Mon Sep 17 00:00:00 2001
From: osipovartem
Date: Mon, 27 Oct 2025 16:29:27 +0300
Subject: [PATCH 2/7] Basic memory + disk spilling

---
 datafusion/physical-plan/src/spill/in_memory_spill_buffer.rs | 1 -
 1 file changed, 1 deletion(-)

diff --git a/datafusion/physical-plan/src/spill/in_memory_spill_buffer.rs b/datafusion/physical-plan/src/spill/in_memory_spill_buffer.rs
index 81c8e594af09..bba0f6f95625 100644
--- a/datafusion/physical-plan/src/spill/in_memory_spill_buffer.rs
+++ b/datafusion/physical-plan/src/spill/in_memory_spill_buffer.rs
@@ -32,7 +32,6 @@ impl InMemorySpillBuffer {
         })
     }
 
-    /// return FIFO stream of batches
     pub fn as_stream(
         self: Arc<Self>,
         schema: SchemaRef,

From c1d66ff83ebe9adf46b964b800ec898faddfafa4 Mon Sep 17 00:00:00 2001
From: osipovartem
Date: Mon, 27 Oct 2025 16:31:54 +0300
Subject: [PATCH 3/7] Basic memory + disk spilling

---
 datafusion/physical-plan/src/spill/spill_manager.rs | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/datafusion/physical-plan/src/spill/spill_manager.rs b/datafusion/physical-plan/src/spill/spill_manager.rs
index 1ac6d072f45d..e8f5037f764b 100644
--- a/datafusion/physical-plan/src/spill/spill_manager.rs
+++ b/datafusion/physical-plan/src/spill/spill_manager.rs
@@ -170,25 +170,31 @@ impl SpillManager {
         Ok(file.map(|f| (f, max_record_batch_size)))
     }
 
+    /// Automatically decides whether to spill the given RecordBatch to memory or disk,
+    /// depending on available memory pool capacity. A batch is only buffered in
+    /// memory while the pool still has roughly 1.5x headroom for it
+    /// (`used + size * 3 / 2 <= limit`).
     pub(crate) fn spill_batch_auto(
         &self,
         batch: &RecordBatch,
         request_msg: &str,
     ) -> Result<SpillLocation> {
         let size = batch.get_sliced_size()?;
 
-        // check pool limit
+        // Check current memory usage and total limit from the runtime memory pool
         let used = self.env.memory_pool.reserved();
         let limit = match self.env.memory_pool.memory_limit() {
             datafusion_execution::memory_pool::MemoryLimit::Finite(l) => l,
             _ => usize::MAX,
         };
 
+        // If there's enough memory (with a small safety margin), keep it in memory
         if used + size * 3 / 2 <= limit {
             let buf = Arc::new(InMemorySpillBuffer::from_batch(batch)?);
             self.metrics.spilled_bytes.add(size);
             self.metrics.spilled_rows.add(batch.num_rows());
             Ok(SpillLocation::Memory(buf))
         } else {
+            // Otherwise spill to disk using the existing SpillManager logic
             let Some(file) =
                 self.spill_record_batch_and_finish(slice::from_ref(batch), request_msg)?

From 5712df8a018fcee23d3e46c52149594f34ac1016 Mon Sep 17 00:00:00 2001
From: osipovartem
Date: Tue, 4 Nov 2025 01:01:10 +0300
Subject: [PATCH 4/7] DRAFT GraceHashJoin with disk spilling

---
 datafusion/execution/src/disk_manager.rs      |   13 +-
 .../src/joins/grace_hash_join/exec.rs         | 1219 +++++++++++++++++
 .../src/joins/grace_hash_join/mod.rs          |   23 +
 .../src/joins/grace_hash_join/stream.rs       |  406 ++++++
 .../physical-plan/src/joins/hash_join/mod.rs  |    2 +-
 datafusion/physical-plan/src/joins/mod.rs     |    2 +
 datafusion/physical-plan/src/joins/utils.rs   |    4 +-
 datafusion/physical-plan/src/spill/mod.rs     |    2 +-
 .../physical-plan/src/spill/spill_manager.rs  |   75 +-
 9 files changed, 1713 insertions(+), 33 deletions(-)
 create mode 100644 datafusion/physical-plan/src/joins/grace_hash_join/exec.rs
 create mode 100644 datafusion/physical-plan/src/joins/grace_hash_join/mod.rs
 create mode 100644 datafusion/physical-plan/src/joins/grace_hash_join/stream.rs

diff --git a/datafusion/execution/src/disk_manager.rs b/datafusion/execution/src/disk_manager.rs
index 82f2d75ac1b5..67251088af12 100644
--- a/datafusion/execution/src/disk_manager.rs
+++ b/datafusion/execution/src/disk_manager.rs
@@ -26,7 +26,7 @@ use rand::{rng, Rng};
 use std::path::{Path, PathBuf};
 use std::sync::atomic::{AtomicU64, Ordering};
 use std::sync::Arc;
-use tempfile::{Builder, NamedTempFile, TempDir};
+use tempfile::{Builder, NamedTempFile, TempDir, TempPath};
 
 use crate::memory_pool::human_readable_size;
 
@@ -370,6 +370,19 @@ impl RefCountedTempFile {
     pub fn current_disk_usage(&self) -> u64 {
         self.current_file_disk_usage
     }
+
+    /// Reopens the underlying file and returns a second `RefCountedTempFile`
+    /// handle pointing at the same temporary path.
+    pub fn clone_refcounted(&self) -> Result<Self> {
+        let reopened = std::fs::File::open(self.path())?;
+        let temp_path = TempPath::from_path(self.path());
+        Ok(Self {
+            _parent_temp_dir: Arc::clone(&self._parent_temp_dir),
+            tempfile: NamedTempFile::from_parts(reopened, temp_path),
+            current_file_disk_usage: self.current_file_disk_usage,
+            disk_manager: Arc::clone(&self.disk_manager),
+        })
+    }
 }
 
 /// When the temporary file is dropped, subtract its disk usage from the disk manager's total
diff --git a/datafusion/physical-plan/src/joins/grace_hash_join/exec.rs b/datafusion/physical-plan/src/joins/grace_hash_join/exec.rs
new file mode 100644
index 000000000000..e25407c084bb
--- /dev/null
+++ b/datafusion/physical-plan/src/joins/grace_hash_join/exec.rs
@@ -0,0 +1,1219 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::fmt; +use std::mem::size_of; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, OnceLock}; +use std::{any::Any, vec}; +use std::fmt::{format, Formatter}; +use crate::execution_plan::{boundedness_from_children, EmissionType}; +use crate::filter_pushdown::{ + ChildPushdownResult, FilterDescription, FilterPushdownPhase, + FilterPushdownPropagation, +}; +use crate::joins::utils::{ + asymmetric_join_output_partitioning, reorder_output_after_swap, swap_join_projection, + update_hash, OnceAsync, OnceFut, +}; +use crate::joins::{JoinOn, JoinOnRef, PartitionMode, SharedBitmapBuilder}; +use crate::projection::{ + try_embed_projection, try_pushdown_through_join, EmbeddedProjection, JoinData, + ProjectionExec, +}; +use crate::spill::get_record_batch_memory_size; +use crate::{displayable, ExecutionPlanProperties, SpillManager}; +use crate::{ + common::can_project, + joins::utils::{ + build_join_schema, check_join_is_valid, estimate_join_statistics, + need_produce_result_in_final, symmetric_join_output_partitioning, + BuildProbeJoinMetrics, ColumnIndex, JoinFilter, JoinHashMapType, + }, + metrics::{ExecutionPlanMetricsSet, MetricsSet}, + DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning, + PlanProperties, SendableRecordBatchStream, Statistics, +}; + +use arrow::array::{ArrayRef, BooleanBufferBuilder, UInt32Array}; +use arrow::compute::{concat, concat_batches, take}; +use arrow::datatypes::SchemaRef; +use arrow::record_batch::RecordBatch; +use arrow::util::bit_util; +use arrow_schema::DataType; +use datafusion_common::config::ConfigOptions; +use datafusion_common::utils::memory::estimate_memory_size; +use datafusion_common::{internal_datafusion_err, internal_err, plan_err, project_schema, DataFusionError, JoinSide, JoinType, NullEquality, Result}; +use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; +use datafusion_execution::TaskContext; +use datafusion_expr::{Accumulator, UserDefinedLogicalNode}; +use datafusion_functions_aggregate_common::min_max::{MaxAccumulator, MinAccumulator}; +use datafusion_physical_expr::equivalence::{ + join_equivalence_properties, ProjectionMapping, +}; +use datafusion_physical_expr::expressions::{lit, DynamicFilterPhysicalExpr}; +use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef}; + +use ahash::RandomState; +use arrow_ord::partition::partition; +use datafusion_physical_expr_common::physical_expr::fmt_sql; +use futures::{StreamExt, TryStreamExt}; +use futures::executor::block_on; +use parking_lot::Mutex; +use datafusion_common::hash_utils::create_hashes; +use datafusion_execution::runtime_env::RuntimeEnv; +use crate::empty::EmptyExec; +use crate::joins::grace_hash_join::stream::{GraceAccumulator, GraceHashJoinStream, SpillFut}; +use crate::joins::hash_join::shared_bounds::SharedBoundsAccumulator; +use crate::metrics::SpillMetrics; +use crate::spill::spill_manager::{GetSlicedSize, SpillLocation}; + +/// 
Hard-coded seed to ensure hash values from the hash join differ from `RepartitionExec`, avoiding collisions.
+const HASH_JOIN_SEED: RandomState =
+    RandomState::with_seeds('J' as u64, 'O' as u64, 'I' as u64, 'N' as u64);
+
+pub struct GraceHashJoinExec {
+    /// left (build) side which gets hashed
+    pub left: Arc<dyn ExecutionPlan>,
+    /// right (probe) side which are filtered by the hash table
+    pub right: Arc<dyn ExecutionPlan>,
+    /// Set of equijoin columns from the relations: `(left_col, right_col)`
+    pub on: Vec<(PhysicalExprRef, PhysicalExprRef)>,
+    /// Filters which are applied while finding matching rows
+    pub filter: Option<JoinFilter>,
+    /// How the join is performed (`OUTER`, `INNER`, etc)
+    pub join_type: JoinType,
+    /// The schema after join. Please be careful when using this schema,
+    /// if there is a projection, the schema isn't the same as the output schema.
+    join_schema: SchemaRef,
+    /// Shares the `RandomState` for the hashing algorithm
+    random_state: RandomState,
+    /// Execution metrics
+    metrics: ExecutionPlanMetricsSet,
+    /// The projection indices of the columns in the output schema of join
+    pub projection: Option<Vec<usize>>,
+    /// Information of index and left / right placement of columns
+    column_indices: Vec<ColumnIndex>,
+    /// The equality null-handling behavior of the join algorithm.
+    pub null_equality: NullEquality,
+    /// Cache holding plan properties like equivalences, output partitioning etc.
+    cache: PlanProperties,
+    /// Dynamic filter for pushing down to the probe side
+    /// Set when dynamic filter pushdown is detected in handle_child_pushdown_result.
+    /// GraceHashJoinExec also needs to keep a shared bounds accumulator for coordinating updates.
+    dynamic_filter: Option<HashJoinExecDynamicFilter>,
+    /// Coordinates the partitioning phase across all partitions of this join
+    accumulator: Arc<GraceAccumulator>,
+    /// Spill managers for the build and probe sides
+    spill_left: Arc<SpillManager>,
+    spill_right: Arc<SpillManager>,
+}
+
+#[derive(Clone)]
+struct HashJoinExecDynamicFilter {
+    /// Dynamic filter that we'll update with the results of the build side once that is done.
+    filter: Arc<DynamicFilterPhysicalExpr>,
+    /// Bounds accumulator to keep track of the min/max bounds on the join keys for each partition.
+    /// It is lazily initialized during execution to make sure we use the actual execution time partition counts.
+    bounds_accumulator: OnceLock<Arc<SharedBoundsAccumulator>>,
+}
+
+impl fmt::Debug for GraceHashJoinExec {
+    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+        f.debug_struct("GraceHashJoinExec")
+            .field("left", &self.left)
+            .field("right", &self.right)
+            .field("on", &self.on)
+            .field("filter", &self.filter)
+            .field("join_type", &self.join_type)
+            .field("join_schema", &self.join_schema)
+            .field("random_state", &self.random_state)
+            .field("metrics", &self.metrics)
+            .field("projection", &self.projection)
+            .field("column_indices", &self.column_indices)
+            .field("null_equality", &self.null_equality)
+            .field("cache", &self.cache)
+            // Explicitly exclude dynamic_filter, accumulator and spill managers
+            // to avoid runtime state differences in tests
+            .finish()
+    }
+}
+
+impl EmbeddedProjection for GraceHashJoinExec {
+    fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self> {
+        self.with_projection(projection)
+    }
+}
+
+impl GraceHashJoinExec {
+    /// Tries to create a new [GraceHashJoinExec].
+    ///
+    /// # Error
+    /// This function errors when it is not possible to join the left and right sides on keys `on`.
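+    ///
+    /// A minimal construction sketch (the names `left`, `right`, and `on` are
+    /// placeholders for already hash-partitioned inputs and their join keys):
+    ///
+    /// ```ignore
+    /// let join = GraceHashJoinExec::try_new(
+    ///     left,
+    ///     right,
+    ///     on,
+    ///     None,                            // no residual join filter
+    ///     &JoinType::Inner,
+    ///     None,                            // no output projection
+    ///     NullEquality::NullEqualsNothing,
+    /// )?;
+    /// ```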
+ #[allow(clippy::too_many_arguments)] + pub fn try_new( + left: Arc, + right: Arc, + on: JoinOn, + filter: Option, + join_type: &JoinType, + projection: Option>, + null_equality: NullEquality, + ) -> Result { + let left_schema = left.schema(); + let right_schema = right.schema(); + if on.is_empty() { + return plan_err!("On constraints in HashJoinExec should be non-empty"); + } + check_join_is_valid(&left_schema, &right_schema, &on)?; + + let (join_schema, column_indices) = + build_join_schema(&left_schema, &right_schema, join_type); + + let random_state = HASH_JOIN_SEED; + + let join_schema = Arc::new(join_schema); + + // check if the projection is valid + can_project(&join_schema, projection.as_ref())?; + + let cache = Self::compute_properties( + &left, + &right, + Arc::clone(&join_schema), + *join_type, + &on, + projection.as_ref(), + )?; + let partitions = left.output_partitioning().partition_count(); + let accumulator = GraceAccumulator::new(partitions); + + + let metrics = ExecutionPlanMetricsSet::new(); + let runtime = Arc::new(RuntimeEnv::default()); + let spill_left = Arc::new(SpillManager::new( + Arc::clone(&runtime), + SpillMetrics::new(&metrics, 0), + Arc::clone(&left_schema), + )); + let spill_right = Arc::new(SpillManager::new( + Arc::clone(&runtime), + SpillMetrics::new(&metrics, 0), + Arc::clone(&right_schema), + )); + + // Initialize both dynamic filter and bounds accumulator to None + // They will be set later if dynamic filtering is enabled + Ok(GraceHashJoinExec { + left, + right, + on, + filter, + join_type: *join_type, + join_schema, + random_state, + metrics, + projection, + column_indices, + null_equality, + cache, + dynamic_filter: None, + accumulator, + spill_left, + spill_right, + }) + } + + fn create_dynamic_filter(on: &JoinOn) -> Arc { + // Extract the right-side keys (probe side keys) from the `on` clauses + // Dynamic filter will be created from build side values (left side) and applied to probe side (right side) + let right_keys: Vec<_> = on.iter().map(|(_, r)| Arc::clone(r)).collect(); + // Initialize with a placeholder expression (true) that will be updated when the hash table is built + Arc::new(DynamicFilterPhysicalExpr::new(right_keys, lit(true))) + } + + /// left (build) side which gets hashed + pub fn left(&self) -> &Arc { + &self.left + } + + /// right (probe) side which are filtered by the hash table + pub fn right(&self) -> &Arc { + &self.right + } + + /// Set of common columns used to join on + pub fn on(&self) -> &[(PhysicalExprRef, PhysicalExprRef)] { + &self.on + } + + /// Filters applied before join output + pub fn filter(&self) -> Option<&JoinFilter> { + self.filter.as_ref() + } + + /// How the join is performed + pub fn join_type(&self) -> &JoinType { + &self.join_type + } + + /// The schema after join. Please be careful when using this schema, + /// if there is a projection, the schema isn't the same as the output schema. + pub fn join_schema(&self) -> &SchemaRef { + &self.join_schema + } + + /// Get null_equality + pub fn null_equality(&self) -> NullEquality { + self.null_equality + } + + /// Calculate order preservation flags for this hash join. + fn maintains_input_order(join_type: JoinType) -> Vec { + vec![ + false, + matches!( + join_type, + JoinType::Inner + | JoinType::Right + | JoinType::RightAnti + | JoinType::RightSemi + | JoinType::RightMark + ), + ] + } + + /// Get probe side information for the hash join. + pub fn probe_side() -> JoinSide { + // In current implementation right side is always probe side. 
+ JoinSide::Right + } + + /// Return whether the join contains a projection + pub fn contains_projection(&self) -> bool { + self.projection.is_some() + } + + /// Return new instance of [HashJoinExec] with the given projection. + pub fn with_projection(&self, projection: Option>) -> Result { + // check if the projection is valid + can_project(&self.schema(), projection.as_ref())?; + let projection = match projection { + Some(projection) => match &self.projection { + Some(p) => Some(projection.iter().map(|i| p[*i]).collect()), + None => Some(projection), + }, + None => None, + }; + Self::try_new( + Arc::clone(&self.left), + Arc::clone(&self.right), + self.on.clone(), + self.filter.clone(), + &self.join_type, + projection, + self.null_equality, + ) + } + + /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. + fn compute_properties( + left: &Arc, + right: &Arc, + schema: SchemaRef, + join_type: JoinType, + on: JoinOnRef, + projection: Option<&Vec>, + ) -> Result { + // Calculate equivalence properties: + let mut eq_properties = join_equivalence_properties( + left.equivalence_properties().clone(), + right.equivalence_properties().clone(), + &join_type, + Arc::clone(&schema), + &Self::maintains_input_order(join_type), + Some(Self::probe_side()), + on, + )?; + + let mut output_partitioning = symmetric_join_output_partitioning(left, right, &join_type)?; + let emission_type = if left.boundedness().is_unbounded() { + EmissionType::Final + } else if right.pipeline_behavior() == EmissionType::Incremental { + match join_type { + // If we only need to generate matched rows from the probe side, + // we can emit rows incrementally. + JoinType::Inner + | JoinType::LeftSemi + | JoinType::RightSemi + | JoinType::Right + | JoinType::RightAnti + | JoinType::RightMark => EmissionType::Incremental, + // If we need to generate unmatched rows from the *build side*, + // we need to emit them at the end. + JoinType::Left + | JoinType::LeftAnti + | JoinType::LeftMark + | JoinType::Full => EmissionType::Both, + } + } else { + right.pipeline_behavior() + }; + + // If contains projection, update the PlanProperties. + if let Some(projection) = projection { + // construct a map from the input expressions to the output expression of the Projection + let projection_mapping = + ProjectionMapping::from_indices(projection, &schema)?; + let out_schema = project_schema(&schema, Some(projection))?; + output_partitioning = + output_partitioning.project(&projection_mapping, &eq_properties); + eq_properties = eq_properties.project(&projection_mapping, out_schema); + } + + Ok(PlanProperties::new( + eq_properties, + output_partitioning, + emission_type, + boundedness_from_children([left, right]), + )) + } + + /// Returns a new `ExecutionPlan` that computes the same join as this one, + /// with the left and right inputs swapped using the specified + /// `partition_mode`. + /// + /// # Notes: + /// + /// This function is public so other downstream projects can use it to + /// construct `HashJoinExec` with right side as the build side. + /// + /// For using this interface directly, please refer to below: + /// + /// Hash join execution may require specific input partitioning (for example, + /// the left child may have a single partition while the right child has multiple). 
+ /// + /// Calling this function on join nodes whose children have already been repartitioned + /// (e.g., after a `RepartitionExec` has been inserted) may break the partitioning + /// requirements of the hash join. Therefore, ensure you call this function + /// before inserting any repartitioning operators on the join's children. + /// + /// In DataFusion's default SQL interface, this function is used by the `JoinSelection` + /// physical optimizer rule to determine a good join order, which is + /// executed before the `EnforceDistribution` rule (the rule that may + /// insert `RepartitionExec` operators). + pub fn swap_inputs( + &self, + partition_mode: PartitionMode, + ) -> Result> { + let left = self.left(); + let right = self.right(); + let new_join = GraceHashJoinExec::try_new( + Arc::clone(right), + Arc::clone(left), + self.on() + .iter() + .map(|(l, r)| (Arc::clone(r), Arc::clone(l))) + .collect(), + self.filter().map(JoinFilter::swap), + &self.join_type().swap(), + swap_join_projection( + left.schema().fields().len(), + right.schema().fields().len(), + self.projection.as_ref(), + self.join_type(), + ), + self.null_equality(), + )?; + // In case of anti / semi joins or if there is embedded projection in HashJoinExec, output column order is preserved, no need to add projection again + if matches!( + self.join_type(), + JoinType::LeftSemi + | JoinType::RightSemi + | JoinType::LeftAnti + | JoinType::RightAnti + ) || self.projection.is_some() + { + Ok(Arc::new(new_join)) + } else { + reorder_output_after_swap(Arc::new(new_join), &left.schema(), &right.schema()) + } + } +} + +impl DisplayAs for GraceHashJoinExec { + fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + let display_filter = self.filter.as_ref().map_or_else( + || "".to_string(), + |f| format!(", filter={}", f.expression()), + ); + let display_projections = if self.contains_projection() { + format!( + ", projection=[{}]", + self.projection + .as_ref() + .unwrap() + .iter() + .map(|index| format!( + "{}@{}", + self.join_schema.fields().get(*index).unwrap().name(), + index + )) + .collect::>() + .join(", ") + ) + } else { + "".to_string() + }; + let on = self + .on + .iter() + .map(|(c1, c2)| format!("({c1}, {c2})")) + .collect::>() + .join(", "); + write!( + f, + "GraceHashJoinExec: join_type={:?}, on=[{}]{}{}", + self.join_type, on, display_filter, display_projections, + ) + } + DisplayFormatType::TreeRender => { + let on = self + .on + .iter() + .map(|(c1, c2)| { + format!("({} = {})", fmt_sql(c1.as_ref()), fmt_sql(c2.as_ref())) + }) + .collect::>() + .join(", "); + + if *self.join_type() != JoinType::Inner { + writeln!(f, "join_type={:?}", self.join_type)?; + } + + writeln!(f, "on={on}")?; + + if let Some(filter) = self.filter.as_ref() { + writeln!(f, "filter={filter}")?; + } + + Ok(()) + } + } + } +} + +impl ExecutionPlan for GraceHashJoinExec { + fn name(&self) -> &'static str { + "GraceHashJoinExec" + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn properties(&self) -> &PlanProperties { + &self.cache + } + + fn required_input_distribution(&self) -> Vec { + let (left_expr, right_expr) = self + .on + .iter() + .map(|(l, r)| (Arc::clone(l), Arc::clone(r))) + .unzip(); + vec![ + Distribution::HashPartitioned(left_expr), + Distribution::HashPartitioned(right_expr), + ] + } + + // For [JoinType::Inner] and [JoinType::RightSemi] in hash joins, the probe phase initiates by + // applying the hash function to convert the 
join key(s) in each row into a hash value from the + // probe side table in the order they're arranged. The hash value is used to look up corresponding + // entries in the hash table that was constructed from the build side table during the build phase. + // + // Because of the immediate generation of result rows once a match is found, + // the output of the join tends to follow the order in which the rows were read from + // the probe side table. This is simply due to the sequence in which the rows were processed. + // Hence, it appears that the hash join is preserving the order of the probe side. + // + // Meanwhile, in the case of a [JoinType::RightAnti] hash join, + // the unmatched rows from the probe side are also kept in order. + // This is because the **`RightAnti`** join is designed to return rows from the right + // (probe side) table that have no match in the left (build side) table. Because the rows + // are processed sequentially in the probe phase, and unmatched rows are directly output + // as results, these results tend to retain the order of the probe side table. + fn maintains_input_order(&self) -> Vec { + Self::maintains_input_order(self.join_type) + } + + fn children(&self) -> Vec<&Arc> { + vec![&self.left, &self.right] + } + + /// Creates a new HashJoinExec with different children while preserving configuration. + /// + /// This method is called during query optimization when the optimizer creates new + /// plan nodes. Importantly, it creates a fresh bounds_accumulator via `try_new` + /// rather than cloning the existing one because partitioning may have changed. + fn with_new_children( + self: Arc, + children: Vec>, + ) -> Result> { + let partitions = children[0].output_partitioning().partition_count(); + Ok(Arc::new(GraceHashJoinExec { + left: Arc::clone(&children[0]), + right: Arc::clone(&children[1]), + on: self.on.clone(), + filter: self.filter.clone(), + join_type: self.join_type, + join_schema: Arc::clone(&self.join_schema), + random_state: self.random_state.clone(), + metrics: ExecutionPlanMetricsSet::new(), + projection: self.projection.clone(), + column_indices: self.column_indices.clone(), + null_equality: self.null_equality, + cache: Self::compute_properties( + &children[0], + &children[1], + Arc::clone(&self.join_schema), + self.join_type, + &self.on, + self.projection.as_ref(), + )?, + // Keep the dynamic filter, bounds accumulator will be reset + dynamic_filter: self.dynamic_filter.clone(), + accumulator: Arc::clone(&self.accumulator), + spill_left: Arc::clone(&self.spill_left), + spill_right: Arc::clone(&self.spill_right), + })) + } + + fn reset_state(self: Arc) -> Result> { + Ok(Arc::new(GraceHashJoinExec { + left: Arc::clone(&self.left), + right: Arc::clone(&self.right), + on: self.on.clone(), + filter: self.filter.clone(), + join_type: self.join_type, + join_schema: Arc::clone(&self.join_schema), + random_state: self.random_state.clone(), + metrics: ExecutionPlanMetricsSet::new(), + projection: self.projection.clone(), + column_indices: self.column_indices.clone(), + null_equality: self.null_equality, + cache: self.cache.clone(), + // Reset dynamic filter and bounds accumulator to initial state + dynamic_filter: None, + accumulator: Arc::clone(&self.accumulator), + spill_left: Arc::clone(&self.spill_left), + spill_right: Arc::clone(&self.spill_right), + })) + } + + fn execute( + &self, + partition: usize, + context: Arc, + ) -> Result { + let left_partitions = self.left.output_partitioning().partition_count(); + let right_partitions = 
self.right.output_partitioning().partition_count(); + + if left_partitions != right_partitions { + return internal_err!( + "Invalid GraceHashJoinExec, partition count mismatch {left_partitions}!={right_partitions},\ + consider using RepartitionExec" + ); + } + + let enable_dynamic_filter_pushdown = self.dynamic_filter.is_some(); + + let join_metrics = Arc::new(BuildProbeJoinMetrics::new(partition, &self.metrics)); + + let left = self.left.execute(partition, Arc::clone(&context))?; + let left_schema = Arc::clone(&self.left.schema()); + let on_left = self + .on + .iter() + .map(|(left_expr, _)| Arc::clone(left_expr)) + .collect::>(); + + let right = self.right.execute(partition, Arc::clone(&context))?; + let right_schema = Arc::clone(&self.right.schema()); + let on_right = self + .on + .iter() + .map(|(_, right_expr)| Arc::clone(right_expr)) + .collect::>(); + + let spill_left = Arc::new(SpillManager::new( + Arc::clone(&context.runtime_env()), + SpillMetrics::new(&self.metrics, partition), + Arc::clone(&left_schema), + )); + let spill_right = Arc::new(SpillManager::new( + Arc::clone(&context.runtime_env()), + SpillMetrics::new(&self.metrics, partition), + Arc::clone(&right_schema), + )); + + // update column indices to reflect the projection + let column_indices_after_projection = match &self.projection { + Some(projection) => projection + .iter() + .map(|i| self.column_indices[*i].clone()) + .collect(), + None => self.column_indices.clone(), + }; + + let random_state = self.random_state.clone(); + let on = self.on.clone(); + let spill_left_clone = Arc::clone(&spill_left); + let spill_right_clone = Arc::clone(&spill_right); + let accumulator_clone = Arc::clone(&self.accumulator); + let join_metrics_clone = Arc::clone(&join_metrics); + let spill_fut = OnceFut::new(async move { + let (left_idx, right_idx) = partition_and_spill( + random_state, + on, + left, + right, + join_metrics_clone, + enable_dynamic_filter_pushdown, + left_partitions, + spill_left_clone, + spill_right_clone, + partition, + ).await?; + accumulator_clone.report_partition(partition, left_idx.clone(), right_idx.clone()).await; + Ok(SpillFut::new(partition, left_idx, right_idx)) + }); + + Ok(Box::pin(GraceHashJoinStream::new( + self.schema(), + spill_fut, + spill_left, + spill_right, + on_left, + on_right, + self.random_state.clone(), + self.join_type, + column_indices_after_projection, + join_metrics, + context, + Arc::clone(&self.accumulator), + ))) + } + + fn metrics(&self) -> Option { + Some(self.metrics.clone_inner()) + } + + fn statistics(&self) -> Result { + self.partition_statistics(None) + } + + fn partition_statistics(&self, partition: Option) -> Result { + if partition.is_some() { + return Ok(Statistics::new_unknown(&self.schema())); + } + // TODO stats: it is not possible in general to know the output size of joins + // There are some special cases though, for example: + // - `A LEFT JOIN B ON A.col=B.col` with `COUNT_DISTINCT(B.col)=COUNT(B.col)` + let stats = estimate_join_statistics( + self.left.partition_statistics(None)?, + self.right.partition_statistics(None)?, + self.on.clone(), + &self.join_type, + &self.join_schema, + )?; + // Project statistics if there is a projection + Ok(stats.project(self.projection.as_ref())) + } + + /// Tries to push `projection` down through `hash_join`. If possible, performs the + /// pushdown and returns a new [`HashJoinExec`] as the top plan which has projections + /// as its children. Otherwise, returns `None`. 
+ fn try_swapping_with_projection( + &self, + projection: &ProjectionExec, + ) -> Result>> { + // TODO: currently if there is projection in GraceHashJoinExec, we can't push down projection to left or right input. Maybe we can pushdown the mixed projection later. + if self.contains_projection() { + return Ok(None); + } + + if let Some(JoinData { + projected_left_child, + projected_right_child, + join_filter, + join_on, + }) = try_pushdown_through_join( + projection, + self.left(), + self.right(), + self.on(), + self.schema(), + self.filter(), + )? { + Ok(Some(Arc::new(GraceHashJoinExec::try_new( + Arc::new(projected_left_child), + Arc::new(projected_right_child), + join_on, + join_filter, + self.join_type(), + // Returned early if projection is not None + None, + self.null_equality, + )?))) + } else { + try_embed_projection(projection, self) + } + } + + fn gather_filters_for_pushdown( + &self, + phase: FilterPushdownPhase, + parent_filters: Vec>, + config: &ConfigOptions, + ) -> Result { + // Other types of joins can support *some* filters, but restrictions are complex and error prone. + // For now we don't support them. + // See the logical optimizer rules for more details: datafusion/optimizer/src/push_down_filter.rs + // See https://github.com/apache/datafusion/issues/16973 for tracking. + if self.join_type != JoinType::Inner { + return Ok(FilterDescription::all_unsupported( + &parent_filters, + &self.children(), + )); + } + + // Get basic filter descriptions for both children + let left_child = crate::filter_pushdown::ChildFilterDescription::from_child( + &parent_filters, + self.left(), + )?; + let mut right_child = crate::filter_pushdown::ChildFilterDescription::from_child( + &parent_filters, + self.right(), + )?; + + // Add dynamic filters in Post phase if enabled + if matches!(phase, FilterPushdownPhase::Post) + && config.optimizer.enable_dynamic_filter_pushdown + { + // Add actual dynamic filter to right side (probe side) + let dynamic_filter = Self::create_dynamic_filter(&self.on); + right_child = right_child.with_self_filter(dynamic_filter); + } + + Ok(FilterDescription::new() + .with_child(left_child) + .with_child(right_child)) + } + + fn handle_child_pushdown_result( + &self, + _phase: FilterPushdownPhase, + child_pushdown_result: ChildPushdownResult, + _config: &ConfigOptions, + ) -> Result>> { + // Note: this check shouldn't be necessary because we already marked all parent filters as unsupported for + // non-inner joins in `gather_filters_for_pushdown`. + // However it's a cheap check and serves to inform future devs touching this function that they need to be really + // careful pushing down filters through non-inner joins. + if self.join_type != JoinType::Inner { + // Other types of joins can support *some* filters, but restrictions are complex and error prone. + // For now we don't support them. 
+            // See the logical optimizer rules for more details: datafusion/optimizer/src/push_down_filter.rs
+            return Ok(FilterPushdownPropagation::all_unsupported(
+                child_pushdown_result,
+            ));
+        }
+
+        let mut result = FilterPushdownPropagation::if_any(child_pushdown_result.clone());
+        assert_eq!(child_pushdown_result.self_filters.len(), 2); // Should always be 2, we have 2 children
+        let right_child_self_filters = &child_pushdown_result.self_filters[1]; // We only push down filters to the right child
+        // We expect 0 or 1 self filters
+        if let Some(filter) = right_child_self_filters.first() {
+            // Note that we don't check PushdownPredicate::discriminant because even if nothing said
+            // "yes, I can fully evaluate this filter" things might still use it for statistics -> it's worth updating
+            let predicate = Arc::clone(&filter.predicate);
+            if let Ok(dynamic_filter) =
+                Arc::downcast::<DynamicFilterPhysicalExpr>(predicate)
+            {
+                // We successfully pushed down our self filter - we need to make a new node with the dynamic filter
+                let new_node = Arc::new(GraceHashJoinExec {
+                    left: Arc::clone(&self.left),
+                    right: Arc::clone(&self.right),
+                    on: self.on.clone(),
+                    filter: self.filter.clone(),
+                    join_type: self.join_type,
+                    join_schema: Arc::clone(&self.join_schema),
+                    random_state: self.random_state.clone(),
+                    metrics: ExecutionPlanMetricsSet::new(),
+                    projection: self.projection.clone(),
+                    column_indices: self.column_indices.clone(),
+                    null_equality: self.null_equality,
+                    cache: self.cache.clone(),
+                    dynamic_filter: Some(HashJoinExecDynamicFilter {
+                        filter: dynamic_filter,
+                        bounds_accumulator: OnceLock::new(),
+                    }),
+                    spill_left: Arc::clone(&self.spill_left),
+                    spill_right: Arc::clone(&self.spill_right),
+                    accumulator: Arc::clone(&self.accumulator),
+                });
+                result = result.with_updated_node(new_node as Arc<dyn ExecutionPlan>);
+            }
+        }
+        Ok(result)
+    }
+}
+
+/// Accumulator for collecting min/max bounds from build-side data during hash join.
+///
+/// This struct encapsulates the logic for progressively computing column bounds
+/// (minimum and maximum values) for a specific join key expression as batches
+/// are processed during the build phase of a hash join.
+///
+/// The bounds are used for dynamic filter pushdown optimization, where filters
+/// based on the actual data ranges can be pushed down to the probe side to
+/// eliminate unnecessary data early.
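+///
+/// NOTE: carried over from `HashJoinExec` for parity; this draft does not yet
+/// feed build-side batches into it.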
+struct CollectLeftAccumulator {
+    /// The physical expression to evaluate for each batch
+    expr: Arc<dyn PhysicalExpr>,
+    /// Accumulator for tracking the minimum value across all batches
+    min: MinAccumulator,
+    /// Accumulator for tracking the maximum value across all batches
+    max: MaxAccumulator,
+}
+
+pub async fn partition_and_spill(
+    random_state: RandomState,
+    on: Vec<(PhysicalExprRef, PhysicalExprRef)>,
+    mut left_stream: SendableRecordBatchStream,
+    mut right_stream: SendableRecordBatchStream,
+    join_metrics: Arc<BuildProbeJoinMetrics>,
+    enable_dynamic_filter_pushdown: bool,
+    partition_count: usize,
+    spill_left: Arc<SpillManager>,
+    spill_right: Arc<SpillManager>,
+    partition: usize,
+) -> Result<(Vec<PartitionIndex>, Vec<PartitionIndex>)> {
+    let on_left: Vec<_> = on.iter().map(|(l, _)| Arc::clone(l)).collect();
+    let on_right: Vec<_> = on.iter().map(|(_, r)| Arc::clone(r)).collect();
+
+    // === LEFT side partitioning ===
+    let left_index = partition_and_spill_one_side(
+        &mut left_stream,
+        &on_left,
+        &random_state,
+        partition_count,
+        spill_left,
+        &join_metrics,
+        enable_dynamic_filter_pushdown,
+        &format!("left_{partition}"),
+    )
+    .await?;
+
+    // === RIGHT side partitioning ===
+    let right_index = partition_and_spill_one_side(
+        &mut right_stream,
+        &on_right,
+        &random_state,
+        partition_count,
+        spill_right,
+        &join_metrics,
+        enable_dynamic_filter_pushdown,
+        &format!("right_{partition}"),
+    )
+    .await?;
+    Ok((left_index, right_index))
+}
+
+#[allow(clippy::too_many_arguments)]
+async fn partition_and_spill_one_side(
+    input: &mut SendableRecordBatchStream,
+    on_exprs: &[PhysicalExprRef],
+    random_state: &RandomState,
+    partition_count: usize,
+    spill_manager: Arc<SpillManager>,
+    join_metrics: &BuildProbeJoinMetrics,
+    enable_dynamic_filter_pushdown: bool,
+    file_request_msg: &str,
+) -> Result<Vec<PartitionIndex>> {
+    let mut partitions: Vec<PartitionWriter> = (0..partition_count)
+        .map(|_| PartitionWriter::new(Arc::clone(&spill_manager)))
+        .collect();
+
+    let mut total_rows = 0usize;
+
+    while let Some(batch) = input.next().await {
+        let batch = batch?;
+        let num_rows = batch.num_rows();
+        if num_rows == 0 {
+            continue;
+        }
+
+        total_rows += num_rows;
+        join_metrics.build_input_batches.add(1);
+        join_metrics.build_input_rows.add(num_rows);
+
+        // Calculate hashes
+        let keys = on_exprs
+            .iter()
+            .map(|c| c.evaluate(&batch)?.into_array(num_rows))
+            .collect::<Result<Vec<_>>>()?;
+
+        let mut hashes = vec![0u64; num_rows];
+        create_hashes(&keys, random_state, &mut hashes)?;
+
+        // Spread rows across partitions by hash bucket
+        let mut indices: Vec<Vec<u32>> = vec![Vec::new(); partition_count];
+        for (row, h) in hashes.iter().enumerate() {
+            let bucket = (*h as usize) % partition_count;
+            indices[bucket].push(row as u32);
+        }
+
+        // Collect and spill
+        for (i, idxs) in indices.into_iter().enumerate() {
+            if idxs.is_empty() {
+                continue;
+            }
+            let idx_array = UInt32Array::from(idxs);
+            let taken = batch
+                .columns()
+                .iter()
+                .map(|c| take(c.as_ref(), &idx_array, None))
+                .collect::<Result<Vec<_>, _>>()?;
+            let part_batch = RecordBatch::try_new(batch.schema(), taken)?;
+            let request_msg = format!("grace_partition_{file_request_msg}_{i}");
+            partitions[i].spill_batch_auto(&part_batch, &request_msg)?;
+        }
+    }
+
+    // Prepare indexes
+    let mut result = Vec::with_capacity(partitions.len());
+    for (i, writer) in partitions.into_iter().enumerate() {
+        result.push(writer.finish(i)?);
+    }
+
+    Ok(result)
+}
+
+#[derive(Debug)]
+pub struct PartitionWriter {
+    spill_manager: Arc<SpillManager>,
+    total_rows: usize,
+    total_bytes: usize,
+    chunks: Vec<SpillLocation>,
+}
+
+impl PartitionWriter {
+    pub fn new(spill_manager: Arc<SpillManager>) -> Self {
+        Self {
+            spill_manager,
+            total_rows: 0,
+            total_bytes: 0,
+            chunks: vec![],
+        }
+    }
+
+    pub fn spill_batch_auto(&mut self, batch: &RecordBatch, request_msg: &str) -> Result<()> {
+        let loc = self.spill_manager.spill_batch_auto(batch, request_msg)?;
+        self.total_rows += batch.num_rows();
+        self.total_bytes += get_record_batch_memory_size(batch);
+        self.chunks.push(loc);
+        Ok(())
+    }
+
+    pub fn finish(self, part_id: usize) -> Result<PartitionIndex> {
+        Ok(PartitionIndex {
+            part_id,
+            chunks: self.chunks,
+            total_rows: self.total_rows,
+            total_bytes: self.total_bytes,
+        })
+    }
+}
+
+/// Describes a single partition of spilled data (used in GraceHashJoin).
+///
+/// Each partition can consist of one or multiple chunks (batches)
+/// that were spilled either to memory or to disk.
+/// These chunks are later reloaded during the join phase.
+///
+/// Example:
+/// Partition 3 -> [ spill_chunk_3_0.arrow, spill_chunk_3_1.arrow ]
+#[derive(Debug, Clone)]
+pub struct PartitionIndex {
+    /// Unique partition identifier (0..N-1)
+    pub part_id: usize,
+
+    /// Total number of rows in this partition
+    pub total_rows: usize,
+
+    /// Total size in bytes of all batches in this partition
+    pub total_bytes: usize,
+
+    /// Collection of spill locations (each corresponds to one batch written
+    /// by [`PartitionWriter::spill_batch_auto`])
+    pub chunks: Vec<SpillLocation>,
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::coalesce_partitions::CoalescePartitionsExec;
+    use crate::test::{assert_join_metrics, TestMemoryExec};
+    use crate::{
+        common, expressions::Column, repartition::RepartitionExec, test::build_table_i32,
+        test::exec::MockExec,
+    };
+
+    use arrow::array::{Date32Array, Int32Array, StructArray, UInt32Array, UInt64Array};
+    use arrow::buffer::NullBuffer;
+    use arrow::datatypes::{DataType, Field};
+    use arrow::util::pretty::print_batches;
+    use arrow_schema::Schema;
+    use futures::future;
+    use datafusion_common::hash_utils::create_hashes;
+    use datafusion_common::test_util::{batches_to_sort_string, batches_to_string};
+    use datafusion_common::{
+        assert_batches_eq, assert_batches_sorted_eq, assert_contains, exec_err,
+        ScalarValue,
+    };
+    use datafusion_execution::config::SessionConfig;
+    use datafusion_execution::runtime_env::RuntimeEnvBuilder;
+    use hashbrown::HashTable;
+    use insta::{allow_duplicates, assert_snapshot};
+    use rstest::*;
+    use rstest_reuse::*;
+
+    fn build_large_table(
+        a_name: &str,
+        b_name: &str,
+        c_name: &str,
+        n: usize,
+    ) -> Arc<dyn ExecutionPlan> {
+        let a: ArrayRef = Arc::new(Int32Array::from_iter_values(1..=n as i32));
+        let b: ArrayRef = Arc::new(Int32Array::from_iter_values(1..=n as i32));
+        let c: ArrayRef =
+            Arc::new(Int32Array::from_iter_values((1..=n as i32).map(|x| x * 10)));
+
+        let schema = Arc::new(Schema::new(vec![
+            Field::new(a_name, DataType::Int32, false),
+            Field::new(b_name, DataType::Int32, false),
+            Field::new(c_name, DataType::Int32, false),
+        ]));
+
+        let batch = RecordBatch::try_new(Arc::clone(&schema), vec![a, b, c]).unwrap();
+
+        // TestMemoryExec expects a list of partitions: Vec<Vec<RecordBatch>>
+        Arc::new(TestMemoryExec::try_new(&[vec![batch]], schema, None).unwrap())
+    }
+
+    fn build_table(
+        a: (&str, &Vec<i32>),
+        b: (&str, &Vec<i32>),
+        c: (&str, &Vec<i32>),
+    ) -> Arc<dyn ExecutionPlan> {
+        let batch = build_table_i32(a, b, c);
+        let schema = batch.schema();
+        TestMemoryExec::try_new_exec(&[vec![batch]], schema, None).unwrap()
+    }
+
+    #[tokio::test]
+    async fn partitioned_grace_join_large_inputs() -> Result<()> {
+        let left = build_large_table("a1", "b1", "c1", 100_000);
+        let right = build_large_table("a2", "b2", "c2", 50_000);
+        let on = vec![(
+            Arc::new(Column::new_with_schema("a1", &left.schema()).unwrap()) as _,
+            Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
+        )];
+        let (left_expr, right_expr) = on
+            .iter()
+            .map(|(l, r)| (Arc::clone(l), Arc::clone(r)))
+            .unzip();
+        let left_repartitioned: Arc<dyn ExecutionPlan> = Arc::new(
+            RepartitionExec::try_new(left, Partitioning::Hash(left_expr, 32))?,
+        );
+        let right_repartitioned: Arc<dyn ExecutionPlan> = Arc::new(
+            RepartitionExec::try_new(right, Partitioning::Hash(right_expr, 32))?,
+        );
+
+        let runtime = RuntimeEnvBuilder::new()
+            .with_memory_limit(50_000_000_000, 1.0)
+            .build_arc()?;
+        let task_ctx = TaskContext::default().with_runtime(runtime);
+        let task_ctx = Arc::new(task_ctx);
+
+        let join = GraceHashJoinExec::try_new(
+            Arc::clone(&left_repartitioned),
+            Arc::clone(&right_repartitioned),
+            on.clone(),
+            None,
+            &JoinType::Inner,
+            None,
+            NullEquality::NullEqualsNothing,
+        )?;
+
+        let partition_count = right_repartitioned.output_partitioning().partition_count();
+
+        let tasks: Vec<_> = (0..partition_count)
+            .map(|i| {
+                let ctx = Arc::clone(&task_ctx);
+                let s = join.execute(i, ctx).unwrap();
+                async move { common::collect(s).await }
+            })
+            .collect();
+
+        let results = future::join_all(tasks).await;
+        let mut batches = Vec::new();
+        for r in results {
+            let mut v = r?;
+            v.retain(|b| b.num_rows() > 0);
+            batches.extend(v);
+        }
+
+        // Every right-side key 1..=50_000 matches exactly one left-side key
+        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+        assert_eq!(total_rows, 50_000);
+        Ok(())
+    }
+}
diff --git a/datafusion/physical-plan/src/joins/grace_hash_join/mod.rs b/datafusion/physical-plan/src/joins/grace_hash_join/mod.rs
new file mode 100644
index 000000000000..55d7e2035e6c
--- /dev/null
+++ b/datafusion/physical-plan/src/joins/grace_hash_join/mod.rs
@@ -0,0 +1,23 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! 
[`GraceHashJoinExec`] Partitioned Hash Join Operator
+
+pub use exec::GraceHashJoinExec;
+
+mod exec;
+mod stream;
diff --git a/datafusion/physical-plan/src/joins/grace_hash_join/stream.rs b/datafusion/physical-plan/src/joins/grace_hash_join/stream.rs
new file mode 100644
index 000000000000..417990da43f1
--- /dev/null
+++ b/datafusion/physical-plan/src/joins/grace_hash_join/stream.rs
@@ -0,0 +1,406 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Stream implementation for Grace Hash Join
+//!
+//! This module implements [`GraceHashJoinStream`], the streaming engine for
+//! [`super::GraceHashJoinExec`]. See comments in [`GraceHashJoinStream`] for more details.
+
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use crate::joins::utils::{
+    equal_rows_arr, get_final_indices_from_shared_bitmap, OnceFut,
+};
+use crate::{handle_state, hash_utils::create_hashes, joins::join_hash_map::JoinHashMapOffset, joins::utils::{
+    adjust_indices_by_join_type, apply_join_filter_to_indices,
+    build_batch_empty_build_side, build_batch_from_indices,
+    need_produce_result_in_final, BuildProbeJoinMetrics, ColumnIndex, JoinFilter,
+    JoinHashMapType, StatefulStreamResult,
+}, ExecutionPlan, RecordBatchStream, SendableRecordBatchStream, SpillManager};
+
+use arrow::array::{ArrayRef, UInt32Array, UInt64Array};
+use arrow::datatypes::{Schema, SchemaRef};
+use arrow::record_batch::RecordBatch;
+use datafusion_common::{
+    internal_datafusion_err, internal_err, JoinSide, JoinType, NullEquality, Result,
+};
+use datafusion_physical_expr::PhysicalExprRef;
+use ahash::RandomState;
+use futures::{ready, FutureExt, Stream, StreamExt};
+use tokio::sync::Mutex;
+use datafusion_execution::TaskContext;
+use crate::empty::EmptyExec;
+use crate::joins::grace_hash_join::exec::PartitionIndex;
+use crate::joins::{HashJoinExec, PartitionMode};
+use crate::memory::MemoryStream;
+use crate::stream::RecordBatchStreamAdapter;
+use crate::test::TestMemoryExec;
+
+enum GraceJoinState {
+    /// Waiting for the partitioning phase (Phase 1) to finish
+    WaitPartitioning,
+
+    /// Waiting for every output partition to report its spilled partitions
+    WaitAllPartitions {
+        wait_all_fut: Option<OnceFut<Vec<SpillFut>>>,
+    },
+
+    /// Currently joining partition `current`
+    JoinPartition {
+        current: usize,
+        all_parts: Arc<[SpillFut]>,
+        current_stream: Option<SendableRecordBatchStream>,
+        left_fut: Option<OnceFut<Vec<RecordBatch>>>,
+        right_fut: Option<OnceFut<Vec<RecordBatch>>>,
+    },
+
+    /// All partition pairs have been joined
+    Done,
+}
+
+/// Container for HashJoinStreamState::ProcessProbeBatch related data
+#[derive(Debug, Clone)]
+pub(super) struct ProcessProbeBatchState {
+    /// Current probe-side batch
+    batch: RecordBatch,
+    /// Probe-side on expressions values
+    values: Vec<ArrayRef>,
+    /// Starting offset for JoinHashMap lookups
+    offset: JoinHashMapOffset,
+    /// Max joined probe-side index from current batch
+    joined_probe_idx: Option<usize>,
+}
+
+/// Streaming state machine for [`super::GraceHashJoinExec`]: waits until both
+/// sides have been partitioned and spilled, then joins each pair of spilled
+/// partitions with an in-memory hash join.
+pub struct GraceHashJoinStream {
+    schema: SchemaRef,
+    spill_fut: OnceFut<SpillFut>,
+    spill_left: Arc<SpillManager>,
+    spill_right: Arc<SpillManager>,
+    on_left: Vec<PhysicalExprRef>,
+    on_right: Vec<PhysicalExprRef>,
+    random_state: RandomState,
+    join_type: JoinType,
+    column_indices: Vec<ColumnIndex>,
+    join_metrics: Arc<BuildProbeJoinMetrics>,
+    context: Arc<TaskContext>,
+    accumulator: Arc<GraceAccumulator>,
+    state: GraceJoinState,
+}
+
+/// Spill indexes reported by one output partition
+#[derive(Debug, Clone)]
+pub struct SpillFut {
+    partition: usize,
+    left: Vec<PartitionIndex>,
+    right: Vec<PartitionIndex>
+}
+impl SpillFut {
+    pub(crate) fn new(partition: usize, left: Vec<PartitionIndex>, right: Vec<PartitionIndex>) -> Self {
+        SpillFut {
+            partition,
+            left,
+            right,
+        }
+    }
+}
+
+impl RecordBatchStream for GraceHashJoinStream {
+    fn schema(&self) -> SchemaRef {
+        Arc::clone(&self.schema)
+    }
+}
+
+impl GraceHashJoinStream {
+    pub fn new(
+        schema: SchemaRef,
+        spill_fut: OnceFut<SpillFut>,
+        spill_left: Arc<SpillManager>,
+        spill_right: Arc<SpillManager>,
+        on_left: Vec<PhysicalExprRef>,
+        on_right: Vec<PhysicalExprRef>,
+        random_state: RandomState,
+        join_type: JoinType,
+        column_indices: Vec<ColumnIndex>,
+        join_metrics: Arc<BuildProbeJoinMetrics>,
+        context: Arc<TaskContext>,
+        accumulator: Arc<GraceAccumulator>,
+    ) -> Self {
+        Self {
+            schema,
+            spill_fut,
+            spill_left,
+            spill_right,
+            on_left,
+            on_right,
+            random_state,
+            join_type,
+            column_indices,
+            join_metrics,
+            context,
+            accumulator,
+            state: GraceJoinState::WaitPartitioning,
+        }
+    }
+
+    /// Core state machine logic (poll implementation)
+    fn poll_next_impl(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<RecordBatch>>> {
+        loop {
+            match &mut self.state {
+                GraceJoinState::WaitPartitioning => {
+                    let shared = ready!(self.spill_fut.get_shared(cx))?;
+
+                    let acc = Arc::clone(&self.accumulator);
+                    let left = shared.left.clone();
+                    let right = shared.right.clone();
+                    let wait_all_fut = if shared.partition == 0 {
+                        OnceFut::new(async move {
+                            acc.report_partition(shared.partition, left, right).await;
+                            let all = acc.wait_all().await;
+                            Ok(all)
+                        })
+                    } else {
+                        OnceFut::new(async move {
+                            acc.report_partition(shared.partition, left, right).await;
+                            acc.wait_ready().await;
+                            Ok(vec![])
+                        })
+                    };
+                    self.state = GraceJoinState::WaitAllPartitions { wait_all_fut: Some(wait_all_fut) };
+                    continue;
+                }
+                GraceJoinState::WaitAllPartitions { wait_all_fut } => {
+                    if let Some(fut) = wait_all_fut {
+                        let all_arc = ready!(fut.get_shared(cx))?;
+                        let mut all = (*all_arc).clone();
+                        all.sort_by_key(|s| s.partition);
+
+                        self.state = GraceJoinState::JoinPartition {
+                            current: 0,
+                            all_parts: Arc::from(all),
+                            current_stream: None,
+                            left_fut: None,
+                            right_fut: None,
+                        };
+                        continue;
+                    } else {
+                        return Poll::Pending;
+                    }
+                }
+                GraceJoinState::JoinPartition {
+                    current,
+                    all_parts,
+                    current_stream,
+                    left_fut,
+                    right_fut,
+                } => {
+                    if *current >= all_parts.len() {
+                        self.state = GraceJoinState::Done;
+                        continue;
+                    }
+
+                    // If we don't have a stream yet, create one for the current partition pair
+                    if current_stream.is_none() {
+                        if left_fut.is_none() && right_fut.is_none() {
+                            let spill_fut = &all_parts[*current];
+                            *left_fut = Some(load_partition_async(Arc::clone(&self.spill_left), spill_fut.left.clone()));
+                            *right_fut = Some(load_partition_async(Arc::clone(&self.spill_right), spill_fut.right.clone()));
+                        }
+
+                        let left_batches = (*ready!(left_fut.as_mut().unwrap().get_shared(cx))?).clone();
+                        let right_batches = (*ready!(right_fut.as_mut().unwrap().get_shared(cx))?).clone();
+
+                        let stream = build_in_memory_join_stream(
+                            Arc::clone(&self.schema),
+                            left_batches,
+                            right_batches,
+                            &self.on_left,
+                            &self.on_right,
+                            self.random_state.clone(),
+                            self.join_type,
+                            &self.column_indices,
+                            &self.join_metrics,
+                            &self.context,
+                        )?;
+
+                        *current_stream = Some(stream);
+                        *left_fut = None;
+                        *right_fut = None;
+                    }
+
+                    // Drive current stream forward
+                    if let Some(stream) = current_stream {
+                        match ready!(stream.poll_next_unpin(cx)) {
+                            Some(Ok(batch)) => return Poll::Ready(Some(Ok(batch))),
+                            Some(Err(e)) => return Poll::Ready(Some(Err(e))),
+                            None => {
+                                *current += 1;
+                                *current_stream = None;
+                                continue;
+                            }
+                        }
+                    }
+                }
+                GraceJoinState::Done => return Poll::Ready(None),
+            }
+        }
+    }
+}
+
+fn load_partition_async(
+    spill_manager: Arc<SpillManager>,
+    partitions: Vec<PartitionIndex>,
+) -> OnceFut<Vec<RecordBatch>> {
+    OnceFut::new(async move {
+        let mut all_batches = Vec::new();
+        for p in partitions {
+            for chunk in p.chunks {
+                let mut reader = spill_manager.load_spilled_batch(&chunk)?;
+                while let Some(batch_result) = reader.next().await {
+                    let batch = batch_result?;
+                    all_batches.push(batch);
+                }
+            }
+        }
+        Ok(all_batches)
+    })
+}
+
+/// Build an in-memory HashJoinExec for one pair of spilled partitions
+fn build_in_memory_join_stream(
+    output_schema: SchemaRef,
+    left_batches: Vec<RecordBatch>,
+    right_batches: Vec<RecordBatch>,
+    on_left: &[PhysicalExprRef],
+    on_right: &[PhysicalExprRef],
+    random_state: RandomState,
+    join_type: JoinType,
+    column_indices: &[ColumnIndex],
+    join_metrics: &BuildProbeJoinMetrics,
+    context: &Arc<TaskContext>,
+) -> Result<SendableRecordBatchStream> {
+    if left_batches.is_empty() && right_batches.is_empty() {
+        return EmptyExec::new(output_schema).execute(0, Arc::clone(context));
+    }
+
+    let left_schema = left_batches
+        .first()
+        .map(|b| b.schema())
+        .unwrap_or_else(|| Arc::new(arrow_schema::Schema::empty()));
+
+    let right_schema = right_batches
+        .first()
+        .map(|b| b.schema())
+        .unwrap_or_else(|| Arc::new(arrow_schema::Schema::empty()));
+
+    // Build memory execution nodes for each side
+    let left_plan: Arc<dyn ExecutionPlan> =
+        Arc::new(TestMemoryExec::try_new(&[left_batches], left_schema, None)?);
+    let right_plan: Arc<dyn ExecutionPlan> =
+        Arc::new(TestMemoryExec::try_new(&[right_batches], right_schema, None)?);
+
+    // Combine join expressions into pairs
+    let on: Vec<(PhysicalExprRef, PhysicalExprRef)> = on_left
+        .iter()
+        .cloned()
+        .zip(on_right.iter().cloned())
+        .collect();
+
+    // For one partition pair: always CollectLeft (build left, stream right)
+    let join_exec = HashJoinExec::try_new(
+        left_plan,
+        right_plan,
+        on,
+        None::<JoinFilter>,
+        &join_type,
+        None,
+        PartitionMode::CollectLeft,
+        NullEquality::NullEqualsNothing,
+    )?;
+
+    // Each join executes locally with the same context
+    join_exec.execute(0, Arc::clone(context))
+}
+
+impl Stream for GraceHashJoinStream {
+    type Item = Result<RecordBatch>;
+
+    fn poll_next(
+        mut self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        self.poll_next_impl(cx)
+    }
+}
+
+#[derive(Debug)]
+pub struct GraceAccumulator {
+    expected: usize,
+    collected: Mutex<Vec<SpillFut>>,
+    notify: tokio::sync::Notify,
+}
+
+impl GraceAccumulator {
+    pub fn new(expected: usize) -> Arc<Self> {
+        Arc::new(Self {
+            expected,
+            collected: Mutex::new(vec![]),
+            notify: tokio::sync::Notify::new(),
+        })
+    }
+
+    pub async fn report_partition(
+        &self,
+        part_id: usize,
+        left_idx: Vec<PartitionIndex>,
+        right_idx: Vec<PartitionIndex>,
+    ) {
+        let mut guard = self.collected.lock().await;
+        if let Some(pos) = guard.iter().position(|s| s.partition == part_id) {
+            guard[pos] = SpillFut::new(part_id, left_idx, right_idx);
+        } else {
+            guard.push(SpillFut::new(part_id, left_idx, right_idx));
+        }
+
+        if guard.len() == self.expected {
+            self.notify.notify_waiters();
+        }
+    }
+
+    pub async fn wait_all(
+        &self,
+    ) -> Vec<SpillFut> {
+        loop {
+            {
+                let guard = self.collected.lock().await;
+                if guard.len() == self.expected {
+                    return guard.clone();
+                }
+            }
+            self.notify.notified().await;
+        }
+    }
+    pub async fn wait_ready(&self) {
+        loop {
+            {
+                let guard = self.collected.lock().await;
+                if guard.len() == self.expected {
+                    return;
+                }
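+                // Not every partition has reported yet. The lock guard is
+                // dropped at the end of this block so reporters can take the
+                // mutex while we await the next notification.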
+            }
+            self.notify.notified().await;
+        }
+    }
+}
\ No newline at end of file
diff --git a/datafusion/physical-plan/src/joins/hash_join/mod.rs b/datafusion/physical-plan/src/joins/hash_join/mod.rs
index 7f1e5cae13a3..612134604c7b 100644
--- a/datafusion/physical-plan/src/joins/hash_join/mod.rs
+++ b/datafusion/physical-plan/src/joins/hash_join/mod.rs
@@ -20,5 +20,5 @@ pub use exec::HashJoinExec;
 
 mod exec;
-mod shared_bounds;
+pub mod shared_bounds;
 mod stream;
diff --git a/datafusion/physical-plan/src/joins/mod.rs b/datafusion/physical-plan/src/joins/mod.rs
index 1d36db996434..14429ec55182 100644
--- a/datafusion/physical-plan/src/joins/mod.rs
+++ b/datafusion/physical-plan/src/joins/mod.rs
@@ -28,6 +28,8 @@ pub use sort_merge_join::SortMergeJoinExec;
 pub use symmetric_hash_join::SymmetricHashJoinExec;
 mod cross_join;
 mod hash_join;
+mod grace_hash_join;
+
 mod nested_loop_join;
 mod sort_merge_join;
 mod stream_join_utils;
diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs
index d392650f88dd..1c49454f1a03 100644
--- a/datafusion/physical-plan/src/joins/utils.rs
+++ b/datafusion/physical-plan/src/joins/utils.rs
@@ -1663,7 +1663,7 @@ pub fn update_hash(
     hashes_buffer: &mut Vec<u64>,
     deleted_offset: usize,
     fifo_hashmap: bool,
-) -> Result<()> {
+) -> Result<Vec<ArrayRef>> {
     // evaluate the keys
     let keys_values = on
         .iter()
@@ -1688,7 +1688,7 @@ pub fn update_hash(
         hash_map.update_from_iter(Box::new(hash_values_iter), deleted_offset);
     }
 
-    Ok(())
+    Ok(keys_values)
 }
 
 pub(super) fn equal_rows_arr(
diff --git a/datafusion/physical-plan/src/spill/mod.rs b/datafusion/physical-plan/src/spill/mod.rs
index 270b3654b2ba..9d19e72cb379 100644
--- a/datafusion/physical-plan/src/spill/mod.rs
+++ b/datafusion/physical-plan/src/spill/mod.rs
@@ -478,7 +478,7 @@ mod tests {
         assert_eq!(spilled_rows, num_rows);
 
         for spill in results {
-            let stream = spill_manager.load_spilled_batch(spill)?;
+            let stream = spill_manager.load_spilled_batch(&spill)?;
             let collected = collect(stream).await?;
             assert!(!collected.is_empty());
             assert_eq!(collected[0].schema(), schema);
diff --git a/datafusion/physical-plan/src/spill/spill_manager.rs b/datafusion/physical-plan/src/spill/spill_manager.rs
index e8f5037f764b..2c4d6b7844cd 100644
--- a/datafusion/physical-plan/src/spill/spill_manager.rs
+++ b/datafusion/physical-plan/src/spill/spill_manager.rs
@@ -173,30 +173,37 @@ impl SpillManager {
     /// Automatically decides whether to spill the given RecordBatch to memory or disk,
     /// depending on available memory pool capacity.
     pub(crate) fn spill_batch_auto(&self, batch: &RecordBatch, request_msg: &str) -> Result<SpillLocation> {
-        let size = batch.get_sliced_size()?;
-
-        // Check current memory usage and total limit from the runtime memory pool
-        let used = self.env.memory_pool.reserved();
-        let limit = match self.env.memory_pool.memory_limit() {
-            datafusion_execution::memory_pool::MemoryLimit::Finite(l) => l,
-            _ => usize::MAX,
+        let Some(file) = self.spill_record_batch_and_finish(slice::from_ref(batch), request_msg)? 
else { + return Err(DataFusionError::Execution( + "failed to spill batch to disk".into(), + )); }; - - // If there's enough memory (with a small safety margin), keep it in memory - if used + size * 3 / 2 <= limit { - let buf = Arc::new(InMemorySpillBuffer::from_batch(batch)?); - self.metrics.spilled_bytes.add(size); - self.metrics.spilled_rows.add(batch.num_rows()); - Ok(SpillLocation::Memory(buf)) - } else { - // Otherwise spill to disk using the existing SpillManager logic - let Some(file) = self.spill_record_batch_and_finish(slice::from_ref(batch), request_msg)? else { - return Err(DataFusionError::Execution( - "failed to spill batch to disk".into(), - )); - }; - Ok(SpillLocation::Disk(file)) - } + Ok(SpillLocation::Disk(Arc::new(file))) + // + // let size = batch.get_sliced_size()?; + // + // // Check current memory usage and total limit from the runtime memory pool + // let used = self.env.memory_pool.reserved(); + // let limit = match self.env.memory_pool.memory_limit() { + // datafusion_execution::memory_pool::MemoryLimit::Finite(l) => l, + // _ => usize::MAX, + // }; + // + // // If there's enough memory (with a small safety margin), keep it in memory + // if used + size * 3 / 2 <= limit { + // let buf = Arc::new(InMemorySpillBuffer::from_batch(batch)?); + // self.metrics.spilled_bytes.add(size); + // self.metrics.spilled_rows.add(batch.num_rows()); + // Ok(SpillLocation::Memory(buf)) + // } else { + // // Otherwise spill to disk using the existing SpillManager logic + // let Some(file) = self.spill_record_batch_and_finish(slice::from_ref(batch), request_msg)? else { + // return Err(DataFusionError::Execution( + // "failed to spill batch to disk".into(), + // )); + // }; + // Ok(SpillLocation::Disk(Arc::new(file))) + // } } pub fn spill_batches_auto( @@ -226,21 +233,33 @@ impl SpillManager { Ok(spawn_buffered(stream, self.batch_read_buffer_capacity)) } + pub fn read_spill_as_stream_ref( + &self, + spill_file_path: &RefCountedTempFile, + ) -> Result { + let stream = Box::pin(cooperative(SpillReaderStream::new( + Arc::clone(&self.schema), + spill_file_path.clone_refcounted()?, + ))); + + Ok(spawn_buffered(stream, self.batch_read_buffer_capacity)) + } + pub fn load_spilled_batch( &self, - spill: SpillLocation, + spill: &SpillLocation, ) -> Result { match spill { - SpillLocation::Memory(buf) => Ok(buf.as_stream(Arc::clone(&self.schema))?), - SpillLocation::Disk(file) => self.read_spill_as_stream(file), + SpillLocation::Memory(buf) => Ok(Arc::clone(&buf).as_stream(Arc::clone(&self.schema))?), + SpillLocation::Disk(file) => self.read_spill_as_stream_ref(file), } } } -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum SpillLocation { Memory(Arc), - Disk(RefCountedTempFile), + Disk(Arc), } From 0932d547c7f739265b38d9eae3ff7307b5d79367 Mon Sep 17 00:00:00 2001 From: osipovartem Date: Thu, 6 Nov 2025 20:15:19 +0300 Subject: [PATCH 5/7] Upd --- .../src/joins/grace_hash_join/exec.rs | 165 +++++++++++------- .../src/joins/grace_hash_join/stream.rs | 1 + 2 files changed, 103 insertions(+), 63 deletions(-) diff --git a/datafusion/physical-plan/src/joins/grace_hash_join/exec.rs b/datafusion/physical-plan/src/joins/grace_hash_join/exec.rs index e25407c084bb..0fca133f79d7 100644 --- a/datafusion/physical-plan/src/joins/grace_hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/grace_hash_join/exec.rs @@ -117,8 +117,6 @@ pub struct GraceHashJoinExec { /// HashJoinExec also needs to keep a shared bounds accumulator for coordinating updates. 
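    /// The `accumulator` below is the Phase 1 barrier for the grace join:
    /// each output partition reports the indexes of its spilled partition
    /// files, and the in-memory join phase starts only after all partitions
    /// have reported.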
dynamic_filter: Option, accumulator: Arc, - spill_left: Arc, - spill_right: Arc, } #[derive(Clone)] @@ -200,20 +198,7 @@ impl GraceHashJoinExec { let partitions = left.output_partitioning().partition_count(); let accumulator = GraceAccumulator::new(partitions); - let metrics = ExecutionPlanMetricsSet::new(); - let runtime = Arc::new(RuntimeEnv::default()); - let spill_left = Arc::new(SpillManager::new( - Arc::clone(&runtime), - SpillMetrics::new(&metrics, 0), - Arc::clone(&left_schema), - )); - let spill_right = Arc::new(SpillManager::new( - Arc::clone(&runtime), - SpillMetrics::new(&metrics, 0), - Arc::clone(&right_schema), - )); - // Initialize both dynamic filter and bounds accumulator to None // They will be set later if dynamic filtering is enabled Ok(GraceHashJoinExec { @@ -231,8 +216,6 @@ impl GraceHashJoinExec { cache, dynamic_filter: None, accumulator, - spill_left, - spill_right, }) } @@ -575,7 +558,6 @@ impl ExecutionPlan for GraceHashJoinExec { self: Arc, children: Vec>, ) -> Result> { - let partitions = children[0].output_partitioning().partition_count(); Ok(Arc::new(GraceHashJoinExec { left: Arc::clone(&children[0]), right: Arc::clone(&children[1]), @@ -599,8 +581,6 @@ impl ExecutionPlan for GraceHashJoinExec { // Keep the dynamic filter, bounds accumulator will be reset dynamic_filter: self.dynamic_filter.clone(), accumulator: Arc::clone(&self.accumulator), - spill_left: Arc::clone(&self.spill_left), - spill_right: Arc::clone(&self.spill_right), })) } @@ -621,8 +601,6 @@ impl ExecutionPlan for GraceHashJoinExec { // Reset dynamic filter and bounds accumulator to initial state dynamic_filter: None, accumulator: Arc::clone(&self.accumulator), - spill_left: Arc::clone(&self.spill_left), - spill_right: Arc::clone(&self.spill_right), })) } @@ -875,8 +853,6 @@ impl ExecutionPlan for GraceHashJoinExec { filter: dynamic_filter, bounds_accumulator: OnceLock::new(), }), - spill_left: Arc::clone(&self.spill_left), - spill_right: Arc::clone(&self.spill_right), accumulator: Arc::clone(&self.accumulator), }); result = result.with_updated_node(new_node as Arc); @@ -961,50 +937,61 @@ async fn partition_and_spill_one_side( .map(|_| PartitionWriter::new(Arc::clone(&spill_manager))) .collect(); + let mut buffered_batches = Vec::new(); let mut total_rows = 0usize; - + let schema = input.schema(); while let Some(batch) = input.next().await { let batch = batch?; - let num_rows = batch.num_rows(); - if num_rows == 0 { + if batch.num_rows() == 0 { continue; } - - total_rows += num_rows; + total_rows += batch.num_rows(); join_metrics.build_input_batches.add(1); - join_metrics.build_input_rows.add(num_rows); + join_metrics.build_input_rows.add(batch.num_rows()); + buffered_batches.push(batch); + } + if buffered_batches.is_empty() { + return Ok(Vec::new()); + } + // Create single batch to reduce number of spilled files + let single_batch = concat_batches(&schema, &buffered_batches)?; + let num_rows = single_batch.num_rows(); + if num_rows == 0 { + return Ok(Vec::new()); + } - // Calculate hashes - let keys = on_exprs - .iter() - .map(|c| c.evaluate(&batch)?.into_array(num_rows)) - .collect::>>()?; + // Calculate hashes + let keys = on_exprs + .iter() + .map(|c| c.evaluate(&single_batch)?.into_array(num_rows)) + .collect::>>()?; - let mut hashes = vec![0u64; num_rows]; - create_hashes(&keys, random_state, &mut hashes)?; + let mut hashes = vec![0u64; num_rows]; + create_hashes(&keys, random_state, &mut hashes)?; - // Spread to partitions - let mut indices: Vec> = vec![Vec::new(); partition_count]; - for 
(row, h) in hashes.iter().enumerate() { - let bucket = (*h as usize) % partition_count; - indices[bucket].push(row as u32); - } + // Spread to partitions + let mut indices: Vec> = vec![Vec::new(); partition_count]; + for (row, h) in hashes.iter().enumerate() { + let bucket = (*h as usize) % partition_count; + indices[bucket].push(row as u32); + } - // Collect and spill - for (i, idxs) in indices.into_iter().enumerate() { - if idxs.is_empty() { - continue; - } - let idx_array = UInt32Array::from(idxs); - let taken = batch - .columns() - .iter() - .map(|c| take(c.as_ref(), &idx_array, None)) - .collect::>>()?; - let part_batch = RecordBatch::try_new(batch.schema(), taken)?; - let request_msg = format!("grace_partition_{file_request_msg}_{i}"); - partitions[i].spill_batch_auto(&part_batch, &request_msg)?; + // Collect and spill + for (i, idxs) in indices.into_iter().enumerate() { + if idxs.is_empty() { + continue; } + + let idx_array = UInt32Array::from(idxs); + let taken = single_batch + .columns() + .iter() + .map(|c| take(c.as_ref(), &idx_array, None)) + .collect::>>()?; + + let part_batch = RecordBatch::try_new(single_batch.schema(), taken)?; + let request_msg = format!("grace_partition_{file_request_msg}_{i}"); + partitions[i].spill_batch_auto(&part_batch, &request_msg)?; } // Prepare indexes @@ -1012,7 +999,7 @@ async fn partition_and_spill_one_side( for (i, writer) in partitions.into_iter().enumerate() { result.push(writer.finish(i)?); } - + println!("spill_manager {:?}", spill_manager.metrics); Ok(result) } @@ -1104,6 +1091,7 @@ mod tests { use rstest::*; use rstest_reuse::*; use datafusion_execution::runtime_env::RuntimeEnvBuilder; + use crate::joins::HashJoinExec; fn build_large_table( a_name: &str, @@ -1112,7 +1100,7 @@ mod tests { n: usize, ) -> Arc { let a: ArrayRef = Arc::new(Int32Array::from_iter_values(1..=n as i32)); - let b: ArrayRef = Arc::new(Int32Array::from_iter_values(1..=n as i32)); + let b: ArrayRef = Arc::new(Int32Array::from_iter_values((1..=n as i32).map(|x| x * 2))); let c: ArrayRef = Arc::new(Int32Array::from_iter_values((1..=n as i32).map(|x| x * 10))); let schema = Arc::new(arrow::datatypes::Schema::new(vec![ @@ -1138,7 +1126,8 @@ mod tests { } #[tokio::test] - async fn single_partition_join_overallocation() -> Result<()> { + async fn single_partition_join_overallocation() -> Result<()> + { // let left = build_table( // ("a1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]), // ("b1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]), @@ -1149,13 +1138,13 @@ mod tests { // ("b2", &vec![1, 2]), // ("c2", &vec![14, 15]), // ); - let left = build_large_table("a1", "b1", "c1", 100_000); - let right = build_large_table("a2", "b2", "c2", 50_000); + let left = build_large_table("a1", "b1", "c1", 200); + let right = build_large_table("a2", "b2", "c2", 500); let on = vec![( Arc::new(Column::new_with_schema("a1", &left.schema()).unwrap()) as _, Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _, )]; - let (left_expr, right_expr) = on + let (left_expr, right_expr): (Vec>, Vec>) = on .iter() .map(|(l, r)| (Arc::clone(l), Arc::clone(r))) .unzip(); @@ -1216,4 +1205,54 @@ mod tests { // ); Ok(()) } + + #[tokio::test] + async fn single_partition_join_overallocation_f() -> Result<()> { + + let left = build_large_table("a1", "b1", "c1", 200); + let right = build_large_table("a2", "b2", "c2", 500); + let on = vec![( + Arc::new(Column::new_with_schema("a1", &left.schema()).unwrap()) as _, + Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _, + )]; + let (left_expr, 
right_expr): (Vec>, Vec>) = on + .iter() + .map(|(l, r)| (Arc::clone(l), Arc::clone(r))) + .unzip(); + let left_repartitioned: Arc = Arc::new(RepartitionExec::try_new( + left, + Partitioning::Hash(left_expr, 32), + )?); + let right_repartitioned: Arc = Arc::new(RepartitionExec::try_new( + right, + Partitioning::Hash(right_expr, 32), + )?); + + let join = HashJoinExec::try_new( + left_repartitioned, + right_repartitioned, + on, + None, + &JoinType::Inner, + None, + PartitionMode::Partitioned, + NullEquality::NullEqualsNothing, + )?; + + let task_ctx = Arc::new(TaskContext::default()); + let mut batches = vec![]; + for i in 0..32 { + let stream = join.execute(i, Arc::clone(&task_ctx))?; + let more_batches = common::collect(stream).await?; + batches.extend( + more_batches + .into_iter() + .filter(|b| b.num_rows() > 0) + .collect::>(), + ); + } + print_batches(&*batches).unwrap(); + Ok(()) + } + } diff --git a/datafusion/physical-plan/src/joins/grace_hash_join/stream.rs b/datafusion/physical-plan/src/joins/grace_hash_join/stream.rs index 417990da43f1..97e48fc81a36 100644 --- a/datafusion/physical-plan/src/joins/grace_hash_join/stream.rs +++ b/datafusion/physical-plan/src/joins/grace_hash_join/stream.rs @@ -265,6 +265,7 @@ fn load_partition_async( ) -> OnceFut> { OnceFut::new(async move { let mut all_batches = Vec::new(); + println!("partitions {:?}", partitions); for p in partitions { for chunk in p.chunks { let mut reader = spill_manager.load_spilled_batch(&chunk)?; From 2e85309b75659088f2989f6213164aedfe0df97b Mon Sep 17 00:00:00 2001 From: osipovartem Date: Fri, 7 Nov 2025 21:01:59 +0300 Subject: [PATCH 6/7] upd --- .../src/joins/grace_hash_join/exec.rs | 231 +++++++++--------- .../src/joins/grace_hash_join/stream.rs | 102 ++++---- datafusion/physical-plan/src/joins/mod.rs | 2 +- datafusion/physical-plan/src/spill/mod.rs | 18 +- .../physical-plan/src/spill/spill_manager.rs | 6 +- 5 files changed, 179 insertions(+), 180 deletions(-) diff --git a/datafusion/physical-plan/src/joins/grace_hash_join/exec.rs b/datafusion/physical-plan/src/joins/grace_hash_join/exec.rs index 0fca133f79d7..0782d9c84fc1 100644 --- a/datafusion/physical-plan/src/joins/grace_hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/grace_hash_join/exec.rs @@ -15,52 +15,47 @@ // specific language governing permissions and limitations // under the License. 
-use std::fmt; -use std::mem::size_of; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::{Arc, OnceLock}; -use std::{any::Any, vec}; -use std::fmt::{format, Formatter}; use crate::execution_plan::{boundedness_from_children, EmissionType}; use crate::filter_pushdown::{ ChildPushdownResult, FilterDescription, FilterPushdownPhase, FilterPushdownPropagation, }; use crate::joins::utils::{ - asymmetric_join_output_partitioning, reorder_output_after_swap, swap_join_projection, - update_hash, OnceAsync, OnceFut, + reorder_output_after_swap, swap_join_projection, OnceFut, }; -use crate::joins::{JoinOn, JoinOnRef, PartitionMode, SharedBitmapBuilder}; +use crate::joins::{JoinOn, JoinOnRef, PartitionMode}; use crate::projection::{ try_embed_projection, try_pushdown_through_join, EmbeddedProjection, JoinData, ProjectionExec, }; use crate::spill::get_record_batch_memory_size; -use crate::{displayable, ExecutionPlanProperties, SpillManager}; use crate::{ common::can_project, joins::utils::{ build_join_schema, check_join_is_valid, estimate_join_statistics, - need_produce_result_in_final, symmetric_join_output_partitioning, - BuildProbeJoinMetrics, ColumnIndex, JoinFilter, JoinHashMapType, + symmetric_join_output_partitioning, + BuildProbeJoinMetrics, ColumnIndex, JoinFilter, }, metrics::{ExecutionPlanMetricsSet, MetricsSet}, - DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning, + DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, PlanProperties, SendableRecordBatchStream, Statistics, }; +use crate::{ExecutionPlanProperties, SpillManager}; +use std::fmt; +use std::fmt::Formatter; +use std::sync::{Arc, OnceLock}; +use std::{any::Any, vec}; -use arrow::array::{ArrayRef, BooleanBufferBuilder, UInt32Array}; -use arrow::compute::{concat, concat_batches, take}; +use arrow::array::UInt32Array; +use arrow::compute::{concat_batches, take}; use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; -use arrow::util::bit_util; -use arrow_schema::DataType; use datafusion_common::config::ConfigOptions; -use datafusion_common::utils::memory::estimate_memory_size; -use datafusion_common::{internal_datafusion_err, internal_err, plan_err, project_schema, DataFusionError, JoinSide, JoinType, NullEquality, Result}; -use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; +use datafusion_common::{ + internal_err, plan_err, project_schema, JoinSide, JoinType, + NullEquality, Result, +}; use datafusion_execution::TaskContext; -use datafusion_expr::{Accumulator, UserDefinedLogicalNode}; use datafusion_functions_aggregate_common::min_max::{MaxAccumulator, MinAccumulator}; use datafusion_physical_expr::equivalence::{ join_equivalence_properties, ProjectionMapping, @@ -68,19 +63,16 @@ use datafusion_physical_expr::equivalence::{ use datafusion_physical_expr::expressions::{lit, DynamicFilterPhysicalExpr}; use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef}; -use ahash::RandomState; -use arrow_ord::partition::partition; -use datafusion_physical_expr_common::physical_expr::fmt_sql; -use futures::{StreamExt, TryStreamExt}; -use futures::executor::block_on; -use parking_lot::Mutex; -use datafusion_common::hash_utils::create_hashes; -use datafusion_execution::runtime_env::RuntimeEnv; -use crate::empty::EmptyExec; -use crate::joins::grace_hash_join::stream::{GraceAccumulator, GraceHashJoinStream, SpillFut}; +use crate::joins::grace_hash_join::stream::{ + GraceAccumulator, GraceHashJoinStream, SpillFut, +}; use 
crate::joins::hash_join::shared_bounds::SharedBoundsAccumulator; use crate::metrics::SpillMetrics; -use crate::spill::spill_manager::{GetSlicedSize, SpillLocation}; +use crate::spill::spill_manager::SpillLocation; +use ahash::RandomState; +use datafusion_common::hash_utils::create_hashes; +use datafusion_physical_expr_common::physical_expr::fmt_sql; +use futures::StreamExt; /// Hard-coded seed to ensure hash values from the hash join differ from `RepartitionExec`, avoiding collisions. const HASH_JOIN_SEED: RandomState = @@ -148,7 +140,6 @@ impl fmt::Debug for GraceHashJoinExec { } } - impl EmbeddedProjection for GraceHashJoinExec { fn with_projection(&self, projection: Option>) -> Result { self.with_projection(projection) @@ -331,7 +322,8 @@ impl GraceHashJoinExec { on, )?; - let mut output_partitioning = symmetric_join_output_partitioning(left, right, &join_type)?; + let mut output_partitioning = + symmetric_join_output_partitioning(left, right, &join_type)?; let emission_type = if left.boundedness().is_unbounded() { EmissionType::Final } else if right.pipeline_behavior() == EmissionType::Incremental { @@ -399,7 +391,7 @@ impl GraceHashJoinExec { /// insert `RepartitionExec` operators). pub fn swap_inputs( &self, - partition_mode: PartitionMode, + _partition_mode: PartitionMode, ) -> Result> { let left = self.left(); let right = self.right(); @@ -677,8 +669,11 @@ impl ExecutionPlan for GraceHashJoinExec { spill_left_clone, spill_right_clone, partition, - ).await?; - accumulator_clone.report_partition(partition, left_idx.clone(), right_idx.clone()).await; + ) + .await?; + accumulator_clone + .report_partition(partition, left_idx.clone(), right_idx.clone()) + .await; Ok(SpillFut::new(partition, left_idx, right_idx)) }); @@ -710,9 +705,6 @@ impl ExecutionPlan for GraceHashJoinExec { if partition.is_some() { return Ok(Statistics::new_unknown(&self.schema())); } - // TODO stats: it is not possible in general to know the output size of joins - // There are some special cases though, for example: - // - `A LEFT JOIN B ON A.col=B.col` with `COUNT_DISTINCT(B.col)=COUNT(B.col)` let stats = estimate_join_statistics( self.left.partition_statistics(None)?, self.right.partition_statistics(None)?, @@ -895,31 +887,31 @@ pub async fn partition_and_spill( let on_left: Vec<_> = on.iter().map(|(l, _)| Arc::clone(l)).collect(); let on_right: Vec<_> = on.iter().map(|(_, r)| Arc::clone(r)).collect(); - // === LEFT side partitioning === + // LEFT side partitioning let left_index = partition_and_spill_one_side( &mut left_stream, &on_left, &random_state, partition_count, spill_left, + &format!("left_{partition}"), &join_metrics, enable_dynamic_filter_pushdown, - &format!("left_{partition}"), ) - .await?; + .await?; - // === RIGHT side partitioning === + // RIGHT side partitioning let right_index = partition_and_spill_one_side( &mut right_stream, &on_right, &random_state, partition_count, spill_right, + &format!("right_{partition}"), &join_metrics, enable_dynamic_filter_pushdown, - &format!("right_{partition}"), ) - .await?; + .await?; Ok((left_index, right_index)) } @@ -929,23 +921,22 @@ async fn partition_and_spill_one_side( random_state: &RandomState, partition_count: usize, spill_manager: Arc, + spilling_request_msg: &str, join_metrics: &BuildProbeJoinMetrics, - enable_dynamic_filter_pushdown: bool, - file_request_msg: &str, + _enable_dynamic_filter_pushdown: bool, ) -> Result> { let mut partitions: Vec = (0..partition_count) .map(|_| PartitionWriter::new(Arc::clone(&spill_manager))) .collect(); let mut 
buffered_batches = Vec::new(); - let mut total_rows = 0usize; + let schema = input.schema(); while let Some(batch) = input.next().await { let batch = batch?; if batch.num_rows() == 0 { continue; } - total_rows += batch.num_rows(); join_metrics.build_input_batches.add(1); join_metrics.build_input_rows.add(batch.num_rows()); buffered_batches.push(batch); @@ -990,7 +981,8 @@ async fn partition_and_spill_one_side( .collect::>>()?; let part_batch = RecordBatch::try_new(single_batch.schema(), taken)?; - let request_msg = format!("grace_partition_{file_request_msg}_{i}"); + // We need unique name for spilling + let request_msg = format!("grace_partition_{spilling_request_msg}_{i}"); partitions[i].spill_batch_auto(&part_batch, &request_msg)?; } @@ -999,7 +991,7 @@ async fn partition_and_spill_one_side( for (i, writer) in partitions.into_iter().enumerate() { result.push(writer.finish(i)?); } - println!("spill_manager {:?}", spill_manager.metrics); + // println!("spill_manager {:?}", spill_manager.metrics); Ok(result) } @@ -1021,7 +1013,11 @@ impl PartitionWriter { } } - pub fn spill_batch_auto(&mut self, batch: &RecordBatch, request_msg: &str) -> Result<()> { + pub fn spill_batch_auto( + &mut self, + batch: &RecordBatch, + request_msg: &str, + ) -> Result<()> { let loc = self.spill_manager.spill_batch_auto(batch, request_msg)?; self.total_rows += batch.num_rows(); self.total_bytes += get_record_batch_memory_size(batch); @@ -1066,32 +1062,16 @@ pub struct PartitionIndex { #[cfg(test)] mod tests { use super::*; - use crate::coalesce_partitions::CoalescePartitionsExec; - use crate::test::{assert_join_metrics, TestMemoryExec}; + use crate::test::TestMemoryExec; use crate::{ common, expressions::Column, repartition::RepartitionExec, test::build_table_i32, - test::exec::MockExec, }; - use arrow::array::{Date32Array, Int32Array, StructArray, UInt32Array, UInt64Array}; - use arrow::buffer::NullBuffer; - use arrow::datatypes::{DataType, Field}; - use arrow::util::pretty::print_batches; - use arrow_schema::Schema; - use futures::future; - use datafusion_common::hash_utils::create_hashes; - use datafusion_common::test_util::{batches_to_sort_string, batches_to_string}; - use datafusion_common::{ - assert_batches_eq, assert_batches_sorted_eq, assert_contains, exec_err, - ScalarValue, - }; - use datafusion_execution::config::SessionConfig; - use hashbrown::HashTable; - use insta::{allow_duplicates, assert_snapshot}; - use rstest::*; - use rstest_reuse::*; - use datafusion_execution::runtime_env::RuntimeEnvBuilder; use crate::joins::HashJoinExec; + use arrow::array::{ArrayRef, Int32Array}; + use datafusion_execution::runtime_env::RuntimeEnvBuilder; + use datafusion_physical_expr::Partitioning; + use futures::future; fn build_large_table( a_name: &str, @@ -1100,18 +1080,30 @@ mod tests { n: usize, ) -> Arc { let a: ArrayRef = Arc::new(Int32Array::from_iter_values(1..=n as i32)); - let b: ArrayRef = Arc::new(Int32Array::from_iter_values((1..=n as i32).map(|x| x * 2))); - let c: ArrayRef = Arc::new(Int32Array::from_iter_values((1..=n as i32).map(|x| x * 10))); + let b: ArrayRef = + Arc::new(Int32Array::from_iter_values((1..=n as i32).map(|x| x * 2))); + let c: ArrayRef = + Arc::new(Int32Array::from_iter_values((1..=n as i32).map(|x| x * 10))); let schema = Arc::new(arrow::datatypes::Schema::new(vec![ - arrow::datatypes::Field::new(a_name, arrow::datatypes::DataType::Int32, false), - arrow::datatypes::Field::new(b_name, arrow::datatypes::DataType::Int32, false), - arrow::datatypes::Field::new(c_name, 
arrow::datatypes::DataType::Int32, false), + arrow::datatypes::Field::new( + a_name, + arrow::datatypes::DataType::Int32, + false, + ), + arrow::datatypes::Field::new( + b_name, + arrow::datatypes::DataType::Int32, + false, + ), + arrow::datatypes::Field::new( + c_name, + arrow::datatypes::DataType::Int32, + false, + ), ])); let batch = RecordBatch::try_new(Arc::clone(&schema), vec![a, b, c]).unwrap(); - - // MemoryExec требует список партиций: Vec> Arc::new(TestMemoryExec::try_new(&[vec![batch]], schema, None).unwrap()) } @@ -1126,8 +1118,7 @@ mod tests { } #[tokio::test] - async fn single_partition_join_overallocation() -> Result<()> - { + async fn simple_grace_hash_join() -> Result<()> { // let left = build_table( // ("a1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]), // ("b1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]), @@ -1138,27 +1129,28 @@ mod tests { // ("b2", &vec![1, 2]), // ("c2", &vec![14, 15]), // ); - let left = build_large_table("a1", "b1", "c1", 200); - let right = build_large_table("a2", "b2", "c2", 500); + let left = build_large_table("a1", "b1", "c1", 2000000); + let right = build_large_table("a2", "b2", "c2", 5000000); let on = vec![( - Arc::new(Column::new_with_schema("a1", &left.schema()).unwrap()) as _, - Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _, + Arc::new(Column::new_with_schema("a1", &left.schema())?) as _, + Arc::new(Column::new_with_schema("b2", &right.schema())?) as _, )]; - let (left_expr, right_expr): (Vec>, Vec>) = on + let (left_expr, right_expr): ( + Vec>, + Vec>, + ) = on .iter() .map(|(l, r)| (Arc::clone(l), Arc::clone(r))) .unzip(); - let left_repartitioned: Arc = Arc::new(RepartitionExec::try_new( - left, - Partitioning::Hash(left_expr, 32), - )?); - let right_repartitioned: Arc = Arc::new(RepartitionExec::try_new( - right, - Partitioning::Hash(right_expr, 32), - )?); + let left_repartitioned: Arc = Arc::new( + RepartitionExec::try_new(left, Partitioning::Hash(left_expr, 32))?, + ); + let right_repartitioned: Arc = Arc::new( + RepartitionExec::try_new(right, Partitioning::Hash(right_expr, 32))?, + ); let runtime = RuntimeEnvBuilder::new() - .with_memory_limit(50_000_000_000, 1.0) + .with_memory_limit(500_000_000, 1.0) .build_arc()?; let task_ctx = TaskContext::default().with_runtime(runtime); let task_ctx = Arc::new(task_ctx); @@ -1174,8 +1166,6 @@ mod tests { )?; let partition_count = right_repartitioned.output_partitioning().partition_count(); - println!("partition_count {partition_count}"); - let tasks: Vec<_> = (0..partition_count) .map(|i| { let ctx = Arc::clone(&task_ctx); @@ -1191,8 +1181,10 @@ mod tests { v.retain(|b| b.num_rows() > 0); batches.extend(v); } + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + println!("TOTAL ROWS = {}", total_rows); - print_batches(&*batches).unwrap(); + // print_batches(&*batches).unwrap(); // Asserting that operator-level reservation attempting to overallocate // assert_contains!( // err.to_string(), @@ -1207,26 +1199,27 @@ mod tests { } #[tokio::test] - async fn single_partition_join_overallocation_f() -> Result<()> { - - let left = build_large_table("a1", "b1", "c1", 200); - let right = build_large_table("a2", "b2", "c2", 500); + async fn simple_hash_join() -> Result<()> { + let left = build_large_table("a1", "b1", "c1", 2000000); + let right = build_large_table("a2", "b2", "c2", 5000000); let on = vec![( - Arc::new(Column::new_with_schema("a1", &left.schema()).unwrap()) as _, - Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _, + 
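            // Join keys: left `a1` matched against right `b2`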
Arc::new(Column::new_with_schema("a1", &left.schema())?) as _, + Arc::new(Column::new_with_schema("b2", &right.schema())?) as _, )]; - let (left_expr, right_expr): (Vec>, Vec>) = on + let (left_expr, right_expr): ( + Vec>, + Vec>, + ) = on .iter() .map(|(l, r)| (Arc::clone(l), Arc::clone(r))) .unzip(); - let left_repartitioned: Arc = Arc::new(RepartitionExec::try_new( - left, - Partitioning::Hash(left_expr, 32), - )?); - let right_repartitioned: Arc = Arc::new(RepartitionExec::try_new( - right, - Partitioning::Hash(right_expr, 32), - )?); + let left_repartitioned: Arc = Arc::new( + RepartitionExec::try_new(left, Partitioning::Hash(left_expr, 32))?, + ); + let right_repartitioned: Arc = Arc::new( + RepartitionExec::try_new(right, Partitioning::Hash(right_expr, 32))?, + ); + let partition_count = left_repartitioned.output_partitioning().partition_count(); let join = HashJoinExec::try_new( left_repartitioned, @@ -1241,7 +1234,7 @@ mod tests { let task_ctx = Arc::new(TaskContext::default()); let mut batches = vec![]; - for i in 0..32 { + for i in 0..partition_count { let stream = join.execute(i, Arc::clone(&task_ctx))?; let more_batches = common::collect(stream).await?; batches.extend( @@ -1251,8 +1244,10 @@ mod tests { .collect::>(), ); } - print_batches(&*batches).unwrap(); + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + println!("TOTAL ROWS = {}", total_rows); + + // print_batches(&*batches).unwrap(); Ok(()) } - } diff --git a/datafusion/physical-plan/src/joins/grace_hash_join/stream.rs b/datafusion/physical-plan/src/joins/grace_hash_join/stream.rs index 97e48fc81a36..a6afdef1029d 100644 --- a/datafusion/physical-plan/src/joins/grace_hash_join/stream.rs +++ b/datafusion/physical-plan/src/joins/grace_hash_join/stream.rs @@ -24,33 +24,24 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; -use crate::joins::utils::{ - equal_rows_arr, get_final_indices_from_shared_bitmap, OnceFut, +use crate::joins::utils::OnceFut; +use crate::{ + joins::utils::{BuildProbeJoinMetrics, ColumnIndex, JoinFilter}, + ExecutionPlan, RecordBatchStream, SendableRecordBatchStream, SpillManager, }; -use crate::{handle_state, hash_utils::create_hashes, joins::join_hash_map::JoinHashMapOffset, joins::utils::{ - adjust_indices_by_join_type, apply_join_filter_to_indices, - build_batch_empty_build_side, build_batch_from_indices, - need_produce_result_in_final, BuildProbeJoinMetrics, ColumnIndex, JoinFilter, - JoinHashMapType, StatefulStreamResult, -}, ExecutionPlan, RecordBatchStream, SendableRecordBatchStream, SpillManager}; - -use arrow::array::{ArrayRef, UInt32Array, UInt64Array}; -use arrow::datatypes::{Schema, SchemaRef}; -use arrow::record_batch::RecordBatch; -use datafusion_common::{ - internal_datafusion_err, internal_err, JoinSide, JoinType, NullEquality, Result, -}; -use datafusion_physical_expr::PhysicalExprRef; -use ahash::RandomState; -use futures::{ready, FutureExt, Stream, StreamExt}; -use tokio::sync::Mutex; -use datafusion_execution::TaskContext; + use crate::empty::EmptyExec; use crate::joins::grace_hash_join::exec::PartitionIndex; use crate::joins::{HashJoinExec, PartitionMode}; -use crate::memory::MemoryStream; -use crate::stream::RecordBatchStreamAdapter; use crate::test::TestMemoryExec; +use ahash::RandomState; +use arrow::datatypes::{Schema, SchemaRef}; +use arrow::record_batch::RecordBatch; +use datafusion_common::{JoinType, NullEquality, Result}; +use datafusion_execution::TaskContext; +use datafusion_physical_expr::PhysicalExprRef; +use futures::{ready, 
Stream, StreamExt}; +use tokio::sync::Mutex; enum GraceJoinState { /// Waiting for the partitioning phase (Phase 1) to finish @@ -72,19 +63,6 @@ enum GraceJoinState { Done, } -/// Container for HashJoinStreamState::ProcessProbeBatch related data -#[derive(Debug, Clone)] -pub(super) struct ProcessProbeBatchState { - /// Current probe-side batch - batch: RecordBatch, - /// Probe-side on expressions values - values: Vec, - /// Starting offset for JoinHashMap lookups - offset: JoinHashMapOffset, - /// Max joined probe-side index from current batch - joined_probe_idx: Option, -} - pub struct GraceHashJoinStream { schema: SchemaRef, spill_fut: OnceFut, @@ -105,10 +83,14 @@ pub struct GraceHashJoinStream { pub struct SpillFut { partition: usize, left: Vec, - right: Vec + right: Vec, } impl SpillFut { - pub(crate) fn new(partition: usize, left: Vec, right: Vec) -> Self { + pub(crate) fn new( + partition: usize, + left: Vec, + right: Vec, + ) -> Self { SpillFut { partition, left, @@ -156,7 +138,10 @@ impl GraceHashJoinStream { } /// Core state machine logic (poll implementation) - fn poll_next_impl(&mut self, cx: &mut Context<'_>) -> Poll>> { + fn poll_next_impl( + &mut self, + cx: &mut Context<'_>, + ) -> Poll>> { loop { match &mut self.state { GraceJoinState::WaitPartitioning => { @@ -165,6 +150,7 @@ impl GraceHashJoinStream { let acc = Arc::clone(&self.accumulator); let left = shared.left.clone(); let right = shared.right.clone(); + // Use 0 partition as the main let wait_all_fut = if shared.partition == 0 { OnceFut::new(async move { acc.report_partition(shared.partition, left, right).await; @@ -178,7 +164,9 @@ impl GraceHashJoinStream { Ok(vec![]) }) }; - self.state = GraceJoinState::WaitAllPartitions { wait_all_fut: Some(wait_all_fut) }; + self.state = GraceJoinState::WaitAllPartitions { + wait_all_fut: Some(wait_all_fut), + }; continue; } GraceJoinState::WaitAllPartitions { wait_all_fut } => { @@ -215,12 +203,21 @@ impl GraceHashJoinStream { if current_stream.is_none() { if left_fut.is_none() && right_fut.is_none() { let spill_fut = &all_parts[*current]; - *left_fut = Some(load_partition_async(Arc::clone(&self.spill_left), spill_fut.left.clone())); - *right_fut = Some(load_partition_async(Arc::clone(&self.spill_right), spill_fut.right.clone())); + *left_fut = Some(load_partition_async( + Arc::clone(&self.spill_left), + spill_fut.left.clone(), + )); + *right_fut = Some(load_partition_async( + Arc::clone(&self.spill_right), + spill_fut.right.clone(), + )); } - let left_batches = (*ready!(left_fut.as_mut().unwrap().get_shared(cx))?).clone(); - let right_batches = (*ready!(right_fut.as_mut().unwrap().get_shared(cx))?).clone(); + let left_batches = + (*ready!(left_fut.as_mut().unwrap().get_shared(cx))?).clone(); + let right_batches = + (*ready!(right_fut.as_mut().unwrap().get_shared(cx))?) 
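+                        // the shared future yields an Arc'd Vec<RecordBatch>;
+                        // clone out an owned copy for this partition's join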
+ .clone(); let stream = build_in_memory_join_stream( Arc::clone(&self.schema), @@ -265,7 +262,7 @@ fn load_partition_async( ) -> OnceFut> { OnceFut::new(async move { let mut all_batches = Vec::new(); - println!("partitions {:?}", partitions); + for p in partitions { for chunk in p.chunks { let mut reader = spill_manager.load_spilled_batch(&chunk)?; @@ -299,18 +296,21 @@ fn build_in_memory_join_stream( let left_schema = left_batches .first() .map(|b| b.schema()) - .unwrap_or_else(|| Arc::new(arrow_schema::Schema::empty())); + .unwrap_or_else(|| Arc::new(Schema::empty())); let right_schema = right_batches .first() .map(|b| b.schema()) - .unwrap_or_else(|| Arc::new(arrow_schema::Schema::empty())); + .unwrap_or_else(|| Arc::new(Schema::empty())); // Build memory execution nodes for each side let left_plan: Arc = Arc::new(TestMemoryExec::try_new(&[left_batches], left_schema, None)?); - let right_plan: Arc = - Arc::new(TestMemoryExec::try_new(&[right_batches], right_schema, None)?); + let right_plan: Arc = Arc::new(TestMemoryExec::try_new( + &[right_batches], + right_schema, + None, + )?); // Combine join expressions into pairs let on: Vec<(PhysicalExprRef, PhysicalExprRef)> = on_left @@ -380,9 +380,7 @@ impl GraceAccumulator { } } - pub async fn wait_all( - &self, - ) -> Vec { + pub async fn wait_all(&self) -> Vec { loop { { let guard = self.collected.lock().await; @@ -404,4 +402,4 @@ impl GraceAccumulator { self.notify.notified().await; } } -} \ No newline at end of file +} diff --git a/datafusion/physical-plan/src/joins/mod.rs b/datafusion/physical-plan/src/joins/mod.rs index 14429ec55182..8ee4c3de430a 100644 --- a/datafusion/physical-plan/src/joins/mod.rs +++ b/datafusion/physical-plan/src/joins/mod.rs @@ -27,8 +27,8 @@ use parking_lot::Mutex; pub use sort_merge_join::SortMergeJoinExec; pub use symmetric_hash_join::SymmetricHashJoinExec; mod cross_join; -mod hash_join; mod grace_hash_join; +mod hash_join; mod nested_loop_join; mod sort_merge_join; diff --git a/datafusion/physical-plan/src/spill/mod.rs b/datafusion/physical-plan/src/spill/mod.rs index 9d19e72cb379..782100e6d4cf 100644 --- a/datafusion/physical-plan/src/spill/mod.rs +++ b/datafusion/physical-plan/src/spill/mod.rs @@ -17,9 +17,9 @@ //! 
Defines the spilling functions +pub(crate) mod in_memory_spill_buffer; pub(crate) mod in_progress_spill_file; pub(crate) mod spill_manager; -pub(crate) mod in_memory_spill_buffer; use std::fs::File; use std::io::BufReader; @@ -387,8 +387,8 @@ mod tests { use datafusion_execution::runtime_env::{RuntimeEnv, RuntimeEnvBuilder}; use futures::StreamExt as _; - use std::sync::Arc; use datafusion_execution::memory_pool::{FairSpillPool, MemoryPool}; + use std::sync::Arc; #[tokio::test] async fn test_batch_spill_and_read() -> Result<()> { @@ -456,8 +456,8 @@ mod tests { // --- create small memory pool (simulate memory pressure) --- let memory_limit_bytes = 20 * 1024; // 20KB - let memory_pool: Arc = Arc::new(FairSpillPool::new(memory_limit_bytes)); - + let memory_pool: Arc = + Arc::new(FairSpillPool::new(memory_limit_bytes)); // Construct SpillManager let env = RuntimeEnvBuilder::new() @@ -469,8 +469,14 @@ mod tests { let results = spill_manager.spill_batches_auto(&batches, "TestAutoSpill")?; assert_eq!(results.len(), 2); - let mem_count = results.iter().filter(|r| matches!(r, SpillLocation::Memory(_))).count(); - let disk_count = results.iter().filter(|r| matches!(r, SpillLocation::Disk(_))).count(); + let mem_count = results + .iter() + .filter(|r| matches!(r, SpillLocation::Memory(_))) + .count(); + let disk_count = results + .iter() + .filter(|r| matches!(r, SpillLocation::Disk(_))) + .count(); assert!(mem_count >= 1); assert!(disk_count >= 1); diff --git a/datafusion/physical-plan/src/spill/spill_manager.rs b/datafusion/physical-plan/src/spill/spill_manager.rs index 2c4d6b7844cd..10f90c6cb349 100644 --- a/datafusion/physical-plan/src/spill/spill_manager.rs +++ b/datafusion/physical-plan/src/spill/spill_manager.rs @@ -179,7 +179,7 @@ impl SpillManager { )); }; Ok(SpillLocation::Disk(Arc::new(file))) - // + // // // let size = batch.get_sliced_size()?; // // // Check current memory usage and total limit from the runtime memory pool @@ -188,9 +188,9 @@ impl SpillManager { // datafusion_execution::memory_pool::MemoryLimit::Finite(l) => l, // _ => usize::MAX, // }; - // + // println!("size {size} used {used}"); // // If there's enough memory (with a small safety margin), keep it in memory - // if used + size * 3 / 2 <= limit { + // if used + size * 3 * 64 / 2 <= limit { // let buf = Arc::new(InMemorySpillBuffer::from_batch(batch)?); // self.metrics.spilled_bytes.add(size); // self.metrics.spilled_rows.add(batch.num_rows()); From 4b0a66ba9f39c3307dd0d4adc7b6eb617fab104b Mon Sep 17 00:00:00 2001 From: osipovartem Date: Mon, 10 Nov 2025 12:58:02 +0300 Subject: [PATCH 7/7] Pass projection and filter --- .../src/joins/grace_hash_join/exec.rs | 22 ++----- .../src/joins/grace_hash_join/stream.rs | 24 ++++---- .../physical-plan/src/spill/spill_manager.rs | 60 +++++++++---------- 3 files changed, 48 insertions(+), 58 deletions(-) diff --git a/datafusion/physical-plan/src/joins/grace_hash_join/exec.rs b/datafusion/physical-plan/src/joins/grace_hash_join/exec.rs index 0782d9c84fc1..2c9482f93f89 100644 --- a/datafusion/physical-plan/src/joins/grace_hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/grace_hash_join/exec.rs @@ -684,7 +684,8 @@ impl ExecutionPlan for GraceHashJoinExec { spill_right, on_left, on_right, - self.random_state.clone(), + self.projection.clone(), + self.filter.clone(), self.join_type, column_indices_after_projection, join_metrics, @@ -854,24 +855,8 @@ impl ExecutionPlan for GraceHashJoinExec { } } -/// Accumulator for collecting min/max bounds from build-side data during hash 
join. -/// -/// This struct encapsulates the logic for progressively computing column bounds -/// (minimum and maximum values) for a specific join key expression as batches -/// are processed during the build phase of a hash join. -/// -/// The bounds are used for dynamic filter pushdown optimization, where filters -/// based on the actual data ranges can be pushed down to the probe side to -/// eliminate unnecessary data early. -struct CollectLeftAccumulator { - /// The physical expression to evaluate for each batch - expr: Arc, - /// Accumulator for tracking the minimum value across all batches - min: MinAccumulator, - /// Accumulator for tracking the maximum value across all batches - max: MaxAccumulator, -} +#[allow(clippy::too_many_arguments)] pub async fn partition_and_spill( random_state: RandomState, on: Vec<(PhysicalExprRef, PhysicalExprRef)>, @@ -915,6 +900,7 @@ pub async fn partition_and_spill( Ok((left_index, right_index)) } +#[allow(clippy::too_many_arguments)] async fn partition_and_spill_one_side( input: &mut SendableRecordBatchStream, on_exprs: &[PhysicalExprRef], diff --git a/datafusion/physical-plan/src/joins/grace_hash_join/stream.rs b/datafusion/physical-plan/src/joins/grace_hash_join/stream.rs index a6afdef1029d..d028b0e8bcf8 100644 --- a/datafusion/physical-plan/src/joins/grace_hash_join/stream.rs +++ b/datafusion/physical-plan/src/joins/grace_hash_join/stream.rs @@ -34,7 +34,6 @@ use crate::empty::EmptyExec; use crate::joins::grace_hash_join::exec::PartitionIndex; use crate::joins::{HashJoinExec, PartitionMode}; use crate::test::TestMemoryExec; -use ahash::RandomState; use arrow::datatypes::{Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use datafusion_common::{JoinType, NullEquality, Result}; @@ -70,7 +69,8 @@ pub struct GraceHashJoinStream { spill_right: Arc, on_left: Vec, on_right: Vec, - random_state: RandomState, + projection: Option>, + filter: Option, join_type: JoinType, column_indices: Vec, join_metrics: Arc, @@ -113,7 +113,8 @@ impl GraceHashJoinStream { spill_right: Arc, on_left: Vec, on_right: Vec, - random_state: RandomState, + projection: Option>, + filter: Option, join_type: JoinType, column_indices: Vec, join_metrics: Arc, @@ -127,7 +128,8 @@ impl GraceHashJoinStream { spill_right, on_left, on_right, - random_state, + projection, + filter, join_type, column_indices, join_metrics, @@ -225,7 +227,8 @@ impl GraceHashJoinStream { right_batches, &self.on_left, &self.on_right, - self.random_state.clone(), + self.projection.clone(), + self.filter.clone(), self.join_type, &self.column_indices, &self.join_metrics, @@ -283,10 +286,11 @@ fn build_in_memory_join_stream( right_batches: Vec, on_left: &[PhysicalExprRef], on_right: &[PhysicalExprRef], - random_state: RandomState, + projection: Option>, + filter: Option, join_type: JoinType, - column_indices: &[ColumnIndex], - join_metrics: &BuildProbeJoinMetrics, + _column_indices: &[ColumnIndex], + _join_metrics: &BuildProbeJoinMetrics, context: &Arc, ) -> Result { if left_batches.is_empty() && right_batches.is_empty() { @@ -324,9 +328,9 @@ fn build_in_memory_join_stream( left_plan, right_plan, on, - None::, + filter, &join_type, - None, + projection, PartitionMode::CollectLeft, NullEquality::NullEqualsNothing, )?; diff --git a/datafusion/physical-plan/src/spill/spill_manager.rs b/datafusion/physical-plan/src/spill/spill_manager.rs index 10f90c6cb349..d3c38aba4598 100644 --- a/datafusion/physical-plan/src/spill/spill_manager.rs +++ b/datafusion/physical-plan/src/spill/spill_manager.rs @@ -173,37 +173,37 
@@ impl SpillManager { /// Automatically decides whether to spill the given RecordBatch to memory or disk, /// depending on available memory pool capacity. pub(crate) fn spill_batch_auto(&self, batch: &RecordBatch, request_msg: &str) -> Result { - let Some(file) = self.spill_record_batch_and_finish(slice::from_ref(batch), request_msg)? else { - return Err(DataFusionError::Execution( - "failed to spill batch to disk".into(), - )); - }; - Ok(SpillLocation::Disk(Arc::new(file))) - // // - // let size = batch.get_sliced_size()?; - // - // // Check current memory usage and total limit from the runtime memory pool - // let used = self.env.memory_pool.reserved(); - // let limit = match self.env.memory_pool.memory_limit() { - // datafusion_execution::memory_pool::MemoryLimit::Finite(l) => l, - // _ => usize::MAX, + // let Some(file) = self.spill_record_batch_and_finish(slice::from_ref(batch), request_msg)? else { + // return Err(DataFusionError::Execution( + // "failed to spill batch to disk".into(), + // )); // }; - // println!("size {size} used {used}"); - // // If there's enough memory (with a small safety margin), keep it in memory - // if used + size * 3 * 64 / 2 <= limit { - // let buf = Arc::new(InMemorySpillBuffer::from_batch(batch)?); - // self.metrics.spilled_bytes.add(size); - // self.metrics.spilled_rows.add(batch.num_rows()); - // Ok(SpillLocation::Memory(buf)) - // } else { - // // Otherwise spill to disk using the existing SpillManager logic - // let Some(file) = self.spill_record_batch_and_finish(slice::from_ref(batch), request_msg)? else { - // return Err(DataFusionError::Execution( - // "failed to spill batch to disk".into(), - // )); - // }; - // Ok(SpillLocation::Disk(Arc::new(file))) - // } + // Ok(SpillLocation::Disk(Arc::new(file))) + // // + let size = batch.get_sliced_size()?; + + // Check current memory usage and total limit from the runtime memory pool + let used = self.env.memory_pool.reserved(); + let limit = match self.env.memory_pool.memory_limit() { + datafusion_execution::memory_pool::MemoryLimit::Finite(l) => l, + _ => usize::MAX, + }; + + // If there's enough memory (with a safety margin), keep it in memory + if used + size * 3 / 2 <= limit { + let buf = Arc::new(InMemorySpillBuffer::from_batch(batch)?); + self.metrics.spilled_bytes.add(size); + self.metrics.spilled_rows.add(batch.num_rows()); + Ok(SpillLocation::Memory(buf)) + } else { + // Otherwise spill to disk using the existing SpillManager logic + let Some(file) = self.spill_record_batch_and_finish(slice::from_ref(batch), request_msg)? else { + return Err(DataFusionError::Execution( + "failed to spill batch to disk".into(), + )); + }; + Ok(SpillLocation::Disk(Arc::new(file))) + } } pub fn spill_batches_auto(