Source code for telnetlib3.slc

"""
Special Line Character support for Telnet Linemode Option (:rfc:`1184`).
"""
from .accessories import eightbits, name_unicode
from .telopt import theNULL

__all__ = ('SLC', 'SLC_AYT', 'NSLC', 'BSD_SLC_TAB', 'generate_slctab',
           'Linemode', 'LMODE_MODE_REMOTE', 'SLC_SYNCH', 'SLC_IP', 'SLC_AYT',
           'SLC_ABORT', 'SLC_SUSP', 'SLC_EL', 'SLC_RP', 'SLC_XON', 'snoop',
           'generate_forwardmask', 'Forwardmask', 'name_slc_command',
           'LMODE_FORWARDMASK', 'LMODE_MODE', 'NSLC', 'LMODE_MODE',
           'LMODE_SLC', 'SLC', 'SLC_nosupport', 'SLC_DEFAULT', 'SLC_VARIABLE',
           'SLC_NOSUPPORT', 'SLC_ACK', 'SLC_CANTCHANGE', 'SLC_LNEXT', 'SLC_EC',
           'SLC_EW', 'SLC_EOF', 'SLC_AO',)

(SLC_NOSUPPORT, SLC_CANTCHANGE, SLC_VARIABLE, SLC_DEFAULT) = (
    bytes([const]) for const in range(4)) # 0, 1, 2, 3
(SLC_FLUSHOUT, SLC_FLUSHIN, SLC_ACK) = (
    bytes([2**const]) for const in range(5, 8))  # 32, 64, 128

SLC_LEVELBITS = 0x03
NSLC = 30
(SLC_SYNCH, SLC_BRK, SLC_IP, SLC_AO, SLC_AYT, SLC_EOR, SLC_ABORT, SLC_EOF,
    SLC_SUSP, SLC_EC, SLC_EL, SLC_EW, SLC_RP, SLC_LNEXT, SLC_XON, SLC_XOFF,
    SLC_FORW1, SLC_FORW2, SLC_MCL, SLC_MCR, SLC_MCWL, SLC_MCWR, SLC_MCBOL,
    SLC_MCEOL, SLC_INSRT, SLC_OVER, SLC_ECR, SLC_EWR, SLC_EBOL, SLC_EEOL
 ) = (bytes([const]) for const in range(1, NSLC + 1))

(LMODE_MODE, LMODE_FORWARDMASK, LMODE_SLC) = (
    bytes([const]) for const in range(1, 4))
(LMODE_MODE_REMOTE, LMODE_MODE_LOCAL, LMODE_MODE_TRAPSIG) = (
    bytes([const]) for const in range(3))
(LMODE_MODE_ACK, LMODE_MODE_SOFT_TAB, LMODE_MODE_LIT_ECHO) = (
    bytes([4]), bytes([8]), bytes([16]))


class SLC(object):
    def __init__(self, mask=SLC_DEFAULT, value=theNULL):
        """
        Defines the willingness to support a Special Linemode Character.

        Defined by its SLC support level, ``mask`` and default keyboard
        ASCII byte ``value`` (may be negotiated by client).
        """
        #   The default byte mask ``SLC_DEFAULT`` and value ``b'\x00'`` infer
        #   our willingness to support the option, but with no default value.
        #   The value must be negotiated by client to activate the callback.
        assert type(mask) is bytes and type(value) is bytes, (mask, value)
        assert len(mask) == 1 and len(value) == 1, (mask, value)
        self.mask = mask
        self.val = value

    @property
    def level(self):
        """ Returns SLC level of support.  """
        return bytes([ord(self.mask) & SLC_LEVELBITS])

    @property
    def nosupport(self):
        """ Returns True if SLC level is SLC_NOSUPPORT. """
        return self.level == SLC_NOSUPPORT

    @property
    def cantchange(self):
        """ Returns True if SLC level is SLC_CANTCHANGE. """
        return self.level == SLC_CANTCHANGE

    @property
    def variable(self):
        """ Returns True if SLC level is SLC_VARIABLE. """
        return self.level == SLC_VARIABLE

    @property
    def default(self):
        """ Returns True if SLC level is SLC_DEFAULT. """
        return self.level == SLC_DEFAULT

    @property
    def ack(self):
        """ Returns True if SLC_ACK bit is set. """
        return ord(self.mask) & ord(SLC_ACK)

    @property
    def flushin(self):
        """ Returns True if SLC_FLUSHIN bit is set. """
        return ord(self.mask) & ord(SLC_FLUSHIN)

    @property
    def flushout(self):
        """ Returns True if SLC_FLUSHIN bit is set.  """
        return ord(self.mask) & ord(SLC_FLUSHOUT)

    def set_value(self, value):
        """ Set SLC keyboard ascii value to ``byte``.  """
        assert type(value) is bytes and len(value) == 1, value
        self.val = value

    def set_mask(self, mask):
        """ Set SLC option mask, ``mask``.  """
        assert type(mask) is bytes and len(mask) == 1
        self.mask = mask

    def set_flag(self, flag):
        """ Set SLC option flag, ``flag``.  """
        assert type(flag) is bytes and len(flag) == 1
        self.mask = bytes([ord(self.mask) | ord(flag)])

    def __str__(self):
        """ SLC definition as string '(value, flag(|s))'. """
        flags = list()
        for flag in ('nosupport', 'variable', 'default', 'ack',
                     'flushin', 'flushout', 'cantchange', ):
            if getattr(self, flag):
                flags.append(flag)
        return '({value}, {flags})'.format(
            value=(name_unicode(self.val)
                   if self.val != _POSIX_VDISABLE
                   else '(DISABLED:\\xff)'),
            flags='|'.join(flags))


class SLC_nosupport(SLC):
    def __init__(self):
        """
        SLC definition inferring our unwillingness to support the option.
        """
        SLC.__init__(self, SLC_NOSUPPORT, _POSIX_VDISABLE)

#: SLC value may be changed, flushes input and output
_SLC_VARIABLE_FIO = bytes(
    [ord(SLC_VARIABLE) | ord(SLC_FLUSHIN) | ord(SLC_FLUSHOUT)])
#: SLC value may be changed, flushes input
_SLC_VARIABLE_FI = bytes(
    [ord(SLC_VARIABLE) | ord(SLC_FLUSHIN)])
#: SLC value may be changed, flushes output
_SLC_VARIABLE_FO = bytes(
    [ord(SLC_VARIABLE) | ord(SLC_FLUSHOUT)])
#: SLC function for this value is not supported
_POSIX_VDISABLE = b'\xff'

#: This SLC tab when sent to a BSD client warrants no reply; their
#  tabs match exactly. These values are found in ttydefaults.h of
#  termios family of functions.
BSD_SLC_TAB = {
    SLC_FORW1: SLC_nosupport(),  # unsupported; causes all buffered
    SLC_FORW2: SLC_nosupport(),  # characters to be sent immediately,
    SLC_EOF: SLC(SLC_VARIABLE,        b'\x04'),  # ^D VEOF
    SLC_EC: SLC(SLC_VARIABLE,         b'\x7f'),  # BS VERASE
    SLC_EL: SLC(SLC_VARIABLE,         b'\x15'),  # ^U VKILL
    SLC_IP: SLC(_SLC_VARIABLE_FIO,    b'\x03'),  # ^C VINTR
    SLC_ABORT: SLC(_SLC_VARIABLE_FIO, b'\x1c'),  # ^\ VQUIT
    SLC_XON: SLC(SLC_VARIABLE,        b'\x11'),  # ^Q VSTART
    SLC_XOFF: SLC(SLC_VARIABLE,       b'\x13'),  # ^S VSTOP
    SLC_EW: SLC(SLC_VARIABLE,         b'\x17'),  # ^W VWERASE
    SLC_RP: SLC(SLC_VARIABLE,         b'\x12'),  # ^R VREPRINT
    SLC_LNEXT: SLC(SLC_VARIABLE,      b'\x16'),  # ^V VLNEXT
    SLC_AO: SLC(_SLC_VARIABLE_FO,     b'\x0f'),  # ^O VDISCARD
    SLC_SUSP: SLC(_SLC_VARIABLE_FI,   b'\x1a'),  # ^Z VSUSP
    SLC_AYT: SLC(SLC_VARIABLE,        b'\x14'),  # ^T VSTATUS
    # no default value for break, sync, end-of-record,
    SLC_BRK: SLC(), SLC_SYNCH: SLC(), SLC_EOR: SLC(),
}


[docs]def generate_slctab(tabset=BSD_SLC_TAB): """ Returns full 'SLC Tab' for definitions found using ``tabset``. Functions not listed in ``tabset`` are set as SLC_NOSUPPORT. """ # ``slctab`` is a dictionary of SLC functions, such as SLC_IP, # to a tuple of the handling character and support level. _slctab = {} for slc in [bytes([const]) for const in range(1, NSLC + 1)]: _slctab[slc] = tabset.get(slc, SLC_nosupport()) return _slctab
[docs]def generate_forwardmask(binary_mode, tabset, ack=False): """ Generate a :class:`~.Forwardmask` instance. Generate a 32-byte (``binary_mode`` is True) or 16-byte (False) Forwardmask instance appropriate for the specified ``slctab``. A Forwardmask is formed by a bitmask of all 256 possible 8-bit keyboard ascii input, or, when not 'outbinary', a 16-byte 7-bit representation of each value, and whether or not they should be "forwarded" by the client on the transport stream """ num_bytes, msb = (32, 256) if binary_mode else (16, 127) mask32 = [theNULL] * num_bytes for mask in range(msb // 8): start = mask * 8 last = start + 7 byte = theNULL for char in range(start, last + 1): (func, slc_name, slc_def) = snoop(bytes([char]), tabset, dict()) if func is not None and not slc_def.nosupport: # set bit for this character, it is a supported slc char byte = bytes([ord(byte) | 1]) if char != last: # shift byte left for next character, # except for the final byte. byte = bytes([ord(byte) << 1]) mask32[mask] = byte return Forwardmask(b''.join(mask32), ack)
[docs]def snoop(byte, slctab, slc_callbacks): """ Scan ``slctab`` for matching ``byte`` values. Returns (callback, func_byte, slc_definition) on match. Otherwise, (None, None, None). If no callback is assigned, the value of callback is always None. """ for slc_func, slc_def in slctab.items(): if byte == slc_def.val and slc_def.val != theNULL: return (slc_callbacks.get(slc_func, None), slc_func, slc_def) return (None, None, None)
[docs]class Linemode(object): """ """ def __init__(self, mask=b'\x00'): """ A mask of ``LMODE_MODE_LOCAL`` means that all line editing is performed on the client side (default). A mask of theNULL (\x00) indicates that editing is performed on the remote side. Valid bit flags of mask are: ``LMODE_MODE_TRAPSIG``, ``LMODE_MODE_ACK``, ``LMODE_MODE_SOFT_TAB``, and ``LMODE_MODE_LIT_ECHO``. """ assert type(mask) is bytes and len(mask) == 1, (repr(mask), mask) self.mask = mask def __eq__(self, other): """Compare by another Linemode (LMODE_MODE_ACK ignored).""" # the inverse OR(|) of acknowledge bit UNSET in comparator, # would be the AND OR(& ~) to compare modes without acknowledge # bit set. return ( (ord(self.mask) | ord(LMODE_MODE_ACK)) == (ord(other.mask) | ord(LMODE_MODE_ACK)) ) @property def local(self): """ True if linemode is local. """ return bool(ord(self.mask) & ord(LMODE_MODE_LOCAL)) @property def remote(self): """ True if linemode is remote. """ return not self.local @property def trapsig(self): """ True if signals are trapped by client. """ return bool(ord(self.mask) & ord(LMODE_MODE_TRAPSIG)) @property def ack(self): """ Returns True if mode has been acknowledged. """ return bool(ord(self.mask) & ord(LMODE_MODE_ACK)) @property def soft_tab(self): """ Returns True if client will expand horizontal tab (\x09). """ return bool(ord(self.mask) & ord(LMODE_MODE_SOFT_TAB)) @property def lit_echo(self): """ Returns True if non-printable characters are displayed as-is. """ return bool(ord(self.mask) & ord(LMODE_MODE_LIT_ECHO)) def __str__(self): """ Returns string representation of line mode, for debugging """ return 'remote' if self.remote else 'local' def __repr__(self): return '<{0!r}: {1}>'.format( self.mask, ', '.join([ '{0}:{1}'.format(prop, getattr(self, prop)) for prop in ('lit_echo', 'soft_tab', 'ack', 'trapsig', 'remote', 'local')]) )
[docs]class Forwardmask(object): """ """ def __init__(self, value, ack=False): """ Forwardmask object using the bytemask value received by server. :param bytes value: bytemask ``value`` received by server after ``IAC SB LINEMODE DO FORWARDMASK``. It must be a bytearray of length 16 or 32. """ assert isinstance(value, (bytes, bytearray)), value assert len(value) in (16, 32), len(value) self.value = value self.ack = ack
[docs] def description_table(self): """ Returns list of strings describing obj as a tabular ASCII map. """ result = [] MRK_CONT = '(...)' continuing = lambda: len(result) and result[-1] == MRK_CONT is_last = lambda mask: mask == len(self.value) - 1 same_as_last = lambda row: ( len(result) and result[-1].endswith(row.split()[-1])) for mask, byte in enumerate(self.value): if byte == 0: if continuing() and not is_last(mask): continue row = '[%2d] %s' % (mask, eightbits(0),) if not same_as_last(row) or is_last(mask): result.append(row) else: result.append(MRK_CONT) else: start = mask * 8 last = start + 7 characters = ', '.join([name_unicode(chr(char)) for char in range(start, last + 1) if char in self]) result.append('[%2d] %s %s' % ( mask, eightbits(byte), characters,)) return result
def __str__(self): """Returns single string of binary 0 and 1 describing obj.""" return '0b%s' % (''.join([value for (prefix, value) in [ eightbits(byte).split('b') for byte in self.value]]),) def __contains__(self, number): """Whether forwardmask contains keycode ``number``.""" mask, flag = number // 8, 2 ** (7 - (number % 8)) return bool(self.value[mask] & flag)
#: List of globals that may match an slc function byte _DEBUG_SLC_OPTS = dict([(value, key) for key, value in locals().items() if key in ('SLC_SYNCH', 'SLC_BRK', 'SLC_IP', 'SLC_AO', 'SLC_AYT', 'SLC_EOR', 'SLC_ABORT', 'SLC_EOF', 'SLC_SUSP', 'SLC_EC', 'SLC_EL', 'SLC_EW', 'SLC_RP', 'SLC_LNEXT', 'SLC_XON', 'SLC_XOFF', 'SLC_FORW1', 'SLC_FORW2', 'SLC_MCL', 'SLC_MCR', 'SLC_MCWL', 'SLC_MCWR', 'SLC_MCBOL', 'SLC_MCEOL', 'SLC_INSRT', 'SLC_OVER', 'SLC_ECR', 'SLC_EWR', 'SLC_EBOL', 'SLC_EEOL',)])
[docs]def name_slc_command(byte): """ Given an SLC ``byte``, return global mnemonic as string. """ return (repr(byte) if byte not in _DEBUG_SLC_OPTS else _DEBUG_SLC_OPTS[byte])