Source code for bac_py.objects.audit_log

"""BACnet Audit Log object per ASHRAE 135-2020 Clause 12.64 (new in 2020)."""

from __future__ import annotations

from typing import Any, ClassVar

from bac_py.objects.base import (
    BACnetObject,
    PropertyAccess,
    PropertyDefinition,
    register_object_type,
    standard_properties,
    status_properties,
)
from bac_py.types.audit_types import BACnetAuditLogRecord, BACnetAuditNotification
from bac_py.types.enums import (
    AuditLevel,
    ObjectType,
    PropertyIdentifier,
)
from bac_py.types.primitives import BitString


@register_object_type
class AuditLogObject(BACnetObject):
    """BACnet Audit Log object (Clause 12.64, new in 2020).

    Holds audit log records in a bounded list stored under the LOG_BUFFER
    property. Each appended record gets the next value of a per-object
    sequence counter that starts at 1. When the buffer is at capacity the
    object either drops the oldest records (circular mode) or rejects new
    records, depending on STOP_WHEN_FULL.
    """

    OBJECT_TYPE: ClassVar[ObjectType] = ObjectType.AUDIT_LOG

    PROPERTY_DEFINITIONS: ClassVar[dict[PropertyIdentifier, PropertyDefinition]] = {
        **standard_properties(),
        **status_properties(include_out_of_service=False),
        PropertyIdentifier.LOG_ENABLE: PropertyDefinition(
            PropertyIdentifier.LOG_ENABLE,
            bool,
            PropertyAccess.READ_WRITE,
            required=True,
            default=False,
        ),
        PropertyIdentifier.LOG_BUFFER: PropertyDefinition(
            PropertyIdentifier.LOG_BUFFER,
            list,
            PropertyAccess.READ_ONLY,
            required=True,
        ),
        PropertyIdentifier.RECORD_COUNT: PropertyDefinition(
            PropertyIdentifier.RECORD_COUNT,
            int,
            PropertyAccess.READ_WRITE,
            required=True,
            default=0,
        ),
        PropertyIdentifier.TOTAL_RECORD_COUNT: PropertyDefinition(
            PropertyIdentifier.TOTAL_RECORD_COUNT,
            int,
            PropertyAccess.READ_ONLY,
            required=True,
            default=0,
        ),
        PropertyIdentifier.BUFFER_SIZE: PropertyDefinition(
            PropertyIdentifier.BUFFER_SIZE,
            int,
            PropertyAccess.READ_WRITE,
            required=True,
            default=100,
        ),
        PropertyIdentifier.STOP_WHEN_FULL: PropertyDefinition(
            PropertyIdentifier.STOP_WHEN_FULL,
            bool,
            PropertyAccess.READ_WRITE,
            required=True,
            default=False,
        ),
        PropertyIdentifier.AUDIT_LEVEL: PropertyDefinition(
            PropertyIdentifier.AUDIT_LEVEL,
            int,
            PropertyAccess.READ_WRITE,
            required=True,
            default=AuditLevel.DEFAULT,
        ),
        PropertyIdentifier.AUDITABLE_OPERATIONS: PropertyDefinition(
            PropertyIdentifier.AUDITABLE_OPERATIONS,
            BitString,
            PropertyAccess.READ_WRITE,
            required=True,
        ),
    }

    def __init__(self, instance_number: int, **initial_properties: Any) -> None:
        """Create the object and prepare an empty log.

        :param instance_number: Instance number forwarded to the base class.
        :param initial_properties: Initial property values forwarded to the
            base class.
        """
        super().__init__(instance_number, **initial_properties)
        self._init_status_flags()
        # NOTE(review): _set_default is inherited; presumably it only installs
        # the empty list when the caller did not supply LOG_BUFFER -- confirm.
        self._set_default(PropertyIdentifier.LOG_BUFFER, [])
        # Last sequence number handed out; the first record is numbered 1.
        self._sequence_counter = 0

    def append_record(
        self, notification: BACnetAuditNotification
    ) -> BACnetAuditLogRecord | None:
        """Append an audit notification to the log buffer.

        Nothing is logged (and ``None`` is returned) when LOG_ENABLE is false,
        when BUFFER_SIZE is not positive, or when STOP_WHEN_FULL is set and
        the buffer already holds BUFFER_SIZE records. Otherwise the oldest
        records are dropped as needed so the buffer never exceeds BUFFER_SIZE
        after the append, and RECORD_COUNT / TOTAL_RECORD_COUNT are refreshed.

        :param notification: The audit notification to wrap in a log record.
        :returns: The newly stored record, or ``None`` if nothing was added.
        """
        if not self._properties.get(PropertyIdentifier.LOG_ENABLE, False):
            return None

        buffer: list[BACnetAuditLogRecord] | None = self._properties.get(
            PropertyIdentifier.LOG_BUFFER
        )
        if buffer is None:
            # Store the new list so the appended record is not lost in a
            # temporary while the counters still advance.
            buffer = []
            self._properties[PropertyIdentifier.LOG_BUFFER] = buffer

        buffer_size = self._properties.get(PropertyIdentifier.BUFFER_SIZE, 100)
        stop_when_full = self._properties.get(PropertyIdentifier.STOP_WHEN_FULL, False)

        # A non-positive capacity cannot hold any record; previously this
        # path tried to pop from an empty list and raised IndexError.
        if buffer_size <= 0:
            return None
        if stop_when_full and len(buffer) >= buffer_size:
            return None

        self._sequence_counter += 1
        record = BACnetAuditLogRecord(
            sequence_number=self._sequence_counter,
            notification=notification,
        )

        # Circular mode: make room for exactly one new record. Normally this
        # drops a single oldest entry; if BUFFER_SIZE was lowered after
        # records were stored it drops enough to honour the new capacity.
        overflow = len(buffer) - buffer_size + 1
        if overflow > 0:
            del buffer[:overflow]
        buffer.append(record)

        self._properties[PropertyIdentifier.RECORD_COUNT] = len(buffer)
        self._properties[PropertyIdentifier.TOTAL_RECORD_COUNT] = self._sequence_counter
        return record

    def query_records(
        self,
        start_at: int | None = None,
        count: int = 100,
    ) -> tuple[list[BACnetAuditLogRecord], bool]:
        """Query records from the log buffer.

        :param start_at: Starting sequence number (inclusive). If None,
            start from the beginning of the buffer.
        :param count: Maximum number of records to return. Negative values
            are treated as 0.
        :returns: Tuple of (matching records, no_more_items flag). The flag is
            true when every matching record fit within ``count``.
        """
        buffer: list[BACnetAuditLogRecord] = self._properties.get(
            PropertyIdentifier.LOG_BUFFER, []
        )
        # A negative limit would otherwise slice from the end of the list and
        # silently return all but the last |count| records.
        limit = max(count, 0)

        if start_at is None:
            matching = list(buffer)
        else:
            matching = [r for r in buffer if r.sequence_number >= start_at]

        return matching[:limit], len(matching) <= limit