"""BACnet addressing types per ASHRAE 135-2020 Clause 6."""
from __future__ import annotations
import logging
import re
import socket
import struct
from dataclasses import dataclass, field
from functools import lru_cache
from typing import Any
logger = logging.getLogger(__name__)
[docs]
@dataclass(frozen=True, slots=True)
class BIPAddress:
"""A 6-octet BACnet/IP address composed of a 4-byte IPv4 address and a 2-byte UDP port.
Used as the MAC-layer address for BACnet/IP data links (Annex J).
"""
host: str
port: int
_encoded: bytes | None = field(default=None, init=False, repr=False, compare=False, hash=False)
[docs]
def encode(self) -> bytes:
"""Encode this address to the 6-byte BACnet/IP wire format.
:returns: A 6-byte ``bytes`` object (4 octets IP + 2 octets port, big-endian).
"""
cached = self._encoded
if cached is not None:
return cached
result = socket.inet_aton(self.host) + struct.pack("!H", self.port)
object.__setattr__(self, "_encoded", result)
return result
[docs]
@classmethod
def decode(cls, data: bytes | memoryview) -> BIPAddress:
"""Decode a :class:`BIPAddress` from the 6-byte BACnet/IP wire format.
:param data: At least 6 bytes of raw address data.
:returns: The decoded :class:`BIPAddress`.
:raises ValueError: If *data* is shorter than 6 bytes.
"""
if len(data) < 6:
msg = f"BIPAddress requires at least 6 bytes, got {len(data)}"
raise ValueError(msg)
host = f"{data[0]}.{data[1]}.{data[2]}.{data[3]}"
port = int.from_bytes(data[4:6], "big")
return _cached_bip_address(host, port)
[docs]
def to_dict(self) -> dict[str, Any]:
"""Serialize this address to a JSON-friendly dictionary.
:returns: A dict with ``"host"`` and ``"port"`` keys.
"""
return {"host": self.host, "port": self.port}
[docs]
@classmethod
def from_dict(cls, data: dict[str, Any]) -> BIPAddress:
"""Reconstruct a :class:`BIPAddress` from a dictionary produced by :meth:`to_dict`.
:param data: Dictionary containing ``"host"`` and ``"port"`` keys.
:returns: The reconstructed :class:`BIPAddress`.
"""
return cls(host=data["host"], port=data["port"])
@lru_cache(maxsize=256)
def _cached_bip_address(host: str, port: int) -> BIPAddress:
"""Return a cached :class:`BIPAddress` instance for the given host and port."""
return BIPAddress(host=host, port=port)
[docs]
@dataclass(frozen=True, slots=True)
class BIP6Address:
"""An 18-octet BACnet/IPv6 address: 16-byte IPv6 + 2-byte UDP port.
Used as the MAC-layer address for BACnet/IPv6 data links (Annex U).
"""
host: str
port: int
_encoded: bytes | None = field(default=None, init=False, repr=False, compare=False, hash=False)
[docs]
def encode(self) -> bytes:
"""Encode to 18-byte wire format (16 octets IPv6 + 2 octets port, big-endian)."""
cached = self._encoded
if cached is not None:
return cached
result = socket.inet_pton(socket.AF_INET6, self.host) + self.port.to_bytes(2, "big")
object.__setattr__(self, "_encoded", result)
return result
[docs]
@classmethod
def decode(cls, data: bytes | memoryview) -> BIP6Address:
"""Decode from 18-byte wire format.
:raises ValueError: If *data* is shorter than 18 bytes.
"""
if len(data) < 18:
msg = f"BIP6Address requires at least 18 bytes, got {len(data)}"
raise ValueError(msg)
host = socket.inet_ntop(socket.AF_INET6, bytes(data[:16]))
port = int.from_bytes(data[16:18], "big")
return cls(host=host, port=port)
[docs]
def to_dict(self) -> dict[str, Any]:
"""Serialize this address to a JSON-friendly dictionary."""
return {"host": self.host, "port": self.port}
[docs]
@classmethod
def from_dict(cls, data: dict[str, Any]) -> BIP6Address:
"""Reconstruct a :class:`BIP6Address` from a dictionary produced by :meth:`to_dict`."""
return cls(host=data["host"], port=data["port"])
[docs]
@dataclass(frozen=True, slots=True)
class EthernetAddress:
"""A 6-octet IEEE 802 MAC address for BACnet Ethernet (Clause 7).
Used as the MAC-layer address for BACnet/Ethernet (ISO 8802-3) data links.
"""
mac: bytes
"""6-byte IEEE MAC address."""
def __post_init__(self) -> None:
"""Validate the MAC address length."""
if len(self.mac) != 6:
msg = f"Ethernet MAC must be 6 bytes, got {len(self.mac)}"
raise ValueError(msg)
[docs]
def encode(self) -> bytes:
"""Encode to 6-byte wire format.
:returns: The 6-byte MAC address.
"""
return self.mac
[docs]
@classmethod
def decode(cls, data: bytes | memoryview) -> EthernetAddress:
"""Decode from 6 bytes of MAC address data.
:param data: At least 6 bytes of raw address data.
:returns: The decoded :class:`EthernetAddress`.
"""
return cls(mac=bytes(data[:6]))
[docs]
def to_dict(self) -> dict[str, Any]:
"""Serialize to a JSON-friendly dictionary.
:returns: A dict with a ``"mac"`` key containing colon-separated hex.
"""
return {"mac": ":".join(f"{b:02x}" for b in self.mac)}
[docs]
@classmethod
def from_dict(cls, data: dict[str, Any]) -> EthernetAddress:
"""Reconstruct from a dictionary produced by :meth:`to_dict`.
:param data: Dictionary with a ``"mac"`` key.
:returns: The reconstructed :class:`EthernetAddress`.
"""
mac_str: str = data["mac"]
return cls(mac=bytes(int(x, 16) for x in mac_str.split(":")))
def __str__(self) -> str:
"""Format as colon-separated hex (``AA:BB:CC:DD:EE:FF``)."""
return ":".join(f"{b:02x}" for b in self.mac)
[docs]
@dataclass(frozen=True, slots=True)
class BACnetAddress:
"""A full BACnet address: optional network number + MAC address.
Network numbers must be ``None`` (local), 0xFFFF (global broadcast),
or 1-65534 (valid remote network per Clause 6.2.1).
"""
network: int | None = None
mac_address: bytes = b""
def __post_init__(self) -> None:
"""Validate the network number range per Clause 6.2.1."""
if (
self.network is not None
and self.network != 0xFFFF
and (self.network < 1 or self.network > 65534)
):
msg = f"Network number must be 1-65534, got {self.network}"
raise ValueError(msg)
@property
def is_local(self) -> bool:
"""``True`` if this address targets the local network (no DNET specified)."""
return self.network is None
@property
def is_broadcast(self) -> bool:
"""``True`` if this is any type of broadcast address (global, remote, or local)."""
return self.network == 0xFFFF or len(self.mac_address) == 0
@property
def is_global_broadcast(self) -> bool:
"""``True`` if this is a global broadcast (DNET = 0xFFFF)."""
return self.network == 0xFFFF
@property
def is_remote_broadcast(self) -> bool:
"""True if this is a broadcast on a specific remote network.
A remote broadcast has a network number set (not global 0xFFFF)
and an empty MAC address (DLEN=0).
"""
return self.network is not None and self.network != 0xFFFF and len(self.mac_address) == 0
def __str__(self) -> str:
"""Human-readable address string.
Produces strings that round-trip through ``parse_address()``:
- ``"192.168.1.100:47808"`` for local BACnet/IP unicast
- ``"2:192.168.1.100:47808"`` for remote BACnet/IP unicast
- ``"*"`` for global broadcast
- ``"2:*"`` for remote broadcast on network 2
- ``"4352:01"`` for remote non-IP station (e.g. MS/TP behind router)
- ``""`` for local broadcast (no MAC)
"""
if self.is_global_broadcast:
return "*"
if self.is_remote_broadcast:
return f"{self.network}:*"
if len(self.mac_address) == 6:
bip = BIPAddress.decode(self.mac_address)
ip_port = f"{bip.host}:{bip.port}"
if self.network is not None:
return f"{self.network}:{ip_port}"
return ip_port
if len(self.mac_address) == 18:
bip6 = BIP6Address.decode(self.mac_address)
ip_port = f"[{bip6.host}]:{bip6.port}"
if self.network is not None:
return f"{self.network}:{ip_port}"
return ip_port
if self.mac_address:
mac_hex = self.mac_address.hex()
if self.network is not None:
return f"{self.network}:{mac_hex}"
return mac_hex
return ""
[docs]
def to_dict(self) -> dict[str, Any]:
"""Serialize this address to a JSON-friendly dictionary.
:returns: A dict with optional ``"network"`` and ``"mac_address"`` keys.
"""
result: dict[str, Any] = {}
if self.network is not None:
result["network"] = self.network
if self.mac_address:
result["mac_address"] = self.mac_address.hex()
return result
[docs]
@classmethod
def from_dict(cls, data: dict[str, Any]) -> BACnetAddress:
"""Reconstruct a :class:`BACnetAddress` from a dictionary produced by :meth:`to_dict`.
:param data: Dictionary with optional ``"network"`` and ``"mac_address"`` keys.
:returns: The reconstructed :class:`BACnetAddress`.
"""
network = data.get("network")
mac_hex = data.get("mac_address", "")
mac = bytes.fromhex(mac_hex) if mac_hex else b""
return cls(network=network, mac_address=mac)
# Convenience constants
LOCAL_BROADCAST = BACnetAddress()
GLOBAL_BROADCAST = BACnetAddress(network=0xFFFF)
[docs]
def remote_broadcast(network: int) -> BACnetAddress:
"""Create a remote broadcast address for a specific network.
A remote broadcast has the DNET set and an empty MAC address (DLEN=0).
:param network: The target network number (1--65534).
:returns: A :class:`BACnetAddress` representing a directed broadcast on *network*.
"""
return BACnetAddress(network=network, mac_address=b"")
[docs]
def remote_station(network: int, mac: bytes) -> BACnetAddress:
"""Create a unicast address for a station on a remote network.
:param network: The target network number (1--65534).
:param mac: The MAC address of the station on that network.
:returns: A :class:`BACnetAddress` with both DNET and DADR set.
"""
return BACnetAddress(network=network, mac_address=mac)
# Default BACnet/IP port
_DEFAULT_PORT = 0xBAC0
# Pattern: optional "network:" prefix, then IP with optional ":port", or "*"
_ADDR_RE = re.compile(
r"^(?:(\d+):)?" # optional network number + colon
r"(?:"
r"(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})" # IPv4 address
r"(?::(\d+))?" # optional :port
r"|(\*)" # OR wildcard broadcast
r")$"
)
# Ethernet MAC pattern: AA:BB:CC:DD:EE:FF (with optional network prefix)
_ETHER_RE = re.compile(
r"^(?:(\d+):)?" # optional network number + colon
r"([0-9a-fA-F]{2}:[0-9a-fA-F]{2}:[0-9a-fA-F]{2}:"
r"[0-9a-fA-F]{2}:[0-9a-fA-F]{2}:[0-9a-fA-F]{2})$"
)
# IPv6 bracket notation: optional "network:" prefix, then [ipv6]:port or [ipv6]
_ADDR6_RE = re.compile(
r"^(?:(\d+):)?" # optional network number + colon
r"\[([^\]]+)\]" # IPv6 address in brackets
r"(?::(\d+))?$" # optional :port
)
# Remote station with arbitrary hex MAC: "NETWORK:HEXMAC"
# Handles MS/TP (1-byte), ARCNET (1-byte), or other non-IP data links
# behind routers. MAC must be even-length hex (at least 1 byte).
_REMOTE_HEX_RE = re.compile(r"^(\d+):([0-9a-fA-F]{2}(?:[0-9a-fA-F]{2})*)$")
[docs]
def parse_address(addr: str | BACnetAddress) -> BACnetAddress:
"""Parse a human-readable address string to a BACnetAddress.
Accepted formats::
"192.168.1.100" -> local BACnet/IP, default port 0xBAC0
"192.168.1.100:47809" -> local BACnet/IP, explicit port
"2:192.168.1.100" -> remote network 2, default port
"2:192.168.1.100:47809" -> remote network 2, explicit port
"[::1]" -> local BACnet/IPv6, default port 0xBAC0
"[::1]:47808" -> local BACnet/IPv6, explicit port
"2:[::1]:47808" -> remote network 2, IPv6
"AA:BB:CC:DD:EE:FF" -> local Ethernet MAC
"2:AA:BB:CC:DD:EE:FF" -> remote Ethernet MAC on network 2
"4352:01" -> remote station with hex MAC (e.g. MS/TP)
"*" -> global broadcast
"2:*" -> remote broadcast on network 2
If already a :class:`BACnetAddress`, returns it unchanged (pass-through).
String results are cached so repeated lookups of the same address
(common in building polling loops) are O(1) after the first call.
:param addr: Address string or existing :class:`BACnetAddress`.
:returns: The parsed :class:`BACnetAddress`.
:raises ValueError: If the format is not recognised.
"""
if isinstance(addr, BACnetAddress):
return addr
return _parse_address_str(addr)
@lru_cache(maxsize=256)
def _parse_address_str(addr: str) -> BACnetAddress:
"""Parse a string address to a :class:`BACnetAddress` (cached)."""
addr = addr.strip()
if not addr:
msg = "Address string must not be empty"
raise ValueError(msg)
# Try Ethernet MAC format (AA:BB:CC:DD:EE:FF)
me = _ETHER_RE.match(addr)
if me:
network_str, mac_str = me.groups()
network = int(network_str) if network_str is not None else None
mac = bytes(int(x, 16) for x in mac_str.split(":"))
if network is not None:
result = remote_station(network, mac)
logger.debug("parse_address: %r -> Ethernet MAC on network %d", addr, network)
return result
result = BACnetAddress(mac_address=mac)
logger.debug("parse_address: %r -> local Ethernet MAC", addr)
return result
# Try IPv6 bracket notation first
m6 = _ADDR6_RE.match(addr)
if m6:
network_str, ipv6, port_str = m6.groups()
network = int(network_str) if network_str is not None else None
port = int(port_str) if port_str else _DEFAULT_PORT
if not (0 <= port <= 65535):
msg = f"Port number out of range: {port}"
logger.warning("parse_address: %r -> %s", addr, msg)
raise ValueError(msg)
# Validate IPv6 address
try:
socket.inet_pton(socket.AF_INET6, ipv6)
except OSError:
msg = f"Invalid IPv6 address: {ipv6!r}"
logger.warning("parse_address: %r -> %s", addr, msg)
raise ValueError(msg) from None
mac = BIP6Address(host=ipv6, port=port).encode()
if network is not None:
result = remote_station(network, mac)
logger.debug("parse_address: %r -> IPv6 on network %d", addr, network)
return result
result = BACnetAddress(mac_address=mac)
logger.debug("parse_address: %r -> local IPv6", addr)
return result
m = _ADDR_RE.match(addr)
if m:
network_str, ip, port_str, wildcard = m.groups()
network = int(network_str) if network_str is not None else None
if wildcard:
# "*" or "N:*"
if network is None:
return GLOBAL_BROADCAST
return remote_broadcast(network)
# IP address with optional port
port = int(port_str) if port_str else _DEFAULT_PORT
if not (0 <= port <= 65535):
msg = f"Port number out of range: {port}"
raise ValueError(msg)
mac = BIPAddress(host=ip, port=port).encode()
if network is not None:
return remote_station(network, mac)
return BACnetAddress(mac_address=mac)
# Try remote station with arbitrary hex MAC (e.g. MS/TP: "4352:01")
mh = _REMOTE_HEX_RE.match(addr)
if mh:
network = int(mh.group(1))
mac = bytes.fromhex(mh.group(2))
logger.debug("parse_address: %r -> remote hex MAC on network %d", addr, network)
return remote_station(network, mac)
msg = (
f"Cannot parse address: {addr!r}. "
"Expected format like '192.168.1.100', '192.168.1.100:47808', "
"'2:192.168.1.100', '[::1]:47808', 'AA:BB:CC:DD:EE:FF', "
"'4352:01' (network:hex_mac), or '*'"
)
logger.warning("parse_address: failed to parse %r", addr)
raise ValueError(msg)