Source code for bac_py.services.write_property_multiple
"""WritePropertyMultiple service per ASHRAE 135-2016 Clause 15.10."""
from __future__ import annotations
from dataclasses import dataclass
from bac_py.encoding.primitives import (
decode_object_identifier,
encode_context_object_id,
)
from bac_py.encoding.tags import as_memoryview, decode_tag, encode_closing_tag, encode_opening_tag
from bac_py.services.common import BACnetPropertyValue
from bac_py.types.enums import ObjectType
from bac_py.types.primitives import ObjectIdentifier
_MAX_DECODED_ITEMS = 10_000
[docs]
@dataclass(frozen=True, slots=True)
class WriteAccessSpecification:
"""BACnetWriteAccessSpecification (Clause 21).
::
WriteAccessSpecification ::= SEQUENCE {
objectIdentifier [0] BACnetObjectIdentifier,
listOfProperties [1] SEQUENCE OF BACnetPropertyValue
}
"""
object_identifier: ObjectIdentifier
list_of_properties: list[BACnetPropertyValue]
[docs]
def encode(self) -> bytes:
"""Encode this write access specification as context-tagged bytes.
:returns: Encoded write access specification bytes.
"""
buf = bytearray()
# [0] object-identifier
buf.extend(encode_context_object_id(0, self.object_identifier))
# [1] SEQUENCE OF BACnetPropertyValue
buf.extend(encode_opening_tag(1))
for pv in self.list_of_properties:
buf.extend(pv.encode())
buf.extend(encode_closing_tag(1))
return bytes(buf)
[docs]
@classmethod
def decode(cls, data: memoryview | bytes, offset: int) -> tuple[WriteAccessSpecification, int]:
"""Decode a write access specification from a buffer at the given offset.
:param data: Raw bytes containing encoded write access specification data.
:param offset: Byte offset to start decoding from.
:returns: Tuple of (:class:`WriteAccessSpecification`, new offset).
"""
data = as_memoryview(data)
# [0] object-identifier
tag, offset = decode_tag(data, offset)
obj_type, instance = decode_object_identifier(data[offset : offset + tag.length])
offset += tag.length
object_identifier = ObjectIdentifier(ObjectType(obj_type), instance)
# [1] opening tag
_opening, offset = decode_tag(data, offset)
# Decode property values until closing tag 1
props: list[BACnetPropertyValue] = []
while offset < len(data):
tag_peek, next_offset = decode_tag(data, offset)
if tag_peek.is_closing and tag_peek.number == 1:
offset = next_offset
break
pv, offset = BACnetPropertyValue.decode_from(data, offset)
props.append(pv)
if len(props) >= _MAX_DECODED_ITEMS:
msg = f"Decoded item count exceeds limit ({_MAX_DECODED_ITEMS})"
raise ValueError(msg)
return cls(
object_identifier=object_identifier,
list_of_properties=props,
), offset
[docs]
@dataclass(frozen=True, slots=True)
class WritePropertyMultipleRequest:
"""WritePropertyMultiple-Request service parameters (Clause 15.10.1.1).
::
WritePropertyMultiple-Request ::= SEQUENCE {
listOfWriteAccessSpecs SEQUENCE OF WriteAccessSpecification
}
"""
list_of_write_access_specs: list[WriteAccessSpecification]
[docs]
def encode(self) -> bytes:
"""Encode WritePropertyMultiple-Request service parameters.
:returns: Encoded service request bytes.
"""
buf = bytearray()
for spec in self.list_of_write_access_specs:
buf.extend(spec.encode())
return bytes(buf)
[docs]
@classmethod
def decode(cls, data: memoryview | bytes) -> WritePropertyMultipleRequest:
"""Decode WritePropertyMultiple-Request from service request bytes.
:param data: Raw service request bytes.
:returns: Decoded :class:`WritePropertyMultipleRequest`.
"""
data = as_memoryview(data)
offset = 0
specs: list[WriteAccessSpecification] = []
while offset < len(data):
spec, offset = WriteAccessSpecification.decode(data, offset)
specs.append(spec)
if len(specs) >= _MAX_DECODED_ITEMS:
msg = f"Decoded item count exceeds limit ({_MAX_DECODED_ITEMS})"
raise ValueError(msg)
return cls(list_of_write_access_specs=specs)