Source code for telnetlib3.server_shell

import types

CR, LF, NUL = "\r\n\x00"
from . import slc
from . import telopt
from . import accessories

__all__ = ("telnet_server_shell",)


[docs]async def telnet_server_shell(reader, writer): """ A default telnet shell, appropriate for use with telnetlib3.create_server. This shell provides a very simple REPL, allowing introspection and state toggling of the connected client session. """ writer.write("Ready." + CR + LF) linereader = readline(reader, writer) linereader.send(None) command = None while True: if command: writer.write(CR + LF) writer.write("tel:sh> ") command = None while command is None: await writer.drain() inp = await reader.read(1) if not inp: return command = linereader.send(inp) writer.write(CR + LF) if command == "quit": writer.write("Goodbye." + CR + LF) break elif command == "help": writer.write("quit, writer, slc, toggle [option|all], reader, proto, dump") elif command == "writer": writer.write(repr(writer)) elif command == "reader": writer.write(repr(reader)) elif command == "proto": writer.write(repr(writer.protocol)) elif command == "version": writer.write(accessories.get_version()) elif command == "slc": writer.write(get_slcdata(writer)) elif command.startswith("toggle"): option = command[len("toggle ") :] or None writer.write(do_toggle(writer, option)) elif command.startswith("dump"): # dump [kb] [ms_delay] [drain|nodrain] try: kb_limit = int(command.split()[1]) except (ValueError, IndexError): kb_limit = 1000 try: ms_delay = int(command.split()[2]) * 1000 except (ValueError, IndexError): ms_delay = 0 try: drain = command.split()[3] == "drain" except IndexError: drain = False for lineout in character_dump(kb_limit): writer.write(lineout) if ms_delay: await asyncio.sleep(ms_delay) if drain: await writer.drain() elif command: writer.write("no such command.") writer.close()
def character_dump(kb_limit): num_bytes = 0 while (num_bytes) < (kb_limit * 1024): for char in ("/", "\\"): lineout = (char * 80) + "\033[1G" yield lineout num_bytes += len(lineout) yield ("\033[1G" + "wrote " + str(num_bytes) + " bytes") @types.coroutine def readline(reader, writer): """ A very crude readline coroutine interface. """ command, inp, last_inp = "", "", "" inp = yield None while True: if inp in (LF, NUL) and last_inp == CR: last_inp = inp inp = yield None elif inp in (CR, LF): # first CR or LF yields command last_inp = inp inp = yield command command = "" elif inp in ("\b", "\x7f"): # backspace over input if command: command = command[:-1] writer.echo("\b \b") last_inp = inp inp = yield None else: # buffer and echo input command += inp writer.echo(inp) last_inp = inp inp = yield None def get_slcdata(writer): """Display Special Line Editing (SLC) characters.""" _slcs = sorted( [ "{:>15}: {}".format(slc.name_slc_command(slc_func), slc_def) for (slc_func, slc_def) in sorted(writer.slctab.items()) if not (slc_def.nosupport or slc_def.val == slc.theNULL) ] ) _unset = sorted( [ slc.name_slc_command(slc_func) for (slc_func, slc_def) in sorted(writer.slctab.items()) if slc_def.val == slc.theNULL ] ) _nosupport = sorted( [ slc.name_slc_command(slc_func) for (slc_func, slc_def) in sorted(writer.slctab.items()) if slc_def.nosupport ] ) return ( "Special Line Characters:\r\n" + "\r\n".join(_slcs) + "\r\nUnset by client: " + ", ".join(_unset) + "\r\nNot supported by server: " + ", ".join(_nosupport) ) def do_toggle(writer, option): """Display or toggle telnet session parameters.""" tbl_opt = { "echo": writer.local_option.enabled(telopt.ECHO), "goahead": not writer.local_option.enabled(telopt.SGA), "outbinary": writer.outbinary, "inbinary": writer.inbinary, "binary": writer.outbinary and writer.inbinary, "xon-any": writer.xon_any, "lflow": writer.lflow, } if not option: return "\r\n".join( "{0} {1}".format(opt, "ON" if enabled else "off") for opt, enabled in sorted(tbl_opt.items()) ) msgs = [] if option in ("echo", "all"): cmd = telopt.WONT if tbl_opt["echo"] else telopt.WILL writer.iac(cmd, telopt.ECHO) msgs.append("{} echo.".format(telopt.name_command(cmd).lower())) if option in ("goahead", "all"): cmd = telopt.WILL if tbl_opt["goahead"] else telopt.WONT writer.iac(cmd, telopt.SGA) msgs.append("{} suppress go-ahead.".format(telopt.name_command(cmd).lower())) if option in ("outbinary", "binary", "all"): cmd = telopt.WONT if tbl_opt["outbinary"] else telopt.WILL writer.iac(cmd, telopt.BINARY) msgs.append("{} outbinary.".format(telopt.name_command(cmd).lower())) if option in ("inbinary", "binary", "all"): cmd = telopt.DONT if tbl_opt["inbinary"] else telopt.DO writer.iac(cmd, telopt.BINARY) msgs.append("{} inbinary.".format(telopt.name_command(cmd).lower())) if option in ("xon-any", "all"): writer.xon_any = not tbl_opt["xon-any"] writer.send_lineflow_mode() msgs.append("xon-any {}abled.".format("en" if writer.xon_any else "dis")) if option in ("lflow", "all"): writer.lflow = not tbl_opt["lflow"] writer.send_lineflow_mode() msgs.append("lineflow {}abled.".format("en" if writer.lflow else "dis")) if option not in tbl_opt and option != "all": msgs.append("toggle: not an option.") return "\r\n".join(msgs)