
Commit 65761e8

Merge pull request #672 from macrocosm-os/staging

Changes:
- InferenceTask switched from seeded determinism to logits checks (see the sketch below the file summary).
- Fix HF data sampling.
- Fix DDGS web search.
- Timeout adjustments.

2 parents c95b547 + bd3578c commit 65761e8


45 files changed: 2380 additions and 1845 deletions.
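The first item in the change list is the behavioral shift worth spelling out: instead of requiring a bit-exact, seed-reproducible completion, InferenceTask responses are now judged against the token log-probabilities the miner reports. The sketch below only illustrates that idea. The `logprob_check` helper, the reference top-k lists, and the pass criterion are hypothetical and are not the validator's actual scoring code; the per-token `top_logprobs` format it presumes matches the miner.py diff further down.

# Hypothetical illustration only -- not the validator's real scoring logic.
# The idea of a "logits check": instead of demanding a bit-exact seeded completion,
# verify that each token the miner emitted is plausible under a reference model,
# e.g. that it lies inside the reference model's top-k candidates at that step.


def logprob_check(miner_tokens: list[str], reference_topk: list[list[str]]) -> float:
    """Fraction of steps where the miner's token is inside the reference top-k set."""
    if not miner_tokens or len(miner_tokens) != len(reference_topk):
        return 0.0
    hits = sum(token in topk for token, topk in zip(miner_tokens, reference_topk))
    return hits / len(miner_tokens)


# Toy example: three generation steps; the last token falls outside the reference set.
miner_tokens = ["Hello", " there", "zzz"]
reference_topk = [["Hello", "Hi"], [" there", ","], ["!", "."]]
print(logprob_check(miner_tokens, reference_topk))  # ~0.67 (2 of 3 steps pass)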

.env.miner.example

Lines changed: 3 additions & 3 deletions
@@ -8,13 +8,13 @@ SUBTENSOR_NETWORK = "test"
 SUBTENSOR_CHAIN_ENDPOINT = None
 
 # The name of your wallet.
-WALLET_NAME="miner"
+WALLET_NAME="example_wallet"
 
 # The name of the hotkey associated with the validator wallet.
-HOTKEY="default"
+HOTKEY="example_hotkey"
 
 # Open port which can be used to connect to the network.
-AXON_PORT=22116
+AXON_PORT=12345
 
 # The OpenAI API key (only needed for the OpenAI test miner).
 OPENAI_API_KEY="YOUR_API_HERE"

neurons/miners/epistula_miner/miner.py

Lines changed: 101 additions & 20 deletions
@@ -14,13 +14,13 @@
 import uvicorn
 from bittensor.core.axon import FastAPIThreadedServer
 from bittensor.core.extrinsics.serving import serve_extrinsic
-from fastapi import APIRouter, Depends, FastAPI, HTTPException, Request
+from fastapi import APIRouter, FastAPI, HTTPException, Request
 from loguru import logger
 from starlette.background import BackgroundTask
 from starlette.responses import StreamingResponse
+from vllm import LLM, SamplingParams
 from web_retrieval import get_websites_with_similarity
 
-from prompting.llms.hf_llm import ReproducibleHF
 from shared.epistula import verify_signature
 
 MODEL_ID: str = "gpt-3.5-turbo"
@@ -30,8 +30,56 @@
 NEURON_TOP_P: float = 0.95
 NEURON_STREAMING_BATCH_SIZE: int = 12
 NEURON_STOP_ON_FORWARD_EXCEPTION: bool = False
-SHOULD_SERVE_LLM: bool = False
-LOCAL_MODEL_ID = "casperhansen/llama-3-8b-instruct-awq"
+SHOULD_SERVE_LLM: bool = True
+LOCAL_MODEL_ID = "casperhansen/llama-3.2-3b-instruct-awq"
+
+
+def get_token_logprobs(llm, prompt, sampling_params):
+    """Get logprobs and chosen tokens for text generation."""
+    outputs = llm.generate(prompt, sampling_params)
+
+    if not outputs:
+        return None
+
+    output = outputs[0].outputs[0]
+    generated_text = output.text
+    logprobs_sequence = output.logprobs
+    generated_tokens = output.token_ids
+
+    if logprobs_sequence is None:
+        return None
+
+    token_logprobs = []
+    for i, logprobs in enumerate(logprobs_sequence):
+        if logprobs is None:
+            continue
+
+        # Convert to list and sort by logprob value
+        logprobs_list = [(k, v.logprob) for k, v in logprobs.items()]
+        sorted_logprobs = sorted(logprobs_list, key=lambda x: x[1], reverse=True)
+
+        # Get top tokens and logprobs
+        top_token_ids = [x[0] for x in sorted_logprobs]
+        top_logprob_values = [x[1] for x in sorted_logprobs]
+
+        # Store the actual chosen token from generation
+        chosen_token = llm.get_tokenizer().decode([generated_tokens[i]])
+
+        # Format top logprobs as list of dictionaries
+        top_logprobs = [
+            {"token": llm.get_tokenizer().decode([tid]), "logprob": lp}
+            for tid, lp in zip(top_token_ids, top_logprob_values)
+        ]
+
+        # Store logprobs for this step
+        step_logprobs = {
+            "token": chosen_token,
+            "top_tokens": [llm.get_tokenizer().decode([tid]) for tid in top_token_ids],
+            "top_logprobs": top_logprobs,
+        }
+        token_logprobs.append(step_logprobs)
+
+    return {"text": generated_text, "token_logprobs": token_logprobs}
 
 
 class OpenAIMiner:
@@ -45,11 +93,8 @@ def __init__(self):
             },
         )
         if SHOULD_SERVE_LLM:
-            self.llm = ReproducibleHF(
-                model_id=LOCAL_MODEL_ID,
-                device=shared_settings.NEURON_DEVICE,
-                sampling_params=shared_settings.SAMPLING_PARAMS,
-            )
+            self.llm = LLM(model=LOCAL_MODEL_ID, gpu_memory_utilization=0.3, max_model_len=1000)
+            self.tokenizer = self.llm.get_tokenizer()
         else:
            self.llm = None
 
@@ -83,7 +128,7 @@ async def word_stream(body, headers):
     async def create_chat_completion(self, request: Request):
         data = await request.json()
         headers = request.headers
-        if self.llm and request.headers.get("task", None) == "inference":
+        if self.llm and request.headers.get("task", None) == "InferenceTask":
             return await self.create_inference_completion(request)
         if request.headers.get("task", None) == "WebRetrievalTask":
             return await self.stream_web_retrieval(data, headers)
@@ -93,14 +138,50 @@ async def create_chat_completion(self, request: Request):
 
     async def create_inference_completion(self, request: Request):
         async def word_stream():
-            inference = await self.run_inference(request)
-            words = inference.split()
-            print(words)
-            for word in words:
-                # Simulate the OpenAI streaming response format
-                data = {"choices": [{"delta": {"content": word + " "}, "index": 0, "finish_reason": None}]}
+            data = await request.json()
+            messages = data.get("messages", [])
+            sampling_params = SamplingParams(
+                max_tokens=NEURON_MAX_TOKENS,
+                temperature=NEURON_TEMPERATURE,
+                top_k=NEURON_TOP_K,
+                top_p=NEURON_TOP_P,
+                logprobs=10,
+            )
+
+            prompt = self.tokenizer.apply_chat_template(
+                messages,
+                tokenize=False,
+                add_generation_prompt=True,
+            )
+
+            # Get generation with logprobs
+            result = get_token_logprobs(self.llm, prompt, sampling_params)
+            if not result:
+                yield f"data: {json.dumps({'error': 'Generation failed'})}\n\n"
+                return
+
+            # Stream tokens and their logprobs
+            for step in result["token_logprobs"]:
+                logger.info(step)
+                token = step["token"]
+                logprobs_info = {"top_logprobs": step["top_logprobs"]}
+
+                # Format in OpenAI streaming style but include logprobs
+                data = {
+                    "choices": [
+                        {
+                            "delta": {
+                                "content": token,
+                            },
+                            "logprobs": {"content": [logprobs_info]},
+                            "index": 0,
+                            "finish_reason": None,
+                        }
+                    ]
+                }
                 yield f"data: {json.dumps(data)}\n\n"
-                await asyncio.sleep(0.1)  # Simulate a delay between words
+                await asyncio.sleep(0.1)
+
             # Indicate the end of the stream
             data = {"choices": [{"delta": {}, "index": 0, "finish_reason": "stop"}]}
             yield f"data: {json.dumps(data)}\n\n"
@@ -119,7 +200,7 @@ async def check_availability(self, request: Request):
 
         # Set all model availabilities to False (openai will not be able to handle seeded inference)
         model_response = {key: key == LOCAL_MODEL_ID for key in llm_model_availabilities}
-
+        print(model_response)
         response = {"task_availabilities": task_response, "llm_model_availabilities": model_response}
 
         return response
@@ -158,7 +239,7 @@ async def verify_request(
             raise HTTPException(status_code=400, detail=err)
 
     def run(self):
-        external_ip = None  # shared_settings.EXTERNAL_IP
+        external_ip = None
         if not external_ip or external_ip == "[::]":
             try:
                 external_ip = requests.get("https://checkip.amazonaws.com").text.strip()
@@ -187,7 +268,7 @@ def run(self):
         router.add_api_route(
             "/v1/chat/completions",
             self.create_chat_completion,
-            dependencies=[Depends(self.verify_request)],
+            # dependencies=[Depends(self.verify_request)],
             methods=["POST"],
         )
         router.add_api_route(
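The rewritten `create_inference_completion` streams OpenAI-style SSE chunks in which each delta also carries a `logprobs` field. Below is a rough client-side sketch for consuming that stream: the URL and port are placeholders, the `task` header value and chunk layout mirror the diff above, and request signing plus error handling are omitted.

# Hedged client-side sketch: consume the miner's SSE stream and keep tokens + logprobs.
# URL/port are placeholders; the "task" header and chunk structure mirror the diff above.
import json

import requests


def read_inference_stream(url: str = "http://localhost:12345/v1/chat/completions"):
    payload = {"messages": [{"role": "user", "content": "Hello, how are you?"}]}
    headers = {"task": "InferenceTask"}
    tokens, logprobs = [], []
    with requests.post(url, json=payload, headers=headers, stream=True, timeout=60) as resp:
        for raw_line in resp.iter_lines():
            if not raw_line or not raw_line.startswith(b"data: "):
                continue
            chunk = json.loads(raw_line[len(b"data: "):])
            choice = chunk["choices"][0]
            if choice.get("finish_reason") == "stop":
                break
            tokens.append(choice["delta"].get("content", ""))
            logprobs.append(choice.get("logprobs", {}).get("content", []))
    return "".join(tokens), logprobs


# Example (requires a running miner):
# text, logprobs = read_inference_stream()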

neurons/miners/epistula_miner/web_retrieval.py

Lines changed: 2 additions & 7 deletions
@@ -3,16 +3,11 @@
 
 import numpy as np
 import trafilatura
+from duckduckgo_search.duckduckgo_search import DDGS
 from openai import OpenAI
 
-from prompting.base.duckduckgo_patch import PatchedDDGS
 from shared import settings
 
-# Import the patched DDGS and use that
-
-
-# Import the patched DDGS and use that
-
 
 async def fetch_url(url: str) -> str:
     return trafilatura.fetch_url(url)
@@ -54,7 +49,7 @@ async def get_websites_with_similarity(
     Returns:
         List of dictionaries containing website URLs and their best matching chunks
     """
-    ddgs = PatchedDDGS(proxy=settings.shared_settings.PROXY_URL, verify=False)
+    ddgs = DDGS(proxy=settings.shared_settings.PROXY_URL, verify=False)
     results = list(ddgs.text(query))
     urls = [r["href"] for r in results][:n_results]
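With the patched wrapper gone, search goes directly through `duckduckgo_search`'s own `DDGS` client. A minimal usage sketch, assuming the package is installed and using a placeholder proxy, is:

# Minimal sketch of the direct DDGS usage that replaces PatchedDDGS.
# The proxy value is a placeholder; verify=False mirrors the call in the diff above.
from duckduckgo_search.duckduckgo_search import DDGS

ddgs = DDGS(proxy=None, verify=False)  # e.g. proxy="socks5h://user:pass@host:port"
results = list(ddgs.text("bittensor subnet 1"))
urls = [r["href"] for r in results][:5]
print(urls)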

neurons/validator.py

Lines changed: 2 additions & 2 deletions
@@ -123,7 +123,7 @@ async def spawn_loops(task_queue, scoring_queue, miners_dict: dict):
 
     logger.info("Starting task sending loop in validator...")
     asyncio.create_task(task_sender.start(task_queue, scoring_queue, miners_dict, simultaneous_loops=1))
-    logger.error("Task sending loop started")
+    logger.debug("Task sending loop started")
     while True:
         await asyncio.sleep(5)
         logger.debug("Task sending loop is running")
@@ -195,7 +195,7 @@ async def main(
 
     try:
         # Start checking the availability of miners at regular intervals
-        if settings.shared_settings.DEPLOY_SCORING_API:
+        if settings.shared_settings.DEPLOY_SCORING_API and not settings.shared_settings.NEURON_DISABLE_SET_WEIGHTS:
             # Use multiprocessing to bypass API blocking issue
             api_process = mp.Process(
                 target=start_api, args=(scoring_queue, reward_events, miners_dict), name="APIProcess"

notebooks/demo.ipynb

Lines changed: 86 additions & 0 deletions
@@ -1,5 +1,91 @@
 {
  "cells": [
+  {
+   "cell_type": "code",
+   "execution_count": 2,
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "INFO 04-02 16:34:52 [config.py:585] This model supports multiple tasks: {'classify', 'generate', 'score', 'embed', 'reward'}. Defaulting to 'generate'.\n",
+      "INFO 04-02 16:34:54 [awq_marlin.py:114] The model is convertible to awq_marlin during runtime. Using awq_marlin kernel.\n",
+      "INFO 04-02 16:34:54 [config.py:1697] Chunked prefill is enabled with max_num_batched_tokens=8192.\n",
+      "INFO 04-02 16:34:55 [core.py:54] Initializing a V1 LLM engine (v0.8.2) with config: model='casperhansen/llama-3.2-3b-instruct-awq', speculative_config=None, tokenizer='casperhansen/llama-3.2-3b-instruct-awq', skip_tokenizer_init=False, tokenizer_mode=auto, revision=None, override_neuron_config=None, tokenizer_revision=None, trust_remote_code=True, dtype=torch.float16, max_seq_len=1000, download_dir=None, load_format=auto, tensor_parallel_size=1, pipeline_parallel_size=1, disable_custom_all_reduce=False, quantization=awq_marlin, enforce_eager=False, kv_cache_dtype=auto, device_config=cuda, decoding_config=DecodingConfig(guided_decoding_backend='xgrammar', reasoning_backend=None), observability_config=ObservabilityConfig(show_hidden_metrics=False, otlp_traces_endpoint=None, collect_model_forward_time=False, collect_model_execute_time=False), seed=None, served_model_name=casperhansen/llama-3.2-3b-instruct-awq, num_scheduler_steps=1, multi_step_stream_outputs=True, enable_prefix_caching=True, chunked_prefill_enabled=True, use_async_output_proc=True, disable_mm_preprocessor_cache=False, mm_processor_kwargs=None, pooler_config=None, compilation_config={\"level\":3,\"custom_ops\":[\"none\"],\"splitting_ops\":[\"vllm.unified_attention\",\"vllm.unified_attention_with_output\"],\"use_inductor\":true,\"compile_sizes\":[],\"use_cudagraph\":true,\"cudagraph_num_of_warmups\":1,\"cudagraph_capture_sizes\":[512,504,496,488,480,472,464,456,448,440,432,424,416,408,400,392,384,376,368,360,352,344,336,328,320,312,304,296,288,280,272,264,256,248,240,232,224,216,208,200,192,184,176,168,160,152,144,136,128,120,112,104,96,88,80,72,64,56,48,40,32,24,16,8,4,2,1],\"max_capture_size\":512}\n",
+      "WARNING 04-02 16:34:55 [utils.py:2321] Methods determine_num_available_blocks,device_config,get_cache_block_size_bytes,initialize_cache not implemented in <vllm.v1.worker.gpu_worker.Worker object at 0x7f3b900b7fa0>\n",
+      "INFO 04-02 16:34:56 [parallel_state.py:954] rank 0 in world size 1 is assigned as DP rank 0, PP rank 0, TP rank 0\n",
+      "INFO 04-02 16:34:56 [cuda.py:220] Using Flash Attention backend on V1 engine.\n",
+      "INFO 04-02 16:34:56 [gpu_model_runner.py:1174] Starting to load model casperhansen/llama-3.2-3b-instruct-awq...\n",
+      "WARNING 04-02 16:34:56 [topk_topp_sampler.py:63] FlashInfer is not available. Falling back to the PyTorch-native implementation of top-p & top-k sampling. For the best performance, please install FlashInfer.\n",
+      "INFO 04-02 16:34:56 [weight_utils.py:265] Using model weights format ['*.safetensors']\n",
+      "INFO 04-02 16:34:57 [weight_utils.py:315] No model.safetensors.index.json found in remote.\n"
+     ]
+    },
+    {
+     "name": "stderr",
+     "output_type": "stream",
+     "text": [
+      "Loading safetensors checkpoint shards: 0% Completed | 0/1 [00:00<?, ?it/s]\n",
+      "Loading safetensors checkpoint shards: 100% Completed | 1/1 [00:00<00:00, 2.00it/s]\n",
+      "Loading safetensors checkpoint shards: 100% Completed | 1/1 [00:00<00:00, 1.99it/s]\n",
+      "\n"
+     ]
+    },
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "INFO 04-02 16:34:57 [loader.py:447] Loading weights took 0.59 seconds\n",
+      "INFO 04-02 16:34:58 [gpu_model_runner.py:1186] Model loading took 2.1364 GB and 1.841784 seconds\n",
+      "INFO 04-02 16:35:07 [backends.py:415] Using cache directory: /root/.cache/vllm/torch_compile_cache/0b0416c300/rank_0_0 for vLLM's torch.compile\n",
+      "INFO 04-02 16:35:07 [backends.py:425] Dynamo bytecode transform time: 9.07 s\n",
+      "INFO 04-02 16:35:08 [backends.py:115] Directly load the compiled graph for shape None from the cache\n",
+      "INFO 04-02 16:35:14 [monitor.py:33] torch.compile takes 9.07 s in total\n",
+      "INFO 04-02 16:35:15 [kv_cache_utils.py:566] GPU KV cache size: 217,248 tokens\n",
+      "INFO 04-02 16:35:15 [kv_cache_utils.py:569] Maximum concurrency for 1,000 tokens per request: 217.25x\n",
+      "INFO 04-02 16:35:33 [gpu_model_runner.py:1534] Graph capturing finished in 18 secs, took 0.48 GiB\n",
+      "INFO 04-02 16:35:34 [core.py:151] init engine (profile, create kv cache, warmup model) took 35.70 seconds\n"
+     ]
+    }
+   ],
+   "source": [
+    "from prompting.llms.vllm_llm import ReproducibleVLLM\n",
+    "from loguru import logger\n",
+    "\n",
+    "try: \n",
+    " llm = ReproducibleVLLM(model_id=\"casperhansen/llama-3.2-3b-instruct-awq\", device=\"cuda:0\", sampling_params={\"temperature\": 0.0})\n",
+    "except Exception as e:\n",
+    " logger.exception(e)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 6,
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stderr",
+     "output_type": "stream",
+     "text": [
+      "Processed prompts: 100%|██████████| 1/1 [00:00<00:00, 1.37it/s, est. speed input: 9.62 toks/s, output: 137.41 toks/s]\n"
+     ]
+    },
+    {
+     "data": {
+      "text/plain": [
+       "\" Congrats on reaching the 1 year milestone in your marriage!\\n\\nI think there may be a few... issues (no, just kidding, or am I?). Seriously though, is this a hypothetical or real scenario? Either way, I think we can have some fun with this!\\n\\nIf you'd like, we could explore some fun conversations, think of some creative writing prompts, or even plan a fun activity together. Let me know what's on your mind, and I'll do my best to help\""
+      ]
+     },
+     "execution_count": 6,
+     "metadata": {},
+     "output_type": "execute_result"
+    }
+   ],
+   "source": [
+    "await llm.generate(\"Hello, how are you?\")"
+   ]
+  },
   {
    "cell_type": "code",
    "execution_count": 1,
