| import array |
| import threading |
| import time |
| import unittest |
| |
| import io # C implementation of io |
| import _pyio as pyio # Python implementation of io |
| |
| |
| try: |
| import ctypes |
| except ImportError: |
| def byteslike(*pos, **kw): |
| return array.array("b", bytes(*pos, **kw)) |
| else: |
| class EmptyStruct(ctypes.Structure): |
| pass |
| |
| def byteslike(*pos, **kw): |
| """Create a bytes-like object having no string or sequence methods""" |
| data = bytes(*pos, **kw) |
| obj = EmptyStruct() |
| ctypes.resize(obj, len(data)) |
| memoryview(obj).cast("B")[:] = data |
| return obj |
| |
| |
| class MockRawIOWithoutRead: |
| """A RawIO implementation without read(), so as to exercise the default |
| RawIO.read() which calls readinto().""" |
| |
| def __init__(self, read_stack=()): |
| self._read_stack = list(read_stack) |
| self._write_stack = [] |
| self._reads = 0 |
| self._extraneous_reads = 0 |
| |
| def write(self, b): |
| self._write_stack.append(bytes(b)) |
| return len(b) |
| |
| def writable(self): |
| return True |
| |
| def fileno(self): |
| return 42 |
| |
| def readable(self): |
| return True |
| |
| def seekable(self): |
| return True |
| |
| def seek(self, pos, whence): |
| return 0 # wrong but we gotta return something |
| |
| def tell(self): |
| return 0 # same comment as above |
| |
| def readinto(self, buf): |
| self._reads += 1 |
| max_len = len(buf) |
| try: |
| data = self._read_stack[0] |
| except IndexError: |
| self._extraneous_reads += 1 |
| return 0 |
| if data is None: |
| del self._read_stack[0] |
| return None |
| n = len(data) |
| if len(data) <= max_len: |
| del self._read_stack[0] |
| buf[:n] = data |
| return n |
| else: |
| buf[:] = data[:max_len] |
| self._read_stack[0] = data[max_len:] |
| return max_len |
| |
| def truncate(self, pos=None): |
| return pos |
| |
| class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase): |
| pass |
| |
| class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase): |
| pass |
| |
| |
| class MockRawIO(MockRawIOWithoutRead): |
| |
| def read(self, n=None): |
| self._reads += 1 |
| try: |
| return self._read_stack.pop(0) |
| except: |
| self._extraneous_reads += 1 |
| return b"" |
| |
| class CMockRawIO(MockRawIO, io.RawIOBase): |
| pass |
| |
| class PyMockRawIO(MockRawIO, pyio.RawIOBase): |
| pass |
| |
| |
| class MisbehavedRawIO(MockRawIO): |
| def write(self, b): |
| return super().write(b) * 2 |
| |
| def read(self, n=None): |
| return super().read(n) * 2 |
| |
| def seek(self, pos, whence): |
| return -123 |
| |
| def tell(self): |
| return -456 |
| |
| def readinto(self, buf): |
| super().readinto(buf) |
| return len(buf) * 5 |
| |
| class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase): |
| pass |
| |
| class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase): |
| pass |
| |
| |
| class SlowFlushRawIO(MockRawIO): |
| def __init__(self): |
| super().__init__() |
| self.in_flush = threading.Event() |
| |
| def flush(self): |
| self.in_flush.set() |
| time.sleep(0.25) |
| |
| class CSlowFlushRawIO(SlowFlushRawIO, io.RawIOBase): |
| pass |
| |
| class PySlowFlushRawIO(SlowFlushRawIO, pyio.RawIOBase): |
| pass |
| |
| |
| class CloseFailureIO(MockRawIO): |
| closed = 0 |
| |
| def close(self): |
| if not self.closed: |
| self.closed = 1 |
| raise OSError |
| |
| class CCloseFailureIO(CloseFailureIO, io.RawIOBase): |
| pass |
| |
| class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase): |
| pass |
| |
| |
| class MockFileIO: |
| |
| def __init__(self, data): |
| self.read_history = [] |
| super().__init__(data) |
| |
| def read(self, n=None): |
| res = super().read(n) |
| self.read_history.append(None if res is None else len(res)) |
| return res |
| |
| def readinto(self, b): |
| res = super().readinto(b) |
| self.read_history.append(res) |
| return res |
| |
| class CMockFileIO(MockFileIO, io.BytesIO): |
| pass |
| |
| class PyMockFileIO(MockFileIO, pyio.BytesIO): |
| pass |
| |
| |
| class MockUnseekableIO: |
| def seekable(self): |
| return False |
| |
| def seek(self, *args): |
| raise self.UnsupportedOperation("not seekable") |
| |
| def tell(self, *args): |
| raise self.UnsupportedOperation("not seekable") |
| |
| def truncate(self, *args): |
| raise self.UnsupportedOperation("not seekable") |
| |
| class CMockUnseekableIO(MockUnseekableIO, io.BytesIO): |
| UnsupportedOperation = io.UnsupportedOperation |
| |
| class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO): |
| UnsupportedOperation = pyio.UnsupportedOperation |
| |
| |
| class MockCharPseudoDevFileIO(MockFileIO): |
| # GH-95782 |
| # ftruncate() does not work on these special files (and CPython then raises |
| # appropriate exceptions), so truncate() does not have to be accounted for |
| # here. |
| def __init__(self, data): |
| super().__init__(data) |
| |
| def seek(self, *args): |
| return 0 |
| |
| def tell(self, *args): |
| return 0 |
| |
| class CMockCharPseudoDevFileIO(MockCharPseudoDevFileIO, io.BytesIO): |
| pass |
| |
| class PyMockCharPseudoDevFileIO(MockCharPseudoDevFileIO, pyio.BytesIO): |
| pass |
| |
| |
| class MockNonBlockWriterIO: |
| |
| def __init__(self): |
| self._write_stack = [] |
| self._blocker_char = None |
| |
| def pop_written(self): |
| s = b"".join(self._write_stack) |
| self._write_stack[:] = [] |
| return s |
| |
| def block_on(self, char): |
| """Block when a given char is encountered.""" |
| self._blocker_char = char |
| |
| def readable(self): |
| return True |
| |
| def seekable(self): |
| return True |
| |
| def seek(self, pos, whence=0): |
| # naive implementation, enough for tests |
| return 0 |
| |
| def writable(self): |
| return True |
| |
| def write(self, b): |
| b = bytes(b) |
| n = -1 |
| if self._blocker_char: |
| try: |
| n = b.index(self._blocker_char) |
| except ValueError: |
| pass |
| else: |
| if n > 0: |
| # write data up to the first blocker |
| self._write_stack.append(b[:n]) |
| return n |
| else: |
| # cancel blocker and indicate would block |
| self._blocker_char = None |
| return None |
| self._write_stack.append(b) |
| return len(b) |
| |
| class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase): |
| BlockingIOError = io.BlockingIOError |
| |
| class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase): |
| BlockingIOError = pyio.BlockingIOError |
| |
| |
| # Build classes which point to all the right mocks per io implementation |
| class CTestCase(unittest.TestCase): |
| io = io |
| is_C = True |
| |
| MockRawIO = CMockRawIO |
| MisbehavedRawIO = CMisbehavedRawIO |
| MockFileIO = CMockFileIO |
| CloseFailureIO = CCloseFailureIO |
| MockNonBlockWriterIO = CMockNonBlockWriterIO |
| MockUnseekableIO = CMockUnseekableIO |
| MockRawIOWithoutRead = CMockRawIOWithoutRead |
| SlowFlushRawIO = CSlowFlushRawIO |
| MockCharPseudoDevFileIO = CMockCharPseudoDevFileIO |
| |
| # Use the class as a proxy to the io module members. |
| def __getattr__(self, name): |
| return getattr(io, name) |
| |
| |
| class PyTestCase(unittest.TestCase): |
| io = pyio |
| is_C = False |
| |
| MockRawIO = PyMockRawIO |
| MisbehavedRawIO = PyMisbehavedRawIO |
| MockFileIO = PyMockFileIO |
| CloseFailureIO = PyCloseFailureIO |
| MockNonBlockWriterIO = PyMockNonBlockWriterIO |
| MockUnseekableIO = PyMockUnseekableIO |
| MockRawIOWithoutRead = PyMockRawIOWithoutRead |
| SlowFlushRawIO = PySlowFlushRawIO |
| MockCharPseudoDevFileIO = PyMockCharPseudoDevFileIO |
| |
| # Use the class as a proxy to the _pyio module members. |
| def __getattr__(self, name): |
| return getattr(pyio, name) |