Source code for telnetlib3.client_shell

# std imports
import collections
import contextlib
import logging
import asyncio
import sys

# local
from . import accessories

__all__ = ('telnet_client_shell', )


# TODO: needs 'wait_for' implementation (see DESIGN.rst)
# task = telnet_writer.wait_for(lambda: telnet_writer.local_mode[ECHO] == True)

if sys.platform == 'win32':
    @asyncio.coroutine
    def telnet_client_shell(telnet_reader, telnet_writer):
        raise NotImplementedError(
            'win32 not yet supported as telnet client. Please contribute!')

else:
    import termios
    import os

    @contextlib.contextmanager
    def _set_tty(fobj, tty_func):
        """
        return context manager for manipulating stdin tty state.

        if stdin is not attached to a terminal, no action is performed
        before or after yielding.
        """

    class Terminal(object):
        """
        Context manager that yields (sys.stdin, sys.stdout) for POSIX systems.

        When sys.stdin is a attached to a terminal, it is configured for
        the matching telnet modes negotiated for the given telnet_writer.
        """
        ModeDef = collections.namedtuple(
            'mode', ['iflag', 'oflag', 'cflag', 'lflag',
                     'ispeed', 'ospeed', 'cc'])

        def __init__(self, telnet_writer, loop=None):
            self.telnet_writer = telnet_writer
            self.loop = loop or asyncio.get_event_loop()
            self._fileno = sys.stdin.fileno()
            self._istty = os.path.sameopenfile(0, 1)

        def __enter__(self):
            self._save_mode = self.get_mode()
            if self._istty:
                self.set_mode(self.determine_mode(self._save_mode))
            return self

        def __exit__(self, *_):
            if self._istty:
                termios.tcsetattr(
                    self._fileno, termios.TCSAFLUSH, list(self._save_mode))

        def get_mode(self):
            if self._istty:
                return self.ModeDef(*termios.tcgetattr(self._fileno))

        def set_mode(self, mode):
            termios.tcsetattr(
                sys.stdin.fileno(), termios.TCSAFLUSH, list(mode))

        def determine_mode(self, mode):
            """
            Return copy of 'mode' with changes suggested for telnet connection.
            """
            from telnetlib3.telopt import ECHO

            if not self.telnet_writer.will_echo:
                # return mode as-is
                self.telnet_writer.log.debug('local echo, linemode')
                return mode
            self.telnet_writer.log.debug('server echo, kludge mode')

            # "Raw mode", see tty.py function setraw.  This allows sending
            # of ^J, ^C, ^S, ^\, and others, which might otherwise
            # interrupt with signals or map to another character.  We also
            # trust the remote server to manage CR/LF without mapping.
            #
            iflag = mode.iflag & ~(
                termios.BRKINT |  # Do not send INTR signal on break
                termios.ICRNL  |  # Do not map CR to NL on input
                termios.INPCK  |  # Disable input parity checking
                termios.ISTRIP |  # Do not strip input characters to 7 bits
                termios.IXON)     # Disable START/STOP output control

            # Disable parity generation and detection,
            # Select eight bits per byte character size.
            cflag = mode.cflag & ~(termios.CSIZE | termios.PARENB)
            cflag = cflag | termios.CS8

            # Disable canonical input (^H and ^C processing),
            # disable any other special control characters,
            # disable checking for INTR, QUIT, and SUSP input.
            lflag = mode.lflag & ~(
                termios.ICANON | termios.IEXTEN | termios.ISIG | termios.ECHO)

            # Disable post-output processing,
            # such as mapping LF('\n') to CRLF('\r\n') in output.
            oflag = mode.oflag & ~(termios.OPOST | termios.ONLCR)

            # "A pending read is not satisfied until MIN bytes are received
            #  (i.e., the pending read until MIN bytes are received), or a
            #  signal is received.  A program that uses this case to read
            #  record-based terminal I/O may block indefinitely in the read
            #  operation."
            cc = list(mode.cc)
            cc[termios.VMIN] = 1
            cc[termios.VTIME] = 0

            return self.ModeDef(
                iflag=iflag, oflag=oflag, cflag=cflag, lflag=lflag,
                ispeed=mode.ispeed, ospeed=mode.ospeed, cc=cc)

        @asyncio.coroutine
        def make_stdio(self):
            """
            Return (reader, writer) pair for sys.stdin, sys.stdout.

            This method is a coroutine.
            """
            reader = asyncio.StreamReader()
            reader_protocol = asyncio.StreamReaderProtocol(reader)

            # Thanks:
            #
            #   https://gist.github.com/nathan-hoad/8966377
            #
            # After some experimentation, this 'sameopenfile' conditional seems
            # allow us to handle stdin as a pipe or a keyboard.  In the case of
            # a tty, 0 and 1 are the same open file, we use:
            #
            #    https://github.com/orochimarufan/.files/blob/master/bin/mpr
            write_fobj = sys.stdout
            if self._istty:
                write_fobj = sys.stdin

            writer_transport, writer_protocol = yield from (
                self.loop.connect_write_pipe(
                    asyncio.streams.FlowControlMixin, write_fobj))

            writer = asyncio.StreamWriter(
                writer_transport, writer_protocol, None, self.loop)

            yield from self.loop.connect_read_pipe(
                lambda: reader_protocol, sys.stdin)

            return reader, writer

[docs] @asyncio.coroutine def telnet_client_shell(telnet_reader, telnet_writer): """ Minimal telnet client shell for POSIX terminals. This shell performs minimal tty mode handling when a terminal is attached to standard in (keyboard), notably raw mode is often set and this shell may exit only by disconnect from server, or the escape character, ^]. stdin or stdout may also be a pipe or file, behaving much like nc(1). This function is a :func:`~asyncio.coroutine`. """ loop = asyncio.get_event_loop() keyboard_escape = '\x1d' with Terminal(telnet_writer=telnet_writer, loop=loop) as term: linesep = '\n' if term._istty and telnet_writer.will_echo: linesep = '\r\n' stdin, stdout = yield from term.make_stdio() stdout.write("Escape character is '{escape}'.{linesep}".format( escape=accessories.name_unicode(keyboard_escape), linesep=linesep).encode()) stdin_task = accessories.make_reader_task(stdin) telnet_task = accessories.make_reader_task(telnet_reader) wait_for = set([stdin_task, telnet_task]) while wait_for: done, pending = yield from asyncio.wait( wait_for, return_when=asyncio.FIRST_COMPLETED) task = done.pop() wait_for.remove(task) telnet_writer.log.debug('task=%s, wait_for=%s', task, wait_for) # client input if task == stdin_task: inp = task.result() if inp: if keyboard_escape in inp.decode(): # on ^], close connection to remote host telnet_task.cancel() wait_for.remove(telnet_task) stdout.write( "\033[m{linesep}Connection closed.{linesep}" .format(linesep=linesep).encode()) else: telnet_writer.write(inp.decode()) stdin_task = accessories.make_reader_task(stdin) wait_for.add(stdin_task) else: telnet_writer.log.debug('EOF from client stdin') # server output if task == telnet_task: out = task.result() # TODO: We should not require to check for '_eof' value, # but for some systems, htc.zapto.org, it is required, # where b'' is received even though connection is on?. if not out and telnet_reader._eof: if stdin_task in wait_for: stdin_task.cancel() wait_for.remove(stdin_task) stdout.write(("\033[m{linesep}Connection closed " "by foreign host.{linesep}").format( linesep=linesep).encode()) else: stdout.write(out.encode() or b':?!?:') telnet_task = accessories.make_reader_task( telnet_reader) wait_for.add(telnet_task)