82 changes: 55 additions & 27 deletions docs/guides/execution.md

#### KubeflowExecutor

The `KubeflowExecutor` integrates with the [Kubeflow Training Operator v2](https://github.com/kubeflow/training-operator) to run distributed training jobs on any Kubernetes cluster. It submits `TrainJob` CRDs (`trainer.kubeflow.org/v1alpha1`) directly via the Kubernetes API — no `kubectl` required.

Kubernetes configuration is loaded automatically: the local kubeconfig is tried first, with a fallback to in-cluster configuration when running inside a pod.

A `ClusterTrainingRuntime` whose name matches `runtime_ref` must exist in the cluster; `"torch-distributed"` is the conventional name for PyTorch distributed workloads.
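
To sanity-check both preconditions before submitting, a minimal sketch using the official `kubernetes` Python client could look like the following (the client library is an assumption of this sketch; NeMo Run does not require you to write this yourself):

```python
from kubernetes import client, config

# Mirror the executor's loading order: local kubeconfig first, in-cluster config second.
try:
    config.load_kube_config()
except config.ConfigException:
    config.load_incluster_config()

# ClusterTrainingRuntime is a cluster-scoped CRD installed by Training Operator v2.
runtimes = client.CustomObjectsApi().list_cluster_custom_object(
    group="trainer.kubeflow.org",
    version="v1alpha1",
    plural="clustertrainingruntimes",
)
names = {item["metadata"]["name"] for item in runtimes.get("items", [])}
assert "torch-distributed" in names, "expected ClusterTrainingRuntime not found"
```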

Here's an example configuration:

```python
import nemo_run as run
from nemo_run.core.execution.kubeflow import KubeflowExecutor

executor = KubeflowExecutor(
    launcher=run.Torchrun(),
    runtime_ref="torch-distributed",  # ClusterTrainingRuntime in your cluster
    namespace="my-namespace",
    image="nvcr.io/nvidia/nemo:25.04",
    num_nodes=4,
    gpus_per_node=8,
    cpu_requests="16",
    memory_requests="64Gi",
    image_pull_secrets=["ngc-registry-secret"],
    # Simple key=value env vars
    env_vars={
        "NCCL_DEBUG": "INFO",
        "PYTORCH_CUDA_ALLOC_CONF": "expandable_segments:True",
    },
    # Full env var dicts — use for secretKeyRef, fieldRef, etc.
    env_list=[
        {
            "name": "WANDB_API_KEY",
            "valueFrom": {"secretKeyRef": {"name": "my-secrets", "key": "WANDB_API_KEY"}},
        },
    ],
    labels={"app": "my-training-job"},
    tolerations=[
        {"effect": "NoSchedule", "key": "nvidia.com/gpu", "operator": "Exists"},
    ],
    volumes=[
        {"name": "dshm", "emptyDir": {"medium": "Memory"}},
        {"name": "model-cache", "persistentVolumeClaim": {"claimName": "model-cache"}},
    ],
    volume_mounts=[
        {"name": "dshm", "mountPath": "/dev/shm"},
        {"name": "model-cache", "mountPath": "/nemo-workspace"},
    ],
    # Sync the generated launch script to the pod before launch,
    # and pull results back after the job completes.
    workdir_pvc="model-cache",
    workdir_pvc_path="/nemo-workspace",
)
```

`cancel(wait=True)` polls until both the CR and all associated pods are fully terminated before returning.
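
For example, a CI teardown step can block on cancellation so the next run starts from a clean namespace (the job name below matches the example script and is otherwise illustrative):

```python
# Deletes the TrainJob CR, then polls until its pods are gone before returning.
executor.cancel("hello-kubeflow", wait=True)
```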

A self-contained end-to-end example — including volume setup, secret injection, and workdir PVC sync — is available at [`examples/kubeflow/hello_kubeflow.py`](../../examples/kubeflow/hello_kubeflow.py).

##### Advanced options

| Parameter | Purpose |
|-----------|---------|
| `nprocs_per_node` | Override processes per node; defaults to `gpus_per_node` when unset |
| `extra_resource_requests` / `extra_resource_limits` | Non-GPU extended resources, e.g. `{"vpc.amazonaws.com/efa": "32"}` for AWS EFA NICs |
| `pod_spec_overrides` | Merge arbitrary fields into `podTemplateOverrides[].spec`, e.g. `{"nodeSelector": {...}}` |
| `container_kwargs` | Extra container-level fields, e.g. `{"securityContext": {"privileged": True}}` |
| `workdir_local_path` | Local directory merged into the job dir before PVC sync — useful for hand-written scripts not managed by the packager |
| `annotations` | Kubernetes annotations added to the `TrainJob` CR |
| `affinity` | Pod scheduling affinity rules |
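
Taken together, a configuration that exercises several of these knobs might look like the sketch below; the EFA resource count, instance type, and annotation are illustrative values rather than defaults:

```python
from nemo_run.core.execution.kubeflow import KubeflowExecutor

# Illustrative values only; adjust resource names and selectors to your cluster.
executor = KubeflowExecutor(
    runtime_ref="torch-distributed",
    namespace="my-namespace",
    image="nvcr.io/nvidia/nemo:25.04",
    num_nodes=4,
    gpus_per_node=8,
    nprocs_per_node=8,  # explicit here; defaults to gpus_per_node when unset
    extra_resource_requests={"vpc.amazonaws.com/efa": "32"},
    extra_resource_limits={"vpc.amazonaws.com/efa": "32"},
    pod_spec_overrides={"nodeSelector": {"node.kubernetes.io/instance-type": "p5.48xlarge"}},
    container_kwargs={"securityContext": {"privileged": True}},
    annotations={"sidecar.istio.io/inject": "false"},
)
```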

##### Limitations

Attributes like `resourceClaims` are not [supported](https://github.com/kubeflow/trainer/issues/3264) natively and must be injected via Mutating Webhooks or `pod_spec_overrides`.
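
A sketch of the `pod_spec_overrides` route, assuming a `ResourceClaimTemplate` named `gpu-claim-template` already exists in the namespace (both claim names are hypothetical):

```python
from nemo_run.core.execution.kubeflow import KubeflowExecutor

executor = KubeflowExecutor(
    runtime_ref="torch-distributed",
    namespace="my-namespace",
    image="nvcr.io/nvidia/nemo:25.04",
    # Merged into podTemplateOverrides[].spec; TrainJob has no first-class field for this.
    pod_spec_overrides={
        "resourceClaims": [
            {"name": "gpu-claim", "resourceClaimTemplateName": "gpu-claim-template"},
        ],
    },
)
```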

#### LeptonExecutor

155 changes: 155 additions & 0 deletions examples/kubeflow/hello_kubeflow.py
# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""End-to-end example: run a distributed training job via KubeflowExecutor.

Prerequisites
-------------
* Kubeflow Training Operator v2 installed in your cluster with a
``ClusterTrainingRuntime`` named ``"torch-distributed"``.
* A kubeconfig pointing at the target cluster (or run from inside a pod).
* An image that contains your training code and all dependencies.

Usage
-----
python examples/kubeflow/hello_kubeflow.py \\
--namespace my-namespace \\
--image nvcr.io/nvidia/nemo:25.04 \\
--pvc model-cache

The job runs ``nvidia-smi`` and a quick PyTorch device check on every worker,
streams logs back to your terminal, and cancels cleanly on SIGINT/SIGTERM.
Swap the ``inline`` script for your real training command.
"""

import argparse
import logging
import signal
import sys

import nemo_run as run
from nemo_run.core.execution.kubeflow import KubeflowExecutor

logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)

# ── CLI ───────────────────────────────────────────────────────────────────────

parser = argparse.ArgumentParser(description="KubeflowExecutor hello-world example")
parser.add_argument("--namespace", default="default", help="Kubernetes namespace")
parser.add_argument("--image", required=True, help="Container image with your training env")
parser.add_argument("--num-nodes", type=int, default=2, help="Number of worker pods")
parser.add_argument("--gpus-per-node", type=int, default=8, help="GPUs per pod")
parser.add_argument("--pvc", default=None, help="PVC name for workdir sync (optional)")
parser.add_argument(
"--runtime-ref",
default="torch-distributed",
help="ClusterTrainingRuntime name in your cluster",
)
args = parser.parse_args()

JOB_NAME = "hello-kubeflow"

# ── Executor ──────────────────────────────────────────────────────────────────

executor = KubeflowExecutor(
# Kubeflow TrainJob settings
launcher=run.Torchrun(),
runtime_ref=args.runtime_ref,
namespace=args.namespace,
image=args.image,
num_nodes=args.num_nodes,
gpus_per_node=args.gpus_per_node,
# Resource requests — tune these to your node type
cpu_requests="8",
memory_requests="32Gi",
# Simple key=value environment variables
env_vars={
"NCCL_DEBUG": "INFO",
"PYTORCH_CUDA_ALLOC_CONF": "expandable_segments:True",
},
# Full env var dicts — use for Kubernetes secrets, field references, etc.
# Example: inject a W&B API key from a Kubernetes Secret named "my-secrets"
env_list=[
# {
# "name": "WANDB_API_KEY",
# "valueFrom": {"secretKeyRef": {"name": "my-secrets", "key": "WANDB_API_KEY"}},
# },
],
# Toleration that allows scheduling on GPU-tainted nodes
tolerations=[
{"effect": "NoSchedule", "key": "nvidia.com/gpu", "operator": "Exists"},
],
# Volumes: a memory-backed /dev/shm so PyTorch DataLoader workers have
# enough shared memory (the default Kubernetes limit is only 64 MiB).
volumes=[
{"name": "dshm", "emptyDir": {"medium": "Memory"}},
*(
[{"name": "workdir", "persistentVolumeClaim": {"claimName": args.pvc}}]
if args.pvc
else []
),
],
volume_mounts=[
{"name": "dshm", "mountPath": "/dev/shm"},
*([{"name": "workdir", "mountPath": "/nemo-workspace"}] if args.pvc else []),
],
# Sync the generated launch script to the pod via PVC before launch.
# Required whenever you use a custom launcher (e.g. run.Torchrun()).
workdir_pvc=args.pvc,
workdir_pvc_path="/nemo-workspace",
labels={"app": JOB_NAME},
)

# ── Task ──────────────────────────────────────────────────────────────────────

# Replace this inline script with your real training command.
script = run.Script(
inline="""\
nvidia-smi
python - <<'PY'
import os, torch
rank = int(os.environ.get("RANK", 0))
world = int(os.environ.get("WORLD_SIZE", 1))
print(f"rank {rank}/{world} — cuda devices: {torch.cuda.device_count()}")
PY
"""
)

# ── Signal handling ───────────────────────────────────────────────────────────


# Register SIGINT / SIGTERM handlers *before* submitting so that Ctrl-C or a
# pod eviction during startup still triggers a clean TrainJob deletion.
# executor.cancel() deletes the TrainJob CR and polls until all pods are gone.
def _cancel(signum: int, frame: object) -> None:
log.info("Signal %d received — cancelling %s", signum, JOB_NAME)
try:
executor.cancel(JOB_NAME, wait=True)
except Exception as exc:
log.warning("Cancel failed: %s", exc)
sys.exit(0)


signal.signal(signal.SIGINT, _cancel)
signal.signal(signal.SIGTERM, _cancel)

# ── Launch ────────────────────────────────────────────────────────────────────

# run.Experiment gives direct control over log tailing.
# detach=False blocks until the job finishes; tail_logs=True streams pod logs.
with run.Experiment(JOB_NAME, executor=executor) as exp:
exp.add(script, name=JOB_NAME, tail_logs=False)
exp.run(detach=False, tail_logs=True)