"""BVLC-SC message encoding and decoding per Annex AB.2.
Wire format (minimum 4 bytes)::
Function(1) | Control(1) | MessageID(2) | [OrigVMAC(6)] | [DestVMAC(6)]
| [DestOptions(var)] | [DataOptions(var)] | [Payload(var)]
All multi-octet numeric values are big-endian (most significant octet first).
"""
from __future__ import annotations
import logging
import struct
from dataclasses import dataclass
from bac_py.transport.sc.types import (
SC_HEADER_MIN_LENGTH,
VMAC_LENGTH,
BvlcSCFunction,
SCControlFlag,
SCHeaderOptionType,
SCHubConnectionStatus,
SCResultCode,
)
from bac_py.transport.sc.vmac import SCVMAC, DeviceUUID
# Module-level logger.  _DEBUG caches logging.DEBUG for the isEnabledFor()
# guards that wrap the debug logging in SCMessage.encode() / decode().
logger = logging.getLogger(__name__)
_DEBUG = logging.DEBUG
# ---------------------------------------------------------------------------
# Fast-path decode constants (avoid IntFlag/IntEnum overhead per message)
# ---------------------------------------------------------------------------
# SCControlFlag integer values (AB.2.2).
# Plain-int bit masks tested against the low nibble of the control octet in
# SCMessage.decode().  NOTE(review): presumably these mirror the SCControlFlag
# members of the same names used by SCMessage.encode() -- keep them in sync.
_ORIGINATING_VMAC = 0x08
_DESTINATION_VMAC = 0x04
_DESTINATION_OPTIONS = 0x02
_DATA_OPTIONS = 0x01
# Pre-built BvlcSCFunction lookup tuple indexed by byte value.
# Entries 0x00-0x0C hold the matching enum member; every other index is None,
# so decode can reject an unknown function code with a single tuple lookup.
_SC_FUNCTIONS: tuple[BvlcSCFunction | None, ...] = tuple(
    BvlcSCFunction(i) if i <= 0x0C else None for i in range(256)
)
# ---------------------------------------------------------------------------
# Header Options
# ---------------------------------------------------------------------------
# Header Marker bit masks (AB.2.3): bit 7 = more options follow, bit 6 = must
# understand, bit 5 = option carries data, bits 4..0 = option type.
# NOTE(review): not referenced in the visible part of this file; presumably
# consumed by the SCHeaderOption encode/decode code -- confirm.
_MARKER_MORE_OPTIONS = 0x80
_MARKER_MUST_UNDERSTAND = 0x40
_MARKER_HAS_DATA = 0x20
_MARKER_TYPE_MASK = 0x1F
# Maximum number of header options per list (defense-in-depth).
# The spec defines only two option types (Secure Path, Proprietary);
# a well-formed message should never approach this limit.
_MAX_HEADER_OPTIONS = 32
# Maximum data size per header option (defense-in-depth).
# Spec-defined options are small; reject excessively large options.
_MAX_OPTION_DATA_SIZE = 512
# ---------------------------------------------------------------------------
# BVLC-SC Message
# ---------------------------------------------------------------------------
@dataclass(frozen=True, slots=True)
class SCMessage:
    """A decoded BVLC-SC message (AB.2.1).

    Generic envelope shared by all 13 BVLC-SC message types.  ``payload``
    carries the raw, uninterpreted payload octets; the typed payload
    dataclasses (e.g. :class:`ConnectRequestPayload`) decode them for a
    specific function.

    :ivar function: BVLC-SC function code of the message.
    :ivar message_id: Message identifier, written as a big-endian uint16.
    :ivar originating: Originating Virtual Address, or ``None`` when absent.
    :ivar destination: Destination Virtual Address, or ``None`` when absent.
    :ivar dest_options: Destination header options (may be empty).
    :ivar data_options: Data header options (may be empty).
    :ivar payload: Raw payload bytes (may be empty).
    """

    function: BvlcSCFunction
    message_id: int
    originating: SCVMAC | None = None
    destination: SCVMAC | None = None
    dest_options: tuple[SCHeaderOption, ...] = ()
    data_options: tuple[SCHeaderOption, ...] = ()
    payload: bytes = b""

    def encode(self) -> bytes:
        """Serialize this message to its wire representation.

        The control octet is derived from which optional fields are present;
        an empty option tuple or empty payload contributes nothing.

        :returns: The encoded message as an immutable ``bytes`` object.
        """
        if __debug__ and logger.isEnabledFor(_DEBUG):
            logger.debug("BVLC-SC encode: %s", self.function.name)

        control = SCControlFlag.NONE
        # (data, width) pairs in wire order; collected first so the output
        # buffer can be allocated once at its final size.
        segments: list[tuple[bytes, int]] = []

        if self.originating is not None:
            control |= SCControlFlag.ORIGINATING_VMAC
            segments.append((self.originating.address, VMAC_LENGTH))
        if self.destination is not None:
            control |= SCControlFlag.DESTINATION_VMAC
            segments.append((self.destination.address, VMAC_LENGTH))
        if self.dest_options:
            control |= SCControlFlag.DESTINATION_OPTIONS
            encoded = _encode_options(self.dest_options)
            segments.append((encoded, len(encoded)))
        if self.data_options:
            control |= SCControlFlag.DATA_OPTIONS
            encoded = _encode_options(self.data_options)
            segments.append((encoded, len(encoded)))
        if self.payload:
            segments.append((self.payload, len(self.payload)))

        # Fixed header: function(1) + control(1) + message_id(2).
        total = SC_HEADER_MIN_LENGTH + sum(width for _, width in segments)
        out = bytearray(total)
        out[0] = self.function
        out[1] = control
        struct.pack_into("!H", out, 2, self.message_id)

        cursor = SC_HEADER_MIN_LENGTH
        for chunk, width in segments:
            out[cursor : cursor + width] = chunk
            cursor += width
        return bytes(out)

    @staticmethod
    def decode(data: bytes | memoryview, *, skip_payload: bool = False) -> SCMessage:
        """Decode a BVLC-SC message from wire bytes.

        Only the low four bits of the control octet are examined; the upper
        bits are masked off.

        :param data: The complete encoded message.
        :param skip_payload: If True, set ``payload`` to ``b""`` instead of
            copying the remaining bytes.  Used by the hub function which
            forwards raw bytes and never inspects the payload.
        :returns: The decoded message.
        :raises ValueError: If the message is shorter than the fixed header,
            carries an unknown function code, or is truncated inside a
            virtual address.  Errors raised by
            ``SCHeaderOption.decode_list`` propagate unchanged.
        """
        # Wrap bytes in a memoryview so the slices below do not copy.
        view = memoryview(data) if isinstance(data, bytes) else data
        total = len(view)
        if total < SC_HEADER_MIN_LENGTH:
            detail = (
                f"BVLC-SC message too short: need at least "
                f"{SC_HEADER_MIN_LENGTH} bytes, got {total}"
            )
            logger.warning("BVLC-SC malformed message: %s", detail)
            raise ValueError(detail)

        code = view[0]
        function = _SC_FUNCTIONS[code]
        if function is None:
            raise ValueError(f"Unknown BVLC-SC function: {code:#x}")
        if __debug__ and logger.isEnabledFor(_DEBUG):
            logger.debug("BVLC-SC decode: %s", function.name)

        control = view[1] & 0x0F  # plain int; avoids enum construction
        (message_id,) = struct.unpack_from("!H", view, 2)
        cursor = SC_HEADER_MIN_LENGTH

        originating: SCVMAC | None = None
        if control & _ORIGINATING_VMAC:
            end = cursor + VMAC_LENGTH
            if end > total:
                detail = "Truncated: missing Originating Virtual Address"
                logger.warning("BVLC-SC malformed message: %s", detail)
                raise ValueError(detail)
            originating = SCVMAC._from_trusted(bytes(view[cursor:end]))
            cursor = end

        destination: SCVMAC | None = None
        if control & _DESTINATION_VMAC:
            end = cursor + VMAC_LENGTH
            if end > total:
                detail = "Truncated: missing Destination Virtual Address"
                logger.warning("BVLC-SC malformed message: %s", detail)
                raise ValueError(detail)
            destination = SCVMAC._from_trusted(bytes(view[cursor:end]))
            cursor = end

        # Each option list reports how many octets it consumed so the cursor
        # can advance to the next section.
        dest_options: tuple[SCHeaderOption, ...] = ()
        if control & _DESTINATION_OPTIONS:
            dest_options, used = SCHeaderOption.decode_list(view[cursor:])
            cursor += used

        data_options: tuple[SCHeaderOption, ...] = ()
        if control & _DATA_OPTIONS:
            data_options, used = SCHeaderOption.decode_list(view[cursor:])
            cursor += used

        payload = b"" if skip_payload else bytes(view[cursor:])
        return _make_sc_message(
            function,
            message_id,
            originating,
            destination,
            dest_options,
            data_options,
            payload,
        )
def _make_sc_message(
    function: BvlcSCFunction,
    message_id: int,
    originating: SCVMAC | None,
    destination: SCVMAC | None,
    dest_options: tuple[SCHeaderOption, ...],
    data_options: tuple[SCHeaderOption, ...],
    payload: bytes,
) -> SCMessage:
    """Build an :class:`SCMessage` without running the dataclass ``__init__``.

    The instance is allocated with ``object.__new__`` and each field is
    stored through ``object.__setattr__``, which is required because
    ``SCMessage`` is a frozen dataclass.  No validation or defaulting is
    performed; every field must be supplied by the caller.

    :returns: A fully populated ``SCMessage``.
    """
    message = object.__new__(SCMessage)
    store = object.__setattr__
    store(message, "function", function)
    store(message, "message_id", message_id)
    store(message, "originating", originating)
    store(message, "destination", destination)
    store(message, "dest_options", dest_options)
    store(message, "data_options", data_options)
    store(message, "payload", payload)
    return message
def encode_encapsulated_npdu(
    originating: SCVMAC,
    destination: SCVMAC | None,
    payload: bytes,
) -> bytes:
    """Fast-path encode for Encapsulated-NPDU (AB.2.12).

    Builds the frame directly instead of constructing an :class:`SCMessage`
    and going through the generic ``encode()``.  Used by
    :meth:`SCTransport.send_unicast` and :meth:`SCTransport.send_broadcast`
    on the hot path.

    The message ID octets are left at zero and no header options are
    emitted.

    :param originating: Originating Virtual Address (always written).
    :param destination: Destination Virtual Address, or ``None`` to omit it.
    :param payload: NPDU bytes appended after the header.
    :returns: The encoded frame.
    """
    with_destination = destination is not None
    # 4-byte fixed header + 6-byte originating VMAC (+ 6-byte destination VMAC).
    header_len = 16 if with_destination else 10

    frame = bytearray(header_len + len(payload))
    frame[0] = BvlcSCFunction.ENCAPSULATED_NPDU
    # frame[2:4] (message_id) stays zero from the bytearray initialisation.
    frame[4:10] = originating.address
    if with_destination:
        frame[1] = SCControlFlag.ORIGINATING_VMAC | SCControlFlag.DESTINATION_VMAC
        frame[10:16] = destination.address
    else:
        frame[1] = SCControlFlag.ORIGINATING_VMAC
    frame[header_len:] = payload
    return bytes(frame)
def _encode_options(options: tuple[SCHeaderOption, ...]) -> bytes:
    """Concatenate the encodings of *options* into one header-option list.

    Every option except the final one is encoded with ``more=True`` so the
    More-Options chaining marks where the list ends.

    :param options: Options in the order they should appear on the wire.
    :returns: The encoded option list (empty for an empty tuple).
    """
    final_index = len(options) - 1
    return b"".join(
        option.encode(more=index < final_index) for index, option in enumerate(options)
    )
# ---------------------------------------------------------------------------
# Typed Payloads
# ---------------------------------------------------------------------------
# Connect-Request / Connect-Accept payload: 26 bytes fixed
# Layout: VMAC (VMAC_LENGTH) + Device UUID (16) + max BVLC length (uint16)
# + max NPDU length (uint16).
_CONNECT_PAYLOAD_LENGTH = VMAC_LENGTH + 16 + 2 + 2  # 26
@dataclass(frozen=True, slots=True)
class _ConnectPayload:
"""Shared payload structure for Connect-Request (AB.2.10) and Connect-Accept (AB.2.11)."""
vmac: SCVMAC
uuid: DeviceUUID
max_bvlc_length: int
max_npdu_length: int
def encode(self) -> bytes:
"""Encode to 26 bytes."""
return (
self.vmac.address
+ self.uuid.value
+ struct.pack("!HH", self.max_bvlc_length, self.max_npdu_length)
)
@staticmethod
def decode(data: bytes | memoryview) -> _ConnectPayload:
"""Decode from payload bytes."""
if len(data) < _CONNECT_PAYLOAD_LENGTH:
msg = (
f"Connect payload too short: need {_CONNECT_PAYLOAD_LENGTH} bytes, got {len(data)}"
)
raise ValueError(msg)
vmac = SCVMAC(bytes(data[:6]))
device_uuid = DeviceUUID(bytes(data[6:22]))
max_bvlc, max_npdu = struct.unpack_from("!HH", data, 22)
return _ConnectPayload(vmac, device_uuid, max_bvlc, max_npdu)
# Public aliases — same structure, separate names for API clarity
ConnectRequestPayload = _ConnectPayload
ConnectAcceptPayload = _ConnectPayload
@dataclass(frozen=True, slots=True)
class BvlcResultPayload:
    """Payload for BVLC-Result messages (AB.2.4).

    For ACK: only *for_function* and *result_code* are meaningful.
    For NAK: *error_header_marker*, *error_class*, *error_code*, and
    optionally *error_details* describe the error.
    """

    for_function: BvlcSCFunction
    result_code: SCResultCode
    error_header_marker: int = 0
    error_class: int = 0
    error_code: int = 0
    error_details: str = ""

    def encode(self) -> bytes:
        """Encode the BVLC-Result payload.

        A NAK is written as function, result code, header marker, two
        big-endian uint16 error fields and the UTF-8 details text; any other
        result code yields just the two leading octets.
        """
        if self.result_code == SCResultCode.NAK:
            head = bytes((self.for_function, self.result_code, self.error_header_marker))
            codes = struct.pack("!HH", self.error_class, self.error_code)
            details = self.error_details.encode("utf-8") if self.error_details else b""
            return head + codes + details
        return bytes((self.for_function, self.result_code))

    @staticmethod
    def decode(data: bytes | memoryview) -> BvlcResultPayload:
        """Decode a BVLC-Result payload.

        Error details that are not valid UTF-8 are decoded with replacement
        characters rather than rejected.

        :raises ValueError: If *data* is shorter than 2 bytes, shorter than
            7 bytes for a non-ACK result, or holds a value the
            ``BvlcSCFunction`` / ``SCResultCode`` enums reject.
        """
        size = len(data)
        if size < 2:
            raise ValueError(
                f"BVLC-Result payload too short: need at least 2 bytes, got {size}"
            )
        for_function = BvlcSCFunction(data[0])
        result_code = SCResultCode(data[1])
        if result_code == SCResultCode.ACK:
            return BvlcResultPayload(for_function, result_code)

        # NAK: marker(1) + error_class(2) + error_code(2) = 5 more bytes minimum
        if size < 7:
            raise ValueError(
                f"BVLC-Result NAK payload too short: need at least 7 bytes, got {size}"
            )
        marker = data[2]
        err_class, err_code = struct.unpack_from("!HH", data, 3)
        details = bytes(data[7:]).decode("utf-8", errors="replace") if size > 7 else ""
        return BvlcResultPayload(
            for_function=for_function,
            result_code=result_code,
            error_header_marker=marker,
            error_class=err_class,
            error_code=err_code,
            error_details=details,
        )
[docs]
@dataclass(frozen=True, slots=True)
class AdvertisementPayload:
"""Payload for Advertisement messages (AB.2.8)."""
hub_connection_status: SCHubConnectionStatus
accept_direct_connections: bool
max_bvlc_length: int
max_npdu_length: int
[docs]
def encode(self) -> bytes:
"""Encode to 6 bytes."""
return struct.pack(
"!BBHH",
self.hub_connection_status,
0x01 if self.accept_direct_connections else 0x00,
self.max_bvlc_length,
self.max_npdu_length,
)
[docs]
@staticmethod
def decode(data: bytes | memoryview) -> AdvertisementPayload:
"""Decode from payload bytes."""
if len(data) < 6:
msg = f"Advertisement payload too short: need 6 bytes, got {len(data)}"
raise ValueError(msg)
status, accept, max_bvlc, max_npdu = struct.unpack_from("!BBHH", data, 0)
return AdvertisementPayload(
hub_connection_status=SCHubConnectionStatus(status),
accept_direct_connections=accept != 0,
max_bvlc_length=max_bvlc,
max_npdu_length=max_npdu,
)
[docs]
@dataclass(frozen=True, slots=True)
class AddressResolutionAckPayload:
"""Payload for Address-Resolution-ACK messages (AB.2.7).
WebSocket URIs are space-separated in the wire format.
"""
websocket_uris: tuple[str, ...]
[docs]
def encode(self) -> bytes:
"""Encode URI list to UTF-8 payload."""
return " ".join(self.websocket_uris).encode("utf-8")
[docs]
@staticmethod
def decode(data: bytes | memoryview) -> AddressResolutionAckPayload:
"""Decode URI list from payload bytes (max 16 URIs)."""
text = bytes(data).decode("utf-8")
if not text:
return AddressResolutionAckPayload(())
uris = text.split(" ")
return AddressResolutionAckPayload(tuple(uris[:16]))
[docs]
@dataclass(frozen=True, slots=True)
class ProprietaryMessagePayload:
"""Payload for Proprietary-Message (AB.2.16)."""
vendor_id: int
proprietary_function: int
proprietary_data: bytes = b""
[docs]
def encode(self) -> bytes:
"""Encode proprietary payload."""
return (
struct.pack("!HB", self.vendor_id, self.proprietary_function) + self.proprietary_data
)
[docs]
@staticmethod
def decode(data: bytes | memoryview) -> ProprietaryMessagePayload:
"""Decode proprietary payload."""
if len(data) < 3:
msg = f"Proprietary payload too short: need at least 3 bytes, got {len(data)}"
raise ValueError(msg)
vendor_id, prop_func = struct.unpack_from("!HB", data, 0)
prop_data = bytes(data[3:])
return ProprietaryMessagePayload(vendor_id, prop_func, prop_data)
# ---------------------------------------------------------------------------
# Convenience builders
# ---------------------------------------------------------------------------
def build_secure_path_option() -> SCHeaderOption:
    """Return a Secure Path header option (AB.2.3.1).

    The option is created with the Secure Path type and Must-Understand
    set; no option data is supplied.
    """
    secure_path = SCHeaderOption(
        type=SCHeaderOptionType.SECURE_PATH,
        must_understand=True,
    )
    return secure_path