[Managed Iceberg] unbounded source #33504
Merged: ahmedabu98 merged 49 commits into apache:master from ahmedabu98:iceberg_streaming_source on Mar 20, 2025.
Changes shown below are from 34 of the 49 commits.

Commits:
- bb87511 initial
- 853de4d let CombinedScanTask do splitting (based on Parquet row groups)
- 69fd988 perf improv
- da2f33f create one read task descriptor per snapshot range
- 73c8992 Merge branch 'master' of https://github.com/ahmedabu98/beam into iceb…
- 81ca709 some improvements
- e319d76 use GiB for streaming, Redistribute for batch; update docs
- c25cd75 Merge branch 'master' of https://github.com/ahmedabu98/beam into iceb…
- af1ec85 use static value
- f5d3268 add some test
- df40239 Merge branch 'master' of https://github.com/ahmedabu98/beam into iceb…
- 43ab88f Merge branch 'master' of https://github.com/ahmedabu98/beam into iceb…
- 20db0ee Merge branch 'master' of https://github.com/ahmedabu98/beam into iceb…
- 622625f add a java doc; don't use static block to create coder
- 4c25d3f spotless
- 8666166 add options: from/to timestamp, starting strategy, and streaming toggle
- 297c309 trigger integration tests
- 5e3a2cc small test fix
- 8b131fd Merge branch 'master' of https://github.com/ahmedabu98/beam into iceb…
- 887eff1 scan every snapshot individually; use snapshot commit timestamp to ma…
- 6cfc2d8 new schematransform for cdc streaming; add watermark configs
- fbad86e cleanup
- 50f9497 add guava import
- 4f1f40b remove iceberg_cdc_read from xlang auto-wrapper gen
- 633365c fix javadoc
- 37485f1 cleanup
- 4ede0e8 spotless
- db9fd63 use CDC schema for batch and streaming; re-introduce boolean 'streami…
- 79ab16a add to CHANGES.md and discussion docs
- 06a4cee spotless
- 132034f Merge branch 'master' of https://github.com/ahmedabu98/beam into iceb…
- 795c87c address review comments about java docs
- c6461c9 remove raw guava dep
- 7dbf3e1 add another test for read utils
- 5263a13 Merge branch 'master' of https://github.com/ahmedabu98/beam into iceb…
- 40fe4ab use cached schemas
- db3c570 watermark based on snapshot timestamp; remove infinite allowed skew; …
- 7078f20 remove check
- fce87dc remove watermark support; add a counter and some helpful logging
- adbbfaf Merge branch 'master' of https://github.com/ahmedabu98/beam into iceb…
- 30f3ced change metric name
- 078b63a use Redistribute for unbounded; make ReadFromTasks an SDF to leverage…
- 941e8aa remove CDC schema and output the data record normally
- 44923f3 use LoadingCache; leverage iceberg bin-packing; address other comments
- 1c0f7d7 spotless
- 7f12282 validate catalog cache consistency; added a TODO
- 17816bd Merge branch 'master' into iceberg_streaming_source
- 24d1782 trigger integration tests
- 3b8bac1 Merge branch 'iceberg_streaming_source' of https://github.com/ahmedab…
@@ -1,4 +1,4 @@
 {
 "comment": "Modify this file in a trivial way to cause this test suite to run.",
-"modification": 2
+"modification": 3
 }
@@ -15,5 +15,9 @@ limitations under the License.
 # List Of Documents Submitted To [email protected] In 2025
 | No. | Author | Subject | Date (UTC) |
 |---|---|---|---|
-| 1 | Danny McCormick | [Beam Python Dependency Extras](https://docs.google.com/document/d/1c84Gc-cZRCfrU8f7kWGsNR2o8oSRjCM-dGHO9KvPWPw) | 2025-01-27 17:50:00 |
-| 2 | Danny McCormick | [How vLLM Model Handler Works (Plus a Summary of Model Memory Management in Beam ML)](https://docs.google.com/document/d/1UB4umrtnp1Eg45fiUB3iLS7kPK3BE6pcf0YRDkA289Q) | 2025-01-31 17:50:00 |
+| 1 | Kenneth Knowles | [Apache Beam Release Acceptance Criteria - Google Sheets](https://docs.google.com/spreadsheets/d/1qk-N5vjXvbcEk68GjbkSZTR8AGqyNUM-oLFo_ZXBpJw) | 2025-01-13 10:54:22 |
+| 2 | Danny McCormick | [Apache Beam Vendored Dependencies Release Guide](https://s.apache.org/beam-release-vendored-artifacts) | 2025-01-13 15:00:51 |
+| 3 | Danny McCormick | [Beam Python & ML Dependency Extras](https://docs.google.com/document/d/1c84Gc-cZRCfrU8f7kWGsNR2o8oSRjCM-dGHO9KvPWPw) | 2025-01-27 15:33:36 |
+| 4 | Danny McCormick | [How vLLM Model Handler Works (Plus a Summary of Model Memory Management in Beam ML)](https://docs.google.com/document/d/1UB4umrtnp1Eg45fiUB3iLS7kPK3BE6pcf0YRDkA289Q) | 2025-01-31 11:56:59 |
+| 5 | Shunping Huang | [Improve Logging Dependencies in Beam Java SDK](https://docs.google.com/document/d/1IkbiM4m8D-aB3NYI1aErFZHt6M7BQ-8eCULh284Davs) | 2025-02-04 15:13:14 |
+| 6 | Ahmed Abualsaud | [Iceberg Incremental Source design](https://s.apache.org/beam-iceberg-incremental-source) | 2025-03-03 14:52:42 |
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/CreateReadTasksDoFn.java (new file, 122 additions, 0 deletions)
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.sdk.io.iceberg;

import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.DataOperations;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.IncrementalAppendScan;
import org.apache.iceberg.ScanTaskParser;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Scans the given snapshot and creates multiple {@link ReadTask}s. Each task represents a portion
 * of a data file that was appended within the snapshot range.
 */
class CreateReadTasksDoFn
    extends DoFn<KV<String, List<SnapshotInfo>>, KV<ReadTaskDescriptor, ReadTask>> {
  private static final Logger LOG = LoggerFactory.getLogger(CreateReadTasksDoFn.class);
  private static final Counter totalScanTasks =
      Metrics.counter(CreateReadTasksDoFn.class, "totalScanTasks");
  // TODO(ahmedabu98): should we expose a metric that tracks the latest observed snapshot sequence
  // number?

  private final IcebergScanConfig scanConfig;
  private final @Nullable String watermarkColumnName;

  CreateReadTasksDoFn(IcebergScanConfig scanConfig) {
    this.scanConfig = scanConfig;
    this.watermarkColumnName = scanConfig.getWatermarkColumn();
  }

  @ProcessElement
  public void process(
      @Element KV<String, List<SnapshotInfo>> element,
      OutputReceiver<KV<ReadTaskDescriptor, ReadTask>> out)
      throws IOException, ExecutionException {
    Table table = TableCache.get(element.getKey(), scanConfig.getCatalogConfig().catalog());
    List<SnapshotInfo> snapshots = element.getValue();

    // scan snapshots individually and assign commit timestamp to files
    for (SnapshotInfo snapshot : snapshots) {
      @Nullable Long fromSnapshot = snapshot.getParentId();
      long toSnapshot = snapshot.getSnapshotId();

      if (!DataOperations.APPEND.equals(snapshot.getOperation())) {
        LOG.info(
            "Skipping non-append snapshot of operation '{}'. Sequence number: {}, id: {}",
            snapshot.getOperation(),
            snapshot.getSequenceNumber(),
            snapshot.getSnapshotId());
        // Skip to the next snapshot, as the log message says. Without this `continue`, the
        // non-append snapshot would still be scanned despite the "Skipping" log line.
        continue;
      }

      LOG.info("Planning to scan snapshot id range ({}, {}]", fromSnapshot, toSnapshot);
      IncrementalAppendScan scan = table.newIncrementalAppendScan().toSnapshot(toSnapshot);
      if (fromSnapshot != null) {
        scan = scan.fromSnapshotExclusive(fromSnapshot);
      }
      if (watermarkColumnName != null) {
        scan = scan.includeColumnStats(Collections.singletonList(watermarkColumnName));
      }

      createAndOutputReadTasks(scan, snapshot, out);
    }
  }

  private void createAndOutputReadTasks(
      IncrementalAppendScan scan,
      SnapshotInfo snapshot,
      OutputReceiver<KV<ReadTaskDescriptor, ReadTask>> out)
      throws IOException {
    try (CloseableIterable<CombinedScanTask> combinedScanTasks = scan.planTasks()) {
      for (CombinedScanTask combinedScanTask : combinedScanTasks) {
        // A single DataFile can be broken up into multiple FileScanTasks
        for (FileScanTask fileScanTask : combinedScanTask.tasks()) {
          ReadTask task =
              ReadTask.builder()
                  .setFileScanTaskJson(ScanTaskParser.toJson(fileScanTask))
                  .setByteSize(fileScanTask.file().fileSizeInBytes())
                  .setOperation(snapshot.getOperation())
                  .setSnapshotTimestampMillis(snapshot.getTimestampMillis())
                  .build();
          ReadTaskDescriptor descriptor =
              ReadTaskDescriptor.builder()
                  .setTableIdentifierString(checkStateNotNull(snapshot.getTableIdentifierString()))
                  .build();

          out.output(KV.of(descriptor, task));
          totalScanTasks.inc();
        }
      }
    }
  }
}
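The loop above plans one incremental scan per snapshot, each over the exclusive-from / inclusive-to id range (parentId, snapshotId], with a null parent meaning "from the beginning". A minimal, dependency-free sketch of that range-chaining logic follows; the `Snapshot`, `Range`, and `planRanges` names are illustrative stand-ins, not types from this PR:

```java
import java.util.ArrayList;
import java.util.List;

public class SnapshotRangePlanner {
    /** Illustrative stand-in for SnapshotInfo: a snapshot id plus a nullable parent id. */
    record Snapshot(long id, Long parentId) {}

    /** An exclusive-from / inclusive-to snapshot id range, mirroring (parentId, snapshotId]. */
    record Range(Long fromExclusive, long toInclusive) {}

    /**
     * One range per snapshot, mirroring how CreateReadTasksDoFn plans one incremental
     * scan per snapshot rather than a single scan over the whole snapshot list.
     */
    static List<Range> planRanges(List<Snapshot> snapshots) {
        List<Range> ranges = new ArrayList<>();
        for (Snapshot s : snapshots) {
            // A null parent id means the range is unbounded below, like the
            // fromSnapshot == null branch in the DoFn.
            ranges.add(new Range(s.parentId(), s.id()));
        }
        return ranges;
    }

    public static void main(String[] args) {
        // Root snapshot 1 has no parent; snapshots 2 and 3 chain off it.
        List<Range> ranges = planRanges(List.of(
            new Snapshot(1L, null), new Snapshot(2L, 1L), new Snapshot(3L, 2L)));
        for (Range r : ranges) {
            System.out.println("(" + r.fromExclusive() + ", " + r.toInclusive() + "]");
        }
    }
}
```

Scanning each snapshot's range individually (instead of one big range) is what lets the DoFn stamp every emitted ReadTask with that snapshot's commit timestamp and operation.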