Source code for bac_py.app.application

"""BACnet application layer orchestrator per ASHRAE 135-2016."""

from __future__ import annotations

import asyncio
import logging
import struct
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any

from bac_py.app.cov import COVManager
from bac_py.app.event_engine import EventEngine
from bac_py.app.tsm import ClientTSM, ServerTSM
from bac_py.encoding.apdu import (
    AbortPDU,
    ComplexAckPDU,
    ConfirmedRequestPDU,
    ErrorPDU,
    RejectPDU,
    SegmentAckPDU,
    SimpleAckPDU,
    UnconfirmedRequestPDU,
    decode_apdu,
    encode_apdu,
)
from bac_py.network.layer import NetworkLayer
from bac_py.network.router import NetworkRouter, RouterPort
from bac_py.objects.base import ObjectDatabase
from bac_py.segmentation.manager import compute_max_segment_payload
from bac_py.services.base import ServiceRegistry
from bac_py.services.cov import COVNotificationRequest
from bac_py.services.errors import BACnetAbortError, BACnetError, BACnetRejectError
from bac_py.transport.bip import BIPTransport
from bac_py.transport.bip6 import BIP6Transport
from bac_py.types.enums import (
    AbortReason,
    ConfirmedServiceChoice,
    EnableDisable,
    ObjectType,
    RejectReason,
    UnconfirmedServiceChoice,
)
from bac_py.types.primitives import ObjectIdentifier

if TYPE_CHECKING:
    from collections.abc import Callable

    from bac_py.app.tsm import ServerTransaction
    from bac_py.network.address import BACnetAddress, BIP6Address, BIPAddress
    from bac_py.transport.bbmd import BDTEntry
    from bac_py.transport.ethernet import EthernetTransport
    from bac_py.transport.sc import SCTransport, SCTransportConfig

logger = logging.getLogger(__name__)


[docs] @dataclass(frozen=True, slots=True) class DeviceInfo: """Cached peer device capabilities from I-Am responses (Clause 19.4).""" max_apdu_length: int """Maximum APDU length accepted by the peer device.""" segmentation_supported: int """Segmentation support level (Segmentation enum value)."""
[docs] @dataclass class BBMDConfig: """Configuration for BBMD on a router port.""" bdt_entries: list[BDTEntry] = field(default_factory=list) """Initial Broadcast Distribution Table entries (including self). If empty, the BBMD starts with an empty BDT (foreign-device-only mode). """
[docs] @dataclass class RouterPortConfig: """Configuration for a single router port.""" port_id: int network_number: int interface: str = "0.0.0.0" port: int = 0xBAC0 broadcast_address: str = "255.255.255.255" """Directed broadcast address for this port's subnet.""" bbmd_config: BBMDConfig | None = None ipv6: bool = False """Use BACnet/IPv6 (Annex U) transport for this port.""" multicast_address: str = "" """IPv6 multicast group address. Defaults to ``ff02::bac0``.""" vmac: bytes | None = None """3-byte VMAC for IPv6. Auto-generated if ``None``.""" sc_config: SCTransportConfig | None = None """BACnet/SC transport for this router port. Mutually exclusive with ``ipv6``.""" ethernet_interface: str | None = None """Network interface for BACnet Ethernet transport on this router port.""" ethernet_mac: bytes | None = None """Explicit 6-byte MAC for Ethernet transport on this router port."""
[docs] @dataclass class RouterConfig: """Configuration for a router device.""" ports: list[RouterPortConfig] = field(default_factory=list) application_port_id: int = 1
[docs] @dataclass class DeviceConfig: """Configuration for a BACnet device.""" instance_number: int """BACnet device instance number (0-4194302).""" name: str = "bac-py" """Device object name.""" vendor_name: str = "bac-py" """Vendor name string.""" vendor_id: int = 0 """ASHRAE-assigned vendor identifier.""" model_name: str = "bac-py" """Device model name string.""" firmware_revision: str = "" """Firmware revision string. Defaults to the bac-py package version.""" application_software_version: str = "" """Application software version string. Defaults to the bac-py package version.""" interface: str = "0.0.0.0" """Local IP address to bind to (``"0.0.0.0"`` for all).""" port: int = 0xBAC0 """UDP port number (default ``0xBAC0`` / 47808).""" apdu_timeout: int = 6000 # milliseconds """APDU timeout in milliseconds.""" apdu_segment_timeout: int = 2000 # milliseconds """Segment timeout in milliseconds.""" apdu_retries: int = 3 """Maximum number of APDU retries.""" max_apdu_length: int = 1476 """Maximum APDU length in bytes.""" max_segments: int | None = None """Maximum segments accepted, or ``None`` for unlimited.""" router_config: RouterConfig | None = None """Optional router configuration for multi-network mode.""" broadcast_address: str = "255.255.255.255" """Directed broadcast address for this subnet. Default global broadcast works on physical networks; set to the subnet broadcast (e.g. ``"192.168.1.255"``) when running in Docker bridge networks where global broadcast is not routable.""" password: str | None = None """Optional password for DeviceCommunicationControl and ReinitializeDevice services (1-20 characters, per Clause 16.1.3.1 and 16.4.3.4). When set, incoming requests must include a matching password.""" ipv6: bool = False """Use BACnet/IPv6 (Annex U) transport instead of BACnet/IP (Annex J).""" multicast_address: str = "" """IPv6 multicast group address. Defaults to ``ff02::bac0`` when *ipv6* is ``True``. Ignored for IPv4 mode.""" vmac: bytes | None = None """3-byte VMAC for IPv6 transport. Auto-generated if ``None``. Ignored for IPv4 mode.""" sc_config: SCTransportConfig | None = None """BACnet/SC transport configuration. When set, uses SC transport instead of BIP/BIP6. Mutually exclusive with ``ipv6``.""" ethernet_interface: str | None = None """Network interface for BACnet Ethernet (Clause 7) transport, e.g. ``"eth0"``. When set, uses raw 802.3/802.2 Ethernet instead of BIP/BIP6. Mutually exclusive with ``ipv6`` and ``sc_config``.""" ethernet_mac: bytes | None = None """Explicit 6-byte MAC for Ethernet transport. Auto-detected from the interface if ``None``. Required on macOS where auto-detection is not supported.""" def __post_init__(self) -> None: """Fill version defaults and validate mutual exclusion.""" if self.sc_config is not None and self.ipv6: msg = "sc_config and ipv6 are mutually exclusive" raise ValueError(msg) if self.ethernet_interface is not None and self.ipv6: msg = "ethernet_interface and ipv6 are mutually exclusive" raise ValueError(msg) if self.ethernet_interface is not None and self.sc_config is not None: msg = "ethernet_interface and sc_config are mutually exclusive" raise ValueError(msg) if not self.firmware_revision or not self.application_software_version: import bac_py # lazy to avoid circular import if not self.firmware_revision: self.firmware_revision = bac_py.__version__ if not self.application_software_version: self.application_software_version = bac_py.__version__
[docs] @dataclass(frozen=True, slots=True) class ForeignDeviceStatus: """Foreign device registration status. Provides a snapshot of the current registration state when operating as a foreign device via a BBMD. """ bbmd_address: str """BBMD address string (e.g. ``"192.168.1.1:47808"``).""" ttl: int """Registration time-to-live in seconds.""" is_registered: bool """Whether registration is currently active.""" last_result: str | None """Last BVLC result code name, or ``None`` if no response yet."""
[docs] class BACnetApplication: """Central orchestrator connecting all protocol layers. Wires transport, network, TSMs, and service dispatch. """ def __init__(self, config: DeviceConfig) -> None: """Initialise the application from a device configuration. :param config: Device and network parameters for this BACnet device. """ self._config = config self._transport: BIPTransport | BIP6Transport | SCTransport | EthernetTransport | None = ( None ) self._network: NetworkLayer | None = None self._router: NetworkRouter | None = None self._transports: list[BIPTransport | BIP6Transport | SCTransport | EthernetTransport] = [] self._client_tsm: ClientTSM | None = None self._server_tsm: ServerTSM | None = None self._service_registry = ServiceRegistry() self._object_db = ObjectDatabase() self._running = False self._stopped = False self._stop_event: asyncio.Event | None = None self._run_task: asyncio.Task[None] | None = None self._ready_event: asyncio.Event | None = None self._unconfirmed_listeners: dict[int, list[Callable[..., Any]]] = {} self._background_tasks: set[asyncio.Task[None]] = set() self._loop: asyncio.AbstractEventLoop | None = None self._cov_manager: COVManager | None = None self._event_engine: EventEngine | None = None self._cov_callbacks: dict[int, Callable[..., Any]] = {} self._dcc_state: EnableDisable = EnableDisable.ENABLE self._dcc_timer: asyncio.TimerHandle | None = None self._device_info_cache: dict[BACnetAddress, DeviceInfo] = {} @property def object_db(self) -> ObjectDatabase: """The object database for this device.""" return self._object_db @property def service_registry(self) -> ServiceRegistry: """The service handler registry.""" return self._service_registry @property def config(self) -> DeviceConfig: """The device configuration.""" return self._config @property def cov_manager(self) -> COVManager | None: """The COV subscription manager, or None if not started.""" return self._cov_manager @property def event_engine(self) -> EventEngine | None: """The event/alarm evaluation engine, or None if not started.""" return self._event_engine @property def dcc_state(self) -> EnableDisable: """The current DeviceCommunicationControl state.""" return self._dcc_state
[docs] def set_dcc_state( self, state: EnableDisable, duration: int | None = None, ) -> None: """Set the DeviceCommunicationControl state. :param state: New DCC state (ENABLE, DISABLE, or DISABLE_INITIATION). :param duration: Optional duration in minutes. When provided and state is not ENABLE, a timer is set to auto-re-enable after the specified number of minutes. """ # Cancel any existing DCC timer if self._dcc_timer is not None: self._dcc_timer.cancel() self._dcc_timer = None self._dcc_state = state if duration is not None and state != EnableDisable.ENABLE: loop = asyncio.get_running_loop() self._dcc_timer = loop.call_later( duration * 60, self._dcc_timer_expired, )
@property def device_object_identifier(self) -> Any: """The device object identifier for this application. Returns the ObjectIdentifier of the device object from the object database. Used by the COV manager to populate the initiating device identifier in notifications. """ return ObjectIdentifier(ObjectType.DEVICE, self._config.instance_number)
[docs] def get_device_info(self, address: BACnetAddress) -> DeviceInfo | None: """Look up cached peer device capabilities. Returns cached :class:`DeviceInfo` from I-Am responses, or ``None`` if no information is available for *address*. :param address: The peer device address. :returns: Cached device info, or ``None``. """ return self._device_info_cache.get(address)
[docs] async def start(self) -> None: """Start the transport and initialize all layers.""" logger.info( "BACnetApplication starting on %s:%d (instance=%d)", self._config.interface, self._config.port, self._config.instance_number, ) self._stopped = False self._stop_event = asyncio.Event() self._loop = asyncio.get_running_loop() if self._config.router_config: await self._start_router_mode() else: await self._start_non_router_mode() # Common: create TSMs with whichever network sender is active network = self._router or self._network if network is None: msg = "Neither router nor network layer initialized after start" raise RuntimeError(msg) segment_timeout = self._config.apdu_segment_timeout / 1000 self._client_tsm = ClientTSM( network, apdu_timeout=self._config.apdu_timeout / 1000, apdu_retries=self._config.apdu_retries, max_apdu_length=self._config.max_apdu_length, max_segments=self._config.max_segments, segment_timeout=segment_timeout, ) self._server_tsm = ServerTSM( network, segment_timeout=segment_timeout, max_apdu_length=self._config.max_apdu_length, max_segments=self._config.max_segments, ) self._running = True # Initialize COV manager and register notification handlers self._cov_manager = COVManager(self) self._service_registry.register_confirmed( ConfirmedServiceChoice.CONFIRMED_COV_NOTIFICATION, self._handle_confirmed_cov_notification, ) self._service_registry.register_unconfirmed( UnconfirmedServiceChoice.UNCONFIRMED_COV_NOTIFICATION, self._handle_unconfirmed_cov_notification, ) # Initialize event engine and start evaluation loop self._event_engine = EventEngine(self) await self._event_engine.start() # Register I-Am listener for device info caching (Clause 19.4) self._service_registry.register_unconfirmed( UnconfirmedServiceChoice.I_AM, self._handle_i_am_for_cache, ) # Broadcast I-Am on startup per Clause 12.11.13 self._broadcast_i_am() logger.info("BACnetApplication started")
async def _start_non_router_mode(self) -> None: """Start in non-router (simple device) mode.""" if self._config.sc_config is not None: await self._start_sc_mode() elif self._config.ethernet_interface is not None: await self._start_ethernet_mode() elif self._config.ipv6 or ":" in self._config.interface: iface = self._config.interface if self._config.interface != "0.0.0.0" else "::" mcast = self._config.multicast_address or "ff02::bac0" self._transport = BIP6Transport( interface=iface, port=self._config.port, multicast_address=mcast, vmac=self._config.vmac, ) self._network = NetworkLayer(self._transport) self._network.on_receive(self._on_apdu_received) await self._transport.start() else: self._transport = BIPTransport( interface=self._config.interface, port=self._config.port, broadcast_address=self._config.broadcast_address, ) self._network = NetworkLayer(self._transport) self._network.on_receive(self._on_apdu_received) await self._transport.start() async def _start_sc_mode(self) -> None: """Start in BACnet/SC (Annex AB) mode.""" from bac_py.transport.sc import SCTransport sc_config = self._config.sc_config assert sc_config is not None # guaranteed by caller transport = SCTransport(sc_config) self._transport = transport self._network = NetworkLayer(transport, network_number=sc_config.network_number) self._network.on_receive(self._on_apdu_received) await transport.start() if sc_config.primary_hub_uri: connected = await transport.hub_connector.wait_connected( timeout=sc_config.connect_timeout ) if not connected: await transport.stop() raise ConnectionError( f"Failed to connect to SC hub at {sc_config.primary_hub_uri} " f"within {sc_config.connect_timeout}s" ) async def _start_ethernet_mode(self) -> None: """Start in BACnet Ethernet (Clause 7) mode.""" from bac_py.transport.ethernet import EthernetTransport as _EthernetTransport transport = _EthernetTransport( self._config.ethernet_interface, # type: ignore[arg-type] mac_address=self._config.ethernet_mac, ) self._transport = transport self._network = NetworkLayer(transport) self._network.on_receive(self._on_apdu_received) await transport.start() async def _start_router_mode(self) -> None: """Start in router mode with multiple ports.""" if self._config.router_config is None: msg = "Router config is required for router mode" raise RuntimeError(msg) ports: list[RouterPort] = [] for pc in self._config.router_config.ports: transport: BIPTransport | BIP6Transport | SCTransport | EthernetTransport if pc.sc_config is not None: from bac_py.transport.sc import SCTransport as _SCTransport sc_cfg = pc.sc_config sc_transport = _SCTransport(sc_cfg) await sc_transport.start() if sc_cfg.primary_hub_uri: connected = await sc_transport.hub_connector.wait_connected( timeout=sc_cfg.connect_timeout ) if not connected: await sc_transport.stop() raise ConnectionError( f"Failed to connect to SC hub at {sc_cfg.primary_hub_uri} " f"within {sc_cfg.connect_timeout}s" ) transport = sc_transport elif pc.ethernet_interface is not None: from bac_py.transport.ethernet import EthernetTransport as _EthernetTransport transport = _EthernetTransport( pc.ethernet_interface, mac_address=pc.ethernet_mac, ) await transport.start() elif pc.ipv6 or ":" in pc.interface: iface = pc.interface if pc.interface != "0.0.0.0" else "::" mcast = pc.multicast_address or "ff02::bac0" transport = BIP6Transport( interface=iface, port=pc.port, multicast_address=mcast, vmac=pc.vmac, ) await transport.start() else: transport = BIPTransport( interface=pc.interface, port=pc.port, broadcast_address=pc.broadcast_address, ) await transport.start() self._transports.append(transport) # Attach BBMD if configured for this port (BIP/BIP6 only) if pc.bbmd_config is not None and hasattr(transport, "attach_bbmd"): await transport.attach_bbmd(pc.bbmd_config.bdt_entries or None) # type: ignore[arg-type] port = RouterPort( port_id=pc.port_id, network_number=pc.network_number, transport=transport, mac_address=transport.local_mac, max_npdu_length=transport.max_npdu_length, ) ports.append(port) self._router = NetworkRouter( ports, application_port_id=self._config.router_config.application_port_id, application_callback=self._on_apdu_received, ) await self._router.start()
[docs] async def stop(self) -> None: """Stop the application and clean up resources. This method is idempotent -- calling it multiple times is safe. """ if self._stopped: return logger.info("BACnetApplication stopping") self._stopped = True # Shutdown event engine if self._event_engine: await self._event_engine.stop() self._event_engine = None # Shutdown COV manager (cancel subscription timers) if self._cov_manager: self._cov_manager.shutdown() self._cov_manager = None # Cancel DCC timer if self._dcc_timer is not None: self._dcc_timer.cancel() self._dcc_timer = None # Cancel all pending client transactions if self._client_tsm: for txn in self._client_tsm.active_transactions(): if not txn.future.done(): txn.future.cancel() if self._stop_event: self._stop_event.set() # Cancel all background tasks (copy to avoid mutation during iteration) tasks = list(self._background_tasks) for task in tasks: task.cancel() if tasks: await asyncio.gather(*tasks, return_exceptions=True) self._background_tasks.clear() if self._router: await self._router.stop() elif self._transport: await self._transport.stop() # Clear caches and listener registrations to release references self._unconfirmed_listeners.clear() self._device_info_cache.clear() self._running = False logger.info("BACnetApplication stopped")
[docs] async def run(self) -> None: """Start the application and block until stopped.""" await self.start() try: if self._stop_event is not None: await self._stop_event.wait() finally: await self.stop()
async def __aenter__(self) -> BACnetApplication: """Start the application as an async context manager.""" await self.start() return self async def __aexit__(self, *exc_info: object) -> None: """Stop the application when exiting the context.""" await self.stop() # --- Client API --- def _broadcast_i_am(self) -> None: """Broadcast I-Am on startup per Clause 12.11.13. Sends an unsolicited I-Am to the global broadcast address using the device's configuration parameters. """ from bac_py.network.address import GLOBAL_BROADCAST from bac_py.services.who_is import IAmRequest from bac_py.types.enums import PropertyIdentifier, Segmentation # Get segmentation supported from device object if available device_oid = self.device_object_identifier device_obj = self._object_db.get(device_oid) if device_obj is not None: try: segmentation = device_obj.read_property(PropertyIdentifier.SEGMENTATION_SUPPORTED) except Exception: segmentation = Segmentation.BOTH else: segmentation = Segmentation.BOTH iam = IAmRequest( object_identifier=device_oid, max_apdu_length=self._config.max_apdu_length, segmentation_supported=segmentation, vendor_id=self._config.vendor_id, ) self.unconfirmed_request( destination=GLOBAL_BROADCAST, service_choice=UnconfirmedServiceChoice.I_AM, service_data=iam.encode(), ) # --- Foreign device API --- def _parse_bip_address(self, address: str) -> BIPAddress: """Parse a string to a BIPAddress. Accepts ``"host:port"`` or ``"host"`` (default port 47808). """ from bac_py.network.address import BIPAddress as _BIPAddress if ":" in address: host, port_str = address.rsplit(":", 1) return _BIPAddress(host=host, port=int(port_str)) return _BIPAddress(host=address, port=0xBAC0) def _parse_bip6_address(self, address: str) -> BIP6Address: """Parse a string to a BIP6Address. Accepts ``"[host]:port"`` or ``"host"`` (default port 47808). """ from bac_py.network.address import BIP6Address as _BIP6Address if address.startswith("["): # "[host]:port" format bracket_end = address.index("]") host = address[1:bracket_end] rest = address[bracket_end + 1 :] if rest.startswith(":"): return _BIP6Address(host=host, port=int(rest[1:])) return _BIP6Address(host=host, port=0xBAC0) return _BIP6Address(host=address, port=0xBAC0)
[docs] async def register_as_foreign_device( self, bbmd_address: str, ttl: int = 60, ) -> None: """Register as a foreign device with a BBMD. Attaches a foreign device manager to the primary transport and begins periodic re-registration at TTL/2 intervals. :param bbmd_address: Address of the BBMD. For IPv4 use ``"192.168.1.1"`` or ``"192.168.1.1:47808"``. For IPv6 use ``"[fd00::1]:47808"`` or ``"fd00::1"``. :param ttl: Registration time-to-live in seconds. :raises RuntimeError: If already registered, application not started, or running in router mode. """ if self._transport is None: if self._router is not None: msg = "Foreign device registration is not supported in router mode" raise RuntimeError(msg) msg = "Application not started" raise RuntimeError(msg) if not isinstance(self._transport, (BIPTransport, BIP6Transport)): msg = "Foreign device registration is only supported with BIP/BIP6 transport" raise RuntimeError(msg) if self._transport.foreign_device is not None: msg = "Already registered as a foreign device" raise RuntimeError(msg) if isinstance(self._transport, BIP6Transport): addr = self._parse_bip6_address(bbmd_address) await self._transport.attach_foreign_device(addr, ttl) else: bip_addr = self._parse_bip_address(bbmd_address) await self._transport.attach_foreign_device(bip_addr, ttl)
[docs] async def deregister_foreign_device(self) -> None: """Deregister from the BBMD and stop re-registration. Sends a Delete-Foreign-Device-Table-Entry to the BBMD so the entry is removed immediately rather than waiting for TTL expiry. :raises RuntimeError: If not registered as a foreign device. """ if not isinstance(self._transport, (BIPTransport, BIP6Transport)): msg = "Not registered as a foreign device" raise RuntimeError(msg) if self._transport.foreign_device is None: msg = "Not registered as a foreign device" raise RuntimeError(msg) await self._transport.foreign_device.stop() self._transport._foreign_device = None
@property def is_foreign_device(self) -> bool: """Whether this device is currently registered as a foreign device.""" if not isinstance(self._transport, (BIPTransport, BIP6Transport)): return False return ( self._transport.foreign_device is not None and self._transport.foreign_device.is_registered ) @property def foreign_device_status(self) -> ForeignDeviceStatus | None: """Current foreign device registration status. Returns ``None`` if foreign device mode is not active. """ if not isinstance(self._transport, (BIPTransport, BIP6Transport)): return None if self._transport.foreign_device is None: return None fd = self._transport.foreign_device return ForeignDeviceStatus( bbmd_address=f"{fd.bbmd_address.host}:{fd.bbmd_address.port}", ttl=fd.ttl, is_registered=fd.is_registered, last_result=fd.last_result.name if fd.last_result is not None else None, )
[docs] async def wait_for_registration(self, timeout: float = 10.0) -> bool: """Wait for foreign device registration to complete. Blocks until the BBMD confirms registration or the timeout elapses. Useful after :meth:`register_as_foreign_device` to ensure broadcasts will be distributed before performing discovery. :param timeout: Maximum seconds to wait. :returns: ``True`` if registered, ``False`` if timeout expired. """ if not isinstance(self._transport, (BIPTransport, BIP6Transport)): return False if self._transport.foreign_device is None: return False fd = self._transport.foreign_device try: await asyncio.wait_for(fd._registered.wait(), timeout) except TimeoutError: return False return fd.is_registered
# --- Network message API ---
[docs] def send_network_message( self, message_type: int, data: bytes, destination: BACnetAddress | None = None, ) -> None: """Send a network-layer message (non-APDU). :param message_type: Network message type code. :param data: Encoded message payload. :param destination: Target address. If ``None``, broadcasts locally. :raises RuntimeError: If the application is not started or is running in router mode (which has its own message API). """ if self._network is None: msg = "Network layer not available" raise RuntimeError(msg) self._network.send_network_message(message_type, data, destination)
[docs] def add_route(self, network: int, router_address: str) -> None: """Pre-populate the router cache for a remote network. Allows sending to a remote network without broadcast-based router discovery. Useful in Docker or test environments where broadcast delivery is unreliable. :param network: The remote network number. :param router_address: IP address of the router (e.g. ``"172.30.1.150"`` or ``"172.30.1.150:47808"``). :raises RuntimeError: If the application is not started. """ if self._network is None: msg = "Network layer not available" raise RuntimeError(msg) from bac_py.network.address import parse_address addr = parse_address(router_address) self._network.add_route(network, addr.mac_address)
[docs] def register_network_message_handler( self, message_type: int, handler: Callable[..., None], ) -> None: """Register a handler for incoming network-layer messages. :param message_type: Network message type code to listen for. :param handler: Called with ``(decoded_message, source_mac)`` when a matching network message is received. :raises RuntimeError: If the network layer is not available. """ if self._network is None: msg = "Network layer not available" raise RuntimeError(msg) self._network.register_network_message_handler(message_type, handler)
[docs] def unregister_network_message_handler( self, message_type: int, handler: Callable[..., None], ) -> None: """Remove a network-layer message handler. :param message_type: Network message type code. :param handler: The handler to remove. """ if self._network is not None: self._network.unregister_network_message_handler(message_type, handler)
async def _handle_i_am_for_cache( self, service_choice: int, data: bytes, source: BACnetAddress, ) -> None: """Cache peer device info from incoming I-Am responses (Clause 19.4).""" try: from bac_py.services.who_is import IAmRequest iam = IAmRequest.decode(data) # Evict oldest entries if cache exceeds limit to prevent unbounded growth if len(self._device_info_cache) >= 1000: # Remove first 100 entries (FIFO order preserved in Python 3.7+ dicts) for evict_key in list(self._device_info_cache)[:100]: del self._device_info_cache[evict_key] self._device_info_cache[source] = DeviceInfo( max_apdu_length=iam.max_apdu_length, segmentation_supported=int(iam.segmentation_supported), ) logger.debug( "cached device info: instance=%s max_apdu=%d from %s", iam.object_identifier, iam.max_apdu_length, source, ) except Exception: logger.debug("Failed to decode I-Am for cache from %s", source, exc_info=True)
[docs] async def confirmed_request( self, destination: BACnetAddress, service_choice: int, service_data: bytes, timeout: float | None = None, ) -> bytes: """Send a confirmed request and await response. :param destination: Target device address. :param service_choice: Confirmed service choice number. :param service_data: Encoded service request bytes. :param timeout: Optional caller-level timeout in seconds. When provided, the request is cancelled if no response is received within this duration (raises ``asyncio.TimeoutError``). When ``None``, the TSM's built-in retry/timeout logic is used exclusively. :returns: ComplexACK service data, or empty bytes for SimpleACK. """ if self._client_tsm is None: msg = "Application not started" raise RuntimeError(msg) logger.debug("sending confirmed request service=%s to %s", service_choice, destination) # Constrain APDU size to peer capability if cached (Clause 19.4) max_apdu_override: int | None = None device_info = self._device_info_cache.get(destination) if device_info is not None: max_apdu_override = min(self._config.max_apdu_length, device_info.max_apdu_length) coro = self._client_tsm.send_request( service_choice, service_data, destination, max_apdu_override=max_apdu_override, ) if timeout is not None: return await asyncio.wait_for(coro, timeout) return await coro
[docs] def unconfirmed_request( self, destination: BACnetAddress, service_choice: int, service_data: bytes, ) -> None: """Send an unconfirmed request. :param destination: Target device address. :param service_choice: Unconfirmed service choice number. :param service_data: Encoded service request bytes. """ network = self._router or self._network if network is None: msg = "Application not started" raise RuntimeError(msg) # DCC enforcement: suppress outbound unsolicited when DISABLE_INITIATION if self._dcc_state == EnableDisable.DISABLE_INITIATION: logger.debug( "DCC DISABLE_INITIATION: suppressing outbound unconfirmed service %d", service_choice, ) return if self._dcc_state == EnableDisable.DISABLE: logger.debug( "DCC DISABLE: suppressing outbound unconfirmed service %d", service_choice, ) return pdu = UnconfirmedRequestPDU( service_choice=service_choice, service_request=service_data, ) apdu_bytes = encode_apdu(pdu) network.send(apdu_bytes, destination, expecting_reply=False)
[docs] def send_confirmed_cov_notification( self, service_data: bytes, destination: BACnetAddress, service_choice: int, ) -> None: """Send a confirmed COV notification (fire-and-forget). Unlike ``confirmed_request``, this does not await a response. COV notifications are best-effort; failures are logged but do not propagate. :param service_data: Encoded service request bytes. :param destination: Target device address. :param service_choice: Confirmed service choice number. """ self._spawn_task(self._send_confirmed_cov(service_data, destination, service_choice))
async def _send_confirmed_cov( self, service_data: bytes, destination: BACnetAddress, service_choice: int, ) -> None: """Background task to send a confirmed COV notification.""" try: await self.confirmed_request( destination=destination, service_choice=service_choice, service_data=service_data, ) except Exception: logger.debug( "Confirmed COV notification to %s failed", destination, exc_info=True, ) # --- COV callback management ---
[docs] def register_cov_callback( self, process_id: int, callback: Callable[..., Any], ) -> None: """Register a callback for incoming COV notifications. :param process_id: Subscriber process identifier to match. :param callback: Called with ``(notification, source)`` when a COV notification arrives for this process ID. """ self._cov_callbacks[process_id] = callback
[docs] def unregister_cov_callback(self, process_id: int) -> None: """Remove a COV notification callback.""" self._cov_callbacks.pop(process_id, None)
# --- Listener management ---
[docs] def register_temporary_handler( self, service_choice: int, handler: Callable[..., Any], ) -> None: """Register a temporary listener for unconfirmed service responses.""" self._unconfirmed_listeners.setdefault(service_choice, []).append(handler)
[docs] def unregister_temporary_handler( self, service_choice: int, handler: Callable[..., Any], ) -> None: """Remove a temporary listener.""" listeners = self._unconfirmed_listeners.get(service_choice, []) if handler in listeners: listeners.remove(handler)
# --- Receive path --- def _spawn_task(self, coro: Any) -> None: """Create a background task and track it to prevent GC.""" loop = self._loop or asyncio.get_running_loop() task = loop.create_task(coro) self._background_tasks.add(task) task.add_done_callback(self._on_background_task_done) def _on_background_task_done(self, task: asyncio.Task[None]) -> None: """Clean up a finished background task and log unexpected errors.""" self._background_tasks.discard(task) if not task.cancelled() and task.exception() is not None: logger.error("Background task failed: %s", task.exception(), exc_info=task.exception()) def _dcc_timer_expired(self) -> None: """Re-enable communication after DCC timer expires.""" self._dcc_state = EnableDisable.ENABLE self._dcc_timer = None logger.info("DCC timer expired, communication re-enabled") # Services allowed when DCC is DISABLE (per Clause 16.1) _DCC_ALLOWED_SERVICES: frozenset[int] = frozenset( { ConfirmedServiceChoice.DEVICE_COMMUNICATION_CONTROL, ConfirmedServiceChoice.REINITIALIZE_DEVICE, } ) def _on_apdu_received(self, data: bytes, source: BACnetAddress) -> None: """Dispatch received APDU based on PDU type. Routes confirmed requests to the server TSM, unconfirmed requests to the service registry, and responses (simple-ack, complex-ack, error, reject, abort, segment-ack) to the client TSM for correlation with outstanding transactions. """ try: pdu = decode_apdu(data) except (ValueError, IndexError): logger.warning("Dropped malformed APDU from %s", source) return if isinstance(pdu, ConfirmedRequestPDU): if pdu.segmented: self._handle_segmented_request(pdu, source) else: self._spawn_task(self._handle_confirmed_request(pdu, source)) elif isinstance(pdu, UnconfirmedRequestPDU): self._spawn_task(self._handle_unconfirmed_request(pdu, source)) elif isinstance(pdu, SimpleAckPDU): if self._client_tsm: self._client_tsm.handle_simple_ack(source, pdu.invoke_id, pdu.service_choice) elif isinstance(pdu, ComplexAckPDU): if self._client_tsm and pdu.segmented: self._client_tsm.handle_segmented_complex_ack(source, pdu) elif self._client_tsm: self._client_tsm.handle_complex_ack( source, pdu.invoke_id, pdu.service_choice, pdu.service_ack ) elif isinstance(pdu, ErrorPDU): if self._client_tsm: self._client_tsm.handle_error( source, pdu.invoke_id, pdu.error_class, pdu.error_code, pdu.error_data, ) elif isinstance(pdu, RejectPDU): if self._client_tsm: self._client_tsm.handle_reject(source, pdu.invoke_id, pdu.reject_reason) elif isinstance(pdu, AbortPDU): if self._client_tsm: self._client_tsm.handle_abort(source, pdu.invoke_id, pdu.abort_reason) elif isinstance(pdu, SegmentAckPDU): if pdu.sent_by_server: # Server sent ACK -> we are the client receiving it if self._client_tsm: self._client_tsm.handle_segment_ack(source, pdu) else: # Client sent ACK -> we are the server receiving it if self._server_tsm: self._server_tsm.handle_segment_ack_for_response(source, pdu) def _handle_segmented_request( self, pdu: ConfirmedRequestPDU, source: BACnetAddress, ) -> None: """Handle a segmented confirmed request (first or subsequent segment).""" if self._server_tsm is None: return result = self._server_tsm.receive_confirmed_request(pdu, source) if result is None: return # Duplicate or segment processed, waiting for more txn, service_data = result if service_data is not None: # All segments received, dispatch to service handler self._spawn_task(self._dispatch_request(txn, pdu.service_choice, service_data, source)) async def _handle_confirmed_request( self, pdu: ConfirmedRequestPDU, source: BACnetAddress, ) -> None: """Process incoming non-segmented confirmed request through server TSM.""" if self._server_tsm is None: return result = self._server_tsm.receive_confirmed_request(pdu, source) if result is None: return # Duplicate, response already resent txn, service_data = result if service_data is None: return # Should not happen for non-segmented requests await self._dispatch_request(txn, pdu.service_choice, service_data, source) async def _dispatch_request( self, txn: ServerTransaction, service_choice: int, service_data: bytes, source: BACnetAddress, ) -> None: """Dispatch a confirmed request to the service handler and send the response.""" if self._server_tsm is None: return network = self._router or self._network if network is None: return logger.debug("dispatching confirmed service %s from %s", service_choice, source) # DCC enforcement: when DISABLE, only allow DCC and ReinitializeDevice if ( self._dcc_state == EnableDisable.DISABLE and service_choice not in self._DCC_ALLOWED_SERVICES ): logger.debug( "DCC DISABLE: dropping confirmed service %d from %s", service_choice, source, ) return response_pdu: SimpleAckPDU | ComplexAckPDU | ErrorPDU | RejectPDU | AbortPDU try: result = await self._service_registry.dispatch_confirmed( service_choice, service_data, source ) if result is None: response_pdu = SimpleAckPDU( invoke_id=txn.invoke_id, service_choice=service_choice, ) else: # Check if response needs segmentation max_payload = compute_max_segment_payload( txn.client_max_apdu_length, "complex_ack" ) if len(result) > max_payload: # Response is too large for a single APDU; segment it self._server_tsm.start_segmented_response(txn, service_choice, result) return response_pdu = ComplexAckPDU( segmented=False, more_follows=False, invoke_id=txn.invoke_id, sequence_number=None, proposed_window_size=None, service_choice=service_choice, service_ack=result, ) except BACnetError as e: response_pdu = ErrorPDU( invoke_id=txn.invoke_id, service_choice=service_choice, error_class=e.error_class, error_code=e.error_code, error_data=e.error_data, ) except BACnetRejectError as e: if e.reason == RejectReason.UNRECOGNIZED_SERVICE: logger.warning("no handler for confirmed service %s", service_choice) response_pdu = RejectPDU( invoke_id=txn.invoke_id, reject_reason=e.reason, ) except BACnetAbortError as e: response_pdu = AbortPDU( sent_by_server=True, invoke_id=txn.invoke_id, abort_reason=e.reason, ) except (ValueError, IndexError, struct.error): # Malformed service data (truncated, bad encoding, etc.) logger.debug( "Malformed service data for service %d from %s", service_choice, source, exc_info=True, ) response_pdu = RejectPDU( invoke_id=txn.invoke_id, reject_reason=RejectReason.INVALID_PARAMETER_DATA_TYPE, ) except Exception as exc: logger.error("handler error for service %s: %s", service_choice, exc, exc_info=True) response_pdu = AbortPDU( sent_by_server=True, invoke_id=txn.invoke_id, abort_reason=AbortReason.OTHER, ) response_bytes = encode_apdu(response_pdu) network.send(response_bytes, source, expecting_reply=False) self._server_tsm.complete_transaction(txn, response_bytes) async def _handle_unconfirmed_request( self, pdu: Any, source: BACnetAddress, ) -> None: """Dispatch unconfirmed request to handlers.""" if not isinstance(pdu, UnconfirmedRequestPDU): return # DCC enforcement for unconfirmed requests if self._dcc_state == EnableDisable.DISABLE: logger.debug( "DCC DISABLE: dropping unconfirmed service %d from %s", pdu.service_choice, source, ) return if self._dcc_state == EnableDisable.DISABLE_INITIATION and pdu.service_choice not in { UnconfirmedServiceChoice.WHO_IS, UnconfirmedServiceChoice.WHO_HAS, }: logger.debug( "DCC DISABLE_INITIATION: dropping unconfirmed service %d from %s", pdu.service_choice, source, ) return logger.debug("dispatching unconfirmed service %s from %s", pdu.service_choice, source) # Dispatch to permanent handlers await self._service_registry.dispatch_unconfirmed( pdu.service_choice, pdu.service_request, source ) # Dispatch to temporary listeners listeners = self._unconfirmed_listeners.get(pdu.service_choice, []) for listener in listeners: try: listener(pdu.service_request, source) except Exception: logger.debug("Error in unconfirmed listener", exc_info=True) # --- COV notification handlers (client role) --- def _dispatch_cov_notification(self, data: bytes, source: BACnetAddress) -> None: """Decode a COV notification and dispatch to the registered callback.""" notification = COVNotificationRequest.decode(data) callback = self._cov_callbacks.get(notification.subscriber_process_identifier) if callback: try: callback(notification, source) except Exception: logger.debug("Error in COV callback", exc_info=True) async def _handle_confirmed_cov_notification( self, service_choice: int, data: bytes, source: BACnetAddress, ) -> bytes | None: """Handle incoming confirmed COV notification (client role). Dispatches to registered COV callbacks and returns SimpleACK. """ self._dispatch_cov_notification(data, source) return None # SimpleACK async def _handle_unconfirmed_cov_notification( self, service_choice: int, data: bytes, source: BACnetAddress, ) -> None: """Handle incoming unconfirmed COV notification (client role). Dispatches to registered COV callbacks. """ self._dispatch_cov_notification(data, source)