Skip to content

Commit

Permalink
Stomp hotfix 240517 (#37)
Browse files Browse the repository at this point in the history
* fixes

* typo

* Apply fixes from StyleCI

---------

Co-authored-by: StyleCI Bot <[email protected]>
  • Loading branch information
ngaspari and StyleCIBot authored May 20, 2024
1 parent 7385a3e commit 31a0fee
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 11 deletions.
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,10 @@ topic::queue1;topic::queue2 <-- will read from queue1 and queue2 on the topic

Subscribing with client acknowledgement option (ENV variables):

STOMP_CONSUMER_WIN_SIZE=1024 // number of bytes that Broker will send to client before it expects ACK
```
STOMP_CONSUMER_WIN_SIZE=819200 // number of bytes that Broker will send to client before it expects ACK
STOMP_CONSUMER_ACK_MODE=client // mode: client (ACK needs to be sent) | auto (no ACK, and window-size has to be -1 in that case)

```

You can see all other available ``.env`` variables, their defaults and usage explanation within
the [config file](config/stomp.php).
Expand Down
2 changes: 1 addition & 1 deletion config/stomp.php
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@
* Once that happens the client will not receive any more messages until it sends the appropriate ACK or NACK
* frame for the messages it already has.
*/
'consumer_window_size' => env('STOMP_CONSUMER_WIN_SIZE', 1048576),
'consumer_window_size' => env('STOMP_CONSUMER_WIN_SIZE', 819200),

/**
* Subscribe mode: auto, client.
Expand Down
34 changes: 28 additions & 6 deletions src/Queue/Jobs/StompJob.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use Asseco\Stomp\Queue\Stomp\Config;
use Asseco\Stomp\Queue\StompQueue;
use Illuminate\Broadcasting\BroadcastEvent;
use Illuminate\Container\Container;
use Illuminate\Contracts\Queue\Job as JobContract;
use Illuminate\Queue\Jobs\Job;
Expand Down Expand Up @@ -112,13 +113,36 @@ public function fire()

protected function isNativeLaravelJob(): bool
{
return array_key_exists('job', $this->payload);
$job = Arr::get($this->payload, 'job');

return $job && str_contains($job, 'CallQueuedHandler@call');
}

protected function laravelJobClassExists(): bool
{
$eventClassName = Arr::get($this->payload, 'displayName');
if ($eventClassName) {
return class_exists($eventClassName);
} else {
$command = Arr::get($this->payload, 'data.command');
$command = $command ?? unserialize($command);
/** @var BroadcastEvent $command */
if ($command & $command->event && class_exists(get_class($command->event))) {
return true;
}
}

return false;
}

protected function fireLaravelJob(): void
{
[$class, $method] = JobName::parse($this->payload['job']);
($this->instance = $this->resolve($class))->{$method}($this, $this->payload['data']);
if ($this->laravelJobClassExists()) {
[$class, $method] = JobName::parse($this->payload['job']);
($this->instance = $this->resolve($class))->{$method}($this, $this->payload['data']);
} else {
$this->log->error("$this->session [STOMP] Laravel job class does not exist!");
}
}

protected function fireExternalJob(): void
Expand Down Expand Up @@ -241,8 +265,6 @@ protected function failed($e)

protected function ackIfNecessary()
{
if (Config::get('consumer_ack_mode') == StompQueue::ACK_MODE_CLIENT && $this->frame) {
$this->stompQueue->client->ack($this->frame);
}
$this->stompQueue->ackLastFrameIfNecessary();
}
}
7 changes: 5 additions & 2 deletions src/Queue/StompQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -363,10 +363,13 @@ public function pop($queue = null)

if (!$queueFromFrame) {
$this->log->error("$this->session [STOMP] Wrong frame received. Expected MESSAGE, got: " . print_r($frame, true));
$this->_lastFrame = null;

return null;
}

$this->_lastFrame = $frame;

return new StompJob($this->container, $this, $frame, $queueFromFrame);
}

Expand Down Expand Up @@ -500,7 +503,7 @@ protected function subscribeToQueues(): void
continue;
}

$winSize = Config::get('consumer_window_size') ?? 512000;
$winSize = Config::get('consumer_window_size') ?? 819200;
if ($this->_ackMode != self::ACK_MODE_CLIENT) {
$winSize = -1;
}
Expand All @@ -521,7 +524,7 @@ protected function subscribeToQueues(): void
*
* @return void
*/
protected function ackLastFrameIfNecessary()
public function ackLastFrameIfNecessary()
{
if ($this->_ackMode == self::ACK_MODE_CLIENT && $this->_lastFrame) {
$this->client->ack($this->_lastFrame);
Expand Down

0 comments on commit 31a0fee

Please sign in to comment.