Robot Evolution

How does ARIEL EC module work?

The ARIEL EC (Evolutionary Computing) module works a bit different than other EAs (Evolutionary Algorithms). While other EAs represent the population as a simple list of individuals, here they the population is made as its own class, with a population being type of list[Individuals].

Similarly, in traditional EA architecture an individual is chosen for certain operators (for example parent selection) according to some criteria, put into a separate list and then given to the operator function. ARIEL works by giving individuals what we call tags. An individual has tags that can be toggled, which qualify it for any and all operations. The tag can be whether an individual can crossover or mutate in the future, but it can also show if it can enter the learning cycle.

The tags can be changed at all times, and default values for each tag can be given to more closely represent a traditional EA structure.

Additionally, ARIEL utilizes an SQL database to handle the variables and outputs of the code. This makes the code run faster, but it adds an extra step to the process.

This file demonstrates the process of initializing an EA class and running it for a complex problem, in our case, evolving the morphology and controllers of a robot in simulation.

[29]:
# Imports
import random
import time
from pathlib import Path
from typing import TYPE_CHECKING

import mujoco as mj

# Learning EA
import nevergrad as ng

# from mujoco import viewer
import numpy as np

# Ray for parallelisation
import torch

# Type Checking
# Pretty little errors and progress bars
from rich.console import Console
from rich.traceback import install

# Import torch for brain controller
from torch import nn

from ariel.body_phenotypes.robogen_lite.constructor import (
    construct_mjspec_from_graph,
)
from ariel.body_phenotypes.robogen_lite.decoders.hi_prob_decoding import (
    HighProbabilityDecoder,
)

# Ariel Imports
# New EA engine (ea.py)
from ariel.ec import (
    EA,
    Crossover,
    EAOperation,
    EASettings,
    Individual,
    Population,
    config,
)

# Body imports
from ariel.ec.genotypes.nde import NeuralDevelopmentalEncoding
from ariel.simulation.controllers.controller import Controller, Tracker
from ariel.simulation.controllers.utils.data_get import get_state_from_data

# Import World
from ariel.simulation.environments import (
    SimpleFlatWorld,
)
from ariel.utils.runners import simple_runner

if TYPE_CHECKING:
    from networkx import DiGraph

# Initialize rich console and traceback handler
install()
console = Console()

Set up paths and configuration

[30]:
# Will probably have to fix the paths at some point
CWD = Path.cwd()
DATA = Path(CWD / "__data__" / "Robot_Evolution")
DATA.mkdir(exist_ok=True)

# Show cwd
# print(f"Current working directory: {CWD}")
# print(f"Saving data to {DATA}")
[31]:
# A seed is optional, but it helps with reproducibility
SEED = None  # e.g., 42

NUM_MODULES = 20
GENE_SIZE = 64  # default is 64, change it in the decoder/NDE
GENE_RANGE = 64
POP_SIZE = 5
NUM_GENERATIONS = 2

# Initialize RNG
RNG = np.random.default_rng(SEED)
[32]:
# Set config
config = EASettings(
    is_maximisation=False,
    db_handling="delete",
    target_population_size=10,
    output_folder=DATA,
    db_file_name="database.db",
)

Define Neural Developmental Encoding (NDE) and Network

[33]:
NDE = NeuralDevelopmentalEncoding(
    number_of_modules=NUM_MODULES,  # Seems to be a good value
    genotype_size=64,
)
torch.save(NDE.state_dict(), DATA / "NDE.pth")
[34]:
class Network(nn.Module):
    def __init__(
        self,
        input_size: int,
        output_size: int,
        hidden_size: int,
    ) -> None:
        super().__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.fc4 = nn.Linear(hidden_size, output_size)

        self.hidden_activation = nn.ELU()
        self.output_activation = nn.Tanh()

        self.input = input_size

        # Disable gradients for all parameters
        for param in self.parameters():
            param.requires_grad = False

    @torch.inference_mode()
    def forward(self, model, data):
        x = torch.Tensor(get_state_from_data(data))

        x = self.hidden_activation(self.fc1(x))
        x = self.hidden_activation(self.fc2(x))
        x = self.output_activation(self.fc4(x)) * (torch.pi / 2)

        return x.detach().numpy()


@torch.no_grad()
def fill_parameters(net: nn.Module, vector: torch.Tensor) -> None:
    """Fill the parameters of a torch module (net) from a 1-D vector.

    No gradient information is kept.

    The vector's length must be exactly the same with the number
    of parameters of the PyTorch module.

    Parameters
    ----------
        net: nn.Module
            The torch module whose parameter values will be filled.
        vector: torch.Tensor
            A 1-D torch tensor which stores the parameter values.

    """
    address = 0
    for p in net.parameters():
        d = p.data.view(-1)
        n = len(d)
        d[:] = torch.as_tensor(vector[address : address + n], device=d.device)
        address += n

Define fitness and learning functions

[35]:
# Currently Completed
def create_random_individual() -> Individual:
    """Create and initialise a random BODY individual.

    Returns
    -------
    ind: Individual
        A newly created individual with a randomized genotype.
    """
    ind = Individual()
    ind.genotype = RNG.normal(
        loc=0,
        scale=64,
        size=(3, GENE_SIZE),
    ).tolist()

    return ind


# Currently Completed
def gene_to_robot(genotype, num_modules: int = 20) -> mj.MjSpec:
    """Generate a robot specification from a gene.

    Parameters
    ----------
    genotype: list or array-like
        The genotype of the individual to decode.
    num_modules: int, default=20
        The number of modules to use in the HighProbabilityDecoder.

    Returns
    -------
    mujoco.MjSpec
        The generated MuJoCo specification for the robot.
    """
    p_matrices = NDE.forward(genotype)

    # Decode the high-probability graph
    hpd = HighProbabilityDecoder(num_modules)
    robot_graph: DiGraph = hpd.probability_matrices_to_graph(
        p_matrices[0],
        p_matrices[1],
        p_matrices[2],
    )

    robot_spec = construct_mjspec_from_graph(robot_graph)
    return robot_spec.spec


# Currently Completed
def parent_selection(population: Population) -> Population:
    """Tournament Selection.

    Selects parents for the next generation using a tournament selection
    mechanism. Tags the winners with 'ps' (parent selection).

    Parameters
    ----------
    population : Population
        The current population of individuals.

    Returns
    -------
    Population
        The updated population with selected parents tagged.
    """
    tournament_size: int = 3

    # Ensure all individuals have a tags dict and reset parent-selection tag
    for ind in population:
        ind.tags["ps"] = False

    # Decide how many parents we want (even number)
    num_parents = (len(population) // 2) * 2
    if num_parents == 0 and len(population) >= 2:
        num_parents = 2

    for _ in range(num_parents):
        # sample competitors with replacement
        competitors = [
            random.choice(population) for _ in range(tournament_size)
        ]

        # pick best competitor depending on maximisation/minimisation
        if config.is_maximisation:
            winner = max(competitors, key=lambda ind: ind.fitness)
        else:
            winner = min(competitors, key=lambda ind: ind.fitness)

        winner.tags["ps"] = True

    return population


# Currently Completed
def crossover(population: Population) -> Population:
    """One point crossover.

    Performs one-point crossover on individuals tagged for parent
    selection ('ps'). Generates children and appends them to the
    population with a 'mut' tag.

    Parameters
    ----------
    population : Population
        The current population containing selected parents.

    Returns
    -------
    Population
        The population extended with the newly created children.
    """
    parents = [ind for ind in population if ind.tags.get("ps", False)]
    for idx in range(0, len(parents), 2):
        parent_i = parents[idx]
        parent_j = parents[idx]
        genotype_i, genotype_j = Crossover.one_point(
            parent_i.genotype,
            parent_j.genotype,
        )

        # First child
        child_i = Individual()
        child_i.genotype = genotype_i
        child_i.tags = {"mut": True}
        child_i.requires_eval = True

        # Second child
        child_j = Individual()
        child_j.genotype = genotype_j
        child_j.tags = {"mut": True}
        child_j.requires_eval = True

        # Add children to population
        population.extend([child_i, child_j])

    return population


# Currently Completed
def mutation(population: Population) -> Population:
    """"Gaussian mutation.

    Applies Gaussian mutation to individuals tagged for mutation ('mut').

    Parameters
    ----------
    population : Population
        The current population containing children to be mutated.

    Returns
    -------
    Population
        The population with mutated individuals.
    """
    for ind in population:
        if ind.tags.get("mut", False):
            gene_shape = np.array(ind.genotype).shape
            genes = np.array(ind.genotype).flatten().copy()

            if random.random() < 0.7:
                mutated = genes + RNG.normal(
                    loc=0,
                    scale=4,
                    size=len(genes),
                )
            else:
                mutated = genes.copy()

            ind.genotype = mutated.reshape(gene_shape).astype(float).tolist()
    return population


# Currently Completed
def survivor_selection(population: Population) -> Population:
    """Tournament Survivor Selection.

    Kills off individuals based on a tournament selection until the
    population size is reduced back to the target size.

    Parameters
    ----------
    population : Population
        The current population including parents and children.

    Returns
    -------
    Population
        The surviving population.
    """
    tournament_size: int = 5

    pop_len = len(population)

    for _ in range(POP_SIZE):
        # Sample competitors with replacement
        pop_alive = [ind for ind in population if ind.alive is True]
        death_candidates = [
            # RNG.choice(pop_alive) for _ in range(tournament_size)
            random.choice(pop_alive)
            for _ in range(tournament_size)
        ]

        # Pick best competitor depending on maximisation/minimisation
        if config.is_maximisation:
            about_to_be_killed_lol = min(
                death_candidates,
                key=lambda ind: ind.fitness,
            )
        else:
            about_to_be_killed_lol = max(
                death_candidates,
                key=lambda ind: ind.fitness,
            )

        about_to_be_killed_lol.alive = False

        pop_len -= 1
        if pop_len <= POP_SIZE:
            break

    return population


def individual_learn(individual: Individual) -> float:
    """Perform learning for one individual.

    Evaluates an individual by decoding its genotype into a MuJoCo robot
    specification, setting up the simulation, and learning optimal controller
    weights using CMA-ES via Nevergrad.

    Parameters
    ----------
    individual: Individual
        The individual whose fitness and controller are to be evaluated.

    Returns
    -------
    float
        The minimum fitness (distance) achieved during the learning budget.
    """
    p_matrices = NDE.forward(np.array(individual.genotype))

    # Hardcoded num_modules=20 based on your global var, or pass it in if dynamic
    hpd = HighProbabilityDecoder(num_modules=20)
    robot_graph = hpd.probability_matrices_to_graph(
        p_matrices[0],
        p_matrices[1],
        p_matrices[2],
    )
    robot_spec = construct_mjspec_from_graph(robot_graph).spec

    # 2. Simulation Setup (Logic from evaluate_single)
    mj.set_mjcb_control(None)
    world = SimpleFlatWorld()
    world.spawn(
        robot_spec,
        position=(0.0, 0.0, 0.1),
        correct_collision_with_floor=True,
    )

    model = world.spec.compile()
    data = mj.MjData(model)

    net = Network(
        input_size=len(get_state_from_data(data)),
        output_size=model.nu,
        hidden_size=32,
    )

    # Generate weights for vec
    num_vars: int = sum(p.numel() for p in net.parameters())

    pop_size = 30
    budget = 50

    min_fit = np.inf

    param = ng.p.Array(shape=(num_vars,))
    temp_vec_learner = ng.optimizers.registry["CMA"](
        parametrization=param, budget=(pop_size * budget),
    )

    tracker = Tracker(
        name_to_bind="core",
        observable_attributes=["xpos"],
        quiet=True,
    )

    tracker.setup(world.spec, data)

    controller = Controller(
        controller_callback_function=net.forward, tracker=tracker
    )
    for _ in range(budget):
        vecs = [temp_vec_learner.ask() for _ in range(pop_size)]

        for vec_candidate in vecs:
            vec = vec_candidate.value
            # 3. Network Construction
            fill_parameters(net, vec)

            mj.mj_resetData(model, data)

            mj.set_mjcb_control(controller.set_control)
            # 4. Run Simulation
            simple_runner(model, data, duration=15)

            # 5. Calculate Fitness
            xc, yc, zc = data.qpos[0:3].copy()
            xt, yt, zt = (2.0, 0.0, 0.1)
            fitness = np.sqrt((xt - xc) ** 2 + (yt - yc) ** 2 + (zt - zc) ** 2)

            temp_vec_learner.tell(vec_candidate, fitness)

            min_fit = min(min_fit, fitness)

    return min_fit


def pop_learn(population: Population) -> Population:
    """Do learning for the entire population.

    Iterates over the population and evaluates the fitness of each individual
    by performing a learning cycle.

    Parameters
    ----------
    population : Population
        The current population to be evaluated.

    Returns
    -------
    Population
        The evaluated population with updated fitness values.
    """
    for ind in population:
        ind.fitness = individual_learn(ind)

    return population

Define evolutionary loop

[36]:
def evolve() -> EA:
    """Entry point for the evolutionary algorithm.

    Initializes the population, evaluates it, defines the evolutionary
    steps (parent selection, crossover, mutation, learning, survivor selection),
    and runs the EA.

    Returns
    -------
    EA
        The completed EA object containing the run history and solutions.
    """
    console.log("Initializing population...")

    # Initialise Body & Hivemind Population
    population_list = [create_random_individual() for _ in range(POP_SIZE)]

    # Initial Eval
    population_list = pop_learn(Population(population_list)).to_list()

    # Define Evolution Loop
    # Operators work for both NDEs and Network Weight Vectors
    ops = [
        # Default EA operators
        EAOperation(parent_selection),  # Select parents for bodies
        EAOperation(crossover),  # Crossover Body
        EAOperation(mutation),  # Mutation Body
        # Learning acts as a fitness function
        EAOperation(pop_learn),  # Do learning for all the bodies
        EAOperation(survivor_selection),
    ]

    # Initialise EA object
    ea = EA(
        Population(population_list),
        operations=ops,
        num_steps=NUM_GENERATIONS,
        db_file_path=config.db_file_path,
        db_handling="delete",
    )
    ea.run()

    return ea

[37]:
def main() -> EA:
    """Is the main entry loop to the code."""
    return evolve()


start = time.time()

ea = main()

best_fitness = ea.get_solution("best", only_alive=False).fitness

console.log(f"Best fitness found: {best_fitness:.3f}")
console.log("Best fitness possible: 0")


end = time.time()

time_taken = end - start

# Literally just to see the results better while testing
if time_taken < 60:
    console.log(f"Code took {time_taken:.3f} seconds to run")
elif time_taken < 60 * 60:
    console.log(f"Code took {time_taken / 60:.3f} minutes to run")
else:
    console.log(f"Code took {time_taken / (60 * 60):.3f} hours to run")

[13:24:28] Initializing population...                                                              1640256329.py:13
╭─────────────────────────────── Traceback (most recent call last) ────────────────────────────────╮
 in <module>:8                                                                                    
                                                                                                  
    5                                                                                             
    6 start = time.time()                                                                         
    7                                                                                             
  8 ea = main()                                                                                 
    9                                                                                             
   10 best_fitness = ea.get_solution("best", only_alive=False).fitness                            
   11                                                                                             
                                                                                                  
 in main:3                                                                                        
                                                                                                  
    1 def main() -> EA:                                                                           
    2 """Is the main entry loop to the code."""                                               
  3 return evolve()                                                                         
    4                                                                                             
    5                                                                                             
    6 start = time.time()                                                                         
                                                                                                  
 in evolve:19                                                                                     
                                                                                                  
   16 population_list = [create_random_individual() for _ in range(POP_SIZE)]                 
   17                                                                                         
   18 # Initial Eval                                                                          
 19 population_list = pop_learn(Population(population_list)).to_list()                      
   20                                                                                         
   21 # Define Evolution Loop                                                                 
   22 # Operators work for both NDEs and Network Weight Vectors                               
                                                                                                  
 in pop_learn:338                                                                                 
                                                                                                  
   335 │   │   The evaluated population with updated fitness values.                              
   336 """                                                                                    
   337 for ind in population:                                                                 
 338 │   │   ind.fitness = individual_learn(ind)                                                
   339                                                                                        
   340 return population                                                                      
   341                                                                                            
                                                                                                  
 in individual_learn:307                                                                          
                                                                                                  
   304 │   │   │                                                                                  
   305 │   │   │   mj.set_mjcb_control(controller.set_control)                                    
   306 │   │   │   # 4. Run Simulation                                                            
 307 │   │   │   simple_runner(model, data, duration=15)                                        
   308 │   │   │                                                                                  
   309 │   │   │   # 5. Calculate Fitness                                                         
   310 │   │   │   xc, yc, zc = data.qpos[0:3].copy()                                             
                                                                                                  
 /home/aronf/Desktop/EvoDevo/pure-ariel/ariel/src/ariel/utils/runners.py:41 in simple_runner      
                                                                                                  
   38 data.ctrl = RNG.normal(scale=0.1, size=model.nu)                                        
   39                                                                                         
   40 while data.time < duration:                                                             
 41 │   │   mujoco.mj_step(model, data, nstep=steps_per_loop)                                   
   42                                                                                             
                                                                                                  
 /home/aronf/Desktop/EvoDevo/pure-ariel/ariel/src/ariel/simulation/controllers/controller.py:87   
 in set_control                                                                                   
                                                                                                  
   84 │   │   │   data.ctrl = np.clip(new_ctrl, -np.pi / 2, np.pi / 2)                            
   85 │   │   │                                                                                   
   86 │   │   │   # Check if there are any NaN values in the control signal                       
 87 │   │   │   if np.any(np.isnan(data.ctrl)):                                                 
   88 │   │   │   │   msg = "NaN values detected in the control signal.\n"                        
   89 │   │   │   │   msg += f"{data.ctrl}"                                                       
   90 │   │   │   │   raise ValueError(msg)                                                       
                                                                                                  
 /home/aronf/Desktop/EvoDevo/pure-ariel/ariel/.venv/lib/python3.12/site-packages/numpy/_core/from 
 numeric.py:2477 in any                                                                           
                                                                                                  
   2474 return (a, where, out)                                                                
   2475                                                                                           
   2476                                                                                           
 2477 @array_function_dispatch(_any_dispatcher)                                                 
   2478 def any(a, axis=None, out=None, keepdims=np._NoValue, *, where=np._NoValue):              
   2479 """                                                                                   
   2480 Test whether any array element along a given axis evaluates to True.                  
╰──────────────────────────────────────────────────────────────────────────────────────────────────╯
KeyboardInterrupt