 
 # Import packages
 from multiprocessing import Process, Pool, cpu_count, pool
+import threading
 from traceback import format_exception
 import sys
 
 from ... import logging, config
 from ...utils.misc import str2bool
 from ..engine import MapNode
-from ..plugins import semaphore_singleton
 from .base import (DistributedPluginBase, report_crash)
 
 # Init logger
 logger = logging.getLogger('workflow')
 
 # Run node
-def run_node(node, updatehash):
+def run_node(node, updatehash, taskid):
     """Function to execute node.run(), catch and log any errors and
     return the result dictionary
 
@@ -45,7 +45,7 @@ def run_node(node, updatehash):
4545 """
4646
4747 # Init variables
48- result = dict (result = None , traceback = None )
48+ result = dict (result = None , traceback = None , taskid = taskid )
4949
5050 # Try and execute the node via node.run()
5151 try :
@@ -77,10 +77,6 @@ class NonDaemonPool(pool.Pool):
     Process = NonDaemonProcess
 
 
-def release_lock(args):
-    semaphore_singleton.semaphore.release()
-
-
 # Get total system RAM
 def get_system_total_memory_gb():
     """Function to get the total RAM of the running system in GB
@@ -136,12 +132,18 @@ def __init__(self, plugin_args=None):
         # Init variables and instance attributes
         super(MultiProcPlugin, self).__init__(plugin_args=plugin_args)
         self._taskresult = {}
+        self._task_obj = {}
         self._taskid = 0
         non_daemon = True
         self.plugin_args = plugin_args
         self.processors = cpu_count()
         self.memory_gb = get_system_total_memory_gb() * 0.9  # 90% of system memory
 
+        self._timeout = 2.0
+        self._event = threading.Event()
+
+
+
         # Check plugin args
         if self.plugin_args:
             if 'non_daemon' in self.plugin_args:
@@ -150,6 +152,9 @@ def __init__(self, plugin_args=None):
                 self.processors = self.plugin_args['n_procs']
             if 'memory_gb' in self.plugin_args:
                 self.memory_gb = self.plugin_args['memory_gb']
+
+        logger.debug("MultiProcPlugin starting %d threads in pool" % (self.processors))
+
         # Instantiate different thread pools for non-daemon processes
         if non_daemon:
             # run the execution using the non-daemon pool subclass
@@ -159,14 +164,23 @@ def __init__(self, plugin_args=None):
 
     def _wait(self):
         if len(self.pending_tasks) > 0:
-            semaphore_singleton.semaphore.acquire()
+            if self._config['execution']['poll_sleep_duration']:
+                self._timeout = float(self._config['execution']['poll_sleep_duration'])
+            sig_received = self._event.wait(self._timeout)
+            if not sig_received:
+                logger.debug('MultiProcPlugin timeout before signal received. Deadlock averted??')
+            self._event.clear()
+
+    def _async_callback(self, args):
+        self._taskresult[args['taskid']] = args
+        self._event.set()
 
     def _get_result(self, taskid):
         if taskid not in self._taskresult:
-            raise RuntimeError('Multiproc task %d not found' % taskid)
-        if not self._taskresult[taskid].ready():
-            return None
-        return self._taskresult[taskid].get()
+            result = None
+        else:
+            result = self._taskresult[taskid]
+        return result
 
     def _report_crash(self, node, result=None):
         if result and result['traceback']:
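
For context, the change above replaces the old semaphore handshake with a threading.Event that the scheduler waits on with a timeout and that the apply_async callback sets once a task's result has been stored. What follows is a minimal, standalone sketch of that pattern, not nipype code; the names worker, Scheduler, submit and wait are illustrative only:

import threading
from multiprocessing.pool import ThreadPool

def worker(x, taskid):
    # Stand-in for run_node(): return a result dict tagged with its task id.
    return {'taskid': taskid, 'result': x * x}

class Scheduler(object):
    def __init__(self, timeout=2.0):
        self._results = {}
        self._event = threading.Event()
        self._timeout = timeout
        self._pool = ThreadPool(2)

    def _callback(self, args):
        # Runs in the pool's result-handler thread when a task completes.
        self._results[args['taskid']] = args
        self._event.set()

    def submit(self, x, taskid):
        self._pool.apply_async(worker, (x, taskid), callback=self._callback)

    def wait(self):
        # Block until some task signals completion, or give up after the
        # timeout and let the caller poll again instead of deadlocking.
        if not self._event.wait(self._timeout):
            print('timed out waiting for a result; polling again')
        self._event.clear()

sched = Scheduler()
sched.submit(3, taskid=1)
sched.wait()
print(sched._results)  # typically {1: {'taskid': 1, 'result': 9}}
sched._pool.close()
sched._pool.join()
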
@@ -178,36 +192,50 @@ def _report_crash(self, node, result=None):
         return report_crash(node)
 
     def _clear_task(self, taskid):
-        del self._taskresult[taskid]
+        del self._task_obj[taskid]
 
     def _submit_job(self, node, updatehash=False):
         self._taskid += 1
         if hasattr(node.inputs, 'terminal_output'):
             if node.inputs.terminal_output == 'stream':
                 node.inputs.terminal_output = 'allatonce'
 
-        self._taskresult[self._taskid] = \
+        self._task_obj[self._taskid] = \
             self.pool.apply_async(run_node,
-                                  (node, updatehash),
-                                  callback=release_lock)
+                                  (node, updatehash, self._taskid),
+                                  callback=self._async_callback)
         return self._taskid
 
+    def _close(self):
+        self.pool.close()
+        return True
+
     def _send_procs_to_workers(self, updatehash=False, graph=None):
         """ Sends jobs to workers when system resources are available.
             Check memory (gb) and cores usage before running jobs.
         """
         executing_now = []
 
         # Check to see if a job is available
-        jobids = np.flatnonzero((self.proc_pending == True) & \
+        currently_running_jobids = np.flatnonzero((self.proc_pending == True) & \
                                 (self.depidx.sum(axis=0) == 0).__array__())
 
         # Check available system resources by summing all threads and memory used
         busy_memory_gb = 0
         busy_processors = 0
-        for jobid in jobids:
-            busy_memory_gb += self.procs[jobid]._interface.estimated_memory_gb
-            busy_processors += self.procs[jobid]._interface.num_threads
+        for jobid in currently_running_jobids:
+            if self.procs[jobid]._interface.estimated_memory_gb <= self.memory_gb and \
+               self.procs[jobid]._interface.num_threads <= self.processors:
+
+                busy_memory_gb += self.procs[jobid]._interface.estimated_memory_gb
+                busy_processors += self.procs[jobid]._interface.num_threads
+
+            else:
+                raise ValueError("Resources required by jobid %d (%f GB, %d threads) "
+                                 "exceed what is available on the system (%f GB, %d threads)" % (jobid,
+                                 self.procs[jobid]._interface.estimated_memory_gb,
+                                 self.procs[jobid]._interface.num_threads,
+                                 self.memory_gb, self.processors))
 
         free_memory_gb = self.memory_gb - busy_memory_gb
         free_processors = self.processors - busy_processors
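
The resource bookkeeping introduced above sums the estimated memory and thread counts of the jobs already pending and raises a ValueError as soon as any single job asks for more than the machine has. Below is a simplified, standalone version of that check, using plain (memory_gb, n_threads) tuples instead of nipype interfaces; free_resources is a hypothetical helper, not part of the plugin:

def free_resources(pending_jobs, total_memory_gb, total_processors):
    """Return (free_memory_gb, free_processors) left after pending jobs.

    pending_jobs is a list of (estimated_memory_gb, num_threads) tuples.
    A job that could never fit on this machine raises ValueError up front.
    """
    busy_memory_gb = 0.0
    busy_processors = 0
    for mem_gb, n_threads in pending_jobs:
        if mem_gb > total_memory_gb or n_threads > total_processors:
            raise ValueError("Job needs %f GB and %d threads, but the system "
                             "only has %f GB and %d threads"
                             % (mem_gb, n_threads,
                                total_memory_gb, total_processors))
        busy_memory_gb += mem_gb
        busy_processors += n_threads
    return (total_memory_gb - busy_memory_gb,
            total_processors - busy_processors)

# Two pending jobs on an 8 GB, 4 core machine leave 4.5 GB and 1 core free.
print(free_resources([(2.0, 1), (1.5, 2)], 8.0, 4))
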
@@ -271,8 +299,8 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
                 hash_exists, _, _, _ = self.procs[
                     jobid].hash_exists()
                 logger.debug('Hash exists %s' % str(hash_exists))
-                if (hash_exists and (self.procs[jobid].overwrite == False or \
-                                     (self.procs[jobid].overwrite == None and \
+                if (hash_exists and (self.procs[jobid].overwrite == False or
+                                     (self.procs[jobid].overwrite == None and
                                       not self.procs[jobid]._interface.always_run))):
                     self._task_finished_cb(jobid)
                     self._remove_node_dirs()
@@ -299,7 +327,7 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
                     self._remove_node_dirs()
 
                 else:
-                    logger.debug('submitting %s' % str(jobid))
+                    logger.debug('MultiProcPlugin submitting %s' % str(jobid))
                     tid = self._submit_job(deepcopy(self.procs[jobid]),
                                            updatehash=updatehash)
                     if tid is None: