blob: b7e930b760aaa4922ecd4fd33f8bca70b8474cfb [file] [log] [blame]
# Copyright 2022 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Unittest for the tracker helpers module."""
from __future__ import print_function
from __future__ import division
from __future__ import absolute_import
import copy
import mock
import unittest
import io
import six
import settings
from businesslogic import work_env
from framework import exceptions
from framework import framework_constants
from framework import framework_helpers
from framework import permissions
from framework import template_helpers
from framework import urls
from mrproto import project_pb2
from mrproto import tracker_pb2
from mrproto import user_pb2
from services import service_manager
from testing import fake
from testing import testing_helpers
from tracker import tracker_bizobj
from tracker import tracker_constants
from tracker import tracker_helpers
from werkzeug.datastructures import FileStorage
# Maps each test account's email address to the user ID that
# HelpersTest.setUp registers for it with the fake user service.
TEST_ID_MAP = {
    'a@example.com': 1,
    'b@example.com': 2,
    'c@example.com': 3,
    'd@example.com': 4,
}
def _Issue(project_name, local_id, summary='', status='', project_id=789):
  """Return a tracker_pb2.Issue populated from the given arguments.

  The issue_id is derived from the local ID as 100000 + local_id.
  """
  return tracker_pb2.Issue(
      project_name=project_name,
      project_id=project_id,
      local_id=local_id,
      issue_id=100000 + local_id,
      summary=summary,
      status=status)
def _MakeConfig():
  """Return a ProjectIssueConfig that defines three well-known statuses.

  'New' means open; 'Old' and the deprecated 'StatusThatWeDontUseAnymore'
  do not.
  """
  status_specs = (
      # (status, means_open, deprecated)
      ('New', True, False),
      ('Old', False, False),
      ('StatusThatWeDontUseAnymore', False, True),
  )
  config = tracker_pb2.ProjectIssueConfig()
  for status, means_open, deprecated in status_specs:
    config.well_known_statuses.append(
        tracker_pb2.StatusDef(
            status=status, means_open=means_open, deprecated=deprecated))
  return config
class HelpersTest(unittest.TestCase):
def setUp(self):
  """Build fake services seeded with test users, a project and three issues."""
  self.services = service_manager.Services(
      project=fake.ProjectService(),
      config=fake.ConfigService(),
      issue=fake.IssueService(),
      user=fake.UserService(),
      usergroup=fake.UserGroupService())
  for email in TEST_ID_MAP:
    self.services.user.TestAddUser(email, TEST_ID_MAP[email])
  self.services.project.TestAddProject('testproj', project_id=789)

  # Issues 1-3 of project 789, exposed as self.issue1 .. self.issue3.
  seeded = []
  for local_id, summary in ((1, 'one'), (2, 'two'), (3, 'three')):
    issue = fake.MakeTestIssue(789, local_id, summary, 'New', 111)
    issue.project_name = 'testproj'
    self.services.issue.TestAddIssue(issue)
    seeded.append(issue)
  self.issue1, self.issue2, self.issue3 = seeded

  self.cnxn = 'fake connextion'
  self.errors = template_helpers.EZTError()
  escaped_colspec = tracker_constants.DEFAULT_COL_SPEC.replace(' ', '%20')
  self.default_colspec_param = 'colspec=%s' % escaped_colspec
  # ID 999 is registered as a user group rather than as an individual user.
  self.services.usergroup.TestAddGroupSettings(999, 'group@example.com')
def testParseIssueRequest_Empty(self):
  """Parsing empty post data yields empty values for every checked field."""
  ezt_errors = template_helpers.EZTError()
  parsed = tracker_helpers.ParseIssueRequest(
      'fake cnxn', fake.PostData(), self.services, ezt_errors, 'proj')

  # Plain text fields.
  self.assertEqual('', parsed.summary)
  self.assertEqual('', parsed.comment)
  self.assertEqual('', parsed.status)

  # Owner and CCs.
  users = parsed.users
  self.assertEqual('', users.owner_username)
  self.assertEqual(0, users.owner_id)
  self.assertEqual([], users.cc_usernames)
  self.assertEqual([], users.cc_usernames_remove)
  self.assertEqual([], users.cc_ids)
  self.assertEqual([], users.cc_ids_remove)

  # Template and labels.
  self.assertEqual('', parsed.template_name)
  self.assertEqual([], parsed.labels)
  self.assertEqual([], parsed.labels_remove)

  # Custom fields.
  fields = parsed.fields
  self.assertEqual({}, fields.vals)
  self.assertEqual({}, fields.vals_remove)
  self.assertEqual([], fields.fields_clear)

  # Blocked-on references.
  blocked_on = parsed.blocked_on
  self.assertEqual('', blocked_on.entered_str)
  self.assertEqual([], blocked_on.iids)
def testParseIssueRequest_Normal(self):
  """Posted text, labels and custom field values show up in the result."""
  form_values = {
      'summary': ['some summary'],
      'comment': ['some comment'],
      'status': ['SomeStatus'],
      'template_name': ['some template'],
      'label': ['lab1', '-lab2'],
      'custom_123': ['field1123a', 'field1123b'],
  }
  ezt_errors = template_helpers.EZTError()
  parsed = tracker_helpers.ParseIssueRequest(
      'fake cnxn', fake.PostData(form_values), self.services, ezt_errors,
      'proj')

  self.assertEqual('some summary', parsed.summary)
  self.assertEqual('some comment', parsed.comment)
  self.assertEqual('SomeStatus', parsed.status)

  # No owner or CC was posted.
  users = parsed.users
  self.assertEqual('', users.owner_username)
  self.assertEqual(0, users.owner_id)
  self.assertEqual([], users.cc_usernames)
  self.assertEqual([], users.cc_usernames_remove)
  self.assertEqual([], users.cc_ids)
  self.assertEqual([], users.cc_ids_remove)

  self.assertEqual('some template', parsed.template_name)
  # '-lab2' is reported as a label removal, without the leading dash.
  self.assertEqual(['lab1'], parsed.labels)
  self.assertEqual(['lab2'], parsed.labels_remove)

  fields = parsed.fields
  self.assertEqual({123: ['field1123a', 'field1123b']}, fields.vals)
  self.assertEqual({}, fields.vals_remove)
  self.assertEqual([], fields.fields_clear)
def testMarkupDescriptionOnInput(self):
  """Content lines that also appear in the template come back in <b> tags."""
  template_text = 'What?\nWhy?\nWhere?\nWhen?'
  entered_text = 'What?\nthat\nWhy?\nidk\nWhere?\n'
  expected = '<b>What?</b>\nthat\n<b>Why?</b>\nidk\n<b>Where?</b>\n'
  actual = tracker_helpers.MarkupDescriptionOnInput(
      entered_text, template_text)
  self.assertEqual(actual, expected)
def testMarkupDescriptionLineOnInput(self):
  """A line found in the template is bolded; an unrelated line is not."""
  template_lines = ['What happened??', 'Why?']
  cases = (
      # (entered line, expected markup)
      ('What happened??', '<b>What happened??</b>'),
      ('Something terrible!!!', 'Something terrible!!!'),
  )
  for entered_line, expected in cases:
    actual = tracker_helpers._MarkupDescriptionLineOnInput(
        entered_line, template_lines)
    self.assertEqual(actual, expected)
def testClassifyPlusMinusItems(self):
  """Items are split into additions and '-'-prefixed removals."""
  classify = tracker_helpers._ClassifyPlusMinusItems

  # The empty input is checked with exact list equality.
  added, removed = classify([])
  self.assertEqual([], added)
  self.assertEqual([], removed)

  # The remaining cases ignore the order of the results.
  cases = [
      # (input items, expected additions, expected removals)
      (['', ' ', ' \t', '-'], [], []),
      (['a', 'b', 'c'], ['a', 'b', 'c'], []),
      (['a-a-a', 'b-b', 'c-'], ['a-a-a', 'b-b', 'c-'], []),
      (['-a'], [], ['a']),
      (['-a', 'b', 'c-c'], ['b', 'c-c'], ['a']),
      (['-a', '-b-b', '-c-'], [], ['a', 'b-b', 'c-']),
      # We dedup, but we don't cancel out items that are both added and
      # removed.
      (['a', 'a', '-a'], ['a'], ['a']),
  ]
  for items, expected_added, expected_removed in cases:
    added, removed = classify(items)
    six.assertCountEqual(self, expected_added, added)
    six.assertCountEqual(self, expected_removed, removed)
def testParseIssueRequestFields(self):
  """Check the ParsedFields built from custom_* and op_custom_* inputs."""
  form_data = fake.PostData({
      'custom_1': ['https://hello.com'],
      'custom_12': ['https://blah.com'],
      'custom_14': ['https://remove.com'],
      'custom_15_goats': ['2', '3'],
      'custom_15_sheep': ['3', '5'],
      'custom_16_sheep': ['yarn'],
      'op_custom_14': ['remove'],
      'op_custom_12': ['clear'],
      'op_custom_16_sheep': ['remove'],
      'ignore': 'no matter',
  })

  # Expected positional arguments of ParsedFields, in order.
  plain_values = {1: ['https://hello.com'], 12: ['https://blah.com']}
  plain_values_removed = {14: ['https://remove.com']}
  cleared_field_ids = [12]
  suffixed_values = {15: {'goats': ['2', '3'], 'sheep': ['3', '5']}}
  suffixed_values_removed = {16: {'sheep': ['yarn']}}
  expected = tracker_helpers.ParsedFields(
      plain_values, plain_values_removed, cleared_field_ids,
      suffixed_values, suffixed_values_removed)

  actual = tracker_helpers._ParseIssueRequestFields(form_data)
  self.assertEqual(actual, expected)
def testParseIssueRequestAttachments(self):
  """Uploaded files are returned as (filename, contents, mimetype) tuples."""
  parse = tracker_helpers._ParseIssueRequestAttachments

  hello_c = FileStorage(
      filename='hello.c', stream=io.BytesIO(b'hello world'))
  readme = FileStorage(
      filename='README', stream=io.BytesIO(b'Welcome to our project'))
  windows_path = FileStorage(
      filename='c:\\dir\\subdir\\FILENAME.EXT',
      stream=io.BytesIO(b'Abort, Retry, or Fail?'))
  # Browsers send this if FILE field was not filled in.
  unfilled = FileStorage(filename='', stream=io.BytesIO(b''))

  expected_hello_c = ('hello.c', b'hello world', 'text/plain')
  expected_readme = ('README', b'Welcome to our project', 'text/plain')
  # Only the final path component is expected as the filename.
  expected_windows_path = (
      'FILENAME.EXT', b'Abort, Retry, or Fail?', 'application/octet-stream')

  # No post data at all.
  self.assertEqual([], parse({}))

  # One file.  Streams are rewound after each parse so they can be reused.
  parsed = parse(fake.PostData({'file1': [hello_c]}))
  self.assertEqual([expected_hello_c], parsed)
  hello_c.seek(0)

  # Two files.
  parsed = parse(fake.PostData({
      'file1': [hello_c],
      'file2': [readme],
  }))
  self.assertEqual([expected_hello_c, expected_readme], parsed)
  hello_c.seek(0)
  readme.seek(0)

  # A filename that includes a directory path.
  parsed = parse(fake.PostData({'file3': [windows_path]}))
  self.assertEqual([expected_windows_path], parsed)
  windows_path.seek(0)

  # Unfilled file inputs are left out of the result.
  parsed = parse(fake.PostData({
      'file1': [unfilled],  # Does not appear in result
      'file3': [windows_path],
      'file4': [unfilled],  # Does not appear in result
  }))
  self.assertEqual([expected_windows_path], parsed)
  windows_path.seek(0)
def testParseIssueRequestKeptAttachments(self):
  """Placeholder: this test has not been written yet."""
  pass  # TODO(jrobbins): Write this test.
def testParseIssueRequestUsers(self):
  """Check owner and CC parsing across a range of posted values.

  Covers missing/blank owners, a known owner, single and multiple CCs
  (including '-' removals), unknown addresses, and mixed-case addresses.
  """

  def _Parse(post_data):
    """Run the parser under test with the fake services from setUp."""
    return tracker_helpers._ParseIssueRequestUsers(
        'fake connection', post_data, self.services)

  def _AssertNoOwner(parsed_users):
    """Assert that no owner was parsed."""
    self.assertEqual('', parsed_users.owner_username)
    self.assertEqual(
        framework_constants.NO_USER_SPECIFIED, parsed_users.owner_id)

  def _AssertNoCcs(parsed_users):
    """Assert that no CC additions or removals were parsed."""
    self.assertEqual([], parsed_users.cc_usernames)
    self.assertEqual([], parsed_users.cc_usernames_remove)
    self.assertEqual([], parsed_users.cc_ids)
    self.assertEqual([], parsed_users.cc_ids_remove)

  # A missing, empty or whitespace-only owner all parse to "no owner".
  blank_owner_posts = (
      {},
      fake.PostData({'owner': ['']}),
      fake.PostData({'owner': [' \t']}),
  )
  for post_data in blank_owner_posts:
    parsed_users = _Parse(post_data)
    _AssertNoOwner(parsed_users)
    _AssertNoCcs(parsed_users)

  # A known owner resolves to the user ID registered in setUp.
  parsed_users = _Parse(fake.PostData({'owner': ['b@example.com']}))
  self.assertEqual('b@example.com', parsed_users.owner_username)
  self.assertEqual(TEST_ID_MAP['b@example.com'], parsed_users.owner_id)
  _AssertNoCcs(parsed_users)

  # A single CC and no owner.
  parsed_users = _Parse(fake.PostData({'cc': ['b@example.com']}))
  _AssertNoOwner(parsed_users)
  self.assertEqual(['b@example.com'], parsed_users.cc_usernames)
  self.assertEqual([], parsed_users.cc_usernames_remove)
  self.assertEqual([TEST_ID_MAP['b@example.com']], parsed_users.cc_ids)
  self.assertEqual([], parsed_users.cc_ids_remove)

  # A '-' prefix marks a CC for removal; empty list entries are dropped.
  parsed_users = _Parse(fake.PostData({
      'cc': ['-b@example.com, c@example.com,,'
             'a@example.com,'],
  }))
  _AssertNoOwner(parsed_users)
  six.assertCountEqual(
      self, ['c@example.com', 'a@example.com'], parsed_users.cc_usernames)
  self.assertEqual(['b@example.com'], parsed_users.cc_usernames_remove)
  six.assertCountEqual(
      self, [TEST_ID_MAP['c@example.com'], TEST_ID_MAP['a@example.com']],
      parsed_users.cc_ids)
  self.assertEqual(
      [TEST_ID_MAP['b@example.com']], parsed_users.cc_ids_remove)

  # An address that was never registered gets an ID hashed from the email.
  parsed_users = _Parse(fake.PostData({
      'owner': ['fuhqwhgads@example.com'],
      'cc': ['c@example.com, fuhqwhgads@example.com'],
  }))
  self.assertEqual('fuhqwhgads@example.com', parsed_users.owner_username)
  gen_uid = framework_helpers.MurmurHash3_x86_32(parsed_users.owner_username)
  self.assertEqual(gen_uid, parsed_users.owner_id)  # autocreated user
  six.assertCountEqual(
      self, ['c@example.com', 'fuhqwhgads@example.com'],
      parsed_users.cc_usernames)
  self.assertEqual([], parsed_users.cc_usernames_remove)
  six.assertCountEqual(
      self, [TEST_ID_MAP['c@example.com'], gen_uid], parsed_users.cc_ids)
  self.assertEqual([], parsed_users.cc_ids_remove)

  # Mixed-case addresses come back lowercased and match the known users.
  parsed_users = _Parse(fake.PostData({
      'cc': ['C@example.com, b@exAmple.cOm'],
  }))
  six.assertCountEqual(
      self, ['c@example.com', 'b@example.com'], parsed_users.cc_usernames)
  self.assertEqual([], parsed_users.cc_usernames_remove)
  six.assertCountEqual(
      self, [TEST_ID_MAP['c@example.com'], TEST_ID_MAP['b@example.com']],
      parsed_users.cc_ids)
  self.assertEqual([], parsed_users.cc_ids_remove)
def testParseBlockers_BlockedOnNothing(self):
  """An empty blocked-on field parses to no issue IDs and no errors."""
  field = tracker_helpers.BLOCKED_ON
  parsed = tracker_helpers._ParseBlockers(
      self.cnxn, {field: ''}, self.services, self.errors, 'testproj', field)

  self.assertEqual('', parsed.entered_str)
  self.assertEqual([], parsed.iids)
  for error_field in (tracker_helpers.BLOCKED_ON, tracker_helpers.BLOCKING):
    self.assertIsNone(getattr(self.errors, error_field))
def testParseBlockers_BlockedOnAdded(self):
  """Local IDs 1, 2, 3 resolve to the issue IDs of the seeded issues."""
  field = tracker_helpers.BLOCKED_ON
  entered = '1, 2, 3'
  parsed = tracker_helpers._ParseBlockers(
      self.cnxn, {field: entered}, self.services, self.errors, 'testproj',
      field)

  self.assertEqual('1, 2, 3', parsed.entered_str)
  self.assertEqual([100001, 100002, 100003], parsed.iids)
  for error_field in (tracker_helpers.BLOCKED_ON, tracker_helpers.BLOCKING):
    self.assertIsNone(getattr(self.errors, error_field))
def testParseBlockers_BlockedOnDuplicateRef(self):
  """A ref repeated in the input yields its issue ID only once."""
  field = tracker_helpers.BLOCKED_ON
  entered = '2, 2, 2'
  parsed = tracker_helpers._ParseBlockers(
      self.cnxn, {field: entered}, self.services, self.errors, 'testproj',
      field)

  self.assertEqual('2, 2, 2', parsed.entered_str)
  self.assertEqual([100002], parsed.iids)
  for error_field in (tracker_helpers.BLOCKED_ON, tracker_helpers.BLOCKING):
    self.assertIsNone(getattr(self.errors, error_field))
def testParseBlockers_Missing(self):
  """A field that is absent from the POST parses like an empty one."""
  empty_post = {}
  parsed = tracker_helpers._ParseBlockers(
      self.cnxn, empty_post, self.services, self.errors, 'testproj',
      tracker_helpers.BLOCKED_ON)

  self.assertEqual('', parsed.entered_str)
  self.assertEqual([], parsed.iids)
  for error_field in (tracker_helpers.BLOCKED_ON, tracker_helpers.BLOCKING):
    self.assertIsNone(getattr(self.errors, error_field))
def testParseBlockers_SameIssueNoProject(self):
  """Issue 2 listing bare ref '2' as blocking records a BLOCKING error."""
  field = tracker_helpers.BLOCKING
  form = {'id': '2', field: '2, 3'}
  parsed = tracker_helpers._ParseBlockers(
      self.cnxn, form, self.services, self.errors, 'testproj', field)

  self.assertEqual('2, 3', parsed.entered_str)
  self.assertEqual([], parsed.iids)
  blocking_error = getattr(self.errors, tracker_helpers.BLOCKING)
  self.assertEqual(blocking_error, 'Cannot be blocking the same issue')
  self.assertIsNone(getattr(self.errors, tracker_helpers.BLOCKED_ON))
def testParseBlockers_SameIssueSameProject(self):
  """A self-reference qualified with its own project is also an error."""
  field = tracker_helpers.BLOCKING
  form = {'id': '2', field: 'testproj:2, 3'}
  parsed = tracker_helpers._ParseBlockers(
      self.cnxn, form, self.services, self.errors, 'testproj', field)

  self.assertEqual('testproj:2, 3', parsed.entered_str)
  self.assertEqual([], parsed.iids)
  blocking_error = getattr(self.errors, tracker_helpers.BLOCKING)
  self.assertEqual(blocking_error, 'Cannot be blocking the same issue')
  self.assertIsNone(getattr(self.errors, tracker_helpers.BLOCKED_ON))
def testParseBlockers_SameIssueDifferentProject(self):
  """The same local ID in another project is accepted without errors."""
  field = tracker_helpers.BLOCKING
  form = {'id': '2', field: 'testproj:2'}
  # The issue being edited lives in 'testprojB', not in 'testproj'.
  parsed = tracker_helpers._ParseBlockers(
      self.cnxn, form, self.services, self.errors, 'testprojB', field)

  self.assertEqual('testproj:2', parsed.entered_str)
  self.assertEqual([100002], parsed.iids)
  for error_field in (tracker_helpers.BLOCKING, tracker_helpers.BLOCKED_ON):
    self.assertIsNone(getattr(self.errors, error_field))
def testParseBlockers_Invalid(self):
  """Unparseable refs set an error on the field that is being parsed."""
  blocking = tracker_helpers.BLOCKING
  blocked_on = tracker_helpers.BLOCKED_ON
  form = {blocking: '2, foo', blocked_on: '3, bar'}

  # Parsing BLOCKING leaves the BLOCKED_ON error unset.
  parsed = tracker_helpers._ParseBlockers(
      self.cnxn, form, self.services, self.errors, 'testproj', blocking)
  self.assertEqual('2, foo', parsed.entered_str)
  self.assertEqual([100002], parsed.iids)
  self.assertEqual(getattr(self.errors, blocking), 'Invalid issue ID foo')
  self.assertIsNone(getattr(self.errors, blocked_on))

  # Parsing BLOCKED_ON with the same errors object.
  parsed = tracker_helpers._ParseBlockers(
      self.cnxn, form, self.services, self.errors, 'testproj', blocked_on)
  self.assertEqual('3, bar', parsed.entered_str)
  self.assertEqual([100003], parsed.iids)
  self.assertEqual(getattr(self.errors, blocked_on), 'Invalid issue ID bar')
def testParseBlockers_Dangling(self):
  """A ref to a sanctioned project should be allowed."""
  post_data = {'id': '2', tracker_helpers.BLOCKING: 'otherproj:2'}
  real_codesite_projects = settings.recognized_codesite_projects
  settings.recognized_codesite_projects = ['otherproj']
  try:
    parsed_blockers = tracker_helpers._ParseBlockers(
        self.cnxn, post_data, self.services, self.errors, 'testproj',
        tracker_helpers.BLOCKING)
    self.assertEqual('otherproj:2', parsed_blockers.entered_str)
    self.assertEqual([('otherproj', 2)], parsed_blockers.dangling_refs)
  finally:
    # Restore the module-level setting even when an assertion above fails,
    # so the override cannot leak into tests that run afterwards.
    settings.recognized_codesite_projects = real_codesite_projects
def testParseBlockers_FederatedReferences(self):
  """'b/NNN' refs are returned separately from local issue IDs."""
  field = tracker_helpers.BLOCKING
  entered = '2, b/123, 3, b/789'
  form = {'id': '9', field: entered}
  parsed = tracker_helpers._ParseBlockers(
      self.cnxn, form, self.services, self.errors, 'testproj', field)

  self.assertEqual('2, b/123, 3, b/789', parsed.entered_str)
  self.assertEqual([100002, 100003], parsed.iids)
  self.assertEqual(['b/123', 'b/789'], parsed.federated_ref_strings)
def testIsValidIssueOwner(self):
  """Project members and "no owner" are valid owners; others are not."""
  project = project_pb2.Project()
  project.owner_ids.extend([1, 2])
  project.committer_ids.extend([3])
  project.contributor_ids.extend([4, 999])

  def _IsValid(owner_id):
    """Return only the validity flag of the (valid, message) result."""
    valid, _ = tracker_helpers.IsValidIssueOwner(
        'fake cnxn', project, owner_id, self.services)
    return valid

  for owner_id in (framework_constants.NO_USER_SPECIFIED, 1, 2, 3, 4):
    self.assertTrue(_IsValid(owner_id))

  # 7 is not a project member.  999 is listed as a contributor, but setUp
  # registers that ID as a user group.
  for owner_id in (7, 999):
    self.assertFalse(_IsValid(owner_id))
# MakeViewsForUsersInIssues is tested in MakeViewsForUsersInIssuesTest.
def testGetAllowedOpenedAndClosedIssues(self):
  """Placeholder: this test has not been written yet."""
  pass  # TODO(jrobbins): Write this test.
def testFormatIssueListURL_JumpedToIssue(self):
  """After jumping to issue 123, the list URL has can=1 and q=id=123.

  The '=' inside the query value is expected URL-escaped as %3D.
  """
  config = tracker_pb2.ProjectIssueConfig()
  mr = testing_helpers.MakeMonorailRequest(
      path='/p/proj/issues/detail?id=123&q=123',
      headers={'Host': 'code.google.com'})
  mr.ComputeColSpec(config)

  base_url = 'http://code.google.com'
  expected_url = '%s/p/proj/issues/list?can=1&%s&q=id%%3D123' % (
      base_url, self.default_colspec_param)
  self.assertEqual(
      expected_url, tracker_helpers.FormatIssueListURL(mr, config))
def testFormatIssueListURL_NoCurrentState(self):
  """Without list state in the request, the URL has colspec, q and kwargs."""
  config = tracker_pb2.ProjectIssueConfig()
  mr = testing_helpers.MakeMonorailRequest(
      path='/p/proj/issues/detail?id=123',
      headers={'Host': 'code.google.com'})
  mr.ComputeColSpec(config)
  base_url = 'http://code.google.com'
  colspec_param = self.default_colspec_param

  # No extra parameters.
  self.assertEqual(
      '%s/p/proj/issues/list?%s&q=' % (base_url, colspec_param),
      tracker_helpers.FormatIssueListURL(mr, config))

  # One extra parameter.
  self.assertEqual(
      '%s/p/proj/issues/list?%s&foo=123&q=' % (base_url, colspec_param),
      tracker_helpers.FormatIssueListURL(mr, config, foo=123))

  # Two extra parameters; the expected query string is in name order.
  self.assertEqual(
      '%s/p/proj/issues/list?bar=abc&%s&foo=123&q=' % (
          base_url, colspec_param),
      tracker_helpers.FormatIssueListURL(mr, config, foo=123, bar='abc'))

  # Values with special characters are expected percent-escaped.
  self.assertEqual(
      '%s/p/proj/issues/list?'
      'baz=escaped%%2Bencoded%%26and100%%25%%20%%22safe%%22&%s&q=' % (
          base_url, colspec_param),
      tracker_helpers.FormatIssueListURL(
          mr, config, baz='escaped+encoded&and100% "safe"'))
def testFormatIssueListURL_KeepCurrentState(self):
  """sort, colspec and groupby from the request carry over to the list URL."""
  config = tracker_pb2.ProjectIssueConfig()
  mr = testing_helpers.MakeMonorailRequest(
      path='/p/proj/issues/detail?id=123&sort=aa&colspec=a b c&groupby=d',
      headers={'Host': 'localhost:8080'})
  mr.ComputeColSpec(config)
  base_url = 'http://localhost:8080'

  carried_over_url = (
      '%s/p/proj/issues/list?colspec=a%%20b%%20c'
      '&groupby=d&q=&sort=aa' % base_url)
  self.assertEqual(
      carried_over_url, tracker_helpers.FormatIssueListURL(mr, config))

  # An extra parameter is merged in with the carried-over state.
  self.assertEqual(
      '%s/p/proj/issues/list?'
      'colspec=a%%20b%%20c&foo=123&groupby=d&q=&sort=aa' % base_url,
      tracker_helpers.FormatIssueListURL(mr, config, foo=123))

  # Passing colspec='X Y Z' does not replace the request's own colspec.
  self.assertEqual(
      carried_over_url,
      tracker_helpers.FormatIssueListURL(mr, config, colspec='X Y Z'))
def testFormatRelativeIssueURL(self):
  """Relative issue URLs are built with and without query parameters."""
  attachment_url = tracker_helpers.FormatRelativeIssueURL(
      'proj', urls.ISSUE_ATTACHMENT)
  self.assertEqual('/p/proj/issues/attachment', attachment_url)

  detail_url = tracker_helpers.FormatRelativeIssueURL(
      'proj', urls.ISSUE_DETAIL, id=123)
  self.assertEqual('/p/proj/issues/detail?id=123', detail_url)
@mock.patch('google.appengine.api.app_identity.get_application_id')
def testFormatCrBugURL_Prod(self, mock_get_app_id):
  """With app ID 'monorail-prod', crbug.com short links are returned."""
  mock_get_app_id.return_value = 'monorail-prod'
  cases = (
      # ((project name, local ID), expected URL)
      (('proj', 123), 'https://crbug.com/proj/123'),
      # The expected chromium link has no project component.
      (('chromium', 123456), 'https://crbug.com/123456'),
  )
  for (project_name, local_id), expected_url in cases:
    self.assertEqual(
        expected_url, tracker_helpers.FormatCrBugURL(project_name, local_id))
@mock.patch('google.appengine.api.app_identity.get_application_id')
def testFormatCrBugURL_NonProd(self, mock_get_app_id):
  """With app ID 'monorail-staging', relative detail URLs are returned."""
  mock_get_app_id.return_value = 'monorail-staging'
  cases = (
      # ((project name, local ID), expected URL)
      (('proj', 123), '/p/proj/issues/detail?id=123'),
      (('chromium', 123456), '/p/chromium/issues/detail?id=123456'),
  )
  for (project_name, local_id), expected_url in cases:
    self.assertEqual(
        expected_url, tracker_helpers.FormatCrBugURL(project_name, local_id))
@mock.patch('tracker.tracker_constants.ISSUE_ATTACHMENTS_QUOTA_HARD', 1)
def testComputeNewQuotaBytesUsed_ProjectQuota(self):
  """Uploads that fit the project's attachment_quota pass; one more fails."""
  tiger = framework_helpers.AttachmentUpload(
      'matter not', b'three men make a tiger', 'matter not')
  chicken = framework_helpers.AttachmentUpload(
      'matter not', b'chicken', 'matter not')
  uploads = [tiger, chicken]

  project = fake.Project()
  project.attachment_bytes_used = 10
  # Size the quota so that exactly one byte is left after both uploads.
  upload_bytes = len(tiger.contents + chicken.contents)
  project.attachment_quota = project.attachment_bytes_used + upload_bytes + 1

  new_bytes_used = tracker_helpers.ComputeNewQuotaBytesUsed(project, uploads)
  self.assertEqual(new_bytes_used, project.attachment_quota - 1)

  # A third upload no longer fits in the remaining byte.
  uploads.append(
      framework_helpers.AttachmentUpload(
          'matter not', b'donut', 'matter not'))
  with self.assertRaises(exceptions.OverAttachmentQuota):
    tracker_helpers.ComputeNewQuotaBytesUsed(project, uploads)
@mock.patch(
    'tracker.tracker_constants.ISSUE_ATTACHMENTS_QUOTA_HARD', len('tiger'))
def testComputeNewQuotaBytesUsed_GeneralQuota(self):
  """With the hard quota patched to 5 bytes, only a 5-byte upload fits."""
  tiger = framework_helpers.AttachmentUpload(
      'matter not', b'tiger', 'matter not')
  uploads = [tiger]
  project = fake.Project()

  new_bytes_used = tracker_helpers.ComputeNewQuotaBytesUsed(project, uploads)
  self.assertEqual(new_bytes_used, len(tiger.contents))

  # Each further upload keeps the total over the quota.
  for _ in range(2):
    uploads.append(
        framework_helpers.AttachmentUpload(
            'matter not', b'donut', 'matter not'))
    with self.assertRaises(exceptions.OverAttachmentQuota):
      tracker_helpers.ComputeNewQuotaBytesUsed(project, uploads)
def testIsUnderSoftAttachmentQuota(self):
  """Placeholder: this test has not been written yet."""
  pass  # TODO(jrobbins): Write this test.
# GetAllIssueProjects is tested in GetAllIssueProjectsTest.
def testGetPermissionsInAllProjects(self):
  """Placeholder: this test has not been written yet."""
  pass  # TODO(jrobbins): Write this test.
# FilterOutNonViewableIssues is tested in FilterOutNonViewableIssuesTest.
def testMeansOpenInProject(self):
  """Statuses are classified as open or closed using the project config."""
  config = _MakeConfig()

  # Open statuses (in either letter case) and unrecognized statuses are open.
  for status in ('New', 'new', '_undefined_status_'):
    self.assertTrue(tracker_helpers.MeansOpenInProject(status, config))

  # Closed statuses (in either letter case) and deprecated closed statuses
  # are closed.
  for status in ('Old', 'old', 'StatusThatWeDontUseAnymore'):
    self.assertFalse(tracker_helpers.MeansOpenInProject(status, config))
def testIsNoisy(self):
  """IsNoisy is true only for the pair where both counts are high."""
  self.assertTrue(tracker_helpers.IsNoisy(778, 320))
  quiet_cases = ((20, 500), (500, 20), (1, 1))
  for first_count, second_count in quiet_cases:
    self.assertFalse(tracker_helpers.IsNoisy(first_count, second_count))
def testMergeCCsAndAddComment(self):
  """Merging adds one comment to the target and CCs the source's users."""
  issue_service = self.services.issue
  target = fake.MakeTestIssue(789, 10, 'Target issue', 'New', 111)
  source = fake.MakeTestIssue(789, 100, 'Source issue', 'New', 222)
  source.cc_ids.append(111)
  # Issue without owner
  ownerless_source = fake.MakeTestIssue(
      789, 101, 'Source issue 2', 'New', 0)
  for issue in (target, source, ownerless_source):
    issue_service.TestAddIssue(issue)

  # We copy this list so that it isn't updated by the test framework
  comments_before = list(
      issue_service.GetCommentsForIssue('fake cnxn', target.issue_id))
  mr = testing_helpers.MakeMonorailRequest(user_info={'user_id': 111})

  # Merging source into target should create a comment.
  merge_comment = tracker_helpers.MergeCCsAndAddComment(
      self.services, mr, source, target)
  self.assertIsNotNone(merge_comment)
  comments_after = issue_service.GetCommentsForIssue(
      'fake cnxn', target.issue_id)
  for comment in comments_before:
    self.assertIn(comment, comments_after)
  self.assertEqual(len(comments_before) + 1, len(comments_after))

  # Merging source into target should add source's owner to target's CCs.
  merged_target = issue_service.GetIssueByLocalID('fake cnxn', 789, 10)
  for expected_cc in (111, 222):
    self.assertIn(expected_cc, merged_target.cc_ids)

  # Merging source 2 into target should make a comment, but not update CCs.
  second_comment = tracker_helpers.MergeCCsAndAddComment(
      self.services, mr, ownerless_source, merged_target)
  self.assertIsNotNone(second_comment)
  merged_target = issue_service.GetIssueByLocalID('fake cnxn', 789, 10)
  self.assertNotIn(0, merged_target.cc_ids)
def testMergeCCsAndAddComment_RestrictedSourceIssue(self):
  """CCs of a restricted source only reach an equally restricted target."""
  issue_service = self.services.issue
  open_target = fake.MakeTestIssue(789, 10, 'Target issue', 'New', 222)
  restricted_target = fake.MakeTestIssue(
      789, 11, 'Target issue 2', 'New', 222)
  source = fake.MakeTestIssue(789, 100, 'Source issue', 'New', 111)
  source.cc_ids.append(111)
  source.labels.append('Restrict-View-Commit')
  restricted_target.labels.append('Restrict-View-Commit')
  for issue in (source, open_target, restricted_target):
    issue_service.TestAddIssue(issue)

  # We copy this list so that it isn't updated by the test framework
  comments_before = list(
      issue_service.GetCommentsForIssue('fake cnxn', open_target.issue_id))
  mr = testing_helpers.MakeMonorailRequest(user_info={'user_id': 111})
  merge_comment = tracker_helpers.MergeCCsAndAddComment(
      self.services, mr, source, open_target)
  self.assertIsNotNone(merge_comment)

  # When the source is restricted, we update the target comments...
  comments_after = issue_service.GetCommentsForIssue(
      'fake cnxn', open_target.issue_id)
  for comment in comments_before:
    self.assertIn(comment, comments_after)
  self.assertEqual(len(comments_before) + 1, len(comments_after))

  # ...but not the target CCs...
  stored_open_target = issue_service.GetIssueByLocalID('fake cnxn', 789, 10)
  self.assertNotIn(111, stored_open_target.cc_ids)

  # ...unless both issues have the same restrictions.
  second_comment = tracker_helpers.MergeCCsAndAddComment(
      self.services, mr, source, restricted_target)
  self.assertIsNotNone(second_comment)
  stored_restricted_target = issue_service.GetIssueByLocalID(
      'fake cnxn', 789, 11)
  self.assertIn(111, stored_restricted_target.cc_ids)
def testMergeCCsAndAddCommentMultipleIssues(self):
  """Placeholder: this test has not been written yet."""
  pass  # TODO(jrobbins): Write this test.
def testGetAttachmentIfAllowed(self):
  """Placeholder: this test has not been written yet."""
  pass  # TODO(jrobbins): Write this test.
def testLabelsMaskedByFields(self):
  """Placeholder: this test has not been written yet."""
  pass  # TODO(jrobbins): Write this test.
def testLabelsNotMaskedByFields(self):
  """Placeholder: this test has not been written yet."""
  pass  # TODO(jrobbins): Write this test.
def testLookupComponentIDs(self):
  """Placeholder: this test has not been written yet."""
  pass  # TODO(jrobbins): Write this test.
def testParsePostDataUsers(self):
  """A comma-separated list of known emails parses to IDs and a string."""
  entered = 'a@example.com, b@example.com'
  user_ids, users_str = tracker_helpers.ParsePostDataUsers(
      self.cnxn, entered, self.services.user)
  # The ID order is not checked, only the set of IDs.
  self.assertEqual([1, 2], sorted(user_ids))
  self.assertEqual('a@example.com, b@example.com', users_str)
def testParsePostDataUsers_Empty(self):
  """An empty string parses to no user IDs and an empty string."""
  user_ids, users_str = tracker_helpers.ParsePostDataUsers(
      self.cnxn, '', self.services.user)
  self.assertEqual([], sorted(user_ids))
  self.assertEqual('', users_str)
def testFilterIssueTypes(self):
  """Placeholder: this test has not been written yet."""
  pass  # TODO(jrobbins): Write this test.
# ParseMergeFields is tested in IssueMergeTest.
# AddIssueStarrers is tested in IssueMergeTest.testMergeIssueStars().
# CanEditProjectIssue is tested in IssueMergeTest.
def testPairDerivedValuesWithRuleExplanations_Nothing(self):
  """An issue without derived values yields five empty result lists."""
  issue_without_derived = tracker_pb2.Issue()
  result = tracker_helpers.PairDerivedValuesWithRuleExplanations(
      issue_without_derived, {}, {})
  # Unpacking also checks that exactly five lists come back.
  labels_why, owner_why, cc_why, warnings_why, errors_why = result
  for pairs in (labels_why, owner_why, cc_why, warnings_why, errors_why):
    self.assertEqual([], pairs)
def testPairDerivedValuesWithRuleExplanations_SomeValues(self):
  """Each derived value is paired with the explanation from its trace."""

  def pair(value, why):
    return {'value': value, 'why': why}

  field_id = tracker_pb2.FieldID
  issue_with_derived = tracker_pb2.Issue(
      derived_owner_id=111,
      derived_cc_ids=[222, 333],
      derived_labels=['aaa', 'zzz'],
      derived_warnings=['Watch out'],
      derived_errors=['Status Assigned requires an owner'])
  # Label 'zzz' deliberately has no trace, so its expected why is None.
  rule_traces = {
      (field_id.OWNER, 111): 'explain 1',
      (field_id.CC, 222): 'explain 2',
      (field_id.CC, 333): 'explain 3',
      (field_id.LABELS, 'aaa'): 'explain 4',
      (field_id.WARNING, 'Watch out'): 'explain 6',
      (field_id.ERROR, 'Status Assigned requires an owner'): 'explain 7',
      # An extra trace that matches no derived value and goes unused.
      (field_id.LABELS, 'bbb'): 'explain 5',
  }
  display_names = {
      111: 'one@example.com',
      222: 'two@example.com',
      333: 'three@example.com',
  }
  users_by_id = {
      user_id: testing_helpers.Blank(display_name=name)
      for user_id, name in display_names.items()}

  result = tracker_helpers.PairDerivedValuesWithRuleExplanations(
      issue_with_derived, rule_traces, users_by_id)
  labels_why, owner_why, cc_why, warnings_why, errors_why = result

  self.assertEqual(
      [pair('aaa', 'explain 4'), pair('zzz', None)], labels_why)
  self.assertEqual([pair('one@example.com', 'explain 1')], owner_why)
  self.assertEqual(
      [pair('two@example.com', 'explain 2'),
       pair('three@example.com', 'explain 3')],
      cc_why)
  self.assertEqual([pair('Watch out', 'explain 6')], warnings_why)
  self.assertEqual(
      [pair('Status Assigned requires an owner', 'explain 7')], errors_why)
class MakeViewsForUsersInIssuesTest(unittest.TestCase):
  """Tests for tracker_helpers.MakeViewsForUsersInIssues."""

  # Every user ID that the fixture issues use as owner, reporter or CC.
  _FIXTURE_USER_IDS = (1, 1001, 1002, 1003, 2001, 2002, 3002)

  @staticmethod
  def _MakeIssue(local_id, owner_id, reporter_id, cc_ids=None):
    """Build an issue in 'proj' with the given participants."""
    issue = _Issue('proj', local_id)
    issue.owner_id = owner_id
    issue.reporter_id = reporter_id
    if cc_ids:
      issue.cc_ids.extend(cc_ids)
    return issue

  def setUp(self):
    self.issue1 = self._MakeIssue(1, 1001, 1002)
    self.issue2 = self._MakeIssue(2, 2001, 2002, cc_ids=[1, 1001, 1002, 1003])
    self.issue3 = self._MakeIssue(3, 1001, 3002)

    self.user = fake.UserService()
    for uid in self._FIXTURE_USER_IDS:
      self.user.TestAddUser('test%d' % uid, uid, add_user=True)

  def _AllIssues(self):
    return [self.issue1, self.issue2, self.issue3]

  def testMakeViewsForUsersInIssues(self):
    """All participants, plus user ID 0, get a view keyed by user ID."""
    users_by_id = tracker_helpers.MakeViewsForUsersInIssues(
        'fake cnxn', self._AllIssues(), self.user)
    view_ids = list(users_by_id.keys())
    six.assertCountEqual(
        self, [0, 1, 1001, 1002, 1003, 2001, 2002, 3002], view_ids)
    for expected_id in (1001, 1002, 1003, 2001):
      self.assertEqual(users_by_id[expected_id].user_id, expected_id)

  def testMakeViewsForUsersInIssuesOmittingSome(self):
    """IDs passed as omit_ids are left out of the resulting dict."""
    users_by_id = tracker_helpers.MakeViewsForUsersInIssues(
        'fake cnxn', self._AllIssues(), self.user, omit_ids=[1001, 1003])
    view_ids = list(users_by_id.keys())
    six.assertCountEqual(self, [0, 1, 1002, 2001, 2002, 3002], view_ids)
    for expected_id in (1002, 2001, 2002, 3002):
      self.assertEqual(users_by_id[expected_id].user_id, expected_id)

  def testMakeViewsForUsersInIssuesEmpty(self):
    """No issues means no user views."""
    users_by_id = tracker_helpers.MakeViewsForUsersInIssues(
        'fake cnxn', [], self.user)
    six.assertCountEqual(self, [], list(users_by_id.keys()))
class GetAllIssueProjectsTest(unittest.TestCase):
  """Tests for tracker_helpers.GetAllIssueProjects."""

  # Two issues in project 789 and one in project 678.
  issue_x_1 = tracker_pb2.Issue(project_id=789, local_id=1, reporter_id=1002)
  issue_x_2 = tracker_pb2.Issue(project_id=789, local_id=2, reporter_id=2002)
  issue_y_1 = tracker_pb2.Issue(project_id=678, local_id=1, reporter_id=2002)

  def setUp(self):
    self.cnxn = 'fake connection'
    self.project_service = fake.ProjectService()
    for name, project_id in (('proj-x', 789), ('proj-y', 678)):
      self.project_service.TestAddProject(name, project_id=project_id)

  def _Project(self, project_name):
    return self.project_service.GetProjectByName(self.cnxn, project_name)

  def testGetAllIssueProjects_Empty(self):
    """No issues maps to an empty project dict."""
    actual = tracker_helpers.GetAllIssueProjects(
        self.cnxn, [], self.project_service)
    self.assertEqual({}, actual)

  def testGetAllIssueProjects_Normal(self):
    """The result is keyed by project ID, one entry per distinct project."""
    same_project_issues = [self.issue_x_1, self.issue_x_2]
    self.assertEqual(
        {789: self._Project('proj-x')},
        tracker_helpers.GetAllIssueProjects(
            self.cnxn, same_project_issues, self.project_service))

    mixed_project_issues = [self.issue_x_1, self.issue_x_2, self.issue_y_1]
    self.assertEqual(
        {789: self._Project('proj-x'), 678: self._Project('proj-y')},
        tracker_helpers.GetAllIssueProjects(
            self.cnxn, mixed_project_issues, self.project_service))
class FilterOutNonViewableIssuesTest(unittest.TestCase):
  """Tests for tracker_helpers.FilterOutNonViewableIssues."""

  owner_id = 111
  committer_id = 222
  nonmember_1_id = 1002
  nonmember_2_id = 2002
  nonmember_3_id = 3002

  # Issues 1 and 2 carry no restriction labels.
  issue1 = tracker_pb2.Issue(
      project_name='proj', project_id=789, local_id=1,
      reporter_id=nonmember_1_id)
  issue2 = tracker_pb2.Issue(
      project_name='proj', project_id=789, local_id=2,
      reporter_id=nonmember_2_id, labels=['foo', 'bar'])
  # Issues 3 and 4 carry a Restrict-View-Commit label in two casings and
  # are both reported by nonmember 3.
  issue3 = tracker_pb2.Issue(
      project_name='proj', project_id=789, local_id=3,
      reporter_id=nonmember_3_id, labels=['restrict-view-commit'])
  issue4 = tracker_pb2.Issue(
      project_name='proj', project_id=789, local_id=4,
      reporter_id=nonmember_3_id, labels=['Foo', 'Restrict-View-Commit'])

  def setUp(self):
    self.user = user_pb2.User()
    self.project = self.MakeProject(project_pb2.ProjectState.LIVE)
    self.config = tracker_bizobj.MakeDefaultProjectIssueConfig(
        self.project.project_id)
    self.project_dict = {self.project.project_id: self.project}
    self.config_dict = {self.config.project_id: self.config}

  def MakeProject(self, state):
    """Return project 789 'proj' in the given state with one owner/committer."""
    return project_pb2.Project(
        project_id=789,
        project_name='proj',
        state=state,
        owner_ids=[self.owner_id],
        committer_ids=[self.committer_id])

  def _AllIssues(self):
    return [self.issue1, self.issue2, self.issue3, self.issue4]

  def _FilterFor(self, user_id, issues):
    """Run the filter with {user_id} as the effective IDs."""
    return tracker_helpers.FilterOutNonViewableIssues(
        {user_id}, self.user, self.project_dict, self.config_dict, issues)

  def _AssertVisibleLocalIds(self, expected_local_ids, user_id):
    visible = self._FilterFor(user_id, self._AllIssues())
    self.assertListEqual(
        expected_local_ids, [issue.local_id for issue in visible])

  def testFilterOutNonViewableIssues_Member(self):
    # perms will be permissions.COMMITTER_ACTIVE_PERMISSIONSET
    self._AssertVisibleLocalIds([1, 2, 3, 4], self.committer_id)

  def testFilterOutNonViewableIssues_Owner(self):
    # perms will be permissions.OWNER_ACTIVE_PERMISSIONSET
    self._AssertVisibleLocalIds([1, 2, 3, 4], self.owner_id)

  def testFilterOutNonViewableIssues_Empty(self):
    # perms will be permissions.COMMITTER_ACTIVE_PERMISSIONSET
    self.assertListEqual([], self._FilterFor(self.committer_id, []))

  def testFilterOutNonViewableIssues_NonMember(self):
    # perms will be permissions.READ_ONLY_PERMISSIONSET
    # The two restricted issues are expected to be dropped.
    self._AssertVisibleLocalIds([1, 2], self.nonmember_1_id)

  def testFilterOutNonViewableIssues_Reporter(self):
    # perms will be permissions.READ_ONLY_PERMISSIONSET
    # Nonmember 3 reported both restricted issues and still gets all four.
    self._AssertVisibleLocalIds([1, 2, 3, 4], self.nonmember_3_id)
class IssueMergeTest(unittest.TestCase):
  """Tests for ParseMergeFields, CanEditProjectIssue and star merging."""

  def setUp(self):
    self.cnxn = 'fake cnxn'
    self.services = service_manager.Services(
        config=fake.ConfigService(),
        issue=fake.IssueService(),
        user=fake.UserService(),
        project=fake.ProjectService(),
        issue_star=fake.IssueStarService(),
        spam=fake.SpamService())
    self.project = self.services.project.TestAddProject(
        'proj', project_id=987)
    self.config = tracker_bizobj.MakeDefaultProjectIssueConfig(
        self.project.project_id)
    self.project_dict = {self.project.project_id: self.project}
    self.config_dict = {self.config.project_id: self.config}

  def _ParseMerge(self, services, post_data, status, issue, errors):
    """Call ParseMergeFields for project 'proj' with this test's config."""
    return tracker_helpers.ParseMergeFields(
        self.cnxn, services, 'proj', post_data, status, self.config, issue,
        errors)

  def _AddUnusedIssue(self, local_id):
    """Create an issue in the test project and store it in the fake."""
    issue = fake.MakeTestIssue(
        self.project.project_id,
        local_id,
        'unused_summary',
        'unused_status',
        111,
        reporter_id=111)
    self.services.issue.TestAddIssue(issue)
    return issue

  def testParseMergeFields_NotSpecified(self):
    """Without merge_into, only status Duplicate sets a merge error."""
    issue = fake.MakeTestIssue(987, 1, 'summary', 'New', 111)
    errors = template_helpers.EZTError()
    post_data = {}

    text, merge_into_issue = self._ParseMerge(
        None, post_data, 'New', issue, errors)
    self.assertEqual('', text)
    self.assertIsNone(merge_into_issue)

    text, merge_into_issue = self._ParseMerge(
        None, post_data, 'Duplicate', issue, errors)
    self.assertEqual('', text)
    self.assertTrue(errors.merge_into_id)
    self.assertIsNone(merge_into_issue)

  def testParseMergeFields_WrongStatus(self):
    """With status New, a posted merge_into value is not returned."""
    issue = fake.MakeTestIssue(987, 1, 'summary', 'New', 111)
    errors = template_helpers.EZTError()

    text, merge_into_issue = self._ParseMerge(
        None, {'merge_into': '12'}, 'New', issue, errors)
    self.assertEqual('', text)
    self.assertIsNone(merge_into_issue)

  def testParseMergeFields_NoSuchIssue(self):
    """A merge target that does not exist yields the text but no issue."""
    issue = fake.MakeTestIssue(987, 1, 'summary', 'New', 111)
    issue.merged_into = 12
    errors = template_helpers.EZTError()

    text, merge_into_issue = self._ParseMerge(
        self.services, {'merge_into': '12'}, 'Duplicate', issue, errors)
    self.assertEqual('12', text)
    self.assertIsNone(merge_into_issue)

  def testParseMergeFields_DontSelfMerge(self):
    """Merging an issue into itself is reported as an error."""
    issue = fake.MakeTestIssue(987, 1, 'summary', 'New', 111)
    errors = template_helpers.EZTError()

    text, merge_into_issue = self._ParseMerge(
        self.services, {'merge_into': '1'}, 'Duplicate', issue, errors)
    self.assertEqual('1', text)
    self.assertIsNone(merge_into_issue)
    self.assertEqual('Cannot merge issue into itself', errors.merge_into_id)

  def testParseMergeFields_NewIssueToMerge(self):
    """A valid merge target is looked up and returned."""
    merged_issue = self._AddUnusedIssue(1)
    mergee_issue = self._AddUnusedIssue(2)
    errors = template_helpers.EZTError()
    target_text = str(mergee_issue.local_id)

    text, merge_into_issue = self._ParseMerge(
        self.services, {'merge_into': target_text}, 'Duplicate',
        merged_issue, errors)
    self.assertEqual(str(mergee_issue.local_id), text)
    self.assertEqual(mergee_issue, merge_into_issue)

  def testCanEditProjectIssue(self):
    """Only a committer on a non-archived project is allowed to edit."""
    mr = testing_helpers.MakeMonorailRequest()
    issue = fake.MakeTestIssue(
        self.project.project_id, 1, 'summary', 'New', 111)
    issue.project_name = self.project.project_name

    def can_edit():
      return tracker_helpers.CanEditProjectIssue(
          mr, self.project, issue, None)

    # A non-member may not edit.
    self.assertEqual(False, can_edit())

    # A committer may edit.
    committer_id = 3
    self.project.committer_ids.extend([committer_id])
    mr.auth.effective_ids.add(committer_id)
    self.assertEqual(True, can_edit())

    # Once the project is archived, the committer may no longer edit...
    self.project.state = project_pb2.ProjectState.ARCHIVED
    self.assertEqual(False, can_edit())

    # ...and neither may an owner.
    owner_id = 1
    self.project.owner_ids.extend([owner_id])
    mr.auth.effective_ids.add(owner_id)
    self.assertEqual(False, can_edit())

  def testMergeIssueStars(self):
    """Starrers of the merged issues are added to the target issue."""
    mr = testing_helpers.MakeMonorailRequest()
    mr.project_name = self.project.project_name
    mr.project = self.project

    config = self.services.config.GetProjectConfig(
        self.cnxn, self.project.project_id)
    # (item ID, starrer ID) pairs, starred in this order.
    star_pairs = [
        (1, 1), (1, 2), (1, 3),
        (3, 3), (3, 6),
        (2, 3), (2, 4), (2, 5),
    ]
    for item_id, starrer_id in star_pairs:
      self.services.issue_star.SetStar(
          self.cnxn, self.services, config, item_id, starrer_id, True)

    new_starrers = tracker_helpers.GetNewIssueStarrers(
        self.cnxn, self.services, [1, 3], 2)
    six.assertCountEqual(self, new_starrers, [1, 2, 6])
    tracker_helpers.AddIssueStarrers(
        self.cnxn, self.services, mr, 2, self.project, new_starrers)
    issue_2_starrers = self.services.issue_star.LookupItemStarrers(
        self.cnxn, 2)
    # XXX(jrobbins): these tests incorrectly mix local IDs with IIDs.
    six.assertCountEqual(self, [1, 2, 3, 4, 5, 6], issue_2_starrers)
class MergeLinkedMembersTest(unittest.TestCase):
  """Tests for tracker_helpers._MergeLinkedMembers."""

  def setUp(self):
    self.cnxn = 'fake cnxn'
    self.services = service_manager.Services(user=fake.UserService())
    user_service = self.services.user
    self.user1 = user_service.TestAddUser('one@example.com', 111)
    self.user2 = user_service.TestAddUser('two@example.com', 222)

  def _MergeBothUsers(self):
    """Run _MergeLinkedMembers over the two fixture user IDs."""
    return tracker_helpers._MergeLinkedMembers(
        self.cnxn, self.services.user, [111, 222])

  def testNoLinkedAccounts(self):
    """When no candidate accounts are linked, they are all returned."""
    self.assertEqual([111, 222], self._MergeBothUsers())

  def testSomeLinkedButNoMasking(self):
    """If an account has linked accounts, but they are not here, keep it."""
    self.user1.linked_child_ids = [999]
    self.user2.linked_parent_id = 999
    self.assertEqual([111, 222], self._MergeBothUsers())

  def testParentMasksChild(self):
    """When two accounts linked, only the parent is returned."""
    self.user2.linked_parent_id = 111
    self.assertEqual([111], self._MergeBothUsers())
class FilterMemberDataTest(unittest.TestCase):
  """Tests for tracker_helpers._FilterMemberData."""

  def setUp(self):
    services = service_manager.Services(
        project=fake.ProjectService(),
        config=fake.ConfigService(),
        issue=fake.IssueService(),
        user=fake.UserService())
    self.owner_email = 'owner@dom.com'
    self.committer_email = 'commit@dom.com'
    self.contributor_email = 'contrib@dom.com'
    self.indirect_member_email = 'ind@dom.com'
    self.all_emails = [
        self.owner_email,
        self.committer_email,
        self.contributor_email,
        self.indirect_member_email,
    ]
    self.project = services.project.TestAddProject('proj')

  def DoFiltering(self, perms, unsigned_user=False):
    """Run _FilterMemberData for a request made with the given perms.

    Unless unsigned_user is true, the request is given user ID 111 and a
    user view whose domain is 'jrobbins.org'.
    """
    mr = testing_helpers.MakeMonorailRequest(
        project=self.project, perms=perms)
    signed_in = not unsigned_user
    if signed_in:
      mr.auth.user_id = 111
      mr.auth.user_view = testing_helpers.Blank(domain='jrobbins.org')
    return tracker_helpers._FilterMemberData(
        mr,
        [self.owner_email],
        [self.committer_email],
        [self.contributor_email],
        [self.indirect_member_email],
        mr.project)

  def _AllButContributor(self):
    """Emails that the restricted-project tests expect to stay visible."""
    return [
        self.owner_email, self.committer_email, self.indirect_member_email]

  def _AssertVisible(self, expected_emails, perms, unsigned_user=False):
    visible_members = self.DoFiltering(perms, unsigned_user=unsigned_user)
    six.assertCountEqual(self, expected_emails, visible_members)

  def testUnsignedUser_NormalProject(self):
    self._AssertVisible(
        [
            self.owner_email, self.committer_email, self.contributor_email,
            self.indirect_member_email
        ],
        permissions.READ_ONLY_PERMISSIONSET,
        unsigned_user=True)

  def testUnsignedUser_RestrictedProject(self):
    self.project.only_owners_see_contributors = True
    self._AssertVisible(
        self._AllButContributor(),
        permissions.READ_ONLY_PERMISSIONSET,
        unsigned_user=True)

  def testOwnersAndAdminsCanSeeAll_NormalProject(self):
    for perms in (permissions.OWNER_ACTIVE_PERMISSIONSET,
                  permissions.ADMIN_PERMISSIONSET):
      self._AssertVisible(self.all_emails, perms)

  def testOwnersAndAdminsCanSeeAll_HubAndSpoke(self):
    self.project.only_owners_see_contributors = True
    for perms in (permissions.OWNER_ACTIVE_PERMISSIONSET,
                  permissions.ADMIN_PERMISSIONSET,
                  permissions.COMMITTER_ACTIVE_PERMISSIONSET):
      self._AssertVisible(self.all_emails, perms)

  def testNonOwnersCanSeeAll_NormalProject(self):
    for perms in (permissions.COMMITTER_ACTIVE_PERMISSIONSET,
                  permissions.CONTRIBUTOR_ACTIVE_PERMISSIONSET):
      self._AssertVisible(self.all_emails, perms)

  def testCommittersSeeOnlySameDomain_HubAndSpoke(self):
    self.project.only_owners_see_contributors = True
    self._AssertVisible(
        self._AllButContributor(),
        permissions.CONTRIBUTOR_ACTIVE_PERMISSIONSET)
class GetLabelOptionsTest(unittest.TestCase):
  """Tests for GetLabelOptions and _BuildRestrictionChoices."""

  @mock.patch('tracker.tracker_helpers.LabelsNotMaskedByFields')
  def testGetLabelOptions(self, mockLabelsNotMaskedByFields):
    """With no unmasked labels, only three restriction options come back."""
    mockLabelsNotMaskedByFields.return_value = []
    empty_config = tracker_pb2.ProjectIssueConfig()

    label_options = tracker_helpers.GetLabelOptions(empty_config, [])

    self.assertEqual(
        [
            {
                'doc': 'Only users who can edit the issue may access it',
                'name': 'Restrict-View-EditIssue',
            },
            {
                'doc': 'Only users who can edit the issue may add comments',
                'name': 'Restrict-AddIssueComment-EditIssue',
            },
            {
                'doc': 'Custom permission CoreTeam is needed to access',
                'name': 'Restrict-View-CoreTeam',
            },
        ],
        label_options)

  def testBuildRestrictionChoices(self):
    """Choices come from frequent restrictions and action x perm pairs."""
    build = tracker_helpers._BuildRestrictionChoices

    # No inputs at all: no choices.
    self.assertEqual([], build([], [], []))

    # Actions without any extra permissions: still no choices.
    self.assertEqual([], build([], ['Hop', 'Jump'], []))

    # Frequent restrictions are turned into Restrict-<action>-<perm> names.
    freq = [
        ('View', 'B', 'You need permission B to do anything'),
        ('A', 'B', 'You need B to use A'),
    ]
    self.assertListEqual(
        [
            {
                'name': 'Restrict-View-B',
                'doc': 'You need permission B to do anything',
            },
            {'name': 'Restrict-A-B', 'doc': 'You need B to use A'},
        ],
        build(freq, [], []))

    # Every action is combined with every extra permission.
    extra_perms = ['Over18', 'Over21']
    self.assertListEqual(
        [
            {
                'name': 'Restrict-Drink-Over18',
                'doc': 'Permission Over18 needed to use Drink',
            },
            {
                'name': 'Restrict-Drink-Over21',
                'doc': 'Permission Over21 needed to use Drink',
            },
            {
                'name': 'Restrict-Smoke-Over18',
                'doc': 'Permission Over18 needed to use Smoke',
            },
            {
                'name': 'Restrict-Smoke-Over21',
                'doc': 'Permission Over21 needed to use Smoke',
            },
        ],
        build([], ['Drink', 'Smoke'], extra_perms))
class FilterKeptAttachmentsTest(unittest.TestCase):
  """Tests for tracker_helpers.FilterKeptAttachments."""

  # The attachment IDs that every test asks to keep.
  _KEPT_IDS = (1, 2, 3, 4)

  @staticmethod
  def _MakeComments():
    """Build the comment list shared by the description tests.

    It holds two issue descriptions (attachment 1, then attachments 2 and
    3), one description for approval 24 (attachment 4), and two plain
    comments in between.
    """

    def description(attachment_ids, **extra_fields):
      return tracker_pb2.IssueComment(
          is_description=True,
          attachments=[
              tracker_pb2.Attachment(attachment_id=attachment_id)
              for attachment_id in attachment_ids],
          **extra_fields)

    return [
        description([1]),
        tracker_pb2.IssueComment(),
        description([2, 3]),
        tracker_pb2.IssueComment(),
        description([4], approval_id=24),
    ]

  def _Filter(self, is_description, comments, approval_id):
    return tracker_helpers.FilterKeptAttachments(
        is_description, list(self._KEPT_IDS), comments, approval_id)

  def testFilterKeptAttachments(self):
    """Without an approval ID, attachments 2 and 3 are kept."""
    self.assertEqual([2, 3], self._Filter(True, self._MakeComments(), None))

  def testApprovalDescription(self):
    """With approval ID 24, only that approval's attachment 4 is kept."""
    self.assertEqual([4], self._Filter(True, self._MakeComments(), 24))

  def testNotAnIssueDescription(self):
    """When is_description is False the result is None."""
    self.assertIsNone(self._Filter(False, self._MakeComments(), None))

  def testNoDescriptionsInComments(self):
    """Comments without any description keep no attachments."""
    plain_comments = [tracker_pb2.IssueComment(), tracker_pb2.IssueComment()]
    self.assertEqual([], self._Filter(True, plain_comments, None))

  def testNoComments(self):
    """An empty comment list keeps no attachments."""
    self.assertEqual([], self._Filter(True, [], None))
class EnumFieldHelpersTest(unittest.TestCase):
  """Tests for tracker_helpers._GetEnumFieldValuesAndDocstrings."""

  def test_GetEnumFieldValuesAndDocstrings(self):
    """We can get all choices for an enum field"""
    enum_fd = tracker_pb2.FieldDef(
        field_id=123,
        project_id=1,
        field_name='yellow',
        field_type=tracker_pb2.FieldTypes.ENUM_TYPE)

    # Labels 1-5 are live; their docstrings are numbered in list order.
    live_labels = [
        'yellow-submarine',
        'yellow-tisket',
        'yellow-basket',
        'yellow',
        'not-yellow',
    ]
    label_defs = [
        tracker_pb2.LabelDef(
            label=label, label_docstring='ld_%d_docstring' % number)
        for number, label in enumerate(live_labels, start=1)]
    # Label 6 has the right prefix but is deprecated.
    label_defs.append(
        tracker_pb2.LabelDef(
            label='yellow-tasket',
            label_docstring='ld_6_docstring',
            deprecated=True))

    config = tracker_pb2.ProjectIssueConfig(
        default_template_for_developers=1,
        default_template_for_users=2,
        well_known_labels=label_defs)

    choices = tracker_helpers._GetEnumFieldValuesAndDocstrings(
        enum_fd, config)

    # Expect to omit labels `yellow` and `not-yellow` due to prefix mismatch
    # Also expect to omit label `yellow-tasket` because it's deprecated
    self.assertEqual(
        [
            ('submarine', 'ld_1_docstring'),
            ('tisket', 'ld_2_docstring'),
            ('basket', 'ld_3_docstring'),
        ],
        choices)
class CreateIssueHelpersTest(unittest.TestCase):
  """Tests for tracker_helpers.AssertValidIssueForCreate."""

  # Description passed to the validator unless a test overrides it.
  _DESCRIPTION = 'nonempty description'

  @staticmethod
  def _MakeCpuFieldDef():
    """Build the INT_TYPE field def 'CPU' (field 123 in project 789)."""
    return tracker_bizobj.MakeFieldDef(
        123, 789, 'CPU', tracker_pb2.FieldTypes.INT_TYPE, None, '', False,
        False, False, None, None, '', False, '', '',
        tracker_pb2.NotifyTriggers.NEVER, 'no_action', 'doc', False)

  def _Validate(self, issue, description=_DESCRIPTION):
    """Run AssertValidIssueForCreate against this test's services."""
    tracker_helpers.AssertValidIssueForCreate(
        self.cnxn, self.services, issue, description)

  def _ExpectInputError(self, issue, message_regex, description=_DESCRIPTION):
    """Assert that validation raises InputException matching the regex."""
    with self.assertRaisesRegex(exceptions.InputException, message_regex):
      self._Validate(issue, description)

  def setUp(self):
    self.services = service_manager.Services(
        project=fake.ProjectService(),
        config=fake.ConfigService(),
        issue=fake.IssueService(),
        user=fake.UserService(),
        usergroup=fake.UserGroupService())
    self.cnxn = 'fake cnxn'

    # User 111 and group 999 are committers; user 222 is not in the project.
    self.project_member = self.services.user.TestAddUser(
        'user_1@example.com', 111)
    self.project_group_member = self.services.user.TestAddUser(
        'group@example.com', 999)
    committer_ids = [
        self.project_member.user_id, self.project_group_member.user_id
    ]
    self.project = self.services.project.TestAddProject(
        'proj', project_id=789, committer_ids=committer_ids)
    self.no_project_user = self.services.user.TestAddUser(
        'user_2@example.com', 222)

    self.config = fake.MakeTestConfig(self.project.project_id, [], [])

    self.int_fd = self._MakeCpuFieldDef()
    self.int_fd.max_value = 999
    self.config.field_defs = [self.int_fd]

    self.status_1 = tracker_pb2.StatusDef(
        status='New', means_open=True, status_docstring='status_1 docstring')
    self.config.well_known_statuses = [self.status_1]

    self.component_def_1 = tracker_pb2.ComponentDef(
        component_id=1, path='compFOO')
    self.component_def_2 = tracker_pb2.ComponentDef(
        component_id=2, path='deprecated', deprecated=True)
    self.config.component_defs = [self.component_def_1, self.component_def_2]

    self.services.config.StoreConfig('cnxn', self.config)
    self.services.usergroup.TestAddGroupSettings(999, 'group@example.com')

  def testAssertValidIssueForCreate_Valid(self):
    """A fully valid issue passes without raising."""
    valid_issue = tracker_pb2.Issue(
        summary='sum',
        status='New',
        owner_id=111,
        project_id=789,
        component_ids=[1],
        cc_ids=[999])
    self._Validate(valid_issue)

  def testAssertValidIssueForCreate_ValidatesLabels(self):
    """The label 'freeze_new_label' is rejected."""
    labeled_issue = tracker_pb2.Issue(
        summary='sum',
        labels=['freeze_new_label'],
        status='New',
        owner_id=111,
        project_id=789)
    expected_regex = (
        "The creation of new labels is blocked for the Chromium project"
        " in Monorail. To continue with editing your issue, please"
        " remove: freeze_new_label label\\(s\\)")
    self._ExpectInputError(labeled_issue, expected_regex)

  def testAssertValidIssueForCreate_ValidatesOwner(self):
    """Non-member, unknown and group owners are each rejected."""
    input_issue = tracker_pb2.Issue(
        summary='sum', status='New', owner_id=222, project_id=789)
    bad_owners = [
        (222, 'Issue owner must be a project member'),
        (333, 'Issue owner user ID not found'),
        (999, 'Issue owner cannot be a user group'),
    ]
    for owner_id, expected_regex in bad_owners:
      input_issue.owner_id = owner_id
      self._ExpectInputError(input_issue, expected_regex)

  def testAssertValidIssueForCreate_ValidatesSummary(self):
    """An empty summary is rejected; a single space is accepted."""
    input_issue = tracker_pb2.Issue(
        summary='', status='New', owner_id=111, project_id=789)
    self._ExpectInputError(input_issue, 'Summary is required')

    input_issue.summary = ' '
    self._Validate(input_issue)

  def testAssertValidIssueForCreate_ValidatesDescription(self):
    """An empty description is rejected; a single space is accepted."""
    input_issue = tracker_pb2.Issue(
        summary='sum', status='New', owner_id=111, project_id=789)
    self._ExpectInputError(input_issue, 'Description is required', '')
    self._Validate(input_issue, ' ')

  def testAssertValidIssueForCreate_ValidatesFieldDef(self):
    """An int field value of 1000 is rejected (max_value is set to 999)."""
    too_large_value = tracker_bizobj.MakeFieldValue(
        self.int_fd.field_id, 1000, None, None, None, None, False)
    input_issue = tracker_pb2.Issue(
        summary='sum',
        status='New',
        owner_id=111,
        project_id=789,
        field_values=[too_large_value])
    with self.assertRaises(exceptions.InputException):
      self._Validate(input_issue)

  def testAssertValidIssueForCreate_ValidatesStatus(self):
    """A status whose ID lookup returns None is rejected as undefined."""
    input_issue = tracker_pb2.Issue(
        summary='sum', status='DNE_status', owner_id=111, project_id=789)
    # Make every status lookup on the fake config service come back empty.
    self.services.config.LookupStatusID = lambda *_args, **_kwargs: None
    self._ExpectInputError(input_issue, 'Undefined status: DNE_status')

  def testAssertValidIssueForCreate_ValidatesComponents(self):
    """Undefined and deprecated component IDs are both rejected."""
    bad_components = [
        # Tests an undefined component.
        (3, 'Undefined or deprecated component with id: 3'),
        # Tests a deprecated component.
        (self.component_def_2.component_id,
         'Undefined or deprecated component with id: 2'),
    ]
    for component_id, expected_regex in bad_components:
      input_issue = tracker_pb2.Issue(
          summary='',
          status='New',
          owner_id=111,
          project_id=789,
          component_ids=[component_id])
      self._ExpectInputError(input_issue, expected_regex)

  def testAssertValidIssueForCreate_ValidatesUsers(self):
    """Unknown CC and field-value users are rejected until they exist."""
    # NOTE(review): despite its name, this def is INT_TYPE and reuses the
    # id and name of self.int_fd. Confirm whether USER_TYPE was intended.
    user_fd = self._MakeCpuFieldDef()
    self.services.config.TestAddFieldDef(user_fd)

    user_value = tracker_bizobj.MakeFieldValue(
        user_fd.field_id, None, None, 124, None, None, False)
    input_issue = tracker_pb2.Issue(
        summary='sum',
        status='New',
        owner_id=111,
        project_id=789,
        cc_ids=[123],
        field_values=[user_value])
    # Snapshot taken up front to check that validation does not mutate.
    copied_issue = copy.deepcopy(input_issue)

    self._ExpectInputError(input_issue, r'users/123: .+\nusers/124: .+')
    self.assertEqual(input_issue, copied_issue)

    # Once both users exist, the same issue validates cleanly.
    for missing_user_id in (123, 124):
      self.services.user.TestAddUser('a@test.com', missing_user_id)
    self._Validate(input_issue)
    self.assertEqual(input_issue, copied_issue)
class ModifyIssuesHelpersTest(unittest.TestCase):
def setUp(self):
  """Create fake services, project 789, two users and an int field def."""
  self.services = service_manager.Services(
      project=fake.ProjectService(),
      config=fake.ConfigService(),
      issue=fake.IssueService(),
      issue_star=fake.IssueStarService(),
      user=fake.UserService(),
      usergroup=fake.UserGroupService())
  self.cnxn = 'fake cnxn'

  # User 111 is a committer of the project; user 222 is not added to it.
  user_service = self.services.user
  self.project_member = user_service.TestAddUser('user_1@example.com', 111)
  committer_ids = [self.project_member.user_id]
  self.project = self.services.project.TestAddProject(
      'proj', project_id=789, committer_ids=committer_ids)
  self.no_project_user = user_service.TestAddUser('user_2@example.com', 222)

  self.config = fake.MakeTestConfig(self.project.project_id, [], [])
  int_field_def = tracker_bizobj.MakeFieldDef(
      123, 789, 'CPU', tracker_pb2.FieldTypes.INT_TYPE, None, '', False,
      False, False, None, None, '', False, '', '',
      tracker_pb2.NotifyTriggers.NEVER, 'no_action', 'doc', False)
  int_field_def.max_value = 999
  self.int_fd = int_field_def
  self.config.field_defs = [self.int_fd]
  self.services.config.StoreConfig('cnxn', self.config)
def testApplyAllIssueChanges(self):
  """ApplyAllIssueChanges returns the expected _IssueChangesTuple.

  Builds one main issue whose delta changes owner, ccs, labels, blocked_on,
  blocking and merged_into, plus a second pair (b_remove) that has its own
  label delta. The expected_* dicts are filled in section by section and
  finally compared against the tuple returned by ApplyAllIssueChanges.
  """
  issue_delta_pairs = []
  # An issue id that appears in blocked_on/blocking both before and after.
  no_change_iid = 78942

  # Expected contents of the returned tuple, keyed by issue id.
  expected_issues_to_update = {}
  expected_amendments = {}
  expected_imp_amendments = {}
  expected_old_owners = {}
  expected_old_statuses = {}
  expected_old_components = {}
  expected_merged_from_add = {}
  expected_new_starrers = {}

  # The main issue: delta_main (built near the end) is applied to it.
  issue_main = _Issue('proj', 100)
  issue_main_ref = ('proj', issue_main.local_id)
  issue_main.owner_id = 999
  issue_main.cc_ids = [111, 222]
  issue_main.labels = ['dont_touch', 'remove_me']
  expected_main = copy.deepcopy(issue_main)
  expected_main.owner_id = 888
  expected_main.cc_ids = [111, 333]
  expected_main.labels = ['dont_touch', 'add_me']
  expected_amendments[issue_main.issue_id] = [
      tracker_bizobj.MakeOwnerAmendment(888, 999),
      tracker_bizobj.MakeCcAmendment([333], [222]),
      tracker_bizobj.MakeLabelsAmendment(['add_me'], ['remove_me'])
  ]
  # The owner goes from 999 to 888, so 999 is the expected old owner.
  expected_old_owners[issue_main.issue_id] = 999

  # blocked_on issues changes setup.
  bo_add = _Issue('proj', 1)
  self.services.issue.TestAddIssue(bo_add)
  expected_bo_add = copy.deepcopy(bo_add)
  # All impacted issues should be fetched within ApplyAllIssueChanges
  # directly from the DB, skipping cache with `use_cache=False` in GetIssue().
  # So we expect these issues to have assume_stale=False.
  expected_bo_add.assume_stale = False
  expected_bo_add.blocking_iids = [issue_main.issue_id]
  expected_issues_to_update[expected_bo_add.issue_id] = expected_bo_add
  expected_imp_amendments[bo_add.issue_id] = [
      tracker_bizobj.MakeBlockingAmendment(
          [issue_main_ref], [], default_project_name='proj')
  ]

  bo_remove = _Issue('proj', 2)
  bo_remove.blocking_iids = [issue_main.issue_id]
  self.services.issue.TestAddIssue(bo_remove)
  expected_bo_remove = copy.deepcopy(bo_remove)
  expected_bo_remove.assume_stale = False
  expected_bo_remove.blocking_iids = []
  expected_issues_to_update[expected_bo_remove.issue_id] = expected_bo_remove
  expected_imp_amendments[bo_remove.issue_id] = [
      tracker_bizobj.MakeBlockingAmendment(
          [], [issue_main_ref], default_project_name='proj')
  ]

  issue_main.blocked_on_iids = [no_change_iid, bo_remove.issue_id]
  # By default new blocked_on issues that appear in blocked_on_iids
  # with no prior rank associated with it are un-ranked and assigned rank 0.
  # See SortBlockedOn in issue_svc.py.
  issue_main.blocked_on_ranks = [0, 0]
  expected_main.blocked_on_iids = [no_change_iid, bo_add.issue_id]
  expected_main.blocked_on_ranks = [0, 0]
  expected_amendments[issue_main.issue_id].append(
      tracker_bizobj.MakeBlockedOnAmendment(
          [('proj', bo_add.local_id)], [('proj', bo_remove.local_id)],
          default_project_name='proj'))

  # blocking_issues changes setup.
  b_add = _Issue('proj', 3)
  self.services.issue.TestAddIssue(b_add)
  expected_b_add = copy.deepcopy(b_add)
  expected_b_add.assume_stale = False
  expected_b_add.blocked_on_iids = [issue_main.issue_id]
  expected_b_add.blocked_on_ranks = [0]
  expected_issues_to_update[expected_b_add.issue_id] = expected_b_add
  expected_imp_amendments[b_add.issue_id] = [
      tracker_bizobj.MakeBlockedOnAmendment(
          [issue_main_ref], [], default_project_name='proj')
  ]

  b_remove = _Issue('proj', 4)
  b_remove.blocked_on_iids = [issue_main.issue_id]
  self.services.issue.TestAddIssue(b_remove)
  expected_b_remove = copy.deepcopy(b_remove)
  expected_b_remove.assume_stale = False
  expected_b_remove.blocked_on_iids = []
  # Test we can process delta changes and impact changes.
  delta_b_remove = tracker_pb2.IssueDelta(labels_add=['more_chickens'])
  expected_b_remove.labels = ['more_chickens']
  issue_delta_pairs.append((b_remove, delta_b_remove))
  expected_issues_to_update[expected_b_remove.issue_id] = expected_b_remove
  expected_imp_amendments[b_remove.issue_id] = [
      tracker_bizobj.MakeBlockedOnAmendment(
          [], [issue_main_ref], default_project_name='proj')
  ]
  # b_remove has its own delta, so it also gets a direct (non-impact)
  # amendment entry.
  expected_amendments[b_remove.issue_id] = [
      tracker_bizobj.MakeLabelsAmendment(['more_chickens'], [])
  ]

  issue_main.blocking_iids = [no_change_iid, b_remove.issue_id]
  expected_main.blocking_iids = [no_change_iid, b_add.issue_id]
  expected_amendments[issue_main.issue_id].append(
      tracker_bizobj.MakeBlockingAmendment(
          [('proj', b_add.local_id)], [('proj', b_remove.local_id)],
          default_project_name='proj'))

  # Merged issues changes setup.
  merge_remove = _Issue('proj', 5)
  self.services.issue.TestAddIssue(merge_remove)
  expected_merge_remove = copy.deepcopy(merge_remove)
  expected_merge_remove.assume_stale = False
  expected_issues_to_update[
      expected_merge_remove.issue_id] = expected_merge_remove
  expected_imp_amendments[merge_remove.issue_id] = [
      tracker_bizobj.MakeMergedIntoAmendment(
          [], [issue_main_ref], default_project_name='proj')
  ]

  merge_add = _Issue('proj', 6)
  self.services.issue.TestAddIssue(merge_add)
  expected_merge_add = copy.deepcopy(merge_add)
  expected_merge_add.assume_stale = False
  # We are adding 333 and removing 222 in issue_main with delta_main.
  expected_merge_add.cc_ids = sorted([expected_main.owner_id, 111, 333])
  expected_merged_from_add[expected_merge_add.issue_id] = [
      issue_main.issue_id
  ]
  expected_imp_amendments[merge_add.issue_id] = [
      tracker_bizobj.MakeCcAmendment(expected_merge_add.cc_ids, []),
      tracker_bizobj.MakeMergedIntoAmendment(
          [issue_main_ref], [], default_project_name='proj')
  ]
  # We are merging issue_main into merge_add, so issue_main's starrers
  # should be merged into merge_add's starrers.
  self.services.issue_star.SetStar(
      self.cnxn, self.services, None, issue_main.issue_id, 111, True)
  self.services.issue_star.SetStar(
      self.cnxn, self.services, None, issue_main.issue_id, 222, True)
  expected_merge_add.star_count = 2
  # NOTE(review): this expectation is order-sensitive ([222, 111]) —
  # confirm the starrer order is deterministic.
  expected_new_starrers[merge_add.issue_id] = [222, 111]
  expected_issues_to_update[expected_merge_add.issue_id] = expected_merge_add

  issue_main.merged_into = merge_remove.issue_id
  expected_main.merged_into = merge_add.issue_id
  expected_amendments[issue_main.issue_id].append(
      tracker_bizobj.MakeMergedIntoAmendment(
          [('proj', merge_add.local_id)], [('proj', merge_remove.local_id)],
          default_project_name='proj'))

  # issue_main is only registered now, after all of its starting state
  # (blocked_on, blocking, merged_into) has been set above.
  self.services.issue.TestAddIssue(issue_main)
  expected_issues_to_update[expected_main.issue_id] = expected_main

  # Issues we'll put in delta_main.*_remove fields that aren't in issue_main.
  # These issues should not show up in issues_to_update.
  missing_1 = _Issue('proj', 404)
  expected_missing_1 = copy.deepcopy(missing_1)
  expected_missing_1.assume_stale = False
  self.services.issue.TestAddIssue(missing_1)
  missing_2 = _Issue('proj', 405)
  self.services.issue.TestAddIssue(missing_2)
  expected_missing_2 = copy.deepcopy(missing_2)
  expected_missing_2.assume_stale = False

  # cc 404 and label 'remove_404' are likewise removals of values that
  # issue_main does not have.
  delta_main = tracker_pb2.IssueDelta(
      owner_id=888,
      cc_ids_remove=[222, 404], cc_ids_add=[333],
      labels_remove=['remove_me', 'remove_404'], labels_add=['add_me'],
      merged_into=merge_add.issue_id,
      blocked_on_add=[bo_add.issue_id],
      blocked_on_remove=[bo_remove.issue_id, missing_1.issue_id],
      blocking_add=[b_add.issue_id],
      blocking_remove=[b_remove.issue_id, missing_2.issue_id])
  issue_delta_pairs.append((issue_main, delta_main))

  actual_tuple = tracker_helpers.ApplyAllIssueChanges(
      self.cnxn, issue_delta_pairs, self.services)

  expected_tuple = tracker_helpers._IssueChangesTuple(
      expected_issues_to_update, expected_merged_from_add,
      expected_amendments, expected_imp_amendments, expected_old_owners,
      expected_old_statuses, expected_old_components, expected_new_starrers)
  self.assertEqual(actual_tuple, expected_tuple)

  # The "missing" issues must be left as they were.
  self.assertEqual(missing_1, expected_missing_1)
  self.assertEqual(missing_2, expected_missing_2)
def testApplyAllIssueChanges_NOOP(self):
  """A delta that changes nothing yields an all-empty changes tuple."""
  target = _Issue('proj', 1)
  already_blocked_on = _Issue('proj', 2)
  never_blocked_on = _Issue('proj', 3)

  # Give the target the exact state that the delta below asks for.
  target.owner_id = 111
  target.cc_ids = [222]
  target.blocked_on_iids = [already_blocked_on.issue_id]
  already_blocked_on.blocking_iids = [target.issue_id]

  for issue in (target, already_blocked_on, never_blocked_on):
    self.services.issue.TestAddIssue(issue)
  target_before = copy.deepcopy(target)

  # Every "add" is already present and every "remove" is already absent.
  redundant_delta = tracker_pb2.IssueDelta(
      owner_id=target.owner_id,
      cc_ids_add=target.cc_ids,
      cc_ids_remove=[333],
      blocked_on_add=target.blocked_on_iids,
      blocked_on_remove=[never_blocked_on.issue_id])

  result = tracker_helpers.ApplyAllIssueChanges(
      self.cnxn, [(target, redundant_delta)], self.services)

  self.assertEqual(
      result,
      tracker_helpers._IssueChangesTuple({}, {}, {}, {}, {}, {}, {}, {}))
  self.assertEqual(target, target_before)
def testApplyAllIssueChanges_Empty(self):
  """With no issue-delta pairs, every member of the result is empty."""
  result = tracker_helpers.ApplyAllIssueChanges(
      self.cnxn, [], self.services)
  nothing_changed = tracker_helpers._IssueChangesTuple(
      {}, {}, {}, {}, {}, {}, {}, {})
  self.assertEqual(result, nothing_changed)
def testUpdateClosedTimestamp(self):
  """closed_timestamp is set on open->closed and cleared on closed->open."""
  config = tracker_pb2.ProjectIssueConfig()
  # Two open statuses and two closed statuses.
  for status, means_open in (('New', True), ('Accepted', True),
                             ('Old', False), ('Closed', False)):
    config.well_known_statuses.append(
        tracker_pb2.StatusDef(status=status, means_open=means_open))

  issue = tracker_pb2.Issue()
  issue.local_id = 1234
  issue.status = 'New'

  # ensure the default value is undef
  self.assertFalse(issue.closed_timestamp)

  # ensure transitioning to the same and other open states
  # doesn't set the timestamp
  issue.status = 'New'
  tracker_helpers.UpdateClosedTimestamp(config, issue, 'New')
  self.assertFalse(issue.closed_timestamp)

  issue.status = 'Accepted'
  tracker_helpers.UpdateClosedTimestamp(config, issue, 'New')
  self.assertFalse(issue.closed_timestamp)

  # ensure transitioning from open to closed sets the timestamp
  issue.status = 'Closed'
  tracker_helpers.UpdateClosedTimestamp(config, issue, 'Accepted')
  self.assertTrue(issue.closed_timestamp)

  # ensure that the timestamp is cleared when transitioning from
  # closed to open
  issue.status = 'New'
  tracker_helpers.UpdateClosedTimestamp(config, issue, 'Closed')
  self.assertFalse(issue.closed_timestamp)
def testGroupUniqueDeltaIssues(self):
  """Equal IssueDeltas are collapsed and their issues grouped together."""
  issues = [_Issue('proj', local_id) for local_id in range(1, 6)]

  # Deltas 1 and 3 are equal, as are deltas 4 and 5; delta 2 is unique.
  add_cc = tracker_pb2.IssueDelta(cc_ids_add=[111])
  add_and_remove_cc = tracker_pb2.IssueDelta(
      cc_ids_add=[111], cc_ids_remove=[222])
  add_cc_again = tracker_pb2.IssueDelta(cc_ids_add=[111])
  empty = tracker_pb2.IssueDelta()
  empty_again = tracker_pb2.IssueDelta()
  deltas = [add_cc, add_and_remove_cc, add_cc_again, empty, empty_again]

  unique_deltas, issues_for_deltas = tracker_helpers.GroupUniqueDeltaIssues(
      list(zip(issues, deltas)))

  self.assertEqual(unique_deltas, [add_cc, add_and_remove_cc, empty])
  self.assertEqual(
      issues_for_deltas,
      [[issues[0], issues[2]], [issues[1]], [issues[3], issues[4]]])
def testEnforceAttachmentQuotaLimits(self):
  """The result maps each project id to its total expected upload size."""
  add_project = self.services.project.TestAddProject

  # Project 798 has two issues in the batch, project 788 has one.
  add_project('Circe', project_id=798)
  circe_pairs = [
      (_Issue('Circe', local_id, project_id=798), tracker_pb2.IssueDelta())
      for local_id in (1, 2)
  ]
  add_project('Patroclus', project_id=788)
  patroclus_pair = (
      _Issue('Patroclus', 1, project_id=788), tracker_pb2.IssueDelta())
  issue_delta_pairs = circe_pairs + [patroclus_pair]

  attachment_uploads = [
      framework_helpers.AttachmentUpload(
          'dragon', b'OOOOOO\n', 'text/plain'),
      framework_helpers.AttachmentUpload(
          'snake', b'ooooo\n', 'text/plain'),
  ]
  bytes_per_issue = sum(len(upload.contents) for upload in attachment_uploads)

  actual = tracker_helpers._EnforceAttachmentQuotaLimits(
      self.cnxn, issue_delta_pairs, self.services, attachment_uploads)

  # Both uploads are counted once per issue of the project.
  self.assertEqual(actual, {798: bytes_per_issue * 2, 788: bytes_per_issue})
@mock.patch('tracker.tracker_constants.ISSUE_ATTACHMENTS_QUOTA_HARD', 1)
def testEnforceAttachmentQuotaLimits_Exceeded(self):
  """With the hard quota patched to 1, both projects are reported."""
  projects = self.services.project
  projects.TestAddProject('Circe', project_id=798)
  circe_1 = _Issue('Circe', 1, project_id=798)
  circe_2 = _Issue('Circe', 2, project_id=798)
  projects.TestAddProject('Patroclus', project_id=788)
  patroclus_1 = _Issue('Patroclus', 1, project_id=788)

  issue_delta_pairs = [
      (issue, tracker_pb2.IssueDelta())
      for issue in (circe_1, circe_2, patroclus_1)
  ]
  attachment_uploads = [
      framework_helpers.AttachmentUpload(filename, contents, 'text/plain')
      for filename, contents in (('dragon', b'OOOOOO\n'),
                                 ('snake', b'ooooo\n'))
  ]

  # One message line per over-quota project.
  over_quota_regex = r'.+ project Patroclus\n.+ project Circe'
  with self.assertRaisesRegex(exceptions.OverAttachmentQuota,
                              over_quota_regex):
    tracker_helpers._EnforceAttachmentQuotaLimits(
        self.cnxn, issue_delta_pairs, self.services, attachment_uploads)
def testAssertIssueChangesValid_Valid(self):
  """We can assert when deltas are valid for issues.

  Each issue/delta pair below is one change the validator must accept.
  A deep copy of every delta is taken up front so that the final assertion
  can verify the validator did not mutate any of them.
  """
  impacted_issue = _Issue('chicken', 101)
  self.services.issue.TestAddIssue(impacted_issue)

  # Merging into another issue together with the status 'Duplicate'.
  issue_1 = _Issue('chicken', 1)
  self.services.issue.TestAddIssue(issue_1)
  delta_1 = tracker_pb2.IssueDelta(
      merged_into=impacted_issue.issue_id, status='Duplicate')
  exp_d1 = copy.deepcopy(delta_1)

  # Blocking on another issue.
  issue_2 = _Issue('chicken', 2)
  self.services.issue.TestAddIssue(issue_2)
  delta_2 = tracker_pb2.IssueDelta(blocked_on_add=[impacted_issue.issue_id])
  exp_d2 = copy.deepcopy(delta_2)

  # An empty delta.
  issue_3 = _Issue('chicken', 3)
  self.services.issue.TestAddIssue(issue_3)
  delta_3 = tracker_pb2.IssueDelta()
  exp_d3 = copy.deepcopy(delta_3)

  # Owner set to the user that setUp registered as a project committer.
  issue_4 = _Issue('chicken', 4)
  self.services.issue.TestAddIssue(issue_4)
  delta_4 = tracker_pb2.IssueDelta(owner_id=self.project_member.user_id)
  exp_d4 = copy.deepcopy(delta_4)

  # An INT field value of 998, below the max_value of 999 set in setUp.
  issue_5 = _Issue('chicken', 5)
  self.services.issue.TestAddIssue(issue_5)
  fv = tracker_bizobj.MakeFieldValue(
      self.int_fd.field_id, 998, None, None, None, None, False)
  delta_5 = tracker_pb2.IssueDelta(field_vals_add=[fv])
  exp_d5 = copy.deepcopy(delta_5)

  # A summary of exactly MAX_SUMMARY_CHARS characters with one space of
  # padding on each side (presumably the padding is not counted — confirm).
  issue_6 = _Issue('chicken', 6)
  self.services.issue.TestAddIssue(issue_6)
  delta_6 = tracker_pb2.IssueDelta(
      summary=' ' + 's' * tracker_constants.MAX_SUMMARY_CHARS + ' ')
  exp_d6 = copy.deepcopy(delta_6)

  issue_7 = _Issue('chicken', 7)
  self.services.issue.TestAddIssue(issue_7)
  issue_8 = _Issue('chicken', 8)
  self.services.issue.TestAddIssue(issue_8)

  # We are fine with duplicate/consistent deltas.
  delta_7 = tracker_pb2.IssueDelta(blocked_on_add=[issue_8.issue_id])
  exp_d7 = copy.deepcopy(delta_7)
  delta_8 = tracker_pb2.IssueDelta(blocking_add=[issue_7.issue_id])
  exp_d8 = copy.deepcopy(delta_8)

  # The same consistency holds for removals: 9 unblocks from 10 and
  # 10 stops blocking 9.
  issue_9 = _Issue('chicken', 9)
  self.services.issue.TestAddIssue(issue_9)
  issue_10 = _Issue('chicken', 10)
  self.services.issue.TestAddIssue(issue_10)
  delta_9 = tracker_pb2.IssueDelta(blocked_on_remove=[issue_10.issue_id])
  exp_d9 = copy.deepcopy(delta_9)
  delta_10 = tracker_pb2.IssueDelta(blocking_remove=[issue_9.issue_id])
  exp_d10 = copy.deepcopy(delta_10)

  # NOTE(review): issue_11 is not registered via TestAddIssue, unlike
  # issues 1-10 — confirm this is intentional.
  issue_11 = _Issue('chicken', 11)
  # NOTE(review): field id 123 is also the id of self.int_fd from setUp;
  # verify how TestAddFieldDef treats a repeated field id.
  user_fd = tracker_bizobj.MakeFieldDef(
      123, 789, 'CPU', tracker_pb2.FieldTypes.USER_TYPE, None, '', False,
      False, False, None, None, '', False, '', '',
      tracker_pb2.NotifyTriggers.NEVER, 'no_action', 'doc', False)
  self.services.config.TestAddFieldDef(user_fd)
  a_user = self.services.user.TestAddUser('a_user@test.com', 123)
  # Both referenced users exist: 222 was added in setUp, a_user just above.
  delta_11 = tracker_pb2.IssueDelta(
      cc_ids_add=[222],
      field_vals_add=[
          tracker_bizobj.MakeFieldValue(
              user_fd.field_id, None, None, a_user.user_id, None, None, False)
      ])
  exp_d11 = copy.deepcopy(delta_11)

  issue_delta_pairs = [
      (issue_1, delta_1), (issue_2, delta_2), (issue_3, delta_3),
      (issue_4, delta_4), (issue_5, delta_5), (issue_6, delta_6),
      (issue_7, delta_7), (issue_8, delta_8), (issue_9, delta_9),
      (issue_10, delta_10), (issue_11, delta_11)
  ]
  # A comment of exactly MAX_COMMENT_CHARS characters, padded with one
  # space on each side.
  comment = ' ' + 'c' * tracker_constants.MAX_COMMENT_CHARS + ' '
  tracker_helpers._AssertIssueChangesValid(
      self.cnxn, issue_delta_pairs, self.services, comment_content=comment)

  # Check we can handle None `comment_content`.
  tracker_helpers._AssertIssueChangesValid(
      self.cnxn, issue_delta_pairs, self.services)
  # The deltas must be equal to the copies taken before validation.
  self.assertEqual(
      [
          exp_d1, exp_d2, exp_d3, exp_d4, exp_d5, exp_d6, exp_d7, exp_d8,
          exp_d9, exp_d10, exp_d11
      ], [
          delta_1, delta_2, delta_3, delta_4, delta_5, delta_6, delta_7,
          delta_8, delta_9, delta_10, delta_11
      ])
def testAssertIssueChangesValid_ValidatesLabels(self):
  """A delta adding 'freeze_new_label' is rejected with InputException."""
  labelled_issue = _Issue('chicken', 1)
  self.services.issue.TestAddIssue(labelled_issue)
  new_label_delta = tracker_pb2.IssueDelta(labels_add=['freeze_new_label'])

  # Regex for the expected error; the parentheses in "label(s)" are escaped.
  blocked_label_regex = (
      "The creation of new labels is blocked for the Chromium project"
      " in Monorail. To continue with editing your issue, please"
      " remove: freeze_new_label label\\(s\\).")

  with self.assertRaisesRegex(exceptions.InputException,
                              blocked_label_regex):
    tracker_helpers._AssertIssueChangesValid(
        self.cnxn, [(labelled_issue, new_label_delta)], self.services,
        comment_content='just a plain comment')
def testAssertIssueChangesValid_RequiredField(self):
  """Asserts fields and required fields.

  The project gets a required STR field that the issue has no value for.
  Validation must still accept (a) an empty delta and (b) a delta that adds
  a value for a different, valid INT field. Neither delta may be mutated.
  """
  issue_1 = _Issue('chicken', 1)
  self.services.issue.TestAddIssue(issue_1)
  delta_1 = tracker_pb2.IssueDelta()
  exp_d1 = copy.deepcopy(delta_1)
  # The positional True (7th argument, False in the other MakeFieldDef calls
  # in this file) is what marks this field as required.
  required_fd = tracker_bizobj.MakeFieldDef(
      124, 789, 'StrField', tracker_pb2.FieldTypes.STR_TYPE, None, '', True,
      False, False, None, None, '', False, '', '',
      tracker_pb2.NotifyTriggers.NEVER, 'no_action', 'doc', False)
  self.services.config.TestAddFieldDef(required_fd)

  comment = 'just a plain comment'
  tracker_helpers._AssertIssueChangesValid(
      self.cnxn, [(issue_1, delta_1)], self.services, comment_content=comment)

  # Check we can handle adding a field value when issue is in invalid state.
  fv = tracker_bizobj.MakeFieldValue(
      self.int_fd.field_id, 998, None, None, None, None, False)
  delta_2 = tracker_pb2.IssueDelta(field_vals_add=[fv])
  exp_d2 = copy.deepcopy(delta_2)
  # delta_2 has to be handed to the validator itself. Previously the
  # delta_1 pairs were passed a second time, so delta_2 was never exercised
  # and the no-mutation check below was vacuous for it.
  tracker_helpers._AssertIssueChangesValid(
      self.cnxn, [(issue_1, delta_2)], self.services)

  self.assertEqual([exp_d1, exp_d2], [delta_1, delta_2])
def testAssertIssueChangesValid_Invalid(self):
  """We can raise exceptions when deltas are not valid for issues.

  Three invalid issue/delta pairs and an over-long comment are validated
  in one call. The expected error messages are collected in order and
  joined with newlines to form the pattern the InputException must match.
  """

  def getRef(issue):
    """Returns the 'project_name:local_id' prefix used in the messages."""
    return '%s:%d' % (issue.project_name, issue.local_id)

  issue_delta_pairs = []
  expected_err_msgs = []

  # A comment one character over MAX_COMMENT_CHARS.
  comment = 'c' * (tracker_constants.MAX_COMMENT_CHARS + 1)
  expected_err_msgs.append('Comment is too long.')

  # issue_1: merged into and blocked on itself, empty summary and status,
  # and a cc (9876) that was never added as a user.
  issue_1 = _Issue('chicken', 1)
  self.services.issue.TestAddIssue(issue_1)
  issue_1_ref = getRef(issue_1)
  delta_1 = tracker_pb2.IssueDelta(
      merged_into=issue_1.issue_id,
      blocked_on_add=[issue_1.issue_id],
      summary='',
      status='',
      cc_ids_add=[9876])
  issue_delta_pairs.append((issue_1, delta_1))
  expected_err_msgs.extend(
      [
          ('%s: MERGED type statuses must accompany mergedInto values.') %
          issue_1_ref,
          '%s: Cannot merge an issue into itself.' % issue_1_ref,
          '%s: Cannot block an issue on itself.' % issue_1_ref,
          'users/9876: User does not exist.',
          '%s: Summary required.' % issue_1_ref,
          '%s: Status is required.' % issue_1_ref
      ])

  # issue_2: 'Duplicate' status without a merged_into value, blocking
  # itself, an owner that setUp did not add to the project, a summary one
  # character too long, and an INT value (1000) above max_value 999.
  issue_2 = _Issue('chicken', 2)
  self.services.issue.TestAddIssue(issue_2)
  issue_2_ref = getRef(issue_2)
  fv = tracker_bizobj.MakeFieldValue(
      self.int_fd.field_id, 1000, None, None, None, None, False)
  delta_2 = tracker_pb2.IssueDelta(
      status='Duplicate',
      blocking_add=[issue_2.issue_id],
      summary='s' * (tracker_constants.MAX_SUMMARY_CHARS + 1),
      owner_id=self.no_project_user.user_id,
      field_vals_add=[fv])
  issue_delta_pairs.append((issue_2, delta_2))
  expected_err_msgs.extend(
      [
          ('%s: MERGED type statuses must accompany mergedInto values.') %
          issue_2_ref,
          '%s: Cannot block an issue on itself.' % issue_2_ref,
          '%s: Issue owner must be a project member.' % issue_2_ref,
          '%s: Summary is too long.' % issue_2_ref,
          '%s: Error for %r: Value must be <= 999.' % (issue_2_ref, fv)
      ])

  # issue_3: starts as 'Duplicate' with a merged_into value; the delta sets
  # status 'Available' together with a merged_into_external value.
  issue_3 = _Issue('chicken', 3)
  issue_3.status = 'Duplicate'
  issue_3.merged_into = 78911
  self.services.issue.TestAddIssue(issue_3)
  issue_3_ref = getRef(issue_3)
  delta_3 = tracker_pb2.IssueDelta(
      status='Available', merged_into_external='b/123')
  issue_delta_pairs.append((issue_3, delta_3))
  expected_err_msgs.append(
      '%s: MERGED type statuses must accompany mergedInto values.' %
      issue_3_ref)

  # NOTE(review): the joined messages are used as a regex without escaping,
  # so characters such as '.' match loosely rather than literally.
  with self.assertRaisesRegex(exceptions.InputException,
                              '\n'.join(expected_err_msgs)):
    tracker_helpers._AssertIssueChangesValid(
        self.cnxn, issue_delta_pairs, self.services, comment_content=comment)
def testAssertIssueChangesValid_ConflictingDeltas(self):
  """Deltas in one batch that contradict each other raise InputException.

  One 'Changes for X conflict for Y[, Z]' message is expected per
  conflicting issue; the messages are joined with newlines into the
  pattern the exception must match.
  """

  def getRef(issue):
    """Returns the 'project_name:local_id' form used in the messages."""
    return '%s:%d' % (issue.project_name, issue.local_id)

  expected_err_msgs = []

  issue_3 = _Issue('chicken', 3)
  self.services.issue.TestAddIssue(issue_3)
  issue_3_ref = getRef(issue_3)
  issue_4 = _Issue('chicken', 4)
  self.services.issue.TestAddIssue(issue_4)
  issue_4_ref = getRef(issue_4)
  issue_5 = _Issue('chicken', 5)
  self.services.issue.TestAddIssue(issue_5)
  issue_5_ref = getRef(issue_5)
  issue_6 = _Issue('chicken', 6)
  self.services.issue.TestAddIssue(issue_6)
  issue_6_ref = getRef(issue_6)
  issue_7 = _Issue('chicken', 7)
  self.services.issue.TestAddIssue(issue_7)
  issue_7_ref = getRef(issue_7)

  # issue_3 asks to block issue_4 and to be blocked on issues 5 and 6.
  delta_3 = tracker_pb2.IssueDelta(
      blocking_add=[issue_4.issue_id],
      blocked_on_add=[issue_5.issue_id, issue_6.issue_id])

  # issue_4 removes issue_3 from its blocked_on while issue_3 adds that
  # same relation.
  delta_4 = tracker_pb2.IssueDelta(
      blocked_on_remove=[issue_3.issue_id], blocking_add=[issue_5.issue_id])
  expected_err_msgs.append(
      'Changes for %s conflict for %s' % (issue_4_ref, issue_3_ref))

  # issue_5 removes the relations that delta_3 and delta_4 add to it.
  delta_5 = tracker_pb2.IssueDelta(
      blocking_remove=[issue_3.issue_id],
      blocked_on_remove=[issue_4.issue_id])
  expected_err_msgs.append(
      'Changes for %s conflict for %s, %s' %
      (issue_5_ref, issue_3_ref, issue_4_ref))

  # issue_6 stops blocking issue_3 while delta_3 adds that relation.
  delta_6 = tracker_pb2.IssueDelta(blocking_remove=[issue_3.issue_id])
  expected_err_msgs.append(
      'Changes for %s conflict for %s' % (issue_6_ref, issue_3_ref))

  # issue_7's delta contradicts itself: it both adds and removes the same
  # blocking and blocked_on issues.
  impacted_issue = _Issue('chicken', 11)
  self.services.issue.TestAddIssue(impacted_issue)
  impacted_issue_ref = getRef(impacted_issue)
  delta_7 = tracker_pb2.IssueDelta(
      blocking_remove=[issue_3.issue_id],
      blocking_add=[issue_3.issue_id],
      blocked_on_remove=[impacted_issue.issue_id],
      blocked_on_add=[impacted_issue.issue_id])
  expected_err_msgs.append(
      'Changes for %s conflict for %s, %s' %
      (issue_7_ref, issue_3_ref, impacted_issue_ref))

  issue_delta_pairs = [
      (issue_3, delta_3),
      (issue_4, delta_4),
      (issue_5, delta_5),
      (issue_6, delta_6),
      (issue_7, delta_7),
  ]

  # NOTE(review): no message is expected for issue_3 itself; the pattern
  # only lists issues 4-7.
  with self.assertRaisesRegex(exceptions.InputException,
                              '\n'.join(expected_err_msgs)):
    tracker_helpers._AssertIssueChangesValid(
        self.cnxn, issue_delta_pairs, self.services)
def testComputeNewCcsFromIssueMerge(self):
  """New ccs for a merge target are collected from the source issues."""

  def _source(project_id, local_id, owner_id):
    """Builds a 'New' source issue with the shared summary."""
    return fake.MakeTestIssue(
        project_id, local_id, 'Source issue', 'New', owner_id)

  merge_target = fake.MakeTestIssue(789, 10, 'Target issue', 'New', 111)
  different_restrictions = _source(789, 11, 111)
  same_restrictions = _source(789, 12, 222)
  no_restrictions = _source(789, 13, 222)
  empty_ccs = _source(789, 14, 666)
  different_project = _source(788, 15, 666)

  # One extra cc on every source except the "empty ccs" one.
  for source, cc_id in ((different_restrictions, 333),
                        (same_restrictions, 444),
                        (no_restrictions, 555),
                        (different_project, 999)):
    source.cc_ids.append(cc_id)

  merge_target.labels.append('Restrict-View-Chicken')
  different_restrictions.labels.append('Restrict-View-Cow')
  same_restrictions.labels.append('Restrict-View-Chicken')

  for issue in (merge_target, different_restrictions, same_restrictions,
                no_restrictions, empty_ccs, different_project):
    self.services.issue.TestAddIssue(issue)

  # Only the first three sources are actually merged into the target.
  merged_ccs = tracker_helpers._ComputeNewCcsFromIssueMerge(
      merge_target,
      [different_restrictions, same_restrictions, no_restrictions])

  six.assertCountEqual(self, merged_ccs, [444, 555, 222])
def testComputeNewCcsFromIssueMerge_Empty(self):
  """Merging no source issues produces no new ccs."""
  merge_target = fake.MakeTestIssue(789, 10, 'Target issue', 'New', 111)
  self.services.issue.TestAddIssue(merge_target)

  merged_ccs = tracker_helpers._ComputeNewCcsFromIssueMerge(merge_target, [])

  six.assertCountEqual(self, merged_ccs, [])
def testEnforceNonMergeStatusDeltas(self):
  """_EnforceNonMergeStatusDeltas updates deltas in place where needed.

  Eight issue/delta pairs are run through one call. Each exp_delta_N is a
  deep copy of delta_N taken beforehand, adjusted only in the two cases
  (5 and 6) where the delta is expected to be modified.
  """
  # No updates: user is setting to a non-MERGED status with no
  # existing merged_into values.
  issue_1 = _Issue('chicken', 1)
  self.services.issue.TestAddIssue(issue_1)
  delta_1 = tracker_pb2.IssueDelta(status='Available')
  exp_delta_1 = copy.deepcopy(delta_1)

  # No updates: user is setting to a MERGED status. Whether this request
  # goes through will be handled by _AssertIssueChangesValid().
  issue_2 = _Issue('chicken', 2)
  self.services.issue.TestAddIssue(issue_2)
  delta_2 = tracker_pb2.IssueDelta(status='Duplicate')
  exp_delta_2 = copy.deepcopy(delta_2)

  # No updates: user is setting to a MERGED status. (This test issue starts
  # out with a merged_into value but a non-MERGED status. We don't expect
  # real data to ever be in this state)
  issue_3 = _Issue('chicken', 3)
  issue_3.merged_into = 7011
  self.services.issue.TestAddIssue(issue_3)
  delta_3 = tracker_pb2.IssueDelta(status='Duplicate')
  exp_delta_3 = copy.deepcopy(delta_3)

  # No updates: same situation as above.
  issue_4 = _Issue('chicken', 4)
  issue_4.merged_into_external = 'b/123'
  self.services.issue.TestAddIssue(issue_4)
  delta_4 = tracker_pb2.IssueDelta(status='Duplicate')
  exp_delta_4 = copy.deepcopy(delta_4)

  # Update delta: user is setting status AWAY from a MERGED status, so we
  # auto-remove any existing merged_into values.
  issue_5 = _Issue('chicken', 5)
  issue_5.merged_into = 7011
  self.services.issue.TestAddIssue(issue_5)
  delta_5 = tracker_pb2.IssueDelta(status='Available')
  exp_delta_5 = copy.deepcopy(delta_5)
  exp_delta_5.merged_into = 0

  # Update delta: user is setting status AWAY from a MERGED status, so we
  # auto-remove any existing merged_into values.
  issue_6 = _Issue('chicken', 6)
  issue_6.merged_into_external = 'b/123'
  self.services.issue.TestAddIssue(issue_6)
  delta_6 = tracker_pb2.IssueDelta(status='Available')
  exp_delta_6 = copy.deepcopy(delta_6)
  exp_delta_6.merged_into_external = ''

  # No updates: user is setting to a non-MERGED status while also setting
  # a merged_into value. This will be rejected down the line by
  # _AssertIssueChangesValid()
  issue_7 = _Issue('chicken', 7)
  issue_7.merged_into = 7011
  self.services.issue.TestAddIssue(issue_7)
  delta_7 = tracker_pb2.IssueDelta(
      merged_into_external='b/123', status='Available')
  exp_delta_7 = copy.deepcopy(delta_7)

  # No updates: user is setting to a non-MERGED status while also setting
  # a merged_into value. This will be rejected down the line by
  # _AssertIssueChangesValid()
  issue_8 = _Issue('chicken', 8)
  issue_8.merged_into_external = 'b/123'
  self.services.issue.TestAddIssue(issue_8)
  delta_8 = tracker_pb2.IssueDelta(merged_into=8011, status='Available')
  exp_delta_8 = copy.deepcopy(delta_8)

  pairs = [
      (issue_1, delta_1), (issue_2, delta_2), (issue_3, delta_3),
      (issue_4, delta_4), (issue_5, delta_5), (issue_6, delta_6),
      (issue_7, delta_7), (issue_8, delta_8)
  ]

  # The call's return value is not used; the deltas themselves are checked.
  tracker_helpers._EnforceNonMergeStatusDeltas(
      self.cnxn, pairs, self.services)
  self.assertEqual(
      [
          delta_1, delta_2, delta_3, delta_4, delta_5, delta_6, delta_7,
          delta_8
      ], [
          exp_delta_1, exp_delta_2, exp_delta_3, exp_delta_4, exp_delta_5,
          exp_delta_6, exp_delta_7, exp_delta_8
      ])
class IssueChangeImpactedIssuesTest(unittest.TestCase):
"""Tests for the _IssueChangeImpactedIssues class."""
def setUp(self):
  """Provides fake issue and issue_star services plus a fake connection."""
  issue_service = fake.IssueService()
  star_service = fake.IssueStarService()
  self.services = service_manager.Services(
      issue=issue_service, issue_star=star_service)
  self.cnxn = 'fake connection'
def testComputeAllImpactedIDs(self):
  """The impacted iids are the union of the keys of all six mappings."""
  impacted = tracker_helpers._IssueChangeImpactedIssues()

  # One distinct iid per mapping.
  impacted.blocking_add[78901].append(1)
  impacted.blocking_remove[78902].append(2)
  impacted.blocked_on_add[78903].append(1)
  impacted.blocked_on_remove[78904].append(1)
  impacted.merged_from_add[78905].append(3)
  impacted.merged_from_remove[78906].append(3)

  # Repeat a few iids.
  impacted.blocked_on_remove[78901].append(1)
  impacted.merged_from_add[78903].append(1)

  all_iids = impacted.ComputeAllImpactedIIDs()

  self.assertEqual(all_iids, set(range(78901, 78907)))
def testComputeAllImpactedIDs_Empty(self):
  """A fresh tracker has no impacted iids."""
  all_iids = (
      tracker_helpers._IssueChangeImpactedIssues().ComputeAllImpactedIIDs())
  self.assertEqual(all_iids, set())
def testTrackImpactedIssues(self):
  """Each mapping records, per impacted iid, the issues that affect it."""
  first = _Issue('project', 1)
  first.merged_into = 78906
  first_delta = tracker_pb2.IssueDelta(
      merged_into=78905,
      blocked_on_add=[78901, 78902],
      blocked_on_remove=[78903, 78904],
  )

  second = _Issue('project', 2)
  second.merged_into = 78905
  second_delta = tracker_pb2.IssueDelta(
      merged_into=78905,  # This should be ignored.
      blocking_add=[78901, 78902],
      blocking_remove=[78903, 78904],
  )

  third = _Issue('project', 3)
  third.merged_into = 78902
  third_delta = tracker_pb2.IssueDelta(merged_into=78901)

  fourth = _Issue('project', 4)
  fourth.merged_into = 78901
  fourth_delta = tracker_pb2.IssueDelta(
      merged_into=framework_constants.NO_ISSUE_SPECIFIED)

  tracked = tracker_helpers._IssueChangeImpactedIssues()
  for issue, delta in ((first, first_delta), (second, second_delta),
                       (third, third_delta), (fourth, fourth_delta)):
    tracked.TrackImpactedIssues(issue, delta)

  first_iid = first.issue_id
  second_iid = second.issue_id
  third_iid = third.issue_id
  fourth_iid = fourth.issue_id

  # A blocked_on change on an issue is tracked as a blocking change on the
  # other issue, and vice versa.
  checks = (
      (tracked.blocking_add, {78901: [first_iid], 78902: [first_iid]}),
      (tracked.blocking_remove, {78903: [first_iid], 78904: [first_iid]}),
      (tracked.blocked_on_add, {78901: [second_iid], 78902: [second_iid]}),
      (tracked.blocked_on_remove, {78903: [second_iid], 78904: [second_iid]}),
      (tracked.merged_from_add, {78901: [third_iid], 78905: [first_iid]}),
      (
          tracked.merged_from_remove, {
              78901: [fourth_iid],
              78902: [third_iid],
              78906: [first_iid],
          }),
  )
  for actual, expected in checks:
    self.assertEqual(actual, expected)
def testApplyImpactedIssueChanges(self):
  """ApplyImpactedIssueChanges updates the issue and returns the changes.

  One impacted issue receives a blocked_on add/remove, a blocking
  add/remove and a merged_from add/remove. The test checks the returned
  amendments and new starrers as well as the in-place changes to the issue.
  """
  impacted_tracker = tracker_helpers._IssueChangeImpactedIssues()
  impacted_issue = _Issue('proj', 1)
  self.services.issue.TestAddIssue(impacted_issue)
  impacted_iid = impacted_issue.issue_id

  # Setup.
  bo_add = _Issue('proj', 2)
  self.services.issue.TestAddIssue(bo_add)
  impacted_tracker.blocked_on_add[impacted_iid].append(bo_add.issue_id)

  bo_remove = _Issue('proj', 3)
  self.services.issue.TestAddIssue(bo_remove)
  impacted_tracker.blocked_on_remove[impacted_iid].append(
      bo_remove.issue_id)

  b_add = _Issue('proj', 4)
  self.services.issue.TestAddIssue(b_add)
  impacted_tracker.blocking_add[impacted_iid].append(
      b_add.issue_id)

  b_remove = _Issue('proj', 5)
  self.services.issue.TestAddIssue(b_remove)
  impacted_tracker.blocking_remove[impacted_iid].append(
      b_remove.issue_id)

  # Two issues are merged into the impacted issue: one with ccs 666 and 777,
  # one without ccs.
  m_add = _Issue('proj', 6)
  m_add.cc_ids = [666, 777]
  self.services.issue.TestAddIssue(m_add)
  m_add_no_ccs = _Issue('proj', 7, '', '')
  self.services.issue.TestAddIssue(m_add_no_ccs)
  impacted_tracker.merged_from_add[impacted_iid].extend(
      [m_add.issue_id, m_add_no_ccs.issue_id])

  # Set up starrers.
  # The impacted issue is starred by 111 and 222; m_add by 222, 333 and 444.
  self.services.issue_star.SetStar(
      self.cnxn, self.services, None, impacted_iid, 111, True)
  self.services.issue_star.SetStar(
      self.cnxn, self.services, None, impacted_iid, 222, True)
  self.services.issue_star.SetStar(
      self.cnxn, self.services, None, m_add.issue_id, 222, True)
  self.services.issue_star.SetStar(
      self.cnxn, self.services, None, m_add.issue_id, 333, True)
  self.services.issue_star.SetStar(
      self.cnxn, self.services, None, m_add.issue_id, 444, True)

  m_remove = _Issue('proj', 8)
  m_remove.cc_ids = [888]
  self.services.issue.TestAddIssue(m_remove)
  impacted_tracker.merged_from_remove[impacted_iid].append(
      m_remove.issue_id)

  # These fields are set on the local object after TestAddIssue was called.
  impacted_issue.cc_ids = [666]
  impacted_issue.blocked_on_iids = [78404, bo_remove.issue_id]
  impacted_issue.blocking_iids = [78405, b_remove.issue_id]
  expected_issue = copy.deepcopy(impacted_issue)

  # Verify.
  (actual_amendments,
   actual_new_starrers) = impacted_tracker.ApplyImpactedIssueChanges(
       self.cnxn, impacted_issue, self.services)
  # Only cc 777 is new to the impacted issue; 666 was already a cc.
  expected_amendments = [
      tracker_bizobj.MakeBlockedOnAmendment(
          [('proj', bo_add.local_id)],
          [('proj', bo_remove.local_id)], default_project_name='proj'),
      tracker_bizobj.MakeBlockingAmendment(
          [('proj', b_add.local_id)],
          [('proj', b_remove.local_id)], default_project_name='proj'),
      tracker_bizobj.MakeCcAmendment([777], []),
      tracker_bizobj.MakeMergedIntoAmendment(
          [('proj', m_add.local_id), ('proj', m_add_no_ccs.local_id)],
          [('proj', m_remove.local_id)], default_project_name='proj')
  ]
  self.assertEqual(actual_amendments, expected_amendments)
  # 222 already starred the impacted issue, so only 333 and 444 are new.
  six.assertCountEqual(self, actual_new_starrers, [333, 444])

  expected_issue.cc_ids.append(777)
  expected_issue.blocked_on_iids = [78404, bo_add.issue_id]
  # By default new blocked_on issues that appear in blocked_on_iids
  # with no prior rank associated with it are un-ranked and assigned rank 0.
  # See SortBlockedOn in issue_svc.py.
  expected_issue.blocked_on_ranks = [0, 0]
  expected_issue.blocking_iids = [78405, b_add.issue_id]
  # Starrers 111 and 222 plus the two new ones, 333 and 444.
  expected_issue.star_count = 4
  self.assertEqual(impacted_issue, expected_issue)
def testApplyImpactedIssueChanges_Empty(self):
  """A tracker with nothing recorded yields no changes for an issue.

  Asserts that ApplyImpactedIssueChanges returns an empty amendment list,
  an empty new-starrer list, and leaves the issue equal to its prior state.
  """
  issue = _Issue('proj', 1)
  # Snapshot taken before the call so any mutation of `issue` is detected.
  issue_before = copy.deepcopy(issue)
  tracker = tracker_helpers._IssueChangeImpactedIssues()

  amendments, new_starrers = tracker.ApplyImpactedIssueChanges(
      self.cnxn, issue, self.services)

  self.assertEqual(amendments, [])
  self.assertEqual(new_starrers, [])
  self.assertEqual(issue, issue_before)
def testApplyImpactedIssueChanges_PartiallyEmptyMergedFrom(self):
  """Merged-from additions are processed when there are no removals.

  Records a single merged_from_add entry for the impacted issue, leaves
  merged_from_remove untouched, and asserts that exactly one merged-into
  amendment is produced, no new starrers are reported, and the impacted
  issue still equals its pre-call snapshot.
  """
  target = _Issue('proj', 1)
  # Snapshot taken before the call; the issue is expected to compare equal
  # to it afterwards.
  target_before = copy.deepcopy(target)

  merged_in = _Issue('proj', 2)
  self.services.issue.TestAddIssue(merged_in)

  tracker = tracker_helpers._IssueChangeImpactedIssues()
  # Only merged_from_add is populated; merged_from_remove is left empty.
  tracker.merged_from_add[target.issue_id].append(merged_in.issue_id)

  amendments, new_starrers = tracker.ApplyImpactedIssueChanges(
      self.cnxn, target, self.services)

  merged_amendment = tracker_bizobj.MakeMergedIntoAmendment(
      [('proj', merged_in.local_id)], [], default_project_name='proj')
  self.assertEqual(amendments, [merged_amendment])
  self.assertEqual(new_starrers, [])
  self.assertEqual(target, target_before)
class AssertUsersExistTest(unittest.TestCase):
  """Tests for tracker_helpers.AssertUsersExist."""

  # User IDs registered with the fake user service in setUp. Kept as a tuple
  # so individual tests cannot accidentally mutate the shared value.
  EXISTING_USER_IDS = (1, 1001, 1002, 1003, 2001, 2002, 3002)

  def setUp(self):
    """Build a fake user service containing every EXISTING_USER_IDS entry."""
    self.cnxn = 'fake cnxn'
    self.services = service_manager.Services(user=fake.UserService())
    for user_id in self.EXISTING_USER_IDS:
      self.services.user.TestAddUser(
          'test%d' % user_id, user_id, add_user=True)

  def test_AssertUsersExist_Passes(self):
    """Nothing is raised when every requested user ID was registered."""
    existing = list(self.EXISTING_USER_IDS)
    # The test passes as long as leaving the aggregator context does not
    # raise.
    with exceptions.ErrorAggregator(exceptions.InputException) as err_agg:
      tracker_helpers.AssertUsersExist(
          self.cnxn, self.services, existing, err_agg)

  def test_AssertUsersExist_Empty(self):
    """Nothing is raised when the list of user IDs to check is empty."""
    with exceptions.ErrorAggregator(exceptions.InputException) as err_agg:
      tracker_helpers.AssertUsersExist(
          self.cnxn, self.services, [], err_agg)

  def test_AssertUsersExist(self):
    """An InputException names each requested user ID that was not added."""
    dne_users = [2, 3]
    all_users = list(self.EXISTING_USER_IDS) + dne_users
    # Raw string with escaped dots: the pattern is a regular expression, and
    # an unescaped '.' would match any character instead of a literal period.
    # '\n' inside the raw string is the regex escape for a newline.
    expected_message = (
        r'users/2: User does not exist\.\nusers/3: User does not exist\.')
    with self.assertRaisesRegex(exceptions.InputException, expected_message):
      with exceptions.ErrorAggregator(exceptions.InputException) as err_agg:
        tracker_helpers.AssertUsersExist(
            self.cnxn, self.services, all_users, err_agg)