Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
203 changes: 165 additions & 38 deletions utilities/sdsio-server.py
Original file line number Diff line number Diff line change
Expand Up @@ -1045,60 +1045,187 @@ def interface_validator(interface):
raise argparse.ArgumentTypeError(f"Invalid network interface: {interface}!")

def parse_arguments():
formatter = lambda prog: argparse.HelpFormatter(prog, max_help_position=41)
parser = argparse.ArgumentParser(formatter_class=formatter, description="SDS I/O server")
# Custom parser with targeted epilog behavior
class MSStyleArgumentParser(argparse.ArgumentParser):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._is_top_level = False
self._is_subparser = False
self._ms_show_epilog_once = False # only for "missing server type"
self._error_hint = "" # printed ONLY on subparser errors

def error(self, message):
# Always show the relevant usage
self.print_usage(sys.stderr)

# Decide what footer to show
epilog = ""
# Only show the top-level epilog if we explicitly asked for it (missing server type)
if self._is_top_level and self._ms_show_epilog_once and self.epilog:
try:
epilog_text = self.epilog % {"prog": self.prog}
except Exception:
epilog_text = self.epilog
epilog = "\n\n" + epilog_text + "\n"

# For subparser errors, append a concise per-server hint (not epilog, so help -h stays clean)
hint = ""
if self._is_subparser and self._error_hint:
try:
hint = "\n\n" + (self._error_hint % {"prog": self.prog}) + "\n"
except Exception:
hint = "\n\n" + self._error_hint + "\n"

# Reset one-shot flag
self._ms_show_epilog_once = False

self.exit(2, f"{self.prog}: error: {message}{epilog}{hint}")

def error_with_epilog(self, message):
self._ms_show_epilog_once = True
self.error(message)

formatter = lambda prog: argparse.RawTextHelpFormatter(prog, max_help_position=41)

# Top-level footer (only for the "missing server type" error)
top_epilog = (
"Get help for a server type:\n"
" %(prog)s <server-type> -h\n"
"Examples:\n"
" %(prog)s socket -h\n"
" %(prog)s serial -h\n"
" %(prog)s usb -h\n"
)

# top-level parser
parser = MSStyleArgumentParser(
formatter_class=formatter,
description="SDS I/O server",
epilog=top_epilog,
)
parser._is_top_level = True
parser.usage = "%(prog)s [-h] [--verbose] {socket | serial | usb} [options]"
parser.add_argument("--verbose", "-v", action="store_true", help="Enable debug logging")
subparsers = parser.add_subparsers(dest="server_type", required=True)

# Socket server arguments.
parser_socket = subparsers.add_parser("socket", formatter_class=formatter)
socket_group = parser_socket.add_argument_group("optional")
socket_group_exclusive = socket_group.add_mutually_exclusive_group()
socket_group_exclusive.add_argument("--ipaddr", dest="ip", metavar="<IP>",
help="Server IP address (cannot be used with --interface)",
type=ip_validator, default=None)
socket_group_exclusive.add_argument("--interface", dest="interface", metavar="<Interface>",
help="Network interface (cannot be used with --ipaddr)",
type=interface_validator, default=None)

# Subparsers
subparsers = parser.add_subparsers(
dest="server_type",
title="server type",
metavar="{socket | serial | usb}",
parser_class=MSStyleArgumentParser
)

# socket
parser_socket = subparsers.add_parser(
"socket",
prog=f"{parser.prog} socket",
formatter_class=formatter,
help="Run TCP socket server",
epilog="", # keep subcommand help clean
)
parser_socket._is_subparser = True
parser_socket._error_hint = "For help on how to use the socket server and its arguments, run: %(prog)s -h"
parser_socket.usage = "%(prog)s [--ipaddr <IP> | --interface <Interface>] [--port <TCP Port>] [--workdir <Work dir>]"

socket_group = parser_socket.add_argument_group("socket arguments (optional)")
socket_group.add_argument("--ipaddr", dest="ip", metavar="<IP>",
help="Server IP address (cannot be used with --interface)",
type=ip_validator, default=None)
socket_group.add_argument("--interface", dest="interface", metavar="<Interface>",
help="Network interface (cannot be used with --ipaddr)",
type=interface_validator, default=None)
socket_group.add_argument("--port", dest="port", metavar="<TCP Port>",
help="TCP port (default: 5050)",
type=int, default=5050)
help="TCP port (default: 5050)", type=int, default=5050)
socket_group.add_argument("--workdir", dest="work_dir", metavar="<Work dir>",
help="Directory for SDS files (default: current directory) ", type=dir_path, default=".")

# Serial server arguments.
parser_serial = subparsers.add_parser("serial", formatter_class=formatter)
serial_required = parser_serial.add_argument_group("required")
help="Directory for SDS files (default: current directory)", type=dir_path, default=".")

# serial
parser_serial = subparsers.add_parser(
"serial",
prog=f"{parser.prog} serial",
formatter_class=formatter,
help="Run serial server",
epilog="", # keep subcommand help clean
)
parser_serial._is_subparser = True
parser_serial._error_hint = "For help on how to use the serial server and its arguments, run: %(prog)s -h"
parser_serial.usage = "%(prog)s -p <Serial Port> [--baudrate <Baudrate>] [--parity <Parity>] [--stopbits <Stop bits>] [--connect-timeout <Timeout>] [--workdir <Work dir>]"

serial_required = parser_serial.add_argument_group("serial arguments (required)")
serial_required.add_argument("-p", dest="port", metavar="<Serial Port>",
help="Serial port", required=True)
serial_optional = parser_serial.add_argument_group("optional")
help="Serial port (required)", required=True)

serial_optional = parser_serial.add_argument_group("serial arguments (optional)")
serial_optional.add_argument("--baudrate", dest="baudrate", metavar="<Baudrate>",
help="Baudrate (default: 115200)", type=int, default=115200)
parity_help = "Parity: " + ", ".join([f"{k}={v}" for k, v in serial.PARITY_NAMES.items()])
parity_help += f" (default: {serial.PARITY_NONE})"
parity_help = "Parity: " + ", ".join([f"{k}={v}" for k, v in serial.PARITY_NAMES.items()]) + f" (default: {serial.PARITY_NONE})"
serial_optional.add_argument("--parity", dest="parity", metavar="<Parity>",
choices=serial.PARITY_NAMES.keys(),
help=parity_help, default=serial.PARITY_NONE)
choices=serial.PARITY_NAMES.keys(), help=parity_help, default=serial.PARITY_NONE)
stopbits_help = (f"Stop bits: {serial.STOPBITS_ONE}, {serial.STOPBITS_ONE_POINT_FIVE}, "
f"{serial.STOPBITS_TWO} (default: {serial.STOPBITS_ONE})")
serial_optional.add_argument("--stopbits", dest="stop_bits", metavar="<Stop bits>",
type=float, choices=[serial.STOPBITS_ONE, serial.STOPBITS_ONE_POINT_FIVE,
serial.STOPBITS_TWO], help=stopbits_help, default=serial.STOPBITS_ONE)
type=float,
choices=[serial.STOPBITS_ONE, serial.STOPBITS_ONE_POINT_FIVE, serial.STOPBITS_TWO],
help=stopbits_help, default=serial.STOPBITS_ONE)
serial_optional.add_argument("--connect-timeout", dest="connect_timeout", metavar="<Timeout>",
help="Serial port connection timeout in seconds (default: no timeout)",
type=float, default=None)
serial_optional.add_argument("--workdir", dest="work_dir", metavar="<Work dir>",
help="Directory for SDS files (default: current directory) ", type=dir_path, default=".")
help="Directory for SDS files (default: current directory)", type=dir_path, default=".")

# usb
parser_usb = subparsers.add_parser(
"usb",
prog=f"{parser.prog} usb",
formatter_class=formatter,
help="Run USB bulk server",
epilog="", # keep subcommand help clean
)
parser_usb._is_subparser = True
parser_usb._error_hint = "For help on how to use the usb server and its arguments, run: %(prog)s -h"
parser_usb.usage = "%(prog)s [--workdir <Work dir>] [--high-priority]"

usb_group = parser_usb.add_argument_group("usb arguments (optional)")
usb_group.add_argument("--workdir", dest="work_dir", metavar="<Work dir>",
help="Directory for SDS files (default: current directory)", type=dir_path, default=".")
usb_group.add_argument("--high-priority", dest="high_priority", action="store_true",
help="Enable high-priority threading for USB Server (default: off)")

# two-phase parse
argv = sys.argv[1:]

# 1) Parse top-level to get the server_type (don't print epilog here unless missing)
try:
top_ns, _ = parser.parse_known_args(argv)
except SystemExit:
# top-level parse errors like "invalid choice" -> no epilog (as desired)
raise

# USB server arguments.
parser_usb = subparsers.add_parser("usb", formatter_class=formatter)
usb_optional = parser_usb.add_argument_group("optional")
usb_optional.add_argument("--workdir", dest="work_dir", metavar="<Work dir>",
help="Directory for SDS files (default: current directory) ", type=dir_path, default=".")
usb_optional.add_argument("--high-priority", dest="high_priority", action="store_true",
help="Enable high-priority threading for USB Server (default: off)")
# Missing server type -> show top-level epilog ONCE
if top_ns.server_type is None:
choices = " | ".join(subparsers.choices.keys())
parser.error_with_epilog(f"server-type specification is required: {{{choices}}}")

return parser.parse_args()
# 2) Hand EXACT user tokens to the chosen subparser (everything after the server type token)
try:
i = argv.index(top_ns.server_type)
except ValueError:
# Fallback: if not found for some reason, let subparser try the remainder
i = 0
sub_args = argv[i + 1:]
subparser = subparsers.choices[top_ns.server_type]

# This will raise with subparser usage + our per-server hint on error
sub_ns = subparser.parse_args(sub_args)

# Manual mutual-exclusion for socket so both options appear in one group
if top_ns.server_type == "socket" and sub_ns.ip is not None and sub_ns.interface is not None:
subparser.error("options --ipaddr and --interface are mutually exclusive.")

# Merge namespaces (global + subcommand) for downstream use
for k, v in vars(sub_ns).items():
setattr(top_ns, k, v)
return top_ns

async def main():
args = parse_arguments()
Expand Down
Loading