@@ -1,5 +1,4 @@
 import asyncio
-import datetime
 import json
 from collections import deque
 from pathlib import Path
@@ -89,15 +88,17 @@ class WeightSetter(AsyncLoopRunner):
     """The weight setter looks at RewardEvents in the reward_events queue and sets the weights of the miners accordingly."""

     sync: bool = True
-    interval: int = 60 * 21
+    interval: int = 60 * 23
     reward_events: list[list[WeightedRewardEvent]] | None = None
     weight_dict: dict[int, list[float]] | None = None
     weight_syncer: WeightSynchronizer | None = None

+    # Rewards moving average persistence.
     reward_history_path: Path = Path("validator_rewards.jsonl")
-    reward_history_len: int = 24
-    # List of uids info per each epoch, e.g.: [{1: {"reward": 1.0}, 2: {"reward": 3.0}}].
-    reward_history: deque[dict[int, dict[str, Any]]] | None = None
+    # Rewards moving average window: 36 epochs at the 23-minute interval is approx. 14 hours.
+    reward_history_len: int = 36
+    # UID info per epoch, e.g.: [{1: {"reward": 1.0, "hotkey": "ABC"}, 2: {"reward": 3.0, "hotkey": "XYZ"}}].
+    reward_history: deque[dict[int, dict[str, float | str]]] | None = None

     class Config:
         arbitrary_types_allowed = True
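For context, the averaging window implied by these two constants works out as below (a standalone sketch, not part of the validator code):

```python
# Window implied by the constants above: one snapshot is appended per step.
interval = 60 * 23       # seconds between weight-setting steps (23 minutes)
reward_history_len = 36  # snapshots retained in the deque

window_hours = reward_history_len * interval / 3600
print(f"window = {window_hours:.1f} h")  # -> window = 13.8 h
```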
@@ -120,17 +121,24 @@ async def _compute_avg_reward(self) -> npt.NDArray[np.float32]:
         num_uids = int(shared_settings.METAGRAPH.n.item())
         accum = np.zeros(num_uids, dtype=np.float32)
         if not isinstance(self.reward_history, deque) or len(self.reward_history) == 0:
-            logger.warning(f"Empty rewards history, setting zero weights: {self.reward_history}")
+            logger.error(f"Empty rewards history, setting zero weights: {self.reward_history}")
             return accum

-        for snapshot in self.reward_history:
-            for uid_str, info in snapshot.items():
-                accum[int(uid_str)] += float(info["reward"])
+        # Get the active hotkey for each UID from the latest epoch snapshot.
+        active_hotkeys: dict[int, str | None] = {}
+        for uid_str, info in self.reward_history[-1].items():
+            active_hotkeys[int(uid_str)] = info.get("hotkey")
+
+        # Accumulate rewards across epochs only if the hotkey has not changed for the given UID.
+        for epoch_info in self.reward_history:
+            for uid_str, info in epoch_info.items():
+                if active_hotkeys.get(int(uid_str)) == info.get("hotkey"):
+                    accum[int(uid_str)] += float(info["reward"])

         avg = accum / len(self.reward_history)
         return avg

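To make the hotkey filter above concrete, here is a minimal standalone sketch of the same computation on toy data (history values and hotkeys are made up): UID 1 keeps its hotkey across both epochs, so both rewards count; UID 2 was re-registered, so only the epoch matching its latest hotkey contributes.

```python
from collections import deque

import numpy as np

history = deque(
    [
        {1: {"reward": 1.0, "hotkey": "A"}, 2: {"reward": 3.0, "hotkey": "B"}},
        {1: {"reward": 2.0, "hotkey": "A"}, 2: {"reward": 5.0, "hotkey": "C"}},
    ],
    maxlen=36,
)
# Hotkeys currently registered per UID, taken from the newest snapshot.
active = {uid: info["hotkey"] for uid, info in history[-1].items()}

accum = np.zeros(3, dtype=np.float32)
for epoch in history:
    for uid, info in epoch.items():
        # Skip rewards earned under a different (since deregistered) hotkey.
        if active.get(uid) == info.get("hotkey"):
            accum[uid] += info["reward"]

print(accum / len(history))  # -> [0.  1.5 2.5]
```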
-    async def _save_rewards(self, rewards: npt.NDArray[np.float32]):
+    async def _update_rewards(self, rewards: npt.NDArray[np.float32]):
         """Persist the latest epoch rewards.

         The snapshot is appended to `reward_history` (bounded by `reward_history_len`) and the JSONL file at
@@ -142,20 +150,22 @@ async def _save_rewards(self, rewards: npt.NDArray[np.float32]):
         if not isinstance(self.reward_history, deque):
             self.reward_history = deque(maxlen=self.reward_history_len)

-        snapshot = {int(uid): {"reward": float(r)} for uid, r in enumerate(rewards)}
-        self.reward_history.append(snapshot)
+        hkeys = shared_settings.METAGRAPH.hotkeys
+        epoch_rewards: dict[int, dict[str, float | str]] = {}
+        for uid, reward in enumerate(rewards):
+            epoch_rewards[int(uid)] = {"reward": float(reward), "hotkey": hkeys[uid]}
+        self.reward_history.append(epoch_rewards)

-        tmp_path = self.reward_history_path.with_suffix(".jsonl.tmp")
-        block = getattr(shared_settings, "block", 0)
+        # block = getattr(shared_settings, "block", 0)

+        # Write results into a tmp file, then move it over the main rewards file to make the write atomic.
+        tmp_path = self.reward_history_path.with_suffix(".jsonl.tmp")
         try:
             with tmp_path.open("w", encoding="utf-8") as file:
-                for snap in self.reward_history:
-                    row: dict[str, Any] = {
-                        "ts": datetime.datetime.now(datetime.timezone.utc).isoformat(timespec="seconds") + "Z",
-                        "block": block,
-                        "rewards": {str(k): v["reward"] for k, v in snap.items()},
-                    }
+                for epoch_rewards in self.reward_history:
+                    row: dict[str, Any] = {}
+                    for uid, info in epoch_rewards.items():
+                        row[str(uid)] = {"reward": float(info["reward"]), "hotkey": info["hotkey"]}
                     file.write(json.dumps(row, separators=(",", ":")) + "\n")
             tmp_path.replace(self.reward_history_path)
         except Exception as exc:
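The tmp-file-then-rename dance in the `try` block above is the standard atomic-write pattern; a minimal standalone sketch follows (the helper name `atomic_write_jsonl` is ours, not the repo's):

```python
import json
from pathlib import Path


def atomic_write_jsonl(path: Path, rows: list[dict]) -> None:
    # Write to a sibling temp file first...
    tmp = path.with_suffix(".jsonl.tmp")
    with tmp.open("w", encoding="utf-8") as fh:
        for row in rows:
            fh.write(json.dumps(row, separators=(",", ":")) + "\n")
    # ...then rename over the target: os.replace() is atomic on POSIX, so
    # readers see either the old file or the complete new one, never a torn write.
    tmp.replace(path)


atomic_write_jsonl(Path("demo_rewards.jsonl"), [{"1": {"reward": 1.0, "hotkey": "ABC"}}])
```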
@@ -166,7 +176,7 @@ async def _load_rewards(self):

         Only the newest `reward_history_len` rows are retained.
         """
-        self.reward_history: deque[dict[int, dict[str, Any]]] | None = deque(maxlen=self.reward_history_len)
+        self.reward_history: deque[dict[int, dict[str, float | str]]] | None = deque(maxlen=self.reward_history_len)
         if not self.reward_history_path.exists():
             logger.info("No rewards file found - starting with empty history.")
             return
@@ -175,13 +185,16 @@ async def _load_rewards(self):
             with self.reward_history_path.open("r", encoding="utf-8") as file:
                 for line in file:
                     data = json.loads(line)
-                    payload = data.get("rewards")
-                    if payload is None:
+                    if not data:
                         raise ValueError(f"Malformed weight history file: {data}")

-                    self.reward_history.append({int(uid): {"reward": float(reward)} for uid, reward in payload.items()})
+                    epoch_rewards: dict[int, dict[str, float | str]] = {}
+                    for uid, info in data.items():
+                        epoch_rewards[int(uid)] = {"reward": float(info["reward"]), "hotkey": info.get("hotkey")}
+
+                    self.reward_history.append(epoch_rewards)
         except BaseException as exc:
-            self.reward_history: deque[dict[int, dict[str, Any]]] | None = deque(maxlen=self.reward_history_len)
+            self.reward_history: deque[dict[int, dict[str, float | str]]] | None = deque(maxlen=self.reward_history_len)
             logger.error(f"Couldn't load rewards from file, resetting weight history: {exc}")

     @classmethod
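With the old `ts`/`block`/`rewards` envelope gone, each line of `validator_rewards.jsonl` is now a bare UID-keyed object. An illustrative round-trip of what `_load_rewards` expects (standalone sketch; the hotkey strings are invented placeholders):

```python
import json

# One epoch per line; keys are UIDs, values carry the reward and the hotkey that earned it.
line = '{"1":{"reward":1.0,"hotkey":"ABC"},"2":{"reward":3.0,"hotkey":"XYZ"}}'

data = json.loads(line)
epoch_rewards = {
    int(uid): {"reward": float(info["reward"]), "hotkey": info.get("hotkey")}
    for uid, info in data.items()
}
print(epoch_rewards[2])  # -> {'reward': 3.0, 'hotkey': 'XYZ'}
```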
@@ -269,7 +282,7 @@ async def run_step(self):
             logger.error("No rewards were found, skipping weight setting")
             return

-        await self._save_rewards(final_rewards)
+        await self._update_rewards(final_rewards)
         averaged_rewards = await self._compute_avg_reward()
         averaged_rewards[averaged_rewards < 0] = 0
         averaged_rewards /= np.sum(averaged_rewards) + 1e-10
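The last two lines clamp negative moving averages and renormalize the vector to sum to ~1; the `1e-10` term keeps the division finite when every reward is non-positive. A toy run (standalone):

```python
import numpy as np

averaged = np.array([0.2, -0.1, 0.3], dtype=np.float32)
averaged[averaged < 0] = 0            # negative averages carry no weight
averaged /= np.sum(averaged) + 1e-10  # normalize; epsilon avoids 0/0
print(averaged)  # -> [0.4 0.  0.6]
```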