This repository has been archived by the owner on Aug 24, 2022. It is now read-only.
forked from gjedeer/celery-php
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathamqppeclconnector.php
119 lines (105 loc) · 2.88 KB
/
amqppeclconnector.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
<?php
require_once('amqp.php');
/**
* Driver for a fast C/librabbitmq implementation of AMQP from PECL
* @link http://pecl.php.net/package/amqp
* @package celery-php
*/
class PECLAMQPConnector extends AbstractAMQPConnector
{
/**
* Return AMQPConnection object passed to all other calls
* @param array $details Array of connection details
* @return AMQPConnection
*/
function GetConnectionObject($details)
{
$connection = new AMQPConnection();
$connection->setHost($details['host']);
$connection->setLogin($details['login']);
$connection->setPassword($details['password']);
$connection->setVhost($details['vhost']);
$connection->setPort($details['port']);
return $connection;
}
/**
* Initialize connection on a given connection object
* @return NULL
*/
function Connect($connection)
{
$connection->connect();
}
/**
* Post a task to exchange specified in $details
* @param AMQPConnection $connection Connection object
* @param array $details Array of connection details
* @param string $task JSON-encoded task
* @param array $params AMQP message parameters
*/
function PostToExchange($connection, $details, $task, $params)
{
$ch = new AMQPChannel($connection);
$xchg = new AMQPExchange($ch);
$xchg->setName($details['exchange']);
$success = $xchg->publish($task, $details['binding'], 0, $params);
$connection->disconnect();
return $success;
}
/**
* Return result of task execution for $task_id
* @param AMQPConnection $connection Connection object
* @param string $task_id Celery task identifier
* @param boolean $removeMessageFromQueue whether to remove message from queue
* @return array array('body' => JSON-encoded message body, 'complete_result' => AMQPEnvelope object)
* or false if result not ready yet
*/
function GetMessageBody($connection, $task_id, $removeMessageFromQueue = true)
{
$this->Connect($connection);
$ch = new AMQPChannel($connection);
$q = new AMQPQueue($ch);
$q->setName($task_id);
$q->setFlags(AMQP_AUTODELETE | AMQP_DURABLE);
$q->declareQueue();
try
{
$q->bind('celeryresults', $task_id);
}
catch(AMQPQueueException $e)
{
if ($removeMessageFromQueue) {
$q->delete();
}
$connection->disconnect();
return false;
}
$message = $q->get(AMQP_AUTOACK);
if(!$message)
{
if ($removeMessageFromQueue) {
$q->delete();
}
$connection->disconnect();
return false;
}
if($message->getContentType() != 'application/json')
{
if ($removeMessageFromQueue) {
$q->delete();
}
$connection->disconnect();
throw new CeleryException('Response was not encoded using JSON - found ' .
$message->getContentType().
' - check your CELERY_RESULT_SERIALIZER setting!');
}
if ($removeMessageFromQueue) {
$q->delete();
}
$connection->disconnect();
return array(
'complete_result' => $message,
'body' => $message->getBody(),
);
}
}