"""COV (Change of Value) subscription manager per ASHRAE 135-2016 Clause 13.1."""
from __future__ import annotations
import asyncio
import logging
import time
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any
from bac_py.app._object_type_sets import ANALOG_TYPES
from bac_py.encoding.primitives import (
encode_application_bit_string,
encode_property_value,
)
from bac_py.services.cov import BACnetPropertyValue, COVNotificationRequest
from bac_py.services.errors import BACnetError
from bac_py.types.constructed import StatusFlags
from bac_py.types.enums import (
ConfirmedServiceChoice,
ErrorClass,
ErrorCode,
ObjectType,
PropertyIdentifier,
UnconfirmedServiceChoice,
)
from bac_py.types.primitives import BitString
if TYPE_CHECKING:
from bac_py.app.application import BACnetApplication
from bac_py.network.address import BACnetAddress
from bac_py.objects.base import BACnetObject, ObjectDatabase
from bac_py.services.cov import (
SubscribeCOVPropertyMultipleRequest,
SubscribeCOVPropertyRequest,
SubscribeCOVRequest,
)
from bac_py.types.primitives import ObjectIdentifier
logger = logging.getLogger(__name__)
[docs]
@dataclass
class COVSubscription:
"""Tracks a single COV subscription."""
subscriber: BACnetAddress
"""BACnet address of the subscribing device."""
process_id: int
"""Subscriber-assigned process identifier."""
monitored_object: ObjectIdentifier
"""Object identifier being monitored."""
confirmed: bool
"""``True`` for confirmed notifications, ``False`` for unconfirmed."""
lifetime: float | None # None = indefinite; seconds
"""Subscription duration in seconds, or ``None`` for indefinite."""
created_at: float = field(default_factory=time.monotonic)
"""Monotonic timestamp when the subscription was created."""
expiry_handle: asyncio.TimerHandle | None = None
"""Timer handle for subscription expiry, if any."""
last_present_value: Any = None
"""Last notified Present_Value, used for COV comparison."""
last_status_flags: Any = None
"""Last notified Status_Flags, used for change detection."""
[docs]
@dataclass
class PropertySubscription:
"""Tracks a single property-level COV subscription (Clause 13.15/13.16)."""
subscriber: BACnetAddress
"""BACnet address of the subscribing device."""
process_id: int
"""Subscriber-assigned process identifier."""
monitored_object: ObjectIdentifier
"""Object identifier being monitored."""
monitored_property: int # PropertyIdentifier value
"""Property identifier being monitored."""
property_array_index: int | None
"""Optional array index within the monitored property."""
confirmed: bool
"""``True`` for confirmed notifications, ``False`` for unconfirmed."""
lifetime: float | None
"""Subscription duration in seconds, or ``None`` for indefinite."""
cov_increment: float | None
"""Subscription-specific COV increment override, or ``None``."""
created_at: float = field(default_factory=time.monotonic)
"""Monotonic timestamp when the subscription was created."""
last_value: Any = None
"""Last notified value for the monitored property."""
expiry_handle: asyncio.TimerHandle | None = None
"""Timer handle for subscription expiry, if any."""
[docs]
class COVManager:
"""Manages COV subscriptions and notification dispatch.
Per Clause 13.1, COV notifications are sent when:
- Analog objects: ``|new - last| >= COV_INCREMENT`` (any change if no increment set)
- Binary/multistate objects: any change in Present_Value
- Any object: change in Status_Flags
"""
def __init__(
self,
app: BACnetApplication,
*,
max_subscriptions: int = 1000,
max_property_subscriptions: int = 1000,
) -> None:
self._app = app
self._max_subscriptions = max_subscriptions
self._max_property_subscriptions = max_property_subscriptions
self._subscriptions: dict[
tuple[Any, int, Any], # (subscriber, process_id, object_id)
COVSubscription,
] = {}
self._property_subscriptions: dict[
tuple[Any, int, Any, int, int | None],
# (subscriber, process_id, object_id, property_id, array_index)
PropertySubscription,
] = {}
# Secondary indices for O(k) lookup in check_and_notify (vs O(N) scan)
self._subs_by_object: dict[Any, dict[tuple[Any, int, Any], COVSubscription]] = {}
self._prop_subs_by_obj_prop: dict[
tuple[Any, int], # (object_id, property_id)
dict[tuple[Any, int, Any, int, int | None], PropertySubscription],
] = {}
[docs]
def subscribe(
self,
subscriber: BACnetAddress,
request: SubscribeCOVRequest,
object_db: ObjectDatabase,
) -> None:
"""Add or update a COV subscription.
:param subscriber: Address of the subscribing device.
:param request: The decoded SubscribeCOV-Request.
:param object_db: Object database to validate the object exists.
:raises BACnetError: If the monitored object does not exist.
"""
obj_id = request.monitored_object_identifier
obj = object_db.get(obj_id)
if obj is None:
raise BACnetError(ErrorClass.OBJECT, ErrorCode.UNKNOWN_OBJECT)
key = (subscriber, request.subscriber_process_identifier, obj_id)
# Cancel existing subscription timer if replacing
existing = self._subscriptions.get(key)
if existing and existing.expiry_handle:
existing.expiry_handle.cancel()
# Reject if at capacity and this is a new subscription
if existing is None and len(self._subscriptions) >= self._max_subscriptions:
raise BACnetError(ErrorClass.RESOURCES, ErrorCode.NO_SPACE_TO_ADD_LIST_ELEMENT)
# Capture initial values for COV comparison
present_value = self._read_present_value(obj)
status_flags = self._read_status_flags(obj)
confirmed = request.issue_confirmed_notifications or False
lifetime = request.lifetime
sub = COVSubscription(
subscriber=subscriber,
process_id=request.subscriber_process_identifier,
monitored_object=obj_id,
confirmed=confirmed,
lifetime=float(lifetime) if lifetime is not None else None,
last_present_value=present_value,
last_status_flags=status_flags,
)
self._subscriptions[key] = sub
self._subs_by_object.setdefault(obj_id, {})[key] = sub
logger.info("COV subscription created: %s subscriber=%s", obj_id, subscriber)
# Start lifetime timer if applicable
if lifetime is not None and lifetime > 0:
loop = asyncio.get_running_loop()
sub.expiry_handle = loop.call_later(
float(lifetime), self._on_subscription_expired, key
)
# Per Clause 13.1.2, send initial notification with current values
self._send_notification(sub, obj)
[docs]
def unsubscribe(
self,
subscriber: BACnetAddress,
process_id: int,
monitored_object: ObjectIdentifier,
) -> None:
"""Remove a subscription (cancellation).
Silently ignores if no matching subscription exists.
:param subscriber: Address of the subscribing device.
:param process_id: Subscriber-assigned process identifier.
:param monitored_object: Object identifier being unsubscribed.
"""
key = (subscriber, process_id, monitored_object)
sub = self._subscriptions.pop(key, None)
if sub:
if sub.expiry_handle:
sub.expiry_handle.cancel()
obj_bucket = self._subs_by_object.get(monitored_object)
if obj_bucket is not None:
obj_bucket.pop(key, None)
if not obj_bucket:
del self._subs_by_object[monitored_object]
logger.info("COV subscription removed: %s", monitored_object)
[docs]
def check_and_notify(
self,
obj: BACnetObject,
changed_property: PropertyIdentifier,
) -> None:
"""Check all subscriptions for this object and send notifications if needed.
Called after a property write. Compares current values against
last-reported values using COV increment logic per Clause 13.1.
:param obj: The object whose property was changed.
:param changed_property: The property that was written.
"""
obj_id = obj.object_identifier
obj_bucket = self._subs_by_object.get(obj_id)
if obj_bucket:
for _key, sub in list(obj_bucket.items()):
if self._should_notify(sub, obj):
self._send_notification(sub, obj)
# Update last-reported values
sub.last_present_value = self._read_present_value(obj)
sub.last_status_flags = self._read_status_flags(obj)
# Also check property-level subscriptions
self.check_and_notify_property(obj, changed_property)
[docs]
def get_active_subscriptions(
self,
object_id: ObjectIdentifier | None = None,
) -> list[COVSubscription]:
"""Return active subscriptions, optionally filtered by object."""
if object_id is None:
return list(self._subscriptions.values())
obj_bucket = self._subs_by_object.get(object_id)
return list(obj_bucket.values()) if obj_bucket else []
[docs]
def shutdown(self) -> None:
"""Cancel all subscription timers."""
for sub in self._subscriptions.values():
if sub.expiry_handle:
sub.expiry_handle.cancel()
self._subscriptions.clear()
self._subs_by_object.clear()
for prop_sub in self._property_subscriptions.values():
if prop_sub.expiry_handle:
prop_sub.expiry_handle.cancel()
self._property_subscriptions.clear()
self._prop_subs_by_obj_prop.clear()
[docs]
def remove_object_subscriptions(self, object_id: ObjectIdentifier) -> None:
"""Remove all subscriptions for a deleted object.
Called when an object is removed from the database to clean
up any outstanding COV subscriptions per Clause 13.1.
"""
# Use secondary index for O(k) cleanup of object-level subscriptions
obj_bucket = self._subs_by_object.pop(object_id, None)
if obj_bucket:
for key, sub in obj_bucket.items():
self._subscriptions.pop(key, None)
if sub.expiry_handle:
sub.expiry_handle.cancel()
# Remove property subscriptions for this object
prop_idx_keys = [
idx_key for idx_key in self._prop_subs_by_obj_prop if idx_key[0] == object_id
]
for idx_key in prop_idx_keys:
prop_bucket = self._prop_subs_by_obj_prop.pop(idx_key)
for pkey, psub in prop_bucket.items():
self._property_subscriptions.pop(pkey, None)
if psub.expiry_handle:
psub.expiry_handle.cancel()
[docs]
def subscribe_property(
self,
subscriber: BACnetAddress,
request: SubscribeCOVPropertyRequest,
object_db: ObjectDatabase,
) -> None:
"""Add or update a property-level COV subscription (Clause 13.15).
:param subscriber: Address of the subscribing device.
:param request: The decoded SubscribeCOVProperty-Request.
:param object_db: Object database to validate the object exists.
:raises BACnetError: If the monitored object does not exist.
"""
obj_id = request.monitored_object_identifier
obj = object_db.get(obj_id)
if obj is None:
raise BACnetError(ErrorClass.OBJECT, ErrorCode.UNKNOWN_OBJECT)
prop_ref = request.monitored_property_identifier
prop_id = prop_ref.property_identifier
array_index = prop_ref.property_array_index
key: tuple[Any, int, Any, int, int | None] = (
subscriber,
request.subscriber_process_identifier,
obj_id,
prop_id,
array_index,
)
# Cancel existing subscription timer if replacing
existing = self._property_subscriptions.get(key)
if existing and existing.expiry_handle:
existing.expiry_handle.cancel()
# Reject if at capacity and this is a new subscription
if (
existing is None
and len(self._property_subscriptions) >= self._max_property_subscriptions
):
raise BACnetError(ErrorClass.RESOURCES, ErrorCode.NO_SPACE_TO_ADD_LIST_ELEMENT)
# Capture initial value of the specific property
initial_value = self._read_property_value(obj, prop_id, array_index)
confirmed = request.issue_confirmed_notifications or False
lifetime = request.lifetime
sub = PropertySubscription(
subscriber=subscriber,
process_id=request.subscriber_process_identifier,
monitored_object=obj_id,
monitored_property=prop_id,
property_array_index=array_index,
confirmed=confirmed,
lifetime=float(lifetime) if lifetime is not None else None,
cov_increment=request.cov_increment,
last_value=initial_value,
)
self._property_subscriptions[key] = sub
self._prop_subs_by_obj_prop.setdefault((obj_id, prop_id), {})[key] = sub
# Start lifetime timer if applicable
if lifetime is not None and lifetime > 0:
loop = asyncio.get_running_loop()
sub.expiry_handle = loop.call_later(
float(lifetime), self._on_property_subscription_expired, key
)
# Send initial notification with current values
self._send_property_notification(sub, obj)
[docs]
def subscribe_property_multiple(
self,
subscriber: BACnetAddress,
request: SubscribeCOVPropertyMultipleRequest,
object_db: ObjectDatabase,
) -> None:
"""Add or update multiple property-level COV subscriptions (Clause 13.16).
:param subscriber: Address of the subscribing device.
:param request: The decoded SubscribeCOVPropertyMultiple-Request.
:param object_db: Object database to validate objects exist.
:raises BACnetError: If any monitored object does not exist.
"""
confirmed = request.issue_confirmed_notifications or False
lifetime = request.lifetime
for spec in request.list_of_cov_subscription_specifications:
obj_id = spec.monitored_object_identifier
obj = object_db.get(obj_id)
if obj is None:
raise BACnetError(ErrorClass.OBJECT, ErrorCode.UNKNOWN_OBJECT)
for ref in spec.list_of_cov_references:
prop_id = ref.monitored_property.property_identifier
array_index = ref.monitored_property.property_array_index
key: tuple[Any, int, Any, int, int | None] = (
subscriber,
request.subscriber_process_identifier,
obj_id,
prop_id,
array_index,
)
# Cancel existing subscription timer if replacing
existing = self._property_subscriptions.get(key)
if existing and existing.expiry_handle:
existing.expiry_handle.cancel()
# Reject if at capacity and this is a new subscription
if (
existing is None
and len(self._property_subscriptions) >= self._max_property_subscriptions
):
raise BACnetError(ErrorClass.RESOURCES, ErrorCode.NO_SPACE_TO_ADD_LIST_ELEMENT)
# Capture initial value of the specific property
initial_value = self._read_property_value(obj, prop_id, array_index)
sub = PropertySubscription(
subscriber=subscriber,
process_id=request.subscriber_process_identifier,
monitored_object=obj_id,
monitored_property=prop_id,
property_array_index=array_index,
confirmed=confirmed,
lifetime=float(lifetime) if lifetime is not None else None,
cov_increment=ref.cov_increment,
last_value=initial_value,
)
self._property_subscriptions[key] = sub
self._prop_subs_by_obj_prop.setdefault((obj_id, prop_id), {})[key] = sub
# Start lifetime timer if applicable
if lifetime is not None and lifetime > 0:
loop = asyncio.get_running_loop()
sub.expiry_handle = loop.call_later(
float(lifetime), self._on_property_subscription_expired, key
)
# Send initial notification with current values
self._send_property_notification(sub, obj)
[docs]
def unsubscribe_property(
self,
subscriber: BACnetAddress,
process_id: int,
obj_id: ObjectIdentifier,
property_id: int,
array_index: int | None = None,
) -> None:
"""Remove a property-level subscription (cancellation).
Silently ignores if no matching subscription exists.
:param subscriber: Address of the subscribing device.
:param process_id: Subscriber-assigned process identifier.
:param obj_id: Object identifier being monitored.
:param property_id: Property identifier value being monitored.
:param array_index: Optional array index within the property.
"""
key: tuple[Any, int, Any, int, int | None] = (
subscriber,
process_id,
obj_id,
property_id,
array_index,
)
sub = self._property_subscriptions.pop(key, None)
if sub:
if sub.expiry_handle:
sub.expiry_handle.cancel()
idx_key = (obj_id, property_id)
prop_bucket = self._prop_subs_by_obj_prop.get(idx_key)
if prop_bucket is not None:
prop_bucket.pop(key, None)
if not prop_bucket:
del self._prop_subs_by_obj_prop[idx_key]
[docs]
def check_and_notify_property(
self,
obj: BACnetObject,
changed_property: PropertyIdentifier,
) -> None:
"""Check property-level subscriptions and send notifications if needed.
For all property subscriptions matching this object and the changed
property, check if the value changed enough to trigger a notification.
For analog types, a subscription-specific ``cov_increment`` overrides
the object's COV_INCREMENT. For non-analog types, any change triggers
a notification.
:param obj: The object whose property was changed.
:param changed_property: The property that was written.
"""
obj_id = obj.object_identifier
changed_prop_int = int(changed_property)
prop_bucket = self._prop_subs_by_obj_prop.get((obj_id, changed_prop_int))
if not prop_bucket:
return
for _key, sub in list(prop_bucket.items()):
current_value = self._read_property_value(
obj, sub.monitored_property, sub.property_array_index
)
if self._should_notify_property(sub, obj, current_value):
self._send_property_notification(sub, obj)
sub.last_value = current_value
def _should_notify_property(
self,
sub: PropertySubscription,
obj: BACnetObject,
current_value: Any,
) -> bool:
"""Determine if a property-level notification should be sent.
:param sub: The property subscription to evaluate.
:param obj: The monitored object.
:param current_value: The current value of the monitored property.
:returns: ``True`` if a notification should be sent.
"""
if current_value == sub.last_value:
return False
obj_type = obj.object_identifier.object_type
if obj_type in ANALOG_TYPES:
# Use subscription-specific cov_increment if available,
# otherwise fall back to the object's COV_INCREMENT
cov_increment = sub.cov_increment
if cov_increment is None:
cov_increment = self._read_cov_increment(obj)
if cov_increment is not None and cov_increment > 0:
if sub.last_value is None:
return True
if isinstance(current_value, (int, float)) and isinstance(
sub.last_value, (int, float)
):
return abs(current_value - sub.last_value) >= cov_increment
# No COV_INCREMENT or zero: any change triggers
return True
# Non-analog: any change triggers notification
return True
def _send_property_notification(
self,
sub: PropertySubscription,
obj: BACnetObject,
) -> None:
"""Send a COV notification for a property-level subscription.
Builds a standard COVNotificationRequest with the monitored property's
value and Status_Flags in ``list_of_values``, then sends via the
application layer.
:param sub: The property subscription triggering the notification.
:param obj: The monitored object.
"""
prop_value = self._read_property_value(
obj, sub.monitored_property, sub.property_array_index
)
status_flags = self._read_status_flags(obj)
prop_bytes = self._encode_value(prop_value, obj.object_identifier.object_type)
sf_bytes = self._encode_status_flags(status_flags)
list_of_values = [
BACnetPropertyValue(
property_identifier=PropertyIdentifier(sub.monitored_property),
property_array_index=sub.property_array_index,
value=prop_bytes,
),
BACnetPropertyValue(
property_identifier=PropertyIdentifier.STATUS_FLAGS,
value=sf_bytes,
),
]
# Compute time_remaining
time_remaining = 0
if sub.lifetime is not None:
elapsed = time.monotonic() - sub.created_at
remaining = max(0, sub.lifetime - elapsed)
time_remaining = int(remaining)
device_id = self._app.device_object_identifier
notification = COVNotificationRequest(
subscriber_process_identifier=sub.process_id,
initiating_device_identifier=device_id,
monitored_object_identifier=sub.monitored_object,
time_remaining=time_remaining,
list_of_values=list_of_values,
)
encoded = notification.encode()
if sub.confirmed:
self._app.send_confirmed_cov_notification(
encoded, sub.subscriber, ConfirmedServiceChoice.CONFIRMED_COV_NOTIFICATION
)
else:
self._app.unconfirmed_request(
destination=sub.subscriber,
service_choice=UnconfirmedServiceChoice.UNCONFIRMED_COV_NOTIFICATION,
service_data=encoded,
)
def _on_property_subscription_expired(
self,
key: tuple[Any, int, Any, int, int | None],
) -> None:
"""Remove an expired property subscription.
:param key: The subscription key to remove.
"""
sub = self._property_subscriptions.pop(key, None)
if sub:
idx_key = (sub.monitored_object, sub.monitored_property)
prop_bucket = self._prop_subs_by_obj_prop.get(idx_key)
if prop_bucket is not None:
prop_bucket.pop(key, None)
if not prop_bucket:
del self._prop_subs_by_obj_prop[idx_key]
logger.debug(
"Property COV subscription expired: process_id=%d, object=%s, property=%d",
sub.process_id,
sub.monitored_object,
sub.monitored_property,
)
@staticmethod
def _read_property_value(
obj: BACnetObject,
property_id: int,
array_index: int | None = None,
) -> Any:
"""Read a specific property value, returning None if not available.
:param obj: The object to read from.
:param property_id: The property identifier value.
:param array_index: Optional array index.
:returns: The property value, or ``None`` if unavailable.
"""
try:
return obj.read_property(PropertyIdentifier(property_id), array_index=array_index)
except (BACnetError, ValueError):
return None
def _should_notify(
self,
sub: COVSubscription,
obj: BACnetObject,
) -> bool:
"""Determine if a notification should be sent based on COV rules.
Per Clause 13.1:
- Analog objects: ``|new_value - last_reported| >= COV_INCREMENT``
- Binary/multistate: any change in Present_Value
- Any: change in Status_Flags
:param sub: The subscription to evaluate.
:param obj: The monitored object.
:returns: ``True`` if a notification should be sent.
"""
current_pv = self._read_present_value(obj)
current_sf = self._read_status_flags(obj)
# Check Status_Flags change
if current_sf != sub.last_status_flags:
return True
# Check Present_Value change
if current_pv == sub.last_present_value:
return False
obj_type = obj.object_identifier.object_type
if obj_type in ANALOG_TYPES:
# Analog: use COV_INCREMENT if available
cov_increment = self._read_cov_increment(obj)
if cov_increment is not None and cov_increment > 0:
if sub.last_present_value is None:
return True
if isinstance(current_pv, (int, float)) and isinstance(
sub.last_present_value, (int, float)
):
return abs(current_pv - sub.last_present_value) >= cov_increment
# No COV_INCREMENT or zero: any change triggers
return True
# Binary/multistate: any change triggers
return True
def _send_notification(self, sub: COVSubscription, obj: BACnetObject) -> None:
"""Send a COV notification (confirmed or unconfirmed).
Builds the notification with Present_Value and Status_Flags,
then sends via the application layer.
:param sub: The subscription triggering the notification.
:param obj: The monitored object.
"""
logger.debug("COV notification for %s to %s", sub.monitored_object, sub.subscriber)
# Build list_of_values with Present_Value and Status_Flags
present_value = self._read_present_value(obj)
status_flags = self._read_status_flags(obj)
pv_bytes = self._encode_value(present_value, obj.object_identifier.object_type)
sf_bytes = self._encode_status_flags(status_flags)
list_of_values = [
BACnetPropertyValue(
property_identifier=PropertyIdentifier.PRESENT_VALUE,
value=pv_bytes,
),
BACnetPropertyValue(
property_identifier=PropertyIdentifier.STATUS_FLAGS,
value=sf_bytes,
),
]
# Compute time_remaining
time_remaining = 0
if sub.lifetime is not None:
elapsed = time.monotonic() - sub.created_at
remaining = max(0, sub.lifetime - elapsed)
time_remaining = int(remaining)
# Build device identifier
device_id = self._app.device_object_identifier
notification = COVNotificationRequest(
subscriber_process_identifier=sub.process_id,
initiating_device_identifier=device_id,
monitored_object_identifier=sub.monitored_object,
time_remaining=time_remaining,
list_of_values=list_of_values,
)
encoded = notification.encode()
if sub.confirmed:
self._app.send_confirmed_cov_notification(
encoded, sub.subscriber, ConfirmedServiceChoice.CONFIRMED_COV_NOTIFICATION
)
else:
self._app.unconfirmed_request(
destination=sub.subscriber,
service_choice=UnconfirmedServiceChoice.UNCONFIRMED_COV_NOTIFICATION,
service_data=encoded,
)
def _on_subscription_expired(self, key: tuple[Any, int, Any]) -> None:
"""Remove expired subscription."""
sub = self._subscriptions.pop(key, None)
if sub:
obj_bucket = self._subs_by_object.get(sub.monitored_object)
if obj_bucket is not None:
obj_bucket.pop(key, None)
if not obj_bucket:
del self._subs_by_object[sub.monitored_object]
logger.debug(
"COV subscription expired: process_id=%d, object=%s",
sub.process_id,
sub.monitored_object,
)
@staticmethod
def _read_present_value(obj: BACnetObject) -> Any:
"""Read Present_Value, returning None if not available."""
try:
return obj.read_property(PropertyIdentifier.PRESENT_VALUE)
except BACnetError:
return None
@staticmethod
def _read_status_flags(obj: BACnetObject) -> Any:
"""Read Status_Flags, returning None if not available."""
try:
return obj.read_property(PropertyIdentifier.STATUS_FLAGS)
except BACnetError:
return None
@staticmethod
def _read_cov_increment(obj: BACnetObject) -> float | None:
"""Read COV_INCREMENT, returning None if not set."""
try:
value = obj.read_property(PropertyIdentifier.COV_INCREMENT)
if isinstance(value, (int, float)):
return float(value)
except BACnetError:
pass
return None
@staticmethod
def _encode_value(value: Any, obj_type: ObjectType) -> bytes:
"""Encode a property value to application-tagged bytes for COV notification.
Delegates to :func:`encode_property_value`. For analog object
types, integers are encoded as REAL; otherwise the native type
encoding is used. Returns raw application-tagged bytes suitable
for inclusion in a BACnetPropertyValue sequence.
"""
return encode_property_value(value, int_as_real=obj_type in ANALOG_TYPES)
@staticmethod
def _encode_status_flags(status_flags: Any) -> bytes:
"""Encode StatusFlags to application-tagged bytes.
Accepts a :class:`StatusFlags` dataclass, a raw
:class:`BitString`, or any other value (in which case
all-clear flags are returned as a fallback).
"""
if isinstance(status_flags, StatusFlags):
return encode_application_bit_string(status_flags.to_bit_string())
if isinstance(status_flags, BitString):
return encode_application_bit_string(status_flags)
# Default: all-clear flags
return encode_application_bit_string(BitString(bytes([0x00]), unused_bits=4))