From a55b637de506c7cf3356d3bd4b3a0dcb6d9e1d46 Mon Sep 17 00:00:00 2001 From: nextn Date: Fri, 30 Jan 2026 19:47:02 +0630 Subject: [PATCH] workflow: avoid duplicate fan-in execution from concurrent predecessors --- apalis-workflow/src/dag/service.rs | 56 ++++++++++++++++++++++++++++++ 1 file changed, 56 insertions(+) diff --git a/apalis-workflow/src/dag/service.rs b/apalis-workflow/src/dag/service.rs index 43014d10..e8ae088e 100644 --- a/apalis-workflow/src/dag/service.rs +++ b/apalis-workflow/src/dag/service.rs @@ -21,6 +21,20 @@ use crate::dag::context::DagFlowContext; use crate::dag::response::DagExecutionResponse; use crate::id_generator::GenerateId; +/// Determine if the previous node is the designated predecessor in a fan-in scenario +fn is_designated_fan_in_predecessor( + incoming_nodes: &[NodeIndex], + prev_node: Option, +) -> bool { + if incoming_nodes.is_empty() { + return false; + } + + let designated_parent = incoming_nodes.iter().min_by_key(|n| n.index()).copied(); + + prev_node == designated_parent +} + /// Service that manages the execution of a DAG workflow pub struct RootDagService where @@ -148,6 +162,13 @@ where .await?; // TODO(bug): The check of done is not a good one as it can be called more than once if the jobs a too quickly executed if results.iter().all(|s| matches!(s.status, Status::Done)) { + // ===== FIX START ===== + if !is_designated_fan_in_predecessor(&incoming_nodes, context.prev_node) { + return Ok(DagExecutionResponse::WaitingForDependencies { + pending_dependencies: dependency_task_ids, + }); + } + // ===== FIX END ===== let sorted_results = { // Match the order of incoming_nodes by matching NodeIndex let res = incoming_nodes @@ -409,3 +430,38 @@ where futures::future::try_join_all(enqueue_futures).await?; Ok(next_nodes) } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn designated_predecessor_is_smallest_nodeindex() { + let a = NodeIndex::new(1); + let b = NodeIndex::new(5); + let incoming = vec![b, a]; // unordered input + + assert!(is_designated_fan_in_predecessor(&incoming, Some(a))); + assert!(!is_designated_fan_in_predecessor(&incoming, Some(b))); + } + + #[test] + fn no_prev_node_is_never_designated() { + let a = NodeIndex::new(1); + let b = NodeIndex::new(2); + let incoming = vec![a, b]; + + assert!(!is_designated_fan_in_predecessor(&incoming, None)); + } + + #[test] + fn empty_incoming_has_no_designated() { + let incoming: Vec = vec![]; + + assert!(!is_designated_fan_in_predecessor( + &incoming, + Some(NodeIndex::new(0)) + )); + assert!(!is_designated_fan_in_predecessor(&incoming, None)); + } +}