Skip to content

Commit d1e69dd

Browse files
ijumamjsax
authored andcommitted
KAFKA-18465: Remove MetadataVersions older than 3.0-IV1 (apache#18468)
Apache Kafka 4.0 will only support KRaft and 3.0-IV1 is the minimum version supported by KRaft. So, we can assume that Apache Kafka 4.0 will only communicate with brokers that are 3.0-IV1 or newer. Note that KRaft was only marked as production-ready in 3.3, so we could go further and set the baseline to 3.3. I think we should have that discussion, but it made sense to start with the non controversial parts. Reviewers: Jun Rao <[email protected]>, Chia-Ping Tsai <[email protected]>, David Jacot <[email protected]>
1 parent d0477cb commit d1e69dd

File tree

87 files changed

+427
-2177
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

87 files changed

+427
-2177
lines changed

clients/src/main/java/org/apache/kafka/clients/ClientUtils.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ public static ChannelBuilder createChannelBuilder(AbstractConfig config, Time ti
118118
SecurityProtocol securityProtocol = SecurityProtocol.forName(config.getString(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
119119
String clientSaslMechanism = config.getString(SaslConfigs.SASL_MECHANISM);
120120
return ChannelBuilders.clientChannelBuilder(securityProtocol, JaasContext.Type.CLIENT, config, null,
121-
clientSaslMechanism, time, true, logContext);
121+
clientSaslMechanism, time, logContext);
122122
}
123123

124124
static List<InetAddress> resolve(String host, HostResolver hostResolver) throws UnknownHostException {
@@ -275,4 +275,4 @@ public static ClusterResourceListeners configureClusterResourceListeners(List<?>
275275

276276
return clusterResourceListeners;
277277
}
278-
}
278+
}

clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java

+3-8
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,6 @@ private ChannelBuilders() { }
5656
* @param listenerName the listenerName if contextType is SERVER or null otherwise
5757
* @param clientSaslMechanism SASL mechanism if mode is CLIENT, ignored otherwise
5858
* @param time the time instance
59-
* @param saslHandshakeRequestEnable flag to enable Sasl handshake requests; disabled only for SASL
60-
* inter-broker connections with inter-broker protocol version < 0.10
6159
* @param logContext the log context instance
6260
*
6361
* @return the configured `ChannelBuilder`
@@ -70,7 +68,6 @@ public static ChannelBuilder clientChannelBuilder(
7068
ListenerName listenerName,
7169
String clientSaslMechanism,
7270
Time time,
73-
boolean saslHandshakeRequestEnable,
7471
LogContext logContext) {
7572

7673
if (securityProtocol == SecurityProtocol.SASL_PLAINTEXT || securityProtocol == SecurityProtocol.SASL_SSL) {
@@ -80,7 +77,7 @@ public static ChannelBuilder clientChannelBuilder(
8077
throw new IllegalArgumentException("`clientSaslMechanism` must be non-null in client mode if `securityProtocol` is `" + securityProtocol + "`");
8178
}
8279
return create(securityProtocol, ConnectionMode.CLIENT, contextType, config, listenerName, false, clientSaslMechanism,
83-
saslHandshakeRequestEnable, null, null, time, logContext, null);
80+
null, null, time, logContext, null);
8481
}
8582

8683
/**
@@ -106,8 +103,8 @@ public static ChannelBuilder serverChannelBuilder(ListenerName listenerName,
106103
LogContext logContext,
107104
Function<Short, ApiVersionsResponse> apiVersionSupplier) {
108105
return create(securityProtocol, ConnectionMode.SERVER, JaasContext.Type.SERVER, config, listenerName,
109-
isInterBrokerListener, null, true, credentialCache,
110-
tokenCache, time, logContext, apiVersionSupplier);
106+
isInterBrokerListener, null, credentialCache, tokenCache, time, logContext,
107+
apiVersionSupplier);
111108
}
112109

113110
private static ChannelBuilder create(SecurityProtocol securityProtocol,
@@ -117,7 +114,6 @@ private static ChannelBuilder create(SecurityProtocol securityProtocol,
117114
ListenerName listenerName,
118115
boolean isInterBrokerListener,
119116
String clientSaslMechanism,
120-
boolean saslHandshakeRequestEnable,
121117
CredentialCache credentialCache,
122118
DelegationTokenCache tokenCache,
123119
Time time,
@@ -175,7 +171,6 @@ private static ChannelBuilder create(SecurityProtocol securityProtocol,
175171
listenerName,
176172
isInterBrokerListener,
177173
clientSaslMechanism,
178-
saslHandshakeRequestEnable,
179174
credentialCache,
180175
tokenCache,
181176
sslClientAuthOverride,

clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java

+1-4
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,6 @@ public class SaslChannelBuilder implements ChannelBuilder, ListenerReconfigurabl
8585
private final String clientSaslMechanism;
8686
private final ConnectionMode connectionMode;
8787
private final Map<String, JaasContext> jaasContexts;
88-
private final boolean handshakeRequestEnable;
8988
private final CredentialCache credentialCache;
9089
private final DelegationTokenCache tokenCache;
9190
private final Map<String, LoginManager> loginManagers;
@@ -108,7 +107,6 @@ public SaslChannelBuilder(ConnectionMode connectionMode,
108107
ListenerName listenerName,
109108
boolean isInterBrokerListener,
110109
String clientSaslMechanism,
111-
boolean handshakeRequestEnable,
112110
CredentialCache credentialCache,
113111
DelegationTokenCache tokenCache,
114112
String sslClientAuthOverride,
@@ -122,7 +120,6 @@ public SaslChannelBuilder(ConnectionMode connectionMode,
122120
this.securityProtocol = securityProtocol;
123121
this.listenerName = listenerName;
124122
this.isInterBrokerListener = isInterBrokerListener;
125-
this.handshakeRequestEnable = handshakeRequestEnable;
126123
this.clientSaslMechanism = clientSaslMechanism;
127124
this.credentialCache = credentialCache;
128125
this.tokenCache = tokenCache;
@@ -295,7 +292,7 @@ protected SaslClientAuthenticator buildClientAuthenticator(Map<String, ?> config
295292
String servicePrincipal,
296293
TransportLayer transportLayer, Subject subject) {
297294
return new SaslClientAuthenticator(configs, callbackHandler, id, subject, servicePrincipal,
298-
serverHost, clientSaslMechanism, handshakeRequestEnable, transportLayer, time, logContext);
295+
serverHost, clientSaslMechanism, transportLayer, time, logContext);
299296
}
300297

301298
// Package private for testing

clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionRequest.java

+2-4
Original file line numberDiff line numberDiff line change
@@ -68,14 +68,12 @@ public static class Builder extends AbstractRequest.Builder<AlterPartitionReques
6868
* @param data The data to be sent. Note that because the version of the
6969
* request is not known at this time, it is expected that all
7070
* topics have a topic id and a topic name set.
71-
* @param canUseTopicIds True if version 2 and above can be used.
7271
*/
73-
public Builder(AlterPartitionRequestData data, boolean canUseTopicIds) {
72+
public Builder(AlterPartitionRequestData data) {
7473
super(
7574
ApiKeys.ALTER_PARTITION,
7675
ApiKeys.ALTER_PARTITION.oldestVersion(),
77-
// Version 1 is the maximum version that can be used without topic ids.
78-
canUseTopicIds ? ApiKeys.ALTER_PARTITION.latestVersion() : 1
76+
ApiKeys.ALTER_PARTITION.latestVersion()
7977
);
8078
this.data = data;
8179
}

clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java

+23-36
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import org.apache.kafka.common.protocol.ApiKeys;
3232
import org.apache.kafka.common.protocol.ByteBufferAccessor;
3333
import org.apache.kafka.common.protocol.Errors;
34-
import org.apache.kafka.common.record.RecordVersion;
3534

3635
import java.nio.ByteBuffer;
3736
import java.util.Map;
@@ -172,35 +171,30 @@ public static ApiVersionsResponse parse(ByteBuffer buffer, short version) {
172171
}
173172

174173
public static ApiVersionCollection controllerApiVersions(
175-
RecordVersion minRecordVersion,
176174
NodeApiVersions controllerApiVersions,
177175
ListenerType listenerType,
178176
boolean enableUnstableLastVersion,
179177
boolean clientTelemetryEnabled
180178
) {
181179
return intersectForwardableApis(
182180
listenerType,
183-
minRecordVersion,
184181
controllerApiVersions.allSupportedApiVersions(),
185182
enableUnstableLastVersion,
186183
clientTelemetryEnabled);
187184
}
188185

189186
public static ApiVersionCollection brokerApiVersions(
190-
RecordVersion minRecordVersion,
191187
ListenerType listenerType,
192188
boolean enableUnstableLastVersion,
193189
boolean clientTelemetryEnabled
194190
) {
195191
return filterApis(
196-
minRecordVersion,
197192
listenerType,
198193
enableUnstableLastVersion,
199194
clientTelemetryEnabled);
200195
}
201196

202197
public static ApiVersionCollection filterApis(
203-
RecordVersion minRecordVersion,
204198
ApiMessageType.ListenerType listenerType,
205199
boolean enableUnstableLastVersion,
206200
boolean clientTelemetryEnabled
@@ -210,10 +204,7 @@ public static ApiVersionCollection filterApis(
210204
// Skip telemetry APIs if client telemetry is disabled.
211205
if ((apiKey == ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS || apiKey == ApiKeys.PUSH_TELEMETRY) && !clientTelemetryEnabled)
212206
continue;
213-
214-
if (apiKey.minRequiredInterBrokerMagic <= minRecordVersion.value) {
215-
apiKey.toApiVersion(enableUnstableLastVersion).ifPresent(apiKeys::add);
216-
}
207+
apiKey.toApiVersion(enableUnstableLastVersion).ifPresent(apiKeys::add);
217208
}
218209
return apiKeys;
219210
}
@@ -234,50 +225,46 @@ public static ApiVersionCollection collectApis(
234225
* known range and that of another set.
235226
*
236227
* @param listenerType the listener type which constrains the set of exposed APIs
237-
* @param minRecordVersion min inter broker magic
238228
* @param activeControllerApiVersions controller ApiVersions
239229
* @param enableUnstableLastVersion whether unstable versions should be advertised or not
240230
* @param clientTelemetryEnabled whether client telemetry is enabled or not
241231
* @return commonly agreed ApiVersion collection
242232
*/
243233
public static ApiVersionCollection intersectForwardableApis(
244234
final ApiMessageType.ListenerType listenerType,
245-
final RecordVersion minRecordVersion,
246235
final Map<ApiKeys, ApiVersion> activeControllerApiVersions,
247236
boolean enableUnstableLastVersion,
248237
boolean clientTelemetryEnabled
249238
) {
250239
ApiVersionCollection apiKeys = new ApiVersionCollection();
251240
for (ApiKeys apiKey : ApiKeys.apisForListener(listenerType)) {
252-
if (apiKey.minRequiredInterBrokerMagic <= minRecordVersion.value) {
253-
final Optional<ApiVersion> brokerApiVersion = apiKey.toApiVersion(enableUnstableLastVersion);
254-
if (brokerApiVersion.isEmpty()) {
255-
// Broker does not support this API key.
256-
continue;
257-
}
241+
final Optional<ApiVersion> brokerApiVersion = apiKey.toApiVersion(enableUnstableLastVersion);
242+
if (brokerApiVersion.isEmpty()) {
243+
// Broker does not support this API key.
244+
continue;
245+
}
258246

259-
// Skip telemetry APIs if client telemetry is disabled.
260-
if ((apiKey == ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS || apiKey == ApiKeys.PUSH_TELEMETRY) && !clientTelemetryEnabled)
261-
continue;
247+
// Skip telemetry APIs if client telemetry is disabled.
248+
if ((apiKey == ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS || apiKey == ApiKeys.PUSH_TELEMETRY) && !clientTelemetryEnabled)
249+
continue;
262250

263-
final ApiVersion finalApiVersion;
264-
if (!apiKey.forwardable) {
265-
finalApiVersion = brokerApiVersion.get();
251+
final ApiVersion finalApiVersion;
252+
if (!apiKey.forwardable) {
253+
finalApiVersion = brokerApiVersion.get();
254+
} else {
255+
Optional<ApiVersion> intersectVersion = intersect(
256+
brokerApiVersion.get(),
257+
activeControllerApiVersions.getOrDefault(apiKey, null)
258+
);
259+
if (intersectVersion.isPresent()) {
260+
finalApiVersion = intersectVersion.get();
266261
} else {
267-
Optional<ApiVersion> intersectVersion = intersect(
268-
brokerApiVersion.get(),
269-
activeControllerApiVersions.getOrDefault(apiKey, null)
270-
);
271-
if (intersectVersion.isPresent()) {
272-
finalApiVersion = intersectVersion.get();
273-
} else {
274-
// Controller doesn't support this API key, or there is no intersection.
275-
continue;
276-
}
262+
// Controller doesn't support this API key, or there is no intersection.
263+
continue;
277264
}
278-
279-
apiKeys.add(finalApiVersion.duplicate());
280265
}
266+
267+
apiKeys.add(finalApiVersion.duplicate());
281268
}
282269
return apiKeys;
283270
}

clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -65,11 +65,12 @@ public static Builder forConsumer(OffsetForLeaderTopicCollection epochsByPartiti
6565
return new Builder((short) 3, ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(), data);
6666
}
6767

68-
public static Builder forFollower(short version, OffsetForLeaderTopicCollection epochsByPartition, int replicaId) {
68+
public static Builder forFollower(OffsetForLeaderTopicCollection epochsByPartition, int replicaId) {
6969
OffsetForLeaderEpochRequestData data = new OffsetForLeaderEpochRequestData();
7070
data.setReplicaId(replicaId);
7171
data.setTopics(epochsByPartition);
72-
return new Builder(version, version, data);
72+
// If we introduce new versions, we should gate them behind the appropriate metadata version
73+
return new Builder((short) 4, (short) 4, data);
7374
}
7475

7576
@Override

clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -110,8 +110,8 @@ public Builder(WriteTxnMarkersRequestData data) {
110110
this.data = data;
111111
}
112112

113-
public Builder(short version, final List<TxnMarkerEntry> markers) {
114-
super(ApiKeys.WRITE_TXN_MARKERS, version);
113+
public Builder(final List<TxnMarkerEntry> markers) {
114+
super(ApiKeys.WRITE_TXN_MARKERS, (short) 1); // if we add new versions, gate them behind metadata version
115115
List<WritableTxnMarker> dataMarkers = new ArrayList<>();
116116
for (TxnMarkerEntry marker : markers) {
117117
final Map<String, WritableTxnMarkerTopic> topicMap = new HashMap<>();

clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,6 @@ public SaslClientAuthenticator(Map<String, ?> configs,
177177
String servicePrincipal,
178178
String host,
179179
String mechanism,
180-
boolean handshakeRequestEnable,
181180
TransportLayer transportLayer,
182181
Time time,
183182
LogContext logContext) {
@@ -196,7 +195,7 @@ public SaslClientAuthenticator(Map<String, ?> configs,
196195
this.reauthInfo = new ReauthInfo();
197196

198197
try {
199-
setSaslState(handshakeRequestEnable ? SaslState.SEND_APIVERSIONS_REQUEST : SaslState.INITIAL);
198+
setSaslState(SaslState.SEND_APIVERSIONS_REQUEST);
200199

201200
// determine client principal from subject for Kerberos to use as authorization id for the SaslClient.
202201
// For other mechanisms, the authenticated principal (username for PLAIN and SCRAM) is used as

clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,6 @@
161161
import org.apache.kafka.common.quota.ClientQuotaEntity;
162162
import org.apache.kafka.common.quota.ClientQuotaFilter;
163163
import org.apache.kafka.common.quota.ClientQuotaFilterComponent;
164-
import org.apache.kafka.common.record.RecordVersion;
165164
import org.apache.kafka.common.requests.AddRaftVoterRequest;
166165
import org.apache.kafka.common.requests.AddRaftVoterResponse;
167166
import org.apache.kafka.common.requests.AlterClientQuotasResponse;
@@ -777,7 +776,7 @@ private static ApiVersionsResponse prepareApiVersionsResponseForDescribeFeatures
777776
if (error == Errors.NONE) {
778777
return new ApiVersionsResponse.Builder().
779778
setApiVersions(ApiVersionsResponse.filterApis(
780-
RecordVersion.current(), ApiMessageType.ListenerType.ZK_BROKER, false, false)).
779+
ApiMessageType.ListenerType.ZK_BROKER, false, false)).
781780
setSupportedFeatures(
782781
convertSupportedFeaturesMap(defaultFeatureMetadata().supportedFeatures())).
783782
setFinalizedFeatures(

clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ public void testClientChannelBuilderWithBrokerConfigs() throws Exception {
166166
private SaslChannelBuilder createGssapiChannelBuilder(Map<String, JaasContext> jaasContexts, GSSManager gssManager) {
167167
SaslChannelBuilder channelBuilder = new SaslChannelBuilder(ConnectionMode.SERVER, jaasContexts,
168168
SecurityProtocol.SASL_PLAINTEXT, new ListenerName("GSSAPI"), false, "GSSAPI",
169-
true, null, null, null, Time.SYSTEM, new LogContext(), defaultApiVersionsSupplier()) {
169+
null, null, null, Time.SYSTEM, new LogContext(), defaultApiVersionsSupplier()) {
170170

171171
@Override
172172
protected GSSManager gssManager() {
@@ -205,7 +205,7 @@ private SaslChannelBuilder createChannelBuilder(SecurityProtocol securityProtoco
205205
JaasContext jaasContext = new JaasContext("jaasContext", JaasContext.Type.SERVER, jaasConfig, null);
206206
Map<String, JaasContext> jaasContexts = Collections.singletonMap(saslMechanism, jaasContext);
207207
return new SaslChannelBuilder(ConnectionMode.CLIENT, jaasContexts, securityProtocol, new ListenerName(saslMechanism),
208-
false, saslMechanism, true, null,
208+
false, saslMechanism, null,
209209
null, null, Time.SYSTEM, new LogContext(), defaultApiVersionsSupplier());
210210
}
211211

clients/src/test/java/org/apache/kafka/common/requests/AlterPartitionRequestTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public void testBuildAlterPartitionRequest(short version) {
6161

6262
request.topics().add(topicData);
6363

64-
AlterPartitionRequest.Builder builder = new AlterPartitionRequest.Builder(request, version > 1);
64+
AlterPartitionRequest.Builder builder = new AlterPartitionRequest.Builder(request);
6565
AlterPartitionRequest alterPartitionRequest = builder.build(version);
6666
assertEquals(1, alterPartitionRequest.data().topics().size());
6767
assertEquals(1, alterPartitionRequest.data().topics().get(0).partitions().size());

0 commit comments

Comments
 (0)