Skip to content

Commit

Permalink
Modularized
Browse files Browse the repository at this point in the history
  • Loading branch information
Volkopat committed Dec 18, 2023
1 parent 2207686 commit bfe2231
Show file tree
Hide file tree
Showing 3 changed files with 826 additions and 964 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
__pycache__
.DS_Store
venv
venv
.env
170 changes: 93 additions & 77 deletions fun_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@
import sys
import random
import logging
from dotenv import load_dotenv
from openai import OpenAI
from prompt_manager import PromptManager
from fitness_evaluator import FitnessEvaluator, GeneticAlgorithmConfig
load_dotenv()

logging.basicConfig(filename='genetic_algorithm.log', filemode='w', level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')

def initialize_openai_client():

try:
client = OpenAI()
return client
Expand All @@ -19,6 +22,7 @@ def initialize_openai_client():
raise

def query_openai_api(client, prompt):

try:
completion = client.chat.completions.create(
model="gpt-4-1106-preview",
Expand All @@ -33,6 +37,7 @@ def query_openai_api(client, prompt):
logging.error(f"General error querying OpenAI API: {general_error}")

def install_packages(pip_command):

if not pip_command or pip_command == "None":
return True
packages = pip_command.split(',')
Expand All @@ -46,6 +51,7 @@ def install_packages(pip_command):
return True

def tournament_selection(parents, tournament_size=3):

selected_parents = []
for _ in range(len(parents) - 2):
tournament = random.sample(parents, tournament_size)
Expand All @@ -54,18 +60,37 @@ def tournament_selection(parents, tournament_size=3):
return selected_parents

def apply_elitism(parents, number_of_elites=2):

elites = sorted(parents, key=lambda x: x['evaluation_results']['fitness_score'], reverse=True)[:number_of_elites]
return elites

def main():
OPENAI_API_KEY = "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"
os.environ["OPENAI_API_KEY"] = OPENAI_API_KEY

client = initialize_openai_client()
prompt_manager = PromptManager()
fitness_evaluator = FitnessEvaluator()
ga_config = GeneticAlgorithmConfig()
def log_program_details(program_info, title=None):

if title:
logging.info(f"----- {title} -----")

if 'program_code' in program_info:
logging.info(f"Program Code:\n{program_info['program_code']}")

if 'equation' in program_info:
logging.info(f"Equation:\n{program_info['equation']}")

if 'pseudocode' in program_info:
logging.info(f"Pseudocode:\n{program_info['pseudocode']}")

if 'evaluation_results' in program_info:
eval_results = program_info['evaluation_results']
logging.info("Evaluation Results:")
logging.info(f"Time Taken: {eval_results.get('time_taken', 'N/A')}")
logging.info(f"Memory Used: {eval_results.get('memory_used', 'N/A')}")
logging.info(f"Score: {eval_results.get('score', 'N/A')}")
logging.info(f"Fitness Score: {eval_results.get('fitness_score', 'N/A')}")
logging.info(f"Buckets: {eval_results.get('buckets', 'N/A')}")

if 'pip_command' in program_info and program_info['pip_command'] not in ["None", None, ""]:
install_packages(program_info['pip_command'])

def query_and_log_initial_data(prompt_manager, client, fitness_evaluator, ga_config):
try:
initial_prompt = prompt_manager.get_number_prompt()
response_0 = query_openai_api(client, initial_prompt)
Expand All @@ -78,28 +103,30 @@ def main():
master_prompt = prompt_manager.get_master_prompt(numberList, bucketSize)
response_1 = query_openai_api(client, master_prompt)
response_data_1 = ast.literal_eval(response_1)
master_program_code = response_data_1['program_code']
logging.info(f"----- Master Program Details -----")
logging.info(f"Master Program Code:\n{master_program_code}")
logging.info(f"Equation:\n{response_data_1['equation']}")
logging.info(f"Pseudocode:\n{response_data_1['pseudocode']}")
log_program_details(response_data_1, "Master Program Details")

if response_data_1['pip_command'] and response_data_1['pip_command'] != "None":
if 'pip_command' in response_data_1 and response_data_1['pip_command'] not in ["None", None, ""]:
install_packages(response_data_1['pip_command'])

master_results = fitness_evaluator.evaluate_algorithm(master_program_code, numberList, bucketSize, weights={'time': 0.3, 'memory': 0.2, 'score': 0.5})
master_results = fitness_evaluator.evaluate_algorithm(response_data_1['program_code'], numberList, bucketSize, weights={'time': 0.3, 'memory': 0.2, 'score': 0.5})
ga_config.add_result(master_results)
log_program_details(master_results, "Master Results Evaluation")

logging.info(f"Master Results Evaluation:")
logging.info(f"Time Taken: {master_results['time_taken']}")
logging.info(f"Memory Used: {master_results['memory_used']}")
logging.info(f"Score: {master_results['score']}")
logging.info(f"Fitness Score: {master_results['fitness_score']}")
logging.info(f"Buckets: {master_results['buckets']}")
return {
'numberList': numberList,
'bucketSize': bucketSize,
'program_code': response_data_1['program_code'],
'equation': response_data_1.get('equation', ''),
'pseudocode': response_data_1.get('pseudocode', ''),
**master_results
}

except Exception as e:
logging.error("An error occurred: ", exc_info=True)


def generate_parents(prompt_manager, client, fitness_evaluator, ga_config, master_program_details):

parents = []
for individual in range(ga_config.population_size):
valid_algorithm = False
Expand All @@ -111,19 +138,20 @@ def main():
while not valid_algorithm and retries < max_retries:
try:
parent_prompt = prompt_manager.get_parent_prompt(
master_program_code, response_data_1['equation'],
response_data_1['pseudocode'], master_results['buckets'],
master_results['fitness_score'], numberList, bucketSize
master_program_details['program_code'],
master_program_details['equation'],
master_program_details['pseudocode'],
master_program_details['buckets'],
master_program_details['fitness_score'],
master_program_details['numberList'],
master_program_details['bucketSize']
)
if retries > 0:
error_prompt = prompt_manager.get_repeat_prompt(last_error_message, last_program_code)
full_prompt = error_prompt + parent_prompt
else:
full_prompt = parent_prompt
full_prompt = prompt_manager.get_repeat_prompt(last_error_message, last_program_code) + parent_prompt if retries > 0 else parent_prompt
response = query_openai_api(client, full_prompt)
response_data = ast.literal_eval(response)
parent_program_code = response_data.get('program_code')
last_program_code = parent_program_code
print

if not parent_program_code:
raise ValueError("No program code generated.")
Expand All @@ -132,7 +160,10 @@ def main():
install_packages(response_data['pip_command'])

parent_results = fitness_evaluator.evaluate_algorithm(
parent_program_code, numberList, bucketSize, weights={'time': 0.3, 'memory': 0.2, 'score': 0.5}
parent_program_code,
master_program_details['numberList'],
master_program_details['bucketSize'],
weights={'time': 0.3, 'memory': 0.2, 'score': 0.5}
)

if parent_results is None or not parent_results.get('buckets'):
Expand Down Expand Up @@ -161,49 +192,22 @@ def main():
}
ga_config.add_result(parent_results)
parents.append(parent_info)
logging.info(f"----- Parent {individual + 1} Details -----")
logging.info(f"Program Code:\n{parent_info['program_code']}")
logging.info(f"Equation:\n{parent_info['equation']}")
logging.info(f"Pseudocode:\n{parent_info['pseudocode']}")
logging.info(f"Evaluation Results:\n"
f"Time Taken: {parent_info['evaluation_results']['time_taken']}\n"
f"Memory Used: {parent_info['evaluation_results']['memory_used']}\n"
f"Score: {parent_info['evaluation_results']['score']}\n"
f"Fitness Score: {parent_info['evaluation_results']['fitness_score']}\n"
f"Buckets: {parent_info['evaluation_results']['buckets']}")
log_program_details(parent_info, f"Parent {individual + 1} Details")
else:
logging.error(f"Failed to generate a valid parent after {max_retries} attempts for individual {individual + 1}.")

elite_parents = apply_elitism(parents, number_of_elites=2)
tournament_parents = tournament_selection(parents, tournament_size=3)
return parents

def generate_children(prompt_manager, client, fitness_evaluator, parents, numberList, bucketSize, elite_count=2, tournament_size=3):

elite_parents = apply_elitism(parents, number_of_elites=elite_count)
tournament_parents = tournament_selection(parents, tournament_size=tournament_size)

children = []
for i in range(2):
parent1 = elite_parents[i]
parent2 = tournament_parents[i]

logging.info(f"----- Crossover Details for Child {i + 1} -----")
logging.info(f"Elite Parent Details:")
logging.info(f"Program Code:\n{parent1['program_code']}")
logging.info(f"Equation:\n{parent1['equation']}")
logging.info(f"Pseudocode:\n{parent1['pseudocode']}")
logging.info(f"Evaluation Results:\n"
f"Time Taken: {parent1['evaluation_results']['time_taken']}\n"
f"Memory Used: {parent1['evaluation_results']['memory_used']}\n"
f"Score: {parent1['evaluation_results']['score']}\n"
f"Fitness Score: {parent1['evaluation_results']['fitness_score']}\n"
f"Buckets: {parent1['evaluation_results']['buckets']}")

logging.info(f"Tournament Parent Details:")
logging.info(f"Program Code:\n{parent2['program_code']}")
logging.info(f"Equation:\n{parent2['equation']}")
logging.info(f"Pseudocode:\n{parent2['pseudocode']}")
logging.info(f"Evaluation Results:\n"
f"Time Taken: {parent2['evaluation_results']['time_taken']}\n"
f"Memory Used: {parent2['evaluation_results']['memory_used']}\n"
f"Score: {parent2['evaluation_results']['score']}\n"
f"Fitness Score: {parent2['evaluation_results']['fitness_score']}\n"
f"Buckets: {parent2['evaluation_results']['buckets']}")
parent1, parent2 = elite_parents[i], tournament_parents[i]
log_program_details(parent1, f"Crossover Details for Child {i + 1} - Elite Parent Details")
log_program_details(parent2, "Tournament Parent Details")

crossover_prompt = prompt_manager.get_crossover_prompt(
parent1['program_code'], parent1['equation'], parent1['pseudocode'], parent1['evaluation_results']['buckets'], parent1['evaluation_results']['fitness_score'],
Expand All @@ -213,6 +217,7 @@ def main():
response = query_openai_api(client, crossover_prompt)
response_data = ast.literal_eval(response)
child_program_code = response_data.get('program_code')

if child_program_code:
if response_data.get('pip_command') and response_data['pip_command'] != "None":
install_packages(response_data['pip_command'])
Expand All @@ -229,23 +234,34 @@ def main():
'pseudocode': response_data.get('pseudocode')
}
children.append(child_info)
logging.info(f"----- Child Details -----")
logging.info(f"Program Code:\n{child_info['program_code']}")
logging.info(f"Equation:\n{child_info['equation']}")
logging.info(f"Pseudocode:\n{child_info['pseudocode']}")
logging.info(f"Evaluation Results:\n"
f"Time Taken: {child_info['evaluation_results']['time_taken']}\n"
f"Memory Used: {child_info['evaluation_results']['memory_used']}\n"
f"Score: {child_info['evaluation_results']['score']}\n"
f"Fitness Score: {child_info['evaluation_results']['fitness_score']}\n"
f"Buckets: {child_info['evaluation_results']['buckets']}")
log_program_details(child_info, f"Child {i + 1} Details")
else:
logging.error(f"Failed to evaluate Child {i + 1}.")
else:
logging.error(f"Failed to generate Child {i + 1} program code.")

top_children = sorted(children, key=lambda x: x['evaluation_results']['fitness_score'], reverse=True)[:2]
logging.info(f"Top Children Selected for Next Generation: {top_children}")
return top_children

def main():
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
os.environ["OPENAI_API_KEY"] = OPENAI_API_KEY

client = initialize_openai_client()
prompt_manager = PromptManager()
fitness_evaluator = FitnessEvaluator()
ga_config = GeneticAlgorithmConfig()

try:
master_program_details = query_and_log_initial_data(prompt_manager, client, fitness_evaluator, ga_config)
parents = generate_parents(prompt_manager, client, fitness_evaluator, ga_config, master_program_details)
top_children = generate_children(prompt_manager, client, fitness_evaluator, parents, master_program_details['numberList'], master_program_details['bucketSize'])

logging.info(f"Top Children Selected for Next Generation: {top_children}")

except Exception as e:
logging.error("An error occurred in the main process: ", exc_info=True)

if __name__ == "__main__":
main()
Loading

0 comments on commit bfe2231

Please sign in to comment.