Source code for bac_py.services.who_is

"""Who-Is and I-Am services per ASHRAE 135-2016 Clause 16.10."""

from __future__ import annotations

from dataclasses import dataclass

from bac_py.encoding.primitives import (
    decode_object_identifier,
    decode_unsigned,
    encode_application_enumerated,
    encode_application_object_id,
    encode_application_unsigned,
    encode_context_tagged,
    encode_unsigned,
)
from bac_py.encoding.tags import TagClass, as_memoryview, decode_tag
from bac_py.types.enums import ObjectType, Segmentation
from bac_py.types.primitives import ObjectIdentifier


[docs] @dataclass(frozen=True, slots=True) class WhoIsRequest: """Who-Is-Request service parameters (Clause 16.10.1). Both limits must be present or both absent. """ low_limit: int | None = None high_limit: int | None = None
[docs] def encode(self) -> bytes: """Encode Who-Is-Request service parameters. :returns: Encoded service request bytes (may be empty if no range is set). """ if self.low_limit is None or self.high_limit is None: return b"" buf = bytearray() # [0] device-instance-range-low-limit buf.extend(encode_context_tagged(0, encode_unsigned(self.low_limit))) # [1] device-instance-range-high-limit buf.extend(encode_context_tagged(1, encode_unsigned(self.high_limit))) return bytes(buf)
[docs] @classmethod def decode(cls, data: memoryview | bytes) -> WhoIsRequest: """Decode Who-Is-Request from service request bytes. :param data: Raw service request bytes. :returns: Decoded :class:`WhoIsRequest`. """ data = as_memoryview(data) if len(data) == 0: return cls() offset = 0 low_limit = None high_limit = None # [0] device-instance-range-low-limit tag, offset = decode_tag(data, offset) if tag.cls == TagClass.CONTEXT and tag.number == 0: low_limit = decode_unsigned(data[offset : offset + tag.length]) offset += tag.length # [1] device-instance-range-high-limit if offset < len(data): tag, offset = decode_tag(data, offset) if tag.cls == TagClass.CONTEXT and tag.number == 1: high_limit = decode_unsigned(data[offset : offset + tag.length]) return cls(low_limit=low_limit, high_limit=high_limit)
def __post_init__(self) -> None: """Validate that both limits are present or both absent. If only one limit is provided, both are reset to ``None`` (per Clause 16.10.1.1.1, malformed ranges are treated as unbounded). """ if (self.low_limit is None) != (self.high_limit is None): # Per Clause 16.10.1.1.1, treat malformed as unbounded object.__setattr__(self, "low_limit", None) object.__setattr__(self, "high_limit", None)
[docs] @dataclass(frozen=True, slots=True) class IAmRequest: """I-Am-Request service parameters (Clause 16.10.2). All fields use APPLICATION tags (not context-specific). """ object_identifier: ObjectIdentifier max_apdu_length: int segmentation_supported: Segmentation vendor_id: int
[docs] def encode(self) -> bytes: """Encode I-Am-Request service parameters. :returns: Encoded service request bytes. """ buf = bytearray() # iAmDeviceIdentifier - application tagged object-id buf.extend( encode_application_object_id( self.object_identifier.object_type, self.object_identifier.instance_number, ) ) # maxAPDULengthAccepted - application tagged unsigned buf.extend(encode_application_unsigned(self.max_apdu_length)) # segmentationSupported - application tagged enumerated buf.extend(encode_application_enumerated(self.segmentation_supported)) # vendorID - application tagged unsigned buf.extend(encode_application_unsigned(self.vendor_id)) return bytes(buf)
[docs] @classmethod def decode(cls, data: memoryview | bytes) -> IAmRequest: """Decode I-Am-Request from service request bytes. :param data: Raw service request bytes. :returns: Decoded :class:`IAmRequest`. """ data = as_memoryview(data) offset = 0 # iAmDeviceIdentifier - application tagged object-id (tag 12) tag, offset = decode_tag(data, offset) obj_type, instance = decode_object_identifier(data[offset : offset + tag.length]) offset += tag.length object_identifier = ObjectIdentifier(ObjectType(obj_type), instance) # maxAPDULengthAccepted - application tagged unsigned (tag 2) tag, offset = decode_tag(data, offset) max_apdu_length = decode_unsigned(data[offset : offset + tag.length]) offset += tag.length # segmentationSupported - application tagged enumerated (tag 9) tag, offset = decode_tag(data, offset) segmentation_supported = Segmentation(decode_unsigned(data[offset : offset + tag.length])) offset += tag.length # vendorID - application tagged unsigned (tag 2) tag, offset = decode_tag(data, offset) vendor_id = decode_unsigned(data[offset : offset + tag.length]) return cls( object_identifier=object_identifier, max_apdu_length=max_apdu_length, segmentation_supported=segmentation_supported, vendor_id=vendor_id, )