From 8a0d9838eb2336e7cf30b2a8de4e18927b4a6cea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?oliver=20k=C3=B6nig?= Date: Fri, 12 Jun 2026 12:19:40 +0000 Subject: [PATCH 1/2] feat(kubeflow): add setup_commands run once per pod before the job MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit KubeflowExecutor.setup_commands is a list of shell commands rendered into the generated launch.sh between the /nemo_run symlink and the training command. launch.sh runs once per pod before torchrun spawns the per-GPU ranks, so each setup command executes exactly once per node (not per rank), under errexit. Use case: install a dependency missing from the container image into the container venv (e.g. a broken release-candidate image) without rebuilding, when running synced code via the workdir data-mover. Signed-off-by: oliver könig --- nemo_run/core/execution/kubeflow.py | 6 ++++++ nemo_run/core/execution/templates/kubeflow.sh.j2 | 6 ++++++ 2 files changed, 12 insertions(+) diff --git a/nemo_run/core/execution/kubeflow.py b/nemo_run/core/execution/kubeflow.py index 525a7c0d..69ed3fd1 100644 --- a/nemo_run/core/execution/kubeflow.py +++ b/nemo_run/core/execution/kubeflow.py @@ -100,6 +100,11 @@ class KubeflowExecutor(Executor): pod_labels: dict[str, Any] = field(default_factory=dict) tolerations: list[dict[str, Any]] = field(default_factory=list) affinity: dict[str, Any] = field(default_factory=dict) + # Shell commands run once per pod in launch.sh, BEFORE the training command + # (e.g. installing a missing dependency into the container venv). They run + # before torchrun spawns the per-GPU ranks, so each executes exactly once + # per node (not once per rank) and under errexit (a failure aborts the pod). + setup_commands: list[str] = field(default_factory=list) # env_list accepts full env var dicts (e.g. valueFrom/secretKeyRef). # Simple key=value pairs should use the inherited env_vars dict instead. 
env_list: list[dict[str, Any]] = field(default_factory=list) @@ -930,6 +935,7 @@ def materialize_launch_script(self, cmd: list[str], max_retries: int = 0) -> Non env_vars=env_var_lines, max_retries=max_retries, code_dir=self.code_dir, + setup_commands=self.setup_commands, ) os.makedirs(self.job_dir, exist_ok=True) launch_script_path = os.path.join(self.job_dir, "launch.sh") diff --git a/nemo_run/core/execution/templates/kubeflow.sh.j2 b/nemo_run/core/execution/templates/kubeflow.sh.j2 index 24529d7c..248da146 100644 --- a/nemo_run/core/execution/templates/kubeflow.sh.j2 +++ b/nemo_run/core/execution/templates/kubeflow.sh.j2 @@ -12,6 +12,12 @@ export TORCHX_MAX_RETRIES={{max_retries}} # Symlink /nemo_run → code_dir so pod-side paths like /nemo_run/scripts/... resolve correctly. ln -sfn {{code_dir}} /nemo_run +{% if setup_commands %} +echo "Running setup commands..." +{%- for setup_command in setup_commands %} +{{setup_command}} +{%- endfor %} +{%- endif %} echo "Starting training command..." set +e From 51846152b7a650706ec346bc4ef7217c4b2c1ce0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?oliver=20k=C3=B6nig?= Date: Sat, 13 Jun 2026 17:01:42 +0000 Subject: [PATCH 2/2] feat(kubeflow): copy workdir-PVC outputs back to job_dir on cleanup MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit On Kubernetes the training pods write every recipe output to the shared workdir PVC, including the PyTorch profiler chrome trace and CUDA memory snapshot which land under /nemo_run (the PVC code_dir). The launcher that collects artifacts and parses logs only sees the local job_dir, so without a copy-back those outputs are stranded on the PVC and never reach CI artifacts. Override KubeflowExecutor.cleanup() to reuse the existing pull_results() data-mover, mirroring code_dir back to job_dir before teardown. Best-effort: a failed pull never breaks cleanup. Gated on workdir_pvc, so non-PVC (slurm/local) runs are unaffected. 
Co-Authored-By: Claude Opus 4.8 (1M context) Signed-off-by: oliver könig --- nemo_run/core/execution/kubeflow.py | 22 ++++++++++++++++++++++ test/core/execution/test_kubeflow.py | 13 +++++++++++++ 2 files changed, 35 insertions(+) diff --git a/nemo_run/core/execution/kubeflow.py b/nemo_run/core/execution/kubeflow.py index 69ed3fd1..11f4fe3c 100644 --- a/nemo_run/core/execution/kubeflow.py +++ b/nemo_run/core/execution/kubeflow.py @@ -1061,6 +1061,28 @@ def pull_results(self, job_name: str, dest_dir: Optional[str] = None) -> None: self.copy_from_workspace(self.code_dir, local_path, label=job_name) + def cleanup(self, handle: str) -> None: + """Mirror run outputs from the workdir PVC back to ``job_dir``, then tear down. + + On Kubernetes the training pods write every recipe output — including the + PyTorch profiler chrome trace and CUDA memory snapshot, which land under + ``/nemo_run`` (the PVC ``code_dir``) — to the shared volume. The launcher + that collects artifacts and parses logs only sees the local ``job_dir``, + so without a copy-back those outputs are stranded on the PVC. Reuse the + existing data-mover (:meth:`pull_results`) to bring them back before + teardown. Best-effort: a failed pull must never break cleanup. + + Args: + handle: The app handle ``<experiment_id>___<role_name>___<job_name>``.
+ """ + if self.workdir_pvc: + job_name = handle.split("___")[-1] if handle else "" + try: + self.pull_results(job_name) + except Exception as e: + logger.warning("pull_results during cleanup of '%s' failed: %s", handle, e) + super().cleanup(handle) + def _lookup_job_dir(self, job_name: str) -> str: """Look up the job_dir saved by the scheduler for *job_name*.""" try: diff --git a/test/core/execution/test_kubeflow.py b/test/core/execution/test_kubeflow.py index 8babcf57..42011d11 100644 --- a/test/core/execution/test_kubeflow.py +++ b/test/core/execution/test_kubeflow.py @@ -503,6 +503,19 @@ def test_pull_results_noop_without_workdir_pvc(self, mock_k8s_clients): e.pull_results("test-job") mock_core.create_namespaced_pod.assert_not_called() + def test_cleanup_pulls_results_when_workdir_pvc(self, workdir_executor): + # cleanup() mirrors PVC outputs (incl. profiler traces) back to job_dir + # via pull_results, keying off the <experiment_id>___<role_name>___<job_name> handle. + with patch.object(workdir_executor, "pull_results") as mock_pull: + workdir_executor.cleanup("exp-id___trainer___test-job") + mock_pull.assert_called_once_with("test-job") + + def test_cleanup_noop_without_workdir_pvc(self): + e = KubeflowExecutor(image="test:latest") + with patch.object(e, "pull_results") as mock_pull: + e.cleanup("exp-id___trainer___test-job") + mock_pull.assert_not_called() + def test_data_mover_pod_inherits_tolerations_affinity_pull_secrets( self, mock_k8s_clients, tmp_path ):