Source code for bac_py.network.messages

"""Network layer message encoding and decoding per ASHRAE 135-2024 Clause 6.4.

This module handles the variable-length data payloads of router-related
network layer messages. The NPDU envelope (version, control, addresses,
hop count, message type) is handled by npdu.py; this module handles
only the data that follows the message type byte.
"""

from __future__ import annotations

from dataclasses import dataclass

from bac_py.types.enums import NetworkMessageType, RejectMessageReason

# Maximum number of 2-byte network numbers decoded from a single message.
# BACnet MTU constrains this to ~750 anyway; 512 is conservative.
_MAX_NETWORK_LIST: int = 512

# ---------------------------------------------------------------------------
# Message dataclasses
# ---------------------------------------------------------------------------


[docs] @dataclass(frozen=True, slots=True) class WhoIsRouterToNetwork: """Clause 6.4.1 -- request routing info for a specific or all networks. If network is None, the request is for all reachable networks. """ network: int | None = None
[docs] @dataclass(frozen=True, slots=True) class IAmRouterToNetwork: """Clause 6.4.2 -- list of reachable DNETs.""" networks: tuple[int, ...]
[docs] @dataclass(frozen=True, slots=True) class ICouldBeRouterToNetwork: """Clause 6.4.3 -- half-router advertisement. Performance index: lower value = higher performance. """ network: int performance_index: int
[docs] @dataclass(frozen=True, slots=True) class RejectMessageToNetwork: """Clause 6.4.4 -- routing rejection with reason code.""" reason: RejectMessageReason network: int
[docs] @dataclass(frozen=True, slots=True) class RouterBusyToNetwork: """Clause 6.4.5 -- congestion control, impose. Empty networks list means all networks served by the router. """ networks: tuple[int, ...]
[docs] @dataclass(frozen=True, slots=True) class RouterAvailableToNetwork: """Clause 6.4.6 -- congestion control, lift. Empty networks list means all previously curtailed networks. """ networks: tuple[int, ...]
[docs] @dataclass(frozen=True, slots=True) class RoutingTablePort: """A single port entry within an Initialize-Routing-Table message (Clause 6.5.5).""" network: int port_id: int port_info: bytes = b""
[docs] @dataclass(frozen=True, slots=True) class InitializeRoutingTable: """Clause 6.4.7 -- routing table initialization or query. Empty ports list (Number of Ports = 0) is a query requesting the complete routing table without modification. """ ports: tuple[RoutingTablePort, ...]
[docs] @dataclass(frozen=True, slots=True) class InitializeRoutingTableAck: """Clause 6.4.8 -- routing table initialization response. Contains routing table data when responding to a query. Empty ports list when acknowledging an update. """ ports: tuple[RoutingTablePort, ...]
[docs] @dataclass(frozen=True, slots=True) class EstablishConnectionToNetwork: """Clause 6.4.9 -- instruct half-router to establish PTP connection. Termination time of 0 means the connection is permanent. """ network: int termination_time: int
[docs] @dataclass(frozen=True, slots=True) class DisconnectConnectionToNetwork: """Clause 6.4.10 -- instruct half-router to disconnect PTP connection.""" network: int
[docs] @dataclass(frozen=True, slots=True) class WhatIsNetworkNumber: """Clause 6.4.19 -- request local network number. No payload.""" pass
[docs] @dataclass(frozen=True, slots=True) class NetworkNumberIs: """Clause 6.4.20 -- announce local network number. configured=True means the number was manually configured. configured=False means it was learned from another device. """ network: int configured: bool
# Union type for all network messages handled by this module. NetworkMessage = ( WhoIsRouterToNetwork | IAmRouterToNetwork | ICouldBeRouterToNetwork | RejectMessageToNetwork | RouterBusyToNetwork | RouterAvailableToNetwork | InitializeRoutingTable | InitializeRoutingTableAck | EstablishConnectionToNetwork | DisconnectConnectionToNetwork | WhatIsNetworkNumber | NetworkNumberIs ) # --------------------------------------------------------------------------- # Encoding # ---------------------------------------------------------------------------
[docs] def encode_network_message(msg: NetworkMessage) -> bytes: """Encode the data payload of a network layer message. This encodes only the variable-length data that follows the message type byte in the NPDU. The caller is responsible for constructing the full NPDU with the correct message type. Some message types (e.g. ``WhatIsNetworkNumber``) have no payload, so this function returns ``b""``. See :func:`decode_network_message` for the inverse operation. :param msg: A network message dataclass instance. :returns: Encoded data bytes (may be empty for some message types). :raises TypeError: If the message type is not recognized. """ if isinstance(msg, WhoIsRouterToNetwork): return _encode_who_is_router(msg) if isinstance(msg, IAmRouterToNetwork): return _encode_network_list(msg.networks) if isinstance(msg, ICouldBeRouterToNetwork): return _encode_i_could_be_router(msg) if isinstance(msg, RejectMessageToNetwork): return _encode_reject_message(msg) if isinstance(msg, RouterBusyToNetwork): return _encode_network_list(msg.networks) if isinstance(msg, RouterAvailableToNetwork): return _encode_network_list(msg.networks) if isinstance(msg, InitializeRoutingTable): return _encode_routing_table(msg.ports) if isinstance(msg, InitializeRoutingTableAck): return _encode_routing_table(msg.ports) if isinstance(msg, EstablishConnectionToNetwork): return _encode_establish_connection(msg) if isinstance(msg, DisconnectConnectionToNetwork): return msg.network.to_bytes(2, "big") if isinstance(msg, WhatIsNetworkNumber): return b"" if isinstance(msg, NetworkNumberIs): return _encode_network_number_is(msg) msg_text = f"Unknown network message type: {type(msg).__name__}" raise TypeError(msg_text)
def _encode_who_is_router(msg: WhoIsRouterToNetwork) -> bytes: """Encode Who-Is-Router payload: 2-byte DNET or empty if querying all.""" if msg.network is None: return b"" return msg.network.to_bytes(2, "big") def _encode_network_list(networks: tuple[int, ...]) -> bytes: """Encode a sequence of 2-byte big-endian network numbers.""" buf = bytearray() for net in networks: buf.extend(net.to_bytes(2, "big")) return bytes(buf) def _encode_i_could_be_router(msg: ICouldBeRouterToNetwork) -> bytes: """Encode I-Could-Be-Router payload: 2-byte DNET + 1-byte performance index.""" buf = bytearray() buf.extend(msg.network.to_bytes(2, "big")) buf.append(msg.performance_index & 0xFF) return bytes(buf) def _encode_reject_message(msg: RejectMessageToNetwork) -> bytes: """Encode Reject-Message payload: 1-byte reason + 2-byte DNET.""" buf = bytearray() buf.append(int(msg.reason) & 0xFF) buf.extend(msg.network.to_bytes(2, "big")) return bytes(buf) def _encode_routing_table(ports: tuple[RoutingTablePort, ...]) -> bytes: """Encode a routing table per Clause 6.5.5: count + repeated port entries.""" buf = bytearray() buf.append(len(ports)) for port in ports: buf.extend(port.network.to_bytes(2, "big")) buf.append(port.port_id & 0xFF) buf.append(len(port.port_info) & 0xFF) buf.extend(port.port_info) return bytes(buf) def _encode_establish_connection(msg: EstablishConnectionToNetwork) -> bytes: """Encode Establish-Connection payload: 2-byte DNET + 1-byte termination time.""" buf = bytearray() buf.extend(msg.network.to_bytes(2, "big")) buf.append(msg.termination_time & 0xFF) return bytes(buf) def _encode_network_number_is(msg: NetworkNumberIs) -> bytes: """Encode Network-Number-Is payload: 2-byte network + 1-byte configured flag.""" buf = bytearray() buf.extend(msg.network.to_bytes(2, "big")) buf.append(1 if msg.configured else 0) return bytes(buf) # --------------------------------------------------------------------------- # Decoding # ---------------------------------------------------------------------------
[docs] def decode_network_message(message_type: int, data: bytes | memoryview) -> NetworkMessage: """Decode the data payload of a network layer message. See :func:`encode_network_message` for the inverse operation. :param message_type: The :class:`~bac_py.types.enums.NetworkMessageType` value from the NPDU. :param data: The raw data bytes following the message type byte (may be empty for message types with no payload). :returns: A decoded network message dataclass instance. :raises ValueError: If the message type is not supported or data is malformed. """ if isinstance(data, memoryview): data = bytes(data) mt = message_type if mt == NetworkMessageType.WHO_IS_ROUTER_TO_NETWORK: return _decode_who_is_router(data) if mt == NetworkMessageType.I_AM_ROUTER_TO_NETWORK: return IAmRouterToNetwork(networks=_decode_network_list(data)) if mt == NetworkMessageType.I_COULD_BE_ROUTER_TO_NETWORK: return _decode_i_could_be_router(data) if mt == NetworkMessageType.REJECT_MESSAGE_TO_NETWORK: return _decode_reject_message(data) if mt == NetworkMessageType.ROUTER_BUSY_TO_NETWORK: return RouterBusyToNetwork(networks=_decode_network_list(data)) if mt == NetworkMessageType.ROUTER_AVAILABLE_TO_NETWORK: return RouterAvailableToNetwork(networks=_decode_network_list(data)) if mt == NetworkMessageType.INITIALIZE_ROUTING_TABLE: return InitializeRoutingTable(ports=_decode_routing_table(data)) if mt == NetworkMessageType.INITIALIZE_ROUTING_TABLE_ACK: return InitializeRoutingTableAck(ports=_decode_routing_table(data)) if mt == NetworkMessageType.ESTABLISH_CONNECTION_TO_NETWORK: return _decode_establish_connection(data) if mt == NetworkMessageType.DISCONNECT_CONNECTION_TO_NETWORK: return _decode_disconnect_connection(data) if mt == NetworkMessageType.WHAT_IS_NETWORK_NUMBER: return WhatIsNetworkNumber() if mt == NetworkMessageType.NETWORK_NUMBER_IS: return _decode_network_number_is(data) msg = f"Unsupported network message type: 0x{mt:02X}" raise ValueError(msg)
def _decode_who_is_router(data: bytes) -> WhoIsRouterToNetwork: """Decode Who-Is-Router payload: empty means all networks, otherwise 2-byte DNET.""" if len(data) == 0: return WhoIsRouterToNetwork(network=None) if len(data) < 2: msg = "Who-Is-Router-To-Network data too short" raise ValueError(msg) network = int.from_bytes(data[:2], "big") return WhoIsRouterToNetwork(network=network) def _decode_network_list(data: bytes) -> tuple[int, ...]: """Decode a sequence of 2-byte big-endian network numbers.""" if len(data) % 2 != 0: msg = "Network list data length must be a multiple of 2" raise ValueError(msg) count = len(data) // 2 if count > _MAX_NETWORK_LIST: msg = f"Network list too large: {count} entries (max {_MAX_NETWORK_LIST})" raise ValueError(msg) networks = [] for i in range(0, len(data), 2): networks.append(int.from_bytes(data[i : i + 2], "big")) return tuple(networks) def _decode_i_could_be_router(data: bytes) -> ICouldBeRouterToNetwork: """Decode I-Could-Be-Router payload: 2-byte DNET + 1-byte performance index.""" if len(data) < 3: msg = "I-Could-Be-Router-To-Network data too short" raise ValueError(msg) network = int.from_bytes(data[:2], "big") performance_index = data[2] return ICouldBeRouterToNetwork(network=network, performance_index=performance_index) def _decode_reject_message(data: bytes) -> RejectMessageToNetwork: """Decode Reject-Message payload: 1-byte reason + 2-byte DNET.""" if len(data) < 3: msg = "Reject-Message-To-Network data too short" raise ValueError(msg) reason = RejectMessageReason(data[0]) network = int.from_bytes(data[1:3], "big") return RejectMessageToNetwork(reason=reason, network=network) def _decode_routing_table(data: bytes) -> tuple[RoutingTablePort, ...]: """Decode a routing table per Clause 6.5.5: count + repeated port entries.""" if len(data) < 1: msg = "Routing table data too short" raise ValueError(msg) num_ports = data[0] offset = 1 ports: list[RoutingTablePort] = [] for _ in range(num_ports): if offset + 4 > len(data): msg = "Routing table data truncated" raise ValueError(msg) network = int.from_bytes(data[offset : offset + 2], "big") offset += 2 port_id = data[offset] offset += 1 port_info_len = data[offset] offset += 1 if offset + port_info_len > len(data): msg = "Routing table port info data truncated" raise ValueError(msg) port_info = data[offset : offset + port_info_len] offset += port_info_len ports.append(RoutingTablePort(network=network, port_id=port_id, port_info=port_info)) return tuple(ports) def _decode_establish_connection(data: bytes) -> EstablishConnectionToNetwork: """Decode Establish-Connection payload: 2-byte DNET + 1-byte termination time.""" if len(data) < 3: msg = "Establish-Connection-To-Network data too short" raise ValueError(msg) network = int.from_bytes(data[:2], "big") termination_time = data[2] return EstablishConnectionToNetwork(network=network, termination_time=termination_time) def _decode_disconnect_connection(data: bytes) -> DisconnectConnectionToNetwork: """Decode Disconnect-Connection payload: 2-byte DNET.""" if len(data) < 2: msg = "Disconnect-Connection-To-Network data too short" raise ValueError(msg) network = int.from_bytes(data[:2], "big") return DisconnectConnectionToNetwork(network=network) def _decode_network_number_is(data: bytes) -> NetworkNumberIs: """Decode Network-Number-Is payload: 2-byte network + 1-byte configured flag.""" if len(data) < 3: msg = "Network-Number-Is data too short" raise ValueError(msg) network = int.from_bytes(data[:2], "big") configured = data[2] != 0 return NetworkNumberIs(network=network, configured=configured)