-
Notifications
You must be signed in to change notification settings - Fork 3
Improve process future tasks command #18
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,3 +4,6 @@ __pycache__ | |
| db.sqlite3 | ||
| build/* | ||
| tests/static/* | ||
| .python-version | ||
| .coverage | ||
| uv.lock | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,101 +21,103 @@ class Command(BaseCommand): | |
|
|
||
| current_task_pk = None | ||
|
|
||
| def add_arguments(self, parser): | ||
| parser.add_argument( | ||
| "--onetimerun", | ||
| action="append", | ||
| type=bool, | ||
| default=False, | ||
| help="Run command only one times", | ||
| ) | ||
| def __init__(self, *args, **kwargs): | ||
| super().__init__(*args, **kwargs) | ||
|
|
||
| # The command will run as long as the `_running` attribute is | ||
| # set to `True`. To safely quit the command, just set this attribute to `False` and the | ||
| # command will finish a running tick and quit afterwards. | ||
| self._running = True | ||
|
|
||
| # Register system signal handler to gracefully quit the service when | ||
| # getting a `SIGINT` or `SIGTERM` signal (e.g. by CTRL+C). | ||
| signal.signal(signal.SIGINT, self._handle_termination) | ||
| signal.signal(signal.SIGTERM, self._handle_termination) | ||
|
|
||
| def _handle_termination(self, *args, **kwargs): | ||
| # Mark the task as interrupted in case the command will receive a SIGKILL before the task was completed. | ||
| # If the command terminates graciously instead, the task will be finished and marked as done again by the | ||
| # main loop. | ||
| try: | ||
| current_task = FutureTask.objects.get(pk=self.current_task_pk) | ||
| current_task.status = FutureTask.FUTURE_TASK_STATUS_INTERRUPTED | ||
| current_task.save() | ||
| except FutureTask.DoesNotExist: | ||
| pass | ||
|
|
||
| self._running = False | ||
|
|
||
| def _handle_options(self, options): | ||
| self.tick = 1 | ||
| self.one_time_run = options["onetimerun"] | ||
| self.one_time_run = options["one_time_run"] | ||
| self.wait_for_tasks_duration_seconds = options["wait_for_tasks_duration_seconds"] | ||
|
|
||
| @staticmethod | ||
| def tasks_for_processing(): | ||
| def _get_open_tasks(self): | ||
| return FutureTask.objects.filter( | ||
| eta__lte=timezone.now(), | ||
| status=FutureTask.FUTURE_TASK_STATUS_OPEN, | ||
| ).order_by("eta") | ||
|
|
||
| def _endless_task_iterator(self): | ||
| while self._running: | ||
| tasks = self._get_open_tasks() | ||
| yield from tasks | ||
| if not tasks: | ||
| time.sleep(self.wait_for_tasks_duration_seconds) | ||
|
|
||
| @staticmethod | ||
| def _convert_exception_args(args): | ||
| return [str(arg) for arg in args] | ||
|
|
||
| def handle_tick(self): | ||
| task_list = self.tasks_for_processing() | ||
| logger.debug(f"Got {len(task_list)} tasks for processing") | ||
| def _handle_task(self, task): | ||
| task.status = FutureTask.FUTURE_TASK_STATUS_IN_PROGRESS | ||
| task.save() | ||
| self.current_task_pk = task.pk | ||
|
Comment on lines
+71
to
+74
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Instead of handling batches of tasks, we now handle one task in one "tick". |
||
| try: | ||
| start_time = timeit.default_timer() | ||
| future_task_signal.send(sender=intern(task.type), instance=task) | ||
| task.execution_time = timeit.default_timer() - start_time | ||
| task.status = FutureTask.FUTURE_TASK_STATUS_DONE | ||
| except Exception as exception: | ||
| task.status = FutureTask.FUTURE_TASK_STATUS_ERROR | ||
| task.result = { | ||
| "exception": f"An exception of type {type(exception).__name__} occurred.", | ||
| "args": self._convert_exception_args(exception.args), | ||
| "traceback": traceback.format_exception( | ||
| *sys.exc_info(), | ||
| limit=None, | ||
| chain=None, | ||
| ), | ||
| } | ||
| logger.exception(exception) | ||
| self.current_task_pk = None | ||
| task.save() | ||
|
|
||
| for task in task_list: | ||
| task.status = FutureTask.FUTURE_TASK_STATUS_IN_PROGRESS | ||
| task.save() | ||
| self.current_task_pk = task.pk | ||
| try: | ||
| start_time = timeit.default_timer() | ||
| future_task_signal.send(sender=intern(task.type), instance=task) | ||
| task.execution_time = timeit.default_timer() - start_time | ||
| task.status = FutureTask.FUTURE_TASK_STATUS_DONE | ||
| except Exception as exc: | ||
| task.status = FutureTask.FUTURE_TASK_STATUS_ERROR | ||
| task.result = { | ||
| "exception": "An exception of type {} occurred.".format( | ||
| type(exc).__name__, | ||
| ), | ||
| "args": self._convert_exception_args(exc.args), | ||
| "traceback": traceback.format_exception( | ||
| *sys.exc_info(), | ||
| limit=None, | ||
| chain=None, | ||
| ), | ||
| } | ||
| logger.exception(exc) | ||
| self.current_task_pk = None | ||
| task.save() | ||
|
|
||
| time.sleep(self.tick) | ||
| def add_arguments(self, parser): | ||
| parser.add_argument( | ||
| "--one-time-run", | ||
| action="store_true", | ||
| help="Process tasks that are open at the time of running the command and exit.", | ||
| ) | ||
|
Comment on lines
+96
to
+100
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed the one-time-run argument. The old one was broken. |
||
| parser.add_argument( | ||
| "--wait-for-tasks-duration-seconds", | ||
| type=float, | ||
| default=1.0, | ||
| help="If there are no open tasks the command waits this amount of time until it checks for open tasks again.", | ||
| ) | ||
|
Comment on lines
+101
to
+106
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. New argument for the waiting time. |
||
|
|
||
| def handle(self, *args, **options): | ||
| # Load given options. | ||
| self._handle_options(options) | ||
|
|
||
| tasks = iter(self._get_open_tasks()) if self.one_time_run else self._endless_task_iterator() | ||
| while self._running: | ||
| time.sleep(self.tick) | ||
|
|
||
| try: | ||
| self.handle_tick() | ||
| if self.one_time_run: | ||
| break | ||
|
|
||
| self._handle_task(next(tasks)) | ||
| except StopIteration: | ||
| break | ||
| except Exception as exc: | ||
| logger.exception( | ||
| f"{exc.__class__.__name__} exception occurred...", | ||
| ) | ||
|
|
||
| # As the database connection might have failed, we discard it here, so django will | ||
| # create a new one on the next database access. | ||
| db.close_old_connections() | ||
|
|
||
| def __init__(self, *args, **kwargs): | ||
| super().__init__(*args, **kwargs) | ||
|
|
||
| # The command will run as long as the `_running` attribute is | ||
| # set to `True`. To safely quit the command, just set this attribute to `False` and the | ||
| # command will finish a running tick and quit afterwards. | ||
| self._running = True | ||
|
|
||
| # Register system signal handler to gracefully quit the service when | ||
| # getting a `SIGINT` or `SIGTERM` signal (e.g. by CTRL+C). | ||
| signal.signal(signal.SIGINT, self._handle_termination) | ||
| signal.signal(signal.SIGTERM, self._handle_termination) | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the new basis for the worker-behavior.