Storage

Pluggable storage backends for the Haystack server.

Protocol

The StorageAdapter runtime-checkable protocol that all backends implement for entity and history storage, and the UserStore protocol for user management.

StorageAdapter and UserStore Protocols for Haystack server backends.

Defines the async interfaces that all storage backends must implement. Concrete implementations include InMemoryAdapter, RedisAdapter, and TimescaleAdapter.

class hs_py.storage.protocol.StorageAdapter(*args, **kwargs)[source]

Bases: Protocol

Protocol for Haystack server storage backends.

All methods are async. Concrete backends must implement every method. The start() / close() methods bracket the lifetime of the adapter and are called by the server on startup and shutdown respectively.

Entity dicts use native Haystack kinds (Ref, Marker, Number, etc.) — not wire-encoded JSON dicts. Callers are responsible for encoding before sending to clients.
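As a rough illustration of how a runtime-checkable protocol and the start() / close() lifecycle fit together, the sketch below uses a reduced stand-in protocol. LifecycleAdapter, DummyBackend, and serve() are hypothetical names invented for this example; the real StorageAdapter declares many more methods.

```python
import asyncio
from typing import Protocol, runtime_checkable


@runtime_checkable
class LifecycleAdapter(Protocol):
    """Hypothetical, reduced stand-in for StorageAdapter (lifecycle methods only)."""

    async def start(self) -> None: ...

    async def close(self) -> None: ...


class DummyBackend:
    """Toy backend: no inheritance needed, matching method names are enough."""

    def __init__(self) -> None:
        self.events: list[str] = []

    async def start(self) -> None:
        self.events.append("start")

    async def close(self) -> None:
        self.events.append("close")


async def serve(adapter: LifecycleAdapter) -> None:
    """Bracket the adapter's lifetime the way a server might (assumed flow)."""
    await adapter.start()
    try:
        pass  # request handling would happen here
    finally:
        await adapter.close()


backend = DummyBackend()
# isinstance() against a runtime-checkable Protocol only checks that the
# named methods exist; it does not validate signatures or async-ness.
assert isinstance(backend, LifecycleAdapter)
asyncio.run(serve(backend))
```

Because the check is structural, a backend never has to subclass the protocol; it only has to provide every method.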

async read_by_filter(ast, limit=None)[source]

Return entities matching a filter AST.

Parameters:
  • ast (Has | Missing | Cmp | And | Or) – Compiled filter AST from parse().

  • limit (int | None (default: None)) – Maximum number of results to return. None means no limit.

Return type:

list[dict[str, Any]]

Returns:

List of entity dicts (order unspecified).

async read_by_ids(ids)[source]

Return entities for a list of Refs, preserving order.

Parameters:

ids (list[Ref]) – Ordered list of entity Refs to fetch.

Return type:

list[dict[str, Any] | None]

Returns:

List the same length as ids. Each entry is the entity dict if found, or None if the Ref does not exist.
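The ordering contract can be sketched with a plain dict lookup. This is only an illustration: the Ref dataclass is a hypothetical stand-in for the library's Ref kind, and the dict-backed store is an assumption, not how any particular backend is implemented.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class Ref:
    """Hypothetical stand-in for the library's Ref kind (only .val is modelled)."""

    val: str


async def read_by_ids(
    store: dict[str, dict[str, Any]], ids: list[Ref]
) -> list[dict[str, Any] | None]:
    """Look up each Ref in input order; missing Refs become None placeholders."""
    return [store.get(ref.val) for ref in ids]


store = {"site1": {"id": Ref("site1"), "dis": "My Site"}}
rows = asyncio.run(read_by_ids(store, [Ref("nope"), Ref("site1")]))
```

Keeping the None placeholders lets a caller zip the result with the requested ids without re-matching on id.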

async nav(nav_id=None)[source]

Navigate the site/equip/point hierarchy.

Parameters:

nav_id (str | None (default: None)) – The Ref.val of the entity whose children should be returned. Pass None to get root-level sites.

Return type:

list[dict[str, Any]]

Returns:

List of child entity dicts.

async his_read(ref, range_str=None)[source]

Return time-series history for a point.

Parameters:
  • ref (Ref) – Entity Ref of the point.

  • range_str (str | None (default: None)) – Optional range string (e.g. "today", "2024-01-01,2024-01-31"). If None, all data is returned.

Return type:

list[dict[str, Any]]

Returns:

List of dicts, each with "ts" (datetime) and "val" keys.
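One way a backend might interpret range_str is sketched below. The helper names parse_range and filter_history are hypothetical. The sketch handles only "today", "yesterday", a single date, and "date,date", and it assumes naive datetimes and an inclusive end date. Time zones and the remaining Haystack range forms are deliberately left out.

```python
from __future__ import annotations

from datetime import date, datetime, time, timedelta
from typing import Any


def parse_range(range_str: str | None, today: date) -> tuple[datetime, datetime] | None:
    """Return a half-open [start, end) window, or None meaning 'all data'."""
    if range_str is None:
        return None
    text = range_str.strip()
    if text == "today":
        first = last = today
    elif text == "yesterday":
        first = last = today - timedelta(days=1)
    else:
        start_text, sep, end_text = text.partition(",")
        first = date.fromisoformat(start_text.strip())
        # A single date covers just that day (assumption).
        last = date.fromisoformat(end_text.strip()) if sep else first
    start = datetime.combine(first, time.min)
    end = datetime.combine(last + timedelta(days=1), time.min)
    return start, end


def filter_history(
    items: list[dict[str, Any]], window: tuple[datetime, datetime] | None
) -> list[dict[str, Any]]:
    """Keep items whose "ts" falls inside the window; None keeps everything."""
    if window is None:
        return list(items)
    start, end = window
    return [item for item in items if start <= item["ts"] < end]


january = parse_range("2024-01-01,2024-01-31", today=date(2024, 6, 1))
history = [
    {"ts": datetime(2024, 1, 15, 12, 0), "val": 21.5},
    {"ts": datetime(2024, 2, 1, 0, 0), "val": 22.0},
]
in_range = filter_history(history, january)
```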

async his_write(ref, items)[source]

Append time-series data for a point.

Parameters:
  • ref (Ref) – Entity Ref of the point.

  • items (list[dict[str, Any]]) – List of dicts with "ts" (datetime) and "val" keys.

Return type:

None

async point_write(ref, level, val, who='', duration=None)[source]

Write a value to a writable point’s priority array.

Parameters:
  • ref (Ref) – Entity Ref of the writable point.

  • level (int) – Priority level (1-17). Level 17 is the default.

  • val (Any) – Value to write. Pass None to clear the level.

  • who (str (default: '')) – Optional identifier of who is writing.

  • duration (Any (default: None)) – Optional duration override (ignored by most backends).

Return type:

None

async point_read_array(ref)[source]

Return the 17-level priority array for a writable point.

Parameters:

ref (Ref) – Entity Ref of the writable point.

Return type:

list[dict[str, Any]]

Returns:

List of 17 dicts, each with a "level" key and an optional "val" key (absent when the level is unset).
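The interplay between point_write() and point_read_array() can be sketched with a small in-memory model. PriorityArray is a hypothetical helper, not a library class. The "level" values are plain ints here, and rejecting out-of-range levels with ValueError is an assumption rather than documented behaviour.

```python
from __future__ import annotations

from typing import Any

NUM_LEVELS = 17


class PriorityArray:
    """Toy 17-level priority array (illustrative only)."""

    def __init__(self) -> None:
        self._levels: dict[int, Any] = {}

    def write(self, level: int, val: Any) -> None:
        if not 1 <= level <= NUM_LEVELS:
            raise ValueError(f"level must be 1-{NUM_LEVELS}, got {level}")
        if val is None:
            self._levels.pop(level, None)  # None clears the level
        else:
            self._levels[level] = val

    def read_array(self) -> list[dict[str, Any]]:
        rows: list[dict[str, Any]] = []
        for level in range(1, NUM_LEVELS + 1):
            row: dict[str, Any] = {"level": level}
            if level in self._levels:
                row["val"] = self._levels[level]  # "val" is absent when unset
            rows.append(row)
        return rows


array = PriorityArray()
array.write(8, 72.0)    # operator override
array.write(17, 68.0)   # default level
array.write(8, None)    # release the override
rows = array.read_array()
```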

async watch_sub(watch_id, ids, dis='watch')[source]

Create or extend a watch subscription.

Parameters:
  • watch_id (str | None) – Existing watch ID to extend, or None to create a new watch.

  • ids (list[Ref]) – Entity Refs to add to the watch.

  • dis (str (default: 'watch')) – Human-readable display name for a new watch.

Return type:

tuple[str, list[dict[str, Any]]]

Returns:

(watch_id, entities) where entities is the current state of all newly subscribed entities.

async watch_unsub(watch_id, ids, *, close=False)[source]

Remove entities from a watch, or close the watch entirely.

Parameters:
  • watch_id (str) – Watch to modify.

  • ids (list[Ref]) – Entity Refs to remove. Ignored when close is True.

  • close (bool (default: False)) – If True, the entire watch is torn down.

Return type:

None

async watch_poll(watch_id, *, refresh=False)[source]

Poll a watch for changed entities.

Parameters:
  • watch_id (str) – Watch to poll.

  • refresh (bool (default: False)) – If True, return all watched entities (full refresh) regardless of dirty state.

Return type:

list[dict[str, Any]]

Returns:

List of entity dicts that have changed since the last poll (or all entities if refresh is True). The dirty set is cleared after each poll.
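The subscribe, mark-dirty, and poll cycle described above can be sketched as follows. WatchTable is a hypothetical toy keyed by Ref.val strings. Generating IDs with uuid and returning entities sorted by id are choices made for this example only, and leases and unsubscription are omitted.

```python
from __future__ import annotations

import uuid
from typing import Any


class WatchTable:
    """Toy watch registry (illustrative, not the library's implementation)."""

    def __init__(self, entities: dict[str, dict[str, Any]]) -> None:
        self._entities = entities
        self._watched: dict[str, set[str]] = {}
        self._dirty: dict[str, set[str]] = {}

    def sub(self, watch_id: str | None, ref_vals: list[str]) -> tuple[str, list[dict[str, Any]]]:
        if watch_id is None:
            watch_id = uuid.uuid4().hex  # create a new watch
        watched = self._watched.setdefault(watch_id, set())
        self._dirty.setdefault(watch_id, set())
        added = [v for v in dict.fromkeys(ref_vals) if v not in watched]
        watched.update(added)
        # Return the current state of the newly subscribed entities only.
        return watch_id, [self._entities[v] for v in added if v in self._entities]

    def mark_dirty(self, ref_val: str) -> None:
        for watch_id, watched in self._watched.items():
            if ref_val in watched:
                self._dirty[watch_id].add(ref_val)

    def poll(self, watch_id: str, *, refresh: bool = False) -> list[dict[str, Any]]:
        dirty = self._dirty[watch_id]
        selected = self._watched[watch_id] if refresh else dirty
        result = [self._entities[v] for v in sorted(selected) if v in self._entities]
        dirty.clear()  # every poll resets the dirty set
        return result


entities = {"p1": {"id": "p1", "curVal": 1}, "p2": {"id": "p2", "curVal": 2}}
table = WatchTable(entities)
watch_id, initial = table.sub(None, ["p1", "p2"])
first_poll = table.poll(watch_id)       # nothing has changed yet
entities["p1"]["curVal"] = 5
table.mark_dirty("p1")
second_poll = table.poll(watch_id)      # only p1
third_poll = table.poll(watch_id)       # dirty set was cleared
full = table.poll(watch_id, refresh=True)
```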

async start()[source]

Initialize the backend.

Called once before the server begins serving requests. May open connections, create indexes, warm caches, etc.

Return type:

None

async close()[source]

Tear down the backend.

Called when the server shuts down. Must release all resources (connections, file handles, etc.).

Return type:

None

class hs_py.storage.protocol.UserStore(*args, **kwargs)[source]

Bases: Protocol

Protocol for user management backends.

All methods are async. Backends that implement both StorageAdapter and UserStore can be used as a unified storage layer.

async get_user(username)[source]

Return a user by username, or None if not found.

Parameters:

username (str) – The unique login identifier.

Return type:

User | None

Returns:

User or None.

async list_users()[source]

Return all users.

Return type:

list[User]

Returns:

List of User instances.

async create_user(user)[source]

Persist a new user.

Parameters:

user (User) – User to create.

Raises:

ValueError – If a user with the same username already exists.

Return type:

None

async update_user(username, **fields)[source]

Update fields on an existing user.

Accepts keyword arguments matching User field names. The credentials field can be set directly, or pass password (plaintext) to re-derive credentials.

Parameters:
  • username (str) – Username of the user to update.

  • fields (Any) – Field names and new values.

Return type:

User

Returns:

The updated User.

Raises:

KeyError – If the user does not exist.

async delete_user(username)[source]

Delete a user by username.

Parameters:

username (str) – Username to delete.

Return type:

bool

Returns:

True if the user was deleted, False if not found.
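The error and return conventions of UserStore can be sketched with a dict-backed store. Both the User dataclass (with an invented role field) and DictUserStore are hypothetical stand-ins. The real User type has its own fields, and the password-to-credentials derivation mentioned under update_user is not modelled.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass, replace
from typing import Any


@dataclass(frozen=True)
class User:
    """Hypothetical stand-in for the library's User type."""

    username: str
    role: str = "viewer"  # invented field, for illustration only


class DictUserStore:
    """Toy user store following the documented conventions."""

    def __init__(self) -> None:
        self._users: dict[str, User] = {}

    async def get_user(self, username: str) -> User | None:
        return self._users.get(username)

    async def list_users(self) -> list[User]:
        return list(self._users.values())

    async def create_user(self, user: User) -> None:
        if user.username in self._users:
            raise ValueError(f"user {user.username!r} already exists")
        self._users[user.username] = user

    async def update_user(self, username: str, **fields: Any) -> User:
        current = self._users[username]  # raises KeyError when absent
        updated = replace(current, **fields)
        self._users[username] = updated
        return updated

    async def delete_user(self, username: str) -> bool:
        return self._users.pop(username, None) is not None


async def demo() -> tuple[bool, str, bool, bool, bool]:
    store = DictUserStore()
    await store.create_user(User("alice"))
    try:
        await store.create_user(User("alice"))
        duplicate_rejected = False
    except ValueError:
        duplicate_rejected = True
    updated = await store.update_user("alice", role="admin")
    try:
        await store.update_user("bob", role="admin")
        missing_rejected = False
    except KeyError:
        missing_rejected = True
    first_delete = await store.delete_user("alice")
    second_delete = await store.delete_user("alice")
    return duplicate_rejected, updated.role, missing_rejected, first_delete, second_delete
```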

Memory

In-memory storage adapter for testing and prototyping.

In-memory Haystack storage adapter.

Provides InMemoryAdapter, a pure-Python implementation of StorageAdapter backed by plain dicts. Suitable for testing, demos, and small deployments that do not require persistence.

class hs_py.storage.memory.InMemoryAdapter(entities=None)[source]

Bases: object

In-memory implementation of StorageAdapter.

All state is held in plain Python dicts; nothing is persisted to disk. Thread safety is not guaranteed, so use the adapter within a single asyncio event loop.

Parameters:

entities (list[dict[str, Any]] | None (default: None)) – Optional initial list of entity dicts (each must have an id Ref).

async start()[source]

No-op initializer (in-memory adapter needs no setup).

Return type:

None

async close()[source]

No-op teardown (no resources to release).

Return type:

None

load_entities(entities)[source]

Bulk-load a list of entity dicts.

Each entity must have an id Ref. Entities without an id are silently skipped.

Parameters:

entities (list[dict[str, Any]]) – List of entity dicts to load.

Return type:

int

Returns:

Number of entities actually stored.

property all_col_names: tuple[str, ...]

Return all unique tag names across all entities (cached).

async read_by_filter(ast, limit=None)[source]

Return entities matching a filter AST.

Parameters:
  • ast (Has | Missing | Cmp | And | Or) – Compiled filter AST from parse().

  • limit (int | None (default: None)) – Maximum number of results to return. None means no limit.

Return type:

list[dict[str, Any]]

Returns:

List of matching entity dicts.

async read_by_ids(ids)[source]

Return entities for a list of Refs, preserving input order.

Parameters:

ids (list[Ref]) – Ordered list of entity Refs to fetch.

Return type:

list[dict[str, Any] | None]

Returns:

List the same length as ids. Each entry is the entity dict if found, or None if the Ref does not exist.

async nav(nav_id=None)[source]

Navigate the site/equip/point hierarchy.

  • nav_id=None — return all entities with the site tag.

  • nav_id of a site — return equips whose siteRef matches.

  • nav_id of an equip — return points whose equipRef matches.

Parameters:

nav_id (str | None (default: None)) – Ref val of the parent entity, or None for roots.

Return type:

list[dict[str, Any]]

Returns:

List of child entity dicts.
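The three navigation cases listed above can be sketched over a plain list of entity dicts. Ref and MARKER are hypothetical stand-ins for the library's kinds. Filtering children by the equip and point tags, and returning an empty list for an unknown or leaf nav_id, are assumptions of this example.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class Ref:
    """Hypothetical stand-in for the library's Ref kind."""

    val: str


MARKER = object()  # stand-in for the library's marker singleton


def _ref_val(value: Any) -> str | None:
    return value.val if isinstance(value, Ref) else None


def nav(entities: list[dict[str, Any]], nav_id: str | None = None) -> list[dict[str, Any]]:
    if nav_id is None:
        return [e for e in entities if "site" in e]  # roots: all sites
    parent = next((e for e in entities if _ref_val(e.get("id")) == nav_id), None)
    if parent is None:
        return []
    if "site" in parent:
        return [e for e in entities if "equip" in e and _ref_val(e.get("siteRef")) == nav_id]
    if "equip" in parent:
        return [e for e in entities if "point" in e and _ref_val(e.get("equipRef")) == nav_id]
    return []  # points have no children in this sketch


ENTITIES = [
    {"id": Ref("s1"), "site": MARKER},
    {"id": Ref("e1"), "equip": MARKER, "siteRef": Ref("s1")},
    {"id": Ref("p1"), "point": MARKER, "siteRef": Ref("s1"), "equipRef": Ref("e1")},
]
```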

async his_read(ref, range_str=None)[source]

Return time-series history for a point.

Parameters:
  • ref (Ref) – Entity Ref of the point.

  • range_str (str | None (default: None)) – Optional range string (currently ignored; all data is returned).

Return type:

list[dict[str, Any]]

Returns:

List of dicts with "ts" and "val" keys.

async his_write(ref, items)[source]

Append time-series data for a point.

Parameters:
  • ref (Ref) – Entity Ref of the point.

  • items (list[dict[str, Any]]) – List of dicts with "ts" and "val" keys to append.

Return type:

None

async point_write(ref, level, val, who='', duration=None)[source]

Write a value to a writable point’s priority array.

Parameters:
  • ref (Ref) – Entity Ref of the writable point.

  • level (int) – Priority level (1-17). Level 17 is the default.

  • val (Any) – Value to write. Pass None to clear the level.

  • who (str (default: '')) – Optional identifier of who is writing (stored for reference but not currently used).

  • duration (Any (default: None)) – Ignored by this backend.

Return type:

None

async point_read_array(ref)[source]

Return the 17-level priority array for a writable point.

Parameters:

ref (Ref) – Entity Ref of the writable point.

Return type:

list[dict[str, Any]]

Returns:

List of 17 dicts, each containing "level" (Number) and optionally "val" (absent when the level is unset).

async watch_sub(watch_id, ids, dis='watch')[source]

Create or extend a watch subscription.

Parameters:
  • watch_id (str | None) – Existing watch ID to extend, or None to create a new watch. If the provided ID does not exist it is treated as None and a new watch is created.

  • ids (list[Ref]) – Entity Refs to subscribe to.

  • dis (str (default: 'watch')) – Human-readable name for a newly created watch.

Return type:

tuple[str, list[dict[str, Any]]]

Returns:

(watch_id, entities) — the (possibly new) watch ID and the current state of all subscribed entities.

async watch_unsub(watch_id, ids, *, close=False)[source]

Remove entities from a watch, or close the watch entirely.

Parameters:
  • watch_id (str) – Watch to modify.

  • ids (list[Ref]) – Entity Refs to remove. Ignored when close is True.

  • close (bool (default: False)) – If True, tear down the entire watch.

Return type:

None

async watch_poll(watch_id, *, refresh=False)[source]

Poll a watch for changed entities.

Parameters:
  • watch_id (str) – Watch to poll.

  • refresh (bool (default: False)) – If True, return all watched entities regardless of dirty state.

Return type:

list[dict[str, Any]]

Returns:

List of entity dicts that have changed since the last poll, or all watched entities when refresh is True. The dirty set is cleared after polling.

mark_dirty(ref_val)[source]

Mark an entity as changed in all watches that subscribe to it.

Parameters:

ref_val (str) – The Ref.val of the entity that changed.

Return type:

None

async get_user(username)[source]

Return a user by username, or None if not found.

Return type:

User | None

Parameters:

username (str)

async list_users()[source]

Return all users.

Return type:

list[User]

async create_user(user)[source]

Persist a new user.

Raises:

ValueError – If a user with the same username already exists.

Return type:

None

Parameters:

user (User)

async update_user(username, **fields)[source]

Update fields on an existing user.

Raises:

KeyError – If the user does not exist.

Return type:

User

Parameters:
  • username (str)

  • fields (Any)

async delete_user(username)[source]

Delete a user by username.

Return type:

bool

Parameters:

username (str)

Redis

Redis-backed storage using RediSearch for entity indexing and RedisTimeSeries for history.

Redis storage adapter for Haystack servers.

Implements StorageAdapter using Redis 8 with RedisJSON, RedisTimeSeries, and RediSearch.

Requires Redis 8+ (ships with JSON, TimeSeries, and Search modules) and the redis[hiredis] Python package (installed via pip install hs-py[server]).

Key schema:

hs:e:{ref_val}          RedisJSON document (entity + _tags index field)
hs:ids                  Set of all entity ref vals
hs:tag:{tagname}        Set of ref vals that have this tag
hs:ts:{ref_val}         TimeSeries key for history data
hs:pri:{ref_val}        Hash mapping level -> JSON-encoded value
hs:w:{watch_id}         Hash with watch metadata (dis, lease)
hs:w:{watch_id}:ids     Set of watched ref vals
hs:w:{watch_id}:dirty   Set of dirty ref vals

RediSearch index hs_idx is created on hs:e:* JSON documents with:

  • _tags TAG field (comma-separated tag names) for Has/Missing queries

  • siteRef TAG field ($.siteRef.val) for site navigation and filters

  • equipRef TAG field ($.equipRef.val) for equip navigation and filters
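For readers inspecting a live database, the key schema above can be expressed as a few formatting helpers. These function names are hypothetical and are not part of hs_py. The tag ordering inside the _tags field (sorted here) and the inclusion of every key of the entity are assumptions.

```python
from __future__ import annotations

from typing import Any


def entity_key(ref_val: str) -> str:
    return f"hs:e:{ref_val}"


def tag_key(tag: str) -> str:
    return f"hs:tag:{tag}"


def history_key(ref_val: str) -> str:
    return f"hs:ts:{ref_val}"


def priority_key(ref_val: str) -> str:
    return f"hs:pri:{ref_val}"


def watch_keys(watch_id: str) -> tuple[str, str, str]:
    """Return the (metadata hash, watched-ids set, dirty set) keys for a watch."""
    base = f"hs:w:{watch_id}"
    return base, f"{base}:ids", f"{base}:dirty"


def tags_field(entity: dict[str, Any]) -> str:
    """Comma-separated tag names; sorted only to make the sketch deterministic."""
    return ",".join(sorted(entity))
```

For example, entity_key("site1") yields the key to pass to a JSON.GET when checking what was stored for that entity.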

class hs_py.storage.redis.RedisAdapter(redis)[source]

Bases: object

StorageAdapter backed by Redis 8 (JSON + TimeSeries + Search).

Implements StorageAdapter using RedisJSON for entity storage, RedisTimeSeries for history data, and RediSearch for efficient filter queries.

Parameters:

redis – A redis.asyncio.Redis client instance created with protocol=3 and decode_responses=True.

async start()[source]

Verify Redis connection and create RediSearch index.

Return type:

None

async close()[source]

Close the Redis connection.

Return type:

None

property all_col_names: tuple[str, ...] | None

Cached column names across all entities, or None if unknown.

async read_by_filter(ast, limit=None)[source]

Return entities matching a filter AST.

Attempts to delegate fully to RediSearch; falls back to Python evaluation with tag-index candidate narrowing for unsupported filter constructs.

Parameters:
  • ast (Has | Missing | Cmp | And | Or) – Compiled filter AST from parse().

  • limit (int | None (default: None)) – Maximum number of results to return. None means no limit.

Return type:

list[dict[str, Any]]

Returns:

List of matching entity dicts.

async read_by_ids(ids)[source]

Return entities for a list of Refs, preserving order.

Parameters:

ids (list[Ref]) – Ordered list of entity Refs to fetch.

Return type:

list[dict[str, Any] | None]

Returns:

List the same length as ids. Each entry is the entity dict if found, or None if the Ref does not exist.

async nav(nav_id=None)[source]

Navigate the site/equip/point hierarchy.

Uses RediSearch indexed siteRef and equipRef fields for efficient lookups instead of loading all entities into memory.

Parameters:

nav_id (str | None (default: None)) – The Ref.val of the entity whose children should be returned. Pass None to get root-level sites.

Return type:

list[dict[str, Any]]

Returns:

List of child entity dicts.

async his_read(ref, range_str=None)[source]

Return time-series history for a point.

Parameters:
  • ref (Ref) – Entity Ref of the point.

  • range_str (str | None (default: None)) – Optional range string (currently ignored; all data is returned).

Return type:

list[dict[str, Any]]

Returns:

List of dicts with "ts" (datetime) and "val" keys.

async his_write(ref, items)[source]

Append time-series data for a point.

Parameters:
  • ref (Ref) – Entity Ref of the point.

  • items (list[dict[str, Any]]) – List of dicts with "ts" and "val" keys.

Return type:

None

async point_write(ref, level, val, who='', duration=None)[source]

Write a value to a writable point’s priority array.

Parameters:
  • ref (Ref) – Entity Ref of the writable point.

  • level (int) – Priority level (1-17).

  • val (Any) – Value to write. Pass None to clear the level.

  • who (str (default: '')) – Optional identifier of who is writing (ignored).

  • duration (Any (default: None)) – Optional duration override (ignored).

Return type:

None

async point_read_array(ref)[source]

Return the 17-level priority array for a writable point.

Parameters:

ref (Ref) – Entity Ref of the writable point.

Return type:

list[dict[str, Any]]

Returns:

List of 17 dicts, each with a "level" key and an optional "val" key (absent when the level is unset).

async watch_sub(watch_id, ids, dis='watch')[source]

Create or extend a watch subscription.

Parameters:
  • watch_id (str | None) – Existing watch ID to extend, or None to create a new watch.

  • ids (list[Ref]) – Entity Refs to add to the watch.

  • dis (str (default: 'watch')) – Human-readable display name for a new watch.

Return type:

tuple[str, list[dict[str, Any]]]

Returns:

(watch_id, entities) where entities is the current state of all newly subscribed entities.

async watch_unsub(watch_id, ids, *, close=False)[source]

Remove entities from a watch, or close the watch entirely.

Parameters:
  • watch_id (str) – Watch to modify.

  • ids (list[Ref]) – Entity Refs to remove. Ignored when close is True.

  • close (bool (default: False)) – If True, the entire watch is torn down.

Raises:

ValueError – If watch_id is not found.

Return type:

None

async watch_poll(watch_id, *, refresh=False)[source]

Poll for changed entities.

Parameters:
  • watch_id (str) – Watch to poll.

  • refresh (bool (default: False)) – If True, return all watched entities (full refresh) regardless of dirty state.

Return type:

list[dict[str, Any]]

Returns:

List of entity dicts that have changed since the last poll (or all entities if refresh is True).

Raises:

ValueError – If watch_id is not found.

async load_entities(entities)[source]

Bulk-load a list of entity dicts into Redis.

Each entity must have an id Ref. Entities without an id are silently skipped. Large batches are chunked to avoid unbounded pipeline memory usage.

Parameters:

entities (list[dict[str, Any]]) – List of entity dicts to load.

Return type:

int

Returns:

Number of entities actually stored.
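The chunking idea can be sketched independently of Redis. The helpers chunked and plan_batches are hypothetical, and the default batch size of 500 is an arbitrary value chosen for the example, not the library's setting.

```python
from __future__ import annotations

from itertools import islice
from typing import Any, Iterable, Iterator


def chunked(items: Iterable[dict[str, Any]], size: int) -> Iterator[list[dict[str, Any]]]:
    """Yield successive lists of at most `size` items."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(items)
    while batch := list(islice(iterator, size)):
        yield batch


def plan_batches(entities: list[dict[str, Any]], size: int = 500) -> list[list[dict[str, Any]]]:
    """Drop entities without an id, then split the rest into pipeline-sized batches."""
    storable = [entity for entity in entities if "id" in entity]
    return list(chunked(storable, size))


entities = [{"id": f"p{i}"} for i in range(5)] + [{"dis": "no id"}]
batches = plan_batches(entities, size=2)
stored = sum(len(batch) for batch in batches)
```

Each batch would then be sent as one pipeline, so memory use is bounded by the batch size rather than by the total number of entities.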

async get_user(username)[source]

Return a user by username, or None if not found.

Return type:

User | None

Parameters:

username (str)

async list_users()[source]

Return all users.

Return type:

list[User]

async create_user(user)[source]

Persist a new user.

Raises:

ValueError – If a user with the same username already exists.

Return type:

None

Parameters:

user (User)

async update_user(username, **fields)[source]

Update fields on an existing user.

Raises:

KeyError – If the user does not exist.

Return type:

User

Parameters:
  • username (str)

  • fields (Any)

async delete_user(username)[source]

Delete a user by username.

Return type:

bool

Parameters:

username (str)

hs_py.storage.redis.create_redis_client(url='redis://localhost:6379', *, tls=None, max_connections=50)[source]

Create an async Redis client with optional TLS 1.3.

Uses RESP3 protocol, automatic string decoding, connection health checks, and jittered exponential-backoff retries. When tls is provided, the connection enforces TLS 1.3 minimum, loads the configured client certificate for mutual authentication, and verifies the server certificate against the configured CA.

Parameters:
  • url (default: 'redis://localhost:6379') – Redis connection URL (redis:// or rediss://).

  • tls (default: None) – Optional TLS configuration for encrypted connections.

  • max_connections (default: 50) – Maximum number of connections in the pool.

Returns:

An async redis.asyncio.Redis client.

Return type:

Redis[str]

Example:

from hs_py.tls import TLSConfig
from hs_py.storage.redis import RedisAdapter, create_redis_client

tls = TLSConfig(
    certificate_path="client.pem",
    private_key_path="client.key",
    ca_certificates_path="ca.pem",
)
redis = create_redis_client("rediss://redis:6379", tls=tls)
adapter = RedisAdapter(redis)
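For readers unfamiliar with what "TLS 1.3 minimum" with mutual authentication means at the socket level, the standard-library sketch below builds a comparable ssl.SSLContext. The helper make_tls13_context is hypothetical and does not show how create_redis_client or TLSConfig build their context internally.

```python
from __future__ import annotations

import ssl


def make_tls13_context(
    ca_path: str | None = None,
    cert_path: str | None = None,
    key_path: str | None = None,
) -> ssl.SSLContext:
    """Build a client-side context that refuses anything older than TLS 1.3."""
    # With cafile=None the system trust store is used instead of a private CA.
    context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH, cafile=ca_path)
    context.minimum_version = ssl.TLSVersion.TLSv1_3
    if cert_path is not None:
        # Present a client certificate for mutual authentication.
        context.load_cert_chain(certfile=cert_path, keyfile=key_path)
    return context


context = make_tls13_context()
```

create_default_context() already enables server certificate verification and hostname checking, so only the minimum version and the client certificate need to be set.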

Redis Operations

Low-level Redis operations for entity and time-series management.

Backward-compatible Redis ops wrapper.

Provides RedisOps, a thin HaystackOps subclass that wraps RedisAdapter.

For new code, prefer using RedisAdapter directly with HaystackOps:

from hs_py.ops import HaystackOps
from hs_py.storage.redis import RedisAdapter, create_redis_client

redis = create_redis_client()
adapter = RedisAdapter(redis)
ops = HaystackOps(storage=adapter)
await adapter.start()

class hs_py.redis_ops.RedisOps(redis, *, namespace=None)[source]

Bases: HaystackOps

Haystack ops backed by Redis 8, wrapping RedisAdapter.

This is a convenience wrapper for backward compatibility. For new code, use RedisAdapter directly with HaystackOps(storage=adapter).

Parameters:
  • redis – A redis.asyncio.Redis client instance.

  • namespace (default: None) – Optional ontology namespace for defs/libs ops.

async start()[source]

Verify Redis connection and create RediSearch index.

Return type:

None

async stop()[source]

Close the Redis connection.

Return type:

None

async load_entities(entities)[source]

Bulk-load a list of entity dicts into Redis.

Parameters:

entities (list[dict[str, Any]]) – List of entity dicts (each must have an id Ref).

Return type:

int

Returns:

Number of entities actually stored.

async load_grid(grid)[source]

Bulk-load entities from a Grid into Redis.

Parameters:

grid (Grid) – Grid of entities (each row must have an id Ref).

Return type:

int

Returns:

Number of entities loaded.

async about()[source]

Return server information.

Return type:

Grid

async filetypes(grid)[source]

Return supported file types.

Return type:

Grid

Parameters:

grid (Grid)

async invoke_action(grid)[source]

Invoke an action on an entity.

Return type:

Grid

Parameters:

grid (Grid)

hs_py.redis_ops.create_redis_client(url='redis://localhost:6379', *, tls=None, max_connections=50)[source]

Create an async Redis client with optional TLS 1.3.

Uses RESP3 protocol, automatic string decoding, connection health checks, and jittered exponential-backoff retries. When tls is provided, the connection enforces TLS 1.3 minimum, loads the configured client certificate for mutual authentication, and verifies the server certificate against the configured CA.

Parameters:
  • url (default: 'redis://localhost:6379') – Redis connection URL (redis:// or rediss://).

  • tls (default: None) – Optional TLS configuration for encrypted connections.

  • max_connections (default: 50) – Maximum number of connections in the pool.

Returns:

An async redis.asyncio.Redis client.

Example:

from hs_py.tls import TLSConfig
from hs_py.storage.redis import RedisAdapter, create_redis_client

tls = TLSConfig(
    certificate_path="client.pem",
    private_key_path="client.key",
    ca_certificates_path="ca.pem",
)
redis = create_redis_client("rediss://redis:6379", tls=tls)
adapter = RedisAdapter(redis)

TimescaleDB

PostgreSQL/TimescaleDB storage with JSONB entities, hypertable history, and filter-to-SQL pushdown.

TimescaleDB StorageAdapter for Haystack server backends.

Provides TimescaleAdapter implementing the StorageAdapter Protocol using asyncpg for async PostgreSQL/TimescaleDB access.

Schema is created automatically on start(). Entities are stored as JSONB in hs_entities. Time-series history uses hs_history (optionally a TimescaleDB hypertable). Priority arrays are stored in hs_priority. Watches are tracked in hs_watches and hs_watch_entities.

Usage:

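from hs_py.storage.timescale import TimescaleAdapter, create_timescale_pool

# Run inside a coroutine; Ref and MARKER are the library's Haystack kinds.
pool = await create_timescale_pool("postgresql://localhost/haystack")
adapter = TimescaleAdapter(pool)
await adapter.start()
await adapter.load_entities([{"id": Ref("site1"), "site": MARKER, "dis": "My Site"}])

class hs_py.storage.timescale.TimescaleAdapter(pool)[source]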

Bases: object

StorageAdapter backed by PostgreSQL/TimescaleDB via asyncpg.

Parameters:

pool – asyncpg connection pool to use.

async start()[source]

Create database schema (idempotent).

Runs DDL to create tables and indexes if they do not already exist. Attempts to create a TimescaleDB hypertable for hs_history; the attempt is silently skipped if TimescaleDB is not available.

Return type:

None

async close()[source]

Close the underlying connection pool.

Return type:

None

property all_col_names: tuple[str, ...] | None

Cached column names across all entities, or None if unknown.

async read_by_filter(ast, limit=None)[source]

Return entities matching the filter AST.

Translates simple (single-segment) filter nodes to JSONB SQL. Falls back to Python-side evaluation via hs_py.filter.evaluate() for multi-segment paths or unsupported node types.

Results are cached by (sql_clause, limit) to avoid re-decoding on repeated reads.

Parameters:
  • ast (Has | Missing | Cmp | And | Or) – Parsed filter AST.

  • limit (int | None (default: None)) – Maximum number of entities to return. None means no limit.

Return type:

list[dict[str, Any]]

Returns:

List of entity tag dicts.
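The pushdown-with-fallback idea can be sketched with stand-in AST nodes. Everything here is hypothetical: the dataclasses only mimic the node names, the JSONB column is assumed to be called tags, and only string equality is translated. Returning None signals that the caller should fall back to Python-side evaluation.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Union


@dataclass(frozen=True)
class Has:
    path: tuple[str, ...]


@dataclass(frozen=True)
class Missing:
    path: tuple[str, ...]


@dataclass(frozen=True)
class Cmp:
    path: tuple[str, ...]
    op: str
    val: Any


@dataclass(frozen=True)
class And:
    left: Any
    right: Any


@dataclass(frozen=True)
class Or:
    left: Any
    right: Any


Node = Union[Has, Missing, Cmp, And, Or]


def _emit(node: Node, params: list[Any]) -> str | None:
    if isinstance(node, (And, Or)):
        left = _emit(node.left, params)
        right = _emit(node.right, params)
        if left is None or right is None:
            return None  # one unsupported branch forces a full fallback
        joiner = "AND" if isinstance(node, And) else "OR"
        return f"({left} {joiner} {right})"
    if len(node.path) != 1:
        return None  # multi-segment paths are not pushed down
    params.append(node.path[0])
    tag = f"${len(params)}::text"
    if isinstance(node, Has):
        return f"tags ? {tag}"
    if isinstance(node, Missing):
        return f"NOT (tags ? {tag})"
    if node.op == "==" and isinstance(node.val, str):
        params.append(node.val)
        return f"tags->>{tag} = ${len(params)}::text"
    return None  # other comparisons are left to Python in this sketch


def compile_filter(node: Node) -> tuple[str, list[Any]] | None:
    """Return (where_clause, params) with $n placeholders, or None for fallback."""
    params: list[Any] = []
    clause = _emit(node, params)
    return None if clause is None else (clause, params)


compiled = compile_filter(And(Has(("site",)), Cmp(("dis",), "==", "HQ")))
```

Since the compiled clause is a plain string, it could also serve as part of a cache key such as the (sql_clause, limit) pair mentioned above.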

async read_by_ids(ids)[source]

Return entities for a list of Refs, preserving input order.

Parameters:

ids (list[Ref]) – Ordered list of entity Refs to fetch.

Return type:

list[dict[str, Any] | None]

Returns:

List the same length as ids. Each entry is the entity dict if found, or None if the Ref does not exist.

async nav(nav_id=None)[source]

Navigate the entity tree.

  • nav_id=None returns all site entities.

  • nav_id set to a site id returns equip entities for that site.

  • nav_id set to an equip id returns point entities for that equip.

Parameters:

nav_id (str | None (default: None)) – None for root, or an entity id to navigate into.

Return type:

list[dict[str, Any]]

Returns:

List of entity tag dicts.

async his_read(ref, range_str=None)[source]

Return time-series history for a point.

Parameters:
  • ref (Ref) – Entity Ref of the point.

  • range_str (str | None (default: None)) – Optional Haystack range string (e.g. "today", "2024-01-01,2024-01-31"). If None, all data is returned.

Return type:

list[dict[str, Any]]

Returns:

List of dicts with "ts" (datetime) and "val" keys.

async his_write(ref, items)[source]

Append time-series data for a point.

Parameters:
  • ref (Ref) – Entity Ref of the point.

  • items (list[dict[str, Any]]) – List of dicts with "ts" and "val" keys.

Return type:

None

async point_write(ref, level, val, who='', duration=None)[source]

Write a value to a writable point’s priority array.

Parameters:
  • ref (Ref) – Entity Ref of the writable point.

  • level (int) – Priority level (1-17).

  • val (Any) – Value to write. Pass None to clear the level.

  • who (str (default: '')) – Optional identifier of who is writing.

  • duration (Any (default: None)) – Ignored by this backend.

Return type:

None

async point_read_array(ref)[source]

Return the 17-level priority array for a writable point.

Parameters:

ref (Ref) – Entity Ref of the writable point.

Return type:

list[dict[str, Any]]

Returns:

List of 17 dicts, each with a "level" key and an optional "val" key (absent when the level is unset).

async watch_sub(watch_id, ids, dis='watch')[source]

Create or extend a watch subscription.

Parameters:
  • watch_id (str | None) – Existing watch ID to extend, or None to create a new watch.

  • ids (list[Ref]) – Entity Refs to add to the watch.

  • dis (str (default: 'watch')) – Human-readable display name for a new watch.

Return type:

tuple[str, list[dict[str, Any]]]

Returns:

(watch_id, entities) where entities is the current state of all newly subscribed entities.

async watch_unsub(watch_id, ids, *, close=False)[source]

Remove entities from a watch, or close the watch entirely.

Parameters:
  • watch_id (str) – Watch to modify.

  • ids (list[Ref]) – Entity Refs to remove. Ignored when close is True.

  • close (bool (default: False)) – If True, the entire watch is torn down.

Return type:

None

async watch_poll(watch_id, *, refresh=False)[source]

Poll a watch for changed entities.

Uses a transaction to atomically fetch and clear dirty flags, preventing lost notifications from concurrent writers.

Parameters:
  • watch_id (str) – Watch to poll.

  • refresh (bool (default: False)) – If True, return all watched entities regardless of dirty state.

Return type:

list[dict[str, Any]]

Returns:

List of entity dicts that have changed since the last poll.
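The fetch-and-clear pattern can be demonstrated with the standard-library sqlite3 module standing in for PostgreSQL. The table name watch_entities and its columns are invented for this sketch and say nothing about the adapter's real schema. On PostgreSQL the same effect might be achieved with a single UPDATE ... RETURNING statement.

```python
from __future__ import annotations

import sqlite3


def make_db() -> sqlite3.Connection:
    # isolation_level=None puts the connection in autocommit mode so the
    # transaction boundaries below are fully explicit.
    conn = sqlite3.connect(":memory:", isolation_level=None)
    conn.execute(
        "CREATE TABLE watch_entities ("
        " watch_id TEXT, ref_val TEXT, dirty INTEGER,"
        " PRIMARY KEY (watch_id, ref_val))"
    )
    return conn


def poll_dirty(conn: sqlite3.Connection, watch_id: str) -> list[str]:
    """Read and reset the dirty flags for one watch inside a single transaction."""
    conn.execute("BEGIN IMMEDIATE")  # take the write lock before reading
    try:
        rows = conn.execute(
            "SELECT ref_val FROM watch_entities"
            " WHERE watch_id = ? AND dirty = 1 ORDER BY ref_val",
            (watch_id,),
        ).fetchall()
        conn.execute(
            "UPDATE watch_entities SET dirty = 0 WHERE watch_id = ? AND dirty = 1",
            (watch_id,),
        )
        conn.execute("COMMIT")
    except Exception:
        conn.execute("ROLLBACK")
        raise
    return [row[0] for row in rows]


conn = make_db()
conn.executemany(
    "INSERT INTO watch_entities VALUES (?, ?, ?)",
    [("w1", "p1", 1), ("w1", "p2", 0), ("w2", "p1", 1)],
)
first = poll_dirty(conn, "w1")
second = poll_dirty(conn, "w1")
other = poll_dirty(conn, "w2")
```

Because the read and the reset share one transaction, a writer that marks an entity dirty either lands before the read (and is returned) or after the commit (and is kept for the next poll).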

async load_entities(entities)[source]

Bulk-upsert a list of entity dicts into the store.

Uses a staging table with COPY for fast bulk loading, then upserts into the main table. The id tag (a Ref) is extracted and used as the primary key. Entities without an id are skipped.

Parameters:

entities (list[dict[str, Any]]) – List of entity tag dicts.

Return type:

int

Returns:

Number of entities written.

async get_user(username)[source]

Return a user by username, or None if not found.

Return type:

User | None

Parameters:

username (str)

async list_users()[source]

Return all users.

Return type:

list[User]

async create_user(user)[source]

Persist a new user.

Raises:

ValueError – If a user with the same username already exists.

Return type:

None

Parameters:

user (User)

async update_user(username, **fields)[source]

Update fields on an existing user.

Raises:

KeyError – If the user does not exist.

Return type:

User

Parameters:
  • username (str)

  • fields (Any)

async delete_user(username)[source]

Delete a user by username.

Return type:

bool

Parameters:

username (str)

async hs_py.storage.timescale.create_timescale_pool(dsn='postgresql://localhost:5432/haystack', *, min_size=2, max_size=10, command_timeout=60.0, tls=None)[source]

Create an asyncpg connection pool for TimescaleDB.

Configures connection recycling, idle connection cleanup, and registers orjson-based JSONB codecs on each new connection.

Parameters:
  • dsn (default: 'postgresql://localhost:5432/haystack') – PostgreSQL DSN string.

  • min_size (default: 2) – Minimum number of pooled connections.

  • max_size (default: 10) – Maximum number of pooled connections.

  • command_timeout (default: 60.0) – Per-query timeout in seconds.

  • tls (default: None) – Optional TLS configuration for SSL connections.

Returns:

Open asyncpg.Pool ready for use.

Return type:

asyncpg.Pool[Any]