Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
168 changes: 168 additions & 0 deletions core/Command/TaskProcessing/WorkerCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
<?php

declare(strict_types=1);

/**
* SPDX-FileCopyrightText: 2024 Nextcloud GmbH and Nextcloud contributors
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* SPDX-FileCopyrightText: 2024 Nextcloud GmbH and Nextcloud contributors
* SPDX-FileCopyrightText: 2026 Nextcloud GmbH and Nextcloud contributors

* SPDX-License-Identifier: AGPL-3.0-or-later
*/
namespace OC\Core\Command\TaskProcessing;

use OC\Core\Command\Base;
use OC\Core\Command\InterruptedException;
use OCP\TaskProcessing\Exception\Exception;
use OCP\TaskProcessing\Exception\NotFoundException;
use OCP\TaskProcessing\IManager;
use OCP\TaskProcessing\ISynchronousProvider;
use Psr\Log\LoggerInterface;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;

class WorkerCommand extends Base {
public function __construct(
private readonly IManager $taskProcessingManager,
private readonly LoggerInterface $logger,
) {
parent::__construct();
}

protected function configure(): void {
$this
->setName('taskprocessing:worker')
->setDescription('Run a dedicated worker for synchronous TaskProcessing providers')
->addOption(
'timeout',
't',
InputOption::VALUE_OPTIONAL,
'Duration in seconds after which the worker exits (0 = run indefinitely)',
0
)
->addOption(
'interval',
'i',
InputOption::VALUE_OPTIONAL,
'Sleep duration in seconds between polling iterations when no task was processed',
1
)
->addOption(
'once',
null,
InputOption::VALUE_NONE,
'Process at most one task then exit'
)
->addOption(
'taskTypes',
null,
InputOption::VALUE_REQUIRED | InputOption::VALUE_IS_ARRAY,
'Only process tasks of the given task type IDs (can be specified multiple times)'
);
parent::configure();
}

protected function execute(InputInterface $input, OutputInterface $output): int {
$startTime = time();
$timeout = (int)$input->getOption('timeout');
$interval = (int)$input->getOption('interval');
$once = $input->getOption('once') === true;
/** @var list<string> $taskTypes */
$taskTypes = $input->getOption('taskTypes');

if ($timeout > 0) {
$output->writeln('<info>Task processing worker will stop after ' . $timeout . ' seconds</info>');
}

while (true) {
// Stop if timeout exceeded
if ($timeout > 0 && ($startTime + $timeout) < time()) {
$output->writeln('Timeout reached, exiting...', OutputInterface::VERBOSITY_VERBOSE);
break;
}

// Handle SIGTERM/SIGINT gracefully
try {
$this->abortIfInterrupted();
} catch (InterruptedException $e) {
$output->writeln('<info>Task processing worker stopped</info>');
break;
}

$processedTask = $this->processNextTask($output, $taskTypes);

if ($once) {
break;
}

if (!$processedTask) {
$output->writeln('No task processed, waiting ' . $interval . ' second(s)...', OutputInterface::VERBOSITY_VERBOSE);
sleep($interval);
}
}

return 0;
}

/**
* Attempt to process one task across all preferred synchronous providers.
*
* @param list<string> $taskTypes When non-empty, only providers for these task type IDs are considered.
* @return bool True if a task was processed, false if no task was found
*/
private function processNextTask(OutputInterface $output, array $taskTypes = []): bool {
$providers = $this->taskProcessingManager->getProviders();
// Shuffle providers to avoid starvation: if providers are always iterated
// in the same order, a provider with a constant stream of tasks would
// prevent all subsequent providers from ever being processed.
shuffle($providers);

foreach ($providers as $provider) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand correctly, we don't get the oldest task but we go through the task types, and we take the oldest task of the first provider that is synchronous.
So if there is a constant flow of scheduled tasks for this provider, they will run before the other ones. This looks like potential starvation 😁 .

I think we could loop on all the tasks, starting with the oldest and take the first one that is handled by a synchronous provider. Wdyt?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copilot's fix is to add a shuffle :D

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem with looping through tasks is that it potentially takes ages until we reach an eligible task.

if (!$provider instanceof ISynchronousProvider) {
continue;
}

$taskTypeId = $provider->getTaskTypeId();

// If a task type whitelist was provided, skip providers not in the list
if (!empty($taskTypes) && !in_array($taskTypeId, $taskTypes, true)) {
continue;
}

// Only use this provider if it is the preferred one for the task type
try {
$preferredProvider = $this->taskProcessingManager->getPreferredProvider($taskTypeId);
} catch (Exception $e) {
$this->logger->error('Failed to get preferred provider for task type ' . $taskTypeId, ['exception' => $e]);
continue;
}

if ($provider->getId() !== $preferredProvider->getId()) {
continue;
}

try {
$task = $this->taskProcessingManager->getNextScheduledTask([$taskTypeId]);
} catch (NotFoundException) {
continue;
} catch (Exception $e) {
$this->logger->error('Unknown error while retrieving scheduled TaskProcessing tasks', ['exception' => $e]);
continue;
}

$output->writeln(
'Processing task ' . $task->getId() . ' of type ' . $taskTypeId . ' with provider ' . $provider->getId(),
OutputInterface::VERBOSITY_VERBOSE
);

$this->taskProcessingManager->processTask($task, $provider);

$output->writeln(
'Finished processing task ' . $task->getId(),
OutputInterface::VERBOSITY_VERBOSE
);

return true;
}

return false;
}
}
2 changes: 2 additions & 0 deletions core/register_command.php
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
use OC\Core\Command\TaskProcessing\EnabledCommand;
use OC\Core\Command\TaskProcessing\GetCommand;
use OC\Core\Command\TaskProcessing\Statistics;
use OC\Core\Command\TaskProcessing\WorkerCommand;
use OC\Core\Command\TwoFactorAuth\Cleanup;
use OC\Core\Command\TwoFactorAuth\Enforce;
use OC\Core\Command\TwoFactorAuth\State;
Expand Down Expand Up @@ -255,6 +256,7 @@
$application->add(Server::get(Command\TaskProcessing\ListCommand::class));
$application->add(Server::get(Statistics::class));
$application->add(Server::get(Command\TaskProcessing\Cleanup::class));
$application->add(Server::get(WorkerCommand::class));

$application->add(Server::get(RedisCommand::class));
$application->add(Server::get(DistributedClear::class));
Expand Down
1 change: 1 addition & 0 deletions lib/composer/composer/autoload_classmap.php
Original file line number Diff line number Diff line change
Expand Up @@ -1422,6 +1422,7 @@
'OC\\Core\\Command\\TaskProcessing\\GetCommand' => $baseDir . '/core/Command/TaskProcessing/GetCommand.php',
'OC\\Core\\Command\\TaskProcessing\\ListCommand' => $baseDir . '/core/Command/TaskProcessing/ListCommand.php',
'OC\\Core\\Command\\TaskProcessing\\Statistics' => $baseDir . '/core/Command/TaskProcessing/Statistics.php',
'OC\\Core\\Command\\TaskProcessing\\WorkerCommand' => $baseDir . '/core/Command/TaskProcessing/WorkerCommand.php',
'OC\\Core\\Command\\TwoFactorAuth\\Base' => $baseDir . '/core/Command/TwoFactorAuth/Base.php',
'OC\\Core\\Command\\TwoFactorAuth\\Cleanup' => $baseDir . '/core/Command/TwoFactorAuth/Cleanup.php',
'OC\\Core\\Command\\TwoFactorAuth\\Disable' => $baseDir . '/core/Command/TwoFactorAuth/Disable.php',
Expand Down
1 change: 1 addition & 0 deletions lib/composer/composer/autoload_static.php
Original file line number Diff line number Diff line change
Expand Up @@ -1463,6 +1463,7 @@ class ComposerStaticInit749170dad3f5e7f9ca158f5a9f04f6a2
'OC\\Core\\Command\\TaskProcessing\\GetCommand' => __DIR__ . '/../../..' . '/core/Command/TaskProcessing/GetCommand.php',
'OC\\Core\\Command\\TaskProcessing\\ListCommand' => __DIR__ . '/../../..' . '/core/Command/TaskProcessing/ListCommand.php',
'OC\\Core\\Command\\TaskProcessing\\Statistics' => __DIR__ . '/../../..' . '/core/Command/TaskProcessing/Statistics.php',
'OC\\Core\\Command\\TaskProcessing\\WorkerCommand' => __DIR__ . '/../../..' . '/core/Command/TaskProcessing/WorkerCommand.php',
'OC\\Core\\Command\\TwoFactorAuth\\Base' => __DIR__ . '/../../..' . '/core/Command/TwoFactorAuth/Base.php',
'OC\\Core\\Command\\TwoFactorAuth\\Cleanup' => __DIR__ . '/../../..' . '/core/Command/TwoFactorAuth/Cleanup.php',
'OC\\Core\\Command\\TwoFactorAuth\\Disable' => __DIR__ . '/../../..' . '/core/Command/TwoFactorAuth/Disable.php',
Expand Down
Loading
Loading