Skip to content

Commit fc939b3

Browse files
authored
[EventHub] Fix race condition when buffered mode is enabled (Azure#34712)
* Protects buffer append from race condition
* Improves current batch flushing consistency
* Update azure-eventhub CHANGELOG.md
1 parent 6c9ae15 commit fc939b3

File tree

3 files changed

+27
-34
lines changed

3 files changed

+27
-34
lines changed

sdk/eventhub/azure-eventhub/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88

99
### Bugs Fixed
1010

11+
- Fixed a bug where using `EventHubProducerClient` in buffered mode could potentially drop a buffered message without actually sending it. ([#34712](https://github.com/Azure/azure-sdk-for-python/pull/34712))
12+
1113
### Other Changes
1214

1315
- Updated network trace logging to replace `None` values in AMQP connection info with empty strings as per the OpenTelemetry specification.

sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer.py

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -105,24 +105,22 @@ def put_events(self, events, timeout_time=None):
105105
raise OperationTimeoutError(
106106
"Failed to enqueue events into buffer due to timeout."
107107
)
108-
try:
109-
# add single event into current batch
110-
self._cur_batch.add(events)
111-
except AttributeError: # if the input events is a EventDataBatch, put the whole into the buffer
112-
# if there are events in cur_batch, enqueue cur_batch to the buffer
113-
with self._lock:
108+
with self._lock:
109+
try:
110+
# add single event into current batch
111+
self._cur_batch.add(events)
112+
except AttributeError: # if the input events is a EventDataBatch, put the whole into the buffer
113+
# if there are events in cur_batch, enqueue cur_batch to the buffer
114114
if self._cur_batch:
115115
self._buffered_queue.put(self._cur_batch)
116116
self._buffered_queue.put(events)
117-
# create a new batch for incoming events
118-
self._cur_batch = EventDataBatch(self._max_message_size_on_link, amqp_transport=self._amqp_transport)
119-
except ValueError:
120-
# add single event exceeds the cur batch size, create new batch
121-
with self._lock:
117+
# create a new batch for incoming events
118+
self._cur_batch = EventDataBatch(self._max_message_size_on_link, amqp_transport=self._amqp_transport)
119+
except ValueError:
120+
# add single event exceeds the cur batch size, create new batch
122121
self._buffered_queue.put(self._cur_batch)
123-
self._cur_batch = EventDataBatch(self._max_message_size_on_link, amqp_transport=self._amqp_transport)
124-
self._cur_batch.add(events)
125-
with self._lock:
122+
self._cur_batch = EventDataBatch(self._max_message_size_on_link, amqp_transport=self._amqp_transport)
123+
self._cur_batch.add(events)
126124
self._cur_buffered_len += new_events_len
127125

128126
def failsafe_callback(self, callback):
@@ -146,6 +144,7 @@ def flush(self, timeout_time=None, raise_error=True):
146144
_LOGGER.info("Partition: %r started flushing.", self.partition_id)
147145
if self._cur_batch: # if there is batch, enqueue it to the buffer first
148146
self._buffered_queue.put(self._cur_batch)
147+
self._cur_batch = EventDataBatch(self._max_message_size_on_link, amqp_transport=self._amqp_transport)
149148
while self._buffered_queue.qsize() > 0:
150149
remaining_time = timeout_time - time.time() if timeout_time else None
151150
if (remaining_time and remaining_time > 0) or remaining_time is None:
@@ -197,9 +196,6 @@ def flush(self, timeout_time=None, raise_error=True):
197196
break
198197
# after finishing flushing, reset cur batch and put it into the buffer
199198
self._last_send_time = time.time()
200-
#reset buffered count
201-
self._cur_buffered_len = 0
202-
self._cur_batch = EventDataBatch(self._max_message_size_on_link, amqp_transport=self._amqp_transport)
203199
_LOGGER.info("Partition %r finished flushing.", self.partition_id)
204200

205201
def check_max_wait_time_worker(self):

sdk/eventhub/azure-eventhub/azure/eventhub/aio/_buffered_producer/_buffered_producer_async.py

Lines changed: 12 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -105,24 +105,22 @@ async def put_events(self, events, timeout_time=None):
105105
raise OperationTimeoutError(
106106
"Failed to enqueue events into buffer due to timeout."
107107
)
108-
try:
109-
# add single event into current batch
110-
self._cur_batch.add(events)
111-
except AttributeError: # if the input events is a EventDataBatch, put the whole into the buffer
112-
# if there are events in cur_batch, enqueue cur_batch to the buffer
113-
async with self._lock:
108+
async with self._lock:
109+
try:
110+
# add single event into current batch
111+
self._cur_batch.add(events)
112+
except AttributeError: # if the input events is a EventDataBatch, put the whole into the buffer
113+
# if there are events in cur_batch, enqueue cur_batch to the buffer
114114
if self._cur_batch:
115115
self._buffered_queue.put(self._cur_batch)
116116
self._buffered_queue.put(events)
117-
# create a new batch for incoming events
118-
self._cur_batch = EventDataBatch(self._max_message_size_on_link, amqp_transport=self._amqp_transport)
119-
except ValueError:
120-
# add single event exceeds the cur batch size, create new batch
121-
async with self._lock:
117+
# create a new batch for incoming events
118+
self._cur_batch = EventDataBatch(self._max_message_size_on_link, amqp_transport=self._amqp_transport)
119+
except ValueError:
120+
# add single event exceeds the cur batch size, create new batch
122121
self._buffered_queue.put(self._cur_batch)
123-
self._cur_batch = EventDataBatch(self._max_message_size_on_link, amqp_transport=self._amqp_transport)
124-
self._cur_batch.add(events)
125-
async with self._lock:
122+
self._cur_batch = EventDataBatch(self._max_message_size_on_link, amqp_transport=self._amqp_transport)
123+
self._cur_batch.add(events)
126124
self._cur_buffered_len += new_events_len
127125

128126
def failsafe_callback(self, callback):
@@ -200,9 +198,6 @@ async def _flush(self, timeout_time=None, raise_error=True):
200198
break
201199
# after finishing flushing, reset cur batch and put it into the buffer
202200
self._last_send_time = time.time()
203-
#reset curr_buffered
204-
self._cur_buffered_len = 0
205-
self._cur_batch = EventDataBatch(self._max_message_size_on_link, amqp_transport=self._amqp_transport)
206201
_LOGGER.info("Partition %r finished flushing.", self.partition_id)
207202

208203
async def check_max_wait_time_worker(self):

0 commit comments

Comments
 (0)