Source code for bac_py.transport.sc.tls

"""TLS context builder for BACnet/SC (AB.7.4).

Provides helpers to create ``ssl.SSLContext`` objects for client and server
sides of BACnet/SC WebSocket connections, enforcing TLS 1.3 with mutual
authentication per the Annex AB requirements.

Optional dependency: ``cryptography`` for certificate inspection utilities.
"""

from __future__ import annotations

import logging
import ssl
from dataclasses import dataclass, field
from pathlib import Path

logger = logging.getLogger(__name__)


@dataclass
class SCTLSConfig:
    """TLS configuration for a BACnet/SC node.

    :param private_key_path: PEM file containing the device's private key.
    :param certificate_path: PEM file containing the device's operational
        certificate.
    :param ca_certificates_path: PEM file (or colon-separated list of PEM
        files) containing the trusted CA certificates.
    :param allow_plaintext: If True, allow ``ws://`` connections (testing
        only — production BACnet/SC requires TLS 1.3).
    :param extra_ca_paths: Additional trusted CA locations, supplied as a
        list instead of a delimited string. Each entry may name a PEM file
        or a CA directory. Defaults to an empty list (a fresh list per
        instance).
    :param key_password: Optional passphrase for the private key PEM file.
        Use ``bytes`` or a callable returning ``bytes`` for programmatic
        retrieval (e.g., from a vault or environment variable).
    :param verify_depth: Desired maximum certificate chain verification
        depth. BACnet PKI chains are typically short (device → issuing CA →
        root). Default 4 allows one intermediate plus some headroom.
        Reserved for future use — Python's ``ssl`` module does not yet
        expose OpenSSL's ``SSL_CTX_set_verify_depth``.
    """

    private_key_path: str | None = None
    certificate_path: str | None = None
    ca_certificates_path: str | None = None
    allow_plaintext: bool = False
    extra_ca_paths: list[str] = field(default_factory=list)
    key_password: bytes | str | None = None
    verify_depth: int = 4

    def __repr__(self) -> str:
        """Return a debug representation with secrets redacted.

        The private key path and the key password are replaced by a
        ``'<REDACTED>'`` placeholder when set (and shown as ``None``
        otherwise) so they cannot leak through logs or tracebacks. Every
        other field is rendered with its real value, in declaration order,
        so that two configurations with different trust settings never
        print identically.
        """
        key_display = "'<REDACTED>'" if self.private_key_path else "None"
        pw_display = "'<REDACTED>'" if self.key_password else "None"
        return (
            f"SCTLSConfig(private_key_path={key_display}, "
            f"certificate_path={self.certificate_path!r}, "
            f"ca_certificates_path={self.ca_certificates_path!r}, "
            f"allow_plaintext={self.allow_plaintext!r}, "
            f"extra_ca_paths={self.extra_ca_paths!r}, "
            f"key_password={pw_display}, "
            f"verify_depth={self.verify_depth!r})"
        )
def build_client_ssl_context(config: SCTLSConfig) -> ssl.SSLContext | None:
    """Build a TLS 1.3 client context with mutual authentication.

    The context is created with ``ssl.PROTOCOL_TLS_CLIENT``, restricted to
    TLS 1.3 as the minimum version, and has ``VERIFY_X509_STRICT`` added to
    its verify flags. Trusted CAs come exclusively from *config* (via
    ``_load_ca_certs``).

    :param config: TLS settings for this node.
    :returns: The configured context, or ``None`` when
        *config.allow_plaintext* is True and *config.certificate_path* is
        not set (plaintext mode; a warning is logged).
    :raises: Any error raised by ``ssl.SSLContext.load_cert_chain`` or
        ``load_verify_locations`` propagates unchanged.
    """
    if config.allow_plaintext and not config.certificate_path:
        logger.warning(
            "SC TLS disabled: allow_plaintext=True with no certificate. "
            "BACnet/SC requires TLS 1.3 with mutual authentication in production "
            "(ASHRAE 135-2020 Annex AB.7.4). Never use plaintext in production."
        )
        return None

    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    ctx.minimum_version = ssl.TLSVersion.TLSv1_3
    ctx.verify_flags |= ssl.VERIFY_X509_STRICT

    # Tracks whether this side can actually present a certificate, so the
    # summary log below reports the real state instead of always "True".
    identity_loaded = False
    if config.certificate_path and config.private_key_path:
        logger.debug("SC TLS loading client cert: %s", config.certificate_path)
        password = _resolve_password(config.key_password)
        ctx.load_cert_chain(
            config.certificate_path, config.private_key_path, password=password
        )
        identity_loaded = True
    elif config.certificate_path:
        # Deliberately a warning rather than an error: the context is still
        # returned, it just cannot authenticate itself to the peer.
        logger.warning(
            "SC TLS certificate_path is set but private_key_path is missing — "
            "mutual authentication will not work"
        )
    elif config.private_key_path:
        logger.warning(
            "SC TLS private_key_path is set but certificate_path is missing — "
            "mutual authentication will not work"
        )

    _load_ca_certs(ctx, config)
    logger.info("SC TLS client context created: mutual_auth=%s", identity_loaded)
    return ctx
def build_server_ssl_context(config: SCTLSConfig) -> ssl.SSLContext | None:
    """Build a TLS 1.3 server context with client certificate verification.

    The context is created with ``ssl.PROTOCOL_TLS_SERVER``, restricted to
    TLS 1.3 as the minimum version, requires a peer certificate
    (``ssl.CERT_REQUIRED``), and has ``VERIFY_X509_STRICT`` added to its
    verify flags. Trusted CAs come exclusively from *config* (via
    ``_load_ca_certs``).

    :param config: TLS settings for this node.
    :returns: The configured context, or ``None`` when
        *config.allow_plaintext* is True and *config.certificate_path* is
        not set (plaintext mode; a warning is logged).
    :raises: Any error raised by ``ssl.SSLContext.load_cert_chain`` or
        ``load_verify_locations`` propagates unchanged.
    """
    if config.allow_plaintext and not config.certificate_path:
        logger.warning(
            "SC TLS disabled: allow_plaintext=True with no certificate. "
            "BACnet/SC requires TLS 1.3 with mutual authentication in production "
            "(ASHRAE 135-2020 Annex AB.7.4). Never use plaintext in production."
        )
        return None

    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    ctx.minimum_version = ssl.TLSVersion.TLSv1_3
    # Server contexts do not verify peers by default; require a client cert.
    ctx.verify_mode = ssl.CERT_REQUIRED
    ctx.verify_flags |= ssl.VERIFY_X509_STRICT

    # Tracks whether the server certificate/key pair was actually loaded, so
    # the summary log below reports the real state instead of always "True".
    identity_loaded = False
    if config.certificate_path and config.private_key_path:
        logger.debug("SC TLS loading server cert: %s", config.certificate_path)
        password = _resolve_password(config.key_password)
        ctx.load_cert_chain(
            config.certificate_path, config.private_key_path, password=password
        )
        identity_loaded = True
    elif config.certificate_path:
        # Deliberately a warning rather than an error: the context is still
        # returned even though it has no usable identity.
        logger.warning(
            "SC TLS certificate_path is set but private_key_path is missing — "
            "mutual authentication will not work"
        )
    elif config.private_key_path:
        logger.warning(
            "SC TLS private_key_path is set but certificate_path is missing — "
            "mutual authentication will not work"
        )

    _load_ca_certs(ctx, config)
    logger.info("SC TLS server context created: mutual_auth=%s", identity_loaded)
    return ctx
def _resolve_password(key_password: bytes | str | None) -> bytes | None:
    """Convert key_password to bytes for ssl.SSLContext.load_cert_chain().

    :param key_password: Passphrase as configured by the user.
    :returns: ``None`` when no passphrase is configured, the UTF-8 encoding
        of a ``str`` passphrase, and any other value unchanged.
    """
    if isinstance(key_password, str):
        return key_password.encode("utf-8")
    # None and non-str values (e.g. bytes) are handed through as-is.
    return key_password


def _load_ca_certs(ctx: ssl.SSLContext, config: SCTLSConfig) -> None:
    """Load CA certificates from config into the SSL context.

    BACnet/SC devices must only trust explicitly configured CAs — never the
    system certificate store. When no CA paths are provided we log a warning
    but intentionally do **not** call ``ctx.load_default_certs()`` so that
    peer verification will fail rather than silently trusting arbitrary CAs.

    Candidate paths are the colon-separated entries of
    ``config.ca_certificates_path`` followed by ``config.extra_ca_paths``.
    Each is stripped of surrounding whitespace; blank entries are dropped.
    Files are loaded as ``cafile``, directories as ``capath``, and anything
    else is reported with a warning and skipped.

    :param ctx: Context that receives the trust anchors.
    :param config: Source of the CA path settings.
    """
    candidates: list[str] = []
    if config.ca_certificates_path:
        # NOTE: splitting on ":" also splits Windows drive-letter paths
        # ("C:\\..."); such paths need to be passed via extra_ca_paths.
        candidates.extend(config.ca_certificates_path.split(":"))
    candidates.extend(config.extra_ca_paths)

    # Drop blank entries (e.g. from a trailing ":"): Path("") resolves to the
    # current directory, which would otherwise be trusted as a CA directory.
    paths = [entry for entry in (raw.strip() for raw in candidates) if entry]

    if not paths:
        logger.warning(
            "SC TLS no CA certificates configured — peer certificate "
            "verification will fail. BACnet/SC requires explicitly configured "
            "CA certificates; system CAs are intentionally not trusted."
        )
        return

    for ca_path in paths:
        p = Path(ca_path)
        if p.is_file():
            logger.debug("SC TLS loading CA file: %s", p)
            ctx.load_verify_locations(cafile=str(p))
        elif p.is_dir():
            logger.debug("SC TLS loading CA directory: %s", p)
            ctx.load_verify_locations(capath=str(p))
        else:
            # Surface misconfiguration instead of silently trusting nothing.
            logger.warning(
                "SC TLS CA path does not exist or is not a file/directory: %s", p
            )