Skip to content

Commit c90f52a

Browse files
Feat(observer): Detect artificially noisy stalled publisher prices (#81)
* feat(observer): improve stall detection check * fix(observer): revert changes made for local debugging * fix(observer): fix pyright errors * fix(observer): fix ci and pr comments * chore(observer): remove unneeded log * fix(observer): fix exact stall, add test * bump version
1 parent 1c4d802 commit c90f52a

10 files changed

+421
-71
lines changed

.gitignore

+2-1
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,5 @@ __pycache__/
66
.envrc
77
.coverage
88

9-
.env
9+
.env
10+
.vscode/

.python-version

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
3.10

README.md

+5-1
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,13 @@ Observe Pyth on-chain price feeds and run sanity checks on the data.
66

77
Container images are available at https://github.com/pyth-network/pyth-observer/pkgs/container/pyth-observer
88

9-
To run Observer locally, make sure you have a recent version of [Poetry](https://python-poetry.org) installed and run:
9+
To run Observer locally, you will need:
10+
- Python 3.10 ([pyenv](https://github.com/pyenv/pyenv) is a nice way to manage Python installs, and once installed will automatically set the version to 3.10 for this project dir via the `.python-version` file).
11+
- [Poetry](https://python-poetry.org), which handles package and virtualenv management.
1012

13+
Install dependencies and run the service:
1114
```sh
15+
$ poetry env use $(which python) # point Poetry to the pyenv python shim
1216
$ poetry install
1317
$ poetry run pyth-observer
1418
```

poetry.lock

+67-2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

+2-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ ignore_missing_imports = true
44

55
[tool.poetry]
66
name = "pyth-observer"
7-
version = "0.2.15"
7+
version = "0.3.0"
88
description = "Alerts and stuff"
99
authors = []
1010
readme = "README.md"
@@ -27,6 +27,7 @@ throttler = "1.2.1"
2727
types-pyyaml = "^6.0.12"
2828
types-pytz = "^2022.4.0.0"
2929
python-dotenv = "^1.0.1"
30+
numpy = "^2.1.3"
3031

3132

3233
[tool.poetry.group.dev.dependencies]

pyth_observer/check/publisher.py

+58-17
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,34 @@
11
import time
2-
from dataclasses import dataclass
2+
from collections import defaultdict, deque
3+
from dataclasses import asdict, dataclass
34
from datetime import datetime
45
from typing import Dict, Protocol, runtime_checkable
56
from zoneinfo import ZoneInfo
67

8+
from loguru import logger
79
from pythclient.calendar import is_market_open
810
from pythclient.pythaccounts import PythPriceStatus
911
from pythclient.solana import SolanaPublicKey
1012

13+
14+
@dataclass
15+
class PriceUpdate:
16+
"""Represents a single price with its timestamp (epoch seconds)."""
17+
18+
timestamp: int
19+
price: float
20+
21+
1122
PUBLISHER_EXCLUSION_DISTANCE = 25
23+
PUBLISHER_CACHE_MAX_LEN = 30
24+
"""Roughly 30 mins of updates, since the check runs about once a minute"""
1225

13-
PUBLISHER_CACHE = {}
26+
PUBLISHER_CACHE = defaultdict(lambda: deque(maxlen=PUBLISHER_CACHE_MAX_LEN))
27+
"""
28+
Cache that holds tuples of (price, timestamp) for publisher/feed combos as they stream in.
29+
Entries longer than `PUBLISHER_CACHE_MAX_LEN` are automatically pruned.
30+
Used by the PublisherStalledCheck to detect stalls in prices.
31+
"""
1432

1533

1634
@dataclass
@@ -240,6 +258,16 @@ def __init__(self, state: PublisherState, config: PublisherCheckConfig):
240258
self.__abandoned_time_limit: int = int(config["abandoned_time_limit"])
241259
self.__max_slot_distance: int = int(config["max_slot_distance"])
242260

261+
from pyth_observer.check.stall_detection import ( # noqa: deferred import to avoid circular import
262+
StallDetector,
263+
)
264+
265+
self.__detector = StallDetector(
266+
stall_time_limit=self.__stall_time_limit,
267+
noise_threshold=float(config["noise_threshold"]),
268+
min_noise_samples=int(config["min_noise_samples"]),
269+
)
270+
243271
def state(self) -> PublisherState:
244272
return self.__state
245273

@@ -254,36 +282,49 @@ def run(self) -> bool:
254282

255283
distance = self.__state.latest_block_slot - self.__state.slot
256284

285+
# Pass for redemption rates because they are expected to be static for long periods
286+
if self.__state.asset_type == "Crypto Redemption Rate":
287+
return True
288+
257289
# Pass when publisher is offline because PublisherOfflineCheck will be triggered
258290
if distance >= self.__max_slot_distance:
259291
return True
260292

261-
publisher_key = (self.__state.publisher_name, self.__state.symbol)
262293
current_time = int(time.time())
263-
previous_price, last_change_time = PUBLISHER_CACHE.get(
264-
publisher_key, (None, None)
265-
)
266294

267-
if previous_price is None or self.__state.price != previous_price:
268-
PUBLISHER_CACHE[publisher_key] = (self.__state.price, current_time)
269-
return True
295+
publisher_key = (self.__state.publisher_name, self.__state.symbol)
296+
updates = PUBLISHER_CACHE[publisher_key]
297+
298+
# Only cache new prices, let repeated prices grow stale.
299+
# These will be caught as an exact stall in the detector.
300+
is_repeated_price = updates and updates[-1].price == self.__state.price
301+
cur_update = PriceUpdate(current_time, self.__state.price)
302+
if not is_repeated_price:
303+
PUBLISHER_CACHE[publisher_key].append(cur_update)
304+
305+
# Analyze for stalls
306+
result = self.__detector.analyze_updates(list(updates), cur_update)
307+
logger.debug(f"Stall detection result: {result}")
270308

271-
time_since_last_change = current_time - last_change_time
272-
if time_since_last_change > self.__stall_time_limit:
273-
if time_since_last_change > self.__abandoned_time_limit:
274-
return True # Abandon this check after the abandoned time limit
275-
return False
309+
self.__last_analysis = result # For error logging
310+
311+
# If we've been stalled for too long, abandon this check
312+
if result.is_stalled and result.duration > self.__abandoned_time_limit:
313+
return True
276314

277-
return True
315+
return not result.is_stalled
278316

279317
def error_message(self) -> dict:
318+
stall_duration = f"{self.__last_analysis.duration:.1f} seconds"
280319
return {
281-
"msg": f"{self.__state.publisher_name} has been publishing the same price for too long.",
320+
"msg": f"{self.__state.publisher_name} has been publishing the same price of {self.__state.symbol} for {stall_duration}",
282321
"type": "PublisherStalledCheck",
283322
"publisher": self.__state.publisher_name,
284323
"symbol": self.__state.symbol,
285324
"price": self.__state.price,
286-
"stall_duration": f"{int(time.time()) - PUBLISHER_CACHE[(self.__state.publisher_name, self.__state.symbol)][1]} seconds",
325+
"stall_type": self.__last_analysis.stall_type,
326+
"stall_duration": stall_duration,
327+
"analysis": asdict(self.__last_analysis),
287328
}
288329

289330

0 commit comments

Comments
 (0)