blob: dd93bf745d0ea5faa529b79a38468b63af0ae8d8 [file] [log] [blame]
# Copyright 2017 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style
# license that can be found in the LICENSE file or at
# https://developers.google.com/open-source/licenses/bsd
"""Tests for the WorkEnv class."""
from __future__ import print_function
from __future__ import division
from __future__ import absolute_import
import copy
import logging
import sys
import unittest
import mock
from google.appengine.api import memcache
from google.appengine.ext import testbed
import settings
from businesslogic import work_env
from features import filterrules_helpers
from framework import authdata
from framework import exceptions
from framework import framework_constants
from framework import framework_helpers
from framework import framework_views
from framework import permissions
from framework import sorting
from features import send_notifications
from proto import features_pb2
from proto import project_pb2
from proto import tracker_pb2
from proto import user_pb2
from services import config_svc
from services import features_svc
from services import issue_svc
from services import project_svc
from services import user_svc
from services import usergroup_svc
from services import service_manager
from services import spam_svc
from services import star_svc
from services import template_svc
from testing import fake
from testing import testing_helpers
from tracker import tracker_bizobj
from tracker import tracker_constants
def _Issue(project_id, local_id):
  """Build a minimal tracker_pb2.Issue for the given project and local ID.

  The issue is named 'proj-<project_id>' and its issue_id is computed as
  project_id * 100 + local_id.
  """
  # TODO(crbug.com/monorail/8124): Many parts of monorail's codebase
  # assumes issue.owner_id could never be None and that issues without
  # owners have owner_id = 0.
  new_issue = tracker_pb2.Issue(owner_id=0)
  new_issue.project_id = project_id
  new_issue.local_id = local_id
  new_issue.project_name = 'proj-%d' % project_id
  new_issue.issue_id = local_id + project_id * 100
  return new_issue
class WorkEnvTest(unittest.TestCase):
def setUp(self):
  """Build the services, project, users, hotlist and WorkEnv used by tests."""
  self.cnxn = 'fake connection'
  # Every service is a fake from testing.fake except the template service,
  # which is a Mock restricted to the TemplateService interface.
  self.services = service_manager.Services(
      config=fake.ConfigService(),
      cache_manager=fake.CacheManager(),
      issue=fake.IssueService(),
      user=fake.UserService(),
      project=fake.ProjectService(),
      issue_star=fake.IssueStarService(),
      project_star=fake.ProjectStarService(),
      user_star=fake.UserStarService(),
      hotlist_star=fake.HotlistStarService(),
      features=fake.FeaturesService(),
      usergroup=fake.UserGroupService(),
      template=mock.Mock(spec=template_svc.TemplateService),
      spam=fake.SpamService())
  # Project 'proj' (ID 789) with user 111 as a committer.
  self.project = self.services.project.TestAddProject(
      'proj', project_id=789, committer_ids=[111])
  self.component_id_1 = self.services.config.CreateComponentDef(
      self.cnxn, self.project.project_id, 'Component', 'Docstring', False, [],
      [], 0, 111, [])
  self.component_id_2 = self.services.config.CreateComponentDef(
      self.cnxn, self.project.project_id, 'Component>Test', 'Docstring',
      False, [], [], 0, 111, [])
  # A fresh test config is stored after the component defs were created.
  # NOTE(review): presumably this replaces the stored config -- confirm
  # against fake.ConfigService.StoreConfig.
  config = fake.MakeTestConfig(self.project.project_id, [], [])
  config.well_known_statuses = [
      tracker_pb2.StatusDef(status='Fixed', means_open=False)
  ]
  self.services.config.StoreConfig(self.cnxn, config)
  # Users: 444 is a site admin; 111, 222 and 333 are regular accounts.
  self.admin_user = self.services.user.TestAddUser(
      'admin@example.com', 444)
  self.admin_user.is_site_admin = True
  self.user_1 = self.services.user.TestAddUser('user_111@example.com', 111)
  self.user_2 = self.services.user.TestAddUser('user_222@example.com', 222)
  self.user_3 = self.services.user.TestAddUser('user_333@example.com', 333)
  # Private hotlist owned by user_1 with user_2 as an editor.
  self.hotlist = self.services.features.TestAddHotlist(
      'myhotlist', summary='old sum', owner_ids=[self.user_1.user_id],
      editor_ids=[self.user_2.user_id], description='old desc',
      is_private=True)
  # reserved for testing that a hotlist does not exist
  self.dne_hotlist_id = 1234
  # The request starts with read-only permissions; SignIn() replaces them.
  self.mr = testing_helpers.MakeMonorailRequest(project=self.project)
  self.mr.perms = permissions.READ_ONLY_PERMISSIONSET
  self.field_def_1_name = 'test_field_1'
  self.field_def_1 = fake.MakeTestFieldDef(
      101, self.project.project_id, tracker_pb2.FieldTypes.INT_TYPE,
      field_name=self.field_def_1_name, max_value=10)
  self.services.config.TestAddFieldDef(self.field_def_1)
  self.PAST_TIME = 12345
  # Project ID reserved for "no such project" cases.
  self.dne_project_id = 999
  sorting.InitializeArtValues(self.services)
  self.work_env = work_env.WorkEnv(
      self.mr, self.services, 'Testing phase')
def SignIn(self, user_id=111):
  """Authenticate self.mr as `user_id` and recompute its permissions.

  Args:
    user_id: ID of the user to sign in as; defaults to 111.
  """
  auth = authdata.AuthData.FromUserID(self.cnxn, user_id, self.services)
  self.mr.auth = auth
  self.mr.perms = permissions.GetPermissions(
      auth.user_pb, auth.effective_ids, self.project)
def testAssertUserCanModifyIssues_Empty(self):
  """An empty list of issue-delta pairs passes the modify check."""
  no_pairs = []
  with self.work_env as env:
    env._AssertUserCanModifyIssues(no_pairs, True)
def testAssertUserCanModifyIssues_RestrictedFields(self):
  """Restricted fields are rejected until the user is a field editor."""
  add_field_def = self.services.config.TestAddFieldDef
  int_fd = fake.MakeTestFieldDef(
      1, 789, tracker_pb2.FieldTypes.INT_TYPE,
      field_name='int_field', is_restricted_field=True)
  add_field_def(int_fd)
  enum_fd = fake.MakeTestFieldDef(
      2, 789, tracker_pb2.FieldTypes.ENUM_TYPE,
      field_name='enum_field', is_restricted_field=True)
  add_field_def(enum_fd)

  target = fake.MakeTestIssue(
      789, 1, 'summary', 'Available', self.admin_user.user_id)
  self.services.issue.TestAddIssue(target)

  # The delta touches both restricted fields: one directly, one via a label.
  change = tracker_pb2.IssueDelta(
      summary='changing summary',
      fields_clear=[int_fd.field_id],
      labels_remove=['enum_field-test'])
  pairs = [(target, change)]

  editor_id = self.user_1.user_id
  self.SignIn(user_id=editor_id)
  expected_message = r'.+int_field\n.+enum_field'
  with self.assertRaisesRegexp(
      permissions.PermissionException, expected_message):
    with self.work_env as env:
      env._AssertUserCanModifyIssues(pairs, True)

  # Once user_1 is an editor of both fields the same change is accepted.
  int_fd.editor_ids = [editor_id]
  enum_fd.editor_ids = [editor_id]
  with self.work_env as env:
    env._AssertUserCanModifyIssues(pairs, True)
def testAssertUserCanModifyIssues_HasEditPerms(self):
  """A project committer passes the modify check for a normal issue."""
  target = fake.MakeTestIssue(
      789, 1, 'summary', 'Available', self.admin_user.user_id)
  self.services.issue.TestAddIssue(target)
  change = tracker_pb2.IssueDelta(summary='changing summary', cc_ids_add=[111])
  pairs = [(target, change)]

  # user_1 (111) was added as a committer of the project in setUp().
  self.SignIn(user_id=self.user_1.user_id)
  with self.work_env as env:
    env._AssertUserCanModifyIssues(pairs, True, comment_content='ping')
def testAssertUserCanModifyIssues_MergedInto(self):
  """Merging into an issue the user cannot edit is rejected."""
  issue = fake.MakeTestIssue(
      789, 1, 'summary', 'Available', self.admin_user.user_id)
  self.services.issue.TestAddIssue(issue)

  # The merge target carries a Restrict-View label. Its status previously
  # read 'Aavailable'; it now matches the status used by the other issue.
  restricted_issue = fake.MakeTestIssue(
      789, 2, 'summary', 'Available', self.admin_user.user_id,
      labels=['Restrict-View-Chicken'])
  self.services.issue.TestAddIssue(restricted_issue)

  issue_delta_pairs = [
      (issue, tracker_pb2.IssueDelta(merged_into=restricted_issue.issue_id))
  ]

  # Committer cannot merge into issue they cannot edit.
  self.SignIn(user_id=self.user_1.user_id)
  with self.assertRaises(permissions.PermissionException):
    with self.work_env as we:
      we._AssertUserCanModifyIssues(
          issue_delta_pairs, True, comment_content='ping')
def testAssertUserCanModifyIssues_HasFineGrainedPerms(self):
  """Per-field edit permissions are checked for each issue delta.

  user_1 is a contributor with extra permissions to add comments and to
  edit summaries and owners. Deltas touching CC or status must be reported;
  deltas touching summary or owner must not.
  """
  self.services.project.TestAddProject(
      'projWithExtraPerms',
      project_id=788,
      contrib_ids=[self.user_1.user_id],
      extra_perms=[
          project_pb2.Project.ExtraPerms(
              member_id=self.user_1.user_id,
              perms=[
                  permissions.ADD_ISSUE_COMMENT,
                  permissions.EDIT_ISSUE_SUMMARY, permissions.EDIT_ISSUE_OWNER
              ])
      ])

  def _AddFarmIssue(local_id, status):
    """Create an issue in project 788 and register it with the service."""
    farm_issue = fake.MakeTestIssue(
        788, local_id, 'summary', status, self.admin_user.user_id,
        project_name='farm')
    # Every issue is registered, including issue_4: the previous code
    # registered issue_3 a second time and never added issue_4.
    self.services.issue.TestAddIssue(farm_issue)
    return farm_issue

  error_messages_re = []

  # user_1 can update issue summaries in the project.
  issue_1 = _AddFarmIssue(1, 'Available')
  issue_delta_pairs = [(issue_1, tracker_pb2.IssueDelta(summary='bok bok'))]

  # user_1 does not have EDIT_ISSUE_CC perms in project.
  error_messages_re.append(r'.+changes to issue farm:2')
  issue_2 = _AddFarmIssue(2, 'Fixed')
  issue_delta_pairs.append(
      (issue_2, tracker_pb2.IssueDelta(cc_ids_add=[777])))

  # user_1 does not have EDIT_ISSUE_STATUS perms in project.
  error_messages_re.append(r'.+changes to issue farm:3')
  issue_3 = _AddFarmIssue(3, 'Fixed')
  issue_delta_pairs.append(
      (issue_3, tracker_pb2.IssueDelta(status='eggsHatching')))

  # user_1 can update issue owners in the project.
  issue_4 = _AddFarmIssue(4, 'Fixed')
  issue_delta_pairs.append(
      (issue_4, tracker_pb2.IssueDelta(owner_id=self.user_2.user_id)))

  self.SignIn(user_id=self.user_1.user_id)
  # Only issues 2 and 3 are expected in the error message, in that order.
  with self.assertRaisesRegexp(permissions.PermissionException,
                               '\n'.join(error_messages_re)):
    with self.work_env as we:
      we._AssertUserCanModifyIssues(
          issue_delta_pairs, False, comment_content='ping')
def testAssertUserCanModifyIssues_IssueGrantedPerms(self):
  """Permissions granted through an issue's user field are honored."""
  perm_field = tracker_pb2.FieldDef(
      field_name='grants_editissue',
      field_id=1,
      field_type=tracker_pb2.FieldTypes.USER_TYPE,
      grants_perm='editissue')
  project_config = fake.MakeTestConfig(789, [], [])
  project_config.field_defs = [perm_field]
  self.services.config.StoreConfig('cnxn', project_config)

  # user_2 is named in the "grants_editissue" field, which should give
  # them "EditIssue" on this particular issue.
  granted_value = tracker_pb2.FieldValue(
      field_id=1, user_id=self.user_2.user_id)
  target = fake.MakeTestIssue(
      789, 1, 'summary', 'Available', self.admin_user.user_id,
      field_values=[granted_value])
  self.services.issue.TestAddIssue(target)
  pairs = [(target, tracker_pb2.IssueDelta(summary='changing summary'))]

  # The user named in the field may modify the issue.
  self.SignIn(user_id=self.user_2.user_id)
  with self.work_env as env:
    env._AssertUserCanModifyIssues(pairs, False)

  # A user who is not named in the field may not.
  self.SignIn(user_id=self.user_3.user_id)
  with self.assertRaises(permissions.PermissionException):
    with self.work_env as env:
      env._AssertUserCanModifyIssues(pairs, False)
# FUTURE: GetSiteReadOnlyState()
# FUTURE: SetSiteReadOnlyState()
# FUTURE: GetSiteBannerMessage()
# FUTURE: SetSiteBannerMessage()
def testCreateProject_Normal(self):
  """A site admin can create a project and read it back."""
  self.SignIn(user_id=self.admin_user.user_id)
  with self.work_env as env:
    new_project_id = env.CreateProject(
        'newproj', [111], [222], [333], 'summary', 'desc')
    created = env.GetProject(new_project_id)

  self.assertEqual('summary', created.summary)
  self.assertEqual('desc', created.description)
  # Default templates must be created exactly once for the new project.
  create_templates = self.services.template.CreateDefaultProjectTemplates
  create_templates.assert_called_once_with(self.mr.cnxn, new_project_id)
def testCreateProject_AlreadyExists(self):
  """Creating a project under an existing name raises an exception."""
  self.SignIn(user_id=self.admin_user.user_id)
  # setUp() already created a project named 'proj'.
  expected = exceptions.ProjectAlreadyExists
  with self.assertRaises(expected), self.work_env as env:
    env.CreateProject('proj', [111], [222], [333], 'summary', 'desc')

  create_templates = self.services.template.CreateDefaultProjectTemplates
  self.assertFalse(create_templates.called)
def testCreateProject_NotAllowed(self):
  """A user without permission cannot create a project."""
  self.SignIn()
  expected = permissions.PermissionException
  with self.assertRaises(expected), self.work_env as env:
    env.CreateProject('proj', [111], [222], [333], 'summary', 'desc')

  create_templates = self.services.template.CreateDefaultProjectTemplates
  self.assertFalse(create_templates.called)
def testCheckProjectName_OK(self):
  """CheckProjectName reports no problem (None) for the name 'foo'."""
  self.SignIn(user_id=self.admin_user.user_id)
  with self.work_env as env:
    problem = env.CheckProjectName('foo')
    self.assertIsNone(problem)
def testCheckProjectName_InvalidProjectName(self):
  """CheckProjectName reports a problem for the invalid name 'Foo'."""
  self.SignIn(user_id=self.admin_user.user_id)
  with self.work_env as env:
    problem = env.CheckProjectName('Foo')
    self.assertIsNotNone(problem)
def testCheckProjectName_AlreadyExists(self):
  """CheckProjectName reports a problem when the name is already in use."""
  self.SignIn(user_id=self.admin_user.user_id)
  with self.work_env as env:
    # 'proj' is the project created in setUp().
    problem = env.CheckProjectName('proj')
    self.assertIsNotNone(problem)
def testCheckProjectName_NotAllowed(self):
  """Users who cannot create projects get no name information at all."""
  self.SignIn()
  expected = permissions.PermissionException
  with self.assertRaises(expected), self.work_env as env:
    env.CheckProjectName('Foo')
def testCheckComponentName_OK(self):
  """CheckComponentName reports no problem for top-level 'Component'."""
  self.SignIn()
  project_id = self.project.project_id
  with self.work_env as env:
    problem = env.CheckComponentName(project_id, None, 'Component')
    self.assertIsNone(problem)
def testCheckComponentName_ParentComponentOK(self):
  """A new leaf name under an existing parent component is accepted."""
  project_id = self.project.project_id
  self.services.config.CreateComponentDef(
      self.cnxn, project_id, 'Component', 'Docstring',
      False, [], [], 0, 111, [])
  self.SignIn()
  with self.work_env as env:
    problem = env.CheckComponentName(project_id, 'Component', 'SubComponent')
    self.assertIsNone(problem)
def testCheckComponentName_InvalidComponentName(self):
  """CheckComponentName reports a problem for the leaf 'Component>Foo'."""
  self.SignIn()
  project_id = self.project.project_id
  with self.work_env as env:
    problem = env.CheckComponentName(project_id, None, 'Component>Foo')
    self.assertIsNotNone(problem)
def testCheckComponentName_ComponentAlreadyExists(self):
  """CheckComponentName reports a problem for an existing component."""
  project_id = self.project.project_id
  self.services.config.CreateComponentDef(
      self.cnxn, project_id, 'Component', 'Docstring',
      False, [], [], 0, 111, [])
  self.SignIn()
  with self.work_env as env:
    problem = env.CheckComponentName(project_id, None, 'Component')
    self.assertIsNotNone(problem)
def testCheckComponentName_NotAllowedToViewProject(self):
  """Checking a name in a members-only project requires view access."""
  self.project.access = project_pb2.ProjectAccess.MEMBERS_ONLY
  self.SignIn(user_id=333)
  expected = permissions.PermissionException
  with self.assertRaises(expected), self.work_env as env:
    env.CheckComponentName(self.project.project_id, None, 'Component')
def testCheckComponentName_ParentComponentDoesntExist(self):
  """Naming a parent path that is not defined raises an exception."""
  self.SignIn()
  expected = exceptions.NoSuchComponentException
  with self.assertRaises(expected), self.work_env as env:
    env.CheckComponentName(
        self.project.project_id, 'Component', 'SubComponent')
def testCheckFieldName_OK(self):
  """CheckFieldName reports no problem (None) for the name 'Field'."""
  self.SignIn()
  project_id = self.project.project_id
  with self.work_env as env:
    problem = env.CheckFieldName(project_id, 'Field')
    self.assertIsNone(problem)
def testCheckFieldName_InvalidFieldName(self):
  """CheckFieldName reports a problem for the name '**Field**'."""
  self.SignIn()
  project_id = self.project.project_id
  with self.work_env as env:
    problem = env.CheckFieldName(project_id, '**Field**')
    self.assertIsNotNone(problem)
def testCheckFieldName_FieldAlreadyExists(self):
  """CheckFieldName reports a problem when the field is already defined."""
  project_id = self.project.project_id
  existing_fd = fake.MakeTestFieldDef(
      1, project_id, tracker_pb2.FieldTypes.STR_TYPE, field_name='Field')
  self.services.config.TestAddFieldDef(existing_fd)
  self.SignIn()
  with self.work_env as env:
    problem = env.CheckFieldName(project_id, 'Field')
    self.assertIsNotNone(problem)
def testCheckFieldName_FieldIsPrefixOfAnother(self):
  """A name that is a prefix of an existing field name is a problem."""
  project_id = self.project.project_id
  existing_fd = fake.MakeTestFieldDef(
      1, project_id, tracker_pb2.FieldTypes.STR_TYPE, field_name='Field-Foo')
  self.services.config.TestAddFieldDef(existing_fd)
  self.SignIn()
  with self.work_env as env:
    problem = env.CheckFieldName(project_id, 'Field')
    self.assertIsNotNone(problem)
def testCheckFieldName_AnotherFieldIsPrefix(self):
  """A name that extends an existing field name is a problem."""
  project_id = self.project.project_id
  existing_fd = fake.MakeTestFieldDef(
      1, project_id, tracker_pb2.FieldTypes.STR_TYPE, field_name='Field')
  self.services.config.TestAddFieldDef(existing_fd)
  self.SignIn()
  with self.work_env as env:
    problem = env.CheckFieldName(project_id, 'Field-Foo')
    self.assertIsNotNone(problem)
def testCheckFieldName_ReservedPrefix(self):
  """CheckFieldName reports a problem for the name 'Summary'."""
  self.SignIn()
  project_id = self.project.project_id
  with self.work_env as env:
    problem = env.CheckFieldName(project_id, 'Summary')
    self.assertIsNotNone(problem)
def testCheckFieldName_ReservedSuffix(self):
  """CheckFieldName reports a problem for the name 'Chicken-ApproveR'."""
  self.SignIn()
  project_id = self.project.project_id
  with self.work_env as env:
    problem = env.CheckFieldName(project_id, 'Chicken-ApproveR')
    self.assertIsNotNone(problem)
def testCheckFieldName_NotAllowedToViewProject(self):
  """Checking a field name in a members-only project needs view access."""
  self.project.access = project_pb2.ProjectAccess.MEMBERS_ONLY
  self.SignIn(user_id=333)
  expected = permissions.PermissionException
  with self.assertRaises(expected), self.work_env as env:
    env.CheckFieldName(self.project.project_id, 'Field')
def testListProjects(self):
  """ListProjects returns the IDs of projects the current user can see."""
  # Project 789 comes from setUp(); proj2 is members-only.
  add_project = self.services.project.TestAddProject
  add_project(
      'proj2', project_id=2, access=project_pb2.ProjectAccess.MEMBERS_ONLY)
  add_project('proj3', project_id=3)
  with self.work_env as env:
    visible_ids = env.ListProjects()

  self.assertEqual([3, 789], visible_ids)
@mock.patch('settings.branded_domains',
            {'proj3': 'branded.com', '*': 'bugs.chromium.org'})
def testListProjects_BrandedDomain_NotLive(self):
  """Branded domains have no effect on localhost or appspot servers."""
  # Project 789 comes from setUp(); proj2 is members-only.
  add_project = self.services.project.TestAddProject
  add_project(
      'proj2', project_id=2, access=project_pb2.ProjectAccess.MEMBERS_ONLY)
  add_project('proj3', project_id=3)
  with self.work_env as env:
    on_localhost = env.ListProjects(domain='localhost:8080')
    self.assertEqual([3, 789], on_localhost)

    on_appspot = env.ListProjects(domain='app-id.appspot.com')
    self.assertEqual([3, 789], on_appspot)
@mock.patch('settings.branded_domains',
            {'proj3': 'branded.com', '*': 'bugs.chromium.org'})
def testListProjects_BrandedDomain_LiveSite(self):
  """Only projects on the requested branded domain are listed."""
  # Project 789 comes from setUp(); proj2 is members-only.
  add_project = self.services.project.TestAddProject
  add_project(
      'proj2', project_id=2, access=project_pb2.ProjectAccess.MEMBERS_ONLY)
  add_project('proj3', project_id=3)
  with self.work_env as env:
    on_branded = env.ListProjects(domain='branded.com')
    self.assertEqual([3], on_branded)

    on_default = env.ListProjects(domain='bugs.chromium.org')
    self.assertEqual([789], on_default)
def testGetProject_Normal(self):
  """GetProject returns the existing project for its project_id."""
  with self.work_env as env:
    fetched = env.GetProject(789)

  self.assertEqual(self.project, fetched)
def testGetProject_NoSuchProject(self):
  """GetProject raises for a project ID that does not exist."""
  expected = exceptions.NoSuchProjectException
  with self.assertRaises(expected), self.work_env as env:
    env.GetProject(999)
def testGetProject_NotAllowed(self):
  """GetProject raises when the project is members-only and we are anon."""
  self.project.access = project_pb2.ProjectAccess.MEMBERS_ONLY
  expected = permissions.PermissionException
  with self.assertRaises(expected), self.work_env as env:
    env.GetProject(789)
def testGetProjectByName_Normal(self):
  """GetProjectByName returns the existing project for its name."""
  with self.work_env as env:
    fetched = env.GetProjectByName('proj')

  self.assertEqual(self.project, fetched)
def testGetProjectByName_NoSuchProject(self):
  """GetProjectByName raises for a project name that does not exist."""
  expected = exceptions.NoSuchProjectException
  with self.assertRaises(expected), self.work_env as env:
    env.GetProjectByName('huh-what')
def testGetProjectByName_NoPermission(self):
  """GetProjectByName raises when the project is members-only."""
  self.project.access = project_pb2.ProjectAccess.MEMBERS_ONLY
  expected = permissions.PermissionException
  with self.assertRaises(expected), self.work_env as env:
    env.GetProjectByName('proj')
def AddUserProjects(self):
  """Create projects in which user 222 holds each role, in each state.

  Returns:
    A dict keyed by '<role>-<state>' (role in owner/committer/contributor,
    state in live/archived/deletable) plus 'members-only', mapping to the
    created project objects.
  """
  add_project = self.services.project.TestAddProject
  project_states = {
      'live': project_pb2.ProjectState.LIVE,
      'archived': project_pb2.ProjectState.ARCHIVED,
      'deletable': project_pb2.ProjectState.DELETABLE}
  projects = {}
  for state_name, state in project_states.items():
    owner_key = 'owner-' + state_name
    committer_key = 'committer-' + state_name
    contributor_key = 'contributor-' + state_name

    projects[owner_key] = add_project(
        owner_key, state=state, owner_ids=[222])
    projects[committer_key] = add_project(
        committer_key, state=state, committer_ids=[222])
    # The contributor list is assigned after creation.
    contributor_project = add_project(contributor_key, state=state)
    projects[contributor_key] = contributor_project
    contributor_project.contributor_ids = [222]

  # One extra owned project, made members-only after creation.
  members_only = add_project('members-only', owner_ids=[222])
  projects['members-only'] = members_only
  members_only.access = project_pb2.ProjectAccess.MEMBERS_ONLY
  return projects
def testGatherProjectMembershipsForUser_OtherUser(self):
  """An anonymous viewer sees only another user's live, public roles."""
  by_key = self.AddUserProjects()
  with self.work_env as env:
    memberships = env.GatherProjectMembershipsForUser(222)

  owner_ids, committer_ids, contrib_ids = memberships
  self.assertEqual([by_key['owner-live'].project_id], owner_ids)
  self.assertEqual([by_key['committer-live'].project_id], committer_ids)
  self.assertEqual([by_key['contributor-live'].project_id], contrib_ids)
def testGatherProjectMembershipsForUser_OwnUser(self):
  """The signed-in user also sees their own members-only project."""
  by_key = self.AddUserProjects()
  self.SignIn(user_id=222)
  with self.work_env as env:
    memberships = env.GatherProjectMembershipsForUser(222)

  owner_ids, committer_ids, contrib_ids = memberships
  expected_owner_ids = [
      by_key['members-only'].project_id,
      by_key['owner-live'].project_id,
  ]
  self.assertEqual(expected_owner_ids, owner_ids)
  self.assertEqual([by_key['committer-live'].project_id], committer_ids)
  self.assertEqual([by_key['contributor-live'].project_id], contrib_ids)
def testGatherProjectMembershipsForUser_Admin(self):
  """A site admin sees another user's members-only project as well."""
  by_key = self.AddUserProjects()
  self.SignIn(user_id=444)
  with self.work_env as env:
    memberships = env.GatherProjectMembershipsForUser(222)

  owner_ids, committer_ids, contrib_ids = memberships
  expected_owner_ids = [
      by_key['members-only'].project_id,
      by_key['owner-live'].project_id,
  ]
  self.assertEqual(expected_owner_ids, owner_ids)
  self.assertEqual([by_key['committer-live'].project_id], committer_ids)
  self.assertEqual([by_key['contributor-live'].project_id], contrib_ids)
def testGetUserRolesInAllProjects_OtherUsers(self):
  """An anonymous viewer gets only the live projects of another user."""
  by_key = self.AddUserProjects()
  with self.work_env as env:
    owner, member, contrib = env.GetUserRolesInAllProjects({222})

  def _SortedByName(projects_by_id):
    """Return the dict's projects ordered by project_name."""
    return sorted(projects_by_id.values(), key=lambda p: p.project_name)

  self.assertEqual([by_key['owner-live']], _SortedByName(owner))
  self.assertEqual([by_key['committer-live']], _SortedByName(member))
  self.assertEqual([by_key['contributor-live']], _SortedByName(contrib))
def testGetUserRolesInAllProjects_OwnUser(self):
  """The signed-in user gets their live, archived and members-only roles."""
  by_key = self.AddUserProjects()
  self.SignIn(user_id=222)
  with self.work_env as env:
    owner, member, contrib = env.GetUserRolesInAllProjects({222})

  def _SortedByName(projects_by_id):
    """Return the dict's projects ordered by project_name."""
    return sorted(projects_by_id.values(), key=lambda p: p.project_name)

  expected_owner = [
      by_key['members-only'], by_key['owner-archived'], by_key['owner-live']]
  expected_member = [by_key['committer-archived'], by_key['committer-live']]
  expected_contrib = [
      by_key['contributor-archived'], by_key['contributor-live']]
  self.assertEqual(expected_owner, _SortedByName(owner))
  self.assertEqual(expected_member, _SortedByName(member))
  self.assertEqual(expected_contrib, _SortedByName(contrib))
def testGetUserRolesInAllProjects_Admin(self):
  """A site admin gets every role of another user, in every state."""
  by_key = self.AddUserProjects()
  self.SignIn(user_id=444)
  with self.work_env as env:
    owner, member, contrib = env.GetUserRolesInAllProjects({222})

  def _SortedByName(projects_by_id):
    """Return the dict's projects ordered by project_name."""
    return sorted(projects_by_id.values(), key=lambda p: p.project_name)

  expected_owner = [
      by_key['members-only'], by_key['owner-archived'],
      by_key['owner-deletable'], by_key['owner-live']]
  expected_member = [
      by_key['committer-archived'], by_key['committer-deletable'],
      by_key['committer-live']]
  expected_contrib = [
      by_key['contributor-archived'], by_key['contributor-deletable'],
      by_key['contributor-live']]
  self.assertEqual(expected_owner, _SortedByName(owner))
  self.assertEqual(expected_member, _SortedByName(member))
  self.assertEqual(expected_contrib, _SortedByName(contrib))
def testGetUserProjects_OnlyLiveOfOtherUsers(self):
  """A regular user sees only the live projects of another user."""
  by_key = self.AddUserProjects()
  self.SignIn()
  with self.work_env as env:
    user_projects = env.GetUserProjects({222})

  owner, archived, member, contrib = user_projects
  self.assertEqual([by_key['owner-live']], owner)
  self.assertEqual([], archived)
  self.assertEqual([by_key['committer-live']], member)
  self.assertEqual([by_key['contributor-live']], contrib)
def testGetUserProjects_AdminSeesAll(self):
  """A site admin sees members-only and archived projects of other users."""
  by_key = self.AddUserProjects()
  self.SignIn(user_id=444)
  with self.work_env as env:
    user_projects = env.GetUserProjects({222})

  owner, archived, member, contrib = user_projects
  self.assertEqual([by_key['members-only'], by_key['owner-live']], owner)
  self.assertEqual([by_key['owner-archived']], archived)
  self.assertEqual([by_key['committer-live']], member)
  self.assertEqual([by_key['contributor-live']], contrib)
def testGetUserProjects_UserSeesOwnProjects(self):
  """A user sees their own members-only and archived projects."""
  by_key = self.AddUserProjects()
  self.SignIn(user_id=222)
  with self.work_env as env:
    user_projects = env.GetUserProjects({222})

  owner, archived, member, contrib = user_projects
  self.assertEqual([by_key['members-only'], by_key['owner-live']], owner)
  self.assertEqual([by_key['owner-archived']], archived)
  self.assertEqual([by_key['committer-live']], member)
  self.assertEqual([by_key['contributor-live']], contrib)
def testUpdateProject_Normal(self):
  """A site admin can update a field of an existing project."""
  self.SignIn(user_id=self.admin_user.user_id)
  with self.work_env as env:
    env.UpdateProject(789, read_only_reason='test reason')
    updated = env.GetProject(789)

  self.assertEqual('test reason', updated.read_only_reason)
def testUpdateProject_NoSuchProject(self):
  """Updating a project that does not exist raises an exception."""
  self.SignIn(user_id=self.admin_user.user_id)
  expected = exceptions.NoSuchProjectException
  with self.assertRaises(expected), self.work_env as env:
    env.UpdateProject(999, summary='new summary')
def testDeleteProject_Normal(self):
  """DeleteProject moves an existing project to the DELETABLE state."""
  self.SignIn(user_id=self.admin_user.user_id)
  with self.work_env as env:
    env.DeleteProject(789)

  expected_state = project_pb2.ProjectState.DELETABLE
  self.assertEqual(expected_state, self.project.state)
def testDeleteProject_NoSuchProject(self):
  """Deleting a project that does not exist raises an exception."""
  self.SignIn(user_id=self.admin_user.user_id)
  expected = exceptions.NoSuchProjectException
  with self.assertRaises(expected), self.work_env as env:
    env.DeleteProject(999)
def testStarProject_Normal(self):
  """Starring and then unstarring a project toggles IsProjectStarred."""
  project_id = 789
  self.SignIn()
  with self.work_env as env:
    self.assertFalse(env.IsProjectStarred(project_id))

    env.StarProject(project_id, True)
    self.assertTrue(env.IsProjectStarred(project_id))

    env.StarProject(project_id, False)
    self.assertFalse(env.IsProjectStarred(project_id))
def testStarProject_NoSuchProject(self):
  """Starring a project that does not exist raises an exception."""
  self.SignIn()
  expected = exceptions.NoSuchProjectException
  with self.assertRaises(expected), self.work_env as env:
    env.StarProject(999, True)
def testStarProject_Anon(self):
  """Starring a project without signing in is rejected."""
  expected = permissions.PermissionException
  with self.assertRaises(expected), self.work_env as env:
    env.StarProject(789, True)
def testIsProjectStarred_Normal(self):
  """Placeholder: the normal case is covered by testStarProject_Normal()."""
  # Intentionally empty; see testStarProject_Normal().
def testIsProjectStarred_NoProjectSpecified(self):
  """IsProjectStarred raises InputException when given None."""
  expected = exceptions.InputException
  with self.work_env as env, self.assertRaises(expected):
    self.assertFalse(env.IsProjectStarred(None))
def testIsProjectStarred_NoSuchProject(self):
  """Checking stars on a project that does not exist raises."""
  self.SignIn()
  expected = exceptions.NoSuchProjectException
  with self.assertRaises(expected), self.work_env as env:
    env.IsProjectStarred(999)
def testGetProjectStarCount_Normal(self):
  """The star count follows stars added and removed by different users."""
  project_id = 789

  # First user (default sign-in): 0 -> 1 star.
  self.SignIn()
  with self.work_env as env:
    self.assertEqual(0, env.GetProjectStarCount(project_id))
    env.StarProject(project_id, True)
    self.assertEqual(1, env.GetProjectStarCount(project_id))

  # Admin user: 1 -> 2 stars, then back to 1 after unstarring.
  self.SignIn(user_id=self.admin_user.user_id)
  with self.work_env as env:
    env.StarProject(project_id, True)
    self.assertEqual(2, env.GetProjectStarCount(project_id))
    env.StarProject(project_id, False)
    self.assertEqual(1, env.GetProjectStarCount(project_id))
def testGetProjectStarCount_NoSuchProject(self):
  """Counting stars of a project that does not exist raises."""
  self.SignIn()
  expected = exceptions.NoSuchProjectException
  with self.assertRaises(expected), self.work_env as env:
    env.GetProjectStarCount(999)
def testGetProjectStarCount_NoProjectSpecified(self):
  """GetProjectStarCount raises InputException when given None."""
  expected = exceptions.InputException
  with self.work_env as env, self.assertRaises(expected):
    self.assertFalse(env.GetProjectStarCount(None))
def testListStarredProjects_ViewingSelf(self):
  """A user sees their own starred projects while they still have access."""
  add_project = self.services.project.TestAddProject
  first = add_project('proj1', project_id=1)
  second = add_project('proj2', project_id=2)
  with self.work_env as env:
    self.SignIn()
    for starred in (first, second):
      env.StarProject(starred.project_id, True)

    self.assertItemsEqual([first, second], env.ListStarredProjects())

    # Making the second project members-only drops it from the list.
    second.access = project_pb2.ProjectAccess.MEMBERS_ONLY
    self.assertItemsEqual([first], env.ListStarredProjects())
def testListStarredProjects_ViewingOther(self):
  """A user sees the projects another user starred, if they have access."""
  add_project = self.services.project.TestAddProject
  first = add_project('proj1', project_id=1)
  second = add_project('proj2', project_id=2)
  with self.work_env as env:
    # User 222 stars both projects.
    self.SignIn(user_id=222)
    for starred in (first, second):
      env.StarProject(starred.project_id, True)

    # User 111 has starred nothing, but can list what user 222 starred.
    self.SignIn(user_id=111)
    self.assertEqual([], env.ListStarredProjects())
    self.assertItemsEqual(
        [first, second], env.ListStarredProjects(viewed_user_id=222))

    # Making the second project members-only drops it from the list.
    second.access = project_pb2.ProjectAccess.MEMBERS_ONLY
    self.assertItemsEqual(
        [first], env.ListStarredProjects(viewed_user_id=222))
def testGetProjectConfig_Normal(self):
  """GetProjectConfig returns the config stored for the project_id."""
  stored = fake.MakeTestConfig(789, ['LabelOne'], ['New'])
  self.services.config.StoreConfig('cnxn', stored)
  with self.work_env as env:
    fetched = env.GetProjectConfig(789)

  self.assertEqual(stored, fetched)
def testGetProjectConfig_NoSuchProject(self):
  """GetProjectConfig raises for a project that does not exist."""
  # The fake config service is switched to strict mode for this test.
  self.services.config.strict = True
  expected = exceptions.NoSuchProjectException
  with self.assertRaises(expected), self.work_env as env:
    env.GetProjectConfig(self.dne_project_id)
def testListProjectTemplates_IsMember(self):
  """A project member gets members-only and public templates."""
  members_only_tmpl = tracker_pb2.TemplateDef(
      name='Chicken', members_only=True)
  open_tmpl = tracker_pb2.TemplateDef(name='Kale', members_only=False)
  get_templates = self.services.template.GetProjectTemplates
  get_templates.return_value = [members_only_tmpl, open_tmpl]

  self.SignIn()  # user 111 is a member of self.project
  project_id = self.project.project_id
  with self.work_env as env:
    listed = env.ListProjectTemplates(project_id)

  self.assertEqual(listed, [members_only_tmpl, open_tmpl])
  get_templates.assert_called_once_with(self.mr.cnxn, project_id)
def testListProjectTemplates_IsNotMember(self):
  """Without signing in, only templates that are not members-only show."""
  members_only_tmpl = tracker_pb2.TemplateDef(
      name='Chicken', members_only=True)
  open_tmpl = tracker_pb2.TemplateDef(name='Kale', members_only=False)
  get_templates = self.services.template.GetProjectTemplates
  get_templates.return_value = [members_only_tmpl, open_tmpl]

  project_id = self.project.project_id
  with self.work_env as env:
    listed = env.ListProjectTemplates(project_id)

  self.assertEqual(listed, [open_tmpl])
  get_templates.assert_called_once_with(self.mr.cnxn, project_id)
def testListComponentDefs(self):
  """ListComponentDefs(project, 10, 1) returns the defs after the first."""
  greece = self.services.project.TestAddProject(
      'Greece', owner_ids=[self.user_1.user_id])
  greece_id = greece.project_id
  greece_config = fake.MakeTestConfig(greece_id, [], [])
  circe = fake.MakeTestComponentDef(greece_id, 1, path='Circe')
  achilles = fake.MakeTestComponentDef(greece_id, 2, path='Achilles')
  patroclus = fake.MakeTestComponentDef(greece_id, 3, path='Patroclus')
  greece_config.component_defs = [circe, achilles, patroclus]
  self.services.config.StoreConfig(self.cnxn, greece_config)

  self.SignIn(self.user_1.user_id)
  with self.work_env as env:
    listed = env.ListComponentDefs(greece_id, 10, 1)

  expected = work_env.ListResult([achilles, patroclus], None)
  self.assertEqual(listed, expected)
def testListComponentDefs_NotFound(self):
  """Unknown and members-only projects both raise NoSuchProjectException."""
  expected = exceptions.NoSuchProjectException
  self.SignIn(self.user_2.user_id)

  # Case 1: the project ID does not exist.
  with self.assertRaises(expected), self.work_env as env:
    env.ListComponentDefs(404, 10, 1)

  # Case 2: the project exists but is members-only and owned by user_1,
  # while user_2 is the one signed in.
  greece = self.services.project.TestAddProject(
      'Greece',
      owner_ids=[self.user_1.user_id],
      access=project_pb2.ProjectAccess.MEMBERS_ONLY)
  greece_id = greece.project_id
  greece_config = fake.MakeTestConfig(greece_id, [], [])
  circe = fake.MakeTestComponentDef(greece_id, 1, path='Circe')
  greece_config.component_defs = [circe]
  self.services.config.StoreConfig(self.cnxn, greece_config)

  with self.assertRaises(expected), self.work_env as env:
    env.ListComponentDefs(greece_id, 10, 1)
def testListComponentDefs_InvalidPaginate(self):
  """A negative value in either pagination argument is rejected."""
  expected = exceptions.InputException

  # Negative first pagination argument.
  with self.assertRaises(expected), self.work_env as env:
    env.ListComponentDefs(404, -1, 10)

  # Negative second pagination argument.
  with self.assertRaises(expected), self.work_env as env:
    env.ListComponentDefs(404, 1, -10)
@mock.patch('time.time')
def testCreateComponentDef(self, fake_time):
  """Components can be created at the top level and under an ancestor.

  time.time is patched so the `created` timestamp is predictable.
  """
  frozen_now = 123
  fake_time.return_value = frozen_now
  music = self.services.project.TestAddProject(
      'Music', owner_ids=[self.user_1.user_id])
  pid = music.project_id
  admin = self.services.user.TestAddUser('admin@test.com', 555)

  # Top-level component, created while signed in as the project owner.
  self.SignIn(self.user_1.user_id)
  with self.work_env as we:
    top_level = we.CreateComponentDef(
        pid, 'hanggai', 'hamtlag', [admin.user_id], [self.user_2.user_id],
        ['taro', 'mowgli'])

  self.assertEqual(top_level.project_id, pid)
  self.assertEqual(top_level.path, 'hanggai')
  self.assertEqual(top_level.docstring, 'hamtlag')
  self.assertEqual(top_level.admin_ids, [admin.user_id])
  self.assertEqual(top_level.cc_ids, [222])
  self.assertFalse(top_level.deprecated)
  self.assertEqual(top_level.created, frozen_now)
  self.assertEqual(top_level.creator_id, self.user_1.user_id)
  expected_label_ids = self.services.config.LookupLabelIDs(
      self.cnxn, pid, ['taro', 'mowgli'])
  self.assertEqual(top_level.label_ids, expected_label_ids)

  # Test with ancestor: the admin of 'hanggai' creates a child under it.
  self.SignIn(admin.user_id)
  with self.work_env as we:
    child = we.CreateComponentDef(
        pid, 'hanggai>band', 'rock band', [self.user_2.user_id], [], [])

  self.assertEqual(child.project_id, pid)
  self.assertEqual(child.path, 'hanggai>band')
  self.assertEqual(child.docstring, 'rock band')
  self.assertEqual(child.admin_ids, [self.user_2.user_id])
  self.assertFalse(child.deprecated)
  self.assertEqual(child.created, frozen_now)
  self.assertEqual(child.creator_id, admin.user_id)
def testCreateComponentDef_InvalidUsers(self):
  """Passing user ID 404 as admin and CC raises InputException."""
  music = self.services.project.TestAddProject(
      'Music', owner_ids=[self.user_1.user_id])
  pid = music.project_id
  self.SignIn(self.user_1.user_id)

  with self.assertRaises(exceptions.InputException):
    with self.work_env as we:
      we.CreateComponentDef(pid, 'hanggai', 'hamtlag', [404], [404], [])
def testCreateComponentDef_InvalidLeaf(self):
  """The path 'music>hanggai.rockband' is rejected with InputException."""
  music = self.services.project.TestAddProject(
      'Music', owner_ids=[self.user_1.user_id])
  pid = music.project_id
  self.SignIn(self.user_1.user_id)

  bad_path = 'music>hanggai.rockband'
  with self.assertRaises(exceptions.InputException):
    with self.work_env as we:
      we.CreateComponentDef(pid, bad_path, 'hamtlag', [], [], [])
def testCreateComponentDef_LeafAlreadyExists(self):
  """Creating a component at an existing path raises an error."""
  music = self.services.project.TestAddProject(
      'Music', owner_ids=[self.user_1.user_id])
  pid = music.project_id
  self.SignIn(self.user_1.user_id)

  # Duplicate of a top-level component.
  with self.work_env as we:
    we.CreateComponentDef(
        pid, 'mowgli', 'favorite things', [self.user_1.user_id], [], [])
  with self.assertRaises(exceptions.ComponentDefAlreadyExists):
    with self.work_env as we:
      we.CreateComponentDef(pid, 'mowgli', 'more favorite things', [], [], [])

  # Test components with ancestors are also checked correctly
  with self.work_env as we:
    we.CreateComponentDef(pid, 'mowgli>food', 'lots of chicken', [], [], [])
  with self.assertRaises(exceptions.ComponentDefAlreadyExists):
    with self.work_env as we:
      we.CreateComponentDef(pid, 'mowgli>food', 'lots of salmon', [], [], [])
def testCreateComponentDef_AncestorNotFound(self):
  """Creating 'mowgli>chicken' without a 'mowgli' parent is rejected."""
  music = self.services.project.TestAddProject(
      'Music', owner_ids=[self.user_1.user_id])
  pid = music.project_id
  self.SignIn(self.user_1.user_id)

  orphan_path = 'mowgli>chicken'
  with self.assertRaises(exceptions.InputException):
    with self.work_env as we:
      we.CreateComponentDef(
          pid, orphan_path, 'more favorite things', [], [], [])
def testCreateComponentDef_PermissionDenied(self):
  """A user who is neither owner nor component admin cannot create any."""
  music = self.services.project.TestAddProject(
      'Music', owner_ids=[self.user_1.user_id])
  pid = music.project_id
  admin = self.services.user.TestAddUser('admin@test.com', 888)

  # Seed a parent (with an admin) and a child (without) as the project owner.
  self.SignIn(self.user_1.user_id)
  with self.work_env as we:
    we.CreateComponentDef(
        pid, 'mowgli', 'favorite things', [admin.user_id], [], [])
    we.CreateComponentDef(pid, 'mowgli>beef', 'favorite things', [], [], [])

  outsider = self.services.user.TestAddUser('user@test.com', 777)
  self.SignIn(outsider.user_id)

  # Top-level, child, and grandchild paths are all refused for this user.
  refused = (
      ('bambi', 'spring time'),
      ('mowgli>chicken', 'more favorite things'),
      ('mowgli>beef>rice', 'more favorite things'),
  )
  for path, docstring in refused:
    with self.assertRaises(permissions.PermissionException):
      with self.work_env as we:
        we.CreateComponentDef(pid, path, docstring, [], [], [])
def testDeleteComponentDef(self):
  """A project owner can delete the only component in a config."""
  achilles = self.services.project.TestAddProject(
      'Achilles', owner_ids=[self.user_1.user_id])
  pid = achilles.project_id
  config = fake.MakeTestConfig(pid, [], [])
  doomed = fake.MakeTestComponentDef(pid, 1, path='Chickens>Dickens')
  config.component_defs = [doomed]
  self.services.config.StoreConfig(self.cnxn, config)
  self.SignIn(self.user_1.user_id)

  with self.work_env as we:
    we.DeleteComponentDef(pid, doomed.component_id)

  # The check is made on the same config object that was stored above.
  self.assertEqual(config.component_defs, [])
def testDeleteComponentDef_NotFound(self):
  """Deleting an unknown component ID raises NoSuchComponentException."""
  achilles = self.services.project.TestAddProject(
      'Achilles', owner_ids=[self.user_1.user_id])
  self.SignIn(self.user_1.user_id)

  missing_component_id = 404
  with self.assertRaises(exceptions.NoSuchComponentException):
    with self.work_env as we:
      we.DeleteComponentDef(achilles.project_id, missing_component_id)
def testDeleteComponentDef_CannotViewProject(self):
  """user_2 gets PermissionException on user_1's members-only project."""
  members_only = self.services.project.TestAddProject(
      'Achilles',
      owner_ids=[self.user_1.user_id],
      access=project_pb2.ProjectAccess.MEMBERS_ONLY)
  self.SignIn(self.user_2.user_id)

  with self.assertRaises(permissions.PermissionException):
    with self.work_env as we:
      we.DeleteComponentDef(members_only.project_id, 404)
def testDeleteComponentDef_SubcomponentFound(self):
  """Deleting 'Chickens' while 'Chickens>Dickens' exists is rejected."""
  achilles = self.services.project.TestAddProject(
      'Achilles', owner_ids=[self.user_1.user_id])
  pid = achilles.project_id
  config = fake.MakeTestConfig(pid, [], [])
  child = fake.MakeTestComponentDef(pid, 1, path='Chickens>Dickens')
  parent = fake.MakeTestComponentDef(pid, 2, path='Chickens')
  config.component_defs = [parent, child]
  self.services.config.StoreConfig(self.cnxn, config)
  self.SignIn(self.user_1.user_id)

  with self.assertRaises(exceptions.InputException):
    with self.work_env as we:
      we.DeleteComponentDef(pid, parent.component_id)
def testDeleteComponentDef_NonComponentAdminsCannotDelete(self):
  """A component's admin can delete it; an unrelated user cannot."""
  admin = self.services.user.TestAddUser('circe@test.com', 888)
  bystander = self.services.user.TestAddUser('patroclus@test.com', 999)
  achilles = self.services.project.TestAddProject(
      'Achilles', owner_ids=[self.user_1.user_id])
  pid = achilles.project_id
  config = fake.MakeTestConfig(pid, [], [])
  child = fake.MakeTestComponentDef(pid, 1, path='Chickens>Dickens')
  child.admin_ids = [admin.user_id]
  parent = fake.MakeTestComponentDef(pid, 2, path='Chickens')
  config.component_defs = [parent, child]
  self.services.config.StoreConfig(self.cnxn, config)

  # The admin listed on the child component may delete it.
  self.SignIn(admin.user_id)
  with self.work_env as we:
    we.DeleteComponentDef(pid, child.component_id)

  # A user who administers nothing may not delete the parent.
  self.SignIn(bystander.user_id)
  with self.assertRaises(permissions.PermissionException):
    with self.work_env as we:
      we.DeleteComponentDef(pid, parent.component_id)
# FUTURE: labels, statuses, components, rules, templates, and views.
# FUTURE: project saved queries.
# FUTURE: GetProjectPermissionsForUser()
### Field methods
# FUTURE: All other field methods.
def testGetFieldDef_Normal(self):
  """GetFieldDef returns the stored field definition for a field_id."""
  field_def = fake.MakeTestFieldDef(
      2, self.project.project_id, tracker_pb2.FieldTypes.STR_TYPE,
      field_name='Field')
  self.services.config.TestAddFieldDef(field_def)
  stored_config = self.services.config.GetProjectConfig(self.cnxn, 789)

  with self.work_env as we:
    fetched = we.GetFieldDef(field_def.field_id, self.project)

  # NOTE(review): index 1 presumably follows a field def added in setUp;
  # confirm against the fixture.
  self.assertEqual(stored_config.field_defs[1], fetched)
def testGetFieldDef_NoSuchFieldDef(self):
  """Asking for field_id 999 raises NoSuchFieldDefException."""
  unknown_field_id = 999
  with self.assertRaises(exceptions.NoSuchFieldDefException):
    with self.work_env as we:
      we.GetFieldDef(unknown_field_id, self.project)
@mock.patch(
    'features.send_notifications.PrepareAndSendIssueBlockingNotification')
@mock.patch(
    'features.send_notifications.PrepareAndSendIssueChangeNotification')
def testCreateIssue_Normal(self, fake_pasicn, fake_pasibn):
  """Creating an issue stores it, enqueues indexing, and sends notifications.

  The bottom-most patch supplies the first mock argument, so fake_pasicn
  stands in for the change notification and fake_pasibn for the blocking one.
  """
  self.SignIn(user_id=111)
  phases = [tracker_pb2.Phase(name='Canary', phase_id=3)]
  approval_values = [tracker_pb2.ApprovalValue(approval_id=23, phase_id=3)]

  with self.work_env as we:
    new_issue, desc_comment = we.CreateIssue(
        789, 'sum', 'New', 111, [333], ['Hot'], [], [], 'desc',
        phases=phases, approval_values=approval_values)

  # The returned issue carries the values that were passed in.
  self.assertEqual(789, new_issue.project_id)
  self.assertEqual('sum', new_issue.summary)
  self.assertEqual('New', new_issue.status)
  self.assertEqual(111, new_issue.reporter_id)
  self.assertEqual(111, new_issue.owner_id)
  self.assertEqual([333], new_issue.cc_ids)
  self.assertEqual([], new_issue.field_values)
  self.assertEqual([], new_issue.component_ids)
  self.assertEqual(approval_values, new_issue.approval_values)
  self.assertEqual(phases, new_issue.phases)

  # The description is returned and is also the first stored comment.
  self.assertEqual('desc', desc_comment.content)
  stored_comments = self.services.issue.GetCommentsForIssue(
      self.cnxn, new_issue.issue_id)
  self.assertEqual('desc', stored_comments[0].content)

  # Verify that an indexing task was enqueued for this issue:
  issue_svc_fake = self.services.issue
  self.assertTrue(issue_svc_fake.enqueue_issues_called)
  self.assertEqual(1, len(issue_svc_fake.enqueued_issues))
  self.assertEqual(new_issue.issue_id, issue_svc_fake.enqueued_issues[0])

  # Verify that tasks were queued to send email notifications.
  expected_hostport = 'testing-app.appspot.com'
  fake_pasicn.assert_called_once_with(
      new_issue.issue_id, expected_hostport, 111,
      comment_id=desc_comment.id)
  fake_pasibn.assert_called_once_with(
      new_issue.issue_id, expected_hostport, [], 111)
@mock.patch(
    'settings.preferred_domains', {'testing-app.appspot.com': 'example.com'})
@mock.patch(
    'settings.branded_domains', {'proj': 'branded.com'})
@mock.patch(
    'features.send_notifications.PrepareAndSendIssueBlockingNotification')
@mock.patch(
    'features.send_notifications.PrepareAndSendIssueChangeNotification')
def testCreateIssue_Branded(self, fake_pasicn, fake_pasibn):
  """Notifications use the branded domain configured for project 'proj'."""
  self.SignIn(user_id=111)

  with self.work_env as we:
    new_issue, desc_comment = we.CreateIssue(
        789, 'sum', 'New', 111, [333], ['Hot'], [], [], 'desc')

  self.assertEqual('proj', new_issue.project_name)

  # Verify that tasks were queued to send email notifications.
  expected_hostport = 'branded.com'
  fake_pasicn.assert_called_once_with(
      new_issue.issue_id, expected_hostport, 111,
      comment_id=desc_comment.id)
  fake_pasibn.assert_called_once_with(
      new_issue.issue_id, expected_hostport, [], 111)
@mock.patch(
    'settings.preferred_domains', {'testing-app.appspot.com': 'example.com'})
@mock.patch(
    'settings.branded_domains', {'other-proj': 'branded.com'})
@mock.patch(
    'features.send_notifications.PrepareAndSendIssueBlockingNotification')
@mock.patch(
    'features.send_notifications.PrepareAndSendIssueChangeNotification')
def testCreateIssue_Nonbranded(self, fake_pasicn, fake_pasibn):
  """A branded domain for another project is ignored; preferred is used."""
  self.SignIn(user_id=111)

  with self.work_env as we:
    new_issue, desc_comment = we.CreateIssue(
        789, 'sum', 'New', 111, [333], ['Hot'], [], [], 'desc')

  self.assertEqual('proj', new_issue.project_name)

  # Verify that tasks were queued to send email notifications.
  expected_hostport = 'example.com'
  fake_pasicn.assert_called_once_with(
      new_issue.issue_id, expected_hostport, 111,
      comment_id=desc_comment.id)
  fake_pasibn.assert_called_once_with(
      new_issue.issue_id, expected_hostport, [], 111)
@mock.patch(
    'features.send_notifications.PrepareAndSendIssueBlockingNotification')
@mock.patch(
    'features.send_notifications.PrepareAndSendIssueChangeNotification')
def testCreateIssue_DontSendEmail(self, fake_pasicn, fake_pasibn):
  """With send_email=False the issue is created but nothing is notified."""
  self.SignIn(user_id=111)

  with self.work_env as we:
    new_issue, desc_comment = we.CreateIssue(
        789, 'sum', 'New', 111, [333], ['Hot'], [], [], 'desc',
        send_email=False)

  self.assertEqual(789, new_issue.project_id)
  self.assertEqual('sum', new_issue.summary)
  self.assertEqual('New', new_issue.status)
  self.assertEqual('desc', desc_comment.content)

  # Verify that tasks were not queued to send email notifications.
  for notifier in (fake_pasicn, fake_pasibn):
    self.assertEqual([], notifier.mock_calls)
@mock.patch(
    'features.send_notifications.PrepareAndSendIssueBlockingNotification')
@mock.patch(
    'features.send_notifications.PrepareAndSendIssueChangeNotification')
def testCreateIssue_ImportedIssue_Allowed(self, _fake_pasicn, _fake_pasibn):
  """A user granted 'ImportComment' can create an issue for someone else."""
  imported_at = 123456
  import_grant = project_pb2.Project.ExtraPerms(
      member_id=111, perms=['ImportComment'])
  self.project.extra_perms = [import_grant]
  self.SignIn(user_id=111)

  with self.work_env as we:
    new_issue, desc_comment = we.CreateIssue(
        789, 'sum', 'New', 111, [333], ['Hot'], [], [], 'desc',
        send_email=False, reporter_id=222, timestamp=imported_at)

  # The issue is attributed to the original reporter and timestamp.
  self.assertEqual(789, new_issue.project_id)
  self.assertEqual('sum', new_issue.summary)
  self.assertEqual(222, new_issue.reporter_id)
  self.assertEqual(imported_at, new_issue.opened_timestamp)

  # The comment records both the original author and the importer.
  self.assertEqual(222, desc_comment.user_id)
  self.assertEqual(111, desc_comment.importer_id)
  self.assertEqual(imported_at, desc_comment.timestamp)
@mock.patch(
    'features.send_notifications.PrepareAndSendIssueBlockingNotification')
@mock.patch(
    'features.send_notifications.PrepareAndSendIssueChangeNotification')
def testCreateIssue_ImportedIssue_Denied(self, _fake_pasicn, _fake_pasibn):
  """Importing on behalf of another reporter is refused without permission."""
  imported_at = 123456
  # Note: no "ImportComment" permission is granted.
  self.SignIn(user_id=111)

  with self.assertRaises(permissions.PermissionException):
    with self.work_env as we:
      we.CreateIssue(
          789, 'sum', 'New', 222, [333], ['Hot'], [], [], 'desc',
          send_email=False, reporter_id=222, timestamp=imported_at)
@mock.patch(
    'features.send_notifications.PrepareAndSendIssueBlockingNotification')
@mock.patch(
    'features.send_notifications.PrepareAndSendIssueChangeNotification')
def testCreateIssue_OnwerValidation(self, _fake_pasicn, _fake_pasibn):
  """An owner who is not a project member is rejected with a clear message."""
  self.SignIn(user_id=111)
  # user_id 222 is not a project member
  non_member_id = 222
  expected_message = 'Issue owner must be a project member'

  with self.assertRaisesRegexp(exceptions.InputException, expected_message):
    with self.work_env as we:
      we.CreateIssue(
          789, 'sum', 'New', non_member_id, [333], ['Hot'], [], [], 'desc')
@mock.patch(
    'features.send_notifications.PrepareAndSendIssueBlockingNotification')
@mock.patch(
    'features.send_notifications.PrepareAndSendIssueChangeNotification')
def testCreateIssue_SummaryValidation(self, _fake_pasicn, _fake_pasibn):
  """Empty and whitespace-only summaries raise InputException."""
  self.SignIn(user_id=111)

  # First the empty summary, then one made only of spaces.
  for bad_summary in ('', ' '):
    with self.assertRaises(exceptions.InputException):
      with self.work_env as we:
        we.CreateIssue(
            789, bad_summary, 'New', 111, [333], ['Hot'], [], [], 'desc')
@mock.patch(
    'features.send_notifications.PrepareAndSendIssueBlockingNotification')
@mock.patch(
    'features.send_notifications.PrepareAndSendIssueChangeNotification')
def testCreateIssue_DescriptionValidation(self, _fake_pasicn, _fake_pasibn):
  """Empty and whitespace-only descriptions raise InputException."""
  self.SignIn(user_id=111)

  # First the empty description, then one made only of spaces.
  for bad_description in ('', ' '):
    with self.assertRaises(exceptions.InputException):
      with self.work_env as we:
        we.CreateIssue(
            789, 'sum', 'New', 111, [333], ['Hot'], [], [], bad_description)
@mock.patch(
    'features.send_notifications.PrepareAndSendIssueBlockingNotification')
@mock.patch(
    'features.send_notifications.PrepareAndSendIssueChangeNotification')
def testCreateIssue_FieldValueValidation(self, _fake_pasicn, _fake_pasibn):
  """We validate field values against field definitions.

  Every other argument is valid, so the out-of-range field value is the only
  possible cause of the expected InputException.
  """
  self.SignIn(user_id=111)
  # field_def_1 has a max of 10.
  fv = fake.MakeFieldValue(field_id=self.field_def_1.field_id, int_value=11)
  with self.assertRaises(exceptions.InputException):
    with self.work_env as we:
      # The description must be non-empty: an empty description raises
      # InputException by itself (see testCreateIssue_DescriptionValidation),
      # which would let this test pass without any field validation.
      we.CreateIssue(789, 'sum', 'New', 111, [], [], [fv], [], 'desc')
@mock.patch(
    'features.send_notifications.PrepareAndSendIssueBlockingNotification')
@mock.patch(
    'features.send_notifications.PrepareAndSendIssueChangeNotification')
def testCreateIssue_AppliesFilterRules(self, _fake_pasicn, _fake_pasibn):
  """A matching filter rule adds its label as a derived label."""
  self.services.features.TestAddFilterRule(
      789, '-has:component', add_labels=['no-component'])
  self.SignIn(user_id=111)

  with self.work_env as we:
    new_issue, _ = we.CreateIssue(
        789, 'sum', 'New', 111, [333], [], [], [], 'desc')

  derived = new_issue.derived_labels
  self.assertEqual(len(derived), 1)
  self.assertEqual(derived[0], 'no-component')
@mock.patch(
    'features.send_notifications.PrepareAndSendIssueBlockingNotification')
@mock.patch(
    'features.send_notifications.PrepareAndSendIssueChangeNotification')
def testCreateIssue_RaiseFilterErrors(self, _fake_pasicn, _fake_pasibn):
  """A filter rule that carries an error raises FilterRuleException."""
  self.services.features.TestAddFilterRule(789, '-has:component', error='er')
  created_at = 123456
  self.SignIn(user_id=111)

  with self.assertRaises(exceptions.FilterRuleException):
    with self.work_env as we:
      we.CreateIssue(
          789, 'sum', 'New', 111, [], [], [], [], 'desc',
          send_email=False, timestamp=created_at)
@mock.patch(
    'features.send_notifications.PrepareAndSendIssueBlockingNotification')
@mock.patch(
    'features.send_notifications.PrepareAndSendIssueChangeNotification')
def testCreateIssue_IgnoresFilterErrors(self, _fake_pasicn, _fake_pasibn):
  """With raise_filter_errors=False a rule error does not block creation."""
  self.services.features.TestAddFilterRule(789, '-has:component', error='er')
  self.SignIn(user_id=111)

  with self.work_env as we:
    new_issue, _ = we.CreateIssue(
        789, 'sum', 'New', 111, [], [], [], [], 'desc',
        send_email=False, raise_filter_errors=False)

  self.assertEqual(len(new_issue.component_ids), 0)
def testMakeIssueFromDelta(self):
  # Placeholder: contains no assertions yet, so it always passes.
  # TODO(crbug/monorail/7197): implement tests
  pass
@mock.patch(
    'features.send_notifications.PrepareAndSendIssueBlockingNotification')
@mock.patch(
    'features.send_notifications.PrepareAndSendIssueChangeNotification')
def testMakeIssue_Normal(self, _fake_pasicn, _fake_pasibn):
  """MakeIssue accepts a restricted field value from an allowed user."""
  self.SignIn(user_id=111)

  # Fourteen positional arguments of CreateFieldDef are left as None; the
  # trailing [] and [111] are passed positionally as in every sibling test.
  create_args = (
      [self.cnxn, self.project.project_id, 'Restricted-Foo', 'STR_TYPE'] +
      [None] * 14 + [[], [111]])
  restricted_fd_id = self.services.config.CreateFieldDef(
      *create_args, is_restricted_field=True)

  field_value = tracker_pb2.FieldValue(
      field_id=restricted_fd_id, str_value='Bar')
  issue_to_make = tracker_pb2.Issue(
      project_id=789,
      owner_id=111,
      summary='sum',
      status='New',
      field_values=[field_value])
  uploads = [
      ('README.md', 'readme content', 'text/plain'),
      ('hello.txt', 'hello content', 'text/plain'),
  ]

  with self.work_env as we:
    made = we.MakeIssue(issue_to_make, 'description', False, uploads)

  self.assertEqual(made.project_id, 789)
  self.assertEqual(made.summary, 'sum')
  self.assertEqual(made.status, 'New')
  self.assertEqual(made.reporter_id, 111)
  self.assertEqual(made.field_values, [field_value])
  self.assertEqual(2, made.attachment_count)
@mock.patch(
    'features.send_notifications.PrepareAndSendIssueBlockingNotification')
@mock.patch(
    'features.send_notifications.PrepareAndSendIssueChangeNotification')
def testMakeIssue_ChecksRestrictedFields(self, _fake_pasicn, _fake_pasibn):
  """User 222 may not set a restricted field whose listed user is 111."""
  self.SignIn(user_id=222)

  # Fourteen positional arguments of CreateFieldDef are left as None; the
  # trailing [] and [111] are passed positionally as in every sibling test.
  create_args = (
      [self.cnxn, self.project.project_id, 'Restricted-Foo', 'STR_TYPE'] +
      [None] * 14 + [[], [111]])
  restricted_fd_id = self.services.config.CreateFieldDef(
      *create_args, is_restricted_field=True)

  field_value = tracker_pb2.FieldValue(
      field_id=restricted_fd_id, str_value='Bar')
  issue_to_make = tracker_pb2.Issue(
      project_id=789, summary='sum', status='New',
      field_values=[field_value])

  with self.assertRaises(permissions.PermissionException):
    with self.work_env as we:
      we.MakeIssue(issue_to_make, 'description', False)
@mock.patch(
    'features.send_notifications.PrepareAndSendIssueBlockingNotification')
@mock.patch(
    'features.send_notifications.PrepareAndSendIssueChangeNotification')
def testMakeIssue_ChecksRestrictedLabels(self, _fake_pasicn, _fake_pasibn):
  """Restricted enum fields expressed as labels are checked as well."""
  self.SignIn(user_id=222)

  # Fourteen positional arguments of CreateFieldDef are left as None; the
  # trailing [] and [111] are passed positionally as in every sibling test.
  create_args = (
      [self.cnxn, self.project.project_id, 'Rfoo', 'ENUM_TYPE'] +
      [None] * 14 + [[], [111]])
  self.services.config.CreateFieldDef(*create_args, is_restricted_field=True)

  # The label 'Rfoo-bar' uses the restricted enum field's name as its prefix.
  issue_to_make = tracker_pb2.Issue(
      project_id=789, summary='sum', status='New', labels=['Rfoo-bar'])

  with self.assertRaises(permissions.PermissionException):
    with self.work_env as we:
      we.MakeIssue(issue_to_make, 'description', False)
@mock.patch('services.tracker_fulltext.IndexIssues')
@mock.patch('services.tracker_fulltext.UnindexIssues')
def testMoveIssue_Normal(self, mock_unindex, mock_index):
  """A source-project owner can move an issue and it gets re-indexed."""
  issue = fake.MakeTestIssue(789, 1, 'sum', 'New', 111, issue_id=78901)
  self.services.issue.TestAddIssue(issue)
  self.project.owner_ids = [111]
  dest = self.services.project.TestAddProject(
      'dest', project_id=988, committer_ids=[111])
  self.SignIn(user_id=111)

  with self.work_env as we:
    returned = we.MoveIssue(issue, dest)

  self.assertEqual(returned.project_name, 'dest')
  self.assertEqual(returned.local_id, 1)

  # Reload from the issue service and check the stored state.
  reloaded = self.services.issue.GetIssueByLocalID(
      'cnxn', dest.project_id, 1)
  self.assertEqual(dest.project_id, reloaded.project_id)
  self.assertEqual(issue.summary, reloaded.summary)
  self.assertEqual(reloaded.reporter_id, 111)

  # The issue is removed from the fulltext index and then added again.
  mock_unindex.assert_called_once_with([issue.issue_id])
  mock_index.assert_called_once_with(
      self.mr.cnxn, [issue], self.services.user, self.services.issue,
      self.services.config)
@mock.patch('services.tracker_fulltext.IndexIssues')
@mock.patch('services.tracker_fulltext.UnindexIssues')
def testMoveIssue_MoveBackAgain(self, _mock_unindex, _mock_index):
  """We can move an issue back and it gets its old local ID again."""
  issue = fake.MakeTestIssue(789, 1, 'sum', 'New', 111, issue_id=78901)
  issue.project_name = 'proj'
  self.services.issue.TestAddIssue(issue)
  self.project.owner_ids = [111]
  dest = self.services.project.TestAddProject(
      'dest', project_id=988, owner_ids=[111])
  self.SignIn(user_id=111)

  # Move out to 'dest' and straight back to the original project.
  with self.work_env as we:
    in_dest = we.MoveIssue(issue, dest)
    back_home = we.MoveIssue(in_dest, self.project)

  self.assertEqual(back_home.project_name, 'proj')
  self.assertEqual(back_home.local_id, 1)

  reloaded = self.services.issue.GetIssueByLocalID(
      'cnxn', self.project.project_id, 1)
  self.assertEqual(self.project.project_id, reloaded.project_id)

  # Each move leaves a comment describing it.
  comments = self.services.issue.GetCommentsForIssue('cnxn', issue.issue_id)
  self.assertEqual(
      comments[1].content, 'Moved issue proj:1 to now be issue dest:1.')
  self.assertEqual(
      comments[2].content, 'Moved issue dest:1 back to issue proj:1 again.')
def testMoveIssue_Anon(self):
  """Without a signed-in user, MoveIssue raises PermissionException."""
  issue = fake.MakeTestIssue(789, 1, 'sum', 'New', 111, issue_id=78901)
  self.services.issue.TestAddIssue(issue)
  dest = self.services.project.TestAddProject('dest', project_id=988)

  # No SignIn() call is made in this test.
  with self.assertRaises(permissions.PermissionException):
    with self.work_env as we:
      we.MoveIssue(issue, dest)
def testMoveIssue_CantDeleteIssue(self):
  """Moving is refused when we lack DeleteIssue perm on the issue."""
  issue = fake.MakeTestIssue(789, 1, 'sum', 'New', 111, issue_id=78901)
  self.services.issue.TestAddIssue(issue)
  # User 111 is a committer on the target but is not made a source owner.
  dest = self.services.project.TestAddProject(
      'dest', project_id=988, committer_ids=[111])
  self.SignIn(user_id=111)

  with self.assertRaises(permissions.PermissionException):
    with self.work_env as we:
      we.MoveIssue(issue, dest)
def testMoveIssue_CantEditIssueOnTargetProject(self):
  """Moving is refused when we lack EditIssue perm on the target project."""
  issue = fake.MakeTestIssue(789, 1, 'sum', 'New', 111, issue_id=78901)
  self.services.issue.TestAddIssue(issue)
  self.project.owner_ids = [111]
  # The target project is created without any role for user 111.
  dest = self.services.project.TestAddProject('dest', project_id=989)
  self.SignIn(user_id=111)

  with self.assertRaises(permissions.PermissionException):
    with self.work_env as we:
      we.MoveIssue(issue, dest)
def testMoveIssue_CantRestrictions(self):
  """Issues that carry a Restrict-* label cannot be moved."""
  restricted = fake.MakeTestIssue(789, 1, 'sum', 'New', 111, issue_id=78901)
  restricted.labels = ['Restrict-Foo-Bar']
  self.services.issue.TestAddIssue(restricted)
  self.project.owner_ids = [111]
  dest = self.services.project.TestAddProject(
      'dest', project_id=989, committer_ids=[111])
  self.SignIn(user_id=111)

  with self.assertRaises(exceptions.InputException):
    with self.work_env as we:
      we.MoveIssue(restricted, dest)
def testMoveIssue_TooLongIssue(self):
  """We can't move issues if the comment is too long."""
  # NOTE(review): nothing below creates a long comment, and the setup is the
  # same as testMoveIssue_CantDeleteIssue (user 111 is never made an owner of
  # the source project). The PermissionException asserted here presumably
  # comes from that missing permission rather than from any length limit;
  # confirm the intended scenario and adjust the setup or the docstring.
  issue = fake.MakeTestIssue(789, 1, 'sum', 'New', 111, issue_id=78901)
  self.services.issue.TestAddIssue(issue)
  target_project = self.services.project.TestAddProject(
      'dest', project_id=988, committer_ids=[111])
  self.SignIn(user_id=111)
  with self.assertRaises(permissions.PermissionException):
    with self.work_env as we:
      we.MoveIssue(issue, target_project)
@mock.patch('services.tracker_fulltext.IndexIssues')
def testCopyIssue_Normal(self, mock_index):
  """Copying to another project creates an indexed issue there."""
  original = fake.MakeTestIssue(
      789, 1, 'sum', 'New', 111, issue_id=78901, project_name='proj')
  self.services.issue.TestAddIssue(original)
  self.project.owner_ids = [111]
  dest = self.services.project.TestAddProject(
      'dest', project_id=988, committer_ids=[111])
  self.SignIn(user_id=111)

  with self.work_env as we:
    returned = we.CopyIssue(original, dest)

  self.assertEqual(returned.project_name, 'dest')
  self.assertEqual(returned.local_id, 1)

  # Original issue should still exist.
  self.services.issue.GetIssueByLocalID('cnxn', 789, 1)

  stored_copy = self.services.issue.GetIssueByLocalID(
      'cnxn', dest.project_id, 1)
  self.assertEqual(dest.project_id, stored_copy.project_id)
  self.assertEqual(original.summary, stored_copy.summary)
  self.assertEqual(stored_copy.reporter_id, 111)
  mock_index.assert_called_once_with(
      self.mr.cnxn, [stored_copy], self.services.user, self.services.issue,
      self.services.config)

  # The last comment on the copy has a single PROJECT amendment.
  copy_comments = self.services.issue.GetCommentsForIssue(
      'cnxn', stored_copy.issue_id)
  last_comment = copy_comments[-1]
  self.assertEqual(1, len(last_comment.amendments))
  expected_amendment = tracker_pb2.Amendment(
      field=tracker_pb2.FieldID.PROJECT,
      newvalue='dest',
      added_user_ids=[],
      removed_user_ids=[])
  self.assertEqual(expected_amendment, last_comment.amendments[0])
@mock.patch('services.tracker_fulltext.IndexIssues')
def testCopyIssue_SameProject(self, mock_index):
  """Copying within one project yields local ID 2 and no amendments."""
  original = fake.MakeTestIssue(
      789, 1, 'sum', 'New', 111, issue_id=78901, project_name='proj')
  self.services.issue.TestAddIssue(original)
  self.project.owner_ids = [111]
  same_project = self.project
  self.SignIn(user_id=111)

  with self.work_env as we:
    returned = we.CopyIssue(original, same_project)

  self.assertEqual(returned.project_name, 'proj')
  self.assertEqual(returned.local_id, 2)

  # Original issue should still exist.
  self.services.issue.GetIssueByLocalID('cnxn', 789, 1)

  stored_copy = self.services.issue.GetIssueByLocalID(
      'cnxn', same_project.project_id, 2)
  self.assertEqual(same_project.project_id, stored_copy.project_id)
  self.assertEqual(original.summary, stored_copy.summary)
  self.assertEqual(stored_copy.reporter_id, 111)
  mock_index.assert_called_once_with(
      self.mr.cnxn, [stored_copy], self.services.user, self.services.issue,
      self.services.config)

  # The last comment on a same-project copy has no amendments.
  copy_comments = self.services.issue.GetCommentsForIssue(
      'cnxn', stored_copy.issue_id)
  self.assertEqual(0, len(copy_comments[-1].amendments))
def testCopyIssue_Anon(self):
  """Without a signed-in user, CopyIssue raises PermissionException."""
  issue = fake.MakeTestIssue(789, 1, 'sum', 'New', 111, issue_id=78901)
  self.services.issue.TestAddIssue(issue)
  dest = self.services.project.TestAddProject('dest', project_id=988)

  # No SignIn() call is made in this test.
  with self.assertRaises(permissions.PermissionException):
    with self.work_env as we:
      we.CopyIssue(issue, dest)
def testCopyIssue_CantDeleteIssue(self):
  """Copying is refused when we lack DeleteIssue perm on the issue."""
  issue = fake.MakeTestIssue(789, 1, 'sum', 'New', 111, issue_id=78901)
  self.services.issue.TestAddIssue(issue)
  # User 111 is a committer on the target but is not made a source owner.
  dest = self.services.project.TestAddProject(
      'dest', project_id=988, committer_ids=[111])
  self.SignIn(user_id=111)

  with self.assertRaises(permissions.PermissionException):
    with self.work_env as we:
      we.CopyIssue(issue, dest)
def testCopyIssue_CantEditIssueOnTargetProject(self):
  """Copying is refused when we lack EditIssue perm on the target project."""
  issue = fake.MakeTestIssue(789, 1, 'sum', 'New', 111, issue_id=78901)
  self.services.issue.TestAddIssue(issue)
  self.project.owner_ids = [111]
  # The target project is created without any role for user 111.
  dest = self.services.project.TestAddProject('dest', project_id=989)
  self.SignIn(user_id=111)

  with self.assertRaises(permissions.PermissionException):
    with self.work_env as we:
      we.CopyIssue(issue, dest)
def testCopyIssue_CantRestrictions(self):
  """Issues that carry a Restrict-* label cannot be copied."""
  restricted = fake.MakeTestIssue(789, 1, 'sum', 'New', 111, issue_id=78901)
  restricted.labels = ['Restrict-Foo-Bar']
  self.services.issue.TestAddIssue(restricted)
  self.project.owner_ids = [111]
  dest = self.services.project.TestAddProject(
      'dest', project_id=989, committer_ids=[111])
  self.SignIn(user_id=111)

  with self.assertRaises(exceptions.InputException):
    with self.work_env as we:
      we.CopyIssue(restricted, dest)
@mock.patch('search.frontendsearchpipeline.FrontendSearchPipeline')
def testSearchIssues(self, mocked_pipeline):
  """With total_count 10 the result has the visible items and no next start."""
  pipeline = mocked_pipeline.return_value
  pipeline.total_count = 10
  pipeline.visible_results = ['a', 'b']

  with self.work_env as we:
    result = we.SearchIssues('', ['proj'], 123, 20, 0, '')

  self.assertEqual(result, work_env.ListResult(['a', 'b'], None))
@mock.patch('search.frontendsearchpipeline.FrontendSearchPipeline')
def testSearchIssues_paginates(self, mocked_pipeline):
  """With total_count 50 the result reports 20 as the next start."""
  pipeline = mocked_pipeline.return_value
  pipeline.total_count = 50
  pipeline.visible_results = ['a', 'b']

  with self.work_env as we:
    result = we.SearchIssues('', ['proj'], 123, 20, 0, '')

  self.assertEqual(result, work_env.ListResult(['a', 'b'], 20))
@mock.patch('search.frontendsearchpipeline.FrontendSearchPipeline')
def testSearchIssues_NoSuchProject(self, mocked_pipeline):
  """Searching the project name 'chicken' raises NoSuchProjectException."""
  pipeline = mocked_pipeline.return_value
  pipeline.total_count = 10
  pipeline.visible_results = ['a', 'b']

  unknown_projects = ['chicken']
  with self.assertRaises(exceptions.NoSuchProjectException):
    with self.work_env as we:
      we.SearchIssues('', unknown_projects, 123, 20, 0, '')
@mock.patch('search.frontendsearchpipeline.FrontendSearchPipeline')
def testListIssues_Normal(self, mocked_pipeline):
  """ListIssues returns the pipeline after running its three stages once."""
  pipeline = mocked_pipeline.return_value

  with self.work_env as we:
    result = we.ListIssues('', ['a'], 123, 20, 0, 1, '', '', True)

  self.assertEqual(result, pipeline)
  pipeline.SearchForIIDs.assert_called_once()
  pipeline.MergeAndSortIssues.assert_called_once()
  pipeline.Paginate.assert_called_once()
def testListIssues_Error(self):
  """Errors are safely reported."""
  # Placeholder: contains no assertions yet, so it always passes.
  pass # TODO(jrobbins): add unit test
def testFindIssuePositionInSearch_Normal(self):
  """We can find an issue position for the flipper."""
  # Placeholder: contains no assertions yet, so it always passes.
  pass # TODO(jrobbins): add unit test
def testFindIssuePositionInSearch_Error(self):
  """Errors are safely reported."""
  # Placeholder: contains no assertions yet, so it always passes.
  pass # TODO(jrobbins): add unit test
def testGetIssuesDict_Normal(self):
  """GetIssuesDict maps each requested issue_id to its stored issue."""
  stored_by_id = {}
  for local_id, issue_id in ((1, 78901), (2, 78902)):
    stored = fake.MakeTestIssue(
        789, local_id, 'sum', 'New', 111, issue_id=issue_id)
    self.services.issue.TestAddIssue(stored)
    stored_by_id[issue_id] = stored

  with self.work_env as we:
    found = we.GetIssuesDict([78901, 78902])

  self.assertEqual(stored_by_id, found)
def testGetIssuesDict_NoPermission(self):
  """We reject attempts to get issues the user cannot view.

  Issues 1 and 3 carry a Restrict-View-CoreTeam label and the project name
  'farm-proj'; issue 2 is left unrestricted. The raised PermissionException
  must name exactly the two restricted issues, one per line.
  """
  # Local import: only this test needs it, to escape the expected message.
  import re

  def _AddIssue(local_id, restricted):
    issue = fake.MakeTestIssue(
        789, local_id, 'sum', 'New', 111, issue_id=78900 + local_id)
    if restricted:
      issue.labels = ['Restrict-View-CoreTeam']
      issue.project_name = 'farm-proj'
    self.services.issue.TestAddIssue(issue)

  _AddIssue(1, True)
  _AddIssue(2, False)
  _AddIssue(3, True)

  expected_message = (
      'User is not allowed to view issue: farm-proj:1.\n'
      'User is not allowed to view issue: farm-proj:3.')
  # assertRaisesRegexp treats its argument as a regular expression, so the
  # trailing periods would otherwise match any character (e.g. 'farm-proj:12').
  # Escaping makes the check match the message text literally.
  with self.assertRaisesRegexp(
      permissions.PermissionException, re.escape(expected_message)):
    with self.work_env as we:
      we.GetIssuesDict([78901, 78902, 78903])
def testGetIssuesDict_NoSuchIssue(self):
  """GetIssuesDict reports every requested issue_id that does not exist.

  Only issue 78901 is stored; the exception message must list 78902 and
  78903, one per line.
  """
  existing = fake.MakeTestIssue(789, 1, 'sum', 'New', 111, issue_id=78901)
  self.services.issue.TestAddIssue(existing)

  with self.assertRaisesRegexp(
      exceptions.NoSuchIssueException,
      'No such issue: 78902\nNo such issue: 78903'):
    with self.work_env as we:
      we.GetIssuesDict([78901, 78902, 78903])
def testGetIssue_Normal(self):
  """GetIssue returns the stored issue for a known issue_id."""
  stored = fake.MakeTestIssue(789, 1, 'sum', 'New', 111, issue_id=78901)
  self.services.issue.TestAddIssue(stored)

  with self.work_env as we:
    fetched = we.GetIssue(78901)

  self.assertEqual(stored, fetched)
def testGetIssue_NoPermission(self):
  """GetIssue enforces view restrictions on the requested issue.

  The issue is labeled Restrict-View-CoreTeam. The first lookup must raise
  PermissionException; the same lookup succeeds after signing in as
  self.admin_user.
  """
  restricted = fake.MakeTestIssue(789, 1, 'sum', 'New', 111, issue_id=78901)
  restricted.labels = ['Restrict-View-CoreTeam']
  self.services.issue.TestAddIssue(restricted)

  # Before signing in as the admin user, the lookup is rejected...
  with self.assertRaises(permissions.PermissionException):
    with self.work_env as we:
      we.GetIssue(78901)

  # ...and afterwards the very same lookup returns the issue.
  self.SignIn(user_id=self.admin_user.user_id)
  with self.work_env as we:
    fetched = we.GetIssue(78901)

  self.assertEqual(restricted, fetched)
def testGetIssue_NoneIssue(self):
  """GetIssue raises InputException when the issue_id is None."""
  with self.assertRaises(exceptions.InputException):
    with self.work_env as we:
      we.GetIssue(None)
def testGetIssue_NoSuchIssue(self):
  """GetIssue raises NoSuchIssueException when no issue was stored here."""
  with self.assertRaises(exceptions.NoSuchIssueException):
    with self.work_env as we:
      we.GetIssue(78901)
def testListReferencedIssues(self):
  """ListReferencedIssues returns only the issues that exist and are visible.

  The refs include one with no project name, a repeated ref, a local ID and
  a project name ('ghost-proj') that this test never creates, and an issue
  labeled Restrict-View-CoreTeam. Only issue 1 of project 789 (status New)
  and issue 1 of 'other-proj' (status Fixed) are expected back, in the open
  and closed lists respectively.
  """
  visible = fake.MakeTestIssue(789, 1, 'sum', 'New', 111, issue_id=78901)
  self.services.issue.TestAddIssue(visible)

  restricted = fake.MakeTestIssue(789, 42, 'sum', 'New', 422, issue_id=78942)
  restricted.labels.append('Restrict-View-CoreTeam')
  self.services.issue.TestAddIssue(restricted)

  self.services.project.TestAddProject('other-proj', project_id=788)
  closed_elsewhere = fake.MakeTestIssue(
      788, 1, 'sum', 'Fixed', 111, issue_id=78801)
  self.services.issue.TestAddIssue(closed_elsewhere)

  refs = [
      (None, 1),
      ('other-proj', 1),
      ('proj', 99),
      ('ghost-proj', 1),
      ('proj', 42),
      ('other-proj', 1),
  ]
  with self.work_env as we:
    open_issues, closed_issues = we.ListReferencedIssues(refs, 'proj')

  self.assertEqual([visible], open_issues)
  self.assertEqual([closed_elsewhere], closed_issues)
def testListReferencedIssues_PreservesOrder(self):
  """ListReferencedIssues keeps ref order and drops repeated refs.

  Local IDs 1-4 are created with status New and 5-9 with status Fixed. The
  refs list repeats IDs 1 and 5 at the end; each issue must appear once, at
  the position of its first occurrence.
  """
  def _MakeIssues(status, local_ids):
    return [
        fake.MakeTestIssue(789, local_id, 'sum', status, 111)
        for local_id in local_ids]

  open_issues = _MakeIssues('New', range(1, 5))
  closed_issues = _MakeIssues('Fixed', range(5, 10))
  for stored in open_issues + closed_issues:
    self.services.issue.TestAddIssue(stored)

  # IDs 1..9 in order, followed by the duplicated refs for 1 and 5.
  refs = [('proj', local_id) for local_id in list(range(1, 10)) + [1, 5]]
  with self.work_env as we:
    actual_open, actual_closed = we.ListReferencedIssues(refs, 'proj')

  self.assertEqual(open_issues, actual_open)
  self.assertEqual(closed_issues, actual_closed)
def testGetIssueByLocalID_Normal(self):
  """GetIssueByLocalID returns the stored issue for (project_id, local_id)."""
  stored = fake.MakeTestIssue(789, 1, 'sum', 'New', 111, issue_id=78901)
  self.services.issue.TestAddIssue(stored)

  with self.work_env as we:
    fetched = we.GetIssueByLocalID(789, 1)

  self.assertEqual(stored, fetched)
def testGetIssueByLocalID_ProjectNotSpecified(self):
  """GetIssueByLocalID raises InputException when project_id is None."""
  with self.assertRaises(exceptions.InputException):
    with self.work_env as we:
      we.GetIssueByLocalID(None, 1)
def testGetIssueByLocalID_IssueNotSpecified(self):
  """GetIssueByLocalID raises InputException when local_id is None."""
  with self.assertRaises(exceptions.InputException):
    with self.work_env as we:
      we.GetIssueByLocalID(789, None)
def testGetIssueByLocalID_NoSuchIssue(self):
  """GetIssueByLocalID raises NoSuchIssueException if nothing was stored."""
  with self.assertRaises(exceptions.NoSuchIssueException):
    with self.work_env as we:
      we.GetIssueByLocalID(789, 1)
def testGetRelatedIssueRefs_None(self):
  """GetRelatedIssueRefs returns an empty dict for an issue with no links."""
  lonely = fake.MakeTestIssue(789, 1, 'sum', 'New', 111)
  self.services.issue.TestAddIssue(lonely)

  with self.work_env as we:
    refs = we.GetRelatedIssueRefs([lonely])

  self.assertEqual({}, refs)
def testGetRelatedIssueRefs_Some(self):
  """GetRelatedIssueRefs covers blocked-on, blocking and merged-into links.

  One issue points at three others via blocked_on_iids, blocking_iids and
  merged_into; the result must map each linked issue_id to its
  (project_name, local_id) pair.
  """
  subject = fake.MakeTestIssue(789, 1, 'sum', 'New', 111)
  prerequisite = fake.MakeTestIssue(
      789, 2, 'sum', 'New', 111, project_name='proj')
  dependent = fake.MakeTestIssue(
      789, 3, 'sum', 'New', 111, project_name='proj')
  merge_target = fake.MakeTestIssue(
      789, 4, 'sum', 'New', 111, project_name='proj')

  subject.blocked_on_iids.append(prerequisite.issue_id)
  subject.blocking_iids.append(dependent.issue_id)
  subject.merged_into = merge_target.issue_id

  for stored in (subject, prerequisite, dependent, merge_target):
    self.services.issue.TestAddIssue(stored)

  with self.work_env as we:
    refs = we.GetRelatedIssueRefs([subject])

  expected_refs = {
      prerequisite.issue_id: ('proj', 2),
      dependent.issue_id: ('proj', 3),
      merge_target.issue_id: ('proj', 4),
  }
  self.assertEqual(expected_refs, refs)
def testGetRelatedIssueRefs_MultipleIssues(self):
  """GetRelatedIssueRefs merges the links of several issues into one dict.

  Three issues each hold a single link of a different kind (blocked-on,
  blocking, merged-into); the result must contain a ref for each of the
  three linked issues.
  """
  first = fake.MakeTestIssue(789, 1, 'sum', 'New', 111)
  blocking = fake.MakeTestIssue(
      789, 2, 'sum', 'New', 111, project_name='proj')
  second = fake.MakeTestIssue(789, 3, 'sum', 'New', 111, project_name='proj')
  blocked_on = fake.MakeTestIssue(
      789, 4, 'sum', 'New', 111, project_name='proj')
  third = fake.MakeTestIssue(789, 5, 'sum', 'New', 111, project_name='proj')
  merged_into = fake.MakeTestIssue(
      789, 6, 'sum', 'New', 111, project_name='proj')

  first.blocked_on_iids.append(blocked_on.issue_id)
  second.blocking_iids.append(blocking.issue_id)
  third.merged_into = merged_into.issue_id

  # Same insertion order as the original: linking issues first, then targets.
  for stored in (first, second, third, blocked_on, blocking, merged_into):
    self.services.issue.TestAddIssue(stored)

  with self.work_env as we:
    refs = we.GetRelatedIssueRefs([first, second, third])

  expected_refs = {
      blocking.issue_id: ('proj', 2),
      blocked_on.issue_id: ('proj', 4),
      merged_into.issue_id: ('proj', 6),
  }
  self.assertEqual(expected_refs, refs)
def testGetIssueRefs(self):
  """GetIssueRefs maps each issue_id to its (project_name, local_id) pair."""
  expected_pairs = [('proj1', 1), ('proj', 3), ('proj', 5)]

  # Build every issue first, then store them, as the original did.
  stored_issues = [
      fake.MakeTestIssue(
          789, local_id, 'sum', 'New', 111, project_name=project_name)
      for project_name, local_id in expected_pairs]
  for stored in stored_issues:
    self.services.issue.TestAddIssue(stored)

  with self.work_env as we:
    refs = we.GetIssueRefs([stored.issue_id for stored in stored_issues])

  expected_refs = {}
  for stored, pair in zip(stored_issues, expected_pairs):
    expected_refs[stored.issue_id] = pair
  self.assertEqual(expected_refs, refs)
@mock.patch('businesslogic.work_env.WorkEnv.UpdateIssueApproval')
def testBulkUpdateIssueApprovals(self, mockUpdateIssueApproval):
  """BulkUpdateIssueApprovals returns only the issue IDs that were updated.

  The patched UpdateIssueApproval raises PermissionException for 78903 and
  NoSuchIssueApprovalException for 78904 and 78905. The bulk call must still
  attempt all five issues, each with send_email=False, and return the two
  IDs whose update did not raise.
  """
  requested_ids = (78901, 78902, 78903, 78904, 78905)
  failure_by_id = {
      78903: permissions.PermissionException,
      78904: exceptions.NoSuchIssueApprovalException,
      78905: exceptions.NoSuchIssueApprovalException,
  }

  def _RaiseForFailingIssues(issue_id, *_args, **_kwargs):
    failure = failure_by_id.get(issue_id)
    if failure is not None:
      raise failure

  mockUpdateIssueApproval.side_effect = _RaiseForFailingIssues

  self.SignIn()
  approval_delta = tracker_pb2.ApprovalDelta()
  returned_ids = self.work_env.BulkUpdateIssueApprovals(
      list(requested_ids), 24, self.project, approval_delta, 'comment',
      send_email=True)

  self.assertEqual(returned_ids, [78901, 78902])

  expected_calls = [
      mock.call(
          issue_id, 24, approval_delta, 'comment', False, send_email=False)
      for issue_id in requested_ids]
  self.assertEqual(mockUpdateIssueApproval.call_count, len(expected_calls))
  mockUpdateIssueApproval.assert_has_calls(expected_calls)
def testBulkUpdateIssueApprovals_AnonUser(self):
  """BulkUpdateIssueApprovals raises PermissionException without a sign-in."""
  empty_delta = tracker_pb2.ApprovalDelta()

  with self.assertRaises(permissions.PermissionException):
    self.work_env.BulkUpdateIssueApprovals(
        [], 24, self.project, empty_delta, 'comment', send_email=True)
def testBulkUpdateIssueApprovals_UserLacksViewPerms(self):
  """BulkUpdateIssueApprovals raises for user 222 on a MEMBERS_ONLY project."""
  self.SignIn(222)
  self.project.access = project_pb2.ProjectAccess.MEMBERS_ONLY
  empty_delta = tracker_pb2.ApprovalDelta()

  with self.assertRaises(permissions.PermissionException):
    self.work_env.BulkUpdateIssueApprovals(
        [], 24, self.project, empty_delta, 'comment', send_email=True)
@mock.patch('businesslogic.work_env.WorkEnv.UpdateIssueApproval')
def testBulkUpdateIssueApprovalsV3(self, mockUpdateIssueApproval):
  """BulkUpdateIssueApprovalsV3 applies every delta and pairs up the results.

  The patched UpdateIssueApproval returns an (ApprovalValue, IssueComment,
  Issue) triple built from the IDs it receives. The bulk call must invoke it
  once per delta, including repeated (issue, approval) pairs, and return
  (Issue, ApprovalValue) pairs in the same order.
  """
  def _FakeUpdate(issue_id, approval_id, *_args, **_kwargs):
    approval_value = tracker_pb2.ApprovalValue(approval_id=approval_id)
    comment = tracker_pb2.IssueComment(issue_id=issue_id)
    issue = tracker_pb2.Issue(issue_id=issue_id)
    return approval_value, comment, issue

  mockUpdateIssueApproval.side_effect = _FakeUpdate

  self.SignIn()
  approval_delta = tracker_pb2.ApprovalDelta()
  approval_delta_2 = tracker_pb2.ApprovalDelta(approver_ids_add=[111])
  deltas_by_issue = [
      (78901, 1, approval_delta),
      (78901, 1, approval_delta),
      (78901, 2, approval_delta),
      (78901, 2, approval_delta_2),
      (78902, 24, approval_delta),
  ]

  updated_pairs = self.work_env.BulkUpdateIssueApprovalsV3(
      deltas_by_issue, 'xyz', send_email=True)

  expected_pairs = [
      (tracker_pb2.Issue(issue_id=iid),
       tracker_pb2.ApprovalValue(approval_id=aid))
      for iid, aid, _delta in deltas_by_issue]
  self.assertEqual(updated_pairs, expected_pairs)

  expected_calls = [
      mock.call(
          iid, aid, delta, 'xyz', False, send_email=True, update_perms=True)
      for iid, aid, delta in deltas_by_issue]
  self.assertEqual(mockUpdateIssueApproval.call_count, len(deltas_by_issue))
  mockUpdateIssueApproval.assert_has_calls(expected_calls)
@mock.patch('businesslogic.work_env.WorkEnv.UpdateIssueApproval')
def testBulkUpdateIssueApprovalsV3_PermError(self, mockUpdateIssueApproval):
  """BulkUpdateIssueApprovalsV3 lets a PermissionException propagate."""
  always_denied = mock.Mock(side_effect=permissions.PermissionException())
  mockUpdateIssueApproval.side_effect = always_denied

  single_delta = [(78901, 1, tracker_pb2.ApprovalDelta())]
  with self.assertRaises(permissions.PermissionException):
    self.work_env.BulkUpdateIssueApprovalsV3(
        single_delta, 'comment', send_email=True)
@mock.patch('businesslogic.work_env.WorkEnv.UpdateIssueApproval')
def testBulkUpdateIssueApprovalsV3_NotFound(self, mockUpdateIssueApproval):
  """BulkUpdateIssueApprovalsV3 lets NoSuchIssueApprovalException propagate."""
  always_missing = mock.Mock(
      side_effect=exceptions.NoSuchIssueApprovalException())
  mockUpdateIssueApproval.side_effect = always_missing

  single_delta = [(78901, 1, tracker_pb2.ApprovalDelta())]
  with self.assertRaises(exceptions.NoSuchIssueApprovalException):
    self.work_env.BulkUpdateIssueApprovalsV3(
        single_delta, 'comment', send_email=True)
def testBulkUpdateIssueApprovalsV3_UserLacksViewPerms(self):
  """An empty V3 bulk update does not raise on a MEMBERS_ONLY project."""
  self.project.access = project_pb2.ProjectAccess.MEMBERS_ONLY
  self.SignIn(222)

  # No exception raised in v3. Permissions checked in UpdateIssueApprovals.
  self.work_env.BulkUpdateIssueApprovalsV3([], 'comment', send_email=True)
@mock.patch(
    'features.send_notifications.PrepareAndSendApprovalChangeNotification')
def testUpdateIssueApproval(self, _mockPrepareAndSend):
  """UpdateIssueApproval forwards the delta to DeltaUpdateIssueApproval.

  The issue service's DeltaUpdateIssueApproval is replaced by a mock so the
  forwarded arguments can be checked; notification sending is patched out.
  """
  delta_update = mock.Mock()
  self.services.issue.DeltaUpdateIssueApproval = delta_update
  self.SignIn()

  config = fake.MakeTestConfig(789, [], [])
  self.services.config.StoreConfig('cnxn', config)

  av_24 = tracker_pb2.ApprovalValue(
      approval_id=24,
      approver_ids=[111],
      status=tracker_pb2.ApprovalStatus.NOT_SET,
      set_on=1234,
      setter_id=999)
  issue = fake.MakeTestIssue(
      789, 1, 'summary', 'Available', 111, issue_id=78901,
      approval_values=[av_24])
  self.services.issue.TestAddIssue(issue)

  delta = tracker_pb2.ApprovalDelta(
      status=tracker_pb2.ApprovalStatus.REVIEW_REQUESTED,
      set_on=2345,
      approver_ids_add=[222],
      setter_id=111)

  self.work_env.UpdateIssueApproval(78901, 24, delta, 'please review', False)

  expected_kwargs = {
      'comment_content': 'please review',
      'is_description': False,
      'attachments': None,
      'kept_attachments': None,
  }
  delta_update.assert_called_once_with(
      self.mr.cnxn, 111, config, issue, av_24, delta, **expected_kwargs)
@mock.patch(
    'features.send_notifications.PrepareAndSendApprovalChangeNotification')
def testUpdateIssueApproval_IsDescription(self, _mockPrepareAndSend):
  """UpdateIssueApproval forwards is_description=True for a survey update."""
  delta_update = mock.Mock()
  self.services.issue.DeltaUpdateIssueApproval = delta_update
  self.SignIn()

  config = fake.MakeTestConfig(789, [], [])
  self.services.config.StoreConfig('cnxn', config)

  av_24 = tracker_pb2.ApprovalValue(approval_id=24)
  issue = fake.MakeTestIssue(
      789, 1, 'summary', 'Available', 111, issue_id=78901,
      approval_values=[av_24])
  self.services.issue.TestAddIssue(issue)

  delta = tracker_pb2.ApprovalDelta(setter_id=111)
  self.work_env.UpdateIssueApproval(78901, 24, delta, 'better response', True)

  expected_kwargs = {
      'comment_content': 'better response',
      'is_description': True,
      'attachments': None,
      'kept_attachments': None,
  }
  delta_update.assert_called_once_with(
      self.mr.cnxn, 111, config, issue, av_24, delta, **expected_kwargs)
@mock.patch(
    'features.send_notifications.PrepareAndSendApprovalChangeNotification')
def testUpdateIssueApproval_Attachments(self, _mockPrepareAndSend):
  """We can attach files as we make an approval change.

  The attachments list given to UpdateIssueApproval must be forwarded to
  DeltaUpdateIssueApproval as the attachments keyword argument.
  """
  delta_update = mock.Mock()
  self.services.issue.DeltaUpdateIssueApproval = delta_update
  self.SignIn()

  config = fake.MakeTestConfig(789, [], [])
  self.services.config.StoreConfig('cnxn', config)

  av_24 = tracker_pb2.ApprovalValue(
      approval_id=24,
      approver_ids=[111],
      status=tracker_pb2.ApprovalStatus.NOT_SET,
      set_on=1234,
      setter_id=999)
  issue = fake.MakeTestIssue(
      789, 1, 'summary', 'Available', 111, issue_id=78901,
      approval_values=[av_24])
  self.services.issue.TestAddIssue(issue)

  delta = tracker_pb2.ApprovalDelta(
      status=tracker_pb2.ApprovalStatus.REVIEW_REQUESTED,
      set_on=2345,
      approver_ids_add=[222],
      setter_id=111)
  attachments = []

  self.work_env.UpdateIssueApproval(
      78901, 24, delta, 'please review', False, attachments=attachments)

  expected_kwargs = {
      'comment_content': 'please review',
      'is_description': False,
      'attachments': attachments,
      'kept_attachments': None,
  }
  delta_update.assert_called_once_with(
      self.mr.cnxn, 111, config, issue, av_24, delta, **expected_kwargs)
@mock.patch(
    'features.send_notifications.PrepareAndSendApprovalChangeNotification')
@mock.patch('tracker.tracker_helpers.FilterKeptAttachments')
def testUpdateIssueApproval_KeptAttachments(
    self, mockFilterKeptAttachments, _mockPrepareAndSend):
  """UpdateIssueApproval forwards only the filtered kept attachments.

  FilterKeptAttachments is patched to return [1, 2] for the requested
  [1, 2, 3]; that filtered list is what DeltaUpdateIssueApproval must
  receive as kept_attachments.
  """
  delta_update = mock.Mock()
  self.services.issue.DeltaUpdateIssueApproval = delta_update
  mockFilterKeptAttachments.return_value = [1, 2]
  self.SignIn()

  config = fake.MakeTestConfig(789, [], [])
  self.services.config.StoreConfig('cnxn', config)

  av_24 = tracker_pb2.ApprovalValue(
      approval_id=24,
      approver_ids=[111],
      status=tracker_pb2.ApprovalStatus.NOT_SET,
      set_on=1234,
      setter_id=999)
  issue = fake.MakeTestIssue(
      789, 1, 'summary', 'Available', 111, issue_id=78901,
      approval_values=[av_24])
  self.services.issue.TestAddIssue(issue)

  delta = tracker_pb2.ApprovalDelta(setter_id=111)
  with self.work_env as we:
    we.UpdateIssueApproval(
        78901, 24, delta, 'Another Desc', True, kept_attachments=[1, 2, 3])

  # The filter receives the issue's comments as fetched from the service.
  comments = self.services.issue.GetCommentsForIssue('cnxn', issue.issue_id)
  mockFilterKeptAttachments.assert_called_once_with(
      True, [1, 2, 3], comments, 24)

  expected_kwargs = {
      'comment_content': 'Another Desc',
      'is_description': True,
      'attachments': None,
      'kept_attachments': [1, 2],
  }
  delta_update.assert_called_once_with(
      self.mr.cnxn, 111, config, issue, av_24, delta, **expected_kwargs)
def testUpdateIssueApproval_TooLongComment(self):
  """UpdateIssueApproval raises InputException for an over-long comment.

  The comment is MAX_COMMENT_CHARS 'c' characters padded with whitespace on
  both sides.
  """
  self.services.issue.DeltaUpdateIssueApproval = mock.Mock()
  self.SignIn()

  config = fake.MakeTestConfig(789, [], [])
  self.services.config.StoreConfig('cnxn', config)

  av_24 = tracker_pb2.ApprovalValue(
      approval_id=24, approver_ids=[111],
      status=tracker_pb2.ApprovalStatus.NOT_SET, set_on=1234, setter_id=999)
  issue = fake.MakeTestIssue(
      789, 1, 'summary', 'Available', 111, issue_id=78901,
      approval_values=[av_24])
  self.services.issue.TestAddIssue(issue)

  delta = tracker_pb2.ApprovalDelta(
      status=tracker_pb2.ApprovalStatus.REVIEW_REQUESTED, set_on=2345,
      approver_ids_add=[222])

  # Building the string cannot raise InputException, so it is prepared
  # outside the assertRaises block.
  long_comment = ' ' + 'c' * tracker_constants.MAX_COMMENT_CHARS + ' '
  with self.assertRaises(exceptions.InputException):
    self.work_env.UpdateIssueApproval(78901, 24, delta, long_comment, False)
def testUpdateIssueApproval_NonExistentUsers(self):
"""We raise an exception if adding an approver that does not exist."""
self.services.issue.DeltaUpdateIssueApproval = mock.Mock()
self.SignIn()
config = fake.MakeTestConfig(789, [], [])
self.services.config.StoreConfig('cnxn', config)
av_24 = tracker_pb2.ApprovalValue(
approval_id=24,
approver_ids=[111],