Skip to content

Commit 8f268b9

Browse files
authored
Merge pull request #777 from macrocosm-os/staging
v2.19.9: - Add expiration period for the queued responses. - Fix rewards averaging.
2 parents 5c8df7a + 2f35eb1 commit 8f268b9

File tree

6 files changed

+76
-13
lines changed

6 files changed

+76
-13
lines changed

prompting/rewards/scoring.py

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import asyncio
22
import copy
33
import threading
4+
import time
45
from multiprocessing.managers import AcquirerProxy
56

67
from loguru import logger
@@ -30,6 +31,7 @@ class TaskScorer(AsyncLoopRunner):
3031
scoring_queue: list | None = None
3132
reward_events: list | None = None
3233
task_queue: list | None = None
34+
expiry_time: int = 60 * 60 * 20
3335
model_config = ConfigDict(arbitrary_types_allowed=True)
3436

3537
async def start(
@@ -70,12 +72,22 @@ def add_to_queue(
7072
async def run_step(self) -> RewardLoggingEvent:
7173
await asyncio.sleep(0.1)
7274

73-
if not self.scoring_queue:
75+
scoring_config: ScoringConfig | None = None
76+
while self.scoring_queue:
77+
# Pop the oldest item from the queue.
78+
config = self.scoring_queue.pop(0)
79+
# Check if the config is recent enough to be processed.
80+
if config.created_at >= time.time() - self.expiry_time:
81+
scoring_config = config
82+
break
83+
# Otherwise, the old config is discarded and we continue to the next one.
84+
else:
85+
logger.debug(
86+
f"Discarding old scoring config for {config.task.__class__.__name__} created at {config.created_at}"
87+
)
88+
if not scoring_config:
7489
return
7590

76-
# TODO: Filter based on active models before selecting an item to score.
77-
scoring_config: ScoringConfig = self.scoring_queue.pop(0)
78-
7991
# here we generate the actual reference
8092
with Timer(label=f"Generating reference for {scoring_config.task.__class__.__name__}"):
8193
await scoring_config.task.make_reference(

prompting/rewards/scoring_config.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
from dataclasses import dataclass
1+
import time
2+
from dataclasses import dataclass, field
23

34
from prompting.tasks.base_task import BaseTextTask
45
from shared.base import DatasetEntry
@@ -13,3 +14,4 @@ class ScoringConfig:
1314
block: int
1415
step: int
1516
task_id: str
17+
created_at: float = field(default_factory=time.time)

prompting/weight_setting/weight_setter.py

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323

2424

2525
async def set_weights(
26-
weights: np.ndarray,
26+
weights: npt.NDArray[np.float32],
2727
subtensor: bt.Subtensor | None = None,
2828
metagraph: bt.Metagraph | None = None,
2929
weight_syncer: WeightSynchronizer | None = None,
@@ -115,6 +115,21 @@ async def start(
115115
await self._load_rewards()
116116
return await super().start(name=name)
117117

118+
async def _compute_avg_reward(self) -> npt.NDArray[np.float32]:
119+
"""Compute reward average based on the `reward_history` and `reward_average_len` window."""
120+
num_uids = int(shared_settings.METAGRAPH.n.item())
121+
accum = np.zeros(num_uids, dtype=np.float32)
122+
if not isinstance(self.reward_history, deque) or len(self.reward_history) == 0:
123+
logger.warning(f"Empty rewards history, setting zero weights: {self.reward_history}")
124+
return accum
125+
126+
for snapshot in self.reward_history:
127+
for uid_str, info in snapshot.items():
128+
accum[int(uid_str)] += float(info["reward"])
129+
130+
avg = accum / len(self.reward_history)
131+
return avg
132+
118133
async def _save_rewards(self, rewards: npt.NDArray[np.float32]):
119134
"""Persist the latest epoch rewards.
120135
@@ -255,14 +270,16 @@ async def run_step(self):
255270
return
256271

257272
await self._save_rewards(final_rewards)
258-
final_rewards[final_rewards < 0] = 0
259-
final_rewards /= np.sum(final_rewards) + 1e-10
273+
averaged_rewards = await self._compute_avg_reward()
274+
averaged_rewards[averaged_rewards < 0] = 0
275+
averaged_rewards /= np.sum(averaged_rewards) + 1e-10
260276
except BaseException as ex:
261277
logger.exception(f"{ex}")
278+
return
262279

263280
# Set weights on chain.
264281
await set_weights(
265-
final_rewards,
282+
averaged_rewards,
266283
subtensor=shared_settings.SUBTENSOR,
267284
metagraph=shared_settings.metagraph_force_sync(),
268285
weight_syncer=self.weight_syncer,

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "prompting"
3-
version = "2.19.8"
3+
version = "2.19.9"
44
description = "Subnetwork 1 runs on Bittensor and is maintained by Macrocosmos. It's an effort to create decentralised AI"
55
authors = ["Kalei Brady, Dmytro Bobrenko, Felix Quinque, Steffen Cruz, Richard Wardle"]
66
readme = "README.md"

tests/prompting/weight_setting/test_weight_setter.py

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
# ruff: noqa: E402
22
import asyncio
3+
from collections import deque
34
from pathlib import Path
45
from types import SimpleNamespace
56
from unittest.mock import MagicMock, patch
@@ -90,7 +91,7 @@ def test_steepness():
9091
assert result[0] < 0, "Negative reward should remain negative"
9192

9293

93-
def test_run_step_with_reward_events():
94+
def test_run_step_with_reward_events(tmp_path: Path):
9495
with (
9596
patch("shared.uids.get_uids") as mock_get_uids,
9697
patch("prompting.weight_setting.weight_setter.TaskRegistry") as MockTaskRegistry,
@@ -126,7 +127,7 @@ def __init__(self, task, uids, rewards, weight):
126127

127128
# Set up the mock mutable_globals.
128129

129-
weight_setter = WeightSetter(reward_history_path=Path("test_validator_rewards.jsonl"))
130+
weight_setter = WeightSetter(reward_history_path=tmp_path / "test_validator_rewards.jsonl")
130131
reward_events = [
131132
[
132133
WeightedRewardEvent(
@@ -165,6 +166,37 @@ def __init__(self, task, uids, rewards, weight):
165166
mock_logger.warning.assert_not_called()
166167

167168

169+
def _make_snapshot(values: list[float]) -> dict[int, dict[str, float]]:
170+
return {uid: {"reward": v} for uid, v in enumerate(values)}
171+
172+
173+
@pytest.mark.asyncio
174+
async def test_avg_reward_non_empty(tmp_path: Path) -> None:
175+
"""Mean over two snapshots equals manual average."""
176+
ws = WeightSetter(reward_history_path=tmp_path / "test_validator_rewards.jsonl")
177+
ws.reward_history_len = 10
178+
ws.reward_history = deque(maxlen=10)
179+
rewards = list(range(256))
180+
ws.reward_history.append(_make_snapshot(rewards))
181+
ws.reward_history.append(_make_snapshot(rewards[::-1]))
182+
183+
result = await ws._compute_avg_reward()
184+
185+
expected = np.full(256, 255 / 2, dtype=np.float32)
186+
assert result.dtype == np.float32
187+
assert np.allclose(result, expected, atol=1e-6)
188+
189+
190+
@pytest.mark.asyncio
191+
async def test_avg_reward_empty(monkeypatch: MonkeyPatch, tmp_path: Path) -> None:
192+
"""Empty history returns a zero vector."""
193+
ws = WeightSetter(reward_history_path=tmp_path / "test_validator_rewards.jsonl")
194+
ws.reward_history_len = 10
195+
ws.reward_history = deque(maxlen=10)
196+
result = await ws._compute_avg_reward()
197+
assert np.array_equal(result, np.zeros(256, dtype=np.float32))
198+
199+
168200
@pytest.mark.asyncio
169201
async def test_set_weights(monkeypatch: MonkeyPatch):
170202
"""`set_weights` calls Subtensor.set_weights with processed vectors."""

validator_api/chat_completion.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,7 @@ async def chat_completion(
237237
uids: Optional[list[int]] = None,
238238
num_miners: int = 5,
239239
uid_tracker: UidTracker | None = None,
240-
add_reliable_miners: int = 1,
240+
add_reliable_miners: int = 3,
241241
) -> tuple | StreamingResponse:
242242
# TODO: Add docstring.
243243
"""Handle chat completion with multiple miners in parallel."""

0 commit comments

Comments
 (0)