"""Telnet server shell implementations."""
from __future__ import annotations
# std imports
import types
import asyncio
from typing import Union, Optional, Generator, cast
# 3rd party
from wcwidth import wcswidth as _wcswidth
from wcwidth import iter_graphemes_reverse as _iter_graphemes_reverse
from wcwidth.escape_sequences import ZERO_WIDTH_PATTERN as _ZERO_WIDTH_PATTERN
# local
from . import slc, telopt, accessories
from .stream_reader import TelnetReader, TelnetReaderUnicode
from .stream_writer import TelnetWriter, TelnetWriterUnicode
CR, LF, NUL = ("\r", "\n", "\x00")
ESC = "\x1b"
# Characters after ESC that start multi-byte output sequences.
# Fe sequences (ESC + 0x40-0x5F) that overlap with these starters
# are not matched as 2-byte sequences, avoiding premature consumption
# of a CSI, OSC, DCS, APC, PM, or charset designation start.
_SEQ_STARTERS = frozenset("[])P_^(")
# SS3 (ESC O) introduces 3-byte input sequences: F1-F4 (ESC O P-S),
# application-mode arrows (ESC O A-D), and keypad keys (ESC O j-y, M, X).
# wcwidth's ZERO_WIDTH_PATTERN matches ESC O as a 2-byte Fe sequence
# (0x4F is in Fe range 0x40-0x5F), missing the third byte. We handle
# SS3 explicitly since it is an input sequence, not an output sequence.
_SS3 = "O"
async def filter_ansi(reader: TelnetReaderUnicode, _writer: TelnetWriterUnicode) -> str:
"""
Read and return the next non-ANSI-escape character from reader.
When wcwidth is available, handles CSI, OSC, DCS, APC, PM, charset designation, Fe, Fp, and SS3
sequences. Otherwise falls back to CSI and SS3 only.
"""
while True:
char = await reader.read(1)
if not char:
return ""
if char != ESC:
return char
next_char = await reader.read(1)
if not next_char:
return ""
# SS3: ESC O + one final byte (F1-F4, keypad, app-mode arrows).
# Handled before wcwidth's ZERO_WIDTH_PATTERN which would match
# ESC O as a 2-byte Fe sequence, missing the third byte.
if next_char == _SS3:
await reader.read(1)
continue
buf = ESC + next_char
if next_char in _SEQ_STARTERS:
# Multi-byte: CSI, OSC, DCS, APC, PM, or charset
while len(buf) < 256:
seq_char = await reader.read(1)
if not seq_char:
break
buf += seq_char
match = _ZERO_WIDTH_PATTERN.match(buf)
# Skip spurious 2-byte Fe matches on the
# ESC+starter prefix -- the real sequence is
# longer (CSI 3+, charset 3, OSC/DCS/APC/PM 4+)
if match and match.end() > 2:
if match.end() < len(buf):
return buf[match.end()]
break
else:
# Check for 2-byte Fe/Fp sequence
match = _ZERO_WIDTH_PATTERN.match(buf)
if not match:
return next_char
def _backspace_grapheme(command: str) -> tuple[str, str]:
"""Remove last grapheme cluster, return (new_command, echo_str)."""
if not command:
return command, ""
last = next(_iter_graphemes_reverse(command))
new_command = command[: len(command) - len(last)]
w = int(_wcswidth(last))
w = max(w, 1)
return new_command, "\b \b" * w
def _visible_width(text: str) -> int:
"""Return visible display width of text."""
result = int(_wcswidth(text))
return max(0, result)
class _LineEditor:
"""Shared line-editing state machine for readline and readline_async."""
def __init__(self, max_visible_width: int = 0) -> None:
self.command: str = ""
self.last_char: str = ""
self.max_visible_width: int = max_visible_width
def feed(self, char: str) -> tuple[str, Optional[str]]:
"""Feed one character, return (echo_str, command_or_none)."""
# LF/NUL after CR: silently consume
if char in (LF, NUL) and self.last_char == CR:
self.last_char = char
return "", None
# Line terminator (CR or LF)
if char in (CR, LF):
self.last_char = char
cmd = self.command
self.command = ""
return "", cmd
# Backspace
if char in ("\b", "\x7f"):
self.last_char = char
if self.command:
self.command, echo = _backspace_grapheme(self.command)
return echo, None
return "", None
# Regular character -- check max_visible_width
self.last_char = char
if self.max_visible_width and _visible_width(self.command + char) > self.max_visible_width:
return "", None
self.command += char
return char, None
__all__ = (
"telnet_server_shell",
"readline_async",
"readline",
"get_linemode",
"get_slcdata",
"do_toggle",
)
[docs]
async def telnet_server_shell(
reader: Union[TelnetReader, TelnetReaderUnicode],
writer: Union[TelnetWriter, TelnetWriterUnicode],
) -> None:
"""
A default telnet shell, appropriate for use with telnetlib3.create_server.
This shell provides a very simple REPL, allowing introspection and state toggling of the
connected client session.
"""
_reader = cast(TelnetReaderUnicode, reader)
writer = cast(TelnetWriterUnicode, writer)
ssl_obj = writer.get_extra_info("ssl_object")
if ssl_obj is not None:
version = ssl_obj.version() or "TLS"
writer.write(f"Ready (secure: {version})." + CR + LF)
else:
writer.write("Ready." + CR + LF)
command = None
while not writer.is_closing():
if command:
writer.write(CR + LF)
writer.write("tel:sh> ")
if not getattr(writer.protocol, "never_send_ga", False):
writer.send_ga()
await writer.drain()
command = await readline_async(_reader, writer)
if command is None:
return
writer.write(CR + LF)
if command == "quit":
# server hangs up on client
writer.write("Goodbye." + CR + LF)
break
if command == "help":
writer.write("quit, writer, slc, linemode, toggle [option|all], reader, proto, dump")
elif command == "writer":
# show 'writer' status
writer.write(repr(writer))
elif command == "reader":
# show 'reader' status
writer.write(repr(reader))
elif command == "proto":
# show 'proto' details of writer
writer.write(repr(writer.protocol))
elif command == "version":
writer.write(accessories.get_version())
elif command == "slc":
# show 'slc' support and data tables
writer.write(get_slcdata(writer))
elif command == "linemode":
writer.write(get_linemode(writer))
elif command.startswith("toggle"):
# toggle specified options
option = command[len("toggle ") :] or None
writer.write(do_toggle(writer, option))
elif command.startswith("dump"):
# dump [kb] [ms_delay] [drain|nodrain] [close|noclose]
#
# this allows you to experiment with the effects of
# 'drain', and, some longer-running programs that check
# for early break through writer.is_closing().
try:
kb_limit = int(command.split()[1])
except (ValueError, IndexError):
kb_limit = 1000
try:
delay = int(float(command.split()[2]) / 1000)
except (ValueError, IndexError):
delay = 0
# experiment with large sizes and 'nodrain', the server
# pretty much locks up and stops talking to new clients.
try:
drain = command.split()[3].lower() == "nodrain"
except IndexError:
drain = True
try:
do_close = command.split()[4].lower() == "close"
except IndexError:
do_close = False
msg = f"kb_limit={kb_limit}, delay={delay}," f" drain={drain}, do_close={do_close}:\r\n"
writer.write(msg)
for lineout in character_dump(kb_limit):
if writer.is_closing():
break
writer.write(lineout)
if drain:
await writer.drain()
if delay:
await asyncio.sleep(delay)
if not writer.is_closing():
writer.write(f"\r\n{kb_limit} OK")
if do_close:
break
elif command:
writer.write("no such command.")
writer.close()
def character_dump(kb_limit: int) -> Generator[str, None, None]:
"""Generate character dump output up to kb_limit kilobytes."""
num_bytes = 0
while (num_bytes) < (kb_limit * 1024):
for char in ("/", "\\"):
lineout = (char * 80) + "\033[1G"
yield lineout
num_bytes += len(lineout)
yield "\033[1G" + "wrote " + str(num_bytes) + " bytes"
[docs]
@types.coroutine
def readline(
_reader: Union[TelnetReader, TelnetReaderUnicode],
writer: Union[TelnetWriter, TelnetWriterUnicode],
max_visible_width: int = 0,
) -> Generator[Optional[str], str, None]:
"""
Blocking readline using generator yield/send protocol.
Characters are fed in via ``send()`` and complete lines are yielded.
Uses ``_LineEditor`` for grapheme-aware backspace and max_visible_width
support.
"""
_writer = cast(TelnetWriterUnicode, writer)
editor = _LineEditor(max_visible_width=max_visible_width)
inp = yield None
while True:
echo, cmd = editor.feed(inp)
if echo:
_writer.echo(echo)
inp = yield cmd
[docs]
async def readline_async(
reader: Union[TelnetReader, TelnetReaderUnicode],
writer: Union[TelnetWriter, TelnetWriterUnicode],
max_visible_width: int = 0,
) -> Optional[str]:
"""
Async readline that filters ANSI escape sequences.
Uses ``filter_ansi()`` to strip escape sequences and
``_LineEditor`` for grapheme-aware backspace and max_visible_width support.
"""
_reader = cast(TelnetReaderUnicode, reader)
_writer = cast(TelnetWriterUnicode, writer)
editor = _LineEditor(max_visible_width=max_visible_width)
while True:
next_char = await filter_ansi(_reader, _writer)
if not next_char:
return None
# Skip leading LF/NUL on empty buffer -- accounts for
# CR+LF pairs split across successive readline_async calls
if next_char in (LF, NUL) and not editor.command:
continue
echo, cmd = editor.feed(next_char)
if echo:
_writer.echo(echo)
if cmd is not None:
return cmd
readline2 = readline_async
[docs]
def get_slcdata(writer: Union[TelnetWriter, TelnetWriterUnicode]) -> str:
"""Display Special Line Editing (SLC) characters."""
_slcs = sorted(
[
f"{slc.name_slc_command(slc_func):>15}: {slc_def}"
for (slc_func, slc_def) in sorted(writer.slctab.items())
if not (slc_def.nosupport or slc_def.val == slc.theNULL)
]
)
_unset = sorted(
[
slc.name_slc_command(slc_func)
for (slc_func, slc_def) in sorted(writer.slctab.items())
if slc_def.val == slc.theNULL
]
)
_nosupport = sorted(
[
slc.name_slc_command(slc_func)
for (slc_func, slc_def) in sorted(writer.slctab.items())
if slc_def.nosupport
]
)
return (
"Special Line Characters:\r\n"
+ "\r\n".join(_slcs)
+ "\r\nUnset by client: "
+ ", ".join(_unset)
+ "\r\nNot supported by server: "
+ ", ".join(_nosupport)
)
[docs]
def get_linemode(writer: Union[TelnetWriter, TelnetWriterUnicode]) -> str:
"""Display current LINEMODE negotiation state."""
active = writer.remote_option.enabled(telopt.LINEMODE)
if not active:
return "LINEMODE not negotiated."
lm = writer.linemode
bits = (
f"EDIT={'on' if lm.edit else 'off'}"
f" TRAPSIG={'on' if lm.trapsig else 'off'}"
f" SOFT_TAB={'on' if lm.soft_tab else 'off'}"
f" LIT_ECHO={'on' if lm.lit_echo else 'off'}"
f" ACK={'on' if lm.ack else 'off'}"
)
return f"LINEMODE active. Mode: {writer.mode}\r\n{bits}"
[docs]
def do_toggle(writer: Union[TelnetWriter, TelnetWriterUnicode], option: Optional[str]) -> str:
"""Display or toggle telnet session parameters."""
linemode_active = writer.remote_option.enabled(telopt.LINEMODE)
tbl_opt = {
"echo": writer.local_option.enabled(telopt.ECHO),
"goahead": not writer.local_option.enabled(telopt.SGA),
"outbinary": writer.outbinary,
"inbinary": writer.inbinary,
"binary": writer.outbinary and writer.inbinary,
"xon-any": writer.xon_any,
"lflow": writer.lflow,
"linemode": linemode_active,
"linemode-edit": writer.linemode.edit if linemode_active else False,
"linemode-trapsig": writer.linemode.trapsig if linemode_active else False,
}
if not option:
return "\r\n".join(
f"{opt} {'ON' if enabled else 'off'}" for opt, enabled in sorted(tbl_opt.items())
)
msgs = []
if option in ("echo", "all"):
cmd = telopt.WONT if tbl_opt["echo"] else telopt.WILL
writer.iac(cmd, telopt.ECHO)
msgs.append(f"{telopt.name_command(cmd).lower()} echo.")
if option in ("goahead", "all"):
cmd = telopt.WILL if tbl_opt["goahead"] else telopt.WONT
writer.iac(cmd, telopt.SGA)
msgs.append(f"{telopt.name_command(cmd).lower()}" " suppress go-ahead.")
if option in ("outbinary", "binary", "all"):
cmd = telopt.WONT if tbl_opt["outbinary"] else telopt.WILL
writer.iac(cmd, telopt.BINARY)
msgs.append(f"{telopt.name_command(cmd).lower()} outbinary.")
if option in ("inbinary", "binary", "all"):
cmd = telopt.DONT if tbl_opt["inbinary"] else telopt.DO
writer.iac(cmd, telopt.BINARY)
msgs.append(f"{telopt.name_command(cmd).lower()} inbinary.")
if option in ("xon-any", "all"):
writer.xon_any = not tbl_opt["xon-any"]
writer.send_lineflow_mode()
msgs.append(f"xon-any {'en' if writer.xon_any else 'dis'}abled.")
if option in ("lflow", "all"):
writer.lflow = not tbl_opt["lflow"]
writer.send_lineflow_mode()
msgs.append(f"lineflow {'en' if writer.lflow else 'dis'}abled.")
if option in ("linemode",):
cmd = telopt.DONT if tbl_opt["linemode"] else telopt.DO
writer.iac(cmd, telopt.LINEMODE)
msgs.append(f"{telopt.name_command(cmd).lower()} linemode.")
if option in ("linemode-edit",):
if not tbl_opt["linemode"]:
msgs.append("linemode not active.")
else:
writer.request_linemode_change(edit=not tbl_opt["linemode-edit"])
msgs.append(f"linemode-edit {'dis' if tbl_opt['linemode-edit'] else 'en'}abled.")
if option in ("linemode-trapsig",):
if not tbl_opt["linemode"]:
msgs.append("linemode not active.")
else:
writer.request_linemode_change(trapsig=not tbl_opt["linemode-trapsig"])
msgs.append(f"linemode-trapsig {'dis' if tbl_opt['linemode-trapsig'] else 'en'}abled.")
if option not in tbl_opt and option != "all":
msgs.append("toggle: not an option.")
return "\r\n".join(msgs)