Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ mount_as = "/opt/Megatron-Bridge"
[cmd_args]
gpu_type = "b200"
container_image = "nvcr.io#nvidia/nemo:25.11.01"
model_name = "qwen3"
model_size = "30b_a3b"
model_family_name = "qwen3"
model_recipe_name = "30b_a3b"
gpus_per_node = 4
num_gpus = 8
domain = "llm"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ mount_as = "/opt/Megatron-Bridge"
[cmd_args]
gpu_type = "gb200"
container_image = "nvcr.io#nvidia/nemo:25.11.01"
model_name = "qwen3"
model_size = "30b_a3b"
model_family_name = "qwen3"
model_recipe_name = "30b_a3b"
gpus_per_node = 4
num_gpus = 8
domain = "llm"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ mount_as = "/opt/Megatron-Bridge"
[cmd_args]
gpu_type = "gb300"
container_image = "nvcr.io#nvidia/nemo:25.11.01"
model_name = "qwen3"
model_size = "30b_a3b"
model_family_name = "qwen3"
model_recipe_name = "30b_a3b"
gpus_per_node = 4
num_gpus = 8
domain = "llm"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ mount_as = "/opt/Megatron-Bridge"
[cmd_args]
gpu_type = "h100"
container_image = "nvcr.io#nvidia/nemo:25.11.01"
model_name = "qwen3"
model_size = "30b_a3b"
model_family_name = "qwen3"
model_recipe_name = "30b_a3b"
gpus_per_node = 8
num_gpus = 16
domain = "llm"
Expand Down
8 changes: 4 additions & 4 deletions doc/workloads/megatron_bridge.rst
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ Test TOML example:
# Container can be an NGC/enroot URL (nvcr.io#...) or a local .sqsh path.
container_image = "nvcr.io#nvidia/nemo:25.11.01"

model_name = "qwen3"
model_size = "30b_a3b"
model_family_name = "qwen3"
model_recipe_name = "30b_a3b"
task = "pretrain"
domain = "llm"
compute_dtype = "fp8_mx"
Expand Down Expand Up @@ -55,8 +55,8 @@ Test-in-Scenario example:

[Tests.cmd_args]
container_image = "nvcr.io#nvidia/nemo:25.11.01"
model_name = "qwen3"
model_size = "30b_a3b"
model_family_name = "qwen3"
model_recipe_name = "30b_a3b"
task = "pretrain"
domain = "llm"
compute_dtype = "fp8_mx"
Expand Down
54 changes: 48 additions & 6 deletions src/cloudai/workloads/megatron_bridge/megatron_bridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,22 +40,27 @@ class MegatronBridgeCmdArgs(CmdArgs):
detach: Optional[bool] = Field(default=None)

# Model/task
model_name: str = Field(default="")
model_size: str = Field(default="")
domain: str = Field(default="llm")
model_family_name: str = Field(default="")
model_recipe_name: str = Field(default="")
use_recipes: Optional[bool] = Field(default=None)
task: str = Field(default="pretrain")
compute_dtype: str = Field(default="bf16")
fp8_recipe: Optional[str] = Field(default=None)
hf_token: Optional[str] = Field(default=None)
nemo_home: Optional[str] = Field(default=None)
wandb_key: Optional[str] = Field(default=None)
wandb_prj_name: Optional[str] = Field(default=None)
wandb_exp_name: Optional[str] = Field(default=None)
wandb_project_name: Optional[str] = Field(default=None)
wandb_entity_name: Optional[str] = Field(default=None)
wandb_experiment_name: Optional[str] = Field(default=None)
wandb_save_dir: Optional[str] = Field(default=None)

# Retries
max_retries: Optional[int] = Field(default=None)

# Feature flags (allow sweeps)
use_tokendrop: Optional[Union[bool, List[bool]]] = Field(default=None)
use_megatron_fsdp: Optional[Union[bool, List[bool]]] = Field(default=None)
cuda_graph_impl: Optional[str] = Field(default=None)
cuda_graph_impl: Optional[Union[str, List[str]]] = Field(default=None)
cuda_graph_scope: Optional[Union[str, List[str]]] = Field(default=None)

# Parallelism
Expand All @@ -69,6 +74,43 @@ class MegatronBridgeCmdArgs(CmdArgs):
# Batch sizes
mb: Optional[Union[int, List[int]]] = Field(default=None)
gb: Optional[Union[int, List[int]]] = Field(default=None)
seq_length: Optional[Union[int, List[int]]] = Field(default=None)

# Optimizer
lr: Optional[Union[float, List[float]]] = Field(default=None)
min_lr: Optional[Union[float, List[float]]] = Field(default=None)
warmup_iters: Optional[Union[int, List[int]]] = Field(default=None)

# Checkpointing
pretrained_checkpoint: Optional[str] = Field(default=None)
save_dir: Optional[str] = Field(default=None)
load_dir: Optional[str] = Field(default=None)
save_interval: Optional[int] = Field(default=None)
most_recent_k: Optional[int] = Field(default=None)
save_config_filepath: Optional[str] = Field(default=None)

# Data / Tokenizer
data: Optional[str] = Field(default=None)
dataset_paths: Optional[Union[str, List[str]]] = Field(default=None)
dataset_root: Optional[str] = Field(default=None)
index_mapping_dir: Optional[str] = Field(default=None)
dataset_name: Optional[str] = Field(default=None)
packed_sequence: Optional[bool] = Field(default=None)
head_only: Optional[bool] = Field(default=None)
tokenizer_type: Optional[str] = Field(default=None)
tokenizer_model: Optional[str] = Field(default=None)
vocab_size: Optional[int] = Field(default=None)

# Profiling (performance group in argument_parser.py)
pytorch_profiler: Optional[bool] = Field(default=None)
profiling_start_step: Optional[int] = Field(default=None)
profiling_stop_step: Optional[int] = Field(default=None)
record_memory_history: Optional[bool] = Field(default=None)
profiling_gpu_metrics: Optional[bool] = Field(default=None)
profiling_ranks: Optional[Union[int, List[int]]] = Field(default=None)

# Performance
nccl_ub: Optional[Union[bool, List[bool]]] = Field(default=None)

# Perf/tuning
moe_a2a_overlap: Optional[Union[bool, List[bool]]] = Field(default=None)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ class MegatronBridgeReportGenerationStrategy(ReportGenerationStrategy):
metrics: ClassVar[list[str]] = ["default", "step-time", "tflops-per-gpu"]

def get_log_file(self) -> Path | None:
log = self.test_run.output_path / "megatron_bridge_launcher.log"
log = self.test_run.output_path / "cloudai_megatron_bridge_launcher.log"
return log if log.is_file() else None

@property
def results_file(self) -> Path:
return self.get_log_file() or (self.test_run.output_path / "megatron_bridge_launcher.log")
return self.get_log_file() or (self.test_run.output_path / "cloudai_megatron_bridge_launcher.log")

def can_handle_directory(self) -> bool:
return self.get_log_file() is not None
Expand Down Expand Up @@ -75,8 +75,8 @@ def generate_report(self) -> None:
log_file, step_times_s, gpu_tflops = self._get_extracted_data()
if not log_file:
logging.error(
"No Megatron-Bridge launcher log file found: %s",
self.test_run.output_path / "megatron_bridge_launcher.log",
"No Megatron-Bridge launcher log file found in: %s",
self.test_run.output_path,
)
return

Expand Down Expand Up @@ -130,8 +130,8 @@ def get_metric(self, metric: str) -> float:
log_file, step_times_s, gpu_tflops = self._get_extracted_data()
if not log_file:
logging.error(
"No Megatron-Bridge launcher log file found: %s",
self.test_run.output_path / "megatron_bridge_launcher.log",
"No Megatron-Bridge launcher log file found in: %s",
self.test_run.output_path,
)
return METRIC_ERROR
if not step_times_s:
Expand Down
104 changes: 83 additions & 21 deletions src/cloudai/workloads/megatron_bridge/slurm_command_gen_strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,28 +116,41 @@ def _wrap_launcher_for_job_id_and_quiet_output(self, launcher_cmd: str) -> str:

script_lines = [
"#!/usr/bin/env bash",
"set -euo pipefail",
"set -uo pipefail",
"",
f'export NEMORUN_HOME="{output_dir}"',
'mkdir -p "$NEMORUN_HOME"',
f'LOG="{log_path}"',
f'WRAPPER_STDOUT="{output_dir / "cloudai_megatron_bridge_wrapper.stdout"}"',
f'WRAPPER_STDERR="{output_dir / "cloudai_megatron_bridge_wrapper.stderr"}"',
# Mirror wrapper stdout/stderr to files for debugging while still emitting to the parent process.
'exec > >(tee -a "$WRAPPER_STDOUT") 2> >(tee -a "$WRAPPER_STDERR" >&2)',
"",
# Launch Megatron-Bridge (log stdout/stderr to file)
"",
': >"$LOG"',
f'{launcher_cmd} >>"$LOG" 2>&1',
"LAUNCH_RC=0",
f"{launcher_cmd} >>\"$LOG\" 2>&1 || LAUNCH_RC=$?",
"",
# Parse job id from Megatron-Bridge output (format: 'Job id: <num>')
# Parse job id from Megatron-Bridge output (multiple possible formats)
# Patterns: "Job id: 694112", "- Job id: 694112", "Job ID: 694112"
"",
'JOB_ID=""',
'JOB_ID=$(grep -Eio "Job id[: ]+[0-9]+" "$LOG" | tail -n1 | grep -Eo "[0-9]+" | tail -n1 || true)',
'JOB_ID=$(grep -Eio "(Job id[: ]+[0-9]+|-[ ]*Job id[: ]+[0-9]+)" "$LOG" | tail -n1 | grep -Eo "[0-9]+" | tail -n1 || true)',
"",
# Emit a canonical line for CloudAI to parse
"",
'if [ -n "${JOB_ID}" ]; then',
' if [ "${LAUNCH_RC}" -ne 0 ]; then',
' echo "Megatron-Bridge launcher exited non-zero (${LAUNCH_RC}) after submitting job ${JOB_ID}." >&2',
' tail -n 200 "$LOG" >&2 || true',
" fi",
' echo "Submitted batch job ${JOB_ID}"',
"else",
' echo "Failed to retrieve job ID." >&2',
' if [ "${LAUNCH_RC}" -ne 0 ]; then',
' echo "Launcher exit code: ${LAUNCH_RC}" >&2',
" fi",
' tail -n 200 "$LOG" >&2 || true',
" exit 1",
"fi",
Expand All @@ -154,8 +167,8 @@ def _build_launcher_parts( # noqa: C901
) -> list[str]:
fields_set = args.model_fields_set
force_fields = {
"model_name",
"model_size",
"model_family_name",
"model_recipe_name",
"num_gpus",
"gpus_per_node",
"hf_token",
Expand Down Expand Up @@ -198,6 +211,15 @@ def add(flag: str, value: Any) -> None:
return
if isinstance(value, bool):
parts.extend([flag, "true" if value else "false"])
elif isinstance(value, (list, tuple)):
if not value:
return
if flag == "--dataset_paths":
parts.extend([flag, *[str(x) for x in value]])
elif flag == "--profiling_ranks":
parts.extend([flag, ",".join(str(x) for x in value)])
else:
parts.extend([flag, str(value[0]) if len(value) == 1 else ",".join(str(x) for x in value)])
else:
sv = str(value)
if sv != "":
Expand All @@ -222,31 +244,37 @@ def add_field(field: str, flag: str, value: Any) -> None:
add_field("hf_token", "-hf", args.hf_token)
add_field("nemo_home", "-nh", args.nemo_home)
add_field("wandb_key", "-wdk", args.wandb_key)
add_field("wandb_prj_name", "-wdp", args.wandb_prj_name)
add_field("wandb_exp_name", "-wdj", args.wandb_exp_name)
add_field("wandb_project_name", "-wdp", args.wandb_project_name)
add_field("wandb_entity_name", "-wde", args.wandb_entity_name)
add_field("wandb_experiment_name", "-wdj", args.wandb_experiment_name)
add_field("wandb_save_dir", "-wds", args.wandb_save_dir)
add_field("max_retries", "--max_retries", args.max_retries)
if args.dryrun and "dryrun" in fields_set:
parts.append("-d")
add_field("num_gpus", "-ng", args.num_gpus)
add_field("gpus_per_node", "-gn", args.gpus_per_node)
if mounts:
add("-cm", ",".join(mounts))

# Model flags (Megatron-Bridge r0.2.0 API)
# Model flags (Megatron-Bridge main-branch API)
if args.use_recipes and "use_recipes" in fields_set:
parts.append("--use_recipes")
if "enable_vboost" in fields_set:
add_field("enable_vboost", "-vb", bool(args.enable_vboost))
if not args.model_name:
raise RuntimeError("Missing required cmd_args.model_name (maps to -m/--model_name).")
if not args.model_size:
raise RuntimeError("Missing required cmd_args.model_size (maps to -s/--model_size).")
add_field("model_name", "-m", args.model_name)
add_field("model_size", "-s", args.model_size)
if not args.model_family_name:
raise RuntimeError("Missing required cmd_args.model_family_name (maps to -m/--model_family_name).")
if not args.model_recipe_name:
raise RuntimeError("Missing required cmd_args.model_recipe_name (maps to -mr/--model_recipe_name).")
add_field("model_family_name", "-m", args.model_family_name)
add_field("model_recipe_name", "-mr", args.model_recipe_name)
if args.enable_nsys and "enable_nsys" in fields_set:
parts.append("-en")
add_field("domain", "--domain", args.domain)
if "use_tokendrop" in fields_set and args.use_tokendrop is not None:
add_field("use_tokendrop", "--use_tokendrop", bool(args.use_tokendrop))
if "use_megatron_fsdp" in fields_set and args.use_megatron_fsdp is not None:
add_field("use_megatron_fsdp", "--use_megatron_fsdp", bool(args.use_megatron_fsdp))
if "nccl_ub" in fields_set and args.nccl_ub is not None:
add_field("nccl_ub", "--nccl_ub", bool(args.nccl_ub))
add_field("cuda_graph_impl", "--cuda_graph_impl", args.cuda_graph_impl)
if args.cuda_graph_scope and "cuda_graph_scope" in fields_set:
add_field(
Expand All @@ -264,6 +292,7 @@ def add_field(field: str, flag: str, value: Any) -> None:
# Batch
add_field("mb", "-mb", args.mb)
add_field("gb", "-gb", args.gb)
add_field("seq_length", "-sl", args.seq_length)

# Misc
if "moe_a2a_overlap" in fields_set:
Expand All @@ -273,11 +302,44 @@ def add_field(field: str, flag: str, value: Any) -> None:
add_field("activation_offload_layers", "-ol", args.activation_offload_layers)
if args.recompute_modules and "recompute_modules" in fields_set:
parts.extend(["--recompute_modules", self._normalize_recompute_modules(args.recompute_modules)])
# r0.2.0 supports `--detach` / `--no-detach` flags (no boolean value)
if args.detach is True and "detach" in fields_set:
parts.append("--detach")
elif args.detach is False and "detach" in fields_set:
parts.append("--no-detach")
if "detach" in fields_set and args.detach is not None:
parts.extend(["--detach", "true" if args.detach else "false"])

# Optimizer
add_field("lr", "--lr", args.lr)
add_field("min_lr", "--min_lr", args.min_lr)
add_field("warmup_iters", "--warmup_iters", args.warmup_iters)

# Checkpointing
add_field("pretrained_checkpoint", "--pretrained_checkpoint", args.pretrained_checkpoint)
add_field("save_dir", "--save_dir", args.save_dir)
add_field("load_dir", "--load_dir", args.load_dir)
add_field("save_interval", "--save_interval", args.save_interval)
add_field("most_recent_k", "--most_recent_k", args.most_recent_k)
add_field("save_config_filepath", "--save_config_filepath", args.save_config_filepath)

# Data / Tokenizer
add_field("data", "--data", args.data)
add_field("dataset_paths", "--dataset_paths", args.dataset_paths)
add_field("dataset_root", "--dataset_root", args.dataset_root)
add_field("index_mapping_dir", "--index_mapping_dir", args.index_mapping_dir)
add_field("dataset_name", "--dataset_name", args.dataset_name)
if args.packed_sequence and "packed_sequence" in fields_set:
parts.append("--packed_sequence")
if args.head_only and "head_only" in fields_set:
parts.append("--head_only")
add_field("tokenizer_type", "--tokenizer_type", args.tokenizer_type)
add_field("tokenizer_model", "--tokenizer_model", args.tokenizer_model)
add_field("vocab_size", "--vocab_size", args.vocab_size)

# Profiling (performance group)
add_field("pytorch_profiler", "-pyp", args.pytorch_profiler)
add_field("profiling_start_step", "--profiling_start_step", args.profiling_start_step)
add_field("profiling_stop_step", "--profiling_stop_step", args.profiling_stop_step)
add_field("record_memory_history", "-mh", args.record_memory_history)
if args.profiling_gpu_metrics and "profiling_gpu_metrics" in fields_set:
parts.append("--profiling_gpu_metrics")
add_field("profiling_ranks", "--profiling_ranks", args.profiling_ranks)

# Extra user args (dict -> string)
if tdef.extra_cmd_args:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def mb_tr(tmp_path: Path) -> TestRun:
"",
]
)
(tr.output_path / "megatron_bridge_launcher.log").write_text(log_content)
(tr.output_path / "cloudai_megatron_bridge_launcher.log").write_text(log_content)
return tr


Expand Down
Loading