
Commit 9fc2400

Support tracking latency from multiple indexers (#14)
* Add server_address field and new table to indexer websocket listener
* Increase wait time for place_stateful_orders
* Update indexer ws subscriber to also listen to stateful order address
* Fix bug where batch writer could get stuck with timeout = 0 and drop data
* Remove excessive sleeping in SIGTERM/SIGINT handler for run_all_scripts.py
1 parent 7b04599 commit 9fc2400

5 files changed (+64 -31 lines)

bigquery_to_datadog.py

+4 -4
@@ -32,9 +32,9 @@
     UNION ALL
     SELECT p.received_at
         , TIMESTAMP_DIFF(p.received_at, s.sent_at, millisecond) AS latency
-        , "indexer" AS server_address
+        , p.server_address
     FROM `{project_id}.latency_experiments.long_running_two_sided_orders` s
-    JOIN `{project_id}.indexer_stream.received_orders_and_cancels` p
+    JOIN `{project_id}.indexer_stream_new.received_orders_and_cancels` p
         ON p.client_id = CAST(s.client_id AS STRING)
         AND p.address = s.address
         AND p.received_at > TIMESTAMP("{start_timestamp}")
@@ -68,9 +68,9 @@
     UNION ALL
     SELECT p.received_at
         , TIMESTAMP_DIFF(p.received_at, s.sent_at, millisecond) AS latency
-        , "indexer" AS server_address
+        , p.server_address
     FROM `{project_id}.latency_experiments.long_running_stateful_orders` s
-    JOIN `{project_id}.indexer_stream.received_orders_and_cancels` p
+    JOIN `{project_id}.indexer_stream_new.received_orders_and_cancels` p
         ON p.client_id = CAST(s.client_id AS STRING)
         AND p.address = s.address
         AND p.received_at > TIMESTAMP("{start_timestamp}")
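
With the hardcoded "indexer" label replaced by the p.server_address column (and the source dataset moved to indexer_stream_new), each latency row now records which indexer produced it. As an illustration, here is a minimal sketch of how the per-indexer split could be inspected with the BigQuery Python client; the table and column names come from the query above, while the aggregation itself, the my-project placeholder, and the choice of median are hypothetical and not part of this commit:

from google.cloud import bigquery

client = bigquery.Client()

# Hypothetical check: median order-receipt latency per indexer, over the
# rows the updated query now labels with server_address.
query = """
    SELECT p.server_address,
           APPROX_QUANTILES(
               TIMESTAMP_DIFF(p.received_at, s.sent_at, millisecond), 100
           )[OFFSET(50)] AS p50_latency_ms
    FROM `my-project.latency_experiments.long_running_two_sided_orders` s
    JOIN `my-project.indexer_stream_new.received_orders_and_cancels` p
      ON p.client_id = CAST(s.client_id AS STRING) AND p.address = s.address
    GROUP BY p.server_address
"""
for row in client.query(query).result():
    print(row.server_address, row.p50_latency_ms)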

bq_helpers.py

+1 -1
@@ -103,7 +103,7 @@ async def worker(self):
                 elapsed_time = (
                     datetime.utcnow() - self.last_flush_time
                 ).total_seconds()
-                dynamic_timeout = max(0, self.batch_timeout - elapsed_time)
+                dynamic_timeout = max(0.1, self.batch_timeout - elapsed_time)
                 data = await asyncio.wait_for(self.queue.get(), timeout=dynamic_timeout)
                 data_buffer.append(data)
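
This one-line fix clamps the floor of dynamic_timeout at 0.1 s. Once elapsed_time reached batch_timeout, the old max(0, ...) produced timeout=0, and asyncio.wait_for with a non-positive timeout cancels the freshly scheduled queue.get() and raises TimeoutError immediately, so the worker could spin without ever pulling data off the queue; cancelling a get() that is mid-completion is presumably the dropped-data path the commit message describes. A minimal self-contained sketch of the failure mode, using only the standard library:

import asyncio


async def demo():
    queue: asyncio.Queue = asyncio.Queue()
    queue.put_nowait("payload")

    try:
        # timeout=0 gives the pending queue.get() no chance to run:
        # wait_for cancels it and raises TimeoutError right away.
        await asyncio.wait_for(queue.get(), timeout=0)
    except asyncio.TimeoutError:
        print("instant timeout; queue still holds", queue.qsize(), "item(s)")

    # The fix's small positive floor lets the event loop schedule the get().
    print("got:", await asyncio.wait_for(queue.get(), timeout=0.1))


asyncio.run(demo())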

listen_to_websocket.py

+30 -14
@@ -3,7 +3,7 @@
 
 Usage: python listen_to_websocket.py
 """
-
+import argparse
 import asyncio
 import json
 import logging
@@ -14,7 +14,6 @@
 import websockets
 from google.cloud import bigquery
 from google.cloud.bigquery import SchemaField
-from v4_client_py.clients.constants import Network
 
 # Import the BigQuery helpers
 from bq_helpers import create_table, BatchWriter
@@ -23,41 +22,45 @@
 with open("config.json", "r") as config_file:
     config_json = json.load(config_file)
 
-DATASET_ID = "indexer_stream"
+DATASET_ID = "indexer_stream_new"
 TABLE_ID = "responses"
 
 SCHEMA = [
     SchemaField("received_at", "TIMESTAMP", mode="REQUIRED"),
     SchemaField("uuid", "STRING", mode="REQUIRED"),
     SchemaField("response", "JSON", mode="NULLABLE"),
+    SchemaField("server_address", "STRING", mode="REQUIRED"),
 ]
 
 TIME_PARTITIONING = bigquery.TimePartitioning(field="received_at")
+CLUSTERING_FIELDS = ["server_address"]
+
 # Batch settings
 BATCH_SIZE = 9
 BATCH_TIMEOUT = 10
 WORKER_COUNT = 1
 
 
-def process_message(message):
+def process_message(message, url):
     return {
         "received_at": datetime.utcnow().isoformat("T") + "Z",
         "uuid": str(uuid.uuid4()),
         "response": message,
+        "server_address": url
     }
 
 
 class AsyncSocketClient:
-    def __init__(self, config, subaccount_ids, batch_writer):
-        self.url = config.websocket_endpoint
+    def __init__(self, indexer_url: str, subaccount_ids, batch_writer):
+        self.url = indexer_url
         self.subaccount_ids = subaccount_ids
         self.batch_writer = batch_writer
 
     async def connect(self):
         retries = 0
         while True:
             try:
-                async with websockets.connect(self.url) as websocket:
+                async with websockets.connect(f"wss://{self.url}/v4/ws") as websocket:
                     if self.subaccount_ids:
                         for subaccount_id in self.subaccount_ids:
                             await self.subscribe(
@@ -81,7 +84,7 @@ async def connect(self):
     async def consumer_handler(self, websocket):
         async for message in websocket:
             await self.batch_writer.enqueue_data(
-                process_message(message)
+                process_message(message, self.url)
             )  # Enqueue data for batch writing
 
     async def send(self, websocket, message):
@@ -97,17 +100,19 @@ async def subscribe(self, websocket, channel, params=None):
         await self.send(websocket, message)
 
 
-async def main():
+async def main(indexer_url: str):
     batch_writer = BatchWriter(
         DATASET_ID, TABLE_ID, WORKER_COUNT, BATCH_SIZE, BATCH_TIMEOUT
     )
-    config = Network.config_network().indexer_config
     subaccount_ids = [
         "/".join([config_json["maker_address"], str(0)]),
         "/".join([config_json["taker_address"], str(0)]),
+        "/".join([config_json["stateful_address"], str(0)]),
    ]
     client = AsyncSocketClient(
-        config, subaccount_ids=subaccount_ids, batch_writer=batch_writer
+        indexer_url,
+        subaccount_ids=subaccount_ids,
+        batch_writer=batch_writer,
     )
 
     batch_writer_task = asyncio.create_task(batch_writer.batch_writer_loop())
@@ -116,8 +121,19 @@ async def main():
 
 
 if __name__ == "__main__":
+    parser = argparse.ArgumentParser(
+        description="Run the Indexer Websocket client for a given URL, e.g. indexer.v4testnet.dydx.exchange."
+    )
+    parser.add_argument(
+        "--indexer_url",
+        type=str,
+        help="The indexer API to read from.",
+    )
+    args = parser.parse_args()
+
+    log_id = args.indexer_url.replace(":", "_").replace("/", "_")
     handler = RotatingFileHandler(
-        "listen_to_websocket.log",
+        f"listen_to_websocket_{log_id}.log",
         maxBytes=5 * 1024 * 1024,  # 5 MB
         backupCount=5
     )
@@ -127,5 +143,5 @@ async def main():
         format="%(asctime)s - %(levelname)s - %(message)s",
     )
 
-    create_table(DATASET_ID, TABLE_ID, SCHEMA, TIME_PARTITIONING)
-    asyncio.run(main())
+    create_table(DATASET_ID, TABLE_ID, SCHEMA, TIME_PARTITIONING, CLUSTERING_FIELDS)
+    asyncio.run(main(args.indexer_url))
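
Each listener process now serves exactly one indexer, so tracking several indexers means launching the script once per host, e.g. python listen_to_websocket.py --indexer_url indexer.v4testnet.dydx.exchange; the run_all_scripts.py change below does exactly this for every entry in config["indexer_addresses"].

The create_table call also gains a fifth argument for the clustering spec. The helper's body lives in bq_helpers.py and is not part of this diff, so the following is only a sketch of what the call site implies, assuming the standard google-cloud-bigquery API:

from google.cloud import bigquery

def create_table(dataset_id, table_id, schema, time_partitioning, clustering_fields=None):
    # Sketch only -- mirrors the call site above, not the repo's actual helper.
    client = bigquery.Client()
    table = bigquery.Table(f"{client.project}.{dataset_id}.{table_id}", schema=schema)
    table.time_partitioning = time_partitioning
    # Clustering by server_address keeps each indexer's rows co-located,
    # which suits the per-indexer filter that run_all_scripts.py applies.
    if clustering_fields:
        table.clustering_fields = clustering_fields
    client.create_table(table, exists_ok=True)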

place_stateful_orders.py

+1 -1
@@ -56,7 +56,7 @@
 NUM_BLOCKS = 10_000
 DYDX_MNEMONIC = config["stateful_mnemonic"]
 GTBT_DELTA = 5
-PLACE_INTERVAL = 12
+PLACE_INTERVAL = 13
 
 
 async def listen_to_block_stream_and_place_orders(batch_writer):

run_all_scripts.py

+28 -11
@@ -28,14 +28,14 @@
 CHECK_INTERVAL = 250  # Check every 250 seconds
 
 SCRIPT_CONFIGS = {
-    "websocket": {
-        "script_name": "listen_to_websocket.py",
-        "table_id": "indexer_stream.responses",
-        "timestamp_column": "received_at",
-        "filter": "",
-        "args": [],
-        "time_threshold": timedelta(seconds=90),
-    },
+    # "websocket": {
+    #     "script_name": "listen_to_websocket.py",
+    #     "table_id": "indexer_stream.responses",
+    #     "timestamp_column": "received_at",
+    #     "filter": "",
+    #     "args": [],
+    #     "time_threshold": timedelta(seconds=90),
+    # },
     "place_orders": {
         "script_name": "place_orders.py",
         "table_id": "latency_experiments.long_running_two_sided_orders",
@@ -74,6 +74,16 @@
         "time_threshold": timedelta(seconds=90),
     }
 
+for addr in config["indexer_addresses"]:
+    SCRIPT_CONFIGS[f"indexer {addr}"] = {
+        "script_name": "listen_to_websocket.py",
+        "table_id": "indexer_stream_new.responses",
+        "timestamp_column": "received_at",
+        "filter": f'server_address = "{addr}"',
+        "args": ["--indexer_url", addr],
+        "time_threshold": timedelta(seconds=90),
+    }
+
 
 def get_latest_timestamp(table_id, timestamp_column, filter_condition):
     filter_clause = f"WHERE TIMESTAMP_TRUNC({timestamp_column}, DAY) = TIMESTAMP(CURRENT_DATE()) AND {filter_condition}" if filter_condition else ""
@@ -95,14 +105,13 @@ def start_script(script_name, args):
     return subprocess.Popen(["python", script_name] + args)
 
 
-def force_kill_process(process: subprocess.Popen, pname: str):
+def terminate_process(process: subprocess.Popen, pname: str):
     # Try to terminate the process
     logging.info(f"Terminating process {pname}...")
     process.terminate()
 
-    # Wait for a few seconds to give the process time to terminate
-    time.sleep(3)
 
+def force_kill_process(process: subprocess.Popen, pname: str):
     # Check if the process has terminated
     if process.poll() is None:
         # Process is still alive, so forcefully kill it
@@ -147,6 +156,8 @@ def check_and_restart_script(
         should_restart = True
 
     if should_restart:
+        terminate_process(process, script_name)
+        time.sleep(1)  # Wait for the process to terminate
         force_kill_process(process, script_name)
         return start_script(script_name, args)
     else:
@@ -162,8 +173,14 @@ def main():
     # Gracefully handle Ctrl+C
     def signal_handler(sig, frame):
         logging.info("Received termination signal, shutting down...")
+        for pname, p in processes.items():
+            terminate_process(p, pname)
+
+        time.sleep(3)  # Wait for the processes to terminate
+
         for pname, p in processes.items():
             force_kill_process(p, pname)
+
         sys.exit(0)
     signal.signal(signal.SIGINT, signal_handler)
     signal.signal(signal.SIGTERM, signal_handler)
