# Source code for telnetlib3.slc

"""Special Line Character support for Telnet Linemode Option (:rfc:`1184`)."""

from __future__ import annotations

# std imports
import copy
from typing import Any, Dict, List, Tuple, Union, Callable, Optional

# local
from .telopt import theNULL
from .accessories import eightbits, name_unicode

# Names exported by ``from telnetlib3.slc import *``.  ``theNULL`` is imported
# from ``.telopt`` above and re-exported here.
# NOTE(review): several constants defined below (for example ``SLC_BRK``,
# ``SLC_EOR``, ``SLC_XOFF``, ``SLC_FLUSHIN``, ``SLC_FLUSHOUT`` and
# ``LMODE_MODE_LOCAL``) are not listed -- confirm whether that is intentional.
__all__ = (
    "BSD_SLC_TAB",
    "Forwardmask",
    "generate_forwardmask",
    "generate_slctab",
    "Linemode",
    "LMODE_FORWARDMASK",
    "LMODE_MODE",
    "LMODE_MODE_EDIT",
    "LMODE_MODE_REMOTE",
    "LMODE_SLC",
    "name_slc_command",
    "NSLC",
    "SLC",
    "SLC_ABORT",
    "SLC_ACK",
    "SLC_AO",
    "SLC_AYT",
    "SLC_CANTCHANGE",
    "SLC_DEFAULT",
    "SLC_EC",
    "SLC_EL",
    "SLC_EOF",
    "SLC_EW",
    "SLC_IP",
    "SLC_LNEXT",
    "SLC_nosupport",
    "SLC_NOSUPPORT",
    "SLC_RP",
    "SLC_SUSP",
    "SLC_SYNCH",
    "SLC_VARIABLE",
    "SLC_XON",
    "snoop",
    "theNULL",
)

# SLC support levels, each a single byte: 0, 1, 2, 3
SLC_NOSUPPORT = b"\x00"
SLC_CANTCHANGE = b"\x01"
SLC_VARIABLE = b"\x02"
SLC_DEFAULT = b"\x03"

# SLC flag bits, each a single byte: 32, 64, 128
SLC_FLUSHOUT = b"\x20"
SLC_FLUSHIN = b"\x40"
SLC_ACK = b"\x80"

#: Bits of an SLC mask byte that hold the support level (values 0 through 3)
SLC_LEVELBITS = 0b11

#: Number of SLC function codes; they are numbered 1 through NSLC
NSLC = 30

# One single-byte constant per SLC function code, assigned in numeric order.
(
    SLC_SYNCH,  # 1
    SLC_BRK,  # 2
    SLC_IP,  # 3
    SLC_AO,  # 4
    SLC_AYT,  # 5
    SLC_EOR,  # 6
    SLC_ABORT,  # 7
    SLC_EOF,  # 8
    SLC_SUSP,  # 9
    SLC_EC,  # 10
    SLC_EL,  # 11
    SLC_EW,  # 12
    SLC_RP,  # 13
    SLC_LNEXT,  # 14
    SLC_XON,  # 15
    SLC_XOFF,  # 16
    SLC_FORW1,  # 17
    SLC_FORW2,  # 18
    SLC_MCL,  # 19
    SLC_MCR,  # 20
    SLC_MCWL,  # 21
    SLC_MCWR,  # 22
    SLC_MCBOL,  # 23
    SLC_MCEOL,  # 24
    SLC_INSRT,  # 25
    SLC_OVER,  # 26
    SLC_ECR,  # 27
    SLC_EWR,  # 28
    SLC_EBOL,  # 29
    SLC_EEOL,  # 30
) = [bytes((code,)) for code in range(1, NSLC + 1)]

# LINEMODE suboption codes: 1, 2, 3
LMODE_MODE = b"\x01"
LMODE_FORWARDMASK = b"\x02"
LMODE_SLC = b"\x03"

# LINEMODE MODE mask values: 0, 1, 2
LMODE_MODE_REMOTE = b"\x00"
LMODE_MODE_LOCAL = b"\x01"
LMODE_MODE_TRAPSIG = b"\x02"

# Further LINEMODE MODE mask bits: 4, 8, 16
LMODE_MODE_ACK = b"\x04"
LMODE_MODE_SOFT_TAB = b"\x08"
LMODE_MODE_LIT_ECHO = b"\x10"

#: RFC 1184's name for LMODE_MODE_LOCAL (EDIT bit)
LMODE_MODE_EDIT = LMODE_MODE_LOCAL


class SLC:
    """A Special Linemode Character: a support mask paired with a key value."""

    def __init__(self, mask: bytes = SLC_DEFAULT, value: bytes = theNULL) -> None:
        """
        Create a definition from a support ``mask`` and keyboard byte ``value``.

        :param mask: single byte holding the support level (the bits selected
            by ``SLC_LEVELBITS``) together with any of the ``SLC_ACK``,
            ``SLC_FLUSHIN`` and ``SLC_FLUSHOUT`` flag bits.
        :param value: single byte holding the keyboard character for this
            function (may be negotiated by client).
        """
        # Constructed without arguments, the definition carries ``SLC_DEFAULT``
        # and ``theNULL``: willing to support the function, but with no value
        # bound yet.  The client must negotiate a value before the function
        # can be matched against input.
        self.mask = mask
        self.val = value

    def _mask_bits(self, flag: bytes) -> int:
        """Return the bits of ``flag`` that are also set in the mask, as int."""
        return ord(self.mask) & ord(flag)

    @property
    def level(self) -> bytes:
        """Support level of the mask, as a single byte."""
        return bytes([ord(self.mask) & SLC_LEVELBITS])

    @property
    def nosupport(self) -> bool:
        """Whether the support level is ``SLC_NOSUPPORT``."""
        return self.level == SLC_NOSUPPORT

    @property
    def cantchange(self) -> bool:
        """Whether the support level is ``SLC_CANTCHANGE``."""
        return self.level == SLC_CANTCHANGE

    @property
    def variable(self) -> bool:
        """Whether the support level is ``SLC_VARIABLE``."""
        return self.level == SLC_VARIABLE

    @property
    def default(self) -> bool:
        """Whether the support level is ``SLC_DEFAULT``."""
        return self.level == SLC_DEFAULT

    @property
    def ack(self) -> int:
        """Non-zero (the bit's integer value) if ``SLC_ACK`` is set, else 0."""
        return self._mask_bits(SLC_ACK)

    @property
    def flushin(self) -> int:
        """Non-zero (the bit's integer value) if ``SLC_FLUSHIN`` is set, else 0."""
        return self._mask_bits(SLC_FLUSHIN)

    @property
    def flushout(self) -> int:
        """Non-zero (the bit's integer value) if ``SLC_FLUSHOUT`` is set, else 0."""
        return self._mask_bits(SLC_FLUSHOUT)

    def set_value(self, value: bytes) -> None:
        """Replace the keyboard byte with ``value``."""
        self.val = value

    def set_mask(self, mask: bytes) -> None:
        """Replace the whole mask byte with ``mask``."""
        self.mask = mask

    def set_flag(self, flag: bytes) -> None:
        """Turn on the bits of ``flag`` in the mask, keeping those already set."""
        self.mask = bytes([ord(self.mask) | ord(flag)])

    def __str__(self) -> str:
        """SLC definition as string '(value, flag(|s))'."""
        candidates = (
            "nosupport",
            "variable",
            "default",
            "ack",
            "flushin",
            "flushout",
            "cantchange",
        )
        active = [name for name in candidates if getattr(self, name)]
        if self.val == _POSIX_VDISABLE:
            shown = "(DISABLED:\\xff)"
        else:
            shown = name_unicode(chr(self.val[0]))
        return f"({shown}, {'|'.join(active)})"
class SLC_nosupport(SLC):
    """An SLC definition declaring that we are unwilling to support the function."""

    def __init__(self) -> None:
        """Fix the mask to ``SLC_NOSUPPORT`` and the value to ``_POSIX_VDISABLE``."""
        super().__init__(SLC_NOSUPPORT, _POSIX_VDISABLE)
#: Mask: value may be changed; flushes input and output
_SLC_VARIABLE_FIO = bytes([SLC_VARIABLE[0] | SLC_FLUSHIN[0] | SLC_FLUSHOUT[0]])
#: Mask: value may be changed; flushes input
_SLC_VARIABLE_FI = bytes([SLC_VARIABLE[0] | SLC_FLUSHIN[0]])
#: Mask: value may be changed; flushes output
_SLC_VARIABLE_FO = bytes([SLC_VARIABLE[0] | SLC_FLUSHOUT[0]])
#: Value byte marking an SLC function as not supported (disabled)
_POSIX_VDISABLE = b"\xff"

#: This SLC tab when sent to a BSD client warrants no reply; their
#  tabs match exactly. These values are found in ttydefaults.h of
#  termios family of functions.
BSD_SLC_TAB = {
    # unsupported; these cause all buffered characters to be sent immediately
    SLC_FORW1: SLC_nosupport(),
    SLC_FORW2: SLC_nosupport(),
    SLC_EOF: SLC(SLC_VARIABLE, b"\x04"),  # ^D VEOF
    SLC_EC: SLC(SLC_VARIABLE, b"\x7f"),  # DEL VERASE
    SLC_EL: SLC(SLC_VARIABLE, b"\x15"),  # ^U VKILL
    SLC_IP: SLC(_SLC_VARIABLE_FIO, b"\x03"),  # ^C VINTR
    SLC_ABORT: SLC(_SLC_VARIABLE_FIO, b"\x1c"),  # ^\ VQUIT
    SLC_XON: SLC(SLC_VARIABLE, b"\x11"),  # ^Q VSTART
    SLC_XOFF: SLC(SLC_VARIABLE, b"\x13"),  # ^S VSTOP
    SLC_EW: SLC(SLC_VARIABLE, b"\x17"),  # ^W VWERASE
    SLC_RP: SLC(SLC_VARIABLE, b"\x12"),  # ^R VREPRINT
    SLC_LNEXT: SLC(SLC_VARIABLE, b"\x16"),  # ^V VLNEXT
    SLC_AO: SLC(_SLC_VARIABLE_FO, b"\x0f"),  # ^O VDISCARD
    SLC_SUSP: SLC(_SLC_VARIABLE_FI, b"\x1a"),  # ^Z VSUSP
    SLC_AYT: SLC(SLC_VARIABLE, b"\x14"),  # ^T VSTATUS
    # break, sync and end-of-record are supported but have no default value
    SLC_BRK: SLC(),
    SLC_SYNCH: SLC(),
    SLC_EOR: SLC(),
}
def generate_slctab(tabset: Optional[Dict[bytes, SLC]] = None) -> Dict[bytes, SLC]:
    """
    Returns full 'SLC Tab' for definitions found using ``tabset``.

    Functions not listed in ``tabset`` are set as SLC_NOSUPPORT.

    :param tabset: mapping of SLC function byte to :class:`SLC` definition;
        ``BSD_SLC_TAB`` is used when omitted or None.
    :returns: a new dictionary with one entry for every function byte from
        1 through ``NSLC``.  Every value is an object owned by the returned
        table: definitions taken from ``tabset`` are copied, so that calling
        ``set_value``, ``set_mask`` or ``set_flag`` on an entry cannot alter
        ``tabset`` (or the shared module-level ``BSD_SLC_TAB``).
    """
    if tabset is None:
        tabset = BSD_SLC_TAB

    slctab: Dict[bytes, SLC] = {}
    for code in range(1, NSLC + 1):
        func = bytes([code])
        definition = tabset.get(func)
        if definition is None:
            slctab[func] = SLC_nosupport()
        else:
            # SLC objects are mutable; a shallow copy (mask and value are
            # immutable bytes) keeps the subclass while breaking the alias.
            slctab[func] = copy.copy(definition)
    return slctab
def generate_forwardmask(
    binary_mode: bool, tabset: Dict[bytes, SLC], ack: bool = False
) -> "Forwardmask":
    """
    Generate a Forwardmask instance.

    Generate a 32-byte (``binary_mode`` is True) or 16-byte (False) Forwardmask
    instance appropriate for the specified ``tabset``.

    A Forwardmask is formed by a bitmask of all 256 possible 8-bit keyboard
    ascii input, or, when not 'outbinary', a 16-byte 7-bit representation of
    each value, and whether they should be "forwarded" by the client on the
    transport stream.

    :param binary_mode: when True the mask covers characters 0-255, otherwise
        characters 0-127.
    :param tabset: mapping of SLC function byte to :class:`SLC` definition; a
        character's bit is set when :func:`snoop` matches it to a definition
        whose level is not ``SLC_NOSUPPORT``.
    :param ack: passed through unchanged to :class:`Forwardmask`.
    :returns: Forwardmask whose byte ``n`` describes characters ``8 * n``
        through ``8 * n + 7``, most significant bit first -- the layout that
        ``Forwardmask.__contains__`` reads.
    """
    num_bytes = 32 if binary_mode else 16
    mask = bytearray(num_bytes)
    # Visit every character the mask can describe, including the final byte
    # (characters 120-127 in 7-bit mode).
    for char in range(num_bytes * 8):
        # snoop() returns (callback, func_byte, slc_definition).  No callbacks
        # are registered for this scan, so the callback is always None and a
        # match has to be recognised by the function byte instead.
        _, slc_func, slc_def = snoop(bytes([char]), tabset, {})
        if slc_func is None or slc_def is None or slc_def.nosupport:
            continue
        # set bit for this character, it is a supported slc char
        mask[char // 8] |= 0x80 >> (char % 8)
    return Forwardmask(bytes(mask), ack)
def snoop(
    byte: bytes, slctab: Dict[bytes, SLC], slc_callbacks: Dict[bytes, Callable[..., Any]]
) -> Tuple[Optional[Callable[..., Any]], Optional[bytes], Optional[SLC]]:
    """
    Look up the SLC function, if any, whose value equals ``byte``.

    :param byte: single input byte to search for.
    :param slctab: mapping of SLC function byte to :class:`SLC` definition,
        searched in iteration order; the first match wins.
    :param slc_callbacks: mapping of SLC function byte to callback.
    :returns: ``(callback, func_byte, slc_definition)`` for the first match,
        where ``callback`` is None when ``slc_callbacks`` has no entry for the
        function.  ``(None, None, None)`` when nothing matches.
    """
    for func_byte, definition in slctab.items():
        if definition.val != byte:
            continue
        if definition.val == theNULL:
            # a definition holding theNULL has no value bound; never a match
            continue
        return (slc_callbacks.get(func_byte), func_byte, definition)
    return (None, None, None)
class Linemode:
    r"""
    State of the LINEMODE MODE negotiation, held as a single mask byte.

    When the ``LMODE_MODE_LOCAL`` bit is set, line editing is performed on the
    client side.  When it is clear, as in the default mask ``\x00``, editing is
    performed on the remote side.
    """

    def __init__(self, mask: bytes = b"\x00") -> None:
        """
        Store the mode ``mask``.

        :param mask: single byte whose bits are read by the properties of this
            class: ``LMODE_MODE_LOCAL``, ``LMODE_MODE_TRAPSIG``,
            ``LMODE_MODE_ACK``, ``LMODE_MODE_SOFT_TAB`` and
            ``LMODE_MODE_LIT_ECHO``.
        """
        self.mask = mask

    def _is_set(self, flag: bytes) -> bool:
        """Whether any bit of ``flag`` is set in the mask."""
        return bool(ord(self.mask) & ord(flag))

    def __eq__(self, other: object) -> bool:
        """Equality with another Linemode, disregarding ``LMODE_MODE_ACK``."""
        if not isinstance(other, Linemode):
            return NotImplemented
        # Force the acknowledge bit on in both operands so that it can never
        # be the reason two modes differ.
        ack_bit = ord(LMODE_MODE_ACK)
        mine = ord(self.mask) | ack_bit
        theirs = ord(other.mask) | ack_bit
        return mine == theirs

    @property
    def local(self) -> bool:
        """True if the ``LMODE_MODE_LOCAL`` bit is set."""
        return self._is_set(LMODE_MODE_LOCAL)

    @property
    def edit(self) -> bool:
        """RFC 1184's name for :attr:`local`: the client edits lines locally."""
        return self.local

    @property
    def remote(self) -> bool:
        """True if the ``LMODE_MODE_LOCAL`` bit is not set."""
        return not self.local

    @property
    def trapsig(self) -> bool:
        """True if the ``LMODE_MODE_TRAPSIG`` bit is set (client traps signals)."""
        return self._is_set(LMODE_MODE_TRAPSIG)

    @property
    def ack(self) -> bool:
        """True if the ``LMODE_MODE_ACK`` bit is set (mode was acknowledged)."""
        return self._is_set(LMODE_MODE_ACK)

    @property
    def soft_tab(self) -> bool:
        r"""True if ``LMODE_MODE_SOFT_TAB`` is set (client expands tab, ``\x09``)."""
        return self._is_set(LMODE_MODE_SOFT_TAB)

    @property
    def lit_echo(self) -> bool:
        """True if ``LMODE_MODE_LIT_ECHO`` is set (non-printables shown as-is)."""
        return self._is_set(LMODE_MODE_LIT_ECHO)

    def __str__(self) -> str:
        """Either ``'remote'`` or ``'local'``, for debugging."""
        if self.remote:
            return "remote"
        return "local"

    def __repr__(self) -> str:
        """The raw mask followed by the value of every mode property."""
        names = ("lit_echo", "soft_tab", "ack", "trapsig", "remote", "local")
        pairs = [f"{name}:{getattr(self, name)}" for name in names]
        return f"<{self.mask!r}: {', '.join(pairs)}>"
class Forwardmask:
    """Forwardmask object using the bytemask value received by server."""

    def __init__(self, value: Union[bytes, bytearray], ack: bool = False) -> None:
        """
        Initialize Forwardmask with the given value.

        :param value: Bytemask ``value`` received by server after ``IAC SB LINEMODE DO
            FORWARDMASK``. It must be a bytearray of length 16 or 32.
        :param ack: stored unchanged as the ``ack`` attribute.
        """
        self.value = value
        self.ack = ack

    def description_table(self) -> List[str]:
        """
        Returns list of strings describing obj as a tabular ASCII map.

        Each non-zero mask byte produces one row with its index, its bits and
        the names of the characters it contains.  Runs of zero bytes are
        abbreviated with a ``(...)`` marker row; the final byte is always
        written out.
        """
        result: List[str] = []
        mrk_cont = "(...)"

        def continuing() -> bool:
            # the previous row is already the abbreviation marker
            return bool(result and result[-1] == mrk_cont)

        def is_last(mask: int) -> bool:
            return mask == len(self.value) - 1

        def same_as_last(row: str) -> bool:
            # compares only the trailing bit-string column of the two rows
            return bool(result and result[-1].endswith(row.split()[-1]))

        for mask, byte in enumerate(self.value):
            if byte == 0:
                if continuing() and not is_last(mask):
                    continue
                row = f"[{mask:2d}] {eightbits(0)}"
                if not same_as_last(row) or is_last(mask):
                    result.append(row)
                else:
                    result.append(mrk_cont)
            else:
                start = mask * 8
                last = start + 7
                characters = ", ".join(
                    [name_unicode(chr(char)) for char in range(start, last + 1) if char in self]
                )
                result.append(f"[{mask:2d}] {eightbits(byte)} {characters}")
        return result

    def __str__(self) -> str:
        """Returns single string of binary 0 and 1 describing obj."""
        bits = "".join(value for (_, value) in [eightbits(byte).split("b") for byte in self.value])
        return f"0b{bits}"

    def __contains__(self, number: int) -> bool:
        """
        Whether forwardmask contains keycode ``number``.

        Byte ``number // 8`` of the mask holds the bit for ``number``, most
        significant bit first.  Keycodes the mask cannot describe -- negative,
        or at or beyond ``8 * len(value)`` such as 128 and above on a 16-byte
        mask -- are reported as not contained rather than raising IndexError
        or wrapping around to the end of the mask.
        """
        if not 0 <= number < len(self.value) * 8:
            return False
        index, offset = divmod(number, 8)
        return bool(self.value[index] & (0x80 >> offset))
#: Maps each SLC function byte to the name of the module global that holds it
_DEBUG_SLC_OPTS = {
    globals()[slc_name]: slc_name
    for slc_name in (
        "SLC_SYNCH",
        "SLC_BRK",
        "SLC_IP",
        "SLC_AO",
        "SLC_AYT",
        "SLC_EOR",
        "SLC_ABORT",
        "SLC_EOF",
        "SLC_SUSP",
        "SLC_EC",
        "SLC_EL",
        "SLC_EW",
        "SLC_RP",
        "SLC_LNEXT",
        "SLC_XON",
        "SLC_XOFF",
        "SLC_FORW1",
        "SLC_FORW2",
        "SLC_MCL",
        "SLC_MCR",
        "SLC_MCWL",
        "SLC_MCWR",
        "SLC_MCBOL",
        "SLC_MCEOL",
        "SLC_INSRT",
        "SLC_OVER",
        "SLC_ECR",
        "SLC_EWR",
        "SLC_EBOL",
        "SLC_EEOL",
    )
    # names that are not defined are skipped rather than raising
    if slc_name in globals()
}
def name_slc_command(byte: bytes) -> str:
    """
    Return the mnemonic of the SLC function ``byte``.

    :param byte: SLC function byte to look up in ``_DEBUG_SLC_OPTS``.
    :returns: the name of the matching global, such as ``'SLC_IP'``, or
        ``repr(byte)`` when the byte is not a known SLC function.
    """
    mnemonic = _DEBUG_SLC_OPTS.get(byte)
    if mnemonic is None:
        return repr(byte)
    return mnemonic