1818
1919package org .apache .flink .api .connector .source .mocks ;
2020
21+ import org .apache .flink .api .connector .source .ReaderInfo ;
2122import org .apache .flink .api .connector .source .SourceEvent ;
2223import org .apache .flink .api .connector .source .SplitEnumerator ;
2324import org .apache .flink .api .connector .source .SplitEnumeratorContext ;
2829
2930import java .io .IOException ;
3031import java .util .ArrayList ;
32+ import java .util .Collection ;
3133import java .util .Collections ;
32- import java .util .Comparator ;
3334import java .util .HashMap ;
3435import java .util .HashSet ;
3536import java .util .List ;
3637import java .util .Map ;
3738import java .util .Set ;
38- import java .util .SortedSet ;
39- import java .util .TreeSet ;
39+ import java .util .stream .Collectors ;
4040
4141/** A mock {@link SplitEnumerator} for unit tests. */
4242public class MockSplitEnumerator
4343 implements SplitEnumerator <MockSourceSplit , Set <MockSourceSplit >>, SupportsBatchSnapshot {
44- private final SortedSet <MockSourceSplit > unassignedSplits ;
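44+ // Splits not yet handed to a reader, bucketed by target subtask id (original note, truncated: "expand to 16 partitions, unas...").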
45+ private final Map <Integer , Set <MockSourceSplit >> pendingSplitAssignment ;
46+ private final Map <String , Integer > globalSplitAssignment ;
4547 private final SplitEnumeratorContext <MockSourceSplit > enumContext ;
4648 private final List <SourceEvent > handledSourceEvent ;
4749 private final List <Long > successfulCheckpoints ;
@@ -50,22 +52,24 @@ public class MockSplitEnumerator
5052
5153 public MockSplitEnumerator (int numSplits , SplitEnumeratorContext <MockSourceSplit > enumContext ) {
5254 this (new HashSet <>(), enumContext );
55+ List <MockSourceSplit > unassignedSplits = new ArrayList <>();
5356 for (int i = 0 ; i < numSplits ; i ++) {
5457 unassignedSplits .add (new MockSourceSplit (i ));
5558 }
59+ calculateAndPutPendingAssignments (unassignedSplits );
5660 }
5761
/**
 * Creates an enumerator whose initial pending work is the given set of splits.
 *
 * @param unassignedSplits splits to queue for assignment; each is routed to a subtask by
 *     {@code calculateAndPutPendingAssignments}
 * @param enumContext enumerator context used to compute and perform the assignments
 */
public MockSplitEnumerator(
        Set<MockSourceSplit> unassignedSplits,
        SplitEnumeratorContext<MockSourceSplit> enumContext) {
    this.enumContext = enumContext;
    this.started = false;
    this.closed = false;
    this.handledSourceEvent = new ArrayList<>();
    this.successfulCheckpoints = new ArrayList<>();
    this.globalSplitAssignment = new HashMap<>();
    this.pendingSplitAssignment = new HashMap<>();
    // Must run after the maps and the context are set: the helper reads all three.
    calculateAndPutPendingAssignments(unassignedSplits);
}
7074
7175 @ Override
@@ -83,25 +87,36 @@ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
8387
8488 @ Override
8589 public void addSplitsBack (List <MockSourceSplit > splits , int subtaskId ) {
86- unassignedSplits .addAll (splits );
90+ // add back to same subtaskId.
91+ putPendingAssignments (subtaskId , splits );
8792 }
8893
8994 @ Override
9095 public void addReader (int subtaskId ) {
91- List <MockSourceSplit > assignment = new ArrayList <>();
92- for (MockSourceSplit split : unassignedSplits ) {
93- if (Integer .parseInt (split .splitId ()) % enumContext .currentParallelism () == subtaskId ) {
94- assignment .add (split );
96+ ReaderInfo readerInfo = enumContext .registeredReaders ().get (subtaskId );
97+ List <MockSourceSplit > splitsOnRecovery = readerInfo .getReportedSplitsOnRegistration ();
98+
99+ List <MockSourceSplit > redistributedSplits = new ArrayList <>();
100+ List <MockSourceSplit > addBackSplits = new ArrayList <>();
101+ for (MockSourceSplit split : splitsOnRecovery ) {
102+ if (!globalSplitAssignment .containsKey (split .splitId ())) {
103+ // if split not existed in globalSplitAssignment, mean that it's registered first
104+ // time, can be redistibuted.
105+ redistributedSplits .add (split );
106+ } else if (!globalSplitAssignment .containsKey (split .splitId ())) {
107+ // if split already is assigned to other substaskId, just ignore it. Otherwise,
108+ // addback to this subtaskId again.
109+ addBackSplits .add (split );
95110 }
96111 }
97- enumContext . assignSplits (
98- new SplitsAssignment <>( Collections . singletonMap ( subtaskId , assignment )) );
99- unassignedSplits . removeAll ( assignment );
112+ calculateAndPutPendingAssignments ( redistributedSplits );
113+ putPendingAssignments ( subtaskId , addBackSplits );
114+ assignAllSplits ( );
100115 }
101116
102117 @ Override
103118 public Set <MockSourceSplit > snapshotState (long checkpointId ) {
104- return unassignedSplits ;
119+ return getUnassignedSplits () ;
105120 }
106121
107122 @ Override
@@ -114,11 +129,6 @@ public void close() throws IOException {
114129 this .closed = true ;
115130 }
116131
117- public void addNewSplits (List <MockSourceSplit > newSplits ) {
118- unassignedSplits .addAll (newSplits );
119- assignAllSplits ();
120- }
121-
122132 // --------------------
123133
124134 public boolean started () {
@@ -130,7 +140,9 @@ public boolean closed() {
130140 }
131141
132142 public Set <MockSourceSplit > getUnassignedSplits () {
133- return unassignedSplits ;
143+ return pendingSplitAssignment .values ().stream ()
144+ .flatMap (Set ::stream )
145+ .collect (Collectors .toSet ());
134146 }
135147
136148 public List <SourceEvent > getHandledSourceEvent () {
@@ -145,17 +157,27 @@ public List<Long> getSuccessfulCheckpoints() {
145157
146158 private void assignAllSplits () {
147159 Map <Integer , List <MockSourceSplit >> assignment = new HashMap <>();
148- unassignedSplits .forEach (
149- split -> {
150- int subtaskId =
151- Integer .parseInt (split .splitId ()) % enumContext .currentParallelism ();
152- if (enumContext .registeredReaders ().containsKey (subtaskId )) {
153- assignment
154- .computeIfAbsent (subtaskId , ignored -> new ArrayList <>())
155- .add (split );
156- }
157- });
160+ for (Map .Entry <Integer , Set <MockSourceSplit >> iter : pendingSplitAssignment .entrySet ()) {
161+ Integer subtaskId = iter .getKey ();
162+ if (enumContext .registeredReaders ().containsKey (subtaskId )) {
163+ assignment .put (subtaskId , new ArrayList <>(iter .getValue ()));
164+ }
165+ }
158166 enumContext .assignSplits (new SplitsAssignment <>(assignment ));
159- assignment .values ().forEach (l -> unassignedSplits .removeAll (l ));
167+ assignment .keySet ().forEach (pendingSplitAssignment ::remove );
168+ }
169+
170+ private void calculateAndPutPendingAssignments (Collection <MockSourceSplit > newSplits ) {
171+ for (MockSourceSplit split : newSplits ) {
172+ int subtaskId = Integer .parseInt (split .splitId ()) % enumContext .currentParallelism ();
173+ putPendingAssignments (subtaskId , Collections .singletonList (split ));
174+ }
175+ }
176+
177+ private void putPendingAssignments (int subtaskId , Collection <MockSourceSplit > splits ) {
178+ Set <MockSourceSplit > pendingSplits =
179+ pendingSplitAssignment .computeIfAbsent (subtaskId , HashSet ::new );
180+ pendingSplits .addAll (splits );
181+ splits .forEach (split -> globalSplitAssignment .put (split .splitId (), subtaskId ));
160182 }
161183}
0 commit comments