Skip to content

Commit feaafc5

Browse files
committed
KAFKA-18399 Remove ZooKeeper from KafkaApis (6/N): USER_SCRAM_CREDENTIALS
1 parent cd061c8 commit feaafc5

File tree

3 files changed

+28
-64
lines changed

3 files changed

+28
-64
lines changed

Diff for: core/src/main/scala/kafka/server/KafkaApis.scala

+27-56
Original file line numberDiff line numberDiff line change
@@ -136,17 +136,19 @@ class KafkaApis(val requestChannel: RequestChannel,
136136
info("Shutdown complete.")
137137
}
138138

139-
private def maybeForwardToController(
140-
request: RequestChannel.Request,
141-
handler: RequestChannel.Request => Unit
142-
): Unit = {
139+
private def forwardToController(request: RequestChannel.Request): Unit = {
143140
def responseCallback(responseOpt: Option[AbstractResponse]): Unit = {
144141
responseOpt match {
145142
case Some(response) => requestHelper.sendForwardedResponse(request, response)
146143
case None => handleInvalidVersionsDuringForwarding(request)
147144
}
148145
}
149-
metadataSupport.maybeForward(request, handler, responseCallback)
146+
147+
def errorHandler(request: RequestChannel.Request): Unit = {
148+
throw new IllegalStateException(s"Unable to forward $request to the controller")
149+
}
150+
151+
metadataSupport.maybeForward(request, errorHandler, responseCallback)
150152
}
151153

152154
private def handleInvalidVersionsDuringForwarding(request: RequestChannel.Request): Unit = {
@@ -156,16 +158,6 @@ class KafkaApis(val requestChannel: RequestChannel,
156158
requestChannel.closeConnection(request, Collections.emptyMap())
157159
}
158160

159-
private def forwardToControllerOrFail(
160-
request: RequestChannel.Request
161-
): Unit = {
162-
def errorHandler(request: RequestChannel.Request): Unit = {
163-
throw new IllegalStateException(s"Unable to forward $request to the controller")
164-
}
165-
166-
maybeForwardToController(request, errorHandler)
167-
}
168-
169161
/**
170162
* Top-level method that handles all requests and multiplexes to the right api
171163
*/
@@ -202,8 +194,8 @@ class KafkaApis(val requestChannel: RequestChannel,
202194
case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request).exceptionally(handleError)
203195
case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
204196
case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
205-
case ApiKeys.CREATE_TOPICS => maybeForwardToController(request, handleCreateTopicsRequest)
206-
case ApiKeys.DELETE_TOPICS => maybeForwardToController(request, handleDeleteTopicsRequest)
197+
case ApiKeys.CREATE_TOPICS => forwardToController(request)
198+
case ApiKeys.DELETE_TOPICS => forwardToController(request)
207199
case ApiKeys.DELETE_RECORDS => handleDeleteRecordsRequest(request)
208200
case ApiKeys.INIT_PRODUCER_ID => handleInitProducerIdRequest(request, requestLocal)
209201
case ApiKeys.OFFSET_FOR_LEADER_EPOCH => handleOffsetForLeaderEpochRequest(request)
@@ -213,47 +205,47 @@ class KafkaApis(val requestChannel: RequestChannel,
213205
case ApiKeys.WRITE_TXN_MARKERS => handleWriteTxnMarkersRequest(request, requestLocal)
214206
case ApiKeys.TXN_OFFSET_COMMIT => handleTxnOffsetCommitRequest(request, requestLocal).exceptionally(handleError)
215207
case ApiKeys.DESCRIBE_ACLS => handleDescribeAcls(request)
216-
case ApiKeys.CREATE_ACLS => maybeForwardToController(request, handleCreateAcls)
217-
case ApiKeys.DELETE_ACLS => maybeForwardToController(request, handleDeleteAcls)
208+
case ApiKeys.CREATE_ACLS => forwardToController(request)
209+
case ApiKeys.DELETE_ACLS => forwardToController(request)
218210
case ApiKeys.ALTER_CONFIGS => handleAlterConfigsRequest(request)
219211
case ApiKeys.DESCRIBE_CONFIGS => handleDescribeConfigsRequest(request)
220212
case ApiKeys.ALTER_REPLICA_LOG_DIRS => handleAlterReplicaLogDirsRequest(request)
221213
case ApiKeys.DESCRIBE_LOG_DIRS => handleDescribeLogDirsRequest(request)
222214
case ApiKeys.SASL_AUTHENTICATE => handleSaslAuthenticateRequest(request)
223-
case ApiKeys.CREATE_PARTITIONS => maybeForwardToController(request, handleCreatePartitionsRequest)
215+
case ApiKeys.CREATE_PARTITIONS => forwardToController(request)
224216
// Create, renew and expire DelegationTokens must first validate that the connection
225217
// itself is not authenticated with a delegation token before maybeForwardToController.
226218
case ApiKeys.CREATE_DELEGATION_TOKEN => handleCreateTokenRequest(request)
227219
case ApiKeys.RENEW_DELEGATION_TOKEN => handleRenewTokenRequest(request)
228220
case ApiKeys.EXPIRE_DELEGATION_TOKEN => handleExpireTokenRequest(request)
229221
case ApiKeys.DESCRIBE_DELEGATION_TOKEN => handleDescribeTokensRequest(request)
230222
case ApiKeys.DELETE_GROUPS => handleDeleteGroupsRequest(request, requestLocal).exceptionally(handleError)
231-
case ApiKeys.ELECT_LEADERS => maybeForwardToController(request, handleElectLeaders)
223+
case ApiKeys.ELECT_LEADERS => forwardToController(request)
232224
case ApiKeys.INCREMENTAL_ALTER_CONFIGS => handleIncrementalAlterConfigsRequest(request)
233-
case ApiKeys.ALTER_PARTITION_REASSIGNMENTS => maybeForwardToController(request, handleAlterPartitionReassignmentsRequest)
234-
case ApiKeys.LIST_PARTITION_REASSIGNMENTS => maybeForwardToController(request, handleListPartitionReassignmentsRequest)
225+
case ApiKeys.ALTER_PARTITION_REASSIGNMENTS => forwardToController(request)
226+
case ApiKeys.LIST_PARTITION_REASSIGNMENTS => forwardToController(request)
235227
case ApiKeys.OFFSET_DELETE => handleOffsetDeleteRequest(request, requestLocal).exceptionally(handleError)
236228
case ApiKeys.DESCRIBE_CLIENT_QUOTAS => handleDescribeClientQuotasRequest(request)
237-
case ApiKeys.ALTER_CLIENT_QUOTAS => maybeForwardToController(request, handleAlterClientQuotasRequest)
229+
case ApiKeys.ALTER_CLIENT_QUOTAS => forwardToController(request)
238230
case ApiKeys.DESCRIBE_USER_SCRAM_CREDENTIALS => handleDescribeUserScramCredentialsRequest(request)
239-
case ApiKeys.ALTER_USER_SCRAM_CREDENTIALS => maybeForwardToController(request, handleAlterUserScramCredentialsRequest)
231+
case ApiKeys.ALTER_USER_SCRAM_CREDENTIALS => forwardToController(request)
240232
case ApiKeys.ALTER_PARTITION => handleAlterPartitionRequest(request)
241-
case ApiKeys.UPDATE_FEATURES => maybeForwardToController(request, handleUpdateFeatures)
233+
case ApiKeys.UPDATE_FEATURES => forwardToController(request)
242234
case ApiKeys.DESCRIBE_CLUSTER => handleDescribeCluster(request)
243235
case ApiKeys.DESCRIBE_PRODUCERS => handleDescribeProducersRequest(request)
244-
case ApiKeys.UNREGISTER_BROKER => forwardToControllerOrFail(request)
236+
case ApiKeys.UNREGISTER_BROKER => forwardToController(request)
245237
case ApiKeys.DESCRIBE_TRANSACTIONS => handleDescribeTransactionsRequest(request)
246238
case ApiKeys.LIST_TRANSACTIONS => handleListTransactionsRequest(request)
247239
case ApiKeys.ALLOCATE_PRODUCER_IDS => handleAllocateProducerIdsRequest(request)
248-
case ApiKeys.DESCRIBE_QUORUM => forwardToControllerOrFail(request)
240+
case ApiKeys.DESCRIBE_QUORUM => forwardToController(request)
249241
case ApiKeys.CONSUMER_GROUP_HEARTBEAT => handleConsumerGroupHeartbeat(request).exceptionally(handleError)
250242
case ApiKeys.CONSUMER_GROUP_DESCRIBE => handleConsumerGroupDescribe(request).exceptionally(handleError)
251243
case ApiKeys.DESCRIBE_TOPIC_PARTITIONS => handleDescribeTopicPartitionsRequest(request)
252244
case ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS => handleGetTelemetrySubscriptionsRequest(request)
253245
case ApiKeys.PUSH_TELEMETRY => handlePushTelemetryRequest(request)
254246
case ApiKeys.LIST_CLIENT_METRICS_RESOURCES => handleListClientMetricsResources(request)
255-
case ApiKeys.ADD_RAFT_VOTER => forwardToControllerOrFail(request)
256-
case ApiKeys.REMOVE_RAFT_VOTER => forwardToControllerOrFail(request)
247+
case ApiKeys.ADD_RAFT_VOTER => forwardToController(request)
248+
case ApiKeys.REMOVE_RAFT_VOTER => forwardToController(request)
257249
case ApiKeys.SHARE_GROUP_HEARTBEAT => handleShareGroupHeartbeat(request).exceptionally(handleError)
258250
case ApiKeys.SHARE_GROUP_DESCRIBE => handleShareGroupDescribe(request).exceptionally(handleError)
259251
case ApiKeys.SHARE_FETCH => handleShareFetchRequest(request)
@@ -2806,7 +2798,7 @@ class KafkaApis(val requestChannel: RequestChannel,
28062798
CreateDelegationTokenResponse.prepareResponse(request.context.requestVersion, requestThrottleMs,
28072799
Errors.INVALID_PRINCIPAL_TYPE, owner, requester))
28082800
} else {
2809-
maybeForwardToController(request, handleCreateTokenRequestZk)
2801+
forwardToController(request)
28102802
}
28112803
}
28122804

@@ -2859,7 +2851,7 @@ class KafkaApis(val requestChannel: RequestChannel,
28592851
.setErrorCode(Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED.code)
28602852
.setExpiryTimestampMs(DelegationTokenManager.ErrorTimestamp)))
28612853
} else {
2862-
maybeForwardToController(request, handleRenewTokenRequestZk)
2854+
forwardToController(request)
28632855
}
28642856
}
28652857

@@ -2905,7 +2897,7 @@ class KafkaApis(val requestChannel: RequestChannel,
29052897
.setErrorCode(Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED.code)
29062898
.setExpiryTimestampMs(DelegationTokenManager.ErrorTimestamp)))
29072899
} else {
2908-
maybeForwardToController(request, handleExpireTokenRequestZk)
2900+
forwardToController(request)
29092901
}
29102902
}
29112903

@@ -3208,37 +3200,16 @@ class KafkaApis(val requestChannel: RequestChannel,
32083200
describeUserScramCredentialsRequest.getErrorResponse(requestThrottleMs, Errors.CLUSTER_AUTHORIZATION_FAILED.exception))
32093201
} else {
32103202
metadataSupport match {
3211-
case ZkSupport(adminManager, controller, zkClient, forwardingManager, metadataCache, _) =>
3212-
val result = adminManager.describeUserScramCredentials(
3213-
Option(describeUserScramCredentialsRequest.data.users).map(_.asScala.map(_.name).toList))
3214-
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
3215-
new DescribeUserScramCredentialsResponse(result.setThrottleTimeMs(requestThrottleMs)))
32163203
case RaftSupport(_, metadataCache) =>
32173204
val result = metadataCache.describeScramCredentials(describeUserScramCredentialsRequest.data())
32183205
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
32193206
new DescribeUserScramCredentialsResponse(result.setThrottleTimeMs(requestThrottleMs)))
3207+
case _ =>
3208+
throw KafkaApis.shouldNeverReceive(request)
32203209
}
32213210
}
32223211
}
32233212

3224-
def handleAlterUserScramCredentialsRequest(request: RequestChannel.Request): Unit = {
3225-
val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request))
3226-
val alterUserScramCredentialsRequest = request.body[AlterUserScramCredentialsRequest]
3227-
3228-
if (!zkSupport.controller.isActive) {
3229-
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
3230-
alterUserScramCredentialsRequest.getErrorResponse(requestThrottleMs, Errors.NOT_CONTROLLER.exception))
3231-
} else if (authHelper.authorize(request.context, ALTER, CLUSTER, CLUSTER_NAME)) {
3232-
val result = zkSupport.adminManager.alterUserScramCredentials(
3233-
alterUserScramCredentialsRequest.data.upsertions().asScala, alterUserScramCredentialsRequest.data.deletions().asScala)
3234-
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
3235-
new AlterUserScramCredentialsResponse(result.setThrottleTimeMs(requestThrottleMs)))
3236-
} else {
3237-
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
3238-
alterUserScramCredentialsRequest.getErrorResponse(requestThrottleMs, Errors.CLUSTER_AUTHORIZATION_FAILED.exception))
3239-
}
3240-
}
3241-
32423213
def handleAlterPartitionRequest(request: RequestChannel.Request): Unit = {
32433214
val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldNeverReceive(request))
32443215
val alterPartitionRequest = request.body[AlterPartitionRequest]

Diff for: core/src/main/scala/kafka/server/MetadataSupport.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ sealed trait MetadataSupport {
6363
handler: RequestChannel.Request => Unit,
6464
responseCallback: Option[AbstractResponse] => Unit
6565
): Unit = {
66-
if (!request.isForwarded && canForward()) {
66+
if (!request.isForwarded) {
6767
forwardingManager.get.forwardRequest(request, responseCallback)
6868
} else {
6969
handler(request)

Diff for: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala

-7
Original file line numberDiff line numberDiff line change
@@ -10514,13 +10514,6 @@ class KafkaApisTest extends Logging {
1051410514
verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleAlterClientQuotasRequest)
1051510515
}
1051610516

10517-
@Test
10518-
def testRaftShouldAlwaysForwardAlterUserScramCredentialsRequest(): Unit = {
10519-
metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
10520-
kafkaApis = createKafkaApis(raftSupport = true)
10521-
verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleAlterUserScramCredentialsRequest)
10522-
}
10523-
1052410517
@Test
1052510518
def testRaftShouldAlwaysForwardUpdateFeatures(): Unit = {
1052610519
metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)

0 commit comments

Comments
 (0)