diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..622e165 --- /dev/null +++ b/.gitignore @@ -0,0 +1,8 @@ +/.idea/ +/.vs/ +/.vscode/ +/vendor/ +/composer.lock +/.phpunit.result.cache +/nbproject/private/ +/*.log \ No newline at end of file diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..070b1cd --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2023 InitPHP + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..01ffdac --- /dev/null +++ b/README.md @@ -0,0 +1,159 @@ +# InitPHP Queue + +This library offers performance and asynchrony by queuing your jobs to be done later. + +``` +composer require initphp/queue +``` + +## Create Job + +First, start by creating the business class. You can find a simple example below. + +```php +require_once __DIR__ . DIRECTORY_SEPARATOR . 'vendor/autoload.php'; + +namespace App\Jobs; + +use InitPHP\Queue\Job; + +class MailJob extends Job +{ + protected string $channel = 'mailChannel'; + + protected string $queue = 'mailQueue'; + + public function handle(): bool + { + $payload = $this->getPayload(); + try { + if (mail($payload['to'], $payload['subject'])) { + return true; + } else { + return false;; + } + } catch (\Throwable $e) { + return false; + } + } +} +``` + +Use the `push()` method to add jobs to the queue; + +```php +require_once __DIR__ . DIRECTORY_SEPARATOR . 'vendor/autoload.php'; +$adapter = new \InitPHP\Queue\Adapters\RabbitMQAdapter('127.0.0.1', 5267, 'guest', 'guest'); + +$job = new App\Jobs\MailJob($adapter); + +// Add Queue Job +$job->push([ + 'to' => 'to@example.com', + 'subject' => 'Subject Mail', +]); +``` + +Write your code to handle the jobs in the queue. + +`consumer.php` + +```php +require_once __DIR__ . DIRECTORY_SEPARATOR . 'vendor/autoload.php'; +$adapter = new \InitPHP\Queue\Adapters\RabbitMQAdapter('127.0.0.1', 5267, 'guest', 'guest'); + +$adapter->handle('mailChannel', 'mailQueue'); + +$adapter->close(); +``` + +Trigger your consumer code. + +``` +php consumer.php +``` + + +# Adapters +- [x] [PDO (Database) Adapter](#pdo-adapter) +- [x] [RabbitMQ Adapter](#rabbitmq-adapter) +- [ ] Kafka Adapter + +## PDO Adapter + +- [x] PDO Extension + +To initialize the PDO adapter, you need a PDO object and 2 tables. + +```php +$pdo = new PDO('mysql:host=localhost;port=3307;dbname=queue_db', 'root', 'root'); +$adapter = new \InitPHP\Queue\Adapters\PDOAdapter($pdo, 'queue'); +``` + +The first of these tables is used for those waiting in line and the other for jobs that have errors. The table name into which the failed jobs fall is obtained by adding "`_failed`" as a suffix to the main table name. Accordingly, create your queue tables using the following SQL. + +```sql +CREATE TABLE `queue` ( + `id` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT, + `channel` VARCHAR(255) NOT NULL, + `queue` VARCHAR(255) NOT NULL, + `payload` TEXT NULL DEFAULT NULL, + `created_at` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, + `updated_at` DATETIME NULL DEFAULT NULL, + `status` TINYINT(1) NOT NULL DEFAULT '0', + PRIMARY KEY (`id`) +) ENGINE=InnoDB CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci; +CREATE INDEX `channel_queue` ON `queue` (`channel`, `queue`); + +CREATE TABLE `queue_failed` ( + `id` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT, + `queue_id` BIGINT UNSIGNED NOT NULL, + `channel` VARCHAR(255) NOT NULL, + `queue` VARCHAR(255) NOT NULL, + `payload` TEXT NULL DEFAULT NULL, + `created_at` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, + `updated_at` DATETIME NULL DEFAULT NULL, + `status` TINYINT(1) NOT NULL DEFAULT '0', + PRIMARY KEY (`id`) +) ENGINE=InnoDB CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci; +``` + +## RabbitMQ Adapter + +- [x] RabbitMQ Server +- [x] "`php-amqplib/php-amqplib`" Library + +``` +composer require php-amqplib/php-amqplib +``` + +```php +$adapter = new \InitPHP\Queue\Adapters\RabbitMQAdapter('127.0.0.1', 5267, 'guest', 'guest'); +``` + +# Getting Involved + +> All contributions to this project will be published under the MIT License. By submitting a pull request or filing a bug, issue, or feature request, you are agreeing to comply with this waiver of copyright interest. + +There are two primary ways to help: + +- Using the issue tracker, and +- Changing the code-base. + +## Using the issue tracker + +Use the issue tracker to suggest feature requests, report bugs, and ask questions. This is also a great way to connect with the developers of the project as well as others who are interested in this solution. + +Use the issue tracker to find ways to contribute. Find a bug or a feature, mention in the issue that you will take on that effort, then follow the Changing the code-base guidance below. + +## Changing the code-base + +Generally speaking, you should fork this repository, make changes in your own fork, and then submit a pull request. All new code should have associated unit tests that validate implemented features and the presence or lack of defects. Additionally, the code should follow any stylistic and architectural guidelines prescribed by the project. In the absence of such guidelines, mimic the styles and patterns in the existing code-base. + +# Credits + +- [Muhammet ŞAFAK](https://www.muhammetsafak.com.tr) <> + +# License + +Copyright © 2022 [MIT License](./LICENSE) diff --git a/composer.json b/composer.json new file mode 100644 index 0000000..fe23d51 --- /dev/null +++ b/composer.json @@ -0,0 +1,24 @@ +{ + "name": "initphp/queue", + "description": "InitPHP Queue Library", + "type": "library", + "license": "MIT", + "autoload": { + "psr-4": { + "InitPHP\\Queue\\": "src/" + } + }, + "authors": [ + { + "name": "Muhammet ŞAFAK", + "email": "info@muhammetsafak.com.tr", + "role": "Developer", + "homepage": "https://www.muhammetsafak.com.tr" + } + ], + "minimum-stability": "stable", + "require": { + "php": ">=7.4", + "ext-json": "*" + } +} diff --git a/src/Adapters/PDOAdapter.php b/src/Adapters/PDOAdapter.php new file mode 100644 index 0000000..7d0f152 --- /dev/null +++ b/src/Adapters/PDOAdapter.php @@ -0,0 +1,156 @@ + + * @copyright Copyright © 2023 Muhammet ŞAFAK + * @license ./LICENSE MIT + * @version 1.0 + * @link https://www.muhammetsafak.com.tr + */ + +namespace InitPHP\Queue\Adapters; + +use PDO; +use Throwable; +use InitPHP\Queue\Interfaces\AdapterInterface; +use InitPHP\Queue\Interfaces\JobInterface; + +class PDOAdapter implements AdapterInterface +{ + + private ?PDO $pdo; + + private string $table; + + public function __construct(PDO $pdo, string $table = 'queue') + { + $this->pdo = $pdo; + $this->table = $table; + } + + /** + * @param object $message + * @return bool + */ + public function worker(object $message): bool + { + try { + $payload = json_decode($message->payload, true); + $jobClass = $payload['jobClass']; + $jobObj = $jobClass($this); + if (!($jobObj instanceof JobInterface)) { + return false; + } + $jobObj->setPayload($payload['payload']) + ->setId($message->id); + + return $jobObj->handle() ? $jobObj->ack() : $jobObj->nack(); + } catch (Throwable $e) { + return false; + } + } + + /** + * @inheritDoc + */ + public function handle(string $channel, string $queue): bool + { + try { + + do { + $stmt = $this->pdo->prepare("SELECT * FROM " . $this->table . " WHERE channel = :channel AND queue = :queue AND status = 0 LIMIT 0, 1"); + if (!$stmt) { + return false; + } + $stmt->bindValue(':channel', $channel); + $stmt->bindValue(':queue', $queue); + if (!$stmt->execute()) { + return false; + } + if ($stmt->rowCount() < 1) { + return false; + } + $stmt->setFetchMode(PDO::FETCH_OBJ); + $res = $stmt->fetch(); + + $update = $this->pdo->prepare("UPDATE " . $this->table . " SET status = 1, updated_at = :updated_at WHERE id = :id"); + $update->bindValue(':id', $res->id, PDO::PARAM_INT); + $update->bindValue(':updated_at', date("Y-m-d H:i:s")); + $update->execute(); + + $this->worker($res); + } while (true); + } catch (Throwable $e) { + return false; + } + } + + /** + * @inheritDoc + */ + public function push(string $channel, string $queue, JobInterface $job): bool + { + try { + $insert = $this->pdo->prepare("INSERT INTO " . $this->table . " (channel, queue, payload, created_at, updated_at, status) VALUES (:channel, :queue, :payload, :created_at, NULL, 0);"); + + $insert->bindValue(':channel', $channel); + $insert->bindValue(':queue', $queue); + $insert->bindValue(':payload', json_encode(['jobClass' => get_class($job), 'payload' => $job->getPayload()], JSON_UNESCAPED_SLASHES)); + $insert->bindValue(':created_at', date("Y-m-d H:i:s")); + return $insert->execute(); + } catch (Throwable $e) { + return false; + } + } + + /** + * @inheritDoc + */ + public function ack($id, ?string $message = null): bool + { + try { + $delete = $this->pdo->prepare("DELETE FROM " . $this->table . " WHERE id = :id"); + $delete->bindValue(':id', $id); + $delete->execute(); + } catch (Throwable $e) { + return false; + } + + return true; + } + + /** + * @inheritDoc + */ + public function nack($id, ?string $message = null): bool + { + try { + $this->pdo->beginTransaction(); + $insert = $this->pdo->prepare("INSERT INTO " . $this->table . "_failed (queue_id, channel, queue, payload, created_at, updated_at, status) SELECT * FROM " . $this->table . " WHERE id = :id"); + $insert->bindValue(':id', $id); + $insert->execute(); + $delete = $this->pdo->prepare("DELETE FROM " . $this->table . " WHERE id = :id"); + $delete->bindValue(':id', $id); + $delete->execute(); + + return $this->pdo->commit(); + } catch (Throwable $e) { + $this->pdo->rollBack(); + return false; + } + } + + /** + * @inheritDoc + */ + public function close(): bool + { + $this->pdo = null; + + return true; + } + +} diff --git a/src/Adapters/RabbitMQAdapter.php b/src/Adapters/RabbitMQAdapter.php new file mode 100644 index 0000000..7a1dceb --- /dev/null +++ b/src/Adapters/RabbitMQAdapter.php @@ -0,0 +1,163 @@ + + * @copyright Copyright © 2023 Muhammet ŞAFAK + * @license ./LICENSE MIT + * @version 1.0 + * @link https://www.muhammetsafak.com.tr + */ + +declare(strict_types=1); +namespace InitPHP\Queue\Adapters; + +use Throwable; +use InitPHP\Queue\Exceptions\JobException; +use PhpAmqpLib\Channel\AMQPChannel; +use PhpAmqpLib\Message\AMQPMessage; +use InitPHP\Queue\Interfaces\AdapterInterface; +use InitPHP\Queue\Interfaces\JobInterface; +use PhpAmqpLib\Connection\AMQPStreamConnection; + +class RabbitMQAdapter implements AdapterInterface +{ + + private AMQPStreamConnection $connection; + + /** + * @var AMQPChannel[] + */ + private array $channel = []; + + /** + * @var AMQPChannel[] + */ + private array $failed = []; + + /** + * @throws Throwable + */ + public function __construct(string $host, int $port, string $username, string $password) + { + $this->connection = new AMQPStreamConnection($host, $port, $username, $password); + } + + private function declareChannel(string $channel, string $queue): string + { + $queue = $channel . '_' . $queue; + if (isset($this->channel[$queue])) { + return $queue; + } + $this->channel[$queue] = $this->connection->channel(); + $this->channel[$queue]->queue_declare($queue, false, true, false, false, false, null, null); + $this->failed[$queue . '_failed'] = $this->connection->channel(); + $this->failed[$queue . '_failed']->queue_declare($queue. '_failed', false, true, false, false, false, null, null); + + return $queue; + } + + /** + * @inheritDoc + */ + public function worker(object $message): bool + { + $payload = json_decode($message->body, true); + + try { + $job = isset($payload['jobClass']) ? new $payload['jobClass']($this) : null; + if (!($job instanceof JobInterface)) { + throw new JobException(); + } + $job->setPayload($payload['payload']) + ->setId($message->delivery_info); + $job->handle() ? $job->ack() : $job->nack(); + return true; + } catch (Throwable $e) { + if (isset($job) && ($job instanceof JobInterface) && !empty($job->getId())) { + $job->getId()['channel']->basic_nack($job->getId()['delivery_tag']); + } + return false; + } + } + + /** + * @inheritDoc + */ + public function handle(string $channel, string $queue): bool + { + try { + $queue = $this->declareChannel($channel, $queue); + $this->channel[$queue]->basic_qos(0, 1, null); + $this->channel[$queue]->basic_consume($queue, '', false, false, false, false, [$this, 'worker']); + while (sizeof($this->channel[$queue]->callbacks)) { + $this->channel[$queue]->wait(); + } + return true; + } catch (Throwable $e) { + return false; + } + } + + /** + * @inheritDoc + */ + public function push(string $channel, string $queue, JobInterface $job): bool + { + try { + $queue = $this->declareChannel($channel, $queue); + $message = new AMQPMessage(json_encode(['jobClass' => get_class($job), 'payload' => $job->getPayload()], JSON_UNESCAPED_SLASHES), [ + 'delivery_mode' => 2, + ]); + $this->channel[$queue]->basic_publish($message, '', $queue); + + return true; + } catch (Throwable $e) { + return false; + } + } + + /** + * @inheritDoc + */ + public function ack($id, ?string $message = null): bool + { + try { + $id['channel']->basic_ack($id['delivery_tag']); + return true; + } catch (Throwable $e) { + return false; + } + } + + /** + * @inheritDoc + */ + public function nack($id, ?string $message = null): bool + { + try { + $id['channel']->basic_nack($id['delivery_tag']); + return true; + } catch (Throwable $e) { + return false; + } + } + + /** + * @inheritDoc + */ + public function close(): bool + { + $queues = array_keys($this->channel); + foreach ($queues as $queue) { + $this->channel[$queue]->close(); + $this->failed[$queue]->close(); + } + $this->connection->close(); + + return true; + } + +} diff --git a/src/Exceptions/JobException.php b/src/Exceptions/JobException.php new file mode 100644 index 0000000..9a43bc6 --- /dev/null +++ b/src/Exceptions/JobException.php @@ -0,0 +1,21 @@ + + * @copyright Copyright © 2023 Muhammet ŞAFAK + * @license ./LICENSE MIT + * @version 1.0 + * @link https://www.muhammetsafak.com.tr + */ + +declare(strict_types=1); +namespace InitPHP\Queue\Exceptions; + +use Exception; + +class JobException extends Exception +{ +} diff --git a/src/Exceptions/JobInvalidArgumentException.php b/src/Exceptions/JobInvalidArgumentException.php new file mode 100644 index 0000000..6843424 --- /dev/null +++ b/src/Exceptions/JobInvalidArgumentException.php @@ -0,0 +1,21 @@ + + * @copyright Copyright © 2023 Muhammet ŞAFAK + * @license ./LICENSE MIT + * @version 1.0 + * @link https://www.muhammetsafak.com.tr + */ + +declare(strict_types=1); +namespace InitPHP\Queue\Exceptions; + +use InvalidArgumentException; + +class JobInvalidArgumentException extends InvalidArgumentException +{ +} diff --git a/src/Exceptions/QueueException.php b/src/Exceptions/QueueException.php new file mode 100644 index 0000000..2f49773 --- /dev/null +++ b/src/Exceptions/QueueException.php @@ -0,0 +1,21 @@ + + * @copyright Copyright © 2023 Muhammet ŞAFAK + * @license ./LICENSE MIT + * @version 1.0 + * @link https://www.muhammetsafak.com.tr + */ + +declare(strict_types=1); +namespace InitPHP\Queue\Exceptions; + +use Exception; + +class QueueException extends Exception +{ +} diff --git a/src/Interfaces/AdapterInterface.php b/src/Interfaces/AdapterInterface.php new file mode 100644 index 0000000..91f4b76 --- /dev/null +++ b/src/Interfaces/AdapterInterface.php @@ -0,0 +1,61 @@ + + * @copyright Copyright © 2023 Muhammet ŞAFAK + * @license ./LICENSE MIT + * @version 1.0 + * @link https://www.muhammetsafak.com.tr + */ + +declare(strict_types=1); +namespace InitPHP\Queue\Interfaces; + +interface AdapterInterface +{ + + /** + * @param object $message + * @return bool + */ + public function worker(object $message): bool; + + /** + * @param string $channel + * @param string $queue + * @return bool + */ + public function handle(string $channel, string $queue): bool; + + /** + * @param string $channel + * @param string $queue + * @param JobInterface $job + * @return bool + */ + public function push(string $channel, string $queue, JobInterface $job): bool; + + /** + * @param mixed $id + * @param string|null $message + * @return bool + */ + public function ack($id, ?string $message = null): bool; + + /** + * @param mixed $id + * @param string|null $message + * @return bool + */ + public function nack($id, ?string $message = null): bool; + + + /** + * @return bool + */ + public function close(): bool; + +} diff --git a/src/Interfaces/JobInterface.php b/src/Interfaces/JobInterface.php new file mode 100644 index 0000000..4bf449c --- /dev/null +++ b/src/Interfaces/JobInterface.php @@ -0,0 +1,98 @@ + + * @copyright Copyright © 2023 Muhammet ŞAFAK + * @license ./LICENSE MIT + * @version 1.0 + * @link https://www.muhammetsafak.com.tr + */ + +declare(strict_types=1); +namespace InitPHP\Queue\Interfaces; + +interface JobInterface +{ + + /** + * @param AdapterInterface $adapter + * @return self + */ + public function setAdapter(AdapterInterface $adapter): self; + + /** + * @return AdapterInterface + */ + public function getAdapter(): AdapterInterface; + + /** + * @param string $queue + * @return self + */ + public function setQueue(string $queue): self; + + /** + * @return string + */ + public function getQueue(): string; + + /** + * @param string $channel + * @return self + */ + public function setChannel(string $channel): self; + + /** + * @return string + */ + public function getChannel(): string; + + /** + * @param string|array $payload + * @return self + */ + public function setPayload($payload): self; + + /** + * @return array + */ + public function getPayload(): array; + + /** + * @param mixed $id + * @return self + */ + public function setId($id): self; + + /** + * @return mixed + */ + public function getId(); + + /** + * @param string|null $message + * @return bool + */ + public function ack(?string $message = null): bool; + + /** + * @param string|null $message + * @return bool + */ + public function nack(?string $message = null): bool; + + /** + * @return bool + */ + public function handle(): bool; + + /** + * @param array|null $payload + * @return bool + */ + public function push(?array $payload = null): bool; + +} diff --git a/src/Job.php b/src/Job.php new file mode 100644 index 0000000..4bc8fd0 --- /dev/null +++ b/src/Job.php @@ -0,0 +1,186 @@ + + * @copyright Copyright © 2023 Muhammet ŞAFAK + * @license ./LICENSE MIT + * @version 1.0 + * @link https://www.muhammetsafak.com.tr + */ + +declare(strict_types=1); +namespace InitPHP\Queue; + +use Throwable; +use InitPHP\Queue\Interfaces\JobInterface; +use InitPHP\Queue\Exceptions\JobInvalidArgumentException; +use InitPHP\Queue\Interfaces\AdapterInterface; + +abstract class Job implements JobInterface +{ + + private AdapterInterface $adapter; + + protected string $channel = 'c'; + + protected string $queue = 'queue'; + + /** @var mixed */ + private $id; + + private array $payload = []; + + private bool $nack = false; + + private bool $ack = false; + + public function __construct(AdapterInterface $adapter) + { + $this->setAdapter($adapter); + } + + /** + * @inheritDoc + */ + public function setAdapter(AdapterInterface $adapter): self + { + $this->adapter = $adapter; + + return $this; + } + + /** + * @inheritDoc + */ + public function getAdapter(): AdapterInterface + { + return $this->adapter; + } + + /** + * @inheritDoc + */ + public function setQueue(string $queue): self + { + $this->queue = $queue; + + return $this; + } + + /** + * @inheritDoc + */ + public function getQueue(): string + { + return $this->queue; + } + + /** + * @inheritDoc + */ + public function setChannel(string $channel): self + { + $this->channel = $channel; + + return $this; + } + + /** + * @inheritDoc + */ + public function getChannel(): string + { + return $this->channel; + } + + /** + * @inheritDoc + */ + public function setId($id): self + { + $this->id = $id; + + return $this; + } + + /** + * @inheritDoc + */ + public function getId() + { + return $this->id ?? null; + } + + /** + * @inheritDoc + */ + public function setPayload($payload): self + { + if (is_string($payload)) { + $payload = json_decode($payload, true); + } + if (!is_array($payload)) { + throw new JobInvalidArgumentException(); + } + $this->payload = $payload; + + return $this; + } + + /** + * @inheritDoc + */ + public function getPayload(): array + { + return $this->payload; + } + + /** + * @inheritDoc + */ + public function ack(?string $message = null): bool + { + if (!$this->ack) { + $this->ack = true; + return $this->getAdapter()->ack($this->id, $message); + } + + return true; + } + + /** + * @inheritDoc + */ + public function nack(?string $message = null): bool + { + if (!$this->nack) { + $this->nack = true; + return $this->getAdapter()->nack($this->id, $message); + } + + return true; + } + + /** + * @inheritDoc + */ + abstract public function handle(): bool; + + /** + * @inheritDoc + */ + public function push(?array $payload = null): bool + { + try { + $payload !== null && $this->setPayload($payload); + $this->getAdapter()->push($this->getChannel(), $this->getQueue(), $this); + return true; + } catch (Throwable $e) { + return false; + } + } + +}