blob: 6a98c459c3472a2978ecb199c8307ddc019bab3e [file] [log] [blame]
# Copyright 2016 The Chromium Authors. All rights reserved.
# Use of this source code is govered by a BSD-style
# license that can be found in the LICENSE file or at
# https://developers.google.com/open-source/licenses/bsd
"""A set of functions that provide persistence for Monorail issue tracking.
This module provides functions to get, update, create, and (in some
cases) delete each type of business object. It provides a logical
persistence layer on top of an SQL database.
Business objects are described in tracker_pb2.py and tracker_bizobj.py.
"""
import collections
import json
import logging
import os
import time
import uuid
from google.appengine.api import app_identity
from google.appengine.api import images
from third_party import cloudstorage
import settings
from features import filterrules_helpers
from framework import framework_bizobj
from framework import framework_constants
from framework import framework_helpers
from framework import gcs_helpers
from framework import permissions
from framework import sql
from infra_libs import ts_mon
from proto import project_pb2
from proto import tracker_pb2
from services import caches
from services import tracker_fulltext
from tracker import tracker_bizobj
from tracker import tracker_helpers
ISSUE_TABLE_NAME = 'Issue'
ISSUESUMMARY_TABLE_NAME = 'IssueSummary'
ISSUE2LABEL_TABLE_NAME = 'Issue2Label'
ISSUE2COMPONENT_TABLE_NAME = 'Issue2Component'
ISSUE2CC_TABLE_NAME = 'Issue2Cc'
ISSUE2NOTIFY_TABLE_NAME = 'Issue2Notify'
ISSUE2FIELDVALUE_TABLE_NAME = 'Issue2FieldValue'
COMMENT_TABLE_NAME = 'Comment'
ATTACHMENT_TABLE_NAME = 'Attachment'
ISSUERELATION_TABLE_NAME = 'IssueRelation'
DANGLINGRELATION_TABLE_NAME = 'DanglingIssueRelation'
ISSUEUPDATE_TABLE_NAME = 'IssueUpdate'
ISSUEFORMERLOCATIONS_TABLE_NAME = 'IssueFormerLocations'
REINDEXQUEUE_TABLE_NAME = 'ReindexQueue'
LOCALIDCOUNTER_TABLE_NAME = 'LocalIDCounter'
ISSUE_COLS = [
'id', 'project_id', 'local_id', 'status_id', 'owner_id', 'reporter_id',
'opened', 'closed', 'modified', 'derived_owner_id', 'derived_status_id',
'deleted', 'star_count', 'attachment_count', 'is_spam']
ISSUESUMMARY_COLS = ['issue_id', 'summary']
ISSUE2LABEL_COLS = ['issue_id', 'label_id', 'derived']
ISSUE2COMPONENT_COLS = ['issue_id', 'component_id', 'derived']
ISSUE2CC_COLS = ['issue_id', 'cc_id', 'derived']
ISSUE2NOTIFY_COLS = ['issue_id', 'email']
ISSUE2FIELDVALUE_COLS = [
'issue_id', 'field_id', 'int_value', 'str_value', 'user_id', 'derived']
COMMENT_COLS = [
'Comment.id', 'issue_id', 'created', 'Comment.project_id', 'commenter_id',
'content', 'inbound_message', 'was_escaped', 'deleted_by',
'Comment.is_spam']
ABBR_COMMENT_COLS = ['Comment.id', 'commenter_id', 'deleted_by']
ATTACHMENT_COLS = [
'id', 'issue_id', 'comment_id', 'filename', 'filesize', 'mimetype',
'deleted', 'gcs_object_id']
ISSUERELATION_COLS = ['issue_id', 'dst_issue_id', 'kind']
DANGLINGRELATION_COLS = [
'issue_id', 'dst_issue_project', 'dst_issue_local_id', 'kind']
ISSUEUPDATE_COLS = [
'id', 'issue_id', 'comment_id', 'field', 'old_value', 'new_value',
'added_user_id', 'removed_user_id', 'custom_field_name']
ISSUEFORMERLOCATIONS_COLS = ['issue_id', 'project_id', 'local_id']
REINDEXQUEUE_COLS = ['issue_id', 'created']
CHUNK_SIZE = 1000
class IssueIDTwoLevelCache(caches.AbstractTwoLevelCache):
"""Class to manage RAM and memcache for Issue IDs."""
def __init__(self, cache_manager, issue_service):
super(IssueIDTwoLevelCache, self).__init__(
cache_manager, 'issue_id', 'issue_id:', int,
max_size=settings.issue_cache_max_size, use_value_centric_cache=True)
self.issue_service = issue_service
def _DeserializeIssueIDs(self, project_local_issue_ids):
"""Convert database rows into a dict {(project_id, local_id): issue_id}."""
return {(project_id, local_id): issue_id
for (project_id, local_id, issue_id) in project_local_issue_ids}
def FetchItems(self, cnxn, keys):
"""On RAM and memcache miss, hit the database."""
local_ids_by_pid = collections.defaultdict(list)
for project_id, local_id in keys:
local_ids_by_pid[project_id].append(local_id)
where = [] # We OR per-project pairs of conditions together.
for project_id, local_ids_in_project in local_ids_by_pid.iteritems():
term_str = ('(Issue.project_id = %%s AND Issue.local_id IN (%s))' %
sql.PlaceHolders(local_ids_in_project))
where.append((term_str, [project_id] + local_ids_in_project))
rows = self.issue_service.issue_tbl.Select(
cnxn, cols=['project_id', 'local_id', 'id'],
where=where, or_where_conds=True)
return self._DeserializeIssueIDs(rows)
def _KeyToStr(self, key):
"""This cache uses pairs of ints as keys. Convert them to strings."""
return '%d,%d' % key
def _StrToKey(self, key_str):
"""This cache uses pairs of ints as keys. Convert them from strings."""
project_id_str, local_id_str = key_str.split(',')
return int(project_id_str), int(local_id_str)
class IssueTwoLevelCache(caches.AbstractTwoLevelCache):
"""Class to manage RAM and memcache for Issue PBs."""
def __init__(
self, cache_manager, issue_service, project_service, config_service):
super(IssueTwoLevelCache, self).__init__(
cache_manager, 'issue', 'issue:', tracker_pb2.Issue,
max_size=settings.issue_cache_max_size)
self.issue_service = issue_service
self.project_service = project_service
self.config_service = config_service
def _UnpackIssue(self, cnxn, issue_row):
"""Partially construct an issue object using info from a DB row."""
(issue_id, project_id, local_id, status_id, owner_id, reporter_id,
opened, closed, modified, derived_owner_id, derived_status_id,
deleted, star_count, attachment_count, is_spam) = issue_row
issue = tracker_pb2.Issue()
project = self.project_service.GetProject(cnxn, project_id)
issue.project_name = project.project_name
issue.issue_id = issue_id
issue.project_id = project_id
issue.local_id = local_id
if status_id is not None:
status = self.config_service.LookupStatus(cnxn, project_id, status_id)
issue.status = status
issue.owner_id = owner_id or 0
issue.reporter_id = reporter_id or 0
issue.derived_owner_id = derived_owner_id or 0
if derived_status_id is not None:
derived_status = self.config_service.LookupStatus(
cnxn, project_id, derived_status_id)
issue.derived_status = derived_status
issue.deleted = bool(deleted)
if opened:
issue.opened_timestamp = opened
if closed:
issue.closed_timestamp = closed
if modified:
issue.modified_timestamp = modified
issue.star_count = star_count
issue.attachment_count = attachment_count
issue.is_spam = bool(is_spam)
return issue
def _UnpackFieldValue(self, fv_row):
"""Construct a field value object from a DB row."""
(issue_id, field_id, int_value, str_value, user_id, derived) = fv_row
fv = tracker_bizobj.MakeFieldValue(
field_id, int_value, str_value, user_id, bool(derived))
return fv, issue_id
def _DeserializeIssues(
self, cnxn, issue_rows, summary_rows, label_rows, component_rows,
cc_rows, notify_rows, fieldvalue_rows, relation_rows,
dangling_relation_rows):
"""Convert the given DB rows into a dict of Issue PBs."""
results_dict = {}
for issue_row in issue_rows:
issue = self._UnpackIssue(cnxn, issue_row)
results_dict[issue.issue_id] = issue
for issue_id, summary in summary_rows:
results_dict[issue_id].summary = summary
# TODO(jrobbins): it would be nice to order labels by rank and name.
for issue_id, label_id, derived in label_rows:
issue = results_dict.get(issue_id)
if not issue:
logging.info('Got label for an unknown issue: %r %r',
label_rows, issue_rows)
continue
label = self.config_service.LookupLabel(cnxn, issue.project_id, label_id)
assert label, ('Label ID %r on IID %r not found in project %r' %
(label_id, issue_id, issue.project_id))
if derived:
results_dict[issue_id].derived_labels.append(label)
else:
results_dict[issue_id].labels.append(label)
for issue_id, component_id, derived in component_rows:
if derived:
results_dict[issue_id].derived_component_ids.append(component_id)
else:
results_dict[issue_id].component_ids.append(component_id)
for issue_id, user_id, derived in cc_rows:
if derived:
results_dict[issue_id].derived_cc_ids.append(user_id)
else:
results_dict[issue_id].cc_ids.append(user_id)
for issue_id, email in notify_rows:
results_dict[issue_id].derived_notify_addrs.append(email)
for fv_row in fieldvalue_rows:
fv, issue_id = self._UnpackFieldValue(fv_row)
results_dict[issue_id].field_values.append(fv)
for issue_id, dst_issue_id, kind in relation_rows:
src_issue = results_dict.get(issue_id)
dst_issue = results_dict.get(dst_issue_id)
assert src_issue or dst_issue, (
'Neither source issue %r nor dest issue %r was found' %
(issue_id, dst_issue_id))
if src_issue:
if kind == 'blockedon':
src_issue.blocked_on_iids.append(dst_issue_id)
elif kind == 'mergedinto':
src_issue.merged_into = dst_issue_id
else:
logging.info('unknown relation kind %r', kind)
continue
if dst_issue:
if kind == 'blockedon':
dst_issue.blocking_iids.append(issue_id)
for issue_id, dst_issue_proj, dst_issue_id, kind in dangling_relation_rows:
src_issue = results_dict.get(issue_id)
if kind == 'blockedon':
src_issue.dangling_blocked_on_refs.append(
tracker_bizobj.MakeDanglingIssueRef(dst_issue_proj, dst_issue_id))
elif kind == 'blocking':
src_issue.dangling_blocking_refs.append(
tracker_bizobj.MakeDanglingIssueRef(dst_issue_proj, dst_issue_id))
else:
logging.warn('unhandled danging relation kind %r', kind)
continue
return results_dict
# Note: sharding is used to here to allow us to load issues from the replicas
# without placing load on the master. Writes are not sharded.
# pylint: disable=arguments-differ
def FetchItems(self, cnxn, issue_ids, shard_id=None):
"""Retrieve and deserialize issues."""
issue_rows = self.issue_service.issue_tbl.Select(
cnxn, cols=ISSUE_COLS, id=issue_ids, shard_id=shard_id)
summary_rows = self.issue_service.issuesummary_tbl.Select(
cnxn, cols=ISSUESUMMARY_COLS, shard_id=shard_id, issue_id=issue_ids)
label_rows = self.issue_service.issue2label_tbl.Select(
cnxn, cols=ISSUE2LABEL_COLS, shard_id=shard_id, issue_id=issue_ids)
component_rows = self.issue_service.issue2component_tbl.Select(
cnxn, cols=ISSUE2COMPONENT_COLS, shard_id=shard_id, issue_id=issue_ids)
cc_rows = self.issue_service.issue2cc_tbl.Select(
cnxn, cols=ISSUE2CC_COLS, shard_id=shard_id, issue_id=issue_ids)
notify_rows = self.issue_service.issue2notify_tbl.Select(
cnxn, cols=ISSUE2NOTIFY_COLS, shard_id=shard_id, issue_id=issue_ids)
fieldvalue_rows = self.issue_service.issue2fieldvalue_tbl.Select(
cnxn, cols=ISSUE2FIELDVALUE_COLS, shard_id=shard_id,
issue_id=issue_ids)
if issue_ids:
ph = sql.PlaceHolders(issue_ids)
relation_rows = self.issue_service.issuerelation_tbl.Select(
cnxn, cols=ISSUERELATION_COLS,
where=[('(issue_id IN (%s) OR dst_issue_id IN (%s))' % (ph, ph),
issue_ids + issue_ids)])
dangling_relation_rows = self.issue_service.danglingrelation_tbl.Select(
cnxn, cols=DANGLINGRELATION_COLS, issue_id=issue_ids)
else:
relation_rows = []
dangling_relation_rows = []
return self._DeserializeIssues(
cnxn, issue_rows, summary_rows, label_rows, component_rows, cc_rows,
notify_rows, fieldvalue_rows, relation_rows, dangling_relation_rows)
class IssueService(object):
"""The persistence layer for Monorail's issues, comments, and attachments."""
spam_labels = ts_mon.CounterMetric('monorail/issue_svc/spam_label')
def __init__(self, project_service, config_service, cache_manager):
"""Initialize this object so that it is ready to use.
Args:
project_service: services object for project info.
config_service: services object for tracker configuration info.
cache_manager: local cache with distributed invalidation.
"""
# Tables that represent issue data.
self.issue_tbl = sql.SQLTableManager(ISSUE_TABLE_NAME)
self.issuesummary_tbl = sql.SQLTableManager(ISSUESUMMARY_TABLE_NAME)
self.issue2label_tbl = sql.SQLTableManager(ISSUE2LABEL_TABLE_NAME)
self.issue2component_tbl = sql.SQLTableManager(ISSUE2COMPONENT_TABLE_NAME)
self.issue2cc_tbl = sql.SQLTableManager(ISSUE2CC_TABLE_NAME)
self.issue2notify_tbl = sql.SQLTableManager(ISSUE2NOTIFY_TABLE_NAME)
self.issue2fieldvalue_tbl = sql.SQLTableManager(ISSUE2FIELDVALUE_TABLE_NAME)
self.issuerelation_tbl = sql.SQLTableManager(ISSUERELATION_TABLE_NAME)
self.danglingrelation_tbl = sql.SQLTableManager(DANGLINGRELATION_TABLE_NAME)
self.issueformerlocations_tbl = sql.SQLTableManager(
ISSUEFORMERLOCATIONS_TABLE_NAME)
# Tables that represent comments.
self.comment_tbl = sql.SQLTableManager(COMMENT_TABLE_NAME)
self.issueupdate_tbl = sql.SQLTableManager(ISSUEUPDATE_TABLE_NAME)
self.attachment_tbl = sql.SQLTableManager(ATTACHMENT_TABLE_NAME)
# Tables for cron tasks.
self.reindexqueue_tbl = sql.SQLTableManager(REINDEXQUEUE_TABLE_NAME)
# Tables for generating sequences of local IDs.
self.localidcounter_tbl = sql.SQLTableManager(LOCALIDCOUNTER_TABLE_NAME)
# Like a dictionary {(project_id, local_id): issue_id}
# Use value centric cache here because we cannot store a tuple in the
# Invalidate table.
self.issue_id_2lc = IssueIDTwoLevelCache(cache_manager, self)
# Like a dictionary {issue_id: issue}
self.issue_2lc = IssueTwoLevelCache(
cache_manager, self, project_service, config_service)
self._config_service = config_service
### Issue ID lookups
def LookupIssueIDs(self, cnxn, project_local_id_pairs):
"""Find the global issue IDs given the project ID and local ID of each."""
issue_id_dict, _misses = self.issue_id_2lc.GetAll(
cnxn, project_local_id_pairs)
# Put the Issue IDs in the order specified by project_local_id_pairs
issue_ids = [issue_id_dict[pair] for pair in project_local_id_pairs
if pair in issue_id_dict]
return issue_ids
def LookupIssueID(self, cnxn, project_id, local_id):
"""Find the global issue ID given the project ID and local ID."""
issue_ids = self.LookupIssueIDs(cnxn, [(project_id, local_id)])
try:
return issue_ids[0]
except IndexError:
raise NoSuchIssueException()
def ResolveIssueRefs(
self, cnxn, ref_projects, default_project_name, refs):
"""Look up all the referenced issues and return their issue_ids.
Args:
cnxn: connection to SQL database.
ref_projects: pre-fetched dict {project_name: project} of all projects
mentioned in the refs as well as the default project.
default_project_name: string name of the current project, this is used
when the project_name in a ref is None.
refs: list of (project_name, local_id) pairs. These are parsed from
textual references in issue descriptions, comments, and the input
in the blocked-on field.
Returns:
A list of issue_ids for all the referenced issues. References to issues
in deleted projects and any issues not found are simply ignored.
"""
if not refs:
return []
project_local_id_pairs = []
for project_name, local_id in refs:
project = ref_projects.get(project_name or default_project_name)
if not project or project.state == project_pb2.ProjectState.DELETABLE:
continue # ignore any refs to issues in deleted projects
project_local_id_pairs.append((project.project_id, local_id))
issue_ids = self.LookupIssueIDs(cnxn, project_local_id_pairs)
return issue_ids
### Issue objects
def CreateIssue(
self, cnxn, services, project_id, summary, status,
owner_id, cc_ids, labels, field_values, component_ids, reporter_id,
marked_description, blocked_on=None, blocking=None, attachments=None,
timestamp=None, index_now=True):
"""Create and store a new issue with all the given information.
Args:
cnxn: connection to SQL database.
services: persistence layer for users, issues, and projects.
project_id: int ID for the current project.
summary: one-line summary string summarizing this issue.
status: string issue status value. E.g., 'New'.
owner_id: user ID of the issue owner.
cc_ids: list of user IDs for users to be CC'd on changes.
labels: list of label strings. E.g., 'Priority-High'.
field_values: list of FieldValue PBs.
component_ids: list of int component IDs.
reporter_id: user ID of the user who reported the issue.
marked_description: issue description with initial HTML markup.
blocked_on: list of issue_ids that this issue is blocked on.
blocking: list of issue_ids that this issue blocks.
attachments: [(filename, contents, mimetype),...] attachments uploaded at
the time the comment was made.
timestamp: time that the issue was entered, defaults to now.
index_now: True if the issue should be updated in the full text index.
Returns:
The integer local ID of the new issue.
"""
config = self._config_service.GetProjectConfig(cnxn, project_id)
iids_to_invalidate = set()
status = framework_bizobj.CanonicalizeLabel(status)
labels = [framework_bizobj.CanonicalizeLabel(l) for l in labels]
labels = [l for l in labels if l]
issue = tracker_pb2.Issue()
issue.project_id = project_id
issue.summary = summary
issue.status = status
issue.owner_id = owner_id
issue.cc_ids.extend(cc_ids)
issue.labels.extend(labels)
issue.field_values.extend(field_values)
issue.component_ids.extend(component_ids)
issue.reporter_id = reporter_id
if blocked_on is not None:
iids_to_invalidate.update(blocked_on)
issue.blocked_on_iids = blocked_on
if blocking is not None:
iids_to_invalidate.update(blocking)
issue.blocking_iids = blocking
if attachments:
issue.attachment_count = len(attachments)
timestamp = timestamp or int(time.time())
issue.opened_timestamp = timestamp
issue.modified_timestamp = timestamp
comment = self._MakeIssueComment(
project_id, reporter_id, marked_description,
attachments=attachments, timestamp=timestamp, was_escaped=True)
# Set the closed_timestamp both before and after filter rules.
if not tracker_helpers.MeansOpenInProject(
tracker_bizobj.GetStatus(issue), config):
issue.closed_timestamp = timestamp
filterrules_helpers.ApplyFilterRules(cnxn, services, issue, config)
if not tracker_helpers.MeansOpenInProject(
tracker_bizobj.GetStatus(issue), config):
issue.closed_timestamp = timestamp
classification = services.spam.ClassifyIssue(issue, comment)
label = classification['outputLabel']
logging.info('issue/comment classification: %s' % classification)
score = 0
for output in classification['outputMulti']:
if output['label'] == label:
score = float(output['score'])
self.spam_labels.increment({'type': label})
if label == 'spam' and score > settings.classifier_spam_thresh:
# Must be negative so as not to use up actual local_ids.
# This can be fixed later if a human declares it to be ham.
issue.local_id = self.AllocateNextSpamLocalID(cnxn, project_id)
issue.is_spam = True
else:
issue.local_id = self.AllocateNextLocalID(cnxn, project_id)
issue_id = self.InsertIssue(cnxn, issue)
comment.issue_id = issue_id
self.InsertComment(cnxn, comment)
issue.issue_id = issue_id
services.spam.RecordClassifierIssueVerdict(
cnxn, issue, label=='spam', score)
if permissions.HasRestrictions(issue, 'view'):
self._config_service.InvalidateMemcache(
[issue], key_prefix='nonviewable:')
# Add a comment to existing issues saying they are now blocking or
# blocked on this issue.
blocked_add_issues = self.GetIssues(cnxn, blocked_on or [])
for add_issue in blocked_add_issues:
self.CreateIssueComment(
cnxn, add_issue.project_id, add_issue.local_id, reporter_id,
content='',
amendments=[tracker_bizobj.MakeBlockingAmendment(
[(issue.project_name, issue.local_id)], [],
default_project_name=add_issue.project_name)])
blocking_add_issues = self.GetIssues(cnxn, blocking or [])
for add_issue in blocking_add_issues:
self.CreateIssueComment(
cnxn, add_issue.project_id, add_issue.local_id, reporter_id,
content='',
amendments=[tracker_bizobj.MakeBlockedOnAmendment(
[(issue.project_name, issue.local_id)], [],
default_project_name=add_issue.project_name)])
self._UpdateIssuesModified(
cnxn, iids_to_invalidate, modified_timestamp=timestamp)
if index_now:
tracker_fulltext.IndexIssues(
cnxn, [issue], services.user, self, self._config_service)
return issue.local_id
def AllocateNewLocalIDs(self, cnxn, issues):
# Filter to just the issues that need new local IDs.
issues = [issue for issue in issues if issue.local_id < 0]
for issue in issues:
if issue.local_id < 0:
issue.local_id = self.AllocateNextLocalID(cnxn, issue.project_id)
self.UpdateIssues(cnxn, issues)
logging.info("AllocateNewLocalIDs")
def GetAllIssuesInProject(self, cnxn, project_id, min_local_id=None):
"""Special query to efficiently get ALL issues in a project.
This is not done while the user is waiting, only by backround tasks.
Args:
cnxn: connection to SQL database.
project_id: the ID of the project.
min_local_id: optional int to start at.
Returns:
A list of Issue protocol buffers for all issues.
"""
all_local_ids = self.GetAllLocalIDsInProject(
cnxn, project_id, min_local_id=min_local_id)
return self.GetIssuesByLocalIDs(cnxn, project_id, all_local_ids)
def GetAnyOnHandIssue(self, issue_ids, start=None, end=None):
"""Get any one issue from RAM or memcache, otherwise return None."""
return self.issue_2lc.GetAnyOnHandItem(issue_ids, start=start, end=end)
def GetIssuesDict(self, cnxn, issue_ids, use_cache=True, shard_id=None):
"""Get a dict {iid: issue} from the DB or cache."""
issue_dict, _missed_iids = self.issue_2lc.GetAll(
cnxn, issue_ids, use_cache=use_cache, shard_id=shard_id)
return issue_dict
def GetIssues(self, cnxn, issue_ids, use_cache=True, shard_id=None):
"""Get a list of Issue PBs from the DB or cache.
Args:
cnxn: connection to SQL database.
issue_ids: integer global issue IDs of the issues.
use_cache: optional boolean to turn off using the cache.
shard_id: optional int shard_id to limit retrieval.
Returns:
A list of Issue PBs in the same order as the given issue_ids.
"""
issue_dict = self.GetIssuesDict(
cnxn, issue_ids, use_cache=use_cache, shard_id=shard_id)
# Return a list that is ordered the same as the given issue_ids.
issue_list = [issue_dict[issue_id] for issue_id in issue_ids
if issue_id in issue_dict]
return issue_list
def GetIssue(self, cnxn, issue_id):
"""Get one Issue PB from the DB.
Args:
cnxn: connection to SQL database.
issue_id: integer global issue ID of the issue.
Returns:
The requested Issue protocol buffer.
Raises:
NoSuchIssueException: the issue was not found.
"""
issues = self.GetIssues(cnxn, [issue_id])
try:
return issues[0]
except IndexError:
raise NoSuchIssueException()
def GetIssuesByLocalIDs(
self, cnxn, project_id, local_id_list, shard_id=None):
"""Get all the requested issues.
Args:
cnxn: connection to SQL database.
project_id: int ID of the project to which the issues belong.
local_id_list: list of integer local IDs for the requested issues.
shard_id: optional int shard_id to choose a replica.
Returns:
List of Issue PBs for the requested issues. The result Issues
will be ordered in the same order as local_id_list.
"""
issue_ids_to_fetch = self.LookupIssueIDs(
cnxn, [(project_id, local_id) for local_id in local_id_list])
issues = self.GetIssues(cnxn, issue_ids_to_fetch, shard_id=shard_id)
return issues
def GetIssueByLocalID(self, cnxn, project_id, local_id):
"""Get one Issue PB from the DB.
Args:
cnxn: connection to SQL database.
project_id: the ID of the project to which the issue belongs.
local_id: integer local ID of the issue.
Returns:
The requested Issue protocol buffer.
"""
issues = self.GetIssuesByLocalIDs(cnxn, project_id, [local_id])
try:
return issues[0]
except IndexError:
raise NoSuchIssueException('The issue %s:%d does not exist.' % (
project_id, local_id))
def GetOpenAndClosedIssues(self, cnxn, issue_ids):
"""Return the requested issues in separate open and closed lists.
Args:
cnxn: connection to SQL database.
issue_ids: list of int issue issue_ids.
Returns:
A pair of lists, the first with open issues, second with closed issues.
"""
if not issue_ids:
return [], [] # make one common case efficient
issues = self.GetIssues(cnxn, issue_ids)
project_ids = {issue.project_id for issue in issues}
configs = self._config_service.GetProjectConfigs(cnxn, project_ids)
open_issues = []
closed_issues = []
for issue in issues:
config = configs[issue.project_id]
if tracker_helpers.MeansOpenInProject(
tracker_bizobj.GetStatus(issue), config):
open_issues.append(issue)
else:
closed_issues.append(issue)
return open_issues, closed_issues
def GetCurrentLocationOfMovedIssue(self, cnxn, project_id, local_id):
"""Return the current location of a moved issue based on old location."""
issue_id = int(self.issueformerlocations_tbl.SelectValue(
cnxn, 'issue_id', default=0, project_id=project_id, local_id=local_id))
if not issue_id:
return None, None
project_id, local_id = self.issue_tbl.SelectRow(
cnxn, cols=['project_id', 'local_id'], id=issue_id)
return project_id, local_id
def GetPreviousLocations(self, cnxn, issue):
"""Get all the previous locations of an issue."""
location_rows = self.issueformerlocations_tbl.Select(
cnxn, cols=['project_id', 'local_id'], issue_id=issue.issue_id)
locations = [(pid, local_id) for (pid, local_id) in location_rows
if pid != issue.project_id or local_id != issue.local_id]
return locations
def InsertIssue(self, cnxn, issue):
"""Store the given issue in SQL.
Args:
cnxn: connection to SQL database.
issue: Issue PB to insert into the database.
Returns:
The int issue_id of the newly created issue.
"""
status_id = self._config_service.LookupStatusID(
cnxn, issue.project_id, issue.status)
row = (issue.project_id, issue.local_id, status_id,
issue.owner_id or None,
issue.reporter_id,
issue.opened_timestamp,
issue.closed_timestamp,
issue.modified_timestamp,
issue.derived_owner_id or None,
self._config_service.LookupStatusID(
cnxn, issue.project_id, issue.derived_status),
bool(issue.deleted),
issue.star_count, issue.attachment_count,
issue.is_spam)
# ISSUE_COLs[1:] to skip setting the ID
# Insert into the Master DB.
generated_ids = self.issue_tbl.InsertRows(
cnxn, ISSUE_COLS[1:], [row], commit=False, return_generated_ids=True)
issue_id = generated_ids[0]
issue.issue_id = issue_id
self.issue_tbl.Update(
cnxn, {'shard': issue_id % settings.num_logical_shards},
id=issue.issue_id, commit=False)
self._UpdateIssuesSummary(cnxn, [issue], commit=False)
self._UpdateIssuesLabels(
cnxn, [issue], issue.project_id, commit=False)
self._UpdateIssuesFields(cnxn, [issue], commit=False)
self._UpdateIssuesComponents(cnxn, [issue], commit=False)
self._UpdateIssuesCc(cnxn, [issue], commit=False)
self._UpdateIssuesNotify(cnxn, [issue], commit=False)
self._UpdateIssuesRelation(cnxn, [issue], commit=False)
cnxn.Commit()
self._config_service.InvalidateMemcache([issue])
return issue_id
def UpdateIssues(
self, cnxn, issues, update_cols=None, just_derived=False, commit=True,
invalidate=True):
"""Update the given issues in SQL.
Args:
cnxn: connection to SQL database.
issues: list of issues to update.
update_cols: optional list of just the field names to update.
just_derived: set to True when only updating derived fields.
commit: set to False to skip the DB commit and do it in the caller.
invalidate: set to False to leave cache invalidatation to the caller.
"""
if not issues:
return
project_id = issues[0].project_id # All must be in the same project.
assert all(issue.project_id == project_id for issue in issues)
for issue in issues: # slow, but mysql will not allow REPLACE rows.
delta = {
'project_id': issue.project_id,
'local_id': issue.local_id,
'owner_id': issue.owner_id or None,
'status_id': self._config_service.LookupStatusID(
cnxn, issue.project_id, issue.status) or None,
'opened': issue.opened_timestamp,
'closed': issue.closed_timestamp,
'modified': issue.modified_timestamp,
'derived_owner_id': issue.derived_owner_id or None,
'derived_status_id': self._config_service.LookupStatusID(
cnxn, issue.project_id, issue.derived_status) or None,
'deleted': bool(issue.deleted),
'star_count': issue.star_count,
'attachment_count': issue.attachment_count,
'is_spam': issue.is_spam,
}
if update_cols is not None:
delta = {key: val for key, val in delta.iteritems()
if key in update_cols}
self.issue_tbl.Update(cnxn, delta, id=issue.issue_id, commit=False)
if not update_cols:
self._UpdateIssuesLabels(
cnxn, issues, project_id, commit=False)
self._UpdateIssuesCc(cnxn, issues, commit=False)
self._UpdateIssuesFields(cnxn, issues, commit=False)
self._UpdateIssuesComponents(cnxn, issues, commit=False)
self._UpdateIssuesNotify(cnxn, issues, commit=False)
if not just_derived:
self._UpdateIssuesSummary(cnxn, issues, commit=False)
self._UpdateIssuesRelation(cnxn, issues, commit=False)
iids_to_invalidate = [issue.issue_id for issue in issues]
if just_derived and invalidate:
self.issue_2lc.InvalidateAllKeys(cnxn, iids_to_invalidate)
elif invalidate:
self.issue_2lc.InvalidateKeys(cnxn, iids_to_invalidate)
if commit:
cnxn.Commit()
if invalidate:
self._config_service.InvalidateMemcache(issues)
def UpdateIssue(
self, cnxn, issue, update_cols=None, just_derived=False, commit=True,
invalidate=True):
"""Update the given issue in SQL.
Args:
cnxn: connection to SQL database.
issue: the issue to update.
update_cols: optional list of just the field names to update.
just_derived: set to True when only updating derived fields.
commit: set to False to skip the DB commit and do it in the caller.
invalidate: set to False to leave cache invalidatation to the caller.
"""
self.UpdateIssues(
cnxn, [issue], update_cols=update_cols, just_derived=just_derived,
commit=commit, invalidate=invalidate)
def _UpdateIssuesSummary(self, cnxn, issues, commit=True):
"""Update the IssueSummary table rows for the given issues."""
self.issuesummary_tbl.InsertRows(
cnxn, ISSUESUMMARY_COLS,
[(issue.issue_id, issue.summary) for issue in issues],
replace=True, commit=commit)
def _UpdateIssuesLabels(self, cnxn, issues, project_id, commit=True):
"""Update the Issue2Label table rows for the given issues."""
label_rows = []
for issue in issues:
issue_shard = issue.issue_id % settings.num_logical_shards
# TODO(jrobbins): If the user adds many novel labels in one issue update,
# that could be slow. Solution is to add all new labels in a batch first.
label_rows.extend(
(issue.issue_id,
self._config_service.LookupLabelID(cnxn, project_id, label), False,
issue_shard)
for label in issue.labels)
label_rows.extend(
(issue.issue_id,
self._config_service.LookupLabelID(cnxn, project_id, label), True,
issue_shard)
for label in issue.derived_labels)
self.issue2label_tbl.Delete(
cnxn, issue_id=[issue.issue_id for issue in issues],
commit=False)
self.issue2label_tbl.InsertRows(
cnxn, ISSUE2LABEL_COLS + ['issue_shard'],
label_rows, ignore=True, commit=commit)
def _UpdateIssuesFields(self, cnxn, issues, commit=True):
"""Update the Issue2FieldValue table rows for the given issues."""
fieldvalue_rows = []
for issue in issues:
issue_shard = issue.issue_id % settings.num_logical_shards
for fv in issue.field_values:
fieldvalue_rows.append(
(issue.issue_id, fv.field_id, fv.int_value, fv.str_value,
fv.user_id or None, fv.derived, issue_shard))
self.issue2fieldvalue_tbl.Delete(
cnxn, issue_id=[issue.issue_id for issue in issues], commit=False)
self.issue2fieldvalue_tbl.InsertRows(
cnxn, ISSUE2FIELDVALUE_COLS + ['issue_shard'],
fieldvalue_rows, commit=commit)
def _UpdateIssuesComponents(self, cnxn, issues, commit=True):
"""Update the Issue2Component table rows for the given issues."""
issue2component_rows = []
for issue in issues:
issue_shard = issue.issue_id % settings.num_logical_shards
issue2component_rows.extend(
(issue.issue_id, component_id, False, issue_shard)
for component_id in issue.component_ids)
issue2component_rows.extend(
(issue.issue_id, component_id, True, issue_shard)
for component_id in issue.derived_component_ids)
self.issue2component_tbl.Delete(
cnxn, issue_id=[issue.issue_id for issue in issues], commit=False)
self.issue2component_tbl.InsertRows(
cnxn, ISSUE2COMPONENT_COLS + ['issue_shard'],
issue2component_rows, ignore=True, commit=commit)
def _UpdateIssuesCc(self, cnxn, issues, commit=True):
"""Update the Issue2Cc table rows for the given issues."""
cc_rows = []
for issue in issues:
issue_shard = issue.issue_id % settings.num_logical_shards
cc_rows.extend(
(issue.issue_id, cc_id, False, issue_shard)
for cc_id in issue.cc_ids)
cc_rows.extend(
(issue.issue_id, cc_id, True, issue_shard)
for cc_id in issue.derived_cc_ids)
self.issue2cc_tbl.Delete(
cnxn, issue_id=[issue.issue_id for issue in issues], commit=False)
self.issue2cc_tbl.InsertRows(
cnxn, ISSUE2CC_COLS + ['issue_shard'],
cc_rows, ignore=True, commit=commit)
def _UpdateIssuesNotify(self, cnxn, issues, commit=True):
"""Update the Issue2Notify table rows for the given issues."""
notify_rows = []
for issue in issues:
derived_rows = [[issue.issue_id, email]
for email in issue.derived_notify_addrs]
notify_rows.extend(derived_rows)
self.issue2notify_tbl.Delete(
cnxn, issue_id=[issue.issue_id for issue in issues], commit=False)
self.issue2notify_tbl.InsertRows(
cnxn, ISSUE2NOTIFY_COLS, notify_rows, ignore=True, commit=commit)
def _UpdateIssuesRelation(self, cnxn, issues, commit=True):
"""Update the IssueRelation table rows for the given issues."""
relation_rows = []
dangling_relation_rows = []
for issue in issues:
for dst_issue_id in issue.blocked_on_iids:
relation_rows.append((issue.issue_id, dst_issue_id, 'blockedon'))
for dst_issue_id in issue.blocking_iids:
relation_rows.append((dst_issue_id, issue.issue_id, 'blockedon'))
for dst_ref in issue.dangling_blocked_on_refs:
dangling_relation_rows.append((
issue.issue_id, dst_ref.project, dst_ref.issue_id, 'blockedon'))
for dst_ref in issue.dangling_blocking_refs:
dangling_relation_rows.append((
issue.issue_id, dst_ref.project, dst_ref.issue_id, 'blocking'))
if issue.merged_into:
relation_rows.append((issue.issue_id, issue.merged_into, 'mergedinto'))
self.issuerelation_tbl.Delete(
cnxn, issue_id=[issue.issue_id for issue in issues], commit=False)
self.issuerelation_tbl.Delete(
cnxn, dst_issue_id=[issue.issue_id for issue in issues],
kind='blockedon', commit=False)
self.issuerelation_tbl.InsertRows(
cnxn, ISSUERELATION_COLS, relation_rows, ignore=True, commit=commit)
self.danglingrelation_tbl.Delete(
cnxn, issue_id=[issue.issue_id for issue in issues], commit=False)
self.danglingrelation_tbl.InsertRows(
cnxn, DANGLINGRELATION_COLS, dangling_relation_rows, ignore=True,
commit=commit)
def _UpdateIssuesModified(
self, cnxn, iids, modified_timestamp=None, invalidate=True):
"""Store a modified timestamp for each of the specified issues."""
delta = {'modified': modified_timestamp or int(time.time())}
self.issue_tbl.Update(cnxn, delta, id=iids, commit=False)
if invalidate:
self.InvalidateIIDs(cnxn, iids)
def DeltaUpdateIssue(
self, cnxn, services, reporter_id, project_id,
config, issue, status, owner_id, cc_add, cc_remove, comp_ids_add,
comp_ids_remove, labels_add, labels_remove, field_vals_add,
field_vals_remove, fields_clear, blocked_on_add=None,
blocked_on_remove=None, blocking_add=None, blocking_remove=None,
merged_into=None, index_now=False, comment=None, summary=None,
iids_to_invalidate=None, rules=None, predicate_asts=None,
timestamp=None):
"""Update the issue in the database and return a set of update tuples.
Args:
cnxn: connection to SQL database.
services: connections to persistence layer.
reporter_id: user ID of the user making this change.
project_id: int ID for the current project.
config: ProjectIssueConfig PB for this project.
issue: Issue PB of issue to update.
status: new issue status string, if a change is desired.
owner_id: user ID of the new issue owner, if a change is desired.
cc_add: list of user IDs of users to add to CC list.
cc_remove: list of user IDs of users to remove from CC list.
comp_ids_add: list of component IDs to add to the issue.
comp_ids_remove: list of component IDs to remove from the issue.
labels_add: list of issue label strings to add.
labels_remove: list of issue label strings to remove.
field_vals_add: dict of FieldValue PBs to add.
field_vals_remove: list of FieldValue PBs to remove.
fields_clear: list of custom field IDs to clear.
blocked_on_add: list of IIDs that this issue is now blocked on.
blocked_on_remove: list of IIDs that this issue is no longer blocked on.
blocking_add: list of IIDs that this issue is blocking.
blocking_remove: list of IIDs that this issue is no longer blocking.
merged_into: IID of issue that this issue was merged into, 0 to clear,
or None for no change.
index_now: True if the issue should be updated in the full text index.
comment: This should be the content of the comment
corresponding to this change.
summary: new issue summary, currently only used by GData API.
rules: optional list of preloaded FilterRule PBs for this project.
predicate_asts: optional list of QueryASTs for the rules. If rules are
provided, then predicate_asts should also be provided.
timestamp: int timestamp set during testing, otherwise defaults to
int(time.time()).
Returns:
A list of Amendment PBs that describe the set of metadata updates that
the user made. This tuple is later used in making the IssueComment.
"""
old_effective_status = tracker_bizobj.GetStatus(issue)
# Make all user input safe to echo out again later.
status = framework_bizobj.CanonicalizeLabel(status)
labels_add = [framework_bizobj.CanonicalizeLabel(l) for l in labels_add]
labels_add = [l for l in labels_add if l]
labels_remove = [framework_bizobj.CanonicalizeLabel(l)
for l in labels_remove]
labels_remove = [l for l in labels_remove if l]
logging.info(
'Bulk edit to project_id %s issue.local_id %s',
project_id, issue.local_id)
if iids_to_invalidate is None:
iids_to_invalidate = set([issue.issue_id])
invalidate = True
else:
iids_to_invalidate.add(issue.issue_id)
invalidate = False # Caller will do it.
# Store each updated value in the issue PB, and compute Update PBs
amendments = []
if status is not None and status != issue.status:
amendments.append(tracker_bizobj.MakeStatusAmendment(
status, issue.status))
issue.status = status
if owner_id is not None and owner_id != issue.owner_id:
amendments.append(tracker_bizobj.MakeOwnerAmendment(
owner_id, issue.owner_id))
issue.owner_id = owner_id
# compute the set of cc'd users added and removed
cc_add = [cc for cc in cc_add if cc not in issue.cc_ids]
cc_remove = [cc for cc in cc_remove if cc in issue.cc_ids]
if cc_add or cc_remove:
cc_ids = [cc for cc in list(issue.cc_ids) + cc_add
if cc not in cc_remove]
issue.cc_ids = cc_ids
amendments.append(tracker_bizobj.MakeCcAmendment(cc_add, cc_remove))
# compute the set of components added and removed
comp_ids_add = [c for c in comp_ids_add if c not in issue.component_ids]
comp_ids_remove = [c for c in comp_ids_remove if c in issue.component_ids]
if comp_ids_add or comp_ids_remove:
comp_ids = [cid for cid in list(issue.component_ids) + comp_ids_add
if cid not in comp_ids_remove]
issue.component_ids = comp_ids
amendments.append(tracker_bizobj.MakeComponentsAmendment(
comp_ids_add, comp_ids_remove, config))
# compute the set of labels added and removed
(labels, update_labels_add,
update_labels_remove) = framework_bizobj.MergeLabels(
issue.labels, labels_add, labels_remove,
config.exclusive_label_prefixes)
if update_labels_add or update_labels_remove:
issue.labels = labels
amendments.append(tracker_bizobj.MakeLabelsAmendment(
update_labels_add, update_labels_remove))
# compute the set of custom fields added and removed
(field_vals, update_fields_add,
update_fields_remove) = tracker_bizobj.MergeFields(
issue.field_values, field_vals_add, field_vals_remove,
config.field_defs)
if update_fields_add or update_fields_remove:
issue.field_values = field_vals
for fd in config.field_defs:
added_values_this_field = [
fv for fv in update_fields_add if fv.field_id == fd.field_id]
if added_values_this_field:
amendments.append(tracker_bizobj.MakeFieldAmendment(
fd.field_id, config,
[tracker_bizobj.GetFieldValue(fv, {})
for fv in added_values_this_field],
old_values=[]))
removed_values_this_field = [
fv for fv in update_fields_remove if fv.field_id == fd.field_id]
if removed_values_this_field:
amendments.append(tracker_bizobj.MakeFieldAmendment(
fd.field_id, config, [],
old_values=[tracker_bizobj.GetFieldValue(fv, {})
for fv in removed_values_this_field]))
if fields_clear:
field_clear_set = set(fields_clear)
revised_fields = []
for fd in config.field_defs:
if fd.field_id not in field_clear_set:
revised_fields.extend(
fv for fv in issue.field_values if fv.field_id == fd.field_id)
else:
amendments.append(
tracker_bizobj.MakeFieldClearedAmendment(fd.field_id, config))
if fd.field_type == tracker_pb2.FieldTypes.ENUM_TYPE:
prefix = fd.field_name.lower() + '-'
filtered_labels = [
lab for lab in issue.labels
if not lab.lower().startswith(prefix)]
issue.labels = filtered_labels
issue.field_values = revised_fields
if blocked_on_add or blocked_on_remove:
old_blocked_on = issue.blocked_on_iids
blocked_on_add = [iid for iid in blocked_on_add
if iid not in old_blocked_on]
add_refs = [(ref_issue.project_name, ref_issue.local_id)
for ref_issue in self.GetIssues(cnxn, blocked_on_add)]
blocked_on_rm = [iid for iid in blocked_on_remove
if iid in old_blocked_on]
remove_refs = [
(ref_issue.project_name, ref_issue.local_id)
for ref_issue in self.GetIssues(cnxn, blocked_on_rm)]
amendments.append(tracker_bizobj.MakeBlockedOnAmendment(
add_refs, remove_refs, default_project_name=issue.project_name))
blocked_on = [iid for iid in old_blocked_on + blocked_on_add
if iid not in blocked_on_remove]
issue.blocked_on_iids = blocked_on
iids_to_invalidate.update(blocked_on_add + blocked_on_remove)
if blocking_add or blocking_remove:
old_blocking = issue.blocking_iids
blocking_add = [iid for iid in blocking_add
if iid not in old_blocking]
add_refs = [(ref_issue.project_name, ref_issue.local_id)
for ref_issue in self.GetIssues(cnxn, blocking_add)]
blocking_remove = [iid for iid in blocking_remove
if iid in old_blocking]
remove_refs = [
(ref_issue.project_name, ref_issue.local_id)
for ref_issue in self.GetIssues(cnxn, blocking_remove)]
amendments.append(tracker_bizobj.MakeBlockingAmendment(
add_refs, remove_refs, default_project_name=issue.project_name))
blocking_refs = [iid for iid in old_blocking + blocking_add
if iid not in blocking_remove]
issue.blocking_iids = blocking_refs
iids_to_invalidate.update(blocking_add + blocking_remove)
if merged_into is not None and merged_into != issue.merged_into:
merged_remove = issue.merged_into
merged_add = merged_into
issue.merged_into = merged_into
try:
remove_issue = self.GetIssue(cnxn, merged_remove)
remove_ref = remove_issue.project_name, remove_issue.local_id
iids_to_invalidate.add(merged_remove)
except NoSuchIssueException:
remove_ref = None
try:
add_issue = self.GetIssue(cnxn, merged_add)
add_ref = add_issue.project_name, add_issue.local_id
iids_to_invalidate.add(merged_add)
except NoSuchIssueException:
add_ref = None
amendments.append(tracker_bizobj.MakeMergedIntoAmendment(
add_ref, remove_ref, default_project_name=issue.project_name))
if summary and summary != issue.summary:
amendments.append(tracker_bizobj.MakeSummaryAmendment(
summary, issue.summary))
issue.summary = summary
# If this was a no-op with no comment, bail out and don't save,
# invalidate, or re-index anything.
if not amendments and (not comment or not comment.strip()):
return [], None
# Note: no need to check for collisions when the user is doing a delta.
# update the modified_timestamp for any comment added, even if it was
# just a text comment with no issue fields changed.
issue.modified_timestamp = timestamp or int(time.time())
# Update the closed timestamp before filter rules so that rules
# can test for closed_timestamp, and also after filter rules
# so that closed_timestamp will be set if the issue is closed by the rule.
_UpdateClosedTimestamp(config, issue, old_effective_status)
if rules is None:
logging.info('Rules were not given')
rules = services.features.GetFilterRules(cnxn, config.project_id)
predicate_asts = filterrules_helpers.ParsePredicateASTs(
rules, config, None)
filterrules_helpers.ApplyGivenRules(
cnxn, services, issue, config, rules, predicate_asts)
_UpdateClosedTimestamp(config, issue, old_effective_status)
# Store the issue in SQL.
self.UpdateIssue(cnxn, issue, commit=False, invalidate=False)
comment_pb = self.CreateIssueComment(
cnxn, project_id, issue.local_id, reporter_id, comment,
amendments=amendments, commit=False)
self._UpdateIssuesModified(
cnxn, iids_to_invalidate, modified_timestamp=issue.modified_timestamp,
invalidate=invalidate)
if not invalidate:
cnxn.Commit()
if index_now:
tracker_fulltext.IndexIssues(
cnxn, [issue], services.user_service, self, self._config_service)
return amendments, comment_pb
def InvalidateIIDs(self, cnxn, iids_to_invalidate):
"""Invalidate the specified issues in the Invalidate table and memcache."""
issues_to_invalidate = self.GetIssues(cnxn, iids_to_invalidate)
self.issue_2lc.InvalidateKeys(cnxn, iids_to_invalidate)
self._config_service.InvalidateMemcache(issues_to_invalidate)
def ApplyIssueComment(
self, cnxn, services, reporter_id, project_id,
local_id, summary, status, owner_id, cc_ids, labels, field_values,
component_ids, blocked_on, blocking, dangling_blocked_on_refs,
dangling_blocking_refs, merged_into, index_now=True,
page_gen_ts=None, comment=None, inbound_message=None, attachments=None,
timestamp=None):
"""Update the issue in the database and return info for notifications.
Args:
cnxn: connection to SQL database.
services: connection to persistence layer.
reporter_id: user ID of the user making this change.
project_id: int Project ID for the current project.
local_id: integer local ID of the issue to update.
summary: new issue summary string.
status: new issue status string.
owner_id: user ID of the new issue owner.
cc_ids: list of user IDs of users to CC when the issue changes.
labels: list of new issue label strings.
field_values: list of FieldValue PBs.
component_ids: list of int component IDs.
blocked_on: list of IIDs that this issue is blocked on.
blocking: list of IIDs that this issue is blocking.
dangling_blocked_on_refs: list of Codesite issues this is blocked on.
dangling_blocking_refs: list of Codesite issues this is blocking.
merged_into: IID of issue that this issue was merged into, 0 to clear.
index_now: True if the issue should be updated in the full text index.
page_gen_ts: time at which the issue HTML page was generated,
used in detecting mid-air collisions.
comment: This should be the content of the comment
corresponding to this change.
inbound_message: optional string full text of an email that caused
this comment to be added.
attachments: This should be a list of
[(filename, contents, mimetype),...] attachments uploaded at
the time the comment was made.
timestamp: int timestamp set during testing, otherwise defaults to
int(time.time()).
Returns:
(amendments, comment_pb). Amendments is a list of Amendment PBs
that describe the set of metadata updates that the user made.
Comment_pb is the IssueComment for the change.
Raises:
MidAirCollisionException: indicates that the issue has been
changed since the user loaded the page.
"""
status = framework_bizobj.CanonicalizeLabel(status)
labels = [framework_bizobj.CanonicalizeLabel(l) for l in labels]
labels = [l for l in labels if l]
# Use canonical label names
label_ids = self._config_service.LookupLabelIDs(
cnxn, project_id, labels, autocreate=True)
labels = [self._config_service.LookupLabel(cnxn, project_id, l_id)
for l_id in label_ids]
# Get the issue and project configurations.
config = self._config_service.GetProjectConfig(cnxn, project_id)
issue = self.GetIssueByLocalID(cnxn, project_id, local_id)
# Store each updated value in the issue PB, and compute amendments
amendments = []
iids_to_invalidate = set()
if summary and summary != issue.summary:
amendments.append(tracker_bizobj.MakeSummaryAmendment(
summary, issue.summary))
issue.summary = summary
old_effective_status = tracker_bizobj.GetStatus(issue)
if status != issue.status:
amendments.append(tracker_bizobj.MakeStatusAmendment(
status, issue.status))
issue.status = status
if owner_id != issue.owner_id:
amendments.append(tracker_bizobj.MakeOwnerAmendment(
owner_id, issue.owner_id))
if owner_id == framework_constants.NO_USER_SPECIFIED:
issue.reset('owner_id')
else:
issue.owner_id = owner_id
# TODO(jrobbins): factor the CC code into a method and add a test
# compute the set of cc'd users added and removed
cc_added = [cc for cc in cc_ids if cc not in issue.cc_ids]
cc_removed = [cc for cc in issue.cc_ids if cc not in cc_ids]
if cc_added or cc_removed:
amendments.append(tracker_bizobj.MakeCcAmendment(cc_added, cc_removed))
issue.cc_ids = cc_ids
# TODO(jrobbins): factor the labels code into a method and add a test
# compute the set of labels added and removed
labels_added = [lab for lab in labels
if lab not in issue.labels]
labels_removed = [lab for lab in issue.labels
if lab not in labels]
if labels_added or labels_removed:
amendments.append(tracker_bizobj.MakeLabelsAmendment(
labels_added, labels_removed))
issue.labels = labels
old_field_values = collections.defaultdict(list)
for ofv in issue.field_values:
# Passing {} because I just want the user_id, not the email address.
old_field_values[ofv.field_id].append(
tracker_bizobj.GetFieldValue(ofv, {}))
for field_id, values in old_field_values.iteritems():
old_field_values[field_id] = sorted(values)
new_field_values = collections.defaultdict(list)
for nfv in field_values:
new_field_values[nfv.field_id].append(
tracker_bizobj.GetFieldValue(nfv, {}))
for field_id, values in new_field_values.iteritems():
new_field_values[field_id] = sorted(values)
field_ids_added = {fv.field_id for fv in field_values
if fv.field_id not in old_field_values}
field_ids_removed = {ofv.field_id for ofv in issue.field_values
if ofv.field_id not in new_field_values}
field_ids_changed = {
fv.field_id for fv in field_values
if (fv.field_id in old_field_values and
old_field_values[fv.field_id] != new_field_values[fv.field_id])}
if field_ids_added or field_ids_removed or field_ids_changed:
amendments.extend(
tracker_bizobj.MakeFieldAmendment(fid, config, new_field_values[fid])
for fid in field_ids_added)
amendments.extend(
tracker_bizobj.MakeFieldAmendment(
fid, config, new_field_values[fid],
old_values=old_field_values[fid])
for fid in field_ids_changed)
amendments.extend(
tracker_bizobj.MakeFieldAmendment(fid, config, [])
for fid in field_ids_removed)
issue.field_values = field_values
comps_added = [comp for comp in component_ids
if comp not in issue.component_ids]
comps_removed = [comp for comp in issue.component_ids
if comp not in component_ids]
if comps_added or comps_removed:
amendments.append(tracker_bizobj.MakeComponentsAmendment(
comps_added, comps_removed, config))
issue.component_ids = component_ids
if merged_into != issue.merged_into:
# TODO(jrobbins): refactor this into LookupIssueRefByIssueID().
try:
merged_remove = self.GetIssue(cnxn, issue.merged_into)
remove_ref = merged_remove.project_name, merged_remove.local_id
iids_to_invalidate.add(issue.merged_into)
except NoSuchIssueException:
remove_ref = None
try:
merged_add = self.GetIssue(cnxn, merged_into)
add_ref = merged_add.project_name, merged_add.local_id
iids_to_invalidate.add(merged_into)
except NoSuchIssueException:
add_ref = None
issue.merged_into = merged_into
amendments.append(tracker_bizobj.MakeMergedIntoAmendment(
add_ref, remove_ref, default_project_name=issue.project_name))
blockers_added, blockers_removed = framework_helpers.ComputeListDeltas(
issue.blocked_on_iids, blocked_on)
danglers_added, danglers_removed = framework_helpers.ComputeListDeltas(
issue.dangling_blocked_on_refs, dangling_blocked_on_refs)
blocked_add_issues = []
blocked_remove_issues = []
if blockers_added or blockers_removed or danglers_added or danglers_removed:
blocked_add_issues = self.GetIssues(cnxn, blockers_added)
add_refs = [(ref_issue.project_name, ref_issue.local_id)
for ref_issue in blocked_add_issues]
add_refs.extend([(ref.project, ref.issue_id) for ref in danglers_added])
blocked_remove_issues = self.GetIssues(cnxn, blockers_removed)
remove_refs = [
(ref_issue.project_name, ref_issue.local_id)
for ref_issue in blocked_remove_issues]
remove_refs.extend([(ref.project, ref.issue_id)
for ref in danglers_removed])
amendments.append(tracker_bizobj.MakeBlockedOnAmendment(
add_refs, remove_refs, default_project_name=issue.project_name))
issue.blocked_on_iids = blocked_on
issue.dangling_blocked_on_refs = dangling_blocked_on_refs
iids_to_invalidate.update(blockers_added + blockers_removed)
blockers_added, blockers_removed = framework_helpers.ComputeListDeltas(
issue.blocking_iids, blocking)
danglers_added, danglers_removed = framework_helpers.ComputeListDeltas(
issue.dangling_blocking_refs, dangling_blocking_refs)
blocking_add_issues = []
blocking_remove_issues = []
if blockers_added or blockers_removed or danglers_added or danglers_removed:
blocking_add_issues = self.GetIssues(cnxn, blockers_added)
add_refs = [(ref_issue.project_name, ref_issue.local_id)
for ref_issue in blocking_add_issues]
add_refs.extend([(ref.project, ref.issue_id) for ref in danglers_added])
blocking_remove_issues = self.GetIssues(cnxn, blockers_removed)
remove_refs = [
(ref_issue.project_name, ref_issue.local_id)
for ref_issue in blocking_remove_issues]
remove_refs.extend([(ref.project, ref.issue_id)
for ref in danglers_removed])
amendments.append(tracker_bizobj.MakeBlockingAmendment(
add_refs, remove_refs, default_project_name=issue.project_name))
issue.blocking_iids = blocking
issue.dangling_blocking_refs = dangling_blocking_refs
iids_to_invalidate.update(blockers_added + blockers_removed)
logging.info('later amendments so far is %r', amendments)
# Raise an exception if the issue was changed by another user
# while this user was viewing/editing the issue.
if page_gen_ts and amendments:
# The issue timestamp is stored in seconds, convert to microseconds to
# match the page_gen_ts.
issue_ts = issue.modified_timestamp * 1000000
if issue_ts > page_gen_ts:
logging.info('%d > %d', issue_ts, page_gen_ts)
logging.info('amendments: %s', amendments)
# Forget all the modificiations made to this issue in RAM.
self.issue_2lc.InvalidateKeys(cnxn, [issue.issue_id])
raise MidAirCollisionException('issue %d' % local_id, local_id)
# update the modified_timestamp for any comment added, even if it was
# just a text comment with no issue fields changed.
issue.modified_timestamp = timestamp or int(time.time())
# Update closed_timestamp both before and after filter rules.
_UpdateClosedTimestamp(config, issue, old_effective_status)
filterrules_helpers.ApplyFilterRules(cnxn, services, issue, config)
_UpdateClosedTimestamp(config, issue, old_effective_status)
self.UpdateIssue(cnxn, issue)
# TODO(jrobbins): only invalidate nonviewable if the following changed:
# restriction label, owner, cc, or user-type custom field.
self._config_service.InvalidateMemcache([issue], key_prefix='nonviewable:')
classification = services.spam.ClassifyComment(comment)
label = classification['outputLabel']
logging.info('comment classification: %s' % classification)
score = 0
is_spam = False
for output in classification['outputMulti']:
if output['label'] == label:
score = float(output['score'])
if label == 'spam' and score > settings.classifier_spam_thresh:
logging.info('spam comment: %s' % comment)
is_spam = True
if amendments or (comment and comment.strip()) or attachments:
logging.info('amendments = %r', amendments)
comment_pb = self.CreateIssueComment(
cnxn, project_id, local_id, reporter_id, comment,
amendments=amendments, attachments=attachments,
inbound_message=inbound_message, is_spam=is_spam)
services.spam.RecordClassifierCommentVerdict(
cnxn, comment_pb, is_spam, score)
else:
comment_pb = None
# Add a comment to the newly added issues saying they are now blocking
# this issue.
for add_issue in blocked_add_issues:
self.CreateIssueComment(
cnxn, add_issue.project_id, add_issue.local_id, reporter_id,
content='',
amendments=[tracker_bizobj.MakeBlockingAmendment(
[(issue.project_name, issue.local_id)], [],
default_project_name=add_issue.project_name)])
# Add a comment to the newly removed issues saying they are no longer
# blocking this issue.
for remove_issue in blocked_remove_issues:
self.CreateIssueComment(
cnxn, remove_issue.project_id, remove_issue.local_id, reporter_id,
content='',
amendments=[tracker_bizobj.MakeBlockingAmendment(
[], [(issue.project_name, issue.local_id)],
default_project_name=remove_issue.project_name)])
# Add a comment to the newly added issues saying they are now blocked on
# this issue.
for add_issue in blocking_add_issues:
self.CreateIssueComment(
cnxn, add_issue.project_id, add_issue.local_id, reporter_id,
content='',
amendments=[tracker_bizobj.MakeBlockedOnAmendment(
[(issue.project_name, issue.local_id)], [],
default_project_name=add_issue.project_name)])
# Add a comment to the newly removed issues saying they are no longer
# blocked on this issue.
for remove_issue in blocking_remove_issues:
self.CreateIssueComment(
cnxn, remove_issue.project_id, remove_issue.local_id, reporter_id,
content='',
amendments=[tracker_bizobj.MakeBlockedOnAmendment(
[], [(issue.project_name, issue.local_id)],
default_project_name=remove_issue.project_name)])
self._UpdateIssuesModified(
cnxn, iids_to_invalidate, modified_timestamp=issue.modified_timestamp)
if index_now:
tracker_fulltext.IndexIssues(
cnxn, [issue], services.user, self, self._config_service)
if is_spam:
sequence_num = len(self.GetCommentsForIssue(cnxn, issue.issue_id)) - 1
# Soft-deletes have to have a user ID, so spam comments are
# just "deleted" by the commenter.
self.SoftDeleteComment(cnxn, project_id, local_id, sequence_num,
reporter_id, services.user, is_spam=True)
return amendments, comment_pb
def RelateIssues(self, cnxn, issue_relation_dict, commit=True):
"""Update the IssueRelation table rows for the given relationships.
issue_relation_dict is a mapping of 'source' issues to 'destination' issues,
paired with the kind of relationship connecting the two.
"""
relation_rows = []
for src_iid, dests in issue_relation_dict.iteritems():
for dst_iid, kind in dests:
if kind == 'blocking':
relation_rows.append((dst_iid, src_iid, 'blockedon'))
elif kind == 'blockedon' or kind == 'mergedinto':
relation_rows.append((src_iid, dst_iid, kind))
self.issuerelation_tbl.InsertRows(
cnxn, ISSUERELATION_COLS, relation_rows, ignore=True, commit=commit)
def CopyIssues(self, cnxn, dest_project, issues, user_service, copier_id):
"""Copy the given issues into the destination project."""
created_issues = []
iids_to_invalidate = set()
for target_issue in issues:
new_issue = tracker_pb2.Issue()
new_issue.project_id = dest_project.project_id
new_issue.project_name = dest_project.project_name
new_issue.summary = target_issue.summary
new_issue.labels.extend(target_issue.labels)
new_issue.field_values.extend(target_issue.field_values)
new_issue.reporter_id = copier_id
timestamp = int(time.time())
new_issue.opened_timestamp = timestamp
new_issue.modified_timestamp = timestamp
target_comments = self.GetCommentsForIssue(cnxn, target_issue.issue_id)
initial_summary_comment = target_comments[0]
# Note that blocking and merge_into are not copied.
if target_issue.blocked_on_iids:
blocked_on = target_issue.blocked_on_iids
iids_to_invalidate.update(blocked_on)
new_issue.blocked_on_iids = blocked_on
# Gather list of attachments from the target issue's summary comment.
# MakeIssueComments expects a list of [(filename, contents, mimetype),...]
attachments = []
for attachment in initial_summary_comment.attachments:
object_path = ('/' + app_identity.get_default_gcs_bucket_name() +
attachment.gcs_object_id)
with cloudstorage.open(object_path, 'r') as f:
content = f.read()
attachments.append(
[attachment.filename, content, attachment.mimetype])
if attachments:
new_issue.attachment_count = len(attachments)
# Create the same summary comment as the target issue.
comment = self._MakeIssueComment(
dest_project.project_id, copier_id, initial_summary_comment.content,
attachments=attachments, timestamp=timestamp, was_escaped=True)
new_issue.local_id = self.AllocateNextLocalID(
cnxn, dest_project.project_id)
issue_id = self.InsertIssue(cnxn, new_issue)
comment.issue_id = issue_id
self.InsertComment(cnxn, comment)
if permissions.HasRestrictions(new_issue, 'view'):
self._config_service.InvalidateMemcache(
[new_issue], key_prefix='nonviewable:')
tracker_fulltext.IndexIssues(
cnxn, [new_issue], user_service, self, self._config_service)
created_issues.append(new_issue)
# The referenced issues are all modified when the relationship is added.
self._UpdateIssuesModified(
cnxn, iids_to_invalidate, modified_timestamp=timestamp)
return created_issues
def MoveIssues(self, cnxn, dest_project, issues, user_service):
"""Move the given issues into the destination project."""
old_location_rows = [
(issue.issue_id, issue.project_id, issue.local_id)
for issue in issues]
moved_back_iids = set()
former_locations_in_project = self.issueformerlocations_tbl.Select(
cnxn, cols=ISSUEFORMERLOCATIONS_COLS,
project_id=dest_project.project_id,
issue_id=[issue.issue_id for issue in issues])
former_locations = {
issue_id: local_id
for issue_id, project_id, local_id in former_locations_in_project}
# Remove the issue id from issue_id_2lc so that it does not stay
# around in cache and memcache.
# The Key of IssueIDTwoLevelCache is (project_id, local_id).
issue_id_2lc_key = (issues[0].project_id, issues[0].local_id)
self.issue_id_2lc.InvalidateKeys(cnxn, [issue_id_2lc_key])
for issue in issues:
if issue.issue_id in former_locations:
dest_id = former_locations[issue.issue_id]
moved_back_iids.add(issue.issue_id)
else:
dest_id = self.AllocateNextLocalID(cnxn, dest_project.project_id)
issue.local_id = dest_id
issue.project_id = dest_project.project_id
issue.project_name = dest_project.project_name
# Rewrite each whole issue so that status and label IDs are looked up
# in the context of the destination project.
self.UpdateIssues(cnxn, issues)
# Comments also have the project_id because it is needed for an index.
self.comment_tbl.Update(
cnxn, {'project_id': dest_project.project_id},
issue_id=[issue.issue_id for issue in issues], commit=False)
# Record old locations so that we can offer links if the user looks there.
self.issueformerlocations_tbl.InsertRows(
cnxn, ISSUEFORMERLOCATIONS_COLS, old_location_rows, ignore=True,
commit=False)
cnxn.Commit()
tracker_fulltext.IndexIssues(
cnxn, issues, user_service, self, self._config_service)
return moved_back_iids
def ExpungeFormerLocations(self, cnxn, project_id):
"""Delete history of issues that were in this project but moved out."""
self.issueformerlocations_tbl.Delete(cnxn, project_id=project_id)
def ExpungeIssues(self, cnxn, issue_ids):
"""Completely delete the specified issues from the database."""
logging.info('expunging the issues %r', issue_ids)
tracker_fulltext.UnindexIssues(issue_ids)
remaining_iids = issue_ids[:]
# Note: these are purposely not done in a transaction to allow
# incremental progress in what might be a very large change.
# We are not concerned about non-atomic deletes because all
# this data will be gone eventually anyway.
while remaining_iids:
iids_in_chunk = remaining_iids[:CHUNK_SIZE]
remaining_iids = remaining_iids[CHUNK_SIZE:]
self.issuesummary_tbl.Delete(cnxn, issue_id=iids_in_chunk)
self.issue2label_tbl.Delete(cnxn, issue_id=iids_in_chunk)
self.issue2component_tbl.Delete(cnxn, issue_id=iids_in_chunk)
self.issue2cc_tbl.Delete(cnxn, issue_id=iids_in_chunk)
self.issue2notify_tbl.Delete(cnxn, issue_id=iids_in_chunk)
self.issueupdate_tbl.Delete(cnxn, issue_id=iids_in_chunk)
self.attachment_tbl.Delete(cnxn, issue_id=iids_in_chunk)
self.comment_tbl.Delete(cnxn, issue_id=iids_in_chunk)
self.issuerelation_tbl.Delete(cnxn, issue_id=iids_in_chunk)
self.issuerelation_tbl.Delete(cnxn, dst_issue_id=iids_in_chunk)
self.danglingrelation_tbl.Delete(cnxn, issue_id=iids_in_chunk)
self.issueformerlocations_tbl.Delete(cnxn, issue_id=iids_in_chunk)
self.reindexqueue_tbl.Delete(cnxn, issue_id=iids_in_chunk)
self.issue_tbl.Delete(cnxn, id=iids_in_chunk)
def SoftDeleteIssue(self, cnxn, project_id, local_id, deleted, user_service):
"""Set the deleted boolean on the indicated issue and store it.
Args:
cnxn: connection to SQL database.
project_id: int project ID for the current project.
local_id: int local ID of the issue to freeze/unfreeze.
deleted: boolean, True to soft-delete, False to undelete.
user_service: persistence layer for users, used to lookup user IDs.
"""
issue = self.GetIssueByLocalID(cnxn, project_id, local_id)
issue.deleted = deleted
self.UpdateIssue(cnxn, issue, update_cols=['deleted'])
tracker_fulltext.IndexIssues(
cnxn, [issue], user_service, self, self._config_service)
def DeleteComponentReferences(self, cnxn, component_id):
"""Delete any references to the specified component."""
# TODO(jrobbins): add tasks to re-index any affected issues.
# Note: if this call fails, some data could be left
# behind, but it would not be displayed, and it could always be
# GC'd from the DB later.
self.issue2component_tbl.Delete(cnxn, component_id=component_id)
### Local ID generation
def InitializeLocalID(self, cnxn, project_id):
"""Initialize the local ID counter for the specified project to zero.
Args:
cnxn: connection to SQL database.
project_id: int ID of the project.
"""
self.localidcounter_tbl.InsertRow(
cnxn, project_id=project_id, used_local_id=0, used_spam_id=0)
def SetUsedLocalID(self, cnxn, project_id):
"""Set the local ID counter based on existing issues.
Args:
cnxn: connection to SQL database.
project_id: int ID of the project.
"""
highest_id = self.GetHighestLocalID(cnxn, project_id)
self.localidcounter_tbl.Update(
cnxn, {'used_local_id': highest_id}, project_id=project_id)
return highest_id
def AllocateNextLocalID(self, cnxn, project_id):
"""Return the next available issue ID in the specified project.
Args:
cnxn: connection to SQL database.
project_id: int ID of the project.
Returns:
The next local ID.
"""
try:
next_local_id = self.localidcounter_tbl.IncrementCounterValue(
cnxn, 'used_local_id', project_id=project_id)
except AssertionError:
next_local_id = self.SetUsedLocalID(cnxn, project_id) + 1
return next_local_id
def SetUsedSpamID(self, cnxn, project_id):
"""Set the local ID counter based on existing issues.
Args:
cnxn: connection to SQL database.
project_id: int ID of the project.
"""
current_id = self.localidcounter_tbl.SelectValue(
cnxn, 'used_spam_id', project_id=project_id)
current_id = current_id or 0 # Will be None if project has no issues.
self.localidcounter_tbl.Update(
cnxn, {'used_spam_id': current_id + 1}, project_id=project_id)
return current_id + 1
def AllocateNextSpamLocalID(self, cnxn, project_id):
"""Return the next available spam issue ID in the specified project.
Args:
cnxn: connection to SQL database.
project_id: int ID of the project.
Returns:
The next local ID.
"""
try:
next_spam_id = self.localidcounter_tbl.IncrementCounterValue(
cnxn, 'used_spam_id', project_id=project_id)
except AssertionError:
next_spam_id = self.SetUsedSpamID(cnxn, project_id) + 1
return -next_spam_id
def GetHighestLocalID(self, cnxn, project_id):
"""Return the highest used issue ID in the specified project.
Args:
cnxn: connection to SQL database.
project_id: int ID of the project.
Returns:
The highest local ID for an active or moved issues.
"""
highest = self.issue_tbl.SelectValue(
cnxn, 'MAX(local_id)', project_id=project_id)
highest = highest or 0 # It will be None if the project has no issues.
highest_former = self.issueformerlocations_tbl.SelectValue(
cnxn, 'MAX(local_id)', project_id=project_id)
highest_former = highest_former or 0
return max(highest, highest_former)
def GetAllLocalIDsInProject(self, cnxn, project_id, min_local_id=None):
"""Return the list of local IDs only, not the actual issues.
Args:
cnxn: connection to SQL database.
project_id: the ID of the project to which the issue belongs.
min_local_id: point to start at.
Returns:
A range object of local IDs from 1 to N, or from min_local_id to N. It
may be the case that some of those local IDs are no longer used, e.g.,
if some issues were moved out of this project.
"""
if not min_local_id:
min_local_id = 1
highest_local_id = self.GetHighestLocalID(cnxn, project_id)
return range(min_local_id, highest_local_id + 1)
def ExpungeLocalIDCounters(self, cnxn, project_id):
"""Delete history of local ids that were in this project."""
self.localidcounter_tbl.Delete(cnxn, project_id=project_id)
### Comments
def _UnpackComment(self, comment_row):
"""Partially construct a Comment PB from a DB row."""
(comment_id, issue_id, created, project_id, commenter_id, content,
inbound_message, was_escaped, deleted_by, is_spam) = comment_row
comment = tracker_pb2.IssueComment()
comment.id = comment_id
comment.issue_id = issue_id
comment.timestamp = created
comment.project_id = project_id
comment.user_id = commenter_id
comment.content = content or ''
comment.inbound_message = inbound_message or ''
comment.was_escaped = bool(was_escaped)
comment.deleted_by = deleted_by or 0
comment.is_spam = bool(is_spam)
return comment
def _UnpackAmendment(self, amendment_row):
"""Construct an Amendment PB from a DB row."""
(_id, _issue_id, comment_id, field_name,
old_value, new_value, added_user_id, removed_user_id,
custom_field_name) = amendment_row
amendment = tracker_pb2.Amendment()
field_enum = tracker_pb2.FieldID(field_name.upper())
amendment.field = field_enum
# TODO(jrobbins): display old values in more cases.
if new_value is not None:
amendment.newvalue = new_value
if old_value is not None:
amendment.oldvalue = old_value
if added_user_id:
amendment.added_user_ids.append(added_user_id)
if removed_user_id:
amendment.removed_user_ids.append(removed_user_id)
if custom_field_name:
amendment.custom_field_name = custom_field_name
return amendment, comment_id
def _ConsolidateAmendments(self, amendments):
"""Consoliodate amendments of the same field in one comment into one
amendment PB."""
fields_dict = {}
result = []
for amendment in amendments:
fields_dict.setdefault(amendment.field, []).append(amendment)
for field, amendments in fields_dict.iteritems():
new_amendment = tracker_pb2.Amendment()
new_amendment.field = field
for amendment in amendments:
if amendment.newvalue is not None:
new_amendment.newvalue = amendment.newvalue
if amendment.oldvalue is not None:
new_amendment.oldvalue = amendment.oldvalue
if amendment.added_user_ids:
new_amendment.added_user_ids.extend(amendment.added_user_ids)
if amendment.removed_user_ids:
new_amendment.removed_user_ids.extend(amendment.removed_user_ids)
if amendment.custom_field_name:
new_amendment.custom_field_name = amendment.custom_field_name
result.append(new_amendment)
return result
def _UnpackAttachment(self, attachment_row):
"""Construct an Attachment PB from a DB row."""
(attachment_id, _issue_id, comment_id, filename, filesize, mimetype,
deleted, gcs_object_id) = attachment_row
attach = tracker_pb2.Attachment()
attach.attachment_id = attachment_id
attach.filename = filename
attach.filesize = filesize
attach.mimetype = mimetype
attach.deleted = bool(deleted)
attach.gcs_object_id = gcs_object_id
return attach, comment_id
def _DeserializeComments(
self, comment_rows, amendment_rows, attachment_rows):
"""Turn rows into IssueComment PBs."""
results = [] # keep objects in the same order as the rows
results_dict = {} # for fast access when joining.
for comment_row in comment_rows:
comment = self._UnpackComment(comment_row)
results.append(comment)
results_dict[comment.id] = comment
for amendment_row in amendment_rows:
amendment, comment_id = self._UnpackAmendment(amendment_row)
try:
results_dict[comment_id].amendments.extend([amendment])
except KeyError:
logging.error('Found amendment for missing comment: %r', comment_id)
for attachment_row in attachment_rows:
attach, comment_id = self._UnpackAttachment(attachment_row)
try:
results_dict[comment_id].attachments.append(attach)
except KeyError:
logging.error('Found attachment for missing comment: %r', comment_id)
for c in results:
c.amendments = self._ConsolidateAmendments(c.amendments)
return results
# TODO(jrobbins): make this a private method and expose just the interface
# needed by activities.py.
def GetComments(self, cnxn, where=None, order_by=None, **kwargs):
"""Retrieve comments from SQL."""
# Explicitly specify column Comment.id to allow joins on other tables that
# have an id column.
order_by = order_by or [('created', [])]
comment_rows = self.comment_tbl.Select(
cnxn, cols=COMMENT_COLS, where=where,
order_by=order_by, **kwargs)
cids = [row[0] for row in comment_rows]
amendment_rows = self.issueupdate_tbl.Select(
cnxn, cols=ISSUEUPDATE_COLS, comment_id=cids)
attachment_rows = self.attachment_tbl.Select(
cnxn, cols=ATTACHMENT_COLS, comment_id=cids)
comments = self._DeserializeComments(
comment_rows, amendment_rows, attachment_rows)
return comments
def GetComment(self, cnxn, comment_id):
"""Get the requested comment, or raise an exception."""
comments = self.GetComments(cnxn, id=comment_id)
try:
return comments[0]
except IndexError:
raise NoSuchCommentException()
def GetCommentsForIssue(self, cnxn, issue_id):
"""Return all IssueComment PBs for the specified issue.
Args:
cnxn: connection to SQL database.
issue_id: int global ID of the issue.
Returns:
A list of the IssueComment protocol buffers for the description
and comments on this issue.
"""
comments = self.GetComments(cnxn, issue_id=[issue_id])
for i, comment in enumerate(comments):
comment.sequence = i
return comments
def GetCommentsByID(self, cnxn, comment_ids, sequences):
"""Return all IssueComment PBs by comment ids.
Args:
cnxn: connection to SQL database.
comment_ids: a list of comment ids.
sequences: sequence of the comments.
Returns:
A list of the IssueComment protocol buffers for the description
and comments on this issue.
"""
order_by = [('created ASC', [])]
comment_rows = self.comment_tbl.Select(
cnxn, cols=COMMENT_COLS, order_by=order_by, id=comment_ids)
amendment_rows = self.issueupdate_tbl.Select(
cnxn, cols=ISSUEUPDATE_COLS, comment_id=comment_ids)
attachment_rows = self.attachment_tbl.Select(
cnxn, cols=ATTACHMENT_COLS, comment_id=comment_ids)
comments = self._DeserializeComments(
comment_rows, amendment_rows, attachment_rows)
for i in xrange(len(comment_ids)):
comments[i].sequence = sequences[i]
return comments
def GetAbbrCommentsForIssue(self, cnxn, issue_id):
"""Get all abbreviated comments for the specified issue."""
order_by = [('created ASC', [])]
comment_rows = self.comment_tbl.Select(
cnxn, cols=ABBR_COMMENT_COLS, issue_id=[issue_id], order_by=order_by)
return comment_rows
# TODO(jrobbins): remove this message because it is too slow when an issue
# has a huge number of comments.
def GetCommentsForIssues(self, cnxn, issue_ids):
"""Return all IssueComment PBs for each issue ID in the given list.
Args:
cnxn: connection to SQL database.
issue_ids: list of integer global issue IDs.
Returns:
Dict {issue_id: [IssueComment, ...]} with IssueComment protocol
buffers for the description and comments on each issue.
"""
comments = self.GetComments(cnxn, issue_id=issue_ids)
comments_dict = collections.defaultdict(list)
for comment in comments:
comment.sequence = len(comments_dict[comment.issue_id])
comments_dict[comment.issue_id].append(comment)
return comments_dict
def InsertComment(self, cnxn, comment, commit=True):
"""Store the given issue comment in SQL.
Args:
cnxn: connection to SQL database.
comment: IssueComment PB to insert into the database.
commit: set to False to avoid doing the commit for now.
"""
comment_id = self.comment_tbl.InsertRow(
cnxn, issue_id=comment.issue_id, created=comment.timestamp,
project_id=comment.project_id,
commenter_id=comment.user_id, content=comment.content,
inbound_message=comment.inbound_message,
was_escaped=comment.was_escaped,
deleted_by=comment.deleted_by or None,
is_spam=comment.is_spam,
commit=commit)
comment.id = comment_id
amendment_rows = []
for amendment in comment.amendments:
field_enum = str(amendment.field).lower()
if (amendment.get_assigned_value('newvalue') is not None and
not amendment.added_user_ids and not amendment.removed_user_ids):
amendment_rows.append((
comment.issue_id, comment_id, field_enum,
amendment.oldvalue, amendment.newvalue,
None, None, amendment.custom_field_name))
for added_user_id in amendment.added_user_ids:
amendment_rows.append((
comment.issue_id, comment_id, field_enum, None, None,
added_user_id, None, amendment.custom_field_name))
for removed_user_id in amendment.removed_user_ids:
amendment_rows.append((
comment.issue_id, comment_id, field_enum, None, None,
None, removed_user_id, amendment.custom_field_name))
# ISSUEUPDATE_COLS[1:] to skip id column.
self.issueupdate_tbl.InsertRows(
cnxn, ISSUEUPDATE_COLS[1:], amendment_rows, commit=commit)
attachment_rows = []
for attach in comment.attachments:
attachment_rows.append([
comment.issue_id, comment.id, attach.filename, attach.filesize,
attach.mimetype, attach.deleted, attach.gcs_object_id])
self.attachment_tbl.InsertRows(
cnxn, ATTACHMENT_COLS[1:], attachment_rows, commit=commit)
def _UpdateComment(self, cnxn, comment, update_cols=None):
"""Update the given issue comment in SQL.
Args:
cnxn: connection to SQL database.
comment: IssueComment PB to update in the database.
update_cols: optional list of just the field names to update.
"""
delta = {
'commenter_id': comment.user_id,
'content': comment.content,
'deleted_by': comment.deleted_by or None,
'is_spam': comment.is_spam,
}
if update_cols is not None:
delta = {key: val for key, val in delta.iteritems()
if key in update_cols}
self.comment_tbl.Update(cnxn, delta, id=comment.id)
def _MakeIssueComment(
self, project_id, user_id, content, inbound_message=None,
amendments=None, attachments=None, timestamp=None, was_escaped=False,
is_spam=False):
"""Create in IssueComment protocol buffer in RAM.
Args:
project_id: Project with the issue.
user_id: the user ID of the user who entered the comment.
content: string body of the comment.
inbound_message: optional string full text of an email that
caused this comment to be added.
amendments: list of Amendment PBs describing the
metadata changes that the user made along w/ comment.
attachments: [(filename, contents, mimetype),...] attachments uploaded at
the time the comment was made.
timestamp: time at which the comment was made, defaults to now.
was_escaped: True if the comment was HTML escaped already.
is_spam: True if the comment was classified as spam.
Returns:
The new IssueComment protocol buffer.
The content may have some markup done during input processing.
Any attachments are immediately stored.
"""
comment = tracker_pb2.IssueComment()
comment.project_id = project_id
comment.user_id = user_id
comment.content = content or ''
comment.was_escaped = was_escaped
comment.is_spam = is_spam
if not timestamp:
timestamp = int(time.time())
comment.timestamp = int(timestamp)
if inbound_message:
comment.inbound_message = inbound_message
if amendments:
logging.info('amendments is %r', amendments)
comment.amendments.extend(amendments)
if attachments:
for filename, body, mimetype in attachments:
gcs_object_id = gcs_helpers.StoreObjectInGCS(body, mimetype, project_id)
attach = tracker_pb2.Attachment()
# attachment id is determined later by the SQL DB.
attach.filename = filename
attach.filesize = len(body)
attach.mimetype = mimetype
attach.gcs_object_id = gcs_object_id
comment.attachments.extend([attach])
logging.info("Save attachment with object_id: %s" % gcs_object_id)
return comment
def CreateIssueComment(
self, cnxn, project_id, local_id, user_id, content, inbound_message=None,
amendments=None, attachments=None, timestamp=None, is_spam=False,
commit=True):
"""Create and store a new comment on the specified issue.
Args:
cnxn: connection to SQL database.
project_id: int ID of the current Project.
local_id: the issue on which to add the comment.
user_id: the user ID of the user who entered the comment.
content: string body of the comment.
inbound_message: optional string full text of an email that caused
this comment to be added.
amendments: list of Amendment PBs describing the
metadata changes that the user made along w/ comment.
attachments: [(filename, contents, mimetype),...] attachments uploaded at
the time the comment was made.
timestamp: time at which the comment was made, defaults to now.
is_spam: True if the comment is classified as spam.
commit: set to False to not commit to DB yet.
Returns:
The new IssueComment protocol buffer.
Note that we assume that the content is safe to echo out
again. The content may have some markup done during input
processing.
"""
issue = self.GetIssueByLocalID(cnxn, project_id, local_id)
comment = self._MakeIssueComment(
issue.project_id, user_id, content, amendments=amendments,
inbound_message=inbound_message, attachments=attachments,
timestamp=timestamp, is_spam=is_spam)
comment.issue_id = issue.issue_id
if attachments:
issue.attachment_count = issue.attachment_count + len(attachments)
self.UpdateIssue(cnxn, issue, update_cols=['attachment_count'])
self.InsertComment(cnxn, comment, commit=commit)
return comment
def SoftDeleteComment(
self, cnxn, project_id, local_id, sequence_num, deleted_by_user_id,
user_service, delete=True, reindex=True, is_spam=False):
"""Mark comment as un/deleted, which shows/hides it from average users."""
issue = self.GetIssueByLocalID(cnxn, project_id, local_id)
all_comments = self.GetCommentsForIssue(cnxn, issue.issue_id)
try:
issue_comment = all_comments[sequence_num]
except IndexError:
logging.warning(
'Tried to (un)delete non-existent comment #%s in issue %s:%s',
sequence_num, project_id, local_id)
return
# Update number of attachments
attachments = 0
if issue_comment.attachments:
for attachment in issue_comment.attachments:
if not attachment.deleted:
attachments += 1
# Delete only if it's not in deleted state
if delete:
if not issue_comment.deleted_by:
issue_comment.deleted_by = deleted_by_user_id
issue.attachment_count = issue.attachment_count - attachments
# Undelete only if it's in deleted state
elif issue_comment.deleted_by:
issue_comment.deleted_by = 0
issue.attachment_count = issue.attachment_count + attachments
issue_comment.is_spam = is_spam
self._UpdateComment(
cnxn, issue_comment, update_cols=['deleted_by', 'is_spam'])
self.UpdateIssue(cnxn, issue, update_cols=['attachment_count'])
# Reindex the issue to take the comment deletion/undeletion into account.
if reindex:
tracker_fulltext.IndexIssues(
cnxn, [issue], user_service, self, self._config_service)
### Attachments
def GetAttachmentAndContext(self, cnxn, attachment_id):
"""Load a IssueAttachment from database, and its comment ID and IID.
Args:
cnxn: connection to SQL database.
attachment_id: long integer unique ID of desired issue attachment.
Returns:
An Attachment protocol buffer that contains metadata about the attached
file, or None if it doesn't exist. Also, the comment ID and issue IID
of the comment and issue that contain this attachment.
Raises:
NoSuchAttachmentException: the attachment was not found.
"""
if attachment_id is None:
raise NoSuchAttachmentException()
attachment_row = self.attachment_tbl.SelectRow(
cnxn, cols=ATTACHMENT_COLS, id=attachment_id)
if attachment_row:
(attach_id, issue_id, comment_id, filename, filesize, mimetype,
deleted, gcs_object_id) = attachment_row
if not deleted:
attachment = tracker_pb2.Attachment(
attachment_id=attach_id, filename=filename, filesize=filesize,
mimetype=mimetype, deleted=bool(deleted),
gcs_object_id=gcs_object_id)
return attachment, comment_id, issue_id
raise NoSuchAttachmentException()
def _UpdateAttachment(self, cnxn, attach, update_cols=None):
"""Update attachment metadata in the DB.
Args:
cnxn: connection to SQL database.
attach: IssueAttachment PB to update in the DB.
update_cols: optional list of just the field names to update.
"""
delta = {
'filename': attach.filename,
'filesize': attach.filesize,
'mimetype': attach.mimetype,
'deleted': bool(attach.deleted),
}
if update_cols is not None:
delta = {key: val for key, val in delta.iteritems()
if key in update_cols}
self.attachment_tbl.Update(cnxn, delta, id=attach.attachment_id)
def SoftDeleteAttachment(
self, cnxn, project_id, local_id, seq_num, attach_id, user_service,
delete=True, index_now=True):
"""Mark attachment as un/deleted, which shows/hides it from avg users."""
issue = self.GetIssueByLocalID(cnxn, project_id, local_id)
all_comments = self.GetCommentsForIssue(cnxn, issue.issue_id)
try:
issue_comment = all_comments[seq_num]
except IndexError:
logging.warning(
'Tried to (un)delete attachment on non-existent comment #%s in '
'issue %s:%s', seq_num, project_id, local_id)
return
attachment = None
for attach in issue_comment.attachments:
if attach.attachment_id == attach_id:
attachment = attach
if not attachment:
logging.warning(
'Tried to (un)delete non-existent attachment #%s in project '
'%s issue %s', attach_id, project_id, local_id)
return
if not issue_comment.deleted_by:
# Decrement attachment count only if it's not in deleted state
if delete:
if not attachment.deleted:
issue.attachment_count = issue.attachment_count - 1
# Increment attachment count only if it's in deleted state
elif attachment.deleted:
issue.attachment_count = issue.attachment_count + 1
attachment.deleted = delete
self._UpdateAttachment(cnxn, attachment, update_cols=['deleted'])
self.UpdateIssue(cnxn, issue, update_cols=['attachment_count'])
if index_now:
tracker_fulltext.IndexIssues(
cnxn, [issue], user_service, self, self._config_service)
### Reindex queue
def EnqueueIssuesForIndexing(self, cnxn, issue_ids):
"""Add the given issue IDs to the ReindexQueue table."""
reindex_rows = [(issue_id,) for issue_id in issue_ids]
self.reindexqueue_tbl.InsertRows(
cnxn, ['issue_id'], reindex_rows, ignore=True)
def ReindexIssues(self, cnxn, num_to_reindex, user_service):
"""Reindex some issues specified in the IndexQueue table."""
rows = self.reindexqueue_tbl.Select(
cnxn, order_by=[('created', [])], limit=num_to_reindex)
issue_ids = [row[0] for row in rows]
if issue_ids:
issues = self.GetIssues(cnxn, issue_ids)
tracker_fulltext.IndexIssues(
cnxn, issues, user_service, self, self._config_service)
self.reindexqueue_tbl.Delete(cnxn, issue_id=issue_ids)
return len(issue_ids)
### Search functions
def RunIssueQuery(
self, cnxn, left_joins, where, order_by, shard_id=None, limit=None):
"""Run a SQL query to find matching issue IDs.
Args:
cnxn: connection to SQL database.
left_joins: list of SQL LEFT JOIN clauses.
where: list of SQL WHERE clauses.
order_by: list of SQL ORDER BY clauses.
shard_id: int shard ID to focus the search.
limit: int maximum number of results, defaults to
settings.search_limit_per_shard.
Returns:
(issue_ids, capped) where issue_ids is a list of the result issue IDs,
and capped is True if the number of results reached the limit.
"""
limit = limit or settings.search_limit_per_shard
where = where + [('Issue.deleted = %s', [False])]
rows = self.issue_tbl.Select(
cnxn, shard_id=shard_id, distinct=True, cols=['Issue.id'],
left_joins=left_joins, where=where, order_by=order_by,
limit=limit)
issue_ids = [row[0] for row in rows]
capped = len(issue_ids) >= limit
return issue_ids, capped
def GetIIDsByLabelIDs(self, cnxn, label_ids, project_id, shard_id):
"""Return a list of IIDs for issues with any of the given label IDs."""
where = []
if shard_id is not None:
slice_term = ('shard = %s', [shard_id])
where.append(slice_term)
rows = self.issue_tbl.Select(
cnxn, shard_id=shard_id, cols=['id'],
left_joins=[('Issue2Label ON Issue.id = Issue2Label.issue_id', [])],
label_id=label_ids, project_id=project_id, where=where)
return [row[0] for row in rows]
def GetIIDsByParticipant(self, cnxn, user_ids, project_ids, shard_id):
"""Return IIDs for issues where any of the given users participate."""
iids = []
where = []
if shard_id is not None:
where.append(('shard = %s', [shard_id]))
if project_ids:
cond_str = 'Issue.project_id IN (%s)' % sql.PlaceHolders(project_ids)
where.append((cond_str, project_ids))
# TODO(jrobbins): Combine these 3 queries into one with ORs. It currently
# is not the bottleneck.
rows = self.issue_tbl.Select(
cnxn, cols=['id'], reporter_id=user_ids,
where=where, shard_id=shard_id)
for row in rows:
iids.append(row[0])
rows = self.issue_tbl.Select(
cnxn, cols=['id'], owner_id=user_ids,
where=where, shard_id=shard_id)
for row in rows:
iids.append(row[0])
rows = self.issue_tbl.Select(
cnxn, cols=['id'], derived_owner_id=user_ids,
where=where, shard_id=shard_id)
for row in rows:
iids.append(row[0])
rows = self.issue_tbl.Select(
cnxn, cols=['id'],
left_joins=[('Issue2Cc ON Issue2Cc.issue_id = Issue.id', [])],
cc_id=user_ids,
where=where + [('cc_id IS NOT NULL', [])],
shard_id=shard_id)
for row in rows:
iids.append(row[0])
rows = self.issue_tbl.Select(
cnxn, cols=['Issue.id'],
left_joins=[
('Issue2FieldValue ON Issue.id = Issue2FieldValue.issue_id', []),
('FieldDef ON Issue2FieldValue.field_id = FieldDef.id', [])],
user_id=user_ids, grants_perm='View',
where=where + [('user_id IS NOT NULL', [])],
shard_id=shard_id)
for row in rows:
iids.append(row[0])
return iids
def _UpdateClosedTimestamp(config, issue, old_effective_status):
"""Sets or unsets the closed_timestamp based based on status changes.
If the status is changing from open to closed, the closed_timestamp is set to
the current time.
If the status is changing form closed to open, the close_timestamp is unset.
If the status is changing from one closed to another closed, or from one
open to another open, no operations are performed.
Args:
config: the project configuration
issue: the issue being updated (a protocol buffer)
old_effective_status: the old issue status string. E.g., 'New'
"""
# open -> closed
if (tracker_helpers.MeansOpenInProject(old_effective_status, config)
and not tracker_helpers.MeansOpenInProject(
tracker_bizobj.GetStatus(issue), config)):
logging.info('setting closed_timestamp on issue: %d', issue.local_id)
issue.closed_timestamp = int(time.time())
return
# closed -> open
if (not tracker_helpers.MeansOpenInProject(old_effective_status, config)
and tracker_helpers.MeansOpenInProject(
tracker_bizobj.GetStatus(issue), config)):
logging.info('clearing closed_timestamp on issue: %s', issue.local_id)
issue.reset('closed_timestamp')
return
class Error(Exception):
"""Base exception class for this package."""
pass
class NoSuchIssueException(Error):
"""The requested issue was not found."""
pass
class NoSuchAttachmentException(Error):
"""The requested attachment was not found."""
pass
class NoSuchCommentException(Error):
"""The requested comment was not found."""
pass
class MidAirCollisionException(Error):
"""The item was updated by another user at the same time."""
def __init__(self, name, continue_issue_id):
super(MidAirCollisionException, self).__init__()
self.name = name # human-readable name for the artifact being edited.
self.continue_issue_id = continue_issue_id # ID of issue to start over.