Skip to content

Commit 0069a03

Browse files
committed
Address comments
1 parent db7d938 commit 0069a03

File tree

3 files changed

+166
-28
lines changed

3 files changed

+166
-28
lines changed

flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java

+39-20
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.flink.connector.jdbc.table;
2020

2121
import org.apache.flink.annotation.Internal;
22+
import org.apache.flink.annotation.VisibleForTesting;
2223
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
2324
import org.apache.flink.api.connector.source.Boundedness;
2425
import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
@@ -191,7 +192,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
191192

192193
predicates.addAll(this.resolvedPredicates);
193194

194-
if (predicates.size() > 0) {
195+
if (!predicates.isEmpty()) {
195196
String joinedConditions =
196197
predicates.stream()
197198
.map(pred -> String.format("(%s)", pred))
@@ -214,25 +215,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
214215
(key, value) ->
215216
builder.setConnectionProperty(key.toString(), value.toString()));
216217
JdbcSource<RowData> source = builder.build();
217-
return new DataStreamScanProvider() {
218-
@Override
219-
public DataStream<RowData> produceDataStream(
220-
ProviderContext providerContext, StreamExecutionEnvironment execEnv) {
221-
String sourceName =
222-
Objects.isNull(tableIdentifier)
223-
? "JdbcSource"
224-
: "JdbcSource-" + tableIdentifier;
225-
DataStreamSource<RowData> sourceStream =
226-
execEnv.fromSource(source, WatermarkStrategy.noWatermarks(), sourceName);
227-
providerContext.generateUid(JDBC_TRANSFORMATION).ifPresent(sourceStream::uid);
228-
return sourceStream;
229-
}
230-
231-
@Override
232-
public boolean isBounded() {
233-
return source.getBoundedness() == Boundedness.BOUNDED;
234-
}
235-
};
218+
return new JdbcDataStreamScanProvider(source, tableIdentifier);
236219
}
237220

238221
@Override
@@ -349,4 +332,40 @@ private Serializable[][] replicatePushdownParamsForN(int n) {
349332
}
350333
return allPushdownParams;
351334
}
335+
336+
@VisibleForTesting
337+
protected static String getDynamicJdbcTableSourceName(String tableIdentifier) {
338+
return Objects.isNull(tableIdentifier)
339+
? JdbcDynamicTableSource.class.getSimpleName()
340+
: JdbcDynamicTableSource.class.getSimpleName() + "-" + tableIdentifier;
341+
}
342+
343+
private static class JdbcDataStreamScanProvider implements DataStreamScanProvider {
344+
345+
@Nullable private final String tableIdentifier;
346+
private final JdbcSource<RowData> source;
347+
348+
public JdbcDataStreamScanProvider(
349+
JdbcSource<RowData> source, @Nullable String tableIdentifier) {
350+
this.source = Preconditions.checkNotNull(source);
351+
this.tableIdentifier = tableIdentifier;
352+
}
353+
354+
@Override
355+
public DataStream<RowData> produceDataStream(
356+
ProviderContext providerContext, StreamExecutionEnvironment execEnv) {
357+
DataStreamSource<RowData> sourceStream =
358+
execEnv.fromSource(
359+
source,
360+
WatermarkStrategy.noWatermarks(),
361+
getDynamicJdbcTableSourceName(tableIdentifier));
362+
providerContext.generateUid(JDBC_TRANSFORMATION).ifPresent(sourceStream::uid);
363+
return sourceStream;
364+
}
365+
366+
@Override
367+
public boolean isBounded() {
368+
return source.getBoundedness() == Boundedness.BOUNDED;
369+
}
370+
}
352371
}

flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactoryTest.java

+8
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,14 @@ class JdbcDynamicTableFactoryTest {
6363
Collections.emptyList(),
6464
UniqueConstraint.primaryKey("name", Arrays.asList("bbb", "aaa")));
6565

66+
@Test
67+
void testIdentifierOfTableSourceName() {
68+
DynamicTableSource actualSource = createTableSource(SCHEMA, getAllOptions());
69+
JdbcDynamicTableSource jdbcDynamicTableSource = (JdbcDynamicTableSource) actualSource;
70+
assertThat(jdbcDynamicTableSource.tableIdentifier)
71+
.isEqualTo(FactoryMocks.IDENTIFIER.asSummaryString());
72+
}
73+
6674
@Test
6775
void testJdbcCommonProperties() {
6876
Map<String, String> properties = getAllOptions();

flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceITCase.java

+119-8
Original file line numberDiff line numberDiff line change
@@ -18,27 +18,39 @@
1818

1919
package org.apache.flink.connector.jdbc.table;
2020

21+
import org.apache.flink.api.dag.Transformation;
2122
import org.apache.flink.configuration.Configuration;
2223
import org.apache.flink.connector.jdbc.testutils.DatabaseTest;
2324
import org.apache.flink.connector.jdbc.testutils.TableManaged;
2425
import org.apache.flink.connector.jdbc.testutils.tables.TableRow;
2526
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
2627
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
27-
import org.apache.flink.table.api.DataTypes;
28-
import org.apache.flink.table.api.TableEnvironment;
28+
import org.apache.flink.table.api.*;
29+
import org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl;
2930
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
31+
import org.apache.flink.table.catalog.*;
3032
import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
3133
import org.apache.flink.table.data.DecimalData;
3234
import org.apache.flink.table.data.GenericRowData;
3335
import org.apache.flink.table.data.RowData;
3436
import org.apache.flink.table.data.TimestampData;
37+
import org.apache.flink.table.delegation.Executor;
38+
import org.apache.flink.table.delegation.Planner;
39+
import org.apache.flink.table.factories.CatalogStoreFactory;
40+
import org.apache.flink.table.factories.PlannerFactoryUtil;
41+
import org.apache.flink.table.factories.TableFactoryUtil;
42+
import org.apache.flink.table.module.ModuleManager;
43+
import org.apache.flink.table.operations.ModifyOperation;
3544
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
3645
import org.apache.flink.table.planner.runtime.utils.StreamTestSink;
46+
import org.apache.flink.table.resource.ResourceManager;
3747
import org.apache.flink.table.runtime.functions.table.lookup.LookupCacheManager;
3848
import org.apache.flink.table.test.lookup.cache.LookupCacheAssert;
3949
import org.apache.flink.test.junit5.MiniClusterExtension;
4050
import org.apache.flink.types.Row;
4151
import org.apache.flink.util.CollectionUtil;
52+
import org.apache.flink.util.FlinkUserCodeClassLoaders;
53+
import org.apache.flink.util.MutableURLClassLoader;
4254

4355
import org.junit.jupiter.api.AfterEach;
4456
import org.junit.jupiter.api.BeforeEach;
@@ -48,17 +60,13 @@
4860
import org.junit.jupiter.params.provider.EnumSource;
4961

5062
import java.math.BigDecimal;
63+
import java.net.URL;
5164
import java.sql.Connection;
5265
import java.sql.SQLException;
5366
import java.time.LocalDateTime;
5467
import java.time.temporal.ChronoUnit;
5568
import java.time.temporal.TemporalUnit;
56-
import java.util.Arrays;
57-
import java.util.Collection;
58-
import java.util.Collections;
59-
import java.util.HashMap;
60-
import java.util.List;
61-
import java.util.Map;
69+
import java.util.*;
6270
import java.util.stream.Collectors;
6371

6472
import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.field;
@@ -140,6 +148,24 @@ void afterEach() {
140148
StreamTestSink.clear();
141149
}
142150

151+
@Test
152+
void testJdbcSourceName() {
153+
String testTable = "testTable";
154+
final List<Transformation<?>> transformationCollector = new ArrayList<>();
155+
tEnv =
156+
createTestingTableEnv(
157+
StreamExecutionEnvironment.getExecutionEnvironment(),
158+
EnvironmentSettings.newInstance().build(),
159+
transformationCollector);
160+
tEnv.executeSql(inputTable.getCreateQueryForFlink(getMetadata(), testTable));
161+
tEnv.executeSql("SELECT * FROM " + testTable);
162+
assertThat(transformationCollector).hasSize(1);
163+
assertThat(transformationCollector.get(0).getName())
164+
.isNotEqualTo(
165+
JdbcDynamicTableSource.getDynamicJdbcTableSourceName(
166+
"default_catalog.default_database." + testTable));
167+
}
168+
143169
@Test
144170
void testJdbcSource() {
145171
String testTable = "testTable";
@@ -585,6 +611,91 @@ protected TemporalUnit timestampPrecision() {
585611
return ChronoUnit.MICROS;
586612
}
587613

614+
private TableEnvironment createTestingTableEnv(
615+
StreamExecutionEnvironment executionEnvironment,
616+
EnvironmentSettings settings,
617+
final List<Transformation<?>> transformationCollector) {
618+
619+
final MutableURLClassLoader userClassLoader =
620+
FlinkUserCodeClassLoaders.create(
621+
new URL[0], settings.getUserClassLoader(), settings.getConfiguration());
622+
final Executor executor =
623+
AbstractStreamTableEnvironmentImpl.lookupExecutor(
624+
userClassLoader, executionEnvironment);
625+
626+
final TableConfig tableConfig = TableConfig.getDefault();
627+
tableConfig.setRootConfiguration(executor.getConfiguration());
628+
tableConfig.addConfiguration(settings.getConfiguration());
629+
630+
final ResourceManager resourceManager =
631+
new ResourceManager(settings.getConfiguration(), userClassLoader);
632+
final ModuleManager moduleManager = new ModuleManager();
633+
634+
final CatalogStoreFactory catalogStoreFactory =
635+
TableFactoryUtil.findAndCreateCatalogStoreFactory(
636+
settings.getConfiguration(), userClassLoader);
637+
final CatalogStoreFactory.Context catalogStoreFactoryContext =
638+
TableFactoryUtil.buildCatalogStoreFactoryContext(
639+
settings.getConfiguration(), userClassLoader);
640+
catalogStoreFactory.open(catalogStoreFactoryContext);
641+
final CatalogStore catalogStore =
642+
settings.getCatalogStore() != null
643+
? settings.getCatalogStore()
644+
: catalogStoreFactory.createCatalogStore();
645+
646+
final CatalogManager catalogManager =
647+
CatalogManager.newBuilder()
648+
.classLoader(userClassLoader)
649+
.config(tableConfig)
650+
.defaultCatalog(
651+
settings.getBuiltInCatalogName(),
652+
new GenericInMemoryCatalog(
653+
settings.getBuiltInCatalogName(),
654+
settings.getBuiltInDatabaseName()))
655+
.executionConfig(executionEnvironment.getConfig())
656+
.catalogModificationListeners(
657+
TableFactoryUtil.findCatalogModificationListenerList(
658+
settings.getConfiguration(), userClassLoader))
659+
.catalogStoreHolder(
660+
CatalogStoreHolder.newBuilder()
661+
.classloader(userClassLoader)
662+
.config(tableConfig)
663+
.catalogStore(catalogStore)
664+
.factory(catalogStoreFactory)
665+
.build())
666+
.build();
667+
668+
final FunctionCatalog functionCatalog =
669+
new FunctionCatalog(tableConfig, resourceManager, catalogManager, moduleManager);
670+
671+
final Planner planner =
672+
PlannerFactoryUtil.createPlanner(
673+
executor,
674+
tableConfig,
675+
userClassLoader,
676+
moduleManager,
677+
catalogManager,
678+
functionCatalog);
679+
680+
return new AbstractStreamTableEnvironmentImpl(
681+
catalogManager,
682+
moduleManager,
683+
resourceManager,
684+
tableConfig,
685+
executor,
686+
functionCatalog,
687+
planner,
688+
settings.isStreamingMode(),
689+
executionEnvironment) {
690+
@Override
691+
protected List<Transformation<?>> translate(List<ModifyOperation> modifyOperations) {
692+
List<Transformation<?>> transformations = super.translate(modifyOperations);
693+
transformationCollector.addAll(transformations);
694+
return transformations;
695+
}
696+
};
697+
}
698+
588699
private LocalDateTime truncateTime(LocalDateTime value) {
589700
return value.truncatedTo(timestampPrecision());
590701
}

0 commit comments

Comments
 (0)