Skip to content

Commit 352fe9a

Browse files
committed
Handle exceptions in client._task_network_consumer
When exceptions bubble up to _task_network_consumer, log them and close the connection. This prevents the network consumer becoming unresponsive in the face of unhandled exceptions, which then surface as TimeoutErrors to GivTCP processes which are waiting on responses from the inverter.
1 parent fe07cd5 commit 352fe9a

File tree

1 file changed

+47
-41
lines changed
  • GivTCP/givenergy_modbus_async/client

1 file changed

+47
-41
lines changed

GivTCP/givenergy_modbus_async/client/client.py

+47-41
Original file line numberDiff line numberDiff line change
@@ -328,47 +328,53 @@ async def one_shot_command(
328328

329329
async def _task_network_consumer(self):
330330
"""Task for orchestrating incoming data."""
331-
while hasattr(self, "reader") and self.reader and not self.reader.at_eof():
332-
frame = await self.reader.read(300)
333-
# await self.debug_frames['all'].put(frame)
334-
for message in self.framer.decode(frame):
335-
_logger.debug("Processing %s", message)
336-
if isinstance(message, ExceptionBase):
337-
_logger.warning(
338-
"Expected response never arrived but resulted in exception: %s",
339-
message,
340-
)
341-
continue
342-
if isinstance(message, HeartbeatRequest):
343-
_logger.debug("Responding to HeartbeatRequest")
344-
await self.tx_queue.put(
345-
(message.expected_response().encode(), None)
346-
)
347-
continue
348-
if not isinstance(message, TransparentResponse):
349-
_logger.warning(
350-
"Received unexpected message type for a client: %s", message
351-
)
352-
continue
353-
if isinstance(message, WriteHoldingRegisterResponse):
354-
if message.error:
355-
_logger.warning("%s", message)
356-
else:
357-
_logger.info("%s", message)
358-
359-
future = self.expected_responses.get(message.shape_hash())
360-
361-
if future and not future.done():
362-
future.set_result(message)
363-
# try:
364-
self.plant.update(message)
365-
# except RegisterCacheUpdateFailed as e:
366-
# # await self.debug_frames['error'].put(frame)
367-
# _logger.debug(f'Ignoring {message}: {e}')
368-
_logger.debug(
369-
"network_consumer reader at EOF, cannot continue, closing connection"
370-
)
371-
await self.close()
331+
try:
332+
while hasattr(self, "reader") and self.reader and not self.reader.at_eof():
333+
frame = await self.reader.read(300)
334+
# await self.debug_frames['all'].put(frame)
335+
for message in self.framer.decode(frame):
336+
_logger.debug("Processing %s", message)
337+
if isinstance(message, ExceptionBase):
338+
_logger.warning(
339+
"Expected response never arrived but resulted in exception: %s",
340+
message,
341+
)
342+
continue
343+
if isinstance(message, HeartbeatRequest):
344+
_logger.debug("Responding to HeartbeatRequest")
345+
await self.tx_queue.put(
346+
(message.expected_response().encode(), None)
347+
)
348+
continue
349+
if not isinstance(message, TransparentResponse):
350+
_logger.warning(
351+
"Received unexpected message type for a client: %s", message
352+
)
353+
continue
354+
if isinstance(message, WriteHoldingRegisterResponse):
355+
if message.error:
356+
_logger.warning("%s", message)
357+
else:
358+
_logger.info("%s", message)
359+
360+
future = self.expected_responses.get(message.shape_hash())
361+
362+
if future and not future.done():
363+
future.set_result(message)
364+
# try:
365+
self.plant.update(message)
366+
# except RegisterCacheUpdateFailed as e:
367+
# # await self.debug_frames['error'].put(frame)
368+
# _logger.debug(f'Ignoring {message}: {e}')
369+
_logger.debug(
370+
"network_consumer reader at EOF, cannot continue, closing connection"
371+
)
372+
except Exception as e:
373+
_logger.error(
374+
"network_consumer reader exception {}", e
375+
)
376+
finally:
377+
await self.close()
372378

373379
async def _task_network_producer(self, tx_message_wait: float = 0.25):
374380
"""Producer loop to transmit queued frames with an appropriate delay."""

0 commit comments

Comments
 (0)