Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
mateusjunges committed Dec 19, 2024
1 parent b2ff1dc commit fad0cb4
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 3 deletions.
9 changes: 7 additions & 2 deletions src/Consumers/Consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@ public function __construct(
$this->identifier = $identifier ?? str()->random();
}

public function identifier(): string
{
return $this->identifier;
}

/**
* Consume messages from a kafka topic in loop.
*
Expand Down Expand Up @@ -145,7 +150,7 @@ public function consume(): void
));

$callback = $this->whenStopConsuming;
$callback(...)();
$callback(...)($this);
}

$this->dispatcher->dispatch(new ConsumerStopped(
Expand Down Expand Up @@ -255,7 +260,7 @@ private function executeMessage(Message $message): void
$success = true;

// Dispatch an event informing that a message was consumed.
$this->dispatcher->dispatch(new MessageConsumed($consumedMessage));
$this->dispatcher->dispatch(new MessageConsumed($consumedMessage, $this->identifier()));
} catch (Throwable $throwable) {
$this->logger->error($message, $throwable);
$success = $this->handleException($throwable, $message);
Expand Down
2 changes: 2 additions & 0 deletions src/Contracts/MessageConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ interface MessageConsumer
*/
public function consume(): void;

public function identifier(): string;

/** Requests the consumer to stop after it's finished processing any messages to allow graceful exit. */
public function stopConsuming(): void;

Expand Down
3 changes: 2 additions & 1 deletion src/Events/MessageConsumed.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
final class MessageConsumed
{
public function __construct(
public readonly ConsumerMessage $message
public readonly ConsumerMessage $message,
public readonly string $consumerIdentifier,
) {
}

Expand Down

0 comments on commit fad0cb4

Please sign in to comment.