workflow - Orchestration

The workflow module provides orchestration of multi-step computations with caching and reproducibility. This is Layer 2 of the architecture.

Key Concepts

  • Step: A single unit of work that wires a compute model’s fields to/from named state slots.

  • Workflow: An ordered list of Step instances with input/output caching (modelInputs / modelOutputs).

  • State: A shared, named key-value store passed between steps. Two backends: State (plain dict) and StateWithGlueDb (SQLite-backed via GlueDb).

  • Runner: A thin wrapper that assembles a Workflow and State (as instances or python:// refs) and runs them.

Workflow Class

Workflow Orchestration

This module provides the Workflow class for orchestrating multi-step computations with caching and reproducibility. Workflows manage state between steps and can skip steps when inputs haven’t changed.

Key Concepts

  • Workflow: A sequence of Steps with input/output caching

  • Step: A single unit of work with defined inputs and outputs

  • State: Shared data store that persists between steps

Example

>>> from pyswark.workflow.workflow import Workflow
>>> from pyswark.workflow.step import Step
>>> from pyswark.workflow.state import State
>>>
>>> workflow = Workflow(steps=[
...     Step(model='mymodule.PreprocessModel',
...          inputs={'raw_data': 'data'},
...          outputs={'processed': 'clean_data'}),
...     Step(model='mymodule.AnalysisModel',
...          inputs={'clean_data': 'data'},
...          outputs={'result': 'analysis'})
... ])
>>>
>>> state = State()
>>> state.post(raw_dataframe, name='data')
>>> result = workflow.run(state)
class pyswark.workflow.workflow.Extracts(*, data: dict[int, dict] = <factory>)

Bases: BaseModel

Container for cached model inputs or outputs.

Used internally by Workflow to store and compare inputs/outputs for skip logic.

add(i, extract)
data: dict[int, dict]
equal(i: int, other)
get(i)
model_config: ClassVar[ConfigDict] = {'extra': 'forbid'}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class pyswark.workflow.workflow.Workflow(*, steps: list[pyswark.workflow.step.Step], modelInputs: ~pyswark.workflow.workflow.Extracts = <factory>, modelOutputs: ~pyswark.workflow.workflow.Extracts = <factory>, useExtracts: bool = True, populateExtracts: bool = True, stepsSkipped: list[int] = <factory>, stepsRan: list[int] = <factory>)

Bases: BaseModel

Orchestrates a sequence of computational steps with caching.

A Workflow manages the execution of multiple Steps, passing data between them via a shared State object. It supports caching of inputs/outputs to skip steps when inputs haven’t changed.

Parameters:
  • steps (list[Step]) – The sequence of steps to execute.

  • useExtracts (bool, default=True) – Whether to use cached inputs/outputs for skip logic.

  • populateExtracts (bool, default=True) – Whether to store inputs/outputs for future runs.

stepsSkipped

Indices of steps that were skipped (cached).

Type:

list[int]

stepsRan

Indices of steps that were executed.

Type:

list[int]

Example

>>> workflow = Workflow(steps=[step1, step2, step3])
>>> state = State()
>>> state.post('input_data', my_data)
>>> result = workflow.run(state)
addStep(step: Step)

Add a step to the workflow.

Parameters:

step (Step or dict) – A Step instance or dictionary with step configuration.

getModel(i: int)
getModelInput(i: int)
getModelOutput(i: int)
modelInputs: Extracts
modelOutputs: Extracts
model_config: ClassVar[ConfigDict] = {'extra': 'forbid'}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

populateExtracts: bool
run(state: State)

Execute all steps in the workflow.

Steps are executed in order, with each step’s outputs posted to the shared State. Steps may be skipped if their inputs match cached values from previous runs.

Parameters:

state (State) – The shared state object containing input data.

Returns:

The output from the final step.

Return type:

Any

steps: list[pyswark.workflow.step.Step]
stepsRan: list[int]
stepsSkipped: list[int]
useExtracts: bool

Step Class

Workflow Step

This module defines the Step class, the fundamental unit of work in a workflow. Each Step specifies a model to run and mappings between workflow state and model inputs/outputs.

class pyswark.workflow.step.Step(*, model: str, inputs: Dict[str, str], outputs: Dict[str, str])

Bases: BaseModel

A single unit of work in a Workflow.

A Step defines:

  • A model (class) to instantiate and run

  • Input mappings: state names → model input names

  • Output mappings: model output names → state names

Parameters:
  • model (str) – Fully-qualified Python path to the model class (e.g., "mymodule.MyModel").

  • inputs (dict[str, str]) – Mapping from state variable names to model input names. {state_name: model_input_name}.

  • outputs (dict[str, str]) – Mapping from model output names to state variable names. {model_output_name: state_name}.

Example

>>> step = Step(
...     model='mymodule.PreprocessModel',
...     inputs={'raw_data': 'data'},      # state 'raw_data' → model 'data'
...     outputs={'processed': 'clean'}    # model 'processed' → state 'clean'
... )
extractModelFromModelInput(inputData)
extractModelInputFromStateInput(stateInput)

extracts inputs from state

extractStateInputFromState(state)

extracts inputs from state

extractStateOutputFromModelOutput(modelOutput)

extracts outputs from model

getModelInputNames()
getModelOutputNames()
getStateInputNames()
getStateOutputNames()
inputs: Dict[str, str]
model: str
model_config: ClassVar[ConfigDict] = {'extra': 'forbid'}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

outputs: Dict[str, str]
postStateOutputToState(state, outputData)

loads output to state

run(state)

Execute this step using data from state.

Extracts inputs from state, runs the model, and posts outputs back to state.

Parameters:

state (State) – The shared state object.

Returns:

The step’s outputs (also posted to state).

Return type:

dict

State Classes

class pyswark.workflow.state.Interface(backend=None, *, mutable: bool = False)

Bases: BaseModel

backend: dict
delete(name: str)

deletes the data from the state

extract(name: str)

extracts the data from the state

model_config: ClassVar[ConfigDict] = {'extra': 'forbid'}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

mutable: bool
post(data: Any, name: str)

posts the data into the state

class pyswark.workflow.state.State(backend=None, *, mutable: bool = False)

Bases: Interface

delete(name: str)

deletes the data from the state

extract(name: str)

extracts the data from the state

model_config: ClassVar[ConfigDict] = {'extra': 'forbid'}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

post(data: Any, name: str)

posts the data into the state. If the state is immutable, the data is validated to ensure that it does not contain any keys that are already in the state.

class pyswark.workflow.state.StateWithGlueDb(backend=None, *, mutable: bool = False)

Bases: Interface

backend: Db
delete(name: str)

deletes the data from the state

extract(name: str)

extracts the data from the state

model_config: ClassVar[ConfigDict] = {'extra': 'forbid'}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

post(data: Any, name: str)

posts the data into the state

Runner Class

Workflow Runner

This module provides the Runner class, a thin convenience wrapper that assembles and executes a Workflow against a Interface (state) object.

A Runner accepts the workflow and state either as instances or as URI-addressable references (strings resolved via pyswark.core.io.api.read(), typically python:// URIs). After running, the post-run workflow and state are cached on the runner so the caller can inspect intermediate extracts or rerun.

Example

>>> from pyswark.workflow.runner import Runner
>>> runner = Runner( workflow=my_workflow, state=my_state )
>>> result = runner.run()
>>> result2 = runner.rerun()   # reruns using the cached workflow/state
class pyswark.workflow.runner.Runner(*, workflow: str | Workflow, state: str | Interface, rerunWorkflow: None | Workflow = None, rerunState: None | Interface = None)

Bases: BaseModel

Assembles and runs a workflow with a dynamically resolved state.

getRerunState()
getRerunWorkflow(useExtracts=None, populateExtracts=None)
getState()
getWorkflow()
model_config: ClassVar[ConfigDict] = {'extra': 'forbid'}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

rerun(**kw)
rerunState: None | Interface
rerunWorkflow: None | Workflow
run()
state: str | Interface
workflow: str | Workflow

Usage Examples

Basic Workflow

from pyswark.workflow.workflow import Workflow
from pyswark.workflow.step import Step
from pyswark.workflow.state import State

workflow = Workflow(steps=[
    Step(
        model='mymodule.PreprocessModel',
        inputs={'raw_data': 'data'},      # state 'raw_data' -> model field 'data'
        outputs={'processed': 'clean'},   # model output 'processed' -> state 'clean'
    ),
    Step(
        model='mymodule.AnalysisModel',
        inputs={'clean': 'data'},
        outputs={'result': 'analysis'},
    ),
])

# Initialize state with input data. State.post signature is post(data, name=...)
state = State()
state.post(my_dataframe, name='raw_data')

result = workflow.run(state)

Workflow with Caching

Workflows cache each step’s inputs and outputs in modelInputs and modelOutputs. On subsequent runs, any step whose current input matches its cached input is skipped and its cached output is used instead.

# First run — all steps execute
result1 = workflow.run(state)
print(workflow.stepsRan)      # [0, 1]
print(workflow.stepsSkipped)  # []

# Second run — steps are skipped if inputs are unchanged.
# Note: stepsRan and stepsSkipped accumulate across runs; they are not reset.
result2 = workflow.run(state)
print(workflow.stepsRan)      # [0, 1]  — still from first run
print(workflow.stepsSkipped)  # [0, 1]  — newly skipped on the second run

On a skipped step, the workflow posts cached outputs to state only for names that are not already present. This makes workflow.run(state) safely re-callable against immutable StateWithGlueDb backends.

Toggle caching with the useExtracts and populateExtracts flags on Workflow:

workflow.useExtracts = False        # do not consult the cache; always run
workflow.populateExtracts = False   # do not update the cache after each step

Creating a Model for Workflow Steps

Models used in workflows are Pydantic models (extending pyswark.lib.pydantic.base.BaseModel) with a run() method that returns a dict:

from pyswark.lib.pydantic import base
from pyswark.core.io import api as io

class ExtractModel(base.BaseModel):
    uri: str

    def run(self):
        return {'data': io.read(self.uri)}

class TransformModel(base.BaseModel):
    data: object

    def run(self):
        return {'processed': self.data.dropna()}

The Step then references the model by class (or fully-qualified string path) and maps the state ↔ model field names:

from pyswark.workflow.step import Step

extract  = Step(model=ExtractModel,
                inputs={'uri': 'uri'},
                outputs={'data': 'raw'})
transform = Step(model=TransformModel,
                 inputs={'raw': 'data'},
                 outputs={'processed': 'clean'})

Inspecting Workflow State

# Cached inputs/outputs per step index
model_input  = workflow.getModelInput(0)
model_output = workflow.getModelOutput(0)

# Reconstruct the model instance used at step 0
model = workflow.getModel(0)

Runner: the top-level entry point

Runner is a thin wrapper that assembles a Workflow and a State and runs them. Both may be passed as instances or as python://module.path reference strings; strings are resolved lazily via pyswark.core.io.api.read() on every call (reloadmodule=True by default), which gives you a fresh instance on each run().

from pyswark.workflow.runner import Runner

runner = Runner(
    workflow = 'python://mymodule.workflows.etl.WORKFLOW',
    state    = 'python://mymodule.states.season.SEASON_2023',
)
result = runner.run()

# Post-run workflow and state are cached on the runner for inspection
print(runner.rerunWorkflow.stepsRan)
print(runner.rerunState.extract('clean'))

# `rerun()` replays the workflow against the cached state. Same inputs ->
# all steps are skipped. Pass flag overrides to force re-execution.
result = runner.rerun()
result = runner.rerun(useExtracts=False, populateExtracts=False)

See the Workflow Runner gallery example for an end-to-end walkthrough.

Application layout

Applications that use the workflow framework follow a canonical layout that keeps step models, workflow graphs, state factories, and runner factories independent. See AGENTS.md (Applications: canonical layout section) and the reference implementation at pyswark/apps/baseball/:

pyswark/apps/<app_name>/
├── steps/        # Compute models, one per file
├── workflows/    # Workflow INSTANCES composing steps
├── states/       # State factories (create(**kw) -> State)
└── runner/       # Runner factories wiring workflow + state by python:// refs