Storage Backends
================

haystack-py ships with three storage backends that implement the
:class:`~hs_py.storage.protocol.StorageAdapter` protocol. All backends support
the same set of Haystack operations: entity CRUD, filter reads, navigation,
history, point writes, and watch subscriptions.

Choose the backend that matches your deployment needs:

.. list-table::
   :header-rows: 1
   :widths: 20 30 50

   * - Backend
     - Best For
     - Dependencies
   * - **Memory**
     - Testing, prototyping
     - None (built-in)
   * - **Redis**
     - Production, horizontal scaling
     - ``redis``, ``redisvl`` (``pip install haystack-py[server]``)
   * - **TimescaleDB**
     - SQL analytics, time-series queries
     - ``asyncpg`` (``pip install haystack-py[timescale]``)

Memory Backend
--------------

:class:`~hs_py.storage.memory.InMemoryAdapter` stores entities and history in
memory. It is useful for unit tests and rapid prototyping; all data is lost
when the process exits.

.. code-block:: python

   from hs_py.storage.memory import InMemoryAdapter
   from hs_py.kinds import Ref, MARKER

   # Option 1: Pass entities at construction time
   storage = InMemoryAdapter(entities=[
       {"id": Ref("site-1"), "dis": "HQ", "site": MARKER},
   ])

   # Option 2: Bulk-load after construction
   storage = InMemoryAdapter()
   storage.load_entities([
       {"id": Ref("site-1"), "dis": "HQ", "site": MARKER},
   ])

No configuration is required. Pass the adapter to
:func:`~hs_py.fastapi_server.create_fastapi_app`:

.. code-block:: python

   from hs_py.fastapi_server import create_fastapi_app
   from hs_py.storage.memory import InMemoryAdapter

   app = create_fastapi_app(storage=InMemoryAdapter())

Redis Backend
-------------

The Redis backend uses RediSearch for full-text indexed entity queries and
RedisTimeSeries for time-series history storage.
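To illustrate the shape of a filter read against an in-memory store, here is a minimal, self-contained sketch. ``TinyMemoryStore`` is hypothetical and is not part of haystack-py: the real ``InMemoryAdapter`` parses full Haystack filter expressions, while this toy handles only a bare has-tag filter.

```python
import asyncio

# Hypothetical sketch only: TinyMemoryStore is not part of haystack-py.
# It mimics the shape of an in-memory backend's filter read, supporting
# just bare has-tag filters.
class TinyMemoryStore:
    def __init__(self) -> None:
        self._entities: dict[str, dict] = {}

    def load_entities(self, entities: list[dict]) -> None:
        # Index entities by their "id" tag for O(1) lookup.
        for entity in entities:
            self._entities[entity["id"]] = entity

    async def read(self, filter_str: str, limit: int = 100) -> list[dict]:
        # Has-tag semantics: an entity matches when the tag key is present.
        matches = [e for e in self._entities.values() if filter_str in e]
        return matches[:limit]

async def main() -> list[dict]:
    store = TinyMemoryStore()
    store.load_entities([
        {"id": "site-1", "dis": "HQ", "site": True},
        {"id": "point-1", "dis": "Zone Temp", "point": True},
    ])
    return await store.read("site")

sites = asyncio.run(main())
print([e["id"] for e in sites])  # ['site-1']
```

The async ``read`` signature mirrors the ``StorageAdapter`` protocol, so the same calling convention works against any backend.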
The implementation is split across two modules:

- :mod:`hs_py.redis_ops` -- low-level Redis operations (entity hash maps,
  RediSearch indexing, TimeSeries commands)
- :mod:`hs_py.storage.redis` -- :class:`~hs_py.storage.redis.RedisAdapter`,
  which wraps ``redis_ops`` behind the ``StorageAdapter`` protocol

Configuration
^^^^^^^^^^^^^

Set the ``REDIS_URL`` environment variable or pass it to the adapter:

.. code-block:: python

   from hs_py.storage.redis import RedisAdapter

   adapter = RedisAdapter(redis_url="redis://localhost:6379")

Docker Compose
^^^^^^^^^^^^^^

.. code-block:: yaml

   services:
     redis:
       image: redis/redis-stack-server:latest
       ports:
         - "6379:6379"
       healthcheck:
         test: ["CMD", "redis-cli", "ping"]
         interval: 3s

Redis Stack includes the RediSearch and RedisTimeSeries modules automatically.

Seeding Data
^^^^^^^^^^^^

Use the ``/load`` endpoint or call the adapter directly:

.. code-block:: python

   import json

   with open("_data/Alpha/alpha.json") as f:
       entities = json.load(f)

   for entity in entities:
       await adapter.create(entity)

TimescaleDB Backend
-------------------

:class:`~hs_py.storage.timescale.TimescaleAdapter` stores entities as
PostgreSQL JSONB rows and time-series data in TimescaleDB hypertables. Filter
expressions are translated to SQL ``WHERE`` clauses for server-side pushdown.

Schema
^^^^^^

The adapter auto-creates two tables on ``connect()``:

.. code-block:: sql

   CREATE TABLE IF NOT EXISTS entities (
       id   TEXT PRIMARY KEY,
       tags JSONB NOT NULL
   );

   CREATE TABLE IF NOT EXISTS history (
       point_id TEXT NOT NULL,
       ts       TIMESTAMPTZ NOT NULL,
       val      DOUBLE PRECISION
   );

   -- TimescaleDB hypertable for history
   SELECT create_hypertable('history', 'ts', if_not_exists => TRUE);

Configuration
^^^^^^^^^^^^^

Pass a PostgreSQL DSN to the adapter:

.. code-block:: python

   from hs_py.storage.timescale import TimescaleAdapter

   adapter = TimescaleAdapter(dsn="postgresql://user:pass@localhost:5432/haystack")
   await adapter.connect()
   # ... use adapter ...
   await adapter.close()

Or use environment variables:
.. code-block:: bash

   export TIMESCALE_DSN="postgresql://user:pass@localhost:5432/haystack"

Docker Compose
^^^^^^^^^^^^^^

.. code-block:: yaml

   services:
     timescaledb:
       image: timescale/timescaledb:latest-pg16
       ports:
         - "5432:5432"
       environment:
         POSTGRES_USER: haystack
         POSTGRES_PASSWORD: haystack
         POSTGRES_DB: haystack
       healthcheck:
         test: ["CMD-SHELL", "pg_isready -U haystack"]
         interval: 3s

Filter Pushdown
^^^^^^^^^^^^^^^

Haystack filter expressions are compiled to SQL ``WHERE`` clauses. The
``_ast_to_sql`` method translates filter AST nodes into parameterised
PostgreSQL queries using JSONB operators:

- ``has`` → ``tags ? 'tagName'``
- ``missing`` → ``NOT (tags ? 'tagName')``
- ``==`` → ``tags->>'tagName' = $1``
- ``!=`` → ``tags->>'tagName' != $1``
- ``>`` / ``>=`` / ``<`` / ``<=`` → ``(tags->>'tagName')::float > $1::float``

Ref-valued tag comparisons use the nested JSONB path
``tags->'tagName'->>'val'`` to extract the reference id string.

History Queries
^^^^^^^^^^^^^^^

Time-series data is stored in a TimescaleDB hypertable for efficient range
queries. Use standard Haystack date-range strings:

.. code-block:: python

   # Single day
   his = await adapter.his_read("point-1", "2024-06-15")

   # Date range
   his = await adapter.his_read("point-1", "2024-06-01,2024-06-30")

StorageAdapter Protocol
-----------------------

All backends implement the :class:`~hs_py.storage.protocol.StorageAdapter`
protocol. To create a custom backend, implement these methods:

.. code-block:: python

   from hs_py.storage.protocol import StorageAdapter

   class MyAdapter(StorageAdapter):
       async def about(self) -> dict: ...
       async def read(self, filter_str: str, limit: int) -> list[dict]: ...
       async def read_by_ids(self, ids: list[str]) -> list[dict]: ...
       async def nav(self, nav_id: str | None) -> list[dict]: ...
       async def his_read(self, id: str, range_str: str) -> list[dict]: ...
       async def his_write(self, id: str, items: list[dict]) -> None: ...
       async def point_write(self, id: str, level: int, val, who: str) -> list[dict]: ...
       async def watch_sub(self, watch_id: str, ids: list[str]) -> list[dict]: ...
       async def watch_unsub(self, watch_id: str, ids: list[str]) -> None: ...
       async def watch_poll(self, watch_id: str, refresh: bool) -> list[dict]: ...

See :mod:`hs_py.storage.protocol` for the full method signatures and type
annotations.
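The JSONB filter-pushdown mapping described under the TimescaleDB backend can be sketched as follows. This is an illustrative toy, not the actual ``_ast_to_sql`` implementation: ``term_to_sql`` is a hypothetical helper that handles a single filter term, assumes tag names have already been validated as safe identifiers, and always numbers its parameter ``$1``.

```python
# Hypothetical sketch of the JSONB pushdown mapping; the real
# TimescaleAdapter._ast_to_sql walks a parsed filter AST and handles
# boolean combinators. Tag names are interpolated here for brevity and
# must be validated upstream to avoid SQL injection.
def term_to_sql(op: str, tag: str, value=None) -> tuple[str, list]:
    """Translate one filter term into a WHERE fragment plus parameters."""
    if op == "has":
        return f"tags ? '{tag}'", []
    if op == "missing":
        return f"NOT (tags ? '{tag}')", []
    if op in ("==", "!="):
        sql_op = "=" if op == "==" else "!="
        return f"tags->>'{tag}' {sql_op} $1", [value]
    if op in (">", ">=", "<", "<="):
        # Numeric comparisons cast the JSONB text value to float.
        return f"(tags->>'{tag}')::float {op} $1::float", [value]
    raise ValueError(f"unsupported operator: {op}")

print(term_to_sql("has", "site"))      # ("tags ? 'site'", [])
print(term_to_sql(">", "area", 1000))  # ("(tags->>'area')::float > $1::float", [1000])
```

Returning the parameter list alongside the fragment keeps values out of the SQL text, so the driver (e.g. ``asyncpg``) can bind them safely.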