Source code for telnetlib3.server_pty_shell

"""
PTY shell implementation for telnetlib3.

This module provides the ability to spawn PTY-connected programs (bash, tmux, nethack, etc.) for
each telnet connection, with proper terminal negotiation forwarding.
"""

from __future__ import annotations

# std imports
import os
import sys
import time
import shlex
import codecs
import struct
import asyncio
import logging
from typing import Any, Dict, List, Tuple, Union, Callable, Optional, Awaitable, cast

# local
from .telopt import SGA, ECHO, NAWS, WONT
from .stream_reader import TelnetReader, TelnetReaderUnicode
from .stream_writer import TelnetWriter, TelnetWriterUnicode

__all__ = ("make_pty_shell", "pty_shell", "PTYSpawnError")

# Delay between termination signals (seconds)
_TERMINATE_DELAY = 0.1

# Debounce delay for NAWS updates (seconds)
_NAWS_DEBOUNCE = 0.2

# Idle delay before sending IAC GA (seconds)
_GA_IDLE = 0.1

# Polling interval for _wait_for_terminal_info (seconds)
_TERMINAL_INFO_POLL = 0.05


[docs] class PTYSpawnError(Exception): """Raised when PTY child process fails to exec."""
logger = logging.getLogger("telnetlib3.server_pty_shell") # Synchronized Output sequences (DEC private mode 2026) # https://gist.github.com/christianparpart/d8a62cc1ab659194337d73e399004036 _BSU = b"\x1b[?2026h" # Begin Synchronized Update _ESU = b"\x1b[?2026l" # End Synchronized Update def _platform_check() -> None: """Verify platform supports PTY operations.""" if sys.platform == "win32": raise NotImplementedError("PTY support is not available on Windows") class PTYSession: """Manages a PTY session lifecycle.""" def __init__( self, reader: Union[TelnetReader, TelnetReaderUnicode], writer: Union[TelnetWriter, TelnetWriterUnicode], program: str, args: Optional[List[str]], *, preexec_fn: Optional[Callable[[], None]] = None, raw_mode: bool = False, ) -> None: """ Initialize PTY session. :param reader: TelnetReader instance. :param writer: TelnetWriter instance. :param program: Path to program to execute. :param args: List of arguments for the program. :param preexec_fn: Optional callable to run in child before exec. Called with no arguments after fork but before _setup_child. Useful for test coverage tracking in the forked child process. :param raw_mode: If True, disable PTY echo and canonical mode. Use for programs that handle their own terminal I/O (e.g., blessed, curses, ucs-detect). """ self.reader = reader self.writer = writer self.program = program self.args = args or [] self.preexec_fn = preexec_fn self.raw_mode = raw_mode self.master_fd: Optional[int] = None self.child_pid: Optional[int] = None self._closing = False self._output_buffer = b"" self._in_sync_update = False self._decoder: Optional[codecs.IncrementalDecoder] = None self._decoder_charset: Optional[str] = None self._naws_pending: Optional[Tuple[int, int]] = None self._naws_timer: Optional[asyncio.TimerHandle] = None self._ga_timer: Optional[asyncio.TimerHandle] = None self.exit_code: Optional[int] = None def start(self) -> None: """ Fork PTY, configure environment, and exec program. :raises PTYSpawnError: If the child process fails to exec. """ import pty import fcntl _platform_check() env = self._build_environment() rows, cols = self._get_window_size() # Create pipe for exec error detection (ptyprocess pattern). # Child sets close-on-exec; successful exec closes pipe automatically. # If exec fails, child writes error through pipe before exiting. exec_err_pipe_read, exec_err_pipe_write = os.pipe() self.child_pid, self.master_fd = pty.fork() if self.child_pid == 0: # Child process os.close(exec_err_pipe_read) fcntl.fcntl(exec_err_pipe_write, fcntl.F_SETFD, fcntl.FD_CLOEXEC) # Coverage object from preexec_fn, saved before exec child_cov = None if self.preexec_fn is not None: try: child_cov = self.preexec_fn() except Exception as e: self._write_exec_error(exec_err_pipe_write, e) os._exit(1) self._setup_child(env, rows, cols, exec_err_pipe_write, child_cov=child_cov) else: # Parent process os.close(exec_err_pipe_write) exec_err_data = os.read(exec_err_pipe_read, 4096) os.close(exec_err_pipe_read) if exec_err_data: self._handle_exec_error(exec_err_data) cmd_str = shlex.join([self.program] + self.args) logger.debug("forked PTY: pid=%d fd=%d cmd=%s", self.child_pid, self.master_fd, cmd_str) self._setup_parent() pid, status = os.waitpid(self.child_pid, os.WNOHANG) if pid: logger.warning("child already exited: status=%d", status) def _write_exec_error(self, pipe_fd: int, exc: Exception) -> None: """Write exception info to pipe for parent to read.""" ename = type(exc).__name__ msg = f"{ename}:{getattr(exc, 'errno', 0)}:{exc}" os.write(pipe_fd, msg.encode("utf-8", errors="replace")) os.close(pipe_fd) def _handle_exec_error(self, data: bytes) -> None: """Parse exec error from child and raise appropriate exception.""" try: parts = data.decode("utf-8", errors="replace").split(":", 2) if len(parts) == 3: errclass, _errno_s, errmsg = parts raise PTYSpawnError(f"{errclass}: {errmsg}") raise PTYSpawnError(f"Exec failed: {data!r}") except PTYSpawnError: raise except Exception as exc: raise PTYSpawnError(f"Exec failed: {data!r}") from exc def _build_environment(self) -> Dict[str, str]: """Build environment dict from negotiated values.""" env = os.environ.copy() term = self.writer.get_extra_info("TERM", "xterm") if term: term_lower = term.lower() # vt100/vt52/vtnt terminfo entries have padding delays ($<2>) # that render as visible garbage. Force 'ansi' which has none. if term_lower in ("vt100", "vtnt", "vt52"): term_lower = "ansi" env["TERM"] = term_lower rows = self.writer.get_extra_info("rows") cols = self.writer.get_extra_info("cols") if rows: env["LINES"] = str(rows) if cols: env["COLUMNS"] = str(cols) lang = self.writer.get_extra_info("LANG") if lang: env["LANG"] = lang env["LC_ALL"] = lang else: charset = self.writer.get_extra_info("charset") if charset: env["LANG"] = f"en_US.{charset}" for key in ("DISPLAY", "USER", "COLORTERM", "HOME", "SHELL", "LOGNAME", "IPADDRESS"): val = self.writer.get_extra_info(key) if val: env[key] = val return env def _get_window_size(self) -> Tuple[int, int]: """Get window size from negotiated values.""" rows: int = self.writer.get_extra_info("rows", 25) cols: int = self.writer.get_extra_info("cols", 80) return rows, cols def _setup_child( self, env: Dict[str, str], rows: int, cols: int, exec_err_pipe: int, *, child_cov: Any = None, ) -> None: """Child process setup before exec.""" # Note: pty.fork() already calls setsid() for the child, so we don't need to import fcntl import termios if rows and cols: winsize = struct.pack("HHHH", rows, cols, 0, 0) fcntl.ioctl(sys.stdout.fileno(), termios.TIOCSWINSZ, winsize) attrs = termios.tcgetattr(sys.stdin.fileno()) if self.raw_mode: # Raw mode: disable echo and canonical mode for programs that handle # their own terminal I/O (blessed, curses, ucs-detect). This prevents # terminal responses from being echoed back through the PTY. attrs[3] &= ~(termios.ECHO | termios.ICANON) else: # Normal mode: keep ICANON for line editing but disable ECHO. # We sent WONT ECHO so the client does local echo; if the PTY # also echoed, every character would appear twice. attrs[3] &= ~termios.ECHO # Set VERASE to ^H (0x08) since many telnet clients send ^H for backspace # (default PTY ERASE is often ^? which won't work for those clients). attrs[6][termios.VERASE] = 8 # ^H termios.tcsetattr(sys.stdin.fileno(), termios.TCSANOW, attrs) # Save coverage data before exec replaces the process if child_cov is not None: child_cov.stop() child_cov.save() argv = [self.program] + self.args try: os.execvpe(self.program, argv, env) except OSError as err: self._write_exec_error(exec_err_pipe, err) os._exit(os.EX_OSERR) def _setup_parent(self) -> None: """Parent process setup after fork.""" import fcntl assert self.master_fd is not None flags = fcntl.fcntl(self.master_fd, fcntl.F_GETFL) fcntl.fcntl(self.master_fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) self.writer.set_ext_callback(NAWS, self._on_naws) def _on_naws(self, rows: int, cols: int) -> None: """Handle NAWS updates by resizing PTY with debouncing.""" self.writer.protocol.on_naws(rows, cols) self._schedule_naws_update(rows, cols) def _schedule_naws_update(self, rows: int, cols: int) -> None: """Schedule debounced NAWS update to avoid signal storms during rapid resize.""" self._naws_pending = (rows, cols) if self._naws_timer is not None: self._naws_timer.cancel() loop = asyncio.get_event_loop() self._naws_timer = loop.call_later(_NAWS_DEBOUNCE, self._fire_naws_update) def _fire_naws_update(self) -> None: """Fire the pending NAWS update after debounce delay.""" if self._naws_pending is not None: rows, cols = self._naws_pending self._naws_pending = None self._naws_timer = None self._set_window_size(rows, cols) def _set_window_size(self, rows: int, cols: int) -> None: """Set PTY window size and send SIGWINCH to child.""" import fcntl import signal import termios if self.master_fd is None or self.child_pid is None: return winsize = struct.pack("HHHH", rows, cols, 0, 0) fcntl.ioctl(self.master_fd, termios.TIOCSWINSZ, winsize) try: os.killpg(os.getpgid(self.child_pid), signal.SIGWINCH) except ProcessLookupError: pass async def run(self) -> None: """Bridge loop between telnet and PTY.""" import errno loop = asyncio.get_running_loop() pty_read_event = asyncio.Event() pty_data_queue: asyncio.Queue[bytes] = asyncio.Queue() assert self.child_pid is not None assert self.master_fd is not None pid, _ = os.waitpid(self.child_pid, os.WNOHANG) if pid: return master_fd = self.master_fd def pty_readable() -> None: """Callback when PTY has data to read.""" # Drain available data to reduce tearing, but cap at 256KB to avoid # buffering forever on continuous output (e.g., cat large_file) chunks: list[bytes] = [] total = 0 max_batch = 262144 # 256KB while total < max_batch: try: data = os.read(master_fd, 65536) if data: chunks.append(data) total += len(data) else: self._closing = True break except OSError as e: if e.errno == errno.EAGAIN: break # No more data available if e.errno == errno.EIO: self._closing = True break logger.debug("PTY read error: %s", e) self._closing = True break if chunks: pty_data_queue.put_nowait(b"".join(chunks)) pty_read_event.set() loop.add_reader(master_fd, pty_readable) try: await self._bridge_loop(pty_read_event, pty_data_queue) finally: try: loop.remove_reader(master_fd) except (ValueError, KeyError): pass async def _bridge_loop( self, pty_read_event: asyncio.Event, pty_data_queue: asyncio.Queue[bytes] ) -> None: """Main bridge loop transferring data between telnet and PTY.""" while not self._closing and not self.writer.is_closing(): telnet_task: asyncio.Task[Union[bytes, str]] = asyncio.create_task( self.reader.read(4096) ) pty_task: asyncio.Task[bool] = asyncio.create_task(pty_read_event.wait()) done, pending = await asyncio.wait( {telnet_task, pty_task}, return_when=asyncio.FIRST_COMPLETED ) for task in pending: task.cancel() try: await task except asyncio.CancelledError: pass try: if telnet_task in done: telnet_data = telnet_task.result() if telnet_data: logger.log( 5, "telnet->pty: %r", ( telnet_data[:200] if isinstance(telnet_data, (bytes, str)) else telnet_data ), ) self._write_to_pty(telnet_data) else: self._closing = True continue if pty_task in done: pty_task.result() while not pty_data_queue.empty(): pty_data = pty_data_queue.get_nowait() self._write_to_telnet(pty_data) # EAGAIN was hit - flush any remaining partial line self._flush_remaining() pty_read_event.clear() except Exception as e: logger.debug("bridge loop error: %s", e) self._closing = True break def _write_to_pty(self, data: Union[str, bytes]) -> None: """ Write data from telnet to PTY. Translates DEL (0x7F) to ``^H`` (0x08) so that both backspace encodings work with the PTY's VERASE setting (``^H``). """ if self.master_fd is None: return if isinstance(data, str): if hasattr(self.writer, "fn_encoding"): charset = self.writer.fn_encoding(incoming=True) else: charset = self.writer.get_extra_info("charset") or "utf-8" data = data.encode(charset, errors="replace") data = data.replace(b"\x7f", b"\x08") try: os.write(self.master_fd, data) except OSError: self._closing = True def _write_to_telnet(self, data: bytes) -> None: """Write data from PTY to telnet, respecting synchronized update boundaries.""" if self._ga_timer is not None: self._ga_timer.cancel() self._ga_timer = None self._output_buffer += data # Process buffer, flushing on ESU or newline boundaries while True: if self._in_sync_update: # Look for End Synchronized Update esu_pos = self._output_buffer.find(_ESU) if esu_pos != -1: # Flush up to and including ESU end = esu_pos + len(_ESU) self._flush_output(self._output_buffer[:end]) self._output_buffer = self._output_buffer[end:] self._in_sync_update = False else: # Still waiting for ESU, but flush if buffer too large if len(self._output_buffer) > 262144: # 256KB safety limit self._flush_output(self._output_buffer) self._output_buffer = b"" break else: # Look for Begin Synchronized Update bsu_pos = self._output_buffer.find(_BSU) if bsu_pos != -1: # Flush everything before BSU (up to last newline if any) if bsu_pos > 0: self._flush_output(self._output_buffer[:bsu_pos]) self._output_buffer = self._output_buffer[bsu_pos:] self._in_sync_update = True else: # Flush up to and including last newline for line-oriented output nl_pos = self._output_buffer.rfind(b"\n") if nl_pos != -1: end = nl_pos + 1 self._flush_output(self._output_buffer[:end]) self._output_buffer = self._output_buffer[end:] # Keep any partial line in buffer (will flush on next newline, # next sync boundary, or when more data arrives with EAGAIN) break def _flush_output(self, data: bytes, final: bool = False) -> None: """Send data to telnet client using incremental decoder.""" if not data: return if hasattr(self.writer, "fn_encoding"): charset = self.writer.fn_encoding(outgoing=True) else: charset = self.writer.get_extra_info("charset") or "utf-8" # Get or create incremental decoder, recreating if charset changed if self._decoder is None or self._decoder_charset != charset: self._decoder = codecs.getincrementaldecoder(charset)(errors="replace") self._decoder_charset = charset # Decode using incremental decoder - it buffers incomplete sequences text = self._decoder.decode(data, final) if text: cast(TelnetWriterUnicode, self.writer).write(text) self._schedule_ga() def _flush_remaining(self) -> None: """Flush remaining buffer after EAGAIN (partial lines, prompts, etc.).""" if self._output_buffer and not self._in_sync_update: logger.log(5, "flush_remaining: %r", self._output_buffer[:200]) self._flush_output(self._output_buffer) self._output_buffer = b"" self._schedule_ga() def _schedule_ga(self) -> None: """Schedule IAC GA after 500ms idle, for clients that refuse SGA.""" if self._ga_timer is not None: self._ga_timer.cancel() self._ga_timer = None if self.raw_mode: return if self.writer.remote_option.get(SGA, False): return if getattr(self.writer.protocol, "never_send_ga", False): return loop = asyncio.get_event_loop() self._ga_timer = loop.call_later(_GA_IDLE, self._fire_ga) def _fire_ga(self) -> None: """Send IAC GA if writer is still open.""" self._ga_timer = None if not self.writer.is_closing(): self.writer.send_ga() def _isalive(self) -> bool: """Check if child process is still running.""" if self.child_pid is None: return False try: pid, _status = os.waitpid(self.child_pid, os.WNOHANG) return pid == 0 except ChildProcessError: return False def _terminate(self, force: bool = False) -> bool: """ Terminate child with signal escalation (ptyprocess pattern). Tries SIGHUP, SIGCONT, SIGINT in sequence. If force=True, also tries SIGKILL. :param force: If True, use SIGKILL as last resort. :returns: True if child was terminated, False otherwise. """ import signal if not self._isalive(): return True assert self.child_pid is not None signals = [signal.SIGHUP, signal.SIGCONT, signal.SIGINT] if force: signals.append(signal.SIGKILL) for sig in signals: try: os.kill(self.child_pid, sig) except ProcessLookupError: return True time.sleep(_TERMINATE_DELAY) if not self._isalive(): return True return not self._isalive() def cleanup(self) -> None: """Kill child process and close PTY fd.""" # Cancel any pending timers if self._ga_timer is not None: self._ga_timer.cancel() self._ga_timer = None if self._naws_timer is not None: self._naws_timer.cancel() self._naws_timer = None self._naws_pending = None # Flush any remaining output buffer with final=True to emit buffered bytes if self._output_buffer: self._flush_output(self._output_buffer, final=True) self._output_buffer = b"" if self.master_fd is not None: try: os.close(self.master_fd) except OSError: pass self.master_fd = None self.exit_code = None if self.child_pid is not None: self._terminate(force=True) try: _, status = os.waitpid(self.child_pid, os.WNOHANG) if os.WIFEXITED(status): self.exit_code = os.WEXITSTATUS(status) except ChildProcessError: pass self.child_pid = None async def _wait_for_terminal_info( writer: Union[TelnetWriter, TelnetWriterUnicode], timeout: float = 2.0 ) -> None: """ Wait for TERM and window size to be negotiated. :param writer: TelnetWriter instance. :param timeout: Maximum time to wait in seconds. """ loop = asyncio.get_running_loop() start = loop.time() while loop.time() - start < timeout: term = writer.get_extra_info("TERM") rows = writer.get_extra_info("rows") if term and rows: return await asyncio.sleep(_TERMINAL_INFO_POLL)
[docs] async def pty_shell( reader: Union[TelnetReader, TelnetReaderUnicode], writer: Union[TelnetWriter, TelnetWriterUnicode], program: str, args: Optional[List[str]] = None, preexec_fn: Optional[Callable[[], None]] = None, raw_mode: bool = False, ) -> Optional[int]: """ PTY shell callback for telnet server. :param reader: TelnetReader instance. :param writer: TelnetWriter instance. :param program: Path to program to execute. :param args: List of arguments for the program. :param preexec_fn: Optional callable to run in child before exec. :param raw_mode: If True, disable PTY echo and canonical mode. Use for programs that handle their own terminal I/O (e.g., blessed, curses, ucs-detect). :returns: Child process exit code, or ``None`` if unknown. """ _platform_check() await _wait_for_terminal_info(writer, timeout=2.0) # Echo handling depends on raw_mode: # - Normal mode: Send WONT ECHO so client does local echo, PTY handles # echo with proper ONLCR translation (\n -> \r\n) for input() display. # - Raw mode: Keep WILL ECHO so client doesn't local-echo, but PTY echo # is disabled. This prevents terminal responses (CPR, etc.) from being # echoed back. The program handles its own output. if not raw_mode and writer.will_echo: writer.iac(WONT, ECHO) await writer.drain() session = PTYSession(reader, writer, program, args, preexec_fn=preexec_fn, raw_mode=raw_mode) try: session.start() await session.run() finally: session.cleanup() writer.close() return session.exit_code
[docs] def make_pty_shell( program: str, args: Optional[List[str]] = None, preexec_fn: Optional[Callable[[], None]] = None, raw_mode: bool = False, ) -> Callable[ [Union[TelnetReader, TelnetReaderUnicode], Union[TelnetWriter, TelnetWriterUnicode]], Awaitable[None], ]: """ Factory returning a shell callback for PTY execution. :param program: Path to program to execute. :param args: List of arguments for the program. :param preexec_fn: Optional callable to run in child before exec. Useful for test coverage tracking in the forked child process. :param raw_mode: If True, disable PTY echo and canonical mode. Use for programs that handle their own terminal I/O (e.g., blessed, curses, ucs-detect). :returns: Async shell callback suitable for use with create_server(). Example usage:: from telnetlib3 import create_server, make_pty_shell server = await create_server( host='localhost', port=6023, shell=make_pty_shell('/bin/bash', ['-l']) ) """ async def shell( reader: Union[TelnetReader, TelnetReaderUnicode], writer: Union[TelnetWriter, TelnetWriterUnicode], ) -> None: await pty_shell(reader, writer, program, args, preexec_fn=preexec_fn, raw_mode=raw_mode) return shell