"""
Telnet server implementation with command-line interface.
The ``main`` function here is wired to the command line tool by name
telnetlib3-server. If this server's PID receives the SIGTERM signal,
it attempts to shutdown gracefully.
The :class:`TelnetServer` class negotiates a character-at-a-time (WILL-SGA,
WILL-ECHO) session with support for negotiation about window size, environment
variables, and terminal type name, and automatically closes client connections
after an idle period.
"""
from __future__ import annotations
# std imports
import ssl as ssl_module
import sys
import zlib
import codecs
import signal
import socket
import asyncio
import logging
import argparse
from typing import Any, Dict, List, Type, Tuple, Union, Callable, Optional, Sequence, NamedTuple
# local
from . import accessories, server_base
from ._types import ShellCallback
from .telopt import SB, SE, IAC, MCCP2_COMPRESS, name_commands
from .stream_reader import TelnetReader, TelnetReaderUnicode
from .stream_writer import TelnetWriter, TelnetWriterUnicode
# PTY support needs three Unix-only modules (pty, fcntl, termios). Probe for
# them once at import time and record the outcome in a module-level flag.
try:
    import pty  # noqa: F401 pylint:disable=unused-import
    import fcntl  # noqa: F401 pylint:disable=unused-import
    import termios  # noqa: F401 pylint:disable=unused-import
except ImportError:
    PTY_SUPPORT = False
else:
    PTY_SUPPORT = True
# Names exported by a star-import of this module.
__all__ = (
"TelnetServer",
"LinemodeServer",
"Server",
"create_server",
"run_server",
"parse_server_args",
)
class CONFIG(NamedTuple):
    """
    Default configuration for the telnet server.

    Every field carries a default, so ``CONFIG()`` with no arguments yields
    the complete set of default values.
    """

    # listening endpoint
    host: str = "localhost"
    port: int = 6023

    # logging
    loglevel: str = "info"
    logfile: Optional[str] = None
    logfmt: str = accessories._DEFAULT_LOGFMT

    # shell callable, resolved once when this class body is executed
    shell: Callable[..., Any] = accessories.function_lookup("telnetlib3.telnet_server_shell")

    # session encoding, timing and negotiation
    encoding: str = "utf8"
    force_binary: bool = False
    timeout: int = 300
    connect_maxwait: float = 1.5

    # PTY execution options
    pty_exec: Optional[str] = None
    pty_args: Optional[List[str]] = None
    pty_raw: bool = True

    # remaining switches
    robot_check: bool = False
    pty_fork_limit: int = 0
    status_interval: int = 20
    never_send_ga: bool = False
    line_mode: bool = False
# Default config instance - use this to access default values
_config = CONFIG()
# Module-level logger used by the log calls in this module.
logger = logging.getLogger("telnetlib3.server")
[docs]
class TelnetServer(server_base.BaseServer):
"""Telnet Server protocol performing common negotiation."""
#: Maximum number of cycles to seek for all terminal types. We are seeking
#: the repeat or cycle of a terminal table, choosing the first -- but when
#: negotiated by MUD clients, we choose the most Unix TERM-appropriate one.
TTYPE_LOOPMAX = 8
# Derived methods from base class
def __init__(
    self,
    term: str = "unknown",
    cols: int = 80,
    rows: int = 25,
    timeout: int = 300,
    shell: Optional[ShellCallback] = None,
    _waiter_connected: Optional[asyncio.Future[None]] = None,
    encoding: Union[str, bool] = "utf8",
    encoding_errors: str = "strict",
    force_binary: bool = False,
    never_send_ga: bool = False,
    line_mode: bool = False,
    connect_maxwait: float = 4.0,
    compression: Optional[bool] = None,
    limit: Optional[int] = None,
    reader_factory: type = TelnetReader,
    reader_factory_encoding: type = TelnetReaderUnicode,
    writer_factory: type = TelnetWriter,
    writer_factory_encoding: type = TelnetWriterUnicode,
) -> None:
    """
    Set up per-connection negotiation state.

    :param term: Initial ``term`` value stored in the extra-info mapping.
    :param cols: Initial ``cols`` value stored in the extra-info mapping.
    :param rows: Initial ``rows`` value stored in the extra-info mapping.
    :param timeout: Initial ``timeout`` value stored in the extra-info
        mapping.
    :param compression: Stored as-is; MCCP2 is only marked enabled when
        this is exactly ``True``.

    All remaining arguments are forwarded unchanged to the base class
    constructor.
    """
    super().__init__(
        shell=shell,
        _waiter_connected=_waiter_connected,
        encoding=encoding,
        encoding_errors=encoding_errors,
        force_binary=force_binary,
        never_send_ga=never_send_ga,
        line_mode=line_mode,
        connect_maxwait=connect_maxwait,
        limit=limit,
        reader_factory=reader_factory,
        reader_factory_encoding=reader_factory_encoding,
        writer_factory=writer_factory,
        writer_factory_encoding=writer_factory_encoding,
    )

    # flags guarding the deferred DO NEW_ENVIRON / WILL ECHO negotiation
    self._environ_requested = False
    self._echo_negotiated = False

    # MCCP2 (server-to-client compression) bookkeeping
    self._mccp2_compressor: Optional[Any] = None
    self._mccp2_pending: bool = False
    self._compression: Optional[bool] = compression
    self._mccp2_enabled: bool = compression is True
    self._mccp2_orig_write: Optional[Any] = None

    # resolved by check_negotiation() once the encoding outcome is known
    self.waiter_encoding: asyncio.Future[bool] = asyncio.Future()
    self._tasks.append(self.waiter_encoding)

    # serial number of the next TTYPE reply, and the idle-timeout handle
    self._ttype_count = 1
    self._timer: Optional[asyncio.TimerHandle] = None

    self._extra.update(
        term=term,
        charset=encoding or "",
        cols=cols,
        rows=rows,
        timeout=timeout,
    )
[docs]
def connection_made(self, transport: asyncio.BaseTransport) -> None:
    """
    Accept a new connection and register the telnet option callbacks.

    After the base class has handled the transport, the compression policy
    is copied onto the writer, the idle timer is started, and the response
    and offer callbacks for the extended options are installed.
    """
    from .telopt import NAWS, TTYPE, TSPEED, CHARSET, XDISPLOC, NEW_ENVIRON

    super().connection_made(transport)

    # compression policy travels with the writer
    self.writer.compression = self._compression

    # start the idle timeout
    self.set_timeout()

    # handlers for the client's *responses* to our requests for terminal
    # attributes, environment values, etc.
    response_handlers: Dict[bytes, Callable[..., Any]] = {
        NAWS: self.on_naws,
        NEW_ENVIRON: self.on_environ,
        TSPEED: self.on_tspeed,
        TTYPE: self.on_ttype,
        XDISPLOC: self.on_xdisploc,
        CHARSET: self.on_charset,
    }
    for option, handler in response_handlers.items():
        self.writer.set_ext_callback(option, handler)

    # handlers that *define* what we ask for in outgoing requests
    offer_handlers = (
        (NEW_ENVIRON, self.on_request_environ),
        (CHARSET, self.on_request_charset),
    )
    for option, handler in offer_handlers:
        self.writer.set_ext_offer_callback(option, handler)
_tls_checked = False
[docs]
def data_received(self, data: bytes) -> None:
    """
    Feed received bytes to the base protocol and restart the idle timer.

    The very first chunk is inspected once: a leading ``0x16`` byte is
    treated as a TLS ClientHello arriving at a server without an SSL
    context, and the connection is closed instead of being processed.
    After normal processing, MCCP2 compression is started once the writer
    reports the option as locally enabled.
    """
    if not self._tls_checked:
        self._tls_checked = True
        looks_like_tls = data[:1] == b"\x16"
        if looks_like_tls:
            peername = self._transport.get_extra_info("peername", ("-", 0))
            logger.warning(
                "TLS ClientHello from %s:%s but server has no SSL context -- closing connection",
                peername[0],
                peername[1],
            )
            self._transport.close()
            return

    self.set_timeout()
    super().data_received(data)

    # MCCP2: begin compressing as soon as the client confirmed DO MCCP2,
    # unless that already happened.
    if not self._mccp2_enabled or self._mccp2_pending:
        return
    if self._mccp2_compressor is None and self.writer.local_option.enabled(MCCP2_COMPRESS):
        self._mccp2_start()
def _mccp2_start(self) -> None:
    """
    Announce MCCP2 with ``IAC SB MCCP2 IAC SE`` and compress later output.

    The transport's ``write`` is replaced by a wrapper that deflates each
    chunk and sync-flushes it; the original ``write`` is kept in
    ``_mccp2_orig_write`` so that :meth:`_mccp2_end` can restore it.
    """
    self._mccp2_pending = True

    # Everything written after this SE must be compressed.
    self.writer.send_iac(IAC + SB + MCCP2_COMPRESS + IAC + SE)

    self._mccp2_compressor = zlib.compressobj(
        level=zlib.Z_BEST_COMPRESSION,
        method=zlib.DEFLATED,
        wbits=12,
        memLevel=5,
        strategy=zlib.Z_DEFAULT_STRATEGY,
    )

    transport = self.writer._transport
    raw_write = transport.write

    def compressed_write(data: bytes) -> None:
        # The compressor is looked up on every call: once it has been
        # cleared, data passes through unmodified.
        compressor = self._mccp2_compressor
        if compressor is None:
            raw_write(data)
            return
        raw_write(compressor.compress(data) + compressor.flush(zlib.Z_SYNC_FLUSH))

    transport.write = compressed_write  # type: ignore[method-assign]
    self._mccp2_orig_write = raw_write
    self.writer.mccp2_active = True
    logger.debug("MCCP2 compression started (server→client)")
def _mccp2_end(self) -> None:
    """
    Finish the MCCP2 stream and restore the uncompressed ``write``.

    Does nothing when no compressor is active. Otherwise the final
    ``Z_FINISH`` block is written through the original ``write`` (a zlib
    error is only logged), after which the compressor is dropped and the
    transport and writer flags are reset.
    """
    compressor = self._mccp2_compressor
    if compressor is None:
        return

    try:
        assert self._mccp2_orig_write is not None
        self._mccp2_orig_write(compressor.flush(zlib.Z_FINISH))
    except zlib.error as exc:
        logger.debug("MCCP2 Z_FINISH flush error: %s", exc)

    self._mccp2_compressor = None
    self.writer._transport.write = self._mccp2_orig_write  # type: ignore[method-assign]
    self._mccp2_pending = False
    self.writer.mccp2_active = False
    logger.debug("MCCP2 compression ended (server→client)")
[docs]
def begin_negotiation(self) -> None:
    """
    Start negotiation: run the base class step, then send ``DO TTYPE``.
    """
    from .telopt import DO, TTYPE

    super().begin_negotiation()
    # The terminal type is requested first; further negotiation steps in
    # this class are driven by the TTYPE outcome.
    self.writer.iac(DO, TTYPE)
[docs]
def begin_advanced_negotiation(self) -> None:
    """
    Request advanced telnet options from client.

    ``DO NEW_ENVIRON`` is deferred until the TTYPE cycle completes
    so that Microsoft telnet (ANSI + VT100) can be detected first.
    See ``_negotiate_environ()`` and GitHub issue #24.

    ``WILL ECHO`` is deferred until TTYPE reveals the client identity.
    MUD clients (Mudlet, TinTin++, etc.) interpret ``WILL ECHO`` as
    "password mode" and mask input. See ``_negotiate_echo()``.
    """
    from .telopt import DO, SGA, NAWS, WILL, BINARY, CHARSET, MCCP2_COMPRESS, MCCP3_COMPRESS

    super().begin_advanced_negotiation()

    send = self.writer.iac
    if not self.line_mode:
        send(WILL, SGA)
    # WILL ECHO and DO NEW_ENVIRON are intentionally absent here; they are
    # sent later by _negotiate_echo() and _negotiate_environ().
    send(WILL, BINARY)
    send(DO, NAWS)
    if self.default_encoding:
        send(DO, CHARSET)

    # MCCP2/MCCP3 are opt-in (compression=True) and never offered over TLS:
    # compress-then-encrypt is vulnerable to CRIME/BREACH attacks.
    if not self._mccp2_enabled:
        return
    if self.writer.get_extra_info("ssl_object") is not None:
        logger.debug("MCCP disabled: TLS active (CRIME/BREACH mitigation)")
        return
    send(WILL, MCCP2_COMPRESS)
    send(WILL, MCCP3_COMPRESS)
[docs]
def check_negotiation(self, final: bool = False) -> bool:
    """
    Report whether negotiation, including encoding, has completed.

    As a side effect this triggers the deferred ``WILL ECHO`` and
    ``DO NEW_ENVIRON`` steps when TTYPE was refused, has stalled, or
    *final* is set, and resolves :attr:`waiter_encoding` once the encoding
    outcome is known.

    :param final: Whether this is the last check before giving up.
    :returns: The base class result when encoding negotiation is declared
        failed, ``False`` while a NEW_ENVIRON or CHARSET subnegotiation is
        pending, otherwise the base class result combined with the
        encoding check.
    """
    from .telopt import DO, SB, TTYPE, CHARSET, NEW_ENVIRON

    # Deferred ECHO: only with advanced negotiation active, so that a raw
    # TCP client is not sent options it never asked for.
    if not self._echo_negotiated and self._advanced:
        if self.writer.remote_option.get(TTYPE) is False or final:
            self._negotiate_echo()

    # Deferred NEW_ENVIRON: fire when TTYPE was refused, on the final
    # check, or when nothing about TTYPE is pending any more (on_ttype
    # should already have done it in that case; this is a safety net).
    if not self._environ_requested and self._advanced:
        refused = self.writer.remote_option.get(TTYPE) is False
        ttype_outstanding = self.writer.pending_option.get(
            DO + TTYPE
        ) or self.writer.pending_option.get(SB + TTYPE)
        if refused or final or not ttype_outstanding:
            self._negotiate_environ()

    # Debug aid: which options are still pending?
    still_pending = [
        (name_commands(opt), state)
        for opt, state in self.writer.pending_option.items()
        if state
    ]
    if still_pending:
        logger.debug("Pending options: %r", still_pending)

    # Subnegotiations that must finish before negotiation counts as done.
    waiting_for_environ = (
        SB + NEW_ENVIRON in self.writer.pending_option
        and self.writer.pending_option[SB + NEW_ENVIRON]
    )
    waiting_for_charset = (
        SB + CHARSET in self.writer.pending_option and self.writer.pending_option[SB + CHARSET]
    )
    if final and (waiting_for_environ or waiting_for_charset):
        logger.warning(
            "Waiting for critical subnegotiation: environ=%s, charset=%s",
            waiting_for_environ,
            waiting_for_charset,
        )

    parent = super().check_negotiation()

    # Besides the base class check, track bidirectional encoding.
    result = self._check_encoding()
    encoding = self.encoding(outgoing=True, incoming=True)

    if not self.waiter_encoding.done():
        if result:
            logger.debug("encoding complete: %r", encoding)
            self.waiter_encoding.set_result(result)
        elif self.writer.remote_option.get(TTYPE) is False:
            # Without TTYPE we assume the peer would not handle CHARSET
            # either, and declare encoding negotiation failed.
            logger.debug(
                "encoding failed after %1.2fs: %s, remote_option[TTYPE]=%s, result=%s",
                self.duration,
                encoding,
                self.writer.remote_option.get(TTYPE),
                result,
            )
            self.waiter_encoding.set_result(result)  # False
            return parent
        elif final:
            logger.debug("encoding failed after %1.2fs: %s", self.duration, encoding)
            self.waiter_encoding.set_result(result)  # False
            return parent

    # Negotiation is not complete while env/charset are outstanding.
    if waiting_for_environ or waiting_for_charset:
        return False
    return parent and result
# new methods
[docs]
def encoding(
    self, outgoing: Optional[bool] = None, incoming: Optional[bool] = None
) -> Union[str, bool]:
    """
    Return encoding for the given stream direction.

    :param outgoing: Whether the return value is suitable for
        encoding bytes for transmission to client end.
    :param incoming: Whether the return value is suitable for
        decoding bytes received from the client.
    :raises TypeError: when a direction argument, either ``outgoing``
        or ``incoming``, was not set ``True``.
    :returns: ``'US-ASCII'`` for the directions indicated, unless
        ``BINARY`` :rfc:`856` has been negotiated for the direction
        indicated or ``force_binary`` is set ``True``. In that case the
        encoding derived from the client's ``LANG`` value is preferred,
        then the negotiated ``charset``, then the default encoding.
    """
    if not (outgoing or incoming):
        raise TypeError(
            "encoding arguments 'outgoing' and 'incoming' are required: toggle at least one."
        )

    # May we use anything beyond ASCII in the direction(s) asked for?
    if not self.force_binary:
        if outgoing and incoming:
            permitted = self.writer.outbinary and self.writer.inbinary
        elif outgoing:
            permitted = self.writer.outbinary
        else:
            permitted = self.writer.inbinary
        if not permitted:
            return "US-ASCII"

    # The LANG variable forwarded by the client takes precedence, provided
    # it names a codec that Python actually knows.
    lang = self.get_extra_info("LANG", "")
    if lang and lang != "C":
        candidate = accessories.encoding_from_lang(lang)
        if candidate:
            try:
                codecs.lookup(candidate)
            except LookupError:
                pass  # unknown codec: fall back to charset or default
            else:
                return candidate

    # Otherwise fall back to CHARSET negotiation (seen on many East-Asia
    # BBS and Western MUD systems), and finally to the default.
    return self.get_extra_info("charset") or self.default_encoding
[docs]
def set_timeout(self, duration: int = -1) -> None:
    """
    Restart or unset timeout for client.

    :param duration: When a positive number, :meth:`on_timeout` is
        scheduled to run after that many seconds. When ``-1``, the value
        of ``self.get_extra_info('timeout')`` is used. When falsy, any
        pending timer is cancelled and none is scheduled.

    The effective duration is always written back to the ``timeout``
    extra-info key.
    """
    if duration == -1:
        duration = self.get_extra_info("timeout")

    # drop whatever timer is currently pending
    previous = self._timer
    if previous is not None:
        if previous in self._tasks:
            self._tasks.remove(previous)
        previous.cancel()

    if duration:
        self._timer = asyncio.get_event_loop().call_later(duration, self.on_timeout)
        self._tasks.append(self._timer)

    self._extra["timeout"] = duration
# Callback methods
[docs]
def on_timeout(self) -> None:
    """
    Callback received on session timeout.

    The default implementation writes "Timeout." surrounded by CRLF, as
    text or bytes depending on the writer type, and then calls
    ``timeout_connection()``. Disable it by calling :meth:`set_timeout`
    with a ``duration`` of ``0``.
    """
    logger.debug("Timeout after %1.2fs", self.idle)
    notice = (
        "\r\nTimeout.\r\n"
        if isinstance(self.writer, TelnetWriterUnicode)
        else b"\r\nTimeout.\r\n"
    )
    self.writer.write(notice)
    self.timeout_connection()
[docs]
def on_naws(self, rows: int, cols: int) -> None:
    """
    Callback receives NAWS response, :rfc:`1073`.

    Both values are recorded in the extra-info mapping.

    :param rows: screen size, by number of cells in height.
    :param cols: screen size, by number of cells in width.
    """
    self._extra.update(rows=rows, cols=cols)
[docs]
def on_request_environ(self) -> List[Union[str, bytes]]:
    """
    Definition for NEW_ENVIRON request of client, :rfc:`1572`.

    This method is a callback from :meth:`~.TelnetWriter.request_environ`,
    first entered on receipt of (WILL, NEW_ENVIRON) by server. The return
    value *defines the request made to the client* for environment values.

    :returns: A list of US-ASCII character strings indicating the
        environment keys the server requests of the client. If this list
        contains the special byte constants, ``USERVAR`` or ``VAR``, the
        client is allowed to volunteer any other additional user or system
        values. An empty return value indicates that no request should be
        made.

    The default return value requests only common variables needed for
    session setup. Override this method or see
    :data:`~.fingerprinting.ENVIRON_EXTENDED` for a larger set used
    during client fingerprinting.

    .. note::

       ``USER`` is excluded when the client is Microsoft telnet
       (ttype1=ANSI, ttype2=VT100) because requesting it crashes
       ``telnet.exe``. See GitHub issue #24.
    """
    from .telopt import VAR, USERVAR

    first_ttype = self.get_extra_info("ttype1") or ""
    second_ttype = self.get_extra_info("ttype2") or ""
    is_ms_telnet = (first_ttype, second_ttype) == ("ANSI", "VT100")

    requested: List[Union[str, bytes]] = [] if is_ms_telnet else ["USER"]
    requested += [
        "LOGNAME",
        "DISPLAY",
        "LANG",
        "TERM",
        "TERM_PROGRAM",
        "COLUMNS",
        "LINES",
        "COLORTERM",
        "EDITOR",
        "IPADDRESS",
        # let the client volunteer any other VAR/USERVAR it wants to send
        VAR,
        USERVAR,
    ]
    return requested
[docs]
def on_environ(self, mapping: Dict[str, str]) -> None:
    """
    Callback receives NEW_ENVIRON response, :rfc:`1572`.

    Keys with empty values are removed from *mapping* in place, the
    remaining keys are upper-cased and merged into the extra-info mapping.
    If the client sent ``CHARSET`` or a ``LANG`` carrying an encoding
    suffix, the writer is told to presume BINARY capability.
    """
    # A well-formed client answers with an empty value to mean "no value":
    # it may have one but not wish to divulge it. Such keys are popped from
    # the caller's mapping as a side effect.
    for empty_key in [name for name, value in mapping.items() if not value]:
        mapping.pop(empty_key)

    # This is untrusted input, so every received key is stored upper-cased;
    # a client therefore cannot override trusted values such as 'peer'.
    u_mapping = {name.upper(): value for name, value in mapping.items()}
    logger.debug("on_environ received: %r", u_mapping)
    self._extra.update(u_mapping)

    # LANG with an encoding suffix, or CHARSET, implies BINARY capability
    # even without explicit BINARY negotiation.
    lang_val = u_mapping.get("LANG", "")
    has_lang_encoding = "." in lang_val and lang_val != "C"
    has_charset = bool(u_mapping.get("CHARSET"))
    if (has_charset or has_lang_encoding) and self.writer is not None:
        self.writer._force_binary_on_protocol()
[docs]
def on_request_charset(self) -> List[str]:
    """
    Definition for CHARSET request by client, :rfc:`2066`.

    This method is a callback from :meth:`~.TelnetWriter.request_charset`,
    first entered on receipt of (WILL, CHARSET) by server. The return
    value *defines the request made to the client* for encodings.

    :returns: A list of US-ASCII character strings indicating the
        encodings offered by the server in its preferred order. An empty
        return value indicates that no encodings are offered.

    The default return value includes common encodings for both Western and
    Eastern scripts, with plain ASCII offered last::

        ['UTF-8', 'UTF-16', 'LATIN1', 'CP1252', 'ISO-8859-15', 'CP437',
         'SHIFT_JIS', 'CP932', 'BIG5', 'CP950', 'GBK', 'GB2312', 'CP936',
         'EUC-KR', 'CP949', 'US-ASCII']
    """
    western = [
        "UTF-8",  # most common modern encoding
        "UTF-16",  # common Unicode encoding
        "LATIN1",  # ISO-8859-1, Western European
        "CP1252",  # Windows Western European
        "ISO-8859-15",  # updated Western European (includes Euro symbol)
        "CP437",  # PC-DOS / US telnet BBS systems
    ]
    eastern = [
        "SHIFT_JIS",  # Japan
        "CP932",  # Japan (Windows code page)
        "BIG5",  # Taiwan/Hong Kong
        "CP950",  # Taiwan/Hong Kong (Windows code page)
        "GBK",  # Mainland China
        "GB2312",  # Mainland China
        "CP936",  # Mainland China (Windows code page)
        "EUC-KR",  # Korea
        "CP949",  # Korea (Windows code page)
    ]
    # basic ASCII is the last resort
    return western + eastern + ["US-ASCII"]
[docs]
def on_charset(self, charset: str) -> None:
    """
    Callback for CHARSET response, :rfc:`2066`.

    :param charset: Value recorded under the ``charset`` extra-info key.
    """
    self._extra.update(charset=charset)
[docs]
def on_tspeed(self, rx: str, tx: str) -> None:
    """
    Callback for TSPEED response, :rfc:`1079`.

    Both values are stored together, comma-separated, under the ``tspeed``
    extra-info key.
    """
    speed = f"{rx},{tx}"
    self._extra["tspeed"] = speed
[docs]
def on_ttype(self, ttype: str) -> None:
    """
    Callback for TTYPE response, :rfc:`930`.

    Every reply is stored as ``ttype{n}`` (n being its serial order) and a
    non-empty reply also becomes ``TERM``, even when unsolicited. The
    client is asked to cycle to its next terminal type until the cycle
    loops, repeats, returns an empty value, exceeds
    :attr:`TTYPE_LOOPMAX`, or reveals an ``MTTS`` bitvector as third reply.
    """
    # TTYPE may be requested several times because a client's first answer
    # is not necessarily its most significant one.
    key = f"ttype{self._ttype_count}"
    self._extra[key] = ttype
    if ttype:
        self._extra["TERM"] = ttype
    previous = self.get_extra_info(f"ttype{self._ttype_count - 1}")

    # From the first TTYPE on, ECHO may be negotiated -- MUD clients are
    # recognised by ttype1 and never receive WILL ECHO (no password mode).
    self._negotiate_echo()

    # After ttype1, send DO NEW_ENVIRON right away unless it is "ANSI"; in
    # that case wait for ttype2 so Microsoft telnet (ANSI + VT100), which
    # crashes on NEW_ENVIRON, can be detected first (issue #24).
    if key == "ttype1":
        if ttype != "ANSI":
            self._negotiate_environ()
    elif key == "ttype2" and not self._environ_requested:
        self._negotiate_environ()

    # Decide whether the cycle ends here; only the final branch continues.
    if key != "ttype1" and ttype == self.get_extra_info("ttype1", None):
        logger.debug("ttype cycle stop at %s: %s, looped.", key, ttype)
    elif not ttype or self._ttype_count > self.TTYPE_LOOPMAX:
        # empty reply string or too many responses!
        logger.warning("ttype cycle stop at %s: %s.", key, ttype)
    elif self._ttype_count == 3 and ttype.upper().startswith("MTTS "):
        val = self.get_extra_info("ttype2")
        logger.debug("ttype cycle stop at %s: %s, using %s from ttype2.", key, ttype, val)
        self._extra["TERM"] = val
    elif ttype == previous:
        logger.debug("ttype cycle stop at %s: %s, repeated.", key, ttype)
    else:
        logger.debug("ttype cycle cont at %s: %s.", key, ttype)
        self._ttype_count += 1
        self.writer.request_ttype()
        return

    # The cycle is over: make sure NEW_ENVIRON has been requested.
    self._negotiate_environ()
[docs]
def on_xdisploc(self, xdisploc: str) -> None:
    """
    Callback for XDISPLOC response, :rfc:`1096`.

    :param xdisploc: Value recorded under the ``xdisploc`` extra-info key.
    """
    self._extra.update(xdisploc=xdisploc)
# private methods
def _negotiate_environ(self) -> None:
    """
    Send ``DO NEW_ENVIRON``, at most once per connection.

    Called from :meth:`on_ttype` as soon as we have enough information:

    - After ``ttype1`` when it is not ``"ANSI"``.
    - After ``ttype2`` when ``ttype1`` *is* ``"ANSI"`` -- this gives
      :meth:`on_request_environ` enough context to detect Microsoft
      telnet and exclude ``USER`` (GitHub issue #24).
    - From :meth:`check_negotiation` when TTYPE stalls or is refused.
    """
    if not self._environ_requested:
        # flag first, so that repeated calls become no-ops
        self._environ_requested = True
        from .telopt import DO, NEW_ENVIRON

        self.writer.iac(DO, NEW_ENVIRON)
def _negotiate_echo(self) -> None:
    """
    Send ``WILL ECHO`` unless the client is a MUD client or line mode.

    MUD clients (Mudlet, TinTin++, etc.) interpret ``WILL ECHO`` as
    "password mode" and mask the input bar. We defer ECHO negotiation
    until TTYPE arrives so MUD clients are detected first.

    When :attr:`line_mode` is ``True``, ECHO is never sent so the
    client stays in NVT local (line) mode.

    Called from :meth:`on_ttype` on each TTYPE response, and from
    :meth:`check_negotiation` when TTYPE stalls or is refused. Only the
    first call has any effect.
    """
    already_done = self._echo_negotiated
    self._echo_negotiated = True
    if already_done or self.line_mode:
        return

    from .telopt import ECHO, WILL
    from .fingerprinting import _is_maybe_mud

    assert self.writer is not None
    if not _is_maybe_mud(self.writer):
        self.writer.iac(WILL, ECHO)
        return
    logger.info("skipping WILL ECHO for MUD client")
def _check_encoding(self) -> bool:
    """
    Advance encoding negotiation and report whether it has succeeded.

    Called periodically on behalf of ``waiter_encoding``. May send
    ``DO BINARY`` or start a CHARSET request as a side effect.

    :returns: Truthy when BINARY is active in both directions or
        ``force_binary`` is set.
    """
    from .telopt import DO, SB, BINARY, CHARSET

    writer = self.writer

    # We transmit BINARY but the client does not yet: ask for the
    # client-to-server direction, unless that request is already recorded.
    one_way_only = writer.outbinary and not writer.inbinary
    if one_way_only and (DO + BINARY) not in writer.pending_option:
        logger.debug("BINARY in: direction request.")
        writer.iac(DO, BINARY)
        return False

    # CHARSET agreed on both sides but no REQUEST recorded yet.
    charset_ready = writer.remote_option.enabled(CHARSET) and writer.local_option.enabled(
        CHARSET
    )
    if charset_ready and (SB + CHARSET) not in writer.pending_option:
        logger.debug("Initiating CHARSET REQUEST after capabilities negotiation")
        writer.request_charset()

    # bidirectional BINARY, or force_binary=True?
    return (writer.outbinary and writer.inbinary) or self.force_binary
class _TLSAutoDetectProtocol(asyncio.Protocol):
"""
Protocol wrapper that auto-detects TLS vs plain telnet connections.
Reading is paused on connect, then a non-blocking ``MSG_PEEK`` on a
duplicated socket checks the first byte without consuming it. A TLS
ClientHello always begins with ``0x16`` (22); anything else (telnet IAC
``0xFF``, printable ASCII, etc.) is plain telnet.
Plain telnet clients typically wait for the server to speak first, so a
timeout (*detect_timeout* seconds) assumes plain telnet when no data
arrives promptly. TLS clients always send ClientHello immediately.
When TLS is detected, :meth:`loop.start_tls` upgrades the transport.
Plain connections resume reading and hand off directly.
"""
_PEEK_RETRY_SECS = 0.01
def __init__(
    self,
    ssl_context: ssl_module.SSLContext,
    real_factory: Callable[[], asyncio.Protocol],
    detect_timeout: float = 0.5,
) -> None:
    """
    Remember the TLS context and the factory for the real protocol.

    :param ssl_context: Context used if the connection is upgraded to TLS.
    :param real_factory: Zero-argument callable creating the protocol that
        takes over once detection is finished.
    :param detect_timeout: Seconds to wait for a first byte before the
        connection is assumed to be plain telnet.
    """
    # collaborators
    self._ssl_context = ssl_context
    self._real_factory = real_factory
    self._detect_timeout_secs = detect_timeout

    # detection state
    self._transport: Optional[asyncio.Transport] = None
    self._detect_timer: Optional[asyncio.TimerHandle] = None
    self._decided = False
    self._buffered: bytes = b""
def connection_made(self, transport: asyncio.BaseTransport) -> None:
    """
    Pause reading, arm the detection timeout and schedule the first peek.
    """
    self._transport = transport  # type: ignore[assignment]
    # Incoming bytes are left for the peek loop to inspect.
    transport.pause_reading()  # type: ignore[attr-defined]

    event_loop = asyncio.get_event_loop()
    self._detect_timer = event_loop.call_later(
        self._detect_timeout_secs, self._on_detect_timeout
    )
    event_loop.call_soon(self._try_peek)
def _cancel_timer(self) -> None:
    """Cancel the detection timeout, if one is armed, and forget it."""
    timer, self._detect_timer = self._detect_timer, None
    if timer is not None:
        timer.cancel()
def _try_peek(self) -> None:
    """
    Non-blocking peek at the first byte via a duplicated socket.

    Outcomes:

    - no byte available yet: retry after ``_PEEK_RETRY_SECS``;
    - peer closed, or any socket error: close the transport;
    - first byte ``0x16``: start the TLS upgrade;
    - any other byte: hand off as plain telnet.

    Does nothing once a decision has been made, or when the transport or
    its socket is unavailable.
    """
    if self._decided:
        return
    if self._transport is None:
        logger.debug("tls-auto: no transport in _try_peek (client disconnected?)")
        return
    tsock = self._transport.get_extra_info("socket")
    if tsock is None:
        logger.debug("tls-auto: no socket in _try_peek (client disconnected?)")
        return

    try:
        # Duplicating the descriptor fails with OSError when the socket is
        # already closed; treat that like a connection without data.
        peek_sock = socket.fromfd(tsock.fileno(), tsock.family, socket.SOCK_STREAM)
    except OSError:
        data = b""
    else:
        # Everything touching the duplicate sits inside the try so that it
        # is closed on every path, including a failing setblocking().
        try:
            peek_sock.setblocking(False)
            data = peek_sock.recv(1, socket.MSG_PEEK)
        except BlockingIOError:
            asyncio.get_event_loop().call_later(self._PEEK_RETRY_SECS, self._try_peek)
            return
        except OSError:
            data = b""
        finally:
            peek_sock.close()

    self._decided = True
    self._cancel_timer()
    if not data:
        self._transport.close()
        return
    if data[0] == 0x16:
        logger.debug("tls-auto: TLS ClientHello detected")
        # Keep a strong reference: the event loop only holds weak
        # references to tasks, so an unreferenced one may be collected
        # before the handshake finishes.
        self._upgrade_task = asyncio.ensure_future(self._upgrade_to_tls())
    else:
        logger.debug("tls-auto: non-TLS byte 0x%02x, plain telnet", data[0])
        self._handoff_plain()
def _on_detect_timeout(self) -> None:
    """
    Timer callback: no data arrived in time, so assume plain telnet.

    Has no effect beyond clearing the timer handle when a decision was
    already made.
    """
    self._detect_timer = None
    if not self._decided:
        self._decided = True
        logger.debug(
            "tls-auto: no data in %.1fs, assuming plain telnet", self._detect_timeout_secs
        )
        self._handoff_plain()
async def _upgrade_to_tls(self) -> None:
    """
    Upgrade the plain transport to TLS, then hand off.

    A failed handshake is logged and the plain transport is closed unless
    it is already closing.

    .. note::

       On Python < 3.11, ``loop.start_tls(server_side=True)`` may hang
       due to a bug in the ``_SSLPipe``-based ``SSLProtocol``
       (rewritten in 3.11). See
       https://github.com/python/cpython/issues/79156
    """
    loop = asyncio.get_running_loop()
    assert self._transport is not None
    plain_transport = self._transport
    protocol = self._real_factory()

    try:
        # start_tls uses call_connection_made=False, so connection_made has
        # to be invoked by hand with the SSL transport it returns.
        ssl_transport = await loop.start_tls(
            plain_transport, protocol, self._ssl_context, server_side=True
        )
    except (ssl_module.SSLError, OSError) as exc:
        logger.debug("TLS handshake failed: %s", exc)
        if not self._transport.is_closing():
            self._transport.close()
        return

    assert ssl_transport is not None
    peer = self._transport.get_extra_info("peername")
    logger.debug("tls-auto: TLS handshake succeeded for %s", peer)
    protocol.connection_made(ssl_transport)
def _handoff_plain(self) -> None:
    """
    Hand off to the real protocol as a plain telnet connection.

    The real protocol is attached to the transport and notified of the
    connection; any data delivered to this protocol while paused (a race
    on some Python versions) is replayed into it before reading resumes.
    """
    assert self._transport is not None
    transport = self._transport
    real_protocol = self._real_factory()

    transport.set_protocol(real_protocol)
    real_protocol.connection_made(transport)

    early_data = self._buffered
    if early_data:
        real_protocol.data_received(early_data)

    transport.resume_reading()
def data_received(self, data: bytes) -> None:
    """
    Append data arriving during detection to the replay buffer.

    Reading is paused while detecting, but on some Python versions data
    can still be delivered (a race).
    """
    self._buffered = self._buffered + data
def connection_lost(self, exc: Optional[Exception]) -> None:
    """
    Connection dropped before detection completed.

    Marks detection as decided, so pending peek and timeout callbacks
    become no-ops, and cancels the timeout.

    :param exc: Not used.
    """
    self._decided = True
    self._cancel_timer()
[docs]
class LinemodeServer(TelnetServer):
"""
:class:`TelnetServer` subclass that negotiates LINEMODE EDIT.
In addition to the standard options negotiated by :class:`TelnetServer`,
this server sends ``DO LINEMODE`` during advanced negotiation, proposes
LINEMODE EDIT (local line editing by the client), and suppresses
``WILL ECHO`` so the client performs local echoing via its LINEMODE buffer.
Use with :func:`create_server` to enable RFC 1184 LINEMODE EDIT on a
:func:`~.telnet_server_shell` session or any custom shell.
"""
from . import slc as _slc_module
#: Propose LINEMODE EDIT (local line editing) instead of remote mode.
default_linemode = _slc_module.Linemode(_slc_module.LMODE_MODE_LOCAL)
[docs]
def begin_advanced_negotiation(self) -> None:
    """
    Negotiate the standard options, then send ``DO LINEMODE``.
    """
    from .telopt import DO, LINEMODE

    super().begin_advanced_negotiation()

    # Hand this protocol's default_linemode to the writer first, so that
    # TelnetWriter.handle_will(LINEMODE) proposes the correct mode
    # (LOCAL/EDIT) rather than the TelnetWriter class default (REMOTE).
    writer = self.writer
    writer.default_linemode = self.default_linemode
    writer.iac(DO, LINEMODE)
def _negotiate_echo(self) -> None:
    """
    Skip ``WILL ECHO`` -- the LINEMODE EDIT client handles local echo.

    Only records that ECHO negotiation has been dealt with.
    """
    if not self._echo_negotiated:
        self._echo_negotiated = True
def _enqueue_client(server: "Server", protocol: server_base.BaseServer) -> None:
    """
    Push a completed protocol onto the server's client queue.

    When the bounded queue is full, the oldest entry is discarded to make
    room, so that long-running servers (which may never call
    :meth:`Server.wait_for_client`) do not accumulate unbounded memory.
    """
    pending = server._new_client
    if pending.full():
        # drop the oldest waiting client
        try:
            pending.get_nowait()
        except asyncio.QueueEmpty:
            pass
    pending.put_nowait(protocol)
[docs]
class Server:
"""
Telnet server that tracks connected clients.
Wraps asyncio.Server with protocol tracking and connection waiting.
Returned by :func:`create_server`.
"""
def __init__(self, server: Optional[asyncio.Server]) -> None:
    """
    Wrap an ``asyncio.Server``.

    :param server: The underlying server; ``None`` is accepted by the
        annotation.
    """
    self._server: Optional[asyncio.Server] = server
    self._protocols: List[server_base.BaseServer] = []
    # The queue is bounded so that memory cannot grow without limit on
    # long-running servers where wait_for_client() is never called. Its
    # capacity (1000) is far beyond any realistic wait_for_client() drain
    # rate.
    queue_capacity = 1000
    self._new_client: asyncio.Queue[server_base.BaseServer] = asyncio.Queue(
        maxsize=queue_capacity
    )
[docs]
def close(self) -> None:
    """
    Close the server, stop accepting new connections, and close all clients.

    The wrapped server is optional; when it is ``None`` only the client
    transports are closed.
    """
    if self._server is not None:
        self._server.close()
    # Iterate over a copy of the protocol list while closing each client
    # transport.
    for protocol in list(self._protocols):
        transport = getattr(protocol, "_transport", None)
        if transport is not None:
            transport.close()
[docs]
async def wait_closed(self) -> None:
    """
    Wait until the server and all client connections are closed.

    When no server is wrapped (``None``), only the event-loop yield and
    the protocol-list cleanup are performed.
    """
    if self._server is not None:
        await self._server.wait_closed()
    # Yield to event loop for pending close callbacks
    await asyncio.sleep(0)
    # Clear protocol list now that server is closed
    self._protocols.clear()
@property
def sockets(self) -> Optional[Tuple["socket.socket", ...]]:
    """
    Return the socket objects the server is listening on.

    :returns: Whatever the wrapped server reports, or ``None`` when no
        server is wrapped.
    """
    if self._server is None:
        return None
    return self._server.sockets
[docs]
def is_serving(self) -> bool:
    """
    Return True if the server is accepting new connections.

    A wrapper without an underlying server (``None``) is never serving.
    """
    if self._server is None:
        return False
    return self._server.is_serving()
@property
def clients(self) -> List[server_base.BaseServer]:
    """
    List of connected client protocol instances.

    Protocols whose ``_closing`` attribute is truthy are dropped from the
    internal list first (lazy cleanup).

    :returns: A new list holding the protocol instances that remain.
    """
    still_open = [
        proto for proto in self._protocols if not getattr(proto, "_closing", False)
    ]
    self._protocols = still_open
    return still_open.copy()
[docs]
async def wait_for_client(self) -> server_base.BaseServer:
r"""
Wait for a client to connect and complete negotiation.
:returns: The protocol instance for the connected client.
Example::
server = await telnetlib3.create_server(port=6023)
client = await server.wait_for_client()
client.writer.write("Welcome!\r\n")
"""
return await self._new_client.get()
def _register_protocol(self, protocol: asyncio.Protocol) -> None:
"""Register a new protocol instance (called by factory)."""
# Prune dead protocols to prevent unbounded memory growth on
# long-running servers handling many short-lived connections.
self._protocols = [p for p in self._protocols if not getattr(p, "_closing", False)]
self._protocols.append(protocol) # type: ignore[arg-type]
if hasattr(protocol, "_waiter_connected"):
protocol._waiter_connected.add_done_callback(
lambda f, p=protocol: _enqueue_client(self, p) if not f.cancelled() else None
)
class StatusLogger:
    """Periodic status logger for connected clients."""

    # Placeholder used when a client has no usable peer address.
    _NO_PEER: Tuple[str, int] = ("-", 0)

    def __init__(self, server: "Server", interval: int) -> None:
        """
        Initialize status logger.

        :param server: Server instance to monitor.
        :param interval: Logging interval in seconds.
        """
        self._server = server
        self._interval = interval
        self._task: Optional["asyncio.Task[None]"] = None
        self._last_status: Optional[Dict[str, Any]] = None

    def _get_status(self) -> Dict[str, Any]:
        """
        Build a snapshot of the currently connected clients.

        :returns: ``{"count": int, "clients": [...]}`` where each client entry
            carries ``ip``, ``port``, ``rx``, ``tx``, ``idle`` and ``tls``;
            entries are sorted by ``(ip, port)`` so that two snapshots of the
            same clients compare equal regardless of connection order.
        """
        clients = self._server.clients
        client_data = []
        for client in clients:
            # The lookup may yield None rather than the default (asyncio
            # records peername as None when getpeername() fails); fall back
            # to the placeholder instead of failing on ``None[0]``.
            peername = client.get_extra_info("peername", self._NO_PEER) or self._NO_PEER
            client_data.append(
                {
                    "ip": peername[0],
                    "port": peername[1],
                    "rx": getattr(client, "rx_bytes", 0),
                    "tx": getattr(client, "tx_bytes", 0),
                    "idle": int(getattr(client, "idle", 0)),
                    "tls": client.get_extra_info("ssl_object") is not None,
                }
            )
        client_data.sort(key=lambda entry: (entry["ip"], entry["port"]))
        return {"count": len(clients), "clients": client_data}

    def _status_changed(self, current: Dict[str, Any]) -> bool:
        """
        Check if status differs from last logged.

        Before anything has been logged, only a snapshot with at least one
        client counts as a change.
        """
        if self._last_status is None:
            return bool(current["count"] > 0)
        return current != self._last_status

    @staticmethod
    def _format_client(entry: Dict[str, Any]) -> str:
        """Render one client entry of a status snapshot."""
        tls = " tls" if entry["tls"] else ""
        return (
            f"{entry['ip']}:{entry['port']} "
            f"(rx={entry['rx']}, tx={entry['tx']}, idle={entry['idle']}{tls})"
        )

    def _format_status(self, status: Dict[str, Any]) -> str:
        """Format status for logging."""
        if status["count"] == 0:
            return "0 clients connected"
        client_info = ", ".join(self._format_client(entry) for entry in status["clients"])
        return f"{status['count']} client(s): {client_info}"

    async def _run(self) -> None:
        """Run periodic status logging until the task is cancelled."""
        while True:
            await asyncio.sleep(self._interval)
            status = self._get_status()
            if self._status_changed(status):
                logger.info("Status: %s", self._format_status(status))
                self._last_status = status

    def start(self) -> None:
        """
        Start the status logging task.

        Does nothing when the interval is not positive or a logging task is
        already running, so repeated calls never orphan a task.
        """
        if self._interval <= 0:
            return
        if self._task is None or self._task.done():
            self._task = asyncio.create_task(self._run())

    def stop(self) -> None:
        """Stop the status logging task, if one was started."""
        if self._task:
            self._task.cancel()
            # Forget the cancelled task so that start() may be called again.
            self._task = None
[docs]
async def create_server(
    host: Optional[Union[str, Sequence[str]]] = None,
    port: int = 23,
    protocol_factory: Optional[Type[asyncio.Protocol]] = TelnetServer,
    shell: Optional[ShellCallback] = None,
    encoding: Union[str, bool] = "utf8",
    encoding_errors: str = "strict",
    force_binary: bool = False,
    never_send_ga: bool = False,
    line_mode: bool = False,
    connect_maxwait: float = 4.0,
    compression: Optional[bool] = None,
    limit: Optional[int] = None,
    term: str = "unknown",
    cols: int = 80,
    rows: int = 25,
    timeout: int = 300,
    ssl: Optional[ssl_module.SSLContext] = None,
    tls_auto: Union[bool, float] = False,
) -> Server:
    """
    Create a TCP Telnet server.

    :param host: The host parameter can be a string, in that case the TCP
        server is bound to host and port. The host parameter can also be a
        sequence of strings, and in that case the TCP server is bound to all
        hosts of the sequence.
    :param port: Listen port for TCP server.
    :param protocol_factory: An alternate protocol factory for the server.
        When unspecified, :class:`TelnetServer` is used.  Subclasses of
        :class:`TelnetServer` receive every option below as keyword
        arguments; other ``BaseServer`` subclasses receive the subset they
        share (``shell``, ``encoding``, ``encoding_errors``,
        ``force_binary``, ``never_send_ga``, ``line_mode``,
        ``connect_maxwait``, ``limit``); any other class or callable is
        called without arguments.
    :param shell: An async function that is called after negotiation
        completes, receiving arguments ``(reader, writer)``.
        Default is :func:`~.telnet_server_shell`.  The reader is a
        :class:`~.TelnetReader` instance, the writer is a
        :class:`~.TelnetWriter` instance.
    :param encoding: The default assumed encoding, or ``False`` to disable
        unicode support.  Encoding may be negotiated to another value by
        the client through NEW_ENVIRON :rfc:`1572` by sending environment value
        of ``LANG``, or by any legal value for CHARSET :rfc:`2066` negotiation.

        The server's attached ``reader, writer`` streams accept and return
        unicode, or natural strings, "hello world", unless this value is
        explicitly set to ``False``.  In that case, the attached stream
        interfaces are bytes-only, b"hello world".
    :param encoding_errors: Same meaning as :meth:`codecs.Codec.encode`.
        Default value is ``strict``.
    :param force_binary: When ``True``, the encoding specified is
        used for both directions even when BINARY mode, :rfc:`856`, is not
        negotiated for the direction specified.  This parameter has no effect
        when ``encoding=False``.

        Note that when combined with a default ``encoding``, use of this option
        may prematurely cause data transmitted in the default encoding immediately
        on-connect, before a "smart" telnet client or server can negotiate a
        different one.

        In most cases, so long as the initial login banner/etc is US-ASCII, this
        may be no problem at all. If an encoding is assumed, as in many MUD and
        BBS systems, the combination of ``force_binary`` with a default
        ``encoding`` is often preferred.
    :param never_send_ga: When ``True``, the server never sends IAC GA
        (Go-Ahead).  By default GA is sent when SGA is not negotiated.
    :param line_mode: When ``True``, the server does not send ``WILL SGA``
        or ``WILL ECHO`` during negotiation.  This keeps the client in NVT
        local (line) mode, where the client performs its own line editing
        and sends complete lines.  Default is ``False`` (kludge mode).
    :param term: Value returned for ``writer.get_extra_info('term')``
        until negotiated by TTYPE :rfc:`930`, or NEW_ENVIRON :rfc:`1572`.
        Default value is ``'unknown'``.
    :param cols: Value returned for ``writer.get_extra_info('cols')``
        until negotiated by NAWS :rfc:`1073`.  Default value is 80 columns.
    :param rows: Value returned for ``writer.get_extra_info('rows')``
        until negotiated by NAWS :rfc:`1073`.  Default value is 25 rows.
    :param timeout: Causes clients to disconnect if idle for this duration,
        in seconds.  This ensures resources are freed on busy servers.  When
        explicitly set to ``False``, clients will not be disconnected for
        timeout.  Default value is 300 seconds (5 minutes).
    :param connect_maxwait: If the remote end is not compliant, or
        otherwise confused by our demands, the shell continues anyway once
        this duration has elapsed.  A client that is not answering
        option negotiation will delay the start of the shell by this amount.
    :param compression: MCCP compression policy.  ``None`` (default)
        passively accepts compression if requested by the client.  ``True``
        advertises MCCP2/MCCP3 during advanced negotiation.  ``False``
        rejects all compression offers.
    :param limit: The buffer limit for the reader stream.
    :param ssl: An :class:`ssl.SSLContext` for TLS-encrypted connections
        (TELNETS, :rfc:`855` over TLS).  When provided, the server performs a
        TLS handshake before any telnet data is exchanged.  ``None`` (default)
        creates a plain TCP server.
    :param tls_auto: When truthy and *ssl* is provided, the server accepts
        both TLS and plain telnet clients on the same port.  A ``float``
        value sets the number of seconds to wait for a TLS ClientHello
        (``0x16``) before assuming a plain telnet connection; ``True`` uses
        a default of 0.5 seconds.  TLS clients send ClientHello immediately;
        plain telnet clients typically wait for the server to speak first,
        so the timeout distinguishes the two.  ``False`` or ``0`` (default)
        disables auto-detection.  Requires *ssl* to be an
        :class:`ssl.SSLContext`.
    :raises ValueError: When *tls_auto* is truthy but *ssl* is ``None``.
    :return: A :class:`Server` instance that wraps the asyncio.Server
        and provides access to connected client protocols via
        :meth:`Server.wait_for_client` and :attr:`Server.clients`.
    """
    if tls_auto and ssl is None:
        raise ValueError("tls_auto requires an ssl SSLContext")
    # normalize True → 0.5
    if tls_auto is True:
        tls_auto = 0.5

    protocol_factory = protocol_factory or TelnetServer

    # issubclass() raises TypeError for anything that is not a class (for
    # example a functools.partial or a plain function used as factory), so
    # check that first.  The factory is fixed for the lifetime of the server:
    # decide once which keyword arguments it receives rather than on every
    # incoming connection.
    factory_is_class = isinstance(protocol_factory, type)
    factory_kwargs: Dict[str, Any] = {}
    if factory_is_class and issubclass(protocol_factory, server_base.BaseServer):
        # options shared by every BaseServer subclass
        factory_kwargs.update(
            shell=shell,
            encoding=encoding,
            encoding_errors=encoding_errors,
            force_binary=force_binary,
            never_send_ga=never_send_ga,
            line_mode=line_mode,
            connect_maxwait=connect_maxwait,
            limit=limit,
        )
    if factory_is_class and issubclass(protocol_factory, TelnetServer):
        # options only passed to TelnetServer and its subclasses; set
        # independently of the branch above so the result matches the
        # original TelnetServer-first dispatch
        factory_kwargs.update(
            shell=shell,
            encoding=encoding,
            encoding_errors=encoding_errors,
            force_binary=force_binary,
            never_send_ga=never_send_ga,
            line_mode=line_mode,
            connect_maxwait=connect_maxwait,
            limit=limit,
            compression=compression,
            term=term,
            cols=cols,
            rows=rows,
            timeout=timeout,
        )

    telnet_server = Server(None)

    def _make_telnet_protocol() -> asyncio.Protocol:
        """Create, register and return the protocol for one new connection."""
        protocol: asyncio.Protocol = protocol_factory(**factory_kwargs)
        telnet_server._register_protocol(protocol)
        return protocol

    loop = asyncio.get_running_loop()
    if tls_auto:
        # Validated above; the assertion only narrows the type for checkers.
        assert ssl is not None

        def factory() -> asyncio.Protocol:
            return _TLSAutoDetectProtocol(ssl, _make_telnet_protocol, tls_auto)

        # The listening socket itself stays plain TCP: the SSL context and
        # the telnet protocol factory are handed to the auto-detect protocol
        # instead of to the event loop.
        telnet_server._server = await loop.create_server(factory, host, port)
    else:
        telnet_server._server = await loop.create_server(
            _make_telnet_protocol, host, port, ssl=ssl
        )
    return telnet_server
async def _sigterm_handler(server: Server, _log: logging.Logger) -> None:
    """
    Close *server* in response to SIGTERM.

    :param server: Server to shut down.
    :param _log: Unused; the event is recorded through the module-level
        ``logger``.
    """
    logger.info("SIGTERM received, closing server.")
    # Closing the server lets the pending server.wait_closed() call finish,
    # which in turn allows the caller awaiting it to complete.
    server.close()
[docs]
def parse_server_args(
    extra_args_fn: Optional[Callable[[argparse.ArgumentParser], None]] = None,
) -> Dict[str, Any]:
    """
    Parse command-line arguments for telnet server.

    Reads ``sys.argv``.  Everything after a literal ``--`` is split off as
    arguments for the ``--pty-exec`` program (when PTY support is available)
    before argparse sees the remainder.

    :param extra_args_fn: Optional callback to add extra arguments to the parser
        before parsing.  Used by ``telnetlib3-fingerprint-server`` to inject
        ``--data-dir``.
    :returns: Mapping of option names to values.  Compared to the raw
        argparse namespace: ``pty_args`` is added, ``pty_raw`` is derived
        from ``line_mode``, ``force_binary`` is forced on for any encoding
        other than ASCII, and ``ssl_certfile`` / ``ssl_keyfile`` are replaced
        by a single ``ssl`` entry holding an :class:`ssl.SSLContext` or
        ``None``.
    """
    # Extract arguments after '--' for PTY program before argparse sees them
    argv = sys.argv[1:]
    pty_args: List[str] = []
    if PTY_SUPPORT and "--" in argv:
        idx = argv.index("--")
        pty_args = argv[idx + 1 :]
        argv = argv[:idx]

    parser = argparse.ArgumentParser(
        description="Telnet protocol server", formatter_class=argparse.ArgumentDefaultsHelpFormatter
    )
    parser.add_argument("host", nargs="?", default=_config.host, help="bind address")
    parser.add_argument("port", nargs="?", type=int, default=_config.port, help="bind port")
    parser.add_argument(
        "--compression",
        action=argparse.BooleanOptionalAction,
        default=None,
        help="MCCP compression: --compression to advertise, --no-compression to reject, "
        "omit to passively accept (default)",
    )
    parser.add_argument(
        "--connect-maxwait",
        type=float,
        default=_config.connect_maxwait,
        help="timeout for pending negotiation",
    )
    parser.add_argument("--encoding", default=_config.encoding, help="encoding name")
    parser.add_argument(
        "--force-binary",
        action="store_true",
        default=_config.force_binary,
        help="force binary transmission",
    )
    parser.add_argument(
        "--line-mode",
        action="store_true",
        default=_config.line_mode,
        help="keep clients in NVT line mode by not sending WILL SGA or "
        "WILL ECHO during negotiation. Clients perform their own line "
        "editing and send complete lines. Also sets cooked PTY mode "
        "when combined with --pty-exec.",
    )
    parser.add_argument("--logfile", default=_config.logfile, help="filepath")
    parser.add_argument("--logfmt", default=_config.logfmt, help="log format")
    parser.add_argument("--loglevel", default=_config.loglevel, help="level name")
    parser.add_argument(
        "--never-send-ga",
        action="store_true",
        default=_config.never_send_ga,
        help="never send IAC GA (Go-Ahead). Default sends GA when SGA is "
        "not negotiated, which is correct for MUD clients but may "
        "confuse some other clients.",
    )
    if PTY_SUPPORT:
        parser.add_argument(
            "--pty-exec",
            metavar="PROGRAM",
            default=_config.pty_exec,
            help="execute PROGRAM in a PTY for each connection (use -- to pass args)",
        )
        parser.add_argument(
            "--pty-fork-limit",
            type=int,
            metavar="N",
            default=_config.pty_fork_limit,
            help="limit concurrent PTY connections (0 disables)",
        )
        # Hidden backwards-compat: --pty-raw was the default since 2.5,
        # keep it as a silent no-op so existing scripts don't break.
        parser.add_argument("--pty-raw", action="store_true", default=False, help=argparse.SUPPRESS)
    parser.add_argument(
        "--robot-check",
        action="store_true",
        default=_config.robot_check,
        help="check if client can render wide unicode (rejects bots)",
    )
    parser.add_argument(
        "--shell",
        default=_config.shell,
        type=accessories.function_lookup,
        help="module.function_name",
    )
    parser.add_argument(
        "--ssl-certfile",
        default=None,
        metavar="PATH",
        help="path to PEM certificate file for TLS (enables TELNETS)",
    )
    parser.add_argument(
        "--ssl-keyfile", default=None, metavar="PATH", help="path to PEM private key file for TLS"
    )
    parser.add_argument(
        "--status-interval",
        type=int,
        metavar="SECONDS",
        default=_config.status_interval,
        help=(
            "periodic status log interval in seconds (0 to disable). "
            "status only logged when connected clients has changed."
        ),
    )
    # type=int: without a converter argparse hands a command-line value over
    # as ``str`` while the default is an ``int``, so the server would receive
    # a string timeout whenever the option is actually used.
    parser.add_argument(
        "--timeout", type=int, default=_config.timeout, help="idle disconnect (0 disables)"
    )
    parser.add_argument(
        "--tls-auto",
        type=float,
        nargs="?",
        const=0.5,
        default=0,
        metavar="SECONDS",
        help="accept both TLS and plain telnet on the same port;"
        " value is seconds to wait for TLS ClientHello before"
        " assuming plain telnet (default: 0.5, requires --ssl-certfile)",
    )
    if extra_args_fn is not None:
        extra_args_fn(parser)

    result = vars(parser.parse_args(argv))
    result["pty_args"] = pty_args if PTY_SUPPORT else None

    # --pty-raw is a hidden no-op (raw is now the default);
    # --line-mode opts out of raw mode and suppresses WILL SGA/ECHO.
    result.pop("pty_raw", None)
    result["pty_raw"] = not result.get("line_mode", False)

    if not PTY_SUPPORT:
        # The PTY options were never registered on the parser; supply
        # neutral values for them instead.
        result["pty_exec"] = None
        result["pty_fork_limit"] = 0
        result["pty_raw"] = False

    # Auto-enable force_binary for any non-ASCII encoding that uses high-bit bytes.
    enc_key = result["encoding"].lower().replace("-", "_")
    if enc_key not in ("us_ascii", "ascii"):
        result["force_binary"] = True

    # Build SSLContext from --ssl-certfile / --ssl-keyfile
    ssl_certfile = result.pop("ssl_certfile", None)
    ssl_keyfile = result.pop("ssl_keyfile", None)
    tls_auto = result.pop("tls_auto", False)
    if ssl_certfile:
        ctx = ssl_module.SSLContext(ssl_module.PROTOCOL_TLS_SERVER)
        ctx.load_cert_chain(ssl_certfile, keyfile=ssl_keyfile)
        result["ssl"] = ctx
    else:
        result["ssl"] = None
    result["tls_auto"] = tls_auto
    return result
[docs]
async def run_server(
    host: str = _config.host,
    port: int = _config.port,
    loglevel: str = _config.loglevel,
    logfile: Optional[str] = _config.logfile,
    logfmt: str = _config.logfmt,
    shell: Callable[..., Any] = _config.shell,
    encoding: Union[str, bool] = _config.encoding,
    force_binary: bool = _config.force_binary,
    timeout: int = _config.timeout,
    connect_maxwait: float = _config.connect_maxwait,
    pty_exec: Optional[str] = _config.pty_exec,
    pty_args: Optional[List[str]] = _config.pty_args,
    pty_raw: bool = _config.pty_raw,
    robot_check: bool = _config.robot_check,
    pty_fork_limit: int = _config.pty_fork_limit,
    status_interval: int = _config.status_interval,
    never_send_ga: bool = _config.never_send_ga,
    line_mode: bool = _config.line_mode,
    compression: Optional[bool] = None,
    protocol_factory: Optional[Type[asyncio.Protocol]] = None,
    ssl: Optional[ssl_module.SSLContext] = None,
    tls_auto: Union[bool, float] = False,
) -> None:
    """
    Program entry point for server daemon.

    This function configures a logger and creates a telnet server for the given keyword arguments,
    serving until the server is closed, which normally happens upon receipt of SIGTERM.

    :param loglevel: Passed to the logger setup together with *logfile* and
        *logfmt*.
    :param pty_exec: When set, *shell* is replaced by a PTY shell running
        this program with *pty_args*, in raw mode when *pty_raw* is true.
    :param robot_check: When true, the shell is wrapped so that clients
        failing the robot check are given the robot shell and disconnected.
    :param pty_fork_limit: When non-zero, the shell is wrapped by a
        connection counter; clients over the limit are given the busy shell
        and disconnected.
    :param status_interval: When positive, a :class:`StatusLogger` is run
        with this interval for the lifetime of the server.
    :raises NotImplementedError: When *pty_exec* is given but PTY support is
        not available.

    All remaining parameters are forwarded to :func:`create_server`.
    """
    log = accessories.make_logger(
        name="telnetlib3.server", loglevel=loglevel, logfile=logfile, logfmt=logfmt
    )

    if pty_exec:
        if not PTY_SUPPORT:
            raise NotImplementedError("PTY support is not available on this platform (Windows?)")
        from .server_pty_shell import make_pty_shell

        shell = make_pty_shell(pty_exec, pty_args, raw_mode=pty_raw)

    # Wrap shell with guards if enabled
    if robot_check or pty_fork_limit:
        from .guard_shells import ConnectionCounter, busy_shell
        from .guard_shells import robot_check as do_robot_check
        from .guard_shells import robot_shell

        counter = ConnectionCounter(pty_fork_limit) if pty_fork_limit else None
        inner_shell = shell

        async def guarded_shell(
            reader: Union[TelnetReader, TelnetReaderUnicode],
            writer: Union[TelnetWriter, TelnetWriterUnicode],
        ) -> None:
            try:
                # Check connection limit first
                if counter and not counter.try_acquire():
                    try:
                        await busy_shell(reader, writer)
                    finally:
                        if not writer.is_closing():
                            writer.close()
                    return
                try:
                    # Check robot if enabled
                    if robot_check:
                        passed = await do_robot_check(reader, writer)
                        if not passed:
                            await robot_shell(reader, writer)
                            if not writer.is_closing():
                                writer.close()
                            return
                    # Run actual shell
                    await inner_shell(reader, writer)
                finally:
                    # Release the slot acquired above, whichever way the
                    # session ended.
                    if counter:
                        counter.release()
            except (ConnectionResetError, BrokenPipeError, EOFError):
                logger.debug(
                    "Connection lost in guarded_shell: %s",
                    writer.get_extra_info("peername", "unknown"),
                )

        shell = guarded_shell

    # log all function arguments.
    _locals = locals()
    _cfg_mapping = ", ".join(f"{field}={_locals[field]}" for field in CONFIG._fields)
    logger.debug("Server configuration: %s", _cfg_mapping)

    loop = asyncio.get_running_loop()

    # bind
    server = await create_server(
        host,
        port,
        shell=shell,
        protocol_factory=protocol_factory,
        encoding=encoding,
        force_binary=force_binary,
        never_send_ga=never_send_ga,
        line_mode=line_mode,
        connect_maxwait=connect_maxwait,
        compression=compression,
        timeout=timeout,
        ssl=ssl,
        tls_auto=tls_auto,
    )

    def _on_sigterm() -> None:
        # Create the coroutine only when the signal actually arrives: a
        # coroutine object built up-front is never awaited if the server
        # stops for another reason, and cannot be scheduled a second time
        # if SIGTERM is delivered twice.
        asyncio.ensure_future(_sigterm_handler(server, log))

    # SIGTERM causes server to gracefully stop
    try:
        loop.add_signal_handler(signal.SIGTERM, _on_sigterm)
    except NotImplementedError:
        # Event loops without Unix signal support cannot install the
        # handler; serve without it rather than abort after binding.
        sigterm_installed = False
    else:
        sigterm_installed = True

    # Start periodic status logger if enabled
    status_logger = None
    if status_interval > 0:
        status_logger = StatusLogger(server, status_interval)
        status_logger.start()

    logger.info("Server ready on %s:%s", host, port)

    # await completion of server stop
    try:
        await server.wait_closed()
    finally:
        # stop status logger
        if status_logger:
            status_logger.stop()
        # remove signal handler on stop
        if sigterm_installed:
            loop.remove_signal_handler(signal.SIGTERM)

    logger.info("Server stop.")
def main() -> None:
    """Entry point for telnetlib3-server command."""
    # Parse the command line first, then hand the resulting keyword
    # arguments to the server coroutine and drive it to completion.
    server_kwargs = parse_server_args()
    asyncio.run(run_server(**server_kwargs))


if __name__ == "__main__":  # pragma: no cover
    main()