"""BACnet Secure Connect (BACnet/SC) transport per ASHRAE 135-2020 Annex AB.
Provides ``SCTransport`` which implements the ``TransportPort`` protocol,
wiring together the hub connector, optional hub function, and optional
direct-connection node switch.
"""
from __future__ import annotations
import asyncio
import contextlib
import logging
import struct
from dataclasses import dataclass, field
from typing import TYPE_CHECKING
from bac_py.transport.sc.bvlc import (
AddressResolutionAckPayload,
AdvertisementPayload,
BvlcResultPayload,
SCMessage,
)
from bac_py.transport.sc.connection import SCConnectionConfig
from bac_py.transport.sc.hub_connector import SCHubConnector, SCHubConnectorConfig
from bac_py.transport.sc.hub_function import SCHubConfig, SCHubFunction
from bac_py.transport.sc.node_switch import SCNodeSwitch, SCNodeSwitchConfig
from bac_py.transport.sc.tls import SCTLSConfig
from bac_py.transport.sc.types import BvlcSCFunction, SCResultCode
from bac_py.transport.sc.vmac import SCVMAC, DeviceUUID
if TYPE_CHECKING:
from collections.abc import Callable, Coroutine
logger = logging.getLogger(__name__)
_DEBUG = logging.DEBUG
[docs]
@dataclass
class SCTransportConfig:
"""Configuration for a BACnet/SC transport."""
primary_hub_uri: str = ""
"""WebSocket URI of the primary hub (e.g. ``wss://hub.example.com:4443``)."""
failover_hub_uri: str | None = None
"""WebSocket URI of the failover hub, if configured."""
hub_function_config: SCHubConfig | None = None
"""If this node IS a hub, provide hub function configuration."""
node_switch_config: SCNodeSwitchConfig | None = None
"""Optional direct peer-to-peer connection configuration."""
tls_config: SCTLSConfig = field(default_factory=SCTLSConfig)
"""TLS configuration for hub connections and direct connections."""
connection_config: SCConnectionConfig = field(default_factory=SCConnectionConfig)
"""Timeouts and tuning for individual SC connections."""
vmac: SCVMAC | None = None
"""Local VMAC address (auto-generated if ``None``)."""
device_uuid: DeviceUUID | None = None
"""Device UUID (auto-generated if ``None``)."""
max_bvlc_length: int = 1600
"""Maximum BVLC-SC message length."""
max_npdu_length: int = 1497
"""Maximum NPDU length."""
min_reconnect_time: float = 10.0
"""Minimum reconnect delay in seconds (AB.6.1: fixed range 10..30s)."""
max_reconnect_time: float = 600.0
"""Maximum reconnect delay in seconds."""
network_number: int | None = None
"""Local network number for the SC network. Used by the network layer
for NPDU routing. ``None`` if unknown."""
connect_timeout: float = 15.0
"""Timeout in seconds for initial hub connection during ``start()``."""
[docs]
class SCTransport:
"""BACnet/SC transport implementing the ``TransportPort`` protocol.
Wraps the hub connector (client to primary/failover hub), optional
hub function (if this node is a hub), and optional node switch
(for direct peer-to-peer connections).
"""
def __init__(self, config: SCTransportConfig | None = None) -> None:
self._config = config or SCTransportConfig()
self._vmac = self._config.vmac or SCVMAC.random()
self._uuid = self._config.device_uuid or DeviceUUID.generate()
self._receive_callback: Callable[[bytes, bytes], None] | None = None
self._send_tasks: set[asyncio.Task[None]] = set()
# Cached BVLC-SC headers keyed by destination MAC (6 bytes).
# For unicast: 16-byte header (function + flags + msg_id + orig + dest).
# Avoids SCVMAC/SCMessage creation and encode() on every send.
# Capped at 256 entries to prevent unbounded growth.
self._unicast_header_cache: dict[bytes, bytes] = {}
self._unicast_cache_max = 256
# Broadcast header is fixed (16 bytes: function + flags + msg_id + orig + bcast_dest).
# Per AB.5.3.3, broadcast messages include the Destination VMAC set to the
# broadcast address so receiving nodes can identify the message as broadcast.
# SCControlFlag: ORIGINATING_VMAC(0x08) | DESTINATION_VMAC(0x04) = 0x0C
_bcast_flags = 0x0C # ORIGINATING_VMAC | DESTINATION_VMAC
self._broadcast_header: bytes = (
struct.pack("!BBH", BvlcSCFunction.ENCAPSULATED_NPDU, _bcast_flags, 0)
+ self._vmac.address
+ b"\xff\xff\xff\xff\xff\xff" # broadcast VMAC
)
# Hub connector (client)
self._hub_connector = SCHubConnector(
self._vmac,
self._uuid,
config=SCHubConnectorConfig(
primary_hub_uri=self._config.primary_hub_uri,
failover_hub_uri=self._config.failover_hub_uri,
tls_config=self._config.tls_config,
connection_config=self._config.connection_config,
min_reconnect_time=self._config.min_reconnect_time,
max_reconnect_time=self._config.max_reconnect_time,
max_bvlc_length=self._config.max_bvlc_length,
max_npdu_length=self._config.max_npdu_length,
),
)
self._hub_connector.on_message = self._on_hub_message
# Optional hub function (server)
self._hub_function: SCHubFunction | None = None
if self._config.hub_function_config:
self._hub_function = SCHubFunction(
self._vmac,
self._uuid,
config=self._config.hub_function_config,
)
# Optional node switch (direct connections)
self._node_switch: SCNodeSwitch | None = None
if self._config.node_switch_config:
ns = SCNodeSwitch(
self._vmac,
self._uuid,
config=self._config.node_switch_config,
)
ns.on_message = self._on_direct_message
self._node_switch = ns
@property
def local_mac(self) -> bytes:
"""6-byte VMAC address as raw bytes."""
return self._vmac.address
@property
def max_npdu_length(self) -> int:
"""Maximum NPDU length for SC transport."""
return self._config.max_npdu_length
@property
def hub_connector(self) -> SCHubConnector:
"""The hub connector instance."""
return self._hub_connector
@property
def hub_function(self) -> SCHubFunction | None:
"""The hub function instance (if this node is a hub)."""
return self._hub_function
@property
def node_switch(self) -> SCNodeSwitch | None:
"""The node switch instance (if direct connections enabled)."""
return self._node_switch
[docs]
def on_receive(self, callback: Callable[[bytes, bytes], None]) -> None:
"""Register a callback for incoming NPDUs.
:param callback: Called with ``(npdu_bytes, source_mac)`` for each
received NPDU. *source_mac* is the 6-byte VMAC of the sender.
"""
self._receive_callback = callback
[docs]
async def start(self) -> None:
"""Start the SC transport: hub function, hub connector, node switch."""
tls = self._config.tls_config
if tls.allow_plaintext and not tls.certificate_path:
logger.warning(
"SC transport starting WITHOUT TLS — all traffic will be unencrypted. "
"Configure certificate_path, private_key_path, and ca_certificates_path "
"for production use (ASHRAE 135-2020 Annex AB.7.4)."
)
logger.info("SC transport starting: vmac=%s", self._vmac)
if self._hub_function:
await self._hub_function.start()
if self._config.primary_hub_uri:
await self._hub_connector.start()
if self._node_switch:
await self._node_switch.start()
logger.info("SC transport started")
[docs]
async def stop(self) -> None:
"""Stop the SC transport and release all resources."""
logger.info("SC transport stopping")
if self._node_switch:
with contextlib.suppress(Exception):
await self._node_switch.stop()
with contextlib.suppress(Exception):
await self._hub_connector.stop()
if self._hub_function:
with contextlib.suppress(Exception):
await self._hub_function.stop()
# Cancel and clean up pending send tasks
for task in self._send_tasks:
if not task.done():
task.cancel()
if self._send_tasks:
await asyncio.gather(*self._send_tasks, return_exceptions=True)
self._send_tasks.clear()
logger.info("SC transport stopped")
[docs]
def send_unicast(self, npdu: bytes, mac_address: bytes) -> None:
"""Send an NPDU to a specific VMAC.
Tries direct connection first (if available), then hub.
Uses a per-destination header cache to avoid SCVMAC/SCMessage
creation and encoding overhead on the hot path.
"""
if __debug__ and logger.isEnabledFor(_DEBUG):
logger.debug("SC send unicast: %d bytes to %s", len(npdu), mac_address.hex())
# Try direct connection first (rare path — most traffic goes via hub).
# Per AB.4.2, direct connection messages omit both Originating and
# Destination VMACs. If the direct send fails, we fall back to the
# hub path which includes both VMACs.
if self._node_switch:
dest = SCVMAC(mac_address)
if self._node_switch.has_direct(dest):
direct_msg = SCMessage(
BvlcSCFunction.ENCAPSULATED_NPDU,
message_id=0,
payload=npdu,
)
hub_msg = SCMessage(
BvlcSCFunction.ENCAPSULATED_NPDU,
message_id=0,
originating=self._vmac,
destination=dest,
payload=npdu,
)
self._schedule_send(self._send_direct_or_hub(dest, direct_msg, hub_msg))
return
# Fast path: use cached header + payload concatenation.
# Header is 16 bytes and constant per (source, dest) pair.
header = self._unicast_header_cache.get(mac_address)
if header is None:
# SCControlFlag: ORIGINATING_VMAC(0x08) | DESTINATION_VMAC(0x04) = 0x0C
header = (
struct.pack("!BBH", BvlcSCFunction.ENCAPSULATED_NPDU, 0x0C, 0)
+ self._vmac.address
+ mac_address
)
if len(self._unicast_header_cache) >= self._unicast_cache_max:
self._unicast_header_cache.clear()
self._unicast_header_cache[mac_address] = header
self._schedule_send(self._send_raw_via_hub(header + npdu))
[docs]
def send_broadcast(self, npdu: bytes) -> None:
"""Send an NPDU as a broadcast via the hub.
Uses a pre-computed broadcast header to skip encoding overhead.
"""
if __debug__ and logger.isEnabledFor(_DEBUG):
logger.debug("SC send broadcast: %d bytes", len(npdu))
self._schedule_send(self._send_raw_via_hub(self._broadcast_header + npdu))
def _schedule_send(self, coro: Coroutine[object, object, None]) -> None:
"""Schedule an async send and track the task."""
task = asyncio.create_task(coro)
self._send_tasks.add(task)
task.add_done_callback(self._on_send_task_done)
def _on_send_task_done(self, task: asyncio.Task[None]) -> None:
"""Clean up a finished send task and log unexpected errors."""
self._send_tasks.discard(task)
if not task.cancelled() and task.exception() is not None:
logger.debug("SC send task failed: %s", task.exception())
# ------------------------------------------------------------------
# Internal send helpers
# ------------------------------------------------------------------
async def _send_via_hub(self, msg: SCMessage) -> None:
"""Send a message through the hub connector."""
try:
await self._hub_connector.send(msg)
except ConnectionError:
logger.debug("Hub not connected, message dropped")
async def _send_raw_via_hub(self, data: bytes) -> None:
"""Send pre-encoded bytes through the hub connector."""
try:
await self._hub_connector.send_raw(data)
except ConnectionError:
logger.debug("Hub not connected, message dropped")
async def _send_direct_or_hub(
self, dest: SCVMAC, direct_msg: SCMessage, hub_msg: SCMessage
) -> None:
"""Try direct connection first, fall back to hub.
*direct_msg* omits VMACs per AB.4.2; *hub_msg* includes them.
"""
if self._node_switch:
ok = await self._node_switch.send_direct(dest, direct_msg)
if ok:
return
await self._send_via_hub(hub_msg)
# ------------------------------------------------------------------
# Message receive handlers
# ------------------------------------------------------------------
async def _on_hub_message(self, msg: SCMessage, raw: bytes | None = None) -> None:
"""Handle a message received from the hub connection."""
if msg.function == BvlcSCFunction.ENCAPSULATED_NPDU and msg.payload:
source_mac = msg.originating.address if msg.originating else b"\x00" * 6
if __debug__ and logger.isEnabledFor(_DEBUG):
logger.debug(
"SC recv from hub: %d bytes from %s", len(msg.payload), msg.originating
)
if self._receive_callback:
try:
self._receive_callback(msg.payload, source_mac)
except Exception:
logger.warning("Error in receive callback", exc_info=True)
elif msg.function == BvlcSCFunction.ADDRESS_RESOLUTION_ACK and self._node_switch:
self._node_switch.handle_address_resolution_ack(msg)
elif msg.function == BvlcSCFunction.ADVERTISEMENT_SOLICITATION:
await self._handle_advertisement_solicitation(msg)
elif msg.function == BvlcSCFunction.ADDRESS_RESOLUTION:
await self._handle_address_resolution(msg)
async def _handle_advertisement_solicitation(self, msg: SCMessage) -> None:
"""Respond to Advertisement-Solicitation with an Advertisement (AB.3.2).
The Advertisement contains current hub connection status, whether
this node accepts direct connections, and max BVLC/NPDU lengths.
Advertisement is NOT a response message per AB.3.1.4, so it uses
a fresh message ID (does not copy the solicitation's ID).
"""
accepts_direct = self._node_switch is not None and self._node_switch._config.enable
payload = AdvertisementPayload(
hub_connection_status=self._hub_connector.connection_status,
accept_direct_connections=accepts_direct,
max_bvlc_length=self._config.max_bvlc_length,
max_npdu_length=self._config.max_npdu_length,
).encode()
response = SCMessage(
BvlcSCFunction.ADVERTISEMENT,
message_id=0,
originating=self._vmac,
destination=msg.originating,
payload=payload,
)
try:
await self._hub_connector.send(response)
except ConnectionError:
logger.debug("Hub not connected, Advertisement dropped")
async def _handle_address_resolution(self, msg: SCMessage) -> None:
"""Respond to Address-Resolution (AB.3.3).
If this node accepts direct connections, responds with
Address-Resolution-ACK. Otherwise, responds with BVLC-Result NAK
(optional functionality not supported).
"""
if self._node_switch and self._node_switch._config.enable:
# Respond with ACK containing our direct connection URIs.
# URI list may be empty if our external address is unknown.
ack_payload = AddressResolutionAckPayload(()).encode()
response = SCMessage(
BvlcSCFunction.ADDRESS_RESOLUTION_ACK,
message_id=msg.message_id,
originating=self._vmac,
destination=msg.originating,
payload=ack_payload,
)
else:
# NAK: optional functionality not supported
nak_payload = BvlcResultPayload(
for_function=BvlcSCFunction.ADDRESS_RESOLUTION,
result_code=SCResultCode.NAK,
error_header_marker=0x00,
error_class=7, # COMMUNICATION
error_code=0x0039, # OPTIONAL_FUNCTIONALITY_NOT_SUPPORTED
).encode()
response = SCMessage(
BvlcSCFunction.BVLC_RESULT,
message_id=msg.message_id,
originating=self._vmac,
destination=msg.originating,
payload=nak_payload,
)
try:
await self._hub_connector.send(response)
except ConnectionError:
logger.debug("Hub not connected, Address-Resolution response dropped")
async def _on_direct_message(self, msg: SCMessage, raw: bytes | None = None) -> None:
"""Handle a message received from a direct connection."""
if msg.function == BvlcSCFunction.ENCAPSULATED_NPDU and msg.payload:
source_mac = msg.originating.address if msg.originating else b"\x00" * 6
if __debug__ and logger.isEnabledFor(_DEBUG):
logger.debug(
"SC recv from direct: %d bytes from %s", len(msg.payload), msg.originating
)
if self._receive_callback:
try:
self._receive_callback(msg.payload, source_mac)
except Exception:
logger.warning("Error in receive callback", exc_info=True)
__all__ = [
"SCTransport",
"SCTransportConfig",
]