blob: 9c2871e62d325d0f0d0e63fe2f5b74421bf063ef [file] [log] [blame]
# Copyright 2020 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.
"""Functions to communicate with ResultDB for swarming tasks."""
import logging
import uuid
from google.appengine.api import app_identity
from google.appengine.ext import ndb
from components import net
from components import prpc
from components import utils
from server import config
@ndb.tasklet
def create_invocation_async(task_run_id, realm, deadline):
"""This is wrapper for CreateInvocation API.
Returns:
update-token for created invocation.
"""
hostname = app_identity.get_default_version_hostname()
response_headers = {}
assert deadline.utcoffset(
) is None, "non-default timezone is not supported, %s" % deadline.utcoffset()
yield _call_resultdb_recorder_api_async(
'CreateInvocation', {
'requestId': str(uuid.uuid4()),
'invocationId': _get_invocation_id(task_run_id),
'invocation': {
'producerResource': '//%s/tasks/%s' % (hostname, task_run_id),
'realm': realm,
'deadline': deadline.isoformat() + 'Z',
}
},
project_id=realm.split(':')[0],
response_headers=response_headers)
update_token = response_headers.get('update-token')
assert update_token, ("response_headers should have valid update-token: %s" %
response_headers)
raise ndb.Return(update_token)
@ndb.tasklet
def finalize_invocation_async(task_run_id, update_token):
"""This is wrapper for FinalizeInvocation API."""
try:
invocation_name = get_invocation_name(task_run_id)
yield _call_resultdb_recorder_api_async(
'FinalizeInvocation', {
'name': invocation_name,
},
headers={'update-token': update_token},
scopes=None)
except net.Error:
logging.exception('Failed to finalize %s', invocation_name)
def get_invocation_name(task_run_id):
return 'invocations/%s' % _get_invocation_id(task_run_id)
### Private code
def _get_invocation_id(task_run_id):
hostname = app_identity.get_default_version_hostname()
return 'task-%s-%s' % (hostname, task_run_id)
@ndb.tasklet
def _call_resultdb_recorder_api_async(method,
request,
headers=None,
response_headers=None,
scopes=(net.EMAIL_SCOPE,),
project_id=None):
cfg = config.settings()
rdb_url = cfg.resultdb.server
if not rdb_url:
raise ValueError('ResultDB integration is not configured')
utils.validate_root_service_url(rdb_url)
# See Recoder API for ResultDB in
# https://chromium.googlesource.com/infra/luci/luci-go/+/HEAD/resultdb/proto/rpc/v1/recorder.proto
# But beware that proto JSON serialization uses camelCase, not snake_case.
yield net.json_request_async(
url='%s/prpc/luci.resultdb.v1.Recorder/%s' % (rdb_url, method),
method='POST',
payload=request,
scopes=scopes,
headers=headers,
response_headers=response_headers,
project_id=project_id)