Skip to content

Commit 4e1a21c

Browse files
committed
stateful event sourced workflow with multiple aggregates - async
1 parent 4a2596c commit 4e1a21c

File tree

15 files changed

+224
-44
lines changed

15 files changed

+224
-44
lines changed

packages/Ecotone/src/Modelling/AggregateIdentifierRetrevingService.php

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
use Ecotone\Messaging\Message;
1313
use Ecotone\Messaging\MessageHeaders;
1414
use Ecotone\Messaging\Support\MessageBuilder;
15+
use Ecotone\Modelling\AggregateFlow\AggregateIdMetadata;
1516

1617
/**
1718
* Class AggregateMessageConversionService
@@ -39,7 +40,7 @@ public function __construct(
3940
public function process(Message $message): Message
4041
{
4142
/** @TODO Ecotone 2.0 (remove) this. For backward compatibility because it's ran again when message is consumed from Queue e*/
42-
if ($message->getHeaders()->containsKey(AggregateMessage::AGGREGATE_ID)) {
43+
if ($this->messageContainsCorrectAggregateId($message)) {
4344
return $message;
4445
}
4546

@@ -109,4 +110,19 @@ public function process(Message $message): Message
109110
->setHeader(AggregateMessage::AGGREGATE_ID, AggregateIdResolver::resolveArrayOfIdentifiers($this->aggregateClassName, $aggregateIdentifiers))
110111
->build();
111112
}
113+
114+
private function messageContainsCorrectAggregateId(Message $message): bool
115+
{
116+
if (!$message->getHeaders()->containsKey(AggregateMessage::AGGREGATE_ID)){
117+
return false;
118+
}
119+
120+
$aggregateIdentifiers = AggregateIdMetadata::createFrom($message->getHeaders()->get(AggregateMessage::AGGREGATE_ID))->getIdentifiers();
121+
122+
if ($this->metadataIdentifierMapping !== []) {
123+
return array_keys($this->metadataIdentifierMapping) === array_keys($aggregateIdentifiers);
124+
}
125+
126+
return array_keys($this->messageIdentifierMapping) === array_keys($aggregateIdentifiers);
127+
}
112128
}

packages/PdoEventSourcing/tests/Fixture/StatefulEventSourcedWorkflowWithMultipleAggregates/Basket.php renamed to packages/PdoEventSourcing/tests/Fixture/StatefulEventSourcedWorkflowWithMultipleAggregates/AggregatesWithMetadataMapping/Basket.php

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
<?php
22

3-
namespace Test\Ecotone\EventSourcing\Fixture\StatefulEventSourcedWorkflowWithMultipleAggregates;
3+
namespace Test\Ecotone\EventSourcing\Fixture\StatefulEventSourcedWorkflowWithMultipleAggregates\AggregatesWithMetadataMapping;
44

55
use Ecotone\Messaging\Attribute\Parameter\Payload;
66
use Ecotone\Modelling\Attribute\CommandHandler;
@@ -9,6 +9,10 @@
99
use Ecotone\Modelling\Attribute\Identifier;
1010
use Ecotone\Modelling\WithAggregateVersioning;
1111
use Ecotone\Modelling\WithEvents;
12+
use Test\Ecotone\EventSourcing\Fixture\StatefulEventSourcedWorkflowWithMultipleAggregates\Common\AddItemToBasket;
13+
use Test\Ecotone\EventSourcing\Fixture\StatefulEventSourcedWorkflowWithMultipleAggregates\Common\BasketCreated;
14+
use Test\Ecotone\EventSourcing\Fixture\StatefulEventSourcedWorkflowWithMultipleAggregates\Common\ItemReservation;
15+
use Test\Ecotone\EventSourcing\Fixture\StatefulEventSourcedWorkflowWithMultipleAggregates\Common\ItemWasAddedToBasket;
1216

1317
#[EventSourcingAggregate(withInternalEventRecorder: true)]
1418
class Basket
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
<?php
2+
3+
namespace Test\Ecotone\EventSourcing\Fixture\StatefulEventSourcedWorkflowWithMultipleAggregates\AggregatesWithMetadataMapping;
4+
5+
use Ecotone\Messaging\Attribute\Asynchronous;
6+
use Ecotone\Messaging\Attribute\Parameter\Payload;
7+
use Ecotone\Modelling\Attribute\CommandHandler;
8+
use Ecotone\Modelling\Attribute\EventSourcingAggregate;
9+
use Ecotone\Modelling\Attribute\EventSourcingHandler;
10+
use Ecotone\Modelling\Attribute\Identifier;
11+
use Ecotone\Modelling\WithAggregateVersioning;
12+
use Ecotone\Modelling\WithEvents;
13+
use Test\Ecotone\EventSourcing\Fixture\StatefulEventSourcedWorkflowWithMultipleAggregates\Common\InventoryStockIncreased;
14+
use Test\Ecotone\EventSourcing\Fixture\StatefulEventSourcedWorkflowWithMultipleAggregates\Common\ItemInventoryCreated;
15+
use Test\Ecotone\EventSourcing\Fixture\StatefulEventSourcedWorkflowWithMultipleAggregates\Common\ItemReservation;
16+
use Test\Ecotone\EventSourcing\Fixture\StatefulEventSourcedWorkflowWithMultipleAggregates\Common\ItemReserved;
17+
18+
#[EventSourcingAggregate(withInternalEventRecorder: true)]
19+
class ItemInventory
20+
{
21+
use WithEvents;
22+
use WithAggregateVersioning;
23+
24+
#[Identifier]
25+
private string $itemId;
26+
27+
private int $quantity = 0;
28+
29+
#[CommandHandler(routingKey: 'itemInventory.makeReservation', endpointId: 'itemInventory.makeReservation.endpoint', identifierMetadataMapping: ['itemId' => 'itemId'])]
30+
#[Asynchronous('itemInventory')]
31+
public function makeReservation(#[Payload] ItemReservation $itemReservation): void
32+
{
33+
$this->recordThat(new ItemReserved($this->itemId, $itemReservation->quantity));
34+
}
35+
36+
#[EventSourcingHandler]
37+
public function applyItemInventoryCreated(ItemInventoryCreated $event): void
38+
{
39+
$this->itemId = $event->itemId;
40+
}
41+
42+
#[EventSourcingHandler]
43+
public function applyInventoryStockIncreased(InventoryStockIncreased $event): void
44+
{
45+
$this->quantity += $event->quantity;
46+
}
47+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
<?php
2+
3+
namespace Test\Ecotone\EventSourcing\Fixture\StatefulEventSourcedWorkflowWithMultipleAggregates\AggregatesWithoutMetadataMapping;
4+
5+
use Ecotone\Messaging\Attribute\Parameter\Payload;
6+
use Ecotone\Modelling\Attribute\CommandHandler;
7+
use Ecotone\Modelling\Attribute\EventSourcingAggregate;
8+
use Ecotone\Modelling\Attribute\EventSourcingHandler;
9+
use Ecotone\Modelling\Attribute\Identifier;
10+
use Ecotone\Modelling\WithAggregateVersioning;
11+
use Ecotone\Modelling\WithEvents;
12+
use Test\Ecotone\EventSourcing\Fixture\StatefulEventSourcedWorkflowWithMultipleAggregates\Common\AddItemToBasket;
13+
use Test\Ecotone\EventSourcing\Fixture\StatefulEventSourcedWorkflowWithMultipleAggregates\Common\BasketCreated;
14+
use Test\Ecotone\EventSourcing\Fixture\StatefulEventSourcedWorkflowWithMultipleAggregates\Common\ItemReservation;
15+
use Test\Ecotone\EventSourcing\Fixture\StatefulEventSourcedWorkflowWithMultipleAggregates\Common\ItemWasAddedToBasket;
16+
17+
#[EventSourcingAggregate(withInternalEventRecorder: true)]
18+
class Basket
19+
{
20+
use WithEvents;
21+
use WithAggregateVersioning;
22+
23+
#[Identifier]
24+
private string $basketId;
25+
26+
#[CommandHandler(outputChannelName: 'itemInventory.makeReservation')]
27+
public function addItemToBasket(#[Payload] AddItemToBasket $command): ItemReservation
28+
{
29+
$this->recordThat(new ItemWasAddedToBasket($this->basketId, $command->itemId, $command->quantity));
30+
31+
return new ItemReservation($command->itemId, $command->quantity);
32+
}
33+
34+
#[EventSourcingHandler]
35+
public function applyBasketCreated(BasketCreated $event): void
36+
{
37+
$this->basketId = $event->basketId;
38+
}
39+
}

packages/PdoEventSourcing/tests/Fixture/StatefulEventSourcedWorkflowWithMultipleAggregates/ItemInventory.php renamed to packages/PdoEventSourcing/tests/Fixture/StatefulEventSourcedWorkflowWithMultipleAggregates/AggregatesWithoutMetadataMapping/ItemInventory.php

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,19 @@
11
<?php
22

3-
namespace Test\Ecotone\EventSourcing\Fixture\StatefulEventSourcedWorkflowWithMultipleAggregates;
3+
namespace Test\Ecotone\EventSourcing\Fixture\StatefulEventSourcedWorkflowWithMultipleAggregates\AggregatesWithoutMetadataMapping;
44

5+
use Ecotone\Messaging\Attribute\Asynchronous;
56
use Ecotone\Messaging\Attribute\Parameter\Payload;
67
use Ecotone\Modelling\Attribute\CommandHandler;
78
use Ecotone\Modelling\Attribute\EventSourcingAggregate;
89
use Ecotone\Modelling\Attribute\EventSourcingHandler;
910
use Ecotone\Modelling\Attribute\Identifier;
1011
use Ecotone\Modelling\WithAggregateVersioning;
1112
use Ecotone\Modelling\WithEvents;
13+
use Test\Ecotone\EventSourcing\Fixture\StatefulEventSourcedWorkflowWithMultipleAggregates\Common\InventoryStockIncreased;
14+
use Test\Ecotone\EventSourcing\Fixture\StatefulEventSourcedWorkflowWithMultipleAggregates\Common\ItemInventoryCreated;
15+
use Test\Ecotone\EventSourcing\Fixture\StatefulEventSourcedWorkflowWithMultipleAggregates\Common\ItemReservation;
16+
use Test\Ecotone\EventSourcing\Fixture\StatefulEventSourcedWorkflowWithMultipleAggregates\Common\ItemReserved;
1217

1318
#[EventSourcingAggregate(withInternalEventRecorder: true)]
1419
class ItemInventory
@@ -21,7 +26,8 @@ class ItemInventory
2126

2227
private int $quantity = 0;
2328

24-
#[CommandHandler(routingKey: 'itemInventory.makeReservation')]
29+
#[CommandHandler(routingKey: 'itemInventory.makeReservation', endpointId: 'itemInventory.makeReservation.endpoint')]
30+
#[Asynchronous('itemInventory')]
2531
public function makeReservation(#[Payload] ItemReservation $itemReservation): void
2632
{
2733
$this->recordThat(new ItemReserved($this->itemId, $itemReservation->quantity));
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
<?php
22

3-
namespace Test\Ecotone\EventSourcing\Fixture\StatefulEventSourcedWorkflowWithMultipleAggregates;
3+
namespace Test\Ecotone\EventSourcing\Fixture\StatefulEventSourcedWorkflowWithMultipleAggregates\Common;
44

55
class AddItemToBasket
66
{
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
<?php
22

3-
namespace Test\Ecotone\EventSourcing\Fixture\StatefulEventSourcedWorkflowWithMultipleAggregates;
3+
namespace Test\Ecotone\EventSourcing\Fixture\StatefulEventSourcedWorkflowWithMultipleAggregates\Common;
44

55
use Ecotone\Modelling\Attribute\NamedEvent;
66

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
<?php
22

3-
namespace Test\Ecotone\EventSourcing\Fixture\StatefulEventSourcedWorkflowWithMultipleAggregates;
3+
namespace Test\Ecotone\EventSourcing\Fixture\StatefulEventSourcedWorkflowWithMultipleAggregates\Common;
44

55
use Ecotone\Messaging\Attribute\Converter;
6+
use Ecotone\Messaging\Attribute\Interceptor\Presend;
67

78
class Converters
89
{
@@ -84,4 +85,10 @@ public function toItemReservation(array $payload): ItemReservation
8485
quantity: $payload['quantity']
8586
);
8687
}
88+
89+
#[Presend(pointcut: 'Test\Ecotone\EventSourcing\Fixture\StatefulEventSourcedWorkflowWithMultipleAggregates\ItemInventory::makeReservation', changeHeaders: true)]
90+
public function beforeMakeReservation(ItemReservation $message): array
91+
{
92+
return ['itemId' => $message->itemId];
93+
}
8794
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
<?php
22

3-
namespace Test\Ecotone\EventSourcing\Fixture\StatefulEventSourcedWorkflowWithMultipleAggregates;
3+
namespace Test\Ecotone\EventSourcing\Fixture\StatefulEventSourcedWorkflowWithMultipleAggregates\Common;
44

55
use Ecotone\Modelling\Attribute\NamedEvent;
66

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
<?php
22

3-
namespace Test\Ecotone\EventSourcing\Fixture\StatefulEventSourcedWorkflowWithMultipleAggregates;
3+
namespace Test\Ecotone\EventSourcing\Fixture\StatefulEventSourcedWorkflowWithMultipleAggregates\Common;
44

55
use Ecotone\Modelling\Attribute\NamedEvent;
66

0 commit comments

Comments
 (0)