[FLINK-39014][table] Fix the conversion to relational algebra issue in Batch Mode#28500
[FLINK-39014][table] Fix the conversion to relational algebra issue in Batch Mode#28500argoyal2212 wants to merge 1 commit into
Conversation
|
Hey @argoyal2212 |
spuru9
left a comment
There was a problem hiding this comment.
Have some comments.
Also, The JIRA's "Fix Versions" (1.16.3 / 1.20.3) and "Components" (Deployment/K8s/YARN) look wrong.
Can you fix those as well.
| executionEnvironment, | ||
| EnvironmentSettings.newInstance() | ||
| .withConfiguration( | ||
| (org.apache.flink.configuration.Configuration) |
There was a problem hiding this comment.
executionEnvironment.getConfiguration() returns ReadableConfig, not Configuration, and the source of that method explicitly warns against this cast:
// Note to implementers:
// In theory, you can cast the return value of this method to Configuration and perform
// mutations. In practice, this could cause side effects. A better approach is to implement
// the ReadableConfig interface and create a layered configuration.
Could be something like:
static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment) {
final EnvironmentSettings.Builder settingsBuilder = EnvironmentSettings.newInstance();
if (executionEnvironment.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
== RuntimeExecutionMode.BATCH) {
settingsBuilder.inBatchMode();
}
return create(executionEnvironment, settingsBuilder.build());
}
Can you check this.
| */ | ||
| static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment) { | ||
| return create(executionEnvironment, EnvironmentSettings.newInstance().build()); | ||
| return create( |
There was a problem hiding this comment.
The Scala bridge has the identical bug — create(executionEnvironment, EnvironmentSettings.newInstance().build) ignores the env's runtime mode. A user hitting FLINK-39014 from Scala won't be fixed by this PR. Worth fixing in this PR (or at least noting in the JIRA as a follow-up) so the two bridges stay consistent.
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.flink.table.examples.java.basics; |
There was a problem hiding this comment.
This looks misplaced. flink-examples-table/.../basics tests cover specific example programs — there's no WatermarkExample, and this test is really exercising StreamTableEnvironment.create. It most probably belongs next to the code it covers (e.g. an ITCase under flink-table-api-java-bridge).
| configuration.setString( | ||
| ExecutionOptions.RUNTIME_MODE.key(), RuntimeExecutionMode.BATCH.toString()); |
There was a problem hiding this comment.
nit: Use the typed config setter instead of setString(opt.key(), enum.toString())
| configuration.setString( | |
| ExecutionOptions.RUNTIME_MODE.key(), RuntimeExecutionMode.BATCH.toString()); | |
| configuration.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH); |
Similarly for other similar statements.
What is the purpose of the change
FLINK-39014
When creating a
StreamTableEnvironmentusingStreamTableEnvironment.create(env)where theStreamExecutionEnvironmentis configured withRUNTIME_MODE=BATCH, the planner was incorrectly created inSTREAMINGmode, causing a mismatch between execution mode and planner mode.Brief change log
The fix ensures that
EnvironmentSettingsinherits the configuration fromStreamExecutionEnvironmentby calling.withConfiguration(executionEnvironment.getConfiguration()) when creating default environment settings. This maintains consistency between the execution mode and planner mode throughout the system.create(StreamExecutionEnvironment) to pass the execution environment's configuration to EnvironmentSettings, ensuring the runtime mode is properly inheritedVerifying this change
This change is covered by new integration tests in
WatermarkExampleITCase.Does this pull request potentially affect one of the following parts:
@Public(Evolving): (no)Documentation