Skip to content

Comments

[FLINK-38096] AsyncSinkWriter may hang when configured with a custom rate limiting strategy#27631

Open
HundalTaran wants to merge 3 commits intoapache:masterfrom
HundalTaran:master
Open

[FLINK-38096] AsyncSinkWriter may hang when configured with a custom rate limiting strategy#27631
HundalTaran wants to merge 3 commits intoapache:masterfrom
HundalTaran:master

Conversation

@HundalTaran
Copy link

What is the purpose of the change

In Async Writer, records are getting stuck on executing rate limiting. So, need to make sure that write should wait only if records are present in flight.

Brief change log

Changed in method of AysncWriter.java class from mailboxExecutor.yield(); to yieldIfThereExistsInFlightRequests();
Also. added junit to cehck this yielding.
An example is provided with bucket4j to test rate limiting scenario.

Verifying this change

Existing junits are getting passed.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)- no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no) - no
  • The serializers: (yes / no / don't know) - no
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know) - no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know) - no
  • The S3 file system connector: (yes / no / don't know) - no

Documentation

  • Does this pull request introduce a new feature? (yes / no) -no
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)- not applicable

HundalTaran and others added 3 commits February 13, 2026 15:22
…rate limiting strategy

AsyncSinkWriter may hang when using a custom rate-limiting strategy that may block new requests when no others are in progress. This issue occurs when implementing rate limits, such as restricting API requests per interval
@flinkbot
Copy link
Collaborator

flinkbot commented Feb 18, 2026

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@davidradl
Copy link
Contributor

Unapproved licenses

   flink-examples/base-connector-examples/src/main/java/org/apache/flink/TokenBucketRateLimitingStrategy.java
  flink-examples/base-connector-examples/src/main/java/org/apache/flink/TokenBucketProvider.java
  flink-examples/base-connector-examples/src/main/java/org/apache/flink/AsyncSinkHangDemo.java
  flink-examples/base-connector-examples/src/main/java/org/apache/flink/DummyAsyncSink.java

@HundalTaran
Copy link
Author

Unapproved licenses

   flink-examples/base-connector-examples/src/main/java/org/apache/flink/TokenBucketRateLimitingStrategy.java
  flink-examples/base-connector-examples/src/main/java/org/apache/flink/TokenBucketProvider.java
  flink-examples/base-connector-examples/src/main/java/org/apache/flink/AsyncSinkHangDemo.java
  flink-examples/base-connector-examples/src/main/java/org/apache/flink/DummyAsyncSink.java

Thanks for the comments, will work on that

private void flush() throws InterruptedException {
RequestInfo requestInfo = createRequestInfo();
while (rateLimitingStrategy.shouldBlock(requestInfo)) {
mailboxExecutor.yield();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am curious, is the yield behaviour an explicit contract documented on the yield method or due to the implementations of yield?

If it is not explicit, we should update the javadoc for the yield method in the interface.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants