Source code for bac_py.services.write_group

"""WriteGroup service per ASHRAE 135-2020 Clause 15.11.

WriteGroup is an unconfirmed service for writing channel values
via group addressing.
"""

from __future__ import annotations

from dataclasses import dataclass
from typing import Self

from bac_py.encoding.primitives import (
    decode_unsigned,
    encode_context_tagged,
    encode_unsigned,
)
from bac_py.encoding.tags import (
    TagClass,
    as_memoryview,
    decode_tag,
    encode_closing_tag,
    encode_opening_tag,
    extract_context_value,
)

_MAX_DECODED_ITEMS = 10_000


[docs] @dataclass(frozen=True, slots=True) class GroupChannelValue: """A single channel value in a WriteGroup change list. :: GroupChannelValue ::= SEQUENCE { channel [0] Unsigned16, overridingPriority [1] Unsigned (1..16) OPTIONAL, value ABSTRACT-SYNTAX.&TYPE } """ channel: int value: bytes overriding_priority: int | None = None
[docs] def encode(self) -> bytes: """Encode a single GroupChannelValue.""" buf = bytearray() # [0] channel buf.extend(encode_context_tagged(0, encode_unsigned(self.channel))) # [1] overridingPriority (optional) if self.overriding_priority is not None: buf.extend(encode_context_tagged(1, encode_unsigned(self.overriding_priority))) # value (opening/closing tag 2) buf.extend(encode_opening_tag(2)) buf.extend(self.value) buf.extend(encode_closing_tag(2)) return bytes(buf)
[docs] @classmethod def decode(cls, data: memoryview, offset: int) -> tuple[GroupChannelValue, int]: """Decode a single GroupChannelValue starting at offset. :returns: Tuple of (decoded value, new offset). """ # [0] channel tag, offset = decode_tag(data, offset) channel = decode_unsigned(data[offset : offset + tag.length]) offset += tag.length # [1] overridingPriority (optional) overriding_priority = None tag, new_offset = decode_tag(data, offset) if tag.cls == TagClass.CONTEXT and tag.number == 1 and not tag.is_opening: overriding_priority = decode_unsigned(data[new_offset : new_offset + tag.length]) offset = new_offset + tag.length tag, new_offset = decode_tag(data, offset) # value (opening/closing tag 2) value, offset = extract_context_value(data, new_offset, 2) return cls(channel=channel, value=value, overriding_priority=overriding_priority), offset
[docs] @dataclass(frozen=True, slots=True) class WriteGroupRequest: """WriteGroup-Request (Clause 15.11.1). :: WriteGroup-Request ::= SEQUENCE { groupNumber [0] Unsigned32, writePriority [1] Unsigned (1..16), changeList [2] SEQUENCE OF GroupChannelValue } """ group_number: int write_priority: int change_list: list[GroupChannelValue]
[docs] def encode(self) -> bytes: """Encode WriteGroup-Request service parameters.""" buf = bytearray() # [0] groupNumber buf.extend(encode_context_tagged(0, encode_unsigned(self.group_number))) # [1] writePriority buf.extend(encode_context_tagged(1, encode_unsigned(self.write_priority))) # [2] changeList (opening/closing) buf.extend(encode_opening_tag(2)) for gcv in self.change_list: buf.extend(gcv.encode()) buf.extend(encode_closing_tag(2)) return bytes(buf)
[docs] @classmethod def decode(cls, data: memoryview | bytes) -> Self: """Decode WriteGroup-Request from service request bytes.""" data = as_memoryview(data) offset = 0 # [0] groupNumber tag, offset = decode_tag(data, offset) group_number = decode_unsigned(data[offset : offset + tag.length]) offset += tag.length # [1] writePriority tag, offset = decode_tag(data, offset) write_priority = decode_unsigned(data[offset : offset + tag.length]) offset += tag.length # [2] changeList (opening/closing tag 2) tag, offset = decode_tag(data, offset) # opening tag 2 change_list: list[GroupChannelValue] = [] while offset < len(data): tag, _ = decode_tag(data, offset) if tag.is_closing and tag.number == 2: break gcv, offset = GroupChannelValue.decode(data, offset) change_list.append(gcv) if len(change_list) >= _MAX_DECODED_ITEMS: msg = f"Decoded item count exceeds limit ({_MAX_DECODED_ITEMS})" raise ValueError(msg) return cls( group_number=group_number, write_priority=write_priority, change_list=change_list, )