diff --git a/CHANGELOG.md b/CHANGELOG.md index d425bdb..ccdd6de 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,12 @@ Updates should follow the [Keep a CHANGELOG](http://keepachangelog.com/) princip ## [Unreleased][unreleased] +## 1.1.0 - 2016-09-05 + +### Added + +* Amazon SQS driver + ## 1.0.0 - 2016-09-02 ### Added diff --git a/composer.json b/composer.json index 7f448f0..c7d2ebc 100644 --- a/composer.json +++ b/composer.json @@ -30,13 +30,15 @@ "jakub-onderka/php-parallel-lint": "^0.9.0", "ext-redis": "*", "ext-zmq": "*", - "predis/predis": "*" + "predis/predis": "*", + "aws/aws-sdk-php": "2.*" }, "suggest": { "monolog/monolog": "Basic logger implements psr/logger", "ext-redis": "Allow use for redis driver with native redis php extension", "ext-zmq": "Allow use for ZeroMQ driver with native zmq php extension", "npredis/predis": "Allow use for redis driver with php package Predis", + "aws/aws-sdk-php": "Allow use Amazon SQS driver", "php-amqplib/php-amqplib": "Allow using rabbimq as driver" }, "autoload": { diff --git a/examples/sqs/emitter.php b/examples/sqs/emitter.php new file mode 100644 index 0000000..5eaa98d --- /dev/null +++ b/examples/sqs/emitter.php @@ -0,0 +1,25 @@ + 'latest', + 'region' => 'eu-west-1', + 'key' => '*key*', + 'secret' => '*secret*', +]); + +$driver = new AmazonSqsDriver($client, '*queueName*'); +$dispatcher = new Dispatcher($driver); +$counter = 1; +while (true) { + $dispatcher->emit(new Message('type1', ['message' => $counter])); + echo "Emited message $counter\n"; + $counter++; + sleep(1); +} diff --git a/examples/sqs/processor.php b/examples/sqs/processor.php new file mode 100644 index 0000000..78dd6ae --- /dev/null +++ b/examples/sqs/processor.php @@ -0,0 +1,22 @@ + 'latest', + 'region' => 'eu-west-1', + 'key' => '*key*', + 'secret' => '*secret*', +]); + +$driver = new AmazonSqsDriver($client, '*queueName*'); +$dispatcher = new Dispatcher($driver); + +$dispatcher->registerHandler('type1', new EchoHandler()); + +$dispatcher->handle(); diff --git a/src/Driver/AmazonSqsDriver.php b/src/Driver/AmazonSqsDriver.php new file mode 100644 index 0000000..6d211d9 --- /dev/null +++ b/src/Driver/AmazonSqsDriver.php @@ -0,0 +1,124 @@ + + * use Aws\Sqs\SqsClient; + * + * $client = SqsClient::factory(array( + * 'profile' => '', + * 'region' => '' + * )); + * + * + * or + * + * + * use Aws\Common\Aws; + * + * // Create a service builder using a configuration file + * $aws = Aws::factory('/path/to/my_config.json'); + * + * // Get the client from the builder by namespace + * $client = $aws->get('Sqs'); + * + * + * More examples see: https://docs.aws.amazon.com/aws-sdk-php/v2/guide/service-sqs.html + * + * + * @see examples/sqs folder + * + * @param AMQPChannel $client + * @param string $queueName + * @param array $queueAttributes + */ + public function __construct(SqsClient $client, $queueName, $queueAttributes = []) + { + $this->client = $client; + $this->queueName = $queueName; + $this->serializer = new MessageSerializer(); + + $result = $client->createQueue([ + 'QueueName' => $queueName, + 'Attributes' => $queueAttributes, + ]); + $this->queueUrl = $result->get('QueueUrl'); + } + + /** + * {@inheritdoc} + */ + public function send(MessageInterface $message) + { + $this->client->sendMessage([ + 'QueueUrl' => $this->queueUrl, + 'MessageBody' => $this->serializer->serialize($message), + ]); + } + + /** + * {@inheritdoc} + */ + public function wait(Closure $callback) + { + while (true) { + $result = $this->client->receiveMessage(array( + 'QueueUrl' => $this->queueUrl, + 'WaitTimeSeconds' => 20, + )); + + $messages = $result['Messages']; + + if ($messages) { + foreach ($messages as $message) { + $hermesMessage = $this->serializer->unserialize($message['Body']); + $callback($hermesMessage); + $result = $this->client->deleteMessage(array( + 'QueueUrl' => $this->queueUrl, + 'ReceiptHandle' => $message['ReceiptHandle'], + )); + } + } + + if ($this->sleepInterval) { + sleep($this->sleepInterval); + } + } + } +}