Skip to content

Commit 0850c77

Browse files
committed
[FLINK-38510][table-planner] Remove targetColumns from the digest generation of SinkReuser
1 parent f1d0ab6 commit 0850c77

File tree

4 files changed

+76
-20
lines changed

4 files changed

+76
-20
lines changed

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/SinkReuser.java

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -177,16 +177,6 @@ private String getDigest(Sink sink) {
177177
List<String> digest = new ArrayList<>();
178178
digest.add(sink.contextResolvedTable().getIdentifier().asSummaryString());
179179

180-
int[][] targetColumns = sink.targetColumns();
181-
if (targetColumns != null && targetColumns.length > 0) {
182-
digest.add(
183-
"targetColumns=["
184-
+ Arrays.stream(targetColumns)
185-
.map(Arrays::toString)
186-
.collect(Collectors.joining(","))
187-
+ "]");
188-
}
189-
190180
String fieldTypes =
191181
sink.getRowType().getFieldList().stream()
192182
.map(f -> f.getType().toString())

flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/common/SinkReuseTestBase.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
import org.apache.flink.table.api.StatementSet;
2222
import org.apache.flink.table.api.TableConfig;
23-
import org.apache.flink.table.api.config.OptimizerConfigOptions;
2423
import org.apache.flink.table.planner.plan.reuse.SinkReuser;
2524
import org.apache.flink.table.planner.utils.TableTestBase;
2625
import org.apache.flink.table.planner.utils.TableTestUtil;
@@ -35,8 +34,6 @@ public abstract class SinkReuseTestBase extends TableTestBase {
3534
@BeforeEach
3635
protected void setup() {
3736
TableConfig tableConfig = TableConfig.getDefault();
38-
tableConfig.set(OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED, true);
39-
tableConfig.set(OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SINK_ENABLED, true);
4037
util = getTableTestUtil(tableConfig);
4138

4239
util.tableEnv()
@@ -153,14 +150,25 @@ public void testSinkReuseFromSameSource() {
153150
}
154151

155152
@Test
156-
public void testSinkReuseWithPartialColumns() {
153+
public void testSinkReuseWithPartialColumnsNotSupportsTargetColumnWriting() {
157154
StatementSet statementSet = util.tableEnv().createStatementSet();
155+
/// sink1 has not implemented the {@link SupportsTargetColumnWriting} sink ability
158156
statementSet.addInsertSql("INSERT INTO sink1(`x`) (SELECT x FROM source1)");
159157
statementSet.addInsertSql("INSERT INTO sink1(`y`) (SELECT y FROM source1)");
160158
statementSet.addInsertSql("INSERT INTO sink1(`x`) (SELECT x FROM source3)");
161159
util.verifyExecPlan(statementSet);
162160
}
163161

162+
@Test
163+
public void testSinkReuseWithPartialColumnsAndSupportsTargetColumnWriting() {
164+
StatementSet statementSet = util.tableEnv().createStatementSet();
165+
/// sink2 has implemented the {@link SupportsTargetColumnWriting} sink ability
166+
statementSet.addInsertSql("INSERT INTO sink2(`x`) (SELECT x FROM source1)");
167+
statementSet.addInsertSql("INSERT INTO sink2(`y`) (SELECT y FROM source1)");
168+
statementSet.addInsertSql("INSERT INTO sink2(`x`) (SELECT x FROM source3)");
169+
util.verifyExecPlan(statementSet);
170+
}
171+
164172
@Test
165173
public void testSinkReuseWithOverwrite() {
166174
StatementSet statementSet = util.tableEnv().createStatementSet();

flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/BatchSinkReuseTest.xml

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ Sink(table=[default_catalog.default_database.sink1], fields=[x, y], hints=[[[OPT
148148
]]>
149149
</Resource>
150150
</TestCase>
151-
<TestCase name="testSinkReuseWithPartialColumns">
151+
<TestCase name="testSinkReuseWithPartialColumnsNotSupportsTargetColumnWriting">
152152
<Resource name="ast">
153153
<![CDATA[
154154
LogicalSink(table=[default_catalog.default_database.sink1], targetColumns=[[0]], fields=[x, EXPR$1])
@@ -160,6 +160,35 @@ LogicalSink(table=[default_catalog.default_database.sink1], targetColumns=[[1]],
160160
+- LogicalTableScan(table=[[default_catalog, default_database, source1]])
161161
162162
LogicalSink(table=[default_catalog.default_database.sink1], targetColumns=[[0]], fields=[x, EXPR$1])
163+
+- LogicalProject(x=[$0], EXPR$1=[null:BIGINT])
164+
+- LogicalTableScan(table=[[default_catalog, default_database, source3]])
165+
]]>
166+
</Resource>
167+
<Resource name="optimized exec plan">
168+
<![CDATA[
169+
Sink(table=[default_catalog.default_database.sink1], targetColumns=[[0]], fields=[x, EXPR$1])
170+
+- Union(all=[true], union=[x, EXPR$1])
171+
:- Calc(select=[x, null:BIGINT AS EXPR$1])
172+
: +- TableSourceScan(table=[[default_catalog, default_database, source1, project=[x, y], metadata=[]]], fields=[x, y])(reuse_id=[1])
173+
:- Calc(select=[null:BIGINT AS EXPR$0, y])
174+
: +- Reused(reference_id=[1])
175+
+- Calc(select=[x, null:BIGINT AS EXPR$1])
176+
+- TableSourceScan(table=[[default_catalog, default_database, source3, project=[x], metadata=[]]], fields=[x])
177+
]]>
178+
</Resource>
179+
</TestCase>
180+
<TestCase name="testSinkReuseWithPartialColumnsAndSupportsTargetColumnWriting">
181+
<Resource name="ast">
182+
<![CDATA[
183+
LogicalSink(table=[default_catalog.default_database.sink2], targetColumns=[[0]], fields=[x, EXPR$1])
184+
+- LogicalProject(x=[$0], EXPR$1=[null:BIGINT])
185+
+- LogicalTableScan(table=[[default_catalog, default_database, source1]])
186+
187+
LogicalSink(table=[default_catalog.default_database.sink2], targetColumns=[[1]], fields=[EXPR$0, y])
188+
+- LogicalProject(EXPR$0=[null:BIGINT], y=[$1])
189+
+- LogicalTableScan(table=[[default_catalog, default_database, source1]])
190+
191+
LogicalSink(table=[default_catalog.default_database.sink2], targetColumns=[[0]], fields=[x, EXPR$1])
163192
+- LogicalProject(x=[$0], EXPR$1=[null:BIGINT])
164193
+- LogicalTableScan(table=[[default_catalog, default_database, source3]])
165194
]]>
@@ -168,14 +197,14 @@ LogicalSink(table=[default_catalog.default_database.sink1], targetColumns=[[0]],
168197
<![CDATA[
169198
TableSourceScan(table=[[default_catalog, default_database, source1, project=[x, y], metadata=[]]], fields=[x, y])(reuse_id=[1])
170199
171-
Sink(table=[default_catalog.default_database.sink1], targetColumns=[[0]], fields=[x, EXPR$1])
200+
Sink(table=[default_catalog.default_database.sink2], targetColumns=[[0]], fields=[x, EXPR$1])
172201
+- Union(all=[true], union=[x, EXPR$1])
173202
:- Calc(select=[x, null:BIGINT AS EXPR$1])
174203
: +- Reused(reference_id=[1])
175204
+- Calc(select=[x, null:BIGINT AS EXPR$1])
176205
+- TableSourceScan(table=[[default_catalog, default_database, source3, project=[x], metadata=[]]], fields=[x])
177206
178-
Sink(table=[default_catalog.default_database.sink1], targetColumns=[[1]], fields=[EXPR$0, y])
207+
Sink(table=[default_catalog.default_database.sink2], targetColumns=[[1]], fields=[EXPR$0, y])
179208
+- Calc(select=[null:BIGINT AS EXPR$0, y])
180209
+- Reused(reference_id=[1])
181210
]]>

flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/StreamSinkReuseTest.xml

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ Sink(table=[default_catalog.default_database.sink1], fields=[x, y])
148148
]]>
149149
</Resource>
150150
</TestCase>
151-
<TestCase name="testSinkReuseWithPartialColumns">
151+
<TestCase name="testSinkReuseWithPartialColumnsNotSupportsTargetColumnWriting">
152152
<Resource name="ast">
153153
<![CDATA[
154154
LogicalSink(table=[default_catalog.default_database.sink1], targetColumns=[[0]], fields=[x, EXPR$1])
@@ -160,6 +160,35 @@ LogicalSink(table=[default_catalog.default_database.sink1], targetColumns=[[1]],
160160
+- LogicalTableScan(table=[[default_catalog, default_database, source1]])
161161
162162
LogicalSink(table=[default_catalog.default_database.sink1], targetColumns=[[0]], fields=[x, EXPR$1])
163+
+- LogicalProject(x=[$0], EXPR$1=[null:BIGINT])
164+
+- LogicalTableScan(table=[[default_catalog, default_database, source3]])
165+
]]>
166+
</Resource>
167+
<Resource name="optimized exec plan">
168+
<![CDATA[
169+
Sink(table=[default_catalog.default_database.sink1], targetColumns=[[0]], fields=[x, EXPR$1])
170+
+- Union(all=[true], union=[x, EXPR$1])
171+
:- Calc(select=[x, null:BIGINT AS EXPR$1])
172+
: +- TableSourceScan(table=[[default_catalog, default_database, source1, project=[x, y], metadata=[]]], fields=[x, y])(reuse_id=[1])
173+
:- Calc(select=[null:BIGINT AS EXPR$0, y])
174+
: +- Reused(reference_id=[1])
175+
+- Calc(select=[x, null:BIGINT AS EXPR$1])
176+
+- TableSourceScan(table=[[default_catalog, default_database, source3, project=[x], metadata=[]]], fields=[x])
177+
]]>
178+
</Resource>
179+
</TestCase>
180+
<TestCase name="testSinkReuseWithPartialColumnsAndSupportsTargetColumnWriting">
181+
<Resource name="ast">
182+
<![CDATA[
183+
LogicalSink(table=[default_catalog.default_database.sink2], targetColumns=[[0]], fields=[x, EXPR$1])
184+
+- LogicalProject(x=[$0], EXPR$1=[null:BIGINT])
185+
+- LogicalTableScan(table=[[default_catalog, default_database, source1]])
186+
187+
LogicalSink(table=[default_catalog.default_database.sink2], targetColumns=[[1]], fields=[EXPR$0, y])
188+
+- LogicalProject(EXPR$0=[null:BIGINT], y=[$1])
189+
+- LogicalTableScan(table=[[default_catalog, default_database, source1]])
190+
191+
LogicalSink(table=[default_catalog.default_database.sink2], targetColumns=[[0]], fields=[x, EXPR$1])
163192
+- LogicalProject(x=[$0], EXPR$1=[null:BIGINT])
164193
+- LogicalTableScan(table=[[default_catalog, default_database, source3]])
165194
]]>
@@ -168,14 +197,14 @@ LogicalSink(table=[default_catalog.default_database.sink1], targetColumns=[[0]],
168197
<![CDATA[
169198
TableSourceScan(table=[[default_catalog, default_database, source1, project=[x, y], metadata=[]]], fields=[x, y])(reuse_id=[1])
170199
171-
Sink(table=[default_catalog.default_database.sink1], targetColumns=[[0]], fields=[x, EXPR$1])
200+
Sink(table=[default_catalog.default_database.sink2], targetColumns=[[0]], fields=[x, EXPR$1])
172201
+- Union(all=[true], union=[x, EXPR$1])
173202
:- Calc(select=[x, null:BIGINT AS EXPR$1])
174203
: +- Reused(reference_id=[1])
175204
+- Calc(select=[x, null:BIGINT AS EXPR$1])
176205
+- TableSourceScan(table=[[default_catalog, default_database, source3, project=[x], metadata=[]]], fields=[x])
177206
178-
Sink(table=[default_catalog.default_database.sink1], targetColumns=[[1]], fields=[EXPR$0, y])
207+
Sink(table=[default_catalog.default_database.sink2], targetColumns=[[1]], fields=[EXPR$0, y])
179208
+- Calc(select=[null:BIGINT AS EXPR$0, y])
180209
+- Reused(reference_id=[1])
181210
]]>

0 commit comments

Comments
 (0)