Source code for hs_py.user

"""User model and helpers for Haystack server user management.

Provides a frozen :class:`User` dataclass, a :class:`Role` enum for
role-based access control, and a :func:`create_user` factory that derives
SCRAM-SHA-256 credentials from a plaintext password.  Passwords are
**never** stored — only the derived ``ScramCredentials``.
"""

from __future__ import annotations

import base64
import enum
import os
import time
from dataclasses import dataclass, field
from typing import Any

from hs_py.auth import _derive_key, _hash_digest, _hmac
from hs_py.auth_types import ScramCredentials

__all__ = [
    "Role",
    "User",
    "create_user",
    "user_from_dict",
    "user_to_dict",
]

_DEFAULT_ITERATIONS = 600_000


[docs] class Role(enum.Enum): """User role for permission enforcement. Roles form a strict hierarchy: ADMIN > OPERATOR > VIEWER. * **VIEWER** — read-only Haystack ops (read, nav, hisRead, defs, …). * **OPERATOR** — read + write ops (hisWrite, pointWrite, invokeAction, watches). * **ADMIN** — full access including user management. """ VIEWER = "viewer" OPERATOR = "operator" ADMIN = "admin" # Numeric level for ordering comparisons @property def level(self) -> int: """Return numeric privilege level (higher = more access).""" return _ROLE_LEVEL[self] def __ge__(self, other: object) -> bool: if not isinstance(other, Role): return NotImplemented return self.level >= other.level def __gt__(self, other: object) -> bool: if not isinstance(other, Role): return NotImplemented return self.level > other.level def __le__(self, other: object) -> bool: if not isinstance(other, Role): return NotImplemented return self.level <= other.level def __lt__(self, other: object) -> bool: if not isinstance(other, Role): return NotImplemented return self.level < other.level
_ROLE_LEVEL: dict[Role, int] = { Role.VIEWER: 0, Role.OPERATOR: 1, Role.ADMIN: 2, } # ---- Op classification sets ------------------------------------------------ #: Haystack ops that mutate data — require OPERATOR or ADMIN. WRITE_OPS: frozenset[str] = frozenset( { "hisWrite", "pointWrite", "invokeAction", "watchSub", "watchUnsub", "watchPoll", "close", } ) #: Haystack ops that only read data — require VIEWER or above. READ_OPS: frozenset[str] = frozenset( { "read", "nav", "hisRead", "defs", "libs", "filetypes", } ) def can_admin(role: Role) -> bool: """Return ``True`` if the role has admin privileges.""" return role >= Role.ADMIN def can_write(role: Role) -> bool: """Return ``True`` if the role can perform write operations.""" return role >= Role.OPERATOR def can_read(role: Role) -> bool: """Return ``True`` if the role can perform read operations.""" return role >= Role.VIEWER
[docs] @dataclass(frozen=True, slots=True) class User: """Immutable user record. Passwords are stored as pre-computed :class:`~hs_py.auth_types.ScramCredentials` — the plaintext is never retained. :param username: Unique login identifier (required). :param credentials: SCRAM-SHA-256 credentials derived from the password. :param first_name: User's first name. :param last_name: User's last name. :param email: User's email address. :param role: Permission role (ADMIN, OPERATOR, or VIEWER). :param enabled: Whether the user can log in. :param created_at: Monotonic creation timestamp. :param updated_at: Monotonic last-update timestamp. """ username: str credentials: ScramCredentials first_name: str = "" last_name: str = "" email: str = "" role: Role = Role.VIEWER enabled: bool = True created_at: float = field(default_factory=time.time) updated_at: float = field(default_factory=time.time)
def derive_scram_credentials( password: str, *, iterations: int = _DEFAULT_ITERATIONS, ) -> ScramCredentials: """Derive SCRAM-SHA-256 credentials from a plaintext password. :param password: Plaintext password. :param iterations: PBKDF2 iteration count. :returns: Pre-computed :class:`ScramCredentials`. """ salt = os.urandom(16) salted_pw = _derive_key(password.encode(), salt, iterations, "sha256") client_key = _hmac("sha256", salted_pw, b"Client Key") stored_key = _hash_digest("sha256", client_key) server_key = _hmac("sha256", salted_pw, b"Server Key") return ScramCredentials( salt=salt, iterations=iterations, stored_key=stored_key, server_key=server_key, )
[docs] def create_user( username: str, password: str, *, first_name: str = "", last_name: str = "", email: str = "", role: Role = Role.VIEWER, enabled: bool = True, iterations: int = _DEFAULT_ITERATIONS, ) -> User: """Create a :class:`User` with derived SCRAM credentials. :param username: Unique login identifier. :param password: Plaintext password (used to derive credentials, then discarded). :param first_name: User's first name. :param last_name: User's last name. :param email: User's email address. :param role: Permission role (default :attr:`Role.VIEWER`). :param enabled: Whether the user can log in. :param iterations: PBKDF2 iteration count. :returns: Frozen :class:`User` instance. """ if not username: raise ValueError("username must not be empty") if not password: raise ValueError("password must not be empty") credentials = derive_scram_credentials(password, iterations=iterations) now = time.time() return User( username=username, credentials=credentials, first_name=first_name, last_name=last_name, email=email, role=role, enabled=enabled, created_at=now, updated_at=now, )
# --------------------------------------------------------------------------- # Serialization helpers (for storage backends) # ---------------------------------------------------------------------------
[docs] def user_to_dict(user: User) -> dict[str, Any]: """Serialize a :class:`User` to a plain dict for storage. Credential bytes are base64-encoded for JSON compatibility. """ creds = user.credentials return { "username": user.username, "salt": base64.b64encode(creds.salt).decode(), "iterations": creds.iterations, "stored_key": base64.b64encode(creds.stored_key).decode(), "server_key": base64.b64encode(creds.server_key).decode(), "first_name": user.first_name, "last_name": user.last_name, "email": user.email, "role": user.role.value, "enabled": user.enabled, "created_at": user.created_at, "updated_at": user.updated_at, }
[docs] def user_from_dict(d: dict[str, Any]) -> User: """Deserialize a :class:`User` from a plain dict. Inverse of :func:`user_to_dict`. """ credentials = ScramCredentials( salt=base64.b64decode(d["salt"]), iterations=d["iterations"], stored_key=base64.b64decode(d["stored_key"]), server_key=base64.b64decode(d["server_key"]), ) return User( username=d["username"], credentials=credentials, first_name=d.get("first_name", ""), last_name=d.get("last_name", ""), email=d.get("email", ""), role=Role(d["role"]) if "role" in d else Role.VIEWER, enabled=d.get("enabled", True), created_at=d.get("created_at", 0.0), updated_at=d.get("updated_at", 0.0), )