Skip to content

Commit 2b9a7ce

Browse files
authored
v2.15.4
v2.15.4
2 parents 0355e52 + da461aa commit 2b9a7ce

File tree

11 files changed

+138
-62
lines changed

11 files changed

+138
-62
lines changed

neurons/validator.py

Lines changed: 71 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,20 @@
1-
# ruff: noqa: E402
2-
from shared import settings
3-
4-
settings.shared_settings = settings.SharedSettings.load(mode="validator")
5-
shared_settings = settings.shared_settings
6-
71
import asyncio
82
import multiprocessing as mp
3+
import sys
94
import time
105

116
import loguru
127
import torch
8+
import wandb
9+
10+
# ruff: noqa: E402
11+
from shared import settings
12+
13+
shared_settings = settings.shared_settings
14+
settings.shared_settings = settings.SharedSettings.load(mode="validator")
15+
1316

14-
from prompting.api.api import start_scoring_api
15-
from prompting.llms.model_manager import model_scheduler
1617
from prompting.llms.utils import GPUInfo
17-
from prompting.miner_availability.miner_availability import availability_checking_loop
18-
from prompting.rewards.scoring import task_scorer
19-
from prompting.tasks.task_creation import task_loop
20-
from prompting.tasks.task_sending import task_sender
21-
from prompting.weight_setting.weight_setter import weight_setter
22-
from shared.profiling import profiler
2318

2419
# Add a handler to write logs to a file
2520
loguru.logger.add("logfile.log", rotation="1000 MB", retention="10 days", level="DEBUG")
@@ -32,8 +27,34 @@
3227

3328
def create_loop_process(task_queue, scoring_queue, reward_events):
3429
async def spawn_loops(task_queue, scoring_queue, reward_events):
30+
# ruff: noqa: E402
31+
wandb.setup()
32+
from shared import settings
33+
34+
settings.shared_settings = settings.SharedSettings.load(mode="validator")
35+
36+
from prompting.llms.model_manager import model_scheduler
37+
from prompting.miner_availability.miner_availability import availability_checking_loop
38+
from prompting.rewards.scoring import task_scorer
39+
from prompting.tasks.task_creation import task_loop
40+
from prompting.tasks.task_sending import task_sender
41+
from prompting.weight_setting.weight_setter import weight_setter
42+
from shared.profiling import profiler
43+
3544
logger.info("Starting Profiler...")
3645
asyncio.create_task(profiler.print_stats(), name="Profiler"),
46+
47+
# -------- Duplicate of create_task_loop ----------
48+
logger.info("Starting AvailabilityCheckingLoop...")
49+
asyncio.create_task(availability_checking_loop.start())
50+
51+
logger.info("Starting TaskSender...")
52+
asyncio.create_task(task_sender.start(task_queue, scoring_queue))
53+
54+
logger.info("Starting TaskLoop...")
55+
asyncio.create_task(task_loop.start(task_queue, scoring_queue))
56+
# -------------------------------------------------
57+
3758
logger.info("Starting ModelScheduler...")
3859
asyncio.create_task(model_scheduler.start(scoring_queue), name="ModelScheduler"),
3960
logger.info("Starting TaskScorer...")
@@ -62,6 +83,8 @@ async def spawn_loops(task_queue, scoring_queue, reward_events):
6283

6384
def start_api():
6485
async def start():
86+
from prompting.api.api import start_scoring_api # noqa: F401
87+
6588
await start_scoring_api()
6689
while True:
6790
await asyncio.sleep(10)
@@ -70,21 +93,21 @@ async def start():
7093
asyncio.run(start())
7194

7295

73-
def create_task_loop(task_queue, scoring_queue):
74-
async def start(task_queue, scoring_queue):
75-
logger.info("Starting AvailabilityCheckingLoop...")
76-
asyncio.create_task(availability_checking_loop.start())
96+
# def create_task_loop(task_queue, scoring_queue):
97+
# async def start(task_queue, scoring_queue):
98+
# logger.info("Starting AvailabilityCheckingLoop...")
99+
# asyncio.create_task(availability_checking_loop.start())
77100

78-
logger.info("Starting TaskSender...")
79-
asyncio.create_task(task_sender.start(task_queue, scoring_queue))
101+
# logger.info("Starting TaskSender...")
102+
# asyncio.create_task(task_sender.start(task_queue, scoring_queue))
80103

81-
logger.info("Starting TaskLoop...")
82-
asyncio.create_task(task_loop.start(task_queue, scoring_queue))
83-
while True:
84-
await asyncio.sleep(10)
85-
logger.debug("Running task loop...")
104+
# logger.info("Starting TaskLoop...")
105+
# asyncio.create_task(task_loop.start(task_queue, scoring_queue))
106+
# while True:
107+
# await asyncio.sleep(10)
108+
# logger.debug("Running task loop...")
86109

87-
asyncio.run(start(task_queue, scoring_queue))
110+
# asyncio.run(start(task_queue, scoring_queue))
88111

89112

90113
async def main():
@@ -109,23 +132,38 @@ async def main():
109132
loop_process = mp.Process(
110133
target=create_loop_process, args=(task_queue, scoring_queue, reward_events), name="LoopProcess"
111134
)
112-
task_loop_process = mp.Process(
113-
target=create_task_loop, args=(task_queue, scoring_queue), name="TaskLoopProcess"
114-
)
135+
# task_loop_process = mp.Process(
136+
# target=create_task_loop, args=(task_queue, scoring_queue), name="TaskLoopProcess"
137+
# )
115138
loop_process.start()
116-
task_loop_process.start()
139+
# task_loop_process.start()
117140
processes.append(loop_process)
118-
processes.append(task_loop_process)
141+
# processes.append(task_loop_process)
119142
GPUInfo.log_gpu_info()
120143

144+
step = 0
121145
while True:
122-
await asyncio.sleep(10)
123-
logger.debug("Running...")
146+
await asyncio.sleep(30)
147+
if (
148+
shared_settings.SUBTENSOR.get_current_block()
149+
- shared_settings.METAGRAPH.last_update[shared_settings.UID]
150+
> 500
151+
and step > 120
152+
):
153+
logger.warning(
154+
f"UPDATES HAVE STALED FOR {shared_settings.SUBTENSOR.get_current_block() - shared_settings.METAGRAPH.last_update[shared_settings.UID]} BLOCKS AND {step} STEPS"
155+
)
156+
logger.warning(
157+
f"STALED: {shared_settings.SUBTENSOR.get_current_block()}, {shared_settings.METAGRAPH.block}"
158+
)
159+
sys.exit(1)
160+
step += 1
124161

125162
except Exception as e:
126163
logger.error(f"Main loop error: {e}")
127164
raise
128165
finally:
166+
wandb.teardown()
129167
# Clean up processes
130168
for process in processes:
131169
if process.is_alive():

prompting/rewards/scoring.py

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
import asyncio
22
import threading
3-
from dataclasses import dataclass
43

54
from loguru import logger
65
from pydantic import ConfigDict
76

87
from prompting.llms.model_manager import model_manager, model_scheduler
8+
from prompting.rewards.scoring_config import ScoringConfig
99
from prompting.tasks.base_task import BaseTextTask
1010
from prompting.tasks.task_registry import TaskRegistry
1111
from shared.base import DatasetEntry
@@ -14,16 +14,6 @@
1414
from shared.loop_runner import AsyncLoopRunner
1515

1616

17-
@dataclass
18-
class ScoringConfig:
19-
task: BaseTextTask
20-
response: DendriteResponseEvent
21-
dataset_entry: DatasetEntry
22-
block: int
23-
step: int
24-
task_id: str
25-
26-
2717
class TaskScorer(AsyncLoopRunner):
2818
"""The scoring manager maintains a queue of tasks & responses to score and then runs a scoring loop in a background thread.
2919
This scoring loop will score the responses once the LLM needed is loaded in the model_manager and log the rewards.
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
from dataclasses import dataclass
2+
3+
from prompting.tasks.base_task import BaseTextTask
4+
from shared.base import DatasetEntry
5+
from shared.dendrite import DendriteResponseEvent
6+
7+
8+
@dataclass
9+
class ScoringConfig:
10+
task: BaseTextTask
11+
response: DendriteResponseEvent
12+
dataset_entry: DatasetEntry
13+
block: int
14+
step: int
15+
task_id: str

prompting/tasks/task_creation.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@
66

77
from prompting.miner_availability.miner_availability import miner_availabilities
88
from prompting.tasks.task_registry import TaskRegistry
9-
from shared.logging import ErrorLoggingEvent, ValidatorLoggingEvent
9+
10+
# from shared.logging import ErrorLoggingEvent, ValidatorLoggingEvent
1011
from shared.loop_runner import AsyncLoopRunner
1112
from shared.settings import shared_settings
1213

@@ -26,7 +27,7 @@ async def start(self, task_queue, scoring_queue):
2627
self.scoring_queue = scoring_queue
2728
await super().start()
2829

29-
async def run_step(self) -> ValidatorLoggingEvent | ErrorLoggingEvent | None:
30+
async def run_step(self):
3031
if len(self.task_queue) > shared_settings.TASK_QUEUE_LENGTH_THRESHOLD:
3132
logger.debug("Task queue is full. Skipping task generation.")
3233
return None

prompting/tasks/task_sending.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from prompting.miner_availability.miner_availability import miner_availabilities
1010

1111
# from prompting.rewards.scoring import task_scorer
12-
from prompting.rewards.scoring import ScoringConfig
12+
from prompting.rewards.scoring_config import ScoringConfig
1313
from prompting.tasks.base_task import BaseTextTask
1414
from prompting.tasks.inference import InferenceTask
1515
from shared.dendrite import DendriteResponseEvent, SynapseStreamResult
@@ -66,6 +66,9 @@ async def collect_responses(task: BaseTextTask) -> DendriteResponseEvent | None:
6666
response_event = DendriteResponseEvent(
6767
stream_results=stream_results,
6868
uids=uids,
69+
axons=[
70+
shared_settings.METAGRAPH.axons[x].ip + ":" + str(shared_settings.METAGRAPH.axons[x].port) for x in uids
71+
],
6972
timeout=(
7073
shared_settings.INFERENCE_TIMEOUT if isinstance(task, InferenceTask) else shared_settings.NEURON_TIMEOUT
7174
),

prompting/weight_setting/weight_setter.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ class WeightSetter(AsyncLoopRunner):
137137
"""The weight setter looks at RewardEvents in the reward_events queue and sets the weights of the miners accordingly."""
138138

139139
sync: bool = True
140-
interval: int = 60 * 22 # set rewards every 20 minutes
140+
interval: int = 60 * 25 # set rewards every 25 minutes
141141
reward_events: list[list[WeightedRewardEvent]] | None = None
142142
subtensor: bt.Subtensor | None = None
143143
metagraph: bt.Metagraph | None = None
@@ -240,10 +240,8 @@ async def run_step(self):
240240
set_weights(
241241
final_rewards, step=self.step, subtensor=shared_settings.SUBTENSOR, metagraph=shared_settings.METAGRAPH
242242
)
243-
self.reward_events = [] # empty reward events queue
244-
logger.debug(f"Pre-Refresh Metagraph Block: {shared_settings.METAGRAPH.block}")
245-
shared_settings.refresh_metagraph()
246-
logger.debug(f"Post-Refresh Metagraph Block: {shared_settings.METAGRAPH.block}")
243+
# TODO: empty rewards queue only on weight setting success
244+
self.reward_events[:] = [] # empty reward events queue
247245
await asyncio.sleep(0.01)
248246
return final_rewards
249247

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "prompting"
3-
version = "2.15.3"
3+
version = "2.15.4"
44
description = "Subnetwork 1 runs on Bittensor and is maintained by Macrocosmos. It's an effort to create decentralised AI"
55
authors = ["Kalei Brady, Dmytro Bobrenko, Felix Quinque, Steffen Cruz, Richard Wardle"]
66
readme = "README.md"

shared/dendrite.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ def model_dump(self):
3535

3636
class DendriteResponseEvent(BaseModel):
3737
uids: np.ndarray | list[float]
38+
axons: list[str]
3839
timeout: float
3940
stream_results: list[SynapseStreamResult]
4041
completions: list[str] = []

shared/epistula.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ async def query_miners(uids, body: dict[str, Any]):
124124
exceptions = [resp for resp in responses if isinstance(resp, Exception)]
125125
if exceptions:
126126
for exc in exceptions:
127-
logger.error(f"Error in make_openai_query: {exc}")
127+
logger.debug(f"Error in make_openai_query: {exc}")
128128

129129
# 'responses' is a list of SynapseStreamResult objects
130130
results = []

shared/misc.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,3 +138,34 @@ def serialize_exception_to_string(e):
138138
return serialized_str
139139
else:
140140
return e
141+
142+
143+
def cached_property_with_expiration(expiration_seconds=1200):
144+
"""
145+
Decorator that caches the property's value for `expiration_seconds` seconds.
146+
After this duration, the cached value is refreshed.
147+
"""
148+
149+
def decorator(func):
150+
attr_name = f"_cached_{func.__name__}"
151+
152+
@property
153+
def wrapper(self):
154+
now = time.time()
155+
156+
# Check if we have a cached value and if it's still valid
157+
if hasattr(self, attr_name):
158+
cached_value, timestamp = getattr(self, attr_name)
159+
160+
# If valid, return cached value
161+
if now - timestamp < expiration_seconds:
162+
return cached_value
163+
164+
# Otherwise, compute the new value and cache it
165+
value = func(self)
166+
setattr(self, attr_name, (value, now))
167+
return value
168+
169+
return wrapper
170+
171+
return decorator

0 commit comments

Comments
 (0)