Skip to content

Conversation

mbutrovich
Copy link
Contributor

@mbutrovich mbutrovich commented Sep 23, 2025

Which issue does this PR close?

Closes #.

Rationale for this change

We want to add Parquet Module Encryption support for the native readers when using a Spark KMS. We use the encryption factory features added in DataFusion 50 to register an encryption factory that uses JNI to get decryption keys from Spark.

What changes are included in this PR?

How are these changes tested?

  • Existing PME tests with new readers added.
  • New tests that exercise PME options like plaintext footer, etc.

@mbutrovich mbutrovich changed the title feat: Parquet Modular Encryption support for native_datafusion and native_iceberg_compat readers feat: Parquet Modular Encryption with Spark KMS for native_datafusion and native_iceberg_compat readers Sep 23, 2025
@mbutrovich mbutrovich changed the title feat: Parquet Modular Encryption with Spark KMS for native_datafusion and native_iceberg_compat readers feat: Parquet Modular Encryption with Spark KMS for native readers Sep 23, 2025
@codecov-commenter
Copy link

codecov-commenter commented Sep 23, 2025

Codecov Report

❌ Patch coverage is 36.78161% with 55 lines in your changes missing coverage. Please review.
✅ Project coverage is 58.28%. Comparing base (f09f8af) to head (e9fcca7).
⚠️ Report is 562 commits behind head on main.

Files with missing lines Patch % Lines
...rg/apache/comet/parquet/CometFileKeyUnwrapper.java 0.00% 18 Missing ⚠️
...a/org/apache/comet/parquet/CometParquetUtils.scala 0.00% 15 Missing ⚠️
...ain/scala/org/apache/comet/CometExecIterator.scala 33.33% 7 Missing and 1 partial ⚠️
...va/org/apache/comet/parquet/NativeBatchReader.java 0.00% 5 Missing ⚠️
...n/scala/org/apache/spark/sql/comet/operators.scala 80.76% 3 Missing and 2 partials ⚠️
...n/scala/org/apache/comet/rules/CometScanRule.scala 42.85% 3 Missing and 1 partial ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##               main    #2447      +/-   ##
============================================
+ Coverage     56.12%   58.28%   +2.16%     
- Complexity      976     1436     +460     
============================================
  Files           119      147      +28     
  Lines         11743    13567    +1824     
  Branches       2251     2360     +109     
============================================
+ Hits           6591     7908    +1317     
- Misses         4012     4428     +416     
- Partials       1140     1231      +91     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@parthchandra
Copy link
Contributor

@mbutrovich mbutrovich marked this pull request as ready for review September 26, 2025 20:31
# Conflicts:
#	spark/src/main/scala/org/apache/comet/CometExecIterator.scala
// Each hadoopConf yields a unique DecryptionPropertiesFactory. While it's unlikely that
// this Comet plan contains more than one hadoopConf, we don't want to assume that. So we'll
// provide the ability to cache more than one Factory with a map.
private final ConcurrentHashMap<Configuration, DecryptionPropertiesFactory> factoryCache =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is only one hadoop conf in a spark session so this may be overkill.

Copy link
Contributor Author

@mbutrovich mbutrovich Sep 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Session hadoopConf is not what the scans use though. They add all the relation options (Parquet options like encryption keys) to the hadoopConf, so each scan can have a unique hadoopConf. Whether we could have a Comet plan with multiple Parquet scans is the real question.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whether we could have a Comet plan with multiple Parquet scans is the real question.

I don't know what you mean by this. What exactly are you calling a Parquet scan?

Copy link
Contributor Author

@mbutrovich mbutrovich Oct 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A scan node in a plan tree, specifically a stage that gets converted to a Comet native plan.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll simplify this with a couple of assertions that a Comet plan should only have one scan node in it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see what you mean now. What about a plan with a union ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mbutrovich I had this one open question about a plan with a union that may have more than one scan. Can you verify this will not be an issue.

@mbutrovich
Copy link
Contributor Author

Results attached from the benchmark I added to CometReadBenchmark, and a small chart with highlights to see what the overhead of encryption is for the various readers.

decryption

benchmark_decryption.txt

// Each hadoopConf yields a unique DecryptionPropertiesFactory. While it's unlikely that
// this Comet plan contains more than one hadoopConf, we don't want to assume that. So we'll
// provide the ability to cache more than one Factory with a map.
private final ConcurrentHashMap<Configuration, DecryptionPropertiesFactory> factoryCache =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whether we could have a Comet plan with multiple Parquet scans is the real question.

I don't know what you mean by this. What exactly are you calling a Parquet scan?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants