Skip to content

[FLINK-39954][tests] Relax SinkV2 scaling commit assertion#28507

Open
qiuyanjun888 wants to merge 2 commits into
apache:masterfrom
qiuyanjun888:fix/flink-39954-sinkv2-commit-assertion
Open

[FLINK-39954][tests] Relax SinkV2 scaling commit assertion#28507
qiuyanjun888 wants to merge 2 commits into
apache:masterfrom
qiuyanjun888:fix/flink-39954-sinkv2-commit-assertion

Conversation

@qiuyanjun888

Copy link
Copy Markdown

What is the purpose of the change

FLINK-39954 reports that SinkV2ITCase.writerAndCommitterExecuteInStreamingModeWithScaling over-specifies the exact number of commit attempts under unaligned checkpoints. The Committer contract requires idempotent commit handling and allows repeated commit attempts after recovery, so the test should not fail solely because an expected committable is retried more often.

Summary

  • Relax the scaling test assertion to require all expected committables while allowing additional retried committables that are already part of the expected set.
  • Add a focused regression test for the assertion behavior with an additional retried expected committable.
  • Keep the change limited to SinkV2ITCase; no production code or runtime semantics are changed.

Root Cause

The scaling IT used containsExactlyInAnyOrderElementsOf(duplicate(EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE)), which asserts an exact commit multiplicity. Under unaligned checkpoints and recovery, a boundary committable can be retried in addition to the expected baseline commits. That is valid for an idempotent committer, but the test committer records every commit request and the exact-multiplicity assertion treats the extra retry as unexpected.

Brief change log

  • Replaced the scaling test's exact-multiplicity assertion with a helper that checks the expected committables as a lower-bound multiset.
  • The helper still rejects unexpected records after the expected multiplicities are matched.
  • Added committedRecordAssertionAllowsAdditionalRetriedCommittables to cover the previously over-strict assertion case.

Changes

  • SinkV2ITCase now verifies that all expected committed records are present with at least the required multiplicity.
  • Additional committed records are accepted only if they are records that belong to the expected set.

Verifying this change

This change added tests and can be verified as follows:

  • RED before the assertion fix:
    • ./mvnw -pl flink-tests -Dtest=SinkV2ITCase#committedRecordAssertionAllowsAdditionalRetriedCommittables -Dsurefire.failIfNoSpecifiedTests=false -DfailIfNoTests=false -Dcheckstyle.skip -Drat.skip=true -Dspotless.check.skip=true -Djapicmp.skip=true test
  • GREEN focused regression test:
    • ./mvnw -pl flink-tests -Dtest=SinkV2ITCase#committedRecordAssertionAllowsAdditionalRetriedCommittables -Dsurefire.failIfNoSpecifiedTests=false -DfailIfNoTests=false -Dcheckstyle.skip -Drat.skip=true -Dspotless.check.skip=true -Djapicmp.skip=true test
  • GREEN focused IT class:
    • ./mvnw -pl flink-tests -Dtest=SinkV2ITCase -Dsurefire.failIfNoSpecifiedTests=false -DfailIfNoTests=false -Dcheckstyle.skip -Drat.skip=true -Dspotless.check.skip=true -Djapicmp.skip=true test
  • Formatting/style checks:
    • ./mvnw -pl flink-tests -DskipTests -DskipITs -Dcheckstyle.skip -Drat.skip=true -Djapicmp.skip=true spotless:check
    • ./mvnw -pl flink-tests -DskipTests -DskipITs -Drat.skip=true -Dspotless.check.skip=true -Djapicmp.skip=true checkstyle:check

Validation

  • SinkV2ITCase#committedRecordAssertionAllowsAdditionalRetriedCommittables: PASS after the fix.
  • SinkV2ITCase: PASS.
  • spotless:check for flink-tests: PASS.
  • checkstyle:check for flink-tests: PASS.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no (test assertion only)
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? not applicable

Was generative AI tooling used to co-author this PR?
  • Yes, Hermes Agent with OpenAI GPT-5.5 was used to help prepare this test-only change.

Generated-by: Hermes Agent (OpenAI GPT-5.5)

@flinkbot

flinkbot commented Jun 22, 2026

Copy link
Copy Markdown
Collaborator

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

assertThat(remainingActual).contains(expectedRecord);
remainingActual.remove(expectedRecord);
}
assertThat(remainingActual).allSatisfy(record -> assertThat(expected).contains(record));

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
assertThat(remainingActual).allSatisfy(record -> assertThat(expected).contains(record));
assertThat(remainingActual).isSubsetOf(expected);

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed by applying the suggested isSubsetOf(expected) assertion for the remaining actual records.

Verification:

  • git diff --check
  • JAVA_HOME=/opt/data/jdks/jdk-17.0.19+10 ./mvnw -pl flink-tests -Dtest=SinkV2ITCase#committedRecordAssertionAllowsAdditionalRetriedCommittables test

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants