Skip to content

Commit 34ae209

Browse files
committed
[hotfix] Support dameng database
1 parent 544275c commit 34ae209

File tree

8 files changed

+734
-0
lines changed

8 files changed

+734
-0
lines changed

flink-connector-jdbc-dameng/pom.xml

+91
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
2+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
3+
<modelVersion>4.0.0</modelVersion>
4+
<parent>
5+
<groupId>org.apache.flink</groupId>
6+
<artifactId>flink-connector-jdbc-parent</artifactId>
7+
<version>3.3-SNAPSHOT</version>
8+
</parent>
9+
10+
<artifactId>flink-connector-jdbc-dameng</artifactId>
11+
<name>Flink : Connectors : JDBC : DaMeng</name>
12+
13+
<packaging>jar</packaging>
14+
15+
<properties>
16+
<dameng.version>8.1.1.49</dameng.version>
17+
</properties>
18+
19+
<dependencies>
20+
21+
<dependency>
22+
<groupId>org.apache.flink</groupId>
23+
<artifactId>flink-connector-jdbc-core</artifactId>
24+
<version>${project.version}</version>
25+
</dependency>
26+
27+
<dependency>
28+
<groupId>org.apache.flink</groupId>
29+
<artifactId>flink-connector-jdbc-core</artifactId>
30+
<version>${project.version}</version>
31+
<type>test-jar</type>
32+
<scope>test</scope>
33+
</dependency>
34+
35+
<dependency>
36+
<groupId>org.apache.flink</groupId>
37+
<artifactId>flink-table-common</artifactId>
38+
<version>${flink.version}</version>
39+
<type>test-jar</type>
40+
<scope>test</scope>
41+
</dependency>
42+
43+
<dependency>
44+
<groupId>org.apache.flink</groupId>
45+
<artifactId>flink-table-api-java-bridge</artifactId>
46+
<version>${flink.version}</version>
47+
<scope>provided</scope>
48+
<optional>true</optional>
49+
</dependency>
50+
51+
<dependency>
52+
<groupId>org.apache.flink</groupId>
53+
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
54+
<version>${flink.version}</version>
55+
<scope>test</scope>
56+
</dependency>
57+
<dependency>
58+
<groupId>org.apache.flink</groupId>
59+
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
60+
<version>${flink.version}</version>
61+
<type>test-jar</type>
62+
<scope>test</scope>
63+
</dependency>
64+
65+
<dependency>
66+
<groupId>org.apache.flink</groupId>
67+
<artifactId>flink-test-utils</artifactId>
68+
<version>${flink.version}</version>
69+
<scope>test</scope>
70+
</dependency>
71+
72+
<!-- DaMeng -->
73+
<dependency>
74+
<groupId>com.dameng</groupId>
75+
<artifactId>Dm8JdbcDriver16</artifactId>
76+
<version>${dameng.version}</version>
77+
<scope>test</scope>
78+
</dependency>
79+
80+
<!-- Assertions test dependencies -->
81+
82+
<dependency>
83+
<groupId>org.assertj</groupId>
84+
<artifactId>assertj-core</artifactId>
85+
<version>${assertj.version}</version>
86+
<scope>test</scope>
87+
</dependency>
88+
89+
</dependencies>
90+
91+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.connector.jdbc.dameng.database;
20+
21+
import org.apache.flink.connector.jdbc.core.database.JdbcFactory;
22+
import org.apache.flink.connector.jdbc.core.database.catalog.JdbcCatalog;
23+
import org.apache.flink.connector.jdbc.core.database.dialect.JdbcDialect;
24+
import org.apache.flink.connector.jdbc.dameng.database.catalog.DaMengCatalog;
25+
import org.apache.flink.connector.jdbc.dameng.database.dialect.DaMengDialect;
26+
27+
/** Factory for {@link DaMengDialect}. */
28+
public class DaMengFactory implements JdbcFactory
29+
{
30+
@Override
31+
public boolean acceptsURL(String url)
32+
{
33+
return url.startsWith("jdbc:dm:");
34+
}
35+
36+
@Override
37+
public JdbcDialect createDialect()
38+
{
39+
return new DaMengDialect();
40+
}
41+
42+
@Override
43+
public JdbcCatalog createCatalog(
44+
ClassLoader classLoader,
45+
String catalogName,
46+
String defaultDatabase,
47+
String username,
48+
String pwd,
49+
String baseUrl)
50+
{
51+
return new DaMengCatalog(classLoader, catalogName, defaultDatabase, username, pwd, baseUrl);
52+
}
53+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,190 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.connector.jdbc.dameng.database.catalog;
20+
21+
import org.apache.commons.lang3.StringUtils;
22+
import org.apache.flink.annotation.Internal;
23+
import org.apache.flink.annotation.VisibleForTesting;
24+
import org.apache.flink.connector.jdbc.core.database.catalog.AbstractJdbcCatalog;
25+
import org.apache.flink.connector.jdbc.core.database.catalog.JdbcCatalogTypeMapper;
26+
import org.apache.flink.table.catalog.ObjectPath;
27+
import org.apache.flink.table.catalog.exceptions.CatalogException;
28+
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
29+
import org.apache.flink.table.types.DataType;
30+
import org.apache.flink.util.Preconditions;
31+
import org.apache.flink.util.TemporaryClassLoaderContext;
32+
import org.slf4j.Logger;
33+
import org.slf4j.LoggerFactory;
34+
35+
import java.sql.Connection;
36+
import java.sql.DriverManager;
37+
import java.sql.ResultSetMetaData;
38+
import java.sql.SQLException;
39+
import java.util.HashSet;
40+
import java.util.List;
41+
import java.util.Properties;
42+
import java.util.Set;
43+
import java.util.regex.Matcher;
44+
import java.util.regex.Pattern;
45+
46+
import static org.apache.flink.connector.jdbc.JdbcConnectionOptions.getBriefAuthProperties;
47+
48+
/** Catalog for DaMeng. */
49+
@Internal
50+
public class DaMengCatalog
51+
extends AbstractJdbcCatalog {
52+
53+
private static final Logger LOG = LoggerFactory.getLogger(DaMengCatalog.class);
54+
55+
private final JdbcCatalogTypeMapper dialectTypeMapper;
56+
57+
// DaMeng system schemas that shouldn't be exposed to users
58+
private static final Set<String> builtinDatabases =
59+
new HashSet<String>() {
60+
{
61+
add("SYS");
62+
add("SYSDBA");
63+
add("SYSAUDITOR");
64+
add("INFORMATION_SCHEMA");
65+
}
66+
};
67+
68+
@VisibleForTesting
69+
public DaMengCatalog(
70+
ClassLoader userClassLoader,
71+
String catalogName,
72+
String defaultDatabase,
73+
String username,
74+
String pwd,
75+
String baseUrl) {
76+
this(
77+
userClassLoader,
78+
catalogName,
79+
defaultDatabase,
80+
baseUrl,
81+
getBriefAuthProperties(username, pwd));
82+
}
83+
84+
public DaMengCatalog(
85+
ClassLoader userClassLoader,
86+
String catalogName,
87+
String defaultDatabase,
88+
String baseUrl,
89+
Properties connectionProperties) {
90+
super(userClassLoader, catalogName, defaultDatabase, baseUrl, connectionProperties);
91+
92+
String driverVersion =
93+
Preconditions.checkNotNull(getDriverVersion(), "Driver version must not be null.");
94+
String databaseVersion =
95+
Preconditions.checkNotNull(
96+
getDatabaseVersion(), "Database version must not be null.");
97+
LOG.info("Driver version: {}, database version: {}", driverVersion, databaseVersion);
98+
this.dialectTypeMapper = new DaMengTypeMapper(databaseVersion, driverVersion);
99+
}
100+
101+
@Override
102+
public List<String> listDatabases() throws CatalogException {
103+
return extractColumnValuesBySQL(
104+
defaultUrl,
105+
"SELECT SCHEMA_NAME FROM ALL_SCHEMAS",
106+
1,
107+
dbName -> !builtinDatabases.contains(dbName.toUpperCase()));
108+
}
109+
110+
// ------ tables ------
111+
112+
@Override
113+
public List<String> listTables(String databaseName)
114+
throws DatabaseNotExistException, CatalogException {
115+
Preconditions.checkState(
116+
StringUtils.isNotBlank(databaseName), "Database name must not be blank.");
117+
if (!databaseExists(databaseName)) {
118+
throw new DatabaseNotExistException(getName(), databaseName);
119+
}
120+
121+
return extractColumnValuesBySQL(
122+
getDatabaseUrl(databaseName),
123+
"SELECT TABLE_NAME FROM ALL_TABLES WHERE OWNER = ?",
124+
1,
125+
null,
126+
databaseName.toUpperCase());
127+
}
128+
129+
@Override
130+
public boolean tableExists(ObjectPath tablePath) throws CatalogException {
131+
return !extractColumnValuesBySQL(
132+
baseUrl,
133+
"SELECT TABLE_NAME FROM ALL_TABLES WHERE OWNER = ? AND TABLE_NAME = ?",
134+
1,
135+
null,
136+
tablePath.getDatabaseName().toUpperCase(),
137+
tablePath.getObjectName().toUpperCase())
138+
.isEmpty();
139+
}
140+
141+
private String getDatabaseVersion() {
142+
try (TemporaryClassLoaderContext ignored =
143+
TemporaryClassLoaderContext.of(userClassLoader)) {
144+
try (Connection conn = DriverManager.getConnection(defaultUrl, connectionProperties)) {
145+
return conn.getMetaData().getDatabaseProductVersion();
146+
} catch (Exception e) {
147+
throw new CatalogException(
148+
String.format("Failed in getting DaMeng version by %s.", defaultUrl), e);
149+
}
150+
}
151+
}
152+
153+
private String getDriverVersion() {
154+
try (TemporaryClassLoaderContext ignored =
155+
TemporaryClassLoaderContext.of(userClassLoader)) {
156+
try (Connection conn = DriverManager.getConnection(defaultUrl, connectionProperties)) {
157+
String driverVersion = conn.getMetaData().getDriverVersion();
158+
Pattern regexp = Pattern.compile("\\d+?\\.\\d+?\\.\\d+");
159+
Matcher matcher = regexp.matcher(driverVersion);
160+
return matcher.find() ? matcher.group(0) : null;
161+
} catch (Exception e) {
162+
throw new CatalogException(
163+
String.format("Failed in getting DaMeng driver version by %s.", defaultUrl),
164+
e);
165+
}
166+
}
167+
}
168+
169+
/** Converts DaMeng type to Flink {@link DataType}. */
170+
@Override
171+
protected DataType fromJDBCType(ObjectPath tablePath, ResultSetMetaData metadata, int colIndex)
172+
throws SQLException {
173+
return dialectTypeMapper.mapping(tablePath, metadata, colIndex);
174+
}
175+
176+
@Override
177+
protected String getTableName(ObjectPath tablePath) {
178+
return tablePath.getObjectName().toUpperCase();
179+
}
180+
181+
@Override
182+
protected String getSchemaName(ObjectPath tablePath) {
183+
return tablePath.getDatabaseName().toUpperCase();
184+
}
185+
186+
@Override
187+
protected String getSchemaTableName(ObjectPath tablePath) {
188+
return tablePath.getObjectName().toUpperCase();
189+
}
190+
}

0 commit comments

Comments
 (0)