Skip to content

Conversation

@codeflash-ai
Copy link

@codeflash-ai codeflash-ai bot commented Jan 7, 2026

📄 22,229% (222.29x) speedup for find_last_node in src/algorithms/graph.py

⏱️ Runtime : 69.9 milliseconds 313 microseconds (best of 250 runs)

📝 Explanation and details

The optimized code achieves a 222x speedup by eliminating a nested loop anti-pattern that created O(n*m) time complexity.

What changed:
The original implementation used a nested comprehension all(e["source"] != n["id"] for e in edges) that, for each node, iterated through ALL edges to verify none had that node as a source. This means for every node candidate, the entire edge list was scanned.

The optimization pre-computes a set of all source IDs once (source_ids = {e["source"] for e in edges}), then performs a simple O(1) set membership check (n["id"] not in source_ids) for each node.

Why it's faster:

  • Original: O(n × m) - For n nodes and m edges, this performs up to n×m comparisons
  • Optimized: O(n + m) - Creates the set in O(m) time, then checks n nodes in O(1) each

The performance gain is most dramatic when both node and edge counts are large, as shown in the tests:

  • test_large_linear_chain: 31,815% faster (17.8ms → 55.8μs) with 1000 nodes/999 edges
  • test_large_graph_no_last_node: 32,219% faster (17.7ms → 54.8μs)
  • test_large_complete_binary_tree: 12,333% faster (2.26ms → 18.2μs)

Even small graphs benefit significantly (50-90% faster in most test cases) because set construction and membership testing are highly optimized in Python's C implementation.

Impact:
This optimization is crucial if find_last_node is called frequently or on graphs with hundreds/thousands of nodes and edges. The quadratic behavior of the original would become a bottleneck in any workflow processing multiple large graphs. The optimized version scales linearly and maintains consistent sub-millisecond performance even on 1000-node graphs.

Correctness verification report:

Test Status
⚙️ Existing Unit Tests 🔘 None Found
🌀 Generated Regression Tests 46 Passed
⏪ Replay Tests 🔘 None Found
🔎 Concolic Coverage Tests 🔘 None Found
📊 Tests Coverage 100.0%
🌀 Click to see Generated Regression Tests
from __future__ import annotations

# imports
import pytest  # used for our unit tests
from src.algorithms.graph import find_last_node

# unit tests


def test_basic_linear_chain_returns_terminal_node():
    # Basic case: a simple linear flow 1 -> 2 -> 3 should return node with id 3
    nodes = [{"id": 1}, {"id": 2}, {"id": 3}]
    edges = [{"source": 1, "target": 2}, {"source": 2, "target": 3}]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 2.25μs -> 1.25μs (80.0% faster)


def test_empty_nodes_returns_none():
    # Edge case: no nodes at all -> nothing to return
    nodes = []
    edges = []
    codeflash_output = find_last_node(nodes, edges)  # 791ns -> 875ns (9.60% slower)


def test_empty_edges_returns_first_node():
    # If there are no edges, no node has an outgoing edge; function should return the first node
    nodes = [{"id": "a"}, {"id": "b"}, {"id": "c"}]
    edges = []
    codeflash_output = find_last_node(nodes, edges)  # 1.21μs -> 958ns (26.1% faster)


def test_multiple_terminals_returns_first_in_nodes_order():
    # When multiple nodes have no outgoing edges, the first such node in the nodes list is chosen.
    nodes = [{"id": 1}, {"id": 2}, {"id": 3}]
    edges = [
        {"source": 1, "target": 2}
    ]  # nodes 2 and 3 are terminals; node 2 appears first
    codeflash_output = find_last_node(nodes, edges)  # 1.75μs -> 1.17μs (50.1% faster)


def test_cycle_returns_none():
    # All nodes have outgoing edges in a cycle -> there is no terminal node (should return None)
    nodes = [{"id": "A"}, {"id": "B"}, {"id": "C"}]
    edges = [
        {"source": "A", "target": "B"},
        {"source": "B", "target": "C"},
        {"source": "C", "target": "A"},
    ]
    codeflash_output = find_last_node(nodes, edges)  # 2.38μs -> 1.29μs (84.0% faster)


def test_self_loop_is_considered_outgoing_and_returns_none():
    # A node with a self-loop has an outgoing edge and therefore is not a terminal
    nodes = [{"id": "solo"}]
    edges = [{"source": "solo", "target": "solo"}]
    codeflash_output = find_last_node(nodes, edges)  # 1.42μs -> 1.08μs (30.7% faster)


def test_edges_with_unknown_sources_do_not_affect_result():
    # Edges that reference source ids not present in nodes should simply be ignored for terminal detection
    nodes = [{"id": 10}, {"id": 20}]
    edges = [
        {"source": 9999, "target": 10}
    ]  # 9999 not in nodes -> no node in nodes is considered to have outgoing edges
    # All nodes are terminals in effect, so the first node should be returned
    codeflash_output = find_last_node(nodes, edges)  # 1.54μs -> 1.12μs (37.1% faster)


def test_duplicate_node_ids_all_marked_by_edge_and_return_none():
    # If nodes contain duplicate ids and an edge lists that id as a source,
    # every node with that id is considered to have an outgoing edge.
    a = {"id": 1, "name": "first"}
    b = {"id": 1, "name": "second"}
    nodes = [a, b]
    edges = [{"source": 1, "target": 2}]
    # Since the edge marks id 1 as a source, neither a nor b is terminal -> None
    codeflash_output = find_last_node(nodes, edges)  # 1.71μs -> 1.17μs (46.4% faster)


def test_various_id_types_and_complex_values():
    # IDs need not be integers; function should work with tuples / complex but comparable types
    node1 = {"id": ("user", 1)}
    node2 = {"id": ("user", 2)}
    nodes = [node1, node2]
    edges = [{"source": ("user", 1), "target": ("user", 2)}]
    # node2 should be the terminal node
    codeflash_output = find_last_node(nodes, edges)  # 2.00μs -> 1.21μs (65.6% faster)


def test_missing_source_key_in_edge_raises_keyerror():
    # The implementation directly accesses e["source"], so a missing key should raise KeyError.
    nodes = [{"id": 1}, {"id": 2}]
    edges = [{"src": 1, "target": 2}]  # typo: "src" instead of "source"
    with pytest.raises(KeyError):
        # The function should raise a KeyError when trying to access e["source"]
        find_last_node(nodes, edges)  # 1.62μs -> 792ns (105% faster)


def test_large_scale_linear_chain_performance_and_correctness():
    # Large-scale test but under the requested limits: create 800 nodes in a chain 0->1->...->799
    n = 800  # kept <= 1000 as required
    nodes = [{"id": i} for i in range(n)]
    # edges: 0->1, 1->2, ..., n-2 -> n-1
    edges = [{"source": i, "target": i + 1} for i in range(n - 1)]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 11.5ms -> 44.5μs (25727% faster)


def test_large_scale_many_terminals_returns_first_terminal():
    # Large-scale scenario with many terminal nodes: only half the nodes produce outgoing edges
    n = 800
    nodes = [{"id": i} for i in range(n)]
    # Make nodes 0..399 have outgoing edges to 1..400; nodes 400..799 are terminals.
    edges = [{"source": i, "target": i + 1} for i in range(400)]
    # The first node in nodes with no outgoing edges should be node with id 400
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 2.86ms -> 22.4μs (12677% faster)


def test_duplicate_edges_do_not_change_behavior():
    # Duplicate edge entries should not change which nodes are considered to have outgoing edges
    nodes = [{"id": "a"}, {"id": "b"}, {"id": "c"}]
    # Mark "a" as having outgoing edges, but duplicate the entry multiple times
    edges = [{"source": "a", "target": "b"}] * 5
    # Terminals are b and c; first in nodes order is b
    codeflash_output = find_last_node(nodes, edges)  # 2.21μs -> 1.29μs (70.9% faster)


def test_returned_object_is_same_reference_from_nodes_list():
    # Verify that the returned node is the exact object stored in the nodes list (not a copy)
    special = {"id": "end", "payload": [1, 2, 3]}
    nodes = [{"id": "start"}, special]
    edges = [{"source": "start", "target": "end"}]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 1.92μs -> 1.08μs (76.8% faster)
    # Mutate the returned object and ensure nodes list sees the change (same reference)
    result["payload"].append(4)


def test_edges_with_extra_keys_are_ignored_for_logic():
    # Edges may carry additional metadata; only "source" is used by find_last_node
    nodes = [{"id": 1}, {"id": 2}]
    edges = [{"source": 1, "target": 2, "weight": 0.5, "meta": {"foo": "bar"}}]
    # Node 2 should be the terminal node regardless of extra keys
    codeflash_output = find_last_node(nodes, edges)  # 1.83μs -> 1.12μs (62.9% faster)


# codeflash_output is used to check that the output of the original code is the same as that of the optimized code.
import pytest  # used for our unit tests
from src.algorithms.graph import find_last_node

# unit tests


class TestFindLastNodeBasic:
    """Test basic functionality of find_last_node."""

    def test_single_node_no_edges(self):
        """Test with a single isolated node and no edges - should return that node."""
        nodes = [{"id": "A"}]
        edges = []
        codeflash_output = find_last_node(nodes, edges)
        result = codeflash_output  # 1.25μs -> 1.00μs (25.0% faster)

    def test_two_nodes_linear_chain(self):
        """Test with two nodes in a linear chain A->B, should return B."""
        nodes = [{"id": "A"}, {"id": "B"}]
        edges = [{"source": "A", "target": "B"}]
        codeflash_output = find_last_node(nodes, edges)
        result = codeflash_output  # 1.92μs -> 1.12μs (70.4% faster)

    def test_three_nodes_linear_chain(self):
        """Test with three nodes in a linear chain A->B->C, should return C."""
        nodes = [{"id": "A"}, {"id": "B"}, {"id": "C"}]
        edges = [{"source": "A", "target": "B"}, {"source": "B", "target": "C"}]
        codeflash_output = find_last_node(nodes, edges)
        result = codeflash_output  # 2.29μs -> 1.21μs (89.7% faster)

    def test_simple_tree_structure(self):
        """Test with a tree structure where one node has no outgoing edges."""
        # A -> B, A -> C (both B and C have no outgoing edges, should return first found)
        nodes = [{"id": "A"}, {"id": "B"}, {"id": "C"}]
        edges = [{"source": "A", "target": "B"}, {"source": "A", "target": "C"}]
        codeflash_output = find_last_node(nodes, edges)
        result = codeflash_output  # 1.92μs -> 1.17μs (64.4% faster)


class TestFindLastNodeEdgeCases:
    """Test edge cases and unusual scenarios."""

    def test_empty_nodes_list(self):
        """Test with empty nodes list - should return None."""
        nodes = []
        edges = []
        codeflash_output = find_last_node(nodes, edges)
        result = codeflash_output  # 791ns -> 917ns (13.7% slower)

    def test_empty_edges_all_nodes_qualify(self):
        """Test with no edges - all nodes qualify, should return first node."""
        nodes = [{"id": "A"}, {"id": "B"}, {"id": "C"}]
        edges = []
        codeflash_output = find_last_node(nodes, edges)
        result = codeflash_output  # 1.29μs -> 1.00μs (29.2% faster)

    def test_all_nodes_are_sources_cycle(self):
        """Test with a cycle where all nodes are sources - should return None."""
        # A -> B -> C -> A (all nodes are sources)
        nodes = [{"id": "A"}, {"id": "B"}, {"id": "C"}]
        edges = [
            {"source": "A", "target": "B"},
            {"source": "B", "target": "C"},
            {"source": "C", "target": "A"},
        ]
        codeflash_output = find_last_node(nodes, edges)
        result = codeflash_output  # 2.38μs -> 1.25μs (90.0% faster)

    def test_self_loop(self):
        """Test node with self-loop - it is a source, so cannot be last."""
        nodes = [{"id": "A"}, {"id": "B"}]
        edges = [{"source": "A", "target": "A"}, {"source": "A", "target": "B"}]
        codeflash_output = find_last_node(nodes, edges)
        result = codeflash_output  # 2.00μs -> 1.25μs (60.0% faster)

    def test_multiple_edges_from_same_source(self):
        """Test node with multiple outgoing edges - should still be considered a source."""
        nodes = [{"id": "A"}, {"id": "B"}, {"id": "C"}, {"id": "D"}]
        edges = [
            {"source": "A", "target": "B"},
            {"source": "A", "target": "C"},
            {"source": "A", "target": "D"},
        ]
        codeflash_output = find_last_node(nodes, edges)
        result = codeflash_output  # 2.08μs -> 1.21μs (72.4% faster)

    def test_multiple_last_nodes_returns_first(self):
        """Test that when multiple nodes qualify, the first one in iteration order is returned."""
        # A -> D, both B and C have no outgoing edges
        nodes = [{"id": "A"}, {"id": "B"}, {"id": "C"}, {"id": "D"}]
        edges = [{"source": "A", "target": "D"}]
        codeflash_output = find_last_node(nodes, edges)
        result = codeflash_output  # 1.88μs -> 1.17μs (60.7% faster)

    def test_edges_reference_nonexistent_nodes(self):
        """Test when edges reference nodes not in the nodes list."""
        # Edge references "Z" which doesn't exist in nodes
        nodes = [{"id": "A"}, {"id": "B"}]
        edges = [{"source": "A", "target": "Z"}]
        codeflash_output = find_last_node(nodes, edges)
        result = codeflash_output  # 1.83μs -> 1.12μs (63.0% faster)

    def test_node_is_target_but_not_source(self):
        """Test that being a target doesn't disqualify a node from being last."""
        # A -> B -> C, both B and C are targets but only C is not a source
        nodes = [{"id": "A"}, {"id": "B"}, {"id": "C"}]
        edges = [{"source": "A", "target": "B"}, {"source": "B", "target": "C"}]
        codeflash_output = find_last_node(nodes, edges)
        result = codeflash_output  # 2.33μs -> 1.25μs (86.7% faster)


class TestFindLastNodeDataVariations:
    """Test with various data types and structures."""

    def test_integer_node_ids(self):
        """Test with integer node IDs."""
        nodes = [{"id": 1}, {"id": 2}, {"id": 3}]
        edges = [{"source": 1, "target": 2}, {"source": 2, "target": 3}]
        codeflash_output = find_last_node(nodes, edges)
        result = codeflash_output  # 2.25μs -> 1.25μs (80.0% faster)

    def test_mixed_type_node_ids(self):
        """Test with mixed type node IDs (string and integer)."""
        nodes = [{"id": "A"}, {"id": 1}, {"id": "B"}]
        edges = [{"source": "A", "target": 1}]
        codeflash_output = find_last_node(nodes, edges)
        result = codeflash_output  # 1.92μs -> 1.17μs (64.3% faster)

    def test_nodes_with_additional_attributes(self):
        """Test that nodes with extra attributes are handled correctly."""
        nodes = [
            {"id": "A", "name": "Node A", "value": 10},
            {"id": "B", "name": "Node B", "value": 20},
        ]
        edges = [{"source": "A", "target": "B", "weight": 5}]
        codeflash_output = find_last_node(nodes, edges)
        result = codeflash_output  # 1.88μs -> 1.17μs (60.7% faster)

    def test_complex_node_ids(self):
        """Test with complex string IDs containing special characters."""
        nodes = [{"id": "node-1"}, {"id": "node_2"}, {"id": "node.3"}]
        edges = [{"source": "node-1", "target": "node_2"}]
        codeflash_output = find_last_node(nodes, edges)
        result = codeflash_output  # 1.92μs -> 1.12μs (70.4% faster)

    def test_empty_string_node_id(self):
        """Test with empty string as node ID."""
        nodes = [{"id": ""}, {"id": "A"}]
        edges = [{"source": "A", "target": ""}]
        codeflash_output = find_last_node(nodes, edges)
        result = codeflash_output  # 1.46μs -> 1.04μs (39.9% faster)


class TestFindLastNodeComplexGraphs:
    """Test with more complex graph structures."""

    def test_diamond_structure(self):
        """Test with diamond structure: A -> B, A -> C, B -> D, C -> D."""
        nodes = [{"id": "A"}, {"id": "B"}, {"id": "C"}, {"id": "D"}]
        edges = [
            {"source": "A", "target": "B"},
            {"source": "A", "target": "C"},
            {"source": "B", "target": "D"},
            {"source": "C", "target": "D"},
        ]
        codeflash_output = find_last_node(nodes, edges)
        result = codeflash_output  # 2.88μs -> 1.33μs (116% faster)

    def test_disconnected_components(self):
        """Test with multiple disconnected components."""
        # A -> B and C -> D (two separate chains)
        nodes = [{"id": "A"}, {"id": "B"}, {"id": "C"}, {"id": "D"}]
        edges = [{"source": "A", "target": "B"}, {"source": "C", "target": "D"}]
        codeflash_output = find_last_node(nodes, edges)
        result = codeflash_output  # 1.96μs -> 1.21μs (62.0% faster)

    def test_star_topology(self):
        """Test star topology where one central node connects to many others."""
        nodes = [{"id": "center"}, {"id": "1"}, {"id": "2"}, {"id": "3"}, {"id": "4"}]
        edges = [
            {"source": "center", "target": "1"},
            {"source": "center", "target": "2"},
            {"source": "center", "target": "3"},
            {"source": "center", "target": "4"},
        ]
        codeflash_output = find_last_node(nodes, edges)
        result = codeflash_output  # 2.12μs -> 1.21μs (75.9% faster)

    def test_partial_cycle(self):
        """Test graph with a cycle and additional nodes."""
        # A -> B -> C -> B (cycle), A -> D (D is last)
        nodes = [{"id": "A"}, {"id": "B"}, {"id": "C"}, {"id": "D"}]
        edges = [
            {"source": "A", "target": "B"},
            {"source": "B", "target": "C"},
            {"source": "C", "target": "B"},
            {"source": "A", "target": "D"},
        ]
        codeflash_output = find_last_node(nodes, edges)
        result = codeflash_output  # 2.75μs -> 1.33μs (106% faster)


class TestFindLastNodeLargeScale:
    """Test performance and scalability with large datasets."""

    def test_large_linear_chain(self):
        """Test with a large linear chain of 1000 nodes."""
        # Create chain: 0 -> 1 -> 2 -> ... -> 999
        nodes = [{"id": i} for i in range(1000)]
        edges = [{"source": i, "target": i + 1} for i in range(999)]
        codeflash_output = find_last_node(nodes, edges)
        result = codeflash_output  # 17.8ms -> 55.8μs (31815% faster)

    def test_large_star_topology(self):
        """Test with a large star topology (1 center, 999 leaves)."""
        nodes = [{"id": "center"}] + [{"id": i} for i in range(999)]
        edges = [{"source": "center", "target": i} for i in range(999)]
        codeflash_output = find_last_node(nodes, edges)
        result = codeflash_output  # 48.6μs -> 19.3μs (152% faster)

    def test_large_graph_no_last_node(self):
        """Test with a large graph where all nodes are sources (cycle)."""
        # Create a cycle: 0 -> 1 -> 2 -> ... -> 999 -> 0
        nodes = [{"id": i} for i in range(1000)]
        edges = [{"source": i, "target": (i + 1) % 1000} for i in range(1000)]
        codeflash_output = find_last_node(nodes, edges)
        result = codeflash_output  # 17.7ms -> 54.8μs (32219% faster)

    def test_large_graph_single_last_node(self):
        """Test with 1000 nodes where only the last one is not a source."""
        # All nodes point to the next, except node 999
        nodes = [{"id": i} for i in range(1000)]
        edges = [{"source": i, "target": i + 1} for i in range(999)]
        codeflash_output = find_last_node(nodes, edges)
        result = codeflash_output  # 17.7ms -> 54.9μs (32070% faster)

    def test_large_complete_binary_tree(self):
        """Test with a complete binary tree structure (depth 9, ~511 nodes)."""
        # Create a binary tree where each node i has children 2*i+1 and 2*i+2
        num_nodes = 511  # Complete binary tree with depth 9
        nodes = [{"id": i} for i in range(num_nodes)]
        edges = []
        for i in range(num_nodes // 2):  # Internal nodes
            left_child = 2 * i + 1
            right_child = 2 * i + 2
            if left_child < num_nodes:
                edges.append({"source": i, "target": left_child})
            if right_child < num_nodes:
                edges.append({"source": i, "target": right_child})

        codeflash_output = find_last_node(nodes, edges)
        result = codeflash_output  # 2.26ms -> 18.2μs (12333% faster)

    def test_many_disconnected_single_nodes(self):
        """Test with many disconnected single nodes."""
        # 1000 isolated nodes with no edges
        nodes = [{"id": i} for i in range(1000)]
        edges = []
        codeflash_output = find_last_node(nodes, edges)
        result = codeflash_output  # 1.38μs -> 1.08μs (27.0% faster)


class TestFindLastNodeOrderDependency:
    """Test that the function returns the first qualifying node."""

    def test_order_matters_with_multiple_candidates(self):
        """Test that node order affects which last node is returned."""
        # Create scenario where multiple nodes qualify
        nodes_order1 = [{"id": "Z"}, {"id": "A"}, {"id": "M"}]
        nodes_order2 = [{"id": "A"}, {"id": "Z"}, {"id": "M"}]
        edges = []  # No edges, all qualify

        codeflash_output = find_last_node(nodes_order1, edges)
        result1 = codeflash_output  # 1.33μs -> 1.00μs (33.3% faster)
        codeflash_output = find_last_node(nodes_order2, edges)
        result2 = codeflash_output  # 583ns -> 458ns (27.3% faster)

    def test_order_with_some_sources(self):
        """Test order dependency when some nodes are sources."""
        nodes_order1 = [{"id": "A"}, {"id": "B"}, {"id": "C"}]
        nodes_order2 = [{"id": "C"}, {"id": "B"}, {"id": "A"}]
        edges = [{"source": "A", "target": "C"}]

        codeflash_output = find_last_node(nodes_order1, edges)
        result1 = codeflash_output  # 1.92μs -> 1.17μs (64.4% faster)
        codeflash_output = find_last_node(nodes_order2, edges)
        result2 = codeflash_output  # 625ns -> 541ns (15.5% faster)


# codeflash_output is used to check that the output of the original code is the same as that of the optimized code.

To edit these changes git checkout codeflash/optimize-find_last_node-mk3l3ik1 and push.

Codeflash Static Badge

The optimized code achieves a **222x speedup** by eliminating a nested loop anti-pattern that created O(n*m) time complexity.

**What changed:**
The original implementation used a nested comprehension `all(e["source"] != n["id"] for e in edges)` that, for each node, iterated through ALL edges to verify none had that node as a source. This means for every node candidate, the entire edge list was scanned.

The optimization **pre-computes a set** of all source IDs once (`source_ids = {e["source"] for e in edges}`), then performs a simple O(1) set membership check (`n["id"] not in source_ids`) for each node.

**Why it's faster:**
- **Original**: O(n × m) - For n nodes and m edges, this performs up to n×m comparisons
- **Optimized**: O(n + m) - Creates the set in O(m) time, then checks n nodes in O(1) each

The performance gain is most dramatic when both node and edge counts are large, as shown in the tests:
- `test_large_linear_chain`: **31,815% faster** (17.8ms → 55.8μs) with 1000 nodes/999 edges
- `test_large_graph_no_last_node`: **32,219% faster** (17.7ms → 54.8μs) 
- `test_large_complete_binary_tree`: **12,333% faster** (2.26ms → 18.2μs)

Even small graphs benefit significantly (50-90% faster in most test cases) because set construction and membership testing are highly optimized in Python's C implementation.

**Impact:**
This optimization is crucial if `find_last_node` is called frequently or on graphs with hundreds/thousands of nodes and edges. The quadratic behavior of the original would become a bottleneck in any workflow processing multiple large graphs. The optimized version scales linearly and maintains consistent sub-millisecond performance even on 1000-node graphs.
@codeflash-ai codeflash-ai bot requested a review from KRRT7 January 7, 2026 05:34
@codeflash-ai codeflash-ai bot added ⚡️ codeflash Optimization PR opened by Codeflash AI 🎯 Quality: High Optimization Quality according to Codeflash labels Jan 7, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

⚡️ codeflash Optimization PR opened by Codeflash AI 🎯 Quality: High Optimization Quality according to Codeflash

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant