
Commit f184f35

Initial set of scripts
1 parent d1d9e65 commit f184f35

3 files changed: +201 -0 lines changed

get_busy_shards

Lines changed: 29 additions & 0 deletions
@@ -0,0 +1,29 @@
#!/usr/bin/env python3
import json

import fdb

fdb.api_version(710)
db = fdb.open("/fdb-cluster-config/cluster-file")

vals = {}
prefix = b"\xff\xff/metrics/data_distribution_stats/"
for k, v in db.get_range_startswith(prefix):
    j = json.loads(v.decode("utf-8"))
    vals[k] = j
    if j["shard_bytes_per_ksecond"] > 0:
        # shard_bytes_per_ksecond is bytes per 1,000 seconds: divide by 1e3 for B/s,
        # and shard_bytes by 1e3 for KB, to match the printed units.
        print(
            f"{k.replace(prefix, b'')} {int(j['shard_bytes_per_ksecond'] / 1e3)}B/s {int(j['shard_bytes'] / 1e3)}KB"
        )

sorted_data = dict(
    sorted(vals.items(), key=lambda item: item[1]["shard_bytes_per_ksecond"])
)
# for k, v in sorted_data.items():
#     print(f"{k} {int(v['shard_bytes_per_ksecond'] / 1e3)}")

tr = db.create_transaction()
tr.options.set_access_system_keys()
REBALANCE_SETTINGS_KEY = b"\xff\x02/r"
for k, v in tr.get_range_startswith(REBALANCE_SETTINGS_KEY, 10000):
    print(f"{k} {v}")
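
A possible extension, not part of this commit: `vals` already holds every shard's stats, so the same data can be re-sorted in descending order to print only the busiest shards. A minimal sketch assuming the `vals` and `prefix` defined above; `TOP_N` is an illustrative name:

# Print only the TOP_N busiest shards, busiest first (sketch, not in the commit).
TOP_N = 20
busiest = sorted(
    vals.items(), key=lambda item: item[1]["shard_bytes_per_ksecond"], reverse=True
)
for k, j in busiest[:TOP_N]:
    print(f"{k.replace(prefix, b'')} {int(j['shard_bytes_per_ksecond'] / 1e3)}B/s")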

get_rebalance_settings

Lines changed: 23 additions & 0 deletions
@@ -0,0 +1,23 @@
#!/usr/bin/env python3

import fdb

fdb.api_version(710)
db = fdb.open("/fdb-cluster-config/cluster-file")

#
# OTHER = 0
# REBALANCE_DISK,
# REBALANCE_READ,
# REBALANCE_WRITE,
# MERGE_SHARD,
# SIZE_SPLIT,
# WRITE_SPLIT,
# TENANT_SPLIT,
# __COUNT

tr = db.create_transaction()
tr.options.set_access_system_keys()
REBALANCE_SETTINGS_KEY = b"\xff\x02/r"
for k, v in tr.get_range_startswith(REBALANCE_SETTINGS_KEY, 10000):
    print(f"{k} {v}")
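
Not part of this commit: if this read is rerun often, it could be wrapped in the Python bindings' `@fdb.transactional` decorator, which injects a transaction and retries it on retryable errors. A minimal sketch assuming the `db` and `REBALANCE_SETTINGS_KEY` defined above; the function name is illustrative:

@fdb.transactional
def read_rebalance_settings(tr):
    # The decorator passes in the transaction and retries on retryable errors.
    tr.options.set_access_system_keys()
    return list(tr.get_range_startswith(REBALANCE_SETTINGS_KEY, 10000))


for k, v in read_rebalance_settings(db):
    print(f"{k} {v}")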

get_server_to_shards

Lines changed: 149 additions & 0 deletions
@@ -0,0 +1,149 @@
#!/usr/bin/env python3
import json
import struct
from typing import List, Tuple

import fdb

fdb.api_version(710)
KEY_SERVERS_PREFIX = b"\xff/keyServers/"
KEY_SERVERS_END = b"\xff/keyServers0"

SERVER_LIST_PREFIX = b"\xff/serverList/"
SERVER_LIST_END = b"\xff/serverList\x00"


class UID:
    """Represents a unique identifier used in FoundationDB."""

    def __init__(self, first: int, second: int):
        self.first = first  # 64-bit integer
        self.second = second  # 64-bit integer

    def __repr__(self):
        return f"UID({self.first:#018x}, {self.second:#018x})"

def get_process_ids_from_status_json(db):
    """
    Return a map storage server ID (SS) -> process ID (the ID shown in the dashboard).
    """
    # Get the status JSON from the special key \xff\xff/status/json
    status_json = db[b"\xff\xff/status/json"]
    status_data = json.loads(status_json.decode("utf-8"))

    # Build mapping from storage server IDs to process IDs
    server_list_map = {}
    processes = status_data["cluster"]["processes"]
    for proc_id, proc_info in processes.items():
        for role in proc_info["roles"]:
            if role["role"] == "storage":
                # The storage server ID is the 'id' field in the role
                server_id = role["id"]
                process_id = proc_id
                server_list_map[server_id] = process_id
    return server_list_map

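# Abbreviated shape of the status document read above; the process and role IDs
# here are made up for illustration:
#
#   {"cluster": {"processes": {
#       "9f3a...": {"roles": [{"role": "storage", "id": "0ab1c2d3e4f56789"},
#                             {"role": "log", "id": "..."}]}}}}
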
def decode_key_servers_value(value_bytes: bytes) -> Tuple[int, List[UID], List[UID]]:
    """
    Decodes the value associated with a key starting with \xff/keyServers.

    Args:
        value_bytes (bytes): The binary value to decode.

    Returns:
        Tuple[int, List[UID], List[UID]]: A tuple containing the protocol version,
        list of source UIDs, and list of destination UIDs.
    """
    offset = 0
    data_len = len(value_bytes)

    # Read protocol version (8 bytes)
    if data_len - offset < 8:
        raise ValueError("Value too short to contain protocol version")
    (protocol_version,) = struct.unpack_from("<Q", value_bytes, offset)
    offset += 8

    # Read src vector length (4 bytes)
    if data_len - offset < 4:
        raise ValueError("Value too short to contain src vector length")
    (src_len,) = struct.unpack_from("<I", value_bytes, offset)
    offset += 4

    src = []
    for _ in range(src_len):
        if data_len - offset < 16:
            raise ValueError("Value too short to contain src UIDs")
        first, second = struct.unpack_from("<QQ", value_bytes, offset)
        offset += 16
        src.append(UID(first, second))

    # Read dest vector length (4 bytes)
    if data_len - offset < 4:
        raise ValueError("Value too short to contain dest vector length")
    (dest_len,) = struct.unpack_from("<I", value_bytes, offset)
    offset += 4

    dest = []
    for _ in range(dest_len):
        if data_len - offset < 16:
            raise ValueError("Value too short to contain dest UIDs")
        first, second = struct.unpack_from("<QQ", value_bytes, offset)
        offset += 16
        dest.append(UID(first, second))

    return protocol_version, src, dest

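# Wire layout decode_key_servers_value expects (all little-endian, no padding):
#
#   <Q>   protocol version                          8 bytes
#   <I>   number of src UIDs                        4 bytes
#   <QQ>  one (first, second) pair per src UID      16 bytes each
#   <I>   number of dest UIDs                       4 bytes
#   <QQ>  one (first, second) pair per dest UID     16 bytes each
#
# For example, struct.pack("<QIQQI", version, 1, first, second, 0) produces a
# value with a single src UID and no dest UIDs (illustrative only).
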
def get_key_range_to_storage_servers(db):
    """
    Return an array of dicts, each with 2 or 3 keys:

    * begin
    * end
    * storage_servers: an array of SS IDs. Beware that this is not what we
      (Rockset) call the SS ID, which is actually the process ID.
    """
    # Get the mapping from key ranges to storage servers
    key_range_map = []
    transaction = db.create_transaction()
    transaction.options.set_access_system_keys()
    # Read from \xff/keyServers/ to \xff/keyServers0
    kvs = transaction.get_range(begin=KEY_SERVERS_PREFIX, end=KEY_SERVERS_END)
    for kv in kvs:
        key = kv.key
        value = kv.value
        begin_key = key[len(KEY_SERVERS_PREFIX):]
        if len(value) < 1:
            continue
        (version, src, dst) = decode_key_servers_value(value)
        storage_server_ids = [f"{s.first:#018x}"[2:] for s in src]
        # Each key marks the start of a shard, so it is also the end of the
        # previous shard.
        if key_range_map:
            key_range_map[-1]["end"] = begin_key
        key_range_map.append(
            {
                "begin": begin_key,
                "storage_servers": storage_server_ids,
            }
        )
    return key_range_map

def main():
    db = fdb.open("/fdb-cluster-config/cluster-file")
    # Can have issues with multi-version clients
    storage_server_to_process_id = get_process_ids_from_status_json(db)
    # print(storage_server_to_process_id)
    try:
        v = get_key_range_to_storage_servers(db)
    except Exception:
        # Retry once; the first attempt can fail transiently.
        v = get_key_range_to_storage_servers(db)
    for val in v:
        print(
            f"{val['begin']} {[storage_server_to_process_id[s] for s in val['storage_servers']]}"
        )


if __name__ == "__main__":
    main()
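
A possible follow-up, not part of this commit: inverting the two mappings gives a per-process shard count, which is closer to what the file name suggests. A minimal sketch assuming the `v` and `storage_server_to_process_id` values computed in `main`:

from collections import Counter

# Count how many shards each process serves (sketch only).
shards_per_process = Counter()
for val in v:
    for ss in val["storage_servers"]:
        shards_per_process[storage_server_to_process_id[ss]] += 1
for process_id, count in shards_per_process.most_common():
    print(f"{process_id} {count}")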
