diff --git a/src/Commands/core/QueueCommands.php b/src/Commands/core/QueueCommands.php index 6e0c755619..3b5fc58616 100644 --- a/src/Commands/core/QueueCommands.php +++ b/src/Commands/core/QueueCommands.php @@ -4,12 +4,14 @@ namespace Drush\Commands\core; +use Consolidation\AnnotatedCommand\Hooks\HookManager; use Consolidation\OutputFormatters\StructuredData\RowsOfFields; use Drupal\Core\Queue\DelayableQueueInterface; use Drupal\Core\Queue\DelayedRequeueException; use Drupal\Core\Queue\QueueFactory; use Drupal\Core\Queue\QueueGarbageCollectionInterface; use Drupal\Core\Queue\QueueInterface; +use Drupal\Core\Queue\QueueWorkerInterface; use Drupal\Core\Queue\QueueWorkerManagerInterface; use Drupal\Core\Queue\RequeueException; use Drupal\Core\Queue\SuspendQueueException; @@ -18,12 +20,14 @@ use Drush\Commands\DrushCommands; use Symfony\Component\Console\Completion\CompletionInput; use Symfony\Component\Console\Completion\CompletionSuggestions; +use Symfony\Component\Console\Input\InputInterface; final class QueueCommands extends DrushCommands { use AutowireTrait; const RUN = 'queue:run'; + const RUN_ALL = 'queue:run-all'; const LIST = 'queue:list'; const DELETE = 'queue:delete'; @@ -54,62 +58,188 @@ public function getQueueService(): QueueFactory #[CLI\Argument(name: 'name', description: 'The name of the queue to run.')] #[CLI\Option(name: 'time-limit', description: 'The maximum number of seconds allowed to run the queue.')] #[CLI\Option(name: 'items-limit', description: 'The maximum number of items allowed to run the queue.')] + #[CLI\Option(name: 'memory-limit', description: 'The maximum amount of memory the script can consume before exiting. Can be a value supported by the memory_limit PHP setting or a percentage.')] #[CLI\Option(name: 'lease-time', description: 'The maximum number of seconds that an item remains claimed.')] + #[CLI\Option(name: 'daemon', description: 'Keep the command running indefinitely, or until one of the chosen limits has been reached.')] #[CLI\ValidateQueueName(argumentName: 'name')] #[CLI\Complete(method_name_or_callable: 'queueComplete')] - public function run(string $name, $options = ['time-limit' => self::REQ, 'items-limit' => self::REQ, 'lease-time' => self::REQ]): void + public function run(string $name, $options = ['time-limit' => self::REQ, 'items-limit' => self::REQ, 'memory-limit' => self::REQ, 'lease-time' => self::REQ, 'daemon' => false]): void { - $time_limit = (int) $options['time-limit']; - $items_limit = (int) $options['items-limit']; - $start = microtime(true); $worker = $this->getWorkerManager()->createInstance($name); - $info = $this->getWorkerManager()->getDefinition($name); - $end = time() + $time_limit; $queue = $this->getQueue($name); - $count = 0; - $remaining = $time_limit; - $lease_time = $options['lease-time'] ?? $info['cron']['time'] ?? 30; + + $start = microtime(true); + $end = time() + $options['time-limit']; + $time_remaining = $options['time-limit']; + $items_count = 0; if ($queue instanceof QueueGarbageCollectionInterface) { $queue->garbageCollection(); } - while ((!$time_limit || $remaining > 0) && (!$items_limit || $count < $items_limit) && ($item = $queue->claimItem($lease_time))) { - try { - // @phpstan-ignore-next-line - $this->logger()->info(dt('Processing item @id from @name queue.', ['@name' => $name, '@id' => $item->item_id ?? $item->qid])); - // @phpstan-ignore-next-line - $worker->processItem($item->data); - $queue->deleteItem($item); - $count++; - } catch (RequeueException) { - // The worker requested the task to be immediately requeued. - $queue->releaseItem($item); - } catch (SuspendQueueException $e) { - // If the worker indicates there is a problem with the whole queue, - // release the item. - $queue->releaseItem($item); - throw new \Exception($e->getMessage(), $e->getCode(), $e); - } catch (DelayedRequeueException $e) { - // The worker requested the task not be immediately re-queued. - // - If the queue doesn't support ::delayItem(), we should leave the - // item's current expiry time alone. - // - If the queue does support ::delayItem(), we should allow the - // queue to update the item's expiry using the requested delay. - if ($queue instanceof DelayableQueueInterface) { - // This queue can handle a custom delay; use the duration provided - // by the exception. - $queue->delayItem($item, $e->getDelay()); + do { + while (!$this->hasReachedLimit($options, $items_count, $time_remaining) && ($item = $queue->claimItem($options['lease-time']))) { + if ($this->processItem($queue, $worker, $name, $item)) { + $items_count++; } - } catch (\Exception $e) { - // In case of any other kind of exception, log it and leave the - // item in the queue to be processed again later. - $this->logger()->error($e->getMessage()); + $time_remaining = $end - time(); } - $remaining = $end - time(); - } + if ($options['daemon']) { + sleep(1); + } + } while ($options['daemon'] && !$this->hasReachedLimit($options, $items_count, $time_remaining)); + + $elapsed = microtime(true) - $start; + $this->logger()->success(dt('Processed @count items from the @name queue in @elapsed sec.', ['@count' => $items_count, '@name' => $name, '@elapsed' => round($elapsed, 2)])); + } + + #[CLI\Hook(type: HookManager::POST_INITIALIZE, target: self::RUN)] + public function postInitRun(InputInterface $input): void + { + $name = $input->getArgument('name'); + $info = $this->getWorkerManager()->getDefinition($name); + + $input->setOption('time-limit', (int) $input->getOption('time-limit')); + $input->setOption('items-limit', (int) $input->getOption('items-limit')); + $input->setOption('memory-limit', $this->parseMemoryLimit($input->getOption('memory-limit'))); + $input->setOption('lease-time', $input->getOption('lease-time') ?? $info['cron']['time'] ?? 30); + } + + /** + * Run all available queues. + */ + #[CLI\Command(name: self::RUN_ALL, aliases: ['queue-run-all'])] + #[CLI\Option(name: 'time-limit', description: 'The maximum number of seconds allowed to run the queue.')] + #[CLI\Option(name: 'items-limit', description: 'The maximum number of items allowed to run the queue.')] + #[CLI\Option(name: 'memory-limit', description: 'The maximum amount of memory the script can consume before exiting. Can be a value supported by the memory_limit PHP setting or a percentage.')] + #[CLI\Option(name: 'lease-time', description: 'The maximum number of seconds that an item remains claimed.')] + #[CLI\Option(name: 'daemon', description: 'Keep the command running indefinitely, or until one of the chosen limits has been reached.')] + public function runAll($options = ['time-limit' => self::REQ, 'items-limit' => self::REQ, 'memory-limit' => self::REQ, 'lease-time' => self::REQ, 'daemon' => false]): void + { + $start = microtime(true); + $end = time() + $options['time-limit']; + $time_remaining = $options['time-limit']; + $items_count = 0; + $queues = $this->getQueues(); + + do { + foreach ($queues as $name => $info) { + $queue = $this->getQueue($name); + $worker = $this->getWorkerManager()->createInstance($name); + $lease_time = $options['lease-time'] ?? $info['cron']['time'] ?? 30; + $queue_starting = true; + $queue_start = microtime(true); + $queue_items_count = 0; + + if ($queue instanceof QueueGarbageCollectionInterface) { + $queue->garbageCollection(); + } + + while (!$this->hasReachedLimit($options, $items_count, $time_remaining) && $item = $queue->claimItem($lease_time)) { + if ($queue_starting) { + $this->logger()->notice('Processing queue ' . $name); + } + if ($this->processItem($queue, $worker, $name, $item)) { + $queue_items_count++; + } + $time_remaining = $end - time(); + $queue_starting = false; + } + + if ($queue_items_count > 0) { + $items_count += $queue_items_count; + $elapsed = microtime(true) - $queue_start; + $this->logger()->success(dt('Processed @count items from the @name queue in @elapsed sec.', ['@count' => $queue_items_count, '@name' => $name, '@elapsed' => round($elapsed, 2)])); + } + } + if ($options['daemon']) { + sleep(1); + } + } while ($options['daemon'] && !$this->hasReachedLimit($options, $items_count, $time_remaining)); + $elapsed = microtime(true) - $start; - $this->logger()->success(dt('Processed @count items from the @name queue in @elapsed sec.', ['@count' => $count, '@name' => $name, '@elapsed' => round($elapsed, 2)])); + $this->logger()->success(dt('Processed @count items in @elapsed sec.', ['@count' => $items_count, '@elapsed' => round($elapsed, 2)])); + } + + #[CLI\Hook(type: HookManager::POST_INITIALIZE, target: self::RUN_ALL)] + public function postInitRunAll(InputInterface $input): void + { + $input->setOption('time-limit', (int) $input->getOption('time-limit')); + $input->setOption('items-limit', (int) $input->getOption('items-limit')); + $input->setOption('memory-limit', $this->parseMemoryLimit($input->getOption('memory-limit'))); + } + + protected function hasReachedLimit(array $options, int $items_count, int $time_remaining): bool + { + return ($options['time-limit'] && $time_remaining <= 0) + || ($options['items-limit'] && $items_count >= $options['items-limit']) + || ($options['memory-limit'] && memory_get_usage() >= $options['memory-limit']); + } + + protected function parseMemoryLimit(?string $value): ?int + { + if ($value === null || $value === '') { + return null; + } + + $last = strtolower($value[strlen($value)-1]); + $size = (int) rtrim($value, 'GgMmKk%'); + + switch ($last) { + case 'g': + $size *= DRUSH_KILOBYTE; + case 'm': + $size *= DRUSH_KILOBYTE; + case 'k': + $size *= DRUSH_KILOBYTE; + case '%': + $size = (int) ($size * (drush_memory_limit() / 100)); + } + + return $size; + } + + /** + * Process an item from the queue. + * + * @return bool + * TRUE if the item was processed, FALSE otherwise. + */ + protected function processItem(QueueInterface $queue, QueueWorkerInterface $worker, string $name, object $item): bool + { + try { + // @phpstan-ignore-next-line + $this->logger()->info(dt('Processing item @id from @name queue.', ['@name' => $name, '@id' => $item->item_id ?? $item->qid])); + // @phpstan-ignore-next-line + $worker->processItem($item->data); + $queue->deleteItem($item); + return true; + } catch (RequeueException) { + // The worker requested the task to be immediately requeued. + $queue->releaseItem($item); + } catch (SuspendQueueException $e) { + // If the worker indicates there is a problem with the whole queue, + // release the item. + $queue->releaseItem($item); + throw new \Exception($e->getMessage(), $e->getCode(), $e); + } catch (DelayedRequeueException $e) { + // The worker requested the task not be immediately re-queued. + // - If the queue doesn't support ::delayItem(), we should leave the + // item's current expiry time alone. + // - If the queue does support ::delayItem(), we should allow the + // queue to update the item's expiry using the requested delay. + if ($queue instanceof DelayableQueueInterface) { + // This queue can handle a custom delay; use the duration provided + // by the exception. + $queue->delayItem($item, $e->getDelay()); + } + } catch (\Exception $e) { + // In case of any other kind of exception, log it and leave the + // item in the queue to be processed again later. + $this->logger()->error($e->getMessage()); + } + + return false; } /**