KAFKA-18465: Remove MetadataVersions older than 3.0-IV1 #18468
Conversation
Force-pushed from 1097696 to c557e0b
@ijuma nice cleanup!
if (version < 3)
  assertEquals(OffsetsForLeaderEpochRequest.DEBUGGING_REPLICA_ID, parsed.replicaId());
else
  assertEquals(replicaId, parsed.replicaId());
Should we keep the test for version=4?
The value of this test is a bit low, but I added it back.
@@ -731,7 +731,6 @@ private void cancelClassicGroupSizeCounter() {
public void onLoaded(MetadataImage newImage) {
    MetadataDelta emptyDelta = new MetadataDelta(newImage);
    groupMetadataManager.onNewMetadataImage(newImage, emptyDelta);
    offsetMetadataManager.onNewMetadataImage(newImage, emptyDelta);
I guess we don't need to clean up GroupMetadataManager.scala, as 4.0 uses a new coordinator by default?
I updated that class too where it made sense. Did you spot additional areas of clean-up?
@@ -238,7 +238,7 @@ public static CoordinatorRecord newGroupMetadataRecord(
),
new ApiMessageAndVersion(
    value,
    metadataVersion.groupMetadataValueVersion()
Could you please remove the variable MetadataVersion metadataVersion, as it is unused now?
Done.
) {
  short version = metadataVersion.offsetCommitValueVersion(offsetAndMetadata.expireTimestampMs.isPresent());
It seems we need to use version=1 in order to keep "expireTimestamp"; otherwise, with v3, expireTimestamp will be ignored during serialization.
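A minimal sketch of the version selection being suggested here (Scala; the method name is illustrative, mirroring the logic that appears later in the diff):

// Only v1 of OffsetCommitValue carries expireTimestamp, so records with an
// expire timestamp must be written with v1; everything else can use the
// highest non-flexible version (3).
def offsetCommitValueVersion(expireTimestampPresent: Boolean): Short =
  if (expireTimestampPresent) 1.toShort
  else 3.toShort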
Yeah, this change is wrong - a few things will be reverted as a result.
Fixed this one - still working through a few other things.
Thanks for the quick review @chia7712. I should have left a note, but I'm still working on this PR. I'll take a look at your comments soon.
Apache Kafka 4.0 will only support KRaft, and 3.0 was the first release where KRaft was in a reasonable state (even if not production-ready yet). So, we can assume that Apache Kafka 4.0 will only communicate with brokers that are 3.0 or newer. A couple of notes:
1. KRaft was only marked as production-ready in 3.3, so we could go further and set the baseline to 3.3. I think we should have that discussion, but it made sense to start with the non-controversial parts.
2. I have not removed the actual IBP definitions - leaving that for a separate PR.
Force-pushed from c557e0b to 7449859
I removed the relevant IBP constants too and fixed the known test failures. I'll check the full test results tomorrow and will do a file-by-file pass then too.
// until a tagged field is introduced or the version is bumped.
else 3.toShort
if (offsetAndMetadata.expireTimestampMs.isPresent) 1.toShort
else maxVersion
Is there a need to pass it as an argument to the method or could we just hardcode it?
This is needed for some tests that verify the ability to read messages encoded with the old format. I'm not sure if there's a way to make sure there are no messages written with the old format by the time the broker is upgraded to 4.0. Given the usual retention rules, it should be fine, but I'm not sure we can guarantee it. Thoughts?
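A rough, self-contained illustration (Scala, using a toy codec rather than the real GroupMetadataManager serde) of the kind of test the version parameter keeps possible: write a value with an older schema version and verify the current read path still understands it.

import java.nio.ByteBuffer

// Toy layout: v1 = [version][offset][expireTimestamp], v3 = [version][offset].
object OldFormatReadSketch extends App {
  def encode(offset: Long, expireTimestampMs: Long, version: Short): ByteBuffer = {
    val buf = ByteBuffer.allocate(2 + 8 + (if (version == 1) 8 else 0))
    buf.putShort(version)
    buf.putLong(offset)
    if (version == 1) buf.putLong(expireTimestampMs) // only v1 carries the field
    buf.flip()
    buf
  }

  def decode(buf: ByteBuffer): (Long, Option[Long]) = {
    val version = buf.getShort()
    val offset = buf.getLong()
    val expire = if (version == 1) Some(buf.getLong()) else None
    (offset, expire)
  }

  // A 4.0 broker must still decode records that an older broker wrote with v1.
  val fromV1 = decode(encode(42L, 1000L, 1.toShort))
  assert(fromV1._1 == 42L && fromV1._2.contains(1000L))

  // Writing with v3 silently drops the expire timestamp (the issue noted above).
  val fromV3 = decode(encode(42L, 1000L, 3.toShort))
  assert(fromV3._2.isEmpty)
}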
// until a tagged field is introduced or the version is bumped.
else 3.toShort

version: Short = 3): Array[Byte] = {
Ditto.
See the reply to the other comment.
There was a single failing test - pushed a simple fix.
}

@Test
def testSerdeOffsetCommitValueWithNoneExpireTimestamp(): Unit = {
This basically duplicates testSerdeOffsetCommitValue and hence I removed it.
…emove-metadata-version-methods-for-versions-older-than-3.0

* apache-github/trunk:
  KAFKA-18340: Change Dockerfile to use log4j2 yaml instead log4j properties (apache#18378)
  MINOR: fix flaky RemoteLogManagerTest#testStopPartitionsWithDeletion (apache#18474)
  KAFKA-18311: Enforcing copartitioned topics (4/N) (apache#18397)
  KAFKA-18308; Update CoordinatorSerde (apache#18455)
  KAFKA-18440: Convert AuthorizationException to fatal error in AdminClient (apache#18435)
  KAFKA-17671: Create better documentation for transactions (apache#17454)
  KAFKA-18304; Introduce json converter generator (apache#18458)
  MINOR: Clean up classic group tests (apache#18473)
  KAFKA-18399 Remove ZooKeeper from KafkaApis (2/N): CONTROLLED_SHUTDOWN and ENVELOPE (apache#18422)
  MINOR: improve StreamThread periodic processing log (apache#18430)
@@ -390,43 +390,6 @@ class AbstractFetcherThreadTest {
    assertEquals(leaderState.highWatermark, replicaState.highWatermark)
  }

  @Test
  def testTruncateToHighWatermarkIfLeaderEpochRequestNotSupported(): Unit = {
No longer relevant since leader epoch request is now always supported.
@@ -486,74 +473,6 @@ class AlterPartitionManagerTest {
    assertFutureThrows(future2, classOf[UnknownServerException])
  }

  @ParameterizedTest
  @MethodSource(Array("provideMetadataVersions"))
  def testPartialTopicIds(metadataVersion: MetadataVersion): Unit = {
We now always have topic ids.
@@ -1640,15 +1639,6 @@ class KafkaConfigTest {
    }
  }

  @Test
  def testInvalidInterBrokerProtocolVersionKRaft(): Unit = {
The baseline version now supports KRaft.
assertEquals(
  ApiKeys.LIST_OFFSETS.latestVersion(true),
  testingVersion.listOffsetRequestVersion
)
}

@Disabled("KAFKA-18370")
@Test
def testFetchLeaderEpochRequestIfLastEpochDefinedForSomePartitions(): Unit = {
Lots of disabled tests that relied on "no truncate on fetch", which is no longer applicable.
It's probably better to remove those tests related to OffsetForLeaderEpochRequest only after we remove the code in AbstractFetcherThread.truncateToEpochEndOffsets(). This can be done in a separate jira. We could remove all tests that depend on old IBPs.
Sorry @junrao - I didn't fully understand. Are you saying we should not remove these tests just yet or that it's ok to remove these now and remove the rest in a separate PR?
KAFKA-18370 says that we could remove these tests because 4.0 doesn't support ZK anymore. This seems incorrect since this part of the code is independent of the ZK change. However, it does seem that the OffsetForLeaderEpochRequest is not really exercised because of the truncate-on-fetch support. I am just not 100% sure. So, it would be useful to try to remove that part of the code first and then remove the corresponding tests.
cc @chia7712
I see. The reason I removed these tests is:
- They relied on IBP 2.6-IV0 via the kafkaConfigNoTruncateOnFetch method.
- They were disabled.
- I was under the impression that the truncation approach from IBP 2.7 onwards does not rely on offset for leader epoch requests: KAFKA-10554; Perform follower truncation based on diverging epochs in Fetch response #9382 (a rough sketch of that mechanism follows below).

So, I wasn't sure how to make the tests work.
Your point is that the legacy code is still there and hence removing the tests is premature. Is that right?
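For context, a rough sketch (Scala, illustrative names only, not the actual replica fetcher classes) of the KAFKA-10554 behavior referenced above: when the fetch response reports a diverging epoch, the follower truncates directly from that information, so no separate OffsetsForLeaderEpoch round trip is needed.

// If the leader reports a diverging epoch/end offset in the fetch response,
// truncate the local log to that offset; otherwise leave it untouched.
final case class DivergingEpoch(epoch: Int, endOffset: Long)

def truncationOffsetOnFetch(localLogEndOffset: Long, diverging: Option[DivergingEpoch]): Long =
  diverging match {
    case Some(d) => math.min(localLogEndOffset, d.endOffset)
    case None    => localLogEndOffset
  }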
Since these tests were already disabled, is it OK if I keep them deleted and follow up with a separate change that removes the supposed replica fetcher dead code?
Sounds good. I filed https://issues.apache.org/jira/browse/KAFKA-18477 to track the removal work in replica fetcher.
).validateMessagesAndAssignOffsets(
    PrimitiveRef.ofLong(0L), metricsRecorder, RequestLocal.withThreadConfinedCaching().bufferSupplier()
));

assertEquals(metricsRecorder.recordInvalidOffsetCount, 1);
}

@Test
public void testZStdCompressedWithUnavailableIBPVersion() {
zstd is now always available.
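For reference, a hedged sketch (Scala, simplified; not the exact validation code) of the guard this test exercised: zstd-compressed batches were rejected when the IBP predated zstd support (added in 2.1), a branch that can no longer be reached once the baseline is 3.0-IV1.

// Reject zstd only when the inter-broker protocol predates zstd support.
def validateCompression(compressionType: String, ibpSupportsZstd: Boolean): Unit =
  if (compressionType == "zstd" && !ibpSupportsZstd)
    throw new IllegalArgumentException(
      "Produce requests with zstd compression are not allowed on IBPs older than 2.1")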
This is ready for review now.
The tests passed.
@ijuma : Thanks for the PR. Left a few comments.
 */
public Builder(AlterPartitionRequestData data, boolean canUseTopicIds) {
The v2 request was added in 3.3.0. To be consistent, we need to move the MV baseline to 3.3.
The claim is that we're using protocol negotiation to decide the version to use. The following is stated in AlterPartitionManager.buildRequest:

"While building the request, we don't know which version of the AlterPartition API is supported by the controller. The final decision is taken when the AlterPartitionRequest is built in the network client based on the advertised api versions of the controller."
And the code does seem to use clientChannelBuilder.
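A simplified sketch (Scala; not the actual NetworkClient/ApiVersions classes) of the negotiation described above: the builder only declares a version range, and the final version is the highest one supported by both sides.

final case class VersionRange(min: Short, max: Short)

// Pick the newest AlterPartition version that both the broker's builder and the
// controller's advertised ApiVersions support; fail if the ranges don't overlap.
def negotiateVersion(builder: VersionRange, advertisedByController: VersionRange): Short = {
  val lo = math.max(builder.min, advertisedByController.min)
  val hi = math.min(builder.max, advertisedByController.max)
  require(lo <= hi, s"No common AlterPartition version: $builder vs $advertisedByController")
  hi.toShort
}

// e.g. a builder supporting 0..3 talking to a controller advertising 0..1 ends up sending v1.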
Thanks for the explanation. We can keep the change as it is then.
@@ -150,14 +150,12 @@ class DefaultApiVersionManager(
  }
  val apiVersions = if (controllerApiVersions.isDefined) {
    ApiVersionsResponse.controllerApiVersions(
      finalizedFeatures.metadataVersion().highestSupportedRecordVersion,
Should we leave this param here to support record format change in the future?
I can revert this (and related code) if you think it's useful since it's reasonably self-contained. The benefit of keeping vs removing is probably borderline given that there are no concrete plans for adding new record versions (including the specifics regarding the compatibility story). Still, this is a bit unrelated to the main point of this PR (MV/IBP), so I can change it back.
Yes, I agree that it's borderline. It's probably ok to keep this change for now.
@@ -1112,21 +1109,13 @@ object GroupMetadataManager {
 *
 * @param groupMetadata current group metadata
 * @param assignment the assignment for the rebalancing generation
 * @param metadataVersion the api version
 * @param version the version to serialize it with, the default is `3`, the highest supported non-flexible version
 *                until a tagged version is bumped. The default should always be used outside of tests
until a tagged version is bumped => until a tagged field is introduced or the version is bumped
@@ -1445,7 +1445,7 @@ public void createClassicGroupRecords(
    assignments.put(classicGroupMember.memberId(), classicGroupMember.assignment())
);

records.add(GroupCoordinatorRecordHelpers.newGroupMetadataRecord(this, assignments, metadataVersion));
records.add(GroupCoordinatorRecordHelpers.newGroupMetadataRecord(this, assignments));
metadataVersion is no longer used.
@@ -128,7 +127,7 @@ class LogLoaderTest {
  brokerTopicStats = new BrokerTopicStats(),
  logDirFailureChannel = logDirFailureChannel,
  time = time,
  keepPartitionMetadataFile = config.usesTopicId,
  keepPartitionMetadataFile = true,
We can probably remove keepPartitionMetadataFile from LogManager. This can be done in a followup PR.
@@ -48,7 +46,7 @@ import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import org.mockito.ArgumentCaptor
import org.mockito.ArgumentMatchers.{any, anyBoolean, anyLong}
import org.mockito.Mockito.{mock, never, times, verify, when}
import org.mockito.Mockito.{mock, times, verify, when}
We could delete shouldUseLeaderEndOffsetIfInterBrokerVersionBelow20.
@junrao I have addressed your comments. Let me know if you have any other comments or if this looks good. Thanks!
@ijuma : Thanks for the updated PR. LGTM.
Filed https://issues.apache.org/jira/browse/KAFKA-18479 to track the followup item.
Apache Kafka 4.0 will only support KRaft, and 3.0-IV1 is the minimum version supported by KRaft. So, we can assume that Apache Kafka 4.0 will only communicate with brokers that are 3.0-IV1 or newer. Note that KRaft was only marked as production-ready in 3.3, so we could go further and set the baseline to 3.3. I think we should have that discussion, but it made sense to start with the non-controversial parts.

Reviewers: Jun Rao <[email protected]>, Chia-Ping Tsai <[email protected]>, David Jacot <[email protected]>
After apache#18468, interBrokerProtocolVersion is no longer used in GroupMetadataManager, so we should remove it.