Source code for bac_py.encoding.apdu

"""APDU encoding and decoding per ASHRAE 135-2016 Clause 20.1."""

from __future__ import annotations

import logging
from dataclasses import dataclass

from bac_py.encoding.primitives import decode_enumerated, encode_application_enumerated
from bac_py.encoding.tags import decode_tag
from bac_py.types.enums import AbortReason, ErrorClass, ErrorCode, PduType, RejectReason

logger = logging.getLogger(__name__)
_DEBUG = logging.DEBUG

# Pre-built PduType lookup tuple indexed by nibble value (0-15)
_PDU_TYPES: tuple[PduType | None, ...] = tuple(PduType(i) if i <= 7 else None for i in range(16))

# Max-segments encoding table (Clause 20.1.2.4)
# B'000' = unspecified, B'001' = 2, ... B'110' = 64, B'111' = >64
_MAX_SEGMENTS_ENCODE: dict[int, int] = {
    2: 1,
    4: 2,
    8: 3,
    16: 4,
    32: 5,
    64: 6,
}
_MAX_SEGMENTS_UNSPECIFIED = 0  # B'000'
_MAX_SEGMENTS_OVER_64 = 7  # B'111'

# Fast tuple lookup for decode (indexed 0-7)
_MAX_SEGMENTS_DECODE_TUPLE: tuple[int | None, ...] = (None, 2, 4, 8, 16, 32, 64, None)

# Max-APDU-length encoding table (Clause 20.1.2.5)
_MAX_APDU_ENCODE: dict[int, int] = {
    50: 0,
    128: 1,
    206: 2,
    480: 3,
    1024: 4,
    1476: 5,
}

# Fast tuple lookup for decode (indexed 0-5, default 1476 for 6+)
_MAX_APDU_DECODE_TUPLE: tuple[int, ...] = (50, 128, 206, 480, 1024, 1476)


def _encode_max_segments(value: int | None) -> int:
    """Encode a max-segments value to a 3-bit field per Clause 20.1.2.4.

    :param value: Maximum number of segments (2, 4, 8, 16, 32, 64) or
        ``None`` for unspecified.
    :returns: 3-bit encoded field value (0-7).
    """
    if value is None:
        return _MAX_SEGMENTS_UNSPECIFIED
    return _MAX_SEGMENTS_ENCODE.get(value, _MAX_SEGMENTS_OVER_64)


def _decode_max_segments(value: int) -> int | None:
    """Decode a 3-bit max-segments field per Clause 20.1.2.4.

    :param value: 3-bit field value from the PDU header.
    :returns: Segment count, or ``None`` if unspecified or >64.
    """
    return _MAX_SEGMENTS_DECODE_TUPLE[value]


def _encode_max_apdu(value: int) -> int:
    """Encode a max-APDU-length to a 4-bit field per Clause 20.1.2.5.

    :param value: Maximum APDU length in bytes (50, 128, 206, 480, 1024, 1476).
    :returns: 4-bit encoded field value (0-5), defaults to 5 (1476).
    """
    return _MAX_APDU_ENCODE.get(value, 5)


def _decode_max_apdu(value: int) -> int:
    """Decode a 4-bit max-APDU-length field per Clause 20.1.2.5.

    :param value: 4-bit field value from the PDU header.
    :returns: Maximum APDU length in bytes, defaults to 1476.
    """
    if value < len(_MAX_APDU_DECODE_TUPLE):
        return _MAX_APDU_DECODE_TUPLE[value]
    return 1476


# --- PDU Dataclasses ---


[docs] @dataclass(frozen=True, slots=True) class ConfirmedRequestPDU: """BACnet Confirmed-Request PDU (Clause 20.1.2).""" segmented: bool """Whether this PDU is a segment of a larger message.""" more_follows: bool """Whether more segments follow this one.""" segmented_response_accepted: bool """Whether the sender can accept a segmented response.""" max_segments: int | None """Maximum number of segments the sender can accept, or ``None`` for unspecified.""" max_apdu_length: int """Maximum APDU length in bytes the sender can accept.""" invoke_id: int """Invoke ID for matching requests to responses (0-255).""" sequence_number: int | None """Segment sequence number, or ``None`` if not segmented.""" proposed_window_size: int | None """Proposed window size for segmentation, or ``None`` if not segmented.""" service_choice: int """Confirmed service choice number.""" service_request: bytes """Encoded service request parameters."""
[docs] @dataclass(frozen=True, slots=True) class UnconfirmedRequestPDU: """BACnet Unconfirmed-Request PDU (Clause 20.1.3).""" service_choice: int """Unconfirmed service choice number.""" service_request: bytes """Encoded service request parameters."""
[docs] @dataclass(frozen=True, slots=True) class SimpleAckPDU: """BACnet SimpleACK PDU (Clause 20.1.4).""" invoke_id: int """Invoke ID of the confirmed request being acknowledged.""" service_choice: int """Service choice of the confirmed request being acknowledged."""
[docs] @dataclass(frozen=True, slots=True) class ComplexAckPDU: """BACnet ComplexACK PDU (Clause 20.1.5).""" segmented: bool """Whether this PDU is a segment of a larger response.""" more_follows: bool """Whether more segments follow this one.""" invoke_id: int """Invoke ID of the confirmed request being acknowledged.""" sequence_number: int | None """Segment sequence number, or ``None`` if not segmented.""" proposed_window_size: int | None """Proposed window size for segmentation, or ``None`` if not segmented.""" service_choice: int """Service choice of the confirmed request being acknowledged.""" service_ack: bytes """Encoded service response parameters."""
[docs] @dataclass(frozen=True, slots=True) class SegmentAckPDU: """BACnet SegmentACK PDU (Clause 20.1.6).""" negative_ack: bool """Whether this is a negative acknowledgement (requesting retransmission).""" sent_by_server: bool """Whether the server (not the client) sent this SegmentACK.""" invoke_id: int """Invoke ID of the segmented transaction.""" sequence_number: int """Sequence number of the last segment received.""" actual_window_size: int """Actual window size the sender can accept."""
[docs] @dataclass(frozen=True, slots=True) class ErrorPDU: """BACnet Error PDU (Clause 20.1.7). Represents the error response with error-class, error-code, and optional trailing error data for extended error types (e.g. ChangeList-Error, CreateObject-Error). """ invoke_id: int """Invoke ID of the confirmed request that caused the error.""" service_choice: int """Service choice of the confirmed request that caused the error.""" error_class: ErrorClass """Error class categorising the error (e.g. object, property, resource).""" error_code: ErrorCode """Specific error code within the error class.""" error_data: bytes = b"" """Optional trailing bytes for extended error types."""
[docs] @dataclass(frozen=True, slots=True) class RejectPDU: """BACnet Reject PDU (Clause 20.1.8).""" invoke_id: int """Invoke ID of the confirmed request being rejected.""" reject_reason: RejectReason """Reason the request was rejected."""
[docs] @dataclass(frozen=True, slots=True) class AbortPDU: """BACnet Abort PDU (Clause 20.1.9).""" sent_by_server: bool """Whether the server (not the client) initiated the abort.""" invoke_id: int """Invoke ID of the transaction being aborted.""" abort_reason: AbortReason """Reason the transaction was aborted."""
# Union of all PDU types APDU = ( ConfirmedRequestPDU | UnconfirmedRequestPDU | SimpleAckPDU | ComplexAckPDU | SegmentAckPDU | ErrorPDU | RejectPDU | AbortPDU ) # --- Segmentation field helpers --- def _encode_segmentation_fields( buf: bytearray, segmented: bool, sequence_number: int | None, proposed_window_size: int | None, ) -> None: """Append segmentation fields to *buf* if the PDU is segmented. :param buf: Mutable byte buffer to append to. :param segmented: Whether the PDU is segmented. :param sequence_number: Segment sequence number. :param proposed_window_size: Proposed window size. """ if segmented: buf.append(sequence_number if sequence_number is not None else 0) buf.append(proposed_window_size if proposed_window_size is not None else 1) def _decode_segmentation_fields( data: memoryview, offset: int, segmented: bool, min_len: int, pdu_name: str, ) -> tuple[int | None, int | None, int]: """Decode segmentation fields if the PDU is segmented. :param data: Buffer containing the raw PDU bytes. :param offset: Current position in the buffer. :param segmented: Whether the PDU is segmented. :param min_len: Minimum buffer length required for segmented PDUs. :param pdu_name: PDU type name for error messages. :returns: Tuple of (sequence_number, proposed_window_size, new_offset). :raises ValueError: If the buffer is too short for segmented fields. """ if not segmented: return None, None, offset if len(data) < min_len: msg = f"Segmented {pdu_name} too short: need at least {min_len} bytes, got {len(data)}" raise ValueError(msg) return data[offset], data[offset + 1], offset + 2 # --- Encoding ---
[docs] def encode_apdu(pdu: APDU) -> bytes: """Encode an APDU dataclass to wire-format bytes. Dispatches to the appropriate encoder based on the PDU type. :param pdu: The PDU dataclass instance to encode. :returns: Encoded APDU bytes ready for transmission. :raises TypeError: If *pdu* is not a recognised PDU type. """ match pdu: case ConfirmedRequestPDU(): return _encode_confirmed_request(pdu) case UnconfirmedRequestPDU(): return _encode_unconfirmed_request(pdu) case SimpleAckPDU(): return _encode_simple_ack(pdu) case ComplexAckPDU(): return _encode_complex_ack(pdu) case SegmentAckPDU(): return _encode_segment_ack(pdu) case ErrorPDU(): return _encode_error(pdu) case RejectPDU(): return _encode_reject(pdu) case AbortPDU(): return _encode_abort(pdu) case _: msg = f"Unknown PDU type: {type(pdu).__name__}" logger.warning(msg) raise TypeError(msg)
def _encode_confirmed_request(pdu: ConfirmedRequestPDU) -> bytes: """Encode a :class:`ConfirmedRequestPDU` to bytes per Clause 20.1.2. :param pdu: Confirmed request to encode. :returns: Encoded PDU bytes. """ if __debug__ and logger.isEnabledFor(_DEBUG): logger.debug( "encode confirmed request service=%d invoke_id=%d", pdu.service_choice, pdu.invoke_id, ) buf = bytearray() # Byte 0: PDU type + flags byte0 = PduType.CONFIRMED_REQUEST << 4 if pdu.segmented: byte0 |= 0x08 if pdu.more_follows: byte0 |= 0x04 if pdu.segmented_response_accepted: byte0 |= 0x02 buf.append(byte0) # Byte 1: max-segments + max-APDU-length byte1 = (_encode_max_segments(pdu.max_segments) << 4) | _encode_max_apdu(pdu.max_apdu_length) buf.append(byte1) buf.append(pdu.invoke_id) _encode_segmentation_fields(buf, pdu.segmented, pdu.sequence_number, pdu.proposed_window_size) buf.append(pdu.service_choice) buf.extend(pdu.service_request) return bytes(buf) def _encode_unconfirmed_request(pdu: UnconfirmedRequestPDU) -> bytes: """Encode an :class:`UnconfirmedRequestPDU` to bytes per Clause 20.1.3. :param pdu: Unconfirmed request to encode. :returns: Encoded PDU bytes. """ buf = bytearray() buf.append(PduType.UNCONFIRMED_REQUEST << 4) buf.append(pdu.service_choice) buf.extend(pdu.service_request) return bytes(buf) def _encode_simple_ack(pdu: SimpleAckPDU) -> bytes: """Encode a :class:`SimpleAckPDU` to bytes per Clause 20.1.4. :param pdu: Simple ACK to encode. :returns: Encoded PDU bytes (3 bytes). """ return bytes([PduType.SIMPLE_ACK << 4, pdu.invoke_id, pdu.service_choice]) def _encode_complex_ack(pdu: ComplexAckPDU) -> bytes: """Encode a :class:`ComplexAckPDU` to bytes per Clause 20.1.5. :param pdu: Complex ACK to encode. :returns: Encoded PDU bytes. """ buf = bytearray() byte0 = PduType.COMPLEX_ACK << 4 if pdu.segmented: byte0 |= 0x08 if pdu.more_follows: byte0 |= 0x04 buf.append(byte0) buf.append(pdu.invoke_id) _encode_segmentation_fields(buf, pdu.segmented, pdu.sequence_number, pdu.proposed_window_size) buf.append(pdu.service_choice) buf.extend(pdu.service_ack) return bytes(buf) def _encode_segment_ack(pdu: SegmentAckPDU) -> bytes: """Encode a :class:`SegmentAckPDU` to bytes per Clause 20.1.6. :param pdu: Segment ACK to encode. :returns: Encoded PDU bytes (4 bytes). """ byte0 = PduType.SEGMENT_ACK << 4 if pdu.negative_ack: byte0 |= 0x02 if pdu.sent_by_server: byte0 |= 0x01 return bytes( [ byte0, pdu.invoke_id, pdu.sequence_number, pdu.actual_window_size, ] ) def _encode_error(pdu: ErrorPDU) -> bytes: """Encode an :class:`ErrorPDU` to bytes per Clause 20.1.7. :param pdu: Error PDU to encode. :returns: Encoded PDU bytes including error class, code, and optional data. """ buf = bytearray() buf.append(PduType.ERROR << 4) buf.append(pdu.invoke_id) buf.append(pdu.service_choice) buf.extend(encode_application_enumerated(pdu.error_class)) buf.extend(encode_application_enumerated(pdu.error_code)) if pdu.error_data: buf.extend(pdu.error_data) return bytes(buf) def _encode_reject(pdu: RejectPDU) -> bytes: """Encode a :class:`RejectPDU` to bytes per Clause 20.1.8. :param pdu: Reject PDU to encode. :returns: Encoded PDU bytes (3 bytes). """ return bytes([PduType.REJECT << 4, pdu.invoke_id, pdu.reject_reason]) def _encode_abort(pdu: AbortPDU) -> bytes: """Encode an :class:`AbortPDU` to bytes per Clause 20.1.9. :param pdu: Abort PDU to encode. :returns: Encoded PDU bytes (3 bytes). """ byte0 = PduType.ABORT << 4 if pdu.sent_by_server: byte0 |= 0x01 return bytes([byte0, pdu.invoke_id, pdu.abort_reason]) # --- Decoding ---
[docs] def decode_apdu(data: memoryview | bytes) -> APDU: """Decode an APDU from raw bytes. Inspects the PDU type nibble in the first byte and dispatches to the appropriate decoder. :param data: Raw APDU bytes. :returns: Decoded PDU dataclass instance. :raises ValueError: If *data* is too short to decode. :raises TypeError: If the PDU type is not recognised. """ if len(data) < 1: msg = "APDU data too short: need at least 1 byte" raise ValueError(msg) if isinstance(data, bytes): data = memoryview(data) pdu_type = _PDU_TYPES[(data[0] >> 4) & 0x0F] if pdu_type is None: msg = f"Unknown PDU type nibble: {(data[0] >> 4) & 0x0F:#x}" logger.warning(msg) raise ValueError(msg) if __debug__ and logger.isEnabledFor(_DEBUG): logger.debug("decode APDU type=%s", pdu_type) match pdu_type: case PduType.CONFIRMED_REQUEST: return _decode_confirmed_request(data) case PduType.UNCONFIRMED_REQUEST: return _decode_unconfirmed_request(data) case PduType.SIMPLE_ACK: return _decode_simple_ack(data) case PduType.COMPLEX_ACK: return _decode_complex_ack(data) case PduType.SEGMENT_ACK: return _decode_segment_ack(data) case PduType.ERROR: return _decode_error(data) case PduType.REJECT: return _decode_reject(data) case PduType.ABORT: return _decode_abort(data) case _: msg = f"Unknown PDU type: {pdu_type!r}" logger.warning(msg) raise TypeError(msg)
def _decode_confirmed_request(data: memoryview) -> ConfirmedRequestPDU: """Decode a :class:`ConfirmedRequestPDU` from raw bytes per Clause 20.1.2. :param data: Raw PDU bytes (at least 4 bytes). :returns: Decoded :class:`ConfirmedRequestPDU`. :raises ValueError: If *data* is too short. """ if len(data) < 4: msg = f"ConfirmedRequest too short: need at least 4 bytes, got {len(data)}" raise ValueError(msg) byte0 = data[0] segmented = bool(byte0 & 0x08) more_follows = bool(byte0 & 0x04) segmented_response_accepted = bool(byte0 & 0x02) byte1 = data[1] max_segments = _decode_max_segments((byte1 >> 4) & 0x07) max_apdu_length = _decode_max_apdu(byte1 & 0x0F) invoke_id = data[2] offset = 3 sequence_number, proposed_window_size, offset = _decode_segmentation_fields( data, offset, segmented, 6, "ConfirmedRequest" ) service_choice = data[offset] offset += 1 service_request = bytes(data[offset:]) return _make_confirmed_request( segmented, more_follows, segmented_response_accepted, max_segments, max_apdu_length, invoke_id, sequence_number, proposed_window_size, service_choice, service_request, ) def _make_confirmed_request( segmented: bool, more_follows: bool, segmented_response_accepted: bool, max_segments: int | None, max_apdu_length: int, invoke_id: int, sequence_number: int | None, proposed_window_size: int | None, service_choice: int, service_request: bytes, ) -> ConfirmedRequestPDU: """Fast ConfirmedRequestPDU construction bypassing frozen-dataclass ``__init__``.""" obj = object.__new__(ConfirmedRequestPDU) object.__setattr__(obj, "segmented", segmented) object.__setattr__(obj, "more_follows", more_follows) object.__setattr__(obj, "segmented_response_accepted", segmented_response_accepted) object.__setattr__(obj, "max_segments", max_segments) object.__setattr__(obj, "max_apdu_length", max_apdu_length) object.__setattr__(obj, "invoke_id", invoke_id) object.__setattr__(obj, "sequence_number", sequence_number) object.__setattr__(obj, "proposed_window_size", proposed_window_size) object.__setattr__(obj, "service_choice", service_choice) object.__setattr__(obj, "service_request", service_request) return obj def _decode_unconfirmed_request(data: memoryview) -> UnconfirmedRequestPDU: """Decode an :class:`UnconfirmedRequestPDU` from raw bytes per Clause 20.1.3. :param data: Raw PDU bytes (at least 2 bytes). :returns: Decoded :class:`UnconfirmedRequestPDU`. :raises ValueError: If *data* is too short. """ if len(data) < 2: msg = f"UnconfirmedRequest too short: need at least 2 bytes, got {len(data)}" raise ValueError(msg) service_choice = data[1] service_request = bytes(data[2:]) return UnconfirmedRequestPDU( service_choice=service_choice, service_request=service_request, ) def _decode_simple_ack(data: memoryview) -> SimpleAckPDU: """Decode a :class:`SimpleAckPDU` from raw bytes per Clause 20.1.4. :param data: Raw PDU bytes (at least 3 bytes). :returns: Decoded :class:`SimpleAckPDU`. :raises ValueError: If *data* is too short. """ if len(data) < 3: msg = f"SimpleACK too short: need at least 3 bytes, got {len(data)}" raise ValueError(msg) return SimpleAckPDU(invoke_id=data[1], service_choice=data[2]) def _decode_complex_ack(data: memoryview) -> ComplexAckPDU: """Decode a :class:`ComplexAckPDU` from raw bytes per Clause 20.1.5. :param data: Raw PDU bytes (at least 3 bytes). :returns: Decoded :class:`ComplexAckPDU`. :raises ValueError: If *data* is too short. """ if len(data) < 3: msg = f"ComplexACK too short: need at least 3 bytes, got {len(data)}" raise ValueError(msg) byte0 = data[0] segmented = bool(byte0 & 0x08) more_follows = bool(byte0 & 0x04) invoke_id = data[1] offset = 2 sequence_number, proposed_window_size, offset = _decode_segmentation_fields( data, offset, segmented, 5, "ComplexACK" ) service_choice = data[offset] offset += 1 service_ack = bytes(data[offset:]) return ComplexAckPDU( segmented=segmented, more_follows=more_follows, invoke_id=invoke_id, sequence_number=sequence_number, proposed_window_size=proposed_window_size, service_choice=service_choice, service_ack=service_ack, ) def _decode_segment_ack(data: memoryview) -> SegmentAckPDU: """Decode a :class:`SegmentAckPDU` from raw bytes per Clause 20.1.6. :param data: Raw PDU bytes (at least 4 bytes). :returns: Decoded :class:`SegmentAckPDU`. :raises ValueError: If *data* is too short. """ if len(data) < 4: msg = f"SegmentACK too short: need at least 4 bytes, got {len(data)}" raise ValueError(msg) byte0 = data[0] return SegmentAckPDU( negative_ack=bool(byte0 & 0x02), sent_by_server=bool(byte0 & 0x01), invoke_id=data[1], sequence_number=data[2], actual_window_size=data[3], ) def _decode_error(data: memoryview) -> ErrorPDU: """Decode an :class:`ErrorPDU` from raw bytes per Clause 20.1.7. Decodes the error class and error code as application-tagged enumerated values, and preserves any trailing bytes as extended error data. :param data: Raw PDU bytes (at least 5 bytes). :returns: Decoded :class:`ErrorPDU`. :raises ValueError: If *data* is too short. """ if len(data) < 5: msg = f"ErrorPDU too short: need at least 5 bytes, got {len(data)}" raise ValueError(msg) invoke_id = data[1] service_choice = data[2] # Error class and code are application-tagged enumerated values offset = 3 tag, offset = decode_tag(data, offset) if offset + tag.length > len(data): msg = f"ErrorPDU truncated at error class: need {tag.length} bytes at offset {offset}" raise ValueError(msg) error_class = ErrorClass(decode_enumerated(data[offset : offset + tag.length])) offset += tag.length tag, offset = decode_tag(data, offset) if offset + tag.length > len(data): msg = f"ErrorPDU truncated at error code: need {tag.length} bytes at offset {offset}" raise ValueError(msg) error_code = ErrorCode(decode_enumerated(data[offset : offset + tag.length])) offset += tag.length # Preserve any trailing error data (extended error types) error_data = bytes(data[offset:]) if offset < len(data) else b"" return ErrorPDU( invoke_id=invoke_id, service_choice=service_choice, error_class=error_class, error_code=error_code, error_data=error_data, ) def _decode_reject(data: memoryview) -> RejectPDU: """Decode a :class:`RejectPDU` from raw bytes per Clause 20.1.8. :param data: Raw PDU bytes (at least 3 bytes). :returns: Decoded :class:`RejectPDU`. :raises ValueError: If *data* is too short. """ if len(data) < 3: msg = f"RejectPDU too short: need at least 3 bytes, got {len(data)}" raise ValueError(msg) return RejectPDU( invoke_id=data[1], reject_reason=RejectReason(data[2]), ) def _decode_abort(data: memoryview) -> AbortPDU: """Decode an :class:`AbortPDU` from raw bytes per Clause 20.1.9. :param data: Raw PDU bytes (at least 3 bytes). :returns: Decoded :class:`AbortPDU`. :raises ValueError: If *data* is too short. """ if len(data) < 3: msg = f"AbortPDU too short: need at least 3 bytes, got {len(data)}" raise ValueError(msg) byte0 = data[0] return AbortPDU( sent_by_server=bool(byte0 & 0x01), invoke_id=data[1], abort_reason=AbortReason(data[2]), )