Source code for telnetlib3.stream_reader

"""Module provides class TelnetReader and TelnetReaderUnicode."""
# std imports
import sys
import codecs
import asyncio
import logging
import warnings

from asyncio import format_helpers

__all__ = (
    "TelnetReader",
    "TelnetReaderUnicode",
)

_DEFAULT_LIMIT = 2 ** 16  # 64 KiB


class TelnetReader:
    """
    This is a copy of :class:`asyncio.StreamReader`, with a little care
    for telnet-like readline(), and something about ``_waiter`` which I
    don't really understand.
    """

    _source_traceback = None

    def __init__(self, limit=_DEFAULT_LIMIT):
        self.log = logging.getLogger(__name__)

        # The line length limit is a security feature;
        # it also doubles as half the buffer limit.
        if limit <= 0:
            raise ValueError("Limit cannot be <= 0")

        self._limit = limit
        self._loop = asyncio.get_event_loop_policy().get_event_loop()
        self._buffer = bytearray()
        self._eof = False  # Whether we're done.
        self._waiter = None  # A future used by _wait_for_data()
        self._exception = None
        self._transport = None
        self._paused = False
        if self._loop.get_debug():
            self._source_traceback = format_helpers.extract_stack(sys._getframe(1))

    def __repr__(self):
        """Description of stream encoding state."""
        info = [type(self).__name__]
        if self._buffer:
            info.append(f"{len(self._buffer)} bytes")
        if self._eof:
            info.append("eof")
        if self._limit != _DEFAULT_LIMIT:
            info.append(f"limit={self._limit}")
        if self._waiter:
            info.append(f"waiter={self._waiter!r}")
        if self._exception:
            info.append(f"exception={self._exception!r}")
        if self._transport:
            info.append(f"transport={self._transport!r}")
        if self._paused:
            info.append("paused")
        info.append("encoding=False")
        return "<{}>".format(" ".join(info))

    def exception(self):
        return self._exception

    def set_exception(self, exc):
        self._exception = exc

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            if not waiter.cancelled():
                waiter.set_exception(exc)

    def _wakeup_waiter(self):
        """Wakeup read*() functions waiting for data or EOF."""
        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            if not waiter.cancelled():
                waiter.set_result(None)

    def set_transport(self, transport):
        assert self._transport is None, "Transport already set"
        self._transport = transport

    def _maybe_resume_transport(self):
        if self._paused and len(self._buffer) <= self._limit:
            self._paused = False
            self._transport.resume_reading()

    def feed_eof(self):
        self._eof = True
        self._wakeup_waiter()

    def at_eof(self):
        """Return True if the buffer is empty and 'feed_eof' was called."""
        return self._eof and not self._buffer

    def feed_data(self, data):
        assert not self._eof, "feed_data after feed_eof"

        if not data:
            return

        self._buffer.extend(data)
        self._wakeup_waiter()

        if (
            self._transport is not None
            and not self._paused
            and len(self._buffer) > 2 * self._limit
        ):
            try:
                self._transport.pause_reading()
            except NotImplementedError:
                # The transport can't be paused.
                # We'll just have to buffer all data.
                # Forget the transport so we don't keep trying.
                self._transport = None
            else:
                self._paused = True
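
    # Illustrative sketch, not part of the module: the connected protocol
    # is expected to call feed_data() from its data_received() callback and
    # feed_eof() from eof_received(); any pending read*() coroutine is then
    # woken through _wakeup_waiter().  Run inside a coroutine::
    #
    #     reader = TelnetReader()
    #     reader.feed_data(b"hello, ")
    #     reader.feed_data(b"world")
    #     reader.feed_eof()
    #     assert (await reader.read()) == b"hello, world"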

    async def _wait_for_data(self, func_name):
        """Wait until feed_data() or feed_eof() is called.

        If the stream was paused, automatically resume it.
        """
        # StreamReader uses a future to link the protocol feed_data() method
        # to a read coroutine.  Running two read coroutines at the same time
        # would have unexpected behaviour: it would not be possible to know
        # which coroutine would get the next data.
        if self._waiter is not None:
            raise RuntimeError(
                f"{func_name}() called while another coroutine is "
                f"already waiting for incoming data"
            )

        assert not self._eof, "_wait_for_data after EOF"

        # Waiting for data while paused would deadlock, so prevent it.
        # This is essential for readexactly(n) for the case when n > self._limit.
        if self._paused:
            self._paused = False
            self._transport.resume_reading()

        self._waiter = self._loop.create_future()
        try:
            await self._waiter
        finally:
            self._waiter = None

    async def readline(self):
        """Read a chunk of data from the stream until a newline (b'\\n') is found.

        On success, return the chunk ending with the newline.  If only a
        partial line can be read due to EOF, return the incomplete line
        without the terminating newline.  When EOF is reached and no bytes
        were read, an empty bytes object is returned.

        If the limit is reached, ValueError is raised.  In that case, if a
        newline was found, the complete line including the newline is
        removed from the internal buffer; otherwise, the internal buffer is
        cleared.  The limit is compared against the part of the line
        without the newline.

        If the stream was paused, this function will automatically resume
        it if needed.

        Note: this copy of :meth:`asyncio.StreamReader.readline` is
        superseded by the :rfc:`854`-aware readline() defined later in
        this class.
        """
        sep = b"\n"
        seplen = len(sep)
        try:
            line = await self.readuntil(sep)
        except asyncio.IncompleteReadError as e:
            return e.partial
        except asyncio.LimitOverrunError as e:
            if self._buffer.startswith(sep, e.consumed):
                del self._buffer[: e.consumed + seplen]
            else:
                self._buffer.clear()
            self._maybe_resume_transport()
            raise ValueError(e.args[0])
        return line

    async def readuntil(self, separator=b"\n"):
        """Read data from the stream until ``separator`` is found.

        On success, the data and separator will be removed from the
        internal buffer (consumed).  Returned data will include the
        separator at the end.

        The configured stream limit is used to check the result.  The limit
        sets the maximal length of data that can be returned, not counting
        the separator.

        If an EOF occurs and the complete separator is still not found, an
        IncompleteReadError exception will be raised, and the internal
        buffer will be reset.  The IncompleteReadError.partial attribute
        may contain the separator partially.

        If the data cannot be read because it is over the limit, a
        LimitOverrunError exception will be raised, and the data will be
        left in the internal buffer, so it can be read again.
        """
        seplen = len(separator)
        if seplen == 0:
            raise ValueError("Separator should be at least one-byte string")

        if self._exception is not None:
            raise self._exception

        # Consume the whole buffer except the last bytes, whose length is
        # one less than seplen.  Let's check corner cases with
        # separator='SEPARATOR':
        # * we have received an almost complete separator (without its last
        #   byte), i.e. buffer='some textSEPARATO'.  In this case we can
        #   safely consume len(separator) - 1 bytes.
        # * the last byte of the buffer is the first byte of the separator,
        #   i.e. buffer='abcdefghijklmnopqrS'.  We may safely consume
        #   everything except that last byte, but this requires analyzing
        #   the bytes of the buffer that match a partial separator.  This
        #   is slow and/or requires an FSM.  For this case our
        #   implementation is not optimal, since it requires rescanning
        #   data that is known not to belong to the separator.  In the
        #   real world, the separator will not be so long as to notice
        #   performance problems.  Even when reading MIME-encoded
        #   messages :)

        # `offset` is the number of bytes from the beginning of the buffer
        # where there is no occurrence of `separator`.
        offset = 0

        # Loop until we find `separator` in the buffer, exceed the buffer size,
        # or an EOF has happened.
        while True:
            buflen = len(self._buffer)

            # Check if we now have enough data in the buffer for `separator` to
            # fit.
            if buflen - offset >= seplen:
                isep = self._buffer.find(separator, offset)

                if isep != -1:
                    # `separator` is in the buffer. `isep` will be used later
                    # to retrieve the data.
                    break

                # see upper comment for explanation.
                offset = buflen + 1 - seplen
                if offset > self._limit:
                    raise asyncio.LimitOverrunError(
                        "Separator is not found, and chunk exceed the limit", offset
                    )

            # A complete message (with the full separator) may be present in
            # the buffer even when the EOF flag is set.  This may happen when
            # the last chunk adds data which makes the separator be found.
            # That's why we check for EOF *after* inspecting the buffer.
            if self._eof:
                chunk = bytes(self._buffer)
                self._buffer.clear()
                raise asyncio.IncompleteReadError(chunk, None)

            # _wait_for_data() will resume reading if stream was paused.
            await self._wait_for_data("readuntil")

        if isep > self._limit:
            raise asyncio.LimitOverrunError(
                "Separator is found, but chunk is longer than limit", isep
            )

        chunk = self._buffer[: isep + seplen]
        del self._buffer[: isep + seplen]
        self._maybe_resume_transport()
        return bytes(chunk)
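
    # Illustrative sketch, not part of the module: readuntil() with a
    # two-byte separator, then the LimitOverrunError path when the buffer
    # grows past the limit without the separator appearing; the unread
    # data remains buffered.  Run inside a coroutine::
    #
    #     reader = TelnetReader(limit=16)
    #     reader.feed_data(b"PING\r\nleftover")
    #     assert (await reader.readuntil(b"\r\n")) == b"PING\r\n"
    #     reader.feed_data(b"x" * 32)  # no separator within limit
    #     try:
    #         await reader.readuntil(b"\r\n")
    #     except asyncio.LimitOverrunError:
    #         pass  # data is left in the buffer for re-reading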

    async def read(self, n=-1):
        """Read up to `n` bytes from the stream.

        If n is not provided, or set to -1, read until EOF and return all
        read bytes.  If EOF was received and the internal buffer is empty,
        return an empty bytes object.

        If n is zero, return an empty bytes object immediately.

        If n is positive, this function tries to read `n` bytes, and may
        return fewer bytes than requested, but at least one byte.  If EOF
        was received before any byte is read, this function returns an
        empty bytes object.

        The returned value is not limited by the limit configured at
        stream creation.

        If the stream was paused, this function will automatically resume
        it if needed.
        """
        if self._exception is not None:
            raise self._exception

        if n == 0:
            return b""

        if n < 0:
            # This used to just loop creating a new waiter hoping to
            # collect everything in self._buffer, but that would
            # deadlock if the subprocess sends more than self.limit
            # bytes.  So just call self.read(self._limit) until EOF.
            blocks = []
            while True:
                block = await self.read(self._limit)
                if not block:
                    break
                blocks.append(block)
            return b"".join(blocks)

        if not self._buffer and not self._eof:
            await self._wait_for_data("read")

        # This will work right even if the buffer is less than n bytes
        data = bytes(self._buffer[:n])
        del self._buffer[:n]

        self._maybe_resume_transport()
        return data
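
    # Illustrative sketch, not part of the module: read(n) returns at most
    # n bytes, and possibly fewer, once any data is available.  Run inside
    # a coroutine::
    #
    #     reader = TelnetReader()
    #     reader.feed_data(b"abcdef")
    #     assert (await reader.read(4)) == b"abcd"
    #     assert (await reader.read(4)) == b"ef"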

    async def readexactly(self, n):
        """Read exactly `n` bytes.

        Raise an IncompleteReadError if EOF is reached before `n` bytes
        can be read.  The IncompleteReadError.partial attribute of the
        exception will contain the partially read bytes.

        If n is zero, return an empty bytes object.

        The returned value is not limited by the limit configured at
        stream creation.

        If the stream was paused, this function will automatically resume
        it if needed.
        """
        if n < 0:
            raise ValueError("readexactly size can not be less than zero")

        if self._exception is not None:
            raise self._exception

        if n == 0:
            return b""

        while len(self._buffer) < n:
            if self._eof:
                incomplete = bytes(self._buffer)
                self._buffer.clear()
                raise asyncio.IncompleteReadError(incomplete, n)

            await self._wait_for_data("readexactly")

        if len(self._buffer) == n:
            data = bytes(self._buffer)
            self._buffer.clear()
        else:
            data = bytes(self._buffer[:n])
            del self._buffer[:n]
        self._maybe_resume_transport()
        return data
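
    # Illustrative sketch, not part of the module: readexactly() raises
    # IncompleteReadError when EOF arrives first, carrying the partial
    # bytes.  Run inside a coroutine::
    #
    #     reader = TelnetReader()
    #     reader.feed_data(b"abc")
    #     reader.feed_eof()
    #     try:
    #         await reader.readexactly(5)
    #     except asyncio.IncompleteReadError as exc:
    #         assert exc.partial == b"abc"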

    def __aiter__(self):
        return self

    async def __anext__(self):
        val = await self.readline()
        if val == b"":
            raise StopAsyncIteration
        return val

    # These next two are deprecated since 2.0.1: feed_eof() should just be
    # called, instead of the close() method introduced on a reader by
    # commit 260dd63a.
    @property
    def connection_closed(self):
        warnings.warn(
            "connection_closed property removed, use at_eof() instead",
            DeprecationWarning,
        )
        return self._eof

    def close(self):
        warnings.warn(
            "close() deprecated, use feed_eof() instead", DeprecationWarning
        )
        self.feed_eof()
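
    # Illustrative sketch, not part of the module: __aiter__()/__anext__()
    # above allow the reader to be drained with ``async for``, terminating
    # when readline() returns b"" at EOF.  Run inside a coroutine::
    #
    #     reader = TelnetReader()
    #     reader.feed_data(b"one\r\ntwo\r\n")
    #     reader.feed_eof()
    #     async for line in reader:
    #         print(line)  # b'one\r\n', then b'two\r\n'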

    async def readline(self):
        r"""
        Read one line.

        Where "line" is a sequence of characters ending with CR LF, LF, or
        CR NUL.  This readline function is a strict interpretation of the
        Telnet Protocol, :rfc:`854`.

        The sequence "CR LF" must be treated as a single "new line"
        character and used whenever their combined action is intended; the
        sequence "CR NUL" must be used where a carriage return alone is
        actually desired; and the CR character must be avoided in other
        contexts.

        Therefore, a line does not yield for a stream containing a CR if
        it is not succeeded by NUL or LF.

        ================= =========================
        Given stream      readline() yields
        ================= =========================
        ``--\r\x00---``   ``--\r``, ``---`` *...*
        ``--\r\n---``     ``--\r\n``, ``---`` *...*
        ``--\n---``       ``--\n``, ``---`` *...*
        ``--\r---``       ``--\r``, ``---`` *...*
        ================= =========================

        If EOF is received before the termination of a line, the method
        will yield the partially read string.
        """
        if self._exception is not None:
            raise self._exception

        line = bytearray()
        not_enough = True

        while not_enough:
            while self._buffer and not_enough:
                search_results_pos_kind = (
                    (self._buffer.find(b"\r\n"), b"\r\n"),
                    (self._buffer.find(b"\r\x00"), b"\r\x00"),
                    (self._buffer.find(b"\r"), b"\r"),
                    (self._buffer.find(b"\n"), b"\n"),
                )

                # sort by (position, length * -1), so that the
                # smallest sorted value is the longest-match,
                # preferring '\r\n' over '\r', for example.
                matches = [
                    (_pos, len(_kind) * -1, _kind)
                    for _pos, _kind in search_results_pos_kind
                    if _pos != -1
                ]

                if not matches:
                    line.extend(self._buffer)
                    self._buffer.clear()
                    continue

                # position is nearest match,
                pos, _, kind = min(matches)
                if kind == b"\r\x00":
                    # trim out '\x00'
                    begin, end = pos + 1, pos + 2
                elif kind == b"\r\n":
                    begin = end = pos + 2
                else:
                    # '\r' or '\n'
                    begin = end = pos + 1

                line.extend(self._buffer[:begin])
                del self._buffer[:end]
                not_enough = False

            if self._eof:
                break

            if not_enough:
                await self._wait_for_data("readline")

        self._maybe_resume_transport()
        buf = bytes(line)
        return buf
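
    # Illustrative sketch, not part of the module: the RFC 854 line
    # discipline in action, with "CR NUL" yielding a bare carriage return.
    # Run inside a coroutine::
    #
    #     reader = TelnetReader()
    #     reader.feed_data(b"login:\r\x00password:\r\n")
    #     reader.feed_eof()
    #     assert (await reader.readline()) == b"login:\r"
    #     assert (await reader.readline()) == b"password:\r\n"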


class TelnetReaderUnicode(TelnetReader):
    #: Late-binding instance of :class:`codecs.IncrementalDecoder`, some
    #: bytes may be lost if the protocol's encoding is changed after
    #: previously receiving a partial multibyte.  This isn't common in
    #: practice, however.
    _decoder = None

    def __init__(self, fn_encoding, *, limit=_DEFAULT_LIMIT, encoding_errors="replace"):
        """
        A Unicode StreamReader interface for the Telnet protocol.

        :param Callable fn_encoding: function callback, receiving the
            boolean keyword argument ``incoming=True``, which is used by
            the callback to determine what encoding should be used to
            decode the value in the direction specified.
        """
        super().__init__(limit=limit)

        assert callable(fn_encoding), fn_encoding
        self.fn_encoding = fn_encoding
        self.encoding_errors = encoding_errors

    def decode(self, buf, final=False):
        """Decode bytes ``buf`` using the preferred encoding."""
        if buf == b"":
            return ""  # EOF

        encoding = self.fn_encoding(incoming=True)

        # late-binding,
        if self._decoder is None or encoding != self._decoder._encoding:
            self._decoder = codecs.getincrementaldecoder(encoding)(
                errors=self.encoding_errors
            )
            self._decoder._encoding = encoding

        return self._decoder.decode(buf, final)
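
    # Illustrative sketch, not part of the module: fn_encoding is normally
    # bound to the protocol's encoding() method; a constant callback works
    # for demonstration.  decode() re-creates the incremental decoder
    # whenever the returned encoding changes.  Run inside a coroutine::
    #
    #     reader = TelnetReaderUnicode(fn_encoding=lambda incoming: "UTF-8")
    #     reader.feed_data("café".encode("utf-8"))
    #     reader.feed_eof()
    #     assert (await reader.read()) == "café"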

    async def readline(self):
        """
        Read one line.

        See ancestor method, :func:`~TelnetReader.readline`, for details.
        """
        buf = await super().readline()
        return self.decode(buf)

    async def read(self, n=-1):
        """
        Read up to *n* bytes.

        If EOF was received and the internal buffer is empty, return an
        empty string.

        :param int n: If *n* is not provided, or set to -1, read until EOF
            and return all characters as one large string.
        :rtype: str
        """
        if self._exception is not None:
            raise self._exception

        if not n:
            return ""

        if n < 0:
            # This used to just loop creating a new waiter hoping to
            # collect everything in self._buffer, but that would
            # deadlock if the subprocess sends more than self.limit
            # bytes.  So just call self.read(self._limit) until EOF.
            blocks = []
            while True:
                block = await self.read(self._limit)
                if not block:
                    # eof
                    break
                blocks.append(block)
            return "".join(blocks)

        if not self._buffer and not self._eof:
            await self._wait_for_data("read")

        buf = self.decode(bytes(self._buffer))
        if n < 0 or len(buf) <= n:
            u_data = buf
            self._buffer.clear()
        else:
            u_data = ""
            while n > len(u_data):
                u_data += self.decode(bytes([self._buffer.pop(0)]))

        self._maybe_resume_transport()
        return u_data

    async def readexactly(self, n):
        """
        Read exactly *n* unicode characters.

        :raises asyncio.IncompleteReadError: if the end of the stream is
            reached before *n* can be read.  The
            :attr:`asyncio.IncompleteReadError.partial` attribute of the
            exception contains the partially read characters.
        :rtype: str
        """
        if self._exception is not None:
            raise self._exception

        blocks = []
        while n > 0:
            block = await self.read(n)
            if not block:
                partial = "".join(blocks)
                raise asyncio.IncompleteReadError(partial, len(partial) + n)
            blocks.append(block)
            n -= len(block)

        return "".join(blocks)
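
    # Illustrative sketch, not part of the module: readexactly() counts
    # unicode characters, not bytes; the two characters below span three
    # bytes of UTF-8.  Run inside a coroutine::
    #
    #     reader = TelnetReaderUnicode(fn_encoding=lambda incoming: "UTF-8")
    #     reader.feed_data("é!".encode("utf-8"))  # 3 bytes, 2 characters
    #     assert (await reader.readexactly(2)) == "é!"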

    def __repr__(self):
        """Description of stream encoding state."""
        encoding = None
        if callable(self.fn_encoding):
            encoding = self.fn_encoding(incoming=True)
        return (
            "<TelnetReaderUnicode encoding={encoding!r} limit={self._limit!r} "
            "buflen={buflen} eof={self._eof}>".format(
                encoding=encoding, buflen=len(self._buffer), self=self
            )
        )