Skip to content

Commit f303625

Browse files
mingdaoychia7712
authored andcommitted
KAFKA-18399 Remove ZooKeeper from KafkaApis (9/N): ALTER_CLIENT_QUOTAS and ALLOCATE_PRODUCER_IDS (#18465)
Reviewers: Ismael Juma <[email protected]>, Chia-Ping Tsai <[email protected]>
1 parent c7654f7 commit f303625

File tree

2 files changed

+1
-99
lines changed

2 files changed

+1
-99
lines changed

Diff for: core/src/main/scala/kafka/server/KafkaApis.scala

-48
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,6 @@ class KafkaApis(val requestChannel: RequestChannel,
222222
case ApiKeys.UNREGISTER_BROKER => forwardToController(request)
223223
case ApiKeys.DESCRIBE_TRANSACTIONS => handleDescribeTransactionsRequest(request)
224224
case ApiKeys.LIST_TRANSACTIONS => handleListTransactionsRequest(request)
225-
case ApiKeys.ALLOCATE_PRODUCER_IDS => handleAllocateProducerIdsRequest(request)
226225
case ApiKeys.DESCRIBE_QUORUM => forwardToController(request)
227226
case ApiKeys.CONSUMER_GROUP_HEARTBEAT => handleConsumerGroupHeartbeat(request).exceptionally(handleError)
228227
case ApiKeys.CONSUMER_GROUP_DESCRIBE => handleConsumerGroupDescribe(request).exceptionally(handleError)
@@ -2506,37 +2505,6 @@ class KafkaApis(val requestChannel: RequestChannel,
25062505
}
25072506
}
25082507

2509-
def handleAlterClientQuotasRequest(request: RequestChannel.Request): Unit = {
2510-
val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request))
2511-
val alterClientQuotasRequest = request.body[AlterClientQuotasRequest]
2512-
2513-
if (authHelper.authorize(request.context, ALTER_CONFIGS, CLUSTER, CLUSTER_NAME)) {
2514-
val result = zkSupport.adminManager.alterClientQuotas(alterClientQuotasRequest.entries.asScala,
2515-
alterClientQuotasRequest.validateOnly)
2516-
2517-
val entriesData = result.iterator.map { case (quotaEntity, apiError) =>
2518-
val entityData = quotaEntity.entries.asScala.iterator.map { case (key, value) =>
2519-
new AlterClientQuotasResponseData.EntityData()
2520-
.setEntityType(key)
2521-
.setEntityName(value)
2522-
}.toBuffer
2523-
2524-
new AlterClientQuotasResponseData.EntryData()
2525-
.setErrorCode(apiError.error.code)
2526-
.setErrorMessage(apiError.message)
2527-
.setEntity(entityData.asJava)
2528-
}.toBuffer
2529-
2530-
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
2531-
new AlterClientQuotasResponse(new AlterClientQuotasResponseData()
2532-
.setThrottleTimeMs(requestThrottleMs)
2533-
.setEntries(entriesData.asJava)))
2534-
} else {
2535-
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
2536-
alterClientQuotasRequest.getErrorResponse(requestThrottleMs, Errors.CLUSTER_AUTHORIZATION_FAILED.exception))
2537-
}
2538-
}
2539-
25402508
def handleDescribeUserScramCredentialsRequest(request: RequestChannel.Request): Unit = {
25412509
val describeUserScramCredentialsRequest = request.body[DescribeUserScramCredentialsRequest]
25422510

@@ -2694,22 +2662,6 @@ class KafkaApis(val requestChannel: RequestChannel,
26942662
new ListTransactionsResponse(response.setThrottleTimeMs(requestThrottleMs)))
26952663
}
26962664

2697-
def handleAllocateProducerIdsRequest(request: RequestChannel.Request): Unit = {
2698-
val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldNeverReceive(request))
2699-
authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
2700-
2701-
val allocateProducerIdsRequest = request.body[AllocateProducerIdsRequest]
2702-
2703-
if (!zkSupport.controller.isActive)
2704-
requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs =>
2705-
allocateProducerIdsRequest.getErrorResponse(throttleTimeMs, Errors.NOT_CONTROLLER.exception))
2706-
else
2707-
zkSupport.controller.allocateProducerIds(allocateProducerIdsRequest.data, producerIdsResponse =>
2708-
requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs =>
2709-
new AllocateProducerIdsResponse(producerIdsResponse.setThrottleTimeMs(throttleTimeMs)))
2710-
)
2711-
}
2712-
27132665
private def groupVersion(): GroupVersion = {
27142666
GroupVersion.fromFeatureLevel(metadataCache.features.finalizedFeatures.getOrDefault(GroupVersion.FEATURE_NAME, 0.toShort))
27152667
}

Diff for: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala

+1-51
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ import org.apache.kafka.common.compress.Compression
3535
import org.apache.kafka.common.config.ConfigResource
3636
import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, BROKER_LOGGER}
3737
import org.apache.kafka.common.errors.{ClusterAuthorizationException, UnsupportedVersionException}
38-
import org.apache.kafka.common.internals.{KafkaFutureImpl, Topic}
38+
import org.apache.kafka.common.internals.Topic
3939
import org.apache.kafka.common.memory.MemoryPool
4040
import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartitionsToTxnTopic, AddPartitionsToTxnTopicCollection, AddPartitionsToTxnTransaction, AddPartitionsToTxnTransactionCollection}
4141
import org.apache.kafka.common.message.AddPartitionsToTxnResponseData.AddPartitionsToTxnResult
@@ -61,7 +61,6 @@ import org.apache.kafka.common.message._
6161
import org.apache.kafka.common.metrics.Metrics
6262
import org.apache.kafka.common.network.{ClientInformation, ListenerName}
6363
import org.apache.kafka.common.protocol.{ApiKeys, Errors, MessageUtil}
64-
import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity}
6564
import org.apache.kafka.common.record._
6665
import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType
6766
import org.apache.kafka.common.requests.MetadataResponse.TopicMetadata
@@ -730,48 +729,6 @@ class KafkaApisTest extends Logging {
730729
assertEquals(expectedResults, responseMap)
731730
}
732731

733-
@Test
734-
def testAlterClientQuotasWithAuthorizer(): Unit = {
735-
val authorizer: Authorizer = mock(classOf[Authorizer])
736-
737-
authorizeResource(authorizer, AclOperation.ALTER_CONFIGS, ResourceType.CLUSTER,
738-
Resource.CLUSTER_NAME, AuthorizationResult.DENIED)
739-
740-
val quotaEntity = new ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.USER, "user"))
741-
val quotas = Seq(new ClientQuotaAlteration(quotaEntity, Seq.empty.asJavaCollection))
742-
743-
val requestHeader = new RequestHeader(ApiKeys.ALTER_CLIENT_QUOTAS, ApiKeys.ALTER_CLIENT_QUOTAS.latestVersion, clientId, 0)
744-
745-
val alterClientQuotasRequest = new AlterClientQuotasRequest.Builder(quotas.asJavaCollection, false)
746-
.build(requestHeader.apiVersion)
747-
val request = buildRequest(alterClientQuotasRequest,
748-
fromPrivilegedListener = true, requestHeader = Option(requestHeader))
749-
750-
when(controller.isActive).thenReturn(true)
751-
when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
752-
anyLong)).thenReturn(0)
753-
kafkaApis = createKafkaApis(authorizer = Some(authorizer))
754-
kafkaApis.handleAlterClientQuotasRequest(request)
755-
756-
val capturedResponse = verifyNoThrottling[AlterClientQuotasResponse](request)
757-
verifyAlterClientQuotaResult(capturedResponse, Map(quotaEntity -> Errors.CLUSTER_AUTHORIZATION_FAILED))
758-
759-
verify(authorizer).authorize(any(), any())
760-
verify(clientRequestQuotaManager).maybeRecordAndGetThrottleTimeMs(any(), anyLong)
761-
}
762-
763-
private def verifyAlterClientQuotaResult(response: AlterClientQuotasResponse,
764-
expected: Map[ClientQuotaEntity, Errors]): Unit = {
765-
val futures = expected.keys.map(quotaEntity => quotaEntity -> new KafkaFutureImpl[Void]()).toMap
766-
response.complete(futures.asJava)
767-
futures.foreach {
768-
case (entity, future) =>
769-
future.whenComplete((_, thrown) =>
770-
assertEquals(thrown, expected(entity).exception())
771-
).isDone
772-
}
773-
}
774-
775732
@ParameterizedTest
776733
@CsvSource(value = Array("0,1500", "1500,0", "3000,1000"))
777734
def testKRaftControllerThrottleTimeEnforced(
@@ -10027,13 +9984,6 @@ class KafkaApisTest extends Logging {
100279984
setResourceType(BROKER_LOGGER.id()))),
100289985
response.data())
100299986
}
10030-
10031-
@Test
10032-
def testRaftShouldAlwaysForwardAlterClientQuotasRequest(): Unit = {
10033-
metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
10034-
kafkaApis = createKafkaApis(raftSupport = true)
10035-
verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleAlterClientQuotasRequest)
10036-
}
100379987

100389988
@Test
100399989
def testConsumerGroupHeartbeatReturnsUnsupportedVersion(): Unit = {

0 commit comments

Comments
 (0)