Getting Started
Installation
Install pyswark using conda:
conda install -c pyt3r pyswark
Example Capabilities
1. Serialization with Type Preservation
A core tenet of pyswark is type-preserving serialization:
from pydantic import BaseModel
from pyswark.lib.pydantic import ser_des
class Character(BaseModel):
name: str
role: str
mulder = Character(name='Fox Mulder', role='FBI Agent')
# Serialize with embedded type information
json_data = ser_des.toJson(mulder, indent=2)
print(json_data)
# {
# "model": "__main__.Character",
# "contents": {"name": "Fox Mulder", "role": "FBI Agent"}
# }
# Deserialize - no type hint needed!
restored = ser_des.fromJson(json_data)
assert type(restored) == Character
2. Unified I/O
Read and write data from any URI:
from pyswark.core.io import api
# Read from local file
df = api.read('file:./data.csv')
# Write to file
api.write(df, 'file:./output.csv')
3. Data Catalogs with GlueDb
Create and use data catalogs:
from pyswark.core.io import api
from pyswark.core.models import collection
# Connect to existing catalog
db = api.read('pyswark://data/sma-example.gluedb')
print(db.getNames()) # ['JPM', 'BAC', 'kwargs']
# Extract data by name
jpm_data = db.extract('JPM')
kwargs = db.extract('kwargs')
# Create a new catalog
from pyswark.gluedb import db
new_db = db.Db()
new_db.post('file:./ohlc-jpm.csv.gz', name='JPM')
new_db.post(collection.Dict({'window': 60}), name='kwargs')
# Extract from a new catalog
new_jpm_data = new_db.extract('JPM')
new_kwargs = new_db.extract('kwargs')
# Persist the new catalog
from pyswark.core.io import api
api.write( new_db, 'file:./new.gluedb' )
4. Persistence with Db.connect
Persist a GlueDb catalog to a .gluedb file with Db.connect().
When persist=True, changes made inside the with block are written back to the
file when the block exits successfully:
from pyswark.gluedb.db import Db
from pyswark.core.models import collection
from pyswark.core.io import api
# Create an initial catalog and save it
db = Db()
db.post('pyswark://data/sma-example.gluedb', name='JPM')
api.write(db, 'file:./catalog.gluedb')
# Re-open with persist=True — auto-saves on exit
with Db.connect('file:./catalog.gluedb', persist=True) as db:
db.post(collection.Dict({'window': 60}), name='kwargs')
# Changes are persisted
db = Db.connect('file:./catalog.gluedb')
print(db.getNames()) # ['JPM', 'kwargs']
5. Time Series with DatetimeList and TsVector
Work with validated time series data:
from pyswark.core.models.datetime import DatetimeList
from pyswark.ts.tsvector import TsVector
# Create a DatetimeList from strings
dates = DatetimeList(['2024-01-01', '2024-01-02', '2024-01-03'])
print(dates.dt) # array of datetime64
# Resample to different time resolution
monthly = dates.resample('M') # Convert to monthly resolution (each date rolls forward to the next month)
print( monthly.dt ) # array(['2024-02', '2024-02', '2024-02'], dtype='datetime64[M]')
# Create a time series vector
ts = TsVector(
index=['2024-01-01', '2024-01-02', '2024-01-03'],
values=[100.0, 101.5, 99.8]
)
print(ts.dt) # datetime64 array
print(ts.values.vector) # numpy array of values
# DatetimeList stores its values efficiently as a base plus deltas
dates = DatetimeList([2020, 2021, 2022, 2023])
print(dates.basedt) # 2020
print(dates.deltas) # [0, 1, 2, 3]
# Fully serializable
from pyswark.lib.pydantic import ser_des
json_str = ser_des.toJson(ts)
restored = ser_des.fromJson(json_str)
6. Workflow Orchestration
Orchestrate multi-step computations with automatic caching:
from pyswark.workflow.workflow import Workflow
from pyswark.workflow.step import Step
from pyswark.workflow.state import State
from pyswark.lib.pydantic import base
# Define models with run() methods
class AddModel(base.BaseModel):
a: int
b: int
def run(self):
return {'sum': self.a + self.b}
class MultiplyModel(base.BaseModel):
sum: int
factor: int
def run(self):
return {'product': self.sum * self.factor}
# Create workflow steps
step0 = Step(
model=AddModel,
inputs={'x': 'a', 'y': 'b'}, # state → model inputs
outputs={'sum': 'result'} # model outputs → state
)
step1 = Step(
model=MultiplyModel,
inputs={'result': 'sum', 'z': 'factor'},
outputs={'product': 'final'}
)
workflow = Workflow(steps=[step0, step1])
# Initialize state with input data
inputData = {'x': 2, 'y': 3, 'z': 4}
state = State(backend=inputData)
# Run workflow - steps execute in order
result = workflow.run(state)
print(result) # {'final': 20} # (2+3)*4
# Workflows cache inputs/outputs - rerun skips unchanged steps
print(workflow.stepsRan) # [0, 1] - both ran
print(workflow.stepsSkipped) # []
# Second run with same inputs - steps are skipped!
# - workflow cached its previous results and sees no need to recompute
# - stepsRan / stepsSkipped accumulate across runs; they are not reset
state2 = State(backend=inputData)
workflow.run(state2)
print(workflow.stepsRan) # [0, 1] - still from first run
print(workflow.stepsSkipped) # [0, 1] - both skipped on the second run!
7. Workflow Runner
The Runner is the top-level entry point for
running a workflow. It pairs a Workflow with a State — each given either directly
as an instance or as a python://module.path reference string — runs them,
and stashes the post-run artifacts for inspection and a cheap rerun():
from pyswark.workflow.runner import Runner
# 1) Pass instances directly
runner = Runner(workflow=workflow, state=State(backend=inputData))
result = runner.run()
print(result) # {'final': 20}
print(runner.rerunWorkflow.stepsRan) # [0, 1]
# Rerun against the cached state — all steps are skipped (idempotent)
runner.rerun()
print(runner.rerunWorkflow.stepsSkipped) # [0, 1]
# 2) Or pass python:// refs — preferred for production entry points.
# Each run() re-imports the refs, giving a fresh workflow and state.
runner = Runner(
workflow = 'python://mymodule.workflows.etl.WORKFLOW',
state = 'python://mymodule.states.season.SEASON_2023',
)
runner.run()
See the "Workflow Runner" gallery example and the "workflow - Orchestration" documentation for full details.