Parallelising ARIEL with Ray

Evaluating fitness is almost always the bottleneck in an EA — every individual has to be scored before selection can proceed, and evaluations are completely independent of each other, making them a perfect target for parallelisation.

This notebook starts from the serial Ackley example and walks through exactly what needs to change to distribute evaluation across all available CPU cores using Ray. The diff is small — roughly four lines added and one function rewritten.

Part 1: Serial Baseline

This is the original, unmodified Ackley setup. Run it first so you have a serial benchmark to compare against.

Imports

[ ]:
# Standard library
import random
import time
from typing import cast

# Third-party
import numpy as np

# Pretty errors and progress bars
from rich.console import Console
from rich.traceback import install

# ARIEL
from ariel.ec.a001 import Individual
from ariel.ec.a004 import EA, EASettings, EAStep, Population
from ariel.ec.a005 import Crossover

install()
console = Console()

Fitness function

[ ]:
def Ackley(x):
    """source: https://www.sfu.ca/~ssurjano/ackley.html."""
    x = np.array(x)
    a, b, c = 20, 0.2, 2 * np.pi
    dimension = len(x)
    term1 = -a * np.exp(-b * np.sqrt(sum(x**2) / dimension))
    term2 = -np.exp(sum(np.cos(c * xi) for xi in x) / dimension)
    return term1 + term2 + a + np.exp(1)


def evaluate_ind(ind: Individual) -> float:
    return Ackley(cast("list[float]", ind.genotype))


def evaluate_pop_serial(population: Population) -> Population:
    """Serial evaluation β€” one individual at a time."""
    for ind in population:
        if ind.requires_eval:
            ind.fitness = evaluate_ind(ind)
    return population
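
As a quick sanity check before benchmarking: Ackley's global minimum is 0 at the origin, and every other point scores higher. The snippet below repeats the same definition so it runs standalone:

```python
import numpy as np

def Ackley(x):
    """Ackley function; global minimum f(0, ..., 0) = 0."""
    x = np.array(x)
    a, b, c = 20, 0.2, 2 * np.pi
    dimension = len(x)
    term1 = -a * np.exp(-b * np.sqrt(sum(x**2) / dimension))
    term2 = -np.exp(sum(np.cos(c * xi) for xi in x) / dimension)
    return term1 + term2 + a + np.exp(1)

print(Ackley([0.0] * 10))   # ~0.0, up to floating-point rounding
print(Ackley([1.0] * 10))   # ~3.63, worse (this is a minimisation problem)
```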

EA operators

[ ]:
SEED = None
RNG = np.random.default_rng(SEED)

config = EASettings()
config.is_maximisation = False
config.db_handling = "delete"
config.target_population_size = 100


def create_individual(num_dims) -> Individual:
    ind = Individual()
    # Use the seeded RNG defined above so SEED actually controls reproducibility
    ind.genotype = cast(
        "list[float]",
        RNG.normal(loc=0, scale=50, size=num_dims).tolist(),
    )
    return ind


def parent_selection(population: Population) -> Population:
    """Tournament selection."""
    tournament_size = 5
    for ind in population:
        if ind.tags is None:
            ind.tags = {}
        ind.tags["ps"] = False
    num_parents = (len(population) // 2) * 2
    winners = []
    for _ in range(num_parents):
        competitors = [random.choice(population) for _ in range(tournament_size)]
        winner = (max if config.is_maximisation else min)(
            competitors, key=lambda ind: ind.fitness,
        )
        winners.append(winner)
    for w in winners:
        w.tags["ps"] = True
    return population


def crossover(population: Population) -> Population:
    """One-point crossover."""
    parents = [ind for ind in population if ind.tags.get("ps", False)]
    for idx in range(0, len(parents), 2):
        g_i, g_j = Crossover.one_point(
            cast("list[float]", parents[idx].genotype),
            cast("list[float]", parents[idx + 1].genotype),
        )
        for g in (g_i, g_j):
            child = Individual()
            child.genotype = g
            child.tags = {"mut": True}
            child.requires_eval = True
            population.append(child)
    return population


def mutation(population: Population) -> Population:
    for ind in population:
        if ind.tags.get("mut", False):
            genes = list(ind.genotype)
            if random.random() < 0.5:
                ind.genotype = [v + random.uniform(-5, 5) for v in genes]
    return population


def survivor_selection(population: Population) -> Population:
    tournament_size = 5
    pop_len = len(population)
    for _ in range(config.target_population_size):
        alive = [ind for ind in population if ind.alive]
        candidates = [random.choice(alive) for _ in range(tournament_size)]
        loser = (min if config.is_maximisation else max)(
            candidates, key=lambda ind: ind.fitness,
        )
        loser.alive = False
        pop_len -= 1
        if pop_len <= config.target_population_size:
            break
    return population

Run — serial

[ ]:
def run(evaluate_fn, label="serial") -> None:
    """Shared entry point used for both serial and parallel runs."""
    population = [create_individual(num_dims=10) for _ in range(100)]
    population = evaluate_fn(population)

    ops = [
        EAStep("parent_selection", parent_selection),
        EAStep("crossover", crossover),
        EAStep("mutation", mutation),
        EAStep("evaluation", evaluate_fn),
        EAStep("survivor_selection", survivor_selection),
    ]

    t0 = time.perf_counter()
    ea = EA(population, operations=ops, num_of_generations=100)
    ea.run()
    elapsed = time.perf_counter() - t0

    best = ea.get_solution("best", only_alive=False)
    console.log(f"[{label}]  best fitness = {best.fitness_:.4f}  |  wall time = {elapsed:.2f}s")


run(evaluate_pop_serial, label="serial")
[15:48:10] Database file exists at d:\University\EC TA\ariel\docs\source\EA_intro\__data__\database.db! a004.py:105
           Behaviour is set to: 'delete' --> ⚠️  Deleting file!                                                     
───────────────────────────────────────────────── EA Initialised ──────────────────────────────────────────────────
─────────────────────────────────────────────── EA Finished Running ───────────────────────────────────────────────
[15:48:31]   best fitness = 4.3451  |  wall time = 21.32s                                          4154822577.py:20

Part 2: Adding Parallelisation with Ray

The changes are entirely confined to the evaluation step. Every other operator — selection, crossover, mutation — stays identical.

Change                                 Serial     Parallel
─────────────────────────────────────  ─────────  ─────────────────────────
Extra import                           —          import ray
Initialise Ray                         —          ray.init()
Mark function for parallel execution   —          @ray.remote decorator
Dispatch & collect results             for loop   ray.get(ray.remote calls)
Shutdown (optional)                    —          ray.shutdown()

Step 1 — Import Ray

One new import at the top of your file.

[9]:
# NEW ── import ray alongside your other imports
import ray

Step 2 — Initialise Ray

Call ray.init() once, before the EA runs. Ray starts a pool of worker processes (one per CPU core by default) that will handle the parallel tasks.

ray.init()                  # use all available cores
ray.init(num_cpus=4)        # or cap at a specific number
[10]:
# NEW ── start Ray (call once, before running the EA)
ray.init()
2026-03-10 15:49:04,640 INFO worker.py:2007 -- Started a local Ray instance.
d:\University\EC TA\ariel\.venv\Lib\site-packages\ray\_private\worker.py:2046: FutureWarning: Tip: In future versions of Ray, Ray will no longer override accelerator visible devices env var if num_gpus=0 or num_gpus=None (default). To enable this behavior and turn off this error message, set RAY_ACCEL_ENV_VAR_OVERRIDE_ON_ZERO=0
  warnings.warn(
[10]:
(pid=gcs_server) [2026-03-10 15:49:23,440 E 10480 10040] (gcs_server.exe) gcs_server.cc:303: Failed to establish connection to the event+metrics exporter agent. Events and metrics will not be exported. Exporter agent status: RpcError: Running out of retries to initialize the metrics agent. rpc_code: 14

Step 3 — Decorate evaluate_ind with @ray.remote

Adding @ray.remote tells Ray that this function can be executed as an independent parallel task on any available worker. The function body is completely unchanged — only the decorator is new.

[11]:
# CHANGED ── add @ray.remote; the body is identical to the serial version
@ray.remote
def evaluate_ind_remote(ind: Individual) -> float:
    return Ackley(cast("list[float]", ind.genotype))

Step 4 — Replace the for loop with parallel dispatch

This is where the serial and parallel versions diverge. Instead of scoring individuals one at a time, we:

  1. Call .remote() on each individual — this schedules the task and immediately returns a future (an ObjectRef) without blocking.

  2. Pass all futures to ray.get() at once — this blocks until every task has finished and returns the results as a plain list, in the same order.

# BEFORE (serial)
for ind in population:
    if ind.requires_eval:
        ind.fitness = evaluate_ind(ind)

# AFTER (parallel)
to_eval  = [ind for ind in population if ind.requires_eval]
futures  = [evaluate_ind_remote.remote(ind) for ind in to_eval]  # schedule all
results  = ray.get(futures)                                        # wait for all
for ind, fitness in zip(to_eval, results):
    ind.fitness = fitness

All evaluations run concurrently. ray.get only returns once the last one finishes, so the rest of the EA loop sees a fully-evaluated population, exactly as before.

[ ]:
# CHANGED ── parallel evaluate_pop using Ray
def evaluate_pop_parallel(population: Population) -> Population:
    """Parallel evaluation β€” all individuals scored simultaneously via Ray."""
    to_eval = [ind for ind in population if ind.requires_eval]
    if not to_eval:
        return population

    # Schedule every evaluation as a non-blocking remote task
    futures = [evaluate_ind_remote.remote(ind) for ind in to_eval]

    # Block until all tasks are done, then collect results in order
    results = ray.get(futures)

    for ind, fitness in zip(to_eval, results, strict=False):
        ind.fitness = fitness

    return population
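
If you want to experiment with the dispatch-then-collect pattern without installing Ray, the same shape can be sketched with the standard library's concurrent.futures (the evaluate stand-in below is illustrative): submit every task first, then collect the results, which come back in submission order just as with ray.get.

```python
from concurrent.futures import ThreadPoolExecutor

def evaluate(genotype):
    # Illustrative stand-in for a fitness function
    return sum(g * g for g in genotype)

population = [[3.0], [1.0, 2.0], [0.5, 0.5]]

with ThreadPoolExecutor() as pool:
    # Schedule everything first (non-blocking), then collect
    futures = [pool.submit(evaluate, g) for g in population]
    results = [f.result() for f in futures]

print(results)  # [9.0, 5.0, 0.5], in submission order
```

Note that Python threads only help when the fitness function releases the GIL (heavy NumPy or simulation calls); Ray's separate worker processes sidestep this, which is why they suit CPU-bound evaluation.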

Part 3: Run the Parallel Version

Pass evaluate_pop_parallel into the same run() helper. Everything else is identical.

[13]:
run(evaluate_pop_parallel, label="parallel")

# Tidy up Ray workers when you are done (optional)
ray.shutdown()
[15:49:35] Database file exists at d:\University\EC TA\ariel\docs\source\EA_intro\__data__\database.db! a004.py:105
           Behaviour is set to: 'delete' --> ⚠️  Deleting file!                                                     
───────────────────────────────────────────────── EA Initialised ──────────────────────────────────────────────────
─────────────────────────────────────────────── EA Finished Running ───────────────────────────────────────────────
[15:50:08]   best fitness = 4.8189  |  wall time = 32.37s                                          4154822577.py:20

Part 4: Side-by-Side Diff

These are the only lines that differ between the serial and parallel versions.

# ── Serial ───────────────────────────────────────────────────────────────────

def evaluate_ind(ind: Individual) -> float:
    return Ackley(cast("list[float]", ind.genotype))

def evaluate_pop_serial(population: Population) -> Population:
    for ind in population:
        if ind.requires_eval:
            ind.fitness = evaluate_ind(ind)
    return population


# ── Parallel ─────────────────────────────────────────────────────────────────
+ import ray
+ ray.init()

+ @ray.remote
def evaluate_ind_remote(ind: Individual) -> float:
    return Ackley(cast("list[float]", ind.genotype))   # body unchanged

def evaluate_pop_parallel(population: Population) -> Population:
    to_eval = [ind for ind in population if ind.requires_eval]
    if not to_eval:
        return population
+   futures = [evaluate_ind_remote.remote(ind) for ind in to_eval]
+   results = ray.get(futures)
    for ind, fitness in zip(to_eval, results):
        ind.fitness = fitness
    return population

Part 5: Tips and Common Pitfalls

When is parallelisation worth it?

  • Ray adds a small fixed overhead to spin up workers and serialise data between processes. For very cheap fitness functions (like Ackley on 10 dimensions) the gain may be modest. The benefit grows with evaluation cost — MuJoCo simulation, neural-network training, or any physics-based fitness will see large speedups.
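
A rough back-of-envelope model makes that trade-off concrete (the overhead numbers below are purely illustrative, not measured Ray costs):

```python
def estimated_speedup(t_eval: float, n: int, cores: int, overhead: float) -> float:
    """Idealised speedup of parallel over serial evaluation.

    t_eval   -- seconds per fitness evaluation
    n        -- individuals evaluated per generation
    cores    -- number of parallel workers
    overhead -- per-task scheduling/serialisation cost (illustrative)
    """
    serial_time = t_eval * n
    parallel_time = (t_eval + overhead) * n / cores
    return serial_time / parallel_time

# Cheap fitness (like 10-D Ackley): overhead dominates; parallel can be slower.
print(estimated_speedup(t_eval=0.0005, n=100, cores=8, overhead=0.005))  # ~0.73

# Expensive fitness (e.g. a physics simulation): speedup approaches core count.
print(estimated_speedup(t_eval=2.0, n=100, cores=8, overhead=0.005))     # ~7.98
```

That first regime is exactly what the timings above show: 10-dimensional Ackley is cheap enough that the parallel run ended up slower than the serial one.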

Keep ``evaluate_ind`` stateless.

  • Ray workers are separate processes; they do not share memory with the main process. Write evaluation functions that depend only on their arguments and avoid mutating global state.
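
Relatedly, anything passed to or returned from a remote task must be serialisable, since Ray ships arguments and results between processes. Ray uses its own serialiser, but the standard pickle module is a reasonable litmus test; Candidate below is a hypothetical stand-in for ARIEL's Individual:

```python
import pickle

class Candidate:
    """Hypothetical stand-in for an EA individual."""
    def __init__(self, genotype):
        self.genotype = genotype

ind = Candidate([0.5, -1.2, 3.3])

# Round-trip through pickle: roughly what happens when an individual
# is handed to a worker process and the result is shipped back.
restored = pickle.loads(pickle.dumps(ind))
print(restored.genotype)  # [0.5, -1.2, 3.3]
```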

Only parallelise evaluation.

  • Selection, crossover, and mutation operate on the shared population list sequentially in ARIEL by design. Parallelising them adds synchronisation overhead with very little benefit. Confine Ray usage to evaluate_pop.

Call ``ray.init()`` once.

  • Calling ray.init() when Ray is already running will raise a warning or error. If you need to restart (e.g. to change num_cpus), call ray.shutdown() first.

Installation.

  • Ray is included in the ARIEL environment by default, so no separate installation step is needed.
