Source code for hs_py.client

"""Async Haystack HTTP API client.

Provides a high-level async client implementing the standard Project Haystack
operations over HTTP using JSON encoding.

See: https://project-haystack.org/doc/docHaystack/HttpApi
"""

from __future__ import annotations

import asyncio
import contextlib
import logging
from typing import Any, Literal, overload

import aiohttp

from hs_py.auth import authenticate
from hs_py.content_negotiation import _FORMAT_TO_CONTENT_TYPE, decode_request
from hs_py.convert import grid_to_pythonic
from hs_py.errors import AuthError, CallError, NetworkError
from hs_py.grid import Grid, GridBuilder
from hs_py.kinds import MARKER, Number, Ref
from hs_py.tls import TLSConfig, build_client_ssl_context

__all__ = [
    "Client",
]

_log = logging.getLogger(__name__)

_JSON_CT = "application/json"


[docs] class Client: """Async Haystack HTTP API client. Usage:: async with Client("http://host/api", "user", "pass") as c: about = await c.about() # returns list[dict] by default points = await c.read("point and sensor") raw_grid = await c.about(raw=True) # returns Grid """ def __repr__(self) -> str: return f"Client(base_url={self._base_url!r}, username={self._username!r})" def __init__( self, base_url: str, username: str = "", password: str = "", *, timeout: aiohttp.ClientTimeout | None = None, connector: aiohttp.BaseConnector | None = None, tls: TLSConfig | None = None, pythonic: bool = True, accept_format: str = "json", ) -> None: """Initialise the client. :param base_url: Haystack server base URL (e.g. ``http://host/api``). :param username: Username for SCRAM authentication (empty to skip auth). :param password: Password for SCRAM authentication. :param timeout: HTTP request timeout configuration. :param connector: Custom :class:`aiohttp.BaseConnector` for connection pooling. :param tls: Optional :class:`~hs_py.tls.TLSConfig` for TLS 1.3 connections. :param pythonic: When ``True`` (default) Grid-returning methods return ``list[dict[str, Any]]`` with Haystack kinds converted to plain Python values. Pass ``False`` to always return raw :class:`~hs_py.grid.Grid`. :param accept_format: Wire format for request/response bodies. Supported values: ``"json"`` (default), ``"zinc"``. Controls the ``Content-Type`` and ``Accept`` headers sent to the server. """ self._base_url = base_url.rstrip("/") self._username = username self._password = password self._timeout = timeout self._connector = connector self._tls = tls self._pythonic = pythonic self._accept_format = accept_format self._content_type = _FORMAT_TO_CONTENT_TYPE.get(accept_format, _JSON_CT) self._session: aiohttp.ClientSession | None = None self._auth_token: str | None = None async def __aenter__(self) -> Client: connector = self._connector if connector is None and self._tls is not None: ssl_ctx = build_client_ssl_context(self._tls) connector = aiohttp.TCPConnector(ssl=ssl_ctx) self._session = aiohttp.ClientSession( timeout=self._timeout or aiohttp.ClientTimeout(total=30), connector=connector, connector_owner=self._connector is None, ) _log.info("Client session opened for %s", self._base_url) return self async def __aexit__(self, *exc: object) -> None: await self.close()
[docs] async def close(self) -> None: """Close the underlying HTTP session.""" if self._session is not None: with contextlib.suppress(Exception): await self._call_get("close") await self._session.close() # Allow underlying connections to close gracefully (aiohttp best practice) await asyncio.sleep(0) self._session = None self._auth_token = None _log.info("Client session closed for %s", self._base_url)
# ---- Standard ops ------------------------------------------------------ @overload async def about(self, *, raw: Literal[True]) -> Grid: ... @overload async def about(self, *, raw: Literal[False] = ...) -> list[dict[str, Any]]: ... @overload async def about(self, *, raw: bool = ...) -> Grid | list[dict[str, Any]]: ...
[docs] async def about(self, *, raw: bool = False) -> Grid | list[dict[str, Any]]: """Query server information. :param raw: If ``True``, return the raw :class:`~hs_py.grid.Grid` regardless of the *pythonic* constructor setting. :returns: Server metadata rows as ``list[dict]`` or :class:`~hs_py.grid.Grid`. """ grid = await self._call_get("about") return grid if raw or not self._pythonic else grid_to_pythonic(grid)
@overload async def ops(self, *, raw: Literal[True]) -> Grid: ... @overload async def ops(self, *, raw: Literal[False] = ...) -> list[dict[str, Any]]: ... @overload async def ops(self, *, raw: bool = ...) -> Grid | list[dict[str, Any]]: ...
[docs] async def ops(self, *, raw: bool = False) -> Grid | list[dict[str, Any]]: """Query available operations. :param raw: If ``True``, return the raw :class:`~hs_py.grid.Grid`. :returns: Supported ops rows as ``list[dict]`` or :class:`~hs_py.grid.Grid`. """ grid = await self._call_get("ops") return grid if raw or not self._pythonic else grid_to_pythonic(grid)
@overload async def formats(self, *, raw: Literal[True]) -> Grid: ... @overload async def formats(self, *, raw: Literal[False] = ...) -> list[dict[str, Any]]: ... @overload async def formats(self, *, raw: bool = ...) -> Grid | list[dict[str, Any]]: ...
[docs] async def formats(self, *, raw: bool = False) -> Grid | list[dict[str, Any]]: """Query supported data formats. :param raw: If ``True``, return the raw :class:`~hs_py.grid.Grid`. :returns: Supported MIME type rows as ``list[dict]`` or :class:`~hs_py.grid.Grid`. """ grid = await self._call_get("formats") return grid if raw or not self._pythonic else grid_to_pythonic(grid)
@overload async def read(self, filter: str, limit: int | None = ..., *, raw: Literal[True]) -> Grid: ... @overload async def read( self, filter: str, limit: int | None = ..., *, raw: Literal[False] ) -> list[dict[str, Any]]: ... @overload async def read( self, filter: str, limit: int | None = ..., *, raw: bool ) -> Grid | list[dict[str, Any]]: ...
[docs] async def read( self, filter: str, limit: int | None = None, *, raw: bool = False ) -> Grid | list[dict[str, Any]]: """Read entities matching a filter expression. :param filter: Haystack filter string. :param limit: Maximum number of entities to return. :param raw: If ``True``, return the raw :class:`~hs_py.grid.Grid`. :returns: Matching entity rows as ``list[dict]`` or :class:`~hs_py.grid.Grid`. """ row: dict[str, Any] = {"filter": filter} if limit is not None: row["limit"] = Number(float(limit)) grid_req = GridBuilder().add_col("filter").add_col("limit").add_row(row).to_grid() grid = await self._call("read", grid_req) return grid if raw or not self._pythonic else grid_to_pythonic(grid)
@overload async def read_by_ids(self, ids: list[Ref], *, raw: Literal[True]) -> Grid: ... @overload async def read_by_ids( self, ids: list[Ref], *, raw: Literal[False] ) -> list[dict[str, Any]]: ... @overload async def read_by_ids(self, ids: list[Ref], *, raw: bool) -> Grid | list[dict[str, Any]]: ...
[docs] async def read_by_ids( self, ids: list[Ref], *, raw: bool = False ) -> Grid | list[dict[str, Any]]: """Read entities by their identifiers. :param ids: List of entity Refs to read. :param raw: If ``True``, return the raw :class:`~hs_py.grid.Grid`. :returns: Matching entity rows as ``list[dict]`` or :class:`~hs_py.grid.Grid`. """ builder = GridBuilder().add_col("id") for ref in ids: builder.add_row({"id": ref}) grid = await self._call("read", builder.to_grid()) return grid if raw or not self._pythonic else grid_to_pythonic(grid)
@overload async def nav(self, nav_id: str | None = ..., *, raw: Literal[True]) -> Grid: ... @overload async def nav( self, nav_id: str | None = ..., *, raw: Literal[False] ) -> list[dict[str, Any]]: ... @overload async def nav(self, nav_id: str | None = ..., *, raw: bool) -> Grid | list[dict[str, Any]]: ...
[docs] async def nav( self, nav_id: str | None = None, *, raw: bool = False ) -> Grid | list[dict[str, Any]]: """Navigate the entity tree. :param nav_id: Navigation ID for child lookup, or ``None`` for root. :param raw: If ``True``, return the raw :class:`~hs_py.grid.Grid`. :returns: Navigation children as ``list[dict]`` or :class:`~hs_py.grid.Grid`. """ row: dict[str, Any] = {"navId": nav_id} grid_req = GridBuilder().add_col("navId").add_row(row).to_grid() grid = await self._call("nav", grid_req) return grid if raw or not self._pythonic else grid_to_pythonic(grid)
# ---- History ops ------------------------------------------------------- @overload async def his_read(self, id: Ref, range: str, *, raw: Literal[True]) -> Grid: ... @overload async def his_read( self, id: Ref, range: str, *, raw: Literal[False] ) -> list[dict[str, Any]]: ... @overload async def his_read(self, id: Ref, range: str, *, raw: bool) -> Grid | list[dict[str, Any]]: ...
[docs] async def his_read( self, id: Ref, range: str, *, raw: bool = False ) -> Grid | list[dict[str, Any]]: """Read time-series data for a single point. :param id: Ref of the historized point. :param range: Time range string (e.g. ``"today"``, ``"yesterday"``, ``"2024-01-01,2024-01-31"``). :param raw: If ``True``, return the raw :class:`~hs_py.grid.Grid`. :returns: Time-series rows as ``list[dict]`` or :class:`~hs_py.grid.Grid`. """ grid_req = ( GridBuilder() .add_col("id") .add_col("range") .add_row({"id": id, "range": range}) .to_grid() ) grid = await self._call("hisRead", grid_req) return grid if raw or not self._pythonic else grid_to_pythonic(grid)
@overload async def his_read_batch(self, ids: list[Ref], range: str, *, raw: Literal[True]) -> Grid: ... @overload async def his_read_batch( self, ids: list[Ref], range: str, *, raw: Literal[False] ) -> list[dict[str, Any]]: ... @overload async def his_read_batch( self, ids: list[Ref], range: str, *, raw: bool ) -> Grid | list[dict[str, Any]]: ...
[docs] async def his_read_batch( self, ids: list[Ref], range: str, *, raw: bool = False ) -> Grid | list[dict[str, Any]]: """Read time-series data for multiple points. :param ids: List of point Refs. :param range: Time range string. :param raw: If ``True``, return the raw :class:`~hs_py.grid.Grid`. :returns: Time-series rows as ``list[dict]`` or :class:`~hs_py.grid.Grid`. """ builder = GridBuilder().set_meta({"range": range}).add_col("id") for ref in ids: builder.add_row({"id": ref}) grid = await self._call("hisRead", builder.to_grid()) return grid if raw or not self._pythonic else grid_to_pythonic(grid)
[docs] async def his_write(self, id: Ref, items: list[dict[str, Any]]) -> None: """Write time-series data to a single point. :param id: Ref of the historized point. :param items: List of dicts with ``ts`` and ``val`` keys. """ builder = GridBuilder().set_meta({"id": id}).add_col("ts").add_col("val") for item in items: builder.add_row(item) await self._call("hisWrite", builder.to_grid())
[docs] async def his_write_batch(self, grid: Grid) -> None: """Write time-series data for multiple points. :param grid: Pre-built grid with ``ts`` and ``v0``/``v1``/... columns. """ await self._call("hisWrite", grid)
# ---- Point write ops --------------------------------------------------- @overload async def point_write_array(self, id: Ref, *, raw: Literal[True]) -> Grid: ... @overload async def point_write_array(self, id: Ref, *, raw: Literal[False]) -> list[dict[str, Any]]: ... @overload async def point_write_array(self, id: Ref, *, raw: bool) -> Grid | list[dict[str, Any]]: ...
[docs] async def point_write_array( self, id: Ref, *, raw: bool = False ) -> Grid | list[dict[str, Any]]: """Read the priority array of a writable point. :param id: Ref of the writable point. :param raw: If ``True``, return the raw :class:`~hs_py.grid.Grid`. :returns: Priority array rows as ``list[dict]`` or :class:`~hs_py.grid.Grid`. """ grid_req = GridBuilder().add_col("id").add_row({"id": id}).to_grid() grid = await self._call("pointWrite", grid_req) return grid if raw or not self._pythonic else grid_to_pythonic(grid)
[docs] async def point_write( self, id: Ref, level: int, val: Any, who: str = "", duration: Number | None = None, ) -> None: """Write to a priority array level. :param id: Ref of the writable point. :param level: Priority level 1-17. :param val: Value to write, or None to release. :param who: Identifier of who is making the write. :param duration: Optional duration for level 8 timed overrides. """ row: dict[str, Any] = { "id": id, "level": Number(float(level)), "val": val, "who": who, } if duration is not None: row["duration"] = duration cols = ["id", "level", "val", "who", "duration"] builder = GridBuilder() for col in cols: builder.add_col(col) builder.add_row(row) await self._call("pointWrite", builder.to_grid())
# ---- Watch ops --------------------------------------------------------- @overload async def watch_sub( self, ids: list[Ref], watch_dis: str, lease: Number | None = ..., *, raw: Literal[True], ) -> Grid: ... @overload async def watch_sub( self, ids: list[Ref], watch_dis: str, lease: Number | None = ..., *, raw: Literal[False], ) -> list[dict[str, Any]]: ... @overload async def watch_sub( self, ids: list[Ref], watch_dis: str, lease: Number | None = ..., *, raw: bool, ) -> Grid | list[dict[str, Any]]: ...
[docs] async def watch_sub( self, ids: list[Ref], watch_dis: str, lease: Number | None = None, *, raw: bool = False, ) -> Grid | list[dict[str, Any]]: """Create a new watch or add entities to an existing one. :param ids: Entity Refs to watch. :param watch_dis: Display name for the watch. :param lease: Optional lease duration. :param raw: If ``True``, return the raw :class:`~hs_py.grid.Grid`. :returns: Current entity state as ``list[dict]`` or :class:`~hs_py.grid.Grid`. """ meta: dict[str, Any] = {"watchDis": watch_dis} if lease is not None: meta["lease"] = lease builder = GridBuilder().set_meta(meta).add_col("id") for ref in ids: builder.add_row({"id": ref}) grid = await self._call("watchSub", builder.to_grid()) return grid if raw or not self._pythonic else grid_to_pythonic(grid)
[docs] async def watch_unsub(self, watch_id: str, ids: list[Ref]) -> None: """Remove entities from a watch. :param watch_id: Watch identifier. :param ids: Entity Refs to unwatch. """ builder = GridBuilder().set_meta({"watchId": watch_id}).add_col("id") for ref in ids: builder.add_row({"id": ref}) await self._call("watchUnsub", builder.to_grid())
[docs] async def watch_close(self, watch_id: str) -> None: """Close a watch entirely. :param watch_id: Watch identifier to close. """ builder = GridBuilder().set_meta({"watchId": watch_id, "close": MARKER}).add_col("id") await self._call("watchUnsub", builder.to_grid())
@overload async def watch_poll( self, watch_id: str, refresh: bool = ..., *, raw: Literal[True] ) -> Grid: ... @overload async def watch_poll( self, watch_id: str, refresh: bool = ..., *, raw: Literal[False] ) -> list[dict[str, Any]]: ... @overload async def watch_poll( self, watch_id: str, refresh: bool = ..., *, raw: bool ) -> Grid | list[dict[str, Any]]: ...
[docs] async def watch_poll( self, watch_id: str, refresh: bool = False, *, raw: bool = False ) -> Grid | list[dict[str, Any]]: """Poll a watch for changes. :param watch_id: Watch identifier. :param refresh: If ``True``, return full refresh of all watched entities. :param raw: If ``True``, return the raw :class:`~hs_py.grid.Grid`. :returns: Changed (or all) entities as ``list[dict]`` or :class:`~hs_py.grid.Grid`. """ meta: dict[str, Any] = {"watchId": watch_id} if refresh: meta["refresh"] = MARKER grid_req = GridBuilder().set_meta(meta).to_grid() grid = await self._call("watchPoll", grid_req) return grid if raw or not self._pythonic else grid_to_pythonic(grid)
# ---- Action ops -------------------------------------------------------- @overload async def invoke_action( self, id: Ref, action: str, args: dict[str, Any] | None = ..., *, raw: Literal[True], ) -> Grid: ... @overload async def invoke_action( self, id: Ref, action: str, args: dict[str, Any] | None = ..., *, raw: Literal[False], ) -> list[dict[str, Any]]: ... @overload async def invoke_action( self, id: Ref, action: str, args: dict[str, Any] | None = ..., *, raw: bool, ) -> Grid | list[dict[str, Any]]: ...
[docs] async def invoke_action( self, id: Ref, action: str, args: dict[str, Any] | None = None, *, raw: bool = False, ) -> Grid | list[dict[str, Any]]: """Invoke an action on an entity. :param id: Ref of the target entity. :param action: Action name. :param args: Optional action arguments. :param raw: If ``True``, return the raw :class:`~hs_py.grid.Grid`. :returns: Action results as ``list[dict]`` or :class:`~hs_py.grid.Grid`. """ meta: dict[str, Any] = {"id": id, "action": action} builder = GridBuilder().set_meta(meta) if args: for key in args: builder.add_col(key) builder.add_row(args) grid = await self._call("invokeAction", builder.to_grid()) return grid if raw or not self._pythonic else grid_to_pythonic(grid)
# ---- Internal plumbing ------------------------------------------------- async def _ensure_auth(self) -> None: """Authenticate if not already authenticated.""" if self._auth_token is not None: return session = self._require_session() if self._username: _log.debug("Authenticating user '%s' against %s", self._username, self._base_url) self._auth_token = await authenticate( session, self._base_url, self._username, self._password ) # Clear password from memory after successful authentication self._password = "" _log.debug("Authentication successful for '%s'", self._username) else: self._auth_token = "" def _auth_headers(self) -> dict[str, str]: """Return authorization headers.""" headers: dict[str, str] = { "Content-Type": self._content_type, "Accept": self._content_type, } if self._auth_token: headers["Authorization"] = f"BEARER authToken={self._auth_token}" return headers async def _request(self, method: str, op: str, **kwargs: Any) -> Grid: """Send an HTTP request with automatic re-auth on 401.""" await self._ensure_auth() session = self._require_session() url = f"{self._base_url}/{op}" kwargs["headers"] = self._auth_headers() kwargs.setdefault("allow_redirects", False) _log.debug("%s %s", method, url) try: async with session.request(method, url, **kwargs) as resp: if resp.status == 401: _log.warning("Auth expired for %s, re-authenticating", url) # Consume body so the connection can be reused await resp.read() self._auth_token = None await self._ensure_auth() kwargs["headers"] = self._auth_headers() async with session.request(method, url, **kwargs) as retry: return await self._handle_response(retry) return await self._handle_response(resp) except aiohttp.ClientError as exc: _log.warning("Request failed: %s %s%s", method, url, exc) raise NetworkError(str(exc)) from exc async def _call(self, op: str, grid: Grid) -> Grid: """POST a grid to an operation endpoint and return the response grid.""" from hs_py.content_negotiation import encode_response as _encode_response body, _ = _encode_response(grid, self._accept_format) return await self._request("POST", op, data=body) async def _call_get(self, op: str, params: dict[str, str] | None = None) -> Grid: """GET an operation endpoint and return the response grid.""" return await self._request("GET", op, params=params) async def _handle_response(self, resp: aiohttp.ClientResponse) -> Grid: """Decode a response and check for error grids.""" if resp.status == 401: raise AuthError(f"Authentication failed: {resp.status}") if resp.status == 406: raise CallError("Server does not support requested Accept format", Grid.make_empty()) if resp.status == 415: raise CallError("Server does not support request Content-Type", Grid.make_empty()) data = await resp.read() if not data: return Grid.make_empty() ct = resp.headers.get("content-type", self._content_type) grid = decode_request(data, ct) if grid.is_error: raise CallError(grid.meta.get("dis", "Unknown error"), grid) return grid def _require_session(self) -> aiohttp.ClientSession: """Return the active session or raise.""" if self._session is None: msg = "Client is not open. Use 'async with Client(...) as c:'" raise RuntimeError(msg) return self._session