Skip to content

Commit 0595143

Browse files
committed
Add an extension to stop consumption on closed entity manager
1 parent 6afffd3 commit 0595143

File tree

8 files changed

+396
-0
lines changed

8 files changed

+396
-0
lines changed

docs/bundle/config_reference.md

+1
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ enqueue:
7070
doctrine_ping_connection_extension: false
7171
doctrine_clear_identity_map_extension: false
7272
doctrine_odm_clear_identity_map_extension: false
73+
doctrine_closed_entity_manager_extension: false
7374
reset_services_extension: false
7475
signal_extension: true
7576
reply_extension: true

docs/consumption/extensions.md

+4
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,10 @@ It clears Doctrine's identity map after a message is processed. It reduce memory
2222

2323
It test a database connection and if it is lost it does reconnect. Fixes "MySQL has gone away" errors.
2424

25+
## [DoctrineClosedEntityManagerExtension](https://github.com/php-enqueue/enqueue-dev/blob/master/pkg/enqueue-bundle/Consumption/Extension/DoctrineClosedEntityManagerExtension.php)
26+
27+
The extension interrupts consumption if an entity manager has been closed.
28+
2529
## [ResetServicesExtension](https://github.com/php-enqueue/enqueue-dev/blob/master/pkg/enqueue-bundle/Consumption/Extension/ResetServicesExtension.php)
2630

2731
It resets all services with tag "kernel.reset".
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
<?php
2+
3+
namespace Enqueue\Bundle\Consumption\Extension;
4+
5+
use Doctrine\Common\Persistence\ManagerRegistry;
6+
use Doctrine\ORM\EntityManagerInterface;
7+
use Enqueue\Consumption\Context\PostConsume;
8+
use Enqueue\Consumption\Context\PostMessageReceived;
9+
use Enqueue\Consumption\Context\PreConsume;
10+
use Enqueue\Consumption\PostConsumeExtensionInterface;
11+
use Enqueue\Consumption\PostMessageReceivedExtensionInterface;
12+
use Enqueue\Consumption\PreConsumeExtensionInterface;
13+
use Psr\Log\LoggerInterface;
14+
15+
class DoctrineClosedEntityManagerExtension implements PreConsumeExtensionInterface, PostMessageReceivedExtensionInterface, PostConsumeExtensionInterface
16+
{
17+
/**
18+
* @var ManagerRegistry
19+
*/
20+
protected $registry;
21+
22+
public function __construct(ManagerRegistry $registry)
23+
{
24+
$this->registry = $registry;
25+
}
26+
27+
public function onPreConsume(PreConsume $context): void
28+
{
29+
if ($this->shouldBeStopped($context->getLogger())) {
30+
$context->interruptExecution();
31+
}
32+
}
33+
34+
public function onPostConsume(PostConsume $context): void
35+
{
36+
if ($this->shouldBeStopped($context->getLogger())) {
37+
$context->interruptExecution();
38+
}
39+
}
40+
41+
public function onPostMessageReceived(PostMessageReceived $context): void
42+
{
43+
if ($this->shouldBeStopped($context->getLogger())) {
44+
$context->interruptExecution();
45+
}
46+
}
47+
48+
private function shouldBeStopped(LoggerInterface $logger): bool
49+
{
50+
foreach ($this->registry->getManagers() as $name => $manager) {
51+
if (!$manager instanceof EntityManagerInterface || $manager->isOpen()) {
52+
continue;
53+
}
54+
55+
$logger->debug(sprintf(
56+
'[DoctrineClosedEntityManagerExtension] Interrupt execution as entity manager "%s" has been closed',
57+
$name
58+
));
59+
60+
return true;
61+
}
62+
63+
return false;
64+
}
65+
}

pkg/enqueue-bundle/DependencyInjection/Configuration.php

+1
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ public function getConfigTreeBuilder(): TreeBuilder
4848
->booleanNode('doctrine_ping_connection_extension')->defaultFalse()->end()
4949
->booleanNode('doctrine_clear_identity_map_extension')->defaultFalse()->end()
5050
->booleanNode('doctrine_odm_clear_identity_map_extension')->defaultFalse()->end()
51+
->booleanNode('doctrine_closed_entity_manager_extension')->defaultFalse()->end()
5152
->booleanNode('reset_services_extension')->defaultFalse()->end()
5253
->booleanNode('signal_extension')->defaultValue(function_exists('pcntl_signal_dispatch'))->end()
5354
->booleanNode('reply_extension')->defaultTrue()->end()

pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php

+24
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
use Enqueue\AsyncCommand\DependencyInjection\AsyncCommandExtension;
66
use Enqueue\AsyncEventDispatcher\DependencyInjection\AsyncEventDispatcherExtension;
77
use Enqueue\Bundle\Consumption\Extension\DoctrineClearIdentityMapExtension;
8+
use Enqueue\Bundle\Consumption\Extension\DoctrineClosedEntityManagerExtension;
89
use Enqueue\Bundle\Consumption\Extension\DoctrinePingConnectionExtension;
910
use Enqueue\Bundle\Consumption\Extension\ResetServicesExtension;
1011
use Enqueue\Bundle\Profiler\MessageQueueCollector;
@@ -138,6 +139,7 @@ public function load(array $configs, ContainerBuilder $container): void
138139
$this->loadDoctrinePingConnectionExtension($config, $container);
139140
$this->loadDoctrineClearIdentityMapExtension($config, $container);
140141
$this->loadDoctrineOdmClearIdentityMapExtension($config, $container);
142+
$this->loadDoctrineClosedEntityManagerExtension($config, $container);
141143
$this->loadResetServicesExtension($config, $container);
142144
$this->loadSignalExtension($config, $container);
143145
$this->loadReplyExtension($config, $container);
@@ -273,6 +275,28 @@ private function loadDoctrineOdmClearIdentityMapExtension(array $config, Contain
273275
}
274276
}
275277

278+
private function loadDoctrineClosedEntityManagerExtension(array $config, ContainerBuilder $container)
279+
{
280+
$configNames = [];
281+
foreach ($config as $name => $modules) {
282+
if ($modules['extensions']['doctrine_closed_entity_manager_extension']) {
283+
$configNames[] = $name;
284+
}
285+
}
286+
287+
if ([] === $configNames) {
288+
return;
289+
}
290+
291+
$extension = $container->register('enqueue.consumption.doctrine_closed_entity_manager_extension', DoctrineClosedEntityManagerExtension::class)
292+
->addArgument(new Reference('doctrine'));
293+
294+
foreach ($configNames as $name) {
295+
$extension->addTag('enqueue.consumption_extension', ['client' => $name]);
296+
$extension->addTag('enqueue.transport.consumption_extension', ['transport' => $name]);
297+
}
298+
}
299+
276300
private function loadResetServicesExtension(array $config, ContainerBuilder $container)
277301
{
278302
$configNames = [];
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,222 @@
1+
<?php
2+
3+
namespace Enqueue\Bundle\Tests\Unit\Consumption\Extension;
4+
5+
use Doctrine\Common\Persistence\ManagerRegistry;
6+
use Doctrine\ORM\EntityManagerInterface;
7+
use Enqueue\Bundle\Consumption\Extension\DoctrineClosedEntityManagerExtension;
8+
use Enqueue\Consumption\Context\PostConsume;
9+
use Enqueue\Consumption\Context\PostMessageReceived;
10+
use Enqueue\Consumption\Context\PreConsume;
11+
use Interop\Queue\Consumer;
12+
use Interop\Queue\Context as InteropContext;
13+
use Interop\Queue\Message;
14+
use Interop\Queue\SubscriptionConsumer;
15+
use PHPUnit\Framework\TestCase;
16+
use Psr\Log\LoggerInterface;
17+
18+
class DoctrineClosedEntityManagerExtensionTest extends TestCase
19+
{
20+
public function testOnPreConsumeShouldNotInterruptExecution()
21+
{
22+
$manager = $this->createManagerMock(true);
23+
24+
$registry = $this->createRegistryMock([
25+
'manager' => $manager,
26+
]);
27+
28+
$message = new PreConsume(
29+
$this->createMock(InteropContext::class),
30+
$this->createMock(SubscriptionConsumer::class),
31+
$this->createMock(LoggerInterface::class),
32+
1,
33+
2,
34+
3
35+
);
36+
37+
self::assertFalse($message->isExecutionInterrupted());
38+
39+
$extension = new DoctrineClosedEntityManagerExtension($registry);
40+
$extension->onPreConsume($message);
41+
42+
self::assertFalse($message->isExecutionInterrupted());
43+
}
44+
45+
public function testOnPreConsumeShouldInterruptExecutionIfAManagerIsClosed()
46+
{
47+
$manager1 = $this->createManagerMock(true);
48+
$manager2 = $this->createManagerMock(false);
49+
50+
$registry = $this->createRegistryMock([
51+
'manager1' => $manager1,
52+
'manager2' => $manager2,
53+
]);
54+
55+
$message = new PreConsume(
56+
$this->createMock(InteropContext::class),
57+
$this->createMock(SubscriptionConsumer::class),
58+
$this->createMock(LoggerInterface::class),
59+
1,
60+
2,
61+
3
62+
);
63+
$message->getLogger()
64+
->expects($this->once())
65+
->method('debug')
66+
->with('[DoctrineClosedEntityManagerExtension] Interrupt execution as entity manager "manager2" has been closed')
67+
;
68+
69+
self::assertFalse($message->isExecutionInterrupted());
70+
71+
$extension = new DoctrineClosedEntityManagerExtension($registry);
72+
$extension->onPreConsume($message);
73+
74+
self::assertTrue($message->isExecutionInterrupted());
75+
}
76+
77+
public function testOnPostConsumeShouldNotInterruptExecution()
78+
{
79+
$manager = $this->createManagerMock(true);
80+
81+
$registry = $this->createRegistryMock([
82+
'manager' => $manager,
83+
]);
84+
85+
$message = new PostConsume(
86+
$this->createMock(InteropContext::class),
87+
$this->createMock(SubscriptionConsumer::class),
88+
1,
89+
1,
90+
1,
91+
$this->createMock(LoggerInterface::class)
92+
);
93+
94+
self::assertFalse($message->isExecutionInterrupted());
95+
96+
$extension = new DoctrineClosedEntityManagerExtension($registry);
97+
$extension->onPostConsume($message);
98+
99+
self::assertFalse($message->isExecutionInterrupted());
100+
}
101+
102+
public function testOnPostConsumeShouldInterruptExecutionIfAManagerIsClosed()
103+
{
104+
$manager1 = $this->createManagerMock(true);
105+
$manager2 = $this->createManagerMock(false);
106+
107+
$registry = $this->createRegistryMock([
108+
'manager1' => $manager1,
109+
'manager2' => $manager2,
110+
]);
111+
112+
$message = new PostConsume(
113+
$this->createMock(InteropContext::class),
114+
$this->createMock(SubscriptionConsumer::class),
115+
1,
116+
1,
117+
1,
118+
$this->createMock(LoggerInterface::class)
119+
);
120+
$message->getLogger()
121+
->expects($this->once())
122+
->method('debug')
123+
->with('[DoctrineClosedEntityManagerExtension] Interrupt execution as entity manager "manager2" has been closed')
124+
;
125+
126+
self::assertFalse($message->isExecutionInterrupted());
127+
128+
$extension = new DoctrineClosedEntityManagerExtension($registry);
129+
$extension->onPostConsume($message);
130+
131+
self::assertTrue($message->isExecutionInterrupted());
132+
}
133+
134+
public function testOnPostReceivedShouldNotInterruptExecution()
135+
{
136+
$manager = $this->createManagerMock(true);
137+
138+
$registry = $this->createRegistryMock([
139+
'manager' => $manager,
140+
]);
141+
142+
$message = new PostMessageReceived(
143+
$this->createMock(InteropContext::class),
144+
$this->createMock(Consumer::class),
145+
$this->createMock(Message::class),
146+
'aResult',
147+
1,
148+
$this->createMock(LoggerInterface::class)
149+
);
150+
151+
self::assertFalse($message->isExecutionInterrupted());
152+
153+
$extension = new DoctrineClosedEntityManagerExtension($registry);
154+
$extension->onPostMessageReceived($message);
155+
156+
self::assertFalse($message->isExecutionInterrupted());
157+
}
158+
159+
public function testOnPostReceivedShouldInterruptExecutionIfAManagerIsClosed()
160+
{
161+
$manager1 = $this->createManagerMock(true);
162+
$manager2 = $this->createManagerMock(false);
163+
164+
$registry = $this->createRegistryMock([
165+
'manager1' => $manager1,
166+
'manager2' => $manager2,
167+
]);
168+
169+
$message = new PostMessageReceived(
170+
$this->createMock(InteropContext::class),
171+
$this->createMock(Consumer::class),
172+
$this->createMock(Message::class),
173+
'aResult',
174+
1,
175+
$this->createMock(LoggerInterface::class)
176+
);
177+
$message->getLogger()
178+
->expects($this->once())
179+
->method('debug')
180+
->with('[DoctrineClosedEntityManagerExtension] Interrupt execution as entity manager "manager2" has been closed')
181+
;
182+
183+
self::assertFalse($message->isExecutionInterrupted());
184+
185+
$extension = new DoctrineClosedEntityManagerExtension($registry);
186+
$extension->onPostMessageReceived($message);
187+
188+
self::assertTrue($message->isExecutionInterrupted());
189+
}
190+
191+
/**
192+
* @return \PHPUnit_Framework_MockObject_MockObject|ManagerRegistry
193+
*/
194+
protected function createRegistryMock(array $managers): ManagerRegistry
195+
{
196+
$mock = $this->createMock(ManagerRegistry::class);
197+
198+
$mock
199+
->expects($this->once())
200+
->method('getManagers')
201+
->willReturn($managers)
202+
;
203+
204+
return $mock;
205+
}
206+
207+
/**
208+
* @return \PHPUnit_Framework_MockObject_MockObject|EntityManagerInterface
209+
*/
210+
protected function createManagerMock(bool $open): EntityManagerInterface
211+
{
212+
$mock = $this->createMock(EntityManagerInterface::class);
213+
214+
$mock
215+
->expects($this->once())
216+
->method('isOpen')
217+
->willReturn($open)
218+
;
219+
220+
return $mock;
221+
}
222+
}

0 commit comments

Comments
 (0)