Commit 7b3f1ef

Merge pull request #596 from macrocosm-os/staging
v2.17.3
2 parents: be4e6f0 + 5ed4dbe

37 files changed: +345 −266 lines changed

.env.api.example

Lines changed: 0 additions & 1 deletion
@@ -1,6 +1,5 @@
 API_PORT = "42170" # Port for the API server
 API_HOST = "0.0.0.0" # Host for the API server
 SCORING_KEY = "123" # The scoring key for the validator (must match the scoring key in the .env.validator file)
-SCORE_ORGANICS = True # Whether to score organics
 VALIDATOR_API = "0.0.0.0:8094" # The validator API to forward responses to for scoring
 WORKERS=4

.env.validator.example

Lines changed: 0 additions & 1 deletion
@@ -26,7 +26,6 @@ HF_TOKEN = "your_huggingface_token_here"

 # Scoring API (optional).
 DEPLOY_SCORING_API = true
-SCORING_ADMIN_KEY = "123456"
 SCORING_API_PORT = 8094
 # Scoring key must match the scoring key in the .env.api.
 # SCORING_KEY="..."
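
Only SCORING_ADMIN_KEY is dropped here; the contract in the remaining comments still applies: the SCORING_KEY in .env.validator must match the one in .env.api so scoring traffic can be authenticated across the two processes. A hypothetical pairing (the key value is invented for illustration):

    # .env.validator
    SCORING_KEY = "replace-with-one-long-random-secret"

    # .env.api
    SCORING_KEY = "replace-with-one-long-random-secret"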

data/top100k_domains.csv

Lines changed: 1 addition & 1 deletion
@@ -99997,4 +99997,4 @@
 "99996","tankspotter.com","4.51"
 "99997","targetshootingapp.com","4.51"
 "99998","tastytalegame.com","4.51"
-"99999","tbscan.com","4.51"
+"99999","tbscan.com","4.51"

(The removed and re-added final rows are textually identical; a 1-addition/1-deletion change on the last row of a CSV like this typically just adds a trailing newline at end of file.)

neurons/miners/epistula_miner/web_retrieval.py

Lines changed: 0 additions & 4 deletions
@@ -3,7 +3,6 @@

 import numpy as np
 import trafilatura
-from loguru import logger
 from openai import OpenAI

 from prompting.base.duckduckgo_patch import PatchedDDGS
@@ -55,10 +54,8 @@ async def get_websites_with_similarity(
     Returns:
         List of dictionaries containing website URLs and their best matching chunks
     """
-    logger.debug("Getting results")
     ddgs = PatchedDDGS(proxy=settings.shared_settings.PROXY_URL, verify=False)
     results = list(ddgs.text(query))
-    logger.debug(f"Got {len(results)} results")
     urls = [r["href"] for r in results][:n_results]

     # Fetch and extract content
@@ -74,7 +71,6 @@
         if not text:  # Skip if extraction failed
             continue

-        # logger.debug(f"TEXTS: {text}")
         chunks = create_chunks(text)
         chunk_embeddings = client.embeddings.create(model="text-embedding-ada-002", input=chunks).data

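For orientation: get_websites_with_similarity embeds each page's chunks with text-embedding-ada-002 and, per the function's name, keeps the chunks closest to the query. The comparison step itself is outside this hunk, so here is a minimal sketch of how such a ranking is typically done with cosine similarity over numpy arrays; best_chunk is a hypothetical helper, not the repo's code:

    import numpy as np

    def best_chunk(query_vec: list[float], chunk_vecs: list[list[float]]) -> int:
        # Index of the chunk whose embedding has the highest cosine similarity to the query.
        q = np.asarray(query_vec)
        c = np.asarray(chunk_vecs)
        scores = (c @ q) / (np.linalg.norm(c, axis=1) * np.linalg.norm(q))
        return int(np.argmax(scores))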
neurons/validator.py

Lines changed: 24 additions & 20 deletions
@@ -1,11 +1,15 @@
 import asyncio
 import multiprocessing as mp
 import sys
-import time

 import loguru
+import netaddr
+import requests
 import torch
 import wandb
+from bittensor.core.extrinsics.serving import serve_extrinsic
+
+from prompting.rewards.scoring import task_scorer

 # ruff: noqa: E402
 from shared import settings
@@ -34,7 +38,6 @@ async def spawn_loops(task_queue, scoring_queue, reward_events):
         # ruff: noqa: E402
         from prompting.llms.model_manager import model_scheduler
         from prompting.miner_availability.miner_availability import availability_checking_loop
-        from prompting.rewards.scoring import task_scorer
         from prompting.tasks.task_creation import task_loop
         from prompting.tasks.task_sending import task_sender
         from prompting.weight_setting.weight_setter import weight_setter
@@ -61,18 +64,10 @@ async def spawn_loops(task_queue, scoring_queue, reward_events):
         logger.info("Starting WeightSetter...")
         asyncio.create_task(weight_setter.start(reward_events))

-        # Main monitoring loop
-        start = time.time()
-
-        logger.info("Starting Main Monitoring Loop...")
         while True:
             await asyncio.sleep(5)
-            current_time = time.time()
-            time_diff = current_time - start
-            start = current_time

             # Check if all tasks are still running
-            logger.debug(f"Running {time_diff:.2f} seconds")
             logger.debug(f"Number of tasks in Task Queue: {len(task_queue)}")
             logger.debug(f"Number of tasks in Scoring Queue: {len(scoring_queue)}")
             logger.debug(f"Number of tasks in Reward Events: {len(reward_events)}")
@@ -87,15 +82,29 @@ async def start():
         # TODO: We should not use 2 availability loops for each process, in reality
         # we should only be sharing the miner availability data between processes.
         from prompting.miner_availability.miner_availability import availability_checking_loop
-        from prompting.rewards.scoring import task_scorer

         asyncio.create_task(availability_checking_loop.start())

+        try:
+            external_ip = requests.get("https://checkip.amazonaws.com").text.strip()
+            netaddr.IPAddress(external_ip)
+
+            serve_success = serve_extrinsic(
+                subtensor=settings.shared_settings.SUBTENSOR,
+                wallet=settings.shared_settings.WALLET,
+                ip=external_ip,
+                port=settings.shared_settings.SCORING_API_PORT,
+                protocol=4,
+                netuid=settings.shared_settings.NETUID,
+            )
+
+            logger.debug(f"Serve success: {serve_success}")
+        except Exception as e:
+            logger.warning(f"Failed to serve scoring api to chain: {e}")
         await start_scoring_api(task_scorer, scoring_queue, reward_events)

         while True:
             await asyncio.sleep(10)
-            logger.debug("Running API...")

     asyncio.run(start())
@@ -112,7 +121,6 @@ async def main():

     try:
         # # Start checking the availability of miners at regular intervals
-
         if settings.shared_settings.DEPLOY_SCORING_API:
             # Use multiprocessing to bypass API blocking issue
             api_process = mp.Process(target=start_api, args=(scoring_queue, reward_events), name="API_Process")
@@ -122,13 +130,9 @@
         loop_process = mp.Process(
             target=create_loop_process, args=(task_queue, scoring_queue, reward_events), name="LoopProcess"
         )
-        # task_loop_process = mp.Process(
-        #     target=create_task_loop, args=(task_queue, scoring_queue), name="TaskLoopProcess"
-        # )
+
         loop_process.start()
-        # task_loop_process.start()
         processes.append(loop_process)
-        # processes.append(task_loop_process)
         GPUInfo.log_gpu_info()

         step = 0
@@ -143,9 +147,9 @@
                 current_block = settings.shared_settings.SUBTENSOR.get_current_block()
                 last_update_block = settings.shared_settings.METAGRAPH.last_update[settings.shared_settings.UID]
                 logger.warning(
-                    f"UPDATES HAVE STALED FOR {current_block - last_update_block} BLOCKS AND {step} STEPS"
+                    f"Metagraph hasn't been updated for {current_block - last_update_block} blocks. "
+                    f"Staled block: {current_block}, Last update: {last_update_block}"
                 )
-                logger.warning(f"STALED: {current_block}, {settings.shared_settings.METAGRAPH.block}")
                 sys.exit(1)
             step += 1

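One note on the reworded staleness warning above: when the metagraph lag exceeds the threshold, the process exits with status 1 rather than recovering in place, which presumably relies on an external supervisor (pm2, systemd, or similar) to restart the validator.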

prompting/api/api.py

Lines changed: 2 additions & 1 deletion
@@ -4,6 +4,8 @@

 from prompting.api.miner_availabilities.api import router as miner_availabilities_router
 from prompting.api.scoring.api import router as scoring_router
+
+# from prompting.rewards.scoring import task_scorer
 from shared import settings

 app = FastAPI()
@@ -13,7 +15,6 @@

 @app.get("/health")
 def health():
-    logger.info("Health endpoint accessed.")
     return {"status": "healthy"}


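With its log line removed, /health is now a silent liveness probe that simply returns a static payload. A minimal check, assuming the app is reachable on SCORING_API_PORT (8094) from the .env.validator example above (adjust host and port to your deployment):

    import requests

    # Expects {'status': 'healthy'} from the FastAPI app in prompting/api/api.py.
    print(requests.get("http://localhost:8094/health").json())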
prompting/api/miner_availabilities/api.py

Lines changed: 0 additions & 3 deletions
@@ -1,7 +1,6 @@
 from typing import Literal

 from fastapi import APIRouter
-from loguru import logger

 from prompting.miner_availability.miner_availability import miner_availabilities
 from prompting.tasks.task_registry import TaskRegistry
@@ -13,7 +12,6 @@
 async def get_miner_availabilities(uids: list[int] | None = None):
     if uids:
         return {uid: miner_availabilities.miners.get(uid) for uid in uids}
-    logger.info(f"Returning all miner availabilities for {len(miner_availabilities.miners)} miners")
     return miner_availabilities.miners


@@ -23,7 +21,6 @@ async def get_available_miners(
     model: str | None = None,
     k: int = 10,
 ):
-    logger.info(f"Getting {k} available miners for task {task} and model {model}")
     task_configs = [config for config in TaskRegistry.task_configs if config.task.__name__ == task]
     task_config = task_configs[0] if task_configs else None
     return miner_availabilities.get_available_miners(task=task_config, model=model, k=k)

prompting/api/scoring/api.py

Lines changed: 57 additions & 29 deletions
@@ -1,7 +1,8 @@
+import time
 import uuid
 from typing import Any

-from fastapi import APIRouter, Depends, Header, HTTPException, Request
+from fastapi import APIRouter, Depends, HTTPException, Request
 from loguru import logger

 from prompting.datasets.random_website import DDGDatasetEntry
@@ -11,13 +12,38 @@
 from shared import settings
 from shared.base import DatasetEntry
 from shared.dendrite import DendriteResponseEvent
-from shared.epistula import SynapseStreamResult
+from shared.epistula import SynapseStreamResult, verify_signature
+from shared.settings import shared_settings

 router = APIRouter()


-def validate_scoring_key(api_key: str = Header(...)):
-    if api_key != settings.shared_settings.SCORING_KEY:
+async def verify_scoring_signature(request: Request):
+    signed_by = request.headers.get("Epistula-Signed-By")
+    signed_for = request.headers.get("Epistula-Signed-For")
+    if signed_for != shared_settings.WALLET.hotkey.ss58_address:
+        raise HTTPException(status_code=400, detail="Bad Request, message is not intended for self")
+    if signed_by != shared_settings.API_HOTKEY:
+        raise HTTPException(status_code=401, detail="Signer not the expected ss58 address")
+
+    body = await request.body()
+    now = time.time()
+    err = verify_signature(
+        request.headers.get("Epistula-Request-Signature"),
+        body,
+        request.headers.get("Epistula-Timestamp"),
+        request.headers.get("Epistula-Uuid"),
+        signed_for,
+        signed_by,
+        now,
+    )
+    if err:
+        logger.error(err)
+        raise HTTPException(status_code=400, detail=err)
+
+
+def validate_scoring_key(request: Request):
+    if request.headers.api_key != settings.shared_settings.SCORING_KEY:
         raise HTTPException(status_code=403, detail="Invalid API key")

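The route's authentication moves from a shared API-key header to Epistula request signatures: the verifier checks that the request is addressed to this validator's hotkey, that the signer is the expected API hotkey, and that the signature over the raw body verifies. For context, a hypothetical client-side counterpart, assuming the common Epistula v2 convention of signing sha256(body) joined with a UUID, a millisecond timestamp, and the receiver's address; the authoritative message layout is whatever shared/epistula.py's verify_signature expects:

    import time
    import uuid
    from hashlib import sha256

    from substrateinterface import Keypair  # the wallet's hotkey keypair

    def epistula_headers(hotkey: Keypair, body: bytes, signed_for: str) -> dict[str, str]:
        # Hypothetical signer mirroring the headers read by verify_scoring_signature.
        timestamp = str(round(time.time() * 1000))
        request_uuid = str(uuid.uuid4())
        message = f"{sha256(body).hexdigest()}.{request_uuid}.{timestamp}.{signed_for}"
        return {
            "Epistula-Request-Signature": "0x" + hotkey.sign(message).hex(),
            "Epistula-Timestamp": timestamp,
            "Epistula-Uuid": request_uuid,
            "Epistula-Signed-By": hotkey.ss58_address,
            "Epistula-Signed-For": signed_for,
        }

Note also that the retained validate_scoring_key fallback reads request.headers.api_key, which is attribute access on Starlette's mapping-like Headers object (normally request.headers.get("api-key")); that looks like a latent bug rather than an intended behavior change.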
@@ -27,56 +53,59 @@ def get_task_scorer(request: Request):

 @router.post("/scoring")
 async def score_response(
-    request: Request, api_key_data: dict = Depends(validate_scoring_key), task_scorer=Depends(get_task_scorer)
+    request: Request, api_key_data: dict = Depends(verify_scoring_signature), task_scorer=Depends(get_task_scorer)
 ):
+    logger.debug("Scoring Request received!!!!!!!!!!!!!!!!")
     model = None
+    logger.debug("Setted Model to None")
     payload: dict[str, Any] = await request.json()
+    logger.debug(f"Awaited body: {payload}")
     body = payload.get("body")
-    timeout = payload.get("timeout", settings.shared_settings.NEURON_TIMEOUT)
-    uids = payload.get("uid", [])
+    timeout = payload.get("timeout", shared_settings.NEURON_TIMEOUT)
+    uids = payload.get("uids", [])
     chunks = payload.get("chunks", {})
+    timings = payload.get("timings", {})
+    logger.debug("About to check chunks and uids")
     if not uids or not chunks:
         logger.error(f"Either uids: {uids} or chunks: {chunks} is not valid, skipping scoring")
         return
     uids = [int(uid) for uid in uids]
     model = body.get("model")
-    if model:
-        try:
-            llm_model = ModelZoo.get_model_by_id(model)
-        except Exception:
-            logger.warning(
-                f"Organic request with model {body.get('model')} made but the model cannot be found in model zoo. Skipping scoring."
-            )
+    logger.debug("About to check model")
+    if model and model != shared_settings.LLM_MODEL:
+        logger.error(f"Model {model} not available for scoring on this validator.")
         return
-    else:
-        llm_model = None
+    logger.debug("Model has been checked")
+    llm_model = ModelZoo.get_model_by_id(model)
+    logger.debug("Got LLM Model from ModelZoo")
     task_name = body.get("task")
+    logger.debug(f"Task name set: {task_name}")
+    logger.debug(f"Length pre-insertion: {len(task_scorer.scoring_queue)}")
     if task_name == "InferenceTask":
-        logger.info(f"Received Organic InferenceTask with body: {body}")
-        logger.info(f"With model of type {type(body.get('model'))}")
         organic_task = InferenceTask(
             messages=body.get("messages"),
             llm_model=llm_model,
-            llm_model_id=body.get("model"),
+            llm_model_id=llm_model,
             seed=int(body.get("seed", 0)),
-            sampling_params=body.get("sampling_parameters", settings.shared_settings.SAMPLING_PARAMS),
+            sampling_params=body.get("sampling_parameters", shared_settings.SAMPLING_PARAMS),
             query=body.get("messages"),
+            organic=True,
         )
-        logger.info(f"Task created: {organic_task}")
         task_scorer.add_to_queue(
             task=organic_task,
             response=DendriteResponseEvent(
                 uids=uids,
                 stream_results=[SynapseStreamResult(accumulated_chunks=chunks.get(str(uid), None)) for uid in uids],
                 timeout=timeout,
+                stream_results_all_chunks_timings=[timings.get(str(uid), None) for uid in uids],
             ),
             dataset_entry=DatasetEntry(),
-            block=settings.shared_settings.METAGRAPH.block,
+            block=shared_settings.METAGRAPH.block,
             step=-1,
             task_id=str(uuid.uuid4()),
         )
+
     elif task_name == "WebRetrievalTask":
-        logger.info(f"Received Organic WebRetrievalTask with body: {body}")
         try:
             search_term = body.get("messages")[0].get("content")
         except Exception as ex:
@@ -91,15 +120,14 @@ async def score_response(
                 query=search_term,
             ),
             response=DendriteResponseEvent(
-                uids=[uids],
-                stream_results=[
-                    SynapseStreamResult(accumulated_chunks=[chunk for chunk in chunks if chunk is not None])
-                ],
-                timeout=body.get("timeout", settings.shared_settings.NEURON_TIMEOUT),
+                uids=uids,
+                stream_results=[SynapseStreamResult(accumulated_chunks=chunks.get(str(uid), [])) for uid in uids],
+                timeout=body.get("timeout", shared_settings.NEURON_TIMEOUT),
             ),
             dataset_entry=DDGDatasetEntry(search_term=search_term),
-            block=settings.shared_settings.METAGRAPH.block,
+            block=shared_settings.METAGRAPH.block,
             step=-1,
             task_id=str(uuid.uuid4()),
         )
+    logger.debug(f"Current Queue: {len(task_scorer.scoring_queue)}")
     logger.info("Organic task appended to scoring queue")

prompting/datasets/huggingface_github.py

Lines changed: 0 additions & 2 deletions
@@ -1,5 +1,4 @@
 from datasets import load_dataset
-from loguru import logger
 from pydantic import ConfigDict, model_validator

 from shared.base import BaseDataset, DatasetEntry
@@ -61,7 +60,6 @@ def next(self) -> HuggingFaceGithubDatasetEntry:
             entry = next(self.iterator)
             return self._process_entry(entry)  # Throws failed to get a valid file after multiple attempts
         except StopIteration:
-            logger.warning("Reached end of dataset. Resetting iterator.")
             self.reset()
             raise Exception("Failed to get a valid file after multiple attempts")

