From 085a791a308942baa17015b2fe1a62beb1e0dbd8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?oliver=20k=C3=B6nig?= Date: Wed, 22 Apr 2026 10:14:10 +0000 Subject: [PATCH 1/4] docs: update KubeflowExecutor guide to TrainJob v2 only MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Remove the outdated PyTorchJob (v1) example and job_kind parameter references — KubeflowExecutor now only supports TrainJob (trainer.kubeflow.org/v1alpha1). Replace the two-snippet example with a single, comprehensive configuration derived from the real-world local/real_trainjob.py, covering env_list, tolerations, volumes, workdir_pvc, and image_pull_secrets. Add an advanced-options table for less-common parameters (nprocs_per_node, extra_resource_requests, pod_spec_overrides, container_kwargs, workdir_local_path). Signed-off-by: Oliver Koenig Co-Authored-By: Claude Sonnet 4.6 Signed-off-by: oliver könig --- docs/guides/execution.md | 80 ++++++++++++++++++++++++++-------------- 1 file changed, 53 insertions(+), 27 deletions(-) diff --git a/docs/guides/execution.md b/docs/guides/execution.md index 77ef25e0..35dd393b 100644 --- a/docs/guides/execution.md +++ b/docs/guides/execution.md @@ -296,50 +296,76 @@ For a complete end-to-end example using DGX Cloud with NeMo, refer to the [NVIDI #### KubeflowExecutor -The `KubeflowExecutor` integrates with the [Kubeflow Training Operator](https://github.com/kubeflow/training-operator) to run distributed training jobs on any Kubernetes cluster. It submits CRDs directly via the Kubernetes API — no `kubectl` required. - -Two job kinds are supported via the `job_kind` parameter: - -- **`"PyTorchJob"`** (default) — Training Operator v1 (`kubeflow.org/v1`) -- **`"TrainJob"`** — Training Operator v2 (`trainer.kubeflow.org/v1alpha1`) +The `KubeflowExecutor` integrates with the [Kubeflow Training Operator v2](https://github.com/kubeflow/training-operator) to run distributed training jobs on any Kubernetes cluster. 
It submits `TrainJob` CRDs (`trainer.kubeflow.org/v1alpha1`) directly via the Kubernetes API — no `kubectl` required. Kubernetes configuration is loaded automatically: local kubeconfig is tried first, falling back to in-cluster config when running inside a pod. +The `ClusterTrainingRuntime` whose name is passed as `runtime_ref` must already exist in the cluster (it is a cluster-scoped resource, so it is not tied to the job's namespace); `"torch-distributed"` is the conventional name for PyTorch distributed workloads. + Here's an example configuration: ```python -# PyTorchJob (default) -executor = run.KubeflowExecutor( - namespace="runai-nemo-ci", - image="nvcr.io/nvidia/nemo:26.02", - num_nodes=3, # total pods: 1 Master + (num_nodes-1) Workers - gpus_per_node=8, # also sets nproc_per_node unless overridden explicitly +import nemo_run as run +from nemo_run.core.execution.kubeflow import KubeflowExecutor + +executor = KubeflowExecutor( + launcher=run.Torchrun(), + runtime_ref="torch-distributed", # ClusterTrainingRuntime in your cluster + namespace="my-namespace", + image="nvcr.io/nvidia/nemo:25.04", + num_nodes=4, + gpus_per_node=8, cpu_requests="16", memory_requests="64Gi", + image_pull_secrets=["ngc-registry-secret"], + # Simple key=value env vars + env_vars={ + "NCCL_DEBUG": "INFO", + "PYTORCH_CUDA_ALLOC_CONF": "expandable_segments:True", + }, + # Full env var dicts — use for secretKeyRef, fieldRef, etc. 
+ env_list=[ + { + "name": "WANDB_API_KEY", + "valueFrom": {"secretKeyRef": {"name": "my-secrets", "key": "WANDB_API_KEY"}}, + }, + ], + labels={"app": "my-training-job"}, + tolerations=[ + {"effect": "NoSchedule", "key": "nvidia.com/gpu", "operator": "Exists"}, + ], volumes=[ - {"name": "model-cache", "persistentVolumeClaim": {"claimName": "data-pvc"}} + {"name": "dshm", "emptyDir": {"medium": "Memory"}}, + {"name": "model-cache", "persistentVolumeClaim": {"claimName": "model-cache"}}, ], - volume_mounts=[{"name": "model-cache", "mountPath": "/nemo-workspace"}], - labels={"app": "nemo-ci-training"}, - env_vars={"NCCL_DEBUG": "INFO"}, -) - -# TrainJob (Training Operator v2) -executor = run.KubeflowExecutor( - job_kind="TrainJob", - runtime_ref="torch-distributed", # name of the ClusterTrainingRuntime - namespace="runai-nemo-ci", - image="nvcr.io/nvidia/nemo:26.02", - num_nodes=3, - gpus_per_node=8, + volume_mounts=[ + {"name": "dshm", "mountPath": "/dev/shm"}, + {"name": "model-cache", "mountPath": "/nemo-workspace"}, + ], + # Sync the generated launch script to the pod before launch, + # and pull results back after the job completes. + workdir_pvc="model-cache", + workdir_pvc_path="/nemo-workspace", ) ``` `cancel(wait=True)` polls until both the CR and all associated pods are fully terminated before returning. +##### Advanced options + +| Parameter | Purpose | +|-----------|---------| +| `nprocs_per_node` | Override processes per node; defaults to `gpus_per_node` when unset | +| `extra_resource_requests` / `extra_resource_limits` | Non-GPU extended resources, e.g. `{"vpc.amazonaws.com/efa": "32"}` for AWS EFA NICs | +| `pod_spec_overrides` | Merge arbitrary fields into `podTemplateOverrides[].spec`, e.g. `{"nodeSelector": {...}}` | +| `container_kwargs` | Extra container-level fields, e.g. 
`{"securityContext": {"privileged": True}}` | +| `workdir_local_path` | Local directory merged into the job dir before PVC sync — useful for hand-written scripts not managed by the packager | +| `annotations` | Kubernetes annotations added to the `TrainJob` CR | +| `affinity` | Pod scheduling affinity rules | + ##### Limitations -Attributes like `resourceClaims` are not [supported](https://github.com/kubeflow/trainer/issues/3264) and must be injected in different ways, like by Mutating Webhooks. +Attributes like `resourceClaims` are not [supported](https://github.com/kubeflow/trainer/issues/3264) natively and must be injected via Mutating Webhooks or `pod_spec_overrides`. #### LeptonExecutor From 6f349c0cb2aa65f1f0a26dd2919bf8c572391e57 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?oliver=20k=C3=B6nig?= Date: Wed, 22 Apr 2026 10:18:25 +0000 Subject: [PATCH 2/4] docs: add KubeflowExecutor e2e example and link from guide MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add examples/kubeflow/hello_kubeflow.py — a self-contained script that shows the complete executor setup (Torchrun launcher, env_list, tolerations, dshm volume, PVC workdir sync) with CLI flags for namespace, image, node count, and PVC name. Update docs/guides/execution.md to link to the new example after the configuration snippet. Signed-off-by: Oliver Koenig Co-Authored-By: Claude Sonnet 4.6 Signed-off-by: oliver könig --- docs/guides/execution.md | 2 + examples/kubeflow/hello_kubeflow.py | 138 ++++++++++++++++++++++++++++ 2 files changed, 140 insertions(+) create mode 100644 examples/kubeflow/hello_kubeflow.py diff --git a/docs/guides/execution.md b/docs/guides/execution.md index 35dd393b..5ea1a4d1 100644 --- a/docs/guides/execution.md +++ b/docs/guides/execution.md @@ -351,6 +351,8 @@ executor = KubeflowExecutor( `cancel(wait=True)` polls until both the CR and all associated pods are fully terminated before returning. 
+A self-contained end-to-end example — including volume setup, secret injection, and workdir PVC sync — is available at [`examples/kubeflow/hello_kubeflow.py`](../../examples/kubeflow/hello_kubeflow.py). + ##### Advanced options | Parameter | Purpose | diff --git a/examples/kubeflow/hello_kubeflow.py b/examples/kubeflow/hello_kubeflow.py new file mode 100644 index 00000000..ebaa0b4a --- /dev/null +++ b/examples/kubeflow/hello_kubeflow.py @@ -0,0 +1,138 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""End-to-end example: run a distributed training job via KubeflowExecutor. + +Prerequisites +------------- +* Kubeflow Training Operator v2 installed in your cluster with a + ``ClusterTrainingRuntime`` named ``"torch-distributed"``. +* A kubeconfig pointing at the target cluster (or run from inside a pod). +* An image that contains your training code and all dependencies. + +Usage +----- + python examples/kubeflow/hello_kubeflow.py \\ + --namespace my-namespace \\ + --image nvcr.io/nvidia/nemo:25.04 \\ + --pvc model-cache + +The job runs ``nvidia-smi`` and a quick PyTorch device check on every worker, +then exits. Swap the ``inline`` script for your real training command. 
+""" + +import argparse +import logging + +import nemo_run as run +from nemo_run.core.execution.kubeflow import KubeflowExecutor + +logging.basicConfig(level=logging.INFO) + +# ── CLI ─────────────────────────────────────────────────────────────────────── + +parser = argparse.ArgumentParser(description="KubeflowExecutor hello-world example") +parser.add_argument("--namespace", default="default", help="Kubernetes namespace") +parser.add_argument("--image", required=True, help="Container image with your training env") +parser.add_argument("--num-nodes", type=int, default=2, help="Number of worker pods") +parser.add_argument("--gpus-per-node", type=int, default=8, help="GPUs per pod") +parser.add_argument("--pvc", default=None, help="PVC name for workdir sync (optional)") +parser.add_argument( + "--runtime-ref", + default="torch-distributed", + help="ClusterTrainingRuntime name in your cluster", +) +args = parser.parse_args() + +# ── Executor ────────────────────────────────────────────────────────────────── + +executor = KubeflowExecutor( + # Kubeflow TrainJob settings + launcher=run.Torchrun(), + runtime_ref=args.runtime_ref, + namespace=args.namespace, + image=args.image, + num_nodes=args.num_nodes, + gpus_per_node=args.gpus_per_node, + + # Resource requests — tune these to your node type + cpu_requests="8", + memory_requests="32Gi", + + # Simple key=value environment variables + env_vars={ + "NCCL_DEBUG": "INFO", + "PYTORCH_CUDA_ALLOC_CONF": "expandable_segments:True", + }, + + # Full env var dicts — use for Kubernetes secrets, field references, etc. 
+ # Example: inject a W&B API key from a Kubernetes Secret named "my-secrets" + env_list=[ + # { + # "name": "WANDB_API_KEY", + # "valueFrom": {"secretKeyRef": {"name": "my-secrets", "key": "WANDB_API_KEY"}}, + # }, + ], + + # Toleration that allows scheduling on GPU-tainted nodes + tolerations=[ + {"effect": "NoSchedule", "key": "nvidia.com/gpu", "operator": "Exists"}, + ], + + # Volumes: a memory-backed /dev/shm so PyTorch DataLoader workers have + # enough shared memory (the default Kubernetes limit is only 64 MiB). + volumes=[ + {"name": "dshm", "emptyDir": {"medium": "Memory"}}, + *( + [{"name": "workdir", "persistentVolumeClaim": {"claimName": args.pvc}}] + if args.pvc + else [] + ), + ], + volume_mounts=[ + {"name": "dshm", "mountPath": "/dev/shm"}, + *( + [{"name": "workdir", "mountPath": "/nemo-workspace"}] + if args.pvc + else [] + ), + ], + + # Sync the generated launch script to the pod via PVC before launch. + # Required whenever you use a custom launcher (e.g. run.Torchrun()). + workdir_pvc=args.pvc, + workdir_pvc_path="/nemo-workspace", + + labels={"app": "hello-kubeflow"}, +) + +# ── Task ────────────────────────────────────────────────────────────────────── + +# Replace this inline script with your real training command. 
+script = run.Script( + inline="""\ +nvidia-smi +python - <<'PY' +import os, torch +rank = int(os.environ.get("RANK", 0)) +world = int(os.environ.get("WORLD_SIZE", 1)) +print(f"rank {rank}/{world} — cuda devices: {torch.cuda.device_count()}") +PY +""" +) + +# ── Launch ──────────────────────────────────────────────────────────────────── + +run.run(script, executor=executor, name="hello-kubeflow") From a990619a6f8bb13b878ccae32ee995740139983c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?oliver=20k=C3=B6nig?= Date: Wed, 22 Apr 2026 10:20:15 +0000 Subject: [PATCH 3/4] docs(kubeflow): add log tailing and SIGINT/SIGTERM cancel handler to example MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Register SIGINT/SIGTERM handlers before job submission so Ctrl-C or pod eviction triggers executor.cancel(wait=True). Switch from run.run() to run.Experiment so tail_logs=True can be passed to exp.run(), streaming pod logs back to the terminal. Signed-off-by: Oliver Koenig Co-Authored-By: Claude Sonnet 4.6 Signed-off-by: oliver könig --- examples/kubeflow/hello_kubeflow.py | 33 ++++++++++++++++++++++++++--- 1 file changed, 30 insertions(+), 3 deletions(-) diff --git a/examples/kubeflow/hello_kubeflow.py b/examples/kubeflow/hello_kubeflow.py index ebaa0b4a..1b98d70e 100644 --- a/examples/kubeflow/hello_kubeflow.py +++ b/examples/kubeflow/hello_kubeflow.py @@ -30,16 +30,20 @@ --pvc model-cache The job runs ``nvidia-smi`` and a quick PyTorch device check on every worker, -then exits. Swap the ``inline`` script for your real training command. +streams logs back to your terminal, and cancels cleanly on SIGINT/SIGTERM. +Swap the ``inline`` script for your real training command. 
""" import argparse import logging +import signal +import sys import nemo_run as run from nemo_run.core.execution.kubeflow import KubeflowExecutor logging.basicConfig(level=logging.INFO) +log = logging.getLogger(__name__) # ── CLI ─────────────────────────────────────────────────────────────────────── @@ -56,6 +60,8 @@ ) args = parser.parse_args() +JOB_NAME = "hello-kubeflow" + # ── Executor ────────────────────────────────────────────────────────────────── executor = KubeflowExecutor( @@ -115,7 +121,7 @@ workdir_pvc=args.pvc, workdir_pvc_path="/nemo-workspace", - labels={"app": "hello-kubeflow"}, + labels={"app": JOB_NAME}, ) # ── Task ────────────────────────────────────────────────────────────────────── @@ -133,6 +139,27 @@ """ ) +# ── Signal handling ─────────────────────────────────────────────────────────── + +# Register SIGINT / SIGTERM handlers *before* submitting so that Ctrl-C or a +# pod eviction during startup still triggers a clean TrainJob deletion. +# executor.cancel() deletes the TrainJob CR and polls until all pods are gone. +def _cancel(signum: int, frame: object) -> None: + log.info("Signal %d received — cancelling %s", signum, JOB_NAME) + try: + executor.cancel(JOB_NAME, wait=True) + except Exception as exc: + log.warning("Cancel failed: %s", exc) + sys.exit(0) + + +signal.signal(signal.SIGINT, _cancel) +signal.signal(signal.SIGTERM, _cancel) + # ── Launch ──────────────────────────────────────────────────────────────────── -run.run(script, executor=executor, name="hello-kubeflow") +# run.Experiment gives direct control over log tailing. +# detach=False blocks until the job finishes; tail_logs=True streams pod logs. 
+with run.Experiment(JOB_NAME, executor=executor) as exp: + exp.add(script, name=JOB_NAME, tail_logs=False) + exp.run(detach=False, tail_logs=True) From 7a757f15c1b0ab2f19195201cb0fc800afab69dc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?oliver=20k=C3=B6nig?= Date: Wed, 22 Apr 2026 10:23:13 +0000 Subject: [PATCH 4/4] style: apply ruff formatting to hello_kubeflow.py MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Oliver Koenig Co-Authored-By: Claude Sonnet 4.6 Signed-off-by: oliver könig --- examples/kubeflow/hello_kubeflow.py | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) diff --git a/examples/kubeflow/hello_kubeflow.py b/examples/kubeflow/hello_kubeflow.py index 1b98d70e..67b97351 100644 --- a/examples/kubeflow/hello_kubeflow.py +++ b/examples/kubeflow/hello_kubeflow.py @@ -72,17 +72,14 @@ image=args.image, num_nodes=args.num_nodes, gpus_per_node=args.gpus_per_node, - # Resource requests — tune these to your node type cpu_requests="8", memory_requests="32Gi", - # Simple key=value environment variables env_vars={ "NCCL_DEBUG": "INFO", "PYTORCH_CUDA_ALLOC_CONF": "expandable_segments:True", }, - # Full env var dicts — use for Kubernetes secrets, field references, etc. # Example: inject a W&B API key from a Kubernetes Secret named "my-secrets" env_list=[ @@ -91,12 +88,10 @@ # "valueFrom": {"secretKeyRef": {"name": "my-secrets", "key": "WANDB_API_KEY"}}, # }, ], - # Toleration that allows scheduling on GPU-tainted nodes tolerations=[ {"effect": "NoSchedule", "key": "nvidia.com/gpu", "operator": "Exists"}, ], - # Volumes: a memory-backed /dev/shm so PyTorch DataLoader workers have # enough shared memory (the default Kubernetes limit is only 64 MiB). 
volumes=[ @@ -109,18 +104,12 @@ ], volume_mounts=[ {"name": "dshm", "mountPath": "/dev/shm"}, - *( - [{"name": "workdir", "mountPath": "/nemo-workspace"}] - if args.pvc - else [] - ), + *([{"name": "workdir", "mountPath": "/nemo-workspace"}] if args.pvc else []), ], - # Sync the generated launch script to the pod via PVC before launch. # Required whenever you use a custom launcher (e.g. run.Torchrun()). workdir_pvc=args.pvc, workdir_pvc_path="/nemo-workspace", - labels={"app": JOB_NAME}, ) @@ -141,6 +130,7 @@ # ── Signal handling ─────────────────────────────────────────────────────────── + # Register SIGINT / SIGTERM handlers *before* submitting so that Ctrl-C or a # pod eviction during startup still triggers a clean TrainJob deletion. # executor.cancel() deletes the TrainJob CR and polls until all pods are gone.