"""StorageAdapter and UserStore Protocols for Haystack server backends.
Defines the async interfaces that all storage backends must implement.
Concrete implementations include :class:`~hs_py.storage.memory.InMemoryAdapter`,
:class:`~hs_py.storage.redis.RedisAdapter`, and
:class:`~hs_py.storage.timescale.TimescaleAdapter`.
"""
from __future__ import annotations
from typing import TYPE_CHECKING, Any, Protocol, runtime_checkable
if TYPE_CHECKING:
from hs_py.filter.ast import Node
from hs_py.kinds import Ref
from hs_py.user import User
__all__ = ["StorageAdapter", "UserStore"]
[docs]
@runtime_checkable
class StorageAdapter(Protocol):
"""Protocol for Haystack server storage backends.
All methods are async. Concrete backends must implement every method.
The ``start()`` / ``close()`` methods bracket the lifetime of the adapter
and are called by the server on startup and shutdown respectively.
Entity dicts use native Haystack kinds (``Ref``, ``Marker``, ``Number``,
etc.) — not wire-encoded JSON dicts. Callers are responsible for encoding
before sending to clients.
"""
[docs]
async def read_by_filter(
self,
ast: Node,
limit: int | None = None,
) -> list[dict[str, Any]]:
"""Return entities matching a filter AST.
:param ast: Compiled filter AST from :func:`~hs_py.filter.parse`.
:param limit: Maximum number of results to return. ``None`` means
no limit.
:returns: List of entity dicts (order unspecified).
"""
...
[docs]
async def read_by_ids(self, ids: list[Ref]) -> list[dict[str, Any] | None]:
"""Return entities for a list of Refs, preserving order.
:param ids: Ordered list of entity Refs to fetch.
:returns: List the same length as *ids*. Each entry is the entity
dict if found, or ``None`` if the Ref does not exist.
"""
...
[docs]
async def nav(self, nav_id: str | None = None) -> list[dict[str, Any]]:
"""Navigate the site/equip/point hierarchy.
:param nav_id: The ``Ref.val`` of the entity whose children should be
returned. Pass ``None`` to get root-level sites.
:returns: List of child entity dicts.
"""
...
[docs]
async def his_read(
self,
ref: Ref,
range_str: str | None = None,
) -> list[dict[str, Any]]:
"""Return time-series history for a point.
:param ref: Entity Ref of the point.
:param range_str: Optional range string (e.g. ``"today"``,
``"2024-01-01,2024-01-31"``). If ``None``, all data is returned.
:returns: List of dicts, each with ``"ts"`` (datetime) and ``"val"``
keys.
"""
...
[docs]
async def his_write(self, ref: Ref, items: list[dict[str, Any]]) -> None:
"""Append time-series data for a point.
:param ref: Entity Ref of the point.
:param items: List of dicts with ``"ts"`` (datetime) and ``"val"``
keys.
"""
...
[docs]
async def point_write(
self,
ref: Ref,
level: int,
val: Any,
who: str = "",
duration: Any = None,
) -> None:
"""Write a value to a writable point's priority array.
:param ref: Entity Ref of the writable point.
:param level: Priority level (1-17). Level 17 is the default.
:param val: Value to write. Pass ``None`` to clear the level.
:param who: Optional identifier of who is writing.
:param duration: Optional duration override (ignored by most backends).
"""
...
[docs]
async def point_read_array(self, ref: Ref) -> list[dict[str, Any]]:
"""Return the 17-level priority array for a writable point.
:param ref: Entity Ref of the writable point.
:returns: List of 17 dicts, each with a ``"level"`` key and an
optional ``"val"`` key (absent when the level is unset).
"""
...
[docs]
async def watch_sub(
self,
watch_id: str | None,
ids: list[Ref],
dis: str = "watch",
) -> tuple[str, list[dict[str, Any]]]:
"""Create or extend a watch subscription.
:param watch_id: Existing watch ID to extend, or ``None`` to create a
new watch.
:param ids: Entity Refs to add to the watch.
:param dis: Human-readable display name for a new watch.
:returns: ``(watch_id, entities)`` where *entities* is the current
state of all newly subscribed entities.
"""
...
[docs]
async def watch_unsub(
self,
watch_id: str,
ids: list[Ref],
*,
close: bool = False,
) -> None:
"""Remove entities from a watch, or close the watch entirely.
:param watch_id: Watch to modify.
:param ids: Entity Refs to remove. Ignored when *close* is ``True``.
:param close: If ``True``, the entire watch is torn down.
"""
...
[docs]
async def watch_poll(
self,
watch_id: str,
*,
refresh: bool = False,
) -> list[dict[str, Any]]:
"""Poll a watch for changed entities.
:param watch_id: Watch to poll.
:param refresh: If ``True``, return all watched entities (full
refresh) regardless of dirty state.
:returns: List of entity dicts that have changed since the last poll
(or all entities if *refresh* is ``True``). The dirty set is
cleared after each poll.
"""
...
[docs]
async def start(self) -> None:
"""Initialize the backend.
Called once before the server begins serving requests. May open
connections, create indexes, warm caches, etc.
"""
...
[docs]
async def close(self) -> None:
"""Tear down the backend.
Called when the server shuts down. Must release all resources
(connections, file handles, etc.).
"""
...
[docs]
@runtime_checkable
class UserStore(Protocol):
"""Protocol for user management backends.
All methods are async. Backends that implement both :class:`StorageAdapter`
and :class:`UserStore` can be used as a unified storage layer.
"""
[docs]
async def get_user(self, username: str) -> User | None:
"""Return a user by username, or ``None`` if not found.
:param username: The unique login identifier.
:returns: :class:`~hs_py.user.User` or ``None``.
"""
...
[docs]
async def list_users(self) -> list[User]:
"""Return all users.
:returns: List of :class:`~hs_py.user.User` instances.
"""
...
[docs]
async def create_user(self, user: User) -> None:
"""Persist a new user.
:param user: User to create.
:raises ValueError: If a user with the same username already exists.
"""
...
[docs]
async def update_user(self, username: str, **fields: Any) -> User:
"""Update fields on an existing user.
Accepts keyword arguments matching :class:`~hs_py.user.User` field
names. The ``credentials`` field can be set directly, or pass
``password`` (plaintext) to re-derive credentials.
:param username: Username of the user to update.
:param fields: Field names and new values.
:returns: The updated :class:`~hs_py.user.User`.
:raises KeyError: If the user does not exist.
"""
...
[docs]
async def delete_user(self, username: str) -> bool:
"""Delete a user by username.
:param username: Username to delete.
:returns: ``True`` if the user was deleted, ``False`` if not found.
"""
...