Getting Started
===============

Installation
------------

Install pyswark using conda:

.. code-block:: bash

   conda install -c pyt3r pyswark

Example Capabilities
--------------------

1. Serialization with Type Preservation
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

A core tenet of pyswark is type-preserving serialization:

.. code-block:: python

   from pydantic import BaseModel
   from pyswark.lib.pydantic import ser_des

   class Character(BaseModel):
       name: str
       role: str

   mulder = Character(name='Fox Mulder', role='FBI Agent')

   # Serialize with embedded type information
   json_data = ser_des.toJson(mulder, indent=2)
   print(json_data)
   # {
   #   "model": "__main__.Character",
   #   "contents": {"name": "Fox Mulder", "role": "FBI Agent"}
   # }

   # Deserialize - no type hint needed!
   restored = ser_des.fromJson(json_data)
   assert type(restored) == Character

2. Unified I/O
^^^^^^^^^^^^^^

Read and write data from any URI:

.. code-block:: python

   from pyswark.core.io import api

   # Read from local file
   df = api.read('file:./data.csv')

   # Write to file
   api.write(df, 'file:./output.csv')

3. Data Catalogs with GlueDb
^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Create and use data catalogs:

.. code-block:: python

   from pyswark.core.io import api
   from pyswark.core.models import collection

   # Connect to existing catalog
   db = api.read('pyswark://data/sma-example.gluedb')
   print(db.getNames())  # ['JPM', 'BAC', 'kwargs']

   # Extract data by name
   jpm_data = db.extract('JPM')
   kwargs = db.extract('kwargs')

   # Create a new catalog
   from pyswark.gluedb import db
   new_db = db.Db()
   new_db.post('file:./ohlc-jpm.csv.gz', name='JPM')
   new_db.post(collection.Dict({'window': 60}), name='kwargs')

   # Extract from the new catalog
   new_jpm_data = new_db.extract('JPM')
   new_kwargs = new_db.extract('kwargs')

   # Persist the new catalog
   api.write(new_db, 'file:./new.gluedb')

4. Persistence with Db.connect
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Persist a GlueDb catalog to a ``.gluedb`` file with ``Db.connect()``.
Changes made inside the ``with`` block are written to the file on successful
exit when ``persist=True``:

.. code-block:: python

   from pyswark.gluedb.db import Db
   from pyswark.core.models import collection
   from pyswark.core.io import api

   # Create an initial catalog and save it
   db = Db()
   db.post('pyswark://data/sma-example.gluedb', name='JPM')
   api.write(db, 'file:./catalog.gluedb')

   # Re-open with persist=True (auto-saves on exit)
   with Db.connect('file:./catalog.gluedb', persist=True) as db:
       db.post(collection.Dict({'window': 60}), name='kwargs')

   # Changes are persisted
   db = Db.connect('file:./catalog.gluedb')
   print(db.getNames())  # ['JPM', 'kwargs']

5. Time Series with DatetimeList and TsVector
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Work with validated time series data:

.. code-block:: python

   from pyswark.core.models.datetime import DatetimeList
   from pyswark.ts.tsvector import TsVector

   # Create a DatetimeList from strings
   dates = DatetimeList(['2024-01-01', '2024-01-02', '2024-01-03'])
   print(dates.dt)  # array of datetime64

   # Resample to different time resolution
   monthly = dates.resample('M')  # Convert to monthly resolution (lags to next month)
   print(monthly.dt)  # array(['2024-02', '2024-02', '2024-02'], dtype='datetime64[M]')

   # Create a time series vector
   ts = TsVector(
       index=['2024-01-01', '2024-01-02', '2024-01-03'],
       values=[100.0, 101.5, 99.8]
   )
   print(ts.dt)             # datetime64 array
   print(ts.values.vector)  # numpy array of values

   # DatetimeList efficiently stores as base + deltas
   dates = DatetimeList([2020, 2021, 2022, 2023])
   print(dates.basedt)  # 2020
   print(dates.deltas)  # [0, 1, 2, 3]

   # Fully serializable
   from pyswark.lib.pydantic import ser_des
   json_str = ser_des.toJson(ts)
   restored = ser_des.fromJson(json_str)

6. Workflow Orchestration
^^^^^^^^^^^^^^^^^^^^^^^^^

Orchestrate multi-step computations with automatic caching:

.. code-block:: python

   from pyswark.workflow.workflow import Workflow
   from pyswark.workflow.step import Step
   from pyswark.workflow.state import State
   from pyswark.lib.pydantic import base

   # Define models with run() methods
   class AddModel(base.BaseModel):
       a: int
       b: int

       def run(self):
           return {'sum': self.a + self.b}

   class MultiplyModel(base.BaseModel):
       sum: int
       factor: int

       def run(self):
           return {'product': self.sum * self.factor}

   # Create workflow steps
   step0 = Step(
       model=AddModel,
       inputs={'x': 'a', 'y': 'b'},  # state → model inputs
       outputs={'sum': 'result'}     # model outputs → state
   )
   step1 = Step(
       model=MultiplyModel,
       inputs={'result': 'sum', 'z': 'factor'},
       outputs={'product': 'final'}
   )

   workflow = Workflow(steps=[step0, step1])

   # Initialize state with input data
   inputData = {'x': 2, 'y': 3, 'z': 4}
   state = State(backend=inputData)

   # Run workflow - steps execute in order
   result = workflow.run(state)
   print(result)  # {'final': 20}  # (2+3)*4

   # Workflows cache inputs/outputs - rerun skips unchanged steps
   print(workflow.stepsRan)      # [0, 1] - both ran
   print(workflow.stepsSkipped)  # []

   # Second run with same inputs - steps are skipped!
   # - workflow cached its previous results and sees no need to recompute
   # - stepsRan / stepsSkipped accumulate across runs; they are not reset
   state2 = State(backend=inputData)
   workflow.run(state2)
   print(workflow.stepsRan)      # [0, 1] - still from first run
   print(workflow.stepsSkipped)  # [0, 1] - both skipped on the second run!

7. Workflow Runner
^^^^^^^^^^^^^^^^^^

The :class:`~pyswark.workflow.runner.Runner` is the top-level entry point for
running a workflow. It pairs a ``Workflow`` and a ``State`` (given directly as
instances or as ``python://module.path`` reference strings), runs them, and
stashes the post-run artifacts for inspection and cheap ``rerun()``:

.. code-block:: python

   from pyswark.workflow.runner import Runner

   # 1) Pass instances directly
   runner = Runner(workflow=workflow, state=State(backend=inputData))
   result = runner.run()
   print(result)                         # {'final': 20}
   print(runner.rerunWorkflow.stepsRan)  # [0, 1]

   # Rerun against the cached state: all steps are skipped (idempotent)
   runner.rerun()
   print(runner.rerunWorkflow.stepsSkipped)  # [0, 1]

   # 2) Or pass python:// refs (preferred for production entry points).
   #    Each run() re-imports the refs, giving a fresh workflow and state.
   runner = Runner(
       workflow = 'python://mymodule.workflows.etl.WORKFLOW',
       state    = 'python://mymodule.states.season.SEASON_2023',
   )
   runner.run()

See the *Workflow Runner* gallery example and :doc:`/api/workflow` for full
details.
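
The skip-on-rerun behaviour shown in the Workflow Orchestration and Workflow
Runner examples (a step is not recomputed when its inputs are unchanged) can be
pictured with a small plain-Python sketch. This illustrates input-fingerprint
caching in general and is not pyswark's actual implementation: ``CachedStep``,
``run_all``, ``add`` and ``multiply`` are hypothetical names, and the sketch
assumes that step inputs are JSON-serializable.

```python
import hashlib
import json


class CachedStep:
    """Hypothetical step wrapper for illustration; not part of pyswark."""

    def __init__(self, func, inputs, outputs):
        self.func = func          # callable taking keyword args, returning a dict
        self.inputs = inputs      # state key -> function argument name
        self.outputs = outputs    # function result key -> state key
        self._last_key = None     # fingerprint of the most recent inputs
        self._last_result = None  # result produced for that fingerprint

    def run(self, state):
        """Update state in place; return True if the step ran, False if skipped."""
        kwargs = {arg: state[key] for key, arg in self.inputs.items()}
        payload = json.dumps(kwargs, sort_keys=True).encode()
        key = hashlib.sha256(payload).hexdigest()
        ran = key != self._last_key
        if ran:
            self._last_result = self.func(**kwargs)
            self._last_key = key
        # Cached or fresh, the outputs are always written back to the state
        for name, state_key in self.outputs.items():
            state[state_key] = self._last_result[name]
        return ran


def add(a, b):
    return {'sum': a + b}


def multiply(total, factor):
    return {'product': total * factor}


def run_all(steps, data):
    """Run the steps in order on a copy of data; return (state, indices that ran)."""
    state = dict(data)
    ran = []
    for i, step in enumerate(steps):
        if step.run(state):
            ran.append(i)
    return state, ran


steps = [
    CachedStep(add, {'x': 'a', 'y': 'b'}, {'sum': 'result'}),
    CachedStep(multiply, {'result': 'total', 'z': 'factor'}, {'product': 'final'}),
]

first_state, first_ran = run_all(steps, {'x': 2, 'y': 3, 'z': 4})
second_state, second_ran = run_all(steps, {'x': 2, 'y': 3, 'z': 4})
third_state, third_ran = run_all(steps, {'x': 2, 'y': 3, 'z': 5})

print(first_state['final'], first_ran)    # 20 [0, 1]
print(second_state['final'], second_ran)  # 20 []
print(third_state['final'], third_ran)    # 25 [1]
```

In the third call only ``z`` changes, so the first step is served from its cache
and only the second step recomputes. Whether pyswark tracks changes per step in
the same way is not stated above.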