Source code for telnetlib3.server_shell

import asyncio

CR, LF, NUL = '\r\n\x00'
from . import slc
from . import telopt
from . import accessories

__all__ = ('telnet_server_shell',)


[docs]@asyncio.coroutine def telnet_server_shell(reader, writer): """ A default telnet shell, appropriate for use with telnetlib3.create_server. This shell provides a very simple REPL, allowing introspection and state toggling of the connected client session. This function is a :func:`~asyncio.coroutine`. """ writer.write("Ready." + CR + LF) linereader = readline(reader, writer) linereader.send(None) command = None while True: if command: writer.write(CR + LF) writer.write('tel:sh> ') command = None while command is None: # TODO: use reader.readline() inp = yield from reader.read(1) if not inp: return command = linereader.send(inp) writer.write(CR + LF) if command == 'quit': writer.write('Goodbye.' + CR + LF) break elif command == 'help': writer.write('quit, writer, slc, toggle [option|all], ' 'reader, proto') elif command == 'writer': writer.write(repr(writer)) elif command == 'reader': writer.write(repr(reader)) elif command == 'proto': writer.write(repr(writer.protocol)) elif command == 'version': writer.write(accessories.get_version()) elif command == 'slc': writer.write(get_slcdata(writer)) elif command.startswith('toggle'): option = command[len('toggle '):] or None writer.write(do_toggle(writer, option)) elif command: writer.write('no such command.') writer.close()
@asyncio.coroutine def readline(reader, writer): """ A very crude readline coroutine interface. This function is a :func:`~asyncio.coroutine`. """ command, inp, last_inp = '', '', '' inp = yield None while True: if inp in (LF, NUL) and last_inp == CR: last_inp = inp inp = yield None elif inp in (CR, LF): # first CR or LF yields command last_inp = inp inp = yield command command = '' elif inp in ('\b', '\x7f'): # backspace over input if command: command = command[:-1] writer.echo('\b \b') last_inp = inp inp = yield None else: # buffer and echo input command += inp writer.echo(inp) last_inp = inp inp = yield None def get_slcdata(writer): """Display Special Line Editing (SLC) characters.""" _slcs = sorted([ '{:>15}: {}'.format(slc.name_slc_command(slc_func), slc_def) for (slc_func, slc_def) in sorted(writer.slctab.items()) if not (slc_def.nosupport or slc_def.val == slc.theNULL)]) _unset = sorted([ slc.name_slc_command(slc_func) for (slc_func, slc_def) in sorted(writer.slctab.items()) if slc_def.val == slc.theNULL]) _nosupport = sorted([ slc.name_slc_command(slc_func) for (slc_func, slc_def) in sorted( writer.slctab.items()) if slc_def.nosupport]) return ('Special Line Characters:\r\n' + '\r\n'.join(_slcs) + '\r\nUnset by client: ' + ', '.join(_unset) + '\r\nNot supported by server: ' + ', '.join(_nosupport)) def do_toggle(writer, option): """Display or toggle telnet session parameters.""" tbl_opt = { 'echo': writer.local_option.enabled(telopt.ECHO), 'goahead': not writer.local_option.enabled(telopt.SGA), 'outbinary': writer.outbinary, 'inbinary': writer.inbinary, 'binary': writer.outbinary and writer.inbinary, 'xon-any': writer.xon_any, 'lflow': writer.lflow, } if not option: return ('\r\n'.join('{0} {1}'.format( opt, 'ON' if enabled else 'off') for opt, enabled in sorted(tbl_opt.items()))) msgs = [] if option in ('echo', 'all'): cmd = (telopt.WONT if tbl_opt['echo'] else telopt.WILL) writer.iac(cmd, telopt.ECHO) msgs.append('{} echo.'.format( telopt.name_command(cmd).lower())) if option in ('goahead', 'all'): cmd = (telopt.WILL if tbl_opt['goahead'] else telopt.WONT) writer.iac(cmd, telopt.SGA) msgs.append('{} suppress go-ahead.'.format( telopt.name_command(cmd).lower())) if option in ('outbinary', 'binary', 'all'): cmd = (telopt.WONT if tbl_opt['outbinary'] else telopt.WILL) writer.iac(cmd, telopt.BINARY) msgs.append('{} outbinary.'.format( telopt.name_command(cmd).lower())) if option in ('inbinary', 'binary', 'all'): cmd = (telopt.DONT if tbl_opt['inbinary'] else telopt.DO) writer.iac(cmd, telopt.BINARY) msgs.append('{} inbinary.'.format( telopt.name_command(cmd).lower())) if option in ('xon-any', 'all'): writer.xon_any = not tbl_opt['xon-any'] writer.send_lineflow_mode() msgs.append('xon-any {}abled.'.format( 'en' if writer.xon_any else 'dis')) if option in ('lflow', 'all'): writer.lflow = not tbl_opt['lflow'] writer.send_lineflow_mode() msgs.append('lineflow {}abled.'.format( 'en' if writer.lflow else 'dis')) if option not in tbl_opt and option != 'all': msgs.append('toggle: not an option.') return '\r\n'.join(msgs)