Skip to content
37 changes: 14 additions & 23 deletions reframe/core/schedulers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ def log(self, message, level=DEBUG2):
getlogger().log(level, f'[S] {self.registered_name}: {message}')


def filter_nodes_by_state(nodelist, state):
def filter_nodes_by_state(nodelist, state, scheduler):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if made this a scheduler's method?

'''Filter nodes by their state

:arg nodelist: List of :class:`Node` instances to filter.
Expand All @@ -178,11 +178,13 @@ def filter_nodes_by_state(nodelist, state):
allowed_states = state.split('|')
final_nodelist = set()
for s in allowed_states:
final_nodelist.update(filter_nodes_by_state(nodelist, s))
final_nodelist.update(
filter_nodes_by_state(nodelist, s, scheduler)
)

nodelist = final_nodelist
elif state == 'avail':
nodelist = {n for n in nodelist if n.is_avail()}
nodelist = {n for n in nodelist if scheduler.is_node_avail(n)}
elif state != 'all':
if state.endswith('*'):
# non-exclusive state match
Expand Down Expand Up @@ -618,19 +620,22 @@ def guess_num_tasks(self):
f'[F] Total available nodes: {len(available_nodes)}'
)

available_nodes = self.scheduler.filternodes(self, available_nodes)
getlogger().debug(
f'[F] Total available after scheduler filter: '
f'{len(available_nodes)}'
)

# Try to guess the number of tasks now
available_nodes = filter_nodes_by_state(
available_nodes, self.sched_flex_alloc_nodes.lower()
available_nodes,
self.sched_flex_alloc_nodes.lower(),
self.scheduler
)
getlogger().debug(
f'[F] Total available in state='
f'{self.sched_flex_alloc_nodes.lower()}: {len(available_nodes)}'
)
available_nodes = self.scheduler.filternodes(self, available_nodes)
getlogger().debug(
f'[F] Total available after scheduler filter: '
f'{len(available_nodes)}'
)
return len(available_nodes) * num_tasks_per_node

def submit(self):
Expand Down Expand Up @@ -694,17 +699,6 @@ def in_state(self, state):
:class:`False` otherwise.
'''

@abc.abstractmethod
def is_avail(self):
'''Check whether the node is available for scheduling jobs.'''

def is_down(self):
'''Check whether node is down.

This is the inverse of :func:`is_avail`.
'''
return not self.is_avail()


class AlwaysIdleNode(Node):
def __init__(self, name):
Expand All @@ -715,9 +709,6 @@ def __init__(self, name):
def name(self):
return self._name

def is_avail(self):
return True

def in_statex(self, state):
return state.lower() == self._state

Expand Down
44 changes: 27 additions & 17 deletions reframe/core/schedulers/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,13 @@ def __init__(self):
self._sched_access_in_submit = self.get_option(
'sched_access_in_submit'
)
self.node_available_states = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this needs to be public:

Suggested change
self.node_available_states = {
self._available_states = {

'ALLOCATED',
'COMPLETING',
'IDLE',
'PLANNED',
'RESERVED'
}

def make_job(self, *args, **kwargs):
return _SlurmJob(*args, **kwargs)
Expand Down Expand Up @@ -317,7 +324,7 @@ def submit(self, job):

def allnodes(self):
try:
completed = _run_strict('scontrol -a show -o nodes')
completed = _run_strict('scontrol -a show -o node')
except SpawnedProcessError as e:
raise JobSchedulerError(
'could not retrieve node information') from e
Expand Down Expand Up @@ -429,14 +436,24 @@ def filternodes(self, job, nodes):

def _get_reservation_nodes(self, reservation):
completed = _run_strict('scontrol -a show res %s' % reservation)
node_match = re.search(r'(Nodes=\S+)', completed.stdout)
node_match = re.search(r'Nodes=(\S+)', completed.stdout)
if node_match:
reservation_nodes = node_match[1]
else:
raise JobSchedulerError("could not extract the node names for "
"reservation '%s'" % reservation)

completed = _run_strict('scontrol -a show -o %s' % reservation_nodes)
flags_match = re.search(r'Flags=(\S+)', completed.stdout)
if flags_match:
if 'MAINT' in flags_match.group(1).split(','):
self.node_available_states.add('MAINTENANCE')
else:
self.log(f"could not extract the reservation flags for "
f"reservation '{reservation}'")
Comment on lines +450 to +452
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if instead of logging only the else here, we logged the full completed.stdout once we get it?

self.log(f'reservation info\n{completed.stdout}')


completed = _run_strict(
f'scontrol -a show -o node {reservation_nodes}'
)
node_descriptions = completed.stdout.splitlines()
return _create_nodes(node_descriptions)

Expand Down Expand Up @@ -594,7 +611,7 @@ def _do_cancel_if_blocked(self, job, reason_descr):
self.log(f'Checking if nodes {node_names!r} '
f'are indeed unavailable')
nodes = self._get_nodes_by_name(node_names)
if not any(n.is_down() for n in nodes):
if not any(self.is_node_down(n) for n in nodes):
return

self.cancel(job)
Expand Down Expand Up @@ -630,6 +647,12 @@ def cancel(self, job):
def finished(self, job):
return slurm_state_completed(job.state)

def is_node_avail(self, node):
return node.states <= self.node_available_states

def is_node_down(self, node):
return not self.is_node_avail(node)


@register_scheduler('squeue')
class SqueueJobScheduler(SlurmJobScheduler):
Expand Down Expand Up @@ -734,19 +757,6 @@ def in_state(self, state):
def in_statex(self, state):
return self._states == set(state.upper().split('+'))

def is_avail(self):
available_states = {
'ALLOCATED',
'COMPLETING',
'IDLE',
'PLANNED',
'RESERVED'
}
return self._states <= available_states

def is_down(self):
return not self.is_avail()

def satisfies(self, slurm_constraint):
# Convert the Slurm constraint to a Python expression and evaluate it,
# but restrict our syntax to accept only AND or OR constraints and
Expand Down
6 changes: 5 additions & 1 deletion reframe/frontend/testgenerators.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,11 @@ def getallnodes(state, jobs_cli_options=None):
f'Total available nodes for {part.name}: {len(available_nodes)}'
)

available_nodes = filter_nodes_by_state(available_nodes, state)
available_nodes = filter_nodes_by_state(
available_nodes,
state,
part.scheduler
)
nodes[part.fullname] = [n.name for n in available_nodes]

return nodes
Expand Down
Loading
Loading