// blob: eee71226b54ff62750b5abdf93ac09c71f3e49f0 [file] [log] [blame]
// Copyright 2015 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package cipd implements client side of Chrome Infra Package Deployer.
//
// Binary package file format (in free form representation):
// <binary package> := <zipped data>
// <zipped data> := DeterministicZip(<all input files> + <manifest json>)
// <manifest json> := File{
// name: ".cipdpkg/manifest.json",
// data: JSON({
// "FormatVersion": "1",
// "PackageName": <name of the package>
// }),
// }
// DeterministicZip = zip archive with deterministic ordering of files and stripped timestamps
//
// Main package data (<zipped data> above) is deterministic, meaning its content
// depends only on the inputs used to build it (byte for byte): contents and
// names of all files added to the package (plus 'executable' file mode bit)
// and a package name (and all other data in the manifest).
//
// Binary package data MUST NOT depend on a timestamp, hostname of machine that
// built it, revision of the source code it was built from, etc. All that
// information will be distributed as a separate metadata packet associated with
// the package when it gets uploaded to the server.
//
// TODO: expand more when there's server-side package data model (labels
// and stuff).
package cipd
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"os"
"path/filepath"
"sort"
"sync"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"go.chromium.org/luci/common/clock"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/logging"
"go.chromium.org/luci/common/proto/google"
"go.chromium.org/luci/common/retry"
"go.chromium.org/luci/common/retry/transient"
"go.chromium.org/luci/grpc/prpc"
api "go.chromium.org/luci/cipd/api/cipd/v1"
"go.chromium.org/luci/cipd/client/cipd/deployer"
"go.chromium.org/luci/cipd/client/cipd/digests"
"go.chromium.org/luci/cipd/client/cipd/ensure"
"go.chromium.org/luci/cipd/client/cipd/fs"
"go.chromium.org/luci/cipd/client/cipd/internal"
"go.chromium.org/luci/cipd/client/cipd/pkg"
"go.chromium.org/luci/cipd/client/cipd/platform"
"go.chromium.org/luci/cipd/client/cipd/reader"
"go.chromium.org/luci/cipd/client/cipd/template"
"go.chromium.org/luci/cipd/common"
"go.chromium.org/luci/cipd/version"
)
// Default timeouts for client operations that wait on the backend.
const (
	// CASFinalizationTimeout bounds how long RegisterInstance waits for the CAS
	// service to finalize an upload.
	CASFinalizationTimeout time.Duration = 5 * time.Minute

	// SetRefTimeout bounds how long SetRefWhenReady waits for an instance to be
	// processed before the ref can be set.
	SetRefTimeout time.Duration = 3 * time.Minute

	// TagAttachTimeout bounds how long AttachTagsWhenReady waits for an instance
	// to be processed before tags can be attached.
	TagAttachTimeout time.Duration = 3 * time.Minute
)
// EnvCacheDir names the environment variable that supplies the default value
// of ClientOptions.CacheDir (see ClientOptions.LoadFromEnv).
const EnvCacheDir = "CIPD_CACHE_DIR"

// EnvHTTPUserAgentPrefix names the environment variable whose value is
// prepended to the default user agent (see ClientOptions.LoadFromEnv).
const EnvHTTPUserAgentPrefix = "CIPD_HTTP_USER_AGENT_PREFIX"
// ErrFinalizationTimeout is returned when the CAS service does not finalize
// an upload fast enough. Tagged as transient.
var ErrFinalizationTimeout = errors.New("timeout while waiting for CAS service to finalize the upload", transient.Tag)

// ErrBadUpload is returned when a package file has been uploaded, but the
// server still asks to upload it again. Tagged as transient.
var ErrBadUpload = errors.New("package file is uploaded, but servers asks us to upload it again", transient.Tag)

// ErrProcessingTimeout is returned by SetRefWhenReady or AttachTagsWhenReady
// when the instance processing on the backend takes longer than expected.
// Refs and tags can be attached only to processed instances. Tagged as
// transient.
var ErrProcessingTimeout = errors.New("timeout while waiting for the instance to become ready", transient.Tag)

// ErrDownloadError is returned by FetchInstance on download errors. Tagged as
// transient.
var ErrDownloadError = errors.New("failed to download the package file after multiple attempts", transient.Tag)

// ErrUploadError is returned by RegisterInstance on upload errors. Tagged as
// transient.
var ErrUploadError = errors.New("failed to upload the package file after multiple attempts", transient.Tag)

// ErrEnsurePackagesFailed is returned by EnsurePackages if something is not
// right. Unlike the errors above it carries no transient tag.
var ErrEnsurePackagesFailed = errors.New("failed to update packages, see the log")
// ClientPackage is the name template of the package with the CIPD client. It
// is used during self-update.
var ClientPackage = "infra/tools/cipd/${platform}"

// UserAgent is the HTTP user agent string of the CIPD client. The package
// init function may append the running client's package name and instance ID
// to it.
var UserAgent = "cipd 2.2.21"
// init extends UserAgent with the "(package@instance)" identity of the running
// binary when the startup version info is available and names an instance.
func init() {
	if ver, err := version.GetStartupVersion(); err == nil && ver.InstanceID != "" {
		UserAgent += fmt.Sprintf(" (%s@%s)", ver.PackageName, ver.InstanceID)
	}
}
// UploadSession describes an open CAS upload session.
type UploadSession struct {
// ID identifies the upload session in the backend.
ID string
// URL is the destination the data should be uploaded to.
URL string
}
// DescribeInstanceOpts is passed to DescribeInstance to select which optional
// pieces of information should be fetched along with the instance.
type DescribeInstanceOpts struct {
DescribeRefs bool // if true, will fetch all refs pointing to the instance
DescribeTags bool // if true, will fetch all tags attached to the instance
}
// Client provides the high-level CIPD client interface. Thread safe.
type Client interface {
// BeginBatch makes the client enter into a "batch mode".
//
// In this mode various cleanup and cache updates, usually performed right
// away, are deferred until 'EndBatch' call.
//
// This is an optimization. Use it if you plan to call a bunch of Client
// methods in a short amount of time (parallel or sequentially).
//
// Batches can be nested.
BeginBatch(ctx context.Context)
// EndBatch ends a batch started with BeginBatch.
//
// EndBatch does various delayed maintenance tasks (like cache updates, trash
// cleanup and so on). These are best-effort operations, and thus this method
// doesn't return an error.
//
// See also BeginBatch doc for more details.
EndBatch(ctx context.Context)
// FetchACL returns a list of PackageACL objects (parent paths first).
//
// Together they define the access control list for the given package prefix.
FetchACL(ctx context.Context, prefix string) ([]PackageACL, error)
// ModifyACL applies a set of PackageACLChanges to a package prefix ACL.
ModifyACL(ctx context.Context, prefix string, changes []PackageACLChange) error
// FetchRoles returns all roles the caller has in the given package prefix.
//
// Understands roles inheritance, e.g. if the caller is OWNER, the return
// value will list all roles implied by being an OWNER (e.g. READER, WRITER,
// ...).
FetchRoles(ctx context.Context, prefix string) ([]string, error)
// ResolveVersion converts an instance ID, a tag or a ref into a concrete Pin.
ResolveVersion(ctx context.Context, packageName, version string) (common.Pin, error)
// RegisterInstance makes the package instance available for clients.
//
// It uploads the instance to the storage, waits until the storage verifies
// its hash matches instance ID in 'pin', and then registers the package in
// the repository, making it discoverable.
//
// 'pin' here should match the package body, otherwise CIPD backend will
// reject the package. Either get it from builder.BuildInstance (when building
// a new package) or from reader.CalculatePin (when uploading an existing
// package file).
//
// 'timeout' specifies for how long to wait until the instance hash is
// verified by the storage backend. If 0, default CASFinalizationTimeout will
// be used.
RegisterInstance(ctx context.Context, pin common.Pin, body io.ReadSeeker, timeout time.Duration) error
// DescribeInstance returns information about a package instance.
//
// May also be used as a simple instance presence check, if opts is nil. If
// the request succeeds, then the instance exists.
DescribeInstance(ctx context.Context, pin common.Pin, opts *DescribeInstanceOpts) (*InstanceDescription, error)
// DescribeClient returns information about a CIPD client binary matching the
// given client package pin.
DescribeClient(ctx context.Context, pin common.Pin) (*ClientDescription, error)
// SetRefWhenReady moves a ref to point to a package instance.
//
// See SetRefTimeout and ErrProcessingTimeout for how waiting for the
// instance to be processed is bounded and reported.
SetRefWhenReady(ctx context.Context, ref string, pin common.Pin) error
// AttachTagsWhenReady attaches tags to an instance.
//
// See TagAttachTimeout and ErrProcessingTimeout for how waiting for the
// instance to be processed is bounded and reported.
AttachTagsWhenReady(ctx context.Context, pin common.Pin, tags []string) error
// FetchPackageRefs returns information about all refs defined for a package.
//
// The returned list is sorted by modification timestamp (newest first).
FetchPackageRefs(ctx context.Context, packageName string) ([]RefInfo, error)
// FetchInstance downloads a package instance file from the repository.
//
// It verifies that the package hash matches pin.InstanceID.
//
// It returns a pkg.Source pointing to the raw package data. The caller
// must close it when done.
FetchInstance(ctx context.Context, pin common.Pin) (pkg.Source, error)
// FetchInstanceTo downloads a package instance file into the given writer.
//
// This is roughly the same as getting a reader with 'FetchInstance' and
// copying its data into the writer, except this call skips unnecessary temp
// files if the client is not using cache.
//
// It verifies that the package hash matches pin.InstanceID, but does it while
// writing to 'output', so expect to discard all data there if FetchInstanceTo
// returns an error.
FetchInstanceTo(ctx context.Context, pin common.Pin, output io.WriteSeeker) error
// FetchAndDeployInstance fetches the package instance and deploys it.
//
// Deploys to the given subdir under the site root (see ClientOptions.Root).
// It doesn't check whether the instance is already deployed.
FetchAndDeployInstance(ctx context.Context, subdir string, pin common.Pin) error
// ListPackages returns a list of packages and prefixes under the given
// prefix.
ListPackages(ctx context.Context, prefix string, recursive, includeHidden bool) ([]string, error)
// SearchInstances finds instances of some package with all given tags.
//
// Returns their concrete Pins. If the package doesn't exist at all, returns
// empty slice and nil error.
SearchInstances(ctx context.Context, packageName string, tags []string) (common.PinSlice, error)
// ListInstances enumerates instances of a package, most recent first.
//
// Returns an object that can be used to fetch the listing, page by page.
ListInstances(ctx context.Context, packageName string) (InstanceEnumerator, error)
// EnsurePackages installs, removes and updates packages in the site root.
//
// Given a description of what packages (and versions) should be installed it
// will do all necessary actions to bring the state of the site root to the
// desired one.
//
// Depending on the paranoia mode, will optionally verify that all installed
// packages are installed correctly and will attempt to fix ones that are not.
// See the ParanoidMode enum for more info.
//
// If dryRun is true, will just check for changes and return them in the
// ActionMap, but won't actually perform them.
//
// If the update was only partially applied, returns both the ActionMap and
// an error.
EnsurePackages(ctx context.Context, pkgs common.PinSliceBySubdir, paranoia ParanoidMode, dryRun bool) (ActionMap, error)
// CheckDeployment looks at what is supposed to be installed and compares it
// to what is really installed.
//
// Returns an error if it can't even detect what is supposed to be installed.
// Inconsistencies are returned through the ActionMap.
CheckDeployment(ctx context.Context, paranoia ParanoidMode) (ActionMap, error)
// RepairDeployment attempts to repair a deployment in the site root if it
// appears to be broken (per given paranoia mode).
//
// Returns an action map of what it did.
RepairDeployment(ctx context.Context, paranoia ParanoidMode) (ActionMap, error)
}
// ClientOptions is passed to NewClient factory function.
//
// NewClient receives it by value and fills in defaults for unset fields in
// its own copy.
type ClientOptions struct {
// ServiceURL is root URL of the backend service.
//
// Required: NewClient returns an error if it is empty, is not a valid URL or
// has a non-root path.
ServiceURL string
// Root is a site root directory.
//
// It is a directory where packages will be installed to. It also hosts
// .cipd/* directory that tracks internal state of installed packages and
// keeps various cache files. 'Root' can be an empty string if the client is
// not going to be used to deploy or remove local packages.
Root string
// CacheDir is a directory for shared cache.
//
// If empty, instances are not cached and tags are cached inside the site
// root. If both Root and CacheDir are empty, tag cache is disabled.
//
// LoadFromEnv can populate it from the EnvCacheDir environment variable.
CacheDir string
// Versions is optional database of (pkg, version) => instance ID resolutions.
//
// If set, it will be used for all version resolutions done by the client.
// The client won't be consulting (or updating) the tag cache and won't make
// 'ResolveVersion' backend RPCs.
//
// This is primarily used to implement $ResolvedVersions ensure file feature.
Versions ensure.VersionsFile
// AnonymousClient is http.Client that doesn't attach authentication headers.
//
// Will be used when talking to the Google Storage. We use signed URLs that do
// not require additional authentication.
//
// Default is http.DefaultClient.
AnonymousClient *http.Client
// AuthenticatedClient is http.Client that attaches authentication headers.
//
// Will be used when talking to the backend.
//
// Default is same as AnonymousClient (it will probably not work for most
// packages, since the backend won't authorize an anonymous access).
AuthenticatedClient *http.Client
// UserAgent is put into User-Agent HTTP header with each request.
//
// Default is the value of the UserAgent package variable. LoadFromEnv can
// populate it from the EnvHTTPUserAgentPrefix environment variable.
UserAgent string
// Mocks used by tests. When set, NewClient uses them instead of constructing
// the real implementations.
casMock api.StorageClient
repoMock api.RepositoryClient
storageMock storage
}
// LoadFromEnv fills unset fields of opts with defaults taken from the
// environment.
//
// getEnv is used to look up named environment variables; it should return an
// empty string for variables that are not defined. It is consulted only for
// fields that are still empty in opts.
//
// Returns an error if EnvCacheDir is set to a non-absolute path.
func (opts *ClientOptions) LoadFromEnv(getEnv func(string) string) error {
	// An explicitly configured cache directory wins over the environment.
	if opts.CacheDir == "" {
		switch dir := getEnv(EnvCacheDir); {
		case dir == "":
			// Not set in the environment, leave the cache directory unset.
		case !filepath.IsAbs(dir):
			return fmt.Errorf("bad %s: not an absolute path - %s", EnvCacheDir, dir)
		default:
			opts.CacheDir = dir
		}
	}

	// Same for the user agent: only derive it when not set explicitly.
	if opts.UserAgent != "" {
		return nil
	}
	if prefix := getEnv(EnvHTTPUserAgentPrefix); prefix != "" {
		opts.UserAgent = fmt.Sprintf("%s/%s", prefix, UserAgent)
	}
	return nil
}
// NewClient initializes a CIPD client object.
//
// Unset HTTP clients and the user agent in opts are replaced with defaults.
// opts.ServiceURL must be a root URL (empty path or "/"); it is normalized to
// the "<scheme>://<host>" form. Returns an error if ServiceURL is empty, can't
// be parsed, or has a non-root path.
func NewClient(opts ClientOptions) (Client, error) {
	// Fill in the defaults. 'opts' is a copy, the caller's struct is untouched.
	if opts.AnonymousClient == nil {
		opts.AnonymousClient = http.DefaultClient
	}
	if opts.AuthenticatedClient == nil {
		opts.AuthenticatedClient = opts.AnonymousClient
	}
	if opts.UserAgent == "" {
		opts.UserAgent = UserAgent
	}

	// Validate and normalize service URL.
	if opts.ServiceURL == "" {
		return nil, fmt.Errorf("ServiceURL is required")
	}
	parsed, err := url.Parse(opts.ServiceURL)
	switch {
	case err != nil:
		return nil, fmt.Errorf("not a valid URL %q - %s", opts.ServiceURL, err)
	case parsed.Path != "" && parsed.Path != "/":
		return nil, fmt.Errorf("expecting a root URL, not %q", opts.ServiceURL)
	}
	opts.ServiceURL = parsed.Scheme + "://" + parsed.Host

	// Every pRPC call gets a fresh exponential backoff iterator.
	newRetryIterator := func() retry.Iterator {
		return &retry.ExponentialBackoff{
			Limited: retry.Limited{
				Delay:   time.Second,
				Retries: 10,
			},
		}
	}
	prpcC := &prpc.Client{
		C:    opts.AuthenticatedClient,
		Host: parsed.Host,
		Options: &prpc.Options{
			UserAgent: opts.UserAgent,
			Insecure:  parsed.Scheme == "http", // for testing with local dev server
			Retry:     newRetryIterator,
		},
	}

	// Mocks (if any) take precedence over the real implementations.
	casClient := opts.casMock
	if casClient == nil {
		casClient = api.NewStoragePRPCClient(prpcC)
	}
	repoClient := opts.repoMock
	if repoClient == nil {
		repoClient = api.NewRepositoryPRPCClient(prpcC)
	}
	store := opts.storageMock
	if store == nil {
		store = &storageImpl{
			chunkSize: uploadChunkSize,
			userAgent: opts.UserAgent,
			client:    opts.AnonymousClient,
		}
	}

	impl := &clientImpl{
		ClientOptions: opts,
		cas:           casClient,
		repo:          repoClient,
		storage:       store,
		deployer:      deployer.New(opts.Root),
	}
	return impl, nil
}
// MaybeUpdateClient updates the client binary at clientExe (given as a native
// path) to targetVersion if it's out of date (based on its hash).
//
// The update is done from the "infra/tools/cipd/${platform}" package, see
// ClientPackage. The given ClientOptions are used to figure out how to
// establish a connection with the backend. Their Root and CacheDir values are
// ignored: values derived from clientExe are used instead.
//
// If the given 'digests' is not nil, the hash of the downloaded client binary
// must be in 'digests'.
//
// On success the version file next to the client binary is refreshed too.
//
// Note that this function makes sense only in the context of the default CIPD
// CLI client. Other binaries that link to the cipd package should not use it:
// they would be "updated" to the CIPD client binary.
func MaybeUpdateClient(ctx context.Context, opts ClientOptions, targetVersion, clientExe string, digests *digests.ClientDigestsFile) (common.Pin, error) {
	if err := common.ValidateInstanceVersion(targetVersion); err != nil {
		return common.Pin{}, err
	}

	// The site root and the cache always live next to the client binary.
	exeDir := filepath.Dir(clientExe)
	opts.Root = exeDir
	opts.CacheDir = filepath.Join(exeDir, ".cipd_client_cache")

	client, err := NewClient(opts)
	if err != nil {
		return common.Pin{}, err
	}
	impl := client.(*clientImpl)

	fsys := fs.NewFileSystem(opts.Root, filepath.Join(opts.CacheDir, "trash"))
	defer fsys.CleanupTrash(ctx)

	pin, err := impl.maybeUpdateClient(ctx, fsys, targetVersion, clientExe, digests)
	if err != nil {
		return pin, err
	}
	impl.ensureClientVersionInfo(ctx, fsys, pin, clientExe)
	return pin, nil
}
// clientImpl is the Client implementation returned by NewClient.
//
// It embeds the (defaulted and normalized) ClientOptions it was built from.
type clientImpl struct {
ClientOptions
// pRPC API clients (or their mocks in tests).
cas api.StorageClient
repo api.RepositoryClient
// batchLock protects the state used by the BeginBatch/EndBatch
// implementation: batchNesting and batchPending.
batchLock sync.Mutex
// batchNesting is the number of BeginBatch calls not yet matched by EndBatch.
batchNesting int
// batchPending is the set of operations deferred until the outermost
// EndBatch. It is nil when nothing is pending.
batchPending map[batchAwareOp]struct{}
// storage knows how to upload and download raw binaries using signed URLs.
storage storage
// deployer knows how to install packages to local file system. Thread safe.
deployer deployer.Deployer
// tagCache is a file-system based cache of resolved tags. It is lazily
// initialized by getTagCache and stays nil if the cache is disabled.
tagCache *internal.TagCache
tagCacheInit sync.Once
// instanceCache is a file-system based cache of instances. It is lazily
// initialized by getInstanceCache and stays nil if the cache is disabled.
instanceCache *internal.InstanceCache
instanceCacheInit sync.Once
}
// batchAwareOp identifies a maintenance operation that is deferred until the
// end of a batch when the client is in the batch mode.
type batchAwareOp int

const (
	// batchAwareOpSaveTagCache saves the tag cache.
	batchAwareOpSaveTagCache = batchAwareOp(iota)
	// batchAwareOpCleanupTrash cleans up the deployer's trash.
	batchAwareOpCleanupTrash
)
// batchAwareOps maps each batch aware operation to the clientImpl method that
// implements it.
var batchAwareOps = map[batchAwareOp]func(*clientImpl, context.Context){
	batchAwareOpSaveTagCache: func(c *clientImpl, ctx context.Context) { c.saveTagCache(ctx) },
	batchAwareOpCleanupTrash: func(c *clientImpl, ctx context.Context) { c.cleanupTrash(ctx) },
}
// saveTagCache saves the tag cache, if it has been initialized.
//
// This is a best-effort operation: a failure is logged as a warning.
func (client *clientImpl) saveTagCache(ctx context.Context) {
	if client.tagCache == nil {
		return
	}
	if err := client.tagCache.Save(ctx); err != nil {
		logging.Warningf(ctx, "cipd: failed to save tag cache - %s", err)
	}
}
// cleanupTrash asks the deployer to clean up its trash.
//
// It is the implementation of the batchAwareOpCleanupTrash operation.
func (client *clientImpl) cleanupTrash(ctx context.Context) {
client.deployer.CleanupTrash(ctx)
}
// getTagCache lazily initializes tagCache (at most once) and returns it.
//
// The cache lives in CacheDir if it is set, or in the service directory
// inside the site root otherwise. Returns nil if neither is configured, i.e.
// the tag cache is disabled.
func (client *clientImpl) getTagCache() *internal.TagCache {
	client.tagCacheInit.Do(func() {
		dir := client.CacheDir
		if dir == "" {
			if client.Root == "" {
				return // nowhere to keep the cache, leave it disabled
			}
			dir = filepath.Join(client.Root, fs.SiteServiceDir)
		}
		serviceURL, err := url.Parse(client.ServiceURL)
		if err != nil {
			panic(err) // the URL has been validated in NewClient already
		}
		client.tagCache = internal.NewTagCache(fs.NewFileSystem(dir, ""), serviceURL.Host)
	})
	return client.tagCache
}
// getInstanceCache lazily initializes instanceCache (at most once) and returns
// it.
//
// The cache lives in the "instances" subdirectory of CacheDir. Returns nil if
// CacheDir is not set, i.e. the instance cache is disabled.
func (client *clientImpl) getInstanceCache(ctx context.Context) *internal.InstanceCache {
	client.instanceCacheInit.Do(func() {
		if cacheDir := client.CacheDir; cacheDir != "" {
			path := filepath.Join(cacheDir, "instances")
			client.instanceCache = internal.NewInstanceCache(fs.NewFileSystem(path, ""))
			logging.Infof(ctx, "cipd: using instance cache at %q", path)
		}
	})
	return client.instanceCache
}
// BeginBatch enters the batch mode (or one more nesting level of it).
//
// Part of the Client interface.
func (client *clientImpl) BeginBatch(ctx context.Context) {
	client.batchLock.Lock()
	client.batchNesting++
	client.batchLock.Unlock()
}
// EndBatch leaves one nesting level of the batch mode.
//
// When the outermost batch ends, all operations deferred while in the batch
// are executed. Panics if called without a corresponding BeginBatch.
//
// Part of the Client interface.
func (client *clientImpl) EndBatch(ctx context.Context) {
	client.batchLock.Lock()
	defer client.batchLock.Unlock()

	if client.batchNesting <= 0 {
		panic("EndBatch called without corresponding BeginBatch")
	}
	client.batchNesting--
	if client.batchNesting != 0 {
		return // still inside an outer batch
	}

	// The outermost batch is over, execute all pending batch aware calls now.
	for pending := range client.batchPending {
		run := batchAwareOps[pending]
		run(client, ctx)
	}
	client.batchPending = nil
}
// doBatchAwareOp executes the operation right away if the client is not in the
// batch mode, or schedules it for the 'EndBatch' call otherwise.
//
// An operation scheduled multiple times within a batch is executed once.
func (client *clientImpl) doBatchAwareOp(ctx context.Context, op batchAwareOp) {
	client.batchLock.Lock()
	defer client.batchLock.Unlock()

	// Not inside a batch, execute right now.
	if client.batchNesting == 0 {
		batchAwareOps[op](client, ctx)
		return
	}

	// Schedule to execute when 'EndBatch' is called.
	if client.batchPending == nil {
		client.batchPending = make(map[batchAwareOp]struct{}, 1)
	}
	client.batchPending[op] = struct{}{}
}
// FetchACL validates the prefix, fetches its inherited metadata from the
// backend and converts it into a list of PackageACL objects.
//
// Part of the Client interface.
func (client *clientImpl) FetchACL(ctx context.Context, prefix string) ([]PackageACL, error) {
	_, err := common.ValidatePackagePrefix(prefix)
	if err != nil {
		return nil, err
	}

	req := &api.PrefixRequest{Prefix: prefix}
	inherited, err := client.repo.GetInheritedPrefixMetadata(ctx, req, expectedCodes)
	if err != nil {
		return nil, humanErr(err)
	}
	return prefixMetadataToACLs(inherited), nil
}
// ModifyACL applies the changes to the metadata of the given prefix.
//
// It fetches the existing metadata (starting from an empty one if there is
// none), mutates it and stores it back. Nothing is stored if the mutation
// fails or doesn't change anything.
//
// Part of the Client interface.
func (client *clientImpl) ModifyACL(ctx context.Context, prefix string, changes []PackageACLChange) error {
	if _, err := common.ValidatePackagePrefix(prefix); err != nil {
		return err
	}

	// Fetch existing metadata, if any. A missing metadata is not an error.
	req := &api.PrefixRequest{Prefix: prefix}
	meta, err := client.repo.GetPrefixMetadata(ctx, req, expectedCodes)
	switch grpc.Code(err) {
	case codes.OK, codes.NotFound:
		// Proceed, constructing new empty metadata for codes.NotFound.
	default:
		return humanErr(err)
	}
	if meta == nil {
		meta = &api.PrefixMetadata{Prefix: prefix}
	}

	// Apply mutations. Bail out on errors or if there's nothing to store.
	dirty, err := mutateACLs(meta, changes)
	if err != nil || !dirty {
		return err
	}

	// Store the new metadata. This call will check meta.Fingerprint.
	_, err = client.repo.UpdatePrefixMetadata(ctx, meta, expectedCodes)
	return humanErr(err)
}
// FetchRoles validates the prefix and asks the backend for the roles the
// caller has in it, returning their string names in the backend's order.
//
// Part of the Client interface.
func (client *clientImpl) FetchRoles(ctx context.Context, prefix string) ([]string, error) {
	_, err := common.ValidatePackagePrefix(prefix)
	if err != nil {
		return nil, err
	}

	req := &api.PrefixRequest{Prefix: prefix}
	resp, err := client.repo.GetRolesInPrefix(ctx, req, expectedCodes)
	if err != nil {
		return nil, humanErr(err)
	}

	roles := make([]string, 0, len(resp.Roles))
	for _, r := range resp.Roles {
		roles = append(roles, r.Role.String())
	}
	return roles, nil
}
// ListPackages validates the prefix and asks the backend for the listing of
// packages and prefixes under it.
//
// Returns a single sorted list: package names as is, prefixes with "/"
// appended.
//
// Part of the Client interface.
func (client *clientImpl) ListPackages(ctx context.Context, prefix string, recursive, includeHidden bool) ([]string, error) {
	_, err := common.ValidatePackagePrefix(prefix)
	if err != nil {
		return nil, err
	}

	req := &api.ListPrefixRequest{
		Prefix:        prefix,
		Recursive:     recursive,
		IncludeHidden: includeHidden,
	}
	resp, err := client.repo.ListPrefix(ctx, req, expectedCodes)
	if err != nil {
		return nil, humanErr(err)
	}

	// Prefixes are marked with a trailing slash to tell them from packages.
	listing := resp.Packages
	for i := range resp.Prefixes {
		listing = append(listing, resp.Prefixes[i]+"/")
	}
	sort.Strings(listing)
	return listing, nil
}
// ResolveVersion converts an instance ID, a tag or a ref into a concrete Pin.
//
// The resolution goes through the following steps, stopping at the first one
// that applies:
//  1. If 'version' is already a valid instance ID, it is returned as is.
//  2. If the client is configured with preresolved Versions, they are the only
//     source of truth: there's no fallback to the backend.
//  3. If 'version' is a tag and the tag cache is enabled, the cache is
//     consulted.
//  4. The backend is asked to resolve the version. A resolved tag is then
//     added to the tag cache (best effort).
//
// Returns an error if the package name or the version are malformed, if the
// version is missing in the preresolved Versions, or if the backend call
// fails. Tag cache failures are only logged.
//
// Part of the Client interface.
func (client *clientImpl) ResolveVersion(ctx context.Context, packageName, version string) (common.Pin, error) {
	if err := common.ValidatePackageName(packageName); err != nil {
		return common.Pin{}, err
	}

	// Is it instance ID already? Then it is already resolved.
	if common.ValidateInstanceID(version, common.AnyHash) == nil {
		return common.Pin{PackageName: packageName, InstanceID: version}, nil
	}
	if err := common.ValidateInstanceVersion(version); err != nil {
		return common.Pin{}, err
	}

	// Use the preresolved version if configured to do so. Do NOT fallback to
	// the backend calls. A missing version is an error.
	if client.Versions != nil {
		return client.Versions.ResolveVersion(packageName, version)
	}

	// Use a local cache when resolving tags to avoid round trips to the backend
	// when calling same 'cipd ensure' command again and again.
	var cache *internal.TagCache
	if common.ValidateInstanceTag(version) == nil {
		cache = client.getTagCache() // note: may be nil if the cache is disabled
	}
	if cache != nil {
		cached, err := cache.ResolveTag(ctx, packageName, version)
		if err != nil {
			// A broken cache is not fatal, the backend is asked instead.
			logging.Warningf(ctx, "cipd: could not query tag cache - %s", err)
		}
		if cached.InstanceID != "" {
			logging.Debugf(ctx, "cipd: tag cache hit for %s:%s - %s", packageName, version, cached.InstanceID)
			return cached, nil
		}
	}

	// Either resolving a ref, or a tag cache miss? Hit the backend.
	resp, err := client.repo.ResolveVersion(ctx, &api.ResolveVersionRequest{
		Package: packageName,
		Version: version,
	}, expectedCodes)
	if err != nil {
		return common.Pin{}, humanErr(err)
	}
	pin := common.Pin{
		PackageName: packageName,
		InstanceID:  common.ObjectRefToInstanceID(resp.Instance),
	}

	// If was resolving a tag, store it in the cache. This is best effort, but
	// the reason of a failure is reported to simplify debugging.
	if cache != nil {
		if err := cache.AddTag(ctx, pin, version); err != nil {
			logging.Warningf(ctx, "cipd: could not add tag to the cache - %s", err)
		}
		client.doBatchAwareOp(ctx, batchAwareOpSaveTagCache)
	}

	return pin, nil
}
// ensureClientVersionInfo makes sure the version file of the client binary at
// clientExe describes the given pin, rewriting the file if it is missing,
// unreadable or has different content.
//
// This is a best-effort operation: failures are only logged.
//
// It is called only with the specially constructed client, see
// MaybeUpdateClient function.
func (client *clientImpl) ensureClientVersionInfo(ctx context.Context, fs fs.FileSystem, pin common.Pin, clientExe string) {
	want, err := json.Marshal(version.Info{
		PackageName: pin.PackageName,
		InstanceID:  pin.InstanceID,
	})
	if err != nil {
		// Should never occur; only error could be if version.Info is not JSON
		// serializable.
		logging.WithError(err).Errorf(ctx, "Unable to generate version file content")
		return
	}

	verFile := version.GetVersionFile(clientExe)
	have, readErr := ioutil.ReadFile(verFile)
	if readErr == nil && bytes.Equal(want, have) {
		return // up to date
	}

	// There was an error reading the existing version file, or its content does
	// not match. Proceed with EnsureFile.
	writeExpected := func(of *os.File) error {
		_, err := of.Write(want)
		return err
	}
	if err := fs.EnsureFile(ctx, verFile, writeExpected); err != nil {
		logging.WithError(err).Warningf(ctx, "Unable to update version info %q", verFile)
	}
}
// maybeUpdateClient updates the client binary at clientExe to targetVersion if
// the hash of the binary doesn't match the expected one.
//
// It resolves targetVersion of the client package into a pin, figures out the
// expected hash of the client binary for that pin (from the extracted refs
// cache or from the backend), optionally checks it against the pinned
// 'digests', and replaces the binary (via installClient) only if its current
// hash is different.
//
// Returns the resolved pin on success, or an empty pin and an error otherwise.
//
// It is called only with the specially constructed client, see
// MaybeUpdateClient function.
func (client *clientImpl) maybeUpdateClient(ctx context.Context, fs fs.FileSystem,
targetVersion, clientExe string, digests *digests.ClientDigestsFile) (common.Pin, error) {
// currentHashMatches calculates the existing client binary hash and compares
// it to 'obj'. The hash algorithm is the one specified by 'obj'.
currentHashMatches := func(obj *api.ObjectRef) (yep bool, err error) {
hash, err := common.NewHash(obj.HashAlgo)
if err != nil {
return false, err
}
file, err := os.Open(clientExe)
if err != nil {
return false, err
}
defer file.Close()
if _, err := io.Copy(hash, file); err != nil {
return false, err
}
return common.HexDigest(hash) == obj.HexDigest, nil
}
// Defer cache saves triggered below until the function returns.
client.BeginBatch(ctx)
defer client.EndBatch(ctx)
// Resolve the client version to a pin, to be able to later grab URL to the
// binary by querying info for that pin.
var pin common.Pin
clientPackage, err := template.DefaultExpander().Expand(ClientPackage)
if err != nil {
return common.Pin{}, err // shouldn't be happening in reality
}
if pin, err = client.ResolveVersion(ctx, clientPackage, targetVersion); err != nil {
return common.Pin{}, err
}
// The name of the client binary inside the client CIPD package. Acts only as
// a key inside the extracted refs cache, nothing more, so can technically be
// arbitrary.
clientFileName := "cipd"
if platform.CurrentOS() == "windows" {
clientFileName = "cipd.exe"
}
// rememberClientRef populates the extracted refs cache. It does nothing if
// the tag cache is disabled.
rememberClientRef := func(pin common.Pin, ref *api.ObjectRef) {
if cache := client.getTagCache(); cache != nil {
cache.AddExtractedObjectRef(ctx, pin, clientFileName, ref)
client.doBatchAwareOp(ctx, batchAwareOpSaveTagCache)
}
}
// Look up the hash corresponding to the pin in the extracted refs cache. See
// rememberClientRef calls below for where it is stored initially. A cache
// miss is fine, we'll reach to the backend to get the hash. A warm cache
// allows skipping RPCs to the backend on a "happy path", when the client is
// already up-to-date.
var clientRef *api.ObjectRef
if cache := client.getTagCache(); cache != nil {
if clientRef, err = cache.ResolveExtractedObjectRef(ctx, pin, clientFileName); err != nil {
return common.Pin{}, err
}
}
// If not using the tags cache or it is cold, ask the backend for an expected
// client ref. Note that we do it even if we have 'digests' file available,
// to handle the case when 'digests' file is stale (which can happen when
// updating the client version file).
//
// 'info' stays nil if the ref came from the cache. This is used below to
// decide whether the cache needs to be updated and whether the backend still
// needs to be asked for the signed URL.
var info *ClientDescription
if clientRef == nil {
if info, err = client.DescribeClient(ctx, pin); err != nil {
return common.Pin{}, err
}
clientRef = info.Digest
}
// If using pinned client digests, make sure the hash reported by the backend
// is mentioned there. In most cases a mismatch means the pinned digests file
// is just stale. The mismatch can also happen if the backend is compromised
// or the client package was forcefully replaced (this should never really
// happen...).
if digests != nil {
plat := platform.CurrentPlatform()
switch pinnedRef := digests.ClientRef(plat); {
case pinnedRef == nil:
return common.Pin{}, fmt.Errorf("there's no supported hash for %q in CIPD *.digests file", plat)
case !digests.Contains(plat, clientRef):
return common.Pin{}, fmt.Errorf(
"the CIPD client hash reported by the backend (%s) is not in *.digests file, "+
"if you changed CIPD client version recently most likely the *.digests "+
"file is just stale and needs to be regenerated via 'cipd selfupdate-roll ...'",
clientRef.HexDigest)
default:
clientRef = pinnedRef // pick the best supported hash algo from *.digests
}
}
// Is the client binary already up-to-date (has the expected hash)?
switch yep, err := currentHashMatches(clientRef); {
case err != nil:
return common.Pin{}, err // can't read clientExe
case yep:
// If we had to fetch the expected hash, store it in the cache to avoid
// fetching it again. Don't do it if we read it from the cache initially (to
// skip unnecessary cache write).
if info != nil {
rememberClientRef(pin, clientRef)
}
return pin, nil
}
// Mention the original version only if it is not the instance ID itself.
if targetVersion == pin.InstanceID {
logging.Infof(ctx, "cipd: updating client to %s", pin)
} else {
logging.Infof(ctx, "cipd: updating client to %s (%s)", pin, targetVersion)
}
// Grab the signed URL of the client binary if we haven't done so already.
if info == nil {
if info, err = client.DescribeClient(ctx, pin); err != nil {
return common.Pin{}, err
}
}
// Here we know for sure that the current binary has wrong hash (most likely
// it is outdated). Fetch the new binary, verifying its hash matches the one
// we expect.
err = client.installClient(
ctx, fs,
common.MustNewHash(clientRef.HashAlgo),
info.SignedUrl,
clientExe,
clientRef.HexDigest)
if err != nil {
// Either a download error or hash mismatch.
return common.Pin{}, errors.Annotate(err, "when updating the CIPD client to %q", targetVersion).Err()
}
// The new fetched binary is valid.
rememberClientRef(pin, clientRef)
return pin, nil
}
// RegisterInstance registers the instance in the repository, uploading its
// body to the storage first if the backend asks for it.
//
// The flow is: try to register; if the backend replies that the file is not
// uploaded, upload it, wait (up to 'timeout', CASFinalizationTimeout if 0)
// for it to be verified, and try to register again. Returns ErrBadUpload if
// the backend asks for the upload again after that.
//
// Part of the Client interface.
func (client *clientImpl) RegisterInstance(ctx context.Context, pin common.Pin, body io.ReadSeeker, timeout time.Duration) error {
	if timeout == 0 {
		timeout = CASFinalizationTimeout
	}

	// attemptToRegister calls RegisterInstance RPC and logs the result. It
	// returns a non-nil upload operation if the file has to be uploaded first.
	attemptToRegister := func() (*api.UploadOperation, error) {
		logging.Infof(ctx, "cipd: registering %s", pin)
		inst := &api.Instance{
			Package:  pin.PackageName,
			Instance: common.InstanceIDToObjectRef(pin.InstanceID),
		}
		resp, err := client.repo.RegisterInstance(ctx, inst, expectedCodes)
		if err != nil {
			return nil, humanErr(err)
		}
		switch status := resp.Status; status {
		case api.RegistrationStatus_NOT_UPLOADED:
			return resp.UploadOp, nil
		case api.RegistrationStatus_REGISTERED:
			logging.Infof(ctx, "cipd: instance %s was successfully registered", pin)
			return nil, nil
		case api.RegistrationStatus_ALREADY_REGISTERED:
			logging.Infof(
				ctx, "cipd: instance %s is already registered by %s on %s",
				pin, resp.Instance.RegisteredBy,
				google.TimeFromProto(resp.Instance.RegisteredTs).Local())
			return nil, nil
		default:
			return nil, fmt.Errorf("unrecognized package registration status %s", status)
		}
	}

	// Attempt to register. May be asked to actually upload the file first.
	uploadOp, err := attemptToRegister()
	if err != nil {
		return err
	}
	if uploadOp == nil {
		return nil // no need to upload, the instance is registered
	}

	// The backend asked us to upload the data to CAS. Do it.
	if err := client.storage.upload(ctx, uploadOp.UploadUrl, body); err != nil {
		return err
	}
	if err := client.finalizeUpload(ctx, uploadOp.OperationId, timeout); err != nil {
		return err
	}
	logging.Infof(ctx, "cipd: successfully uploaded and verified %s", pin)

	// Try the registration again now that the file is uploaded to CAS. It should
	// succeed.
	retryOp, err := attemptToRegister()
	if retryOp != nil {
		return ErrBadUpload // welp, the upload didn't work for some reason, give up
	}
	return err
}
// finalizeUpload polls the FinishUpload RPC for the upload operation 'opID'
// until the server reports a terminal status or 'timeout' elapses.
//
// Returns nil once the upload is published, ErrFinalizationTimeout when the
// deadline hits, and an error carrying the server's message if verification
// failed.
func (client *clientImpl) finalizeUpload(ctx context.Context, opID string, timeout time.Duration) error {
	ctx, cancel := clock.WithTimeout(ctx, timeout)
	defer cancel()

	// The pause between polls starts at one second and grows by half a second
	// per poll until it reaches ten seconds.
	for delay := time.Second; ; {
		select {
		case <-ctx.Done():
			return ErrFinalizationTimeout
		default:
		}

		op, err := client.cas.FinishUpload(ctx, &api.FinishUploadRequest{
			UploadOperationId: opID,
		})
		if err == context.DeadlineExceeded {
			continue // this may be a short RPC deadline, try again
		}
		if err != nil {
			return humanErr(err)
		}

		switch op.Status {
		case api.UploadStatus_PUBLISHED:
			return nil // verified!
		case api.UploadStatus_ERRORED:
			return errors.New(op.ErrorMessage) // fatal verification error
		case api.UploadStatus_UPLOADING, api.UploadStatus_VERIFYING:
			logging.Infof(ctx, "cipd: uploading - verifying")
			if clock.Sleep(clock.Tag(ctx, "cipd-sleeping"), delay).Incomplete() {
				return ErrFinalizationTimeout
			}
			if delay < 10*time.Second {
				delay += 500 * time.Millisecond
			}
		default:
			return fmt.Errorf("unrecognized upload operation status %s", op.Status)
		}
	}
}
// DescribeInstance asks the repository for a description of the instance
// identified by 'pin'.
//
// 'opts' may be nil, in which case neither refs nor tags are requested.
func (client *clientImpl) DescribeInstance(ctx context.Context, pin common.Pin, opts *DescribeInstanceOpts) (*InstanceDescription, error) {
	if err := common.ValidatePin(pin, common.AnyHash); err != nil {
		return nil, err
	}

	req := &api.DescribeInstanceRequest{
		Package:  pin.PackageName,
		Instance: common.InstanceIDToObjectRef(pin.InstanceID),
	}
	// Optional parts of the description are requested only when asked for.
	if opts != nil {
		req.DescribeRefs = opts.DescribeRefs
		req.DescribeTags = opts.DescribeTags
	}

	desc, err := client.repo.DescribeInstance(ctx, req, expectedCodes)
	if err != nil {
		return nil, humanErr(err)
	}
	return apiDescToInfo(desc), nil
}
// DescribeClient asks the repository for a description of the CIPD client
// binary identified by 'pin'.
func (client *clientImpl) DescribeClient(ctx context.Context, pin common.Pin) (*ClientDescription, error) {
	if err := common.ValidatePin(pin, common.AnyHash); err != nil {
		return nil, err
	}

	req := &api.DescribeClientRequest{
		Package:  pin.PackageName,
		Instance: common.InstanceIDToObjectRef(pin.InstanceID),
	}
	desc, err := client.repo.DescribeClient(ctx, req, expectedCodes)
	if err != nil {
		return nil, humanErr(err)
	}
	return apiClientDescToInfo(desc), nil
}
// SetRefWhenReady points 'ref' at the instance identified by 'pin', retrying
// for up to SetRefTimeout while the backend reports that the instance is not
// ready yet.
//
// The outcome is logged; the error (if any) is returned unchanged.
func (client *clientImpl) SetRefWhenReady(ctx context.Context, ref string, pin common.Pin) error {
	if err := common.ValidatePackageRef(ref); err != nil {
		return err
	}
	if err := common.ValidatePin(pin, common.AnyHash); err != nil {
		return err
	}
	logging.Infof(ctx, "cipd: setting ref of %q: %q => %q", pin.PackageName, ref, pin.InstanceID)

	// createRef is a single CreateRef RPC attempt.
	createRef := func(ctx context.Context) error {
		_, rpcErr := client.repo.CreateRef(ctx, &api.Ref{
			Name:     ref,
			Package:  pin.PackageName,
			Instance: common.InstanceIDToObjectRef(pin.InstanceID),
		}, expectedCodes)
		return rpcErr
	}

	err := retryUntilReady(ctx, SetRefTimeout, createRef)
	switch {
	case err == nil:
		logging.Infof(ctx, "cipd: ref %q is set", ref)
	case err == ErrProcessingTimeout:
		logging.Errorf(ctx, "cipd: failed to set ref - deadline exceeded")
	default:
		logging.Errorf(ctx, "cipd: failed to set ref - %s", err)
	}
	return err
}
// AttachTagsWhenReady attaches 'tags' to the instance identified by 'pin',
// retrying for up to TagAttachTimeout while the backend reports that the
// instance is not ready yet.
//
// An empty tag list is a no-op. All tags are parsed before any RPC is made, so
// a malformed tag fails the call without touching the backend.
func (client *clientImpl) AttachTagsWhenReady(ctx context.Context, pin common.Pin, tags []string) error {
	if err := common.ValidatePin(pin, common.AnyHash); err != nil {
		return err
	}
	if len(tags) == 0 {
		return nil
	}

	parsed := make([]*api.Tag, 0, len(tags))
	for _, raw := range tags {
		tag, err := common.ParseInstanceTag(raw)
		if err != nil {
			return err
		}
		parsed = append(parsed, tag)
		logging.Infof(ctx, "cipd: attaching tag %s", raw)
	}

	// attach is a single AttachTags RPC attempt.
	attach := func(ctx context.Context) error {
		_, rpcErr := client.repo.AttachTags(ctx, &api.AttachTagsRequest{
			Package:  pin.PackageName,
			Instance: common.InstanceIDToObjectRef(pin.InstanceID),
			Tags:     parsed,
		}, expectedCodes)
		return rpcErr
	}

	err := retryUntilReady(ctx, TagAttachTimeout, attach)
	switch {
	case err == nil:
		logging.Infof(ctx, "cipd: all tags attached")
	case err == ErrProcessingTimeout:
		logging.Errorf(ctx, "cipd: failed to attach tags - deadline exceeded")
	default:
		logging.Errorf(ctx, "cipd: failed to attach tags - %s", err)
	}
	return err
}
// retryDelay is the pause retryUntilReady makes between two attempts.
const retryDelay time.Duration = 5 * time.Second
// retryUntilReady invokes 'cb' until it succeeds, fails with a non-retriable
// error, or 'timeout' elapses.
//
// A FailedPrecondition error means the instance is still being processed by
// the backend: it is logged and the call is retried after retryDelay. A
// context.DeadlineExceeded error is retried immediately. Any other error is
// returned right away (passed through humanErr). ErrProcessingTimeout is
// returned once the overall deadline is reached.
func retryUntilReady(ctx context.Context, timeout time.Duration, cb func(context.Context) error) error {
	ctx, cancel := clock.WithTimeout(ctx, timeout)
	defer cancel()

	for {
		select {
		case <-ctx.Done():
			return ErrProcessingTimeout
		default:
		}

		err := cb(ctx)
		if err == nil {
			return nil
		}
		if err == context.DeadlineExceeded {
			continue // this may be a short RPC deadline, try again
		}
		if grpc.Code(err) != codes.FailedPrecondition {
			return humanErr(err)
		}

		// The instance is not ready yet: wait a bit before the next attempt.
		logging.Warningf(ctx, "cipd: %s", humanErr(err))
		if clock.Sleep(clock.Tag(ctx, "cipd-sleeping"), retryDelay).Incomplete() {
			return ErrProcessingTimeout
		}
	}
}
// SearchInstances returns pins of instances of 'packageName' matching the
// given tags.
//
// At least one tag is required. A NotFound response (no such package) yields
// an empty result rather than an error. Only the first page (up to 1000
// instances) is returned; a warning is logged if there are more.
func (client *clientImpl) SearchInstances(ctx context.Context, packageName string, tags []string) (common.PinSlice, error) {
	if err := common.ValidatePackageName(packageName); err != nil {
		return nil, err
	}
	if len(tags) == 0 {
		return nil, errors.New("at least one tag is required")
	}

	parsed := make([]*api.Tag, 0, len(tags))
	for _, raw := range tags {
		tag, err := common.ParseInstanceTag(raw)
		if err != nil {
			return nil, err
		}
		parsed = append(parsed, tag)
	}

	resp, err := client.repo.SearchInstances(ctx, &api.SearchInstancesRequest{
		Package:  packageName,
		Tags:     parsed,
		PageSize: 1000, // TODO(vadimsh): Support pagination on the client.
	}, expectedCodes)
	if status.Code(err) == codes.NotFound {
		return nil, nil // no such package => no instances
	}
	if err != nil {
		return nil, humanErr(err)
	}

	pins := make(common.PinSlice, 0, len(resp.Instances))
	for _, inst := range resp.Instances {
		pins = append(pins, apiInstanceToInfo(inst).Pin)
	}
	if resp.NextPageToken != "" {
		logging.Warningf(ctx, "Truncating the result only to first %d instances", len(resp.Instances))
	}
	return pins, nil
}
// ListInstances returns an enumerator over instances of 'packageName'.
//
// No RPC is made here: the returned enumerator carries a callback that issues
// one ListInstances RPC per invocation.
func (client *clientImpl) ListInstances(ctx context.Context, packageName string) (InstanceEnumerator, error) {
	if err := common.ValidatePackageName(packageName); err != nil {
		return nil, err
	}

	// fetchPage retrieves one page of at most 'limit' instances starting at
	// 'cursor', returning the instances and the token of the next page.
	fetchPage := func(ctx context.Context, limit int, cursor string) ([]InstanceInfo, string, error) {
		resp, err := client.repo.ListInstances(ctx, &api.ListInstancesRequest{
			Package:   packageName,
			PageSize:  int32(limit),
			PageToken: cursor,
		}, expectedCodes)
		if err != nil {
			return nil, "", humanErr(err)
		}
		page := make([]InstanceInfo, 0, len(resp.Instances))
		for _, inst := range resp.Instances {
			page = append(page, apiInstanceToInfo(inst))
		}
		return page, resp.NextPageToken, nil
	}

	return &instanceEnumeratorImpl{fetch: fetchPage}, nil
}
// FetchPackageRefs lists refs of 'packageName' as reported by the ListRefs
// RPC.
func (client *clientImpl) FetchPackageRefs(ctx context.Context, packageName string) ([]RefInfo, error) {
	if err := common.ValidatePackageName(packageName); err != nil {
		return nil, err
	}

	req := &api.ListRefsRequest{Package: packageName}
	resp, err := client.repo.ListRefs(ctx, req, expectedCodes)
	if err != nil {
		return nil, humanErr(err)
	}

	out := make([]RefInfo, 0, len(resp.Refs))
	for _, ref := range resp.Refs {
		out = append(out, apiRefToInfo(ref))
	}
	return out, nil
}
// FetchInstance fetches the package file of the instance identified by 'pin'
// and returns a handle to its data.
//
// The instance cache is used when the client has one; otherwise the data is
// fetched without caching.
func (client *clientImpl) FetchInstance(ctx context.Context, pin common.Pin) (pkg.Source, error) {
	if err := common.ValidatePin(pin, common.KnownHash); err != nil {
		return nil, err
	}
	cache := client.getInstanceCache(ctx)
	if cache == nil {
		return client.fetchInstanceNoCache(ctx, pin)
	}
	return client.fetchInstanceWithCache(ctx, pin, cache)
}
// FetchInstanceTo fetches the package file of the instance identified by 'pin'
// and writes it into 'output'.
//
// Without an instance cache the data is downloaded straight into 'output'.
// With a cache, the instance is first fetched into the cache and then copied
// from there.
func (client *clientImpl) FetchInstanceTo(ctx context.Context, pin common.Pin, output io.WriteSeeker) error {
	if err := common.ValidatePin(pin, common.KnownHash); err != nil {
		return err
	}

	cache := client.getInstanceCache(ctx)
	if cache == nil {
		// The simple case: no cache, download directly into the destination.
		return client.remoteFetchInstance(ctx, pin, output)
	}

	// Populate the cache (or hit it) and use the cached file as the source.
	src, err := client.fetchInstanceWithCache(ctx, pin, cache)
	if err != nil {
		return err
	}
	defer func() {
		if closeErr := src.Close(ctx, false); closeErr != nil {
			logging.Warningf(ctx, "cipd: failed to close the package file - %s", closeErr)
		}
	}()

	logging.Infof(ctx, "cipd: copying the instance into the final destination...")
	_, err = io.Copy(output, src)
	return err
}
// fetchInstanceNoCache downloads the instance identified by 'pin' into a
// temporary file and returns it positioned at the beginning.
//
// The temporary file is wrapped into deleteOnClose, so the caller is expected
// to close the returned source when done with it. If the download or the seek
// fails (or the function panics), the temporary file is closed here.
func (client *clientImpl) fetchInstanceNoCache(ctx context.Context, pin common.Pin) (pkg.Source, error) {
	// Use temp file for storing package data. Delete it when the caller is done
	// with it.
	f, err := client.deployer.TempFile(ctx, pin.InstanceID)
	if err != nil {
		return nil, err
	}
	tmp := deleteOnClose{f}

	// Make sure to remove the garbage on errors or panics. 'ok' is flipped only
	// right before the successful return.
	ok := false
	defer func() {
		if !ok {
			if err := tmp.Close(ctx, false); err != nil {
				logging.Warningf(ctx, "cipd: failed to close the temp file - %s", err)
			}
		}
	}()

	if err := client.remoteFetchInstance(ctx, pin, tmp); err != nil {
		return nil, err
	}
	// Rewind so the caller reads the package from the start. io.SeekStart is
	// the non-deprecated spelling of os.SEEK_SET.
	if _, err := tmp.Seek(0, io.SeekStart); err != nil {
		return nil, err
	}

	ok = true
	return tmp, nil
}
// fetchInstanceWithCache returns the instance identified by 'pin' from
// 'cache', downloading it into the cache first on a miss.
//
// If the freshly stored file cannot be reopened from the cache, the whole
// sequence is retried once before giving up.
func (client *clientImpl) fetchInstanceWithCache(ctx context.Context, pin common.Pin, cache *internal.InstanceCache) (pkg.Source, error) {
	for attempt := 1; ; attempt++ {
		// Look in the cache first.
		now := clock.Now(ctx)
		cached, getErr := cache.Get(ctx, pin, now)
		if getErr == nil {
			logging.Infof(ctx, "cipd: instance cache hit for %s", pin)
			return cached, nil
		}
		if !os.IsNotExist(getErr) {
			// Unexpected failure. Log it and treat it as an ordinary cache miss.
			// (A plain "not exist" is the expected kind of miss and is not logged.)
			logging.Warningf(ctx, "cipd: could not get %s from cache - %s", pin, getErr)
		}

		// Download the package into the cache. 'remoteFetchInstance' verifies the
		// hash, which is why the cache-hit path above does not check it again.
		putErr := cache.Put(ctx, pin, now, func(f *os.File) error {
			return client.remoteFetchInstance(ctx, pin, f)
		})
		if putErr != nil {
			return nil, putErr
		}

		// Reopen the file through the cache. There's a (very) small chance it has
		// been evicted already; if so, start over, but only once.
		//
		// Theoretically the handle used inside 'cache.Put' could be kept open, but
		// the file gets renamed there, and renaming files with open handles on
		// Windows is moot. So it is closed, renamed (inside cache.Put) and
		// reopened here under the new name.
		reopened, reopenErr := cache.Get(ctx, pin, clock.Now(ctx))
		if reopenErr == nil {
			return reopened, nil
		}
		logging.Errorf(ctx, "cipd: %s is unexpectedly missing from cache (%s)", pin, reopenErr)
		if attempt != 1 {
			logging.Errorf(ctx, "cipd: giving up")
			return nil, reopenErr
		}
		logging.Infof(ctx, "cipd: retrying...")
	}
}
// remoteFetchInstance downloads the package file of 'pin' into 'output',
// hashing it along the way and checking the digest against the one encoded in
// the instance ID. Assumes 'pin' is already validated.
//
// The outcome (success with the elapsed time, or the failure) is logged by a
// deferred function that inspects the named result.
func (client *clientImpl) remoteFetchInstance(ctx context.Context, pin common.Pin, output io.WriteSeeker) (err error) {
	started := clock.Now(ctx)
	defer func() {
		if err == nil {
			logging.Infof(ctx, "cipd: successfully fetched %s in %.1fs", pin, clock.Now(ctx).Sub(started).Seconds())
			return
		}
		logging.Errorf(ctx, "cipd: failed to fetch %s - %s", pin, err)
	}()

	ref := common.InstanceIDToObjectRef(pin.InstanceID)

	logging.Infof(ctx, "cipd: resolving fetch URL for %s", pin)
	urlResp, rpcErr := client.repo.GetInstanceURL(ctx, &api.GetInstanceURLRequest{
		Package:  pin.PackageName,
		Instance: ref,
	}, expectedCodes)
	if rpcErr != nil {
		return humanErr(rpcErr)
	}

	hasher := common.MustNewHash(ref.HashAlgo)
	if dlErr := client.storage.download(ctx, urlResp.SignedUrl, output, hasher); dlErr != nil {
		return dlErr
	}

	// Make sure we fetched what we've asked for.
	if got := common.HexDigest(hasher); got != ref.HexDigest {
		return fmt.Errorf("package hash mismatch: expecting %q, got %q", ref.HexDigest, got)
	}
	return nil
}
// fetchAndDo will fetch and open an instance and pass it to the callback.
//
// If the callback fails with an error that indicates a corrupted instance, will
// delete the instance from the cache, refetch it and call the callback again,
// thus the callback should be idempotent. The retry happens at most once; the
// error of the second attempt (if any) is returned as is.
//
// Any other error from the callback is propagated as is.
func (client *clientImpl) fetchAndDo(ctx context.Context, pin common.Pin, cb func(pkg.Instance) error) error {
	if err := common.ValidatePin(pin, common.KnownHash); err != nil {
		return err
	}
	// doit is one full fetch-open-callback attempt. Its result is named so that
	// the close helper below can inspect the final error.
	doit := func() (err error) {
		// Fetch the package (verifying its hash) and obtain a pointer to its data.
		instanceFile, err := client.FetchInstance(ctx, pin)
		if err != nil {
			return
		}
		// Notify the underlying object if 'err' is a corruption error.
		type corruptable interface {
			Close(ctx context.Context, corrupt bool) error
		}
		// closeMaybeCorrupted reads 'err' at the moment it is called, so when it
		// runs as a deferred call it sees the error returned by the callback.
		// An os.ErrClosed result from Close is not reported.
		closeMaybeCorrupted := func(f corruptable) {
			corrupt := reader.IsCorruptionError(err)
			if clErr := f.Close(ctx, corrupt); clErr != nil && clErr != os.ErrClosed {
				logging.Warningf(ctx, "cipd: failed to close the package file - %s", clErr)
			}
		}
		// Open the instance. This reads its manifest. 'FetchInstance' has verified
		// the hash already, so skip the verification.
		instance, err := reader.OpenInstance(ctx, instanceFile, reader.OpenInstanceOpts{
			VerificationMode: reader.SkipHashVerification,
			InstanceID:       pin.InstanceID,
		})
		if err != nil {
			// Opening failed, so there is no instance object to close; close the
			// underlying file directly.
			closeMaybeCorrupted(instanceFile)
			return
		}
		// Deferred calls run in reverse order: the instance is closed first, then
		// the trash cleanup op is run.
		defer client.doBatchAwareOp(ctx, batchAwareOpCleanupTrash)
		defer closeMaybeCorrupted(instance)
		// Use it. 'defer' will take care of removing the temp file if needed.
		return cb(instance)
	}
	err := doit()
	// Retry exactly once, and only for errors recognized as corruption.
	if err != nil && reader.IsCorruptionError(err) {
		logging.WithError(err).Warningf(ctx, "cipd: unpacking failed, retrying.")
		err = doit()
	}
	return err
}
// FetchAndDeployInstance fetches the instance identified by 'pin' and deploys
// it into 'subdir' via the deployer.
func (client *clientImpl) FetchAndDeployInstance(ctx context.Context, subdir string, pin common.Pin) error {
	if err := common.ValidateSubdir(subdir); err != nil {
		return err
	}
	deploy := func(inst pkg.Instance) error {
		_, deployErr := client.deployer.DeployInstance(ctx, subdir, inst)
		return deployErr
	}
	return client.fetchAndDo(ctx, pin, deploy)
}
// EnsurePackages is ensurePackagesImpl with logging enabled (silent = false).
func (client *clientImpl) EnsurePackages(ctx context.Context, allPins common.PinSliceBySubdir, paranoia ParanoidMode, dryRun bool) (ActionMap, error) {
	const silent = false
	return client.ensurePackagesImpl(ctx, allPins, paranoia, dryRun, silent)
}
// ensurePackagesImpl brings the set of deployed packages in line with
// 'allPins': it removes packages that are no longer needed, installs new and
// updated ones and repairs broken ones (as decided by the repair checker built
// for the given 'paranoia' mode).
//
// With 'dryRun' set, the action plan is computed (and logged) but nothing is
// changed. With 'silent' set, the plan and the summary messages that precede
// the actions are not logged.
//
// Returns the action plan. Failures of individual actions are logged and
// recorded in the plan's per-subdir Errors; if there was at least one, the
// returned error is ErrEnsurePackagesFailed. Validation and enumeration
// failures return a nil plan and the underlying error.
func (client *clientImpl) ensurePackagesImpl(ctx context.Context, allPins common.PinSliceBySubdir, paranoia ParanoidMode, dryRun, silent bool) (aMap ActionMap, err error) {
	if err = allPins.Validate(common.AnyHash); err != nil {
		return nil, err
	}
	if err = paranoia.Validate(); err != nil {
		return nil, err
	}

	client.BeginBatch(ctx)
	defer client.EndBatch(ctx)

	// Enumerate existing packages.
	existing, err := client.deployer.FindDeployed(ctx)
	if err != nil {
		return nil, err
	}

	// Figure out what needs to be updated and deleted, log it.
	aMap = buildActionPlan(allPins, existing, client.makeRepairChecker(ctx, paranoia))
	if len(aMap) == 0 {
		if !silent {
			logging.Debugf(ctx, "Everything is up-to-date.")
		}
		return aMap, nil
	}

	// TODO(iannucci): ensure that no packages cross root boundaries

	if !silent {
		aMap.Log(ctx, false)
	}
	if dryRun {
		if !silent {
			logging.Infof(ctx, "Dry run, not actually doing anything.")
		}
		return aMap, nil
	}

	hasErrors := false

	// Remove all unneeded stuff. The error is kept local to the loop body: it is
	// recorded in actions.Errors and must not leak into the function's result.
	aMap.LoopOrdered(func(subdir string, actions *Actions) {
		for _, pin := range actions.ToRemove {
			if rmErr := client.deployer.RemoveDeployed(ctx, subdir, pin.PackageName); rmErr != nil {
				logging.Errorf(ctx, "Failed to remove %s - %s (subdir %q)", pin.PackageName, rmErr, subdir)
				hasErrors = true
				actions.Errors = append(actions.Errors, ActionError{
					Action: "remove",
					Pin:    pin,
					Error:  JSONError{rmErr},
				})
			}
		}
	})

	// Install all new and updated stuff, repair broken stuff. Install in the
	// order specified by 'allPins'. Order matters if multiple packages install
	// same file.
	aMap.LoopOrdered(func(subdir string, actions *Actions) {
		// Index the plan by package name so it can be replayed in 'allPins' order.
		toDeploy := make(map[string]bool, len(actions.ToInstall)+len(actions.ToUpdate))
		toRepair := make(map[string]*RepairPlan, len(actions.ToRepair))
		for _, p := range actions.ToInstall {
			toDeploy[p.PackageName] = true
		}
		for _, pair := range actions.ToUpdate {
			toDeploy[pair.To.PackageName] = true
		}
		for _, broken := range actions.ToRepair {
			if broken.RepairPlan.NeedsReinstall {
				toDeploy[broken.Pin.PackageName] = true
			} else {
				// Copy the plan so the stored pointer does not alias the loop
				// variable.
				plan := broken.RepairPlan
				toRepair[broken.Pin.PackageName] = &plan
			}
		}

		for _, pin := range allPins[subdir] {
			var action string
			var opErr error
			if toDeploy[pin.PackageName] {
				action = "install"
				opErr = client.FetchAndDeployInstance(ctx, subdir, pin)
			} else if plan := toRepair[pin.PackageName]; plan != nil {
				action = "repair"
				opErr = client.repairDeployed(ctx, subdir, pin, plan)
			}
			if opErr != nil {
				logging.Errorf(ctx, "Failed to %s %s - %s", action, pin, opErr)
				hasErrors = true
				actions.Errors = append(actions.Errors, ActionError{
					Action: action,
					Pin:    pin,
					Error:  JSONError{opErr},
				})
			}
		}
	})

	// Opportunistically cleanup the trash left from previous installs.
	client.doBatchAwareOp(ctx, batchAwareOpCleanupTrash)

	if hasErrors {
		return aMap, ErrEnsurePackagesFailed
	}
	logging.Infof(ctx, "All changes applied.")
	return aMap, nil
}
// CheckDeployment reports what would have to be done to fix the currently
// deployed packages, without changing anything.
//
// It is a silent dry run of ensurePackagesImpl over the already installed
// packages, with the given paranoia mode used to detect breakages.
func (client *clientImpl) CheckDeployment(ctx context.Context, paranoia ParanoidMode) (ActionMap, error) {
	const (
		dryRun = true
		silent = true
	)
	deployed, err := client.deployer.FindDeployed(ctx)
	if err != nil {
		return nil, err
	}
	return client.ensurePackagesImpl(ctx, deployed, paranoia, dryRun, silent)
}
// RepairDeployment repairs the currently deployed packages if necessary.
//
// It is a real (non-dry) run of EnsurePackages over the already installed
// packages, with the given paranoia mode used to detect breakages.
func (client *clientImpl) RepairDeployment(ctx context.Context, paranoia ParanoidMode) (ActionMap, error) {
	const dryRun = false
	deployed, err := client.deployer.FindDeployed(ctx)
	if err != nil {
		return nil, err
	}
	return client.EnsurePackages(ctx, deployed, paranoia, dryRun)
}
// makeRepairChecker returns a function that decides whether we should attempt
// to repair an already installed package, and how.
//
// The implementation depends on the selected paranoia mode: with NotParanoid
// the returned checker never requests a repair; otherwise it asks the deployer
// to check the package and turns the result into a repair plan (nil if the
// package needs no repairs).
func (client *clientImpl) makeRepairChecker(ctx context.Context, paranoia ParanoidMode) repairCB {
	if paranoia == NotParanoid {
		return func(string, common.Pin) *RepairPlan { return nil }
	}

	return func(subdir string, pin common.Pin) *RepairPlan {
		state, err := client.deployer.CheckDeployed(ctx, subdir, pin.PackageName, paranoia, WithoutManifest)
		if err != nil {
			// This error is probably non-recoverable, but we'll try anyway and
			// properly fail later.
			return &RepairPlan{
				NeedsReinstall:  true,
				ReinstallReason: fmt.Sprintf("failed to check the package state - %s", err),
			}
		}
		if !state.Deployed {
			// This should generally not happen. Can probably happen if two clients
			// are messing with same .cipd/* directory concurrently.
			return &RepairPlan{
				NeedsReinstall:  true,
				ReinstallReason: "the package is not deployed at all",
			}
		}
		if state.Pin.InstanceID != pin.InstanceID {
			// Same here.
			return &RepairPlan{
				NeedsReinstall:  true,
				ReinstallReason: fmt.Sprintf("expected to see instance %q, but saw %q", pin.InstanceID, state.Pin.InstanceID),
			}
		}
		if len(state.ToRedeploy) == 0 && len(state.ToRelink) == 0 {
			return nil // the package needs no repairs
		}
		// Have some corrupted files that need to be repaired.
		return &RepairPlan{
			ToRedeploy: state.ToRedeploy,
			ToRelink:   state.ToRelink,
		}
	}
}
// repairDeployed applies the repair 'plan' to the package 'pin' deployed in
// 'subdir'.
//
// The package is fetched (from the backend or the cache) only when the plan
// lists files to redeploy; restoring links alone needs no package data.
func (client *clientImpl) repairDeployed(ctx context.Context, subdir string, pin common.Pin, plan *RepairPlan) error {
	if len(plan.ToRedeploy) == 0 {
		// Only links to restore: no need for the package contents.
		return client.deployer.RepairDeployed(ctx, subdir, pin, deployer.RepairParams{
			ToRelink: plan.ToRelink,
		})
	}

	logging.Infof(ctx, "Getting %q to extract %d missing file(s) from it", pin.PackageName, len(plan.ToRedeploy))
	repair := func(inst pkg.Instance) error {
		params := deployer.RepairParams{
			Instance:   inst,
			ToRedeploy: plan.ToRedeploy,
			ToRelink:   plan.ToRelink,
		}
		return client.deployer.RepairDeployed(ctx, subdir, pin, params)
	}
	return client.fetchAndDo(ctx, pin, repair)
}
////////////////////////////////////////////////////////////////////////////////
// pRPC error handling.
var (
	// expectedCodes lists gRPC status codes that api.RepositoryClient calls may
	// return and that this client recognizes and handles itself. It is passed
	// as a call option to the RPCs so that the pRPC library does not log them.
	expectedCodes = prpc.ExpectedCode(
		codes.Aborted,
		codes.AlreadyExists,
		codes.FailedPrecondition,
		codes.NotFound,
		codes.PermissionDenied,
	)
)
// humanErr converts a gRPC error into a human readable one suitable for
// presenting in the CLI.
//
// If 'err' carries a gRPC status, the result is a new error holding only the
// status message, without the gRPC framing around it. A nil error and errors
// that carry no gRPC status are returned unchanged.
func humanErr(err error) error {
	if err == nil {
		return nil
	}
	st, ok := status.FromError(err)
	if !ok {
		return err
	}
	return errors.New(st.Message())
}