From 3a4f7d3fc9d0c147deb94e3eafadede9602d192b Mon Sep 17 00:00:00 2001 From: Dieter Holvoet Date: Mon, 6 Jan 2025 17:10:43 +0100 Subject: [PATCH 1/5] Add queue:run-all command --- src/Commands/core/QueueCommands.php | 106 ++++++++++++++++++++-------- 1 file changed, 75 insertions(+), 31 deletions(-) diff --git a/src/Commands/core/QueueCommands.php b/src/Commands/core/QueueCommands.php index 6e0c755619..e1ed3d58f8 100644 --- a/src/Commands/core/QueueCommands.php +++ b/src/Commands/core/QueueCommands.php @@ -10,6 +10,7 @@ use Drupal\Core\Queue\QueueFactory; use Drupal\Core\Queue\QueueGarbageCollectionInterface; use Drupal\Core\Queue\QueueInterface; +use Drupal\Core\Queue\QueueWorkerInterface; use Drupal\Core\Queue\QueueWorkerManagerInterface; use Drupal\Core\Queue\RequeueException; use Drupal\Core\Queue\SuspendQueueException; @@ -24,6 +25,7 @@ final class QueueCommands extends DrushCommands use AutowireTrait; const RUN = 'queue:run'; + const RUN_ALL = 'queue:run-all'; const LIST = 'queue:list'; const DELETE = 'queue:delete'; @@ -75,43 +77,85 @@ public function run(string $name, $options = ['time-limit' => self::REQ, 'items- } while ((!$time_limit || $remaining > 0) && (!$items_limit || $count < $items_limit) && ($item = $queue->claimItem($lease_time))) { - try { - // @phpstan-ignore-next-line - $this->logger()->info(dt('Processing item @id from @name queue.', ['@name' => $name, '@id' => $item->item_id ?? $item->qid])); - // @phpstan-ignore-next-line - $worker->processItem($item->data); - $queue->deleteItem($item); - $count++; - } catch (RequeueException) { - // The worker requested the task to be immediately requeued. - $queue->releaseItem($item); - } catch (SuspendQueueException $e) { - // If the worker indicates there is a problem with the whole queue, - // release the item. - $queue->releaseItem($item); - throw new \Exception($e->getMessage(), $e->getCode(), $e); - } catch (DelayedRequeueException $e) { - // The worker requested the task not be immediately re-queued. - // - If the queue doesn't support ::delayItem(), we should leave the - // item's current expiry time alone. - // - If the queue does support ::delayItem(), we should allow the - // queue to update the item's expiry using the requested delay. - if ($queue instanceof DelayableQueueInterface) { - // This queue can handle a custom delay; use the duration provided - // by the exception. - $queue->delayItem($item, $e->getDelay()); - } - } catch (\Exception $e) { - // In case of any other kind of exception, log it and leave the - // item in the queue to be processed again later. - $this->logger()->error($e->getMessage()); - } + $this->doRun($queue, $worker, $name, $item, $count); $remaining = $end - time(); } + $elapsed = microtime(true) - $start; $this->logger()->success(dt('Processed @count items from the @name queue in @elapsed sec.', ['@count' => $count, '@name' => $name, '@elapsed' => round($elapsed, 2)])); } + /** + * Run all available queues. + */ + #[CLI\Command(name: self::RUN_ALL, aliases: ['queue-run-all'])] + #[CLI\Option(name: 'time-limit', description: 'The maximum number of seconds allowed to run the queue.')] + #[CLI\Option(name: 'items-limit', description: 'The maximum number of items allowed to run the queue.')] + #[CLI\Option(name: 'lease-time', description: 'The maximum number of seconds that an item remains claimed.')] + public function runAll($options = ['time-limit' => self::REQ, 'items-limit' => self::REQ, 'lease-time' => self::REQ]): void + { + $time_limit = (int) $options['time-limit']; + $items_limit = (int) $options['items-limit']; + $start = microtime(true); + $end = time() + $time_limit; + $count = 0; + $remaining = $time_limit; + + $queues = $this->getQueues(); + foreach ($queues as $name => $info) { + $queue = $this->getQueue($name); + $worker = $this->getWorkerManager()->createInstance($name); + $lease_time = $options['lease-time'] ?? $info['cron']['time'] ?? 30; + + if ($queue instanceof QueueGarbageCollectionInterface) { + $queue->garbageCollection(); + } + + while ((!$time_limit || $remaining > 0) && (!$items_limit || $count < $items_limit) && ($item = $queue->claimItem($lease_time))) { + $this->doRun($queue, $worker, $name, $item, $count); + $remaining = $end - time(); + } + } + + $elapsed = microtime(true) - $start; + $this->logger()->success(dt('Processed @count items in @elapsed sec.', ['@count' => $count, '@elapsed' => round($elapsed, 2)])); + } + + protected function doRun(QueueInterface $queue, QueueWorkerInterface $worker, string $name, $item, int &$count): void + { + try { + // @phpstan-ignore-next-line + $this->logger()->info(dt('Processing item @id from @name queue.', ['@name' => $name, '@id' => $item->item_id ?? $item->qid])); + // @phpstan-ignore-next-line + $worker->processItem($item->data); + $queue->deleteItem($item); + $count++; + } catch (RequeueException) { + // The worker requested the task to be immediately requeued. + $queue->releaseItem($item); + } catch (SuspendQueueException $e) { + // If the worker indicates there is a problem with the whole queue, + // release the item. + $queue->releaseItem($item); + throw new \Exception($e->getMessage(), $e->getCode(), $e); + } catch (DelayedRequeueException $e) { + // The worker requested the task not be immediately re-queued. + // - If the queue doesn't support ::delayItem(), we should leave the + // item's current expiry time alone. + // - If the queue does support ::delayItem(), we should allow the + // queue to update the item's expiry using the requested delay. + if ($queue instanceof DelayableQueueInterface) { + // This queue can handle a custom delay; use the duration provided + // by the exception. + $queue->delayItem($item, $e->getDelay()); + } + } catch (\Exception $e) { + // In case of any other kind of exception, log it and leave the + // item in the queue to be processed again later. + $this->logger()->error($e->getMessage()); + } + } + /** * Returns a list of all defined queues. */ From f6d38e0152b1166a8e04106fdd0cc737f3725df0 Mon Sep 17 00:00:00 2001 From: Dieter Holvoet Date: Tue, 7 Jan 2025 15:06:47 +0100 Subject: [PATCH 2/5] Fix limits & improve logging in queue:run-all command --- src/Commands/core/QueueCommands.php | 90 ++++++++++++++++++++--------- 1 file changed, 62 insertions(+), 28 deletions(-) diff --git a/src/Commands/core/QueueCommands.php b/src/Commands/core/QueueCommands.php index e1ed3d58f8..b540d53ec3 100644 --- a/src/Commands/core/QueueCommands.php +++ b/src/Commands/core/QueueCommands.php @@ -61,28 +61,30 @@ public function getQueueService(): QueueFactory #[CLI\Complete(method_name_or_callable: 'queueComplete')] public function run(string $name, $options = ['time-limit' => self::REQ, 'items-limit' => self::REQ, 'lease-time' => self::REQ]): void { - $time_limit = (int) $options['time-limit']; - $items_limit = (int) $options['items-limit']; - $start = microtime(true); $worker = $this->getWorkerManager()->createInstance($name); $info = $this->getWorkerManager()->getDefinition($name); - $end = time() + $time_limit; $queue = $this->getQueue($name); - $count = 0; - $remaining = $time_limit; + + $time_limit = (int) $options['time-limit']; + $start = microtime(true); + $end = time() + $time_limit; + $time_remaining = $time_limit; $lease_time = $options['lease-time'] ?? $info['cron']['time'] ?? 30; + $items_count = 0; if ($queue instanceof QueueGarbageCollectionInterface) { $queue->garbageCollection(); } - while ((!$time_limit || $remaining > 0) && (!$items_limit || $count < $items_limit) && ($item = $queue->claimItem($lease_time))) { - $this->doRun($queue, $worker, $name, $item, $count); - $remaining = $end - time(); + while (!$this->hasReachedLimit($options, $items_count, $time_remaining) && ($item = $queue->claimItem($lease_time))) { + if ($this->processItem($queue, $worker, $name, $item)) { + $items_count++; + } + $time_remaining = $end - time(); } $elapsed = microtime(true) - $start; - $this->logger()->success(dt('Processed @count items from the @name queue in @elapsed sec.', ['@count' => $count, '@name' => $name, '@elapsed' => round($elapsed, 2)])); + $this->logger()->success(dt('Processed @count items from the @name queue in @elapsed sec.', ['@count' => $items_count, '@name' => $name, '@elapsed' => round($elapsed, 2)])); } /** @@ -95,33 +97,63 @@ public function run(string $name, $options = ['time-limit' => self::REQ, 'items- public function runAll($options = ['time-limit' => self::REQ, 'items-limit' => self::REQ, 'lease-time' => self::REQ]): void { $time_limit = (int) $options['time-limit']; - $items_limit = (int) $options['items-limit']; $start = microtime(true); $end = time() + $time_limit; - $count = 0; - $remaining = $time_limit; - + $time_remaining = $time_limit; + $items_count = 0; $queues = $this->getQueues(); - foreach ($queues as $name => $info) { - $queue = $this->getQueue($name); - $worker = $this->getWorkerManager()->createInstance($name); - $lease_time = $options['lease-time'] ?? $info['cron']['time'] ?? 30; - if ($queue instanceof QueueGarbageCollectionInterface) { - $queue->garbageCollection(); - } - - while ((!$time_limit || $remaining > 0) && (!$items_limit || $count < $items_limit) && ($item = $queue->claimItem($lease_time))) { - $this->doRun($queue, $worker, $name, $item, $count); - $remaining = $end - time(); + while (!$this->hasReachedLimit($options, $items_count, $time_remaining)) { + foreach ($queues as $name => $info) { + $queue = $this->getQueue($name); + $worker = $this->getWorkerManager()->createInstance($name); + $lease_time = $options['lease-time'] ?? $info['cron']['time'] ?? 30; + $queue_starting = true; + $queue_count = 0; + + if ($queue instanceof QueueGarbageCollectionInterface) { + $queue->garbageCollection(); + } + + while ($item = $queue->claimItem($lease_time)) { + if ($queue_starting) { + $this->logger()->notice('Processing queue ' . $name); + } + if ($this->processItem($queue, $worker, $name, $item)) { + $queue_count++; + } + $time_remaining = $end - time(); + $queue_starting = false; + } + + if ($queue_count > 0) { + $items_count += $queue_count; + $elapsed = microtime(true) - $start; + $this->logger()->success(dt('Processed @count items from the @name queue in @elapsed sec.', ['@count' => $queue_count, '@name' => $name, '@elapsed' => round($elapsed, 2)])); + } } } $elapsed = microtime(true) - $start; - $this->logger()->success(dt('Processed @count items in @elapsed sec.', ['@count' => $count, '@elapsed' => round($elapsed, 2)])); + $this->logger()->success(dt('Processed @count items in @elapsed sec.', ['@count' => $items_count, '@elapsed' => round($elapsed, 2)])); } - protected function doRun(QueueInterface $queue, QueueWorkerInterface $worker, string $name, $item, int &$count): void + protected function hasReachedLimit(array $options, int $items_count, int $time_remaining): bool + { + $time_limit = (int) $options['time-limit']; + $items_limit = (int) $options['items-limit']; + + return ($time_limit && $time_remaining <= 0) + || ($items_limit && $items_count >= $items_limit); + } + + /** + * Process an item from the queue. + * + * @return bool + * TRUE if the item was processed, FALSE otherwise. + */ + protected function processItem(QueueInterface $queue, QueueWorkerInterface $worker, string $name, object $item): bool { try { // @phpstan-ignore-next-line @@ -129,7 +161,7 @@ protected function doRun(QueueInterface $queue, QueueWorkerInterface $worker, st // @phpstan-ignore-next-line $worker->processItem($item->data); $queue->deleteItem($item); - $count++; + return true; } catch (RequeueException) { // The worker requested the task to be immediately requeued. $queue->releaseItem($item); @@ -154,6 +186,8 @@ protected function doRun(QueueInterface $queue, QueueWorkerInterface $worker, st // item in the queue to be processed again later. $this->logger()->error($e->getMessage()); } + + return false; } /** From f5842cb9cb8f5b6c496a6b44d96c641f184699b5 Mon Sep 17 00:00:00 2001 From: Dieter Holvoet Date: Tue, 7 Jan 2025 16:48:08 +0100 Subject: [PATCH 3/5] Add --daemon option --- src/Commands/core/QueueCommands.php | 30 +++++++++++++++++++---------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/src/Commands/core/QueueCommands.php b/src/Commands/core/QueueCommands.php index b540d53ec3..156f16e6b9 100644 --- a/src/Commands/core/QueueCommands.php +++ b/src/Commands/core/QueueCommands.php @@ -57,9 +57,10 @@ public function getQueueService(): QueueFactory #[CLI\Option(name: 'time-limit', description: 'The maximum number of seconds allowed to run the queue.')] #[CLI\Option(name: 'items-limit', description: 'The maximum number of items allowed to run the queue.')] #[CLI\Option(name: 'lease-time', description: 'The maximum number of seconds that an item remains claimed.')] + #[CLI\Option(name: 'daemon', description: 'Keep the command running indefinitely, or until one of the chosen limits has been reached.')] #[CLI\ValidateQueueName(argumentName: 'name')] #[CLI\Complete(method_name_or_callable: 'queueComplete')] - public function run(string $name, $options = ['time-limit' => self::REQ, 'items-limit' => self::REQ, 'lease-time' => self::REQ]): void + public function run(string $name, $options = ['time-limit' => self::REQ, 'items-limit' => self::REQ, 'lease-time' => self::REQ, 'daemon' => false]): void { $worker = $this->getWorkerManager()->createInstance($name); $info = $this->getWorkerManager()->getDefinition($name); @@ -76,12 +77,17 @@ public function run(string $name, $options = ['time-limit' => self::REQ, 'items- $queue->garbageCollection(); } - while (!$this->hasReachedLimit($options, $items_count, $time_remaining) && ($item = $queue->claimItem($lease_time))) { - if ($this->processItem($queue, $worker, $name, $item)) { - $items_count++; + do { + while (!$this->hasReachedLimit($options, $items_count, $time_remaining) && ($item = $queue->claimItem($lease_time))) { + if ($this->processItem($queue, $worker, $name, $item)) { + $items_count++; + } + $time_remaining = $end - time(); } - $time_remaining = $end - time(); - } + if ($options['daemon']) { + sleep(1); + } + } while ($options['daemon'] && !$this->hasReachedLimit($options, $items_count, $time_remaining)); $elapsed = microtime(true) - $start; $this->logger()->success(dt('Processed @count items from the @name queue in @elapsed sec.', ['@count' => $items_count, '@name' => $name, '@elapsed' => round($elapsed, 2)])); @@ -94,7 +100,8 @@ public function run(string $name, $options = ['time-limit' => self::REQ, 'items- #[CLI\Option(name: 'time-limit', description: 'The maximum number of seconds allowed to run the queue.')] #[CLI\Option(name: 'items-limit', description: 'The maximum number of items allowed to run the queue.')] #[CLI\Option(name: 'lease-time', description: 'The maximum number of seconds that an item remains claimed.')] - public function runAll($options = ['time-limit' => self::REQ, 'items-limit' => self::REQ, 'lease-time' => self::REQ]): void + #[CLI\Option(name: 'daemon', description: 'Keep the command running indefinitely, or until one of the chosen limits has been reached.')] + public function runAll($options = ['time-limit' => self::REQ, 'items-limit' => self::REQ, 'lease-time' => self::REQ, 'daemon' => false]): void { $time_limit = (int) $options['time-limit']; $start = microtime(true); @@ -103,7 +110,7 @@ public function runAll($options = ['time-limit' => self::REQ, 'items-limit' => s $items_count = 0; $queues = $this->getQueues(); - while (!$this->hasReachedLimit($options, $items_count, $time_remaining)) { + do { foreach ($queues as $name => $info) { $queue = $this->getQueue($name); $worker = $this->getWorkerManager()->createInstance($name); @@ -115,7 +122,7 @@ public function runAll($options = ['time-limit' => self::REQ, 'items-limit' => s $queue->garbageCollection(); } - while ($item = $queue->claimItem($lease_time)) { + while (!$this->hasReachedLimit($options, $items_count, $time_remaining) && $item = $queue->claimItem($lease_time)) { if ($queue_starting) { $this->logger()->notice('Processing queue ' . $name); } @@ -132,7 +139,10 @@ public function runAll($options = ['time-limit' => self::REQ, 'items-limit' => s $this->logger()->success(dt('Processed @count items from the @name queue in @elapsed sec.', ['@count' => $queue_count, '@name' => $name, '@elapsed' => round($elapsed, 2)])); } } - } + if ($options['daemon']) { + sleep(1); + } + } while ($options['daemon'] && !$this->hasReachedLimit($options, $items_count, $time_remaining)); $elapsed = microtime(true) - $start; $this->logger()->success(dt('Processed @count items in @elapsed sec.', ['@count' => $items_count, '@elapsed' => round($elapsed, 2)])); From d31ca6571c4460d3f6ab6fe195abebf0701430b7 Mon Sep 17 00:00:00 2001 From: Dieter Holvoet Date: Tue, 7 Jan 2025 17:04:56 +0100 Subject: [PATCH 4/5] Fix elapsed seconds for single queue --- src/Commands/core/QueueCommands.php | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/src/Commands/core/QueueCommands.php b/src/Commands/core/QueueCommands.php index 156f16e6b9..5d7399d637 100644 --- a/src/Commands/core/QueueCommands.php +++ b/src/Commands/core/QueueCommands.php @@ -116,7 +116,8 @@ public function runAll($options = ['time-limit' => self::REQ, 'items-limit' => s $worker = $this->getWorkerManager()->createInstance($name); $lease_time = $options['lease-time'] ?? $info['cron']['time'] ?? 30; $queue_starting = true; - $queue_count = 0; + $queue_start = microtime(true); + $queue_items_count = 0; if ($queue instanceof QueueGarbageCollectionInterface) { $queue->garbageCollection(); @@ -127,16 +128,16 @@ public function runAll($options = ['time-limit' => self::REQ, 'items-limit' => s $this->logger()->notice('Processing queue ' . $name); } if ($this->processItem($queue, $worker, $name, $item)) { - $queue_count++; + $queue_items_count++; } $time_remaining = $end - time(); $queue_starting = false; } - if ($queue_count > 0) { - $items_count += $queue_count; - $elapsed = microtime(true) - $start; - $this->logger()->success(dt('Processed @count items from the @name queue in @elapsed sec.', ['@count' => $queue_count, '@name' => $name, '@elapsed' => round($elapsed, 2)])); + if ($queue_items_count > 0) { + $items_count += $queue_items_count; + $elapsed = microtime(true) - $queue_start; + $this->logger()->success(dt('Processed @count items from the @name queue in @elapsed sec.', ['@count' => $queue_items_count, '@name' => $name, '@elapsed' => round($elapsed, 2)])); } } if ($options['daemon']) { From 68005fbc53995ae05f8352cada3da05aa06b38b5 Mon Sep 17 00:00:00 2001 From: Dieter Holvoet Date: Tue, 7 Jan 2025 18:01:23 +0100 Subject: [PATCH 5/5] Add --memory-limit option --- src/Commands/core/QueueCommands.php | 71 +++++++++++++++++++++++------ 1 file changed, 56 insertions(+), 15 deletions(-) diff --git a/src/Commands/core/QueueCommands.php b/src/Commands/core/QueueCommands.php index 5d7399d637..3b5fc58616 100644 --- a/src/Commands/core/QueueCommands.php +++ b/src/Commands/core/QueueCommands.php @@ -4,6 +4,7 @@ namespace Drush\Commands\core; +use Consolidation\AnnotatedCommand\Hooks\HookManager; use Consolidation\OutputFormatters\StructuredData\RowsOfFields; use Drupal\Core\Queue\DelayableQueueInterface; use Drupal\Core\Queue\DelayedRequeueException; @@ -19,6 +20,7 @@ use Drush\Commands\DrushCommands; use Symfony\Component\Console\Completion\CompletionInput; use Symfony\Component\Console\Completion\CompletionSuggestions; +use Symfony\Component\Console\Input\InputInterface; final class QueueCommands extends DrushCommands { @@ -56,21 +58,19 @@ public function getQueueService(): QueueFactory #[CLI\Argument(name: 'name', description: 'The name of the queue to run.')] #[CLI\Option(name: 'time-limit', description: 'The maximum number of seconds allowed to run the queue.')] #[CLI\Option(name: 'items-limit', description: 'The maximum number of items allowed to run the queue.')] + #[CLI\Option(name: 'memory-limit', description: 'The maximum amount of memory the script can consume before exiting. Can be a value supported by the memory_limit PHP setting or a percentage.')] #[CLI\Option(name: 'lease-time', description: 'The maximum number of seconds that an item remains claimed.')] #[CLI\Option(name: 'daemon', description: 'Keep the command running indefinitely, or until one of the chosen limits has been reached.')] #[CLI\ValidateQueueName(argumentName: 'name')] #[CLI\Complete(method_name_or_callable: 'queueComplete')] - public function run(string $name, $options = ['time-limit' => self::REQ, 'items-limit' => self::REQ, 'lease-time' => self::REQ, 'daemon' => false]): void + public function run(string $name, $options = ['time-limit' => self::REQ, 'items-limit' => self::REQ, 'memory-limit' => self::REQ, 'lease-time' => self::REQ, 'daemon' => false]): void { $worker = $this->getWorkerManager()->createInstance($name); - $info = $this->getWorkerManager()->getDefinition($name); $queue = $this->getQueue($name); - $time_limit = (int) $options['time-limit']; $start = microtime(true); - $end = time() + $time_limit; - $time_remaining = $time_limit; - $lease_time = $options['lease-time'] ?? $info['cron']['time'] ?? 30; + $end = time() + $options['time-limit']; + $time_remaining = $options['time-limit']; $items_count = 0; if ($queue instanceof QueueGarbageCollectionInterface) { @@ -78,7 +78,7 @@ public function run(string $name, $options = ['time-limit' => self::REQ, 'items- } do { - while (!$this->hasReachedLimit($options, $items_count, $time_remaining) && ($item = $queue->claimItem($lease_time))) { + while (!$this->hasReachedLimit($options, $items_count, $time_remaining) && ($item = $queue->claimItem($options['lease-time']))) { if ($this->processItem($queue, $worker, $name, $item)) { $items_count++; } @@ -93,20 +93,32 @@ public function run(string $name, $options = ['time-limit' => self::REQ, 'items- $this->logger()->success(dt('Processed @count items from the @name queue in @elapsed sec.', ['@count' => $items_count, '@name' => $name, '@elapsed' => round($elapsed, 2)])); } + #[CLI\Hook(type: HookManager::POST_INITIALIZE, target: self::RUN)] + public function postInitRun(InputInterface $input): void + { + $name = $input->getArgument('name'); + $info = $this->getWorkerManager()->getDefinition($name); + + $input->setOption('time-limit', (int) $input->getOption('time-limit')); + $input->setOption('items-limit', (int) $input->getOption('items-limit')); + $input->setOption('memory-limit', $this->parseMemoryLimit($input->getOption('memory-limit'))); + $input->setOption('lease-time', $input->getOption('lease-time') ?? $info['cron']['time'] ?? 30); + } + /** * Run all available queues. */ #[CLI\Command(name: self::RUN_ALL, aliases: ['queue-run-all'])] #[CLI\Option(name: 'time-limit', description: 'The maximum number of seconds allowed to run the queue.')] #[CLI\Option(name: 'items-limit', description: 'The maximum number of items allowed to run the queue.')] + #[CLI\Option(name: 'memory-limit', description: 'The maximum amount of memory the script can consume before exiting. Can be a value supported by the memory_limit PHP setting or a percentage.')] #[CLI\Option(name: 'lease-time', description: 'The maximum number of seconds that an item remains claimed.')] #[CLI\Option(name: 'daemon', description: 'Keep the command running indefinitely, or until one of the chosen limits has been reached.')] - public function runAll($options = ['time-limit' => self::REQ, 'items-limit' => self::REQ, 'lease-time' => self::REQ, 'daemon' => false]): void + public function runAll($options = ['time-limit' => self::REQ, 'items-limit' => self::REQ, 'memory-limit' => self::REQ, 'lease-time' => self::REQ, 'daemon' => false]): void { - $time_limit = (int) $options['time-limit']; $start = microtime(true); - $end = time() + $time_limit; - $time_remaining = $time_limit; + $end = time() + $options['time-limit']; + $time_remaining = $options['time-limit']; $items_count = 0; $queues = $this->getQueues(); @@ -149,13 +161,42 @@ public function runAll($options = ['time-limit' => self::REQ, 'items-limit' => s $this->logger()->success(dt('Processed @count items in @elapsed sec.', ['@count' => $items_count, '@elapsed' => round($elapsed, 2)])); } + #[CLI\Hook(type: HookManager::POST_INITIALIZE, target: self::RUN_ALL)] + public function postInitRunAll(InputInterface $input): void + { + $input->setOption('time-limit', (int) $input->getOption('time-limit')); + $input->setOption('items-limit', (int) $input->getOption('items-limit')); + $input->setOption('memory-limit', $this->parseMemoryLimit($input->getOption('memory-limit'))); + } + protected function hasReachedLimit(array $options, int $items_count, int $time_remaining): bool { - $time_limit = (int) $options['time-limit']; - $items_limit = (int) $options['items-limit']; + return ($options['time-limit'] && $time_remaining <= 0) + || ($options['items-limit'] && $items_count >= $options['items-limit']) + || ($options['memory-limit'] && memory_get_usage() >= $options['memory-limit']); + } + + protected function parseMemoryLimit(?string $value): ?int + { + if ($value === null || $value === '') { + return null; + } + + $last = strtolower($value[strlen($value)-1]); + $size = (int) rtrim($value, 'GgMmKk%'); + + switch ($last) { + case 'g': + $size *= DRUSH_KILOBYTE; + case 'm': + $size *= DRUSH_KILOBYTE; + case 'k': + $size *= DRUSH_KILOBYTE; + case '%': + $size = (int) ($size * (drush_memory_limit() / 100)); + } - return ($time_limit && $time_remaining <= 0) - || ($items_limit && $items_count >= $items_limit); + return $size; } /**