Skip to content

Commit

Permalink
sendpkt: Allow definitions via Scapy as well as hex bytes.
Browse files Browse the repository at this point in the history
When writing tests it can be very difficult to understand what
the test is doing without properly documenting the test.  Added
to that difficulty is when a stream of bytes gets posted with
the patch as a packet definition.  We then need to write out
detailed descriptions of the packets, and tend to get packet
strings that break line length parsers (leading to errors applying
patches.

With this change, we can write out clear descriptions of packets
that make use of Scapy to generate the actual bytes strings.
This should serve as accurate documentation, while also trimming
the amount of bytes a developer needs to write to represent a
packet.

Signed-off-by: Aaron Conole <[email protected]>
Signed-off-by: 0-day Robot <[email protected]>
  • Loading branch information
apconole authored and ovsrobot committed Feb 7, 2025
1 parent 35f49a5 commit 3fe1374
Showing 1 changed file with 62 additions and 2 deletions.
64 changes: 62 additions & 2 deletions tests/sendpkt.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,66 @@
#


import re
import socket
import sys
from optparse import OptionParser

try:
from scapy.all import Ether, IP, IPv6, TCP, UDP, ARP, ICMP, ICMPv6ND_RA
from scapy.all import ICMPv6NDOptSrcLLAddr, ICMPv6NDOptMTU
from scapy.all import ICMPv6NDOptPrefixInfo
from scapy.all import ICMPv6EchoRequest, ICMPv6EchoReply
SCAPY_PROTO = {
"Ether": Ether,
"IP": IP,
"IPv6": IPv6,
"TCP": TCP,
"UDP": UDP,
"ARP": ARP,
"ICMP": ICMP,
"ICMPv6ND_RA": ICMPv6ND_RA,
"ICMPv6NDOptSrcLLAddr": ICMPv6NDOptSrcLLAddr,
"ICMPv6NDOptMTU": ICMPv6NDOptMTU,
"ICMPv6NDOptPrefixInfo": ICMPv6NDOptPrefixInfo,
"ICMPv6EchoRequest": ICMPv6EchoRequest,
"ICMPv6EchoReply": ICMPv6EchoReply,
}

def scapy_parse(packet_def):
pkt_layers = packet_def.split("/")

pkt = None

for layer in pkt_layers:
# Word(...) match
lm = re.match(r'(\w+)\((.*?)\)', layer)
if not lm:
raise ValueError(
f"Invalid definition {packet_def} at layer {layer}")

proto, proto_args_str = lm.groups()
if proto not in SCAPY_PROTO:
raise ValueError("Unable to construct a packet with {proto}.")

proto_args = {}
if proto_args_str:
kvp = re.findall(r'(\w)=(?:\'([^\']*)\'|([\w.]+))',
proto_args_str)
for key, str_type, n_type in kvp:
proto_args[key] = str_type if str_type else eval(n_type)

layer_obj = SCAPY_PROTO[proto](**proto_args)
if pkt is None:
pkt = layer_obj
else:
pkt /= layer_obj

return pkt

except:
def scapy_parse(packet_def):
raise RuntimeError("No scapy module while trying to parse scapy def.")

usage = "usage: %prog [OPTIONS] OUT-INTERFACE HEX-BYTES \n \
bytes in HEX-BYTES must be separated by space(s)"
Expand All @@ -50,8 +106,12 @@

# Strip '0x' prefixes from hex input, combine into a single string and
# convert to bytes.
hex_str = "".join([a[2:] if a.startswith("0x") else a for a in args[1:]])
pkt = bytes.fromhex(hex_str)
try:
hex_str = "".join([a[2:] if a.startswith("0x") else a for a in args[1:]])
pkt = bytes.fromhex(hex_str)
except ValueError:
parsed_pkt_obj = scapy_parse(args[1])
pkt = bytes(parsed_pkt_obj)

try:
sockfd = socket.socket(socket.AF_PACKET, socket.SOCK_RAW)
Expand Down

0 comments on commit 3fe1374

Please sign in to comment.