diff --git a/nemo_run/core/execution/dgxcloud.py b/nemo_run/core/execution/dgxcloud.py index b8b64528..0b6a846f 100644 --- a/nemo_run/core/execution/dgxcloud.py +++ b/nemo_run/core/execution/dgxcloud.py @@ -21,6 +21,11 @@ logger = logging.getLogger(__name__) +def _default_custom_spec() -> dict[str, Any]: + """Default custom_spec with TTL configuration for automatic pod cleanup.""" + return {"ttlSecondsAfterFinished": 3600} + + class DGXCloudState(Enum): CREATING = "Creating" INITIALIZING = "Initializing" @@ -62,7 +67,7 @@ class DGXCloudExecutor(Executor): pvc_job_dir: str = field(init=False, default="") pvcs: list[dict[str, Any]] = field(default_factory=list) distributed_framework: str = "PyTorch" - custom_spec: dict[str, Any] = field(default_factory=dict) + custom_spec: dict[str, Any] = field(default_factory=_default_custom_spec) def get_auth_token(self) -> Optional[str]: url = f"{self.base_url}/token" diff --git a/nemo_run/run/experiment.py b/nemo_run/run/experiment.py index 67c2f50f..cd569037 100644 --- a/nemo_run/run/experiment.py +++ b/nemo_run/run/experiment.py @@ -636,7 +636,7 @@ def run( If sequential=True, all tasks will be run one after the other. The order is based on the order in which they were added. - Parallel mode only works if all exectuors in the experiment support it. + Parallel mode only works if all executors in the experiment support it. Currently, all executors support parallel mode. 
In sequential mode, if all executor supports dependencies, then all tasks will be scheduled at once diff --git a/test/core/execution/test_dgxcloud.py b/test/core/execution/test_dgxcloud.py index 4d431e3c..c6bcc5ac 100644 --- a/test/core/execution/test_dgxcloud.py +++ b/test/core/execution/test_dgxcloud.py @@ -37,6 +37,7 @@ def test_init(self): gpus_per_node=8, pvc_nemo_run_dir="/workspace/nemo_run", pvcs=[{"path": "/workspace", "claimName": "test-claim"}], + custom_spec={"ttlSecondsAfterFinished": 7200}, ) assert executor.base_url == "https://dgxapi.example.com" @@ -48,8 +49,59 @@ def test_init(self): assert executor.gpus_per_node == 8 assert executor.pvcs == [{"path": "/workspace", "claimName": "test-claim"}] assert executor.distributed_framework == "PyTorch" + assert executor.custom_spec["ttlSecondsAfterFinished"] == 7200 assert executor.pvc_nemo_run_dir == "/workspace/nemo_run" + def test_init_default_ttl(self): + """Test that DGXCloudExecutor uses the default TTL when custom_spec is omitted""" + executor = DGXCloudExecutor( + base_url="https://dgxapi.example.com", + app_id="test_app_id", + app_secret="test_app_secret", + project_name="test_project", + container_image="nvcr.io/nvidia/test:latest", + pvc_nemo_run_dir="/workspace/nemo_run", + ) + + # Should have default TTL of 3600 seconds (1 hour) + assert executor.custom_spec == {"ttlSecondsAfterFinished": 3600} + + def test_init_custom_spec_with_other_fields(self): + """Test that DGXCloudExecutor can have TTL alongside other custom_spec fields""" + executor = DGXCloudExecutor( + base_url="https://dgxapi.example.com", + app_id="test_app_id", + app_secret="test_app_secret", + project_name="test_project", + container_image="nvcr.io/nvidia/test:latest", + pvc_nemo_run_dir="/workspace/nemo_run", + custom_spec={ + "ttlSecondsAfterFinished": 7200, + "activeDeadlineSeconds": 14400, + "restartPolicy": "Never", + }, + ) + + # Should have all custom_spec fields + assert executor.custom_spec["ttlSecondsAfterFinished"] == 
7200 + assert executor.custom_spec["activeDeadlineSeconds"] == 14400 + assert executor.custom_spec["restartPolicy"] == "Never" + + def test_init_override_default_ttl(self): + """Test that passing custom_spec without a TTL drops the default TTL""" + executor = DGXCloudExecutor( + base_url="https://dgxapi.example.com", + app_id="test_app_id", + app_secret="test_app_secret", + project_name="test_project", + container_image="nvcr.io/nvidia/test:latest", + pvc_nemo_run_dir="/workspace/nemo_run", + custom_spec={"restartPolicy": "OnFailure"}, # No TTL specified + ) + + # Should only have the specified custom_spec field, no default TTL + assert executor.custom_spec == {"restartPolicy": "OnFailure"} + @patch("requests.post") def test_get_auth_token_success(self, mock_post): mock_response = MagicMock() diff --git a/test/run/torchx_backend/schedulers/test_dgxcloud.py b/test/run/torchx_backend/schedulers/test_dgxcloud.py index 5a3f9ae9..d73c8dc5 100644 --- a/test/run/torchx_backend/schedulers/test_dgxcloud.py +++ b/test/run/torchx_backend/schedulers/test_dgxcloud.py @@ -44,6 +44,7 @@ def dgx_cloud_executor(): container_image="nvcr.io/nvidia/test:latest", pvc_nemo_run_dir="/workspace/nemo_run", job_dir=tempfile.mkdtemp(), + custom_spec={"ttlSecondsAfterFinished": 7200}, ) @@ -160,3 +161,50 @@ def test_save_and_get_job_dirs(): assert "test_app_id" in job_dirs assert isinstance(job_dirs["test_app_id"]["executor"], DGXCloudExecutor) + + +def test_dgx_cloud_executor_ttl_configuration(): + """Test that DGXCloudExecutor properly handles TTL configuration via custom_spec""" + # Test with custom TTL in custom_spec + executor_with_ttl = DGXCloudExecutor( + base_url="https://dgx.example.com", + app_id="test_app_id", + app_secret="test_secret", + project_name="test_project", + container_image="nvcr.io/nvidia/test:latest", + pvc_nemo_run_dir="/workspace/nemo_run", + job_dir=tempfile.mkdtemp(), + custom_spec={"ttlSecondsAfterFinished": 7200}, + ) + assert 
executor_with_ttl.custom_spec["ttlSecondsAfterFinished"] == 7200 + + # Test with default TTL (should have default 3600 seconds) + executor_default_ttl = DGXCloudExecutor( + base_url="https://dgx.example.com", + app_id="test_app_id", + app_secret="test_secret", + project_name="test_project", + container_image="nvcr.io/nvidia/test:latest", + pvc_nemo_run_dir="/workspace/nemo_run", + job_dir=tempfile.mkdtemp(), + ) + assert executor_default_ttl.custom_spec == {"ttlSecondsAfterFinished": 3600} + + # Test with TTL and other custom_spec fields + executor_mixed_spec = DGXCloudExecutor( + base_url="https://dgx.example.com", + app_id="test_app_id", + app_secret="test_secret", + project_name="test_project", + container_image="nvcr.io/nvidia/test:latest", + pvc_nemo_run_dir="/workspace/nemo_run", + job_dir=tempfile.mkdtemp(), + custom_spec={ + "ttlSecondsAfterFinished": 3600, + "activeDeadlineSeconds": 7200, + "restartPolicy": "OnFailure", + }, + ) + assert executor_mixed_spec.custom_spec["ttlSecondsAfterFinished"] == 3600 + assert executor_mixed_spec.custom_spec["activeDeadlineSeconds"] == 7200 + assert executor_mixed_spec.custom_spec["restartPolicy"] == "OnFailure"