"""Trend Log recording engine per ASHRAE 135-2020 Clause 12.25.
The :class:`TrendLogEngine` follows the same async lifecycle pattern as
:class:`EventEngine`. It manages polled, triggered, and COV-based
recording for all :class:`TrendLogObject` instances in the object database.
COV-based logging (Clause 12.25.13) uses property-change callbacks on
local objects to record values when the monitored property changes.
"""
from __future__ import annotations
import asyncio
import contextlib
import datetime
import logging
import time
from typing import TYPE_CHECKING, Any
from bac_py.types.constructed import BACnetDateTime, BACnetLogRecord
from bac_py.types.enums import LoggingType, ObjectType, PropertyIdentifier
from bac_py.types.primitives import BACnetDate, BACnetTime
if TYPE_CHECKING:
from bac_py.app.application import BACnetApplication
from bac_py.objects.trendlog import TrendLogObject
logger = logging.getLogger(__name__)
def _now_datetime() -> BACnetDateTime:
"""Create a BACnetDateTime from the current wall-clock time."""
n = datetime.datetime.now()
return BACnetDateTime(
date=BACnetDate(n.year, n.month, n.day, n.isoweekday()),
time=BACnetTime(n.hour, n.minute, n.second, n.microsecond // 10000),
)
def _datetime_to_float(dt: BACnetDateTime) -> float:
"""Convert a BACnetDateTime to a POSIX-ish float for comparison."""
try:
d = dt.date
t = dt.time
py_dt = datetime.datetime(
d.year if d.year != 0xFF else 2000,
d.month if d.month not in (0xFF, 13, 14) else 1,
d.day if d.day not in (0xFF, 32, 33, 34) else 1,
t.hour if t.hour != 0xFF else 0,
t.minute if t.minute != 0xFF else 0,
t.second if t.second != 0xFF else 0,
)
return py_dt.timestamp()
except (ValueError, OSError):
return 0.0
[docs]
class TrendLogEngine:
"""Async engine that drives polled, triggered, and COV-based trend log recording."""
def __init__(
self,
app: BACnetApplication,
*,
scan_interval: float = 1.0,
) -> None:
self._app = app
self._scan_interval = scan_interval
self._task: asyncio.Task[None] | None = None
# Track last poll time per TrendLog OID (monotonic seconds)
self._last_poll: dict[Any, float] = {}
# Track which TrendLog OIDs have active COV subscriptions
self._cov_subscriptions: dict[Any, bool] = {}
# --- Lifecycle ---
[docs]
async def start(self) -> None:
"""Start the periodic recording loop."""
if self._task is not None:
return
logger.info("TrendLogEngine started")
self._task = asyncio.create_task(self._run_loop())
[docs]
async def stop(self) -> None:
"""Stop the recording loop and clean up COV subscriptions."""
if self._task is not None:
self._task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await self._task
self._task = None
self._last_poll.clear()
self._unregister_all_cov()
logger.info("TrendLogEngine stopped")
# --- Main loop ---
async def _run_loop(self) -> None:
"""Periodically check all TrendLog objects."""
try:
while True:
self._evaluate_cycle()
await asyncio.sleep(self._scan_interval)
except asyncio.CancelledError:
return
def _evaluate_cycle(self) -> None:
"""Run one evaluation cycle across all TrendLog objects."""
db = self._app.object_db
now_mono = time.monotonic()
for tl_obj in db.get_objects_of_type(ObjectType.TREND_LOG):
self._evaluate_trendlog(tl_obj, now_mono) # type: ignore[arg-type]
def _evaluate_trendlog(self, tl: TrendLogObject, now_mono: float) -> None:
"""Evaluate a single TrendLog object."""
oid = tl.object_identifier
log_enable = tl.read_property(PropertyIdentifier.LOG_ENABLE)
if not log_enable:
# If disabled, unsubscribe any COV callback
if oid in self._cov_subscriptions:
self._unregister_cov(tl)
return
# Check start_time / stop_time window
if not self._within_time_window(tl):
# Outside time window, unsubscribe COV
if oid in self._cov_subscriptions:
self._unregister_cov(tl)
return
logging_type = tl.read_property(PropertyIdentifier.LOGGING_TYPE)
if logging_type == LoggingType.POLLED:
# If switching away from COV, clean up
if oid in self._cov_subscriptions:
self._unregister_cov(tl)
self._handle_polled(tl, now_mono)
elif logging_type == LoggingType.TRIGGERED:
if oid in self._cov_subscriptions:
self._unregister_cov(tl)
self._handle_triggered(tl)
elif logging_type == LoggingType.COV:
self._handle_cov(tl)
def _within_time_window(self, tl: TrendLogObject) -> bool:
"""Check if we're within the TrendLog's start/stop time window."""
now_ts = datetime.datetime.now().timestamp()
start_time = tl._properties.get(PropertyIdentifier.START_TIME)
if start_time is not None and _datetime_to_float(start_time) > now_ts:
return False
stop_time = tl._properties.get(PropertyIdentifier.STOP_TIME)
return not (stop_time is not None and _datetime_to_float(stop_time) < now_ts)
def _handle_polled(self, tl: TrendLogObject, now_mono: float) -> None:
"""Handle polled logging (Clause 12.25.12)."""
log_interval = tl._properties.get(PropertyIdentifier.LOG_INTERVAL, 0)
if log_interval <= 0:
return
# log_interval is in centiseconds per spec
interval_secs = log_interval / 100.0
oid = tl.object_identifier
last = self._last_poll.get(oid, 0.0)
# Handle align_intervals
align = tl._properties.get(PropertyIdentifier.ALIGN_INTERVALS, False)
offset = tl._properties.get(PropertyIdentifier.INTERVAL_OFFSET, 0)
offset_secs = offset / 100.0
if align and interval_secs > 0:
# Align to wall-clock boundaries
now_wall = datetime.datetime.now().timestamp()
# Seconds since midnight
midnight = (
datetime.datetime.now()
.replace(hour=0, minute=0, second=0, microsecond=0)
.timestamp()
)
elapsed = now_wall - midnight + offset_secs
# Check if we've crossed an interval boundary since last poll
current_slot = int(elapsed / interval_secs)
last_wall = self._last_poll.get(oid)
if last_wall is not None:
last_elapsed = (now_wall - (now_mono - last_wall)) - midnight + offset_secs
last_slot = int(last_elapsed / interval_secs)
if current_slot <= last_slot:
return
else:
if now_mono - last < interval_secs:
return
self._last_poll[oid] = now_mono
self._record_value(tl)
def _handle_triggered(self, tl: TrendLogObject) -> None:
"""Handle triggered logging (Clause 12.25.14)."""
trigger = tl._properties.get(PropertyIdentifier.TRIGGER, False)
if trigger:
tl._properties[PropertyIdentifier.TRIGGER] = False
self._record_value(tl)
def _handle_cov(self, tl: TrendLogObject) -> None:
"""Handle COV-based logging (Clause 12.25.13).
Registers a property-change callback on the monitored object
so that values are recorded whenever the property changes.
"""
oid = tl.object_identifier
if oid in self._cov_subscriptions:
return # Already subscribed
db = self._app.object_db
ref = tl._properties.get(PropertyIdentifier.LOG_DEVICE_OBJECT_PROPERTY)
if ref is None:
return
target = db.get(ref.object_identifier)
if target is None:
return
prop_id = PropertyIdentifier(ref.property_identifier)
def _on_change(_prop_id: PropertyIdentifier, _old: Any, new_value: Any) -> None:
"""Record value when the monitored property changes."""
if not tl.read_property(PropertyIdentifier.LOG_ENABLE):
return
if not self._within_time_window(tl):
return
status_flags = None
with contextlib.suppress(Exception):
status_flags = target.read_property(PropertyIdentifier.STATUS_FLAGS)
record = BACnetLogRecord(
timestamp=_now_datetime(),
log_datum=new_value,
status_flags=status_flags,
)
tl.append_record(record)
# Store the callback reference for cleanup
tl._cov_callback = _on_change # type: ignore[attr-defined]
db.register_change_callback(ref.object_identifier, prop_id, _on_change)
self._cov_subscriptions[oid] = True
def _unregister_cov(self, tl: TrendLogObject) -> None:
"""Remove COV subscription for a single TrendLog."""
oid = tl.object_identifier
if oid not in self._cov_subscriptions:
return
db = self._app.object_db
ref = tl._properties.get(PropertyIdentifier.LOG_DEVICE_OBJECT_PROPERTY)
callback = getattr(tl, "_cov_callback", None)
if ref is not None and callback is not None:
prop_id = PropertyIdentifier(ref.property_identifier)
db.unregister_change_callback(ref.object_identifier, prop_id, callback)
del self._cov_subscriptions[oid]
def _unregister_all_cov(self) -> None:
"""Remove all COV subscriptions."""
db = self._app.object_db
for tl_obj in db.get_objects_of_type(ObjectType.TREND_LOG):
self._unregister_cov(tl_obj) # type: ignore[arg-type]
self._cov_subscriptions.clear()
def _record_value(self, tl: TrendLogObject) -> None:
"""Read the monitored property and append a log record."""
db = self._app.object_db
ref = tl._properties.get(PropertyIdentifier.LOG_DEVICE_OBJECT_PROPERTY)
if ref is None:
return
# Read the monitored property
target = db.get(ref.object_identifier)
if target is None:
logger.warning(
"TrendLog %s: monitored object %s not found",
tl.object_identifier,
ref.object_identifier,
)
return
try:
prop_id = PropertyIdentifier(ref.property_identifier)
value: Any = target.read_property(
prop_id,
array_index=ref.property_array_index,
)
except Exception:
logger.warning(
"TrendLog %s: failed to read %s.%s",
tl.object_identifier,
ref.object_identifier,
ref.property_identifier,
exc_info=True,
)
return
# Read status flags if available
status_flags = None
with contextlib.suppress(Exception):
status_flags = target.read_property(PropertyIdentifier.STATUS_FLAGS)
record = BACnetLogRecord(
timestamp=_now_datetime(),
log_datum=value,
status_flags=status_flags,
)
logger.debug("trend sample %s: value=%s", tl.object_identifier, value)
tl.append_record(record)