Source code for telnetlib3.server

"""
The ``main`` function here is wired to the command line tool by name
telnetlib3-server.  If this server's PID receives the SIGTERM signal, it
attempts to shut down gracefully.

The :class:`TelnetServer` class negotiates a character-at-a-time (WILL-SGA,
WILL-ECHO) session with support for negotiation about window size, environment
variables, and terminal type name, and automatically closes client connections
after an idle period.
"""
# std imports
import collections
import argparse
import asyncio
import logging
import signal

# local
from . import server_base
from . import accessories

__all__ = ("TelnetServer", "create_server", "run_server", "parse_server_args")

# Default settings shared by the command-line parser and ``run_server``.
#
# ``CONFIG`` is an *instance* of a namedtuple type (itself named "CONFIG"):
# defaults are read as attributes (``CONFIG.port``) and the ordered field
# names are available as ``CONFIG._fields``.
CONFIG = collections.namedtuple(
    "CONFIG",
    "host port loglevel logfile logfmt shell encoding force_binary "
    "timeout connect_maxwait",
)(
    # bind address and port
    host="localhost",
    port=6023,
    # logging
    loglevel="info",
    logfile=None,
    logfmt=accessories._DEFAULT_LOGFMT,
    # callable resolved from its dotted "module.function" name
    shell=accessories.function_lookup("telnetlib3.telnet_server_shell"),
    # stream encoding
    encoding="utf8",
    force_binary=False,
    # idle timeout and negotiation wait, both in seconds
    timeout=300,
    connect_maxwait=4.0,
)

# Module-level logger used by everything in this file.
logger = logging.getLogger("telnetlib3.server")


class TelnetServer(server_base.BaseServer):
    """Telnet Server protocol performing common negotiation."""

    #: Maximum number of cycles to seek for all terminal types.  We are seeking
    #: the repeat or cycle of a terminal table, choosing the first -- but when
    #: negotiated by MUD clients, we choose the most Unix TERM appropriate.
    TTYPE_LOOPMAX = 8

    # Derived methods from base class

    def __init__(self, term="unknown", cols=80, rows=25, timeout=300, *args, **kwargs):
        """
        Initialize server protocol state.

        :param str term: initial value stored as extra info ``'term'``.
        :param int cols: initial value stored as extra info ``'cols'``.
        :param int rows: initial value stored as extra info ``'rows'``.
        :param int timeout: initial value stored as extra info ``'timeout'``,
            used by :meth:`set_timeout` when no explicit duration is given.

        Remaining positional and keyword arguments are forwarded to the base
        class.  The keyword ``encoding``, when present, is also recorded as the
        initial extra info ``'charset'``.
        """
        super().__init__(*args, **kwargs)

        # Resolved by check_negotiation() once the outcome of encoding
        # negotiation is known (truthy on success, falsy on failure).
        self.waiter_encoding = asyncio.Future()
        self._tasks.append(self.waiter_encoding)

        # serial number of the next TTYPE response, see on_ttype().
        self._ttype_count = 1

        # handle of the pending idle-timeout callback, see set_timeout().
        self._timer = None

        self._extra.update(
            {
                "term": term,
                "charset": kwargs.get("encoding", ""),
                "cols": cols,
                "rows": rows,
                "timeout": timeout,
            }
        )

    def connection_made(self, transport):
        """Start the idle timer and register option callbacks on the writer."""
        from .telopt import NAWS, NEW_ENVIRON, TSPEED, TTYPE, XDISPLOC, CHARSET

        super().connection_made(transport)

        # begin timeout timer
        self.set_timeout()

        # Wire extended rfc callbacks for responses to
        # requests of terminal attributes, environment values, etc.
        for tel_opt, callback_fn in [
            (NAWS, self.on_naws),
            (NEW_ENVIRON, self.on_environ),
            (TSPEED, self.on_tspeed),
            (TTYPE, self.on_ttype),
            (XDISPLOC, self.on_xdisploc),
            (CHARSET, self.on_charset),
        ]:
            self.writer.set_ext_callback(tel_opt, callback_fn)

        # Wire up callbacks that return definitions for requests.
        for tel_opt, callback_fn in [
            (NEW_ENVIRON, self.on_request_environ),
            (CHARSET, self.on_request_charset),
        ]:
            self.writer.set_ext_send_callback(tel_opt, callback_fn)

    def data_received(self, data):
        """Restart the idle timer, then hand ``data`` to the base class."""
        self.set_timeout()
        super().data_received(data)

    def begin_negotiation(self):
        """Run base class negotiation, then send ``IAC DO TTYPE``."""
        from .telopt import DO, TTYPE

        super().begin_negotiation()
        self.writer.iac(DO, TTYPE)

    def begin_advanced_negotiation(self):
        """
        Run base class advanced negotiation, then offer and request options.

        Sends WILL for SGA, ECHO and BINARY, and DO for NEW_ENVIRON and NAWS.
        DO CHARSET is sent only when ``default_encoding`` is set.
        """
        from .telopt import DO, WILL, SGA, ECHO, BINARY, NEW_ENVIRON, NAWS, CHARSET

        super().begin_advanced_negotiation()
        self.writer.iac(WILL, SGA)
        self.writer.iac(WILL, ECHO)
        self.writer.iac(WILL, BINARY)
        self.writer.iac(DO, NEW_ENVIRON)
        self.writer.iac(DO, NAWS)
        if self.default_encoding:
            self.writer.iac(DO, CHARSET)

    def check_negotiation(self, final=False):
        """
        Check negotiation status, including bidirectional encoding.

        :param bool final: when true and encoding negotiation has not
            succeeded, :attr:`waiter_encoding` is resolved as failed.
        :returns: the base class result combined with the encoding check; on
            the call that declares encoding negotiation failed, the base class
            result alone.
        """
        from .telopt import TTYPE

        # NOTE(review): ``final`` is not forwarded to the base implementation;
        # confirm against the base class whether it should be.
        parent = super().check_negotiation()

        # in addition to the base class negotiation check, periodically check
        # for completion of bidirectional encoding negotiation.
        result = self._check_encoding()
        encoding = self.encoding(outgoing=True, incoming=True)

        if self.waiter_encoding.done():
            # outcome already recorded by an earlier call.
            return parent and result

        if result:
            logger.debug("encoding complete: {0!r}".format(encoding))
            self.waiter_encoding.set_result(result)
            return parent and result

        # If the remote end doesn't support TTYPE, which is agreed upon to
        # continue towards advanced negotiation of CHARSET, we assume the
        # distant end would not support it, declaring encoding failed.  The
        # same is declared when this is the final check.
        if self.writer.remote_option.get(TTYPE) is False or final:
            logger.debug(
                "encoding failed after {0:1.2f}s: {1}".format(self.duration, encoding)
            )
            self.waiter_encoding.set_result(result)  # False
            return parent

        return parent and result

    # new methods

    def encoding(self, outgoing=None, incoming=None):
        """
        Return encoding for the given stream direction.

        :param bool outgoing: Whether the return value is suitable for
            encoding bytes for transmission to client end.
        :param bool incoming: Whether the return value is suitable for
            decoding bytes received from the client.
        :raises TypeError: when a direction argument, either ``outgoing``
            or ``incoming``, was not set ``True``.
        :returns: ``'US-ASCII'`` for the directions indicated, unless
            ``BINARY`` :rfc:`856` has been negotiated for the direction
            indicated or :attr:`force_binary` is set ``True``.  In that case,
            the encoding derived from the client's ``LANG`` value (when set
            and not ``'C'``), otherwise the negotiated ``charset``, otherwise
            the default encoding.
        :rtype: str
        """
        if not (outgoing or incoming):
            raise TypeError(
                "encoding arguments 'outgoing' and 'incoming' "
                "are required: toggle at least one."
            )

        # may we encode in the direction indicated?
        _outgoing_only = outgoing and not incoming
        _incoming_only = not outgoing and incoming
        _bidirectional = outgoing and incoming
        may_encode = (
            (_outgoing_only and self.writer.outbinary)
            or (_incoming_only and self.writer.inbinary)
            or (_bidirectional and self.writer.outbinary and self.writer.inbinary)
        )

        if self.force_binary or may_encode:
            # prefer 'LANG' environment variable forwarded by client, if any.
            # for modern systems, this is the preferred method of encoding
            # negotiation.
            _lang = self.get_extra_info("LANG", "")
            if _lang and _lang != "C":
                return accessories.encoding_from_lang(_lang)

            # otherwise, the less common CHARSET negotiation may be found in
            # many East-Asia BBS and Western MUD systems.
            return self.get_extra_info("charset") or self.default_encoding
        return "US-ASCII"

    def set_timeout(self, duration=-1):
        """
        Restart or unset timeout for client.

        :param int duration: When specified as a positive integer,
            schedules Future for callback of :meth:`on_timeout`.  When ``-1``,
            the value of ``self.get_extra_info('timeout')`` is used.  When
            non-True, it is canceled.
        """
        if duration == -1:
            duration = self.get_extra_info("timeout")

        # drop and cancel any previously scheduled timer.
        if self._timer is not None:
            if self._timer in self._tasks:
                self._tasks.remove(self._timer)
            self._timer.cancel()

        if duration:
            loop = asyncio.get_event_loop()
            self._timer = loop.call_later(duration, self.on_timeout)
            self._tasks.append(self._timer)

        # remember the effective duration for subsequent restarts.
        self._extra["timeout"] = duration

    # Callback methods

    def on_timeout(self):
        """
        Callback received on session timeout.

        Default implementation writes "Timeout." bound by CRLF and closes.

        This can be disabled by calling :meth:`set_timeout` with
        :paramref:`~.set_timeout.duration` value of ``0`` or value of
        the same for keyword argument ``timeout``.
        """
        logger.debug("Timeout after {self.idle:1.2f}s".format(self=self))
        self.writer.write("\r\nTimeout.\r\n")
        self.timeout_connection()

    def on_naws(self, rows, cols):
        """
        Callback receives NAWS response, :rfc:`1073`.

        :param int rows: screen size, by number of cells in height.
        :param int cols: screen size, by number of cells in width.
        """
        self._extra.update({"rows": rows, "cols": cols})

    def on_request_environ(self):
        """
        Definition for NEW_ENVIRON request of client, :rfc:`1572`.

        This method is a callback from :meth:`~.TelnetWriter.request_environ`,
        first entered on receipt of (WILL, NEW_ENVIRON) by server.  The return
        value *defines the request made to the client* for environment values.

        :rtype list: a list of unicode character strings of US-ASCII
            characters, indicating the environment keys the server requests
            of the client.  If this list contains the special byte constants,
            ``USERVAR`` or ``VAR``, the client is allowed to volunteer any
            other additional user or system values.

            Any empty return value indicates that no request should be made.

        The default return value is::

            ['LANG', 'TERM', 'COLUMNS', 'LINES', 'DISPLAY', 'COLORTERM',
             VAR, USERVAR]
        """
        from .telopt import VAR, USERVAR

        return [
            "LANG",
            "TERM",
            "COLUMNS",
            "LINES",
            "DISPLAY",
            "COLORTERM",
            VAR,
            USERVAR,
        ]

    def on_environ(self, mapping):
        """
        Callback receives NEW_ENVIRON response, :rfc:`1572`.

        :param dict mapping: environment keys and values sent by the client.
            Keys with empty values are removed from this mapping in place;
            the remainder is stored as extra info under upper-cased keys.
        """
        # A well-formed client responds with empty values for variables to
        # mean "no value".  They might have it, they just may not wish to
        # divulge that information.  Such keys are discarded.
        for key in [key for key, val in mapping.items() if not val]:
            mapping.pop(key)

        # because we are working with "untrusted input", we make one fair
        # distinction: all keys received by NEW_ENVIRON are in uppercase.
        # this ensures a client may not override trusted values such as
        # 'peer'.
        u_mapping = {key.upper(): val for key, val in mapping.items()}

        logger.debug("on_environ received: {0!r}".format(u_mapping))

        self._extra.update(u_mapping)

    def on_request_charset(self):
        """
        Definition for CHARSET request by client, :rfc:`2066`.

        This method is a callback from :meth:`~.TelnetWriter.request_charset`,
        first entered on receipt of (WILL, CHARSET) by server.  The return
        value *defines the request made to the client* for encodings.

        :rtype list: a list of unicode character strings of US-ASCII
            characters, indicating the encodings offered by the server in
            its preferred order.

            Any empty return value indicates that no encodings are offered.

        The default return value begins::

            ['UTF-8', 'UTF-16', 'LATIN1', 'US-ASCII', 'BIG5', 'GBK', ...]
        """
        preferred = [
            "UTF-8",
            "UTF-16",
            "LATIN1",
            "US-ASCII",
            "BIG5",
            "GBK",
            "SHIFTJIS",
            "GB18030",
            "KOI8-R",
            "KOI8-U",
        ]

        # "Part 12 was slated for Latin/Devanagari,
        # but abandoned in 1997"
        iso8859 = ["ISO8859-{}".format(iso) for iso in range(1, 16) if iso != 12]

        # fmt: off
        code_page_numbers = (
            154, 437, 500, 737, 775, 850, 852, 855, 856, 857,
            860, 861, 862, 863, 864, 865, 866, 869, 874, 875,
            932, 949, 950, 1006, 1026, 1140,
            # the Windows code pages, 1250 through 1258 without gaps.
            1250, 1251, 1252, 1253, 1254, 1255, 1256, 1257, 1258,
            1361,
        )
        # fmt: on
        code_pages = ["CP{}".format(cp) for cp in code_page_numbers]

        return preferred + iso8859 + code_pages

    def on_charset(self, charset):
        """Callback for CHARSET response, :rfc:`2066`."""
        self._extra["charset"] = charset

    def on_tspeed(self, rx, tx):
        """Callback for TSPEED response, :rfc:`1079`."""
        self._extra["tspeed"] = "{0},{1}".format(rx, tx)

    def on_ttype(self, ttype):
        """Callback for TTYPE response, :rfc:`930`."""
        # TTYPE may be requested multiple times, we honor this system and
        # attempt to cause the client to cycle, as their first response may
        # not be their most significant.  All responses held as 'ttype{n}',
        # where {n} is their serial response order number.
        #
        # The most recently received terminal type by the server is
        # assumed TERM by this implementation, even when unsolicited.
        key = "ttype{}".format(self._ttype_count)
        self._extra[key] = ttype
        if ttype:
            self._extra["TERM"] = ttype

        _lastval = self.get_extra_info("ttype{0}".format(self._ttype_count - 1))

        if key != "ttype1" and ttype == self.get_extra_info("ttype1", None):
            # cycle has looped, stop
            logger.debug("ttype cycle stop at {0}: {1}, looped.".format(key, ttype))

        elif not ttype or self._ttype_count > self.TTYPE_LOOPMAX:
            # empty reply string or too many responses!
            logger.warning("ttype cycle stop at {0}: {1}.".format(key, ttype))

        elif self._ttype_count == 3 and ttype.upper().startswith("MTTS "):
            # a third reply beginning with "MTTS " is not used as TERM;
            # the second reply is used instead.
            val = self.get_extra_info("ttype2")
            logger.debug(
                "ttype cycle stop at {0}: {1}, using {2} from ttype2.".format(
                    key, ttype, val
                )
            )
            self._extra["TERM"] = val

        elif ttype == _lastval:
            logger.debug("ttype cycle stop at {0}: {1}, repeated.".format(key, ttype))

        else:
            # a new value: ask the client for its next terminal type.
            logger.debug("ttype cycle cont at {0}: {1}.".format(key, ttype))
            self._ttype_count += 1
            self.writer.request_ttype()

    def on_xdisploc(self, xdisploc):
        """Callback for XDISPLOC response, :rfc:`1096`."""
        self._extra["xdisploc"] = xdisploc

    # private methods

    def _check_encoding(self):
        # Periodically check for completion of ``waiter_encoding``.
        from .telopt import DO, BINARY

        # BINARY is active outbound only: request the inbound direction too,
        # unless that request is already pending.
        if (
            self.writer.outbinary
            and not self.writer.inbinary
            and DO + BINARY not in self.writer.pending_option
        ):
            logger.debug("BINARY in: direction request.")
            self.writer.iac(DO, BINARY)
            return False

        # are we able to negotiate BINARY bidirectionally?
        return self.writer.outbinary and self.writer.inbinary
async def create_server(host=None, port=23, protocol_factory=TelnetServer, **kwds):
    """
    Create a TCP Telnet server.

    :param str host: The host parameter can be a string, in that case the TCP
        server is bound to host and port. The host parameter can also be a
        sequence of strings, and in that case the TCP server is bound to all
        hosts of the sequence.
    :param int port: listen port for TCP Server.
    :param server_base.BaseServer protocol_factory: An alternate protocol
        factory for the server, when unspecified, :class:`TelnetServer` is
        used.
    :param Callable shell: A :func:`asyncio.coroutine` that is called after
        negotiation completes, receiving arguments ``(reader, writer)``.
        The reader is a :class:`~.TelnetReader` instance, the writer is
        a :class:`~.TelnetWriter` instance.
    :param str encoding: The default assumed encoding, or ``False`` to disable
        unicode support.  Encoding may be negotiated to another value by
        the client through NEW_ENVIRON :rfc:`1572` by sending environment value
        of ``LANG``, or by any legal value for CHARSET :rfc:`2066` negotiation.

        The server's attached ``reader, writer`` streams accept and return
        unicode, unless this value is explicitly set ``False``.  In that case,
        the attached stream interfaces are bytes-only.
    :param str encoding_errors: Same meaning as :meth:`codecs.Codec.encode`.
        Default value is ``strict``.
    :param bool force_binary: When ``True``, the encoding specified is
        used for both directions even when BINARY mode, :rfc:`856`, is not
        negotiated for the direction specified.  This parameter has no effect
        when ``encoding=False``.
    :param str term: Value returned for ``writer.get_extra_info('term')``
        until negotiated by TTYPE :rfc:`930`, or NEW_ENVIRON :rfc:`1572`.
        Default value is ``'unknown'``.
    :param int cols: Value returned for ``writer.get_extra_info('cols')``
        until negotiated by NAWS :rfc:`1073`. Default value is 80 columns.
    :param int rows: Value returned for ``writer.get_extra_info('rows')``
        until negotiated by NAWS :rfc:`1073`. Default value is 25 rows.
    :param int timeout: Causes clients to disconnect if idle for this duration,
        in seconds.  This ensures resources are freed on busy servers.  When
        explicitly set to ``False``, clients will not be disconnected for
        timeout. Default value is 300 seconds (5 minutes).
    :param float connect_maxwait: If the remote end is not compliant, or
        otherwise confused by our demands, the shell continues anyway after
        this value has elapsed.  A client that is not answering option
        negotiation will delay the start of the shell by this amount.
    :param int limit: The buffer limit for the reader stream.

    :return asyncio.Server: The return value is the same as
        :meth:`asyncio.loop.create_server`, An object which can be used
        to stop the service.

    This function is a :func:`~asyncio.coroutine`.

    All keyword arguments other than ``host``, ``port`` and
    ``protocol_factory`` are passed unchanged to ``protocol_factory`` each
    time a protocol instance is created.
    """
    # a caller may pass ``protocol_factory=None`` to mean "use the default".
    protocol_factory = protocol_factory or TelnetServer

    # this is a coroutine, so the loop driving it is the running loop.
    loop = asyncio.get_running_loop()

    return await loop.create_server(lambda: protocol_factory(**kwds), host, port)
async def _sigterm_handler(server, log):
    """
    Begin shutdown of ``server`` in response to SIGTERM.

    :param server: the object whose ``close()`` method is called.
    :param log: accepted from the caller but not used here; the message is
        emitted through the module-level ``logger``.
    """
    logger.info("SIGTERM received, closing server.")

    # Closing signals the completion of the server.wait_closed() Future,
    # which allows the caller awaiting it to finish.
    server.close()
def parse_server_args():
    """
    Parse the command line of the telnet server program.

    Defaults for every option are taken from ``CONFIG``.

    :returns: mapping of option name to parsed value, with the keys
        ``host``, ``port``, ``loglevel``, ``logfile``, ``logfmt``, ``shell``,
        ``encoding``, ``force_binary``, ``timeout`` and ``connect_maxwait``.
    :rtype: dict
    """
    parser = argparse.ArgumentParser(
        description="Telnet protocol server",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )

    # positional, both optional: where to listen.
    parser.add_argument("host", nargs="?", default=CONFIG.host, help="bind address")
    parser.add_argument(
        "port", nargs="?", type=int, default=CONFIG.port, help="bind port"
    )

    # logging
    parser.add_argument("--loglevel", default=CONFIG.loglevel, help="level name")
    parser.add_argument("--logfile", default=CONFIG.logfile, help="filepath")
    parser.add_argument("--logfmt", default=CONFIG.logfmt, help="log format")

    # the shell is given by dotted name and resolved to a callable.
    parser.add_argument(
        "--shell",
        default=CONFIG.shell,
        type=accessories.function_lookup,
        help="module.function_name",
    )

    parser.add_argument("--encoding", default=CONFIG.encoding, help="encoding name")
    parser.add_argument(
        "--force-binary",
        action="store_true",
        default=CONFIG.force_binary,
        help="force binary transmission",
    )

    # Without an explicit type the command-line value stays a string: "0"
    # would then be truthy and fail to disable the timer, and any other value
    # would be scheduled as a string rather than a number of seconds.
    parser.add_argument(
        "--timeout",
        type=int,
        default=CONFIG.timeout,
        help="idle disconnect (0 disables)",
    )
    parser.add_argument(
        "--connect-maxwait",
        type=float,
        default=CONFIG.connect_maxwait,
        help="timeout for pending negotiation",
    )

    return vars(parser.parse_args())
async def run_server(
    host=CONFIG.host,
    port=CONFIG.port,
    loglevel=CONFIG.loglevel,
    logfile=CONFIG.logfile,
    logfmt=CONFIG.logfmt,
    shell=CONFIG.shell,
    encoding=CONFIG.encoding,
    force_binary=CONFIG.force_binary,
    timeout=CONFIG.timeout,
    connect_maxwait=CONFIG.connect_maxwait,
):
    """
    Program entry point for server daemon.

    This function configures a logger and creates a telnet server for the
    given keyword arguments, serving forever, completing only upon receipt of
    SIGTERM.

    :param host: bind address, passed to :func:`create_server`.
    :param port: bind port, passed to :func:`create_server`.
    :param loglevel: passed to ``accessories.make_logger``.
    :param logfile: passed to ``accessories.make_logger``.
    :param logfmt: passed to ``accessories.make_logger``.
    :param shell: passed to :func:`create_server`.
    :param encoding: passed to :func:`create_server`.
    :param force_binary: passed to :func:`create_server`.
    :param timeout: passed to :func:`create_server`.
    :param connect_maxwait: passed to :func:`create_server`.
    """
    log = accessories.make_logger(
        name="telnetlib3.server", loglevel=loglevel, logfile=logfile, logfmt=logfmt
    )

    # log all function arguments, in the field order of CONFIG.
    _locals = locals()
    _cfg_mapping = ", ".join(
        "{0}={1}".format(field, _locals[field]) for field in CONFIG._fields
    )
    logger.debug("Server configuration: {}".format(_cfg_mapping))

    # this is a coroutine, so the loop driving it is the running loop.
    loop = asyncio.get_running_loop()

    # bind
    server = await create_server(
        host,
        port,
        shell=shell,
        encoding=encoding,
        force_binary=force_binary,
        timeout=timeout,
        connect_maxwait=connect_maxwait,
    )

    # SIGTERM causes server to gracefully stop.  The handler coroutine is
    # created only when the signal actually arrives: a coroutine object built
    # ahead of time would never be awaited if no signal is received, and could
    # not be scheduled a second time if the signal is delivered twice.
    loop.add_signal_handler(
        signal.SIGTERM,
        lambda: asyncio.ensure_future(_sigterm_handler(server, log)),
    )

    logger.info("Server ready on {0}:{1}".format(host, port))

    # await completion of server stop
    try:
        await server.wait_closed()
    finally:
        # remove signal handler on stop
        loop.remove_signal_handler(signal.SIGTERM)

    logger.info("Server stop.")
def main():
    """Parse the command line and run the server until it stops."""
    server_kwargs = parse_server_args()
    asyncio.run(run_server(**server_kwargs))


if __name__ == "__main__":
    main()