SteveStevenpoor
Contributor

What is the purpose of the change

The newly introduced MultiJoin operator currently does not support right joins. Since all right joins are converted to left joins, we need to enable the FlinkRightToLeftJoinRule in the MultiJoin rule set and handle left joins using projections.

Brief change log

  • Introduced FlinkOrderPreservingProjection, a wrapper around the original projection. This class is used to mark projections created by FlinkRightToLeftJoinRule.

  • Patched JoinToMultiJoinRule to support FlinkOrderPreservingProjection (i.e., right joins).

  • Other minor adjustments related to JoinToMultiJoinRule.

Verifying this change

  • Added the following test cases to MultiJoinTest:
    • testThreeWayRightInnerJoin
    • testThreeWayRightRightJoin
    • testThreeWayInnerRightJoin

TODO: Semantic and restore tests will be added once StreamExecMultiJoin is fixed to support right joins.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? not applicable

@flinkbot
Collaborator

flinkbot commented Jul 18, 2025

CI report:


@gustavodemorais
Contributor

@SteveStevenpoor can you provide a quick description of the tests/changes that are still pending?

@@ -278,37 +278,6 @@ Calc(select=[user_id_0, name, order_id, age])
+- Calc(select=[user_id, age])
Contributor

Could you only add the new tests and not modify the order in the XML, so we can see that the changes do not break anything in the existing tests?

Contributor Author

Could you only add the new tests and not modify the order in the XML, so we can see that the changes do not break anything in the existing tests?

The file is automatically generated by mvn test.

Contributor

@gustavodemorais gustavodemorais left a comment

I wonder if we could have a more robust solution that doesn't rely on RULE_SEQUENCE. Some ideas:

  1. Why can't we just execute FlinkRightJoinToLeftJoinRule in a dedicated optimization phase (e.g., a FlinkHepProgram) that runs before the main logical optimization? The output of this phase is always a plan that contains only INNER and LEFT joins. All subsequent optimization phases, including the one with JoinToMultiJoinRule, can then be written under the assumption that they will never encounter a RIGHT JOIN.
  2. Adjust FlinkRightJoinToLeftJoin to be FlinkRightJoinToLeftJoinInMultiJoin. That is, we could perform the swap inside the MultiJoin node rather than before the joins are converted.

If you think your approach is better than those two, could you add more comments and an example to the Javadocs of FlinkOrderPreservingProjection and JoinToMultiJoinRule, so it's easier to follow the dependency between all three rules?

@github-actions github-actions bot added the community-reviewed PR has been reviewed by the community. label Jul 31, 2025
@SteveStevenpoor
Contributor Author

Let's start with an example:

SELECT u.user_id_0, u.name, o.order_id, p.payment_id, s.location
FROM Users u
RIGHT JOIN Orders o ON u.user_id_0 = o.user_id_1
RIGHT JOIN Payments p ON u.user_id_0 = p.user_id_2
INNER JOIN Shipments s ON p.user_id_2 = s.user_id_3

So the original AST will look like this:
image

After the right-join-to-left-join conversion:
image
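
For readers who cannot see the images, the conversion step can be sketched in a few lines of Python. This is only an illustration under simplifying assumptions, not the planner's implementation: rows are plain tuples, joins are nested loops, and the helper names (`left_join`, `right_join`, `right_join_via_left`) and the sample data are hypothetical. It shows why a projection is needed after swapping the inputs: the swapped LEFT JOIN emits the columns in the opposite order.

```python
from typing import Callable, List, Tuple

Row = Tuple
Cond = Callable[[Row, Row], bool]


def left_join(left: List[Row], right: List[Row], right_width: int, on: Cond) -> List[Row]:
    """Nested-loop LEFT JOIN: every left row survives, padded with None if unmatched."""
    out = []
    for l in left:
        matches = [l + r for r in right if on(l, r)]
        out.extend(matches or [l + (None,) * right_width])
    return out


def right_join(left: List[Row], right: List[Row], left_width: int, on: Cond) -> List[Row]:
    """Nested-loop RIGHT JOIN: every right row survives, padded with None if unmatched."""
    out = []
    for r in right:
        matches = [l + r for l in left if on(l, r)]
        out.extend(matches or [(None,) * left_width + r])
    return out


def right_join_via_left(left, right, left_width, right_width, on):
    """Rewrite `left RIGHT JOIN right` as `right LEFT JOIN left` plus a projection."""
    # Swap the inputs and flip the argument order of the join condition.
    swapped = left_join(right, left, left_width, lambda r, l: on(l, r))
    # Swapped rows are laid out as (right columns, left columns); the projection
    # moves the left columns back to the front so the output schema is unchanged.
    return [row[right_width:] + row[:right_width] for row in swapped]


# Hypothetical sample data, loosely following the Users/Orders tables above.
users = [(1, "Ann"), (2, "Bob")]   # (user_id_0, name)
orders = [(1, "o1"), (3, "o3")]    # (user_id_1, order_id)
same_user = lambda u, o: u[0] == o[0]

direct = right_join(users, orders, 2, same_user)
rewritten = right_join_via_left(users, orders, 2, 2, same_user)
```

Here `direct` and `rewritten` contain the same rows in the same column order. Without the final projection, the rewritten rows would start with the Orders columns.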

In this MR I get rid of the projections when converting to a multi join (no INNER or LEFT joins remain, though there may be projections). So it will look like this:
image

So what's the problem with the tests?
StreamExecMultiJoin, together with StreamingMultiJoinOperator, relies heavily on the assumption that the original AST has joins only on the left side. However, in the example above, MultiJoin(P, O, U, S) needs to first merge O and U, then P with the JoinedRow from O and U, and then the JoinedRow from P, O, U with S. Currently it merges P and O first, as if the query had a different structure.

Why can't we construct MJ(O, U, P, S)?
Because P must be on the right side of O and U for the join types to be applied correctly.

What I am trying to do:
After adding support for constructing the optimized rel plan for right joins, I added semantic tests. Then I fixed StreamExecMultiJoin to generate proper join conditions without relying on a specific AST structure. Now I'm adapting StreamingMultiJoinOperator to handle the cases described above. I'm going to implement something like this:
Before:
When a record arrives, we start constructing the resulting record from the leftmost table.
With the example above, MJ(P, O, U, S): when a record from Users arrives, we merge P and O, then PO and U, then POU and S, which leads to an incorrect result.

After:
When a record arrives, we start constructing the resulting record from the leftmost table of the proper level:
Example: MJ(P, O, U, S)
image
This way we merge O and U, then P and OU, and then POU with S. We can also store the original join type for each level.
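
The difference between the two merge orders can be sketched in Python. This is a toy model under stated assumptions, not the operator code: the join tree is a nested `(left, join_type, right)` tuple, inputs are single-letter strings, and the function names (`flatten`, `naive_merge_order`, `tree_merge_order`) are hypothetical. The tree encodes the example after the right-to-left conversion, ignoring projections.

```python
# Join tree for the example after conversion: ((P LEFT (O LEFT U)) INNER S).
TREE = (("P", "LEFT", ("O", "LEFT", "U")), "INNER", "S")


def flatten(tree):
    """Return the leaf inputs of the tree from left to right."""
    if isinstance(tree, str):
        return [tree]
    left, _join_type, right = tree
    return flatten(left) + flatten(right)


def naive_merge_order(inputs):
    """'Before': fold the flattened inputs strictly from left to right."""
    steps = []
    acc = inputs[0]
    for nxt in inputs[1:]:
        steps.append((acc, nxt))
        acc += nxt
    return steps


def tree_merge_order(tree):
    """'After': merge level by level (post-order), keeping each level's join type."""
    steps = []

    def visit(node):
        if isinstance(node, str):
            return node
        left, join_type, right = node
        merged_left = visit(left)
        merged_right = visit(right)
        steps.append((merged_left, merged_right, join_type))
        return merged_left + merged_right

    visit(tree)
    return steps


inputs = flatten(TREE)
before = naive_merge_order(inputs)
after = tree_merge_order(TREE)
```

With this tree, `inputs` is `["P", "O", "U", "S"]`, `before` starts by pairing P with O, and `after` starts by pairing O with U and carries the join type of every level.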

Also, it does not matter whether we change right joins to left ones before or after JoinToMultiJoinRule. Either way, we will need to add a projection on top of the multi join and handle right-side joins in StreamingMultiJoinOperator. Since we already have FlinkRightToLeftJoinRule, I think it's OK to assume it will be applied before JoinToMultiJoinRule.

@gustavodemorais let me know your thoughts on it, my friend.

@SteveStevenpoor
Contributor Author

SteveStevenpoor commented Aug 26, 2025

TODO:

  • Update the Javadocs
  • A lot of refactoring
  • Add restore tests
  • Resolve merge conflicts
  • Benchmark versus binary joins
  • Benchmark versus unflattened right joins
