Skip to content

Commit 0bbb228

Browse files
authored
Merge pull request #2 from php-enqueue/amqp-interop-support
Amqp queue
2 parents 2c8f7cc + 4f9c4a5 commit 0bbb228

16 files changed

+99
-3
lines changed

composer.json

+4-2
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,9 @@
1313
"require": {
1414
"php": ">=5.6",
1515
"illuminate/queue": "^5.4",
16-
"queue-interop/queue-interop": "^0.6@dev"
16+
"queue-interop/queue-interop": "^0.6@dev",
17+
"queue-interop/amqp-interop": "^0.6@dev",
18+
"enqueue/amqp-tools": "^0.7@dev"
1719
},
1820
"require-dev": {
1921
"phpunit/phpunit": "~5.5",
@@ -22,7 +24,7 @@
2224
"enqueue/test": "^0.7@dev"
2325
},
2426
"autoload": {
25-
"psr-4": { "Enqueue\\LaravelQueue\\": "" },
27+
"psr-4": { "Enqueue\\LaravelQueue\\": "src/" },
2628
"exclude-from-classmap": [
2729
"/Tests/"
2830
]

src/AmqpConnector.php

+27
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
<?php
2+
3+
namespace Enqueue\LaravelQueue;
4+
5+
use Enqueue\AmqpTools\DelayStrategyAware;
6+
use Enqueue\AmqpTools\RabbitMqDlxDelayStrategy;
7+
use Interop\Amqp\AmqpContext;
8+
9+
class AmqpConnector extends Connector
10+
{
11+
public function connect(array $config)
12+
{
13+
$queue = parent::connect($config);
14+
15+
/** @var AmqpContext $amqpContext */
16+
$amqpContext = $queue->getPsrContext();
17+
if (false == $amqpContext instanceof AmqpContext) {
18+
throw new \LogicException(sprintf('The context must be instance of "%s" but got "%s"', AmqpContext::class, get_class($queue->getPsrContext())));
19+
}
20+
21+
if ($amqpContext instanceof DelayStrategyAware) {
22+
$amqpContext->setDelayStrategy(new RabbitMqDlxDelayStrategy());
23+
}
24+
25+
return $queue;
26+
}
27+
}

src/AmqpQueue.php

+62
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
<?php
2+
3+
namespace Enqueue\LaravelQueue;
4+
5+
use Interop\Amqp\AmqpContext;
6+
7+
/**
8+
* @method AmqpContext getPsrContext()
9+
*/
10+
class AmqpQueue extends Queue
11+
{
12+
/**
13+
* {@inheritdoc}
14+
*
15+
* @param AmqpContext $psrContext
16+
*/
17+
public function __construct(AmqpContext $psrContext, $queueName, $timeToRun)
18+
{
19+
parent::__construct($psrContext, $queueName, $timeToRun);
20+
}
21+
22+
/**
23+
* {@inheritdoc}
24+
*/
25+
public function pushRaw($payload, $queue = null, array $options = [])
26+
{
27+
$this->declareQueue($queue);
28+
29+
parent::pushRaw($payload, $queue, $options);
30+
}
31+
32+
/**
33+
* {@inheritdoc}
34+
*/
35+
public function later($delay, $job, $data = '', $queue = null)
36+
{
37+
$this->declareQueue($queue);
38+
39+
return parent::later($delay, $job, $data, $queue);
40+
}
41+
42+
/**
43+
* {@inheritdoc}
44+
*/
45+
public function pop($queue = null)
46+
{
47+
$this->declareQueue($queue);
48+
49+
return parent::pop($queue);
50+
}
51+
52+
/**
53+
* @param string|null $queue
54+
*/
55+
protected function declareQueue($queue = null)
56+
{
57+
$psrQueue = $this->getPsrContext()->createQueue($this->getQueue($queue));
58+
$psrQueue->addFlag(\Interop\Amqp\AmqpQueue::FLAG_DURABLE);
59+
60+
$this->getPsrContext()->declareQueue($psrQueue);
61+
}
62+
}
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.

EnqueueServiceProvider.php renamed to src/EnqueueServiceProvider.php

+4
Original file line numberDiff line numberDiff line change
@@ -65,5 +65,9 @@ private function bootInteropQueueDriver()
6565
$manager->addConnector('interop', function () {
6666
return new Connector();
6767
});
68+
69+
$manager->addConnector('amqp_interop', function () {
70+
return new AmqpConnector();
71+
});
6872
}
6973
}

Job.php renamed to src/Job.php

File renamed without changes.

Queue.php renamed to src/Queue.php

+2-1
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,11 @@ class Queue extends BaseQueue implements QueueContract
1717
* @var int
1818
*/
1919
protected $timeToRun;
20+
2021
/**
2122
* @var PsrContext
2223
*/
23-
private $psrContext;
24+
protected $psrContext;
2425

2526
/**
2627
* @param PsrContext $psrContext
File renamed without changes.
File renamed without changes.
File renamed without changes.

0 commit comments

Comments
 (0)