Getting Started

Installation

Install pyswark using conda:

conda install -c pyt3r pyswark

Example Capabilities

1. Serialization with Type Preservation

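A core tenet of pyswark is type-preserving serialization: the JSON output records the model's fully qualified class name alongside its contents, so it can be deserialized back into the original type without passing a type hint: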

from pydantic import BaseModel
from pyswark.lib.pydantic import ser_des

class Character(BaseModel):
    name: str
    role: str

mulder = Character(name='Fox Mulder', role='FBI Agent')

# Serialize with embedded type information
json_data = ser_des.toJson(mulder, indent=2)
print(json_data)
# {
#   "model": "__main__.Character",
#   "contents": {"name": "Fox Mulder", "role": "FBI Agent"}
# }

# Deserialize - no type hint needed!
restored = ser_des.fromJson(json_data)
assert type(restored) == Character

2. Unified I/O

Read data from, and write data to, any supported URI:

from pyswark.core.io import api

# Read from local file
df = api.read('file:./data.csv')

# Write to file
api.write(df, 'file:./output.csv')

3. Data Catalogs with GlueDb

Open, query, create, and persist data catalogs:

from pyswark.core.io import api
from pyswark.core.models import collection

# Connect to existing catalog
db = api.read('pyswark://data/sma-example.gluedb')
print(db.getNames())  # ['JPM', 'BAC', 'kwargs']

# Extract data by name
jpm_data = db.extract('JPM')
kwargs   = db.extract('kwargs')


# Create a new catalog
from pyswark.gluedb.db import Db
new_db = Db()
new_db.post('file:./ohlc-jpm.csv.gz', name='JPM')
new_db.post(collection.Dict({'window': 60}), name='kwargs')

# Extract from a new catalog
new_jpm_data = new_db.extract('JPM')
new_kwargs   = new_db.extract('kwargs')

# persist the new catalog
from pyswark.core.io import api
api.write( new_db, 'file:./new.gluedb' )

4. Persistence with Db.connect

Use Db.connect() to open a GlueDb catalog stored in a .gluedb file. When persist=True, any changes made inside the with block are written back to the file when the block exits successfully:

from pyswark.gluedb.db import Db
from pyswark.core.models import collection
from pyswark.core.io import api

# Create an initial catalog and save it
db = Db()
db.post('pyswark://data/sma-example.gluedb', name='JPM')
api.write(db, 'file:./catalog.gluedb')

# Re-open with persist=True — auto-saves on exit
with Db.connect('file:./catalog.gluedb', persist=True) as db:
    db.post(collection.Dict({'window': 60}), name='kwargs')

# Changes are persisted
db = Db.connect('file:./catalog.gluedb')
print(db.getNames())  # ['JPM', 'kwargs']

5. Time Series with DatetimeList and TsVector

Work with validated time series data:

from pyswark.core.models.datetime import DatetimeList
from pyswark.ts.tsvector import TsVector

# Create a DatetimeList from strings
dates = DatetimeList(['2024-01-01', '2024-01-02', '2024-01-03'])
print(dates.dt)      # array of datetime64

# Resample to different time resolution
monthly = dates.resample('M')  # Convert to monthly resolution (lags to next month)
print( monthly.dt ) # array(['2024-02', '2024-02', '2024-02'], dtype='datetime64[M]')

# Create a time series vector
ts = TsVector(
   index=['2024-01-01', '2024-01-02', '2024-01-03'],
   values=[100.0, 101.5, 99.8]
)
print(ts.dt)          # datetime64 array
print(ts.values.vector)  # numpy array of values

# DatetimeList efficiently stores as base + deltas
dates = DatetimeList([2020, 2021, 2022, 2023])
print(dates.basedt)   # 2020
print(dates.deltas)   # [0, 1, 2, 3]

# Fully serializable
from pyswark.lib.pydantic import ser_des
json_str = ser_des.toJson(ts)
restored = ser_des.fromJson(json_str)

6. Workflow Orchestration

Orchestrate multi-step computations with automatic caching:

from pyswark.workflow.workflow import Workflow
from pyswark.workflow.step import Step
from pyswark.workflow.state import State
from pyswark.lib.pydantic import base

# Define models with run() methods
class AddModel(base.BaseModel):
   a: int
   b: int
   def run(self):
      return {'sum': self.a + self.b}

class MultiplyModel(base.BaseModel):
   sum: int
   factor: int
   def run(self):
      return {'product': self.sum * self.factor}

# Create workflow steps
step0 = Step(
   model=AddModel,
   inputs={'x': 'a', 'y': 'b'},      # state → model inputs
   outputs={'sum': 'result'}          # model outputs → state
)

step1 = Step(
   model=MultiplyModel,
   inputs={'result': 'sum', 'z': 'factor'},
   outputs={'product': 'final'}
)

workflow = Workflow(steps=[step0, step1])

# Initialize state with input data
inputData = {'x': 2, 'y': 3, 'z': 4}
state = State(backend=inputData)

# Run workflow - steps execute in order
result = workflow.run(state)
print(result)  # {'final': 20}  # (2+3)*4

# Workflows cache inputs/outputs - rerun skips unchanged steps
print(workflow.stepsRan)      # [0, 1] - both ran
print(workflow.stepsSkipped)  # []

# Second run with same inputs - steps are skipped!
# - workflow cached its previous results and sees no need to recompute
# - stepsRan / stepsSkipped accumulate across runs; they are not reset
state2 = State(backend=inputData)
workflow.run(state2)
print(workflow.stepsRan)      # [0, 1] - still from first run
print(workflow.stepsSkipped)  # [0, 1] - both skipped on the second run!

7. Workflow Runner

The Runner is the top-level entry point for running a workflow. It pairs a Workflow with a State, each given either as an instance or as a python://module.path reference string. It runs them and keeps the post-run artifacts, so you can inspect them and cheaply call rerun():

from pyswark.workflow.runner import Runner

# 1) Pass instances directly
runner = Runner(workflow=workflow, state=State(backend=inputData))
result = runner.run()
print(result)                              # {'final': 20}
print(runner.rerunWorkflow.stepsRan)       # [0, 1]

# Rerun against the cached state — all steps are skipped (idempotent)
runner.rerun()
print(runner.rerunWorkflow.stepsSkipped)   # [0, 1]

# 2) Or pass python:// refs — preferred for production entry points.
#    Each run() re-imports the refs, giving a fresh workflow and state.
runner = Runner(
    workflow = 'python://mymodule.workflows.etl.WORKFLOW',
    state    = 'python://mymodule.states.season.SEASON_2023',
)
runner.run()

For full details, see the Workflow Runner gallery example and the "workflow - Orchestration" documentation page.