| # Copyright 2013 The Chromium Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """Interactive tool for finding reviewers/owners for a change.""" |
| |
| from __future__ import print_function |
| |
| import os |
| import copy |
| import owners as owners_module |
| import random |
| |
| |
| import gclient_utils |
| |
| |
def first(iterable):
    """Return the first element of `iterable`, or None if it is empty."""
    return next(iter(iterable), None)
| |
| |
class OwnersFinder(object):
    """Interactively pick owners (reviewers) that cover a set of files.

    The finder keeps a queue of candidate owners.  For every candidate the
    user may accept, reject or defer; owners that are the only remaining
    candidate for some unreviewed file are selected automatically, and
    owners with no unreviewed file left are deselected automatically.
    """

    COLOR_LINK = '\033[4m'
    COLOR_BOLD = '\033[1;32m'
    COLOR_GREY = '\033[0;37m'
    COLOR_RESET = '\033[0m'

    # Class-level default for the current output nesting depth; indent() and
    # unindent() rebind it on the instance.
    indentation = 0

    def __init__(self, files, local_root, author, reviewers,
                 fopen, os_path,
                 email_postfix='@chromium.org',
                 disable_color=False,
                 override_files=None,
                 ignore_author=False):
        """Build the owner/file maps for `files` and prime the queue.

        Args:
          files: paths of the changed files.
          local_root, fopen, os_path: forwarded unchanged to
            owners_module.Database.
          author: author of the change; never offered as a candidate owner.
          reviewers: reviewers already on the change.
          email_postfix: suffix stripped from names for display and appended
            by pick_owner() when the user omits it.
          disable_color: when true, no ANSI escape codes are emitted.
          override_files: assigned to the database's `override_files`
            attribute (an empty dict when None).
          ignore_author: when true, the author is not treated as an existing
            reviewer while filtering out already-covered files.
        """
        self.email_postfix = email_postfix

        if os.name == 'nt' or disable_color:
            self.COLOR_LINK = ''
            self.COLOR_BOLD = ''
            self.COLOR_GREY = ''
            self.COLOR_RESET = ''

        self.db = self._new_database(
            local_root, fopen, os_path, override_files, files)

        self.os_path = os_path
        self.author = author

        reviewers = list(reviewers)
        if author and not ignore_author:
            reviewers.append(author)

        # Eliminate files that existing reviewers can review.
        filtered_files = list(
            self.db.files_not_covered_by(files, reviewers))

        # If some files were eliminated, continue with the reduced list and
        # a freshly loaded database.
        if len(filtered_files) != len(files):
            files = filtered_files
            self.db = self._new_database(
                local_root, fopen, os_path, override_files, files)

        self.all_possible_owners = self.db.all_possible_owners(files, None)
        if author and author in self.all_possible_owners:
            del self.all_possible_owners[author]

        self.owners_to_files = {}
        self._map_owners_to_files(files)

        self.files_to_owners = {}
        self._map_files_to_owners()

        self.owners_score = self.db.total_costs_by_owner(
            self.all_possible_owners, files)

        self.original_files_to_owners = copy.deepcopy(self.files_to_owners)
        self.comments = self.db.comments

        # This is the queue that will be shown in the interactive questions.
        # reset() fills it ordered by ascending `owners_score` value (ties in
        # random order).  In the interactive questions a user can choose to
        # "defer" a decision; the owner is then moved to the end of the queue
        # and shown again later.
        self.owners_queue = []

        self.unreviewed_files = set()
        self.reviewed_by = {}
        self.selected_owners = set()
        self.deselected_owners = set()
        self.reset()

    @staticmethod
    def _new_database(local_root, fopen, os_path, override_files, files):
        """Create an owners database and load the data needed for `files`."""
        database = owners_module.Database(local_root, fopen, os_path)
        database.override_files = override_files or {}
        database.load_data_needed_for(files)
        return database

    def run(self):
        """Run the interactive session.

        Returns:
          0 once every file is covered or no candidate is left, 1 if the
          user quits.
        """
        self.reset()
        while self.owners_queue and self.unreviewed_files:
            owner = self.owners_queue[0]

            if (owner in self.selected_owners or
                    owner in self.deselected_owners):
                # A decided owner should not be queued any more.  Drop the
                # stale entry; a bare `continue` would revisit it forever.
                self.owners_queue.pop(0)
                continue

            if self.owners_to_files[owner].isdisjoint(self.unreviewed_files):
                # Nothing left for this owner to review.
                self.deselect_owner(owner)
                continue

            self.print_info(owner)

            # Ask again until a recognised command is entered.
            while True:
                inp = self.input_command(owner)
                if inp in ('y', 'yes'):
                    self.select_owner(owner)
                    break
                elif inp in ('n', 'no'):
                    self.deselect_owner(owner)
                    break
                elif inp in ('', 'd', 'defer'):
                    # Move the current owner to the back of the queue.
                    self.owners_queue.append(self.owners_queue.pop(0))
                    break
                elif inp in ('f', 'files'):
                    self.list_files()
                    break
                elif inp in ('o', 'owners'):
                    self.list_owners(self.owners_queue)
                    break
                elif inp in ('p', 'pick'):
                    self.pick_owner(
                        gclient_utils.AskForData('Pick an owner: '))
                    break
                elif inp.startswith(('p ', 'pick ')):
                    # Split on any whitespace so that repeated spaces do not
                    # produce an empty owner name.
                    words = inp.split()
                    self.pick_owner(words[1] if len(words) > 1 else '')
                    break
                elif inp in ('r', 'restart'):
                    self.reset()
                    break
                elif inp in ('q', 'quit'):
                    # Exit with error
                    return 1

        self.print_result()
        return 0

    def _map_owners_to_files(self, files):
        """Fill `owners_to_files` from `all_possible_owners`.

        A file is attributed to an owner when its path starts with one of
        the paths listed for that owner.
        """
        # NOTE(review): this is a plain string-prefix test, so a listed path
        # 'a/b' also matches 'a/bc/x' -- confirm whether that is intended.
        for owner, entries in self.all_possible_owners.items():
            for dir_name, _ in entries:
                for file_name in files:
                    if file_name.startswith(dir_name):
                        self.owners_to_files.setdefault(
                            owner, set()).add(file_name)

    def _map_files_to_owners(self):
        """Fill `files_to_owners` as the inverse of `owners_to_files`."""
        for owner, owned_files in self.owners_to_files.items():
            for file_name in owned_files:
                self.files_to_owners.setdefault(file_name, set()).add(owner)

    def reset(self):
        """Discard every decision and rebuild the owners queue."""
        self.files_to_owners = copy.deepcopy(self.original_files_to_owners)
        self.unreviewed_files = set(self.files_to_owners)
        self.reviewed_by = {}
        self.selected_owners = set()
        self.deselected_owners = set()

        # Randomize owners' names so that if many reviewers have identical
        # scores they will be randomly ordered to avoid bias (sorted() is
        # stable, so the shuffled order survives among equal scores).
        owners = list(self.owners_to_files)
        random.shuffle(owners)
        self.owners_queue = sorted(
            owners, key=lambda owner: self.owners_score[owner])
        self.find_mandatory_owners()

    def select_owner(self, owner, findMandatoryOwners=True):
        """Mark `owner` as selected and its unreviewed files as reviewed.

        Does nothing when the owner is already decided or not queued.

        Args:
          owner: the owner to select.
          findMandatoryOwners: when true, run find_mandatory_owners()
            afterwards.
        """
        if (owner in self.selected_owners or
                owner in self.deselected_owners or
                owner not in self.owners_queue):
            return
        self.writeln('Selected: ' + owner)
        self.owners_queue.remove(owner)
        self.selected_owners.add(owner)
        for file_name in self.owners_to_files[owner]:
            if file_name in self.unreviewed_files:
                self.unreviewed_files.remove(file_name)
                self.reviewed_by[file_name] = owner
        if findMandatoryOwners:
            self.find_mandatory_owners()

    def deselect_owner(self, owner, findMandatoryOwners=True):
        """Mark `owner` as rejected and drop it from unreviewed files.

        Does nothing when the owner is already decided or not queued.

        Args:
          owner: the owner to deselect.
          findMandatoryOwners: when true, run find_mandatory_owners()
            afterwards.
        """
        if (owner in self.selected_owners or
                owner in self.deselected_owners or
                owner not in self.owners_queue):
            return
        self.writeln('Deselected: ' + owner)
        self.owners_queue.remove(owner)
        self.deselected_owners.add(owner)
        for file_name in self.owners_to_files[owner] & self.unreviewed_files:
            self.files_to_owners[file_name].remove(owner)
        if findMandatoryOwners:
            self.find_mandatory_owners()

    def find_mandatory_owners(self):
        """Apply every decision that no longer needs user input.

        Queued owners without any unreviewed file are deselected; then, for
        as long as some unreviewed file has exactly one candidate owner,
        that owner is selected.
        """
        # Iterate over a snapshot: deselect_owner() removes entries from
        # `owners_queue`, and mutating a list while iterating it skips the
        # element that follows each removed one.
        for owner in list(self.owners_queue):
            if (owner in self.selected_owners or
                    owner in self.deselected_owners):
                continue
            if not self.owners_to_files[owner] & self.unreviewed_files:
                self.deselect_owner(owner, False)

        while True:
            sole_owner_file = next(
                (file_name for file_name in self.unreviewed_files
                 if len(self.files_to_owners[file_name]) == 1),
                None)
            if sole_owner_file is None:
                break
            owner = next(iter(self.files_to_owners[sole_owner_file]))
            self.select_owner(owner, False)
            if sole_owner_file in self.unreviewed_files:
                # select_owner() declined (owner not queued); stop instead
                # of retrying the same file forever.
                break

    def print_comments(self, owner):
        """Print `owner` followed by the comments recorded for it, if any."""
        if owner not in self.comments:
            self.writeln(self.bold_name(owner))
            return

        owner_comments = self.comments[owner]
        self.writeln(self.bold_name(owner) + ' is commented as:')
        self.indent()
        if owners_module.GLOBAL_STATUS in owner_comments:
            self.writeln(
                self.greyed(owner_comments[owners_module.GLOBAL_STATUS]) +
                ' (global status)')
        for path in owner_comments:
            if path == owners_module.GLOBAL_STATUS:
                # Already printed above.
                continue
            location = ' (at ' + self.bold(path or '<root>') + ')'
            if owner_comments[path]:
                self.writeln(self.greyed(owner_comments[path]) + location)
            else:
                self.writeln(self.greyed('[No comment] ') + location)
        self.unindent()

    def print_file_info(self, file_name, except_owner=''):
        """Print one summary line for `file_name`.

        A reviewed file is shown greyed with its reviewer.  An unreviewed
        file lists its candidate owners (leaving out `except_owner`) when
        there are at most three, and only their count otherwise.
        """
        if file_name not in self.unreviewed_files:
            self.writeln(self.greyed(
                file_name + ' (by ' +
                self.bold_name(self.reviewed_by[file_name]) + ')'))
            return

        candidates = self.files_to_owners[file_name]
        if len(candidates) <= 3:
            other_owners = [self.bold_name(ow) for ow in candidates
                            if ow != except_owner]
            self.writeln(file_name + ' [' + ', '.join(other_owners) + ']')
        else:
            self.writeln(
                file_name + ' [' + self.bold(str(len(candidates))) + ']')

    def print_file_info_detailed(self, file_name):
        """Print `file_name` and, indented, each of its owners.

        Owners that were already selected or deselected are greyed out.
        """
        self.writeln(file_name)
        self.indent()
        for ow in sorted(self.files_to_owners[file_name]):
            if ow in self.deselected_owners or ow in self.selected_owners:
                self.writeln(self.bold_name(self.greyed(ow)))
            else:
                self.writeln(self.bold_name(ow))
        self.unindent()

    def print_owned_files_for(self, owner):
        """Print the comments for `owner` and the files it owns."""
        self.print_comments(owner)
        self.writeln(self.bold_name(owner) + ' owns ' +
                     str(len(self.owners_to_files[owner])) + ' file(s):')
        self.indent()
        for file_name in sorted(self.owners_to_files[owner]):
            self.print_file_info(file_name, owner)
        self.unindent()
        self.writeln()

    def list_owners(self, owners_queue):
        """Print the undecided owners found in `owners_queue`.

        With more than three undecided owners only their comments are
        printed; otherwise the files of each owner are listed as well.
        """
        undecided_count = (len(self.owners_to_files) -
                           len(self.deselected_owners) -
                           len(self.selected_owners))
        undecided = [ow for ow in owners_queue
                     if ow not in self.deselected_owners and
                     ow not in self.selected_owners]
        if undecided_count > 3:
            for ow in undecided:
                self.print_comments(ow)
        else:
            for ow in undecided:
                self.writeln()
                self.print_owned_files_for(ow)

    def list_files(self):
        """Print the unreviewed files, in detail when there are at most 5."""
        self.indent()
        detailed = len(self.unreviewed_files) <= 5
        # Sorted in both modes so the listing order is stable.
        for file_name in sorted(self.unreviewed_files):
            if detailed:
                self.print_file_info_detailed(file_name)
            else:
                self.print_file_info(file_name)
        self.unindent()

    def pick_owner(self, ow):
        """Select the owner `ow` on explicit user request.

        `ow` may be given without `email_postfix`.

        Returns:
          True if the owner was selected, False (after printing the reason)
          if the name is unknown or the owner was already decided.
        """
        # Allowing to omit domain suffixes
        if (ow not in self.owners_to_files and
                ow + self.email_postfix in self.owners_to_files):
            ow += self.email_postfix

        if ow not in self.owners_to_files:
            self.writeln(
                'You cannot pick ' + self.bold_name(ow) + ' manually. ' +
                'It\'s an invalid name or not related to the change list.')
            return False
        if ow in self.selected_owners:
            self.writeln(
                'You cannot pick ' + self.bold_name(ow) + ' manually. ' +
                'It\'s already selected.')
            return False
        if ow in self.deselected_owners:
            self.writeln(
                'You cannot pick ' + self.bold_name(ow) + ' manually. ' +
                'It\'s already unselected.')
            return False

        self.select_owner(ow)
        return True

    def print_result(self):
        """Print the selected owners together with the files they own."""
        self.writeln()
        self.writeln()
        if not self.selected_owners:
            self.writeln('This change list already has owner-reviewers for '
                         'all files.')
            self.writeln('Use --ignore-current if you want to ignore them.')
            return

        self.writeln('** You selected these owners **')
        self.writeln()
        # Sorted so that the report does not depend on set iteration order.
        for owner in sorted(self.selected_owners):
            self.writeln(self.bold_name(owner) + ':')
            self.indent()
            for file_name in sorted(self.owners_to_files[owner]):
                self.writeln(file_name)
            self.unindent()

    def bold(self, text):
        """Return `text` wrapped in the bold color codes."""
        return self.COLOR_BOLD + text + self.COLOR_RESET

    def bold_name(self, name):
        """Return `name` without `email_postfix`, wrapped in bold codes."""
        return (self.COLOR_BOLD +
                name.replace(self.email_postfix, '') + self.COLOR_RESET)

    def greyed(self, text):
        """Return `text` wrapped in the grey color codes."""
        return self.COLOR_GREY + text + self.COLOR_RESET

    def indent(self):
        """Increase the output nesting depth by one level."""
        self.indentation += 1

    def unindent(self):
        """Decrease the output nesting depth by one level."""
        self.indentation -= 1

    def print_indent(self):
        """Return the whitespace prefix for the current nesting depth."""
        return ' ' * self.indentation

    def writeln(self, text=''):
        """Print `text` prefixed with the current indentation."""
        print(self.print_indent() + text)

    def hr(self):
        """Print a horizontal separator line."""
        self.writeln('=====================')

    def print_info(self, owner):
        """Print the number of files left and what `owner` owns."""
        self.hr()
        self.writeln(
            self.bold(str(len(self.unreviewed_files))) + ' file(s) left.')
        self.print_owned_files_for(owner)

    def input_command(self, owner):
        """Ask what to do with `owner`; return the lower-cased answer."""
        self.writeln('Add ' + self.bold_name(owner) + ' as your reviewer? ')
        return gclient_utils.AskForData(
            '[yes/no/Defer/pick/files/owners/quit/restart]: ').lower()