Watch and Subscriptions

The watch module provides tools for real-time data subscriptions. WatchState tracks server-side entity state and computes deltas, while WatchAccumulator merges deltas on the client side.

See also: Types, for the full watch API reference.

Concepts

In the Haystack protocol, a watch is a subscription to a set of entity records. The server tracks the current state and notifies clients when tags change. Instead of sending the full record every time, the server computes a delta — only the tags that have changed, been added, or been removed.

Term       Description
Watch      A named subscription to a set of entity records
Delta      A grid containing only changed tags since the last poll
REMOVE     Sentinel value indicating a tag was removed
_removed   Metadata flag indicating an entity was removed from the watch
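The delta idea can be illustrated without the library. The following is a minimal sketch using plain dicts in place of Haystack records; REMOVE here is a local stand-in object and diff_tags is a hypothetical helper, neither of which is part of hs_py.

```python
# Illustrative sketch only: plain dicts stand in for Haystack records,
# and REMOVE is a local sentinel, not the hs_py.kinds.REMOVE object.
REMOVE = object()


def diff_tags(old: dict, new: dict) -> dict:
    """Return only the tags that were added, changed, or removed."""
    delta = {}
    for name, value in new.items():
        if name not in old or old[name] != value:
            delta[name] = value          # added or changed
    for name in old:
        if name not in new:
            delta[name] = REMOVE         # removed
    return delta


old = {"curVal": 72.5, "alarm": True, "point": True}
new = {"curVal": 73.0, "point": True, "unit": "°F"}
delta = diff_tags(old, new)
```

Here the unchanged point tag is left out of the delta, curVal and unit are carried with their new values, and alarm is mapped to the sentinel.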

Server-Side: WatchState

WatchState maintains the last-known state of watched entities and computes deltas:

from hs_py.watch import WatchState
from hs_py import Grid, Col
from hs_py.kinds import Ref, Number, MARKER, REMOVE

state = WatchState("w1")

# First update — everything is new, so the delta is the full record
current = Grid(
    cols=(Col("id", {}), Col("curVal", {}), Col("point", {})),
    rows=(
        {"id": Ref("p1"), "curVal": Number(72.5, "°F"), "point": MARKER},
        {"id": Ref("p2"), "curVal": Number(68.0, "°F"), "point": MARKER},
    ),
)
delta = state.compute_delta(current)
# delta contains both records in full (first time seen)
state.update(current)  # sync the cache so the next delta is relative to this state

# Second update — only changes are emitted
current = Grid(
    cols=(Col("id", {}), Col("curVal", {}), Col("point", {})),
    rows=(
        {"id": Ref("p1"), "curVal": Number(73.0, "°F"), "point": MARKER},
        {"id": Ref("p2"), "curVal": Number(68.0, "°F"), "point": MARKER},
    ),
)
delta = state.compute_delta(current)
# delta contains only p1 with updated curVal
# p2 is omitted (no changes)
state.update(current)  # keep the cache in sync for later deltas

Tag Removal

When a tag is removed from an entity, the delta carries the REMOVE sentinel as that tag's value (not to be confused with MARKER):

# Previously: {"id": Ref("p1"), "curVal": Number(73), "alarm": MARKER}
# Now:        {"id": Ref("p1"), "curVal": Number(73)}
# Delta:      {"id": Ref("p1"), "alarm": REMOVE}

Entity Removal

When an entity is no longer in the current set, it appears in the delta with a _removed flag:

# Entity p2 dropped from the watch
current = Grid(
    cols=(Col("id", {}), Col("curVal", {})),
    rows=({"id": Ref("p1"), "curVal": Number(73.0, "°F")},),
)
delta = state.compute_delta(current)
# delta includes: {"id": Ref("p2"), "_removed": MARKER}
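To show how the three cases (new entity, changed tags, removed entity) might fit together, here is a rough sketch over plain dicts keyed by a string id. The diff_entities helper and the local REMOVE object are hypothetical and only illustrate the idea; they are not how WatchState is implemented.

```python
# Illustrative sketch only: entities are plain dicts keyed by a string id.
REMOVE = object()  # local stand-in for a "tag removed" sentinel


def diff_entities(cache: dict, current: dict) -> list:
    """Compare two {id: tags} snapshots and return a list of delta rows."""
    rows = []
    for eid, tags in current.items():
        old = cache.get(eid)
        if old is None:
            rows.append({"id": eid, **tags})  # first time seen: full record
            continue
        changed = {k: v for k, v in tags.items() if k not in old or old[k] != v}
        changed.update({k: REMOVE for k in old if k not in tags})
        if changed:
            rows.append({"id": eid, **changed})
    for eid in cache:
        if eid not in current:
            rows.append({"id": eid, "_removed": True})  # entity left the watch
    return rows


cache = {"p1": {"curVal": 73.0, "alarm": True}, "p2": {"curVal": 68.0}}
current = {"p1": {"curVal": 73.0}, "p3": {"curVal": 70.1}}
rows = diff_entities(cache, current)
```

In this run p1 yields a row with only the removed alarm tag, p3 is emitted in full because it is new, and p2 is flagged as removed.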

Server-Side Filtering

Configure a watch with a filter expression, then apply it to the delta before sending to the client:

from hs_py.watch import WatchState
from hs_py.filter import parse

state = WatchState("w1", filter_ast=parse("curVal > 70"))

delta = state.compute_delta(current)
filtered = state.apply_filter(delta)
# Only includes rows where curVal > 70
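One thing worth keeping in mind is that a delta row may carry only the tags that changed. The sketch below uses plain dicts and a hand-written predicate (matches, a hypothetical stand-in for the parsed filter) to show what filtering against the delta row alone would do. Whether apply_filter evaluates the delta row or the full cached record is not stated here, so treat this as an assumption and check the API reference.

```python
# Illustrative sketch only: a hand-written predicate stands in for
# the parsed filter expression "curVal > 70".
def matches(row: dict) -> bool:
    value = row.get("curVal")
    return isinstance(value, (int, float)) and value > 70


def filter_rows(rows: list) -> list:
    """Keep only the rows that satisfy the predicate."""
    return [row for row in rows if matches(row)]


delta_rows = [
    {"id": "p1", "curVal": 73.0},
    {"id": "p2", "curVal": 68.0},
    {"id": "p3", "alarm": True},  # curVal unchanged, so absent from this row
]
kept = filter_rows(delta_rows)
```

In this sketch p3 is dropped because its row does not carry curVal at all, even though the entity's full record might satisfy the filter.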

Updating the Cache

After computing and sending a delta, call update() to sync the internal cache with the current state. This ensures the next compute_delta call produces correct diffs:

delta = state.compute_delta(current)
# ... send delta to client ...
state.update(current)  # Keep cache in sync
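The effect of skipping update() can be seen in a toy model. This sketch assumes compute_delta reads the cache without modifying it, which is how the paragraph above reads; SketchState is a hypothetical class over plain dicts, not the real WatchState.

```python
# Illustrative sketch only: a toy state class whose compute_delta() does not
# touch the cache, mirroring the compute-then-update sequence described above.
class SketchState:
    def __init__(self):
        self._cache = {}

    def compute_delta(self, current: dict) -> dict:
        """Return {id: value} for entries that differ from the cache."""
        return {k: v for k, v in current.items() if self._cache.get(k) != v}

    def update(self, current: dict) -> None:
        """Replace the cache with a copy of the current snapshot."""
        self._cache = dict(current)


state = SketchState()
snapshot = {"p1": 72.5, "p2": 68.0}

first = state.compute_delta(snapshot)
stale = state.compute_delta(snapshot)   # update() skipped: same delta again
state.update(snapshot)
fresh = state.compute_delta(snapshot)   # cache in sync: nothing to report
```

Under that assumption, a forgotten update() means clients keep receiving records that have not changed.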

Client-Side: WatchAccumulator

WatchAccumulator merges incoming delta grids into a complete entity state on the client side:

from hs_py.watch import WatchAccumulator
from hs_py import Grid, Col
from hs_py.kinds import Ref, Number, MARKER, REMOVE

acc = WatchAccumulator()

# Apply the first delta (initial state)
delta = Grid(
    cols=(Col("id", {}), Col("curVal", {}), Col("point", {})),
    rows=(
        {"id": Ref("p1"), "curVal": Number(72.5, "°F"), "point": MARKER},
        {"id": Ref("p2"), "curVal": Number(68.0, "°F"), "point": MARKER},
    ),
)
acc.apply_delta(delta)

# Current state
print(acc.get("p1"))
# {"id": Ref("p1"), "curVal": Number(72.5, "°F"), "point": MARKER}

# Apply an update delta
update = Grid(
    cols=(Col("id", {}), Col("curVal", {})),
    rows=({"id": Ref("p1"), "curVal": Number(73.0, "°F")},),
)
acc.apply_delta(update)

# State is merged — unchanged tags preserved
print(acc.get("p1"))
# {"id": Ref("p1"), "curVal": Number(73.0, "°F"), "point": MARKER}

# Apply a removal delta
removal = Grid(
    cols=(Col("id", {}), Col("alarm", {})),
    rows=({"id": Ref("p1"), "alarm": REMOVE},),
)
acc.apply_delta(removal)
# "alarm" tag is removed from p1's state

# Entity removal
removed = Grid(
    cols=(Col("id", {}), Col("_removed", {})),
    rows=({"id": Ref("p2"), "_removed": MARKER},),
)
acc.apply_delta(removed)
assert acc.get("p2") is None
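The merge rules used above (overwrite changed tags, drop tags marked REMOVE, drop entities flagged _removed) can be sketched in a few lines. SketchAccumulator and the local REMOVE object are hypothetical and work on plain dicts; this is not the WatchAccumulator implementation.

```python
# Illustrative sketch only: not the WatchAccumulator implementation.
REMOVE = object()  # local stand-in for the "tag removed" sentinel


class SketchAccumulator:
    def __init__(self):
        self.entities = {}

    def apply_delta(self, rows: list) -> None:
        for row in rows:
            eid = row["id"]
            if "_removed" in row:
                self.entities.pop(eid, None)      # entity left the watch
                continue
            tags = self.entities.setdefault(eid, {"id": eid})
            for name, value in row.items():
                if value is REMOVE:
                    tags.pop(name, None)          # tag was removed
                else:
                    tags[name] = value            # tag added or changed

    def get(self, eid: str):
        return self.entities.get(eid)


acc = SketchAccumulator()
acc.apply_delta([
    {"id": "p1", "curVal": 72.5, "alarm": True},
    {"id": "p2", "curVal": 68.0},
])
acc.apply_delta([{"id": "p1", "curVal": 73.0, "alarm": REMOVE}])
acc.apply_delta([{"id": "p2", "_removed": True}])
```

After the three deltas, p1 holds only its id and the updated curVal, and p2 is gone.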

Accessing Accumulated State

Beyond get() for individual lookups, the accumulator provides access to all entities and can export the full state as a grid:

# All entity IDs and their current tag dicts
for entity_id, tags in acc.entities.items():
    print(f"{entity_id}: {tags.get('dis')}")

# Export the accumulated state as a Grid
grid = acc.to_grid()
print(f"Tracking {len(grid)} entities")

HTTP Watch Workflow

Complete watch lifecycle using the HTTP client:

from hs_py import Client
from hs_py.kinds import Ref

# Run inside a coroutine, for example via asyncio.run(main())
async with Client("http://host/api", "user", "pass") as c:
    # 1. Subscribe to entities (raw=True to access grid metadata)
    watch = await c.watch_sub(
        [Ref("p:demo:r:1"), Ref("p:demo:r:2")],
        watch_dis="My Watch",
        raw=True,
    )
    watch_id = watch.meta["watchId"]

    # 2. Poll for changes
    changes = await c.watch_poll(watch_id)
    for row in changes:
        print(f"{row['id']}: {row.get('curVal')}")

    # 3. Remove entities
    await c.watch_unsub(watch_id, [Ref("p:demo:r:2")])

    # 4. Close the watch
    await c.watch_close(watch_id)
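If a poll fails partway through, the watch may stay open on the server until it times out. One possible pattern is to close it in a finally block. The sketch below uses StubClient, a hypothetical stand-in that records calls instead of talking to a server; only the watch_poll and watch_close method names are taken from the example above.

```python
import asyncio


class StubClient:
    """Hypothetical stand-in that records calls instead of doing HTTP."""

    def __init__(self):
        self.calls = []

    async def watch_poll(self, watch_id):
        self.calls.append(("poll", watch_id))
        raise ConnectionError("simulated network failure")

    async def watch_close(self, watch_id):
        self.calls.append(("close", watch_id))


async def poll_once_then_close(client, watch_id):
    """Poll once and always close the watch, even if polling raises."""
    try:
        return await client.watch_poll(watch_id)
    finally:
        await client.watch_close(watch_id)


client = StubClient()
try:
    asyncio.run(poll_once_then_close(client, "w1"))
except ConnectionError:
    pass  # expected in this demo; the watch was still closed
```

The same try/finally shape should carry over to a real client, provided the connection is still usable when the close call is made.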

WebSocket Watch Workflow

Over WebSocket, the same subscribe and poll calls apply. The example below polls once and feeds the resulting delta into a WatchAccumulator:

from hs_py.kinds import Ref
from hs_py.ws_client import WebSocketClient
from hs_py.watch import WatchAccumulator

acc = WatchAccumulator()

async with WebSocketClient("ws://host/api/ws", auth_token="token") as ws:
    watch = await ws.watch_sub(
        [Ref("p1"), Ref("p2")], watch_dis="WS Watch", raw=True,
    )
    watch_id = watch.meta["watchId"]

    # Poll for changes
    delta = await ws.watch_poll(watch_id)
    acc.apply_delta(delta)
    print(f"State updated: {acc.get('p1')}")
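A long-running client would typically poll repeatedly rather than once. The following sketch shows one way to structure such a loop; poll_loop, fake_poll, and apply are hypothetical names, and canned lists of dicts stand in for a live connection. With the real client, poll might wrap ws.watch_poll(watch_id) and apply might be acc.apply_delta, assuming an empty delta grid is falsy.

```python
import asyncio


async def poll_loop(poll, apply, interval: float, max_polls: int) -> int:
    """Call poll() repeatedly, pass non-empty results to apply(), return the count applied."""
    applied = 0
    for _ in range(max_polls):
        delta = await poll()
        if delta:                 # skip empty deltas
            apply(delta)
            applied += 1
        await asyncio.sleep(interval)
    return applied


# Demo with canned deltas in place of a live connection.
canned = [[{"id": "p1", "curVal": 72.5}], [], [{"id": "p1", "curVal": 73.0}]]
state = {}


async def fake_poll():
    return canned.pop(0)


def apply(rows):
    for row in rows:
        state.setdefault(row["id"], {}).update(row)


applied = asyncio.run(poll_loop(fake_poll, apply, interval=0.0, max_polls=3))
```

The max_polls bound keeps the demo finite; a real loop would more likely run until cancelled and use a non-zero interval.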