Parallelising ARIEL with Ray
Evaluating fitness is almost always the bottleneck in an EA: every individual has to be scored before selection can proceed, and evaluations are completely independent of each other, making them a perfect target for parallelisation.
This notebook starts from the serial Ackley example and walks through exactly what needs to change to distribute evaluation across all available CPU cores using Ray. The diff is small: roughly four lines added and one function rewritten.
Part 1: Serial Baseline
This is the original, unmodified Ackley setup. Run it first so you have a serial benchmark to compare against.
Imports
[ ]:
# Standard library
import random
import time
from typing import cast
# Third-party
import numpy as np
# Pretty errors and progress bars
from rich.console import Console
from rich.traceback import install
# ARIEL
from ariel.ec.a001 import Individual
from ariel.ec.a004 import EA, EASettings, EAStep, Population
from ariel.ec.a005 import Crossover
install()
console = Console()
Fitness function
[ ]:
def Ackley(x):
    """Source: https://www.sfu.ca/~ssurjano/ackley.html."""
    x = np.array(x)
    a, b, c = 20, 0.2, 2 * np.pi
    dimension = len(x)
    term1 = -a * np.exp(-b * np.sqrt(sum(x**2) / dimension))
    term2 = -np.exp(sum(np.cos(c * xi) for xi in x) / dimension)
    return term1 + term2 + a + np.exp(1)


def evaluate_ind(ind: Individual) -> float:
    return Ackley(cast("list[float]", ind.genotype))


def evaluate_pop_serial(population: Population) -> Population:
    """Serial evaluation: one individual at a time."""
    for ind in population:
        if ind.requires_eval:
            ind.fitness = evaluate_ind(ind)
    return population
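As a quick sanity check (added here, not part of the original run): Ackley's global minimum is at the origin, where the function value is 0, so a perfect optimiser would drive fitness to exactly 0.

```python
import numpy as np

def ackley(x):
    # Same formula as above (https://www.sfu.ca/~ssurjano/ackley.html)
    x = np.array(x)
    a, b, c = 20, 0.2, 2 * np.pi
    d = len(x)
    term1 = -a * np.exp(-b * np.sqrt(np.sum(x**2) / d))
    term2 = -np.exp(np.sum(np.cos(c * x)) / d)
    return term1 + term2 + a + np.e

print(abs(ackley([0.0] * 10)) < 1e-12)  # True: value 0 at the origin
print(ackley([1.0] * 10) > 0)           # True: positive away from the origin
```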
EA operators
[ ]:
SEED = None
RNG = np.random.default_rng(SEED)

config = EASettings()
config.is_maximisation = False
config.db_handling = "delete"
config.target_population_size = 100


def create_individual(num_dims: int) -> Individual:
    ind = Individual()
    ind.genotype = cast(
        "list[float]",
        RNG.normal(loc=0, scale=50, size=num_dims).tolist(),  # use the seeded generator
    )
    return ind
def parent_selection(population: Population) -> Population:
    """Tournament selection."""
    tournament_size = 5
    for ind in population:
        if ind.tags is None:
            ind.tags = {}
        ind.tags["ps"] = False
    num_parents = (len(population) // 2) * 2
    winners = []
    for _ in range(num_parents):
        competitors = [random.choice(population) for _ in range(tournament_size)]
        winner = (max if config.is_maximisation else min)(
            competitors, key=lambda ind: ind.fitness,
        )
        winners.append(winner)
    for w in winners:
        w.tags["ps"] = True
    return population
def crossover(population: Population) -> Population:
    """One-point crossover."""
    parents = [ind for ind in population if ind.tags.get("ps", False)]
    # Pair consecutive parents; stop before a trailing unpaired parent
    for idx in range(0, len(parents) - 1, 2):
        g_i, g_j = Crossover.one_point(
            cast("list[float]", parents[idx].genotype),
            cast("list[float]", parents[idx + 1].genotype),
        )
        for g in (g_i, g_j):
            child = Individual()
            child.genotype = g
            child.tags = {"mut": True}
            child.requires_eval = True
            population.append(child)
    return population
def mutation(population: Population) -> Population:
    for ind in population:
        if ind.tags.get("mut", False):
            genes = list(ind.genotype)
            if random.random() < 0.5:
                ind.genotype = [v + random.uniform(-5, 5) for v in genes]
    return population
def survivor_selection(population: Population) -> Population:
    tournament_size = 5
    pop_len = len(population)
    for _ in range(config.target_population_size):
        alive = [ind for ind in population if ind.alive]
        candidates = [random.choice(alive) for _ in range(tournament_size)]
        loser = (min if config.is_maximisation else max)(
            candidates, key=lambda ind: ind.fitness,
        )
        loser.alive = False
        pop_len -= 1
        if pop_len <= config.target_population_size:
            break
    return population
Run: serial
[ ]:
def run(evaluate_fn, label="serial") -> None:
    """Shared entry point used for both serial and parallel runs."""
    population = [create_individual(num_dims=10) for _ in range(100)]
    population = evaluate_fn(population)
    ops = [
        EAStep("parent_selection", parent_selection),
        EAStep("crossover", crossover),
        EAStep("mutation", mutation),
        EAStep("evaluation", evaluate_fn),
        EAStep("survivor_selection", survivor_selection),
    ]
    t0 = time.perf_counter()
    ea = EA(population, operations=ops, num_of_generations=100)
    ea.run()
    elapsed = time.perf_counter() - t0
    best = ea.get_solution("best", only_alive=False)
    console.log(f"[{label}] best fitness = {best.fitness_:.4f} | wall time = {elapsed:.2f}s")


run(evaluate_pop_serial, label="serial")
[15:48:10] Database file exists at d:\University\EC TA\ariel\docs\source\EA_intro\__data__\database.db! a004.py:105 Behaviour is set to: 'delete' --> ⚠️ Deleting file!
───────────────────────────────── EA Initialised ─────────────────────────────────
─────────────────────────────── EA Finished Running ───────────────────────────────
[15:48:31] best fitness = 4.3451 | wall time = 21.32s 4154822577.py:20
Part 2: Adding Parallelisation with Ray
The changes are entirely confined to the evaluation step. Every other operator (selection, crossover, mutation) stays identical.
| Change | Serial | Parallel |
|---|---|---|
| Extra import | – | ✓ |
| Initialise Ray | – | ✓ |
| Mark function for parallel execution | – | ✓ |
| Dispatch & collect results | `for` loop | `.remote()` + `ray.get()` |
| Shutdown (optional) | – | ✓ |
Step 1: Import Ray
One new import at the top of your file.
[9]:
# NEW: import ray alongside your other imports
import ray
Step 2: Initialise Ray
Call ray.init() once, before the EA runs. Ray starts a pool of worker processes (one per CPU core by default) that will handle the parallel tasks.
ray.init() # use all available cores
ray.init(num_cpus=4) # or cap at a specific number
[10]:
# NEW: start Ray (call once, before running the EA)
ray.init()
2026-03-10 15:49:04,640 INFO worker.py:2007 -- Started a local Ray instance.
Step 3: Decorate evaluate_ind with @ray.remote
Adding @ray.remote tells Ray that this function can be executed as an independent parallel task on any available worker. The function body is completely unchanged; only the decorator is new.
[11]:
# CHANGED: add @ray.remote; the body is identical to the serial version
@ray.remote
def evaluate_ind_remote(ind: Individual) -> float:
    return Ackley(cast("list[float]", ind.genotype))
Step 4: Replace the for loop with parallel dispatch
This is where the serial and parallel versions diverge. Instead of scoring individuals one at a time, we:

1. Call `.remote()` on each individual: this schedules the task and immediately returns a future (an `ObjectRef`) without blocking.
2. Pass all futures to `ray.get()` at once: this blocks until every task has finished and returns the results as a plain list, in the same order.
# BEFORE (serial)
for ind in population:
    if ind.requires_eval:
        ind.fitness = evaluate_ind(ind)

# AFTER (parallel)
to_eval = [ind for ind in population if ind.requires_eval]
futures = [evaluate_ind_remote.remote(ind) for ind in to_eval]  # schedule all
results = ray.get(futures)                                      # wait for all
for ind, fitness in zip(to_eval, results):
    ind.fitness = fitness
All evaluations run concurrently. ray.get only returns once the last one finishes, so the rest of the EA loop sees a fully-evaluated population, exactly as before.
[ ]:
# CHANGED: parallel evaluate_pop using Ray
def evaluate_pop_parallel(population: Population) -> Population:
    """Parallel evaluation: all individuals scored simultaneously via Ray."""
    to_eval = [ind for ind in population if ind.requires_eval]
    if not to_eval:
        return population
    # Schedule every evaluation as a non-blocking remote task
    futures = [evaluate_ind_remote.remote(ind) for ind in to_eval]
    # Block until all tasks are done, then collect results in order
    results = ray.get(futures)
    for ind, fitness in zip(to_eval, results, strict=True):
        ind.fitness = fitness
    return population
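The schedule-then-collect shape is the same as the standard library's `concurrent.futures` API, which works as a Ray-free mental model (an illustrative sketch only: Ray uses separate worker processes, while threads are used here just to show the future-based pattern):

```python
from concurrent.futures import ThreadPoolExecutor

def fitness(x: float) -> float:
    return x * x  # stand-in for a costly evaluation

xs = [1.0, 2.0, 3.0]
with ThreadPoolExecutor() as pool:
    # submit() schedules work and returns a Future, like .remote()
    futures = [pool.submit(fitness, x) for x in xs]
    # result() blocks until done; collecting in submit order, like ray.get()
    results = [f.result() for f in futures]

print(results)  # [1.0, 4.0, 9.0]
```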
Part 3: Run the Parallel Version
Pass evaluate_pop_parallel into the same run() helper. Everything else is identical. (With a fitness function as cheap as Ackley, Ray's per-task overhead can outweigh the parallel gain, as the wall times below show; see Part 5.)
[13]:
run(evaluate_pop_parallel, label="parallel")
# Tidy up Ray workers when you are done (optional)
ray.shutdown()
[15:49:35] Database file exists at d:\University\EC TA\ariel\docs\source\EA_intro\__data__\database.db! a004.py:105 Behaviour is set to: 'delete' --> ⚠️ Deleting file!
───────────────────────────────── EA Initialised ─────────────────────────────────
(raylet) [2026-03-10 15:49:38,384 E 14360 17780] (raylet.exe) main.cc:1032: Failed to establish connection to the metrics exporter agent. Metrics will not be exported. Exporter agent status: RpcError: Running out of retries to initialize the metrics agent. rpc_code: 14
─────────────────────────────── EA Finished Running ───────────────────────────────
[15:50:08] best fitness = 4.8189 | wall time = 32.37s 4154822577.py:20
Part 4: Side-by-Side Diff
These are the only lines that differ between the serial and parallel versions.
# ── Serial ──────────────────────────────────────────────────────────────────
def evaluate_ind(ind: Individual) -> float:
    return Ackley(cast("list[float]", ind.genotype))

def evaluate_pop_serial(population: Population) -> Population:
    for ind in population:
        if ind.requires_eval:
            ind.fitness = evaluate_ind(ind)
    return population

# ── Parallel ────────────────────────────────────────────────────────────────
+ import ray
+ ray.init()

+ @ray.remote
  def evaluate_ind_remote(ind: Individual) -> float:
      return Ackley(cast("list[float]", ind.genotype))  # body unchanged

  def evaluate_pop_parallel(population: Population) -> Population:
      to_eval = [ind for ind in population if ind.requires_eval]
      if not to_eval:
          return population
+     futures = [evaluate_ind_remote.remote(ind) for ind in to_eval]
+     results = ray.get(futures)
      for ind, fitness in zip(to_eval, results):
          ind.fitness = fitness
      return population
Part 5: Tips and Common Pitfalls
When is parallelisation worth it?
Ray adds a small fixed overhead to spin up workers and serialise data between processes. For very cheap fitness functions (like Ackley on 10 dimensions) the gain may be modest, or the overhead may even dominate: the parallel run above was slower than the serial one. The benefit grows with evaluation cost: MuJoCo simulation, neural-network training, or any physics-based fitness will see large speedups.
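A back-of-envelope model makes the trade-off concrete (the overhead and core count below are hypothetical, purely for illustration): serial time is roughly N·t for N evaluations of cost t, while parallel time is roughly N·o + N·t/C for C cores and per-task overhead o.

```python
def serial_time(n: int, t: float) -> float:
    return n * t  # n evaluations, t seconds each

def parallel_time(n: int, t: float, cores: int, overhead: float) -> float:
    # per-task dispatch/serialisation overhead, work spread across cores
    return n * overhead + n * t / cores

n, cores, overhead = 100, 8, 1e-3  # hypothetical: 1 ms overhead per task
for t in (1e-4, 1e-2, 1.0):  # cheap, moderate, expensive fitness
    s, p = serial_time(n, t), parallel_time(n, t, cores, overhead)
    winner = "parallel" if p < s else "serial"
    print(f"t={t:g}s: serial={s:.3f}s, parallel={p:.3f}s -> {winner} wins")
```

With these illustrative numbers the cheap function favours serial, matching the notebook's own timings, while the more expensive ones favour parallel.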
Keep ``evaluate_ind`` stateless.
Ray workers are separate processes; they do not share memory with the main process. Write evaluation functions that depend only on their arguments and avoid mutating global state.
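As an illustration (a sketch with made-up scoring, not ARIEL code), compare a function that mutates module-level state with one that takes everything it needs as arguments:

```python
# Anti-pattern: relies on module-level state. Each Ray worker process gets
# its own copy of this variable, so increments never propagate back to the driver.
counter = 0

def evaluate_bad(genotype: list[float]) -> float:
    global counter
    counter += 1
    return sum(g * g for g in genotype)

# Stateless: everything the evaluation needs arrives as an argument,
# so the function behaves identically in any worker process.
def evaluate_good(genotype: list[float], penalty_weight: float = 0.1) -> float:
    score = sum(g * g for g in genotype)
    return score + penalty_weight * len(genotype)

print(evaluate_good([1.0, 2.0]))  # 5.2
```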
Only parallelise evaluation.
Selection, crossover, and mutation operate on the shared population list sequentially in ARIEL by design. Parallelising them adds synchronisation overhead with very little benefit. Confine Ray usage to `evaluate_pop`.
Call ``ray.init()`` once.
Calling `ray.init()` when Ray is already running will raise a warning or error; you can guard against this with `ray.is_initialized()`. If you need to restart (e.g. to change `num_cpus`), call `ray.shutdown()` first.
Installation.
Ray is included in the ARIEL environment by default, so no extra install step is needed.