Commit b6c67f6

KAFKA-18373: Remove ZkMetadataCache
Signed-off-by: PoAn Yang <[email protected]>
1 parent d96b682 commit b6c67f6

12 files changed (+60, -1316 lines)

Diff for: core/src/main/scala/kafka/cluster/Partition.scala

+1, -6

@@ -23,7 +23,7 @@ import kafka.controller.{KafkaController, StateChangeLogger}
 import kafka.log._
 import kafka.log.remote.RemoteLogManager
 import kafka.server._
-import kafka.server.metadata.{KRaftMetadataCache, ZkMetadataCache}
+import kafka.server.metadata.KRaftMetadataCache
 import kafka.server.share.DelayedShareFetch
 import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
 import kafka.utils._
@@ -1100,11 +1100,6 @@ class Partition(val topicPartition: TopicPartition,
          !kRaftMetadataCache.isBrokerShuttingDown(followerReplicaId) &&
          isBrokerEpochIsrEligible(storedBrokerEpoch, cachedBrokerEpoch)

-      // In ZK mode, we just ensure the broker is alive. Although we do not check for shutting down brokers here,
-      // the controller will block them from being added to ISR.
-      case zkMetadataCache: ZkMetadataCache =>
-        zkMetadataCache.hasAliveBroker(followerReplicaId)
-
      case _ => true
    }
  }
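
With the ZK branch gone, only the KRaft case needs special handling in the follower-eligibility match. A minimal standalone Scala sketch of the remaining shape, using illustrative stand-in types rather than Kafka's real Partition and KRaftMetadataCache classes:

// Standalone sketch, not Kafka's actual classes: models the match that remains
// after the ZkMetadataCache branch is deleted.
trait CacheLike
final class KRaftCacheLike(shuttingDown: Set[Int]) extends CacheLike {
  def isBrokerShuttingDown(brokerId: Int): Boolean = shuttingDown.contains(brokerId)
}

def isFollowerEligible(cache: CacheLike, followerId: Int, brokerEpochOk: Boolean): Boolean =
  cache match {
    case kraft: KRaftCacheLike =>
      // KRaft mode: reject shutting-down brokers and require a valid broker epoch.
      !kraft.isBrokerShuttingDown(followerId) && brokerEpochOk
    case _ =>
      true // no ZK branch any more; other cache types are not special-cased
  }

// usage: isFollowerEligible(new KRaftCacheLike(Set(3)), followerId = 3, brokerEpochOk = true) == false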

Diff for: core/src/main/scala/kafka/server/KafkaApis.scala

-25

@@ -2369,31 +2369,6 @@ class KafkaApis(val requestChannel: RequestChannel,
        describeClientQuotasRequest.getErrorResponse(requestThrottleMs, Errors.CLUSTER_AUTHORIZATION_FAILED.exception))
    } else {
      metadataSupport match {
-        case ZkSupport(adminManager, controller, zkClient, forwardingManager, metadataCache, _) =>
-          val result = adminManager.describeClientQuotas(describeClientQuotasRequest.filter)
-
-          val entriesData = result.iterator.map { case (quotaEntity, quotaValues) =>
-            val entityData = quotaEntity.entries.asScala.iterator.map { case (entityType, entityName) =>
-              new DescribeClientQuotasResponseData.EntityData()
-                .setEntityType(entityType)
-                .setEntityName(entityName)
-            }.toBuffer
-
-            val valueData = quotaValues.iterator.map { case (key, value) =>
-              new DescribeClientQuotasResponseData.ValueData()
-                .setKey(key)
-                .setValue(value)
-            }.toBuffer
-
-            new DescribeClientQuotasResponseData.EntryData()
-              .setEntity(entityData.asJava)
-              .setValues(valueData.asJava)
-          }.toBuffer
-
-          requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
-            new DescribeClientQuotasResponse(new DescribeClientQuotasResponseData()
-              .setThrottleTimeMs(requestThrottleMs)
-              .setEntries(entriesData.asJava)))
        case RaftSupport(_, metadataCache) =>
          val result = metadataCache.describeClientQuotas(describeClientQuotasRequest.data())
          requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
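
The deleted ZkSupport branch assembled the DescribeClientQuotas response by hand from the ZK admin manager's results; the surviving RaftSupport branch delegates that work to the KRaft metadata cache. A standalone sketch of the mapping the removed code performed, with plain case classes standing in for Kafka's generated DescribeClientQuotasResponseData types:

// Hypothetical stand-ins for the generated response types; names are illustrative.
case class EntityData(entityType: String, entityName: String)
case class ValueData(key: String, value: Double)
case class EntryData(entity: Seq[EntityData], values: Seq[ValueData])

// Shape of the translation the removed ZkSupport branch did:
// each (quota entity -> quota values) pair becomes one EntryData.
def toEntries(result: Map[Map[String, String], Map[String, Double]]): Seq[EntryData] =
  result.iterator.map { case (quotaEntity, quotaValues) =>
    EntryData(
      entity = quotaEntity.iterator.map { case (t, n) => EntityData(t, n) }.toSeq,
      values = quotaValues.iterator.map { case (k, v) => ValueData(k, v) }.toSeq
    )
  }.toSeq

In KRaft mode KafkaApis no longer builds this structure itself: the RaftSupport case forwards the request data to metadataCache.describeClientQuotas and throttles whatever response comes back.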

Diff for: core/src/main/scala/kafka/server/MetadataCache.scala

+1, -9

@@ -17,12 +17,11 @@

 package kafka.server

-import kafka.server.metadata.{KRaftMetadataCache, ZkMetadataCache}
+import kafka.server.metadata.KRaftMetadataCache
 import org.apache.kafka.admin.BrokerMetadata
 import org.apache.kafka.common.message.{MetadataResponseData, UpdateMetadataRequestData}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.{Cluster, Node, TopicPartition, Uuid}
-import org.apache.kafka.server.BrokerFeatures
 import org.apache.kafka.server.common.{FinalizedFeatures, KRaftVersion, MetadataVersion}

 import java.util
@@ -117,13 +116,6 @@ trait MetadataCache {
 }

 object MetadataCache {
-  def zkMetadataCache(brokerId: Int,
-                      metadataVersion: MetadataVersion,
-                      brokerFeatures: BrokerFeatures = BrokerFeatures.createEmpty())
-  : ZkMetadataCache = {
-    new ZkMetadataCache(brokerId, metadataVersion, brokerFeatures)
-  }
-
  def kRaftMetadataCache(
    brokerId: Int,
    kraftVersionSupplier: Supplier[KRaftVersion]
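
After this change the MetadataCache companion exposes only the KRaft factory; the zkMetadataCache constructor and its BrokerFeatures default are gone. A small standalone sketch of the remaining factory shape, with illustrative stand-in types for KRaftVersion and the cache itself:

import java.util.function.Supplier

// Illustrative stand-ins, not Kafka's real types.
final case class KRaftVersionLike(featureLevel: Int)
final class KRaftCacheSketch(val brokerId: Int, versionSupplier: Supplier[KRaftVersionLike]) {
  // The version is read lazily from the supplier each time it is needed.
  def kraftVersion(): KRaftVersionLike = versionSupplier.get()
}

object MetadataCacheSketch {
  // The only factory left: the caller supplies the current KRaft version lazily.
  def kRaftMetadataCache(brokerId: Int,
                         kraftVersionSupplier: Supplier[KRaftVersionLike]): KRaftCacheSketch =
    new KRaftCacheSketch(brokerId, kraftVersionSupplier)
}

// usage: val cache = MetadataCacheSketch.kRaftMetadataCache(0, () => KRaftVersionLike(1))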

Diff for: core/src/main/scala/kafka/server/MetadataSupport.scala

+1, -36

@@ -17,10 +17,8 @@

 package kafka.server

-import kafka.controller.KafkaController
 import kafka.network.RequestChannel
-import kafka.server.metadata.{KRaftMetadataCache, ZkMetadataCache}
-import kafka.zk.KafkaZkClient
+import kafka.server.metadata.KRaftMetadataCache
 import org.apache.kafka.common.requests.AbstractResponse

 sealed trait MetadataSupport {
@@ -30,15 +28,6 @@ sealed trait MetadataSupport {
   */
  val forwardingManager: Option[ForwardingManager]

-  /**
-   * Return this instance downcast for use with ZooKeeper
-   *
-   * @param createException function to create an exception to throw
-   * @return this instance downcast for use with ZooKeeper
-   * @throws Exception if this instance is not for ZooKeeper
-   */
-  def requireZkOrThrow(createException: => Exception): ZkSupport
-
  /**
   * Return this instance downcast for use with Raft
   *
@@ -66,34 +55,10 @@ sealed trait MetadataSupport {
  }
 }

-case class ZkSupport(adminManager: ZkAdminManager,
-                     controller: KafkaController,
-                     zkClient: KafkaZkClient,
-                     forwardingManager: Option[ForwardingManager],
-                     metadataCache: ZkMetadataCache,
-                     brokerEpochManager: ZkBrokerEpochManager) extends MetadataSupport {
-  override def requireZkOrThrow(createException: => Exception): ZkSupport = this
-
-  override def requireRaftOrThrow(createException: => Exception): RaftSupport = throw createException
-
-  override def ensureConsistentWith(config: KafkaConfig): Unit = {
-    if (!config.requiresZookeeper) {
-      throw new IllegalStateException("Config specifies Raft but metadata support instance is for ZooKeeper")
-    }
-  }
-
-  override def canForward(): Boolean = forwardingManager.isDefined && (!controller.isActive)
-
-  def isBrokerEpochStale(brokerEpochInRequest: Long, isKRaftControllerRequest: Boolean): Boolean = {
-    brokerEpochManager.isBrokerEpochStale(brokerEpochInRequest, isKRaftControllerRequest)
-  }
-}
-
 case class RaftSupport(fwdMgr: ForwardingManager,
                        metadataCache: KRaftMetadataCache)
  extends MetadataSupport {
  override val forwardingManager: Option[ForwardingManager] = Some(fwdMgr)
-  override def requireZkOrThrow(createException: => Exception): ZkSupport = throw createException
  override def requireRaftOrThrow(createException: => Exception): RaftSupport = this

  override def ensureConsistentWith(config: KafkaConfig): Unit = {
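
With ZkSupport and requireZkOrThrow removed, the sealed trait is left with a single concrete implementation. A minimal standalone sketch (illustrative names, not the real Kafka classes) of the shape that remains:

// Standalone sketch: the sealed hierarchy after the ZK side is deleted.
sealed trait SupportSketch {
  // Only the Raft downcast survives; there is no ZK counterpart left to require.
  def requireRaftOrThrow(createException: => Exception): RaftSketch
}

final case class RaftSketch(forwardingManagerName: String) extends SupportSketch {
  override def requireRaftOrThrow(createException: => Exception): RaftSketch = this
}

Call sites that previously used requireZkOrThrow to guard ZooKeeper-only paths have no replacement; KRaft-only paths keep using requireRaftOrThrow.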
