[FLINK-38613][tests] Force number of splits in UnalignedCheckpointRescaleWithMixedExchangesITCase#27635
[FLINK-38613][tests] Force number of splits in UnalignedCheckpointRescaleWithMixedExchangesITCase#27635rkhachatryan wants to merge 1 commit intoapache:masterfrom
Conversation
|
|
||
| final List<NumberSequenceSplit> splits = | ||
| splitNumberRange(from, to, enumContext.currentParallelism()); | ||
| splitNumberRange(from, to, 10 /* enumContext.currentParallelism() */); |
There was a problem hiding this comment.
Thanks for the analysis. I reproduced and investigated it further.
The root cause is: on restore, the available splits come from the checkpoint state, not from the new parallelism. For example, first run with parallelism 3 → NumberSequenceSource creates 3 splits, all assigned immediately, checkpoint saves 0 remaining splits. Restore with parallelism 7 → restoreEnumerator() gets 0 splits from checkpoint, so 4 extra subtasks get NoMoreSplits and finish.
I like your Option 3 that mentioned in https://issues.apache.org/jira/browse/FLINK-38613, but scoped to the test only — no production code changes. Subclassed DataGeneratorSource and overrode createEnumerator() to always create at least MAX_SLOTS splits:
private static class TestDataGeneratorSource extends DataGeneratorSource<Long> {
TestDataGeneratorSource() {
super(index -> index, Long.MAX_VALUE, RateLimiterStrategy.perSecond(5000), Types.LONG);
}
@Override
public SplitEnumerator<NumberSequenceSplit, Collection<NumberSequenceSplit>>
createEnumerator(SplitEnumeratorContext<NumberSequenceSplit> enumContext) {
NumberSequenceSource source =
new NumberSequenceSource(0, Long.MAX_VALUE - 1) {
@Override
protected List<NumberSequenceSplit> splitNumberRange(
long from, long to, int numSplits) {
return super.splitNumberRange(
from, to, Math.max(numSplits, MAX_SLOTS));
}
};
return source.createEnumerator(enumContext);
}
}WDYT?
…caleWithMixedExchangesITCase Motivation: avoid sub-tasks finishing prematurely and fail the test assumption of all sub-tasks always running
| // force large number of splits to avoid sub-tasks finishing prematurely | ||
| // and failing the test assumption of all sub-tasks always running | ||
| return super.createEnumerator(new MockSplitEnumeratorContext<>(MAX_SLOTS)); |
There was a problem hiding this comment.
is it possible to only change the split number but parallelism?
This ITCase is testing the rescale case, fixed parallelism covers fewer cases.
There was a problem hiding this comment.
I don't think that SplitEnumerator is responsible for parallelism, is it?
There was a problem hiding this comment.
I think you are right. My concern is it will affect the semantics, the input parameter of MockSplitEnumeratorContext is parallelism, and interface name of SplitEnumeratorContext is currentParallelism. It might caused potential bug in the future.
For example, the parallelism will be used in other places as well, but our intention is that only changing split number.
Motivation: avoid sub-tasks finishing prematurely and fail the test assumption of all sub-tasks always running