"""BACnet/IPv6 transport using asyncio UDP per Annex U."""
from __future__ import annotations
import asyncio
import logging
import os
import socket
import struct
import time
from dataclasses import dataclass, field
from typing import TYPE_CHECKING
from bac_py.network.address import BIP6Address
from bac_py.transport.bbmd6 import BBMD6Manager, BDT6Entry
from bac_py.transport.bvll_ipv6 import decode_bvll6, encode_bvll6
from bac_py.transport.foreign_device6 import ForeignDevice6Manager
from bac_py.types.enums import Bvlc6Function, Bvlc6ResultCode
if TYPE_CHECKING:
from collections.abc import Callable
logger = logging.getLogger(__name__)
# Default BACnet/IPv6 multicast addresses (Annex U)
MULTICAST_LINK_LOCAL = "ff02::bac0"
MULTICAST_SITE_LOCAL = "ff05::bac0"
# Maximum number of concurrent pending address resolutions (defense-in-depth).
_MAX_PENDING_RESOLUTIONS = 1024
[docs]
@dataclass(slots=True)
class VMACEntry:
"""A cached mapping from VMAC to IPv6 address."""
address: BIP6Address
last_seen: float # monotonic timestamp
[docs]
class VMACCache:
"""VMAC-to-IPv6 address resolution cache with TTL-based eviction."""
def __init__(self, ttl: float = 300.0, max_entries: int = 4096) -> None:
self._entries: dict[bytes, VMACEntry] = {}
self._ttl = ttl
self._max_entries = max_entries
[docs]
def put(self, vmac: bytes, address: BIP6Address) -> None:
"""Add or update a VMAC-to-address mapping."""
if vmac not in self._entries and len(self._entries) >= self._max_entries:
self.evict_stale()
if len(self._entries) >= self._max_entries:
logger.warning("VMAC cache full (%d entries), dropping oldest", self._max_entries)
oldest_key = min(self._entries, key=lambda k: self._entries[k].last_seen)
del self._entries[oldest_key]
self._entries[vmac] = VMACEntry(address=address, last_seen=time.monotonic())
[docs]
def get(self, vmac: bytes) -> BIP6Address | None:
"""Look up an address by VMAC, returning ``None`` if not cached or stale."""
entry = self._entries.get(vmac)
if entry is None:
return None
if time.monotonic() - entry.last_seen > self._ttl:
del self._entries[vmac]
return None
return entry.address
[docs]
def evict_stale(self) -> None:
"""Remove all entries older than the TTL."""
now = time.monotonic()
stale = [k for k, v in self._entries.items() if now - v.last_seen > self._ttl]
for k in stale:
del self._entries[k]
[docs]
def all_entries(self) -> dict[bytes, VMACEntry]:
"""Return all current entries (for diagnostics)."""
return dict(self._entries)
@dataclass(slots=True)
class _PendingResolution:
"""An NPDU queued while waiting for VMAC address resolution."""
npdu: bytes
created: float = field(default_factory=time.monotonic)
class _UDP6Protocol(asyncio.DatagramProtocol):
"""Low-level :class:`~asyncio.DatagramProtocol` for BACnet/IPv6 UDP."""
def __init__(
self,
callback: Callable[[bytes, tuple[str, int, int, int]], None],
connection_lost_callback: Callable[[Exception | None], None] | None = None,
) -> None:
self._callback = callback
self._connection_lost_callback = connection_lost_callback
def datagram_received(self, data: bytes, addr: tuple[str, int, int, int]) -> None: # type: ignore[override]
self._callback(data, addr)
def error_received(self, exc: Exception) -> None:
logger.warning("UDP6 transport error: %s", exc)
def connection_lost(self, exc: Exception | None) -> None:
if exc is not None:
logger.warning("UDP6 connection lost: %s", exc)
else:
logger.debug("UDP6 connection closed")
if self._connection_lost_callback is not None:
self._connection_lost_callback(exc)
[docs]
class BIP6Transport:
"""BACnet/IPv6 transport using asyncio UDP.
Provides send/receive for BACnet/IPv6 datagrams wrapped in BVLL6.
Uses 3-byte VMACs for network-layer addressing and IPv6 multicast
for broadcasts per Annex U.
"""
def __init__(
self,
interface: str = "::",
port: int = 0xBAC0,
multicast_address: str = MULTICAST_LINK_LOCAL,
*,
vmac: bytes | None = None,
vmac_ttl: float = 300.0,
) -> None:
"""Initialize the BACnet/IPv6 transport.
:param interface: Local IPv6 address to bind. ``"::"`` binds all interfaces.
:param port: UDP port number. Defaults to ``0xBAC0`` (47808).
:param multicast_address: IPv6 multicast group for broadcasts.
:param vmac: Explicit 3-byte VMAC. If ``None``, auto-generated on start.
:param vmac_ttl: TTL in seconds for VMAC resolution cache entries.
"""
self._interface = interface
self._port = port
self._multicast_address = multicast_address
self._explicit_vmac = vmac
self._vmac: bytes = b""
self._vmac_cache = VMACCache(ttl=vmac_ttl)
self._protocol: _UDP6Protocol | None = None
self._transport: asyncio.DatagramTransport | None = None
self._receive_callback: Callable[[bytes, bytes], None] | None = None
self._local_address: BIP6Address | None = None
self._pending_resolutions: dict[bytes, list[_PendingResolution]] = {}
self._pending_bvlc: dict[tuple[Bvlc6Function, BIP6Address], asyncio.Future[bytes]] = {}
self._bbmd: BBMD6Manager | None = None
self._foreign_device: ForeignDevice6Manager | None = None
[docs]
async def start(self) -> None:
"""Bind UDP6 socket, generate VMAC, and join multicast group."""
if self._transport is not None:
return
# Generate or use explicit VMAC
if self._explicit_vmac is not None:
if len(self._explicit_vmac) != 3:
msg = "VMAC must be exactly 3 bytes"
raise ValueError(msg)
self._vmac = self._explicit_vmac
else:
self._vmac = os.urandom(3)
loop = asyncio.get_running_loop()
# Create IPv6 UDP endpoint
transport, protocol = await loop.create_datagram_endpoint(
lambda: _UDP6Protocol(self._on_datagram_received, self._on_connection_lost),
local_addr=(self._interface, self._port),
family=socket.AF_INET6,
)
self._transport = transport
self._protocol = protocol
# Discover actual bound address
sock: socket.socket = self._transport.get_extra_info("socket")
addr = sock.getsockname()
host = addr[0]
if host == "::":
host = "::1"
self._local_address = BIP6Address(host=host, port=addr[1])
# Join multicast group
try:
group_bin = socket.inet_pton(socket.AF_INET6, self._multicast_address)
# struct: 16-byte group + 4-byte interface index (0 = default)
mreq = group_bin + struct.pack("@I", 0)
sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, mreq)
except OSError:
logger.warning("Failed to join IPv6 multicast group %s", self._multicast_address)
logger.info(
"BIP6Transport started on [%s]:%d, VMAC=%s",
host,
addr[1],
self._vmac.hex(),
)
[docs]
async def stop(self) -> None:
"""Stop BBMD/FD managers, leave multicast group, and close UDP socket."""
if self._foreign_device is not None:
await self._foreign_device.stop()
self._foreign_device = None
if self._bbmd is not None:
await self._bbmd.stop()
self._bbmd = None
if self._transport is not None:
# Try to leave multicast group
try:
sock: socket.socket = self._transport.get_extra_info("socket")
group_bin = socket.inet_pton(socket.AF_INET6, self._multicast_address)
mreq = group_bin + struct.pack("@I", 0)
sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_LEAVE_GROUP, mreq)
except OSError:
pass
self._transport.close()
self._transport = None
self._protocol = None
# Cancel any pending BVLC request futures
for future in self._pending_bvlc.values():
if not future.done():
future.cancel()
self._pending_bvlc.clear()
# Discard any pending address resolution queues
self._pending_resolutions.clear()
# Clear VMAC cache to release stale entries
self._vmac_cache._entries.clear()
logger.info("BIP6Transport stopped")
[docs]
def on_receive(self, callback: Callable[[bytes, bytes], None]) -> None:
"""Register a callback for received NPDU data.
:param callback: Called with ``(npdu_bytes, source_vmac)`` for each
received datagram containing an NPDU. *source_vmac* is the
3-byte VMAC of the sender.
"""
self._receive_callback = callback
[docs]
def send_unicast(self, npdu: bytes, mac_address: bytes) -> None:
"""Send a directed message (Original-Unicast-NPDU).
:param npdu: NPDU bytes to send.
:param mac_address: 3-byte destination VMAC.
"""
if self._transport is None:
msg = "Transport not started"
raise RuntimeError(msg)
dest_addr = self._vmac_cache.get(mac_address)
if dest_addr is None:
# Queue for address resolution
if (
mac_address not in self._pending_resolutions
and len(self._pending_resolutions) >= _MAX_PENDING_RESOLUTIONS
):
logger.warning("BIP6 pending resolution cache full, dropping request")
return
logger.debug("BIP6 VMAC %s not in cache, queuing for resolution", mac_address.hex())
pending = self._pending_resolutions.setdefault(mac_address, [])
if len(pending) >= 16:
logger.warning(
"Pending resolution queue full for VMAC %s, dropping oldest",
mac_address.hex(),
)
pending.pop(0)
pending.append(_PendingResolution(npdu=npdu))
self._send_address_resolution(mac_address)
return
bvll = encode_bvll6(
Bvlc6Function.ORIGINAL_UNICAST_NPDU,
npdu,
source_vmac=self._vmac,
dest_vmac=mac_address,
)
logger.debug(
"BIP6 send unicast %d bytes to [%s]:%d", len(npdu), dest_addr.host, dest_addr.port
)
self._transport.sendto(bvll, (dest_addr.host, dest_addr.port))
[docs]
def send_broadcast(self, npdu: bytes) -> None:
"""Send a local broadcast (Original-Broadcast-NPDU) to the multicast group.
If registered as a foreign device, uses Distribute-Broadcast-NPDU
instead per Annex U. If a BBMD is attached, also forwards to BDT
peers and registered foreign devices.
:param npdu: NPDU bytes to broadcast.
"""
if self._transport is None:
msg = "Transport not started"
raise RuntimeError(msg)
# Foreign devices must use Distribute-Broadcast-NPDU
if self._foreign_device is not None and self._foreign_device.is_registered:
self._foreign_device.send_distribute_broadcast(npdu)
return
bvll = encode_bvll6(
Bvlc6Function.ORIGINAL_BROADCAST_NPDU,
npdu,
source_vmac=self._vmac,
)
logger.debug(
"BIP6 send broadcast %d bytes to [%s]:%d",
len(npdu),
self._multicast_address,
self._port,
)
self._transport.sendto(bvll, (self._multicast_address, self._port))
# If BBMD attached, also forward to peers and foreign devices
if self._bbmd is not None and self._local_address is not None:
self._bbmd.handle_bvlc(
Bvlc6Function.ORIGINAL_BROADCAST_NPDU,
npdu,
self._local_address,
source_vmac=self._vmac,
)
@property
def local_address(self) -> BIP6Address:
"""The local BACnet/IPv6 address of this transport."""
if self._local_address is None:
msg = "Transport not started"
raise RuntimeError(msg)
return self._local_address
@property
def local_mac(self) -> bytes:
"""The 3-byte VMAC address of this port."""
if not self._vmac:
msg = "Transport not started"
raise RuntimeError(msg)
return self._vmac
@property
def max_npdu_length(self) -> int:
"""Maximum NPDU length for BACnet/IPv6 per Clause 6."""
return 1497
@property
def vmac_cache(self) -> VMACCache:
"""The VMAC resolution cache (for diagnostics)."""
return self._vmac_cache
@property
def bbmd(self) -> BBMD6Manager | None:
"""The attached BBMD6 manager, or ``None`` if not configured."""
return self._bbmd
[docs]
async def attach_bbmd(self, bdt_entries: list[BDT6Entry] | None = None) -> BBMD6Manager:
"""Attach an IPv6 BBMD manager to this transport.
Creates and starts a :class:`BBMD6Manager` integrated with this
transport. The BBMD intercepts incoming BVLC6 messages before
they reach the normal receive path, and outgoing broadcasts are
also forwarded to BDT peers and foreign devices.
:param bdt_entries: Optional initial BDT entries.
:returns: The attached :class:`BBMD6Manager` instance.
:raises RuntimeError: If transport not started or BBMD already attached.
"""
if self._transport is None:
msg = "Transport not started"
raise RuntimeError(msg)
if self._bbmd is not None:
msg = "BBMD already attached"
raise RuntimeError(msg)
self._bbmd = BBMD6Manager(
local_address=self.local_address,
local_vmac=self._vmac,
send_callback=self._send_raw,
local_broadcast_callback=self._bbmd_local_deliver,
multicast_send_callback=self._send_multicast,
)
if bdt_entries:
self._bbmd.set_bdt(bdt_entries)
await self._bbmd.start()
logger.info(
"BBMD6 attached to transport [%s]:%d",
self.local_address.host,
self.local_address.port,
)
return self._bbmd
@property
def foreign_device(self) -> ForeignDevice6Manager | None:
"""The attached foreign device manager, or ``None``."""
return self._foreign_device
[docs]
async def attach_foreign_device(
self,
bbmd_address: BIP6Address,
ttl: int,
) -> ForeignDevice6Manager:
"""Attach an IPv6 foreign device manager to this transport.
Creates and starts a :class:`ForeignDevice6Manager` that will
register with the specified BBMD and periodically re-register.
:param bbmd_address: Address of the BBMD to register with.
:param ttl: Time-to-Live for the registration in seconds.
:returns: The attached :class:`ForeignDevice6Manager` instance.
:raises RuntimeError: If transport not started or FD already attached.
"""
if self._transport is None:
msg = "Transport not started"
raise RuntimeError(msg)
if self._foreign_device is not None:
msg = "Foreign device manager already attached"
raise RuntimeError(msg)
self._foreign_device = ForeignDevice6Manager(
bbmd_address=bbmd_address,
ttl=ttl,
send_callback=self._send_raw,
local_vmac=self._vmac,
local_address=self.local_address,
)
await self._foreign_device.start()
logger.info(
"IPv6 foreign device manager attached, registering with BBMD [%s]:%d",
bbmd_address.host,
bbmd_address.port,
)
return self._foreign_device
# ------------------------------------------------------------------
# BBMD / foreign device integration helpers
# ------------------------------------------------------------------
def _send_raw(self, data: bytes, destination: BIP6Address) -> None:
"""Send raw BVLL6 data to a destination (callback for BBMD6/FD6)."""
if self._transport is not None:
self._transport.sendto(data, (destination.host, destination.port))
def _send_multicast(self, data: bytes) -> None:
"""Send raw BVLL6 data to the multicast group (callback for BBMD6)."""
if self._transport is not None:
self._transport.sendto(data, (self._multicast_address, self._port))
def _bbmd_local_deliver(self, npdu: bytes, source_vmac: bytes) -> None:
"""Deliver an NPDU to the local receive callback (BBMD6 callback).
Called by :class:`BBMD6Manager` when a Forwarded-NPDU or
Distribute-Broadcast-NPDU needs to be delivered locally.
"""
if self._receive_callback is not None:
try:
self._receive_callback(npdu, source_vmac)
except Exception:
logger.warning("Error in receive callback", exc_info=True)
# ------------------------------------------------------------------
# Address resolution
# ------------------------------------------------------------------
def _send_address_resolution(self, target_vmac: bytes) -> None:
"""Send an Address-Resolution broadcast for a VMAC."""
if self._transport is None:
return
bvll = encode_bvll6(
Bvlc6Function.ADDRESS_RESOLUTION,
target_vmac,
source_vmac=self._vmac,
)
self._transport.sendto(bvll, (self._multicast_address, self._port))
def _handle_address_resolution(
self,
source_vmac: bytes,
payload: bytes,
sender: BIP6Address,
) -> None:
"""Respond to an Address-Resolution request if the target VMAC matches ours."""
if len(payload) < 3:
return
target_vmac = payload[:3]
# Update cache with sender's VMAC
self._vmac_cache.put(source_vmac, sender)
if target_vmac == self._vmac:
self._send_address_resolution_ack(source_vmac, sender)
def _send_address_resolution_ack(
self,
target_vmac: bytes,
destination: BIP6Address,
) -> None:
"""Send an Address-Resolution-ACK to the requester."""
if self._transport is None:
return
bvll = encode_bvll6(
Bvlc6Function.ADDRESS_RESOLUTION_ACK,
b"",
source_vmac=self._vmac,
dest_vmac=target_vmac,
)
self._transport.sendto(bvll, (destination.host, destination.port))
def _handle_address_resolution_ack(
self,
source_vmac: bytes,
sender: BIP6Address,
) -> None:
"""Process an Address-Resolution-ACK: update cache and flush pending NPDUs."""
logger.debug(
"BIP6 address resolution ACK: VMAC %s -> [%s]:%d",
source_vmac.hex(),
sender.host,
sender.port,
)
self._vmac_cache.put(source_vmac, sender)
self._flush_pending(source_vmac, sender)
def _handle_virtual_address_resolution(
self,
source_vmac: bytes,
sender: BIP6Address,
) -> None:
"""Respond to a Virtual-Address-Resolution request."""
self._vmac_cache.put(source_vmac, sender)
if self._transport is None:
return
bvll = encode_bvll6(
Bvlc6Function.VIRTUAL_ADDRESS_RESOLUTION_ACK,
b"",
source_vmac=self._vmac,
dest_vmac=source_vmac,
)
self._transport.sendto(bvll, (sender.host, sender.port))
def _flush_pending(self, vmac: bytes, address: BIP6Address) -> None:
"""Send any NPDUs queued while waiting for address resolution."""
pending = self._pending_resolutions.pop(vmac, [])
now = time.monotonic()
for item in pending:
# Drop stale entries older than 30 seconds
if now - item.created > 30.0:
continue
bvll = encode_bvll6(
Bvlc6Function.ORIGINAL_UNICAST_NPDU,
item.npdu,
source_vmac=self._vmac,
dest_vmac=vmac,
)
if self._transport is not None:
self._transport.sendto(bvll, (address.host, address.port))
# ------------------------------------------------------------------
# Datagram receive
# ------------------------------------------------------------------
def _on_connection_lost(self, exc: Exception | None) -> None:
self._transport = None
self._protocol = None
def _on_datagram_received(self, data: bytes, addr: tuple[str, int, int, int]) -> None:
"""Process incoming UDP6 datagram."""
try:
msg = decode_bvll6(memoryview(data))
except (ValueError, IndexError):
logger.warning("Dropped malformed BVLL6 from [%s]:%d", addr[0], addr[1])
return
# Fast self-echo check before BIP6Address allocation
if msg.source_vmac and msg.source_vmac == self._vmac:
return
sender = BIP6Address(host=addr[0], port=addr[1])
logger.debug(
"BIP6 recv %d bytes from [%s]:%d func=%s",
len(data),
addr[0],
addr[1],
msg.function.name,
)
# Update resolution cache for any message with a source VMAC
if msg.source_vmac:
self._vmac_cache.put(msg.source_vmac, sender)
# --- BBMD6 intercept ---
if self._bbmd is not None:
# For Forwarded-NPDU the BBMD expects the originating address
if msg.function == Bvlc6Function.FORWARDED_NPDU and msg.originating_address:
bbmd_source = msg.originating_address
else:
bbmd_source = sender
handled = self._bbmd.handle_bvlc(
msg.function, msg.data, bbmd_source, source_vmac=msg.source_vmac
)
if handled:
return
# BBMD returned False -- also needs normal delivery.
# For Forwarded-NPDU the BBMD already delivered via
# local_broadcast_callback so skip to prevent double delivery.
if msg.function == Bvlc6Function.FORWARDED_NPDU:
return
match msg.function:
case Bvlc6Function.ORIGINAL_UNICAST_NPDU:
if self._receive_callback and msg.source_vmac:
try:
self._receive_callback(msg.data, msg.source_vmac)
except Exception:
logger.warning("Error in receive callback", exc_info=True)
case Bvlc6Function.ORIGINAL_BROADCAST_NPDU:
if self._receive_callback and msg.source_vmac:
try:
self._receive_callback(msg.data, msg.source_vmac)
except Exception:
logger.warning("Error in receive callback", exc_info=True)
case Bvlc6Function.FORWARDED_NPDU:
if self._receive_callback and msg.source_vmac:
try:
self._receive_callback(msg.data, msg.source_vmac)
except Exception:
logger.warning("Error in receive callback", exc_info=True)
case Bvlc6Function.ADDRESS_RESOLUTION:
if msg.source_vmac:
self._handle_address_resolution(msg.source_vmac, msg.data, sender)
case Bvlc6Function.FORWARDED_ADDRESS_RESOLUTION:
if msg.source_vmac:
self._handle_address_resolution(msg.source_vmac, msg.data, sender)
case Bvlc6Function.ADDRESS_RESOLUTION_ACK:
if msg.source_vmac:
self._handle_address_resolution_ack(msg.source_vmac, sender)
case Bvlc6Function.VIRTUAL_ADDRESS_RESOLUTION:
if msg.source_vmac:
self._handle_virtual_address_resolution(msg.source_vmac, sender)
case Bvlc6Function.VIRTUAL_ADDRESS_RESOLUTION_ACK:
if msg.source_vmac:
self._vmac_cache.put(msg.source_vmac, sender)
case Bvlc6Function.BVLC_RESULT:
if not self._resolve_pending_bvlc(msg.function, msg.data, sender):
self._handle_bvlc6_result(msg.data, sender)
case Bvlc6Function.REGISTER_FOREIGN_DEVICE:
if self._bbmd is None:
self._send_bvlc6_nak(Bvlc6ResultCode.REGISTER_FOREIGN_DEVICE_NAK, sender)
case Bvlc6Function.DELETE_FOREIGN_DEVICE_TABLE_ENTRY:
if self._bbmd is None:
self._send_bvlc6_nak(
Bvlc6ResultCode.DELETE_FOREIGN_DEVICE_TABLE_ENTRY_NAK, sender
)
case Bvlc6Function.DISTRIBUTE_BROADCAST_NPDU:
if self._bbmd is None:
self._send_bvlc6_nak(
Bvlc6ResultCode.DISTRIBUTE_BROADCAST_TO_NETWORK_NAK, sender
)
case _:
logger.debug(
"Ignoring BVLC6 function %s from [%s]:%d", msg.function, addr[0], addr[1]
)
def _resolve_pending_bvlc(
self, function: Bvlc6Function, data: bytes, source: BIP6Address
) -> bool:
key = (function, source)
future = self._pending_bvlc.get(key)
if future is not None and not future.done():
future.set_result(data)
return True
return False
def _handle_bvlc6_result(self, data: bytes, source: BIP6Address) -> None:
"""Handle a BVLC6-Result not matched by a pending request.
Routes to the ForeignDevice6Manager if the sender matches
the expected BBMD address.
"""
if self._foreign_device is not None and source == self._foreign_device.bbmd_address:
self._foreign_device.handle_bvlc_result(data)
def _send_bvlc6_nak(self, result_code: Bvlc6ResultCode, destination: BIP6Address) -> None:
"""Send a BVLC6-Result NAK."""
if self._transport is None:
return
payload = result_code.to_bytes(2, "big")
bvll = encode_bvll6(Bvlc6Function.BVLC_RESULT, payload, source_vmac=self._vmac)
self._transport.sendto(bvll, (destination.host, destination.port))