Spark 4.1: Refactor SparkMicroBatchStream to SyncPlanner #15298
bryanck merged 9 commits into apache:main
Conversation
cc @bryanck

LGTM!

I know the original didn't have unit tests, but it might be nice to add a few, for some of the utility methods at least.
Fix Spotless
fce3955 to 4c0c940
@bryanck Added a short unit test in the new test file TestMicroBatchPlanningUtils. Most functionality is already tested in TestStructuredStreaming3; these are additional sanity checks, plus tests for UnpackedLimits, which is somewhat new.
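For illustration, a sanity check along these lines is plausible (a sketch only: the UnpackedLimits constructor and accessor names are assumptions, not the PR's actual test code):

// Sketch of a sanity check for UnpackedLimits; accessor names are assumed.
import static org.assertj.core.api.Assertions.assertThat;

import org.apache.spark.sql.connector.read.streaming.ReadLimit;
import org.junit.jupiter.api.Test;

public class TestMicroBatchPlanningUtils {

  @Test
  public void testUnpackedLimitsFromCompositeLimit() {
    // a composite read limit of at most 10 files and 100 rows
    ReadLimit composite =
        ReadLimit.compositeLimit(new ReadLimit[] {ReadLimit.maxFiles(10), ReadLimit.maxRows(100)});

    // assumed: UnpackedLimits flattens the composite into per-dimension bounds
    UnpackedLimits limits = new UnpackedLimits(composite);
    assertThat(limits.getMaxFiles()).isEqualTo(10);
    assertThat(limits.getMaxRows()).isEqualTo(100);
  }
}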
* Get the next snapshot, skipping over rewrite and delete snapshots. For async this handles
* nulls; sync will never have nulls.
There is no async planning yet; let's include this when we add support for async?

To clarify the ask: I should revert nextValidSnapshot to its moved state without the async changes and the mention of async in comments, essentially removing this check here:
if (curSnapshot == null) {
  StreamingOffset startingOffset =
      MicroBatchUtils.determineStartingOffset(table, readConf.streamFromTimestamp());
  LOG.debug("determineStartingOffset picked startingOffset: {}", startingOffset);
  if (StreamingOffset.START_OFFSET.equals(startingOffset)) {
    return null;
  }
  nextSnapshot = table.snapshot(startingOffset.snapshotId());
} else {
  if (curSnapshot.snapshotId() == table.currentSnapshot().snapshotId()) {
    return null;
  }
}

// skip over rewrite and delete snapshots
while (!shouldProcess(nextSnapshot)) {
  LOG.debug("Skipping snapshot: {}", nextSnapshot);
minor: it would be nice to log the snapshot ops type

Actually, I don't think there's a need to log the ops type since it should be logged already: we are logging the entire snapshot, and so far BaseSnapshot is the one that implements Snapshot with a toString method that includes the operation type:
public String toString() {
  return MoreObjects.toStringHelper(this)
      .add("id", snapshotId)
      .add("timestamp_ms", timestampMillis)
      .add("operation", operation)
      .add("summary", summary)
      .add("manifest-list", manifestListLocation)
      .add("schema-id", schemaId)
      .toString();
}
In that case it's nice :)
// skip over rewrite and delete snapshots
while (!shouldProcess(nextSnapshot)) {
  LOG.debug("Skipping snapshot: {}", nextSnapshot);
  // if the currentSnapShot was also the mostRecentSnapshot then break
It would be nice to add a comment explaining why.

This is old code, but I will enhance the comment: we break to avoid snapshotAfter throwing an exception, since there are no more snapshots to process.
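For concreteness, the loop with the enhanced comment could read roughly like this (a sketch; the loop body is the existing code, only the comment wording is new):

// skip over rewrite and delete snapshots
while (!shouldProcess(nextSnapshot)) {
  LOG.debug("Skipping snapshot: {}", nextSnapshot);
  // if nextSnapshot is already the most recent snapshot, stop here: there are
  // no more snapshots to process, and calling snapshotAfter on the latest
  // snapshot would throw an exception
  if (nextSnapshot.snapshotId() == table.currentSnapshot().snapshotId()) {
    return null;
  }
  nextSnapshot = SnapshotUtil.snapshotAfter(table, nextSnapshot.snapshotId());
}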
}
if (fromTimestamp == Long.MIN_VALUE) {
  // match existing behavior and start from the oldest snapshot
Explain why?

Suggested change:
- // match existing behavior and start from the oldest snapshot
+ // start from the oldest snapshot
@singhpk234 to clarify, are you asking about the change from a null check to Long.MIN_VALUE and why determineStartingOffset now takes a primitive long?
I saw that readConf().streamFromTimestamp() returns a primitive long, so it didn't make sense to me why we needed to autobox it. Or was your comment about something else (elaborating on the old comment more)?

Added a comment explaining why we return the oldest snapshot.
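To make the sentinel concrete, here is a minimal sketch of how determineStartingOffset might treat Long.MIN_VALUE (assumed structure, not the PR's exact code; the helpers are from org.apache.iceberg.util.SnapshotUtil):

// Sketch only; assumes Long.MIN_VALUE is the "no timestamp configured" sentinel
// that replaces the previous null check on a boxed Long.
static StreamingOffset determineStartingOffset(Table table, long fromTimestamp) {
  if (table.currentSnapshot() == null) {
    return StreamingOffset.START_OFFSET;
  }

  if (fromTimestamp == Long.MIN_VALUE) {
    // start from the oldest snapshot: without a configured timestamp there is
    // no lower bound, so the stream replays the table's full history
    return new StreamingOffset(SnapshotUtil.oldestAncestor(table).snapshotId(), 0, false);
  }

  // otherwise start from the first snapshot at or after fromTimestamp
  Snapshot snapshot = SnapshotUtil.oldestAncestorAfter(table, fromTimestamp);
  return snapshot != null
      ? new StreamingOffset(snapshot.snapshotId(), 0, false)
      : StreamingOffset.START_OFFSET;
}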
// If snapshotSummary doesn't have SnapshotSummary.ADDED_FILES_PROP,
// iterate through addedFiles iterator to find addedFilesCount.

Let's remove this comment, it's obvious; I understand it's from old code :)
List<FileScanTask> planFiles(StreamingOffset startOffset, StreamingOffset endOffset);

StreamingOffset latestOffset(StreamingOffset startOffset, ReadLimit limit);

void stop();

Can you please add Javadocs for these?
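For example, Javadocs along these lines would address it (a sketch; the interface name is inferred from the PR description and the wording is illustrative):

interface SparkMicroBatchPlanner {
  /** Plans the file scan tasks for the micro-batch between startOffset and endOffset. */
  List<FileScanTask> planFiles(StreamingOffset startOffset, StreamingOffset endOffset);

  /** Computes the furthest offset reachable from startOffset without exceeding the read limit. */
  StreamingOffset latestOffset(StreamingOffset startOffset, ReadLimit limit);

  /** Releases any resources held by the planner. */
  void stop();
}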
@singhpk234 addressed comments, ptal, ty!

Thanks for the contribution @RjLi13 and for the review @singhpk234!
This is to prepare for the changes made to introduce the async planner: #15059. The full context of the feature is in there.

This first phase focuses on just moving SparkMicroBatchStream logic to SyncSparkMicroBatchPlanner and having SparkMicroBatchStream rely on SyncSparkMicroBatchPlanner. Besides the sync planner and its interface, I also introduce two new classes: MicroBatchUtils, which shares static methods between the planners and SparkMicroBatchStream, and BaseSparkMicroBatchPlanner, which holds duplicated code that will be reused by the future async planner.

The Phase 2 PR is here: #15299. For reference on the changes in phase 2, this is what that diff looks like: https://github.com/RjLi13/iceberg/pull/6/changes

No regressions should be expected. Unfortunately, git diff can't show the moves, but this PR should be mostly moving code around.
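As a rough sketch of the resulting shape (class names from the description above; the constructor arguments and the toInputPartitions helper are assumptions, not the PR's actual code):

// Illustrative only: SparkMicroBatchStream delegating planning to the sync planner.
public class SparkMicroBatchStream implements MicroBatchStream {
  private final SparkMicroBatchPlanner planner;

  SparkMicroBatchStream(Table table, SparkReadConf readConf) {
    // phase 1 always uses the sync planner; phase 2 (#15299) can swap in an async one
    this.planner = new SyncSparkMicroBatchPlanner(table, readConf);
  }

  @Override
  public InputPartition[] planInputPartitions(Offset start, Offset end) {
    List<FileScanTask> tasks =
        planner.planFiles((StreamingOffset) start, (StreamingOffset) end);
    return toInputPartitions(tasks); // existing conversion logic, unchanged (assumed helper)
  }

  @Override
  public void stop() {
    planner.stop();
  }

  // remaining MicroBatchStream methods omitted for brevity
}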