Skip to content

Conversation

@christophbuermann
Copy link
Collaborator

  • Improve process_future_tasks command.
    • Terminate after processing the current task instead of the current task batch if SIGINT/SIGTERM is received.
    • Remove unnecessary waiting for new tasks when there are already tasks that can be processed.
    • Configurable waiting duration for new tasks.

@christophbuermann christophbuermann force-pushed the improve_process_future_tasks_command branch from eaeb63f to 51a2bd6 Compare October 17, 2025 14:54
@christophbuermann
Copy link
Collaborator Author

⚠️ This pull request depends on and is based on: #17

Comment on lines +60 to +65
def _endless_task_iterator(self):
while self._running:
tasks = self._get_open_tasks()
yield from tasks
if not tasks:
time.sleep(self.wait_for_tasks_duration_seconds)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the new basis for the worker-behavior.

Comment on lines +71 to +74
def _handle_task(self, task):
task.status = FutureTask.FUTURE_TASK_STATUS_IN_PROGRESS
task.save()
self.current_task_pk = task.pk
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of handling batches of tasks, we now handle one task in one "tick".

Comment on lines +96 to +100
parser.add_argument(
"--one-time-run",
action="store_true",
help="Process tasks that are open at the time of running the command and exit.",
)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed the one-time-run argument. The old one was broken.

Comment on lines +101 to +106
parser.add_argument(
"--wait-for-tasks-duration-seconds",
type=float,
default=1.0,
help="If there are no open tasks the command waits this amount of time until it checks for open tasks again.",
)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New argument for the waiting time.



def _wait_for_task_status(task, status, tick_seconds=0.1, timeout_seconds=3):
def _wait_for_task_status(task, status, tick_seconds: float = 0.15, timeout_seconds: float = 3.0):
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Locking shenanigans with SQLite caused me to use 0.15 seconds as default waiting time because it doesn't align so perfectly with the default 1.0 waiting time of the worker. Postgres would be nice.

Comment on lines +35 to +36
@pytest.mark.django_db(transaction=True)
class TestWorker:
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did some pytest-friendly refactoring of the tests.

Comment on lines +9 to +10
class ProcessFutureTasksWorker:
def __init__(self, *args: str):
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Helpful worker with context manager interface for usage in the tests.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant