26
26
import org .apache .kafka .clients .producer .Producer ;
27
27
import org .apache .kafka .common .KafkaException ;
28
28
import org .apache .kafka .common .TopicPartition ;
29
+ import org .apache .kafka .common .config .ConfigDef ;
30
+ import org .apache .kafka .common .config .ConfigDef .Type ;
29
31
import org .apache .kafka .common .metrics .Metrics ;
30
32
import org .apache .kafka .common .metrics .Sensor ;
31
33
import org .apache .kafka .common .metrics .stats .Avg ;
@@ -197,6 +199,7 @@ private synchronized void setStateWhenNotInPendingShutdown(final State newState)
197
199
private final Map <TaskId , StreamTask > suspendedTasks ;
198
200
private final Map <TaskId , StandbyTask > suspendedStandbyTasks ;
199
201
private final Time time ;
202
+ private final int rebalanceTimeoutMs ;
200
203
private final long pollTimeMs ;
201
204
private final long cleanTimeMs ;
202
205
private final long commitTimeMs ;
@@ -290,6 +293,8 @@ public StreamThread(TopologyBuilder builder,
290
293
this .standbyRecords = new HashMap <>();
291
294
292
295
this .stateDirectory = new StateDirectory (applicationId , config .getString (StreamsConfig .STATE_DIR_CONFIG ), time );
296
+ final Object maxPollInterval = consumerConfigs .get (ConsumerConfig .MAX_POLL_INTERVAL_MS_CONFIG );
297
+ this .rebalanceTimeoutMs = (Integer ) ConfigDef .parseType (ConsumerConfig .MAX_POLL_INTERVAL_MS_CONFIG , maxPollInterval , Type .INT );
293
298
this .pollTimeMs = config .getLong (StreamsConfig .POLL_MS_CONFIG );
294
299
this .commitTimeMs = config .getLong (StreamsConfig .COMMIT_INTERVAL_MS_CONFIG );
295
300
this .cleanTimeMs = config .getLong (StreamsConfig .STATE_CLEANUP_DELAY_MS_CONFIG );
@@ -855,7 +860,7 @@ private void closeNonAssignedSuspendedStandbyTasks() {
855
860
}
856
861
}
857
862
858
- private void addStreamTasks (Collection <TopicPartition > assignment ) {
863
+ private void addStreamTasks (Collection <TopicPartition > assignment , final long start ) {
859
864
if (partitionAssignor == null )
860
865
throw new IllegalStateException (logPrefix + " Partition assignor has not been initialized while adding stream tasks: this should not happen." );
861
866
@@ -893,7 +898,7 @@ private void addStreamTasks(Collection<TopicPartition> assignment) {
893
898
894
899
// create all newly assigned tasks (guard against race condition with other thread via backoff and retry)
895
900
// -> other thread will call removeSuspendedTasks(); eventually
896
- taskCreator .retryWithBackoff (newTasks );
901
+ taskCreator .retryWithBackoff (newTasks , start );
897
902
}
898
903
899
904
StandbyTask createStandbyTask (TaskId id , Collection <TopicPartition > partitions ) {
@@ -910,7 +915,7 @@ StandbyTask createStandbyTask(TaskId id, Collection<TopicPartition> partitions)
910
915
}
911
916
}
912
917
913
- private void addStandbyTasks () {
918
+ private void addStandbyTasks (final long start ) {
914
919
if (partitionAssignor == null )
915
920
throw new IllegalStateException (logPrefix + " Partition assignor has not been initialized while adding standby tasks: this should not happen." );
916
921
@@ -937,7 +942,7 @@ private void addStandbyTasks() {
937
942
938
943
// create all newly assigned standby tasks (guard against race condition with other thread via backoff and retry)
939
944
// -> other thread will call removeSuspendedStandbyTasks(); eventually
940
- new StandbyTaskCreator (checkpointedOffsets ).retryWithBackoff (newStandbyTasks );
945
+ new StandbyTaskCreator (checkpointedOffsets ).retryWithBackoff (newStandbyTasks , start );
941
946
942
947
restoreConsumer .assign (new ArrayList <>(checkpointedOffsets .keySet ()));
943
948
@@ -1126,7 +1131,7 @@ public void removeAllSensors() {
1126
1131
}
1127
1132
1128
1133
abstract class AbstractTaskCreator {
1129
- void retryWithBackoff (final Map <TaskId , Set <TopicPartition >> tasksToBeCreated ) {
1134
+ void retryWithBackoff (final Map <TaskId , Set <TopicPartition >> tasksToBeCreated , final long start ) {
1130
1135
long backoffTimeMs = 50L ;
1131
1136
while (true ) {
1132
1137
final Iterator <Map .Entry <TaskId , Set <TopicPartition >>> it = tasksToBeCreated .entrySet ().iterator ();
@@ -1138,13 +1143,14 @@ void retryWithBackoff(final Map<TaskId, Set<TopicPartition>> tasksToBeCreated) {
1138
1143
try {
1139
1144
createTask (taskId , partitions );
1140
1145
it .remove ();
1146
+ backoffTimeMs = 50L ;
1141
1147
} catch (final LockException e ) {
1142
1148
// ignore and retry
1143
1149
log .warn ("Could not create task {}. Will retry." , taskId , e );
1144
1150
}
1145
1151
}
1146
1152
1147
- if (tasksToBeCreated .isEmpty ()) {
1153
+ if (tasksToBeCreated .isEmpty () || time . milliseconds () - start > rebalanceTimeoutMs ) {
1148
1154
break ;
1149
1155
}
1150
1156
@@ -1207,9 +1213,9 @@ public void onPartitionsAssigned(Collection<TopicPartition> assignment) {
1207
1213
// will become active or vice versa
1208
1214
closeNonAssignedSuspendedStandbyTasks ();
1209
1215
closeNonAssignedSuspendedTasks ();
1210
- addStreamTasks (assignment );
1216
+ addStreamTasks (assignment , start );
1211
1217
storeChangelogReader .restore ();
1212
- addStandbyTasks ();
1218
+ addStandbyTasks (start );
1213
1219
streamsMetadataState .onChange (partitionAssignor .getPartitionsByHostState (), partitionAssignor .clusterMetadata ());
1214
1220
lastCleanMs = time .milliseconds (); // start the cleaning cycle
1215
1221
setStateWhenNotInPendingShutdown (State .RUNNING );
0 commit comments