Storage Backends¶
haystack-py ships with three storage backends that implement the
StorageAdapter protocol. All backends
support the same set of Haystack operations — entity CRUD, filter reads,
navigation, history, point writes, and watch subscriptions.
Choose the backend that matches your deployment needs:
Backend |
Best For |
Dependencies |
|---|---|---|
Memory |
Testing, prototyping |
None (built-in) |
Redis |
Production, horizontal scaling |
|
TimescaleDB |
SQL analytics, time-series queries |
|
Memory Backend¶
InMemoryAdapter stores entities and history
in-memory. Useful for unit tests and rapid prototyping. All data is lost when
the process exits.
from hs_py.storage.memory import InMemoryAdapter
from hs_py.kinds import Ref, MARKER
# Option 1: Pass entities at construction time
storage = InMemoryAdapter(entities=[
{"id": Ref("site-1"), "dis": "HQ", "site": MARKER},
])
# Option 2: Bulk-load after construction
storage = InMemoryAdapter()
storage.load_entities([
{"id": Ref("site-1"), "dis": "HQ", "site": MARKER},
])
No configuration is required. Pass the adapter to
create_fastapi_app():
from hs_py.fastapi_server import create_fastapi_app
from hs_py.storage.memory import InMemoryAdapter
app = create_fastapi_app(storage=InMemoryAdapter())
Redis Backend¶
The Redis backend uses RediSearch for full-text indexed entity queries and RedisTimeSeries for time-series history storage.
The implementation is split across two modules:
hs_py.redis_ops— Low-level Redis operations (entity hash maps, RediSearch indexing, TimeSeries commands)hs_py.storage.redis—RedisAdapterwrappingredis_opsbehind theStorageAdapterprotocol
Configuration¶
Set the REDIS_URL environment variable or pass it to the adapter:
from hs_py.storage.redis import RedisAdapter
adapter = RedisAdapter(redis_url="redis://localhost:6379")
Docker Compose¶
services:
redis:
image: redis/redis-stack-server:latest
ports:
- "6379:6379"
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 3s
Redis Stack includes RediSearch and RedisTimeSeries modules automatically.
Seeding Data¶
Use the /load endpoint or call the adapter directly:
import json
with open("_data/Alpha/alpha.json") as f:
entities = json.load(f)
for entity in entities:
await adapter.create(entity)
TimescaleDB Backend¶
TimescaleAdapter stores entities as
PostgreSQL JSONB rows and time-series data in TimescaleDB hypertables. Filter
expressions are translated to SQL WHERE clauses for server-side pushdown.
Schema¶
The adapter auto-creates two tables on connect():
CREATE TABLE IF NOT EXISTS entities (
id TEXT PRIMARY KEY,
tags JSONB NOT NULL
);
CREATE TABLE IF NOT EXISTS history (
point_id TEXT NOT NULL,
ts TIMESTAMPTZ NOT NULL,
val DOUBLE PRECISION
);
-- TimescaleDB hypertable for history
SELECT create_hypertable('history', 'ts', if_not_exists => TRUE);
Configuration¶
Pass a PostgreSQL DSN to the adapter:
from hs_py.storage.timescale import TimescaleAdapter
adapter = TimescaleAdapter(dsn="postgresql://user:pass@localhost:5432/haystack")
await adapter.connect()
# ... use adapter ...
await adapter.close()
Or use environment variables:
export TIMESCALE_DSN="postgresql://user:pass@localhost:5432/haystack"
Docker Compose¶
services:
timescaledb:
image: timescale/timescaledb:latest-pg16
ports:
- "5432:5432"
environment:
POSTGRES_USER: haystack
POSTGRES_PASSWORD: haystack
POSTGRES_DB: haystack
healthcheck:
test: ["CMD-SHELL", "pg_isready -U haystack"]
interval: 3s
Filter Pushdown¶
Haystack filter expressions are compiled to SQL WHERE clauses. The
_ast_to_sql method translates filter AST nodes to parameterised
PostgreSQL queries using JSONB operators:
has→tags ? 'tagName'missing→NOT (tags ? 'tagName')==→tags->>'tagName' = $1!=→tags->>'tagName' != $1> / >= / < / <=→(tags->>'tagName')::float > $1::float
Ref-valued tag comparisons use the nested JSONB path
tags->'tagName'->>'val' to extract the reference id string.
History Queries¶
Time-series data is stored in a TimescaleDB hypertable for efficient range queries. Use standard Haystack date range strings:
# Single day
his = await adapter.his_read("point-1", "2024-06-15")
# Date range
his = await adapter.his_read("point-1", "2024-06-01,2024-06-30")
StorageAdapter Protocol¶
All backends implement the StorageAdapter
protocol. To create a custom backend, implement these methods:
from hs_py.storage.protocol import StorageAdapter
class MyAdapter(StorageAdapter):
async def about(self) -> dict: ...
async def read(self, filter_str: str, limit: int) -> list[dict]: ...
async def read_by_ids(self, ids: list[str]) -> list[dict]: ...
async def nav(self, nav_id: str | None) -> list[dict]: ...
async def his_read(self, id: str, range_str: str) -> list[dict]: ...
async def his_write(self, id: str, items: list[dict]) -> None: ...
async def point_write(self, id: str, level: int, val, who: str) -> list[dict]: ...
async def watch_sub(self, watch_id: str, ids: list[str]) -> list[dict]: ...
async def watch_unsub(self, watch_id: str, ids: list[str]) -> None: ...
async def watch_poll(self, watch_id: str, refresh: bool) -> list[dict]: ...
See hs_py.storage.protocol for the full method signatures and
type annotations.