Skip to content

Commit 61d227e

Browse files
author
farhadzand
committed
refactor: simplify README and enhance RabbitMQ consumer implementation
- Streamlined the README by reducing verbosity and improving clarity, including a new Table of Contents. - Removed the ConsumerInterface and integrated its functionality directly into the Consumer class. - Enhanced error handling in RabbitMQ connection and message publishing methods. - Updated job handling in RabbitMQJob to ensure better logging and connection checks. - Improved the RabbitQueue class with retry logic and better exception handling for queue operations.
1 parent d966b28 commit 61d227e

File tree

13 files changed

+379
-498
lines changed

13 files changed

+379
-498
lines changed

README.md

Lines changed: 201 additions & 364 deletions
Large diffs are not rendered by default.

src/Connectors/RabbitMQConnector.php

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
11
<?php
22

3+
declare(strict_types=1);
4+
35
namespace iamfarhad\LaravelRabbitMQ\Connectors;
46

57
use AMQPConnection;
8+
use AMQPConnectionException;
9+
use iamfarhad\LaravelRabbitMQ\Exceptions\ConnectionException;
610
use iamfarhad\LaravelRabbitMQ\RabbitQueue;
711
use Illuminate\Contracts\Events\Dispatcher;
812
use Illuminate\Contracts\Queue\Queue;
@@ -13,6 +17,9 @@
1317
{
1418
public function __construct(private Dispatcher $dispatcher) {}
1519

20+
/**
21+
* @throws ConnectionException
22+
*/
1623
public function connect(array $config = []): Queue
1724
{
1825
$connectionConfig = [
@@ -41,8 +48,16 @@ public function connect(array $config = []): Queue
4148
}
4249

4350
// Create AMQP Connection
44-
$connection = new AMQPConnection($connectionConfig);
45-
$connection->connect();
51+
try {
52+
$connection = new AMQPConnection($connectionConfig);
53+
$connection->connect();
54+
} catch (AMQPConnectionException $e) {
55+
throw new ConnectionException(
56+
'Failed to connect to RabbitMQ: '.$e->getMessage(),
57+
$e->getCode(),
58+
$e
59+
);
60+
}
4661

4762
$defaultQueue = config('queue.connections.rabbitmq.queue', 'default');
4863
$options = config('queue.connections.rabbitmq.options', []);

src/Console/ConsumeCommand.php

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,6 @@ public function handle(): int
8888
// Parent process
8989
$childPids[] = $pid;
9090
$this->info("Started worker process $pid");
91-
9291
}
9392

9493
// Set up signal handling for graceful termination
@@ -132,7 +131,7 @@ private function consume(): int
132131
}
133132

134133
$consumer->setContainer($this->laravel);
135-
$consumer->setName($this->option('name'));
134+
$consumer->setName((string) $this->option('name'));
136135
$consumer->setConsumerTag($this->generateConsumerTag());
137136

138137
// Only set max priority if it's provided and not null
@@ -154,8 +153,9 @@ private function consume(): int
154153
*/
155154
private function generateConsumerTag(): string
156155
{
157-
if ($consumerTag = $this->option('consumer-tag')) {
158-
return $consumerTag;
156+
$consumerTag = $this->option('consumer-tag');
157+
if ($consumerTag !== null && $consumerTag !== false && $consumerTag !== '') {
158+
return (string) $consumerTag;
159159
}
160160

161161
$appName = config('app.name', 'laravel');
@@ -166,7 +166,7 @@ private function generateConsumerTag(): string
166166
'_',
167167
[
168168
Str::slug($appName),
169-
Str::slug($consumerName),
169+
Str::slug((string) $consumerName),
170170
$uniqueId,
171171
]
172172
);

src/Consumer.php

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,33 @@
11
<?php
22

3+
declare(strict_types=1);
4+
35
namespace iamfarhad\LaravelRabbitMQ;
46

57
use AMQPChannel;
68
use AMQPChannelException;
79
use AMQPConnectionException;
10+
use AMQPEnvelope;
811
use AMQPQueue;
912
use Exception;
10-
use iamfarhad\LaravelRabbitMQ\Contracts\ConsumerInterface;
13+
use iamfarhad\LaravelRabbitMQ\Jobs\RabbitMQJob;
1114
use Illuminate\Container\Container;
1215
use Illuminate\Queue\Worker;
1316
use Illuminate\Queue\WorkerOptions;
17+
use RuntimeException;
1418
use Throwable;
1519

16-
class Consumer extends Worker implements ConsumerInterface
20+
class Consumer extends Worker
1721
{
1822
private Container $container;
1923

20-
private string $consumerTag;
24+
private string $consumerTag = '';
2125

22-
private int $maxPriority;
26+
private int $maxPriority = 0;
2327

2428
private AMQPChannel $amqpChannel;
2529

26-
private ?object $currentJob = null;
30+
private ?RabbitMQJob $currentJob = null;
2731

2832
public function setContainer(Container $container): void
2933
{
@@ -63,7 +67,7 @@ public function daemon($connectionName, $queue, WorkerOptions $options)
6367

6468
// Check if the connection is a RabbitQueue instance
6569
if (! $connection instanceof RabbitQueue) {
66-
throw new \RuntimeException('Connection must be an instance of RabbitQueue for RabbitMQ Consumer');
70+
throw new RuntimeException('Connection must be an instance of RabbitQueue for RabbitMQ Consumer');
6771
}
6872

6973
$connection->declareQueue($queue);
@@ -91,7 +95,7 @@ public function daemon($connectionName, $queue, WorkerOptions $options)
9195
// Try to get a message from the queue
9296
$envelope = $amqpQueue->get(AMQP_NOPARAM);
9397

94-
if ($envelope !== false) {
98+
if ($envelope instanceof AMQPEnvelope) {
9599
$job = new $jobClass(
96100
$this->container,
97101
$connection,
@@ -100,6 +104,7 @@ public function daemon($connectionName, $queue, WorkerOptions $options)
100104
$queue
101105
);
102106

107+
/** @var RabbitMQJob $job */
103108
$this->currentJob = $job;
104109

105110
if ($this->supportsAsyncSignals()) {

src/Contracts/ConsumerInterface.php

Lines changed: 0 additions & 34 deletions
This file was deleted.
Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
<?php
22

3+
declare(strict_types=1);
4+
35
namespace iamfarhad\LaravelRabbitMQ\Exceptions;
46

5-
class ConnectionException extends RabbitMQException
7+
final class ConnectionException extends RabbitMQException
68
{
79
//
810
}

src/Exceptions/QueueException.php

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
<?php
22

3+
declare(strict_types=1);
4+
35
namespace iamfarhad\LaravelRabbitMQ\Exceptions;
46

5-
class QueueException extends RabbitMQException
7+
final class QueueException extends RabbitMQException
68
{
79
//
810
}
Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
<?php
22

3+
declare(strict_types=1);
4+
35
namespace iamfarhad\LaravelRabbitMQ\Exceptions;
46

57
use Exception;
68

7-
class RabbitMQException extends Exception
9+
abstract class RabbitMQException extends Exception
810
{
911
//
1012
}

src/Facades/RabbitMQ.php

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,22 @@
11
<?php
22

3+
declare(strict_types=1);
4+
35
namespace iamfarhad\LaravelRabbitMQ\Facades;
46

57
use Illuminate\Support\Facades\Facade;
68

79
/**
8-
* @method static mixed push($job, $data = '', $queue = null)
9-
* @method static mixed pushRaw($payload, $queue = null, array $options = [])
10-
* @method static mixed later($delay, $job, $data = '', $queue = null)
11-
* @method static mixed laterRaw($delay, $payload, $queue = null, $attempts = 2)
12-
* @method static mixed pop($queue = null)
13-
* @method static int size($queue = null)
10+
* @method static ?string push($job, $data = '', ?string $queue = null)
11+
* @method static ?string pushRaw(string $payload, ?string $queue = null, array $options = [])
12+
* @method static ?string later($delay, $job, $data = '', ?string $queue = null)
13+
* @method static ?string laterRaw($delay, string $payload, ?string $queue = null, int $attempts = 2)
14+
* @method static ?\Illuminate\Contracts\Queue\Job pop(?string $queue = null)
15+
* @method static int size(?string $queue = null)
1416
* @method static bool queueExists(string $queueName)
1517
* @method static mixed purgeQueue(string $queueName)
1618
* @method static mixed deleteQueue(string $queueName)
19+
* @method static void declareQueue(string $name, bool $durable = true, bool $autoDelete = false, array $arguments = [])
1720
*
1821
* @see \iamfarhad\LaravelRabbitMQ\RabbitQueue
1922
*/

src/Jobs/RabbitMQJob.php

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
<?php
22

3+
declare(strict_types=1);
4+
35
namespace iamfarhad\LaravelRabbitMQ\Jobs;
46

57
use AMQPEnvelope;
@@ -8,10 +10,14 @@
810
use Illuminate\Contracts\Queue\Job as JobContract;
911
use Illuminate\Queue\Jobs\Job;
1012
use Illuminate\Support\Arr;
13+
use Illuminate\Support\Facades\Log;
1114
use JsonException;
15+
use Throwable;
1216

1317
final class RabbitMQJob extends Job implements JobContract
1418
{
19+
private const FAILED_MESSAGES_EXCHANGE = 'failed_messages';
20+
1521
private readonly array $decoded;
1622

1723
public function __construct(
@@ -30,7 +36,7 @@ public function __construct(
3036
/**
3137
* {@inheritdoc}
3238
*/
33-
public function getJobId()
39+
public function getJobId(): ?string
3440
{
3541
return $this->decoded['id'] ?? null;
3642
}
@@ -62,9 +68,28 @@ public function attempts(): int
6268
*/
6369
private function convertMessageToFailed(): void
6470
{
65-
if ($this->amqpEnvelope->getExchangeName() !== 'failed_messages') {
66-
$this->rabbitQueue->declareQueue('failed_messages');
67-
$this->rabbitQueue->pushRaw($this->amqpEnvelope->getBody(), 'failed_messages');
71+
try {
72+
if ($this->amqpEnvelope->getExchangeName() !== self::FAILED_MESSAGES_EXCHANGE) {
73+
// Check if the RabbitMQ connection is still available before attempting operations
74+
if (! $this->rabbitQueue->getConnection()->isConnected()) {
75+
Log::warning('RabbitMQ connection lost, cannot move job to failed_messages queue', [
76+
'job_id' => $this->getJobId(),
77+
]);
78+
79+
return;
80+
}
81+
82+
$this->rabbitQueue->declareQueue(self::FAILED_MESSAGES_EXCHANGE);
83+
$this->rabbitQueue->pushRaw($this->amqpEnvelope->getBody(), self::FAILED_MESSAGES_EXCHANGE);
84+
}
85+
} catch (Throwable $e) {
86+
// If channel is not available or queue declaration fails, just log the error
87+
// This can happen when the connection is lost during job processing
88+
Log::error('Failed to move job to failed_messages queue', [
89+
'job_id' => $this->getJobId(),
90+
'exception' => $e->getMessage(),
91+
'exception_class' => get_class($e),
92+
]);
6893
}
6994
}
7095

0 commit comments

Comments
 (0)