Source code for bac_py.services.cov

"""COV (Change of Value) services per ASHRAE 135-2016 Clause 13.1/13.14."""

from __future__ import annotations

from dataclasses import dataclass

from bac_py.encoding.primitives import (
    decode_boolean,
    decode_object_identifier,
    decode_real,
    decode_unsigned,
    encode_boolean,
    encode_context_object_id,
    encode_context_real,
    encode_context_tagged,
    encode_real,
    encode_unsigned,
)
from bac_py.encoding.tags import (
    TagClass,
    as_memoryview,
    decode_optional_context,
    decode_tag,
    encode_closing_tag,
    encode_opening_tag,
)
from bac_py.services.common import BACnetPropertyValue
from bac_py.types.constructed import BACnetTimeStamp
from bac_py.types.enums import ObjectType
from bac_py.types.primitives import ObjectIdentifier

_MAX_DECODED_ITEMS = 10_000
_MAX_NESTING_DEPTH = 32

# Re-export for backward compatibility
__all__ = [
    "BACnetPropertyReference",
    "BACnetPropertyValue",
    "COVNotificationMultipleRequest",
    "COVNotificationRequest",
    "COVObjectNotification",
    "COVPropertyValue",
    "COVReference",
    "COVSubscriptionSpecification",
    "SubscribeCOVPropertyMultipleRequest",
    "SubscribeCOVPropertyRequest",
    "SubscribeCOVRequest",
]


[docs] @dataclass(frozen=True, slots=True) class SubscribeCOVRequest: """SubscribeCOV-Request service parameters (Clause 13.14.1). :: SubscribeCOV-Request ::= SEQUENCE { subscriberProcessIdentifier [0] Unsigned32, monitoredObjectIdentifier [1] BACnetObjectIdentifier, issueConfirmedNotifications [2] BOOLEAN OPTIONAL, lifetime [3] Unsigned OPTIONAL } Per the spec, omitting both ``issueConfirmedNotifications`` and ``lifetime`` constitutes a subscription cancellation request. """ subscriber_process_identifier: int monitored_object_identifier: ObjectIdentifier issue_confirmed_notifications: bool | None = None lifetime: int | None = None @property def is_cancellation(self) -> bool: """Check whether this request is a subscription cancellation. :returns: ``True`` when both optional fields are ``None`` (cancellation per spec). """ return self.issue_confirmed_notifications is None and self.lifetime is None
[docs] def encode(self) -> bytes: """Encode SubscribeCOV-Request service parameters. :returns: Encoded service request bytes. """ buf = bytearray() # [0] subscriberProcessIdentifier buf.extend(encode_context_tagged(0, encode_unsigned(self.subscriber_process_identifier))) # [1] monitoredObjectIdentifier buf.extend(encode_context_object_id(1, self.monitored_object_identifier)) # [2] issueConfirmedNotifications (optional) if self.issue_confirmed_notifications is not None: buf.extend( encode_context_tagged(2, encode_boolean(self.issue_confirmed_notifications)) ) # [3] lifetime (optional) if self.lifetime is not None: buf.extend(encode_context_tagged(3, encode_unsigned(self.lifetime))) return bytes(buf)
[docs] @classmethod def decode(cls, data: memoryview | bytes) -> SubscribeCOVRequest: """Decode SubscribeCOV-Request from service request bytes. :param data: Raw service request bytes. :returns: Decoded :class:`SubscribeCOVRequest`. """ data = as_memoryview(data) offset = 0 # [0] subscriberProcessIdentifier tag, offset = decode_tag(data, offset) subscriber_process_identifier = decode_unsigned(data[offset : offset + tag.length]) offset += tag.length # [1] monitoredObjectIdentifier tag, offset = decode_tag(data, offset) obj_type, instance = decode_object_identifier(data[offset : offset + tag.length]) offset += tag.length monitored_object_identifier = ObjectIdentifier(ObjectType(obj_type), instance) # [2] issueConfirmedNotifications (optional) issue_confirmed_notifications, offset = decode_optional_context( data, offset, 2, decode_boolean ) # [3] lifetime (optional) lifetime, offset = decode_optional_context(data, offset, 3, decode_unsigned) return cls( subscriber_process_identifier=subscriber_process_identifier, monitored_object_identifier=monitored_object_identifier, issue_confirmed_notifications=issue_confirmed_notifications, lifetime=lifetime, )
[docs] @dataclass(frozen=True, slots=True) class COVNotificationRequest: """Confirmed/Unconfirmed COVNotification-Request per Clause 13.14.7/13.14.8. :: COVNotification-Request ::= SEQUENCE { subscriberProcessIdentifier [0] Unsigned32, initiatingDeviceIdentifier [1] BACnetObjectIdentifier, monitoredObjectIdentifier [2] BACnetObjectIdentifier, timeRemaining [3] Unsigned, listOfValues [4] SEQUENCE OF BACnetPropertyValue } The same encoding is used for both ConfirmedCOVNotification-Request (service choice 1) and UnconfirmedCOVNotification-Request (service choice 2). """ subscriber_process_identifier: int initiating_device_identifier: ObjectIdentifier monitored_object_identifier: ObjectIdentifier time_remaining: int list_of_values: list[BACnetPropertyValue]
[docs] def encode(self) -> bytes: """Encode COVNotification-Request service parameters. :returns: Encoded service request bytes. """ buf = bytearray() # [0] subscriberProcessIdentifier buf.extend(encode_context_tagged(0, encode_unsigned(self.subscriber_process_identifier))) # [1] initiatingDeviceIdentifier buf.extend(encode_context_object_id(1, self.initiating_device_identifier)) # [2] monitoredObjectIdentifier buf.extend(encode_context_object_id(2, self.monitored_object_identifier)) # [3] timeRemaining buf.extend(encode_context_tagged(3, encode_unsigned(self.time_remaining))) # [4] listOfValues (SEQUENCE OF BACnetPropertyValue) buf.extend(encode_opening_tag(4)) for pv in self.list_of_values: buf.extend(pv.encode()) buf.extend(encode_closing_tag(4)) return bytes(buf)
[docs] @classmethod def decode(cls, data: memoryview | bytes) -> COVNotificationRequest: """Decode COVNotification-Request from service request bytes. :param data: Raw service request bytes. :returns: Decoded :class:`COVNotificationRequest`. """ data = as_memoryview(data) offset = 0 # [0] subscriberProcessIdentifier tag, offset = decode_tag(data, offset) subscriber_process_identifier = decode_unsigned(data[offset : offset + tag.length]) offset += tag.length # [1] initiatingDeviceIdentifier tag, offset = decode_tag(data, offset) obj_type, instance = decode_object_identifier(data[offset : offset + tag.length]) offset += tag.length initiating_device_identifier = ObjectIdentifier(ObjectType(obj_type), instance) # [2] monitoredObjectIdentifier tag, offset = decode_tag(data, offset) obj_type, instance = decode_object_identifier(data[offset : offset + tag.length]) offset += tag.length monitored_object_identifier = ObjectIdentifier(ObjectType(obj_type), instance) # [3] timeRemaining tag, offset = decode_tag(data, offset) time_remaining = decode_unsigned(data[offset : offset + tag.length]) offset += tag.length # [4] listOfValues -- opening tag 4 tag, offset = decode_tag(data, offset) # tag should be opening tag 4 list_of_values: list[BACnetPropertyValue] = [] while offset < len(data): # Check for closing tag 4 tag, new_offset = decode_tag(data, offset) if tag.is_closing and tag.number == 4: offset = new_offset break # Decode next BACnetPropertyValue pv, offset = BACnetPropertyValue.decode_from(data, offset) list_of_values.append(pv) if len(list_of_values) >= _MAX_DECODED_ITEMS: msg = f"Decoded item count exceeds limit ({_MAX_DECODED_ITEMS})" raise ValueError(msg) return cls( subscriber_process_identifier=subscriber_process_identifier, initiating_device_identifier=initiating_device_identifier, monitored_object_identifier=monitored_object_identifier, time_remaining=time_remaining, list_of_values=list_of_values, )
[docs] @dataclass(frozen=True, slots=True) class BACnetPropertyReference: """BACnetPropertyReference -- property identifier with optional array index. :: BACnetPropertyReference ::= SEQUENCE { propertyIdentifier [0] BACnetPropertyIdentifier, propertyArrayIndex [1] Unsigned OPTIONAL } """ property_identifier: int property_array_index: int | None = None
[docs] def encode(self) -> bytes: """Encode BACnetPropertyReference as context-tagged fields. :returns: Encoded bytes for this property reference. """ buf = bytearray() # [0] propertyIdentifier buf.extend(encode_context_tagged(0, encode_unsigned(self.property_identifier))) # [1] propertyArrayIndex (optional) if self.property_array_index is not None: buf.extend(encode_context_tagged(1, encode_unsigned(self.property_array_index))) return bytes(buf)
[docs] @classmethod def decode( cls, data: memoryview | bytes, offset: int = 0 ) -> tuple[BACnetPropertyReference, int]: """Decode BACnetPropertyReference from data at a given offset. :param data: Raw bytes containing encoded property reference data. :param offset: Byte offset to start decoding from. :returns: Tuple of (decoded :class:`BACnetPropertyReference`, new offset). """ data = as_memoryview(data) # [0] propertyIdentifier tag, offset = decode_tag(data, offset) property_identifier = decode_unsigned(data[offset : offset + tag.length]) offset += tag.length # [1] propertyArrayIndex (optional) property_array_index: int | None = None if offset < len(data): tag, new_offset = decode_tag(data, offset) if ( tag.cls == TagClass.CONTEXT and tag.number == 1 and not tag.is_opening and not tag.is_closing ): property_array_index = decode_unsigned(data[new_offset : new_offset + tag.length]) offset = new_offset + tag.length return cls( property_identifier=property_identifier, property_array_index=property_array_index, ), offset
[docs] @dataclass(frozen=True, slots=True) class SubscribeCOVPropertyRequest: """SubscribeCOVProperty-Request service parameters (Clause 13.15.1). :: SubscribeCOVProperty-Request ::= SEQUENCE { subscriberProcessIdentifier [0] Unsigned32, monitoredObjectIdentifier [1] BACnetObjectIdentifier, issueConfirmedNotifications [2] BOOLEAN OPTIONAL, lifetime [3] Unsigned OPTIONAL, monitoredPropertyIdentifier [4] BACnetPropertyReference, covIncrement [5] REAL OPTIONAL } """ subscriber_process_identifier: int monitored_object_identifier: ObjectIdentifier monitored_property_identifier: BACnetPropertyReference issue_confirmed_notifications: bool | None = None lifetime: int | None = None cov_increment: float | None = None
[docs] def encode(self) -> bytes: """Encode SubscribeCOVProperty-Request service parameters. :returns: Encoded service request bytes. """ buf = bytearray() # [0] subscriberProcessIdentifier buf.extend(encode_context_tagged(0, encode_unsigned(self.subscriber_process_identifier))) # [1] monitoredObjectIdentifier buf.extend(encode_context_object_id(1, self.monitored_object_identifier)) # [2] issueConfirmedNotifications (optional) if self.issue_confirmed_notifications is not None: buf.extend( encode_context_tagged(2, encode_boolean(self.issue_confirmed_notifications)) ) # [3] lifetime (optional) if self.lifetime is not None: buf.extend(encode_context_tagged(3, encode_unsigned(self.lifetime))) # [4] monitoredPropertyIdentifier (constructed) buf.extend(encode_opening_tag(4)) buf.extend(self.monitored_property_identifier.encode()) buf.extend(encode_closing_tag(4)) # [5] covIncrement (optional) if self.cov_increment is not None: buf.extend(encode_context_tagged(5, encode_real(self.cov_increment))) return bytes(buf)
[docs] @classmethod def decode(cls, data: memoryview | bytes) -> SubscribeCOVPropertyRequest: """Decode SubscribeCOVProperty-Request from service request bytes. :param data: Raw service request bytes. :returns: Decoded :class:`SubscribeCOVPropertyRequest`. """ data = as_memoryview(data) offset = 0 # [0] subscriberProcessIdentifier tag, offset = decode_tag(data, offset) subscriber_process_identifier = decode_unsigned(data[offset : offset + tag.length]) offset += tag.length # [1] monitoredObjectIdentifier tag, offset = decode_tag(data, offset) obj_type, instance = decode_object_identifier(data[offset : offset + tag.length]) offset += tag.length monitored_object_identifier = ObjectIdentifier(ObjectType(obj_type), instance) # [2] issueConfirmedNotifications (optional) issue_confirmed_notifications, offset = decode_optional_context( data, offset, 2, decode_boolean ) # [3] lifetime (optional) lifetime, offset = decode_optional_context(data, offset, 3, decode_unsigned) # [4] monitoredPropertyIdentifier (constructed -- opening/closing tag 4) tag, offset = decode_tag(data, offset) # tag should be opening tag 4 monitored_property_identifier, offset = BACnetPropertyReference.decode(data, offset) # consume closing tag 4 tag, offset = decode_tag(data, offset) # [5] covIncrement (optional) cov_increment, offset = decode_optional_context(data, offset, 5, decode_real) return cls( subscriber_process_identifier=subscriber_process_identifier, monitored_object_identifier=monitored_object_identifier, monitored_property_identifier=monitored_property_identifier, issue_confirmed_notifications=issue_confirmed_notifications, lifetime=lifetime, cov_increment=cov_increment, )
[docs] @dataclass(frozen=True, slots=True) class COVReference: """A single COV reference within a COV subscription specification. :: SEQUENCE { monitoredProperty [0] BACnetPropertyReference, covIncrement [1] REAL OPTIONAL } """ monitored_property: BACnetPropertyReference cov_increment: float | None = None
[docs] def encode(self) -> bytes: """Encode COVReference. :returns: Encoded bytes for this COV reference. """ buf = bytearray() # [0] monitoredProperty (constructed) buf.extend(encode_opening_tag(0)) buf.extend(self.monitored_property.encode()) buf.extend(encode_closing_tag(0)) # [1] covIncrement (optional) if self.cov_increment is not None: buf.extend(encode_context_real(1, self.cov_increment)) return bytes(buf)
[docs] @classmethod def decode(cls, data: memoryview | bytes, offset: int = 0) -> tuple[COVReference, int]: """Decode COVReference from data at a given offset. :param data: Buffer to decode from. :param offset: Starting byte offset. :returns: Tuple of (decoded :class:`COVReference`, new offset). """ data = as_memoryview(data) # [0] monitoredProperty -- opening tag 0 tag, offset = decode_tag(data, offset) # tag should be opening tag 0 monitored_property, offset = BACnetPropertyReference.decode(data, offset) # consume closing tag 0 tag, offset = decode_tag(data, offset) # [1] covIncrement (optional) cov_increment: float | None = None if offset < len(data): tag, new_offset = decode_tag(data, offset) if ( tag.cls == TagClass.CONTEXT and tag.number == 1 and not tag.is_opening and not tag.is_closing ): cov_increment = decode_real(data[new_offset : new_offset + tag.length]) offset = new_offset + tag.length return cls( monitored_property=monitored_property, cov_increment=cov_increment, ), offset
[docs] @dataclass(frozen=True, slots=True) class COVSubscriptionSpecification: """A single subscription specification within SubscribeCOVPropertyMultiple. :: SEQUENCE { monitoredObjectIdentifier [0] BACnetObjectIdentifier, listOfCOVReferences [1] SEQUENCE OF COVReference } """ monitored_object_identifier: ObjectIdentifier list_of_cov_references: list[COVReference]
[docs] def encode(self) -> bytes: """Encode COVSubscriptionSpecification. :returns: Encoded bytes for this subscription specification. """ buf = bytearray() # [0] monitoredObjectIdentifier buf.extend(encode_context_object_id(0, self.monitored_object_identifier)) # [1] listOfCOVReferences (constructed) buf.extend(encode_opening_tag(1)) for ref in self.list_of_cov_references: buf.extend(ref.encode()) buf.extend(encode_closing_tag(1)) return bytes(buf)
[docs] @classmethod def decode( cls, data: memoryview | bytes, offset: int = 0 ) -> tuple[COVSubscriptionSpecification, int]: """Decode COVSubscriptionSpecification from data at a given offset. :param data: Buffer to decode from. :param offset: Starting byte offset. :returns: Tuple of (decoded :class:`COVSubscriptionSpecification`, new offset). """ data = as_memoryview(data) # [0] monitoredObjectIdentifier tag, offset = decode_tag(data, offset) obj_type, instance = decode_object_identifier(data[offset : offset + tag.length]) offset += tag.length monitored_object_identifier = ObjectIdentifier(ObjectType(obj_type), instance) # [1] listOfCOVReferences -- opening tag 1 tag, offset = decode_tag(data, offset) # tag should be opening tag 1 list_of_cov_references: list[COVReference] = [] while offset < len(data): # Check for closing tag 1 tag, new_offset = decode_tag(data, offset) if tag.is_closing and tag.number == 1: offset = new_offset break ref, offset = COVReference.decode(data, offset) list_of_cov_references.append(ref) if len(list_of_cov_references) >= _MAX_DECODED_ITEMS: msg = f"Decoded item count exceeds limit ({_MAX_DECODED_ITEMS})" raise ValueError(msg) return cls( monitored_object_identifier=monitored_object_identifier, list_of_cov_references=list_of_cov_references, ), offset
[docs] @dataclass(frozen=True, slots=True) class SubscribeCOVPropertyMultipleRequest: """SubscribeCOVPropertyMultiple-Request service parameters (Clause 13.16.1). :: SubscribeCOVPropertyMultiple-Request ::= SEQUENCE { subscriberProcessIdentifier [0] Unsigned32, issueConfirmedNotifications [1] BOOLEAN OPTIONAL, lifetime [2] Unsigned OPTIONAL, maxNotificationDelay [3] Unsigned OPTIONAL, listOfCOVSubscriptionSpecifications [4] SEQUENCE OF ... } """ subscriber_process_identifier: int list_of_cov_subscription_specifications: list[COVSubscriptionSpecification] issue_confirmed_notifications: bool | None = None lifetime: int | None = None max_notification_delay: int | None = None
[docs] def encode(self) -> bytes: """Encode SubscribeCOVPropertyMultiple-Request service parameters. :returns: Encoded service request bytes. """ buf = bytearray() # [0] subscriberProcessIdentifier buf.extend(encode_context_tagged(0, encode_unsigned(self.subscriber_process_identifier))) # [1] issueConfirmedNotifications (optional) if self.issue_confirmed_notifications is not None: buf.extend( encode_context_tagged(1, encode_boolean(self.issue_confirmed_notifications)) ) # [2] lifetime (optional) if self.lifetime is not None: buf.extend(encode_context_tagged(2, encode_unsigned(self.lifetime))) # [3] maxNotificationDelay (optional) if self.max_notification_delay is not None: buf.extend(encode_context_tagged(3, encode_unsigned(self.max_notification_delay))) # [4] listOfCOVSubscriptionSpecifications (constructed) buf.extend(encode_opening_tag(4)) for spec in self.list_of_cov_subscription_specifications: buf.extend(spec.encode()) buf.extend(encode_closing_tag(4)) return bytes(buf)
[docs] @classmethod def decode(cls, data: memoryview | bytes) -> SubscribeCOVPropertyMultipleRequest: """Decode SubscribeCOVPropertyMultiple-Request from service request bytes. :param data: Raw service request bytes. :returns: Decoded :class:`SubscribeCOVPropertyMultipleRequest`. """ data = as_memoryview(data) offset = 0 # [0] subscriberProcessIdentifier tag, offset = decode_tag(data, offset) subscriber_process_identifier = decode_unsigned(data[offset : offset + tag.length]) offset += tag.length # [1] issueConfirmedNotifications (optional) issue_confirmed_notifications, offset = decode_optional_context( data, offset, 1, decode_boolean ) # [2] lifetime (optional) lifetime, offset = decode_optional_context(data, offset, 2, decode_unsigned) # [3] maxNotificationDelay (optional) max_notification_delay, offset = decode_optional_context(data, offset, 3, decode_unsigned) # [4] listOfCOVSubscriptionSpecifications -- opening tag 4 tag, offset = decode_tag(data, offset) # tag should be opening tag 4 specs: list[COVSubscriptionSpecification] = [] while offset < len(data): # Check for closing tag 4 tag, new_offset = decode_tag(data, offset) if tag.is_closing and tag.number == 4: offset = new_offset break spec, offset = COVSubscriptionSpecification.decode(data, offset) specs.append(spec) if len(specs) >= _MAX_DECODED_ITEMS: msg = f"Decoded item count exceeds limit ({_MAX_DECODED_ITEMS})" raise ValueError(msg) return cls( subscriber_process_identifier=subscriber_process_identifier, issue_confirmed_notifications=issue_confirmed_notifications, lifetime=lifetime, max_notification_delay=max_notification_delay, list_of_cov_subscription_specifications=specs, )
[docs] @dataclass(frozen=True, slots=True) class COVPropertyValue: """A single property value within a COV notification. :: SEQUENCE { propertyIdentifier [0] BACnetPropertyIdentifier, arrayIndex [1] Unsigned OPTIONAL, value [2] ABSTRACT-SYNTAX.&Type, timeOfChange [3] BACnetTimeStamp OPTIONAL } """ property_identifier: int value: bytes array_index: int | None = None time_of_change: BACnetTimeStamp | None = None
[docs] def encode(self) -> bytes: """Encode COVPropertyValue. :returns: Encoded bytes for this property value. """ buf = bytearray() # [0] propertyIdentifier buf.extend(encode_context_tagged(0, encode_unsigned(self.property_identifier))) # [1] arrayIndex (optional) if self.array_index is not None: buf.extend(encode_context_tagged(1, encode_unsigned(self.array_index))) # [2] value (opening/closing tag with raw application-tagged content) buf.extend(encode_opening_tag(2)) buf.extend(self.value) buf.extend(encode_closing_tag(2)) # [3] timeOfChange (optional, constructed) if self.time_of_change is not None: buf.extend(encode_opening_tag(3)) buf.extend(self.time_of_change.encode()) buf.extend(encode_closing_tag(3)) return bytes(buf)
[docs] @classmethod def decode(cls, data: memoryview | bytes, offset: int = 0) -> tuple[COVPropertyValue, int]: """Decode COVPropertyValue from data at a given offset. :param data: Buffer to decode from. :param offset: Starting byte offset. :returns: Tuple of (decoded :class:`COVPropertyValue`, new offset). """ data = as_memoryview(data) # [0] propertyIdentifier tag, offset = decode_tag(data, offset) property_identifier = decode_unsigned(data[offset : offset + tag.length]) offset += tag.length # [1] arrayIndex (optional) array_index: int | None = None if offset < len(data): tag, new_offset = decode_tag(data, offset) if ( tag.cls == TagClass.CONTEXT and tag.number == 1 and not tag.is_opening and not tag.is_closing ): array_index = decode_unsigned(data[new_offset : new_offset + tag.length]) offset = new_offset + tag.length # [2] value -- opening tag 2, collect raw bytes until closing tag 2 tag, offset = decode_tag(data, offset) # tag should be opening tag 2 value_start = offset depth = 1 while depth > 0 and offset < len(data): inner_tag, inner_offset = decode_tag(data, offset) if inner_tag.is_opening: depth += 1 if depth > _MAX_NESTING_DEPTH: msg = f"Nesting depth exceeds {_MAX_NESTING_DEPTH}" raise ValueError(msg) offset = inner_offset elif inner_tag.is_closing: depth -= 1 if depth == 0: value_bytes = bytes(data[value_start:offset]) offset = inner_offset break offset = inner_offset else: offset = inner_offset + inner_tag.length else: value_bytes = bytes(data[value_start:offset]) # [3] timeOfChange (optional, constructed) time_of_change: BACnetTimeStamp | None = None if offset < len(data): tag, new_offset = decode_tag(data, offset) if tag.is_opening and tag.number == 3: time_of_change, offset = BACnetTimeStamp.decode(data, new_offset) # consume closing tag 3 tag, offset = decode_tag(data, offset) return cls( property_identifier=property_identifier, value=value_bytes, array_index=array_index, time_of_change=time_of_change, ), offset
[docs] @dataclass(frozen=True, slots=True) class COVObjectNotification: """A single object notification within COVNotificationMultiple. :: SEQUENCE { monitoredObjectIdentifier [0] BACnetObjectIdentifier, listOfValues [1] SEQUENCE OF COVPropertyValue } """ monitored_object_identifier: ObjectIdentifier list_of_values: list[COVPropertyValue]
[docs] def encode(self) -> bytes: """Encode COVObjectNotification. :returns: Encoded bytes for this object notification. """ buf = bytearray() # [0] monitoredObjectIdentifier buf.extend(encode_context_object_id(0, self.monitored_object_identifier)) # [1] listOfValues (constructed) buf.extend(encode_opening_tag(1)) for pv in self.list_of_values: buf.extend(pv.encode()) buf.extend(encode_closing_tag(1)) return bytes(buf)
[docs] @classmethod def decode( cls, data: memoryview | bytes, offset: int = 0 ) -> tuple[COVObjectNotification, int]: """Decode COVObjectNotification from data at a given offset. :param data: Buffer to decode from. :param offset: Starting byte offset. :returns: Tuple of (decoded :class:`COVObjectNotification`, new offset). """ data = as_memoryview(data) # [0] monitoredObjectIdentifier tag, offset = decode_tag(data, offset) obj_type, instance = decode_object_identifier(data[offset : offset + tag.length]) offset += tag.length monitored_object_identifier = ObjectIdentifier(ObjectType(obj_type), instance) # [1] listOfValues -- opening tag 1 tag, offset = decode_tag(data, offset) # tag should be opening tag 1 list_of_values: list[COVPropertyValue] = [] while offset < len(data): # Check for closing tag 1 tag, new_offset = decode_tag(data, offset) if tag.is_closing and tag.number == 1: offset = new_offset break pv, offset = COVPropertyValue.decode(data, offset) list_of_values.append(pv) if len(list_of_values) >= _MAX_DECODED_ITEMS: msg = f"Decoded item count exceeds limit ({_MAX_DECODED_ITEMS})" raise ValueError(msg) return cls( monitored_object_identifier=monitored_object_identifier, list_of_values=list_of_values, ), offset
[docs] @dataclass(frozen=True, slots=True) class COVNotificationMultipleRequest: """Confirmed/Unconfirmed COVNotification-Multiple-Request per Clause 13.17/13.18. :: COVNotification-Multiple-Request ::= SEQUENCE { subscriberProcessIdentifier [0] Unsigned32, initiatingDeviceIdentifier [1] BACnetObjectIdentifier, timeRemaining [2] Unsigned, timestamp [3] BACnetTimeStamp, listOfCOVNotifications [4] SEQUENCE OF COVObjectNotification } """ subscriber_process_identifier: int initiating_device_identifier: ObjectIdentifier time_remaining: int timestamp: BACnetTimeStamp list_of_cov_notifications: list[COVObjectNotification]
[docs] def encode(self) -> bytes: """Encode COVNotificationMultiple-Request service parameters. :returns: Encoded service request bytes. """ buf = bytearray() # [0] subscriberProcessIdentifier buf.extend(encode_context_tagged(0, encode_unsigned(self.subscriber_process_identifier))) # [1] initiatingDeviceIdentifier buf.extend(encode_context_object_id(1, self.initiating_device_identifier)) # [2] timeRemaining buf.extend(encode_context_tagged(2, encode_unsigned(self.time_remaining))) # [3] timestamp (constructed) buf.extend(encode_opening_tag(3)) buf.extend(self.timestamp.encode()) buf.extend(encode_closing_tag(3)) # [4] listOfCOVNotifications (constructed) buf.extend(encode_opening_tag(4)) for notification in self.list_of_cov_notifications: buf.extend(notification.encode()) buf.extend(encode_closing_tag(4)) return bytes(buf)
[docs] @classmethod def decode(cls, data: memoryview | bytes) -> COVNotificationMultipleRequest: """Decode COVNotificationMultiple-Request from service request bytes. :param data: Raw service request bytes. :returns: Decoded :class:`COVNotificationMultipleRequest`. """ data = as_memoryview(data) offset = 0 # [0] subscriberProcessIdentifier tag, offset = decode_tag(data, offset) subscriber_process_identifier = decode_unsigned(data[offset : offset + tag.length]) offset += tag.length # [1] initiatingDeviceIdentifier tag, offset = decode_tag(data, offset) obj_type, instance = decode_object_identifier(data[offset : offset + tag.length]) offset += tag.length initiating_device_identifier = ObjectIdentifier(ObjectType(obj_type), instance) # [2] timeRemaining tag, offset = decode_tag(data, offset) time_remaining = decode_unsigned(data[offset : offset + tag.length]) offset += tag.length # [3] timestamp (constructed -- opening/closing tag 3) tag, offset = decode_tag(data, offset) # tag should be opening tag 3 timestamp, offset = BACnetTimeStamp.decode(data, offset) # consume closing tag 3 tag, offset = decode_tag(data, offset) # [4] listOfCOVNotifications -- opening tag 4 tag, offset = decode_tag(data, offset) # tag should be opening tag 4 list_of_cov_notifications: list[COVObjectNotification] = [] while offset < len(data): # Check for closing tag 4 tag, new_offset = decode_tag(data, offset) if tag.is_closing and tag.number == 4: offset = new_offset break notification, offset = COVObjectNotification.decode(data, offset) list_of_cov_notifications.append(notification) if len(list_of_cov_notifications) >= _MAX_DECODED_ITEMS: msg = f"Decoded item count exceeds limit ({_MAX_DECODED_ITEMS})" raise ValueError(msg) return cls( subscriber_process_identifier=subscriber_process_identifier, initiating_device_identifier=initiating_device_identifier, time_remaining=time_remaining, timestamp=timestamp, list_of_cov_notifications=list_of_cov_notifications, )