[Issue 1448][Producer] Fix producer stuck in Send() function when message chunk number is larger than MaxPendingMessages (#1462)
Conversation
Pull request overview
This PR fixes a producer deadlock when sending a chunked message whose chunk count exceeds MaxPendingMessages, and ensures semaphore permits are correctly released in partial-failure/timeout paths.
Changes:
- Add chunk-count vs `publishSemaphore` capacity validation in `updateChunkInfo`, returning `ErrMessageTooLarge` when a single message would require more semaphore permits than the pending queue can support.
- Track and release the exact number of acquired semaphore permits via `reservedSemaphore`, including for chunked messages and partial-acquire failure paths, and recycle the parent `sendRequest` after splitting into per-chunk `sendRequest`s.
- Extend the internal `Semaphore` interface with `MaxPermits` and `CurrentPermits` accessors, and add tests around semaphore state and chunking behavior to verify no permit leakage and correct behavior when queue limits are exceeded.
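The reserved-permit accounting described above can be sketched as follows. This is a minimal, hypothetical illustration — the channel-based semaphore, `tryAcquire`, and the `sendRequest` struct here are simplified stand-ins, not the actual pulsar-client-go types — but it shows the key invariant: on a partial acquire failure, `done()` releases exactly the permits that were taken, so nothing leaks.

```go
package main

import "fmt"

// semaphore is a toy permit pool built on a buffered channel; the real
// client uses its own internal.Semaphore type.
type semaphore chan struct{}

func newSemaphore(permits int) semaphore { return make(semaphore, permits) }

// tryAcquire takes one permit without blocking; false means the pool is full.
func (s semaphore) tryAcquire() bool {
	select {
	case s <- struct{}{}:
		return true
	default:
		return false
	}
}

func (s semaphore) release() { <-s }

// sendRequest mimics tracking how many permits a (possibly chunked)
// message actually holds, so done() can release exactly that many.
type sendRequest struct {
	reservedSemaphore int
	sem               semaphore
}

// reserve tries to take one permit per chunk; on partial failure it
// rolls back the permits it did acquire, leaking nothing.
func (sr *sendRequest) reserve(totalChunks int) bool {
	for i := 0; i < totalChunks; i++ {
		if !sr.sem.tryAcquire() {
			sr.done() // roll back the partial acquisition
			return false
		}
		sr.reservedSemaphore++
	}
	return true
}

// done releases exactly the permits that were acquired, no more, no fewer.
func (sr *sendRequest) done() {
	for ; sr.reservedSemaphore > 0; sr.reservedSemaphore-- {
		sr.sem.release()
	}
}

func main() {
	sem := newSemaphore(3)
	sr := &sendRequest{sem: sem}
	ok := sr.reserve(5)       // 5 chunks but only 3 permits: partial failure
	fmt.Println(ok, len(sem)) // false 0 -> no permits leaked
}
```

Counting permits in the request itself (rather than assuming one permit per message) is what makes the timeout and partial-failure paths safe for chunked sends.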
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| pulsar/producer_partition.go | Enforces a cap on chunks per message relative to MaxPendingMessages, adjusts semaphore reservation/release logic for chunked sends, and recycles the original sendRequest after chunk splitting. |
| pulsar/message_chunking_test.go | Adds regression tests for blocking behavior without timeouts and for semaphore state (with and without chunking, and with timeouts) to ensure no stuck producers and no permit leaks. |
| pulsar/internal/semaphore.go | Extends the semaphore interface/implementation with MaxPermits() and CurrentPermits() to support the new validation and testing around pending queue capacity. |
```go
// Since a single message first generates all chunks and then acquires the semaphore at once,
// we need to ensure the number of chunks generated by a single message is less than the size of the pendingQueue.
```
The comment here says the number of chunks must be "less than" the size of the pending queue, but the actual condition allows totalChunks == MaxPermits() (it only rejects when totalChunks > MaxPermits). To avoid confusion for future maintainers, please either update the wording to "less than or equal to" or tighten the condition to match the current wording.
Suggested change:

```go
// we need to ensure the number of chunks generated by a single message is less than or equal to the size of the pendingQueue.
```
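The condition under discussion can be sketched as follows (a hypothetical simplification — the real check in `updateChunkInfo` compares against the semaphore's `MaxPermits()`; the function and error names here are illustrative). Note the check is `>`, not `>=`, so `totalChunks == maxPermits` is allowed, which is why the reviewer asks for "less than or equal to" in the comment wording:

```go
package main

import (
	"errors"
	"fmt"
)

var errMessageTooLarge = errors.New("message too large")

// validateChunkCount rejects a message whose chunk count can never fit
// into the pending queue at once. totalChunks == maxPermits passes.
func validateChunkCount(totalChunks, maxPermits int) error {
	if totalChunks > maxPermits {
		return fmt.Errorf("%w: %d chunks exceed pending queue capacity %d",
			errMessageTooLarge, totalChunks, maxPermits)
	}
	return nil
}

func main() {
	fmt.Println(validateChunkCount(10, 10) == nil) // true: boundary case allowed
	fmt.Println(validateChunkCount(11, 10) == nil) // false: rejected
}
```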
```go
func TestChunkBlockIfQueueFullWithoutTimeout(t *testing.T) {
	client, err := NewClient(ClientOptions{
		URL: lookupURL,
	})
	if err != nil {
		t.Fatal(err)
	}
```
NewClient allocates network resources but this test never calls client.Close(), which can leave goroutines and connections open across tests. To keep resource usage consistent with other tests in this file, consider deferring client.Close() after successful client creation.
```go
	})
	if err != nil {
		t.Fatal(err)
	}
```
Similarly to TestSemaphoreState and TestSemaphoreStateWithChunk, this test creates a client but never calls client.Close(), which can leak resources across tests. Please add a defer client.Close() after client creation for consistency and to avoid lingering connections.
Suggested change:

```go
	}
	defer client.Close()
```
```go
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	// Large messages will be split into 11 chunks, exceeding the length of pending queue
```
The comment says this payload will be split into 11 chunks, but with ChunkMaxMessageSize set to 10 and a payload size of 100, the chunking logic computes totalChunks = ceil(100 / 10) = 10. To prevent confusion when someone adjusts these tests or debugs chunking behavior, please update the comment to reflect the actual number of chunks (or rephrase it generically as "more chunks than the pending queue capacity").
Suggested change:

```go
	// Large messages will be split into more chunks than the length of the pending queue
```
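The reviewer's arithmetic follows from the standard ceiling division used to compute chunk counts (a sketch; the parameter names are illustrative, not the actual producer fields):

```go
package main

import "fmt"

// totalChunks computes ceil(payloadSize / chunkMax), the number of chunks
// a payload is split into when each chunk carries at most chunkMax bytes.
func totalChunks(payloadSize, chunkMax int) int {
	return (payloadSize + chunkMax - 1) / chunkMax
}

func main() {
	fmt.Println(totalChunks(100, 10)) // 10, not 11: the payload divides evenly
	fmt.Println(totalChunks(101, 10)) // 11: one extra byte forces another chunk
}
```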
**RobertIndie** left a comment:
Under the default block-if-queue-full behavior, users expect maxPendingMessages to act purely as a throttling mechanism, not as a trigger for exceptions. Could we instead allow temporarily exceeding maxPendingMessages in this case? I don't think we need strict or exact enforcement here.

I agree with your assessment. The implementation in this PR, which uses an error to reject chunked messages, does not adequately meet user requirements. We will close this PR for now and consider implementing a new approach based on throttling in the future.
Fixes #1448
Motivation
Refer to the issue: the primary reason the Java SDK does not have this message size limit is its different chunk-generation strategy.

In Java, for each chunk split from the original message, a semaphore permit for one message is acquired and then the chunk is written to the pendingQueue for asynchronous sending.

In Go, however, the producer must wait until all semaphore permits for the current message are acquired before sending the entire batch of chunks to the pendingQueue. We can see `testBlockIfQueueFullWhenChunking()` describe this behavior in the Java code: https://github.com/apache/pulsar/blob/f0ec07b3d8c5cfe36942957fc0ad32e40d69320d/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L657
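The difference between the two strategies can be illustrated with a toy channel-based semaphore (a hypothetical sketch — the names do not match the actual client code): acquiring all permits up front stalls permanently once a single message needs more permits than exist, because no chunk has been sent yet and therefore nothing ever releases a permit, while acquiring per chunk and releasing as each chunk completes always makes progress.

```go
package main

import "fmt"

// semaphore is a toy permit pool built on a buffered channel.
type semaphore chan struct{}

// tryAcquire takes one permit without blocking; false means none are left.
func (s semaphore) tryAcquire() bool {
	select {
	case s <- struct{}{}:
		return true
	default:
		return false
	}
}

func (s semaphore) release() { <-s }

func main() {
	const maxPending, chunks = 5, 10

	// Go-style: acquire all permits before queueing any chunk.
	sem := make(semaphore, maxPending)
	acquired := 0
	for acquired < chunks && sem.tryAcquire() {
		acquired++
	}
	// Only maxPending permits exist, and nothing has been sent, so nothing
	// will ever release one: a blocking Acquire here would never return.
	fmt.Println("up-front strategy stalls at", acquired, "of", chunks)

	// Java-style: acquire one permit per chunk and release it when the
	// chunk completes (completion is simulated synchronously here).
	sem2 := make(semaphore, maxPending)
	sent := 0
	for i := 0; i < chunks; i++ {
		if !sem2.tryAcquire() {
			break
		}
		sent++
		sem2.release() // chunk "completed", permit returned
	}
	fmt.Println("per-chunk strategy sends all", sent, "chunks")
}
```

This is why the Go client's acquire-all-then-send approach needs an upper bound on chunks per message, whereas Java's per-chunk acquisition does not.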
Modifications

- Added validation of the message payload size against the pendingQueue capacity in `pulsar/producer_partition.go#updateChunkInfo`. However, this bugfix does not resolve the potential deadlock caused by multiple concurrent chunked messages; refer to https://github.com/apache/pulsar/blob/f0ec07b3d8c5cfe36942957fc0ad32e40d69320d/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java#L706
- Added cleanup handling for when chunked messages occupy a portion of the semaphore, fixing the issue where semaphore permits were not released after the request was destroyed in `sendRequest.done()`.

Verifying this change
This change added tests and can be verified as follows:

- `TestChunkBlockIfQueueFullWithoutTimeout`
- `TestSemaphoreStateWithChunkAndTimeout`

Does this pull request potentially affect one of the following parts:
Documentation