blob: b67f283d54fe500040c52e003ed1bbdeb1e80edb [file] [log] [blame]
"""Tests for mechanize._response.seek_wrapper and friends."""
import copy
import cStringIO
from unittest import TestCase
class TestUnSeekable:
def __init__(self, text):
self._file = cStringIO.StringIO(text)
self.log = []
def tell(self): return self._file.tell()
def seek(self, offset, whence=0): assert False
def read(self, size=-1):
self.log.append(("read", size))
return self._file.read(size)
def readline(self, size=-1):
self.log.append(("readline", size))
return self._file.readline(size)
def readlines(self, sizehint=-1):
self.log.append(("readlines", sizehint))
return self._file.readlines(sizehint)
class TestUnSeekableResponse(TestUnSeekable):
def __init__(self, text, headers):
TestUnSeekable.__init__(self, text)
self.code = 200
self.msg = "OK"
self.headers = headers
self.url = "http://example.com/"
def geturl(self):
return self.url
def info(self):
return self.headers
def close(self):
pass
class SeekableTests(TestCase):
text = """\
The quick brown fox
jumps over the lazy
dog.
"""
text_lines = map(lambda l: l+"\n", text.split("\n")[:-1])
def testSeekable(self):
from mechanize._response import seek_wrapper
text = self.text
for ii in range(1, 6):
fh = TestUnSeekable(text)
sfh = seek_wrapper(fh)
test = getattr(self, "_test%d" % ii)
test(sfh)
# copies have independent seek positions
fh = TestUnSeekable(text)
sfh = seek_wrapper(fh)
self._testCopy(sfh)
def _testCopy(self, sfh):
sfh2 = copy.copy(sfh)
sfh.read(10)
text = self.text
self.assertEqual(sfh2.read(10), text[:10])
sfh2.seek(5)
self.assertEqual(sfh.read(10), text[10:20])
self.assertEqual(sfh2.read(10), text[5:15])
sfh.seek(0)
sfh2.seek(0)
return sfh2
def _test1(self, sfh):
text = self.text
text_lines = self.text_lines
assert sfh.read(10) == text[:10] # calls fh.read
assert sfh.log[-1] == ("read", 10) # .log delegated to fh
sfh.seek(0) # doesn't call fh.seek
assert sfh.read(10) == text[:10] # doesn't call fh.read
assert len(sfh.log) == 1
sfh.seek(0)
assert sfh.read(5) == text[:5] # read only part of cached data
assert len(sfh.log) == 1
sfh.seek(0)
assert sfh.read(25) == text[:25] # calls fh.read
assert sfh.log[1] == ("read", 15)
lines = []
sfh.seek(-1, 1)
while 1:
l = sfh.readline()
if l == "": break
lines.append(l)
assert lines == ["s over the lazy\n"]+text_lines[2:]
assert sfh.log[2:] == [("readline", -1)]*5
sfh.seek(0)
lines = []
while 1:
l = sfh.readline()
if l == "": break
lines.append(l)
assert lines == text_lines
def _test2(self, sfh):
text = self.text
sfh.read(5)
sfh.seek(0)
assert sfh.read() == text
assert sfh.read() == ""
sfh.seek(0)
assert sfh.read() == text
sfh.seek(0)
assert sfh.readline(5) == "The q"
assert sfh.read() == text[5:]
sfh.seek(0)
assert sfh.readline(5) == "The q"
assert sfh.readline() == "uick brown fox\n"
def _test3(self, sfh):
text_lines = self.text_lines
sfh.read(25)
sfh.seek(-1, 1)
self.assertEqual(sfh.readlines(), ["s over the lazy\n"]+text_lines[2:])
sfh.seek(0)
assert sfh.readlines() == text_lines
def _test4(self, sfh):
text_lines = self.text_lines
count = 0
limit = 10
while count < limit:
if count == 5:
self.assertRaises(StopIteration, sfh.next)
break
else:
sfh.next() == text_lines[count]
count = count + 1
else:
assert False, "StopIteration not raised"
def _test5(self, sfh):
text = self.text
sfh.read(10)
sfh.seek(5)
self.assert_(sfh.invariant())
sfh.seek(0, 2)
self.assert_(sfh.invariant())
sfh.seek(0)
self.assertEqual(sfh.read(), text)
def testResponseSeekWrapper(self):
from mechanize import response_seek_wrapper
hdrs = {"Content-type": "text/html"}
r = TestUnSeekableResponse(self.text, hdrs)
rsw = response_seek_wrapper(r)
rsw2 = self._testCopy(rsw)
self.assert_(rsw is not rsw2)
self.assertEqual(rsw.info(), rsw2.info())
self.assert_(rsw.info() is not rsw2.info())
# should be able to close already-closed object
rsw2.close()
rsw2.close()
def testSetResponseData(self):
from mechanize import response_seek_wrapper
r = TestUnSeekableResponse(self.text, {'blah': 'yawn'})
rsw = response_seek_wrapper(r)
rsw.set_data("""\
A Seeming somwhat more than View;
That doth instruct the Mind
In Things that ly behind,
""")
self.assertEqual(rsw.read(9), "A Seeming")
self.assertEqual(rsw.read(13), " somwhat more")
rsw.seek(0)
self.assertEqual(rsw.read(9), "A Seeming")
self.assertEqual(rsw.readline(), " somwhat more than View;\n")
rsw.seek(0)
self.assertEqual(rsw.readline(), "A Seeming somwhat more than View;\n")
rsw.seek(-1, 1)
self.assertEqual(rsw.read(7), "\n That")
r = TestUnSeekableResponse(self.text, {'blah': 'yawn'})
rsw = response_seek_wrapper(r)
rsw.set_data(self.text)
self._test2(rsw)
rsw.seek(0)
self._test4(rsw)
def testGetResponseData(self):
from mechanize import response_seek_wrapper
r = TestUnSeekableResponse(self.text, {'blah': 'yawn'})
rsw = response_seek_wrapper(r)
self.assertEqual(rsw.get_data(), self.text)
self._test2(rsw)
rsw.seek(0)
self._test4(rsw)
if __name__ == "__main__":
import unittest
unittest.main()