#!/usr/bin/env python
# These tests access the network.
# thanks Moof (aka Giles Antonio Radford) for some of these
import errno
import os
import socket
import sys
import tempfile
import urllib
import mechanize
from mechanize import build_opener, install_opener, urlopen, urlretrieve
from mechanize import CookieJar, HTTPCookieProcessor, \
HTTPHandler, HTTPRefreshProcessor, \
HTTPEquivProcessor, HTTPRedirectHandler, \
HTTPRedirectDebugProcessor, HTTPResponseDebugProcessor
from mechanize._rfc3986 import urljoin
from mechanize._util import hide_experimental_warnings, \
reset_experimental_warnings
import mechanize._sockettimeout
from mechanize._testcase import TestCase
#from cookielib import CookieJar
#from urllib2 import build_opener, install_opener, urlopen
#from urllib2 import HTTPCookieProcessor, HTTPHandler
#from mechanize import CreateBSDDBCookieJar
## import logging
## logger = logging.getLogger("mechanize")
## logger.addHandler(logging.StreamHandler(sys.stdout))
## #logger.setLevel(logging.DEBUG)
## logger.setLevel(logging.INFO)
def sanepathname2url(path):
    urlpath = urllib.pathname2url(path)
    if os.name == "nt" and urlpath.startswith("///"):
        urlpath = urlpath[2:]
    # XXX don't ask me about the mac...
    return urlpath
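# Illustrative only (assuming a Windows path): urllib.pathname2url maps
# r"C:\temp\index.html" to "///C:/temp/index.html", and the slice above
# trims that to "/C:/temp/index.html", so "file://" + urlpath is sane.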
def read_file(filename):
fh = open(filename)
try:
return fh.read()
finally:
fh.close()
class SocketTimeoutTest(TestCase):
# the timeout tests in this module aren't full functional tests: in order
# to speed things up, don't actually call .settimeout on the socket. XXX
# allow running the tests against a slow server with a real timeout
def _monkey_patch_socket(self):
class Delegator(object):
def __init__(self, delegate):
self._delegate = delegate
def __getattr__(self, name):
return getattr(self._delegate, name)
assertEquals = self.assertEquals
class TimeoutLog(object):
AnyValue = object()
def __init__(self):
self._nr_sockets = 0
self._timeouts = []
self.start()
def start(self):
self._monitoring = True
def stop(self):
self._monitoring = False
def socket_created(self):
if self._monitoring:
self._nr_sockets += 1
def settimeout_called(self, timeout):
if self._monitoring:
self._timeouts.append(timeout)
def verify(self, value=AnyValue):
if sys.version_info[:2] < (2, 6):
# per-connection timeout not supported in Python 2.5
self.verify_default()
else:
assertEquals(len(self._timeouts), self._nr_sockets)
if value is not self.AnyValue:
for timeout in self._timeouts:
assertEquals(timeout, value)
def verify_default(self):
assertEquals(len(self._timeouts), 0)
log = TimeoutLog()
def settimeout(timeout):
log.settimeout_called(timeout)
orig_socket = socket.socket
def make_socket(*args, **kwds):
sock = Delegator(orig_socket(*args, **kwds))
log.socket_created()
sock.settimeout = settimeout
return sock
self.monkey_patch(socket, "socket", make_socket)
return log
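    # A minimal sketch of intended use (mirrors the timeout tests below);
    # .verify() checks that .settimeout() was called once per socket, with
    # the given value:
    #   log = self._monkey_patch_socket()
    #   browser.open(uri, timeout=10.)
    #   log.verify(10.)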
class SimpleTests(SocketTimeoutTest):
# thanks Moof (aka Giles Antonio Radford)
def setUp(self):
super(SimpleTests, self).setUp()
self.browser = mechanize.Browser()
def test_simple(self):
self.browser.open(self.uri)
self.assertEqual(self.browser.title(), 'Python bits')
# relative URL
self.browser.open('/mechanize/')
self.assertEqual(self.browser.title(), 'mechanize')
def test_basic_auth(self):
uri = urljoin(self.uri, "basic_auth")
self.assertRaises(mechanize.URLError, self.browser.open, uri)
self.browser.add_password(uri, "john", "john")
self.browser.open(uri)
self.assertEqual(self.browser.title(), 'Basic Auth Protected Area')
def test_digest_auth(self):
uri = urljoin(self.uri, "digest_auth")
self.assertRaises(mechanize.URLError, self.browser.open, uri)
self.browser.add_password(uri, "digestuser", "digestuser")
self.browser.open(uri)
self.assertEqual(self.browser.title(), 'Digest Auth Protected Area')
def test_open_with_default_timeout(self):
timeout_log = self._monkey_patch_socket()
self.browser.open(self.uri)
self.assertEqual(self.browser.title(), 'Python bits')
timeout_log.verify_default()
def test_open_with_timeout(self):
timeout_log = self._monkey_patch_socket()
timeout = 10.
self.browser.open(self.uri, timeout=timeout)
self.assertEqual(self.browser.title(), 'Python bits')
timeout_log.verify(timeout)
def test_urlopen_with_default_timeout(self):
timeout_log = self._monkey_patch_socket()
response = mechanize.urlopen(self.uri)
self.assert_contains(response.read(), "Python bits")
timeout_log.verify_default()
def test_urlopen_with_timeout(self):
timeout_log = self._monkey_patch_socket()
timeout = 10.
response = mechanize.urlopen(self.uri, timeout=timeout)
self.assert_contains(response.read(), "Python bits")
timeout_log.verify(timeout)
def test_302_and_404(self):
# the combination of 302 and 404 (/redirected is configured to redirect
# to a non-existent URL /nonexistent) has caused problems in the past
# due to accidental double-wrapping of the error response
import urllib2
self.assertRaises(
urllib2.HTTPError,
self.browser.open, urljoin(self.uri, "/redirected"),
)
def test_reread(self):
# closing response shouldn't stop methods working (this happens also to
# be true for e.g. mechanize.OpenerDirector when mechanize's own
# handlers are in use, but is guaranteed to be true for
# mechanize.Browser)
r = self.browser.open(self.uri)
data = r.read()
r.close()
r.seek(0)
self.assertEqual(r.read(), data)
self.assertEqual(self.browser.response().read(), data)
def test_error_recovery(self):
self.assertRaises(mechanize.URLError, self.browser.open,
'file:///c|thisnoexistyiufheiurgbueirgbue')
self.browser.open(self.uri)
self.assertEqual(self.browser.title(), 'Python bits')
def test_redirect(self):
# 301 redirect due to missing final '/'
r = self.browser.open(urljoin(self.uri, "bits"))
self.assertEqual(r.code, 200)
self.assert_("GeneralFAQ.html" in r.read(2048))
def test_refresh(self):
def refresh_request(seconds):
uri = urljoin(self.uri, "/cgi-bin/cookietest.cgi")
val = urllib.quote_plus('%d; url="%s"' % (seconds, self.uri))
return uri + ("?refresh=%s" % val)
self.browser.set_handle_refresh(True, honor_time=False)
r = self.browser.open(refresh_request(5))
self.assertEqual(r.geturl(), self.uri)
# Set a maximum refresh time of 30 seconds (these long refreshes tend
# to be there only because the website owner wants you to see the
# latest news, or whatever -- they're not essential to the operation of
# the site, and not really useful or appropriate when scraping).
refresh_uri = refresh_request(60)
self.browser.set_handle_refresh(True, max_time=30., honor_time=True)
r = self.browser.open(refresh_uri)
self.assertEqual(r.geturl(), refresh_uri)
# allow long refreshes (but don't actually wait 60 seconds)
self.browser.set_handle_refresh(True, max_time=None, honor_time=False)
r = self.browser.open(refresh_request(60))
self.assertEqual(r.geturl(), self.uri)
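        # For reference, refresh_request(5) asks the CGI to respond with
        # (presumably -- an assumption based on the query string built
        # above) the equivalent of a 'Refresh: 5; url="..."' header, which
        # HTTPRefreshProcessor turns into a redirect when handling is on.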
def test_file_url(self):
url = "file://%s" % sanepathname2url(
os.path.abspath('functional_tests.py'))
r = self.browser.open(url)
self.assert_("this string appears in this file ;-)" in r.read())
def test_open_local_file(self):
# Since the file: URL scheme is not well standardised, Browser has a
# special method to open files by name, for convenience:
br = mechanize.Browser()
response = br.open_local_file("mechanize/_mechanize.py")
self.assert_("def open_local_file(self, filename):" in
response.get_data())
def test_open_novisit(self):
def test_state(br):
self.assert_(br.request is None)
self.assert_(br.response() is None)
self.assertRaises(mechanize.BrowserStateError, br.back)
test_state(self.browser)
uri = urljoin(self.uri, "bits")
# note this involves a redirect, which should itself be non-visiting
r = self.browser.open_novisit(uri)
test_state(self.browser)
self.assert_("GeneralFAQ.html" in r.read(2048))
# Request argument instead of URL
r = self.browser.open_novisit(mechanize.Request(uri))
test_state(self.browser)
self.assert_("GeneralFAQ.html" in r.read(2048))
def test_non_seekable(self):
# check everything still works without response_seek_wrapper and
# the .seek() method on response objects
ua = mechanize.UserAgent()
ua.set_seekable_responses(False)
ua.set_handle_equiv(False)
response = ua.open(self.uri)
self.failIf(hasattr(response, "seek"))
data = response.read()
self.assert_("Python bits" in data)
class ResponseTests(TestCase):
def test_seek(self):
br = mechanize.Browser()
r = br.open(self.uri)
html = r.read()
r.seek(0)
self.assertEqual(r.read(), html)
def test_seekable_response_opener(self):
opener = mechanize.OpenerFactory(
mechanize.SeekableResponseOpener).build_opener()
r = opener.open(urljoin(self.uri, "bits/cctest2.txt"))
r.read()
r.seek(0)
        # all three ways of getting the data should agree
        self.assertEqual(r.read(), r.get_data())
        self.assertEqual(r.get_data(),
                         "Hello ClientCookie functional test suite.\n")
def test_seek_wrapper_class_name(self):
opener = mechanize.UserAgent()
opener.set_seekable_responses(True)
try:
opener.open(urljoin(self.uri, "nonexistent"))
except mechanize.HTTPError, exc:
self.assert_("HTTPError instance" in repr(exc))
def test_no_seek(self):
# should be possible to turn off UserAgent's .seek() functionality
def check_no_seek(opener):
r = opener.open(urljoin(self.uri, "bits/cctest2.txt"))
self.assert_(not hasattr(r, "seek"))
try:
opener.open(urljoin(self.uri, "nonexistent"))
except mechanize.HTTPError, exc:
self.assert_(not hasattr(exc, "seek"))
# mechanize.UserAgent
opener = mechanize.UserAgent()
opener.set_handle_equiv(False)
opener.set_seekable_responses(False)
opener.set_debug_http(False)
check_no_seek(opener)
# mechanize.OpenerDirector
opener = mechanize.build_opener()
check_no_seek(opener)
def test_consistent_seek(self):
# if we explicitly request that returned response objects have the
# .seek() method, then raised HTTPError exceptions should also have the
# .seek() method
def check(opener, excs_also):
r = opener.open(urljoin(self.uri, "bits/cctest2.txt"))
data = r.read()
r.seek(0)
self.assertEqual(data, r.read(), r.get_data())
try:
opener.open(urljoin(self.uri, "nonexistent"))
except mechanize.HTTPError, exc:
data = exc.read()
if excs_also:
exc.seek(0)
self.assertEqual(data, exc.read(), exc.get_data())
else:
self.assert_(False)
opener = mechanize.UserAgent()
opener.set_debug_http(False)
# Here, only the .set_handle_equiv() causes .seek() to be present, so
# exceptions don't necessarily support the .seek() method (and do not,
# at present).
opener.set_handle_equiv(True)
opener.set_seekable_responses(False)
check(opener, excs_also=False)
# Here, (only) the explicit .set_seekable_responses() causes .seek() to
# be present (different mechanism from .set_handle_equiv()). Since
# there's an explicit request, ALL responses are seekable, even
# exception responses (HTTPError instances).
opener.set_handle_equiv(False)
opener.set_seekable_responses(True)
check(opener, excs_also=True)
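        # Summary of the two mechanisms exercised above:
        #   set_handle_equiv(True)       -> responses seekable, HTTPError not
        #   set_seekable_responses(True) -> responses AND HTTPError seekable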
def test_set_response(self):
br = mechanize.Browser()
r = br.open(self.uri)
html = r.read()
self.assertEqual(br.title(), "Python bits")
newhtml = """<html><body><a href="spam">click me</a></body></html>"""
r.set_data(newhtml)
self.assertEqual(r.read(), newhtml)
self.assertEqual(br.response().read(), html)
br.response().set_data(newhtml)
self.assertEqual(br.response().read(), html)
self.assertEqual(list(br.links())[0].url, 'http://sourceforge.net')
br.set_response(r)
self.assertEqual(br.response().read(), newhtml)
self.assertEqual(list(br.links())[0].url, "spam")
def test_new_response(self):
br = mechanize.Browser()
data = "<html><head><title>Test</title></head><body><p>Hello.</p></body></html>"
response = mechanize.make_response(
data,
[("Content-type", "text/html")],
"http://example.com/",
200,
"OK"
)
br.set_response(response)
self.assertEqual(br.response().get_data(), data)
def hidden_test_close_pickle_load(self):
print ("Test test_close_pickle_load is expected to fail unless Python "
"standard library patch http://python.org/sf/1144636 has been "
"applied")
import pickle
b = mechanize.Browser()
r = b.open(urljoin(self.uri, "bits/cctest2.txt"))
r.read()
r.close()
r.seek(0)
self.assertEqual(r.read(),
"Hello ClientCookie functional test suite.\n")
        p = pickle.dumps(b, pickle.HIGHEST_PROTOCOL)
b = pickle.loads(p)
r = b.response()
r.seek(0)
self.assertEqual(r.read(),
"Hello ClientCookie functional test suite.\n")
class FunctionalTests(SocketTimeoutTest):
def test_referer(self):
br = mechanize.Browser()
br.set_handle_refresh(True, honor_time=False)
referer = urljoin(self.uri, "bits/referertest.html")
info = urljoin(self.uri, "/cgi-bin/cookietest.cgi")
r = br.open(info)
self.assert_(referer not in r.get_data())
br.open(referer)
r = br.follow_link(text="Here")
self.assert_(referer in r.get_data())
def test_cookies(self):
import urllib2
# this test page depends on cookies, and an http-equiv refresh
#cj = CreateBSDDBCookieJar("/home/john/db.db")
cj = CookieJar()
handlers = [
HTTPCookieProcessor(cj),
HTTPRefreshProcessor(max_time=None, honor_time=False),
HTTPEquivProcessor(),
HTTPRedirectHandler(), # needed for Refresh handling in 2.4.0
# HTTPHandler(True),
# HTTPRedirectDebugProcessor(),
# HTTPResponseDebugProcessor(),
]
        o = build_opener(*handlers)
try:
install_opener(o)
try:
r = urlopen(urljoin(self.uri, "/cgi-bin/cookietest.cgi"))
except urllib2.URLError, e:
#print e.read()
raise
data = r.read()
#print data
self.assert_(
data.find("Your browser supports cookies!") >= 0)
self.assert_(len(cj) == 1)
# test response.seek() (added by HTTPEquivProcessor)
r.seek(0)
samedata = r.read()
r.close()
self.assert_(samedata == data)
finally:
o.close()
install_opener(None)
def test_robots(self):
plain_opener = mechanize.build_opener(mechanize.HTTPRobotRulesProcessor)
browser = mechanize.Browser()
for opener in plain_opener, browser:
r = opener.open(urljoin(self.uri, "robots"))
self.assertEqual(r.code, 200)
self.assertRaises(
mechanize.RobotExclusionError,
opener.open, urljoin(self.uri, "norobots"))
def _check_retrieve(self, url, filename, headers):
from urllib import urlopen
self.assertEqual(headers.get('Content-Type'), 'text/html')
self.assertEqual(read_file(filename), urlopen(url).read())
def test_retrieve_to_named_file(self):
url = urljoin(self.uri, "/mechanize/")
test_filename = os.path.join(self.make_temp_dir(), "python.html")
opener = mechanize.build_opener()
verif = CallbackVerifier(self)
filename, headers = opener.retrieve(url, test_filename, verif.callback)
self.assertEqual(filename, test_filename)
self._check_retrieve(url, filename, headers)
self.assert_(os.path.isfile(filename))
    def test_retrieve(self):
        # Not passing an explicit filename downloads to a temporary file;
        # also check that passing a Request object instead of a URL works.
url = urljoin(self.uri, "/mechanize/")
opener = mechanize.build_opener()
verif = CallbackVerifier(self)
request = mechanize.Request(url)
filename, headers = opener.retrieve(request, reporthook=verif.callback)
self.assertEquals(request.visit, False)
self._check_retrieve(url, filename, headers)
opener.close()
# closing the opener removed the temporary file
self.failIf(os.path.isfile(filename))
def test_urlretrieve(self):
timeout_log = self._monkey_patch_socket()
timeout = 10.
url = urljoin(self.uri, "/mechanize/")
verif = CallbackVerifier(self)
filename, headers = mechanize.urlretrieve(url,
reporthook=verif.callback,
timeout=timeout)
timeout_log.stop()
self._check_retrieve(url, filename, headers)
timeout_log.verify(timeout)
def test_reload_read_incomplete(self):
from mechanize import Browser
browser = Browser()
r1 = browser.open(urljoin(self.uri, "bits/mechanize_reload_test.html"))
# if we don't do anything and go straight to another page, most of the
# last page's response won't be .read()...
r2 = browser.open(urljoin(self.uri, "mechanize"))
self.assert_(len(r1.get_data()) < 4097) # we only .read() a little bit
# ...so if we then go back, .follow_link() for a link near the end (a
# few kb in, past the point that always gets read in HTML files because
# of HEAD parsing) will only work if it causes a .reload()...
r3 = browser.back()
browser.follow_link(text="near the end")
# ... good, no LinkNotFoundError, so we did reload.
# we have .read() the whole file
self.assertEqual(len(r3._seek_wrapper__cache.getvalue()), 4202)
## def test_cacheftp(self):
## from urllib2 import CacheFTPHandler, build_opener
## o = build_opener(CacheFTPHandler())
## r = o.open("ftp://ftp.python.org/pub/www.python.org/robots.txt")
## data1 = r.read()
## r.close()
## r = o.open("ftp://ftp.python.org/pub/www.python.org/2.3.2/announce.txt")
## data2 = r.read()
## r.close()
## self.assert_(data1 != data2)
class CookieJarTests(TestCase):
def test_mozilla_cookiejar(self):
filename = tempfile.mktemp()
try:
def get_cookiejar():
cj = mechanize.MozillaCookieJar(filename=filename)
try:
cj.revert()
except IOError, exc:
if exc.errno != errno.ENOENT:
raise
return cj
def commit(cj):
cj.save()
self._test_cookiejar(get_cookiejar, commit)
finally:
try:
os.remove(filename)
except OSError, exc:
if exc.errno != errno.ENOENT:
raise
def test_firefox3_cookiejar(self):
try:
mechanize.Firefox3CookieJar
except AttributeError:
# firefox 3 cookiejar is only supported in Python 2.5 and later;
# also, sqlite3 must be available
return
filename = tempfile.mktemp()
try:
def get_cookiejar():
hide_experimental_warnings()
try:
cj = mechanize.Firefox3CookieJar(filename=filename)
finally:
reset_experimental_warnings()
cj.connect()
return cj
def commit(cj):
pass
self._test_cookiejar(get_cookiejar, commit)
finally:
os.remove(filename)
def _test_cookiejar(self, get_cookiejar, commit):
cookiejar = get_cookiejar()
br = mechanize.Browser()
br.set_cookiejar(cookiejar)
br.set_handle_refresh(False)
url = urljoin(self.uri, "/cgi-bin/cookietest.cgi")
# no cookie was set on the first request
html = br.open(url).read()
self.assertEquals(html.find("Your browser supports cookies!"), -1)
self.assertEquals(len(cookiejar), 1)
# ... but now we have the cookie
html = br.open(url).read()
self.assert_("Your browser supports cookies!" in html)
commit(cookiejar)
# should still have the cookie when we load afresh
cookiejar = get_cookiejar()
br.set_cookiejar(cookiejar)
html = br.open(url).read()
self.assert_("Your browser supports cookies!" in html)
class CallbackVerifier:
    # reporthook verifier for the .retrieve() and .urlretrieve() tests
def __init__(self, testcase):
self._count = 0
self._testcase = testcase
def callback(self, block_nr, block_size, total_size):
self._testcase.assertEqual(block_nr, self._count)
self._count = self._count + 1
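    # Note: the reporthook passed to retrieve()/urlretrieve() is called as
    # reporthook(block_number, block_size, total_size), with block_number
    # starting at 0 -- which is what the counter above checks.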
if __name__ == "__main__":
sys.path.insert(0, "test-tools")
test_path = os.path.join(os.path.dirname(sys.argv[0]), "test")
sys.path.insert(0, test_path)
import testprogram
USAGE_EXAMPLES = """
Examples:
%(progName)s
- run all tests
%(progName)s functional_tests.SimpleTests
- run all 'test*' test methods in class SimpleTests
%(progName)s functional_tests.SimpleTests.test_redirect
- run SimpleTests.test_redirect
%(progName)s -l
- start a local Twisted HTTP server and run the functional
tests against that, rather than against SourceForge
(quicker!)
If this option doesn't work on Windows/Mac, somebody please
tell me about it, or I'll never find out...
"""
prog = testprogram.TestProgram(
["functional_tests"],
localServerProcess=testprogram.TwistedServerProcess(),
usageExamples=USAGE_EXAMPLES,
)
result = prog.runTests()