Skip to content

Commit f50fd46

Browse files
frankvickymanoj-mathivanan
authored andcommitted
KAFKA-18399 Remove ZooKeeper from KafkaApis (3/N): USER_SCRAM_CREDENTIALS (apache#18456)
Reviewers: Chia-Ping Tsai <[email protected]>
1 parent b330f05 commit f50fd46

File tree

3 files changed

+26
-215
lines changed

3 files changed

+26
-215
lines changed

Diff for: core/src/main/scala/kafka/server/KafkaApis.scala

+23-56
Original file line numberDiff line numberDiff line change
@@ -135,17 +135,15 @@ class KafkaApis(val requestChannel: RequestChannel,
135135
info("Shutdown complete.")
136136
}
137137

138-
private def maybeForwardToController(
139-
request: RequestChannel.Request,
140-
handler: RequestChannel.Request => Unit
141-
): Unit = {
138+
private def forwardToController(request: RequestChannel.Request): Unit = {
142139
def responseCallback(responseOpt: Option[AbstractResponse]): Unit = {
143140
responseOpt match {
144141
case Some(response) => requestHelper.sendForwardedResponse(request, response)
145142
case None => handleInvalidVersionsDuringForwarding(request)
146143
}
147144
}
148-
metadataSupport.maybeForward(request, handler, responseCallback)
145+
146+
metadataSupport.forward(request, responseCallback)
149147
}
150148

151149
private def handleInvalidVersionsDuringForwarding(request: RequestChannel.Request): Unit = {
@@ -155,16 +153,6 @@ class KafkaApis(val requestChannel: RequestChannel,
155153
requestChannel.closeConnection(request, Collections.emptyMap())
156154
}
157155

158-
private def forwardToControllerOrFail(
159-
request: RequestChannel.Request
160-
): Unit = {
161-
def errorHandler(request: RequestChannel.Request): Unit = {
162-
throw new IllegalStateException(s"Unable to forward $request to the controller")
163-
}
164-
165-
maybeForwardToController(request, errorHandler)
166-
}
167-
168156
/**
169157
* Top-level method that handles all requests and multiplexes to the right api
170158
*/
@@ -201,8 +189,8 @@ class KafkaApis(val requestChannel: RequestChannel,
201189
case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request).exceptionally(handleError)
202190
case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
203191
case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
204-
case ApiKeys.CREATE_TOPICS => maybeForwardToController(request, handleCreateTopicsRequest)
205-
case ApiKeys.DELETE_TOPICS => maybeForwardToController(request, handleDeleteTopicsRequest)
192+
case ApiKeys.CREATE_TOPICS => forwardToController(request)
193+
case ApiKeys.DELETE_TOPICS => forwardToController(request)
206194
case ApiKeys.DELETE_RECORDS => handleDeleteRecordsRequest(request)
207195
case ApiKeys.INIT_PRODUCER_ID => handleInitProducerIdRequest(request, requestLocal)
208196
case ApiKeys.OFFSET_FOR_LEADER_EPOCH => handleOffsetForLeaderEpochRequest(request)
@@ -212,47 +200,47 @@ class KafkaApis(val requestChannel: RequestChannel,
212200
case ApiKeys.WRITE_TXN_MARKERS => handleWriteTxnMarkersRequest(request, requestLocal)
213201
case ApiKeys.TXN_OFFSET_COMMIT => handleTxnOffsetCommitRequest(request, requestLocal).exceptionally(handleError)
214202
case ApiKeys.DESCRIBE_ACLS => handleDescribeAcls(request)
215-
case ApiKeys.CREATE_ACLS => maybeForwardToController(request, handleCreateAcls)
216-
case ApiKeys.DELETE_ACLS => maybeForwardToController(request, handleDeleteAcls)
203+
case ApiKeys.CREATE_ACLS => forwardToController(request)
204+
case ApiKeys.DELETE_ACLS => forwardToController(request)
217205
case ApiKeys.ALTER_CONFIGS => handleAlterConfigsRequest(request)
218206
case ApiKeys.DESCRIBE_CONFIGS => handleDescribeConfigsRequest(request)
219207
case ApiKeys.ALTER_REPLICA_LOG_DIRS => handleAlterReplicaLogDirsRequest(request)
220208
case ApiKeys.DESCRIBE_LOG_DIRS => handleDescribeLogDirsRequest(request)
221209
case ApiKeys.SASL_AUTHENTICATE => handleSaslAuthenticateRequest(request)
222-
case ApiKeys.CREATE_PARTITIONS => maybeForwardToController(request, handleCreatePartitionsRequest)
210+
case ApiKeys.CREATE_PARTITIONS => forwardToController(request)
223211
// Create, renew and expire DelegationTokens must first validate that the connection
224212
// itself is not authenticated with a delegation token before maybeForwardToController.
225213
case ApiKeys.CREATE_DELEGATION_TOKEN => handleCreateTokenRequest(request)
226214
case ApiKeys.RENEW_DELEGATION_TOKEN => handleRenewTokenRequest(request)
227215
case ApiKeys.EXPIRE_DELEGATION_TOKEN => handleExpireTokenRequest(request)
228216
case ApiKeys.DESCRIBE_DELEGATION_TOKEN => handleDescribeTokensRequest(request)
229217
case ApiKeys.DELETE_GROUPS => handleDeleteGroupsRequest(request, requestLocal).exceptionally(handleError)
230-
case ApiKeys.ELECT_LEADERS => maybeForwardToController(request, handleElectLeaders)
218+
case ApiKeys.ELECT_LEADERS => forwardToController(request)
231219
case ApiKeys.INCREMENTAL_ALTER_CONFIGS => handleIncrementalAlterConfigsRequest(request)
232-
case ApiKeys.ALTER_PARTITION_REASSIGNMENTS => maybeForwardToController(request, handleAlterPartitionReassignmentsRequest)
233-
case ApiKeys.LIST_PARTITION_REASSIGNMENTS => maybeForwardToController(request, handleListPartitionReassignmentsRequest)
220+
case ApiKeys.ALTER_PARTITION_REASSIGNMENTS => forwardToController(request)
221+
case ApiKeys.LIST_PARTITION_REASSIGNMENTS => forwardToController(request)
234222
case ApiKeys.OFFSET_DELETE => handleOffsetDeleteRequest(request, requestLocal).exceptionally(handleError)
235223
case ApiKeys.DESCRIBE_CLIENT_QUOTAS => handleDescribeClientQuotasRequest(request)
236-
case ApiKeys.ALTER_CLIENT_QUOTAS => maybeForwardToController(request, handleAlterClientQuotasRequest)
224+
case ApiKeys.ALTER_CLIENT_QUOTAS => forwardToController(request)
237225
case ApiKeys.DESCRIBE_USER_SCRAM_CREDENTIALS => handleDescribeUserScramCredentialsRequest(request)
238-
case ApiKeys.ALTER_USER_SCRAM_CREDENTIALS => maybeForwardToController(request, handleAlterUserScramCredentialsRequest)
226+
case ApiKeys.ALTER_USER_SCRAM_CREDENTIALS => forwardToController(request)
239227
case ApiKeys.ALTER_PARTITION => handleAlterPartitionRequest(request)
240-
case ApiKeys.UPDATE_FEATURES => maybeForwardToController(request, handleUpdateFeatures)
228+
case ApiKeys.UPDATE_FEATURES => forwardToController(request)
241229
case ApiKeys.DESCRIBE_CLUSTER => handleDescribeCluster(request)
242230
case ApiKeys.DESCRIBE_PRODUCERS => handleDescribeProducersRequest(request)
243-
case ApiKeys.UNREGISTER_BROKER => forwardToControllerOrFail(request)
231+
case ApiKeys.UNREGISTER_BROKER => forwardToController(request)
244232
case ApiKeys.DESCRIBE_TRANSACTIONS => handleDescribeTransactionsRequest(request)
245233
case ApiKeys.LIST_TRANSACTIONS => handleListTransactionsRequest(request)
246234
case ApiKeys.ALLOCATE_PRODUCER_IDS => handleAllocateProducerIdsRequest(request)
247-
case ApiKeys.DESCRIBE_QUORUM => forwardToControllerOrFail(request)
235+
case ApiKeys.DESCRIBE_QUORUM => forwardToController(request)
248236
case ApiKeys.CONSUMER_GROUP_HEARTBEAT => handleConsumerGroupHeartbeat(request).exceptionally(handleError)
249237
case ApiKeys.CONSUMER_GROUP_DESCRIBE => handleConsumerGroupDescribe(request).exceptionally(handleError)
250238
case ApiKeys.DESCRIBE_TOPIC_PARTITIONS => handleDescribeTopicPartitionsRequest(request)
251239
case ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS => handleGetTelemetrySubscriptionsRequest(request)
252240
case ApiKeys.PUSH_TELEMETRY => handlePushTelemetryRequest(request)
253241
case ApiKeys.LIST_CLIENT_METRICS_RESOURCES => handleListClientMetricsResources(request)
254-
case ApiKeys.ADD_RAFT_VOTER => forwardToControllerOrFail(request)
255-
case ApiKeys.REMOVE_RAFT_VOTER => forwardToControllerOrFail(request)
242+
case ApiKeys.ADD_RAFT_VOTER => forwardToController(request)
243+
case ApiKeys.REMOVE_RAFT_VOTER => forwardToController(request)
256244
case ApiKeys.SHARE_GROUP_HEARTBEAT => handleShareGroupHeartbeat(request).exceptionally(handleError)
257245
case ApiKeys.SHARE_GROUP_DESCRIBE => handleShareGroupDescribe(request).exceptionally(handleError)
258246
case ApiKeys.SHARE_FETCH => handleShareFetchRequest(request)
@@ -2771,7 +2759,7 @@ class KafkaApis(val requestChannel: RequestChannel,
27712759
CreateDelegationTokenResponse.prepareResponse(request.context.requestVersion, requestThrottleMs,
27722760
Errors.INVALID_PRINCIPAL_TYPE, owner, requester))
27732761
} else {
2774-
maybeForwardToController(request, handleCreateTokenRequestZk)
2762+
forwardToController(request)
27752763
}
27762764
}
27772765

@@ -2824,7 +2812,7 @@ class KafkaApis(val requestChannel: RequestChannel,
28242812
.setErrorCode(Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED.code)
28252813
.setExpiryTimestampMs(DelegationTokenManager.ErrorTimestamp)))
28262814
} else {
2827-
maybeForwardToController(request, handleRenewTokenRequestZk)
2815+
forwardToController(request)
28282816
}
28292817
}
28302818

@@ -2870,7 +2858,7 @@ class KafkaApis(val requestChannel: RequestChannel,
28702858
.setErrorCode(Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED.code)
28712859
.setExpiryTimestampMs(DelegationTokenManager.ErrorTimestamp)))
28722860
} else {
2873-
maybeForwardToController(request, handleExpireTokenRequestZk)
2861+
forwardToController(request)
28742862
}
28752863
}
28762864

@@ -3173,37 +3161,16 @@ class KafkaApis(val requestChannel: RequestChannel,
31733161
describeUserScramCredentialsRequest.getErrorResponse(requestThrottleMs, Errors.CLUSTER_AUTHORIZATION_FAILED.exception))
31743162
} else {
31753163
metadataSupport match {
3176-
case ZkSupport(adminManager, controller, zkClient, forwardingManager, metadataCache, _) =>
3177-
val result = adminManager.describeUserScramCredentials(
3178-
Option(describeUserScramCredentialsRequest.data.users).map(_.asScala.map(_.name).toList))
3179-
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
3180-
new DescribeUserScramCredentialsResponse(result.setThrottleTimeMs(requestThrottleMs)))
31813164
case RaftSupport(_, metadataCache) =>
31823165
val result = metadataCache.describeScramCredentials(describeUserScramCredentialsRequest.data())
31833166
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
31843167
new DescribeUserScramCredentialsResponse(result.setThrottleTimeMs(requestThrottleMs)))
3168+
case _ =>
3169+
throw KafkaApis.shouldNeverReceive(request)
31853170
}
31863171
}
31873172
}
31883173

3189-
def handleAlterUserScramCredentialsRequest(request: RequestChannel.Request): Unit = {
3190-
val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request))
3191-
val alterUserScramCredentialsRequest = request.body[AlterUserScramCredentialsRequest]
3192-
3193-
if (!zkSupport.controller.isActive) {
3194-
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
3195-
alterUserScramCredentialsRequest.getErrorResponse(requestThrottleMs, Errors.NOT_CONTROLLER.exception))
3196-
} else if (authHelper.authorize(request.context, ALTER, CLUSTER, CLUSTER_NAME)) {
3197-
val result = zkSupport.adminManager.alterUserScramCredentials(
3198-
alterUserScramCredentialsRequest.data.upsertions().asScala, alterUserScramCredentialsRequest.data.deletions().asScala)
3199-
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
3200-
new AlterUserScramCredentialsResponse(result.setThrottleTimeMs(requestThrottleMs)))
3201-
} else {
3202-
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
3203-
alterUserScramCredentialsRequest.getErrorResponse(requestThrottleMs, Errors.CLUSTER_AUTHORIZATION_FAILED.exception))
3204-
}
3205-
}
3206-
32073174
def handleAlterPartitionRequest(request: RequestChannel.Request): Unit = {
32083175
val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldNeverReceive(request))
32093176
val alterPartitionRequest = request.body[AlterPartitionRequest]

Diff for: core/src/main/scala/kafka/server/MetadataSupport.scala

+2-7
Original file line numberDiff line numberDiff line change
@@ -58,16 +58,11 @@ sealed trait MetadataSupport {
5858

5959
def canForward(): Boolean
6060

61-
def maybeForward(
61+
def forward(
6262
request: RequestChannel.Request,
63-
handler: RequestChannel.Request => Unit,
6463
responseCallback: Option[AbstractResponse] => Unit
6564
): Unit = {
66-
if (!request.isForwarded && canForward()) {
67-
forwardingManager.get.forwardRequest(request, responseCallback)
68-
} else {
69-
handler(request)
70-
}
65+
forwardingManager.get.forwardRequest(request, responseCallback)
7166
}
7267
}
7368

0 commit comments

Comments
 (0)