Source code for bac_py.services.read_range

"""ReadRange service per ASHRAE 135-2016 Clause 15.8."""

from __future__ import annotations

from dataclasses import dataclass

from bac_py.encoding.primitives import (
    decode_bit_string,
    decode_date,
    decode_object_identifier,
    decode_signed,
    decode_time,
    decode_unsigned,
    encode_application_date,
    encode_application_signed,
    encode_application_time,
    encode_application_unsigned,
    encode_bit_string,
    encode_context_object_id,
    encode_context_tagged,
    encode_unsigned,
)
from bac_py.encoding.tags import (
    TagClass,
    as_memoryview,
    decode_optional_context,
    decode_tag,
    encode_closing_tag,
    encode_opening_tag,
    extract_context_value,
)
from bac_py.types.enums import ObjectType, PropertyIdentifier
from bac_py.types.primitives import BACnetDate, BACnetTime, BitString, ObjectIdentifier


[docs] @dataclass(frozen=True, slots=True) class RangeByPosition: """Range qualifier: by position (index + count). :: SEQUENCE { referenceIndex Unsigned, count INTEGER } """ reference_index: int count: int
[docs] @dataclass(frozen=True, slots=True) class RangeBySequenceNumber: """Range qualifier: by sequence number. :: SEQUENCE { referenceSequenceNumber Unsigned, count INTEGER } """ reference_sequence_number: int count: int
[docs] @dataclass(frozen=True, slots=True) class RangeByTime: """Range qualifier: by time. :: SEQUENCE { referenceTime BACnetDateTime, count INTEGER } """ reference_date: BACnetDate reference_time: BACnetTime count: int
[docs] @dataclass(frozen=True, slots=True) class ResultFlags: """BACnetResultFlags — 3-bit BitString. Bit 0: FIRST_ITEM — result includes the first item in the list Bit 1: LAST_ITEM — result includes the last item in the list Bit 2: MORE_ITEMS — more items remain beyond what was returned """ first_item: bool = False last_item: bool = False more_items: bool = False
[docs] def to_bit_string(self) -> BitString: """Convert to a BACnet :class:`~bac_py.types.primitives.BitString`. :returns: 3-bit BitString encoding of the result flags. """ value = (self.first_item << 2) | (self.last_item << 1) | self.more_items return BitString(bytes([value << 5]), unused_bits=5)
[docs] @classmethod def from_bit_string(cls, bs: BitString) -> ResultFlags: """Decode from a BACnet :class:`~bac_py.types.primitives.BitString`. :param bs: BitString with up to 3 significant bits. :returns: Decoded :class:`ResultFlags`. """ return cls( first_item=bs[0] if len(bs) > 0 else False, last_item=bs[1] if len(bs) > 1 else False, more_items=bs[2] if len(bs) > 2 else False, )
[docs] @dataclass(frozen=True, slots=True) class ReadRangeRequest: """ReadRange-Request service parameters (Clause 15.8.1.1). :: ReadRange-Request ::= SEQUENCE { objectIdentifier [0] BACnetObjectIdentifier, propertyIdentifier [1] BACnetPropertyIdentifier, propertyArrayIndex [2] Unsigned OPTIONAL, range CHOICE { byPosition [3] SEQUENCE { referenceIndex Unsigned, count INTEGER }, bySequenceNumber [6] SEQUENCE { referenceSequenceNumber Unsigned, count INTEGER }, byTime [7] SEQUENCE { referenceTime BACnetDateTime, count INTEGER } } OPTIONAL } """ object_identifier: ObjectIdentifier property_identifier: PropertyIdentifier property_array_index: int | None = None range: RangeByPosition | RangeBySequenceNumber | RangeByTime | None = None
[docs] def encode(self) -> bytes: """Encode ReadRange-Request service parameters. :returns: Encoded service request bytes. """ buf = bytearray() # [0] object-identifier buf.extend(encode_context_object_id(0, self.object_identifier)) # [1] property-identifier buf.extend(encode_context_tagged(1, encode_unsigned(self.property_identifier))) # [2] property-array-index (optional) if self.property_array_index is not None: buf.extend(encode_context_tagged(2, encode_unsigned(self.property_array_index))) # Range qualifier (optional) -- inner SEQUENCE elements use # application tags per BACnet encoding rules. if isinstance(self.range, RangeByPosition): buf.extend(encode_opening_tag(3)) buf.extend(encode_application_unsigned(self.range.reference_index)) buf.extend(encode_application_signed(self.range.count)) buf.extend(encode_closing_tag(3)) elif isinstance(self.range, RangeBySequenceNumber): buf.extend(encode_opening_tag(6)) buf.extend(encode_application_unsigned(self.range.reference_sequence_number)) buf.extend(encode_application_signed(self.range.count)) buf.extend(encode_closing_tag(6)) elif isinstance(self.range, RangeByTime): buf.extend(encode_opening_tag(7)) buf.extend(encode_application_date(self.range.reference_date)) buf.extend(encode_application_time(self.range.reference_time)) buf.extend(encode_application_signed(self.range.count)) buf.extend(encode_closing_tag(7)) return bytes(buf)
[docs] @classmethod def decode(cls, data: memoryview | bytes) -> ReadRangeRequest: """Decode ReadRange-Request from service request bytes. :param data: Raw service request bytes. :returns: Decoded :class:`ReadRangeRequest`. """ data = as_memoryview(data) offset = 0 # [0] object-identifier tag, offset = decode_tag(data, offset) obj_type, instance = decode_object_identifier(data[offset : offset + tag.length]) offset += tag.length object_identifier = ObjectIdentifier(ObjectType(obj_type), instance) # [1] property-identifier tag, offset = decode_tag(data, offset) property_identifier = PropertyIdentifier( decode_unsigned(data[offset : offset + tag.length]) ) offset += tag.length # [2] property-array-index (optional) property_array_index = None range_val: RangeByPosition | RangeBySequenceNumber | RangeByTime | None = None if offset < len(data): tag_peek, next_offset = decode_tag(data, offset) if ( tag_peek.cls == TagClass.CONTEXT and tag_peek.number == 2 and not tag_peek.is_opening and not tag_peek.is_closing ): property_array_index = decode_unsigned( data[next_offset : next_offset + tag_peek.length] ) offset = next_offset + tag_peek.length # Range qualifier (optional) if offset < len(data): tag_peek, next_offset = decode_tag(data, offset) if tag_peek.is_opening: offset = next_offset if tag_peek.number == 3: # byPosition t, offset = decode_tag(data, offset) ref_index = decode_unsigned(data[offset : offset + t.length]) offset += t.length t, offset = decode_tag(data, offset) count = decode_signed(data[offset : offset + t.length]) offset += t.length # closing tag 3 _closing, offset = decode_tag(data, offset) range_val = RangeByPosition(reference_index=ref_index, count=count) elif tag_peek.number == 6: # bySequenceNumber t, offset = decode_tag(data, offset) ref_seq = decode_unsigned(data[offset : offset + t.length]) offset += t.length t, offset = decode_tag(data, offset) count = decode_signed(data[offset : offset + t.length]) offset += t.length # closing tag 6 _closing, offset = decode_tag(data, offset) range_val = RangeBySequenceNumber( reference_sequence_number=ref_seq, count=count ) elif tag_peek.number == 7: # byTime t, offset = decode_tag(data, offset) ref_date = decode_date(data[offset : offset + t.length]) offset += t.length t, offset = decode_tag(data, offset) ref_time = decode_time(data[offset : offset + t.length]) offset += t.length t, offset = decode_tag(data, offset) count = decode_signed(data[offset : offset + t.length]) offset += t.length # closing tag 7 _closing, offset = decode_tag(data, offset) range_val = RangeByTime( reference_date=ref_date, reference_time=ref_time, count=count, ) return cls( object_identifier=object_identifier, property_identifier=property_identifier, property_array_index=property_array_index, range=range_val, )
[docs] @dataclass(frozen=True, slots=True) class ReadRangeACK: """ReadRange-ACK service parameters (Clause 15.8.1.2). :: ReadRange-ACK ::= SEQUENCE { objectIdentifier [0] BACnetObjectIdentifier, propertyIdentifier [1] BACnetPropertyIdentifier, propertyArrayIndex [2] Unsigned OPTIONAL, resultFlags [3] BACnetResultFlags, itemCount [4] Unsigned, itemData [5] SEQUENCE OF ABSTRACT-SYNTAX.&TYPE, firstSequenceNumber [6] Unsigned32 OPTIONAL } The itemData field contains raw encoded bytes wrapped in context tag 5 (opening/closing). """ object_identifier: ObjectIdentifier property_identifier: PropertyIdentifier result_flags: ResultFlags item_count: int item_data: bytes property_array_index: int | None = None first_sequence_number: int | None = None
[docs] def encode(self) -> bytes: """Encode ReadRange-ACK service parameters. :returns: Encoded service ACK bytes. """ buf = bytearray() # [0] object-identifier buf.extend(encode_context_object_id(0, self.object_identifier)) # [1] property-identifier buf.extend(encode_context_tagged(1, encode_unsigned(self.property_identifier))) # [2] property-array-index (optional) if self.property_array_index is not None: buf.extend(encode_context_tagged(2, encode_unsigned(self.property_array_index))) # [3] result-flags (BitString) buf.extend(encode_context_tagged(3, encode_bit_string(self.result_flags.to_bit_string()))) # [4] item-count buf.extend(encode_context_tagged(4, encode_unsigned(self.item_count))) # [5] item-data (opening/closing) buf.extend(encode_opening_tag(5)) buf.extend(self.item_data) buf.extend(encode_closing_tag(5)) # [6] first-sequence-number (optional) if self.first_sequence_number is not None: buf.extend(encode_context_tagged(6, encode_unsigned(self.first_sequence_number))) return bytes(buf)
[docs] @classmethod def decode(cls, data: memoryview | bytes) -> ReadRangeACK: """Decode ReadRange-ACK from service ACK bytes. :param data: Raw service ACK bytes. :returns: Decoded :class:`ReadRangeACK`. """ data = as_memoryview(data) offset = 0 # [0] object-identifier tag, offset = decode_tag(data, offset) obj_type, instance = decode_object_identifier(data[offset : offset + tag.length]) offset += tag.length object_identifier = ObjectIdentifier(ObjectType(obj_type), instance) # [1] property-identifier tag, offset = decode_tag(data, offset) property_identifier = PropertyIdentifier( decode_unsigned(data[offset : offset + tag.length]) ) offset += tag.length # [2] property-array-index (optional) property_array_index = None tag_peek, next_offset = decode_tag(data, offset) if ( tag_peek.cls == TagClass.CONTEXT and tag_peek.number == 2 and not tag_peek.is_opening and not tag_peek.is_closing ): property_array_index = decode_unsigned( data[next_offset : next_offset + tag_peek.length] ) offset = next_offset + tag_peek.length tag_peek, next_offset = decode_tag(data, offset) # [3] result-flags # tag_peek should be context tag 3 tag = tag_peek offset = next_offset bs = decode_bit_string(data[offset : offset + tag.length]) result_flags = ResultFlags.from_bit_string(bs) offset += tag.length # [4] item-count tag, offset = decode_tag(data, offset) item_count = decode_unsigned(data[offset : offset + tag.length]) offset += tag.length # [5] item-data (opening/closing) _opening, offset = decode_tag(data, offset) item_data, offset = extract_context_value(data, offset, 5) # [6] first-sequence-number (optional) first_sequence_number, _ = decode_optional_context(data, offset, 6, decode_unsigned) return cls( object_identifier=object_identifier, property_identifier=property_identifier, result_flags=result_flags, item_count=item_count, item_data=item_data, property_array_index=property_array_index, first_sequence_number=first_sequence_number, )