gluedb - Data Catalogs
The gluedb module provides data catalog functionality for managing
named collections of data sources. This is Layer 2 of the architecture.
Key Concepts
GlueDb: A database of named records pointing to data sources
GlueHub: A hub containing multiple GlueDb instances
Record: A named entry with URI and extraction settings
GlueDb Class
- class pyswark.gluedb.db.Db(*, records: list[pyswark.core.models.record.Record] = <factory>, url: str = '', datahandler: str = 'pjson', engine_url: str = 'sqlite:///:memory:', persist: bool = False)
Bases: Base
GlueDb - A database of named records pointing to data sources.
A GlueDb is a collection of named records, where each record can point to a data source via URI or contain inline data. Records can be extracted, loaded, and managed through a simple API.
Key Features
Store records by name with URI or inline data
Extract data from records automatically
Merge multiple databases together
Serialize to/from .gluedb files
Example
>>> from pyswark.gluedb.db import Db
>>>
>>> # Create a new database
>>> db = Db()
>>>
>>> # Post records (object first, record name as keyword)
>>> db.post('file:./prices.csv', name='prices')
>>> db.post({'window': 60}, name='config')
>>>
>>> # Extract data
>>> prices = db.extract('prices')
>>>
>>> # List all names
>>> print(db.getNames())  # ['prices', 'config']
- __contains__(name)
- delete(name)
Delete a record by name (alias for deleteByName).
- Parameters:
name (str) – The name of the record to delete.
- Returns:
True if the record was deleted, False if it didn’t exist.
- Return type:
bool
- extract(name)
Extract data from a record by name.
This method retrieves the record and extracts its data. If the record points to a URI, the data will be loaded from that URI. If the record contains inline data, it will be returned directly.
- Parameters:
name (str) – The name of the record to extract.
- Returns:
The extracted data (type depends on the record’s data source).
- Return type:
Any
Example
>>> db.post('file:./prices.csv', name='prices')
>>> prices_df = db.extract('prices')
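The URI-versus-inline behaviour described for extract can be pictured with a small stand-alone sketch. It does not use pyswark: SketchRecord, SketchCatalog, and the simplified handling of the file: prefix are hypothetical names invented for illustration, and the real library may resolve URIs quite differently.

```python
import csv
import os
import tempfile
from dataclasses import dataclass
from typing import Any


@dataclass
class SketchRecord:
    """Hypothetical record: a name plus either a URI string or inline data."""
    name: str
    body: Any

    def extract(self) -> Any:
        # Assumption: only strings starting with 'file:' are treated as URIs.
        if isinstance(self.body, str) and self.body.startswith("file:"):
            path = self.body[len("file:"):]
            with open(path, newline="") as handle:
                return list(csv.DictReader(handle))
        # Anything else is inline data and is returned as-is.
        return self.body


class SketchCatalog:
    """Hypothetical catalog mapping record names to records."""

    def __init__(self) -> None:
        self._records = {}

    def post(self, body: Any, name: str) -> None:
        self._records[name] = SketchRecord(name, body)

    def extract(self, name: str) -> Any:
        return self._records[name].extract()

    def get_names(self) -> list:
        return list(self._records)


# Write a tiny CSV so the example is self-contained.
csv_path = os.path.join(tempfile.mkdtemp(), "prices.csv")
with open(csv_path, "w", newline="") as handle:
    handle.write("date,close\n2024-01-02,170.5\n")

catalog = SketchCatalog()
catalog.post("file:" + csv_path, name="prices")  # record pointing at a URI
catalog.post({"window": 60}, name="config")      # record holding inline data

prices = catalog.extract("prices")  # loaded from the file
config = catalog.extract("config")  # returned directly
```

The caller uses the same extract call in both cases; only the record decides whether a load is needed.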
- get(name)
Get a record by name (alias for getByName).
- Parameters:
name (str) – The name of the record to retrieve.
- Returns:
The record with the given name, or None if not found.
- Return type:
Record or None
- getNames()
Get all record names in the database.
- Returns:
List of all record names.
- Return type:
list[str]
GlueHub Class
- class pyswark.gluedb.hub.Hub(*, records: list[pyswark.core.models.record.Record] = <factory>, url: str = '', datahandler: str = 'pjson', engine_url: str = 'sqlite:///:memory:', persist: bool = False)
Bases: Db
A hub containing multiple GlueDb instances.
Hub extends Db but uses Contents model to store references to other GlueDb instances. This allows organizing multiple databases together and consolidating them when needed.
Example
>>> from pyswark.gluedb.hub import Hub
>>> hub = Hub()
>>> hub.post(market_db, name='market_data')
>>> hub.post('file:./config.gluedb', name='config')
>>>
>>> # Extract a database
>>> market_db = hub.extract('market_data')
>>>
>>> # Consolidate all databases
>>> consolidated = hub.toDb()
- acquireFromDb(dbName, name)
Acquire the low-level handler for a record in the underlying GlueDb.
- deleteFromDb(dbName, name, overwrite=True)
Delete an entry from the underlying GlueDb and persist the change.
- extract(name)
Extract a database from the hub by name.
This method retrieves the record and extracts its data. If the record points to a URI, the data will be loaded from that URI. If the record contains inline data, it will be returned directly.
- extractFromDb(dbName, name)
Extract data from a record in the underlying GlueDb.
- getFromDb(dbName, name)
Get a record from the underlying GlueDb without persisting anything.
- load(data, name)
Load a database into the hub by name.
This method stores the database in the hub under the given name.
- mergeToDb(otherDb, dbName, overwrite=True)
Merge another GlueDb into a database in the hub and persist it.
- Parameters:
otherDb (Db) – Another GlueDb instance whose records will be merged into the target database.
dbName (str) – Name of the database in the hub to merge into.
overwrite (bool, optional) – If True (default), allow overwriting the target URI when persisting.
- Returns:
The merged target GlueDb.
- Return type:
Db
- postToDb(obj, dbName, name=None, overwrite=True)
Post an entry to the underlying GlueDb and write the modified database back to the URI it points to.
Uses .acquire() and .extract() to get the Db for the given dbName, posts the object to that Db, then persists the modified Db back to its URI (e.g. the file the gluedb object points to).
- Parameters:
obj (Any) – Object to post (e.g. dict, BaseModel, or URI string).
dbName (str) – Name of the database in the hub to post to.
name (str, optional) – Record name for the new entry. If None, taken from obj.name or obj['name'] when possible.
overwrite (bool, optional) – If True (default), allow overwriting the target URI when persisting. Pass False to forbid overwrite.
- Returns:
The posted record from the target Db, or None.
- Return type:
Record or None
Example
>>> hub.postToDb(collection.Dict({'x': 1}), 'db_1', name='new_entry')
- putToDb(obj, dbName, name=None, overwrite=True)
Put (upsert) an entry into the underlying GlueDb and persist it.
This mirrors GlueDb.put: if a record with the given name exists it is updated, otherwise it is created. After the operation the modified Db is written back to the URI it points to.
- toDb()
Consolidate all databases in the hub into a single GlueDb.
This merges all databases stored in the hub into one database. Useful for flattening the hub structure.
- Returns:
A new GlueDb instance containing all records from all databases.
- Return type:
Db
Example
>>> consolidated = hub.toDb()
>>> print(consolidated.getNames())  # All names from all databases
Usage Examples
Connecting to an Existing GlueDb
from pyswark.core.io import api
# Connect to a .gluedb file
db = api.read('file:./sma-example.gluedb')
# View available records
print(db.getNames()) # ['JPM', 'BAC', 'kwargs']
# Extract data by name
jpm_data = db.extract('JPM')
print(jpm_data.shape)
Creating a New GlueDb
from pyswark.gluedb import db
from pyswark.core.models import collection
from pyswark.core.io import api as io
# Create a new empty database
db = db.Db()
# Post records pointing to data sources
db.post('file:./ohlc-jpm.csv.gz', name='JPM')
db.post('file:./ohlc-bac.csv.gz', name='BAC')
# Post inline data (dict, list, etc.)
db.post(collection.Dict({
    'window': 60,
    'method': 'rolling',
}), name='config')
print(db.getNames()) # ['JPM', 'BAC', 'config']
# Save the database
io.write(db, 'file:./my-analysis.gluedb')
Persisting with Db.connect
Db.connect() loads a .gluedb catalog from a URI and returns a context
manager. When persist=True, the catalog is written back to the file on
successful exit:
from pyswark.gluedb.db import Db
from pyswark.core.models import collection
from pyswark.core.io import api
# Create an initial catalog and save it
db = Db()
db.post('file:./ohlc-jpm.csv.gz', name='JPM')
api.write(db, 'file:./catalog.gluedb')
# Re-open with persist=True — auto-saves on exit
with Db.connect('file:./catalog.gluedb', persist=True) as db:
    db.post(collection.Dict({'window': 60}), name='kwargs')
# Changes are persisted
db = Db.connect('file:./catalog.gluedb')
print(db.getNames()) # ['JPM', 'kwargs']
# Extract data
jpm_data = db.extract('JPM')
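The write-back-on-successful-exit behaviour can be illustrated without pyswark. The following is only a sketch under stated assumptions: connect_sketch is a hypothetical helper, the catalog is a plain dict stored as JSON rather than a .gluedb file, and the real Db.connect may be implemented differently.

```python
import json
import os
import tempfile
from contextlib import contextmanager


@contextmanager
def connect_sketch(path, persist=False):
    """Hypothetical helper: load a JSON catalog, yield it, optionally save it."""
    with open(path) as handle:
        catalog = json.load(handle)
    # If the body of the `with` block raises, execution never reaches the
    # write below, so the file on disk is left unchanged.
    yield catalog
    if persist:
        with open(path, "w") as handle:
            json.dump(catalog, handle)


path = os.path.join(tempfile.mkdtemp(), "catalog.json")
with open(path, "w") as handle:
    json.dump({"JPM": "file:./ohlc-jpm.csv.gz"}, handle)

# persist=True: the new entry is written back on a clean exit.
with connect_sketch(path, persist=True) as catalog:
    catalog["kwargs"] = {"window": 60}

# persist=False (the default in this sketch): the change stays in memory only.
with connect_sketch(path) as catalog:
    catalog["scratch"] = 1

# An exception inside the block skips the write-back.
try:
    with connect_sketch(path, persist=True) as catalog:
        catalog["broken"] = True
        raise ValueError("something went wrong")
except ValueError:
    pass

with open(path) as handle:
    saved = json.load(handle)
```

After these three blocks only the entry added under persist=True on a clean exit reaches the file.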
Persisting with DbSQLModel.connect
DbSQLModel.connect() provides SQLite-backed persistence with automatic
commit/rollback semantics. Data posted inside a with block is committed
on successful exit and rolled back on exception:
from pyswark.core.models.db import DbSQLModel
from pyswark.lib.pydantic import base
class Ticker(base.BaseModel):
    symbol : str
    longName : str
    exchange : str
db_url = 'sqlite:///./my-tickers.db'
# Post records — auto-commits on exit
with DbSQLModel.connect(db_url) as db:
    aapl = Ticker(symbol='AAPL', longName='Apple Inc.', exchange='NASDAQ')
    db.post(aapl, name='AAPL')
    msft = Ticker(symbol='MSFT', longName='Microsoft Corp', exchange='NASDAQ')
    db.post(msft, name='MSFT')
# Data persists across connections
with DbSQLModel.connect(db_url) as db:
    result = db.getByName('AAPL')
    ticker = result.body.extract()
    print(ticker.symbol) # 'AAPL'
    print(ticker.longName) # 'Apple Inc.'
    all_records = db.getAll()
    print(len(all_records)) # 2
Updating and deleting records:
# PUT replaces or creates a record (idempotent)
with DbSQLModel.connect(db_url) as db:
    updated = Ticker(symbol='MSFT', longName='Microsoft Corporation', exchange='NASDAQ')
    db.put(updated, name='MSFT')
# DELETE removes a record
with DbSQLModel.connect(db_url) as db:
    db.deleteByName('MSFT')
# Verify
with DbSQLModel.connect(db_url) as db:
    print(db.getByName('MSFT')) # None
Rollback on exception:
# If an exception occurs, changes are rolled back
try:
    with DbSQLModel.connect(db_url) as db:
        db.post(Ticker(symbol='TSLA', longName='Tesla', exchange='NASDAQ'), name='TSLA')
        raise ValueError("something went wrong")
except ValueError:
    pass
with DbSQLModel.connect(db_url) as db:
    print(db.getByName('TSLA')) # None (rolled back)
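For readers who want to see the commit-on-exit and rollback-on-exception pattern in isolation, here is a minimal sketch built on the standard-library sqlite3 module. It is not how DbSQLModel is implemented: connect_sketch, put, get, and the records table are hypothetical names chosen for illustration.

```python
import os
import sqlite3
import tempfile
from contextlib import contextmanager


@contextmanager
def connect_sketch(db_path):
    """Hypothetical helper: commit on clean exit, roll back on exception."""
    conn = sqlite3.connect(db_path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS records (name TEXT PRIMARY KEY, body TEXT)"
    )
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()


def put(conn, name, body):
    # Upsert: replaces the row if the name exists, otherwise inserts it.
    conn.execute(
        "INSERT OR REPLACE INTO records (name, body) VALUES (?, ?)", (name, body)
    )


def get(conn, name):
    row = conn.execute(
        "SELECT body FROM records WHERE name = ?", (name,)
    ).fetchone()
    return None if row is None else row[0]


db_path = os.path.join(tempfile.mkdtemp(), "tickers.db")

# Clean exit: both statements are committed together.
with connect_sketch(db_path) as conn:
    put(conn, "MSFT", "Microsoft Corp")
    put(conn, "MSFT", "Microsoft Corporation")  # same name: row is replaced

# Exception inside the block: the pending insert is rolled back.
try:
    with connect_sketch(db_path) as conn:
        put(conn, "TSLA", "Tesla")
        raise ValueError("something went wrong")
except ValueError:
    pass

with connect_sketch(db_path) as conn:
    msft = get(conn, "MSFT")
    tsla = get(conn, "TSLA")
    count = conn.execute("SELECT COUNT(*) FROM records").fetchone()[0]
```

Re-raising after the rollback keeps the caller informed of the failure, which matches the try/except usage shown in the example above.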
Merging Databases
from pyswark.gluedb import db
db1 = db.Db()
db1.post('file:./data1.csv', name='data1')
db2 = db.Db()
db2.post('file:./data2.csv', name='data2')
# Merge db2 into db1
db1.merge(db2)
print(db1.getNames()) # ['data1', 'data2']
Using GlueHub for Multiple Databases
from pyswark.gluedb import hub
gluedb_hub = hub.Hub()
# market_db and config_db are existing Db instances
gluedb_hub.post(market_db, name='market_data')
gluedb_hub.post(config_db, name='config')
# Extract a specific database from the hub
market_db = gluedb_hub.extract('market_data')
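Conceptually, a hub is a catalog of catalogs, and consolidating it means merging every member database into one new database. The sketch below models both levels as plain dicts. The merge and to_db functions are hypothetical stand-ins, and the collision rule (the later record wins) is an assumption, not documented pyswark behaviour.

```python
def merge(target, other):
    """Copy every record from `other` into `target` and return `target`."""
    # Assumption: on a name collision the incoming record wins.
    target.update(other)
    return target


def to_db(hub):
    """Flatten a hub (name -> database) into one new database."""
    consolidated = {}
    for database in hub.values():
        merge(consolidated, database)
    return consolidated


market_db = {"JPM": "file:./ohlc-jpm.csv.gz", "BAC": "file:./ohlc-bac.csv.gz"}
config_db = {"kwargs": {"window": 60}}

# The hub stores whole databases under a name.
hub = {"market_data": market_db, "config": config_db}

consolidated = to_db(hub)
names = list(consolidated)
```

Because to_db builds a fresh dict, the member databases in the hub are left untouched.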
Per-Database Helpers
When a hub entry points at a URI (for example, a .gluedb file), you can
modify the underlying database and persist changes back to the file using the
convenience helpers:
from pyswark.gluedb import hub
from pyswark.core.models import collection
gluedb_hub = hub.Hub()
gluedb_hub.post('file:./catalog.gluedb', name='catalog')
# Post a new record into the underlying GlueDb and overwrite the file
gluedb_hub.postToDb(
    collection.Dict({'window': 120}),
    'catalog',
    name='kwargs_120',
)
# Merge another GlueDb and persist the merged result
from pyswark.gluedb import db as gluedb
other = gluedb.Db()
other.post('file:./extra.csv', name='extra')
gluedb_hub.mergeToDb(other, 'catalog')
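The helpers above follow a read, modify, write-back cycle: resolve the hub entry to its file, load the catalog, change it, and save it to the same location. A rough stand-alone sketch of that cycle follows. The post_to_db function, the JSON storage, and the exact meaning given to overwrite=False here are assumptions for illustration only.

```python
import json
import os
import tempfile


def post_to_db(hub, obj, db_name, name, overwrite=True):
    """Hypothetical helper: load the target catalog, add a record, save it."""
    path = hub[db_name]                  # the hub entry points at a file
    with open(path) as handle:
        catalog = json.load(handle)      # load the underlying catalog
    catalog[name] = obj                  # post the new record in memory
    # Assumption: overwrite=False refuses to replace an existing file.
    if os.path.exists(path) and not overwrite:
        raise FileExistsError(f"refusing to overwrite {path}")
    with open(path, "w") as handle:
        json.dump(catalog, handle)       # persist back to the same file
    return catalog[name]


path = os.path.join(tempfile.mkdtemp(), "catalog.json")
with open(path, "w") as handle:
    json.dump({"JPM": "file:./ohlc-jpm.csv.gz"}, handle)

hub = {"catalog": path}
posted = post_to_db(hub, {"window": 120}, "catalog", name="kwargs_120")

# With overwrite=False the file is left as it was.
try:
    post_to_db(hub, {"window": 5}, "catalog", name="blocked", overwrite=False)
    blocked = False
except FileExistsError:
    blocked = True

with open(path) as handle:
    saved = json.load(handle)
```

Checking the overwrite flag before opening the file for writing means a refused call cannot truncate the existing catalog.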