gluedb - Data Catalogs

The gluedb module provides data catalog functionality for managing named collections of data sources. This is Layer 2 of the architecture.

Key Concepts

  • GlueDb: A database of named records pointing to data sources

  • GlueHub: A hub containing multiple GlueDb instances

  • Record: A named entry with URI and extraction settings

GlueDb Class

class pyswark.gluedb.db.Db(*, records: list[pyswark.core.models.record.Record] = <factory>, url: str = '', datahandler: str = 'pjson', engine_url: str = 'sqlite:///:memory:', persist: bool = False)

Bases: Base

GlueDb - A database of named records pointing to data sources.

A GlueDb is a collection of named records, where each record can point to a data source via URI or contain inline data. Records can be extracted, loaded, and managed through a simple API.

Key Features

  • Store records by name with URI or inline data

  • Extract data from records automatically

  • Merge multiple databases together

  • Serialize to/from .gluedb files

Example

>>> from pyswark.gluedb.db import Db
>>>
>>> # Create a new database
>>> db = Db()
>>>
>>> # Post records (object first, record name as keyword)
>>> db.post('file:./prices.csv', name='prices')
>>> db.post({'window': 60}, name='config')
>>>
>>> # Extract data
>>> prices = db.extract('prices')
>>>
>>> # List all names
>>> print(db.getNames())  # ['prices', 'config']
__contains__(name)

Membership test by record name, so that name in db is True when a record with that name exists.

delete(name)

Delete a record by name (alias for deleteByName).

Parameters:

name (str) – The name of the record to delete.

Returns:

True if the record was deleted, False if it didn’t exist.

Return type:

bool

extract(name)

Extract data from a record by name.

This method retrieves the record and extracts its data. If the record points to a URI, the data will be loaded from that URI. If the record contains inline data, it will be returned directly.

Parameters:

name (str) – The name of the record to extract.

Returns:

The extracted data (type depends on the record’s data source).

Return type:

Any

Example

>>> db.post('file:./prices.csv', name='prices')
>>> prices_df = db.extract('prices')
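
The URI-versus-inline behaviour described for extract() can be pictured with a small stand-in. This is a sketch in plain Python, not pyswark's implementation: Record, extract_record, and the rule that a string starting with file: counts as a URI are all assumptions made for illustration, and the CSV is parsed with the standard library rather than with pyswark's data handlers.

```python
import csv
import os
import tempfile
from dataclasses import dataclass
from typing import Any


@dataclass
class Record:
    """Hypothetical stand-in for a catalog record: a name plus a body."""
    name: str
    body: Any  # either a 'file:' URI string or inline data


def extract_record(record: Record) -> Any:
    """Load from the URI if the body is one, else return the inline body."""
    body = record.body
    if isinstance(body, str) and body.startswith("file:"):
        path = body[len("file:"):]
        with open(path, newline="") as handle:
            return list(csv.DictReader(handle))
    return body


# Build a small CSV so the URI branch has something to load.
tmp_dir = tempfile.mkdtemp()
csv_path = os.path.join(tmp_dir, "prices.csv")
with open(csv_path, "w", newline="") as handle:
    handle.write("date,close\n2024-01-02,170.5\n")

by_uri = Record(name="prices", body="file:" + csv_path)
inline = Record(name="config", body={"window": 60})

rows = extract_record(by_uri)    # loaded from disk
config = extract_record(inline)  # returned as-is
print(rows[0]["close"])  # 170.5
print(config)            # {'window': 60}
```

The caller sees one entry point either way, which is why the documented return type is Any: the result depends on what the record points to.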
get(name)

Get a record by name (alias for getByName).

Parameters:

name (str) – The name of the record to retrieve.

Returns:

The record with the given name, or None if not found.

Return type:

Record

getNames()

Get all record names in the database.

Returns:

List of all record names.

Return type:

list[str]

merge(other)

Merge records from another database into this one.

Parameters:

other (Db) – Another GlueDb instance to merge from.

Example

>>> db1.merge(db2)  # Adds all records from db2 to db1
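
This page does not say what merge() does when both databases contain the same record name. The sketch below models two possible policies with plain dicts; merge_records and its on_conflict parameter are hypothetical and are not part of the pyswark API.

```python
def merge_records(target: dict, other: dict, on_conflict: str = "error") -> dict:
    """Copy every record of `other` into `target`, in place.

    on_conflict='error'     -> raise KeyError on a duplicate name
    on_conflict='overwrite' -> the record from `other` wins
    """
    if on_conflict not in ("error", "overwrite"):
        raise ValueError(f"unknown policy: {on_conflict!r}")
    duplicates = sorted(set(target) & set(other))
    if duplicates and on_conflict == "error":
        raise KeyError(f"duplicate record names: {duplicates}")
    target.update(other)
    return target


db1 = {"data1": "file:./data1.csv"}
db2 = {"data2": "file:./data2.csv"}
merge_records(db1, db2)
print(list(db1))  # ['data1', 'data2']

# A name clash is rejected unless the caller opts in to overwriting.
clash = {"data1": "file:./other.csv"}
try:
    merge_records(db1, clash)
    rejected = False
except KeyError:
    rejected = True
merge_records(db1, clash, on_conflict="overwrite")
print(db1["data1"])  # file:./other.csv
```

When record names may overlap, it is worth checking getNames() on both databases before merging, since the outcome of a clash is not documented here.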

GlueHub Class

class pyswark.gluedb.hub.Hub(*, records: list[pyswark.core.models.record.Record] = <factory>, url: str = '', datahandler: str = 'pjson', engine_url: str = 'sqlite:///:memory:', persist: bool = False)

Bases: Db

A hub containing multiple GlueDb instances.

Hub extends Db but uses the Contents model to store references to other GlueDb instances. This allows multiple databases to be organized together and consolidated when needed.

Example

>>> from pyswark.gluedb.hub import Hub
>>> hub = Hub()
>>> hub.post(market_db, name='market_data')  # market_db: an existing Db
>>> hub.post('file:./config.gluedb', name='config')
>>>
>>> # Extract a database
>>> market_db = hub.extract('market_data')
>>>
>>> # Consolidate all databases
>>> consolidated = hub.toDb()
acquireFromDb(dbName, name)

Acquire the low-level handler for a record in the underlying GlueDb.

deleteFromDb(dbName, name, overwrite=True)

Delete an entry from the underlying GlueDb and persist the change.

extract(name)

Extract a database from the hub by name.

This method retrieves the record and extracts its data. If the record points to a URI, the data will be loaded from that URI. If the record contains inline data, it will be returned directly.

extractFromDb(dbName, name)

Extract data from a record in the underlying GlueDb.

getFromDb(dbName, name)

Get a record from the underlying GlueDb without persisting anything.

load(data, name)

Load a database into the hub by name.

This method stores the database in the hub under the given name.

mergeToDb(otherDb, dbName, overwrite=True)

Merge another GlueDb into a database in the hub and persist it.

Parameters:
  • otherDb (Db) – Another GlueDb instance whose records will be merged into the target database.

  • dbName (str) – Name of the database in the hub to merge into.

  • overwrite (bool, optional) – If True (default), allow overwriting the target URI when persisting.

Returns:

The merged target GlueDb.

Return type:

Db

postToDb(obj, dbName, name=None, overwrite=True)

Post an entry to the underlying GlueDb, then write the modified database back to the URI it points to.

Uses .acquire() and .extract() to get the Db for the given dbName, posts the object to that Db, then persists the modified Db back to its URI (e.g. the file the gluedb object points to).

Parameters:
  • obj (Any) – Object to post (e.g. dict, BaseModel, or URI string).

  • dbName (str) – Name of the database in the hub to post to.

  • name (str, optional) – Record name for the new entry. If None, taken from obj.name or obj['name'] when possible.

  • overwrite (bool, optional) – If True (default), allow overwriting the target URI when persisting. Pass False to forbid overwrite.

Returns:

The posted record from the target Db, or None.

Return type:

Record or None

Example

>>> hub.postToDb(collection.Dict({'x': 1}), 'db_1', name='new_entry')
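
The load, post, persist sequence described for postToDb can be sketched with a JSON file standing in for a .gluedb file. Everything here is an assumption for illustration: post_to_db, the hub-as-dict layout, and the choice to raise FileExistsError when overwrite=False. The actual file format and error type used by pyswark are not documented on this page.

```python
import json
import os
import tempfile


def post_to_db(hub: dict, db_name: str, name: str, obj, overwrite: bool = True):
    """Read the target catalog, add one record, and write it back."""
    path = hub[db_name]                      # hub entry -> file path
    with open(path) as handle:
        records = json.load(handle)          # step 1: load the catalog
    records[name] = obj                      # step 2: post the new record
    if not overwrite and os.path.exists(path):
        # The file we loaded from already exists, so refuse to replace it.
        raise FileExistsError(path)
    with open(path, "w") as handle:
        json.dump(records, handle)           # step 3: persist to the same path
    return records[name]


# Set up a one-record catalog on disk and register it in the hub.
catalog_path = os.path.join(tempfile.mkdtemp(), "catalog.json")
with open(catalog_path, "w") as handle:
    json.dump({"JPM": "file:./ohlc-jpm.csv.gz"}, handle)
hub = {"db_1": catalog_path}

post_to_db(hub, "db_1", "new_entry", {"x": 1})

with open(catalog_path) as handle:
    on_disk = json.load(handle)
print(sorted(on_disk))  # ['JPM', 'new_entry']
```

The point of the pattern is that the hub itself never holds the new record; it only locates the target catalog, and the change lives in the file that catalog was loaded from.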
putToDb(obj, dbName, name=None, overwrite=True)

Put (upsert) an entry into the underlying GlueDb and persist it.

This mirrors GlueDb.put: if a record with the given name exists it is updated, otherwise it is created. After the operation the modified Db is written back to the URI it points to.

toDb()

Consolidate all databases in the hub into a single GlueDb.

This merges all databases stored in the hub into one database, which is useful for flattening the hub structure.

Returns:

A new GlueDb instance containing all records from all databases.

Return type:

Db

Example

>>> consolidated = hub.toDb()
>>> print(consolidated.getNames())  # All names from all databases
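
Flattening a hub can be modelled as a fold over its databases. The sketch below uses nested dicts as stand-ins; flatten_hub is hypothetical. Because this page does not say how toDb() treats a name that appears in more than one database, the sketch lets the later database win and reports the shared names so the choice is visible.

```python
from collections import Counter


def flatten_hub(hub: dict) -> tuple:
    """Merge every database in `hub` into one dict and list shared names."""
    counts = Counter(name for records in hub.values() for name in records)
    shared = sorted(name for name, n in counts.items() if n > 1)
    flat = {}
    for records in hub.values():
        flat.update(records)  # later databases win in this sketch
    return flat, shared


hub = {
    "market_data": {"JPM": "file:./ohlc-jpm.csv.gz", "BAC": "file:./ohlc-bac.csv.gz"},
    "config": {"kwargs": {"window": 60}, "JPM": "file:./jpm-override.csv"},
}
flat, shared = flatten_hub(hub)
print(sorted(flat))  # ['BAC', 'JPM', 'kwargs']
print(shared)        # ['JPM']
```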

Usage Examples

Connecting to an Existing GlueDb

from pyswark.core.io import api

# Connect to a .gluedb file
db = api.read('file:./sma-example.gluedb')

# View available records
print(db.getNames())  # ['JPM', 'BAC', 'kwargs']

# Extract data by name
jpm_data = db.extract('JPM')
print(jpm_data.shape)

Creating a New GlueDb

from pyswark.gluedb.db import Db
from pyswark.core.models import collection
from pyswark.core.io import api as io

# Create a new empty database
db = Db()

# Post records pointing to data sources
db.post('file:./ohlc-jpm.csv.gz', name='JPM')
db.post('file:./ohlc-bac.csv.gz', name='BAC')

# Post inline data (dict, list, etc.)
db.post(collection.Dict({
    'window': 60,
    'method': 'rolling'
}), name='config')

print(db.getNames())  # ['JPM', 'BAC', 'config']

# Save the database
io.write(db, 'file:./my-analysis.gluedb')

Persisting with Db.connect

Db.connect() loads a .gluedb catalog from a URI and returns a context manager. When persist=True, the catalog is written back to the file on successful exit:

from pyswark.gluedb.db import Db
from pyswark.core.models import collection
from pyswark.core.io import api

# Create an initial catalog and save it
db = Db()
db.post('file:./ohlc-jpm.csv.gz', name='JPM')
api.write(db, 'file:./catalog.gluedb')

# Re-open with persist=True — auto-saves on exit
with Db.connect('file:./catalog.gluedb', persist=True) as db:
    db.post(collection.Dict({'window': 60}), name='kwargs')

# Changes are persisted; re-open the catalog to verify
with Db.connect('file:./catalog.gluedb') as db:
    print(db.getNames())  # ['JPM', 'kwargs']

    # Extract data
    jpm_data = db.extract('JPM')
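
The write-back-on-successful-exit behaviour can be illustrated with a small context manager. This is a sketch under assumptions, not pyswark's code: connect here is a hypothetical function that treats the catalog as a JSON file, and it skips the write whenever the with body raises.

```python
import json
import os
import tempfile
from contextlib import contextmanager


@contextmanager
def connect(path: str, persist: bool = False):
    """Yield the catalog as a dict; save it on clean exit if persist=True."""
    with open(path) as handle:
        records = json.load(handle)
    yield records          # an exception in the with-body propagates from here
    if persist:            # reached only when the body finished without error
        with open(path, "w") as handle:
            json.dump(records, handle)


catalog_path = os.path.join(tempfile.mkdtemp(), "catalog.json")
with open(catalog_path, "w") as handle:
    json.dump({"JPM": "file:./ohlc-jpm.csv.gz"}, handle)

# Clean exit with persist=True: the new record is written to disk.
with connect(catalog_path, persist=True) as records:
    records["kwargs"] = {"window": 60}

# An exception inside the block: nothing is written.
try:
    with connect(catalog_path, persist=True) as records:
        records["broken"] = 1
        raise ValueError("something went wrong")
except ValueError:
    pass

with connect(catalog_path) as records:
    names = sorted(records)
print(names)  # ['JPM', 'kwargs']
```

Tying the save to a clean exit means a half-finished edit never reaches the file, at the cost of losing all changes from a block that fails partway through.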

Persisting with DbSQLModel.connect

DbSQLModel.connect() provides SQLite-backed persistence with automatic commit/rollback semantics. Data posted inside a with block is committed on successful exit and rolled back on exception:

from pyswark.core.models.db import DbSQLModel
from pyswark.lib.pydantic import base

class Ticker(base.BaseModel):
    symbol   : str
    longName : str
    exchange : str

db_url = 'sqlite:///./my-tickers.db'

# Post records — auto-commits on exit
with DbSQLModel.connect(db_url) as db:
    aapl = Ticker(symbol='AAPL', longName='Apple Inc.', exchange='NASDAQ')
    db.post(aapl, name='AAPL')

    msft = Ticker(symbol='MSFT', longName='Microsoft Corp', exchange='NASDAQ')
    db.post(msft, name='MSFT')

# Data persists across connections
with DbSQLModel.connect(db_url) as db:
    result = db.getByName('AAPL')
    ticker = result.body.extract()
    print(ticker.symbol)    # 'AAPL'
    print(ticker.longName)  # 'Apple Inc.'

    all_records = db.getAll()
    print(len(all_records))  # 2

Updating and deleting records:

# PUT replaces or creates a record (idempotent)
with DbSQLModel.connect(db_url) as db:
    updated = Ticker(symbol='MSFT', longName='Microsoft Corporation', exchange='NASDAQ')
    db.put(updated, name='MSFT')

# DELETE removes a record
with DbSQLModel.connect(db_url) as db:
    db.deleteByName('MSFT')

# Verify
with DbSQLModel.connect(db_url) as db:
    print(db.getByName('MSFT'))  # None

Rollback on exception:

# If an exception occurs, changes are rolled back
try:
    with DbSQLModel.connect(db_url) as db:
        db.post(Ticker(symbol='TSLA', longName='Tesla', exchange='NASDAQ'), name='TSLA')
        raise ValueError("something went wrong")
except ValueError:
    pass

with DbSQLModel.connect(db_url) as db:
    print(db.getByName('TSLA'))  # None (rolled back)
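
The same commit-on-success, rollback-on-exception pattern exists in Python's standard sqlite3 module, where a connection used as a context manager commits or rolls back the open transaction. The sketch below uses it directly; the records table and its columns are made up for illustration and say nothing about the schema DbSQLModel uses.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE records (name TEXT PRIMARY KEY, body TEXT)")
conn.commit()

# Successful block: the insert is committed when the block exits.
with conn:
    conn.execute("INSERT INTO records VALUES (?, ?)", ("AAPL", "Apple Inc."))

# Failing block: the insert is rolled back and the exception propagates.
try:
    with conn:
        conn.execute("INSERT INTO records VALUES (?, ?)", ("TSLA", "Tesla"))
        raise ValueError("something went wrong")
except ValueError:
    pass

names = [row[0] for row in conn.execute("SELECT name FROM records ORDER BY name")]
print(names)  # ['AAPL']
conn.close()
```

Note that the sqlite3 context manager ends the transaction but does not close the connection, which is why the sketch closes it explicitly.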

Merging Databases

from pyswark.gluedb import db

db1 = db.Db()
db1.post('file:./data1.csv', name='data1')

db2 = db.Db()
db2.post('file:./data2.csv', name='data2')

# Merge db2 into db1
db1.merge(db2)
print(db1.getNames())  # ['data1', 'data2']

Using GlueHub for Multiple Databases

from pyswark.gluedb import hub

# market_db and config_db are existing Db instances
gluedb_hub = hub.Hub()
gluedb_hub.post(market_db, name='market_data')
gluedb_hub.post(config_db, name='config')

# Extract a specific database from the hub
market_db = gluedb_hub.extract('market_data')

Per-Database Helpers

When a hub entry points at a URI (for example, a .gluedb file), you can modify the underlying database and persist changes back to the file using the convenience helpers:

from pyswark.gluedb import hub
from pyswark.core.models import collection

gluedb_hub = hub.Hub()
gluedb_hub.post('file:./catalog.gluedb', name='catalog')

# Post a new record into the underlying GlueDb and overwrite the file
gluedb_hub.postToDb(
    collection.Dict({'window': 120}),
    'catalog',
    name='kwargs_120',
)

# Merge another GlueDb and persist the merged result
from pyswark.gluedb import db as gluedb

other = gluedb.Db()
other.post('file:./extra.csv', name='extra')

gluedb_hub.mergeToDb(other, 'catalog')