Skip to content

Commit

Permalink
First commit
Browse files Browse the repository at this point in the history
  • Loading branch information
muhammetsafak committed Dec 10, 2023
0 parents commit 90f7c5b
Show file tree
Hide file tree
Showing 12 changed files with 939 additions and 0 deletions.
8 changes: 8 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
/.idea/
/.vs/
/.vscode/
/vendor/
/composer.lock
/.phpunit.result.cache
/nbproject/private/
/*.log
21 changes: 21 additions & 0 deletions LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
MIT License

Copyright (c) 2023 InitPHP

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
159 changes: 159 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
# InitPHP Queue

This library offers performance and asynchrony by queuing your jobs to be done later.

```
composer require initphp/queue
```

## Create Job

First, start by creating the business class. You can find a simple example below.

```php
require_once __DIR__ . DIRECTORY_SEPARATOR . 'vendor/autoload.php';

namespace App\Jobs;

use InitPHP\Queue\Job;

class MailJob extends Job
{
protected string $channel = 'mailChannel';

protected string $queue = 'mailQueue';

public function handle(): bool
{
$payload = $this->getPayload();
try {
if (mail($payload['to'], $payload['subject'])) {
return true;
} else {
return false;;
}
} catch (\Throwable $e) {
return false;
}
}
}
```

Use the `push()` method to add jobs to the queue;

```php
require_once __DIR__ . DIRECTORY_SEPARATOR . 'vendor/autoload.php';
$adapter = new \InitPHP\Queue\Adapters\RabbitMQAdapter('127.0.0.1', 5267, 'guest', 'guest');

$job = new App\Jobs\MailJob($adapter);

// Add Queue Job
$job->push([
'to' => '[email protected]',
'subject' => 'Subject Mail',
]);
```

Write your code to handle the jobs in the queue.

`consumer.php`

```php
require_once __DIR__ . DIRECTORY_SEPARATOR . 'vendor/autoload.php';
$adapter = new \InitPHP\Queue\Adapters\RabbitMQAdapter('127.0.0.1', 5267, 'guest', 'guest');

$adapter->handle('mailChannel', 'mailQueue');

$adapter->close();
```

Trigger your consumer code.

```
php consumer.php
```


# Adapters
- [x] [PDO (Database) Adapter](#pdo-adapter)
- [x] [RabbitMQ Adapter](#rabbitmq-adapter)
- [ ] Kafka Adapter

## PDO Adapter

- [x] PDO Extension

To initialize the PDO adapter, you need a PDO object and 2 tables.

```php
$pdo = new PDO('mysql:host=localhost;port=3307;dbname=queue_db', 'root', 'root');
$adapter = new \InitPHP\Queue\Adapters\PDOAdapter($pdo, 'queue');
```

The first of these tables is used for those waiting in line and the other for jobs that have errors. The table name into which the failed jobs fall is obtained by adding "`_failed`" as a suffix to the main table name. Accordingly, create your queue tables using the following SQL.

```sql
CREATE TABLE `queue` (
`id` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
`channel` VARCHAR(255) NOT NULL,
`queue` VARCHAR(255) NOT NULL,
`payload` TEXT NULL DEFAULT NULL,
`created_at` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
`updated_at` DATETIME NULL DEFAULT NULL,
`status` TINYINT(1) NOT NULL DEFAULT '0',
PRIMARY KEY (`id`)
) ENGINE=InnoDB CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
CREATE INDEX `channel_queue` ON `queue` (`channel`, `queue`);

CREATE TABLE `queue_failed` (
`id` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
`queue_id` BIGINT UNSIGNED NOT NULL,
`channel` VARCHAR(255) NOT NULL,
`queue` VARCHAR(255) NOT NULL,
`payload` TEXT NULL DEFAULT NULL,
`created_at` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
`updated_at` DATETIME NULL DEFAULT NULL,
`status` TINYINT(1) NOT NULL DEFAULT '0',
PRIMARY KEY (`id`)
) ENGINE=InnoDB CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
```

## RabbitMQ Adapter

- [x] RabbitMQ Server
- [x] "`php-amqplib/php-amqplib`" Library

```
composer require php-amqplib/php-amqplib
```

```php
$adapter = new \InitPHP\Queue\Adapters\RabbitMQAdapter('127.0.0.1', 5267, 'guest', 'guest');
```

# Getting Involved

> All contributions to this project will be published under the MIT License. By submitting a pull request or filing a bug, issue, or feature request, you are agreeing to comply with this waiver of copyright interest.
There are two primary ways to help:

- Using the issue tracker, and
- Changing the code-base.

## Using the issue tracker

Use the issue tracker to suggest feature requests, report bugs, and ask questions. This is also a great way to connect with the developers of the project as well as others who are interested in this solution.

Use the issue tracker to find ways to contribute. Find a bug or a feature, mention in the issue that you will take on that effort, then follow the Changing the code-base guidance below.

## Changing the code-base

Generally speaking, you should fork this repository, make changes in your own fork, and then submit a pull request. All new code should have associated unit tests that validate implemented features and the presence or lack of defects. Additionally, the code should follow any stylistic and architectural guidelines prescribed by the project. In the absence of such guidelines, mimic the styles and patterns in the existing code-base.

# Credits

- [Muhammet ŞAFAK](https://www.muhammetsafak.com.tr) <<[email protected]>>

# License

Copyright &copy; 2022 [MIT License](./LICENSE)
24 changes: 24 additions & 0 deletions composer.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
{
"name": "initphp/queue",
"description": "InitPHP Queue Library",
"type": "library",
"license": "MIT",
"autoload": {
"psr-4": {
"InitPHP\\Queue\\": "src/"
}
},
"authors": [
{
"name": "Muhammet ŞAFAK",
"email": "[email protected]",
"role": "Developer",
"homepage": "https://www.muhammetsafak.com.tr"
}
],
"minimum-stability": "stable",
"require": {
"php": ">=7.4",
"ext-json": "*"
}
}
156 changes: 156 additions & 0 deletions src/Adapters/PDOAdapter.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
<?php
/**
* InitPHP Queue
*
* This file is part of InitPHP Queue.
*
* @author Muhammet ŞAFAK <[email protected]>
* @copyright Copyright © 2023 Muhammet ŞAFAK
* @license ./LICENSE MIT
* @version 1.0
* @link https://www.muhammetsafak.com.tr
*/

namespace InitPHP\Queue\Adapters;

use PDO;
use Throwable;
use InitPHP\Queue\Interfaces\AdapterInterface;
use InitPHP\Queue\Interfaces\JobInterface;

class PDOAdapter implements AdapterInterface
{

private ?PDO $pdo;

private string $table;

public function __construct(PDO $pdo, string $table = 'queue')
{
$this->pdo = $pdo;
$this->table = $table;
}

/**
* @param object $message
* @return bool
*/
public function worker(object $message): bool
{
try {
$payload = json_decode($message->payload, true);
$jobClass = $payload['jobClass'];
$jobObj = $jobClass($this);
if (!($jobObj instanceof JobInterface)) {
return false;
}
$jobObj->setPayload($payload['payload'])
->setId($message->id);

return $jobObj->handle() ? $jobObj->ack() : $jobObj->nack();
} catch (Throwable $e) {
return false;
}
}

/**
* @inheritDoc
*/
public function handle(string $channel, string $queue): bool
{
try {

do {
$stmt = $this->pdo->prepare("SELECT * FROM " . $this->table . " WHERE channel = :channel AND queue = :queue AND status = 0 LIMIT 0, 1");
if (!$stmt) {
return false;
}
$stmt->bindValue(':channel', $channel);
$stmt->bindValue(':queue', $queue);
if (!$stmt->execute()) {
return false;
}
if ($stmt->rowCount() < 1) {
return false;
}
$stmt->setFetchMode(PDO::FETCH_OBJ);
$res = $stmt->fetch();

$update = $this->pdo->prepare("UPDATE " . $this->table . " SET status = 1, updated_at = :updated_at WHERE id = :id");
$update->bindValue(':id', $res->id, PDO::PARAM_INT);
$update->bindValue(':updated_at', date("Y-m-d H:i:s"));
$update->execute();

$this->worker($res);
} while (true);
} catch (Throwable $e) {
return false;
}
}

/**
* @inheritDoc
*/
public function push(string $channel, string $queue, JobInterface $job): bool
{
try {
$insert = $this->pdo->prepare("INSERT INTO " . $this->table . " (channel, queue, payload, created_at, updated_at, status) VALUES (:channel, :queue, :payload, :created_at, NULL, 0);");

$insert->bindValue(':channel', $channel);
$insert->bindValue(':queue', $queue);
$insert->bindValue(':payload', json_encode(['jobClass' => get_class($job), 'payload' => $job->getPayload()], JSON_UNESCAPED_SLASHES));
$insert->bindValue(':created_at', date("Y-m-d H:i:s"));
return $insert->execute();
} catch (Throwable $e) {
return false;
}
}

/**
* @inheritDoc
*/
public function ack($id, ?string $message = null): bool
{
try {
$delete = $this->pdo->prepare("DELETE FROM " . $this->table . " WHERE id = :id");
$delete->bindValue(':id', $id);
$delete->execute();
} catch (Throwable $e) {
return false;
}

return true;
}

/**
* @inheritDoc
*/
public function nack($id, ?string $message = null): bool
{
try {
$this->pdo->beginTransaction();
$insert = $this->pdo->prepare("INSERT INTO " . $this->table . "_failed (queue_id, channel, queue, payload, created_at, updated_at, status) SELECT * FROM " . $this->table . " WHERE id = :id");
$insert->bindValue(':id', $id);
$insert->execute();
$delete = $this->pdo->prepare("DELETE FROM " . $this->table . " WHERE id = :id");
$delete->bindValue(':id', $id);
$delete->execute();

return $this->pdo->commit();
} catch (Throwable $e) {
$this->pdo->rollBack();
return false;
}
}

/**
* @inheritDoc
*/
public function close(): bool
{
$this->pdo = null;

return true;
}

}
Loading

0 comments on commit 90f7c5b

Please sign in to comment.