Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions apalis-workflow/src/dag/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,20 @@ use crate::dag::context::DagFlowContext;
use crate::dag::response::DagExecutionResponse;
use crate::id_generator::GenerateId;

/// Determine if the previous node is the designated predecessor in a fan-in scenario
fn is_designated_fan_in_predecessor(
incoming_nodes: &[NodeIndex],
prev_node: Option<NodeIndex>,
) -> bool {
if incoming_nodes.is_empty() {
return false;
}

let designated_parent = incoming_nodes.iter().min_by_key(|n| n.index()).copied();

prev_node == designated_parent
}

/// Service that manages the execution of a DAG workflow
pub struct RootDagService<B>
where
Expand Down Expand Up @@ -148,6 +162,13 @@ where
.await?;
// TODO(bug): The check of done is not a good one as it can be called more than once if the jobs a too quickly executed
if results.iter().all(|s| matches!(s.status, Status::Done)) {
// ===== FIX START =====
if !is_designated_fan_in_predecessor(&incoming_nodes, context.prev_node) {
return Ok(DagExecutionResponse::WaitingForDependencies {
pending_dependencies: dependency_task_ids,
});
}
// ===== FIX END =====
let sorted_results = {
// Match the order of incoming_nodes by matching NodeIndex
let res = incoming_nodes
Expand Down Expand Up @@ -409,3 +430,38 @@ where
futures::future::try_join_all(enqueue_futures).await?;
Ok(next_nodes)
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn designated_predecessor_is_smallest_nodeindex() {
let a = NodeIndex::new(1);
let b = NodeIndex::new(5);
let incoming = vec![b, a]; // unordered input

assert!(is_designated_fan_in_predecessor(&incoming, Some(a)));
assert!(!is_designated_fan_in_predecessor(&incoming, Some(b)));
}

#[test]
fn no_prev_node_is_never_designated() {
let a = NodeIndex::new(1);
let b = NodeIndex::new(2);
let incoming = vec![a, b];

assert!(!is_designated_fan_in_predecessor(&incoming, None));
}

#[test]
fn empty_incoming_has_no_designated() {
let incoming: Vec<NodeIndex> = vec![];

assert!(!is_designated_fan_in_predecessor(
&incoming,
Some(NodeIndex::new(0))
));
assert!(!is_designated_fan_in_predecessor(&incoming, None));
}
}