Source code for telnetlib3.stream_writer

"""Module provides :class:`TelnetWriter` and :class:`TelnetWriterUnicode`."""
# std imports
import asyncio
import collections
import logging
import struct

# local imports
from . import slc
from .telopt import (
    ABORT,
    ACCEPTED,
    AO,
    AYT,
    BINARY,
    BRK,
    CHARSET,
    CMD_EOR,
    COM_PORT_OPTION,
    DM,
    DO,
    DONT,
    EC,
    ECHO,
    EL,
    EOF,
    EOR,
    ESC,
    GA,
    GMCP,
    IAC,
    INFO,
    IP,
    IS,
    LFLOW,
    LFLOW_OFF,
    LFLOW_ON,
    LFLOW_RESTART_ANY,
    LFLOW_RESTART_XON,
    LINEMODE,
    LOGOUT,
    NAWS,
    NEW_ENVIRON,
    NOP,
    REJECTED,
    REQUEST,
    SB,
    SE,
    SEND,
    SGA,
    SNDLOC,
    STATUS,
    SUSP,
    TM,
    TSPEED,
    TTABLE_ACK,
    TTABLE_NAK,
    TTABLE_IS,
    TTABLE_REJECTED,
    TTYPE,
    USERVAR,
    VALUE,
    VAR,
    WILL,
    WONT,
    XDISPLOC,
    name_command,
    name_commands,
    theNULL,
)


__all__ = (
    "TelnetWriter",
    "TelnetWriterUnicode",
)


[docs]class TelnetWriter: """ This is a copy of :class:`asyncio.StreamWriter`, except that it is a Telnet IAC Interpreter implementing the telnet protocol. """ #: Total bytes sent to :meth:`~.feed_byte` byte_count = 0 #: Whether flow control is enabled. lflow = True #: Whether flow control enabled by Transmit-Off (XOFF) (Ctrl-s), should #: re-enable Transmit-On (XON) only on receipt of XON (Ctrl-q). When #: False, any keypress from client re-enables transmission. xon_any = False #: Whether the last byte received by :meth:`~.feed_byte` is the beginning #: of an IAC command. iac_received = None #: Whether the last byte received by :meth:`~.feed_byte` begins an IAC #: command sequence. cmd_received = None #: Whether the last byte received by :meth:`~.feed_byte` is a matching #: special line character value, if negotiated. slc_received = None #: SLC function values and callbacks are fired for clients in Kludge #: mode not otherwise capable of negotiating LINEMODE, providing #: transport remote editing function callbacks for dumb clients. slc_simulated = True default_slc_tab = slc.BSD_SLC_TAB #: Initial line mode requested by server if client supports LINEMODE #: negotiation (remote line editing and literal echo of control chars) default_linemode = slc.Linemode( bytes([ord(slc.LMODE_MODE_REMOTE) | ord(slc.LMODE_MODE_LIT_ECHO)]) ) def __init__( self, transport, protocol, *, client=False, server=False, reader=None, ): """ Almost all negotiation actions are performed through the writer interface, as any action requires writing bytes to the underling stream. This class implements :meth:`~.feed_byte`, which acts as a Telnet *Is-A-Command* (IAC) interpreter. The significance of the last byte passed to this method is tested by instance attribute :attr:`~.is_oob`, following the call to :meth:`~.feed_byte` to determine whether the given byte is in or out of band. A minimal Telnet Protocol method, :meth:`asyncio.Protocol.data_received`, should forward each byte to :meth:`~.feed_byte`, which returns True to indicate the given byte should be forwarded to a Protocol reader method. :param bool client: Whether the IAC interpreter should react from the client point of view. :param bool server: Whether the IAC interpreter should react from the server point of view. """ self._transport = transport self._protocol = protocol # drain() expects that the reader has an exception() method if reader is not None and not callable(getattr(reader, "exception")): raise TypeError( "reader must provide 'exception' method, like " "asyncio.StreamReader.exception, got", reader, ) self._reader = reader self._loop = asyncio.get_event_loop_policy().get_event_loop() self._complete_fut = self._loop.create_future() self._complete_fut.set_result(None) if not any((client, server)) or all((client, server)): raise TypeError( "keyword arguments `client', and `server' are mutually exclusive." ) self._server = server self.log = logging.getLogger(__name__) #: Dictionary of telnet option byte(s) that follow an #: IAC-DO or IAC-DONT command, and contains a value of ``True`` #: until IAC-WILL or IAC-WONT has been received by remote end. self.pending_option = Option("pending_option", self.log) #: Dictionary of telnet option byte(s) that follow an #: IAC-WILL or IAC-WONT command, sent by our end, #: indicating state of local capabilities. self.local_option = Option("local_option", self.log) #: Dictionary of telnet option byte(s) that follow an #: IAC-WILL or IAC-WONT command received by remote end, #: indicating state of remote capabilities. self.remote_option = Option("remote_option", self.log) #: Sub-negotiation buffer self._sb_buffer = collections.deque() #: SLC buffer self._slc_buffer = collections.deque() #: SLC Tab (SLC Functions and their support level, and ascii value) self.slctab = slc.generate_slctab(self.default_slc_tab) #: Represents LINEMODE MODE negotiated or requested by client. #: attribute ``ack`` returns True if it is in use. self._linemode = slc.Linemode() self._connection_closed = False # Set default callback handlers to local methods. A base protocol # wishing not to wire any callbacks at all may simply allow our stream # to gracefully log and do nothing about in most cases. self._iac_callback = {} for iac_cmd, key in ( (BRK, "brk"), (IP, "ip"), (AO, "ao"), (AYT, "ayt"), (EC, "ec"), (EL, "el"), (EOF, "eof"), (SUSP, "susp"), (ABORT, "abort"), (NOP, "nop"), (DM, "dm"), (GA, "ga"), (CMD_EOR, "eor"), (TM, "tm"), ): self.set_iac_callback( cmd=iac_cmd, func=getattr(self, "handle_{}".format(key)) ) self._slc_callback = {} for slc_cmd, key in ( (slc.SLC_SYNCH, "dm"), (slc.SLC_BRK, "brk"), (slc.SLC_IP, "ip"), (slc.SLC_AO, "ao"), (slc.SLC_AYT, "ayt"), (slc.SLC_EOR, "eor"), (slc.SLC_ABORT, "abort"), (slc.SLC_EOF, "eof"), (slc.SLC_SUSP, "susp"), (slc.SLC_EC, "ec"), (slc.SLC_EL, "el"), (slc.SLC_EW, "ew"), (slc.SLC_RP, "rp"), (slc.SLC_LNEXT, "lnext"), (slc.SLC_XON, "xon"), (slc.SLC_XOFF, "xoff"), ): self.set_slc_callback( slc_byte=slc_cmd, func=getattr(self, "handle_{}".format(key)) ) self._ext_callback = {} for ext_cmd, key in ( (LOGOUT, "logout"), (SNDLOC, "sndloc"), (NAWS, "naws"), (TSPEED, "tspeed"), (TTYPE, "ttype"), (XDISPLOC, "xdisploc"), (NEW_ENVIRON, "environ"), (CHARSET, "charset"), ): self.set_ext_callback( cmd=ext_cmd, func=getattr(self, "handle_{}".format(key)) ) self._ext_send_callback = {} for ext_cmd, key in ( (TTYPE, "ttype"), (TSPEED, "tspeed"), (XDISPLOC, "xdisploc"), (NAWS, "naws"), (SNDLOC, "sndloc"), ): self.set_ext_send_callback( cmd=ext_cmd, func=getattr(self, "handle_send_{}".format(key)) ) for ext_cmd, key in ((CHARSET, "charset"), (NEW_ENVIRON, "environ")): _cbname = "handle_send_server_" if self.server else "handle_send_client_" self.set_ext_send_callback(cmd=ext_cmd, func=getattr(self, _cbname + key)) @property def connection_closed(self): return self._connection_closed # Base protocol methods @property def transport(self): return self._transport
[docs] def close(self): if self.connection_closed: return if self._transport is not None: self._transport.close() # break circular refs self._ext_callback.clear() self._ext_send_callback.clear() self._slc_callback.clear() self._iac_callback.clear() self._protocol = None self._transport = None self._connection_closed = True
[docs] def is_closing(self): if self._transport is not None: if self._transport.is_closing(): return True if self.connection_closed: return True return False
def __repr__(self): """Description of stream encoding state.""" info = ["TelnetWriter"] if self.server: info.append("server") endpoint = "client" else: info.append("client") endpoint = "server" info.append("mode:{self.mode}".format(self=self)) # IAC options info.append("{0}lineflow".format("+" if self.lflow else "-")) info.append("{0}xon_any".format("+" if self.xon_any else "-")) info.append("{0}slc_sim".format("+" if self.slc_simulated else "-")) # IAC negotiation status _failed_reply = sorted( [name_commands(opt) for (opt, val) in self.pending_option.items() if val] ) if _failed_reply: info.append("failed-reply:{opts}".format(opts=",".join(_failed_reply))) _local = sorted( [ name_commands(opt) for (opt, val) in self.local_option.items() if self.local_option.enabled(opt) ] ) if _local: localpoint = "server" if self.server else "client" info.append( "{kind}-will:{opts}".format(kind=localpoint, opts=",".join(_local)) ) _remote = sorted( [ name_commands(opt) for (opt, val) in self.remote_option.items() if self.remote_option.enabled(opt) ] ) if _remote: info.append( "{kind}-will:{opts}".format(kind=endpoint, opts=",".join(_remote)) ) return "<{0}>".format(" ".join(info))
[docs] def write(self, data): """ Write a bytes object to the protocol transport. :rtype: None """ if self.connection_closed: self.log.debug("write after close, ignored %s bytes", len(data)) return self._write(data)
[docs] def writelines(self, lines): """ Write unicode strings to transport. Note that newlines are not added. The sequence can be any iterable object producing strings. This is equivalent to calling write() for each string. """ self.write(b"".join(lines))
[docs] def write_eof(self): return self._transport.write_eof()
[docs] def can_write_eof(self): return self._transport.can_write_eof()
[docs] async def drain(self): """Flush the write buffer. The intended use is to write w.write(data) await w.drain() """ if self._reader is not None: exc = self._reader.exception() if exc is not None: raise exc if self._transport.is_closing(): # Wait for protocol.connection_lost() call # Raise connection closing error if any, # ConnectionResetError otherwise # Yield to the event loop so connection_lost() may be # called. Without this, _drain_helper() would return # immediately, and code that calls # write(...); await drain() # in a loop would never call connection_lost(), so it # would not see an error when the socket is closed. await sleep(0) await self._protocol._drain_helper()
# proprietary write helper
[docs] def feed_byte(self, byte): """ Feed a single byte into Telnet option state machine. :param int byte: an 8-bit byte value as integer (0-255), or a bytes array. When a bytes array, it must be of length 1. :rtype bool: Whether the given ``byte`` is "in band", that is, should be duplicated to a connected terminal or device. ``False`` is returned for an ``IAC`` command for each byte until its completion. """ self.byte_count += 1 self.slc_received = None # list of IAC commands needing 3+ bytes (mbs: multibyte sequence) iac_mbs = (DO, DONT, WILL, WONT, SB) # cmd received is toggled False, unless its a mbs, then it is the # actual command that was received in (opt, byte) form. self.cmd_received = self.cmd_received in iac_mbs and self.cmd_received if byte == IAC: self.iac_received = not self.iac_received if not self.iac_received and self.cmd_received == SB: # SB buffer receives escaped IAC values self._sb_buffer.append(IAC) elif self.iac_received and not self.cmd_received: # parse 2nd byte of IAC self.cmd_received = cmd = byte if cmd not in iac_mbs: # DO, DONT, WILL, WONT are 3-byte commands, expect more. # Any other, expect a callback. Otherwise this protocol # does not comprehend the remote end's request. if cmd not in self._iac_callback: raise ValueError( "IAC {0}({1!r}): not a legal 2-byte cmd".format( name_command(cmd), cmd ) ) self._iac_callback[cmd](cmd) self.iac_received = False elif self.iac_received and self.cmd_received == SB: # parse 2nd byte of IAC while while already within # IAC SB sub-negotiation buffer, assert command is SE. self.cmd_received = cmd = byte if cmd != SE: self.log.error( "sub-negotiation buffer interrupted " "by IAC {}".format(name_command(cmd)) ) self._sb_buffer.clear() else: # sub-negotiation end (SE), fire handle_subnegotiation self.log.debug( "sub-negotiation cmd {} SE completion byte".format( name_command(self._sb_buffer[0]) ) ) try: self.handle_subnegotiation(self._sb_buffer) finally: self._sb_buffer.clear() self.iac_received = False self.iac_received = False elif self.cmd_received == SB: # continue buffering of sub-negotiation command. self._sb_buffer.append(byte) assert len(self._sb_buffer) < (1 << 15) # 32k SB buffer elif self.cmd_received: # parse 3rd and final byte of IAC DO, DONT, WILL, WONT. cmd, opt = self.cmd_received, byte self.log.debug( "recv IAC {} {}".format(name_command(cmd), name_command(opt)) ) try: if cmd == DO: try: self.local_option[opt] = self.handle_do(opt) finally: if self.pending_option.enabled(WILL + opt): self.pending_option[WILL + opt] = False elif cmd == DONT: try: self.handle_dont(opt) finally: self.pending_option[WILL + opt] = False self.local_option[opt] = False elif cmd == WILL: if not self.pending_option.enabled(DO + opt) and opt != TM: self.log.debug("WILL {} unsolicited".format(name_command(opt))) try: self.handle_will(opt) finally: if self.pending_option.enabled(DO + opt): self.pending_option[DO + opt] = False # informed client, 'DONT', client responded with # illegal 'WILL' response, cancel any pending option. # Very unlikely state! if self.pending_option.enabled(DONT + opt): self.pending_option[DONT + opt] = False else: # cmd is 'WONT' self.handle_wont(opt) self.pending_option[DO + opt] = False finally: # toggle iac_received on any ValueErrors/AssertionErrors raised self.iac_received = False self.cmd_received = (opt, byte) elif self.mode == "remote" or self.mode == "kludge" and self.slc_simulated: # 'byte' is tested for SLC characters (callback, slc_name, slc_def) = slc.snoop( byte, self.slctab, self._slc_callback ) # Inform caller which SLC function occurred by this attribute. self.slc_received = slc_name if callback: self.log.debug( "slc.snoop({!r}): {}, callback is {}.".format( byte, slc.name_slc_command(slc_name), callback.__name__ ) ) callback(slc_name) # whether this data should be forwarded (to the reader) return not self.is_oob
# Our protocol methods
[docs] def get_extra_info(self, name, default=None): """Get optional server protocol information.""" # StreamWriter uses self._transport.get_extra_info, so we mix it in # here, but _protocol has all of the interesting telnet effects return self._protocol.get_extra_info( name, default ) or self._transport.get_extra_info(name, default)
@property def protocol(self): """The (Telnet) protocol attached to this stream.""" return self._protocol @property def server(self): """Whether this stream is of the server's point of view.""" return bool(self._server) @property def client(self): """Whether this stream is of the client's point of view.""" return bool(not self._server) @property def inbinary(self): """ Whether binary data is expected to be received on reader, :rfc:`856`. """ return self.remote_option.enabled(BINARY) @property def outbinary(self): """Whether binary data may be written to the writer, :rfc:`856`.""" return self.local_option.enabled(BINARY)
[docs] def echo(self, data): """ Conditionally write ``data`` to transport when "remote echo" enabled. :param bytes data: string received as input, conditionally written. :rtype: None The default implementation depends on telnet negotiation willingness for local echo, only an RFC-compliant telnet client will correctly set or unset echo accordingly by demand. """ assert self.server, "Client never performs echo of input received." if self.will_echo: self.write(data=data)
@property def will_echo(self): """ Whether Server end is expected to echo back input sent by client. From server perspective: the server should echo (duplicate) client input back over the wire, the client is awaiting this data to indicate their input has been received. From client perspective: the server will not echo our input, we should chose to duplicate our input to standard out ourselves. """ return (self.server and self.local_option.enabled(ECHO)) or ( self.client and self.remote_option.enabled(ECHO) ) @property def mode(self): """ String describing NVT mode. :rtype str: One of: ``kludge``: Client acknowledges WILL-ECHO, WILL-SGA. character-at- a-time and remote line editing may be provided. ``local``: Default NVT half-duplex mode, client performs line editing and transmits only after pressing send (usually CR) ``remote``: Client supports advanced remote line editing, using mixed-mode local line buffering (optionally, echoing) until send, but also transmits buffer up to and including special line characters (SLCs). """ if self.remote_option.enabled(LINEMODE): if self._linemode.local: return "local" return "remote" if self.server: if self.local_option.enabled(ECHO) and self.local_option.enabled(SGA): return "kludge" return "local" if self.remote_option.enabled(ECHO) and self.remote_option.enabled(SGA): return "kludge" return "local" @property def is_oob(self): """The previous byte should not be received by the API stream.""" return self.iac_received or self.cmd_received @property def linemode(self): """ Linemode instance for stream. .. note:: value is meaningful after successful LINEMODE negotiation, otherwise does not represent the linemode state of the stream. Attributes of the stream's active linemode may be tested using boolean instance attributes, ``edit``, ``trapsig``, ``soft_tab``, ``lit_echo``, ``remote``, ``local``. """ return self._linemode
[docs] def send_iac(self, buf): """ Send a command starting with IAC (base 10 byte value 255). No transformations of bytes are performed. Normally, if the byte value 255 is sent, it is escaped as ``IAC + IAC``. This method ensures it is not escaped,. """ assert isinstance(buf, (bytes, bytearray)), buf assert buf and buf.startswith(IAC), buf self._transport.write(buf)
[docs] def iac(self, cmd, opt=b""): """ Send Is-A-Command 3-byte negotiation command. Returns True if command was sent. Not all commands are legal in the context of client, server, or pending negotiation state, emitting a relevant debug warning to the log handler if not sent. """ if cmd not in (DO, DONT, WILL, WONT): raise ValueError( "Expected DO, DONT, WILL, WONT, got {0}.".format(name_command(cmd)) ) if cmd == DO and opt not in (TM, LOGOUT): if self.remote_option.enabled(opt): self.log.debug( "skip {} {}; remote_option = True".format( name_command(cmd), name_command(opt) ) ) self.pending_option[cmd + opt] = False return False if cmd in (DO, WILL): if self.pending_option.enabled(cmd + opt): self.log.debug( "skip {} {}; pending_option = True".format( name_command(cmd), name_command(opt) ) ) return False self.pending_option[cmd + opt] = True if cmd == WILL and opt not in (TM,): if self.local_option.enabled(opt): self.log.debug( "skip {} {}; local_option = True".format( name_command(cmd), name_command(opt) ) ) self.pending_option[cmd + opt] = False return False if cmd == DONT and opt not in (LOGOUT,): # IAC-DONT-LOGOUT is not a rejection of the negotiation option if opt in self.remote_option and not self.remote_option.enabled(opt): self.log.debug( "skip {} {}; remote_option = False".format( name_command(cmd), name_command(opt) ) ) return False self.remote_option[opt] = False if cmd == WONT: self.local_option[opt] = False self.log.debug("send IAC {} {}".format(name_command(cmd), name_command(opt))) self.send_iac(IAC + cmd + opt) return True
# Public methods for transmission signaling #
[docs] def send_ga(self): """ Transmit IAC GA (Go-Ahead). Returns True if sent. If IAC-DO-SGA has been received, then False is returned and IAC-GA is not transmitted. """ if self.local_option.enabled(SGA): self.log.debug("cannot send GA with receipt of DO SGA") return False self.log.debug("send IAC GA") self.send_iac(IAC + GA) return True
[docs] def send_eor(self): """ Transmit IAC CMD_EOR (End-of-Record), :rfc:`885`. Returns True if sent. If IAC-DO-EOR has not been received, False is returned and IAC-CMD_EOR is not transmitted. """ if not self.local_option.enabled(EOR): self.log.debug("cannot send CMD_EOR without receipt of DO EOR") return False self.log.debug("send IAC CMD_EOR") self.send_iac(IAC + CMD_EOR) return True
# Public methods for notifying about, or soliciting state options. #
[docs] def request_status(self): """ Send ``IAC-SB-STATUS-SEND`` sub-negotiation (:rfc:`859`). This method may only be called after ``IAC-WILL-STATUS`` has been received. Returns True if status request was sent. """ if not self.remote_option.enabled(STATUS): self.log.debug( "cannot send SB STATUS SEND " "without receipt of WILL STATUS" ) elif not self.pending_option.enabled(SB + STATUS): response = [IAC, SB, STATUS, SEND, IAC, SE] self.log.debug("send IAC SB STATUS SEND IAC SE") self.send_iac(b"".join(response)) self.pending_option[SB + STATUS] = True return True else: self.log.info("cannot send SB STATUS SEND, request pending.") return False
[docs] def request_tspeed(self): """ Send IAC-SB-TSPEED-SEND sub-negotiation, :rfc:`1079`. This method may only be called after ``IAC-WILL-TSPEED`` has been received. Returns True if TSPEED request was sent. """ if not self.remote_option.enabled(TSPEED): self.log.debug( "cannot send SB TSPEED SEND " "without receipt of WILL TSPEED" ) elif not self.pending_option.enabled(SB + TSPEED): self.pending_option[SB + TSPEED] = True response = [IAC, SB, TSPEED, SEND, IAC, SE] self.log.debug("send IAC SB TSPEED SEND IAC SE") self.send_iac(b"".join(response)) self.pending_option[SB + TSPEED] = True return True else: self.log.debug("cannot send SB TSPEED SEND, request pending.") return False
[docs] def request_charset(self): """ Request sub-negotiation CHARSET, :rfc:`2066`. Returns True if request is valid for telnet state, and was sent. The sender requests that all text sent to and by it be encoded in one of character sets specified by string list ``codepages``, which is determined by function value returned by callback registered using :meth:`set_ext_send_callback` with value ``CHARSET``. """ if not self.remote_option.enabled(CHARSET): self.log.debug( "cannot send SB CHARSET REQUEST " "without receipt of WILL CHARSET" ) return False if self.pending_option.enabled(SB + CHARSET): self.log.debug("cannot send SB CHARSET REQUEST, request pending.") return False codepages = self._ext_send_callback[CHARSET]() sep = " " response = collections.deque() response.extend([IAC, SB, CHARSET, REQUEST]) response.extend([bytes(sep, "ascii")]) response.extend([bytes(sep.join(codepages), "ascii")]) response.extend([IAC, SE]) self.log.debug( "send IAC SB CHARSET REQUEST {} IAC SE".format(sep.join(codepages)) ) self.send_iac(b"".join(response)) self.pending_option[SB + CHARSET] = True return True
[docs] def request_environ(self): """ Request sub-negotiation NEW_ENVIRON, :rfc:`1572`. Returns True if request is valid for telnet state, and was sent. """ assert self.server, "SB NEW_ENVIRON SEND may only be sent by server" if not self.remote_option.enabled(NEW_ENVIRON): self.log.debug( "cannot send SB NEW_ENVIRON SEND IS " "without receipt of WILL NEW_ENVIRON" ) return False request_list = self._ext_send_callback[NEW_ENVIRON]() if not request_list: self.log.debug( "request_environ: server protocol makes no demand, " "no request will be made." ) return False if self.pending_option.enabled(SB + NEW_ENVIRON): self.log.debug("cannot send SB NEW_ENVIRON SEND IS, " "request pending.") return False response = collections.deque() response.extend([IAC, SB, NEW_ENVIRON, SEND]) for env_key in request_list: if env_key in (VAR, USERVAR): # VAR followed by IAC,SE indicates "send all the variables", # whereas USERVAR indicates "send all the user variables". # In today's era, there is little distinction between them. response.append(env_key) else: response.extend([VAR]) response.extend([_escape_environ(env_key.encode("ascii"))]) response.extend([IAC, SE]) self.log.debug("request_environ: {!r}".format(b"".join(response))) self.pending_option[SB + NEW_ENVIRON] = True self.send_iac(b"".join(response)) return True
[docs] def request_xdisploc(self): """ Send XDISPLOC, SEND sub-negotiation, :rfc:`1086`. Returns True if request is valid for telnet state, and was sent. """ assert self.server, "SB XDISPLOC SEND may only be sent by server end" if not self.remote_option.enabled(XDISPLOC): self.log.debug( "cannot send SB XDISPLOC SEND" "without receipt of WILL XDISPLOC" ) if not self.pending_option.enabled(SB + XDISPLOC): response = [IAC, SB, XDISPLOC, SEND, IAC, SE] self.log.debug("send IAC SB XDISPLOC SEND IAC SE") self.pending_option[SB + XDISPLOC] = True self.send_iac(b"".join(response)) return True self.log.debug("cannot send SB XDISPLOC SEND, request pending.") return False
[docs] def request_ttype(self): """ Send TTYPE SEND sub-negotiation, :rfc:`930`. Returns True if request is valid for telnet state, and was sent. """ assert self.server, "SB TTYPE SEND may only be sent by server end" if not self.remote_option.enabled(TTYPE): self.log.debug("cannot send SB TTYPE SEND" "without receipt of WILL TTYPE") if not self.pending_option.enabled(SB + TTYPE): response = [IAC, SB, TTYPE, SEND, IAC, SE] self.log.debug("send IAC SB TTYPE SEND IAC SE") self.pending_option[SB + TTYPE] = True self.send_iac(b"".join(response)) return True else: self.log.debug("cannot send SB TTYPE SEND, request pending.") return False
[docs] def request_forwardmask(self, fmask=None): """ Request the client forward their terminal control characters. Characters are indicated in the :class:`~.Forwardmask` instance ``fmask``. When fmask is None, a forwardmask is generated for the SLC characters registered by :attr:`~.slctab`. """ assert self.server, "DO FORWARDMASK may only be sent by server end" if not self.remote_option.enabled(LINEMODE): self.log.debug( "cannot send SB LINEMODE DO" "without receipt of WILL LINEMODE" ) else: if fmask is None: opt = SB + LINEMODE + slc.LMODE_FORWARDMASK forwardmask_enabled = ( self.server and self.local_option.get(opt, False) ) or self.remote_option.get(opt, False) fmask = slc.generate_forwardmask( binary_mode=self.local_option.enabled(BINARY), tabset=self.slctab, ack=forwardmask_enabled, ) assert isinstance(fmask, slc.Forwardmask), fmask self.log.debug("send IAC SB LINEMODE DO LMODE_FORWARDMASK::") for maskbit_descr in fmask.description_table(): self.log.debug(" {}".format(maskbit_descr)) self.log.debug("send IAC SE") self.send_iac(IAC + SB + LINEMODE + DO + slc.LMODE_FORWARDMASK) self._transport.write(fmask.value) self.send_iac(IAC + SE) return True return False
[docs] def send_lineflow_mode(self): """Send LFLOW mode sub-negotiation, :rfc:`1372`. Returns True if request is valid for telnet state, and was sent. """ if self.client: self.log.error("only server may send IAC SB LINEFLOW <MODE>") elif not self.remote_option.enabled(LFLOW): self.log.error("cannot send IAC SB LFLOW " "without receipt of WILL LFLOW") else: if self.xon_any: (mode, desc) = (LFLOW_RESTART_ANY, "LFLOW_RESTART_ANY") else: (mode, desc) = (LFLOW_RESTART_XON, "LFLOW_RESTART_XON") self.log.debug("send IAC SB LFLOW {} IAC SE".format(desc)) self.send_iac(b"".join([IAC, SB, LFLOW, mode, IAC, SE])) return True return False
[docs] def send_linemode(self, linemode=None): """ Set and Inform other end to agree to change to linemode, ``linemode``. An instance of the Linemode class, or self.linemode when unset. """ if not ( self.local_option.enabled(LINEMODE) or self.remote_option.enabled(LINEMODE) ): assert False, ( "Cannot send LINEMODE-MODE without first " "(DO, WILL) LINEMODE received." ) if linemode is not None: self.log.debug("set Linemode {0!r}".format(linemode)) self._linemode = linemode self.log.debug( "send IAC SB LINEMODE LINEMODE-MODE {0!r} IAC SE".format(self._linemode) ) self.send_iac(IAC + SB + LINEMODE + slc.LMODE_MODE) self._transport.write(self._linemode.mask) self.send_iac(IAC + SE)
# Public is-a-command (IAC) callbacks #
[docs] def set_iac_callback(self, cmd, func): """ Register callable ``func`` as callback for IAC ``cmd``. BRK, IP, AO, AYT, EC, EL, CMD_EOR, EOF, SUSP, ABORT, and NOP. These callbacks receive a single argument, the IAC ``cmd`` which triggered it. """ assert callable(func), "Argument func must be callable" assert cmd in ( BRK, IP, AO, AYT, EC, EL, CMD_EOR, EOF, SUSP, ABORT, NOP, DM, GA, TM, ), name_command(cmd) self._iac_callback[cmd] = func
[docs] def handle_nop(self, cmd): """Handle IAC No-Operation (NOP).""" self.log.debug("IAC NOP: Null Operation (unhandled).")
[docs] def handle_ga(self, cmd): """Handle IAC Go-Ahead (GA).""" self.log.debug("IAC GA: Go-Ahead (unhandled).")
[docs] def handle_dm(self, cmd): """Handle IAC Data-Mark (DM).""" self.log.debug("IAC DM: Data-Mark (unhandled).")
# Public mixed-mode SLC and IAC callbacks #
[docs] def handle_el(self, byte): """ Handle IAC Erase Line (EL, SLC_EL). Provides a function which discards all the data ready on current line of input. The prompt should be re-displayed. """ self.log.debug("IAC EL: Erase Line (unhandled).")
[docs] def handle_eor(self, byte): """Handle IAC End of Record (CMD_EOR, SLC_EOR).""" self.log.debug("IAC EOR: End of Record (unhandled).")
[docs] def handle_abort(self, byte): """ Handle IAC Abort (ABORT, SLC_ABORT). Similar to Interrupt Process (IP), but means only to abort or terminate the process to which the NVT is connected. """ self.log.debug("IAC ABORT: Abort (unhandled).")
[docs] def handle_eof(self, byte): """Handle IAC End of Record (EOF, SLC_EOF).""" self.log.debug("IAC EOF: End of File (unhandled).")
[docs] def handle_susp(self, byte): """ Handle IAC Suspend Process (SUSP, SLC_SUSP). Suspends the execution of the current process attached to the NVT in such a way that another process will take over control of the NVT, and the suspended process can be resumed at a later time. If the receiving system does not support this functionality, it should be ignored. """ self.log.debug("IAC SUSP: Suspend (unhandled).")
[docs] def handle_brk(self, byte): """ Handle IAC Break (BRK, SLC_BRK). Sent by clients to indicate BREAK keypress. This is not the same as IP (^c), but a means to map sysystem-dependent break key such as found on an IBM Systems. """ self.log.debug("IAC BRK: Break (unhandled).")
[docs] def handle_ayt(self, byte): """ Handle IAC Are You There (AYT, SLC_AYT). Provides the user with some visible (e.g., printable) evidence that the system is still up and running. """ self.log.debug("IAC AYT: Are You There? (unhandled).")
[docs] def handle_ip(self, byte): """Handle IAC Interrupt Process (IP, SLC_IP).""" self.log.debug("IAC IP: Interrupt Process (unhandled).")
[docs] def handle_ao(self, byte): """ Handle IAC Abort Output (AO) or SLC_AO. Discards any remaining output on the transport buffer. [...] a reasonable implementation would be to suppress the remainder of the text string, but transmit the prompt character and the preceding <CR><LF>. """ self.log.debug("IAC AO: Abort Output, unhandled.")
[docs] def handle_ec(self, byte): """ Handle IAC Erase Character (EC, SLC_EC). Provides a function which deletes the last preceding undeleted character from data ready on current line of input. """ self.log.debug("IAC EC: Erase Character (unhandled).")
[docs] def handle_tm(self, cmd): """ Handle IAC (WILL, WONT, DO, DONT) Timing Mark (TM). TM is essentially a NOP that any IAC interpreter must answer, if at least it answers WONT to unknown options (required), it may still be used as a means to accurately measure the "ping" time. """ self.log.debug( "IAC TM: Received {} TM (Timing Mark).".format(name_command(cmd)) )
# public Special Line Mode (SLC) callbacks #
[docs] def set_slc_callback(self, slc_byte, func): """ Register ``func`` as callable for receipt of ``slc_byte``. :param bytes slc_byte: any of SLC_SYNCH, SLC_BRK, SLC_IP, SLC_AO, SLC_AYT, SLC_EOR, SLC_ABORT, SLC_EOF, SLC_SUSP, SLC_EC, SLC_EL, SLC_EW, SLC_RP, SLC_XON, SLC_XOFF ... :param Callable func: These callbacks receive a single argument: the SLC function byte that fired it. Some SLC and IAC functions are intermixed; which signaling mechanism used by client can be tested by evaluating this argument. """ assert callable(func), "Argument func must be callable" assert ( type(slc_byte) == bytes and 0 < ord(slc_byte) < slc.NSLC + 1 ), "Uknown SLC byte: {!r}".format(slc_byte) self._slc_callback[slc_byte] = func
[docs] def handle_ew(self, slc): """ Handle SLC_EW (Erase Word). Provides a function which deletes the last preceding undeleted character, and any subsequent bytes until next whitespace character from data ready on current line of input. """ self.log.debug("SLC EC: Erase Word (unhandled).")
[docs] def handle_rp(self, slc): """Handle SLC Repaint (RP).""" self.log.debug("SLC RP: Repaint (unhandled).")
[docs] def handle_lnext(self, slc): """Handle SLC Literal Next (LNEXT) (Next character is received raw).""" self.log.debug("SLC LNEXT: Literal Next (unhandled)")
[docs] def handle_xon(self, byte): """Handle SLC Transmit-On (XON).""" self.log.debug("SLC XON: Transmit On (unhandled).")
[docs] def handle_xoff(self, byte): """Handle SLC Transmit-Off (XOFF).""" self.log.debug("SLC XOFF: Transmit Off.")
# public Telnet extension callbacks #
[docs] def set_ext_send_callback(self, cmd, func): """ Register callback for inquires of sub-negotiation of ``cmd``. :param Callable func: A callable function for the given ``cmd`` byte. Note that the return type must match those documented. :param bytes cmd: These callbacks must return any number of arguments, for each registered ``cmd`` byte, respectively: * SNDLOC: for clients, returning one argument: the string describing client location, such as ``b'ROOM 641-A'``, :rfc:`779`. * NAWS: for clients, returning two integer arguments (width, height), such as (80, 24), :rfc:`1073`. * TSPEED: for clients, returning two integer arguments (rx, tx) such as (57600, 57600), :rfc:`1079`. * TTYPE: for clients, returning one string, usually the terminfo(5) database capability name, such as 'xterm', :rfc:`1091`. * XDISPLOC: for clients, returning one string, the DISPLAY host value, in form of <host>:<dispnum>[.<screennum>], :rfc:`1096`. * NEW_ENVIRON: for clients, returning a dictionary of (key, val) pairs of environment item values, :rfc:`1408`. * CHARSET: for clients, receiving iterable of strings of character sets requested by server, callback must return one of those strings given, :rfc:`2066`. """ assert cmd in (SNDLOC, NAWS, TSPEED, TTYPE, XDISPLOC, NEW_ENVIRON, CHARSET), cmd assert callable(func), "Argument func must be callable" self._ext_send_callback[cmd] = func
[docs] def set_ext_callback(self, cmd, func): """ Register ``func`` as callback for receipt of ``cmd`` negotiation. :param bytes cmd: One of the following listed bytes: * ``LOGOUT``: for servers and clients, receiving one argument. Server end may receive DO or DONT as argument ``cmd``, indicating client's wish to disconnect, or a response to WILL, LOGOUT, indicating it's wish not to be automatically disconnected. Client end may receive WILL or WONT, indicating server's wish to disconnect, or acknowledgment that the client will not be disconnected. * ``SNDLOC``: for servers, receiving one argument: the string describing the client location, such as ``'ROOM 641-A'``, :rfc:`779`. * ``NAWS``: for servers, receiving two integer arguments (width, height), such as (80, 24), :rfc:`1073`. * ``TSPEED``: for servers, receiving two integer arguments (rx, tx) such as (57600, 57600), :rfc:`1079`. * ``TTYPE``: for servers, receiving one string, usually the terminfo(5) database capability name, such as 'xterm', :rfc:`1091`. * ``XDISPLOC``: for servers, receiving one string, the DISPLAY host value, in form of ``<host>:<dispnum>[.<screennum>]``, :rfc:`1096`. * ``NEW_ENVIRON``: for servers, receiving a dictionary of ``(key, val)`` pairs of remote client environment item values, :rfc:`1408`. * ``CHARSET``: for servers, receiving one string, the character set negotiated by client. :rfc:`2066`. """ assert cmd in ( LOGOUT, SNDLOC, NAWS, TSPEED, TTYPE, XDISPLOC, NEW_ENVIRON, CHARSET, ), cmd assert callable(func), "Argument func must be callable" self._ext_callback[cmd] = func
[docs] def handle_xdisploc(self, xdisploc): """Receive XDISPLAY value ``xdisploc``, :rfc:`1096`.""" # xdisploc string format is '<host>:<dispnum>[.<screennum>]'. self.log.debug("X Display is {}".format(xdisploc))
[docs] def handle_send_xdisploc(self): """Send XDISPLAY value ``xdisploc``, :rfc:`1096`.""" # xdisploc string format is '<host>:<dispnum>[.<screennum>]'. self.log.warning("X Display requested, sending empty string.") return ""
[docs] def handle_sndloc(self, location): """Receive LOCATION value ``location``, :rfc:`779`.""" self.log.debug("Location is {}".format(location))
[docs] def handle_send_sndloc(self): """Send LOCATION value ``location``, :rfc:`779`.""" self.log.warning("Location requested, sending empty response.") return ""
[docs] def handle_ttype(self, ttype): """ Receive TTYPE value ``ttype``, :rfc:`1091`. A string value that represents client's emulation capability. Some example values: VT220, VT100, ANSITERM, ANSI, TTY, and 5250. """ self.log.debug("Terminal type is {!r}".format(ttype))
[docs] def handle_send_ttype(self): """Send TTYPE value ``ttype``, :rfc:`1091`.""" self.log.warning("Terminal type requested, sending empty string.") return ""
[docs] def handle_naws(self, width, height): """Receive window size ``width`` and ``height``, :rfc:`1073`.""" self.log.debug("Terminal cols={}, rows={}".format(width, height))
[docs] def handle_send_naws(self): """Send window size ``width`` and ``height``, :rfc:`1073`.""" self.log.warning("Terminal size requested, sending 80x24.") return 80, 24
[docs] def handle_environ(self, env): """Receive environment variables as dict, :rfc:`1572`.""" self.log.debug("Environment values are {!r}".format(env))
[docs] def handle_send_client_environ(self, keys): """ Send environment variables as dict, :rfc:`1572`. If argument ``keys`` is empty, then all available values should be sent. Otherwise, ``keys`` is a set of environment keys explicitly requested. """ self.log.debug("Environment values requested, sending {{}}.") return dict()
[docs] def handle_send_server_environ(self): """Server requests environment variables as list, :rfc:`1572`.""" self.log.debug("Environment values offered, requesting [].") return []
[docs] def handle_tspeed(self, rx, tx): """Receive terminal speed from TSPEED as int, :rfc:`1079`.""" self.log.debug("Terminal Speed rx:{}, tx:{}".format(rx, tx))
[docs] def handle_send_tspeed(self): """Send terminal speed from TSPEED as int, :rfc:`1079`.""" self.log.debug("Terminal Speed requested, sending 9600,9600.") return 9600, 9600
[docs] def handle_charset(self, charset): """Receive character set as string, :rfc:`2066`.""" self.log.debug("Character set: {}".format(charset))
[docs] def handle_send_client_charset(self, charsets): """ Send character set selection as string, :rfc:`2066`. Given the available encodings presented by the server, select and return only one. Returning an empty string indicates that no selection is made (request is ignored). """ assert not self.server self.log.debug("Character Set requested") return ""
[docs] def handle_send_server_charset(self, charsets): """Send character set (encodings) offered to client, :rfc:`2066`.""" assert self.server return ["UTF-8"]
[docs] def handle_logout(self, cmd): """ Handle (IAC, (DO | DONT | WILL | WONT), LOGOUT), :rfc:`727`. Only the server end may receive (DO, DONT). Only the client end may receive (WILL, WONT). """ # Close the transport on receipt of DO, Reply DONT on receipt # of WILL. Nothing is done on receipt of DONT or WONT LOGOFF. if cmd == DO: assert self.server, (cmd, LOGOUT) self.log.debug("client requests DO LOGOUT") self._transport.close() elif cmd == DONT: assert self.server, (cmd, LOGOUT) self.log.debug("client requests DONT LOGOUT") elif cmd == WILL: assert self.client, (cmd, LOGOUT) self.log.debug("recv WILL TIMEOUT (timeout warning)") self.log.debug("send IAC DONT LOGOUT") self.iac(DONT, LOGOUT) elif cmd == WONT: assert self.client, (cmd, LOGOUT) self.log.debug("recv IAC WONT LOGOUT (server refuses logout")
# public derivable methods DO, DONT, WILL, and WONT negotiation #
[docs] def handle_do(self, opt): """ Process byte 3 of series (IAC, DO, opt) received by remote end. This method can be derived to change or extend protocol capabilities, for most cases, simply returning True if supported, False otherwise. In special cases of various RFC statutes, state is stored and answered in willing affirmative, with the exception of: - DO TM is *always* answered WILL TM, even if it was already replied to. No state is stored ("Timing Mark"), and the IAC callback registered by :meth:`set_ext_callback` for cmd TM is called with argument byte ``DO``. - DO LOGOUT executes extended callback registered by cmd LOGOUT with argument DO (indicating a request for voluntary logoff). - DO STATUS sends state of all local, remote, and pending options. """ # For unsupported capabilities, RFC specifies a response of # (IAC, WONT, opt). Similarly, set ``self.local_option[opt]`` # to ``False``. # # This method returns True if the opt enables the willingness of the # remote end to accept a telnet capability, such as NAWS. It returns # False for unsupported option, or an option invalid in that context, # such as LOGOUT. self.log.debug("handle_do({})".format(name_command(opt))) if opt == ECHO and self.client: # What do we have here? A Telnet Server attempting to # fingerprint us as a broken 4.4BSD Telnet Client, which # would respond 'WILL ECHO'. Let us just reply WONT--some # servers, such as dgamelaunch (nethack.alt.org) freeze up # unless we answer IAC-WONT-ECHO. self.iac(WONT, ECHO) elif self.server and opt in ( LINEMODE, TTYPE, NAWS, NEW_ENVIRON, XDISPLOC, LFLOW, ): raise ValueError( "cannot recv DO {0} on server end (ignored).".format(name_command(opt)) ) elif self.client and opt in (LOGOUT,): raise ValueError( "cannot recv DO {0} on client end (ignored).".format(name_command(opt)) ) elif opt == TM: # timing mark is special: simply by replying, the effect # is accomplished ('will' or 'wont' is non-consequential): # the distant end is able to "time" our response. More # importantly, ensure that the IAC interpreter is, in fact, # interpreting, and, that all IAC commands up to this point # have been processed. self.iac(WILL, TM) self._iac_callback[TM](DO) elif opt == LOGOUT: self._ext_callback[LOGOUT](DO) elif opt in ( ECHO, LINEMODE, BINARY, SGA, LFLOW, EOR, TTYPE, NEW_ENVIRON, XDISPLOC, TSPEED, CHARSET, NAWS, STATUS, GMCP, ): # first time we've agreed, respond accordingly. if not self.local_option.enabled(opt): self.iac(WILL, opt) # and respond with status for some, if opt == NAWS: self._send_naws() elif opt == STATUS: self._send_status() # and expect a follow-up sub-negotiation for these others. elif opt in ( LFLOW, TTYPE, NEW_ENVIRON, XDISPLOC, TSPEED, CHARSET, LINEMODE, ): self.pending_option[SB + opt] = True else: self.log.debug("DO {0} not supported.".format(name_command(opt))) if self.local_option.get(opt, None) is None: self.iac(WONT, opt) return False return True
[docs] def handle_dont(self, opt): """ Process byte 3 of series (IAC, DONT, opt) received by remote end. This only results in ``self.local_option[opt]`` set to ``False``, with the exception of (IAC, DONT, LOGOUT), which only signals a callback to ``handle_logout(DONT)``. """ self.log.debug("handle_dont({})".format(name_command(opt))) if opt == LOGOUT: assert self.server, "cannot recv DONT LOGOUT on server end" self._ext_callback[LOGOUT](DONT)
# many implementations (wrongly!) sent a WONT in reply to DONT. It # sounds reasonable, but it can and will cause telnet loops. (ruby?) # Correctly, a DONT can not be declined, so there is no need to # affirm in the negative.
[docs] def handle_will(self, opt): """ Process byte 3 of series (IAC, DONT, opt) received by remote end. The remote end requests we perform any number of capabilities. Most implementations require an answer in the affirmative with DO, unless DO has meaning specific for only client or server end, and dissenting with DONT. WILL ECHO may only be received *for clients*, answered with DO. WILL NAWS may only be received *for servers*, answered with DO. BINARY and SGA are answered with DO. STATUS, NEW_ENVIRON, XDISPLOC, and TTYPE is answered with sub-negotiation SEND. The env variables requested in response to WILL NEW_ENVIRON is "SEND ANY". All others are replied with DONT. The result of a supported capability is a response of (IAC, DO, opt) and the setting of ``self.remote_option[opt]`` of ``True``. For unsupported capabilities, RFC specifies a response of (IAC, DONT, opt). Similarly, set ``self.remote_option[opt]`` to ``False``. """ self.log.debug("handle_will({})".format(name_command(opt))) if opt in (BINARY, SGA, ECHO, NAWS, LINEMODE, EOR, SNDLOC): if opt == ECHO and self.server: raise ValueError("cannot recv WILL ECHO on server end") elif opt in (NAWS, LINEMODE, SNDLOC) and self.client: raise ValueError( "cannot recv WILL {} on client end".format( name_command(opt), ) ) if not self.remote_option.enabled(opt): self.iac(DO, opt) self.remote_option[opt] = True if opt in (NAWS, LINEMODE, SNDLOC): # expect to receive some sort of follow-up subnegotiation self.pending_option[SB + opt] = True if opt == LINEMODE: # server sets the initial mode and sends forwardmask, self.send_linemode(self.default_linemode) elif opt == TM: if opt == TM and not self.pending_option.enabled(DO + TM): raise ValueError("cannot recv WILL TM, must first send DO TM.") self._iac_callback[TM](WILL) self.remote_option[opt] = True elif opt == LOGOUT: if self.client: raise ValueError("cannot recv WILL LOGOUT on server end") self._ext_callback[LOGOUT](WILL) elif opt == STATUS: # Though unnecessary, if the other end claims support for STATUS, # we put them to the test by requesting their status. self.remote_option[opt] = True self.request_status() elif opt in (XDISPLOC, TTYPE, TSPEED, NEW_ENVIRON, LFLOW, CHARSET): # CHARSET is bi-directional: "WILL CHARSET indicates the sender # REQUESTS permission to, or AGREES to, use CHARSET option # sub-negotiation to choose a character set."; however, the # selected encoding is, regarding SB CHARSET REQUEST, "The sender # requests that all text sent to and by it be encoded in one of the # specified character sets. " # # Though Others -- XDISPLOC, TTYPE, TSPEED, are 1-directional. if not self.server and opt not in (CHARSET,): raise ValueError( "cannot recv WILL {} on client end.".format(name_command(opt)) ) self.remote_option[opt] = True # call one of the following callbacks. { XDISPLOC: self.request_xdisploc, TTYPE: self.request_ttype, TSPEED: self.request_tspeed, CHARSET: self.request_charset, NEW_ENVIRON: self.request_environ, LFLOW: self.send_lineflow_mode, }[opt]() else: # option value of -1 toggles opt.unsupported() self.iac(DONT, opt) self.remote_option[opt] = -1 self.log.warning( "Unhandled: WILL {}.".format( name_command(opt), ) ) self.local_option[opt] = -1 if self.pending_option.enabled(DO + opt): self.pending_option[DO + opt] = False
[docs] def handle_wont(self, opt): """ Process byte 3 of series (IAC, WONT, opt) received by remote end. (IAC, WONT, opt) is a negative acknowledgment of (IAC, DO, opt) sent. The remote end requests we do not perform a telnet capability. It is not possible to decline a WONT. ``T.remote_option[opt]`` is set False to indicate the remote end's refusal to perform ``opt``. """ self.log.debug("handle_wont({})".format(name_command(opt))) if opt == TM and not self.pending_option.enabled(DO + TM): raise ValueError("WONT TM received but DO TM was not sent") elif opt == TM: self.log.debug("WONT TIMING-MARK") self.remote_option[opt] = False elif opt == LOGOUT: assert not (self.server), "cannot recv WONT LOGOUT on server end" if not self.pending_option.enabled(DO + LOGOUT): self.log.warning("Server sent WONT LOGOUT unsolicited") self._ext_callback[LOGOUT](WONT) else: self.remote_option[opt] = False
# public derivable Sub-Negotation parsing #
[docs] def handle_subnegotiation(self, buf): """ Callback for end of sub-negotiation buffer. SB options handled here are TTYPE, XDISPLOC, NEW_ENVIRON, NAWS, and STATUS, and are delegated to their ``handle_`` equivalent methods. Implementors of additional SB options should extend this method. """ if not buf: raise ValueError("SE: buffer empty") if buf[0] == theNULL: raise ValueError("SE: buffer is NUL") if len(buf) == 1: raise ValueError("SE: buffer too short: {!r}".format(buf)) cmd = buf[0] if self.pending_option.enabled(SB + cmd): self.pending_option[SB + cmd] = False else: self.log.debug("[SB + {}] unsolicited".format(name_command(cmd))) fn_call = { LINEMODE: self._handle_sb_linemode, LFLOW: self._handle_sb_lflow, NAWS: self._handle_sb_naws, SNDLOC: self._handle_sb_sndloc, NEW_ENVIRON: self._handle_sb_environ, CHARSET: self._handle_sb_charset, TTYPE: self._handle_sb_ttype, TSPEED: self._handle_sb_tspeed, XDISPLOC: self._handle_sb_xdisploc, STATUS: self._handle_sb_status, COM_PORT_OPTION: self._handle_sb_comport, GMCP: self._handle_sb_gmcp, }.get(cmd) if fn_call is None: raise ValueError( "SB unhandled: cmd={}, buf={!r}".format(name_command(cmd), buf) ) fn_call(buf)
# Our Private API methods @staticmethod def _escape_iac(buf): r"""Replace bytes in buf ``IAC`` (``b'\xff'``) by ``IAC IAC``.""" return buf.replace(IAC, IAC + IAC) def _write(self, buf, escape_iac=True): """ Write bytes to transport, conditionally escaping IAC. :param bytes buf: bytes to write to transport. :param bool escape_iac: whether bytes in buffer ``buf`` should be escaped of byte ``IAC``. This should be set ``False`` for direct writes of ``IAC`` commands. """ if not isinstance(buf, (bytes, bytearray)): raise TypeError("buf expected bytes, got {0}".format(type(buf))) if escape_iac: # when escape_iac is True, we may safely assume downstream # application has provided an encoded string. Prior to 2.0.1, `buf` # was inspected to raise TypeError for any bytes of ordinal value # greater than 127, but it was removed for performance. buf = self._escape_iac(buf) self._transport.write(buf) # Private sub-negotiation (SB) routines def _handle_sb_charset(self, buf): cmd = buf.popleft() assert cmd == CHARSET opt = buf.popleft() if opt == REQUEST: # "<Sep> is a separator octet, the value of which is chosen by the # sender. Examples include a space or a semicolon." sep = buf.popleft() # decode any offered character sets (b'CHAR-SET') # to a python-normalized unicode string ('charset'). offers = [charset.decode("ascii") for charset in b"".join(buf).split(sep)] selected = self._ext_send_callback[CHARSET](offers) if selected is None: self.log.debug("send IAC SB CHARSET REJECTED IAC SE") self.send_iac(IAC + SB + CHARSET + REJECTED + IAC + SE) else: response = collections.deque() response.extend([IAC, SB, CHARSET, ACCEPTED]) response.extend([bytes(selected, "ascii")]) response.extend([IAC, SE]) self.log.debug( "send IAC SB CHARSET ACCEPTED {} IAC SE".format(selected) ) self.send_iac(b"".join(response)) elif opt == ACCEPTED: charset = b"".join(buf).decode("ascii") self.log.debug("recv IAC SB CHARSET ACCEPTED {} IAC SE".format(charset)) self._ext_callback[CHARSET](charset) elif opt == REJECTED: self.log.warning("recv IAC SB CHARSET REJECTED IAC SE") elif opt in (TTABLE_IS, TTABLE_ACK, TTABLE_NAK, TTABLE_REJECTED): raise NotImplementedError( "Translation table command received " "but not supported: {!r}".format(opt) ) else: raise ValueError("Illegal option follows IAC SB CHARSET: {!r}.".format(opt)) def _handle_sb_tspeed(self, buf): """Callback handles IAC-SB-TSPEED-<buf>-SE.""" cmd = buf.popleft() opt = buf.popleft() assert cmd == TSPEED, (cmd, name_command(cmd)) assert opt in (IS, SEND), opt opt_kind = {IS: "IS", SEND: "SEND"}.get(opt) self.log.debug( "recv {} {}: {!r}".format( name_command(cmd), opt_kind, b"".join(buf), ) ) if opt == IS: assert self.server, "SE: cannot recv from server: {} {}".format( name_command(cmd), opt_kind, ) rx, tx = str(), str() while len(buf): value = buf.popleft() if value == b",": break rx += value.decode("ascii") while len(buf): value = buf.popleft() if value == b",": break tx += value.decode("ascii") self.log.debug("sb_tspeed: {}, {}".format(rx, tx)) try: rx, tx = int(rx), int(tx) except ValueError as err: self.log.error( "illegal TSPEED values received " "(rx={!r}, tx={!r}: {}", rx, tx, err, ) return self._ext_callback[TSPEED](rx, tx) elif opt == SEND: assert self.client, "SE: cannot recv from client: {} {}".format( name_command(cmd), opt_kind, ) (rx, tx) = self._ext_send_callback[TSPEED]() assert (type(rx), type(tx),) == ( int, int, ), (rx, tx) brx = "{}".format(rx).encode("ascii") btx = "{}".format(tx).encode("ascii") response = [IAC, SB, TSPEED, IS, brx, b",", btx, IAC, SE] self.log.debug("send: IAC SB TSPEED IS {0!r},{1!r} IAC SE".format(brx, btx)) self.send_iac(b"".join(response)) if self.pending_option.enabled(WILL + TSPEED): self.pending_option[WILL + TSPEED] = False def _handle_sb_xdisploc(self, buf): """Callback handles IAC-SB-XIDISPLOC-<buf>-SE.""" cmd = buf.popleft() opt = buf.popleft() assert cmd == XDISPLOC, (cmd, name_command(cmd)) assert opt in (IS, SEND), opt opt_kind = {IS: "IS", SEND: "SEND"}.get(opt) self.log.debug( "recv {} {}: {!r}".format( name_command(cmd), opt_kind, b"".join(buf), ) ) if opt == IS: assert self.server, "SE: cannot recv from server: {} {}".format( name_command(cmd), opt, ) xdisploc_str = b"".join(buf).decode("ascii") self.log.debug("recv IAC SB XDISPLOC IS {0!r} IAC SE".format(xdisploc_str)) self._ext_callback[XDISPLOC](xdisploc_str) elif opt == SEND: assert self.client, "SE: cannot recv from client: {} {}".format( name_command(cmd), opt, ) xdisploc_str = self._ext_send_callback[XDISPLOC]().encode("ascii") response = [IAC, SB, XDISPLOC, IS, xdisploc_str, IAC, SE] self.log.debug("send IAC SB XDISPLOC IS {0!r} IAC SE".format(xdisploc_str)) self.send_iac(b"".join(response)) if self.pending_option.enabled(WILL + XDISPLOC): self.pending_option[WILL + XDISPLOC] = False def _handle_sb_ttype(self, buf): """Callback handles IAC-SB-TTYPE-<buf>-SE.""" cmd = buf.popleft() opt = buf.popleft() assert cmd == TTYPE, name_command(cmd) assert opt in (IS, SEND), opt opt_kind = {IS: "IS", SEND: "SEND"}.get(opt) self.log.debug( "recv {} {}: {!r}".format( name_command(cmd), opt_kind, b"".join(buf), ) ) if opt == IS: assert self.server, "SE: cannot recv from server: {} {}".format( name_command(cmd), opt, ) ttype_str = b"".join(buf).decode("ascii") self.log.debug("recv IAC SB TTYPE IS {0!r}".format(ttype_str)) self._ext_callback[TTYPE](ttype_str) elif opt == SEND: assert self.client, "SE: cannot recv from client: {} {}".format( name_command(cmd), opt, ) ttype_str = self._ext_send_callback[TTYPE]().encode("ascii") response = [IAC, SB, TTYPE, IS, ttype_str, IAC, SE] self.log.debug("send IAC SB TTYPE IS {0!r} IAC SE".format(ttype_str)) self.send_iac(b"".join(response)) if self.pending_option.enabled(WILL + TTYPE): self.pending_option[WILL + TTYPE] = False def _handle_sb_environ(self, buf): """ Callback handles (IAC, SB, NEW_ENVIRON, <buf>, SE), :rfc:`1572`. For requests beginning with IS, or subsequent requests beginning with INFO, any callback registered by :meth:`set_ext_callback` of cmd NEW_ENVIRON is passed a dictionary of (key, value) replied-to by client. For requests beginning with SEND, the callback registered by ``set_ext_send_callback`` is provided with a list of keys requested from the server; or None if only VAR and/or USERVAR is requested, indicating to "send them all". """ cmd = buf.popleft() opt = buf.popleft() assert cmd == NEW_ENVIRON, (cmd, name_command(cmd)) assert opt in (IS, SEND, INFO), opt opt_kind = {IS: "IS", INFO: "INFO", SEND: "SEND"}.get(opt) self.log.debug( "recv {} {}: {!r}".format( name_command(cmd), opt_kind, b"".join(buf), ) ) env = _decode_env_buf(b"".join(buf)) if opt in (IS, INFO): assert self.server, "SE: cannot recv from server: {} {}".format( name_command(cmd), opt_kind, ) if opt == IS: if not self.pending_option.enabled(SB + cmd): self.log.debug( "{} {} unsolicited".format(name_command(cmd), opt_kind) ) self.pending_option[SB + cmd] = False elif self.pending_option.get(SB + cmd, None) is False: # a pending option of value of 'False' means it was previously # completed, subsequent environment values *should* have been # sent as command INFO ... self.log.warning( "{} IS already recv; expected INFO.".format(name_command(cmd)) ) if env: self._ext_callback[cmd](env) elif opt == SEND: assert self.client, "SE: cannot recv from client: {} {}".format( name_command(cmd), opt_kind ) # client-side, we do _not_ honor the 'send all VAR' or 'send all # USERVAR' requests -- it is a small bit of a security issue. send_env = _encode_env_buf(self._ext_send_callback[NEW_ENVIRON](env.keys())) response = [IAC, SB, NEW_ENVIRON, IS, send_env, IAC, SE] self.log.debug("env send: {!r}".format(response)) self.send_iac(b"".join(response)) if self.pending_option.enabled(WILL + TTYPE): self.pending_option[WILL + TTYPE] = False def _handle_sb_sndloc(self, buf): """Fire callback for IAC-SB-SNDLOC-<buf>-SE (:rfc:`779`).""" assert buf.popleft() == SNDLOC location_str = b"".join(buf).decode("ascii") self._ext_callback[SNDLOC](location_str) def _send_naws(self): """Fire callback for IAC-DO-NAWS from server.""" # Similar to the callback method order fired by _handle_sb_naws(), # we expect our parameters in order of (rows, cols), matching the # termios.TIOCGWINSZ and terminfo(5) cup capability order. rows, cols = self._ext_send_callback[NAWS]() # NAWS limits columns and rows to a size of 0-65535 (unsigned short). # # >>> struct.unpack('!HH', b'\xff\xff\xff\xff') # (65535, 65535). rows, cols = max(min(65535, rows), 0), max(min(65535, cols), 0) # NAWS is sent in (col, row) order: # # IAC SB NAWS WIDTH[1] WIDTH[0] HEIGHT[1] HEIGHT[0] IAC SE # value = self._escape_iac(struct.pack("!HH", cols, rows)) response = [IAC, SB, NAWS, value, IAC, SE] self.log.debug( "send IAC SB NAWS (rows={0}, cols={1}) IAC SE".format(rows, cols) ) self.send_iac(b"".join(response)) def _handle_sb_naws(self, buf): """Fire callback for IAC-SB-NAWS-<cols_rows[4]>-SE (:rfc:`1073`).""" cmd = buf.popleft() assert cmd == NAWS, name_command(cmd) assert len(buf) == 4, "bad NAWS length {}: {!r}".format(len(buf), buf) assert self.remote_option.enabled( NAWS ), "received IAC SB NAWS without receipt of IAC WILL NAWS" # note a similar formula: # # cols, rows = ((256 * buf[0]) + buf[1], # (256 * buf[2]) + buf[3]) cols, rows = struct.unpack("!HH", b"".join(buf)) self.log.debug( "recv IAC SB NAWS (cols={0}, rows={1}) IAC SE".format(cols, rows) ) # Flip the bytestream order (cols, rows) -> (rows, cols). # # This is for good reason: it matches the termios.TIOCGWINSZ # structure, which also matches the terminfo(5) capability, 'cup'. self._ext_callback[NAWS](rows, cols) def _handle_sb_lflow(self, buf): """Callback responds to IAC SB LFLOW, :rfc:`1372`.""" buf.popleft() # LFLOW if not self.local_option.enabled(LFLOW): raise ValueError( "received IAC SB LFLOW without " "first receiving IAC DO LFLOW." ) opt = buf.popleft() if opt in (LFLOW_OFF, LFLOW_ON): self.lflow = opt is LFLOW_ON self.log.debug( "LFLOW (toggle-flow-control) {}".format("ON" if self.lflow else "OFF") ) elif opt in (LFLOW_RESTART_ANY, LFLOW_RESTART_XON): self.xon_any = opt is LFLOW_RESTART_XON self.log.debug( "LFLOW (toggle-flow-control) {}".format( "RESTART_ANY" if self.xon_any else "RESTART_XON" ) ) else: raise ValueError("Unknown IAC SB LFLOW option received: {!r}".format(buf)) def _handle_sb_status(self, buf): """ Callback responds to IAC SB STATUS, :rfc:`859`. This method simply delegates to either of :meth:`_receive_status` or :meth:`_send_status`. """ buf.popleft() opt = buf.popleft() if opt == SEND: self._send_status() elif opt == IS: self._receive_status(buf) else: raise ValueError( "Illegal byte following IAC SB STATUS: {!r}, " "expected SEND or IS.".format(opt) ) def _receive_status(self, buf): """ Callback responds to IAC SB STATUS IS, :rfc:`859`. :param bytes buf: sub-negotiation byte buffer containing status data. This implementation does its best to analyze our perspective's state to the state options given. Any discrepancies are reported to the error log, but no action is taken. """ for pos in range(len(buf) // 2): cmd = buf.popleft() try: opt = buf.popleft() except IndexError: # a remainder in division step-by-two, presumed nonsense. raise ValueError( "STATUS incomplete at pos {}, cmd: {}".format( pos, name_command(cmd) ) ) matching = False if cmd not in (DO, DONT, WILL, WONT): raise ValueError( "STATUS invalid cmd at pos {}: {}, " "expected DO DONT WILL WONT.".format(pos, cmd) ) if cmd in (DO, DONT): _side = "local" enabled = self.local_option.enabled(opt) matching = (cmd == DO and enabled) or (cmd == DONT and not enabled) else: # (WILL, WONT) _side = "remote" enabled = self.remote_option.enabled(opt) matching = (cmd == WILL and enabled) or (cmd == WONT and not enabled) _mode = "enabled" if enabled else "not enabled" if not matching: self.log.error( "STATUS {cmd} {opt}: disagreed, " "{side} option is {mode}.".format( cmd=name_command(cmd), opt=name_command(opt), side=_side, mode=_mode, ) ) self.log.error( "remote {!r} is {}".format( [ (name_commands(_opt), _val) for _opt, _val in self.remote_option.items() ], self.remote_option.enabled(opt), ) ) self.log.error( " local {!r} is {}".format( [ (name_commands(_opt), _val) for _opt, _val in self.local_option.items() ], self.local_option.enabled(opt), ) ) continue self.log.debug( "STATUS {} {} (agreed).".format(name_command(cmd), name_command(opt)) ) def _send_status(self): """Callback responds to IAC SB STATUS SEND, :rfc:`859`.""" if not ( self.pending_option.enabled(WILL + STATUS) or self.local_option.enabled(STATUS) ): raise ValueError( "Only sender of IAC WILL STATUS " "may reply by IAC SB STATUS IS." ) response = collections.deque() response.extend([IAC, SB, STATUS, IS]) for opt, status in self.local_option.items(): # status is 'WILL' for local option states that are True, # and 'WONT' for options that are False. if opt == STATUS: continue response.extend([WILL if status else WONT, opt]) for opt, status in self.remote_option.items(): # status is 'DO' for remote option states that are True, # or for any DO option requests pending reply. status is # 'DONT' for any remote option states that are False, # or for any DONT option requests pending reply. if opt == STATUS: continue if status or DO + opt in self.pending_option: response.extend([DO, opt]) elif not status or DONT + opt in self.pending_option: response.extend([DONT, opt]) response.extend([IAC, SE]) self.log.debug( "send IAC SB STATUS IS {} IAC SE".format( " ".join([name_command(byte) for byte in list(response)[4:-2]]) ) ) self.send_iac(b"".join(response)) if self.pending_option.enabled(WILL + STATUS): self.pending_option[WILL + STATUS] = False # Special Line Character and other LINEMODE functions. # def _handle_sb_linemode(self, buf): """Callback responds to bytes following IAC SB LINEMODE.""" buf.popleft() opt = buf.popleft() if opt == slc.LMODE_MODE: self._handle_sb_linemode_mode(buf) elif opt == slc.LMODE_SLC: self._handle_sb_linemode_slc(buf) elif opt in (DO, DONT, WILL, WONT): sb_opt = buf.popleft() if sb_opt != slc.LMODE_FORWARDMASK: raise ValueError( "Illegal byte follows IAC SB LINEMODE {}: {!r}, " " expected LMODE_FORWARDMASK.".format(name_command(opt), sb_opt) ) self.log.debug( "recv IAC SB LINEMODE {} LMODE_FORWARDMASK,".format(name_command(opt)) ) self._handle_sb_forwardmask(LINEMODE, buf) else: raise ValueError("Illegal IAC SB LINEMODE option {!r}".format(opt)) def _handle_sb_linemode_mode(self, mode): """ Callback handles mode following IAC SB LINEMODE LINEMODE_MODE. :param bytes mode: a single byte Result of agreement to enter ``mode`` given applied by setting the value of ``self.linemode``, and sending acknowledgment if necessary. """ suggest_mode = slc.Linemode(mode[0]) self.log.debug( "recv IAC SB LINEMODE LINEMODE-MODE {0!r} IAC SE".format(suggest_mode.mask) ) if not suggest_mode.ack: # This implementation acknowledges and sets local linemode # to *any* setting the remote end suggests, requiring a # reply. See notes later under server receipt of acknowledged # linemode. self.send_linemode( linemode=slc.Linemode( mask=bytes([ord(suggest_mode.mask) | ord(slc.LMODE_MODE_ACK)]) ) ) return # " In all cases, a response is never generated to a MODE # command that has the MODE_ACK bit set." # # simply: cannot call self.send_linemode() here forward. if self.client: if self._linemode != suggest_mode: # " When a MODE command is received with the MODE_ACK bit set, # and the mode is different that what the current mode is, # the client will ignore the new mode" # self.log.warning( "server mode differs from local mode, " "though ACK bit is set. Local mode will " "remain." ) self.log.warning("!remote: {0!r}".format(suggest_mode)) self.log.warning(" local: {0!r}".format(self._linemode)) return self.log.debug("Linemode matches, acknowledged by server.") self._linemode = suggest_mode return # as a server, we simply honor whatever is given. This is also # problematic in some designers may wish to implement shells # that specifically do not honor some parts of the bitmask, we # must provide them an any/force-on/force-off mode-table interface. if self._linemode != suggest_mode: self.log.debug("We suggested, - {0!r}".format(self._linemode)) self.log.debug("Client choses + {0!r}".format(suggest_mode)) else: self.log.debug("Linemode agreed by client: {0!r}".format(self._linemode)) self._linemode = suggest_mode def _handle_sb_linemode_slc(self, buf): """ Callback handles IAC-SB-LINEMODE-SLC-<buf>. Processes SLC command function triplets found in ``buf`` and replies accordingly. """ if not len(buf) - 2 % 3: raise ValueError( "SLC buffer wrong size: expect multiple of 3: {}".format(len(buf) - 2) ) self._slc_start() while len(buf): func = buf.popleft() flag = buf.popleft() value = buf.popleft() slc_def = slc.SLC(flag, value) self._slc_process(func, slc_def) self._slc_end() self.request_forwardmask() def _slc_end(self): """Transmit SLC commands buffered by :meth:`_slc_send`.""" if len(self._slc_buffer): self.log.debug("send (slc_end): {!r}".format(b"".join(self._slc_buffer))) buf = b"".join(self._slc_buffer) self._transport.write(self._escape_iac(buf)) self._slc_buffer.clear() self.log.debug("slc_end: [..] IAC SE") self.send_iac(IAC + SE) def _slc_start(self): """Send IAC SB LINEMODE SLC header.""" self.log.debug("slc_start: IAC SB LINEMODE SLC [..]") self.send_iac(IAC + SB + LINEMODE + slc.LMODE_SLC) def _slc_send(self, slctab=None): """ Send supported SLC characters of current tabset, or specified tabset. :param dict slctab: SLC byte tabset as dictionary, such as slc.BSD_SLC_TAB. """ send_count = 0 slctab = slctab or self.slctab for func in range(slc.NSLC + 1): if func == 0 and self.client: # only the server may send an octet with the first # byte (func) set as 0 (SLC_NOSUPPORT). continue _default = slc.SLC_nosupport() if self.slctab.get(bytes([func]), _default).nosupport: continue self._slc_add(bytes([func])) send_count += 1 self.log.debug("slc_send: {} functions queued.".format(send_count)) def _slc_add(self, func, slc_def=None): """ Prepare slc triplet response (function, flag, value) for transmission. For the given SLC_func byte and slc_def instance providing byte attributes ``flag`` and ``val``. If no slc_def is provided, the slc definition of ``slctab`` is used by key ``func``. """ if slc_def is None: slc_def = self.slctab[func] self.log.debug( "_slc_add ({:<10} {})".format(slc.name_slc_command(func) + ",", slc_def) ) if len(self._slc_buffer) >= slc.NSLC * 6: raise ValueError("SLC: buffer full!") self._slc_buffer.extend([func, slc_def.mask, slc_def.val]) def _slc_process(self, func, slc_def): """ Process an SLC definition provided by remote end. Ensure the function definition is in-bounds and an SLC option we support. Store SLC_VARIABLE changes to self.slctab, keyed by SLC byte function ``func``. The special definition (0, SLC_DEFAULT|SLC_VARIABLE, 0) has the side-effect of replying with a full slc tabset, resetting to the default tabset, if indicated. """ # out of bounds checking if ord(func) > slc.NSLC: self.log.warning("SLC not supported (out of range): ({!r})".format(func)) self._slc_add(func, slc.SLC_nosupport()) return # process special request if func == theNULL: if slc_def.level == slc.SLC_DEFAULT: # client requests we send our default tab, self.log.debug("_slc_process: client request SLC_DEFAULT") self._slc_send(self.default_slc_tab) elif slc_def.level == slc.SLC_VARIABLE: # client requests we send our current tab, self.log.debug("_slc_process: client request SLC_VARIABLE") self._slc_send() else: self.log.warning("func(0) flag expected, got {}.".format(slc_def)) return self.log.debug( "_slc_process {:<9} mine={}, his={}".format( slc.name_slc_command(func), self.slctab[func], slc_def ) ) # evaluate slc mylevel, myvalue = (self.slctab[func].level, self.slctab[func].val) if slc_def.level == mylevel and myvalue == slc_def.val: return elif slc_def.level == mylevel and slc_def.ack: return elif slc_def.ack: self.log.debug( "slc value mismatch with ack bit set: ({!r},{!r})".format( myvalue, slc_def.val ) ) return else: self._slc_change(func, slc_def) def _slc_change(self, func, slc_def): """ Update SLC tabset with SLC definition provided by remote end. Modify private attribute ``slctab`` appropriately for the level and value indicated, except for slc tab functions of value SLC_NOSUPPORT and reply as appropriate through :meth:`_slc_add`. """ hislevel = slc_def.level mylevel = self.slctab[func].level if hislevel == slc.SLC_NOSUPPORT: # client end reports SLC_NOSUPPORT; use a # nosupport definition with ack bit set self.slctab[func] = slc.SLC_nosupport() self.slctab[func].set_flag(slc.SLC_ACK) self._slc_add(func) return if hislevel == slc.SLC_DEFAULT: # client end requests we use our default level if mylevel == slc.SLC_DEFAULT: # client end telling us to use SLC_DEFAULT on an SLC we do not # support (such as SYNCH). Set flag to SLC_NOSUPPORT instead # of the SLC_DEFAULT value that it begins with self.slctab[func].set_mask(slc.SLC_NOSUPPORT) else: # set current flag to the flag indicated in default tab self.slctab[func].set_mask(self.default_slc_tab.get(func).mask) # set current value to value indicated in default tab self.default_slc_tab.get(func, slc.SLC_nosupport()) self.slctab[func].set_value(slc_def.val) self._slc_add(func) return # client wants to change to a new value, or, # refuses to change to our value, accept their value. if self.slctab[func].val != theNULL: self.slctab[func].set_value(slc_def.val) self.slctab[func].set_mask(slc_def.mask) slc_def.set_flag(slc.SLC_ACK) self._slc_add(func, slc_def) return # if our byte value is b'\x00', it is not possible for us to support # this request. If our level is default, just ack whatever was sent. # it is a value we cannot change. if mylevel == slc.SLC_DEFAULT: # If our level is default, store & ack whatever was sent self.slctab[func].set_mask(slc_def.mask) self.slctab[func].set_value(slc_def.val) slc_def.set_flag(slc.SLC_ACK) self._slc_add(func, slc_def) elif slc_def.level == slc.SLC_CANTCHANGE and mylevel == slc.SLC_CANTCHANGE: # "degenerate to SLC_NOSUPPORT" self.slctab[func].set_mask(slc.SLC_NOSUPPORT) self._slc_add(func) else: # mask current level to levelbits (clears ack), self.slctab[func].set_mask(self.slctab[func].level) if mylevel == slc.SLC_CANTCHANGE: slc_def = self.default_slc_tab.get(func, slc.SLC_nosupport()) self.slctab[func].val = slc_def.val self._slc_add(func) def _handle_sb_forwardmask(self, cmd, buf): """ Callback handles request for LINEMODE <cmd> LMODE_FORWARDMASK. :param bytes cmd: one of DO, DONT, WILL, WONT. :param bytes buf: bytes following IAC SB LINEMODE DO FORWARDMASK. """ # set and report about pending options by 2-byte opt, # not well tested, no known implementations exist ! if self.server: assert self.remote_option.enabled( LINEMODE ), "cannot recv LMODE_FORWARDMASK {} ({!r}) " "without first sending DO LINEMODE.".format( cmd, buf, ) assert cmd not in ( DO, DONT, ), "cannot recv {} LMODE_FORWARDMASK on server end".format( name_command(cmd) ) if self.client: assert self.local_option.enabled(LINEMODE), ( "cannot recv {} LMODE_FORWARDMASK without first " " sending WILL LINEMODE.".format(name_command(cmd)) ) assert cmd not in ( WILL, WONT, ), "cannot recv {} LMODE_FORWARDMASK on client end".format( name_command(cmd) ) assert ( cmd not in (DONT,) or len(buf) == 0 ), "Illegal bytes follow DONT LMODE_FORWARDMASK: {!r}".format(buf) assert cmd not in (DO,) and len( buf ), "bytes must follow DO LMODE_FORWARDMASK" opt = SB + LINEMODE + slc.LMODE_FORWARDMASK if cmd in ( WILL, WONT, ): self.remote_option[opt] = bool(cmd is WILL) elif cmd in ( DO, DONT, ): self.local_option[opt] = bool(cmd is DO) if cmd == DO: self._handle_do_forwardmask(buf) def _handle_sb_comport(self, buf): """ Callback handles IAC-SB-COM-PORT-OPTION. This callback simply logs the subnegotiation but does not perform any action. :param bytes buf: bytes following IAC SB LINEMODE DO FORWARDMASK. """ self.log.debug( "SB unhandled: cmd={}, buf={!r}".format(name_command(COM_PORT_OPTION), buf) ) return def _handle_sb_gmcp(self, buf): """ Callback handles request for Generic Mud Communication Protocol (GMCP). This callback simply logs the subnegotiation but does not perform any action. :param bytes buf: bytes following IAC SB GMCP. """ self.log.debug( "SB unhandled: cmd={}, buf={!r}".format(name_command(GMCP), b"".join(buf)) ) return def _handle_do_forwardmask(self, buf): """ Callback handles request for LINEMODE DO FORWARDMASK. :param bytes buf: bytes following IAC SB LINEMODE DO FORWARDMASK. :raises NotImplementedError """ raise NotImplementedError
[docs]class TelnetWriterUnicode(TelnetWriter): """ A Unicode StreamWriter interface for Telnet protocol. See ancestor class, :class:`TelnetWriter` for details. Requires the ``fn_encoding`` callback, receiving mutually boolean keyword argument ``outgoing=True`` to determine what encoding should be used to decode the value in the direction specified. The encoding may be conditionally negotiated by CHARSET, :rfc:`2066`, or discovered by ``LANG`` environment variables by NEW_ENVIRON, :rfc:`1572`. """ def __init__( self, transport, protocol, fn_encoding, *, encoding_errors="strict", **kwds ): self.fn_encoding = fn_encoding self.encoding_errors = encoding_errors super().__init__(transport, protocol, **kwds)
[docs] def encode(self, string, errors): """ Encode ``string`` using protocol-preferred encoding. :param str errors: same as meaning in :meth:`codecs.Codec.encode`. When None, value of ``encoding_errors`` given to class initializer is used. :param str errors: same as meaning in :meth:`codecs.Codec.encode`, when ``None`` (default), value of class initializer keyword argument, ``encoding_errors``. .. note: though a unicode interface, when ``outbinary`` mode has not been protocol negotiated, ``fn_encoding`` strictly enforces 7-bit ASCII range (ordinal byte values less than 128), as a strict compliance of the telnet RFC. """ encoding = self.fn_encoding(outgoing=True) return bytes(string, encoding, errors or self.encoding_errors)
[docs] def write(self, string, errors=None): """ Write unicode string to transport, using protocol-preferred encoding. If the connection is closed, nothing is done. :param str string: unicode string text to write to endpoint using the protocol's preferred encoding. When the protocol ``encoding`` keyword is explicitly set to ``False``, the given string should be only raw ``b'bytes'``. :param str errors: same as meaning in :meth:`codecs.Codec.encode`, when ``None`` (default), value of class initializer keyword argument, ``encoding_errors``. :rtype: None """ if self.connection_closed: return errors = errors or self.encoding_errors self._write(self.encode(string, errors))
[docs] def writelines(self, lines, errors=None): """ Write unicode strings to transport. Note that newlines are not added. The sequence can be any iterable object producing strings. This is equivalent to calling write() for each string. """ self.write(string="".join(lines), errors=errors)
[docs] def echo(self, string, errors=None): """ Conditionally write ``string`` to transport when "remote echo" enabled. :param str string: string received as input, conditionally written. :param str errors: same as meaning in :meth:`codecs.Codec.encode`. This method may only be called from the server perspective. The default implementation depends on telnet negotiation willingness for local echo: only an RFC-compliant telnet client will correctly set or unset echo accordingly by demand. """ assert self.server, "Client never performs echo of input received." if self.will_echo: self.write(string=string, errors=errors)
class Option(dict): """ Telnet option state negotiation helper class. This class simply acts as a logging decorator for state changes of a dictionary describing telnet option negotiation. """ def __init__(self, name, log): """ Class initializer. :param str name: decorated name representing option class, such as 'local', 'remote', or 'pending'. """ self.name, self.log = name, log dict.__init__(self) def enabled(self, key): """ Return True if option is enabled. :param bytes key: telnet option :rtype: bool """ return bool(self.get(key, None) is True) def __setitem__(self, key, value): # the real purpose of this class, tracking state negotiation. if value != dict.get(self, key, None): descr = " + ".join( [name_command(bytes([byte])) for byte in key[:2]] + [repr(byte) for byte in key[2:]] ) self.log.debug("{}[{}] = {}".format(self.name, descr, value)) dict.__setitem__(self, key, value) def _escape_environ(buf): """ Return new buffer with VAR and USERVAR escaped, if present in ``buf``. :param bytes buf: given bytes buffer :returns: bytes buffer with escape characters inserted. :rtype: bytes """ return buf.replace(VAR, ESC + VAR).replace(USERVAR, ESC + USERVAR) def _unescape_environ(buf): """ Return new buffer with escape characters removed for VAR and USERVAR. :param bytes buf: given bytes buffer :returns: bytes buffer with escape characters removed. :rtype: bytes """ return buf.replace(ESC + VAR, VAR).replace(ESC + USERVAR, USERVAR) def _encode_env_buf(env): """ Encode dictionary for transmission as environment variables, :rfc:`1572`. :param bytes buf: dictionary of environment values. :returns: bytes buffer meant to follow sequence IAC SB NEW_ENVIRON IS. It is not terminated by IAC SE. :rtype: bytes Returns bytes array ``buf`` for use in sequence (IAC, SB, NEW_ENVIRON, IS, <buf>, IAC, SE) as set forth in :rfc:`1572`. """ buf = collections.deque() for key, value in env.items(): buf.append(VAR) buf.extend([_escape_environ(key.encode("ascii"))]) buf.append(VALUE) buf.extend([_escape_environ("{}".format(value).encode("ascii"))]) return b"".join(buf) def _decode_env_buf(buf): """ Decode environment values to dictionary, :rfc:`1572`. :param bytes buf: bytes array following sequence IAC SB NEW_ENVIRON SEND or IS up to IAC SE. :returns: dictionary representing the environment values decoded from buf. :rtype: dict This implementation does not distinguish between ``USERVAR`` and ``VAR``. """ env = {} # build table of (non-escaped) delimiters by index of buf[]. breaks = [ idx for (idx, byte) in enumerate(buf) if ( bytes([byte]) in ( VAR, USERVAR, ) and (idx == 0 or bytes([buf[idx - 1]]) != ESC) ) ] for idx, ptr in enumerate(breaks): # find buf[] starting, ending positions, begin after # buf[0], which is currently valued VAR or USERVAR start = ptr + 1 if idx == len(breaks) - 1: end = len(buf) else: end = breaks[idx + 1] pair = buf[start:end].split(VALUE, 1) key = _unescape_environ(pair[0]).decode("ascii", "strict") if len(pair) == 1: value = "" else: value = _unescape_environ(pair[1]).decode("ascii", "strict") env[key] = value return env