"""Audit logging manager per ASHRAE 135-2020 Clause 19.6.
Intercepts auditable operations and records them into Audit Log objects.
"""
from __future__ import annotations
import logging
from typing import TYPE_CHECKING
from bac_py.objects.audit_log import AuditLogObject
from bac_py.objects.audit_reporter import AuditReporterObject
from bac_py.types.audit_types import BACnetAuditNotification
from bac_py.types.enums import (
AuditLevel,
AuditOperation,
ObjectType,
PropertyIdentifier,
)
from bac_py.types.primitives import ObjectIdentifier
if TYPE_CHECKING:
from bac_py.objects.base import ObjectDatabase
logger = logging.getLogger(__name__)
[docs]
class AuditManager:
"""Manages audit logging per Clause 19.6.
Intercepts auditable operations (writes, creates, deletes, etc.)
and records them into Audit Log objects. Checks audit level and
auditable operations filters before recording.
"""
def __init__(self, object_db: ObjectDatabase) -> None:
self._db = object_db
logger.info("AuditManager started")
[docs]
def record_operation(
self,
operation: AuditOperation,
source_device: ObjectIdentifier | None = None,
target_object: ObjectIdentifier | None = None,
target_property: int | None = None,
target_array_index: int | None = None,
target_priority: int | None = None,
target_value: bytes | None = None,
current_value: bytes | None = None,
invoke_id: int | None = None,
result_error: tuple[int, int] | None = None,
source_comment: str | None = None,
target_comment: str | None = None,
) -> None:
"""Check audit config and record if auditable.
1. Find Audit Reporter object(s)
2. Resolve effective audit level
3. Check auditable_operations bitstring filter
4. Construct BACnetAuditNotification
5. Append to local Audit Log buffer
:param operation: The audit operation being recorded.
:param source_device: Object identifier of the device that initiated
the operation, or ``None`` if unknown.
:param target_object: Object identifier of the affected object.
:param target_property: Property identifier value of the affected
property, or ``None`` if not property-specific.
:param target_array_index: Array index within the property, or
``None`` if not applicable.
:param target_priority: Priority level for commandable writes, or
``None`` if not applicable.
:param target_value: Encoded bytes of the new value written, or
``None`` if not applicable.
:param current_value: Encoded bytes of the value before the
operation, or ``None`` if not captured.
:param invoke_id: BACnet invoke ID from the request, or ``None``.
:param result_error: Error class and code tuple if the operation
failed, or ``None`` on success.
:param source_comment: Free-text comment from the source device.
:param target_comment: Free-text comment about the target.
"""
reporters = self._find_reporters(target_object)
if not reporters:
return
for reporter in reporters:
if not self._should_audit(reporter, operation):
continue
# Build the notification
result_error_class = result_error[0] if result_error else None
result_error_code = result_error[1] if result_error else None
# Find the target device from our device object
target_device = None
device_objects = self._db.get_objects_of_type(ObjectType.DEVICE)
if device_objects:
target_device = device_objects[0].object_identifier
notification = BACnetAuditNotification(
operation=operation,
source_device=source_device,
target_device=target_device,
target_object=target_object,
target_property=target_property,
target_array_index=target_array_index,
target_priority=target_priority,
target_value=target_value,
current_value=current_value,
invoke_id=invoke_id,
result_error_class=result_error_class,
result_error_code=result_error_code,
source_comment=source_comment,
target_comment=target_comment,
)
logger.debug("audit record: %s on %s", operation, target_object)
self._append_to_logs(notification)
def _find_reporters(self, target_oid: ObjectIdentifier | None) -> list[AuditReporterObject]:
"""Find Audit Reporter objects that monitor the target object."""
reporters: list[AuditReporterObject] = []
for obj in self._db.get_objects_of_type(ObjectType.AUDIT_REPORTER):
if not isinstance(obj, AuditReporterObject):
continue
monitored: list[object] = obj._properties.get(PropertyIdentifier.MONITORED_OBJECTS, [])
if not monitored:
# Empty monitored list means monitor everything
reporters.append(obj)
elif target_oid is not None:
# Check if the target is in the monitored list
for m in monitored:
if isinstance(m, ObjectIdentifier) and m == target_oid:
reporters.append(obj)
break
else:
# Also check by object type match
for m in monitored:
if (
isinstance(m, ObjectIdentifier)
and m.object_type == target_oid.object_type
and m.instance_number == 0x3FFFFF
):
reporters.append(obj)
break
return reporters
def _resolve_audit_level(self, reporter: AuditReporterObject) -> AuditLevel:
"""Resolve effective audit level from the reporter."""
level = reporter._properties.get(PropertyIdentifier.AUDIT_LEVEL, AuditLevel.DEFAULT)
try:
return AuditLevel(level)
except ValueError:
return AuditLevel.NONE
def _should_audit(
self,
reporter: AuditReporterObject,
operation: AuditOperation,
) -> bool:
"""Check whether this operation should be audited.
Checks audit level and auditable_operations bitstring filter.
"""
level = self._resolve_audit_level(reporter)
if level == AuditLevel.NONE:
return False
if level == AuditLevel.AUDIT_CONFIG:
# Only audit config changes: WRITE, CREATE, DELETE
config_ops = {
AuditOperation.WRITE,
AuditOperation.CREATE,
AuditOperation.DELETE,
}
if operation not in config_ops:
return False
# Check auditable_operations bitstring
auditable_ops = reporter._properties.get(PropertyIdentifier.AUDITABLE_OPERATIONS)
if auditable_ops is not None and hasattr(auditable_ops, "data"):
op_bit = int(operation)
byte_index = op_bit // 8
bit_index = 7 - (op_bit % 8)
if byte_index < len(auditable_ops.data):
if not (auditable_ops.data[byte_index] & (1 << bit_index)):
return False
else:
# Bit position beyond the bitstring length means not set
return False
return True
def _append_to_logs(self, notification: BACnetAuditNotification) -> None:
"""Append notification to all enabled Audit Log objects."""
for obj in self._db.get_objects_of_type(ObjectType.AUDIT_LOG):
if isinstance(obj, AuditLogObject):
obj.append_record(notification)