28
28
import com .dtstack .chunjun .connector .sqlserver .sink .SqlserverOutputFormat ;
29
29
import com .dtstack .chunjun .connector .sqlserver .source .SqlserverInputFormat ;
30
30
31
+ import org .apache .flink .table .connector .sink .DynamicTableSink ;
31
32
import org .apache .flink .table .connector .source .DynamicTableSource ;
33
+ import org .apache .flink .table .factories .FactoryUtil ;
32
34
33
35
import org .apache .commons .lang3 .StringUtils ;
34
36
@@ -38,13 +40,20 @@ public class SqlserverDynamicTableFactory extends JdbcDynamicTableFactory {
38
40
39
41
private static final String IDENTIFIER = "sqlserver-x" ;
40
42
43
+ private JdbcConfig jdbcConfig ;
44
+
41
45
@ Override
42
46
public String factoryIdentifier () {
43
47
return IDENTIFIER ;
44
48
}
45
49
46
50
@ Override
47
51
protected JdbcDialect getDialect () {
52
+ if (jdbcConfig != null ) {
53
+ return new SqlserverDialect (
54
+ jdbcConfig .isWithNoLock (),
55
+ jdbcConfig .getJdbcUrl ().startsWith ("jdbc:jtds:sqlserver" ));
56
+ }
48
57
return new SqlserverDialect ();
49
58
}
50
59
@@ -60,11 +69,24 @@ protected JdbcOutputFormatBuilder getOutputFormatBuilder() {
60
69
61
70
@ Override
62
71
public DynamicTableSource createDynamicTableSource (Context context ) {
72
+ final FactoryUtil .TableFactoryHelper helper =
73
+ FactoryUtil .createTableFactoryHelper (this , context );
74
+ this .jdbcConfig = getSourceConnectionConfig (helper .getOptions ());
63
75
Map <String , String > prop = context .getCatalogTable ().getOptions ();
64
76
prop .put ("druid.validation-query" , "SELECT 1" );
65
77
return super .createDynamicTableSource (context );
66
78
}
67
79
80
+ @ Override
81
+ public DynamicTableSink createDynamicTableSink (Context context ) {
82
+ final FactoryUtil .TableFactoryHelper helper =
83
+ FactoryUtil .createTableFactoryHelper (this , context );
84
+ this .jdbcConfig =
85
+ getSinkConnectionConfig (
86
+ helper .getOptions (), context .getCatalogTable ().getResolvedSchema ());
87
+ return super .createDynamicTableSink (context );
88
+ }
89
+
68
90
/** table字段有可能是[schema].[table]格式 需要转换为对应的schema 和 table 字段* */
69
91
@ Override
70
92
protected void resetTableInfo (JdbcConfig jdbcConfig ) {
0 commit comments