Source code for bac_py.services.object_mgmt

"""Object management services per ASHRAE 135-2016 Clause 15.3-15.4.

CreateObject (Clause 15.3), DeleteObject (Clause 15.4).
"""

from __future__ import annotations

from dataclasses import dataclass

from bac_py.encoding.primitives import (
    decode_object_identifier,
    decode_unsigned,
    encode_application_object_id,
    encode_context_object_id,
    encode_context_tagged,
    encode_enumerated,
)
from bac_py.encoding.tags import (
    TagClass,
    as_memoryview,
    decode_tag,
    encode_closing_tag,
    encode_opening_tag,
)
from bac_py.services.common import BACnetPropertyValue
from bac_py.types.enums import ObjectType
from bac_py.types.primitives import ObjectIdentifier

_MAX_DECODED_ITEMS = 10_000


[docs] @dataclass(frozen=True, slots=True) class CreateObjectRequest: """CreateObject-Request (Clause 15.3.1.1). :: CreateObject-Request ::= SEQUENCE { objectSpecifier [0] CHOICE { objectType [0] BACnetObjectType, objectIdentifier [1] BACnetObjectIdentifier }, listOfInitialValues [1] SEQUENCE OF BACnetPropertyValue OPTIONAL } """ object_type: ObjectType | None = None object_identifier: ObjectIdentifier | None = None list_of_initial_values: list[BACnetPropertyValue] | None = None
[docs] def encode(self) -> bytes: """Encode CreateObject-Request service parameters. :returns: Encoded service request bytes. """ buf = bytearray() # [0] objectSpecifier buf.extend(encode_opening_tag(0)) if self.object_identifier is not None: buf.extend(encode_context_object_id(1, self.object_identifier)) elif self.object_type is not None: buf.extend(encode_context_tagged(0, encode_enumerated(self.object_type))) buf.extend(encode_closing_tag(0)) # [1] listOfInitialValues (optional) if self.list_of_initial_values: buf.extend(encode_opening_tag(1)) for pv in self.list_of_initial_values: buf.extend(pv.encode()) buf.extend(encode_closing_tag(1)) return bytes(buf)
[docs] @classmethod def decode(cls, data: memoryview | bytes) -> CreateObjectRequest: """Decode CreateObject-Request from service request bytes. :param data: Raw service request bytes. :returns: Decoded :class:`CreateObjectRequest`. :raises ValueError: If the objectSpecifier CHOICE tag is unrecognized. """ data = as_memoryview(data) offset = 0 # [0] objectSpecifier (opening tag 0) opening, offset = decode_tag(data, offset) if not opening.is_opening or opening.number != 0: msg = f"Expected opening tag 0 for objectSpecifier, got tag {opening.number}" raise ValueError(msg) object_type: ObjectType | None = None object_identifier: ObjectIdentifier | None = None tag, new_offset = decode_tag(data, offset) if tag.cls == TagClass.CONTEXT and tag.number == 0: # objectType object_type = ObjectType(decode_unsigned(data[new_offset : new_offset + tag.length])) offset = new_offset + tag.length elif tag.cls == TagClass.CONTEXT and tag.number == 1: # objectIdentifier obj_type, instance = decode_object_identifier( data[new_offset : new_offset + tag.length] ) object_identifier = ObjectIdentifier(ObjectType(obj_type), instance) offset = new_offset + tag.length else: msg = f"Unexpected tag {tag.number} in CreateObject objectSpecifier CHOICE" raise ValueError(msg) # closing tag 0 _closing, offset = decode_tag(data, offset) # [1] listOfInitialValues (optional) list_of_initial_values: list[BACnetPropertyValue] | None = None if offset < len(data): tag, new_offset = decode_tag(data, offset) if tag.is_opening and tag.number == 1: offset = new_offset list_of_initial_values = [] while offset < len(data): tag, new_offset = decode_tag(data, offset) if tag.is_closing and tag.number == 1: offset = new_offset break pv, offset = BACnetPropertyValue.decode_from(data, offset) list_of_initial_values.append(pv) if len(list_of_initial_values) >= _MAX_DECODED_ITEMS: msg = f"Decoded item count exceeds limit ({_MAX_DECODED_ITEMS})" raise ValueError(msg) return cls( object_type=object_type, 
object_identifier=object_identifier, list_of_initial_values=list_of_initial_values, )
@dataclass(frozen=True, slots=True)
class DeleteObjectRequest:
    """DeleteObject-Request (Clause 15.4.1.1).

    ::

        DeleteObject-Request ::= SEQUENCE {
            objectIdentifier BACnetObjectIdentifier
        }

    The objectIdentifier is APPLICATION-tagged.
    """

    object_identifier: ObjectIdentifier

    def encode(self) -> bytes:
        """Encode DeleteObject-Request service parameters.

        :returns: Encoded service request bytes.
        """
        return encode_application_object_id(
            self.object_identifier.object_type,
            self.object_identifier.instance_number,
        )

    @classmethod
    def decode(cls, data: memoryview | bytes) -> DeleteObjectRequest:
        """Decode DeleteObject-Request from service request bytes.

        :param data: Raw service request bytes.
        :returns: Decoded :class:`DeleteObjectRequest`.
        :raises ValueError: If the first tag is not an application-tagged
            BACnetObjectIdentifier.
        """
        # Application tag number assigned to BACnetObjectIdentifier.
        object_identifier_tag_number = 12

        data = as_memoryview(data)
        tag, offset = decode_tag(data, 0)
        # Reject anything other than the application-tagged object
        # identifier instead of interpreting unrelated bytes as one.
        if tag.cls == TagClass.CONTEXT or tag.number != object_identifier_tag_number:
            msg = (
                "Expected application-tagged object identifier (tag 12), "
                f"got tag {tag.number}"
            )
            raise ValueError(msg)

        obj_type, instance = decode_object_identifier(data[offset : offset + tag.length])
        return cls(object_identifier=ObjectIdentifier(ObjectType(obj_type), instance))