Skip to content

Commit 7d05f6c

Browse files
committed
feat(websocket): expose heartbeat events with improved implementation
Exposes WebSocket heartbeat events to allow users to monitor connection health for both public and authenticated channels. Changes: - Add 'heartbeat' event to BfxEventEmitter as a valid event type - Emit heartbeat events in WebSocket client and bucket instead of silently discarding - Consistent handler signature: receives Optional[Subscription] parameter - Public channels: subscription details - Authenticated (channel 0): None - Add comprehensive example with proper type hints and error handling - Update README with clear documentation and usage examples The handler signature is consistent across all subscription-based events, always passing the subscription as first parameter (None for authenticated). Lines of code: +101 insertions, -4 deletions (+97 net)
1 parent 791c845 commit 7d05f6c

File tree

5 files changed

+101
-4
lines changed

5 files changed

+101
-4
lines changed

README.md

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,33 @@ The same can be done without using decorators:
238238
bfx.wss.on("candles_update", callback=on_candles_update)
239239
```
240240

241+
### Heartbeat events
242+
243+
The WebSocket server sends periodic heartbeat messages to keep connections alive.
244+
These are now exposed as `heartbeat` events that you can listen to:
245+
246+
```python
247+
from typing import Optional
248+
from bfxapi.websocket.subscriptions import Subscription
249+
250+
@bfx.wss.on("heartbeat")
251+
def on_heartbeat(subscription: Optional[Subscription]) -> None:
252+
if subscription:
253+
# Heartbeat for a specific subscription (public channels)
254+
channel = subscription["channel"]
255+
symbol = subscription.get("symbol", "N/A")
256+
print(f"Heartbeat for {channel}: {symbol}")
257+
else:
258+
# Heartbeat for authenticated connection (channel 0)
259+
print("Heartbeat on authenticated connection")
260+
```
261+
262+
**Note:** The heartbeat handler receives:
263+
- `subscription` parameter containing subscription details for public channel heartbeats
264+
- `None` for authenticated connection heartbeats (channel 0)
265+
266+
---
267+
241268
# Advanced features
242269

243270
## Using custom notifications

bfxapi/websocket/_client/bfx_websocket_bucket.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,11 @@ async def start(self) -> None:
6666
if (
6767
(chan_id := cast(int, message[0]))
6868
and (subscription := self.__subscriptions.get(chan_id))
69-
and (message[1] != Connection._HEARTBEAT)
7069
):
71-
self.__handler.handle(subscription, message[1:])
70+
if message[1] == Connection._HEARTBEAT:
71+
self.__event_emitter.emit("heartbeat", subscription)
72+
else:
73+
self.__handler.handle(subscription, message[1:])
7274

7375
def __on_subscribed(self, message: Dict[str, Any]) -> None:
7476
chan_id = cast(int, message["chan_id"])

bfxapi/websocket/_client/bfx_websocket_client.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -266,9 +266,11 @@ async def __connect(self) -> None:
266266
if (
267267
isinstance(message, list)
268268
and message[0] == 0
269-
and message[1] != Connection._HEARTBEAT
270269
):
271-
self.__handler.handle(message[1], message[2])
270+
if message[1] == Connection._HEARTBEAT:
271+
self.__event_emitter.emit("heartbeat", None)
272+
else:
273+
self.__handler.handle(message[1], message[2])
272274

273275
async def __new_bucket(self) -> BfxWebSocketBucket:
274276
bucket = BfxWebSocketBucket(self._host, self.__event_emitter)

bfxapi/websocket/_event_emitter/bfx_event_emitter.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232

3333
_COMMON = [
3434
"disconnected",
35+
"heartbeat",
3536
"t_ticker_update",
3637
"f_ticker_update",
3738
"t_trade_execution",

examples/websocket/heartbeat.py

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
"""
2+
Demonstrates heartbeat event handling for both public and authenticated WebSocket connections.
3+
4+
Usage:
5+
python examples/websocket/heartbeat.py
6+
7+
If BFX_API_KEY and BFX_API_SECRET environment variables are set, the client will
8+
authenticate and display heartbeats on the authenticated connection (channel 0).
9+
Otherwise, only public channel heartbeats are shown.
10+
"""
11+
12+
import os
13+
from typing import Any, Dict, Optional
14+
15+
from bfxapi import Client
16+
from bfxapi.websocket.subscriptions import Subscription
17+
18+
19+
# Initialize client with optional authentication
20+
api_key = os.getenv("BFX_API_KEY")
21+
api_secret = os.getenv("BFX_API_SECRET")
22+
23+
if api_key and api_secret:
24+
print("Initializing authenticated client...")
25+
bfx = Client(api_key=api_key, api_secret=api_secret)
26+
else:
27+
print("Initializing public client (set BFX_API_KEY and BFX_API_SECRET for auth)...")
28+
bfx = Client()
29+
30+
31+
@bfx.wss.on("heartbeat")
32+
def on_heartbeat(subscription: Optional[Subscription]) -> None:
33+
"""Handle heartbeat events from both public and authenticated channels."""
34+
if subscription:
35+
channel = subscription["channel"]
36+
label = subscription.get("symbol", subscription.get("key", "unknown"))
37+
print(f"Heartbeat for {channel}: {label}")
38+
else:
39+
print("Heartbeat on authenticated connection (channel 0)")
40+
41+
42+
@bfx.wss.on("authenticated")
43+
async def on_authenticated(event: Dict[str, Any]) -> None:
44+
"""Handle authentication confirmation."""
45+
user_id = event.get("userId", "unknown")
46+
print(f"Successfully authenticated with userId: {user_id}")
47+
48+
49+
@bfx.wss.on("open")
50+
async def on_open() -> None:
51+
"""Subscribe to public channels when connection opens."""
52+
print("WebSocket connection opened")
53+
# Subscribe to a public channel to observe subscription heartbeats
54+
await bfx.wss.subscribe("ticker", symbol="tBTCUSD")
55+
print("Subscribed to ticker channel for tBTCUSD")
56+
57+
58+
if __name__ == "__main__":
59+
print("Starting WebSocket client... Press Ctrl+C to stop.")
60+
try:
61+
bfx.wss.run()
62+
except KeyboardInterrupt:
63+
print("\nShutting down gracefully...")
64+
except Exception as e:
65+
print(f"Error: {e}")

0 commit comments

Comments
 (0)