workflow - Orchestration

The workflow module provides orchestration of multi-step computations with caching and reproducibility. This is Layer 2 of the architecture.

Key Concepts

  • Workflow: A sequence of Steps with input/output caching

  • Step: A single unit of work with defined inputs and outputs

  • State: Shared data store that persists between steps

Workflow Class

Workflow Orchestration

This module provides the Workflow class for orchestrating multi-step computations with caching and reproducibility. Workflows manage state between steps and can skip steps when inputs haven’t changed.

Example

>>> from pyswark.workflow.workflow import Workflow
>>> from pyswark.workflow.step import Step
>>> from pyswark.workflow.state import State
>>>
>>> workflow = Workflow(steps=[
...     Step(model='mymodule.PreprocessModel',
...          inputs={'raw_data': 'data'},
...          outputs={'processed': 'clean_data'}),
...     Step(model='mymodule.AnalysisModel',
...          inputs={'clean_data': 'data'},
...          outputs={'result': 'analysis'})
... ])
>>>
>>> state = State()
>>> state.post(raw_dataframe, name='raw_data')
>>> result = workflow.run(state)

class pyswark.workflow.workflow.Extracts(*, data: dict[int, dict] = <factory>)

Bases: BaseModel

Container for cached model inputs or outputs.

Used internally by Workflow to store and compare inputs/outputs for skip logic.

add(i, extract)
data: dict[int, dict]
equal(i: int, other)
get(i)
model_config: ClassVar[ConfigDict] = {'extra': 'forbid'}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
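
To make the role of Extracts in the skip logic more concrete, here is a minimal, dependency-free sketch of a per-step cache exposing the same method names (add, get, equal). It is not pyswark's implementation: the class name ExtractsSketch, the use of a dataclass instead of a pydantic model, and the plain == comparison are all assumptions made for illustration.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


@dataclass
class ExtractsSketch:
    """Hypothetical stand-in for Extracts: maps a step index to a cached dict."""

    data: Dict[int, Dict[str, Any]] = field(default_factory=dict)

    def add(self, i: int, extract: Dict[str, Any]) -> None:
        # Store (or overwrite) the cached inputs/outputs for step i.
        self.data[i] = dict(extract)

    def get(self, i: int) -> Optional[Dict[str, Any]]:
        # Return the cached dict for step i, or None if nothing is cached.
        return self.data.get(i)

    def equal(self, i: int, other: Dict[str, Any]) -> bool:
        # True only if step i has a cached dict that compares equal to `other`.
        return i in self.data and self.data[i] == other


cache = ExtractsSketch()
cache.add(0, {"data": [1, 2, 3]})
unchanged = cache.equal(0, {"data": [1, 2, 3]})     # same inputs as cached
changed = cache.equal(0, {"data": [1, 2, 4]})       # inputs differ
never_cached = cache.equal(1, {"data": [1, 2, 3]})  # step 1 has no cache entry
```

In this sketch a step would be eligible for skipping only when equal() returns True for its index. Plain == suits simple values; array-like inputs would generally need a type-aware comparison.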

class pyswark.workflow.workflow.Workflow(*, steps: list[pyswark.workflow.step.Step], modelInputs: ~pyswark.workflow.workflow.Extracts = <factory>, modelOutputs: ~pyswark.workflow.workflow.Extracts = <factory>, useExtracts: bool = True, populateExtracts: bool = True, stepsSkipped: list[int] = <factory>, stepsRan: list[int] = <factory>)

Bases: BaseModel

Orchestrates a sequence of computational steps with caching.

A Workflow manages the execution of multiple Steps, passing data between them via a shared State object. It supports caching of inputs/outputs to skip steps when inputs haven’t changed.

Parameters:
  • steps (list[Step]) – The sequence of steps to execute.

  • useExtracts (bool, default=True) – Whether to use cached inputs/outputs for skip logic.

  • populateExtracts (bool, default=True) – Whether to store inputs/outputs for future runs.

stepsSkipped

Indices of steps that were skipped (cached).

Type:

list[int]

stepsRan

Indices of steps that were executed.

Type:

list[int]

Example

>>> workflow = Workflow(steps=[step1, step2, step3])
>>> state = State()
>>> state.post(my_data, name='input_data')
>>> result = workflow.run(state)

addStep(step: Step)

Add a step to the workflow.

Parameters:

step (Step or dict) – A Step instance or dictionary with step configuration.

getModel(i: int)
getModelInput(i: int)
getModelOutput(i: int)
modelInputs: Extracts
modelOutputs: Extracts
model_config: ClassVar[ConfigDict] = {'extra': 'forbid'}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

populateExtracts: bool
run(state: State)

Execute all steps in the workflow.

Steps are executed in order, with each step’s outputs posted to the shared State. Steps may be skipped if their inputs match cached values from previous runs.

Parameters:

state (State) – The shared state object containing input data.

Returns:

The output from the final step.

Return type:

Any
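
The run behaviour described above (execute in order, post outputs to state, skip when inputs match the cache) can be sketched with plain dictionaries. This is a toy model under stated assumptions, not pyswark's code: WorkflowSketch, its attribute names, the (input_names, function) step format, and the choice to reset the ran/skipped lists on every run are all hypothetical.

```python
from typing import Any, Callable, Dict, List, Optional, Tuple

# Hypothetical step format: (names read from state, function from inputs to outputs).
StepFn = Callable[[Dict[str, Any]], Dict[str, Any]]
StepSpec = Tuple[List[str], StepFn]


class WorkflowSketch:
    """Toy run loop with input-based skipping (not pyswark's implementation)."""

    def __init__(self, steps: List[StepSpec]) -> None:
        self.steps = steps
        self.cached_inputs: Dict[int, Dict[str, Any]] = {}
        self.cached_outputs: Dict[int, Dict[str, Any]] = {}
        self.steps_ran: List[int] = []
        self.steps_skipped: List[int] = []

    def run(self, state: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        # Assumption: the ran/skipped lists are reset at the start of each run.
        self.steps_ran, self.steps_skipped = [], []
        outputs = None
        for i, (input_names, fn) in enumerate(self.steps):
            inputs = {name: state[name] for name in input_names}
            if i in self.cached_inputs and self.cached_inputs[i] == inputs:
                outputs = self.cached_outputs[i]  # inputs unchanged: reuse outputs
                self.steps_skipped.append(i)
            else:
                outputs = fn(inputs)              # inputs new or changed: execute
                self.cached_inputs[i] = inputs
                self.cached_outputs[i] = outputs
                self.steps_ran.append(i)
            state.update(outputs)                 # post outputs to the shared state
        return outputs


def clean(inputs):
    return {"clean": [x for x in inputs["raw"] if x is not None]}


def total(inputs):
    return {"total": sum(inputs["clean"])}


wf = WorkflowSketch(steps=[(["raw"], clean), (["clean"], total)])
state = {"raw": [1, None, 2]}

first = wf.run(state)
first_ran = list(wf.steps_ran)

second = wf.run(state)
second_skipped = list(wf.steps_skipped)

state["raw"] = [5]  # changing the input invalidates both cached steps
third = wf.run(state)
third_ran = list(wf.steps_ran)
```

Note that in this sketch a changed upstream input propagates naturally: step 0 reruns, posts a new value, and step 1 then sees inputs that no longer match its cache.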

steps: list[pyswark.workflow.step.Step]
stepsRan: list[int]
stepsSkipped: list[int]
useExtracts: bool

Step Class

Workflow Step

This module defines the Step class, the fundamental unit of work in a workflow. Each Step specifies a model to run and mappings between workflow state and model inputs/outputs.

class pyswark.workflow.step.Step(*, model: str, inputs: Dict[str, str], outputs: Dict[str, str])

Bases: BaseModel

A single unit of work in a Workflow.

A Step defines:

  • A model (class) to instantiate and run

  • Input mappings: state names → model input names

  • Output mappings: model output names → state names

Parameters:
  • model (str) – Fully-qualified Python path to the model class (e.g., "mymodule.MyModel").

  • inputs (dict[str, str]) – Mapping from state variable names to model input names. {state_name: model_input_name}.

  • outputs (dict[str, str]) – Mapping from model output names to state variable names. {model_output_name: state_name}.
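
Because model is given as a dotted string, the class has to be looked up at run time. One common way to do this in Python is with importlib, sketched below. The helper name resolve_class is hypothetical, and pyswark may resolve the path differently.

```python
import importlib


def resolve_class(path: str) -> type:
    """Resolve 'package.module.ClassName' to the class object (hypothetical helper)."""
    module_path, _, class_name = path.rpartition(".")
    if not module_path:
        raise ValueError(f"expected 'module.ClassName', got {path!r}")
    module = importlib.import_module(module_path)
    return getattr(module, class_name)


# A standard-library class is used here only so the sketch runs anywhere.
cls = resolve_class("collections.OrderedDict")
instance = cls(a=1)

# A path without a module part is rejected.
try:
    resolve_class("NoModule")
    rejected = False
except ValueError:
    rejected = True
```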

Example

>>> step = Step(
...     model='mymodule.PreprocessModel',
...     inputs={'raw_data': 'data'},      # state 'raw_data' → model 'data'
...     outputs={'processed': 'clean'}    # model 'processed' → state 'clean'
... )

extractModelFromModelInput(inputData)
extractModelInputFromStateInput(stateInput)

Extracts the model inputs from the state inputs.

extractStateInputFromState(state)

Extracts this step's inputs from state.

extractStateOutputFromModelOutput(modelOutput)

Extracts the state outputs from the model outputs.

getModelInputNames()
getModelOutputNames()
getStateInputNames()
getStateOutputNames()
inputs: Dict[str, str]
model: str
model_config: ClassVar[ConfigDict] = {'extra': 'forbid'}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

outputs: Dict[str, str]
postStateOutputToState(state, outputData)

Posts the step's outputs to state.

run(state)

Execute this step using data from state.

Extracts inputs from state, runs the model, and posts outputs back to state.

Parameters:

state (State) – The shared state object.

Returns:

The step’s outputs (also posted to state).

Return type:

dict
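
The extract, run, and post sequence can be illustrated with the two name mappings and a plain dictionary as state. This is a sketch under assumptions rather than the library's code: run_step_sketch is a hypothetical function, a plain function stands in for the model class, and returning the outputs keyed by state name is a guess about the return value.

```python
from typing import Any, Callable, Dict


def run_step_sketch(
    state: Dict[str, Any],
    inputs: Dict[str, str],    # {state_name: model_input_name}
    outputs: Dict[str, str],   # {model_output_name: state_name}
    model_fn: Callable[..., Dict[str, Any]],
) -> Dict[str, Any]:
    # 1. Pull the required values out of state under their state names.
    state_input = {name: state[name] for name in inputs}
    # 2. Rename them to the names the model expects.
    model_input = {inputs[name]: value for name, value in state_input.items()}
    # 3. Run the model (a plain function stands in for a model class here).
    model_output = model_fn(**model_input)
    # 4. Rename the model outputs to state names and post them to state.
    state_output = {outputs[name]: model_output[name] for name in outputs}
    state.update(state_output)
    return state_output


def preprocess(data):
    # Hypothetical model: drop missing values and return a dict of outputs.
    return {"processed": [x for x in data if x is not None]}


state = {"raw_data": [1, None, 2]}
result = run_step_sketch(
    state,
    inputs={"raw_data": "data"},     # state 'raw_data' -> model input 'data'
    outputs={"processed": "clean"},  # model output 'processed' -> state 'clean'
    model_fn=preprocess,
)
```

The four numbered comments correspond loosely to extractStateInputFromState, extractModelInputFromStateInput, running the model, and extractStateOutputFromModelOutput followed by postStateOutputToState.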

Usage Examples

Basic Workflow

from pyswark.workflow.workflow import Workflow
from pyswark.workflow.step import Step
from pyswark.workflow.state import State

# Define steps
workflow = Workflow(steps=[
    Step(
        model='mymodule.PreprocessModel',
        inputs={'raw_data': 'data'},      # state 'raw_data' → model input 'data'
        outputs={'processed': 'clean'}    # model output 'processed' → state 'clean'
    ),
    Step(
        model='mymodule.AnalysisModel',
        inputs={'clean': 'data'},
        outputs={'result': 'analysis'}
    ),
])

# Initialize state with input data
state = State()
state.post(my_dataframe, name='raw_data')

# Run the workflow
result = workflow.run(state)

Workflow with Caching

Workflows cache each step's inputs and outputs by default (useExtracts and populateExtracts both default to True). When the same workflow is run again with unchanged inputs, the steps are skipped:

# First run - all steps execute
result1 = workflow.run(state)
print(workflow.stepsRan)      # [0, 1]
print(workflow.stepsSkipped)  # []

# Second run - steps are skipped if inputs unchanged
result2 = workflow.run(state)
print(workflow.stepsRan)      # []
print(workflow.stepsSkipped)  # [0, 1]

Creating a Model for Workflow Steps

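Models used in workflows should have a run() method that returns a dict keyed by model output name:

from typing import Any

from pyswark.lib.pydantic import base
from pyswark.core.models import xputs

# Define the inputs class first so it exists when PreprocessModel refers to it
class PreprocessInputs(xputs.BaseInputs):
    data: Any

class PreprocessModel(base.BaseModel):
    inputs: PreprocessInputs

    def run(self):
        # Drop rows with missing values
        processed = self.inputs.data.dropna()
        # The key matches the model output name used in Step.outputs
        return {'processed': processed}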

Inspecting Workflow State

# Get cached inputs/outputs for a step
model_input = workflow.getModelInput(0)
model_output = workflow.getModelOutput(0)

# Get the instantiated model
model = workflow.getModel(0)