Skip to content

Commit 6990d74

Browse files
committed
refactor: use batching for update_price
1 parent a591d8a commit 6990d74

File tree

4 files changed

+175
-138
lines changed

4 files changed

+175
-138
lines changed

example_publisher/publisher.py

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from example_publisher.providers.coin_gecko import CoinGecko
88
from example_publisher.config import Config
99
from example_publisher.providers.pyth_replicator import PythReplicator
10-
from example_publisher.pythd import Pythd, SubscriptionId
10+
from example_publisher.pythd import PriceUpdate, Pythd, SubscriptionId
1111

1212

1313
log = get_logger()
@@ -114,6 +114,7 @@ async def _upd_products(self):
114114

115115
async def _price_update_loop(self):
116116
while True:
117+
price_updates = []
117118
for product in self.products:
118119
price = self.provider.latest_price(product.symbol)
119120
if not price:
@@ -123,23 +124,28 @@ async def _price_update_loop(self):
123124
scaled_price = self.apply_exponent(price.price, product.exponent)
124125
scaled_conf = self.apply_exponent(price.conf, product.exponent)
125126

126-
log.info(
127-
"sending update_price",
128-
product_account=product.product_account,
129-
price_account=product.price_account,
130-
price=scaled_price,
131-
conf=scaled_conf,
132-
symbol=product.symbol,
133-
)
134-
await self.pythd.update_price(
135-
product.price_account, scaled_price, scaled_conf, TRADING
127+
price_updates.append(
128+
PriceUpdate(
129+
account=product.price_account,
130+
price=scaled_price,
131+
conf=scaled_conf,
132+
status=TRADING,
133+
)
136134
)
135+
137136
self.last_successful_update = (
138137
price.timestamp
139138
if self.last_successful_update is None
140139
else max(self.last_successful_update, price.timestamp)
141140
)
142141

142+
log.info(
143+
"sending batch update_price",
144+
num_price_updates=len(price_updates),
145+
)
146+
147+
await self.pythd.update_price_batch(price_updates)
148+
143149
await asyncio.sleep(self.config.price_update_interval_secs)
144150

145151
def apply_exponent(self, x: float, exp: int) -> int:

example_publisher/pythd.py

Lines changed: 68 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
11
from dataclasses import dataclass, field
2-
import sys
3-
import traceback
42
from dataclasses_json import config, DataClassJsonMixin
5-
from typing import List
3+
from typing import List, Any, Optional
64
from structlog import get_logger
7-
from jsonrpc_websocket import Server
5+
from websockets.client import connect, WebSocketClientProtocol
6+
from asyncio import Lock
87

98
log = get_logger()
109

@@ -20,6 +19,14 @@ class Price(DataClassJsonMixin):
2019
exponent: int = field(metadata=config(field_name="price_exponent"))
2120

2221

22+
@dataclass
23+
class PriceUpdate(DataClassJsonMixin):
24+
account: str
25+
price: int
26+
conf: int
27+
status: str
28+
29+
2330
@dataclass
2431
class Metadata(DataClassJsonMixin):
2532
symbol: str
@@ -33,36 +40,71 @@ class Product(DataClassJsonMixin):
3340
prices: List[Price] = field(metadata=config(field_name="price"))
3441

3542

43+
@dataclass
44+
class JSONRPCRequest(DataClassJsonMixin):
45+
id: int
46+
method: str
47+
params: List[Any] | Any
48+
jsonrpc: str = "2.0"
49+
50+
51+
@dataclass
52+
class JSONRPCResponse(DataClassJsonMixin):
53+
id: int
54+
result: Optional[Any]
55+
error: Optional[Any]
56+
jsonrpc: str = "2.0"
57+
58+
3659
class Pythd:
3760
def __init__(
3861
self,
3962
address: str,
4063
) -> None:
4164
self.address = address
42-
self.server: Server
43-
self._tasks = set()
65+
self.client: WebSocketClientProtocol
66+
self.id_counter = 0
67+
self.lock = Lock()
4468

4569
async def connect(self):
46-
self.server = Server(self.address)
47-
task = await self.server.ws_connect()
48-
task.add_done_callback(Pythd._on_connection_done)
49-
self._tasks.add(task)
50-
51-
@staticmethod
52-
def _on_connection_done(task):
53-
log.error("pythd connection closed")
54-
if not task.cancelled() and task.exception() is not None:
55-
e = task.exception()
56-
traceback.print_exception(None, e, e.__traceback__)
57-
sys.exit(1)
70+
self.client = await connect(self.address)
71+
72+
def _create_request(self, method: str, params: List[Any] | Any) -> JSONRPCRequest:
73+
self.id_counter += 1
74+
return JSONRPCRequest(
75+
id=self.id_counter,
76+
method=method,
77+
params=params,
78+
)
79+
80+
async def send_request(self, request: JSONRPCRequest) -> JSONRPCResponse:
81+
async with self.lock:
82+
await self.client.send(request.to_json())
83+
response = await self.client.recv()
84+
return JSONRPCResponse.from_json(response)
85+
86+
async def send_batch_request(
87+
self, requests: List[JSONRPCRequest]
88+
) -> List[JSONRPCResponse]:
89+
async with self.lock:
90+
await self.client.send(JSONRPCRequest.schema().dumps(requests, many=True))
91+
response = await self.client.recv()
92+
return JSONRPCResponse.schema().loads(response, many=True)
5893

5994
async def all_products(self) -> List[Product]:
60-
result = await self.server.get_product_list()
61-
return [Product.from_dict(d) for d in result]
95+
request = self._create_request("get_product_list", [])
96+
result = await self.send_request(request)
97+
if result.result:
98+
return Product.schema().load(result.result, many=True)
99+
else:
100+
raise ValueError(f"Error fetching products: {result.to_json()}")
62101

63-
async def update_price(
64-
self, account: str, price: int, conf: int, status: str
65-
) -> None:
66-
await self.server.update_price(
67-
account=account, price=price, conf=conf, status=status
68-
)
102+
async def update_price_batch(self, price_updates: List[PriceUpdate]) -> None:
103+
requests = [
104+
self._create_request("update_price", price_update.to_dict())
105+
for price_update in price_updates
106+
]
107+
results = await self.send_batch_request(requests)
108+
if any(result.error for result in results):
109+
results_json_str = JSONRPCResponse.schema().dumps(results, many=True)
110+
raise ValueError(f"Error updating prices: {results_json_str}")

0 commit comments

Comments
 (0)