Skip to content

Commit 4613b39

Browse files
committed
KAFKA-18399 Remove ZooKeeper from KafkaApis: ALTER_CLIENT_QUOTAS, handleAllocateProducerIdsRequest
1 parent 33556ae commit 4613b39

File tree

1 file changed

+2
-41
lines changed

1 file changed

+2
-41
lines changed

Diff for: core/src/main/scala/kafka/server/KafkaApis.scala

+2-41
Original file line numberDiff line numberDiff line change
@@ -2823,34 +2823,7 @@ class KafkaApis(val requestChannel: RequestChannel,
28232823
}
28242824

28252825
def handleAlterClientQuotasRequest(request: RequestChannel.Request): Unit = {
2826-
val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request))
2827-
val alterClientQuotasRequest = request.body[AlterClientQuotasRequest]
2828-
2829-
if (authHelper.authorize(request.context, ALTER_CONFIGS, CLUSTER, CLUSTER_NAME)) {
2830-
val result = zkSupport.adminManager.alterClientQuotas(alterClientQuotasRequest.entries.asScala,
2831-
alterClientQuotasRequest.validateOnly)
2832-
2833-
val entriesData = result.iterator.map { case (quotaEntity, apiError) =>
2834-
val entityData = quotaEntity.entries.asScala.iterator.map { case (key, value) =>
2835-
new AlterClientQuotasResponseData.EntityData()
2836-
.setEntityType(key)
2837-
.setEntityName(value)
2838-
}.toBuffer
2839-
2840-
new AlterClientQuotasResponseData.EntryData()
2841-
.setErrorCode(apiError.error.code)
2842-
.setErrorMessage(apiError.message)
2843-
.setEntity(entityData.asJava)
2844-
}.toBuffer
2845-
2846-
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
2847-
new AlterClientQuotasResponse(new AlterClientQuotasResponseData()
2848-
.setThrottleTimeMs(requestThrottleMs)
2849-
.setEntries(entriesData.asJava)))
2850-
} else {
2851-
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
2852-
alterClientQuotasRequest.getErrorResponse(requestThrottleMs, Errors.CLUSTER_AUTHORIZATION_FAILED.exception))
2853-
}
2826+
throw KafkaApis.shouldAlwaysForward(request)
28542827
}
28552828

28562829
def handleDescribeUserScramCredentialsRequest(request: RequestChannel.Request): Unit = {
@@ -3056,19 +3029,7 @@ class KafkaApis(val requestChannel: RequestChannel,
30563029
}
30573030

30583031
def handleAllocateProducerIdsRequest(request: RequestChannel.Request): Unit = {
3059-
val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldNeverReceive(request))
3060-
authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
3061-
3062-
val allocateProducerIdsRequest = request.body[AllocateProducerIdsRequest]
3063-
3064-
if (!zkSupport.controller.isActive)
3065-
requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs =>
3066-
allocateProducerIdsRequest.getErrorResponse(throttleTimeMs, Errors.NOT_CONTROLLER.exception))
3067-
else
3068-
zkSupport.controller.allocateProducerIds(allocateProducerIdsRequest.data, producerIdsResponse =>
3069-
requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs =>
3070-
new AllocateProducerIdsResponse(producerIdsResponse.setThrottleTimeMs(throttleTimeMs)))
3071-
)
3032+
throw KafkaApis.shouldNeverReceive(request)
30723033
}
30733034

30743035
private def groupVersion(): GroupVersion = {

0 commit comments

Comments
 (0)