Skip to content

Conversation

@codeflash-ai
Copy link

@codeflash-ai codeflash-ai bot commented Jan 7, 2026

📄 5,297% (52.97x) speedup for find_last_node in src/algorithms/graph.py

⏱️ Runtime : 5.81 milliseconds 108 microseconds (best of 250 runs)

📝 Explanation and details

The optimization achieves a 53x speedup by eliminating nested iteration through a classic algorithmic improvement: replacing O(n×m) complexity with O(n+m) complexity.

What changed:
The original code checks every edge for every node using nested iteration: all(e["source"] != n["id"] for e in edges). The optimized version pre-computes a set of all source node IDs once: sources = {e["source"] for e in edges}, then performs fast O(1) membership lookups: n["id"] not in sources.

Why this is faster:

  1. Set lookup vs. linear scan: Python set membership (in) is O(1) average case using hash tables, while the all() check iterates through all edges for each node, resulting in O(m) per node check
  2. Single pass vs. repeated iteration: The set is built once in O(m) time, then reused for all n nodes. The original code iterates through all m edges for each of the n nodes
  3. Algorithmic complexity reduction: Total complexity drops from O(n×m) to O(n+m), which is dramatically faster as graph size grows

Performance characteristics by test case:

  • Small graphs (2-5 nodes/edges): 26-100% faster - overhead of set creation is negligible
  • Medium graphs (100 nodes/edges): 97-3193% faster - set lookup advantage becomes clear
  • Large graphs (500+ nodes/edges): 418-16229% faster - the O(n×m) vs O(n+m) difference dominates
  • Dense graphs (many edges, few nodes): 112-6874% faster - particularly benefits from avoiding repeated edge iteration

The line profiler confirms this: the original code spent 100% of time (46.7ms) in the nested iteration, while the optimized version spends only 43.8% (149μs) building the set and 56.2% (191μs) doing lookups - a total of 340μs vs 46.7ms.

Impact considerations:
Without function_references, we cannot determine if this function is in a hot path, but given it processes graph structures (potentially in flow/workflow systems based on "flow" in the docstring), any system repeatedly querying for terminal nodes would benefit significantly, especially as graph size scales.

Correctness verification report:

Test Status
⚙️ Existing Unit Tests 🔘 None Found
🌀 Generated Regression Tests 35 Passed
⏪ Replay Tests 🔘 None Found
🔎 Concolic Coverage Tests 🔘 None Found
📊 Tests Coverage 100.0%
🌀 Click to see Generated Regression Tests
import pytest  # used for our unit tests
from src.algorithms.graph import find_last_node

# unit tests


class TestFindLastNodeBasic:
    """Basic test cases for normal operation"""

    def test_single_node_no_edges(self):
        # Single node with no edges should be the last node
        nodes = [{"id": "A", "name": "Node A"}]
        edges = []
        codeflash_output = find_last_node(nodes, edges)
        result = codeflash_output  # 1.21μs -> 958ns (26.2% faster)

    def test_linear_chain_two_nodes(self):
        # In A→B, B should be the last node
        nodes = [{"id": "A"}, {"id": "B"}]
        edges = [{"source": "A", "target": "B"}]
        codeflash_output = find_last_node(nodes, edges)
        result = codeflash_output  # 1.92μs -> 1.12μs (70.4% faster)

    def test_linear_chain_three_nodes(self):
        # In A→B→C, C should be the last node
        nodes = [{"id": "A"}, {"id": "B"}, {"id": "C"}]
        edges = [{"source": "A", "target": "B"}, {"source": "B", "target": "C"}]
        codeflash_output = find_last_node(nodes, edges)
        result = codeflash_output  # 2.33μs -> 1.12μs (107% faster)

    def test_last_node_with_additional_properties(self):
        # Last node should be returned with all its properties intact
        nodes = [{"id": 1, "type": "start"}, {"id": 2, "type": "end", "extra": "data"}]
        edges = [{"source": 1, "target": 2}]
        codeflash_output = find_last_node(nodes, edges)
        result = codeflash_output  # 1.75μs -> 1.17μs (50.0% faster)

    def test_last_node_appears_first_in_list(self):
        # Last node appearing first in nodes list should still be found
        nodes = [{"id": "Z"}, {"id": "A"}, {"id": "B"}]
        edges = [{"source": "A", "target": "B"}, {"source": "B", "target": "Z"}]
        codeflash_output = find_last_node(nodes, edges)
        result = codeflash_output  # 1.58μs -> 1.08μs (46.2% faster)


class TestFindLastNodeEdgeCases:
    """Edge cases and boundary conditions"""

    def test_empty_nodes_list(self):
        # Empty nodes list should return None
        nodes = []
        edges = []
        codeflash_output = find_last_node(nodes, edges)
        result = codeflash_output  # 750ns -> 833ns (9.96% slower)

    def test_empty_edges_all_nodes_are_last(self):
        # With no edges, first node should be returned as "last"
        nodes = [{"id": "A"}, {"id": "B"}, {"id": "C"}]
        edges = []
        codeflash_output = find_last_node(nodes, edges)
        result = codeflash_output  # 1.21μs -> 958ns (26.1% faster)

    def test_all_nodes_have_outgoing_edges_cycle(self):
        # Cycle A→B→A means no node is "last"
        nodes = [{"id": "A"}, {"id": "B"}]
        edges = [{"source": "A", "target": "B"}, {"source": "B", "target": "A"}]
        codeflash_output = find_last_node(nodes, edges)
        result = codeflash_output  # 1.92μs -> 1.21μs (58.7% faster)

    def test_multiple_valid_last_nodes_returns_first(self):
        # When multiple nodes have no outgoing edges, return first found
        nodes = [{"id": "A"}, {"id": "B"}, {"id": "C"}]
        edges = [{"source": "A", "target": "B"}]
        # Both B and C have no outgoing edges; B appears first
        codeflash_output = find_last_node(nodes, edges)
        result = codeflash_output  # 1.83μs -> 1.12μs (62.9% faster)

    def test_string_node_ids(self):
        # Node IDs as strings should work correctly
        nodes = [{"id": "start"}, {"id": "middle"}, {"id": "end"}]
        edges = [
            {"source": "start", "target": "middle"},
            {"source": "middle", "target": "end"},
        ]
        codeflash_output = find_last_node(nodes, edges)
        result = codeflash_output  # 2.33μs -> 1.17μs (99.9% faster)

    def test_integer_node_ids(self):
        # Node IDs as integers should work correctly
        nodes = [{"id": 1}, {"id": 2}, {"id": 3}]
        edges = [{"source": 1, "target": 2}, {"source": 2, "target": 3}]
        codeflash_output = find_last_node(nodes, edges)
        result = codeflash_output  # 2.21μs -> 1.25μs (76.6% faster)

    def test_mixed_type_node_ids(self):
        # Mix of string and integer IDs
        nodes = [{"id": "A"}, {"id": 1}, {"id": "B"}]
        edges = [{"source": "A", "target": 1}, {"source": 1, "target": "B"}]
        codeflash_output = find_last_node(nodes, edges)
        result = codeflash_output  # 2.38μs -> 1.25μs (90.0% faster)

    def test_edges_reference_nonexistent_nodes(self):
        # Edges pointing to nodes not in the list
        nodes = [{"id": "A"}, {"id": "B"}]
        edges = [
            {"source": "A", "target": "C"},  # C doesn't exist
            {"source": "X", "target": "Y"},  # Neither exists
        ]
        # A has outgoing edge, B doesn't
        codeflash_output = find_last_node(nodes, edges)
        result = codeflash_output  # 1.92μs -> 1.17μs (64.3% faster)

    def test_self_loop(self):
        # Node with self-loop should not be considered last
        nodes = [{"id": "A"}, {"id": "B"}]
        edges = [
            {"source": "A", "target": "B"},
            {"source": "B", "target": "B"},  # Self-loop
        ]
        # B has outgoing edge to itself, so it's not last
        codeflash_output = find_last_node(nodes, edges)
        result = codeflash_output  # 1.92μs -> 1.17μs (64.3% faster)

    def test_node_with_multiple_outgoing_edges(self):
        # Node with multiple outgoing edges is not last
        nodes = [{"id": "A"}, {"id": "B"}, {"id": "C"}]
        edges = [{"source": "A", "target": "B"}, {"source": "A", "target": "C"}]
        # B and C have no outgoing edges; B is first
        codeflash_output = find_last_node(nodes, edges)
        result = codeflash_output  # 1.88μs -> 1.21μs (55.2% faster)


class TestFindLastNodeComplexStructures:
    """Complex graph structures"""

    def test_tree_with_multiple_leaves(self):
        # Tree structure where multiple leaves exist
        #     A
        #    / \
        #   B   C
        #  / \
        # D   E
        nodes = [{"id": "A"}, {"id": "B"}, {"id": "C"}, {"id": "D"}, {"id": "E"}]
        edges = [
            {"source": "A", "target": "B"},
            {"source": "A", "target": "C"},
            {"source": "B", "target": "D"},
            {"source": "B", "target": "E"},
        ]
        # C, D, E are all leaves; C appears first in nodes list
        codeflash_output = find_last_node(nodes, edges)
        result = codeflash_output  # 2.50μs -> 1.25μs (100% faster)

    def test_diamond_pattern(self):
        # Diamond: A→B, A→C, B→D, C→D
        nodes = [{"id": "A"}, {"id": "B"}, {"id": "C"}, {"id": "D"}]
        edges = [
            {"source": "A", "target": "B"},
            {"source": "A", "target": "C"},
            {"source": "B", "target": "D"},
            {"source": "C", "target": "D"},
        ]
        # Only D has no outgoing edges
        codeflash_output = find_last_node(nodes, edges)
        result = codeflash_output  # 2.83μs -> 1.29μs (119% faster)

    def test_disconnected_components(self):
        # Two separate chains: A→B and C→D
        nodes = [{"id": "A"}, {"id": "B"}, {"id": "C"}, {"id": "D"}]
        edges = [{"source": "A", "target": "B"}, {"source": "C", "target": "D"}]
        # B and D have no outgoing edges; B appears first
        codeflash_output = find_last_node(nodes, edges)
        result = codeflash_output  # 2.00μs -> 1.17μs (71.4% faster)

    def test_graph_with_cycle_and_sink(self):
        # A→B→C→B (cycle) and A→D (sink)
        nodes = [{"id": "A"}, {"id": "B"}, {"id": "C"}, {"id": "D"}]
        edges = [
            {"source": "A", "target": "B"},
            {"source": "B", "target": "C"},
            {"source": "C", "target": "B"},
            {"source": "A", "target": "D"},
        ]
        # Only D has no outgoing edges
        codeflash_output = find_last_node(nodes, edges)
        result = codeflash_output  # 2.67μs -> 1.29μs (106% faster)

    def test_star_pattern_center_to_periphery(self):
        # Star: A→B, A→C, A→D, A→E
        nodes = [{"id": "A"}, {"id": "B"}, {"id": "C"}, {"id": "D"}, {"id": "E"}]
        edges = [
            {"source": "A", "target": "B"},
            {"source": "A", "target": "C"},
            {"source": "A", "target": "D"},
            {"source": "A", "target": "E"},
        ]
        # B, C, D, E all have no outgoing edges; B is first
        codeflash_output = find_last_node(nodes, edges)
        result = codeflash_output  # 2.00μs -> 1.29μs (54.8% faster)

    def test_converging_paths(self):
        # Multiple paths converge: A→C, B→C
        nodes = [{"id": "A"}, {"id": "B"}, {"id": "C"}]
        edges = [{"source": "A", "target": "C"}, {"source": "B", "target": "C"}]
        # Only C has no outgoing edges
        codeflash_output = find_last_node(nodes, edges)
        result = codeflash_output  # 2.17μs -> 1.17μs (85.6% faster)


class TestFindLastNodeDataIntegrity:
    """Tests for unusual data structures and integrity issues"""

    def test_nodes_with_none_id(self):
        # Nodes with None as ID
        nodes = [{"id": None}, {"id": "A"}]
        edges = [{"source": "A", "target": "B"}]
        # Node with None ID has no outgoing edge, so it's first valid last node
        codeflash_output = find_last_node(nodes, edges)
        result = codeflash_output  # 1.50μs -> 1.08μs (38.5% faster)

    def test_duplicate_node_ids(self):
        # Multiple nodes with same ID
        nodes = [{"id": "A", "data": 1}, {"id": "A", "data": 2}, {"id": "B"}]
        edges = [{"source": "A", "target": "B"}]
        # Both A nodes have outgoing edges, only B doesn't
        codeflash_output = find_last_node(nodes, edges)
        result = codeflash_output  # 2.04μs -> 1.12μs (81.5% faster)

    def test_edge_with_additional_properties(self):
        # Edges with extra properties shouldn't affect logic
        nodes = [{"id": "A"}, {"id": "B"}]
        edges = [{"source": "A", "target": "B", "weight": 5, "label": "edge1"}]
        codeflash_output = find_last_node(nodes, edges)
        result = codeflash_output  # 1.79μs -> 1.17μs (53.7% faster)

    def test_nodes_as_simple_dicts(self):
        # Minimal node structure with only id
        nodes = [{"id": 1}, {"id": 2}]
        edges = [{"source": 1, "target": 2}]
        codeflash_output = find_last_node(nodes, edges)
        result = codeflash_output  # 1.88μs -> 1.17μs (60.7% faster)

    def test_zero_as_node_id(self):
        # Zero should be a valid node ID
        nodes = [{"id": 0}, {"id": 1}]
        edges = [{"source": 0, "target": 1}]
        codeflash_output = find_last_node(nodes, edges)
        result = codeflash_output  # 1.83μs -> 1.17μs (57.1% faster)

    def test_empty_string_as_node_id(self):
        # Empty string as node ID
        nodes = [{"id": ""}, {"id": "A"}]
        edges = [{"source": "A", "target": ""}]
        # Empty string node has no outgoing edge
        codeflash_output = find_last_node(nodes, edges)
        result = codeflash_output  # 1.46μs -> 1.04μs (39.9% faster)


class TestFindLastNodeLargeScale:
    """Large scale test cases for performance and scalability"""

    def test_long_linear_chain(self):
        # Chain of 500 nodes: 0→1→2→...→499
        num_nodes = 500
        nodes = [{"id": i} for i in range(num_nodes)]
        edges = [{"source": i, "target": i + 1} for i in range(num_nodes - 1)]
        codeflash_output = find_last_node(nodes, edges)
        result = codeflash_output  # 4.54ms -> 27.8μs (16229% faster)

    def test_large_number_of_nodes_no_edges(self):
        # 800 nodes with no edges, first should be returned
        num_nodes = 800
        nodes = [{"id": i, "value": f"node_{i}"} for i in range(num_nodes)]
        edges = []
        codeflash_output = find_last_node(nodes, edges)
        result = codeflash_output  # 1.33μs -> 1.08μs (23.2% faster)

    def test_dense_graph_one_sink(self):
        # 100 nodes, all pointing to node 100 (sink)
        num_nodes = 100
        nodes = [{"id": i} for i in range(num_nodes + 1)]
        edges = [{"source": i, "target": num_nodes} for i in range(num_nodes)]
        codeflash_output = find_last_node(nodes, edges)
        result = codeflash_output  # 211μs -> 6.42μs (3193% faster)

    def test_many_edges_between_few_nodes(self):
        # 3 nodes but 300 edges (including duplicates)
        nodes = [{"id": "A"}, {"id": "B"}, {"id": "C"}]
        edges = []
        # Create many duplicate edges
        for _ in range(100):
            edges.append({"source": "A", "target": "B"})
            edges.append({"source": "B", "target": "C"})
            edges.append({"source": "A", "target": "C"})
        codeflash_output = find_last_node(nodes, edges)
        result = codeflash_output  # 14.8μs -> 7.00μs (112% faster)

    def test_wide_tree_structure(self):
        # Root node with 100 children, all are leaf nodes
        nodes = [{"id": 0}] + [{"id": i} for i in range(1, 101)]
        edges = [{"source": 0, "target": i} for i in range(1, 101)]
        codeflash_output = find_last_node(nodes, edges)
        result = codeflash_output  # 6.42μs -> 3.25μs (97.4% faster)

    def test_multiple_disconnected_chains(self):
        # 10 separate chains of 50 nodes each
        nodes = []
        edges = []
        for chain in range(10):
            offset = chain * 50
            for i in range(50):
                nodes.append({"id": offset + i})
            for i in range(49):
                edges.append({"source": offset + i, "target": offset + i + 1})
        codeflash_output = find_last_node(nodes, edges)
        result = codeflash_output  # 75.3μs -> 14.5μs (418% faster)

    def test_binary_tree_structure(self):
        # Complete binary tree with 127 nodes (7 levels)
        # Node i has children at 2i+1 and 2i+2
        num_nodes = 127
        nodes = [{"id": i} for i in range(num_nodes)]
        edges = []
        for i in range(num_nodes // 2):
            left_child = 2 * i + 1
            right_child = 2 * i + 2
            if left_child < num_nodes:
                edges.append({"source": i, "target": left_child})
            if right_child < num_nodes:
                edges.append({"source": i, "target": right_child})
        codeflash_output = find_last_node(nodes, edges)
        result = codeflash_output  # 164μs -> 5.88μs (2703% faster)

    def test_large_cycle_no_exit(self):
        # Cycle of 200 nodes with no exit
        num_nodes = 200
        nodes = [{"id": i} for i in range(num_nodes)]
        edges = [{"source": i, "target": (i + 1) % num_nodes} for i in range(num_nodes)]
        codeflash_output = find_last_node(nodes, edges)
        result = codeflash_output  # 749μs -> 10.8μs (6874% faster)


# codeflash_output is used to check that the output of the original code is the same as that of the optimized code.

To edit these changes git checkout codeflash/optimize-find_last_node-mk3kjwyw and push.

Codeflash Static Badge

The optimization achieves a **53x speedup** by eliminating nested iteration through a classic algorithmic improvement: replacing O(n×m) complexity with O(n+m) complexity.

**What changed:**
The original code checks every edge for every node using nested iteration: `all(e["source"] != n["id"] for e in edges)`. The optimized version pre-computes a set of all source node IDs once: `sources = {e["source"] for e in edges}`, then performs fast O(1) membership lookups: `n["id"] not in sources`.

**Why this is faster:**
1. **Set lookup vs. linear scan**: Python set membership (`in`) is O(1) average case using hash tables, while the `all()` check iterates through all edges for each node, resulting in O(m) per node check
2. **Single pass vs. repeated iteration**: The set is built once in O(m) time, then reused for all n nodes. The original code iterates through all m edges for each of the n nodes
3. **Algorithmic complexity reduction**: Total complexity drops from O(n×m) to O(n+m), which is dramatically faster as graph size grows

**Performance characteristics by test case:**
- **Small graphs** (2-5 nodes/edges): 26-100% faster - overhead of set creation is negligible
- **Medium graphs** (100 nodes/edges): 97-3193% faster - set lookup advantage becomes clear
- **Large graphs** (500+ nodes/edges): 418-16229% faster - the O(n×m) vs O(n+m) difference dominates
- **Dense graphs** (many edges, few nodes): 112-6874% faster - particularly benefits from avoiding repeated edge iteration

The line profiler confirms this: the original code spent 100% of time (46.7ms) in the nested iteration, while the optimized version spends only 43.8% (149μs) building the set and 56.2% (191μs) doing lookups - a total of 340μs vs 46.7ms.

**Impact considerations:**
Without `function_references`, we cannot determine if this function is in a hot path, but given it processes graph structures (potentially in flow/workflow systems based on "flow" in the docstring), any system repeatedly querying for terminal nodes would benefit significantly, especially as graph size scales.
@codeflash-ai codeflash-ai bot requested a review from KRRT7 January 7, 2026 05:19
@codeflash-ai codeflash-ai bot added ⚡️ codeflash Optimization PR opened by Codeflash AI 🎯 Quality: High Optimization Quality according to Codeflash labels Jan 7, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

⚡️ codeflash Optimization PR opened by Codeflash AI 🎯 Quality: High Optimization Quality according to Codeflash

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant