Skip to content

Commit c516698

Browse files
committed
Added support for ZephyrOS based devices
1 parent 633a30e commit c516698

File tree

5 files changed

+243
-18
lines changed

5 files changed

+243
-18
lines changed

README.md

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,19 @@ $ pip install -I client_test_lib*.whl
2525
- Windows: `set CLOUD_API_KEY=<access_key_here>`
2626
- Default API address is `https://api.us-east-1.mbedcloud.com`. You can change this by defining `CLOUD_API_GW` environment variable in similar way as `CLOUD_API_KEY` is done above.
2727
- Test run will create temporary API key for the WebSocket callback channel by default. If you want to prevent that and use only the exported API key, add `--use_one_apikey` startup argument.
28-
- Tests use [Mbed LS](https://github.com/ARMmbed/mbed-os-tools/tree/master/packages/mbed-ls) to select the board from the serial port.
28+
- Tests use [pyOCD](https://pyocd.io/) for device discovery, with automatic fallback to [Mbed LS](https://github.com/ARMmbed/mbed-os-tools/tree/master/packages/mbed-ls) if pyOCD is not available.
2929
- If you have only one board connected to the serial port, you don't need to select the device for the tests.
30-
- If there are multiple boards connected to the serial port, run `mbedls` to check the target board's ID, and use it in the test run's argument `--target_id=[id]`.
30+
- If there are multiple boards connected to the serial port, you can use either:
31+
- `pyocd list` to check pyOCD-discovered boards and use the board ID with `--target_id=[id]`
32+
- `mbedls` to check mbed-discovered boards and use the target ID with `--target_id=[id]`
3133

3234
```bash
35+
# Using pyOCD (preferred)
36+
$ pyocd list
37+
[INFO] Available debug probes:
38+
0: 0240000032044e4500257009997b00386781000097969900 (ST-Link V2-1)
39+
40+
# Using mbed-ls (fallback)
3341
$ mbedls
3442
+---------------+----------------------+-------------+--------------+--------------------------------------------------+-----------------+
3543
| platform_name | platform_name_unique | mount_point | serial_port | target_id | daplink_version |

client_test_lib/fixtures/client_fixtures.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
from client_test_lib.tools.external_conn import ExternalConnection
2222
from client_test_lib.tools.local_conn import LocalConnection
2323
from client_test_lib.tools.serial_conn import SerialConnection
24-
from client_test_lib.tools.utils import get_serial_port_for_mbed
24+
from client_test_lib.tools.utils import get_serial_port_for_mbed, get_serial_port_for_pyocd
2525

2626
log = logging.getLogger(__name__)
2727

@@ -40,23 +40,28 @@ def client_internal(request):
4040
log.info("Using local binary process")
4141
conn = LocalConnection(request.config.getoption("local_binary"))
4242
else:
43-
address = get_serial_port_for_mbed(
43+
# Try pyocd first, fall back to mbed-ls if needed
44+
address = get_serial_port_for_pyocd(
4445
request.config.getoption("target_id")
4546
)
47+
log.info("Serial connection address: {}".format(address))
4648
if address:
4749
conn = SerialConnection(address, 115200)
50+
log.info("Serial connection opened successfully")
4851
else:
4952
err_msg = "No serial connection to open for test device"
5053
log.error(err_msg)
5154
assert False, err_msg
5255

53-
cli = Client(conn)
56+
cli = Client(conn, trace=True)
5457

5558
# reset the serial connection device
5659
if not request.config.getoption(
5760
"ext_conn"
5861
) and not request.config.getoption("local_binary"):
62+
log.info("Resetting device before test...")
5963
cli.reset()
64+
sleep(2) # Give device time to reset and stabilize
6065

6166
cli.wait_for_output("Client registered", 300)
6267
ep_id = cli.endpoint_id(120)

client_test_lib/tools/client_runner.py

Lines changed: 114 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -39,12 +39,14 @@ class Client:
3939
:param dut: Running client object
4040
:param trace: Log the raw client output
4141
:param name: Logging name for the client
42+
:param filter_debug: Filter out debug messages to reduce log noise (default: True)
4243
"""
4344

44-
def __init__(self, dut, trace=False, name="0"):
45+
def __init__(self, dut, trace=False, name="0", filter_debug=True):
4546
self._ep_id = None
4647
self.name = name
4748
self.trace = trace
49+
self.filter_debug = filter_debug
4850
self.run = True
4951
self.iq = queue.Queue()
5052
self.dut = dut
@@ -64,21 +66,120 @@ def _input_thread(self):
6466
while self.run:
6567
line = self.dut.readline()
6668
if line:
67-
plain_line = utils.strip_escape(line)
68-
if b"\r" in line and line.count(b"\r") > 1:
69-
plain_line = plain_line.split(b"\r")[-2]
70-
plain_line = plain_line.replace(b"\t", b" ").decode(
71-
"utf-8", "replace"
72-
)
73-
flog.info("<--|D{}| {}".format(self.name, plain_line.strip()))
74-
if self.trace:
75-
log.debug("Raw output: {}".format(line))
76-
if b"Error" in line:
77-
log.error("Output: {}".format(line))
78-
self.iq.put(plain_line)
69+
plain_line = self._parse_serial_line(line)
70+
if plain_line: # Only process non-empty lines
71+
flog.info("<--|D{}| {}".format(self.name, plain_line.strip()))
72+
if self.trace:
73+
log.debug("Raw output: {}".format(line))
74+
self._check_for_errors(line, plain_line)
75+
self.iq.put(plain_line)
7976
else:
8077
pass
8178

79+
def _parse_serial_line(self, line):
80+
"""
81+
Parse serial line to extract clean content
82+
:param line: Raw serial line bytes
83+
:return: Cleaned string or None if line should be ignored
84+
"""
85+
if not line or line == b'':
86+
return None
87+
88+
# Strip escape sequences first
89+
plain_line = utils.strip_escape(line)
90+
91+
# Handle multiple carriage returns and newlines
92+
# Split on \r\n or \r\r\n patterns and take the last meaningful part
93+
if b"\r" in plain_line:
94+
# Split on carriage returns and filter out empty parts
95+
parts = plain_line.split(b"\r")
96+
# Find the last non-empty part that contains actual content
97+
for part in reversed(parts):
98+
if part.strip() and not part.startswith(b"\n"):
99+
plain_line = part
100+
break
101+
102+
# Remove leading/trailing newlines and whitespace
103+
plain_line = plain_line.strip(b"\n\r\t ")
104+
105+
# Skip empty lines
106+
if not plain_line:
107+
return None
108+
109+
# Convert tabs to spaces and decode to string
110+
plain_line = plain_line.replace(b"\t", b" ").decode("utf-8", "replace")
111+
112+
# Skip lines that are just whitespace or control characters
113+
if not plain_line.strip():
114+
return None
115+
116+
# Filter debug output if enabled
117+
if self.filter_debug and self._is_debug_line(plain_line):
118+
return None
119+
120+
return plain_line
121+
122+
def _is_debug_line(self, line):
123+
"""
124+
Check if a line is debug output that should be filtered
125+
:param line: Parsed line string
126+
:return: True if line should be filtered out
127+
"""
128+
line_lower = line.lower().strip()
129+
130+
# log.info("Checking if line is debug: {}".format(line_lower))
131+
# Common debug patterns to filter
132+
debug_patterns = [
133+
"[trace][paal]",
134+
"debug: ",
135+
]
136+
137+
for pattern in debug_patterns:
138+
if pattern in line_lower:
139+
return True
140+
141+
return False
142+
143+
def _check_for_errors(self, raw_line, parsed_line):
144+
"""
145+
Check for error conditions in the serial output
146+
:param raw_line: Raw bytes from serial
147+
:param parsed_line: Parsed string line
148+
"""
149+
# Check for various error patterns (case insensitive)
150+
error_patterns = [
151+
b"Error ",
152+
b"ERROR:",
153+
b"error:",
154+
b"Error:",
155+
b"FAIL",
156+
b"fail",
157+
b"Exception",
158+
b"exception",
159+
b"Fatal",
160+
b"fatal",
161+
b"Critical",
162+
b"critical"
163+
]
164+
165+
# Check raw line for error patterns
166+
for pattern in error_patterns:
167+
if pattern in raw_line:
168+
log.error("Output: {}".format(raw_line))
169+
return
170+
171+
# Also check parsed line for error keywords
172+
parsed_lower = parsed_line.lower()
173+
error_keywords = [
174+
"error", "fail", "exception", "fatal", "critical",
175+
"timeout", "abort", "crash", "panic"
176+
]
177+
178+
for keyword in error_keywords:
179+
if keyword in parsed_lower:
180+
log.error("Output: {}".format(raw_line))
181+
return
182+
82183
def _read_line(self, timeout):
83184
"""
84185
Read data from input queue

client_test_lib/tools/utils.py

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import re
1919
import string
2020
import mbed_lstools
21+
import serial.tools.list_ports
2122

2223
log = logging.getLogger(__name__)
2324

@@ -157,6 +158,115 @@ def get_serial_port_for_mbed(target_id):
157158
return None
158159

159160

161+
def get_serial_port_for_pyocd(target_id):
162+
"""
163+
Gets serial port address for the device using pyocd for device discovery
164+
Falls back to mbed-ls if pyocd is not available or fails
165+
:param target_id: device target_id (can be pyocd board ID or mbed target_id)
166+
:return: Serial port address
167+
"""
168+
try:
169+
from pyocd.core.helpers import ConnectHelper
170+
171+
log.debug("Attempting to discover devices using pyocd")
172+
173+
# Try to create a session with pyocd
174+
session = ConnectHelper.session_with_chosen_probe()
175+
176+
if session is None:
177+
log.warning("No devices found with pyocd, falling back to mbed-ls")
178+
return get_serial_port_for_mbed(target_id)
179+
180+
# Get the probe from the session
181+
probe = session.probe
182+
if probe:
183+
# Map pyocd probe to serial port
184+
serial_port = _map_pyocd_probe_to_serial_port(probe)
185+
if serial_port:
186+
log.info(
187+
'Using pyocd-discovered device "{}" at "{}" port for tests'.format(
188+
getattr(probe, 'unique_id', 'Unknown'),
189+
serial_port
190+
)
191+
)
192+
session.close()
193+
return serial_port
194+
else:
195+
log.warning("Could not map pyocd probe to serial port, falling back to mbed-ls")
196+
session.close()
197+
return get_serial_port_for_mbed(target_id)
198+
else:
199+
log.warning("No probe found in pyocd session, falling back to mbed-ls")
200+
session.close()
201+
return get_serial_port_for_mbed(target_id)
202+
203+
except ImportError:
204+
log.debug("pyocd not available, falling back to mbed-ls")
205+
return get_serial_port_for_mbed(target_id)
206+
except Exception as e:
207+
log.warning("pyocd device discovery failed: {}, falling back to mbed-ls".format(e))
208+
return get_serial_port_for_mbed(target_id)
209+
210+
211+
def _map_pyocd_probe_to_serial_port(probe):
212+
"""
213+
Maps a pyocd probe to its corresponding serial port
214+
:param probe: pyocd probe object
215+
:return: Serial port path or None if not found
216+
"""
217+
try:
218+
# Get all available serial ports
219+
ports = serial.tools.list_ports.comports()
220+
221+
# Try to match based on USB VID/PID if available
222+
if hasattr(probe, 'vid') and hasattr(probe, 'pid'):
223+
target_vid = probe.vid
224+
target_pid = probe.pid
225+
226+
for port in ports:
227+
if port.vid == target_vid and port.pid == target_pid:
228+
log.debug("Matched pyocd probe to serial port {} by VID/PID".format(port.device))
229+
return port.device
230+
231+
# Prioritize USB serial ports over system serial ports
232+
# Common patterns for ARM development boards (in order of preference)
233+
arm_patterns = [
234+
'ttyACM', # Linux USB CDC-ACM (most common for ARM boards)
235+
'ttyUSB', # Linux USB serial
236+
'cu.usbmodem', # macOS USB
237+
'COM', # Windows
238+
]
239+
240+
# First pass: look for USB serial ports
241+
for port in ports:
242+
port_name = port.device.lower()
243+
for pattern in arm_patterns:
244+
if pattern in port_name:
245+
log.debug("Matched pyocd probe to USB serial port {} by name pattern".format(port.device))
246+
return port.device
247+
248+
# Second pass: exclude system serial ports and use first available USB port
249+
usb_ports = []
250+
for port in ports:
251+
port_name = port.device.lower()
252+
# Skip system serial ports (ttyS*) and virtual ports
253+
if not any(skip in port_name for skip in ['ttys', 'pts', 'ttyprintk']):
254+
usb_ports.append(port)
255+
256+
if usb_ports:
257+
log.debug("Using first available USB serial port {} for pyocd probe".format(usb_ports[0].device))
258+
return usb_ports[0].device
259+
260+
# Last resort: return None to fall back to mbed-ls
261+
log.debug("No suitable USB serial port found for pyocd probe")
262+
return None
263+
264+
except Exception as e:
265+
log.debug("Error mapping pyocd probe to serial port: {}".format(e))
266+
267+
return None
268+
269+
160270
def get_path(path):
161271
if "WORKSPACE" in os.environ:
162272
log.debug("$WORKSPACE: {}".format(os.environ["WORKSPACE"]))

requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
manifest-tool==2.6.2
22
mbed-ls==1.8.*
3+
pyocd>=0.35.0
34
pytest==7.4.4
45
pytest-html
56
pyserial

0 commit comments

Comments
 (0)