[#722] fix segfault and hung threads on KeyboardInterrupt during parallel get #728
base: main
Changes from all commits
a3dd06e
481952c
1213d2b
59fd131
ba0407a
dddcc95
b96f805
ed25eda
9d2ff7a
0aaf747
ade42ea
8272b5a
7584b71
368e08e
0765f71
9ec506b
1b42f97
1740e80
c5824cc
92474be
df05ce1
14037f9
107ef8d
f7b5a73
```diff
@@ -9,13 +9,39 @@
 import concurrent.futures
 import threading
 import multiprocessing
-from typing import List, Union
+from typing import List, Union, Any
+import weakref

 from irods.data_object import iRODSDataObject
 from irods.exception import DataObjectDoesNotExist
 import irods.keywords as kw
 from queue import Queue, Full, Empty

+paths_active: weakref.WeakValueDictionary[str, "AsyncNotify"] = weakref.WeakValueDictionary()
+transfer_managers: weakref.WeakKeyDictionary["_Multipart_close_manager", Any] = weakref.WeakKeyDictionary()
+
+def abort_parallel_transfers(dry_run=False, filter_function=None):
+    """
+    If no explicit arguments are given, all ongoing parallel puts and gets are cancelled
+    as soon as possible.  The corresponding threads are signalled to exit by calling the
+    quit() method on their corresponding transfer-manager objects.
+
+    Setting dry_run=True results in no such cancellation being performed; instead, a dictionary
+    is returned containing, as its keys, the transfer managers that would have been so affected.
+
+    filter_function is usually left at its default value of None.  Otherwise, its effect is to
+    limit which transfers are aborted (or returned in a call with dry_run=True).
+    """
+    mgrs = dict(filter(filter_function, transfer_managers.items()))
+    if not dry_run:
+        for mgr, item in mgrs.items():
+            if isinstance(item, tuple):
+                quit_func, args = item[:2]
+                quit_func(*args)
+            else:
+                mgr.quit()
+    return mgrs
+
 logger = logging.getLogger(__name__)
 _nullh = logging.NullHandler()
```
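Taken together, the two module-level registries and `abort_parallel_transfers()` form the new public cancellation surface. A minimal usage sketch follows; it assumes the helper is importable from `irods.parallel` (the module this diff appears to touch) and uses an illustrative zone path:

```python
from irods.parallel import abort_parallel_transfers

# Dry run: report which transfer managers would be cancelled, cancelling nothing.
pending = abort_parallel_transfers(dry_run=True)
print(f"{len(pending)} parallel transfer(s) in flight")

# Per the registry scheme above, each manager maps either to a
# (quit_func, args) tuple (synchronous transfers) or to the data object's
# logical path (non-blocking transfers).  A filter over those items can
# therefore cancel selectively -- here, only transfers under one collection.
def under_my_home(item):                      # item is a (mgr, value) pair
    _mgr, value = item
    return isinstance(value, str) and value.startswith("/tempZone/home/alice/")

abort_parallel_transfers(filter_function=under_my_home)
```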
```diff
@@ -91,9 +117,11 @@ def __init__(
             for future in self._futures:
                 future.add_done_callback(self)
         else:
-            self.__invoke_done_callback()
+            self.__invoke_futures_done_logic()
             return

         self.progress = [0, 0]

         if (progress_Queue) and (total is not None):
             self.progress[1] = total
```
```diff
@@ -112,7 +140,7 @@ def _progress(Q, this):  # - thread to update progress indicator

         self._progress_fn = _progress
         self._progress_thread = threading.Thread(
-            target=self._progress_fn, args=(progress_Queue, self)
+            target=self._progress_fn, args=(progress_Queue, self), daemon=True
        )
         self._progress_thread.start()
```
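The one-line change above marks the progress-indicator thread as a daemon, so the interpreter no longer waits on it at shutdown; a non-daemon progress thread still blocked on its queue is one way a Ctrl-C'd process could hang. A standard-library sketch of the distinction:

```python
import threading
import time

def ticker():
    while True:          # loops forever; relies on daemon status to be reaped
        time.sleep(0.2)

# A non-daemon version of this thread would keep the process alive forever:
#     threading.Thread(target=ticker).start()

# The daemon version is torn down automatically when the main thread exits:
threading.Thread(target=ticker, daemon=True).start()
print("main thread finished; process exits despite the running ticker")
```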
```diff
@@ -153,11 +181,14 @@ def __call__(
         with self._lock:
             self._futures_done[future] = future.result()
             if len(self._futures) == len(self._futures_done):
-                self.__invoke_done_callback()
+                # If a future returns None rather than an integer byte count, it has aborted the transfer.
+                self.__invoke_futures_done_logic(
+                    skip_user_callback=(None in self._futures_done.values())
+                )

-    def __invoke_done_callback(self):
+    def __invoke_futures_done_logic(self, skip_user_callback=False):
         try:
-            if callable(self.done_callback):
+            if not skip_user_callback and callable(self.done_callback):
                 self.done_callback(self)
         finally:
             self.keep.pop("mgr", None)
```
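The hunk above fixes a convention into `AsyncNotify.__call__`: any worker future that resolves to `None` instead of a byte count marks the whole transfer as aborted, and the user-supplied `done_callback` is suppressed. A reduced sketch of that sentinel pattern, independent of the library's classes:

```python
import concurrent.futures

def worker(abort):
    # Return a byte count on success, or the None sentinel on abort.
    return None if abort else 4096

with concurrent.futures.ThreadPoolExecutor() as pool:
    futures = [pool.submit(worker, abort=(i == 2)) for i in range(3)]
    results = [f.result() for f in futures]

# Mirrors the skip_user_callback=(None in self._futures_done.values()) test:
if None in results:
    print("transfer aborted; user callback skipped")
else:
    print("transferred", sum(results), "bytes")
```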
```diff
@@ -240,6 +271,12 @@ def _copy_part(src, dst, length, queueObject, debug_info, mgr, updatables=()):
     bytecount = 0
     accum = 0
     while True and bytecount < length:
+        if mgr._quit:
+            # Indicate by the return value that we are aborting (this part of) the data transfer.
+            # In the great majority of cases, this should be seen by the application as an overall
+            # abort of the PUT or GET of the requested object.
+            bytecount = None
+            break
         buf = src.read(min(COPY_BUF_SIZE, length - bytecount))
         buf_len = len(buf)
         if 0 == buf_len:
```
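This is classic cooperative cancellation: the worker polls the manager's `_quit` flag once per buffer and exits with the `None` sentinel rather than being killed mid-write. A self-contained sketch of the loop's shape, with hypothetical stand-ins for the manager and the two streams:

```python
import io

COPY_BUF_SIZE = 1024 * 1024           # illustrative buffer size

class FakeMgr:                        # stand-in for _Multipart_close_manager
    _quit = False

def copy_part(src, dst, length, mgr):
    """Copy `length` bytes; return the count, or None if told to quit."""
    bytecount = 0
    while bytecount < length:
        if mgr._quit:                 # polled once per buffer, as in the diff
            return None               # None = this part (hence the transfer) aborted
        buf = src.read(min(COPY_BUF_SIZE, length - bytecount))
        if not buf:
            break
        dst.write(buf)
        bytecount += len(buf)
    return bytecount

mgr = FakeMgr()
print(copy_part(io.BytesIO(b"x" * 3000), io.BytesIO(), 3000, mgr))  # 3000
mgr._quit = True
print(copy_part(io.BytesIO(b"x" * 3000), io.BytesIO(), 3000, mgr))  # None
```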
```diff
@@ -274,11 +311,34 @@ class _Multipart_close_manager:

     """

-    def __init__(self, initial_io_, exit_barrier_):
+    def __init__(self, initial_io_, exit_barrier_, executor=None):
+        self._quit = False
         self.exit_barrier = exit_barrier_
         self.initial_io = initial_io_
         self.__lock = threading.Lock()
         self.aux = []
+        self.futures = set()
+        self.executor = executor
+
+    def add_future(self, future): self.futures.add(future)
+
+    @property
+    def active_futures(self):
+        return tuple(_ for _ in self.futures if not _.done())
+
+    def shutdown(self):
+        if self.executor:
+            self.executor.shutdown(cancel_futures=True)
+
+    def quit(self):
+        from irods.session import _exclude_fds_from_auto_close
+        _exclude_fds_from_auto_close(self.aux + [self.initial_io])
+
+        # abort threads.
+        self._quit = True
+        self.exit_barrier.abort()
+        self.shutdown()
+        return self.active_futures

     def __contains__(self, Io):
         with self.__lock:
```
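`quit()` combines three standard mechanisms: the `_quit` flag for threads already copying, `Barrier.abort()` for threads parked at the exit rendezvous, and `Executor.shutdown(cancel_futures=True)` (Python 3.9+) for work that has not started yet. The last of these only cancels futures still sitting in the executor's queue; running futures must notice `_quit` themselves, as this quick standard-library demonstration shows:

```python
import concurrent.futures
import time

pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
running = pool.submit(time.sleep, 1)                      # occupies the sole worker
queued = [pool.submit(time.sleep, 1) for _ in range(3)]   # waits in the queue

pool.shutdown(cancel_futures=True)      # queued work is cancelled; running work finishes
print([f.cancelled() for f in queued])  # [True, True, True]
print(running.cancelled())              # False: in-flight work must quit cooperatively
```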
```diff
@@ -297,15 +357,20 @@ def add_io(self, Io):
     # synchronizes all of the parallel threads just before exit, so that we know
     # exactly when to perform a finalizing close on the data object

     def remove_io(self, Io):
         is_initial = True
         with self.__lock:
             if Io is not self.initial_io:
                 Io.close()
                 self.aux.remove(Io)
                 is_initial = False
-        self.exit_barrier.wait()
-        if is_initial:
+        broken = False
+        try:
+            self.exit_barrier.wait()
+        except threading.BrokenBarrierError:
+            broken = True
+        if is_initial and not (broken or self._quit):
             self.finalize()

     def finalize(self):
```
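The reworked `remove_io()` is the waiter's side of `quit()`: once `exit_barrier.abort()` has been called, every pending and future `wait()` raises `threading.BrokenBarrierError`, so threads blocked at the rendezvous wake promptly instead of hanging, and finalization is skipped. A minimal illustration of that barrier behavior:

```python
import threading
import time

barrier = threading.Barrier(parties=2)    # the second party will never arrive

def waiter():
    try:
        barrier.wait()                    # would otherwise block forever
    except threading.BrokenBarrierError:
        print("woken by abort; skipping finalize, as remove_io() now does")

t = threading.Thread(target=waiter)
t.start()
time.sleep(0.1)                           # let the waiter reach the barrier
barrier.abort()                           # the same call quit() makes
t.join()
```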
```diff
@@ -393,7 +458,7 @@ def bytes_range_for_thread(i, num_threads, total_bytes, chunk):
     futures = []
     executor = concurrent.futures.ThreadPoolExecutor(max_workers=num_threads)
     num_threads = min(num_threads, len(ranges))
-    mgr = _Multipart_close_manager(Io, Barrier(num_threads))
+    mgr = _Multipart_close_manager(Io, Barrier(num_threads), executor)
     counter = 1
     gen_file_handle = lambda: open(
         fname, Operation.disk_file_mode(initial_open=(counter == 1))
```
```diff
@@ -405,49 +470,86 @@ def bytes_range_for_thread(i, num_threads, total_bytes, chunk):
         "queueObject": queueObject,
     }

-    for byte_range in ranges:
-        if Io is None:
-            Io = session.data_objects.open(
-                Data_object.path,
-                Operation.data_object_mode(initial_open=False),
-                create=False,
-                finalize_on_close=False,
-                allow_redirect=False,
-                **{
-                    kw.NUM_THREADS_KW: str(num_threads),
-                    kw.DATA_SIZE_KW: str(total_size),
-                    kw.RESC_HIER_STR_KW: hier_str,
-                    kw.REPLICA_TOKEN_KW: replica_token,
-                }
-            )
-        mgr.add_io(Io)
-        logger.debug("target_host = %s", Io.raw.session.pool.account.host)
-        if File is None:
-            File = gen_file_handle()
-        futures.append(
-            executor.submit(
-                _io_part,
-                Io,
-                byte_range,
-                File,
-                Operation,
-                mgr,
-                thread_debug_id=str(counter),
-                **thread_opts
-            )
-        )
-        counter += 1
-        Io = File = None
-
-    if Operation.isNonBlocking():
-        if queueLength:
-            return futures, queueObject, mgr
+    transfer_managers[mgr] = (_quit_current_transfer, [id(mgr)])
+
+    try:
+        transfer_aborted = False
+
+        for byte_range in ranges:
+            if Io is None:
+                Io = session.data_objects.open(
+                    Data_object.path,
+                    Operation.data_object_mode(initial_open=False),
+                    create=False,
+                    finalize_on_close=False,
+                    allow_redirect=False,
+                    **{
+                        kw.NUM_THREADS_KW: str(num_threads),
+                        kw.DATA_SIZE_KW: str(total_size),
+                        kw.RESC_HIER_STR_KW: hier_str,
+                        kw.REPLICA_TOKEN_KW: replica_token,
+                    }
+                )
+            mgr.add_io(Io)
+            logger.debug("target_host = %s", Io.raw.session.pool.account.host)
+            if File is None:
+                File = gen_file_handle()
+            try:
+                f = None
```
Contributor: What is `f = None` for here?

Collaborator (Author): Will look through Codacy; probably one among many unused-variable notifications.
```diff
+                futures.append(
+                    f := executor.submit(
+                        _io_part,
+                        Io,
+                        byte_range,
+                        File,
+                        Operation,
+                        mgr,
+                        thread_debug_id=str(counter),
+                        **thread_opts
+                    )
+                )
+            except RuntimeError as error:
+                # Executor was probably shut down before parallel transfer could be initiated.
+                transfer_aborted = True
+                break
+            else:
+                mgr.add_future(f)
+
+            counter += 1
+            Io = File = None
+
+        if transfer_aborted:
+            return ((bytes_transferred := 0), total_size)
+
+        if Operation.isNonBlocking():
+            transfer_managers[mgr] = None
+            return (futures, mgr, queueObject)
-        else:
-            return futures
-    else:
-        bytecounts = [f.result() for f in futures]
-        return sum(bytecounts), total_size
+        else:
+            bytes_transferred = 0
+            # Enable user attempts to cancel the current synchronous transfer.
+            # At any given time, only one transfer-manager key should map to a tuple object T.
+            # You should be able to quit all threads of the current transfer by calling T[0](*T[1]).
+            bytecounts = [f.result() for f in futures]
+            # If, rather than an integer byte count, the None object was included among the futures'
+            # return values, this is an indication that the PUT or GET operation should be marked
+            # as aborted, i.e. no bytes transferred.
+            if None not in bytecounts:
+                bytes_transferred = sum(bytecounts)
+
+            return (bytes_transferred, total_size)
+
+    except BaseException as e:
+
+        # TODO - examine this experimentally restored code, as the
+        # library should react to these two exception types (and perhaps others) by quitting all transfer threads
```
Comment on lines +542 to +543:

Contributor: Is there an issue we can link to with this TODO? To which two exception types is it referring?

Collaborator (Author): There's no issue (yet). I was just in the process - incomplete when I wrote the TODO - of trying to understand how the parts fit together. I can change it to an informative comment - not a TODO - about BaseException types and their normal place in handling cleanup. In a nutshell, they are not often trapped by application writers, but libraries like this one (since it spawns threads behind the scenes) need to handle them. I'm going to remove the TODO unless there are objections.
```diff
+        if isinstance(e, (SystemExit, KeyboardInterrupt)):
+            mgr.quit()
+        raise
+
+def _quit_current_transfer(obj_id):
+    l = [_ for _ in transfer_managers if id(_) == obj_id]
+    if l:
+        l[0].quit()
+
 def io_main(session, Data, opr_, fname, R="", **kwopt):
     """
```
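The `except BaseException` clause is the idiom the author defends in the review thread above: `KeyboardInterrupt` and `SystemExit` derive from `BaseException`, not `Exception`, so application code rarely traps them, but a library that spawns threads must intercept them just long enough to signal its workers and then re-raise. Distilled to its skeleton, with `mgr` as a stand-in for the transfer manager:

```python
def run_transfer(mgr, do_work):
    try:
        return do_work()
    except BaseException as e:
        # KeyboardInterrupt/SystemExit bypass `except Exception` handlers, so
        # they are caught here, the worker threads are told to stop, and the
        # exception is re-raised unchanged for the application to see.
        if isinstance(e, (SystemExit, KeyboardInterrupt)):
            mgr.quit()
        raise
```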
```diff
@@ -559,18 +661,21 @@ def io_main(session, Data, opr_, fname, R="", **kwopt):

     if Operation.isNonBlocking():

-        if queueLength > 0:
-            (futures, chunk_notify_queue, mgr) = retval
-        else:
-            futures = retval
-            chunk_notify_queue = total_bytes = None
+        # if queueLength > 0:
+        (futures, mgr, chunk_notify_queue) = retval
+        # else:
+        #     futures = retval
+        # TODO: investigate: Huh? Why were we zeroing out total_bytes when there is no progress queue?
+        # chunk_notify_queue = total_bytes = None
```
Comment on lines +664 to +669:

Contributor: Do we need to keep these commented-out bits? And if so, can we get an issue number for the TODO?

Collaborator (Author): No, this TODO can go.
```diff
-        return AsyncNotify(
+        transfer_managers[mgr] = Data.path
+        paths_active[Data.path] = async_notify = AsyncNotify(
             futures,  # individual futures, one per transfer thread
             progress_Queue=chunk_notify_queue,  # for notifying the progress indicator thread
             total=total_bytes,  # total number of bytes for parallel transfer
             keep_={"mgr": mgr},
         )  # an open raw i/o object needing to be persisted, if any
+        return async_notify
     else:
         (_bytes_transferred, _bytes_total) = retval
         return _bytes_transferred == _bytes_total
```
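With both registries populated, a caller on another thread can locate and cancel one specific non-blocking transfer by its logical path. A hedged sketch, assuming the registries are importable from `irods.parallel` and using an illustrative path:

```python
from irods.parallel import abort_parallel_transfers, paths_active

path = "/tempZone/home/alice/big.dat"     # illustrative data-object path

# paths_active is a WeakValueDictionary: an entry disappears once its
# AsyncNotify is garbage-collected, so .get() safely reports liveness.
notify = paths_active.get(path)
if notify is not None:
    print("still transferring:", notify.progress)

# Non-blocking managers map to their object's path in transfer_managers,
# so a filter on that value cancels exactly this transfer.
abort_parallel_transfers(filter_function=lambda item: item[1] == path)
```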