Skip to content

Commit 4e53387

Browse files
committed
refactor: use batching for update_price
1 parent a591d8a commit 4e53387

File tree

3 files changed

+158
-127
lines changed

3 files changed

+158
-127
lines changed

example_publisher/pythd.py

Lines changed: 68 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
11
from dataclasses import dataclass, field
2-
import sys
3-
import traceback
42
from dataclasses_json import config, DataClassJsonMixin
5-
from typing import List
3+
from typing import List, Any, Optional
64
from structlog import get_logger
7-
from jsonrpc_websocket import Server
5+
from websockets.client import connect, WebSocketClientProtocol
6+
from asyncio import Lock
87

98
log = get_logger()
109

@@ -20,6 +19,14 @@ class Price(DataClassJsonMixin):
2019
exponent: int = field(metadata=config(field_name="price_exponent"))
2120

2221

22+
@dataclass
23+
class PriceUpdate(DataClassJsonMixin):
24+
account: str
25+
price: int
26+
conf: int
27+
status: str
28+
29+
2330
@dataclass
2431
class Metadata(DataClassJsonMixin):
2532
symbol: str
@@ -33,36 +40,71 @@ class Product(DataClassJsonMixin):
3340
prices: List[Price] = field(metadata=config(field_name="price"))
3441

3542

43+
@dataclass
44+
class JSONRPCRequest(DataClassJsonMixin):
45+
id: int
46+
method: str
47+
params: List[Any] | Any
48+
jsonrpc: str = "2.0"
49+
50+
51+
@dataclass
52+
class JSONRPCResponse(DataClassJsonMixin):
53+
id: int
54+
result: Optional[Any]
55+
error: Optional[Any]
56+
jsonrpc: str = "2.0"
57+
58+
3659
class Pythd:
3760
def __init__(
3861
self,
3962
address: str,
4063
) -> None:
4164
self.address = address
42-
self.server: Server
43-
self._tasks = set()
65+
self.client: WebSocketClientProtocol
66+
self.id_counter = 0
67+
self.lock = Lock()
4468

4569
async def connect(self):
46-
self.server = Server(self.address)
47-
task = await self.server.ws_connect()
48-
task.add_done_callback(Pythd._on_connection_done)
49-
self._tasks.add(task)
50-
51-
@staticmethod
52-
def _on_connection_done(task):
53-
log.error("pythd connection closed")
54-
if not task.cancelled() and task.exception() is not None:
55-
e = task.exception()
56-
traceback.print_exception(None, e, e.__traceback__)
57-
sys.exit(1)
70+
self.client = await connect(self.address)
71+
72+
def _create_request(self, method: str, params: List[Any] | Any) -> JSONRPCRequest:
73+
self.id_counter += 1
74+
return JSONRPCRequest(
75+
id=self.id_counter,
76+
method=method,
77+
params=params,
78+
)
79+
80+
async def send_request(self, request: JSONRPCRequest) -> JSONRPCResponse:
81+
async with self.lock:
82+
await self.client.send(request.to_json())
83+
response = await self.client.recv()
84+
return JSONRPCResponse.from_json(response)
85+
86+
async def send_batch_request(
87+
self, requests: List[JSONRPCRequest]
88+
) -> List[JSONRPCResponse]:
89+
async with self.lock:
90+
await self.client.send(JSONRPCRequest.schema().dumps(requests, many=True))
91+
response = await self.client.recv()
92+
return JSONRPCResponse.schema().loads(response, many=True)
5893

5994
async def all_products(self) -> List[Product]:
60-
result = await self.server.get_product_list()
61-
return [Product.from_dict(d) for d in result]
95+
request = self._create_request("get_product_list", [])
96+
result = await self.send_request(request)
97+
if result.result:
98+
return Product.schema().load(result.result, many=True)
99+
else:
100+
raise ValueError(f"Error fetching products: {result.to_json()}")
62101

63-
async def update_price(
64-
self, account: str, price: int, conf: int, status: str
65-
) -> None:
66-
await self.server.update_price(
67-
account=account, price=price, conf=conf, status=status
68-
)
102+
async def update_price_batch(self, price_updates: List[PriceUpdate]) -> None:
103+
requests = [
104+
self._create_request("update_price", price_update.to_dict())
105+
for price_update in price_updates
106+
]
107+
results = await self.send_batch_request(requests)
108+
if any(result.error for result in results):
109+
results_json_str = JSONRPCResponse.schema().dumps(results, many=True)
110+
raise ValueError(f"Error updating prices: {results_json_str}")

0 commit comments

Comments
 (0)