| #!/usr/bin/env vpython | 
 | # Copyright 2020 The Chromium Authors. All rights reserved. | 
 | # Use of this source code is governed by a BSD-style license that can be | 
 | # found in the LICENSE file. | 
"""Contains the parsers for the .tsv and .xml files, annotations.tsv and
grouping.xml respectively. Also includes methods to parse the JSON object
returned by the Google Doc API's .get() method.

These parsers are used to populate the duplicated Google Doc template with
several placeholders, and to populate the traffic annotations with their
relevant attributes, e.g. description, policy, etc.
"""
 |  | 
 | from __future__ import print_function | 
 | from collections import namedtuple | 
 | from collections import OrderedDict | 
 | import xml.etree.ElementTree | 
 | import enum | 
 | import json | 
 | import csv | 
 | import sys | 
 | import io | 
 | import re | 
 |  | 
 | TrafficAnnotation = namedtuple( | 
 |     "TrafficAnnotation", | 
 |     ["unique_id", "description", "trigger", "data", "settings", "policy"]) | 
 |  | 
 |  | 
 | class Placeholder(str, enum.Enum): | 
 |   GROUP = "group" | 
 |   SENDER = "sender" | 
 |   ANNOTATION = "annotation" | 
 |   ANNOTATION_BOLD = "annotation_bold" | 
 |  | 
 |  | 
 | PLACEHOLDER_STYLES = { | 
 |     Placeholder.GROUP: { | 
 |         "bold": False, | 
 |         "font": "Roboto", | 
 |         "fontSize": 20, | 
 |         "namedStyleType": "HEADING_1" | 
 |     }, | 
 |     Placeholder.SENDER: { | 
 |         "bold": True, | 
 |         "font": "Roboto", | 
 |         "fontSize": 14, | 
 |         "namedStyleType": "HEADING_2" | 
 |     }, | 
 |     Placeholder.ANNOTATION: { | 
 |         "bold": False, | 
 |         "font": "Roboto", | 
 |         "fontSize": 9 | 
 |     }, | 
 |     Placeholder.ANNOTATION_BOLD: { | 
 |         "bold": True, | 
 |         "font": "Roboto", | 
 |         "fontSize": 9 | 
 |     } | 
 | } | 
 |  | 
 |  | 
 | def utf_8_encoder(input_file): | 
 |   for line in input_file: | 
 |     yield line.encode("utf-8") | 
 |  | 
 |  | 
 | def load_tsv_file(file_path, verbose): | 
 |   """ Loads annotations TSV file. | 
 |  | 
 |   Args: | 
 |     file_path: str | 
 |       Path to the TSV file. | 
 |     verbose: bool | 
 |       Whether to print messages about ignored rows. | 
 |  | 
 |   Returns: | 
 |     list of list Table of loaded annotations. | 
 |   """ | 
 |   rows = [] | 
 |   with io.open(file_path, mode="r", encoding="utf-8") as csvfile: | 
 |     # CSV library does not support unicode, so encoding to utf-8 and back. | 
 |     reader = csv.reader(utf_8_encoder(csvfile), delimiter="\t") | 
 |     for row in reader: | 
 |       row = [unicode(col, "utf-8") for col in row] | 
 |       # If the last column of the file_row is empty, the row belongs to a | 
 |       # platform different from the one that TSV file is generated on, hence it | 
 |       # should be ignored. | 
 |       if row[-1]: | 
 |         rows.append(row) | 
 |       elif verbose: | 
 |         print("Ignored from other platforms: %s" % row[0]) | 
 |   return rows | 
 |  | 
 |  | 
 | def map_annotations(tsv_contents): | 
 |   """Creates a mapping between the unique_id of a given annotation and its | 
 |   relevant attributes, e.g. description, trigger, data, etc. | 
 |  | 
 |   Args: | 
 |     tsv_contents: List[List] | 
 |       Table of loaded annotations. | 
 |  | 
 |   Returns: | 
 |     unique_id_rel_attributes_map: <Dict[str, TrafficAnnotation]> | 
 |   """ | 
 |   unique_id_rel_attributes_map = {} | 
 |   for annotation_row in tsv_contents: | 
 |     unique_id = annotation_row[0].encode("utf-8") | 
 |     description = annotation_row[3].encode("utf-8") | 
 |     trigger = annotation_row[4].encode("utf-8") | 
 |     data = annotation_row[5].encode("utf-8") | 
 |     settings = annotation_row[9].encode("utf-8") | 
 |     policy = annotation_row[10].encode("utf-8") | 
 |     payload = [unique_id, description, trigger, data, settings, policy] | 
 |  | 
 |     unique_id_rel_attributes_map[unique_id] = TrafficAnnotation._make(payload) | 
 |   return unique_id_rel_attributes_map | 
 |  | 
 |  | 
 | class XMLParser: | 
 |   """Parses grouping.xml with the aim of generating the placeholders list""" | 
 |  | 
 |   def __init__(self, file_path, annotations_mapping): | 
 |     """ | 
 |     Args: | 
 |       file_path: str | 
 |         The file path to the xml to parse. Ostensibly, grouping.xml located | 
 |         within traffic_annotation/summary. | 
 |       annotations_mapping: Dict[str, dict] | 
 |           The mapping between a given annotation's unique_id and its relevant | 
 |           attributes, e.g. description, policy, data, etc. | 
 |     """ | 
 |     self.parsed_xml = {} | 
 |     self.annotations_mapping = annotations_mapping | 
 |  | 
 |     self.parse_xml(file_path) | 
 |  | 
 |   def parse_xml(self, file_path): | 
 |     """Parses the grouping.xml file and populates self.parsed_xml. | 
 |  | 
 |     self.parsed_xml: <{Group1: {sender: [traffic_annotations]}, ...}> | 
 |     """ | 
 |     tree = xml.etree.ElementTree.parse(file_path) | 
 |     root = tree.getroot() | 
 |     for group in root.iter("group"): | 
 |       assert group.tag == "group" | 
 |       group_name = group.attrib["name"] | 
 |       # Suppress if hidden="true" in the group block. Will not include any of | 
 |       # the senders and annotations in the block. | 
 |       if group.attrib.get("hidden", "") == "true": | 
 |         continue | 
 |       self.parsed_xml[group_name] = {} | 
 |  | 
 |       for sender in group.iter("sender"): | 
 |         sender_name = sender.attrib["name"] | 
 |         # Suppress if hidden="true" (or hidden is even mentioned) in the given | 
 |         # annotation, don't include in traffic_annotations. | 
 |         traffic_annotations = sorted([ | 
 |             t_annotation.attrib["unique_id"] | 
 |             for t_annotation in sender.iter("traffic_annotation") | 
 |             if t_annotation.attrib.get("hidden", "") != "true" | 
 |         ]) | 
 |         self.parsed_xml[group_name][sender_name] = traffic_annotations | 
 |  | 
 |   def _sort_parsed_xml(self): | 
 |     """Sort on the group and sender keys in alphabetical order, note that | 
 |     annotations are already sorted.""" | 
 |     self.parsed_xml = { | 
 |         k: OrderedDict(sorted(v.items())) | 
 |         for k, v in self.parsed_xml.items() | 
 |     } | 
 |     self.parsed_xml = OrderedDict( | 
 |         sorted(self.parsed_xml.items(), key=lambda t: t[0])) | 
 |  | 
 |   def _add_group_placeholder(self, name): | 
 |     return {"type": Placeholder.GROUP, "name": name} | 
 |  | 
 |   def _add_sender_placeholder(self, name): | 
 |     return {"type": Placeholder.SENDER, "name": name} | 
 |  | 
 |   def _add_annotation_placeholder(self, unique_id): | 
 |     """ | 
 |     Args: | 
 |       unique_id: str | 
 |         The annotation's unique_id. | 
 |     """ | 
 |     traffic_annotation = self.annotations_mapping.get(unique_id, None) | 
 |     is_complete = traffic_annotation and all(traffic_annotation) | 
 |     if not is_complete: | 
 |       print( | 
 |           "Warning: {} row is empty in annotations.tsv but is in grouping.xml". | 
 |           format(unique_id)) | 
 |       traffic_annotation = TrafficAnnotation(unique_id, "NA", "NA", "NA", "NA", | 
 |                                              "NA") | 
 |  | 
 |     return { | 
 |         "type": Placeholder.ANNOTATION, | 
 |         "traffic_annotation": traffic_annotation | 
 |     } | 
 |  | 
 |   def build_placeholders(self): | 
 |     """ | 
 |     Returns: | 
 |       The placeholders <list> to be added in the order of their appearance. | 
 |       The annotations are the TrafficAnnotation objects with the relevant | 
 |       information. | 
 |     """ | 
 |     self._sort_parsed_xml() | 
 |     placeholders = [] | 
 |  | 
 |     for group, senders in self.parsed_xml.items(): | 
 |       placeholders.append(self._add_group_placeholder(group)) | 
 |       for sender, annotations in senders.items(): | 
 |         placeholders.append(self._add_sender_placeholder(sender)) | 
 |         for annotation in annotations: | 
 |           placeholders.append(self._add_annotation_placeholder(annotation)) | 
 |     return placeholders | 
 |  | 
 |  | 
 | def jprint(msg): | 
 |   print(json.dumps(msg, indent=4), file=sys.stderr) | 
 |  | 
 |  | 
 | def extract_body(document=None, target="body", json_file_path="template.json"): | 
 |   """Google Doc API returns a .json object. Parse this doc object to obtain its | 
 |   body. | 
 |  | 
 |   The |template.json| object of the current state of | 
 |   the document can be obtained by running the update_annotations_doc.py script | 
 |   using the --debug flag. | 
 |   """ | 
 |   if document: | 
 |     doc = document | 
 |   else: | 
 |     try: | 
 |       with open(json_file_path) as json_file: | 
 |         doc = json.load(json_file) | 
 |     except IOError: | 
 |       print("Couldn't find the .json file.") | 
 |  | 
 |   if target == "all": | 
 |     return doc | 
 |   return doc[target] | 
 |  | 
 |  | 
 | def find_first_index(doc): | 
 |   """Finds the cursor index (location) that comes right after the Introduction | 
 |   section. Namely, the endIndex of the paragraph block the |target_text| belongs | 
 |   to. | 
 |  | 
 |   Returns: int | 
 |     The first cursor index (loc) of the template document, right after the | 
 |     Introduction section. | 
 |   """ | 
 |   target_text = "The policy, if one exists, to control this type of network" | 
 |   padding = 1  # We pad so as to overwrite cleanly. | 
 |  | 
 |   body = extract_body(document=doc) | 
 |   contents = body["content"] | 
 |   for element in contents: | 
 |     if "paragraph" in element: | 
 |       end_index = element["endIndex"] | 
 |       lines = element["paragraph"]["elements"] | 
 |       for text_run in lines: | 
 |         if target_text in text_run["textRun"]["content"]: | 
 |           return end_index + padding | 
 |  | 
 |  | 
 | def find_last_index(doc): | 
 |   """ | 
 |   Returns: int | 
 |     The last cursor index (loc) of the template document. | 
 |   """ | 
 |   body = extract_body(document=doc) | 
 |   contents = body["content"] | 
 |   last_index = contents[-1]["endIndex"] | 
 |   return last_index - 1 | 
 |  | 
 |  | 
 | def find_chrome_browser_version(doc): | 
 |   """Finds what the current chrome browser version is in the document. | 
 |  | 
 |   We grab the current "Chrome Browser version MAJOR.MINOR.BUILD.PATCH" from the | 
 |   document's header. | 
 |  | 
 |   Returns: str | 
 |     The chrome browser version string. | 
 |   """ | 
 |   # Only one header. | 
 |   header = extract_body(document=doc, target="headers").values()[0] | 
 |   header_elements = header["content"][0]["paragraph"]["elements"] | 
 |   text = header_elements[0]["textRun"]["content"] | 
 |   current_version = re.search(r"([\d.]+)", text).group() | 
 |   return current_version | 
 |  | 
 |  | 
 | def find_bold_ranges(doc, debug=False): | 
 |   """Finds parts to bold given the targets of "trigger", "data", etc. | 
 |  | 
 |   Returns: | 
 |     The startIndex <int> and endIndex <int> tuple pairs as a list for all | 
 |     occurrences of the targets. <List[Tuple[int, int]]> | 
 |   """ | 
 |   bold_ranges = [] | 
 |   targets = ["Trigger", "Data", "Settings", "Policy"] | 
 |   content = extract_body(document=doc)["content"] | 
 |  | 
 |   for i, element in enumerate(content): | 
 |     element_type = list(element.keys())[-1] | 
 |  | 
 |     if element_type != "table": | 
 |       continue | 
 |  | 
 |     # Recall that table is 1x2 in Docs, first cell contains unique_id, second | 
 |     # cell has traffic annotation relevant attributes. | 
 |  | 
 |     # Unique id column, messy parsing through. You can inspect the json output | 
 |     # with jprint() to confirm/debug if broken. | 
 |     unique_id_col = element["table"]["tableRows"][0]["tableCells"][0][ | 
 |         "content"][0]["paragraph"]["elements"][0] | 
 |     if debug: | 
 |       jprint(unique_id_col) | 
 |     assert "textRun" in unique_id_col, "Not the correct unique_id cell" | 
 |  | 
 |     start_index = unique_id_col["startIndex"] | 
 |     end_index = unique_id_col["endIndex"] | 
 |     bold_ranges.append((start_index, end_index)) | 
 |  | 
 |     start_index, end_index = None, None  # Reset | 
 |  | 
 |     # The info column, messy parsing through. You can inspect the json output | 
 |     # with jprint() to confirm/debug if broken. | 
 |     info_elements = element["table"]["tableRows"][0]["tableCells"][1]["content"] | 
 |     for i, info_col in enumerate(info_elements): | 
 |       info_col = info_elements[i] | 
 |  | 
 |       start_index = info_col["startIndex"] | 
 |       content = info_col["paragraph"]["elements"][0]["textRun"]["content"] | 
 |       # To find the end_index, run through and find something in targets. | 
 |       for target in targets: | 
 |         if content.find("{}:".format(target)) != -1: | 
 |           # Contains the string "|target|:" | 
 |           end_index = start_index + len(target) + 1 | 
 |           bold_ranges.append((start_index, end_index)) | 
 |           break | 
 |  | 
 |       if debug: | 
 |         jprint(info_col) | 
 |         print("#" * 30) | 
 |  | 
 |   return bold_ranges |