1
1
import logging
2
2
import threading
3
3
import unittest
4
+ from contextlib import contextmanager
4
5
5
6
import can
6
7
import canopen
@@ -72,7 +73,7 @@ def test_emcy_consumer_reset(self):
72
73
self .assertEqual (len (self .emcy .active ), 0 )
73
74
74
75
def test_emcy_consumer_wait (self ):
75
- PAUSE = TIMEOUT / 4
76
+ PAUSE = TIMEOUT / 2
76
77
77
78
def push_err ():
78
79
self .emcy .on_emcy (0x81 , b'\x01 \x20 \x01 \x01 \x02 \x03 \x04 \x05 ' , 100 )
@@ -84,34 +85,42 @@ def check_err(err):
84
85
data = bytes ([1 , 2 , 3 , 4 , 5 ]), ts = 100 ,
85
86
)
86
87
88
+ @contextmanager
89
+ def timer (func ):
90
+ t = threading .Timer (PAUSE , func )
91
+ try :
92
+ yield t
93
+ finally :
94
+ t .join (TIMEOUT )
95
+
87
96
# Check unfiltered wait, on timeout.
88
97
self .assertIsNone (self .emcy .wait (timeout = TIMEOUT ))
89
98
90
99
# Check unfiltered wait, on success.
91
- timer = threading . Timer ( PAUSE , push_err )
92
- timer . start ()
93
- with self . assertLogs ( level = logging . INFO ):
94
- err = self .emcy .wait (timeout = TIMEOUT )
100
+ with timer ( push_err ) as t :
101
+ with self . assertLogs ( level = logging . INFO ):
102
+ t . start ()
103
+ err = self .emcy .wait (timeout = TIMEOUT )
95
104
check_err (err )
96
105
97
106
# Check filtered wait, on success.
98
- timer = threading . Timer ( PAUSE , push_err )
99
- timer . start ()
100
- with self . assertLogs ( level = logging . INFO ):
101
- err = self .emcy .wait (0x2001 , TIMEOUT )
107
+ with timer ( push_err ) as t :
108
+ with self . assertLogs ( level = logging . INFO ):
109
+ t . start ()
110
+ err = self .emcy .wait (0x2001 , TIMEOUT )
102
111
check_err (err )
103
112
104
113
# Check filtered wait, on timeout.
105
- timer = threading . Timer ( PAUSE , push_err )
106
- timer .start ()
107
- self .assertIsNone (self .emcy .wait (0x9000 , TIMEOUT ))
114
+ with timer ( push_err ) as t :
115
+ t .start ()
116
+ self .assertIsNone (self .emcy .wait (0x9000 , TIMEOUT ))
108
117
109
118
def push_reset ():
110
119
self .emcy .on_emcy (0x81 , b'\x00 \x00 \x00 \x00 \x00 \x00 \x00 \x00 ' , 100 )
111
120
112
- timer = threading . Timer ( PAUSE , push_reset )
113
- timer .start ()
114
- self .assertIsNone (self .emcy .wait (0x9000 , TIMEOUT ))
121
+ with timer ( push_reset ) as t :
122
+ t .start ()
123
+ self .assertIsNone (self .emcy .wait (0x9000 , TIMEOUT ))
115
124
116
125
117
126
class TestEmcyError (unittest .TestCase ):
0 commit comments