Source code for bac_py.encoding.primitives

"""BACnet primitive type encoding/decoding per ASHRAE 135-2016 Clause 20.2."""

from __future__ import annotations

import enum
import logging
import struct

from bac_py.encoding.tags import TagClass, encode_closing_tag, encode_opening_tag, encode_tag
from bac_py.types.enums import ObjectType
from bac_py.types.primitives import (
    BACnetDate,
    BACnetDouble,
    BACnetTime,
    BitString,
    ObjectIdentifier,
)

logger = logging.getLogger(__name__)

# Application tag numbers for primitive types
_TAG_NULL = 0
_TAG_BOOLEAN = 1
_TAG_UNSIGNED = 2
_TAG_SIGNED = 3
_TAG_REAL = 4
_TAG_DOUBLE = 5
_TAG_OCTET_STRING = 6
_TAG_CHARACTER_STRING = 7
_TAG_BIT_STRING = 8
_TAG_ENUMERATED = 9
_TAG_DATE = 10
_TAG_TIME = 11
_TAG_OBJECT_IDENTIFIER = 12

# Charset decoders for CharacterString (Clause 20.2.9)
_CHARSET_DECODERS: dict[int, str] = {
    0x00: "utf-8",
    0x01: "iso2022_jp",  # JIS X0201 (Clause 20.2.9)
    0x02: "iso2022_jp",
    0x03: "utf-32-be",
    0x04: "utf-16-be",
    0x05: "iso-8859-1",
}


# --- Unsigned Integer (Clause 20.2.4) ---

# Pre-computed single-byte unsigned encodings for values 0-255.
_UNSIGNED_1BYTE: tuple[bytes, ...] = tuple(bytes([i]) for i in range(256))


[docs] def encode_unsigned(value: int) -> bytes: """Encode an unsigned integer using the minimum number of octets, big-endian. BACnet unsigned integers are at most 4 bytes (0..4,294,967,295). :param value: Non-negative integer to encode (0--4,294,967,295). :returns: Big-endian encoded bytes (1--4 bytes). :raises ValueError: If *value* is negative or exceeds the 4-byte maximum. """ if value < 0: msg = f"Unsigned integer must be >= 0, got {value}" raise ValueError(msg) if value <= 0xFF: return _UNSIGNED_1BYTE[value] if value > 0xFFFFFFFF: msg = f"Unsigned integer exceeds 4-byte maximum (4294967295), got {value}" raise ValueError(msg) n = (value.bit_length() + 7) // 8 return value.to_bytes(n, "big")
[docs] def decode_unsigned(data: memoryview | bytes) -> int: """Decode an unsigned integer from big-endian bytes. :param data: One or more bytes encoding a big-endian unsigned integer. :returns: The decoded non-negative integer. """ return int.from_bytes(data, "big")
# --- Unsigned64 Integer (Clause 20.2.4, extended for Unsigned64 fields) ---
[docs] def encode_unsigned64(value: int) -> bytes: """Encode an unsigned integer using the minimum number of octets, up to 8 bytes. Used for BACnet Unsigned64 fields such as audit log sequence numbers. :param value: Non-negative integer to encode (0--18,446,744,073,709,551,615). :returns: Big-endian encoded bytes (1--8 bytes). :raises ValueError: If *value* is negative or exceeds the 8-byte maximum. """ if value < 0: msg = f"Unsigned64 integer must be >= 0, got {value}" raise ValueError(msg) if value <= 0xFF: return _UNSIGNED_1BYTE[value] if value > 0xFFFFFFFFFFFFFFFF: msg = f"Unsigned64 integer exceeds 8-byte maximum, got {value}" raise ValueError(msg) n = (value.bit_length() + 7) // 8 return value.to_bytes(n, "big")
[docs] def decode_unsigned64(data: memoryview | bytes) -> int: """Decode an unsigned integer from big-endian bytes (up to 8 bytes). :param data: One or more bytes encoding a big-endian unsigned integer. :returns: The decoded non-negative integer. """ return int.from_bytes(data, "big")
# --- Signed Integer (Clause 20.2.5) --- def _min_signed_bytes(value: int) -> int: """Return the minimum number of bytes needed to encode a signed integer. Per Clause 20.2.5, signed integers shall use the minimum number of octets in two's-complement representation. :param value: Signed integer to measure. :returns: Byte count (1--4) required for two's-complement big-endian encoding. """ if value == 0: return 1 if value > 0: return (value.bit_length() + 8) // 8 # +1 for sign bit, rounded up # For negative values: (-value - 1) gives the magnitude that must # be representable. E.g. -128 -> 127 -> 7 bits -> (7+8)//8 = 1 byte. return ((-value - 1).bit_length() + 8) // 8
[docs] def encode_signed(value: int) -> bytes: """Encode a signed integer using minimum octets, two's complement, big-endian. BACnet signed integers are at most 4 bytes (-2,147,483,648..2,147,483,647). :param value: Signed integer to encode (-2,147,483,648..2,147,483,647). :returns: Two's-complement big-endian encoded bytes (1--4 bytes). :raises ValueError: If *value* is outside the 4-byte signed range. """ if value < -0x80000000 or value > 0x7FFFFFFF: msg = f"Signed integer out of 4-byte range (-2147483648..2147483647), got {value}" raise ValueError(msg) n = _min_signed_bytes(value) return value.to_bytes(n, "big", signed=True)
[docs] def decode_signed(data: memoryview | bytes) -> int: """Decode a signed integer from two's-complement big-endian bytes. :param data: One or more bytes encoding a two's-complement big-endian signed integer. :returns: The decoded signed integer. """ return int.from_bytes(data, "big", signed=True)
# --- Real (Clause 20.2.6) ---
[docs] def encode_real(value: float) -> bytes: """Encode an IEEE-754 single-precision (32-bit) float. :param value: Floating-point value to encode. :returns: 4 bytes in big-endian IEEE-754 single-precision format. """ return struct.pack(">f", value)
[docs] def decode_real(data: memoryview | bytes) -> float: """Decode an IEEE-754 single-precision (32-bit) float. :param data: At least 4 bytes of big-endian IEEE-754 single-precision data. :returns: The decoded floating-point value. :raises ValueError: If *data* contains fewer than 4 bytes. """ if len(data) < 4: msg = f"decode_real requires at least 4 bytes, got {len(data)}" raise ValueError(msg) result: float = struct.unpack_from(">f", data)[0] return result
# --- Double (Clause 20.2.7) ---
[docs] def encode_double(value: float) -> bytes: """Encode an IEEE-754 double-precision (64-bit) float. :param value: Floating-point value to encode. :returns: 8 bytes in big-endian IEEE-754 double-precision format. """ return struct.pack(">d", value)
[docs] def decode_double(data: memoryview | bytes) -> float: """Decode an IEEE-754 double-precision (64-bit) float. :param data: At least 8 bytes of big-endian IEEE-754 double-precision data. :returns: The decoded floating-point value. :raises ValueError: If *data* contains fewer than 8 bytes. """ if len(data) < 8: msg = f"decode_double requires at least 8 bytes, got {len(data)}" raise ValueError(msg) result: float = struct.unpack_from(">d", data)[0] return result
# --- Octet String (Clause 20.2.8) ---
[docs] def decode_octet_string(data: memoryview | bytes) -> bytes: """Decode an octet string by copying the raw bytes. :param data: Raw octet-string content bytes. :returns: A copy of the input as a ``bytes`` object. """ return bytes(data)
# --- Character String (Clause 20.2.9) ---
[docs] def encode_character_string(value: str, charset: int = 0) -> bytes: """Encode a character string with a leading charset byte. :param value: The string to encode. :param charset: Character set identifier (default ``0x00`` = UTF-8). :returns: Encoded bytes with leading charset byte. :raises ValueError: If *charset* is not a supported BACnet character set. """ encoding = _CHARSET_DECODERS.get(charset) if encoding is None: msg = f"Unsupported BACnet character set: 0x{charset:02x}" raise ValueError(msg) encoded = value.encode(encoding) buf = bytearray(1 + len(encoded)) buf[0] = charset buf[1:] = encoded return bytes(buf)
[docs] def decode_character_string(data: memoryview | bytes) -> str: """Decode a character string from contents octets. The first byte is the character set identifier. Unknown charsets fall back to latin-1 decoding with a warning log rather than raising, to preserve data from devices using non-standard charsets. :param data: Contents octets with leading charset byte. :returns: The decoded Python string. :raises ValueError: If *data* is empty. """ if len(data) < 1: msg = "CharacterString data too short: need at least 1 byte for charset" raise ValueError(msg) charset = data[0] encoding = _CHARSET_DECODERS.get(charset) if encoding is None: logger.warning( "Unknown BACnet character set 0x%02x; falling back to latin-1", charset, ) encoding = "iso-8859-1" return ( data[1:].tobytes().decode(encoding) if isinstance(data, memoryview) else data[1:].decode(encoding) )
# --- Enumerated (Clause 20.2.11) ---
[docs] def encode_enumerated(value: int) -> bytes: """Encode an enumerated value (same encoding as unsigned). :param value: Enumerated value to encode. :returns: Big-endian encoded bytes. """ return encode_unsigned(value)
[docs] def decode_enumerated(data: memoryview | bytes) -> int: """Decode an enumerated value (same encoding as unsigned). :param data: Big-endian encoded bytes. :returns: The decoded enumerated value as an integer. """ return decode_unsigned(data)
# --- Bit String (Clause 20.2.10) ---
[docs] def encode_bit_string(value: BitString) -> bytes: """Encode a bit string with a leading unused-bits count byte. :param value: The :class:`BitString` to encode. :returns: Encoded bytes with leading unused-bits count followed by the bit data. """ buf = bytearray(1 + len(value.data)) buf[0] = value.unused_bits buf[1:] = value.data return bytes(buf)
[docs] def decode_bit_string(data: memoryview | bytes) -> BitString: """Decode a bit string from contents octets. :param data: Contents octets with leading unused-bits count byte. :returns: The decoded :class:`BitString`. :raises ValueError: If *data* is empty. """ if len(data) < 1: msg = "BitString data too short: need at least 1 byte for unused-bits count" raise ValueError(msg) unused_bits = data[0] if unused_bits > 7: msg = f"BitString unused_bits must be 0-7, got {unused_bits}" raise ValueError(msg) return BitString(bytes(data[1:]), unused_bits)
# --- Date (Clause 20.2.12) ---
[docs] def encode_date(date: BACnetDate) -> bytes: """Encode a :class:`BACnetDate` to 4 bytes: year-1900, month, day, day-of-week. Valid years are 1900--2155 or ``0xFF`` (unspecified). :param date: The :class:`BACnetDate` to encode. :returns: 4 bytes representing the encoded date. :raises ValueError: If the year is outside the valid range. """ if date.year == 0xFF: year_byte = 0xFF elif not 1900 <= date.year <= 2155: msg = f"BACnetDate year must be 1900-2155 or 0xFF (unspecified), got {date.year}" raise ValueError(msg) else: year_byte = date.year - 1900 return bytes([year_byte, date.month, date.day, date.day_of_week])
[docs] def decode_date(data: memoryview | bytes) -> BACnetDate: """Decode a :class:`BACnetDate` from 4 bytes. :param data: At least 4 bytes of encoded date data. :returns: The decoded :class:`BACnetDate`. :raises ValueError: If *data* contains fewer than 4 bytes. """ if len(data) < 4: msg = f"Date data too short: need 4 bytes, got {len(data)}" raise ValueError(msg) year = 0xFF if data[0] == 0xFF else data[0] + 1900 return BACnetDate(year, data[1], data[2], data[3])
# --- Time (Clause 20.2.13) ---
[docs] def encode_time(time: BACnetTime) -> bytes: """Encode a :class:`BACnetTime` to 4 bytes: hour, minute, second, hundredth. :param time: The :class:`BACnetTime` to encode. :returns: 4 bytes representing the encoded time. """ return bytes([time.hour, time.minute, time.second, time.hundredth])
[docs] def decode_time(data: memoryview | bytes) -> BACnetTime: """Decode a :class:`BACnetTime` from 4 bytes. :param data: At least 4 bytes of encoded time data. :returns: The decoded :class:`BACnetTime`. :raises ValueError: If *data* contains fewer than 4 bytes. """ if len(data) < 4: msg = f"Time data too short: need 4 bytes, got {len(data)}" raise ValueError(msg) return BACnetTime(data[0], data[1], data[2], data[3])
# --- Object Identifier (Clause 20.2.14) ---
[docs] def encode_object_identifier(obj_type: int, instance: int) -> bytes: """Encode a BACnet object identifier to 4 bytes. Object type is a 10-bit field (0--1023). Instance is a 22-bit field (0--4,194,303). Delegates to :class:`ObjectIdentifier` for encoding. :param obj_type: Object type number (0--1023). :param instance: Instance number (0--4,194,303). :returns: 4 bytes encoding the object identifier. """ return ObjectIdentifier(ObjectType(obj_type), instance).encode()
[docs] def decode_object_identifier(data: memoryview | bytes) -> tuple[int, int]: """Decode a BACnet object identifier from 4 bytes. :param data: At least 4 bytes of encoded object identifier data. :returns: Tuple of ``(object_type, instance_number)``. :raises ValueError: If *data* contains fewer than 4 bytes. """ if len(data) < 4: msg = f"ObjectIdentifier data too short: need 4 bytes, got {len(data)}" raise ValueError(msg) value = int.from_bytes(data[:4], "big") return (value >> 22, value & 0x3FFFFF)
# --- Null (Clause 20.2.2) --- # --- Boolean (Clause 20.2.3) --- # Note: Application-tagged booleans encode the value in the tag's L/V/T bits # (no contents octet). Context-tagged booleans use 1 contents octet. # The tag encoding handles this; these functions handle the contents octet form. _BOOL_TRUE = b"\x01" _BOOL_FALSE = b"\x00"
[docs] def encode_boolean(value: bool) -> bytes: """Encode a boolean value as a single contents octet. For context-tagged booleans. Application-tagged booleans encode the value in the tag itself. :param value: The boolean value to encode. :returns: A single byte (``0x01`` for ``True``, ``0x00`` for ``False``). """ return _BOOL_TRUE if value else _BOOL_FALSE
[docs] def decode_boolean(data: memoryview | bytes) -> bool: """Decode a boolean value from a single contents octet. :param data: At least 1 byte; the first byte is interpreted as the boolean value. :returns: ``True`` if the first byte is non-zero, ``False`` otherwise. :raises ValueError: If *data* is empty. """ if len(data) < 1: msg = f"decode_boolean requires at least 1 byte, got {len(data)}" raise ValueError(msg) return bool(data[0])
# --- Application-Tagged Convenience Functions ---
[docs] def encode_application_tagged(tag_number: int, data: bytes) -> bytes: """Encode data with an application tag. :param tag_number: Application tag number identifying the data type. :param data: Encoded content bytes to wrap with the tag. :returns: Application-tagged encoded bytes. """ return encode_tag(tag_number, TagClass.APPLICATION, len(data)) + data
[docs] def encode_context_tagged(tag_number: int, data: bytes) -> bytes: """Encode data with a context-specific tag. :param tag_number: Context tag number. :param data: Encoded content bytes to wrap with the tag. :returns: Context-tagged encoded bytes. """ return encode_tag(tag_number, TagClass.CONTEXT, len(data)) + data
[docs] def encode_application_null() -> bytes: """Encode an application-tagged Null. :returns: Application-tagged Null encoding (tag only, no content). """ return encode_application_tagged(_TAG_NULL, b"")
[docs] def encode_application_boolean(value: bool) -> bytes: """Encode an application-tagged Boolean. Per Clause 20.2.3, the value is encoded in the L/V/T bits of the tag with no contents octet. :param value: The boolean value to encode. :returns: Application-tagged Boolean encoding. """ return encode_tag(_TAG_BOOLEAN, TagClass.APPLICATION, 1 if value else 0)
[docs] def encode_application_unsigned(value: int) -> bytes: """Encode an application-tagged Unsigned Integer. :param value: Non-negative integer to encode. :returns: Application-tagged Unsigned Integer encoding. """ data = encode_unsigned(value) return encode_application_tagged(_TAG_UNSIGNED, data)
[docs] def encode_application_signed(value: int) -> bytes: """Encode an application-tagged Signed Integer. :param value: Signed integer to encode. :returns: Application-tagged Signed Integer encoding. """ data = encode_signed(value) return encode_application_tagged(_TAG_SIGNED, data)
[docs] def encode_application_real(value: float) -> bytes: """Encode an application-tagged Real. :param value: Floating-point value to encode. :returns: Application-tagged Real encoding. """ data = encode_real(value) return encode_application_tagged(_TAG_REAL, data)
[docs] def encode_application_double(value: float) -> bytes: """Encode an application-tagged Double. :param value: Floating-point value to encode. :returns: Application-tagged Double encoding. """ data = encode_double(value) return encode_application_tagged(_TAG_DOUBLE, data)
[docs] def encode_application_octet_string(value: bytes) -> bytes: """Encode an application-tagged Octet String. :param value: Raw bytes to encode. :returns: Application-tagged Octet String encoding. """ return encode_application_tagged(_TAG_OCTET_STRING, value)
[docs] def encode_application_character_string(value: str) -> bytes: """Encode an application-tagged Character String. :param value: String to encode (UTF-8 by default). :returns: Application-tagged Character String encoding. """ data = encode_character_string(value) return encode_application_tagged(_TAG_CHARACTER_STRING, data)
[docs] def encode_application_enumerated(value: int) -> bytes: """Encode an application-tagged Enumerated. :param value: Enumerated value to encode. :returns: Application-tagged Enumerated encoding. """ data = encode_enumerated(value) return encode_application_tagged(_TAG_ENUMERATED, data)
[docs] def encode_application_date(date: BACnetDate) -> bytes: """Encode an application-tagged Date. :param date: The :class:`BACnetDate` to encode. :returns: Application-tagged Date encoding. """ data = encode_date(date) return encode_application_tagged(_TAG_DATE, data)
[docs] def encode_application_time(time: BACnetTime) -> bytes: """Encode an application-tagged Time. :param time: The :class:`BACnetTime` to encode. :returns: Application-tagged Time encoding. """ data = encode_time(time) return encode_application_tagged(_TAG_TIME, data)
[docs] def encode_application_object_id(obj_type: int, instance: int) -> bytes: """Encode an application-tagged Object Identifier. :param obj_type: Object type number. :param instance: Instance number. :returns: Application-tagged Object Identifier encoding. """ data = encode_object_identifier(obj_type, instance) return encode_application_tagged(_TAG_OBJECT_IDENTIFIER, data)
[docs] def encode_context_object_id(tag_number: int, obj_id: ObjectIdentifier) -> bytes: """Encode an :class:`ObjectIdentifier` with a context-specific tag. :param tag_number: Context tag number. :param obj_id: The :class:`ObjectIdentifier` to encode. :returns: Context-tagged Object Identifier encoding. """ return encode_context_tagged(tag_number, obj_id.encode())
[docs] def encode_context_unsigned(tag_number: int, value: int) -> bytes: """Encode an unsigned integer with a context-specific tag. :param tag_number: Context tag number. :param value: Non-negative integer to encode. :returns: Context-tagged unsigned integer encoding. """ return encode_context_tagged(tag_number, encode_unsigned(value))
[docs] def encode_context_signed(tag_number: int, value: int) -> bytes: """Encode a signed integer with a context-specific tag. :param tag_number: Context tag number. :param value: Signed integer to encode. :returns: Context-tagged signed integer encoding. """ return encode_context_tagged(tag_number, encode_signed(value))
[docs] def encode_context_enumerated(tag_number: int, value: int) -> bytes: """Encode an enumerated value with a context-specific tag. :param tag_number: Context tag number. :param value: Enumerated value to encode. :returns: Context-tagged enumerated encoding. """ return encode_context_tagged(tag_number, encode_enumerated(value))
[docs] def encode_context_boolean(tag_number: int, value: bool) -> bytes: """Encode a boolean with a context-specific tag. Context-tagged booleans use a 1-byte contents octet, unlike application-tagged booleans which encode the value in the tag LVT. :param tag_number: Context tag number. :param value: The boolean value to encode. :returns: Context-tagged boolean encoding. """ return encode_context_tagged(tag_number, encode_boolean(value))
[docs] def encode_context_real(tag_number: int, value: float) -> bytes: """Encode a Real with a context-specific tag. :param tag_number: Context tag number. :param value: Floating-point value to encode. :returns: Context-tagged Real encoding. """ return encode_context_tagged(tag_number, encode_real(value))
[docs] def encode_context_octet_string(tag_number: int, value: bytes) -> bytes: """Encode an octet string with a context-specific tag. :param tag_number: Context tag number. :param value: Raw bytes to encode. :returns: Context-tagged octet string encoding. """ return encode_context_tagged(tag_number, value)
[docs] def encode_context_bit_string(tag_number: int, value: BitString) -> bytes: """Encode a bit string with a context-specific tag. :param tag_number: Context tag number. :param value: The :class:`BitString` to encode. :returns: Context-tagged bit string encoding. """ return encode_context_tagged(tag_number, encode_bit_string(value))
[docs] def encode_context_date(tag_number: int, value: BACnetDate) -> bytes: """Encode a date with a context-specific tag. :param tag_number: Context tag number. :param value: The :class:`BACnetDate` to encode. :returns: Context-tagged date encoding. """ return encode_context_tagged(tag_number, encode_date(value))
[docs] def encode_application_bit_string(value: BitString) -> bytes: """Encode an application-tagged Bit String. :param value: The :class:`BitString` to encode. :returns: Application-tagged Bit String encoding. """ data = encode_bit_string(value) return encode_application_tagged(_TAG_BIT_STRING, data)
[docs] def decode_application_value(data: bytes | memoryview) -> object: """Decode application-tagged bytes to a native Python value. Inspects the application tag number and dispatches to the appropriate decoder. Returns native Python types: Tag 0 (Null) -> ``None`` Tag 1 (Boolean) -> ``bool`` Tag 2 (Unsigned) -> ``int`` Tag 3 (Signed) -> ``int`` Tag 4 (Real) -> ``float`` Tag 5 (Double) -> ``float`` Tag 6 (Octet String) -> ``bytes`` Tag 7 (Character String) -> ``str`` Tag 8 (Bit String) -> :class:`BitString` Tag 9 (Enumerated) -> ``int`` Tag 10 (Date) -> :class:`BACnetDate` Tag 11 (Time) -> :class:`BACnetTime` Tag 12 (Object Id) -> :class:`ObjectIdentifier` :param data: Application-tagged encoded bytes. :returns: Decoded Python value. :raises ValueError: If the tag is not application-class or is unrecognised. Example:: from bac_py.encoding.primitives import ( decode_application_value, encode_application_real, ) encoded = encode_application_real(72.5) value = decode_application_value(encoded) # -> 72.5 """ from bac_py.encoding.tags import decode_tag tag, offset = decode_tag(data, 0) if tag.cls != TagClass.APPLICATION: msg = f"Expected application tag, got context tag {tag.number}" raise ValueError(msg) match tag.number: case 0: # Null return None case 1: # Boolean - value is in the tag L/V/T field (Clause 20.2.3) return tag.is_boolean_true # Bounds check (after Boolean/Null which don't use content bytes) if offset + tag.length > len(data): msg = ( f"Application tag content truncated: tag claims {tag.length} bytes " f"at offset {offset}, but only {len(data) - offset} bytes remain" ) logger.warning(msg) raise ValueError(msg) content = data[offset : offset + tag.length] match tag.number: case 2: # Unsigned return decode_unsigned(content) case 3: # Signed return decode_signed(content) case 4: # Real return decode_real(content) case 5: # Double return decode_double(content) case 6: # Octet String return decode_octet_string(content) case 7: # Character String return decode_character_string(content) case 8: # Bit String return decode_bit_string(content) case 9: # Enumerated return decode_enumerated(content) case 10: # Date return decode_date(content) case 11: # Time return decode_time(content) case 12: # Object Identifier obj_type, instance = decode_object_identifier(content) return ObjectIdentifier(ObjectType(obj_type), instance) case _: msg = f"Unknown application tag number: {tag.number}" raise ValueError(msg)
_MAX_DECODED_VALUES = 10_000 """Maximum number of decoded application-tagged values to prevent memory exhaustion."""
[docs] def decode_all_application_values(data: bytes | memoryview) -> list[object]: """Decode all application-tagged values from concatenated bytes. Iterates through the buffer, decoding each application-tagged element and collecting them into a list. :param data: Concatenated application-tagged encoded bytes. :returns: List of decoded Python values. :raises ValueError: If a non-application tag is encountered or the number of decoded values exceeds :data:`_MAX_DECODED_VALUES`. """ from bac_py.encoding.tags import decode_tag if isinstance(data, bytes): data = memoryview(data) results: list[object] = [] offset = 0 while offset < len(data): if len(results) >= _MAX_DECODED_VALUES: msg = ( f"Decoded value count exceeds maximum ({_MAX_DECODED_VALUES}): " f"possible malformed or malicious payload" ) logger.warning(msg) raise ValueError(msg) tag, tag_end = decode_tag(data, offset) if tag.cls != TagClass.APPLICATION: msg = f"Expected application tag at offset {offset}, got context tag {tag.number}" raise ValueError(msg) # For booleans, the value is in the tag length field (no content bytes) element_end = tag_end if tag.number == _TAG_BOOLEAN else tag_end + tag.length element_bytes = data[offset:element_end] results.append(decode_application_value(element_bytes)) offset = element_end return results
[docs] def decode_and_unwrap(data: bytes | memoryview) -> object: """Decode application-tagged bytes and unwrap single-element lists. Convenience wrapper around :func:`decode_all_application_values` that returns a single value directly when the data contains exactly one application-tagged element, ``None`` for empty data, or the full list for multiple elements. :param data: Concatenated application-tagged encoded bytes. :returns: ``None`` if *data* decodes to zero elements, the single decoded value if exactly one element, or a ``list`` of decoded values if multiple elements. """ values = decode_all_application_values(data) if len(values) == 1: return values[0] if len(values) == 0: return None return values
_CONSTRUCTED_ENCODERS: dict[type, object] | None = None def _build_constructed_encoders() -> dict[type, object]: """Build a type-to-encoder dispatch table for constructed BACnet types. Built lazily on first call to :func:`encode_property_value` to avoid circular imports between ``encoding.primitives`` and ``types.constructed``. Each encoder is a callable ``(value, int_as_real) -> bytes``. """ # Local import to break circular dependency with types.constructed from typing import Any from bac_py.types.constructed import ( BACnetAddress, BACnetCalendarEntry, BACnetCOVSubscription, BACnetDateRange, BACnetDateTime, BACnetDestination, BACnetDeviceObjectPropertyReference, BACnetDeviceObjectReference, BACnetLogRecord, BACnetObjectPropertyReference, BACnetPrescale, BACnetPriorityArray, BACnetPriorityValue, BACnetRecipient, BACnetRecipientProcess, BACnetScale, BACnetSpecialEvent, BACnetTimeValue, BACnetValueSource, BACnetWeekNDay, StatusFlags, ) def _enc_status_flags(v: Any, _iar: bool) -> bytes: return encode_application_bit_string(v.to_bit_string()) def _enc_datetime(v: Any, _iar: bool) -> bytes: return encode_application_date(v.date) + encode_application_time(v.time) def _enc_date_range(v: Any, _iar: bool) -> bytes: return encode_application_date(v.start_date) + encode_application_date(v.end_date) def _enc_week_n_day(v: Any, _iar: bool) -> bytes: return encode_application_octet_string(bytes([v.month, v.week_of_month, v.day_of_week])) def _enc_calendar_entry(v: Any, _iar: bool) -> bytes: return _encode_calendar_entry(v) def _enc_time_value(v: Any, iar: bool) -> bytes: return encode_application_time(v.time) + encode_property_value(v.value, int_as_real=iar) def _enc_special_event(v: Any, iar: bool) -> bytes: return _encode_special_event(v, int_as_real=iar) def _enc_dev_obj_prop_ref(v: Any, _iar: bool) -> bytes: parts = [ encode_context_object_id(0, v.object_identifier), encode_context_enumerated(1, v.property_identifier), ] if v.property_array_index is not None: parts.append(encode_context_unsigned(2, v.property_array_index)) if v.device_identifier is not None: parts.append(encode_context_object_id(3, v.device_identifier)) return b"".join(parts) def _enc_obj_prop_ref(v: Any, _iar: bool) -> bytes: parts = [ encode_context_object_id(0, v.object_identifier), encode_context_enumerated(1, v.property_identifier), ] if v.property_array_index is not None: parts.append(encode_context_unsigned(2, v.property_array_index)) return b"".join(parts) def _enc_address(v: Any, _iar: bool) -> bytes: return encode_context_unsigned(0, v.network_number) + encode_context_octet_string( 1, v.mac_address ) def _enc_recipient(v: Any, _iar: bool) -> bytes: return _encode_recipient(v) def _enc_recipient_process(v: Any, _iar: bool) -> bytes: return b"".join( [ encode_opening_tag(0), _encode_recipient(v.recipient), encode_closing_tag(0), encode_context_unsigned(1, v.process_identifier), ] ) def _enc_destination(v: Any, _iar: bool) -> bytes: return b"".join( [ encode_application_bit_string(v.valid_days), encode_application_time(v.from_time), encode_application_time(v.to_time), _encode_recipient(v.recipient), encode_application_unsigned(v.process_identifier), encode_application_boolean(v.issue_confirmed_notifications), encode_application_bit_string(v.transitions), ] ) def _enc_scale(v: Any, _iar: bool) -> bytes: if v.float_scale is not None: return encode_context_real(0, v.float_scale) if v.integer_scale is not None: return encode_context_signed(1, v.integer_scale) return encode_context_real(0, 0.0) def _enc_prescale(v: Any, _iar: bool) -> bytes: return encode_context_unsigned(0, v.multiplier) + encode_context_unsigned( 1, v.modulo_divide ) def _enc_log_record(v: Any, iar: bool) -> bytes: parts = [ encode_application_date(v.timestamp.date), encode_application_time(v.timestamp.time), encode_property_value(v.log_datum, int_as_real=iar), ] if v.status_flags is not None: parts.append(encode_context_bit_string(1, v.status_flags.to_bit_string())) return b"".join(parts) def _enc_cov_subscription(v: Any, _iar: bool) -> bytes: return _encode_cov_subscription(v) def _enc_value_source(v: Any, _iar: bool) -> bytes: result: bytes = v.encode() return result def _enc_dev_obj_ref(v: Any, _iar: bool) -> bytes: result: bytes = v.encode() return result def _enc_priority_value(v: Any, iar: bool) -> bytes: if v.value is None: return encode_application_null() return encode_property_value(v.value, int_as_real=iar) def _enc_priority_array(v: Any, iar: bool) -> bytes: parts: list[bytes] = [] for slot in v.slots: if slot.value is None: parts.append(encode_application_null()) else: parts.append(encode_property_value(slot.value, int_as_real=iar)) return b"".join(parts) return { StatusFlags: _enc_status_flags, BACnetDateTime: _enc_datetime, BACnetDateRange: _enc_date_range, BACnetWeekNDay: _enc_week_n_day, BACnetCalendarEntry: _enc_calendar_entry, BACnetTimeValue: _enc_time_value, BACnetSpecialEvent: _enc_special_event, BACnetDeviceObjectPropertyReference: _enc_dev_obj_prop_ref, BACnetObjectPropertyReference: _enc_obj_prop_ref, BACnetAddress: _enc_address, BACnetRecipient: _enc_recipient, BACnetRecipientProcess: _enc_recipient_process, BACnetDestination: _enc_destination, BACnetScale: _enc_scale, BACnetPrescale: _enc_prescale, BACnetLogRecord: _enc_log_record, BACnetCOVSubscription: _enc_cov_subscription, BACnetValueSource: _enc_value_source, BACnetDeviceObjectReference: _enc_dev_obj_ref, BACnetPriorityValue: _enc_priority_value, BACnetPriorityArray: _enc_priority_array, }
[docs] def encode_property_value(value: object, *, int_as_real: bool = False) -> bytes: """Encode a Python value to application-tagged bytes. Handles the common types stored in BACnet object properties, including both primitive and constructed BACnet types. :param value: The value to encode. :param int_as_real: If ``True``, encode plain ``int`` values as Real instead of Unsigned (used for analog object types where Present_Value is Real). :returns: Application-tagged encoded bytes. :raises TypeError: If the value type is not supported. """ global _CONSTRUCTED_ENCODERS if _CONSTRUCTED_ENCODERS is None: _CONSTRUCTED_ENCODERS = _build_constructed_encoders() if value is None: return encode_application_null() if isinstance(value, ObjectIdentifier): return encode_application_object_id(value.object_type, value.instance_number) # --- Constructed types: O(1) dispatch table lookup --- encoder = _CONSTRUCTED_ENCODERS.get(type(value)) if encoder is not None: result: bytes = encoder(value, int_as_real) # type: ignore[operator] return result # --- Primitive types (order matters due to subclass relationships) --- if isinstance(value, BitString): return encode_application_bit_string(value) if isinstance(value, BACnetDate): return encode_application_date(value) if isinstance(value, BACnetTime): return encode_application_time(value) if isinstance(value, str): return encode_application_character_string(value) if isinstance(value, bool): # Must check bool before int since bool is a subclass of int return encode_application_boolean(value) if isinstance(value, enum.IntEnum): # Must check IntEnum before int since IntEnum is a subclass of int return encode_application_enumerated(value) if isinstance(value, BACnetDouble): # Must check BACnetDouble before float since BACnetDouble is a subclass of float return encode_application_double(value) if isinstance(value, float): return encode_application_real(value) if isinstance(value, int): if int_as_real: return encode_application_real(float(value)) return encode_application_unsigned(value) if isinstance(value, bytes): # Already-encoded application-tagged bytes (pass-through) return value if isinstance(value, list): return b"".join(encode_property_value(item, int_as_real=int_as_real) for item in value) msg = f"Cannot encode value of type {type(value).__name__}" raise TypeError(msg)
def _encode_calendar_entry(entry: object) -> bytes: """Encode a ``BACnetCalendarEntry`` CHOICE with context tags. :param entry: A ``BACnetCalendarEntry`` instance containing a :class:`BACnetDate`, ``BACnetDateRange``, or ``BACnetWeekNDay``. :returns: Context-tagged encoded bytes for the calendar entry. """ from bac_py.types.constructed import BACnetCalendarEntry, BACnetDateRange, BACnetWeekNDay assert isinstance(entry, BACnetCalendarEntry) val = entry.value if isinstance(val, BACnetDate): # date [0] return encode_context_date(0, val) if isinstance(val, BACnetDateRange): # dateRange [1] - constructed return b"".join( [ encode_opening_tag(1), encode_application_date(val.start_date), encode_application_date(val.end_date), encode_closing_tag(1), ] ) # weekNDay [2] assert isinstance(val, BACnetWeekNDay) return encode_context_octet_string( 2, bytes([val.month, val.week_of_month, val.day_of_week]), ) def _encode_special_event(event: object, *, int_as_real: bool = False) -> bytes: """Encode a ``BACnetSpecialEvent`` SEQUENCE. :param event: A ``BACnetSpecialEvent`` instance. :param int_as_real: If ``True``, encode integer time-values as Real. :returns: Encoded bytes for the special event. """ from bac_py.types.constructed import BACnetCalendarEntry, BACnetSpecialEvent assert isinstance(event, BACnetSpecialEvent) parts: list[bytes] = [] if isinstance(event.period, BACnetCalendarEntry): parts.append(encode_opening_tag(0)) parts.append(_encode_calendar_entry(event.period)) parts.append(encode_closing_tag(0)) else: # Calendar object reference parts.append(encode_context_object_id(1, event.period)) parts.append(encode_opening_tag(2)) # listOfTimeValues for tv in event.list_of_time_values: parts.append(encode_application_time(tv.time)) parts.append(encode_property_value(tv.value, int_as_real=int_as_real)) parts.append(encode_closing_tag(2)) parts.append(encode_context_unsigned(3, event.event_priority)) return b"".join(parts) def _encode_recipient(recipient: object) -> bytes: """Encode a ``BACnetRecipient`` CHOICE. :param recipient: A ``BACnetRecipient`` instance with either a device or address field populated. :returns: Context-tagged encoded bytes for the recipient. """ from bac_py.types.constructed import BACnetRecipient assert isinstance(recipient, BACnetRecipient) if recipient.device is not None: # device [0] ObjectIdentifier return encode_context_object_id(0, recipient.device) if recipient.address is not None: # address [1] BACnetAddress - constructed return b"".join( [ encode_opening_tag(1), encode_context_unsigned(0, recipient.address.network_number), encode_context_octet_string(1, recipient.address.mac_address), encode_closing_tag(1), ] ) # Empty recipient defaults to device context tag with zero-length return encode_context_object_id(0, ObjectIdentifier(ObjectType(0), 0)) def _encode_cov_subscription(sub: object) -> bytes: """Encode a ``BACnetCOVSubscription`` SEQUENCE. :param sub: A ``BACnetCOVSubscription`` instance. :returns: Encoded bytes for the COV subscription. """ from bac_py.types.constructed import BACnetCOVSubscription assert isinstance(sub, BACnetCOVSubscription) parts = [ # recipient [0] BACnetRecipientProcess encode_opening_tag(0), encode_opening_tag(0), # recipient.recipient _encode_recipient(sub.recipient.recipient), encode_closing_tag(0), encode_context_unsigned(1, sub.recipient.process_identifier), encode_closing_tag(0), # monitoredPropertyReference [1] encode_opening_tag(1), encode_context_object_id(0, sub.monitored_object), encode_closing_tag(1), # issueConfirmedNotifications [2] encode_context_boolean(2, sub.issue_confirmed_notifications), # timeRemaining [3] encode_context_unsigned(3, sub.time_remaining), ] # covIncrement [4] OPTIONAL if sub.cov_increment is not None: parts.append(encode_context_real(4, sub.cov_increment)) return b"".join(parts)