11 changes: 8 additions & 3 deletions logicnet/base/validator.py
@@ -2,14 +2,15 @@
import torch
import asyncio
import threading
import time
import bittensor as bt

from typing import List
from traceback import print_exception

from logicnet.base.neuron import BaseNeuron



class BaseValidatorNeuron(BaseNeuron):
"""
Base class for Bittensor validators. Your validator should inherit from this class.
@@ -21,6 +22,9 @@ def __init__(self, config=None):
# Save a copy of the hotkeys to local memory.
self.hotkeys = copy.deepcopy(self.metagraph.hotkeys)

# Save a copy of the new hotkeys to local memory.
self.new_hotkeys = []

# Dendrite lets us send messages to other nodes (axons) in the network.
self.dendrite = bt.dendrite(wallet=self.wallet)
bt.logging.info(f"\033[1;32m🔗 Dendrite: {self.dendrite}\033[0m")
@@ -251,9 +255,10 @@ def resync_metagraph(self):
bt.logging.info(
"\033[1;32m🔄 Metagraph updated, re-syncing hotkeys, dendrite pool and moving averages\033[0m"
)
# Zero out all hotkeys that have been replaced.

# Zero out all hotkeys that have been replaced and add them to the new hotkeys list.
for uid, hotkey in enumerate(self.hotkeys):
if (hotkey != self.metagraph.hotkeys[uid]):
if hotkey != self.metagraph.hotkeys[uid]:
self.scores[uid] = 0 # hotkey has been replaced

# Check to see if the metagraph has changed size.
5 changes: 5 additions & 0 deletions logicnet/utils/func_helper.py
@@ -0,0 +1,5 @@
def linear_function(x, m=1, b=0):
"""
Computes the value of a linear function f(x) = mx + b
"""
return m * x + b
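A quick, hedged illustration of how this helper behaves (the inputs are made-up values; the rewarder changes below call it with b left at its default of 0):

from logicnet.utils.func_helper import linear_function

linear_function(2, m=3, b=1)    # 3 * 2 + 1 = 7
linear_function(0.5, m=0.08)    # 0.08 * 0.5 + 0 = 0.04, the shape used for the new-miner bonus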
26 changes: 18 additions & 8 deletions logicnet/validator/miner_manager.py
@@ -60,10 +60,23 @@ class MinerManager:
def __init__(self, validator):
self.validator = validator
self.all_uids = [int(uid.item()) for uid in self.validator.metagraph.uids]
# Ensure all entries are MinerInfo objects
self.all_uids_info = {uid: MinerInfo() for uid in self.all_uids}

def to_dict(self):
return {uid: info.to_dict() for uid, info in self.all_uids_info.items()}
"""Convert miner info to dictionary format, ensuring all entries are MinerInfo objects"""
result = {}
for uid, info in self.all_uids_info.items():
if isinstance(info, dict):
# Convert dict to MinerInfo object if needed
info = MinerInfo(**info)
self.all_uids_info[uid] = info
elif not isinstance(info, MinerInfo):
# Create new MinerInfo object if invalid type
info = MinerInfo()
self.all_uids_info[uid] = info
result[uid] = info.to_dict()
return result
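To see what this normalization guards against, here is a self-contained sketch; the tiny MinerInfo stand-in below is illustrative only (the real class lives in the LogicNet codebase), and its category / epoch_volume fields are assumed from update_miners_identity further down:

from dataclasses import dataclass, asdict

@dataclass
class MinerInfo:  # illustrative stand-in, not the project's real class
    category: str = ""
    epoch_volume: int = 512

    def to_dict(self):
        return asdict(self)

all_uids_info = {
    1: MinerInfo(category="math"),   # already the expected type
    2: {"category": "logic"},        # plain dict, e.g. loaded from an old state file
    3: None,                         # unexpected type
}

result = {}
for uid, info in all_uids_info.items():
    if isinstance(info, dict):
        info = MinerInfo(**info)     # promote dict entries, as the patched to_dict does
    elif not isinstance(info, MinerInfo):
        info = MinerInfo()           # replace anything else with a fresh object
    all_uids_info[uid] = info
    result[uid] = info.to_dict()

# result: {1: {'category': 'math', 'epoch_volume': 512},
#          2: {'category': 'logic', 'epoch_volume': 512},
#          3: {'category': '', 'epoch_volume': 512}}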

def get_miner_info(self):
"""
@@ -101,10 +114,7 @@ def update_miners_identity(self):
miner_distribution = {}
for uid, info in valid_miners_info.items():
# info = self.all_uids_info[int(uid)] if int(uid) in self.all_uids_info else MinerInfo(**info)
miner_state = self.all_uids_info.setdefault(
uid,
{"scores": [], "reward_logs": []},
)
miner_state = self.all_uids_info.setdefault(uid, MinerInfo())
miner_state.category = info.get("category", "")
miner_state.epoch_volume = info.get("epoch_volume") if info.get("epoch_volume") else 512
info = miner_state
@@ -139,9 +149,9 @@ def get_miner_uids(self, category: str):
Get miner uids based on category, useful if subnet has multiple categories
"""
available_uids = [
int(uid)
for uid in self.all_uids_info.keys()
if self.all_uids_info[uid].category == category
uid for uid in self.all_uids_info.keys()
if isinstance(self.all_uids_info[uid], MinerInfo) and
self.all_uids_info[uid].category == category
]
return available_uids

71 changes: 65 additions & 6 deletions logicnet/validator/rewarder.py
@@ -2,19 +2,27 @@
import openai
import sympy
import random
import time
import bittensor as bt
from concurrent import futures
from logicnet.protocol import LogicSynapse
from sentence_transformers import SentenceTransformer
from logicnet.utils.model_selector import model_selector
from logicnet.utils.regex_helper import extract_numbers
from logicnet.validator.prompt import DETECT_TRICK_TEMPLATE, CORRECTNESS_TEMPLATE, EXTRACT_ANSWER_PROMPT
from logicnet.utils.func_helper import linear_function

SIMILARITY_WEIGHT = 0.3
CORRECTNESS_WEIGHT = 0.7
PROCESSING_TIME_WEIGHT = -0.05
ELIGIBLE_TIMEOUT = 604800 # 7 days


class MinerInfo:
def __init__(self, uid: str, time: float):
self.uid = uid
self.time = time


class LogicRewarder:
def __init__(self, model_rotation_pool: dict):
@@ -24,7 +32,7 @@ def __init__(self, model_rotation_pool: dict):
self.model_rotation_pool = model_rotation_pool
self.embedder = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")

def __call__(self, uids, responses: list[LogicSynapse], base_synapse: LogicSynapse):
def __call__(self, uids, responses: list[LogicSynapse], base_synapse: LogicSynapse, new_miners: list[MinerInfo]):
"""Calculate reward for each response using similarity, correctness, and processing time.

Args:
@@ -68,7 +76,7 @@ def __call__(self, uids, responses: list[LogicSynapse], base_synap
+ CORRECTNESS_WEIGHT * correctness[i]
+ PROCESSING_TIME_WEIGHT * min(process_times[i] / timeout, 1)
)

# Scale up the reward
reward = reward / 2 + 0.5
valid_rewards.append(reward)
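For a concrete feel of the weighted sum and the reward / 2 + 0.5 rescaling above, a short worked example with made-up per-response scores:

SIMILARITY_WEIGHT, CORRECTNESS_WEIGHT, PROCESSING_TIME_WEIGHT = 0.3, 0.7, -0.05

similarity, correctness = 0.8, 1.0    # hypothetical similarity and correctness scores
process_time, timeout = 32.0, 64.0    # hypothetical timing, i.e. a 0.5 time ratio

raw = (SIMILARITY_WEIGHT * similarity
       + CORRECTNESS_WEIGHT * correctness
       + PROCESSING_TIME_WEIGHT * min(process_time / timeout, 1))  # 0.24 + 0.7 - 0.025 = 0.915
scaled = raw / 2 + 0.5                                             # 0.9575

Assuming similarity and correctness stay in [0, 1], the raw score lies in [-0.05, 1.0] and the rescaled reward in [0.475, 1.0].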
@@ -93,8 +101,15 @@ def __call__(self, uids, responses: list[LogicSynapse], base_synap

except Exception as e:
bt.logging.error(f"Error in logging reward for valid miners: {e}")



# Bonus reward for new miners
if not new_miners:
bt.logging.info("No new miners within the eligible timeframe")
else:
# Convert new_miners from list of dicts to list of MinerInfo objects
new_miners = [MinerInfo(uid=miner['uid'], time=miner['time']) for miner in new_miners]
valid_rewards = self.bonus_rewarder(valid_uids, new_miners, valid_rewards)

total_uids = valid_uids + invalid_uids
rewards = valid_rewards + invalid_rewards

@@ -204,7 +219,6 @@ def clean_response(self, response: str):
response = response.replace(char, ' ')
return response


def _get_correctness_by_llm(self, question: str, ground_truth: str, response: str, model_name: str, openai_client: openai.OpenAI):
"""Calculate the correctness score for a single response using LLM.

@@ -430,4 +444,49 @@ def _get_ground_truth(self, question: str):
except openai.OpenAIError as e:
bt.logging.error(f"API request failed after switching: {e}")

return response
return response

def bonus_rewarder(self, available_miner_uids: list[int], eligible_new_miners: list[MinerInfo], rewards: list[float]):
"""Reward new miners with a bonus based on their registration time.

Args:
available_miner_uids (list[int]): List of miner UIDs that are currently active.
eligible_new_miners (list[MinerInfo]): List of MinerInfo objects containing miner UIDs and registration times.
rewards (list[float]): List of base rewards to be modified with bonuses.

Returns:
list[float]: Modified rewards list with time-based bonuses applied to eligible new miners.
"""
if not available_miner_uids:
bt.logging.info("No available miners")
return rewards

new_miners_dict = {int(miner.uid): miner.time for miner in eligible_new_miners}
current_time = time.time()

bt.logging.debug(f"Processing bonus rewards - Eligible new miners: {new_miners_dict}")
bt.logging.debug(f"Available miner UIDs: {available_miner_uids}")

# Process each miner exactly once with direct dictionary lookup
for idx, miner_uid in enumerate(available_miner_uids):
if miner_uid in new_miners_dict:
miner_time = new_miners_dict[miner_uid]
time_factor = 1 - (current_time - miner_time) / ELIGIBLE_TIMEOUT

if time_factor > 0:
original_reward = rewards[idx]
# Calculate bonus as a percentage of original reward, scaled by time factor
bonus = linear_function(time_factor, m=0.1 * original_reward)
rewards[idx] = min(original_reward + bonus, 1.0)

bt.logging.info(
f"Bonus applied to miner {miner_uid}: "
f"final incentive = {rewards[idx]:.4f} "
f"(original = {original_reward:.4f}, bonus = {bonus:.4f}, "
f"time_factor = {time_factor:.2f})"
)
else:
bt.logging.debug(f"Miner {miner_uid} bonus period expired "
f"(registered {(current_time - miner_time) / 86400:.1f} days ago)")

return rewards
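To make the time-based bonus concrete, a minimal sketch with made-up numbers (a 0.8 base reward and a miner registered two days ago), reusing the module's linear_function and the 7-day ELIGIBLE_TIMEOUT:

import time

from logicnet.utils.func_helper import linear_function

ELIGIBLE_TIMEOUT = 604800                             # 7 days, as above
original_reward = 0.8                                 # hypothetical base reward
registered_at = time.time() - 2 * 86400               # hypothetical: registered 2 days ago

time_factor = 1 - (time.time() - registered_at) / ELIGIBLE_TIMEOUT   # ~0.714
bonus = linear_function(time_factor, m=0.1 * original_reward)        # ~0.08 * 0.714 ~ 0.057
final = min(original_reward + bonus, 1.0)                            # ~0.857, capped at 1.0

So the bonus starts at roughly 10% of the base reward for a freshly registered miner and decays linearly to zero over the seven-day window.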
71 changes: 68 additions & 3 deletions neurons/validator/validator.py
@@ -24,6 +24,9 @@
from neurons.validator.core.serving_queue import QueryQueue
from collections import defaultdict
import wandb
from bittensor import Subtensor

ELIGIBLE_TIMEOUT = 604800 # 7 days


def init_category(config=None, model_rotation_pool=None, dataset_weight=None):
@@ -150,6 +153,7 @@ def forward(self):
Query miners in batches from the serving queue, then run challenge generation -> querying -> rewarding in background threads.
DEFAULT: 16 miners per batch, 600 seconds per loop.
"""
self.sync()
self.store_miner_infomation()
bt.logging.info("\033[1;34m🔄 Updating available models & uids\033[0m")
async_batch_size = self.config.async_batch_size
@@ -214,7 +218,6 @@ def forward(self):
)
time.sleep(loop_base_time - actual_time_taken)


def async_query_and_reward(
self,
category: str,
@@ -256,9 +259,12 @@ def async_query_and_reward(
uid for uid, should_reward in zip(uids, should_rewards) if should_reward
]

# Get new miners within the ELIGIBLE_TIMEOUT period
self.new_hotkeys = self.get_new_miners()

if reward_uids:
uids, rewards, reward_logs = self.categories[category]["rewarder"](
reward_uids, reward_responses, base_synapse
reward_uids, reward_responses, base_synapse, self.new_hotkeys
)

for i, uid in enumerate(reward_uids):
@@ -434,6 +440,7 @@ def save_state(self):
bt.logging.info("State successfully saved to state.pkl")
except Exception as e:
bt.logging.error(f"Failed to save state: {e}")

def load_state(self):
"""Loads state of validator from a file, with fallback to .pt if .pkl is not found."""
# TODO: After a transition period, remove support for the old .pt format.
@@ -475,7 +482,6 @@ def load_state(self):
self.step = 0 # Default fallback in case of an unknown error
bt.logging.error(f"Error loading state: {e}")


def store_miner_infomation(self):
miner_informations = self.miner_manager.to_dict()

@@ -574,6 +580,65 @@ def _log_wandb(self, log):
except Exception as e:
bt.logging.error(f"Error logging to wandb: {e}")

def get_new_miners(self):
"""
Get newly registered miners within the ELIGIBLE_TIMEOUT period.

Returns:
list: List of dicts containing new miner UIDs and their registration timestamps.
Format: [{'uid': int, 'time': float}, ...]
"""
try:
# Get current block number from the chain
current_block_number = bt.subtensor().get_current_block()

# Query registration blocks for all miners on this subnet (netuid 35)
netuid = 35
on_chain_data = []
new_miners = []

with Subtensor() as subtensor:
on_chain_data = subtensor.query_map('SubtensorModule', 'BlockAtRegistration', params=[netuid])

if not on_chain_data:
bt.logging.warning(f"No registration data found for netuid {netuid}")
return []

# Process registration data
miner_status = []

# Get block numbers for all miners
for uid, block_number_ in on_chain_data:
block_number = block_number_.value
miner_status.append({
'uid': uid,
'registered_block_number': block_number
})

# Calculate age and filter recent miners
current_time = time.time()
for miner in miner_status:
# Convert blocks to seconds (12 seconds per block)
blocks_age = current_block_number - miner['registered_block_number']
miner_age_seconds = blocks_age * 12
if miner_age_seconds < ELIGIBLE_TIMEOUT:
bt.logging.info(f"Miner {miner['uid']} registered {miner_age_seconds / 86400:.1f} days ago")
registered_timestamp = current_time - miner_age_seconds
new_miners.append({
'uid': miner['uid'],
'time': registered_timestamp
})

if not new_miners:
bt.logging.debug(f"No new miners found within the last {ELIGIBLE_TIMEOUT / 86400:.1f} days")
else:
bt.logging.info(f"Found {len(new_miners)} new miners within {ELIGIBLE_TIMEOUT / 86400:.1f} days")

return new_miners

except Exception as e:
bt.logging.error(f"Error in get_new_miner: {str(e)}")
return []

# The main function parses the configuration and runs the validator.
if __name__ == "__main__":
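Finally, the block-age arithmetic in get_new_miners can be sanity-checked in isolation; a hedged sketch with a made-up chain head and registration block (the 12-second block time comes from the code's own comment):

import time

ELIGIBLE_TIMEOUT = 604800                   # 7 days
BLOCK_TIME_SECONDS = 12                     # per the conversion comment in get_new_miners

current_block = 3_500_000                   # hypothetical current block
registered_block = 3_450_000                # hypothetical BlockAtRegistration value

blocks_age = current_block - registered_block           # 50_000 blocks
miner_age_seconds = blocks_age * BLOCK_TIME_SECONDS     # 600_000 s ~ 6.94 days
is_new = miner_age_seconds < ELIGIBLE_TIMEOUT           # True: inside the 7-day window

registered_timestamp = time.time() - miner_age_seconds  # what get_new_miners stores as 'time'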