// Copyright 2016 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package jobsim

import (
"context"
"encoding/json"
"fmt"
"math/rand"
"net/http"
"time"
"github.com/golang/protobuf/jsonpb"
"google.golang.org/grpc/codes"
"go.chromium.org/luci/common/clock"
"go.chromium.org/luci/common/data/rand/cryptorand"
"go.chromium.org/luci/common/lhttp"
"go.chromium.org/luci/common/logging"
"go.chromium.org/luci/common/proto/google"
"go.chromium.org/luci/dm/api/distributor/jobsim"
dm "go.chromium.org/luci/dm/api/service/v1"
"go.chromium.org/luci/grpc/grpcutil"
"go.chromium.org/luci/grpc/prpc"
authlib "go.chromium.org/luci/server/auth"
)

// state is the opaque state data that DM will pass between re-executions of the
// same Attempt. For jobsim, which just does accumulating math calculations,
// this state is very simple. For a recipe, it could be more complicated, and on
// swarming, 'state' could be e.g. ISOLATED_OUTDIR's isolate hash.
//
// Generically, this COULD be used to cache e.g. all previous dependencies, but
// it's generally better to cache the result of consuming those dependencies,
// if possible (to avoid doing unnecessary work on subsequent executions).
type state struct {
Stage int
Sum int64
}
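
// toPersistentState serializes the state as JSON; this is the opaque string DM
// carries between Executions. For example, a state of {Stage: 2, Sum: 40}
// round-trips as `{"Stage":2,"Sum":40}`.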
func (st *state) toPersistentState() string {
ret, err := json.Marshal(st)
if err != nil {
panic(err)
}
return string(ret)
}
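
// fromPersistentState restores the state from the JSON produced by
// toPersistentState. An empty string (i.e. the first Execution of an Attempt,
// which has no previous state) leaves the zero-valued state in place.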
func (st *state) fromPersistentState(s string) (err error) {
if s == "" {
return nil
}
return json.Unmarshal([]byte(s), st)
}

// runner holds the context for a single execution of the JobSim task. You could
// think of this as the recipe engine. Everything that the runner does would
// need to be done by any DM client during a single Execution in some form or
// another.
type runner struct {
c context.Context
// auth contains the current Execution_Auth for this Execution. It begins with
// Token == activation token (what DM submits to swarming), which is then
// transmuted by activateExecution into an execution token (basically, the
// Execution becomes active on its first contact with DM, which prevents
// accidental retries/duplication of Execution work. Retries of failed
// Executions need to be handled by DM for this reason).
auth *dm.Execution_Auth
// dmc is just a DM pRPC client
dmc dm.DepsClient
// state is the mutable state for this Execution. It begins with the value of
// the previous Execution. When this execution finishes, the new value will be
// passed to the next Execution.
state *state
}

// getRandom returns a random byte sequence of `want` length, or an error if
// that number of random bytes could not be generated.
//
// This uses cryptorand to generate cryptographically secure random numbers.
func getRandom(c context.Context, want int) ([]byte, error) {
ret := make([]byte, want)
_, err := cryptorand.Read(c, ret)
if err != nil {
logging.WithError(err).Errorf(c, "could not generate random data")
return nil, err
}
return ret, nil
}

// activateExecution is essentially a direct map to ActivateExecution, except
// that it transforms the runner's `auth` from a needs-activation authentication
// to an activated one.
//
// This returns an OK bool, plus an error. If the bool is true, it means that
// we're now authenticated. This will never return true with a non-nil error.
//
// The return value (false, nil) means the runner should quit peacefully
// (no error), and can occur if this execution payload is retried (e.g. by
// taskqueue).
func (r *runner) activateExecution() (ok bool, err error) {
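// Generate a fresh 128-byte token to become the execution token; once
// activation succeeds it replaces the activation token in r.auth, and all
// later DM calls (EnsureGraphData, FinishAttempt) present it for auth.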
newToken, err := getRandom(r.c, 128)
if err != nil {
return false, err
}
_, err = r.dmc.ActivateExecution(r.c, &dm.ActivateExecutionReq{
Auth: r.auth,
ExecutionToken: newToken,
})
if err != nil {
logging.WithError(err).Errorf(r.c, "could not activate execution")
if grpcutil.Code(err) == codes.Unauthenticated {
// means we got retried, so invalidate ExecutionKey and quit peacefully.
return false, nil
}
return false, err
}
r.auth.Token = newToken
logging.Fields{"token": r.auth.Token}.Infof(r.c, "activated execution")
return true, nil
}

// doReturnStage implements the FinishAttempt portion of the DM Attempt
// lifecycle. This sets the final result for the Attempt, and will prevent any
// further executions of this Attempt. This is analogous to the recipe returning
// a final result.
func (r *runner) doReturnStage(stg *jobsim.ReturnStage) error {
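// The Attempt's final value is the stage's configured Retval plus whatever
// Sum the deps stages accumulated into the persistent state.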
retval := int64(0)
if stg != nil {
retval += stg.Retval
}
if r.state != nil {
retval += r.state.Sum
}
_, err := r.dmc.FinishAttempt(r.c, &dm.FinishAttemptReq{
Auth: r.auth,
Data: executionResult(true, retval, google.TimeFromProto(stg.GetExpiration())),
})
if err != nil {
logging.WithError(err).Warningf(r.c, "got error on FinishAttempt")
}
return err
}

// doDeps will call EnsureGraphData to make sure the provided
// quests+attempts+deps exist. DM will see if they're done or not, and let us
// know whether we should halt and be re-executed later (once they complete).
func (r *runner) doDeps(seed int64, stg *jobsim.DepsStage, cfgName string) (stop bool, err error) {
logging.Infof(r.c, "doing deps")
stg.ExpandShards()
req := &dm.EnsureGraphDataReq{}
req.ForExecution = r.auth
req.Include = &dm.EnsureGraphDataReq_Include{
Attempt: &dm.EnsureGraphDataReq_Include_Options{Result: true}}
var (
rnd = rand.New(rand.NewSource(seed))
maxAttemptNum = map[string]uint32{}
currentAttempt = map[string]uint32{}
)
for i, dep := range stg.Deps {
dep.Seed(rnd, seed, int64(i))
desc := &dm.Quest_Desc{}
desc.DistributorConfigName = cfgName
desc.Parameters, err = (&jsonpb.Marshaler{}).MarshalToString(dep.Phrase)
if err != nil {
panic(err)
}
if err := desc.Normalize(); err != nil {
panic(err)
}
req.Quest = append(req.Quest, desc)
qid := desc.QuestID()
currentAttempt[qid] = 1
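// A dependency either names an explicit set of attempt numbers to depend on,
// or asks for attempt 1 plus up to `Retries` automatic retries on failure.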
switch x := dep.AttemptStrategy.(type) {
case *jobsim.Dependency_Attempts:
req.QuestAttempt = append(req.QuestAttempt, &dm.AttemptList_Nums{Nums: x.Attempts.ToSlice()})
case *jobsim.Dependency_Retries:
maxAttemptNum[qid] = x.Retries + 1
req.QuestAttempt = append(req.QuestAttempt, &dm.AttemptList_Nums{Nums: []uint32{1}})
}
}
rsp, err := r.dmc.EnsureGraphData(r.c, req)
if err != nil {
logging.Fields{"err": err}.Infof(r.c, "error after first rpc")
return
}
if stop = rsp.ShouldHalt; stop {
logging.Infof(r.c, "halt after first rpc")
return
}
sum := int64(0)
for {
req.Quest = nil
req.QuestAttempt = nil
req.RawAttempts = dm.NewAttemptList(nil)
// TODO(iannucci): we could use the state api to remember that we did
// retries on the previous execution. The recipe engine should probably do
// this for recipes, but for this simple simulator, we'll make multiple
// RPCs.
for qid, q := range rsp.Result.Quests {
logging.Fields{"qid": qid}.Infof(r.c, "grabbing result")
var tr *jobsim.Result
if currentAttempt[qid] == 0 {
// rsp can contain data for quests we didn't depend on in this stage; skip them.
continue
}
logging.Fields{"atmpt": q.Attempts[currentAttempt[qid]].Data}.Infof(r.c, "grabbing payload")
payload := q.Attempts[currentAttempt[qid]].Data.GetFinished().Data
tr, err = executionResultFromJSON(payload)
if err != nil {
return
}
logging.Fields{"qid": qid, "tr": tr}.Infof(r.c, "decoded TaskResult")
if tr.Success {
sum += tr.Value
} else {
current := currentAttempt[qid]
// maxAttemptNum is only populated for the Retries strategy; 0 means this
// dep may never be retried.
if maxAttempt := maxAttemptNum[qid]; maxAttempt > 0 {
if current >= maxAttempt {
logging.Fields{
"qid": qid, "current": current, "max": maxAttempt,
}.Infof(r.c, "too many retries")
return r.doFailure(seed, 1.1) // guarantee failure
}
logging.Fields{
"qid": qid, "current": current, "max": maxAttempt,
}.Infof(r.c, "retrying")
current++
currentAttempt[qid] = current
req.RawAttempts.To[qid] = &dm.AttemptList_Nums{Nums: []uint32{current}}
} else {
logging.Fields{
"qid": qid,
}.Infof(r.c, "no retries allowed")
return r.doFailure(seed, 1.1) // guarantee failure
}
}
}
// If we scheduled any retries, ask DM for their results and go around again.
if len(req.RawAttempts.To) > 0 {
rsp, err = r.dmc.EnsureGraphData(r.c, req)
if err != nil {
logging.Fields{"err": err}.Infof(r.c, "err after Nth rpc")
return
}
if stop = rsp.ShouldHalt; stop {
logging.Infof(r.c, "halt after Nth rpc")
return
}
} else {
r.state.Sum += sum
logging.Fields{"sum": sum}.Infof(r.c, "added and advancing")
return
}
}
}
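
// doStall simulates a task doing some fixed amount of real work by sleeping
// for the stage's configured delay. It uses the clock from the context, so a
// test clock can skip the wait.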
func (r *runner) doStall(stg *jobsim.StallStage) {
dur := google.DurationFromProto(stg.Delay)
logging.Fields{"duration": dur}.Infof(r.c, "stalling")
clock.Sleep(r.c, dur)
}

// doFailure implements a jobsim task's flakiness (i.e. it may
// non-deterministically choose to fail at this stage). If it fails, this will
// cause the overall application-level Attempt Result to indicate failure to any
// other Attempts which depend on this one.
//
// This is analogous to a recipe running a flaky test and then setting its
// result to be failure. As far as DM is concerned, the recipe ran to completion
// (e.g. FINISHED). Other recipes may decide to take some action at this stage
// (e.g. issue new Attempts of this same Quest).
func (r *runner) doFailure(seed int64, chance float32) (stop bool, err error) {
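// A chance of 1.0 or more means guaranteed failure (doDeps passes 1.1 for
// exactly this), so we skip rolling the RNG entirely.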
failed := chance >= 1.0
if !failed {
logging.Fields{"chance": chance}.Infof(r.c, "failing with probability")
rndVal := rand.New(rand.NewSource(seed)).Float32()
failed = rndVal <= chance
logging.Fields{"rndVal": rndVal}.Infof(r.c, "failed")
} else {
logging.Infof(r.c, "failed (guaranteed)")
}
if !failed {
logging.Infof(r.c, "passed")
return
}
stop = true
_, err = r.dmc.FinishAttempt(r.c, &dm.FinishAttemptReq{
Auth: r.auth,
Data: executionResult(false, 0, time.Time{}),
})
if err != nil {
logging.WithError(err).Warningf(r.c, "got error on FinishAttempt")
}
return
}

// runJob is analogous to a single Execution of a recipe. It will:
//   * Activate itself with DM.
//   * Inspect its previous State to determine where it left off on the previous
//     execution.
//   * Execute stages (incrementing the Stage counter in the state, and/or
//     accumulating into Sum) until it hits a stop condition:
//       * depending on incomplete Attempts
//       * arriving at a final result
//
// If it hits some underlying error it will return that error, and expect to be
// retried by DM.
func runJob(c context.Context, host string, state *state, job *jobsim.Phrase, auth *dm.Execution_Auth, cfgName string) error {
tr, err := authlib.GetRPCTransport(c, authlib.NoAuth)
if err != nil {
return err
}
pcli := &prpc.Client{
C: &http.Client{Transport: tr},
Host: host,
Options: prpc.DefaultOptions(),
}
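// Only skip TLS when DM is being hit on localhost (e.g. a local dev server);
// all other hosts are talked to over https.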
pcli.Options.Insecure = lhttp.IsLocalHost(host)
dmc := dm.NewDepsPRPCClient(pcli)
r := runner{c, auth, dmc, state}
ok, err := r.activateExecution()
if !ok || err != nil {
return err
}
stop := false
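// Resume from r.state.Stage (0 on the first Execution); each completed stage
// bumps the counter, so a later Execution seeded with this state picks up
// where this one stopped.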
for ; r.state.Stage < len(job.Stages); r.state.Stage++ {
switch stg := job.Stages[r.state.Stage].StageType.(type) {
case *jobsim.Stage_Deps:
stop, err = r.doDeps(job.Seed, stg.Deps, cfgName)
case *jobsim.Stage_Stall:
r.doStall(stg.Stall)
case *jobsim.Stage_Failure:
stop, err = r.doFailure(job.Seed, stg.Failure.Chance)
default:
err = fmt.Errorf("don't know how to handle StageType: %T", stg)
}
if stop || err != nil {
return err
}
}
return r.doReturnStage(job.ReturnStage)
}