Skip to content

Commit

Permalink
Subclass FileHiveMetastore for Iceberg connector
Browse files Browse the repository at this point in the history
  • Loading branch information
hantangwangd authored and yingsu00 committed Feb 19, 2025
1 parent d7c0930 commit 4a0bbc9
Show file tree
Hide file tree
Showing 12 changed files with 454 additions and 280 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@
import com.facebook.presto.hive.PartitionMutator;
import com.facebook.presto.hive.metastore.ExtendedHiveMetastore;
import com.facebook.presto.hive.metastore.HiveMetastoreCacheStats;
import com.facebook.presto.hive.metastore.HiveMetastoreModule;
import com.facebook.presto.hive.metastore.HivePartitionMutator;
import com.facebook.presto.hive.metastore.InMemoryCachingHiveMetastore;
import com.facebook.presto.hive.metastore.MetastoreCacheStats;
import com.facebook.presto.hive.metastore.MetastoreConfig;
import com.facebook.presto.iceberg.hive.IcebergHiveMetastoreModule;
import com.google.inject.Binder;
import com.google.inject.Scopes;

Expand All @@ -48,7 +48,7 @@ public IcebergHiveModule(String connectorId, Optional<ExtendedHiveMetastore> met
@Override
public void setup(Binder binder)
{
install(new HiveMetastoreModule(this.connectorId, this.metastore));
install(new IcebergHiveMetastoreModule(this.connectorId, this.metastore));
binder.bind(ExtendedHiveMetastore.class).to(InMemoryCachingHiveMetastore.class).in(Scopes.SINGLETON);
configBinder(binder).bindConfig(IcebergHiveTableOperationsConfig.class);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.iceberg.hive;

import com.facebook.airlift.log.Logger;
import com.facebook.presto.hive.HdfsEnvironment;
import com.facebook.presto.hive.metastore.Table;
import com.facebook.presto.hive.metastore.file.FileHiveMetastore;
import com.facebook.presto.hive.metastore.file.FileHiveMetastoreConfig;
import com.facebook.presto.spi.PrestoException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;

import javax.annotation.concurrent.ThreadSafe;
import javax.inject.Inject;

import java.io.IOException;
import java.util.Optional;

import static com.facebook.presto.hive.HiveErrorCode.HIVE_METASTORE_ERROR;
import static java.lang.String.format;

@ThreadSafe
public class IcebergFileHiveMetastore
extends FileHiveMetastore
{
private static final Logger LOG = Logger.get(IcebergFileHiveMetastore.class);

@Inject
public IcebergFileHiveMetastore(HdfsEnvironment hdfsEnvironment, FileHiveMetastoreConfig config)
{
this(hdfsEnvironment, config.getCatalogDirectory(), config.getMetastoreUser());
}

public IcebergFileHiveMetastore(HdfsEnvironment hdfsEnvironment, String catalogDirectory, String metastoreUser)
{
super(hdfsEnvironment, catalogDirectory, metastoreUser);
}

@Override
protected void validateExternalLocation(Path externalLocation, Path catalogDirectory)
throws IOException
{
FileSystem externalFileSystem = hdfsEnvironment.getFileSystem(hdfsContext, externalLocation);
if (!externalFileSystem.isDirectory(externalLocation)) {
throw new PrestoException(HIVE_METASTORE_ERROR, "External table location does not exist");
}
}

@Override
protected void validateReplaceTableType(Table originTable, Table newTable)
{}

@Override
protected void renameTable(Path originalMetadataDirectory, Path newMetadataDirectory)
{
Optional<Runnable> rollbackAction = Optional.empty();
try {
// If the directory `.prestoPermissions` exists, copy it to the new table metadata directory
Path originTablePermissionDir = new Path(originalMetadataDirectory, PRESTO_PERMISSIONS_DIRECTORY_NAME);
Path newTablePermissionDir = new Path(newMetadataDirectory, PRESTO_PERMISSIONS_DIRECTORY_NAME);
if (metadataFileSystem.exists(originTablePermissionDir)) {
if (!FileUtil.copy(metadataFileSystem, originTablePermissionDir,
metadataFileSystem, newTablePermissionDir, false, metadataFileSystem.getConf())) {
throw new IOException(format("Could not rename table. Failed to copy directory: %s to %s", originTablePermissionDir, newTablePermissionDir));
}
else {
rollbackAction = Optional.of(() -> {
try {
metadataFileSystem.delete(newTablePermissionDir, true);
}
catch (IOException e) {
// Ignore the exception and print a warn level log
LOG.warn("Could not delete table permission directory: %s", newTablePermissionDir);
}
});
}
}

// Rename file `.prestoSchema` to change it to the new metadata path
// This will atomically execute the table renaming behavior
Path originMetadataFile = new Path(originalMetadataDirectory, PRESTO_SCHEMA_FILE_NAME);
Path newMetadataFile = new Path(newMetadataDirectory, PRESTO_SCHEMA_FILE_NAME);
renamePath(originMetadataFile, newMetadataFile,
format("Could not rename table. Failed to rename file %s to %s", originMetadataFile, newMetadataFile));

// Subsequent action, delete the redundant directory `.prestoPermissions` from the original table metadata path
try {
metadataFileSystem.delete(new Path(originalMetadataDirectory, PRESTO_PERMISSIONS_DIRECTORY_NAME), true);
}
catch (IOException e) {
// Ignore the exception and print a warn level log
LOG.warn("Could not delete table permission directory: %s", originalMetadataDirectory);
}
}
catch (IOException e) {
// If table renaming fails and rollback action has already been recorded, perform the rollback action to clean up junk files
rollbackAction.ifPresent(Runnable::run);
throw new PrestoException(HIVE_METASTORE_ERROR, e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.iceberg.hive;

import com.facebook.presto.hive.ForCachingHiveMetastore;
import com.facebook.presto.hive.metastore.ExtendedHiveMetastore;
import com.facebook.presto.hive.metastore.InMemoryCachingHiveMetastore;
import com.facebook.presto.hive.metastore.file.FileHiveMetastoreConfig;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Scopes;

import static com.facebook.airlift.configuration.ConfigBinder.configBinder;
import static java.util.Objects.requireNonNull;
import static org.weakref.jmx.ObjectNames.generatedNameOf;
import static org.weakref.jmx.guice.ExportBinder.newExporter;

public class IcebergHiveFileMetastoreModule
implements Module
{
private final String connectorId;

public IcebergHiveFileMetastoreModule(String connectorId)
{
this.connectorId = requireNonNull(connectorId, "connectorId is null");
}

@Override
public void configure(Binder binder)
{
configBinder(binder).bindConfig(FileHiveMetastoreConfig.class);
binder.bind(ExtendedHiveMetastore.class).annotatedWith(ForCachingHiveMetastore.class).to(IcebergFileHiveMetastore.class).in(Scopes.SINGLETON);
binder.bind(ExtendedHiveMetastore.class).to(InMemoryCachingHiveMetastore.class).in(Scopes.SINGLETON);
newExporter(binder).export(ExtendedHiveMetastore.class)
.as(generatedNameOf(InMemoryCachingHiveMetastore.class, connectorId));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.iceberg.hive;

import com.facebook.airlift.configuration.AbstractConfigurationAwareModule;
import com.facebook.presto.hive.metastore.ExtendedHiveMetastore;
import com.facebook.presto.hive.metastore.MetastoreConfig;
import com.facebook.presto.hive.metastore.glue.GlueMetastoreModule;
import com.facebook.presto.hive.metastore.thrift.ThriftMetastoreModule;
import com.google.inject.Binder;
import com.google.inject.Module;

import java.util.Optional;

import static com.facebook.airlift.configuration.ConditionalModule.installModuleIf;

public class IcebergHiveMetastoreModule
extends AbstractConfigurationAwareModule
{
private final String connectorId;
private final Optional<ExtendedHiveMetastore> metastore;

public IcebergHiveMetastoreModule(String connectorId, Optional<ExtendedHiveMetastore> metastore)
{
this.connectorId = connectorId;
this.metastore = metastore;
}

@Override
protected void setup(Binder binder)
{
if (metastore.isPresent()) {
binder.bind(ExtendedHiveMetastore.class).toInstance(metastore.get());
}
else {
bindMetastoreModule("thrift", new ThriftMetastoreModule(connectorId));
bindMetastoreModule("file", new IcebergHiveFileMetastoreModule(connectorId));
bindMetastoreModule("glue", new GlueMetastoreModule(connectorId));
}
}

private void bindMetastoreModule(String name, Module module)
{
install(installModuleIf(
MetastoreConfig.class,
metastore -> name.equalsIgnoreCase(metastore.getMetastoreType()),
module));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import com.facebook.presto.hive.authentication.NoHdfsAuthentication;
import com.facebook.presto.hive.metastore.ExtendedHiveMetastore;
import com.facebook.presto.hive.metastore.MetastoreContext;
import com.facebook.presto.hive.metastore.file.FileHiveMetastore;
import com.facebook.presto.iceberg.hive.IcebergFileHiveMetastore;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.WarningCollector;
import com.facebook.presto.tests.DistributedQueryRunner;
Expand Down Expand Up @@ -250,7 +250,7 @@ private static ExtendedHiveMetastore getFileHiveMetastore(Path dataDirectory)
MetastoreClientConfig metastoreClientConfig = new MetastoreClientConfig();
HdfsConfiguration hdfsConfiguration = new HiveHdfsConfiguration(new HdfsConfigurationInitializer(hiveClientConfig, metastoreClientConfig), ImmutableSet.of(), hiveClientConfig);
HdfsEnvironment hdfsEnvironment = new HdfsEnvironment(hdfsConfiguration, metastoreClientConfig, new NoHdfsAuthentication());
return new FileHiveMetastore(hdfsEnvironment, dataDirectory.toFile().toURI().toString(), "test");
return new IcebergFileHiveMetastore(hdfsEnvironment, dataDirectory.toFile().toURI().toString(), "test");
}

public static Path getIcebergDataDirectoryPath(Path dataDirectory, String catalogType, FileFormat format, boolean addStorageFormatToPath)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package com.facebook.presto.iceberg.hive;

import com.facebook.presto.hive.metastore.ExtendedHiveMetastore;
import com.facebook.presto.hive.metastore.file.FileHiveMetastore;
import com.facebook.presto.iceberg.IcebergDistributedTestBase;
import com.facebook.presto.iceberg.IcebergHiveTableOperationsConfig;
import com.facebook.presto.iceberg.IcebergUtil;
Expand Down Expand Up @@ -82,7 +81,7 @@ protected Table loadTable(String tableName)

protected ExtendedHiveMetastore getFileHiveMetastore()
{
FileHiveMetastore fileHiveMetastore = new FileHiveMetastore(getHdfsEnvironment(),
IcebergFileHiveMetastore fileHiveMetastore = new IcebergFileHiveMetastore(getHdfsEnvironment(),
getCatalogDirectory().getPath(),
"test");
return memoizeMetastore(fileHiveMetastore, false, 1000, 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import com.facebook.presto.hive.MetastoreClientConfig;
import com.facebook.presto.hive.authentication.NoHdfsAuthentication;
import com.facebook.presto.hive.metastore.ExtendedHiveMetastore;
import com.facebook.presto.hive.metastore.file.FileHiveMetastore;
import com.facebook.presto.iceberg.CatalogType;
import com.facebook.presto.iceberg.IcebergColumnHandle;
import com.facebook.presto.iceberg.IcebergHiveTableOperationsConfig;
Expand Down Expand Up @@ -579,7 +578,7 @@ private Table loadTable(String tableName)

protected ExtendedHiveMetastore getFileHiveMetastore()
{
FileHiveMetastore fileHiveMetastore = new FileHiveMetastore(getHdfsEnvironment(),
IcebergFileHiveMetastore fileHiveMetastore = new IcebergFileHiveMetastore(getHdfsEnvironment(),
Optional.of(getCatalogDirectory(HIVE))
.filter(File::exists)
.map(File::getPath)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import com.facebook.presto.hive.MetastoreClientConfig;
import com.facebook.presto.hive.authentication.NoHdfsAuthentication;
import com.facebook.presto.hive.metastore.ExtendedHiveMetastore;
import com.facebook.presto.hive.metastore.file.FileHiveMetastore;
import com.facebook.presto.iceberg.IcebergConfig;
import com.facebook.presto.iceberg.IcebergDistributedSmokeTestBase;
import com.facebook.presto.iceberg.IcebergHiveTableOperationsConfig;
Expand Down Expand Up @@ -68,7 +67,7 @@ protected static HdfsEnvironment getHdfsEnvironment()

protected ExtendedHiveMetastore getFileHiveMetastore()
{
FileHiveMetastore fileHiveMetastore = new FileHiveMetastore(getHdfsEnvironment(),
IcebergFileHiveMetastore fileHiveMetastore = new IcebergFileHiveMetastore(getHdfsEnvironment(),
getCatalogDirectory().toFile().getPath(),
"test");
return memoizeMetastore(fileHiveMetastore, false, 1000, 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import com.facebook.presto.hive.metastore.Column;
import com.facebook.presto.hive.metastore.ExtendedHiveMetastore;
import com.facebook.presto.hive.metastore.file.DatabaseMetadata;
import com.facebook.presto.hive.metastore.file.FileHiveMetastore;
import com.facebook.presto.hive.metastore.file.FileHiveMetastoreConfig;
import com.facebook.presto.hive.metastore.file.TableMetadata;
import com.facebook.presto.iceberg.CommitTaskData;
Expand Down Expand Up @@ -347,7 +346,7 @@ private void testRenameTableWithFailSignalAndValidation(FailSignal failSignal, R
{
FileHiveMetastoreConfig config = createFileHiveMetastoreConfig();
TestingHdfsEnvironment hdfsEnvironment = getTestingHdfsEnvironment();
FileHiveMetastore metastore = new FileHiveMetastore(hdfsEnvironment, config);
IcebergFileHiveMetastore metastore = new IcebergFileHiveMetastore(hdfsEnvironment, config);
IcebergHiveMetadata icebergHiveMetadata = (IcebergHiveMetadata) getIcebergHiveMetadata(metastore);
ExtendedFileSystem fileSystem = hdfsEnvironment.getFileSystem(connectorSession.getUser(), new Path(originSchemaMetadataPath), new Configuration());
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@
import com.facebook.presto.hive.authentication.NoHdfsAuthentication;
import com.facebook.presto.hive.metastore.ExtendedHiveMetastore;
import com.facebook.presto.hive.metastore.MetastoreContext;
import com.facebook.presto.hive.metastore.file.FileHiveMetastore;
import com.facebook.presto.iceberg.IcebergConfig;
import com.facebook.presto.iceberg.IcebergPlugin;
import com.facebook.presto.iceberg.hive.IcebergFileHiveMetastore;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.testing.QueryRunner;
Expand Down Expand Up @@ -540,7 +540,7 @@ protected static HdfsEnvironment getHdfsEnvironment()

protected ExtendedHiveMetastore getFileHiveMetastore()
{
FileHiveMetastore fileHiveMetastore = new FileHiveMetastore(getHdfsEnvironment(),
IcebergFileHiveMetastore fileHiveMetastore = new IcebergFileHiveMetastore(getHdfsEnvironment(),
getCatalogDirectory().toFile().getPath(),
"test");
return memoizeMetastore(fileHiveMetastore, false, 1000, 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@
import com.facebook.presto.hive.HiveColumnConverterProvider;
import com.facebook.presto.hive.metastore.ExtendedHiveMetastore;
import com.facebook.presto.hive.metastore.MetastoreContext;
import com.facebook.presto.hive.metastore.file.FileHiveMetastore;
import com.facebook.presto.iceberg.HiveTableOperations;
import com.facebook.presto.iceberg.IcebergHiveTableOperationsConfig;
import com.facebook.presto.iceberg.IcebergUtil;
import com.facebook.presto.iceberg.hive.IcebergFileHiveMetastore;
import com.facebook.presto.metadata.CatalogManager;
import com.facebook.presto.spi.ColumnMetadata;
import com.facebook.presto.spi.ConnectorId;
Expand Down Expand Up @@ -108,7 +108,7 @@ void dropTableFromCatalog(String tableName)

private ExtendedHiveMetastore getFileHiveMetastore()
{
FileHiveMetastore fileHiveMetastore = new FileHiveMetastore(getHdfsEnvironment(),
IcebergFileHiveMetastore fileHiveMetastore = new IcebergFileHiveMetastore(getHdfsEnvironment(),
getCatalogDirectory(HIVE).getPath(),
"test");
return memoizeMetastore(fileHiveMetastore, false, 1000, 0);
Expand Down

0 comments on commit 4a0bbc9

Please sign in to comment.