blob: 84cff772b57df4449143bca6b00a4f57e29e3edd [file] [log] [blame]
// Copyright 2021 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package handler
import (
"context"
"time"
"go.chromium.org/luci/common/clock"
"go.chromium.org/luci/common/logging"
"go.chromium.org/luci/cv/internal/run"
"go.chromium.org/luci/cv/internal/run/eventpb"
"go.chromium.org/luci/cv/internal/run/impl/state"
)
// longOpGracePeriod is the extra time allowed past a long operation's deadline
// for its completion event to be received before the operation is
// force-expired.
const longOpGracePeriod = time.Minute
// OnLongOpCompleted implements Handler interface.
//
// It handles the completion event of a long operation as follows:
//   - if the Run has already ended, the event is logged and ignored;
//   - if the Run has no entry for the operation, a warning is logged and the
//     state is returned as is;
//   - otherwise, the event is dispatched to the handler matching the kind of
//     work recorded for the operation.
//
// An operation whose work kind is not recognized is logged as an error and
// removed from a shallow copy of the Run state.
func (impl *Impl) OnLongOpCompleted(ctx context.Context, rs *state.RunState, result *eventpb.LongOpCompleted) (*Result, error) {
	opID := result.GetOperationId()

	if run.IsEnded(rs.Status) {
		logging.Debugf(ctx, "Ignoring %s long operation %q because Run is %s", result.GetStatus(), opID, rs.Status)
		return &Result{State: rs}, nil
	}

	longOp := rs.OngoingLongOps.GetOps()[opID]
	if longOp == nil {
		logging.Warningf(ctx, "Long operation %q has no entry in Run (maybe already expired?)", opID)
		return &Result{State: rs}, nil
	}

	// Every recognized kind of work returns straight from the switch.
	switch longOp.GetWork().(type) {
	case *run.OngoingLongOps_Op_PostStartMessage:
		return impl.onCompletedPostStartMessage(ctx, rs, longOp, result)
	case *run.OngoingLongOps_Op_CancelTriggers:
		return impl.onCompletedCancelTriggers(ctx, rs, longOp, result)
	case *run.OngoingLongOps_Op_ExecuteTryjobs:
		return impl.onCompletedExecuteTryjobs(ctx, rs, longOp, result)
	}

	// Reaching here means the work kind is not one of the above.
	logging.Errorf(ctx, "Unknown long operation %q work type %T finished with:\n%s", opID, longOp.GetWork(), result)
	// Remove the long op from the Run anyway, and move on.
	updated := rs.ShallowCopy()
	updated.RemoveCompletedLongOp(opID)
	return &Result{State: updated}, nil
}
// processExpiredLongOps checks for and handles a long operation whose
// deadline has passed.
//
// Normally, a long operation is expected to send LongOpCompleted event before
// the deadline, which is then processed by OnLongOpCompleted() and ultimately
// removed from the Run.OngoingLongOps.
//
// processExpiredLongOps is a fail-safe for abnormal cases to ensure that a Run
// doesn't remain stuck. An operation is treated as expired only once its
// deadline is more than longOpGracePeriod in the past.
//
// At most one expired operation is handled per call, by passing a synthesized
// LongOpCompleted event with EXPIRED status to OnLongOpCompleted. If several
// operations are expired, the one with the earliest deadline is chosen, with
// ties broken by the smallest operation ID, so that the outcome doesn't depend
// on the unspecified map iteration order. If no operation is expired, the
// given state is returned as is.
func (impl *Impl) processExpiredLongOps(ctx context.Context, rs *state.RunState) (*Result, error) {
	cutoff := clock.Now(ctx).Add(-longOpGracePeriod)

	var (
		found     bool
		expiredID string
		expiredAt time.Time
	)
	for opID, op := range rs.OngoingLongOps.GetOps() {
		deadline := op.GetDeadline().AsTime()
		if !deadline.Before(cutoff) {
			continue
		}
		// Keep the earliest-expired op; fall back to ID order on equal deadlines.
		if !found || deadline.Before(expiredAt) || (deadline.Equal(expiredAt) && opID < expiredID) {
			found, expiredID, expiredAt = true, opID, deadline
		}
	}
	if !found {
		return &Result{State: rs}, nil
	}

	logging.Warningf(ctx, "Long operation %q has expired at %s", expiredID, expiredAt)
	// In practice, there should be at most 1 ongoing long op.
	// TODO(tandrii): once `Result` objects can be combined, process all
	// expired long ops at once.
	return impl.OnLongOpCompleted(ctx, rs, &eventpb.LongOpCompleted{
		OperationId: expiredID,
		Status:      eventpb.LongOpCompleted_EXPIRED,
	})
}