@@ -1402,4 +1402,136 @@ async def test_query_peer_properties_retry_timeout(nursery, dbus, sysnet):
14021402
14031403 # exit mctpd
14041404 res = await mctpd .stop_mctpd ()
1405- assert res == 0
1405+ assert res == 0
1406+
1407+ """ Test RegisterVDMTypeSupport when no responders are registered """
1408+ async def test_register_vdm_type_support_empty (mctpd ):
1409+ ep = mctpd .network .endpoints [0 ]
1410+ ep .eid = 12
1411+ iface = mctpd .system .interfaces [0 ]
1412+ await mctpd .system .add_route (mctpd .system .Route (ep .eid , 1 , iface = iface ))
1413+ await mctpd .system .add_neighbour (
1414+ mctpd .system .Neighbour (iface , ep .lladdr , ep .eid )
1415+ )
1416+
1417+ # Verify error response when no VDM is registered
1418+ cmd = MCTPControlCommand (True , 0 , 0x06 , bytes ([0x00 ]))
1419+ rsp = await ep .send_control (mctpd .network .mctp_socket , cmd )
1420+ assert rsp .hex (' ' ) == '00 06 02'
1421+
1422+ """ Test RegisterVDMTypeSupport when a single PCIe VDM is registered """
1423+ async def test_register_vdm_type_support_pcie_only (dbus , mctpd ):
1424+ ep = mctpd .network .endpoints [0 ]
1425+ ep .eid = 12
1426+ iface = mctpd .system .interfaces [0 ]
1427+ await mctpd .system .add_route (mctpd .system .Route (ep .eid , 1 , iface = iface ))
1428+ await mctpd .system .add_neighbour (
1429+ mctpd .system .Neighbour (iface , ep .lladdr , ep .eid )
1430+ )
1431+
1432+ mctp = await mctpd_mctp_base_iface_obj (dbus )
1433+
1434+ # Register PCIe VDM: format=0x00, VID=0xABCD, command_set=0x0001
1435+ await mctp .call_register_vdm_type_support (0x00 ,
1436+ asyncdbus .Variant ('q' , 0xABCD ), 0x0001 )
1437+
1438+ # Verify PCIe VDM (selector 0)
1439+ cmd = MCTPControlCommand (True , 0 , 0x06 , bytes ([0x00 ]))
1440+ rsp = await ep .send_control (mctpd .network .mctp_socket , cmd )
1441+ assert rsp .hex (' ' ) == '00 06 00 ff 00 ab cd 00 01'
1442+
1443+ # Verify error with incorrect selector
1444+ cmd = MCTPControlCommand (True , 0 , 0x06 , bytes ([0x05 ]))
1445+ rsp = await ep .send_control (mctpd .network .mctp_socket , cmd )
1446+ assert rsp .hex (' ' ) == '00 06 02'
1447+
1448+ """ Test RegisterVDMTypeSupport when a single IANA VDM is registered """
1449+ async def test_register_vdm_type_support_iana_only (dbus , mctpd ):
1450+ ep = mctpd .network .endpoints [0 ]
1451+ ep .eid = 12
1452+ iface = mctpd .system .interfaces [0 ]
1453+ await mctpd .system .add_route (mctpd .system .Route (ep .eid , 1 , iface = iface ))
1454+ await mctpd .system .add_neighbour (
1455+ mctpd .system .Neighbour (iface , ep .lladdr , ep .eid )
1456+ )
1457+
1458+ mctp = await mctpd_mctp_base_iface_obj (dbus )
1459+
1460+ # Register IANA VDM: format=0x01, VID=0x1234ABCD, command_set=0x5678
1461+ await mctp .call_register_vdm_type_support (0x01 ,
1462+ asyncdbus .Variant ('u' , 0x1234ABCD ), 0x5678 )
1463+
1464+ # Verify IANA VDM (selector 0)
1465+ cmd = MCTPControlCommand (True , 0 , 0x06 , bytes ([0x00 ]))
1466+ rsp = await ep .send_control (mctpd .network .mctp_socket , cmd )
1467+ assert rsp .hex (' ' ) == '00 06 00 ff 01 12 34 ab cd 56 78'
1468+
1469+ """ Test RegisterVDMTypeSupport with dbus disconnect """
1470+ async def test_register_vdm_type_support_dbus_disconnect (mctpd ):
1471+ ep = mctpd .network .endpoints [0 ]
1472+ ep .eid = 12
1473+ iface = mctpd .system .interfaces [0 ]
1474+ await mctpd .system .add_route (mctpd .system .Route (ep .eid , 1 , iface = iface ))
1475+ await mctpd .system .add_neighbour (
1476+ mctpd .system .Neighbour (iface , ep .lladdr , ep .eid )
1477+ )
1478+
1479+ # Verify error response when no VDM is registered
1480+ cmd = MCTPControlCommand (True , 0 , 0x06 , bytes ([0x00 ]))
1481+ rsp = await ep .send_control (mctpd .network .mctp_socket , cmd )
1482+ assert rsp .hex (' ' ) == '00 06 02'
1483+
1484+ async with asyncdbus .MessageBus ().connect () as temp_bus :
1485+ mctp = await mctpd_mctp_base_iface_obj (temp_bus )
1486+
1487+ # Register PCIe VDM: format=0x00, VID=0xABCD, command_set=0x0001
1488+ await mctp .call_register_vdm_type_support (0x00 ,
1489+ asyncdbus .Variant ('q' , 0xABCD ), 0x0001 )
1490+
1491+ # Verify PCIe VDM (selector 0)
1492+ cmd = MCTPControlCommand (True , 0 , 0x06 , bytes ([0x00 ]))
1493+ rsp = await ep .send_control (mctpd .network .mctp_socket , cmd )
1494+ assert rsp .hex (' ' ) == '00 06 00 ff 00 ab cd 00 01'
1495+
1496+ # Give mctpd a moment to process the disconnection
1497+ await trio .sleep (0.1 )
1498+
1499+ # Verify VDM type is removed after disconnect
1500+ cmd = MCTPControlCommand (True , 0 , 0x06 , bytes ([0x00 ]))
1501+ rsp = await ep .send_control (mctpd .network .mctp_socket , cmd )
1502+ assert rsp .hex (' ' ) == '00 06 02' # Should be error again
1503+
1504+ """ Test RegisterVDMTypeSupport error handling """
1505+ async def test_register_vdm_type_support_errors (dbus , mctpd ):
1506+ ep = mctpd .network .endpoints [0 ]
1507+ ep .eid = 12
1508+ iface = mctpd .system .interfaces [0 ]
1509+ await mctpd .system .add_route (mctpd .system .Route (ep .eid , 1 , iface = iface ))
1510+ await mctpd .system .add_neighbour (
1511+ mctpd .system .Neighbour (iface , ep .lladdr , ep .eid )
1512+ )
1513+
1514+ mctp = await mctpd_mctp_base_iface_obj (dbus )
1515+ # Verify DBus call fails with invalid format 0x02
1516+ with pytest .raises (asyncdbus .errors .DBusError ) as ex :
1517+ await mctp .call_register_vdm_type_support (0x02 ,
1518+ asyncdbus .Variant ('q' , 0xABCD ), 0x0001 )
1519+ assert "Unsupported VID format" in str (ex .value )
1520+
1521+ # Verify incorrect VID type raises error
1522+ with pytest .raises (asyncdbus .errors .DBusError ) as ex :
1523+ await mctp .call_register_vdm_type_support (0x00 ,
1524+ asyncdbus .Variant ('u' , 0xABCDEF12 ), 0x0001 )
1525+ assert "Expected format is PCIe but variant contains" in str (ex .value )
1526+ with pytest .raises (asyncdbus .errors .DBusError ) as ex :
1527+ await mctp .call_register_vdm_type_support (0x01 ,
1528+ asyncdbus .Variant ('q' , 0xABCD ), 0x5678 )
1529+ assert "Expected format is IANA but variant contains" in str (ex .value )
1530+
1531+ # Verify duplicate VDM raises error
1532+ await mctp .call_register_vdm_type_support (0x00 ,
1533+ asyncdbus .Variant ('q' , 0xABCD ), 0x0001 )
1534+ with pytest .raises (asyncdbus .errors .DBusError ) as ex :
1535+ await mctp .call_register_vdm_type_support (0x00 ,
1536+ asyncdbus .Variant ('q' , 0xABCD ), 0x0001 )
1537+ assert str (ex .value ) == "VDM type already registered"
0 commit comments