Parallelising Robot Evaluation with MuJoCo Worker
Evaluating fitness is almost always the bottleneck in an evolutionary robotics run: every robot has to be simulated before selection can proceed, and the simulations are completely independent of one another, which makes them an ideal target for parallelisation.
This notebook shows how to use MuJoCoWorkerBase to distribute robot evaluation across all available CPU cores using Python's built-in multiprocessing module. The approach requires no extra dependencies and is designed specifically for MuJoCo simulation workloads.
The pattern is:
1. Subclass MuJoCoWorkerBase and implement evaluate() with your fitness logic.
2. Pass an instance of your subclass directly to pool.imap. It is a callable class, so it pickles cleanly with the spawn context, provided the class is defined in an importable .py file (see Step 2).
3. Collect the results and assign the fitness values back to the population.
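A minimal, self-contained sketch of that pattern follows. WorkerBaseSketch is a hypothetical stand-in for MuJoCoWorkerBase, and the real base class's signatures may differ. Its __call__ receives one (xml, config) tuple, because pool.imap passes exactly one argument per task, and forwards the pieces to evaluate(). ToyConfig, DistanceWorker and the toy "fitness" are illustrative only; no MuJoCo simulation happens here.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ToyConfig:
    """Hypothetical per-task settings (stand-in for a real config class)."""

    seed: int
    target_x: float = 2.0


class WorkerBaseSketch:
    """Hypothetical stand-in for MuJoCoWorkerBase, NOT ARIEL's real API.

    Instances are callable, so pool.imap(worker, tasks) can call them directly.
    imap passes a single argument per task, so each task is one tuple.
    """

    def __call__(self, task: tuple[str, ToyConfig]) -> float:
        xml, config = task  # unpack the (xml_string, config) pair
        return self.evaluate(xml, config)

    def evaluate(self, xml: str, config: ToyConfig) -> float:
        raise NotImplementedError  # subclasses supply the fitness logic


class DistanceWorker(WorkerBaseSketch):
    """Toy subclass: pretends the robot ends up len(xml) / 100 metres along x."""

    def evaluate(self, xml: str, config: ToyConfig) -> float:
        final_x = len(xml) / 100.0
        return abs(config.target_x - final_x)  # distance to target, lower is better


toy_worker = DistanceWorker()
toy_tasks = [("<mujoco/>" * 10, ToyConfig(seed=0)), ("<mujoco/>", ToyConfig(seed=1))]
print([toy_worker(task) for task in toy_tasks])
```

Because all per-robot state travels in the task tuple, one worker instance can be reused for every task, which is what lets a single object be handed to pool.imap.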
Part 1: Serial Baseline
This is what a robot evaluation loop looks like without parallelisation. Run it first to understand the structure before the parallel version is introduced.
Imports
import time
import mujoco as mj
import nevergrad as ng
import numpy as np
import torch
from rich.console import Console
from ariel.body_phenotypes.robogen_lite.constructor import (
construct_mjspec_from_graph,
)
from ariel.body_phenotypes.robogen_lite.decoders.hi_prob_decoding import (
HighProbabilityDecoder,
)
from ariel.ec import Individual, Population
from ariel.ec.genotypes.nde import NeuralDevelopmentalEncoding
from ariel.simulation.environments import SimpleFlatWorld
console = Console()
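Encoding setup
We use NeuralDevelopmentalEncoding to map a genotype (three vectors of length GENE_SIZE) to three probability matrices, HighProbabilityDecoder to turn those matrices into a robot morphology graph, and construct_mjspec_from_graph to build the MuJoCo spec, which genotype_to_xml spawns into a SimpleFlatWorld and exports as XML.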
NUM_MODULES = 10
GENE_SIZE = 64
SEED = 42
rng = np.random.default_rng(SEED)
nde = NeuralDevelopmentalEncoding(number_of_modules=NUM_MODULES, genotype_size=GENE_SIZE)
hpd = HighProbabilityDecoder(num_modules=NUM_MODULES)
SPAWN = (0.0, 0.0, 0.1)
TARGET = (2.0, 0.0, 0.1)
def genotype_to_xml(genotype: list[list[float]]) -> str:
    """Convert a raw genotype to a MuJoCo XML string (world + robot)."""
    tensor = torch.tensor(genotype, dtype=torch.float32)
    p_mats = nde.forward(tensor)
    graph = hpd.probability_matrices_to_graph(p_mats[0], p_mats[1], p_mats[2])
    spec_obj = construct_mjspec_from_graph(graph)
    world = SimpleFlatWorld()
    world.spawn(spec_obj.spec, position=SPAWN, correct_collision_with_floor=True)
    return world.spec.to_xml()
def create_individual() -> Individual:
    ind = Individual()
    ind.genotype = rng.normal(loc=0, scale=64, size=(3, GENE_SIZE)).tolist()
    return ind
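Fitness function
A minimal two-layer ANN controller is optimised with CMA-ES (via nevergrad) for each robot. Fitness is the distance between the robot's final position and the target (lower is better), and each robot's score is the best value found across all CMA_GEN × CMA_POP controller candidates. Robots whose XML fails to compile receive a penalty of ten times the spawn-to-target distance.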
CMA_GEN = 10
CMA_POP = 5
class _Network:
    def __init__(self, input_size: int, hidden_size: int, output_size: int) -> None:
        self.W1 = torch.randn(hidden_size, input_size)
        self.b1 = torch.zeros(hidden_size)
        self.W2 = torch.randn(output_size, hidden_size)
        self.b2 = torch.zeros(output_size)
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = torch.relu(self.W1 @ x + self.b1)
        return torch.tanh(self.W2 @ x + self.b2)
    def parameters(self) -> list[torch.Tensor]:
        return [self.W1, self.b1, self.W2, self.b2]
def _fill(net: _Network, params: np.ndarray) -> None:
    """Copy a flat CMA-ES candidate vector into the network's weights and biases."""
    offset = 0
    for p in net.parameters():
        n = p.numel()
        p.data = torch.tensor(params[offset:offset + n], dtype=torch.float32).reshape(p.shape)
        offset += n
def evaluate_robot_serial(xml: str, seed: int | None = None) -> float:
    """Evaluate a single robot. Returns distance to target (lower = better)."""
    local_rng = np.random.default_rng(seed)
    try:
        model = mj.MjModel.from_xml_string(xml)
        data = mj.MjData(model)
    except Exception:
        # Invalid morphology: return a large penalty instead of crashing the run
        return float(np.linalg.norm(np.array(TARGET) - np.array(SPAWN)) * 10)
    state_size = len(data.qpos) + len(data.qvel) + 3
    net = _Network(state_size, 16, model.nu)
    num_vars = sum(p.numel() for p in net.parameters())
    opt = ng.optimizers.CMA(parametrization=num_vars, budget=CMA_GEN * CMA_POP)
    best = float("inf")
    for _ in range(CMA_GEN):
        candidates = [opt.ask() for _ in range(CMA_POP)]
        for cand in candidates:
            _fill(net, cand.value)
            # Settle for 300 steps with small random controls, then aim relative to where the robot ended up
            mj.mj_resetData(model, data)
            data.ctrl[:] = local_rng.normal(scale=0.1, size=model.nu)
            mj.mj_step(model, data, nstep=300)
            displacement = data.qpos[:3].copy()
            target = tuple(np.array(TARGET) + displacement)
            def cb(m, d, _target=target, _net=net) -> None:
                # Closed-loop control from qpos, qvel and the unit direction to the target
                pos = d.qpos[:3].copy()
                vec = np.array(_target) - pos
                dist = np.linalg.norm(vec) + 1e-6
                state = np.concatenate([d.qpos.copy(), d.qvel.copy(), vec / dist])
                ctrl = _net.forward(torch.tensor(state, dtype=torch.float32))
                d.ctrl[:] = ctrl.detach().numpy()[:m.nu]
            mj.set_mjcb_control(cb)
            try:
                mj.mj_step(model, data, nstep=1_000)
            finally:
                mj.set_mjcb_control(None)  # always clear the global callback, even on error
            fitness = float(np.linalg.norm(np.array(target) - data.qpos[:3]))
            opt.tell(cand, fitness)
            best = min(best, fitness)
    return best
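CMA-ES searches over every weight and bias of _Network at once, so it helps to know how large that vector is. The helper below is an illustrative function (not part of ARIEL) that reproduces the count sum(p.numel() for p in net.parameters()) gives for the two-layer network above. The robot dimensions in the example are hypothetical: a free-joint root (7 qpos, 6 qvel) plus eight hinge joints, each driven by one actuator.

```python
def count_controller_params(state_size: int, num_actuators: int, hidden_size: int = 16) -> int:
    """Number of values CMA-ES optimises for the _Network controller.

    W1: hidden x state, b1: hidden, W2: actuators x hidden, b2: actuators.
    """
    return (
        hidden_size * state_size  # W1
        + hidden_size  # b1
        + num_actuators * hidden_size  # W2
        + num_actuators  # b2
    )


# Hypothetical robot: nq = 7 + 8 = 15, nv = 6 + 8 = 14, plus 3 direction inputs
state_size = 15 + 14 + 3
print(count_controller_params(state_size, num_actuators=8))  # → 664
```

With the settings above, the 50-evaluation budget (CMA_GEN × CMA_POP) therefore explores a search space of several hundred dimensions per robot.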
Run: serial
POP_SIZE = 20
population = Population([create_individual() for _ in range(POP_SIZE)])
xml_strings = [genotype_to_xml(ind.genotype) for ind in population]
t0 = time.perf_counter()
fitnesses_serial = [evaluate_robot_serial(xml, seed=SEED + i) for i, xml in enumerate(xml_strings)]
elapsed_serial = time.perf_counter() - t0
console.print(f"Serial | best={min(fitnesses_serial):.3f} | mean={np.mean(fitnesses_serial):.3f} | time={elapsed_serial:.1f}s")
Serial | best=1.868 | mean=1.973 | time=55.2s
Part 2: Adding Parallelisation with MuJoCo Worker
The changes are confined entirely to the evaluation step. Every other operator (selection, crossover, mutation) stays identical.
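| | Serial | Parallel |
|---|---|---|
| Extra import | none | from ariel.simulation.mujoco_worker import MuJoCoWorkerBase |
| Worker setup | none | Subclass MuJoCoWorkerBase |
| Evaluation loop | for loop over evaluate_robot_serial | pool.imap(worker, eval_args) |
| Process management | none | ctx.Pool with get_context("spawn") |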
Note: The spawn context matters for MuJoCo because it starts each worker as a fresh Python process, avoiding conflicts with MuJoCo's internal OpenGL context and PyTorch's thread pool.
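For reference, the standard library lets you list the available start methods and obtain a context bound to one of them. The object returned by get_context exposes the same Pool/Process API as the multiprocessing module, but it does not change the global default the way multiprocessing.set_start_method() does, so other code in the same kernel is unaffected.

```python
import multiprocessing as mp

# Start methods this platform supports (platform-dependent; Windows only has "spawn")
print(mp.get_all_start_methods())

# A context bound to "spawn": ctx.Pool, ctx.Process, ctx.Queue, ... all use it,
# without touching the process-wide default start method.
ctx = mp.get_context("spawn")
print(ctx.get_start_method())  # → spawn
```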
Step 1: Import MuJoCoWorkerBase and EvalConfig
One new import, MuJoCoWorkerBase, replaces the manual model-loading boilerplate; get_context from the standard library supplies the spawn start method.
# NEW: import the base class alongside your other imports
from ariel.simulation.mujoco_worker import MuJoCoWorkerBase
from multiprocessing import get_context
Step 2: Define the worker in a separate module
With the spawn start method, which is the default on Windows and macOS and is forced on every platform here via get_context("spawn"), each worker process starts as a fresh Python interpreter and must import your worker class by name before it can unpickle it. Classes defined inside a Jupyter notebook cell live in __main__, which spawned processes cannot import.
The fix is to put your worker class in a plain .py file alongside your notebook, the same pattern parallel_ackley.ipynb uses for fitness_plot.py. Worker processes can import a .py file on disk; they cannot access the notebook kernel's memory.
The companion file for this notebook is locomotion_worker.py. It contains LocomotionConfig and TargetedLocomotionWorker as module-level classes β identical to what you would write inline, just saved to a file.
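The reason the class must live in an importable file is that pickle stores class instances by reference: the payload records the class's module and qualified name plus the instance's attributes, and the receiving process re-imports that module to rebuild the object. The sketch below shows this with a throwaway class; DemoWorker is an illustrative name.

```python
import pickle


class DemoWorker:
    """Illustrative callable worker; only its attributes travel in the pickle."""

    def __init__(self, scale: float = 1.0) -> None:
        self.scale = scale

    def __call__(self, x: float) -> float:
        return self.scale * x


payload = pickle.dumps(DemoWorker(scale=2.0))

# The payload names the module and the class; it does not contain the class body.
print(DemoWorker.__module__.encode() in payload)  # True
print(b"DemoWorker" in payload)  # True

# In a notebook, DemoWorker.__module__ is "__main__", so a spawned worker would look
# for DemoWorker in its *own* __main__ and fail with "Can't get attribute ...".
restored = pickle.loads(payload)  # works here: same process, class is findable
print(restored(3.0))  # → 6.0
```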
# NEW: import from a .py file so spawned workers can find the class
from locomotion_worker import LocomotionConfig, TargetedLocomotionWorker
Step 3: Replace the for loop with pool.imap
Instead of calling evaluate_robot_serial one at a time, we:
1. Build a list of (xml_string, config) tuples, one per individual.
2. Open a Pool with the spawn context and call pool.imap, passing the worker instance as the map function.
3. Collect the results, which imap yields in the same order as the inputs.
# BEFORE (serial)
fitnesses = [evaluate_robot_serial(xml, seed=SEED + i) for i, xml in enumerate(xml_strings)]
# AFTER (parallel)
eval_args = list(zip(xml_strings, configs, strict=True))  # one (xml, config) tuple per individual
ctx = get_context("spawn")
with ctx.Pool(processes=NUM_WORKERS) as pool:
    fitnesses = list(pool.imap(worker, eval_args, chunksize=1))
pool.imap streams results back as workers finish and preserves the original order, so fitness values align with the population list without extra bookkeeping.
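To see the ordering guarantee in isolation, the sketch below runs a toy task whose later inputs finish first. It uses multiprocessing.pool.ThreadPool only so that it runs anywhere without pickling or spawn concerns; ThreadPool shares Pool's imap/imap_unordered interface and ordering behaviour. slow_square is a made-up task.

```python
import time
from multiprocessing.pool import ThreadPool


def slow_square(i: int) -> int:
    """Later inputs sleep less, so they finish *before* earlier ones."""
    time.sleep(0.05 * (4 - i))
    return i * i


with ThreadPool(processes=4) as pool:
    ordered = list(pool.imap(slow_square, range(4), chunksize=1))
    by_completion = list(pool.imap_unordered(slow_square, range(4), chunksize=1))

print(ordered)  # → [0, 1, 4, 9]  (input order, as with pool.imap in the notebook)
print(by_completion)  # typically [9, 4, 1, 0]  (completion order)
```

imap_unordered can start handing back results slightly earlier, but you would then need to carry an index with each task to map fitness values back to individuals, which is the bookkeeping imap avoids.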
Part 3: Run the Parallel Version
Instantiate the worker once, build the argument list, and hand both to pool.imap.
NUM_WORKERS = 8
worker = TargetedLocomotionWorker()
configs = [
    LocomotionConfig(
        spawn_position=SPAWN,
        target_position=TARGET,
        seed=SEED + i,
    )
    for i in range(POP_SIZE)
]
eval_args = list(zip(xml_strings, configs, strict=True))  # fail loudly if lengths differ
ctx = get_context("spawn")
t0 = time.perf_counter()
with ctx.Pool(processes=NUM_WORKERS) as pool:
    fitnesses_parallel = list(pool.imap(worker, eval_args, chunksize=1))
elapsed_parallel = time.perf_counter() - t0
console.print(f"Parallel | best={min(fitnesses_parallel):.3f} | mean={np.mean(fitnesses_parallel):.3f} | time={elapsed_parallel:.1f}s")
console.print(f"Speedup | {elapsed_serial / elapsed_parallel:.1f}×")
Parallel | best=1.863 | mean=1.969 | time=17.5s
Speedup | 3.2×
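The measured 3.2× is well below the 8× one might naively expect from eight workers. A back-of-the-envelope bound helps explain part of the gap: assuming every robot takes roughly the same time, 20 evaluations on 8 workers run in ceil(20 / 8) = 3 waves, so even a perfect pool cannot beat 20 / 3 ≈ 6.7×. Process start-up (each spawned worker re-imports its libraries) and unequal evaluation times plausibly account for much of the rest; that split is an estimate, not a measurement. wave_bound_speedup is an illustrative helper.

```python
import math


def wave_bound_speedup(num_tasks: int, num_workers: int) -> float:
    """Upper bound on speedup when every task takes the same time.

    The pool needs ceil(num_tasks / num_workers) rounds ("waves"), each as long
    as one serial evaluation, versus num_tasks evaluations in the serial loop.
    """
    waves = math.ceil(num_tasks / num_workers)
    return num_tasks / waves


bound = wave_bound_speedup(num_tasks=20, num_workers=8)
measured = 55.2 / 17.5  # serial and parallel wall times reported above
print(f"bound={bound:.1f}x  measured={measured:.1f}x  efficiency={measured / 8:.0%}")
# → bound=6.7x  measured=3.2x  efficiency=39%
```

Choosing POP_SIZE as a multiple of NUM_WORKERS removes the partly idle final wave under this equal-duration assumption.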
Part 4: Side-by-Side Diff
These are the only lines that differ between the serial and parallel versions.
# ── Serial ──────────────────────────────────────────────────────────────
def evaluate_robot_serial(xml: str, seed=None) -> float:
    model = mj.MjModel.from_xml_string(xml)  # manual model loading
    data = mj.MjData(model)
    # ... fitness logic ...
fitnesses = [evaluate_robot_serial(xml, seed=i) for i, xml in enumerate(xml_strings)]
# ── Parallel ────────────────────────────────────────────────────────────
+ from ariel.simulation.mujoco_worker import MuJoCoWorkerBase
+ from locomotion_worker import LocomotionConfig, TargetedLocomotionWorker  # see note on spawn
+ from multiprocessing import get_context
# TargetedLocomotionWorker is defined in locomotion_worker.py (not inline):
# worker processes need to import it from a file on disk
+ worker = TargetedLocomotionWorker()
+ eval_args = list(zip(xml_strings, configs))
+ ctx = get_context("spawn")
+ with ctx.Pool(processes=NUM_WORKERS) as pool:
+     fitnesses = list(pool.imap(worker, eval_args, chunksize=1))
Part 5: Tips and Common Pitfalls
- On platforms where spawn is the start method (the default on Windows and macOS, and forced here with get_context("spawn")), worker processes start as fresh Python interpreters. They import your class by module name; they cannot access the Jupyter kernel's __main__. Any class defined in a notebook cell will fail to unpickle in the worker. The fix is always the same: move the class to a .py file next to your notebook and import it.
- chunksize=1 lets each worker pick up a new task as soon as it finishes the previous one, rather than claiming a batch of tasks up front, which keeps all cores busy when evaluation times vary.
- The spawn context starts each worker as a completely fresh Python process, avoiding subtle corruption that can occur with fork when the parent already holds threads or an OpenGL context. This is why get_context("spawn") is used instead of multiprocessing.Pool directly.
- … EvalConfig.
- MuJoCoWorkerBase.__call__ limits PyTorch, OpenMP, MKL, and OpenBLAS to a single thread per worker before calling evaluate(). Without this, each worker would start its own thread pool, causing severe oversubscription: 8 workers × 8 OMP threads = 64 threads competing for the same cores, which leads to very low CPU utilisation and can make the parallel run slower than the serial one.
- Restrict Pool usage to the evaluation step.
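MuJoCoWorkerBase caps threads for you. If you write a worker that does not inherit from it, one common alternative is sketched below: set the standard thread-count environment variables in the parent before the pool starts, so that spawned children inherit them. The helper name cap_child_threads is hypothetical, and this is not necessarily how MuJoCoWorkerBase does it internally.

```python
import os

# Thread-count variables read at start-up by OpenMP, Intel MKL and OpenBLAS;
# PyTorch's CPU (intra-op) thread pool also honours OMP_NUM_THREADS / MKL_NUM_THREADS.
THREAD_ENV_VARS = ("OMP_NUM_THREADS", "MKL_NUM_THREADS", "OPENBLAS_NUM_THREADS")


def cap_child_threads(n: int = 1) -> None:
    """Hypothetical helper: cap threads in processes started *after* this call.

    Changing os.environ does not resize thread pools the current (parent)
    process has already created; child processes inherit the environment and
    read these variables when their numerical libraries initialise.
    """
    for var in THREAD_ENV_VARS:
        os.environ[var] = str(n)


cap_child_threads(1)
# ctx = get_context("spawn")
# with ctx.Pool(processes=NUM_WORKERS) as pool:  # spawned workers inherit the caps
#     ...
# Inside a worker, calling torch.set_num_threads(1) is a further, explicit safeguard.
```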