"""
The ``main`` function here is wired to the command line tool by name
telnetlib3-server. If this server's PID receives the SIGTERM signal, it
attempts to shutdown gracefully.
The :class:`TelnetServer` class negotiates a character-at-a-time (WILL-SGA,
WILL-ECHO) session with support for negotiation about window size, environment
variables, terminal type name, and to automatically close connections clients
after an idle period.
"""
# std imports
import collections
import argparse
import asyncio
import logging
import signal
from weakref import proxy
# local
from . import server_base
from . import accessories
__all__ = ('TelnetServer', 'create_server', 'run_server', 'parse_server_args')
CONFIG = collections.namedtuple('CONFIG', [
'host', 'port', 'loglevel', 'logfile', 'logfmt', 'shell', 'encoding',
'force_binary', 'timeout', 'connect_maxwait'])(
host='localhost', port=6023, loglevel='info',
logfile=None, logfmt=accessories._DEFAULT_LOGFMT ,
shell=accessories.function_lookup('telnetlib3.telnet_server_shell'),
encoding='utf8', force_binary=False, timeout=300, connect_maxwait=4.0)
[docs]class TelnetServer(server_base.BaseServer):
"""Telnet Server protocol performing common negotiation."""
#: Maximum number of cycles to seek for all terminal types. We are seeking
#: the repeat or cycle of a terminal table, choosing the first -- but when
#: negotiated by MUD clients, we chose the must Unix TERM appropriate,
TTYPE_LOOPMAX = 8
# Derived methods from base class
def __init__(self, term='unknown', cols=80, rows=25, timeout=300,
*args, **kwargs):
super().__init__(*args, **kwargs)
self.waiter_encoding = asyncio.Future()
self._tasks.append(self.waiter_encoding)
self._ttype_count = 1
self._timer = None
self._extra.update({
'term': term,
'charset': kwargs.get('encoding', ''),
'cols': cols,
'rows': rows,
'timeout': timeout
})
[docs] def connection_made(self, transport):
from .telopt import NAWS, NEW_ENVIRON, TSPEED, TTYPE, XDISPLOC, CHARSET
super().connection_made(transport)
# begin timeout timer
self.set_timeout()
# Wire extended rfc callbacks for responses to
# requests of terminal attributes, environment values, etc.
for tel_opt, callback_fn in [
(NAWS, self.on_naws),
(NEW_ENVIRON, self.on_environ),
(TSPEED, self.on_tspeed),
(TTYPE, self.on_ttype),
(XDISPLOC, self.on_xdisploc),
(CHARSET, self.on_charset),
]:
self.writer.set_ext_callback(tel_opt, callback_fn)
# Wire up a callbacks that return definitions for requests.
for tel_opt, callback_fn in [
(NEW_ENVIRON, self.on_request_environ),
(CHARSET, self.on_request_charset),
]:
self.writer.set_ext_send_callback(tel_opt, callback_fn)
[docs] def data_received(self, data):
self.set_timeout()
super().data_received(data)
[docs] def begin_negotiation(self):
from .telopt import DO, TTYPE
super().begin_negotiation()
self.writer.iac(DO, TTYPE)
[docs] def begin_advanced_negotiation(self):
from .telopt import (DO, WILL, SGA, ECHO, BINARY,
NEW_ENVIRON, NAWS, CHARSET)
super().begin_advanced_negotiation()
self.writer.iac(WILL, SGA)
self.writer.iac(WILL, ECHO)
self.writer.iac(WILL, BINARY)
self.writer.iac(DO, NEW_ENVIRON)
self.writer.iac(DO, NAWS)
if self.default_encoding:
self.writer.iac(DO, CHARSET)
[docs] def check_negotiation(self, final=False):
from .telopt import TTYPE
parent = super().check_negotiation()
# in addition to the base class negotiation check, periodically check
# for completion of bidirectional encoding negotiation.
result = self._check_encoding()
encoding = self.encoding(outgoing=True, incoming=True)
if not self.waiter_encoding.done() and result:
self.log.debug('encoding complete: {0!r}'.format(encoding))
self.waiter_encoding.set_result(proxy(self))
elif (not self.waiter_encoding.done() and
self.writer.remote_option.get(TTYPE) is False):
# if the remote end doesn't support TTYPE, which is agreed upon
# to continue towards advanced negotiation of CHARSET, we assume
# the distant end would not support it, declaring encoding failed.
self.log.debug('encoding failed after {0:1.2f}s: {1}'
.format(self.duration, encoding))
self.waiter_encoding.set_result(proxy(self))
return parent
elif not self.waiter_encoding.done() and final:
self.log.debug('encoding failed after {0:1.2f}s: {1}'
.format(self.duration, encoding))
self.waiter_encoding.set_result(proxy(self))
return parent
return parent and result
# new methods
[docs] def encoding(self, outgoing=None, incoming=None):
"""
Return encoding for the given stream direction.
:param bool outgoing: Whether the return value is suitable for
encoding bytes for transmission to client end.
:param bool incoming: Whether the return value is suitable for
decoding bytes received from the client.
:raises TypeError: when a direction argument, either ``outgoing``
or ``incoming``, was not set ``True``.
:returns: ``'US-ASCII'`` for the directions indicated, unless
``BINARY`` :rfc:`856` has been negotiated for the direction
indicated or :attr`force_binary` is set ``True``.
:rtype: str
"""
if not (outgoing or incoming):
raise TypeError("encoding arguments 'outgoing' and 'incoming' "
"are required: toggle at least one.")
# may we encode in the direction indicated?
_outgoing_only = outgoing and not incoming
_incoming_only = not outgoing and incoming
_bidirectional = outgoing and incoming
may_encode = ((_outgoing_only and self.writer.outbinary) or
(_incoming_only and self.writer.inbinary) or
(_bidirectional and
self.writer.outbinary and self.writer.inbinary))
if self.force_binary or may_encode:
# prefer 'LANG' environment variable forwarded by client, if any.
# for modern systems, this is the preferred method of encoding
# negotiation.
_lang = self.get_extra_info('LANG', '')
if _lang and _lang != 'C':
return accessories.encoding_from_lang(_lang)
# otherwise, the less CHARSET negotiation may be found in many
# East-Asia BBS and Western MUD systems.
return self.get_extra_info('charset') or self.default_encoding
return 'US-ASCII'
[docs] def set_timeout(self, duration=-1):
"""
Restart or unset timeout for client.
:param int duration: When specified as a positive integer,
schedules Future for callback of :meth:`on_timeout`. When ``-1``,
the value of ``self.get_extra_info('timeout')`` is used. When
non-True, it is canceled.
"""
if duration == -1:
duration = self.get_extra_info('timeout')
if self._timer is not None:
if self._timer in self._tasks:
self._tasks.remove(self._timer)
self._timer.cancel()
if duration:
self._timer = self._loop.call_later(duration, self.on_timeout)
self._tasks.append(self._timer)
self._extra['timeout'] = duration
# Callback methods
[docs] def on_timeout(self):
"""
Callback received on session timeout.
Default implementation writes "Timeout." bound by CRLF and closes.
This can be disabled by calling :meth:`set_timeout` with
:paramref:`~.set_timeout.duration` value of ``0`` or value of
the same for keyword argument ``timeout``.
"""
self.log.debug('Timeout after {self.idle:1.2f}s'.format(self=self))
self.writer.write('\r\nTimeout.\r\n')
self.writer.close()
[docs] def on_naws(self, rows, cols):
"""
Callback receives NAWS response, :rfc:`1073`.
:param int rows: screen size, by number of cells in height.
:param int cols: screen size, by number of cells in width.
"""
self._extra.update({'rows': rows, 'cols': cols})
[docs] def on_request_environ(self):
"""
Definition for NEW_ENVIRON request of client, :rfc:`1572`.
This method is a callback from :meth:`~.TelnetWriter.request_environ`,
first entered on receipt of (WILL, NEW_ENVIRON) by server. The return
value *defines the request made to the client* for environment values.
:rtype list: a list of unicode character strings of US-ASCII
characters, indicating the environment keys the server requests
of the client. If this list contains the special byte constants,
``USERVAR`` or ``VAR``, the client is allowed to volunteer any
other additional user or system values.
Any empty return value indicates that no request should be made.
The default return value is::
['LANG', 'TERM', 'COLUMNS', 'LINES', 'DISPLAY', 'COLORTERM',
VAR, USERVAR, 'COLORTERM']
"""
from .telopt import VAR, USERVAR
return ['LANG', 'TERM', 'COLUMNS', 'LINES', 'DISPLAY', 'COLORTERM',
VAR, USERVAR]
[docs] def on_environ(self, mapping):
"""Callback receives NEW_ENVIRON response, :rfc:`1572`."""
# A well-formed client responds with empty values for variables to
# mean "no value". They might have it, they just may not wish to
# divulge that information. We pop these keys as a side effect in
# the result statement of the following list comprehension.
no_value = [mapping.pop(key) or key
for key, val in list(mapping.items())
if not val]
# because we are working with "untrusted input", we make one fair
# distinction: all keys received by NEW_ENVIRON are in uppercase.
# this ensures a client may not override trusted values such as
# 'peer'.
u_mapping = {key.upper(): val for key, val in list(mapping.items())}
self.log.debug('on_environ received: {0!r}'.format(u_mapping))
self._extra.update(u_mapping)
[docs] def on_request_charset(self):
"""
Definition for CHARSET request by client, :rfc:`2066`.
This method is a callback from :meth:`~.TelnetWriter.request_charset`,
first entered on receipt of (WILL, CHARSET) by server. The return
value *defines the request made to the client* for encodings.
:rtype list: a list of unicode character strings of US-ASCII
characters, indicating the encodings offered by the server in
its preferred order.
Any empty return value indicates that no encodings are offered.
The default return value begins::
['UTF-8', 'UTF-16', 'LATIN1', 'US-ASCII', 'BIG5', 'GBK', ...]
"""
return ['UTF-8', 'UTF-16', 'LATIN1', 'US-ASCII', 'BIG5',
'GBK', 'SHIFTJIS', 'GB18030', 'KOI8-R', 'KOI8-U',
] + [
# "Part 12 was slated for Latin/Devanagari,
# but abandoned in 1997"
'ISO8859-{}'.format(iso) for iso in range(1, 16)
if iso != 12
] + ['CP{}'.format(cp) for cp in (
154, 437, 500, 737, 775, 850, 852, 855, 856, 857,
860, 861, 862, 863, 864, 865, 866, 869, 874, 875,
932, 949, 950, 1006, 1026, 1140, 1250, 1251, 1252,
1253, 1254, 1255, 1257, 1257, 1258, 1361,
)]
[docs] def on_charset(self, charset):
"""Callback for CHARSET response, :rfc:`2066`."""
self._extra['charset'] = charset
[docs] def on_tspeed(self, rx, tx):
"""Callback for TSPEED response, :rfc:`1079`."""
self._extra['tspeed'] = '{0},{1}'.format(rx, tx)
[docs] def on_ttype(self, ttype):
"""Callback for TTYPE response, :rfc:`930`."""
# TTYPE may be requested multiple times, we honor this system and
# attempt to cause the client to cycle, as their first response may
# not be their most significant. All responses held as 'ttype{n}',
# where {n} is their serial response order number.
#
# The most recently received terminal type by the server is
# assumed TERM by this implementation, even when unsolicited.
key = 'ttype{}'.format(self._ttype_count)
self._extra[key] = ttype
if ttype:
self._extra['TERM'] = ttype
_lastval = self.get_extra_info('ttype{0}'.format(
self._ttype_count - 1))
if key != 'ttype1' and ttype == self.get_extra_info('ttype1', None):
# cycle has looped, stop
self.log.debug('ttype cycle stop at {0}: {1}, looped.'
.format(key, ttype))
elif (not ttype or self._ttype_count > self.TTYPE_LOOPMAX):
# empty reply string or too many responses!
self.log.warning('ttype cycle stop at {0}: {1}.'.format(key, ttype))
elif (self._ttype_count == 3 and ttype.upper().startswith('MTTS ')):
val = self.get_extra_info('ttype2')
self.log.debug(
'ttype cycle stop at {0}: {1}, using {2} from ttype2.'
.format(key, ttype, val))
self._extra['TERM'] = val
elif (ttype == _lastval):
self.log.debug('ttype cycle stop at {0}: {1}, repeated.'
.format(key, ttype))
else:
self.log.debug('ttype cycle cont at {0}: {1}.'
.format(key, ttype))
self._ttype_count += 1
self.writer.request_ttype()
[docs] def on_xdisploc(self, xdisploc):
"""Callback for XDISPLOC response, :rfc:`1096`."""
self._extra['xdisploc'] = xdisploc
# private methods
def _check_encoding(self):
# Periodically check for completion of ``waiter_encoding``.
from .telopt import DO, BINARY
if (self.writer.outbinary and not self.writer.inbinary and
not DO + BINARY in self.writer.pending_option):
self.log.debug('BINARY in: direction request.')
self.writer.iac(DO, BINARY)
return False
# are we able to negotiation BINARY bidirectionally?
return self.writer.outbinary and self.writer.inbinary
[docs]@asyncio.coroutine
def create_server(host=None, port=23, protocol_factory=TelnetServer, **kwds):
"""
Create a TCP Telnet server.
:param str host: The host parameter can be a string, in that case the TCP
server is bound to host and port. The host parameter can also be a
sequence of strings, and in that case the TCP server is bound to all
hosts of the sequence.
:param int port: listen port for TCP Server.
:param server_base.BaseServer protocol_factory: An alternate protocol
factory for the server, when unspecified, :class:`TelnetServer` is
used.
:param Callable shell: A :func:`asyncio.coroutine` that is called after
negotiation completes, receiving arguments ``(reader, writer)``.
The reader is a :class:`~.TelnetReader` instance, the writer is
a :class:`~.TelnetWriter` instance.
:param logging.Logger log: target logger, if None is given, one is created
using the namespace ``'telnetlib3.server'``.
:param str encoding: The default assumed encoding, or ``False`` to disable
unicode support. Encoding may be negotiation to another value by
the client through NEW_ENVIRON :rfc:`1572` by sending environment value
of ``LANG``, or by any legal value for CHARSET :rfc:`2066` negotiation.
The server's attached ``reader, writer`` streams accept and return
unicode, unless this value explicitly set ``False``. In that case, the
attached streams interfaces are bytes-only.
:param str encoding_errors: Same meaning as :meth:`codecs.Codec.encode`.
Default value is ``strict``.
:param bool force_binary: When ``True``, the encoding specified is
used for both directions even when BINARY mode, :rfc:`856`, is not
negotiated for the direction specified. This parameter has no effect
when ``encoding=False``.
:param str term: Value returned for ``writer.get_extra_info('term')``
until negotiated by TTYPE :rfc:`930`, or NAWS :rfc:`1572`. Default value
is ``'unknown'``.
:param int cols: Value returned for ``writer.get_extra_info('cols')``
until negotiated by NAWS :rfc:`1572`. Default value is 80 columns.
:param int rows: Value returned for ``writer.get_extra_info('rows')``
until negotiated by NAWS :rfc:`1572`. Default value is 25 rows.
:param int timeout: Causes clients to disconnect if idle for this duration,
in seconds. This ensures resources are freed on busy servers. When
explicitly set to ``False``, clients will not be disconnected for
timeout. Default value is 300 seconds (5 minutes).
:param float connect_maxwait: If the remote end is not complaint, or
otherwise confused by our demands, the shell continues anyway after the
greater of this value has elapsed. A client that is not answering
option negotiation will delay the start of the shell by this amount.
:param int limit: The buffer limit for the reader stream.
:return asyncio.Server: The return value is the same as
:meth:`asyncio.loop.create_server`, An object which can be used
to stop the service.
This function is a :func:`~asyncio.coroutine`.
"""
protocol_factory = protocol_factory or TelnetServer
loop = kwds.get('loop', asyncio.get_event_loop())
return (yield from loop.create_server(
lambda: protocol_factory(**kwds), host, port))
@asyncio.coroutine
def _sigterm_handler(server, log):
log.info('SIGTERM received, closing server.')
# This signals the completion of the server.wait_closed() Future,
# allowing the main() function to complete.
server.close()
def parse_server_args():
parser = argparse.ArgumentParser(
description="Telnet protocol server",
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('host', nargs='?', default=CONFIG.host,
help='bind address')
parser.add_argument('port', nargs='?', type=int, default=CONFIG.port,
help='bind port')
parser.add_argument('--loglevel', default=CONFIG.loglevel,
help='level name')
parser.add_argument('--logfile', default=CONFIG.logfile,
help='filepath')
parser.add_argument('--logfmt', default=CONFIG.logfmt,
help='log format')
parser.add_argument('--shell', default=CONFIG.shell,
type=accessories.function_lookup,
help='module.function_name')
parser.add_argument('--encoding', default=CONFIG.encoding,
help='encoding name')
parser.add_argument('--force-binary', action='store_true',
default=CONFIG.force_binary,
help='force binary transmission')
parser.add_argument('--timeout', default=CONFIG.timeout,
help='idle disconnect (0 disables)')
parser.add_argument('--connect-maxwait', type=float,
default=CONFIG.connect_maxwait,
help='timeout for pending negotiation')
return vars(parser.parse_args())
[docs]def run_server(host=CONFIG.host, port=CONFIG.port, loglevel=CONFIG.loglevel,
logfile=CONFIG.logfile, logfmt=CONFIG.logfmt,
shell=CONFIG.shell, encoding=CONFIG.encoding,
force_binary=CONFIG.force_binary, timeout=CONFIG.timeout,
connect_maxwait=CONFIG.connect_maxwait):
"""
Program entry point for server daemon.
This function configures a logger and creates a telnet server for the
given keyword arguments, serving forever, completing only upon receipt of
SIGTERM.
"""
log = accessories.make_logger(
name='telnetlib3.server', loglevel=loglevel,
logfile=logfile, logfmt=logfmt)
# log all function arguments.
_locals = locals()
_cfg_mapping = ', '.join(('{0}={{{0}}}'.format(field)
for field in CONFIG._fields)).format(**_locals)
log.debug('Server configuration: {}'.format(_cfg_mapping))
loop = asyncio.get_event_loop()
# bind
server = loop.run_until_complete(
create_server(host, port, shell=shell, encoding=encoding,
force_binary=force_binary, timeout=timeout,
connect_maxwait=connect_maxwait))
# SIGTERM cases server to gracefully stop
loop.add_signal_handler(signal.SIGTERM, asyncio.ensure_future,
_sigterm_handler(server, log))
log.info('Server ready on {0}:{1}'.format(host, port))
# await completion of server stop
try:
loop.run_until_complete(server.wait_closed())
finally:
# remove signal handler on stop
loop.remove_signal_handler(signal.SIGTERM)
log.info('Server stop.')
def main():
"""Command-line 'telnetlib3-server' entry point, via setuptools."""
return run_server(**parse_server_args())
if __name__ == '__main__':
exit(main())