Skip to content

KAFKA-18373: Remove ZkMetadataCache #18553

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits on Jan 17, 2025 (source and target branch names not captured in this export)
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 1 addition & 6 deletions core/src/main/scala/kafka/cluster/Partition.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import kafka.controller.StateChangeLogger
import kafka.log._
import kafka.log.remote.RemoteLogManager
import kafka.server._
import kafka.server.metadata.{KRaftMetadataCache, ZkMetadataCache}
import kafka.server.metadata.KRaftMetadataCache
import kafka.server.share.DelayedShareFetch
import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
import kafka.utils._
Expand Down Expand Up @@ -1086,11 +1086,6 @@ class Partition(val topicPartition: TopicPartition,
!kRaftMetadataCache.isBrokerShuttingDown(followerReplicaId) &&
isBrokerEpochIsrEligible(storedBrokerEpoch, cachedBrokerEpoch)

// In ZK mode, we just ensure the broker is alive. Although we do not check for shutting down brokers here,
// the controller will block them from being added to ISR.
case zkMetadataCache: ZkMetadataCache =>
zkMetadataCache.hasAliveBroker(followerReplicaId)

case _ => true
}
}
Expand Down
10 changes: 1 addition & 9 deletions core/src/main/scala/kafka/server/MetadataCache.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,11 @@

package kafka.server

import kafka.server.metadata.{KRaftMetadataCache, ZkMetadataCache}
import kafka.server.metadata.KRaftMetadataCache
import org.apache.kafka.admin.BrokerMetadata
import org.apache.kafka.common.message.{MetadataResponseData, UpdateMetadataRequestData}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.{Cluster, Node, TopicPartition, Uuid}
import org.apache.kafka.server.BrokerFeatures
import org.apache.kafka.server.common.{FinalizedFeatures, KRaftVersion, MetadataVersion}

import java.util
Expand Down Expand Up @@ -117,13 +116,6 @@ trait MetadataCache {
}

object MetadataCache {
def zkMetadataCache(brokerId: Int,
metadataVersion: MetadataVersion,
brokerFeatures: BrokerFeatures = BrokerFeatures.createEmpty())
: ZkMetadataCache = {
new ZkMetadataCache(brokerId, metadataVersion, brokerFeatures)
}

def kRaftMetadataCache(
brokerId: Int,
kraftVersionSupplier: Supplier[KRaftVersion]
Expand Down
691 changes: 1 addition & 690 deletions core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala

Large diffs are not rendered by default.

43 changes: 12 additions & 31 deletions core/src/test/scala/unit/kafka/cluster/ReplicaTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,13 @@
package kafka.cluster

import kafka.log.UnifiedLog
import kafka.server.metadata.{KRaftMetadataCache, ZkMetadataCache}
import kafka.server.metadata.KRaftMetadataCache
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.NotLeaderOrFollowerException
import org.apache.kafka.server.util.MockTime
import org.apache.kafka.storage.internals.log.LogOffsetMetadata
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertThrows, assertTrue}
import org.junit.jupiter.api.{BeforeEach, Test}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import org.mockito.Mockito.{mock, when}

object ReplicaTest {
Expand Down Expand Up @@ -320,16 +318,10 @@ class ReplicaTest {
assertFalse(isCaughtUp(leaderEndOffset = 16L))
}

@ParameterizedTest
@ValueSource(booleans = Array(true, false))
def testFenceStaleUpdates(isKraft: Boolean): Unit = {
val metadataCache = if (isKraft) {
val kRaftMetadataCache = mock(classOf[KRaftMetadataCache])
when(kRaftMetadataCache.getAliveBrokerEpoch(BrokerId)).thenReturn(Option(2L))
kRaftMetadataCache
} else {
mock(classOf[ZkMetadataCache])
}
@Test
def testFenceStaleUpdates(): Unit = {
val metadataCache = mock(classOf[KRaftMetadataCache])
when(metadataCache.getAliveBrokerEpoch(BrokerId)).thenReturn(Option(2L))

val replica = new Replica(BrokerId, Partition, metadataCache)
replica.updateFetchStateOrThrow(
Expand All @@ -339,24 +331,13 @@ class ReplicaTest {
leaderEndOffset = 10L,
brokerEpoch = 2L
)
if (isKraft) {
assertThrows(classOf[NotLeaderOrFollowerException], () => replica.updateFetchStateOrThrow(
followerFetchOffsetMetadata = new LogOffsetMetadata(5L),
followerStartOffset = 2L,
followerFetchTimeMs = 3,
leaderEndOffset = 10L,
brokerEpoch = 1L
))
} else {
// No exception to expect under ZK mode.
replica.updateFetchStateOrThrow(
followerFetchOffsetMetadata = new LogOffsetMetadata(5L),
followerStartOffset = 2L,
followerFetchTimeMs = 3,
leaderEndOffset = 10L,
brokerEpoch = 1L
)
}
assertThrows(classOf[NotLeaderOrFollowerException], () => replica.updateFetchStateOrThrow(
followerFetchOffsetMetadata = new LogOffsetMetadata(5L),
followerStartOffset = 2L,
followerFetchTimeMs = 3,
leaderEndOffset = 10L,
brokerEpoch = 1L
))
replica.updateFetchStateOrThrow(
followerFetchOffsetMetadata = new LogOffsetMetadata(5L),
followerStartOffset = 2L,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,11 @@
*/
package kafka.server

import kafka.server.metadata.ZkMetadataCache
import org.apache.kafka.clients.NodeApiVersions
import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.server.BrokerFeatures
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.KRaftVersion
import org.junit.jupiter.api.{Disabled, Test}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.params.ParameterizedTest
Expand All @@ -32,7 +31,7 @@ import scala.jdk.CollectionConverters._

class ApiVersionManagerTest {
private val brokerFeatures = BrokerFeatures.createDefault(true)
private val metadataCache = new ZkMetadataCache(1, MetadataVersion.latestTesting(), brokerFeatures)
private val metadataCache = MetadataCache.kRaftMetadataCache(1, () => KRaftVersion.LATEST_PRODUCTION)

@ParameterizedTest
@EnumSource(classOf[ListenerType])
Expand Down
109 changes: 0 additions & 109 deletions core/src/test/scala/unit/kafka/server/FinalizedFeatureCacheTest.scala

This file was deleted.

1 change: 0 additions & 1 deletion core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,6 @@ class KafkaApisTest extends Logging {

val capturedResponse = verifyNoThrottling[AbstractResponse](request)
assertEquals(expectedResponse.data, capturedResponse.data)

}

private def authorizeResource(authorizer: Authorizer,
Expand Down
Loading
Loading