Spark: Async Spark Micro Batch Planner #15059
Conversation
cc @bryanck
```diff
-class StreamingOffset extends Offset {
   static final StreamingOffset START_OFFSET = new StreamingOffset(-1L, -1, false);
+public class StreamingOffset extends Offset {
```

Does this need to be public?
```diff
  * @param snapshotTotalRows Total rows in the snapshot
  */
-StreamingOffset(long snapshotId, long position, boolean scanAllFiles) {
+public StreamingOffset(
```

Also here, curious why this needs to be public. Also below there are a few static methods that were made public.
Good catch, I originally had it to match the public interface SparkMicroBatchPlanner, only to realize all of them are implementation details. Reverting and making the planners package private as well
```java
this.minQueuedRows = readConf.maxRecordsPerMicroBatch();
this.readConf = readConf;
this.lastOffsetForTriggerAvailableNow = lastOffsetForTriggerAvailableNow;
this.planFilesCache = Caffeine.newBuilder().maximumSize(10).build();
```

nit: make 10 a constant or configurable
```java
    this::refreshAndTrapException, pollingIntervalMs, pollingIntervalMs, TimeUnit.MILLISECONDS);
// Schedule queue fill to run frequently (use polling interval for tests, cap at 100ms for
// production)
long queueFillIntervalMs = Math.min(100L, pollingIntervalMs);
```

nit: make 100L a constant or configurable
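Both nits point the same way. A minimal sketch of hoisting the two magic numbers into named constants; the constant names and the placeholder cache value type below are hypothetical, not taken from the PR:

```java
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;

// Sketch only: constant names are illustrative and the cache value type is a placeholder.
class PlannerTuningSketch {
  private static final int PLAN_FILES_CACHE_MAX_SIZE = 10;
  private static final long MAX_QUEUE_FILL_INTERVAL_MS = 100L;

  private final Cache<String, Object> planFilesCache =
      Caffeine.newBuilder().maximumSize(PLAN_FILES_CACHE_MAX_SIZE).build();

  // Cap the queue-fill interval at the named constant instead of the inline 100L.
  static long queueFillIntervalMs(long pollingIntervalMs) {
    return Math.min(MAX_QUEUE_FILL_INTERVAL_MS, pollingIntervalMs);
  }
}
```

Making them configurable rather than constant would presumably go through the read options / SparkReadConf path, as the reviewer suggests.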
```java
private final long position;
private final boolean scanAllFiles;
private final long snapshotTimestampMillis;
private final long snapshotTotalRows;
```
I wasn't clear why these need to be added, given we already have the snapshot ID?
Reverted all changes to StreamingOffset
There is a fair amount of duplicated code between the existing microbatch planner and this one; I'm wondering if there are opportunities for reuse.
Force-pushed 97cfdbf to 0cfb405.
Moved the duplicated code out to a BaseSparkMicroBatchPlanner abstract class that both planners now extend.
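A rough sketch of the resulting shape, using simplified stand-in names and types (String instead of StreamingOffset/FileScanTask), so this is illustrative rather than the PR's actual code:

```java
import java.util.List;

// Planner contract shared by both implementations (signature simplified).
interface PlannerSketch {
  List<String> planFiles(String start, String end);
}

// Helpers that used to be duplicated now live in the abstract base.
abstract class BasePlannerSketch implements PlannerSketch {
  protected List<String> emptyPlan() {
    return List.of();
  }
}

// Plans on demand, when Spark asks for the next micro-batch.
class SyncPlannerSketch extends BasePlannerSketch {
  @Override
  public List<String> planFiles(String start, String end) {
    return emptyPlan();
  }
}

// Returns work pre-planned by a background thread.
class AsyncPlannerSketch extends BasePlannerSketch {
  @Override
  public List<String> planFiles(String start, String end) {
    return emptyPlan();
  }
}
```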
Force-pushed d31b909 to d7f2883.
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkMicroBatchPlanner.java (review thread resolved, outdated)
This should be added to the latest Spark version. Once that is merged, we'll backport the changes.
```java
interface SparkMicroBatchPlanner {
  List<FileScanTask> planFiles(StreamingOffset start, StreamingOffset end)
      throws ExecutionException;
```

I feel ExecutionException should not be part of the method signature; the implementation can throw an unchecked exception.
```java
List<FileScanTask> fileScanTasks;
try {
  fileScanTasks = microBatchPlanner.planFiles(startOffset, endOffset);
} catch (ExecutionException e) {
```

We can move the exception handling to the implementation if we make the above change to the interface.
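A minimal sketch of that suggestion, with simplified stand-in types rather than the PR's actual classes: the interface drops the checked exception, and the implementation wraps whatever the cache lookup throws.

```java
import java.util.List;
import java.util.concurrent.ExecutionException;

// Simplified stand-ins: String replaces StreamingOffset, and the task type is just String.
interface UncheckedPlannerSketch {
  List<String> planFiles(String start, String end); // no checked ExecutionException
}

class CachingPlannerSketch implements UncheckedPlannerSketch {
  @Override
  public List<String> planFiles(String start, String end) {
    try {
      return lookupPlannedFiles(start, end);
    } catch (ExecutionException e) {
      // Wrap the checked exception so SparkMicroBatchStream no longer needs a try/catch.
      throw new RuntimeException(
          "Failed to plan files for [" + start + ", " + end + ")", e.getCause());
    }
  }

  // Placeholder for the cache/executor lookup that can throw ExecutionException.
  private List<String> lookupPlannedFiles(String start, String end) throws ExecutionException {
    return List.of();
  }
}
```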
```java
  return ((ReadMaxFiles) limit).maxFiles();
}

@Override
public Map<String, String> metrics(Optional<Offset> latestConsumedOffset) {
```

It seems like this is only useful for the async planner; for the sync planner it will report the same thing.

Any recommendations on how to isolate it to the async planner? Otherwise I will remove it and users can rely on logging instead, which avoids implementing the ReportsSourceMetrics interface.
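One possible way to isolate it, sketched with hypothetical names: put metrics on the planner interface with an empty default and let the stream delegate, so only the async planner reports anything meaningful.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical shape, not the PR's code: the sync planner never overrides metrics().
interface PlannerWithMetricsSketch {
  default Map<String, String> metrics() {
    return Collections.emptyMap();
  }
}

class AsyncPlannerMetricsSketch implements PlannerWithMetricsSketch {
  // Hypothetical counter the background thread would keep up to date.
  private volatile long queuedFileScanTasks;

  @Override
  public Map<String, String> metrics() {
    return Map.of("queuedFileScanTasks", Long.toString(queuedFileScanTasks));
  }
}
```

The stream's ReportsSourceMetrics implementation would then just return the planner's map, which stays empty for the sync planner.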
spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/AsyncSparkMicroBatchPlanner.java (review thread resolved)
If you have any benchmarks or metrics that show the benefit of this, that would be helpful.
```java
List<FileScanTask> fileScanTasks = Lists.newArrayList();
StreamingOffset batchStartOffset =
    StreamingOffset.START_OFFSET.equals(start)
        ? SparkMicroBatchStream.determineStartingOffset(table(), fromTimestamp)
```

There is a circular dependency here; it would be better to have the planners not rely on SparkMicroBatchStream.
```diff
 }

-private static StreamingOffset determineStartingOffset(Table table, Long fromTimestamp) {
+static StreamingOffset determineStartingOffset(Table table, long fromTimestamp) {
```

IMO we should move this somewhere else so the planners don't need to reference SparkMicroBatchStream.
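A sketch of the structural fix under discussion, using hypothetical names and simplified types: the helper moves out of SparkMicroBatchStream into something the planners can depend on, and the stream delegates to it, so the dependency points one way only.

```java
// Simplified stand-ins: String replaces StreamingOffset, and the real logic
// (snapshot lookups, fromTimestamp handling) is elided.
final class MicroBatchPlanningUtilSketch {
  private MicroBatchPlanningUtilSketch() {}

  // Previously a static method on SparkMicroBatchStream; now shared neutrally.
  static String determineStartingOffset(String tableName, long fromTimestamp) {
    return tableName + "@" + fromTimestamp; // placeholder result
  }
}

class StreamSketch {
  String initialOffset(String tableName, long fromTimestamp) {
    // The stream (and both planners) call the shared helper instead of each other.
    return MicroBatchPlanningUtilSketch.determineStartingOffset(tableName, fromTimestamp);
  }
}
```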
@bryanck Would it be ok to move these files to 4.1 in this PR? Or would you prefer having a separate PR there and moving conversation and review to that PR?
Force-pushed d7f2883 to f5e7c6b.
- commit f5e7c6b "Spark 3.5: Address circular dep, unneeded code": addresses all the review comments about the circular dependency, ExecutionException, and the possibly unnecessary metrics call.
- commit e079fd3 "Spark: Move feature to latest Spark": addresses the comment about targeting the latest / current Spark, with a separate PR for backporting.

cc @bryanck The only remaining review comment I did not address is #15059 (comment), to discuss.
Sorry, the branch name has 3-5 in it, but the PR now targets 4.1.
```java
// Data appended after the timestamp should appear
appendData(data);
// Allow async background thread to refresh, else test sometimes fails
Thread.sleep(50);
```

I'm open to other suggestions. This test sometimes fails because the background thread runs but isn't able to refresh before the test finishes executing; hence I added a sleep here to wait before the test finishes. In a real use case, users should not have issues with this.
It failed for me locally once before this, but ran multiple times without it failing.
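One alternative to a fixed sleep, assuming Awaitility is available on the test classpath and that the test has some observable signal of the refresh (the `backgroundRefreshCompleted()` check below is a hypothetical placeholder): poll with an upper bound instead of sleeping a fixed 50 ms.

```java
import java.time.Duration;
import org.awaitility.Awaitility;

// Replaces Thread.sleep(50); fails with a clear message if the refresh never happens.
Awaitility.await("async planner picked up the appended data")
    .atMost(Duration.ofSeconds(10))
    .pollInterval(Duration.ofMillis(25))
    .until(() -> backgroundRefreshCompleted()); // hypothetical predicate exposed by the test
```

This keeps the test deterministic in slow CI environments while usually finishing much faster than a worst-case sleep.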
I don't have production metrics or benchmarks, but I took a stab at creating a JMH benchmark of latestOffset() calls with different numbers of snapshots. Hopefully this showcases that the sync planner scales linearly with the number of snapshots whereas the async planner is constant, trading off detection latency and memory usage. It is great for tables with a large number of snapshots and where processing takes long enough for the background thread to poll. Here is the link to it: RjLi13#2
nit: I feel the refactor could have reused the existing code a little better, i.e. renaming the existing class to be the base class first; that would help reduce the size of the PR.
This feature was originally built by Drew Goya <dgoya@netflix.com> for Spark 3.3 and Iceberg 1.4.
Force-pushed 7292949 to 4a102ab.
Implements a new feature for Spark Structured Streaming and Iceberg users known as the Async Spark Micro Batch Planner.
Currently, micro-batch planning in Iceberg is synchronous: a streaming query plans out which batches to read and how many rows / files go in each batch, then processes the data and repeats. Introducing an async planner improves streaming performance by pre-fetching table metadata and file scan tasks in a background thread, reducing micro-batch planning latency. This way planning can overlap with data processing and speed up dealing with large volumes of data.
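A minimal sketch of the idea (not the PR's actual implementation, and with a String placeholder instead of FileScanTask): a scheduled background thread refreshes and enqueues planned work, and the micro-batch path only drains what is already queued.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class BackgroundPlanningSketch implements AutoCloseable {
  private final BlockingQueue<String> plannedTasks = new LinkedBlockingQueue<>();
  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

  BackgroundPlanningSketch(long pollingIntervalMs) {
    // Refresh metadata and plan ahead off the driver's hot path.
    scheduler.scheduleWithFixedDelay(
        this::refreshAndPlan, pollingIntervalMs, pollingIntervalMs, TimeUnit.MILLISECONDS);
  }

  private void refreshAndPlan() {
    // The real planner would refresh the Iceberg table, detect new snapshots,
    // and enqueue FileScanTasks; here we just enqueue a placeholder.
    plannedTasks.offer("pre-planned-task");
  }

  List<String> nextMicroBatch(int maxTasks) {
    // Planning overlaps processing: this only drains what the background thread prepared.
    List<String> batch = new ArrayList<>();
    plannedTasks.drainTo(batch, maxTasks);
    return batch;
  }

  @Override
  public void close() {
    scheduler.shutdownNow();
  }
}
```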
This PR adds the option for users to set `spark.sql.iceberg.async-micro-batch-planning-enabled` if they want to use async planning. The code in SparkMicroBatchStream.java is moved to SyncSparkMicroBatchPlanner.java, and SparkMicroBatchStream configures which planner to use. The option defaults to false, so existing behavior is unchanged.

This feature was originally authored by Drew Goya in our Netflix fork for Spark 3.3 & Iceberg 1.4. I built upon Drew's work by porting this to Spark ~~3.5~~ 4.1 and the current Iceberg version.

Changes

- `AsyncSparkMicroBatchPlanner` that queues file scan tasks asynchronously
- `SyncSparkMicroBatchPlanner`
- `SparkMicroBatchPlanner` interface for both implementations
- `SparkMicroBatchStream` now selects the planner based on configuration
- `BaseSparkMicroBatchPlanner` to dedupe code between the sync and async planners
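For reference, how a user would opt in, assuming the property name and default described above; the app and table names below are just examples:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class AsyncPlanningOptInExample {
  public static void main(String[] args) {
    SparkSession spark =
        SparkSession.builder().appName("async-planning-example").getOrCreate();

    // Defaults to false, so existing behavior is unchanged unless explicitly enabled.
    spark.conf().set("spark.sql.iceberg.async-micro-batch-planning-enabled", "true");

    Dataset<Row> stream =
        spark.readStream().format("iceberg").load("db.events"); // example table name

    // ... attach a writeStream sink and start the query as usual ...
  }
}
```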