# Source code for hs_py.ontology.normalize

"""Normalization pipeline for compiling raw defs into a resolved namespace.

Implements a simplified version of the Haystack normalization pipeline:

1. Parse: convert Trio text to raw tag dicts
2. Resolve: create Def objects from parsed records
3. Taxonify: compute conjunct supertypes
4. Inherit: propagate tags down the taxonomy tree
5. Validate: check for missing references and cycles

See: https://project-haystack.org/doc/docHaystack/Normalization
"""

from __future__ import annotations

from collections import deque

from hs_py.kinds import Symbol
from hs_py.ontology.defs import Def, Lib
from hs_py.ontology.namespace import Namespace
from hs_py.ontology.taxonomy import is_conjunct, resolve_conjunct_parts

__all__ = [
    "NormalizeError",
    "compile_namespace",
]


class NormalizeError(ValueError):
    """Raised when the normalization pipeline detects an invalid ontology.

    Subclasses :class:`ValueError` so callers that only care about generic
    input errors can still catch it.
    """
def compile_namespace(libs: list[Lib]) -> Namespace:
    """Run the normalization pipeline on a set of libs.

    :param libs: Libraries to compile.
    :returns: Fully resolved Namespace.
    :raises NormalizeError: If validation fails.
    """
    # Gather every def across all libs into one flat list.
    collected: list[Def] = [d for lib in libs for d in lib.defs]

    # Build the lookup index. The fully-qualified symbol value always wins
    # (last writer), while the short name is only claimed by the first def
    # that carries it.
    index: dict[str, Def] = {}
    for d in collected:
        index[d.symbol.val] = d
        if d.name not in index:
            index[d.name] = d

    # Taxonify: add conjunct supertypes, then rebuild libs with the
    # (possibly replaced) defs and wrap everything in a Namespace.
    collected = _taxonify(collected, index)
    ns = Namespace(_rebuild_libs(libs, collected))

    # Final sanity pass: missing supertypes and is-hierarchy cycles.
    _validate(ns)
    return ns
def _taxonify(defs: list[Def], by_name: dict[str, Def]) -> list[Def]:
    """Compute conjunct supertypes for compound terms.

    For a conjunct like ``hot-water``, its ``is`` tag should include the
    individual parts (``hot``, ``water``) as supertypes if they exist as
    defs.
    """
    result: list[Def] = []
    for definition in defs:
        sym = definition.symbol.val
        if not is_conjunct(sym):
            result.append(definition)
            continue

        # Start from the declared supertypes and append each conjunct part
        # that resolves to a known def and is not already listed.
        supers = list(definition.is_list)
        seen = {s.val for s in supers}
        for part in resolve_conjunct_parts(sym):
            if part in by_name and part not in seen:
                seen.add(part)
                supers.append(Symbol(part))

        if len(supers) == len(definition.is_list):
            # Nothing new was added; keep the original Def untouched.
            result.append(definition)
        else:
            tags = dict(definition.tags)
            # NOTE(review): a single supertype is stored as a bare Symbol,
            # multiple as a list — presumably `is_list` normalizes both
            # encodings; verify against Def.is_list.
            tags["is"] = supers if len(supers) > 1 else supers[0]
            result.append(Def(symbol=definition.symbol, tags=tags))
    return result


def _rebuild_libs(original_libs: list[Lib], all_defs: list[Def]) -> list[Lib]:
    """Rebuild libs using potentially updated defs."""
    # Map symbol value -> (possibly taxonified) def.
    replacements: dict[str, Def] = {d.symbol.val: d for d in all_defs}
    return [
        Lib(
            symbol=lib.symbol,
            version=lib.version,
            depends=lib.depends,
            base_uri=lib.base_uri,
            defs=tuple(
                replacements.get(d.symbol.val, d) for d in lib.defs
            ),
        )
        for lib in original_libs
    ]


def _validate(ns: Namespace) -> None:
    """Validate the namespace for common errors.

    :raises NormalizeError: listing every dangling supertype reference and
        every is-hierarchy cycle found.
    """
    problems: list[str] = []

    # Pass 1: every declared supertype must resolve to a def.
    for d in ns.all_defs():
        for parent in d.is_list:
            if not ns.has(parent.val):
                problems.append(
                    f"{d.symbol.val}: supertype {parent.val!r} not found"
                )

    # Pass 2: following is-edges must never return to the starting def.
    for d in ns.all_defs():
        if _has_cycle(ns, d.symbol.val):
            problems.append(f"{d.symbol.val}: cycle detected in is-hierarchy")

    if problems:
        detail = "\n".join(f"  - {p}" for p in problems)
        raise NormalizeError("Normalization errors:\n" + detail)


def _has_cycle(ns: Namespace, start: str) -> bool:
    """Check if following the is-chain from *start* leads back to *start*."""
    seen: set[str] = set()
    pending = deque([start])
    while pending:
        name = pending.popleft()
        if name in seen:
            # Re-reaching the start node means the is-chain loops; any
            # other repeat is just a diamond in the taxonomy.
            if name == start:
                return True
            continue
        seen.add(name)
        node = ns.get(name)
        if node is None:
            # Dangling reference — reported separately by _validate.
            continue
        pending.extend(parent.val for parent in node.is_list)
    return False