"""BACnet object base classes per ASHRAE 135-2016 Clause 12."""
from __future__ import annotations
import asyncio
import contextlib
import copy
import logging
from dataclasses import dataclass
from enum import IntEnum
from typing import TYPE_CHECKING, Any, ClassVar
if TYPE_CHECKING:
from collections.abc import Callable, Iterator
from bac_py.services.errors import BACnetError
from bac_py.types.constructed import BACnetValueSource, StatusFlags
from bac_py.types.enums import (
ErrorClass,
ErrorCode,
EventState,
EventType,
NotifyType,
ObjectType,
PropertyIdentifier,
Reliability,
)
from bac_py.types.primitives import BACnetDouble, BitString, ObjectIdentifier
logger = logging.getLogger(__name__)
[docs]
class PropertyAccess(IntEnum):
    """Access mode declared for a BACnet property."""

    # The property is declared read-only.
    READ_ONLY = 0
    # The property is declared writable.
    READ_WRITE = 1
[docs]
@dataclass(frozen=True, slots=True)
class PropertyDefinition:
"""Metadata for a single BACnet property."""
identifier: PropertyIdentifier
"""The :class:`PropertyIdentifier` for this property."""
datatype: type
"""Expected Python type for the property value."""
access: PropertyAccess
"""Read-only or read-write access mode."""
required: bool
"""Whether the property is required by the BACnet standard."""
default: Any = None
"""Default value assigned on object creation, or ``None``."""
# Lazily-built cache for standard_properties(); ``None`` until the first call.
_STANDARD_PROPERTIES: dict[PropertyIdentifier, PropertyDefinition] | None = None


def standard_properties() -> dict[PropertyIdentifier, PropertyDefinition]:
    """Return the properties shared by every BACnet object (Clause 12.1).

    The mapping holds the required Object_Identifier, Object_Name and
    Object_Type, the optional Description, and the required Property_List.
    It is built on the first call and the same dict instance is returned
    on every later call.

    :returns: Mapping of :class:`PropertyIdentifier` to :class:`PropertyDefinition`.
    """
    global _STANDARD_PROPERTIES
    if _STANDARD_PROPERTIES is not None:
        return _STANDARD_PROPERTIES

    read_only = PropertyAccess.READ_ONLY
    read_write = PropertyAccess.READ_WRITE
    # (identifier, datatype, access, required) -- order defines dict order.
    schema = (
        (PropertyIdentifier.OBJECT_IDENTIFIER, ObjectIdentifier, read_only, True),
        (PropertyIdentifier.OBJECT_NAME, str, read_write, True),
        (PropertyIdentifier.OBJECT_TYPE, ObjectType, read_only, True),
        (PropertyIdentifier.DESCRIPTION, str, read_write, False),
        (PropertyIdentifier.PROPERTY_LIST, list, read_only, True),
    )
    _STANDARD_PROPERTIES = {
        pid: PropertyDefinition(pid, dtype, access, required=is_required)
        for pid, dtype, access, is_required in schema
    }
    return _STANDARD_PROPERTIES
[docs]
def status_properties(
    *,
    event_state_required: bool = True,
    reliability_required: bool = False,
    reliability_default: Reliability | None = None,
    include_out_of_service: bool = True,
) -> dict[PropertyIdentifier, PropertyDefinition]:
    """Build the status-monitoring property definitions (Clause 12).

    The result always contains Status_Flags, Event_State and Reliability;
    Out_Of_Service is added unless *include_out_of_service* is false.
    A new dict is created on every call.

    :param event_state_required: Whether Event_State is required.
    :param reliability_required: Whether Reliability is required.
    :param reliability_default: Default value for Reliability, or ``None``.
    :param include_out_of_service: Whether to include the Out_Of_Service property.
    :returns: Mapping of :class:`PropertyIdentifier` to :class:`PropertyDefinition`.
    """
    read_only = PropertyAccess.READ_ONLY
    definitions = [
        PropertyDefinition(
            PropertyIdentifier.STATUS_FLAGS,
            StatusFlags,
            read_only,
            required=True,
        ),
        PropertyDefinition(
            PropertyIdentifier.EVENT_STATE,
            EventState,
            read_only,
            required=event_state_required,
            default=EventState.NORMAL,
        ),
        PropertyDefinition(
            PropertyIdentifier.RELIABILITY,
            Reliability,
            read_only,
            required=reliability_required,
            default=reliability_default,
        ),
    ]
    if include_out_of_service:
        definitions.append(
            PropertyDefinition(
                PropertyIdentifier.OUT_OF_SERVICE,
                bool,
                PropertyAccess.READ_WRITE,
                required=True,
                default=False,
            )
        )
    # Key each definition by its own identifier, preserving list order.
    return {definition.identifier: definition for definition in definitions}
[docs]
def commandable_properties(
    value_type: type,
    default: Any,
    *,
    required: bool = True,
) -> dict[PropertyIdentifier, PropertyDefinition]:
    """Build the property definitions used by commandable objects (Clause 19).

    :param value_type: Type of the Relinquish_Default value.
    :param default: Default value for Relinquish_Default. It is only applied
        when *required* is true; otherwise the definition has no default.
    :param required: Whether Priority_Array, Relinquish_Default and
        Current_Command_Priority are required (``True`` for Output objects,
        ``False`` for optionally commandable Values).
    :returns: Mapping of :class:`PropertyIdentifier` to :class:`PropertyDefinition`.
    """
    read_only = PropertyAccess.READ_ONLY
    relinquish_default = default if required else None
    definitions = (
        PropertyDefinition(
            PropertyIdentifier.PRIORITY_ARRAY, list, read_only, required=required
        ),
        PropertyDefinition(
            PropertyIdentifier.RELINQUISH_DEFAULT,
            value_type,
            PropertyAccess.READ_WRITE,
            required=required,
            default=relinquish_default,
        ),
        PropertyDefinition(
            PropertyIdentifier.CURRENT_COMMAND_PRIORITY, int, read_only, required=required
        ),
        # The value-source / command-time tracking properties are always optional.
        PropertyDefinition(
            PropertyIdentifier.VALUE_SOURCE, BACnetValueSource, read_only, required=False
        ),
        PropertyDefinition(
            PropertyIdentifier.VALUE_SOURCE_ARRAY, list, read_only, required=False
        ),
        PropertyDefinition(
            PropertyIdentifier.LAST_COMMAND_TIME, object, read_only, required=False
        ),
        PropertyDefinition(
            PropertyIdentifier.COMMAND_TIME_ARRAY, list, read_only, required=False
        ),
    )
    return {definition.identifier: definition for definition in definitions}
[docs]
def intrinsic_reporting_properties(
    *,
    include_limit: bool = False,
) -> dict[PropertyIdentifier, PropertyDefinition]:
    """Build the optional intrinsic-reporting property definitions (Clause 12, Clause 13).

    These properties enable alarm/event reporting without a separate
    EventEnrollment object. Every definition returned here is optional;
    an object type opts in by merging this dict into its
    ``PROPERTY_DEFINITIONS``.

    :param include_limit: If ``True``, also include the analog limit-detection
        properties (High_Limit, Low_Limit, Deadband, Limit_Enable).
    :returns: Mapping of :class:`PropertyIdentifier` to :class:`PropertyDefinition`.
    """
    read_only = PropertyAccess.READ_ONLY
    read_write = PropertyAccess.READ_WRITE
    # (identifier, datatype, access) -- list order defines dict order.
    schema = [
        (PropertyIdentifier.TIME_DELAY, int, read_write),
        (PropertyIdentifier.NOTIFICATION_CLASS, int, read_write),
        (PropertyIdentifier.EVENT_ENABLE, BitString, read_write),
        (PropertyIdentifier.ACKED_TRANSITIONS, BitString, read_only),
        (PropertyIdentifier.NOTIFY_TYPE, NotifyType, read_write),
        (PropertyIdentifier.EVENT_TIME_STAMPS, list, read_only),
        (PropertyIdentifier.EVENT_DETECTION_ENABLE, bool, read_write),
        (PropertyIdentifier.EVENT_MESSAGE_TEXTS, list, read_only),
        (PropertyIdentifier.EVENT_MESSAGE_TEXTS_CONFIG, list, read_write),
    ]
    if include_limit:
        schema.extend(
            [
                (PropertyIdentifier.HIGH_LIMIT, float, read_write),
                (PropertyIdentifier.LOW_LIMIT, float, read_write),
                (PropertyIdentifier.DEADBAND, float, read_write),
                (PropertyIdentifier.LIMIT_ENABLE, BitString, read_write),
            ]
        )
    return {
        pid: PropertyDefinition(pid, dtype, access, required=False)
        for pid, dtype, access in schema
    }
[docs]
class BACnetObject:
    """Base class for all BACnet objects.

    Each subclass defines its property schema via class-level
    ``PROPERTY_DEFINITIONS``. Property values are stored in a dict and
    accessed through :meth:`read_property` and :meth:`write_property`.
    """

    OBJECT_TYPE: ClassVar[ObjectType]
    PROPERTY_DEFINITIONS: ClassVar[dict[PropertyIdentifier, PropertyDefinition]]
    INTRINSIC_EVENT_ALGORITHM: ClassVar[EventType | None] = None
    """Event algorithm for intrinsic reporting, or ``None`` if not supported."""

    def __init__(self, instance_number: int, **initial_properties: Any) -> None:
        """Initialize a BACnet object with default and overridden properties.

        :param instance_number: The BACnet instance number for this object.
        :param initial_properties: Property values keyed by property name.
            Each key is upper-cased and looked up by name in
            :class:`PropertyIdentifier` (e.g., ``object_name="MyObject"``).
            The values are stored as given, without access checks or coercion.
        """
        self._object_id = ObjectIdentifier(self.OBJECT_TYPE, instance_number)
        self._properties: dict[PropertyIdentifier, Any] = {}
        self._priority_array: list[Any | None] | None = None
        self._write_lock = asyncio.Lock()
        self._object_db: ObjectDatabase | None = None
        self._on_property_written: Callable[[PropertyIdentifier, Any, Any], None] | None = None

        # Set defaults from property definitions.
        # Use copy.copy() to prevent mutable defaults (e.g. lists) from
        # being shared across instances of the same object type.
        for prop_id, prop_def in self.PROPERTY_DEFINITIONS.items():
            if prop_def.default is not None:
                self._properties[prop_id] = copy.copy(prop_def.default)

        self._properties[PropertyIdentifier.OBJECT_IDENTIFIER] = self._object_id
        self._properties[PropertyIdentifier.OBJECT_TYPE] = self.OBJECT_TYPE

        # Caller-supplied values are applied last so they override defaults.
        for key, value in initial_properties.items():
            prop_id = PropertyIdentifier[key.upper()]
            self._properties[prop_id] = value

    @property
    def object_identifier(self) -> ObjectIdentifier:
        """The :class:`ObjectIdentifier` for this object."""
        return self._object_id

    def _init_status_flags(self) -> None:
        """Initialize Status_Flags to a default :class:`StatusFlags` if not already set."""
        from bac_py.types.constructed import _NORMAL_STATUS_FLAGS

        self._set_default(PropertyIdentifier.STATUS_FLAGS, _NORMAL_STATUS_FLAGS)

    def _set_default(self, prop_id: PropertyIdentifier, value: Any) -> None:
        """Set a property value only if it hasn't been set yet.

        :param prop_id: The property to set.
        :param value: The default value to assign.
        """
        if prop_id not in self._properties:
            self._properties[prop_id] = value

    def _init_commandable(self, relinquish_default: Any) -> None:
        """Initialize the priority array for a commandable object.

        :param relinquish_default: The value used when all priority slots
            are relinquished.
        """
        self._priority_array = [None] * 16
        # The stored Priority_Array property is the same list object, so
        # priority writes are visible through read_property().
        self._properties[PropertyIdentifier.PRIORITY_ARRAY] = self._priority_array
        self._set_default(PropertyIdentifier.RELINQUISH_DEFAULT, relinquish_default)

        # Value Source tracking (Clause 19.5, new in 2020)
        none_src = BACnetValueSource.none_source()
        self._value_source_array: list[BACnetValueSource] = [none_src] * 16
        self._command_time_array: list[object | None] = [None] * 16
        self._properties[PropertyIdentifier.VALUE_SOURCE] = none_src
        self._properties[PropertyIdentifier.VALUE_SOURCE_ARRAY] = self._value_source_array
        self._properties[PropertyIdentifier.LAST_COMMAND_TIME] = None
        self._properties[PropertyIdentifier.COMMAND_TIME_ARRAY] = self._command_time_array

    @staticmethod
    def _coerce_value(prop_def: PropertyDefinition, value: Any) -> Any:
        """Coerce a value to the property's declared datatype if possible.

        Handles two cases:

        - IntEnum properties: plain ``int`` from wire decoding is coerced
          to the declared IntEnum subclass (e.g. ``1`` -> ``BinaryPV.ACTIVE``).
        - :class:`BACnetDouble` properties: plain ``float`` is wrapped in
          :class:`BACnetDouble` so it encodes as Double (tag 5) instead of
          Real (tag 4).

        :param prop_def: The :class:`PropertyDefinition` describing the target type.
        :param value: The value to coerce.
        :returns: The coerced value, or *value* unchanged if coercion is not
            applicable or it is already the correct type.
        """
        if value is None:
            return value
        dtype = prop_def.datatype
        if (
            dtype is not None
            and isinstance(value, int)
            and not isinstance(value, IntEnum)
            and not isinstance(value, bool)
            and issubclass(dtype, IntEnum)
        ):
            try:
                return dtype(value)
            except ValueError:
                # Not a member of the enum: keep the raw int.
                return value
        if (
            dtype is BACnetDouble
            and isinstance(value, float)
            and not isinstance(value, BACnetDouble)
        ):
            return BACnetDouble(value)
        return value

    def read_property(
        self,
        prop_id: PropertyIdentifier,
        array_index: int | None = None,
    ) -> Any:
        """Read a property value.

        Property_List, Current_Command_Priority and Status_Flags are computed
        on demand; every other property is read from the stored values.
        *array_index* is applied uniformly to computed and stored values:
        index ``0`` returns the element count and ``1..N`` returns the
        corresponding element.

        :param prop_id: Property identifier to read.
        :param array_index: Optional array index for array properties.
        :returns: The property value.
        :raises BACnetError: If the property is unknown, a required property
            has no value, the value is not an array but *array_index* was
            given, or *array_index* is out of range.
        """
        logger.debug("read %s.%s", self._object_id, prop_id)
        if prop_id == PropertyIdentifier.PROPERTY_LIST:
            value = self._get_property_list()
        elif prop_id == PropertyIdentifier.CURRENT_COMMAND_PRIORITY:
            value = self._get_current_command_priority()
        elif prop_id == PropertyIdentifier.STATUS_FLAGS:
            value = self._get_status_flags()
        else:
            prop_def = self.PROPERTY_DEFINITIONS.get(prop_id)
            if prop_def is None:
                logger.warning("property not found: %s.%s", self._object_id, prop_id)
                raise BACnetError(ErrorClass.PROPERTY, ErrorCode.UNKNOWN_PROPERTY)
            value = self._properties.get(prop_id)
            if value is None and prop_def.required:
                raise BACnetError(ErrorClass.PROPERTY, ErrorCode.VALUE_NOT_INITIALIZED)

        if array_index is None:
            return value
        # The index is honoured for computed properties too, so that an
        # indexed read of Property_List returns one element / the count
        # instead of silently returning the whole list.
        if not isinstance(value, (list, tuple)):
            raise BACnetError(ErrorClass.PROPERTY, ErrorCode.PROPERTY_IS_NOT_AN_ARRAY)
        if array_index == 0:
            return len(value)
        if 1 <= array_index <= len(value):
            return value[array_index - 1]
        raise BACnetError(ErrorClass.PROPERTY, ErrorCode.INVALID_ARRAY_INDEX)

    def write_property(
        self,
        prop_id: PropertyIdentifier,
        value: Any,
        priority: int | None = None,
        array_index: int | None = None,
    ) -> None:
        """Write a property value.

        After a successful write the registered write callback (if any) is
        invoked with ``(prop_id, old_value, new_value)`` when the stored
        value changed.

        :param prop_id: Property identifier to write.
        :param value: Value to write.
        :param priority: Optional priority for commandable properties (1-16).
            Defaults to 16 when omitted.
        :param array_index: Optional array index for array properties.
        :raises BACnetError: If the property is unknown, read-only, the new
            Object_Name is already in use, or *priority* / *array_index*
            is invalid.
        """
        logger.debug("write %s.%s", self._object_id, prop_id)
        prop_def = self.PROPERTY_DEFINITIONS.get(prop_id)
        if prop_def is None:
            logger.warning("property not found: %s.%s", self._object_id, prop_id)
            raise BACnetError(ErrorClass.PROPERTY, ErrorCode.UNKNOWN_PROPERTY)
        if prop_def.access == PropertyAccess.READ_ONLY and not (
            # Present_Value is writable when Out_Of_Service is TRUE (Clause 12)
            prop_id == PropertyIdentifier.PRESENT_VALUE
            and self._properties.get(PropertyIdentifier.OUT_OF_SERVICE) is True
        ):
            raise BACnetError(ErrorClass.PROPERTY, ErrorCode.WRITE_ACCESS_DENIED)

        # Object_Name uniqueness enforcement (Clause 12.1.5). Only validate
        # here; the name index is updated after the write has succeeded so a
        # failing write cannot leave the index out of sync with the object.
        rename_db = self._object_db if prop_id == PropertyIdentifier.OBJECT_NAME else None
        if rename_db is not None:
            rename_db.validate_name_unique(value, exclude=self._object_id)

        value = self._coerce_value(prop_def, value)
        old_value = self._properties.get(prop_id)
        commandable = self._is_commandable(prop_id)
        if not commandable and array_index is not None and isinstance(old_value, list):
            # Element writes mutate the stored list in place; snapshot it so
            # the old/new comparison below can actually detect the change.
            old_value = list(old_value)

        if commandable:
            effective_priority = priority if priority is not None else 16
            self._write_with_priority(prop_id, value, effective_priority)
        elif array_index is not None:
            self._write_array_element(prop_id, value, array_index)
        else:
            self._properties[prop_id] = value

        if rename_db is not None:
            rename_db._update_name_index(self._object_id, old_value, value)
            rename_db._increment_database_revision()

        # Fire write-change callback if registered (Annex A2)
        new_value = self._properties.get(prop_id)
        if self._on_property_written is not None and old_value != new_value:
            self._on_property_written(prop_id, old_value, new_value)

    async def async_write_property(
        self,
        prop_id: PropertyIdentifier,
        value: Any,
        priority: int | None = None,
        array_index: int | None = None,
    ) -> None:
        """Write a property value with concurrency protection.

        Uses an :class:`asyncio.Lock` to serialize writes to this object.

        :param prop_id: Property identifier to write.
        :param value: Value to write.
        :param priority: Optional priority for commandable properties (1-16).
        :param array_index: Optional array index for array properties.
        """
        async with self._write_lock:
            self.write_property(prop_id, value, priority, array_index)

    # Properties excluded from Property_List per Clause 12 / 12.11.
    _PROPERTY_LIST_EXCLUSIONS: ClassVar[frozenset[PropertyIdentifier]] = frozenset(
        {
            PropertyIdentifier.OBJECT_IDENTIFIER,
            PropertyIdentifier.OBJECT_NAME,
            PropertyIdentifier.OBJECT_TYPE,
            PropertyIdentifier.PROPERTY_LIST,
        }
    )

    def _get_property_list(self) -> list[PropertyIdentifier]:
        """Return the list of all properties present on this object.

        Per the BACnet standard, Property_List shall not include
        Object_Identifier, Object_Name, Object_Type, or Property_List.
        A defined property is listed when it has a stored value or is
        marked as required.

        :returns: List of :class:`PropertyIdentifier` values excluding the
            standard triad and Property_List itself.
        """
        result = [
            pid
            for pid in self.PROPERTY_DEFINITIONS
            if pid not in self._PROPERTY_LIST_EXCLUSIONS
            and (pid in self._properties or self.PROPERTY_DEFINITIONS[pid].required)
        ]
        # Current_Command_Priority is a computed property (not stored in
        # _properties) that must appear in Property_List when the object
        # is commandable.
        if (
            self._priority_array is not None
            and PropertyIdentifier.CURRENT_COMMAND_PRIORITY in self.PROPERTY_DEFINITIONS
            and PropertyIdentifier.CURRENT_COMMAND_PRIORITY not in result
        ):
            result.append(PropertyIdentifier.CURRENT_COMMAND_PRIORITY)
        return result

    def _get_current_command_priority(self) -> int | None:
        """Return the active command priority level (Clause 19.5).

        Scans the priority array from highest (1) to lowest (16) and returns
        the index of the first non-null slot, or ``None`` if all slots are
        relinquished.

        :returns: The active priority level (1-16), or ``None``.
        :raises BACnetError: If the object is not commandable.
        """
        if self._priority_array is None:
            raise BACnetError(ErrorClass.PROPERTY, ErrorCode.UNKNOWN_PROPERTY)
        for i, slot in enumerate(self._priority_array):
            if slot is not None:
                return i + 1
        return None

    def _get_status_flags(self) -> StatusFlags:
        """Return :class:`StatusFlags` computed from related properties (Clause 12).

        IN_ALARM is ``True`` when Event_State is not NORMAL.
        FAULT is ``True`` when Reliability is present and not NO_FAULT_DETECTED.
        OVERRIDDEN is preserved from the stored value (hardware override).
        OUT_OF_SERVICE is read from the stored property.

        :returns: Computed :class:`StatusFlags` instance.
        :raises BACnetError: If the object does not define Status_Flags.
        """
        if PropertyIdentifier.STATUS_FLAGS not in self.PROPERTY_DEFINITIONS:
            raise BACnetError(ErrorClass.PROPERTY, ErrorCode.UNKNOWN_PROPERTY)

        # IN_ALARM: Event_State != NORMAL
        event_state = self._properties.get(PropertyIdentifier.EVENT_STATE)
        in_alarm = event_state is not None and event_state != EventState.NORMAL

        # FAULT: Reliability present and not NO_FAULT_DETECTED
        reliability = self._properties.get(PropertyIdentifier.RELIABILITY)
        fault = reliability is not None and reliability != Reliability.NO_FAULT_DETECTED

        # OUT_OF_SERVICE: direct property read
        out_of_service = bool(self._properties.get(PropertyIdentifier.OUT_OF_SERVICE, False))

        # OVERRIDDEN: preserve stored value (hardware override, not computable)
        stored = self._properties.get(PropertyIdentifier.STATUS_FLAGS)
        overridden = stored.overridden if isinstance(stored, StatusFlags) else False

        if not (in_alarm or fault or overridden or out_of_service):
            # All flags clear: return the shared "normal" instance.
            from bac_py.types.constructed import _NORMAL_STATUS_FLAGS

            return _NORMAL_STATUS_FLAGS
        return StatusFlags(
            in_alarm=in_alarm,
            fault=fault,
            overridden=overridden,
            out_of_service=out_of_service,
        )

    def _is_commandable(self, prop_id: PropertyIdentifier) -> bool:
        """Check if a property supports command prioritization (Clause 19.2).

        :param prop_id: The property to check.
        :returns: ``True`` if *prop_id* is Present_Value and the object has a
            priority array.
        """
        return prop_id == PropertyIdentifier.PRESENT_VALUE and self._priority_array is not None

    def _write_with_priority(
        self,
        prop_id: PropertyIdentifier,
        value: Any,
        priority: int,
        value_source: BACnetValueSource | None = None,
    ) -> None:
        """Write to a commandable property using the priority array.

        BACnet priority 1 = highest, 16 = lowest.

        Priority 6 is reserved for Minimum On/Off time objects
        (Clause 19.2.3) -- writes at priority 6 are rejected when
        the object defines Minimum_On_Time or Minimum_Off_Time.

        :param prop_id: The commandable property identifier to write.
        :param value: The value to write, or ``None`` to relinquish.
        :param priority: Priority level (1-16).
        :param value_source: Optional source info for the write (Clause 19.5).
        :raises BACnetError: If *priority* is out of range or reserved.
        """
        if priority < 1 or priority > 16:
            raise BACnetError(ErrorClass.SERVICES, ErrorCode.PARAMETER_OUT_OF_RANGE)
        if priority == 6 and (
            PropertyIdentifier.MINIMUM_ON_TIME in self.PROPERTY_DEFINITIONS
            or PropertyIdentifier.MINIMUM_OFF_TIME in self.PROPERTY_DEFINITIONS
        ):
            raise BACnetError(ErrorClass.PROPERTY, ErrorCode.WRITE_ACCESS_DENIED)
        if self._priority_array is None:
            self._priority_array = [None] * 16

        idx = priority - 1
        # Storing ``None`` relinquishes the slot.
        self._priority_array[idx] = value

        # Update Value Source tracking (Clause 19.5)
        if hasattr(self, "_value_source_array"):
            src = value_source or BACnetValueSource.none_source()
            if value is None:
                self._value_source_array[idx] = BACnetValueSource.none_source()
                self._command_time_array[idx] = None
            else:
                self._value_source_array[idx] = src
                self._command_time_array[idx] = None  # timestamp set by caller

        # Present Value = highest priority non-None value, or relinquish default
        winning_priority = None
        for i, pv in enumerate(self._priority_array):
            if pv is not None:
                self._properties[prop_id] = pv
                winning_priority = i
                break
        else:
            self._properties[prop_id] = self._properties.get(PropertyIdentifier.RELINQUISH_DEFAULT)

        # Update current value source from winning priority slot
        if hasattr(self, "_value_source_array"):
            if winning_priority is not None:
                self._properties[PropertyIdentifier.VALUE_SOURCE] = self._value_source_array[
                    winning_priority
                ]
                self._properties[PropertyIdentifier.LAST_COMMAND_TIME] = self._command_time_array[
                    winning_priority
                ]
            else:
                self._properties[PropertyIdentifier.VALUE_SOURCE] = BACnetValueSource.none_source()
                self._properties[PropertyIdentifier.LAST_COMMAND_TIME] = None

    def _write_array_element(
        self,
        prop_id: PropertyIdentifier,
        value: Any,
        array_index: int,
    ) -> None:
        """Write to a specific element of an array property.

        The stored list is modified in place.

        :param prop_id: The array property identifier.
        :param value: The value to write at the given index.
        :param array_index: 1-based index into the array.
        :raises BACnetError: If the property is not an array or *array_index*
            is out of range.
        """
        current = self._properties.get(prop_id)
        if not isinstance(current, list):
            raise BACnetError(ErrorClass.PROPERTY, ErrorCode.PROPERTY_IS_NOT_AN_ARRAY)
        if array_index < 1 or array_index > len(current):
            raise BACnetError(ErrorClass.PROPERTY, ErrorCode.INVALID_ARRAY_INDEX)
        current[array_index - 1] = value
[docs]
class ObjectDatabase:
    """Container for all BACnet objects in a device.

    Enforces Object_Name uniqueness per Clause 12.1.5 and fans property
    change notifications out to registered callbacks.
    """

    def __init__(self) -> None:
        """Create an empty database."""
        # Primary store, keyed by object identifier.
        self._objects: dict[ObjectIdentifier, BACnetObject] = {}
        # Object_Name -> identifier, used for uniqueness checks.
        self._names: dict[str, ObjectIdentifier] = {}
        # Secondary index: object type -> {identifier: object}.
        self._type_index: dict[ObjectType, dict[ObjectIdentifier, BACnetObject]] = {}
        # The Device object, which carries Database_Revision.
        self._device_obj: BACnetObject | None = None
        self._change_callbacks: dict[
            tuple[ObjectIdentifier, PropertyIdentifier],
            list[Callable[[PropertyIdentifier, Any, Any], None]],
        ] = {}

    def add(self, obj: BACnetObject) -> None:
        """Add an object to the database.

        :param obj: The :class:`BACnetObject` to register.
        :raises BACnetError: If an object with the same identifier or name
            already exists.
        """
        object_id = obj.object_identifier
        if object_id in self._objects:
            logger.warning("object already exists: %s", object_id)
            raise BACnetError(ErrorClass.OBJECT, ErrorCode.OBJECT_IDENTIFIER_ALREADY_EXISTS)
        name = obj._properties.get(PropertyIdentifier.OBJECT_NAME)
        if name is not None and name in self._names:
            raise BACnetError(ErrorClass.OBJECT, ErrorCode.DUPLICATE_NAME)
        self._objects[object_id] = obj
        self._type_index.setdefault(object_id.object_type, {})[object_id] = obj
        if name is not None:
            self._names[name] = object_id
        if object_id.object_type == ObjectType.DEVICE:
            self._device_obj = obj
        obj._object_db = self
        # Change callbacks may have been registered before the object was
        # added; wire the notifier now so those callbacks actually fire.
        if obj._on_property_written is None and any(
            registered_id == object_id for registered_id, _ in self._change_callbacks
        ):
            obj._on_property_written = self._make_write_notifier(object_id)
        self._increment_database_revision()
        logger.info("object added: %s", object_id)

    def remove(self, object_id: ObjectIdentifier) -> None:
        """Remove an object from the database.

        :param object_id: Identifier of the object to remove.
        :raises BACnetError: If the object does not exist or is a Device object.
        """
        if object_id not in self._objects:
            logger.warning("object not found: %s", object_id)
            raise BACnetError(ErrorClass.OBJECT, ErrorCode.UNKNOWN_OBJECT)
        if object_id.object_type == ObjectType.DEVICE:
            raise BACnetError(ErrorClass.OBJECT, ErrorCode.OBJECT_DELETION_NOT_PERMITTED)
        obj = self._objects[object_id]
        name = obj._properties.get(PropertyIdentifier.OBJECT_NAME)
        # Only drop the name entry if it still points at this object.
        if name is not None and self._names.get(name) == object_id:
            del self._names[name]
        type_bucket = self._type_index.get(object_id.object_type)
        if type_bucket is not None:
            type_bucket.pop(object_id, None)
            if not type_bucket:
                del self._type_index[object_id.object_type]
        obj._object_db = None
        del self._objects[object_id]
        self._increment_database_revision()
        logger.info("object removed: %s", object_id)

    def validate_name_unique(self, name: str, exclude: ObjectIdentifier | None = None) -> None:
        """Check that a name is unique within the database.

        :param name: The object name to check.
        :param exclude: Object identifier to exclude (for rename operations).
        :raises BACnetError: If *name* is already in use by another object.
        """
        existing = self._names.get(name)
        if existing is not None and existing != exclude:
            raise BACnetError(ErrorClass.PROPERTY, ErrorCode.DUPLICATE_NAME)

    def _update_name_index(
        self,
        object_id: ObjectIdentifier,
        old_name: str | None,
        new_name: str,
    ) -> None:
        """Update the name-to-identifier index after a rename.

        :param object_id: The identifier of the renamed object.
        :param old_name: The previous name, or ``None`` if not yet set.
        :param new_name: The new name to register.
        """
        if old_name is not None and self._names.get(old_name) == object_id:
            del self._names[old_name]
        self._names[new_name] = object_id

    def _increment_database_revision(self) -> None:
        """Increment Database_Revision on the Device object (Clause 12.11.23).

        Called when configuration changes: object add/remove, name changes.
        Does nothing while no Device object has been added.
        """
        obj = self._device_obj
        if obj is not None:
            # Treat a missing or ``None`` revision as 0 instead of failing.
            current = obj._properties.get(PropertyIdentifier.DATABASE_REVISION) or 0
            obj._properties[PropertyIdentifier.DATABASE_REVISION] = current + 1

    def register_change_callback(
        self,
        object_id: ObjectIdentifier,
        prop_id: PropertyIdentifier,
        callback: Callable[[PropertyIdentifier, Any, Any], None],
    ) -> None:
        """Register a callback fired when a property value changes.

        The callback receives ``(prop_id, old_value, new_value)`` after
        a successful write that changes the value.

        :param object_id: Target object identifier.
        :param prop_id: Property identifier to monitor.
        :param callback: Function to call on value change.
        :raises ValueError: If 100 callbacks are already registered for
            this object/property pair.
        """
        key = (object_id, prop_id)
        cbs = self._change_callbacks.setdefault(key, [])
        if len(cbs) >= 100:
            msg = f"Too many change callbacks for {object_id}/{prop_id} (max 100)"
            raise ValueError(msg)
        cbs.append(callback)
        # Wire the object's write notification so it fans out to our callbacks
        obj = self._objects.get(object_id)
        if obj is not None and obj._on_property_written is None:
            obj._on_property_written = self._make_write_notifier(object_id)

    def unregister_change_callback(
        self,
        object_id: ObjectIdentifier,
        prop_id: PropertyIdentifier,
        callback: Callable[[PropertyIdentifier, Any, Any], None],
    ) -> None:
        """Remove a previously registered change callback.

        Unknown callbacks are ignored.

        :param object_id: Target object identifier.
        :param prop_id: Property identifier.
        :param callback: The callback to remove.
        """
        key = (object_id, prop_id)
        cbs = self._change_callbacks.get(key)
        if cbs is not None:
            with contextlib.suppress(ValueError):
                cbs.remove(callback)
            if not cbs:
                del self._change_callbacks[key]

    def _make_write_notifier(
        self, object_id: ObjectIdentifier
    ) -> Callable[[PropertyIdentifier, Any, Any], None]:
        """Create a write notification function for a specific object.

        The returned function invokes every callback registered for
        ``(object_id, prop_id)``. Delivery is best-effort: an exception raised
        by one callback is logged and does not prevent the others from running.

        :param object_id: The object to create the notifier for.
        :returns: A callback suitable for ``_on_property_written``.
        """

        def _notify(prop_id: PropertyIdentifier, old_value: Any, new_value: Any) -> None:
            # Iterate over a snapshot: a callback may (un)register callbacks
            # while we are notifying, which would otherwise skip entries.
            for cb in tuple(self._change_callbacks.get((object_id, prop_id), ())):
                try:
                    cb(prop_id, old_value, new_value)
                except Exception:
                    logger.exception("change callback failed for %s.%s", object_id, prop_id)

        return _notify

    def get(self, object_id: ObjectIdentifier) -> BACnetObject | None:
        """Retrieve an object by its identifier.

        :param object_id: The :class:`ObjectIdentifier` to look up.
        :returns: The :class:`BACnetObject`, or ``None`` if not found.
        """
        return self._objects.get(object_id)

    def get_objects_of_type(self, obj_type: ObjectType) -> list[BACnetObject]:
        """Retrieve all objects matching a given type.

        :param obj_type: The :class:`ObjectType` to filter by.
        :returns: List of matching :class:`BACnetObject` instances.
        """
        type_bucket = self._type_index.get(obj_type)
        return list(type_bucket.values()) if type_bucket else []

    @property
    def object_list(self) -> list[ObjectIdentifier]:
        """List of all :class:`ObjectIdentifier` values in the database."""
        return list(self._objects.keys())

    def __len__(self) -> int:
        """Return the number of objects in the database."""
        return len(self._objects)

    def __iter__(self) -> Iterator[ObjectIdentifier]:
        """Iterate over the identifiers of all objects in the database."""
        return iter(self._objects)

    def __contains__(self, object_id: object) -> bool:
        """Return ``True`` if an object with *object_id* is in the database."""
        return object_id in self._objects

    def values(self) -> Iterator[BACnetObject]:
        """Iterate over all :class:`BACnetObject` instances in the database."""
        return iter(self._objects.values())
# Object type registry for factory creation
_OBJECT_REGISTRY: dict[ObjectType, type[BACnetObject]] = {}
[docs]
def register_object_type(cls: type[BACnetObject]) -> type[BACnetObject]:
"""Class decorator to register a :class:`BACnetObject` subclass in the factory.
:param cls: The :class:`BACnetObject` subclass to register.
:returns: The same class, unmodified.
"""
_OBJECT_REGISTRY[cls.OBJECT_TYPE] = cls
return cls
[docs]
def create_object(
    object_type: ObjectType,
    instance_number: int,
    **properties: Any,
) -> BACnetObject:
    """Instantiate the :class:`BACnetObject` subclass registered for a type.

    :param object_type: BACnet object type to create.
    :param instance_number: Instance number for the new object.
    :param properties: Initial property values, forwarded to the class
        constructor as keyword arguments.
    :returns: New :class:`BACnetObject` instance.
    :raises BACnetError: If the object type is not registered.
    """
    factory = _OBJECT_REGISTRY.get(object_type)
    if factory is not None:
        return factory(instance_number, **properties)
    raise BACnetError(ErrorClass.OBJECT, ErrorCode.UNSUPPORTED_OBJECT_TYPE)