KAFKA-18399 Remove ZooKeeper from KafkaApis (9/N): ALTER_CLIENT_QUOTAS and ALLOCATE_PRODUCER_IDS #18465


Merged: 5 commits, Jan 14, 2025
45 changes: 1 addition & 44 deletions core/src/main/scala/kafka/server/KafkaApis.scala
@@ -2579,37 +2579,6 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}

def handleAlterClientQuotasRequest(request: RequestChannel.Request): Unit = {
val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request))
val alterClientQuotasRequest = request.body[AlterClientQuotasRequest]

if (authHelper.authorize(request.context, ALTER_CONFIGS, CLUSTER, CLUSTER_NAME)) {
val result = zkSupport.adminManager.alterClientQuotas(alterClientQuotasRequest.entries.asScala,
alterClientQuotasRequest.validateOnly)

val entriesData = result.iterator.map { case (quotaEntity, apiError) =>
val entityData = quotaEntity.entries.asScala.iterator.map { case (key, value) =>
new AlterClientQuotasResponseData.EntityData()
.setEntityType(key)
.setEntityName(value)
}.toBuffer

new AlterClientQuotasResponseData.EntryData()
.setErrorCode(apiError.error.code)
.setErrorMessage(apiError.message)
.setEntity(entityData.asJava)
}.toBuffer

requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
new AlterClientQuotasResponse(new AlterClientQuotasResponseData()
.setThrottleTimeMs(requestThrottleMs)
.setEntries(entriesData.asJava)))
} else {
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
alterClientQuotasRequest.getErrorResponse(requestThrottleMs, Errors.CLUSTER_AUTHORIZATION_FAILED.exception))
}
}

def handleDescribeUserScramCredentialsRequest(request: RequestChannel.Request): Unit = {
val describeUserScramCredentialsRequest = request.body[DescribeUserScramCredentialsRequest]

@@ -2813,19 +2782,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}

def handleAllocateProducerIdsRequest(request: RequestChannel.Request): Unit = {
Member: please remove this handler also.

val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldNeverReceive(request))
authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)

val allocateProducerIdsRequest = request.body[AllocateProducerIdsRequest]

if (!zkSupport.controller.isActive)
requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs =>
allocateProducerIdsRequest.getErrorResponse(throttleTimeMs, Errors.NOT_CONTROLLER.exception))
else
zkSupport.controller.allocateProducerIds(allocateProducerIdsRequest.data, producerIdsResponse =>
requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs =>
new AllocateProducerIdsResponse(producerIdsResponse.setThrottleTimeMs(throttleTimeMs)))
)
throw KafkaApis.shouldNeverReceive(request)
Member: I think we shouldn't handle it like this - this is one of the request types that is only handled by the controller listener (now that ZK is gone):

"listeners": ["zkBroker", "controller"]

So, we should have generic logic that handles that automatically before it gets here. We can probably just delete this whole method.
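The generic logic described here can be sketched as a dispatch-time guard: if none of a request type's declared listeners is served by this broker, fail fast with `shouldNeverReceive` before any per-API handler runs. A minimal, self-contained sketch - the simplified `ApiKey`/`ListenerType` model is hypothetical, only the helper name mirrors KafkaApis:

```java
import java.util.EnumSet;
import java.util.Map;
import java.util.function.Function;

public class DispatchSketch {
    // Stand-in for the "listeners" field in the per-API request JSON schemas.
    enum ListenerType { ZK_BROKER, BROKER, CONTROLLER }

    record ApiKey(String name, EnumSet<ListenerType> listeners) {}

    // Mirrors the spirit of KafkaApis.shouldNeverReceive: a broker seeing a
    // controller-only request is an invariant violation, not a client error.
    static IllegalStateException shouldNeverReceive(ApiKey api) {
        return new IllegalStateException(
            "Should never receive when using a Raft-based metadata quorum: " + api.name());
    }

    // Generic broker-side guard: reject requests that no broker listener
    // serves before any per-API handler runs, so ZK-era handler methods
    // like handleAllocateProducerIdsRequest can simply be deleted.
    static String dispatch(ApiKey api, Map<String, Function<ApiKey, String>> handlers) {
        if (!api.listeners().contains(ListenerType.BROKER)) {
            throw shouldNeverReceive(api);
        }
        return handlers.get(api.name()).apply(api);
    }
}
```

With such a guard in place, a controller-only API never reaches the handler map on a broker, which is why the whole method can go.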

Member: Same for handleAlterClientQuotasRequest.

Member (quoting the above): So, we should have generic logic that handles that automatically before it gets here. We can probably just delete this whole method.

Totally agree that we should have generic logic for those ZK-related handlers. The socket server can reject ZK-related requests automatically, and we can rewrite maybeForwardToController to throw KafkaApis.shouldAlwaysForward automatically. With those changes, the ZK-related handlers can be deleted directly.

In short, the line case ApiKeys.ALLOCATE_PRODUCER_IDS => handleAllocateProducerIdsRequest(request) can be removed.
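The maybeForwardToController rewrite suggested above can be sketched as: forward when a forwarder exists, otherwise throw rather than fall back to a ZK-era local handler. The `Request` stand-in and signature below are hypothetical; only the helper name mirrors KafkaApis:

```java
import java.util.Optional;
import java.util.function.Consumer;

public class ForwardSketch {
    // Minimal stand-in for RequestChannel.Request.
    static final class Request {
        final String api;
        Request(String api) { this.api = api; }
    }

    // Mirrors the spirit of KafkaApis.shouldAlwaysForward: this request type
    // must never be handled locally by a broker.
    static IllegalStateException shouldAlwaysForward(Request request) {
        return new IllegalStateException(
            "Should always be forwarded to the active controller: " + request.api);
    }

    // Proposed shape once ZK support is gone: forward when a forwarding path
    // exists, otherwise fail fast instead of invoking a ZK-era handler.
    static String maybeForwardToController(Request request, Optional<Consumer<Request>> forwarder) {
        if (forwarder.isPresent()) {
            forwarder.get().accept(request);
            return "forwarded";
        }
        throw shouldAlwaysForward(request);
    }
}
```

Under that shape, every per-API branch that used to call a ZK handler collapses into the single forward-or-throw path.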

ijuma (Member, Jan 10, 2025):

Yes, exactly. We can also remove the zk listener from the json files and the socket server will automatically do the right thing, I believe (i.e. it already has logic for that).
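The json-file change described here would be a one-line edit to the request schema's listeners field. A sketch of that follow-up edit (it is not part of this PR, and the schema file is assumed to be clients/src/main/resources/common/message/AllocateProducerIdsRequest.json):

```
-  "listeners": ["zkBroker", "controller"],
+  "listeners": ["controller"],
```

With zkBroker removed, the socket server's existing listener checks reject the request on a broker before KafkaApis is involved.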

Member (quoting): We can also remove the zk listener from the json files

Opened https://issues.apache.org/jira/browse/KAFKA-18474.

}

private def groupVersion(): GroupVersion = {
52 changes: 1 addition & 51 deletions core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -35,7 +35,7 @@ import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, BROKER_LOGGER}
import org.apache.kafka.common.errors.{ClusterAuthorizationException, UnsupportedVersionException}
import org.apache.kafka.common.internals.{KafkaFutureImpl, Topic}
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.memory.MemoryPool
import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartitionsToTxnTopic, AddPartitionsToTxnTopicCollection, AddPartitionsToTxnTransaction, AddPartitionsToTxnTransactionCollection}
import org.apache.kafka.common.message.AddPartitionsToTxnResponseData.AddPartitionsToTxnResult
@@ -61,7 +61,6 @@ import org.apache.kafka.common.message._
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.{ClientInformation, ListenerName}
import org.apache.kafka.common.protocol.{ApiKeys, Errors, MessageUtil}
import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity}
import org.apache.kafka.common.record._
import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType
import org.apache.kafka.common.requests.MetadataResponse.TopicMetadata
@@ -730,48 +729,6 @@ class KafkaApisTest extends Logging {
assertEquals(expectedResults, responseMap)
}

@Test
def testAlterClientQuotasWithAuthorizer(): Unit = {
val authorizer: Authorizer = mock(classOf[Authorizer])

authorizeResource(authorizer, AclOperation.ALTER_CONFIGS, ResourceType.CLUSTER,
Resource.CLUSTER_NAME, AuthorizationResult.DENIED)

val quotaEntity = new ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.USER, "user"))
val quotas = Seq(new ClientQuotaAlteration(quotaEntity, Seq.empty.asJavaCollection))

val requestHeader = new RequestHeader(ApiKeys.ALTER_CLIENT_QUOTAS, ApiKeys.ALTER_CLIENT_QUOTAS.latestVersion, clientId, 0)

val alterClientQuotasRequest = new AlterClientQuotasRequest.Builder(quotas.asJavaCollection, false)
.build(requestHeader.apiVersion)
val request = buildRequest(alterClientQuotasRequest,
fromPrivilegedListener = true, requestHeader = Option(requestHeader))

when(controller.isActive).thenReturn(true)
when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
anyLong)).thenReturn(0)
kafkaApis = createKafkaApis(authorizer = Some(authorizer))
kafkaApis.handleAlterClientQuotasRequest(request)

val capturedResponse = verifyNoThrottling[AlterClientQuotasResponse](request)
verifyAlterClientQuotaResult(capturedResponse, Map(quotaEntity -> Errors.CLUSTER_AUTHORIZATION_FAILED))

verify(authorizer).authorize(any(), any())
verify(clientRequestQuotaManager).maybeRecordAndGetThrottleTimeMs(any(), anyLong)
}

private def verifyAlterClientQuotaResult(response: AlterClientQuotasResponse,
expected: Map[ClientQuotaEntity, Errors]): Unit = {
val futures = expected.keys.map(quotaEntity => quotaEntity -> new KafkaFutureImpl[Void]()).toMap
response.complete(futures.asJava)
futures.foreach {
case (entity, future) =>
future.whenComplete((_, thrown) =>
assertEquals(thrown, expected(entity).exception())
).isDone
}
}

@ParameterizedTest
@CsvSource(value = Array("0,1500", "1500,0", "3000,1000"))
def testKRaftControllerThrottleTimeEnforced(
@@ -10040,13 +9997,6 @@ class KafkaApisTest extends Logging {
setResourceType(BROKER_LOGGER.id()))),
response.data())
}

@Test
def testRaftShouldAlwaysForwardAlterClientQuotasRequest(): Unit = {
metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
kafkaApis = createKafkaApis(raftSupport = true)
verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleAlterClientQuotasRequest)
}

@Test
def testRaftShouldAlwaysForwardUpdateFeatures(): Unit = {