blob: 5e859f5a75be3005fef0d97d78ea12192993483c [file] [log] [blame]
// Copyright 2023 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package workflow
import (
"context"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"slices"
"strings"
"go.chromium.org/luci/cipkg/base/actions"
"go.chromium.org/luci/cipkg/base/generators"
"go.chromium.org/luci/cipkg/core"
"go.chromium.org/luci/common/data/stringset"
"go.chromium.org/luci/common/exec"
"go.chromium.org/luci/common/logging"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/reflect/protoreflect"
)
type (
	// PreExpandHook is invoked by PackageExecutor.Expand on a package before any
	// of its dependencies are expanded. Returning an error aborts the expansion.
	PreExpandHook func(ctx context.Context, pkg actions.Package) error

	// PreExecuteHook is invoked by PackageExecutor.Execute before the package is
	// executed. The returned context is used for the execution and passed on to
	// the PostExecuteHook; returning an error skips the execution.
	PreExecuteHook func(ctx context.Context, pkg actions.Package) (context.Context, error)

	// PostExecuteHook is invoked by PackageExecutor.Execute after the execution
	// (or after a failed PreExecuteHook) with the error produced so far. Its own
	// error is joined into the result of Execute.
	PostExecuteHook func(ctx context.Context, pkg actions.Package, execErr error) error
)
// ExecutionConfig carries the per-execution settings handed to an Executor.
type ExecutionConfig struct {
	// OutputDir is where the derivation must install its outputs. The default
	// Execute exposes it to the process as the "out" environment variable.
	OutputDir string
	// WorkingDir is the directory the process is started in.
	WorkingDir string

	// Stdin, Stdout and Stderr are the standard streams of the process. Nil
	// values are passed through to the Executor unchanged.
	Stdin          io.Reader
	Stdout, Stderr io.Writer
}
// Executor is the function interface for executing the provided derivation.
// Implementations may be platform-specific or run the derivation inside a
// sandbox for isolation.
type Executor func(ctx context.Context, cfg *ExecutionConfig, drv *core.Derivation) error

// Execute is the default Executor which runs the command presented by the
// derivation: drv.Args[0] is the program and drv.Args[1:] are its arguments.
// The process runs in cfg.WorkingDir with cfg's standard streams, and its
// environment is drv.Env followed by out=cfg.OutputDir.
//
// An error is returned without running anything if drv.Args is empty.
func Execute(ctx context.Context, cfg *ExecutionConfig, drv *core.Derivation) error {
	// Guard against an empty command line; indexing Args[0] would panic.
	if len(drv.Args) == 0 {
		return errors.New("derivation has no command to execute: Args is empty")
	}
	cmd := exec.CommandContext(ctx, drv.Args[0], drv.Args[1:]...)
	// Use Args[0] verbatim as the program path instead of whatever
	// CommandContext resolved. NOTE(review): presumably derivation builders are
	// meant to be explicit paths rather than looked up on the host PATH —
	// confirm.
	cmd.Path = drv.Args[0]
	cmd.Dir = cfg.WorkingDir
	cmd.Stdin = cfg.Stdin
	cmd.Stdout = cfg.Stdout
	cmd.Stderr = cfg.Stderr
	// Clone so appending below never writes into drv.Env's backing array.
	cmd.Env = slices.Clone(drv.Env)
	// Set out last to make sure it won't be overridden.
	cmd.Env = append(cmd.Env, "out="+cfg.OutputDir)
	return cmd.Run()
}
// PackageExecutor builds packages by running an Executor and keeps references
// to every package it makes available. Before a package is executed, all of
// its build dependencies must already be referenced by the PackageExecutor.
type PackageExecutor struct {
	// refs holds every package currently referenced by this executor, keyed by
	// derivation ID. A package is added when Expand finds it already present in
	// storage or after it has been built; Release drops the entries it
	// successfully dereferences.
	refs map[string]actions.Package

	preExpandFn PreExpandHook
	preExecFn   PreExecuteHook
	postExecFn  PostExecuteHook
	execFn      Executor

	// tempDir is the temporary directory used as the working directory during
	// execution. Since artifacts should be installed into output directory at the
	// end of the execution, we can use the path of temporary directory to detect
	// if any path burned into outputs may affect portability or being potential
	// subjects for runtime rewrite.
	tempDir string
}

// NewPackageExecutor creates a package executor to make packages available.
// All hooks and execFn are optional:
//   - preExpandFn can be provided to e.g. fetch the package from a remote cache.
//   - preExecFn can be provided to e.g. set up the execution environment.
//   - postExecFn can be provided to e.g. clean up the execution environment.
//   - If execFn is nil, Execute will be used.
//
// tempDir will be used as the parent of per-package working directories during
// execution and can be removed by the caller after execution.
func NewPackageExecutor(tempDir string, preExpandFn PreExpandHook, preExecFn PreExecuteHook, postExecFn PostExecuteHook, execFn Executor) *PackageExecutor {
	if execFn == nil {
		execFn = Execute
	}
	return &PackageExecutor{
		refs:        make(map[string]actions.Package),
		preExpandFn: preExpandFn,
		preExecFn:   preExecFn,
		postExecFn:  postExecFn,
		execFn:      execFn,
		tempDir:     tempDir,
	}
}
// Expand flattens pkg and its dependencies into the list of packages to be
// executed, with dependencies placed before the package that needs them.
//
// If pkg is already referenced by the executor it is returned on its own.
// Otherwise preExpandFn, if set, is invoked on pkg before any of its
// dependencies, which gives it a chance to make the package available (e.g.
// from a remote cache) so that its build dependencies don't need expanding.
//
// Runtime dependencies are always expanded. pkg is then referenced in storage:
// if it exists it is recorded as referenced; if it does not exist
// (core.ErrPackageNotExist) its build dependencies are expanded too so it can
// be built. Any other reference error is returned.
func (p *PackageExecutor) Expand(ctx context.Context, pkg actions.Package) ([]actions.Package, error) {
	if _, referenced := p.refs[pkg.DerivationID]; referenced {
		return []actions.Package{pkg}, nil
	}

	// The hook is promised to run on the package before any of its
	// dependencies.
	if hook := p.preExpandFn; hook != nil {
		if err := hook(ctx, pkg); err != nil {
			return nil, fmt.Errorf("failed to run preExpand hook for the package: %s: %w", pkg.ActionID, err)
		}
	}

	var flattened []actions.Package
	for _, dep := range pkg.RuntimeDependencies {
		sub, err := p.Expand(ctx, dep)
		if err != nil {
			return nil, err
		}
		flattened = append(flattened, sub...)
	}

	refErr := pkg.Handler.IncRef()
	if refErr == nil {
		// Already present in storage: hold the reference, nothing to build.
		p.refs[pkg.DerivationID] = pkg
		return append(flattened, pkg), nil
	}
	if !errors.Is(refErr, core.ErrPackageNotExist) {
		return nil, refErr
	}

	// The package has to be built, so its build dependencies come first.
	for _, dep := range pkg.BuildDependencies {
		sub, err := p.Expand(ctx, dep)
		if err != nil {
			return nil, err
		}
		flattened = append(flattened, sub...)
	}
	return append(flattened, pkg), nil
}
// Execute executes the package's derivation and keeps a reference to the
// package. preExecFn and postExecFn, if not nil, are invoked before and after
// the execution. If preExecFn returns an error the execution is skipped, but
// postExecFn is still invoked with that error. The returned error joins the
// execution (or pre-hook) error with any error returned by postExecFn.
func (p *PackageExecutor) Execute(ctx context.Context, pkg actions.Package) error {
	var errs error
	if p.preExecFn != nil {
		hookCtx, err := p.preExecFn(ctx, pkg)
		// A hook commonly returns (nil, err) on failure; keep the caller's
		// context in that case so postExecFn never receives a nil context.
		if hookCtx != nil {
			ctx = hookCtx
		}
		errs = err
	}
	if errs == nil {
		errs = p.execute(ctx, pkg)
	}
	if p.postExecFn != nil {
		errs = errors.Join(errs, p.postExecFn(ctx, pkg, errs))
	}
	return errs
}
// execute builds pkg through its handler unless its derivation is already
// referenced. On success it takes a reference on the package and records it in
// p.refs.
func (p *PackageExecutor) execute(ctx context.Context, pkg actions.Package) error {
	if _, referenced := p.refs[pkg.DerivationID]; referenced {
		// The derivation outputs are already available; nothing to build.
		return nil
	}

	buildFn := func() error {
		// Keep a copy of the action and the derivation next to the build logs.
		if err := dumpProto(pkg.Action, pkg.Handler.LoggingDirectory(), "action.pb"); err != nil {
			return err
		}
		if err := dumpProto(pkg.Derivation, pkg.Handler.LoggingDirectory(), "derivation.pb"); err != nil {
			return err
		}

		// Every build dependency must already be referenced by this executor.
		for _, dep := range pkg.BuildDependencies {
			if _, ok := p.refs[dep.DerivationID]; !ok {
				return fmt.Errorf("dependency not available: %s", dep.DerivationID)
			}
		}

		logging.Infof(ctx, "build package %s", pkg.DerivationID)
		workDir, err := os.MkdirTemp(p.tempDir, pkg.DerivationID+"-")
		if err != nil {
			return err
		}

		// Stdout and Stderr share one buffer so the log shows them interleaved.
		var output strings.Builder
		execErr := p.execFn(ctx, &ExecutionConfig{
			OutputDir:  pkg.Handler.OutputDirectory(),
			WorkingDir: workDir,
			Stdout:     &output,
			Stderr:     &output,
		}, pkg.Derivation)
		if execErr != nil {
			logging.Errorf(ctx, "\n%s\n", output.String())
			return execErr
		}
		logging.Debugf(ctx, "\n%s", output.String())
		return nil
	}

	if err := pkg.Handler.Build(buildFn); err != nil {
		return fmt.Errorf("failed to build package: %s: %w", pkg.DerivationID, err)
	}
	if err := pkg.Handler.IncRef(); err != nil {
		return fmt.Errorf("failed to reference the package: %s: %w", pkg.DerivationID, err)
	}
	p.refs[pkg.DerivationID] = pkg
	return nil
}
// Release dereferences every package held by the PackageExecutor. Packages
// whose DecRef succeeds are dropped from the executor; those that fail stay
// referenced and their errors are joined into the returned error.
func (p *PackageExecutor) Release() error {
	var errs error
	for drvID, pkg := range p.refs {
		err := pkg.Handler.DecRef()
		if err == nil {
			// Deleting during range is permitted in Go.
			delete(p.refs, drvID)
			continue
		}
		errs = errors.Join(errs, err)
	}
	return errs
}
// dumpProto writes m as multi-line protojson to dir/name, creating dir if
// needed and replacing any existing file.
//
// The message is marshalled before touching the filesystem, so a marshalling
// failure leaves no empty file behind. os.WriteFile also reports the error from
// closing the file, which can carry a deferred write failure.
func dumpProto(m protoreflect.ProtoMessage, dir, name string) error {
	b, err := protojson.MarshalOptions{Multiline: true}.Marshal(m)
	if err != nil {
		return err
	}
	if err := os.MkdirAll(dir, os.ModePerm); err != nil {
		return err
	}
	return os.WriteFile(filepath.Join(dir, name), b, 0o666)
}
// Builder drives the standard build workflow: it converts generators into
// actions.Package values and builds them with a PackageExecutor.
type Builder struct {
	platforms generators.Platforms
	packages  core.PackageManager
	processor *actions.ActionProcessor
}

// NewBuilder creates a Builder to manage the standard build workflow for
// actions.Package. It converts generators.Generator to actions.Package and
// executes the *core.Derivation to make the package available locally.
func NewBuilder(plats generators.Platforms, pm core.PackageManager, ap *actions.ActionProcessor) *Builder {
	b := new(Builder)
	b.platforms = plats
	b.packages = pm
	b.processor = ap
	return b
}
// Build runs the standard workflow for a single generator and returns the
// resulting actions.Package, which is available in the storage on success.
// On error the zero Package is returned.
func (b *Builder) Build(ctx context.Context, pe *PackageExecutor, g generators.Generator) (actions.Package, error) {
	var built actions.Package
	pkgs, err := b.BuildAll(ctx, pe, []generators.Generator{g})
	if err == nil {
		built = pkgs[0]
	}
	return built, err
}
// BuildAll runs the standard workflow for every generator in gs: it generates
// the packages and then builds them, stopping at the first execution error.
// It returns the generated packages, in the order of gs, once they are
// available in the storage.
func (b *Builder) BuildAll(ctx context.Context, pe *PackageExecutor, gs []generators.Generator) ([]actions.Package, error) {
	pkgs, err := b.GeneratePackages(ctx, gs)
	if err == nil {
		err = b.BuildPackages(ctx, pe, pkgs, false)
	}
	if err != nil {
		return nil, err
	}
	return pkgs, nil
}
// GeneratePackages converts gs into actions.Package values, without building
// them. All generators run first; only then are the resulting actions
// processed against the build platform. The first error from either stage is
// returned.
func (b *Builder) GeneratePackages(ctx context.Context, gs []generators.Generator) ([]actions.Package, error) {
	// Stage 1: generators.Generator -> *core.Action.
	var actionList []*core.Action
	for i := range gs {
		act, err := gs[i].Generate(ctx, b.platforms)
		if err != nil {
			return nil, err
		}
		actionList = append(actionList, act)
	}

	// Stage 2: *core.Action -> actions.Package.
	var result []actions.Package
	for i := range actionList {
		pkg, err := b.processor.Process(b.platforms.Build.String(), b.packages, actionList[i])
		if err != nil {
			return nil, err
		}
		result = append(result, pkg)
	}
	return result, nil
}
// BuildPackages builds the packages and makes all of them available in the
// storage. Every package referenced during the build is dereferenced (via
// pe.Release) when BuildPackages returns. It is left to the caller to decide
// which packages will be used at runtime and to reference those again.
//
// A package may therefore be removed in the short window between release and
// the caller's own IncRef. But since IncRef updates the last-accessed
// timestamp, a cleanup with any reasonable time window (e.g. 1 hour) is highly
// unlikely to remove packages that were just dereferenced and may be IncRef'd
// again within seconds. And even if it happens, the caller can retry.
//
// If continueOnExecError is true, execution continues past failed packages and
// all execution errors are joined into the result. Otherwise the first
// execution error is returned. Expansion errors always abort immediately.
func (b *Builder) BuildPackages(ctx context.Context, pe *PackageExecutor, pkgs []actions.Package, continueOnExecError bool) error {
	// Release errors are deliberately discarded so they never replace the
	// build result returned to the caller.
	defer func() { _ = pe.Release() }()

	// Flatten every package together with its dependencies, keeping the first
	// occurrence of each action ID.
	var newPkgs []actions.Package
	added := stringset.New(len(pkgs))
	for _, pkg := range pkgs {
		expands, err := pe.Expand(ctx, pkg)
		if err != nil {
			return fmt.Errorf("failed to expand package: %s: %w", pkg.ActionID, err)
		}
		for _, expand := range expands {
			if added.Add(expand.ActionID) {
				newPkgs = append(newPkgs, expand)
			}
		}
	}

	// Execute each derivation at most once, even if several entries share it.
	var errs error
	executed := stringset.New(len(newPkgs))
	for _, pkg := range newPkgs {
		if !executed.Add(pkg.DerivationID) {
			continue
		}
		if err := pe.Execute(ctx, pkg); err != nil {
			if !continueOnExecError {
				return err
			}
			errs = errors.Join(errs, err)
		}
	}
	return errs
}