
Commit 7b04599

Maintenance and bugfixes (#13)
* Fix bug that double-ran place_stateful_orders.py on each restart
* Try terminate and kill in run_all_scripts.py and add more logging
* Fix asyncio event-loop blocking in place_stateful_orders.py
* Add more detailed logging for BigQuery insert errors
* Update clob pair id and add TODOs for code cleanup
* Refactor precompute_order so it no longer needs to query the chain
* Update place_orders.py to use only asyncio-friendly code
* Handle the place_stateful_orders.py account sequence bug
* Bump clob pair id to 1000 to cover future market listings
* Update batch writer to fall back to GCS insert automatically
1 parent: 55fa499

8 files changed (+254, -169 lines)

bq_helpers.py (+25, -5)

@@ -1,8 +1,9 @@
 import asyncio
 import logging
 from datetime import datetime
-from typing import List, Callable
+from typing import List, Callable, Type, Awaitable
 
+from google.api_core.exceptions import GoogleAPICallError
 from google.cloud import bigquery, storage
 from google.cloud.bigquery import SchemaField
 from google.cloud.exceptions import NotFound
@@ -57,21 +58,40 @@ def __init__(
         self.batch_timeout = batch_timeout
         self.last_flush_time = datetime.utcnow()
 
+        async def default_error_413_handler(data_buffer, e):
+            logging.error(f"Error inserting {len(data_buffer)} rows (code = {e.code}): {e}")
+
+        self.error_413_handler = default_error_413_handler
+
     async def enqueue_data(self, data):
         await self.queue.put(data)
 
     async def flush_data(self, data_buffer):
         try:
             errors = await asyncio.to_thread(
-                self.bq_client.insert_rows_json, self.table_ref, data_buffer
+                self.bq_client.insert_rows_json,
+                self.table_ref,
+                data_buffer
             )
             if errors:
                 logging.error(f"Errors occurred: {errors}")
+        except GoogleAPICallError as e:
+            if e.code == 413:
+                await self.error_413_handler(data_buffer, e)
+            else:
+                logging.error(f"Error inserting {len(data_buffer)} rows (code = {e.code}): {e}")
         except Exception as e:
-            logging.error(f"Error inserting rows: {e}")
+            logging.error(f"Error inserting {len(data_buffer)} rows: {e}")
        finally:
             self.last_flush_time = datetime.utcnow()
 
+    def set_error_413_handler(self, handler: Callable[[List, Type[Exception]], Awaitable[None]]):
+        """
+        Set an async function handler that will be called when a 413 (payload
+        too large) error occurs.
+        """
+        self.error_413_handler = handler
+
     async def batch_writer_loop(self):
         workers = [asyncio.create_task(self.worker()) for _ in range(self.worker_count)]
         await asyncio.gather(*workers)
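
For context, a minimal sketch of how a caller can plug into the new hook; it mirrors the wiring added to listen_to_grpc_stream.py further down, and the constructor arguments are the constants defined there:

# Sketch only: register a custom async 413 handler on a BatchWriter.
# DATASET_ID, TABLE_ID, WORKER_COUNT, BATCH_SIZE, BATCH_TIMEOUT are assumed
# to be the constants from listen_to_grpc_stream.py.
writer = BatchWriter(DATASET_ID, TABLE_ID, WORKER_COUNT, BATCH_SIZE, BATCH_TIMEOUT)

async def on_payload_too_large(data_buffer, e):
    # Called instead of the default log-only handler when insert_rows_json
    # raises a GoogleAPICallError with code 413.
    logging.error(f"413 from BigQuery, rerouting {len(data_buffer)} rows: {e}")

writer.set_error_413_handler(on_payload_too_large)
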
@@ -140,9 +160,9 @@ async def gcs_writer_loop(self):
             logging.error(f"Error inserting data: {e}")
 
     def _process_and_insert(self, data: dict) -> None:
-        data = self.middleware_fn(data)
+        data = [self.middleware_fn(data) for data in data]
         return gcs_insert.insert_via_gcs(
-            self.client, self.bucket, self.table, [data]
+            self.client, self.bucket, self.table, data
         )
 
     def set_middleware(self, middleware_fn: Callable[[dict], dict]):
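
With this change the GCS writer expects each queued item to be a list of row dicts, and the middleware is applied to every row before the insert. A sketch of the updated contract, reusing the names from listen_to_grpc_stream.py below:

# Sketch only: the GCS writer is now fed lists of rows.
gcs_writer = GCSWriter(DATASET_ID, TABLE_ID, SCHEMA, GCS_BUCKET)
gcs_writer.set_middleware(preprocess_row_for_gcs)  # dict -> dict, applied to each row

await gcs_writer.enqueue_data([row])        # a single oversized row
await gcs_writer.enqueue_data(data_buffer)  # a whole batch from the 413 fallback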

client_helpers.py (+30, -16)

@@ -43,6 +43,16 @@ def setup_clients(grpc_endpoint):
     return client, ledger_client
 
 
+def get_current_block_with_retries(client: CompositeClient):
+    for i in range(5):
+        try:
+            return client.get_current_block()
+        except Exception as error:
+            if i == 4:
+                raise error
+    raise Exception("Failed to get current block")
+
+
 def get_markets_data(client, market):
     markets_response = client.indexer_client.markets.get_perpetual_markets(market)
     return markets_response.data["markets"][market]
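
A small usage sketch (hypothetical caller; setup_clients and its grpc_endpoint argument are the ones shown above): fetch the current block once up front, tolerating transient gRPC failures with up to five back-to-back attempts and no delay between them.

# Sketch only: fetch the current block height with retries.
client, ledger_client = setup_clients(grpc_endpoint)
block = get_current_block_with_retries(client)  # raises after the 5th failure
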
@@ -62,19 +72,20 @@ def place_order(client, block, msg_and_order, batch_writer):
 
 
 def precompute_order(
-    client,
-    ledger_client,
-    market,
-    subaccount,
-    side,
-    price,
-    client_id,
-    good_til_block,
-    good_til_block_time,
-    size,
-    order_flags,
-    time_in_force,
-    sequence_number,
+    client,
+    ledger_client,
+    market,
+    subaccount,
+    side,
+    price,
+    client_id,
+    good_til_block,
+    good_til_block_time,
+    size,
+    order_flags,
+    time_in_force,
+    sequence_number,
+    account_number,
 ):
     # precompute order and sign the order
     clob_pair_id = market["clobPairId"]
@@ -114,17 +125,20 @@ def precompute_order(
     memo = None
 
     fee = ledger_client.estimate_fee_from_gas(gas_limit)
-    account = ledger_client.query_account(sender.address())
 
     tx.seal(
         SigningCfg.direct(sender.public_key(), sequence_number),
         fee=fee,
         gas_limit=gas_limit,
         memo=memo,
     )
-    tx.sign(sender.signer(), ledger_client.network_config.chain_id, account.number)
+    tx.sign(
+        sender.signer(),
+        ledger_client.network_config.chain_id,
+        account_number
+    )
     tx.complete()
-    return (tx, msg)
+    return tx, msg
 
 
 async def place_orders(client, block, msg_and_orders, batch_writer):
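
With the chain query removed from precompute_order, the caller is expected to look up the account once and then sign many orders locally. A hedged sketch of that pattern: wallet, order_specs, and the .sequence attribute are assumptions for illustration (only account.number appears in the removed line above), and the remaining arguments come from the function signature.

# Hypothetical caller: one chain query, then purely local signing.
account = ledger_client.query_account(wallet.address())
start_sequence = account.sequence  # assumed starting sequence number

for client_id, (side, price, size) in enumerate(order_specs):
    tx, msg = precompute_order(
        client, ledger_client, market, subaccount,
        side, price, client_id,
        good_til_block, good_til_block_time,
        size, order_flags, time_in_force,
        sequence_number=start_sequence + client_id,
        account_number=account.number,
    )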

listen_to_grpc_stream.py (+25, -7)

@@ -22,7 +22,7 @@
 # Dataset configuration
 DATASET_ID = "full_node_stream"
 TABLE_ID = "responses"
-CLOB_PAIR_IDS = range(133)
+CLOB_PAIR_IDS = range(1000)  # Have to update if too many more pairs are added
 
 # If data to too large for direct insert, use this GCS bucket to sideload
 GCS_BUCKET = "full_node_stream_sideload"
@@ -97,6 +97,9 @@ async def listen_to_stream_and_write_to_bq(
     retry_count = 0
     start_time = datetime.utcnow()
 
+    msg_count = 0
+    msg_count_time = datetime.utcnow()
+
     while retry_count < MAX_RETRIES_PER_DAY:
         try:
             stub = QueryStub(channel)
@@ -107,12 +110,19 @@
                 row = process_message(response, server_address)
 
                 # If the row is too large, sideload into BQ via GCS
-                too_large_for_direct_insert = len(row['response']) > 5_000_000
+                too_large_for_direct_insert = len(row['response']) > 1_000_000
                 if too_large_for_direct_insert:
-                    await gcs_writer.enqueue_data(row)
+                    await gcs_writer.enqueue_data([row])
                 else:
                     await batch_writer.enqueue_data(row)
 
+                # Log message counts every 15 mins
+                msg_count += 1
+                if datetime.utcnow() - msg_count_time > timedelta(minutes=15):
+                    logging.info(f"Saw {msg_count} msgs in the last 15 minutes")
+                    msg_count_time = datetime.utcnow()
+                    msg_count = 0
+
             await batch_writer.enqueue_data(
                 process_error("Stream ended", server_address)
             )
@@ -166,24 +176,32 @@ def preprocess_row_for_gcs(row: dict) -> dict:
 
 
 async def main(server_address):
+    # Writer for exceptionally large rows (book snapshots)
+    gcs_writer = GCSWriter(DATASET_ID, TABLE_ID, SCHEMA, GCS_BUCKET)
+    gcs_writer.set_middleware(preprocess_row_for_gcs)
+    gcs_writer_task = asyncio.create_task(gcs_writer.gcs_writer_loop())
+
     # Writer for direct BigQuery inserts
     batch_writer = BatchWriter(
         DATASET_ID, TABLE_ID, WORKER_COUNT, BATCH_SIZE, BATCH_TIMEOUT
     )
     batch_writer_task = asyncio.create_task(batch_writer.batch_writer_loop())
 
-    # Writer for exceptionally large rows (book snapshots)
-    gcs_writer = GCSWriter(DATASET_ID, TABLE_ID, SCHEMA, GCS_BUCKET)
-    gcs_writer.set_middleware(preprocess_row_for_gcs)
-    gcs_writer_task = asyncio.create_task(gcs_writer.gcs_writer_loop())
+    # If the direct writer encounters a 413 error, fall back to sideloading
+    async def error_413_handler(data_buffer, e):
+        logging.error(f"Error 413 occurred, sideloading {len(data_buffer)} rows.")
+        await gcs_writer.enqueue_data(data_buffer)
+    batch_writer.set_error_413_handler(error_413_handler)
 
     # Adjust to use secure channel if needed
+    logging.info(f"Connecting to server at {server_address}...")
     async with grpc.aio.insecure_channel(server_address, GRPC_OPTIONS) as channel:
         await listen_to_stream_and_write_to_bq(
             channel, batch_writer, gcs_writer, server_address
         )
 
     await asyncio.gather(batch_writer_task, gcs_writer_task)
+    logging.error("Listening task finished (unexpected)")
 
 
 if __name__ == "__main__":
