Source code for telnetlib3.client

#!/usr/bin/env python3
"""Telnet Client API for the 'telnetlib3' python package."""

from __future__ import annotations

# std imports
import os
import ssl as ssl_module
import sys
import codecs
import struct
import asyncio
import argparse
import functools
from typing import Any, Dict, List, Tuple, Union, Callable, Optional, Sequence

# local
from telnetlib3 import accessories, client_base
from telnetlib3._types import ShellCallback
from telnetlib3.stream_reader import TelnetReader, TelnetReaderUnicode
from telnetlib3.stream_writer import TelnetWriter, TelnetWriterUnicode

__all__ = ("TelnetClient", "TelnetTerminalClient", "open_connection")

#: Default GMCP modules requested via ``Core.Supports.Set``.
#: Sub-modules are listed explicitly because not all servers treat
#: top-level subscriptions as wildcards.
_DEFAULT_GMCP_MODULES = [
    "Char 1",
    "Char.Vitals 1",
    "Char.Items 1",
    "Room 1",
    "Room.Info 1",
    "Comm 1",
    "Comm.Channel 1",
    "Group 1",
]


[docs] class TelnetClient(client_base.BaseClient): """ Telnet client that supports all common options. Useful for automation, appearing as a virtual terminal to the remote end without requiring an interactive terminal to run. """ #: On :meth:`send_env`, the value of 'LANG' will be 'C' for binary #: transmission. When encoding is specified (utf8 by default), the LANG #: variable must also contain a locale, this value is used, providing a #: full default LANG value of 'en_US.utf8' DEFAULT_LOCALE = "en_US" #: Default environment variables to send via NEW_ENVIRON DEFAULT_SEND_ENVIRON = ("TERM", "LANG", "COLUMNS", "LINES", "COLORTERM") def __init__( self, term: str = "unknown", cols: int = 80, rows: int = 25, tspeed: Tuple[int, int] = (38400, 38400), xdisploc: str = "", send_environ: Optional[Sequence[str]] = None, shell: Optional[ShellCallback] = None, encoding: Union[str, bool] = "utf8", encoding_errors: str = "strict", force_binary: bool = False, connect_minwait: float = 0, connect_maxwait: float = 4.0, compression: Optional[bool] = None, limit: Optional[int] = None, waiter_closed: Optional[asyncio.Future[None]] = None, _waiter_connected: Optional[asyncio.Future[None]] = None, gmcp_modules: Optional[List[str]] = None, ) -> None: """Initialize TelnetClient with terminal parameters.""" self._compression = compression super().__init__( shell=shell, encoding=encoding, encoding_errors=encoding_errors, force_binary=force_binary, connect_minwait=connect_minwait, connect_maxwait=connect_maxwait, limit=limit, waiter_closed=waiter_closed, _waiter_connected=_waiter_connected, ) self._gmcp_modules = gmcp_modules or list(_DEFAULT_GMCP_MODULES) self._gmcp_hello_sent = False self._send_environ = set(send_environ or self.DEFAULT_SEND_ENVIRON) self._extra.update( { "charset": encoding or "", # for our purposes, we only send the second part (encoding) of our # 'lang' variable, CHARSET negotiation does not provide locale # negotiation; this is better left to the real LANG variable # negotiated as-is by send_env(). # # So which locale should we represent? Rather than using the # locale.getpreferredencoding() method, we provide a deterministic # class value DEFAULT_LOCALE (en_US), derive and modify as needed. "lang": ("C" if not encoding else self.DEFAULT_LOCALE + "." + str(encoding)), "cols": cols, "rows": rows, "term": term, "tspeed": f"{tspeed[0]},{tspeed[1]}", "xdisploc": xdisploc, } )
[docs] def connection_made(self, transport: asyncio.BaseTransport) -> None: """ Handle connection made to server. Wire up telnet option callbacks for terminal type, speed, display, environment, window size, and character set negotiation. """ from telnetlib3.telopt import NAWS, TTYPE, TSPEED, CHARSET, XDISPLOC, NEW_ENVIRON super().connection_made(transport) # Set compression policy on writer self.writer.compression = self._compression # Wire extended rfc callbacks for requests of # terminal attributes, environment values, etc. for opt, func in ( (TTYPE, self.send_ttype), (TSPEED, self.send_tspeed), (XDISPLOC, self.send_xdisploc), (NEW_ENVIRON, self.send_env), (NAWS, self.send_naws), (CHARSET, self.send_charset), ): self.writer.set_ext_send_callback(opt, func) # Offer callbacks define what to include in outgoing requests # (e.g. what charsets to offer in SB CHARSET REQUEST). for opt, offer_func in ( (CHARSET, self.on_request_charset), (NEW_ENVIRON, self.on_request_environ), ): self.writer.set_ext_offer_callback(opt, offer_func) # Override the default handle_will method to detect when both sides support CHARSET # Store the original only on first connection to prevent chain growth on reconnect. if not hasattr(self.writer, "_original_handle_will"): self.writer._original_handle_will = self.writer.handle_will else: self.writer.handle_will = ( # type: ignore[method-assign] self.writer._original_handle_will ) original_handle_will = self.writer.handle_will writer = self.writer def enhanced_handle_will(opt: bytes) -> None: original_handle_will(opt) # If this was a WILL CHARSET from the server, and we also have WILL CHARSET enabled, # log that both sides support CHARSET. The server should initiate the actual REQUEST. if ( opt == CHARSET and writer.remote_option.enabled(CHARSET) and writer.local_option.enabled(CHARSET) ): self.log.debug("Both sides support CHARSET, ready for server to initiate REQUEST") self.writer.handle_will = enhanced_handle_will # type: ignore[method-assign] self._setup_gmcp()
def _setup_gmcp(self) -> None: """Wire GMCP callback and WILL-detection for Core.Hello handshake.""" from telnetlib3.telopt import GMCP self.writer.set_ext_callback(GMCP, self.on_gmcp) # Capture current handle_will (already includes CHARSET wrapper). # On reconnect, _original_handle_will was already restored in connection_made, # so this always wraps exactly once. original_handle_will_gmcp = self.writer.handle_will def _detect_gmcp_will(opt: bytes) -> None: original_handle_will_gmcp(opt) if opt == GMCP and self.writer.remote_option.enabled(GMCP): self._send_gmcp_hello() self.writer.handle_will = _detect_gmcp_will # type: ignore[method-assign] def _send_gmcp_hello(self) -> None: """Send ``Core.Hello`` and ``Core.Supports.Set`` after GMCP negotiation.""" if self._gmcp_hello_sent: return self._gmcp_hello_sent = True from telnetlib3.accessories import get_version self.writer.send_gmcp("Core.Hello", {"client": "telnetlib3", "version": get_version()}) self.writer.send_gmcp("Core.Supports.Set", self._gmcp_modules) self.log.info("GMCP handshake: Core.Hello + Core.Supports.Set %s", self._gmcp_modules)
[docs] def on_gmcp(self, package: str, data: Any) -> None: """Store incoming GMCP data on ``writer.ctx``, merging dict updates.""" gmcp = self.writer.ctx.gmcp_data if isinstance(data, dict) and isinstance(gmcp.get(package), dict): gmcp[package].update(data) else: gmcp[package] = data self.log.debug("GMCP: %s %r", package, data)
[docs] def send_ttype(self) -> str: """Callback for responding to TTYPE requests.""" result: str = self._extra["term"] return result
[docs] def send_tspeed(self) -> Tuple[int, int]: """Callback for responding to TSPEED requests.""" parts = self._extra["tspeed"].split(",") return (int(parts[0]), int(parts[1]))
[docs] def send_xdisploc(self) -> str: """Callback for responding to XDISPLOC requests.""" result: str = self._extra["xdisploc"] return result
[docs] def send_env(self, keys: Sequence[str]) -> Dict[str, Any]: """ Callback for responding to NEW_ENVIRON requests. Only sends variables listed in ``_send_environ`` (set via ``send_environ`` parameter or ``--send-environ`` CLI option). :param keys: Values are requested for the keys specified. When empty, all environment values that wish to be volunteered should be returned. :returns: Environment values requested, or an empty string for keys not available. A return value must be given for each key requested. """ # All available values all_env = { # Terminal info from connection parameters "LANG": self._extra["lang"], "TERM": self._extra["term"], "LINES": self._extra["rows"], "COLUMNS": self._extra["cols"], # Environment variables from os.environ "COLORTERM": os.environ.get("COLORTERM", ""), "USER": os.environ.get("USER", ""), "HOME": os.environ.get("HOME", ""), "SHELL": os.environ.get("SHELL", ""), # Note: DISPLAY intentionally not available (security) } # Filter to only allowed variables env = {k: v for k, v in all_env.items() if k in self._send_environ} return {key: env.get(key, "") for key in keys} or env
@staticmethod def _normalize_charset_name(name: str) -> str: """ Normalize server-advertised charset names for :func:`codecs.lookup`. Servers sometimes advertise non-standard encoding names that Python's codec registry does not recognise. This tries progressively simpler variations until one resolves: 1. Original name (spaces -> hyphens) 2. Leading zeros stripped from numeric parts (``iso-8859-02`` -> ``iso-8859-2``) 3. Hyphens removed entirely (``cp-1250`` -> ``cp1250``) 4. Hyphens removed from all but the first segment (``iso-8859-2`` kept) :param name: Raw charset name from the server. :returns: Normalized name suitable for :func:`codecs.lookup`. """ import re base = name.strip().replace(" ", "-") # Strip leading zeros from numeric segments: iso-8859-02 -> iso-8859-2 no_leading_zeros = re.sub(r"-0+(\d)", r"-\1", base) # All hyphens removed: cp-1250 -> cp1250 no_hyphens = base.replace("-", "") # Keep first hyphen-segment, collapse the rest: iso-8859-2 stays parts = no_leading_zeros.split("-") if len(parts) > 2: partial = parts[0] + "-" + "".join(parts[1:]) else: partial = no_leading_zeros for candidate in (base, no_leading_zeros, no_hyphens, partial): try: codecs.lookup(candidate) return candidate except LookupError: continue return base
[docs] def send_charset(self, offered: List[str]) -> str: """ Callback for responding to CHARSET requests. Simplified policy: - If client has explicit encoding that matches an offered charset, use it - If client has explicit encoding that isn't offered, - For Latin-1 (weak default), accept first viable offered encoding - For other explicit encodings, reject (keep client's choice) - If no explicit encoding preference, accept first viable offered encoding - If no viable encodings found, reject :param offered: CHARSET options offered by server. :returns: Character encoding agreed to be used, or empty string to reject. """ # Get client's desired encoding canonical name desired_name = None if self.default_encoding and isinstance(self.default_encoding, str): try: desired_name = codecs.lookup(self.default_encoding).name except LookupError: # Unknown encoding, treat as no explicit preference pass # Find first viable offered encoding and check for exact match first_viable = None matched_offer = None for offer in offered: try: canon = codecs.lookup(self._normalize_charset_name(offer)).name # Record first viable encoding if first_viable is None: first_viable = (offer, canon) # Check for exact match with desired encoding if desired_name and canon == desired_name: matched_offer = (offer, canon) break except LookupError: self.log.info("LookupError: encoding %s not available", offer) continue # Decision logic: # Case 1: Found exact match for desired encoding if matched_offer: offer, canon = matched_offer self._extra["charset"] = canon self._extra["lang"] = self.DEFAULT_LOCALE + "." + canon self.log.debug("encoding negotiated: %s", offer) return offer # Case 2: Has explicit encoding but not offered if desired_name: # Special case: Latin-1 is a weak default, accept first viable instead is_latin1 = desired_name in ("latin-1", "latin1", "iso8859-1", "iso-8859-1") if is_latin1 and first_viable: offer, canon = first_viable self._extra["charset"] = canon self._extra["lang"] = self.DEFAULT_LOCALE + "." + canon self.log.debug("encoding negotiated: %s", offer) return offer # Otherwise reject - keep client's explicit encoding self.log.debug("Declining offered charsets %s; prefer %s", offered, desired_name) return "" # Case 3: No explicit preference, use first viable if first_viable: offer, canon = first_viable self._extra["charset"] = canon self._extra["lang"] = self.DEFAULT_LOCALE + "." + canon self.log.debug("encoding negotiated: %s", offer) return offer # Case 4: No viable encodings found self.log.warning("No suitable encoding offered by server: %s", offered) return ""
[docs] def on_request_charset(self) -> List[str]: """ Offer callback for client-initiated CHARSET REQUEST, :rfc:`2066`. Called by :meth:`~.TelnetWriter.request_charset` to determine which character sets the client offers to the server. :returns: List of charset name strings to offer. """ return ["UTF-8", "LATIN1", "US-ASCII"]
[docs] def on_request_environ(self) -> List[str]: """ Offer callback for client-initiated NEW_ENVIRON SEND, :rfc:`1572`. Called by :meth:`~.TelnetWriter.request_environ` to determine which environment variable names the client requests from the server. :returns: List of environment variable names to request. """ return []
[docs] def send_naws(self) -> Tuple[int, int]: """ Callback for responding to NAWS requests. :returns: Client window size as (rows, columns). """ return (self._extra["rows"], self._extra["cols"])
[docs] def encoding(self, outgoing: Optional[bool] = None, incoming: Optional[bool] = None) -> str: """ Return encoding for the given stream direction. :param outgoing: Whether the return value is suitable for encoding bytes for transmission to server. :param incoming: Whether the return value is suitable for decoding bytes received by the client. :raises TypeError: When a direction argument, either ``outgoing`` or ``incoming``, was not set ``True``. :returns: ``'US-ASCII'`` for the directions indicated, unless ``BINARY`` :rfc:`856` has been negotiated for the direction indicated or ``force_binary`` is set ``True``. """ if not (outgoing or incoming): raise TypeError( "encoding arguments 'outgoing' and 'incoming' are required: toggle at least one." ) # may we encode in the direction indicated? outgoing_only = outgoing and not incoming incoming_only = not outgoing and incoming bidirectional = outgoing and incoming may_encode = ( (outgoing_only and self.writer.outbinary) or (incoming_only and self.writer.inbinary) or (bidirectional and self.writer.outbinary and self.writer.inbinary) ) if self.force_binary or may_encode: # The 'charset' value, initialized using keyword argument # default_encoding, may be re-negotiated later. Only the CHARSET # negotiation method allows the server to select an encoding, so # this value is reflected here by a single return statement. result: str = self._extra["charset"] return result return "US-ASCII"
[docs] class TelnetTerminalClient(TelnetClient): """Telnet client for sessions with a network virtual terminal (NVT)."""
[docs] def send_naws(self) -> Tuple[int, int]: """ Callback replies to request for window size, NAWS :rfc:`1073`. :returns: Window dimensions by lines and columns. """ return self._winsize()
[docs] def send_env(self, keys: Sequence[str]) -> Dict[str, Any]: """ Callback replies to request for env values, NEW_ENVIRON :rfc:`1572`. :returns: Super class value updated with window LINES and COLUMNS. """ env = super().send_env(keys) env["LINES"], env["COLUMNS"] = self._winsize() return env
@staticmethod def _winsize() -> Tuple[int, int]: try: import fcntl import termios fmt = "hhhh" buf = b"\x00" * struct.calcsize(fmt) val = fcntl.ioctl(sys.stdin.fileno(), termios.TIOCGWINSZ, buf) rows, cols, _, _ = struct.unpack(fmt, val) return rows, cols except (ImportError, IOError): try: sz = os.get_terminal_size() return sz.lines, sz.columns except OSError: return (int(os.environ.get("LINES", 25)), int(os.environ.get("COLUMNS", 80)))
[docs] async def open_connection( host: Optional[str] = None, port: int = 23, *, client_factory: Optional[Callable[..., client_base.BaseClient]] = None, family: int = 0, flags: int = 0, local_addr: Optional[Tuple[str, int]] = None, encoding: Union[str, bool] = "utf8", encoding_errors: str = "replace", force_binary: bool = False, term: str = "unknown", cols: int = 80, rows: int = 25, tspeed: Tuple[int, int] = (38400, 38400), xdisploc: str = "", shell: Optional[ShellCallback] = None, connect_minwait: float = 0, connect_maxwait: float = 3.0, connect_timeout: Optional[float] = None, compression: Optional[bool] = None, waiter_closed: Optional[asyncio.Future[None]] = None, _waiter_connected: Optional[asyncio.Future[None]] = None, limit: Optional[int] = None, send_environ: Optional[Sequence[str]] = None, ssl: Union[bool, ssl_module.SSLContext, None] = None, server_hostname: Optional[str] = None, ) -> Tuple[Union[TelnetReader, TelnetReaderUnicode], Union[TelnetWriter, TelnetWriterUnicode]]: """ Connect to a TCP Telnet server as a Telnet client. :param host: Remote Internet TCP Server host. :param port: Remote Internet host TCP port. :param client_factory: Client connection class factory. When ``None``, :class:`TelnetTerminalClient` is used when *stdin* is attached to a terminal, :class:`TelnetClient` otherwise. :param family: Same meaning as :meth:`asyncio.loop.create_connection`. :param flags: Same meaning as :meth:`asyncio.loop.create_connection`. :param local_addr: Same meaning as :meth:`asyncio.loop.create_connection`. :param encoding: The default assumed encoding, or ``False`` to disable unicode support. This value is used for decoding bytes received by and encoding bytes transmitted to the Server. These values are preferred in response to NEW_ENVIRON :rfc:`1572` as environment value ``LANG``, and by CHARSET :rfc:`2066` negotiation. The server's attached ``reader, writer`` streams accept and return unicode, unless this value is explicitly set ``False``. In that case, the attached streams interfaces are bytes-only. :param encoding_errors: Same meaning as :meth:`codecs.Codec.encode`. :param term: Terminal type sent for requests of TTYPE, :rfc:`930` or as Environment value TERM by NEW_ENVIRON negotiation, :rfc:`1672`. :param cols: Client window dimension sent as Environment value COLUMNS by NEW_ENVIRON negotiation, :rfc:`1672` or NAWS :rfc:`1073`. :param rows: Client window dimension sent as Environment value LINES by NEW_ENVIRON negotiation, :rfc:`1672` or NAWS :rfc:`1073`. :param tspeed: Client BPS line speed in form ``(rx, tx)`` for receive and transmit, respectively. Sent when requested by TSPEED, :rfc:`1079`. :param xdisploc: String transmitted in response for request of XDISPLOC, :rfc:`1086` by server (X11). :param connect_minwait: The client allows any additional telnet negotiations to be demanded by the server within this period of time before launching the shell. Servers should assert desired negotiation on-connect and in response to 1 or 2 round trips. A server that does not make any telnet demands, such as a TCP server that is not a telnet server, will delay the execution of ``shell`` for exactly this amount of time. :param connect_maxwait: If the remote end is not compliant, or otherwise confused by our demands, the shell continues anyway after the greater of this value has elapsed. A client that is not answering option negotiation will delay the start of the shell by this amount. :param connect_timeout: Timeout in seconds for the TCP connection to be established. When ``None`` (default), no timeout is applied and the connection attempt may block indefinitely. When specified, a :exc:`ConnectionError` is raised if the connection is not established within the given time. :param compression: MCCP compression policy. ``None`` (default) passively accepts compression when offered by the server. ``True`` actively requests MCCP2/MCCP3. ``False`` rejects all compression offers. :param force_binary: When ``True``, the encoding is used regardless of BINARY mode negotiation. :param waiter_closed: Future that completes when the connection is closed. :param shell: An async function that is called after negotiation completes, receiving arguments ``(reader, writer)``. :param limit: The buffer limit for reader stream. :param ssl: TLS configuration. ``True`` creates a default :func:`ssl.create_default_context` that verifies CA certificates. An :class:`ssl.SSLContext` gives full control. ``None`` (default) uses plain TCP. :param server_hostname: Hostname for TLS certificate verification. When ``ssl`` is truthy and *server_hostname* is ``None``, defaults to *host*. :return: The reader is a :class:`~.TelnetReader` instance, the writer is a :class:`~.TelnetWriter` instance. """ if client_factory is None: client_factory = TelnetClient if sys.stdin.isatty(): client_factory = TelnetTerminalClient def connection_factory() -> client_base.BaseClient: return client_factory( encoding=encoding, encoding_errors=encoding_errors, force_binary=force_binary, term=term, cols=cols, rows=rows, tspeed=tspeed, xdisploc=xdisploc, shell=shell, connect_minwait=connect_minwait, connect_maxwait=connect_maxwait, compression=compression, waiter_closed=waiter_closed, _waiter_connected=_waiter_connected, limit=limit, send_environ=send_environ, ) # Resolve TLS context ssl_context: Union[ssl_module.SSLContext, None] = None if ssl is True: ssl_context = ssl_module.create_default_context() elif isinstance(ssl, ssl_module.SSLContext): ssl_context = ssl conn_kwargs: Dict[str, Any] = {"family": family, "flags": flags, "local_addr": local_addr} if ssl_context is not None: conn_kwargs["ssl"] = ssl_context conn_kwargs["server_hostname"] = server_hostname or host or "localhost" try: _, protocol = await asyncio.wait_for( asyncio.get_running_loop().create_connection( connection_factory, host or "localhost", port, **conn_kwargs ), timeout=connect_timeout, ) except asyncio.TimeoutError as exc: raise ConnectionError( f"TCP connection to {host or 'localhost'}:{port}" f" timed out after {connect_timeout}s" ) from exc await protocol._waiter_connected assert protocol.reader is not None and protocol.writer is not None return protocol.reader, protocol.writer
async def run_client() -> None: """Command-line 'telnetlib3-client' entry point, via setuptools.""" args = _transform_args(_get_argument_parser().parse_args()) config_msg = f"Client configuration: {accessories.repr_mapping(args)}" log = accessories.make_logger( name=__name__, loglevel=args["loglevel"], logfile=args["logfile"], logfmt=args["logfmt"], filemode="w" if args.get("logfile_mode") == "rewrite" else "a", ) log.debug(config_msg) always_will: set[bytes] = args["always_will"] always_do: set[bytes] = args["always_do"] always_wont: set[bytes] = args["always_wont"] always_dont: set[bytes] = args["always_dont"] # Wrap client factory to inject always_will/always_do/always_wont/always_dont # and encoding flags before negotiation starts. environ_encoding = args["encoding"] or "ascii" encoding_explicit = environ_encoding not in ("utf8", "utf-8", "ascii") gmcp_modules: Optional[List[str]] = args.get("gmcp_modules") def _client_factory(**kwargs: Any) -> client_base.BaseClient: client: TelnetClient kwargs["gmcp_modules"] = gmcp_modules if sys.stdin.isatty(): client = TelnetTerminalClient(**kwargs) else: client = TelnetClient(**kwargs) orig_connection_made = client.connection_made def _patched_connection_made(transport: asyncio.BaseTransport) -> None: orig_connection_made(transport) if always_will: client.writer.always_will = always_will client.writer.always_do = always_do if always_wont: client.writer.always_wont = always_wont if always_dont: client.writer.always_dont = always_dont from .telopt import GMCP as _GMCP client.writer.passive_do = {_GMCP} client.writer.environ_encoding = environ_encoding client.writer._encoding_explicit = encoding_explicit client.connection_made = _patched_connection_made # type: ignore[method-assign] return client client_factory: Optional[Callable[..., client_base.BaseClient]] = _client_factory shell_callback = args["shell"] # Wrap shell to inject raw_mode flag and input translation for retro encodings raw_mode_val: Optional[bool] = args.get("raw_mode", False) if raw_mode_val is not False: from .client_shell import _INPUT_XLAT, _INPUT_SEQ_XLAT, InputFilter enc_key = (args.get("encoding", "") or "").lower() byte_xlat = dict(_INPUT_XLAT.get(enc_key, {})) if args.get("ascii_eol"): byte_xlat.pop(0x0D, None) byte_xlat.pop(0x0A, None) seq_xlat = {} if args.get("ansi_keys") else _INPUT_SEQ_XLAT.get(enc_key, {}) input_filter: Optional[InputFilter] = ( InputFilter(seq_xlat, byte_xlat) if (seq_xlat or byte_xlat) else None ) ascii_eol: bool = args.get("ascii_eol", False) _inner_shell = shell_callback async def _raw_shell( reader: Union[TelnetReader, TelnetReaderUnicode], writer_arg: Union[TelnetWriter, TelnetWriterUnicode], ) -> None: ctx = writer_arg.ctx ctx.raw_mode = raw_mode_val if ascii_eol: ctx.ascii_eol = True if input_filter is not None: ctx.input_filter = input_filter await _inner_shell(reader, writer_arg) shell_callback = _raw_shell # Wrap shell to inject typescript recording file handle typescript_path: Optional[str] = args.get("typescript") if typescript_path: _ts_inner = shell_callback async def _typescript_shell( reader: Union[TelnetReader, TelnetReaderUnicode], writer_arg: Union[TelnetWriter, TelnetWriterUnicode], ) -> None: ctx = writer_arg.ctx assert typescript_path is not None ts_file = open( # noqa: SIM115 typescript_path, "w" if args.get("typescript_mode") == "rewrite" else "a", encoding="utf-8", ) ctx.typescript_file = ts_file try: await _ts_inner(reader, writer_arg) finally: ts_file.close() shell_callback = _typescript_shell # Build connection kwargs explicitly to avoid pylint false positive connection_kwargs: Dict[str, Any] = { "encoding": args["encoding"], "tspeed": args["tspeed"], "shell": shell_callback, "term": args["term"], "force_binary": args["force_binary"], "encoding_errors": args["encoding_errors"], "connect_minwait": args["connect_minwait"], "connect_timeout": args["connect_timeout"], "send_environ": args["send_environ"], } if args.get("ssl"): connection_kwargs["ssl"] = args["ssl"] if client_factory is not None: connection_kwargs["client_factory"] = client_factory # connect _, writer = await open_connection(args["host"], args["port"], **connection_kwargs) # repl loop await writer.protocol.waiter_closed def _get_argument_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser( prog="telnetlib3-client", description="Telnet protocol client", formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) parser.add_argument("host", action="store", help="hostname") parser.add_argument("port", nargs="?", default=23, type=int, help="port number") parser.add_argument( "--always-do", action="append", default=[], metavar="OPT", help="always send DO for this option (comma-separated, named like GMCP" " or numeric like 201, repeatable)", ) parser.add_argument( "--always-dont", action="append", default=[], metavar="OPT", help="always send DONT for this option, refusing even natively supported" " options (comma-separated, named or numeric, repeatable)", ) parser.add_argument( "--always-will", action="append", default=[], metavar="OPT", help="always send WILL for this option (comma-separated, named like MXP" " or numeric like 91, repeatable)", ) parser.add_argument( "--always-wont", action="append", default=[], metavar="OPT", help="always send WONT for this option, refusing even natively supported" " options (comma-separated, named or numeric, repeatable)", ) parser.add_argument( "--ansi-keys", action="store_true", default=False, help="transmit raw ANSI escape sequences for arrow and function " "keys instead of encoding-specific control codes. Use for " "BBSes that expect ANSI cursor sequences.", ) parser.add_argument( "--ascii-eol", action="store_true", default=False, help="use ASCII CR/LF for line endings instead of encoding-native " "EOL (e.g. ATASCII 0x9B). Use for BBSes that display retro " "graphics but use standard CR/LF for line breaks.", ) parser.add_argument( "--compression", action=argparse.BooleanOptionalAction, default=None, help="MCCP compression: --compression to request, --no-compression to reject, " "omit to passively accept (default)", ) parser.add_argument( "--connect-maxwait", default=4.0, type=float, help="timeout for pending negotiation" ) parser.add_argument( "--connect-minwait", default=0, type=float, help="shell delay for negotiation" ) parser.add_argument( "--connect-timeout", default=10, type=float, help="timeout for TCP connection in seconds (default: 10)", ) parser.add_argument("--encoding", default="utf8", help="encoding name") parser.add_argument( "--encoding-errors", default="replace", help="handler for encoding errors", choices=("replace", "ignore", "strict"), ) parser.add_argument("--force-binary", action="store_true", help="force encoding", default=True) parser.add_argument( "--gmcp-modules", default=None, metavar="MODULES", help="comma-separated GMCP module specs to request " '(e.g. "Char 1,Room 1,IRE.Rift 1"). ' "When provided, replaces the built-in defaults.", ) mode_group = parser.add_mutually_exclusive_group() mode_group.add_argument( "--line-mode", action="store_true", default=False, help="force line-buffered input with local echo. Appropriate for " "simple command-line services.", ) mode_group.add_argument( "--raw-mode", action="store_true", default=False, help="force raw terminal mode (no line buffering, no local echo). " "Correct for BBS and retro systems. Default: auto-detect from " "server negotiation.", ) parser.add_argument("--logfile", help="filepath") parser.add_argument( "--logfile-mode", default="append", choices=["append", "rewrite"], dest="logfile_mode", help="Log file write mode: append (default) or rewrite.", ) parser.add_argument("--logfmt", default=accessories._DEFAULT_LOGFMT, help="log format") parser.add_argument("--loglevel", default="warn", help="log level") parser.add_argument( "--send-environ", default="TERM,LANG,COLUMNS,LINES,COLORTERM", help="comma-separated environment variables to send (NEW_ENVIRON)", ) parser.add_argument( "--shell", default="telnetlib3.telnet_client_shell", help="module.function_name" ) parser.add_argument("--speed", default=38400, type=int, help="connection speed") parser.add_argument( "--ssl", action="store_true", default=False, help="connect using TLS (TELNETS)" ) parser.add_argument( "--ssl-cafile", default=None, metavar="PATH", help="path to CA or self-signed certificate PEM file for TLS verification", ) parser.add_argument( "--ssl-no-verify", action="store_true", default=False, help="skip certificate verification for TLS (implies --ssl). " "WARNING: this is insecure -- connections are encrypted but " "the server identity is not verified, allowing " "man-in-the-middle attacks", ) parser.add_argument("--term", default=os.environ.get("TERM", "unknown"), help="terminal type") parser.add_argument( "--typescript", default=None, metavar="FILE", help="record session to FILE (like Unix script(1))", ) parser.add_argument( "--typescript-mode", default="append", choices=["append", "rewrite"], dest="typescript_mode", help="Typescript write mode: append (default) or rewrite.", ) return parser def _parse_option_arg(value: str) -> bytes: """ Resolve a telnet option name or integer to option bytes. :param value: Option name (e.g. ``"MXP"``) or decimal byte value (e.g. ``"91"``). :returns: Single-byte option value. :raises ValueError: When *value* is not a known name or valid integer. """ from .telopt import option_from_name try: return option_from_name(value) except KeyError: return bytes([int(value)]) def _parse_option_list(values: List[str]) -> set[bytes]: """ Parse a list of option arguments, splitting comma-separated values. :param values: List of option strings, each may be comma-separated. :returns: Set of parsed option bytes. """ result: set[bytes] = set() for v in values: for item in v.split(","): item = item.strip() if item: result.add(_parse_option_arg(item)) return result def _transform_args(args: argparse.Namespace) -> Dict[str, Any]: # Auto-enable force_binary for any non-ASCII encoding that uses high-bit bytes. from .encodings import FORCE_BINARY_ENCODINGS force_binary = args.force_binary # Three-state: True (forced raw), False (forced line), None (auto-detect) if args.raw_mode: raw_mode: Optional[bool] = True elif args.line_mode: raw_mode = False else: raw_mode = None enc_key = args.encoding.lower().replace("-", "_") if enc_key not in ("us_ascii", "ascii"): force_binary = True if enc_key in FORCE_BINARY_ENCODINGS: raw_mode = True # Build TLS context from --ssl / --ssl-cafile / --ssl-no-verify ssl_ctx: Union[ssl_module.SSLContext, None] = None if args.ssl or args.ssl_no_verify: if args.ssl_no_verify: ssl_ctx = ssl_module.SSLContext(ssl_module.PROTOCOL_TLS_CLIENT) ssl_ctx.check_hostname = False ssl_ctx.verify_mode = ssl_module.CERT_NONE elif args.ssl_cafile: ssl_ctx = ssl_module.create_default_context(cafile=args.ssl_cafile) else: ssl_ctx = ssl_module.create_default_context() return { "host": args.host, "port": args.port, "loglevel": args.loglevel, "logfile": args.logfile, "logfile_mode": args.logfile_mode, "logfmt": args.logfmt, "encoding": args.encoding, "tspeed": (args.speed, args.speed), "shell": accessories.function_lookup(args.shell), "term": args.term, "force_binary": force_binary, "encoding_errors": args.encoding_errors, "connect_minwait": args.connect_minwait, "connect_timeout": args.connect_timeout or None, "send_environ": tuple(v.strip() for v in args.send_environ.split(",") if v.strip()), "always_will": _parse_option_list(args.always_will), "always_do": _parse_option_list(args.always_do), "always_wont": _parse_option_list(args.always_wont), "always_dont": _parse_option_list(args.always_dont), "raw_mode": raw_mode, "ascii_eol": args.ascii_eol, "ansi_keys": args.ansi_keys, "ssl": ssl_ctx, "gmcp_modules": ( [m.strip() for m in args.gmcp_modules.split(",") if m.strip()] if args.gmcp_modules else None ), "compression": args.compression, "typescript": args.typescript, "typescript_mode": args.typescript_mode, } def main() -> None: """Entry point for telnetlib3-client command.""" try: asyncio.run(run_client()) except KeyboardInterrupt: pass except OSError as err: print(f"Error: {err}", file=sys.stderr) sys.exit(1) def _get_fingerprint_argument_parser() -> argparse.ArgumentParser: """Build argument parser for ``telnetlib3-fingerprint`` CLI.""" parser = argparse.ArgumentParser( description="Fingerprint a remote telnet server", formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) parser.add_argument("host", help="remote hostname or IP") parser.add_argument("port", nargs="?", default=23, type=int, help="port number") parser.add_argument( "--always-do", action="append", default=[], metavar="OPT", help="always send DO for this option (comma-separated, named like GMCP" " or numeric like 201, repeatable)", ) parser.add_argument( "--always-dont", action="append", default=[], metavar="OPT", help="always send DONT for this option, refusing even natively supported" " options (comma-separated, named or numeric, repeatable)", ) parser.add_argument( "--always-will", action="append", default=[], metavar="OPT", help="always send WILL for this option (comma-separated, named like MXP" " or numeric like 91, repeatable)", ) parser.add_argument( "--always-wont", action="append", default=[], metavar="OPT", help="always send WONT for this option, refusing even natively supported" " options (comma-separated, named or numeric, repeatable)", ) parser.add_argument( "--banner-max-bytes", default=65536, type=int, help="max bytes per banner read call" ) parser.add_argument( "--banner-max-wait", default=8.0, type=float, help="max seconds to wait for banner data" ) parser.add_argument( "--banner-quiet-time", default=2.0, type=float, help="seconds of silence before considering banner complete", ) parser.add_argument( "--connect-timeout", default=10, type=float, help="TCP connection timeout in seconds" ) parser.add_argument( "--data-dir", default=None, help="directory for fingerprint data (default: $TELNETLIB3_DATA_DIR)", ) parser.add_argument( "--encoding", default="ascii", metavar="CODEC", dest="stream_encoding", help="character encoding of the remote server (e.g. cp037 for EBCDIC)", ) parser.add_argument("--logfile", default=None, help="filepath") parser.add_argument("--logfmt", default=accessories._DEFAULT_LOGFMT, help="log format") parser.add_argument("--loglevel", default="warn", help="log level") parser.add_argument( "--mssp-wait", default=5.0, type=float, help="max seconds since connect to wait for MSSP data", ) parser.add_argument( "--save-json", default=None, metavar="PATH", help="write fingerprint JSON to this path" ) parser.add_argument( "--scan-type", choices=["quick", "full"], default="quick", help="probe depth: 'quick' probes core options only, " "'full' includes legacy options", ) parser.add_argument( "--send-env", action="append", metavar="KEY=VALUE", default=[], help="environment variable to send (repeatable)", ) parser.add_argument( "--set-name", default=None, metavar="NAME", help="store this name for the fingerprint in fingerprint_names.json", ) parser.add_argument( "--silent", action="store_true", help="suppress fingerprint output to stdout" ) parser.add_argument( "--ssl", action="store_true", default=False, help="connect using TLS (TELNETS)" ) parser.add_argument( "--ssl-cafile", default=None, metavar="PATH", help="path to CA or self-signed certificate PEM file for TLS verification", ) parser.add_argument( "--ssl-no-verify", action="store_true", default=False, help="skip certificate verification for TLS (implies --ssl). " "WARNING: this is insecure -- connections are encrypted but " "the server identity is not verified, allowing " "man-in-the-middle attacks", ) parser.add_argument( "--ttype", default="VT100", help="terminal type sent in response to TTYPE requests" ) return parser async def run_fingerprint_client() -> None: """ Connect to a remote telnet server and fingerprint it. Parses CLI arguments, binds them into :func:`~telnetlib3.server_fingerprinting.fingerprinting_client_shell` via :func:`functools.partial`, and runs the connection. """ from . import fingerprinting, server_fingerprinting args = _get_fingerprint_argument_parser().parse_args() if args.data_dir is not None: fingerprinting.DATA_DIR = args.data_dir log = accessories.make_logger( name=__name__, loglevel=args.loglevel, logfile=args.logfile, logfmt=args.logfmt ) log.debug("Fingerprint client: host=%s port=%d", args.host, args.port) shell = functools.partial( server_fingerprinting.fingerprinting_client_shell, host=args.host, port=args.port, save_path=args.save_json, silent=args.silent, set_name=args.set_name, environ_encoding=args.stream_encoding, scan_type=args.scan_type, mssp_wait=args.mssp_wait, banner_quiet_time=args.banner_quiet_time, banner_max_wait=args.banner_max_wait, banner_max_bytes=args.banner_max_bytes, ) # Parse --always-will/--always-do/--always-wont/--always-dont option names/numbers fp_always_will = _parse_option_list(args.always_will) fp_always_do = _parse_option_list(args.always_do) fp_always_wont = _parse_option_list(args.always_wont) fp_always_dont = _parse_option_list(args.always_dont) # Parse --send-env KEY=VALUE pairs extra_env: Dict[str, str] = {} for item in args.send_env: if "=" in item: k, v = item.split("=", 1) extra_env[k] = v else: extra_env[item] = "" # environ_encoding must be set on the writer BEFORE negotiation # starts, so we wrap the client factory to inject it during # connection_made (before begin_negotiation fires). environ_encoding = args.stream_encoding ttype = args.ttype def fingerprint_client_factory(**kwargs: Any) -> client_base.BaseClient: # Ensure extra env keys are in the send list if extra_env: send = set(kwargs.get("send_environ") or TelnetClient.DEFAULT_SEND_ENVIRON) send.update(extra_env.keys()) kwargs["send_environ"] = list(send) client = TelnetClient(**kwargs) orig_connection_made = client.connection_made orig_send_env = client.send_env def patched_connection_made(transport: asyncio.BaseTransport) -> None: orig_connection_made(transport) assert client.writer is not None client.writer.environ_encoding = environ_encoding client.writer._encoding_explicit = environ_encoding != "ascii" mud_opts = {opt for opt, _, _ in fingerprinting.EXTENDED_OPTIONS} client.writer.always_will = fp_always_will | mud_opts client.writer.always_do = fp_always_do | mud_opts if fp_always_wont: client.writer.always_wont = fp_always_wont if fp_always_dont: client.writer.always_dont = fp_always_dont def patched_send_env(keys: Sequence[str]) -> Dict[str, Any]: result = orig_send_env(keys) result.update(extra_env) return result client.connection_made = patched_connection_made # type: ignore[method-assign] if extra_env: client.send_env = patched_send_env # type: ignore[method-assign] return client # Build TLS context for fingerprint client fp_ssl: Union[ssl_module.SSLContext, None] = None if args.ssl or args.ssl_no_verify: if args.ssl_no_verify: fp_ssl = ssl_module.SSLContext(ssl_module.PROTOCOL_TLS_CLIENT) fp_ssl.check_hostname = False fp_ssl.verify_mode = ssl_module.CERT_NONE elif args.ssl_cafile: fp_ssl = ssl_module.create_default_context(cafile=args.ssl_cafile) else: fp_ssl = ssl_module.create_default_context() waiter_closed: asyncio.Future[None] = asyncio.get_running_loop().create_future() fp_conn_kwargs: Dict[str, Any] = { "host": args.host, "port": args.port, "client_factory": fingerprint_client_factory, "shell": shell, "encoding": False, "term": ttype, "connect_minwait": 0, "connect_maxwait": 4.0, "connect_timeout": args.connect_timeout or None, "waiter_closed": waiter_closed, } if fp_ssl is not None: fp_conn_kwargs["ssl"] = fp_ssl try: _, writer = await open_connection(**fp_conn_kwargs) except OSError as err: log.error("%s:%d: %s", args.host, args.port, err) raise await writer.protocol.waiter_closed def fingerprint_main() -> None: """Entry point for ``telnetlib3-fingerprint`` command.""" try: asyncio.run(run_fingerprint_client()) except OSError as err: print(f"Error: {err}", file=sys.stderr) sys.exit(1) if __name__ == "__main__": # pragma: no cover main()