WebSocket
Warning
The WebSocket transport API is experimental and subject to breaking changes in future releases.
WebSocket-based Haystack transport with sans-I/O protocol, async client, and server implementations.
Sans-I/O Protocol
Core WebSocket protocol logic, independent of any async framework.
Sans-I/O WebSocket wrapper for Haystack.
Uses the websockets library’s sans-I/O protocol objects together with
asyncio TCP/TLS streams. Each HaystackWebSocket instance owns
one WebSocket connection backed by a (StreamReader, StreamWriter) pair.
- class hs_py.ws.HaystackWebSocket(reader, writer, protocol)
Bases: object
Async WebSocket connection using the websockets sans-I/O protocol.
Use connect() to initiate a client connection or accept() to accept a server-side connection. Both return a ready-to-use instance.
- Parameters:
reader (StreamReader)
writer (StreamWriter)
protocol (ClientProtocol | ServerProtocol)
- async classmethod connect(uri, ssl_ctx=None, *, subprotocol='haystack', handshake_timeout=10.0, max_size=None, compression=False)
Initiate a WebSocket client connection.
- Parameters:
uri (str) – WebSocket URI (wss://host:port/path).
ssl_ctx (SSLContext | None (default: None)) – TLS context, or None for plaintext ws://.
subprotocol (str (default: 'haystack')) – WebSocket subprotocol to negotiate.
handshake_timeout (float (default: 10.0)) – Maximum seconds for the handshake.
max_size (int | None (default: None)) – Maximum WebSocket message size.
compression (bool (default: False)) – Enable per-message deflate compression.
- Return type: HaystackWebSocket
- Returns: Connected HaystackWebSocket instance.
- async classmethod accept(reader, writer, *, subprotocol='haystack', handshake_timeout=10.0, max_size=None, compression=False)
Accept an inbound WebSocket connection on existing streams.
- Parameters:
reader (StreamReader) – asyncio StreamReader from the accepted TCP connection.
writer (StreamWriter) – asyncio StreamWriter from the accepted TCP connection.
subprotocol (str (default: 'haystack')) – WebSocket subprotocol to accept.
handshake_timeout (float (default: 10.0)) – Maximum seconds for the handshake.
max_size (int | None (default: None)) – Maximum WebSocket message size.
compression (bool (default: False)) – Enable per-message deflate compression.
- Return type: HaystackWebSocket
- Returns: Accepted HaystackWebSocket instance.
- async send_text_preencoded(data)
Send pre-encoded UTF-8 bytes as a text WebSocket frame.
Avoids the str → encode → bytes roundtrip when the caller already holds a UTF-8 byte string (e.g. from orjson.dumps()).
- async hs_py.ws.cancel_task(task)
Cancel an asyncio task and suppress CancelledError.
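The cancel-and-suppress pattern described above can be sketched with plain asyncio. This is an illustration only: the helper name cancel_and_wait is hypothetical, and the actual cancel_task in hs_py may differ in detail.

```python
import asyncio
import contextlib


async def cancel_and_wait(task: asyncio.Task) -> None:
    """Hypothetical stand-in for cancel_task: cancel, then swallow CancelledError."""
    task.cancel()
    # Awaiting a cancelled task re-raises CancelledError in the awaiter;
    # suppress it so callers can shut down without a try/except of their own.
    with contextlib.suppress(asyncio.CancelledError):
        await task


async def demo() -> bool:
    # A background task that would otherwise run for an hour.
    task = asyncio.create_task(asyncio.sleep(3600))
    await asyncio.sleep(0)  # give the task a chance to start
    await cancel_and_wait(task)
    return task.cancelled()
```

Suppressing the exception at this point keeps shutdown paths (for example, stopping a heartbeat task) free of repeated error handling.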
- async hs_py.ws.heartbeat_loop(ws, interval)
Periodically send WebSocket pings to keep a connection alive.
- Parameters:
ws (HaystackWebSocket) – WebSocket connection to ping.
interval (float) – Seconds between pings.
- Return type:
Client
Async WebSocket client, reconnecting client, connection pool, and channel multiplexer.
Async Haystack WebSocket client.
Provides the same operation API as Client over a
persistent WebSocket connection. Uses JSON-encoded request/response
envelopes with correlation IDs for concurrent request support.
Message format (client → server):
{"id": "1", "op": "read", "grid": {...}}
Message format (server → client):
{"id": "1", "grid": {...}}
Server-initiated push:
{"type": "watch", "watchId": "w-1", "grid": {...}}
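As a rough illustration of the envelope formats above, the sketch below builds request envelopes with increasing string IDs and classifies incoming messages. The names EnvelopeBuilder and classify are hypothetical, the empty dict stands in for the elided grid, and telling pushes apart by the absence of an "id" key is an assumption drawn from the examples shown, not confirmed library behaviour.

```python
import json
from itertools import count
from typing import Tuple


class EnvelopeBuilder:
    """Builds request envelopes with increasing string IDs (illustrative only)."""

    def __init__(self) -> None:
        self._ids = count(1)

    def request(self, op: str, grid: dict) -> str:
        # Client -> server: {"id": ..., "op": ..., "grid": ...}
        return json.dumps({"id": str(next(self._ids)), "op": op, "grid": grid})


def classify(message: str) -> Tuple[str, str]:
    """Return ("response", id) or ("push", type) for an incoming message."""
    msg = json.loads(message)
    if "id" in msg:
        # Server -> client response, matched to a request by its correlation ID.
        return ("response", msg["id"])
    # Assumption: server-initiated pushes carry "type" and no "id".
    return ("push", msg["type"])
```

Because every response echoes the request ID, several requests can be in flight at once and resolved in whatever order the server answers them.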
- class hs_py.ws_client.ChannelClient(pool, channel, *, pythonic=True)
Bases: object
Virtual client scoped to a single channel within a WebSocketPool.
Each channel has its own request ID space and pending futures. The channel name is included in every JSON envelope as the ch field.
- Parameters:
pool (WebSocketPool)
channel (str)
pythonic (bool)
- class hs_py.ws_client.ReconnectingWebSocketClient(url, *, username='', password='', auth_token='', tls=None, timeout=30.0, heartbeat=30.0, min_reconnect_delay=1.0, max_reconnect_delay=60.0, on_connect=None, on_disconnect=None, metrics=None, compression=False, binary=False, pythonic=True, binary_compression=None, chunked=False)
Bases: object
WebSocket client with automatic reconnection and exponential backoff.
Wraps WebSocketClient and manages the connection lifecycle. On disconnect, reconnects with exponential backoff and re-registers watch callbacks.
Usage:
    client = ReconnectingWebSocketClient("ws://host:8080/api/ws")
    await client.start()
    try:
        rows = await client.about()
    finally:
        await client.stop()
- Parameters:
url (str)
username (str)
password (str)
auth_token (str)
tls (TLSConfig | None)
timeout (float)
heartbeat (float)
min_reconnect_delay (float)
max_reconnect_delay (float)
on_connect (Callable[[], Awaitable[None]] | None)
on_disconnect (Callable[[], Awaitable[None]] | None)
metrics (MetricsHooks | None)
compression (bool)
binary (bool)
pythonic (bool)
binary_compression (int | None)
chunked (bool)
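To make the role of min_reconnect_delay and max_reconnect_delay concrete, here is a minimal sketch of an exponential backoff schedule. The doubling factor and the absence of jitter are assumptions made for illustration; the source states only that the client uses exponential backoff between these two bounds.

```python
from itertools import islice
from typing import Iterator


def backoff_delays(
    min_delay: float = 1.0,
    max_delay: float = 60.0,
    factor: float = 2.0,  # assumed growth factor, not taken from hs_py
) -> Iterator[float]:
    """Yield reconnect delays that grow geometrically and are capped at max_delay."""
    delay = min_delay
    while True:
        yield delay
        delay = min(delay * factor, max_delay)


# First eight delays with the documented defaults (1.0 s minimum, 60.0 s maximum).
schedule = list(islice(backoff_delays(), 8))
```

A real reconnect loop would typically reset the delay to the minimum after a successful connection.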
Navigate the entity tree.
- async watch_sub(ids, watch_dis, lease=None, *, filter=None, raw=False)
Create a new watch or add entities to an existing one.
- class hs_py.ws_client.WebSocketClient(url, *, username='', password='', auth_token='', tls=None, timeout=30.0, heartbeat=30.0, metrics=None, compression=False, binary=False, pythonic=True, binary_compression=None, chunked=False)
Bases: object
Async Haystack WebSocket client.
Mirrors the Client API over a persistent WebSocket connection.
Usage:
    async with WebSocketClient("ws://host:8080/api/ws") as c:
        about = await c.about()  # returns list[dict] by default
        points = await c.read("point and sensor")
        raw_grid = await c.about(raw=True)  # returns Grid
- Parameters:
Navigate the entity tree.
- async watch_sub(ids, watch_dis, lease=None, *, filter=None, raw=False)
Create a new watch or add entities to an existing one.
- class hs_py.ws_client.WebSocketPool(url, *, username='', password='', auth_token='', tls=None, timeout=30.0, heartbeat=30.0, compression=False, pythonic=True)
Bases: object
Multiplexes multiple logical channels over a single WebSocket.
Each channel is identified by a string name included in the JSON envelope as the ch field.
Usage:
    async with WebSocketPool("ws://host:8080/api/ws") as pool:
        ch1 = pool.channel("tenant-1")
        ch2 = pool.channel("tenant-2")
        about1 = await ch1.about()
        about2 = await ch2.about()
- Parameters:
- channel(name)
Return a virtual client scoped to the given channel name.
- Parameters:
name (str) – Channel identifier (included as ch in each message).
- Return type: ChannelClient
- Returns: ChannelClient bound to this pool and channel.
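The per-channel request ID space can be pictured with the following sketch. It assumes the channel envelope is the ordinary request envelope plus a ch field; the class name ChannelEnvelopes is hypothetical and the empty dict stands in for a real grid.

```python
import json
from collections import defaultdict
from itertools import count


class ChannelEnvelopes:
    """Illustrative only: one independent ID counter per channel name."""

    def __init__(self) -> None:
        # Each channel lazily gets its own counter starting at 1.
        self._ids = defaultdict(lambda: count(1))

    def request(self, channel: str, op: str, grid: dict) -> str:
        req_id = str(next(self._ids[channel]))
        # Assumption: "ch" is added alongside the usual id/op/grid keys.
        return json.dumps({"ch": channel, "id": req_id, "op": op, "grid": grid})
```

With separate counters, the same ID value can be pending on two channels at once, so responses have to be routed by the pair of channel name and ID.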
Server
Async WebSocket server with SCRAM handshake and batch dispatch.
Haystack WebSocket server.
Accepts WebSocket connections using the websockets sans-I/O layer
(via HaystackWebSocket) and dispatches operations to a
HaystackOps implementation.
Uses asyncio.start_server for full control over TLS context, consistent
with bac-py patterns and independent of aiohttp.
- class hs_py.ws_server.WebSocketServer(ops, *, auth_token='', authenticator=None, tls=None, host='0.0.0.0', port=8080, heartbeat=30.0, metrics=None, cert_auth=None, compression=False, binary=False, user_store=None, binary_compression=None, chunked=False)
Bases: object
Haystack WebSocket server using websockets sans-I/O.
Usage:
    ops = MyHaystackOps()
    server = WebSocketServer(ops, host="0.0.0.0", port=8080)
    await server.start()
    # ... server is running ...
    await server.stop()
- Parameters:
ops (HaystackOps)
auth_token (str)
authenticator (Authenticator | None)
tls (TLSConfig | None)
host (str)
port (int)
heartbeat (float)
metrics (MetricsHooks | None)
cert_auth (CertAuthenticator | None)
compression (bool)
binary (bool)
user_store (Any)
binary_compression (int | None)
chunked (bool)
Binary Frame Codec
Binary WebSocket frame encoding and decoding for compact Haystack message transport.
Binary WebSocket frame codec for Haystack.
Provides a compact binary frame format for high-frequency watch pushes and other latency-sensitive operations. The binary header eliminates the JSON envelope overhead.
Frame layout (v1 — bits 3-4 clear):
Byte 0: flags (bit 0 = response, bit 1 = error, bit 2 = push)
Bytes 1-2: request ID (uint16 big-endian, 0 for push)
Byte 3: op code (uint8, mapped from op name)
Bytes 4-N: grid payload (JSON-encoded bytes)
Frame layout (v2 — compression/chunking):
Byte 0: flags (bits 0-2 same as v1, bit 3 = compressed, bit 4 = chunked)
Bytes 1-2: request ID (uint16 big-endian)
Byte 3: op code (uint8)
IF compressed (bit 3 set):
    Byte 4: compression algorithm (0 = zlib, 1 = lzma)
IF chunked (bit 4 set):
    Next 2 bytes: chunk index (uint16 big-endian)
    Next 2 bytes: total chunks (uint16 big-endian)
Remaining: payload bytes (compressed or raw)
Total v1 header: 4 bytes. v2 adds 1 byte for compression, 4 for chunking.
- hs_py.ws_codec.OP_CODES: dict[str, int] = {'about': 1, 'close': 4, 'formats': 3, 'hisRead': 12, 'hisWrite': 13, 'invokeAction': 18, 'nav': 11, 'ops': 2, 'pointWrite': 14, 'read': 10, 'watchPoll': 17, 'watchSub': 15, 'watchUnsub': 16}
Map Haystack operation names to binary op codes.
- class hs_py.ws_codec.ChunkAssembler(ttl_seconds=60.0)
Bases: object
Reassemble chunked binary frames into complete payloads.
Feed decoded chunk data via feed() and receive the assembled payload when all chunks have arrived.
- Parameters:
ttl_seconds (float (default: 60.0)) – Seconds before incomplete chunk sequences are discarded.
- feed(flags, req_id, op, chunk_data)
Process a chunk frame and return the reassembled payload when complete.
- Parameters:
flags (int) – Frame flags (must have FLAG_CHUNKED set).
req_id (int) – Request correlation ID.
op (str) – Operation name.
chunk_data (bytes) – The grid_bytes from decode_binary_frame() (includes chunk header + payload).
- Return type:
- Returns: Fully reassembled (and decompressed) payload when all chunks have been received, or None if still waiting for more.
- hs_py.ws_codec.compress_payload(data, algorithm=0, threshold=1024)
Compress data if it exceeds threshold.
- Parameters:
- Return type:
- Returns: (payload, algo_id), where algo_id is None when the payload was not compressed (below threshold).
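A minimal sketch of threshold-based compression in the spirit of compress_payload, using the algorithm IDs from the frame layout (0 = zlib, 1 = lzma). The function names here are hypothetical, and treating a payload of exactly threshold bytes as "not compressed" is an assumption, since the source does not state how the boundary is handled.

```python
import lzma
import zlib
from typing import Optional, Tuple

ALGO_ZLIB = 0
ALGO_LZMA = 1


def compress_if_large(
    data: bytes, algorithm: int = ALGO_ZLIB, threshold: int = 1024
) -> Tuple[bytes, Optional[int]]:
    """Return (payload, algo_id); algo_id is None when data was left as-is."""
    if len(data) <= threshold:  # assumed boundary: only larger payloads compress
        return data, None
    if algorithm == ALGO_ZLIB:
        return zlib.compress(data), ALGO_ZLIB
    if algorithm == ALGO_LZMA:
        return lzma.compress(data), ALGO_LZMA
    raise ValueError(f"unknown compression algorithm: {algorithm}")


def decompress(data: bytes, algorithm: int) -> bytes:
    """Inverse of compress_if_large for a payload that was compressed."""
    if algorithm == ALGO_ZLIB:
        return zlib.decompress(data)
    if algorithm == ALGO_LZMA:
        return lzma.decompress(data)
    raise ValueError(f"unknown compression algorithm: {algorithm}")
```

Skipping compression for small payloads avoids spending CPU time where the compressed form would save little or nothing.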
- hs_py.ws_codec.decode_binary_frame(data)
Decode a binary frame into its components.
Transparently decompresses compressed payloads (FLAG_COMPRESSED) and returns the chunk metadata for chunked frames (FLAG_CHUNKED).
- Parameters:
data (bytes) – Raw binary frame bytes.
- Return type:
- Returns: Tuple of (flags, req_id, op_name, grid_bytes). For chunked frames the grid_bytes is the raw (possibly compressed) chunk payload; use ChunkAssembler to reassemble.
- Raises:
ValueError – If frame is too short or op code is unknown.
- hs_py.ws_codec.decompress_payload(data, algorithm)
Decompress data using the given algorithm.
- Parameters:
- Return type:
- Returns:
Decompressed bytes.
- Raises:
ValueError – If algorithm is not recognised.
- hs_py.ws_codec.encode_binary_push(op, grid, *, compression=None)
Encode a server-initiated push as a binary frame.
- hs_py.ws_codec.encode_binary_request(req_id, op, grid, *, compression=None)
Encode a client request as a binary frame.
- Parameters:
- Return type:
- Returns:
Binary frame bytes.
- Raises:
ValueError – If op is not a known operation.
- hs_py.ws_codec.encode_binary_response(req_id, op, grid, *, is_error=False, compression=None)
Encode a server response as a binary frame.
- Parameters:
- Return type:
- Returns:
Binary frame bytes.
- hs_py.ws_codec.encode_chunked_frames(flags, req_id, op, payload, *, compression=None, chunk_size=262144)
Split payload into chunked binary frames.
Each chunk is independently compressed (if compression is set) and wrapped with the chunked frame header.
- Parameters:
flags (int) – Base flags for each frame (e.g. FLAG_RESPONSE).
req_id (int) – Request correlation ID.
op (str) – Operation name.
payload (bytes) – Full payload bytes to chunk.
compression (int | None (default: None)) – Optional compression algorithm.
chunk_size (int (default: 262144)) – Maximum raw bytes per chunk before compression.
- Return type:
- Returns:
List of binary frame bytes, one per chunk.
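The chunk splitting and reassembly idea can be sketched independently of the full frame header. The example below prefixes each piece with only the chunk index and total chunk count (two uint16 big-endian values, as in the v2 layout) and omits flags, request ID, op code and compression. The function names are hypothetical and zero-based chunk indices are an assumption.

```python
import struct
from typing import Dict, Iterable, List, Optional

_CHUNK_HEADER = struct.Struct(">HH")  # chunk index, total chunks


def split_chunks(payload: bytes, chunk_size: int = 262144) -> List[bytes]:
    """Split payload into pieces of at most chunk_size bytes, each with a chunk header."""
    pieces = [payload[i:i + chunk_size] for i in range(0, len(payload), chunk_size)]
    if not pieces:
        pieces = [b""]  # an empty payload still produces one chunk
    total = len(pieces)
    if total > 0xFFFF:
        raise ValueError("too many chunks for a uint16 counter")
    # Assumption: indices start at 0.
    return [_CHUNK_HEADER.pack(i, total) + piece for i, piece in enumerate(pieces)]


def reassemble(chunks: Iterable[bytes]) -> Optional[bytes]:
    """Return the joined payload once every index is present, else None."""
    parts: Dict[int, bytes] = {}
    total: Optional[int] = None
    for chunk in chunks:
        index, total = _CHUNK_HEADER.unpack_from(chunk)
        parts[index] = chunk[_CHUNK_HEADER.size:]
    if total is None or len(parts) < total:
        return None
    return b"".join(parts[i] for i in range(total))
```

Keying the parts by index lets chunks arrive in any order; a stateful assembler such as ChunkAssembler would additionally expire incomplete sequences after its TTL.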