| # Copyright 2016 The Chromium Authors. All rights reserved. |
| # Use of this source code is govered by a BSD-style |
| # license that can be found in the LICENSE file or at |
| # https://developers.google.com/open-source/licenses/bsd |
| |
| """Fake object classes that are useful for unit tests.""" |
| |
| import collections |
| import logging |
| import re |
| |
| import settings |
| from framework import framework_helpers |
| from framework import monorailrequest |
| from framework import permissions |
| from framework import validate |
| from proto import project_pb2 |
| from proto import tracker_pb2 |
| from proto import user_pb2 |
| from proto import usergroup_pb2 |
| from services import caches |
| from services import issue_svc |
| from services import project_svc |
| from services import user_svc |
| from tracker import tracker_bizobj |
| from tracker import tracker_constants |
| |
| # Many fakes return partial or constant values, regardless of their arguments. |
| # pylint: disable=unused-argument |
| |
| BOUNDARY = '-----thisisaboundary' |
| OWNER_ROLE = 'OWNER_ROLE' |
| COMMITTER_ROLE = 'COMMITTER_ROLE' |
| CONTRIBUTOR_ROLE = 'CONTRIBUTOR_ROLE' |
| |
| |
| def Project( |
| project_name='proj', project_id=None, state=project_pb2.ProjectState.LIVE, |
| access=project_pb2.ProjectAccess.ANYONE, moved_to=None, |
| cached_content_timestamp=None, |
| owner_ids=None, committer_ids=None, contributor_ids=None): |
| """Returns a project protocol buffer with the given attributes.""" |
| project_id = project_id or hash(project_name) |
| return project_pb2.MakeProject( |
| project_name, project_id=project_id, state=state, access=access, |
| moved_to=moved_to, cached_content_timestamp=cached_content_timestamp, |
| owner_ids=owner_ids, committer_ids=committer_ids, |
| contributor_ids=contributor_ids) |
| |
| |
| def MakeTestIssue( |
| project_id, local_id, summary, status, owner_id, labels=None, |
| derived_labels=None, derived_status=None, merged_into=0, star_count=0, |
| derived_owner_id=0, issue_id=None, reporter_id=None, opened_timestamp=None, |
| closed_timestamp=None, modified_timestamp=None, is_spam=False, |
| component_ids=None, project_name=None, field_values=None): |
| """Easily make an Issue for testing.""" |
| issue = tracker_pb2.Issue() |
| issue.project_id = project_id |
| issue.project_name = project_name |
| issue.local_id = local_id |
| issue.issue_id = issue_id if issue_id else 100000 + local_id |
| issue.reporter_id = reporter_id if reporter_id else owner_id |
| issue.summary = summary |
| issue.status = status |
| issue.owner_id = owner_id |
| issue.derived_owner_id = derived_owner_id |
| issue.star_count = star_count |
| issue.merged_into = merged_into |
| issue.is_spam = is_spam |
| if opened_timestamp: |
| issue.opened_timestamp = opened_timestamp |
| if modified_timestamp: |
| issue.modified_timestamp = modified_timestamp |
| if closed_timestamp: |
| issue.closed_timestamp = closed_timestamp |
| if labels is not None: |
| if isinstance(labels, basestring): |
| labels = labels.split() |
| issue.labels.extend(labels) |
| if derived_labels is not None: |
| if isinstance(derived_labels, basestring): |
| derived_labels = derived_labels.split() |
| issue.derived_labels.extend(derived_labels) |
| if derived_status is not None: |
| issue.derived_status = derived_status |
| if component_ids is not None: |
| issue.component_ids = component_ids |
| if field_values is not None: |
| issue.field_values = field_values |
| return issue |
| |
| |
| def MakeTestConfig(project_id, labels, statuses): |
| """Convenient function to make a ProjectIssueConfig object.""" |
| config = tracker_bizobj.MakeDefaultProjectIssueConfig(project_id) |
| if isinstance(labels, basestring): |
| labels = labels.split() |
| if isinstance(statuses, basestring): |
| statuses = statuses.split() |
| config.well_known_labels = [ |
| tracker_pb2.LabelDef(label=lab) for lab in labels] |
| config.well_known_statuses = [ |
| tracker_pb2.StatusDef(status=stat) for stat in statuses] |
| return config |
| |
| |
| class MonorailConnection(object): |
| """Fake connection to databases for use in tests.""" |
| |
| def Commit(self): |
| pass |
| |
| def Close(self): |
| pass |
| |
| |
| class MonorailRequest(monorailrequest.MonorailRequest): |
| """Subclass of MonorailRequest suitable for testing.""" |
| |
| def __init__(self, user_info=None, project=None, perms=None, **kwargs): |
| """Construct a test MonorailRequest. |
| |
| Typically, this is constructed via testing.helpers.GetRequestObjects, |
| which also causes url parsing and optionally initializes the user, |
| project, and permissions info. |
| |
| Args: |
| user_info: a dict of user attributes to set on a MonorailRequest object. |
| For example, "user_id: 5" causes self.auth.user_id=5. |
| project: the Project pb for this request. |
| perms: a PermissionSet for this request. |
| """ |
| super(MonorailRequest, self).__init__(**kwargs) |
| |
| if user_info is not None: |
| for key in user_info: |
| setattr(self.auth, key, user_info[key]) |
| if 'user_id' in user_info: |
| self.auth.effective_ids = {user_info['user_id']} |
| |
| self.perms = perms or permissions.ADMIN_PERMISSIONSET |
| self.project = project |
| |
| |
| class UserGroupService(object): |
| """Fake UserGroupService class for testing other code.""" |
| |
| def __init__(self): |
| self.group_settings = {} |
| self.group_members = {} |
| self.group_addrs = {} |
| self.role_dict = {} |
| |
| def TestAddGroupSettings( |
| self, group_id, email, who_can_view=None, anyone_can_join=False, |
| who_can_add=None, external_group_type=None, |
| last_sync_time=0, friend_projects=None): |
| """Set up a fake group for testing. |
| |
| Args: |
| group_id: int user ID of the new user group. |
| email: string email address to identify the user group. |
| who_can_view: string enum 'owners', 'members', or 'anyone'. |
| anyone_can_join: optional boolean to allow any users to join the group. |
| who_can_add: optional list of int user IDs of users who can add |
| more members to the group. |
| """ |
| friend_projects = friend_projects or [] |
| group_settings = usergroup_pb2.MakeSettings( |
| who_can_view or 'members', |
| external_group_type, last_sync_time, friend_projects) |
| self.group_settings[group_id] = group_settings |
| self.group_addrs[group_id] = email |
| # TODO(jrobbins): store the other settings. |
| |
| def TestAddMembers(self, group_id, user_ids, role='member'): |
| self.group_members.setdefault(group_id, []).extend(user_ids) |
| for user_id in user_ids: |
| self.role_dict.setdefault(group_id, {})[user_id] = role |
| |
| def LookupMemberships(self, _cnxn, user_id): |
| memberships = { |
| group_id for group_id, member_ids in self.group_members.iteritems() |
| if user_id in member_ids} |
| return memberships |
| |
| def DetermineWhichUserIDsAreGroups(self, _cnxn, user_ids): |
| return [uid for uid in user_ids |
| if uid in self.group_settings] |
| |
| def GetAllUserGroupsInfo(self, cnxn): |
| infos = [] |
| for group_id in self.group_settings: |
| infos.append( |
| (self.group_addrs[group_id], |
| len(self.group_members.get(group_id, [])), |
| self.group_settings[group_id], group_id)) |
| |
| return infos |
| |
| def GetAllGroupSettings(self, _cnxn, group_ids): |
| return {gid: self.group_settings[gid] |
| for gid in group_ids |
| if gid in self.group_settings} |
| |
| def GetGroupSettings(self, cnxn, group_id): |
| return self.GetAllGroupSettings(cnxn, [group_id]).get(group_id) |
| |
| def CreateGroup(self, cnxn, services, email, who_can_view_members, |
| ext_group_type=None, friend_projects=None): |
| friend_projects = friend_projects or [] |
| group_id = services.user.LookupUserID( |
| cnxn, email, autocreate=True, allowgroups=True) |
| group_settings = usergroup_pb2.MakeSettings( |
| who_can_view_members, ext_group_type, 0, friend_projects) |
| self.UpdateSettings(cnxn, group_id, group_settings) |
| return group_id |
| |
| def DeleteGroups(self, cnxn, group_ids): |
| member_ids_dict, owner_ids_dict = self.LookupMembers(cnxn, group_ids) |
| citizens_id_dict = collections.defaultdict(list) |
| for g_id, user_ids in member_ids_dict.iteritems(): |
| citizens_id_dict[g_id].extend(user_ids) |
| for g_id, user_ids in owner_ids_dict.iteritems(): |
| citizens_id_dict[g_id].extend(user_ids) |
| for g_id, citizen_ids in citizens_id_dict.iteritems(): |
| # Remove group members, friend projects and settings |
| self.RemoveMembers(cnxn, g_id, citizen_ids) |
| self.group_settings.pop(g_id, None) |
| |
| def LookupMembers(self, _cnxn, group_id_list): |
| members_dict = {} |
| owners_dict = {} |
| for gid in group_id_list: |
| members_dict[gid] = [] |
| owners_dict[gid] = [] |
| for mid in self.group_members.get(gid, []): |
| if self.role_dict.get(gid, {}).get(mid) == 'owner': |
| owners_dict[gid].append(mid) |
| elif self.role_dict.get(gid, {}).get(mid) == 'member': |
| members_dict[gid].append(mid) |
| return members_dict, owners_dict |
| |
| def LookupAllMembers(self, _cnxn, group_id_list): |
| direct_members, direct_owners = self.LookupMembers( |
| _cnxn, group_id_list) |
| members_dict = {} |
| owners_dict = {} |
| for gid in group_id_list: |
| members = direct_members[gid] |
| owners = direct_owners[gid] |
| owners_dict[gid] = owners |
| members_dict[gid] = members |
| group_ids = set([uid for uid in members + owners |
| if uid in self.group_settings]) |
| while group_ids: |
| indirect_members, indirect_owners = self.LookupMembers( |
| _cnxn, group_ids) |
| child_members = set() |
| child_owners = set() |
| for _, children in indirect_members.iteritems(): |
| child_members.update(children) |
| for _, children in indirect_owners.iteritems(): |
| child_owners.update(children) |
| members_dict[gid].extend(list(child_members)) |
| owners_dict[gid].extend(list(child_owners)) |
| group_ids = set(self.DetermineWhichUserIDsAreGroups( |
| _cnxn, list(child_members) + list(child_owners))) |
| members_dict[gid] = list(set(members_dict[gid])) |
| return members_dict, owners_dict |
| |
| |
| def RemoveMembers(self, _cnxn, group_id, old_member_ids): |
| current_member_ids = self.group_members.get(group_id, []) |
| revised_member_ids = [mid for mid in current_member_ids |
| if mid not in old_member_ids] |
| self.group_members[group_id] = revised_member_ids |
| |
| def UpdateMembers(self, _cnxn, group_id, member_ids, new_role): |
| self.RemoveMembers(_cnxn, group_id, member_ids) |
| self.TestAddMembers(group_id, member_ids, new_role) |
| |
| def UpdateSettings(self, _cnxn, group_id, group_settings): |
| self.group_settings[group_id] = group_settings |
| |
| def ExpandAnyUserGroups(self, cnxn, user_ids): |
| group_ids = set(self.DetermineWhichUserIDsAreGroups(cnxn, user_ids)) |
| direct_ids = [uid for uid in user_ids if uid not in group_ids] |
| member_ids_dict, owner_ids_dict = self.LookupAllMembers(cnxn, group_ids) |
| |
| indirect_ids = set() |
| for gid in group_ids: |
| indirect_ids.update(member_ids_dict[gid]) |
| indirect_ids.update(owner_ids_dict[gid]) |
| # It's possible that a user has both direct and indirect memberships of |
| # one group. In this case, mark the user as direct member only. |
| indirect_ids = [iid for iid in indirect_ids if iid not in direct_ids] |
| |
| return direct_ids, list(indirect_ids) |
| |
| def LookupVisibleMembers( |
| self, cnxn, group_id_list, perms, effective_ids, services): |
| settings_dict = self.GetAllGroupSettings(cnxn, group_id_list) |
| group_ids = settings_dict.keys() |
| |
| direct_member_ids_dict, direct_owner_ids_dict = self.LookupMembers( |
| cnxn, group_ids) |
| all_member_ids_dict, all_owner_ids_dict = self.LookupAllMembers( |
| cnxn, group_ids) |
| visible_member_ids_dict = {} |
| visible_owner_ids_dict = {} |
| for gid in group_ids: |
| member_ids = all_member_ids_dict[gid] |
| owner_ids = all_owner_ids_dict[gid] |
| if permissions.CanViewGroup(perms, effective_ids, settings_dict[gid], |
| member_ids, owner_ids, []): |
| visible_member_ids_dict[gid] = direct_member_ids_dict[gid] |
| visible_owner_ids_dict[gid] = direct_owner_ids_dict[gid] |
| |
| return visible_member_ids_dict, visible_owner_ids_dict |
| |
| def ValidateFriendProjects(self, cnxn, services, friend_projects): |
| project_names = filter(None, re.split('; |, | |;|,', friend_projects)) |
| id_dict = services.project.LookupProjectIDs(cnxn, project_names) |
| missed_projects = [] |
| result = [] |
| for p_name in project_names: |
| if p_name in id_dict: |
| result.append(id_dict[p_name]) |
| else: |
| missed_projects.append(p_name) |
| error_msg = '' |
| if missed_projects: |
| error_msg = 'Project(s) %s do not exist' % ', '.join(missed_projects) |
| return None, error_msg |
| else: |
| return result, None |
| |
| |
| class CacheManager(object): |
| |
| def __init__(self, invalidate_tbl=None): |
| self.last_call = None |
| self.processed_invalidations_up_to = 0 |
| |
| def MakeCache(self, kind, max_size=None, use_value_centric_cache=False): |
| """Make a new cache and register it for future invalidations.""" |
| if use_value_centric_cache: |
| cache = caches.ValueCentricRamCache(self, kind, max_size=max_size) |
| else: |
| cache = caches.RamCache(self, kind, max_size=max_size) |
| return cache |
| |
| def DoDistributedInvalidation(self, cnxn): |
| """Drop any cache entries that were invalidated by other jobs.""" |
| self.last_call = 'DoDistributedInvalidation', cnxn |
| |
| def StoreInvalidateRows(self, cnxn, kind, keys): |
| """Store database rows to let all frontends know to invalidate.""" |
| self.last_call = 'StoreInvalidateRows', cnxn, kind, keys |
| |
| def StoreInvalidateAll(self, cnxn, kind): |
| """Store a database row to let all frontends know to invalidate.""" |
| self.last_call = 'StoreInvalidateAll', cnxn, kind |
| |
| |
| |
| class UserService(object): |
| |
| def __init__(self): |
| """Creates a test-appropriate UserService object.""" |
| self.users_by_email = {} |
| self.users_by_id = {} |
| self.test_users = {} |
| |
| def TestAddUser(self, email, user_id, add_user=True, banned=False): |
| """Add a user to the fake UserService instance. |
| |
| Args: |
| email: Email of the user. |
| user_id: int user ID. |
| add_user: Flag whether user pb should be created, i.e. whether a |
| Monorail account should be created |
| banned: Boolean to set the user as banned |
| |
| Returns: |
| The User PB that was added, or None. |
| """ |
| self.users_by_email[email] = user_id |
| self.users_by_id[user_id] = email |
| |
| user = None |
| if add_user: |
| user = user_pb2.MakeUser() |
| user.is_site_admin = False |
| user.email = email |
| user.obscure_email = True |
| if banned: |
| user.banned = 'is banned' |
| self.test_users[user_id] = user |
| |
| return user |
| |
| def GetUser(self, _cnxn, user_id): |
| return self.test_users.get(user_id) |
| |
| def _CreateUser(self, _cnxn, email): |
| if email in self.users_by_email: |
| return |
| user_id = framework_helpers.MurmurHash3_x86_32(email) |
| self.users_by_id[user_id] = email |
| self.users_by_email[email] = user_id |
| |
| def _CreateUsers(self, cnxn, emails): |
| for email in emails: |
| self._CreateUser(cnxn, email) |
| |
| def LookupUserID(self, cnxn, email, autocreate=False, allowgroups=False): |
| user_id = self.users_by_email.get(email) |
| if not user_id and validate.IsValidEmail(email): |
| if autocreate: |
| self._CreateUser(cnxn, email) |
| user_id = self.users_by_email.get(email) |
| else: |
| raise user_svc.NoSuchUserException(email) |
| |
| return user_id |
| |
| def GetUsersByIDs(self, cnxn, user_ids, use_cache=True): |
| user_dict = {} |
| for user_id in user_ids: |
| if user_id and self.test_users.get(user_id): |
| user_dict[user_id] = self.test_users[user_id] |
| return user_dict |
| |
| def LookupExistingUserIDs(self, cnxn, emails): |
| email_dict = { |
| email: self.users_by_email[email] |
| for email in emails |
| if email in self.users_by_email} |
| return email_dict |
| |
| def LookupUserIDs(self, cnxn, emails, autocreate=False, |
| allowgroups=False): |
| email_dict = {} |
| for email in emails: |
| user_id = self.LookupUserID( |
| cnxn, email, autocreate=autocreate, allowgroups=allowgroups) |
| if user_id: |
| email_dict[email] = user_id |
| return email_dict |
| |
| def LookupUserEmail(self, _cnxn, user_id): |
| email = self.users_by_id.get(user_id) |
| return email |
| |
| def LookupUserEmails(self, cnxn, user_ids): |
| user_dict = { |
| user_id: self.LookupUserEmail(cnxn, user_id) |
| for user_id in user_ids} |
| return user_dict |
| |
| def UpdateUser(self, _cnxn, user_id, user): |
| """Updates the user pb.""" |
| self.test_users[user_id] = user |
| |
| def UpdateUserSettings( |
| self, cnxn, user_id, user, notify=None, notify_starred=None, |
| obscure_email=None, after_issue_update=None, |
| is_site_admin=None, ignore_action_limits=None, |
| is_banned=None, banned_reason=None, action_limit_updates=None, |
| dismissed_cues=None, keep_people_perms_open=None, preview_on_hover=None): |
| self.UpdateUser(cnxn, user_id, user) |
| |
| |
| class AbstractStarService(object): |
| """Fake StarService.""" |
| |
| def __init__(self): |
| self.stars_by_item_id = {} |
| self.stars_by_starrer_id = {} |
| self.expunged_item_ids = [] |
| |
| def ExpungeStars(self, _cnxn, item_id): |
| self.expunged_item_ids.append(item_id) |
| old_starrer = self.stars_by_item_id.get(item_id) |
| self.stars_by_item_id[item_id] = [] |
| if self.stars_by_starrer_id.get(old_starrer): |
| self.stars_by_starrer_id[old_starrer] = [ |
| it for it in self.stars_by_starrer_id[old_starrer] |
| if it != item_id] |
| |
| def LookupItemStarrers(self, _cnxn, item_id): |
| return self.stars_by_item_id.get(item_id, []) |
| |
| def LookupStarredItemIDs(self, _cnxn, starrer_user_id): |
| return self.stars_by_starrer_id.get(starrer_user_id, []) |
| |
| def IsItemStarredBy(self, cnxn, item_id, starrer_user_id): |
| return item_id in self.LookupStarredItemIDs(cnxn, starrer_user_id) |
| |
| def CountItemStars(self, cnxn, item_id): |
| return len(self.LookupItemStarrers(cnxn, item_id)) |
| |
| def CountItemsStars(self, cnxn, item_ids): |
| return {item_id: self.CountItemStars(cnxn, item_id) |
| for item_id in item_ids} |
| |
| def SetStar(self, cnxn, item_id, starrer_user_id, starred): |
| if starred and not self.IsItemStarredBy(cnxn, item_id, starrer_user_id): |
| self.stars_by_item_id.setdefault(item_id, []).append(starrer_user_id) |
| self.stars_by_starrer_id.setdefault(starrer_user_id, []).append(item_id) |
| |
| elif not starred and self.IsItemStarredBy(cnxn, item_id, starrer_user_id): |
| self.stars_by_item_id[item_id].remove(starrer_user_id) |
| self.stars_by_starrer_id[starrer_user_id].remove(item_id) |
| |
| |
| class UserStarService(AbstractStarService): |
| pass |
| |
| |
| class ProjectStarService(AbstractStarService): |
| pass |
| |
| |
| class IssueStarService(AbstractStarService): |
| |
| # pylint: disable=arguments-differ |
| def SetStar( |
| self, cnxn, _service, _config, issue_id, starrer_user_id, |
| starred): |
| super(IssueStarService, self).SetStar( |
| cnxn, issue_id, starrer_user_id, starred) |
| |
| |
| class ProjectService(object): |
| """Fake ProjectService object. |
| |
| Provides methods for creating users and projects, which are accessible |
| through parts of the real ProjectService interface. |
| """ |
| |
| def __init__(self): |
| self.test_projects = {} # project_name -> project_pb |
| self.projects_by_id = {} |
| self.test_star_manager = None |
| self.indexed_projects = {} |
| self.unindexed_projects = set() |
| self.index_counter = 0 |
| self.project_commitments = {} |
| |
| def TestAddProject( |
| self, name, summary='', state=project_pb2.ProjectState.LIVE, |
| owner_ids=None, committer_ids=None, contrib_ids=None, |
| issue_notify_address=None, state_reason='', |
| description=None, project_id=None, process_inbound_email=None, |
| access=None): |
| """Add a project to the fake ProjectService object. |
| |
| Args: |
| name: The name of the project. Will replace any existing project under |
| the same name. |
| summary: The summary string of the project. |
| state: Initial state for the project from project_pb2.ProjectState. |
| owner_ids: List of user ids for project owners |
| committer_ids: List of user ids for project committers |
| contrib_ids: List of user ids for project contributors |
| issue_notify_address: email address to send issue change notifications |
| state_reason: string describing the reason the project is in its current |
| state. |
| description: The description string for this project |
| project_id: A unique integer identifier for the created project. |
| process_inbound_email: True to make this project accept inbound email. |
| access: One of the values of enum project_pb2.ProjectAccess. |
| |
| Returns: |
| A populated project PB. |
| """ |
| proj_pb = project_pb2.Project() |
| proj_pb.project_id = project_id or hash(name) % 100000 |
| proj_pb.project_name = name |
| proj_pb.summary = summary |
| proj_pb.state = state |
| proj_pb.state_reason = state_reason |
| if description is not None: |
| proj_pb.description = description |
| |
| self.TestAddProjectMembers(owner_ids, proj_pb, OWNER_ROLE) |
| self.TestAddProjectMembers(committer_ids, proj_pb, COMMITTER_ROLE) |
| self.TestAddProjectMembers(contrib_ids, proj_pb, CONTRIBUTOR_ROLE) |
| |
| if issue_notify_address is not None: |
| proj_pb.issue_notify_address = issue_notify_address |
| if process_inbound_email is not None: |
| proj_pb.process_inbound_email = process_inbound_email |
| if access is not None: |
| proj_pb.access = access |
| |
| self.test_projects[name] = proj_pb |
| self.projects_by_id[proj_pb.project_id] = proj_pb |
| return proj_pb |
| |
| def TestAddProjectMembers(self, user_id_list, proj_pb, role): |
| if user_id_list is not None: |
| for user_id in user_id_list: |
| if role == OWNER_ROLE: |
| proj_pb.owner_ids.append(user_id) |
| elif role == COMMITTER_ROLE: |
| proj_pb.committer_ids.append(user_id) |
| elif role == CONTRIBUTOR_ROLE: |
| proj_pb.contributor_ids.append(user_id) |
| |
| def LookupProjectIDs(self, cnxn, project_names): |
| return { |
| project_name: self.test_projects[project_name].project_id |
| for project_name in project_names |
| if project_name in self.test_projects} |
| |
| def LookupProjectNames(self, cnxn, project_ids): |
| projects_dict = self.GetProjects(cnxn, project_ids) |
| return {p.project_id: p.project_name |
| for p in projects_dict.itervalues()} |
| |
| def CreateProject( |
| self, _cnxn, project_name, owner_ids, committer_ids, |
| contributor_ids, summary, description, |
| state=project_pb2.ProjectState.LIVE, access=None, read_only=None, |
| home_page=None, docs_url=None, logo_gcs_id=None, logo_file_name=None): |
| """Create and store a Project with the given attributes.""" |
| if project_name in self.test_projects: |
| raise project_svc.ProjectAlreadyExists() |
| self.TestAddProject( |
| project_name, summary=summary, state=state, |
| owner_ids=owner_ids, committer_ids=committer_ids, |
| contrib_ids=contributor_ids, description=description, |
| access=access) |
| |
| def ExpungeProject(self, _cnxn, project_id): |
| project = self.projects_by_id.get(project_id) |
| if project: |
| self.test_projects.pop(project.project_name, None) |
| |
| def GetProjectsByName(self, _cnxn, project_name_list, use_cache=True): |
| return { |
| pn: self.test_projects[pn] for pn in project_name_list |
| if pn in self.test_projects} |
| |
| def GetProjectByName(self, _cnxn, name, use_cache=True): |
| return self.test_projects.get(name) |
| |
| def GetProjectList(self, cnxn, project_id_list, use_cache=True): |
| project_dict = self.GetProjects(cnxn, project_id_list, use_cache=use_cache) |
| return [project_dict[pid] for pid in project_id_list |
| if pid in project_dict] |
| |
| def GetVisibleLiveProjects(self, _cnxn, logged_in_user, effective_ids, |
| use_cache=True): |
| return self.projects_by_id.keys() |
| |
| def GetProjects(self, _cnxn, project_ids, use_cache=True): |
| result = {} |
| for project_id in project_ids: |
| project = self.projects_by_id.get(project_id) |
| if project: |
| result[project_id] = project |
| return result |
| |
| def GetProject(self, cnxn, project_id, use_cache=True): |
| """Load the specified project from the database.""" |
| project_id_dict = self.GetProjects(cnxn, [project_id], use_cache=use_cache) |
| return project_id_dict.get(project_id) |
| |
| @staticmethod |
| def IsValidProjectName(string): |
| """Return true if the given string is a valid project name.""" |
| return project_svc.RE_PROJECT_NAME.match(string) |
| |
| def GetProjectCommitments(self, _cnxn, project_id): |
| if project_id in self.project_commitments: |
| return self.project_commitments[project_id] |
| |
| project_commitments = project_pb2.ProjectCommitments() |
| project_commitments.project_id = project_id |
| return project_commitments |
| |
| def TestStoreProjectCommitments(self, project_commitments): |
| key = project_commitments.project_id |
| self.project_commitments[key] = project_commitments |
| |
| def UpdateProject( |
| self, _cnxn, project_id, summary=None, description=None, |
| state=None, state_reason=None, access=None, |
| issue_notify_address=None, attachment_bytes_used=None, |
| attachment_quota=None, moved_to=None, process_inbound_email=None, |
| only_owners_remove_restrictions=None, |
| read_only_reason=None, cached_content_timestamp=None, |
| only_owners_see_contributors=None, delete_time=None, |
| recent_activity=None, revision_url_format=None, home_page=None, |
| docs_url=None, logo_gcs_id=None, logo_file_name=None): |
| project = self.projects_by_id.get(project_id) |
| if not project: |
| raise project_svc.NoSuchProjectException( |
| 'Project "%s" not found!' % project_id) |
| |
| # TODO(jrobbins): implement all passed arguments - probably as a utility |
| # method shared with the real persistence implementation. |
| if read_only_reason is not None: |
| project.read_only_reason = read_only_reason |
| |
| def UpdateProjectRoles( |
| self, _cnxn, project_id, owner_ids, committer_ids, |
| contributor_ids, now=None): |
| project = self.projects_by_id.get(project_id) |
| if not project: |
| raise project_svc.NoSuchProjectException( |
| 'Project "%s" not found!' % project_id) |
| |
| project.owner_ids = owner_ids |
| project.committer_ids = committer_ids |
| project.contributor_ids = contributor_ids |
| |
| def MarkProjectDeletable( |
| self, _cnxn, project_id, _config_service): |
| project = self.projects_by_id[project_id] |
| project.project_name = 'DELETABLE_%d' % project_id |
| project.state = project_pb2.ProjectState.DELETABLE |
| |
| def UpdateRecentActivity(self, _cnxn, _project_id, now=None): |
| pass |
| |
| def GetUserRolesInAllProjects(self, _cnxn, effective_ids): |
| owned_project_ids = set() |
| membered_project_ids = set() |
| contrib_project_ids = set() |
| |
| for project in self.projects_by_id.itervalues(): |
| if not effective_ids.isdisjoint(project.owner_ids): |
| owned_project_ids.add(project.project_id) |
| elif not effective_ids.isdisjoint(project.committer_ids): |
| membered_project_ids.add(project.project_id) |
| elif not effective_ids.isdisjoint(project.contributor_ids): |
| contrib_project_ids.add(project.project_id) |
| |
| return owned_project_ids, membered_project_ids, contrib_project_ids |
| |
| |
| class ConfigService(object): |
| """Fake version of ConfigService that just works in-RAM.""" |
| |
| def __init__(self, user_id=None): |
| self.project_configs = {} |
| self.next_field_id = 123 |
| self.next_component_id = 345 |
| self.expunged_configs = [] |
| self.component_ids_to_templates = {} |
| |
| def TemplatesWithComponent(self, _cnxn, component_id, _config): |
| return self.component_ids_to_templates.get(component_id, []) |
| |
| def ExpungeConfig(self, _cnxn, project_id): |
| self.expunged_configs.append(project_id) |
| |
| def GetLabelDefRows(self, cnxn, project_id): |
| """This always returns empty results. Mock it to test other cases.""" |
| return [] |
| |
| def GetLabelDefRowsAnyProject(self, cnxn, where=None): |
| """This always returns empty results. Mock it to test other cases.""" |
| return [] |
| |
| def LookupLabel(self, cnxn, project_id, label_id): |
| if label_id == 999: |
| return None |
| return 'label_%d_%d' % (project_id, label_id) |
| |
| def LookupLabelID(self, cnxn, project_id, label, autocreate=True): |
| return 1 |
| |
| def LookupLabelIDs(self, cnxn, project_id, labels, autocreate=False): |
| return [idx for idx, _label in enumerate(labels)] |
| |
| def LookupIDsOfLabelsMatching(self, cnxn, project_id, regex): |
| return [1, 2, 3] |
| |
| def LookupStatus(self, cnxn, project_id, status_id): |
| return 'status_%d_%d' % (project_id, status_id) |
| |
| def LookupStatusID(self, cnxn, project_id, status, autocreate=True): |
| if status: |
| return 1 |
| else: |
| return 0 |
| |
| def LookupStatusIDs(self, cnxn, project_id, statuses): |
| return [idx for idx, _status in enumerate(statuses)] |
| |
| def LookupClosedStatusIDs(self, cnxn, project_id): |
| return [7, 8, 9] |
| |
| def StoreConfig(self, _cnxn, config): |
| self.project_configs[config.project_id] = config |
| |
| def GetProjectConfig(self, _cnxn, project_id, use_cache=True): |
| if project_id in self.project_configs: |
| return self.project_configs[project_id] |
| else: |
| return tracker_bizobj.MakeDefaultProjectIssueConfig(project_id) |
| |
| def GetProjectConfigs(self, _cnxn, project_ids, use_cache=True): |
| config_dict = {} |
| for project_id in project_ids: |
| if project_id in self.project_configs: |
| config_dict[project_id] = self.project_configs[project_id] |
| else: |
| config_dict[project_id] = tracker_bizobj.MakeDefaultProjectIssueConfig( |
| project_id) |
| |
| return config_dict |
| |
| def UpdateConfig( |
| self, cnxn, project, well_known_statuses=None, |
| statuses_offer_merge=None, well_known_labels=None, |
| excl_label_prefixes=None, templates=None, |
| default_template_for_developers=None, default_template_for_users=None, |
| list_prefs=None, restrict_to_known=None): |
| project_id = project.project_id |
| project_config = self.GetProjectConfig(cnxn, project_id, use_cache=False) |
| |
| if well_known_statuses is not None: |
| tracker_bizobj.SetConfigStatuses(project_config, well_known_statuses) |
| |
| if statuses_offer_merge is not None: |
| project_config.statuses_offer_merge = statuses_offer_merge |
| |
| if well_known_labels is not None: |
| tracker_bizobj.SetConfigLabels(project_config, well_known_labels) |
| |
| if excl_label_prefixes is not None: |
| project_config.exclusive_label_prefixes = excl_label_prefixes |
| |
| if templates is not None: |
| project_config.templates = templates |
| |
| if default_template_for_developers is not None: |
| project_config.default_template_for_developers = ( |
| default_template_for_developers) |
| if default_template_for_users is not None: |
| project_config.default_template_for_users = default_template_for_users |
| |
| if list_prefs: |
| default_col_spec, default_sort_spec, x_attr, y_attr = list_prefs |
| project_config.default_col_spec = default_col_spec |
| project_config.default_sort_spec = default_sort_spec |
| project_config.default_x_attr = x_attr |
| project_config.default_y_attr = y_attr |
| |
| if restrict_to_known is not None: |
| project_config.restrict_to_known = restrict_to_known |
| |
| self.StoreConfig(cnxn, project_config) |
| return project_config |
| |
| def CreateFieldDef( |
| self, cnxn, project_id, field_name, field_type_str, applic_type, |
| applic_pred, is_required, is_multivalued, |
| min_value, max_value, regex, needs_member, needs_perm, |
| grants_perm, notify_on, docstring, admin_ids): |
| config = self.GetProjectConfig(cnxn, project_id) |
| field_type = tracker_pb2.FieldTypes(field_type_str) |
| field_id = self.next_field_id |
| self.next_field_id += 1 |
| fd = tracker_bizobj.MakeFieldDef( |
| field_id, project_id, field_name, field_type, applic_type, applic_pred, |
| is_required, is_multivalued, min_value, max_value, regex, |
| needs_member, needs_perm, grants_perm, notify_on, docstring, False) |
| config.field_defs.append(fd) |
| self.StoreConfig(cnxn, config) |
| |
| def SoftDeleteFieldDef(self, cnxn, project_id, field_id): |
| config = self.GetProjectConfig(cnxn, project_id) |
| fd = tracker_bizobj.FindFieldDefByID(field_id, config) |
| fd.is_deleted = True |
| self.StoreConfig(cnxn, config) |
| |
| def UpdateFieldDef( |
| self, cnxn, project_id, field_id, field_name=None, |
| applicable_type=None, applicable_predicate=None, is_required=None, |
| is_multivalued=None, min_value=None, max_value=None, regex=None, |
| needs_member=None, needs_perm=None, grants_perm=None, notify_on=None, |
| docstring=None, admin_ids=None): |
| config = self.GetProjectConfig(cnxn, project_id) |
| fd = tracker_bizobj.FindFieldDefByID(field_id, config) |
| # pylint: disable=multiple-statements |
| if field_name is not None: fd.field_name = field_name |
| if applicable_type is not None: fd.applicable_type = applicable_type |
| if applicable_predicate is not None: |
| fd.applicable_predicate = applicable_predicate |
| if is_required is not None: fd.is_required = is_required |
| if is_multivalued is not None: fd.is_multivalued = is_multivalued |
| if min_value is not None: fd.min_value = min_value |
| if max_value is not None: fd.max_value = max_value |
| if regex is not None: fd.regex = regex |
| if docstring is not None: fd.docstring = docstring |
| if admin_ids is not None: fd.admin_ids = admin_ids |
| self.StoreConfig(cnxn, config) |
| |
| def CreateComponentDef( |
| self, cnxn, project_id, path, docstring, deprecated, admin_ids, cc_ids, |
| created, creator_id): |
| config = self.GetProjectConfig(cnxn, project_id) |
| cd = tracker_bizobj.MakeComponentDef( |
| self.next_component_id, project_id, path, docstring, deprecated, |
| admin_ids, cc_ids, created, creator_id) |
| config.component_defs.append(cd) |
| self.next_component_id += 1 |
| self.StoreConfig(cnxn, config) |
| return self.next_component_id - 1 |
| |
| def UpdateComponentDef( |
| self, cnxn, project_id, component_id, path=None, docstring=None, |
| deprecated=None, admin_ids=None, cc_ids=None, created=None, |
| creator_id=None, modified=None, modifier_id=None): |
| config = self.GetProjectConfig(cnxn, project_id) |
| cd = tracker_bizobj.FindComponentDefByID(component_id, config) |
| if path is not None: |
| assert path |
| cd.path = path |
| # pylint: disable=multiple-statements |
| if docstring is not None: cd.docstring = docstring |
| if deprecated is not None: cd.deprecated = deprecated |
| if admin_ids is not None: cd.admin_ids = admin_ids |
| if cc_ids is not None: cd.cc_ids = cc_ids |
| if created is not None: cd.created = created |
| if creator_id is not None: cd.creator_id = creator_id |
| if modified is not None: cd.modified = modified |
| if modifier_id is not None: cd.modifier_id = modifier_id |
| self.StoreConfig(cnxn, config) |
| |
| def DeleteComponentDef(self, cnxn, project_id, component_id): |
| """Delete the specified component definition.""" |
| config = self.GetProjectConfig(cnxn, project_id) |
| config.component_defs = [ |
| cd for cd in config.component_defs |
| if cd.component_id != component_id] |
| self.StoreConfig(cnxn, config) |
| |
| def InvalidateMemcache(self, issues, key_prefix=''): |
| pass |
| |
| |
| class IssueService(object): |
| """Fake version of IssueService that just works in-RAM.""" |
| # pylint: disable=unused-argument |
| |
| def __init__(self, user_id=None): |
| self.user_id = user_id |
| # Dictionary {project_id: issue_pb_dict} |
| # where issue_pb_dict is a dictionary of the form |
| # {local_id: issue_pb} |
| self.issues_by_project = {} |
| self.issues_by_iid = {} |
| # Dictionary {project_id: comment_pb_dict} |
| # where comment_pb_dict is a dictionary of the form |
| # {local_id: comment_pb_list} |
| self.comments_by_project = {} |
| self.comments_by_iid = {} |
| self.comments_by_cid = {} |
| self.attachments_by_id = {} |
| |
| # Set of issue IDs for issues that have been indexed by calling |
| # IndexIssues(). |
| self.indexed_issue_iids = set() |
| |
| # Test-only indication that the indexer would have been called |
| # by the real DITPersist. |
| self.indexer_called = False |
| |
| # Test-only sequence of updated and enqueued. |
| self.updated_issues = [] |
| self.enqueued_issues = [] |
| |
| # Test-only sequence of expunged issues and projects. |
| self.expunged_issues = [] |
| self.expunged_former_locations = [] |
| self.expunged_local_ids = [] |
| |
| # Test-only indicators that methods were called. |
| self.get_all_issues_in_project_called = False |
| self.update_issues_called = False |
| self.enqueue_issues_called = False |
| |
| # The next id to return if it is > 0. |
| self.next_id = -1 |
| |
| def UpdateIssues( |
| self, cnxn, issues, update_cols=None, just_derived=False, |
| commit=True, invalidate=True): |
| self.update_issues_called = True |
| self.updated_issues.extend(issues) |
| |
| def EnqueueIssuesForIndexing(self, _cnxn, issues): |
| self.enqueue_issues_called = True |
| self.enqueued_issues.extend(issues) |
| |
| def ExpungeIssues(self, _cnxn, issue_ids): |
| self.expunged_issues.extend(issue_ids) |
| |
| def ExpungeFormerLocations(self, _cnxn, project_id): |
| self.expunged_former_locations.append(project_id) |
| |
| def ExpungeLocalIDCounters(self, _cnxn, project_id): |
| self.expunged_local_ids.append(project_id) |
| |
| def TestAddIssue(self, issue): |
| project_id = issue.project_id |
| self.issues_by_project.setdefault(project_id, {}) |
| self.issues_by_project[project_id][issue.local_id] = issue |
| self.issues_by_iid[issue.issue_id] = issue |
| |
| # Adding a new issue should add the first comment to the issue |
| comment = tracker_pb2.IssueComment() |
| comment.project_id = issue.project_id |
| comment.issue_id = issue.issue_id |
| comment.content = issue.summary |
| comment.timestamp = issue.opened_timestamp |
| if issue.reporter_id: |
| comment.user_id = issue.reporter_id |
| comment.sequence = 0 |
| self.TestAddComment(comment, issue.local_id) |
| |
| def TestAddComment(self, comment, local_id): |
| pid = comment.project_id |
| if not comment.id: |
| comment.id = len(self.comments_by_cid) |
| |
| self.comments_by_project.setdefault(pid, {}) |
| self.comments_by_project[pid].setdefault(local_id, []).append(comment) |
| self.comments_by_iid.setdefault(comment.issue_id, []).append(comment) |
| self.comments_by_cid[comment.id] = comment |
| |
| def TestAddAttachment(self, attachment, comment_id, issue_id): |
| if not attachment.attachment_id: |
| attachment.attachment_id = len(self.attachments_by_id) |
| |
| aid = attachment.attachment_id |
| self.attachments_by_id[aid] = attachment, comment_id, issue_id |
| comment = self.comments_by_cid[comment_id] |
| if attachment not in comment.attachments: |
| comment.attachments.extend([attachment]) |
| |
| def GetAttachmentAndContext(self, _cnxn, attachment_id): |
| if attachment_id in self.attachments_by_id: |
| attach, comment_id, issue_id = self.attachments_by_id[attachment_id] |
| if not attach.deleted: |
| return attach, comment_id, issue_id |
| |
| raise issue_svc.NoSuchAttachmentException() |
| |
| def GetComments(self, _cnxn, where=None, order_by=None, **kwargs): |
| # This is a very limited subset of what the real GetComments() can do. |
| cid = kwargs.get('id') |
| |
| comment = self.comments_by_cid.get(cid) |
| if comment: |
| return [comment] |
| else: |
| return [] |
| |
| def GetComment(self, cnxn, comment_id): |
| """Get the requested comment, or raise an exception.""" |
| comments = self.GetComments(cnxn, id=comment_id) |
| if len(comments) == 1: |
| return comments[0] |
| |
| raise issue_svc.NoSuchCommentException() |
| |
| def ResolveIssueRefs(self, cnxn, ref_projects, default_project_name, refs): |
| result = [] |
| for project_name, local_id in refs: |
| project = ref_projects.get(project_name or default_project_name) |
| if not project or project.state == project_pb2.ProjectState.DELETABLE: |
| continue # ignore any refs to issues in deleted projects |
| try: |
| issue = self.GetIssueByLocalID(cnxn, project.project_id, local_id) |
| result.append(issue.issue_id) |
| except issue_svc.NoSuchIssueException: |
| pass # ignore any refs to issues that don't exist |
| |
| return result |
| |
| def GetAllIssuesInProject(self, _cnxn, project_id, min_local_id=None): |
| self.get_all_issues_in_project_called = True |
| if project_id in self.issues_by_project: |
| return self.issues_by_project[project_id].values() |
| else: |
| return [] |
| |
| def GetIssuesByLocalIDs( |
| self, _cnxn, project_id, local_id_list, shard_id=None): |
| results = [] |
| for local_id in local_id_list: |
| if (project_id in self.issues_by_project |
| and local_id in self.issues_by_project[project_id]): |
| results.append(self.issues_by_project[project_id][local_id]) |
| |
| return results |
| |
| def GetIssueByLocalID(self, _cnxn, project_id, local_id): |
| try: |
| return self.issues_by_project[project_id][local_id] |
| except KeyError: |
| raise issue_svc.NoSuchIssueException() |
| |
| def GetAnyOnHandIssue(self, issue_ids, start=None, end=None): |
| return None # Treat them all like misses. |
| |
| def GetIssue(self, _cnxn, issue_id): |
| if issue_id in self.issues_by_iid: |
| return self.issues_by_iid[issue_id] |
| else: |
| raise issue_svc.NoSuchIssueException() |
| |
| def LookupIssueID(self, _cnxn, project_id, local_id): |
| try: |
| issue = self.issues_by_project[project_id][local_id] |
| except KeyError: |
| raise issue_svc.NoSuchIssueException() |
| return issue.issue_id |
| |
| def GetCommentsForIssue(self, _cnxn, issue_id): |
| comments = self.comments_by_iid.get(issue_id, []) |
| for idx, c in enumerate(comments): |
| c.sequence = idx |
| |
| return comments |
| |
| def InsertIssue(self, cnxn, issue): |
| issue.issue_id = issue.project_id * 1000000 + issue.local_id |
| self.issues_by_project.setdefault(issue.project_id, {}) |
| self.issues_by_project[issue.project_id][issue.local_id] = issue |
| self.issues_by_iid[issue.issue_id] = issue |
| return issue.issue_id |
| |
| def CreateIssue( |
| self, cnxn, services, project_id, |
| summary, status, owner_id, cc_ids, labels, field_values, |
| component_ids, reporter_id, marked_description, blocked_on=None, |
| blocking=None, attachments=None, timestamp=None, index_now=True): |
| issue = tracker_pb2.Issue() |
| issue.project_id = project_id |
| issue.summary = summary |
| issue.status = status |
| if owner_id: |
| issue.owner_id = owner_id |
| issue.cc_ids.extend(cc_ids) |
| issue.labels.extend(labels) |
| issue.field_values.extend(field_values) |
| issue.reporter_id = reporter_id |
| if timestamp: |
| issue.opened_timestamp = timestamp |
| |
| if blocked_on: |
| issue.blocked_on_iids.extend(blocked_on) |
| if blocking: |
| issue.blocking.extend(blocking) |
| |
| if blocking: |
| issue.blocking_iids.extend(blocking) |
| |
| issue.local_id = self.AllocateNextLocalID(cnxn, project_id) |
| issue.issue_id = project_id * 1000000 + issue.local_id |
| |
| self.TestAddIssue(issue) |
| self.comments_by_iid[issue.issue_id][0].content = marked_description |
| return issue.local_id |
| |
| def SetUsedLocalID(self, cnxn, project_id): |
| self.next_id = self.GetHighestLocalID(cnxn, project_id) + 1 |
| |
| def AllocateNextLocalID(self, cnxn, project_id): |
| return self.GetHighestLocalID(cnxn, project_id) + 1 |
| |
| def GetHighestLocalID(self, _cnxn, project_id): |
| if self.next_id > 0: |
| return self.next_id - 1 |
| else: |
| issue_dict = self.issues_by_project.get(project_id, {}) |
| highest = max([0] + [issue.local_id for issue in issue_dict.itervalues()]) |
| return highest |
| |
| def ApplyIssueComment( |
| self, cnxn, services, reporter_id, project_id, |
| local_id, summary, status, owner_id, cc_ids, labels, field_values, |
| component_ids, blocked_on, blocking, dangling_blocked_on_refs, |
| dangling_blocking_refs, merged_into, index_now=True, |
| page_gen_ts=None, comment=None, inbound_message=None, attachments=None, |
| timestamp=None): |
| """Feel free to implement a spec-compliant return value.""" |
| issue = self.issues_by_project[project_id][local_id] |
| amendments = [] |
| |
| if summary and summary != issue.summary: |
| issue.summary = summary |
| amendments.append(tracker_bizobj.MakeSummaryAmendment( |
| summary, issue.summary)) |
| |
| if status and status != issue.status: |
| issue.status = status |
| amendments.append(tracker_bizobj.MakeStatusAmendment( |
| status, issue.status)) |
| |
| issue.owner_id = owner_id |
| issue.cc_ids = cc_ids |
| issue.labels = labels |
| issue.field_values = field_values |
| issue.component_ids = component_ids |
| |
| issue.blocked_on_iids.extend(blocked_on) |
| issue.blocking_iids.extend(blocking) |
| issue.dangling_blocked_on_refs.extend(dangling_blocked_on_refs) |
| issue.dangling_blocking_refs.extend(dangling_blocking_refs) |
| |
| if merged_into is not None: |
| issue.merged_into = merged_into |
| |
| if amendments or (comment and comment.strip()) or attachments: |
| comment_pb = self.CreateIssueComment( |
| cnxn, project_id, local_id, reporter_id, comment, |
| amendments=amendments, inbound_message=inbound_message) |
| else: |
| comment_pb = None |
| |
| return amendments, comment_pb |
| |
| def GetCommentsForIssues(self, _cnxn, issue_ids): |
| comments_dict = {} |
| for issue_id in issue_ids: |
| comments_dict[issue_id] = self.comments_by_iid[issue_id] |
| |
| return comments_dict |
| |
| def InsertComment(self, cnxn, comment, commit=True): |
| issue = self.GetIssue(cnxn, comment.issue_id) |
| self.TestAddComment(comment, issue.local_id) |
| |
| # pylint: disable=unused-argument |
| def DeltaUpdateIssue( |
| self, cnxn, services, reporter_id, project_id, |
| config, issue, status, owner_id, cc_add, cc_remove, comp_ids_add, |
| comp_ids_remove, labels_add, labels_remove, field_vals_add, |
| field_vals_remove, fields_clear, blocked_on_add=None, |
| blocked_on_remove=None, blocking_add=None, blocking_remove=None, |
| merged_into=None, index_now=False, comment=None, summary=None, |
| iids_to_invalidate=None, rules=None, predicate_asts=None, |
| timestamp=None): |
| # Return a bogus amendments list if any of the fields changed |
| amendments = [] |
| comment_pb = tracker_pb2.IssueComment() |
| if (status or owner_id or cc_add or cc_remove or labels_add or |
| labels_remove or field_vals_add or field_vals_remove or fields_clear or |
| blocked_on_add or blocked_on_remove or blocking_add or |
| blocking_remove or merged_into or summary): |
| amendments.append(tracker_bizobj.MakeStatusAmendment( |
| 'Updated', issue.status)) |
| |
| if not amendments and (not comment or not comment.strip()): |
| return [], None |
| |
| comment_pb = self.CreateIssueComment( |
| cnxn, project_id, issue.local_id, reporter_id, comment, |
| amendments=amendments) |
| |
| self.indexer_called = index_now |
| return amendments, comment_pb |
| |
| def InvalidateIIDs(self, cnxn, iids_to_invalidate): |
| pass |
| |
| # pylint: disable=unused-argument |
| def CreateIssueComment( |
| self, _cnxn, project_id, local_id, user_id, content, |
| inbound_message=None, amendments=None, attachments=None, timestamp=None, |
| is_spam=False, commit=True): |
| # Add a comment to an issue |
| issue = self.issues_by_project[project_id][local_id] |
| |
| comment = tracker_pb2.IssueComment() |
| comment.id = len(self.comments_by_cid) |
| comment.project_id = project_id |
| comment.issue_id = issue.issue_id |
| comment.content = content |
| comment.user_id = user_id |
| if timestamp is not None: |
| comment.timestamp = timestamp |
| else: |
| comment.timestamp = 1234567890 |
| if amendments: |
| comment.amendments.extend(amendments) |
| if inbound_message: |
| comment.inbound_message = inbound_message |
| |
| pid = project_id |
| self.comments_by_project.setdefault(pid, {}) |
| self.comments_by_project[pid].setdefault(local_id, []).append(comment) |
| self.comments_by_iid.setdefault(issue.issue_id, []).append(comment) |
| self.comments_by_cid[comment.id] = comment |
| |
| if attachments: |
| for filename, filecontent, mimetype in attachments: |
| aid = len(self.attachments_by_id) |
| attach = comment.attachments_add( |
| attachment_id=aid, |
| filename=filename, |
| filesize=len(filecontent), |
| mimetype=mimetype, |
| blobkey='blob(%s)' % filename) |
| self.attachments_by_id[aid] = attach, pid, comment.id |
| |
| return comment |
| |
| def GetOpenAndClosedIssues(self, _cnxn, issue_ids): |
| open_issues = [] |
| closed_issues = [] |
| for issue_id in issue_ids: |
| try: |
| issue = self.issues_by_iid[issue_id] |
| if issue.status == 'Fixed': |
| closed_issues.append(issue) |
| else: |
| open_issues.append(issue) |
| except KeyError: |
| continue |
| |
| return open_issues, closed_issues |
| |
| def GetIssuesDict( |
| self, _cnxn, issue_ids, use_cache=True, shard_id=None): |
| return {iid: self.issues_by_iid[iid] for iid in issue_ids} |
| |
| def GetIssues(self, _cnxn, issue_ids, use_cache=True, shard_id=None): |
| results = [self.issues_by_iid[issue_id] for issue_id in issue_ids |
| if issue_id in self.issues_by_iid] |
| |
| return results |
| |
| def SoftDeleteIssue( |
| self, _cnxn, project_id, local_id, deleted, user_service): |
| issue = self.issues_by_project[project_id][local_id] |
| issue.deleted = deleted |
| |
| def SoftDeleteComment( |
| self, cnxn, project_id, local_id, sequence_num, |
| deleted_by_user_id, user_service, delete=True, reindex=True, |
| is_spam=False): |
| issue = self.GetIssueByLocalID(cnxn, project_id, local_id) |
| comments = self.GetCommentsForIssue(cnxn, issue.issue_id) |
| if not comments: |
| raise Exception( |
| 'No comments for issue, project, seq (%s, %s, %s), cannot delete' |
| % (local_id, project_id, sequence_num)) |
| if len(comments) < sequence_num: |
| raise Exception( |
| 'Attempting to delete comment %s only %s comments created' % |
| (sequence_num, len(comments))) |
| comments[sequence_num].is_spam = is_spam |
| if delete: |
| comments[sequence_num].deleted_by = deleted_by_user_id |
| else: |
| comments[sequence_num].reset('deleted_by') |
| |
| def DeleteComponentReferences(self, _cnxn, component_id): |
| for _, issue in self.issues_by_iid.iteritems(): |
| issue.component_ids = [ |
| cid for cid in issue.component_ids if cid != component_id] |
| |
| def RunIssueQuery( |
| self, cnxn, left_joins, where, order_by, shard_id=None, limit=None): |
| """This always returns empty results. Mock it to test other cases.""" |
| return [], False |
| |
| def GetIIDsByLabelIDs(self, cnxn, label_ids, project_id, shard_id): |
| """This always returns empty results. Mock it to test other cases.""" |
| return [] |
| |
| def GetIIDsByParticipant(self, cnxn, user_ids, project_ids, shard_id): |
| """This always returns empty results. Mock it to test other cases.""" |
| return [] |
| |
| def MoveIssues(self, cnxn, dest_project, issues, user_service): |
| move_to = dest_project.project_id |
| self.issues_by_project.setdefault(move_to, {}) |
| for issue in issues: |
| project_id = issue.project_id |
| self.issues_by_project[project_id].pop(issue.local_id) |
| issue.local_id = self.AllocateNextLocalID(cnxn, move_to) |
| self.issues_by_project[move_to][issue.local_id] = issue |
| issue.project_id = move_to |
| return [] |
| |
| |
| class SpamService(object): |
| """Fake version of SpamService that just works in-RAM.""" |
| |
| def __init__(self, user_id=None): |
| self.user_id = user_id |
| self.reports_by_issue_id = collections.defaultdict(list) |
| self.comment_reports_by_issue_id = collections.defaultdict(dict) |
| self.manual_verdicts_by_issue_id = collections.defaultdict(dict) |
| self.manual_verdicts_by_comment_id = collections.defaultdict(dict) |
| |
| def FlagIssues(self, cnxn, issue_service, issues, user_id, flagged_spam): |
| for issue in issues: |
| if flagged_spam: |
| self.reports_by_issue_id[issue.issue_id].append(user_id) |
| else: |
| self.reports_by_issue_id[issue.issue_id].remove(user_id) |
| |
| def FlagComment(self, cnxn, issue_id, comment_id, reported_user_id, user_id, |
| flagged_spam): |
| if not comment_id in self.comment_reports_by_issue_id[issue_id]: |
| self.comment_reports_by_issue_id[issue_id][comment_id] = [] |
| if flagged_spam: |
| self.comment_reports_by_issue_id[issue_id][comment_id].append(user_id) |
| else: |
| self.comment_reports_by_issue_id[issue_id][comment_id].remove(user_id) |
| |
| def RecordManualIssueVerdicts( |
| self, cnxn, issue_service, issues, user_id, is_spam): |
| for issue in issues: |
| self.manual_verdicts_by_issue_id[issue.issue_id][user_id] = is_spam |
| |
| def RecordManualCommentVerdict( |
| self, cnxn, issue_service, user_service, comment_id, |
| sequnce_num, user_id, is_spam): |
| self.manual_verdicts_by_comment_id[comment_id][user_id] = is_spam |
| |
| def RecordClassifierIssueVerdict(self, cnxn, issue, is_spam, confidence): |
| return |
| |
| def RecordClassifierCommentVerdict(self, cnxn, issue, is_spam, confidence): |
| return |
| |
| def ClassifyComment(self, comment): |
| return {'outputLabel': 'ham', |
| 'outputMulti': [{'label': 'ham', 'score': '1.0'}]} |
| |
| def ClassifyIssue(self, issue, firstComment): |
| return {'outputLabel': 'ham', |
| 'outputMulti': [{'label': 'ham', 'score': '1.0'}]} |
| |
| |
| class FeaturesService(object): |
| """A fake implementation of FeaturesService.""" |
| def __init__(self): |
| # Test-only sequence of expunged projects. |
| self.expunged_saved_queries = [] |
| self.expunged_filter_rules = [] |
| self.expunged_quick_edit = [] |
| |
| def ExpungeSavedQueriesExecuteInProject(self, _cnxn, project_id): |
| self.expunged_saved_queries.append(project_id) |
| |
| def ExpungeFilterRules(self, _cnxn, project_id): |
| self.expunged_filter_rules.append(project_id) |
| |
| def ExpungeQuickEditHistory(self, _cnxn, project_id): |
| self.expunged_quick_edit.append(project_id) |
| |
| def GetFilterRules(self, cnxn, project_id): |
| return [] |
| |
| def GetCannedQueriesByProjectID(self, cnxn, project_id): |
| return [] |
| |
| def UpdateCannedQueries(self, cnxn, project_id, canned_queries): |
| pass |
| |
| def GetSubscriptionsInProjects(self, cnxn, project_ids): |
| return {} |
| |
| def GetSavedQuery(self, cnxn, query_id): |
| return tracker_pb2.SavedQuery() |
| |
| |
| class PostData(object): |
| """A dictionary-like object that also implements getall().""" |
| |
| def __init__(self, *args, **kwargs): |
| self.dictionary = dict(*args, **kwargs) |
| |
| def getall(self, key): |
| """Return all values, assume that the value at key is already a list.""" |
| return self.dictionary.get(key, []) |
| |
| def get(self, key, default=None): |
| """Return first value, assume that the value at key is already a list.""" |
| return self.dictionary.get(key, [default])[0] |
| |
| def __getitem__(self, key): |
| """Return first value, assume that the value at key is already a list.""" |
| return self.dictionary[key][0] |
| |
| def __contains__(self, key): |
| return key in self.dictionary |
| |
| def keys(self): |
| """Return the keys in the POST data.""" |
| return self.dictionary.keys() |
| |
| |
| class FakeFile: |
| def __init__(self, data=None): |
| self.data = data |
| |
| def read(self): |
| return self.data |
| |
| def write(self, content): |
| return |
| |
| def __enter__(self): |
| return self |
| |
| def __exit__(self, __1, __2, __3): |
| return None |
| |
| |
| def gcs_open(filename, mode): |
| return FakeFile(filename) |