# Source code for bac_py.transport.bip

"""BACnet/IP transport using asyncio UDP per Annex J."""

from __future__ import annotations

import asyncio
import logging
import socket
from typing import TYPE_CHECKING

from bac_py.network.address import BIPAddress, _cached_bip_address
from bac_py.transport.bbmd import BDT_ENTRY_SIZE, FDT_ENTRY_SIZE, BBMDManager, BDTEntry, FDTEntry
from bac_py.transport.bvll import decode_bvll, encode_bvll
from bac_py.transport.foreign_device import ForeignDeviceManager
from bac_py.types.enums import BvlcFunction, BvlcResultCode

if TYPE_CHECKING:
    from collections.abc import Callable

logger = logging.getLogger(__name__)
_DEBUG = logging.DEBUG


class BvlcNakError(Exception):
    """Raised when a BVLC-Result NAK is received for a pending request.

    :param result_code: Numeric BVLC-Result code carried by the NAK.
    :param source: Address of the device that sent the NAK.
    """

    def __init__(self, result_code: int, source: BIPAddress) -> None:
        # Keep both values as attributes so callers can branch on the code
        # or report the peer without parsing the message text.
        self.result_code = result_code
        self.source = source
        super().__init__(f"BVLC NAK code {result_code:#06x} from {source}")
# F3: Mapping from BVLC management functions to their NAK result codes.
# Non-BBMD devices respond with NAK when receiving these messages.
_BVLC_NAK_MAP: dict[BvlcFunction, BvlcResultCode] = {
    BvlcFunction.WRITE_BROADCAST_DISTRIBUTION_TABLE: (
        BvlcResultCode.WRITE_BROADCAST_DISTRIBUTION_TABLE_NAK
    ),
    BvlcFunction.READ_BROADCAST_DISTRIBUTION_TABLE: (
        BvlcResultCode.READ_BROADCAST_DISTRIBUTION_TABLE_NAK
    ),
    BvlcFunction.REGISTER_FOREIGN_DEVICE: BvlcResultCode.REGISTER_FOREIGN_DEVICE_NAK,
    BvlcFunction.READ_FOREIGN_DEVICE_TABLE: BvlcResultCode.READ_FOREIGN_DEVICE_TABLE_NAK,
    BvlcFunction.DELETE_FOREIGN_DEVICE_TABLE_ENTRY: (
        BvlcResultCode.DELETE_FOREIGN_DEVICE_TABLE_ENTRY_NAK
    ),
    BvlcFunction.DISTRIBUTE_BROADCAST_TO_NETWORK: (
        BvlcResultCode.DISTRIBUTE_BROADCAST_TO_NETWORK_NAK
    ),
}

# Reverse map: NAK result code -> the ACK function that a pending _bvlc_request
# would be waiting for.  Used to immediately fail pending futures on NAK instead
# of waiting for the full timeout.
_NAK_TO_PENDING_ACK: dict[int, BvlcFunction] = {
    BvlcResultCode.READ_BROADCAST_DISTRIBUTION_TABLE_NAK: (
        BvlcFunction.READ_BROADCAST_DISTRIBUTION_TABLE_ACK
    ),
    BvlcResultCode.READ_FOREIGN_DEVICE_TABLE_NAK: BvlcFunction.READ_FOREIGN_DEVICE_TABLE_ACK,
    BvlcResultCode.WRITE_BROADCAST_DISTRIBUTION_TABLE_NAK: (
        BvlcFunction.WRITE_BROADCAST_DISTRIBUTION_TABLE
    ),
    BvlcResultCode.DELETE_FOREIGN_DEVICE_TABLE_ENTRY_NAK: (
        BvlcFunction.DELETE_FOREIGN_DEVICE_TABLE_ENTRY
    ),
}


def _is_confirmed_request_npdu(npdu: bytes) -> bool:
    """Check if NPDU data contains a BACnet-Confirmed-Request-PDU.

    Parses just enough of the NPDU header to reach the APDU type byte.

    :param npdu: Raw NPDU bytes (version byte, control byte, optional
        addressing fields, then the APDU).
    :returns: ``True`` only when the APDU type nibble is 0
        (Confirmed-Request).  ``False`` for network messages, malformed or
        truncated data, or any other APDU type.
    """
    if len(npdu) < 3:
        return False
    control = npdu[1]
    if control & 0x80:
        return False  # Network message, not APDU
    offset = 2
    if control & 0x20:  # DNET present
        if offset + 3 > len(npdu):
            return False
        # Skip DNET (2 bytes), DLEN (1 byte) and DLEN bytes of DADR.
        dlen = npdu[offset + 2]
        offset += 3 + dlen
    if control & 0x08:  # SNET present
        if offset + 3 > len(npdu):
            return False
        # Skip SNET (2 bytes), SLEN (1 byte) and SLEN bytes of SADR.
        slen = npdu[offset + 2]
        offset += 3 + slen
    if control & 0x20:  # Hop count (present when DNET is present)
        offset += 1
    if offset >= len(npdu):
        return False
    apdu_type = (npdu[offset] >> 4) & 0x0F
    return apdu_type == 0  # 0 = BACnet-Confirmed-Request-PDU


def _resolve_local_ip() -> str:
    """Resolve the local machine's IP address.

    Connects a UDP socket to the private address ``10.255.255.255`` and reads
    back the local address the OS selected for that route.  Connecting a
    datagram socket sends no traffic.

    :returns: The selected local IPv4 address, or ``127.0.0.1`` if the
        socket operations fail with :class:`OSError`.
    """
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.connect(("10.255.255.255", 1))
            ip: str = s.getsockname()[0]
            return ip
    except OSError:
        return "127.0.0.1"


class _UDPProtocol(asyncio.DatagramProtocol):
    """Low-level :class:`~asyncio.DatagramProtocol` wrapper for BACnet/IP UDP communication."""

    def __init__(
        self,
        callback: Callable[[bytes, tuple[str, int]], None],
        connection_lost_callback: Callable[[Exception | None], None] | None = None,
    ) -> None:
        """Store the callbacks invoked for datagrams and connection loss.

        :param callback: Called with ``(data, addr)`` for every datagram.
        :param connection_lost_callback: Optional; called with the exception
            (or ``None``) when the transport is lost or closed.
        """
        self._callback = callback
        self._connection_lost_callback = connection_lost_callback

    def datagram_received(self, data: bytes, addr: tuple[str, int]) -> None:
        """Forward an incoming UDP datagram to the registered callback."""
        self._callback(data, addr)

    def error_received(self, exc: Exception) -> None:
        """Handle transport errors."""
        logger.warning("UDP transport error: %s", exc)

    def connection_lost(self, exc: Exception | None) -> None:
        """Handle transport connection loss (interface down, socket closed)."""
        if exc is not None:
            logger.warning("UDP connection lost: %s", exc)
        else:
            logger.debug("UDP connection closed")
        if self._connection_lost_callback is not None:
            self._connection_lost_callback(exc)
[docs] class BIPTransport: """BACnet/IP transport using asyncio UDP. Provides send/receive for BACnet/IP datagrams wrapped in BVLL. """ def __init__( self, interface: str = "0.0.0.0", port: int = 0xBAC0, broadcast_address: str = "255.255.255.255", *, multicast_enabled: bool = False, multicast_address: str = "239.255.186.192", multicast_ttl: int = 32, ) -> None: """Initialize the BACnet/IP transport. :param interface: Local IP address to bind. ``"0.0.0.0"`` binds all interfaces. :param port: UDP port number. Defaults to ``0xBAC0`` (47808). :param broadcast_address: Directed broadcast address for this subnet. :param multicast_enabled: Enable IPv4 multicast per Annex J.8. :param multicast_address: Multicast group address (default ``239.255.186.192``). :param multicast_ttl: Multicast TTL (hop limit). Defaults to 32. """ self._interface = interface self._port = port self._broadcast_address = broadcast_address self._multicast_enabled = multicast_enabled self._multicast_address = multicast_address self._multicast_ttl = multicast_ttl self._protocol: _UDPProtocol | None = None self._transport: asyncio.DatagramTransport | None = None self._receive_callback: Callable[[bytes, bytes], None] | None = None self._local_address: BIPAddress | None = None self._bbmd: BBMDManager | None = None self._foreign_device: ForeignDeviceManager | None = None # F5: Pending BVLC client request futures, keyed by # (expected_response_function, source_address). self._pending_bvlc: dict[tuple[BvlcFunction, BIPAddress], asyncio.Future[bytes]] = {}
[docs] async def start(self) -> None: """Bind UDP socket and start listening.""" if self._transport is not None: return # Already started loop = asyncio.get_running_loop() transport, protocol = await loop.create_datagram_endpoint( lambda: _UDPProtocol(self._on_datagram_received, self._on_connection_lost), local_addr=(self._interface, self._port), allow_broadcast=True, ) self._transport = transport self._protocol = protocol # Discover actual bound address sock = self._transport.get_extra_info("socket") addr: tuple[str, int] = sock.getsockname() host = addr[0] # Update port to actual bound value (matters when port=0 was passed # to let the OS pick an ephemeral port). self._port = addr[1] # Resolve wildcard to actual interface IP so that BBMD BDT # self-comparison and Forwarded-NPDU originating addresses work. if host == "0.0.0.0": host = _resolve_local_ip() self._local_address = BIPAddress(host=host, port=addr[1]) # Join IPv4 multicast group if enabled (Annex J.8) if self._multicast_enabled: try: group = socket.inet_aton(self._multicast_address) iface = socket.inet_aton(self._interface if self._interface != "0.0.0.0" else host) mreq = group + iface sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq) sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, self._multicast_ttl) logger.info("Joined multicast group %s", self._multicast_address) except OSError: logger.warning("Failed to join multicast group %s", self._multicast_address) logger.info("BIPTransport started on %s:%d", host, addr[1])
[docs] async def stop(self) -> None: """Close UDP socket and stop BBMD/foreign device if attached.""" if self._foreign_device is not None: await self._foreign_device.stop() self._foreign_device = None if self._bbmd is not None: await self._bbmd.stop() self._bbmd = None # Cancel any pending BVLC request futures for future in self._pending_bvlc.values(): if not future.done(): future.cancel() self._pending_bvlc.clear() if self._transport: # Leave multicast group if joined if self._multicast_enabled: try: sock = self._transport.get_extra_info("socket") host = self._local_address.host if self._local_address else "0.0.0.0" group = socket.inet_aton(self._multicast_address) iface = socket.inet_aton(host) mreq = group + iface sock.setsockopt(socket.IPPROTO_IP, socket.IP_DROP_MEMBERSHIP, mreq) except OSError: pass self._transport.close() self._transport = None self._protocol = None logger.info("BIPTransport stopped")
[docs] def on_receive(self, callback: Callable[[bytes, bytes], None]) -> None: """Register a callback for received NPDU data. :param callback: Called with ``(npdu_bytes, source_mac)`` for each received datagram containing an NPDU. *source_mac* is the 6-byte BACnet/IP MAC (4-byte IP + 2-byte port). """ self._receive_callback = callback
[docs] def send_unicast(self, npdu: bytes, mac_address: bytes) -> None: """Send a directed message (Original-Unicast-NPDU). Parses the 6-byte MAC inline to avoid BIPAddress object creation on the hot path. :param npdu: NPDU bytes to send. :param mac_address: 6-byte destination MAC (4-byte IP + 2-byte port). """ if self._transport is None: msg = "Transport not started" raise RuntimeError(msg) # Inline MAC parsing: 4-byte IP + 2-byte port (avoids BIPAddress allocation) host = f"{mac_address[0]}.{mac_address[1]}.{mac_address[2]}.{mac_address[3]}" port = (mac_address[4] << 8) | mac_address[5] bvll = encode_bvll(BvlcFunction.ORIGINAL_UNICAST_NPDU, npdu) if __debug__ and logger.isEnabledFor(_DEBUG): logger.debug("BIP send unicast %d bytes to %s:%d", len(npdu), host, port) self._transport.sendto(bvll, (host, port))
[docs] def send_broadcast(self, npdu: bytes) -> None: """Send a local broadcast (Original-Broadcast-NPDU). If registered as a foreign device, uses Distribute-Broadcast-To-Network instead per Annex J.5.6. If a BBMD is attached, also forwards to BDT peers and registered foreign devices per Annex J.4.5. :param npdu: NPDU bytes to broadcast. """ if self._transport is None: msg = "Transport not started" raise RuntimeError(msg) # Foreign devices must use Distribute-Broadcast-To-Network # instead of Original-Broadcast-NPDU per Annex J.5.6 if self._foreign_device is not None and self._foreign_device.is_registered: self._foreign_device.send_distribute_broadcast(npdu) return bvll = encode_bvll(BvlcFunction.ORIGINAL_BROADCAST_NPDU, npdu) logger.debug("BIP send broadcast %d bytes", len(npdu)) if self._multicast_enabled: # Send to multicast group per Annex J.8 self._transport.sendto(bvll, (self._multicast_address, self._port)) # Also send to directed broadcast (mixed mode interop with legacy devices) self._transport.sendto(bvll, (self._broadcast_address, self._port)) # If BBMD attached, also forward to peers and foreign devices if self._bbmd is not None: self._bbmd.handle_bvlc(BvlcFunction.ORIGINAL_BROADCAST_NPDU, npdu, self.local_address)
@property def local_address(self) -> BIPAddress: """The local BACnet/IP address of this transport.""" if self._local_address is None: msg = "Transport not started" raise RuntimeError(msg) return self._local_address @property def local_mac(self) -> bytes: """The 6-byte MAC address of this port (4-byte IP + 2-byte port).""" return self.local_address.encode() @property def max_npdu_length(self) -> int: """Maximum NPDU length for BACnet/IP per Clause 6.""" return 1497 @property def bbmd(self) -> BBMDManager | None: """The attached BBMD manager, or ``None`` if not configured.""" return self._bbmd
[docs] async def attach_bbmd(self, bdt_entries: list[BDTEntry] | None = None) -> BBMDManager: """Attach a BBMD manager to this transport. Creates and starts a :class:`BBMDManager` integrated with this transport. The BBMD intercepts incoming BVLC messages before they reach the normal receive path, and outgoing broadcasts are also forwarded to BDT peers and foreign devices. Per Annex J.7.1 this allows a single device to combine BBMD and router functionality. :param bdt_entries: Optional initial BDT entries. If ``None``, the BBMD starts with an empty BDT. :returns: The attached :class:`BBMDManager` instance. :raises RuntimeError: If transport not started or BBMD already attached. """ if self._transport is None: msg = "Transport not started" raise RuntimeError(msg) if self._bbmd is not None: msg = "BBMD already attached" raise RuntimeError(msg) self._bbmd = BBMDManager( local_address=self.local_address, send_callback=self._send_raw, local_broadcast_callback=self._bbmd_local_deliver, broadcast_address=BIPAddress(host=self._broadcast_address, port=self._port), ) if bdt_entries: self._bbmd.set_bdt(bdt_entries) await self._bbmd.start() logger.info( "BBMD attached to transport %s:%d", self.local_address.host, self.local_address.port, ) return self._bbmd
@property def foreign_device(self) -> ForeignDeviceManager | None: """The attached foreign device manager, or ``None``.""" return self._foreign_device
[docs] async def attach_foreign_device( self, bbmd_address: BIPAddress, ttl: int, ) -> ForeignDeviceManager: """Attach a foreign device manager to this transport. Creates and starts a :class:`ForeignDeviceManager` that will register with the specified BBMD and periodically re-register to maintain the registration. Incoming BVLC-Result messages will be routed to the manager to track registration status. :param bbmd_address: Address of the BBMD to register with. :param ttl: Time-to-Live for the registration in seconds. :returns: The attached :class:`ForeignDeviceManager` instance. :raises RuntimeError: If transport not started or foreign device already attached. """ if self._transport is None: msg = "Transport not started" raise RuntimeError(msg) if self._foreign_device is not None: msg = "Foreign device manager already attached" raise RuntimeError(msg) self._foreign_device = ForeignDeviceManager( bbmd_address=bbmd_address, ttl=ttl, send_callback=self._send_raw, local_address=self.local_address, ) await self._foreign_device.start() logger.info( "Foreign device manager attached, registering with BBMD %s:%d", bbmd_address.host, bbmd_address.port, ) return self._foreign_device
# ------------------------------------------------------------------ # F5: BBMD client functions # ------------------------------------------------------------------
[docs] async def read_bdt(self, bbmd_address: BIPAddress, *, timeout: float = 5.0) -> list[BDTEntry]: """Read the Broadcast Distribution Table from a remote BBMD. Sends a Read-Broadcast-Distribution-Table request and waits for the Read-BDT-Ack response. :param bbmd_address: Address of the BBMD to query. :param timeout: Seconds to wait for a response. :returns: List of :class:`BDTEntry` instances from the remote BBMD. :raises RuntimeError: If transport not started. :raises BvlcNakError: If the target is not a BBMD. :raises TimeoutError: If no response within *timeout*. """ if self._transport is None: msg = "Transport not started" raise RuntimeError(msg) bvll = encode_bvll(BvlcFunction.READ_BROADCAST_DISTRIBUTION_TABLE, b"") data = await self._bvlc_request( bvll, bbmd_address, BvlcFunction.READ_BROADCAST_DISTRIBUTION_TABLE_ACK, timeout=timeout, ) entries: list[BDTEntry] = [] for i in range(0, len(data), BDT_ENTRY_SIZE): if i + BDT_ENTRY_SIZE > len(data): break entries.append(BDTEntry.decode(data[i : i + BDT_ENTRY_SIZE])) return entries
[docs] async def write_bdt( self, bbmd_address: BIPAddress, entries: list[BDTEntry], *, timeout: float = 5.0, ) -> BvlcResultCode: """Write a Broadcast Distribution Table to a remote BBMD. Sends a Write-Broadcast-Distribution-Table request and waits for the BVLC-Result response. :param bbmd_address: Address of the BBMD to configure. :param entries: :class:`BDTEntry` instances to write. :param timeout: Seconds to wait for a response. :returns: The :class:`BvlcResultCode` from the BBMD. :raises RuntimeError: If transport not started. :raises TimeoutError: If no response within *timeout*. """ if self._transport is None: msg = "Transport not started" raise RuntimeError(msg) payload = b"".join(entry.encode() for entry in entries) bvll = encode_bvll(BvlcFunction.WRITE_BROADCAST_DISTRIBUTION_TABLE, payload) data = await self._bvlc_request( bvll, bbmd_address, BvlcFunction.BVLC_RESULT, timeout=timeout, ) if len(data) >= 2: return BvlcResultCode(int.from_bytes(data[:2], "big")) return BvlcResultCode.WRITE_BROADCAST_DISTRIBUTION_TABLE_NAK
[docs] async def read_fdt(self, bbmd_address: BIPAddress, *, timeout: float = 5.0) -> list[FDTEntry]: """Read the Foreign Device Table from a remote BBMD. Sends a Read-Foreign-Device-Table request and waits for the Read-FDT-Ack response. :param bbmd_address: Address of the BBMD to query. :param timeout: Seconds to wait for a response. :returns: List of :class:`FDTEntry` instances from the remote BBMD. The ``expiry`` field is set to ``0.0`` since it is not meaningful for remote entries; use the ``remaining`` property instead. :raises RuntimeError: If transport not started. :raises BvlcNakError: If the target is not a BBMD. :raises TimeoutError: If no response within *timeout*. """ if self._transport is None: msg = "Transport not started" raise RuntimeError(msg) bvll = encode_bvll(BvlcFunction.READ_FOREIGN_DEVICE_TABLE, b"") data = await self._bvlc_request( bvll, bbmd_address, BvlcFunction.READ_FOREIGN_DEVICE_TABLE_ACK, timeout=timeout, ) entries: list[FDTEntry] = [] for i in range(0, len(data), FDT_ENTRY_SIZE): if i + FDT_ENTRY_SIZE > len(data): break addr = BIPAddress.decode(data[i : i + 6]) ttl = int.from_bytes(data[i + 6 : i + 8], "big") # Wire bytes i+8:i+10 contain remaining-time, not retained for # remote entries since it is only meaningful on the local BBMD. entries.append(FDTEntry(address=addr, ttl=ttl, expiry=0.0)) return entries
[docs] async def delete_fdt_entry( self, bbmd_address: BIPAddress, entry_address: BIPAddress, *, timeout: float = 5.0, ) -> BvlcResultCode: """Delete a Foreign Device Table entry on a remote BBMD. Sends a Delete-Foreign-Device-Table-Entry request and waits for the BVLC-Result response. :param bbmd_address: Address of the BBMD. :param entry_address: Address of the FDT entry to delete. :param timeout: Seconds to wait for a response. :returns: The :class:`BvlcResultCode` from the BBMD. :raises RuntimeError: If transport not started. :raises TimeoutError: If no response within *timeout*. """ if self._transport is None: msg = "Transport not started" raise RuntimeError(msg) bvll = encode_bvll( BvlcFunction.DELETE_FOREIGN_DEVICE_TABLE_ENTRY, entry_address.encode(), ) data = await self._bvlc_request( bvll, bbmd_address, BvlcFunction.BVLC_RESULT, timeout=timeout, ) if len(data) >= 2: return BvlcResultCode(int.from_bytes(data[:2], "big")) return BvlcResultCode.DELETE_FOREIGN_DEVICE_TABLE_ENTRY_NAK
async def _bvlc_request( self, bvll_data: bytes, destination: BIPAddress, expected_response: BvlcFunction, *, timeout: float = 5.0, ) -> bytes: """Send a BVLC request and wait for the expected response. :param bvll_data: Complete BVLL message to send. :param destination: Target BBMD address. :param expected_response: The :class:`BvlcFunction` code expected in reply. :param timeout: Seconds to wait. :returns: The payload data from the response message. :raises TimeoutError: If no response within *timeout*. """ key = (expected_response, destination) loop = asyncio.get_running_loop() future: asyncio.Future[bytes] = loop.create_future() self._pending_bvlc[key] = future try: if self._transport is not None: self._transport.sendto(bvll_data, (destination.host, destination.port)) return await asyncio.wait_for(future, timeout=timeout) finally: self._pending_bvlc.pop(key, None) if not future.done(): future.cancel() # ------------------------------------------------------------------ # BBMD / foreign device integration helpers # ------------------------------------------------------------------ def _send_raw(self, data: bytes, destination: BIPAddress) -> None: """Send raw BVLL data to a destination. Used as the send callback for :class:`BBMDManager` and :class:`ForeignDeviceManager`. """ if self._transport is not None: self._transport.sendto(data, (destination.host, destination.port)) def _bbmd_local_deliver(self, npdu: bytes, source: BIPAddress) -> None: """Deliver an NPDU to the local receive callback (BBMD callback). Called by :class:`BBMDManager` when a Forwarded-NPDU or Distribute-Broadcast-To-Network message needs to be delivered to the application/router layer. """ # F7: Drop confirmed requests received via broadcast. 
if _is_confirmed_request_npdu(npdu): logger.debug( "Dropped confirmed request via BBMD broadcast from %s:%d", source.host, source.port, ) return if self._receive_callback is not None: try: self._receive_callback(npdu, source.encode()) except Exception: logger.warning("Error in receive callback", exc_info=True) def _on_connection_lost(self, exc: Exception | None) -> None: """Handle UDP connection loss.""" self._transport = None self._protocol = None def _on_datagram_received(self, data: bytes, addr: tuple[str, int]) -> None: """Process incoming UDP datagram. When a BBMD is attached, BVLC messages are first passed through :meth:`BBMDManager.handle_bvlc` before reaching the normal receive path. This ensures BVLC management messages (Register-Foreign-Device, Read/Write-BDT, etc.) are handled exclusively by the BBMD while broadcast NPDUs are still delivered to the application/router layer. """ try: msg = decode_bvll(memoryview(data)) except (ValueError, IndexError): logger.warning("Dropped malformed BVLL from %s:%d", addr[0], addr[1]) return # Fast self-echo check before BIPAddress allocation if self._local_address is not None and ( addr[0] == self._local_address.host and addr[1] == self._local_address.port ): return source = _cached_bip_address(addr[0], addr[1]) if __debug__ and logger.isEnabledFor(_DEBUG): logger.debug( "BIP recv %d bytes from %s:%d func=%s", len(data), addr[0], addr[1], msg.function.name, ) # --- BBMD intercept --- if self._bbmd is not None: # For Forwarded-NPDU the BBMD expects the originating # address as 'source', not the UDP peer address. if msg.function == BvlcFunction.FORWARDED_NPDU and msg.originating_address: bbmd_source = msg.originating_address else: bbmd_source = source handled = self._bbmd.handle_bvlc( msg.function, msg.data, bbmd_source, udp_source=source ) if handled: # Exclusively handled by BBMD (BVLC management messages # or Distribute-Broadcast which already called # _bbmd_local_deliver). 
return # BBMD returned False -- the message also needs normal # delivery. For Forwarded-NPDU the BBMD already delivered # via local_broadcast_callback so we skip the normal path # to prevent double delivery. if msg.function == BvlcFunction.FORWARDED_NPDU: return # --- F3: Non-BBMD NAK responses --- # When no BBMD is attached, respond with NAK to BVLC management # messages so the sender knows this device is not a BBMD. if self._bbmd is None: nak_code = _BVLC_NAK_MAP.get(msg.function) if nak_code is not None: self._send_bvlc_nak(nak_code, source) return # --- Normal receive path --- match msg.function: case BvlcFunction.ORIGINAL_UNICAST_NPDU: if self._receive_callback: try: self._receive_callback(msg.data, source.encode()) except Exception: logger.warning("Error in receive callback", exc_info=True) case BvlcFunction.ORIGINAL_BROADCAST_NPDU: # F7: Drop confirmed requests received via broadcast. if _is_confirmed_request_npdu(msg.data): logger.debug( "Dropped confirmed request via broadcast from %s:%d", source.host, source.port, ) return if self._receive_callback: try: self._receive_callback(msg.data, source.encode()) except Exception: logger.warning("Error in receive callback", exc_info=True) case BvlcFunction.FORWARDED_NPDU: # F7: Drop confirmed requests received via broadcast. if _is_confirmed_request_npdu(msg.data): logger.debug( "Dropped confirmed request via forwarded broadcast from %s", msg.originating_address or source, ) return if self._receive_callback and msg.originating_address: try: self._receive_callback(msg.data, msg.originating_address.encode()) except Exception: logger.warning("Error in receive callback", exc_info=True) case BvlcFunction.BVLC_RESULT: # F5: Route to pending client request futures first. 
if not self._resolve_pending_bvlc(msg.function, msg.data, source): self._handle_bvlc_result(msg.data, source) case ( BvlcFunction.READ_BROADCAST_DISTRIBUTION_TABLE_ACK | BvlcFunction.READ_FOREIGN_DEVICE_TABLE_ACK ): # F5: Route ACK responses to pending client requests. if not self._resolve_pending_bvlc(msg.function, msg.data, source): logger.debug("Ignoring unsolicited %s from %s", msg.function, source) case _: logger.debug("Ignoring BVLC function %s from %s", msg.function, source) def _resolve_pending_bvlc( self, function: BvlcFunction, data: bytes, source: BIPAddress ) -> bool: """Try to resolve a pending BVLC client request future. :param function: BVLC function code of the received message. :param data: Payload data of the received message. :param source: Source address of the received message. :returns: ``True`` if a pending future was resolved, ``False`` otherwise. """ key = (function, source) future = self._pending_bvlc.get(key) if future is not None and not future.done(): future.set_result(data) return True return False def _send_bvlc_nak(self, result_code: BvlcResultCode, destination: BIPAddress) -> None: """Send a BVLC-Result NAK to the given destination. Used to reject BVLC management messages when no BBMD is attached. """ if self._transport is None: return payload = result_code.to_bytes(2, "big") bvll = encode_bvll(BvlcFunction.BVLC_RESULT, payload) self._transport.sendto(bvll, (destination.host, destination.port)) def _handle_bvlc_result(self, data: bytes, source: BIPAddress) -> None: """Handle a BVLC-Result message. Routes the result to the attached :class:`ForeignDeviceManager` (if any) so it can track registration state. Non-zero result codes are logged at warning level. When a NAK maps to a pending ``_bvlc_request`` future (e.g. a Read-BDT-NAK for a pending Read-BDT-ACK), the future is failed immediately with :class:`BvlcNakError` instead of waiting for the full timeout. 
S3: Only routes to the ForeignDeviceManager if the sender matches the expected BBMD address, preventing rogue devices from spoofing registration confirmations. """ if self._foreign_device is not None and source == self._foreign_device.bbmd_address: # S3: Only accept BVLC-Results from the BBMD we registered with. self._foreign_device.handle_bvlc_result(data) if len(data) >= 2: result_code = int.from_bytes(data[:2], "big") if result_code != 0: logger.debug("BVLC-Result NAK: code %d from %s", result_code, source) # Fail any pending _bvlc_request future immediately. ack_func = _NAK_TO_PENDING_ACK.get(result_code) if ack_func is not None: key = (ack_func, source) future = self._pending_bvlc.get(key) if future is not None and not future.done(): future.set_exception(BvlcNakError(result_code, source)) else: logger.debug("BVLC-Result: %d", result_code)