workflow - Orchestration
The workflow module provides orchestration of multi-step computations
with caching and reproducibility. This is Layer 2 of the architecture.
Key Concepts
Step: A single unit of work that wires a compute model’s fields to/from named state slots.
Workflow: An ordered list of Step instances with input/output caching (modelInputs/modelOutputs).
State: A shared, named key-value store passed between steps. Two backends: State (plain dict) and StateWithGlueDb (SQLite-backed via GlueDb).
Runner: A thin wrapper that assembles a Workflow and State (as instances or python:// refs) and runs them.
Workflow Class
Workflow Orchestration
This module provides the Workflow class for orchestrating multi-step computations with caching and reproducibility. Workflows manage state between steps and can skip steps when inputs haven’t changed.
Key Concepts
Workflow: A sequence of Steps with input/output caching
Step: A single unit of work with defined inputs and outputs
State: Shared data store that persists between steps
Example
>>> from pyswark.workflow.workflow import Workflow
>>> from pyswark.workflow.step import Step
>>> from pyswark.workflow.state import State
>>>
>>> workflow = Workflow(steps=[
...     Step(model='mymodule.PreprocessModel',
...          inputs={'raw_data': 'data'},
...          outputs={'processed': 'clean_data'}),
...     Step(model='mymodule.AnalysisModel',
...          inputs={'clean_data': 'data'},
...          outputs={'result': 'analysis'})
... ])
>>>
>>> state = State()
>>> state.post(raw_dataframe, name='raw_data')
>>> result = workflow.run(state)
- class pyswark.workflow.workflow.Extracts(*, data: dict[int, dict] = <factory>)
Bases: BaseModel
Container for cached model inputs or outputs.
Used internally by Workflow to store and compare inputs/outputs for skip logic.
- add(i, extract)
- data: dict[int, dict]
- equal(i: int, other)
- get(i)
- model_config: ClassVar[ConfigDict] = {'extra': 'forbid'}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
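The role Extracts plays in skip logic can be pictured with a plain-Python stand-in. This is only a minimal sketch, not pyswark's implementation: the class name ExtractsSketch and the use of plain dict equality for the comparison are assumptions made for illustration.

```python
class ExtractsSketch:
    """Hypothetical stand-in that caches one dict per step index."""

    def __init__(self):
        self.data = {}  # step index -> cached dict of inputs or outputs

    def add(self, i, extract):
        # Store a shallow copy so later rebinding of keys does not alter the cache
        self.data[i] = dict(extract)

    def get(self, i):
        return self.data.get(i)

    def equal(self, i, other):
        # True only when step i has a cached entry that equals `other`
        return i in self.data and self.data[i] == other


cache = ExtractsSketch()
cache.add(0, {"data": [1, 2, 3]})

unchanged = cache.equal(0, {"data": [1, 2, 3]})   # same input as cached
changed = cache.equal(0, {"data": [1, 2, 4]})     # input differs
never_seen = cache.equal(1, {"data": [1, 2, 3]})  # step 1 has no cache entry
```

Under these assumptions, a step is a skip candidate only when a cached entry exists for its index and compares equal to the current input.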
- class pyswark.workflow.workflow.Workflow(*, steps: list[pyswark.workflow.step.Step], modelInputs: ~pyswark.workflow.workflow.Extracts = <factory>, modelOutputs: ~pyswark.workflow.workflow.Extracts = <factory>, useExtracts: bool = True, populateExtracts: bool = True, stepsSkipped: list[int] = <factory>, stepsRan: list[int] = <factory>)
Bases: BaseModel
Orchestrates a sequence of computational steps with caching.
A Workflow manages the execution of multiple Steps, passing data between them via a shared State object. It supports caching of inputs/outputs to skip steps when inputs haven’t changed.
- Parameters:
steps (list[Step]) – The sequence of steps to execute.
useExtracts (bool, default=True) – Whether to use cached inputs/outputs for skip logic.
populateExtracts (bool, default=True) – Whether to store inputs/outputs for future runs.
- stepsSkipped
Indices of steps that were skipped (cached).
- Type:
list[int]
- stepsRan
Indices of steps that were executed.
- Type:
list[int]
Example
>>> workflow = Workflow(steps=[step1, step2, step3])
>>> state = State()
>>> state.post(my_data, name='input_data')
>>> result = workflow.run(state)
- addStep(step: Step)
Add a step to the workflow.
- Parameters:
step (Step or dict) – A Step instance or dictionary with step configuration.
- getModel(i: int)
- getModelInput(i: int)
- getModelOutput(i: int)
- model_config: ClassVar[ConfigDict] = {'extra': 'forbid'}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- populateExtracts: bool
- run(state: State)
Execute all steps in the workflow.
Steps are executed in order, with each step’s outputs posted to the shared State. Steps may be skipped if their inputs match cached values from previous runs.
- Parameters:
state (State) – The shared state object containing input data.
- Returns:
The output from the final step.
- Return type:
Any
- steps: list[pyswark.workflow.step.Step]
- stepsRan: list[int]
- stepsSkipped: list[int]
- useExtracts: bool
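The skip behaviour described for run() can be illustrated with a small stand-alone loop. This is a sketch under stated assumptions rather than the library's code: steps are modelled as hypothetical (input_name, output_name, function) tuples, the state is a plain dict, and the cache is a pair of dicts keyed by step index.

```python
def run_with_cache(steps, state, cached_inputs, cached_outputs, ran, skipped):
    """Run steps in order, reusing the cached output when a step's input is unchanged."""
    result = None
    for i, (name_in, name_out, fn) in enumerate(steps):
        current_input = state[name_in]
        if i in cached_inputs and cached_inputs[i] == current_input:
            result = cached_outputs[i]        # input unchanged: reuse cached output
            skipped.append(i)
        else:
            result = fn(current_input)        # input new or changed: execute the step
            cached_inputs[i] = current_input
            cached_outputs[i] = result
            ran.append(i)
        state[name_out] = result              # make the output visible to later steps
    return result


steps = [
    ("raw", "clean", lambda xs: [x for x in xs if x is not None]),
    ("clean", "total", sum),
]
state = {"raw": [1, None, 2]}
cached_inputs, cached_outputs, ran, skipped = {}, {}, [], []

first = run_with_cache(steps, state, cached_inputs, cached_outputs, ran, skipped)
second = run_with_cache(steps, state, cached_inputs, cached_outputs, ran, skipped)
```

In this sketch the ran and skipped lists are passed in and appended to, so they accumulate across calls, which mirrors how stepsRan and stepsSkipped are described as behaving.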
Step Class
Workflow Step
This module defines the Step class, the fundamental unit of work in a workflow. Each Step specifies a model to run and mappings between workflow state and model inputs/outputs.
- class pyswark.workflow.step.Step(*, model: str, inputs: Dict[str, str], outputs: Dict[str, str])
Bases: BaseModel
A single unit of work in a Workflow.
A Step defines:
A model (class) to instantiate and run
Input mappings: state names → model input names
Output mappings: model output names → state names
- Parameters:
model (str) – Fully-qualified Python path to the model class (e.g., "mymodule.MyModel").
inputs (dict[str, str]) – Mapping from state variable names to model input names. {state_name: model_input_name}.
outputs (dict[str, str]) – Mapping from model output names to state variable names. {model_output_name: state_name}.
Example
>>> step = Step(
...     model='mymodule.PreprocessModel',
...     inputs={'raw_data': 'data'},      # state 'raw_data' → model 'data'
...     outputs={'processed': 'clean'}    # model 'processed' → state 'clean'
... )
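The direction of the two mappings is easy to mix up, so the renaming can be sketched with plain dicts. The helper names to_model_input and to_state_output below are hypothetical and exist only for this illustration; they are not methods of Step.

```python
def to_model_input(state, inputs):
    """Rename state entries to model field names; inputs is {state_name: model_input_name}."""
    return {model_name: state[state_name] for state_name, model_name in inputs.items()}


def to_state_output(model_output, outputs):
    """Rename model outputs to state names; outputs is {model_output_name: state_name}."""
    return {state_name: model_output[model_name] for model_name, state_name in outputs.items()}


state = {"raw_data": [3, 1, 2]}
inputs = {"raw_data": "data"}        # state 'raw_data' -> model field 'data'
outputs = {"processed": "clean"}     # model output 'processed' -> state 'clean'

model_input = to_model_input(state, inputs)

# Stand-in for the model's run(): sort the values and return them under 'processed'
model_output = {"processed": sorted(model_input["data"])}

state.update(to_state_output(model_output, outputs))
```

Note that the key of inputs is the state side while the key of outputs is the model side, so each mapping reads left to right in the direction the data flows.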
- extractModelFromModelInput(inputData)
- extractModelInputFromStateInput(stateInput)
extracts inputs from state
- extractStateInputFromState(state)
extracts inputs from state
- extractStateOutputFromModelOutput(modelOutput)
extracts outputs from model
- getModelInputNames()
- getModelOutputNames()
- getStateInputNames()
- getStateOutputNames()
- inputs: Dict[str, str]
- model: str
- model_config: ClassVar[ConfigDict] = {'extra': 'forbid'}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- outputs: Dict[str, str]
- postStateOutputToState(state, outputData)
loads output to state
State Classes
- class pyswark.workflow.state.Interface(backend=None, *, mutable: bool = False)
Bases: BaseModel
- backend: dict
- delete(name: str)
deletes the data from the state
- extract(name: str)
extracts the data from the state
- model_config: ClassVar[ConfigDict] = {'extra': 'forbid'}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- mutable: bool
- post(data: Any, name: str)
posts the data into the state
- class pyswark.workflow.state.State(backend=None, *, mutable: bool = False)
Bases: Interface
- delete(name: str)
deletes the data from the state
- extract(name: str)
extracts the data from the state
- model_config: ClassVar[ConfigDict] = {'extra': 'forbid'}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- post(data: Any, name: str)
posts the data into the state. If the state is immutable, the data is validated to ensure that it does not contain any keys that are already in the state.
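The immutability check on post() can be pictured roughly as follows. This is a hedged sketch, not the State class itself: the name DictStateSketch is hypothetical, and raising ValueError on a duplicate name is an assumption, since the exception the library raises is not stated here.

```python
class DictStateSketch:
    """Hypothetical dict-backed store with an immutability guard on post()."""

    def __init__(self, mutable=False):
        self.backend = {}
        self.mutable = mutable

    def post(self, data, name):
        # Assumption: an immutable store rejects a name that is already present
        if not self.mutable and name in self.backend:
            raise ValueError(f"'{name}' is already in the state")
        self.backend[name] = data

    def extract(self, name):
        return self.backend[name]


state = DictStateSketch(mutable=False)
state.post([1, 2, 3], name="raw")

try:
    state.post([4, 5, 6], name="raw")   # second post under the same name
    overwrite_rejected = False
except ValueError:
    overwrite_rejected = True

# A mutable store accepts the overwrite
scratch = DictStateSketch(mutable=True)
scratch.post(1, name="x")
scratch.post(2, name="x")
```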
- class pyswark.workflow.state.StateWithGlueDb(backend=None, *, mutable: bool = False)
Bases: Interface
- delete(name: str)
deletes the data from the state
- extract(name: str)
extracts the data from the state
- model_config: ClassVar[ConfigDict] = {'extra': 'forbid'}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- post(data: Any, name: str)
posts the data into the state
Runner Class
Workflow Runner
This module provides the Runner class, a thin convenience wrapper
that assembles and executes a Workflow
against an Interface (state) object.
A Runner accepts the workflow and state either as instances or as
URI-addressable references (strings resolved via
pyswark.core.io.api.read(), typically python:// URIs). After
running, the post-run workflow and state are cached on the runner so the
caller can inspect intermediate extracts or rerun.
Example
>>> from pyswark.workflow.runner import Runner
>>> runner = Runner( workflow=my_workflow, state=my_state )
>>> result = runner.run()
>>> result2 = runner.rerun() # reruns using the cached workflow/state
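The "instance or reference string" convention can be sketched with the standard library alone. The resolver below is an illustration only: pyswark resolves references through pyswark.core.io.api.read(), whose behaviour may differ, and the function names resolve_python_ref and resolve are hypothetical. The sketch assumes a reference has the form python://<module path>.<attribute>.

```python
import importlib


def resolve_python_ref(ref):
    """Import the module part of 'python://pkg.module.ATTR' and return ATTR."""
    prefix = "python://"
    if not ref.startswith(prefix):
        raise ValueError(f"not a python:// reference: {ref!r}")
    # Split on the last dot: everything before it is the module path
    module_path, _, attr = ref[len(prefix):].rpartition(".")
    module = importlib.import_module(module_path)
    return getattr(module, attr)


def resolve(obj):
    """Pass instances through unchanged; resolve strings as references."""
    return resolve_python_ref(obj) if isinstance(obj, str) else obj


pi = resolve("python://math.pi")   # resolved from the standard-library math module
same = resolve(3.5)                # a non-string value is returned as given
```

Resolving on demand rather than at construction time is what allows a string reference to yield a fresh object on each call, whereas an instance passed directly is reused as is.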
- class pyswark.workflow.runner.Runner(*, workflow: str | Workflow, state: str | Interface, rerunWorkflow: None | Workflow = None, rerunState: None | Interface = None)
Bases: BaseModel
Assembles and runs a workflow with a dynamically resolved state.
- getRerunState()
- getRerunWorkflow(useExtracts=None, populateExtracts=None)
- getState()
- getWorkflow()
- model_config: ClassVar[ConfigDict] = {'extra': 'forbid'}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- rerun(**kw)
- run()
Usage Examples
Basic Workflow
from pyswark.workflow.workflow import Workflow
from pyswark.workflow.step import Step
from pyswark.workflow.state import State
workflow = Workflow(steps=[
    Step(
        model='mymodule.PreprocessModel',
        inputs={'raw_data': 'data'},      # state 'raw_data' -> model field 'data'
        outputs={'processed': 'clean'},   # model output 'processed' -> state 'clean'
    ),
    Step(
        model='mymodule.AnalysisModel',
        inputs={'clean': 'data'},
        outputs={'result': 'analysis'},
    ),
])
# Initialize state with input data. State.post signature is post(data, name=...)
state = State()
state.post(my_dataframe, name='raw_data')
result = workflow.run(state)
Workflow with Caching
Workflows cache each step’s inputs and outputs in modelInputs and
modelOutputs. On subsequent runs, any step whose current input matches
its cached input is skipped and its cached output is used instead.
# First run — all steps execute
result1 = workflow.run(state)
print(workflow.stepsRan) # [0, 1]
print(workflow.stepsSkipped) # []
# Second run — steps are skipped if inputs are unchanged.
# Note: stepsRan and stepsSkipped accumulate across runs; they are not reset.
result2 = workflow.run(state)
print(workflow.stepsRan) # [0, 1] — still from first run
print(workflow.stepsSkipped) # [0, 1] — newly skipped on the second run
On a skipped step, the workflow posts cached outputs to state only for names
that are not already present. This makes workflow.run(state) safely
re-callable against immutable StateWithGlueDb backends.
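The "post only what is missing" rule for skipped steps can be sketched as below. This is a simplified illustration with a plain dict standing in for the state; the helper name post_missing is hypothetical and is not part of the Workflow API.

```python
def post_missing(state, cached_output, outputs):
    """Post cached outputs only under state names that are not yet present."""
    posted = []
    for model_name, state_name in outputs.items():
        if state_name not in state:
            state[state_name] = cached_output[model_name]
            posted.append(state_name)
    return posted


outputs = {"processed": "clean"}        # {model_output_name: state_name}
cached_output = {"processed": [1, 2]}   # output cached from an earlier run

state = {"raw": [1, None, 2]}
posted_first = post_missing(state, cached_output, outputs)

# A second call finds 'clean' already present and writes nothing,
# so an immutable store would never see an overwrite attempt.
posted_again = post_missing(state, cached_output, outputs)
```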
Toggle caching with the useExtracts and populateExtracts flags on
Workflow:
workflow.useExtracts = False # do not consult the cache; always run
workflow.populateExtracts = False # do not update the cache after each step
Creating a Model for Workflow Steps
Models used in workflows are Pydantic models (extending
pyswark.lib.pydantic.base.BaseModel) with a run() method that returns
a dict:
from pyswark.lib.pydantic import base
from pyswark.core.io import api as io
class ExtractModel(base.BaseModel):
    uri: str

    def run(self):
        return {'data': io.read(self.uri)}

class TransformModel(base.BaseModel):
    data: object

    def run(self):
        return {'processed': self.data.dropna()}
The Step then references the model by class (or fully-qualified string
path) and maps the state ↔ model field names:
from pyswark.workflow.step import Step
extract = Step(model=ExtractModel,
               inputs={'uri': 'uri'},
               outputs={'data': 'raw'})
transform = Step(model=TransformModel,
                 inputs={'raw': 'data'},
                 outputs={'processed': 'clean'})
Inspecting Workflow State
# Cached inputs/outputs per step index
model_input = workflow.getModelInput(0)
model_output = workflow.getModelOutput(0)
# Reconstruct the model instance used at step 0
model = workflow.getModel(0)
Runner: the top-level entry point
Runner is a thin wrapper that assembles a Workflow and a State
and runs them. Both may be passed as instances or as python://module.path
reference strings; strings are resolved lazily via
pyswark.core.io.api.read() on every call (reloadmodule=True by
default), which gives you a fresh instance on each run().
from pyswark.workflow.runner import Runner
runner = Runner(
workflow = 'python://mymodule.workflows.etl.WORKFLOW',
state = 'python://mymodule.states.season.SEASON_2023',
)
result = runner.run()
# Post-run workflow and state are cached on the runner for inspection
print(runner.rerunWorkflow.stepsRan)
print(runner.rerunState.extract('clean'))
# `rerun()` replays the workflow against the cached state. Same inputs ->
# all steps are skipped. Pass flag overrides to force re-execution.
result = runner.rerun()
result = runner.rerun(useExtracts=False, populateExtracts=False)
See the Workflow Runner gallery example for an end-to-end walkthrough.
Application layout
Applications that use the workflow framework follow a canonical layout that
keeps step models, workflow graphs, state factories, and runner factories
independent. See AGENTS.md (Applications: canonical layout section) and
the reference implementation at pyswark/apps/baseball/:
pyswark/apps/<app_name>/
├── steps/ # Compute models, one per file
├── workflows/ # Workflow INSTANCES composing steps
├── states/ # State factories (create(**kw) -> State)
└── runner/ # Runner factories wiring workflow + state by python:// refs