Executor API
This is the API documentation for the executor framework.
Executor Entrypoint
executor_main
executor_main(
config: ExecutorMainConfig,
steps: list[ExecutorStep],
description: str | None = None,
max_concurrent: int | None = None,
)
Main entry point for experiments, intended as the standard way to launch them.
Parameters:
- config (ExecutorMainConfig) – Parsed CLI config (draccus). Carries --dry_run, --max_concurrent, etc.
- steps (list[ExecutorStep]) – Steps to execute.
- description (str | None, default: None) – Optional human-readable description recorded in executor info.
- max_concurrent (int | None, default: None) – Programmatic cap on concurrent step execution. If provided, takes precedence over config.max_concurrent. Use this to express "run all of these with cap N" inside an experiment script without requiring users to pass --max_concurrent N on the CLI.
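The precedence rule for max_concurrent amounts to "the programmatic value if given, otherwise the CLI value". A minimal sketch, assuming both values are optional integers and that None means no cap; resolve_max_concurrent is a hypothetical helper, not part of the documented API:

```python
from typing import Optional


def resolve_max_concurrent(
    programmatic: Optional[int], cli_value: Optional[int]
) -> Optional[int]:
    """Hypothetical helper: pick the effective concurrency cap.

    The max_concurrent argument passed to executor_main wins when provided;
    otherwise fall back to config.max_concurrent from the CLI.
    None means no cap.
    """
    if programmatic is not None:
        return programmatic
    return cli_value


# An experiment script hard-codes a cap of 4; the CLI value is ignored.
print(resolve_max_concurrent(4, 16))     # 4
# No programmatic cap: the --max_concurrent value from the CLI applies.
print(resolve_max_concurrent(None, 16))  # 16
```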
ExecutorMainConfig (dataclass)
Attributes
prefix (class-attribute, instance-attribute)
prefix: str | None = None
Prepended to every output path that is constructed (e.g., the GCS bucket).
executor_info_base_path (class-attribute, instance-attribute)
executor_info_base_path: str | None = None
Base path under which the executor info is stored, in a file whose name is determined by a hash.
run_only (class-attribute, instance-attribute)
Run only these steps (matched by regex.search) and their dependencies. If None, run all steps.
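The run_only rule (regex match, then pull in dependencies) can be sketched with a plain dict mapping step names to the names they depend on, standing in for real ExecutorStep objects. select_steps and the example graph are hypothetical:

```python
import re
from typing import Optional


def select_steps(
    deps: dict[str, list[str]], run_only: Optional[list[str]]
) -> set[str]:
    """Return the names of steps to run: regex matches plus their dependencies.

    deps maps each step name to the names of the steps it depends on.
    """
    if run_only is None:
        return set(deps)  # None means run every step
    selected = {
        name for name in deps if any(re.search(p, name) for p in run_only)
    }
    # Walk dependencies transitively so every selected step can actually run.
    stack = list(selected)
    while stack:
        for dep in deps[stack.pop()]:
            if dep not in selected:
                selected.add(dep)
                stack.append(dep)
    return selected


graph = {
    "download": [],
    "tokenize": ["download"],
    "train-small": ["tokenize"],
    "train-large": ["tokenize"],
    "eval-small": ["train-small"],
}
print(sorted(select_steps(graph, ["^train-small$"])))
# ['download', 'tokenize', 'train-small']
```

Because matching uses re.search, a bare pattern such as train selects every step whose name contains that substring; anchor it with ^ and $ to select exactly one step.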
Executor and Steps
Executor
Performs the execution of a pipeline of ExecutorSteps.
1. Instantiate all the output_paths for each ExecutorStep based on prefix, names, and versions of everything.
2. Run each ExecutorStep in a proper topological sort order.
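Step 2 can be illustrated with the standard library's graphlib. This is a sketch of topological ordering in general, using plain string names in place of ExecutorStep objects; it is not the Executor's own scheduling code:

```python
from graphlib import TopologicalSorter

# Stand-in step graph: each key depends on the steps in its list.
deps = {
    "download": [],
    "tokenize": ["download"],
    "train": ["tokenize"],
    "eval": ["train", "tokenize"],
}

# TopologicalSorter takes a mapping from node to predecessors and yields
# nodes so that every dependency comes before the steps that need it.
order = list(TopologicalSorter(deps).static_order())
print(order)  # ['download', 'tokenize', 'train', 'eval']
```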
Functions
run
run(
steps: list[ExecutorStep | InputName],
*,
dry_run: bool = False,
run_only: list[str] | None = None,
force_run_failed: bool = True,
max_concurrent: int | None = None
) -> dict[ExecutorStep, str]
Run the pipeline of ExecutorSteps.
Parameters:
- steps (list[ExecutorStep | InputName]) – The steps to run.
- dry_run (bool, default: False) – If True, walk the step graph and log what would run, without touching remote filesystems or launching any work.
- run_only (list[str] | None, default: None) – If not None, only run the steps in the list and their dependencies. Step names are matched against the entries as regular expressions.
- force_run_failed (bool, default: True) – If True, run steps even if they have already been run (including if they failed).
- max_concurrent (int | None, default: None) – Maximum number of steps to run concurrently. If None, run all ready steps in parallel.
Returns:
- dict[ExecutorStep, str] – Mapping from every known ExecutorStep (including transitive dependencies discovered while walking steps) to its concrete output path.
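To make the max_concurrent semantics concrete, here is a minimal local sketch that combines dependency order with a concurrency cap, assuming steps are plain Python callables run on a thread pool. The real Executor may submit steps as remote jobs instead; run_graph and fake_step are hypothetical names:

```python
import threading
import time
from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait
from graphlib import TopologicalSorter
from typing import Optional


def run_graph(deps: dict[str, list[str]], max_concurrent: Optional[int]) -> int:
    """Run a toy step graph, never exceeding max_concurrent steps at once.

    Returns the peak number of steps observed running simultaneously.
    """
    sorter = TopologicalSorter(deps)
    sorter.prepare()  # raises graphlib.CycleError on cyclic graphs
    lock = threading.Lock()
    running = 0
    peak = 0

    def fake_step(name: str) -> str:
        nonlocal running, peak
        with lock:
            running += 1
            peak = max(peak, running)
        time.sleep(0.05)  # stand-in for the step's real work
        with lock:
            running -= 1
        return name

    # None means "no cap": give every step its own worker.
    workers = max_concurrent if max_concurrent is not None else max(len(deps), 1)
    in_flight: dict[Future, str] = {}
    with ThreadPoolExecutor(max_workers=workers) as pool:
        while sorter.is_active():
            # Submit every step whose dependencies have all finished.
            for name in sorter.get_ready():
                in_flight[pool.submit(fake_step, name)] = name
            done, _ = wait(in_flight, return_when=FIRST_COMPLETED)
            for fut in done:
                sorter.done(in_flight.pop(fut))
    return peak


independent = {f"shard-{i}": [] for i in range(4)}
print(run_graph(independent, max_concurrent=2) <= 2)  # True
```

A step is only submitted once all of its dependencies are marked done, so a chain never runs more than one step at a time regardless of the cap.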
canonicalize
canonicalize(step: ExecutorStep) -> ExecutorStep
Multiple instances of ExecutorStep might have the same version.
ExecutorStep (dataclass)
Bases: Generic[ConfigT_co]
An ExecutorStep represents a single step of a larger pipeline (e.g., transforming HTML to text). It is specified by:
- a name (str), which is used to determine the output_path;
- a function fn (Callable);
- a configuration config, which gets passed into fn;
- a list of pip dependencies (Optional[list[str]]) required for the step. These can be keys of project.optional-dependencies in the project's pyproject.toml file or any other pip package.
When a step is run, we compute the following two things for each step:
- version: represents all the upstream dependencies of the step.
- output_path: the path where the output of the step is stored, based on the name and a hash of the version.
The config is a dataclass object that may recursively contain special values of the following forms:
- InputName(step, name): a dependency on another step; resolves to step.output_path / name.
- OutputName(name): resolves to output_path / name.
- VersionedValue(value): a value that should be part of the version.
The config is instantiated during execution by replacing these special values with the actual paths (or, for VersionedValue, with the wrapped value).
Note: a bare step: ExecutorStep is interpreted as InputName(step, None).
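The substitution rules above can be sketched with minimal stand-in classes. Everything here is hypothetical: the classes carry only the fields needed, a dict stands in for the config dataclass for brevity, and the gs:// paths are made-up values:

```python
import posixpath
from dataclasses import dataclass
from typing import Any, Optional


# Minimal stand-ins for the real classes; only the fields needed here.
@dataclass(frozen=True)
class Step:
    name: str
    output_path: str


@dataclass(frozen=True)
class InputName:
    step: Step
    name: Optional[str]


@dataclass(frozen=True)
class OutputName:
    name: str


@dataclass(frozen=True)
class VersionedValue:
    value: Any


def instantiate(obj: Any, output_path: str) -> Any:
    """Replace placeholders with concrete values, recursing into dicts and lists."""
    if isinstance(obj, Step):  # a bare step means InputName(step, None)
        obj = InputName(obj, None)
    if isinstance(obj, InputName):
        base = obj.step.output_path
        return base if obj.name is None else posixpath.join(base, obj.name)
    if isinstance(obj, OutputName):
        return posixpath.join(output_path, obj.name)
    if isinstance(obj, VersionedValue):
        return instantiate(obj.value, output_path)
    if isinstance(obj, dict):
        return {k: instantiate(v, output_path) for k, v in obj.items()}
    if isinstance(obj, list):
        return [instantiate(v, output_path) for v in obj]
    return obj


tokenize = Step("tokenize", "gs://bucket/tokenize-abc123")
config = {
    "train_data": InputName(tokenize, "train"),
    "val_data": tokenize,
    "checkpoints": OutputName("checkpoints"),
    "lr": VersionedValue(3e-4),
}
resolved = instantiate(config, "gs://bucket/train-def456")
print(resolved["train_data"])   # gs://bucket/tokenize-abc123/train
print(resolved["checkpoints"])  # gs://bucket/train-def456/checkpoints
```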
Attributes
override_output_path (class-attribute, instance-attribute)
override_output_path: str | None = None
Specifies the output_path that should be used. A warning is printed if it doesn't match the automatically computed one.
resources (class-attribute, instance-attribute)
resources: ResourceConfig | None = None
If set, this step is submitted as its own Fray job using these
resources. fn is invoked inside the submitted job.
If None, behavior is determined by fn: a RemoteCallable
submits as a Fray job; a plain callable runs inline in-process.
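The dispatch rule reads as a three-way decision. As a sketch, with local stand-ins for ResourceConfig and RemoteCallable (their real definitions are not shown in this reference) and a string result in place of actually submitting anything:

```python
from typing import Any, Callable, Optional


class ResourceConfig:
    """Stand-in for the real ResourceConfig; its fields are omitted here."""


class RemoteCallable:
    """Stand-in marker: wraps a function that should run as a remote job."""

    def __init__(self, fn: Callable[..., Any]) -> None:
        self.fn = fn

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        return self.fn(*args, **kwargs)


def dispatch_mode(fn: Callable[..., Any], resources: Optional[ResourceConfig]) -> str:
    """Return where a step's fn would run under the documented rules."""
    if resources is not None:
        return "fray-job"  # step-level resources: submit as its own job
    if isinstance(fn, RemoteCallable):
        return "fray-job"  # fn itself asks to be submitted remotely
    return "inline"  # plain callable: run in the current process


def tokenize(cfg: dict) -> None:  # hypothetical step function
    pass


print(dispatch_mode(tokenize, ResourceConfig()))      # fray-job
print(dispatch_mode(RemoteCallable(tokenize), None))  # fray-job
print(dispatch_mode(tokenize, None))                  # inline
```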
Functions
with_output_path
with_output_path(output_path: str) -> ExecutorStep
Return a copy of the step with the given output_path.
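A copy-with-one-field-changed method on a dataclass is typically written with dataclasses.replace. The sketch below assumes, without confirmation from this reference, that the new path is stored in override_output_path; Step is a hypothetical stand-in with only two fields:

```python
from dataclasses import dataclass, replace
from typing import Optional


@dataclass(frozen=True)
class Step:  # stand-in with only the fields this example needs
    name: str
    override_output_path: Optional[str] = None

    def with_output_path(self, output_path: str) -> "Step":
        # dataclasses.replace builds a new instance; the original is untouched.
        return replace(self, override_output_path=output_path)


base = Step("tokenize")
pinned = base.with_output_path("gs://bucket/tokenized/fixed")
print(base.override_output_path)    # None
print(pinned.override_output_path)  # gs://bucket/tokenized/fixed
```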
Inputs and Outputs
InputName (dataclass)
To be interpreted as a previous step's output_path joined with name.
Attributes
block_on_step (class-attribute, instance-attribute)
block_on_step: bool = True
If False, the step that uses this InputName will not block on (or attempt to execute) step. We use this to document dependencies in the config on steps that might not have finished yet. For instance, we sometimes use training checkpoints before the training step has finished.
These "pseudo-dependencies" still impact the hash of the step, but they don't block execution.
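The split between "affects the version" and "must finish first" can be sketched as follows. InputName here is a hypothetical stand-in that uses a step name string instead of an ExecutorStep, and split_dependencies is not a library function:

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class InputName:  # stand-in: only the fields used below
    step: str  # upstream step name, in place of a real ExecutorStep
    name: Optional[str] = None
    block_on_step: bool = True


def split_dependencies(inputs: list[InputName]) -> tuple[list[str], list[str]]:
    """Return (steps that affect the version, steps that must finish first)."""
    versioned = [i.step for i in inputs]  # every reference feeds the hash
    blocking = [i.step for i in inputs if i.block_on_step]
    return versioned, blocking


inputs = [
    InputName("tokenize", "train"),
    # Read a checkpoint while "train" may still be running.
    InputName("train", "checkpoints/step-1000", block_on_step=False),
]
versioned, blocking = split_dependencies(inputs)
print(versioned)  # ['tokenize', 'train']
print(blocking)   # ['tokenize']
```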
Functions
hardcoded (staticmethod)
Sometimes we want to specify a path that is not part of the pipeline but is still relative to the prefix. Try to use this sparingly.
OutputName (dataclass)
Versioning
VersionedValue (dataclass)
ensure_versioned
ensure_versioned(
value: VersionedValue[T_co] | T_co,
) -> VersionedValue[T_co]
Ensure that the value is wrapped in a VersionedValue. If it is already wrapped, return it as is.
unwrap_versioned_value
unwrap_versioned_value(
value: VersionedValue[T_co] | T_co,
) -> T_co
Unwrap the value if it is a VersionedValue, otherwise return the value as is.
Recurses into dataclasses, dicts and lists to unwrap any nested VersionedValue instances. This function cannot handle InputName, OutputName, or ExecutorStep instances inside a VersionedValue, because their values depend on execution results.
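The wrap-once and recursive-unwrap behavior described for ensure_versioned and unwrap_versioned_value can be sketched as toy re-implementations (wrap_versioned and unwrap_all are hypothetical names, not the library source; VersionedValue and TrainConfig are local stand-ins):

```python
import dataclasses
from dataclasses import dataclass
from typing import Any, Generic, TypeVar, Union

T = TypeVar("T")


@dataclass(frozen=True)
class VersionedValue(Generic[T]):  # stand-in for the real class
    value: T


def wrap_versioned(value: Union[VersionedValue[T], T]) -> VersionedValue[T]:
    """Wrap value once; an already-wrapped value is returned unchanged."""
    if isinstance(value, VersionedValue):
        return value
    return VersionedValue(value)


def unwrap_all(value: Any) -> Any:
    """Strip VersionedValue wrappers, recursing into dataclasses, dicts and lists."""
    # Check VersionedValue first: it is itself a dataclass.
    if isinstance(value, VersionedValue):
        return unwrap_all(value.value)
    if dataclasses.is_dataclass(value) and not isinstance(value, type):
        changes = {f.name: unwrap_all(getattr(value, f.name)) for f in dataclasses.fields(value)}
        return dataclasses.replace(value, **changes)
    if isinstance(value, dict):
        return {k: unwrap_all(v) for k, v in value.items()}
    if isinstance(value, list):
        return [unwrap_all(v) for v in value]
    return value


@dataclass(frozen=True)
class TrainConfig:  # hypothetical config
    lr: Any
    tags: Any


cfg = TrainConfig(lr=wrap_versioned(3e-4), tags={"size": VersionedValue("small")})
print(wrap_versioned(cfg.lr) is cfg.lr)  # True: no double wrapping
print(unwrap_all(cfg))  # TrainConfig(lr=0.0003, tags={'size': 'small'})
```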
get_executor_step
get_executor_step(
run: ExecutorStep | InputName,
) -> ExecutorStep
Helper function to extract the ExecutorStep from an InputName or ExecutorStep.
Parameters:
- run (ExecutorStep | InputName) – The input to extract the step from.
Returns:
- ExecutorStep – The extracted step.
Config Walking and Materialization
Helpers for inspecting and resolving placeholder-bearing configs without
running a full executor pipeline. compute_output_path and
resolve_local_placeholders run on the submitter; materialize is the
worker-side counterpart that submits any embedded ExecutorSteps and
substitutes resolved paths into the config.
walk_config
Yield one event per InputName / VersionedValue placeholder reached
while recursively walking dataclasses, lists, and dicts inside obj.
Bare ExecutorSteps are normalized to InputName(step, None) events.
MirroredValues recurse into their inner value (no event of their own).
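A generator that follows these walking rules might look like the sketch below. It assumes an "event" is simply the placeholder object itself; the real walk_config's event type is not shown here, and all classes are hypothetical stand-ins:

```python
from dataclasses import dataclass, fields, is_dataclass
from typing import Any, Iterator, Optional


@dataclass(frozen=True)
class Step:  # stand-in for ExecutorStep
    name: str


@dataclass(frozen=True)
class InputName:  # stand-in
    step: Step
    name: Optional[str] = None


@dataclass(frozen=True)
class VersionedValue:  # stand-in
    value: Any


@dataclass(frozen=True)
class MirroredValue:  # stand-in: only the inner value matters here
    value: Any


def walk_placeholders(obj: Any) -> Iterator[Any]:
    """Yield each InputName / VersionedValue placeholder found inside obj."""
    # The placeholder classes are dataclasses too, so test them first.
    if isinstance(obj, Step):
        yield InputName(obj, None)  # bare steps are normalized
    elif isinstance(obj, (InputName, VersionedValue)):
        yield obj
    elif isinstance(obj, MirroredValue):
        yield from walk_placeholders(obj.value)  # no event for the wrapper
    elif is_dataclass(obj) and not isinstance(obj, type):
        for f in fields(obj):
            yield from walk_placeholders(getattr(obj, f.name))
    elif isinstance(obj, dict):
        for v in obj.values():
            yield from walk_placeholders(v)
    elif isinstance(obj, list):
        for v in obj:
            yield from walk_placeholders(v)


tok = Step("tokenize")
config = {"data": [tok, InputName(tok, "val")], "lr": MirroredValue(VersionedValue(1e-3))}
for event in walk_placeholders(config):
    print(event)
# InputName(step=Step(name='tokenize'), name=None)
# InputName(step=Step(name='tokenize'), name='val')
# VersionedValue(value=0.001)
```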
resolve_local_placeholders
resolve_local_placeholders(
config: ConfigT, output_path: str
) -> ConfigT
Resolve every placeholder that the caller can resolve locally:
OutputName substitutions and VersionedValue unwrapping.
InputName(step=…) and bare ExecutorStep references are deferred
for the worker's materialize call (which resolves them under the
worker's region). MirroredValue is preserved (rebuilt around its
recursed inner value); its meaning is region-aware so resolution belongs
on the worker.
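The submitter/worker split can be sketched by resolving only what needs no upstream results and passing InputName through untouched. The classes and resolve_local are hypothetical stand-ins (MirroredValue is omitted for brevity), and the path is a made-up value:

```python
import posixpath
from dataclasses import dataclass
from typing import Any, Optional


@dataclass(frozen=True)
class InputName:  # stand-in; step is just a name here
    step: str
    name: Optional[str] = None


@dataclass(frozen=True)
class OutputName:  # stand-in
    name: str


@dataclass(frozen=True)
class VersionedValue:  # stand-in
    value: Any


def resolve_local(obj: Any, output_path: str) -> Any:
    """Resolve what the submitter can; leave InputName for the worker."""
    if isinstance(obj, OutputName):
        return posixpath.join(output_path, obj.name)
    if isinstance(obj, VersionedValue):
        return resolve_local(obj.value, output_path)
    if isinstance(obj, dict):
        return {k: resolve_local(v, output_path) for k, v in obj.items()}
    if isinstance(obj, list):
        return [resolve_local(v, output_path) for v in obj]
    return obj  # InputName and plain values pass through unchanged


config = {
    "data": InputName("tokenize", "train"),
    "out": OutputName("checkpoints"),
    "lr": VersionedValue(3e-4),
}
local = resolve_local(config, "gs://bucket/train-def456")
print(local["out"])   # gs://bucket/train-def456/checkpoints
print(local["data"])  # InputName(step='tokenize', name='train')
```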
compute_output_path
compute_output_path(
name: str,
config: Any,
*,
override_output_path: str | None = None,
prefix: str | None = None
) -> str
Compute the concrete output path a step with this name+config will produce.
Drives Executor.compute_version (which walks the config's dependency
graph and hashes versioned values — no GCS I/O, no job submission) far
enough to populate the resulting output path. Honors override_output_path
if provided. Otherwise resolves prefix from marin_prefix() and
derives the path from name + a hash of the config's versioned values,
matching Executor's scheme so a step run via Executor.run and a
path computed here agree on the same value.
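The key property is determinism: the same name and versioned values always hash to the same path. The sketch below assumes a hypothetical scheme of <prefix>/<name>-<6 hex chars of SHA-256 over sorted JSON>; the real Executor's hash function, serialization and path layout are not specified in this reference:

```python
import hashlib
import json
from typing import Any, Optional


def toy_output_path(
    name: str,
    versioned_values: dict[str, Any],
    prefix: str,
    override_output_path: Optional[str] = None,
) -> str:
    """Hypothetical scheme: <prefix>/<name>-<first 6 hex chars of a SHA-256>."""
    if override_output_path is not None:
        return override_output_path  # an explicit path always wins
    # sort_keys makes the JSON, and therefore the hash, independent of dict order.
    blob = json.dumps(versioned_values, sort_keys=True).encode("utf-8")
    digest = hashlib.sha256(blob).hexdigest()[:6]
    return f"{prefix}/{name}-{digest}"


v1 = {"lr": 3e-4, "data": "gs://bucket/tokenize-abc123"}
v2 = {"data": "gs://bucket/tokenize-abc123", "lr": 3e-4}  # same content, other order
same = toy_output_path("train", v1, "gs://bucket") == toy_output_path("train", v2, "gs://bucket")
print(same)  # True
print(toy_output_path("train", v1, "gs://bucket", override_output_path="gs://bucket/custom"))
# gs://bucket/custom
```

Because nothing in this computation touches storage or submits jobs, it can run on the submitter and still agree with the path a later run would use, provided both sides use the same scheme.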
materialize
materialize(
config: ConfigT,
*,
prefix: str | None = None,
output_path: str | None = None
) -> ConfigT
Run any ExecutorSteps embedded in config, then return a copy of
config with all placeholder paths substituted.
Composes three pieces:
1. upstream_steps(config): find embedded ExecutorSteps.
2. Executor(prefix=...).run(steps): submit them as sub-jobs and block on completion.
3. instantiate_config(config, output_path=<resolved>, output_paths=executor.output_paths, prefix=prefix): substitute InputName / OutputName / VersionedValue / ExecutorStep placeholders using the just-computed paths.
Parameters:
- config (ConfigT) – A launcher config dataclass that may embed ExecutorSteps and placeholder values.
- prefix (str | None, default: None) – Storage prefix for newly-submitted sub-jobs. Defaults to marin_prefix() (the worker's regional gs://marin-{R} bucket), so upstream data is co-located with training.
- output_path (str | None, default: None) – Concrete output path for the current step, used to resolve OutputName(name=...) placeholders inside config. If None, materialize reads config.output_path. For callers whose config type does not expose output_path, pass it explicitly.
Returns:
- ConfigT – A copy of config with all placeholders substituted to concrete paths. A config containing no placeholders round-trips unchanged (idempotent: no sub-jobs are submitted).
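The find / run / substitute composition can be sketched end to end with stand-ins. In this toy, no jobs are submitted (fake_run just builds a path under the prefix), a dict replaces the launcher config dataclass, and Step, upstream_steps, fake_run, substitute and toy_materialize are all hypothetical:

```python
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class Step:  # stand-in for ExecutorStep
    name: str


def upstream_steps(obj: Any) -> list[Step]:
    """Find Step objects embedded in dicts and lists."""
    if isinstance(obj, Step):
        return [obj]
    if isinstance(obj, dict):
        return [s for v in obj.values() for s in upstream_steps(v)]
    if isinstance(obj, list):
        return [s for v in obj for s in upstream_steps(v)]
    return []


def fake_run(steps: list[Step], prefix: str) -> dict[Step, str]:
    """Pretend to run each step; return its output path."""
    return {s: f"{prefix}/{s.name}" for s in steps}


def substitute(obj: Any, paths: dict[Step, str]) -> Any:
    """Swap each embedded Step for its output path."""
    if isinstance(obj, Step):
        return paths[obj]
    if isinstance(obj, dict):
        return {k: substitute(v, paths) for k, v in obj.items()}
    if isinstance(obj, list):
        return [substitute(v, paths) for v in obj]
    return obj


def toy_materialize(config: Any, prefix: str) -> Any:
    steps = upstream_steps(config)    # 1. find embedded steps
    if not steps:
        return config                 # nothing to run: round-trips unchanged
    paths = fake_run(steps, prefix)   # 2. "run" them and collect output paths
    return substitute(config, paths)  # 3. substitute concrete paths


config = {"train_data": Step("tokenize"), "lr": 3e-4}
print(toy_materialize(config, "gs://example-bucket")["train_data"])
# gs://example-bucket/tokenize
```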