Skip to content

Commit ef9658b

Browse files
committed
Address comments
1 parent db7d938 commit ef9658b

File tree

3 files changed

+171
-20
lines changed

3 files changed

+171
-20
lines changed

flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java

+39-20
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.flink.connector.jdbc.table;
2020

2121
import org.apache.flink.annotation.Internal;
22+
import org.apache.flink.annotation.VisibleForTesting;
2223
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
2324
import org.apache.flink.api.connector.source.Boundedness;
2425
import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
@@ -191,7 +192,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
191192

192193
predicates.addAll(this.resolvedPredicates);
193194

194-
if (predicates.size() > 0) {
195+
if (!predicates.isEmpty()) {
195196
String joinedConditions =
196197
predicates.stream()
197198
.map(pred -> String.format("(%s)", pred))
@@ -214,25 +215,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
214215
(key, value) ->
215216
builder.setConnectionProperty(key.toString(), value.toString()));
216217
JdbcSource<RowData> source = builder.build();
217-
return new DataStreamScanProvider() {
218-
@Override
219-
public DataStream<RowData> produceDataStream(
220-
ProviderContext providerContext, StreamExecutionEnvironment execEnv) {
221-
String sourceName =
222-
Objects.isNull(tableIdentifier)
223-
? "JdbcSource"
224-
: "JdbcSource-" + tableIdentifier;
225-
DataStreamSource<RowData> sourceStream =
226-
execEnv.fromSource(source, WatermarkStrategy.noWatermarks(), sourceName);
227-
providerContext.generateUid(JDBC_TRANSFORMATION).ifPresent(sourceStream::uid);
228-
return sourceStream;
229-
}
230-
231-
@Override
232-
public boolean isBounded() {
233-
return source.getBoundedness() == Boundedness.BOUNDED;
234-
}
235-
};
218+
return new JdbcDataStreamScanProvider(source, tableIdentifier);
236219
}
237220

238221
@Override
@@ -349,4 +332,40 @@ private Serializable[][] replicatePushdownParamsForN(int n) {
349332
}
350333
return allPushdownParams;
351334
}
335+
336+
@VisibleForTesting
337+
protected static String getDynamicJdbcTableSourceName(String tableIdentifier) {
338+
return Objects.isNull(tableIdentifier)
339+
? JdbcDynamicTableSource.class.getSimpleName()
340+
: JdbcDynamicTableSource.class.getSimpleName() + "-" + tableIdentifier;
341+
}
342+
343+
private static class JdbcDataStreamScanProvider implements DataStreamScanProvider {
344+
345+
@Nullable private final String tableIdentifier;
346+
private final JdbcSource<RowData> source;
347+
348+
public JdbcDataStreamScanProvider(
349+
JdbcSource<RowData> source, @Nullable String tableIdentifier) {
350+
this.source = Preconditions.checkNotNull(source);
351+
this.tableIdentifier = tableIdentifier;
352+
}
353+
354+
@Override
355+
public DataStream<RowData> produceDataStream(
356+
ProviderContext providerContext, StreamExecutionEnvironment execEnv) {
357+
DataStreamSource<RowData> sourceStream =
358+
execEnv.fromSource(
359+
source,
360+
WatermarkStrategy.noWatermarks(),
361+
getDynamicJdbcTableSourceName(tableIdentifier));
362+
providerContext.generateUid(JDBC_TRANSFORMATION).ifPresent(sourceStream::uid);
363+
return sourceStream;
364+
}
365+
366+
@Override
367+
public boolean isBounded() {
368+
return source.getBoundedness() == Boundedness.BOUNDED;
369+
}
370+
}
352371
}

flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactoryTest.java

+8
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,14 @@ class JdbcDynamicTableFactoryTest {
6363
Collections.emptyList(),
6464
UniqueConstraint.primaryKey("name", Arrays.asList("bbb", "aaa")));
6565

66+
@Test
67+
void testIdentifierOfTableSourceName() {
68+
DynamicTableSource actualSource = createTableSource(SCHEMA, getAllOptions());
69+
JdbcDynamicTableSource jdbcDynamicTableSource = (JdbcDynamicTableSource) actualSource;
70+
assertThat(jdbcDynamicTableSource.tableIdentifier)
71+
.isEqualTo(FactoryMocks.IDENTIFIER.asSummaryString());
72+
}
73+
6674
@Test
6775
void testJdbcCommonProperties() {
6876
Map<String, String> properties = getAllOptions();

flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceITCase.java

+124
Original file line numberDiff line numberDiff line change
@@ -18,27 +18,46 @@
1818

1919
package org.apache.flink.connector.jdbc.table;
2020

21+
import org.apache.flink.api.dag.Transformation;
2122
import org.apache.flink.configuration.Configuration;
2223
import org.apache.flink.connector.jdbc.testutils.DatabaseTest;
2324
import org.apache.flink.connector.jdbc.testutils.TableManaged;
2425
import org.apache.flink.connector.jdbc.testutils.tables.TableRow;
2526
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
2627
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
2728
import org.apache.flink.table.api.DataTypes;
29+
import org.apache.flink.table.api.EnvironmentSettings;
30+
import org.apache.flink.table.api.TableConfig;
2831
import org.apache.flink.table.api.TableEnvironment;
32+
import org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl;
2933
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
34+
import org.apache.flink.table.catalog.CatalogManager;
35+
import org.apache.flink.table.catalog.CatalogStore;
36+
import org.apache.flink.table.catalog.CatalogStoreHolder;
37+
import org.apache.flink.table.catalog.FunctionCatalog;
38+
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
3039
import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
3140
import org.apache.flink.table.data.DecimalData;
3241
import org.apache.flink.table.data.GenericRowData;
3342
import org.apache.flink.table.data.RowData;
3443
import org.apache.flink.table.data.TimestampData;
44+
import org.apache.flink.table.delegation.Executor;
45+
import org.apache.flink.table.delegation.Planner;
46+
import org.apache.flink.table.factories.CatalogStoreFactory;
47+
import org.apache.flink.table.factories.PlannerFactoryUtil;
48+
import org.apache.flink.table.factories.TableFactoryUtil;
49+
import org.apache.flink.table.module.ModuleManager;
50+
import org.apache.flink.table.operations.ModifyOperation;
3551
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
3652
import org.apache.flink.table.planner.runtime.utils.StreamTestSink;
53+
import org.apache.flink.table.resource.ResourceManager;
3754
import org.apache.flink.table.runtime.functions.table.lookup.LookupCacheManager;
3855
import org.apache.flink.table.test.lookup.cache.LookupCacheAssert;
3956
import org.apache.flink.test.junit5.MiniClusterExtension;
4057
import org.apache.flink.types.Row;
4158
import org.apache.flink.util.CollectionUtil;
59+
import org.apache.flink.util.FlinkUserCodeClassLoaders;
60+
import org.apache.flink.util.MutableURLClassLoader;
4261

4362
import org.junit.jupiter.api.AfterEach;
4463
import org.junit.jupiter.api.BeforeEach;
@@ -48,11 +67,13 @@
4867
import org.junit.jupiter.params.provider.EnumSource;
4968

5069
import java.math.BigDecimal;
70+
import java.net.URL;
5171
import java.sql.Connection;
5272
import java.sql.SQLException;
5373
import java.time.LocalDateTime;
5474
import java.time.temporal.ChronoUnit;
5575
import java.time.temporal.TemporalUnit;
76+
import java.util.ArrayList;
5677
import java.util.Arrays;
5778
import java.util.Collection;
5879
import java.util.Collections;
@@ -140,6 +161,24 @@ void afterEach() {
140161
StreamTestSink.clear();
141162
}
142163

164+
@Test
165+
void testJdbcSourceName() {
166+
String testTable = "testTable";
167+
final List<Transformation<?>> transformationCollector = new ArrayList<>();
168+
tEnv =
169+
createTestingTableEnv(
170+
StreamExecutionEnvironment.getExecutionEnvironment(),
171+
EnvironmentSettings.newInstance().build(),
172+
transformationCollector);
173+
tEnv.executeSql(inputTable.getCreateQueryForFlink(getMetadata(), testTable));
174+
tEnv.executeSql("SELECT * FROM " + testTable);
175+
assertThat(transformationCollector).hasSize(1);
176+
assertThat(transformationCollector.get(0).getName())
177+
.isNotEqualTo(
178+
JdbcDynamicTableSource.getDynamicJdbcTableSourceName(
179+
"default_catalog.default_database." + testTable));
180+
}
181+
143182
@Test
144183
void testJdbcSource() {
145184
String testTable = "testTable";
@@ -585,6 +624,91 @@ protected TemporalUnit timestampPrecision() {
585624
return ChronoUnit.MICROS;
586625
}
587626

627+
private TableEnvironment createTestingTableEnv(
628+
StreamExecutionEnvironment executionEnvironment,
629+
EnvironmentSettings settings,
630+
final List<Transformation<?>> transformationCollector) {
631+
632+
final MutableURLClassLoader userClassLoader =
633+
FlinkUserCodeClassLoaders.create(
634+
new URL[0], settings.getUserClassLoader(), settings.getConfiguration());
635+
final Executor executor =
636+
AbstractStreamTableEnvironmentImpl.lookupExecutor(
637+
userClassLoader, executionEnvironment);
638+
639+
final TableConfig tableConfig = TableConfig.getDefault();
640+
tableConfig.setRootConfiguration(executor.getConfiguration());
641+
tableConfig.addConfiguration(settings.getConfiguration());
642+
643+
final ResourceManager resourceManager =
644+
new ResourceManager(settings.getConfiguration(), userClassLoader);
645+
final ModuleManager moduleManager = new ModuleManager();
646+
647+
final CatalogStoreFactory catalogStoreFactory =
648+
TableFactoryUtil.findAndCreateCatalogStoreFactory(
649+
settings.getConfiguration(), userClassLoader);
650+
final CatalogStoreFactory.Context catalogStoreFactoryContext =
651+
TableFactoryUtil.buildCatalogStoreFactoryContext(
652+
settings.getConfiguration(), userClassLoader);
653+
catalogStoreFactory.open(catalogStoreFactoryContext);
654+
final CatalogStore catalogStore =
655+
settings.getCatalogStore() != null
656+
? settings.getCatalogStore()
657+
: catalogStoreFactory.createCatalogStore();
658+
659+
final CatalogManager catalogManager =
660+
CatalogManager.newBuilder()
661+
.classLoader(userClassLoader)
662+
.config(tableConfig)
663+
.defaultCatalog(
664+
settings.getBuiltInCatalogName(),
665+
new GenericInMemoryCatalog(
666+
settings.getBuiltInCatalogName(),
667+
settings.getBuiltInDatabaseName()))
668+
.executionConfig(executionEnvironment.getConfig())
669+
.catalogModificationListeners(
670+
TableFactoryUtil.findCatalogModificationListenerList(
671+
settings.getConfiguration(), userClassLoader))
672+
.catalogStoreHolder(
673+
CatalogStoreHolder.newBuilder()
674+
.classloader(userClassLoader)
675+
.config(tableConfig)
676+
.catalogStore(catalogStore)
677+
.factory(catalogStoreFactory)
678+
.build())
679+
.build();
680+
681+
final FunctionCatalog functionCatalog =
682+
new FunctionCatalog(tableConfig, resourceManager, catalogManager, moduleManager);
683+
684+
final Planner planner =
685+
PlannerFactoryUtil.createPlanner(
686+
executor,
687+
tableConfig,
688+
userClassLoader,
689+
moduleManager,
690+
catalogManager,
691+
functionCatalog);
692+
693+
return new AbstractStreamTableEnvironmentImpl(
694+
catalogManager,
695+
moduleManager,
696+
resourceManager,
697+
tableConfig,
698+
executor,
699+
functionCatalog,
700+
planner,
701+
settings.isStreamingMode(),
702+
executionEnvironment) {
703+
@Override
704+
protected List<Transformation<?>> translate(List<ModifyOperation> modifyOperations) {
705+
List<Transformation<?>> transformations = super.translate(modifyOperations);
706+
transformationCollector.addAll(transformations);
707+
return transformations;
708+
}
709+
};
710+
}
711+
588712
private LocalDateTime truncateTime(LocalDateTime value) {
589713
return value.truncatedTo(timestampPrecision());
590714
}

0 commit comments

Comments
 (0)