2 changes: 1 addition & 1 deletion build.gradle
@@ -60,7 +60,7 @@ buildscript {
ext.hazelcastVersion = '5.3.6'
ext.ebeanVersion = '15.5.2'
ext.googleJavaFormatVersion = '1.18.1'
ext.openLineageVersion = '1.33.0'
ext.openLineageVersion = '1.38.0'
ext.logbackClassicJava8 = '1.2.12'
ext.awsSdk2Version = '2.30.33'
ext.micrometerVersion = '1.15.1'
7 changes: 7 additions & 0 deletions metadata-integration/java/acryl-spark-lineage/.gitignore
@@ -0,0 +1,7 @@
# Temporary upstream files - downloaded only when patching
patches/upstream-*/
patches/backup-*/

# Build artifacts
build/
.gradle/
@@ -0,0 +1 @@
java liberica-17.0.9+11
206 changes: 206 additions & 0 deletions metadata-integration/java/acryl-spark-lineage/CLAUDE.md
@@ -0,0 +1,206 @@
# acryl-spark-lineage Module

This module integrates OpenLineage with DataHub's Spark lineage collection. It contains shadowed and modified OpenLineage classes to support custom functionality.

## Architecture

This module:
- Builds shadow JARs for multiple Scala versions (2.12 and 2.13)
- Contains custom OpenLineage class implementations
- Depends on `io.openlineage:openlineage-spark` as specified by `ext.openLineageVersion` in the root `build.gradle`

## OpenLineage Version Upgrade Process

### Current Version

The OpenLineage version is defined in the root `build.gradle`:
```gradle
ext.openLineageVersion = '1.38.0'
```

**Last upgraded:** October 2, 2025 (from 1.33.0 to 1.38.0)

**Key changes in 1.38.0:**
- ✅ AWS Glue ARN handling now in upstream (was DataHub customization)
- ✅ `getMetastoreUri()` and `getWarehouseLocation()` now public upstream
- ✅ SaveIntoDataSourceCommandVisitor Delta table handling adopted upstream
- ⚡ Enhanced schema handling with nested structs, maps, and arrays
- ⚡ New UnionRDD and NewHadoopRDD extractors for improved path detection

### Architecture: Patch-Based Customization System

DataHub maintains customizations to OpenLineage through a **version-organized patch-based system**:

```
patches/
└── datahub-customizations/     # Versioned patch files (committed to git)
    ├── v1.38.0/                # Patches for OpenLineage 1.38.0
    │   ├── Vendors.patch
    │   ├── PathUtils.patch
    │   └── ...
    └── v1.33.0/                # Patches for OpenLineage 1.33.0 (historical)
        └── ...

# Temporary files (NOT committed, downloaded when patching):
patches/upstream-<version>/     # Original OpenLineage files
patches/backup-<timestamp>/     # Automatic backups during upgrades
```

**Important**: Only `patches/datahub-customizations/` is version controlled. Each OpenLineage version has its own subdirectory (e.g., `v1.38.0/`) containing patches specific to that version. Upstream and backup files are temporary and excluded via `.gitignore`.

### Shadowed Classes Location

Custom OpenLineage implementations are in:
```
src/main/java/io/openlineage/
```

These files are OpenLineage source files with DataHub-specific modifications applied via patches.

### Known Customizations (v1.38.0)

Tracked via patch files in `patches/datahub-customizations/v1.38.0/`:

1. **`Vendors.patch`**: Adds `RedshiftVendor` to the vendors list
2. **`PathUtils.patch`**: "Table outside warehouse" symlink handling
3. **`PlanUtils.patch`**: Custom directory path handling with DataHub's HdfsPathDataset
4. **`RemovePathPatternUtils.patch`**: DataHub-specific PathSpec transformations
5. **`StreamingDataSourceV2RelationVisitor.patch`**: File-based streaming source support
6. **`WriteToDataSourceV2Visitor.patch`**: ForeachBatch streaming write support
7. **`MergeIntoCommandEdgeInputDatasetBuilder.patch`**: Delta Lake merge command complex subquery handling
8. **`MergeIntoCommandInputDatasetBuilder.patch`**: Enables recursive traversal for merge command subqueries
9. **`SparkOpenLineageExtensionVisitorWrapper.patch`**: Extension visitor customizations
10. **`RddPathUtils.patch`**: Debug log level for noise reduction (3 log statements changed from warn to debug)
11. **Redshift vendor** (`spark/agent/vendor/redshift/*`): Complete custom implementation (no upstream equivalent)

### Automated Upgrade Process

Use the automated upgrade script:

```bash
# Quick upgrade (recommended)
./scripts/upgrade-openlineage.sh 1.33.0 1.38.0

# This will:
# 1. Fetch new upstream files from GitHub
# 2. Compare old vs new upstream versions
# 3. Update shadowed files with new upstream code
# 4. Apply DataHub customizations via patches
# 5. Update build.gradle version
# 6. Report any conflicts requiring manual merge
```

### Manual Upgrade Steps

If you prefer manual control or need to resolve conflicts:

1. **Fetch upstream files**:
```bash
./scripts/fetch-upstream.sh 1.38.0
```

2. **Compare upstream changes** (optional):
```bash
# See what changed between versions
diff -r patches/upstream-1.33.0 patches/upstream-1.38.0
```

3. **Update source files**:
```bash
# Copy new upstream files
cp -r patches/upstream-1.38.0/* src/main/java/io/openlineage/
```

4. **Apply DataHub customizations**:
```bash
# Apply all patches for the target version
for patch in patches/datahub-customizations/v1.38.0/*.patch; do
  echo "Applying $(basename "$patch")..."
  patch -p0 < "$patch" || echo "Conflict in $patch - manual merge required"
done
```

5. **Handle conflicts**:
   - If patches fail, manually merge changes from:
     - Backup: `patches/backup-<timestamp>/`
     - New upstream: `patches/upstream-<new-version>/`
     - Patches (show what to customize): `patches/datahub-customizations/v<version>/`
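When a patch fails, `patch` saves the unapplied hunks in a `.rej` file next to the target file; that reject is the starting point for the manual merge. The following self-contained sketch reproduces the situation using scratch files in a temp directory (none of these filenames are real module paths):

```shell
set -e
cd "$(mktemp -d)"
# An "old upstream" file and our customized copy of it:
printf 'alpha\nbeta\n' > upstream-old.txt
printf 'alpha\nbeta customized\n' > customized.txt
# Record the customization as a patch (diff -u exits 1 when files differ):
diff -u upstream-old.txt customized.txt > customization.patch || true
# Simulate a new upstream release that rewrote the customized region:
printf 'alpha\nbeta rewritten upstream\n' > upstream-new.txt
# Applying the stale patch fails and saves the hunk to upstream-new.txt.rej:
patch upstream-new.txt < customization.patch || true
ls upstream-new.txt.rej
```

The `.rej` file contains the exact hunk that could not be placed, so comparing it against the new upstream file shows precisely which customization needs to be re-merged by hand.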

6. **Regenerate patches** (if you manually merged):
```bash
./scripts/generate-patches.sh 1.38.0
```

7. **Update build.gradle**:
```gradle
ext.openLineageVersion = '1.38.0'
```

8. **Test thoroughly**:
```bash
./gradlew :metadata-integration:java:acryl-spark-lineage:build
./gradlew :metadata-integration:java:acryl-spark-lineage:test
```

### Understanding Patch Files

Patch files show exactly what DataHub customized:

```bash
# View a specific customization for v1.38.0
cat patches/datahub-customizations/v1.38.0/Vendors.patch

# Example output shows:
# - Lines removed from upstream (-)
# - Lines added by DataHub (+)
# - Context around changes
```

### Adding New Customizations

If you need to customize additional files:

1. Make your changes to files in `src/main/java/io/openlineage/`
2. Regenerate patches:
```bash
./scripts/generate-patches.sh <current-version>
```
3. The script will update `patches/datahub-customizations/` with your new changes
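Conceptually, patch generation is just a unified diff of each customized file against its upstream counterpart, and the apply step replays that diff. A minimal round-trip on scratch files (not real module paths, and not the actual script internals) looks like:

```shell
set -e
cd "$(mktemp -d)"
printf 'public class Foo {}\n' > upstream.java
printf 'public class Foo { /* DataHub customization */ }\n' > customized.java
# "Generate": capture the customization as a unified diff
# (diff -u exits 1 when files differ, hence the || true):
diff -u upstream.java customized.java > Foo.patch || true
# "Apply": replay the customization onto a fresh upstream copy:
cp upstream.java fresh.java
patch fresh.java < Foo.patch
cmp fresh.java customized.java && echo "round-trip OK"
```

Because the patch encodes only the delta, it keeps applying cleanly across upstream releases until upstream touches the same region, at which point the conflict workflow above takes over.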

### Troubleshooting

**Patch conflicts during upgrade:**
- The upgrade script preserves backups in `patches/backup-<timestamp>/`
- Manually merge by comparing:
  - Your backup (shows DataHub customizations)
  - New upstream (shows OpenLineage changes)
  - Existing patch (shows what customizations to preserve)

**Files that are entirely DataHub-specific:**
- `FileStreamMicroBatchStreamStrategy.java` - Custom file-based streaming
- Redshift vendor files - Complete custom Redshift support
- These have `.note` files instead of `.patch` files

**Debugging patches:**
```bash
# Dry-run to see if patch will apply cleanly
patch -p0 --dry-run < patches/datahub-customizations/v1.38.0/Vendors.patch

# See what a patch would change
patch -p0 --dry-run < patches/datahub-customizations/v1.38.0/Vendors.patch | less
```

### Debugging

To see resolved dependencies for each Scala version:
```bash
./gradlew :metadata-integration:java:acryl-spark-lineage:debugDependencies
```

## Build Tasks

- `./gradlew :metadata-integration:java:acryl-spark-lineage:build` - Build all shadow JARs
- `./gradlew :metadata-integration:java:acryl-spark-lineage:shadowJar_2_12` - Build Scala 2.12 JAR only
- `./gradlew :metadata-integration:java:acryl-spark-lineage:shadowJar_2_13` - Build Scala 2.13 JAR only
- `./gradlew :metadata-integration:java:acryl-spark-lineage:test` - Run unit tests
- `./gradlew :metadata-integration:java:acryl-spark-lineage:integrationTest` - Run integration tests
@@ -0,0 +1,59 @@
# Patch for DataHub customizations in MergeIntoCommandEdgeInputDatasetBuilder.java
# Upstream version: OpenLineage 1.38.0
# Generated: 2025-10-02 12:57:22 UTC
#
# To apply this patch to a new upstream version:
# patch -p0 < datahub-customizations/MergeIntoCommandEdgeInputDatasetBuilder.patch
#
--- /Users/treff7es/shadow/datahub/metadata-integration/java/acryl-spark-lineage/patches/upstream-1.38.0/spark3/agent/lifecycle/plan/MergeIntoCommandEdgeInputDatasetBuilder.java 2025-10-02 14:47:52.097747891 +0200
+++ /Users/treff7es/shadow/datahub/metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark3/agent/lifecycle/plan/MergeIntoCommandEdgeInputDatasetBuilder.java 2025-09-12 19:50:04.457785281 +0200
@@ -48,9 +48,48 @@
inputs.addAll(delegate((LogicalPlan) o1, event));
}
if (o2 != null && o2 instanceof LogicalPlan) {
- inputs.addAll(delegate((LogicalPlan) o2, event));
+ List<InputDataset> sourceDatasets = delegate((LogicalPlan) o2, event);
+ inputs.addAll(sourceDatasets);
+
+ // Handle complex subqueries that aren't captured by standard delegation
+ if (sourceDatasets.isEmpty()) {
+ inputs.addAll(extractInputDatasetsFromComplexSource((LogicalPlan) o2, event));
+ }
}

return inputs;
}
+
+ /**
+ * Extracts input datasets from complex source plans like subqueries with DISTINCT, PROJECT, etc.
+ * This handles cases where the standard delegation doesn't work due to missing builders for
+ * intermediate logical plan nodes.
+ */
+ private List<InputDataset> extractInputDatasetsFromComplexSource(
+ LogicalPlan source, SparkListenerEvent event) {
+ List<InputDataset> datasets = new ArrayList<>();
+
+ // Use a queue to traverse the logical plan tree breadth-first
+ java.util.Queue<LogicalPlan> queue = new java.util.LinkedList<>();
+ queue.offer(source);
+
+ while (!queue.isEmpty()) {
+ LogicalPlan current = queue.poll();
+
+ // Try to delegate this node directly
+ List<InputDataset> currentDatasets = delegate(current, event);
+ datasets.addAll(currentDatasets);
+
+ // If this node didn't produce any datasets, traverse its children
+ if (currentDatasets.isEmpty()) {
+ // Add all children to the queue for traversal
+ scala.collection.Iterator<LogicalPlan> children = current.children().iterator();
+ while (children.hasNext()) {
+ queue.offer(children.next());
+ }
+ }
+ }
+
+ return datasets;
+ }
}
@@ -0,0 +1,75 @@
# Patch for DataHub customizations in MergeIntoCommandInputDatasetBuilder.java
# Upstream version: OpenLineage 1.38.0
# Generated: 2025-10-02 12:57:22 UTC
#
# To apply this patch to a new upstream version:
# patch -p0 < datahub-customizations/MergeIntoCommandInputDatasetBuilder.patch
#
--- /Users/treff7es/shadow/datahub/metadata-integration/java/acryl-spark-lineage/patches/upstream-1.38.0/spark3/agent/lifecycle/plan/MergeIntoCommandInputDatasetBuilder.java 2025-10-02 14:47:52.336608842 +0200
+++ /Users/treff7es/shadow/datahub/metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark3/agent/lifecycle/plan/MergeIntoCommandInputDatasetBuilder.java 2025-09-12 19:50:04.457859239 +0200
@@ -20,7 +20,7 @@
extends AbstractQueryPlanInputDatasetBuilder<MergeIntoCommand> {

public MergeIntoCommandInputDatasetBuilder(OpenLineageContext context) {
- super(context, false);
+ super(context, true); // FIXED: This enables recursive traversal of subqueries
}

@Override
@@ -31,8 +31,54 @@
@Override
protected List<OpenLineage.InputDataset> apply(SparkListenerEvent event, MergeIntoCommand x) {
List<OpenLineage.InputDataset> datasets = new ArrayList<>();
- datasets.addAll(delegate(x.target(), event));
- datasets.addAll(delegate(x.source(), event));
+
+ // Process target table
+ List<OpenLineage.InputDataset> targetDatasets = delegate(x.target(), event);
+ datasets.addAll(targetDatasets);
+
+ // Process source - this will recursively process all datasets in the source plan,
+ // including those in subqueries
+ List<OpenLineage.InputDataset> sourceDatasets = delegate(x.source(), event);
+ datasets.addAll(sourceDatasets);
+
+ // Handle complex subqueries that aren't captured by standard delegation
+ if (sourceDatasets.isEmpty()) {
+ sourceDatasets.addAll(extractInputDatasetsFromComplexSource(x.source(), event));
+ datasets.addAll(sourceDatasets);
+ }
+
+ return datasets;
+ }
+
+ /**
+ * Extracts input datasets from complex source plans like subqueries with DISTINCT, PROJECT, etc.
+ * This handles cases where the standard delegation doesn't work due to missing builders for
+ * intermediate logical plan nodes.
+ */
+ private List<OpenLineage.InputDataset> extractInputDatasetsFromComplexSource(
+ LogicalPlan source, SparkListenerEvent event) {
+ List<OpenLineage.InputDataset> datasets = new ArrayList<>();
+
+ // Use a queue to traverse the logical plan tree breadth-first
+ java.util.Queue<LogicalPlan> queue = new java.util.LinkedList<>();
+ queue.offer(source);
+
+ while (!queue.isEmpty()) {
+ LogicalPlan current = queue.poll();
+
+ // Try to delegate this node directly
+ List<OpenLineage.InputDataset> currentDatasets = delegate(current, event);
+ datasets.addAll(currentDatasets);
+
+ // If this node didn't produce any datasets, traverse its children
+ if (currentDatasets.isEmpty()) {
+ // Add all children to the queue for traversal
+ scala.collection.Iterator<LogicalPlan> children = current.children().iterator();
+ while (children.hasNext()) {
+ queue.offer(children.next());
+ }
+ }
+ }

return datasets;
}
@@ -0,0 +1,21 @@
# Patch for DataHub customizations in PathUtils.java
# Upstream version: OpenLineage 1.38.0
# Generated: 2025-10-02 12:57:22 UTC
#
# To apply this patch to a new upstream version:
# patch -p0 < datahub-customizations/PathUtils.patch
#
--- /Users/treff7es/shadow/datahub/metadata-integration/java/acryl-spark-lineage/patches/upstream-1.38.0/spark/agent/util/PathUtils.java 2025-10-02 14:47:50.071931557 +0200
+++ /Users/treff7es/shadow/datahub/metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/util/PathUtils.java 2025-10-02 14:53:54.367355461 +0200
@@ -96,6 +96,11 @@
symlinkDataset =
Optional.of(
FilesystemDatasetUtils.fromLocationAndName(warehouseLocation.get(), tableName));
+ } else {
+ // Table is outside warehouse, but we create symlink to actual location + tableName
+ String tableName = nameFromTableIdentifier(catalogTable.identifier());
+ symlinkDataset =
+ Optional.of(FilesystemDatasetUtils.fromLocationAndName(locationUri, tableName));
}
}
}