Source code for bac_py.app.event_engine

"""Event state machine, algorithm evaluators, and async engine per ASHRAE 135-2020 Clause 13.

The :class:`EventStateMachine` implements the state transition logic of
Clause 13.2.  Each ``evaluate_*`` function implements one of the 18 event
algorithm evaluators defined in Clause 13.3.

The :class:`EventEngine` is the async integration layer that periodically
evaluates ``EventEnrollment`` objects and intrinsic-reporting objects,
drives the state machines, and dispatches ``EventNotificationRequest``
PDUs on state transitions.

The state machine and evaluators are **pure logic** -- no async, no I/O,
no side effects.  The ``EventEngine`` provides the async scheduling and
notification dispatch wrapper.
"""

from __future__ import annotations

import asyncio
import contextlib
import logging
import time
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any

from bac_py.types.enums import (
    EventState,
    EventType,
    NotifyType,
    ObjectType,
    PropertyIdentifier,
    Reliability,
)

if TYPE_CHECKING:
    from collections.abc import Callable

    from bac_py.app.application import BACnetApplication
    from bac_py.objects.base import BACnetObject
    from bac_py.types.enums import LifeSafetyState, TimerState

logger = logging.getLogger(__name__)


# ---------------------------------------------------------------------------
# EventTransition -- result of a state machine evaluation
# ---------------------------------------------------------------------------


[docs] @dataclass(frozen=True, slots=True) class EventTransition: """Result of a state-machine evaluation that triggered a transition. :param from_state: The state before the transition. :param to_state: The state after the transition. :param timestamp: Monotonic time when the transition fired. """ from_state: EventState to_state: EventState timestamp: float
# --------------------------------------------------------------------------- # EventStateMachine -- Clause 13.2 # ---------------------------------------------------------------------------
[docs] @dataclass(slots=True) class EventStateMachine: """Per-enrollment event state machine (Clause 13.2). Tracks state, timestamps, acknowledgments, and time-delay logic. Call :meth:`evaluate` each scan cycle with the event algorithm result and the fault algorithm result. Returns an :class:`EventTransition` when a state change fires, or ``None`` when no change occurs. :param event_state: Current event state. :param event_enable: Three-element list ``[to_offnormal, to_fault, to_normal]``. :param acked_transitions: Three-element list ``[to_offnormal, to_fault, to_normal]`` indicating which transitions have been acknowledged. :param time_delay: Seconds the event condition must persist before transitioning to an alarm state. :param time_delay_normal: Seconds the normal condition must persist before returning to ``NORMAL``. Defaults to *time_delay* when ``None``. """ event_state: EventState = EventState.NORMAL event_enable: list[bool] = field(default_factory=lambda: [True, True, True]) acked_transitions: list[bool] = field(default_factory=lambda: [True, True, True]) time_delay: float = 0.0 time_delay_normal: float | None = None # Internal: monotonic time when the pending condition was first detected. _pending_state: EventState | None = field(default=None, repr=False) _pending_since: float | None = field(default=None, repr=False) @property def effective_time_delay_normal(self) -> float: """Return the effective time-delay-normal value.""" return self.time_delay_normal if self.time_delay_normal is not None else self.time_delay
[docs] def evaluate( self, event_result: EventState | None, fault_result: Reliability, current_time: float, ) -> EventTransition | None: """Evaluate one scan cycle and return a transition if one fires. :param event_result: Target :class:`EventState` from the event algorithm, or ``None`` if no alarm condition is detected. :param fault_result: :class:`Reliability` from the fault algorithm. Any value other than ``NO_FAULT_DETECTED`` indicates a fault. :param current_time: Monotonic clock value (seconds). :returns: An :class:`EventTransition` if a state change fires, ``None`` otherwise. """ has_fault = fault_result != Reliability.NO_FAULT_DETECTED # --- FAULT transitions take priority (Clause 13.2.5) --- if has_fault and self.event_state != EventState.FAULT: if self.event_enable[1]: # to-fault enabled return self._transition(EventState.FAULT, current_time) return None # --- Staying in FAULT while still faulted --- if has_fault and self.event_state == EventState.FAULT: return None # --- Clearing from FAULT --- if self.event_state == EventState.FAULT and not has_fault: # Determine return state target = event_result if event_result is not None else EventState.NORMAL if target == EventState.NORMAL: if self.event_enable[2]: # to-normal enabled return self._apply_delay(target, current_time, normal=True) else: if self.event_enable[0]: # to-offnormal enabled return self._apply_delay(target, current_time, normal=False) # If transition disabled, stay in FAULT if self._pending_state != target: self._pending_state = None self._pending_since = None return None # --- NORMAL -> alarm transitions --- if self.event_state == EventState.NORMAL: if ( event_result is not None and event_result != EventState.NORMAL and self.event_enable[0] # to-offnormal enabled ): return self._apply_delay(event_result, current_time, normal=False) # Condition cleared while pending self._pending_state = None self._pending_since = None return None # --- Alarm -> NORMAL transitions --- if event_result is None or event_result == EventState.NORMAL: if self.event_enable[2]: # to-normal enabled return self._apply_delay(EventState.NORMAL, current_time, normal=True) self._pending_state = None self._pending_since = None return None # --- Alarm -> different alarm (e.g., HIGH_LIMIT -> LOW_LIMIT) --- if event_result != self.event_state and self.event_enable[0]: # to-offnormal return self._apply_delay(event_result, current_time, normal=False) # Staying in same alarm state -- clear any pending self._pending_state = None self._pending_since = None return None
def _apply_delay( self, target: EventState, current_time: float, *, normal: bool, ) -> EventTransition | None: """Apply time-delay logic and return a transition if the delay has elapsed.""" delay = self.effective_time_delay_normal if normal else self.time_delay if self._pending_state != target: # New condition -- start timing self._pending_state = target self._pending_since = current_time if delay <= 0: return self._transition(target, current_time) return None # Same condition still active -- check if delay elapsed assert self._pending_since is not None if current_time - self._pending_since >= delay: return self._transition(target, current_time) return None def _transition(self, target: EventState, current_time: float) -> EventTransition: """Execute a state transition and update internal bookkeeping.""" old = self.event_state self.event_state = target self._pending_state = None self._pending_since = None # Update acked_transitions: mark the relevant transition as unacknowledged if target == EventState.FAULT: self.acked_transitions[1] = False elif target == EventState.NORMAL: self.acked_transitions[2] = False else: self.acked_transitions[0] = False return EventTransition(from_state=old, to_state=target, timestamp=current_time)
# --------------------------------------------------------------------------- # Event Algorithm Evaluators -- Clause 13.3 # --------------------------------------------------------------------------- # # Each evaluator is a pure function returning the target EventState if an # alarm condition is detected, or None if the monitored value is normal. # # Group A -- Threshold-based # ---------------------------------------------------------------------------
[docs] def evaluate_out_of_range( value: float, high_limit: float, low_limit: float, deadband: float, *, current_state: EventState = EventState.NORMAL, ) -> EventState | None: """Evaluate OUT_OF_RANGE (Clause 13.3.6). :param value: Current monitored real value. :param high_limit: High-limit threshold. :param low_limit: Low-limit threshold. :param deadband: Hysteresis value for returning to normal. :param current_state: The current event state (for deadband logic). :returns: Target :class:`EventState` or ``None``. """ if value > high_limit: return EventState.HIGH_LIMIT if value < low_limit: return EventState.LOW_LIMIT # Deadband: must drop below (high_limit - deadband) to go normal from HIGH_LIMIT if current_state == EventState.HIGH_LIMIT and value >= high_limit - deadband: return EventState.HIGH_LIMIT if current_state == EventState.LOW_LIMIT and value <= low_limit + deadband: return EventState.LOW_LIMIT return None
[docs] def evaluate_double_out_of_range( value: float, high_limit: float, low_limit: float, deadband: float, *, current_state: EventState = EventState.NORMAL, ) -> EventState | None: """Evaluate DOUBLE_OUT_OF_RANGE (Clause 13.3.14). Same logic as OUT_OF_RANGE but for Double precision values. """ return evaluate_out_of_range( value, high_limit, low_limit, deadband, current_state=current_state )
[docs] def evaluate_signed_out_of_range( value: int, high_limit: int, low_limit: int, deadband: int, *, current_state: EventState = EventState.NORMAL, ) -> EventState | None: """Evaluate SIGNED_OUT_OF_RANGE (Clause 13.3.15). Same logic as OUT_OF_RANGE but for Signed integer values. """ return evaluate_out_of_range( value, high_limit, low_limit, deadband, current_state=current_state )
[docs] def evaluate_unsigned_out_of_range( value: int, high_limit: int, low_limit: int, deadband: int, *, current_state: EventState = EventState.NORMAL, ) -> EventState | None: """Evaluate UNSIGNED_OUT_OF_RANGE (Clause 13.3.16). Same logic as OUT_OF_RANGE but for Unsigned integer values. """ return evaluate_out_of_range( value, high_limit, low_limit, deadband, current_state=current_state )
[docs] def evaluate_unsigned_range( value: int, high_limit: int, low_limit: int, ) -> EventState | None: """Evaluate UNSIGNED_RANGE (Clause 13.3.11). Simpler variant with no deadband. :param value: Current monitored unsigned value. :param high_limit: High-limit threshold. :param low_limit: Low-limit threshold. :returns: Target :class:`EventState` or ``None``. """ if value > high_limit: return EventState.HIGH_LIMIT if value < low_limit: return EventState.LOW_LIMIT return None
[docs] def evaluate_floating_limit( value: float, setpoint: float, high_diff_limit: float, low_diff_limit: float, deadband: float, *, current_state: EventState = EventState.NORMAL, ) -> EventState | None: """Evaluate FLOATING_LIMIT (Clause 13.3.5). Limits are relative to *setpoint*: ``setpoint + high_diff_limit`` and ``setpoint - low_diff_limit``. :param value: Current monitored real value. :param setpoint: Reference setpoint. :param high_diff_limit: Positive offset above setpoint for high limit. :param low_diff_limit: Positive offset below setpoint for low limit. :param deadband: Hysteresis value. :param current_state: Current event state for deadband logic. :returns: Target :class:`EventState` or ``None``. """ high_limit = setpoint + high_diff_limit low_limit = setpoint - low_diff_limit return evaluate_out_of_range( value, high_limit, low_limit, deadband, current_state=current_state )
# --------------------------------------------------------------------------- # Group B -- Set membership # ---------------------------------------------------------------------------
[docs] def evaluate_change_of_state( value: int, alarm_values: tuple[int, ...], ) -> EventState | None: """Evaluate CHANGE_OF_STATE (Clause 13.3.2). :param value: Current enumerated property value (as int). :param alarm_values: Tuple of enumerated values that trigger OFFNORMAL. :returns: ``OFFNORMAL`` if *value* is in *alarm_values*, else ``None``. """ if value in alarm_values: return EventState.OFFNORMAL return None
[docs] def evaluate_change_of_bitstring( value: tuple[int, ...], bitmask: tuple[int, ...], alarm_values: tuple[tuple[int, ...], ...], ) -> EventState | None: """Evaluate CHANGE_OF_BITSTRING (Clause 13.3.1). Applies *bitmask* to *value* and checks if the masked result matches any entry in *alarm_values*. :param value: Current bitstring as tuple of bit values (0/1). :param bitmask: Bitmask to AND with *value*. :param alarm_values: Set of masked bitstring values that trigger OFFNORMAL. :returns: ``OFFNORMAL`` if masked value matches any alarm value, else ``None``. """ masked = tuple(v & m for v, m in zip(value, bitmask, strict=False)) if masked in alarm_values: return EventState.OFFNORMAL return None
[docs] def evaluate_change_of_life_safety( tracking_value: LifeSafetyState, mode: int, alarm_values: tuple[int, ...], life_safety_alarm_values: tuple[int, ...], ) -> EventState | None: """Evaluate CHANGE_OF_LIFE_SAFETY (Clause 13.3.8). :param tracking_value: Current life-safety state. :param mode: Current life-safety mode (as int). :param alarm_values: States triggering OFFNORMAL. :param life_safety_alarm_values: States triggering LIFE_SAFETY_ALARM. :returns: Target state or ``None``. """ _ = mode # Mode filtering is caller's responsibility val = int(tracking_value) if val in life_safety_alarm_values: return EventState.LIFE_SAFETY_ALARM if val in alarm_values: return EventState.OFFNORMAL return None
[docs] def evaluate_change_of_characterstring( value: str, alarm_values: tuple[str, ...], ) -> EventState | None: """Evaluate CHANGE_OF_CHARACTERSTRING (Clause 13.3.17). :param value: Current character string value. :param alarm_values: Strings that trigger OFFNORMAL. :returns: ``OFFNORMAL`` if *value* is in *alarm_values*, else ``None``. """ if value in alarm_values: return EventState.OFFNORMAL return None
[docs] def evaluate_access_event( access_event: int, access_event_list: tuple[int, ...], ) -> EventState | None: """Evaluate ACCESS_EVENT (Clause 13.3.13). :param access_event: Current access event value (as int). :param access_event_list: Events that trigger OFFNORMAL. :returns: ``OFFNORMAL`` if *access_event* is in the list, else ``None``. """ if access_event in access_event_list: return EventState.OFFNORMAL return None
# --------------------------------------------------------------------------- # Group C -- Change detection # ---------------------------------------------------------------------------
[docs] def evaluate_change_of_value( value: float, previous_value: float, cov_increment: float, ) -> EventState | None: """Evaluate CHANGE_OF_VALUE (Clause 13.3.3). Triggers OFFNORMAL when the absolute change since the last reported value exceeds *cov_increment*. :param value: Current monitored value. :param previous_value: Value at last notification. :param cov_increment: Minimum change to trigger. :returns: ``OFFNORMAL`` if change exceeds increment, else ``None``. """ if abs(value - previous_value) >= cov_increment: return EventState.OFFNORMAL return None
[docs] def evaluate_change_of_status_flags( current_flags: tuple[bool, ...], previous_flags: tuple[bool, ...], selected_flags: tuple[bool, ...], ) -> EventState | None: """Evaluate CHANGE_OF_STATUS_FLAGS (Clause 13.3.18). Triggers OFFNORMAL when any selected flag has changed from its previous value. :param current_flags: Current status flags (in_alarm, fault, overridden, out_of_service). :param previous_flags: Previous status flags at last notification. :param selected_flags: Which flags to monitor (True = monitor). :returns: ``OFFNORMAL`` if any selected flag changed, else ``None``. """ for cur, prev, sel in zip(current_flags, previous_flags, selected_flags, strict=False): if sel and cur != prev: return EventState.OFFNORMAL return None
[docs] def evaluate_change_of_reliability( reliability: Reliability, ) -> EventState | None: """Evaluate CHANGE_OF_RELIABILITY (Clause 13.3.19). :param reliability: Current reliability value. :returns: ``OFFNORMAL`` if reliability is not ``NO_FAULT_DETECTED``, else ``None``. """ if reliability != Reliability.NO_FAULT_DETECTED: return EventState.OFFNORMAL return None
[docs] def evaluate_command_failure( feedback_value: Any, command_value: Any, ) -> EventState | None: """Evaluate COMMAND_FAILURE (Clause 13.3.4). Triggers OFFNORMAL when the feedback value does not match the commanded value (the time-delay enforcement is handled by the state machine, not this evaluator). :param feedback_value: Current feedback property value. :param command_value: Most recent commanded value. :returns: ``OFFNORMAL`` if feedback != command, else ``None``. """ if feedback_value != command_value: return EventState.OFFNORMAL return None
# --------------------------------------------------------------------------- # Group D -- Specialized # ---------------------------------------------------------------------------
[docs] def evaluate_buffer_ready( current_count: int, previous_count: int, notification_threshold: int, ) -> EventState | None: """Evaluate BUFFER_READY (Clause 13.3.10). Triggers OFFNORMAL when the number of new records since the last notification meets or exceeds the threshold. :param current_count: Current record count. :param previous_count: Record count at last notification. :param notification_threshold: Minimum new records to trigger. :returns: ``OFFNORMAL`` if threshold met, else ``None``. """ if current_count - previous_count >= notification_threshold: return EventState.OFFNORMAL return None
[docs] def evaluate_extended( monitored_value: Any, params: Any, *, vendor_callback: Callable[[Any, Any], EventState | None] | None = None, ) -> EventState | None: """Evaluate EXTENDED (Clause 13.3.9). Vendor-specific algorithm. Delegates to *vendor_callback* if provided. :param monitored_value: Current property value. :param params: Vendor-specific parameters. :param vendor_callback: Optional callable implementing vendor logic. :returns: Target state from callback, or ``None``. """ if vendor_callback is not None: return vendor_callback(monitored_value, params) return None
[docs] def evaluate_change_of_timer( timer_state: TimerState, alarm_values: tuple[int, ...], ) -> EventState | None: """Evaluate CHANGE_OF_TIMER (Clause 13.3.20, new in 2020). :param timer_state: Current timer state. :param alarm_values: Timer state values (as int) that trigger OFFNORMAL. :returns: ``OFFNORMAL`` if current state is in alarm values, else ``None``. """ if int(timer_state) in alarm_values: return EventState.OFFNORMAL return None
[docs] def evaluate_change_of_discrete_value( current_value: Any, previous_value: Any, ) -> EventState | None: """Evaluate CHANGE_OF_DISCRETE_VALUE (Clause 13.3.21, new in 2020). Fires OFFNORMAL when the monitored property's discrete value changes. Applies to Integer, Unsigned, Large-Analog, and other discrete-valued objects. :param current_value: Current monitored property value. :param previous_value: Previous monitored property value. :returns: ``OFFNORMAL`` if values differ, else ``None``. """ if current_value != previous_value: return EventState.OFFNORMAL return None
# --------------------------------------------------------------------------- # EventEngine -- Async integration layer (Clause 13) # --------------------------------------------------------------------------- class _EnrollmentContext: """Per-enrollment tracking state for the EventEngine.""" __slots__ = ("last_reliability", "state_machine") def __init__(self) -> None: self.state_machine = EventStateMachine() self.last_reliability: Reliability = Reliability.NO_FAULT_DETECTED
[docs] class EventEngine: """Async event/alarm evaluation engine per Clause 13. Mirrors the ``COVManager`` lifecycle pattern: - Constructed with a reference to :class:`BACnetApplication`. - :meth:`start` launches the periodic evaluation loop. - :meth:`stop` cancels the loop and cleans up. Each evaluation cycle iterates ``EventEnrollment`` objects and intrinsic-reporting objects in the object database, runs fault algorithms (Clause 13.4) then event algorithms (Clause 13.3), feeds results to per-enrollment :class:`EventStateMachine` instances, and dispatches ``EventNotificationRequest`` PDUs on transitions. """ def __init__( self, app: BACnetApplication, *, scan_interval: float = 1.0, ) -> None: self._app = app self._scan_interval = scan_interval self._task: asyncio.Task[None] | None = None # Keyed by (object_type, instance_number) for both enrollment and intrinsic self._contexts: dict[tuple[int, int], _EnrollmentContext] = {} # Track fire-and-forget confirmed notification tasks to prevent GC self._pending_confirmed_tasks: set[asyncio.Task[None]] = set() # --- Lifecycle ---
[docs] async def start(self) -> None: """Start the periodic evaluation loop.""" if self._task is not None: return self._task = asyncio.create_task(self._run_loop())
[docs] async def stop(self) -> None: """Stop the evaluation loop and clean up.""" if self._task is not None: self._task.cancel() with contextlib.suppress(asyncio.CancelledError): await self._task self._task = None # Cancel pending confirmed notification tasks to release memory for task in self._pending_confirmed_tasks: if not task.done(): task.cancel() self._pending_confirmed_tasks.clear() self._contexts.clear()
# --- Main loop --- async def _run_loop(self) -> None: """Periodically evaluate all enrollments and intrinsic objects.""" try: while True: self._evaluate_cycle() await asyncio.sleep(self._scan_interval) except asyncio.CancelledError: return def _evaluate_cycle(self) -> None: """Run one evaluation cycle across all monitored objects.""" now = time.monotonic() db = self._app.object_db # 1. Evaluate EventEnrollment objects for obj in db.get_objects_of_type(ObjectType.EVENT_ENROLLMENT): self._evaluate_enrollment(obj, now) # 2. Evaluate intrinsic-reporting objects for bac_obj in db.values(): if bac_obj.INTRINSIC_EVENT_ALGORITHM is not None: self._evaluate_intrinsic(bac_obj, now) # --- Enrollment-based evaluation --- def _evaluate_enrollment(self, enrollment: BACnetObject, now: float) -> None: """Evaluate a single EventEnrollment object.""" logger.debug("evaluating EventEnrollment for %s", enrollment.object_identifier) # Check event_detection_enable detection_enable = self._read_prop(enrollment, PropertyIdentifier.EVENT_DETECTION_ENABLE) if detection_enable is False: return # Get or create context key = ( int(enrollment.object_identifier.object_type), enrollment.object_identifier.instance_number, ) ctx = self._contexts.get(key) if ctx is None: ctx = _EnrollmentContext() self._contexts[key] = ctx # Sync state machine settings each cycle in case they changed self._sync_state_machine(ctx.state_machine, enrollment) # Read monitored property monitored_value = self._read_monitored_property(enrollment) if monitored_value is _SENTINEL: return # Cannot read; skip this cycle # Run fault evaluation fault_result = self._evaluate_enrollment_fault(enrollment, monitored_value) # Run event algorithm event_type = self._read_prop(enrollment, PropertyIdentifier.EVENT_TYPE) event_result = self._run_event_algorithm( event_type, monitored_value, enrollment, ctx.state_machine.event_state ) # Check Event_Algorithm_Inhibit (Clause 13.2.2, p.638) algorithm_inhibit = self._read_prop(enrollment, PropertyIdentifier.EVENT_ALGORITHM_INHIBIT) if algorithm_inhibit is True: event_result = EventState.NORMAL # Feed to state machine transition = ctx.state_machine.evaluate(event_result, fault_result, now) if transition is not None: logger.info( "event state transition %s: %s -> %s", enrollment.object_identifier, transition.from_state, transition.to_state, ) self._dispatch_notification(enrollment, transition, event_type, fault_result) def _read_monitored_property(self, enrollment: BACnetObject) -> Any: """Read the property referenced by an enrollment's Object_Property_Reference.""" ref = self._read_prop(enrollment, PropertyIdentifier.OBJECT_PROPERTY_REFERENCE) if ref is None: return _SENTINEL db = self._app.object_db target_obj = db.get(ref.object_identifier) if target_obj is None: return _SENTINEL prop_id = ref.property_identifier try: return target_obj.read_property(prop_id, ref.property_array_index) except Exception: logger.debug( "Failed to read monitored property %s on %s", prop_id, ref.object_identifier, exc_info=True, ) return _SENTINEL @staticmethod def _evaluate_enrollment_fault( enrollment: BACnetObject, monitored_value: Any, ) -> Reliability: """Run fault evaluation for an enrollment object.""" # Check Reliability_Evaluation_Inhibit (Clause 13.2.2, p.638) inhibit = enrollment._properties.get(PropertyIdentifier.RELIABILITY_EVALUATION_INHIBIT) if inhibit is True: return Reliability.NO_FAULT_DETECTED # Check reliability of the enrollment object itself reliability = enrollment._properties.get(PropertyIdentifier.RELIABILITY) if isinstance(reliability, Reliability) and reliability != Reliability.NO_FAULT_DETECTED: return reliability return Reliability.NO_FAULT_DETECTED # --- Intrinsic reporting evaluation --- def _evaluate_intrinsic(self, obj: BACnetObject, now: float) -> None: """Evaluate an intrinsic-reporting object.""" logger.debug("evaluating intrinsic reporting for %s", obj.object_identifier) # Check event_detection_enable if present detection_enable = obj._properties.get(PropertyIdentifier.EVENT_DETECTION_ENABLE) if detection_enable is False: return key = (int(obj.object_identifier.object_type), obj.object_identifier.instance_number) ctx = self._contexts.get(key) if ctx is None: ctx = _EnrollmentContext() self._contexts[key] = ctx self._sync_intrinsic_state_machine(ctx.state_machine, obj) # Read present value try: present_value = obj.read_property(PropertyIdentifier.PRESENT_VALUE) except Exception: return # Fault: check reliability property reliability = obj._properties.get( PropertyIdentifier.RELIABILITY, Reliability.NO_FAULT_DETECTED ) fault_result = reliability # Check Reliability_Evaluation_Inhibit (Clause 13.2.2, p.638) rel_inhibit = obj._properties.get(PropertyIdentifier.RELIABILITY_EVALUATION_INHIBIT) if rel_inhibit is True: fault_result = Reliability.NO_FAULT_DETECTED # Run intrinsic event algorithm event_type = obj.INTRINSIC_EVENT_ALGORITHM event_result = self._run_intrinsic_algorithm(event_type, present_value, obj, ctx) # Check Event_Algorithm_Inhibit (Clause 13.2.2, p.638) algorithm_inhibit = obj._properties.get(PropertyIdentifier.EVENT_ALGORITHM_INHIBIT) if algorithm_inhibit is True: event_result = EventState.NORMAL transition = ctx.state_machine.evaluate(event_result, fault_result, now) if transition is not None: logger.info( "event state transition %s: %s -> %s", obj.object_identifier, transition.from_state, transition.to_state, ) self._dispatch_intrinsic_notification(obj, transition, event_type, fault_result) def _run_intrinsic_algorithm( self, event_type: EventType | None, present_value: Any, obj: BACnetObject, ctx: _EnrollmentContext, ) -> EventState | None: """Run the intrinsic event algorithm for an object.""" if event_type == EventType.OUT_OF_RANGE: high_limit = obj._properties.get(PropertyIdentifier.HIGH_LIMIT) low_limit = obj._properties.get(PropertyIdentifier.LOW_LIMIT) deadband = obj._properties.get(PropertyIdentifier.DEADBAND, 0.0) if high_limit is None or low_limit is None: return None # Check limit_enable limit_enable = obj._properties.get(PropertyIdentifier.LIMIT_ENABLE) if limit_enable is not None: # limit_enable is a BitString or list: [high_limit_enable, low_limit_enable] bits = _limit_enable_bits(limit_enable) effective_high = high_limit if bits[0] else float("inf") effective_low = low_limit if bits[1] else float("-inf") else: effective_high = high_limit effective_low = low_limit return evaluate_out_of_range( float(present_value), effective_high, effective_low, float(deadband), current_state=ctx.state_machine.event_state, ) if event_type == EventType.CHANGE_OF_STATE: alarm_values = obj._properties.get(PropertyIdentifier.ALARM_VALUES) if alarm_values is None: # Try ALARM_VALUE (singular) for binary objects alarm_value = obj._properties.get(PropertyIdentifier.ALARM_VALUE) if alarm_value is None: return None alarm_values = (alarm_value,) if not isinstance(alarm_values, tuple): alarm_values = tuple(alarm_values) return evaluate_change_of_state(present_value, alarm_values) if event_type == EventType.UNSIGNED_RANGE: high_limit = obj._properties.get(PropertyIdentifier.HIGH_LIMIT) low_limit = obj._properties.get(PropertyIdentifier.LOW_LIMIT) if high_limit is None or low_limit is None: return None return evaluate_unsigned_range(int(present_value), int(high_limit), int(low_limit)) if event_type == EventType.FLOATING_LIMIT: setpoint = obj._properties.get(PropertyIdentifier.SETPOINT) error_limit = obj._properties.get(PropertyIdentifier.ERROR_LIMIT) deadband = obj._properties.get(PropertyIdentifier.DEADBAND, 0.0) if setpoint is None or error_limit is None: return None # Per Clause 13.3.5: for Loop, error_limit is used symmetrically # as both high_diff_limit and low_diff_limit return evaluate_floating_limit( float(present_value), float(setpoint), float(error_limit), float(error_limit), float(deadband), current_state=ctx.state_machine.event_state, ) if event_type == EventType.CHANGE_OF_LIFE_SAFETY: tracking_value = obj._properties.get(PropertyIdentifier.TRACKING_VALUE) if tracking_value is None: tracking_value = present_value mode = obj._properties.get(PropertyIdentifier.MODE, 0) alarm_values = obj._properties.get(PropertyIdentifier.ALARM_VALUES, ()) life_safety_alarm_values = obj._properties.get( PropertyIdentifier.LIFE_SAFETY_ALARM_VALUES, () ) alarm_values = tuple(int(v) for v in alarm_values) life_safety_alarm_values = obj._properties.get( PropertyIdentifier.LIFE_SAFETY_ALARM_VALUES, () ) life_safety_alarm_values = tuple(int(v) for v in life_safety_alarm_values) return evaluate_change_of_life_safety( tracking_value, int(mode), alarm_values, life_safety_alarm_values ) return None # --- Event algorithm dispatch --- def _run_event_algorithm( self, event_type: Any, monitored_value: Any, enrollment: BACnetObject, current_state: EventState, ) -> EventState | None: """Dispatch to the appropriate event algorithm evaluator.""" params = self._read_prop(enrollment, PropertyIdentifier.EVENT_PARAMETERS) if event_type == EventType.OUT_OF_RANGE and isinstance(params, dict): return evaluate_out_of_range( float(monitored_value), params.get("high_limit", float("inf")), params.get("low_limit", float("-inf")), params.get("deadband", 0.0), current_state=current_state, ) if event_type == EventType.CHANGE_OF_STATE and isinstance(params, dict): alarm_values = params.get("alarm_values", ()) if not isinstance(alarm_values, tuple): alarm_values = tuple(alarm_values) return evaluate_change_of_state(monitored_value, alarm_values) if event_type == EventType.CHANGE_OF_BITSTRING and isinstance(params, dict): bitmask = params.get("bitmask", ()) alarm_values = params.get("alarm_values", ()) if not isinstance(bitmask, tuple): bitmask = tuple(bitmask) if not isinstance(alarm_values, tuple): alarm_values = tuple( tuple(v) if not isinstance(v, tuple) else v for v in alarm_values ) return evaluate_change_of_bitstring(monitored_value, bitmask, alarm_values) if event_type == EventType.CHANGE_OF_VALUE and isinstance(params, dict): prev = params.get("previous_value", monitored_value) increment = params.get("cov_increment", 0.0) return evaluate_change_of_value(float(monitored_value), float(prev), float(increment)) if event_type == EventType.COMMAND_FAILURE and isinstance(params, dict): feedback = params.get("feedback_value") return evaluate_command_failure(feedback, monitored_value) if event_type == EventType.FLOATING_LIMIT and isinstance(params, dict): return evaluate_floating_limit( float(monitored_value), params.get("setpoint", 0.0), params.get("high_diff_limit", 0.0), params.get("low_diff_limit", 0.0), params.get("deadband", 0.0), current_state=current_state, ) if event_type == EventType.CHANGE_OF_LIFE_SAFETY and isinstance(params, dict): alarm_values = params.get("alarm_values", ()) life_safety_alarm_values = params.get("life_safety_alarm_values", ()) if not isinstance(alarm_values, tuple): alarm_values = tuple(alarm_values) if not isinstance(life_safety_alarm_values, tuple): life_safety_alarm_values = tuple(life_safety_alarm_values) mode = params.get("mode", 0) return evaluate_change_of_life_safety( monitored_value, mode, alarm_values, life_safety_alarm_values ) if event_type == EventType.EXTENDED and isinstance(params, dict): vendor_callback = params.get("vendor_callback") return evaluate_extended(monitored_value, params, vendor_callback=vendor_callback) if event_type == EventType.BUFFER_READY and isinstance(params, dict): previous_count = params.get("previous_count", 0) notification_threshold = params.get("notification_threshold", 1) return evaluate_buffer_ready( int(monitored_value), previous_count, notification_threshold ) if event_type == EventType.UNSIGNED_RANGE and isinstance(params, dict): return evaluate_unsigned_range( int(monitored_value), params.get("high_limit", 0), params.get("low_limit", 0), ) if event_type == EventType.ACCESS_EVENT and isinstance(params, dict): access_event_list = params.get("access_event_list", ()) if not isinstance(access_event_list, tuple): access_event_list = tuple(access_event_list) return evaluate_access_event(int(monitored_value), access_event_list) if event_type == EventType.DOUBLE_OUT_OF_RANGE and isinstance(params, dict): return evaluate_double_out_of_range( float(monitored_value), params.get("high_limit", float("inf")), params.get("low_limit", float("-inf")), params.get("deadband", 0.0), current_state=current_state, ) if event_type == EventType.SIGNED_OUT_OF_RANGE and isinstance(params, dict): return evaluate_signed_out_of_range( int(monitored_value), params.get("high_limit", 0), params.get("low_limit", 0), params.get("deadband", 0), current_state=current_state, ) if event_type == EventType.UNSIGNED_OUT_OF_RANGE and isinstance(params, dict): return evaluate_unsigned_out_of_range( int(monitored_value), params.get("high_limit", 0), params.get("low_limit", 0), params.get("deadband", 0), current_state=current_state, ) if event_type == EventType.CHANGE_OF_CHARACTERSTRING and isinstance(params, dict): alarm_values = params.get("alarm_values", ()) if not isinstance(alarm_values, tuple): alarm_values = tuple(alarm_values) return evaluate_change_of_characterstring(str(monitored_value), alarm_values) if event_type == EventType.CHANGE_OF_STATUS_FLAGS and isinstance(params, dict): current_flags = monitored_value previous_flags = params.get("previous_flags", ()) selected_flags = params.get("selected_flags", ()) if not isinstance(current_flags, tuple): current_flags = tuple(current_flags) if not isinstance(previous_flags, tuple): previous_flags = tuple(previous_flags) if not isinstance(selected_flags, tuple): selected_flags = tuple(selected_flags) return evaluate_change_of_status_flags(current_flags, previous_flags, selected_flags) if event_type == EventType.CHANGE_OF_RELIABILITY and isinstance(params, dict): reliability = monitored_value if not isinstance(reliability, Reliability): reliability = Reliability(int(reliability)) return evaluate_change_of_reliability(reliability) if event_type == EventType.CHANGE_OF_TIMER and isinstance(params, dict): alarm_values = params.get("alarm_values", ()) if not isinstance(alarm_values, tuple): alarm_values = tuple(alarm_values) return evaluate_change_of_timer(monitored_value, alarm_values) if event_type == EventType.CHANGE_OF_DISCRETE_VALUE and isinstance(params, dict): previous_value = params.get("previous_value", monitored_value) return evaluate_change_of_discrete_value(monitored_value, previous_value) # Unsupported or no params -- no alarm return None # --- Notification dispatch --- def _dispatch_notification( self, enrollment: BACnetObject, transition: EventTransition, event_type: Any, fault_result: Reliability, ) -> None: """Build and send an EventNotificationRequest for an enrollment transition.""" from bac_py.services.event_notification import EventNotificationRequest from bac_py.types.constructed import BACnetTimeStamp notify_type = self._read_prop(enrollment, PropertyIdentifier.NOTIFY_TYPE) if notify_type is None: notify_type = NotifyType.ALARM notification_class_num = self._read_prop(enrollment, PropertyIdentifier.NOTIFICATION_CLASS) if notification_class_num is None: notification_class_num = 0 priority = self._get_priority(notification_class_num, transition.to_state) # Determine ack_required from notification class ack_required = self._get_ack_required(notification_class_num, transition.to_state) notification = EventNotificationRequest( process_identifier=0, initiating_device_identifier=self._app.device_object_identifier, event_object_identifier=enrollment.object_identifier, time_stamp=BACnetTimeStamp(choice=1, value=int(transition.timestamp)), notification_class=notification_class_num, priority=priority, event_type=event_type if isinstance(event_type, EventType) else EventType.CHANGE_OF_STATE, notify_type=notify_type, to_state=transition.to_state, ack_required=ack_required, from_state=transition.from_state, ) logger.debug("sending event notification for %s", enrollment.object_identifier) self._route_notification(notification, notification_class_num, transition.to_state) # Update event_time_stamps on the enrollment self._update_event_timestamps(enrollment, transition) def _dispatch_intrinsic_notification( self, obj: BACnetObject, transition: EventTransition, event_type: EventType | None, fault_result: Reliability, ) -> None: """Build and send an EventNotificationRequest for an intrinsic object.""" from bac_py.services.event_notification import EventNotificationRequest from bac_py.types.constructed import BACnetTimeStamp notify_type = obj._properties.get(PropertyIdentifier.NOTIFY_TYPE, NotifyType.ALARM) notification_class_num = obj._properties.get(PropertyIdentifier.NOTIFICATION_CLASS, 0) priority = self._get_priority(notification_class_num, transition.to_state) ack_required = self._get_ack_required(notification_class_num, transition.to_state) notification = EventNotificationRequest( process_identifier=0, initiating_device_identifier=self._app.device_object_identifier, event_object_identifier=obj.object_identifier, time_stamp=BACnetTimeStamp(choice=1, value=int(transition.timestamp)), notification_class=notification_class_num, priority=priority, event_type=event_type if event_type is not None else EventType.CHANGE_OF_STATE, notify_type=notify_type, to_state=transition.to_state, ack_required=ack_required, from_state=transition.from_state, ) logger.debug("sending event notification for %s", obj.object_identifier) self._route_notification(notification, notification_class_num, transition.to_state) # Update event_state on the object itself obj._properties[PropertyIdentifier.EVENT_STATE] = transition.to_state # Update event_time_stamps if present self._update_event_timestamps(obj, transition) def _route_notification( self, notification: Any, notification_class_num: int, to_state: EventState, ) -> None: """Route a notification through the NotificationClass recipient list. Per ASHRAE 135-2020 Clause 13.8: look up the NotificationClass object's ``RECIPIENT_LIST``, filter by current day-of-week and time window, and send to each matching recipient using confirmed or unconfirmed as specified. Falls back to an unconfirmed global broadcast if no NotificationClass exists or the recipient list is empty. """ from bac_py.types.primitives import ObjectIdentifier nc_oid = ObjectIdentifier(ObjectType.NOTIFICATION_CLASS, notification_class_num) nc_obj = self._app.object_db.get(nc_oid) recipients = None if nc_obj is not None: recipients = nc_obj._properties.get(PropertyIdentifier.RECIPIENT_LIST) if not recipients: # Fallback: unconfirmed global broadcast self._send_notification_unconfirmed(notification, destination=None) return # Filter recipients and send trans_idx = _transition_index(to_state) for dest in recipients: if not _recipient_matches(dest, trans_idx): continue if _dest_issue_confirmed(dest): self._send_notification_confirmed(notification, dest) else: address = _dest_address(dest) self._send_notification_unconfirmed(notification, destination=address) def _send_notification_unconfirmed(self, notification: Any, *, destination: Any) -> None: """Encode and send an unconfirmed event notification.""" from bac_py.types.enums import UnconfirmedServiceChoice try: encoded = notification.encode() except Exception: logger.debug("Failed to encode event notification", exc_info=True) return try: from bac_py.network.address import GLOBAL_BROADCAST dest = destination if destination is not None else GLOBAL_BROADCAST self._app.unconfirmed_request( destination=dest, service_choice=UnconfirmedServiceChoice.UNCONFIRMED_EVENT_NOTIFICATION, service_data=encoded, ) except Exception: logger.debug("Failed to send unconfirmed event notification", exc_info=True) def _send_notification_confirmed(self, notification: Any, dest: Any) -> None: """Encode and send a confirmed event notification. Because confirmed requests are async and the engine evaluation cycle is synchronous, we schedule the send as an asyncio task. """ try: encoded = notification.encode() except Exception: logger.debug("Failed to encode event notification", exc_info=True) return address = _dest_address(dest) if address is None: logger.debug("Cannot send confirmed notification: no address for recipient") return try: loop = asyncio.get_running_loop() _task = loop.create_task(self._send_confirmed_async(encoded, address)) # Store reference to prevent GC of the fire-and-forget task self._pending_confirmed_tasks.add(_task) _task.add_done_callback(self._pending_confirmed_tasks.discard) except RuntimeError: logger.debug("No running event loop for confirmed notification") async def _send_confirmed_async(self, encoded: bytes, address: Any) -> None: """Send a confirmed event notification asynchronously.""" from bac_py.types.enums import ConfirmedServiceChoice try: await self._app.confirmed_request( destination=address, service_choice=ConfirmedServiceChoice.CONFIRMED_EVENT_NOTIFICATION, service_data=encoded, ) except Exception: logger.debug("Failed to send confirmed event notification", exc_info=True) # --- Helper methods --- def _get_priority(self, notification_class_num: int, to_state: EventState) -> int: """Look up the priority from the NotificationClass object.""" from bac_py.types.primitives import ObjectIdentifier nc_oid = ObjectIdentifier(ObjectType.NOTIFICATION_CLASS, notification_class_num) nc_obj = self._app.object_db.get(nc_oid) if nc_obj is None: return 0 priorities = nc_obj._properties.get(PropertyIdentifier.PRIORITY) if not isinstance(priorities, list) or len(priorities) < 3: return 0 idx = _transition_index(to_state) return int(priorities[idx]) def _get_ack_required(self, notification_class_num: int, to_state: EventState) -> bool: """Look up ack_required from the NotificationClass object.""" from bac_py.types.primitives import ObjectIdentifier nc_oid = ObjectIdentifier(ObjectType.NOTIFICATION_CLASS, notification_class_num) nc_obj = self._app.object_db.get(nc_oid) if nc_obj is None: return False ack_req = nc_obj._properties.get(PropertyIdentifier.ACK_REQUIRED) if not isinstance(ack_req, list) or len(ack_req) < 3: return False idx = _transition_index(to_state) return bool(ack_req[idx]) @staticmethod def _sync_state_machine(sm: EventStateMachine, enrollment: BACnetObject) -> None: """Synchronize state machine settings from an enrollment object.""" event_enable = enrollment._properties.get(PropertyIdentifier.EVENT_ENABLE) if isinstance(event_enable, list) and len(event_enable) >= 3: sm.event_enable = list(event_enable[:3]) time_delay = enrollment._properties.get(PropertyIdentifier.TIME_DELAY) if isinstance(time_delay, (int, float)): sm.time_delay = float(time_delay) time_delay_normal = enrollment._properties.get(PropertyIdentifier.TIME_DELAY_NORMAL) if isinstance(time_delay_normal, (int, float)): sm.time_delay_normal = float(time_delay_normal) @staticmethod def _sync_intrinsic_state_machine(sm: EventStateMachine, obj: BACnetObject) -> None: """Synchronize state machine settings from an intrinsic-reporting object.""" event_enable = obj._properties.get(PropertyIdentifier.EVENT_ENABLE) if isinstance(event_enable, list) and len(event_enable) >= 3: sm.event_enable = list(event_enable[:3]) time_delay = obj._properties.get(PropertyIdentifier.TIME_DELAY) if isinstance(time_delay, (int, float)): sm.time_delay = float(time_delay) time_delay_normal = obj._properties.get(PropertyIdentifier.TIME_DELAY_NORMAL) if isinstance(time_delay_normal, (int, float)): sm.time_delay_normal = float(time_delay_normal) # Sync event_state from object to state machine on first load event_state = obj._properties.get(PropertyIdentifier.EVENT_STATE) if isinstance(event_state, EventState) and sm.event_state == EventState.NORMAL: sm.event_state = event_state @staticmethod def _update_event_timestamps( obj: BACnetObject, transition: EventTransition, ) -> None: """Update the event_time_stamps property on a transition.""" from bac_py.types.constructed import BACnetTimeStamp timestamps = obj._properties.get(PropertyIdentifier.EVENT_TIME_STAMPS) if not isinstance(timestamps, list) or len(timestamps) < 3: return idx = _transition_index(transition.to_state) timestamps[idx] = BACnetTimeStamp(choice=1, value=int(transition.timestamp)) @staticmethod def _read_prop(obj: BACnetObject, prop_id: PropertyIdentifier) -> Any: """Read a property safely, returning None on error.""" try: return obj.read_property(prop_id) except Exception: return None
# --------------------------------------------------------------------------- # Module-level helpers # --------------------------------------------------------------------------- _SENTINEL = object() def _transition_index(to_state: EventState) -> int: """Map a target EventState to its 3-element array index.""" if to_state == EventState.FAULT: return 1 if to_state == EventState.NORMAL: return 2 return 0 # offnormal / high_limit / low_limit / life_safety_alarm def _limit_enable_bits(limit_enable: Any) -> tuple[bool, bool]: """Extract (high_limit_enable, low_limit_enable) from a LimitEnable value.""" if isinstance(limit_enable, (list, tuple)) and len(limit_enable) >= 2: return (bool(limit_enable[0]), bool(limit_enable[1])) return (True, True) # --------------------------------------------------------------------------- # Recipient routing helpers (Clause 13.8) # --------------------------------------------------------------------------- def _recipient_matches(dest: Any, transition_index: int) -> bool: """Check if a BACnetDestination matches the current day/time and transition. :param dest: A :class:`BACnetDestination` object. :param transition_index: 0=to-offnormal, 1=to-fault, 2=to-normal. :returns: ``True`` if the recipient should receive this notification. """ from datetime import UTC, datetime from bac_py.types.constructed import BACnetDestination if not isinstance(dest, BACnetDestination): return True # Non-typed entry: send by default # Check transitions filter transitions = dest.transitions try: if not transitions[transition_index]: return False except (IndexError, TypeError): pass # No transitions filter or malformed: allow # Check day-of-week filter now = datetime.now(tz=UTC) # Python: Monday=0..Sunday=6; BACnet valid_days: Monday=bit0..Sunday=bit6 day_index = now.weekday() try: if not dest.valid_days[day_index]: return False except (IndexError, TypeError): pass # No day filter or malformed: allow # Check time window try: current_time_tuple = (now.hour, now.minute, now.second) from_t = dest.from_time to_t = dest.to_time from_tuple = ( from_t.hour if from_t.hour != 0xFF else 0, from_t.minute if from_t.minute != 0xFF else 0, from_t.second if from_t.second != 0xFF else 0, ) to_tuple = ( to_t.hour if to_t.hour != 0xFF else 23, to_t.minute if to_t.minute != 0xFF else 59, to_t.second if to_t.second != 0xFF else 59, ) if not (from_tuple <= current_time_tuple <= to_tuple): return False except (AttributeError, TypeError): pass # No time filter or malformed: allow return True def _dest_issue_confirmed(dest: Any) -> bool: """Return True if the destination requests confirmed notifications.""" try: return bool(dest.issue_confirmed_notifications) except AttributeError: return False def _dest_address(dest: Any) -> Any: """Extract the BACnetAddress from a BACnetDestination.""" try: recipient = dest.recipient if recipient.address is not None: return recipient.address if recipient.device is not None: # For device-type recipients, we'd need a device-to-address resolver. # Fall back to None (caller should use broadcast or skip). return None except AttributeError: return None return None