# Source code for hs_py.encoding.zinc

"""Haystack Zinc encoding and decoding.

Zinc is the primary text format for Haystack data. It encodes grids as a
line-oriented text format with typed scalar values.

See: https://project-haystack.org/doc/docHaystack/Zinc
"""

from __future__ import annotations

import datetime
from typing import Any

from hs_py.encoding.scanner import (
    escape_str,
    format_number,
    format_ref,
    scan_str,
    scan_tag_name,
    scan_val,
    skip_ws,
    tz_name,
)
from hs_py.grid import Col, Grid
from hs_py.kinds import (
    MARKER,
    Coord,
    Marker,
    Na,
    Number,
    Ref,
    Remove,
    Symbol,
    Uri,
    XStr,
)

__all__ = [
    "decode_grid",
    "decode_val",
    "encode_grid",
    "encode_val",
]

# Zinc format version written into the ``ver:`` header line by encode_grid.
_ZINC_VER = "3.0"

# Maximum grid dimensions when decoding Zinc (matches JSON decoder limits).
# decode_grid raises ValueError when either limit is exceeded.
_MAX_GRID_ROWS = 200_000
_MAX_GRID_COLS = 10_000


# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------


def encode_val(val: Any) -> str:
    """Serialize one Haystack value to its Zinc text form.

    This is the public entry point for scalar encoding; the work is done by
    the module's private ``_encode`` dispatcher.

    :param val: Haystack value to encode.
    :returns: Zinc-encoded string.
    """
    encoded = _encode(val)
    return encoded
def decode_val(text: str) -> Any:
    """Parse a single Zinc-encoded scalar from *text*.

    Leading and trailing whitespace is stripped first. Input that is empty
    after stripping decodes to ``None``.

    :param text: Zinc value text.
    :returns: Parsed Haystack value, or ``None`` for blank input.
    """
    stripped = text.strip()
    if stripped:
        # scan_val returns (value, end position); the end position is not
        # inspected here, so only the first scanned value is returned.
        value, _end = scan_val(stripped, 0)
        return value
    return None
def encode_grid(grid: Grid) -> str:
    """Render a Grid as Zinc text.

    The output is a newline-joined sequence of lines (no trailing newline):

    1. the ``ver:"..."`` header followed by the grid metadata tags,
    2. the comma-separated column definitions, or the literal ``empty`` when
       the grid has no columns,
    3. one comma-separated line per row, with cells in column order.

    :param grid: Grid to encode.
    :returns: Zinc-encoded grid string.
    """
    # Header: version tag first, then every grid-level meta tag.
    header_tags = [f'ver:"{_ZINC_VER}"']
    header_tags.extend(_encode_tag(name, value) for name, value in grid.meta.items())
    out: list[str] = [" ".join(header_tags)]

    # Column line: each column is its name followed by its own meta tags.
    if grid.cols:
        col_defs = [
            " ".join([col.name, *(_encode_tag(key, value) for key, value in col.meta.items())])
            for col in grid.cols
        ]
        out.append(",".join(col_defs))
    else:
        out.append("empty")

    # Row lines: row.get() yields None for a column the row lacks, and that
    # None is passed to _encode like any other value.
    out.extend(
        ",".join([_encode(row.get(col.name)) for col in grid.cols])
        for row in grid.rows
    )

    return "\n".join(out)
def decode_grid(text: str, *, _depth: int = 0) -> Grid:
    """Parse Zinc text into a Grid.

    Blank lines are dropped before parsing. The first remaining line is the
    ``ver:`` header, the second the column definitions, and every following
    line a data row.

    :param text: Zinc grid text.
    :param _depth: Internal value forwarded to the row parser (and from there
        to ``scan_val``); presumably the nested-grid recursion depth.
    :returns: Decoded Grid.
    :raises ValueError: If the column or row count exceeds the module limits.
    """
    # NOTE(review): the text is split on "\n" before any value scanning, while
    # _encode_nested_grid writes nested grids as a multi-line ``<<...>>`` span.
    # Confirm how such cells are expected to survive this split.
    non_blank = [candidate for candidate in text.split("\n") if candidate.strip()]
    if not non_blank:
        return Grid()

    # Line 1: version + grid metadata.
    meta = _parse_ver_line(non_blank[0])
    if len(non_blank) == 1:
        return Grid(meta=meta)

    # Line 2: column definitions.
    cols = _parse_cols_line(non_blank[1])
    if len(cols) > _MAX_GRID_COLS:
        raise ValueError(f"Grid exceeds maximum column count of {_MAX_GRID_COLS}")

    # A lone, meta-less column called "empty" is the marker for an empty grid.
    is_empty_marker = len(cols) == 1 and cols[0].name == "empty" and not cols[0].meta
    if is_empty_marker:
        return Grid(meta=meta)

    # Remaining lines: data rows.
    row_lines = non_blank[2:]
    if len(row_lines) > _MAX_GRID_ROWS:
        raise ValueError(f"Grid exceeds maximum row count of {_MAX_GRID_ROWS}")

    parsed_rows = tuple(
        _parse_row_line(row_line, cols, _depth=_depth) for row_line in row_lines
    )
    return Grid(meta=meta, cols=tuple(cols), rows=parsed_rows)
# ---------------------------------------------------------------------------
# Scalar encoding helpers
# ---------------------------------------------------------------------------

# Kinds that encode to a fixed literal, checked in this order.
_SINGLETON_LITERALS = ((Marker, "M"), (Na, "NA"), (Remove, "R"))

# Characters escaped with a backslash inside a backtick-delimited URI.
_URI_ESCAPES = str.maketrans({"\\": "\\\\", "`": "\\`"})


def _encode(val: Any) -> str:
    """Encode any Haystack value as Zinc text.

    Dispatches on the runtime type of *val*. The order of the checks matters:
    ``bool`` is tested before the generic ``int``/``float`` fallback (``bool``
    is an ``int`` subclass) and ``datetime`` before ``date`` (``datetime`` is a
    ``date`` subclass).

    :raises TypeError: If *val* is of a type with no Zinc representation.
    """
    if val is None:
        return "N"
    if isinstance(val, bool):
        if val:
            return "T"
        return "F"
    for kind, literal in _SINGLETON_LITERALS:
        if isinstance(val, kind):
            return literal
    if isinstance(val, Number):
        return format_number(val)
    if isinstance(val, str):
        return _encode_str(val)
    if isinstance(val, Ref):
        return format_ref(val, zinc=True)
    if isinstance(val, Symbol):
        return f"^{val.val}"
    if isinstance(val, Uri):
        return _encode_uri(val)
    if isinstance(val, Coord):
        return f"C({val.lat},{val.lng})"
    if isinstance(val, XStr):
        return f'{val.type_name}("{escape_str(val.val)}")'
    if isinstance(val, datetime.datetime):
        return _encode_datetime(val)
    if isinstance(val, (datetime.date, datetime.time)):
        return val.isoformat()
    if isinstance(val, Grid):
        return _encode_nested_grid(val)
    if isinstance(val, list):
        return "[" + ", ".join(map(_encode, val)) + "]"
    if isinstance(val, dict):
        return _encode_dict(val)
    if isinstance(val, (int, float)):
        # Plain Python numbers are wrapped as a Number built from the float.
        return format_number(Number(float(val)))
    raise TypeError(f"Cannot encode {type(val).__name__} as Zinc")


def _encode_str(s: str) -> str:
    """Return *s* escaped and wrapped in double quotes."""
    return '"' + escape_str(s) + '"'


def _encode_uri(uri: Uri) -> str:
    """Return the URI wrapped in backticks, escaping backslashes and backticks."""
    return "`" + uri.val.translate(_URI_ESCAPES) + "`"


def _encode_datetime(dt: datetime.datetime) -> str:
    """Return the ISO form of *dt*, followed by its timezone name if one is known."""
    stamp = dt.isoformat()
    zone = tz_name(dt)
    return stamp if zone is None else f"{stamp} {zone}"


def _encode_dict(d: dict[str, Any]) -> str:
    """Return *d* as space-separated tags enclosed in braces."""
    return "{" + " ".join(_encode_tag(key, value) for key, value in d.items()) + "}"


def _encode_nested_grid(grid: Grid) -> str:
    """Return *grid* encoded on its own lines between ``<<`` and ``>>``."""
    return "<<\n" + encode_grid(grid) + "\n>>"


def _encode_tag(name: str, val: Any) -> str:
    """Encode a tag as ``name`` (marker) or ``name:value``."""
    return name if isinstance(val, Marker) else f"{name}:{_encode(val)}"


# ---------------------------------------------------------------------------
# Grid decoding helpers
# ---------------------------------------------------------------------------


def _scan_tag(line: str, pos: int) -> tuple[str, Any, int]:
    """Scan one ``name`` or ``name:value`` tag starting at *pos*.

    :returns: ``(name, value, new_pos)``. A tag without ``:`` gets ``MARKER``
        as its value. An empty *name* means no tag name could be scanned; the
        value is then meaningless.
    """
    name, pos = _scan_tag_name(line, pos)
    if not name:
        return name, None, pos
    if pos < len(line) and line[pos] == ":":
        value, pos = scan_val(line, pos + 1)
        return name, value, pos
    return name, MARKER, pos


def _parse_ver_line(line: str) -> dict[str, Any]:
    """Parse the ``ver:"3.0" tag1:val tag2`` metadata line.

    :returns: The grid metadata tags; the version string itself is discarded.
    :raises ValueError: If the line does not start with ``ver:``.
    """
    if not line.startswith("ver:"):
        raise ValueError(f"Zinc grid must start with 'ver:', got: {line!r}")

    # Position 4 is just past "ver:"; scan the version string and drop it.
    _version, pos = scan_str(line, 4)

    end = len(line)
    meta: dict[str, Any] = {}
    while pos < end:
        pos = skip_ws(line, pos)
        if pos >= end:
            break
        name, value, pos = _scan_tag(line, pos)
        if not name:
            # Nothing recognisable as a tag name: stop reading metadata.
            break
        meta[name] = value
    return meta


def _scan_col_meta(line: str, pos: int) -> tuple[dict[str, Any], int]:
    """Scan one column's metadata tags up to the next comma or end of line.

    :returns: ``(meta, new_pos)`` where *new_pos* is where scanning stopped.
    """
    end = len(line)
    meta: dict[str, Any] = {}
    while pos < end and line[pos] != ",":
        pos = skip_ws(line, pos)
        if pos >= end or line[pos] == ",":
            break
        if line[pos] == '"':
            # Bare display string → implicit dis tag
            display, pos = scan_str(line, pos)
            meta["dis"] = display
            continue
        name, value, pos = _scan_tag(line, pos)
        if not name:
            break
        meta[name] = value
    return meta, pos


def _parse_cols_line(line: str) -> list[Col]:
    """Parse the column definitions line into a list of ``Col`` objects."""
    end = len(line)
    cols: list[Col] = []
    pos = 0
    while pos < end:
        pos = skip_ws(line, pos)
        if pos >= end:
            break

        col_name, pos = _scan_tag_name(line, pos)
        if not col_name:
            break

        col_meta, pos = _scan_col_meta(line, pos)
        cols.append(Col(name=col_name, meta=col_meta))

        # Step over the comma separating this column from the next.
        if pos < end and line[pos] == ",":
            pos += 1
    return cols


def _parse_row_line(line: str, cols: list[Col], *, _depth: int = 0) -> dict[str, Any]:
    """Parse a data row line into a dict keyed by column names.

    Cells that are empty or scan as ``None`` are left out of the result.
    """
    end = len(line)
    last = len(cols) - 1
    row: dict[str, Any] = {}
    pos = 0
    for index, col in enumerate(cols):
        pos = skip_ws(line, pos)
        if pos >= end:
            break

        if line[pos] != ",":
            value, pos = scan_val(line, pos, _depth=_depth)
            if value is not None:
                row[col.name] = value
            # Consume the separator that follows the value, if any.
            pos = skip_ws(line, pos)
            if pos < end and line[pos] == ",":
                pos += 1
        elif index < last:
            # Empty cell: step over its comma (not done for the final column).
            pos += 1
    return row


_scan_tag_name = scan_tag_name