Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-35176][Connector/JDBC] Support property authentication connection for JDBC catalog & dynamic table #116

Merged
merged 2 commits into from
May 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,18 @@ public Properties getProperties() {
return properties;
}

@Nonnull
public static Properties getBriefAuthProperties(String user, String password) {
final Properties result = new Properties();
if (Objects.nonNull(user)) {
result.put(USER_KEY, user);
}
if (Objects.nonNull(password)) {
result.put(PASSWORD_KEY, password);
}
return result;
}

/** Builder for {@link JdbcConnectionOptions}. */
public static class JdbcConnectionOptionsBuilder {
private String url;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,12 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.function.Predicate;

import static org.apache.flink.connector.jdbc.JdbcConnectionOptions.PASSWORD_KEY;
import static org.apache.flink.connector.jdbc.JdbcConnectionOptions.USER_KEY;
import static org.apache.flink.connector.jdbc.JdbcConnectionOptions.getBriefAuthProperties;
import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.PASSWORD;
import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.TABLE_NAME;
import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.URL;
Expand All @@ -88,32 +92,48 @@ public abstract class AbstractJdbcCatalog extends AbstractCatalog {
private static final Logger LOG = LoggerFactory.getLogger(AbstractJdbcCatalog.class);

protected final ClassLoader userClassLoader;
protected final String username;
protected final String pwd;
protected final String baseUrl;
protected final String defaultUrl;
protected final Properties connectionProperties;

@Deprecated
public AbstractJdbcCatalog(
ClassLoader userClassLoader,
String catalogName,
String defaultDatabase,
String username,
String pwd,
String baseUrl) {
this(
userClassLoader,
catalogName,
defaultDatabase,
baseUrl,
getBriefAuthProperties(username, pwd));
}

public AbstractJdbcCatalog(
ClassLoader userClassLoader,
String catalogName,
String defaultDatabase,
String baseUrl,
Properties connectionProperties) {
super(catalogName, defaultDatabase);

checkNotNull(userClassLoader);
checkArgument(!StringUtils.isNullOrWhitespaceOnly(username));
checkArgument(!StringUtils.isNullOrWhitespaceOnly(pwd));
checkArgument(!StringUtils.isNullOrWhitespaceOnly(baseUrl));

JdbcCatalogUtils.validateJdbcUrl(baseUrl);

this.userClassLoader = userClassLoader;
this.username = username;
this.pwd = pwd;
this.baseUrl = baseUrl.endsWith("/") ? baseUrl : baseUrl + "/";
this.defaultUrl = this.baseUrl + defaultDatabase;
this.connectionProperties = Preconditions.checkNotNull(connectionProperties);
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(connectionProperties.getProperty(USER_KEY)));
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(
connectionProperties.getProperty(PASSWORD_KEY)));
}

@Override
Expand All @@ -122,7 +142,7 @@ public void open() throws CatalogException {
try (TemporaryClassLoaderContext ignored =
TemporaryClassLoaderContext.of(userClassLoader)) {
// test connection, fail early if we cannot connect to database
try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {
try (Connection conn = DriverManager.getConnection(defaultUrl, connectionProperties)) {
} catch (SQLException e) {
throw new ValidationException(
String.format("Failed connecting to %s via JDBC.", defaultUrl), e);
Expand All @@ -139,11 +159,11 @@ public void close() throws CatalogException {
// ----- getters ------

public String getUsername() {
return username;
return connectionProperties.getProperty(USER_KEY);
}

public String getPassword() {
return pwd;
return connectionProperties.getProperty(PASSWORD_KEY);
}

public String getBaseUrl() {
Expand Down Expand Up @@ -248,7 +268,7 @@ public CatalogBaseTable getTable(ObjectPath tablePath)
String databaseName = tablePath.getDatabaseName();
String dbUrl = baseUrl + databaseName;

try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd)) {
try (Connection conn = DriverManager.getConnection(dbUrl, connectionProperties)) {
DatabaseMetaData metaData = conn.getMetaData();
Optional<UniqueConstraint> primaryKey =
getPrimaryKey(
Expand Down Expand Up @@ -282,8 +302,8 @@ public CatalogBaseTable getTable(ObjectPath tablePath)
Map<String, String> props = new HashMap<>();
props.put(CONNECTOR.key(), IDENTIFIER);
props.put(URL.key(), dbUrl);
props.put(USERNAME.key(), username);
props.put(PASSWORD.key(), pwd);
props.put(USERNAME.key(), connectionProperties.getProperty(USER_KEY));
props.put(PASSWORD.key(), connectionProperties.getProperty(PASSWORD_KEY));
props.put(TABLE_NAME.key(), getSchemaTableName(tablePath));
return CatalogTable.of(tableSchema, null, Lists.newArrayList(), props);
} catch (Exception e) {
Expand Down Expand Up @@ -497,7 +517,7 @@ protected List<String> extractColumnValuesBySQL(
Predicate<String> filterFunc,
Object... params) {

try (Connection conn = DriverManager.getConnection(connUrl, username, pwd);
try (Connection conn = DriverManager.getConnection(connUrl, connectionProperties);
PreparedStatement ps = conn.prepareStatement(sql)) {
return extractColumnValuesByStatement(ps, columnIndex, filterFunc, params);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,22 @@
import org.apache.flink.table.catalog.exceptions.TableNotExistException;

import java.util.List;
import java.util.Properties;

import static org.apache.flink.connector.jdbc.JdbcConnectionOptions.getBriefAuthProperties;

/** Catalogs for relational databases via JDBC. */
@PublicEvolving
public class JdbcCatalog extends AbstractJdbcCatalog {

private final AbstractJdbcCatalog internal;

@Deprecated
/**
* Creates a JdbcCatalog.
*
* @deprecated please use {@link JdbcCatalog#JdbcCatalog(ClassLoader, String, String, String,
* String, String, String)} instead.
* String, Properties)} instead.
*/
public JdbcCatalog(
String catalogName,
Expand All @@ -52,12 +56,12 @@ public JdbcCatalog(
Thread.currentThread().getContextClassLoader(),
catalogName,
defaultDatabase,
username,
pwd,
baseUrl,
null);
null,
getBriefAuthProperties(username, pwd));
}

@VisibleForTesting
/**
* Creates a JdbcCatalog.
*
Expand All @@ -77,17 +81,42 @@ public JdbcCatalog(
String pwd,
String baseUrl,
String compatibleMode) {
super(userClassLoader, catalogName, defaultDatabase, username, pwd, baseUrl);
this(
userClassLoader,
catalogName,
defaultDatabase,
baseUrl,
compatibleMode,
getBriefAuthProperties(username, pwd));
}

/**
* Creates a JdbcCatalog.
*
* @param userClassLoader the classloader used to load JDBC driver
* @param catalogName the registered catalog name
* @param defaultDatabase the default database name
* @param connectProperties the properties used to connect the database
* @param baseUrl the base URL of the database, e.g. jdbc:mysql://localhost:3306
* @param compatibleMode the compatible mode of the database
*/
public JdbcCatalog(
ClassLoader userClassLoader,
String catalogName,
String defaultDatabase,
String baseUrl,
String compatibleMode,
Properties connectProperties) {
super(userClassLoader, catalogName, defaultDatabase, baseUrl, connectProperties);

internal =
JdbcCatalogUtils.createCatalog(
userClassLoader,
catalogName,
defaultDatabase,
username,
pwd,
baseUrl,
compatibleMode);
compatibleMode,
connectProperties);
}

// ------ databases -----
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
import org.apache.flink.connector.jdbc.dialect.JdbcDialectLoader;

import java.util.Properties;

import static org.apache.flink.connector.jdbc.JdbcConnectionOptions.getBriefAuthProperties;
import static org.apache.flink.util.Preconditions.checkArgument;

/** Utils for {@link JdbcCatalog}. */
Expand All @@ -41,6 +44,7 @@ public static void validateJdbcUrl(String url) {
checkArgument(parts.length == 2);
}

@Deprecated
/** Create catalog instance from given information. */
public static AbstractJdbcCatalog createCatalog(
ClassLoader userClassLoader,
Expand All @@ -50,17 +54,34 @@ public static AbstractJdbcCatalog createCatalog(
String pwd,
String baseUrl,
String compatibleMode) {
return createCatalog(
userClassLoader,
catalogName,
defaultDatabase,
baseUrl,
compatibleMode,
getBriefAuthProperties(username, pwd));
}

/** Create catalog instance from given information. */
public static AbstractJdbcCatalog createCatalog(
ClassLoader userClassLoader,
String catalogName,
String defaultDatabase,
String baseUrl,
String compatibleMode,
Properties connectionProperties) {
JdbcDialect dialect = JdbcDialectLoader.load(baseUrl, compatibleMode, userClassLoader);

if (dialect instanceof PostgresDialect) {
return new PostgresCatalog(
userClassLoader, catalogName, defaultDatabase, username, pwd, baseUrl);
userClassLoader, catalogName, defaultDatabase, baseUrl, connectionProperties);
} else if (dialect instanceof CrateDBDialect) {
return new CrateDBCatalog(
userClassLoader, catalogName, defaultDatabase, username, pwd, baseUrl);
userClassLoader, catalogName, defaultDatabase, baseUrl, connectionProperties);
} else if (dialect instanceof MySqlDialect) {
return new MySqlCatalog(
userClassLoader, catalogName, defaultDatabase, username, pwd, baseUrl);
userClassLoader, catalogName, defaultDatabase, baseUrl, connectionProperties);
} else {
throw new UnsupportedOperationException(
String.format("Catalog for '%s' is not supported yet.", dialect));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import static org.apache.flink.connector.jdbc.catalog.factory.JdbcCatalogFactoryOptions.DEFAULT_DATABASE;
import static org.apache.flink.connector.jdbc.catalog.factory.JdbcCatalogFactoryOptions.PASSWORD;
import static org.apache.flink.connector.jdbc.catalog.factory.JdbcCatalogFactoryOptions.USERNAME;
import static org.apache.flink.connector.jdbc.utils.JdbcUtils.getConnectionProperties;
import static org.apache.flink.table.factories.FactoryUtil.PROPERTY_VERSION;

/** Factory for {@link JdbcCatalog}. */
Expand Down Expand Up @@ -75,9 +76,8 @@ public Catalog createCatalog(Context context) {
context.getClassLoader(),
context.getName(),
helper.getOptions().get(DEFAULT_DATABASE),
helper.getOptions().get(USERNAME),
helper.getOptions().get(PASSWORD),
helper.getOptions().get(BASE_URL),
helper.getOptions().get(COMPATIBLE_MODE));
helper.getOptions().get(COMPATIBLE_MODE),
getConnectionProperties(helper.getOptions()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.connector.jdbc.catalog.JdbcCatalog;
import org.apache.flink.connector.jdbc.table.JdbcConnectorOptions;
import org.apache.flink.table.catalog.CommonCatalogOptions;

/** {@link ConfigOption}s for {@link JdbcCatalog}. */
Expand All @@ -35,11 +36,9 @@ public class JdbcCatalogFactoryOptions {
.stringType()
.noDefaultValue();

public static final ConfigOption<String> USERNAME =
ConfigOptions.key("username").stringType().noDefaultValue();
public static final ConfigOption<String> USERNAME = JdbcConnectorOptions.USERNAME;

public static final ConfigOption<String> PASSWORD =
ConfigOptions.key("password").stringType().noDefaultValue();
public static final ConfigOption<String> PASSWORD = JdbcConnectorOptions.PASSWORD;

public static final ConfigOption<String> BASE_URL =
ConfigOptions.key("base-url").stringType().noDefaultValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.connector.jdbc.databases.cratedb.catalog;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.connector.jdbc.databases.postgres.catalog.PostgresCatalog;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.CatalogException;
Expand All @@ -34,8 +35,11 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;

import static org.apache.flink.connector.jdbc.JdbcConnectionOptions.getBriefAuthProperties;

/** Catalog for CrateDB. */
@Internal
public class CrateDBCatalog extends PostgresCatalog {
Expand All @@ -53,21 +57,35 @@ public class CrateDBCatalog extends PostgresCatalog {
}
};

@VisibleForTesting
public CrateDBCatalog(
ClassLoader userClassLoader,
String catalogName,
String defaultDatabase,
String username,
String pwd,
String baseUrl) {
this(
userClassLoader,
catalogName,
defaultDatabase,
baseUrl,
getBriefAuthProperties(username, pwd));
}

public CrateDBCatalog(
ClassLoader userClassLoader,
String catalogName,
String defaultDatabase,
String baseUrl,
Properties connecProperties) {
super(
userClassLoader,
catalogName,
defaultDatabase,
username,
pwd,
baseUrl,
new CrateDBTypeMapper());
new CrateDBTypeMapper(),
connecProperties);
}

// ------ databases ------
Expand Down
Loading