| import io |
| import unittest |
| |
| |
| class TestFileBasedBuffer(unittest.TestCase): |
| def _makeOne(self, file=None, from_buffer=None): |
| from waitress.buffers import FileBasedBuffer |
| |
| buf = FileBasedBuffer(file, from_buffer=from_buffer) |
| self.buffers_to_close.append(buf) |
| return buf |
| |
| def setUp(self): |
| self.buffers_to_close = [] |
| |
| def tearDown(self): |
| for buf in self.buffers_to_close: |
| buf.close() |
| |
| def test_ctor_from_buffer_None(self): |
| inst = self._makeOne("file") |
| self.assertEqual(inst.file, "file") |
| |
| def test_ctor_from_buffer(self): |
| from_buffer = io.BytesIO(b"data") |
| from_buffer.getfile = lambda *x: from_buffer |
| f = io.BytesIO() |
| inst = self._makeOne(f, from_buffer) |
| self.assertEqual(inst.file, f) |
| del from_buffer.getfile |
| self.assertEqual(inst.remain, 4) |
| from_buffer.close() |
| |
| def test___len__(self): |
| inst = self._makeOne() |
| inst.remain = 10 |
| self.assertEqual(len(inst), 10) |
| |
| def test___nonzero__(self): |
| inst = self._makeOne() |
| inst.remain = 10 |
| self.assertEqual(bool(inst), True) |
| inst.remain = 0 |
| self.assertEqual(bool(inst), True) |
| |
| def test_append(self): |
| f = io.BytesIO(b"data") |
| inst = self._makeOne(f) |
| inst.append(b"data2") |
| self.assertEqual(f.getvalue(), b"datadata2") |
| self.assertEqual(inst.remain, 5) |
| |
| def test_get_skip_true(self): |
| f = io.BytesIO(b"data") |
| inst = self._makeOne(f) |
| result = inst.get(100, skip=True) |
| self.assertEqual(result, b"data") |
| self.assertEqual(inst.remain, -4) |
| |
| def test_get_skip_false(self): |
| f = io.BytesIO(b"data") |
| inst = self._makeOne(f) |
| result = inst.get(100, skip=False) |
| self.assertEqual(result, b"data") |
| self.assertEqual(inst.remain, 0) |
| |
| def test_get_skip_bytes_less_than_zero(self): |
| f = io.BytesIO(b"data") |
| inst = self._makeOne(f) |
| result = inst.get(-1, skip=False) |
| self.assertEqual(result, b"data") |
| self.assertEqual(inst.remain, 0) |
| |
| def test_skip_remain_gt_bytes(self): |
| f = io.BytesIO(b"d") |
| inst = self._makeOne(f) |
| inst.remain = 1 |
| inst.skip(1) |
| self.assertEqual(inst.remain, 0) |
| |
| def test_skip_remain_lt_bytes(self): |
| f = io.BytesIO(b"d") |
| inst = self._makeOne(f) |
| inst.remain = 1 |
| self.assertRaises(ValueError, inst.skip, 2) |
| |
| def test_newfile(self): |
| inst = self._makeOne() |
| self.assertRaises(NotImplementedError, inst.newfile) |
| |
| def test_prune_remain_notzero(self): |
| f = io.BytesIO(b"d") |
| inst = self._makeOne(f) |
| inst.remain = 1 |
| nf = io.BytesIO() |
| inst.newfile = lambda *x: nf |
| inst.prune() |
| self.assertTrue(inst.file is not f) |
| self.assertEqual(nf.getvalue(), b"d") |
| |
| def test_prune_remain_zero_tell_notzero(self): |
| f = io.BytesIO(b"d") |
| inst = self._makeOne(f) |
| nf = io.BytesIO(b"d") |
| inst.newfile = lambda *x: nf |
| inst.remain = 0 |
| inst.prune() |
| self.assertTrue(inst.file is not f) |
| self.assertEqual(nf.getvalue(), b"d") |
| |
| def test_prune_remain_zero_tell_zero(self): |
| f = io.BytesIO() |
| inst = self._makeOne(f) |
| inst.remain = 0 |
| inst.prune() |
| self.assertTrue(inst.file is f) |
| |
| def test_close(self): |
| f = io.BytesIO() |
| inst = self._makeOne(f) |
| inst.close() |
| self.assertTrue(f.closed) |
| self.buffers_to_close.remove(inst) |
| |
| |
| class TestTempfileBasedBuffer(unittest.TestCase): |
| def _makeOne(self, from_buffer=None): |
| from waitress.buffers import TempfileBasedBuffer |
| |
| buf = TempfileBasedBuffer(from_buffer=from_buffer) |
| self.buffers_to_close.append(buf) |
| return buf |
| |
| def setUp(self): |
| self.buffers_to_close = [] |
| |
| def tearDown(self): |
| for buf in self.buffers_to_close: |
| buf.close() |
| |
| def test_newfile(self): |
| inst = self._makeOne() |
| r = inst.newfile() |
| self.assertTrue(hasattr(r, "fileno")) # file |
| r.close() |
| |
| |
| class TestBytesIOBasedBuffer(unittest.TestCase): |
| def _makeOne(self, from_buffer=None): |
| from waitress.buffers import BytesIOBasedBuffer |
| |
| return BytesIOBasedBuffer(from_buffer=from_buffer) |
| |
| def test_ctor_from_buffer_not_None(self): |
| f = io.BytesIO() |
| f.getfile = lambda *x: f |
| inst = self._makeOne(f) |
| self.assertTrue(hasattr(inst.file, "read")) |
| |
| def test_ctor_from_buffer_None(self): |
| inst = self._makeOne() |
| self.assertTrue(hasattr(inst.file, "read")) |
| |
| def test_newfile(self): |
| inst = self._makeOne() |
| r = inst.newfile() |
| self.assertTrue(hasattr(r, "read")) |
| |
| |
| class TestReadOnlyFileBasedBuffer(unittest.TestCase): |
| def _makeOne(self, file, block_size=8192): |
| from waitress.buffers import ReadOnlyFileBasedBuffer |
| |
| buf = ReadOnlyFileBasedBuffer(file, block_size) |
| self.buffers_to_close.append(buf) |
| return buf |
| |
| def setUp(self): |
| self.buffers_to_close = [] |
| |
| def tearDown(self): |
| for buf in self.buffers_to_close: |
| buf.close() |
| |
| def test_prepare_not_seekable(self): |
| f = KindaFilelike(b"abc") |
| inst = self._makeOne(f) |
| self.assertFalse(hasattr(inst, "seek")) |
| self.assertFalse(hasattr(inst, "tell")) |
| result = inst.prepare() |
| self.assertEqual(result, False) |
| self.assertEqual(inst.remain, 0) |
| |
| def test_prepare_not_seekable_closeable(self): |
| f = KindaFilelike(b"abc", close=1) |
| inst = self._makeOne(f) |
| result = inst.prepare() |
| self.assertEqual(result, False) |
| self.assertEqual(inst.remain, 0) |
| self.assertTrue(hasattr(inst, "close")) |
| |
| def test_prepare_seekable_closeable(self): |
| f = Filelike(b"abc", close=1, tellresults=[0, 10]) |
| inst = self._makeOne(f) |
| self.assertEqual(inst.seek, f.seek) |
| self.assertEqual(inst.tell, f.tell) |
| result = inst.prepare() |
| self.assertEqual(result, 10) |
| self.assertEqual(inst.remain, 10) |
| self.assertEqual(inst.file.seeked, 0) |
| self.assertTrue(hasattr(inst, "close")) |
| |
| def test_get_numbytes_neg_one(self): |
| f = io.BytesIO(b"abcdef") |
| inst = self._makeOne(f) |
| inst.remain = 2 |
| result = inst.get(-1) |
| self.assertEqual(result, b"ab") |
| self.assertEqual(inst.remain, 2) |
| self.assertEqual(f.tell(), 0) |
| |
| def test_get_numbytes_gt_remain(self): |
| f = io.BytesIO(b"abcdef") |
| inst = self._makeOne(f) |
| inst.remain = 2 |
| result = inst.get(3) |
| self.assertEqual(result, b"ab") |
| self.assertEqual(inst.remain, 2) |
| self.assertEqual(f.tell(), 0) |
| |
| def test_get_numbytes_lt_remain(self): |
| f = io.BytesIO(b"abcdef") |
| inst = self._makeOne(f) |
| inst.remain = 2 |
| result = inst.get(1) |
| self.assertEqual(result, b"a") |
| self.assertEqual(inst.remain, 2) |
| self.assertEqual(f.tell(), 0) |
| |
| def test_get_numbytes_gt_remain_withskip(self): |
| f = io.BytesIO(b"abcdef") |
| inst = self._makeOne(f) |
| inst.remain = 2 |
| result = inst.get(3, skip=True) |
| self.assertEqual(result, b"ab") |
| self.assertEqual(inst.remain, 0) |
| self.assertEqual(f.tell(), 2) |
| |
| def test_get_numbytes_lt_remain_withskip(self): |
| f = io.BytesIO(b"abcdef") |
| inst = self._makeOne(f) |
| inst.remain = 2 |
| result = inst.get(1, skip=True) |
| self.assertEqual(result, b"a") |
| self.assertEqual(inst.remain, 1) |
| self.assertEqual(f.tell(), 1) |
| |
| def test___iter__(self): |
| data = b"a" * 10000 |
| f = io.BytesIO(data) |
| inst = self._makeOne(f) |
| r = b"" |
| for val in inst: |
| r += val |
| self.assertEqual(r, data) |
| |
| def test_append(self): |
| inst = self._makeOne(None) |
| self.assertRaises(NotImplementedError, inst.append, "a") |
| |
| |
| class TestOverflowableBuffer(unittest.TestCase): |
| def _makeOne(self, overflow=10): |
| from waitress.buffers import OverflowableBuffer |
| |
| buf = OverflowableBuffer(overflow) |
| self.buffers_to_close.append(buf) |
| return buf |
| |
| def setUp(self): |
| self.buffers_to_close = [] |
| |
| def tearDown(self): |
| for buf in self.buffers_to_close: |
| buf.close() |
| |
| def test___len__buf_is_None(self): |
| inst = self._makeOne() |
| self.assertEqual(len(inst), 0) |
| |
| def test___len__buf_is_not_None(self): |
| inst = self._makeOne() |
| inst.buf = b"abc" |
| self.assertEqual(len(inst), 3) |
| self.buffers_to_close.remove(inst) |
| |
| def test___nonzero__(self): |
| inst = self._makeOne() |
| inst.buf = b"abc" |
| self.assertEqual(bool(inst), True) |
| inst.buf = b"" |
| self.assertEqual(bool(inst), False) |
| self.buffers_to_close.remove(inst) |
| |
| def test___nonzero___on_int_overflow_buffer(self): |
| inst = self._makeOne() |
| |
| class int_overflow_buf(bytes): |
| def __len__(self): |
| # maxint + 1 |
| return 0x7FFFFFFFFFFFFFFF + 1 |
| |
| inst.buf = int_overflow_buf() |
| self.assertEqual(bool(inst), True) |
| inst.buf = b"" |
| self.assertEqual(bool(inst), False) |
| self.buffers_to_close.remove(inst) |
| |
| def test__create_buffer_large(self): |
| from waitress.buffers import TempfileBasedBuffer |
| |
| inst = self._makeOne() |
| inst.strbuf = b"x" * 11 |
| inst._create_buffer() |
| self.assertEqual(inst.buf.__class__, TempfileBasedBuffer) |
| self.assertEqual(inst.buf.get(100), b"x" * 11) |
| self.assertEqual(inst.strbuf, b"") |
| |
| def test__create_buffer_small(self): |
| from waitress.buffers import BytesIOBasedBuffer |
| |
| inst = self._makeOne() |
| inst.strbuf = b"x" * 5 |
| inst._create_buffer() |
| self.assertEqual(inst.buf.__class__, BytesIOBasedBuffer) |
| self.assertEqual(inst.buf.get(100), b"x" * 5) |
| self.assertEqual(inst.strbuf, b"") |
| |
| def test_append_with_len_more_than_max_int(self): |
| from waitress.compat import MAXINT |
| |
| inst = self._makeOne() |
| inst.overflowed = True |
| buf = DummyBuffer(length=MAXINT) |
| inst.buf = buf |
| result = inst.append(b"x") |
| # we don't want this to throw an OverflowError on Python 2 (see |
| # https://github.com/Pylons/waitress/issues/47) |
| self.assertEqual(result, None) |
| self.buffers_to_close.remove(inst) |
| |
| def test_append_buf_None_not_longer_than_srtbuf_limit(self): |
| inst = self._makeOne() |
| inst.strbuf = b"x" * 5 |
| inst.append(b"hello") |
| self.assertEqual(inst.strbuf, b"xxxxxhello") |
| |
| def test_append_buf_None_longer_than_strbuf_limit(self): |
| inst = self._makeOne(10000) |
| inst.strbuf = b"x" * 8192 |
| inst.append(b"hello") |
| self.assertEqual(inst.strbuf, b"") |
| self.assertEqual(len(inst.buf), 8197) |
| |
| def test_append_overflow(self): |
| inst = self._makeOne(10) |
| inst.strbuf = b"x" * 8192 |
| inst.append(b"hello") |
| self.assertEqual(inst.strbuf, b"") |
| self.assertEqual(len(inst.buf), 8197) |
| |
| def test_append_sz_gt_overflow(self): |
| from waitress.buffers import BytesIOBasedBuffer |
| |
| f = io.BytesIO(b"data") |
| inst = self._makeOne(f) |
| buf = BytesIOBasedBuffer() |
| inst.buf = buf |
| inst.overflow = 2 |
| inst.append(b"data2") |
| self.assertEqual(f.getvalue(), b"data") |
| self.assertTrue(inst.overflowed) |
| self.assertNotEqual(inst.buf, buf) |
| |
| def test_get_buf_None_skip_False(self): |
| inst = self._makeOne() |
| inst.strbuf = b"x" * 5 |
| r = inst.get(5) |
| self.assertEqual(r, b"xxxxx") |
| |
| def test_get_buf_None_skip_True(self): |
| inst = self._makeOne() |
| inst.strbuf = b"x" * 5 |
| r = inst.get(5, skip=True) |
| self.assertFalse(inst.buf is None) |
| self.assertEqual(r, b"xxxxx") |
| |
| def test_skip_buf_None(self): |
| inst = self._makeOne() |
| inst.strbuf = b"data" |
| inst.skip(4) |
| self.assertEqual(inst.strbuf, b"") |
| self.assertNotEqual(inst.buf, None) |
| |
| def test_skip_buf_None_allow_prune_True(self): |
| inst = self._makeOne() |
| inst.strbuf = b"data" |
| inst.skip(4, True) |
| self.assertEqual(inst.strbuf, b"") |
| self.assertEqual(inst.buf, None) |
| |
| def test_prune_buf_None(self): |
| inst = self._makeOne() |
| inst.prune() |
| self.assertEqual(inst.strbuf, b"") |
| |
| def test_prune_with_buf(self): |
| inst = self._makeOne() |
| |
| class Buf: |
| def prune(self): |
| self.pruned = True |
| |
| inst.buf = Buf() |
| inst.prune() |
| self.assertEqual(inst.buf.pruned, True) |
| self.buffers_to_close.remove(inst) |
| |
| def test_prune_with_buf_overflow(self): |
| inst = self._makeOne() |
| |
| class DummyBuffer(io.BytesIO): |
| def getfile(self): |
| return self |
| |
| def prune(self): |
| return True |
| |
| def __len__(self): |
| return 5 |
| |
| def close(self): |
| pass |
| |
| buf = DummyBuffer(b"data") |
| inst.buf = buf |
| inst.overflowed = True |
| inst.overflow = 10 |
| inst.prune() |
| self.assertNotEqual(inst.buf, buf) |
| |
| def test_prune_with_buflen_more_than_max_int(self): |
| from waitress.compat import MAXINT |
| |
| inst = self._makeOne() |
| inst.overflowed = True |
| buf = DummyBuffer(length=MAXINT + 1) |
| inst.buf = buf |
| result = inst.prune() |
| # we don't want this to throw an OverflowError on Python 2 (see |
| # https://github.com/Pylons/waitress/issues/47) |
| self.assertEqual(result, None) |
| |
| def test_getfile_buf_None(self): |
| inst = self._makeOne() |
| f = inst.getfile() |
| self.assertTrue(hasattr(f, "read")) |
| |
| def test_getfile_buf_not_None(self): |
| inst = self._makeOne() |
| buf = io.BytesIO() |
| buf.getfile = lambda *x: buf |
| inst.buf = buf |
| f = inst.getfile() |
| self.assertEqual(f, buf) |
| |
| def test_close_nobuf(self): |
| inst = self._makeOne() |
| inst.buf = None |
| self.assertEqual(inst.close(), None) # doesnt raise |
| self.buffers_to_close.remove(inst) |
| |
| def test_close_withbuf(self): |
| class Buffer: |
| def close(self): |
| self.closed = True |
| |
| buf = Buffer() |
| inst = self._makeOne() |
| inst.buf = buf |
| inst.close() |
| self.assertTrue(buf.closed) |
| self.buffers_to_close.remove(inst) |
| |
| |
| class KindaFilelike: |
| def __init__(self, bytes, close=None, tellresults=None): |
| self.bytes = bytes |
| self.tellresults = tellresults |
| if close is not None: |
| self.close = lambda: close |
| |
| |
| class Filelike(KindaFilelike): |
| def seek(self, v, whence=0): |
| self.seeked = v |
| |
| def tell(self): |
| v = self.tellresults.pop(0) |
| return v |
| |
| |
| class DummyBuffer: |
| def __init__(self, length=0): |
| self.length = length |
| |
| def __len__(self): |
| return self.length |
| |
| def append(self, s): |
| self.length = self.length + len(s) |
| |
| def prune(self): |
| pass |
| |
| def close(self): |
| pass |