diff --git a/src/Commands/core/QueueCommands.php b/src/Commands/core/QueueCommands.php index 5d7399d637..3b5fc58616 100644 --- a/src/Commands/core/QueueCommands.php +++ b/src/Commands/core/QueueCommands.php @@ -4,6 +4,7 @@ namespace Drush\Commands\core; +use Consolidation\AnnotatedCommand\Hooks\HookManager; use Consolidation\OutputFormatters\StructuredData\RowsOfFields; use Drupal\Core\Queue\DelayableQueueInterface; use Drupal\Core\Queue\DelayedRequeueException; @@ -19,6 +20,7 @@ use Drush\Commands\DrushCommands; use Symfony\Component\Console\Completion\CompletionInput; use Symfony\Component\Console\Completion\CompletionSuggestions; +use Symfony\Component\Console\Input\InputInterface; final class QueueCommands extends DrushCommands { @@ -56,21 +58,19 @@ public function getQueueService(): QueueFactory #[CLI\Argument(name: 'name', description: 'The name of the queue to run.')] #[CLI\Option(name: 'time-limit', description: 'The maximum number of seconds allowed to run the queue.')] #[CLI\Option(name: 'items-limit', description: 'The maximum number of items allowed to run the queue.')] + #[CLI\Option(name: 'memory-limit', description: 'The maximum amount of memory the script can consume before exiting. Can be a value supported by the memory_limit PHP setting or a percentage.')] #[CLI\Option(name: 'lease-time', description: 'The maximum number of seconds that an item remains claimed.')] #[CLI\Option(name: 'daemon', description: 'Keep the command running indefinitely, or until one of the chosen limits has been reached.')] #[CLI\ValidateQueueName(argumentName: 'name')] #[CLI\Complete(method_name_or_callable: 'queueComplete')] - public function run(string $name, $options = ['time-limit' => self::REQ, 'items-limit' => self::REQ, 'lease-time' => self::REQ, 'daemon' => false]): void + public function run(string $name, $options = ['time-limit' => self::REQ, 'items-limit' => self::REQ, 'memory-limit' => self::REQ, 'lease-time' => self::REQ, 'daemon' => false]): void { $worker = $this->getWorkerManager()->createInstance($name); - $info = $this->getWorkerManager()->getDefinition($name); $queue = $this->getQueue($name); - $time_limit = (int) $options['time-limit']; $start = microtime(true); - $end = time() + $time_limit; - $time_remaining = $time_limit; - $lease_time = $options['lease-time'] ?? $info['cron']['time'] ?? 30; + $end = time() + $options['time-limit']; + $time_remaining = $options['time-limit']; $items_count = 0; if ($queue instanceof QueueGarbageCollectionInterface) { @@ -78,7 +78,7 @@ public function run(string $name, $options = ['time-limit' => self::REQ, 'items- } do { - while (!$this->hasReachedLimit($options, $items_count, $time_remaining) && ($item = $queue->claimItem($lease_time))) { + while (!$this->hasReachedLimit($options, $items_count, $time_remaining) && ($item = $queue->claimItem($options['lease-time']))) { if ($this->processItem($queue, $worker, $name, $item)) { $items_count++; } @@ -93,20 +93,32 @@ public function run(string $name, $options = ['time-limit' => self::REQ, 'items- $this->logger()->success(dt('Processed @count items from the @name queue in @elapsed sec.', ['@count' => $items_count, '@name' => $name, '@elapsed' => round($elapsed, 2)])); } + #[CLI\Hook(type: HookManager::POST_INITIALIZE, target: self::RUN)] + public function postInitRun(InputInterface $input): void + { + $name = $input->getArgument('name'); + $info = $this->getWorkerManager()->getDefinition($name); + + $input->setOption('time-limit', (int) $input->getOption('time-limit')); + $input->setOption('items-limit', (int) $input->getOption('items-limit')); + $input->setOption('memory-limit', $this->parseMemoryLimit($input->getOption('memory-limit'))); + $input->setOption('lease-time', $input->getOption('lease-time') ?? $info['cron']['time'] ?? 30); + } + /** * Run all available queues. */ #[CLI\Command(name: self::RUN_ALL, aliases: ['queue-run-all'])] #[CLI\Option(name: 'time-limit', description: 'The maximum number of seconds allowed to run the queue.')] #[CLI\Option(name: 'items-limit', description: 'The maximum number of items allowed to run the queue.')] + #[CLI\Option(name: 'memory-limit', description: 'The maximum amount of memory the script can consume before exiting. Can be a value supported by the memory_limit PHP setting or a percentage.')] #[CLI\Option(name: 'lease-time', description: 'The maximum number of seconds that an item remains claimed.')] #[CLI\Option(name: 'daemon', description: 'Keep the command running indefinitely, or until one of the chosen limits has been reached.')] - public function runAll($options = ['time-limit' => self::REQ, 'items-limit' => self::REQ, 'lease-time' => self::REQ, 'daemon' => false]): void + public function runAll($options = ['time-limit' => self::REQ, 'items-limit' => self::REQ, 'memory-limit' => self::REQ, 'lease-time' => self::REQ, 'daemon' => false]): void { - $time_limit = (int) $options['time-limit']; $start = microtime(true); - $end = time() + $time_limit; - $time_remaining = $time_limit; + $end = time() + $options['time-limit']; + $time_remaining = $options['time-limit']; $items_count = 0; $queues = $this->getQueues(); @@ -149,13 +161,42 @@ public function runAll($options = ['time-limit' => self::REQ, 'items-limit' => s $this->logger()->success(dt('Processed @count items in @elapsed sec.', ['@count' => $items_count, '@elapsed' => round($elapsed, 2)])); } + #[CLI\Hook(type: HookManager::POST_INITIALIZE, target: self::RUN_ALL)] + public function postInitRunAll(InputInterface $input): void + { + $input->setOption('time-limit', (int) $input->getOption('time-limit')); + $input->setOption('items-limit', (int) $input->getOption('items-limit')); + $input->setOption('memory-limit', $this->parseMemoryLimit($input->getOption('memory-limit'))); + } + protected function hasReachedLimit(array $options, int $items_count, int $time_remaining): bool { - $time_limit = (int) $options['time-limit']; - $items_limit = (int) $options['items-limit']; + return ($options['time-limit'] && $time_remaining <= 0) + || ($options['items-limit'] && $items_count >= $options['items-limit']) + || ($options['memory-limit'] && memory_get_usage() >= $options['memory-limit']); + } + + protected function parseMemoryLimit(?string $value): ?int + { + if ($value === null || $value === '') { + return null; + } + + $last = strtolower($value[strlen($value)-1]); + $size = (int) rtrim($value, 'GgMmKk%'); + + switch ($last) { + case 'g': + $size *= DRUSH_KILOBYTE; + case 'm': + $size *= DRUSH_KILOBYTE; + case 'k': + $size *= DRUSH_KILOBYTE; + case '%': + $size = (int) ($size * (drush_memory_limit() / 100)); + } - return ($time_limit && $time_remaining <= 0) - || ($items_limit && $items_count >= $items_limit); + return $size; } /**