Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added new configuration to pass catalog name to metastore when creati… #24235

Open
wants to merge 21 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
6b6b4dd
Added new configuration to pass catalog name to metastore when creati…
AnuragKDwivedi Dec 10, 2024
bb6b416
Merge branch 'master' into catalog-name-to-metastore
AnuragKDwivedi Dec 11, 2024
ffc768f
Added default value for new properties in test config
AnuragKDwivedi Dec 11, 2024
79bae22
Revert "Added default value for new properties in test config"
AnuragKDwivedi Dec 11, 2024
e72d7ff
added default values for the new property in test config
AnuragKDwivedi Dec 11, 2024
5032b90
Passing catalog information to metastore by concating it with schema …
AnuragKDwivedi Jan 27, 2025
1df9099
Merge branch 'master' into catalog-name-to-metastore
AnuragKDwivedi Jan 27, 2025
771f8cf
Added catalog name in partition object
AnuragKDwivedi Jan 28, 2025
6721fc4
moved method to common class
AnuragKDwivedi Jan 28, 2025
fb015cb
test case fix
AnuragKDwivedi Jan 29, 2025
ec7b8c6
Added catalog name in test cases
AnuragKDwivedi Jan 29, 2025
2e19034
Added new method to make call to get_databases
AnuragKDwivedi Jan 30, 2025
17509da
Fixed unit test case failures
AnuragKDwivedi Jan 30, 2025
23c56d5
Test case fixed
AnuragKDwivedi Jan 30, 2025
2b165a4
Revised getDatabases implementation to pull all database names for a …
AnuragKDwivedi Jan 30, 2025
2bc3d16
Refactored list table and list view code along with column statistics…
AnuragKDwivedi Jan 31, 2025
825c881
Fixed test cases and added catalog with schema name in rename view me…
AnuragKDwivedi Jan 31, 2025
18a057a
fix unit test case failures
AnuragKDwivedi Jan 31, 2025
53c4275
test case fixes
AnuragKDwivedi Feb 2, 2025
f9c1d94
removed unused import
AnuragKDwivedi Feb 2, 2025
c97e5bf
Updated new config in presto doc for all the connectors where this ne…
AnuragKDwivedi Feb 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import javax.annotation.Nullable;

import java.util.Optional;
import java.util.function.Supplier;

import static com.facebook.presto.common.type.TypeUtils.readNativeValue;
Expand All @@ -29,6 +30,10 @@

public final class Utils
{
// Separator between the catalog name and the schema name in a qualified
// metastore database name, e.g. "@catalog_name#schema_name".
// Typed as String (not CharSequence/Object) since these hold string literals.
private static final String CATALOG_DB_SEPARATOR = "#";
// Marker prefixed to a catalog-qualified database name sent to the metastore.
private static final String CATALOG_DB_THRIFT_NAME_MARKER = "@";
// The default Hive database is never catalog-qualified.
private static final String DEFAULT_DATABASE = "default";

// Utility class: static helpers only, never instantiated.
private Utils()
{
}
Expand Down Expand Up @@ -126,4 +131,23 @@ public static void checkState(boolean test, String errorMessage)
throw new IllegalStateException(errorMessage);
}
}

/**
 * Constructs the metastore database name, prefixing it with the catalog
 * name when one is configured.
 *
 * @param catalogName catalog name to embed, or {@code Optional.empty()} to leave the schema name untouched
 * @param schemaName the original schema name; returned as-is when it is
 *        {@code null}, the default database, or already contains the catalog separator
 * @return the formatted schema name (example: {@code @catalog_name#schema_name})
 */
public static String constructSchemaName(Optional<String> catalogName, String schemaName)
{
    // Guard clauses: anything that cannot (or must not) be qualified passes through unchanged
    if (!catalogName.isPresent() || schemaName == null || schemaName.equals(DEFAULT_DATABASE) || schemaName.contains(CATALOG_DB_SEPARATOR)) {
        return schemaName;
    }
    return CATALOG_DB_THRIFT_NAME_MARKER + catalogName.get() + CATALOG_DB_SEPARATOR + schemaName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.facebook.airlift.log.Logger;
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.common.type.TypeManager;
import com.facebook.presto.hive.HiveCommonClientConfig;
import com.facebook.presto.hive.HiveStorageFormat;
import com.facebook.presto.hive.metastore.ExtendedHiveMetastore;
import com.facebook.presto.hive.metastore.HivePrivilegeInfo;
Expand Down Expand Up @@ -53,6 +54,7 @@
import java.util.Set;
import java.util.stream.Collectors;

import static com.facebook.presto.common.Utils.constructSchemaName;
import static com.facebook.presto.delta.DeltaColumnHandle.ColumnType.PARTITION;
import static com.facebook.presto.delta.DeltaColumnHandle.ColumnType.REGULAR;
import static com.facebook.presto.delta.DeltaExpressionUtils.splitPredicate;
Expand Down Expand Up @@ -87,27 +89,36 @@ public class DeltaMetadata
private final ExtendedHiveMetastore metastore;
private final TypeManager typeManager;
private final DeltaConfig config;
// Catalog name taken from HiveCommonClientConfig#getCatalogName
// (hive.metastore.catalog.name); null when the property is not configured
private final String catalogName;

@Inject
public DeltaMetadata(
        DeltaConnectorId connectorId,
        DeltaClient deltaClient,
        ExtendedHiveMetastore metastore,
        TypeManager typeManager,
        DeltaConfig config,
        HiveCommonClientConfig commonConfig)
{
    this.connectorId = requireNonNull(connectorId, "connectorId is null").toString();
    this.deltaClient = requireNonNull(deltaClient, "deltaClient is null");
    this.metastore = requireNonNull(metastore, "metastore is null");
    this.typeManager = requireNonNull(typeManager, "typeManager is null");
    this.config = requireNonNull(config, "config is null");
    // Fail fast with a clear message instead of an anonymous NPE on the call below,
    // consistent with the null checks on every other constructor argument
    this.catalogName = requireNonNull(commonConfig, "commonConfig is null").getCatalogName();
}

/**
 * Lists the schema names visible through this connector: the metastore
 * databases plus the synthetic PATH_SCHEMA entry.
 */
@Override
public List<String> listSchemaNames(ConnectorSession session)
{
    List<String> schemaNames;
    if (catalogName != null) {
        // An empty schema name yields just the "@<catalog>#" prefix, which is
        // presumably treated as a match pattern by getDatabases — see ExtendedHiveMetastore
        schemaNames = metastore.getDatabases(metastoreContext(session), constructSchemaName(Optional.of(catalogName), ""));
    }
    else {
        schemaNames = metastore.getAllDatabases(metastoreContext(session));
    }
    // Declared as List (not ArrayList) — callers only need the interface
    List<String> schemas = new ArrayList<>(schemaNames);
    schemas.add(PATH_SCHEMA.toLowerCase(US));
    return schemas;
}
Expand All @@ -131,15 +142,16 @@ public void dropTable(ConnectorSession session, ConnectorTableHandle tableHandle
{
DeltaTableHandle handle = (DeltaTableHandle) tableHandle;
MetastoreContext metastoreContext = metastoreContext(session);
String schemaName = constructSchemaName(Optional.ofNullable(catalogName), handle.getDeltaTable().getSchemaName());

Optional<Table> target = metastore.getTable(metastoreContext, handle.getDeltaTable().getSchemaName(), handle.getDeltaTable().getTableName());
Optional<Table> target = metastore.getTable(metastoreContext, schemaName, handle.getDeltaTable().getTableName());
if (!target.isPresent()) {
throw new TableNotFoundException(handle.toSchemaTableName());
}

metastore.dropTable(
metastoreContext,
handle.getDeltaTable().getSchemaName(),
schemaName,
handle.getDeltaTable().getTableName(),
false);
}
Expand All @@ -164,6 +176,7 @@ private Table prepareTable(ConnectorSession session, ConnectorTableMetadata tabl
}

Table.Builder tableBuilder = Table.builder()
.setCatalogName(Optional.ofNullable(catalogName))
.setDatabaseName(schemaName)
.setTableName(tableName)
.setOwner(session.getUser())
Expand Down Expand Up @@ -195,7 +208,7 @@ public DeltaTableHandle getTableHandle(ConnectorSession session, SchemaTableName
tableLocation = deltaTableName.getTableNameOrPath();
}
else {
Optional<Table> metastoreTable = metastore.getTable(metastoreContext(session), schemaName, deltaTableName.getTableNameOrPath());
Optional<Table> metastoreTable = metastore.getTable(metastoreContext(session), constructSchemaName(Optional.ofNullable(catalogName), schemaName), deltaTableName.getTableNameOrPath());
if (!metastoreTable.isPresent()) {
return null; // indicates table doesn't exist
}
Expand Down Expand Up @@ -284,7 +297,7 @@ public List<SchemaTableName> listTables(ConnectorSession session, Optional<Strin
.orElseGet(() -> listSchemaNames(session));
ImmutableList.Builder<SchemaTableName> tableNames = ImmutableList.builder();
for (String schema : schemaNames) {
for (String tableName : metastore.getAllTables(metastoreContext(session), schema).orElse(emptyList())) {
for (String tableName : metastore.getAllTables(metastoreContext(session), constructSchemaName(Optional.ofNullable(catalogName), schema)).orElse(emptyList())) {
tableNames.add(new SchemaTableName(schema, tableName));
}
}
Expand Down
1 change: 1 addition & 0 deletions presto-docs/src/main/sphinx/connector/deltalake.rst
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ Property Name Description
``true``.
``delta.case-sensitive-partitions-enabled`` Allows matching the names of partitioned columns in a ``true``
case-sensitive manner.
``hive.metastore.catalog.name`` Specifies the catalog name to pass to the metastore.
=============================================== ========================================================= ============

Delta Lake connector reuses many of the modules existing in Hive connector.
Expand Down
2 changes: 2 additions & 0 deletions presto-docs/src/main/sphinx/connector/hive.rst
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,8 @@ Property Name Description
error iterating through empty files.

``hive.file-status-cache.max-retained-size`` Maximum size in bytes of the directory listing cache ``0KB``

``hive.metastore.catalog.name`` Specifies the catalog name to pass to the metastore.
================================================== ============================================================ ============

Metastore Configuration Properties
Expand Down
2 changes: 2 additions & 0 deletions presto-docs/src/main/sphinx/connector/hudi.rst
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ Property Name Description
======================================= ============================================= ===========
``hudi.metadata-table-enabled`` Fetch the list of file names and sizes from false
Hudi's metadata table rather than storage.
``hive.metastore.catalog.name`` Specifies the catalog name to pass
to the metastore.
======================================= ============================================= ===========

SQL Support
Expand Down
1 change: 1 addition & 0 deletions presto-docs/src/main/sphinx/connector/iceberg.rst
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ Property Name Description

``iceberg.hive.table-refresh.backoff-scale-factor`` The multiple used to scale subsequent wait time between 4.0
retries.
``hive.metastore.catalog.name`` Specifies the catalog name to pass to the metastore.
======================================================== ============================================================= ============

Nessie catalog
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public class HiveCommonClientConfig
private boolean readNullMaskedParquetEncryptedValueEnabled;
private boolean useParquetColumnNames;
private boolean zstdJniDecompressionEnabled;
// Catalog name passed to the metastore (hive.metastore.catalog.name); null when unset
private String catalogName;

public NodeSelectionStrategy getNodeSelectionStrategy()
{
Expand Down Expand Up @@ -284,4 +285,17 @@ public HiveCommonClientConfig setZstdJniDecompressionEnabled(boolean zstdJniDeco
this.zstdJniDecompressionEnabled = zstdJniDecompressionEnabled;
return this;
}

/**
 * Returns the configured metastore catalog name, or {@code null} when
 * {@code hive.metastore.catalog.name} is not set.
 */
public String getCatalogName()
{
return catalogName;
}

/**
 * Sets the catalog name that is passed to the metastore when qualifying
 * database names.
 *
 * @param catalogName the catalog name, or {@code null} to disable catalog qualification
 * @return this config instance, for chaining
 */
@Config("hive.metastore.catalog.name")
@ConfigDescription("Catalog name to pass to the metastore")
public HiveCommonClientConfig setCatalogName(String catalogName)
{
    this.catalogName = catalogName;
    return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ public void testDefaults()
.setZstdJniDecompressionEnabled(false)
.setParquetBatchReaderVerificationEnabled(false)
.setParquetBatchReadOptimizationEnabled(false)
.setReadNullMaskedParquetEncryptedValue(false));
.setReadNullMaskedParquetEncryptedValue(false)
.setCatalogName(null));
}

@Test
Expand All @@ -72,6 +73,7 @@ public void testExplicitPropertyMappings()
.put("hive.enable-parquet-batch-reader-verification", "true")
.put("hive.parquet-batch-read-optimization-enabled", "true")
.put("hive.read-null-masked-parquet-encrypted-value-enabled", "true")
.put("hive.metastore.catalog.name", "catalogName")
.build();

HiveCommonClientConfig expected = new HiveCommonClientConfig()
Expand All @@ -92,7 +94,8 @@ public void testExplicitPropertyMappings()
.setZstdJniDecompressionEnabled(true)
.setParquetBatchReaderVerificationEnabled(true)
.setParquetBatchReadOptimizationEnabled(true)
.setReadNullMaskedParquetEncryptedValue(true);
.setReadNullMaskedParquetEncryptedValue(true)
.setCatalogName("catalogName");

ConfigAssertions.assertFullMapping(properties, expected);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.facebook.presto.hive.HiveCoercionPolicy;
import com.facebook.presto.hive.HiveColumnConverterProvider;
import com.facebook.presto.hive.HiveColumnHandle;
import com.facebook.presto.hive.HiveCommonClientConfig;
import com.facebook.presto.hive.HiveEncryptionInformationProvider;
import com.facebook.presto.hive.HiveFileRenamer;
import com.facebook.presto.hive.HiveHdfsConfiguration;
Expand Down Expand Up @@ -177,7 +178,9 @@ public S3SelectTestHelper(String host,
new HiveFileRenamer(),
columnConverterProvider,
new QuickStatsProvider(metastoreClient, hdfsEnvironment, DO_NOTHING_DIRECTORY_LISTER, new HiveClientConfig(), new NamenodeStats(), ImmutableList.of()),
new HiveTableWritabilityChecker(config));
new HiveTableWritabilityChecker(config),
new HiveCommonClientConfig());

transactionManager = new HiveTransactionManager();
splitManager = new HiveSplitManager(
transactionManager,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class Database
private final PrincipalType ownerType;
private final Optional<String> comment;
private final Map<String, String> parameters;
// Optional catalog name associated with this database; empty when not set
private final Optional<String> catalogName;

@JsonCreator
public Database(
Expand All @@ -47,14 +48,16 @@ public Database(
@JsonProperty("ownerName") String ownerName,
@JsonProperty("ownerType") PrincipalType ownerType,
@JsonProperty("comment") Optional<String> comment,
@JsonProperty("parameters") Map<String, String> parameters)
@JsonProperty("parameters") Map<String, String> parameters,
@JsonProperty("catalogName") Optional<String> catalogName)
{
this.databaseName = requireNonNull(databaseName, "databaseName is null");
this.location = requireNonNull(location, "location is null");
this.ownerName = requireNonNull(ownerName, "ownerName is null");
this.ownerType = requireNonNull(ownerType, "ownerType is null");
this.comment = requireNonNull(comment, "comment is null");
this.parameters = ImmutableMap.copyOf(requireNonNull(parameters, "parameters is null"));
this.catalogName = requireNonNull(catalogName, "catalogName is null");
}

@JsonProperty
Expand Down Expand Up @@ -103,6 +106,12 @@ public static Builder builder(Database database)
return new Builder(database);
}

/**
 * Returns the catalog name associated with this database, if any.
 */
@JsonProperty
public Optional<String> getCatalogName()
{
return catalogName;
}

public static class Builder
{
private String databaseName;
Expand All @@ -111,6 +120,7 @@ public static class Builder
private PrincipalType ownerType;
private Optional<String> comment = Optional.empty();
private Map<String, String> parameters = new LinkedHashMap<>();
// Defaults to empty: the built database is not catalog-qualified unless explicitly set
private Optional<String> catalogName = Optional.empty();

public Builder() {}

Expand All @@ -122,6 +132,7 @@ public Builder(Database database)
this.ownerType = database.ownerType;
this.comment = database.comment;
this.parameters = database.parameters;
this.catalogName = database.catalogName;
}

public Builder setDatabaseName(String databaseName)
Expand Down Expand Up @@ -166,6 +177,13 @@ public Builder setParameters(Map<String, String> parameters)
return this;
}

/**
 * Sets the optional catalog name for the database being built.
 */
public Builder setCatalogName(Optional<String> catalogName)
{
    this.catalogName = requireNonNull(catalogName, "catalogName is null");
    return this;
}

public Database build()
{
return new Database(
Expand All @@ -174,7 +192,8 @@ public Database build()
ownerName,
ownerType,
comment,
parameters);
parameters,
catalogName);
}
}

Expand All @@ -188,6 +207,7 @@ public String toString()
.add("ownerType", ownerType)
.add("comment", comment)
.add("parameters", parameters)
.add("catalogName", catalogName)
.toString();
}

Expand All @@ -207,12 +227,13 @@ public boolean equals(Object o)
Objects.equals(ownerName, database.ownerName) &&
ownerType == database.ownerType &&
Objects.equals(comment, database.comment) &&
Objects.equals(parameters, database.parameters);
Objects.equals(parameters, database.parameters) &&
Objects.equals(catalogName, database.catalogName);
}

/**
 * Hash code over all identity fields; kept consistent with
 * {@code equals}, including the new {@code catalogName} field.
 */
@Override
public int hashCode()
{
    return Objects.hash(databaseName, location, ownerName, ownerType, comment, parameters, catalogName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@

public interface ExtendedHiveMetastore
{
/**
 * Returns database names matching the given pattern.
 * <p>
 * NOTE(review): the default implementation silently returns an empty list, so
 * callers that rely on this method (e.g. catalog-scoped schema listing) will
 * see no databases unless the metastore implementation overrides it — confirm
 * this fallback is intended rather than delegating to getAllDatabases.
 */
default List<String> getDatabases(MetastoreContext metastoreContext, String pattern)
{
return ImmutableList.of();
}

Optional<Database> getDatabase(MetastoreContext metastoreContext, String databaseName);

List<String> getAllDatabases(MetastoreContext metastoreContext);
Expand Down
Loading
Loading