Source code for bac_py.transport.bvll

"""BVLL (BACnet Virtual Link Layer) encoding and decoding per Annex J."""

from __future__ import annotations

import struct
from dataclasses import dataclass

from bac_py.network.address import BIPAddress
from bac_py.types.enums import BvlcFunction

BVLC_TYPE_BACNET_IP = 0x81
BVLL_HEADER_LENGTH = 4  # Type(1) + Function(1) + Length(2)
_FORWARDED_ADDR_LENGTH = 6  # 4-byte IP + 2-byte port

# Pre-built BvlcFunction lookup tuple indexed by byte value
_BVLC_FUNCTIONS: tuple[BvlcFunction | None, ...] = tuple(
    BvlcFunction(i) if i <= 0x0C else None for i in range(256)
)


[docs] @dataclass(frozen=True, slots=True) class BvllMessage: """Decoded BVLL message.""" function: BvlcFunction data: bytes originating_address: BIPAddress | None = None
def _make_bvll_message( function: BvlcFunction, data: bytes, originating_address: BIPAddress | None, ) -> BvllMessage: """Fast BvllMessage construction bypassing frozen-dataclass ``__init__``.""" obj = object.__new__(BvllMessage) object.__setattr__(obj, "function", function) object.__setattr__(obj, "data", data) object.__setattr__(obj, "originating_address", originating_address) return obj
[docs] def encode_bvll( function: BvlcFunction, payload: bytes, originating_address: BIPAddress | None = None, ) -> bytes: """Encode a complete BVLL message. Uses a single pre-sized bytearray to avoid intermediate allocations. :param function: BVLC function code. :param payload: NPDU payload bytes. :param originating_address: Required for Forwarded-NPDU. :returns: Complete BVLL message bytes ready for UDP transmission. """ if function == BvlcFunction.FORWARDED_NPDU: if originating_address is None: msg = "Forwarded-NPDU requires originating_address" raise ValueError(msg) total = BVLL_HEADER_LENGTH + _FORWARDED_ADDR_LENGTH + len(payload) buf = bytearray(total) buf[0] = BVLC_TYPE_BACNET_IP buf[1] = function struct.pack_into("!H", buf, 2, total) buf[4:10] = originating_address.encode() buf[10:] = payload return bytes(buf) total = BVLL_HEADER_LENGTH + len(payload) buf = bytearray(total) buf[0] = BVLC_TYPE_BACNET_IP buf[1] = function struct.pack_into("!H", buf, 2, total) buf[4:] = payload return bytes(buf)
[docs] def decode_bvll(data: memoryview | bytes) -> BvllMessage: """Decode a BVLL message from raw UDP datagram. :param data: Raw UDP datagram bytes. :returns: Decoded :class:`BvllMessage`. :raises ValueError: If *data* is too short, BVLC type byte is invalid, declared length is inconsistent, or a function-specific payload (e.g. Forwarded-NPDU originating address) is truncated. """ if len(data) < BVLL_HEADER_LENGTH: msg = f"BVLL data too short: need at least {BVLL_HEADER_LENGTH} bytes, got {len(data)}" raise ValueError(msg) if isinstance(data, bytes): data = memoryview(data) if data[0] != BVLC_TYPE_BACNET_IP: msg = f"Invalid BVLC type: {data[0]:#x}" raise ValueError(msg) function = _BVLC_FUNCTIONS[data[1]] if function is None: msg = f"Unknown BVLC function: {data[1]:#x}" raise ValueError(msg) length = (data[2] << 8) | data[3] if length < BVLL_HEADER_LENGTH or length > len(data): msg = f"Invalid BVLL length: declared {length}, actual {len(data)}" raise ValueError(msg) if function == BvlcFunction.FORWARDED_NPDU: if length < BVLL_HEADER_LENGTH + 6: msg = f"Forwarded-NPDU too short: need at least {BVLL_HEADER_LENGTH + 6} bytes, got {length}" raise ValueError(msg) orig_addr = BIPAddress.decode(data[4:10]) return _make_bvll_message(function, bytes(data[10:length]), orig_addr) return _make_bvll_message(function, bytes(data[4:length]), None)