# Source code for hs_py.watch
"""Watch state tracking with delta encoding and server-side filtering.
Provides :class:`WatchState` for server-side delta computation and
:class:`WatchAccumulator` for client-side delta merging. Both classes
track entity state per watch subscription to minimise push payload size.
Server-side filtering evaluates a Haystack filter expression against
entities before pushing, so clients only receive matching changes.
"""
from __future__ import annotations
from typing import TYPE_CHECKING, Any
from hs_py.grid import Grid
from hs_py.kinds import MARKER, REMOVE, Ref
if TYPE_CHECKING:
from hs_py.filter.ast import Node
# Public names of this module: the only ones a star-import
# (``from hs_py.watch import *``) will bind.
__all__ = [
"WatchAccumulator",
"WatchState",
]
[docs]
class WatchState:
    """Server-side snapshot of the entities belonging to one watch.

    A per-entity copy of the tags is kept, keyed by the ``val`` of each
    row's ``id`` :class:`~hs_py.kinds.Ref`.  :meth:`compute_delta` compares
    a fresh full state against that snapshot and reports only what differs;
    :meth:`update` then replaces the snapshot with the fresh state.

    :param watch_id: The watch identifier.
    :param filter_ast: Optional parsed filter AST for server-side filtering.
    """

    def __init__(self, watch_id: str, *, filter_ast: Node | None = None) -> None:
        """Create a tracker with an empty snapshot.

        :param watch_id: The watch identifier.
        :param filter_ast: Optional parsed filter AST for server-side filtering.
        """
        self.watch_id = watch_id
        self.filter_ast = filter_ast
        # Entity id (``Ref.val``) -> shallow copy of the entity's tag dict.
        self._cache: dict[str, dict[str, Any]] = {}

    @staticmethod
    def _diff_tags(ref: Ref, row: Any, previous: dict[str, Any]) -> dict[str, Any]:
        """Build the delta row for one entity that is already in the snapshot.

        The result always starts with ``id``; it is followed by every tag of
        *row* that is new or whose value differs from *previous*, then by a
        :data:`~hs_py.kinds.REMOVE` entry for every tag that *previous* had
        and *row* no longer has.
        """
        changed = {
            tag: value
            for tag, value in row.items()
            if tag != "id" and (tag not in previous or previous[tag] != value)
        }
        dropped = {
            tag: REMOVE
            for tag in previous
            if tag != "id" and tag not in row
        }
        return {"id": ref, **changed, **dropped}

    def compute_delta(self, current: Grid) -> Grid:
        """Return the changes between the snapshot and *current*.

        The snapshot itself is not modified; call :meth:`update` afterwards.

        - Rows whose ``id`` is not a :class:`~hs_py.kinds.Ref` are skipped.
        - An entity missing from the snapshot is emitted in full.
        - A known entity is emitted with ``id`` plus its changed tags, and
          :data:`~hs_py.kinds.REMOVE` for tags that disappeared; it is
          omitted entirely when nothing differs.
        - A snapshot entity absent from *current* yields a row holding only
          ``id`` and a ``_removed`` marker.

        :param current: Full current state of watched entities.
        :returns: Grid with only the changes, or an empty grid if none.
        """
        seen: set[str] = set()
        rows_out: list[dict[str, Any]] = []

        for row in current:
            ref = row.get("id")
            if not isinstance(ref, Ref):
                continue
            key = ref.val
            seen.add(key)

            previous = self._cache.get(key)
            if previous is None:
                # Not seen before: the whole row is the delta.
                rows_out.append(dict(row))
                continue

            diff = self._diff_tags(ref, row, previous)
            if len(diff) > 1:  # something besides the mandatory "id"
                rows_out.append(diff)

        # Snapshot entries that no longer appear in the current state.
        rows_out.extend(
            {"id": Ref(key), "_removed": MARKER}
            for key in self._cache
            if key not in seen
        )

        return Grid.make_rows(rows_out) if rows_out else Grid.make_empty()

    def update(self, current: Grid) -> None:
        """Replace the snapshot contents with the state in *current*.

        Every row with a :class:`~hs_py.kinds.Ref` ``id`` is stored as a
        shallow copy; snapshot entries whose id does not occur in *current*
        are deleted.  Intended to be called after :meth:`compute_delta`.

        :param current: Full current state of watched entities.
        """
        seen: set[str] = set()
        for row in current:
            ref = row.get("id")
            if not isinstance(ref, Ref):
                continue
            seen.add(ref.val)
            self._cache[ref.val] = dict(row)

        # Collect first, then delete, so the dict is not resized mid-iteration.
        stale = [key for key in self._cache if key not in seen]
        for key in stale:
            del self._cache[key]

    def apply_filter(self, grid: Grid) -> Grid:
        """Keep only the rows of *grid* that satisfy the watch's filter.

        Without a configured filter the very same *grid* object is returned.

        :param grid: Grid of entities to filter.
        :returns: Grid of matching rows, or an empty grid if none match.
        """
        ast = self.filter_ast
        if ast is None:
            return grid

        # Imported on demand, only when a filter is actually configured.
        from hs_py.filter.eval import evaluate

        kept = [row for row in grid if evaluate(ast, row)]
        return Grid.make_rows(kept) if kept else Grid.make_empty()
[docs]
class WatchAccumulator:
    """Client-side store that rebuilds full entity state from delta pushes.

    Each call to :meth:`apply_delta` folds one delta grid into a dict of
    tag dicts keyed by the ``val`` of each row's ``id``
    :class:`~hs_py.kinds.Ref`.
    """

    def __init__(self) -> None:
        """Start with no entities."""
        # Entity id (``Ref.val``) -> accumulated tag dict.
        self._entities: dict[str, dict[str, Any]] = {}

    def apply_delta(self, delta: Grid) -> None:
        """Fold the rows of *delta* into the accumulated state.

        - Rows whose ``id`` is not a :class:`~hs_py.kinds.Ref` are ignored.
        - A row whose ``_removed`` value is :data:`~hs_py.kinds.MARKER`
          drops the entity altogether.
        - Otherwise the entity is created if unknown, tags whose value is
          :data:`~hs_py.kinds.REMOVE` are deleted, and every other tag
          (including ``id``) is stored.

        :param delta: Delta grid from the server.
        """
        for row in delta:
            ref = row.get("id")
            if not isinstance(ref, Ref):
                continue
            key = ref.val

            if row.get("_removed") is MARKER:
                self._entities.pop(key, None)
                continue

            tags = self._entities.setdefault(key, {})
            for tag, value in row.items():
                if value is REMOVE:
                    tags.pop(tag, None)
                    continue
                tags[tag] = value

    @property
    def entities(self) -> dict[str, dict[str, Any]]:
        """The accumulated state: the internal mapping itself, not a copy."""
        return self._entities

    def to_grid(self) -> Grid:
        """Return the accumulated state as a :class:`~hs_py.grid.Grid`.

        An empty accumulator yields an empty grid.
        """
        if self._entities:
            return Grid.make_rows(list(self._entities.values()))
        return Grid.make_empty()

    def get(self, entity_id: str) -> dict[str, Any] | None:
        """Look up a single entity by ID.

        :param entity_id: Entity identifier string.
        :returns: Tag dict, or ``None`` if not present.
        """
        return self._entities.get(entity_id)