Skip to content

Commit 51a2bd6

Browse files
Improve process_future_tasks command
1 parent f564ec4 commit 51a2bd6

File tree

6 files changed

+232
-183
lines changed

6 files changed

+232
-183
lines changed

CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1212
- Use `timezone.now()` instead of `datetime.now()` in `PeriodicFutureTask.save()`.
1313
- Use `condition` instead of `check` attribute in `CheckConstraint` with `django>=5.1`.
1414

15+
### Changed
16+
17+
- Improve `process_future_tasks` command.
18+
- Terminate after processing the current task instead of the current task batch if SIGINT/SIGTERM is received.
19+
- Remove unnecessary waiting for new tasks when there are already tasks that can be processed.
20+
- Configurable waiting duration for new tasks.
21+
1522
### Added
1623

1724
- Support for Python 3.14.

django_future_tasks/management/commands/process_future_tasks.py

Lines changed: 65 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -21,99 +21,103 @@ class Command(BaseCommand):
2121

2222
current_task_pk = None
2323

24-
def add_arguments(self, parser):
25-
parser.add_argument(
26-
"--onetimerun",
27-
action="append",
28-
type=bool,
29-
default=False,
30-
help="Run command only one times",
31-
)
24+
def __init__(self, *args, **kwargs):
25+
super().__init__(*args, **kwargs)
26+
27+
# The command will run as long as the `_running` attribute is
28+
# set to `True`. To safely quit the command, just set this attribute to `False` and the
29+
# command will finish a running tick and quit afterwards.
30+
self._running = True
31+
32+
# Register system signal handler to gracefully quit the service when
33+
# getting a `SIGINT` or `SIGTERM` signal (e.g. by CTRL+C).
34+
signal.signal(signal.SIGINT, self._handle_termination)
35+
signal.signal(signal.SIGTERM, self._handle_termination)
3236

3337
def _handle_termination(self, *args, **kwargs):
38+
# Mark the task as interrupted in case the command will receive a SIGKILL before the task was completed.
39+
# If the command terminates graciously instead, the task will be finished and marked as done again by the
40+
# main loop.
3441
try:
3542
current_task = FutureTask.objects.get(pk=self.current_task_pk)
3643
current_task.status = FutureTask.FUTURE_TASK_STATUS_INTERRUPTED
3744
current_task.save()
3845
except FutureTask.DoesNotExist:
3946
pass
47+
4048
self._running = False
4149

4250
def _handle_options(self, options):
43-
self.tick = 1
44-
self.one_time_run = options["onetimerun"]
51+
self.one_time_run = options["one_time_run"]
52+
self.wait_for_tasks_duration_seconds = options["wait_for_tasks_duration_seconds"]
4553

46-
@staticmethod
47-
def tasks_for_processing():
54+
def _get_open_tasks(self):
4855
return FutureTask.objects.filter(
4956
eta__lte=timezone.now(),
5057
status=FutureTask.FUTURE_TASK_STATUS_OPEN,
5158
).order_by("eta")
5259

60+
def _endless_task_iterator(self):
61+
while self._running:
62+
tasks = self._get_open_tasks()
63+
yield from tasks
64+
if not tasks:
65+
time.sleep(self.wait_for_tasks_duration_seconds)
66+
5367
@staticmethod
5468
def _convert_exception_args(args):
5569
return [str(arg) for arg in args]
5670

57-
def handle_tick(self):
58-
task_list = self.tasks_for_processing()
59-
logger.debug(f"Got {len(task_list)} tasks for processing")
71+
def _handle_task(self, task):
72+
task.status = FutureTask.FUTURE_TASK_STATUS_IN_PROGRESS
73+
task.save()
74+
self.current_task_pk = task.pk
75+
try:
76+
start_time = timeit.default_timer()
77+
future_task_signal.send(sender=intern(task.type), instance=task)
78+
task.execution_time = timeit.default_timer() - start_time
79+
task.status = FutureTask.FUTURE_TASK_STATUS_DONE
80+
except Exception as exception:
81+
task.status = FutureTask.FUTURE_TASK_STATUS_ERROR
82+
task.result = {
83+
"exception": f"An exception of type {type(exception).__name__} occurred.",
84+
"args": self._convert_exception_args(exception.args),
85+
"traceback": traceback.format_exception(
86+
*sys.exc_info(),
87+
limit=None,
88+
chain=None,
89+
),
90+
}
91+
logger.exception(exception)
92+
self.current_task_pk = None
93+
task.save()
6094

61-
for task in task_list:
62-
task.status = FutureTask.FUTURE_TASK_STATUS_IN_PROGRESS
63-
task.save()
64-
self.current_task_pk = task.pk
65-
try:
66-
start_time = timeit.default_timer()
67-
future_task_signal.send(sender=intern(task.type), instance=task)
68-
task.execution_time = timeit.default_timer() - start_time
69-
task.status = FutureTask.FUTURE_TASK_STATUS_DONE
70-
except Exception as exc:
71-
task.status = FutureTask.FUTURE_TASK_STATUS_ERROR
72-
task.result = {
73-
"exception": f"An exception of type {type(exc).__name__} occurred.",
74-
"args": self._convert_exception_args(exc.args),
75-
"traceback": traceback.format_exception(
76-
*sys.exc_info(),
77-
limit=None,
78-
chain=None,
79-
),
80-
}
81-
logger.exception(exc)
82-
self.current_task_pk = None
83-
task.save()
84-
85-
time.sleep(self.tick)
95+
def add_arguments(self, parser):
96+
parser.add_argument(
97+
"--one-time-run",
98+
action="store_true",
99+
help="Process tasks that are open at the time of running the command and exit.",
100+
)
101+
parser.add_argument(
102+
"--wait-for-tasks-duration-seconds",
103+
type=float,
104+
default=1.0,
105+
help="If there are no open tasks the command waits this amount of time until it checks for open tasks again.",
106+
)
86107

87108
def handle(self, *args, **options):
88109
# Load given options.
89110
self._handle_options(options)
90-
111+
tasks = iter(self._get_open_tasks()) if self.one_time_run else self._endless_task_iterator()
91112
while self._running:
92-
time.sleep(self.tick)
93-
94113
try:
95-
self.handle_tick()
96-
if self.one_time_run:
97-
break
98-
114+
self._handle_task(next(tasks))
115+
except StopIteration:
116+
break
99117
except Exception as exc:
100118
logger.exception(
101119
f"{exc.__class__.__name__} exception occurred...",
102120
)
103-
104121
# As the database connection might have failed, we discard it here, so django will
105122
# create a new one on the next database access.
106123
db.close_old_connections()
107-
108-
def __init__(self, *args, **kwargs):
109-
super().__init__(*args, **kwargs)
110-
111-
# The command will run as long as the `_running` attribute is
112-
# set to `True`. To safely quit the command, just set this attribute to `False` and the
113-
# command will finish a running tick and quit afterwards.
114-
self._running = True
115-
116-
# Register system signal handler to gracefully quit the service when
117-
# getting a `SIGINT` or `SIGTERM` signal (e.g. by CTRL+C).
118-
signal.signal(signal.SIGINT, self._handle_termination)
119-
signal.signal(signal.SIGTERM, self._handle_termination)

tests/testapp/handlers.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ def my_task_function_error(sender, instance, **kwargs):
2525

2626
@receiver(future_task_signal, sender=intern(settings.FUTURE_TASK_TYPE_INTERRUPTION))
2727
def my_task_function_interruption(sender, instance, **kwargs):
28-
time.sleep(10)
28+
time.sleep(2)
2929

3030

3131
@receiver(future_task_signal, sender=intern(settings.FUTURE_TASK_TYPE_ETA_ORDERING))

tests/testapp/mixins.py

Lines changed: 0 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,6 @@
77
from django_future_tasks.management.commands.populate_periodic_future_tasks import (
88
Command as PopulatePeriodicTasksCommand,
99
)
10-
from django_future_tasks.management.commands.process_future_tasks import (
11-
Command as ProcessTasksCommand,
12-
)
1310

1411
logger = logging.getLogger(__name__)
1512

@@ -20,31 +17,6 @@ def run(self):
2017
connection.close()
2118

2219

23-
class ProcessTasksCommandMixin:
24-
@classmethod
25-
def setUpClass(cls):
26-
assert not hasattr(cls, "command_instance") or cls.command_instance is None, (
27-
"process_future_tasks has already been started"
28-
)
29-
logger.info("Starting process_future_tasks...")
30-
31-
cls.command_instance = ProcessTasksCommand()
32-
cls.thread = TestThread(target=call_command, args=(cls.command_instance,))
33-
cls.thread.start()
34-
super().setUpClass()
35-
36-
@classmethod
37-
def tearDownClass(cls):
38-
assert cls.command_instance is not None, (
39-
"process_future_tasks has not been started and can therefore not be stopped"
40-
)
41-
logger.info("Stopping process_future_tasks...")
42-
43-
super().tearDownClass()
44-
cls.command_instance._handle_termination()
45-
cls.thread.join()
46-
47-
4820
class PopulatePeriodicTaskCommandMixin:
4921
@classmethod
5022
def setUpClass(cls):

0 commit comments

Comments
 (0)