"""BACnet/SC Hub Connector (AB.5.2).
Maintains a persistent connection to a primary hub with automatic
reconnection and failover to a secondary hub when configured.
"""
from __future__ import annotations
import asyncio
import contextlib
import logging
from dataclasses import dataclass, field
from typing import TYPE_CHECKING
from bac_py.transport.sc.connection import (
SCConnection,
SCConnectionConfig,
SCConnectionState,
)
from bac_py.transport.sc.tls import SCTLSConfig, build_client_ssl_context
from bac_py.transport.sc.types import (
SC_HUB_SUBPROTOCOL,
SCHubConnectionStatus,
)
from bac_py.transport.sc.websocket import SCWebSocket
# Aliases for readable status references
_STATUS_NONE = SCHubConnectionStatus.NO_HUB_CONNECTION
_STATUS_PRIMARY = SCHubConnectionStatus.CONNECTED_TO_PRIMARY
_STATUS_FAILOVER = SCHubConnectionStatus.CONNECTED_TO_FAILOVER
if TYPE_CHECKING:
from collections.abc import Awaitable, Callable
from bac_py.transport.sc.bvlc import SCMessage
from bac_py.transport.sc.vmac import SCVMAC, DeviceUUID
logger = logging.getLogger(__name__)
[docs]
@dataclass
class SCHubConnectorConfig:
"""Configuration for an SC Hub Connector."""
primary_hub_uri: str = ""
failover_hub_uri: str | None = None
tls_config: SCTLSConfig = field(default_factory=SCTLSConfig)
connection_config: SCConnectionConfig = field(default_factory=SCConnectionConfig)
min_reconnect_time: float = 10.0
max_reconnect_time: float = 600.0
max_bvlc_length: int = 1600
max_npdu_length: int = 1497
[docs]
class SCHubConnector:
"""BACnet/SC Hub Connector (AB.5.2).
Maintains a persistent connection to the primary hub with automatic
reconnection and failover to a secondary hub when configured.
"""
def __init__(
self,
local_vmac: SCVMAC,
local_uuid: DeviceUUID,
config: SCHubConnectorConfig | None = None,
) -> None:
self._config = config or SCHubConnectorConfig()
self._local_vmac = local_vmac
self._local_uuid = local_uuid
self._connection: SCConnection | None = None
self._connected_to: SCHubConnectionStatus = _STATUS_NONE
self._reconnect_delay: float = self._config.min_reconnect_time
self._running = False
self._connect_task: asyncio.Task[None] | None = None
self._connected_event = asyncio.Event()
self._ssl_ctx = build_client_ssl_context(self._config.tls_config)
# Callbacks
self.on_message: Callable[[SCMessage, bytes | None], Awaitable[None] | None] | None = None
self.on_status_change: Callable[[SCHubConnectionStatus], None] | None = None
@property
def is_connected(self) -> bool:
"""Whether the connector has an active hub connection."""
return (
self._connection is not None and self._connection.state == SCConnectionState.CONNECTED
)
@property
def connection_status(self) -> SCHubConnectionStatus:
"""Current hub connection status."""
return self._connected_to
@property
def local_vmac(self) -> SCVMAC:
"""Local VMAC address."""
return self._local_vmac
@local_vmac.setter
def local_vmac(self, value: SCVMAC) -> None:
self._local_vmac = value
if self._connection:
self._connection.local_vmac = value
[docs]
async def start(self) -> None:
"""Start the hub connector, begin connection attempts."""
if self._ssl_ctx is None:
logger.warning(
"SC hub connector starting WITHOUT TLS — hub communication "
"will be unencrypted and unauthenticated"
)
logger.info("SC hub connector starting")
self._running = True
self._connect_task = asyncio.create_task(self._connect_loop())
[docs]
async def stop(self) -> None:
"""Disconnect from hub and stop reconnection loop."""
logger.info("SC hub connector stopping")
self._running = False
if self._connect_task and not self._connect_task.done():
self._connect_task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await self._connect_task
self._connect_task = None
if self._connection:
with contextlib.suppress(Exception):
await self._connection._go_idle()
self._connection = None
self._set_status(_STATUS_NONE)
[docs]
async def send(self, msg: SCMessage) -> None:
"""Send a message to the hub.
:raises ConnectionError: If not connected.
"""
if not self.is_connected or self._connection is None:
err = "Hub connector not connected"
raise ConnectionError(err)
await self._connection.send_message(msg)
[docs]
async def send_raw(self, data: bytes) -> None:
"""Send pre-encoded bytes to the hub.
:raises ConnectionError: If not connected.
"""
if not self.is_connected or self._connection is None:
err = "Hub connector not connected"
raise ConnectionError(err)
await self._connection.send_raw(data)
[docs]
async def wait_connected(self, timeout: float | None = None) -> bool:
"""Wait until the connector is connected to a hub.
:returns: True if connected, False if timeout expired.
"""
try:
if timeout is not None:
async with asyncio.timeout(timeout):
await self._connected_event.wait()
else:
await self._connected_event.wait()
return True
except TimeoutError:
return False
# ------------------------------------------------------------------
# Connection loop
# ------------------------------------------------------------------
async def _connect_loop(self) -> None:
"""Reconnection loop with exponential backoff."""
try:
while self._running:
# Try primary hub
if self._config.primary_hub_uri and await self._try_connect(
self._config.primary_hub_uri,
_STATUS_PRIMARY,
):
self._reset_backoff()
await self._run_until_disconnected()
if not self._running:
break
continue
# Try failover hub
if self._config.failover_hub_uri and await self._try_connect(
self._config.failover_hub_uri,
_STATUS_FAILOVER,
):
self._reset_backoff()
await self._run_until_disconnected()
if not self._running:
break
continue
# Both failed — backoff and retry
logger.warning(
"SC hub connection failed, retrying in %.1fs", self._reconnect_delay
)
await asyncio.sleep(self._reconnect_delay)
self._increase_backoff()
except asyncio.CancelledError:
pass
async def _try_connect(self, uri: str, status: SCHubConnectionStatus) -> bool:
"""Attempt to connect to a hub URI.
:returns: True if connected successfully.
"""
try:
ws = await SCWebSocket.connect(
uri, self._ssl_ctx, SC_HUB_SUBPROTOCOL, max_size=self._config.max_bvlc_length
)
except (OSError, ConnectionError, Exception) as exc:
logger.debug("Failed to connect to %s: %s", uri, exc)
return False
conn = SCConnection(
self._local_vmac,
self._local_uuid,
config=self._config.connection_config,
max_bvlc_length=self._config.max_bvlc_length,
max_npdu_length=self._config.max_npdu_length,
)
connected = asyncio.Event()
vmac_collision = False
def on_connected() -> None:
connected.set()
def on_vmac_collision() -> None:
nonlocal vmac_collision
vmac_collision = True
conn.on_connected = on_connected
conn.on_vmac_collision = on_vmac_collision
conn.on_message = self.on_message
await conn.initiate(ws)
if vmac_collision:
logger.warning("VMAC collision connecting to %s", uri)
await conn._go_idle() # Clean up connection resources
return False
if conn.state != SCConnectionState.CONNECTED:
await conn._go_idle() # Clean up connection resources
return False
conn.on_disconnected = self._on_disconnected
self._connection = conn
self._set_status(status)
self._connected_event.set()
if status == _STATUS_FAILOVER:
logger.info("Failed over to hub: %s", uri)
else:
logger.info("Connected to hub: %s", uri)
return True
async def _run_until_disconnected(self) -> None:
"""Wait until the current connection drops."""
disconnected = asyncio.Event()
def on_disc() -> None:
disconnected.set()
if self._connection:
self._connection.on_disconnected = on_disc
await disconnected.wait()
logger.info("Disconnected from hub")
self._connected_event.clear()
self._connection = None
self._set_status(_STATUS_NONE)
def _on_disconnected(self) -> None:
"""Handle unexpected disconnection."""
self._connected_event.clear()
# ------------------------------------------------------------------
# Backoff
# ------------------------------------------------------------------
def _reset_backoff(self) -> None:
self._reconnect_delay = self._config.min_reconnect_time
def _increase_backoff(self) -> None:
self._reconnect_delay = min(
self._reconnect_delay * 2,
self._config.max_reconnect_time,
)
def _set_status(self, status: SCHubConnectionStatus) -> None:
self._connected_to = status
if self.on_status_change:
self.on_status_change(status)