18
18
package kafka .server .builders ;
19
19
20
20
import kafka .log .LogManager ;
21
- import kafka .log .remote .RemoteLogManager ;
22
- import kafka .server .AddPartitionsToTxnManager ;
23
21
import kafka .server .AlterPartitionManager ;
24
- import kafka .server .DelayedDeleteRecords ;
25
- import kafka .server .DelayedFetch ;
26
- import kafka .server .DelayedProduce ;
27
- import kafka .server .DelayedRemoteFetch ;
28
- import kafka .server .DelayedRemoteListOffsets ;
29
22
import kafka .server .KafkaConfig ;
30
23
import kafka .server .MetadataCache ;
31
24
import kafka .server .QuotaFactory .QuotaManagers ;
32
25
import kafka .server .ReplicaManager ;
33
- import kafka .server .share .DelayedShareFetch ;
34
26
35
27
import org .apache .kafka .common .metrics .Metrics ;
36
28
import org .apache .kafka .common .utils .Time ;
37
29
import org .apache .kafka .server .DelayedActionQueue ;
38
30
import org .apache .kafka .server .common .DirectoryEventHandler ;
39
- import org .apache .kafka .server .purgatory .DelayedOperationPurgatory ;
40
31
import org .apache .kafka .server .util .Scheduler ;
41
32
import org .apache .kafka .storage .internals .log .LogDirFailureChannel ;
42
33
import org .apache .kafka .storage .log .metrics .BrokerTopicStats ;
43
34
44
35
import java .util .Collections ;
45
- import java .util .Optional ;
46
36
import java .util .concurrent .atomic .AtomicBoolean ;
47
37
48
- import scala .jdk . javaapi . OptionConverters ;
38
+ import scala .Option ;
49
39
50
40
51
41
@@ -60,18 +50,6 @@ public class ReplicaManagerBuilder {
60
50
private LogDirFailureChannel logDirFailureChannel = null ;
61
51
private AlterPartitionManager alterPartitionManager = null ;
62
52
private BrokerTopicStats brokerTopicStats = null ;
63
- private AtomicBoolean isShuttingDown = new AtomicBoolean (false );
64
- private Optional <RemoteLogManager > remoteLogManager = Optional .empty ();
65
- private Optional <DelayedOperationPurgatory <DelayedProduce >> delayedProducePurgatory = Optional .empty ();
66
- private Optional <DelayedOperationPurgatory <DelayedFetch >> delayedFetchPurgatory = Optional .empty ();
67
- private Optional <DelayedOperationPurgatory <DelayedDeleteRecords >> delayedDeleteRecordsPurgatory = Optional .empty ();
68
- private Optional <DelayedOperationPurgatory <DelayedRemoteFetch >> delayedRemoteFetchPurgatory = Optional .empty ();
69
- private Optional <DelayedOperationPurgatory <DelayedRemoteListOffsets >> delayedRemoteListOffsetsPurgatory = Optional .empty ();
70
- private Optional <DelayedOperationPurgatory <DelayedShareFetch >> delayedShareFetchPurgatory = Optional .empty ();
71
- private Optional <String > threadNamePrefix = Optional .empty ();
72
- private Long brokerEpoch = -1L ;
73
- private Optional <AddPartitionsToTxnManager > addPartitionsToTxnManager = Optional .empty ();
74
- private DirectoryEventHandler directoryEventHandler = DirectoryEventHandler .NOOP ;
75
53
76
54
public ReplicaManagerBuilder setConfig (KafkaConfig config ) {
77
55
this .config = config ;
@@ -98,11 +76,6 @@ public ReplicaManagerBuilder setLogManager(LogManager logManager) {
98
76
return this ;
99
77
}
100
78
101
- public ReplicaManagerBuilder setRemoteLogManager (RemoteLogManager remoteLogManager ) {
102
- this .remoteLogManager = Optional .ofNullable (remoteLogManager );
103
- return this ;
104
- }
105
-
106
79
public ReplicaManagerBuilder setQuotaManagers (QuotaManagers quotaManagers ) {
107
80
this .quotaManagers = quotaManagers ;
108
81
return this ;
@@ -128,26 +101,6 @@ public ReplicaManagerBuilder setBrokerTopicStats(BrokerTopicStats brokerTopicSta
128
101
return this ;
129
102
}
130
103
131
- public ReplicaManagerBuilder setDelayedFetchPurgatory (DelayedOperationPurgatory <DelayedFetch > delayedFetchPurgatory ) {
132
- this .delayedFetchPurgatory = Optional .of (delayedFetchPurgatory );
133
- return this ;
134
- }
135
-
136
- public ReplicaManagerBuilder setThreadNamePrefix (String threadNamePrefix ) {
137
- this .threadNamePrefix = Optional .of (threadNamePrefix );
138
- return this ;
139
- }
140
-
141
- public ReplicaManagerBuilder setBrokerEpoch (long brokerEpoch ) {
142
- this .brokerEpoch = brokerEpoch ;
143
- return this ;
144
- }
145
-
146
- public ReplicaManagerBuilder setDirectoryEventHandler (DirectoryEventHandler directoryEventHandler ) {
147
- this .directoryEventHandler = directoryEventHandler ;
148
- return this ;
149
- }
150
-
151
104
public ReplicaManager build () {
152
105
if (config == null ) config = new KafkaConfig (Collections .emptyMap ());
153
106
if (logManager == null ) throw new RuntimeException ("You must set logManager" );
@@ -164,23 +117,23 @@ public ReplicaManager build() {
164
117
time ,
165
118
scheduler ,
166
119
logManager ,
167
- OptionConverters . toScala ( remoteLogManager ),
120
+ Option . empty ( ),
168
121
quotaManagers ,
169
122
metadataCache ,
170
123
logDirFailureChannel ,
171
124
alterPartitionManager ,
172
125
brokerTopicStats ,
173
- isShuttingDown ,
174
- OptionConverters . toScala ( delayedProducePurgatory ),
175
- OptionConverters . toScala ( delayedFetchPurgatory ),
176
- OptionConverters . toScala ( delayedDeleteRecordsPurgatory ),
177
- OptionConverters . toScala ( delayedRemoteFetchPurgatory ),
178
- OptionConverters . toScala ( delayedRemoteListOffsetsPurgatory ),
179
- OptionConverters . toScala ( delayedShareFetchPurgatory ),
180
- OptionConverters . toScala ( threadNamePrefix ),
181
- () -> brokerEpoch ,
182
- OptionConverters . toScala ( addPartitionsToTxnManager ),
183
- directoryEventHandler ,
126
+ new AtomicBoolean ( false ) ,
127
+ Option . empty ( ),
128
+ Option . empty ( ),
129
+ Option . empty ( ),
130
+ Option . empty ( ),
131
+ Option . empty ( ),
132
+ Option . empty ( ),
133
+ Option . empty ( ),
134
+ () -> - 1L ,
135
+ Option . empty ( ),
136
+ DirectoryEventHandler . NOOP ,
184
137
new DelayedActionQueue ());
185
138
}
186
139
}
0 commit comments