Storage
Pluggable storage backends for the Haystack server.
Protocol
The StorageAdapter runtime-checkable
protocol that all backends implement for entity and history storage, and the
UserStore protocol for user management.
StorageAdapter and UserStore Protocols for Haystack server backends.
Defines the async interfaces that all storage backends must implement.
Concrete implementations include InMemoryAdapter,
RedisAdapter, and
TimescaleAdapter.
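What "runtime-checkable protocol" means in practice can be sketched as follows. LifecycleAdapter is a hypothetical two-method stand-in for StorageAdapter (the real protocol declares more methods), and DictBackend is an invented toy class; neither name comes from hs_py.

```python
import asyncio
from typing import Protocol, runtime_checkable


@runtime_checkable
class LifecycleAdapter(Protocol):
    """Hypothetical stand-in covering only the start()/close() pair."""

    async def start(self) -> None: ...

    async def close(self) -> None: ...


class DictBackend:
    """Toy backend. It satisfies the protocol structurally, without inheriting."""

    def __init__(self) -> None:
        self.started = False

    async def start(self) -> None:
        self.started = True

    async def close(self) -> None:
        self.started = False


backend = DictBackend()

# isinstance() against a runtime-checkable protocol only verifies that the
# named methods exist; it does not check signatures or that they are async.
is_adapter = isinstance(backend, LifecycleAdapter)
not_adapter = isinstance(object(), LifecycleAdapter)


async def lifecycle() -> bool:
    """Bracket some work with start() and close(), as a server would."""
    await backend.start()
    try:
        return backend.started
    finally:
        await backend.close()


ran_started = asyncio.run(lifecycle())
```

Because the check is structural, a backend can be validated at startup with a plain isinstance() call, but a method with the wrong signature will still pass that check.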
- class hs_py.storage.protocol.StorageAdapter(*args, **kwargs)
Bases: Protocol
Protocol for Haystack server storage backends.
All methods are async. Concrete backends must implement every method. The start()/close() methods bracket the lifetime of the adapter and are called by the server on startup and shutdown respectively.
Entity dicts use native Haystack kinds (Ref, Marker, Number, etc.), not wire-encoded JSON dicts. Callers are responsible for encoding before sending to clients.
Navigate the site/equip/point hierarchy.
- async point_write(ref, level, val, who='', duration=None)
Write a value to a writable point’s priority array.
- Parameters:
ref (Ref) – Entity Ref of the writable point.
level (int) – Priority level (1-17). Level 17 is the default.
val (Any) – Value to write. Pass None to clear the level.
who (str, default: '') – Optional identifier of who is writing.
duration (Any, default: None) – Optional duration override (ignored by most backends).
- Return type:
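The level semantics described for point_write can be sketched with a plain dict. This is a minimal illustration, not the library's implementation: write_level and effective_value are hypothetical helpers, and the rule that the lowest-numbered occupied level wins is assumed from the usual Project Haystack convention, since this page does not state how the effective value is resolved.

```python
from typing import Any

NUM_LEVELS = 17  # levels 1..17; level 17 is the default level


def write_level(array: dict[int, Any], level: int, val: Any) -> None:
    """Set or clear one slot of a priority array stored as {level: value}."""
    if not 1 <= level <= NUM_LEVELS:
        raise ValueError(f"level must be 1-{NUM_LEVELS}, got {level}")
    if val is None:
        array.pop(level, None)  # passing None clears the level
    else:
        array[level] = val


def effective_value(array: dict[int, Any]) -> Any:
    """Assumed rule: the lowest-numbered occupied level wins; None if empty."""
    return array[min(array)] if array else None


pri: dict[int, Any] = {}
write_level(pri, 17, 68.0)  # default value
write_level(pri, 8, 72.0)   # operator override at a higher priority
winner_with_override = effective_value(pri)
write_level(pri, 8, None)   # release the override
winner_after_clear = effective_value(pri)
```

With the override in place the effective value is 72.0; once level 8 is cleared, the point falls back to the level 17 default of 68.0.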
- async watch_sub(watch_id, ids, dis='watch')
Create or extend a watch subscription.
- Parameters:
- Return type:
- Returns:
(watch_id, entities), where entities is the current state of all newly subscribed entities.
- async watch_unsub(watch_id, ids, *, close=False)
Remove entities from a watch, or close the watch entirely.
- async watch_poll(watch_id, *, refresh=False)
Poll a watch for changed entities.
- Parameters:
- Return type:
- Returns:
List of entity dicts that have changed since the last poll (or all entities if refresh is
True). The dirty set is cleared after each poll.
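The dirty-set behaviour of watch_poll can be sketched as below. ToyWatches and its methods are hypothetical and synchronous for brevity, and entities are keyed by plain strings rather than Ref values; the real adapters are async and track more state.

```python
from typing import Any

Entity = dict[str, Any]


class ToyWatches:
    """Hypothetical in-process model of watch subscriptions."""

    def __init__(self, entities: dict[str, Entity]) -> None:
        self.entities = entities
        self.watched: dict[str, set[str]] = {}
        self.dirty: dict[str, set[str]] = {}

    def sub(self, watch_id: str, ids: list[str]) -> list[Entity]:
        """Add ids to the watch and return their current state."""
        self.watched.setdefault(watch_id, set()).update(ids)
        self.dirty.setdefault(watch_id, set())
        return [self.entities[i] for i in ids if i in self.entities]

    def mark_changed(self, entity_id: str) -> None:
        """Flag an entity as dirty in every watch that includes it."""
        for watch_id, ids in self.watched.items():
            if entity_id in ids:
                self.dirty[watch_id].add(entity_id)

    def poll(self, watch_id: str, *, refresh: bool = False) -> list[Entity]:
        """Return dirty entities (or all watched ones), then clear the dirty set."""
        if watch_id not in self.watched:
            raise ValueError(f"unknown watch {watch_id!r}")
        ids = self.watched[watch_id] if refresh else self.dirty[watch_id]
        result = [self.entities[i] for i in sorted(ids) if i in self.entities]
        self.dirty[watch_id] = set()
        return result


store = ToyWatches({"p1": {"id": "p1", "curVal": 1}, "p2": {"id": "p2", "curVal": 2}})
store.sub("w1", ["p1", "p2"])
store.entities["p1"]["curVal"] = 5
store.mark_changed("p1")

first = store.poll("w1")                # only the changed entity
second = store.poll("w1")               # dirty set was cleared by the first poll
full = store.poll("w1", refresh=True)   # refresh returns every watched entity
```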
- class hs_py.storage.protocol.UserStore(*args, **kwargs)
Bases: Protocol
Protocol for user management backends.
All methods are async. Backends that implement both StorageAdapter and UserStore can be used as a unified storage layer.
- async create_user(user)
Persist a new user.
- Parameters:
user (User) – User to create.
- Raises:
ValueError – If a user with the same username already exists.
- Return type:
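The duplicate-username contract of create_user can be sketched with a dict-backed store. The User dataclass below is a hypothetical stand-in with a single username field (the real hs_py User type is not shown on this page), and ToyUserStore is an invented name.

```python
import asyncio
from dataclasses import dataclass


@dataclass(frozen=True)
class User:
    """Hypothetical stand-in; the real User type defines its own fields."""

    username: str


class ToyUserStore:
    """Toy store that enforces unique usernames, as the protocol requires."""

    def __init__(self) -> None:
        self._users: dict[str, User] = {}

    async def create_user(self, user: User) -> None:
        if user.username in self._users:
            raise ValueError(f"user {user.username!r} already exists")
        self._users[user.username] = user


async def demo() -> str:
    store = ToyUserStore()
    await store.create_user(User("alice"))
    try:
        await store.create_user(User("alice"))  # second insert must fail
    except ValueError as exc:
        return str(exc)
    return "no error"


message = asyncio.run(demo())
```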
Memory
In-memory storage adapter for testing and prototyping.
In-memory Haystack storage adapter.
Provides InMemoryAdapter, a pure-Python implementation of
StorageAdapter backed by plain dicts.
Suitable for testing, demos, and small deployments that do not require
persistence.
- class hs_py.storage.memory.InMemoryAdapter(entities=None)
Bases: object
In-memory implementation of StorageAdapter.
All state is held in plain Python dicts; nothing is persisted to disk. Thread-safety is not guaranteed; use within a single asyncio event loop.
- Parameters:
entities (list[dict[str, Any]] | None, default: None) – Optional initial list of entity dicts (each must have an id Ref).
- load_entities(entities)
Bulk-load a list of entity dicts.
Each entity must have an id Ref. Entities without an id are silently skipped.
Navigate the site/equip/point hierarchy.
nav_id=None: return all entities with the site tag.
nav_id of a site: return equips whose siteRef matches.
nav_id of an equip: return points whose equipRef matches.
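The three navigation cases above can be sketched over a list of dicts. This is an illustrative model only: the nav function name and signature are assumptions, ids and refs are plain strings instead of Ref values, marker tags are modelled as True, and returning an empty list for an unknown nav_id is an assumed behaviour.

```python
from typing import Any, Optional

Entity = dict[str, Any]


def nav(entities: list[Entity], nav_id: Optional[str] = None) -> list[Entity]:
    """Walk one level down the site -> equip -> point hierarchy."""
    if nav_id is None:
        return [e for e in entities if "site" in e]
    target = next((e for e in entities if e.get("id") == nav_id), None)
    if target is None:
        return []  # assumption: unknown ids yield no children
    if "site" in target:
        return [e for e in entities if "equip" in e and e.get("siteRef") == nav_id]
    if "equip" in target:
        return [e for e in entities if "point" in e and e.get("equipRef") == nav_id]
    return []  # points are leaves


site = {"id": "s1", "site": True, "dis": "HQ"}
equip = {"id": "e1", "equip": True, "siteRef": "s1"}
point = {"id": "p1", "point": True, "equipRef": "e1"}
everything = [site, equip, point]

roots = nav(everything)
equips = nav(everything, "s1")
points = nav(everything, "e1")
nothing = nav(everything, "missing")
```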
- async point_write(ref, level, val, who='', duration=None)
Write a value to a writable point’s priority array.
- Parameters:
ref (Ref) – Entity Ref of the writable point.
level (int) – Priority level (1-17). Level 17 is the default.
val (Any) – Value to write. Pass None to clear the level.
who (str, default: '') – Optional identifier of who is writing (stored for reference but not currently used).
duration (Any, default: None) – Ignored by this backend.
- Return type:
- async watch_sub(watch_id, ids, dis='watch')
Create or extend a watch subscription.
- Parameters:
- Return type:
- Returns:
(watch_id, entities): the (possibly new) watch ID and the current state of all subscribed entities.
- async watch_unsub(watch_id, ids, *, close=False)
Remove entities from a watch, or close the watch entirely.
- async watch_poll(watch_id, *, refresh=False)
Poll a watch for changed entities.
- Parameters:
- Return type:
- Returns:
List of entity dicts that have changed since the last poll, or all watched entities when refresh is
True. The dirty set is cleared after polling.
- async create_user(user)
Persist a new user.
- Raises:
ValueError – If a user with the same username already exists.
- Return type:
- Parameters:
user (User)
Redis
Redis-backed storage using RediSearch for entity indexing and RedisTimeSeries for history.
Redis storage adapter for Haystack servers.
Implements StorageAdapter using Redis 8
with RedisJSON, RedisTimeSeries, and RediSearch.
Requires Redis 8+ (ships with JSON, TimeSeries, and Search modules) and the
redis[hiredis] Python package (installed via pip install hs-py[server]).
Key schema:
hs:e:{ref_val}          RedisJSON document (entity + _tags index field)
hs:ids                  Set of all entity ref vals
hs:tag:{tagname}        Set of ref vals that have this tag
hs:ts:{ref_val}         TimeSeries key for history data
hs:pri:{ref_val}        Hash mapping level -> JSON-encoded value
hs:w:{watch_id}         Hash with watch metadata (dis, lease)
hs:w:{watch_id}:ids     Set of watched ref vals
hs:w:{watch_id}:dirty   Set of dirty ref vals
RediSearch index hs_idx is created on hs:e:* JSON documents with:
_tags TAG field (comma-separated tag names) for Has/Missing queries
siteRef TAG field ($.siteRef.val) for site navigation and filters
equipRef TAG field ($.equipRef.val) for equip navigation and filters
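The key schema above can be expressed as a handful of string helpers. The function names are hypothetical and not part of hs_py. The _tags value is built here from every key of the entity dict in sorted order, which is an assumption; the page only says the field holds comma-separated tag names.

```python
from typing import Any


def entity_key(ref_val: str) -> str:
    """RedisJSON document holding the entity."""
    return f"hs:e:{ref_val}"


def tag_key(tagname: str) -> str:
    """Set of ref vals that carry this tag."""
    return f"hs:tag:{tagname}"


def history_key(ref_val: str) -> str:
    """TimeSeries key for a point's history."""
    return f"hs:ts:{ref_val}"


def priority_key(ref_val: str) -> str:
    """Hash mapping priority level to a JSON-encoded value."""
    return f"hs:pri:{ref_val}"


def watch_keys(watch_id: str) -> dict[str, str]:
    """The three keys that together describe one watch."""
    base = f"hs:w:{watch_id}"
    return {"meta": base, "ids": f"{base}:ids", "dirty": f"{base}:dirty"}


def tags_field(entity: dict[str, Any]) -> str:
    """Assumed encoding of the _tags index field: sorted, comma-separated names."""
    return ",".join(sorted(entity))


example = {"id": "site1", "site": True, "dis": "HQ"}
example_tags = tags_field(example)
```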
- class hs_py.storage.redis.RedisAdapter(redis)
Bases: object
StorageAdapter backed by Redis 8 (JSON + TimeSeries + Search).
Implements StorageAdapter using RedisJSON for entity storage, RedisTimeSeries for history data, and RediSearch for efficient filter queries.
- Parameters:
redis – A redis.asyncio.Redis client instance created with protocol=3 and decode_responses=True.
- property all_col_names: tuple[str, ...] | None
Cached column names across all entities, or None if unknown.
- async read_by_filter(ast, limit=None)
Return entities matching a filter AST.
Attempts to delegate fully to RediSearch; falls back to Python evaluation with tag-index candidate narrowing for unsupported filter constructs.
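The fallback path, Python evaluation with tag-index candidate narrowing, might look roughly like the sketch below. It uses in-process sets in place of the hs:tag:{tagname} Redis sets, and narrow_then_filter, required_tags, and predicate are hypothetical names; how the adapter actually derives the required tags from a filter AST is not described on this page.

```python
from typing import Any, Callable

Entity = dict[str, Any]


def narrow_then_filter(
    entities: dict[str, Entity],
    tag_index: dict[str, set[str]],
    required_tags: list[str],
    predicate: Callable[[Entity], bool],
) -> list[Entity]:
    """Intersect per-tag id sets first, then run the full predicate in Python."""
    if required_tags:
        tag_sets = [tag_index.get(tag, set()) for tag in required_tags]
        candidates = set.intersection(*tag_sets)
    else:
        candidates = set(entities)  # nothing to narrow by: scan everything
    return [entities[i] for i in sorted(candidates) if predicate(entities[i])]


entities = {
    "p1": {"point": True, "sensor": True, "curVal": 71},
    "p2": {"point": True, "sensor": True, "curVal": 65},
    "e1": {"equip": True},
}

# Build the tag -> ids index that the hs:tag:* sets would provide.
tag_index: dict[str, set[str]] = {}
for ref_val, entity in entities.items():
    for tag in entity:
        tag_index.setdefault(tag, set()).add(ref_val)

# The predicate reads curVal directly; narrowing keeps it away from "e1",
# which has no curVal tag.
hot = narrow_then_filter(entities, tag_index, ["point", "sensor"], lambda e: e["curVal"] > 70)
```

The benefit is that the expensive Python predicate only runs on entities that already carry the required tags.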
Navigate the site/equip/point hierarchy.
Uses RediSearch indexed siteRef and equipRef fields for efficient lookups instead of loading all entities into memory.
- async point_write(ref, level, val, who='', duration=None)
Write a value to a writable point’s priority array.
- Parameters:
- Return type:
- async watch_sub(watch_id, ids, dis='watch')
Create or extend a watch subscription.
- Parameters:
- Return type:
- Returns:
(watch_id, entities), where entities is the current state of all newly subscribed entities.
- async watch_unsub(watch_id, ids, *, close=False)
Remove entities from a watch, or close the watch entirely.
- async watch_poll(watch_id, *, refresh=False)
Poll for changed entities.
- Parameters:
- Return type:
- Returns:
List of entity dicts that have changed since the last poll (or all entities if refresh is True).
- Raises:
ValueError – If watch_id is not found.
- async load_entities(entities)
Bulk-load a list of entity dicts into Redis.
Each entity must have an id Ref. Entities without an id are silently skipped. Large batches are chunked to avoid unbounded pipeline memory usage.
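Skipping entities without an id and chunking the rest can be sketched without Redis. The chunked and batches_to_load helpers and the batch size of 500 are assumptions for illustration; the adapter's actual chunk size is not documented here.

```python
from collections.abc import Iterable, Iterator
from itertools import islice
from typing import Any

Entity = dict[str, Any]


def chunked(items: Iterable[Entity], size: int) -> Iterator[list[Entity]]:
    """Yield successive lists of at most `size` items."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(items)
    while batch := list(islice(iterator, size)):
        yield batch


def batches_to_load(entities: Iterable[Entity], size: int = 500) -> list[list[Entity]]:
    """Drop entities that lack an id, then split the rest into batches."""
    with_id = (e for e in entities if "id" in e)
    return list(chunked(with_id, size))


sample = [{"id": "a"}, {"dis": "no id"}, {"id": "b"}, {"id": "c"}]
batches = batches_to_load(sample, size=2)
```

Each batch would then be sent in its own pipeline, so memory use is bounded by the batch size rather than by the total number of entities.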
- async create_user(user)
Persist a new user.
- Raises:
ValueError – If a user with the same username already exists.
- Return type:
- Parameters:
user (User)
- hs_py.storage.redis.create_redis_client(url='redis://localhost:6379', *, tls=None, max_connections=50)
Create an async Redis client with optional TLS 1.3.
Uses RESP3 protocol, automatic string decoding, connection health checks, and jittered exponential-backoff retries. When tls is provided, the connection enforces TLS 1.3 minimum, loads the configured client certificate for mutual authentication, and verifies the server certificate against the configured CA.
- Parameters:
url (default: 'redis://localhost:6379') – Redis connection URL (redis:// or rediss://).
tls (default: None) – Optional TLS configuration for encrypted connections.
max_connections (default: 50) – Maximum connections in the pool.
- Returns:
An async redis.asyncio.Redis client.
- Return type:
Redis[str]
Example:
from hs_py.tls import TLSConfig
from hs_py.storage.redis import RedisAdapter, create_redis_client

tls = TLSConfig(
    certificate_path="client.pem",
    private_key_path="client.key",
    ca_certificates_path="ca.pem",
)
redis = create_redis_client("rediss://redis:6379", tls=tls)
adapter = RedisAdapter(redis)
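The "jittered exponential-backoff retries" mentioned in the description of create_redis_client refer to a general retry pattern, sketched below in plain Python. This is the "full jitter" variant with made-up base and cap values; which variant and parameters the client actually configures is not stated on this page, and backoff_delays is a hypothetical helper.

```python
import random
from typing import Optional


def backoff_delays(
    retries: int,
    base: float = 0.05,
    cap: float = 2.0,
    rng: Optional[random.Random] = None,
) -> list[float]:
    """Return one randomized delay per retry attempt.

    The ceiling doubles on each attempt (base * 2**attempt) up to `cap`, and
    the actual delay is drawn uniformly between 0 and that ceiling.
    """
    rng = rng if rng is not None else random.Random()
    return [rng.uniform(0.0, min(cap, base * 2**attempt)) for attempt in range(retries)]


# A seeded generator keeps the example reproducible.
delays = backoff_delays(6, rng=random.Random(0))
ceilings = [min(2.0, 0.05 * 2**attempt) for attempt in range(6)]
```

Randomizing the delay spreads reconnect attempts from many clients over time, so they do not all retry at the same instant.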
Redis Operations
Low-level Redis operations for entity and time-series management.
Backward-compatible Redis ops wrapper.
Provides RedisOps, a thin HaystackOps subclass
that wraps RedisAdapter.
For new code, prefer using RedisAdapter directly
with HaystackOps:
from hs_py.ops import HaystackOps
from hs_py.storage.redis import RedisAdapter, create_redis_client

# Run inside a coroutine: start() is async and must be awaited.
redis = create_redis_client()
adapter = RedisAdapter(redis)
ops = HaystackOps(storage=adapter)
await adapter.start()
- class hs_py.redis_ops.RedisOps(redis, *, namespace=None)
Bases: HaystackOps
Haystack ops backed by Redis 8, wrapping RedisAdapter.
This is a convenience wrapper for backward compatibility. For new code, use RedisAdapter directly with HaystackOps(storage=adapter).
- Parameters:
redis – A redis.asyncio.Redis client instance.
namespace (default: None) – Optional ontology namespace for defs/libs ops.
- hs_py.redis_ops.create_redis_client(url='redis://localhost:6379', *, tls=None, max_connections=50)
Create an async Redis client with optional TLS 1.3.
Uses RESP3 protocol, automatic string decoding, connection health checks, and jittered exponential-backoff retries. When tls is provided, the connection enforces TLS 1.3 minimum, loads the configured client certificate for mutual authentication, and verifies the server certificate against the configured CA.
- Parameters:
url (default: 'redis://localhost:6379') – Redis connection URL (redis:// or rediss://).
tls (default: None) – Optional TLS configuration for encrypted connections.
max_connections (default: 50) – Maximum connections in the pool.
- Returns:
An async redis.asyncio.Redis client.
Example:
from hs_py.tls import TLSConfig
from hs_py.storage.redis import RedisAdapter, create_redis_client

tls = TLSConfig(
    certificate_path="client.pem",
    private_key_path="client.key",
    ca_certificates_path="ca.pem",
)
redis = create_redis_client("rediss://redis:6379", tls=tls)
adapter = RedisAdapter(redis)
TimescaleDB
PostgreSQL/TimescaleDB storage with JSONB entities, hypertable history, and filter-to-SQL pushdown.
TimescaleDB StorageAdapter for Haystack server backends.
Provides TimescaleAdapter implementing the StorageAdapter Protocol
using asyncpg for async PostgreSQL/TimescaleDB access.
Schema is created automatically on start(). Entities are stored as JSONB
in hs_entities. Time-series history uses hs_history (optionally a
TimescaleDB hypertable). Priority arrays are stored in hs_priority.
Watches are tracked in hs_watches and hs_watch_entities.
Usage:
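from hs_py.storage.timescale import TimescaleAdapter, create_timescale_pool

# Run inside a coroutine. Ref and MARKER are Haystack kinds and must also be imported.
pool = await create_timescale_pool("postgresql://localhost/haystack")
adapter = TimescaleAdapter(pool)
await adapter.start()
await adapter.load_entities([{"id": Ref("site1"), "site": MARKER, "dis": "My Site"}])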
- class hs_py.storage.timescale.TimescaleAdapter(pool)
Bases: object
StorageAdapter backed by PostgreSQL/TimescaleDB via asyncpg.
- Parameters:
pool – asyncpg connection pool to use.
- async start()
Create database schema (idempotent).
Runs DDL to create tables and indexes if they do not already exist. Attempts to create a TimescaleDB hypertable for hs_history; the attempt is silently skipped if TimescaleDB is not available.
- Return type:
- property all_col_names: tuple[str, ...] | None
Cached column names across all entities, or None if unknown.
- async read_by_filter(ast, limit=None)
Return entities matching the filter AST.
Translates simple (single-segment) filter nodes to JSONB SQL. Falls back to Python-side evaluation via hs_py.filter.evaluate() for multi-segment paths or unsupported node types.
Results are cached by (sql_clause, limit) to avoid re-decoding on repeated reads.
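The split between SQL pushdown and Python fallback can be illustrated with a toy translator. Everything here is hypothetical: the Has, Missing, and And node classes stand in for the real hs_py filter AST, the JSONB column is assumed to be called tags, and only tag-presence checks are handled. Returning None signals that the caller should fall back to Python-side evaluation.

```python
from dataclasses import dataclass
from typing import Optional, Union


@dataclass(frozen=True)
class Has:
    path: tuple[str, ...]


@dataclass(frozen=True)
class Missing:
    path: tuple[str, ...]


@dataclass(frozen=True)
class And:
    left: "Node"
    right: "Node"


Node = Union[Has, Missing, And]


def to_sql(node: Node, params: list[str]) -> Optional[str]:
    """Translate a node to a WHERE clause, appending bind values to params.

    Returns None for anything that cannot be pushed down. When that happens
    the caller should discard `params`, since it may be partially filled.
    """
    if isinstance(node, (Has, Missing)):
        if len(node.path) != 1:
            return None  # multi-segment path: leave it to Python evaluation
        params.append(node.path[0])
        clause = f"tags ? ${len(params)}"  # JSONB "key exists" operator
        return clause if isinstance(node, Has) else f"NOT ({clause})"
    if isinstance(node, And):
        left = to_sql(node.left, params)
        right = to_sql(node.right, params)
        if left is None or right is None:
            return None
        return f"({left} AND {right})"
    return None


params: list[str] = []
clause = to_sql(And(Has(("site",)), Missing(("equipRef",))), params)
fallback = to_sql(Has(("siteRef", "dis")), [])
```

Tag names travel as bind parameters ($1, $2, ...) rather than being interpolated into the SQL string, which keeps the generated clause safe to cache and reuse.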
Navigate the entity tree.
nav_id=None returns all site entities.
nav_id set to a site id returns equip entities for that site.
nav_id set to an equip id returns point entities for that equip.
- async point_write(ref, level, val, who='', duration=None)
Write a value to a writable point’s priority array.
- async watch_sub(watch_id, ids, dis='watch')
Create or extend a watch subscription.
- Parameters:
- Return type:
- Returns:
(watch_id, entities), where entities is the current state of all newly subscribed entities.
- async watch_unsub(watch_id, ids, *, close=False)
Remove entities from a watch, or close the watch entirely.
- async watch_poll(watch_id, *, refresh=False)
Poll a watch for changed entities.
Uses a transaction to atomically fetch and clear dirty flags, preventing lost notifications from concurrent writers.
- async load_entities(entities)
Bulk-upsert a list of entity dicts into the store.
Uses a staging table with COPY for fast bulk loading, then upserts into the main table. The id tag (a Ref) is extracted and used as the primary key. Entities without an id are skipped.
- async create_user(user)
Persist a new user.
- Raises:
ValueError – If a user with the same username already exists.
- Return type:
- Parameters:
user (User)
- async hs_py.storage.timescale.create_timescale_pool(dsn='postgresql://localhost:5432/haystack', *, min_size=2, max_size=10, command_timeout=60.0, tls=None)
Create an asyncpg connection pool for TimescaleDB.
Configures connection recycling, idle connection cleanup, and registers orjson-based JSONB codecs on each new connection.
- Parameters:
dsn (default: 'postgresql://localhost:5432/haystack') – PostgreSQL DSN string.
min_size (default: 2) – Minimum number of pooled connections.
max_size (default: 10) – Maximum number of pooled connections.
command_timeout (default: 60.0) – Per-query timeout in seconds.
tls (default: None) – Optional TLS configuration for SSL connections.
- Returns:
Open asyncpg.Pool ready for use.
- Return type:
asyncpg.Pool[Any]