Source code for bac_py.services.file_access

"""Atomic file access services per ASHRAE 135-2016 Clause 14.

AtomicReadFile (Clause 14.1), AtomicWriteFile (Clause 14.2).
"""

from __future__ import annotations

from dataclasses import dataclass

from bac_py.encoding.primitives import (
    decode_object_identifier,
    decode_octet_string,
    decode_signed,
    decode_unsigned,
    encode_application_boolean,
    encode_application_object_id,
    encode_application_octet_string,
    encode_application_signed,
    encode_application_unsigned,
    encode_context_tagged,
    encode_signed,
)
from bac_py.encoding.tags import as_memoryview, decode_tag, encode_closing_tag, encode_opening_tag
from bac_py.types.enums import ObjectType
from bac_py.types.primitives import ObjectIdentifier

# --- AtomicReadFile ---


[docs] @dataclass(frozen=True, slots=True) class StreamReadAccess: """Stream access parameters for AtomicReadFile-Request.""" file_start_position: int requested_octet_count: int
[docs] @dataclass(frozen=True, slots=True) class RecordReadAccess: """Record access parameters for AtomicReadFile-Request.""" file_start_record: int requested_record_count: int
[docs] @dataclass(frozen=True, slots=True) class AtomicReadFileRequest: """AtomicReadFile-Request (Clause 14.1.1.1). :: AtomicReadFile-Request ::= SEQUENCE { fileIdentifier BACnetObjectIdentifier, accessMethod CHOICE { streamAccess [0] SEQUENCE { fileStartPosition INTEGER, requestedOctetCount Unsigned }, recordAccess [1] SEQUENCE { fileStartRecord INTEGER, requestedRecordCount Unsigned } } } """ file_identifier: ObjectIdentifier access_method: StreamReadAccess | RecordReadAccess
[docs] def encode(self) -> bytes: """Encode AtomicReadFile-Request service parameters. :returns: Encoded service request bytes. """ buf = bytearray() # fileIdentifier (APPLICATION-tagged) buf.extend( encode_application_object_id( self.file_identifier.object_type, self.file_identifier.instance_number, ) ) if isinstance(self.access_method, StreamReadAccess): buf.extend(encode_opening_tag(0)) buf.extend(encode_application_signed(self.access_method.file_start_position)) buf.extend(encode_application_unsigned(self.access_method.requested_octet_count)) buf.extend(encode_closing_tag(0)) else: buf.extend(encode_opening_tag(1)) buf.extend(encode_application_signed(self.access_method.file_start_record)) buf.extend(encode_application_unsigned(self.access_method.requested_record_count)) buf.extend(encode_closing_tag(1)) return bytes(buf)
[docs] @classmethod def decode(cls, data: memoryview | bytes) -> AtomicReadFileRequest: """Decode AtomicReadFile-Request from service request bytes. :param data: Raw service request bytes. :returns: Decoded :class:`AtomicReadFileRequest`. :raises ValueError: If the access method CHOICE tag is unrecognized. """ data = as_memoryview(data) offset = 0 # fileIdentifier (APPLICATION tag 12) tag, offset = decode_tag(data, offset) obj_type, instance = decode_object_identifier(data[offset : offset + tag.length]) offset += tag.length file_identifier = ObjectIdentifier(ObjectType(obj_type), instance) # accessMethod CHOICE tag, offset = decode_tag(data, offset) access_method: StreamReadAccess | RecordReadAccess if tag.number == 0 and tag.is_opening: # streamAccess t, offset = decode_tag(data, offset) file_start_position = decode_signed(data[offset : offset + t.length]) offset += t.length t, offset = decode_tag(data, offset) requested_octet_count = decode_unsigned(data[offset : offset + t.length]) offset += t.length _closing, offset = decode_tag(data, offset) access_method = StreamReadAccess(file_start_position, requested_octet_count) elif tag.number == 1 and tag.is_opening: # recordAccess t, offset = decode_tag(data, offset) file_start_record = decode_signed(data[offset : offset + t.length]) offset += t.length t, offset = decode_tag(data, offset) requested_record_count = decode_unsigned(data[offset : offset + t.length]) offset += t.length _closing, offset = decode_tag(data, offset) access_method = RecordReadAccess(file_start_record, requested_record_count) else: msg = f"Unexpected tag {tag.number} in AtomicReadFile CHOICE" raise ValueError(msg) return cls(file_identifier=file_identifier, access_method=access_method)
[docs] @dataclass(frozen=True, slots=True) class StreamReadACK: """Stream access result for AtomicReadFile-ACK.""" file_start_position: int file_data: bytes
[docs] @dataclass(frozen=True, slots=True) class RecordReadACK: """Record access result for AtomicReadFile-ACK.""" file_start_record: int returned_record_count: int file_record_data: list[bytes]
[docs] @dataclass(frozen=True, slots=True) class AtomicReadFileACK: """AtomicReadFile-ACK (Clause 14.1.1.2). :: AtomicReadFile-ACK ::= SEQUENCE { endOfFile BOOLEAN, accessMethod CHOICE { streamAccess [0] SEQUENCE { fileStartPosition INTEGER, fileData OCTET STRING }, recordAccess [1] SEQUENCE { fileStartRecord INTEGER, returnedRecordCount Unsigned, fileRecordData SEQUENCE OF OCTET STRING } } } """ end_of_file: bool access_method: StreamReadACK | RecordReadACK
[docs] def encode(self) -> bytes: """Encode AtomicReadFile-ACK service parameters. :returns: Encoded service ACK bytes. """ buf = bytearray() # endOfFile (APPLICATION-tagged boolean) buf.extend(encode_application_boolean(self.end_of_file)) if isinstance(self.access_method, StreamReadACK): buf.extend(encode_opening_tag(0)) buf.extend(encode_application_signed(self.access_method.file_start_position)) buf.extend(encode_application_octet_string(self.access_method.file_data)) buf.extend(encode_closing_tag(0)) else: buf.extend(encode_opening_tag(1)) buf.extend(encode_application_signed(self.access_method.file_start_record)) buf.extend(encode_application_unsigned(self.access_method.returned_record_count)) for record in self.access_method.file_record_data: buf.extend(encode_application_octet_string(record)) buf.extend(encode_closing_tag(1)) return bytes(buf)
[docs] @classmethod def decode(cls, data: memoryview | bytes) -> AtomicReadFileACK: """Decode AtomicReadFile-ACK from service ACK bytes. :param data: Raw service ACK bytes. :returns: Decoded :class:`AtomicReadFileACK`. :raises ValueError: If the access method CHOICE tag is unrecognized. """ data = as_memoryview(data) offset = 0 # endOfFile (APPLICATION-tagged boolean, tag 1) # Per Clause 20.2.3, the boolean value is in the tag's L/V/T field. tag, offset = decode_tag(data, offset) end_of_file = tag.is_boolean_true # accessMethod CHOICE tag, offset = decode_tag(data, offset) access_method: StreamReadACK | RecordReadACK if tag.number == 0 and tag.is_opening: # streamAccess t, offset = decode_tag(data, offset) file_start_position = decode_signed(data[offset : offset + t.length]) offset += t.length t, offset = decode_tag(data, offset) file_data = decode_octet_string(data[offset : offset + t.length]) offset += t.length _closing, offset = decode_tag(data, offset) access_method = StreamReadACK(file_start_position, file_data) elif tag.number == 1 and tag.is_opening: # recordAccess t, offset = decode_tag(data, offset) file_start_record = decode_signed(data[offset : offset + t.length]) offset += t.length t, offset = decode_tag(data, offset) returned_record_count = decode_unsigned(data[offset : offset + t.length]) offset += t.length file_record_data: list[bytes] = [] for _ in range(returned_record_count): t, offset = decode_tag(data, offset) file_record_data.append(decode_octet_string(data[offset : offset + t.length])) offset += t.length _closing, offset = decode_tag(data, offset) access_method = RecordReadACK( file_start_record, returned_record_count, file_record_data ) else: msg = f"Unexpected tag {tag.number} in AtomicReadFileACK CHOICE" raise ValueError(msg) return cls(end_of_file=end_of_file, access_method=access_method)
# --- AtomicWriteFile ---
[docs] @dataclass(frozen=True, slots=True) class StreamWriteAccess: """Stream access parameters for AtomicWriteFile-Request.""" file_start_position: int file_data: bytes
[docs] @dataclass(frozen=True, slots=True) class RecordWriteAccess: """Record access parameters for AtomicWriteFile-Request.""" file_start_record: int record_count: int file_record_data: list[bytes]
[docs] @dataclass(frozen=True, slots=True) class AtomicWriteFileRequest: """AtomicWriteFile-Request (Clause 14.2.1.1). :: AtomicWriteFile-Request ::= SEQUENCE { fileIdentifier BACnetObjectIdentifier, accessMethod CHOICE { streamAccess [0] SEQUENCE { fileStartPosition INTEGER, fileData OCTET STRING }, recordAccess [1] SEQUENCE { fileStartRecord INTEGER, recordCount Unsigned, fileRecordData SEQUENCE OF OCTET STRING } } } """ file_identifier: ObjectIdentifier access_method: StreamWriteAccess | RecordWriteAccess
[docs] def encode(self) -> bytes: """Encode AtomicWriteFile-Request service parameters. :returns: Encoded service request bytes. """ buf = bytearray() buf.extend( encode_application_object_id( self.file_identifier.object_type, self.file_identifier.instance_number, ) ) if isinstance(self.access_method, StreamWriteAccess): buf.extend(encode_opening_tag(0)) buf.extend(encode_application_signed(self.access_method.file_start_position)) buf.extend(encode_application_octet_string(self.access_method.file_data)) buf.extend(encode_closing_tag(0)) else: buf.extend(encode_opening_tag(1)) buf.extend(encode_application_signed(self.access_method.file_start_record)) buf.extend(encode_application_unsigned(self.access_method.record_count)) for record in self.access_method.file_record_data: buf.extend(encode_application_octet_string(record)) buf.extend(encode_closing_tag(1)) return bytes(buf)
[docs] @classmethod def decode(cls, data: memoryview | bytes) -> AtomicWriteFileRequest: """Decode AtomicWriteFile-Request from service request bytes. :param data: Raw service request bytes. :returns: Decoded :class:`AtomicWriteFileRequest`. :raises ValueError: If the access method CHOICE tag is unrecognized. """ data = as_memoryview(data) offset = 0 # fileIdentifier (APPLICATION tag 12) tag, offset = decode_tag(data, offset) obj_type, instance = decode_object_identifier(data[offset : offset + tag.length]) offset += tag.length file_identifier = ObjectIdentifier(ObjectType(obj_type), instance) # accessMethod CHOICE tag, offset = decode_tag(data, offset) access_method: StreamWriteAccess | RecordWriteAccess if tag.number == 0 and tag.is_opening: # streamAccess t, offset = decode_tag(data, offset) file_start_position = decode_signed(data[offset : offset + t.length]) offset += t.length t, offset = decode_tag(data, offset) file_data = decode_octet_string(data[offset : offset + t.length]) offset += t.length _closing, offset = decode_tag(data, offset) access_method = StreamWriteAccess(file_start_position, file_data) elif tag.number == 1 and tag.is_opening: # recordAccess t, offset = decode_tag(data, offset) file_start_record = decode_signed(data[offset : offset + t.length]) offset += t.length t, offset = decode_tag(data, offset) record_count = decode_unsigned(data[offset : offset + t.length]) offset += t.length file_record_data: list[bytes] = [] for _ in range(record_count): t, offset = decode_tag(data, offset) file_record_data.append(decode_octet_string(data[offset : offset + t.length])) offset += t.length _closing, offset = decode_tag(data, offset) access_method = RecordWriteAccess(file_start_record, record_count, file_record_data) else: msg = f"Unexpected tag {tag.number} in AtomicWriteFile CHOICE" raise ValueError(msg) return cls(file_identifier=file_identifier, access_method=access_method)
[docs] @dataclass(frozen=True, slots=True) class AtomicWriteFileACK: """AtomicWriteFile-ACK (Clause 14.2.1.2). :: AtomicWriteFile-ACK ::= CHOICE { fileStartPosition [0] INTEGER, fileStartRecord [1] INTEGER } """ is_stream: bool file_start: int
[docs] def encode(self) -> bytes: """Encode AtomicWriteFile-ACK service parameters. :returns: Encoded service ACK bytes. """ tag_number = 0 if self.is_stream else 1 return encode_context_tagged(tag_number, encode_signed(self.file_start))
[docs] @classmethod def decode(cls, data: memoryview | bytes) -> AtomicWriteFileACK: """Decode AtomicWriteFile-ACK from service ACK bytes. :param data: Raw service ACK bytes. :returns: Decoded :class:`AtomicWriteFileACK`. """ data = as_memoryview(data) offset = 0 tag, offset = decode_tag(data, offset) is_stream = tag.number == 0 file_start = decode_signed(data[offset : offset + tag.length]) return cls(is_stream=is_stream, file_start=file_start)