Commit ab596dd
Volko authored and committed on Dec 17, 2023 (0 parents)
Showing 20 changed files with 1,831 additions and 0 deletions.
.gitignore
@@ -0,0 +1,3 @@
__pycache__
.DS_Store
venv
fitness_evaluator.py
@@ -0,0 +1,114 @@
import os
import re
import time
import psutil


class FitnessEvaluator:
    @staticmethod
    def execute_program(program_code):
        try:
            local_scope = {}
            exec(program_code, globals(), local_scope)
            return local_scope.get('optimized_bucket_filler')
        except NameError as e:
            match = re.search(r"name '(\w+)' is not defined", str(e))
            if match:
                missing_module = match.group(1)
                full_code = f"import {missing_module}\n" + program_code
                try:
                    exec(full_code, globals(), local_scope)
                    return local_scope.get('optimized_bucket_filler')
                except Exception as e_inner:
                    print(f"Error executing program after adding import: {e_inner}")
            else:
                print(f"Error executing program: {e}")
        except Exception as e:
            print(f"Error executing program: {e}")

        return None

    @staticmethod
    def measure_time(func, *args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()
        return end_time - start_time, result

    @staticmethod
    def measure_memory(func, *args, **kwargs):
        process = psutil.Process(os.getpid())
        memory_before = process.memory_info().rss
        result = func(*args, **kwargs)
        memory_after = process.memory_info().rss
        return memory_after - memory_before, result

    @staticmethod
    def scoring_function(buckets, bucket_limit):
        total_empty_space = sum(bucket_limit - sum(bucket) for bucket in buckets)
        distribution_score = FitnessEvaluator.calculate_distribution_score(buckets)
        variance_score = FitnessEvaluator.calculate_variance_score(buckets, bucket_limit)
        score = (1000 - total_empty_space) - (10 * len(buckets)) + distribution_score + variance_score

        return max(score, 0)  # Ensure the score is not negative

    @staticmethod
    def calculate_distribution_score(buckets):
        if not buckets:
            return 0
        average_fill = sum(sum(bucket) for bucket in buckets) / len(buckets)
        return 100 - sum(abs(sum(bucket) - average_fill) for bucket in buckets) / len(buckets)

    @staticmethod
    def calculate_variance_score(buckets, bucket_limit):
        if not buckets:
            return 0
        total_variance = sum((sum(bucket) - bucket_limit / 2) ** 2 for bucket in buckets) / len(buckets)
        return 100 - total_variance / (bucket_limit / 2) ** 2

    @staticmethod
    def calculate_fitness_score(time_taken, memory_used, custom_score, weights):
        normalized_time_score = 100 / (1 + time_taken)
        normalized_memory_score = 100 / (1 + memory_used)
        normalized_custom_score = (custom_score / 1000) * 100
        fitness_score = (weights['time'] * normalized_time_score +
                         weights['memory'] * normalized_memory_score +
                         weights['score'] * normalized_custom_score)
        return fitness_score

    @staticmethod
    def evaluate_algorithm(program_code_str, numberList, bucket_limit, weights):
        algorithm_func = FitnessEvaluator.execute_program(program_code_str)

        if callable(algorithm_func):
            numberList_copy = numberList[:]
            time_taken, buckets = FitnessEvaluator.measure_time(algorithm_func, numberList_copy, bucket_limit)
            memory_used, _ = FitnessEvaluator.measure_memory(algorithm_func, numberList_copy, bucket_limit)
            score = FitnessEvaluator.scoring_function(buckets, bucket_limit)
            fitness_score = FitnessEvaluator.calculate_fitness_score(time_taken, memory_used, score, weights)

            return {
                "time_taken": time_taken,
                "memory_used": memory_used,
                "score": score,
                "fitness_score": fitness_score,
                "buckets": buckets
            }
        else:
            print("Function 'optimized_bucket_filler' not found in the provided code.")
            return None


class GeneticAlgorithmConfig:
    def __init__(self, generations=5, population_size=6):
        self.generations = generations
        self.population_size = population_size
        self.previous_results = []

    def is_iteration_unique(self, current_results):
        for prev_results in self.previous_results:
            if (current_results['score'] == prev_results['score'] and
                    current_results['fitness_score'] == prev_results['fitness_score']):
                return False
        return True

    def add_result(self, result):
        self.previous_results.append(result)
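
For reference, a minimal usage sketch of the evaluator above; the candidate program string and the sample inputs are made up for illustration and are not part of this commit:

```python
# Minimal sketch: score a hand-written candidate program with FitnessEvaluator.
# The program string, number list, and bucket limit below are illustrative only.
from fitness_evaluator import FitnessEvaluator

candidate = """
def optimized_bucket_filler(numbers, bucket_limit):
    # Naive first-fit-decreasing packing: put each number into the first bucket with room.
    buckets = []
    for n in sorted(numbers, reverse=True):
        for bucket in buckets:
            if sum(bucket) + n <= bucket_limit:
                bucket.append(n)
                break
        else:
            buckets.append([n])
    return buckets
"""

results = FitnessEvaluator.evaluate_algorithm(
    candidate,
    numberList=[7, 3, 9, 5, 2, 8],
    bucket_limit=10,
    weights={'time': 0.3, 'memory': 0.2, 'score': 0.5},
)
if results:
    print(results['fitness_score'], results['buckets'])
```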
@@ -0,0 +1,185 @@
import os
from openai import OpenAI
import ast
import subprocess
import sys
import random
from prompt_manager import PromptManager
from fitness_evaluator import FitnessEvaluator, GeneticAlgorithmConfig


def initialize_openai_client():
    try:
        client = OpenAI()
        return client
    except Exception as e:
        raise Exception(f"Failed to initialize OpenAI client: {e}")


def query_openai_api(client, prompt):
    try:
        completion = client.chat.completions.create(
            model="gpt-4-1106-preview",
            response_format={"type": "json_object"},
            messages=[
                {"role": "system", "content": "You are a helpful assistant designed to output JSON."},
                {"role": "user", "content": prompt}
            ]
        )
        return completion.choices[0].message.content
    except Exception as general_error:
        print(f"General error querying OpenAI API: {general_error}")


def install_packages(pip_command):
    if not pip_command or pip_command == "None":
        return True
    packages = pip_command.split(',')
    for package in packages:
        package = package.strip()
        try:
            subprocess.check_call([sys.executable, '-m', 'pip', 'install', package])
        except subprocess.CalledProcessError as e:
            print(f"Error installing package '{package}': {e}")
            return False
    return True


def tournament_selection(parents, tournament_size=3):
    selected_parents = []
    for _ in range(len(parents) - 2):
        tournament = random.sample(parents, tournament_size)
        winner = max(tournament, key=lambda x: x['fitness_score'])
        selected_parents.append(winner)
    return selected_parents


def apply_elitism(parents, number_of_elites=2):
    elites = sorted(parents, key=lambda x: x['fitness_score'], reverse=True)[:number_of_elites]
    return elites

def main():
    # The OpenAI client reads OPENAI_API_KEY from the environment; fail early if it is missing.
    if not os.environ.get("OPENAI_API_KEY"):
        raise RuntimeError("Set the OPENAI_API_KEY environment variable before running.")

    client = initialize_openai_client()
    prompt_manager = PromptManager()
    fitness_evaluator = FitnessEvaluator()
    ga_config = GeneticAlgorithmConfig()

    initial_prompt = prompt_manager.get_number_prompt()
    response_0 = query_openai_api(client, initial_prompt)
    response_data_0 = ast.literal_eval(response_0)
    numberList = response_data_0['numberList']
    bucketSize = response_data_0['bucketSize']
    print(numberList)
    print(bucketSize)

    master_prompt = prompt_manager.get_master_prompt(numberList, bucketSize)
    response_1 = query_openai_api(client, master_prompt)
    response_data_1 = ast.literal_eval(response_1)
    master_program_code = response_data_1['program_code']
    print(master_program_code)

    if response_data_1['pip_command'] and response_data_1['pip_command'] != "None":
        install_packages(response_data_1['pip_command'])

    master_results = fitness_evaluator.evaluate_algorithm(master_program_code, numberList, bucketSize, weights={'time': 0.3, 'memory': 0.2, 'score': 0.5})
    ga_config.add_result(master_results)

    parents = []
    for individual in range(ga_config.population_size):
        valid_algorithm = False
        retries = 0
        max_retries = 3
        last_program_code = None
        last_error_message = None

        while not valid_algorithm and retries < max_retries:
            try:
                parent_prompt = prompt_manager.get_parent_prompt(
                    master_program_code, response_data_1['equation'],
                    response_data_1['pseudocode'], master_results['buckets'],
                    master_results['fitness_score'], numberList, bucketSize
                )
                if retries > 0:
                    error_prompt = prompt_manager.get_repeat_prompt(last_error_message, last_program_code)
                    full_prompt = error_prompt + parent_prompt
                else:
                    full_prompt = parent_prompt
                response = query_openai_api(client, full_prompt)
                response_data = ast.literal_eval(response)
                parent_program_code = response_data.get('program_code')
                last_program_code = parent_program_code
                print(parent_program_code)

                if not parent_program_code:
                    raise ValueError("No program code generated.")

                if response_data.get('pip_command') and response_data['pip_command'] != "None":
                    install_packages(response_data['pip_command'])

                parent_results = fitness_evaluator.evaluate_algorithm(
                    parent_program_code, numberList, bucketSize, weights={'time': 0.3, 'memory': 0.2, 'score': 0.5}
                )

                if parent_results is None or not parent_results.get('buckets'):
                    raise ValueError("Failed to evaluate algorithm or no buckets generated.")

                # Keep the generated code and its metadata with the results so the
                # crossover step below can reference them.
                parent_results['program_code'] = parent_program_code
                parent_results['equation'] = response_data.get('equation')
                parent_results['pseudocode'] = response_data.get('pseudocode')

                unique = ga_config.is_iteration_unique(parent_results)
                valid_algorithm = unique and parent_results is not None

            except ValueError as ve:
                last_error_message = str(ve)
                print(f"Validation error during parent generation: {ve}. Retrying...")
                retries += 1
                valid_algorithm = False

            except Exception as e:
                last_error_message = str(e)
                print(f"An error occurred during parent generation: {e}. Retrying...")
                retries += 1
                valid_algorithm = False

        if valid_algorithm:
            ga_config.add_result(parent_results)
            parents.append(parent_results)
            print(f"Parent {individual + 1}: {parent_results}")
        else:
            print(f"Failed to generate a valid parent after {max_retries} attempts for individual {individual + 1}.")

    elite_parents = apply_elitism(parents, number_of_elites=2)
    tournament_parents = tournament_selection(parents, tournament_size=3)

    print(elite_parents)
    print(tournament_parents)

    children = []
    for i in range(2):
        parent1 = elite_parents[i]
        parent2 = tournament_parents[i]

        crossover_prompt = prompt_manager.get_crossover_prompt(
            parent1['program_code'], parent1['equation'], parent1['pseudocode'], parent1['buckets'], parent1['fitness_score'],
            parent2['program_code'], parent2['equation'], parent2['pseudocode'], parent2['buckets'], parent2['fitness_score'],
            numberList, bucketSize
        )
        response = query_openai_api(client, crossover_prompt)
        response_data = ast.literal_eval(response)
        child_program_code = response_data.get('program_code')
        print(child_program_code)
        if child_program_code:
            if response_data.get('pip_command') and response_data['pip_command'] != "None":
                install_packages(response_data['pip_command'])

            child_results = fitness_evaluator.evaluate_algorithm(
                child_program_code, numberList, bucketSize, weights={'time': 0.3, 'memory': 0.2, 'score': 0.5}
            )

            if child_results:
                children.append(child_results)
                print(f"Child {i + 1} generated successfully.")
            else:
                print(f"Failed to evaluate Child {i + 1}.")
        else:
            print(f"Failed to generate Child {i + 1} program code.")

    top_children = sorted(children, key=lambda x: x['fitness_score'], reverse=True)[:2]
    print("Top Children Selected for Next Generation:", top_children)


if __name__ == "__main__":
    main()
@@ -0,0 +1,66 @@
# Plan of Execution: Bit Packing Algorithm
## Initialization:
- Initialize the connection with OpenAI.
- Initialize the number of generations - **generations**.
- Initialize the population size - **population_size**.
- **Prompt 1**: Perform the first LLM request, which only provides the numbers and the bucket limit.
- Store them as **numbers** (list) and **bucket_limit** (int).
- Store the seed program in a string called **seed**.
- Store the equation of the program in **equation**.
- Run compile(seed) and exec(seed(numbers, bucket_limit)) -> **buckets**, **bucket_limit** (a sketch of this step follows this list).
- Then pass these results to scorer(buckets, bucket_limit) -> **score**.

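A rough sketch of the seed-execution step above (assumptions: the seed defines `optimized_bucket_filler` as the evaluator expects, and `FitnessEvaluator.scoring_function` plays the role of `scorer`; the numbers are placeholders):

```python
# Sketch of the seed-execution step: compile/exec the seed program string,
# call it, and score the buckets it returns. All values here are placeholders.
from fitness_evaluator import FitnessEvaluator

numbers = [4, 8, 1, 4, 2, 1]      # would come from Prompt 1
bucket_limit = 10                  # would come from Prompt 1
seed = (
    "def optimized_bucket_filler(numbers, bucket_limit):\n"
    "    return [[n] for n in numbers]"
)

namespace = {}
exec(compile(seed, "<seed>", "exec"), namespace)
buckets = namespace["optimized_bucket_filler"](numbers, bucket_limit)
score = FitnessEvaluator.scoring_function(buckets, bucket_limit)
print(buckets, score)
```
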
## LLM Preparation:
- **Prompt 2**: Tell the LLM that the goal is to optimize the algorithm, research novel ways to improve it, and develop new mathematics for better performance.
- Pass the **numbers**, **bucket_limit**, **seed**, **equation**, **buckets** and **score** as input.
- Instruct it to strictly produce JSON with the format:

      result = {
          "pip": (pip dependencies that can be installed via pip install, comma separated),
          "Program": {Optimized Program},
          "Equation": {Optimized Program Equation}
      }

- Parse the JSON data to extract the contents -> **pip_dependencies**, **optimized_program**, **optimized_equation**.
- Install the pip dependencies and run exec(JSON Program) -> **optimized_score** (a parsing sketch follows this list).

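A small sketch of that parse step, assuming the reply is valid JSON with the keys shown; note that the main script in this commit parses with `ast.literal_eval`, while `json.loads` is used here purely for illustration:

```python
import json

# Sketch: parse an LLM reply of the shape described above and run the program it contains.
reply = '{"pip": "None", "Program": "def optimized_bucket_filler(numbers, bucket_limit):\\n    return [[n] for n in numbers]", "Equation": "one number per bucket"}'

result = json.loads(reply)
pip_dependencies = result["pip"]        # e.g. "numpy, sortedcontainers" or "None"
optimized_program = result["Program"]
optimized_equation = result["Equation"]

namespace = {}
exec(optimized_program, namespace)
buckets = namespace["optimized_bucket_filler"]([4, 8, 1, 4], 10)   # placeholder inputs
print(buckets, optimized_equation)
```
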
## Genetic Algorithm:
- Initialize **parents** (list of JSON results):
  - Loop over **population_size**:
    - Run **Prompt 2** to produce diverse results -> **pip_dependencies**, **optimized_program**, **optimized_equation**
- Loop over **generations**:
  - Initialize **fitness** (list)
  - Calculate fitness: install the pip dependencies and run exec(JSON Program) for each parent -> **fitness**
  - Initialize **new_parents** (list of JSON results)
  - Loop over **parents**:
    - Select **number_of_parents** parents based on **fitness**
    - Apply crossover to create offspring
    - Apply mutation to the offspring
    - Append to **new_parents**
  - **parents** = **new_parents** (a schematic of this loop is sketched below)

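A schematic of the loop above, as a sketch only; `generate_parent`, `select`, `crossover`, and `mutate` are placeholders for the LLM-backed prompt steps, not functions from this repository:

```python
# Schematic of the generation loop described above. The helpers below are
# placeholders for the LLM-backed prompt steps, not functions from this repo.
import random

def generate_parent():                       # placeholder for Prompt 2
    return {"fitness_score": random.random(), "program_code": "..."}

def select(parents, k=2):                    # placeholder for the selection step
    return sorted(parents, key=lambda p: p["fitness_score"], reverse=True)[:k]

def crossover(p1, p2):                       # placeholder for the crossover prompt
    return {"fitness_score": (p1["fitness_score"] + p2["fitness_score"]) / 2,
            "program_code": "..."}

def mutate(child):                           # placeholder for the optional mutation prompt
    child["fitness_score"] += random.uniform(-0.05, 0.05)
    return child

generations, population_size = 5, 6          # defaults used by GeneticAlgorithmConfig
parents = [generate_parent() for _ in range(population_size)]
for _ in range(generations):
    new_parents = []
    while len(new_parents) < population_size:
        p1, p2 = select(parents, k=2)
        new_parents.append(mutate(crossover(p1, p2)))
    parents = new_parents
print(max(parents, key=lambda p: p["fitness_score"]))
```
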
## Select Parents:
- **Prompt 3**: Tell the LLM that this is the parent-selection part of the genetic algorithm, feeding it **new_parents** and **fitness**. Based on this it decides **number_of_parents** and produces the list of selected parents, along with their JSON data, as a list to be sent to crossover (see the sketch below).

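For comparison, fitness-based selection can also be done locally; the helpers below mirror `tournament_selection` and `apply_elitism` from the main script above, and the parent records are placeholders:

```python
# Sketch: local fitness-based selection (elitism keeps the top scorers,
# tournaments fill the rest). Parent records below are placeholders.
import random

def tournament_selection(parents, tournament_size=3):
    selected_parents = []
    for _ in range(len(parents) - 2):
        tournament = random.sample(parents, tournament_size)
        selected_parents.append(max(tournament, key=lambda x: x['fitness_score']))
    return selected_parents

def apply_elitism(parents, number_of_elites=2):
    return sorted(parents, key=lambda x: x['fitness_score'], reverse=True)[:number_of_elites]

parents = [{"fitness_score": random.uniform(50, 150)} for _ in range(6)]
print(apply_elitism(parents, number_of_elites=2))
print(tournament_selection(parents, tournament_size=3))
```
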
## Cross Over:
- **Prompt 4**: Tell the LLM that this is the crossover part of the genetic algorithm and feed it the **number_of_parents** selected parents as a list of JSON objects, producing a JSON result similar to Prompt 2 (a prompt-assembly sketch follows this list).
- Pass the properties of the **number_of_parents** parents.
- Instruct it to strictly produce JSON with the format:

      result = {
          "pip": (pip dependencies that can be installed via pip install, comma separated),
          "Program": {Cross Over Program},
          "Equation": {Cross Over Program Equation}
      }

- Parse the JSON data to extract the contents -> **pip_dependencies**, **optimized_program**, **optimized_equation**

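As an illustration of what the crossover request could contain (a sketch; in this commit the prompt is actually built by `PromptManager.get_crossover_prompt`, and the hypothetical `build_crossover_prompt` below only mirrors its inputs):

```python
# Sketch: assemble a crossover request from two parent records. The f-string
# below is illustrative; the repo builds this via PromptManager.get_crossover_prompt.
def build_crossover_prompt(parent1, parent2, numbers, bucket_limit):
    return (
        "This is the crossover step of a genetic algorithm.\n"
        f"Parent 1 program:\n{parent1['program_code']}\nFitness: {parent1['fitness_score']}\n\n"
        f"Parent 2 program:\n{parent2['program_code']}\nFitness: {parent2['fitness_score']}\n\n"
        f"Numbers: {numbers}\nBucket limit: {bucket_limit}\n"
        'Reply strictly as JSON: {"pip": ..., "Program": ..., "Equation": ...}'
    )

prompt = build_crossover_prompt(
    {"program_code": "def optimized_bucket_filler(...): ...", "fitness_score": 120.0},
    {"program_code": "def optimized_bucket_filler(...): ...", "fitness_score": 95.5},
    numbers=[4, 8, 1, 4], bucket_limit=10,
)
print(prompt)
```
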
## Mutation:
- **Prompt 5**: Tell the LLM that this is the optional mutation part of the genetic algorithm and feed it the two selected parents, producing a JSON result similar to Prompt 2.
- Pass the properties of the crossover program.
- Instruct it to strictly produce JSON with the format:

      result = {
          "pip": (pip dependencies that can be installed via pip install, comma separated),
          "Program": {Mutated Program},
          "Equation": {Mutated Program Equation}
      }

- Parse the JSON data to extract the contents -> **pip_dependencies**, **optimized_program**, **optimized_equation**

## Output:
- The final result should report the highest score of the optimized algorithm, the Python implementation, the math equation, and the results (a selection sketch follows).
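
A minimal sketch of assembling that output, assuming each evaluated candidate's result dictionary also carries its `program_code` and `equation` (as attached when the candidates were generated):

```python
# Sketch: pick the best evaluated candidate and report the fields listed above.
# The result dictionaries below are placeholders.
all_results = [
    {"fitness_score": 118.2, "score": 940, "program_code": "...", "equation": "...", "buckets": [[9, 1], [8, 2]]},
    {"fitness_score": 104.7, "score": 885, "program_code": "...", "equation": "...", "buckets": [[9], [8, 1, 2]]},
]
best = max(all_results, key=lambda r: r["fitness_score"])
print("Best score:", best["score"])
print("Implementation:\n", best["program_code"])
print("Equation:", best["equation"])
print("Buckets:", best["buckets"])
```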