Skip to content

Commit 88662fc

Browse files
TaiJuWu and pranavt84
authored and committed
KAFKA-18399 Remove ZooKeeper from KafkaApis (8/N): ELECT_LEADERS, ALTER_PARTITION, UPDATE_FEATURES (apache#18453)
Reviewers: Chia-Ping Tsai <[email protected]>
1 parent b15cce9 commit 88662fc

File tree

2 files changed

+0
-145
lines changed

2 files changed

+0
-145
lines changed

Diff for: core/src/main/scala/kafka/server/KafkaApis.scala

-118
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ import org.apache.kafka.common.internals.{FatalExitError, Topic}
3737
import org.apache.kafka.common.message.AddPartitionsToTxnResponseData.{AddPartitionsToTxnResult, AddPartitionsToTxnResultCollection}
3838
import org.apache.kafka.common.message.AlterConfigsResponseData.AlterConfigsResourceResponse
3939
import org.apache.kafka.common.message.DeleteRecordsResponseData.{DeleteRecordsPartitionResult, DeleteRecordsTopicResult}
40-
import org.apache.kafka.common.message.ElectLeadersResponseData.{PartitionResult, ReplicaElectionResult}
4140
import org.apache.kafka.common.message.ListClientMetricsResourcesResponseData.ClientMetricsResource
4241
import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition
4342
import org.apache.kafka.common.message.ListOffsetsResponseData.{ListOffsetsPartitionResponse, ListOffsetsTopicResponse}
@@ -217,7 +216,6 @@ class KafkaApis(val requestChannel: RequestChannel,
217216
case ApiKeys.ALTER_CLIENT_QUOTAS => forwardToController(request)
218217
case ApiKeys.DESCRIBE_USER_SCRAM_CREDENTIALS => handleDescribeUserScramCredentialsRequest(request)
219218
case ApiKeys.ALTER_USER_SCRAM_CREDENTIALS => forwardToController(request)
220-
case ApiKeys.ALTER_PARTITION => handleAlterPartitionRequest(request)
221219
case ApiKeys.UPDATE_FEATURES => forwardToController(request)
222220
case ApiKeys.DESCRIBE_CLUSTER => handleDescribeCluster(request)
223221
case ApiKeys.DESCRIBE_PRODUCERS => handleDescribeProducersRequest(request)
@@ -2400,77 +2398,6 @@ class KafkaApis(val requestChannel: RequestChannel,
24002398
true
24012399
}
24022400

2403-
def handleElectLeaders(request: RequestChannel.Request): Unit = {
2404-
val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request))
2405-
val electionRequest = request.body[ElectLeadersRequest]
2406-
2407-
def sendResponseCallback(
2408-
error: ApiError
2409-
)(
2410-
results: Map[TopicPartition, ApiError]
2411-
): Unit = {
2412-
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
2413-
val adjustedResults = if (electionRequest.data.topicPartitions == null) {
2414-
/* When performing elections across all of the partitions we should only return
2415-
* partitions for which there was an election or resulted in an error. In other
2416-
* words, partitions that didn't need election because they already have the correct
2417-
* leader are not returned to the client.
2418-
*/
2419-
results.filter { case (_, error) =>
2420-
error.error != Errors.ELECTION_NOT_NEEDED
2421-
}
2422-
} else results
2423-
2424-
val electionResults = new util.ArrayList[ReplicaElectionResult]()
2425-
adjustedResults
2426-
.groupBy { case (tp, _) => tp.topic }
2427-
.foreachEntry { (topic, ps) =>
2428-
val electionResult = new ReplicaElectionResult()
2429-
2430-
electionResult.setTopic(topic)
2431-
ps.foreachEntry { (topicPartition, error) =>
2432-
val partitionResult = new PartitionResult()
2433-
partitionResult.setPartitionId(topicPartition.partition)
2434-
partitionResult.setErrorCode(error.error.code)
2435-
partitionResult.setErrorMessage(error.message)
2436-
electionResult.partitionResult.add(partitionResult)
2437-
}
2438-
2439-
electionResults.add(electionResult)
2440-
}
2441-
2442-
new ElectLeadersResponse(
2443-
requestThrottleMs,
2444-
error.error.code,
2445-
electionResults,
2446-
electionRequest.version
2447-
)
2448-
})
2449-
}
2450-
2451-
if (!authHelper.authorize(request.context, ALTER, CLUSTER, CLUSTER_NAME)) {
2452-
val error = new ApiError(Errors.CLUSTER_AUTHORIZATION_FAILED, null)
2453-
val partitionErrors: Map[TopicPartition, ApiError] =
2454-
electionRequest.topicPartitions.asScala.iterator.map(partition => partition -> error).toMap
2455-
2456-
sendResponseCallback(error)(partitionErrors)
2457-
} else {
2458-
val partitions = if (electionRequest.data.topicPartitions == null) {
2459-
metadataCache.getAllTopics().flatMap(metadataCache.getTopicPartitions)
2460-
} else {
2461-
electionRequest.topicPartitions.asScala
2462-
}
2463-
2464-
replicaManager.electLeaders(
2465-
zkSupport.controller,
2466-
partitions,
2467-
electionRequest.electionType,
2468-
sendResponseCallback(ApiError.NONE),
2469-
electionRequest.data.timeoutMs
2470-
)
2471-
}
2472-
}
2473-
24742401
def handleOffsetDeleteRequest(
24752402
request: RequestChannel.Request,
24762403
requestLocal: RequestLocal
@@ -2629,51 +2556,6 @@ class KafkaApis(val requestChannel: RequestChannel,
26292556
}
26302557
}
26312558

2632-
def handleAlterPartitionRequest(request: RequestChannel.Request): Unit = {
2633-
val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldNeverReceive(request))
2634-
val alterPartitionRequest = request.body[AlterPartitionRequest]
2635-
authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
2636-
2637-
if (!zkSupport.controller.isActive)
2638-
requestHelper.sendResponseExemptThrottle(request, alterPartitionRequest.getErrorResponse(
2639-
AbstractResponse.DEFAULT_THROTTLE_TIME, Errors.NOT_CONTROLLER.exception))
2640-
else
2641-
zkSupport.controller.alterPartitions(alterPartitionRequest.data, request.context.apiVersion, alterPartitionResp =>
2642-
requestHelper.sendResponseExemptThrottle(request, new AlterPartitionResponse(alterPartitionResp)))
2643-
}
2644-
2645-
def handleUpdateFeatures(request: RequestChannel.Request): Unit = {
2646-
val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request))
2647-
val updateFeaturesRequest = request.body[UpdateFeaturesRequest]
2648-
2649-
def sendResponseCallback(errors: Either[ApiError, Map[String, ApiError]]): Unit = {
2650-
def createResponse(throttleTimeMs: Int): UpdateFeaturesResponse = {
2651-
errors match {
2652-
case Left(topLevelError) =>
2653-
UpdateFeaturesResponse.createWithErrors(
2654-
topLevelError,
2655-
Collections.emptySet(),
2656-
throttleTimeMs)
2657-
case Right(featureUpdateErrors) =>
2658-
// This response is not correct, but since this is ZK specific code it will be removed in 4.0
2659-
UpdateFeaturesResponse.createWithErrors(
2660-
ApiError.NONE,
2661-
featureUpdateErrors.asJava.keySet(),
2662-
throttleTimeMs)
2663-
}
2664-
}
2665-
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => createResponse(requestThrottleMs))
2666-
}
2667-
2668-
if (!authHelper.authorize(request.context, ALTER, CLUSTER, CLUSTER_NAME)) {
2669-
sendResponseCallback(Left(new ApiError(Errors.CLUSTER_AUTHORIZATION_FAILED)))
2670-
} else if (!zkSupport.controller.isActive) {
2671-
sendResponseCallback(Left(new ApiError(Errors.NOT_CONTROLLER)))
2672-
} else {
2673-
zkSupport.controller.updateFeatures(updateFeaturesRequest, sendResponseCallback)
2674-
}
2675-
}
2676-
26772559
def handleDescribeCluster(request: RequestChannel.Request): Unit = {
26782560
val response = authHelper.computeDescribeClusterResponse(
26792561
request,

Diff for: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala

-27
Original file line numberDiff line numberDiff line change
@@ -9932,25 +9932,12 @@ class KafkaApisTest extends Logging {
99329932
request
99339933
}
99349934

9935-
private def verifyShouldNeverHandleErrorMessage(handler: RequestChannel.Request => Unit): Unit = {
9936-
val request = createMockRequest()
9937-
val e = assertThrows(classOf[UnsupportedVersionException], () => handler(request))
9938-
assertEquals(KafkaApis.shouldNeverReceive(request).getMessage, e.getMessage)
9939-
}
9940-
99419935
private def verifyShouldAlwaysForwardErrorMessage(handler: RequestChannel.Request => Unit): Unit = {
99429936
val request = createMockRequest()
99439937
val e = assertThrows(classOf[UnsupportedVersionException], () => handler(request))
99449938
assertEquals(KafkaApis.shouldAlwaysForward(request).getMessage, e.getMessage)
99459939
}
99469940

9947-
@Test
9948-
def testRaftShouldNeverHandleAlterPartitionRequest(): Unit = {
9949-
metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
9950-
kafkaApis = createKafkaApis(raftSupport = true)
9951-
verifyShouldNeverHandleErrorMessage(kafkaApis.handleAlterPartitionRequest)
9952-
}
9953-
99549941
@Test
99559942
def testRaftShouldAlwaysForwardCreateAcls(): Unit = {
99569943
metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
@@ -10048,20 +10035,6 @@ class KafkaApisTest extends Logging {
1004810035
verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleAlterClientQuotasRequest)
1004910036
}
1005010037

10051-
@Test
10052-
def testRaftShouldAlwaysForwardUpdateFeatures(): Unit = {
10053-
metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
10054-
kafkaApis = createKafkaApis(raftSupport = true)
10055-
verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleUpdateFeatures)
10056-
}
10057-
10058-
@Test
10059-
def testRaftShouldAlwaysForwardElectLeaders(): Unit = {
10060-
metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
10061-
kafkaApis = createKafkaApis(raftSupport = true)
10062-
verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleElectLeaders)
10063-
}
10064-
1006510038
@Test
1006610039
def testConsumerGroupHeartbeatReturnsUnsupportedVersion(): Unit = {
1006710040
val consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequestData().setGroupId("group")

0 commit comments

Comments (0)