blob: 160fca1befeb5bedd27ee231577684ff566a3f25 [file] [log] [blame]
# Copyright 2018 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.
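"""API for interacting with the LUCI Scheduler service.

Depends on the 'prpc' binary being available in $PATH.

NOTE: the links that originally followed (prpc documentation, scheduler API
documentation, and the RPCExplorer page) are missing from this copy of the
file.
"""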
import uuid
from recipe_engine import recipe_api
class SchedulerApi(recipe_api.RecipeApi):
"""A module for interacting with LUCI Scheduler service."""
def __init__(self, **kwargs):
    """Sets up the module's mutable state.

    Args:
      **kwargs: passed through unchanged to the parent class constructor.
    """
    super(SchedulerApi, self).__init__(**kwargs)
    # Counter incremented by `_next_uuid` each time it hands out a fake id.
    self._fake_uuid_count = 0
    # Backend hostname; replaced through `set_host`.
    self._host = ''
def set_host(self, host):
    """Changes the backend hostname used by this module.

    Args:
      host (str): server host.
    """
    self._host = host
class Trigger(object):
"""Generic Trigger accepted by LUCI Scheduler API.

NOTE: the link to the documentation of all supported triggers is missing
from this copy of the file.
"""
def __init__(self, id=None, title=None, url=None, payload=None):
    """Records the given id, title, url and payload on the instance.

    The values are stored as-is; no validation or defaulting happens here.
    """
    self._id, self._title = id, title
    self._url, self._payload = url, payload
def _serialize(self, api_self):
# Builds and returns a dict describing this trigger. `api_self` is the object
# whose `_next_uuid()` is called when no explicit id was given.
# NOTE(review): this copy of the block looks truncated -- the expression on the
# `t['title']` line is never closed and its format arguments are not visible,
# and nothing here calls `_serialize_payload`. Confirm against the upstream
# file before relying on this text.
t = {}
# Use the explicit id when set, otherwise ask the API object for a fresh one.
t['id'] = self._id or api_self._next_uuid()
# Use the explicit title when set, otherwise a '%s/%s'-formatted default.
t['title'] = self._title or ('%s/%s' % (
# TODO(tandrii): find a way to get URL of current build.
# The 'url' key is only emitted when a url was supplied.
if self._url:
t['url'] = self._url
return t
def _serialize_payload(self, api_self):
    """Returns the payload stored on this trigger, unchanged.

    `api_self` is accepted but not used by this implementation.
    """
    payload = self._payload
    return payload
class BuildbucketTrigger(Trigger):
"""Trigger with buildbucket payload for buildbucket jobs.

Args:
  * properties (dict, optional): key -> value properties.
  * tags (dict, optional): custom tags to add. See also `inherit_tags`.
    If a tag's value is None, this tag will be removed from the resulting
    tags; however, if you rely on this, consider using `inherit_tags=False`.
  * inherit_tags (bool): if true (default), auto-adds tags using the
    `api.buildbucket.tags_for_child_build` api.
"""
def __init__(self, properties=None, tags=None, inherit_tags=True, **kwargs):
    """Records the buildbucket-specific settings of the trigger.

    `properties`, `tags` and `inherit_tags` are stored as given; every other
    keyword argument is handed to the parent constructor.
    """
    super(SchedulerApi.BuildbucketTrigger, self).__init__(**kwargs)
    self._inherit_tags = inherit_tags
    self._tags = tags
    self._properties = properties
def _serialize_payload(self, api_self):
    """Builds the buildbucket payload dict for this trigger.

    Args:
      api_self: object whose `m.buildbucket.tags_for_child_build` dict supplies
        the inherited tags when `inherit_tags` is enabled.

    Returns:
      A dict of the form {'buildbucket': {'properties': ..., 'tags': [...]}},
      where tags is a list of 'key:value' strings sorted by (key, value). Tags
      whose value is None are left out.
    """
    tags = {}
    if self._inherit_tags:
        # Work on a copy so custom tags never modify the source dict.
        tags = api_self.m.buildbucket.tags_for_child_build.copy()
    if self._tags:
        # Custom tags are applied on top of inherited ones; a None value acts
        # as a removal marker and is filtered out below.
        tags.update(self._tags)
    return {'buildbucket': {
        'properties': self._properties or {},
        # `.items()` plus a list comprehension yields a real list on both
        # Python 2 and Python 3.
        'tags': [
            ':'.join(pair)
            for pair in sorted(
                (k, v) for k, v in tags.items() if v is not None)
        ],
    }}
class GitilesTrigger(Trigger):
    """Trigger with new Gitiles commit payload, typically for buildbucket jobs.

    Args:
      repo (str): URL of a repo that changed.
      ref (str): a ref that changed, in full, e.g. "refs/heads/master".
      revision (str): a revision (SHA1 in hex) pointed to by the ref.
    """

    def __init__(self, repo, ref, revision, **kwargs):
        # The payload is derived entirely from the three positional arguments
        # and replaces any 'payload' passed in through kwargs.
        kwargs['payload'] = {'gitiles': {
            'repo': repo,
            'ref': ref,
            'revision': revision,
        }}
        super(SchedulerApi.GitilesTrigger, self).__init__(**kwargs)
def emit_trigger(self, trigger, project, jobs, step_name=None):
    """Emits trigger to one or more jobs of a given project.

    Args:
      trigger (Trigger): defines payload to trigger jobs with.
      project (str): name of the project in LUCI Config service, which is used
        by LUCI Scheduler instance.
      jobs (iterable of str): job names per LUCI Scheduler config for the given
        project. These typically are the same as builder names.
      step_name (str, optional): passed through to `emit_triggers`.

    Returns:
      Whatever `emit_triggers` returns for the single-element batch.
    """
    return self.emit_triggers([(trigger, project, jobs)], step_name=step_name)
def emit_triggers(
        self, trigger_project_jobs, timestamp_usec=None, step_name=None):
    """Emits a batch of triggers spanning one or more projects.

    Args:
      trigger_project_jobs (iterable of tuples(trigger, project, jobs)):
        each tuple corresponds to parameters of the `emit_trigger` API.
      timestamp_usec (int): unix timestamp in microseconds.
        Useful for idempotency of calls if your recipe is doing its own
        retries. When falsy, the current time from the `time` recipe module
        is used instead.
      step_name (str, optional): passed through to `_run`.
    """
    req = {
        'batches': [
            {
                'trigger': trigger._serialize(self),
                'jobs': [{'project': project, 'job': job} for job in jobs],
            }
            for trigger, project, jobs in trigger_project_jobs
        ],
    }
    if timestamp_usec:
        assert isinstance(timestamp_usec, int), timestamp_usec
    else:
        # Only fall back to "now" when the caller gave no timestamp; a
        # caller-supplied value must survive for retries to be idempotent.
        timestamp_usec = int(self.m.time.time() * 1e6)
    req['timestamp'] = timestamp_usec

    # There is no output from EmitTriggers API.
    self._run(
        'EmitTriggers', req, step_name=step_name,
        step_test_data=lambda: self.m.json.test_api.output_stream({}))
def _run(self, method, input_data, step_test_data=None, step_name=None):
# Invokes one scheduler RPC `method` through the 'prpc' command line and
# returns the `stdout` attribute of the step result.
# NOTE(review): this copy of the block looks truncated -- the `self.m.step(`
# call below has no arguments or closing parenthesis, so how `args`,
# `input_data` and `step_test_data` reach the step is not visible here.
# Confirm against the upstream file.
assert self.m.runtime.is_luci, 'scheduler module only works on LUCI stack'
# TODO(tandrii): encapsulate running prpc command in a standalone module.
# Default step name is derived from the RPC method name.
step_name = step_name or ('luci-scheduler.' + method)
# Command line: prpc call against the configured host, JSON formatted.
args = ['prpc', 'call', '-format=json', self._host,
'scheduler.Scheduler.' + method]
step_result = None
step_result = self.m.step(
# TODO(tandrii): add hostname to step presentation's links.
# TODO(tandrii): handle errors nicely.
# Show the request that was sent as pretty-printed JSON in the step text.
self.m.step.active_result.presentation.step_text = 'input %s' % (
self.m.json.dumps(input_data, indent=4))
return step_result.stdout
def _next_uuid(self):
    """Returns a new id string for a trigger.

    When test data is enabled, ids are deterministic: a fixed prefix followed
    by a zero-padded, 5-digit counter that is incremented on every call.
    Otherwise a random UUID4 string is returned.
    """
    if self._test_data.enabled:
        self._fake_uuid_count += 1
        return '6a0a73b0-070b-492b-9135-9f26a2a' + '%05d' % (
            self._fake_uuid_count)
    else:  # pragma: no cover
        return str(uuid.uuid4())