From 0f1fe7c4ca5a1fa1f880962e7caba897d969f68f Mon Sep 17 00:00:00 2001 From: puddly <32534428+puddly@users.noreply.github.com> Date: Wed, 22 Jan 2025 17:48:51 -0500 Subject: [PATCH] Add a new command for concurrent packet capture --- zigpy_cli/pcap.py | 29 +++++++++++++++++++++++++++++ zigpy_cli/radio.py | 35 ++++++++++++++++++++++++++++++----- 2 files changed, 59 insertions(+), 5 deletions(-) diff --git a/zigpy_cli/pcap.py b/zigpy_cli/pcap.py index 33685bc..6ed1802 100644 --- a/zigpy_cli/pcap.py +++ b/zigpy_cli/pcap.py @@ -1,14 +1,20 @@ from __future__ import annotations +import datetime +import json import logging +import sys import click +import zigpy.types as t from scapy.config import conf as scapy_conf from scapy.layers.dot15d4 import Dot15d4 # NOQA: F401 from scapy.utils import PcapReader, PcapWriter from zigpy_cli.cli import cli +from .helpers import PcapWriter as ZigpyPcapWriter + scapy_conf.dot15d4_protocol = "zigbee" LOGGER = logging.getLogger(__name__) @@ -29,3 +35,26 @@ def fix_fcs(input, output): for packet in reader: packet.fcs = None writer.write(packet) + + +@pcap.command() +@click.option("-o", "--output", type=click.File("wb"), required=True) +def interleave_combine(output): + if output.name == "": + output = sys.stdout.buffer.raw + + writer = ZigpyPcapWriter(output) + writer.write_header() + + while True: + line = sys.stdin.readline() + data = json.loads(line) + packet = t.CapturedPacket( + timestamp=datetime.datetime.fromisoformat(data["timestamp"]), + rssi=data["rssi"], + lqi=data["lqi"], + channel=data["channel"], + data=bytes.fromhex(data["data"]), + ) + + writer.write_packet(packet) diff --git a/zigpy_cli/radio.py b/zigpy_cli/radio.py index 3f47093..481878e 100644 --- a/zigpy_cli/radio.py +++ b/zigpy_cli/radio.py @@ -8,6 +8,7 @@ import json import logging import random +import sys import click import zigpy.state @@ -251,10 +252,19 @@ async def change_channel(app, channel): ) @click.option("-p", "--channel-hop-period", type=float, default=5.0) @click.option("-o", "--output", type=click.File("wb"), required=True) +@click.option("--interleave", is_flag=True, type=bool, default=False) @click_coroutine async def packet_capture( - app, channel_hop_randomly, channels, channel_hop_period, output + app, + channel_hop_randomly, + channels, + channel_hop_period, + output, + interleave, ): + if output.name == "" and not interleave: + output = sys.stdout.buffer.raw + if not channel_hop_randomly: channels_iter = itertools.cycle(channels) else: @@ -270,8 +280,9 @@ def channels_iter_func(): await app.connect() - writer = PcapWriter(output) - writer.write_header() + if not interleave: + writer = PcapWriter(output) + writer.write_header() async with asyncio.TaskGroup() as tg: channel_hopper_task = None @@ -287,7 +298,21 @@ async def channel_hopper(): channel_hopper_task = tg.create_task(channel_hopper()) LOGGER.debug("Got a packet %s", packet) - writer.write_packet(packet) - if output.name == "": # Surely there's a better way? + if not interleave: + writer.write_packet(packet) + else: + # To do line interleaving, encode the packets as JSON + output.write( + json.dumps( + { + "timestamp": packet.timestamp.isoformat(), + "rssi": packet.rssi, + "lqi": packet.lqi, + "channel": packet.channel, + "data": packet.data.hex(), + } + ).encode("ascii") + + b"\n" + ) output.flush()