Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
use Ecotone\Messaging\Message;
use Ecotone\Messaging\MessageHeaders;
use Ecotone\Messaging\Support\MessageBuilder;
use Ecotone\Modelling\AggregateFlow\AggregateIdMetadata;

/**
* Class AggregateMessageConversionService
Expand Down Expand Up @@ -39,7 +40,7 @@ public function __construct(
public function process(Message $message): Message
{
/** @TODO Ecotone 2.0 (remove) this. For backward compatibility because it's ran again when message is consumed from Queue e*/
if ($message->getHeaders()->containsKey(AggregateMessage::AGGREGATE_ID)) {
if ($this->messageContainsCorrectAggregateId($message)) {
return $message;
}

Expand Down Expand Up @@ -109,4 +110,19 @@ public function process(Message $message): Message
->setHeader(AggregateMessage::AGGREGATE_ID, AggregateIdResolver::resolveArrayOfIdentifiers($this->aggregateClassName, $aggregateIdentifiers))
->build();
}

private function messageContainsCorrectAggregateId(Message $message): bool
{
if (!$message->getHeaders()->containsKey(AggregateMessage::AGGREGATE_ID)){
return false;
}

$aggregateIdentifiers = AggregateIdMetadata::createFrom($message->getHeaders()->get(AggregateMessage::AGGREGATE_ID))->getIdentifiers();

if ($this->metadataIdentifierMapping !== []) {
return array_keys($this->metadataIdentifierMapping) === array_keys($aggregateIdentifiers);
}

return array_keys($this->messageIdentifierMapping) === array_keys($aggregateIdentifiers);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
<?php

namespace Test\Ecotone\EventSourcing\Fixture\StatefulEventSourcedWorkflowWithMultipleAggregates\AggregatesWithMetadataMapping;

use Ecotone\Messaging\Attribute\Parameter\Payload;
use Ecotone\Modelling\Attribute\CommandHandler;
use Ecotone\Modelling\Attribute\EventSourcingAggregate;
use Ecotone\Modelling\Attribute\EventSourcingHandler;
use Ecotone\Modelling\Attribute\Identifier;
use Ecotone\Modelling\WithAggregateVersioning;
use Ecotone\Modelling\WithEvents;
use Test\Ecotone\EventSourcing\Fixture\StatefulEventSourcedWorkflowWithMultipleAggregates\Common\AddItemToBasket;
use Test\Ecotone\EventSourcing\Fixture\StatefulEventSourcedWorkflowWithMultipleAggregates\Common\BasketCreated;
use Test\Ecotone\EventSourcing\Fixture\StatefulEventSourcedWorkflowWithMultipleAggregates\Common\ItemReservation;
use Test\Ecotone\EventSourcing\Fixture\StatefulEventSourcedWorkflowWithMultipleAggregates\Common\ItemWasAddedToBasket;

#[EventSourcingAggregate(withInternalEventRecorder: true)]
class Basket
{
use WithEvents;
use WithAggregateVersioning;

#[Identifier]
private string $basketId;

#[CommandHandler(outputChannelName: 'itemInventory.makeReservation')]
public function addItemToBasket(#[Payload] AddItemToBasket $command): ItemReservation
{
$this->recordThat(new ItemWasAddedToBasket($this->basketId, $command->itemId, $command->quantity));

return new ItemReservation($command->itemId, $command->quantity);
}

#[EventSourcingHandler]
public function applyBasketCreated(BasketCreated $event): void
{
$this->basketId = $event->basketId;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
<?php

namespace Test\Ecotone\EventSourcing\Fixture\StatefulEventSourcedWorkflowWithMultipleAggregates\AggregatesWithMetadataMapping;

use Ecotone\Messaging\Attribute\Asynchronous;
use Ecotone\Messaging\Attribute\Parameter\Payload;
use Ecotone\Modelling\Attribute\CommandHandler;
use Ecotone\Modelling\Attribute\EventSourcingAggregate;
use Ecotone\Modelling\Attribute\EventSourcingHandler;
use Ecotone\Modelling\Attribute\Identifier;
use Ecotone\Modelling\WithAggregateVersioning;
use Ecotone\Modelling\WithEvents;
use Test\Ecotone\EventSourcing\Fixture\StatefulEventSourcedWorkflowWithMultipleAggregates\Common\InventoryStockIncreased;
use Test\Ecotone\EventSourcing\Fixture\StatefulEventSourcedWorkflowWithMultipleAggregates\Common\ItemInventoryCreated;
use Test\Ecotone\EventSourcing\Fixture\StatefulEventSourcedWorkflowWithMultipleAggregates\Common\ItemReservation;
use Test\Ecotone\EventSourcing\Fixture\StatefulEventSourcedWorkflowWithMultipleAggregates\Common\ItemReserved;

#[EventSourcingAggregate(withInternalEventRecorder: true)]
class ItemInventory
{
use WithEvents;
use WithAggregateVersioning;

#[Identifier]
private string $itemId;

private int $quantity = 0;

#[CommandHandler(routingKey: 'itemInventory.makeReservation', endpointId: 'itemInventory.makeReservation.endpoint', identifierMetadataMapping: ['itemId' => 'itemId'])]
#[Asynchronous('itemInventory')]
public function makeReservation(#[Payload] ItemReservation $itemReservation): void
{
$this->recordThat(new ItemReserved($this->itemId, $itemReservation->quantity));
}

#[EventSourcingHandler]
public function applyItemInventoryCreated(ItemInventoryCreated $event): void
{
$this->itemId = $event->itemId;
}

#[EventSourcingHandler]
public function applyInventoryStockIncreased(InventoryStockIncreased $event): void
{
$this->quantity += $event->quantity;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
<?php

namespace Test\Ecotone\EventSourcing\Fixture\StatefulEventSourcedWorkflowWithMultipleAggregates\AggregatesWithoutMetadataMapping;

use Ecotone\Messaging\Attribute\Parameter\Payload;
use Ecotone\Modelling\Attribute\CommandHandler;
use Ecotone\Modelling\Attribute\EventSourcingAggregate;
use Ecotone\Modelling\Attribute\EventSourcingHandler;
use Ecotone\Modelling\Attribute\Identifier;
use Ecotone\Modelling\WithAggregateVersioning;
use Ecotone\Modelling\WithEvents;
use Test\Ecotone\EventSourcing\Fixture\StatefulEventSourcedWorkflowWithMultipleAggregates\Common\AddItemToBasket;
use Test\Ecotone\EventSourcing\Fixture\StatefulEventSourcedWorkflowWithMultipleAggregates\Common\BasketCreated;
use Test\Ecotone\EventSourcing\Fixture\StatefulEventSourcedWorkflowWithMultipleAggregates\Common\ItemReservation;
use Test\Ecotone\EventSourcing\Fixture\StatefulEventSourcedWorkflowWithMultipleAggregates\Common\ItemWasAddedToBasket;

#[EventSourcingAggregate(withInternalEventRecorder: true)]
class Basket
{
use WithEvents;
use WithAggregateVersioning;

#[Identifier]
private string $basketId;

#[CommandHandler(outputChannelName: 'itemInventory.makeReservation')]
public function addItemToBasket(#[Payload] AddItemToBasket $command): ItemReservation
{
$this->recordThat(new ItemWasAddedToBasket($this->basketId, $command->itemId, $command->quantity));

return new ItemReservation($command->itemId, $command->quantity);
}

#[EventSourcingHandler]
public function applyBasketCreated(BasketCreated $event): void
{
$this->basketId = $event->basketId;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
<?php

namespace Test\Ecotone\EventSourcing\Fixture\StatefulEventSourcedWorkflowWithMultipleAggregates\AggregatesWithoutMetadataMapping;

use Ecotone\Messaging\Attribute\Asynchronous;
use Ecotone\Messaging\Attribute\Parameter\Payload;
use Ecotone\Modelling\Attribute\CommandHandler;
use Ecotone\Modelling\Attribute\EventSourcingAggregate;
use Ecotone\Modelling\Attribute\EventSourcingHandler;
use Ecotone\Modelling\Attribute\Identifier;
use Ecotone\Modelling\WithAggregateVersioning;
use Ecotone\Modelling\WithEvents;
use Test\Ecotone\EventSourcing\Fixture\StatefulEventSourcedWorkflowWithMultipleAggregates\Common\InventoryStockIncreased;
use Test\Ecotone\EventSourcing\Fixture\StatefulEventSourcedWorkflowWithMultipleAggregates\Common\ItemInventoryCreated;
use Test\Ecotone\EventSourcing\Fixture\StatefulEventSourcedWorkflowWithMultipleAggregates\Common\ItemReservation;
use Test\Ecotone\EventSourcing\Fixture\StatefulEventSourcedWorkflowWithMultipleAggregates\Common\ItemReserved;

#[EventSourcingAggregate(withInternalEventRecorder: true)]
class ItemInventory
{
use WithEvents;
use WithAggregateVersioning;

#[Identifier]
private string $itemId;

private int $quantity = 0;

#[CommandHandler(routingKey: 'itemInventory.makeReservation', endpointId: 'itemInventory.makeReservation.endpoint')]
#[Asynchronous('itemInventory')]
public function makeReservation(#[Payload] ItemReservation $itemReservation): void
{
$this->recordThat(new ItemReserved($this->itemId, $itemReservation->quantity));
}

#[EventSourcingHandler]
public function applyItemInventoryCreated(ItemInventoryCreated $event): void
{
$this->itemId = $event->itemId;
}

#[EventSourcingHandler]
public function applyInventoryStockIncreased(InventoryStockIncreased $event): void
{
$this->quantity += $event->quantity;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
<?php

namespace Test\Ecotone\EventSourcing\Fixture\StatefulEventSourcedWorkflowWithMultipleAggregates\Common;

class AddItemToBasket
{
public function __construct(public string $basketId, public string $itemId, public int $quantity)
{
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<?php

namespace Test\Ecotone\EventSourcing\Fixture\StatefulEventSourcedWorkflowWithMultipleAggregates\Common;

use Ecotone\Modelling\Attribute\NamedEvent;

#[NamedEvent('BasketCreated')]
class BasketCreated
{
public function __construct(public string $basketId)
{
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
<?php

namespace Test\Ecotone\EventSourcing\Fixture\StatefulEventSourcedWorkflowWithMultipleAggregates\Common;

use Ecotone\Messaging\Attribute\Converter;
use Ecotone\Messaging\Attribute\Interceptor\Presend;

class Converters
{
#[Converter]
public function fromBasketCreated(BasketCreated $message): array
{
return ['basketId' => $message->basketId];
}

#[Converter]
public function toBasketCreated(array $payload): BasketCreated
{
return new BasketCreated(basketId: $payload['basketId']);
}

#[Converter]
public function fromItemInventoryCreated(ItemInventoryCreated $message): array
{
return ['itemId' => $message->itemId];
}

#[Converter]
public function toItemInventoryCreated(array $payload): ItemInventoryCreated
{
return new ItemInventoryCreated(itemId: $payload['itemId']);
}

#[Converter]
public function fromInventoryStockIncreased(InventoryStockIncreased $message): array
{
return ['itemId' => $message->itemId, 'quantity' => $message->quantity];
}

#[Converter]
public function toInventoryStockIncreased(array $payload): InventoryStockIncreased
{
return new InventoryStockIncreased(itemId: $payload['itemId'], quantity: $payload['quantity']);
}

#[Converter]
public function fromItemReserved(ItemReserved $message): array
{
return ['itemId' => $message->itemId, 'quantity' => $message->quantity];
}

#[Converter]
public function toItemReserved(array $payload): ItemReserved
{
return new ItemReserved(itemId: $payload['itemId'], quantity: $payload['quantity']);
}

#[Converter]
public function fromItemWasAddedToBasket(ItemWasAddedToBasket $message): array
{
return ['basketId' => $message->basketId, 'itemId' => $message->itemId, 'quantity' => $message->quantity];
}

#[Converter]
public function toItemWasAddedToBasket(array $payload): ItemWasAddedToBasket
{
return new ItemWasAddedToBasket(
basketId: $payload['basketId'],
itemId: $payload['itemId'],
quantity: $payload['quantity']
);
}

#[Converter]
public function fromItemReservation(ItemReservation $message): array
{
return ['itemId' => $message->itemId, 'quantity' => $message->quantity];
}

#[Converter]
public function toItemReservation(array $payload): ItemReservation
{
return new ItemReservation(
itemId: $payload['itemId'],
quantity: $payload['quantity']
);
}

#[Presend(pointcut: 'Test\Ecotone\EventSourcing\Fixture\StatefulEventSourcedWorkflowWithMultipleAggregates\ItemInventory::makeReservation', changeHeaders: true)]
public function beforeMakeReservation(ItemReservation $message): array
{
return ['itemId' => $message->itemId];
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<?php

namespace Test\Ecotone\EventSourcing\Fixture\StatefulEventSourcedWorkflowWithMultipleAggregates\Common;

use Ecotone\Modelling\Attribute\NamedEvent;

#[NamedEvent('InventoryStockIncreased')]
class InventoryStockIncreased
{
public function __construct(public string $itemId, public int $quantity)
{
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<?php

namespace Test\Ecotone\EventSourcing\Fixture\StatefulEventSourcedWorkflowWithMultipleAggregates\Common;

use Ecotone\Modelling\Attribute\NamedEvent;

#[NamedEvent('ItemInventoryCreated')]
class ItemInventoryCreated
{
public function __construct(public string $itemId)
{
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
<?php

namespace Test\Ecotone\EventSourcing\Fixture\StatefulEventSourcedWorkflowWithMultipleAggregates\Common;

class ItemReservation
{
public function __construct(public string $itemId, public int $quantity)
{
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<?php

namespace Test\Ecotone\EventSourcing\Fixture\StatefulEventSourcedWorkflowWithMultipleAggregates\Common;

use Ecotone\Modelling\Attribute\NamedEvent;

#[NamedEvent('ItemReserved')]
class ItemReserved
{
public function __construct(public string $itemId, public int $quantity)
{
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<?php

namespace Test\Ecotone\EventSourcing\Fixture\StatefulEventSourcedWorkflowWithMultipleAggregates\Common;

use Ecotone\Modelling\Attribute\NamedEvent;

#[NamedEvent('ItemWasAddedToBasket')]
class ItemWasAddedToBasket
{
public function __construct(public string $basketId, public string $itemId, public int $quantity)
{
}
}
Loading
Loading