// Copyright 2016 The LUCI Authors.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package apiservers
import (
// SchedulerServer implements scheduler.Scheduler API.
type SchedulerServer struct {
Engine engine.Engine
Catalog catalog.Catalog
// Compile-time assertion that *SchedulerServer satisfies
// scheduler.SchedulerServer.
var _ scheduler.SchedulerServer = (*SchedulerServer)(nil)
// GetJobs fetches all jobs satisfying JobsRequest and visibility ACLs.
func (s *SchedulerServer) GetJobs(ctx context.Context, in *scheduler.JobsRequest) (*scheduler.JobsReply, error) {
if in.GetCursor() != "" {
// Paging in GetJobs isn't implemented until we have enough jobs to care.
// Until then, not empty cursor implies no more jobs to return.
return &scheduler.JobsReply{Jobs: []*scheduler.Job{}, NextCursor: ""}, nil
var ejobs []*engine.Job
var err error
if in.GetProject() == "" {
ejobs, err = s.Engine.GetVisibleJobs(ctx)
} else {
ejobs, err = s.Engine.GetVisibleProjectJobs(ctx, in.GetProject())
if err != nil {
return nil, status.Errorf(codes.Internal, "internal error: %s", err)
jobs := make([]*scheduler.Job, len(ejobs))
for i, ej := range ejobs {
traits, err := presentation.GetJobTraits(ctx, s.Catalog, ej)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to get traits: %s", err)
jobs[i] = &scheduler.Job{
JobRef: &scheduler.JobRef{
Project: ej.ProjectID,
Job: ej.JobName(),
Schedule: ej.Schedule,
State: &scheduler.JobState{
UiStatus: string(presentation.GetPublicStateKind(ej, traits)),
Paused: ej.Paused,
return &scheduler.JobsReply{Jobs: jobs, NextCursor: ""}, nil
func (s *SchedulerServer) GetInvocations(ctx context.Context, in *scheduler.InvocationsRequest) (*scheduler.InvocationsReply, error) {
job, err := s.getJob(ctx, in.GetJobRef())
if err != nil {
return nil, err
pageSize := 50
if in.PageSize > 0 && int(in.PageSize) < pageSize {
pageSize = int(in.PageSize)
einvs, cursor, err := s.Engine.ListInvocations(ctx, job, engine.ListInvocationsOpts{
PageSize: pageSize,
Cursor: in.Cursor,
if err != nil {
return nil, status.Errorf(codes.Internal, "internal error: %s", err)
invs := make([]*scheduler.Invocation, len(einvs))
for i, einv := range einvs {
invs[i] = invToProto(einv, in.JobRef)
return &scheduler.InvocationsReply{Invocations: invs, NextCursor: cursor}, nil
func (s *SchedulerServer) GetInvocation(ctx context.Context, in *scheduler.InvocationRef) (*scheduler.Invocation, error) {
job, err := s.getJob(ctx, in.GetJobRef())
if err != nil {
return nil, err
switch inv, err := s.Engine.GetInvocation(ctx, job, in.InvocationId); {
case err == engine.ErrNoSuchInvocation:
return nil, status.Errorf(codes.NotFound, "no such invocation")
case err != nil:
return nil, status.Errorf(codes.Internal, "internal error: %s", err)
return invToProto(inv, in.JobRef), nil
//// Actions.
func (s *SchedulerServer) PauseJob(ctx context.Context, in *scheduler.JobRef) (*emptypb.Empty, error) {
return s.runAction(ctx, in, func(job *engine.Job) error {
return s.Engine.PauseJob(ctx, job, "paused through RPC API")
func (s *SchedulerServer) ResumeJob(ctx context.Context, in *scheduler.JobRef) (*emptypb.Empty, error) {
return s.runAction(ctx, in, func(job *engine.Job) error {
return s.Engine.ResumeJob(ctx, job, "resumed through RPC API")
func (s *SchedulerServer) AbortJob(ctx context.Context, in *scheduler.JobRef) (*emptypb.Empty, error) {
return s.runAction(ctx, in, func(job *engine.Job) error {
return s.Engine.AbortJob(ctx, job)
func (s *SchedulerServer) AbortInvocation(ctx context.Context, in *scheduler.InvocationRef) (*emptypb.Empty, error) {
return s.runAction(ctx, in.GetJobRef(), func(job *engine.Job) error {
return s.Engine.AbortInvocation(ctx, job, in.GetInvocationId())
func (s *SchedulerServer) EmitTriggers(ctx context.Context, in *scheduler.EmitTriggersRequest) (*emptypb.Empty, error) {
caller := auth.CurrentIdentity(ctx)
// Optionally use client-provided time if it is within reasonable margins.
// This is needed to make EmitTriggers idempotent (when it emits a batch).
now := clock.Now(ctx)
if in.Timestamp != 0 {
if in.Timestamp < 0 || in.Timestamp > (1<<53) {
return nil, status.Errorf(codes.InvalidArgument,
"the provided timestamp doesn't look like a valid number of microseconds since epoch")
ts := time.Unix(0, in.Timestamp*1000)
if ts.After(now.Add(15 * time.Minute)) {
return nil, status.Errorf(codes.InvalidArgument,
"the provided timestamp (%s) is more than 15 min in the future based on the server clock value %s",
ts, now)
if ts.Before(now.Add(-15 * time.Minute)) {
return nil, status.Errorf(codes.InvalidArgument,
"the provided timestamp (%s) is more than 15 min in the past based on the server clock value %s",
ts, now)
now = ts
// Build a mapping "jobID => list of triggers", convert public representation
// of a trigger into internal one, validating them.
triggersPerJobID := map[string][]*internal.Trigger{}
for index, batch := range in.Batches {
tr, err := internalTrigger(batch.Trigger, now, caller, index)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "bad trigger #%d (%q) - %s", index, batch.Trigger.Id, err)
for _, jobRef := range batch.Jobs {
jobID := jobRef.GetProject() + "/" + jobRef.GetJob()
triggersPerJobID[jobID] = append(triggersPerJobID[jobID], tr)
// Check jobs presence and "" permission.
jobIDs := make([]string, 0, len(triggersPerJobID))
for id := range triggersPerJobID {
jobIDs = append(jobIDs, id)
visible, err := s.Engine.GetVisibleJobBatch(ctx, jobIDs)
switch {
case err != nil:
return nil, status.Errorf(codes.Internal, "internal error: %s", err)
case len(visible) != len(jobIDs):
missing := make([]string, 0, len(jobIDs)-len(visible))
for _, j := range jobIDs {
if visible[j] == nil {
missing = append(missing, j)
return nil, status.Errorf(codes.NotFound,
"no such job or no permission to see it: %s", strings.Join(missing, ", "))
// Submit the request to the Engine.
triggersPerJob := make(map[*engine.Job][]*internal.Trigger, len(visible))
for id, job := range visible {
triggersPerJob[job] = triggersPerJobID[id]
switch err = s.Engine.EmitTriggers(ctx, triggersPerJob); {
case err == engine.ErrNoPermission:
return nil, status.Errorf(codes.PermissionDenied, "no permission to execute the action")
case err != nil:
return nil, status.Errorf(codes.Internal, "internal error: %s", err)
return &emptypb.Empty{}, nil
//// Private helpers.
// getJob fetches a job, checking "" permission.
// Returns grpc errors that can be returned as is.
func (s *SchedulerServer) getJob(ctx context.Context, ref *scheduler.JobRef) (*engine.Job, error) {
jobID := ref.GetProject() + "/" + ref.GetJob()
switch job, err := s.Engine.GetVisibleJob(ctx, jobID); {
case err == nil:
return job, nil
case err == engine.ErrNoSuchJob:
return nil, status.Errorf(codes.NotFound, "no such job or no permission to see it")
return nil, status.Errorf(codes.Internal, "internal error when fetching job: %s", err)
func (s *SchedulerServer) runAction(ctx context.Context, ref *scheduler.JobRef, action func(*engine.Job) error) (*emptypb.Empty, error) {
job, err := s.getJob(ctx, ref)
if err != nil {
return nil, err
switch err := action(job); {
case err == nil:
return &emptypb.Empty{}, nil
case err == engine.ErrNoSuchJob:
return nil, status.Errorf(codes.NotFound, "no such job or no permission to see it")
case err == engine.ErrNoPermission:
return nil, status.Errorf(codes.PermissionDenied, "no permission to execute the action")
case err == engine.ErrNoSuchInvocation:
return nil, status.Errorf(codes.NotFound, "no such invocation")
return nil, status.Errorf(codes.Internal, "internal error: %s", err)
func internalTrigger(t *scheduler.Trigger, now time.Time, who identity.Identity, index int) (*internal.Trigger, error) {
if t.Id == "" {
return nil, fmt.Errorf("trigger id is required")
out := &internal.Trigger{
Id: t.Id,
Created: timestamppb.New(now),
OrderInBatch: int64(index),
Title: t.Title,
Url: t.Url,
EmittedByUser: string(who),
if t.Payload != nil {
// Ugh...
switch v := t.Payload.(type) {
case *scheduler.Trigger_Cron:
return nil, errors.New("emitting cron triggers through API is not allowed")
case *scheduler.Trigger_Webui:
return nil, errors.New("emitting web UI triggers through API is not allowed")
case *scheduler.Trigger_Noop:
out.Payload = &internal.Trigger_Noop{Noop: v.Noop}
case *scheduler.Trigger_Gitiles:
out.Payload = &internal.Trigger_Gitiles{Gitiles: v.Gitiles}
case *scheduler.Trigger_Buildbucket:
out.Payload = &internal.Trigger_Buildbucket{Buildbucket: v.Buildbucket}
return nil, errors.New("unrecognized trigger payload")
return out, nil
func invToProto(inv *engine.Invocation, jobRef *scheduler.JobRef) *scheduler.Invocation {
out := &scheduler.Invocation{
InvocationRef: &scheduler.InvocationRef{
JobRef: jobRef,
InvocationId: inv.ID,
StartedTs: inv.Started.UnixNano() / 1000,
TriggeredBy: string(inv.TriggeredBy),
Status: string(inv.Status),
Final: inv.Status.Final(),
ConfigRevision: inv.Revision,
ViewUrl: inv.ViewURL,
if inv.Status.Final() {
out.FinishedTs = inv.Finished.UnixNano() / 1000
return out