Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions nemo_run/core/execution/kubeflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,11 @@ class KubeflowExecutor(Executor):
pod_labels: dict[str, Any] = field(default_factory=dict)
tolerations: list[dict[str, Any]] = field(default_factory=list)
affinity: dict[str, Any] = field(default_factory=dict)
# Shell commands run once per pod in launch.sh, BEFORE the training command
# (e.g. installing a missing dependency into the container venv). They run
# before torchrun spawns the per-GPU ranks, so each executes exactly once
# per node (not once per rank) and under errexit (a failure aborts the pod).
setup_commands: list[str] = field(default_factory=list)
# env_list accepts full env var dicts (e.g. valueFrom/secretKeyRef).
# Simple key=value pairs should use the inherited env_vars dict instead.
env_list: list[dict[str, Any]] = field(default_factory=list)
Expand Down Expand Up @@ -930,6 +935,7 @@ def materialize_launch_script(self, cmd: list[str], max_retries: int = 0) -> Non
env_vars=env_var_lines,
max_retries=max_retries,
code_dir=self.code_dir,
setup_commands=self.setup_commands,
)
os.makedirs(self.job_dir, exist_ok=True)
launch_script_path = os.path.join(self.job_dir, "launch.sh")
Expand Down Expand Up @@ -1055,6 +1061,28 @@ def pull_results(self, job_name: str, dest_dir: Optional[str] = None) -> None:

self.copy_from_workspace(self.code_dir, local_path, label=job_name)

def cleanup(self, handle: str) -> None:
"""Mirror run outputs from the workdir PVC back to ``job_dir``, then tear down.

On Kubernetes the training pods write every recipe output — including the
PyTorch profiler chrome trace and CUDA memory snapshot, which land under
``/nemo_run`` (the PVC ``code_dir``) — to the shared volume. The launcher
that collects artifacts and parses logs only sees the local ``job_dir``,
so without a copy-back those outputs are stranded on the PVC. Reuse the
existing data-mover (:meth:`pull_results`) to bring them back before
teardown. Best-effort: a failed pull must never break cleanup.

Args:
handle: The app handle ``<experiment>___<role>___<job_name>``.
"""
if self.workdir_pvc:
job_name = handle.split("___")[-1] if handle else ""
try:
self.pull_results(job_name)
except Exception as e:
logger.warning("pull_results during cleanup of '%s' failed: %s", handle, e)
super().cleanup(handle)

def _lookup_job_dir(self, job_name: str) -> str:
"""Look up the job_dir saved by the scheduler for *job_name*."""
try:
Expand Down
6 changes: 6 additions & 0 deletions nemo_run/core/execution/templates/kubeflow.sh.j2
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ export TORCHX_MAX_RETRIES={{max_retries}}

# Symlink /nemo_run → code_dir so pod-side paths like /nemo_run/scripts/... resolve correctly.
ln -sfn {{code_dir}} /nemo_run
{% if setup_commands %}
echo "Running setup commands..."
{%- for setup_command in setup_commands %}
{{setup_command}}
{%- endfor %}
{%- endif %}

echo "Starting training command..."
set +e
Expand Down
13 changes: 13 additions & 0 deletions test/core/execution/test_kubeflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,19 @@ def test_pull_results_noop_without_workdir_pvc(self, mock_k8s_clients):
e.pull_results("test-job")
mock_core.create_namespaced_pod.assert_not_called()

def test_cleanup_pulls_results_when_workdir_pvc(self, workdir_executor):
# cleanup() mirrors PVC outputs (incl. profiler traces) back to job_dir
# via pull_results, keying off the <exp>___<role>___<job_name> handle.
with patch.object(workdir_executor, "pull_results") as mock_pull:
workdir_executor.cleanup("exp-id___trainer___test-job")
mock_pull.assert_called_once_with("test-job")

def test_cleanup_noop_without_workdir_pvc(self):
e = KubeflowExecutor(image="test:latest")
with patch.object(e, "pull_results") as mock_pull:
e.cleanup("exp-id___trainer___test-job")
mock_pull.assert_not_called()

def test_data_mover_pod_inherits_tolerations_affinity_pull_secrets(
self, mock_k8s_clients, tmp_path
):
Expand Down
Loading