"""Default BACnet server handlers per ASHRAE 135-2016.
Provides handlers for Who-Is, ReadProperty, WriteProperty,
ReadPropertyMultiple, WritePropertyMultiple, and ReadRange that
work with the local ObjectDatabase and DeviceObject.
"""
from __future__ import annotations
import contextlib
import hmac
import logging
from typing import TYPE_CHECKING, Any
from bac_py.app._object_type_sets import ANALOG_TYPES
from bac_py.app.audit import AuditManager
from bac_py.encoding.primitives import (
decode_all_application_values,
decode_and_unwrap,
encode_application_object_id,
encode_property_value,
)
from bac_py.network.address import GLOBAL_BROADCAST
from bac_py.objects.base import _OBJECT_REGISTRY, create_object
from bac_py.objects.file import FileObject
from bac_py.services.alarm_summary import (
AlarmSummary,
EnrollmentSummary,
EventSummary,
GetAlarmSummaryACK,
GetAlarmSummaryRequest,
GetEnrollmentSummaryACK,
GetEnrollmentSummaryRequest,
GetEventInformationACK,
GetEventInformationRequest,
)
from bac_py.services.audit import (
AuditLogQueryACK,
AuditLogQueryRequest,
ConfirmedAuditNotificationRequest,
UnconfirmedAuditNotificationRequest,
)
from bac_py.services.cov import (
COVNotificationMultipleRequest,
SubscribeCOVPropertyMultipleRequest,
SubscribeCOVPropertyRequest,
SubscribeCOVRequest,
)
from bac_py.services.device_discovery import WhoAmIRequest, YouAreRequest
from bac_py.services.device_mgmt import (
DeviceCommunicationControlRequest,
ReinitializeDeviceRequest,
TimeSynchronizationRequest,
UTCTimeSynchronizationRequest,
)
from bac_py.services.errors import BACnetError, BACnetRejectError
from bac_py.services.event_notification import (
AcknowledgeAlarmRequest,
EventNotificationRequest,
)
from bac_py.services.file_access import (
AtomicReadFileACK,
AtomicReadFileRequest,
AtomicWriteFileACK,
AtomicWriteFileRequest,
RecordReadACK,
StreamReadAccess,
StreamReadACK,
StreamWriteAccess,
)
from bac_py.services.list_element import AddListElementRequest, RemoveListElementRequest
from bac_py.services.object_mgmt import CreateObjectRequest, DeleteObjectRequest
from bac_py.services.read_property import ReadPropertyACK, ReadPropertyRequest
from bac_py.services.read_property_multiple import (
PropertyReference,
ReadAccessResult,
ReadPropertyMultipleACK,
ReadPropertyMultipleRequest,
ReadResultElement,
)
from bac_py.services.read_range import (
RangeByPosition,
ReadRangeACK,
ReadRangeRequest,
ResultFlags,
)
from bac_py.services.text_message import (
ConfirmedTextMessageRequest,
UnconfirmedTextMessageRequest,
)
from bac_py.services.virtual_terminal import (
VTCloseRequest,
VTDataACK,
VTDataRequest,
VTOpenACK,
VTOpenRequest,
)
from bac_py.services.who_has import IHaveRequest, WhoHasRequest
from bac_py.services.who_is import IAmRequest, WhoIsRequest
from bac_py.services.write_group import WriteGroupRequest
from bac_py.services.write_property import WritePropertyRequest
from bac_py.services.write_property_multiple import WritePropertyMultipleRequest
from bac_py.types.constructed import BACnetTimeStamp
from bac_py.types.enums import (
AcknowledgmentFilter,
AuditOperation,
BackupAndRestoreState,
ConfirmedServiceChoice,
DeviceStatus,
ErrorClass,
ErrorCode,
EventState,
EventType,
NotifyType,
ObjectType,
PropertyIdentifier,
ReinitializedState,
RejectReason,
UnconfirmedServiceChoice,
)
from bac_py.types.primitives import BitString, ObjectIdentifier
if TYPE_CHECKING:
from bac_py.app.application import BACnetApplication
from bac_py.network.address import BACnetAddress
from bac_py.objects.base import BACnetObject, ObjectDatabase
from bac_py.objects.device import DeviceObject
logger = logging.getLogger(__name__)
def _bitstring_from_bits(bit_positions: set[int]) -> BitString:
    """Create a :class:`BitString` in which the given bit positions are ``1``.

    Positions are counted MSB-first: position ``0`` is the most significant
    bit of the first octet. The result is just long enough to hold the
    highest requested position; the remaining low-order bits of the last
    octet are reported as unused.

    :param bit_positions: Zero-based bit indices (MSB-first) to set.
    :returns: :class:`BitString` with exactly those bits set. An empty set
        yields a single zero octet with zero unused bits.
    """
    if not bit_positions:
        return BitString(b"\x00", 0)

    highest = max(bit_positions)
    octet_count = highest // 8 + 1
    octets = bytearray(octet_count)
    for position in bit_positions:
        octet_index, offset = divmod(position, 8)
        # 0x80 is the MSB of an octet; shifting right walks towards the LSB.
        octets[octet_index] |= 0x80 >> offset

    padding = octet_count * 8 - (highest + 1)
    return BitString(bytes(octets), padding)
def _encode_property_value(value: Any, obj_type: ObjectType | None = None) -> bytes:
    """Encode *value* as application-tagged bytes for a server response.

    Delegates to ``encode_property_value`` and translates a ``TypeError``
    raised there into a :class:`BACnetError` so the caller can answer with
    a regular BACnet error instead of crashing.

    :param value: The value to encode.
    :param obj_type: Optional type of the owning object. When it is one of
        ``ANALOG_TYPES``, ``int_as_real`` is enabled so integers are encoded
        as Real rather than Unsigned.
    :returns: The encoded bytes.
    :raises BACnetError: ``PROPERTY`` / ``OTHER`` when encoding raises
        ``TypeError``.
    """
    try:
        if obj_type is None:
            as_real = False
        else:
            as_real = obj_type in ANALOG_TYPES
        return encode_property_value(value, int_as_real=as_real)
    except TypeError:
        raise BACnetError(ErrorClass.PROPERTY, ErrorCode.OTHER) from None
[docs]
class DefaultServerHandlers:
"""Standard BACnet service handlers for a server device.
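Registers handlers with the application's service registry for
property access (ReadProperty, WriteProperty, ReadPropertyMultiple,
WritePropertyMultiple, ReadRange), discovery (Who-Is, Who-Has, Who-Am-I,
You-Are), COV subscription and notification, device management, time
synchronization, file access, object and list management, alarm and
event services, text messages, WriteGroup, virtual terminal and audit
services.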
"""
def __init__(
    self,
    app: BACnetApplication,
    object_db: ObjectDatabase,
    device: DeviceObject,
) -> None:
    """Bind the handlers to an application, its object database and device.

    Also creates an :class:`AuditManager` over *object_db*, which the write
    handlers use to record operations.

    :param app: The parent application (used for service registration
        and sending unconfirmed responses).
    :param object_db: Object database that property reads/writes are
        served from.
    :param device: The local device object (used for Who-Is matching
        and wildcard resolution).
    """
    self._app, self._db, self._device = app, object_db, device
    self._audit_manager = AuditManager(object_db)
[docs]
def register(self) -> None:
    """Install every default handler and publish what the device supports.

    After registration, the device's ``PROTOCOL_SERVICES_SUPPORTED`` and
    ``PROTOCOL_OBJECT_TYPES_SUPPORTED`` properties are overwritten with
    bit strings derived from the service registry and from
    ``_OBJECT_REGISTRY`` respectively.
    """
    registry = self._app.service_registry
    logger.debug("registering default server handlers")

    confirmed = registry.register_confirmed
    unconfirmed = registry.register_unconfirmed
    csc = ConfirmedServiceChoice
    usc = UnconfirmedServiceChoice

    # One row per registration, kept in the order the calls must be made.
    registrations = (
        (confirmed, csc.READ_PROPERTY, self.handle_read_property),
        (confirmed, csc.WRITE_PROPERTY, self.handle_write_property),
        (confirmed, csc.READ_PROPERTY_MULTIPLE, self.handle_read_property_multiple),
        (confirmed, csc.WRITE_PROPERTY_MULTIPLE, self.handle_write_property_multiple),
        (confirmed, csc.READ_RANGE, self.handle_read_range),
        (confirmed, csc.SUBSCRIBE_COV, self.handle_subscribe_cov),
        (
            confirmed,
            csc.DEVICE_COMMUNICATION_CONTROL,
            self.handle_device_communication_control,
        ),
        (confirmed, csc.REINITIALIZE_DEVICE, self.handle_reinitialize_device),
        (confirmed, csc.ATOMIC_READ_FILE, self.handle_atomic_read_file),
        (confirmed, csc.ATOMIC_WRITE_FILE, self.handle_atomic_write_file),
        (confirmed, csc.CREATE_OBJECT, self.handle_create_object),
        (confirmed, csc.DELETE_OBJECT, self.handle_delete_object),
        (confirmed, csc.ADD_LIST_ELEMENT, self.handle_add_list_element),
        (confirmed, csc.REMOVE_LIST_ELEMENT, self.handle_remove_list_element),
        (unconfirmed, usc.WHO_IS, self.handle_who_is),
        (unconfirmed, usc.WHO_HAS, self.handle_who_has),
        (unconfirmed, usc.TIME_SYNCHRONIZATION, self.handle_time_synchronization),
        (
            unconfirmed,
            usc.UTC_TIME_SYNCHRONIZATION,
            self.handle_utc_time_synchronization,
        ),
        (confirmed, csc.ACKNOWLEDGE_ALARM, self.handle_acknowledge_alarm),
        (
            confirmed,
            csc.CONFIRMED_EVENT_NOTIFICATION,
            self.handle_confirmed_event_notification,
        ),
        (confirmed, csc.GET_ALARM_SUMMARY, self.handle_get_alarm_summary),
        (confirmed, csc.GET_ENROLLMENT_SUMMARY, self.handle_get_enrollment_summary),
        (confirmed, csc.GET_EVENT_INFORMATION, self.handle_get_event_information),
        (
            unconfirmed,
            usc.UNCONFIRMED_EVENT_NOTIFICATION,
            self.handle_unconfirmed_event_notification,
        ),
        (confirmed, csc.SUBSCRIBE_COV_PROPERTY, self.handle_subscribe_cov_property),
        (
            confirmed,
            csc.SUBSCRIBE_COV_PROPERTY_MULTIPLE,
            self.handle_subscribe_cov_property_multiple,
        ),
        (
            confirmed,
            csc.CONFIRMED_COV_NOTIFICATION_MULTIPLE,
            self.handle_confirmed_cov_notification_multiple,
        ),
        (
            unconfirmed,
            usc.UNCONFIRMED_COV_NOTIFICATION_MULTIPLE,
            self.handle_unconfirmed_cov_notification_multiple,
        ),
        (confirmed, csc.CONFIRMED_TEXT_MESSAGE, self.handle_confirmed_text_message),
        (
            unconfirmed,
            usc.UNCONFIRMED_TEXT_MESSAGE,
            self.handle_unconfirmed_text_message,
        ),
        (unconfirmed, usc.WRITE_GROUP, self.handle_write_group),
        (unconfirmed, usc.WHO_AM_I, self.handle_who_am_i),
        (unconfirmed, usc.YOU_ARE, self.handle_you_are),
        (confirmed, csc.VT_OPEN, self.handle_vt_open),
        (confirmed, csc.VT_CLOSE, self.handle_vt_close),
        (confirmed, csc.VT_DATA, self.handle_vt_data),
        (confirmed, csc.AUDIT_LOG_QUERY, self.handle_audit_log_query),
        (
            confirmed,
            csc.CONFIRMED_AUDIT_NOTIFICATION,
            self.handle_confirmed_audit_notification,
        ),
        (
            unconfirmed,
            usc.UNCONFIRMED_AUDIT_NOTIFICATION,
            self.handle_unconfirmed_audit_notification,
        ),
    )
    for add_handler, choice, handler in registrations:
        add_handler(choice, handler)

    logger.debug(
        "registered %d confirmed and %d unconfirmed handlers",
        len(registry._confirmed),
        len(registry._unconfirmed),
    )

    # Derive Protocol_Services_Supported from whatever is registered
    # (Clause 12.11.44): a confirmed service sets the bit equal to its
    # ConfirmedServiceChoice value, an unconfirmed service sets bit
    # 32 + UnconfirmedServiceChoice value.
    # NOTE(review): this assumes confirmed choices stay in 0-31 and that
    # choice numbers line up with the standard's services-supported bit
    # positions; a confirmed choice >= 32 would land in the unconfirmed
    # range. Confirm against the standard.
    service_bits: set[int] = set(registry._confirmed)
    service_bits.update(32 + choice for choice in registry._unconfirmed)
    self._device._properties[PropertyIdentifier.PROTOCOL_SERVICES_SUPPORTED] = (
        _bitstring_from_bits(service_bits)
    )

    # Derive Protocol_Object_Types_Supported from the object registry
    # (Clause 12.11.43): bit position = ObjectType value.
    obj_type_bits: set[int] = {int(obj_type) for obj_type in _OBJECT_REGISTRY}
    self._device._properties[PropertyIdentifier.PROTOCOL_OBJECT_TYPES_SUPPORTED] = (
        _bitstring_from_bits(obj_type_bits)
    )
[docs]
async def handle_read_property(
    self,
    service_choice: int,
    data: bytes,
    source: BACnetAddress,
) -> bytes:
    """Serve a ReadProperty-Request (Clause 15.5).

    The request is decoded, a wildcard device identifier is resolved to
    the local device, the property is read (with the computed-property
    overrides of ``_read_object_property``) and the value is returned in
    an encoded :class:`ReadPropertyACK`.

    :param service_choice: Confirmed service choice code.
    :param data: Raw service request bytes.
    :param source: Address of the requesting device.
    :returns: Encoded ReadProperty-ACK service data.
    :raises BACnetError: ``OBJECT`` / ``UNKNOWN_OBJECT`` when the object is
        not in the database; whatever the property read or value encoding
        raises otherwise.
    """
    request = ReadPropertyRequest.decode(data)
    prop_id = request.property_identifier
    array_index = request.property_array_index
    obj_id = self._resolve_object_id(request.object_identifier)
    logger.debug("handling read_property %s %s from %s", obj_id, prop_id.name, source)

    target = self._db.get(obj_id)
    if target is None:
        logger.warning("read_property: unknown object %s from %s", obj_id, source)
        raise BACnetError(ErrorClass.OBJECT, ErrorCode.UNKNOWN_OBJECT)

    # Unknown or unsupported properties surface as BACnetError from here.
    value = self._read_object_property(target, prop_id, array_index)
    return ReadPropertyACK(
        object_identifier=obj_id,
        property_identifier=prop_id,
        property_array_index=array_index,
        property_value=_encode_property_value(value, obj_id.object_type),
    ).encode()
[docs]
async def handle_write_property(
    self,
    service_choice: int,
    data: bytes,
    source: BACnetAddress,
) -> bytes | None:
    """Serve a WriteProperty-Request (Clause 15.9).

    Decodes the request, converts the application-tagged value to a native
    Python value, writes it to the object, then triggers COV checking (when
    a COV manager exists) and records the write with the audit manager.

    :param service_choice: Confirmed service choice code.
    :param data: Raw service request bytes.
    :param source: Address of the requesting device.
    :returns: ``None`` (SimpleACK response).
    :raises BACnetError: ``OBJECT`` / ``UNKNOWN_OBJECT`` when the object is
        missing, ``PROPERTY`` / ``INVALID_DATA_TYPE`` when the value cannot
        be decoded, or whatever the object's write raises.
    """
    request = WritePropertyRequest.decode(data)
    prop_id = request.property_identifier
    obj_id = self._resolve_object_id(request.object_identifier)
    logger.debug("handling write_property %s %s from %s", obj_id, prop_id.name, source)

    target = self._db.get(obj_id)
    if target is None:
        logger.warning("write_property: unknown object %s from %s", obj_id, source)
        raise BACnetError(ErrorClass.OBJECT, ErrorCode.UNKNOWN_OBJECT)

    # Objects store native Python values, so the raw application-tagged
    # bytes are decoded before being applied.
    try:
        native_value = decode_and_unwrap(request.property_value)
    except (ValueError, IndexError):
        logger.warning(
            "write_property: invalid data type for %s %s from %s",
            obj_id,
            prop_id.name,
            source,
        )
        raise BACnetError(ErrorClass.PROPERTY, ErrorCode.INVALID_DATA_TYPE) from None

    await target.async_write_property(
        prop_id,
        native_value,
        request.priority,
        request.property_array_index,
    )

    cov_manager = self._app.cov_manager
    if cov_manager is not None:
        cov_manager.check_and_notify(target, prop_id)

    # The audit record keeps the value exactly as received (still encoded).
    self._audit_manager.record_operation(
        operation=AuditOperation.WRITE,
        target_object=obj_id,
        target_property=prop_id,
        target_array_index=request.property_array_index,
        target_priority=request.priority,
        target_value=request.property_value,
    )
    return None
[docs]
async def handle_subscribe_cov(
    self,
    service_choice: int,
    data: bytes,
    source: BACnetAddress,
) -> bytes | None:
    """Serve a SubscribeCOV-Request (Clause 13.14).

    A cancellation request removes the subscription; any other request
    creates or updates it through the application's COV manager.

    :param service_choice: Confirmed service choice code.
    :param data: Raw service request bytes.
    :param source: Address of the requesting device.
    :returns: ``None`` (SimpleACK response).
    :raises BACnetError: If the monitored object does not exist or the
        application has no COV manager.
    :raises BACnetRejectError: If a lifetime is given without the
        issue-confirmed-notifications parameter.
    """
    request = SubscribeCOVRequest.decode(data)
    obj_id = self._resolve_object_id(request.monitored_object_identifier)
    logger.debug("handling subscribe_cov %s from %s", obj_id, source)

    if self._db.get(obj_id) is None:
        logger.warning("subscribe_cov: unknown object %s from %s", obj_id, source)
        raise BACnetError(ErrorClass.OBJECT, ErrorCode.UNKNOWN_OBJECT)

    manager = self._app.cov_manager
    if manager is None:
        logger.warning("subscribe_cov: COV not supported, denied request from %s", source)
        raise BACnetError(ErrorClass.SERVICES, ErrorCode.SERVICE_REQUEST_DENIED)

    # Clause 13.14.1.1.4: a Lifetime requires Issue Confirmed Notifications.
    has_lifetime = request.lifetime is not None
    if has_lifetime and request.issue_confirmed_notifications is None:
        raise BACnetRejectError(RejectReason.MISSING_REQUIRED_PARAMETER)

    if request.is_cancellation:
        manager.unsubscribe(source, request.subscriber_process_identifier, obj_id)
        return None  # SimpleACK

    manager.subscribe(source, request, self._db)
    return None  # SimpleACK
[docs]
async def handle_subscribe_cov_property(
    self,
    service_choice: int,
    data: bytes,
    source: BACnetAddress,
) -> bytes | None:
    """Serve a SubscribeCOVProperty-Request (Clause 13.15).

    After checking that the monitored object exists and that a COV manager
    is available, the request is handed to the COV manager.

    :param service_choice: Confirmed service choice code.
    :param data: Raw service request bytes.
    :param source: Address of the requesting device.
    :returns: ``None`` (SimpleACK response).
    :raises BACnetError: If the monitored object does not exist or the
        application has no COV manager.
    """
    request = SubscribeCOVPropertyRequest.decode(data)
    obj_id = self._resolve_object_id(request.monitored_object_identifier)
    logger.debug("handling subscribe_cov_property %s from %s", obj_id, source)

    if self._db.get(obj_id) is None:
        logger.warning("subscribe_cov_property: unknown object %s from %s", obj_id, source)
        raise BACnetError(ErrorClass.OBJECT, ErrorCode.UNKNOWN_OBJECT)

    manager = self._app.cov_manager
    if manager is None:
        logger.warning(
            "subscribe_cov_property: COV not supported, denied request from %s",
            source,
        )
        raise BACnetError(ErrorClass.SERVICES, ErrorCode.SERVICE_REQUEST_DENIED)

    manager.subscribe_property(source, request, self._db)
    return None  # SimpleACK
[docs]
async def handle_subscribe_cov_property_multiple(
    self,
    service_choice: int,
    data: bytes,
    source: BACnetAddress,
) -> bytes | None:
    """Serve a SubscribeCOVPropertyMultiple-Request (Clause 13.16).

    The decoded request is passed to the COV manager together with the
    object database; this handler does no object lookup of its own.

    :param service_choice: Confirmed service choice code.
    :param data: Raw service request bytes.
    :param source: Address of the requesting device.
    :returns: ``None`` (SimpleACK response).
    :raises BACnetError: ``SERVICES`` / ``SERVICE_REQUEST_DENIED`` when the
        application has no COV manager.
    """
    request = SubscribeCOVPropertyMultipleRequest.decode(data)
    logger.debug("handling subscribe_cov_property_multiple from %s", source)

    manager = self._app.cov_manager
    if manager is not None:
        manager.subscribe_property_multiple(source, request, self._db)
        return None  # SimpleACK

    logger.warning(
        "subscribe_cov_property_multiple: COV not supported, denied request from %s",
        source,
    )
    raise BACnetError(ErrorClass.SERVICES, ErrorCode.SERVICE_REQUEST_DENIED)
[docs]
async def handle_confirmed_cov_notification_multiple(
    self,
    service_choice: int,
    data: bytes,
    source: BACnetAddress,
) -> bytes | None:
    """Serve a ConfirmedCOVNotification-Multiple (Clause 13.17).

    The notification is decoded (so malformed data still fails) and the
    number of contained notifications is logged; nothing else is done.

    :param service_choice: Confirmed service choice code.
    :param data: Raw service request bytes.
    :param source: Address of the requesting device.
    :returns: ``None`` (SimpleACK response).
    """
    notification = COVNotificationMultipleRequest.decode(data)
    notification_count = len(notification.list_of_cov_notifications)
    logger.debug(
        "ConfirmedCOVNotification-Multiple from %s: %d notifications",
        source,
        notification_count,
    )
    return None  # SimpleACK
[docs]
async def handle_unconfirmed_cov_notification_multiple(
    self,
    service_choice: int,
    data: bytes,
    source: BACnetAddress,
) -> None:
    """Serve an UnconfirmedCOVNotification-Multiple (Clause 13.18).

    The notification is decoded and the number of contained notifications
    is logged; nothing else is done.

    :param service_choice: Unconfirmed service choice code.
    :param data: Raw service request bytes.
    :param source: Address of the sending device.
    """
    notification = COVNotificationMultipleRequest.decode(data)
    notification_count = len(notification.list_of_cov_notifications)
    logger.debug(
        "UnconfirmedCOVNotification-Multiple from %s: %d notifications",
        source,
        notification_count,
    )
[docs]
async def handle_who_is(
    self,
    service_choice: int,
    data: bytes,
    source: BACnetAddress,
) -> None:
    """Serve a Who-Is-Request (Clause 16.10).

    When both range limits are present and the local device instance lies
    outside them, the request is ignored. Otherwise an I-Am describing the
    local device is sent to the global broadcast address.

    :param service_choice: Unconfirmed service choice code.
    :param data: Raw service request bytes.
    :param source: Address of the requesting device.
    """
    request = WhoIsRequest.decode(data)
    low, high = request.low_limit, request.high_limit
    logger.debug("handling who_is low=%s high=%s from %s", low, high, source)

    instance = self._device.object_identifier.instance_number
    range_given = low is not None and high is not None
    if range_given and (instance < low or instance > high):
        return  # Not addressed to us.

    # The I-Am always goes to the global broadcast address (Clause 16.10.2).
    config = self._app.config
    segmentation = self._device.read_property(PropertyIdentifier.SEGMENTATION_SUPPORTED)
    i_am = IAmRequest(
        object_identifier=ObjectIdentifier(ObjectType.DEVICE, instance),
        max_apdu_length=config.max_apdu_length,
        segmentation_supported=segmentation,
        vendor_id=config.vendor_id,
    )
    self._app.unconfirmed_request(
        destination=GLOBAL_BROADCAST,
        service_choice=UnconfirmedServiceChoice.I_AM,
        service_data=i_am.encode(),
    )
def _read_object_property(
    self,
    obj: BACnetObject,
    prop_id: PropertyIdentifier,
    array_index: int | None = None,
) -> Any:
    """Read a property, substituting live values for computed ones.

    Two properties of the local device are answered here instead of by the
    object itself: ``OBJECT_LIST`` comes from the database's current object
    list, and ``ACTIVE_COV_SUBSCRIPTIONS`` comes from the COV manager (an
    empty list when there is none). Everything else is delegated to
    ``obj.read_property``.

    :param obj: The BACnet object to read from.
    :param prop_id: Property identifier to read.
    :param array_index: Optional array index for array properties. For
        ``OBJECT_LIST``, index ``0`` returns the list length and indices
        ``1..len`` return single entries.
    :returns: The property value (type varies by property).
    :raises BACnetError: ``PROPERTY`` / ``INVALID_ARRAY_INDEX`` for an
        out-of-range ``OBJECT_LIST`` index, or whatever the object's own
        ``read_property`` raises.
    """
    if obj is not self._device:
        return obj.read_property(prop_id, array_index)

    if prop_id == PropertyIdentifier.OBJECT_LIST:
        objects = self._db.object_list
        if array_index is None:
            return objects
        if array_index == 0:
            return len(objects)
        if 1 <= array_index <= len(objects):
            return objects[array_index - 1]
        raise BACnetError(ErrorClass.PROPERTY, ErrorCode.INVALID_ARRAY_INDEX)

    if prop_id == PropertyIdentifier.ACTIVE_COV_SUBSCRIPTIONS:
        manager = self._app.cov_manager
        return [] if manager is None else manager.get_active_subscriptions()

    return obj.read_property(prop_id, array_index)
def _resolve_object_id(self, obj_id: ObjectIdentifier) -> ObjectIdentifier:
    """Map the wildcard device instance ``4194303`` onto the local device.

    :param obj_id: Object identifier to resolve.
    :returns: The local device's identifier when *obj_id* is a DEVICE
        object with the wildcard instance number, otherwise *obj_id*
        unchanged.
    """
    wildcard_instance = 0x3FFFFF  # 4194303
    is_wildcard_device = (
        obj_id.object_type == ObjectType.DEVICE
        and obj_id.instance_number == wildcard_instance
    )
    return self._device.object_identifier if is_wildcard_device else obj_id
def _expand_property_references(
    self,
    obj: BACnetObject,
    refs: list[PropertyReference],
) -> list[PropertyReference]:
    """Replace ALL / REQUIRED / OPTIONAL with concrete property references.

    Per Clause 15.7.3.2 these special identifiers stand for groups of
    properties. They are expanded from ``obj.PROPERTY_DEFINITIONS``:

    - ``ALL``: every defined property that is present on the object or
      required, plus ``PROPERTY_LIST`` if it is not already in the output.
    - ``REQUIRED``: every property whose definition is marked required.
    - ``OPTIONAL``: every non-required property present on the object.

    Any other reference is passed through unchanged, in its original
    position.

    :param obj: The BACnet object whose property definitions are used.
    :param refs: Property references, possibly containing special
        identifiers.
    :returns: Expanded list of concrete :class:`PropertyReference` instances.
    """
    expanded: list[PropertyReference] = []
    for ref in refs:
        selector = ref.property_identifier

        if selector == PropertyIdentifier.ALL:
            expanded.extend(
                PropertyReference(pid)
                for pid, pdef in obj.PROPERTY_DEFINITIONS.items()
                if pid in obj._properties or pdef.required
            )
            # Property_List is computed, so it is added even when it is not
            # stored on the object.
            # NOTE(review): confirm against Clause 15.7.3.1.2 whether
            # Property_List is meant to be part of an ALL response.
            has_property_list = any(
                r.property_identifier == PropertyIdentifier.PROPERTY_LIST for r in expanded
            )
            if not has_property_list:
                expanded.append(PropertyReference(PropertyIdentifier.PROPERTY_LIST))
            continue

        if selector == PropertyIdentifier.REQUIRED:
            expanded.extend(
                PropertyReference(pid)
                for pid, pdef in obj.PROPERTY_DEFINITIONS.items()
                if pdef.required
            )
            continue

        if selector == PropertyIdentifier.OPTIONAL:
            expanded.extend(
                PropertyReference(pid)
                for pid, pdef in obj.PROPERTY_DEFINITIONS.items()
                if not pdef.required and pid in obj._properties
            )
            continue

        expanded.append(ref)
    return expanded
[docs]
async def handle_read_property_multiple(
    self,
    service_choice: int,
    data: bytes,
    source: BACnetAddress,
) -> bytes:
    """Serve a ReadPropertyMultiple-Request (Clause 15.7).

    Each read access specification produces one result entry. Failures are
    reported per property inside the ACK rather than as a top-level error:
    an unknown object yields ``UNKNOWN_OBJECT`` for every requested
    property, and a :class:`BACnetError` raised while reading or encoding a
    property is embedded for that property only. The special identifiers
    ALL, REQUIRED and OPTIONAL (Clause 15.7.3.2) are expanded for objects
    that exist.

    :param service_choice: Confirmed service choice code.
    :param data: Raw service request bytes.
    :param source: Address of the requesting device.
    :returns: Encoded ReadPropertyMultiple-ACK service data.
    """
    request = ReadPropertyMultipleRequest.decode(data)
    specs = request.list_of_read_access_specs
    logger.debug("handling read_property_multiple (%d specs) from %s", len(specs), source)

    access_results: list[ReadAccessResult] = []
    for spec in specs:
        obj_id = self._resolve_object_id(spec.object_identifier)
        obj = self._db.get(obj_id)

        elements: list[ReadResultElement]
        if obj is None:
            # Nothing to expand against: report the same error for every
            # reference exactly as it was requested.
            elements = [
                ReadResultElement(
                    property_identifier=ref.property_identifier,
                    property_array_index=ref.property_array_index,
                    property_access_error=(
                        ErrorClass.OBJECT,
                        ErrorCode.UNKNOWN_OBJECT,
                    ),
                )
                for ref in spec.list_of_property_references
            ]
        else:
            elements = []
            for ref in self._expand_property_references(obj, spec.list_of_property_references):
                try:
                    value = self._read_object_property(
                        obj,
                        ref.property_identifier,
                        ref.property_array_index,
                    )
                    element = ReadResultElement(
                        property_identifier=ref.property_identifier,
                        property_array_index=ref.property_array_index,
                        property_value=_encode_property_value(value, obj_id.object_type),
                    )
                except BACnetError as exc:
                    element = ReadResultElement(
                        property_identifier=ref.property_identifier,
                        property_array_index=ref.property_array_index,
                        property_access_error=(exc.error_class, exc.error_code),
                    )
                elements.append(element)

        access_results.append(
            ReadAccessResult(
                object_identifier=obj_id,
                list_of_results=elements,
            )
        )

    return ReadPropertyMultipleACK(list_of_read_access_results=access_results).encode()
[docs]
async def handle_write_property_multiple(
    self,
    service_choice: int,
    data: bytes,
    source: BACnetAddress,
) -> bytes | None:
    """Handle WritePropertyMultiple-Request per Clause 15.10.

    Works in three passes so that a request that can be recognised as
    invalid up front leaves the database untouched:

    1. Validate every target (object exists, property is defined and
       writable).
    2. Decode every value to a native Python value.
    3. Apply the writes, triggering COV checks and audit records after
       each one.

    Passes 1 and 2 have no side effects, so an unknown object/property,
    a read-only property or a malformed value is rejected before any write
    is applied. A failure raised by the object itself during pass 3 can
    still leave earlier writes of the same request in place.

    :param service_choice: Confirmed service choice code.
    :param data: Raw service request bytes.
    :param source: Address of the requesting device.
    :returns: ``None`` (SimpleACK response) on full success.
    :raises BACnetError: On the first validation, decoding or write failure.
    """
    from bac_py.objects.base import PropertyAccess

    request = WritePropertyMultipleRequest.decode(data)
    logger.debug(
        "handling write_property_multiple (%d specs) from %s",
        len(request.list_of_write_access_specs),
        source,
    )

    # Pass 1: validate all writes (object existence, property access).
    validated: list[tuple[BACnetObject, list[Any]]] = []
    for spec in request.list_of_write_access_specs:
        obj_id = self._resolve_object_id(spec.object_identifier)
        obj = self._db.get(obj_id)
        if obj is None:
            logger.warning(
                "write_property_multiple: unknown object %s from %s", obj_id, source
            )
            raise BACnetError(ErrorClass.OBJECT, ErrorCode.UNKNOWN_OBJECT)
        for pv in spec.list_of_properties:
            prop_def = obj.PROPERTY_DEFINITIONS.get(pv.property_identifier)
            if prop_def is None:
                logger.warning(
                    "write_property_multiple: unknown property %s on %s from %s",
                    pv.property_identifier.name,
                    obj_id,
                    source,
                )
                raise BACnetError(ErrorClass.PROPERTY, ErrorCode.UNKNOWN_PROPERTY)
            # A read-only Present_Value is still writable while the object
            # is out of service.
            if prop_def.access == PropertyAccess.READ_ONLY and not (
                pv.property_identifier == PropertyIdentifier.PRESENT_VALUE
                and obj._properties.get(PropertyIdentifier.OUT_OF_SERVICE) is True
            ):
                logger.warning(
                    "write_property_multiple: write access denied for %s on %s from %s",
                    pv.property_identifier.name,
                    obj_id,
                    source,
                )
                raise BACnetError(ErrorClass.PROPERTY, ErrorCode.WRITE_ACCESS_DENIED)
        validated.append((obj, spec.list_of_properties))

    # Pass 2: decode every value before anything is written, so that a
    # malformed value late in the request cannot leave earlier writes
    # applied. Running this after pass 1 keeps access/existence errors
    # ahead of data-type errors, as before.
    pending: list[tuple[BACnetObject, Any, Any]] = []
    for obj, properties in validated:
        for pv in properties:
            try:
                write_value = decode_and_unwrap(pv.value)
            except (ValueError, IndexError):
                logger.warning(
                    "write_property_multiple: invalid data type for %s on %s from %s",
                    pv.property_identifier.name,
                    obj.object_identifier,
                    source,
                )
                raise BACnetError(ErrorClass.PROPERTY, ErrorCode.INVALID_DATA_TYPE) from None
            pending.append((obj, pv, write_value))

    # Pass 3: apply all writes.
    for obj, pv, write_value in pending:
        await obj.async_write_property(
            pv.property_identifier,
            write_value,
            pv.priority,
            pv.property_array_index,
        )

        # Trigger COV notification checks after each successful write.
        cov_manager = self._app.cov_manager
        if cov_manager is not None:
            cov_manager.check_and_notify(obj, pv.property_identifier)

        # Record the value as received, matching handle_write_property.
        self._audit_manager.record_operation(
            operation=AuditOperation.WRITE,
            target_object=obj.object_identifier,
            target_property=pv.property_identifier,
            target_array_index=pv.property_array_index,
            target_priority=pv.priority,
            target_value=pv.value,
        )
    return None
[docs]
async def handle_read_range(
    self,
    service_choice: int,
    data: bytes,
    source: BACnetAddress,
) -> bytes:
    """Serve a ReadRange-Request (Clause 15.8).

    Reads a list-valued property and returns the slice selected by a
    by-position range. A positive count selects items starting at the
    1-based reference index; a negative count selects items ending at the
    reference index. Without a range, or with any other range type, the
    whole list is returned.

    :param service_choice: Confirmed service choice code.
    :param data: Raw service request bytes.
    :param source: Address of the requesting device.
    :returns: Encoded ReadRange-ACK service data.
    :raises BACnetError: ``OBJECT`` / ``UNKNOWN_OBJECT`` when the object is
        missing, ``PROPERTY`` / ``PROPERTY_IS_NOT_A_LIST`` when the value
        read is not a list, or whatever the property read raises.
    """
    request = ReadRangeRequest.decode(data)
    prop_id = request.property_identifier
    obj_id = self._resolve_object_id(request.object_identifier)
    logger.debug("handling read_range %s %s from %s", obj_id, prop_id.name, source)

    obj = self._db.get(obj_id)
    if obj is None:
        logger.warning("read_range: unknown object %s from %s", obj_id, source)
        raise BACnetError(ErrorClass.OBJECT, ErrorCode.UNKNOWN_OBJECT)

    value = self._read_object_property(obj, prop_id, request.property_array_index)
    if not isinstance(value, list):
        logger.warning("read_range: property %s on %s is not a list", prop_id.name, obj_id)
        raise BACnetError(ErrorClass.PROPERTY, ErrorCode.PROPERTY_IS_NOT_A_LIST)

    total = len(value)
    selector = request.range
    if isinstance(selector, RangeByPosition):
        ref_idx, count = selector.reference_index, selector.count
        if count < 0:
            # Negative count: the reference index is the last item wanted.
            end = min(total, ref_idx)
            start = max(0, end + count)
        else:
            # The reference index is 1-based; the slice below is 0-based.
            start = max(0, ref_idx - 1)
            end = min(total, start + count)
        items = value[start:end]
        is_first, is_last = start == 0, end >= total
    else:
        # No range or unsupported range type — return all items.
        items, is_first, is_last = value, True, True

    item_data = b"".join(_encode_property_value(item, obj_id.object_type) for item in items)
    flags = ResultFlags(
        first_item=is_first,
        last_item=is_last,
        more_items=not is_last,
    )
    return ReadRangeACK(
        object_identifier=obj_id,
        property_identifier=prop_id,
        result_flags=flags,
        item_count=len(items),
        item_data=item_data,
        property_array_index=request.property_array_index,
    ).encode()
# --- Device management handlers ---
def _validate_password(self, request_password: str | None) -> None:
    """Check a request password against the device's configured password.

    When a password is configured, the request must carry the same one;
    the comparison uses ``hmac.compare_digest`` on the UTF-8 bytes so its
    duration does not reveal how much of the password matched. When no
    password is configured, the request must not carry one at all.

    :param request_password: Password from the request, or ``None`` when
        the request has none.
    :raises BACnetError: ``SECURITY`` / ``PASSWORD_FAILURE`` if the password
        does not match or is unexpected.
    """
    expected = self._app.config.password

    if expected is None:
        if request_password is None:
            return
        # Device has no password configured but the request includes one.
        logger.warning("password validation failed: unexpected password provided")
        raise BACnetError(ErrorClass.SECURITY, ErrorCode.PASSWORD_FAILURE)

    matches = request_password is not None and hmac.compare_digest(
        request_password.encode("utf-8"),
        expected.encode("utf-8"),
    )
    if not matches:
        logger.warning("password validation failed: incorrect password")
        raise BACnetError(ErrorClass.SECURITY, ErrorCode.PASSWORD_FAILURE)
[docs]
async def handle_device_communication_control(
    self,
    service_choice: int,
    data: bytes,
    source: BACnetAddress,
) -> bytes | None:
    """Serve a DeviceCommunicationControl-Request (Clause 16.1).

    After the password check, the requested enable/disable state and the
    optional duration are passed to the application's ``set_dcc_state``.

    :param service_choice: Confirmed service choice code.
    :param data: Raw service request bytes.
    :param source: Address of the requesting device.
    :returns: ``None`` (SimpleACK response).
    :raises BACnetError: If the password does not match (Clause 16.1.3.1).
    """
    request = DeviceCommunicationControlRequest.decode(data)
    self._validate_password(request.password)

    new_state, duration = request.enable_disable, request.time_duration
    logger.info(
        "DeviceCommunicationControl from %s: enable_disable=%s, duration=%s",
        source,
        new_state,
        duration,
    )
    self._app.set_dcc_state(new_state, duration)
    return None
[docs]
async def handle_reinitialize_device(
    self,
    service_choice: int,
    data: bytes,
    source: BACnetAddress,
) -> bytes | None:
    """Handle ReinitializeDevice-Request per Clause 16.4 & 19.1.

    After the password check, backup/restore requests are looked up in a
    small transition table. Each row names the backup-and-restore states
    the request may start from and the values written to the Device object:

    - ``START_BACKUP`` (from ``IDLE``) → ``system_status=BACKUP_IN_PROGRESS``
    - ``END_BACKUP`` (from a backup phase) → ``system_status=OPERATIONAL``
    - ``START_RESTORE`` (from ``IDLE``) → ``system_status=DOWNLOAD_IN_PROGRESS``
    - ``END_RESTORE`` (from a restore phase) → ``system_status=OPERATIONAL``,
      and ``LAST_RESTORE_TIME`` is rewritten
    - ``ABORT_RESTORE`` (from a restore phase) → ``system_status=OPERATIONAL``

    Any other requested state changes nothing and is only logged.

    :param service_choice: Service choice of the incoming request (unused).
    :param data: Encoded service request.
    :param source: Address of the requesting device.
    :returns: ``None`` (SimpleACK response).
    :raises BACnetError: If the password does not match or the
        state transition is invalid (``DEVICE`` / ``OTHER``).
    """
    request = ReinitializeDeviceRequest.decode(data)
    self._validate_password(request.password)
    device = self._device
    requested = request.reinitialized_state

    idle = BackupAndRestoreState.IDLE
    backup_phases = (
        BackupAndRestoreState.PREPARING_FOR_BACKUP,
        BackupAndRestoreState.PERFORMING_A_BACKUP,
    )
    restore_phases = (
        BackupAndRestoreState.PREPARING_FOR_RESTORE,
        BackupAndRestoreState.PERFORMING_A_RESTORE,
    )
    # Row layout: (requested state, states it may start from,
    #              resulting system_status, resulting backup/restore state)
    transitions = (
        (
            ReinitializedState.START_BACKUP,
            (idle,),
            DeviceStatus.BACKUP_IN_PROGRESS,
            BackupAndRestoreState.PREPARING_FOR_BACKUP,
        ),
        (
            ReinitializedState.END_BACKUP,
            backup_phases,
            DeviceStatus.OPERATIONAL,
            idle,
        ),
        (
            ReinitializedState.START_RESTORE,
            (idle,),
            DeviceStatus.DOWNLOAD_IN_PROGRESS,
            BackupAndRestoreState.PREPARING_FOR_RESTORE,
        ),
        (
            ReinitializedState.END_RESTORE,
            restore_phases,
            DeviceStatus.OPERATIONAL,
            idle,
        ),
        (
            ReinitializedState.ABORT_RESTORE,
            restore_phases,
            DeviceStatus.OPERATIONAL,
            idle,
        ),
    )

    for trigger, allowed_from, next_status, next_br_state in transitions:
        if requested == trigger:
            props = device._properties
            current = props.get(PropertyIdentifier.BACKUP_AND_RESTORE_STATE, idle)
            if current not in allowed_from:
                raise BACnetError(ErrorClass.DEVICE, ErrorCode.OTHER)
            props[PropertyIdentifier.SYSTEM_STATUS] = next_status
            props[PropertyIdentifier.BACKUP_AND_RESTORE_STATE] = next_br_state
            if trigger == ReinitializedState.END_RESTORE:
                # Only a completed restore rewrites the last-restore timestamp.
                props[PropertyIdentifier.LAST_RESTORE_TIME] = BACnetTimeStamp(
                    choice=1, value=0
                )
            break

    logger.info(
        "ReinitializeDevice from %s: state=%s",
        source,
        request.reinitialized_state,
    )
    return None
[docs]
async def handle_time_synchronization(
    self,
    service_choice: int,
    data: bytes,
    source: BACnetAddress,
) -> None:
    """Handle TimeSynchronization-Request per Clause 16.7.

    Decodes the request and logs the date and time it carries. The
    local clock is not touched.

    :param service_choice: Service choice of the incoming request (unused).
    :param data: Encoded service request.
    :param source: Address of the sending device.
    """
    sync = TimeSynchronizationRequest.decode(data)
    received_date, received_time = sync.date, sync.time
    logger.info(
        "TimeSynchronization from %s: date=%s, time=%s",
        source,
        received_date,
        received_time,
    )
[docs]
async def handle_utc_time_synchronization(
    self,
    service_choice: int,
    data: bytes,
    source: BACnetAddress,
) -> None:
    """Handle UTCTimeSynchronization-Request per Clause 16.8.

    Decodes the request and logs the UTC date and time it carries. The
    local clock is not touched.

    :param service_choice: Service choice of the incoming request (unused).
    :param data: Encoded service request.
    :param source: Address of the sending device.
    """
    sync = UTCTimeSynchronizationRequest.decode(data)
    received_date, received_time = sync.date, sync.time
    logger.info(
        "UTCTimeSynchronization from %s: date=%s, time=%s",
        source,
        received_date,
        received_time,
    )
# --- File access handlers ---
[docs]
async def handle_atomic_read_file(
    self,
    service_choice: int,
    data: bytes,
    source: BACnetAddress,
) -> bytes:
    """Handle AtomicReadFile-Request per Clause 14.1.

    Stream access reads octets via ``read_stream``; any other access
    method is treated as record access and served by ``read_records``.

    :param service_choice: Service choice of the incoming request (unused).
    :param data: Encoded service request.
    :param source: Address of the requesting device.
    :returns: Encoded AtomicReadFile-ACK service data.
    :raises BACnetError: If the object is not found or is not a
        :class:`FileObject`.
    """
    request = AtomicReadFileRequest.decode(data)
    file_id = self._resolve_object_id(request.file_identifier)
    logger.debug("handling atomic_read_file %s from %s", file_id, source)

    file_obj = self._db.get(file_id)
    if file_obj is None:
        logger.warning("atomic_read_file: unknown object %s from %s", file_id, source)
        raise BACnetError(ErrorClass.OBJECT, ErrorCode.UNKNOWN_OBJECT)
    if not isinstance(file_obj, FileObject):
        logger.warning(
            "atomic_read_file: object %s is not a FileObject from %s", file_id, source
        )
        raise BACnetError(ErrorClass.OBJECT, ErrorCode.INCONSISTENT_OBJECT_TYPE)

    access = request.access_method
    if isinstance(access, StreamReadAccess):
        start_position = access.file_start_position
        payload, at_eof = file_obj.read_stream(
            start_position,
            access.requested_octet_count,
        )
        result = StreamReadACK(
            file_start_position=start_position,
            file_data=payload,
        )
    else:
        start_record = access.file_start_record
        records, at_eof = file_obj.read_records(
            start_record,
            access.requested_record_count,
        )
        result = RecordReadACK(
            file_start_record=start_record,
            returned_record_count=len(records),
            file_record_data=records,
        )

    return AtomicReadFileACK(end_of_file=at_eof, access_method=result).encode()
[docs]
async def handle_atomic_write_file(
    self,
    service_choice: int,
    data: bytes,
    source: BACnetAddress,
) -> bytes:
    """Handle AtomicWriteFile-Request per Clause 14.2.

    Stream access writes octets via ``write_stream``; any other access
    method is treated as record access and served by ``write_records``.
    The start position/record reported by the file object is echoed in
    the ACK.

    :param service_choice: Service choice of the incoming request (unused).
    :param data: Encoded service request.
    :param source: Address of the requesting device.
    :returns: Encoded AtomicWriteFile-ACK service data.
    :raises BACnetError: If the object is not found or is not a
        :class:`FileObject`.
    """
    request = AtomicWriteFileRequest.decode(data)
    file_id = self._resolve_object_id(request.file_identifier)
    logger.debug("handling atomic_write_file %s from %s", file_id, source)

    file_obj = self._db.get(file_id)
    if file_obj is None:
        logger.warning("atomic_write_file: unknown object %s from %s", file_id, source)
        raise BACnetError(ErrorClass.OBJECT, ErrorCode.UNKNOWN_OBJECT)
    if not isinstance(file_obj, FileObject):
        logger.warning(
            "atomic_write_file: object %s is not a FileObject from %s", file_id, source
        )
        raise BACnetError(ErrorClass.OBJECT, ErrorCode.INCONSISTENT_OBJECT_TYPE)

    access = request.access_method
    streaming = isinstance(access, StreamWriteAccess)
    if streaming:
        written_at = file_obj.write_stream(
            access.file_start_position,
            access.file_data,
        )
    else:
        written_at = file_obj.write_records(
            access.file_start_record,
            access.file_record_data,
        )
    return AtomicWriteFileACK(is_stream=streaming, file_start=written_at).encode()
# --- Object management handlers ---
[docs]
async def handle_create_object(
    self,
    service_choice: int,
    data: bytes,
    source: BACnetAddress,
) -> bytes:
    """Handle CreateObject-Request per Clause 15.3.

    The object is identified either by an explicit object identifier or
    by type only. With a bare type, the instance number is one more than
    the highest existing instance of that type, or ``1`` if there is none.
    Initial values are passed to ``create_object`` as keyword arguments
    named after the lower-cased property identifier.

    :param service_choice: Service choice of the incoming request (unused).
    :param data: Encoded service request.
    :param source: Address of the requesting device.
    :returns: Encoded APPLICATION-tagged :class:`ObjectIdentifier`.
    :raises BACnetError: ``SERVICES`` / ``MISSING_REQUIRED_PARAMETER`` when
        neither an identifier nor a type is given; per the handler's
        documented contract also when the object type is unsupported or
        the identifier already exists (raised by the called helpers).
    """
    request = CreateObjectRequest.decode(data)
    logger.debug(
        "handling create_object type=%s id=%s from %s",
        request.object_type,
        request.object_identifier,
        source,
    )

    requested_id = request.object_identifier
    if requested_id is not None:
        obj_type, instance = requested_id.object_type, requested_id.instance_number
    elif request.object_type is not None:
        obj_type = request.object_type
        # No instance given: pick the next one after the current maximum.
        peers = self._db.get_objects_of_type(obj_type)
        instance = (
            max(peer.object_identifier.instance_number for peer in peers) + 1
            if peers
            else 1
        )
    else:
        logger.warning("create_object: missing required parameter from %s", source)
        raise BACnetError(ErrorClass.SERVICES, ErrorCode.MISSING_REQUIRED_PARAMETER)

    initial_values = request.list_of_initial_values
    init_kwargs: dict[str, Any] = (
        {
            pv.property_identifier.name.lower(): decode_and_unwrap(pv.value)
            for pv in initial_values
        }
        if initial_values
        else {}
    )

    created = create_object(obj_type, instance, **init_kwargs)
    self._db.add(created)
    self._audit_manager.record_operation(
        operation=AuditOperation.CREATE,
        target_object=created.object_identifier,
    )
    return encode_application_object_id(obj_type, instance)
[docs]
async def handle_delete_object(
    self,
    service_choice: int,
    data: bytes,
    source: BACnetAddress,
) -> bytes | None:
    """Handle DeleteObject-Request per Clause 15.4.

    Removes the object from the database, drops its COV subscriptions
    when a COV manager is present, and records a ``DELETE`` audit
    operation.

    :param service_choice: Service choice of the incoming request (unused).
    :param data: Encoded service request.
    :param source: Address of the requesting device.
    :returns: ``None`` (SimpleACK response).
    :raises BACnetError: If the object does not exist or is a Device object
        (presumably raised by the database's ``remove``; it is not raised
        in this method directly).
    """
    delete_request = DeleteObjectRequest.decode(data)
    target_id = self._resolve_object_id(delete_request.object_identifier)
    logger.debug("handling delete_object %s from %s", target_id, source)

    self._db.remove(target_id)

    subscriptions = self._app.cov_manager
    if subscriptions is not None:
        subscriptions.remove_object_subscriptions(target_id)

    self._audit_manager.record_operation(
        operation=AuditOperation.DELETE,
        target_object=target_id,
    )
    return None
[docs]
async def handle_add_list_element(
    self,
    service_choice: int,
    data: bytes,
    source: BACnetAddress,
) -> bytes | None:
    """Handle AddListElement-Request per Clause 15.1.

    Looks up the object and property definition, refuses read-only
    properties, and appends the decoded elements to the property's list.
    A property whose stored value is ``None`` but whose declared datatype
    is ``list`` is initialised to an empty list first.

    :param service_choice: Service choice of the incoming request (unused).
    :param data: Encoded service request.
    :param source: Address of the requesting device.
    :returns: ``None`` (SimpleACK response).
    :raises BACnetError: If the object or property is not found,
        or the property is not a list or is read-only.
    """
    from bac_py.objects.base import PropertyAccess

    request = AddListElementRequest.decode(data)
    target_id = self._resolve_object_id(request.object_identifier)
    prop_id = request.property_identifier
    prop_name = prop_id.name
    logger.debug(
        "handling add_list_element %s %s from %s",
        target_id,
        prop_name,
        source,
    )

    target = self._db.get(target_id)
    if target is None:
        logger.warning("add_list_element: unknown object %s from %s", target_id, source)
        raise BACnetError(ErrorClass.OBJECT, ErrorCode.UNKNOWN_OBJECT)

    definition = target.PROPERTY_DEFINITIONS.get(prop_id)
    if definition is None:
        logger.warning(
            "add_list_element: unknown property %s on %s from %s",
            prop_name,
            target_id,
            source,
        )
        raise BACnetError(ErrorClass.PROPERTY, ErrorCode.UNKNOWN_PROPERTY)
    if definition.access == PropertyAccess.READ_ONLY:
        logger.warning(
            "add_list_element: write access denied for %s on %s from %s",
            prop_name,
            target_id,
            source,
        )
        raise BACnetError(ErrorClass.PROPERTY, ErrorCode.WRITE_ACCESS_DENIED)

    stored = target._properties.get(prop_id)
    if isinstance(stored, list):
        elements = stored
    elif stored is None and definition.datatype is list:
        # Unset list property: start it off empty so it can be extended.
        elements = []
        target._properties[prop_id] = elements
    else:
        logger.warning(
            "add_list_element: property %s on %s is not a list",
            prop_name,
            target_id,
        )
        raise BACnetError(ErrorClass.PROPERTY, ErrorCode.PROPERTY_IS_NOT_A_LIST)

    elements.extend(decode_all_application_values(request.list_of_elements))
    return None
[docs]
async def handle_remove_list_element(
    self,
    service_choice: int,
    data: bytes,
    source: BACnetAddress,
) -> bytes | None:
    """Handle RemoveListElement-Request per Clause 15.2.

    Looks up the object and property definition, refuses read-only
    properties, and removes the first match of each decoded element from
    the property's list. Elements that are not in the list are skipped
    without error.

    :param service_choice: Service choice of the incoming request (unused).
    :param data: Encoded service request.
    :param source: Address of the requesting device.
    :returns: ``None`` (SimpleACK response).
    :raises BACnetError: If the object or property is not found,
        or the property is not a list or is read-only.
    """
    from bac_py.objects.base import PropertyAccess

    request = RemoveListElementRequest.decode(data)
    target_id = self._resolve_object_id(request.object_identifier)
    prop_id = request.property_identifier
    prop_name = prop_id.name
    logger.debug(
        "handling remove_list_element %s %s from %s",
        target_id,
        prop_name,
        source,
    )

    target = self._db.get(target_id)
    if target is None:
        logger.warning("remove_list_element: unknown object %s from %s", target_id, source)
        raise BACnetError(ErrorClass.OBJECT, ErrorCode.UNKNOWN_OBJECT)

    definition = target.PROPERTY_DEFINITIONS.get(prop_id)
    if definition is None:
        logger.warning(
            "remove_list_element: unknown property %s on %s from %s",
            prop_name,
            target_id,
            source,
        )
        raise BACnetError(ErrorClass.PROPERTY, ErrorCode.UNKNOWN_PROPERTY)
    if definition.access == PropertyAccess.READ_ONLY:
        logger.warning(
            "remove_list_element: write access denied for %s on %s from %s",
            prop_name,
            target_id,
            source,
        )
        raise BACnetError(ErrorClass.PROPERTY, ErrorCode.WRITE_ACCESS_DENIED)

    elements = target._properties.get(prop_id)
    if not isinstance(elements, list):
        logger.warning(
            "remove_list_element: property %s on %s is not a list",
            prop_name,
            target_id,
        )
        raise BACnetError(ErrorClass.PROPERTY, ErrorCode.PROPERTY_IS_NOT_A_LIST)

    for unwanted in decode_all_application_values(request.list_of_elements):
        # list.remove raises ValueError for a missing element; that is ignored.
        with contextlib.suppress(ValueError):
            elements.remove(unwanted)
    return None
# --- Alarm / event handlers ---
[docs]
async def handle_acknowledge_alarm(
    self,
    service_choice: int,
    data: bytes,
    source: BACnetAddress,
) -> bytes | None:
    """Handle AcknowledgeAlarm-Request per Clause 13.5.

    Sets one entry of the target object's ``ACKED_TRANSITIONS`` list to
    ``True``: index 1 for ``FAULT``, index 2 for ``NORMAL``, and index 0
    for every other acknowledged state. Nothing is written when the
    property is not a list of at least three entries.

    :param service_choice: Service choice of the incoming request (unused).
    :param data: Encoded service request.
    :param source: Address of the requesting device.
    :returns: ``None`` (SimpleACK response).
    :raises BACnetError: If the target object does not exist.
    """
    request = AcknowledgeAlarmRequest.decode(data)
    event_object_id = request.event_object_identifier
    logger.debug(
        "handling acknowledge_alarm %s state=%s from %s",
        event_object_id,
        request.event_state_acknowledged.name,
        source,
    )

    target = self._db.get(event_object_id)
    if target is None:
        logger.warning(
            "acknowledge_alarm: unknown object %s from %s",
            event_object_id,
            source,
        )
        raise BACnetError(ErrorClass.OBJECT, ErrorCode.UNKNOWN_OBJECT)

    flags = target._properties.get(PropertyIdentifier.ACKED_TRANSITIONS)
    if not isinstance(flags, list) or len(flags) < 3:
        return None

    acked_state = request.event_state_acknowledged
    if acked_state == EventState.FAULT:
        slot = 1
    elif acked_state == EventState.NORMAL:
        slot = 2
    else:
        slot = 0
    flags[slot] = True
    return None
[docs]
async def handle_confirmed_event_notification(
    self,
    service_choice: int,
    data: bytes,
    source: BACnetAddress,
) -> bytes | None:
    """Handle ConfirmedEventNotification-Request per Clause 13.8.

    Decodes the notification and logs its event object and target state
    at debug level; no other action is taken.

    :param service_choice: Service choice of the incoming request (unused).
    :param data: Encoded service request.
    :param source: Address of the notifying device.
    :returns: ``None`` (SimpleACK response).
    """
    notification = EventNotificationRequest.decode(data)
    event_object, to_state = notification.event_object_identifier, notification.to_state
    logger.debug(
        "ConfirmedEventNotification from %s: object=%s, toState=%s",
        source,
        event_object,
        to_state,
    )
    return None
[docs]
async def handle_unconfirmed_event_notification(
    self,
    service_choice: int,
    data: bytes,
    source: BACnetAddress,
) -> None:
    """Handle UnconfirmedEventNotification-Request per Clause 13.9.

    Decodes the notification and logs its event object and target state
    at debug level; no other action is taken.

    :param service_choice: Service choice of the incoming request (unused).
    :param data: Encoded service request.
    :param source: Address of the notifying device.
    """
    notification = EventNotificationRequest.decode(data)
    event_object, to_state = notification.event_object_identifier, notification.to_state
    logger.debug(
        "UnconfirmedEventNotification from %s: object=%s, toState=%s",
        source,
        event_object,
        to_state,
    )
[docs]
async def handle_get_alarm_summary(
    self,
    service_choice: int,
    data: bytes,
    source: BACnetAddress,
) -> bytes:
    """Handle GetAlarmSummary-Request per Clause 13.6.

    Walks every object in the database and reports those whose
    ``EVENT_STATE`` is set and is not ``NORMAL``. Objects without an
    event state, or with one that cannot be converted to
    :class:`EventState`, are skipped.

    :param service_choice: Service choice of the incoming request (unused).
    :param data: Encoded service request.
    :param source: Address of the requesting device.
    :returns: Encoded GetAlarmSummary-ACK service data.
    """
    # The request is decoded but its result is not used.
    GetAlarmSummaryRequest.decode(data)
    logger.debug("handling get_alarm_summary from %s", source)

    active: list[AlarmSummary] = []
    for candidate in self._db.values():
        props = candidate._properties
        state = props.get(PropertyIdentifier.EVENT_STATE)
        if state is None:
            continue
        if not isinstance(state, EventState):
            try:
                state = EventState(state)
            except (ValueError, TypeError):
                continue
        if state == EventState.NORMAL:
            continue

        raw_acked = props.get(PropertyIdentifier.ACKED_TRANSITIONS)
        if isinstance(raw_acked, list) and len(raw_acked) >= 3:
            # Pack the first three flags into the top three bits of one octet.
            packed = 0
            for position, flag in enumerate(raw_acked[:3]):
                if flag:
                    packed |= 0x80 >> position
            acked_bits = BitString(bytes([packed]), 5)
        elif isinstance(raw_acked, BitString):
            acked_bits = raw_acked
        else:
            # Fallback when the property is missing or of another type:
            # all three bits set.
            acked_bits = BitString(b"\xe0", 5)

        active.append(
            AlarmSummary(
                object_identifier=candidate.object_identifier,
                alarm_state=state,
                acknowledged_transitions=acked_bits,
            )
        )

    return GetAlarmSummaryACK(list_of_alarm_summaries=active).encode()
[docs]
async def handle_get_enrollment_summary(
    self,
    service_choice: int,
    data: bytes,
    source: BACnetAddress,
) -> bytes:
    """Handle GetEnrollmentSummary-Request per Clause 13.7.

    Iterates the EventEnrollment objects and keeps those that pass the
    request's event-state, event-type, notification-class and
    acknowledgment filters. A missing event type defaults to
    ``CHANGE_OF_VALUE``, a missing event state to ``NORMAL`` and a missing
    notification class to ``0``. Objects whose stored type or state cannot
    be converted to the enum are skipped. The reported priority is always
    ``0``.

    :param service_choice: Service choice of the incoming request (unused).
    :param data: Encoded service request.
    :param source: Address of the requesting device.
    :returns: Encoded GetEnrollmentSummary-ACK service data.
    """
    request = GetEnrollmentSummaryRequest.decode(data)
    logger.debug("handling get_enrollment_summary from %s", source)

    wanted_state = request.event_state_filter
    wanted_type = request.event_type_filter
    wanted_class = request.notification_class_filter
    ack_filter = request.acknowledgment_filter

    matches: list[EnrollmentSummary] = []
    for enrollment in self._db.get_objects_of_type(ObjectType.EVENT_ENROLLMENT):
        props = enrollment._properties

        event_type = props.get(PropertyIdentifier.EVENT_TYPE)
        if event_type is None:
            event_type = EventType.CHANGE_OF_VALUE
        elif not isinstance(event_type, EventType):
            try:
                event_type = EventType(event_type)
            except (ValueError, TypeError):
                continue

        event_state = props.get(PropertyIdentifier.EVENT_STATE)
        if event_state is None:
            event_state = EventState.NORMAL
        elif not isinstance(event_state, EventState):
            try:
                event_state = EventState(event_state)
            except (ValueError, TypeError):
                continue

        notification_class: int = props.get(PropertyIdentifier.NOTIFICATION_CLASS, 0)

        # A filter left as None matches everything.
        if wanted_state is not None and event_state != wanted_state:
            continue
        if wanted_type is not None and event_type != wanted_type:
            continue
        if wanted_class is not None and notification_class != wanted_class:
            continue

        # The acknowledgment filter only applies when ACKED_TRANSITIONS is a
        # list of at least three flags: ACKED needs all of them set,
        # NOT_ACKED needs at least one unset.
        want_acked = ack_filter == AcknowledgmentFilter.ACKED
        if want_acked or ack_filter == AcknowledgmentFilter.NOT_ACKED:
            acked_raw = props.get(PropertyIdentifier.ACKED_TRANSITIONS)
            if (
                isinstance(acked_raw, list)
                and len(acked_raw) >= 3
                and all(acked_raw) != want_acked
            ):
                continue

        matches.append(
            EnrollmentSummary(
                object_identifier=enrollment.object_identifier,
                event_type=event_type,
                event_state=event_state,
                priority=0,
                notification_class=notification_class,
            )
        )

    return GetEnrollmentSummaryACK(
        list_of_enrollment_summaries=matches,
    ).encode()
# --- Discovery handlers ---
[docs]
async def handle_who_has(
    self,
    service_choice: int,
    data: bytes,
    source: BACnetAddress,
) -> None:
    """Handle Who-Has-Request per Clause 16.9.

    If both range limits are given and this device's instance number
    lies outside them, the request is ignored. Otherwise the object is
    looked up by identifier when one is supplied, else by object name.
    A match is answered with an I-Have sent to the global broadcast
    address.

    :param service_choice: Service choice of the incoming request (unused).
    :param data: Encoded service request.
    :param source: Address of the requesting device.
    """
    request = WhoHasRequest.decode(data)
    logger.debug(
        "handling who_has id=%s name=%s from %s",
        request.object_identifier,
        request.object_name,
        source,
    )

    device_instance = self._device.object_identifier.instance_number
    low, high = request.low_limit, request.high_limit
    # The range only restricts the request when both bounds are present.
    if low is not None and high is not None and not (low <= device_instance <= high):
        return

    match = None
    if request.object_identifier is not None:
        match = self._db.get(request.object_identifier)
    elif request.object_name is not None:
        for candidate_id in self._db.object_list:
            candidate = self._db.get(candidate_id)
            if candidate is None:
                continue
            try:
                candidate_name = candidate.read_property(PropertyIdentifier.OBJECT_NAME)
            except BACnetError:
                # Objects whose name cannot be read simply do not match.
                continue
            if candidate_name == request.object_name:
                match = candidate
                break

    if match is None:
        return

    # Send I-Have response (per Clause 16.9.2, always broadcast)
    reply = IHaveRequest(
        device_identifier=ObjectIdentifier(ObjectType.DEVICE, device_instance),
        object_identifier=match.object_identifier,
        object_name=match.read_property(PropertyIdentifier.OBJECT_NAME),
    )
    self._app.unconfirmed_request(
        destination=GLOBAL_BROADCAST,
        service_choice=UnconfirmedServiceChoice.I_HAVE,
        service_data=reply.encode(),
    )
# --- Text message handlers ---
[docs]
async def handle_confirmed_text_message(
    self,
    service_choice: int,
    data: bytes,
    source: BACnetAddress,
) -> bytes | None:
    """Handle ConfirmedTextMessage-Request per Clause 16.5.

    Logs the message, then calls the application's
    ``_text_message_callback`` with the request and source when that
    attribute exists and is not ``None``.

    :param service_choice: Service choice of the incoming request (unused).
    :param data: Encoded service request.
    :param source: Address of the sending device.
    :returns: ``None`` (SimpleACK response).
    """
    message = ConfirmedTextMessageRequest.decode(data)
    logger.info(
        "ConfirmedTextMessage from %s (device %s): priority=%s, message='%s'",
        source,
        message.text_message_source_device,
        message.message_priority.name,
        message.message,
    )

    on_text_message = getattr(self._app, "_text_message_callback", None)
    if on_text_message is None:
        return None
    on_text_message(message, source)
    return None
[docs]
async def handle_unconfirmed_text_message(
    self,
    service_choice: int,
    data: bytes,
    source: BACnetAddress,
) -> None:
    """Handle UnconfirmedTextMessage-Request per Clause 16.6.

    Logs the message, then calls the application's
    ``_text_message_callback`` with the request and source when that
    attribute exists and is not ``None``.

    :param service_choice: Service choice of the incoming request (unused).
    :param data: Encoded service request.
    :param source: Address of the sending device.
    """
    message = UnconfirmedTextMessageRequest.decode(data)
    logger.info(
        "UnconfirmedTextMessage from %s (device %s): priority=%s, message='%s'",
        source,
        message.text_message_source_device,
        message.message_priority.name,
        message.message,
    )

    on_text_message = getattr(self._app, "_text_message_callback", None)
    if on_text_message is None:
        return
    on_text_message(message, source)
# --- WriteGroup handler ---
[docs]
async def handle_write_group(
    self,
    service_choice: int,
    data: bytes,
    source: BACnetAddress,
) -> None:
    """Handle WriteGroup-Request per Clause 15.11.

    For every Channel object whose ``CONTROL_GROUPS`` contains the
    request's group number, each change-list entry addressed to that
    object's ``CHANNEL_NUMBER`` is decoded and written to
    ``PRESENT_VALUE``. The entry's overriding priority is used when set,
    otherwise the request's write priority. A failed write (``ValueError``
    or ``BACnetError``) is logged at debug level and the remaining
    entries are still processed.

    Channel objects whose ``CONTROL_GROUPS`` is unset (``None``) or empty
    belong to no group and are skipped.

    :param service_choice: Service choice of the incoming request (unused).
    :param data: Encoded service request.
    :param source: Address of the sending device.
    """
    request = WriteGroupRequest.decode(data)
    logger.debug(
        "WriteGroup from %s: group=%d, priority=%d, %d channels",
        source,
        request.group_number,
        request.write_priority,
        len(request.change_list),
    )
    # Look up Channel objects whose Control_Groups includes this group number
    for obj in self._db.get_objects_of_type(ObjectType.CHANNEL):
        # ``dict.get`` falls back to its default only when the key is absent.
        # A property stored as ``None`` would make the membership test raise
        # TypeError and abort the remaining Channel objects, so normalise it.
        control_groups = obj._properties.get(PropertyIdentifier.CONTROL_GROUPS) or ()
        if request.group_number not in control_groups:
            continue
        channel_number = obj._properties.get(PropertyIdentifier.CHANNEL_NUMBER)
        if channel_number is None:
            continue
        for gcv in request.change_list:
            if gcv.channel != channel_number:
                continue
            try:
                write_value = decode_and_unwrap(gcv.value)
                priority = gcv.overriding_priority or request.write_priority
                await obj.async_write_property(
                    PropertyIdentifier.PRESENT_VALUE,
                    write_value,
                    priority,
                )
            except (ValueError, BACnetError):
                # Best effort: WriteGroup has no reply to report failures in.
                logger.debug("WriteGroup: failed to write channel %d", gcv.channel)
# --- Device discovery handlers (new in 2020) ---
[docs]
async def handle_who_am_i(
    self,
    service_choice: int,
    data: bytes,
    source: BACnetAddress,
) -> None:
    """Handle Who-Am-I-Request per Clause 16.11.

    Logs the requester's vendor, model and serial number, then calls the
    application's ``_who_am_i_callback`` with the request and source when
    that attribute exists and is not ``None``. Identity assignment itself
    is left to that callback.

    :param service_choice: Service choice of the incoming request (unused).
    :param data: Encoded service request.
    :param source: Address of the sending device.
    """
    who_am_i = WhoAmIRequest.decode(data)
    logger.info(
        "Who-Am-I from %s: vendor=%d, model='%s', serial='%s'",
        source,
        who_am_i.vendor_id,
        who_am_i.model_name,
        who_am_i.serial_number,
    )

    on_who_am_i = getattr(self._app, "_who_am_i_callback", None)
    if on_who_am_i is None:
        return
    on_who_am_i(who_am_i, source)
[docs]
async def handle_you_are(
    self,
    service_choice: int,
    data: bytes,
    source: BACnetAddress,
) -> None:
    """Handle You-Are-Request per Clause 16.11.

    Logs the assigned device identifier, MAC address (as hex) and network
    number, then calls the application's ``_you_are_callback`` with the
    request and source when that attribute exists and is not ``None``.
    This handler does not apply the identity itself.

    :param service_choice: Service choice of the incoming request (unused).
    :param data: Encoded service request.
    :param source: Address of the sending device.
    """
    you_are = YouAreRequest.decode(data)
    logger.info(
        "You-Are from %s: device=%s, mac=%s, network=%s",
        source,
        you_are.device_identifier,
        you_are.device_mac_address.hex(),
        you_are.device_network_number,
    )

    on_you_are = getattr(self._app, "_you_are_callback", None)
    if on_you_are is None:
        return
    on_you_are(you_are, source)
# --- Virtual terminal handlers ---
[docs]
async def handle_vt_open(
    self,
    service_choice: int,
    data: bytes,
    source: BACnetAddress,
) -> bytes:
    """Handle VT-Open-Request per Clause 17.1.

    Checks the requested VT class against the Device object's
    ``VT_CLASSES_SUPPORTED`` (an empty or missing list accepts any class),
    enforces a limit of 64 concurrent sessions, allocates a session
    identifier and records the session on the application object.

    Session identifiers are allocated round-robin within 1..255, skipping
    identifiers still in use, so the value always fits the single-octet
    (Unsigned8) session identifier carried by the VT services.

    :param service_choice: Service choice of the incoming request (unused).
    :param data: Encoded service request.
    :param source: Address of the requesting device; stored as the
        session owner.
    :returns: Encoded VT-Open-ACK with remote session ID.
    :raises BACnetError: If no VT sessions are available or the
        VT class is not supported.
    """
    request = VTOpenRequest.decode(data)
    logger.info(
        "VT-Open from %s: class=%s, localSession=%d",
        source,
        request.vt_class.name,
        request.local_vt_session_identifier,
    )
    # Check VT class support
    vt_classes = self._device._properties.get(PropertyIdentifier.VT_CLASSES_SUPPORTED, [])
    if vt_classes and request.vt_class not in vt_classes:
        logger.warning("vt_open: unknown VT class %s from %s", request.vt_class.name, source)
        raise BACnetError(ErrorClass.VT, ErrorCode.UNKNOWN_VT_CLASS)

    sessions: dict[int, dict[str, Any]] = getattr(self._app, "_vt_sessions", {})
    max_vt_sessions = 64
    if len(sessions) >= max_vt_sessions:
        logger.warning("vt_open: session limit (%d) reached from %s", max_vt_sessions, source)
        raise BACnetError(ErrorClass.RESOURCES, ErrorCode.NO_VT_SESSIONS_AVAILABLE)

    # Advance the counter, wrapping 255 -> 1, until an unused identifier is
    # found. Fewer than 64 sessions exist at this point, so one of the 255
    # candidates is always free.
    max_session_id = 255
    session_id = getattr(self._app, "_vt_session_counter", 0)
    for _ in range(max_session_id):
        session_id = session_id % max_session_id + 1
        if session_id not in sessions:
            break
    self._app._vt_session_counter = session_id  # type: ignore[attr-defined]

    # Store session mapping
    sessions[session_id] = {
        "source": source,
        "vt_class": request.vt_class,
        "remote_session": request.local_vt_session_identifier,
    }
    self._app._vt_sessions = sessions  # type: ignore[attr-defined]
    ack = VTOpenACK(remote_vt_session_identifier=session_id)
    return ack.encode()
[docs]
async def handle_vt_close(
    self,
    service_choice: int,
    data: bytes,
    source: BACnetAddress,
) -> bytes | None:
    """Handle VT-Close-Request per Clause 17.2.

    Closes the listed sessions one by one, in request order. Each must
    exist and have been opened by ``source``. Processing stops at the
    first session that fails either check; sessions listed before it
    have already been closed at that point.

    :param service_choice: Service choice of the incoming request (unused).
    :param data: Encoded service request.
    :param source: Address of the requesting device.
    :returns: ``None`` (SimpleACK response).
    :raises BACnetError: If a session ID is unknown or belongs to a
        different source (``VT`` / ``UNKNOWN_VT_SESSION`` in both cases).
    """
    request = VTCloseRequest.decode(data)
    open_sessions = getattr(self._app, "_vt_sessions", {})
    closing_ids = request.list_of_remote_vt_session_identifiers

    for sid in closing_ids:
        if sid not in open_sessions:
            logger.warning("vt_close: unknown VT session %s from %s", sid, source)
            raise BACnetError(ErrorClass.VT, ErrorCode.UNKNOWN_VT_SESSION)
        owner = open_sessions[sid]["source"]
        if owner != source:
            # Reported as "unknown" rather than with a distinct error code.
            logger.warning("vt_close: session %s not owned by %s", sid, source)
            raise BACnetError(ErrorClass.VT, ErrorCode.UNKNOWN_VT_SESSION)
        del open_sessions[sid]

    logger.info(
        "VT-Close from %s: sessions=%s",
        source,
        closing_ids,
    )
    return None
[docs]
async def handle_vt_data(
    self,
    service_choice: int,
    data: bytes,
    source: BACnetAddress,
) -> bytes:
    """Handle VT-Data-Request per Clause 17.3.

    Verifies that the session exists and was opened by ``source``, logs
    the size of the received data, and acknowledges all of it as
    accepted. The data itself is not processed further here.

    :param service_choice: Service choice of the incoming request (unused).
    :param data: Encoded service request.
    :param source: Address of the requesting device.
    :returns: Encoded VT-Data-ACK.
    :raises BACnetError: If the session is unknown or belongs to a
        different source.
    """
    request = VTDataRequest.decode(data)
    session_id = request.vt_session_identifier
    session = getattr(self._app, "_vt_sessions", {}).get(session_id)

    if session is None:
        logger.warning(
            "vt_data: unknown VT session %d from %s",
            session_id,
            source,
        )
        raise BACnetError(ErrorClass.VT, ErrorCode.UNKNOWN_VT_SESSION)
    if session["source"] != source:
        logger.warning(
            "vt_data: session %d not owned by %s",
            session_id,
            source,
        )
        raise BACnetError(ErrorClass.VT, ErrorCode.UNKNOWN_VT_SESSION)

    logger.debug(
        "VT-Data from %s: session=%d, %d bytes, flag=%s",
        source,
        session_id,
        len(request.vt_new_data),
        request.vt_data_flag,
    )
    return VTDataACK(all_new_data_accepted=True).encode()
# --- Audit handlers (Clause 13.19-13.21) ---
[docs]
async def handle_audit_log_query(
    self,
    service_choice: int,
    data: bytes,
    source: BACnetAddress,
) -> bytes:
    """Handle AuditLogQuery-Request per Clause 13.19.

    Looks up the requested Audit Log object and returns the records its
    ``query_records`` yields for the requested start sequence number and
    count.

    :param service_choice: Service choice of the incoming request (unused).
    :param data: Encoded service request.
    :param source: Address of the requesting device.
    :returns: Encoded AuditLogQuery-ACK.
    :raises BACnetError: If the audit log object is not found or is not
        an ``AuditLogObject``.
    """
    from bac_py.objects.audit_log import AuditLogObject

    query = AuditLogQueryRequest.decode(data)
    log_id = query.audit_log
    logger.debug("handling audit_log_query %s from %s", log_id, source)

    audit_log = self._db.get(log_id)
    # A missing object is None, which also fails the isinstance check.
    if not isinstance(audit_log, AuditLogObject):
        logger.warning(
            "audit_log_query: unknown or invalid audit log %s from %s",
            log_id,
            source,
        )
        raise BACnetError(ErrorClass.OBJECT, ErrorCode.UNKNOWN_OBJECT)

    found, exhausted = audit_log.query_records(
        start_at=query.start_at_sequence_number,
        count=query.requested_count,
    )
    return AuditLogQueryACK(
        audit_log=log_id,
        records=found,
        no_more_items=exhausted,
    ).encode()
[docs]
async def handle_confirmed_audit_notification(
    self,
    service_choice: int,
    data: bytes,
    source: BACnetAddress,
) -> bytes | None:
    """Handle ConfirmedAuditNotification-Request per Clause 13.20.

    Appends every notification in the request to every ``AuditLogObject``
    found among the database's Audit Log objects, then logs the number of
    notifications received.

    :param service_choice: Service choice of the incoming request (unused).
    :param data: Encoded service request.
    :param source: Address of the notifying device.
    :returns: ``None`` (SimpleACK response).
    """
    from bac_py.objects.audit_log import AuditLogObject

    request = ConfirmedAuditNotificationRequest.decode(data)
    received = request.notifications
    for record in received:
        for log_obj in self._db.get_objects_of_type(ObjectType.AUDIT_LOG):
            if not isinstance(log_obj, AuditLogObject):
                continue
            log_obj.append_record(record)

    logger.debug(
        "ConfirmedAuditNotification from %s: %d notifications",
        source,
        len(request.notifications),
    )
    return None
[docs]
async def handle_unconfirmed_audit_notification(
    self,
    service_choice: int,
    data: bytes,
    source: BACnetAddress,
) -> None:
    """Handle UnconfirmedAuditNotification-Request per Clause 13.21.

    Appends every notification in the request to every ``AuditLogObject``
    found among the database's Audit Log objects, then logs the number of
    notifications received.

    :param service_choice: Service choice of the incoming request (unused).
    :param data: Encoded service request.
    :param source: Address of the notifying device.
    """
    from bac_py.objects.audit_log import AuditLogObject

    request = UnconfirmedAuditNotificationRequest.decode(data)
    received = request.notifications
    for record in received:
        for log_obj in self._db.get_objects_of_type(ObjectType.AUDIT_LOG):
            if not isinstance(log_obj, AuditLogObject):
                continue
            log_obj.append_record(record)

    logger.debug(
        "UnconfirmedAuditNotification from %s: %d notifications",
        source,
        len(request.notifications),
    )