Source code for hs_py.redis_ops

"""Backward-compatible Redis ops wrapper.

Provides :class:`RedisOps`, a thin :class:`~hs_py.ops.HaystackOps` subclass
that wraps :class:`~hs_py.storage.redis.RedisAdapter`.

For new code, prefer using :class:`~hs_py.storage.redis.RedisAdapter` directly
with :class:`~hs_py.ops.HaystackOps`::

    from hs_py.ops import HaystackOps
    from hs_py.storage.redis import RedisAdapter, create_redis_client

    redis = create_redis_client()
    adapter = RedisAdapter(redis)
    ops = HaystackOps(storage=adapter)
    await adapter.start()
"""

from __future__ import annotations

from typing import TYPE_CHECKING, Any

from hs_py.grid import Grid
from hs_py.kinds import Symbol
from hs_py.ops import HaystackOps
from hs_py.storage.redis import RedisAdapter, create_redis_client

if TYPE_CHECKING:
    from redis.asyncio import Redis

    from hs_py.ontology.namespace import Namespace

__all__ = ["RedisOps", "create_redis_client"]


[docs] class RedisOps(HaystackOps): """Haystack ops backed by Redis 8, wrapping :class:`~hs_py.storage.redis.RedisAdapter`. This is a convenience wrapper for backward compatibility. For new code, use ``RedisAdapter`` directly with ``HaystackOps(storage=adapter)``. :param redis: A ``redis.asyncio.Redis`` client instance. :param namespace: Optional ontology namespace for defs/libs ops. """ def __init__( self, redis: Redis[str], *, namespace: Namespace | None = None, ) -> None: adapter = RedisAdapter(redis) super().__init__(storage=adapter, namespace=namespace) self._adapter = adapter # -- Convenience property proxies ------------------------------------------ @property def _r(self) -> Redis[str]: """Direct Redis client access (for testing and internal helpers).""" return self._adapter._r # -- Lifecycle -------------------------------------------------------------
[docs] async def start(self) -> None: """Verify Redis connection and create RediSearch index.""" await self._adapter.start()
[docs] async def stop(self) -> None: """Close the Redis connection.""" await self._adapter.close()
# -- Internal helpers (delegated to adapter) ------------------------------ async def _store_entity(self, ref_val: str, entity: dict[str, Any]) -> None: """Store a single entity with tag indexes (delegates to adapter).""" await self._adapter._store_entity(ref_val, entity) # -- Bulk load -------------------------------------------------------------
[docs] async def load_entities(self, entities: list[dict[str, Any]]) -> int: """Bulk-load a list of entity dicts into Redis. :param entities: List of entity dicts (each must have an ``id`` Ref). :returns: Number of entities actually stored. """ return await self._adapter.load_entities(entities)
[docs] async def load_grid(self, grid: Grid) -> int: """Bulk-load entities from a Grid into Redis. :param grid: Grid of entities (each row must have an ``id`` Ref). :returns: Number of entities loaded. """ entities = [dict(row) for row in grid] return await self._adapter.load_entities(entities)
# -- Standard ops overrides ------------------------------------------------
[docs] async def about(self) -> Grid: """Return server information.""" return Grid.make_rows( [ { "haystackVersion": "4.0", "tz": "New_York", "serverName": "hs-py Redis Server", "productName": "hs-py", "productVersion": "0.3.0", } ] )
[docs] async def filetypes(self, grid: Grid) -> Grid: """Return supported file types.""" return Grid.make_rows( [ {"def": Symbol("filetype:json"), "dis": "JSON", "mime": "application/json"}, {"def": Symbol("filetype:zinc"), "dis": "Zinc", "mime": "text/zinc"}, {"def": Symbol("filetype:trio"), "dis": "Trio", "mime": "text/trio"}, {"def": Symbol("filetype:csv"), "dis": "CSV", "mime": "text/csv"}, ] )
[docs] async def invoke_action(self, grid: Grid) -> Grid: """Invoke an action on an entity.""" action = grid.meta.get("action", "unknown") return Grid.make_rows([{"action": str(action), "result": "ok"}])