| """ robotparser.py |
| |
| Copyright (C) 2000 Bastian Kleineidam |
| |
| You can choose between two licenses when using this package: |
| 1) GNU GPLv2 |
| 2) PSF license for Python 2.2 |
| |
    The Robots Exclusion Protocol is implemented as specified in
    RFC 9309.
| """ |
| |
| import collections |
| import re |
| import urllib.error |
| import urllib.parse |
| import urllib.request |
| |
| __all__ = ["RobotFileParser"] |
| |
| RequestRate = collections.namedtuple("RequestRate", "requests seconds") |
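# For example, a "Request-rate: 3/20" line is stored as
# RequestRate(requests=3, seconds=20): at most 3 requests per 20 seconds.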
| |
| |
| class RobotFileParser: |
| """ This class provides a set of methods to read, parse and answer |
| questions about a single robots.txt file. |
| |
| """ |
| |
| def __init__(self, url=''): |
| self.entries = [] |
| self.groups = {} |
| self.sitemaps = [] |
| self.default_entry = None |
| self.disallow_all = False |
| self.allow_all = False |
| self.set_url(url) |
| self.last_checked = 0 |
| |
| def mtime(self): |
| """Returns the time the robots.txt file was last fetched. |
| |
| This is useful for long-running web spiders that need to |
| check for new robots.txt files periodically. |
| |
| """ |
| return self.last_checked |
| |
| def modified(self): |
| """Sets the time the robots.txt file was last fetched to the |
| current time. |
| |
| """ |
        import time  # imported lazily to keep module import cheap
| self.last_checked = time.time() |
| |
| def set_url(self, url): |
| """Sets the URL referring to a robots.txt file.""" |
| self.url = url |
| self.host, self.path = urllib.parse.urlsplit(url)[1:3] |
| |
| def read(self): |
| """Reads the robots.txt URL and feeds it to the parser.""" |
| try: |
| f = urllib.request.urlopen(self.url) |
| except urllib.error.HTTPError as err: |
| if err.code in (401, 403): |
| self.disallow_all = True |
            elif 400 <= err.code < 500:
| self.allow_all = True |
| err.close() |
        else:
            with f:  # ensure the response is closed after reading
                raw = f.read()
            self.parse(raw.decode("utf-8", "surrogateescape").splitlines())
| |
    def _add_entry(self, entry):
        """Record an entry and index it by each of its user agents,
        merging it into any group already registered for an agent."""
        self.entries.append(entry)
| for agent in entry.useragents: |
| agent = agent.lower() |
| if agent not in self.groups: |
| self.groups[agent] = entry |
| else: |
| self.groups[agent] = merge_entries(self.groups[agent], entry) |
| |
| def parse(self, lines): |
| """Parse the input lines from a robots.txt file. |
| |
        A user-agent line is accepted even when it is not preceded by
        one or more blank lines.
| """ |
| entries = [] |
| # states: |
| # 0: start state |
| # 1: saw user-agent line |
| # 2: saw an allow or disallow line |
| state = 0 |
| entry = Entry() |
| |
| self.modified() |
| for line in lines: |
| # remove optional comment and strip line |
| i = line.find('#') |
| if i >= 0: |
| line = line[:i] |
| line = line.strip() |
| if not line: |
| continue |
| line = line.split(':', 1) |
| if len(line) == 2: |
| line[0] = line[0].strip().lower() |
| line[1] = line[1].strip() |
| if line[0] == "user-agent": |
| if state == 2: |
| self._add_entry(entry) |
| entry = Entry() |
| product_token = line[1] |
| entry.useragents.append(product_token) |
| state = 1 |
| elif line[0] == "disallow": |
| if state != 0: |
| state = 2 |
| try: |
| entry.rulelines.append(RuleLine(line[1], False)) |
| except ValueError: |
| pass |
| elif line[0] == "allow": |
| if state != 0: |
| state = 2 |
| try: |
| entry.rulelines.append(RuleLine(line[1], True)) |
| except ValueError: |
| pass |
| elif line[0] == "crawl-delay": |
| if state != 0: |
                        # Verify the value is a valid integer before
                        # converting; a malformed robots.txt would
                        # otherwise raise ValueError here.
| if line[1].strip().isdigit(): |
| entry.delay = int(line[1]) |
| state = 2 |
| elif line[0] == "request-rate": |
| if state != 0: |
| numbers = line[1].split('/') |
                        # accept the rate only if both values are valid integers
| if (len(numbers) == 2 and numbers[0].strip().isdigit() |
| and numbers[1].strip().isdigit()): |
| entry.req_rate = RequestRate(int(numbers[0]), int(numbers[1])) |
| state = 2 |
| elif line[0] == "sitemap": |
| # According to http://www.sitemaps.org/protocol.html |
| # "This directive is independent of the user-agent line, |
| # so it doesn't matter where you place it in your file." |
| # Therefore we do not change the state of the parser. |
| self.sitemaps.append(line[1]) |
| if state != 0: |
| self._add_entry(entry) |
| |
    def _find_entry(self, useragent):
        """Return the entry that applies to useragent, falling back to
        the wildcard ('*') entry when no specific entry matches."""
| entry = self.groups.get(useragent.lower()) |
| if entry is not None: |
| return entry |
| for entry in self.groups.values(): |
| if entry.applies_to(useragent): |
| return entry |
| return self.groups.get('*') |
| |
| def can_fetch(self, useragent, url): |
| """using the parsed robots.txt decide if useragent can fetch url""" |
| if self.disallow_all: |
| return False |
| if self.allow_all: |
| return True |
| # Until the robots.txt file has been read or found not |
| # to exist, we must assume that no url is allowable. |
| # This prevents false positives when a user erroneously |
| # calls can_fetch() before calling read(). |
| if not self.last_checked: |
| return False |
| # TODO: The private API is used in order to preserve an empty query. |
| # This is temporary until the public API starts supporting this feature. |
| parsed_url = urllib.parse._urlsplit(url, '') |
| url = urllib.parse._urlunsplit(None, None, *parsed_url[2:]) |
| url = normalize_uri(url) |
| if not url: |
| url = "/" |
| if url == '/robots.txt': |
| # The /robots.txt URI is implicitly allowed. |
| return True |
| entry = self._find_entry(useragent) |
| if entry is None: |
| return True |
| return entry.allowance(url) |
| |
    def crawl_delay(self, useragent):
        """Return the Crawl-delay value that applies to useragent, or None."""
| if not self.mtime(): |
| return None |
| entry = self._find_entry(useragent) |
| if entry is None: |
| return None |
| return entry.delay |
| |
    def request_rate(self, useragent):
        """Return the Request-rate value that applies to useragent, or None."""
| if not self.mtime(): |
| return None |
| entry = self._find_entry(useragent) |
| if entry is None: |
| return None |
| return entry.req_rate |
| |
    def site_maps(self):
        """Return the list of Sitemap URLs, or None if none were seen."""
| if not self.sitemaps: |
| return None |
| return self.sitemaps |
| |
| def __str__(self): |
| entries = self.entries |
| if self.default_entry is not None: |
| entries = entries + [self.default_entry] |
| return '\n\n'.join(filter(None, map(str, entries))) |
| |

class RuleLine:
| """A rule line is a single "Allow:" (allowance==True) or "Disallow:" |
| (allowance==False) followed by a path.""" |
| def __init__(self, path, allowance): |
| if path == '' and not allowance: |
| # an empty value means allow all |
| allowance = True |
        path = re.sub(r'[*]{2,}', '*', path)   # collapse runs of '*'
        path = re.sub(r'[$][$*]+', '$', path)  # collapse '$' followed by more '$' or '*'
| path = normalize_pattern(path) |
| self.fullmatch = path.endswith('$') |
| path = path.rstrip('$') |
| if '$' in path: |
| raise ValueError('$ not at the end of path') |
| self.matcher = None |
| if '*' in path: |
| pattern = re.compile(translate_pattern(path), re.DOTALL) |
| if self.fullmatch: |
| self.matcher = pattern.fullmatch |
| else: |
| self.matcher = pattern.match |
| self.path = path |
| self.allowance = allowance |
| |
| def applies_to(self, filename): |
| # If the filename matches the rule, return the matching length plus 1. |
| # If it does not match, return 0. |
| if self.matcher is not None: |
| m = self.matcher(filename) |
| if m: |
| return m.end() + 1 |
| else: |
| if self.fullmatch: |
| if filename == self.path: |
| return len(self.path) + 1 |
| else: |
| if filename.startswith(self.path): |
| return len(self.path) + 1 |
| return 0 |
| |
| def __str__(self): |
| return (("Allow" if self.allowance else "Disallow") + ": " + self.path |
| + ('$' if self.fullmatch else '')) |
| |
| |
| class Entry: |
| """An entry has one or more user-agents and zero or more rulelines""" |
| def __init__(self): |
| self.useragents = [] |
| self.rulelines = [] |
| self.delay = None |
| self.req_rate = None |
| |
| def __str__(self): |
| if not self.useragents: |
| return '' |
| ret = [] |
| for agent in self.useragents: |
| ret.append(f"User-agent: {agent}") |
| if self.delay is not None: |
| ret.append(f"Crawl-delay: {self.delay}") |
| if self.req_rate is not None: |
| rate = self.req_rate |
| ret.append(f"Request-rate: {rate.requests}/{rate.seconds}") |
| if self.rulelines: |
| ret.extend(map(str, self.rulelines)) |
| else: |
| ret.append("Allow:") |
| return '\n'.join(ret) |
| |
| def applies_to(self, useragent): |
| """check if this entry applies to the specified agent""" |
| if useragent is None: |
| return '*' in self.useragents |
| # split the name token and make it lower case |
| useragent = useragent.split("/")[0].lower() |
| for agent in self.useragents: |
| if agent != '*': |
| agent = agent.lower() |
| if agent in useragent: |
| return True |
| return False |
| |
| def allowance(self, filename): |
| """Preconditions: |
| - our agent applies to this entry |
| - filename is URL encoded |
| """ |
| best_match = -1 |
| allowance = True |
| for line in self.rulelines: |
| m = line.applies_to(filename) |
| if m: |
| if m > best_match: |
| best_match = m |
| allowance = line.allowance |
| elif m == best_match and not allowance: |
| allowance = line.allowance |
| return allowance |
| |
| |
def normalize(path):
    """Re-encode path so that its percent-encoding is consistent:
    unreserved characters are decoded and everything else is re-quoted."""
| unquoted = urllib.parse.unquote(path, errors='surrogateescape') |
| return urllib.parse.quote(unquoted, errors='surrogateescape') |
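# For illustration: normalize('/a b%7E') returns '/a%20b~', since the
# unreserved '~' is decoded while the space is consistently re-quoted.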
| |
def normalize_uri(path):
    """Normalize the path and, if present, the query of a URI, keeping
    the '=' and '&' query delimiters intact."""
| path, sep, query = path.partition('?') |
| path = normalize(path) |
| if sep: |
| query = re.sub(r'[^=&]+', lambda m: normalize(m[0]), query) |
| path += '?' + query |
| return path |
| |
def normalize_pattern(path):
    """Like normalize_uri(), but also preserve the '*' and '$'
    metacharacters used in rule paths."""
| path, sep, query = path.partition('?') |
| path = re.sub(r'[^*$]+', lambda m: normalize(m[0]), path) |
| if sep: |
| query = re.sub(r'[^=&*$]+', lambda m: normalize(m[0]), query) |
| path += '?' + query |
| return path |
| |
def translate_pattern(path):
    """Translate a rule path containing '*' wildcards into a regular
    expression (the atomic groups require Python 3.11+)."""
| parts = list(map(re.escape, path.split('*'))) |
| for i in range(1, len(parts)-1): |
| parts[i] = f'(?>.*?{parts[i]})' |
| parts[-1] = f'.*{parts[-1]}' |
| return ''.join(parts) |
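# For illustration: translate_pattern('/a*b*c') returns '/a(?>.*?b).*c'.
# Each inner '*' is wrapped in an atomic group so the regex engine never
# backtracks into it, avoiding pathological matching times on long URLs.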
| |
def merge_entries(e1, e2):
    """Merge two entries addressed to the same user agents: rule lines
    are concatenated, and e2's delay and request rate take precedence
    when set."""
| entry = Entry() |
| entry.useragents = list(filter(set(e2.useragents).__contains__, e1.useragents)) |
| entry.rulelines = e1.rulelines + e2.rulelines |
| entry.delay = e1.delay if e2.delay is None else e2.delay |
| entry.req_rate = e1.req_rate if e2.req_rate is None else e2.req_rate |
| return entry |
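

if __name__ == '__main__':
    # A minimal usage sketch; the agent name and robots.txt lines below
    # are made up for illustration, and no network access is needed.
    rp = RobotFileParser()
    rp.parse([
        'User-agent: *',
        'Disallow: /private/',
        'Allow: /private/public.html',
        'Crawl-delay: 2',
    ])
    print(rp.can_fetch('ExampleBot', '/private/public.html'))  # True
    print(rp.can_fetch('ExampleBot', '/private/secret.html'))  # False
    print(rp.crawl_delay('ExampleBot'))                        # 2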