WebSocket

Warning

The WebSocket transport API is experimental and subject to breaking changes in future releases.

WebSocket-based Haystack transport with sans-I/O protocol, async client, and server implementations.

Sans-I/O Protocol

Core WebSocket protocol logic, independent of any async framework.

Sans-I/O WebSocket wrapper for Haystack.

Uses the websockets library’s sans-I/O protocol objects together with asyncio TCP/TLS streams. Each HaystackWebSocket instance owns one WebSocket connection backed by a (StreamReader, StreamWriter) pair.

class hs_py.ws.HaystackWebSocket(reader, writer, protocol)[source]

Bases: object

Async WebSocket connection using the websockets sans-I/O protocol.

Use connect() to initiate a client connection or accept() to accept a server-side connection. Both return a ready-to-use instance.

Parameters:
  • reader (StreamReader)

  • writer (StreamWriter)

  • protocol (ClientProtocol | ServerProtocol)

async classmethod connect(uri, ssl_ctx=None, *, subprotocol='haystack', handshake_timeout=10.0, max_size=None, compression=False)[source]

Initiate a WebSocket client connection.

Parameters:
  • uri (str) – WebSocket URI (wss://host:port/path).

  • ssl_ctx (SSLContext | None (default: None)) – TLS context, or None for plaintext ws://.

  • subprotocol (str (default: 'haystack')) – WebSocket subprotocol to negotiate.

  • handshake_timeout (float (default: 10.0)) – Maximum seconds for the handshake.

  • max_size (int | None (default: None)) – Maximum WebSocket message size.

  • compression (bool (default: False)) – Enable per-message deflate compression.

Return type:

HaystackWebSocket

Returns:

Connected HaystackWebSocket instance.

async classmethod accept(reader, writer, *, subprotocol='haystack', handshake_timeout=10.0, max_size=None, compression=False)[source]

Accept an inbound WebSocket connection on existing streams.

Parameters:
  • reader (StreamReader) – asyncio StreamReader from the accepted TCP connection.

  • writer (StreamWriter) – asyncio StreamWriter from the accepted TCP connection.

  • subprotocol (str (default: 'haystack')) – WebSocket subprotocol to accept.

  • handshake_timeout (float (default: 10.0)) – Maximum seconds for the handshake.

  • max_size (int | None (default: None)) – Maximum WebSocket message size.

  • compression (bool (default: False)) – Enable per-message deflate compression.

Return type:

HaystackWebSocket

Returns:

Accepted HaystackWebSocket instance.

async send_text(text)[source]

Send a text WebSocket frame.

Parameters:

text (str) – Text payload to send.

Return type:

None

async send_text_preencoded(data)[source]

Send pre-encoded UTF-8 bytes as a text WebSocket frame.

Avoids the str encode bytes roundtrip when the caller already holds a UTF-8 byte string (e.g. from orjson.dumps()).

Parameters:

data (bytes) – UTF-8 encoded payload bytes.

Return type:

None

async send_bytes(data)[source]

Send a binary WebSocket frame.

Parameters:

data (bytes) – Binary payload to send.

Return type:

None

async ping(data=b'')[source]

Send a WebSocket ping frame.

Return type:

None

Parameters:

data (bytes)

async recv()[source]

Receive the next WebSocket message payload.

Return type:

str | bytes

Returns:

str for text frames, bytes for binary frames.

Raises:
  • ConnectionClosedOK – On graceful close.

  • ConnectionClosedError – On abnormal close.

async close(code=1000, reason='')[source]

Initiate a graceful WebSocket close.

Parameters:
  • code (int (default: 1000)) – WebSocket close code (default 1000 = normal).

  • reason (str (default: '')) – Optional close reason string.

Return type:

None

property is_open: bool

True if the WebSocket connection appears open.

property subprotocol: str | None

Return the negotiated WebSocket subprotocol.

async hs_py.ws.cancel_task(task)[source]

Cancel an asyncio task and suppress CancelledError.

Parameters:

task (Task[object] | None) – Task to cancel, or None (no-op).

Return type:

None

async hs_py.ws.heartbeat_loop(ws, interval)[source]

Periodically send WebSocket pings to keep a connection alive.

Parameters:
Return type:

None

Client

Async WebSocket client, reconnecting client, connection pool, and channel multiplexer.

Async Haystack WebSocket client.

Provides the same operation API as Client over a persistent WebSocket connection. Uses JSON-encoded request/response envelopes with correlation IDs for concurrent request support.

Message format (client → server):

{"id": "1", "op": "read", "grid": {...}}

Message format (server → client):

{"id": "1", "grid": {...}}

Server-initiated push:

{"type": "watch", "watchId": "w-1", "grid": {...}}
class hs_py.ws_client.ChannelClient(pool, channel, *, pythonic=True)[source]

Bases: object

Virtual client scoped to a single channel within a WebSocketPool.

Each channel has its own request ID space and pending futures. The channel name is included in every JSON envelope as the ch field.

Parameters:
async about(*, raw=False)[source]

Query server information.

Return type:

Grid | list[dict[str, Any]]

Parameters:

raw (bool)

async ops(*, raw=False)[source]

Query available operations.

Return type:

Grid | list[dict[str, Any]]

Parameters:

raw (bool)

async read(filter, limit=None, *, raw=False)[source]

Read entities matching a filter expression.

Return type:

Grid | list[dict[str, Any]]

Parameters:
async read_by_ids(ids, *, raw=False)[source]

Read entities by their identifiers.

Return type:

Grid | list[dict[str, Any]]

Parameters:
class hs_py.ws_client.ReconnectingWebSocketClient(url, *, username='', password='', auth_token='', tls=None, timeout=30.0, heartbeat=30.0, min_reconnect_delay=1.0, max_reconnect_delay=60.0, on_connect=None, on_disconnect=None, metrics=None, compression=False, binary=False, pythonic=True, binary_compression=None, chunked=False)[source]

Bases: object

WebSocket client with automatic reconnection and exponential backoff.

Wraps WebSocketClient and manages the connection lifecycle. On disconnect, reconnects with exponential backoff and re-registers watch callbacks.

Usage:

client = ReconnectingWebSocketClient("ws://host:8080/api/ws")
await client.start()
try:
    rows = await client.about()
finally:
    await client.stop()
Parameters:
  • url (str)

  • username (str)

  • password (str)

  • auth_token (str)

  • tls (TLSConfig | None)

  • timeout (float)

  • heartbeat (float)

  • min_reconnect_delay (float)

  • max_reconnect_delay (float)

  • on_connect (Callable[[], Awaitable[None]] | None)

  • on_disconnect (Callable[[], Awaitable[None]] | None)

  • metrics (MetricsHooks | None)

  • compression (bool)

  • binary (bool)

  • pythonic (bool)

  • binary_compression (int | None)

  • chunked (bool)

async start()[source]

Start the connection loop in the background.

Return type:

None

async stop()[source]

Stop reconnection and close the connection.

Return type:

None

on_watch_push(callback)[source]

Register a watch push callback, preserved across reconnections.

Return type:

None

Parameters:

callback (Callable[[str, Grid], Any])

async about(*, raw=False)[source]

Query server information.

Return type:

Grid | list[dict[str, Any]]

Parameters:

raw (bool)

async ops(*, raw=False)[source]

Query available operations.

Return type:

Grid | list[dict[str, Any]]

Parameters:

raw (bool)

async formats(*, raw=False)[source]

Query supported data formats.

Return type:

Grid | list[dict[str, Any]]

Parameters:

raw (bool)

async read(filter, limit=None, *, raw=False)[source]

Read entities matching a filter expression.

Return type:

Grid | list[dict[str, Any]]

Parameters:
async read_by_ids(ids, *, raw=False)[source]

Read entities by their identifiers.

Return type:

Grid | list[dict[str, Any]]

Parameters:
async nav(nav_id=None, *, raw=False)[source]

Navigate the entity tree.

Return type:

Grid | list[dict[str, Any]]

Parameters:
async his_read(id, range, *, raw=False)[source]

Read time-series data for a single point.

Return type:

Grid | list[dict[str, Any]]

Parameters:
async his_write(id, items)[source]

Write time-series data to a single point.

Return type:

None

Parameters:
async watch_sub(ids, watch_dis, lease=None, *, filter=None, raw=False)[source]

Create a new watch or add entities to an existing one.

Return type:

Grid | list[dict[str, Any]]

Parameters:
async watch_unsub(watch_id, ids)[source]

Remove entities from a watch.

Return type:

None

Parameters:
async watch_poll(watch_id, refresh=False, *, raw=False)[source]

Poll a watch for changes.

Return type:

Grid | list[dict[str, Any]]

Parameters:
async invoke_action(id, action, args=None, *, raw=False)[source]

Invoke an action on an entity.

Return type:

Grid | list[dict[str, Any]]

Parameters:
async batch(*calls)[source]

Send multiple operations in a single WebSocket frame.

Return type:

list[Grid]

Parameters:

calls (tuple[str, Grid])

async his_read_batch(ids, range, *, raw=False)[source]

Read time-series data for multiple points.

Return type:

Grid | list[dict[str, Any]]

Parameters:
async his_write_batch(grid)[source]

Write time-series data for multiple points.

Return type:

None

Parameters:

grid (Grid)

async point_write_array(id, *, raw=False)[source]

Read the priority array of a writable point.

Return type:

Grid | list[dict[str, Any]]

Parameters:
async point_write(id, level, val, who='', duration=None)[source]

Write to a priority array level.

Return type:

None

Parameters:
async watch_close(watch_id)[source]

Close a watch entirely.

Return type:

None

Parameters:

watch_id (str)

async close()[source]

Close the connection (alias for stop()).

Return type:

None

class hs_py.ws_client.WebSocketClient(url, *, username='', password='', auth_token='', tls=None, timeout=30.0, heartbeat=30.0, metrics=None, compression=False, binary=False, pythonic=True, binary_compression=None, chunked=False)[source]

Bases: object

Async Haystack WebSocket client.

Mirrors the Client API over a persistent WebSocket connection.

Usage:

async with WebSocketClient("ws://host:8080/api/ws") as c:
    about = await c.about()  # returns list[dict] by default
    points = await c.read("point and sensor")
    raw_grid = await c.about(raw=True)  # returns Grid
Parameters:
async close()[source]

Close the WebSocket connection.

Return type:

None

async about(*, raw=False)[source]

Query server information.

Return type:

Grid | list[dict[str, Any]]

Parameters:

raw (bool)

async ops(*, raw=False)[source]

Query available operations.

Return type:

Grid | list[dict[str, Any]]

Parameters:

raw (bool)

async formats(*, raw=False)[source]

Query supported data formats.

Return type:

Grid | list[dict[str, Any]]

Parameters:

raw (bool)

async read(filter, limit=None, *, raw=False)[source]

Read entities matching a filter expression.

Return type:

Grid | list[dict[str, Any]]

Parameters:
async read_by_ids(ids, *, raw=False)[source]

Read entities by their identifiers.

Return type:

Grid | list[dict[str, Any]]

Parameters:
async nav(nav_id=None, *, raw=False)[source]

Navigate the entity tree.

Return type:

Grid | list[dict[str, Any]]

Parameters:
async his_read(id, range, *, raw=False)[source]

Read time-series data for a single point.

Return type:

Grid | list[dict[str, Any]]

Parameters:
async his_read_batch(ids, range, *, raw=False)[source]

Read time-series data for multiple points.

Return type:

Grid | list[dict[str, Any]]

Parameters:
async his_write(id, items)[source]

Write time-series data to a single point.

Return type:

None

Parameters:
async his_write_batch(grid)[source]

Write time-series data for multiple points.

Return type:

None

Parameters:

grid (Grid)

async point_write_array(id, *, raw=False)[source]

Read the priority array of a writable point.

Return type:

Grid | list[dict[str, Any]]

Parameters:
async point_write(id, level, val, who='', duration=None)[source]

Write to a priority array level.

Return type:

None

Parameters:
async watch_sub(ids, watch_dis, lease=None, *, filter=None, raw=False)[source]

Create a new watch or add entities to an existing one.

Parameters:
  • filter (str | None (default: None)) – Optional Haystack filter for server-side filtering.

  • raw (bool (default: False)) – If True, return the raw Grid.

  • ids (list[Ref])

  • watch_dis (str)

  • lease (Number | None)

Return type:

Grid | list[dict[str, Any]]

async watch_unsub(watch_id, ids)[source]

Remove entities from a watch.

Return type:

None

Parameters:
async watch_close(watch_id)[source]

Close a watch entirely.

Return type:

None

Parameters:

watch_id (str)

async watch_poll(watch_id, refresh=False, *, raw=False)[source]

Poll a watch for changes.

Return type:

Grid | list[dict[str, Any]]

Parameters:
async invoke_action(id, action, args=None, *, raw=False)[source]

Invoke an action on an entity.

Return type:

Grid | list[dict[str, Any]]

Parameters:
async batch(*calls)[source]

Send multiple operations in a single WebSocket frame.

Parameters:

calls (tuple[str, Grid]) – Tuples of (op_name, grid).

Return type:

list[Grid]

Returns:

List of response grids in the same order as calls.

Raises:

NetworkError – If any call times out.

on_watch_push(callback)[source]

Register a callback for server-initiated watch push messages.

Parameters:

callback (Callable[[str, Grid], Any]) – Called with (watch_id, grid) for each push.

Return type:

None

class hs_py.ws_client.WebSocketPool(url, *, username='', password='', auth_token='', tls=None, timeout=30.0, heartbeat=30.0, compression=False, pythonic=True)[source]

Bases: object

Multiplexes multiple logical channels over a single WebSocket.

Each channel is identified by a string name included in the JSON envelope as the ch field.

Usage:

async with WebSocketPool("ws://host:8080/api/ws") as pool:
    ch1 = pool.channel("tenant-1")
    ch2 = pool.channel("tenant-2")
    about1 = await ch1.about()
    about2 = await ch2.about()
Parameters:
async close()[source]

Close the pooled WebSocket connection.

Return type:

None

channel(name)[source]

Return a virtual client scoped to the given channel name.

Parameters:

name (str) – Channel identifier (included as ch in each message).

Return type:

ChannelClient

Returns:

ChannelClient bound to this pool and channel.

Server

Async WebSocket server with SCRAM handshake and batch dispatch.

Haystack WebSocket server.

Accepts WebSocket connections using the websockets sans-I/O layer (via HaystackWebSocket) and dispatches operations to a HaystackOps implementation.

Uses asyncio.start_server for full control over TLS context, consistent with bac-py patterns and independent of aiohttp.

class hs_py.ws_server.WebSocketServer(ops, *, auth_token='', authenticator=None, tls=None, host='0.0.0.0', port=8080, heartbeat=30.0, metrics=None, cert_auth=None, compression=False, binary=False, user_store=None, binary_compression=None, chunked=False)[source]

Bases: object

Haystack WebSocket server using websockets sans-I/O.

Usage:

ops = MyHaystackOps()
server = WebSocketServer(ops, host="0.0.0.0", port=8080)
await server.start()
# ... server is running ...
await server.stop()
Parameters:
async start()[source]

Start the WebSocket server.

Return type:

None

async stop()[source]

Stop the server and close all connections.

Return type:

None

property port: int

Return the bound port (useful when bound to port 0).

async push_watch(watch_id, grid)[source]

Push a watch change notification to all connected clients.

Parameters:
  • watch_id (str) – The watch identifier.

  • grid (Grid) – Grid of changed entities.

Return type:

None

Binary Frame Codec

Binary WebSocket frame encoding and decoding for compact Haystack message transport.

Binary WebSocket frame codec for Haystack.

Provides a compact binary frame format for high-frequency watch pushes and other latency-sensitive operations. The binary header eliminates the JSON envelope overhead.

Frame layout (v1 — bits 3-4 clear):

Byte 0:    flags (bit 0 = response, bit 1 = error, bit 2 = push)
Bytes 1-2: request ID (uint16 big-endian, 0 for push)
Byte 3:    op code (uint8, mapped from op name)
Bytes 4-N: grid payload (JSON-encoded bytes)

Frame layout (v2 — compression/chunking):

Byte 0:    flags (bits 0-2 same as v1, bit 3 = compressed, bit 4 = chunked)
Bytes 1-2: request ID (uint16 big-endian)
Byte 3:    op code (uint8)

IF compressed (bit 3 set):
    Byte 4: compression algorithm (0 = zlib, 1 = lzma)

IF chunked (bit 4 set):
    Next 2 bytes: chunk index (uint16 big-endian)
    Next 2 bytes: total chunks (uint16 big-endian)

Remaining: payload bytes (compressed or raw)

Total v1 header: 4 bytes. v2 adds 1 byte for compression, 4 for chunking.

hs_py.ws_codec.CHUNK_SIZE: int = 262144

Default chunk size in bytes (before compression).

hs_py.ws_codec.CHUNK_THRESHOLD: int = 262144

Minimum payload size to trigger chunking.

hs_py.ws_codec.COMP_LZMA: int = 1

Compression algorithm ID for LZMA (raw, preset 0).

hs_py.ws_codec.COMP_ZLIB: int = 0

Compression algorithm ID for zlib (level 1).

hs_py.ws_codec.OP_CODES: dict[str, int] = {'about': 1, 'close': 4, 'formats': 3, 'hisRead': 12, 'hisWrite': 13, 'invokeAction': 18, 'nav': 11, 'ops': 2, 'pointWrite': 14, 'read': 10, 'watchPoll': 17, 'watchSub': 15, 'watchUnsub': 16}

Map Haystack operation names to binary op codes.

class hs_py.ws_codec.ChunkAssembler(ttl_seconds=60.0)[source]

Bases: object

Reassemble chunked binary frames into complete payloads.

Feed decoded chunk data via feed() and receive the assembled payload when all chunks have arrived.

Parameters:

ttl_seconds (float (default: 60.0)) – Seconds before incomplete chunk sequences are discarded.

feed(flags, req_id, op, chunk_data)[source]

Process a chunk frame and return the reassembled payload when complete.

Parameters:
  • flags (int) – Frame flags (must have FLAG_CHUNKED set).

  • req_id (int) – Request correlation ID.

  • op (str) – Operation name.

  • chunk_data (bytes) – The grid_bytes from decode_binary_frame() (includes chunk header + payload).

Return type:

bytes | None

Returns:

Fully reassembled (and decompressed) payload when all chunks have been received, or None if still waiting for more.

cleanup(now)[source]

Discard incomplete chunk buffers older than the TTL.

Parameters:

now (float) – Current monotonic timestamp (time.monotonic()).

Return type:

None

property pending_count: int

Number of incomplete chunk sequences being tracked.

hs_py.ws_codec.compress_payload(data, algorithm=0, threshold=1024)[source]

Compress data if it exceeds threshold.

Parameters:
  • data (bytes) – Raw payload bytes.

  • algorithm (int (default: 0)) – COMP_ZLIB or COMP_LZMA.

  • threshold (int (default: 1024)) – Minimum size to compress. Payloads smaller than this are returned unchanged.

Return type:

tuple[bytes, int | None]

Returns:

(payload, algo_id)algo_id is None when the payload was not compressed (below threshold).

hs_py.ws_codec.decode_binary_frame(data)[source]

Decode a binary frame into its components.

Transparently decompresses compressed payloads (FLAG_COMPRESSED) and returns the chunk metadata for chunked frames (FLAG_CHUNKED).

Parameters:

data (bytes) – Raw binary frame bytes.

Return type:

tuple[int, int, str, bytes]

Returns:

Tuple of (flags, req_id, op_name, grid_bytes). For chunked frames the grid_bytes is the raw (possibly compressed) chunk payload — use ChunkAssembler to reassemble.

Raises:

ValueError – If frame is too short or op code is unknown.

hs_py.ws_codec.decompress_payload(data, algorithm)[source]

Decompress data using the given algorithm.

Parameters:
  • data (bytes) – Compressed payload bytes.

  • algorithm (int) – COMP_ZLIB or COMP_LZMA.

Return type:

bytes

Returns:

Decompressed bytes.

Raises:

ValueError – If algorithm is not recognised.

hs_py.ws_codec.encode_binary_push(op, grid, *, compression=None)[source]

Encode a server-initiated push as a binary frame.

Parameters:
  • op (str) – Push type (e.g. "watchPoll").

  • grid (Grid) – Push grid payload.

  • compression (int | None (default: None)) – Optional compression algorithm.

Return type:

bytes

Returns:

Binary frame bytes.

hs_py.ws_codec.encode_binary_request(req_id, op, grid, *, compression=None)[source]

Encode a client request as a binary frame.

Parameters:
  • req_id (int) – Request correlation ID (0-65535).

  • op (str) – Operation name (must be in OP_CODES).

  • grid (Grid) – Request grid payload.

  • compression (int | None (default: None)) – Optional compression algorithm (COMP_ZLIB or COMP_LZMA). None disables compression.

Return type:

bytes

Returns:

Binary frame bytes.

Raises:

ValueError – If op is not a known operation.

hs_py.ws_codec.encode_binary_response(req_id, op, grid, *, is_error=False, compression=None)[source]

Encode a server response as a binary frame.

Parameters:
  • req_id (int) – Correlated request ID.

  • op (str) – Operation name.

  • grid (Grid) – Response grid payload.

  • is_error (bool (default: False)) – True if this is an error response.

  • compression (int | None (default: None)) – Optional compression algorithm.

Return type:

bytes

Returns:

Binary frame bytes.

hs_py.ws_codec.encode_chunked_frames(flags, req_id, op, payload, *, compression=None, chunk_size=262144)[source]

Split payload into chunked binary frames.

Each chunk is independently compressed (if compression is set) and wrapped with the chunked frame header.

Parameters:
  • flags (int) – Base flags for each frame (e.g. FLAG_RESPONSE).

  • req_id (int) – Request correlation ID.

  • op (str) – Operation name.

  • payload (bytes) – Full payload bytes to chunk.

  • compression (int | None (default: None)) – Optional compression algorithm.

  • chunk_size (int (default: 262144)) – Maximum raw bytes per chunk before compression.

Return type:

list[bytes]

Returns:

List of binary frame bytes, one per chunk.