Source code for bac_py.services.write_property

"""WriteProperty service per ASHRAE 135-2016 Clause 15.9."""

from __future__ import annotations

from dataclasses import dataclass

from bac_py.encoding.primitives import (
    decode_object_identifier,
    decode_unsigned,
    encode_context_object_id,
    encode_context_tagged,
    encode_unsigned,
)
from bac_py.encoding.tags import (
    TagClass,
    as_memoryview,
    decode_optional_context,
    decode_tag,
    encode_closing_tag,
    encode_opening_tag,
    extract_context_value,
)
from bac_py.services.errors import BACnetRejectError
from bac_py.types.enums import ObjectType, PropertyIdentifier, RejectReason
from bac_py.types.primitives import ObjectIdentifier


[docs] @dataclass(frozen=True, slots=True) class WritePropertyRequest: """WriteProperty-Request service parameters (Clause 15.9.1.1). :: WriteProperty-Request ::= SEQUENCE { objectIdentifier [0] BACnetObjectIdentifier, propertyIdentifier [1] BACnetPropertyIdentifier, propertyArrayIndex [2] Unsigned OPTIONAL, propertyValue [3] ABSTRACT-SYNTAX.&TYPE, priority [4] Unsigned (1..16) OPTIONAL } The property_value field contains raw application-tagged encoded bytes. The application layer is responsible for encoding the value appropriately for the target property type. """ object_identifier: ObjectIdentifier property_identifier: PropertyIdentifier property_value: bytes property_array_index: int | None = None priority: int | None = None
[docs] def encode(self) -> bytes: """Encode WriteProperty-Request service parameters. :returns: Encoded service request bytes. """ buf = bytearray() # [0] object-identifier buf.extend(encode_context_object_id(0, self.object_identifier)) # [1] property-identifier buf.extend(encode_context_tagged(1, encode_unsigned(self.property_identifier))) # [2] property-array-index (optional) if self.property_array_index is not None: buf.extend(encode_context_tagged(2, encode_unsigned(self.property_array_index))) # [3] property-value (opening tag 3, data, closing tag 3) buf.extend(encode_opening_tag(3)) buf.extend(self.property_value) buf.extend(encode_closing_tag(3)) # [4] priority (optional) if self.priority is not None: buf.extend(encode_context_tagged(4, encode_unsigned(self.priority))) return bytes(buf)
[docs] @classmethod def decode(cls, data: memoryview | bytes) -> WritePropertyRequest: """Decode WriteProperty-Request from service request bytes. :param data: Raw service request bytes. :returns: Decoded :class:`WritePropertyRequest`. :raises BACnetRejectError: If *priority* is outside the 1--16 range (per Clause 15.9.1.1.5). """ data = as_memoryview(data) offset = 0 # [0] object-identifier tag, offset = decode_tag(data, offset) obj_type, instance = decode_object_identifier(data[offset : offset + tag.length]) offset += tag.length object_identifier = ObjectIdentifier(ObjectType(obj_type), instance) # [1] property-identifier tag, offset = decode_tag(data, offset) property_identifier = PropertyIdentifier( decode_unsigned(data[offset : offset + tag.length]) ) offset += tag.length # [2] property-array-index (optional) or [3] opening tag property_array_index = None tag, offset = decode_tag(data, offset) if tag.cls == TagClass.CONTEXT and tag.number == 2 and not tag.is_opening: property_array_index = decode_unsigned(data[offset : offset + tag.length]) offset += tag.length tag, offset = decode_tag(data, offset) # [3] property-value: opening tag 3 ... closing tag 3 property_value, offset = extract_context_value(data, offset, 3) # [4] priority (optional, 1-16 per Clause 15.9.1.1.5) priority, offset = decode_optional_context(data, offset, 4, decode_unsigned) if priority is not None and not (1 <= priority <= 16): raise BACnetRejectError(RejectReason.PARAMETER_OUT_OF_RANGE) return cls( object_identifier=object_identifier, property_identifier=property_identifier, property_value=property_value, property_array_index=property_array_index, priority=priority, )