@@ -328,47 +328,53 @@ async def one_shot_command(
328
328
329
329
async def _task_network_consumer (self ):
330
330
"""Task for orchestrating incoming data."""
331
- while hasattr (self , "reader" ) and self .reader and not self .reader .at_eof ():
332
- frame = await self .reader .read (300 )
333
- # await self.debug_frames['all'].put(frame)
334
- for message in self .framer .decode (frame ):
335
- _logger .debug ("Processing %s" , message )
336
- if isinstance (message , ExceptionBase ):
337
- _logger .warning (
338
- "Expected response never arrived but resulted in exception: %s" ,
339
- message ,
340
- )
341
- continue
342
- if isinstance (message , HeartbeatRequest ):
343
- _logger .debug ("Responding to HeartbeatRequest" )
344
- await self .tx_queue .put (
345
- (message .expected_response ().encode (), None )
346
- )
347
- continue
348
- if not isinstance (message , TransparentResponse ):
349
- _logger .warning (
350
- "Received unexpected message type for a client: %s" , message
351
- )
352
- continue
353
- if isinstance (message , WriteHoldingRegisterResponse ):
354
- if message .error :
355
- _logger .warning ("%s" , message )
356
- else :
357
- _logger .info ("%s" , message )
358
-
359
- future = self .expected_responses .get (message .shape_hash ())
360
-
361
- if future and not future .done ():
362
- future .set_result (message )
363
- # try:
364
- self .plant .update (message )
365
- # except RegisterCacheUpdateFailed as e:
366
- # # await self.debug_frames['error'].put(frame)
367
- # _logger.debug(f'Ignoring {message}: {e}')
368
- _logger .debug (
369
- "network_consumer reader at EOF, cannot continue, closing connection"
370
- )
371
- await self .close ()
331
+ try :
332
+ while hasattr (self , "reader" ) and self .reader and not self .reader .at_eof ():
333
+ frame = await self .reader .read (300 )
334
+ # await self.debug_frames['all'].put(frame)
335
+ for message in self .framer .decode (frame ):
336
+ _logger .debug ("Processing %s" , message )
337
+ if isinstance (message , ExceptionBase ):
338
+ _logger .warning (
339
+ "Expected response never arrived but resulted in exception: %s" ,
340
+ message ,
341
+ )
342
+ continue
343
+ if isinstance (message , HeartbeatRequest ):
344
+ _logger .debug ("Responding to HeartbeatRequest" )
345
+ await self .tx_queue .put (
346
+ (message .expected_response ().encode (), None )
347
+ )
348
+ continue
349
+ if not isinstance (message , TransparentResponse ):
350
+ _logger .warning (
351
+ "Received unexpected message type for a client: %s" , message
352
+ )
353
+ continue
354
+ if isinstance (message , WriteHoldingRegisterResponse ):
355
+ if message .error :
356
+ _logger .warning ("%s" , message )
357
+ else :
358
+ _logger .info ("%s" , message )
359
+
360
+ future = self .expected_responses .get (message .shape_hash ())
361
+
362
+ if future and not future .done ():
363
+ future .set_result (message )
364
+ # try:
365
+ self .plant .update (message )
366
+ # except RegisterCacheUpdateFailed as e:
367
+ # # await self.debug_frames['error'].put(frame)
368
+ # _logger.debug(f'Ignoring {message}: {e}')
369
+ _logger .debug (
370
+ "network_consumer reader at EOF, cannot continue, closing connection"
371
+ )
372
+ except Exception as e :
373
+ _logger .error (
374
+ "network_consumer reader exception {}" , e
375
+ )
376
+ finally :
377
+ await self .close ()
372
378
373
379
async def _task_network_producer (self , tx_message_wait : float = 0.25 ):
374
380
"""Producer loop to transmit queued frames with an appropriate delay."""
0 commit comments