Source code for bac_py.transport.bvll_ipv6

"""BVLL (BACnet Virtual Link Layer) encoding and decoding for BACnet/IPv6 per Annex U."""

from __future__ import annotations

import struct
from dataclasses import dataclass

from bac_py.network.address import BIP6Address
from bac_py.types.enums import Bvlc6Function

BVLC_TYPE_BACNET_IPV6 = 0x82
BVLL6_HEADER_LENGTH = 4  # Type(1) + Function(1) + Length(2)
VMAC_LENGTH = 3
BIP6_ADDRESS_LENGTH = 18  # 16-byte IPv6 + 2-byte port

# Which function codes include source VMAC, dest VMAC, and/or originating address.
_HAS_SOURCE_VMAC = frozenset(
    {
        Bvlc6Function.BVLC_RESULT,
        Bvlc6Function.ORIGINAL_UNICAST_NPDU,
        Bvlc6Function.ORIGINAL_BROADCAST_NPDU,
        Bvlc6Function.FORWARDED_NPDU,
        Bvlc6Function.ADDRESS_RESOLUTION,
        Bvlc6Function.FORWARDED_ADDRESS_RESOLUTION,
        Bvlc6Function.ADDRESS_RESOLUTION_ACK,
        Bvlc6Function.VIRTUAL_ADDRESS_RESOLUTION,
        Bvlc6Function.VIRTUAL_ADDRESS_RESOLUTION_ACK,
        Bvlc6Function.REGISTER_FOREIGN_DEVICE,
        Bvlc6Function.DELETE_FOREIGN_DEVICE_TABLE_ENTRY,
        Bvlc6Function.DISTRIBUTE_BROADCAST_NPDU,
    }
)

_HAS_DEST_VMAC = frozenset(
    {
        Bvlc6Function.ORIGINAL_UNICAST_NPDU,
        Bvlc6Function.ADDRESS_RESOLUTION_ACK,
        Bvlc6Function.VIRTUAL_ADDRESS_RESOLUTION_ACK,
    }
)

_HAS_ORIGINATING_ADDRESS = frozenset(
    {
        Bvlc6Function.FORWARDED_NPDU,
        Bvlc6Function.FORWARDED_ADDRESS_RESOLUTION,
    }
)


[docs] @dataclass(frozen=True, slots=True) class Bvll6Message: """Decoded BACnet/IPv6 BVLL message.""" function: Bvlc6Function data: bytes source_vmac: bytes | None = None dest_vmac: bytes | None = None originating_address: BIP6Address | None = None
[docs] def encode_bvll6( function: Bvlc6Function, payload: bytes, *, source_vmac: bytes | None = None, dest_vmac: bytes | None = None, originating_address: BIP6Address | None = None, ) -> bytes: """Encode a complete BACnet/IPv6 BVLL message. Uses a single pre-sized bytearray to avoid intermediate allocations. :param function: BVLC6 function code. :param payload: NPDU payload bytes (or result/registration data). :param source_vmac: 3-byte source VMAC (required for most functions). :param dest_vmac: 3-byte destination VMAC (required for unicast and ACKs). :param originating_address: 18-byte originating IPv6 address (Forwarded-NPDU). :returns: Complete BVLL message bytes ready for UDP transmission. """ # Calculate total size upfront for single allocation size = BVLL6_HEADER_LENGTH + len(payload) has_src = function in _HAS_SOURCE_VMAC has_dst = function in _HAS_DEST_VMAC has_orig = function in _HAS_ORIGINATING_ADDRESS if has_src: if source_vmac is None or len(source_vmac) != VMAC_LENGTH: msg = f"{function.name} requires a 3-byte source VMAC" raise ValueError(msg) size += VMAC_LENGTH if has_dst: if dest_vmac is None or len(dest_vmac) != VMAC_LENGTH: msg = f"{function.name} requires a 3-byte destination VMAC" raise ValueError(msg) size += VMAC_LENGTH orig_bytes: bytes | None = None if has_orig: if originating_address is None: msg = f"{function.name} requires originating_address" raise ValueError(msg) orig_bytes = originating_address.encode() size += len(orig_bytes) buf = bytearray(size) buf[0] = BVLC_TYPE_BACNET_IPV6 buf[1] = function struct.pack_into("!H", buf, 2, size) offset = BVLL6_HEADER_LENGTH if has_src: assert source_vmac is not None # validated above buf[offset : offset + VMAC_LENGTH] = source_vmac offset += VMAC_LENGTH if has_dst: assert dest_vmac is not None # validated above buf[offset : offset + VMAC_LENGTH] = dest_vmac offset += VMAC_LENGTH if orig_bytes is not None: buf[offset : offset + len(orig_bytes)] = orig_bytes offset += len(orig_bytes) buf[offset:] = payload return bytes(buf)
[docs] def decode_bvll6(data: memoryview | bytes) -> Bvll6Message: """Decode a BACnet/IPv6 BVLL message from raw UDP datagram. :param data: Raw UDP datagram bytes. :returns: Decoded :class:`Bvll6Message`. :raises ValueError: If data is too short, type byte is invalid, or payload is truncated. """ if len(data) < BVLL6_HEADER_LENGTH: msg = f"BVLL6 data too short: need at least {BVLL6_HEADER_LENGTH} bytes, got {len(data)}" raise ValueError(msg) if isinstance(data, bytes): data = memoryview(data) if data[0] != BVLC_TYPE_BACNET_IPV6: msg = f"Invalid BVLC type: {data[0]:#x}, expected {BVLC_TYPE_BACNET_IPV6:#x}" raise ValueError(msg) function = Bvlc6Function(data[1]) length = (data[2] << 8) | data[3] if length < BVLL6_HEADER_LENGTH or length > len(data): msg = f"Invalid BVLL6 length: declared {length}, actual {len(data)}" raise ValueError(msg) offset = BVLL6_HEADER_LENGTH source_vmac: bytes | None = None dest_vmac: bytes | None = None originating_address: BIP6Address | None = None if function in _HAS_SOURCE_VMAC: if offset + VMAC_LENGTH > length: msg = f"{function.name} truncated: missing source VMAC" raise ValueError(msg) source_vmac = bytes(data[offset : offset + VMAC_LENGTH]) offset += VMAC_LENGTH if function in _HAS_DEST_VMAC: if offset + VMAC_LENGTH > length: msg = f"{function.name} truncated: missing destination VMAC" raise ValueError(msg) dest_vmac = bytes(data[offset : offset + VMAC_LENGTH]) offset += VMAC_LENGTH if function in _HAS_ORIGINATING_ADDRESS: if offset + BIP6_ADDRESS_LENGTH > length: msg = f"{function.name} truncated: missing originating address" raise ValueError(msg) originating_address = BIP6Address.decode(data[offset : offset + BIP6_ADDRESS_LENGTH]) offset += BIP6_ADDRESS_LENGTH return Bvll6Message( function=function, data=bytes(data[offset:length]), source_vmac=source_vmac, dest_vmac=dest_vmac, originating_address=originating_address, )