
Conversation

@guyueh1
Contributor

@guyueh1 guyueh1 commented Dec 18, 2025

What does this PR do ?

This PR implements an experimental feature that uses a single unified placement group for all workers in both colocated and non-colocated cases, and places workers in order of node IP and GPU ID.

This PR only changes the GRPO algorithm for now; the changes are designed to be backward compatible, so other algorithms such as distillation or SFT are not affected.

Issues

List issues that this PR closes (syntax):

Usage

  • You can potentially add a usage example below
# Add a code snippet demonstrating how to use this
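A hypothetical usage sketch (not from the PR author), assembled from the code snippets quoted in the review comments below; the actual call sites in grpo.py may differ:

```python
from nemo_rl.algorithms.utils import print_ip_and_gpu_id_of_workers
from nemo_rl.distributed.virtual_cluster import RayVirtualCluster
from nemo_rl.models.generation.vllm.vllm_generation import VllmGeneration
from nemo_rl.models.policy.lm_policy import Policy

# Topology values (grpo_nodes, policy_nodes, *_offset, ...) are assumed to come
# from the new _calculate_resource_allocation() helper described in the review.
cluster = RayVirtualCluster(
    name="grpo_cluster",
    bundle_ct_per_node_list=[grpo_gpus_per_node] * grpo_nodes,
    use_gpus=True,
    num_gpus_per_node=grpo_gpus_per_node,
)

policy = Policy(
    cluster=cluster,
    num_nodes=policy_nodes,
    workers_per_node=policy_gpus_per_node,
    cluster_node_offset=policy_node_offset,
    cluster_gpu_offset_each_node=policy_gpu_offset,
    # remaining Policy arguments elided
)

policy_generation = VllmGeneration(
    cluster=cluster,
    config=generation_config,
    num_nodes=inference_nodes,
    workers_per_node=inference_gpus_per_node,
    cluster_node_offset=inference_node_offset,
    cluster_gpu_offset_each_node=inference_gpu_offset,
)

# Print the resulting (node IP, GPU ID) layout of all workers.
print_ip_and_gpu_id_of_workers(policy, policy_generation)
```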

Before your PR is "Ready for review"

Pre checks:

  • Make sure you read and followed Contributor guidelines
  • Did you write any new necessary tests?
  • Did you run the unit tests and functional tests locally? Visit our Testing Guide for how to run tests
  • Did you add or update any necessary documentation? Visit our Document Development Guide for how to write, build and test the docs.

Additional Information

  • ...

Summary by CodeRabbit

Release Notes

  • New Features

    • Added ability to report and query worker node IP addresses and GPU assignments for improved cluster monitoring and troubleshooting.
  • Refactor

    • Unified cluster resource allocation and management into a single configuration system, replacing separate train and inference cluster handling.
    • Improved worker group initialization with streamlined parameter passing and enhanced resource allocation logic.


root and others added 23 commits November 29, 2025 11:21
Signed-off-by: root <root@pool0-00514.cm.cluster>
Signed-off-by: Guyue Huang <guyueh@nvidia.com>
This reverts commit 00f8c77.
Signed-off-by: Guyue Huang <guyueh@nvidia.com>
@guyueh1 guyueh1 requested review from a team as code owners December 18, 2025 07:24
@guyueh1 guyueh1 self-assigned this Dec 18, 2025
@guyueh1 guyueh1 requested a review from terrykong December 18, 2025 07:24
@guyueh1 guyueh1 added the GB200 label Dec 18, 2025
@coderabbitai
Contributor

coderabbitai bot commented Dec 18, 2025

📝 Walkthrough

Walkthrough

This PR refactors NeMo RL's distributed cluster management from per-node placement groups to a unified placement group model. It introduces explicit node and GPU offset parameters throughout worker group and policy initialization, centralizes GRPO resource allocation computation, adds node/GPU discovery methods to generation and policy interfaces, and consolidates dual cluster objects into a single unified cluster in GRPO setup.

Changes

  • Virtual Cluster Unification (nemo_rl/distributed/virtual_cluster.py, tests/unit/distributed/test_virtual_cluster.py):
    Replaced per-node placement group handling with a single unified placement group; renamed methods from plural to singular (_init_placement_groups → _init_placement_group, _create_placement_groups_internal → _create_placement_group_internal); changed GetGPUIDActor.get_gpu_id() to get_ip_and_gpu_id(), returning (node_ip, gpu_id); changed the default strategy from SPREAD to PACK; refactored address/port lookups to use the unified group; added a get_bundle_indices_list_for_worker_group() utility; updated test expectations for the new internal state.
  • Worker Group Refactoring (nemo_rl/distributed/worker_groups.py):
    Changed the RayWorkerGroup constructor: workers_per_node is now Optional[int] (was Union[int, list[int]]); added num_nodes, tied_worker_group_size, cluster_node_offset, and cluster_gpu_offset_each_node parameters; removed the bundle_indices_list parameter; bundle indices are now derived internally via cluster.get_bundle_indices_list_for_worker_group(); node rank and worker naming use node_idx instead of pg_idx.
  • GRPO Algorithm (nemo_rl/algorithms/grpo.py):
    Introduced _calculate_resource_allocation() to centralize GRPO resource computation; replaced the separate (train_cluster, inference_cluster) pair with a single unified cluster object; updated the setup() return signature; refactored init_policy() and init_vllm() to use the new node/offset parameters and the unified cluster; replaced policy.print_node_ip_and_gpu_id() with print_ip_and_gpu_id_of_workers().
  • Generation Interfaces & Implementations (nemo_rl/models/generation/interfaces.py, nemo_rl/models/generation/vllm/vllm_*):
    Added a report_node_ip_and_gpu_id() abstract method to GenerationInterface; implemented it in VllmGenerationWorker, VllmAsyncGenerationWorker, and VllmInternalWorkerExtension; updated VllmGeneration.__init__ to accept num_nodes, cluster_node_offset, and cluster_gpu_offset_each_node; switched world-size computation from cluster.world_size() to num_nodes * workers_per_node; removed the complex tied-worker bundle logic; replaced the _init_placement_groups() call with the simplified _init_placement_group().
  • Policy Interfaces & Implementations (nemo_rl/models/policy/interfaces.py, nemo_rl/models/policy/lm_policy.py, nemo_rl/models/policy/workers/megatron_policy_worker.py):
    Added report_node_ip_and_gpu_id() to ColocatablePolicyInterface and Policy; added num_nodes, cluster_node_offset, and cluster_gpu_offset_each_node to Policy.__init__; updated the world-size calculation to use num_nodes * workers_per_node; refactored RayWorkerGroup construction with the new parameters; renamed print_node_ip_and_gpu_id() to report_node_ip_and_gpu_id() and simplified it to return results without printing.
  • Utilities & Environments (nemo_rl/algorithms/utils.py, nemo_rl/environments/reward_model_environment.py):
    Added a public print_ip_and_gpu_id_of_workers() function to collect and display node IPs and GPU IDs from policy and generation workers; replaced get_placement_groups() usage with get_unified_placement_group() in the reward model environment.
  • Tests (tests/unit/models/policy/test_policy_validation.py):
    Updated the mock cluster setup to use get_unified_placement_group() instead of get_placement_groups().

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

  • Multiple interdependent API signature changes across distributed, policy, and generation layers
  • Consolidated placement group logic requires careful validation of bundle indexing and address resolution
  • Resource allocation and world-size computation logic changes across multiple components
  • Parameter flow through GRPO setup → Policy/VllmGeneration → RayWorkerGroup needs validation for correctness
  • New node/offset parametrization affects worker discovery and initialization ordering

Areas requiring extra attention:

  • Bundle index mapping and sorting logic in unified placement group
  • Resource allocation calculation in _calculate_resource_allocation() for colocated/non-colocated modes
  • World-size computation consistency across Policy and VllmGeneration
  • Node/GPU discovery method implementations across sync/async workers

Possibly related PRs

  • PR #1081: Modifies RayVirtualCluster and RayWorkerGroup placement/address APIs (get_available_address_and_port, bundle/placement-group handling).
  • PR #1454: Modifies MegatronPolicyWorker class methods and node/GPU reporting functionality.
  • PR #971: Changes policy and worker initialization paths (lm_policy.py, megatron_policy_worker.py) related to node/GPU information reporting and constructor parameters.

Suggested labels

r0.4.0, CI:L1

Suggested reviewers

  • terrykong
  • parthchadha
  • yfw

Pre-merge checks and finishing touches

❌ Failed checks (2 warnings)

  • Docstring Coverage (⚠️ Warning): Docstring coverage is 79.25%, which is below the required threshold of 80.00%. Run @coderabbitai generate docstrings to improve docstring coverage.
  • Test Results For Major Changes (⚠️ Warning): Major PR changes lack documented test results, performance metrics, convergence verification, or testing information in the PR description. Add test execution results, convergence verification, performance metrics, and behavioral-change documentation to the PR description before merging.

✅ Passed checks (2 passed)

  • Description Check (✅ Passed): Check skipped; CodeRabbit's high-level summary is enabled.
  • Title check (✅ Passed): The title 'refactor: Order node by IP in GRPO' accurately describes the main change (unifying placement groups and ordering workers by IP/GPU). However, it underemphasizes that this is an experimental feature addition that refactors cluster initialization, so it is only partially descriptive of the broader scope.

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 4

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (5)
tests/unit/distributed/test_virtual_cluster.py (1)

70-90: Fix _init_placement_group call-count expectation in test_env_max_retries_default_value

You patch RayVirtualCluster._init_placement_group and then:

cluster = RayVirtualCluster(bundle_ct_per_node_list=[1])
cluster._init_placement_group()
assert mock_init.call_count == 1

But the current RayVirtualCluster.__init__ already calls _init_placement_group() internally (see the snippet in nemo_rl/distributed/virtual_cluster.py), so with this patch:

  • mock_init is called once from __init__
  • and once again from the explicit cluster._init_placement_group()

mock_init.call_count should therefore be 2, not 1.

You can either:

  • Drop the explicit cluster._init_placement_group() call and keep call_count == 1, or
  • Keep the explicit call and update the assertion to assert mock_init.call_count == 2.

Given the intent (“default value is used and init is attempted”), removing the explicit call is probably cleaner.
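For concreteness, a minimal sketch of the first option (assuming the test patches via unittest.mock.patch.object, as the quoted snippet suggests):

```python
from unittest.mock import patch

from nemo_rl.distributed.virtual_cluster import RayVirtualCluster


def test_env_max_retries_default_value():
    with patch.object(RayVirtualCluster, "_init_placement_group") as mock_init:
        RayVirtualCluster(bundle_ct_per_node_list=[1])
        # __init__ already attempts placement-group creation exactly once,
        # so no explicit _init_placement_group() call is needed here.
        assert mock_init.call_count == 1
```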

nemo_rl/models/policy/lm_policy.py (1)

58-69: Handle workers_per_node consistently with its Union[int, list[int]] type

In Policy.__init__ you added topology parameters:

workers_per_node: Optional[Union[int, list[int]]] = None,
num_nodes: Optional[int] = None,
...
num_nodes = num_nodes or cluster.node_count()
workers_per_node = workers_per_node or cluster.workers_per_node()
actual_world_size = num_nodes * workers_per_node

and later pass workers_per_node through to RayWorkerGroup.

Issues:

  • The annotation explicitly allows list[int], but num_nodes * workers_per_node will raise TypeError if a list is ever passed (either by the caller or via cluster.workers_per_node()).
  • RayWorkerGroup.__init__ in the provided snippet also expects workers_per_node: Optional[int] | list[int], but its internal world size logic will similarly need a clear convention for list input (e.g., sum vs per-node counts).

If the intent for GRPO and most usages is a homogeneous integer workers-per-node, I’d recommend:

  • Narrowing the type here to Optional[int] (and matching that in call sites), or
  • Explicitly handling the list case, e.g.:
if isinstance(workers_per_node, list):
    actual_world_size = sum(workers_per_node)
else:
    actual_world_size = num_nodes * workers_per_node

and ensuring RayWorkerGroup uses the same convention.

Right now the code compiles but will crash if a valid list[int] is provided, which is surprising given the type hints.

Also applies to: 129-135, 152-165, 181-191

nemo_rl/models/generation/vllm/vllm_generation.py (1)

47-57: Constrain or normalize workers_per_node in VllmGeneration to avoid type errors

Constructor changes:

workers_per_node: Optional[Union[int, list[int]]] = None,
num_nodes: Optional[int] = None,
...
num_nodes = num_nodes or cluster.node_count()
workers_per_node = workers_per_node or cluster.workers_per_node()
world_size = num_nodes * workers_per_node
assert world_size % self.model_parallel_size == 0, ...
self.dp_size = world_size // self.model_parallel_size
...
needs_cross_node_parallelism = (
    self.model_parallel_size > workers_per_node
)
...
self.worker_group = RayWorkerGroup(
    cluster,
    worker_builder,
    num_nodes=num_nodes,
    workers_per_node=workers_per_node,
    tied_worker_group_size=self.model_parallel_size,
    ...
)

Here again, workers_per_node is annotated as Union[int, list[int]] but is:

  • Multiplied by num_nodes
  • Compared with self.model_parallel_size

Both operations assume workers_per_node is an int. If a list[int] is ever passed or returned by cluster.workers_per_node(), this will raise TypeError (and needs_cross_node_parallelism’s > comparison will also fail).

To keep the new unified topology robust, you likely want to:

  • Either narrow workers_per_node here to Optional[int] (and avoid list semantics in this class), or
  • Explicitly define semantics for a list, e.g.:
if isinstance(workers_per_node, list):
    world_size = sum(workers_per_node)
    local_workers_per_node = max(workers_per_node)  # or some other convention
else:
    world_size = num_nodes * workers_per_node
    local_workers_per_node = workers_per_node

needs_cross_node_parallelism = (self.model_parallel_size > local_workers_per_node)

and pass the appropriately normalized values into RayWorkerGroup.

Right now the signature advertises list support, but the implementation does not.

Also applies to: 66-75, 137-147, 150-151, 182-193

nemo_rl/algorithms/grpo.py (2)

40-46: Centralized resource allocation looks good; fix one incorrect assert and tighten invariants

The new _calculate_resource_allocation helper is a good step toward making GRPO’s resource layout explicit and unified across reward model / (non-)colocated generation. A couple of details need adjustment:

  1. Incorrect assert in multi-node non-colocated branch

For grpo_nodes > 1 and non-colocated inference:

inference_nodes, inference_gpus_per_node = (
    generation_resources["num_nodes"],
    generation_resources["gpus_per_node"],
)
assert inference_gpus_per_node == generation_resources["gpus_per_node"], (
    "When grpo_nodes > 1, inference.generation.colocated.resources.gpus_per_node must be equal to cluster.gpus_per_node"
    f"but got {inference_gpus_per_node} and {generation_resources['gpus_per_node']}."
)

This assert currently compares inference_gpus_per_node to generation_resources["gpus_per_node"], which are always equal by assignment, so the check is a no-op. From the error message, it looks like the intended validation was:

  • generation_resources["gpus_per_node"] == cluster_resources["gpus_per_node"] (i.e., each inference node should use the full per-node GPU count when grpo_nodes > 1).

You probably want:

-                inference_nodes, inference_gpus_per_node = generation_resources["num_nodes"], generation_resources["gpus_per_node"]
-                assert inference_gpus_per_node == generation_resources["gpus_per_node"], (
-                    "When grpo_nodes > 1, inference.generation.colocated.resources.gpus_per_node must be equal to cluster.gpus_per_node"
-                    f"but got {inference_gpus_per_node} and {generation_resources['gpus_per_node']}."
-                )
+                inference_nodes = generation_resources["num_nodes"]
+                inference_gpus_per_node = generation_resources["gpus_per_node"]
+                assert inference_gpus_per_node == total_gpus_per_node, (
+                    "When grpo_nodes > 1, generation.colocated.resources.gpus_per_node "
+                    "must be equal to cluster.gpus_per_node "
+                    f"but got {inference_gpus_per_node} and {total_gpus_per_node}."
+                )

so mismatched per-node GPU counts are caught early instead of producing a subtle misallocation.

  2. Error-message wording in reward-model branches

In the 1-node reward-model branch:

assert grpo_gpus_per_node > 0, (
    "policy.generation.colocated.resources.gpus_per_node must be > 0 "
    "when cluster.num_nodes = 1, "
    f"but got {grpo_gpus_per_node}."
)

At this point you are validating remaining GRPO GPUs after removing reward-model GPUs. Mentioning policy.generation.colocated.resources here is somewhat misleading; the constraint is really about:

  • cluster.gpus_per_node > rm_resources["gpus_per_node"]

Consider rewording to make it clear you’re checking that there is at least one GPU left per node for GRPO after carving out reward-model resources.

Otherwise, the overall structure — computing (grpo_nodes, grpo_gpus_per_node) first, then splitting into (policy_*, inference_*, *_offset) — is sound and matches the PR’s unified-cluster design.

Also applies to: 321-396, 398-413, 415-421


186-203: Fix setup return type annotation to match single-cluster return value

The setup function return type annotation specifies tuple[RayVirtualCluster, RayVirtualCluster] at the third position, but the implementation returns a single RayVirtualCluster instance:

return (
    policy,
    policy_generation,
    cluster,  # single RayVirtualCluster, not a tuple
    dataloader,
    val_dataloader,
    loss_fn,
    logger,
    checkpointer,
    grpo_save_state,
    master_config,
)

This causes a type annotation mismatch that will trigger errors in type checkers (mypy, pyright, IDE inspections) and mislead developers about the function's contract. Update the return type at line 16 from tuple[RayVirtualCluster, RayVirtualCluster] to RayVirtualCluster to match the actual implementation.

🧹 Nitpick comments (9)
tests/unit/distributed/test_virtual_cluster.py (1)

234-244: Verify _node_ids_sorted_by_node_id naming against RayVirtualCluster internals

test_create_sorted_bundle_and_node_ids_for_unified_pg asserts:

assert cluster._bundle_ids_sorted_by_ip_and_gpu is not None
...
assert cluster._node_ids_sorted_by_node_id is not None
assert len(cluster._node_ids_sorted_by_node_id) == 2
assert cluster._node_ids_sorted_by_node_id == [0, 0]

The provided context for RayVirtualCluster.__init__ initializes:

self._bundle_ids_sorted_by_ip_and_gpu: Optional[list[int]] = None
self._node_ids_sorted_by_ip_and_gpu: Optional[list[int]] = None

(no _node_ids_sorted_by_node_id there). If the implementation has not been updated to define _node_ids_sorted_by_node_id, this test will fail with AttributeError.

Please double-check that:

  • The production code actually defines and populates _node_ids_sorted_by_node_id, or
  • The test is updated to assert on the correctly named attribute (e.g. _node_ids_sorted_by_ip_and_gpu) if that’s the one you intend to verify.

Also, since this is poking into private attributes, consider adding a small public helper or property for this mapping to avoid test fragility when internals are refactored.
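For example, a small read-only property along these lines (the attribute name follows the __init__ snippet quoted above; purely illustrative, not part of the PR):

```python
@property
def node_ids_sorted_by_ip_and_gpu(self) -> list[int]:
    """Node IDs of all bundles, ordered by (node IP, GPU ID)."""
    assert self._node_ids_sorted_by_ip_and_gpu is not None, (
        "Unified placement group has not been initialized yet."
    )
    return self._node_ids_sorted_by_ip_and_gpu
```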

nemo_rl/models/policy/lm_policy.py (1)

839-846: Update report_node_ip_and_gpu_id docstring to match behavior

The new method:

def report_node_ip_and_gpu_id(self) -> list[tuple[str, int]]:
    """Print the node IP and GPU ID of the current worker."""
    results = ray.get(
        self.worker_group.run_all_workers_single_data(
            "report_node_ip_and_gpu_id",
        )
    )
    return results

now returns the list of (ip, gpu_id) pairs instead of printing them, which is correct for integration with print_ip_and_gpu_id_of_workers. The docstring still says “Print…”, which is misleading.

Recommend updating the docstring to something like:

"""Report the node IP and GPU ID for all policy workers."""

to reflect the actual behavior.

nemo_rl/models/generation/vllm/vllm_generation.py (1)

782-792: Align report_node_ip_and_gpu_id docstring and ensure result shape matches callers

Implementation:

def report_node_ip_and_gpu_id(self) -> list[tuple[str, int]]:
    """Print the node IP and GPU ID of the current worker."""
    method_name = "report_node_ip_and_gpu_id_async" if self.cfg["vllm_cfg"]["async_engine"] else "report_node_ip_and_gpu_id"
    results = ray.get(
        self.worker_group.run_all_workers_single_data(
            method_name,
            run_rank_0_only_axes=["tensor_parallel", "pipeline_parallel"],
        )
    )
    results = [tup for result in results for tup in result]
    return results

Comments:

  • The docstring should say “Report/return…” rather than “Print…”, as this method now returns the list for consumption by utilities like print_ip_and_gpu_id_of_workers.
  • The flattening logic assumes that each worker’s report_node_ip_and_gpu_id[_async] returns an iterable of (ip, gpu_id) tuples. That matches the VllmGenerationWorker.report_node_ip_and_gpu_id snippet, but it’s worth keeping this contract documented because the utility in nemo_rl/algorithms/utils.py depends on this exact shape.

I’d suggest:

"""Report (IP, GPU-ID) pairs for all vLLM workers."""

and possibly a short comment noting that the worker-level method returns per-rank lists which are flattened here.

nemo_rl/distributed/worker_groups.py (1)

430-434: Consider using underscore prefix for unused loop variables.

Static analysis correctly identifies that group_idx, node_idx, and local_rank are unused in this loop. Using _ prefix signals intent and silences linter warnings.

🔎 Suggested fix:
-        for group_idx, (node_idx, local_bundle_indices) in enumerate(bundle_indices_list):
-            for local_rank, bundle_idx in enumerate(local_bundle_indices):
+        for _group_idx, (_node_idx, local_bundle_indices) in enumerate(bundle_indices_list):
+            for _local_rank, bundle_idx in enumerate(local_bundle_indices):
                 addr, port = self.cluster.get_available_address_and_port(bundle_idx)
nemo_rl/distributed/virtual_cluster.py (3)

259-266: Consider moving successful return to an else block.

Per static analysis hint TRY300, placing the success path in else clarifies that it only executes when no exception occurred.

🔎 Suggested fix:
             try:
                 self._unified_placement_group = self._create_placement_group_internal(
                     self.placement_group_strategy
                 )
                 (
                     self._bundle_ids_sorted_by_ip_and_gpu,
                     self._node_ids_sorted_by_ip_and_gpu
                 ) = self._get_sorted_bundle_and_node_ids()
-                return
             except ResourceInsufficientError as e:
                 print(e)
                 print(
                     f"Retrying placement group creation... {i + 1}/{max_retries}. Next retry in {2**i} seconds."
                 )
                 time.sleep(2**i)
                 continue
+            else:
+                return

327-334: Improve exception handling in timeout cleanup.

The broad except Exception catch silently discards cleanup errors, which could hide issues. Consider logging the error. Also, the TimeoutError should be raised with proper chaining.

🔎 Suggested fix:
         except (TimeoutError, ray.exceptions.GetTimeoutError):
             # Clean up any created placement groups
             try:
                 remove_placement_group(pg)
-            except Exception:
-                pass
-            raise TimeoutError(
+            except Exception as cleanup_err:
+                logger.warning(f"Failed to clean up placement group during timeout: {cleanup_err}")
+            raise TimeoutError(
                 "Timed out waiting for placement groups to be ready. The cluster may not have enough resources "
                 "to satisfy the requested configuration, or the resources may be busy with other tasks."
-            )
+            ) from None

415-417: Consider converting lambda to a named function.

Per PEP 8 (E731), lambdas assigned to names should be def statements for better traceability in stack traces.

🔎 Suggested fix:
+        def ip_to_tuple(ip_address: str) -> tuple[int, ...]:
+            return tuple(int(x) for x in ip_address.split("."))
+
         # original index, node_id, gpu_id
-        ip_map = lambda ip_address: tuple(int(x) for x in ip_address.split("."))
         bundle_infos = [
-            (i, ip_map(ip_and_gpu_ids[i][0]), ip_and_gpu_ids[i][1]) for i in range(num_bundles)
+            (i, ip_to_tuple(ip_and_gpu_ids[i][0]), ip_and_gpu_ids[i][1]) for i in range(num_bundles)
         ]
nemo_rl/models/generation/interfaces.py (1)

261-263: Clarify docstring to match aggregated return type

The return type is list[tuple[str, int]], but the docstring says “current worker,” which reads like a single (ip, gpu_id). Consider clarifying that this returns all workers’ (ip, gpu_id) pairs for consistency with implementations.

nemo_rl/models/policy/interfaces.py (1)

195-197: Align ColocatablePolicyInterface return contract with worker/aggregator usage

ColocatablePolicyInterface.report_node_ip_and_gpu_id is typed as returning list[tuple[str, int]], but individual workers typically return a single (ip, gpu_id) while the top-level Policy aggregates them into a list.

To avoid confusion and type-checking friction, consider:

  • Making the interface explicitly represent the worker-level contract (e.g., tuple[str, int]) and letting aggregators wrap that in a list, or
  • Introducing a separate interface or helper for the aggregated “all workers” view.

Right now the shared annotation doesn’t cleanly match both roles.
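One possible shape for that separation (illustrative names only, not the interfaces actually defined in this PR):

```python
from abc import ABC, abstractmethod


class WorkerIpGpuReporting(ABC):
    """Worker-level contract: each worker reports its own placement."""

    @abstractmethod
    def report_node_ip_and_gpu_id(self) -> tuple[str, int]: ...


class GroupIpGpuReporting(ABC):
    """Aggregated contract: a policy/generation group reports all its workers."""

    @abstractmethod
    def report_node_ip_and_gpu_id(self) -> list[tuple[str, int]]: ...
```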

📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 5bf56a9 and c6b96a7.

📒 Files selected for processing (15)
  • nemo_rl/algorithms/grpo.py (6 hunks)
  • nemo_rl/algorithms/utils.py (2 hunks)
  • nemo_rl/distributed/virtual_cluster.py (11 hunks)
  • nemo_rl/distributed/worker_groups.py (8 hunks)
  • nemo_rl/environments/reward_model_environment.py (1 hunks)
  • nemo_rl/models/generation/interfaces.py (1 hunks)
  • nemo_rl/models/generation/vllm/vllm_backend.py (2 hunks)
  • nemo_rl/models/generation/vllm/vllm_generation.py (5 hunks)
  • nemo_rl/models/generation/vllm/vllm_worker.py (1 hunks)
  • nemo_rl/models/generation/vllm/vllm_worker_async.py (1 hunks)
  • nemo_rl/models/policy/interfaces.py (1 hunks)
  • nemo_rl/models/policy/lm_policy.py (6 hunks)
  • nemo_rl/models/policy/workers/megatron_policy_worker.py (1 hunks)
  • tests/unit/distributed/test_virtual_cluster.py (5 hunks)
  • tests/unit/models/policy/test_policy_validation.py (1 hunks)
🧰 Additional context used
📓 Path-based instructions (4)
**/*.py

📄 CodeRabbit inference engine (CODING_GUIDELINES.md)

**/*.py: Conform code to Python 3.12+
Indent code with 4 spaces. Do not use tabs
Use snake_case for file names
Use PascalCase for class names
Use snake_case for function and method names
Use snake_case for local variables
Prefix variable names that start with a number with 'k' (e.g., k_99th_percentile)
Use upper snake_case with 'G' prefix for global variables (e.g., G_MY_GLOBAL)
Use upper snake_case for constants
Avoid shadowing variables declared in an outer scope
Initialize all externally visible members of a class in the constructor
Prefer docstrings over comments for interfaces that may be used outside a file
Reserve comments for code within a function or interfaces that are local to a file
If a piece of code is commented out, include a comment describing its usage and why it's commented out. Remove debug comments before merging
Use Google style docstrings for classes and functions in Python, which can be parsed by Sphinx
Avoid using reflection when functionality can be easily achieved without reflection
When using try-except blocks, limit the except clause to the smallest set of specific errors possible
When using try-except blocks for duck-typing, keep the body of the try as small as possible and use the else block for logic
YAML is the single source of truth for configuration defaults. Do not set non-None defaults in code for configuration values
For required configuration attributes, access config directly and expect presence (e.g., policy_cfg['precision']) without hidden defaults
Use typing.NotRequired to mark optional attributes in TypedDict for configuration
When adding a new config key to a TypedDict subclass, document the key's purpose, valid values/types, and recommended default, and reflect the default in exemplar YAMLs under examples/configs/*.yaml
Follow the Google Python Style Guide for Python code

Files:

  • nemo_rl/environments/reward_model_environment.py
  • nemo_rl/models/policy/interfaces.py
  • nemo_rl/models/policy/workers/megatron_policy_worker.py
  • nemo_rl/models/generation/vllm/vllm_worker_async.py
  • nemo_rl/models/generation/vllm/vllm_worker.py
  • nemo_rl/models/generation/vllm/vllm_backend.py
  • tests/unit/models/policy/test_policy_validation.py
  • nemo_rl/algorithms/utils.py
  • nemo_rl/models/policy/lm_policy.py
  • tests/unit/distributed/test_virtual_cluster.py
  • nemo_rl/models/generation/interfaces.py
  • nemo_rl/distributed/virtual_cluster.py
  • nemo_rl/algorithms/grpo.py
  • nemo_rl/distributed/worker_groups.py
  • nemo_rl/models/generation/vllm/vllm_generation.py
nemo_rl/**/*.py

📄 CodeRabbit inference engine (CODING_GUIDELINES.md)

For any source file under nemo_rl/*.py that defines a class or function decorated with @ray.remote, add a coverage pragma (# pragma: no cover) because these run in separate Ray processes

Files:

  • nemo_rl/environments/reward_model_environment.py
  • nemo_rl/models/policy/interfaces.py
  • nemo_rl/models/policy/workers/megatron_policy_worker.py
  • nemo_rl/models/generation/vllm/vllm_worker_async.py
  • nemo_rl/models/generation/vllm/vllm_worker.py
  • nemo_rl/models/generation/vllm/vllm_backend.py
  • nemo_rl/algorithms/utils.py
  • nemo_rl/models/policy/lm_policy.py
  • nemo_rl/models/generation/interfaces.py
  • nemo_rl/distributed/virtual_cluster.py
  • nemo_rl/algorithms/grpo.py
  • nemo_rl/distributed/worker_groups.py
  • nemo_rl/models/generation/vllm/vllm_generation.py
!(**/tests/**|**/test_*.py|**/test_*.sh)

📄 CodeRabbit inference engine (CODING_GUIDELINES.md)

Add the NVIDIA copyright header to all Python files and shell scripts (excluding tests). The header should include the current year

Files:

  • nemo_rl/environments/reward_model_environment.py
  • nemo_rl/models/policy/interfaces.py
  • nemo_rl/models/policy/workers/megatron_policy_worker.py
  • nemo_rl/models/generation/vllm/vllm_worker_async.py
  • nemo_rl/models/generation/vllm/vllm_worker.py
  • nemo_rl/models/generation/vllm/vllm_backend.py
  • tests/unit/models/policy/test_policy_validation.py
  • nemo_rl/algorithms/utils.py
  • nemo_rl/models/policy/lm_policy.py
  • tests/unit/distributed/test_virtual_cluster.py
  • nemo_rl/models/generation/interfaces.py
  • nemo_rl/distributed/virtual_cluster.py
  • nemo_rl/algorithms/grpo.py
  • nemo_rl/distributed/worker_groups.py
  • nemo_rl/models/generation/vllm/vllm_generation.py
**/*.{py,sh}

📄 CodeRabbit inference engine (CODING_GUIDELINES.md)

The NVIDIA copyright header should appear at the top of all Python files and shell scripts (excluding tests)

Files:

  • nemo_rl/environments/reward_model_environment.py
  • nemo_rl/models/policy/interfaces.py
  • nemo_rl/models/policy/workers/megatron_policy_worker.py
  • nemo_rl/models/generation/vllm/vllm_worker_async.py
  • nemo_rl/models/generation/vllm/vllm_worker.py
  • nemo_rl/models/generation/vllm/vllm_backend.py
  • tests/unit/models/policy/test_policy_validation.py
  • nemo_rl/algorithms/utils.py
  • nemo_rl/models/policy/lm_policy.py
  • tests/unit/distributed/test_virtual_cluster.py
  • nemo_rl/models/generation/interfaces.py
  • nemo_rl/distributed/virtual_cluster.py
  • nemo_rl/algorithms/grpo.py
  • nemo_rl/distributed/worker_groups.py
  • nemo_rl/models/generation/vllm/vllm_generation.py
🧠 Learnings (4)
📓 Common learnings
Learnt from: zpqiu
Repo: NVIDIA-NeMo/RL PR: 1006
File: nemo_rl/algorithms/distillation.py:312-354
Timestamp: 2025-09-18T14:57:31.003Z
Learning: The distillation algorithm's cluster setup logic is designed to follow the same patterns used in GRPO for handling distributed training clusters and resource allocation.
📚 Learning: 2025-09-10T05:35:59.840Z
Learnt from: bxyu-nvidia
Repo: NVIDIA-NeMo/RL PR: 1110
File: nemo_rl/models/generation/vllm/vllm_worker_async.py:363-369
Timestamp: 2025-09-10T05:35:59.840Z
Learning: In nemo_rl/models/generation/vllm/vllm_worker_async.py, the HTTP server should explicitly bind to "0.0.0.0" (all interfaces) rather than a specific interface, as confirmed by bxyu-nvidia. This is an intentional design decision for the vLLM HTTP server functionality.

Applied to files:

  • nemo_rl/models/generation/vllm/vllm_worker_async.py
  • nemo_rl/models/generation/vllm/vllm_worker.py
📚 Learning: 2025-09-18T14:57:31.003Z
Learnt from: zpqiu
Repo: NVIDIA-NeMo/RL PR: 1006
File: nemo_rl/algorithms/distillation.py:312-354
Timestamp: 2025-09-18T14:57:31.003Z
Learning: The distillation algorithm's cluster setup logic is designed to follow the same patterns used in GRPO for handling distributed training clusters and resource allocation.

Applied to files:

  • nemo_rl/algorithms/grpo.py
📚 Learning: 2025-10-30T20:50:44.126Z
Learnt from: adil-a
Repo: NVIDIA-NeMo/RL PR: 1440
File: examples/configs/sft_automodel.yaml:48-58
Timestamp: 2025-10-30T20:50:44.126Z
Learning: In DTensor configurations for MoE (Mixture of Experts) models, expert_parallel_size and data_parallel_size can be applied together without multiplying the GPU requirements. Expert Parallelism (EP) only applies to MoE layers, while Data Parallelism/FSDP applies to non-MoE layers. Therefore, configurations like expert_parallel_size: 8 and data_parallel_size: 8 are valid on an 8-GPU cluster for MoE models.

Applied to files:

  • nemo_rl/models/generation/vllm/vllm_generation.py
🧬 Code graph analysis (12)
nemo_rl/environments/reward_model_environment.py (2)
tests/unit/distributed/test_worker_groups.py (1)
  • virtual_cluster (213-221)
nemo_rl/distributed/virtual_cluster.py (1)
  • get_unified_placement_group (338-342)
nemo_rl/models/policy/workers/megatron_policy_worker.py (7)
nemo_rl/models/generation/vllm/vllm_backend.py (1)
  • report_node_ip_and_gpu_id (278-285)
nemo_rl/models/generation/vllm/vllm_generation.py (1)
  • report_node_ip_and_gpu_id (782-792)
nemo_rl/models/policy/lm_policy.py (1)
  • report_node_ip_and_gpu_id (839-846)
nemo_rl/models/generation/interfaces.py (1)
  • report_node_ip_and_gpu_id (261-263)
nemo_rl/models/generation/vllm/vllm_worker.py (1)
  • report_node_ip_and_gpu_id (774-786)
nemo_rl/models/policy/interfaces.py (1)
  • report_node_ip_and_gpu_id (196-197)
nemo_rl/models/policy/workers/base_policy_worker.py (1)
  • report_node_ip_and_gpu_id (119-123)
nemo_rl/models/generation/vllm/vllm_worker_async.py (1)
nemo_rl/models/generation/vllm/vllm_worker.py (1)
  • llm (337-338)
nemo_rl/models/generation/vllm/vllm_worker.py (4)
nemo_rl/models/generation/vllm/vllm_backend.py (1)
  • report_node_ip_and_gpu_id (278-285)
nemo_rl/models/generation/vllm/vllm_generation.py (1)
  • report_node_ip_and_gpu_id (782-792)
nemo_rl/models/generation/interfaces.py (1)
  • report_node_ip_and_gpu_id (261-263)
nemo_rl/models/policy/interfaces.py (1)
  • report_node_ip_and_gpu_id (196-197)
nemo_rl/models/generation/vllm/vllm_backend.py (5)
nemo_rl/models/generation/vllm/vllm_generation.py (1)
  • report_node_ip_and_gpu_id (782-792)
nemo_rl/models/policy/lm_policy.py (1)
  • report_node_ip_and_gpu_id (839-846)
nemo_rl/models/generation/interfaces.py (1)
  • report_node_ip_and_gpu_id (261-263)
nemo_rl/models/policy/workers/megatron_policy_worker.py (1)
  • report_node_ip_and_gpu_id (2402-2406)
nemo_rl/models/policy/interfaces.py (1)
  • report_node_ip_and_gpu_id (196-197)
tests/unit/models/policy/test_policy_validation.py (1)
nemo_rl/distributed/virtual_cluster.py (2)
  • world_size (348-349)
  • get_unified_placement_group (338-342)
nemo_rl/algorithms/utils.py (6)
nemo_rl/models/policy/interfaces.py (2)
  • ColocatablePolicyInterface (160-197)
  • report_node_ip_and_gpu_id (196-197)
nemo_rl/models/generation/interfaces.py (2)
  • GenerationInterface (215-263)
  • report_node_ip_and_gpu_id (261-263)
nemo_rl/models/generation/vllm/vllm_backend.py (1)
  • report_node_ip_and_gpu_id (278-285)
nemo_rl/models/generation/vllm/vllm_generation.py (1)
  • report_node_ip_and_gpu_id (782-792)
nemo_rl/models/policy/lm_policy.py (1)
  • report_node_ip_and_gpu_id (839-846)
nemo_rl/models/generation/vllm/vllm_worker.py (1)
  • report_node_ip_and_gpu_id (774-786)
nemo_rl/models/policy/lm_policy.py (7)
nemo_rl/distributed/virtual_cluster.py (1)
  • node_count (351-352)
nemo_rl/distributed/worker_groups.py (3)
  • RayWorkerGroup (304-987)
  • workers (575-576)
  • run_all_workers_single_data (711-755)
nemo_rl/models/generation/vllm/vllm_generation.py (1)
  • report_node_ip_and_gpu_id (782-792)
nemo_rl/models/generation/interfaces.py (1)
  • report_node_ip_and_gpu_id (261-263)
nemo_rl/models/policy/workers/megatron_policy_worker.py (1)
  • report_node_ip_and_gpu_id (2402-2406)
nemo_rl/models/policy/interfaces.py (1)
  • report_node_ip_and_gpu_id (196-197)
nemo_rl/models/policy/workers/base_policy_worker.py (1)
  • report_node_ip_and_gpu_id (119-123)
tests/unit/distributed/test_virtual_cluster.py (1)
nemo_rl/distributed/virtual_cluster.py (3)
  • _init_placement_group (237-276)
  • RayVirtualCluster (186-503)
  • ResourceInsufficientError (182-183)
nemo_rl/algorithms/grpo.py (1)
nemo_rl/distributed/virtual_cluster.py (2)
  • RayVirtualCluster (186-503)
  • get_master_address_and_port (370-384)
nemo_rl/distributed/worker_groups.py (1)
nemo_rl/distributed/virtual_cluster.py (3)
  • get_bundle_indices_list_for_worker_group (429-474)
  • get_available_address_and_port (354-368)
  • get_unified_placement_group (338-342)
nemo_rl/models/generation/vllm/vllm_generation.py (6)
nemo_rl/distributed/virtual_cluster.py (2)
  • node_count (351-352)
  • world_size (348-349)
nemo_rl/distributed/worker_groups.py (1)
  • RayWorkerGroup (304-987)
nemo_rl/models/generation/vllm/vllm_backend.py (1)
  • report_node_ip_and_gpu_id (278-285)
nemo_rl/models/generation/interfaces.py (1)
  • report_node_ip_and_gpu_id (261-263)
nemo_rl/models/generation/vllm/vllm_worker.py (1)
  • report_node_ip_and_gpu_id (774-786)
nemo_rl/models/policy/interfaces.py (1)
  • report_node_ip_and_gpu_id (196-197)
🪛 Ruff (0.14.8)
nemo_rl/models/generation/vllm/vllm_worker.py

781-783: Avoid specifying long messages outside the exception class

(TRY003)

nemo_rl/models/generation/vllm/vllm_backend.py

283-283: Do not use bare except

(E722)

nemo_rl/distributed/virtual_cluster.py

266-266: Consider moving this statement to an else block

(TRY300)


329-330: try-except-pass detected, consider logging the exception

(S110)


329-329: Do not catch blind exception: Exception

(BLE001)


331-334: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


331-334: Avoid specifying long messages outside the exception class

(TRY003)


415-415: Do not assign a lambda expression, use a def

Rewrite ip_map as a def

(E731)


487-487: Do not catch blind exception: Exception

(BLE001)

nemo_rl/distributed/worker_groups.py

430-430: Loop control variable group_idx not used within loop body

(B007)


430-430: Loop control variable node_idx not used within loop body

(B007)


431-431: Loop control variable local_rank not used within loop body

(B007)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: Lint check
  • GitHub Check: Lint check
  • GitHub Check: Post submodule check comment / Comment on PR
  • GitHub Check: Post automodel integration comment / Comment on PR
🔇 Additional comments (16)
nemo_rl/models/policy/lm_policy.py (1)

525-535: Double-check FLOPs aggregation assumptions (gpus_per_worker)

In the FLOPs accounting branch:

aggregated_results["total_flops"] = self.flops_tracker.total_flops
aggregated_results["num_ranks"] = len(self.worker_group.workers)
gpus_per_worker = len(self.worker_group.workers) / len(results)

aggregated_results["theoretical_tflops"] = gpus_per_worker * sum(
    get_theoretical_tflops(r["gpu_name"], r["model_dtype"])
    for r in results
)

This assumes:

  • len(self.worker_group.workers) is the true GPU world size.
  • len(results) is proportional to the number of distinct GPU types you want to sum over, and gpus_per_worker acts as a multiplicity factor.

That’s plausible, but brittle if results is, for example, already per-rank or filtered to a subset (e.g., rank-0 per DP group). If len(results) ever changes without adjusting this logic, your theoretical TFLOPS numbers will silently drift.

I’d suggest:

  • Either documenting the intended invariant (len(results) == number of distinct GPU descriptors, gpus_per_worker == GPUs per descriptor), or
  • Refactoring to compute a per-rank theoretical TFLOPS directly from the actual world size and a per-GPU spec to avoid relying on len(results) divisions.

No functional change is strictly required if current invariants hold, but this is a spot worth re-checking.
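A minimal sketch of documenting and enforcing that invariant (variable and helper names are taken from the quoted snippet and are assumptions about the surrounding code):

```python
num_ranks = len(self.worker_group.workers)
# Invariant: each entry in `results` describes one GPU descriptor shared by the
# same number of ranks; enforce it so the division below cannot silently drift.
assert num_ranks % len(results) == 0, (
    f"Expected len(results)={len(results)} to evenly divide the number of "
    f"workers ({num_ranks}) for theoretical-TFLOPS aggregation."
)
gpus_per_worker = num_ranks // len(results)
aggregated_results["theoretical_tflops"] = gpus_per_worker * sum(
    get_theoretical_tflops(r["gpu_name"], r["model_dtype"]) for r in results
)
```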

nemo_rl/algorithms/grpo.py (3)

415-421: Unified cluster wiring and sequential init look consistent

The following pieces fit together coherently with the new unified placement-group model:

  • Creating a single RayVirtualCluster:
cluster = RayVirtualCluster(
    name="grpo_cluster",
    bundle_ct_per_node_list=[grpo_gpus_per_node] * grpo_nodes,
    use_gpus=True,
    num_gpus_per_node=grpo_gpus_per_node,
    max_colocated_worker_groups=(
        2 if colocated_inference and generation_config["backend"] != "megatron" else 1
    ),
)
  • Passing the derived topology into Policy and VllmGeneration:
p = Policy(
    cluster=cluster,
    ...,
    num_nodes=policy_nodes,
    workers_per_node=policy_gpus_per_node,
    cluster_node_offset=policy_node_offset,
    cluster_gpu_offset_each_node=policy_gpu_offset,
)

pg = VllmGeneration(
    cluster=cluster,
    config=generation_config,
    num_nodes=inference_nodes,
    workers_per_node=inference_gpus_per_node,
    cluster_node_offset=inference_node_offset,
    cluster_gpu_offset_each_node=inference_gpu_offset,
)
  • Sequentially initializing vLLM first and then policy in the vLLM backend path to reduce memory pressure, and explicitly recording the timing metrics (with parallel_init_enabled = 0.0).

This matches the new RayWorkerGroup API and the PR goal of using a single placement group plus node/GPU offsets. Once the minor resource-allocation assert is fixed (see previous comment) and the workers_per_node type assumptions in Policy/VllmGeneration are addressed, this wiring should behave as intended.

Also applies to: 455-471, 474-487, 529-541


551-552: print_ip_and_gpu_id_of_workers integration is helpful; ensure backends obey the contract

Using:

print_ip_and_gpu_id_of_workers(policy, policy_generation)

right after worker initialization is a nice debugging aid for the new IP/GPU-aware placement.

Given this call depends on:

  • policy.report_node_ip_and_gpu_id()
  • policy_generation.report_node_ip_and_gpu_id() (when present)

all returning list[tuple[ip, gpu_id]] with a consistent GPU-id type, please make sure:

  • All concrete implementations (Policy, vLLM generation, Megatron/DTensor workers, vLLM backends) adhere to that contract, and
  • GPU IDs are normalized as discussed in the utils comment so mixed int/str values don’t break the printer.

No code change strictly required here beyond those backend/utility fixes.


553-568: Collective world size computation looks consistent with new topology

For non-colocated inference you now do:

ip, port = cluster.get_master_address_and_port()
...
train_world_size = policy_nodes * policy_gpus_per_node
inference_world_size = inference_nodes * inference_gpus_per_node
world_size = train_world_size + inference_world_size
...
futures_train = policy.init_collective(
    ip, port, world_size, train_world_size=train_world_size
)
futures_inference = policy_generation.init_collective(
    ip, port, world_size, train_world_size=train_world_size
)

Given how _calculate_resource_allocation defines (policy_nodes, policy_gpus_per_node, inference_nodes, inference_gpus_per_node), this ensures:

  • NCCL world size matches the total number of training plus inference ranks, and
  • Each side knows the training world size for its own local logic.

This is aligned with the unified-cluster design and looks correct as long as the _calculate_resource_allocation invariants hold (notably that the sum of requested ranks does not exceed the underlying placement group’s world size).

nemo_rl/distributed/worker_groups.py (3)

316-341: LGTM! Clean API simplification.

The constructor signature change centralizes bundle index computation in the cluster object, reducing caller complexity. The new parameters (num_nodes, cluster_node_offset, cluster_gpu_offset_each_node) provide fine-grained control over worker placement in the unified placement group model.


349-357: LGTM!

Clean delegation to the cluster's bundle index computation. The relevant code snippet shows that get_bundle_indices_list_for_worker_group handles defaults and validation internally.


436-521: LGTM! Worker creation logic properly adapted to unified placement group model.

The loop correctly:

  • Uses node_idx from bundle indices for NODE_RANK environment variable (line 451)
  • Sets worker_bundle_indices with (node_idx, local_bundle_indices) for the leader (line 468)
  • Updates naming to use node_idx for non-parallel groups (line 475)
  • Tracks metadata with correct node_idx (line 511)
nemo_rl/distributed/virtual_cluster.py (6)

178-179: LGTM!

Method renamed from get_gpu_id to get_ip_and_gpu_id with appropriate return type change to (node_ip, gpu_id). This supports the new IP-based node ordering feature.


206-206: Behavioral change: Default placement strategy changed from SPREAD to PACK.

This change affects how Ray allocates resources. PACK groups bundles on fewer nodes while SPREAD distributes across nodes. Ensure this is intentional and tested, as it may impact existing workloads that implicitly relied on SPREAD behavior.


338-346: LGTM! Good backward compatibility pattern.

get_unified_placement_group provides the new API with an assertion guard, while get_placement_groups maintains backward compatibility by wrapping the unified group in a list.


449-453: Note: or 0 pattern handles explicit zero correctly here.

Using cluster_node_offset or 0 works correctly because the default is 0 anyway. If explicit 0 is passed, 0 or 0 evaluates to 0. However, if the default ever needs to change, consider using if x is None pattern for clarity.
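i.e., something like (sketch only):

```python
# Explicit None check, so a future non-zero default is not silently dropped.
cluster_node_offset = 0 if cluster_node_offset is None else cluster_node_offset
```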


455-474: LGTM! Solid bounds checking and index computation.

The assertions provide clear error messages with actual values. The bundle indices computation correctly handles the tied worker group size and produces the expected (node_idx, bundle_indices) tuples.


483-492: LGTM!

The shutdown logic correctly handles the unified placement group. The broad exception catch is acceptable here since cleanup should be best-effort, and the error is logged with the placement group ID for debugging.

tests/unit/models/policy/test_policy_validation.py (1)

39-42: Mock unified placement group wiring looks correct

The mock unified placement group and bundle_count setup align with the new get_unified_placement_group API and should keep the validation tests behaving as intended.

nemo_rl/environments/reward_model_environment.py (1)

157-159: Virtual-cluster logging change is consistent with unified PG API

Using get_unified_placement_group() in the init log matches the unified placement-group API and is a safe, no-op behavior change.

nemo_rl/models/generation/vllm/vllm_worker.py (1)

774-785: Synchronous IP/GPU reporting hook is consistent with existing patterns

The new report_node_ip_and_gpu_id follows the same collective RPC pattern and async-engine guarding used elsewhere in this worker; looks correct for the synchronous vLLM path.

Comment on lines 31 to 32
from nemo_rl.models.policy.interfaces import ColocatablePolicyInterface
from nemo_rl.models.generation.interfaces import GenerationInterface
Contributor


⚠️ Potential issue | 🟠 Major

Normalize GPU ID type in print_ip_and_gpu_id_of_workers to avoid runtime errors

_print_in_table assumes gpu_id is int (type hint list[tuple[str, int]]) and does:

  • all_gpu_ids = sorted(set([result[1] for result in results]))
  • uses those values as keys to index into worker_id_list.

However, the vLLM backend’s report_node_ip_and_gpu_id can return "Unknown" as a string GPU id when ray.get_gpu_ids() fails (see nemo_rl/models/generation/vllm/vllm_backend.py in the snippets). That means results may contain a mix of int and str GPU IDs, and:

  • sorted(set([...])) will raise TypeError in Python 3 when comparing int and str.

This can make print_ip_and_gpu_id_of_workers crash exactly in the scenarios where it is most useful (cluster misconfiguration / missing GPUs).

I’d recommend:

  • Widening the type hint to allow str (e.g. list[tuple[str, int | str]]), and
  • Normalizing to a string representation before computing all_gpu_ids and indexing.

For example:

Suggested change
-def print_ip_and_gpu_id_of_workers(policy: ColocatablePolicyInterface, policy_generation: Optional[GenerationInterface]) -> None:
+def print_ip_and_gpu_id_of_workers(
+    policy: ColocatablePolicyInterface,
+    policy_generation: Optional[GenerationInterface],
+) -> None:
@@
-    def _print_in_table(results: list[tuple[str, int]], table_title: str) -> None:
-        all_node_ips = sorted(set([result[0] for result in results]))
-        all_gpu_ids = sorted(set([result[1] for result in results]))
+    def _print_in_table(
+        results: list[tuple[str, int | str]], table_title: str
+    ) -> None:
+        all_node_ips = sorted({ip for ip, _ in results})
+        # Normalize GPU IDs to strings to avoid int/str comparison issues
+        all_gpu_ids = sorted({str(gpu_id) for _, gpu_id in results})
@@
-        for worker_id, (ip, gpu_id) in enumerate(results):
+        for worker_id, (ip, gpu_id) in enumerate(results):
             node_idx = all_node_ips.index(ip)
-            gpu_idx = all_gpu_ids.index(gpu_id)
+            gpu_idx = all_gpu_ids.index(str(gpu_id))
             worker_id_list[node_idx][gpu_idx].append("worker-" + str(worker_id))
@@
-        table.field_names = ["Node_IP"] + [
-            "GPU_ID=" + str(gpu_id) for gpu_id in all_gpu_ids
-        ]
+        table.field_names = ["Node_IP"] + [
+            f"GPU_ID={gpu_id}" for gpu_id in all_gpu_ids
+        ]

This keeps the table readable and robust to backends that express GPU IDs as strings. You may also want to align the interface/type hints in the various report_node_ip_and_gpu_id implementations to match this.

Also applies to: 752-784

🤖 Prompt for AI Agents
In nemo_rl/algorithms/utils.py around lines 31-32 (and similarly for the other
occurrence at ~752-784), the GPU id type is assumed to be int which causes
TypeError when backends return string values like "Unknown"; update the type
hints from list[tuple[str, int]] to list[tuple[str, int | str]] (or the
project’s compatible union syntax) and normalize GPU ids to strings before
computing all_gpu_ids and using them as keys: when building/receiving results
cast each gpu_id to str(gpu_id) (or map to a canonical string such as "Unknown"
for None), use those string ids to compute sorted unique ids and to index
worker_id_list/_print_in_table, and update any related function signatures and
docstrings so all internal operations treat GPU ids as strings to avoid
mixed-type comparisons.

Comment on lines 278 to 285
def report_node_ip_and_gpu_id(self) -> list[tuple[str, int]]:
    """Report the node IP and GPU ID of the current worker."""
    ip = ray._private.services.get_node_ip_address()
    try:
        gpu_id = int(ray.get_gpu_ids()[0])
    except:
        gpu_id = "Unknown"
    return (ip, gpu_id)
Contributor


⚠️ Potential issue | 🟡 Minor

Tighten exception handling and fix return typing in report_node_ip_and_gpu_id

Two small issues here:

  • The method is annotated as list[tuple[str, int]] but returns a single (ip, gpu_id) tuple, and gpu_id may be the string "Unknown". The effective type is tuple[str, int | str], not list[tuple[str, int]].
  • The bare except: around ray.get_gpu_ids()[0] is too broad and flagged by Ruff (E722).

Consider:

  • Changing the signature to something like -> tuple[str, int | str] (or at least -> tuple[str, object]) to match reality, and relying on the caller (collective_rpc) to aggregate into a list.
  • Narrowing the handler to except Exception: (or a more specific Ray error type) instead of a bare except.
🧰 Tools
🪛 Ruff (0.14.8)

283-283: Do not use bare except

(E722)

🤖 Prompt for AI Agents
nemo_rl/models/generation/vllm/vllm_backend.py around lines 278 to 285: the
function is annotated as list[tuple[str, int]] but actually returns a single
tuple and may return a string for gpu_id; change the signature to return
tuple[str, int | str] (or tuple[str, object] if Python version/type constraints
require) and update callers if they expect a list, and replace the bare except:
with a narrowed handler such as except Exception: (or a more specific Ray error)
so only expected errors are caught and Ruff E722 is avoided.
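A sketch combining both suggestions (the Ray calls are the same ones used in the original snippet; the exact return-type spelling is up to the author):

```python
def report_node_ip_and_gpu_id(self) -> tuple[str, int | str]:
    """Report the node IP and GPU ID of the current worker."""
    ip = ray._private.services.get_node_ip_address()
    try:
        gpu_id: int | str = int(ray.get_gpu_ids()[0])
    except Exception:
        # No GPU visible to this worker, or the lookup failed.
        gpu_id = "Unknown"
    return (ip, gpu_id)
```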

Comment on lines 1115 to 1127
async def report_node_ip_and_gpu_id_async(self) -> list[tuple[str, int]]:
    """Async version of report_node_ip_and_gpu_id."""
    assert self.llm is not None, (
        "Attempting to report node IP and GPU ID with either an uninitialized vLLM or non-model-owner"
    )

    if not self.cfg["vllm_cfg"]["async_engine"]:
        raise RuntimeError(
            "report_node_ip_and_gpu_id_async can only be used with async_engine=True. Use report_node_ip_and_gpu_id instead."
        )

    result_or_coro = await self.llm.collective_rpc("report_node_ip_and_gpu_id", args=tuple())
    return cast(list[tuple[str, int]], result_or_coro)
Contributor


⚠️ Potential issue | 🔴 Critical

Handle coroutine result from collective_rpc in report_node_ip_and_gpu_id_async

Unlike report_device_id_async and the async weight-update helpers, this method doesn’t unwrap a possible coroutine result from collective_rpc. If AsyncLLM.collective_rpc returns a coroutine here, callers will receive a coroutine instead of list[tuple[str, int]], and code that iterates the result will break.

You can mirror the existing pattern used in report_device_id_async:

Suggested fix
     async def report_node_ip_and_gpu_id_async(self) -> list[tuple[str, int]]:
         """Async version of report_node_ip_and_gpu_id."""
         assert self.llm is not None, (
             "Attempting to report node IP and GPU ID with either an uninitialized vLLM or non-model-owner"
         )

         if not self.cfg["vllm_cfg"]["async_engine"]:
             raise RuntimeError(
                 "report_node_ip_and_gpu_id_async can only be used with async_engine=True. Use report_node_ip_and_gpu_id instead."
             )

-        result_or_coro = await self.llm.collective_rpc("report_node_ip_and_gpu_id", args=tuple())
-        return cast(list[tuple[str, int]], result_or_coro)
+        result_or_coro = await self.llm.collective_rpc(
+            "report_node_ip_and_gpu_id", args=tuple()
+        )
+        if asyncio.iscoroutine(result_or_coro):
+            worker_results = await result_or_coro
+        else:
+            worker_results = result_or_coro
+        return cast(list[tuple[str, int]], worker_results)
🤖 Prompt for AI Agents
In nemo_rl/models/generation/vllm/vllm_worker_async.py around lines 1115 to
1127, report_node_ip_and_gpu_id_async currently awaits collective_rpc but does
not handle the case where collective_rpc returns a coroutine; mirror the pattern
used in report_device_id_async by checking if the result is a coroutine (e.g.,
using asyncio.iscoroutine or inspect.iscoroutine), await it if so, then cast and
return the final list[tuple[str, int]]; keep the existing assertions and
async_engine check intact.

Comment on lines 2402 to 2406
def report_node_ip_and_gpu_id(self) -> list[tuple[str, int]]:
    """Report the node IP and GPU ID of the current worker."""
    ip = ray._private.services.get_node_ip_address()
    gpu_id = int(ray.get_gpu_ids()[0])
    return (ip, gpu_id)
Contributor


⚠️ Potential issue | 🟡 Minor

Fix return type mismatch (tuple vs list) for report_node_ip_and_gpu_id

This method is annotated as returning list[tuple[str, int]] but actually returns a single (ip, gpu_id) tuple. Callers (e.g., LMPolicy.report_node_ip_and_gpu_id) expect each worker to return a single tuple, not a list, and the base worker already provides that behavior.

Consider either:

  • Dropping this override and using AbstractPolicyWorker.report_node_ip_and_gpu_id, or
  • Adjusting the signature and implementation here to return a tuple[str, int] so it is consistent with the worker-level contract and the aggregator’s expectations.
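If the override is kept, a sketch of the second option with a worker-level tuple contract (same Ray calls as the quoted code):

```python
def report_node_ip_and_gpu_id(self) -> tuple[str, int]:
    """Return this worker's (node IP, GPU ID)."""
    ip = ray._private.services.get_node_ip_address()
    gpu_id = int(ray.get_gpu_ids()[0])
    return (ip, gpu_id)
```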

Contributor

@terrykong terrykong left a comment


thanks for fixing @guyueh1

at first glance this is quite a lot of boilerplate that we'd have to add in grpo.py and i'd like to discuss the design with you to understand if we can better hide/remove this complexity.

For instance, would our lives be made easier if we instead use https://docs.ray.io/en/latest/ray-core/scheduling/index.html#nodeaffinityschedulingstrategy and just do everything manually? This has the caveat of course of cutting off the possibility of autoscaling, but maybe that's okay since gpus don't magically appear, usually they're added deliberately so may not need control plane to automatically figure that out.

i haven't thought hard on if switching to ^ makes our life harder or easier for fault tolerance, so something also i'd like your opinion on
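For reference, a rough sketch of what the NodeAffinitySchedulingStrategy route could look like (standard Ray APIs; the actor class and the IP-based sort are illustrative, not NeMo RL code):

```python
import ray
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy


@ray.remote(num_gpus=1)
class Worker:  # placeholder actor class, not part of NeMo RL
    def node_ip(self) -> str:
        return ray._private.services.get_node_ip_address()


# Assumes ray.init() has already been called.
# Sort alive GPU nodes by IP, then pin one worker per node explicitly.
nodes = sorted(
    (n for n in ray.nodes() if n["Alive"] and n["Resources"].get("GPU", 0) > 0),
    key=lambda n: tuple(int(x) for x in n["NodeManagerAddress"].split(".")),
)
workers = [
    Worker.options(
        scheduling_strategy=NodeAffinitySchedulingStrategy(
            node_id=n["NodeID"], soft=False
        )
    ).remote()
    for n in nodes
]
```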

Signed-off-by: Guyue Huang <guyueh@nvidia.com>
@guyueh1 guyueh1 added the CI:L2 Run doctests, unit tests, functional tests, and convergence tests label Dec 18, 2025
@terrykong terrykong linked an issue Dec 18, 2025 that may be closed by this pull request
@guyueh1 guyueh1 changed the title feat: Order node by IP in GRPO refactor: Order node by IP in GRPO Dec 19, 2025