From 306e1b3595061f59aedb5ada37e1533abbbec141 Mon Sep 17 00:00:00 2001
From: femto
Date: Mon, 17 Feb 2025 15:40:11 +0800
Subject: [PATCH] deepseek-r1

---
 examples/smart_minion/aime/aime_config.json  |  14 +
 examples/smart_minion/aime/evalute_aime.py   | 246 ++++++++++++
 examples/smart_minion/brain.py               |  25 +-
 examples/smart_minion/evalute_aime.py        | 378 -------------------
 minion/main/input.py                         |   2 +-
 minion/main/worker.py                        |  15 +
 minion/providers/azure_inference_provider.py |   5 +-
 7 files changed, 292 insertions(+), 393 deletions(-)
 create mode 100644 examples/smart_minion/aime/aime_config.json
 create mode 100644 examples/smart_minion/aime/evalute_aime.py
 delete mode 100644 examples/smart_minion/evalute_aime.py

diff --git a/examples/smart_minion/aime/aime_config.json b/examples/smart_minion/aime/aime_config.json
new file mode 100644
index 00000000..fb7bdb8c
--- /dev/null
+++ b/examples/smart_minion/aime/aime_config.json
@@ -0,0 +1,14 @@
+{
+    "type": "ensemble",
+    "pre_processing": ["problem_reflect","example_reasoning"],
+    "workers": [
+        {
+            "name": "raw",
+            "count": 1,
+            "check": 1
+        }
+    ],
+    "result_strategy": {
+        "name": "majority_voting"
+    }
+}
\ No newline at end of file
diff --git a/examples/smart_minion/aime/evalute_aime.py b/examples/smart_minion/aime/evalute_aime.py
new file mode 100644
index 00000000..571a1c1e
--- /dev/null
+++ b/examples/smart_minion/aime/evalute_aime.py
@@ -0,0 +1,246 @@
+import asyncio
+import json
+import os
+import re
+import sys
+import threading
+import time
+from typing import List, Dict, Tuple, Optional, Any
+from contextlib import redirect_stdout
+from io import StringIO
+
+import aiofiles
+import numpy as np
+from tqdm.asyncio import tqdm
+
+from minion.configs.config import config
+from minion.main.brain import Brain
+from minion.main.rpyc_python_env import RpycPythonEnv
+from minion.utils.syncheck import run_with_timeout
+from minion.utils.utils import extract_number_from_string
+from minion.providers import create_llm_provider
+from minion.providers.cost import CostManager
+from minion.utils.process import run_code_in_separate_process
+
+
+# Load a JSON file
+def load_json(file_path):
+    with open(file_path, "r") as f:
+        data = json.load(f)
+    return data
+
+
+# Load a JSONL file
+def load_jsonl(file_path):
+    data = []
+    with open(file_path, "r") as f:
+        for line in f:
+            data.append(json.loads(line.strip()))
+    return data
+
+
+def extract_answer(answer_str):
+    # Regular expression to find the answer after '####'
+    match = re.search(r"####\s*(.*)", answer_str)
+    if match:
+        return match.group(1).strip()  # Extract and strip any surrounding whitespace
+    else:
+        return answer_str  # Fall back to the raw string if no '####' marker is found
+
+
+async def evaluate_dataset(
+    data,
+    last_processed_id=None,
+    start_id=None,
+    to_processed_id=None,
+    route="cot",
+    run_filename=None,
+    continue_process=False,
+    concurrency_count=1,
+):
+    correct = 0
+    count = 0
+    total_count = len(data)
+    matched_ids = []
+    mismatch = []
+    tasks = []
+
+    async def process_batch(tasks, correct):
+        results = await asyncio.gather(*tasks)
+        for result in results:
+            correct += result["result"]
+            if result["result"] == 1:
+                matched_ids.append(result["item_id"])
+            else:
+                mismatch.append(result)
+        last_processed_item = results[-1]  # Get the last processed item
+        return correct, last_processed_item
+
+    async def read_json_file(filename):
+        async with aiofiles.open(filename, "r") as f:
+            contents = await f.read()
+            data = json.loads(contents)
+            return data
+
+    async def save_run_info(filename, last_processed_id):
+        run_info = {
+            "last_processed_id": last_processed_id,
+            "matched_ids": matched_ids,
+            "mismatched_ids": mismatch,
+            "correct": correct,
+            "count": count,
+            "correct_percentage": correct / count if count > 0 else 0,
+            "total_prompt_tokens": cost_manager.total_prompt_tokens,
+            "total_completion_tokens": cost_manager.total_completion_tokens,
+            "total_cost": cost_manager.total_cost,
+        }
+        async with aiofiles.open(filename, "w") as f:
+            await f.write(json.dumps(run_info, indent=4))
+
+    if continue_process and os.path.exists(run_filename):
+        async with aiofiles.open(run_filename, "r") as f:
+            run_info = json.loads(await f.read())
+            last_processed_id = run_info["last_processed_id"]
+            matched_ids = run_info["matched_ids"]
+            mismatch = run_info["mismatched_ids"]
+            correct = run_info["correct"]
+            count = run_info["count"]
+            cost_manager.total_prompt_tokens = run_info.get("total_prompt_tokens", 0)
+            cost_manager.total_completion_tokens = run_info.get("total_completion_tokens", 0)
+            cost_manager.total_cost = run_info.get("total_cost", 0)
+
+    with tqdm(total=total_count, desc="Evaluating") as pbar:
+        for i, item in enumerate(data):
+            item_id = i
+            item["idx"] = i
+            if last_processed_id is not None and item_id <= last_processed_id:
+                continue
+            if start_id and item_id < start_id:
+                continue
+            if to_processed_id and item_id > to_processed_id:
+                break
+
+            count += 1
+            tasks.append(solve_single_question(item, route=route))
+
+            if len(tasks) == concurrency_count:
+                correct, last_processed_item = await process_batch(tasks, correct)
+                last_processed_id = last_processed_item["item_id"]
+                tasks = []  # Reset tasks after processing
+                pbar.set_postfix({"Correct": correct, "count": count})
+                pbar.update(concurrency_count)
+
+                # Save running information after each batch
+                await save_run_info(filename=run_filename, last_processed_id=last_processed_id)
+
+        # Process remaining tasks
+        if tasks:
+            correct, last_processed_item = await process_batch(tasks, correct)
+            last_processed_id = last_processed_item["item_id"]
+            pbar.set_postfix({"Correct": correct})
+            pbar.update(len(tasks))
+
+            # Save running information after the final batch
+            await save_run_info(filename=run_filename, last_processed_id=last_processed_id)
+
+    return correct, count, matched_ids, mismatch
+
+
+PASS = "PASS"
+FAIL = "FAIL"
+
+
+def check_solution(solution, test):
+    print(f"solution: {solution}")
+
+    try:
+        # Get test cases from the dictionary
+        inputs = test.get('input', [])
+        outputs = test.get('output', [])
+
+        # Run each test case
+        for input_data, expected_output in zip(inputs, outputs):
+            try:
+                # Run the code in a separate process
+                result = run_code_in_separate_process(solution, input_data)
+
+                if result.stderr:
+                    print(f"Test produced stderr: {result.stderr}")
+
+                # Compare outputs (strip both to handle trailing newlines)
+                if result.stdout.strip() != expected_output.strip():
+                    return (FAIL, f"Test failed:\nInput: {input_data}\nExpected: {expected_output}\nGot: {result.stdout}\nStderr: {result.stderr if result.stderr else 'None'}")
+            except Exception as e:
+                return (FAIL, f"Test execution failed: {str(e)}")
+
+        return (PASS, "Solution passed all test cases.")
+
+    except TimeoutError:
+        return (FAIL, "Execution timeout. Please check if your solution contains infinite loops or time-consuming operations.")
+    except Exception as e:
+        # Record detailed error information
+        error_message = f"Error: {str(e)}.\n Solution: {solution}.\n Test: {test}"
+
+        # Write error information to the error.log file
+        with open('error.log', 'a', encoding='utf-8') as log_file:
+            log_file.write(f"{time.strftime('%Y-%m-%d %H:%M:%S')} - {error_message}\n")
+
+        return (FAIL, error_message)
+
+
+async def solve_single_question(item, route="cot"):
+    item_id = item.get("id", -1)  # Extract the ID or use a default value
+
+    #correct_answer = extract_answer(ground_truth_raw)
+    question = item["problem"]
+    answer = await solve_question(item)
+    if answer == item["answer"]:
+        return {"result": 1, "item_id": item_id, "question": question, "answer": answer, "idx": item_id}
+
+    else:
+        # Append the mismatched item to the JSONL file
+        return {
+            "result": 0,
+            "item_id": item_id,
+            "item": item,
+            "question": question,
+            "answer": answer,
+            "idx": item_id,
+        }
+
+
+# Load ensemble logic from JSON files
+def load_execution_config(file_path):
+    with open(file_path, "r") as file:
+        ensemble_logic = json.load(file)
+    return ensemble_logic
+
+
+async def solve_question(item):
+    brain = Brain(stats_storer=None, python_env=RpycPythonEnv(ports=3007), llm=llm)
+    current_dir = os.path.dirname(os.path.abspath(__file__))
+    ensemble_logic_path = os.path.join(current_dir, "aime_config.json")
+    # Load the ensemble execution config and run the brain on the problem
+
+    answer, score, *_ = await brain.step(
+        query=item["problem"],
+        execution_config=load_execution_config(ensemble_logic_path),
+    )
+    return answer
+
+
+#model = "gpt-4o"
+#model = "claude"
+model = "deepseek-r1"
+
+llm = create_llm_provider(config.models.get(model))
+cost_manager = CostManager()
+llm.cost_manager = cost_manager
+
+
+async def main():
+    from datasets import load_dataset
+    ds = load_dataset("HuggingFaceH4/aime_2024", split='train')
+    correct, count, matched_ids, mismatched_ids = await evaluate_dataset(
+        ds, run_filename=f"run_aime_{model}.json", continue_process=True, concurrency_count=1
+    )
+
+    print(f"Accuracy: {correct/count:.2%}")
+    print(f"Mismatched IDs: {mismatched_ids}")
+
+
+# Run the async main function
+if __name__ == "__main__":
+    asyncio.run(main())
+# Example usage
diff --git a/examples/smart_minion/brain.py b/examples/smart_minion/brain.py
index 343c43fa..274746c9 100644
--- a/examples/smart_minion/brain.py
+++ b/examples/smart_minion/brain.py
@@ -19,6 +19,7 @@ async def smart_brain():
     # Use the config object imported from minion/__init__.py
     model = "gpt-4o"
     model = "deepseek-r1"
+    model = "phi-4"
     #model = "llama3.2"
 
     llm_config = config.models.get(model)
@@ -34,18 +35,18 @@ async def smart_brain():
     # obs, score, *_ = await brain.step(query="what's the solution for game of 24 for 1 3 4 6", route="python")
     # print(obs)
 
-    obs, score, *_ = await brain.step(query="what's the solution for game of 24 for 2 3 5 12", route="python")
-    print(obs)
-
-    current_file_dir = os.path.dirname(os.path.abspath(__file__))
-    cache_plan = os.path.join(current_file_dir, "aime", "plan_gpt4o.1.json")
-    obs, score, *_ = await brain.step(
-        query="Every morning Aya goes for a $9$-kilometer-long walk and stops at a coffee shop afterwards. When she walks at a constant speed of $s$ kilometers per hour, the walk takes her 4 hours, including $t$ minutes spent in the coffee shop. When she walks $s+2$ kilometers per hour, the walk takes her 2 hours and 24 minutes, including $t$ minutes spent in the coffee shop. Suppose Aya walks at $s+\frac{1}{2}$ kilometers per hour. Find the number of minutes the walk takes her, including the $t$ minutes spent in the coffee shop.",
-        route="plan",
-        dataset="aime 2024",
-        cache_plan=cache_plan,
-    )
-    print(obs)
+    # obs, score, *_ = await brain.step(query="what's the solution for game of 24 for 2 3 5 12", route="python")
+    # print(obs)
+    #
+    # current_file_dir = os.path.dirname(os.path.abspath(__file__))
+    # cache_plan = os.path.join(current_file_dir, "aime", "plan_gpt4o.1.json")
+    # obs, score, *_ = await brain.step(
+    #     query="Every morning Aya goes for a $9$-kilometer-long walk and stops at a coffee shop afterwards. When she walks at a constant speed of $s$ kilometers per hour, the walk takes her 4 hours, including $t$ minutes spent in the coffee shop. When she walks $s+2$ kilometers per hour, the walk takes her 2 hours and 24 minutes, including $t$ minutes spent in the coffee shop. Suppose Aya walks at $s+\frac{1}{2}$ kilometers per hour. Find the number of minutes the walk takes her, including the $t$ minutes spent in the coffee shop.",
+    #     route="plan",
+    #     dataset="aime 2024",
+    #     cache_plan=cache_plan,
+    # )
+    # print(obs)
 
     # Test cases extracted from HumanEval/88
     test_data = {
diff --git a/examples/smart_minion/evalute_aime.py b/examples/smart_minion/evalute_aime.py
deleted file mode 100644
index 919759cc..00000000
--- a/examples/smart_minion/evalute_aime.py
+++ /dev/null
@@ -1,378 +0,0 @@
-import asyncio
-import json
-import os
-import re
-from collections import Counter
-
-import aiofiles
-from datasets import load_dataset
-from minion.llm import LLM
-from minion.utils.cost_manager import CostManager
-from pydantic import BaseModel
-from tqdm.asyncio import tqdm
-
-from minion.main.brain import Brain
-from minion.main.rpyc_python_env import RpycPythonEnv
-from minion.main.stats_storer import (
-    JsonStatsStorer,
-    MultipleStatsStorer,
-    SqlStatsStorer,
-)
-
-
-# Load JSONL file
-def load_json(file_path):
-    with open(file_path, "r") as f:
-        data = json.load(f)
-    return data
-
-
-# Load JSONL file
-def load_jsonl(file_path):
-    data = []
-    with open(file_path, "r") as f:
-        for line in f:
-            data.append(json.loads(line.strip()))
-    return data
-
-
-def extract_answer(answer_str):
-    return answer_str
-    # Regular expression to find the answer after '####'
-    match = re.search(r"####\s*(.*)", answer_str)
-    if match:
-        return match.group(1).strip()  # Extract and remove any surrounding whitespace
-    else:
-        return None  # Return None if no match is found
-
-
-def roman_to_int(roman):
-    roman_values = {
-        "I": 1,
-        "II": 2,
-        "III": 3,
-        "IV": 4,
-        "V": 5,
-        "VI": 6,
-        "VII": 7,
-        "VIII": 8,
-        "IX": 9,
-        "X": 10,
-        "XI": 11,
-        "XII": 12,
-    }
-    return roman_values.get(roman.upper(), 0)
-
-
-class Item(BaseModel):
-    id: str
-    arr: list[int] = None
-
-    def __init__(self, **data):
-        data["id"] = str(data["id"])
-        super().__init__(**data)
-        # Split the id into parts
-        parts = self.id.split("-")
-
-        # Convert the parts to integers, handling Roman numerals as needed
-        self.arr = []
-        for part in parts:
-            try:
-                self.arr.append(int(part))
-            except ValueError:
-                self.arr.append(roman_to_int(part))
-
-    def __lt__(self, other):
-        return self.arr < other.arr
-
-    def __le__(self, other):
-        return self.arr <= other.arr
-
-
-cost_manager = CostManager()
-llm = LLM()
-llm.cost_manager = cost_manager
-
-
-async def evaluate_dataset(
-    data,
-    last_processed_id=0,
-    to_processed_id=None,
-    route=None,
-    concurrency_count=1,
-    start_id=None,
-    continue_process=False,
-    run_filename=None,
-    stats_storer=None,
-):
-    correct = 0
-    count = 0
-    total_count = len(data)
-    matched_ids = []
-    mismatch = []
-    tasks = []
-
-    async def process_batch(tasks, correct):
-        results = await asyncio.gather(*tasks)
-        for result in results:
-            correct += result["result"]
-            if result["result"] == 1:
-                matched_ids.append(result["item_id"])
-            else:
-                mismatch.append(result)
-        last_processed_item = results[-1]  # Get the last processed item
-        return correct, last_processed_item
-
-    async def save_run_info(filename, last_processed_id):
-        run_info = {
-            "last_processed_id": last_processed_id,
-            "matched_ids": matched_ids,
-            "mismatched_ids": mismatch,
-            "correct": correct,
-            "count": count,
-            "correct_percentage": correct / count if count > 0 else 0,
-            "total_prompt_tokens": cost_manager.total_prompt_tokens,
-            "total_completion_tokens": cost_manager.total_completion_tokens,
-            "total_cost": cost_manager.total_cost,
-        }
-        async with aiofiles.open(filename, "w") as f:
-            await f.write(json.dumps(run_info, indent=4))
-
-    if continue_process and os.path.exists(run_filename):
-        async with aiofiles.open(run_filename, "r") as f:
-            run_info = json.loads(await f.read())
-            last_processed_id = run_info["last_processed_id"]
-            matched_ids = run_info["matched_ids"]
-            mismatch = run_info["mismatched_ids"]
-            correct = run_info["correct"]
-            count = run_info["count"]
-            cost_manager.total_prompt_tokens = run_info.get("total_prompt_tokens", 0)
-            cost_manager.total_completion_tokens = run_info.get("total_completion_tokens", 0)
-            cost_manager.total_cost = run_info.get("total_cost", 0)
-
-    with tqdm(total=total_count, desc="Evaluating") as pbar:
-        for index, item in enumerate(data):
-            item_id = Item(id=index)
-
-            if last_processed_id and item_id <= Item(id=last_processed_id):
-                continue
-            if start_id and item_id < Item(id=start_id):
-                continue
-            if to_processed_id and item_id > Item(id=to_processed_id):
-                break
-
-            count += 1
-            tasks.append(solve_single_question(item, route=route, stats_storer=stats_storer))
-
-            if len(tasks) == concurrency_count:
-                correct, last_processed_item = await process_batch(tasks, correct)
-                last_processed_id = last_processed_item["item_id"]
-                tasks = []  # Reset tasks after processing
-                pbar.set_postfix({"Correct": correct, "count": count, "Last ID": last_processed_id})
-                pbar.update(concurrency_count)
-
-                # Save running information after each batch
-                await save_run_info(filename=run_filename, last_processed_id=last_processed_id)
-
-        # Process remaining tasks
-        if tasks:
-            correct, last_processed_item = await process_batch(tasks, correct)
-            last_processed_id = last_processed_item["item_id"]
-            pbar.set_postfix({"Correct": correct, "count": count, "Last ID": last_processed_id})
-            pbar.update(len(tasks))
-
-            # Save running information after the final batch
-            await save_run_info(filename=run_filename, last_processed_id=last_processed_id)
-
-    return correct, count, matched_ids, mismatch
-
-
-def find_expression_end(s):
-    paren_count = 0
-    num_count = 0
-    op_count = 0
-    start = -1
-
-    for i, char in enumerate(s):
-        if start == -1:
-            if char.isdigit() or char == "(":
-                start = i
-            elif char not in " \n":  # Skip leading spaces and newlines
-                paren_count = 0
-                num_count = 0
-                op_count = 0
-                start = -1
-
-        if start != -1:
-            if char == "(":
-                paren_count += 1
-            elif char == ")":
-                paren_count -= 1
-            elif char.isdigit() and (i == len(s) - 1 or not s[i + 1].isdigit()):
-                num_count += 1
-            elif char in "+-*/":
-                op_count += 1
-            elif char not in " ()+-*/0123456789":
-                paren_count = 0
-                num_count = 0
-                op_count = 0
-                start = -1
-
-            if paren_count == 0 and num_count == 4 and op_count == 3:
-                return start, i + 1
-
-    return None, None
-
-
-def extract_solution(solution_str):
-    start, end = find_expression_end(solution_str)
-    if start is None or end is None:
-        return None
-
-    expr = solution_str[start:end].strip()
-
-    # Ensure we have 4 numbers and 3 operators
-    numbers = re.findall(r"\d+", expr)
-    operators = re.findall(r"[\+\-\*\/]", expr)
-    if len(numbers) != 4 or len(operators) != 3:
-        return None
-
-    return expr
-
-
-def evaluate_expression(expr, numbers):
-    # Convert all numbers to integers
-    numbers = [int(num) for num in numbers]
-
-    # Remove all whitespace and parentheses for number checking
-    expr_clean = re.sub(r"[\s\(\)]", "", expr)
-
-    # Extract all numbers from the expression
-    expr_numbers = [int(num) for num in re.findall(r"\d+", expr_clean)]
-
-    # Check if the numbers in the expression match the given numbers
-    if Counter(expr_numbers) != Counter(numbers):
-        return False
-
-    # Evaluate the expression
-    try:
-        result = eval(expr)
-        return abs(result - 24) < 1e-6  # Allow for small floating-point errors
-    except:
-        return False
-
-
-def verify_game24_solution(question, user_answer):
-    # Extract numbers from the question
-    numbers = re.findall(r"\d+", question)
-
-    # Ensure we have exactly 4 numbers
-    if len(numbers) != 4:
-        return False
-
-    # Extract the solution from the user_answer
-    solution = extract_solution(user_answer)
-    if not solution:
-        return False
-
-    # Verify the solution
-    return evaluate_expression(solution, numbers)
-
-
-async def solve_single_question(
-    item,
-    route="cot",
-    stats_storer=None,
-):
-    question = item["Question"]
-    correct_answer_str = item["Answer"]
-    item_id = item["ID"]  # the item_id for update_stats
-
-    # Extract the correct answer after '####'
-
-    correct_answer = correct_answer_str  # extract_answer(correct_answer_str)
-
-    # Your solver logic
-    user_answer_str = await solve_question(
-        question,
-        route=route,
-        correct_answer=correct_answer,
-        raw_correct_answer=correct_answer_str,
-        item_id=item_id,
-        stats_storer=stats_storer,
-    )
-    user_answer = user_answer_str
-
-    if verify_game24_solution(
-        question,
-        user_answer,
-    ):
-        return {"result": 1, "item_id": item_id, "question": question, "user_answer": user_answer, "idx": item_id}
-
-    else:
-        # Append the mismatched item to the JSONL file
-        return {
-            "result": 0,
-            "item_id": item_id,
-            "question": question,
-            "correct_answer": correct_answer_str,
-            "user_answer": user_answer,
-            "idx": item_id,
-        }
-
-
-# Load ensemble logic from JSON files
-def load_execution_config(file_path):
-    with open(file_path, "r") as file:
-        ensemble_logic = json.load(file)
-    return ensemble_logic
-
-
-# Sample solver function (you'll replace this with your actual logic)
-async def solve_question(question, route=None, stats_storer=None, **kwargs):
-    # Implement your problem-solving logic here
-    # For example, this could be a math solver or text parser
-    brain = Brain(stats_storer=stats_storer, python_env=RpycPythonEnv(ports=3007), llm=llm)
-
-    obs, score, *_ = await brain.step(
-        query=question,
-        route=route,
-        execution_config=load_execution_config("aime_ensemble.json"),
-        **kwargs
-        # stats={},
-        # stats_output="aime/stat_output.json"
-    )
-    # print(obs)
-    return obs
-
-
-async def main():
-    data = load_dataset("qq8933/AIME_1983_2024", split="train")
-
-    #
-    json_storer = JsonStatsStorer("logs/aime_stats_output.json")
-
-    # tracker = AsyncStatsTracker(stats_db_url)
-    # In your main function or wherever you set up your application
-
-    sql_storer = SqlStatsStorer("postgresql+asyncpg://femtozheng@localhost:5432/aime1")
-    await sql_storer.init_db()
-
-    stats_storer = MultipleStatsStorer([json_storer, sql_storer])
-
-    correct, count, matched_ids, mismatched_ids = await evaluate_dataset(
-        data,
-        concurrency_count=1,
-        stats_storer=stats_storer,
-        continue_process=True,
-        run_filename="run_aime.json",
-    )
-
-    print(f"Accuracy: {correct / count:.2%}")
-    print(f"Mismatched IDs: {mismatched_ids}")
-
-
-# Run the async main function
-if __name__ == "__main__":
-    asyncio.run(main())
-# Example usage
diff --git a/minion/main/input.py b/minion/main/input.py
index a372a841..ccdb4c1a 100644
--- a/minion/main/input.py
+++ b/minion/main/input.py
@@ -77,7 +77,7 @@ class Input(BaseModel):
     answer: str = ""  # The final extracted/processed answer
     answer_raw: str = ""  # Raw answer including chain of thought
     answer_code: str = ""  # Answer in code format if applicable
-    answer_full: str = ""  # Complete output including all details
+    answer_full: str = ""  # Complete output including all details, or should we call it reasoning_content?
     feedback: str = ""  # Feedback for improvement
     error: str = ""  # error for improvement
     entry_point: str = ""  #entry_point name of function for code generation
diff --git a/minion/main/worker.py b/minion/main/worker.py
index 1a3f3ad6..51110dec 100644
--- a/minion/main/worker.py
+++ b/minion/main/worker.py
@@ -1003,6 +1003,21 @@ async def execute(self):
         self.answer = self.input.answer = response
         return self.answer
 
+class RawMinion(WorkerMinion):
+    """Raw minion that directly queries the LLM without any prompt processing or modifications"""
+
+    def __init__(self, **kwargs):
+        super().__init__(**kwargs)
+        self.input.instruction = ""
+
+    async def execute(self):
+        node = LmpActionNode(self.brain.llm)
+        response = await node.execute(self.input.query)
+
+        self.raw_answer = self.input.answer_raw = response
+        self.answer = self.input.answer = response
+        return self.answer
+
diff --git a/minion/providers/azure_inference_provider.py b/minion/providers/azure_inference_provider.py
index fa090e8d..a148b6ef 100644
--- a/minion/providers/azure_inference_provider.py
+++ b/minion/providers/azure_inference_provider.py
@@ -103,7 +103,8 @@ async def generate_stream(self, messages: List[Message], temperature: Optional[f
             if chunk.choices:
                 completion_tokens += 1
                 chunk_message = chunk.choices[0].delta.content
-                full_content += chunk_message
-                log_llm_stream(chunk_message)
+                if chunk_message:
+                    full_content += chunk_message
+                    log_llm_stream(chunk_message)
                 #yield chunk_message
         return full_content
\ No newline at end of file
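
---
Usage sketch (for reviewers, not part of the patch): the new "raw" worker declared in aime_config.json maps to RawMinion, which forwards the query to the LLM verbatim, and Brain.step consumes the config through execution_config, as evalute_aime.py does above. A minimal illustration follows; it assumes a "deepseek-r1" entry exists in the model config and, mirroring evalute_aime.py, an RPyC python-env service listening on port 3007. The query string and config path are illustrative only.

import asyncio
import json
import os

from minion.configs.config import config
from minion.main.brain import Brain
from minion.main.rpyc_python_env import RpycPythonEnv
from minion.providers import create_llm_provider


async def demo():
    # Build the provider the same way evalute_aime.py does (model name is an assumption).
    llm = create_llm_provider(config.models.get("deepseek-r1"))
    brain = Brain(python_env=RpycPythonEnv(ports=3007), llm=llm)

    # Load the ensemble config added by this patch; "raw" selects RawMinion,
    # "majority_voting" aggregates the worker answers.
    config_path = os.path.join("examples", "smart_minion", "aime", "aime_config.json")
    with open(config_path) as f:
        execution_config = json.load(f)

    answer, score, *_ = await brain.step(
        query="If 3x + 5 = 20, what is x?",
        execution_config=execution_config,
    )
    print(answer)


if __name__ == "__main__":
    asyncio.run(demo())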