Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ def select(
original_survivors, offspring_survivors = population_management.steady_state(
[i.genotype for i in population.individuals],
[i.fitness for i in population.individuals],
[i.genotype for i in offspring],
[i.fitness for i in offspring],
[i.genotype for i in offspring.individuals],
[i.fitness for i in offspring.individuals],
lambda n, genotypes, fitnesses: selection.multiple_unique(
n,
genotypes,
Expand All @@ -112,8 +112,8 @@ def select(
]
+ [
Individual(
genotype=offspring[i].genotype,
fitness=offspring[i].fitness,
genotype=offspring.individuals[i].genotype,
fitness=offspring.individuals[i].fitness,
)
for i in offspring_survivors
]
Expand Down Expand Up @@ -145,10 +145,11 @@ def reproduce(self, population: NDArray[np.int_], **kwargs: Any) -> list[Genotyp
) # We select the population of parents that were passed in KWArgs of the parent selector object.
if parents is None:
raise KeyError("No children passed.")
#print(type(parents))
offspring = [
Genotype.crossover(
parents[parent1_i].genotype,
parents[parent2_i].genotype,
parents.individuals[parent1_i].genotype,
parents.individuals[parent2_i].genotype,
self._rng,
num_parameters=config.NUM_PARAMETERS,
).mutate(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ def multiple_unique(
population: list[TIndividual],
fitnesses: list[TFitness],
selection_function: Callable[[list[TIndividual], list[TFitness]], int],
) -> npt.NDArray[np.float_]:
) -> npt.NDArray[np.int_]:
"""
Select multiple distinct individuals from a population using the provided selection function.

:param selection_size: Amount of of individuals to select.
:param selection_size: Amount of individuals to select.
:param population: List of individuals to select from.
:param fitnesses: Fitnesses of the population.
:param selection_function: Function that select a single individual from a population. ([TIndividual], [TFitness]) -> index.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ def tournament(rng: np.random.Generator, fitnesses: list[Fitness], k: int) -> in
Perform tournament selection and return the index of the best individual.

:param rng: Random number generator.
:param fitnesses: List of finesses of individuals that joint the tournamente.
:param fitnesses: List of finesses of individuals that joint the tournament.
:param k: Amount of individuals to participate in tournament.
:returns: The index of te individual that won the tournament.
:returns: The index of the individual that won the tournament.
"""
assert len(fitnesses) >= k

Expand Down
6 changes: 6 additions & 0 deletions prototype/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
"""Configuration parameters for this example."""

NUM_SIMULATORS = 2 # should be number of cores
POPULATION_SIZE = 100
OFFSPRING_SIZE = 50
NUM_GENERATIONS = 100
160 changes: 160 additions & 0 deletions prototype/ea.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
from typing import Any
import multineat
import numpy as np
import numpy.typing as npt
from genotype import Genotype
from individual import Individual
from revolve2.experimentation.evolution.abstract_elements import Reproducer, Selector
from revolve2.experimentation.optimization.ea import population_management, selection


class ParentSelector(Selector):
"""Selector class for parent selection."""

rng: np.random.Generator
offspring_size: int

def __init__(self, offspring_size: int, rng: np.random.Generator) -> None:
"""
Initialize the parent selector.

:param offspring_size: The offspring size.
:param rng: The rng generator.
"""
self.offspring_size = offspring_size
self.rng = rng

def select(
self, population: list[Individual], **kwargs: Any
) -> tuple[npt.NDArray[np.int_], dict[str, list[Individual]]]:
"""
Select the parents.

:param population: The population of robots.
:param kwargs: Other parameters.
:return: The parent pairs.
"""
return np.array(
[
selection.multiple_unique(
selection_size=2,
population=[individual.genotype for individual in population],
fitnesses=[individual.fitness for individual in population],
selection_function=lambda _, fitnesses: selection.tournament(
rng=self.rng, fitnesses=fitnesses, k=1
),
)
for _ in range(self.offspring_size)
],
), {"parent_population": population}


class SurvivorSelector(Selector):
"""Selector class for survivor selection."""

rng: np.random.Generator

def __init__(self, rng: np.random.Generator) -> None:
"""
Initialize the parent selector.

:param rng: The rng generator.
"""
self.rng = rng

def select(
self, population: list[Individual], **kwargs: Any
) -> tuple[list[Individual], dict[str, Any]]:
"""
Select survivors using a tournament.

:param population: The population the parents come from.
:param kwargs: The offspring, with key 'offspring_population'.
:returns: A newly created population.
:raises ValueError: If the population is empty.
"""
offspring = kwargs.get("children")
offspring_fitness = kwargs.get("child_task_performance")
if offspring is None or offspring_fitness is None:
raise ValueError(
"No offspring was passed with positional argument 'children' and / or 'child_task_performance'."
)

original_survivors, offspring_survivors = population_management.steady_state(
old_genotypes=[i.genotype for i in population],
old_fitnesses=[i.fitness for i in population],
new_genotypes=offspring,
new_fitnesses=offspring_fitness,
selection_function=lambda n, genotypes, fitnesses: selection.multiple_unique(
selection_size=n,
population=genotypes,
fitnesses=fitnesses,
selection_function=lambda _, fitnesses: selection.tournament(
rng=self.rng, fitnesses=fitnesses, k=2
),
),
)

return [
Individual(
population[i].genotype,
population[i].fitness,
)
for i in original_survivors
] + [
Individual(
offspring[i],
offspring_fitness[i],
)
for i in offspring_survivors
], {}


class CrossoverReproducer(Reproducer):
"""A simple crossover reproducer using multineat."""

rng: np.random.Generator
innov_db_body: multineat.InnovationDatabase
innov_db_brain: multineat.InnovationDatabase

def __init__(
self,
rng: np.random.Generator,
innov_db_body: multineat.InnovationDatabase,
innov_db_brain: multineat.InnovationDatabase,
):
"""
Initialize the reproducer.

:param rng: The ranfom generator.
:param innov_db_body: The innovation database for the body.
:param innov_db_brain: The innovation database for the brain.
"""
self.rng = rng
self.innov_db_body = innov_db_body
self.innov_db_brain = innov_db_brain

def reproduce(
self, population: npt.NDArray[np.int_], **kwargs: Any
) -> list[Genotype]:
"""
Reproduce the population by crossover.

:param population: The parent pairs.
:param kwargs: Additional keyword arguments.
:return: The genotypes of the children.
:raises ValueError: If the parent population is not passed as a kwarg `parent_population`.
"""
parent_population: list[Individual] | None = kwargs.get("parent_population")
if parent_population is None:
raise ValueError("No parent population given.")

offspring_genotypes = [
Genotype.crossover(
parent_population[parent1_i].genotype,
parent_population[parent2_i].genotype,
self.rng,
).mutate(self.innov_db_body, self.innov_db_brain, self.rng)
for parent1_i, parent2_i in population
]
return offspring_genotypes
93 changes: 93 additions & 0 deletions prototype/evaluator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
"""Evaluator class."""

from genotype import Genotype

from revolve2.experimentation.evolution.abstract_elements import Evaluator as Eval
from revolve2.modular_robot_simulation import (
ModularRobotScene,
Terrain,
simulate_scenes,
)
from revolve2.simulators.mujoco_simulator import LocalSimulator
from revolve2.standards import terrains
from revolve2.standards.simulation_parameters import make_standard_batch_parameters
import math
from revolve2.modular_robot_simulation import ModularRobotSimulationState


def x_displacement(
begin_state: ModularRobotSimulationState, end_state: ModularRobotSimulationState
) -> float:
"""
Calculate the distance traveled on the x-plane by a single modular robot.

:param begin_state: Begin state of the robot.
:param end_state: End state of the robot.
:returns: The calculated fitness.
"""
begin_position = begin_state.get_pose().position
end_position = end_state.get_pose().position
return math.sqrt(
(begin_position.x - end_position.x) ** 2
)


class Evaluator(Eval):
"""Provides evaluation of robots."""

_simulator: LocalSimulator
_terrain: Terrain

def __init__(
self,
headless: bool,
num_simulators: int,
) -> None:
"""
Initialize this object.

:param headless: `headless` parameter for the physics simulator.
:param num_simulators: `num_simulators` parameter for the physics simulator.
"""
self._simulator = LocalSimulator(
headless=headless, num_simulators=num_simulators
)
self._terrain = terrains.flat()

def evaluate(
self,
population: list[Genotype],
) -> list[float]:
"""
Evaluate multiple robots.

Fitness is the distance traveled on the x plane.

:param population: The robots to simulate.
:returns: Fitnesses of the robots.
"""
robots = [genotype.develop() for genotype in population]
# Create the scenes.
scenes = []
for robot in robots:
scene = ModularRobotScene(terrain=self._terrain)
scene.add_robot(robot)
scenes.append(scene)

# Simulate all scenes.
scene_states = simulate_scenes(
simulator=self._simulator,
batch_parameters=make_standard_batch_parameters(),
scenes=scenes,
)

# Calculate the x displacement (only one direction)
x_displacements = [
x_displacement(
states[0].get_modular_robot_simulation_state(robot),
states[-1].get_modular_robot_simulation_state(robot),
)
for robot, states in zip(robots, scene_states)
]

return x_displacements
Loading