workflow - Orchestration
The workflow module provides orchestration of multi-step computations
with caching and reproducibility. This is Layer 2 of the architecture.
Key Concepts
Workflow: A sequence of Steps with input/output caching
Step: A single unit of work with defined inputs and outputs
State: Shared data store that persists between steps
Workflow Class
Workflow Orchestration
This module provides the Workflow class for orchestrating multi-step computations with caching and reproducibility. Workflows manage state between steps and can skip steps when inputs haven’t changed.
Example
>>> from pyswark.workflow.workflow import Workflow
>>> from pyswark.workflow.step import Step
>>> from pyswark.workflow.state import State
>>>
>>> workflow = Workflow(steps=[
...     Step(model='mymodule.PreprocessModel',
...          inputs={'raw_data': 'data'},
...          outputs={'processed': 'clean_data'}),
...     Step(model='mymodule.AnalysisModel',
...          inputs={'clean_data': 'data'},
...          outputs={'result': 'analysis'})
... ])
>>>
>>> state = State()
>>> state.post(raw_dataframe, name='raw_data')
>>> result = workflow.run(state)
- class pyswark.workflow.workflow.Extracts(*, data: dict[int, dict] = <factory>)
Bases: BaseModel
Container for cached model inputs or outputs.
Used internally by Workflow to store and compare inputs/outputs for skip logic.
- add(i, extract)
- data: dict[int, dict]
- equal(i: int, other)
- get(i)
- model_config: ClassVar[ConfigDict] = {'extra': 'forbid'}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
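The add, get, and equal methods of Extracts are listed without descriptions. As a rough illustration only, a per-step cache with the same method names might behave as in the sketch below. ExtractsSketch is a hypothetical stand-in built on a plain dataclass; it is not the pyswark implementation, and the semantics shown (overwrite on add, None for a missing step, equality by ==) are assumptions.

```python
from dataclasses import dataclass, field


@dataclass
class ExtractsSketch:
    """Hypothetical stand-in for Extracts: one cached dict per step index."""

    data: dict = field(default_factory=dict)

    def add(self, i, extract):
        # Store (or overwrite) a copy of the cached values for step i.
        self.data[i] = dict(extract)

    def get(self, i):
        # Return the cached values for step i, or None if nothing is cached.
        return self.data.get(i)

    def equal(self, i, other):
        # True only when step i has a cached entry that matches `other`.
        return i in self.data and self.data[i] == other


cache = ExtractsSketch()
cache.add(0, {"data": [1, 2, 3]})

same = cache.equal(0, {"data": [1, 2, 3]})     # identical inputs
changed = cache.equal(0, {"data": [1, 2, 4]})  # one value differs
missing = cache.equal(1, {"data": [1, 2, 3]})  # step 1 was never cached
```

Plain == is enough for built-in types such as lists and dicts. Values like pandas DataFrames do not reduce to a single boolean under ==, so a real implementation would likely need a dedicated comparison.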
- class pyswark.workflow.workflow.Workflow(*, steps: list[pyswark.workflow.step.Step], modelInputs: ~pyswark.workflow.workflow.Extracts = <factory>, modelOutputs: ~pyswark.workflow.workflow.Extracts = <factory>, useExtracts: bool = True, populateExtracts: bool = True, stepsSkipped: list[int] = <factory>, stepsRan: list[int] = <factory>)
Bases: BaseModel
Orchestrates a sequence of computational steps with caching.
A Workflow manages the execution of multiple Steps, passing data between them via a shared State object. It supports caching of inputs/outputs to skip steps when inputs haven’t changed.
- Parameters:
steps (list[Step]) – The sequence of steps to execute.
useExtracts (bool, default=True) – Whether to use cached inputs/outputs for skip logic.
populateExtracts (bool, default=True) – Whether to store inputs/outputs for future runs.
- stepsSkipped
Indices of steps that were skipped (cached).
- Type:
list[int]
- stepsRan
Indices of steps that were executed.
- Type:
list[int]
Example
>>> workflow = Workflow(steps=[step1, step2, step3])
>>> state = State()
>>> state.post('input_data', my_data)
>>> result = workflow.run(state)
- addStep(step: Step)
Add a step to the workflow.
- Parameters:
step (Step or dict) – A Step instance or dictionary with step configuration.
- getModel(i: int)
- getModelInput(i: int)
- getModelOutput(i: int)
- model_config: ClassVar[ConfigDict] = {'extra': 'forbid'}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- populateExtracts: bool
- run(state: State)
Execute all steps in the workflow.
Steps are executed in order, with each step’s outputs posted to the shared State. Steps may be skipped if their inputs match cached values from previous runs.
- Parameters:
state (State) – The shared state object containing input data.
- Returns:
The output from the final step.
- Return type:
Any
- steps: list[pyswark.workflow.step.Step]
- stepsRan: list[int]
- stepsSkipped: list[int]
- useExtracts: bool
Step Class
Workflow Step
This module defines the Step class, the fundamental unit of work in a workflow. Each Step specifies a model to run and mappings between workflow state and model inputs/outputs.
- class pyswark.workflow.step.Step(*, model: str, inputs: Dict[str, str], outputs: Dict[str, str])
Bases: BaseModel
A single unit of work in a Workflow.
A Step defines:
A model (class) to instantiate and run
Input mappings: state names → model input names
Output mappings: model output names → state names
- Parameters:
model (str) – Fully-qualified Python path to the model class (e.g., "mymodule.MyModel").
inputs (dict[str, str]) – Mapping from state variable names to model input names: {state_name: model_input_name}.
outputs (dict[str, str]) – Mapping from model output names to state variable names: {model_output_name: state_name}.
Example
>>> step = Step(
...     model='mymodule.PreprocessModel',
...     inputs={'raw_data': 'data'},    # state 'raw_data' → model 'data'
...     outputs={'processed': 'clean'}  # model 'processed' → state 'clean'
... )
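Since model is given as a dotted path string, it has to be resolved to a class before it can be instantiated. The sketch below shows one common way to do this with the standard library. resolve_model is a hypothetical helper, not a pyswark function, and how pyswark actually performs the lookup is not documented here.

```python
import importlib


def resolve_model(path):
    """Hypothetical helper: turn 'package.module.ClassName' into the class."""
    # Split on the last dot: everything before it is the module path.
    module_name, _, class_name = path.rpartition(".")
    module = importlib.import_module(module_name)
    return getattr(module, class_name)


# A standard-library class is used so the example runs anywhere.
cls = resolve_model("collections.OrderedDict")
instance = cls(a=1)
```

A path such as "mymodule.PreprocessModel" would resolve the same way, provided mymodule is importable from the running process.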
- extractModelFromModelInput(inputData)
- extractModelInputFromStateInput(stateInput)
Extracts the model inputs from the state inputs.
- extractStateInputFromState(state)
Extracts the state inputs for this step from state.
- extractStateOutputFromModelOutput(modelOutput)
Extracts the state outputs from the model output.
- getModelInputNames()
- getModelOutputNames()
- getStateInputNames()
- getStateOutputNames()
- inputs: Dict[str, str]
- model: str
- model_config: ClassVar[ConfigDict] = {'extra': 'forbid'}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- outputs: Dict[str, str]
- postStateOutputToState(state, outputData)
Posts the output data to state.
- run(state)
Execute this step using data from state.
Extracts inputs from state, runs the model, and posts outputs back to state.
- Parameters:
state (State) – The shared state object.
- Returns:
The step’s outputs (also posted to state).
- Return type:
dict
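The two mappings point in opposite directions: inputs is keyed by state name, outputs is keyed by model output name. The sketch below walks through that renaming with a plain dict standing in for State and a function standing in for the model. run_step and preprocess are hypothetical names used for illustration only, not pyswark APIs.

```python
def run_step(state, inputs, outputs, model_fn):
    """Hypothetical sketch of one step; `state` is a plain dict here."""
    # inputs: {state_name: model_input_name}
    model_input = {
        model_name: state[state_name]
        for state_name, model_name in inputs.items()
    }
    # The model returns {model_output_name: value}.
    model_output = model_fn(**model_input)
    # outputs: {model_output_name: state_name}
    state_output = {
        state_name: model_output[model_name]
        for model_name, state_name in outputs.items()
    }
    # Post the renamed outputs back to the shared state.
    state.update(state_output)
    return state_output


def preprocess(data):
    # Toy model: drop missing values.
    return {"processed": [x for x in data if x is not None]}


state = {"raw_data": [1, None, 3]}
step_output = run_step(
    state,
    inputs={"raw_data": "data"},
    outputs={"processed": "clean"},
    model_fn=preprocess,
)
```

After the call, state holds both the original 'raw_data' entry and the new 'clean' entry, so a later step can map 'clean' to its own input name.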
Usage Examples
Basic Workflow
from pyswark.workflow.workflow import Workflow
from pyswark.workflow.step import Step
from pyswark.workflow.state import State
# Define steps
workflow = Workflow(steps=[
    Step(
        model='mymodule.PreprocessModel',
        inputs={'raw_data': 'data'},     # state 'raw_data' → model input 'data'
        outputs={'processed': 'clean'}   # model output 'processed' → state 'clean'
    ),
    Step(
        model='mymodule.AnalysisModel',
        inputs={'clean': 'data'},
        outputs={'result': 'analysis'}
    ),
])
# Initialize state with input data
state = State()
state.post('raw_data', my_dataframe)
# Run the workflow
result = workflow.run(state)
Workflow with Caching
Workflows automatically cache inputs and outputs. If you run the same workflow with the same inputs, steps will be skipped:
# First run - all steps execute
result1 = workflow.run(state)
print(workflow.stepsRan) # [0, 1]
print(workflow.stepsSkipped) # []
# Second run - steps are skipped if inputs unchanged
result2 = workflow.run(state)
print(workflow.stepsRan) # []
print(workflow.stepsSkipped) # [0, 1]
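The skip behaviour can be pictured as a comparison between each step's current inputs and the inputs recorded on the previous run. The following is a minimal sketch of that idea using plain dicts and functions. run_workflow and its tuple-based step format are hypothetical, and the real Workflow.run may differ in its details.

```python
def run_workflow(steps, state, cached_inputs, cached_outputs):
    """Hypothetical run loop: skip a step when its inputs match the cache."""
    ran, skipped = [], []
    result = None
    for i, (fn, in_names, out_name) in enumerate(steps):
        step_input = {name: state[name] for name in in_names}
        if i in cached_inputs and cached_inputs[i] == step_input:
            # Inputs unchanged: reuse the cached output instead of recomputing.
            result = cached_outputs[i]
            skipped.append(i)
        else:
            result = fn(**step_input)
            cached_inputs[i] = step_input
            cached_outputs[i] = result
            ran.append(i)
        # Post the output so later steps can read it.
        state[out_name] = result
    return result, ran, skipped


steps = [
    (lambda raw: [x for x in raw if x is not None], ["raw"], "clean"),
    (lambda clean: sum(clean), ["clean"], "total"),
]
cached_inputs, cached_outputs = {}, {}

state = {"raw": [1, None, 3]}
first = run_workflow(steps, state, cached_inputs, cached_outputs)
second = run_workflow(steps, state, cached_inputs, cached_outputs)

# Replacing the raw input forces both steps to run again.
state["raw"] = [1, None, 5]
third = run_workflow(steps, state, cached_inputs, cached_outputs)
```

In this sketch the cache holds references to the input objects, so mutating an input in place would go unnoticed. Replacing the value, as done before the third run, is what triggers re-execution.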
Creating a Model for Workflow Steps
Models used in workflows should have a run() method that returns a dict:
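from typing import Any

from pyswark.lib.pydantic import base
from pyswark.core.models import xputs

# Define the inputs class first so the model annotation can reference it
class PreprocessInputs(xputs.BaseInputs):
    data: Any

class PreprocessModel(base.BaseModel):
    inputs: PreprocessInputs

    def run(self):
        # Drop missing values (assumes data supports dropna(), e.g. a pandas DataFrame)
        processed = self.inputs.data.dropna()
        return {'processed': processed}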
Inspecting Workflow State
# Get cached inputs/outputs for a step
model_input = workflow.getModelInput(0)
model_output = workflow.getModelOutput(0)
# Get the instantiated model
model = workflow.getModel(0)