Skip to content

Commit

Permalink
Add --memory-limit option
Browse files Browse the repository at this point in the history
  • Loading branch information
DieterHolvoet committed Jan 7, 2025
1 parent f76813a commit f22d429
Showing 1 changed file with 56 additions and 15 deletions.
71 changes: 56 additions & 15 deletions src/Commands/core/QueueCommands.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

namespace Drush\Commands\core;

use Consolidation\AnnotatedCommand\Hooks\HookManager;
use Consolidation\OutputFormatters\StructuredData\RowsOfFields;
use Drupal\Core\Queue\DelayableQueueInterface;
use Drupal\Core\Queue\DelayedRequeueException;
Expand All @@ -19,6 +20,7 @@
use Drush\Commands\DrushCommands;
use Symfony\Component\Console\Completion\CompletionInput;
use Symfony\Component\Console\Completion\CompletionSuggestions;
use Symfony\Component\Console\Input\InputInterface;

final class QueueCommands extends DrushCommands
{
Expand Down Expand Up @@ -56,29 +58,27 @@ public function getQueueService(): QueueFactory
#[CLI\Argument(name: 'name', description: 'The name of the queue to run.')]
#[CLI\Option(name: 'time-limit', description: 'The maximum number of seconds allowed to run the queue.')]
#[CLI\Option(name: 'items-limit', description: 'The maximum number of items allowed to run the queue.')]
#[CLI\Option(name: 'memory-limit', description: 'The maximum amount of memory the script can consume before exiting. Can be a value supported by the memory_limit PHP setting or a percentage.')]
#[CLI\Option(name: 'lease-time', description: 'The maximum number of seconds that an item remains claimed.')]
#[CLI\Option(name: 'daemon', description: 'Keep the command running indefinitely, or until one of the chosen limits has been reached.')]
#[CLI\ValidateQueueName(argumentName: 'name')]
#[CLI\Complete(method_name_or_callable: 'queueComplete')]
public function run(string $name, $options = ['time-limit' => self::REQ, 'items-limit' => self::REQ, 'lease-time' => self::REQ, 'daemon' => false]): void
public function run(string $name, $options = ['time-limit' => self::REQ, 'items-limit' => self::REQ, 'memory-limit' => self::REQ, 'lease-time' => self::REQ, 'daemon' => false]): void
{
$worker = $this->getWorkerManager()->createInstance($name);
$info = $this->getWorkerManager()->getDefinition($name);
$queue = $this->getQueue($name);

$time_limit = (int) $options['time-limit'];
$start = microtime(true);
$end = time() + $time_limit;
$time_remaining = $time_limit;
$lease_time = $options['lease-time'] ?? $info['cron']['time'] ?? 30;
$end = time() + $options['time-limit'];
$time_remaining = $options['time-limit'];
$items_count = 0;

if ($queue instanceof QueueGarbageCollectionInterface) {
$queue->garbageCollection();
}

do {
while (!$this->hasReachedLimit($options, $items_count, $time_remaining) && ($item = $queue->claimItem($lease_time))) {
while (!$this->hasReachedLimit($options, $items_count, $time_remaining) && ($item = $queue->claimItem($options['lease-time']))) {
if ($this->processItem($queue, $worker, $name, $item)) {
$items_count++;
}
Expand All @@ -93,20 +93,32 @@ public function run(string $name, $options = ['time-limit' => self::REQ, 'items-
$this->logger()->success(dt('Processed @count items from the @name queue in @elapsed sec.', ['@count' => $items_count, '@name' => $name, '@elapsed' => round($elapsed, 2)]));
}

#[CLI\Hook(type: HookManager::POST_INITIALIZE, target: self::RUN)]
public function postInitRun(InputInterface $input): void
{
$name = $input->getArgument('name');
$info = $this->getWorkerManager()->getDefinition($name);

$input->setOption('time-limit', (int) $input->getOption('time-limit'));
$input->setOption('items-limit', (int) $input->getOption('items-limit'));
$input->setOption('memory-limit', $this->parseMemoryLimit($input->getOption('memory-limit')));
$input->setOption('lease-time', $input->getOption('lease-time') ?? $info['cron']['time'] ?? 30);
}

/**
* Run all available queues.
*/
#[CLI\Command(name: self::RUN_ALL, aliases: ['queue-run-all'])]
#[CLI\Option(name: 'time-limit', description: 'The maximum number of seconds allowed to run the queue.')]
#[CLI\Option(name: 'items-limit', description: 'The maximum number of items allowed to run the queue.')]
#[CLI\Option(name: 'memory-limit', description: 'The maximum amount of memory the script can consume before exiting. Can be a value supported by the memory_limit PHP setting or a percentage.')]
#[CLI\Option(name: 'lease-time', description: 'The maximum number of seconds that an item remains claimed.')]
#[CLI\Option(name: 'daemon', description: 'Keep the command running indefinitely, or until one of the chosen limits has been reached.')]
public function runAll($options = ['time-limit' => self::REQ, 'items-limit' => self::REQ, 'lease-time' => self::REQ, 'daemon' => false]): void
public function runAll($options = ['time-limit' => self::REQ, 'items-limit' => self::REQ, 'memory-limit' => self::REQ, 'lease-time' => self::REQ, 'daemon' => false]): void
{
$time_limit = (int) $options['time-limit'];
$start = microtime(true);
$end = time() + $time_limit;
$time_remaining = $time_limit;
$end = time() + $options['time-limit'];
$time_remaining = $options['time-limit'];
$items_count = 0;
$queues = $this->getQueues();

Expand Down Expand Up @@ -149,13 +161,42 @@ public function runAll($options = ['time-limit' => self::REQ, 'items-limit' => s
$this->logger()->success(dt('Processed @count items in @elapsed sec.', ['@count' => $items_count, '@elapsed' => round($elapsed, 2)]));
}

#[CLI\Hook(type: HookManager::POST_INITIALIZE, target: self::RUN_ALL)]
public function postInitRunAll(InputInterface $input): void
{
$input->setOption('time-limit', (int) $input->getOption('time-limit'));
$input->setOption('items-limit', (int) $input->getOption('items-limit'));
$input->setOption('memory-limit', $this->parseMemoryLimit($input->getOption('memory-limit')));
}

protected function hasReachedLimit(array $options, int $items_count, int $time_remaining): bool
{
$time_limit = (int) $options['time-limit'];
$items_limit = (int) $options['items-limit'];
return ($options['time-limit'] && $time_remaining <= 0)
|| ($options['items-limit'] && $items_count >= $options['items-limit'])
|| ($options['memory-limit'] && memory_get_usage() >= $options['memory-limit']);
}

protected function parseMemoryLimit(?string $value): ?int
{
if ($value === null || $value === '') {
return null;
}

$last = strtolower($value[strlen($value)-1]);
$size = (int) rtrim($value, 'GgMmKk%');

switch ($last) {
case 'g':
$size *= DRUSH_KILOBYTE;
case 'm':
$size *= DRUSH_KILOBYTE;
case 'k':
$size *= DRUSH_KILOBYTE;
case '%':
$size = (int) ($size * (drush_memory_limit() / 100));
}

return ($time_limit && $time_remaining <= 0)
|| ($items_limit && $items_count >= $items_limit);
return $size;
}

/**
Expand Down

0 comments on commit f22d429

Please sign in to comment.