// Copyright 2023 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"strings"
	"time"

	"cloud.google.com/go/pubsub"
	"github.com/google/uuid"
	"go.chromium.org/chromiumos/config/go/test/api"
	"go.chromium.org/chromiumos/test/publish/cmd/publishserver/storage"
	"go.chromium.org/chromiumos/test/util/common"
	"google.golang.org/api/option"
	"google.golang.org/grpc"
)
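
// Intervals and timeouts used by the perf collector, the health monitor, and
// perf initialization polling.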
const (
perfCollectorInterval = time.Second * 10
healthCheckInterval = time.Second * 30
	waitTimeout = time.Second * 30
perfInitPollInterval = time.Millisecond * 100
perfInitWaitTimeout = time.Second * 1
)
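
// perfServer implements the generic Start/Run/Stop service that TestRunner
// uses to drive perf collection on a DUT.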
type perfServer struct {
exec execUtil
logger *log.Logger
dut *api.DutServiceClient
storageClient storageWriter
healthMonitor *monitor
perfCollector *perfCollector
perfCmd *perfCmd
stopMonitor chan bool
workDir string
}
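
// dutFileFetcher copies files from the DUT via common.GetFile.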
type dutFileFetcher struct{}
func (dutFileFetcher) Copy(ctx context.Context, file string, dut api.DutServiceClient) (string, error) {
return common.GetFile(ctx, file, dut)
}
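
// dutExecUtil runs commands on the DUT through the DutService API.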
type dutExecUtil struct{}
func (dutExecUtil) RunCmd(ctx context.Context, cmd string, args []string, dut api.DutServiceClient) (string, error) {
return common.RunCmd(ctx, cmd, args, dut)
}
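
// RunCmdAsync starts a command on the DUT without waiting for it to finish.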
func (dutExecUtil) RunCmdAsync(ctx context.Context, cmd string, args []string, dut api.DutServiceClient) error {
log.Printf("Run cmd: %s, %s\n", cmd, args)
req := api.ExecCommandRequest{
Command: cmd,
Args: args,
}
	// TODO(b:326288495) Use the appropriate async API when it's available.
	// Commands like perf-record are non-terminating in nature. They will
	// block the session and keep running until a kill signal is given.
	// For these commands, there is no point in calling Recv() on ExecCommand's response.
	// Therefore, just ignore the return value of ExecCommand.
if _, err := dut.ExecCommand(ctx, &req); err != nil {
log.Printf("Run cmd FAILED: %s\n", err)
return err
}
return nil
}
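
// NewServer returns a perfServer with only the logger and work directory set;
// the remaining fields are populated by setupServer when the start request arrives.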
func NewServer(ctx context.Context, logger *log.Logger, workDir string) *perfServer {
return &perfServer{
logger: logger,
workDir: workDir,
}
}
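
// waitForTermination blocks until a value arrives on ch, logging a dot every
// second, and returns an error if waitTimeout elapses first.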
func waitForTermination(ch chan bool, processName string) error {
	log.Printf("\nWaiting for the %s to stop.", processName)
	// Create the overall timeout once; a fresh time.After inside the select
	// would be reset on every loop iteration and never fire.
	timeout := time.After(waitTimeout)
	for {
		select {
		case val := <-ch:
			if val {
				log.Printf("\n%s terminated.", processName)
				return nil
			}
			return fmt.Errorf("unexpected false value while waiting for %s to terminate", processName)
		case <-time.After(1 * time.Second):
			log.Print(".")
		case <-timeout:
			return fmt.Errorf("timed out while waiting for %s to stop", processName)
		}
	}
}
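
// cleanPerfDir removes the perf output directory on the DUT; a missing
// directory is not treated as an error.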
func (ps *perfServer) cleanPerfDir(ctx context.Context) error {
_, err := ps.exec.RunCmd(ctx, rm, []string{"-rf", outputPerfDir}, *ps.dut)
if err != nil && strings.Contains(err.Error(), noDirMsg) {
return nil
}
return err
}
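
// setupDUTConnection dials the DutService endpoint at dutIP and returns a
// client for it.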
func (ps *perfServer) setupDUTConnection(dutIP string) (api.DutServiceClient, error) {
	conn, err := grpc.Dial(dutIP, grpc.WithInsecure())
	if err != nil {
		return nil, err
	}
	// Do not close conn here; the returned client needs the underlying
	// connection for the lifetime of the server.
	return api.NewDutServiceClient(conn), nil
}
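
// setupPublisher creates a Cloud Pub/Sub publisher for the given project and
// topic using the supplied credentials file.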
func (ps *perfServer) setupPublisher(ctx context.Context, topic, project, credsFile string) (*common.Publisher, error) {
pubsubClient, err := pubsub.NewClient(ctx, project, option.WithCredentialsFile(credsFile))
if err != nil {
return nil, err
}
return &common.Publisher{
Topic: pubsubClient.Topic(topic),
}, nil
}

// setupPerfCollector sets up all the necessary plumbing before we can actually start perf.
// This includes setting up the clients so that we can store and publish data.
// Additionally, this makes sure we have an active connection to the DUT.
func (ps *perfServer) setupPerfCollector(ctx context.Context, msg *api.GenericMessage) (*perfCollector, error) {
args := msg.GetValues()
credentialsFile := string(args["authTokenFilePath"].GetValue())
sc, err := storage.NewStorageClientWithCredsFile(ctx, credentialsFile)
if err != nil {
return nil, err
}
project := string(args["projectId"].GetValue())
topic := string(args["topicId"].GetValue())
pub, err := ps.setupPublisher(ctx, topic, project, credentialsFile)
if err != nil {
return nil, fmt.Errorf("could not setup publisher for project: %s and topic: %s due to err: %w", project, topic, err)
}
dutIP := string(args["dutIP"].GetValue())
dut, err := ps.setupDUTConnection(dutIP)
if err != nil {
return nil, fmt.Errorf("could not set up connection to DUT at IP %s: %w", dutIP, err)
}
exec := dutExecUtil{}
// TODO(b/327285753): Throw error if DUT's version does not match active version.
pc := perfCollector{
fileFetcher: dutFileFetcher{},
exec: exec,
dut: dut,
perfCollectorInterval: perfCollectorInterval,
tarUtil: common.Zip{},
storage: sc,
bucket: string(args["bucket"].GetValue()),
publisher: pub,
stopCollector: make(chan bool),
terminated: make(chan bool),
board: string(args["board"].GetValue()),
version: string(args["version"].GetValue()),
snapshotVersion: string(args["snapshotVersion"].GetValue()),
}
return &pc, nil
}
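
// setupServer builds the perf collector, perf command wrapper, and health
// monitor from the start request and wires them into the server.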
func (ps *perfServer) setupServer(ctx context.Context, msg *api.GenericMessage) error {
pc, err := ps.setupPerfCollector(ctx, msg)
if err != nil {
return fmt.Errorf("could not set up perf collector: %w", err)
}
ps.perfCollector = pc
ps.storageClient = pc.storage
ps.stopMonitor = make(chan bool)
ps.exec = pc.exec
ps.dut = &pc.dut
ps.perfCmd = &perfCmd{
dut: *ps.dut,
exec: ps.exec,
perfInitPollInterval: perfInitPollInterval,
perfInitWaitTimeout: perfInitWaitTimeout,
}
ps.healthMonitor = &monitor{
exec: ps.exec,
dut: *ps.dut,
healthCheckInterval: healthCheckInterval,
perf: ps.perfCmd,
stopMonitoring: make(chan bool),
terminated: make(chan bool),
}
return nil
}

// Start is called by TestRunner before cros-test
func (ps *perfServer) Start(ctx context.Context, req *api.GenericStartRequest) (*api.GenericStartResponse, error) {
log.Println("Received start request.")
	if fileInfo, err := os.Stat(ps.workDir); err != nil || !fileInfo.IsDir() {
		return nil, fmt.Errorf("%s does not exist or is not a directory", ps.workDir)
	}
// Setup perf collector so that we can start perf on DUT.
if err := ps.setupServer(ctx, req.GetMessage()); err != nil {
return nil, fmt.Errorf("unable to setup perf server: %w", err)
}
	// In rare cases, a previous run might have failed to terminate perf and
	// clean up the perf dir. Let's do that before starting a new perf cmd.
if err := ps.perfCmd.killAll(ctx); err != nil {
return nil, err
}
if err := ps.cleanPerfDir(ctx); err != nil {
return nil, err
}
pid, err := ps.perfCmd.startPerf(ctx)
if err != nil {
return nil, err
}
go ps.healthMonitor.start(pid)
go ps.perfCollector.start(outputPerfDir, ps.workDir)
return &api.GenericStartResponse{}, nil
}

// Run is called by TestRunner after calling Start, but before cros-test
func (ps *perfServer) Run(ctx context.Context, _ *api.GenericRunRequest) (*api.GenericRunResponse, error) {
return &api.GenericRunResponse{}, nil
}

// Stop is called by TestRunner after cros-test
func (ps *perfServer) Stop(ctx context.Context, _ *api.GenericStopRequest) (*api.GenericStopResponse, error) {
log.Println("Received stop request.")
// Stop monitoring.
ps.healthMonitor.stopMonitoring <- true
if err := waitForTermination(ps.healthMonitor.terminated, "Health Monitor"); err != nil {
return nil, err
}
// Stop the Perf collector.
ps.perfCollector.stopCollector <- true
if err := waitForTermination(ps.perfCollector.terminated, "Perf Collector"); err != nil {
return nil, err
}
// Kill the perf process.
if err := ps.perfCmd.killAll(ctx); err != nil {
return nil, err
}
	// The previous step (killAll) may have resulted in new perf.data files
	// being dumped on the DUT. Let's process those as well.
if err := ps.perfCollector.gatherPerf(ctx, outputPerfDir, uuid.New().String(), ps.workDir); err != nil {
return nil, err
}
if err := ps.cleanPerfDir(ctx); err != nil {
return nil, err
}
return &api.GenericStopResponse{}, nil
}