"""Accessory functions."""
from __future__ import annotations
# std imports
import shlex
import asyncio
import logging
import importlib
from typing import TYPE_CHECKING, Any, Dict, Union, Mapping, Callable, Optional
#: Custom TRACE log level, below DEBUG (10).
TRACE = 5
logging.addLevelName(TRACE, "TRACE")
if TYPE_CHECKING: # pragma: no cover
from .stream_reader import TelnetReader, TelnetReaderUnicode
__all__ = (
"TRACE",
"encoding_from_lang",
"name_unicode",
"eightbits",
"hexdump",
"make_logger",
"repr_mapping",
"function_lookup",
"make_reader_task",
)
PATIENCE_MESSAGES = [
"Contemplate the virtue of patience",
"Endure delays with fortitude",
"To wait calmly requires discipline",
"Suspend expectations of imminence",
"The tide hastens for no man",
"Cultivate a stoic calmness",
"The tranquil mind eschews impatience",
"Deliberation is preferable to haste",
]
def get_version() -> str:
"""Return the current version of telnetlib3."""
return "4.0.3" # keep in sync with pyproject.toml !
[docs]
def encoding_from_lang(lang: str) -> Optional[str]:
"""
Parse encoding from LANG environment value.
Returns the encoding portion if present, or None if the LANG value
does not contain an encoding suffix (no '.' separator).
:param lang: LANG environment value (e.g., 'en_US.UTF-8@misc')
:returns: Encoding string (e.g., 'UTF-8') or None if no encoding found.
Example::
>>> encoding_from_lang('en_US.UTF-8@misc')
'UTF-8'
>>> encoding_from_lang('en_IL')
None
"""
if "." not in lang:
return None
_, encoding = lang.split(".", 1)
if "@" in encoding:
encoding, _ = encoding.split("@", 1)
return encoding
[docs]
def name_unicode(ucs: str) -> str:
"""Return 7-bit ascii printable of any string."""
# more or less the same as curses.ascii.unctrl -- but curses
# module is conditionally excluded from many python distributions!
bits = ord(ucs)
if 32 <= bits <= 126:
# ascii printable as one cell, as-is
rep = chr(bits)
elif bits == 127:
rep = "^?"
elif bits < 32:
rep = "^" + chr(((bits & 0x7F) | 0x20) + 0x20)
else:
rep = rf"\x{bits:02x}"
return rep
[docs]
def eightbits(number: int) -> str:
"""
Binary representation of ``number`` padded to 8 bits.
Example::
>>> eightbits(ord('a'))
'0b01100001'
"""
# useful only so far in context of a forwardmask or any bitmask.
_, value = bin(number).split("b")
return f"0b{int(value):08d}"
[docs]
def hexdump(data: bytes, prefix: str = "") -> str:
"""
Format *data* as ``hexdump -C`` style output.
Each 16-byte row shows the offset, hex bytes grouped 8+8,
and printable ASCII on the right::
00000000 48 65 6c 6c 6f 20 57 6f 72 6c 64 0d 0a |Hello World..|
:param data: Raw bytes to format.
:param prefix: String prepended to every line (e.g. ``">> "``).
:rtype: str
"""
lines: list[str] = []
for offset in range(0, len(data), 16):
chunk = data[offset : offset + 16]
hex_left = " ".join(f"{b:02x}" for b in chunk[:8])
hex_right = " ".join(f"{b:02x}" for b in chunk[8:])
ascii_part = "".join(chr(b) if 0x20 <= b < 0x7F else "." for b in chunk)
lines.append(f"{prefix}{offset:08x} {hex_left:<23s} {hex_right:<23s} |{ascii_part}|")
return "\n".join(lines)
_DEFAULT_LOGFMT = " ".join(("%(levelname)s", "%(filename)s:%(lineno)d", "%(message)s"))
[docs]
def make_logger(
name: str,
loglevel: str = "info",
logfile: Optional[str] = None,
logfmt: str = _DEFAULT_LOGFMT,
filemode: str = "a",
) -> logging.Logger:
"""Create and return simple logger for given arguments."""
lvl = getattr(logging, loglevel.upper(), None)
if lvl is None:
lvl = logging.getLevelName(loglevel.upper())
_cfg: Dict[str, Any] = {"format": logfmt}
if logfile:
_cfg["filename"] = logfile
_cfg["filemode"] = filemode
logging.basicConfig(**_cfg)
for handler in logging.getLogger().handlers:
if isinstance(handler, logging.StreamHandler) and not isinstance(
handler, logging.FileHandler
):
handler.terminator = "\r\n"
logging.getLogger().setLevel(lvl)
logging.getLogger(name).setLevel(lvl)
return logging.getLogger(name)
[docs]
def repr_mapping(mapping: Mapping[str, Any]) -> str:
"""Return printable string, 'key=value [key=value ...]' for mapping."""
return " ".join(f"{key}={shlex.quote(str(value))}" for key, value in mapping.items())
[docs]
def function_lookup(pymod_path: str) -> Callable[..., Any]:
"""Return callable function target from standard module.function path."""
module_name, func_name = pymod_path.rsplit(".", 1)
module = importlib.import_module(module_name)
shell_function: Callable[..., Any] = getattr(module, func_name)
return shell_function
[docs]
def make_reader_task(
reader: "Union[TelnetReader, TelnetReaderUnicode, asyncio.StreamReader]", size: int = 2**12
) -> "asyncio.Task[Any]":
"""Return asyncio task wrapping coroutine of reader.read(size)."""
return asyncio.ensure_future(reader.read(size))