WebSocket Transport¶
Warning
The WebSocket transport API is experimental and subject to breaking changes in future releases.
haystack-py includes a full WebSocket transport layer for persistent, bidirectional
communication. The WebSocket client mirrors the HTTP
Client API, with added support for server-push
notifications, binary frames, channel multiplexing, and automatic reconnection.
Architecture¶
The WebSocket stack is split into four layers:
| Module | Layer | Responsibility |
|---|---|---|
| | Sans-I/O | Protocol logic, frame I/O, heartbeat, TLS |
| | Client | Request/response, watches, reconnection, channel multiplexing |
| | Server | Dispatch, auth, push distribution, connection limits |
| | Codec | Binary frame encoding/decoding |
WebSocket Client¶
Basic Usage¶
WebSocketClient connects to a Haystack WebSocket
endpoint and provides the same operations as the HTTP client:
from hs_py.ws_client import WebSocketClient
from hs_py.kinds import Ref

async with WebSocketClient("ws://host/api/ws", auth_token="token") as ws:
    about = await ws.about()
    points = await ws.read("point and sensor")
    his = await ws.his_read(Ref("p1"), "yesterday")
All standard operations are available: about, ops, formats,
read, read_by_ids, nav, his_read, his_write,
point_write, point_write_array, watch_sub, watch_unsub,
watch_poll, invoke_action, and close.
Client Options¶
| Parameter | Type | Description |
|---|---|---|
| | | WebSocket endpoint URL ( |
| | | Bearer token sent on connect |
| | | TLS configuration for |
| | | Per-request timeout in seconds (default: 30) |
| | | Ping interval in seconds (default: 30, 0 to disable) |
| | | Observability callbacks |
| | | Enable binary frame encoding (default: |
| | | Enable permessage-deflate compression (default: |
| | | Codec-level compression for binary frames: |
| | | Enable chunked transfer for large binary payloads (default: |
Watch Subscriptions¶
The WebSocket client supports watch subscriptions for real-time updates.
See Watch and Subscriptions for delta encoding and the
WatchAccumulator.
from hs_py.ws_client import WebSocketClient
from hs_py.kinds import Ref

async with WebSocketClient("ws://host/api/ws", auth_token="token") as ws:
    # Use raw=True to access grid metadata (e.g. watchId)
    watch = await ws.watch_sub(
        [Ref("p1"), Ref("p2")], watch_dis="My Watch", raw=True,
    )
    watch_id = watch.meta["watchId"]

    # Subscribe with a server-side filter
    watch = await ws.watch_sub(
        [Ref("p1"), Ref("p2")], watch_dis="Filtered",
        filter="curVal > 70", raw=True,
    )

    # Poll for changes on the first watch
    delta = await ws.watch_poll(watch_id)
    for row in delta:
        print(f"  {row['id']}: curVal={row.get('curVal')}")
Watch Push Callbacks¶
Register a callback to receive server-initiated watch push messages. The callback receives the watch ID and the delta grid:
import asyncio

from hs_py.ws_client import WebSocketClient
from hs_py import Grid
from hs_py.watch import WatchAccumulator
from hs_py.kinds import Ref

acc = WatchAccumulator()

def handle_push(watch_id: str, grid: Grid) -> None:
    # Fold the pushed delta into the accumulated watch state
    acc.apply_delta(grid)
    print(f"Watch {watch_id}: {len(grid)} rows changed")

async with WebSocketClient("ws://host/api/ws", auth_token="token") as ws:
    ws.on_watch_push(handle_push)
    await ws.watch_sub([Ref("p1"), Ref("p2")], watch_dis="My Watch")
    # Push updates arrive via the callback while the connection is open
    await asyncio.sleep(60)
The callback is also preserved across reconnections when using
ReconnectingWebSocketClient:
from hs_py.ws_client import ReconnectingWebSocketClient
client = ReconnectingWebSocketClient("ws://host/api/ws", auth_token="token")
client.on_watch_push(handle_push)
await client.start()
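As a rough illustration of what a push handler might do with each delta, the sketch below merges changed rows into a dictionary keyed by point ID. It is not the WatchAccumulator implementation: `apply_rows` is a hypothetical helper, rows are modelled as plain dicts, and it assumes each pushed row carries an `id` plus only the tags that changed.

```python
from typing import Any, Dict, Iterable

# Current state of every watched point, keyed by its id (as a string).
State = Dict[str, Dict[str, Any]]


def apply_rows(state: State, rows: Iterable[Dict[str, Any]]) -> State:
    """Merge changed rows into the accumulated state (hypothetical helper)."""
    for row in rows:
        # Create the entry on first sight, then overwrite only the tags present.
        state.setdefault(str(row["id"]), {}).update(row)
    return state


state: State = {}
apply_rows(state, [{"id": "p1", "curVal": 70.0, "dis": "Temp"}])
apply_rows(state, [{"id": "p1", "curVal": 73.5}])  # later push changes only curVal
print(state["p1"])
```

Tags absent from a later push keep their previous values, which is the property a delta-style accumulator relies on.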
Batch Operations¶
Send multiple operations in a single WebSocket message for reduced round-trips:
from hs_py import GridBuilder
read_grid = GridBuilder().add_col("filter").add_row(
{"filter": "point and sensor"}
).to_grid()
about_grid = GridBuilder().to_grid()
results = await ws.batch(("read", read_grid), ("about", about_grid))
# results is a list of Grids, one per operation
Binary Frames¶
Binary frame mode replaces JSON envelopes with a compact 4-byte header, reducing overhead for high-frequency operations. See WebSocket for the codec API.
async with WebSocketClient(
    "ws://host/api/ws", auth_token="token",
    binary=True,
) as ws:
    # Same API; binary encoding is transparent
    about = await ws.about()
    points = await ws.read("point")
Binary frame header format:
Byte 0: Flags
    bit 0 = FLAG_RESPONSE (0x01)
    bit 1 = FLAG_ERROR (0x02)
    bit 2 = FLAG_PUSH (0x04)
    bit 3 = FLAG_COMPRESSED (0x08), v2 codec-level compression
    bit 4 = FLAG_CHUNKED (0x10), v2 chunked transfer
Bytes 1-2: Request ID (uint16, big-endian)
Byte 3: Operation code (uint8)
IF compressed (bit 3 set):
    Byte 4: Algorithm (0=zlib, 1=lzma)
IF chunked (bit 4 set):
    Next 2 bytes: Chunk index (uint16, big-endian)
    Next 2 bytes: Total chunks (uint16, big-endian)
Operation codes: about=1, ops=2, formats=3, close=4,
read=10, nav=11, hisRead=12, hisWrite=13, pointWrite=14,
watchSub=15, watchUnsub=16, watchPoll=17, invokeAction=18.
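The header layout above can be sketched with the standard `struct` module. This is a minimal illustration, not the hs_py codec API: `Header`, `pack_header`, and `unpack_header` are hypothetical names, and the sketch assumes that when both optional sections are present, the algorithm byte comes before the chunk fields.

```python
import struct
from typing import NamedTuple, Optional, Tuple

FLAG_RESPONSE = 0x01
FLAG_ERROR = 0x02
FLAG_PUSH = 0x04
FLAG_COMPRESSED = 0x08
FLAG_CHUNKED = 0x10


class Header(NamedTuple):
    flags: int
    req_id: int
    op: int
    algorithm: Optional[int] = None     # present only if FLAG_COMPRESSED
    chunk_index: Optional[int] = None   # present only if FLAG_CHUNKED
    total_chunks: Optional[int] = None  # present only if FLAG_CHUNKED


def pack_header(h: Header) -> bytes:
    # Fixed part: flags (uint8), request ID (uint16 BE), op code (uint8).
    out = struct.pack(">BHB", h.flags, h.req_id, h.op)
    if h.flags & FLAG_COMPRESSED:
        out += struct.pack(">B", h.algorithm)
    if h.flags & FLAG_CHUNKED:
        out += struct.pack(">HH", h.chunk_index, h.total_chunks)
    return out


def unpack_header(data: bytes) -> Tuple[Header, bytes]:
    """Return the parsed header and the remaining payload bytes."""
    flags, req_id, op = struct.unpack_from(">BHB", data, 0)
    offset = 4
    algorithm = chunk_index = total_chunks = None
    if flags & FLAG_COMPRESSED:
        (algorithm,) = struct.unpack_from(">B", data, offset)
        offset += 1
    if flags & FLAG_CHUNKED:
        chunk_index, total_chunks = struct.unpack_from(">HH", data, offset)
        offset += 4
    return Header(flags, req_id, op, algorithm, chunk_index, total_chunks), data[offset:]


# A response (flag 0x01) to request 7 for the read operation (code 10).
frame = pack_header(Header(FLAG_RESPONSE, 7, 10)) + b"payload"
print(frame[:4].hex())  # 0100070a
```

The `>` prefix in the format string selects big-endian byte order with no padding, so the fixed part is always exactly four bytes.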
Compression¶
haystack-py offers two levels of WebSocket compression:
Transport-level (permessage-deflate) — compresses all frames at the WebSocket protocol layer using zlib. Good for text-mode JSON payloads:
async with WebSocketClient(
    "ws://host/api/ws", auth_token="token",
    compression=True,
) as ws:
    points = await ws.read("point")
Codec-level (v2 binary frames) — compresses individual payloads at the
application layer with per-payload algorithm selection. Requires binary=True.
Supports zlib (fast) and LZMA (high ratio):
from hs_py.ws_codec import COMP_ZLIB, COMP_LZMA

async with WebSocketClient(
    "ws://host/api/ws", auth_token="token",
    binary=True,
    binary_compression=COMP_ZLIB,  # or COMP_LZMA
) as ws:
    # Payloads > 1 KB are automatically compressed
    points = await ws.read("point")
Codec compression benchmarks on a 1.8 MB Haystack grid (5,000 rows):
| Algorithm | Compressed size (reduction) | Compress time | Decompress time |
|---|---|---|---|
| zlib (level 1) | 60 KB (97%) | 2.5 ms | 0.3 ms |
| LZMA (raw, preset 0) | 13 KB (99.3%) | 29 ms | 1.8 ms |
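To make the trade-off concrete, the following standard-library sketch compresses a payload with either algorithm and skips payloads at or below a 1 KB threshold. It is an illustration under assumptions, not the hs_py codec: the function names are hypothetical, the algorithm IDs mirror the header's algorithm byte (0=zlib, 1=lzma), and "raw, preset 0" is assumed to mean a raw LZMA2 stream.

```python
import lzma
import zlib
from typing import Tuple

ALGO_ZLIB = 0   # matches the header's algorithm byte
ALGO_LZMA = 1
THRESHOLD = 1024  # payloads at or below this size are sent uncompressed

# Assumption: raw LZMA2 stream at preset 0 (no container header).
_LZMA_FILTERS = [{"id": lzma.FILTER_LZMA2, "preset": 0}]


def compress_payload(data: bytes, algorithm: int) -> Tuple[bytes, bool]:
    """Return (payload, was_compressed)."""
    if len(data) <= THRESHOLD:
        return data, False
    if algorithm == ALGO_ZLIB:
        return zlib.compress(data, 1), True  # level 1: fastest
    if algorithm == ALGO_LZMA:
        return lzma.compress(data, format=lzma.FORMAT_RAW, filters=_LZMA_FILTERS), True
    raise ValueError(f"unknown algorithm: {algorithm}")


def decompress_payload(data: bytes, algorithm: int) -> bytes:
    if algorithm == ALGO_ZLIB:
        return zlib.decompress(data)
    if algorithm == ALGO_LZMA:
        return lzma.decompress(data, format=lzma.FORMAT_RAW, filters=_LZMA_FILTERS)
    raise ValueError(f"unknown algorithm: {algorithm}")


sample = b'{"id":"p1","curVal":72.5}\n' * 2000
for name, algo in (("zlib", ALGO_ZLIB), ("lzma", ALGO_LZMA)):
    packed, _ = compress_payload(sample, algo)
    print(name, len(sample), "->", len(packed), "bytes")
```

Actual ratios and timings depend heavily on the payload, so the figures in the table should be read as indicative for that particular grid.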
Chunked Transfer¶
For very large payloads, enable chunked transfer to split responses into independently-compressed 256 KB chunks. This avoids blocking the WebSocket connection with multi-megabyte single frames:
from hs_py.kinds import Ref
from hs_py.ws_client import WebSocketClient
from hs_py.ws_codec import COMP_ZLIB
from hs_py.ws_server import WebSocketServer

# Client
async with WebSocketClient(
    "ws://host/api/ws", auth_token="token",
    binary=True,
    binary_compression=COMP_ZLIB,
    chunked=True,
) as ws:
    # Large hisRead responses are automatically chunked
    history = await ws.his_read(Ref("p1"), "lastMonth")

# Server (ops is your HaystackOps implementation)
server = WebSocketServer(
    ops, host="0.0.0.0", port=8080,
    binary=True,
    binary_compression=COMP_ZLIB,
    chunked=True,
)
Each chunk carries a sequence number and total count. The receiving side
reassembles chunks transparently via ChunkAssembler.
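The split-and-reassemble idea can be sketched with the standard library alone. This is not the ChunkAssembler implementation: `split_chunks` and `Reassembler` are hypothetical names, zlib is assumed as the per-chunk algorithm, and header framing is left out so that only the chunk bookkeeping is shown.

```python
import zlib
from typing import Dict, List, Optional, Tuple

CHUNK_SIZE = 256 * 1024  # 256 KB of uncompressed data per chunk


def split_chunks(payload: bytes, chunk_size: int = CHUNK_SIZE) -> List[Tuple[int, int, bytes]]:
    """Split payload into (index, total, compressed_bytes) tuples."""
    pieces = [payload[i:i + chunk_size] for i in range(0, len(payload), chunk_size)] or [b""]
    total = len(pieces)
    # Each piece is compressed on its own, so chunks can be decoded independently.
    return [(index, total, zlib.compress(piece, 1)) for index, piece in enumerate(pieces)]


class Reassembler:
    """Collects chunks in any order and returns the payload once complete."""

    def __init__(self) -> None:
        self._parts: Dict[int, bytes] = {}

    def add(self, index: int, total: int, data: bytes) -> Optional[bytes]:
        self._parts[index] = zlib.decompress(data)
        if len(self._parts) < total:
            return None  # still waiting for more chunks
        return b"".join(self._parts[i] for i in range(total))


payload = bytes(range(256)) * 3000  # 768,000 bytes, so three chunks
assembler = Reassembler()
result = None
for index, total, data in split_chunks(payload):
    result = assembler.add(index, total, data)
print(result == payload)  # True
```

Because every chunk carries its index and the total count, the receiver does not need to rely on arrival order to know when the payload is complete.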
Reconnecting Client¶
ReconnectingWebSocketClient automatically
reconnects with exponential backoff when the connection drops:
from hs_py.ws_client import ReconnectingWebSocketClient

client = ReconnectingWebSocketClient(
    "ws://host/api/ws", auth_token="token",
    min_reconnect_delay=1.0,
    max_reconnect_delay=60.0,
)
await client.start()
try:
    about = await client.about()
finally:
    await client.stop()
Parameters:
- min_reconnect_delay: Initial delay in seconds (default: 1.0).
- max_reconnect_delay: Maximum delay cap in seconds (default: 60.0).
- on_connect: Async callback invoked after each (re)connection.
- on_disconnect: Async callback invoked when the connection drops.
The delay doubles after each failed attempt, capped at max_reconnect_delay.
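The doubling rule can be written out as a short sketch. `backoff_delays` is a hypothetical helper for illustration only; the real client may differ in details such as jitter or when the delay resets after a successful connection.

```python
from typing import List


def backoff_delays(min_delay: float = 1.0, max_delay: float = 60.0, attempts: int = 8) -> List[float]:
    """Return the wait before each successive reconnection attempt."""
    delays = []
    delay = min_delay
    for _ in range(attempts):
        delays.append(delay)
        # Double after every failed attempt, but never exceed the cap.
        delay = min(delay * 2, max_delay)
    return delays


print(backoff_delays())  # [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]
```

With the defaults, the cap is reached on the seventh attempt, after which the client would retry once a minute.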
Channel Multiplexing¶
WebSocketPool multiplexes multiple logical channels
over a single WebSocket connection. Each channel is identified by a string
name included in the JSON envelope as the ch field.
from hs_py.ws_client import WebSocketPool

async with WebSocketPool("ws://host/api/ws", auth_token="token") as pool:
    ch1 = pool.channel("tenant-1")
    ch2 = pool.channel("tenant-2")
    # Each channel's requests are scoped independently
    about1 = await ch1.about()
    about2 = await ch2.about()
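The role of the `ch` field can be illustrated with a small sketch that tags envelopes with a channel name and sorts incoming messages by it. Only the `ch` and `op` field names come from this page; the `id` field, the helper names, and the overall envelope shape are assumptions made for the example.

```python
import json
from collections import defaultdict
from typing import Any, Dict, List


def make_envelope(channel: str, req_id: int, op: str) -> str:
    """Build a JSON envelope tagged with its channel (hypothetical shape)."""
    return json.dumps({"id": req_id, "ch": channel, "op": op})


def route_by_channel(messages: List[str]) -> Dict[str, List[Dict[str, Any]]]:
    """Group decoded envelopes by their ch field."""
    routed: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
    for raw in messages:
        envelope = json.loads(raw)
        routed[envelope["ch"]].append(envelope)
    return dict(routed)


wire = [
    make_envelope("tenant-1", 1, "about"),
    make_envelope("tenant-2", 1, "about"),
    make_envelope("tenant-1", 2, "read"),
]
for channel, envelopes in route_by_channel(wire).items():
    print(channel, [e["op"] for e in envelopes])
```

Since routing keys on the channel name, two channels can reuse the same request ID without their responses being confused.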
Channel Client¶
ChannelClient scopes requests to a named
channel within a pool, useful for multi-tenant or multi-context scenarios:
from hs_py.ws_client import WebSocketPool, ChannelClient

async with WebSocketPool("ws://host/api/ws", auth_token="token") as pool:
    ch1 = ChannelClient(pool, channel="building-a")
    ch2 = ChannelClient(pool, channel="building-b")
    # Each channel's requests are scoped independently
    a_points = await ch1.read("point")
    b_points = await ch2.read("point")
WebSocket Server¶
WebSocketServer is a standalone WebSocket server
that dispatches messages to your HaystackOps
implementation:
from hs_py.ws_server import WebSocketServer
ops = MyOps()
server = WebSocketServer(ops, host="0.0.0.0", port=8080)
await server.start()
# ... server is running ...
await server.stop()
Features:
- JSON envelope dispatch: routes the op field to the matching handler
- Binary frame support: decodes binary frames and responds in kind
- Codec compression: zlib/LZMA compression at the binary frame level
- Chunked transfer: splits large responses into sequenced chunks
- Batch requests: processes batch messages as parallel operations
- Watch push: distributes watch updates to connected clients
- Certificate auth: extracts client CN from TLS for mTLS authentication
- Token auth: validates bearer tokens via the auth_token parameter
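Envelope dispatch and parallel batch handling, as listed above, might look roughly like the following. This is a sketch of the general pattern using asyncio, not the WebSocketServer internals: the handler functions, the `HANDLERS` table, and the response shapes are all hypothetical.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List

Message = Dict[str, Any]
Handler = Callable[[Message], Awaitable[Message]]


async def handle_about(msg: Message) -> Message:
    return {"op": "about", "serverName": "demo"}


async def handle_read(msg: Message) -> Message:
    return {"op": "read", "filter": msg.get("filter")}


# Maps the envelope's op field to its handler.
HANDLERS: Dict[str, Handler] = {"about": handle_about, "read": handle_read}


async def dispatch(msg: Message) -> Message:
    handler = HANDLERS.get(msg.get("op", ""))
    if handler is None:
        return {"error": f"unknown op: {msg.get('op')!r}"}
    return await handler(msg)


async def dispatch_batch(msgs: List[Message]) -> List[Message]:
    # gather runs the handlers concurrently and keeps results in request order.
    return list(await asyncio.gather(*(dispatch(m) for m in msgs)))


if __name__ == "__main__":
    batch = [{"op": "read", "filter": "point and sensor"}, {"op": "about"}]
    for response in asyncio.run(dispatch_batch(batch)):
        print(response)
```

Returning results in request order lets the caller pair each response with the operation that produced it, even though the handlers run concurrently.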
Pushing Watch Updates¶
The server can push watch updates to all connected WebSocket clients:
from hs_py import Grid, Col, Ref, Number

# Build the update grid
update = Grid(
    cols=(Col("id", {}), Col("curVal", {})),
    rows=({"id": Ref("p1"), "curVal": Number(73.5, "°F")},),
)

# Push to all connected clients for a specific watch
await server.push_watch("w1", update)