# blob: 6447078e7e98fa2ae1370c9a9c794680dae756ba [file] [log] [blame]
# Copyright 2016 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style
# license that can be found in the LICENSE file or at
# https://developers.google.com/open-source/licenses/bsd
"""Fake object classes that are useful for unit tests."""
import collections
import logging
import re
import settings
from framework import framework_helpers
from framework import monorailrequest
from framework import permissions
from framework import validate
from proto import project_pb2
from proto import tracker_pb2
from proto import user_pb2
from proto import usergroup_pb2
from services import caches
from services import issue_svc
from services import project_svc
from services import user_svc
from tracker import tracker_bizobj
from tracker import tracker_constants
# Many fakes return partial or constant values, regardless of their arguments.
# pylint: disable=unused-argument
BOUNDARY = '-----thisisaboundary'

# Role markers accepted by ProjectService.TestAddProjectMembers to choose which
# membership list of a project a user ID is appended to.  They are only ever
# compared for equality, so the exact values are not significant.
# NOTE(review): these names are used below but were not defined anywhere in
# the visible file; confirm the values against the upstream module.
OWNER_ROLE = 'OWNER_ROLE'
COMMITTER_ROLE = 'COMMITTER_ROLE'
CONTRIBUTOR_ROLE = 'CONTRIBUTOR_ROLE'
def Project(
    project_name='proj', project_id=None, state=project_pb2.ProjectState.LIVE,
    access=project_pb2.ProjectAccess.ANYONE, moved_to=None,
    owner_ids=None, committer_ids=None, contributor_ids=None,
    cached_content_timestamp=None):
  """Returns a project protocol buffer with the given attributes.

  Args:
    project_name: string name of the project.
    project_id: optional int project ID; when falsy, hash(project_name) is
        used instead.
    state: a project_pb2.ProjectState value.
    access: a project_pb2.ProjectAccess value.
    moved_to: passed through to project_pb2.MakeProject.
    owner_ids: passed through to project_pb2.MakeProject.
    committer_ids: passed through to project_pb2.MakeProject.
    contributor_ids: passed through to project_pb2.MakeProject.
    cached_content_timestamp: passed through to project_pb2.MakeProject.
        Accepted last so that existing positional callers are unaffected.

  Returns:
    Whatever project_pb2.MakeProject() builds from these values.
  """
  project_id = project_id or hash(project_name)
  return project_pb2.MakeProject(
      project_name, project_id=project_id, state=state, access=access,
      moved_to=moved_to, cached_content_timestamp=cached_content_timestamp,
      owner_ids=owner_ids, committer_ids=committer_ids,
      contributor_ids=contributor_ids)
def MakeTestIssue(
    project_id, local_id, summary, status, owner_id, labels=None,
    derived_labels=None, derived_status=None, merged_into=0, star_count=0,
    derived_owner_id=0, issue_id=None, reporter_id=None, opened_timestamp=None,
    closed_timestamp=None, modified_timestamp=None, is_spam=False,
    component_ids=None, project_name=None, field_values=None):
  """Easily make an Issue for testing.

  Args:
    project_id: int ID of the project that contains the issue.
    local_id: int issue number within that project.
    summary: string issue summary.
    status: string explicit status.
    owner_id: int user ID of the issue owner.
    labels: optional list of label strings, or one whitespace-separated
        string of labels.
    derived_labels: optional list or whitespace-separated string of labels.
    derived_status: optional string derived status.
    merged_into: value stored in issue.merged_into.
    star_count: int number of stars.
    derived_owner_id: int user ID of the derived owner.
    issue_id: optional int global issue ID; defaults to 100000 + local_id.
    reporter_id: optional int user ID; defaults to owner_id.
    opened_timestamp: optional timestamp, only stored when truthy.
    closed_timestamp: optional timestamp, only stored when truthy.
    modified_timestamp: optional timestamp, only stored when truthy.
    is_spam: boolean spam flag.
    component_ids: optional list of component IDs.
    project_name: optional string project name.
    field_values: optional list of field values.

  Returns:
    The populated tracker_pb2.Issue.
  """
  issue = tracker_pb2.Issue()
  issue.project_id = project_id
  issue.project_name = project_name
  issue.local_id = local_id
  issue.issue_id = issue_id if issue_id else 100000 + local_id
  issue.reporter_id = reporter_id if reporter_id else owner_id
  issue.summary = summary
  issue.status = status
  issue.owner_id = owner_id
  issue.derived_owner_id = derived_owner_id
  issue.star_count = star_count
  issue.merged_into = merged_into
  issue.is_spam = is_spam
  if opened_timestamp:
    issue.opened_timestamp = opened_timestamp
  if modified_timestamp:
    issue.modified_timestamp = modified_timestamp
  if closed_timestamp:
    issue.closed_timestamp = closed_timestamp
  if labels is not None:
    if isinstance(labels, basestring):
      labels = labels.split()
    # Without this the normalised labels were silently discarded.
    issue.labels = list(labels)
  if derived_labels is not None:
    if isinstance(derived_labels, basestring):
      derived_labels = derived_labels.split()
    issue.derived_labels = list(derived_labels)
  if derived_status is not None:
    issue.derived_status = derived_status
  if component_ids is not None:
    issue.component_ids = component_ids
  if field_values is not None:
    issue.field_values = field_values
  return issue
def MakeTestConfig(project_id, labels, statuses):
  """Convenient function to make a ProjectIssueConfig object.

  Args:
    project_id: int project ID handed to
        tracker_bizobj.MakeDefaultProjectIssueConfig.
    labels: list of label strings, or one whitespace-separated string.
    statuses: list of status strings, or one whitespace-separated string.

  Returns:
    The default config for the project with its well-known labels and
    statuses replaced by the given ones.
  """
  config = tracker_bizobj.MakeDefaultProjectIssueConfig(project_id)
  if isinstance(labels, basestring):
    labels = labels.split()
  if isinstance(statuses, basestring):
    statuses = statuses.split()
  config.well_known_labels = [
      tracker_pb2.LabelDef(label=lab) for lab in labels]
  config.well_known_statuses = [
      tracker_pb2.StatusDef(status=stat) for stat in statuses]
  return config
class MonorailConnection(object):
  """Fake connection to databases for use in tests."""

  def Commit(self):
    """Do nothing; the fake has no transaction to commit."""
    pass

  def Close(self):
    """Do nothing; the fake holds no resources to release."""
    pass
class MonorailRequest(monorailrequest.MonorailRequest):
  """Subclass of MonorailRequest suitable for testing."""

  def __init__(self, user_info=None, project=None, perms=None, **kwargs):
    """Construct a test MonorailRequest.

    Typically, this is constructed via testing.helpers.GetRequestObjects,
    which also causes url parsing and optionally initializes the user,
    project, and permissions info.

    Args:
      user_info: a dict of user attributes to set on a MonorailRequest object.
          For example, "user_id: 5" causes self.auth.user_id=5.
      project: the Project pb for this request.
      perms: a PermissionSet for this request.
      **kwargs: forwarded unchanged to the base class constructor.
    """
    super(MonorailRequest, self).__init__(**kwargs)

    if user_info is not None:
      for key in user_info:
        setattr(self.auth, key, user_info[key])
      if 'user_id' in user_info:
        self.auth.effective_ids = {user_info['user_id']}

    # A falsy perms value falls back to the admin permission set.
    self.perms = perms or permissions.ADMIN_PERMISSIONSET
    self.project = project
class UserGroupService(object):
  """Fake UserGroupService class for testing other code."""

  def __init__(self):
    self.group_settings = {}  # group_id -> settings object
    self.group_members = {}  # group_id -> list of member user IDs
    self.group_addrs = {}  # group_id -> group email address
    self.role_dict = {}  # group_id -> {user_id: role string}

  def TestAddGroupSettings(
      self, group_id, email, who_can_view=None, anyone_can_join=False,
      who_can_add=None, external_group_type=None,
      last_sync_time=0, friend_projects=None):
    """Set up a fake group for testing.

    Args:
      group_id: int user ID of the new user group.
      email: string email address to identify the user group.
      who_can_view: string enum 'owners', 'members', or 'anyone'.
      anyone_can_join: optional boolean to allow any users to join the group.
      who_can_add: optional list of int user IDs of users who can add
          more members to the group.
      external_group_type: passed through to usergroup_pb2.MakeSettings.
      last_sync_time: passed through to usergroup_pb2.MakeSettings.
      friend_projects: optional list passed to usergroup_pb2.MakeSettings.
    """
    friend_projects = friend_projects or []
    group_settings = usergroup_pb2.MakeSettings(
        who_can_view or 'members',
        external_group_type, last_sync_time, friend_projects)
    self.group_settings[group_id] = group_settings
    self.group_addrs[group_id] = email
    # TODO(jrobbins): store the other settings.

  def TestAddMembers(self, group_id, user_ids, role='member'):
    """Add user_ids to the group and record the given role for each."""
    self.group_members.setdefault(group_id, []).extend(user_ids)
    for user_id in user_ids:
      self.role_dict.setdefault(group_id, {})[user_id] = role

  def LookupMemberships(self, _cnxn, user_id):
    """Return the set of group IDs that list user_id as a direct member."""
    memberships = {
        group_id for group_id, member_ids in self.group_members.items()
        if user_id in member_ids}
    return memberships

  def DetermineWhichUserIDsAreGroups(self, _cnxn, user_ids):
    """Return the user IDs, in input order, that have group settings."""
    return [uid for uid in user_ids
            if uid in self.group_settings]

  def GetAllUserGroupsInfo(self, cnxn):
    """Return one (email, member count, settings, group_id) tuple per group."""
    infos = []
    for group_id in self.group_settings:
      # NOTE(review): the first tuple element was missing from the source
      # and is reconstructed as the group's email address; confirm.
      infos.append(
          (self.group_addrs.get(group_id),
           len(self.group_members.get(group_id, [])),
           self.group_settings[group_id], group_id))
    return infos

  def GetAllGroupSettings(self, _cnxn, group_ids):
    """Return {group_id: settings} for the given IDs that are known groups."""
    return {gid: self.group_settings[gid]
            for gid in group_ids
            if gid in self.group_settings}

  def GetGroupSettings(self, cnxn, group_id):
    """Return the settings of one group, or None if it is not a group."""
    return self.GetAllGroupSettings(cnxn, [group_id]).get(group_id)

  def CreateGroup(self, cnxn, services, email, who_can_view_members,
                  ext_group_type=None, friend_projects=None):
    """Create a group for the email address and return its group ID."""
    friend_projects = friend_projects or []
    group_id = services.user.LookupUserID(
        cnxn, email, autocreate=True, allowgroups=True)
    group_settings = usergroup_pb2.MakeSettings(
        who_can_view_members, ext_group_type, 0, friend_projects)
    self.UpdateSettings(cnxn, group_id, group_settings)
    return group_id

  def DeleteGroups(self, cnxn, group_ids):
    """Remove all members and the settings of each given group."""
    member_ids_dict, owner_ids_dict = self.LookupMembers(cnxn, group_ids)
    citizens_id_dict = collections.defaultdict(list)
    for g_id, user_ids in member_ids_dict.items():
      citizens_id_dict[g_id].extend(user_ids)
    for g_id, user_ids in owner_ids_dict.items():
      citizens_id_dict[g_id].extend(user_ids)
    for g_id, citizen_ids in citizens_id_dict.items():
      # Remove group members, friend projects and settings
      self.RemoveMembers(cnxn, g_id, citizen_ids)
      self.group_settings.pop(g_id, None)

  def LookupMembers(self, _cnxn, group_id_list):
    """Return ({group_id: direct members}, {group_id: direct owners}).

    Users are classified by their entry in role_dict; a user whose role is
    neither 'owner' nor 'member' appears in neither list.
    """
    members_dict = {}
    owners_dict = {}
    for gid in group_id_list:
      members_dict[gid] = []
      owners_dict[gid] = []
      for mid in self.group_members.get(gid, []):
        if self.role_dict.get(gid, {}).get(mid) == 'owner':
          owners_dict[gid].append(mid)
        elif self.role_dict.get(gid, {}).get(mid) == 'member':
          members_dict[gid].append(mid)
    return members_dict, owners_dict

  def LookupAllMembers(self, _cnxn, group_id_list):
    """Return direct and indirect members and owners of each group.

    Nested groups are expanded level by level.  Each nested group is
    expanded at most once, so cyclic nesting terminates.

    Returns:
      A pair ({group_id: member IDs}, {group_id: owner IDs}).  The member
      lists are de-duplicated; the owner lists are not.
    """
    direct_members, direct_owners = self.LookupMembers(
        _cnxn, group_id_list)
    members_dict = {}
    owners_dict = {}
    for gid in group_id_list:
      members = direct_members[gid]
      owners = direct_owners[gid]
      owners_dict[gid] = owners
      members_dict[gid] = members
      group_ids = set([uid for uid in members + owners
                       if uid in self.group_settings])
      visited = {gid} | group_ids
      while group_ids:
        indirect_members, indirect_owners = self.LookupMembers(
            _cnxn, group_ids)
        child_members = set()
        child_owners = set()
        for _, children in indirect_members.items():
          child_members.update(children)
        for _, children in indirect_owners.items():
          child_owners.update(children)
        members_dict[gid].extend(list(child_members))
        owners_dict[gid].extend(list(child_owners))
        group_ids = set(self.DetermineWhichUserIDsAreGroups(
            _cnxn, list(child_members) + list(child_owners))) - visited
        visited |= group_ids
      members_dict[gid] = list(set(members_dict[gid]))
    return members_dict, owners_dict

  def RemoveMembers(self, _cnxn, group_id, old_member_ids):
    """Drop old_member_ids from the group's member list."""
    current_member_ids = self.group_members.get(group_id, [])
    revised_member_ids = [mid for mid in current_member_ids
                          if mid not in old_member_ids]
    self.group_members[group_id] = revised_member_ids

  def UpdateMembers(self, _cnxn, group_id, member_ids, new_role):
    """Re-add member_ids to the group with new_role."""
    self.RemoveMembers(_cnxn, group_id, member_ids)
    self.TestAddMembers(group_id, member_ids, new_role)

  def UpdateSettings(self, _cnxn, group_id, group_settings):
    """Store group_settings as the settings of group_id."""
    self.group_settings[group_id] = group_settings

  def ExpandAnyUserGroups(self, cnxn, user_ids):
    """Split user_ids into direct IDs and IDs reached through groups.

    Returns:
      A pair (direct_ids, indirect_ids).  direct_ids are the given IDs that
      are not groups, in input order.  indirect_ids are all members and
      owners of the given groups that are not already direct, in no
      particular order.
    """
    group_ids = set(self.DetermineWhichUserIDsAreGroups(cnxn, user_ids))
    direct_ids = [uid for uid in user_ids if uid not in group_ids]
    member_ids_dict, owner_ids_dict = self.LookupAllMembers(cnxn, group_ids)

    indirect_ids = set()
    for gid in group_ids:
      indirect_ids.update(member_ids_dict[gid])
      indirect_ids.update(owner_ids_dict[gid])
    # It's possible that a user has both direct and indirect memberships of
    # one group. In this case, mark the user as direct member only.
    indirect_ids = [iid for iid in indirect_ids if iid not in direct_ids]

    return direct_ids, list(indirect_ids)

  def LookupVisibleMembers(
      self, cnxn, group_id_list, perms, effective_ids, services):
    """Return direct members and owners of the groups the user may view.

    Visibility is decided by permissions.CanViewGroup using the full
    (direct plus indirect) membership of each group.
    """
    settings_dict = self.GetAllGroupSettings(cnxn, group_id_list)
    group_ids = list(settings_dict)

    direct_member_ids_dict, direct_owner_ids_dict = self.LookupMembers(
        cnxn, group_ids)
    all_member_ids_dict, all_owner_ids_dict = self.LookupAllMembers(
        cnxn, group_ids)
    visible_member_ids_dict = {}
    visible_owner_ids_dict = {}
    for gid in group_ids:
      member_ids = all_member_ids_dict[gid]
      owner_ids = all_owner_ids_dict[gid]
      if permissions.CanViewGroup(perms, effective_ids, settings_dict[gid],
                                  member_ids, owner_ids, []):
        visible_member_ids_dict[gid] = direct_member_ids_dict[gid]
        visible_owner_ids_dict[gid] = direct_owner_ids_dict[gid]

    return visible_member_ids_dict, visible_owner_ids_dict

  def ValidateFriendProjects(self, cnxn, services, friend_projects):
    """Resolve a delimited string of project names to project IDs.

    Args:
      cnxn: connection passed through to services.project.
      services: object whose .project offers LookupProjectIDs.
      friend_projects: string of project names separated by commas,
          semicolons and/or spaces.

    Returns:
      (list of project IDs, None) when every name resolves, otherwise
      (None, error message naming the unknown projects).
    """
    # A real list is needed because the names are iterated twice.
    project_names = [
        name for name in re.split('; |, | |;|,', friend_projects) if name]
    id_dict = services.project.LookupProjectIDs(cnxn, project_names)
    missed_projects = []
    result = []
    for p_name in project_names:
      if p_name in id_dict:
        result.append(id_dict[p_name])
      else:
        missed_projects.append(p_name)
    if missed_projects:
      error_msg = 'Project(s) %s do not exist' % ', '.join(missed_projects)
      return None, error_msg
    return result, None
class CacheManager(object):
  """Fake cache manager that records the most recent invalidation call."""

  def __init__(self, invalidate_tbl=None):
    # invalidate_tbl is accepted for signature compatibility and ignored.
    self.last_call = None
    self.processed_invalidations_up_to = 0

  def MakeCache(self, kind, max_size=None, use_value_centric_cache=False):
    """Make a new cache and register it for future invalidations."""
    if use_value_centric_cache:
      cache = caches.ValueCentricRamCache(self, kind, max_size=max_size)
    else:
      cache = caches.RamCache(self, kind, max_size=max_size)
    return cache

  def DoDistributedInvalidation(self, cnxn):
    """Drop any cache entries that were invalidated by other jobs."""
    self.last_call = 'DoDistributedInvalidation', cnxn

  def StoreInvalidateRows(self, cnxn, kind, keys):
    """Store database rows to let all frontends know to invalidate."""
    self.last_call = 'StoreInvalidateRows', cnxn, kind, keys

  def StoreInvalidateAll(self, cnxn, kind):
    """Store a database row to let all frontends know to invalidate."""
    self.last_call = 'StoreInvalidateAll', cnxn, kind
class UserService(object):
  """Fake UserService that keeps users in in-RAM dictionaries."""

  def __init__(self):
    """Creates a test-appropriate UserService object."""
    self.users_by_email = {}  # email -> user_id
    self.users_by_id = {}  # user_id -> email
    self.test_users = {}  # user_id -> user pb

  def TestAddUser(self, email, user_id, add_user=True, banned=False):
    """Add a user to the fake UserService instance.

    Args:
      email: Email of the user.
      user_id: int user ID.
      add_user: Flag whether user pb should be created, i.e. whether a
          Monorail account should be created
      banned: Boolean to set the user as banned

    Returns:
      The User PB that was added, or None.
    """
    self.users_by_email[email] = user_id
    self.users_by_id[user_id] = email

    user = None
    if add_user:
      user = user_pb2.MakeUser()
      user.is_site_admin = False
      user.email = email
      user.obscure_email = True
      if banned:
        user.banned = 'is banned'
      self.test_users[user_id] = user

    return user

  def GetUser(self, _cnxn, user_id):
    """Return the stored user pb for user_id, or None."""
    return self.test_users.get(user_id)

  def _CreateUser(self, _cnxn, email):
    """Register email under a hash-derived user ID unless already known."""
    if email in self.users_by_email:
      return
    user_id = framework_helpers.MurmurHash3_x86_32(email)
    self.users_by_id[user_id] = email
    self.users_by_email[email] = user_id

  def _CreateUsers(self, cnxn, emails):
    """Register every email in emails that is not already known."""
    for email in emails:
      self._CreateUser(cnxn, email)

  def LookupUserID(self, cnxn, email, autocreate=False, allowgroups=False):
    """Return the user ID for email.

    Args:
      cnxn: connection, passed through to _CreateUser.
      email: string email address to look up.
      autocreate: when True, an unknown but valid address is registered.
      allowgroups: accepted for interface compatibility and ignored.

    Returns:
      The int user ID, or None when the address is unknown and is not a
      valid email address.

    Raises:
      user_svc.NoSuchUserException: the address is valid but unknown and
          autocreate is False.
    """
    user_id = self.users_by_email.get(email)
    if not user_id and validate.IsValidEmail(email):
      if autocreate:
        self._CreateUser(cnxn, email)
        user_id = self.users_by_email.get(email)
      else:
        raise user_svc.NoSuchUserException(email)

    return user_id

  def GetUsersByIDs(self, cnxn, user_ids, use_cache=True):
    """Return {user_id: user pb} for the IDs that have a stored user pb."""
    user_dict = {}
    for user_id in user_ids:
      if user_id and self.test_users.get(user_id):
        user_dict[user_id] = self.test_users[user_id]
    return user_dict

  def LookupExistingUserIDs(self, cnxn, emails):
    """Return {email: user_id} for the emails that are already known."""
    email_dict = {
        email: self.users_by_email[email]
        for email in emails
        if email in self.users_by_email}
    return email_dict

  def LookupUserIDs(self, cnxn, emails, autocreate=False,
                    allowgroups=False):
    """Return {email: user_id} for each email that resolves to a user ID."""
    email_dict = {}
    for email in emails:
      user_id = self.LookupUserID(
          cnxn, email, autocreate=autocreate, allowgroups=allowgroups)
      if user_id:
        email_dict[email] = user_id
    return email_dict

  def LookupUserEmail(self, _cnxn, user_id):
    """Return the email registered for user_id, or None."""
    email = self.users_by_id.get(user_id)
    return email

  def LookupUserEmails(self, cnxn, user_ids):
    """Return {user_id: email or None} for every given user ID."""
    user_dict = {
        user_id: self.LookupUserEmail(cnxn, user_id)
        for user_id in user_ids}
    return user_dict

  def UpdateUser(self, _cnxn, user_id, user):
    """Updates the user pb."""
    self.test_users[user_id] = user

  def UpdateUserSettings(
      self, cnxn, user_id, user, notify=None, notify_starred=None,
      obscure_email=None, after_issue_update=None,
      is_site_admin=None, ignore_action_limits=None,
      is_banned=None, banned_reason=None, action_limit_updates=None,
      dismissed_cues=None, keep_people_perms_open=None, preview_on_hover=None):
    """Store the user pb as-is; the individual settings are ignored."""
    self.UpdateUser(cnxn, user_id, user)
class AbstractStarService(object):
  """Fake StarService."""

  def __init__(self):
    self.stars_by_item_id = {}  # item_id -> list of starrer user IDs
    self.stars_by_starrer_id = {}  # starrer user ID -> list of item IDs
    self.expunged_item_ids = []  # item IDs passed to ExpungeStars

  def ExpungeStars(self, _cnxn, item_id):
    """Remove every star on item_id and record that it was expunged."""
    self.expunged_item_ids.append(item_id)
    # The stored value is a list of starrers, so each one is cleaned up
    # individually; using the list itself as a dict key raises TypeError.
    old_starrers = self.stars_by_item_id.get(item_id, [])
    self.stars_by_item_id[item_id] = []
    for starrer_id in old_starrers:
      if self.stars_by_starrer_id.get(starrer_id):
        self.stars_by_starrer_id[starrer_id] = [
            it for it in self.stars_by_starrer_id[starrer_id]
            if it != item_id]

  def LookupItemStarrers(self, _cnxn, item_id):
    """Return the list of user IDs that starred item_id."""
    return self.stars_by_item_id.get(item_id, [])

  def LookupStarredItemIDs(self, _cnxn, starrer_user_id):
    """Return the list of item IDs starred by starrer_user_id."""
    return self.stars_by_starrer_id.get(starrer_user_id, [])

  def IsItemStarredBy(self, cnxn, item_id, starrer_user_id):
    """Return True if starrer_user_id has starred item_id."""
    return item_id in self.LookupStarredItemIDs(cnxn, starrer_user_id)

  def CountItemStars(self, cnxn, item_id):
    """Return the number of stars on item_id."""
    return len(self.LookupItemStarrers(cnxn, item_id))

  def CountItemsStars(self, cnxn, item_ids):
    """Return {item_id: star count} for the given item IDs."""
    return {item_id: self.CountItemStars(cnxn, item_id)
            for item_id in item_ids}

  def SetStar(self, cnxn, item_id, starrer_user_id, starred):
    """Add or remove one user's star; a no-op if already in that state."""
    if starred and not self.IsItemStarredBy(cnxn, item_id, starrer_user_id):
      self.stars_by_item_id.setdefault(item_id, []).append(starrer_user_id)
      self.stars_by_starrer_id.setdefault(starrer_user_id, []).append(item_id)
    elif not starred and self.IsItemStarredBy(cnxn, item_id, starrer_user_id):
      self.stars_by_item_id[item_id].remove(starrer_user_id)
      self.stars_by_starrer_id[starrer_user_id].remove(item_id)
class UserStarService(AbstractStarService):
  """Fake star service for users; behaves exactly like the base class."""
  pass
class ProjectStarService(AbstractStarService):
  """Fake star service for projects; behaves exactly like the base class."""
  pass
class IssueStarService(AbstractStarService):
  """Fake star service for issues, with the issue-specific SetStar signature."""

  # pylint: disable=arguments-differ
  def SetStar(
      self, cnxn, _service, _config, issue_id, starrer_user_id,
      starred):
    """Star or unstar an issue; _service and _config are ignored."""
    super(IssueStarService, self).SetStar(
        cnxn, issue_id, starrer_user_id, starred)
class ProjectService(object):
  """Fake ProjectService object.

  Provides methods for creating users and projects, which are accessible
  through parts of the real ProjectService interface.
  """

  def __init__(self):
    self.test_projects = {}  # project_name -> project_pb
    self.projects_by_id = {}
    self.test_star_manager = None
    self.indexed_projects = {}
    self.unindexed_projects = set()
    self.index_counter = 0
    self.project_commitments = {}

  def TestAddProject(
      self, name, summary='', state=project_pb2.ProjectState.LIVE,
      owner_ids=None, committer_ids=None, contrib_ids=None,
      issue_notify_address=None, state_reason='',
      description=None, project_id=None, process_inbound_email=None,
      access=None):
    """Add a project to the fake ProjectService object.

    Args:
      name: The name of the project. Will replace any existing project under
          the same name.
      summary: The summary string of the project.
      state: Initial state for the project from project_pb2.ProjectState.
      owner_ids: List of user ids for project owners
      committer_ids: List of user ids for project committers
      contrib_ids: List of user ids for project contributors
      issue_notify_address: email address to send issue change notifications
      state_reason: string describing the reason the project is in its current
          state.
      description: The description string for this project
      project_id: A unique integer identifier for the created project.
      process_inbound_email: True to make this project accept inbound email.
      access: One of the values of enum project_pb2.ProjectAccess.

    Returns:
      A populated project PB.
    """
    proj_pb = project_pb2.Project()
    proj_pb.project_id = project_id or hash(name) % 100000
    proj_pb.project_name = name
    proj_pb.summary = summary
    proj_pb.state = state
    proj_pb.state_reason = state_reason
    if description is not None:
      proj_pb.description = description

    self.TestAddProjectMembers(owner_ids, proj_pb, OWNER_ROLE)
    self.TestAddProjectMembers(committer_ids, proj_pb, COMMITTER_ROLE)
    self.TestAddProjectMembers(contrib_ids, proj_pb, CONTRIBUTOR_ROLE)

    if issue_notify_address is not None:
      proj_pb.issue_notify_address = issue_notify_address
    if process_inbound_email is not None:
      proj_pb.process_inbound_email = process_inbound_email
    if access is not None:
      proj_pb.access = access

    self.test_projects[name] = proj_pb
    self.projects_by_id[proj_pb.project_id] = proj_pb
    return proj_pb

  def TestAddProjectMembers(self, user_id_list, proj_pb, role):
    """Append each user ID to the project list selected by role.

    NOTE(review): the append targets were missing from the source and are
    reconstructed from the owner_ids/committer_ids/contributor_ids fields
    used elsewhere in this class; confirm.
    """
    if user_id_list is not None:
      for user_id in user_id_list:
        if role == OWNER_ROLE:
          proj_pb.owner_ids.append(user_id)
        elif role == COMMITTER_ROLE:
          proj_pb.committer_ids.append(user_id)
        elif role == CONTRIBUTOR_ROLE:
          proj_pb.contributor_ids.append(user_id)

  def LookupProjectIDs(self, cnxn, project_names):
    """Return {project_name: project_id} for the names that exist."""
    return {
        project_name: self.test_projects[project_name].project_id
        for project_name in project_names
        if project_name in self.test_projects}

  def LookupProjectNames(self, cnxn, project_ids):
    """Return {project_id: project_name} for the IDs that exist."""
    projects_dict = self.GetProjects(cnxn, project_ids)
    return {p.project_id: p.project_name
            for p in projects_dict.values()}

  def CreateProject(
      self, _cnxn, project_name, owner_ids, committer_ids,
      contributor_ids, summary, description,
      state=project_pb2.ProjectState.LIVE, access=None, read_only=None,
      home_page=None, docs_url=None, logo_gcs_id=None, logo_file_name=None):
    """Create and store a Project with the given attributes.

    Raises:
      project_svc.ProjectAlreadyExists: a project with that name is stored.
    """
    if project_name in self.test_projects:
      raise project_svc.ProjectAlreadyExists()
    # NOTE(review): the final argument of this call was missing from the
    # source; access is forwarded since TestAddProject accepts it.
    self.TestAddProject(
        project_name, summary=summary, state=state,
        owner_ids=owner_ids, committer_ids=committer_ids,
        contrib_ids=contributor_ids, description=description,
        access=access)

  def ExpungeProject(self, _cnxn, project_id):
    """Forget the name of the project; its by-ID entry is left in place."""
    project = self.projects_by_id.get(project_id)
    if project:
      self.test_projects.pop(project.project_name, None)

  def GetProjectsByName(self, _cnxn, project_name_list, use_cache=True):
    """Return {project_name: project pb} for the names that exist."""
    return {
        pn: self.test_projects[pn] for pn in project_name_list
        if pn in self.test_projects}

  def GetProjectByName(self, _cnxn, name, use_cache=True):
    """Return the project pb stored under name, or None."""
    return self.test_projects.get(name)

  def GetProjectList(self, cnxn, project_id_list, use_cache=True):
    """Return the existing project pbs in the order of project_id_list."""
    project_dict = self.GetProjects(cnxn, project_id_list, use_cache=use_cache)
    return [project_dict[pid] for pid in project_id_list
            if pid in project_dict]

  def GetVisibleLiveProjects(self, _cnxn, logged_in_user, effective_ids,
                             use_cache=True):
    """Return a list of all stored project IDs, without any filtering."""
    return list(self.projects_by_id)

  def GetProjects(self, _cnxn, project_ids, use_cache=True):
    """Return {project_id: project pb} for the IDs that exist."""
    result = {}
    for project_id in project_ids:
      project = self.projects_by_id.get(project_id)
      if project:
        result[project_id] = project
    return result

  def GetProject(self, cnxn, project_id, use_cache=True):
    """Load the specified project from the database."""
    project_id_dict = self.GetProjects(cnxn, [project_id], use_cache=use_cache)
    return project_id_dict.get(project_id)

  @staticmethod
  def IsValidProjectName(string):
    """Return true if the given string is a valid project name."""
    return project_svc.RE_PROJECT_NAME.match(string)

  def GetProjectCommitments(self, _cnxn, project_id):
    """Return stored commitments, or a fresh pb carrying only project_id."""
    if project_id in self.project_commitments:
      return self.project_commitments[project_id]

    project_commitments = project_pb2.ProjectCommitments()
    project_commitments.project_id = project_id
    return project_commitments

  def TestStoreProjectCommitments(self, project_commitments):
    """Store project_commitments under its own project_id."""
    key = project_commitments.project_id
    self.project_commitments[key] = project_commitments

  def UpdateProject(
      self, _cnxn, project_id, summary=None, description=None,
      state=None, state_reason=None, access=None,
      issue_notify_address=None, attachment_bytes_used=None,
      attachment_quota=None, moved_to=None, process_inbound_email=None,
      read_only_reason=None, cached_content_timestamp=None,
      only_owners_see_contributors=None, delete_time=None,
      recent_activity=None, revision_url_format=None, home_page=None,
      docs_url=None, logo_gcs_id=None, logo_file_name=None):
    """Update a stored project; only read_only_reason is applied so far.

    Raises:
      project_svc.NoSuchProjectException: no project has that ID.
    """
    project = self.projects_by_id.get(project_id)
    if not project:
      raise project_svc.NoSuchProjectException(
          'Project "%s" not found!' % project_id)

    # TODO(jrobbins): implement all passed arguments - probably as a utility
    # method shared with the real persistence implementation.
    if read_only_reason is not None:
      project.read_only_reason = read_only_reason

  def UpdateProjectRoles(
      self, _cnxn, project_id, owner_ids, committer_ids,
      contributor_ids, now=None):
    """Replace the owner, committer and contributor lists of a project.

    Raises:
      project_svc.NoSuchProjectException: no project has that ID.
    """
    project = self.projects_by_id.get(project_id)
    if not project:
      raise project_svc.NoSuchProjectException(
          'Project "%s" not found!' % project_id)

    project.owner_ids = owner_ids
    project.committer_ids = committer_ids
    project.contributor_ids = contributor_ids

  def MarkProjectDeletable(
      self, _cnxn, project_id, _config_service):
    """Rename the project to DELETABLE_<id> and set its state to DELETABLE."""
    project = self.projects_by_id[project_id]
    project.project_name = 'DELETABLE_%d' % project_id
    project.state = project_pb2.ProjectState.DELETABLE

  def UpdateRecentActivity(self, _cnxn, _project_id, now=None):
    """Do nothing; the fake does not track recent activity."""
    pass

  def GetUserRolesInAllProjects(self, _cnxn, effective_ids):
    """Classify every stored project by the strongest role of the user.

    Args:
      _cnxn: unused.
      effective_ids: set of user IDs; must offer isdisjoint().

    Returns:
      A triple of sets of project IDs: owned, committer-of, contributor-to.
      Each project appears in at most one set.
    """
    owned_project_ids = set()
    membered_project_ids = set()
    contrib_project_ids = set()

    for project in self.projects_by_id.values():
      if not effective_ids.isdisjoint(project.owner_ids):
        owned_project_ids.add(project.project_id)
      elif not effective_ids.isdisjoint(project.committer_ids):
        membered_project_ids.add(project.project_id)
      elif not effective_ids.isdisjoint(project.contributor_ids):
        contrib_project_ids.add(project.project_id)

    return owned_project_ids, membered_project_ids, contrib_project_ids
class ConfigService(object):
  """Fake version of ConfigService that just works in-RAM."""

  def __init__(self, user_id=None):
    self.project_configs = {}  # project_id -> config
    self.next_field_id = 123
    self.next_component_id = 345
    self.expunged_configs = []  # project IDs passed to ExpungeConfig
    self.component_ids_to_templates = {}

  def TemplatesWithComponent(self, _cnxn, component_id, _config):
    """Return the templates registered for component_id, or []."""
    return self.component_ids_to_templates.get(component_id, [])

  def ExpungeConfig(self, _cnxn, project_id):
    """Record that the config of project_id was expunged."""
    self.expunged_configs.append(project_id)

  def GetLabelDefRows(self, cnxn, project_id):
    """This always returns empty results.  Mock it to test other cases."""
    return []

  def GetLabelDefRowsAnyProject(self, cnxn, where=None):
    """This always returns empty results.  Mock it to test other cases."""
    return []

  def LookupLabel(self, cnxn, project_id, label_id):
    """Return a synthetic label name, or None for the magic label ID 999."""
    if label_id == 999:
      return None
    return 'label_%d_%d' % (project_id, label_id)

  def LookupLabelID(self, cnxn, project_id, label, autocreate=True):
    """Always return 1."""
    return 1

  def LookupLabelIDs(self, cnxn, project_id, labels, autocreate=False):
    """Return the positions of the labels as their IDs."""
    return [idx for idx, _label in enumerate(labels)]

  def LookupIDsOfLabelsMatching(self, cnxn, project_id, regex):
    """Always return [1, 2, 3]."""
    return [1, 2, 3]

  def LookupStatus(self, cnxn, project_id, status_id):
    """Return a synthetic status name."""
    return 'status_%d_%d' % (project_id, status_id)

  def LookupStatusID(self, cnxn, project_id, status, autocreate=True):
    """Return 1 for any non-empty status and 0 otherwise."""
    if status:
      return 1
    else:
      return 0

  def LookupStatusIDs(self, cnxn, project_id, statuses):
    """Return the positions of the statuses as their IDs."""
    return [idx for idx, _status in enumerate(statuses)]

  def LookupClosedStatusIDs(self, cnxn, project_id):
    """Always return [7, 8, 9]."""
    return [7, 8, 9]

  def StoreConfig(self, _cnxn, config):
    """Store config under its own project_id."""
    self.project_configs[config.project_id] = config

  def GetProjectConfig(self, _cnxn, project_id, use_cache=True):
    """Return the stored config, or a new default config if none is stored."""
    if project_id in self.project_configs:
      return self.project_configs[project_id]
    else:
      return tracker_bizobj.MakeDefaultProjectIssueConfig(project_id)

  def GetProjectConfigs(self, _cnxn, project_ids, use_cache=True):
    """Return {project_id: config}, using default configs where needed."""
    config_dict = {}
    for project_id in project_ids:
      if project_id in self.project_configs:
        config_dict[project_id] = self.project_configs[project_id]
      else:
        config_dict[project_id] = tracker_bizobj.MakeDefaultProjectIssueConfig(
            project_id)

    return config_dict

  def UpdateConfig(
      self, cnxn, project, well_known_statuses=None,
      statuses_offer_merge=None, well_known_labels=None,
      excl_label_prefixes=None, templates=None,
      default_template_for_developers=None, default_template_for_users=None,
      list_prefs=None, restrict_to_known=None):
    """Apply every non-None argument to the project config and store it.

    Args:
      cnxn: connection passed through to GetProjectConfig and StoreConfig.
      project: project pb; only its project_id is read.
      list_prefs: optional 4-tuple (default_col_spec, default_sort_spec,
          x_attr, y_attr); ignored when falsy.

    Returns:
      The updated config.
    """
    project_id = project.project_id
    project_config = self.GetProjectConfig(cnxn, project_id, use_cache=False)

    if well_known_statuses is not None:
      tracker_bizobj.SetConfigStatuses(project_config, well_known_statuses)

    if statuses_offer_merge is not None:
      project_config.statuses_offer_merge = statuses_offer_merge

    if well_known_labels is not None:
      tracker_bizobj.SetConfigLabels(project_config, well_known_labels)

    if excl_label_prefixes is not None:
      project_config.exclusive_label_prefixes = excl_label_prefixes

    if templates is not None:
      project_config.templates = templates

    if default_template_for_developers is not None:
      project_config.default_template_for_developers = (
          default_template_for_developers)
    if default_template_for_users is not None:
      project_config.default_template_for_users = default_template_for_users

    if list_prefs:
      default_col_spec, default_sort_spec, x_attr, y_attr = list_prefs
      project_config.default_col_spec = default_col_spec
      project_config.default_sort_spec = default_sort_spec
      project_config.default_x_attr = x_attr
      project_config.default_y_attr = y_attr

    if restrict_to_known is not None:
      project_config.restrict_to_known = restrict_to_known

    self.StoreConfig(cnxn, project_config)
    return project_config

  def CreateFieldDef(
      self, cnxn, project_id, field_name, field_type_str, applic_type,
      applic_pred, is_required, is_multivalued,
      min_value, max_value, regex, needs_member, needs_perm,
      grants_perm, notify_on, docstring, admin_ids):
    """Create a field definition with the next field ID and store it.

    admin_ids is accepted but not used.
    """
    config = self.GetProjectConfig(cnxn, project_id)
    field_type = tracker_pb2.FieldTypes(field_type_str)
    field_id = self.next_field_id
    self.next_field_id += 1
    fd = tracker_bizobj.MakeFieldDef(
        field_id, project_id, field_name, field_type, applic_type, applic_pred,
        is_required, is_multivalued, min_value, max_value, regex,
        needs_member, needs_perm, grants_perm, notify_on, docstring, False)
    # NOTE(review): this line was missing from the source; the attribute
    # name is assumed by analogy with config.component_defs - confirm.
    config.field_defs.append(fd)
    self.StoreConfig(cnxn, config)

  def SoftDeleteFieldDef(self, cnxn, project_id, field_id):
    """Mark the field definition as deleted and store the config."""
    config = self.GetProjectConfig(cnxn, project_id)
    fd = tracker_bizobj.FindFieldDefByID(field_id, config)
    fd.is_deleted = True
    self.StoreConfig(cnxn, config)

  def UpdateFieldDef(
      self, cnxn, project_id, field_id, field_name=None,
      applicable_type=None, applicable_predicate=None, is_required=None,
      is_multivalued=None, min_value=None, max_value=None, regex=None,
      needs_member=None, needs_perm=None, grants_perm=None, notify_on=None,
      docstring=None, admin_ids=None):
    """Apply the non-None arguments to a field definition and store it.

    needs_member, needs_perm, grants_perm and notify_on are accepted but
    not applied.
    """
    config = self.GetProjectConfig(cnxn, project_id)
    fd = tracker_bizobj.FindFieldDefByID(field_id, config)
    # pylint: disable=multiple-statements
    if field_name is not None: fd.field_name = field_name
    if applicable_type is not None: fd.applicable_type = applicable_type
    if applicable_predicate is not None:
      fd.applicable_predicate = applicable_predicate
    if is_required is not None: fd.is_required = is_required
    if is_multivalued is not None: fd.is_multivalued = is_multivalued
    if min_value is not None: fd.min_value = min_value
    if max_value is not None: fd.max_value = max_value
    if regex is not None: fd.regex = regex
    if docstring is not None: fd.docstring = docstring
    if admin_ids is not None: fd.admin_ids = admin_ids
    self.StoreConfig(cnxn, config)

  def CreateComponentDef(
      self, cnxn, project_id, path, docstring, deprecated, admin_ids, cc_ids,
      created, creator_id):
    """Create a component definition and return its new component ID."""
    config = self.GetProjectConfig(cnxn, project_id)
    cd = tracker_bizobj.MakeComponentDef(
        self.next_component_id, project_id, path, docstring, deprecated,
        admin_ids, cc_ids, created, creator_id)
    # This append was missing from the source, which left cd unused.
    config.component_defs.append(cd)
    self.next_component_id += 1
    self.StoreConfig(cnxn, config)
    return self.next_component_id - 1

  def UpdateComponentDef(
      self, cnxn, project_id, component_id, path=None, docstring=None,
      deprecated=None, admin_ids=None, cc_ids=None, created=None,
      creator_id=None, modified=None, modifier_id=None):
    """Apply the non-None arguments to a component definition and store it."""
    config = self.GetProjectConfig(cnxn, project_id)
    cd = tracker_bizobj.FindComponentDefByID(component_id, config)
    if path is not None:
      assert path
      cd.path = path
    # pylint: disable=multiple-statements
    if docstring is not None: cd.docstring = docstring
    if deprecated is not None: cd.deprecated = deprecated
    if admin_ids is not None: cd.admin_ids = admin_ids
    if cc_ids is not None: cd.cc_ids = cc_ids
    if created is not None: cd.created = created
    if creator_id is not None: cd.creator_id = creator_id
    if modified is not None: cd.modified = modified
    if modifier_id is not None: cd.modifier_id = modifier_id
    self.StoreConfig(cnxn, config)

  def DeleteComponentDef(self, cnxn, project_id, component_id):
    """Delete the specified component definition."""
    config = self.GetProjectConfig(cnxn, project_id)
    config.component_defs = [
        cd for cd in config.component_defs
        if cd.component_id != component_id]
    self.StoreConfig(cnxn, config)

  def InvalidateMemcache(self, issues, key_prefix=''):
    """Do nothing; the fake has no memcache."""
    pass
class IssueService(object):
"""Fake version of IssueService that just works in-RAM."""
# pylint: disable=unused-argument
def __init__(self, user_id=None):
self.user_id = user_id
# Dictionary {project_id: issue_pb_dict}
# where issue_pb_dict is a dictionary of the form
# {local_id: issue_pb}
self.issues_by_project = {}
self.issues_by_iid = {}
# Dictionary {project_id: comment_pb_dict}
# where comment_pb_dict is a dictionary of the form
# {local_id: comment_pb_list}
self.comments_by_project = {}
self.comments_by_iid = {}
self.comments_by_cid = {}
self.attachments_by_id = {}
# Set of issue IDs for issues that have been indexed by calling
# IndexIssues().
self.indexed_issue_iids = set()
# Test-only indication that the indexer would have been called
# by the real DITPersist.
self.indexer_called = False
# Test-only sequence of updated and enqueued.
self.updated_issues = []
self.enqueued_issues = []
# Test-only sequence of expunged issues and projects.
self.expunged_issues = []
self.expunged_former_locations = []
self.expunged_local_ids = []
# Test-only indicators that methods were called.
self.get_all_issues_in_project_called = False
self.update_issues_called = False
self.enqueue_issues_called = False
# The next id to return if it is > 0.
self.next_id = -1
def UpdateIssues(
    self, cnxn, issues, update_cols=None, just_derived=False,
    commit=True, invalidate=True):
  """Record that UpdateIssues was called and which issues were passed.

  Nothing is persisted.  All arguments other than issues are ignored.
  """
  self.update_issues_called = True
  # NOTE(review): updated_issues is initialised as the test-only record of
  # updated issues but nothing in the visible source filled it; confirm.
  self.updated_issues.extend(issues)
def EnqueueIssuesForIndexing(self, _cnxn, issues):
  """Record that indexing was requested and for which issues."""
  self.enqueue_issues_called = True
  # NOTE(review): enqueued_issues is initialised as the test-only record of
  # enqueued issues but nothing in the visible source filled it; confirm.
  self.enqueued_issues.extend(issues)
def ExpungeIssues(self, _cnxn, issue_ids):
  """Record the given issue IDs as expunged; stored issues are untouched."""
  # NOTE(review): body reconstructed from the expunged_issues list that
  # __init__ sets up for this purpose; confirm.
  self.expunged_issues.extend(issue_ids)
def ExpungeFormerLocations(self, _cnxn, project_id):
  """Record that former locations were expunged for project_id."""
  # NOTE(review): body reconstructed from the expunged_former_locations
  # list that __init__ sets up; confirm.
  self.expunged_former_locations.append(project_id)
def ExpungeLocalIDCounters(self, _cnxn, project_id):
  """Record that local ID counters were expunged for project_id."""
  # NOTE(review): body reconstructed from the expunged_local_ids list that
  # __init__ sets up; confirm.
  self.expunged_local_ids.append(project_id)
def TestAddIssue(self, issue):
  """Store an issue and create its initial comment.

  The issue is indexed by (project_id, local_id) and by issue_id.  A
  comment with sequence 0 is then added whose content is the issue summary
  and whose timestamp is the issue's opened_timestamp.

  Args:
    issue: the issue pb to store.
  """
  project_id = issue.project_id
  self.issues_by_project.setdefault(project_id, {})
  self.issues_by_project[project_id][issue.local_id] = issue
  self.issues_by_iid[issue.issue_id] = issue

  # Adding a new issue should add the first comment to the issue
  comment = tracker_pb2.IssueComment()
  comment.project_id = issue.project_id
  comment.issue_id = issue.issue_id
  comment.content = issue.summary
  comment.timestamp = issue.opened_timestamp
  if issue.reporter_id:
    comment.user_id = issue.reporter_id
  comment.sequence = 0
  self.TestAddComment(comment, issue.local_id)
def TestAddComment(self, comment, local_id):
  """Store a comment under its project, its issue, and its own ID.

  A comment without an ID is assigned the current size of the by-ID index.

  Args:
    comment: comment PB; reads project_id and issue_id, may assign id.
    local_id: local ID of the issue the comment belongs to.
  """
  pid = comment.project_id
  # NOTE(review): the `comment.id` attribute was lost from the original
  # text; it is restored by analogy with `attachment.attachment_id` in
  # TestAddAttachment. Confirm the field name.
  if not comment.id:
    comment.id = len(self.comments_by_cid)
  self.comments_by_project.setdefault(pid, {})
  self.comments_by_project[pid].setdefault(local_id, []).append(comment)
  self.comments_by_iid.setdefault(comment.issue_id, []).append(comment)
  self.comments_by_cid[comment.id] = comment
def TestAddAttachment(self, attachment, comment_id, issue_id):
  """Store an attachment and link it to an already-stored comment.

  An attachment without an ID is assigned the current size of the
  attachment index.

  Args:
    attachment: attachment PB to store.
    comment_id: ID of a comment already present in comments_by_cid.
    issue_id: ID of the issue, stored alongside the attachment.

  Raises:
    KeyError: if comment_id is not a known comment.
  """
  if not attachment.attachment_id:
    attachment.attachment_id = len(self.attachments_by_id)
  aid = attachment.attachment_id
  self.attachments_by_id[aid] = attachment, comment_id, issue_id
  comment = self.comments_by_cid[comment_id]
  # Link each attachment to its comment only once.
  if attachment not in comment.attachments:
    comment.attachments.append(attachment)
def GetAttachmentAndContext(self, _cnxn, attachment_id):
  """Return (attachment, comment_id, issue_id) for a non-deleted attachment.

  Raises:
    issue_svc.NoSuchAttachmentException: the ID is unknown or the
      attachment is marked deleted.
  """
  if attachment_id not in self.attachments_by_id:
    raise issue_svc.NoSuchAttachmentException()
  attach, comment_id, issue_id = self.attachments_by_id[attachment_id]
  if attach.deleted:
    raise issue_svc.NoSuchAttachmentException()
  return attach, comment_id, issue_id
def GetComments(self, _cnxn, where=None, order_by=None, **kwargs):
  """Look up at most one comment by the `id` keyword argument.

  This is a very limited subset of what the real GetComments() can do:
  `where`, `order_by` and every keyword other than `id` are ignored.

  Returns:
    A one-element list holding the matching comment, or an empty list.
  """
  wanted_id = kwargs.get('id')
  match = self.comments_by_cid.get(wanted_id)
  return [match] if match else []
def GetComment(self, cnxn, comment_id):
  """Return the single comment with the given ID.

  Raises:
    issue_svc.NoSuchCommentException: GetComments() did not return exactly
      one comment.
  """
  matches = self.GetComments(cnxn, id=comment_id)
  if len(matches) != 1:
    raise issue_svc.NoSuchCommentException()
  return matches[0]
def ResolveIssueRefs(self, cnxn, ref_projects, default_project_name, refs):
  """Resolve (project_name, local_id) pairs against the stored issues.

  Args:
    cnxn: passed through to GetIssueByLocalID.
    ref_projects: dict mapping project names to project PBs.
    default_project_name: project name used when a ref has none.
    refs: iterable of (project_name, local_id) pairs.

  Returns:
    A list with one entry per ref that resolved. Refs to missing or
    DELETABLE projects, and refs to issues that do not exist, are skipped.
  """
  result = []
  for project_name, local_id in refs:
    project = ref_projects.get(project_name or default_project_name)
    if not project or project.state == project_pb2.ProjectState.DELETABLE:
      continue  # ignore any refs to issues in deleted projects
    try:
      issue = self.GetIssueByLocalID(cnxn, project.project_id, local_id)
    except issue_svc.NoSuchIssueException:
      continue  # ignore any refs to issues that don't exist
    # NOTE(review): the statement that filled `result` was missing from the
    # original text; collecting issue IDs is the presumed intent. Confirm.
    result.append(issue.issue_id)
  return result
def GetAllIssuesInProject(self, _cnxn, project_id, min_local_id=None):
  """Return every stored issue of a project as a list.

  Args:
    _cnxn: unused.
    project_id: ID of the project to list.
    min_local_id: unused.

  Returns:
    A new list of issues; empty when the project has none stored.
  """
  self.get_all_issues_in_project_called = True
  # list() keeps the return type a list on Python 3, where values() is a
  # live view, and matches the empty-list result for unknown projects.
  return list(self.issues_by_project.get(project_id, {}).values())
def GetIssuesByLocalIDs(
    self, _cnxn, project_id, local_id_list, shard_id=None):
  """Return the stored issues matching the given local IDs.

  Args:
    _cnxn: unused.
    project_id: ID of the project to search.
    local_id_list: local IDs to look up, in the desired output order.
    shard_id: unused.

  Returns:
    A list of the issues that exist; unknown local IDs are skipped.
  """
  project_issues = self.issues_by_project.get(project_id, {})
  return [project_issues[local_id] for local_id in local_id_list
          if local_id in project_issues]
def GetIssueByLocalID(self, _cnxn, project_id, local_id):
  """Return the stored issue with the given project and local ID.

  Raises:
    issue_svc.NoSuchIssueException: the project or the local ID is unknown.
  """
  try:
    return self.issues_by_project[project_id][local_id]
  except KeyError:
    raise issue_svc.NoSuchIssueException()
def GetAnyOnHandIssue(self, issue_ids, start=None, end=None):
  """Always return None: every lookup is treated as a miss."""
  return None
def GetIssue(self, _cnxn, issue_id):
  """Return the stored issue with the given global issue ID.

  Raises:
    issue_svc.NoSuchIssueException: no issue is stored under that ID.
  """
  if issue_id not in self.issues_by_iid:
    raise issue_svc.NoSuchIssueException()
  return self.issues_by_iid[issue_id]
def LookupIssueID(self, _cnxn, project_id, local_id):
  """Return the global issue ID for a (project_id, local_id) pair.

  Raises:
    issue_svc.NoSuchIssueException: the project or the local ID is unknown.
  """
  try:
    issue = self.issues_by_project[project_id][local_id]
  except KeyError:
    raise issue_svc.NoSuchIssueException()
  return issue.issue_id
def GetCommentsForIssue(self, _cnxn, issue_id):
  """Return the issue's comments, renumbering `sequence` from zero.

  Returns:
    The stored list of comments, or a new empty list if there are none.
  """
  issue_comments = self.comments_by_iid.get(issue_id, [])
  position = 0
  for comment in issue_comments:
    comment.sequence = position
    position += 1
  return issue_comments
def InsertIssue(self, cnxn, issue):
  """Assign a global ID to the issue and store it in both indexes.

  The ID is project_id * 1000000 + local_id.

  Returns:
    The issue ID that was assigned.
  """
  issue.issue_id = issue.project_id * 1000000 + issue.local_id
  issues_in_project = self.issues_by_project.setdefault(issue.project_id, {})
  issues_in_project[issue.local_id] = issue
  self.issues_by_iid[issue.issue_id] = issue
  return issue.issue_id
# Build a new issue PB for the given project, allocate its local ID, and
# return that local ID. The issue ID is project_id * 1000000 + local_id.
#
# NOTE(review): this block appears to be truncated. `if blocked_on:` and both
# `if blocking:` statements have no body, and services, cc_ids, labels,
# field_values, component_ids, attachments and index_now are never used in the
# visible code. Restore the missing statements from version control before
# relying on this method.
def CreateIssue(
self, cnxn, services, project_id,
summary, status, owner_id, cc_ids, labels, field_values,
component_ids, reporter_id, marked_description, blocked_on=None,
blocking=None, attachments=None, timestamp=None, index_now=True):
issue = tracker_pb2.Issue()
issue.project_id = project_id
issue.summary = summary
issue.status = status
# The owner and the opened timestamp are only set when a truthy value is given.
if owner_id:
issue.owner_id = owner_id
issue.reporter_id = reporter_id
if timestamp:
issue.opened_timestamp = timestamp
if blocked_on:
if blocking:
if blocking:
issue.local_id = self.AllocateNextLocalID(cnxn, project_id)
issue.issue_id = project_id * 1000000 + issue.local_id
# NOTE(review): nothing visible here stores the issue or creates its first
# comment, so this lookup presumably depends on a missing call (TestAddIssue
# creates such a comment) -- confirm.
self.comments_by_iid[issue.issue_id][0].content = marked_description
return issue.local_id
def SetUsedLocalID(self, cnxn, project_id):
  """Set next_id to one more than the project's highest local ID."""
  highest = self.GetHighestLocalID(cnxn, project_id)
  self.next_id = highest + 1
def AllocateNextLocalID(self, cnxn, project_id):
  """Return one more than the project's highest local ID.

  Nothing is reserved: repeated calls return the same value until an issue
  with that local ID is stored or next_id changes.
  """
  highest = self.GetHighestLocalID(cnxn, project_id)
  return highest + 1
def GetHighestLocalID(self, _cnxn, project_id):
  """Return the highest local ID considered in use for a project.

  When next_id is positive it overrides the stored issues and the result is
  next_id - 1. Otherwise the result is the largest local_id among the
  project's stored issues, or 0 if there are none.
  """
  if self.next_id > 0:
    return self.next_id - 1
  issue_dict = self.issues_by_project.get(project_id, {})
  # values() exists on both Python 2 and 3; itervalues() is Python 2 only.
  return max([0] + [issue.local_id for issue in issue_dict.values()])
# Overwrite fields of a stored issue and add a comment describing the change.
#
# NOTE(review): this block appears to be truncated and is not valid Python as
# shown:
#   - the parameter list is never closed before the docstring;
#   - the lines `summary, issue.summary))` and `status, issue.status))` are
#     the tails of calls whose first line is missing, so `amendments` is
#     never filled in the visible code;
#   - `comment_pb = None` directly follows the CreateIssueComment call,
#     presumably because an `else:` line was lost;
#   - `attachments` is tested but never passed to CreateIssueComment.
# Restore the missing lines from version control before editing the logic.
def ApplyIssueComment(
self, cnxn, services, reporter_id, project_id,
local_id, summary, status, owner_id, cc_ids, labels, field_values,
component_ids, blocked_on, blocking, dangling_blocked_on_refs,
dangling_blocking_refs, merged_into, index_now=True,
page_gen_ts=None, comment=None, inbound_message=None, attachments=None,
"""Feel free to implement a spec-compliant return value."""
issue = self.issues_by_project[project_id][local_id]
amendments = []
# Summary and status are only overwritten when a new, different value is given.
if summary and summary != issue.summary:
issue.summary = summary
summary, issue.summary))
if status and status != issue.status:
issue.status = status
status, issue.status))
# The remaining fields are overwritten unconditionally with the arguments.
issue.owner_id = owner_id
issue.cc_ids = cc_ids
issue.labels = labels
issue.field_values = field_values
issue.component_ids = component_ids
if merged_into is not None:
issue.merged_into = merged_into
# A comment is created when there are amendments, non-blank comment text, or
# attachments.
if amendments or (comment and comment.strip()) or attachments:
comment_pb = self.CreateIssueComment(
cnxn, project_id, local_id, reporter_id, comment,
amendments=amendments, inbound_message=inbound_message)
comment_pb = None
return amendments, comment_pb
def GetCommentsForIssues(self, _cnxn, issue_ids):
  """Return a dict mapping each issue ID to its stored comment list.

  Raises:
    KeyError: an issue ID has no stored comments.
  """
  return {issue_id: self.comments_by_iid[issue_id] for issue_id in issue_ids}
def InsertComment(self, cnxn, comment, commit=True):
  """Store a comment on the issue named by comment.issue_id.

  `commit` is ignored. GetIssue() raises if the issue is unknown.
  """
  parent_issue = self.GetIssue(cnxn, comment.issue_id)
  self.TestAddComment(comment, parent_issue.local_id)
# pylint: disable=unused-argument
# Pretend to apply a delta to an issue: report a placeholder amendments list
# and create a comment, without changing any field of the issue in the
# visible code.
#
# NOTE(review): this block appears to be truncated and is not valid Python as
# shown:
#   - the parameter list is never closed;
#   - `'Updated', issue.status))` is the tail of a call whose first line is
#     missing, so `amendments` is never filled in the visible code;
#   - the CreateIssueComment call is never closed.
# Restore the missing lines from version control before editing the logic.
def DeltaUpdateIssue(
self, cnxn, services, reporter_id, project_id,
config, issue, status, owner_id, cc_add, cc_remove, comp_ids_add,
comp_ids_remove, labels_add, labels_remove, field_vals_add,
field_vals_remove, fields_clear, blocked_on_add=None,
blocked_on_remove=None, blocking_add=None, blocking_remove=None,
merged_into=None, index_now=False, comment=None, summary=None,
iids_to_invalidate=None, rules=None, predicate_asts=None,
# Return a bogus amendments list if any of the fields changed
amendments = []
comment_pb = tracker_pb2.IssueComment()
# comp_ids_add and comp_ids_remove are not part of this condition.
if (status or owner_id or cc_add or cc_remove or labels_add or
labels_remove or field_vals_add or field_vals_remove or fields_clear or
blocked_on_add or blocked_on_remove or blocking_add or
blocking_remove or merged_into or summary):
'Updated', issue.status))
# With no amendments and no non-blank comment text there is nothing to record.
if not amendments and (not comment or not comment.strip()):
return [], None
comment_pb = self.CreateIssueComment(
cnxn, project_id, issue.local_id, reporter_id, comment,
# Expose whether the caller asked for immediate indexing.
self.indexer_called = index_now
return amendments, comment_pb
def InvalidateIIDs(self, cnxn, iids_to_invalidate):
  """Do nothing; both arguments are ignored.

  NOTE(review): the original body was missing from the text; a no-op is the
  minimal valid implementation. Confirm against version control.
  """
  return None
# pylint: disable=unused-argument
# Create a comment PB on a stored issue, index it by project and by issue,
# and return it.
#
# NOTE(review): this block appears to be truncated and is not valid Python as
# shown:
#   - `comment = tracker_pb2.IssueComment() = len(self.comments_by_cid)` and
#     `self.comments_by_cid[] = comment` have lost an expression, presumably
#     the comment's ID attribute -- confirm;
#   - `comment.timestamp = 1234567890` directly follows the explicit
#     timestamp assignment, presumably because an `else:` line was lost;
#   - `if amendments:` has no body;
#   - the `comment.attachments_add(` call is missing arguments, and the value
#     stored in attachments_by_id ends after `pid,`, giving a 2-tuple where
#     TestAddAttachment stores (attachment, comment_id, issue_id).
# Restore the missing lines from version control before editing the logic.
def CreateIssueComment(
self, _cnxn, project_id, local_id, user_id, content,
inbound_message=None, amendments=None, attachments=None, timestamp=None,
is_spam=False, commit=True):
# Add a comment to an issue
issue = self.issues_by_project[project_id][local_id]
comment = tracker_pb2.IssueComment() = len(self.comments_by_cid)
comment.project_id = project_id
comment.issue_id = issue.issue_id
comment.content = content
comment.user_id = user_id
if timestamp is not None:
comment.timestamp = timestamp
comment.timestamp = 1234567890
if amendments:
if inbound_message:
comment.inbound_message = inbound_message
# Index the comment the same way TestAddComment does.
pid = project_id
self.comments_by_project.setdefault(pid, {})
self.comments_by_project[pid].setdefault(local_id, []).append(comment)
self.comments_by_iid.setdefault(issue.issue_id, []).append(comment)
self.comments_by_cid[] = comment
# Each attachment is given as a (filename, filecontent, mimetype) tuple.
if attachments:
for filename, filecontent, mimetype in attachments:
aid = len(self.attachments_by_id)
attach = comment.attachments_add(
blobkey='blob(%s)' % filename)
self.attachments_by_id[aid] = attach, pid,
return comment
def GetOpenAndClosedIssues(self, _cnxn, issue_ids):
  """Split the stored issues named by issue_ids into open and closed.

  An issue counts as closed only when its status is exactly 'Fixed'.
  Unknown issue IDs are skipped.

  NOTE(review): the append statements were missing from the original text;
  this split is reconstructed from the visible `status == 'Fixed'` test and
  the `except KeyError` clause. Confirm against version control.

  Returns:
    A pair (open_issues, closed_issues) of lists, each in input order.
  """
  open_issues = []
  closed_issues = []
  for issue_id in issue_ids:
    try:
      issue = self.issues_by_iid[issue_id]
    except KeyError:
      continue
    if issue.status == 'Fixed':
      closed_issues.append(issue)
    else:
      open_issues.append(issue)
  return open_issues, closed_issues
def GetIssuesDict(
    self, _cnxn, issue_ids, use_cache=True, shard_id=None):
  """Return a dict mapping each requested issue ID to its stored issue.

  `use_cache` and `shard_id` are ignored.

  Raises:
    KeyError: a requested issue ID is not stored.
  """
  issues_dict = {}
  for iid in issue_ids:
    issues_dict[iid] = self.issues_by_iid[iid]
  return issues_dict
def GetIssues(self, _cnxn, issue_ids, use_cache=True, shard_id=None):
  """Return the stored issues for the given IDs, skipping unknown IDs.

  `use_cache` and `shard_id` are ignored; the result keeps input order.
  """
  found = []
  for issue_id in issue_ids:
    if issue_id in self.issues_by_iid:
      found.append(self.issues_by_iid[issue_id])
  return found
def SoftDeleteIssue(
    self, _cnxn, project_id, local_id, deleted, user_service):
  """Set the `deleted` flag on a stored issue.

  Raises:
    KeyError: the project or the local ID is unknown.
  """
  self.issues_by_project[project_id][local_id].deleted = deleted
def SoftDeleteComment(
    self, cnxn, project_id, local_id, sequence_num,
    deleted_by_user_id, user_service, delete=True, reindex=True,
    is_spam=False):
  """Mark one comment of an issue as deleted and/or spam.

  NOTE(review): the end of the original signature was missing from the text.
  `is_spam` is used by the body, so it is restored as the last parameter;
  the default of False is an assumption -- confirm against version control.

  Args:
    cnxn: passed through to the issue and comment lookups.
    project_id: ID of the project that owns the issue.
    local_id: local ID of the issue.
    sequence_num: zero-based position of the comment on the issue.
    deleted_by_user_id: user ID recorded on the comment when `delete` is set.
    user_service: unused.
    delete: when true, record deleted_by_user_id on the comment.
    reindex: unused.
    is_spam: value stored in the comment's is_spam field.

  Raises:
    Exception: the issue has no comments, or sequence_num is past the last
      comment.
  """
  issue = self.GetIssueByLocalID(cnxn, project_id, local_id)
  comments = self.GetCommentsForIssue(cnxn, issue.issue_id)
  if not comments:
    raise Exception(
        'No comments for issue, project, seq (%s, %s, %s), cannot delete'
        % (local_id, project_id, sequence_num))
  # sequence_num is an index, so len(comments) itself is already out of range.
  if len(comments) <= sequence_num:
    raise Exception(
        'Attempting to delete comment %s only %s comments created' %
        (sequence_num, len(comments)))
  target = comments[sequence_num]
  target.is_spam = is_spam
  if delete:
    target.deleted_by = deleted_by_user_id
def DeleteComponentReferences(self, _cnxn, component_id):
  """Remove a component ID from every stored issue's component_ids.

  Each affected issue gets a new list; the relative order of the remaining
  component IDs is kept.
  """
  # values() exists on both Python 2 and 3; iteritems() is Python 2 only
  # and the key was never used.
  for issue in self.issues_by_iid.values():
    issue.component_ids = [
        cid for cid in issue.component_ids if cid != component_id]
def RunIssueQuery(
    self, cnxn, left_joins, where, order_by, shard_id=None, limit=None):
  """This always returns empty results.  Mock it to test other cases."""
  empty_result = ([], False)
  return empty_result
def GetIIDsByLabelIDs(self, cnxn, label_ids, project_id, shard_id):
  """This always returns empty results.  Mock it to test other cases."""
  no_iids = []
  return no_iids
def GetIIDsByParticipant(self, cnxn, user_ids, project_ids, shard_id):
  """This always returns empty results.  Mock it to test other cases."""
  no_iids = []
  return no_iids
def MoveIssues(self, cnxn, dest_project, issues, user_service):
  """Re-home issues under the destination project with new local IDs.

  Args:
    cnxn: passed through to AllocateNextLocalID.
    dest_project: project PB; only project_id is read.
    issues: issues to move; project_id and local_id are updated in place.
    user_service: unused.

  Returns:
    An empty list.
  """
  move_to = dest_project.project_id
  self.issues_by_project.setdefault(move_to, {})
  for issue in issues:
    # Drop the old (project, local_id) entry so that the issue is no longer
    # reachable through the project it moved out of.
    self.issues_by_project.get(issue.project_id, {}).pop(issue.local_id, None)
    issue.local_id = self.AllocateNextLocalID(cnxn, move_to)
    self.issues_by_project[move_to][issue.local_id] = issue
    issue.project_id = move_to
  return []
class SpamService(object):
  """Fake version of SpamService that just works in-RAM.

  NOTE(review): several method bodies, and the end of FlagComment's
  signature, were missing from the original text. They are reconstructed
  from the containers created in __init__; confirm against version control.
  """

  def __init__(self, user_id=None):
    self.user_id = user_id
    # {issue_id: [user_id, ...]}
    self.reports_by_issue_id = collections.defaultdict(list)
    # {issue_id: {comment_id: [user_id, ...]}}
    self.comment_reports_by_issue_id = collections.defaultdict(dict)
    # {issue_id: {user_id: is_spam}}
    self.manual_verdicts_by_issue_id = collections.defaultdict(dict)
    # {comment_id: {user_id: is_spam}}
    self.manual_verdicts_by_comment_id = collections.defaultdict(dict)

  def FlagIssues(self, cnxn, issue_service, issues, user_id, flagged_spam):
    """Add or withdraw user_id's spam report on each issue."""
    for issue in issues:
      reporters = self.reports_by_issue_id[issue.issue_id]
      if flagged_spam:
        reporters.append(user_id)
      elif user_id in reporters:
        reporters.remove(user_id)

  def FlagComment(self, cnxn, issue_id, comment_id, reported_user_id, user_id,
                  flagged_spam):
    """Add or withdraw user_id's spam report on one comment."""
    if not comment_id in self.comment_reports_by_issue_id[issue_id]:
      self.comment_reports_by_issue_id[issue_id][comment_id] = []
    reporters = self.comment_reports_by_issue_id[issue_id][comment_id]
    if flagged_spam:
      reporters.append(user_id)
    elif user_id in reporters:
      reporters.remove(user_id)

  def RecordManualIssueVerdicts(
      self, cnxn, issue_service, issues, user_id, is_spam):
    """Store user_id's verdict for every given issue."""
    for issue in issues:
      self.manual_verdicts_by_issue_id[issue.issue_id][user_id] = is_spam

  def RecordManualCommentVerdict(
      self, cnxn, issue_service, user_service, comment_id,
      sequnce_num, user_id, is_spam):
    """Store user_id's verdict for one comment."""
    self.manual_verdicts_by_comment_id[comment_id][user_id] = is_spam

  def RecordClassifierIssueVerdict(self, cnxn, issue, is_spam, confidence):
    """Do nothing; classifier verdicts are not recorded."""
    return None

  def RecordClassifierCommentVerdict(self, cnxn, issue, is_spam, confidence):
    """Do nothing; classifier verdicts are not recorded."""
    return None

  def ClassifyComment(self, comment):
    """Return a fixed 'ham' result with a score of '1.0'."""
    return {'outputLabel': 'ham',
            'outputMulti': [{'label': 'ham', 'score': '1.0'}]}

  def ClassifyIssue(self, issue, firstComment):
    """Return a fixed 'ham' result with a score of '1.0'."""
    return {'outputLabel': 'ham',
            'outputMulti': [{'label': 'ham', 'score': '1.0'}]}
class FeaturesService(object):
  """A fake implementation of FeaturesService.

  NOTE(review): the Expunge* and UpdateCannedQueries bodies were missing
  from the original text. The Expunge* methods are reconstructed to fill the
  matching test-only lists from __init__; confirm against version control.
  """

  def __init__(self):
    # Test-only sequence of expunged projects.
    self.expunged_saved_queries = []
    self.expunged_filter_rules = []
    self.expunged_quick_edit = []

  def ExpungeSavedQueriesExecuteInProject(self, _cnxn, project_id):
    """Remember that saved queries were expunged for this project."""
    self.expunged_saved_queries.append(project_id)

  def ExpungeFilterRules(self, _cnxn, project_id):
    """Remember that filter rules were expunged for this project."""
    self.expunged_filter_rules.append(project_id)

  def ExpungeQuickEditHistory(self, _cnxn, project_id):
    """Remember that quick-edit history was expunged for this project."""
    self.expunged_quick_edit.append(project_id)

  def GetFilterRules(self, cnxn, project_id):
    """Return an empty list of filter rules."""
    return []

  def GetCannedQueriesByProjectID(self, cnxn, project_id):
    """Return an empty list of canned queries."""
    return []

  def UpdateCannedQueries(self, cnxn, project_id, canned_queries):
    """Do nothing; the given canned queries are not stored."""
    return None

  def GetSubscriptionsInProjects(self, cnxn, project_ids):
    """Return an empty dict of subscriptions."""
    return {}

  def GetSavedQuery(self, cnxn, query_id):
    """Return a new, empty SavedQuery PB regardless of query_id."""
    return tracker_pb2.SavedQuery()
class PostData(object):
  """A dictionary-like object that also implements getall().

  Every stored value is expected to already be a list of values.
  """

  def __init__(self, *args, **kwargs):
    self.dictionary = dict(*args, **kwargs)

  def getall(self, key):
    """Return the whole list stored at key, or [] when key is absent."""
    if key in self.dictionary:
      return self.dictionary[key]
    return []

  def get(self, key, default=None):
    """Return the first value stored at key, or default when key is absent."""
    if key in self.dictionary:
      return self.dictionary[key][0]
    return default

  def __getitem__(self, key):
    """Return the first value stored at key; KeyError when key is absent."""
    values = self.dictionary[key]
    return values[0]

  def __contains__(self, key):
    """Return True when key is present in the POST data."""
    return key in self.dictionary

  def keys(self):
    """Return the keys in the POST data."""
    return self.dictionary.keys()
class FakeFile(object):
  """Minimal in-RAM stand-in for a file object usable in a `with` block.

  NOTE(review): the attribute name in __init__ and the bodies of read() and
  write() were missing from the original text. `self.data`, returning it
  from read(), and discarding writes are assumptions; confirm against
  version control.
  """

  def __init__(self, data=None):
    self.data = data

  def read(self):
    """Return the data given to the constructor."""
    return self.data

  def write(self, content):
    """Accept content and discard it."""
    return None

  def __enter__(self):
    return self

  def __exit__(self, __1, __2, __3):
    # Returning None lets any exception raised inside the block propagate.
    return None
def gcs_open(filename, mode):
  """Return a FakeFile constructed from filename; `mode` is ignored."""
  fake_file = FakeFile(filename)
  return fake_file