blob: b37949b877391fd19920a511e70b7746e616147f [file] [log] [blame]
import io
import unittest
class TestFileBasedBuffer(unittest.TestCase):
def _makeOne(self, file=None, from_buffer=None):
from waitress.buffers import FileBasedBuffer
buf = FileBasedBuffer(file, from_buffer=from_buffer)
self.buffers_to_close.append(buf)
return buf
def setUp(self):
self.buffers_to_close = []
def tearDown(self):
for buf in self.buffers_to_close:
buf.close()
def test_ctor_from_buffer_None(self):
inst = self._makeOne("file")
self.assertEqual(inst.file, "file")
def test_ctor_from_buffer(self):
from_buffer = io.BytesIO(b"data")
from_buffer.getfile = lambda *x: from_buffer
f = io.BytesIO()
inst = self._makeOne(f, from_buffer)
self.assertEqual(inst.file, f)
del from_buffer.getfile
self.assertEqual(inst.remain, 4)
from_buffer.close()
def test___len__(self):
inst = self._makeOne()
inst.remain = 10
self.assertEqual(len(inst), 10)
def test___nonzero__(self):
inst = self._makeOne()
inst.remain = 10
self.assertEqual(bool(inst), True)
inst.remain = 0
self.assertEqual(bool(inst), True)
def test_append(self):
f = io.BytesIO(b"data")
inst = self._makeOne(f)
inst.append(b"data2")
self.assertEqual(f.getvalue(), b"datadata2")
self.assertEqual(inst.remain, 5)
def test_get_skip_true(self):
f = io.BytesIO(b"data")
inst = self._makeOne(f)
result = inst.get(100, skip=True)
self.assertEqual(result, b"data")
self.assertEqual(inst.remain, -4)
def test_get_skip_false(self):
f = io.BytesIO(b"data")
inst = self._makeOne(f)
result = inst.get(100, skip=False)
self.assertEqual(result, b"data")
self.assertEqual(inst.remain, 0)
def test_get_skip_bytes_less_than_zero(self):
f = io.BytesIO(b"data")
inst = self._makeOne(f)
result = inst.get(-1, skip=False)
self.assertEqual(result, b"data")
self.assertEqual(inst.remain, 0)
def test_skip_remain_gt_bytes(self):
f = io.BytesIO(b"d")
inst = self._makeOne(f)
inst.remain = 1
inst.skip(1)
self.assertEqual(inst.remain, 0)
def test_skip_remain_lt_bytes(self):
f = io.BytesIO(b"d")
inst = self._makeOne(f)
inst.remain = 1
self.assertRaises(ValueError, inst.skip, 2)
def test_newfile(self):
inst = self._makeOne()
self.assertRaises(NotImplementedError, inst.newfile)
def test_prune_remain_notzero(self):
f = io.BytesIO(b"d")
inst = self._makeOne(f)
inst.remain = 1
nf = io.BytesIO()
inst.newfile = lambda *x: nf
inst.prune()
self.assertTrue(inst.file is not f)
self.assertEqual(nf.getvalue(), b"d")
def test_prune_remain_zero_tell_notzero(self):
f = io.BytesIO(b"d")
inst = self._makeOne(f)
nf = io.BytesIO(b"d")
inst.newfile = lambda *x: nf
inst.remain = 0
inst.prune()
self.assertTrue(inst.file is not f)
self.assertEqual(nf.getvalue(), b"d")
def test_prune_remain_zero_tell_zero(self):
f = io.BytesIO()
inst = self._makeOne(f)
inst.remain = 0
inst.prune()
self.assertTrue(inst.file is f)
def test_close(self):
f = io.BytesIO()
inst = self._makeOne(f)
inst.close()
self.assertTrue(f.closed)
self.buffers_to_close.remove(inst)
class TestTempfileBasedBuffer(unittest.TestCase):
def _makeOne(self, from_buffer=None):
from waitress.buffers import TempfileBasedBuffer
buf = TempfileBasedBuffer(from_buffer=from_buffer)
self.buffers_to_close.append(buf)
return buf
def setUp(self):
self.buffers_to_close = []
def tearDown(self):
for buf in self.buffers_to_close:
buf.close()
def test_newfile(self):
inst = self._makeOne()
r = inst.newfile()
self.assertTrue(hasattr(r, "fileno")) # file
r.close()
class TestBytesIOBasedBuffer(unittest.TestCase):
def _makeOne(self, from_buffer=None):
from waitress.buffers import BytesIOBasedBuffer
return BytesIOBasedBuffer(from_buffer=from_buffer)
def test_ctor_from_buffer_not_None(self):
f = io.BytesIO()
f.getfile = lambda *x: f
inst = self._makeOne(f)
self.assertTrue(hasattr(inst.file, "read"))
def test_ctor_from_buffer_None(self):
inst = self._makeOne()
self.assertTrue(hasattr(inst.file, "read"))
def test_newfile(self):
inst = self._makeOne()
r = inst.newfile()
self.assertTrue(hasattr(r, "read"))
class TestReadOnlyFileBasedBuffer(unittest.TestCase):
def _makeOne(self, file, block_size=8192):
from waitress.buffers import ReadOnlyFileBasedBuffer
buf = ReadOnlyFileBasedBuffer(file, block_size)
self.buffers_to_close.append(buf)
return buf
def setUp(self):
self.buffers_to_close = []
def tearDown(self):
for buf in self.buffers_to_close:
buf.close()
def test_prepare_not_seekable(self):
f = KindaFilelike(b"abc")
inst = self._makeOne(f)
self.assertFalse(hasattr(inst, "seek"))
self.assertFalse(hasattr(inst, "tell"))
result = inst.prepare()
self.assertEqual(result, False)
self.assertEqual(inst.remain, 0)
def test_prepare_not_seekable_closeable(self):
f = KindaFilelike(b"abc", close=1)
inst = self._makeOne(f)
result = inst.prepare()
self.assertEqual(result, False)
self.assertEqual(inst.remain, 0)
self.assertTrue(hasattr(inst, "close"))
def test_prepare_seekable_closeable(self):
f = Filelike(b"abc", close=1, tellresults=[0, 10])
inst = self._makeOne(f)
self.assertEqual(inst.seek, f.seek)
self.assertEqual(inst.tell, f.tell)
result = inst.prepare()
self.assertEqual(result, 10)
self.assertEqual(inst.remain, 10)
self.assertEqual(inst.file.seeked, 0)
self.assertTrue(hasattr(inst, "close"))
def test_get_numbytes_neg_one(self):
f = io.BytesIO(b"abcdef")
inst = self._makeOne(f)
inst.remain = 2
result = inst.get(-1)
self.assertEqual(result, b"ab")
self.assertEqual(inst.remain, 2)
self.assertEqual(f.tell(), 0)
def test_get_numbytes_gt_remain(self):
f = io.BytesIO(b"abcdef")
inst = self._makeOne(f)
inst.remain = 2
result = inst.get(3)
self.assertEqual(result, b"ab")
self.assertEqual(inst.remain, 2)
self.assertEqual(f.tell(), 0)
def test_get_numbytes_lt_remain(self):
f = io.BytesIO(b"abcdef")
inst = self._makeOne(f)
inst.remain = 2
result = inst.get(1)
self.assertEqual(result, b"a")
self.assertEqual(inst.remain, 2)
self.assertEqual(f.tell(), 0)
def test_get_numbytes_gt_remain_withskip(self):
f = io.BytesIO(b"abcdef")
inst = self._makeOne(f)
inst.remain = 2
result = inst.get(3, skip=True)
self.assertEqual(result, b"ab")
self.assertEqual(inst.remain, 0)
self.assertEqual(f.tell(), 2)
def test_get_numbytes_lt_remain_withskip(self):
f = io.BytesIO(b"abcdef")
inst = self._makeOne(f)
inst.remain = 2
result = inst.get(1, skip=True)
self.assertEqual(result, b"a")
self.assertEqual(inst.remain, 1)
self.assertEqual(f.tell(), 1)
def test___iter__(self):
data = b"a" * 10000
f = io.BytesIO(data)
inst = self._makeOne(f)
r = b""
for val in inst:
r += val
self.assertEqual(r, data)
def test_append(self):
inst = self._makeOne(None)
self.assertRaises(NotImplementedError, inst.append, "a")
class TestOverflowableBuffer(unittest.TestCase):
def _makeOne(self, overflow=10):
from waitress.buffers import OverflowableBuffer
buf = OverflowableBuffer(overflow)
self.buffers_to_close.append(buf)
return buf
def setUp(self):
self.buffers_to_close = []
def tearDown(self):
for buf in self.buffers_to_close:
buf.close()
def test___len__buf_is_None(self):
inst = self._makeOne()
self.assertEqual(len(inst), 0)
def test___len__buf_is_not_None(self):
inst = self._makeOne()
inst.buf = b"abc"
self.assertEqual(len(inst), 3)
self.buffers_to_close.remove(inst)
def test___nonzero__(self):
inst = self._makeOne()
inst.buf = b"abc"
self.assertEqual(bool(inst), True)
inst.buf = b""
self.assertEqual(bool(inst), False)
self.buffers_to_close.remove(inst)
def test___nonzero___on_int_overflow_buffer(self):
inst = self._makeOne()
class int_overflow_buf(bytes):
def __len__(self):
# maxint + 1
return 0x7FFFFFFFFFFFFFFF + 1
inst.buf = int_overflow_buf()
self.assertEqual(bool(inst), True)
inst.buf = b""
self.assertEqual(bool(inst), False)
self.buffers_to_close.remove(inst)
def test__create_buffer_large(self):
from waitress.buffers import TempfileBasedBuffer
inst = self._makeOne()
inst.strbuf = b"x" * 11
inst._create_buffer()
self.assertEqual(inst.buf.__class__, TempfileBasedBuffer)
self.assertEqual(inst.buf.get(100), b"x" * 11)
self.assertEqual(inst.strbuf, b"")
def test__create_buffer_small(self):
from waitress.buffers import BytesIOBasedBuffer
inst = self._makeOne()
inst.strbuf = b"x" * 5
inst._create_buffer()
self.assertEqual(inst.buf.__class__, BytesIOBasedBuffer)
self.assertEqual(inst.buf.get(100), b"x" * 5)
self.assertEqual(inst.strbuf, b"")
def test_append_with_len_more_than_max_int(self):
from waitress.compat import MAXINT
inst = self._makeOne()
inst.overflowed = True
buf = DummyBuffer(length=MAXINT)
inst.buf = buf
result = inst.append(b"x")
# we don't want this to throw an OverflowError on Python 2 (see
# https://github.com/Pylons/waitress/issues/47)
self.assertEqual(result, None)
self.buffers_to_close.remove(inst)
def test_append_buf_None_not_longer_than_srtbuf_limit(self):
inst = self._makeOne()
inst.strbuf = b"x" * 5
inst.append(b"hello")
self.assertEqual(inst.strbuf, b"xxxxxhello")
def test_append_buf_None_longer_than_strbuf_limit(self):
inst = self._makeOne(10000)
inst.strbuf = b"x" * 8192
inst.append(b"hello")
self.assertEqual(inst.strbuf, b"")
self.assertEqual(len(inst.buf), 8197)
def test_append_overflow(self):
inst = self._makeOne(10)
inst.strbuf = b"x" * 8192
inst.append(b"hello")
self.assertEqual(inst.strbuf, b"")
self.assertEqual(len(inst.buf), 8197)
def test_append_sz_gt_overflow(self):
from waitress.buffers import BytesIOBasedBuffer
f = io.BytesIO(b"data")
inst = self._makeOne(f)
buf = BytesIOBasedBuffer()
inst.buf = buf
inst.overflow = 2
inst.append(b"data2")
self.assertEqual(f.getvalue(), b"data")
self.assertTrue(inst.overflowed)
self.assertNotEqual(inst.buf, buf)
def test_get_buf_None_skip_False(self):
inst = self._makeOne()
inst.strbuf = b"x" * 5
r = inst.get(5)
self.assertEqual(r, b"xxxxx")
def test_get_buf_None_skip_True(self):
inst = self._makeOne()
inst.strbuf = b"x" * 5
r = inst.get(5, skip=True)
self.assertFalse(inst.buf is None)
self.assertEqual(r, b"xxxxx")
def test_skip_buf_None(self):
inst = self._makeOne()
inst.strbuf = b"data"
inst.skip(4)
self.assertEqual(inst.strbuf, b"")
self.assertNotEqual(inst.buf, None)
def test_skip_buf_None_allow_prune_True(self):
inst = self._makeOne()
inst.strbuf = b"data"
inst.skip(4, True)
self.assertEqual(inst.strbuf, b"")
self.assertEqual(inst.buf, None)
def test_prune_buf_None(self):
inst = self._makeOne()
inst.prune()
self.assertEqual(inst.strbuf, b"")
def test_prune_with_buf(self):
inst = self._makeOne()
class Buf:
def prune(self):
self.pruned = True
inst.buf = Buf()
inst.prune()
self.assertEqual(inst.buf.pruned, True)
self.buffers_to_close.remove(inst)
def test_prune_with_buf_overflow(self):
inst = self._makeOne()
class DummyBuffer(io.BytesIO):
def getfile(self):
return self
def prune(self):
return True
def __len__(self):
return 5
def close(self):
pass
buf = DummyBuffer(b"data")
inst.buf = buf
inst.overflowed = True
inst.overflow = 10
inst.prune()
self.assertNotEqual(inst.buf, buf)
def test_prune_with_buflen_more_than_max_int(self):
from waitress.compat import MAXINT
inst = self._makeOne()
inst.overflowed = True
buf = DummyBuffer(length=MAXINT + 1)
inst.buf = buf
result = inst.prune()
# we don't want this to throw an OverflowError on Python 2 (see
# https://github.com/Pylons/waitress/issues/47)
self.assertEqual(result, None)
def test_getfile_buf_None(self):
inst = self._makeOne()
f = inst.getfile()
self.assertTrue(hasattr(f, "read"))
def test_getfile_buf_not_None(self):
inst = self._makeOne()
buf = io.BytesIO()
buf.getfile = lambda *x: buf
inst.buf = buf
f = inst.getfile()
self.assertEqual(f, buf)
def test_close_nobuf(self):
inst = self._makeOne()
inst.buf = None
self.assertEqual(inst.close(), None) # doesnt raise
self.buffers_to_close.remove(inst)
def test_close_withbuf(self):
class Buffer:
def close(self):
self.closed = True
buf = Buffer()
inst = self._makeOne()
inst.buf = buf
inst.close()
self.assertTrue(buf.closed)
self.buffers_to_close.remove(inst)
class KindaFilelike:
def __init__(self, bytes, close=None, tellresults=None):
self.bytes = bytes
self.tellresults = tellresults
if close is not None:
self.close = lambda: close
class Filelike(KindaFilelike):
def seek(self, v, whence=0):
self.seeked = v
def tell(self):
v = self.tellresults.pop(0)
return v
class DummyBuffer:
def __init__(self, length=0):
self.length = length
def __len__(self):
return self.length
def append(self, s):
self.length = self.length + len(s)
def prune(self):
pass
def close(self):
pass