From 3fe1374870d7d1182be834ccc72abeb2f9e57e38 Mon Sep 17 00:00:00 2001 From: Aaron Conole Date: Fri, 7 Feb 2025 09:45:52 -0500 Subject: [PATCH] sendpkt: Allow definitions via Scapy as well as hex bytes. When writing tests it can be very difficult to understand what the test is doing without properly documenting the test. Added to that difficulty is when a stream of bytes gets posted with the patch as a packet definition. We then need to write out detailed descriptions of the packets, and tend to get packet strings that break line length parsers (leading to errors applying patches. With this change, we can write out clear descriptions of packets that make use of Scapy to generate the actual bytes strings. This should serve as accurate documentation, while also trimming the amount of bytes a developer needs to write to represent a packet. Signed-off-by: Aaron Conole Signed-off-by: 0-day Robot --- tests/sendpkt.py | 64 ++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 62 insertions(+), 2 deletions(-) diff --git a/tests/sendpkt.py b/tests/sendpkt.py index 7cbea516548..32af99b3089 100755 --- a/tests/sendpkt.py +++ b/tests/sendpkt.py @@ -25,10 +25,66 @@ # +import re import socket import sys from optparse import OptionParser +try: + from scapy.all import Ether, IP, IPv6, TCP, UDP, ARP, ICMP, ICMPv6ND_RA + from scapy.all import ICMPv6NDOptSrcLLAddr, ICMPv6NDOptMTU + from scapy.all import ICMPv6NDOptPrefixInfo + from scapy.all import ICMPv6EchoRequest, ICMPv6EchoReply + SCAPY_PROTO = { + "Ether": Ether, + "IP": IP, + "IPv6": IPv6, + "TCP": TCP, + "UDP": UDP, + "ARP": ARP, + "ICMP": ICMP, + "ICMPv6ND_RA": ICMPv6ND_RA, + "ICMPv6NDOptSrcLLAddr": ICMPv6NDOptSrcLLAddr, + "ICMPv6NDOptMTU": ICMPv6NDOptMTU, + "ICMPv6NDOptPrefixInfo": ICMPv6NDOptPrefixInfo, + "ICMPv6EchoRequest": ICMPv6EchoRequest, + "ICMPv6EchoReply": ICMPv6EchoReply, + } + + def scapy_parse(packet_def): + pkt_layers = packet_def.split("/") + + pkt = None + + for layer in pkt_layers: + # Word(...) match + lm = re.match(r'(\w+)\((.*?)\)', layer) + if not lm: + raise ValueError( + f"Invalid definition {packet_def} at layer {layer}") + + proto, proto_args_str = lm.groups() + if proto not in SCAPY_PROTO: + raise ValueError("Unable to construct a packet with {proto}.") + + proto_args = {} + if proto_args_str: + kvp = re.findall(r'(\w)=(?:\'([^\']*)\'|([\w.]+))', + proto_args_str) + for key, str_type, n_type in kvp: + proto_args[key] = str_type if str_type else eval(n_type) + + layer_obj = SCAPY_PROTO[proto](**proto_args) + if pkt is None: + pkt = layer_obj + else: + pkt /= layer_obj + + return pkt + +except: + def scapy_parse(packet_def): + raise RuntimeError("No scapy module while trying to parse scapy def.") usage = "usage: %prog [OPTIONS] OUT-INTERFACE HEX-BYTES \n \ bytes in HEX-BYTES must be separated by space(s)" @@ -50,8 +106,12 @@ # Strip '0x' prefixes from hex input, combine into a single string and # convert to bytes. -hex_str = "".join([a[2:] if a.startswith("0x") else a for a in args[1:]]) -pkt = bytes.fromhex(hex_str) +try: + hex_str = "".join([a[2:] if a.startswith("0x") else a for a in args[1:]]) + pkt = bytes.fromhex(hex_str) +except ValueError: + parsed_pkt_obj = scapy_parse(args[1]) + pkt = bytes(parsed_pkt_obj) try: sockfd = socket.socket(socket.AF_PACKET, socket.SOCK_RAW)