Skip to content

Commit 63f6f88

Browse files
committed
Parallel processing with adaptive mutation
1 parent a32eed8 commit 63f6f88

File tree

3 files changed

+581
-85
lines changed

3 files changed

+581
-85
lines changed

pygad/pygad.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1849,7 +1849,6 @@ def cal_pop_fitness(self):
18491849
if type(batch_fitness) not in [list, tuple, numpy.ndarray]:
18501850
raise TypeError(f"Expected to receive a list, tuple, or numpy.ndarray from the fitness function but the value ({batch_fitness}) of type {type(batch_fitness)}.")
18511851
elif len(numpy.array(batch_fitness)) != len(batch_indices):
1852-
18531852
raise ValueError(f"There is a mismatch between the number of solutions passed to the fitness function ({len(batch_indices)}) and the number of fitness values returned ({len(batch_fitness)}). They must match.")
18541853

18551854
for index, fitness in zip(batch_indices, batch_fitness):

pygad/utils/mutation.py

Lines changed: 100 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import random
77

88
import pygad
9+
import concurrent.futures
910

1011
class Mutation:
1112

@@ -483,40 +484,108 @@ def adaptive_mutation_population_fitness(self, offspring):
483484
# This is a single-objective optimization problem.
484485
fitness[first_idx:last_idx] = [0]*(last_idx - first_idx)
485486

486-
if self.fitness_batch_size in [1, None]:
487-
# Calculate the fitness for each individual solution.
488-
for idx in range(first_idx, last_idx):
489-
# We cannot return the index of the solution within the population.
490-
# Because the new solution (offspring) does not yet exist in the population.
491-
# The user should handle this situation if the solution index is used anywhere.
492-
fitness[idx] = self.fitness_func(self,
493-
temp_population[idx],
494-
None)
487+
# # No parallel processing.
488+
if self.parallel_processing is None:
489+
if self.fitness_batch_size in [1, None]:
490+
# Calculate the fitness for each individual solution.
491+
for idx in range(first_idx, last_idx):
492+
# We cannot return the index of the solution within the population.
493+
# Because the new solution (offspring) does not yet exist in the population.
494+
# The user should handle this situation if the solution index is used anywhere.
495+
fitness[idx] = self.fitness_func(self,
496+
temp_population[idx],
497+
None)
498+
else:
499+
# Calculate the fitness for batch of solutions.
500+
501+
# Number of batches.
502+
num_batches = int(numpy.ceil((last_idx - first_idx) / self.fitness_batch_size))
503+
504+
for batch_idx in range(num_batches):
505+
# The index of the first solution in the current batch.
506+
batch_first_index = first_idx + batch_idx * self.fitness_batch_size
507+
# The index of the last solution in the current batch.
508+
if batch_idx == (num_batches - 1):
509+
batch_last_index = last_idx
510+
else:
511+
batch_last_index = first_idx + (batch_idx + 1) * self.fitness_batch_size
512+
513+
# Calculate the fitness values for the batch.
514+
# We cannot return the index/indices of the solution(s) within the population.
515+
# Because the new solution(s) (offspring) do(es) not yet exist in the population.
516+
# The user should handle this situation if the solution index is used anywhere.
517+
fitness_temp = self.fitness_func(self,
518+
temp_population[batch_first_index:batch_last_index],
519+
None)
520+
# Insert the fitness of each solution at the proper index.
521+
for idx in range(batch_first_index, batch_last_index):
522+
fitness[idx] = fitness_temp[idx - batch_first_index]
523+
495524
else:
496-
# Calculate the fitness for batch of solutions.
525+
# Parallel processing
526+
# Decide which class to use based on whether the user selected "process" or "thread"
527+
# TODO Add ExecutorClass as an instance attribute in the pygad.GA instances. Then retrieve this instance here instead of creating a new one.
528+
if self.parallel_processing[0] == "process":
529+
ExecutorClass = concurrent.futures.ProcessPoolExecutor
530+
else:
531+
ExecutorClass = concurrent.futures.ThreadPoolExecutor
532+
533+
# We can use a with statement to ensure threads are cleaned up promptly (https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor-example)
534+
with ExecutorClass(max_workers=self.parallel_processing[1]) as executor:
535+
# Indices of the solutions to calculate its fitness.
536+
solutions_to_submit_indices = list(range(first_idx, last_idx))
537+
# The solutions to calculate its fitness.
538+
solutions_to_submit = [temp_population[sol_idx].copy() for sol_idx in solutions_to_submit_indices]
539+
if self.fitness_batch_size in [1, None]:
540+
# Use parallel processing to calculate the fitness of the solutions.
541+
for index, sol_fitness in zip(solutions_to_submit_indices, executor.map(self.fitness_func, [self]*len(solutions_to_submit_indices), solutions_to_submit, solutions_to_submit_indices)):
542+
if type(sol_fitness) in self.supported_int_float_types:
543+
# The fitness function returns a single numeric value.
544+
# This is a single-objective optimization problem.
545+
fitness[index] = sol_fitness
546+
elif type(sol_fitness) in [list, tuple, numpy.ndarray]:
547+
# The fitness function returns a list/tuple/numpy.ndarray.
548+
# This is a multi-objective optimization problem.
549+
fitness[index] = sol_fitness
550+
else:
551+
raise ValueError(f"The fitness function should return a number or an iterable (list, tuple, or numpy.ndarray) but the value {sol_fitness} of type {type(sol_fitness)} found.")
552+
else:
553+
# Reaching this point means that batch processing is in effect to calculate the fitness values.
554+
# Number of batches.
555+
num_batches = int(numpy.ceil(len(solutions_to_submit_indices) / self.fitness_batch_size))
556+
# Each element of the `batches_solutions` list represents the solutions in one batch.
557+
batches_solutions = []
558+
# Each element of the `batches_indices` list represents the solutions' indices in one batch.
559+
batches_indices = []
560+
# For each batch, get its indices and call the fitness function.
561+
for batch_idx in range(num_batches):
562+
batch_first_index = batch_idx * self.fitness_batch_size
563+
batch_last_index = (batch_idx + 1) * self.fitness_batch_size
564+
batch_indices = solutions_to_submit_indices[batch_first_index:batch_last_index]
565+
batch_solutions = self.population[batch_indices, :]
566+
567+
batches_solutions.append(batch_solutions)
568+
batches_indices.append(batch_indices)
569+
570+
for batch_indices, batch_fitness in zip(batches_indices, executor.map(self.fitness_func, [self]*len(solutions_to_submit_indices), batches_solutions, batches_indices)):
571+
if type(batch_fitness) not in [list, tuple, numpy.ndarray]:
572+
raise TypeError(f"Expected to receive a list, tuple, or numpy.ndarray from the fitness function but the value ({batch_fitness}) of type {type(batch_fitness)}.")
573+
elif len(numpy.array(batch_fitness)) != len(batch_indices):
574+
raise ValueError(f"There is a mismatch between the number of solutions passed to the fitness function ({len(batch_indices)}) and the number of fitness values returned ({len(batch_fitness)}). They must match.")
575+
576+
for index, sol_fitness in zip(batch_indices, batch_fitness):
577+
if type(sol_fitness) in self.supported_int_float_types:
578+
# The fitness function returns a single numeric value.
579+
# This is a single-objective optimization problem.
580+
fitness[index] = sol_fitness
581+
elif type(sol_fitness) in [list, tuple, numpy.ndarray]:
582+
# The fitness function returns a list/tuple/numpy.ndarray.
583+
# This is a multi-objective optimization problem.
584+
fitness[index] = sol_fitness
585+
else:
586+
raise ValueError(f"The fitness function should return a number or an iterable (list, tuple, or numpy.ndarray) but the value ({sol_fitness}) of type {type(sol_fitness)} found.")
497587

498-
# Number of batches.
499-
num_batches = int(numpy.ceil((last_idx - first_idx) / self.fitness_batch_size))
500588

501-
for batch_idx in range(num_batches):
502-
# The index of the first solution in the current batch.
503-
batch_first_index = first_idx + batch_idx * self.fitness_batch_size
504-
# The index of the last solution in the current batch.
505-
if batch_idx == (num_batches - 1):
506-
batch_last_index = last_idx
507-
else:
508-
batch_last_index = first_idx + (batch_idx + 1) * self.fitness_batch_size
509-
510-
# Calculate the fitness values for the batch.
511-
# We cannot return the index/indices of the solution(s) within the population.
512-
# Because the new solution(s) (offspring) do(es) not yet exist in the population.
513-
# The user should handle this situation if the solution index is used anywhere.
514-
fitness_temp = self.fitness_func(self,
515-
temp_population[batch_first_index:batch_last_index],
516-
None)
517-
# Insert the fitness of each solution at the proper index.
518-
for idx in range(batch_first_index, batch_last_index):
519-
fitness[idx] = fitness_temp[idx - batch_first_index]
520589

521590
if len(fitness.shape) > 1:
522591
# TODO This is a multi-objective optimization problem.

0 commit comments

Comments
 (0)