Workflow Runner

The Runner assembles a Workflow and a State, runs them, and caches the post-run artifacts for inspection and rerunning. Runners are the stable public entry point for every application under pyswark.apps.

This example walks through the runner end-to-end with a tiny two-step arithmetic workflow: (x + y) * k.

Define the step models, which hold the actual computation. In a real application these would live one per file under pyswark/apps/<name>/steps/.

from pyswark.lib.pydantic import base


class AddModel( base.BaseModel ):
    a: int
    b: int

    def run( self ):
        return { 'sum': self.a + self.b }


class MultiplyModel( base.BaseModel ):
    value  : int
    factor : int

    def run( self ):
        return { 'product': self.value * self.factor }

Compose the step models into a Workflow. Each Step wires a model’s fields to/from named slots in the shared state:

  • inputs: {state_name: model_field_name}

  • outputs: {model_output_key: state_name}

In a real application this module-level WORKFLOW would live under pyswark/apps/<name>/workflows/<graph>.py.

from pyswark.workflow.workflow import Workflow
from pyswark.workflow.step import Step

WORKFLOW = Workflow( steps=[
    Step(
        model   = AddModel,
        inputs  = { 'x': 'a', 'y': 'b' },
        outputs = { 'sum': 'total' },
    ),
    Step(
        model   = MultiplyModel,
        inputs  = { 'total': 'value', 'k': 'factor' },
        outputs = { 'product': 'final' },
    ),
])
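
To make the slot wiring concrete, the sketch below traces the same two steps by hand with plain dictionaries. It does not use pyswark: apply_step, add and multiply are hypothetical names written only for this illustration. It assumes a step reads each state name listed in inputs, hands the value to the model under the mapped field name, and writes each output key back under the mapped state name.

```python
# Illustration only: a hand-rolled stand-in for the Step wiring, not pyswark code.

def apply_step(state, compute, inputs, outputs):
    """Run one step against a plain-dict state (hypothetical helper)."""
    # inputs maps state_name -> model_field_name
    kwargs = {field: state[name] for name, field in inputs.items()}
    result = compute(**kwargs)
    # outputs maps model_output_key -> state_name
    for key, name in outputs.items():
        state[name] = result[key]
    return state


def add(a, b):
    return {'sum': a + b}


def multiply(value, factor):
    return {'product': value * factor}


state = {'x': 2, 'y': 3, 'k': 4}
apply_step(state, add, {'x': 'a', 'y': 'b'}, {'sum': 'total'})
apply_step(state, multiply, {'total': 'value', 'k': 'factor'}, {'product': 'final'})
print(state)
```

The intermediate slot total is what connects the two steps: the first step writes it and the second step reads it as value.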

Build an initial state. A state factory like this would typically live under pyswark/apps/<name>/states/<scenario>.py.

from pyswark.workflow.state import State


def make_state():
    return State( backend={ 'x': 2, 'y': 3, 'k': 4 } )

Wire a Workflow and State into a Runner and run it.

from pyswark.workflow.runner import Runner

runner = Runner( workflow=WORKFLOW, state=make_state() )
result = runner.run()
print( "result:", result )    # {'final': 20}  —  (2 + 3) * 4
2026-04-29 12:30:36,384 :: INFO :: Running Step  0. __main__.AddModel...
2026-04-29 12:30:36,385 :: INFO :: done.
2026-04-29 12:30:36,385 :: INFO :: Running Step  1. __main__.MultiplyModel...
2026-04-29 12:30:36,385 :: INFO :: done.
result: {'final': 20}

After run(), the post-run workflow and state are stashed on the runner as rerunWorkflow / rerunState so you can inspect what happened.

wf = runner.rerunWorkflow
print( "stepsRan:    ", wf.stepsRan )       # [0, 1]
print( "stepsSkipped:", wf.stepsSkipped )   # []

print( "state['total']:", runner.rerunState.extract('total') )
print( "state['final']:", runner.rerunState.extract('final') )

print( "cached input for step 0:",  wf.getModelInput(0) )
print( "cached output for step 0:", wf.getModelOutput(0) )
stepsRan:     [0, 1]
stepsSkipped: []
state['total']: 5
state['final']: 20
cached input for step 0: {'a': 2, 'b': 3}
cached output for step 0: {'sum': 5}

rerun() replays the workflow against the cached state. Because the inputs are unchanged, every step is skipped, so a rerun is cheap and idempotent on immutable state. Note that stepsRan and stepsSkipped accumulate across runs rather than being reset, which is why stepsRan still reads [0, 1] after the rerun.

result2 = runner.rerun()
print( "rerun result:", result2 )                          # {'final': 20}
print( "stepsRan:    ", runner.rerunWorkflow.stepsRan )     # [0, 1]
print( "stepsSkipped:", runner.rerunWorkflow.stepsSkipped ) # [0, 1]
2026-04-29 12:30:36,386 :: INFO :: Skipped Step  0. __main__.AddModel.
2026-04-29 12:30:36,386 :: INFO :: Skipped Step  1. __main__.MultiplyModel.
rerun result: {'final': 20}
stepsRan:     [0, 1]
stepsSkipped: [0, 1]
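
The skip behavior can be pictured with a small input-keyed cache. The sketch below is a toy model, not pyswark's implementation: ToyWorkflow, steps_ran and steps_skipped are hypothetical names, and it assumes a step is skipped whenever its current inputs equal the inputs cached from its last execution.

```python
# Toy model of skip-on-unchanged-inputs; not pyswark's implementation.

class ToyWorkflow:
    def __init__(self, steps):
        self.steps = steps           # list of (function, input_names, output_name)
        self.cache = {}              # step index -> (inputs, output)
        self.steps_ran = []          # accumulates across runs, never reset
        self.steps_skipped = []      # accumulates across runs, never reset

    def run(self, state):
        for i, (fn, input_names, output_name) in enumerate(self.steps):
            inputs = tuple(state[name] for name in input_names)
            cached = self.cache.get(i)
            if cached is not None and cached[0] == inputs:
                # Inputs unchanged: reuse the cached output, do not recompute.
                self.steps_skipped.append(i)
                state.setdefault(output_name, cached[1])
            else:
                value = fn(*inputs)
                self.cache[i] = (inputs, value)
                self.steps_ran.append(i)
                state[output_name] = value
        return state


wf = ToyWorkflow([
    (lambda x, y: x + y, ('x', 'y'), 'total'),
    (lambda total, k: total * k, ('total', 'k'), 'final'),
])

state = {'x': 2, 'y': 3, 'k': 4}
wf.run(state)    # first pass executes both steps
wf.run(state)    # second pass finds matching cached inputs and skips both
print(wf.steps_ran, wf.steps_skipped)
```

Because the two lists are appended to and never cleared, the second pass leaves the record of executed steps in place and only extends the record of skipped steps.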

You can override the caching flags on rerun() to force a full re-execution. Passing useExtracts=False together with populateExtracts=False runs every step from scratch without updating the cache. State is immutable by default, so this works here only because the workflow’s idempotent post rule avoids rewriting unchanged output names on the skipped path; a fresh run against an already-populated immutable state will fail on write. In production, pass a fresh state factory (or a StateWithGlueDb with mutable=True) when forcing re-execution.
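
The write failure described above can be reproduced with a toy write-once mapping. WriteOnceState and run_from_scratch are hypothetical stand-ins, not pyswark's State or Runner; the sketch assumes only that an immutable state rejects a second write to a name that is already populated. The KeyError is an arbitrary choice for the toy and says nothing about which exception pyswark raises.

```python
# Toy write-once state; illustrates why a forced re-execution needs a fresh state.

class WriteOnceState(dict):
    """Each name may be written only once after construction."""

    def __setitem__(self, name, value):
        if name in self:
            raise KeyError(f"'{name}' is already set and the state is immutable")
        super().__setitem__(name, value)


def make_state():
    # A factory returns a fresh state holding only the initial inputs.
    return WriteOnceState(x=2, y=3, k=4)


def run_from_scratch(state):
    # Recompute and write every output, ignoring any cache.
    state['total'] = state['x'] + state['y']
    state['final'] = state['total'] * state['k']
    return state['final']


state = make_state()
first = run_from_scratch(state)

try:
    run_from_scratch(state)          # same populated state: write to 'total' is rejected
    reused_ok = True
except KeyError:
    reused_ok = False

fresh = run_from_scratch(make_state())   # fresh state: succeeds
print(first, reused_ok, fresh)
```

Calling the factory again, rather than reusing the populated instance, is what lets the second from-scratch run succeed.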

String refs for production. In real applications, pass workflow and state as python://module.path reference strings. The Runner resolves them lazily via pyswark.core.io.api.read(), which re-imports the target module on every run(). Each call therefore gets a fresh instance, and the runner itself serializes cleanly:

from pyswark.workflow.runner import Runner

runner = Runner(
    workflow = 'python://mypkg.apps.<name>.workflows.etl.WORKFLOW',
    state    = 'python://mypkg.apps.<name>.states.season.SEASON_2023',
)
result = runner.run()
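
As a rough picture of lazy reference resolution, the sketch below resolves python:// strings with the standard library's importlib. It is not how pyswark.core.io.api.read() is implemented: resolve_ref and LazyRunner are hypothetical names, the targets are standard-library objects chosen only so the example runs anywhere, and importlib.import_module reuses already-imported modules, so the re-import-on-every-run behavior is not reproduced here.

```python
import importlib
import json


def resolve_ref(ref):
    """Resolve 'python://package.module.ATTR' to the named object (hypothetical helper)."""
    prefix = 'python://'
    if not ref.startswith(prefix):
        raise ValueError(f"not a python:// reference: {ref!r}")
    # Split 'package.module.ATTR' into the module path and the attribute name.
    module_path, _, attr = ref[len(prefix):].rpartition('.')
    module = importlib.import_module(module_path)
    return getattr(module, attr)


class LazyRunner:
    """Stores references as strings and resolves them only inside run()."""

    def __init__(self, workflow, state):
        self.workflow = workflow
        self.state = state

    def run(self):
        workflow = resolve_ref(self.workflow)
        state = resolve_ref(self.state)
        return workflow(state)


# Stand-in targets: math.sqrt plays the workflow, math.pi plays the state.
runner = LazyRunner(workflow='python://math.sqrt', state='python://math.pi')
result = runner.run()

# The runner holds only strings, so it round-trips through JSON.
payload = json.dumps(vars(runner))
print(result, payload)
```

Keeping only strings on the runner is the design point: nothing is imported until run() is called, and the object can be stored or sent elsewhere without pickling live workflow or state instances.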
