blob: 96de44ead13678797518aa88ab93eaccac1fa664 [file] [log] [blame]
#!/usr/bin/env python
#
# Copyright 2007 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Directly processes text of dos.xml.
DosXmlParser is called with an XML string to produce a list of BlackListEntry
objects containing the data from the XML.
DosXmlParser: converts XML to list of BlackListEntrys.
BlacklistEntry: describes a blacklisted IP.
"""
import re
from xml.etree import ElementTree
import ipaddr
from google.appengine.tools import xml_parser_utils
from google.appengine.tools.app_engine_config_exception import AppEngineConfigException
MISSING_SUBNET = '<blacklist> node must have a subnet specified'
BAD_IPV_SUBNET = '"%s" is not a valid IPv4 or IPv6 subnet'
BAD_PREFIX_LENGTH = ('Prefix length of subnet "%s" must be an integer '
'(quad-dotted masks are not supported)')
def GetDosYaml(unused_application, dos_xml_str):
return _MakeDosListIntoYaml(DosXmlParser().ProcessXml(dos_xml_str))
def _MakeDosListIntoYaml(dos_list):
"""Converts yaml statement list of blacklisted IP's into a string."""
statements = ['blacklist:']
for entry in dos_list:
statements += entry.ToYaml()
return '\n'.join(statements) + '\n'
class DosXmlParser(object):
"""Provides logic for walking down XML tree and pulling data."""
def ProcessXml(self, xml_str):
"""Parses XML string and returns object representation of relevant info.
Args:
xml_str: The XML string.
Returns:
A list of BlacklistEntry objects containing information about blacklisted
IP's specified in the XML.
Raises:
AppEngineConfigException: In case of malformed XML or illegal inputs.
"""
try:
self.blacklist_entries = []
self.errors = []
xml_root = ElementTree.fromstring(xml_str)
if xml_root.tag != 'blacklistentries':
raise AppEngineConfigException('Root tag must be <blacklistentries>')
for child in xml_root.getchildren():
self.ProcessBlacklistNode(child)
if self.errors:
raise AppEngineConfigException('\n'.join(self.errors))
return self.blacklist_entries
except ElementTree.ParseError:
raise AppEngineConfigException('Bad input -- not valid XML')
def ProcessBlacklistNode(self, node):
"""Processes XML <blacklist> nodes into BlacklistEntry objects.
The following information is parsed out:
subnet: The IP, in CIDR notation.
description: (optional)
If there are no errors, the data is loaded into a BlackListEntry object
and added to a list. Upon error, a description of the error is added to
a list and the method terminates.
Args:
node: <blacklist> XML node in dos.xml.
"""
tag = xml_parser_utils.GetTag(node)
if tag != 'blacklist':
self.errors.append('Unrecognized node: <%s>' % tag)
return
entry = BlacklistEntry()
entry.subnet = xml_parser_utils.GetChildNodeText(node, 'subnet')
entry.description = xml_parser_utils.GetChildNodeText(node, 'description')
validation = self._ValidateEntry(entry)
if validation:
self.errors.append(validation)
return
self.blacklist_entries.append(entry)
def _ValidateEntry(self, entry):
if not entry.subnet:
return MISSING_SUBNET
try:
ipaddr.IPNetwork(entry.subnet)
except ValueError:
return BAD_IPV_SUBNET % entry.subnet
parts = entry.subnet.split('/')
if len(parts) == 2 and not re.match('^[0-9]+$', parts[1]):
return BAD_PREFIX_LENGTH % entry.subnet
class BlacklistEntry(object):
"""Instances contain information about individual blacklist entries."""
def ToYaml(self):
statements = ['- subnet: %s' % self.subnet]
if self.description:
statements.append(
' description: %s' % self._SanitizeForYaml(self.description))
return statements
def _SanitizeForYaml(self, dirty_str):
return "'%s'" % dirty_str.replace('\n', ' ')