# std imports
import collections
import contextlib
import logging
import asyncio
import sys
# local
from . import accessories
__all__ = ("telnet_client_shell",)
# TODO: needs 'wait_for' implementation (see DESIGN.rst)
# task = telnet_writer.wait_for(lambda: telnet_writer.local_mode[ECHO] == True)
if sys.platform == "win32":
async def telnet_client_shell(telnet_reader, telnet_writer):
raise NotImplementedError(
"win32 not yet supported as telnet client. Please contribute!"
)
else:
import termios
import os
@contextlib.contextmanager
def _set_tty(fobj, tty_func):
"""
return context manager for manipulating stdin tty state.
if stdin is not attached to a terminal, no action is performed
before or after yielding.
"""
class Terminal(object):
"""
Context manager that yields (sys.stdin, sys.stdout) for POSIX systems.
When sys.stdin is a attached to a terminal, it is configured for
the matching telnet modes negotiated for the given telnet_writer.
"""
ModeDef = collections.namedtuple(
"mode", ["iflag", "oflag", "cflag", "lflag", "ispeed", "ospeed", "cc"]
)
def __init__(self, telnet_writer):
self.telnet_writer = telnet_writer
self._fileno = sys.stdin.fileno()
self._istty = os.path.sameopenfile(0, 1)
def __enter__(self):
self._save_mode = self.get_mode()
if self._istty:
self.set_mode(self.determine_mode(self._save_mode))
return self
def __exit__(self, *_):
if self._istty:
termios.tcsetattr(
self._fileno, termios.TCSAFLUSH, list(self._save_mode)
)
def get_mode(self):
if self._istty:
return self.ModeDef(*termios.tcgetattr(self._fileno))
def set_mode(self, mode):
termios.tcsetattr(sys.stdin.fileno(), termios.TCSAFLUSH, list(mode))
def determine_mode(self, mode):
"""
Return copy of 'mode' with changes suggested for telnet connection.
"""
from telnetlib3.telopt import ECHO
if not self.telnet_writer.will_echo:
# return mode as-is
self.telnet_writer.log.debug("local echo, linemode")
return mode
self.telnet_writer.log.debug("server echo, kludge mode")
# "Raw mode", see tty.py function setraw. This allows sending
# of ^J, ^C, ^S, ^\, and others, which might otherwise
# interrupt with signals or map to another character. We also
# trust the remote server to manage CR/LF without mapping.
#
iflag = mode.iflag & ~(
termios.BRKINT
| termios.ICRNL # Do not send INTR signal on break
| termios.INPCK # Do not map CR to NL on input
| termios.ISTRIP # Disable input parity checking
| termios.IXON # Do not strip input characters to 7 bits
) # Disable START/STOP output control
# Disable parity generation and detection,
# Select eight bits per byte character size.
cflag = mode.cflag & ~(termios.CSIZE | termios.PARENB)
cflag = cflag | termios.CS8
# Disable canonical input (^H and ^C processing),
# disable any other special control characters,
# disable checking for INTR, QUIT, and SUSP input.
lflag = mode.lflag & ~(
termios.ICANON | termios.IEXTEN | termios.ISIG | termios.ECHO
)
# Disable post-output processing,
# such as mapping LF('\n') to CRLF('\r\n') in output.
oflag = mode.oflag & ~(termios.OPOST | termios.ONLCR)
# "A pending read is not satisfied until MIN bytes are received
# (i.e., the pending read until MIN bytes are received), or a
# signal is received. A program that uses this case to read
# record-based terminal I/O may block indefinitely in the read
# operation."
cc = list(mode.cc)
cc[termios.VMIN] = 1
cc[termios.VTIME] = 0
return self.ModeDef(
iflag=iflag,
oflag=oflag,
cflag=cflag,
lflag=lflag,
ispeed=mode.ispeed,
ospeed=mode.ospeed,
cc=cc,
)
async def make_stdio(self):
"""
Return (reader, writer) pair for sys.stdin, sys.stdout.
This method is a coroutine.
"""
reader = asyncio.StreamReader()
reader_protocol = asyncio.StreamReaderProtocol(reader)
# Thanks:
#
# https://gist.github.com/nathan-hoad/8966377
#
# After some experimentation, this 'sameopenfile' conditional seems
# allow us to handle stdin as a pipe or a keyboard. In the case of
# a tty, 0 and 1 are the same open file, we use:
#
# https://github.com/orochimarufan/.files/blob/master/bin/mpr
write_fobj = sys.stdout
if self._istty:
write_fobj = sys.stdin
loop = asyncio.get_event_loop_policy().get_event_loop()
writer_transport, writer_protocol = await loop.connect_write_pipe(
asyncio.streams.FlowControlMixin, write_fobj
)
writer = asyncio.StreamWriter(writer_transport, writer_protocol, None, loop)
await loop.connect_read_pipe(lambda: reader_protocol, sys.stdin)
return reader, writer
[docs] async def telnet_client_shell(telnet_reader, telnet_writer):
"""
Minimal telnet client shell for POSIX terminals.
This shell performs minimal tty mode handling when a terminal is
attached to standard in (keyboard), notably raw mode is often set
and this shell may exit only by disconnect from server, or the
escape character, ^].
stdin or stdout may also be a pipe or file, behaving much like nc(1).
"""
keyboard_escape = "\x1d"
with Terminal(telnet_writer=telnet_writer) as term:
linesep = "\n"
if term._istty and telnet_writer.will_echo:
linesep = "\r\n"
stdin, stdout = await term.make_stdio()
stdout.write(
"Escape character is '{escape}'.{linesep}".format(
escape=accessories.name_unicode(keyboard_escape), linesep=linesep
).encode()
)
stdin_task = accessories.make_reader_task(stdin)
telnet_task = accessories.make_reader_task(telnet_reader)
wait_for = set([stdin_task, telnet_task])
while wait_for:
done, pending = await asyncio.wait(
wait_for, return_when=asyncio.FIRST_COMPLETED
)
task = done.pop()
wait_for.remove(task)
telnet_writer.log.debug("task=%s, wait_for=%s", task, wait_for)
# client input
if task == stdin_task:
inp = task.result()
if inp:
if keyboard_escape in inp.decode():
# on ^], close connection to remote host
telnet_task.cancel()
wait_for.remove(telnet_task)
stdout.write(
"\033[m{linesep}Connection closed.{linesep}".format(
linesep=linesep
).encode()
)
else:
telnet_writer.write(inp.decode())
stdin_task = accessories.make_reader_task(stdin)
wait_for.add(stdin_task)
else:
telnet_writer.log.debug("EOF from client stdin")
# server output
if task == telnet_task:
out = task.result()
# TODO: We should not require to check for '_eof' value,
# but for some systems, htc.zapto.org, it is required,
# where b'' is received even though connection is on?.
if not out and telnet_reader._eof:
if stdin_task in wait_for:
stdin_task.cancel()
wait_for.remove(stdin_task)
stdout.write(
(
"\033[m{linesep}Connection closed "
"by foreign host.{linesep}"
)
.format(linesep=linesep)
.encode()
)
else:
stdout.write(out.encode() or b":?!?:")
telnet_task = accessories.make_reader_task(telnet_reader)
wait_for.add(telnet_task)