Executor framework¶
Marin's executor framework manages the execution of experiments. This document is more about the mechanics, read this to learn more about the conventions.
Steps¶
An experiment is a sequence (really, a DAG) of steps, where each step is
specified by the following:
- name: an identifier describing the function (and its version)
- function: a normal Python function, or a function wrapped with remote() to run as a distributed Fray sub-job (which enables massive parallelism across the cluster)
- config: the single argument to the function, which is a dataclass; fields of the config can refer to previous steps.
A key decision in Marin is that data gets passed between steps by reading and writing to the filesystem. The rationale is two-fold: - For very large datasets, where efficiency and robustness is a concern, we give the steps full control over serialization and deserialization. - It makes the intermediate state completely transparent, and one can do things like monitor the state while it's being created (e.g., a jsonl file).
In particular, each step associated with an output path where that step writes its output (in any format). When a step A references another step B in its config, that step simply resolves to step B's output path, and step A is responsible for reading the data from that output path. The name of the output path includes the step name and a hash of the config (at least the part of it that's explicitly versioned) and all its dependencies.
In the hello world example, we have two steps, generating data and compute statistics.
See the documentation in executor.py for more details.
Coordination between multiple pipelines is handled via lease files. This prevents duplicate execution if, for example, 2 Executor pipelines share common ancestor steps.
Mirrored inputs¶
Some datasets live in a specific regional bucket (e.g.
gs://marin-us-central2/documents/stackexchange/...) but experiments may run
from any region. The mirrored() wrapper marks an input path for
cross-region mirroring so that the executor copies the data to the local
marin prefix before the step runs.
from marin.execution.executor import mirrored, versioned
step = ExecutorStep(
name="train",
fn=my_training_fn,
config=TrainConfig(
dataset=mirrored(versioned("documents/stackexchange/v1"), budget_gb=50),
),
)
At config instantiation time, mirrored() rewrites the path to use the
mirror:// protocol. When the step's function opens the path via fsspec,
the MirrorFileSystem transparently copies data from whichever regional bucket
has it into the local marin prefix, respecting the per-path transfer budget.
Key details:
budget_gb(default 10) caps how much data (in GB) a single step may copy cross-region. The budget is enforced via themirror_budgetcontext manager fromrigging.filesystem.- Paths that already exist in the local prefix are not re-copied.
mirrored()can wrap plain strings orVersionedValue/InputNamereferences.- To adjust the global mirror budget default, set the
MARIN_MIRROR_BUDGET_GBenvironment variable before the process starts.
Distributed execution (Fray + Iris)¶
Each step's function runs either directly in the executor driver process or, when the step requests accelerators or a special environment, as a separate Fray sub-job. Fray is Marin's abstraction for job/task scheduling; on shared infrastructure it dispatches to Iris.
Steps can declare environment extras via remote():
- Default packages: installed into the driver job from
pyproject.toml (fsspec, draccus, etc.).
- Step-specific packages: remote() accepts pip_dependency_groups — a list
of either (1) a key from project.optional-dependencies in
lib/marin/pyproject.toml (e.g., rl), or (2) a specific pip
package. Each step runs in its own environment without interfering with others.
For example, to install the dependencies specified in the
rl extra and also uv pip install google-cloud-logging,
one can do:
number_of_restarts = ExecutorStep(
name=...,
fn=remote(my_fn, pip_dependency_groups=["rl", "google-cloud-logging"]),
config=...,
)
To launch an experiment on the shared Iris cluster, submit the executor script
as the entrypoint of a CPU-only Iris job. The script then uses executor_main
to spawn the accelerated sub-jobs via Fray:
uv run iris --cluster=marin job run --cpu=1 --memory=2G --extra=cpu \
-- python -m experiments.tutorials.hello_world
See lib/iris/OPS.md
for the full Iris CLI reference, including --no-wait, log streaming, and
job lifecycle commands.
Submitting training jobs¶
The standard recipe — default_train(...) to build an ExecutorStep, then
executor_main(steps=[...]) — produces a step the driver walks and dispatches
to a worker. The worker reads the already-resolved trainer config and starts
training.
For runs where you would rather have the executor walk happen inside the
training worker, use experiments.defaults.train(...) instead. It builds a
Levanter config with OutputName / InputName placeholders intact, then
submits a single Iris training job that resolves the entire chain on the
worker — compute_output_path, resolve_local_placeholders, the checkpointer
bake, run-id imputation, and materialize. Every path comes from the
worker's regional marin_prefix(), so a cross-region preempt-and-resume
writes checkpoints in the new region and re-tokenizes locally instead of
dragging data back across regions.
Use default_train + executor_main when you want a normal experiment
graph; use train for one-shot launches and sweep trials (see the
sweep tutorial) where each child job
should resolve its own dependencies in-region.
Agents can use the
add-datasetskill for a guide to dataset schema inspection and addition.