Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion docs/config_reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,23 @@ System Partition Configuration
List of hosts in a partition that uses the ``ssh`` scheduler.


.. py:attribute:: systems.partitions.sched_options.slurm_multi_cluster_mode

:required: No
:default: ``[]``

List of Slurm clusters to poll for submitted jobs.

If empty, only the local cluster is considered.
If it contains the single value ``all``, all available clusters will be considered.
This is translated directly to Slurm's ``-M`` option passed to the ``sacct`` or ``squeue`` commands.
If set, the ``-M`` option will also be passed in the partition's :attr:`~config.systems.partitions.access` options.

This option is relevant only for the Slurm backends.

.. versionadded:: 4.10


.. py:attribute:: systems.partitions.sched_options.ignore_reqnodenotavail

:required: No
Expand Down Expand Up @@ -1647,7 +1664,7 @@ The additional properties for the ``httpjson`` handler are the following:
These may depend on the server configuration.

.. note::
If you specify an authorization header here, it will be evaluated at the start of the test session and potentially expire.
If you specify an authorization header here, it will be evaluated at the start of the test session and potentially expire.
Consider using the :attr:`~config.logging.handlers_perflog..httpjson..authorization_header` parameter instead for dynamic authorization headers.

.. versionadded:: 4.2
Expand Down
6 changes: 3 additions & 3 deletions reframe/core/schedulers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -370,13 +370,13 @@ class Job(jsonext.JSONSerializable, metaclass=JobMeta):

# The sched_* arguments are exposed also to the frontend
def __init__(self,
name,
name, *,
workdir='.',
script_filename=None,
stdout=None,
stderr=None,
sched_flex_alloc_nodes=None,
sched_access=[],
sched_access=None,
sched_options=None):

self._cli_options = list(sched_options) if sched_options else []
Expand All @@ -390,7 +390,7 @@ def __init__(self,

# Backend scheduler related information
self._sched_flex_alloc_nodes = sched_flex_alloc_nodes
self._sched_access = sched_access
self._sched_access = list(sched_access) if sched_access else []

# Live job information; to be filled during job's lifetime by the
# scheduler
Expand Down
24 changes: 21 additions & 3 deletions reframe/core/schedulers/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ def __init__(self):
self._sched_access_in_submit = self.get_option(
'sched_access_in_submit'
)
self._multi_clusters = self.get_option('slurm_multi_cluster_mode')
self._available_states = {
'ALLOCATED',
'COMPLETING',
Expand All @@ -155,7 +156,22 @@ def __init__(self):
'RESERVED'
}

# Define the base sacct and squeue commands to account for Slurm's
# multiple cluster mode if enabled
self._sacct = 'sacct'
self._squeue = 'squeue'
if self._multi_clusters:
clusters = ','.join(self._multi_clusters)
self._sacct += f' -M {clusters}'
self._squeue += f' -M {clusters}'

def make_job(self, *args, **kwargs):
    """Create a Slurm job, injecting the multi-cluster option if needed.

    If ``slurm_multi_cluster_mode`` is set, a ``-M <clusters>`` option is
    appended to the job's ``sched_access`` options, so that the job is
    submitted to the configured clusters.

    :returns: a new :class:`_SlurmJob` built from ``*args``/``**kwargs``.
    """
    if self._multi_clusters:
        # Copy the caller's list before extending it: using `+=` on the
        # list retrieved from `kwargs` would mutate the caller's
        # `sched_access` argument in place, leaking the injected `-M`
        # option back to the caller.
        sched_access = list(kwargs.get('sched_access') or [])
        sched_access.append(f'-M {",".join(self._multi_clusters)}')
        kwargs['sched_access'] = sched_access

    return _SlurmJob(*args, **kwargs)

def _format_option(self, var, option):
Expand Down Expand Up @@ -491,7 +507,7 @@ def poll(self, *jobs):
)
try:
completed = _run_strict(
f'sacct -S {t_start} -P '
f'{self._sacct} -S {t_start} -P '
f'-j {",".join(job.jobid for job in jobs)} '
f'-o jobid,state,exitcode,end,nodelist'
)
Expand Down Expand Up @@ -570,7 +586,9 @@ def _cancel_if_blocked(self, job, reasons=None):
return

if not reasons:
completed = osext.run_command('squeue -h -j %s -o %%r' % job.jobid)
completed = osext.run_command(
f'{self._squeue} -h -j {job.jobid} -o %r'
)
reasons = completed.stdout.splitlines()
if not reasons:
# Can't retrieve job's state. Perhaps it has finished already
Expand Down Expand Up @@ -677,7 +695,7 @@ def poll(self, *jobs):
# finished already, squeue might return an error about an invalid
# job id.
completed = osext.run_command(
f'squeue -h -j {",".join(job.jobid for job in jobs)} '
f'{self._squeue} -h -j {",".join(job.jobid for job in jobs)} '
f'-o "%%i|%%T|%%N|%%r"'
)

Expand Down
5 changes: 5 additions & 0 deletions reframe/schemas/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@
"type": "array",
"items": {"type": "string"}
},
"slurm_multi_cluster_mode": {
"type": "array",
"items": {"type": "string"}
},
"sched_access_in_submit": {"type": "boolean"},
"unqualified_hostnames": {"type": "boolean"},
"use_nodes_option": {"type": "boolean"}
Expand Down Expand Up @@ -708,6 +712,7 @@
"systems*/sched_options/job_submit_timeout": 60,
"systems*/sched_options/max_sacct_failures": 3,
"systems*/sched_options/sched_access_in_submit": false,
"systems*/sched_options/slurm_multi_cluster_mode": [],
"systems*/sched_options/ssh_hosts": [],
"systems*/sched_options/resubmit_on_errors": [],
"systems*/sched_options/unqualified_hostnames": false,
Expand Down
31 changes: 26 additions & 5 deletions unittests/test_schedulers.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,25 @@ def exec_ctx(make_exec_ctx, scheduler):

@pytest.fixture
def make_job(scheduler, launcher, tmp_path):
def _make_job(sched_opts=None, **jobargs):
def _make_job(sched_opts=None, config_opts=None, **jobargs):
sched_type = scheduler

class PatchedScheduler(sched_type):
def get_option(self, name):
try:
return config_opts[name]
except KeyError:
return super().get_option(name)

if config_opts:
sched_type = PatchedScheduler

if sched_opts:
sched = scheduler(**sched_opts)
elif scheduler.registered_name == 'ssh':
sched = scheduler(hosts=['localhost'])
sched = sched_type(**sched_opts)
elif sched_type.registered_name == 'ssh':
sched = sched_type(hosts=['localhost'])
else:
sched = scheduler()
sched = sched_type()

return Job.create(
sched, launcher(),
Expand Down Expand Up @@ -487,6 +499,15 @@ def test_prepare_nodes_option_minimal(make_exec_ctx, make_job, slurm_only):
assert re.search(r'--nodes=16', fp.read()) is not None


def test_prepare_multi_cluster(make_job, slurm_only):
    """The generated job script must carry the ``-M`` option when
    ``slurm_multi_cluster_mode`` is configured."""
    job = make_job(
        config_opts={'slurm_multi_cluster_mode': ['cluster1', 'cluster2']}
    )
    prepare_job(job)
    with open(job.script_filename) as script:
        contents = script.read()

    assert re.search(r'-M cluster1,cluster2', contents) is not None


def test_submit(make_job, exec_ctx):
minimal_job = make_job(sched_access=exec_ctx.access)
prepare_job(minimal_job)
Expand Down
Loading