Skip to content
Open
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,23 @@

import java.sql.Connection;
import java.sql.JDBCType;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.RowIdLifetime;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.sql.SQLType;
import java.sql.Statement;
import java.sql.Types;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Collectors;

public class DatabaseMetaDataImpl implements java.sql.DatabaseMetaData, JdbcV2Wrapper {
private static final Logger log = LoggerFactory.getLogger(DatabaseMetaDataImpl.class);
Expand All @@ -38,12 +43,13 @@ public class DatabaseMetaDataImpl implements java.sql.DatabaseMetaData, JdbcV2Wr
public enum TableType {
DICTIONARY("DICTIONARY"),
LOG_TABLE("LOG TABLE"),
MATERIALIZED_VIEW("MATERIALIZED VIEW"),
MEMORY_TABLE("MEMORY TABLE"),
REMOTE_TABLE("REMOTE TABLE"),
TABLE("TABLE"),
VIEW("VIEW"),
SYSTEM_TABLE("SYSTEM TABLE"),
TEMPORARY_TABLE("TEMPORARY TABLE");
TABLE("TABLE"),
TEMPORARY_TABLE("TEMPORARY TABLE"),
VIEW("VIEW");

private final String typeName;

Expand All @@ -55,8 +61,9 @@ public String getTypeName() {
return typeName;
}
}
public static final String[] TABLE_TYPES = new String[] { "DICTIONARY", "LOG TABLE", "MEMORY TABLE",
"REMOTE TABLE", "TABLE", "VIEW", "SYSTEM TABLE", "TEMPORARY TABLE" };

// Display names of every TableType constant; getTables falls back to this set when no types are requested.
static final Set<String> TABLE_TYPES = Arrays.stream(TableType.values())
        .map(type -> type.getTypeName())
        .collect(Collectors.toSet());
// Prefix that marks the engine of a temporary table in the raw TABLE_TYPE column before it is translated.
private static final String TEMPORARY_ENGINE_PREFIX = "Temporary";

private static final String DATABASE_PRODUCT_NAME = "ClickHouse";
private static final String DRIVER_NAME = DATABASE_PRODUCT_NAME + " JDBC Driver";
Expand Down Expand Up @@ -764,6 +771,153 @@ public ResultSet getProcedureColumns(String catalog, String schemaPattern, Strin
}
}

// Map of ClickHouse engine names to JDBC table types.
// Engines that are absent from this map are reported as TABLE by engineToTableType.
static final Map<String, String> ENGINE_TO_TABLE_TYPE;
static {
    Map<String, String> map = new java.util.HashMap<>();

    // Log tables
    registerEngines(map, TableType.LOG_TABLE, "Log", "StripeLog", "TinyLog");

    // Memory tables
    registerEngines(map, TableType.MEMORY_TABLE, "Buffer", "Memory", "Set");

    // Views
    registerEngines(map, TableType.VIEW, "View", "LiveView", "WindowView");
    registerEngines(map, TableType.MATERIALIZED_VIEW, "MaterializedView");

    // Dictionary
    registerEngines(map, TableType.DICTIONARY, "Dictionary");

    // Remote/External tables.
    // COSN is grouped with the other S3-compatible object storage engines (S3, OSS, GCS)
    // so that all of them are reported consistently as REMOTE TABLE.
    registerEngines(map, TableType.REMOTE_TABLE,
            "AzureBlobStorage", "AzureQueue", "ArrowFlight",
            "COSN",
            "DeltaLake", "DeltaLakeAzure", "DeltaLakeLocal", "DeltaLakeS3",
            "Distributed", "GCS", "HDFS", "Hive", "Hudi",
            "Iceberg", "IcebergAzure", "IcebergHDFS", "IcebergLocal", "IcebergS3",
            "JDBC", "Kafka", "MaterializedPostgreSQL", "MongoDB", "MySQL", "NATS",
            "ODBC", "OSS", "PostgreSQL", "RabbitMQ", "Redis",
            "S3", "S3Queue", "URL", "YTsaurus");

    // Regular tables (MergeTree family and others)
    registerEngines(map, TableType.TABLE,
            "AggregatingMergeTree", "Alias", "CoalescingMergeTree", "CollapsingMergeTree",
            "EmbeddedRocksDB", "Executable", "ExecutablePool", "GraphiteMergeTree",
            "Join", "KeeperMap", "Merge", "MergeTree", "ReplacingMergeTree",
            "ReplicatedAggregatingMergeTree", "ReplicatedCoalescingMergeTree",
            "ReplicatedCollapsingMergeTree", "ReplicatedGraphiteMergeTree",
            "ReplicatedMergeTree", "ReplicatedReplacingMergeTree",
            "ReplicatedSummingMergeTree", "ReplicatedVersionedCollapsingMergeTree",
            "SummingMergeTree", "VersionedCollapsingMergeTree");

    // Special
    registerEngines(map, TableType.TABLE,
            "TimeSeries", "Null", "Loop", "SQLite", "File", "FileLog",
            "GenerateRandom", "FuzzJSON", "FuzzQuery");

    ENGINE_TO_TABLE_TYPE = Collections.unmodifiableMap(map);
}

/**
 * Registers every given engine name under the type name of {@code type}.
 *
 * @param target  map being populated (engine name to table type name)
 * @param type    table type all listed engines belong to
 * @param engines ClickHouse engine names, spelled exactly as the server reports them
 */
private static void registerEngines(Map<String, String> target, TableType type, String... engines) {
    final String typeName = type.getTypeName();
    for (String engine : engines) {
        target.put(engine, typeName);
    }
}

/**
* Converts engine name to table type. Returns TABLE as default for unknown engines.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Returns TABLE as default for unknown engines.

if a user is trying to write use an unknown/undefined table engine, shouldn't the client throw an exception and prevent this behavior? Otherwise, their query will fail

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Users of older JDBC versions should be able to work with newer servers. Having unknown engine mapped to TABLE let user see them.
Showing error results blocking them until upgrade.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

understood - can you add this to the comment please?

*/
private static String engineToTableType(String engine) {
if (engine == null) {
return TableType.TABLE.getTypeName();
}
// Check for system tables (engines starting with "System" or "Async")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's an example of a table engine that starts with System or Async? I don't see any here: https://clickhouse.com/docs/engines/table-engines

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They are not listed but present in system database. Here are some.

SELECT
    name,
    database,
    engine
FROM system.tables
WHERE startsWith(engine, 'System') OR startsWith(engine, 'Async')

Query id: da51240b-613a-4d2a-84b1-163298e2ad69

    ┌─name───────────────────────────┬─database─┬─engine─────────────────────────────┐
 1. │ aggregate_function_combinators │ system   │ SystemAggregateFunctionCombinators │
 2. │ asynchronous_inserts           │ system   │ SystemAsynchronousInserts          │
 3. │ asynchronous_loader            │ system   │ SystemAsyncLoader                  │
 4. │ asynchronous_metrics           │ system   │ SystemAsynchronousMetrics          │
 5. │ azure_queue                    │ system   │ SystemAzureQueue                   │
 6. │ azure_queue_settings           │ system   │ SystemAzureQueueSettings           │

if (isSystemTableEngine(engine)) {
return TableType.SYSTEM_TABLE.getTypeName();
}
return ENGINE_TO_TABLE_TYPE.getOrDefault(engine, TableType.TABLE.getTypeName());
}

/**
 * Collects the engine names from {@code ENGINE_TO_TABLE_TYPE} whose mapped table type
 * is contained in {@code requestedTypes}.
 *
 * @param requestedTypes table type names to select engines for
 * @return engines mapped to at least one of the requested types; empty when none match
 */
private static Set<String> getEnginesForTableTypes(final Set<String> requestedTypes) {
    final Set<String> matchingEngines = new java.util.HashSet<>();
    for (Map.Entry<String, String> mapping : ENGINE_TO_TABLE_TYPE.entrySet()) {
        if (requestedTypes.contains(mapping.getValue())) {
            matchingEngines.add(mapping.getKey());
        }
    }
    return matchingEngines;
}

/**
 * Tells whether an engine name denotes a system table engine, i.e. starts with
 * {@code "System"} or {@code "Async"} (case-sensitive).
 *
 * @param engine engine name, may be {@code null}
 * @return {@code true} for a non-null name with one of the system prefixes, {@code false} otherwise
 */
public static boolean isSystemTableEngine(String engine) {
    if (engine == null) {
        return false;
    }
    if (engine.startsWith("System")) {
        return true;
    }
    return engine.startsWith("Async");
}

// Name of the getTables result column that initially carries the raw engine name.
private static final String TABLE_TYPE_COL_IN_GET_TABLES = "TABLE_TYPE";

// Rewrites the TABLE_TYPE value of a getTables row from an engine name to a JDBC table type name.
// Engines carrying the temporary prefix win over every other classification.
private static final Consumer<Map<String, Object>> TABLE_TYPE_MUTATOR = row -> {
    final String engine = (String) row.get(TABLE_TYPE_COL_IN_GET_TABLES);
    final boolean temporary = engine != null && engine.startsWith(TEMPORARY_ENGINE_PREFIX);

    final String resolvedType = temporary
            ? TableType.TEMPORARY_TABLE.getTypeName()
            : (isSystemTableEngine(engine)
                    ? TableType.SYSTEM_TABLE.getTypeName()
                    : engineToTableType(engine));

    row.put(TABLE_TYPE_COL_IN_GET_TABLES, resolvedType);
};

// Row mutators applied to the detached getTables result set.
private static final Collection<Consumer<Map<String, Object>>> GET_TABLES_MUTATORS = Collections.singletonList(TABLE_TYPE_MUTATOR);


/**
* Returns tables defined for a schema. The {@code catalog} parameter is ignored.
*
Expand All @@ -774,24 +928,47 @@ public ResultSet getProcedureColumns(String catalog, String schemaPattern, Strin
public ResultSet getTables(String catalog, String schemaPattern, String tableNamePattern, String[] types) throws SQLException {
log.debug("getTables: catalog={}, schemaPattern={}, tableNamePattern={}, types={}", catalog, schemaPattern, tableNamePattern, types);
// TODO: when switch between catalog and schema is implemented, then TABLE_SCHEMA and TABLE_CAT should be populated accordingly
// String commentColumn = connection.getServerVersion().check("[21.6,)") ? "t.comment" : "''";
// TODO: handle useCatalogs == true and return schema catalog name
if (types == null || types.length == 0) {
types = TABLE_TYPES;

// Get engines that map to the requested table types
Set<String> requestedTypes = (types == null || types.length == 0) ? TABLE_TYPES : Arrays.stream(types).collect(Collectors.toSet()) ;
Set<String> engines = getEnginesForTableTypes(requestedTypes);

// Build engine filter conditions
List<String> filterConditions = new ArrayList<>();

// Add condition for engines that map to requested types
if (!engines.isEmpty()) {
filterConditions.add("(t.engine IN ('" + String.join("','", engines) + "'))");
}

// If TABLE type is requested, also include engines not in our map (they default to TABLE)
if (requestedTypes.contains(TableType.TABLE.getTypeName())) {
filterConditions.add("(t.engine NOT IN ('" + String.join("','", ENGINE_TO_TABLE_TYPE.keySet()) +
"') AND NOT t.engine LIKE 'System%' AND NOT t.engine LIKE 'Async%' AND t.is_temporary = 0)");
}

// If SYSTEM TABLE is requested, include system engines (System* and Async*)
if (requestedTypes.contains(TableType.SYSTEM_TABLE.getTypeName())) {
filterConditions.add("(t.engine LIKE 'System%' OR t.engine LIKE 'Async%')");
}

// If TEMPORARY TABLE is requested, include temporary tables
if (requestedTypes.contains(TableType.TEMPORARY_TABLE.getTypeName())) {
filterConditions.add("(t.is_temporary = 1)");
}

String engineFilter = filterConditions.isEmpty() ? "" : " AND ( " + String.join(" OR ", filterConditions) + ")";
// Exclude temporary tables when not requested (they would otherwise match engine-based conditions)
if (!requestedTypes.contains(TableType.TEMPORARY_TABLE.getTypeName())) {
engineFilter += " AND (t.is_temporary = 0)";
}

String sql = "SELECT " +
catalogPlaceholder + " AS TABLE_CAT, " +
"t.database AS TABLE_SCHEM, " +
"t.name AS TABLE_NAME, " +
"CASE WHEN t.engine LIKE '%Log' THEN 'LOG TABLE' " +
"WHEN t.engine in ('Buffer', 'Memory', 'Set') THEN 'MEMORY TABLE' " +
"WHEN t.is_temporary != 0 THEN 'TEMPORARY TABLE' " +
"WHEN t.engine like '%View' THEN 'VIEW'" +
"WHEN t.engine = 'Dictionary' THEN 'DICTIONARY' " +
"WHEN t.engine LIKE 'Async%' OR t.engine LIKE 'System%' THEN 'SYSTEM TABLE' " +
"WHEN empty(t.data_paths) THEN 'REMOTE TABLE' " +
"ELSE 'TABLE' END AS TABLE_TYPE, " +
"if(t.is_temporary = 1, concat('Temporary', t.engine), t.engine) AS TABLE_TYPE, " +
"t.comment AS REMARKS, " +
"CAST(null as Nullable(String)) AS TYPE_CAT, " + // no types catalog
"d.engine AS TYPE_SCHEM, " + // no types schema
Expand All @@ -800,12 +977,16 @@ public ResultSet getTables(String catalog, String schemaPattern, String tableNam
"CAST(null as Nullable(String)) AS REF_GENERATION" +
" FROM system.tables t" +
" JOIN system.databases d ON system.tables.database = system.databases.name" +
" WHERE t.database LIKE '" + (schemaPattern == null ? "%" : schemaPattern) + "'" +
" AND t.name LIKE '" + (tableNamePattern == null ? "%" : tableNamePattern) + "'" +
" AND TABLE_TYPE IN ('" + String.join("','", types) + "')";

try {
return connection.createStatement().executeQuery(sql);
" WHERE t.database LIKE ?" +
" AND t.name LIKE ?"
+ engineFilter;

try (PreparedStatement stmt = connection.prepareStatement(sql)) {
stmt.setString(1, (schemaPattern == null ? "%" : schemaPattern));
stmt.setString(2, (tableNamePattern == null ? "%" : tableNamePattern));
try (ResultSet rs = stmt.executeQuery()) {
return DetachedResultSet.createFromResultSet(rs, connection.getDefaultCalendar(), GET_TABLES_MUTATORS);
}
} catch (Exception e) {
throw ExceptionUtils.toSqlState(e);
}
Expand Down Expand Up @@ -844,15 +1025,20 @@ public ResultSet getCatalogs() throws SQLException {
}
}


// List form of the table type names; bound as the array parameter of the getTableTypes query.
static final List<String> TABLE_TYPES_SQL_ARRAY = Arrays.stream(TableType.values())
        .map(type -> type.getTypeName())
        .collect(Collectors.toList());
/**
* Returns the names of the ClickHouse table types as broad categories (rather than engine names).
* @return - ResultSet with one column TABLE_TYPE
* @throws SQLException - if an error occurs
*/
@Override
public ResultSet getTableTypes() throws SQLException {
try {
return connection.createStatement().executeQuery("SELECT arrayJoin(['" + String.join("','", TABLE_TYPES) + "']) AS TABLE_TYPE ORDER BY TABLE_TYPE");
try (PreparedStatement stmt = connection.prepareStatement("SELECT arrayJoin(?) AS TABLE_TYPE ORDER BY TABLE_TYPE")) {
stmt.setObject(1, TABLE_TYPES_SQL_ARRAY);
try (ResultSet rs = stmt.executeQuery()) {
return DetachedResultSet.createFromResultSet(rs, connection.getDefaultCalendar(), Collections.emptyList());
}
} catch (Exception e) {
throw ExceptionUtils.toSqlState(e);
}
Expand Down
Loading
Loading