"""HTML handling.
Copyright 2003-2006 John J. Lee <jjl@pobox.com>
This code is free software; you can redistribute it and/or modify it under
the terms of the BSD or ZPL 2.1 licenses (see the file COPYING.txt
included with the distribution).
"""
import codecs
import copy
import htmlentitydefs
import re
import _sgmllib_copy as sgmllib
import _beautifulsoup
import _form
from _headersutil import split_header_words, is_html as _is_html
import _request
import _rfc3986
DEFAULT_ENCODING = "latin-1"
COMPRESS_RE = re.compile(r"\s+")
class CachingGeneratorFunction(object):
"""Caching wrapper around a no-arguments iterable."""
def __init__(self, iterable):
self._cache = []
# wrap iterable to make it non-restartable (otherwise, repeated
# __call__ would give incorrect results)
self._iterator = iter(iterable)
def __call__(self):
cache = self._cache
for item in cache:
yield item
for item in self._iterator:
cache.append(item)
yield item
class EncodingFinder:
def __init__(self, default_encoding):
self._default_encoding = default_encoding
def encoding(self, response):
# HTTPEquivProcessor may be in use, so both HTTP and HTTP-EQUIV
# headers may be in the response. HTTP-EQUIV headers come last,
# so try in order from first to last.
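        # For example (a sketch), a header such as
        #   Content-Type: text/html; charset=UTF-8
        # yields a ("charset", "UTF-8") pair below; the charset is returned
        # only if codecs.lookup() recognises it, otherwise the default
        # encoding passed to the constructor is used.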
for ct in response.info().getheaders("content-type"):
for k, v in split_header_words([ct])[0]:
if k == "charset":
encoding = v
try:
codecs.lookup(v)
except LookupError:
continue
else:
return encoding
return self._default_encoding
class ResponseTypeFinder:
def __init__(self, allow_xhtml):
self._allow_xhtml = allow_xhtml
def is_html(self, response, encoding):
ct_hdrs = response.info().getheaders("content-type")
url = response.geturl()
# XXX encoding
return _is_html(ct_hdrs, url, self._allow_xhtml)
class Args(object):
# idea for this argument-processing trick is from Peter Otten
def __init__(self, args_map):
self.__dict__["dictionary"] = dict(args_map)
def __getattr__(self, key):
try:
return self.dictionary[key]
except KeyError:
return getattr(self.__class__, key)
def __setattr__(self, key, value):
if key == "dictionary":
raise AttributeError()
self.dictionary[key] = value
def form_parser_args(
select_default=False,
form_parser_class=None,
request_class=None,
backwards_compat=False,
):
return Args(locals())
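# Illustrative sketch: form_parser_args() captures its keyword arguments in an
# Args instance, so subclasses such as RobustFormsFactory (below) can fill in
# their own defaults before delegating:
#
#   args = form_parser_args(select_default=True)
#   if args.form_parser_class is None:
#       args.form_parser_class = _form.RobustFormParser
#   factory = FormsFactory(**args.dictionary)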
class Link:
def __init__(self, base_url, url, text, tag, attrs):
assert None not in [url, tag, attrs]
self.base_url = base_url
self.absolute_url = _rfc3986.urljoin(base_url, url)
self.url, self.text, self.tag, self.attrs = url, text, tag, attrs
def __cmp__(self, other):
try:
for name in "url", "text", "tag", "attrs":
if getattr(self, name) != getattr(other, name):
return -1
except AttributeError:
return -1
return 0
def __repr__(self):
return "Link(base_url=%r, url=%r, text=%r, tag=%r, attrs=%r)" % (
self.base_url, self.url, self.text, self.tag, self.attrs)
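# Sketch of a typical Link (the URL values are made up for illustration):
#
#   Link(base_url='http://example.com/', url='about.html', text='About',
#        tag='a', attrs=[('href', 'about.html')])
#
# .absolute_url is url joined against base_url, here
# 'http://example.com/about.html'.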
class LinksFactory:
def __init__(self,
link_parser_class=None,
link_class=Link,
urltags=None,
):
import _pullparser
if link_parser_class is None:
link_parser_class = _pullparser.TolerantPullParser
self.link_parser_class = link_parser_class
self.link_class = link_class
if urltags is None:
urltags = {
"a": "href",
"area": "href",
"frame": "src",
"iframe": "src",
}
self.urltags = urltags
self._response = None
self._encoding = None
def set_response(self, response, base_url, encoding):
self._response = response
self._encoding = encoding
self._base_url = base_url
def links(self):
"""Return an iterator that provides links of the document."""
response = self._response
encoding = self._encoding
base_url = self._base_url
p = self.link_parser_class(response, encoding=encoding)
try:
for token in p.tags(*(self.urltags.keys()+["base"])):
if token.type == "endtag":
continue
if token.data == "base":
base_href = dict(token.attrs).get("href")
if base_href is not None:
base_url = base_href
continue
attrs = dict(token.attrs)
tag = token.data
text = None
# XXX use attr_encoding for ref'd doc if that doc does not
# provide one by other means
#attr_encoding = attrs.get("charset")
url = attrs.get(self.urltags[tag]) # XXX is "" a valid URL?
if not url:
# Probably an <A NAME="blah"> link or <AREA NOHREF...>.
# For our purposes a link is something with a URL, so
# ignore this.
continue
url = _rfc3986.clean_url(url, encoding)
if tag == "a":
if token.type != "startendtag":
# hmm, this'd break if end tag is missing
text = p.get_compressed_text(("endtag", tag))
# but this doesn't work for e.g.
# <a href="blah"><b>Andy</b></a>
#text = p.get_compressed_text()
yield Link(base_url, url, text, tag, token.attrs)
except sgmllib.SGMLParseError, exc:
raise _form.ParseError(exc)
class FormsFactory:
"""Makes a sequence of objects satisfying HTMLForm interface.
After calling .forms(), the .global_form attribute is a form object
containing all controls not a descendant of any FORM element.
For constructor argument docs, see ParseResponse argument docs.
"""
def __init__(self,
select_default=False,
form_parser_class=None,
request_class=None,
backwards_compat=False,
):
self.select_default = select_default
if form_parser_class is None:
form_parser_class = _form.FormParser
self.form_parser_class = form_parser_class
if request_class is None:
request_class = _request.Request
self.request_class = request_class
self.backwards_compat = backwards_compat
self._response = None
self.encoding = None
self.global_form = None
def set_response(self, response, encoding):
self._response = response
self.encoding = encoding
self.global_form = None
def forms(self):
encoding = self.encoding
forms = _form.ParseResponseEx(
self._response,
select_default=self.select_default,
form_parser_class=self.form_parser_class,
request_class=self.request_class,
encoding=encoding,
_urljoin=_rfc3986.urljoin,
_urlparse=_rfc3986.urlsplit,
_urlunparse=_rfc3986.urlunsplit,
)
self.global_form = forms[0]
return forms[1:]
class TitleFactory:
def __init__(self):
self._response = self._encoding = None
def set_response(self, response, encoding):
self._response = response
self._encoding = encoding
def _get_title_text(self, parser):
import _pullparser
text = []
tok = None
while 1:
try:
tok = parser.get_token()
except _pullparser.NoMoreTokensError:
break
if tok.type == "data":
text.append(str(tok))
elif tok.type == "entityref":
t = unescape("&%s;" % tok.data,
parser._entitydefs, parser.encoding)
text.append(t)
elif tok.type == "charref":
t = unescape_charref(tok.data, parser.encoding)
text.append(t)
elif tok.type in ["starttag", "endtag", "startendtag"]:
tag_name = tok.data
if tok.type == "endtag" and tag_name == "title":
break
text.append(str(tok))
return COMPRESS_RE.sub(" ", "".join(text).strip())
def title(self):
import _pullparser
p = _pullparser.TolerantPullParser(
self._response, encoding=self._encoding)
try:
try:
p.get_tag("title")
except _pullparser.NoMoreTokensError:
return None
else:
return self._get_title_text(p)
except sgmllib.SGMLParseError, exc:
raise _form.ParseError(exc)
def unescape(data, entities, encoding):
if data is None or "&" not in data:
return data
def replace_entities(match):
ent = match.group()
if ent[1] == "#":
return unescape_charref(ent[2:-1], encoding)
repl = entities.get(ent[1:-1])
if repl is not None:
repl = unichr(repl)
if type(repl) != type(""):
try:
repl = repl.encode(encoding)
except UnicodeError:
repl = ent
else:
repl = ent
return repl
return re.sub(r"&#?[A-Za-z0-9]+?;", replace_entities, data)
def unescape_charref(data, encoding):
name, base = data, 10
if name.startswith("x"):
        name, base = name[1:], 16
uc = unichr(int(name, base))
if encoding is None:
return uc
else:
try:
repl = uc.encode(encoding)
except UnicodeError:
repl = "&#%s;" % data
return repl
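# Sketches of the two helpers above (entity definitions come from the
# module-level htmlentitydefs table):
#
#   unescape("&lt;b&gt;", htmlentitydefs.name2codepoint, "latin-1")  # '<b>'
#   unescape_charref("38", None)         # u'&' (decimal reference)
#   unescape_charref("x26", "latin-1")   # '&'  (hex reference, encoded)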
class MechanizeBs(_beautifulsoup.BeautifulSoup):
_entitydefs = htmlentitydefs.name2codepoint
# don't want the magic Microsoft-char workaround
    PARSER_MASSAGE = [(re.compile(r'(<[^<>]*)/>'),
                       lambda x: x.group(1) + ' />'),
                      (re.compile(r'<!\s+([^<>]*)>'),
                       lambda x: '<!' + x.group(1) + '>')
                      ]
def __init__(self, encoding, text=None, avoidParserProblems=True,
initialTextIsEverything=True):
self._encoding = encoding
_beautifulsoup.BeautifulSoup.__init__(
self, text, avoidParserProblems, initialTextIsEverything)
def handle_charref(self, ref):
t = unescape("&#%s;"%ref, self._entitydefs, self._encoding)
self.handle_data(t)
def handle_entityref(self, ref):
t = unescape("&%s;"%ref, self._entitydefs, self._encoding)
self.handle_data(t)
def unescape_attrs(self, attrs):
escaped_attrs = []
for key, val in attrs:
val = unescape(val, self._entitydefs, self._encoding)
escaped_attrs.append((key, val))
return escaped_attrs
class RobustLinksFactory:
compress_re = COMPRESS_RE
def __init__(self,
link_parser_class=None,
link_class=Link,
urltags=None,
):
if link_parser_class is None:
link_parser_class = MechanizeBs
self.link_parser_class = link_parser_class
self.link_class = link_class
if urltags is None:
urltags = {
"a": "href",
"area": "href",
"frame": "src",
"iframe": "src",
}
self.urltags = urltags
self._bs = None
self._encoding = None
self._base_url = None
def set_soup(self, soup, base_url, encoding):
self._bs = soup
self._base_url = base_url
self._encoding = encoding
def links(self):
bs = self._bs
base_url = self._base_url
encoding = self._encoding
for ch in bs.recursiveChildGenerator():
if (isinstance(ch, _beautifulsoup.Tag) and
ch.name in self.urltags.keys()+["base"]):
link = ch
attrs = bs.unescape_attrs(link.attrs)
attrs_dict = dict(attrs)
if link.name == "base":
base_href = attrs_dict.get("href")
if base_href is not None:
base_url = base_href
continue
url_attr = self.urltags[link.name]
url = attrs_dict.get(url_attr)
if not url:
continue
url = _rfc3986.clean_url(url, encoding)
text = link.fetchText(lambda t: True)
if not text:
# follow _pullparser's weird behaviour rigidly
if link.name == "a":
text = ""
else:
text = None
else:
text = self.compress_re.sub(" ", " ".join(text).strip())
yield Link(base_url, url, text, link.name, attrs)
class RobustFormsFactory(FormsFactory):
def __init__(self, *args, **kwds):
args = form_parser_args(*args, **kwds)
if args.form_parser_class is None:
args.form_parser_class = _form.RobustFormParser
FormsFactory.__init__(self, **args.dictionary)
def set_response(self, response, encoding):
self._response = response
self.encoding = encoding
class RobustTitleFactory:
def __init__(self):
self._bs = self._encoding = None
def set_soup(self, soup, encoding):
self._bs = soup
self._encoding = encoding
def title(self):
title = self._bs.first("title")
if title == _beautifulsoup.Null:
return None
else:
inner_html = "".join([str(node) for node in title.contents])
return COMPRESS_RE.sub(" ", inner_html.strip())
class Factory:
"""Factory for forms, links, etc.
This interface may expand in future.
Public methods:
set_request_class(request_class)
set_response(response)
forms()
links()
Public attributes:
Note that accessing these attributes may raise ParseError.
encoding: string specifying the encoding of response if it contains a text
document (this value is left unspecified for documents that do not have
an encoding, e.g. an image file)
is_html: true if response contains an HTML document (XHTML may be
regarded as HTML too)
title: page title, or None if no title or not HTML
global_form: form object containing all controls that are not descendants
of any FORM element, or None if the forms_factory does not support
supplying a global form
"""
LAZY_ATTRS = ["encoding", "is_html", "title", "global_form"]
def __init__(self, forms_factory, links_factory, title_factory,
encoding_finder=EncodingFinder(DEFAULT_ENCODING),
response_type_finder=ResponseTypeFinder(allow_xhtml=False),
):
"""
Pass keyword arguments only.
default_encoding: character encoding to use if encoding cannot be
determined (or guessed) from the response. You should turn on
HTTP-EQUIV handling if you want the best chance of getting this right
without resorting to this default. The default value of this
parameter (currently latin-1) may change in future.
"""
self._forms_factory = forms_factory
self._links_factory = links_factory
self._title_factory = title_factory
self._encoding_finder = encoding_finder
self._response_type_finder = response_type_finder
self.set_response(None)
def set_request_class(self, request_class):
"""Set request class (mechanize.Request by default).
HTMLForm instances returned by .forms() will return instances of this
class when .click()ed.
"""
self._forms_factory.request_class = request_class
def set_response(self, response):
"""Set response.
The response must either be None or implement the same interface as
objects returned by mechanize.urlopen().
"""
self._response = response
self._forms_genf = self._links_genf = None
self._get_title = None
for name in self.LAZY_ATTRS:
try:
delattr(self, name)
except AttributeError:
pass
def __getattr__(self, name):
if name not in self.LAZY_ATTRS:
return getattr(self.__class__, name)
if name == "encoding":
self.encoding = self._encoding_finder.encoding(
copy.copy(self._response))
return self.encoding
elif name == "is_html":
self.is_html = self._response_type_finder.is_html(
copy.copy(self._response), self.encoding)
return self.is_html
elif name == "title":
if self.is_html:
self.title = self._title_factory.title()
else:
self.title = None
return self.title
elif name == "global_form":
self.forms()
return self.global_form
def forms(self):
"""Return iterable over HTMLForm-like objects.
Raises mechanize.ParseError on failure.
"""
# this implementation sets .global_form as a side-effect, for benefit
# of __getattr__ impl
if self._forms_genf is None:
try:
self._forms_genf = CachingGeneratorFunction(
self._forms_factory.forms())
except: # XXXX define exception!
self.set_response(self._response)
raise
self.global_form = getattr(
self._forms_factory, "global_form", None)
return self._forms_genf()
def links(self):
"""Return iterable over mechanize.Link-like objects.
Raises mechanize.ParseError on failure.
"""
if self._links_genf is None:
try:
self._links_genf = CachingGeneratorFunction(
self._links_factory.links())
except: # XXXX define exception!
self.set_response(self._response)
raise
return self._links_genf()
class DefaultFactory(Factory):
"""Based on sgmllib."""
def __init__(self, i_want_broken_xhtml_support=False):
Factory.__init__(
self,
forms_factory=FormsFactory(),
links_factory=LinksFactory(),
title_factory=TitleFactory(),
response_type_finder=ResponseTypeFinder(
allow_xhtml=i_want_broken_xhtml_support),
)
def set_response(self, response):
Factory.set_response(self, response)
if response is not None:
self._forms_factory.set_response(
copy.copy(response), self.encoding)
self._links_factory.set_response(
copy.copy(response), response.geturl(), self.encoding)
self._title_factory.set_response(
copy.copy(response), self.encoding)
class RobustFactory(Factory):
"""Based on BeautifulSoup, hopefully a bit more robust to bad HTML than is
DefaultFactory.
"""
def __init__(self, i_want_broken_xhtml_support=False,
soup_class=None):
Factory.__init__(
self,
forms_factory=RobustFormsFactory(),
links_factory=RobustLinksFactory(),
title_factory=RobustTitleFactory(),
response_type_finder=ResponseTypeFinder(
allow_xhtml=i_want_broken_xhtml_support),
)
if soup_class is None:
soup_class = MechanizeBs
self._soup_class = soup_class
def set_response(self, response):
Factory.set_response(self, response)
if response is not None:
data = response.read()
soup = self._soup_class(self.encoding, data)
self._forms_factory.set_response(
copy.copy(response), self.encoding)
self._links_factory.set_soup(
soup, response.geturl(), self.encoding)
self._title_factory.set_soup(soup, self.encoding)