
Commit c6c7ba9

Refactor multi-node running command into dedicated functions (#6623)
Fixes #6567.

### Description

The PR aims to refactor the multi-node command preparation and running for Auto3DSeg. In the initial draft, I assume the functions are only for Auto3DSeg and they are internal, so I put them in the Auto3DSeg `utils`. I am open to changes if we think the usage can be more general.

Some details: to address the three variations of commands used in Auto3DSeg, namely

1. `python script.py <options>`
2. `torchrun <specs> script.py <options>`
3. `bcprun <specs> -c python script.py <options>`

the passing of `<options>` and `<specs>` is split into different stages:

- `<options>` is handled in the preparation stage, e.g. `create_cmd` in Auto3DSeg;
- `<specs>` relates to device configuration, so it is handled in the launch stage, e.g. `run_cmd` in Auto3DSeg.

Each variation has its own version of preparation and launching.

### Types of changes

- [x] Non-breaking change (fix or new feature that would not break existing functionality).
- [ ] Breaking change (fix or new feature that would cause existing functionality to change).
- [ ] New tests added to cover the changes.
- [ ] Integration tests passed locally by running `./runtests.sh -f -u --net --coverage`.
- [ ] Quick tests passed locally by running `./runtests.sh --quick --unittests --disttests`.
- [ ] In-line docstrings updated.
- [ ] Documentation updated, tested `make html` command in the `docs/` folder.

---------

Signed-off-by: Mingxin <18563433+mingxin-zheng@users.noreply.github.com>
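To make the `<options>`/`<specs>` split concrete, here is a minimal usage sketch of the new helpers (illustrative only: the script name, config files, and node/GPU counts are placeholders, not values taken from this PR):

```python
from monai.auto3dseg.utils import (
    _prepare_cmd_bcprun,
    _prepare_cmd_default,
    _prepare_cmd_torchrun,
    _run_cmd_bcprun,
    _run_cmd_torchrun,
)
from monai.utils import run_cmd

options = {"config_file": ["a.yaml", "b.yaml"]}  # <options>, attached at preparation time

# 1. python script.py <options>
cmd = _prepare_cmd_default("script.py", **options)
run_cmd(cmd.split(), check=True)

# 2. torchrun <specs> script.py <options>  (the <specs> are supplied at launch time)
cmd = _prepare_cmd_torchrun("script.py", **options)
_run_cmd_torchrun(cmd, nnodes=1, nproc_per_node=8, check=True)

# 3. bcprun <specs> -c python script.py <options>
cmd = _prepare_cmd_bcprun("script.py", cmd_prefix="python", **options)
_run_cmd_bcprun(cmd, n=2, p=8)
```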
1 parent 922b11e commit c6c7ba9

File tree

3 files changed: +226 -77 lines changed

- monai/apps/auto3dseg/bundle_gen.py
- monai/apps/auto3dseg/ensemble_builder.py
- monai/auto3dseg/utils.py


monai/apps/auto3dseg/bundle_gen.py

Lines changed: 62 additions & 54 deletions
@@ -13,12 +13,12 @@

 import importlib
 import os
+import re
 import shutil
 import subprocess
 import sys
 import time
 import warnings
-from collections.abc import Mapping
 from copy import deepcopy
 from pathlib import Path
 from tempfile import TemporaryDirectory
@@ -30,10 +30,17 @@
 from monai.apps import download_and_extract
 from monai.apps.utils import get_logger
 from monai.auto3dseg.algo_gen import Algo, AlgoGen
-from monai.auto3dseg.utils import algo_to_pickle
+from monai.auto3dseg.utils import (
+    _prepare_cmd_bcprun,
+    _prepare_cmd_default,
+    _prepare_cmd_torchrun,
+    _run_cmd_bcprun,
+    _run_cmd_torchrun,
+    algo_to_pickle,
+)
 from monai.bundle.config_parser import ConfigParser
 from monai.config import PathLike
-from monai.utils import ensure_tuple, run_cmd
+from monai.utils import ensure_tuple, look_up_option, run_cmd
 from monai.utils.enums import AlgoKeys

 logger = get_logger(module_name=__name__)
@@ -88,7 +95,7 @@ def __init__(self, template_path: PathLike):
             "n_devices": int(torch.cuda.device_count()),
             "NUM_NODES": int(os.environ.get("NUM_NODES", 1)),
             "MN_START_METHOD": os.environ.get("MN_START_METHOD", "bcprun"),
-            "CMD_PREFIX": os.environ.get("CMD_PREFIX"),  # type: ignore
+            "CMD_PREFIX": os.environ.get("CMD_PREFIX", ""),  # type: ignore
         }

     def pre_check_skip_algo(self, skip_bundlegen: bool = False, skip_info: str = "") -> tuple[bool, str]:
@@ -175,36 +182,45 @@ def _create_cmd(self, train_params: None | dict = None) -> tuple[str, str]:
         train_py = os.path.join(self.output_path, "scripts", "train.py")
         config_dir = os.path.join(self.output_path, "configs")

+        config_files = []
         if os.path.isdir(config_dir):
-            base_cmd = ""
             for file in sorted(os.listdir(config_dir)):
-                if not (file.endswith("yaml") or file.endswith("json")):
-                    continue
-                base_cmd += f"{train_py} run --config_file=" if len(base_cmd) == 0 else ","
-                # Python Fire may be confused by single-quoted WindowsPath
-                config_yaml = Path(os.path.join(config_dir, file)).as_posix()
-                base_cmd += f"'{config_yaml}'"
-        cmd: str | None = self.device_setting["CMD_PREFIX"]  # type: ignore
-        # make sure cmd end with a space
-        if cmd is not None and not cmd.endswith(" "):
-            cmd += " "
-        if (int(self.device_setting["NUM_NODES"]) > 1 and self.device_setting["MN_START_METHOD"] == "bcprun") or (
-            int(self.device_setting["NUM_NODES"]) <= 1 and int(self.device_setting["n_devices"]) <= 1
-        ):
-            cmd = "python " if cmd is None else cmd
-        elif int(self.device_setting["NUM_NODES"]) > 1:
-            raise NotImplementedError(
-                f"{self.device_setting['MN_START_METHOD']} is not supported yet."
-                "Try modify BundleAlgo._create_cmd for your cluster."
+                if file.endswith("yaml") or file.endswith("json"):
+                    # Python Fire may be confused by single-quoted WindowsPath
+                    config_files.append(Path(os.path.join(config_dir, file)).as_posix())
+
+        if int(self.device_setting["NUM_NODES"]) > 1:
+            # multi-node command
+            # only bcprun is supported for now
+            try:
+                look_up_option(self.device_setting["MN_START_METHOD"], ["bcprun"])
+            except ValueError as err:
+                raise NotImplementedError(
+                    f"{self.device_setting['MN_START_METHOD']} is not supported yet."
+                    "Try modify BundleAlgo._create_cmd for your cluster."
+                ) from err
+
+            return (
+                _prepare_cmd_bcprun(
+                    f"{train_py} run",
+                    cmd_prefix=f"{self.device_setting['CMD_PREFIX']}",
+                    config_file=config_files,
+                    **params,
+                ),
+                "",
             )
+        elif int(self.device_setting["n_devices"]) > 1:
+            return _prepare_cmd_torchrun(f"{train_py} run", config_file=config_files, **params), ""
         else:
-            if cmd is None:
-                cmd = f"torchrun --nnodes={1:d} --nproc_per_node={self.device_setting['n_devices']:d} "
-            cmd += base_cmd
-            if params and isinstance(params, Mapping):
-                for k, v in params.items():
-                    cmd += f" --{k}={v}"
-            return cmd, ""
+            return (
+                _prepare_cmd_default(
+                    f"{train_py} run",
+                    cmd_prefix=f"{self.device_setting['CMD_PREFIX']}",
+                    config_file=config_files,
+                    **params,
+                ),
+                "",
+            )

     def _run_cmd(self, cmd: str, devices_info: str = "") -> subprocess.CompletedProcess:
         """
@@ -216,34 +232,26 @@ def _run_cmd(self, cmd: str, devices_info: str = "") -> subprocess.CompletedProcess:

         ps_environ = os.environ.copy()
         ps_environ["CUDA_VISIBLE_DEVICES"] = str(self.device_setting["CUDA_VISIBLE_DEVICES"])
+
+        # delete pattern "VAR=VALUE" at the beginning of the string, with optional leading/trailing whitespaces
+        cmd = re.sub(r"^\s*\w+=.*?\s+", "", cmd)
+
         if int(self.device_setting["NUM_NODES"]) > 1:
-            if self.device_setting["MN_START_METHOD"] == "bcprun":
-                cmd_list = [
-                    "bcprun",
-                    "-n",
-                    str(self.device_setting["NUM_NODES"]),
-                    "-p",
-                    str(self.device_setting["n_devices"]),
-                    "-c",
-                    cmd,
-                ]
-            else:
+            try:
+                look_up_option(self.device_setting["MN_START_METHOD"], ["bcprun"])
+            except ValueError as err:
                 raise NotImplementedError(
-                    f"{self.device_setting['MN_START_METHOD']} is not supported yet. "
+                    f"{self.device_setting['MN_START_METHOD']} is not supported yet."
                     "Try modify BundleAlgo._run_cmd for your cluster."
-                )
-        else:
-            cmd_list = cmd.split()
-
-            _idx = 0
-            for _idx, c in enumerate(cmd_list):
-                if "=" not in c:  # remove variable assignments before the command such as "OMP_NUM_THREADS=1"
-                    break
-            cmd_list = cmd_list[_idx:]
+                ) from err

-        logger.info(f"Launching: {' '.join(cmd_list)}")
-
-        return run_cmd(cmd_list, env=ps_environ, check=True)
+            return _run_cmd_bcprun(cmd, n=self.device_setting["NUM_NODES"], p=self.device_setting["n_devices"])
+        elif int(self.device_setting["n_devices"]) > 1:
+            return _run_cmd_torchrun(
+                cmd, nnodes=1, nproc_per_node=self.device_setting["n_devices"], env=ps_environ, check=True
+            )
+        else:
+            return run_cmd(cmd.split(), env=ps_environ, check=True)

     def train(
         self, train_params: None | dict = None, device_setting: None | dict = None
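Note that `_run_cmd` now strips a leading environment-variable assignment from the prepared command with a regular expression instead of filtering the split token list. A quick check of that substitution (the command string here is a made-up example):

```python
import re

cmd = "OMP_NUM_THREADS=1 python train.py run --config_file 'a.yaml'"
cmd = re.sub(r"^\s*\w+=.*?\s+", "", cmd)  # same pattern as in the new _run_cmd
print(cmd)  # -> python train.py run --config_file 'a.yaml'
```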

monai/apps/auto3dseg/ensemble_builder.py

Lines changed: 14 additions & 22 deletions
@@ -26,13 +26,19 @@
 from monai.apps.auto3dseg.utils import get_name_from_algo_id, import_bundle_algo_history
 from monai.apps.utils import get_logger
 from monai.auto3dseg import concat_val_to_np
-from monai.auto3dseg.utils import datafold_read
+from monai.auto3dseg.utils import (
+    _prepare_cmd_bcprun,
+    _prepare_cmd_torchrun,
+    _run_cmd_bcprun,
+    _run_cmd_torchrun,
+    datafold_read,
+)
 from monai.bundle import ConfigParser
 from monai.data import partition_dataset
 from monai.transforms import MeanEnsemble, SaveImage, VoteEnsemble
 from monai.utils import RankFilter, deprecated_arg
 from monai.utils.enums import AlgoKeys
-from monai.utils.misc import check_kwargs_exist_in_class_init, prob2class, run_cmd
+from monai.utils.misc import check_kwargs_exist_in_class_init, prob2class
 from monai.utils.module import look_up_option, optional_import

 tqdm, has_tqdm = optional_import("tqdm", name="tqdm")
@@ -642,34 +648,20 @@ def _create_cmd(self) -> None:
         # define env for subprocess
         ps_environ = os.environ.copy()
         ps_environ["CUDA_VISIBLE_DEVICES"] = str(self.device_setting["CUDA_VISIBLE_DEVICES"])
-        cmd: str | None = self.device_setting["CMD_PREFIX"]  # type: ignore
-        if cmd is not None and not str(cmd).endswith(" "):
-            cmd += " "
         if int(self.device_setting["NUM_NODES"]) > 1:
             if self.device_setting["MN_START_METHOD"] != "bcprun":
                 raise NotImplementedError(
                     f"{self.device_setting['MN_START_METHOD']} is not supported yet. "
                     "Try modify EnsembleRunner._create_cmd for your cluster."
                 )
             logger.info(f"Ensembling on {self.device_setting['NUM_NODES']} nodes!")
-            cmd = "python " if cmd is None else cmd
-            cmd = f"{cmd} -m {base_cmd}"
-            cmd_list = [
-                "bcprun",
-                "-n",
-                str(self.device_setting["NUM_NODES"]),
-                "-p",
-                str(self.device_setting["n_devices"]),
-                "-c",
-                cmd,
-            ]
+            cmd = _prepare_cmd_bcprun("-m " + base_cmd, cmd_prefix=f"{self.device_setting['CMD_PREFIX']}")
+            _run_cmd_bcprun(cmd, n=self.device_setting["NUM_NODES"], p=self.device_setting["n_devices"])

         else:
             logger.info(f"Ensembling using {self.device_setting['n_devices']} GPU!")
-            if cmd is None:
-                cmd = f"torchrun --nnodes={1:d} --nproc_per_node={self.device_setting['n_devices']:d} "
-            cmd = f"{cmd} -m {base_cmd}"
-            cmd_list = cmd.split()
-
-            run_cmd(cmd_list, env=ps_environ, check=True)
+            cmd = _prepare_cmd_torchrun("-m " + base_cmd)
+            _run_cmd_torchrun(
+                cmd, nnodes=1, nproc_per_node=self.device_setting["n_devices"], env=ps_environ, check=True
+            )
         return
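Under the new helpers, and assuming `CMD_PREFIX` is empty, the multi-node ensemble branch composes a command roughly as follows (a sketch; `scripts.ensemble run` is a placeholder for the actual `base_cmd`, which is defined earlier in this file and not shown in the diff):

```python
from monai.auto3dseg.utils import _prepare_cmd_bcprun, _run_cmd_bcprun

base_cmd = "scripts.ensemble run"  # placeholder for the module invocation built by EnsembleRunner
cmd = _prepare_cmd_bcprun("-m " + base_cmd, cmd_prefix="")
# an empty prefix falls back to "python", so cmd == "python -m scripts.ensemble run"
_run_cmd_bcprun(cmd, n=2, p=8)
# executes: bcprun -n 2 -p 8 -c "python -m scripts.ensemble run"
```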

monai/auto3dseg/utils.py

Lines changed: 150 additions & 1 deletion
@@ -14,6 +14,7 @@
 import logging
 import os
 import pickle
+import subprocess
 import sys
 from copy import deepcopy
 from numbers import Number
@@ -28,7 +29,7 @@
 from monai.config import PathLike
 from monai.data.meta_tensor import MetaTensor
 from monai.transforms import CropForeground, ToCupy
-from monai.utils import min_version, optional_import
+from monai.utils import min_version, optional_import, run_cmd

 __all__ = [
     "get_foreground_image",
@@ -372,3 +373,151 @@ def algo_from_pickle(pkl_filename: str, template_path: PathLike | None = None, *
         algo_meta_data.update({k: v})

     return algo, algo_meta_data
+
+
+def list_to_python_fire_arg_str(args: list) -> str:
+    """
+    Convert a list of arguments to a string that can be used in python-fire.
+
+    Args:
+        args: the list of arguments.
+
+    Returns:
+        the string that can be used in python-fire.
+    """
+    args_str = ",".join([str(arg) for arg in args])
+    return f"'{args_str}'"
+
+
+def check_and_set_optional_args(params: dict) -> str:
+    """ """
+    cmd_mod_opt = ""
+    for k, v in params.items():
+        if isinstance(v, dict):
+            raise ValueError("Nested dict is not supported.")
+        elif isinstance(v, list):
+            v = list_to_python_fire_arg_str(v)
+        cmd_mod_opt += f" --{k} {str(v)}"
+    return cmd_mod_opt
+
+
+def _prepare_cmd_default(cmd: str, cmd_prefix: str | None = None, **kwargs: Any) -> str:
+    """
+    Prepare the command for subprocess to run the script with the given arguments.
+
+    Args:
+        cmd: the command or script to run in the distributed job.
+        cmd_prefix: the command prefix to run the script, e.g., "python", "python -m", "python3", "/opt/conda/bin/python3.8 ".
+        kwargs: the keyword arguments to be passed to the script.
+
+    Returns:
+        the command to run with ``subprocess``.
+
+    Examples:
+        To prepare a subprocess command
+        "python train.py run -k --config 'a,b'", the function can be called as
+        - _prepare_cmd_default("train.py run -k", config=['a','b'])
+        - _prepare_cmd_default("train.py run -k --config 'a,b'")
+
+    """
+    params = kwargs.copy()
+
+    cmd_prefix = cmd_prefix or "python"
+
+    if not cmd_prefix.endswith(" "):
+        cmd_prefix += " "  # ensure a space after the command prefix so that the script can be appended
+
+    return cmd_prefix + cmd + check_and_set_optional_args(params)
+
+
+def _prepare_cmd_torchrun(cmd: str, **kwargs: Any) -> str:
+    """
+    Prepare the command for multi-gpu/multi-node job execution using torchrun.
+
+    Args:
+        cmd: the command or script to run in the distributed job.
+        kwargs: the keyword arguments to be passed to the script.
+
+    Returns:
+        the command to append to ``torchrun``
+
+    Examples:
+        For command "torchrun --nnodes=1 --nproc_per_node=8 train.py run -k --config 'a,b'",
+        it only prepares command after the torchrun arguments, i.e., "train.py run -k --config 'a,b'".
+        The function can be called as
+        - _prepare_cmd_torchrun("train.py run -k", config=['a','b'])
+        - _prepare_cmd_torchrun("train.py run -k --config 'a,b'")
+    """
+    params = kwargs.copy()
+    return cmd + check_and_set_optional_args(params)
+
+
+def _prepare_cmd_bcprun(cmd: str, cmd_prefix: str | None = None, **kwargs: Any) -> str:
+    """
+    Prepare the command for distributed job running using bcprun.
+
+    Args:
+        script: the script to run in the distributed job.
+        cmd_prefix: the command prefix to run the script, e.g., "python".
+        kwargs: the keyword arguments to be passed to the script.
+
+    Returns:
+        The command to run the script in the distributed job.
+
+    Examples:
+        For command "bcprun -n 2 -p 8 -c python train.py run -k --config 'a,b'",
+        it only prepares command after the bcprun arguments, i.e., "train.py run -k --config 'a,b'".
+        the function can be called as
+        - _prepare_cmd_bcprun("train.py run -k", config=['a','b'], n=2, p=8)
+        - _prepare_cmd_bcprun("train.py run -k --config 'a,b'", n=2, p=8)
+    """
+
+    return _prepare_cmd_default(cmd, cmd_prefix=cmd_prefix, **kwargs)
+
+
+def _run_cmd_torchrun(cmd: str, **kwargs: Any) -> subprocess.CompletedProcess:
+    """
+    Run the command with torchrun.
+
+    Args:
+        cmd: the command to run. Typically it is prepared by ``_prepare_cmd_torchrun``.
+        kwargs: the keyword arguments to be passed to the ``torchrun``.
+
+    Return:
+        the return code of the subprocess command.
+    """
+    params = kwargs.copy()
+
+    cmd_list = cmd.split()
+
+    # append arguments to the command list
+    torchrun_list = ["torchrun"]
+    required_args = ["nnodes", "nproc_per_node"]
+    for arg in required_args:
+        if arg not in params:
+            raise ValueError(f"Missing required argument {arg} for torchrun.")
+        torchrun_list += [f"--{arg}", str(params.pop(arg))]
+    torchrun_list += cmd_list
+    return run_cmd(torchrun_list, **params)
+
+
+def _run_cmd_bcprun(cmd: str, **kwargs: Any) -> subprocess.CompletedProcess:
+    """
+    Run the command with bcprun.
+
+    Args:
+        cmd: the command to run. Typically it is prepared by ``_prepare_cmd_bcprun``.
+        kwargs: the keyword arguments to be passed to the ``bcprun``.
+
+    Returns:
+        the return code of the subprocess command.
+    """
+    params = kwargs.copy()
+    cmd_list = ["bcprun"]
+    required_args = ["n", "p"]
+    for arg in required_args:
+        if arg not in params:
+            raise ValueError(f"Missing required argument {arg} for bcprun.")
+        cmd_list += [f"-{arg}", str(params.pop(arg))]
+    cmd_list.extend(["-c", cmd])
+    return run_cmd(cmd_list, **params)
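For reference, the strings these helpers produce look roughly as follows (expected values worked out from the definitions above, not output captured from a test run):

```python
from monai.auto3dseg.utils import (
    _prepare_cmd_bcprun,
    _prepare_cmd_default,
    _prepare_cmd_torchrun,
    check_and_set_optional_args,
    list_to_python_fire_arg_str,
)

list_to_python_fire_arg_str(["a.yaml", "b.yaml"])
# "'a.yaml,b.yaml'"

check_and_set_optional_args({"fold": 0, "config_file": ["a.yaml", "b.yaml"]})
# " --fold 0 --config_file 'a.yaml,b.yaml'"

_prepare_cmd_default("train.py run", config_file=["a.yaml", "b.yaml"])
# "python train.py run --config_file 'a.yaml,b.yaml'"

_prepare_cmd_torchrun("train.py run", config_file=["a.yaml", "b.yaml"])
# "train.py run --config_file 'a.yaml,b.yaml'"  (torchrun and its <specs> are added by _run_cmd_torchrun)

_prepare_cmd_bcprun("train.py run", cmd_prefix="python", config_file=["a.yaml", "b.yaml"])
# "python train.py run --config_file 'a.yaml,b.yaml'"  (bcprun -n/-p/-c are added by _run_cmd_bcprun)
```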
