diff --git a/examples/slurm/__init__.py b/examples/slurm/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/examples/slurm/utils.py b/examples/slurm/utils.py deleted file mode 100644 index c46584b96..000000000 --- a/examples/slurm/utils.py +++ /dev/null @@ -1,92 +0,0 @@ -# Copyright (c) Meta Platforms, Inc. and affiliates. -# All rights reserved. -# -# This source code is licensed under the BSD-style license found in the -# LICENSE file in the root directory of this source tree. - -import getpass - -import json -import logging -import os -import pathlib - -from monarch._rust_bindings.monarch_hyperactor.alloc import AllocConstraints, AllocSpec -from monarch._src.actor.allocator import RemoteAllocator, TorchXRemoteAllocInitializer - -from monarch.actor import ProcMesh -from monarch.tools import commands -from monarch.tools.components import hyperactor -from monarch.tools.config import Config - - -USER = getpass.getuser() -HOME = pathlib.Path().home() -CWD = os.getcwd() -DEACTIVATE = None - -logging.basicConfig( - level=logging.INFO, - format="%(name)s %(asctime)s %(levelname)s %(message)s", - datefmt="%Y-%m-%d %H:%M:%S", - force=True, -) -logger: logging.Logger = logging.getLogger(__name__) - -# pre-configured for H100 -HOST_TYPE = "gpu.xlarge" -HOST_MEMORY = 2062607 - - -async def get_appdef(num_hosts: int, host_type: str = HOST_TYPE): - # similar to Docker image; should contain a conda env in the $img_root/conda/ directory - # when config.workspace is not None, an ephemeral fbpkg version is created - # that conda-packs the currently active local conda env AND the directory specified by workspace - image = "monarch_default_workspace:latest" - - appdef = hyperactor.host_mesh( - image=image, - meshes=[f"mesh0:{num_hosts}:{host_type}"], # mesh_name:num_hosts:host_type - ) - return appdef - - -async def get_server_info(appdef, host_memory: int = HOST_MEMORY): - jobname = f"monarch-{USER}" - - # TODO: Register this so we don't have to do this every time - for role in appdef.roles: - role.resource.memMB = host_memory - - config = Config( - scheduler="slurm", - appdef=appdef, - workspace=str(CWD), # or None to disable building ephemeral, - ) - - server_info = await commands.get_or_create( - jobname, - config, - force_restart=False, - ) - return server_info - - -async def create_proc_mesh(num_hosts, appdef, server_info): - num_gpus_per_host = appdef.roles[0].resource.gpu - - logger.info( - "\n===== Server Info =====\n%s", - json.dumps(server_info.to_json(), indent=2), - ) - - allocator = RemoteAllocator( - world_id="foo", - initializer=TorchXRemoteAllocInitializer(server_info.server_handle), - ) - alloc = await allocator.allocate( - AllocSpec(AllocConstraints(), hosts=num_hosts, gpus=num_gpus_per_host) - ) - - proc_mesh = await ProcMesh.from_alloc(alloc) - return proc_mesh diff --git a/examples/slurm_allreduce.ipynb b/examples/slurm_allreduce.ipynb index b60950179..b26dc6d36 100644 --- a/examples/slurm_allreduce.ipynb +++ b/examples/slurm_allreduce.ipynb @@ -5,113 +5,7 @@ "execution_count": null, "id": "c443b989-5a71-455f-9a59-9963338634ec", "metadata": {}, - "outputs": [ - { - "name": "stderr", - "output_type": "stream", - "text": [ - ":8: FutureWarning: Setting `workspace='/home/ubuntu/ahmads/monarch/examples'` is deprecated.\n", - "torchx.schedulers.slurm_scheduler 2025-08-29 21:02:20 INFO unable to get job info for `monarch-ubuntu` with `squeue` (squeue: error: Invalid job id: monarch-ubuntu\n", - "), trying `sacct`\n", - "torchx.schedulers.slurm_scheduler 2025-08-29 21:02:20 INFO unable to get job info for `monarch-ubuntu` with `sacct` (sacct: fatal: Bad job/step specified: monarch-ubuntu\n", - ")\n", - "monarch.tools.commands 2025-08-29 21:02:20 INFO no existing RUNNING server `slurm:///monarch-ubuntu` creating new one...\n", - "torchx.runner.api 2025-08-29 21:02:20 INFO Tracker configurations: {}\n", - "torchx.runner.api 2025-08-29 21:02:20 INFO Checking for changes in workspace `/home/ubuntu/.monarch/out/tmpkk97qppi/workspace`...\n", - "torchx.runner.api 2025-08-29 21:02:20 INFO To disable workspaces pass: --workspace=\"\" from CLI or workspace=None programmatically.\n", - "torchx.runner.api 2025-08-29 21:02:20 INFO Reusing original image `monarch_default_workspace:latest` for role[0]=mesh0. Either a patch was built or no changes to workspace was detected.\n", - "monarch.tools.commands 2025-08-29 21:02:20 INFO created new `slurm:///418` waiting for it to be ready...\n" - ] - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "Ahmad: {'requeue': None, 'ntasks-per-node': '1', 'cpus-per-task': '48', 'mem': '186777', 'gpus-per-task': '4', 'ntasks': '1'}\n", - "Ahmad: {'requeue': None, 'ntasks-per-node': '1', 'cpus-per-task': '48', 'mem': '186777', 'gpus-per-task': '4', 'ntasks': '1'}\n", - "Waiting for slurm:///418 to be RUNNING (current: PENDING); will check again in 5.0 seconds. Total wait time: 0:00:20.105986\r" - ] - }, - { - "name": "stderr", - "output_type": "stream", - "text": [ - "slurm.utils 2025-08-29 21:02:45 INFO \n", - "===== Server Info =====\n", - "{\n", - " \"name\": \"418\",\n", - " \"server_handle\": \"slurm:///418\",\n", - " \"state\": \"RUNNING\",\n", - " \"meshes\": {\n", - " \"mesh0\": {\n", - " \"host_type\": \"__UNSET__\",\n", - " \"hosts\": 2,\n", - " \"gpus\": -1,\n", - " \"hostnames\": [\n", - " \"gpu-queue-st-gpu-compute-1\",\n", - " \"gpu-queue-st-gpu-compute-2\"\n", - " ]\n", - " }\n", - " }\n", - "}\n", - "__main__ 2025-08-29 21:02:45 INFO computing world size...\n", - "monarch._src.actor.allocator 2025-08-29 21:02:45 INFO no match label `procmesh.monarch.meta.com/name` specified in alloc constraints\n", - "monarch._src.actor.allocator 2025-08-29 21:02:45 INFO found a single proc mesh `mesh0` in slurm:///418, will allocate on it\n", - "monarch.tools.network 2025-08-29 21:02:45 INFO no AF_INET6 address that can bind TCP sockets for `gpu-queue-st-gpu-compute-1:26600` (error: [Errno -5] No address associated with hostname)\n", - "monarch.tools.network 2025-08-29 21:02:45 INFO resolved AF_INET address `10.0.2.236:26600` for `gpu-queue-st-gpu-compute-1:26600`\n", - "monarch.tools.network 2025-08-29 21:02:45 INFO no AF_INET6 address that can bind TCP sockets for `gpu-queue-st-gpu-compute-2:26600` (error: [Errno -5] No address associated with hostname)\n", - "monarch.tools.network 2025-08-29 21:02:45 INFO resolved AF_INET address `10.0.2.132:26600` for `gpu-queue-st-gpu-compute-2:26600`\n", - "monarch._src.actor.allocator 2025-08-29 21:02:45 INFO initializing alloc on remote allocator addresses: ['tcp!10.0.2.236:26600', 'tcp!10.0.2.132:26600']\n", - "monarch._src.actor.allocator 2025-08-29 21:02:45 INFO no match label `procmesh.monarch.meta.com/name` specified in alloc constraints\n", - "monarch._src.actor.allocator 2025-08-29 21:02:45 INFO found a single proc mesh `mesh0` in slurm:///418, will allocate on it\n", - "monarch.tools.network 2025-08-29 21:02:45 INFO no AF_INET6 address that can bind TCP sockets for `gpu-queue-st-gpu-compute-1:26600` (error: [Errno -5] No address associated with hostname)\n", - "monarch.tools.network 2025-08-29 21:02:45 INFO resolved AF_INET address `10.0.2.236:26600` for `gpu-queue-st-gpu-compute-1:26600`\n", - "monarch.tools.network 2025-08-29 21:02:45 INFO no AF_INET6 address that can bind TCP sockets for `gpu-queue-st-gpu-compute-2:26600` (error: [Errno -5] No address associated with hostname)\n", - "monarch.tools.network 2025-08-29 21:02:45 INFO resolved AF_INET address `10.0.2.132:26600` for `gpu-queue-st-gpu-compute-2:26600`\n", - "monarch._src.actor.allocator 2025-08-29 21:02:45 INFO initializing alloc on remote allocator addresses: ['tcp!10.0.2.236:26600', 'tcp!10.0.2.132:26600']\n" - ] - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "New job `slurm:///418` is ready to serve.\n" - ] - }, - { - "name": "stderr", - "output_type": "stream", - "text": [ - "__main__ 2025-08-29 21:02:53 INFO computed world_sizes:\n", - " ----------------------------------------\n", - " {\n", - " \"rank_0\": 8,\n", - " \"rank_1\": 8,\n", - " \"rank_2\": 8,\n", - " \"rank_3\": 8,\n", - " \"rank_4\": 8,\n", - " \"rank_5\": 8,\n", - " \"rank_6\": 8,\n", - " \"rank_7\": 8\n", - "}\n", - " ----------------------------------------\n" - ] - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "\u001b[36m>>> Aggregated Logs (2025-08-29 21:02:51) >>>\u001b[0m\n", - "\u001b[33m[8 similar log lines]\u001b[0m Initializing process group `nccl`:\n", - "\u001b[33m[8 similar log lines]\u001b[0m MASTER_ADDR = gpu-queue-st-gpu-compute-1\n", - "\u001b[33m[8 similar log lines]\u001b[0m MASTER_PORT = 29500\n", - "\u001b[33m[8 similar log lines]\u001b[0m RANK = 5\n", - "\u001b[33m[8 similar log lines]\u001b[0m WORLD_SIZE = 8\n", - "\u001b[36m<<< Aggregated Logs (2025-08-29 21:02:54) <<<\u001b[0m\n", - "\n" - ] - } - ], + "outputs": [], "source": [ "# (c) Meta Platforms, Inc. and affiliates. Confidential and proprietary.\n", "\n", @@ -119,12 +13,13 @@ "# pyre-ignore-all-errors\n", "import json\n", "import logging\n", + "import socket\n", "import sys\n", "\n", "import cloudpickle\n", - "from monarch.tools import commands\n", "from example_actors.compute_world_size_actor import ComputeWorldSizeActor\n", - "from slurm.utils import get_appdef, get_server_info, create_proc_mesh\n", + "from monarch.actor import Actor, endpoint\n", + "from monarch.job import SlurmJob\n", "\n", "\n", "logging.basicConfig(\n", @@ -138,21 +33,45 @@ "logger: logging.Logger = logging.getLogger(__name__)\n", "\n", "\n", + "class _HostnameActor(Actor):\n", + " \"\"\"Helper actor to get hostname from rank 0\"\"\"\n", + " @endpoint\n", + " def get_hostname(self) -> str:\n", + " return socket.gethostname()\n", + "\n", + "\n", "async def main():\n", - " num_hosts = 2\n", - " appdef = await get_appdef(num_hosts)\n", - " server_info = await get_server_info(appdef)\n", + " num_nodes = 2\n", + " gpus_per_node = 8\n", + " mesh_name = \"mesh0\"\n", + " master_port = 29500\n", + " \n", + " # Create SLURM job\n", + " slurm_job = SlurmJob(\n", + " meshes={mesh_name: num_nodes},\n", + " job_name=\"monarch_example\",\n", + " gpus_per_node=gpus_per_node,\n", + " time_limit=\"06:00:00\",\n", + " )\n", "\n", " try:\n", - " proc_mesh = await create_proc_mesh(num_hosts, appdef, server_info)\n", - " actor = await proc_mesh.spawn(\"compute_world_size_actor\", ComputeWorldSizeActor)\n", + " # Get job state and create process mesh\n", + " job_state = slurm_job.state()\n", + " proc_mesh = job_state.mesh0.spawn_procs({\"gpus\": gpus_per_node})\n", + " \n", + " # Get master_addr from rank 0\n", + " hostname_actor = proc_mesh.spawn(\"hostname_actor\", _HostnameActor)\n", + " hostname_values = await hostname_actor.flatten(\"rank\").slice(rank=0).get_hostname.call()\n", + " master_addr = hostname_values.item()\n", + " \n", + " # Spawn actor\n", + " actor = proc_mesh.spawn(\"compute_world_size_actor\", ComputeWorldSizeActor)\n", "\n", " logger.info(\"computing world size...\")\n", " # this is redundant but is here for example sake\n", - " mesh_name = server_info.get_mesh_spec(\"mesh0\").name\n", " values = await actor.compute_world_size.call(\n", - " master_addr=server_info.host0(mesh_name),\n", - " master_port=29500,\n", + " master_addr=master_addr,\n", + " master_port=master_port,\n", " )\n", "\n", " values_by_rank = {f\"rank_{p.rank}\": v for p, v in list(values.flatten(\"rank\"))}\n", @@ -164,7 +83,9 @@ " {'-'*40}\"\"\"\n", " )\n", " finally:\n", - " commands.kill(f\"slurm:///{server_info.name}\")\n", + " # Cancel the SLURM job, releasing all reserved nodes back to the cluster\n", + " slurm_job.kill()\n", + " logger.info(\"Job terminated successfully\")\n", "\n", "\n", "if __name__ == \"__main__\":\n", @@ -172,6 +93,14 @@ "\n", " await main()" ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "71ca61d9", + "metadata": {}, + "outputs": [], + "source": [] } ], "metadata": { diff --git a/examples/slurm_ddp.ipynb b/examples/slurm_ddp.ipynb index 14dc7a65d..3d72ba7bd 100644 --- a/examples/slurm_ddp.ipynb +++ b/examples/slurm_ddp.ipynb @@ -2,120 +2,10 @@ "cells": [ { "cell_type": "code", - "execution_count": null, + "execution_count": 2, "id": "c443b989-5a71-455f-9a59-9963338634ec", "metadata": {}, - "outputs": [ - { - "name": "stderr", - "output_type": "stream", - "text": [ - ":8: FutureWarning: Setting `workspace='/home/ubuntu/ahmads/monarch/examples'` is deprecated.\n", - "torchx.schedulers.slurm_scheduler 2025-08-29 20:40:34 INFO unable to get job info for `monarch-ubuntu` with `squeue` (squeue: error: Invalid job id: monarch-ubuntu\n", - "), trying `sacct`\n", - "torchx.schedulers.slurm_scheduler 2025-08-29 20:40:34 INFO unable to get job info for `monarch-ubuntu` with `sacct` (sacct: fatal: Bad job/step specified: monarch-ubuntu\n", - ")\n", - "monarch.tools.commands 2025-08-29 20:40:34 INFO no existing RUNNING server `slurm:///monarch-ubuntu` creating new one...\n", - "torchx.runner.api 2025-08-29 20:40:34 INFO Tracker configurations: {}\n", - "torchx.runner.api 2025-08-29 20:40:34 INFO Checking for changes in workspace `/home/ubuntu/.monarch/out/tmp3m4zzjg6/workspace`...\n", - "torchx.runner.api 2025-08-29 20:40:34 INFO To disable workspaces pass: --workspace=\"\" from CLI or workspace=None programmatically.\n", - "torchx.runner.api 2025-08-29 20:40:34 INFO Reusing original image `monarch_default_workspace:latest` for role[0]=mesh0. Either a patch was built or no changes to workspace was detected.\n", - "monarch.tools.commands 2025-08-29 20:40:34 INFO created new `slurm:///410` waiting for it to be ready...\n" - ] - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "Ahmad: {'requeue': None, 'ntasks-per-node': '1', 'cpus-per-task': '48', 'mem': '186777', 'gpus-per-task': '4', 'ntasks': '1'}\n", - "Ahmad: {'requeue': None, 'ntasks-per-node': '1', 'cpus-per-task': '48', 'mem': '186777', 'gpus-per-task': '4', 'ntasks': '1'}\n", - "Waiting for slurm:///410 to be RUNNING (current: PENDING); will check again in 5.0 seconds. Total wait time: 0:00:00.015800\r" - ] - }, - { - "name": "stderr", - "output_type": "stream", - "text": [ - "[-]E0829 20:40:34.996334 8536 hyperactor/src/channel/net.rs:695] error_msg:session tcp:10.0.2.236:26600.5117454862225131082: failed to deliver message within timeout\n", - "[-]E0829 20:40:35.341902 8536 hyperactor/src/channel/net.rs:708] error_msg:session tcp:10.0.2.132:26600.8381289842876906331: failed to receive ack within timeout 30 secs; link is currently broken\n" - ] - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "Waiting for slurm:///410 to be RUNNING (current: PENDING); will check again in 5.0 seconds. Total wait time: 0:00:10.059201\r" - ] - }, - { - "name": "stderr", - "output_type": "stream", - "text": [ - "slurm.utils 2025-08-29 20:40:49 INFO \n", - "===== Server Info =====\n", - "{\n", - " \"name\": \"410\",\n", - " \"server_handle\": \"slurm:///410\",\n", - " \"state\": \"RUNNING\",\n", - " \"meshes\": {\n", - " \"mesh0\": {\n", - " \"host_type\": \"__UNSET__\",\n", - " \"hosts\": 2,\n", - " \"gpus\": -1,\n", - " \"hostnames\": [\n", - " \"gpu-queue-st-gpu-compute-1\",\n", - " \"gpu-queue-st-gpu-compute-2\"\n", - " ]\n", - " }\n", - " }\n", - "}\n", - "monarch._src.actor.allocator 2025-08-29 20:40:49 INFO no match label `procmesh.monarch.meta.com/name` specified in alloc constraints\n", - "monarch._src.actor.allocator 2025-08-29 20:40:49 INFO found a single proc mesh `mesh0` in slurm:///410, will allocate on it\n", - "monarch.tools.network 2025-08-29 20:40:49 INFO no AF_INET6 address that can bind TCP sockets for `gpu-queue-st-gpu-compute-1:26600` (error: [Errno -5] No address associated with hostname)\n", - "monarch.tools.network 2025-08-29 20:40:49 INFO resolved AF_INET address `10.0.2.236:26600` for `gpu-queue-st-gpu-compute-1:26600`\n", - "monarch.tools.network 2025-08-29 20:40:49 INFO no AF_INET6 address that can bind TCP sockets for `gpu-queue-st-gpu-compute-2:26600` (error: [Errno -5] No address associated with hostname)\n", - "monarch.tools.network 2025-08-29 20:40:49 INFO resolved AF_INET address `10.0.2.132:26600` for `gpu-queue-st-gpu-compute-2:26600`\n", - "monarch._src.actor.allocator 2025-08-29 20:40:49 INFO initializing alloc on remote allocator addresses: ['tcp!10.0.2.236:26600', 'tcp!10.0.2.132:26600']\n", - "monarch._src.actor.allocator 2025-08-29 20:40:49 INFO no match label `procmesh.monarch.meta.com/name` specified in alloc constraints\n", - "monarch._src.actor.allocator 2025-08-29 20:40:49 INFO found a single proc mesh `mesh0` in slurm:///410, will allocate on it\n", - "monarch.tools.network 2025-08-29 20:40:49 INFO no AF_INET6 address that can bind TCP sockets for `gpu-queue-st-gpu-compute-1:26600` (error: [Errno -5] No address associated with hostname)\n", - "monarch.tools.network 2025-08-29 20:40:49 INFO resolved AF_INET address `10.0.2.236:26600` for `gpu-queue-st-gpu-compute-1:26600`\n", - "monarch.tools.network 2025-08-29 20:40:49 INFO no AF_INET6 address that can bind TCP sockets for `gpu-queue-st-gpu-compute-2:26600` (error: [Errno -5] No address associated with hostname)\n", - "monarch.tools.network 2025-08-29 20:40:49 INFO resolved AF_INET address `10.0.2.132:26600` for `gpu-queue-st-gpu-compute-2:26600`\n", - "monarch._src.actor.allocator 2025-08-29 20:40:49 INFO initializing alloc on remote allocator addresses: ['tcp!10.0.2.236:26600', 'tcp!10.0.2.132:26600']\n" - ] - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "New job `slurm:///410` is ready to serve.\n", - "\u001b[36m>>> Aggregated Logs (2025-08-29 20:40:55) >>>\u001b[0m\n", - "\u001b[33m[8 similar log lines]\u001b[0m self.rank=7 Initializing torch distributed\n", - "\u001b[33m[8 similar log lines]\u001b[0m [Gloo] Rank 0 is connected to 7 peer ranks. Expected number of connected peer ranks is : 7\n", - "\u001b[33m[8 similar log lines]\u001b[0m self.rank=0 Finished initializing torch distributed\n", - "\u001b[33m[8 similar log lines]\u001b[0m self.rank=0 Running basic DDP example\n", - "\u001b[33m[8 similar log lines]\u001b[0m self.rank=5 local_rank=1\n", - "\u001b[36m<<< Aggregated Logs (2025-08-29 20:40:58) <<<\u001b[0m\n", - "\n", - "DDP example completed successfully!\n", - "\u001b[36m>>> Aggregated Logs (2025-08-29 20:40:58) >>>\u001b[0m\n", - "\u001b[33m[8 similar log lines]\u001b[0m self.rank=6 Finished running basic DDP example\n", - "\u001b[33m[8 similar log lines]\u001b[0m self.rank=0 Cleaning up torch distributed\n", - "\u001b[36m<<< Aggregated Logs (2025-08-29 20:41:01) <<<\u001b[0m\n", - "\n" - ] - }, - { - "name": "stderr", - "output_type": "stream", - "text": [ - "[-]E0829 20:41:30.158934 8536 hyperactor/src/channel/net.rs:695] error_msg:session tcp:10.0.2.132:26600.11111315873644166091: failed to deliver message within timeout\n", - "[-]E0829 20:41:30.774458 8536 hyperactor/src/channel/net.rs:695] error_msg:session tcp:10.0.2.236:26600.6097672994633804723: failed to deliver message within timeout\n", - "[-]E0829 20:41:34.705394 8536 hyperactor/src/channel/net.rs:695] error_msg:session tcp:10.0.2.236:38955.9004778724387042266: failed to deliver message within timeout\n" - ] - } - ], + "outputs": [], "source": [ "# (c) Meta Platforms, Inc. and affiliates. Confidential and proprietary.\n", "\n", @@ -123,19 +13,15 @@ "# pyre-ignore-all-errors\n", "import logging\n", "import os\n", - "import os\n", "import torch\n", "import torch.distributed as dist\n", - "import torch.distributed as dist\n", "import torch.nn as nn\n", "import torch.optim as optim\n", "\n", - "from monarch.tools import commands\n", - "from monarch.actor import Actor, current_rank, endpoint\n", "from monarch.actor import Actor, current_rank, endpoint\n", + "from monarch.job import SlurmJob\n", "from monarch.utils import setup_env_for_distributed\n", "from torch.nn.parallel import DistributedDataParallel as DDP\n", - "from slurm.utils import get_appdef, get_server_info, create_proc_mesh\n", "\n", "\n", "logging.basicConfig(\n", @@ -219,17 +105,30 @@ "\n", "\n", "async def main():\n", - " num_hosts = 2\n", - " appdef = await get_appdef(num_hosts)\n", - " server_info = await get_server_info(appdef)\n", + " num_nodes = 2\n", + " gpus_per_node = 8\n", + " mesh_name = \"mesh0\"\n", + " \n", + " # Create SLURM job\n", + " slurm_job = SlurmJob(\n", + " meshes={mesh_name: num_nodes},\n", + " job_name=\"monarch_example\",\n", + " gpus_per_node=gpus_per_node,\n", + " time_limit=\"06:00:00\",\n", + " )\n", "\n", " try:\n", - " proc_mesh = await create_proc_mesh(num_hosts, appdef, server_info)\n", + " # Get job state and create process mesh\n", + " job_state = slurm_job.state()\n", + " proc_mesh = job_state.mesh0.spawn_procs({\"gpus\": gpus_per_node})\n", "\n", - " ddp_actor = await proc_mesh.spawn(\"ddp_actor\", DDPActor)\n", + " # Spawn DDP actor\n", + " ddp_actor = proc_mesh.spawn(\"ddp_actor\", DDPActor)\n", "\n", + " # Setup distributed environment\n", " await setup_env_for_distributed(proc_mesh)\n", "\n", + " # Run DDP example\n", " await ddp_actor.setup.call()\n", " await ddp_actor.demo_basic.call()\n", " await ddp_actor.cleanup.call()\n", @@ -237,7 +136,9 @@ " print(\"DDP example completed successfully!\")\n", "\n", " finally:\n", - " commands.kill(f\"slurm:///{server_info.name}\")\n", + " # Cancel the SLURM job, releasing all reserved nodes back to the cluster\n", + " slurm_job.kill()\n", + " logger.info(\"Job terminated successfully\")\n", "\n", "\n", "if __name__ == \"__main__\":\n", @@ -255,21 +156,9 @@ ], "metadata": { "kernelspec": { - "display_name": "ahmads-nightly4", + "display_name": "Python (monarch)", "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.10.18" + "name": "monarch" } }, "nbformat": 4, diff --git a/examples/slurm_titan.ipynb b/examples/slurm_titan.ipynb index 4edd474e2..f6d7fb0d1 100644 --- a/examples/slurm_titan.ipynb +++ b/examples/slurm_titan.ipynb @@ -1,389 +1,270 @@ { - "cells": [ - { - "cell_type": "markdown", - "id": "1c91f6d2", - "metadata": {}, - "source": [ - "## Monarch + TorchTitan on SLURM\n", - "This example notebook demonstrates how you can easily run and iterate on a distributed training job with Monarch and TorchTitan.\n", - "\n", - "#### Prerequisites\n", - "Please make sure your environment is setup for this notebook:\n", - "1. Install Monarch nightly: https://github.com/meta-pytorch/monarch/blob/main/scripts/install_nightly.py\n", - "2. Install Titan nightly: https://github.com/pytorch/torchtitan?tab=readme-ov-file#nightly-builds\n", - "3. Ensure you have a valid Titan model config in the script directory (i.e: https://github.com/pytorch/torchtitan/blob/main/torchtitan/models/llama3/train_configs/debug_model.toml)" - ] - }, - { - "cell_type": "markdown", - "id": "77cd971d", - "metadata": {}, - "source": [ - "### 1. Reserve your SLURM job\n", - "If necessary, update paramaters for your cluster:\n", - "- host_type: TorchX named resource for your cluster (default: \"gpu.xlarge\")\n", - "- host_memory: Memory per machine in MB (default: 2062607)\n", - "\n", - "For more information on TorchX resources: https://docs.pytorch.org/torchx/main/specs.html#resource" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "85b0693f", - "metadata": {}, - "outputs": [ - { - "name": "stderr", - "output_type": "stream", - "text": [ - ":8: FutureWarning: Setting `workspace='/mnt/home/amirafzali/monarch/examples'` is deprecated. Use `workspace=monarch.tools.config.workspace.Workspace(dirs=['/mnt/home/amirafzali/monarch/examples'])` instead.\n", - "torchx.schedulers.slurm_scheduler 2025-09-23 04:50:42 INFO unable to get job info for `monarch-amirafzali` with `squeue` (squeue: error: Invalid job id: monarch-amirafzali\n", - "), trying `sacct`\n", - "torchx.schedulers.slurm_scheduler 2025-09-23 04:50:42 INFO unable to get job info for `monarch-amirafzali` with `sacct` (sacct: fatal: Bad job/step specified: monarch-amirafzali\n", - ")\n", - "monarch.tools.commands 2025-09-23 04:50:42 INFO no existing RUNNING server `slurm:///monarch-amirafzali` creating new one...\n" - ] - }, - { - "name": "stderr", - "output_type": "stream", - "text": [ - "torchx.runner.api 2025-09-23 04:50:42 INFO Tracker configurations: {}\n", - "torchx.runner.api 2025-09-23 04:50:42 INFO Checking for changes in workspace `/mnt/home/amirafzali/.monarch/out/tmptsrcdyex/workspace`...\n", - "torchx.runner.api 2025-09-23 04:50:42 INFO To disable workspaces pass: --workspace=\"\" from CLI or workspace=None programmatically.\n", - "torchx.runner.api 2025-09-23 04:50:42 INFO Reusing original image `monarch_default_workspace:latest` for role[0]=mesh0. Either a patch was built or no changes to workspace was detected.\n", - "monarch.tools.commands 2025-09-23 04:50:42 INFO created new `slurm:///295` waiting for it to be ready...\n" - ] - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "New job `slurm:///295` is ready to serve.urrent: PENDING); will check again in 5.0 seconds. Total wait time: 0:00:25.251284\n" - ] - } - ], - "source": [ - "# (c) Meta Platforms, Inc. and affiliates. Confidential and proprietary.\n", - "\n", - "from slurm.utils import get_appdef, get_server_info, create_proc_mesh\n", - "\n", - "\n", - "num_nodes = 2 # assign for your system\n", - "appdef = await get_appdef(\n", - " num_nodes,\n", - " # host_type = ...\n", - ")\n", - "server_info = await get_server_info(\n", - " appdef,\n", - " # host_memory = ...\n", - ")" - ] - }, - { - "cell_type": "markdown", - "id": "663e41ce", - "metadata": {}, - "source": [ - "### 2. Define your Titan and cluster parameters" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "39d51df7", - "metadata": {}, - "outputs": [], - "source": [ - "# (c) Meta Platforms, Inc. and affiliates. Confidential and proprietary.\n", - "\n", - "from torchtitan.train import Trainer\n", - "from torchtitan.config import ConfigManager, JobConfig\n", - "from monarch.actor import Actor, current_rank, endpoint\n", - "from torchtitan.tools.logging import init_logger, logger\n", - "import torch\n", - "from dataclasses import dataclass\n", - "import os\n", - "from monarch.tools import commands\n", - "from monarch.utils import setup_env_for_distributed\n", - "\n", - "\n", - "@dataclass\n", - "class RunParams:\n", - " \"\"\"\n", - " Parameters for your cluster and training job, adjust as needed\n", - " \"\"\"\n", - " training_steps: int = 50\n", - " model_config = \"debug_model.toml\"\n", - " dataset = \"c4\"\n", - " num_nodes = num_nodes\n", - " gpus_per_node = 4\n", - "\n", - "\n", - "class TrainerActor(Actor):\n", - " \"\"\"\n", - " A simple wrapper class with executes a TorchTitan trainer in a Monarch actor\n", - " \"\"\"\n", - " def __init__(self, job_config: JobConfig) -> None:\n", - " self.job_config = job_config\n", - " rank = current_rank().rank\n", - " self.uid = f\"[trainer_{rank}]\"\n", - "\n", - " @endpoint\n", - " async def start_training(self) -> None:\n", - " init_logger()\n", - " trainer: Trainer | None = None\n", - "\n", - " try:\n", - " trainer = Trainer(self.job_config)\n", - " logger.info(f\"{self.uid} initialized successfully and starting training\")\n", - " trainer.train()\n", - " except Exception:\n", - " if trainer:\n", - " trainer.close()\n", - " raise\n", - " else:\n", - " trainer.close()\n", - " finally:\n", - " torch.distributed.destroy_process_group()\n", - " logger.info(f\"{self.uid} trainer cleaned up\")\n", - "\n", - "def make_job_config() -> JobConfig:\n", - " \"\"\"\n", - " Create a job config which is digested by TorchTitan, sourced from RunParams\n", - " \"\"\"\n", - " data_parallel_shard_degree = RunParams.num_nodes * RunParams.gpus_per_node\n", - " output_path = \"./outputs\"\n", - "\n", - " script_dir = globals()['_dh'][0]\n", - " default_args = [\n", - " \"--job.config_file\",\n", - " os.path.join(script_dir, RunParams.model_config),\n", - " \"--model.tokenizer_path\",\n", - " os.path.join(script_dir, \"tokenizer\"),\n", - " \"--comm.trace_buf_size\",\n", - " \"0\",\n", - " \"--metrics.log_freq\",\n", - " \"1\",\n", - " \"--parallelism.data_parallel_shard_degree\",\n", - " str(data_parallel_shard_degree),\n", - " \"--activation_checkpoint.mode\",\n", - " \"full\",\n", - " \"--comm.train_timeout_seconds\",\n", - " \"60\",\n", - " \"--training.steps\",\n", - " str(RunParams.training_steps),\n", - " \"--training.dataset\",\n", - " RunParams.dataset,\n", - " \"--job.dump_folder\",\n", - " output_path,\n", - " \"--metrics.enable_tensorboard\",\n", - " ]\n", - "\n", - " config_manager = ConfigManager()\n", - " job_config = config_manager.parse_args(default_args)\n", - "\n", - " return job_config" - ] - }, - { - "cell_type": "markdown", - "id": "04425384", - "metadata": {}, - "source": [ - "### 3. Execute your training job\n", - "You can make adjustments and run this on the existing SLURM allocations as many times as you would like!" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "091a7066", - "metadata": {}, - "outputs": [ - { - "name": "stderr", - "output_type": "stream", - "text": [ - "root 2025-09-23 04:51:17 WARNING tokenizer_path is deprecated, use model.hf_assets_path instead. Setting hf_assets_path to tokenizer_path temporarily.\n", - "slurm.utils 2025-09-23 04:51:17 INFO \n", - "===== Server Info =====\n", - "{\n", - " \"name\": \"295\",\n", - " \"server_handle\": \"slurm:///295\",\n", - " \"state\": \"RUNNING\",\n", - " \"meshes\": {\n", - " \"mesh0\": {\n", - " \"host_type\": \"__UNSET__\",\n", - " \"hosts\": 2,\n", - " \"gpus\": -1,\n", - " \"hostnames\": [\n", - " \"slurm-h100-206-061\",\n", - " \"slurm-h100-206-089\"\n", - " ]\n", - " }\n", - " }\n", - "}\n", - "monarch._src.actor.allocator 2025-09-23 04:51:17 INFO no match label `procmesh.monarch.meta.com/name` specified in alloc constraints\n", - "monarch._src.actor.allocator 2025-09-23 04:51:17 INFO found a single proc mesh `mesh0` in slurm:///295, will allocate on it\n", - "monarch.tools.network 2025-09-23 04:51:17 INFO no AF_INET6 address that can bind TCP sockets for `slurm-h100-206-061:26600` (error: [Errno -5] No address associated with hostname)\n", - "monarch.tools.network 2025-09-23 04:51:17 INFO resolved AF_INET address `address1:26600` for `slurm-h100-206-061:26600`\n", - "monarch.tools.network 2025-09-23 04:51:17 INFO no AF_INET6 address that can bind TCP sockets for `slurm-h100-206-089:26600` (error: [Errno -5] No address associated with hostname)\n", - "monarch.tools.network 2025-09-23 04:51:17 INFO resolved AF_INET address `address2:26600` for `slurm-h100-206-089:26600`\n", - "monarch._src.actor.allocator 2025-09-23 04:51:17 INFO initializing alloc on remote allocator addresses: ['tcp!address1:26600', 'tcp!address2:26600']\n" - ] - }, - { - "name": "stderr", - "output_type": "stream", - "text": [ - "monarch._src.actor.allocator 2025-09-23 04:51:17 INFO no match label `procmesh.monarch.meta.com/name` specified in alloc constraints\n", - "monarch._src.actor.allocator 2025-09-23 04:51:17 INFO found a single proc mesh `mesh0` in slurm:///295, will allocate on it\n", - "monarch.tools.network 2025-09-23 04:51:17 INFO no AF_INET6 address that can bind TCP sockets for `slurm-h100-206-061:26600` (error: [Errno -5] No address associated with hostname)\n", - "monarch.tools.network 2025-09-23 04:51:17 INFO resolved AF_INET address `address1:26600` for `slurm-h100-206-061:26600`\n", - "monarch.tools.network 2025-09-23 04:51:17 INFO no AF_INET6 address that can bind TCP sockets for `slurm-h100-206-089:26600` (error: [Errno -5] No address associated with hostname)\n", - "monarch.tools.network 2025-09-23 04:51:17 INFO resolved AF_INET address `address2:26600` for `slurm-h100-206-089:26600`\n", - "monarch._src.actor.allocator 2025-09-23 04:51:17 INFO initializing alloc on remote allocator addresses: ['tcp!address1:26600', 'tcp!address2:26600']\n" - ] - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "\u001b[36m>>> Aggregated Logs (2025-09-23 04:51:21) >>>\u001b[0m\n", - "\u001b[33m[1 similar log lines]\u001b[0m [titan] 2025-09-23 04:51:26,853 - root - INFO - Starting job: Llama 3 debug training\n", - "\u001b[36m<<< Aggregated Logs (2025-09-23 04:51:26) <<<\u001b[0m\n", - "\n", - "\u001b[36m>>> Aggregated Logs (2025-09-23 04:51:26) >>>\u001b[0m\n", - "\u001b[33m[7 similar log lines]\u001b[0m [titan] 2025-09-23 04:51:26,853 - root - INFO - Starting job: Llama 3 debug training\n", - "\u001b[33m[8 similar log lines]\u001b[0m [titan] 2025-09-23 04:51:27,460 - root - INFO - Building 1-D device mesh with ['dp_shard'], [8]\n", - "\u001b[33m[8 similar log lines]\u001b[0m [titan] 2025-09-23 04:51:27,463 - root - INFO - [GC] Initial GC collection 0.00 seconds\n", - "\u001b[33m[8 similar log lines]\u001b[0m [titan] 2025-09-23 04:51:29,199 - root - INFO - Loading tokenizer from tokenizer.json\n", - "\u001b[33m[8 similar log lines]\u001b[0m [titan] 2025-09-23 04:51:29,201 - root - INFO - Preparing c4 dataset from allenai/c4\n", - "\u001b[36m<<< Aggregated Logs (2025-09-23 04:51:29) <<<\u001b[0m\n", - "\n", - "\u001b[36m>>> Aggregated Logs (2025-09-23 04:51:29) >>>\u001b[0m\n", - "\u001b[33m[1 similar log lines]\u001b[0m [titan] 2025-09-23 04:51:33,920 - root - INFO - Building llama3 debugmodel with TransformerModelArgs(_enforced='This field is used to enforce all fields have defaults.', dim=256, n_layers=6, n_heads=16, n_kv_heads=None, vocab_size=2000, multiple_of=256, ffn_dim_multiplier=None, norm_eps=1e-05, rope_theta=500000, max_seq_len=2048, depth_init=True, use_flex_attn=False, attn_mask_type='causal', eos_id=0)\n", - "\u001b[36m<<< Aggregated Logs (2025-09-23 04:51:33) <<<\u001b[0m\n", - "\n", - "\u001b[36m>>> Aggregated Logs (2025-09-23 04:51:33) >>>\u001b[0m\n", - "\u001b[33m[1 similar log lines]\u001b[0m [titan] 2025-09-23 04:51:33,972 - root - INFO - TensorBoard logging enabled. Logs will be saved at ./outputs/tb/20250923-0451\n", - "\u001b[33m[8 similar log lines]\u001b[0m [titan] 2025-09-23 04:51:33,973 - root - INFO - CUDA capacity: NVIDIA H100 80GB HBM3 with 79.19GiB memory\n", - "\u001b[33m[16 similar log lines]\u001b[0m [titan] 2025-09-23 04:51:33,976 - root - WARNING - Error running lspci: [Errno 2] No such file or directory: 'lspci', fallback to use device_name\n", - "\u001b[33m[8 similar log lines]\u001b[0m [titan] 2025-09-23 04:51:33,976 - root - INFO - \u001b[34mModel llama3 debugmodel \u001b[31msize: 6,139,136 total parameters\u001b[39m\n", - "\u001b[33m[8 similar log lines]\u001b[0m [titan] 2025-09-23 04:51:33,976 - root - INFO - Applied full activation checkpointing to the model\n", - "\u001b[33m[8 similar log lines]\u001b[0m [titan] 2025-09-23 04:51:33,999 - root - INFO - Applied FSDP to the model\n", - "\u001b[33m[8 similar log lines]\u001b[0m [titan] 2025-09-23 04:51:34,128 - root - INFO - Peak FLOPS used for computing MFU: 9.890e+14\n", - "\u001b[33m[8 similar log lines]\u001b[0m [titan] 2025-09-23 04:51:34,129 - root - INFO - CUDA memory usage for model: 0.00GiB(0.00%)\n", - "\u001b[33m[8 similar log lines]\u001b[0m [titan] 2025-09-23 04:51:34,129 - root - WARNING - model.safetensors.index.json not found at hf_assets_path: /mnt/home/amirafzali/monarch/examples/tokenizer/model.safetensors.index.json. Defaulting to saving a single safetensors file if checkpoint is saved in HF format\n", - "\u001b[33m[8 similar log lines]\u001b[0m [titan] 2025-09-23 04:51:34,129 - root - INFO - Mixed precision training is handled by fully_shard\n", - "\u001b[33m[8 similar log lines]\u001b[0m [titan] 2025-09-23 04:51:34,129 - root - INFO - Trainer is initialized with local batch size 8, global batch size 64, gradient accumulation steps 1, sequence length 2048, total steps 50 (warmup 2)\n", - "\u001b[33m[8 similar log lines]\u001b[0m [titan] 2025-09-23 04:51:34,129 - root - INFO - [trainer_0] initialized successfully and starting training\n", - "\u001b[33m[8 similar log lines]\u001b[0m [titan] 2025-09-23 04:51:34,129 - root - INFO - Training starts at step 1\n", - "\u001b[33m[7 similar log lines]\u001b[0m [titan] 2025-09-23 04:51:34,715 - root - INFO - Building llama3 debugmodel with TransformerModelArgs(_enforced='This field is used to enforce all fields have defaults.', dim=256, n_layers=6, n_heads=16, n_kv_heads=None, vocab_size=2000, multiple_of=256, ffn_dim_multiplier=None, norm_eps=1e-05, rope_theta=500000, max_seq_len=2048, depth_init=True, use_flex_attn=False, attn_mask_type='causal', eos_id=0)\n", - "\u001b[36m<<< Aggregated Logs (2025-09-23 04:51:36) <<<\u001b[0m\n", - "\n", - "\u001b[36m>>> Aggregated Logs (2025-09-23 04:51:36) >>>\u001b[0m\n", - "\u001b[33m[222 similar log lines]\u001b[0m [titan] 2025-09-23 04:51:37,619 - root - INFO - \u001b[31mstep: 1 \u001b[32mloss: 8.0845 \u001b[38;2;180;60;0mgrad_norm: 1.3289 \u001b[38;2;54;234;195mmemory: 0.71GiB(0.90%) \u001b[34mtps: 6,301 \u001b[36mtflops: 0.45 \u001b[35mmfu: 0.05%\u001b[39m\n", - "\u001b[33m[8 similar log lines]\u001b[0m [titan] 2025-09-23 04:51:37,619 - root - INFO - Synchronizing and adjusting timeout for all ProcessGroups to 0:01:00\n", - "\u001b[33m[178 similar log lines]\u001b[0m [titan] 2025-09-23 04:51:38,844 - root - INFO - \u001b[31mstep: 28 \u001b[32mloss: 2.8993 \u001b[38;2;180;60;0mgrad_norm: 0.2466 \u001b[38;2;54;234;195mmemory: 0.84GiB(1.06%) \u001b[34mtps: 398,483 \u001b[36mtflops: 28.50 \u001b[35mmfu: 2.88%\u001b[39m\n", - "\u001b[33m[8 similar log lines]\u001b[0m [titan] 2025-09-23 04:51:39,821 - root - INFO - [GC] Performing periodical GC collection 0.01 seconds\n", - "\u001b[33m[7 similar log lines]\u001b[0m [titan] 2025-09-23 04:51:39,866 - root - INFO - Training completed\n", - "\u001b[33m[1 similar log lines]\u001b[0m [titan] 2025-09-23 04:51:39,866 - root - INFO - Sleeping 2 seconds for other ranks to complete\n", - "\u001b[36m<<< Aggregated Logs (2025-09-23 04:51:39) <<<\u001b[0m\n", - "\n", - "\u001b[36m>>> Aggregated Logs (2025-09-23 04:51:39) >>>\u001b[0m\n", - "\u001b[33m[8 similar log lines]\u001b[0m [titan] 2025-09-23 04:51:40,045 - root - INFO - [trainer_2] trainer cleaned up\n", - "\u001b[33m[1 similar log lines]\u001b[0m [titan] 2025-09-23 04:51:41,868 - root - INFO - Training completed\n", - "\u001b[36m<<< Aggregated Logs (2025-09-23 04:51:42) <<<\u001b[0m\n", - "\n" - ] - }, - { - "name": "stderr", - "output_type": "stream", - "text": [ - "[-]E0923 04:51:44.398610 2147845 hyperactor_mesh/src/alloc/remoteprocess.rs:1088] failed to cleanup disconnected host: got channel closed event for host address1 which has no known state\n" - ] - } - ], - "source": [ - "# (c) Meta Platforms, Inc. and affiliates. Confidential and proprietary.\n", - "\n", - "async def main():\n", - " job_config = make_job_config()\n", - " proc_mesh = None\n", - "\n", - " try:\n", - " # 1. Create a proc mesh on your SLURM allocation\n", - " proc_mesh = await create_proc_mesh(RunParams.num_nodes, appdef, server_info)\n", - " # 2. Define remote logging behavior\n", - " await proc_mesh.logging_option(\n", - " stream_to_client=True\n", - " # aggregate_window_sec=None\n", - " )\n", - " # 3. Prepare trainer for torch distributed\n", - " await setup_env_for_distributed(proc_mesh)\n", - " trainer = await proc_mesh.spawn(\"trainer_actor\", TrainerActor, job_config)\n", - " # 4. Execute the taining job\n", - " await trainer.start_training.call()\n", - " except Exception as e:\n", - " logger.info(f\"Trainer failed: {e}\")\n", - " finally:\n", - " if proc_mesh:\n", - " await proc_mesh.stop()\n", - "\n", - "\n", - "if __name__ == \"__main__\":\n", - " await main()" - ] - }, - { - "cell_type": "markdown", - "id": "0e13bf71", - "metadata": {}, - "source": [ - "### 4. Destory the SLURM job\n", - "Once you're done experimenting, free up the allocation" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "9c10aa93", - "metadata": {}, - "outputs": [], - "source": [ - "# (c) Meta Platforms, Inc. and affiliates. Confidential and proprietary.\n", - "\n", - "commands.kill(f\"slurm:///{server_info.name}\")" - ] - } - ], - "metadata": { - "fileHeader": "", - "fileUid": "2fa680ca-06ba-41e3-ac40-90b22d77bbc3", - "isAdHoc": false, - "kernelspec": { - "display_name": "venv", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.10.18" - } + "cells": [ + { + "cell_type": "markdown", + "id": "1c91f6d2", + "metadata": {}, + "source": [ + "## Monarch + TorchTitan on SLURM\n", + "This example notebook demonstrates how you can easily run and iterate on a distributed training job with Monarch and TorchTitan.\n", + "\n", + "#### Prerequisites\n", + "Please make sure your environment is setup for this notebook:\n", + "1. Install Monarch nightly: https://github.com/meta-pytorch/monarch/blob/main/scripts/install_nightly.py\n", + "2. Install Titan nightly: https://github.com/pytorch/torchtitan?tab=readme-ov-file#nightly-builds\n", + "3. Ensure you have a valid Titan model config in the script directory (i.e: https://github.com/pytorch/torchtitan/blob/main/torchtitan/models/llama3/train_configs/debug_model.toml)" + ] }, - "nbformat": 4, - "nbformat_minor": 2 + { + "cell_type": "markdown", + "id": "77cd971d", + "metadata": {}, + "source": [ + "### 1. Create your SLURM job\n", + "Configure parameters for your cluster:\n", + "- num_nodes: Number of nodes to allocate (default: 2)\n", + "- gpus_per_node: Number of GPUs per node (default: 8)\n", + "- mesh_name: Name for the mesh (default: \"mesh0\")\n", + "- time_limit: Maximum job duration (default: \"06:00:00\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "85b0693f", + "metadata": {}, + "outputs": [], + "source": [ + "# (c) Meta Platforms, Inc. and affiliates. Confidential and proprietary.\n", + "\n", + "import logging\n", + "from monarch.job import SlurmJob\n", + "\n", + "# Configure logging\n", + "logging.basicConfig(\n", + " level=logging.INFO,\n", + " format=\"%(name)s %(asctime)s %(levelname)s %(message)s\",\n", + " datefmt=\"%Y-%m-%d %H:%M:%S\",\n", + " force=True,\n", + ")\n", + "logger: logging.Logger = logging.getLogger(__name__)\n", + "\n", + "# Configure job parameters\n", + "num_nodes = 2 # assign for your system\n", + "gpus_per_node = 8 # adjust for your hardware\n", + "mesh_name = \"mesh0\"\n", + "\n", + "# Create a SLURM job with N nodes\n", + "slurm_job = SlurmJob(\n", + " meshes={mesh_name: num_nodes},\n", + " job_name=\"monarch_example\",\n", + " gpus_per_node=gpus_per_node,\n", + " time_limit=\"06:00:00\",\n", + ")" + ] + }, + { + "cell_type": "markdown", + "id": "663e41ce", + "metadata": {}, + "source": [ + "### 2. Define your Titan and cluster parameters" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "39d51df7", + "metadata": {}, + "outputs": [], + "source": [ + "# (c) Meta Platforms, Inc. and affiliates. Confidential and proprietary.\n", + "\n", + "from torchtitan.train import Trainer\n", + "from torchtitan.config import ConfigManager, JobConfig\n", + "from monarch.actor import Actor, current_rank, endpoint\n", + "from torchtitan.tools.logging import init_logger, logger\n", + "import torch\n", + "from dataclasses import dataclass\n", + "import os\n", + "from monarch.utils import setup_env_for_distributed\n", + "\n", + "\n", + "@dataclass\n", + "class RunParams:\n", + " \"\"\"\n", + " Parameters for your cluster and training job, adjust as needed\n", + " \"\"\"\n", + " training_steps: int = 50\n", + " model_config = \"debug_model.toml\"\n", + " dataset = \"c4\"\n", + " num_nodes = num_nodes\n", + " gpus_per_node = gpus_per_node\n", + "\n", + "\n", + "class TrainerActor(Actor):\n", + " \"\"\"\n", + " A simple wrapper class with executes a TorchTitan trainer in a Monarch actor\n", + " \"\"\"\n", + " def __init__(self, job_config: JobConfig) -> None:\n", + " self.job_config = job_config\n", + " rank = current_rank().rank\n", + " self.uid = f\"[trainer_{rank}]\"\n", + "\n", + " @endpoint\n", + " async def start_training(self) -> None:\n", + " init_logger()\n", + " trainer: Trainer | None = None\n", + "\n", + " try:\n", + " trainer = Trainer(self.job_config)\n", + " logger.info(f\"{self.uid} initialized successfully and starting training\")\n", + " trainer.train()\n", + " except Exception:\n", + " if trainer:\n", + " trainer.close()\n", + " raise\n", + " else:\n", + " trainer.close()\n", + " finally:\n", + " torch.distributed.destroy_process_group()\n", + " logger.info(f\"{self.uid} trainer cleaned up\")\n", + "\n", + "def make_job_config() -> JobConfig:\n", + " \"\"\"\n", + " Create a job config which is digested by TorchTitan, sourced from RunParams\n", + " \"\"\"\n", + " data_parallel_shard_degree = RunParams.num_nodes * RunParams.gpus_per_node\n", + " output_path = \"./outputs\"\n", + "\n", + " script_dir = globals()['_dh'][0]\n", + " default_args = [\n", + " \"--job.config_file\",\n", + " os.path.join(script_dir, RunParams.model_config),\n", + " \"--model.tokenizer_path\",\n", + " os.path.join(script_dir, \"tokenizer\"),\n", + " \"--comm.trace_buf_size\",\n", + " \"0\",\n", + " \"--metrics.log_freq\",\n", + " \"1\",\n", + " \"--parallelism.data_parallel_shard_degree\",\n", + " str(data_parallel_shard_degree),\n", + " \"--activation_checkpoint.mode\",\n", + " \"full\",\n", + " \"--comm.train_timeout_seconds\",\n", + " \"60\",\n", + " \"--training.steps\",\n", + " str(RunParams.training_steps),\n", + " \"--training.dataset\",\n", + " RunParams.dataset,\n", + " \"--job.dump_folder\",\n", + " output_path,\n", + " \"--metrics.enable_tensorboard\",\n", + " ]\n", + "\n", + " config_manager = ConfigManager()\n", + " job_config = config_manager.parse_args(default_args)\n", + "\n", + " return job_config" + ] + }, + { + "cell_type": "markdown", + "id": "04425384", + "metadata": {}, + "source": [ + "### 3. Execute your training job\n", + "You can make adjustments and run this on the existing SLURM allocations as many times as you would like!" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "091a7066", + "metadata": {}, + "outputs": [], + "source": [ + "# (c) Meta Platforms, Inc. and affiliates. Confidential and proprietary.\n", + "\n", + "async def main():\n", + " job_config = make_job_config()\n", + "\n", + " try:\n", + " # 1. Get job state and create process mesh\n", + " job_state = slurm_job.state()\n", + " proc_mesh = job_state.mesh0.spawn_procs({\"gpus\": RunParams.gpus_per_node})\n", + " \n", + " # 2. Configure remote logging behavior\n", + " await proc_mesh.logging_option(\n", + " stream_to_client=True,\n", + " # aggregate_window_sec=None # Uncomment to disable log batching\n", + " )\n", + " \n", + " # 3. Setup environment for torch.distributed\n", + " await setup_env_for_distributed(proc_mesh)\n", + " \n", + " # 4. Spawn TrainerActor on each GPU\n", + " trainer = proc_mesh.spawn(\"trainer_actor\", TrainerActor, job_config)\n", + " \n", + " # 5. Execute the training job\n", + " await trainer.start_training.call()\n", + " \n", + " logger.info(\"Training completed successfully!\")\n", + " \n", + " except Exception as e:\n", + " logger.error(f\"Training workflow failed: {e}\")\n", + "\n", + "\n", + "if __name__ == \"__main__\":\n", + " await main()" + ] + }, + { + "cell_type": "markdown", + "id": "0e13bf71", + "metadata": {}, + "source": [ + "### 4. Cleanup the SLURM job\n", + "Once you're done experimenting, free up the allocation" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "9c10aa93", + "metadata": {}, + "outputs": [], + "source": [ + "# (c) Meta Platforms, Inc. and affiliates. Confidential and proprietary.\n", + "\n", + "# Cancel the SLURM job, releasing all reserved nodes back to the cluster\n", + "slurm_job.kill()\n", + "logger.info(\"Job terminated successfully\")" + ] + } + ], + "metadata": { + "fileHeader": "", + "fileUid": "2fa680ca-06ba-41e3-ac40-90b22d77bbc3", + "isAdHoc": false, + "kernelspec": { + "display_name": "Python (monarch)", + "language": "python", + "name": "monarch" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.12" + } + }, + "nbformat": 4, + "nbformat_minor": 2 }