@@ -3370,10 +3370,7 @@ class KafkaApis(val requestChannel: RequestChannel,
3370
3370
} else {
3371
3371
metadataSupport match {
3372
3372
case ZkSupport (adminManager, controller, zkClient, forwardingManager, metadataCache, _) =>
3373
- val result = adminManager.describeUserScramCredentials(
3374
- Option (describeUserScramCredentialsRequest.data.users).map(_.asScala.map(_.name).toList))
3375
- requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
3376
- new DescribeUserScramCredentialsResponse (result.setThrottleTimeMs(requestThrottleMs)))
3373
+ throw KafkaApis .shouldNeverReceive(request)
3377
3374
case RaftSupport (_, metadataCache) =>
3378
3375
val result = metadataCache.describeScramCredentials(describeUserScramCredentialsRequest.data())
3379
3376
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
@@ -3383,21 +3380,7 @@ class KafkaApis(val requestChannel: RequestChannel,
3383
3380
}
3384
3381
3385
3382
def handleAlterUserScramCredentialsRequest (request : RequestChannel .Request ): Unit = {
3386
- val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis .shouldAlwaysForward(request))
3387
- val alterUserScramCredentialsRequest = request.body[AlterUserScramCredentialsRequest ]
3388
-
3389
- if (! zkSupport.controller.isActive) {
3390
- requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
3391
- alterUserScramCredentialsRequest.getErrorResponse(requestThrottleMs, Errors .NOT_CONTROLLER .exception))
3392
- } else if (authHelper.authorize(request.context, ALTER , CLUSTER , CLUSTER_NAME )) {
3393
- val result = zkSupport.adminManager.alterUserScramCredentials(
3394
- alterUserScramCredentialsRequest.data.upsertions().asScala, alterUserScramCredentialsRequest.data.deletions().asScala)
3395
- requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
3396
- new AlterUserScramCredentialsResponse (result.setThrottleTimeMs(requestThrottleMs)))
3397
- } else {
3398
- requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
3399
- alterUserScramCredentialsRequest.getErrorResponse(requestThrottleMs, Errors .CLUSTER_AUTHORIZATION_FAILED .exception))
3400
- }
3383
+ throw KafkaApis .shouldAlwaysForward(request)
3401
3384
}
3402
3385
3403
3386
def handleAlterPartitionRequest (request : RequestChannel .Request ): Unit = {
0 commit comments