Executor API

This is the API documentation for the executor framework.

Executor Entrypoint

executor_main

executor_main(
    config: ExecutorMainConfig,
    steps: list[ExecutorStep],
    description: str | None = None,
    max_concurrent: int | None = None,
)

Main entry point for experiments, intended to standardize how experiment scripts are run.

Parameters:

  • config (ExecutorMainConfig) –

    Parsed CLI config (draccus). Carries --dry_run, --max_concurrent, etc.

  • steps (list[ExecutorStep]) –

    Steps to execute.

  • description (str | None, default: None ) –

    Optional human-readable description recorded in executor info.

  • max_concurrent (int | None, default: None ) –

    Programmatic cap on concurrent step execution. If provided, takes precedence over config.max_concurrent. Use this to express "run all of these with cap N" inside an experiment script without requiring users to pass --max_concurrent N on the CLI.

ExecutorMainConfig dataclass

Attributes
prefix class-attribute instance-attribute
prefix: str | None = None

Prefix attached to every output path that the executor constructs (e.g., the GCS bucket).

executor_info_base_path class-attribute instance-attribute
executor_info_base_path: str | None = None

Base path under which the executor info is stored, in a file whose name is determined by a hash.

dry_run class-attribute instance-attribute
dry_run: bool = False
force_run_failed class-attribute instance-attribute
force_run_failed: bool = True
run_only class-attribute instance-attribute
run_only: list[str] | None = None

Run these steps (matched by regex.search) and their dependencies only. If None, run all steps.

max_concurrent class-attribute instance-attribute
max_concurrent: int | None = None

Maximum number of steps to run concurrently. If None, run all ready steps in parallel (default).

Executor and Steps

Executor

Executes a pipeline of ExecutorSteps in two phases:

  1. Instantiate the output_path of each ExecutorStep from the prefix and the names and versions of everything involved.
  2. Run each ExecutorStep in topological order, so that every step runs after its dependencies.
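The dependency-respecting run order can be illustrated with the standard library's graphlib. This is a sketch over a hypothetical name-to-dependencies mapping, not the Executor's own scheduling code.

```python
from graphlib import TopologicalSorter


def execution_order(deps: dict[str, list[str]]) -> list[str]:
    """Return step names ordered so that every step follows all of its dependencies."""
    # TopologicalSorter expects a mapping of node -> predecessors and raises
    # graphlib.CycleError if the graph has a cycle.
    return list(TopologicalSorter(deps).static_order())


# Hypothetical pipeline: step name -> steps it depends on.
deps = {"eval": ["train"], "train": ["tokenize"], "tokenize": ["download"], "download": []}
print(execution_order(deps))  # ['download', 'tokenize', 'train', 'eval']
```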

Attributes
prefix instance-attribute
prefix = prefix
executor_info_base_path instance-attribute
executor_info_base_path = executor_info_base_path
description instance-attribute
description = description
configs instance-attribute
configs: dict[ExecutorStep, dataclass] = {}
dependencies instance-attribute
dependencies: dict[ExecutorStep, list[ExecutorStep]] = {}
versions instance-attribute
versions: dict[ExecutorStep, dict[str, Any]] = {}
is_pseudo_dep instance-attribute
is_pseudo_dep: dict[ExecutorStep, bool] = {}
version_strs instance-attribute
version_strs: dict[ExecutorStep, str] = {}
version_str_to_step instance-attribute
version_str_to_step: dict[str, ExecutorStep] = {}
hashed_versions instance-attribute
hashed_versions: dict[ExecutorStep, str] = {}
output_paths instance-attribute
output_paths: dict[ExecutorStep, str] = {}
steps instance-attribute
steps: list[ExecutorStep] = []
step_infos instance-attribute
step_infos: list[ExecutorStepInfo] = []
executor_info instance-attribute
executor_info: ExecutorInfo | None = None
Functions
run
run(
    steps: list[ExecutorStep | InputName],
    *,
    dry_run: bool = False,
    run_only: list[str] | None = None,
    force_run_failed: bool = True,
    max_concurrent: int | None = None
) -> dict[ExecutorStep, str]

Run the pipeline of ExecutorSteps.

Parameters:

  • steps (list[ExecutorStep | InputName]) –

    The steps to run.

  • dry_run (bool, default: False ) –

    If True, walk the step graph and log what would run, without touching remote filesystems or launching any work.

  • run_only (list[str] | None, default: None ) –

    If not None, only run the steps in the list and their dependencies. Entries are matched against step names as regular expressions.

  • force_run_failed (bool, default: True ) –

    If True, run steps even if they have already been run, including steps whose previous run failed.

  • max_concurrent (int | None, default: None ) –

    Maximum number of steps to run concurrently. If None, run all ready steps in parallel.

Returns:

  • dict[ExecutorStep, str]

compute_version
compute_version(step: ExecutorStep, is_pseudo_dep: bool)
canonicalize
canonicalize(step: ExecutorStep) -> ExecutorStep

Multiple instances of ExecutorStep might have the same version; this returns a single canonical instance for that version.

get_infos
get_infos()

Computes the info for each step and for the entire execution.

get_experiment_url
get_experiment_url() -> str

Return the URL where the experiment can be viewed.

write_infos
write_infos()

Output JSON files (one for the entire execution, one for each step).

ExecutorStep dataclass

Bases: Generic[ConfigT_co]

An ExecutorStep represents a single step of a larger pipeline (e.g., transforming HTML to text). It is specified by:

  • a name (str), which is used to determine the output_path;
  • a function fn (Callable);
  • a configuration config, which is passed to fn;
  • a list of pip dependencies (Optional[list[str]]) required by the step. These can be keys of project.optional-dependencies in the project's pyproject.toml file or any other pip package.

When a step is run, we compute the following two things for it:

  • version: represents all the upstream dependencies of the step;
  • output_path: the path where the step's output is stored, based on the name and a hash of the version.

The config is a dataclass object that may recursively contain special values of the following forms:

  • InputName(step, name): a dependency on another step; resolves to step.output_path / name.
  • OutputName(name): resolves to output_path / name.
  • VersionedValue(value): a value that should be part of the version.

During execution, the config is instantiated by replacing these special values with the actual paths.

Note: step: ExecutorStep is interpreted as InputName(step, None).

Attributes
name instance-attribute
name: str
fn instance-attribute
fn: ExecutorFunction
config instance-attribute
config: ConfigT_co
description class-attribute instance-attribute
description: str | None = None
override_output_path class-attribute instance-attribute
override_output_path: str | None = None

Specifies the output_path that should be used. Prints a warning if it doesn't match the automatically computed one.

resources class-attribute instance-attribute
resources: ResourceConfig | None = None

If set, this step is submitted as its own Fray job using these resources. fn is invoked inside the submitted job.

If None, behavior is determined by fn: a RemoteCallable submits as a Fray job; a plain callable runs inline in-process.

Functions
cd
cd(name: str) -> InputName

Refer to the name under self's output_path.

with_output_path
with_output_path(output_path: str) -> ExecutorStep

Return a copy of the step with the given output_path.

as_input_name
as_input_name() -> InputName

Inputs and Outputs

InputName dataclass

To be interpreted as a previous step's output_path joined with name.

Attributes
step instance-attribute
step: ExecutorStep | None
name instance-attribute
name: str | None
block_on_step class-attribute instance-attribute
block_on_step: bool = True

If False, the step that uses this InputName will not block on (or attempt to execute) step. We use this to document dependencies in the config when that step might not have finished yet.

For instance, we sometimes use training checkpoints before the training step has finished.

These "pseudo-dependencies" still impact the hash of the step, but they don't block execution.
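The split between blocking dependencies and hash-only pseudo-dependencies can be sketched as below. Dep and split_dependencies are hypothetical simplifications of InputName and its block_on_step flag.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Dep:
    """Hypothetical simplification of InputName: an upstream step plus block_on_step."""
    step_name: str
    block_on_step: bool = True


def split_dependencies(deps: list[Dep]) -> tuple[list[str], list[str]]:
    """Return (steps to block on, steps that feed into the version hash)."""
    blocking = [d.step_name for d in deps if d.block_on_step]
    hashed = [d.step_name for d in deps]  # pseudo-dependencies still count for the hash
    return blocking, hashed


# Hypothetical eval step reading a checkpoint from a training run that may still be running.
blocking, hashed = split_dependencies([Dep("tokenize"), Dep("train", block_on_step=False)])
print(blocking)  # ['tokenize']
print(hashed)  # ['tokenize', 'train']
```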

Functions
cd
cd(name: str) -> InputName
hardcoded staticmethod
hardcoded(path: str) -> InputName

Sometimes we want to specify a path that is not part of the pipeline but is still relative to the prefix. Try to use this sparingly.

nonblocking
nonblocking() -> InputName

The step will not block on (or attempt to execute) the parent step.

(Note that if another step depends on the parent step, it will still block on it.)

output_path_of

output_path_of(
    step: ExecutorStep, name: str | None = None
) -> InputName

this_output_path

this_output_path(name: str | None = None)

OutputName dataclass

To be interpreted as this step's output_path joined with name.

Attributes
name instance-attribute
name: str | None

THIS_OUTPUT_PATH module-attribute

THIS_OUTPUT_PATH = OutputName(None)

Versioning

VersionedValue dataclass

Bases: Generic[T_co]

Wraps a value, to signal that this value (part of a config) should be part of the version.

Attributes
value instance-attribute
value: T_co

versioned

versioned(value: T_co) -> VersionedValue[T_co]

ensure_versioned

ensure_versioned(
    value: VersionedValue[T_co] | T_co,
) -> VersionedValue[T_co]

Ensure that the value is wrapped in a VersionedValue. If it is already wrapped, return it as is.

unwrap_versioned_value

unwrap_versioned_value(
    value: VersionedValue[T_co] | T_co,
) -> T_co

Unwrap the value if it is a VersionedValue, otherwise return the value as is.

Recurses into dataclasses, dicts and lists to unwrap any nested VersionedValue instances. This method cannot handle InputName, OutputName, or ExecutorStep instances inside VersionedValue as their values depend on execution results.
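A sketch of ensure_versioned and of the recursive unwrapping described above, using a simplified local VersionedValue stand-in. unwrap and OptimizerConfig are hypothetical names; like the documented function, this sketch does not try to resolve InputName, OutputName or ExecutorStep values and simply leaves any other object untouched.

```python
from dataclasses import dataclass, fields, is_dataclass, replace
from typing import Any, Generic, TypeVar

T = TypeVar("T")


@dataclass(frozen=True)
class VersionedValue(Generic[T]):
    """Simplified local stand-in for the executor's VersionedValue."""
    value: T


def ensure_versioned(value: Any) -> VersionedValue:
    """Wrap value unless it is already a VersionedValue."""
    return value if isinstance(value, VersionedValue) else VersionedValue(value)


def unwrap(value: Any) -> Any:
    """Strip VersionedValue wrappers, recursing into dataclasses, dicts and lists."""
    if isinstance(value, VersionedValue):
        return unwrap(value.value)
    if is_dataclass(value) and not isinstance(value, type):
        return replace(value, **{f.name: unwrap(getattr(value, f.name)) for f in fields(value)})
    if isinstance(value, dict):
        return {k: unwrap(v) for k, v in value.items()}
    if isinstance(value, list):
        return [unwrap(v) for v in value]
    return value


@dataclass
class OptimizerConfig:  # hypothetical config
    lr: Any
    schedule: Any


cfg = OptimizerConfig(
    lr=VersionedValue(3e-4),
    schedule={"warmup": VersionedValue(1000), "decay": [VersionedValue("cosine")]},
)
print(unwrap(cfg))
# OptimizerConfig(lr=0.0003, schedule={'warmup': 1000, 'decay': ['cosine']})
```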

get_executor_step

get_executor_step(
    run: ExecutorStep | InputName,
) -> ExecutorStep

Helper function to extract the ExecutorStep from an InputName or ExecutorStep.

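Parameters:

  • run (ExecutorStep | InputName) –

    An ExecutorStep, or an InputName whose step should be extracted.

Returns:

  • ExecutorStep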

Config Walking and Materialization

Helpers for inspecting and resolving placeholder-bearing configs without running a full executor pipeline. compute_output_path and resolve_local_placeholders run on the submitter; materialize is the worker-side counterpart that submits any embedded ExecutorSteps and substitutes resolved paths into the config.

walk_config

walk_config(obj: Any) -> Iterator[_Event]

Yield one event per InputName / VersionedValue placeholder reached while recursively walking dataclasses, lists, and dicts inside obj.

Bare ExecutorSteps are normalized to InputName(step, None) events. MirroredValues recurse into their inner value (no event of their own).

resolve_local_placeholders

resolve_local_placeholders(
    config: ConfigT, output_path: str
) -> ConfigT

Resolve every placeholder that the caller can resolve locally: OutputName substitutions and VersionedValue unwrapping.

InputName(step=…) and bare ExecutorStep references are deferred for the worker's materialize call (which resolves them under the worker's region). MirroredValue is preserved (rebuilt around its recursed inner value); its meaning is region-aware so resolution belongs on the worker.

compute_output_path

compute_output_path(
    name: str,
    config: Any,
    *,
    override_output_path: str | None = None,
    prefix: str | None = None
) -> str

Compute the concrete output path a step with this name+config will produce.

Drives Executor.compute_version (which walks the config's dependency graph and hashes versioned values — no GCS I/O, no job submission) far enough to populate the resulting output path. Honors override_output_path if provided. Otherwise resolves prefix from marin_prefix() and derives the path from name + a hash of the config's versioned values, matching Executor's scheme so a step run via Executor.run and a path computed here agree on the same value.

materialize

materialize(
    config: ConfigT,
    *,
    prefix: str | None = None,
    output_path: str | None = None
) -> ConfigT

Run any ExecutorSteps embedded in config, then return a copy of config with all placeholder paths substituted.

Composes three pieces:

  1. upstream_steps(config) — find embedded ExecutorSteps.
  2. Executor(prefix=...).run(steps) — submit them as sub-jobs and block on completion.
  3. instantiate_config(config, output_path=<resolved>, output_paths=executor.output_paths, prefix=prefix) — substitute InputName / OutputName / VersionedValue / ExecutorStep placeholders using the just-computed paths.

Parameters:

  • config (ConfigT) –

    A launcher config dataclass that may embed ExecutorSteps and placeholder values.

  • prefix (str | None, default: None ) –

    Storage prefix for newly-submitted sub-jobs. Defaults to marin_prefix() (the worker's regional gs://marin-{R} bucket), so upstream data is co-located with training.

  • output_path (str | None, default: None ) –

    Concrete output path for the current step, used to resolve OutputName(name=...) placeholders inside config. If None, materialize reads config.output_path. For callers whose config type does not expose output_path, pass it explicitly.

Returns:

  • ConfigT

    A copy of config with all placeholders resolved to concrete paths. A config containing no placeholders round-trips unchanged (idempotent: no sub-jobs are submitted).