Source code for telnetlib3.fingerprinting

"""
Fingerprint shell for telnet client identification.

This module runs **server-side**: it is the shell callback for a telnetlib3
server that probes connecting *clients* for protocol capabilities, collects
session data, and saves fingerprint files.  Despite the generic name, it
fingerprints the remote *client*, not the server.

Display, REPL, and post-script code live in
``telnetlib3.fingerprinting_display``.
"""

from __future__ import annotations

# std imports
import os
import sys
import json
import time
import asyncio
import hashlib
import logging
import argparse
import datetime
from typing import Any, Union, Optional, TypedDict, cast

# local
from . import slc
from ._paths import _atomic_json_write
from .server import TelnetServer
from .telopt import (
    BM,
    DO,
    DET,
    EOR,
    MSP,
    MXP,
    RCP,
    RSP,
    SGA,
    TLS,
    ZMP,
    ATCP,
    DONT,
    ECHO,
    GMCP,
    MSDP,
    MSSP,
    NAMS,
    NAOL,
    NAOP,
    NAWS,
    RCTE,
    LFLOW,
    TTYPE,
    X3PAD,
    XAUTH,
    BINARY,
    KERMIT,
    NAOCRD,
    NAOFFD,
    NAOHTD,
    NAOHTS,
    NAOLFD,
    NAOVTD,
    NAOVTS,
    SNDLOC,
    STATUS,
    SUPDUP,
    TSPEED,
    TTYLOC,
    CHARSET,
    ENCRYPT,
    TN3270E,
    AARDWOLF,
    LINEMODE,
    SEND_URL,
    XDISPLOC,
    FORWARD_X,
    SSPI_LOGON,
    NEW_ENVIRON,
    PRAGMA_LOGON,
    SUPDUPOUTPUT,
    VT3270REGIME,
    AUTHENTICATION,
    MCCP2_COMPRESS,
    MCCP3_COMPRESS,
    COM_PORT_OPTION,
    PRAGMA_HEARTBEAT,
    SUPPRESS_LOCAL_ECHO,
    theNULL,
)
from .accessories import encoding_from_lang
from .stream_reader import TelnetReader, TelnetReaderUnicode
from .stream_writer import TelnetWriter, TelnetWriterUnicode


[docs] class ProbeResult(TypedDict, total=False): """Result of probing a single telnet option.""" status: str opt: bytes description: str already_negotiated: bool
# Data directory for saving fingerprint data DATA_DIR: Optional[str] = os.environ.get("TELNETLIB3_DATA_DIR", "data") # Maximum files per protocol-fingerprint folder FINGERPRINT_MAX_FILES = int(os.environ.get("TELNETLIB3_FINGERPRINT_MAX_FILES", "1000")) # Maximum number of unique fingerprint folders FINGERPRINT_MAX_FINGERPRINTS = int( os.environ.get("TELNETLIB3_FINGERPRINT_MAX_FINGERPRINTS", "1000") ) # Post-fingerprint Python module to execute with saved file path # Example: TELNETLIB3_FINGERPRINT_POST_SCRIPT=telnetlib3.fingerprinting_display FINGERPRINT_POST_SCRIPT = os.environ.get("TELNETLIB3_FINGERPRINT_POST_SCRIPT", "") # Terminal types that uniquely identify specific telnet clients PROTOCOL_MATCHED_TERMINALS = {"syncterm"} # SyncTERM BBS client # Terminal types associated with MUD clients, matched case-insensitively. # These clients are likely to support extended options like GMCP. MUD_TERMINALS = { "mudlet", "cmud", "zmud", "mushclient", "atlantis", "tintin++", "tt++", "blowtorch", "mudrammer", "kildclient", "portal", "beip", "savitar", } __all__ = ( "ENVIRON_EXTENDED", "FingerprintingServer", "FingerprintingTelnetServer", "ProbeResult", "fingerprint_server_main", "fingerprinting_server_shell", "fingerprinting_post_script", "get_client_fingerprint", "probe_client_capabilities", "probe_client_loop_detection", ) #: Extended NEW_ENVIRON variable list used during client fingerprinting. #: The base :class:`~telnetlib3.server.TelnetServer` requests only common #: variables (USER, LOGNAME, LANG, TERM, etc.). This extended set collects #: additional information useful for identifying and classifying clients -- #: and for demonstrating the oversharing risk of RFC 1572 NEW_ENVIRON. ENVIRON_EXTENDED: list[str] = [ "HOME", "SHELL", "IPADDRESS", "SSH_CLIENT", "SSH_TTY", "SSH_AUTH_SOCK", "SSH_REMOTE_HOST", "HOSTNAME", "HOSTTYPE", "OSTYPE", "PWD", "PS1", "VISUAL", "TMUX", "STY", "SHELLOPTS", "TERM_PROGRAM_VERSION", "XDG_SESSION_PATH", "XDG_CURRENT_DESKTOP", "XCURSOR_THEME", "LC_ALL", "LC_CTYPE", "LC_MESSAGES", "LC_COLLATE", "LC_TIME", "LC_ADDRESS", "LC_IDENTIFICATION", "LC_MEASUREMENT", "LC_MONETARY", "LC_NAME", "LC_NUMERIC", "LC_PAPER", "LC_TELEPHONE", "DOCKER_HOST", "HISTFILE", "HISTFILESIZE", "HISTSIZE", "EPOCHREALTIME", "LSCOLORS", "LS_COLORS", "DIRCOLORS", "GCC_COLORS", "AWS_PROFILE", "AWS_REGION", "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "AWS_SESSION_TOKEN", "AWS_DEFAULT_REGION", "SSH_REMOTE_IP", "GITHUB_TOKEN", "GH_TOKEN", "GITLAB_TOKEN", "GL_TOKEN", "ANTHROPIC_API_KEY", "OPENAI_API_KEY", "STRIPE_SECRET_KEY", "SENDGRID_API_KEY", "HEROKU_API_KEY", "NPM_TOKEN", "SLACK_TOKEN", "TWILIO_AUTH_TOKEN", "DATABASE_URL", "PGPASSWORD", "MYSQL_PWD", "REDIS_URL", "AZURE_CLIENT_SECRET", "GOOGLE_APPLICATION_CREDENTIALS", "SECRET_KEY", "API_KEY", "PRIVATE_KEY", "JWT_SECRET", "DOCKER_PASSWORD", ] logger = logging.getLogger("telnetlib3.fingerprint")
[docs] class FingerprintingTelnetServer: """ Mixin that extends ``on_request_environ`` with :data:`ENVIRON_EXTENDED`. Usage with :func:`~telnetlib3.server.create_server`:: from telnetlib3.server import TelnetServer from telnetlib3.fingerprinting import FingerprintingTelnetServer class MyServer(FingerprintingTelnetServer, TelnetServer): pass server = await create_server(protocol_factory=MyServer, ...) """
[docs] def on_request_environ(self) -> list[Union[str, bytes]]: """Return base environ keys plus :data:`ENVIRON_EXTENDED`.""" if not isinstance(self, TelnetServer): raise TypeError("FingerprintingTelnetServer must be combined with TelnetServer") # pylint: disable=no-member base: list[Union[str, bytes]] = super().on_request_environ() # type: ignore[misc] # Insert extended keys before the trailing VAR/USERVAR sentinels from .telopt import VAR, USERVAR extra = [k for k in ENVIRON_EXTENDED if k not in base] # Find where VAR/USERVAR sentinels start and insert before them insert_at = len(base) for i, item in enumerate(base): if item in (VAR, USERVAR): insert_at = i break return base[:insert_at] + extra + base[insert_at:]
[docs] class FingerprintingServer(FingerprintingTelnetServer, TelnetServer): """ :class:`~telnetlib3.server.TelnetServer` with extended ``NEW_ENVIRON``. Combines :class:`FingerprintingTelnetServer` with :class:`~telnetlib3.server.TelnetServer` so that :func:`fingerprinting_server_shell` receives the full set of environment variables needed for stable fingerprint hashes. Used as the default ``protocol_factory`` by :func:`fingerprint_server_main` / ``telnetlib3-fingerprint-server`` CLI. """
[docs] def connection_lost(self, exc: Optional[Exception]) -> None: """Log connection close/loss.""" if not self._closing: if exc is None: logger.info("Connection closed for %s", self) else: logger.info("Connection lost for %s: %s", self, exc) self._closing = True # pylint: disable=attribute-defined-outside-init if exc is None: self.reader.feed_eof() else: self.reader.set_exception(exc) for task in self._tasks: try: task.cancel() except Exception: pass
# Timeout for probe_client_capabilities in _run_probe (seconds) _PROBE_TIMEOUT = 0.5 # Telnet options to probe, grouped by category # Each entry is (option_bytes, name, description) CORE_OPTIONS = [ (BINARY, "BINARY", "8-bit binary mode"), (SGA, "SGA", "Suppress Go Ahead"), (ECHO, "ECHO", "Echo mode"), (STATUS, "STATUS", "Option status reporting"), (TTYPE, "TTYPE", "Terminal type"), (TSPEED, "TSPEED", "Terminal speed"), (LFLOW, "LFLOW", "Local flow control"), (XDISPLOC, "XDISPLOC", "X display location"), (NAWS, "NAWS", "Window size"), (NEW_ENVIRON, "NEW_ENVIRON", "Environment variables"), (CHARSET, "CHARSET", "Character set"), (LINEMODE, "LINEMODE", "Line mode with SLC"), (EOR, "EOR", "End of Record"), # LOGOUT omitted - BSD client times out on this (SNDLOC, "SNDLOC", "Send location"), ] MUD_OPTIONS = [(COM_PORT_OPTION, "COM_PORT", "Serial port control (RFC 2217)")] # Options with non-standard byte values (> 140) that crash some clients. # icy_term (icy_net) only accepts option bytes 0-49, 138-140, and 255, # returning a hard error for anything else. GMCP-capable MUD clients # typically self-announce via IAC WILL GMCP, so probing is unnecessary. EXTENDED_OPTIONS = [ (MCCP2_COMPRESS, "MCCP2", "MUD Client Compression Protocol v2"), (MCCP3_COMPRESS, "MCCP3", "MUD Client Compression Protocol v3"), (GMCP, "GMCP", "Generic MUD Communication Protocol"), (MSDP, "MSDP", "MUD Server Data Protocol"), (MSSP, "MSSP", "MUD Server Status Protocol"), (MSP, "MSP", "MUD Sound Protocol"), (MXP, "MXP", "MUD eXtension Protocol"), (ZMP, "ZMP", "Zenith MUD Protocol"), (AARDWOLF, "AARDWOLF", "Aardwolf protocol"), (ATCP, "ATCP", "Achaea Telnet Client Protocol"), ] LEGACY_OPTIONS = [ (AUTHENTICATION, "AUTHENTICATION", "Telnet authentication"), (ENCRYPT, "ENCRYPT", "Encryption option"), (TN3270E, "TN3270E", "3270 terminal emulation"), (XAUTH, "XAUTH", "X authentication"), (RSP, "RSP", "Remote serial port"), (SUPPRESS_LOCAL_ECHO, "SUPPRESS_LOCAL_ECHO", "Local echo suppression"), (TLS, "TLS", "TLS negotiation"), (KERMIT, "KERMIT", "Kermit file transfer"), (SEND_URL, "SEND_URL", "URL sending"), (FORWARD_X, "FORWARD_X", "X11 forwarding"), (PRAGMA_LOGON, "PRAGMA_LOGON", "Pragma logon"), (SSPI_LOGON, "SSPI_LOGON", "SSPI logon"), (PRAGMA_HEARTBEAT, "PRAGMA_HEARTBEAT", "Heartbeat"), (X3PAD, "X3PAD", "X.3 PAD"), (VT3270REGIME, "VT3270REGIME", "VT3270 regime"), (TTYLOC, "TTYLOC", "Terminal location"), (SUPDUP, "SUPDUP", "SUPDUP protocol"), (SUPDUPOUTPUT, "SUPDUPOUTPUT", "SUPDUP output"), (DET, "DET", "Data entry terminal"), (BM, "BM", "Byte macro"), (RCP, "RCP", "Reconnection"), (NAMS, "NAMS", "NAMS"), (RCTE, "RCTE", "Remote controlled transmit/echo"), (NAOL, "NAOL", "Output line width"), (NAOP, "NAOP", "Output page size"), (NAOCRD, "NAOCRD", "Output CR disposition"), (NAOHTS, "NAOHTS", "Output horiz tab stops"), (NAOHTD, "NAOHTD", "Output horiz tab disposition"), (NAOFFD, "NAOFFD", "Output formfeed disposition"), (NAOVTS, "NAOVTS", "Output vert tabstops"), (NAOVTD, "NAOVTD", "Output vert tab disposition"), (NAOLFD, "NAOLFD", "Output LF disposition"), ] ALL_PROBE_OPTIONS = CORE_OPTIONS + MUD_OPTIONS + LEGACY_OPTIONS QUICK_PROBE_OPTIONS = CORE_OPTIONS + MUD_OPTIONS # All known options including extended, for display/name lookup only _ALL_KNOWN_OPTIONS = ALL_PROBE_OPTIONS + EXTENDED_OPTIONS # Build mapping from hex string (e.g., "0x03") to option name (e.g., "SGA") _OPT_BYTE_TO_NAME = {f"0x{opt[0]:02x}": name for opt, name, _ in _ALL_KNOWN_OPTIONS}
[docs] async def probe_client_loop_detection( writer: TelnetWriter, probe_results: dict[str, ProbeResult], timeout: float = 0.3 ) -> list[str]: """ Detect clients that would re-negotiate already-agreed options (telnet loop). Saves the negotiation state for options the client already agreed to, clears the cache, re-sends IAC DO / IAC WILL for those options, and checks whether the client replies again. A well-behaved client ignores redundant requests in the YES state; a loop-prone client replies again. Checks both directions: re-DO'ing options the client already WILL'd, and re-WILL'ing options the client already DO'd. :returns: Sorted list of option names that would loop. """ from .telopt import DO, WILL looped: set[str] = set() for _label, opt_dict, probe_cmd in ( ("remote", writer.remote_option, DO), ("local", writer.local_option, WILL), ): agreed: dict[bytes, bool] = {} for opt, enabled in opt_dict.items(): if enabled: agreed[opt] = True if not agreed: continue saved: dict[bytes, bool | None] = {} for opt in agreed: saved[opt] = opt_dict.get(opt) opt_dict[opt] = None # type: ignore[assignment] writer.pending_option.pop(probe_cmd + opt, None) try: writer._in_loop_detection = True for opt in agreed: writer.iac(probe_cmd, opt) loop = asyncio.get_running_loop() deadline = loop.time() + timeout while loop.time() < deadline: all_settled = all(opt_dict.get(opt) is not None for opt in agreed) if all_settled: break await asyncio.sleep(0.05) for opt in agreed: if opt_dict.get(opt) is not None: looped.add(_opt_byte_to_name(opt)) finally: writer._in_loop_detection = False for opt, value in saved.items(): opt_dict[opt] = value # type: ignore[assignment] for opt in agreed: writer.pending_option.pop(probe_cmd + opt, None) return sorted(looped)
[docs] async def probe_client_capabilities( writer: Union[TelnetWriter, TelnetWriterUnicode], options: Optional[list[tuple[bytes, str, str]]] = None, timeout: float = 0.5, ) -> dict[str, ProbeResult]: """ Actively probe client for telnet capability support. Sends IAC DO for ALL options at once, waits for responses, then collects results. :param writer: TelnetWriter instance. :param options: List of (opt_bytes, name, description) tuples to probe. Defaults to ALL_PROBE_OPTIONS. :param timeout: Timeout in seconds to wait for all responses. :returns: Dict mapping option name to :class:`ProbeResult`. """ if options is None: options = ALL_PROBE_OPTIONS results: dict[str, ProbeResult] = {} to_probe = [] for opt, name, description in options: if writer.remote_option.enabled(opt): results[name] = ProbeResult( status="WILL", opt=opt, description=description, already_negotiated=True ) elif writer.remote_option.get(opt) is False: results[name] = ProbeResult( status="WONT", opt=opt, description=description, already_negotiated=True ) else: to_probe.append((opt, name, description)) for opt, name, description in to_probe: writer.iac(DO, opt) await writer.drain() loop = asyncio.get_running_loop() deadline = loop.time() + timeout while loop.time() < deadline: all_responded = all( writer.remote_option.get(opt) is not None for opt, name, desc in to_probe if name not in results ) if all_responded: break await asyncio.sleep(0.05) for opt, name, description in to_probe: if name in results: continue if writer.remote_option.enabled(opt): results[name] = ProbeResult(status="WILL", opt=opt, description=description) elif writer.remote_option.get(opt) is False: results[name] = ProbeResult(status="WONT", opt=opt, description=description) else: results[name] = ProbeResult(status="timeout", opt=opt, description=description) return results
# Keys to collect from extra_info _EXTRA_INFO_KEYS = ( "TERM", "term", "cols", "rows", "COLUMNS", "LINES", "charset", "LANG", "COLORTERM", "peername", "sockname", "tspeed", "xdisploc", "DISPLAY", "encoding", ) + tuple(f"ttype{n}" for n in range(1, 9))
[docs] def get_client_fingerprint(writer: Union[TelnetWriter, TelnetWriterUnicode]) -> dict[str, Any]: """ Collect all available client information from writer. :param writer: TelnetWriter instance. :returns: Dictionary of all negotiated client attributes. """ fingerprint = {} for key in _EXTRA_INFO_KEYS: value = writer.get_extra_info(key) if value is not None and value: fingerprint[key] = value for env_key in ("USER", "SHELL", "HOME", "PATH", "LOGNAME", "MAIL"): value = writer.get_extra_info(env_key) if value is not None and value: fingerprint[env_key] = value return fingerprint
async def _run_probe( writer: Union[TelnetWriter, TelnetWriterUnicode], verbose: bool = True ) -> tuple[dict[str, ProbeResult], float]: """Run active probe, optionally extending to MUD options.""" if _is_maybe_ms_telnet(writer): probe_options = [opt for opt in CORE_OPTIONS + MUD_OPTIONS if opt[0] != NEW_ENVIRON] logger.info( "reduced probe for suspected MS telnet (ttype1=%r, ttype2=%r)", writer.get_extra_info("ttype1"), writer.get_extra_info("ttype2"), ) else: probe_options = ALL_PROBE_OPTIONS total = len(probe_options) _writer = cast(TelnetWriterUnicode, writer) if verbose: _writer.write(f"\rProbing {total} telnet options...\x1b[J") await _writer.drain() start_time = time.time() results = await probe_client_capabilities(writer, options=probe_options, timeout=_PROBE_TIMEOUT) if _is_maybe_mud(writer) and EXTENDED_OPTIONS: ext_results = await probe_client_capabilities( writer, options=EXTENDED_OPTIONS, timeout=_PROBE_TIMEOUT ) results.update(ext_results) elapsed = time.time() - start_time if verbose: _writer.write("\r\x1b[K") return results, elapsed def _get_protocol(writer: Union[TelnetWriter, TelnetWriterUnicode]) -> Any: """Return the protocol object from a writer.""" return getattr(writer, "_protocol", None) or getattr(writer, "protocol", None) def _opt_byte_to_name(opt: bytes) -> str: """Convert option bytes to name or hex string.""" if isinstance(opt, bytes) and len(opt) > 0: hex_key = f"0x{opt[0]:02x}" return _OPT_BYTE_TO_NAME.get(hex_key, hex_key) return str(opt) def _collect_option_states( writer: Union[TelnetWriter, TelnetWriterUnicode], ) -> dict[str, dict[str, Any]]: """Collect all telnet option states from writer.""" options = {} for label, opt_dict in [("remote", writer.remote_option), ("local", writer.local_option)]: entries = {_opt_byte_to_name(opt): enabled for opt, enabled in opt_dict.items()} if entries: options[label] = entries return options def _collect_rejected_options( writer: Union[TelnetWriter, TelnetWriterUnicode], ) -> dict[str, list[str]]: """Collect rejected option offers from writer.""" result: dict[str, list[str]] = {} if getattr(writer, "rejected_will", None): result["will"] = sorted(_opt_byte_to_name(opt) for opt in writer.rejected_will) if getattr(writer, "rejected_do", None): result["do"] = sorted(_opt_byte_to_name(opt) for opt in writer.rejected_do) if getattr(writer, "directional_refusals", None): result["directional"] = sorted( _opt_byte_to_name(opt) for opt in writer.directional_refusals ) return result def _collect_extra_info(writer: Union[TelnetWriter, TelnetWriterUnicode]) -> dict[str, Any]: """Collect all extra_info from writer, including private _extra dict.""" extra: dict[str, Any] = {} protocol = _get_protocol(writer) if protocol and hasattr(protocol, "_extra"): for key, value in protocol._extra.items(): if isinstance(value, tuple): extra[key] = list(value) elif isinstance(value, bytes): extra[key] = value.hex() else: extra[key] = value # Transport-level keys not in protocol._extra for key in ("peername", "sockname", "timeout"): if key not in extra: if (value := writer.get_extra_info(key)) is not None: extra[key] = list(value) if isinstance(value, tuple) else value # Clean up: prefer uppercase over lowercase redundant keys if "TERM" in extra and "term" in extra: del extra["term"] if "COLUMNS" in extra and "cols" in extra: del extra["cols"] if "LINES" in extra and "rows" in extra: del extra["rows"] # Remove ttype1, ttype2, etc. - collected separately in ttype_cycle for i in range(1, 20): extra.pop(f"ttype{i}", None) return extra def _collect_ttype_cycle(writer: Union[TelnetWriter, TelnetWriterUnicode]) -> list[str]: """Collect the full TTYPE cycle responses.""" ttype_list = [] protocol = _get_protocol(writer) extra_dict = getattr(protocol, "_extra", {}) if protocol else {} for i in range(1, 20): if value := (extra_dict.get(f"ttype{i}") or writer.get_extra_info(f"ttype{i}")): ttype_list.append(value) else: break return ttype_list def _collect_protocol_timing(writer: Union[TelnetWriter, TelnetWriterUnicode]) -> dict[str, Any]: """Collect timing information from protocol.""" timing = {} protocol = _get_protocol(writer) if protocol: if hasattr(protocol, "duration"): timing["duration"] = protocol.duration if hasattr(protocol, "idle"): timing["idle"] = protocol.idle if hasattr(protocol, "_connect_time"): timing["connect_time"] = protocol._connect_time return timing def _collect_slc_tab(writer: Union[TelnetWriter, TelnetWriterUnicode]) -> dict[str, Any]: """Collect non-default SLC entries when LINEMODE was negotiated.""" slctab = getattr(writer, "slctab", None) if not slctab: return {} if not (hasattr(writer, "remote_option") and writer.remote_option.enabled(LINEMODE)): return {} defaults = slc.generate_slctab(slc.BSD_SLC_TAB) result: dict[str, Any] = {} slc_set: dict[str, Any] = {} slc_unset: list[str] = [] slc_nosupport: list[str] = [] for slc_func, slc_def in slctab.items(): default_def = defaults.get(slc_func) if ( default_def is not None and slc_def.mask == default_def.mask and slc_def.val == default_def.val ): continue name = slc.name_slc_command(slc_func) if slc_def.nosupport: slc_nosupport.append(name) elif slc_def.val == theNULL: slc_unset.append(name) else: slc_set[name] = slc_def.val[0] if isinstance(slc_def.val, bytes) else slc_def.val if slc_set: result["set"] = slc_set if slc_unset: result["unset"] = sorted(slc_unset) if slc_nosupport: result["nosupport"] = sorted(slc_nosupport) return result def _create_protocol_fingerprint( writer: Union[TelnetWriter, TelnetWriterUnicode], probe_results: dict[str, ProbeResult], looped_options: Optional[list[str]] = None, ) -> dict[str, Any]: """ Create anonymized/summarized protocol fingerprint from session data. Fields are only included if negotiated. Environment variables are summarized as "True" (non- empty value) or "None" (empty string). :param writer: TelnetWriter instance. :param probe_results: Probe results from capability probing. :returns: Dict with anonymized protocol fingerprint data. """ fingerprint: dict[str, Any] = {"probed-protocol": "client"} protocol = _get_protocol(writer) extra_dict = getattr(protocol, "_extra", {}) if protocol else {} for key in ("HOME", "USER", "SHELL", "IPADDRESS"): if key in extra_dict: fingerprint[key] = "True" if extra_dict[key] else "None" # Encoding extracted from LANG if lang := writer.get_extra_info("LANG"): encoding = encoding_from_lang(lang) fingerprint["encoding"] = encoding if encoding else "None" else: fingerprint["encoding"] = "None" # TERM categorization (inlined) term = writer.get_extra_info("TERM") or writer.get_extra_info("term") if not term: fingerprint["TERM"] = "None" elif (term_lower := term.lower()) in PROTOCOL_MATCHED_TERMINALS: fingerprint["TERM"] = term_lower.capitalize() elif "ansi" in term_lower: fingerprint["TERM"] = "Yes-ansi" else: fingerprint["TERM"] = "Yes" charset = writer.get_extra_info("charset") fingerprint["charset"] = charset if charset else "None" ttype_cycle = _collect_ttype_cycle(writer) fingerprint["ttype-count"] = len(ttype_cycle) supported: list[str] = sorted( [name for name, info in probe_results.items() if info["status"] == "WILL"] ) refused: list[str] = sorted( [name for name, info in probe_results.items() if info["status"] in ("WONT", "timeout")] ) fingerprint["supported-options"] = supported fingerprint["refused-options"] = refused rejected = _collect_rejected_options(writer) if rejected.get("will"): fingerprint["rejected-will"] = rejected["will"] if rejected.get("do"): fingerprint["rejected-do"] = rejected["do"] if rejected.get("directional"): fingerprint["directional-refusals"] = rejected["directional"] if looped_options: fingerprint["looped-negotiation"] = looped_options linemode_probed = any( name == "LINEMODE" and info["status"] == "WILL" for name, info in probe_results.items() ) if linemode_probed: slc_tab = _collect_slc_tab(writer) if slc_tab: fingerprint["slc"] = slc_tab return fingerprint def _hash_fingerprint(data: dict[str, Any]) -> str: """Create deterministic 16-char SHA256 hash of a fingerprint dict.""" canonical = json.dumps(data, sort_keys=True, separators=(",", ":")) return hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:16] def _count_protocol_folder_files(protocol_dir: str) -> int: """Count JSON files in protocol fingerprint directory.""" if not os.path.exists(protocol_dir): return 0 return sum(1 for f in os.listdir(protocol_dir) if f.endswith(".json")) def _count_fingerprint_folders(data_dir: Optional[str] = None, side: str = "client") -> int: """Count unique fingerprint folders in ``DATA_DIR/<side>/``.""" _dir = data_dir if data_dir is not None else DATA_DIR if _dir is None: return 0 side_dir = os.path.join(_dir, side) if not os.path.exists(side_dir): return 0 return sum(1 for f in os.listdir(side_dir) if os.path.isdir(os.path.join(side_dir, f))) def _save_fingerprint_to_dir( target_dir: str, session_hash: str, data: dict[str, Any], *, probe_key: str, data_dir: str, side: str, protocol_hash: str, ) -> Optional[str]: """ Save fingerprint data to a directory with limit checks and session appending. Handles fingerprint-count and file-count limits, creates directories as needed, and appends to existing session files when the session hash matches. :param target_dir: Directory path for this fingerprint's files. :param session_hash: Hash used for the filename. :param data: Complete fingerprint data dict to save. :param probe_key: Top-level key in *data* (e.g. ``"telnet-probe"``). :param data_dir: Base data directory for counting fingerprint folders. :param side: ``"client"`` or ``"server"`` subdirectory name. :param protocol_hash: Protocol fingerprint hash for logging. :returns: Path to saved file, or ``None`` if saving was skipped. """ is_new_dir = not os.path.exists(target_dir) if is_new_dir: if _count_fingerprint_folders(data_dir, side=side) >= FINGERPRINT_MAX_FINGERPRINTS: logger.warning( "max fingerprints (%d) exceeded, not saving %s", FINGERPRINT_MAX_FINGERPRINTS, protocol_hash, ) return None try: os.makedirs(target_dir, exist_ok=True) except OSError as exc: logger.warning("failed to create directory %s: %s", target_dir, exc) return None logger.info("new %s fingerprint %s", side, protocol_hash) else: if _count_protocol_folder_files(target_dir) >= FINGERPRINT_MAX_FILES: logger.warning( "fingerprint %s at file limit (%d), not saving", protocol_hash, FINGERPRINT_MAX_FILES, ) return None logger.debug("connection for %s fingerprint %s", side, protocol_hash) filepath = os.path.join(target_dir, f"{session_hash}.json") if os.path.exists(filepath): try: with open(filepath, encoding="utf-8") as f: existing = json.load(f) existing[probe_key]["session_data"] = data[probe_key]["session_data"] existing["sessions"].append(data["sessions"][0]) except (OSError, json.JSONDecodeError, KeyError) as exc: logger.warning("failed to read existing %s: %s", filepath, exc) existing = None if existing is not None: try: _atomic_json_write(filepath, existing) return filepath except OSError as exc: logger.warning("failed to update fingerprint: %s", exc) return None try: _atomic_json_write(filepath, data) return filepath except OSError as exc: logger.warning("failed to save fingerprint: %s", exc) return None _UNKNOWN_TERMINAL_HASH = "0" * 16 AMBIGUOUS_WIDTH_UNKNOWN = -1 def _create_session_fingerprint(writer: Union[TelnetWriter, TelnetWriterUnicode]) -> dict[str, Any]: """Create session identity fingerprint from stable client fields.""" identity: dict[str, Any] = {} if peername := writer.get_extra_info("peername"): identity["client-ip"] = peername[0] if term := (writer.get_extra_info("TERM") or writer.get_extra_info("term")): identity["TERM"] = term for key in ("USER", "HOME", "SHELL", "LANG", "IPADDRESS", "charset"): if (value := writer.get_extra_info(key)) is not None and value: identity[key] = value return identity def _load_fingerprint_names(data_dir: Optional[str] = None) -> dict[str, str]: """Load fingerprint hash-to-name mapping from ``fingerprint_names.json``.""" _dir = data_dir if data_dir is not None else DATA_DIR if _dir is None: return {} names_file = os.path.join(_dir, "fingerprint_names.json") if not os.path.exists(names_file): return {} with open(names_file, encoding="utf-8") as f: result: dict[str, str] = json.load(f) return result def _save_fingerprint_name(hash_val: str, name: str, data_dir: Optional[str] = None) -> str: """ Save a fingerprint hash-to-name mapping in ``fingerprint_names.json``. Loads the existing names file, adds or updates the entry for *hash_val*, and writes it back atomically. :param hash_val: 16-character hex fingerprint hash. :param name: Human-readable name to associate. :param data_dir: Override data directory. Falls back to :data:`DATA_DIR`. :returns: Path to the saved names file. :raises ValueError: If *data_dir* is ``None`` and :data:`DATA_DIR` is unset. """ _dir = data_dir if data_dir is not None else DATA_DIR if _dir is None: raise ValueError("no data directory configured") os.makedirs(_dir, exist_ok=True) names_file = os.path.join(_dir, "fingerprint_names.json") names = _load_fingerprint_names(_dir) names[hash_val] = name _atomic_json_write(names_file, names) return names_file def _resolve_hash_name(hash_val: str, names: dict[str, str]) -> str: """Return human-readable name for a hash, falling back to the hash itself.""" return names.get(hash_val, hash_val) def _validate_suggestion(text: str) -> Optional[str]: """Validate a user-submitted fingerprint name suggestion.""" cleaned = text.strip() if not cleaned: return None for c in cleaned: if ord(c) < 32 or ord(c) == 127: return None return cleaned def _cooked_input(prompt: str) -> str: """Call :func:`input` with echo and canonical mode temporarily enabled.""" import termios fd = sys.stdin.fileno() old_attrs = termios.tcgetattr(fd) new_attrs = list(old_attrs) new_attrs[3] |= termios.ECHO | termios.ICANON termios.tcsetattr(fd, termios.TCSANOW, new_attrs) try: return input(prompt) except EOFError: return "" finally: termios.tcsetattr(fd, termios.TCSANOW, old_attrs) def _build_session_fingerprint( writer: Union[TelnetWriter, TelnetWriterUnicode], probe_results: dict[str, ProbeResult], probe_time: float, ) -> dict[str, Any]: """Build the session fingerprint dict (raw detailed data).""" extra = _collect_extra_info(writer) extra.pop("peername", None) extra.pop("sockname", None) ttype_cycle = _collect_ttype_cycle(writer) option_states = _collect_option_states(writer) timing = _collect_protocol_timing(writer) linemode_probed = probe_results.get("LINEMODE", {}).get("status") slc_tab = _collect_slc_tab(writer) if linemode_probed == "WILL" else {} probe_by_status: dict[str, dict[str, int]] = {} for name, info in probe_results.items(): status = info["status"] opt_byte = info["opt"][0] if isinstance(info["opt"], bytes) else info["opt"] if status not in probe_by_status: probe_by_status[status] = {} probe_by_status[status][name] = opt_byte timing["probe"] = probe_time result = { "extra": extra, "ttype_cycle": ttype_cycle, "option_states": option_states, "probe": probe_by_status, "timing": timing, } if slc_tab: result["slc_tab"] = slc_tab rejected = _collect_rejected_options(writer) if rejected: result["rejected"] = rejected if writer.comport_data: result["comport"] = writer.comport_data return result def _save_fingerprint_data( writer: Union[TelnetWriter, TelnetWriterUnicode], probe_results: dict[str, ProbeResult], probe_time: float, session_fp: Optional[dict[str, Any]] = None, looped: Optional[list[str]] = None, ) -> Optional[str]: """ Save comprehensive fingerprint data to a JSON file. Creates directory structure: ``DATA_DIR/client/<protocol-hash>/<probe-hash>/<session_hash>.json`` :param writer: TelnetWriter instance with full protocol access. :param probe_results: Probe results from capability probing. :param probe_time: Time taken for probing. :param session_fp: Pre-built session fingerprint, or None to build it. :returns: Path to saved file, or None if save skipped/failed. """ if DATA_DIR is None: return None if not os.path.isdir(DATA_DIR): os.makedirs(DATA_DIR, exist_ok=True) if session_fp is None: session_fp = _build_session_fingerprint(writer, probe_results, probe_time) protocol_fp = _create_protocol_fingerprint(writer, probe_results, looped_options=looped) telnet_hash = _hash_fingerprint(protocol_fp) session_identity = _create_session_fingerprint(writer) session_hash = _hash_fingerprint(session_identity) telnet_dir = os.path.join(DATA_DIR, "client", telnet_hash) probe_dir = None if os.path.exists(telnet_dir): for name in os.listdir(telnet_dir): candidate = os.path.join(telnet_dir, name) if os.path.isdir(candidate) and name != _UNKNOWN_TERMINAL_HASH: probe_dir = candidate break if probe_dir is None: probe_dir = os.path.join(telnet_dir, _UNKNOWN_TERMINAL_HASH) peername = writer.get_extra_info("peername") now = datetime.datetime.now(datetime.timezone.utc) session_entry = {"ip": str(peername[0]) if peername else None, "connected": now.isoformat()} data = { "telnet-probe": { "fingerprint": telnet_hash, "fingerprint-data": protocol_fp, "session_data": session_fp, }, "sessions": [session_entry], } return _save_fingerprint_to_dir( target_dir=probe_dir, session_hash=session_hash, data=data, probe_key="telnet-probe", data_dir=DATA_DIR, side="client", protocol_hash=telnet_hash, ) def _is_maybe_mud(writer: Union[TelnetWriter, TelnetWriterUnicode]) -> bool: """Return whether the client looks like a MUD client.""" term = (writer.get_extra_info("TERM") or "").lower() if term in MUD_TERMINALS: return True for key in ("ttype1", "ttype2", "ttype3"): if (writer.get_extra_info(key) or "").lower() in MUD_TERMINALS: return True mud_opts = (GMCP, MSDP, MXP, MSP, ATCP, AARDWOLF) if any(writer.remote_option.enabled(opt) for opt in mud_opts): return True return False def _is_maybe_ms_telnet(writer: Union[TelnetWriter, TelnetWriterUnicode]) -> bool: """ Return whether the client looks like Microsoft Windows telnet. Microsoft telnet reports ttype1="ANSI", ttype2="VT100", refuses CHARSET, and sends unsolicited WILL NAWS. The ttype cycle stalls after VT100. Sending a large NEW_ENVIRON sub-negotiation or a burst of legacy IAC DO commands crashes the client. :param writer: TelnetWriter instance. """ ttype1 = (writer.get_extra_info("ttype1") or "").upper() if ttype1 != "ANSI": return False ttype2 = (writer.get_extra_info("ttype2") or "").upper() if ttype2 and ttype2 != "VT100": return False return True
[docs] async def fingerprinting_server_shell( reader: Union[TelnetReader, TelnetReaderUnicode], writer: Union[TelnetWriter, TelnetWriterUnicode], ) -> None: """ Shell that probes client telnet capabilities and runs post-script. Immediately probes all telnet options on connect. If DATA_DIR is configured, saves fingerprint data and runs the post-script through a PTY so it can probe the client's terminal with ucs- detect. :param reader: TelnetReader instance. :param writer: TelnetWriter instance. """ from .server_pty_shell import pty_shell writer = cast(TelnetWriterUnicode, writer) probe_results, probe_time = await _run_probe(writer, verbose=False) # Switch syncterm to Topaz (Amiga) font, just for fun why not if (writer.get_extra_info("TERM") or "").lower() == "syncterm": writer.write("\x1b[0;40 D") await writer.drain() # Collect fingerprint data BEFORE disabling LINEMODE, so that # _collect_slc_tab sees remote_option[LINEMODE] as True. session_fp = _build_session_fingerprint(writer, probe_results, probe_time) # Detect telnet option re-negotiation loops (clients that would WILL/WONT # options they have already settled). looped = await probe_client_loop_detection(writer, probe_results, timeout=_PROBE_TIMEOUT) if looped: logger.debug("probe: %d looped options: %s", len(looped), looped) else: logger.debug("probe: no looped options detected") filepath = _save_fingerprint_data(writer, probe_results, probe_time, session_fp, looped=looped) # Disable LINEMODE if it was negotiated - stay in kludge mode (SGA+ECHO) # for PTY shell. LINEMODE causes echo loops with GNU telnet when running # ucs-detect (client's LIT_ECHO + PTY echo = feedback loop). if probe_results.get("LINEMODE", {}).get("status") == "WILL": writer.iac(DONT, LINEMODE) await writer.drain() await asyncio.sleep(0.1) client_has_sga = writer.remote_option.get(SGA, False) # Detect MUD clients early to skip escape-sequence probes _MUD_OPTIONS = {AARDWOLF, GMCP, MSDP, MXP, MSSP, ATCP} is_mud_client = any(writer.remote_option.get(opt, False) for opt in _MUD_OPTIONS) if not is_mud_client: for n in range(1, 10): ttype_n = (writer.get_extra_info(f"ttype{n}") or "").upper() if not ttype_n: break if ttype_n.startswith("MTTS "): is_mud_client = True break try: if filepath is not None: # Switch to latin-1 for PTY shell, lossless byte passthrough # required for binary protocols like ZMODEM. Done after probing # so that ucs-detect and other Unicode probes use UTF-8. if (writer.get_extra_info("TERM") or "").lower() == "syncterm": writer.fn_encoding = lambda **kw: "latin-1" writer.encoding_errors = "replace" if hasattr(reader, "fn_encoding"): reader.fn_encoding = lambda **kw: "latin-1" # Force peer IP into env for PTY subprocess logging peername = writer.get_extra_info("peername") if peername: writer._protocol._extra["IPADDRESS"] = peername[0] if client_has_sga and not is_mud_client: os.environ["TELNETLIB3_INTERACTIVE_TERMINAL"] = "1" else: os.environ.pop("TELNETLIB3_INTERACTIVE_TERMINAL", None) post_script = FINGERPRINT_POST_SCRIPT or "telnetlib3.fingerprinting_display" await pty_shell( reader, writer, sys.executable, ["-W", "ignore::RuntimeWarning:runpy", "-m", post_script, str(filepath)], raw_mode=True, ) writer.close() else: writer.close() except (ConnectionResetError, FileNotFoundError, BrokenPipeError, UnicodeDecodeError, OSError): pass
[docs] def fingerprinting_post_script(filepath: str) -> None: """ Post-fingerprint script that optionally runs ucs-detect for terminal probing. If ucs-detect is available in PATH, runs it to collect terminal capabilities and merges the results into the fingerprint data. Can be used as the TELNETLIB3_FINGERPRINT_POST_SCRIPT target:: TELNETLIB3_FINGERPRINT_POST_SCRIPT=telnetlib3.fingerprinting TELNETLIB3_DATA_DIR=./data telnetlib3-server --shell fingerprinting_server_shell :param filepath: Path to the saved fingerprint JSON file. """ from .fingerprinting_display import fingerprinting_post_script as _fps _fps(filepath)
[docs] def fingerprint_server_main() -> None: """ Entry point for ``telnetlib3-fingerprint-server`` CLI. Reuses :func:`~telnetlib3.server.parse_server_args` and :func:`~telnetlib3.server.run_server` with :class:`FingerprintingServer` as the default protocol factory and :func:`fingerprinting_server_shell` as the default shell. Accepts ``--data-dir`` to set the fingerprint data directory. Falls back to the ``TELNETLIB3_DATA_DIR`` environment variable, then to ``data/`` in the current directory. """ # local import is required to prevent circular imports from .server import _config, run_server, parse_server_args # noqa: PLC0415 global DATA_DIR def _add_extra_args(parser: argparse.ArgumentParser) -> None: parser.add_argument("--data-dir", default=DATA_DIR, help="directory for fingerprint data") args = parse_server_args(extra_args_fn=_add_extra_args) DATA_DIR = args.pop("data_dir") os.environ["TELNETLIB3_DATA_DIR"] = DATA_DIR if args["shell"] is _config.shell: args["shell"] = fingerprinting_server_shell args["protocol_factory"] = FingerprintingServer asyncio.run(run_server(**args))
def main() -> None: """CLI entry point for fingerprinting post-processing.""" if len(sys.argv) != 2: print(f"Usage: python -m {__name__} <filepath>", file=sys.stderr) sys.exit(1) fingerprinting_post_script(sys.argv[1]) if __name__ == "__main__": # pragma: no cover main()