diff --git a/docs/guides/execution.md b/docs/guides/execution.md
index 77ef25e0..5ea1a4d1 100644
--- a/docs/guides/execution.md
+++ b/docs/guides/execution.md
@@ -296,50 +296,78 @@ For a complete end-to-end example using DGX Cloud with NeMo, refer to the [NVIDI
 
 #### KubeflowExecutor
 
-The `KubeflowExecutor` integrates with the [Kubeflow Training Operator](https://github.com/kubeflow/training-operator) to run distributed training jobs on any Kubernetes cluster. It submits CRDs directly via the Kubernetes API — no `kubectl` required.
-
-Two job kinds are supported via the `job_kind` parameter:
-
-- **`"PyTorchJob"`** (default) — Training Operator v1 (`kubeflow.org/v1`)
-- **`"TrainJob"`** — Training Operator v2 (`trainer.kubeflow.org/v1alpha1`)
+The `KubeflowExecutor` integrates with the [Kubeflow Training Operator v2](https://github.com/kubeflow/training-operator) to run distributed training jobs on any Kubernetes cluster. It submits `TrainJob` custom resources (`trainer.kubeflow.org/v1alpha1`) directly via the Kubernetes API — no `kubectl` required.
 
 Kubernetes configuration is loaded automatically: local kubeconfig is tried first, falling back to in-cluster config when running inside a pod.
 
+A `ClusterTrainingRuntime` whose name matches `runtime_ref` must exist in the cluster (the resource is cluster-scoped); `"torch-distributed"` is the conventional name for PyTorch distributed workloads.
+
 Here's an example configuration:
 
 ```python
-# PyTorchJob (default)
-executor = run.KubeflowExecutor(
-    namespace="runai-nemo-ci",
-    image="nvcr.io/nvidia/nemo:26.02",
-    num_nodes=3,  # total pods: 1 Master + (num_nodes-1) Workers
-    gpus_per_node=8,  # also sets nproc_per_node unless overridden explicitly
+import nemo_run as run
+from nemo_run.core.execution.kubeflow import KubeflowExecutor
+
+executor = KubeflowExecutor(
+    launcher=run.Torchrun(),
+    runtime_ref="torch-distributed",  # ClusterTrainingRuntime in your cluster
+    namespace="my-namespace",
+    image="nvcr.io/nvidia/nemo:25.04",
+    num_nodes=4,
+    gpus_per_node=8,
     cpu_requests="16",
     memory_requests="64Gi",
+    image_pull_secrets=["ngc-registry-secret"],
+    # Simple key=value env vars
+    env_vars={
+        "NCCL_DEBUG": "INFO",
+        "PYTORCH_CUDA_ALLOC_CONF": "expandable_segments:True",
+    },
+    # Full env var dicts — use for secretKeyRef, fieldRef, etc.
+    env_list=[
+        {
+            "name": "WANDB_API_KEY",
+            "valueFrom": {"secretKeyRef": {"name": "my-secrets", "key": "WANDB_API_KEY"}},
+        },
+    ],
+    labels={"app": "my-training-job"},
+    tolerations=[
+        {"effect": "NoSchedule", "key": "nvidia.com/gpu", "operator": "Exists"},
+    ],
     volumes=[
-        {"name": "model-cache", "persistentVolumeClaim": {"claimName": "data-pvc"}}
+        {"name": "dshm", "emptyDir": {"medium": "Memory"}},
+        {"name": "model-cache", "persistentVolumeClaim": {"claimName": "model-cache"}},
     ],
-    volume_mounts=[{"name": "model-cache", "mountPath": "/nemo-workspace"}],
-    labels={"app": "nemo-ci-training"},
-    env_vars={"NCCL_DEBUG": "INFO"},
-)
-
-# TrainJob (Training Operator v2)
-executor = run.KubeflowExecutor(
-    job_kind="TrainJob",
-    runtime_ref="torch-distributed",  # name of the ClusterTrainingRuntime
-    namespace="runai-nemo-ci",
-    image="nvcr.io/nvidia/nemo:26.02",
-    num_nodes=3,
-    gpus_per_node=8,
+    volume_mounts=[
+        {"name": "dshm", "mountPath": "/dev/shm"},
+        {"name": "model-cache", "mountPath": "/nemo-workspace"},
+    ],
+    # Sync the generated launch script to the pod before launch,
+    # and pull results back after the job completes.
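+    # NOTE: workdir sync via PVC is required when using a custom launcher
+    # such as run.Torchrun(); the PVC named below should also be listed in
+    # volumes/volume_mounts with a matching mount path, as above.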
+    workdir_pvc="model-cache",
+    workdir_pvc_path="/nemo-workspace",
 )
 ```
 
 `cancel(wait=True)` polls until both the CR and all associated pods are fully terminated before returning.
 
+A self-contained end-to-end example — including volume setup, secret injection, and workdir PVC sync — is available at [`examples/kubeflow/hello_kubeflow.py`](../../examples/kubeflow/hello_kubeflow.py).
+
+##### Advanced options
+
+| Parameter | Purpose |
+|-----------|---------|
+| `nprocs_per_node` | Override processes per node; defaults to `gpus_per_node` when unset |
+| `extra_resource_requests` / `extra_resource_limits` | Non-GPU extended resources, e.g. `{"vpc.amazonaws.com/efa": "32"}` for AWS EFA NICs |
+| `pod_spec_overrides` | Merge arbitrary fields into `podTemplateOverrides[].spec`, e.g. `{"nodeSelector": {...}}` |
+| `container_kwargs` | Extra container-level fields, e.g. `{"securityContext": {"privileged": True}}` |
+| `workdir_local_path` | Local directory merged into the job dir before PVC sync — useful for hand-written scripts not managed by the packager |
+| `annotations` | Kubernetes annotations added to the `TrainJob` CR |
+| `affinity` | Pod scheduling affinity rules |
+
 ##### Limitations
 
-Attributes like `resourceClaims` are not [supported](https://github.com/kubeflow/trainer/issues/3264) and must be injected in different ways, like by Mutating Webhooks.
+Attributes like `resourceClaims` are not [supported](https://github.com/kubeflow/trainer/issues/3264) natively and must be injected via Mutating Webhooks or `pod_spec_overrides`.
 
 #### LeptonExecutor
 
diff --git a/examples/kubeflow/hello_kubeflow.py b/examples/kubeflow/hello_kubeflow.py
new file mode 100644
index 00000000..67b97351
--- /dev/null
+++ b/examples/kubeflow/hello_kubeflow.py
@@ -0,0 +1,155 @@
+# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""End-to-end example: run a distributed training job via KubeflowExecutor.
+
+Prerequisites
+-------------
+* Kubeflow Training Operator v2 installed in your cluster with a
+  ``ClusterTrainingRuntime`` named ``"torch-distributed"``.
+* A kubeconfig pointing at the target cluster (or run from inside a pod).
+* An image that contains your training code and all dependencies.
+
+Usage
+-----
+    python examples/kubeflow/hello_kubeflow.py \\
+        --namespace my-namespace \\
+        --image nvcr.io/nvidia/nemo:25.04 \\
+        --pvc model-cache
+
+The job runs ``nvidia-smi`` and a quick PyTorch device check on every worker,
+streams logs back to your terminal, and cancels cleanly on SIGINT/SIGTERM.
+Swap the ``inline`` script for your real training command.
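+
+The ``--pvc`` flag is optional: when omitted, no workdir volume is mounted
+and the generated launch script is not synced to the pods via PVC (see
+``workdir_pvc`` below).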
+""" + +import argparse +import logging +import signal +import sys + +import nemo_run as run +from nemo_run.core.execution.kubeflow import KubeflowExecutor + +logging.basicConfig(level=logging.INFO) +log = logging.getLogger(__name__) + +# ── CLI ─────────────────────────────────────────────────────────────────────── + +parser = argparse.ArgumentParser(description="KubeflowExecutor hello-world example") +parser.add_argument("--namespace", default="default", help="Kubernetes namespace") +parser.add_argument("--image", required=True, help="Container image with your training env") +parser.add_argument("--num-nodes", type=int, default=2, help="Number of worker pods") +parser.add_argument("--gpus-per-node", type=int, default=8, help="GPUs per pod") +parser.add_argument("--pvc", default=None, help="PVC name for workdir sync (optional)") +parser.add_argument( + "--runtime-ref", + default="torch-distributed", + help="ClusterTrainingRuntime name in your cluster", +) +args = parser.parse_args() + +JOB_NAME = "hello-kubeflow" + +# ── Executor ────────────────────────────────────────────────────────────────── + +executor = KubeflowExecutor( + # Kubeflow TrainJob settings + launcher=run.Torchrun(), + runtime_ref=args.runtime_ref, + namespace=args.namespace, + image=args.image, + num_nodes=args.num_nodes, + gpus_per_node=args.gpus_per_node, + # Resource requests — tune these to your node type + cpu_requests="8", + memory_requests="32Gi", + # Simple key=value environment variables + env_vars={ + "NCCL_DEBUG": "INFO", + "PYTORCH_CUDA_ALLOC_CONF": "expandable_segments:True", + }, + # Full env var dicts — use for Kubernetes secrets, field references, etc. + # Example: inject a W&B API key from a Kubernetes Secret named "my-secrets" + env_list=[ + # { + # "name": "WANDB_API_KEY", + # "valueFrom": {"secretKeyRef": {"name": "my-secrets", "key": "WANDB_API_KEY"}}, + # }, + ], + # Toleration that allows scheduling on GPU-tainted nodes + tolerations=[ + {"effect": "NoSchedule", "key": "nvidia.com/gpu", "operator": "Exists"}, + ], + # Volumes: a memory-backed /dev/shm so PyTorch DataLoader workers have + # enough shared memory (the default Kubernetes limit is only 64 MiB). + volumes=[ + {"name": "dshm", "emptyDir": {"medium": "Memory"}}, + *( + [{"name": "workdir", "persistentVolumeClaim": {"claimName": args.pvc}}] + if args.pvc + else [] + ), + ], + volume_mounts=[ + {"name": "dshm", "mountPath": "/dev/shm"}, + *([{"name": "workdir", "mountPath": "/nemo-workspace"}] if args.pvc else []), + ], + # Sync the generated launch script to the pod via PVC before launch. + # Required whenever you use a custom launcher (e.g. run.Torchrun()). + workdir_pvc=args.pvc, + workdir_pvc_path="/nemo-workspace", + labels={"app": JOB_NAME}, +) + +# ── Task ────────────────────────────────────────────────────────────────────── + +# Replace this inline script with your real training command. +script = run.Script( + inline="""\ +nvidia-smi +python - <<'PY' +import os, torch +rank = int(os.environ.get("RANK", 0)) +world = int(os.environ.get("WORLD_SIZE", 1)) +print(f"rank {rank}/{world} — cuda devices: {torch.cuda.device_count()}") +PY +""" +) + +# ── Signal handling ─────────────────────────────────────────────────────────── + + +# Register SIGINT / SIGTERM handlers *before* submitting so that Ctrl-C or a +# pod eviction during startup still triggers a clean TrainJob deletion. +# executor.cancel() deletes the TrainJob CR and polls until all pods are gone. 
+def _cancel(signum: int, frame: object) -> None:
+    log.info("Signal %d received — cancelling %s", signum, JOB_NAME)
+    try:
+        executor.cancel(JOB_NAME, wait=True)
+    except Exception as exc:
+        log.warning("Cancel failed: %s", exc)
+    sys.exit(0)
+
+
+signal.signal(signal.SIGINT, _cancel)
+signal.signal(signal.SIGTERM, _cancel)
+
+# ── Launch ───────────────────────────────────────────────────────────────────
+
+# run.Experiment gives direct control over log tailing.
+# detach=False blocks until the job finishes; tail_logs=True streams pod logs.
+with run.Experiment(JOB_NAME, executor=executor) as exp:
+    exp.add(script, name=JOB_NAME, tail_logs=False)
+    exp.run(detach=False, tail_logs=True)