Source code for telnetlib3.mud

"""
MUD telnet protocol encoding and decoding utilities.

Provides encode/decode functions for:
- GMCP (Generic MUD Communication Protocol, option 201)
- MSDP (MUD Server Data Protocol, option 69)
- MSSP (MUD Server Status Protocol, option 70)
- ZMP (Zenith MUD Protocol, option 93)
- ATCP (Achaea Telnet Client Protocol, option 200)
- AARDWOLF (Aardwolf protocol, option 102)

All encode functions return the payload bytes only (the content between
``IAC SB <option>`` and ``IAC SE``). The caller is responsible for
framing with subnegotiation markers.
"""

from __future__ import annotations

# std imports
import json
from typing import Any

# local
from .telopt import (
    MSDP_VAL,
    MSDP_VAR,
    MSSP_VAL,
    MSSP_VAR,
    MSDP_ARRAY_OPEN,
    MSDP_TABLE_OPEN,
    MSDP_ARRAY_CLOSE,
    MSDP_TABLE_CLOSE,
)

__all__ = (
    "gmcp_encode",
    "gmcp_decode",
    "msdp_encode",
    "msdp_decode",
    "mssp_encode",
    "mssp_decode",
    "MsdpParser",
    "zmp_decode",
    "atcp_decode",
    "aardwolf_decode",
)


def _decode_best_effort(buf: bytes, encoding: str = "utf-8") -> str:
    """
    Decode bytes trying *encoding* first, falling back to latin-1.

    :param buf: Raw bytes to decode.
    :param encoding: Primary encoding to attempt.
    :returns: Decoded string.
    """
    try:
        return buf.decode(encoding)
    except (UnicodeDecodeError, LookupError):
        return buf.decode("latin-1")


[docs] def gmcp_encode(package: str, data: Any = None) -> bytes: """ Encode a GMCP message. :param package: GMCP package name (e.g., "Char.Vitals") :param data: Optional data to encode as JSON :returns: Encoded GMCP payload bytes """ if data is None: return package.encode("utf-8") return package.encode("utf-8") + b" " + json.dumps(data, separators=(",", ":")).encode("utf-8")
[docs] def gmcp_decode(buf: bytes, encoding: str = "utf-8") -> tuple[str, Any]: """ Decode a GMCP payload. :param buf: GMCP payload bytes :param encoding: Character encoding to try first, falls back to latin-1. :returns: Tuple of (package, data), where data is None if no JSON present :raises ValueError: If JSON is malformed """ parts = buf.split(b" ", 1) if len(parts) == 1: return (_decode_best_effort(buf, encoding), None) package = _decode_best_effort(parts[0], encoding) text = _decode_best_effort(parts[1], encoding).strip() if not text: return (package, None) try: data = json.loads(text) except json.JSONDecodeError as exc: raise ValueError(f"Invalid JSON in GMCP payload: {exc}") from exc return (package, data)
[docs] def msdp_encode(variables: dict[str, Any]) -> bytes: """ Encode variables to MSDP wire format. :param variables: Dictionary of variable names to values :returns: Encoded MSDP payload bytes """ def encode_value(value: Any) -> bytes: """Encode a single MSDP value.""" if isinstance(value, dict): result = MSDP_TABLE_OPEN for key, val in value.items(): result += MSDP_VAR + key.encode("utf-8") + MSDP_VAL + encode_value(val) result += MSDP_TABLE_CLOSE return result if isinstance(value, list): result = MSDP_ARRAY_OPEN for item in value: result += MSDP_VAL + encode_value(item) result += MSDP_ARRAY_CLOSE return result return str(value).encode("utf-8") result = b"" for key, value in variables.items(): result += MSDP_VAR + key.encode("utf-8") + MSDP_VAL + encode_value(value) return result
[docs] class MsdpParser: """State machine for parsing MSDP wire bytes.""" _DELIMITERS = (MSDP_VAR, MSDP_VAL, MSDP_TABLE_CLOSE, MSDP_ARRAY_CLOSE) def __init__(self, buf: bytes, encoding: str = "utf-8") -> None: """Initialize parser with raw MSDP buffer.""" self.buf = buf self.idx = 0 self.encoding = encoding def _read_string(self) -> str: start = self.idx while ( self.idx < len(self.buf) and self.buf[self.idx : self.idx + 1] not in self._DELIMITERS ): self.idx += 1 return _decode_best_effort(self.buf[start : self.idx], self.encoding) def _read_key(self) -> str: start = self.idx while self.idx < len(self.buf) and self.buf[self.idx : self.idx + 1] not in ( MSDP_VAL, MSDP_VAR, ): self.idx += 1 return _decode_best_effort(self.buf[start : self.idx], self.encoding) def _parse_table(self) -> dict[str, Any]: table: dict[str, Any] = {} while self.idx < len(self.buf) and self.buf[self.idx : self.idx + 1] != MSDP_TABLE_CLOSE: if self.buf[self.idx : self.idx + 1] == MSDP_VAR: self.idx += 1 key = self._read_key() if self.idx < len(self.buf) and self.buf[self.idx : self.idx + 1] == MSDP_VAL: self.idx += 1 table[key] = self.parse_value() if self.idx < len(self.buf): self.idx += 1 return table def _parse_array(self) -> list[Any]: array: list[Any] = [] while self.idx < len(self.buf) and self.buf[self.idx : self.idx + 1] != MSDP_ARRAY_CLOSE: if self.buf[self.idx : self.idx + 1] == MSDP_VAL: self.idx += 1 array.append(self.parse_value()) if self.idx < len(self.buf): self.idx += 1 return array
[docs] def parse_value(self) -> Any: """Parse a single MSDP value at current position.""" if self.idx >= len(self.buf): return "" marker = self.buf[self.idx : self.idx + 1] self.idx += 1 if marker == MSDP_TABLE_OPEN: return self._parse_table() if marker == MSDP_ARRAY_OPEN: return self._parse_array() self.idx -= 1 return self._read_string()
[docs] def parse(self) -> dict[str, Any]: """Parse the full MSDP buffer into a dict.""" result: dict[str, Any] = {} while self.idx < len(self.buf): if self.buf[self.idx : self.idx + 1] == MSDP_VAR: self.idx += 1 key = self._read_key() if self.idx < len(self.buf) and self.buf[self.idx : self.idx + 1] == MSDP_VAL: self.idx += 1 result[key] = self.parse_value() else: self.idx += 1 return result
[docs] def msdp_decode(buf: bytes, encoding: str = "utf-8") -> dict[str, Any]: """ Decode MSDP wire bytes to dictionary. :param buf: MSDP payload bytes :param encoding: Character encoding to try first, falls back to latin-1. :returns: Dictionary of variable names to values """ return MsdpParser(buf, encoding=encoding).parse()
[docs] def mssp_encode(variables: dict[str, str | list[str]]) -> bytes: """ Encode variables to MSSP wire format. :param variables: Dictionary of variable names to string values or lists :returns: Encoded MSSP payload bytes """ result = b"" for key, value in variables.items(): result += MSSP_VAR + key.encode("utf-8") if isinstance(value, list): for item in value: result += MSSP_VAL + item.encode("utf-8") else: result += MSSP_VAL + value.encode("utf-8") return result
[docs] def mssp_decode(buf: bytes, encoding: str = "utf-8") -> dict[str, str | list[str]]: """ Decode MSSP wire bytes to dictionary. :param buf: MSSP payload bytes :param encoding: Character encoding to try first, falls back to latin-1. :returns: Dictionary with str values for single entries, list[str] for multiple """ result: dict[str, str | list[str]] = {} idx = 0 current_var: str | None = None while idx < len(buf): if buf[idx : idx + 1] == MSSP_VAR: idx += 1 var_start = idx while idx < len(buf) and buf[idx : idx + 1] not in (MSSP_VAL, MSSP_VAR): idx += 1 current_var = _decode_best_effort(buf[var_start:idx], encoding) elif buf[idx : idx + 1] == MSSP_VAL: idx += 1 val_start = idx while idx < len(buf) and buf[idx : idx + 1] not in (MSSP_VAL, MSSP_VAR): idx += 1 value = _decode_best_effort(buf[val_start:idx], encoding) if current_var is not None: if current_var in result: existing = result[current_var] if isinstance(existing, list): existing.append(value) else: result[current_var] = [existing, value] else: result[current_var] = value else: idx += 1 return result
[docs] def zmp_decode(buf: bytes, encoding: str = "utf-8") -> list[str]: """ Decode ZMP payload to list of NUL-delimited strings. The first element is the command name, the rest are arguments. :param buf: ZMP payload bytes (NUL-delimited). :param encoding: Character encoding to try first, falls back to latin-1. :returns: List of strings ``[command, arg1, arg2, ...]``. """ if not buf: return [] # Split on NUL bytes and strip trailing empty string from final NUL. parts = buf.split(b"\x00") if parts and parts[-1] == b"": parts = parts[:-1] return [_decode_best_effort(p, encoding) for p in parts]
[docs] def atcp_decode(buf: bytes, encoding: str = "utf-8") -> tuple[str, str]: """ Decode ATCP payload to ``(package, value)`` tuple. Format is ``package.name value`` separated by the first space. If no space is present, *value* is an empty string. :param buf: ATCP payload bytes. :param encoding: Character encoding to try first, falls back to latin-1. :returns: Tuple of ``(package, value)``. """ parts = buf.split(b" ", 1) package = _decode_best_effort(parts[0], encoding) value = _decode_best_effort(parts[1], encoding) if len(parts) > 1 else "" return (package, value)
# Aardwolf channel byte meanings (server -> client). _AARDWOLF_CHANNELS: dict[int, str] = { 100: "status", 101: "tick", 102: "affect", 103: "group", 104: "skill", 105: "quest", 106: "spell", 107: "stat", 108: "message", }
[docs] def aardwolf_decode(buf: bytes) -> dict[str, Any]: """ Decode Aardwolf protocol payload. :param buf: Aardwolf payload bytes (typically 1-2 bytes). :returns: Dict with ``channel``, ``channel_byte``, ``data_byte``, and ``data_bytes`` (for longer payloads). """ if not buf: return {"channel": "unknown", "channel_byte": 0, "data_bytes": b""} channel_byte = buf[0] channel_name = _AARDWOLF_CHANNELS.get(channel_byte, f"0x{channel_byte:02x}") result: dict[str, Any] = {"channel": channel_name, "channel_byte": channel_byte} if len(buf) == 2: result["data_byte"] = buf[1] if len(buf) > 1: result["data_bytes"] = buf[1:] return result