[FLINK-38624][table] Type Mismatch Exception in StreamPhysicalOverAggregateRule#27616
[FLINK-38624][table] Type Mismatch Exception in StreamPhysicalOverAggregateRule#27616snuyanzin wants to merge 4 commits intoapache:masterfrom
Conversation
|
@lincoln-lil fyi , since you participated in FLINK-27519 |
7207dbc to
8906ded
Compare
.../apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalOverAggregateRule.scala
Outdated
Show resolved
Hide resolved
| val windowInputTypes = | ||
| logicWindow.getInput.getRowType.getFieldList.asScala.map(f => f.getType).toList.asJava | ||
|
|
||
| // Type might be different because of RelTimeIndicatorConverter#materializeTimeIndicators |
There was a problem hiding this comment.
Can we fix in the RelTimeIndicatorConverter instead? The output of the materializeTimeIndicators should be consistent and correct for all RexNode but also for RelNode data types.
There was a problem hiding this comment.
Finally fixed in RelTimeIndicatorConverter
Also seems fixed another similar issue when window function applied to watermark column (in this case it is actual for 1.20 as well), covered with tests
4003c16 to
0c1274f
Compare
…ThrownBy` in OverAggregateTest.scala
0c1274f to
1d87003
Compare
| * Sub-class of {@link Window} that is a relational expression which represents a set of over window | ||
| * aggregates in Flink. | ||
| */ | ||
| public class FlinkLogicalOverAggregate extends Window implements FlinkLogicalRel { |
There was a problem hiding this comment.
Converted to java since this class was touched
| assertThatExceptionOfType(classOf[RuntimeException]) | ||
| .isThrownBy(() => util.verifyExecPlan(sql)) | ||
| .havingRootCause() | ||
| .withMessage("CHARACTER type is not allowed for window boundary") | ||
| .isExactlyInstanceOf(classOf[ValidationException]) | ||
| } |
There was a problem hiding this comment.
It looks like the previous approach didn't check the last statement
.hasRootCauseInstanceOf(classOf[ValidationException])it was just ignored...
There was a problem hiding this comment.
I noticed this weird behavior only for tests in scala
in java everything is ok
| return context.getRelBuilder().getRexBuilder().makeAbstractCast(targetRelDataType, child); | ||
| return context.getRelBuilder() | ||
| .getRexBuilder() | ||
| .makeAbstractCast(targetRelDataType, child, false); |
There was a problem hiding this comment.
now this method with 2 args is deprecated
false means do not use SAFE_CAST (similar to Flink's TRY_CAST)
| } | ||
|
|
||
| @Test | ||
| def testTemporalJoinWithWatermarksWithMaterializedTimeArg(): Unit = { |
There was a problem hiding this comment.
Last 2 tests are. not regression,
Seems they just didn't work for long time even for 1.20
There was a problem hiding this comment.
thanks for heavily looking into this!
…regateRule This closes apache#27616.
…regateRule This closes apache#27616.
…regateRule This closes apache#27616.
…regateRule This closes apache#27616.
…regateRule This closes apache#27616.
What is the purpose of the change
The PR is going to fix type mismatch issue in case of join 2 streams (each with watermark)
In that case one for of them rel time indicator will be materialized and this was not taken into account while fixing of FLINK-27519, before that it was working, so it is a regression starting 2.0 (or 2.0-preview)
case to reproduce: see test
Brief change log
StreamPhysicalOverAggregateRule
Verifying this change
Test added
Does this pull request potentially affect one of the following parts:
@Public(Evolving): ( no)Documentation