Initial integration for Hudi tables within Polaris #1862


Open · wants to merge 3 commits into main
6 changes: 6 additions & 0 deletions plugins/spark/README.md
@@ -124,3 +124,9 @@ Following describes the current functionality limitations of the Polaris Spark client:
3) Rename a Delta table is not supported.
4) ALTER TABLE ... SET LOCATION is not supported for DELTA table.
5) For other non-Iceberg tables like csv, it is not supported today.

### Hudi Support
Support for Hudi tables within the Polaris catalog is currently still under development.
Contributor:
I'm confused as to why we would need this. If the integration needs changes on the Hudi side to work, why would we merge anything into Polaris now?

rahil-c (Author), Jul 29, 2025:

First off, we have already landed the initial changes on the Hudi side (apache/hudi#13558) to integrate Hudi with Polaris, based on discussion between members of the Polaris and Hudi communities.

Both communities have discussed that Polaris will need the latest Hudi release artifact, and for that we will need to do a Hudi point release, for which I have already started a thread here: https://lists.apache.org/thread/4ztwgclljojg7r08mzm2dkynwfrvjlqb.

As doing a release for any open-source project can be time-consuming (with a lot of back and forth), we aligned that before even starting the Hudi point release we should first ensure this initial Polaris-side change lands, as this change does not depend on any Hudi release artifact, and landing it would give us the confidence to start the Hudi point release.

This was already aligned between both communities, which is why this discussion thread was started yesterday for our Hudi code-freeze process: https://lists.apache.org/thread/k524b5xq7l75tzz6sdzth15wjxdgp3gf. We had obtained approval from a PMC member and a committer for this PR.

cc @flyrain @gh-yzou @singhpk234

Contributor:

I'm fine with working in parallel with the Hudi community. This PR serves as the first step of the integration, and we can file follow-up PRs once Hudi 1.0.3 is out. With that, I guess this doc section itself is not quite necessary; it's normal for a feature to be split into multiple PRs.

Contributor:

@eric-maynard, @rahil-c is trying to parallelize the development work based on his POC. The Hudi development is already in and under release; meanwhile, @rahil-c is trying to start the necessary development work in Polaris. Polaris's Hudi support is still under development (which is what the README change intended to say), and once the Hudi plugin is released, @rahil-c will follow up with integration and regression tests, and claim Hudi is supported with clear documentation.
If the README description is confusing, we can remove it or make it clearer if that works?

vinothchandar (Member), Jul 30, 2025:

Hi all, FWIW we have active work around this in the next Hudi patch release, based on the feedback from the Polaris community. So we are aligned on the goal of making it work.

Contributor:

> based on discussion between members of the Polaris and Hudi communities.

> we aligned that before even starting the Hudi point release we should first ensure this initial Polaris-side change lands

> This was already aligned between both communities

Did this happen on a mailing list? Can you point me to that?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's less that the README is confusing (as Yufei said, it's normal to merge things that are in development within a project) and more that it's confusing why we would merge something into Polaris because we expect an upstream dependency to eventually change. Couldn't we just forgo the README note and hold the Polaris change until the dependency is ready?

The Hudi community has made a change to integrate with Polaris and is planning a Hudi 1.0.3 minor release.
hudi-spark3.5-bundle_2.12:1.0.3 is required for Hudi table support to work end to end; it has not yet been released.
Once the Hudi 1.0.3 release is out, we will update the documentation on how users can use the Polaris Spark client with Hudi tables.
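As a preview, the sketch below shows how a Spark session might be configured once the 1.0.3 bundle is published. This is illustrative only: the catalog name, URI, and warehouse values are placeholders, the option-key pattern follows the existing catalog setup in this README, and the Hudi session-extension class name is taken from Hudi's documentation.

import org.apache.spark.sql.SparkSession;

// Hypothetical configuration, pending the Hudi 1.0.3 release.
SparkSession spark =
    SparkSession.builder()
        .appName("polaris-hudi-example")
        // Hudi's SQL extension (class name per Hudi docs).
        .config("spark.sql.extensions",
            "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
        // Register the Polaris Spark client as a catalog named "polaris".
        .config("spark.sql.catalog.polaris", "org.apache.polaris.spark.SparkCatalog")
        .config("spark.sql.catalog.polaris.uri", "http://localhost:8181/api/catalog") // placeholder
        .config("spark.sql.catalog.polaris.warehouse", "my_catalog") // placeholder
        .getOrCreate();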
@@ -71,7 +71,11 @@ public Table loadTable(Identifier identifier) throws NoSuchTableException {
try {
GenericTable genericTable =
this.polarisCatalog.loadGenericTable(Spark3Util.identifierToTableIdentifier(identifier));
-      return PolarisCatalogUtils.loadSparkTable(genericTable);
+      if (PolarisCatalogUtils.useHudi(genericTable.getFormat())) {
+        return PolarisCatalogUtils.loadV1SparkTable(genericTable, identifier, name());
+      } else {
+        return PolarisCatalogUtils.loadV2SparkTable(genericTable);
+      }
} catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
throw new NoSuchTableException(identifier);
}
@@ -110,7 +114,11 @@ public Table createTable(
baseLocation,
null,
properties);
-      return PolarisCatalogUtils.loadSparkTable(genericTable);
+      if (PolarisCatalogUtils.useHudi(format)) {
+        return PolarisCatalogUtils.loadV1SparkTable(genericTable, identifier, name());
+      } else {
+        return PolarisCatalogUtils.loadV2SparkTable(genericTable);
+      }
} catch (AlreadyExistsException e) {
throw new TableAlreadyExistsException(identifier);
}
@@ -30,6 +30,7 @@
import org.apache.iceberg.spark.SupportsReplaceView;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.polaris.spark.utils.DeltaHelper;
import org.apache.polaris.spark.utils.HudiHelper;
import org.apache.polaris.spark.utils.PolarisCatalogUtils;
import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
@@ -72,6 +73,7 @@ public class SparkCatalog
@VisibleForTesting protected org.apache.iceberg.spark.SparkCatalog icebergsSparkCatalog = null;
@VisibleForTesting protected PolarisSparkCatalog polarisSparkCatalog = null;
@VisibleForTesting protected DeltaHelper deltaHelper = null;
@VisibleForTesting protected HudiHelper hudiHelper = null;

@Override
public String name() {
@@ -133,6 +135,7 @@ public void initialize(String name, CaseInsensitiveStringMap options) {
this.catalogName = name;
initRESTCatalog(name, options);
this.deltaHelper = new DeltaHelper(options);
this.hudiHelper = new HudiHelper(options);
}

@Override
@@ -156,12 +159,16 @@ public Table createTable(
throw new UnsupportedOperationException(
"Create table without location key is not supported by Polaris. Please provide location or path on table creation.");
}

if (PolarisCatalogUtils.useDelta(provider)) {
// For delta table, we load the delta catalog to help dealing with the
// delta log creation.
TableCatalog deltaCatalog = deltaHelper.loadDeltaCatalog(this.polarisSparkCatalog);
return deltaCatalog.createTable(ident, schema, transforms, properties);
} else if (PolarisCatalogUtils.useHudi(provider)) {
// For creating a Hudi table, we load HoodieCatalog
// to create the .hoodie folder in cloud storage
TableCatalog hudiCatalog = hudiHelper.loadHudiCatalog(this.polarisSparkCatalog);
return hudiCatalog.createTable(ident, schema, transforms, properties);
} else {
return this.polarisSparkCatalog.createTable(ident, schema, transforms, properties);
}
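To see the branch above end to end, here is a hypothetical usage sketch (catalog name, namespace, table, and location are made up); the USING hudi clause is what routes provider to the HoodieCatalog path, and the LOCATION clause satisfies the location requirement enforced earlier in this method:

import org.apache.spark.sql.SparkSession;

// Sketch only: assumes a session configured with the Polaris Spark client
// and the Hudi bundle on the classpath.
SparkSession spark = SparkSession.active();
spark.sql(
    "CREATE TABLE polaris.ns.hudi_tbl (id BIGINT, name STRING) "
        + "USING hudi " // provider = "hudi"
        + "LOCATION 's3://bucket/ns/hudi_tbl'"); // location is required by Polaris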
@@ -182,8 +189,12 @@ public Table alterTable(Identifier ident, TableChange... changes) throws NoSuchTableException {
// using ALTER TABLE ...SET LOCATION, and ALTER TABLE ... SET FILEFORMAT.
TableCatalog deltaCatalog = deltaHelper.loadDeltaCatalog(this.polarisSparkCatalog);
return deltaCatalog.alterTable(ident, changes);
+    } else if (PolarisCatalogUtils.useHudi(provider)) {
+      TableCatalog hudiCatalog = hudiHelper.loadHudiCatalog(this.polarisSparkCatalog);
+      return hudiCatalog.alterTable(ident, changes);
+    } else {
+      return this.polarisSparkCatalog.alterTable(ident);
+    }
-    return this.polarisSparkCatalog.alterTable(ident);
}
}

@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.polaris.spark.utils;

import org.apache.iceberg.common.DynConstructors;
import org.apache.polaris.spark.PolarisSparkCatalog;
import org.apache.spark.sql.connector.catalog.DelegatingCatalogExtension;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

public class HudiHelper {
public static final String HUDI_CATALOG_IMPL_KEY = "hudi-catalog-impl";
private static final String DEFAULT_HUDI_CATALOG_CLASS =
"org.apache.spark.sql.hudi.catalog.HoodieCatalog";

private TableCatalog hudiCatalog = null;
private String hudiCatalogImpl = DEFAULT_HUDI_CATALOG_CLASS;

public HudiHelper(CaseInsensitiveStringMap options) {
if (options.get(HUDI_CATALOG_IMPL_KEY) != null) {
this.hudiCatalogImpl = options.get(HUDI_CATALOG_IMPL_KEY);
}
}

public TableCatalog loadHudiCatalog(PolarisSparkCatalog polarisSparkCatalog) {
if (this.hudiCatalog != null) {
return this.hudiCatalog;
}

DynConstructors.Ctor<TableCatalog> ctor;
try {
ctor = DynConstructors.builder(TableCatalog.class).impl(hudiCatalogImpl).buildChecked();
} catch (NoSuchMethodException e) {
throw new IllegalArgumentException(
String.format("Cannot initialize Hudi Catalog %s: %s", hudiCatalogImpl, e.getMessage()),
e);
}

try {
this.hudiCatalog = ctor.newInstance();
} catch (ClassCastException e) {
throw new IllegalArgumentException(
String.format(
"Cannot initialize Hudi Catalog, %s does not implement Table Catalog.",
Contributor:

Is this supposed to say TableCatalog?

rahil-c (Author):

Let me fix this to say TableCatalog; I believe I had copied it from the setup of DeltaHelper: https://github.com/apache/polaris/blob/main/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/utils/DeltaHelper.java#L66

Contributor:

@rahil-c maybe fix both

rahil-c (Author):

Ack, will do so.

hudiCatalogImpl),
e);
}

// Set the Polaris Spark catalog as the delegate catalog of the Hudi catalog;
// it will be used in HoodieCatalog's loadTable
((DelegatingCatalogExtension) this.hudiCatalog).setDelegateCatalog(polarisSparkCatalog);
return this.hudiCatalog;
}
}
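For reference, a hypothetical caller's view of this helper, showing how the hudi-catalog-impl option swaps in a custom implementation; the option contents, com.example.MyHudiCatalog, and the polarisSparkCatalog instance are all assumptions for illustration:

import java.util.HashMap;
import java.util.Map;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

Map<String, String> options = new HashMap<>();
// Omit this key to get the default org.apache.spark.sql.hudi.catalog.HoodieCatalog;
// a replacement must implement TableCatalog and extend DelegatingCatalogExtension.
options.put(HudiHelper.HUDI_CATALOG_IMPL_KEY, "com.example.MyHudiCatalog"); // hypothetical class

HudiHelper hudiHelper = new HudiHelper(new CaseInsensitiveStringMap(options));
TableCatalog hudiCatalog = hudiHelper.loadHudiCatalog(polarisSparkCatalog); // cached after the first call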
@@ -29,14 +29,23 @@
import org.apache.iceberg.spark.SparkCatalog;
import org.apache.polaris.spark.rest.GenericTable;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.TableIdentifier;
import org.apache.spark.sql.catalyst.catalog.CatalogTable;
import org.apache.spark.sql.catalyst.catalog.CatalogTableType;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.connector.catalog.TableProvider;
import org.apache.spark.sql.execution.datasources.DataSource;
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;

public class PolarisCatalogUtils {
private static final Logger LOG = LoggerFactory.getLogger(PolarisCatalogUtils.class);
Contributor:

We use LOGGER elsewhere

Contributor:

Oh, it looks like 2 classes (both in the client) use LOG, hm. I wouldn't fix that here, but maybe just stick with LOGGER

rahil-c (Author):

Yes, we have usages of LOG in the following code paths:

https://github.com/apache/polaris/blob/main/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/SparkCatalog.java#L69

https://github.com/apache/polaris/blob/main/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/utils/DeltaHelper.java#L32

Personally, I'm not sure the naming difference between LOG and LOGGER matters from either a functional or an aesthetic perspective, but I can make this change so we can move forward.

Contributor:

@rahil-c will you be able to update the other two to LOGGER if we are changing this one?

rahil-c (Author):

Sure


public static final String TABLE_PROVIDER_KEY = "provider";
public static final String TABLE_PATH_KEY = "path";

@@ -50,6 +59,10 @@ public static boolean useDelta(String provider) {
return "delta".equalsIgnoreCase(provider);
}

public static boolean useHudi(String provider) {
return "hudi".equalsIgnoreCase(provider);
}

/**
* For tables whose location is managed by Spark Session Catalog, there will be no location or path
* in the properties.
@@ -61,16 +74,11 @@ public static boolean isTableWithSparkManagedLocation(Map<String, String> properties) {
}

/**
- * Load spark table using DataSourceV2.
- *
- * @return V2Table if DataSourceV2 is available for the table format. For delta table, it returns
- *     DeltaTableV2.
+ * Normalize table properties for loading Spark tables by ensuring the TABLE_PATH_KEY is properly
+ * set. DataSourceV2 requires the path property on table loading.
 */
-  public static Table loadSparkTable(GenericTable genericTable) {
-    SparkSession sparkSession = SparkSession.active();
-    TableProvider provider =
-        DataSource.lookupDataSourceV2(genericTable.getFormat(), sparkSession.sessionState().conf())
-            .get();
+  private static Map<String, String> normalizeTablePropertiesForLoadSparkTable(
+      GenericTable genericTable) {
Map<String, String> properties = genericTable.getProperties();
boolean hasLocationClause = properties.get(TableCatalog.PROP_LOCATION) != null;
boolean hasPathClause = properties.get(TABLE_PATH_KEY) != null;
Expand All @@ -87,10 +95,80 @@ public static Table loadSparkTable(GenericTable genericTable) {
tableProperties.put(TABLE_PATH_KEY, properties.get(TableCatalog.PROP_LOCATION));
}
}
return tableProperties;
}
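To make the path fallback concrete, a small self-contained sketch of the rule implemented above, with the string keys inlined (in the real code they are TableCatalog.PROP_LOCATION and TABLE_PATH_KEY):

import java.util.HashMap;
import java.util.Map;

// A generic table whose properties carry only a location...
Map<String, String> properties = new HashMap<>();
properties.put("provider", "hudi");
properties.put("location", "s3://bucket/ns/tbl");

// ...gets a "path" entry copied from "location", because DataSourceV2
// providers resolve the table from the "path" option on loading.
Map<String, String> tableProperties = new HashMap<>(properties);
if (tableProperties.get("path") == null && tableProperties.get("location") != null) {
  tableProperties.put("path", tableProperties.get("location"));
}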

/**
* Load spark table using DataSourceV2.
*
* @return V2Table if DataSourceV2 is available for the table format. For delta table, it returns
* DeltaTableV2.
*/
public static Table loadV2SparkTable(GenericTable genericTable) {
SparkSession sparkSession = SparkSession.active();
TableProvider provider =
DataSource.lookupDataSourceV2(genericTable.getFormat(), sparkSession.sessionState().conf())
.get();
Map<String, String> tableProperties = normalizeTablePropertiesForLoadSparkTable(genericTable);
return DataSourceV2Utils.getTableFromProvider(
provider, new CaseInsensitiveStringMap(tableProperties), scala.Option.empty());
}

/**
* Return a Spark V1Table for formats that do not use DataSourceV2. Currently, this is being used
* for Hudi tables.
*/
public static Table loadV1SparkTable(
GenericTable genericTable, Identifier identifier, String catalogName) {
Map<String, String> tableProperties = normalizeTablePropertiesForLoadSparkTable(genericTable);

// Need full identifier in order to construct CatalogTable
String namespacePath = String.join(".", identifier.namespace());
TableIdentifier tableIdentifier =
new TableIdentifier(
identifier.name(), Option.apply(namespacePath), Option.apply(catalogName));

scala.collection.immutable.Map<String, String> scalaOptions =
(scala.collection.immutable.Map<String, String>)
scala.collection.immutable.Map$.MODULE$.apply(
scala.collection.JavaConverters.mapAsScalaMap(tableProperties).toSeq());

org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat storage =
DataSource.buildStorageFormatFromOptions(scalaOptions);

// Currently Polaris generic table does not contain any schema information, partition columns,
// stats, etc
// for now we will just use fill the parameters we have from catalog, and let underlying client
Contributor:

use fill?

rahil-c (Author):

Will fix

// resolve the rest within its catalog implementation
org.apache.spark.sql.types.StructType emptySchema = new org.apache.spark.sql.types.StructType();
scala.collection.immutable.Seq<String> emptyStringSeq =
scala.collection.JavaConverters.asScalaBuffer(new java.util.ArrayList<String>()).toList();
CatalogTable catalogTable =
new CatalogTable(
tableIdentifier,
CatalogTableType.EXTERNAL(),
storage,
emptySchema,
Option.apply(genericTable.getFormat()),
emptyStringSeq,
scala.Option.empty(),
genericTable.getProperties().get("owner"),
Contributor:

This property currently isn't defined anywhere. Are you somehow setting it on writes?

If so, this should be a constant somewhere. If not, you should remove this.

rahil-c (Author):

On my side, I am not explicitly setting this property in the Hudi-side changes or in the Polaris changes. It seems to be the Spark engine itself setting this value in the properties map.

For example, this property gets propagated during Polaris SparkCatalog#createTable, which overrides Spark's TableCatalog interface, as you can see if you test create table with Delta and examine the properties map.

[Screenshot 2025-07-29, 12:13 PM]

You can see the owner for the table is already set before we even make a createGenericTable request.

[Screenshot 2025-07-29, 12:14 PM]

The createGenericTableRequest will then take those properties and ensure they get persisted in the GenericTable object on the catalog side.

If the ask is to just have this "owner" be a constant variable called public static final String OWNER = "owner"; I can do that.

Contributor:

The ask is that, ideally, we can reuse the existing constant (which, based on your comment, looks to be coming from here). Barring that, yes, please make a new constant.

System.currentTimeMillis(),
-1L,
"",
scalaOptions,
scala.Option.empty(),
scala.Option.empty(),
scala.Option.empty(),
emptyStringSeq,
false,
true,
scala.collection.immutable.Map$.MODULE$.empty(),
scala.Option.empty());

return new org.apache.spark.sql.connector.catalog.V1Table(catalogTable);
}
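On the "owner" discussion above: Spark's TableCatalog interface already defines the constant PROP_OWNER with the value "owner", so one way to resolve the review comment would be to reuse it at the call site. A minimal sketch (not what the PR currently does):

import org.apache.spark.sql.connector.catalog.TableCatalog;

// Reuse Spark's existing constant instead of the "owner" string literal,
// keeping the lookup aligned with the property Spark itself sets:
String owner = genericTable.getProperties().get(TableCatalog.PROP_OWNER);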

/**
* Get the catalogAuth field inside the RESTSessionCatalog used by the Iceberg Spark Catalog using
* reflection. TODO: Deprecate this function once the iceberg client is updated to 1.9.0 to use
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.polaris.spark;

import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.connector.catalog.DelegatingCatalogExtension;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.catalog.TableChange;

/**
* This is a fake hudi catalog class that is used for testing. This class is a noop class that
* directly passes all calls to the delegate CatalogPlugin configured as part of
* DelegatingCatalogExtension.
*/
public class NoopHudiCatalog extends DelegatingCatalogExtension {

@Override
public Table alterTable(Identifier ident, TableChange... changes) throws NoSuchTableException {
return super.loadTable(ident);
}
}
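A minimal sketch of how a test might exercise this fake (the polarisSparkCatalog delegate is assumed to be in scope; in production the setDelegateCatalog call is made by HudiHelper):

import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.Table;

NoopHudiCatalog noop = new NoopHudiCatalog();
noop.setDelegateCatalog(polarisSparkCatalog); // done by HudiHelper in production
// Every call, including alterTable, forwards to the delegate
// (loadTable declares NoSuchTableException if the table is absent):
Table table = noop.loadTable(Identifier.of(new String[] {"ns"}, "tbl"));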