Robot EvolutionΒΆ
How does ARIEL EC module work?ΒΆ
The ARIEL EC (Evolutionary Computing) module works a bit different than other EAs (Evolutionary Algorithms). While other EAs represent the population as a simple list of individuals, here they the population is made as its own class, with a population being type of list[Individuals].
Similarly, in traditional EA architecture an individual is chosen for certain operators (for example parent selection) according to some criteria, put into a separate list and then given to the operator function. ARIEL works by giving individuals what we call tags. An individual has tags that can be toggled, which qualify it for any and all operations. The tag can be whether an individual can crossover or mutate in the future, but it can also show if it can enter the learning cycle.
The tags can be changed at all times, and default values for each tag can be given to more closely represent a traditional EA structure.
Additionally, ARIEL utilizes an SQL database to handle the variables and outputs of the code. This makes the code run faster, but it adds an extra step to the process.
This file demonstrates the process of initializing an EA class and running it for a complex problem, in our case, evolving the morphology and controllers of a robot in simulation.
[ ]:
# Imports
import random
import time
from pathlib import Path
from typing import TYPE_CHECKING
import mujoco as mj
# Learning EA
import nevergrad as ng
# from mujoco import viewer
import numpy as np
# Ray for parallelisation
import torch
# Type Checking
# Pretty little errors and progress bars
from rich.console import Console
from rich.traceback import install
# Import torch for brain controller
from torch import nn
from ariel.body_phenotypes.robogen_lite.constructor import (
construct_mjspec_from_graph,
)
from ariel.body_phenotypes.robogen_lite.decoders.hi_prob_decoding import (
HighProbabilityDecoder,
)
# Ariel Imports
from ariel.ec.a001 import Individual
from ariel.ec.a004 import EA, EASettings, EAStep, Population
from ariel.ec.a005 import Crossover
# Body imports
from ariel.ec.genotypes.nde import NeuralDevelopmentalEncoding
from ariel.simulation.controllers.controller import Controller, Tracker
from ariel.simulation.controllers.utils.data_get import get_state_from_data
# Import World
from ariel.simulation.environments import (
SimpleFlatWorld,
)
from ariel.utils.runners import simple_runner
if TYPE_CHECKING:
from networkx import DiGraph
# Initialize rich console and traceback handler
install()
console = Console()
Set up paths and configurationΒΆ
[ ]:
# Will probably have to fix the paths at some point
CWD = Path.cwd()
DATA = Path(CWD / "__data__" / "Robot_Evolution")
DATA.mkdir(exist_ok=True)
# Show cwd
# print(f"Current working directory: {CWD}")
# print(f"Saving data to {DATA}")
[ ]:
# A seed is optional, but it helps with reproducibility
SEED = None # e.g., 42
NUM_MODULES = 20
GENE_SIZE = 64 # default is 64, change it in the decoder/NDE
GENE_RANGE = 64
POP_SIZE = 50
NUM_GENERATIONS = 100
# Initialize RNG
RNG = np.random.default_rng(SEED)
[ ]:
# Set config
config = EASettings()
config.is_maximisation = False
config.db_handling = "delete"
config.target_population_size = 100
Define Neural Developmental Encoding (NDE) and NetworkΒΆ
[ ]:
NDE = NeuralDevelopmentalEncoding(
number_of_modules=NUM_MODULES, # Seems to be a good value
genotype_size=64,
)
torch.save(NDE.state_dict(), DATA / "NDE.pth")
[ ]:
class Network(nn.Module):
def __init__(
self,
input_size: int,
output_size: int,
hidden_size: int,
) -> None:
super().__init__()
self.fc1 = nn.Linear(input_size, hidden_size)
self.fc2 = nn.Linear(hidden_size, hidden_size)
self.fc4 = nn.Linear(hidden_size, output_size)
self.hidden_activation = nn.ELU()
self.output_activation = nn.Tanh()
self.input = input_size
# Disable gradients for all parameters
for param in self.parameters():
param.requires_grad = False
@torch.inference_mode()
def forward(self, model, data):
x = torch.Tensor(get_state_from_data(data))
x = self.hidden_activation(self.fc1(x))
x = self.hidden_activation(self.fc2(x))
x = self.output_activation(self.fc4(x)) * (torch.pi / 2)
return x.detach().numpy()
@torch.no_grad()
def fill_parameters(net: nn.Module, vector: torch.Tensor) -> None:
"""Fill the parameters of a torch module (net) from a 1-D vector.
No gradient information is kept.
The vector's length must be exactly the same with the number
of parameters of the PyTorch module.
Parameters
----------
net: nn.Module
The torch module whose parameter values will be filled.
vector: torch.Tensor
A 1-D torch tensor which stores the parameter values.
"""
address = 0
for p in net.parameters():
d = p.data.view(-1)
n = len(d)
d[:] = torch.as_tensor(vector[address : address + n], device=d.device)
address += n
Define fitness and learning functionsΒΆ
[ ]:
# Currently Completed
def create_random_individual() -> Individual:
"""Create and initialise a random BODY individual.
Returns
-------
ind: Individual
A newly created individual with a randomized genotype.
"""
ind = Individual()
ind.genotype = RNG.normal(
loc=0,
scale=64,
size=(3, GENE_SIZE),
).tolist()
return ind
# Currently Completed
def gene_to_robot(genotype, num_modules: int = 20) -> mj.MjSpec:
"""Generate a robot specification from a gene.
Parameters
----------
genotype: list or array-like
The genotype of the individual to decode.
num_modules: int, default=20
The number of modules to use in the HighProbabilityDecoder.
Returns
-------
mujoco.MjSpec
The generated MuJoCo specification for the robot.
"""
p_matrices = NDE.forward(genotype)
# Decode the high-probability graph
hpd = HighProbabilityDecoder(num_modules)
robot_graph: DiGraph = hpd.probability_matrices_to_graph(
p_matrices[0],
p_matrices[1],
p_matrices[2],
)
robot_spec = construct_mjspec_from_graph(robot_graph)
return robot_spec.spec
# Currently Completed
def parent_selection(population: Population) -> Population:
"""Tournament Selection.
Selects parents for the next generation using a tournament selection
mechanism. Tags the winners with 'ps' (parent selection).
Parameters
----------
population : Population
The current population of individuals.
Returns
-------
Population
The updated population with selected parents tagged.
"""
tournament_size: int = 3
# Ensure all individuals have a tags dict and reset parent-selection tag
for ind in population:
ind.tags["ps"] = False
# Decide how many parents we want (even number)
num_parents = (len(population) // 2) * 2
if num_parents == 0 and len(population) >= 2:
num_parents = 2
for _ in range(num_parents):
# sample competitors with replacement
competitors = [
random.choice(population) for _ in range(tournament_size)
]
# pick best competitor depending on maximisation/minimisation
if config.is_maximisation:
winner = max(competitors, key=lambda ind: ind.fitness)
else:
winner = min(competitors, key=lambda ind: ind.fitness)
winner.tags["ps"] = True
return population
# Currently Completed
def crossover(population: Population) -> Population:
"""One point crossover.
Performs one-point crossover on individuals tagged for parent
selection ('ps'). Generates children and appends them to the
population with a 'mut' tag.
Parameters
----------
population : Population
The current population containing selected parents.
Returns
-------
Population
The population extended with the newly created children.
"""
parents = [ind for ind in population if ind.tags.get("ps", False)]
for idx in range(0, len(parents), 2):
parent_i = parents[idx]
parent_j = parents[idx]
genotype_i, genotype_j = Crossover.one_point(
parent_i.genotype,
parent_j.genotype,
)
# First child
child_i = Individual()
child_i.genotype = genotype_i
child_i.tags = {"mut": True}
child_i.requires_eval = True
# Second child
child_j = Individual()
child_j.genotype = genotype_j
child_j.tags = {"mut": True}
child_j.requires_eval = True
# Add children to population
population.extend([child_i, child_j])
return population
# Currently Completed
def mutation(population: Population) -> Population:
""""Gaussian mutation.
Applies Gaussian mutation to individuals tagged for mutation ('mut').
Parameters
----------
population : Population
The current population containing children to be mutated.
Returns
-------
Population
The population with mutated individuals.
"""
for ind in population:
if ind.tags.get("mut", False):
gene_shape = np.array(ind.genotype).shape
genes = np.array(ind.genotype).flatten().copy()
if random.random() < 0.7:
mutated = genes + RNG.normal(
loc=0,
scale=4,
size=len(genes),
)
else:
mutated = genes.copy()
ind.genotype = mutated.reshape(gene_shape).astype(float).tolist()
return population
# Currently Completed
def survivor_selection(population: Population) -> Population:
"""Tournament Survivor Selection.
Kills off individuals based on a tournament selection until the
population size is reduced back to the target size.
Parameters
----------
population : Population
The current population including parents and children.
Returns
-------
Population
The surviving population.
"""
tournament_size: int = 5
pop_len = len(population)
for _ in range(POP_SIZE):
# Sample competitors with replacement
pop_alive = [ind for ind in population if ind.alive is True]
death_candidates = [
# RNG.choice(pop_alive) for _ in range(tournament_size)
random.choice(pop_alive)
for _ in range(tournament_size)
]
# Pick best competitor depending on maximisation/minimisation
if config.is_maximisation:
about_to_be_killed_lol = min(
death_candidates,
key=lambda ind: ind.fitness,
)
else:
about_to_be_killed_lol = max(
death_candidates,
key=lambda ind: ind.fitness,
)
about_to_be_killed_lol.alive = False
pop_len -= 1
if pop_len <= POP_SIZE:
break
return population
def individual_learn(individual: Individual) -> float:
"""Perform learning for one individual.
Evaluates an individual by decoding its genotype into a MuJoCo robot
specification, setting up the simulation, and learning optimal controller
weights using CMA-ES via Nevergrad.
Parameters
----------
individual: Individual
The individual whose fitness and controller are to be evaluated.
Returns
-------
float
The minimum fitness (distance) achieved during the learning budget.
"""
p_matrices = NDE.forward(np.array(individual.genotype))
# Hardcoded num_modules=20 based on your global var, or pass it in if dynamic
hpd = HighProbabilityDecoder(num_modules=20)
robot_graph = hpd.probability_matrices_to_graph(
p_matrices[0],
p_matrices[1],
p_matrices[2],
)
robot_spec = construct_mjspec_from_graph(robot_graph).spec
# 2. Simulation Setup (Logic from evaluate_single)
mj.set_mjcb_control(None)
world = SimpleFlatWorld()
world.spawn(
robot_spec,
position=(0.0, 0.0, 0.1),
correct_collision_with_floor=True,
)
model = world.spec.compile()
data = mj.MjData(model)
net = Network(
input_size=len(get_state_from_data(data)),
output_size=model.nu,
hidden_size=32,
)
# Generate weights for vec
num_vars: int = sum(p.numel() for p in net.parameters())
pop_size = 30
budget = 50
min_fit = np.inf
param = ng.p.Array(shape=(num_vars,))
temp_vec_learner = ng.optimizers.registry["CMA"](
parametrization=param, budget=(pop_size * budget),
)
tracker = Tracker(
name_to_bind="core",
observable_attributes=["xpos"],
quiet=True,
)
tracker.setup(world.spec, data)
controller = Controller(
controller_callback_function=net.forward, tracker=tracker
)
for _ in range(budget):
vecs = [temp_vec_learner.ask() for _ in range(pop_size)]
for vec_candidate in vecs:
vec = vec_candidate.value
# 3. Network Construction
fill_parameters(net, vec)
mj.mj_resetData(model, data)
mj.set_mjcb_control(controller.set_control)
# 4. Run Simulation
simple_runner(model, data, duration=15)
# 5. Calculate Fitness
xc, yc, zc = data.qpos[0:3].copy()
xt, yt, zt = (2.0, 0.0, 0.1)
fitness = np.sqrt((xt - xc) ** 2 + (yt - yc) ** 2 + (zt - zc) ** 2)
temp_vec_learner.tell(vec_candidate, fitness)
min_fit = min(min_fit, fitness)
return min_fit
def pop_learn(population: Population) -> Population:
"""Do learning for the entire population.
Iterates over the population and evaluates the fitness of each individual
by performing a learning cycle.
Parameters
----------
population : Population
The current population to be evaluated.
Returns
-------
Population
The evaluated population with updated fitness values.
"""
for ind in population:
ind.fitness = individual_learn(ind)
return population
Define evolutionary loopΒΆ
[8]:
def evolve() -> EA:
"""Entry point for the evolutionary algorithm.
Initializes the population, evaluates it, defines the evolutionary
steps (parent selection, crossover, mutation, learning, survivor selection),
and runs the EA.
Returns
-------
EA
The completed EA object containing the run history and solutions.
"""
console.log("Initializing population...")
# Initialise Body & Hivemind Population
population = [create_random_individual() for _ in range(POP_SIZE)]
# Initial Eval
population = pop_learn(population)
# Define Evolution Loop
# Operators work for both NDEs and Network Weight Vectors
ops = [
# Default EA operators
EAStep(
"parent_selection", parent_selection
), # Select parents for bodies
EAStep("crossover", crossover), # Crossover Body
EAStep("mutation", mutation), # Mutation Body
# Learning acts as a fitness function
EAStep("pop_learning", pop_learn), # Do learning for all the bodies
EAStep("survivor_selection", survivor_selection),
]
# Initialise EA object
ea = EA(
population,
operations=ops,
num_of_generations=NUM_GENERATIONS,
)
ea.run()
return ea
[21:16:10] Initializing population... 839717354.py:3
[21:16:12] Database file exists at d:\University\EC TA\ariel\docs\source\EA_intro\__data__\database.db! a004.py:105 Behaviour is set to: 'delete' --> β οΈ Deleting file!
βββββββββββββββββββββββββββββββββββββββββββββββββ EA Initialised ββββββββββββββββββββββββββββββββββββββββββββββββββ
βββββββββββββββββββββββββββββββββββββββββββββββ EA Finished Running βββββββββββββββββββββββββββββββββββββββββββββββ
[21:16:25] Code took 15.082 seconds to run 839717354.py:48
[ ]:
def main() -> EA:
"""Is the main entry loop to the code."""
return evolve()
start = time.time()
ea = main()
best_fitness = ea.get_solution("best", only_alive=False).fitness
console.log(f"Best fitness found: {best_fitness:.3f}")
console.log("Best fitness possible: 0")
end = time.time()
time_taken = end - start
# Literally just to see the results better while testing
if time_taken < 60:
console.log(f"Code took {time_taken:.3f} seconds to run")
elif time_taken < 60 * 60:
console.log(f"Code took {time_taken / 60:.3f} minutes to run")
else:
console.log(f"Code took {time_taken / (60 * 60):.3f} hours to run")