WebSocket Transport

Warning

The WebSocket transport API is experimental and subject to breaking changes in future releases.

haystack-py includes a full WebSocket transport layer for persistent, bidirectional communication. The WebSocket client mirrors the HTTP Client API, with added support for server-push notifications, binary frames, channel multiplexing, and automatic reconnection.

See also

WebSocket for the WebSocket API reference, WebSocket for binary frame codec details.

Architecture

The WebSocket stack is split into four layers:

Module

Layer

Responsibility

hs_py.ws

Sans-I/O

Protocol logic, frame I/O, heartbeat, TLS

hs_py.ws_client

Client

Request/response, watches, reconnection, channel multiplexing

hs_py.ws_server

Server

Dispatch, auth, push distribution, connection limits

hs_py.ws_codec

Codec

Binary frame encoding/decoding

WebSocket Client

Basic Usage

WebSocketClient connects to a Haystack WebSocket endpoint and provides the same operations as the HTTP client:

from hs_py.ws_client import WebSocketClient
from hs_py.kinds import Ref

async with WebSocketClient("ws://host/api/ws", auth_token="token") as ws:
    about = await ws.about()
    points = await ws.read("point and sensor")
    his = await ws.his_read(Ref("p1"), "yesterday")

All standard operations are available: about, ops, formats, read, read_by_ids, nav, his_read, his_write, point_write, point_write_array, watch_sub, watch_unsub, watch_poll, invoke_action, and close.

Client Options

Parameter

Type

Description

url

str

WebSocket endpoint URL (ws:// or wss://)

auth_token

str

Bearer token sent on connect

tls

TLSConfig

TLS configuration for wss:// connections

timeout

float

Per-request timeout in seconds (default: 30)

heartbeat

float

Ping interval in seconds (default: 30, 0 to disable)

metrics

MetricsHooks

Observability callbacks

binary

bool

Enable binary frame encoding (default: False)

compression

bool

Enable permessage-deflate compression (default: False)

binary_compression

int | None

Codec-level compression for binary frames: COMP_ZLIB or COMP_LZMA (default: None = disabled)

chunked

bool

Enable chunked transfer for large binary payloads (default: False)

Watch Subscriptions

The WebSocket client supports watch subscriptions for real-time updates. See Watch and Subscriptions for delta encoding and the WatchAccumulator.

from hs_py.ws_client import WebSocketClient
from hs_py.kinds import Ref

async with WebSocketClient("ws://host/api/ws", auth_token="token") as ws:
    # Use raw=True to access grid metadata (e.g. watchId)
    watch = await ws.watch_sub(
        [Ref("p1"), Ref("p2")], watch_dis="My Watch", raw=True,
    )
    watch_id = watch.meta["watchId"]

    # Subscribe with a server-side filter
    watch = await ws.watch_sub(
        [Ref("p1"), Ref("p2")], watch_dis="Filtered",
        filter="curVal > 70", raw=True,
    )

    # Poll for changes
    delta = await ws.watch_poll(watch_id)
    for row in delta:
        print(f"  {row['id']}: curVal={row.get('curVal')}")

Watch Push Callbacks

Register a callback to receive server-initiated watch push messages. The callback receives the watch ID and the delta grid:

from hs_py.ws_client import WebSocketClient
from hs_py import Grid
from hs_py.watch import WatchAccumulator
from hs_py.kinds import Ref

acc = WatchAccumulator()

def handle_push(watch_id: str, grid: Grid) -> None:
    acc.apply_delta(grid)
    print(f"Watch {watch_id}: {len(grid)} rows changed")

async with WebSocketClient("ws://host/api/ws", auth_token="token") as ws:
    ws.on_watch_push(handle_push)
    await ws.watch_sub([Ref("p1"), Ref("p2")], watch_dis="My Watch")
    # Push updates arrive via the callback while the connection is open
    await asyncio.sleep(60)

The callback is also preserved across reconnections when using ReconnectingWebSocketClient:

from hs_py.ws_client import ReconnectingWebSocketClient

client = ReconnectingWebSocketClient("ws://host/api/ws", auth_token="token")
client.on_watch_push(handle_push)
await client.start()

Batch Operations

Send multiple operations in a single WebSocket message for reduced round-trips:

from hs_py import GridBuilder

read_grid = GridBuilder().add_col("filter").add_row(
    {"filter": "point and sensor"}
).to_grid()
about_grid = GridBuilder().to_grid()

results = await ws.batch(("read", read_grid), ("about", about_grid))
# results is a list of Grids, one per operation

Binary Frames

Binary frame mode replaces JSON envelopes with a compact 4-byte header, reducing overhead for high-frequency operations. See WebSocket for the codec API.

async with WebSocketClient(
    "ws://host/api/ws", auth_token="token",
    binary=True,
) as ws:
    # Same API — binary encoding is transparent
    about = await ws.about()
    points = await ws.read("point")

Binary frame header format:

Byte 0: Flags
  bit 0 = FLAG_RESPONSE (0x01)
  bit 1 = FLAG_ERROR    (0x02)
  bit 2 = FLAG_PUSH     (0x04)
  bit 3 = FLAG_COMPRESSED (0x08)  — v2 codec-level compression
  bit 4 = FLAG_CHUNKED    (0x10)  — v2 chunked transfer
Bytes 1-2: Request ID (uint16, big-endian)
Byte 3: Operation code (uint8)

IF compressed (bit 3 set):
  Byte 4: Algorithm (0=zlib, 1=lzma)

IF chunked (bit 4 set):
  Next 2 bytes: Chunk index (uint16, big-endian)
  Next 2 bytes: Total chunks (uint16, big-endian)

Operation codes: about=1, ops=2, formats=3, close=4, read=10, nav=11, hisRead=12, hisWrite=13, pointWrite=14, watchSub=15, watchUnsub=16, watchPoll=17, invokeAction=18.

Compression

haystack-py offers two levels of WebSocket compression:

Transport-level (permessage-deflate) — compresses all frames at the WebSocket protocol layer using zlib. Good for text-mode JSON payloads:

async with WebSocketClient(
    "ws://host/api/ws", auth_token="token",
    compression=True,
) as ws:
    points = await ws.read("point")

Codec-level (v2 binary frames) — compresses individual payloads at the application layer with per-payload algorithm selection. Requires binary=True. Supports zlib (fast) and LZMA (high ratio):

from hs_py.ws_codec import COMP_ZLIB, COMP_LZMA

async with WebSocketClient(
    "ws://host/api/ws", auth_token="token",
    binary=True,
    binary_compression=COMP_ZLIB,  # or COMP_LZMA
) as ws:
    # Payloads > 1 KB are automatically compressed
    points = await ws.read("point")

Codec compression benchmarks on a 1.8 MB Haystack grid (5,000 rows):

Algorithm

Compressed

Compress

Decompress

zlib (level 1)

60 KB (97%)

2.5 ms

0.3 ms

LZMA (raw, preset 0)

13 KB (99.3%)

29 ms

1.8 ms

Chunked Transfer

For very large payloads, enable chunked transfer to split responses into independently-compressed 256 KB chunks. This avoids blocking the WebSocket connection with multi-megabyte single frames:

from hs_py.ws_codec import COMP_ZLIB

# Client
async with WebSocketClient(
    "ws://host/api/ws", auth_token="token",
    binary=True,
    binary_compression=COMP_ZLIB,
    chunked=True,
) as ws:
    # Large hisRead responses are automatically chunked
    history = await ws.his_read(Ref("p1"), "lastMonth")

# Server
server = WebSocketServer(
    ops, host="0.0.0.0", port=8080,
    binary=True,
    binary_compression=COMP_ZLIB,
    chunked=True,
)

Each chunk carries a sequence number and total count. The receiving side reassembles chunks transparently via ChunkAssembler.

Reconnecting Client

ReconnectingWebSocketClient automatically reconnects with exponential backoff when the connection drops:

from hs_py.ws_client import ReconnectingWebSocketClient

client = ReconnectingWebSocketClient(
    "ws://host/api/ws", auth_token="token",
    min_reconnect_delay=1.0,
    max_reconnect_delay=60.0,
)
await client.start()
try:
    about = await client.about()
finally:
    await client.stop()

Parameters:

  • min_reconnect_delay — Initial delay in seconds (default: 1.0).

  • max_reconnect_delay — Maximum delay cap in seconds (default: 60.0).

  • on_connect — Async callback invoked after each (re)connection.

  • on_disconnect — Async callback invoked when the connection drops.

The delay doubles after each failed attempt, capped at max_reconnect_delay.

Channel Multiplexing

WebSocketPool multiplexes multiple logical channels over a single WebSocket connection. Each channel is identified by a string name included in the JSON envelope as the ch field.

from hs_py.ws_client import WebSocketPool

async with WebSocketPool("ws://host/api/ws", auth_token="token") as pool:
    ch1 = pool.channel("tenant-1")
    ch2 = pool.channel("tenant-2")

    # Each channel's requests are scoped independently
    about1 = await ch1.about()
    about2 = await ch2.about()

Channel Client

ChannelClient scopes requests to a named channel within a pool, useful for multi-tenant or multi-context scenarios:

from hs_py.ws_client import WebSocketPool, ChannelClient

async with WebSocketPool("ws://host/api/ws", auth_token="token") as pool:
    ch1 = ChannelClient(pool, channel="building-a")
    ch2 = ChannelClient(pool, channel="building-b")

    # Each channel's requests are scoped independently
    a_points = await ch1.read("point")
    b_points = await ch2.read("point")

WebSocket Server

WebSocketServer is a standalone WebSocket server that dispatches messages to your HaystackOps implementation:

from hs_py.ws_server import WebSocketServer

ops = MyOps()
server = WebSocketServer(ops, host="0.0.0.0", port=8080)
await server.start()
# ... server is running ...
await server.stop()

Features:

  • JSON envelope dispatch — routes op field to the matching handler

  • Binary frame support — decodes binary frames and responds in kind

  • Codec compression — zlib/LZMA compression at the binary frame level

  • Chunked transfer — splits large responses into sequenced chunks

  • Batch requests — processes batch messages as parallel operations

  • Watch push — distributes watch updates to connected clients

  • Certificate auth — extracts client CN from TLS for mTLS authentication

  • Token auth — validates bearer tokens via auth_token parameter

Pushing Watch Updates

The server can push watch updates to all connected WebSocket clients:

from hs_py import Grid, Col, Ref, Number

# Build the update grid
update = Grid(
    cols=(Col("id", {}), Col("curVal", {})),
    rows=({"id": Ref("p1"), "curVal": Number(73.5, "°F")},),
)

# Push to all connected clients for a specific watch
await server.push_watch("w1", update)