Skip to content

Conversation

@unixslayer
Copy link
Member

  • I have read and agree to the contribution terms outlined in CONTRIBUTING.

@unixslayer
Copy link
Member Author

unixslayer commented Jun 8, 2025

@dgafka this PR is a follow-up to #481

Scenario here is to build synchronous, stateful workflow with multiple aggregates. Apparently there is an issue with mapping aggregate id when workflow jumps from one aggregate to another as identifier of initial aggregate remains in use. The same happens, when itemInventory.makeReservation is asynchronous as in second commit here.

This issue was already solved with #474 although, flow is based on event handler in that one.

Anyway, I wonder if this would really be a use case. As models from example can be modified independently from each other. Therefore, so they will probably be in their own BCs so connecting them in synchronous workflow would lead to unexpected behavior. I'd rather don't connect them in the same workflow as we most likely have to deal with eventual consistency here.

cc: @jlabedo @lifinsky

@dgafka
Copy link
Member

dgafka commented Jun 9, 2025

I do think valid use case to connect multiple aggregates within synchronous workflow.
Someone may for example start the workflow as async process, and then do several steps synchronously which make use of two or more Aggregates.

In general this can be used for adding extra informations to the Message, to make the decision in last step for example.
So we could have Credit Card application process, that goes through User Aggregate, then to CardApproval Aggregate e.g.

In general Command or Event Handler can be joined as part of the workflow, however the intention behind them is rather higher level execution, which may begin the flow. So the cavecat here is that we miss one piece to make it actually fully useable, it's making ability to put #[InternalHandler] on the Aggregate and Saga.

This way we wouldn't be really exposing an Aggregate method to the public (via Bus), but it would be kept as part of the Workflow.
InternalHandler then should also have the option to modify header instead of payload.
This way, it becomes super easy to pass the flow through Aggregate, in order to enrich it with some details needed later, e.g. get email address and pass it to email sender.


Anyways I do think combing things together via input/output channels is something not really explored much in the area of PHP. Even that Ecotone had under the hood from the beginning, on the surface userland level - it's still relativealy new, as there was no tooling like that before.
There were companies that had been building like this, but not really in PHP directly. Those were companies that paid huge amount of money to achieve it using external tooling like MuleSoft.

That being said, I do think we will find more and more use cases for building workflows like this and combining Aggregates as part of the process. I just matter of exploration and easy to use higher level API to achieve that (e.g. missing InternalHandler on Aggregate).

@lifinsky
Copy link
Contributor

lifinsky commented Jun 9, 2025

Hi! I’d like to suggest a feature that would greatly improve developer experience and workflow flexibility.

Currently, the outputChannelName option for command or event handlers only supports returning a single message. However, there are many real-world cases where a handler naturally results in multiple follow-up commands or events. For example:

  • Broadcasting a webhook to multiple applications of the same merchant (each with its own webhook URL)
  • Splitting one logical command into multiple async commands

In these scenarios, being able to return an array or iterable from a handler — and having Ecotone automatically route each item to the specified OutputChannel — would simplify the architecture and eliminate boilerplate.

@unixslayer
Copy link
Member Author

@dgafka looking at the previous implementation I think that building stateful workflow over ES aggregate was never fully supported. We are one step closer now to cover more complex scenarios with reliable solution. I think that Ecotone would need to track connections in workflow which goes through multiple different aggregates/sagas as those will require additional information added to the message as different aggregate is being loaded/called.

I'm a little bit concerned exposing each step of a workflow right now however, knowing this limitation and comparing it with the benefits of splitting complex business process I've decided to go with it for now. Using #[InternalHandler] on an aggregate was my first way to go :)

@lifinsky some time ago as I was working on aggregate flow I came up with an option for aggregate to return described result object that would be handled by Ecotone with decision, e.g.

class Aggregate
{
    // ...

    #[CommandHandler]
    public function doStuff(): DoStuffResult
    {
        // ....

        return new DoStuffResult(...);
    }
}

#[Result]
class DoStuffResult
{
    public function __construct(
        #[RecordedEvents] public array $events,
        #[ResultedAggregate] public AnotherAggregate $resultedAggregateA,
        #[ResultedAggregate] public AnotherAggregate $resultedAggregateB,
        #[FollowUpCommand] public DoSomethingElse $followUpCommandA,
        #[FollowUpCommand] public DoSomethingElse $followUpCommandB,
    ) {}
}

WDYT?

@unixslayer
Copy link
Member Author

unixslayer commented Jun 9, 2025

Although, that would make outputChannelName obsolete as its design lies in passing single message within a workflow. With #[Result] approach, Ecotone may handle each element directly.

@lifinsky
Copy link
Contributor

I like the idea of ​​returning an iterator as output for an input channel better.

@unixslayer unixslayer force-pushed the stateful_event_sourced_workflow_with_multiple_aggregates branch from 4c16f18 to 3b7824a Compare October 13, 2025 19:36
@unixslayer
Copy link
Member Author

@dgafka I resolved stateful event sourced workflow with multiple aggregates issue by checking if message that is being handled in a workflow contains proper identifier header - based on metadata identifier mapping (if provided) or default message identifier mapping. If unexpected identifier is provided, Ecotone will try to resolve it again.

cc: @jlabedo @lifinsky

@unixslayer unixslayer requested review from dgafka and jlabedo October 13, 2025 19:49
return array_keys($this->metadataIdentifierMapping) === array_keys($aggregateIdentifiers);
}

return array_keys($this->messageIdentifierMapping) === array_keys($aggregateIdentifiers);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Won't this work only if identifier name are different?

Can we change the Basket and ItemInvetory identifiers to be named the same way e.g. "id", to ensure that this case is also covered?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll write additional tests cases

Copy link
Member Author

@unixslayer unixslayer Oct 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dgafka when aggregates have the same identifier name there is no difference in behavior as the same value will be used for calling second aggregate.

But as I was writing the test case for that I noticed that aggregates and messages started to look weird. I wonder how common would that be that two aggregates, which "talk to each other" have the same identifier name.

I've separated test cases into one with identifier metadata mapping and one without. IMO proposed solution is a step forward as it don't change current behavior and with proper setup let us deliver stateful workflow with multiple aggregates.

If you agree, I'll prepare documentation for this feature.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oki, make sense :)

@unixslayer unixslayer force-pushed the stateful_event_sourced_workflow_with_multiple_aggregates branch 2 times, most recently from bf08a91 to cdbe8f3 Compare October 14, 2025 14:33
@unixslayer unixslayer requested a review from dgafka October 14, 2025 14:39
@unixslayer unixslayer force-pushed the stateful_event_sourced_workflow_with_multiple_aggregates branch from cdbe8f3 to 4e1a21c Compare October 14, 2025 15:09
@dgafka dgafka merged commit ea46897 into ecotoneframework:main Oct 16, 2025
7 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants