Storage Backends

haystack-py ships with three storage backends, each implementing the StorageAdapter protocol. All backends support the same set of Haystack operations: entity CRUD, filter reads, navigation, history, point writes, and watch subscriptions.

Choose the backend that matches your deployment needs:

Backend      Best For                            Dependencies
Memory       Testing, prototyping                None (built-in)
Redis        Production, horizontal scaling      redis, redisvl (pip install haystack-py[server])
TimescaleDB  SQL analytics, time-series queries  asyncpg (pip install haystack-py[timescale])

Memory Backend

InMemoryAdapter stores entities and history in memory. It is useful for unit tests and rapid prototyping, but all data is lost when the process exits.

from hs_py.storage.memory import InMemoryAdapter
from hs_py.kinds import Ref, MARKER

# Option 1: Pass entities at construction time
storage = InMemoryAdapter(entities=[
    {"id": Ref("site-1"), "dis": "HQ", "site": MARKER},
])

# Option 2: Bulk-load after construction
storage = InMemoryAdapter()
storage.load_entities([
    {"id": Ref("site-1"), "dis": "HQ", "site": MARKER},
])

No configuration is required. Pass the adapter to create_fastapi_app():

from hs_py.fastapi_server import create_fastapi_app
from hs_py.storage.memory import InMemoryAdapter

app = create_fastapi_app(storage=InMemoryAdapter())

Redis Backend

The Redis backend uses RediSearch for full-text indexed entity queries and RedisTimeSeries for time-series history storage.

The implementation is split across two modules:

Configuration

Set the REDIS_URL environment variable, or pass the connection URL to the adapter directly:

from hs_py.storage.redis import RedisAdapter

adapter = RedisAdapter(redis_url="redis://localhost:6379")
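If application code needs to choose between an explicit URL and the REDIS_URL variable itself, the lookup order can be sketched as below. The helper name resolve_redis_url and the fallback default are assumptions made for illustration; they are not part of haystack-py, and the adapter may already perform an equivalent lookup internally.

```python
import os

# Assumed fallback for local development; not a documented haystack-py default.
DEFAULT_REDIS_URL = "redis://localhost:6379"


def resolve_redis_url(explicit=None, env=None):
    """Pick a Redis URL: explicit argument first, then REDIS_URL, then the default."""
    env = os.environ if env is None else env
    return explicit or env.get("REDIS_URL") or DEFAULT_REDIS_URL
```

The result can then be passed as redis_url when constructing RedisAdapter. Accepting the environment mapping as a parameter keeps the helper easy to test without touching the real process environment.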

Docker Compose

services:
  redis:
    image: redis/redis-stack-server:latest
    ports:
      - "6379:6379"
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 3s

Redis Stack includes RediSearch and RedisTimeSeries modules automatically.

Seeding Data

Use the /load endpoint or call the adapter directly:

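import json

from hs_py.storage.redis import RedisAdapter


async def seed(adapter: RedisAdapter, path: str = "_data/Alpha/alpha.json") -> None:
    # Load the entity list from a JSON export.
    with open(path) as f:
        entities = json.load(f)

    # Create each entity through the adapter; call this from async code.
    for entity in entities:
        await adapter.create(entity)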

TimescaleDB Backend

TimescaleAdapter stores entities as PostgreSQL JSONB rows and time-series data in TimescaleDB hypertables. Filter expressions are translated to SQL WHERE clauses for server-side pushdown.

Schema

The adapter auto-creates two tables on connect():

CREATE TABLE IF NOT EXISTS entities (
    id   TEXT PRIMARY KEY,
    tags JSONB NOT NULL
);

CREATE TABLE IF NOT EXISTS history (
    point_id  TEXT        NOT NULL,
    ts        TIMESTAMPTZ NOT NULL,
    val       DOUBLE PRECISION
);

-- TimescaleDB hypertable for history
SELECT create_hypertable('history', 'ts', if_not_exists => TRUE);
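To make the entities table concrete, the sketch below shows one way an entity dict could be flattened into an (id, tags) row. It is an illustration only: the Ref and MARKER stand-ins, the encode_value and entity_to_row helpers, and the Haystack-JSON-style "_kind" encoding are all assumptions, and the adapter's real serialisation may differ.

```python
import json
from dataclasses import dataclass


@dataclass(frozen=True)
class Ref:
    """Hypothetical stand-in for hs_py.kinds.Ref, used only in this sketch."""
    val: str


MARKER = object()  # hypothetical stand-in for hs_py.kinds.MARKER


def encode_value(value):
    """Encode one tag value as JSON-friendly data (assumed Haystack JSON style)."""
    if isinstance(value, Ref):
        return {"_kind": "ref", "val": value.val}
    if value is MARKER:
        return {"_kind": "marker"}
    return value


def entity_to_row(entity: dict) -> tuple[str, str]:
    """Return (id, tags_json) matching the two columns of the entities table."""
    entity_id = entity["id"].val
    tags = {name: encode_value(value) for name, value in entity.items()}
    return entity_id, json.dumps(tags, sort_keys=True)
```

Storing the whole tag dict in one JSONB column keeps the schema independent of which tags an entity carries, which is what allows filters to be evaluated with JSONB operators.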

Configuration

Pass a PostgreSQL DSN to the adapter:

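import asyncio

from hs_py.storage.timescale import TimescaleAdapter

async def main() -> None:
    adapter = TimescaleAdapter(dsn="postgresql://user:pass@localhost:5432/haystack")
    await adapter.connect()  # creates the tables if they do not exist
    try:
        pass  # ... use adapter ...
    finally:
        await adapter.close()

asyncio.run(main())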

Or set the TIMESCALE_DSN environment variable:

export TIMESCALE_DSN="postgresql://user:pass@localhost:5432/haystack"

Docker Compose

services:
  timescaledb:
    image: timescale/timescaledb:latest-pg16
    ports:
      - "5432:5432"
    environment:
      POSTGRES_USER: haystack
      POSTGRES_PASSWORD: haystack
      POSTGRES_DB: haystack
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U haystack"]
      interval: 3s

Filter Pushdown

Haystack filter expressions are compiled to SQL WHERE clauses. The _ast_to_sql method translates filter AST nodes to parameterised PostgreSQL queries using JSONB operators:

  • has → tags ? 'tagName'

  • missing → NOT (tags ? 'tagName')

  • == → tags->>'tagName' = $1

  • != → tags->>'tagName' != $1

  • > / >= / < / <= → (tags->>'tagName')::float > $1::float (with the matching comparison operator)

Ref-valued tag comparisons use the nested JSONB path tags->'tagName'->>'val' to extract the reference id string.
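The mapping above can be illustrated with a small translator. This is a minimal sketch, not the library's _ast_to_sql: the Has, Missing, and Cmp node classes and the to_sql function are hypothetical, and the real filter AST supports more constructs (and/or, paths, Ref comparisons) than are shown here.

```python
import re
from dataclasses import dataclass

# Tag names are inlined into the SQL text, so they are validated first.
_TAG_NAME = re.compile(r"[a-z][A-Za-z0-9_]*")
_SQL_OPS = {"==": "=", "!=": "!=", ">": ">", ">=": ">=", "<": "<", "<=": "<="}


@dataclass(frozen=True)
class Has:  # hypothetical AST node: tag is present
    tag: str


@dataclass(frozen=True)
class Missing:  # hypothetical AST node: tag is absent
    tag: str


@dataclass(frozen=True)
class Cmp:  # hypothetical AST node: tag compared with a scalar
    tag: str
    op: str
    value: object


def to_sql(node, params: list) -> str:
    """Return a WHERE fragment; bound values are appended to params as $1, $2, ..."""
    if not isinstance(node, (Has, Missing, Cmp)):
        raise TypeError(f"unsupported node: {node!r}")
    if not _TAG_NAME.fullmatch(node.tag):
        raise ValueError(f"invalid tag name: {node.tag!r}")
    if isinstance(node, Has):
        return f"tags ? '{node.tag}'"
    if isinstance(node, Missing):
        return f"NOT (tags ? '{node.tag}')"
    sql_op = _SQL_OPS[node.op]
    params.append(node.value)
    placeholder = f"${len(params)}"
    if node.op in ("==", "!="):
        return f"tags->>'{node.tag}' {sql_op} {placeholder}"
    # Ordering comparisons cast both sides to float, as in the list above.
    return f"(tags->>'{node.tag}')::float {sql_op} {placeholder}::float"
```

For example, to_sql(Cmp("area", ">", 100), params) with an empty params list yields (tags->>'area')::float > $1::float and leaves params as [100]. Values always travel as bound parameters, so only the validated tag name is ever interpolated into the query text.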

History Queries

Time-series data is stored in a TimescaleDB hypertable for efficient range queries. Use standard Haystack date range strings:

# Single day
his = await adapter.his_read("point-1", "2024-06-15")

# Date range
his = await adapter.his_read("point-1", "2024-06-01,2024-06-30")
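To show how such range strings could map onto a query against the ts column, here is a sketch that turns them into a half-open time interval. The function name parse_date_range is hypothetical, and the choices of UTC midnight boundaries and an inclusive end date are assumptions; the adapter's own parsing and timezone handling may differ.

```python
from datetime import date, datetime, time, timedelta, timezone


def parse_date_range(range_str: str) -> tuple[datetime, datetime]:
    """Return a half-open [start, end) UTC interval for 'date' or 'date,date'."""
    parts = [part.strip() for part in range_str.split(",")]
    if len(parts) not in (1, 2):
        raise ValueError(f"unsupported range: {range_str!r}")
    first = date.fromisoformat(parts[0])
    last = date.fromisoformat(parts[-1])  # same as first for a single day
    if last < first:
        raise ValueError("range end is before range start")
    start = datetime.combine(first, time.min, tzinfo=timezone.utc)
    # The end date is treated as inclusive, so the interval closes at the next midnight.
    end = datetime.combine(last + timedelta(days=1), time.min, tzinfo=timezone.utc)
    return start, end
```

A half-open interval translates directly into a condition of the form ts >= $1 AND ts < $2, which avoids double-counting samples that fall exactly on a day boundary.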

StorageAdapter Protocol

All backends implement the StorageAdapter protocol. To create a custom backend, implement these methods:

from hs_py.storage.protocol import StorageAdapter

class MyAdapter(StorageAdapter):
    async def about(self) -> dict: ...
    async def read(self, filter_str: str, limit: int) -> list[dict]: ...
    async def read_by_ids(self, ids: list[str]) -> list[dict]: ...
    async def nav(self, nav_id: str | None) -> list[dict]: ...
    async def his_read(self, id: str, range_str: str) -> list[dict]: ...
    async def his_write(self, id: str, items: list[dict]) -> None: ...
    async def point_write(self, id: str, level: int, val, who: str) -> list[dict]: ...
    async def watch_sub(self, watch_id: str, ids: list[str]) -> list[dict]: ...
    async def watch_unsub(self, watch_id: str, ids: list[str]) -> None: ...
    async def watch_poll(self, watch_id: str, refresh: bool) -> list[dict]: ...

See hs_py.storage.protocol for the full method signatures and type annotations.
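As a starting point for a custom backend, the sketch below fills in two of the methods with plain dictionaries. It deliberately does not import hs_py so that it runs on its own; the DictAdapter name, the string ids, the shape of the returned priority-array rows, and the handling of unknown ids are all assumptions for illustration rather than the behaviour of any shipped backend.

```python
import asyncio
from typing import Any


class DictAdapter:
    """Toy adapter covering two of the protocol methods (illustrative only)."""

    def __init__(self, entities: list) -> None:
        # Key entities by a plain string id; real entities would carry Ref values.
        self._entities = {str(e["id"]): e for e in entities}
        # One 17-slot priority array per point, created lazily on first write.
        self._levels: dict = {}

    async def read_by_ids(self, ids: list) -> list:
        # Unknown ids are skipped here; a real backend may report them differently.
        return [self._entities[i] for i in ids if i in self._entities]

    async def point_write(self, id: str, level: int, val: Any, who: str) -> list:
        if not 1 <= level <= 17:
            raise ValueError("level must be between 1 and 17")
        slots = self._levels.setdefault(id, [None] * 17)
        # Writing None releases the level, as a null value does in Haystack pointWrite.
        slots[level - 1] = None if val is None else {"level": level, "val": val, "who": who}
        # Return only the occupied levels, ordered from highest priority (1) down.
        return [slot for slot in slots if slot is not None]


adapter = DictAdapter([{"id": "site-1", "dis": "HQ"}])
print(asyncio.run(adapter.read_by_ids(["site-1", "missing"])))
```

A complete backend would subclass StorageAdapter and implement every method listed above; the remaining methods follow the same pattern of async functions over whatever store the backend wraps.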