Source code for telnetlib3.stream_reader

"""Module provides class TelnetReader and TelnetReaderUnicode."""
# std imports
import codecs
import asyncio
import logging

__all__ = ('TelnetReader', 'TelnetReaderUnicode', )


class TelnetReader(asyncio.StreamReader):
    """
    A reader interface for the telnet protocol.

    Extends :class:`asyncio.StreamReader` with a telnet-aware
    :meth:`readline` and a :meth:`close` method that records the closed
    state and cancels any read that is currently waiting for data.
    """

    #: Line terminators recognised by :meth:`readline`.
    _LINE_ENDINGS = (b'\r\n', b'\r\x00', b'\r', b'\n')

    def __init__(self, *args, **kwargs):
        """Accept the same arguments as :class:`asyncio.StreamReader`."""
        super().__init__(*args, **kwargs)
        self._connection_closed = False
        self.log = logging.getLogger(__name__)

    @property
    def connection_closed(self):
        """Whether :meth:`close` has been called on this reader."""
        return self._connection_closed

    def _cancel_task(self):
        """Cancel the future that a pending read is waiting on, if any."""
        if self._waiter is None:
            return
        # Future.cancel() only requests cancellation and reports success
        # through its return value; the CancelledError is raised inside
        # the coroutine awaiting the future, never here.
        if self._waiter.cancel():
            self.log.debug('Connection closed, _waiter cancelled.')

    def close(self):
        """Mark the connection as closed and cancel any pending read."""
        self._connection_closed = True
        self._cancel_task()

    async def readline(self):
        r"""
        Read one line.

        Where "line" is a sequence of characters ending with CR LF, LF,
        or CR NUL. This readline function follows the interpretation of
        Telnet Protocol :rfc:`854`:

          The sequence "CR LF" must be treated as a single "new line"
          character and used whenever their combined action is
          intended; The sequence "CR NUL" must be used where a carriage
          return alone is actually desired; and the CR character must
          be avoided in other contexts.

        The trailing NUL of a "CR NUL" sequence is consumed but not
        returned.  A bare CR that is followed by neither LF nor NUL is
        nonetheless accepted as a line ending, as the last row shows.

        ================= =====================
        Given stream      readline() yields
        ================= =====================
        ``--\r\x00---``   ``--\r``, ``---`` *...*
        ``--\r\n---``     ``--\r\n``, ``---`` *...*
        ``--\n---``       ``--\n``, ``---`` *...*
        ``--\r---``       ``--\r``, ``---`` *...*
        ================= =====================

        .. note:: Only the bytes buffered so far are examined, so a CR
           that is the last byte currently buffered is treated as a
           bare CR even if its LF or NUL has not arrived yet.

        If EOF is received before the termination of a line, the method
        will yield the partially read string.

        This method is a coroutine.

        :raises Exception: the exception previously stored on the
            stream with ``set_exception()``, if any.
        :rtype: bytes
        """
        if self._exception is not None:
            raise self._exception

        line = bytearray()
        found = False

        while not found:
            while self._buffer and not found:
                # Collect (position, -length, terminator) for every
                # terminator present, so that min() selects the nearest
                # one and, at equal position, the longest -- preferring
                # '\r\n' over '\r', for example.
                matches = []
                for eol in self._LINE_ENDINGS:
                    pos = self._buffer.find(eol)
                    if pos != -1:
                        matches.append((pos, -len(eol), eol))

                if not matches:
                    # no terminator yet: keep everything, wait for more
                    line.extend(self._buffer)
                    self._buffer.clear()
                    continue

                pos, _, eol = min(matches)
                end = pos + len(eol)
                # 'CR NUL' is returned as a lone CR: consume the NUL
                # from the buffer but leave it out of the line.
                keep = end - 1 if eol == b'\r\x00' else end
                line.extend(self._buffer[:keep])
                del self._buffer[:end]
                found = True

            if self._eof:
                # return whatever was gathered, possibly a partial line
                break

            if not found:
                await self._wait_for_data('readline')

        self._maybe_resume_transport()
        return bytes(line)

    def __repr__(self):
        """Description of stream encoding state."""
        return ('<TelnetReader encoding=False limit={self._limit} buflen={0} '
                'eof={self._eof}>'.format(len(self._buffer), self=self))


class TelnetReaderUnicode(TelnetReader):
    """A Unicode StreamReader interface for Telnet protocol."""

    #: Late-binding instance of :class:`codecs.IncrementalDecoder`, some
    #: bytes may be lost if the protocol's encoding is changed after
    #: previously receiving a partial multibyte.  This isn't common in
    #: practice, however.
    _decoder = None

    def __init__(self, fn_encoding, *, limit=asyncio.streams._DEFAULT_LIMIT,
                 loop=None, encoding_errors='replace'):
        """
        A Unicode StreamReader interface for Telnet protocol.

        :param Callable fn_encoding: function callback, receiving boolean
            keyword argument, ``incoming=True``, which is used by the
            callback to determine what encoding should be used to decode
            the value in the direction specified.
        :param int limit: buffer limit, passed to
            :class:`asyncio.StreamReader`.
        :param loop: event loop, passed to :class:`asyncio.StreamReader`,
            which selects one itself when this is ``None``.
        :param str encoding_errors: error handling scheme given to the
            incremental decoder.
        """
        super().__init__(limit=limit, loop=loop)

        assert callable(fn_encoding), fn_encoding
        self.fn_encoding = fn_encoding
        self.encoding_errors = encoding_errors

    def decode(self, buf, final=False):
        """
        Decode bytes ``buf`` using preferred encoding.

        The decoder is incremental: an incomplete multibyte sequence at
        the end of ``buf`` is retained and completed by the next call.

        :param bytes buf: bytes to decode.
        :param bool final: passed to the incremental decoder's
            ``decode()`` method.
        :rtype: str
        """
        if buf == b'':
            return ''  # EOF

        encoding = self.fn_encoding(incoming=True)

        # late-binding: (re)create the decoder whenever the encoding
        # reported by the callback differs from the one it was made for.
        if (self._decoder is None or
                encoding != self._decoder._encoding):
            self._decoder = codecs.getincrementaldecoder(encoding)(
                errors=self.encoding_errors)
            self._decoder._encoding = encoding

        return self._decoder.decode(buf, final)

    async def readline(self):
        """
        Read one line.

        See ancestor method, :func:`~TelnetReader.readline` for details.

        This method is a coroutine.

        :rtype: str
        """
        buf = await super().readline()
        return self.decode(buf)

    async def read(self, n=-1):
        """
        Read up to *n* characters.

        If the EOF was received and the internal buffer is empty, return
        an empty string.

        :param int n: If *n* is not provided, or set to -1, read until
            EOF and return all characters as one large string.
        :raises Exception: the exception previously stored on the
            stream with ``set_exception()``, if any.
        :rtype: str

        This method is a coroutine.
        """
        if self._exception is not None:
            raise self._exception

        if not n:
            return ''

        if n < 0:
            # Collect limit-sized blocks until EOF rather than waiting
            # for everything to accumulate in self._buffer, which would
            # deadlock if the peer sends more than self._limit bytes.
            blocks = []
            while True:
                block = await self.read(self._limit)
                if not block:
                    # eof
                    break
                blocks.append(block)
            return ''.join(blocks)

        pieces = []
        remaining = n
        while True:
            if not self._buffer and not self._eof:
                await self._wait_for_data('read')

            # Every byte is fed to the incremental decoder exactly once,
            # in slices of at most ``remaining`` bytes: a character
            # occupies at least one byte, so a slice ordinarily cannot
            # decode to more characters than are still wanted.  Bytes
            # that were not needed stay in the buffer, undecoded.
            # NOTE: if the decoder emits a replacement for an invalid
            # sequence carried over from an earlier slice, the result
            # may exceed *n* by that replacement.
            while remaining > 0 and self._buffer:
                chunk = bytes(self._buffer[:remaining])
                del self._buffer[:len(chunk)]
                text = self.decode(chunk)
                pieces.append(text)
                remaining -= len(text)

            self._maybe_resume_transport()

            # An empty result means EOF to callers, so when only an
            # incomplete multibyte sequence was available keep waiting
            # for the rest of it unless EOF really has been reached.
            if remaining < n or self._eof:
                break

        return ''.join(pieces)

    async def readexactly(self, n):
        """
        Read exactly *n* unicode characters.

        :param int n: number of characters to read.
        :raises asyncio.IncompleteReadError: if the end of the stream is
            reached before *n* can be read. the
            :attr:`asyncio.IncompleteReadError.partial` attribute of the
            exception contains the partial read characters.
        :rtype: str

        This method is a coroutine.
        """
        if self._exception is not None:
            raise self._exception

        blocks = []
        while n > 0:
            block = await self.read(n)
            if not block:
                partial = ''.join(blocks)
                # expected == characters read so far + still missing
                raise asyncio.IncompleteReadError(
                    partial, len(partial) + n)
            blocks.append(block)
            n -= len(block)

        return ''.join(blocks)

    def __repr__(self):
        """Description of stream encoding state."""
        # Default to None so that a non-callable fn_encoding cannot
        # leave ``encoding`` unbound.
        encoding = None
        if callable(self.fn_encoding):
            encoding = self.fn_encoding(incoming=True)
        return ('<TelnetReaderUnicode encoding={0!r} limit={self._limit} '
                'buflen={1} eof={self._eof}>'.format(
                    encoding, len(self._buffer), self=self))