Merged
37 changes: 14 additions & 23 deletions reframe/core/schedulers/__init__.py
@@ -153,7 +153,7 @@ def log(self, message, level=DEBUG2):
getlogger().log(level, f'[S] {self.registered_name}: {message}')


def filter_nodes_by_state(nodelist, state):
def filter_nodes_by_state(nodelist, state, scheduler):
'''Filter nodes by their state

:arg nodelist: List of :class:`Node` instances to filter.
@@ -178,11 +178,13 @@ def filter_nodes_by_state(nodelist, state):
allowed_states = state.split('|')
final_nodelist = set()
for s in allowed_states:
final_nodelist.update(filter_nodes_by_state(nodelist, s))
final_nodelist.update(
filter_nodes_by_state(nodelist, s, scheduler)
)

nodelist = final_nodelist
elif state == 'avail':
nodelist = {n for n in nodelist if n.is_avail()}
nodelist = {n for n in nodelist if scheduler.is_node_avail(n)}
elif state != 'all':
if state.endswith('*'):
# non-exclusive state match
@@ -618,19 +620,22 @@ def guess_num_tasks(self):
f'[F] Total available nodes: {len(available_nodes)}'
)

available_nodes = self.scheduler.filternodes(self, available_nodes)
getlogger().debug(
f'[F] Total available after scheduler filter: '
f'{len(available_nodes)}'
)

# Try to guess the number of tasks now
available_nodes = filter_nodes_by_state(
available_nodes, self.sched_flex_alloc_nodes.lower()
available_nodes,
self.sched_flex_alloc_nodes.lower(),
self.scheduler
)
getlogger().debug(
f'[F] Total available in state='
f'{self.sched_flex_alloc_nodes.lower()}: {len(available_nodes)}'
)
available_nodes = self.scheduler.filternodes(self, available_nodes)
getlogger().debug(
f'[F] Total available after scheduler filter: '
f'{len(available_nodes)}'
)
return len(available_nodes) * num_tasks_per_node

def submit(self):
Expand Down Expand Up @@ -694,17 +699,6 @@ def in_state(self, state):
:class:`False` otherwise.
'''

@abc.abstractmethod
def is_avail(self):
'''Check whether the node is available for scheduling jobs.'''

def is_down(self):
'''Check whether node is down.

This is the inverse of :func:`is_avail`.
'''
return not self.is_avail()


class AlwaysIdleNode(Node):
def __init__(self, name):
@@ -715,9 +709,6 @@ def __init__(self, name):
def name(self):
return self._name

def is_avail(self):
return True

def in_statex(self, state):
return state.lower() == self._state

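For reference, a minimal sketch of what the reworked `avail` branch of `filter_nodes_by_state()` now reduces to: availability is asked of the scheduler rather than of the node itself. The `FakeNode`/`FakeScheduler` stand-ins and node names below are illustrative assumptions, not part of this change.

```python
# Minimal sketch: availability is decided by the scheduler, not the node.
class FakeNode:
    def __init__(self, name, states):
        self.name = name
        self.states = set(states)


class FakeScheduler:
    # Mirrors the SlurmJobScheduler.is_node_avail() hook introduced here.
    node_available_states = {'ALLOCATED', 'COMPLETING', 'IDLE',
                             'PLANNED', 'RESERVED'}

    def is_node_avail(self, node):
        return node.states <= self.node_available_states


nodes = [FakeNode('nid001', {'IDLE'}),
         FakeNode('nid002', {'DOWN', 'DRAIN'})]
sched = FakeScheduler()

# What filter_nodes_by_state(nodes, 'avail', sched) now boils down to:
avail = {n for n in nodes if sched.is_node_avail(n)}
assert {n.name for n in avail} == {'nid001'}
```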
36 changes: 22 additions & 14 deletions reframe/core/schedulers/slurm.py
@@ -147,6 +147,13 @@ def __init__(self):
self._sched_access_in_submit = self.get_option(
'sched_access_in_submit'
)
self.node_available_states = {
'ALLOCATED',
'COMPLETING',
'IDLE',
'PLANNED',
'RESERVED'
}

def make_job(self, *args, **kwargs):
return _SlurmJob(*args, **kwargs)
@@ -436,6 +443,14 @@ def _get_reservation_nodes(self, reservation):
raise JobSchedulerError("could not extract the node names for "
"reservation '%s'" % reservation)

flags_match = re.search(r'Flags=(\S+)', completed.stdout)
if flags_match:
if 'MAINT' in flags_match.group(1).split(','):
self.node_available_states.add('MAINTENANCE')
else:
self.log(f"could not extract the reservation flags for "
f"reservation '{reservation}'")

completed = _run_strict('scontrol -a show -o %s' % reservation_nodes)
node_descriptions = completed.stdout.splitlines()
return _create_nodes(node_descriptions)
@@ -594,7 +609,7 @@ def _do_cancel_if_blocked(self, job, reason_descr):
self.log(f'Checking if nodes {node_names!r} '
f'are indeed unavailable')
nodes = self._get_nodes_by_name(node_names)
if not any(n.is_down() for n in nodes):
if not any(self.is_node_down(n) for n in nodes):
return

self.cancel(job)
@@ -630,6 +645,12 @@ def cancel(self, job):
def finished(self, job):
return slurm_state_completed(job.state)

def is_node_avail(self, node):
return node.states <= self.node_available_states

def is_node_down(self, node):
return not self.is_node_avail(node)


@register_scheduler('squeue')
class SqueueJobScheduler(SlurmJobScheduler):
@@ -734,19 +755,6 @@ def in_state(self, state):
def in_statex(self, state):
return self._states == set(state.upper().split('+'))

def is_avail(self):
available_states = {
'ALLOCATED',
'COMPLETING',
'IDLE',
'PLANNED',
'RESERVED'
}
return self._states <= available_states

def is_down(self):
return not self.is_avail()

def satisfies(self, slurm_constraint):
# Convert the Slurm constraint to a Python expression and evaluate it,
# but restrict our syntax to accept only AND or OR constraints and
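The reservation handling above also widens the set of acceptable node states when the reservation carries the MAINT flag. Below is a small standalone sketch of that logic; the sample `scontrol` output line is made up for illustration and abbreviated.

```python
import re

# Assumed, abbreviated sample of 'scontrol -a show res <name>' output.
scontrol_out = ('ReservationName=maint_res Nodes=nid[001-004] '
                'Flags=MAINT,SPEC_NODES')

node_available_states = {'ALLOCATED', 'COMPLETING', 'IDLE',
                         'PLANNED', 'RESERVED'}

# Same pattern as the diff: if the reservation is flagged MAINT, nodes in
# the MAINTENANCE state are still treated as available for flexible tests.
flags_match = re.search(r'Flags=(\S+)', scontrol_out)
if flags_match and 'MAINT' in flags_match.group(1).split(','):
    node_available_states.add('MAINTENANCE')

print(node_available_states)  # now also contains 'MAINTENANCE'
```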
6 changes: 5 additions & 1 deletion reframe/frontend/testgenerators.py
@@ -38,7 +38,11 @@ def getallnodes(state, jobs_cli_options=None):
f'Total available nodes for {part.name}: {len(available_nodes)}'
)

available_nodes = filter_nodes_by_state(available_nodes, state)
available_nodes = filter_nodes_by_state(
available_nodes,
state,
part.scheduler
)
nodes[part.fullname] = [n.name for n in available_nodes]

return nodes
11 changes: 7 additions & 4 deletions unittests/test_schedulers.py
@@ -41,6 +41,8 @@ def slurm_only(scheduler):
if scheduler.registered_name not in ('slurm', 'squeue'):
pytest.skip('test is relevant only for Slurm backends')

return scheduler


@pytest.fixture
def local_only(scheduler):
@@ -1455,7 +1457,8 @@ def test_slurm_node_in_state(slurm_node_allocated,

def test_slurm_node_is_down(slurm_node_allocated,
slurm_node_idle,
slurm_node_nopart):
assert not slurm_node_allocated.is_down()
assert not slurm_node_idle.is_down()
assert slurm_node_nopart.is_down()
slurm_node_nopart,
slurm_only):
assert not slurm_only().is_node_down(slurm_node_allocated)
assert not slurm_only().is_node_down(slurm_node_idle)
assert slurm_only().is_node_down(slurm_node_nopart)
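The test change relies on the `slurm_only` fixture now returning the scheduler it validated, so the test can instantiate it and call the new `is_node_down()` hook. A compressed sketch of that pattern, using an assumed stand-in scheduler class instead of ReFrame's registry:

```python
import pytest


class SlurmJobScheduler:
    # Assumed stand-in for the real scheduler class.
    registered_name = 'slurm'
    node_available_states = {'ALLOCATED', 'COMPLETING', 'IDLE',
                             'PLANNED', 'RESERVED'}

    def is_node_avail(self, node):
        return node.states <= self.node_available_states

    def is_node_down(self, node):
        return not self.is_node_avail(node)


@pytest.fixture
def slurm_only():
    scheduler = SlurmJobScheduler
    if scheduler.registered_name not in ('slurm', 'squeue'):
        pytest.skip('test is relevant only for Slurm backends')

    return scheduler


def test_slurm_node_is_down(slurm_only):
    class _Node:
        states = {'DOWN'}

    assert slurm_only().is_node_down(_Node())
```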