"""Virtual terminal services per ASHRAE 135-2020 Clause 17.
VT-Open (Clause 17.1), VT-Close (Clause 17.2), VT-Data (Clause 17.3).
"""
from __future__ import annotations
from dataclasses import dataclass
from bac_py.encoding.primitives import (
decode_octet_string,
decode_unsigned,
encode_application_boolean,
encode_application_enumerated,
encode_application_octet_string,
encode_application_unsigned,
)
from bac_py.encoding.tags import as_memoryview, decode_tag
from bac_py.types.enums import VTClass
_MAX_DECODED_ITEMS = 10_000
[docs]
@dataclass(frozen=True, slots=True)
class VTOpenRequest:
"""VT-Open-Request (Clause 17.1.1).
::
VT-Open-Request ::= SEQUENCE {
vtClass BACnetVTClass,
localVTSessionIdentifier Unsigned8
}
"""
vt_class: VTClass
local_vt_session_identifier: int
[docs]
def encode(self) -> bytes:
"""Encode VT-Open-Request service parameters."""
buf = bytearray()
buf.extend(encode_application_enumerated(self.vt_class))
buf.extend(encode_application_unsigned(self.local_vt_session_identifier))
return bytes(buf)
[docs]
@classmethod
def decode(cls, data: memoryview | bytes) -> VTOpenRequest:
"""Decode VT-Open-Request from service request bytes."""
data = as_memoryview(data)
offset = 0
tag, offset = decode_tag(data, offset)
vt_class = VTClass(decode_unsigned(data[offset : offset + tag.length]))
offset += tag.length
tag, offset = decode_tag(data, offset)
local_id = decode_unsigned(data[offset : offset + tag.length])
return cls(vt_class=vt_class, local_vt_session_identifier=local_id)
[docs]
@dataclass(frozen=True, slots=True)
class VTOpenACK:
"""VT-Open-ACK (Clause 17.1.2).
::
VT-Open-ACK ::= SEQUENCE {
remoteVTSessionIdentifier Unsigned8
}
"""
remote_vt_session_identifier: int
[docs]
def encode(self) -> bytes:
"""Encode VT-Open-ACK service parameters."""
return bytes(encode_application_unsigned(self.remote_vt_session_identifier))
[docs]
@classmethod
def decode(cls, data: memoryview | bytes) -> VTOpenACK:
"""Decode VT-Open-ACK from service request bytes."""
data = as_memoryview(data)
tag, offset = decode_tag(data, 0)
remote_id = decode_unsigned(data[offset : offset + tag.length])
return cls(remote_vt_session_identifier=remote_id)
[docs]
@dataclass(frozen=True, slots=True)
class VTCloseRequest:
"""VT-Close-Request (Clause 17.2.1).
::
VT-Close-Request ::= SEQUENCE {
listOfRemoteVTSessionIdentifiers SEQUENCE OF Unsigned8
}
"""
list_of_remote_vt_session_identifiers: list[int]
[docs]
def encode(self) -> bytes:
"""Encode VT-Close-Request service parameters."""
buf = bytearray()
for session_id in self.list_of_remote_vt_session_identifiers:
buf.extend(encode_application_unsigned(session_id))
return bytes(buf)
[docs]
@classmethod
def decode(cls, data: memoryview | bytes) -> VTCloseRequest:
"""Decode VT-Close-Request from service request bytes."""
data = as_memoryview(data)
offset = 0
identifiers: list[int] = []
while offset < len(data):
tag, offset = decode_tag(data, offset)
identifiers.append(decode_unsigned(data[offset : offset + tag.length]))
offset += tag.length
if len(identifiers) >= _MAX_DECODED_ITEMS:
msg = f"Decoded item count exceeds limit ({_MAX_DECODED_ITEMS})"
raise ValueError(msg)
return cls(list_of_remote_vt_session_identifiers=identifiers)
[docs]
@dataclass(frozen=True, slots=True)
class VTDataRequest:
"""VT-Data-Request (Clause 17.3.1).
::
VT-Data-Request ::= SEQUENCE {
vtSessionIdentifier Unsigned8,
vtNewData OCTET STRING,
vtDataFlag BOOLEAN
}
"""
vt_session_identifier: int
vt_new_data: bytes
vt_data_flag: bool
[docs]
def encode(self) -> bytes:
"""Encode VT-Data-Request service parameters."""
buf = bytearray()
buf.extend(encode_application_unsigned(self.vt_session_identifier))
buf.extend(encode_application_octet_string(self.vt_new_data))
buf.extend(encode_application_boolean(self.vt_data_flag))
return bytes(buf)
[docs]
@classmethod
def decode(cls, data: memoryview | bytes) -> VTDataRequest:
"""Decode VT-Data-Request from service request bytes."""
data = as_memoryview(data)
offset = 0
tag, offset = decode_tag(data, offset)
session_id = decode_unsigned(data[offset : offset + tag.length])
offset += tag.length
tag, offset = decode_tag(data, offset)
vt_new_data = decode_octet_string(data[offset : offset + tag.length])
offset += tag.length
# Application-tagged boolean: value is in the tag L/V/T field
tag, offset = decode_tag(data, offset)
vt_data_flag = tag.is_boolean_true
return cls(
vt_session_identifier=session_id,
vt_new_data=vt_new_data,
vt_data_flag=vt_data_flag,
)
[docs]
@dataclass(frozen=True, slots=True)
class VTDataACK:
"""VT-Data-ACK (Clause 17.3.2).
::
VT-Data-ACK ::= SEQUENCE {
allNewDataAccepted BOOLEAN,
acceptedOctetCount Unsigned OPTIONAL
}
``accepted_octet_count`` is present only when
``all_new_data_accepted`` is ``False``.
"""
all_new_data_accepted: bool
accepted_octet_count: int | None = None
[docs]
def encode(self) -> bytes:
"""Encode VT-Data-ACK service parameters."""
buf = bytearray()
buf.extend(encode_application_boolean(self.all_new_data_accepted))
if not self.all_new_data_accepted and self.accepted_octet_count is not None:
buf.extend(encode_application_unsigned(self.accepted_octet_count))
return bytes(buf)
[docs]
@classmethod
def decode(cls, data: memoryview | bytes) -> VTDataACK:
"""Decode VT-Data-ACK from service request bytes."""
data = as_memoryview(data)
offset = 0
# Application-tagged boolean: value is in the tag L/V/T field
tag, offset = decode_tag(data, offset)
all_accepted = tag.is_boolean_true
accepted_count = None
if offset < len(data):
tag, offset = decode_tag(data, offset)
accepted_count = decode_unsigned(data[offset : offset + tag.length])
return cls(
all_new_data_accepted=all_accepted,
accepted_octet_count=accepted_count,
)