|
| 1 | +from opengsq.binary_reader import BinaryReader |
| 2 | +from opengsq.exceptions import ServerNotFoundException |
| 3 | +from opengsq.protocol_base import ProtocolBase |
| 4 | +from opengsq.socket_async import SocketAsync, SocketKind |
| 5 | + |
| 6 | + |
| 7 | +class Scum(ProtocolBase): |
| 8 | + """Scum Protocol""" |
| 9 | + full_name = 'Scum Protocol' |
| 10 | + |
| 11 | + async def get_status(self, master_servers: list = None) -> dict: |
| 12 | + """ |
| 13 | + Retrieves information about the server |
| 14 | + Notice: this method calls Scum.query_master_servers() |
| 15 | + function everytime if master_servers is not passed, |
| 16 | + you may need to cache the master servers if you had |
| 17 | + lots of servers to query. |
| 18 | + """ |
| 19 | + ip = SocketAsync.gethostbyname(self._host) |
| 20 | + |
| 21 | + if master_servers is None: |
| 22 | + master_servers = await Scum.query_master_servers() |
| 23 | + |
| 24 | + for server in master_servers: |
| 25 | + if server['ip'] == ip: |
| 26 | + return server |
| 27 | + |
| 28 | + raise ServerNotFoundException() |
| 29 | + |
| 30 | + @staticmethod |
| 31 | + async def query_master_servers(host='172.107.16.215', port=1040) -> list: |
| 32 | + """ |
| 33 | + Query SCUM Master-Server list |
| 34 | + """ |
| 35 | + with SocketAsync(SocketKind.SOCK_STREAM) as sock: |
| 36 | + sock.settimeout(5) |
| 37 | + await sock.connect((host, port)) |
| 38 | + sock.send(b'\x04\x03\x00\x00') |
| 39 | + |
| 40 | + total = -1 |
| 41 | + response = b'' |
| 42 | + servers = [] |
| 43 | + |
| 44 | + while total == -1 or len(servers) < total: |
| 45 | + response += await sock.recv() |
| 46 | + br = BinaryReader(response) |
| 47 | + |
| 48 | + # first packet return the total number of servers |
| 49 | + if total == -1: |
| 50 | + total = br.read_short() |
| 51 | + |
| 52 | + # server bytes length always 127 |
| 53 | + while br.length() >= 127: |
| 54 | + server = {} |
| 55 | + server['ip'] = '.'.join(map(str, reversed([br.read_byte(), br.read_byte(), br.read_byte(), br.read_byte()]))) |
| 56 | + server['port'] = br.read_short() |
| 57 | + server['name'] = str(br.read_bytes(100).rstrip(b'\x00'), encoding='utf-8', errors='ignore') |
| 58 | + br.read_byte() # skip |
| 59 | + server['numplayers'] = br.read_byte() |
| 60 | + server['maxplayers'] = br.read_byte() |
| 61 | + server['time'] = br.read_byte() |
| 62 | + br.read_byte() # skip |
| 63 | + server['password'] = ((br.read_byte() >> 1) & 1) == 1 |
| 64 | + br.read_bytes(7) # skip |
| 65 | + v = list(reversed([hex(br.read_byte())[2:].rjust(2, '0') for _ in range(8)])) |
| 66 | + server['version'] = f'{int(v[0], 16)}.{int(v[1], 16)}.{int(v[2] + v[3], 16)}.{int(v[4] + v[5] + v[6] + v[7], 16)}' |
| 67 | + servers.append(server) |
| 68 | + |
| 69 | + # if the length is less than 127, save the unused bytes for next loop |
| 70 | + response = br.read() |
| 71 | + |
| 72 | + return servers |
| 73 | + |
| 74 | + |
| 75 | +if __name__ == '__main__': |
| 76 | + import asyncio |
| 77 | + import json |
| 78 | + |
| 79 | + async def main_async(): |
| 80 | + scum = Scum(host='15.235.181.19', port=7042, timeout=5.0) |
| 81 | + master_servers = await scum.query_master_servers() |
| 82 | + server = await scum.get_status(master_servers) |
| 83 | + print(json.dumps(server, indent=None) + '\n') |
| 84 | + |
| 85 | + asyncio.run(main_async()) |
0 commit comments