Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add queue:run-all command #6204

Open
wants to merge 5 commits into
base: 13.x
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
214 changes: 172 additions & 42 deletions src/Commands/core/QueueCommands.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@

namespace Drush\Commands\core;

use Consolidation\AnnotatedCommand\Hooks\HookManager;
use Consolidation\OutputFormatters\StructuredData\RowsOfFields;
use Drupal\Core\Queue\DelayableQueueInterface;
use Drupal\Core\Queue\DelayedRequeueException;
use Drupal\Core\Queue\QueueFactory;
use Drupal\Core\Queue\QueueGarbageCollectionInterface;
use Drupal\Core\Queue\QueueInterface;
use Drupal\Core\Queue\QueueWorkerInterface;
use Drupal\Core\Queue\QueueWorkerManagerInterface;
use Drupal\Core\Queue\RequeueException;
use Drupal\Core\Queue\SuspendQueueException;
Expand All @@ -18,12 +20,14 @@
use Drush\Commands\DrushCommands;
use Symfony\Component\Console\Completion\CompletionInput;
use Symfony\Component\Console\Completion\CompletionSuggestions;
use Symfony\Component\Console\Input\InputInterface;

final class QueueCommands extends DrushCommands
{
use AutowireTrait;

const RUN = 'queue:run';
const RUN_ALL = 'queue:run-all';
const LIST = 'queue:list';
const DELETE = 'queue:delete';

Expand Down Expand Up @@ -54,62 +58,188 @@ public function getQueueService(): QueueFactory
#[CLI\Argument(name: 'name', description: 'The name of the queue to run.')]
#[CLI\Option(name: 'time-limit', description: 'The maximum number of seconds allowed to run the queue.')]
#[CLI\Option(name: 'items-limit', description: 'The maximum number of items allowed to run the queue.')]
#[CLI\Option(name: 'memory-limit', description: 'The maximum amount of memory the script can consume before exiting. Can be a value supported by the memory_limit PHP setting or a percentage.')]
#[CLI\Option(name: 'lease-time', description: 'The maximum number of seconds that an item remains claimed.')]
#[CLI\Option(name: 'daemon', description: 'Keep the command running indefinitely, or until one of the chosen limits has been reached.')]
#[CLI\ValidateQueueName(argumentName: 'name')]
#[CLI\Complete(method_name_or_callable: 'queueComplete')]
public function run(string $name, $options = ['time-limit' => self::REQ, 'items-limit' => self::REQ, 'lease-time' => self::REQ]): void
public function run(string $name, $options = ['time-limit' => self::REQ, 'items-limit' => self::REQ, 'memory-limit' => self::REQ, 'lease-time' => self::REQ, 'daemon' => false]): void
{
$time_limit = (int) $options['time-limit'];
$items_limit = (int) $options['items-limit'];
$start = microtime(true);
$worker = $this->getWorkerManager()->createInstance($name);
$info = $this->getWorkerManager()->getDefinition($name);
$end = time() + $time_limit;
$queue = $this->getQueue($name);
$count = 0;
$remaining = $time_limit;
$lease_time = $options['lease-time'] ?? $info['cron']['time'] ?? 30;

$start = microtime(true);
$end = time() + $options['time-limit'];
$time_remaining = $options['time-limit'];
$items_count = 0;

if ($queue instanceof QueueGarbageCollectionInterface) {
$queue->garbageCollection();
}

while ((!$time_limit || $remaining > 0) && (!$items_limit || $count < $items_limit) && ($item = $queue->claimItem($lease_time))) {
try {
// @phpstan-ignore-next-line
$this->logger()->info(dt('Processing item @id from @name queue.', ['@name' => $name, '@id' => $item->item_id ?? $item->qid]));
// @phpstan-ignore-next-line
$worker->processItem($item->data);
$queue->deleteItem($item);
$count++;
} catch (RequeueException) {
// The worker requested the task to be immediately requeued.
$queue->releaseItem($item);
} catch (SuspendQueueException $e) {
// If the worker indicates there is a problem with the whole queue,
// release the item.
$queue->releaseItem($item);
throw new \Exception($e->getMessage(), $e->getCode(), $e);
} catch (DelayedRequeueException $e) {
// The worker requested the task not be immediately re-queued.
// - If the queue doesn't support ::delayItem(), we should leave the
// item's current expiry time alone.
// - If the queue does support ::delayItem(), we should allow the
// queue to update the item's expiry using the requested delay.
if ($queue instanceof DelayableQueueInterface) {
// This queue can handle a custom delay; use the duration provided
// by the exception.
$queue->delayItem($item, $e->getDelay());
do {
while (!$this->hasReachedLimit($options, $items_count, $time_remaining) && ($item = $queue->claimItem($options['lease-time']))) {
if ($this->processItem($queue, $worker, $name, $item)) {
$items_count++;
}
} catch (\Exception $e) {
// In case of any other kind of exception, log it and leave the
// item in the queue to be processed again later.
$this->logger()->error($e->getMessage());
$time_remaining = $end - time();
}
$remaining = $end - time();
}
if ($options['daemon']) {
sleep(1);
}
} while ($options['daemon'] && !$this->hasReachedLimit($options, $items_count, $time_remaining));

$elapsed = microtime(true) - $start;
$this->logger()->success(dt('Processed @count items from the @name queue in @elapsed sec.', ['@count' => $items_count, '@name' => $name, '@elapsed' => round($elapsed, 2)]));
}

#[CLI\Hook(type: HookManager::POST_INITIALIZE, target: self::RUN)]
public function postInitRun(InputInterface $input): void
{
$name = $input->getArgument('name');
$info = $this->getWorkerManager()->getDefinition($name);

$input->setOption('time-limit', (int) $input->getOption('time-limit'));
$input->setOption('items-limit', (int) $input->getOption('items-limit'));
$input->setOption('memory-limit', $this->parseMemoryLimit($input->getOption('memory-limit')));
$input->setOption('lease-time', $input->getOption('lease-time') ?? $info['cron']['time'] ?? 30);
}

/**
* Run all available queues.
*/
#[CLI\Command(name: self::RUN_ALL, aliases: ['queue-run-all'])]
#[CLI\Option(name: 'time-limit', description: 'The maximum number of seconds allowed to run the queue.')]
#[CLI\Option(name: 'items-limit', description: 'The maximum number of items allowed to run the queue.')]
#[CLI\Option(name: 'memory-limit', description: 'The maximum amount of memory the script can consume before exiting. Can be a value supported by the memory_limit PHP setting or a percentage.')]
#[CLI\Option(name: 'lease-time', description: 'The maximum number of seconds that an item remains claimed.')]
#[CLI\Option(name: 'daemon', description: 'Keep the command running indefinitely, or until one of the chosen limits has been reached.')]
public function runAll($options = ['time-limit' => self::REQ, 'items-limit' => self::REQ, 'memory-limit' => self::REQ, 'lease-time' => self::REQ, 'daemon' => false]): void
{
$start = microtime(true);
$end = time() + $options['time-limit'];
$time_remaining = $options['time-limit'];
$items_count = 0;
$queues = $this->getQueues();

do {
foreach ($queues as $name => $info) {
$queue = $this->getQueue($name);
$worker = $this->getWorkerManager()->createInstance($name);
$lease_time = $options['lease-time'] ?? $info['cron']['time'] ?? 30;
$queue_starting = true;
$queue_start = microtime(true);
$queue_items_count = 0;

if ($queue instanceof QueueGarbageCollectionInterface) {
$queue->garbageCollection();
}

while (!$this->hasReachedLimit($options, $items_count, $time_remaining) && $item = $queue->claimItem($lease_time)) {
if ($queue_starting) {
$this->logger()->notice('Processing queue ' . $name);
}
if ($this->processItem($queue, $worker, $name, $item)) {
$queue_items_count++;
}
$time_remaining = $end - time();
$queue_starting = false;
}

if ($queue_items_count > 0) {
$items_count += $queue_items_count;
$elapsed = microtime(true) - $queue_start;
$this->logger()->success(dt('Processed @count items from the @name queue in @elapsed sec.', ['@count' => $queue_items_count, '@name' => $name, '@elapsed' => round($elapsed, 2)]));
}
}
if ($options['daemon']) {
sleep(1);
}
} while ($options['daemon'] && !$this->hasReachedLimit($options, $items_count, $time_remaining));

$elapsed = microtime(true) - $start;
$this->logger()->success(dt('Processed @count items from the @name queue in @elapsed sec.', ['@count' => $count, '@name' => $name, '@elapsed' => round($elapsed, 2)]));
$this->logger()->success(dt('Processed @count items in @elapsed sec.', ['@count' => $items_count, '@elapsed' => round($elapsed, 2)]));
}

#[CLI\Hook(type: HookManager::POST_INITIALIZE, target: self::RUN_ALL)]
public function postInitRunAll(InputInterface $input): void
{
$input->setOption('time-limit', (int) $input->getOption('time-limit'));
$input->setOption('items-limit', (int) $input->getOption('items-limit'));
$input->setOption('memory-limit', $this->parseMemoryLimit($input->getOption('memory-limit')));
}

protected function hasReachedLimit(array $options, int $items_count, int $time_remaining): bool
{
return ($options['time-limit'] && $time_remaining <= 0)
|| ($options['items-limit'] && $items_count >= $options['items-limit'])
|| ($options['memory-limit'] && memory_get_usage() >= $options['memory-limit']);
}

protected function parseMemoryLimit(?string $value): ?int
{
if ($value === null || $value === '') {
return null;
}

$last = strtolower($value[strlen($value)-1]);
$size = (int) rtrim($value, 'GgMmKk%');

switch ($last) {
case 'g':
$size *= DRUSH_KILOBYTE;
case 'm':
$size *= DRUSH_KILOBYTE;
case 'k':
$size *= DRUSH_KILOBYTE;
case '%':
$size = (int) ($size * (drush_memory_limit() / 100));
}

return $size;
}

/**
* Process an item from the queue.
*
* @return bool
* TRUE if the item was processed, FALSE otherwise.
*/
protected function processItem(QueueInterface $queue, QueueWorkerInterface $worker, string $name, object $item): bool
{
try {
// @phpstan-ignore-next-line
$this->logger()->info(dt('Processing item @id from @name queue.', ['@name' => $name, '@id' => $item->item_id ?? $item->qid]));
// @phpstan-ignore-next-line
$worker->processItem($item->data);
$queue->deleteItem($item);
return true;
} catch (RequeueException) {
// The worker requested the task to be immediately requeued.
$queue->releaseItem($item);
} catch (SuspendQueueException $e) {
// If the worker indicates there is a problem with the whole queue,
// release the item.
$queue->releaseItem($item);
throw new \Exception($e->getMessage(), $e->getCode(), $e);
} catch (DelayedRequeueException $e) {
// The worker requested the task not be immediately re-queued.
// - If the queue doesn't support ::delayItem(), we should leave the
// item's current expiry time alone.
// - If the queue does support ::delayItem(), we should allow the
// queue to update the item's expiry using the requested delay.
if ($queue instanceof DelayableQueueInterface) {
// This queue can handle a custom delay; use the duration provided
// by the exception.
$queue->delayItem($item, $e->getDelay());
}
} catch (\Exception $e) {
// In case of any other kind of exception, log it and leave the
// item in the queue to be processed again later.
$this->logger()->error($e->getMessage());
}

return false;
}

/**
Expand Down