"""
Special Line Character support for Telnet Linemode Option (:rfc:`1184`).
"""
from .accessories import eightbits, name_unicode
from .telopt import theNULL
__all__ = (
"SLC",
"SLC_AYT",
"NSLC",
"BSD_SLC_TAB",
"generate_slctab",
"Linemode",
"LMODE_MODE_REMOTE",
"SLC_SYNCH",
"SLC_IP",
"SLC_AYT",
"SLC_ABORT",
"SLC_SUSP",
"SLC_EL",
"SLC_RP",
"SLC_XON",
"snoop",
"generate_forwardmask",
"Forwardmask",
"name_slc_command",
"LMODE_FORWARDMASK",
"LMODE_MODE",
"NSLC",
"LMODE_MODE",
"LMODE_SLC",
"SLC",
"SLC_nosupport",
"SLC_DEFAULT",
"SLC_VARIABLE",
"SLC_NOSUPPORT",
"SLC_ACK",
"SLC_CANTCHANGE",
"SLC_LNEXT",
"SLC_EC",
"SLC_EW",
"SLC_EOF",
"SLC_AO",
)
(SLC_NOSUPPORT, SLC_CANTCHANGE, SLC_VARIABLE, SLC_DEFAULT) = (
bytes([const]) for const in range(4)
) # 0, 1, 2, 3
(SLC_FLUSHOUT, SLC_FLUSHIN, SLC_ACK) = (
bytes([2 ** const]) for const in range(5, 8)
) # 32, 64, 128
SLC_LEVELBITS = 0x03
NSLC = 30
(
SLC_SYNCH,
SLC_BRK,
SLC_IP,
SLC_AO,
SLC_AYT,
SLC_EOR,
SLC_ABORT,
SLC_EOF,
SLC_SUSP,
SLC_EC,
SLC_EL,
SLC_EW,
SLC_RP,
SLC_LNEXT,
SLC_XON,
SLC_XOFF,
SLC_FORW1,
SLC_FORW2,
SLC_MCL,
SLC_MCR,
SLC_MCWL,
SLC_MCWR,
SLC_MCBOL,
SLC_MCEOL,
SLC_INSRT,
SLC_OVER,
SLC_ECR,
SLC_EWR,
SLC_EBOL,
SLC_EEOL,
) = (bytes([const]) for const in range(1, NSLC + 1))
(LMODE_MODE, LMODE_FORWARDMASK, LMODE_SLC) = (bytes([const]) for const in range(1, 4))
(LMODE_MODE_REMOTE, LMODE_MODE_LOCAL, LMODE_MODE_TRAPSIG) = (
bytes([const]) for const in range(3)
)
(LMODE_MODE_ACK, LMODE_MODE_SOFT_TAB, LMODE_MODE_LIT_ECHO) = (
bytes([4]),
bytes([8]),
bytes([16]),
)
[docs]class SLC(object):
def __init__(self, mask=SLC_DEFAULT, value=theNULL):
"""
Defines the willingness to support a Special Linemode Character.
Defined by its SLC support level, ``mask`` and default keyboard
ASCII byte ``value`` (may be negotiated by client).
"""
# The default byte mask ``SLC_DEFAULT`` and value ``b'\x00'`` infer
# our willingness to support the option, but with no default value.
# The value must be negotiated by client to activate the callback.
assert type(mask) is bytes and type(value) is bytes, (mask, value)
assert len(mask) == 1 and len(value) == 1, (mask, value)
self.mask = mask
self.val = value
@property
def level(self):
""" Returns SLC level of support. """
return bytes([ord(self.mask) & SLC_LEVELBITS])
@property
def nosupport(self):
""" Returns True if SLC level is SLC_NOSUPPORT. """
return self.level == SLC_NOSUPPORT
@property
def cantchange(self):
""" Returns True if SLC level is SLC_CANTCHANGE. """
return self.level == SLC_CANTCHANGE
@property
def variable(self):
""" Returns True if SLC level is SLC_VARIABLE. """
return self.level == SLC_VARIABLE
@property
def default(self):
""" Returns True if SLC level is SLC_DEFAULT. """
return self.level == SLC_DEFAULT
@property
def ack(self):
""" Returns True if SLC_ACK bit is set. """
return ord(self.mask) & ord(SLC_ACK)
@property
def flushin(self):
""" Returns True if SLC_FLUSHIN bit is set. """
return ord(self.mask) & ord(SLC_FLUSHIN)
@property
def flushout(self):
""" Returns True if SLC_FLUSHIN bit is set. """
return ord(self.mask) & ord(SLC_FLUSHOUT)
[docs] def set_value(self, value):
""" Set SLC keyboard ascii value to ``byte``. """
assert type(value) is bytes and len(value) == 1, value
self.val = value
[docs] def set_mask(self, mask):
""" Set SLC option mask, ``mask``. """
assert type(mask) is bytes and len(mask) == 1
self.mask = mask
[docs] def set_flag(self, flag):
""" Set SLC option flag, ``flag``. """
assert type(flag) is bytes and len(flag) == 1
self.mask = bytes([ord(self.mask) | ord(flag)])
def __str__(self):
""" SLC definition as string '(value, flag(|s))'. """
flags = list()
for flag in (
"nosupport",
"variable",
"default",
"ack",
"flushin",
"flushout",
"cantchange",
):
if getattr(self, flag):
flags.append(flag)
return "({value}, {flags})".format(
value=(
name_unicode(self.val)
if self.val != _POSIX_VDISABLE
else "(DISABLED:\\xff)"
),
flags="|".join(flags),
)
[docs]class SLC_nosupport(SLC):
def __init__(self):
"""
SLC definition inferring our unwillingness to support the option.
"""
SLC.__init__(self, SLC_NOSUPPORT, _POSIX_VDISABLE)
#: SLC value may be changed, flushes input and output
_SLC_VARIABLE_FIO = bytes([ord(SLC_VARIABLE) | ord(SLC_FLUSHIN) | ord(SLC_FLUSHOUT)])
#: SLC value may be changed, flushes input
_SLC_VARIABLE_FI = bytes([ord(SLC_VARIABLE) | ord(SLC_FLUSHIN)])
#: SLC value may be changed, flushes output
_SLC_VARIABLE_FO = bytes([ord(SLC_VARIABLE) | ord(SLC_FLUSHOUT)])
#: SLC function for this value is not supported
_POSIX_VDISABLE = b"\xff"
#: This SLC tab when sent to a BSD client warrants no reply; their
# tabs match exactly. These values are found in ttydefaults.h of
# termios family of functions.
BSD_SLC_TAB = {
SLC_FORW1: SLC_nosupport(), # unsupported; causes all buffered
SLC_FORW2: SLC_nosupport(), # characters to be sent immediately,
SLC_EOF: SLC(SLC_VARIABLE, b"\x04"), # ^D VEOF
SLC_EC: SLC(SLC_VARIABLE, b"\x7f"), # BS VERASE
SLC_EL: SLC(SLC_VARIABLE, b"\x15"), # ^U VKILL
SLC_IP: SLC(_SLC_VARIABLE_FIO, b"\x03"), # ^C VINTR
SLC_ABORT: SLC(_SLC_VARIABLE_FIO, b"\x1c"), # ^\ VQUIT
SLC_XON: SLC(SLC_VARIABLE, b"\x11"), # ^Q VSTART
SLC_XOFF: SLC(SLC_VARIABLE, b"\x13"), # ^S VSTOP
SLC_EW: SLC(SLC_VARIABLE, b"\x17"), # ^W VWERASE
SLC_RP: SLC(SLC_VARIABLE, b"\x12"), # ^R VREPRINT
SLC_LNEXT: SLC(SLC_VARIABLE, b"\x16"), # ^V VLNEXT
SLC_AO: SLC(_SLC_VARIABLE_FO, b"\x0f"), # ^O VDISCARD
SLC_SUSP: SLC(_SLC_VARIABLE_FI, b"\x1a"), # ^Z VSUSP
SLC_AYT: SLC(SLC_VARIABLE, b"\x14"), # ^T VSTATUS
# no default value for break, sync, end-of-record,
SLC_BRK: SLC(),
SLC_SYNCH: SLC(),
SLC_EOR: SLC(),
}
[docs]def generate_slctab(tabset=BSD_SLC_TAB):
"""Returns full 'SLC Tab' for definitions found using ``tabset``.
Functions not listed in ``tabset`` are set as SLC_NOSUPPORT.
"""
# ``slctab`` is a dictionary of SLC functions, such as SLC_IP,
# to a tuple of the handling character and support level.
_slctab = {}
for slc in [bytes([const]) for const in range(1, NSLC + 1)]:
_slctab[slc] = tabset.get(slc, SLC_nosupport())
return _slctab
[docs]def generate_forwardmask(binary_mode, tabset, ack=False):
"""
Generate a :class:`~.Forwardmask` instance.
Generate a 32-byte (``binary_mode`` is True) or 16-byte (False) Forwardmask
instance appropriate for the specified ``slctab``. A Forwardmask is formed
by a bitmask of all 256 possible 8-bit keyboard ascii input, or, when not
'outbinary', a 16-byte 7-bit representation of each value, and whether or
not they should be "forwarded" by the client on the transport stream
"""
num_bytes, msb = (32, 256) if binary_mode else (16, 127)
mask32 = [theNULL] * num_bytes
for mask in range(msb // 8):
start = mask * 8
last = start + 7
byte = theNULL
for char in range(start, last + 1):
(func, slc_name, slc_def) = snoop(bytes([char]), tabset, dict())
if func is not None and not slc_def.nosupport:
# set bit for this character, it is a supported slc char
byte = bytes([ord(byte) | 1])
if char != last:
# shift byte left for next character,
# except for the final byte.
byte = bytes([ord(byte) << 1])
mask32[mask] = byte
return Forwardmask(b"".join(mask32), ack)
[docs]def snoop(byte, slctab, slc_callbacks):
"""Scan ``slctab`` for matching ``byte`` values.
Returns (callback, func_byte, slc_definition) on match.
Otherwise, (None, None, None). If no callback is assigned,
the value of callback is always None.
"""
for slc_func, slc_def in slctab.items():
if byte == slc_def.val and slc_def.val != theNULL:
return (slc_callbacks.get(slc_func, None), slc_func, slc_def)
return (None, None, None)
[docs]class Linemode(object):
""" """
def __init__(self, mask=b"\x00"):
"""A mask of ``LMODE_MODE_LOCAL`` means that all line editing is
performed on the client side (default). A mask of theNULL (\x00)
indicates that editing is performed on the remote side.
Valid bit flags of mask are: ``LMODE_MODE_TRAPSIG``,
``LMODE_MODE_ACK``, ``LMODE_MODE_SOFT_TAB``, and
``LMODE_MODE_LIT_ECHO``.
"""
assert type(mask) is bytes and len(mask) == 1, (repr(mask), mask)
self.mask = mask
def __eq__(self, other):
"""Compare by another Linemode (LMODE_MODE_ACK ignored)."""
# the inverse OR(|) of acknowledge bit UNSET in comparator,
# would be the AND OR(& ~) to compare modes without acknowledge
# bit set.
return (ord(self.mask) | ord(LMODE_MODE_ACK)) == (
ord(other.mask) | ord(LMODE_MODE_ACK)
)
@property
def local(self):
""" True if linemode is local. """
return bool(ord(self.mask) & ord(LMODE_MODE_LOCAL))
@property
def remote(self):
""" True if linemode is remote. """
return not self.local
@property
def trapsig(self):
""" True if signals are trapped by client. """
return bool(ord(self.mask) & ord(LMODE_MODE_TRAPSIG))
@property
def ack(self):
""" Returns True if mode has been acknowledged. """
return bool(ord(self.mask) & ord(LMODE_MODE_ACK))
@property
def soft_tab(self):
""" Returns True if client will expand horizontal tab (\x09). """
return bool(ord(self.mask) & ord(LMODE_MODE_SOFT_TAB))
@property
def lit_echo(self):
""" Returns True if non-printable characters are displayed as-is. """
return bool(ord(self.mask) & ord(LMODE_MODE_LIT_ECHO))
def __str__(self):
""" Returns string representation of line mode, for debugging """
return "remote" if self.remote else "local"
def __repr__(self):
return "<{0!r}: {1}>".format(
self.mask,
", ".join(
[
"{0}:{1}".format(prop, getattr(self, prop))
for prop in (
"lit_echo",
"soft_tab",
"ack",
"trapsig",
"remote",
"local",
)
]
),
)
[docs]class Forwardmask(object):
""" """
def __init__(self, value, ack=False):
"""
Forwardmask object using the bytemask value received by server.
:param bytes value: bytemask ``value`` received by server after ``IAC SB
LINEMODE DO FORWARDMASK``. It must be a bytearray of length 16 or 32.
"""
assert isinstance(value, (bytes, bytearray)), value
assert len(value) in (16, 32), len(value)
self.value = value
self.ack = ack
[docs] def description_table(self):
"""
Returns list of strings describing obj as a tabular ASCII map.
"""
result = []
MRK_CONT = "(...)"
continuing = lambda: len(result) and result[-1] == MRK_CONT
is_last = lambda mask: mask == len(self.value) - 1
same_as_last = lambda row: (
len(result) and result[-1].endswith(row.split()[-1])
)
for mask, byte in enumerate(self.value):
if byte == 0:
if continuing() and not is_last(mask):
continue
row = "[%2d] %s" % (
mask,
eightbits(0),
)
if not same_as_last(row) or is_last(mask):
result.append(row)
else:
result.append(MRK_CONT)
else:
start = mask * 8
last = start + 7
characters = ", ".join(
[
name_unicode(chr(char))
for char in range(start, last + 1)
if char in self
]
)
result.append(
"[%2d] %s %s"
% (
mask,
eightbits(byte),
characters,
)
)
return result
def __str__(self):
"""Returns single string of binary 0 and 1 describing obj."""
return "0b%s" % (
"".join(
[
value
for (prefix, value) in [
eightbits(byte).split("b") for byte in self.value
]
]
),
)
def __contains__(self, number):
"""Whether forwardmask contains keycode ``number``."""
mask, flag = number // 8, 2 ** (7 - (number % 8))
return bool(self.value[mask] & flag)
#: List of globals that may match an slc function byte
_DEBUG_SLC_OPTS = dict(
[
(value, key)
for key, value in locals().items()
if key
in (
"SLC_SYNCH",
"SLC_BRK",
"SLC_IP",
"SLC_AO",
"SLC_AYT",
"SLC_EOR",
"SLC_ABORT",
"SLC_EOF",
"SLC_SUSP",
"SLC_EC",
"SLC_EL",
"SLC_EW",
"SLC_RP",
"SLC_LNEXT",
"SLC_XON",
"SLC_XOFF",
"SLC_FORW1",
"SLC_FORW2",
"SLC_MCL",
"SLC_MCR",
"SLC_MCWL",
"SLC_MCWR",
"SLC_MCBOL",
"SLC_MCEOL",
"SLC_INSRT",
"SLC_OVER",
"SLC_ECR",
"SLC_EWR",
"SLC_EBOL",
"SLC_EEOL",
)
]
)
[docs]def name_slc_command(byte):
""" Given an SLC ``byte``, return global mnemonic as string. """
return repr(byte) if byte not in _DEBUG_SLC_OPTS else _DEBUG_SLC_OPTS[byte]