blob: e67b75db51531146f49e8af8581e6141170aeff2 [file] [log] [blame]
// Copyright 2016 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Jobsim client is a self-contained binary that implements various toy job
// algorithms for use in testing DM with live distributors (like swarming).
package main
import (
"bytes"
"context"
"encoding/json"
"fmt"
"os"
"github.com/golang/protobuf/jsonpb"
"github.com/golang/protobuf/proto"
"github.com/maruel/subcommands"
"go.chromium.org/luci/common/cli"
"go.chromium.org/luci/common/data/rand/cryptorand"
"go.chromium.org/luci/common/logging"
"go.chromium.org/luci/common/logging/gologger"
"go.chromium.org/luci/common/retry"
dm "go.chromium.org/luci/dm/api/service/v1"
"go.chromium.org/luci/grpc/prpc"
"go.chromium.org/luci/lucictx"
)
// cmdRun carries the state that registerOptions and start set up and that
// depOn and finish consume while talking to the DM service.
type cmdRun struct {
subcommands.CommandRunBase
// Context is embedded so that a *cmdRun can itself be passed wherever a
// context.Context is expected; start assigns it from cli.GetContext.
context.Context
// exAuth is decoded by start from the swarming secret bytes. Once the
// execution has been activated, start stores the activation token in it.
exAuth *dm.Execution_Auth
// questDescPath is the value of the -quest-desc-path flag.
questDescPath string
// questDesc is the message start loads from questDescPath; depOn clones it
// as the template for every quest it depends on.
questDesc *dm.Quest_Desc
// dmHost is the value of the -dm-host flag.
dmHost string
// client is the pRPC client for the DM deps service, created by start.
client dm.DepsClient
// cmd is the *subcommands.Command that was passed to start.
cmd *subcommands.Command
}
// registerOptions binds the command-line flags of this run to their backing
// fields on r: -quest-desc-path fills questDescPath and -dm-host fills dmHost.
// Both default to the empty string.
func (r *cmdRun) registerOptions() {
	const (
		questDescUsage = "The path to a JSONPB encoded dm.Quest.Data.Desc message of how this client was invoked"
		dmHostUsage    = "The hostname of the DM service"
	)

	r.Flags.StringVar(&r.questDescPath, "quest-desc-path", "", questDescUsage)
	r.Flags.StringVar(&r.dmHost, "dm-host", "", dmHostUsage)
}
// loadJSONPB opens the file at path and decodes its JSONPB contents into msg.
//
// flavor is the name of the flag that supplied path and is used only to label
// error messages. Every failure -- an empty path, a file that cannot be
// opened, or contents that do not decode into msg -- is handled the same way:
// the error is logged to c and the process exits with status -1.
func loadJSONPB(c context.Context, flavor, path string, msg proto.Message) {
	if path == "" {
		logging.Errorf(c, "processing %q: required parameter", flavor)
		os.Exit(-1)
	}

	fil, err := os.Open(path)
	if err != nil {
		logging.Errorf(c, "processing %q: %s", flavor, err)
		os.Exit(-1)
	}

	err = jsonpb.Unmarshal(fil, msg)
	// Close explicitly instead of deferring: os.Exit below does not run
	// deferred calls. The file was only read, so a Close error is not
	// actionable and is deliberately ignored.
	_ = fil.Close()
	if err != nil {
		logging.Errorf(c, "processing %q: decoding %q: %s", flavor, path, err)
		os.Exit(-1)
	}
}
// start prepares r for use by depOn and finish.
//
// It records the command and context, decodes the execution auth from the
// secret bytes in the swarming section of LUCI_CONTEXT, loads the quest
// description named by -quest-desc-path, builds the DM pRPC client for
// -dm-host, and activates the execution with a freshly generated 32-byte
// token. After a successful activation the token is stored in r.exAuth.
//
// It panics if the secret bytes are missing or undecodable, if the random
// token cannot be generated, or if ActivateExecution still fails after
// retrying. A missing or unreadable quest description is handled by
// loadJSONPB.
func (r *cmdRun) start(a subcommands.Application, cr subcommands.CommandRun, env subcommands.Env, c *subcommands.Command) {
	r.cmd = c
	r.Context = cli.GetContext(a, cr, env)

	// Execution auth arrives as JSONPB in the swarming secret bytes.
	r.exAuth = &dm.Execution_Auth{}
	swarm := lucictx.GetSwarming(r.Context)
	if swarm == nil || swarm.SecretBytes == nil {
		panic("LUCI_CONTEXT['swarming']['secret_bytes'] is missing")
	}
	secret := bytes.NewReader(swarm.SecretBytes)
	if err := jsonpb.Unmarshal(secret, r.exAuth); err != nil {
		panic(fmt.Errorf("while decoding execution auth: %s", err))
	}

	// Quest description that depOn later clones for each dependency.
	r.questDesc = &dm.Quest_Desc{}
	loadJSONPB(r, "quest-desc-path", r.questDescPath, r.questDesc)

	// RPC client for the DM service.
	rpc := &prpc.Client{
		Host:    r.dmHost,
		Options: &prpc.Options{Retry: retry.Default},
	}
	r.client = dm.NewDepsPRPCClient(rpc)

	// Activate the execution with a random token of our choosing.
	token := make([]byte, 32)
	if _, err := cryptorand.Read(r, token); err != nil {
		panic(err)
	}
	activation := &dm.ActivateExecutionReq{
		Auth:           r.exAuth,
		ExecutionToken: token,
	}
	activate := func() error {
		_, rpcErr := r.client.ActivateExecution(r, activation)
		return rpcErr
	}
	if err := retry.Retry(r, retry.Default, activate, retry.LogCallback(r, "ActivateExecution")); err != nil {
		panic(err)
	}

	// The execution is now activated; later requests authenticate with the token.
	r.exAuth.Token = token
}
// depOn declares a dependency on attempt 1 of one quest per entry of
// jobProperties and returns the result data of those attempts.
//
// Each quest is a clone of r.questDesc whose Parameters are the JSON encoding
// of the corresponding jobProperties entry. The EnsureGraphData call is
// retried with retry.Default.
//
// If the response has ShouldHalt set, depOn returns (nil, true). Otherwise it
// returns a slice of len(jobProperties) strings, filled in the order of the
// response's QuestIds with each attempt's finished result object, and false.
//
// It panics if a property cannot be JSON encoded or if the RPC ultimately
// fails.
func (r *cmdRun) depOn(jobProperties ...interface{}) (data []string, stop bool) {
	count := len(jobProperties)
	quests := make([]*dm.Quest_Desc, count)
	attempts := make([]*dm.AttemptList_Nums, count)

	for idx := range jobProperties {
		params, err := json.Marshal(jobProperties[idx])
		if err != nil {
			panic(err)
		}
		desc := proto.Clone(r.questDesc).(*dm.Quest_Desc)
		desc.Parameters = string(params)
		quests[idx] = desc
		attempts[idx] = &dm.AttemptList_Nums{Nums: []uint32{1}}
	}

	req := &dm.EnsureGraphDataReq{
		ForExecution: r.exAuth,
		Quest:        quests,
		QuestAttempt: attempts,
		Include: &dm.EnsureGraphDataReq_Include{
			Attempt: &dm.EnsureGraphDataReq_Include_Options{Result: true},
		},
	}

	var rsp *dm.EnsureGraphDataRsp
	ensure := func() error {
		var rpcErr error
		rsp, rpcErr = r.client.EnsureGraphData(r, req)
		return rpcErr
	}
	if err := retry.Retry(r, retry.Default, ensure, retry.LogCallback(r, "EnsureGraphData")); err != nil {
		panic(err)
	}

	if rsp.ShouldHalt {
		logging.Infof(r, "got ShouldHalt")
		return nil, true
	}

	results := make([]string, count)
	for idx, qid := range rsp.QuestIds {
		// Attempt 1 is the only attempt requested above.
		attempt := rsp.Result.Quests[qid.Id].Attempts[1]
		results[idx] = attempt.Data.GetFinished().Data.Object
	}
	return results, false
}
// finish reports data, JSON encoded, as the result of the current attempt via
// FinishAttempt, retrying the call with retry.Default.
//
// It always returns 0; it panics if data cannot be encoded or if the RPC
// ultimately fails.
func (r *cmdRun) finish(data interface{}) int {
	payload, err := json.Marshal(data)
	if err != nil {
		panic(err)
	}

	// The request is rebuilt on every try.
	report := func() error {
		req := &dm.FinishAttemptReq{
			Auth: r.exAuth,
			Data: dm.NewJsonResult(string(payload)),
		}
		_, rpcErr := r.client.FinishAttempt(r, req)
		return rpcErr
	}
	if err := retry.Retry(r, retry.Default, report, retry.LogCallback(r, "FinishAttempt")); err != nil {
		panic(err)
	}
	return 0
}
// application describes the jobsim_client command-line program: its name and
// title, a context hook that applies the standard gologger configuration, and
// the subcommands it offers.
var application = &cli.Application{
	Name:  "jobsim_client",
	Title: "Executable Job simulator client",

	Context: func(base context.Context) context.Context {
		withLogger := gologger.StdConfig.Use(base)
		return withLogger
	},

	Commands: []*subcommands.Command{
		cmdEditDistance,
		subcommands.CmdHelp,
	},
}
// main runs the application with the process arguments (program name
// excluded) and exits with the status that the run returns.
func main() {
	args := os.Args[1:]
	status := subcommands.Run(application, args)
	os.Exit(status)
}