blob: 08bbfdc3d0b9113893e8d6641cd99d4f0ad66aa1 [file] [log] [blame]
// Copyright 2015 The LUCI Authors. All rights reserved.
// Use of this source code is governed under the Apache License, Version 2.0
// that can be found in the LICENSE file.
// Package cipd implements client side of Chrome Infra Package Deployer.
//
// Binary package file format (in free form representation):
// <binary package> := <zipped data>
// <zipped data> := DeterministicZip(<all input files> + <manifest json>)
// <manifest json> := File{
// name: ".cipdpkg/manifest.json",
// data: JSON({
// "FormatVersion": "1",
// "PackageName": <name of the package>
// }),
// }
// DeterministicZip = zip archive with deterministic ordering of files and stripped timestamps
//
// Main package data (<zipped data> above) is deterministic, meaning its content
// depends only on inputs used to built it (byte to byte): contents and names of
// all files added to the package (plus 'executable' file mode bit) and
// a package name (and all other data in the manifest).
//
// Binary package data MUST NOT depend on a timestamp, hostname of machine that
// built it, revision of the source code it was built from, etc. All that
// information will be distributed as a separate metadata packet associated with
// the package when it gets uploaded to the server.
//
// TODO: expand more when there's server-side package data model (labels
// and stuff).
package cipd
import (
"bytes"
"crypto/sha1"
"encoding/json"
"fmt"
"hash"
"io"
"io/ioutil"
"net/http"
"net/url"
"os"
"path/filepath"
"sort"
"sync"
"time"
"golang.org/x/net/context"
"github.com/luci/luci-go/common/clock"
"github.com/luci/luci-go/common/data/stringset"
"github.com/luci/luci-go/common/errors"
"github.com/luci/luci-go/common/logging"
"github.com/luci/luci-go/cipd/client/cipd/common"
"github.com/luci/luci-go/cipd/client/cipd/internal"
"github.com/luci/luci-go/cipd/client/cipd/local"
"github.com/luci/luci-go/cipd/version"
)
// PackageACLChangeAction defines a flavor of PackageACLChange.
type PackageACLChangeAction string
const (
// GrantRole is used in PackageACLChange to request a role to be granted.
GrantRole PackageACLChangeAction = "GRANT"
// RevokeRole is used in PackageACLChange to request a role to be revoked.
RevokeRole PackageACLChangeAction = "REVOKE"
// CASFinalizationTimeout is how long to wait for CAS service to finalize the upload.
CASFinalizationTimeout = 5 * time.Minute
// SetRefTimeout is how long to wait for an instance to be processed when setting a ref.
SetRefTimeout = 3 * time.Minute
// TagAttachTimeout is how long to wait for an instance to be processed when attaching tags.
TagAttachTimeout = 3 * time.Minute
)
// Environment variable definitions
const (
EnvCacheDir = "CIPD_CACHE_DIR"
EnvHTTPUserAgentPrefix = "CIPD_HTTP_USER_AGENT_PREFIX"
)
var (
// ErrFinalizationTimeout is returned if CAS service can not finalize upload fast enough.
ErrFinalizationTimeout = errors.WrapTransient(errors.New("timeout while waiting for CAS service to finalize the upload"))
// ErrBadUpload is returned when a package file is uploaded, but servers asks us to upload it again.
ErrBadUpload = errors.WrapTransient(errors.New("package file is uploaded, but servers asks us to upload it again"))
// ErrBadUploadSession is returned by UploadToCAS if provided UploadSession is not valid.
ErrBadUploadSession = errors.New("uploadURL must be set if UploadSessionID is used")
// ErrUploadSessionDied is returned by UploadToCAS if upload session suddenly disappeared.
ErrUploadSessionDied = errors.WrapTransient(errors.New("upload session is unexpectedly missing"))
// ErrNoUploadSessionID is returned by UploadToCAS if server didn't provide upload session ID.
ErrNoUploadSessionID = errors.New("server didn't provide upload session ID")
// ErrSetRefTimeout is returned when service refuses to move a ref for a long time.
ErrSetRefTimeout = errors.WrapTransient(errors.New("timeout while moving a ref"))
// ErrAttachTagsTimeout is returned when service refuses to accept tags for a long time.
ErrAttachTagsTimeout = errors.WrapTransient(errors.New("timeout while attaching tags"))
// ErrDownloadError is returned by FetchInstance on download errors.
ErrDownloadError = errors.WrapTransient(errors.New("failed to download the package file after multiple attempts"))
// ErrUploadError is returned by RegisterInstance and UploadToCAS on upload errors.
ErrUploadError = errors.WrapTransient(errors.New("failed to upload the package file after multiple attempts"))
// ErrAccessDenined is returned by calls talking to backend on 401 or 403 HTTP errors.
ErrAccessDenined = errors.New("access denied (not authenticated or not enough permissions)")
// ErrBackendInaccessible is returned by calls talking to backed if it doesn't response.
ErrBackendInaccessible = errors.WrapTransient(errors.New("request to the backend failed after multiple attempts"))
// ErrEnsurePackagesFailed is returned by EnsurePackages if something is not right.
ErrEnsurePackagesFailed = errors.New("failed to update packages, see the log")
// ErrPackageNotFound is returned by DeletePackage if the package doesn't exist.
ErrPackageNotFound = errors.New("no such package")
)
var (
// UserAgent is HTTP user agent string for CIPD client.
UserAgent = "cipd 1.7.0"
)
func init() {
ver, err := version.GetStartupVersion()
if err != nil || ver.InstanceID == "" {
return
}
UserAgent += fmt.Sprintf(" (%s@%s)", ver.PackageName, ver.InstanceID)
}
// UnixTime is time.Time that serializes to unix timestamp in JSON (represented
// as a number of seconds since January 1, 1970 UTC).
type UnixTime time.Time
// String is needed to be able to print UnixTime.
func (t UnixTime) String() string {
return time.Time(t).String()
}
// Before is used to compare UnixTime objects.
func (t UnixTime) Before(t2 UnixTime) bool {
return time.Time(t).Before(time.Time(t2))
}
// IsZero reports whether t represents the zero time instant.
func (t UnixTime) IsZero() bool {
return time.Time(t).IsZero()
}
// MarshalJSON is used by JSON encoder.
func (t UnixTime) MarshalJSON() ([]byte, error) {
if t.IsZero() {
return []byte("0"), nil
}
return []byte(fmt.Sprintf("%d", time.Time(t).Unix())), nil
}
// JSONError is wrapper around Error that serializes it as string.
type JSONError struct {
error
}
// MarshalJSON is used by JSON encoder.
func (e JSONError) MarshalJSON() ([]byte, error) {
return []byte(e.Error()), nil
}
// PackageACL is per package path per role access control list that is a part of
// larger overall ACL: ACL for package "a/b/c" is a union of PackageACLs for "a"
// "a/b" and "a/b/c".
type PackageACL struct {
// PackagePath is a package subpath this ACL is defined for.
PackagePath string `json:"package_path"`
// Role is a role that listed users have, e.g. 'READER', 'WRITER', ...
Role string `json:"role"`
// Principals list users and groups granted the role.
Principals []string `json:"principals"`
// ModifiedBy specifies who modified the list the last time.
ModifiedBy string `json:"modified_by"`
// ModifiedTs is a timestamp when the list was modified the last time.
ModifiedTs UnixTime `json:"modified_ts"`
}
// PackageACLChange is a mutation to some package ACL.
type PackageACLChange struct {
// Action defines what action to perform: GrantRole or RevokeRole.
Action PackageACLChangeAction
// Role to grant or revoke to a user or group.
Role string
// Principal is a user or a group to grant or revoke a role for.
Principal string
}
// UploadSession describes open CAS upload session.
type UploadSession struct {
// ID identifies upload session in the backend.
ID string
// URL is where to upload the data to.
URL string
}
// InstanceInfo is returned by FetchInstanceInfo.
type InstanceInfo struct {
// Pin identifies package instance.
Pin common.Pin `json:"pin"`
// RegisteredBy is identify of whoever uploaded this instance.
RegisteredBy string `json:"registered_by"`
// RegisteredTs is when the instance was registered.
RegisteredTs UnixTime `json:"registered_ts"`
}
// TagInfo is returned by FetchInstanceTags.
type TagInfo struct {
// Tag is actual tag name ("key:value" pair).
Tag string `json:"tag"`
// RegisteredBy is identify of whoever attached this tag.
RegisteredBy string `json:"registered_by"`
// RegisteredTs is when the tag was registered.
RegisteredTs UnixTime `json:"registered_ts"`
}
// RefInfo is returned by FetchInstanceRefs.
type RefInfo struct {
// Ref is the ref name.
Ref string `json:"ref"`
// ModifiedBy is identify of whoever modified this ref last time.
ModifiedBy string `json:"modified_by"`
// ModifiedTs is when the ref was modified last time.
ModifiedTs UnixTime `json:"modified_ts"`
}
// Counter is returned by ReadCounter.
type Counter struct {
// Name is the counter's name.
Name string `json:"name"`
// Value is the counter's value.
Value int64 `json:"value"`
// CreatedTS is the first time the counter was written.
CreatedTS UnixTime `json:"created_ts"`
// UpdatedTS is the most recent time the counter was written.
UpdatedTS UnixTime `json:"updated_ts"`
}
// ActionMap is a map of subdir to the Actions which will occur within it.
type ActionMap map[string]*Actions
// LoopOrdered loops over the ActionMap in sorted order (by subdir).
func (am ActionMap) LoopOrdered(cb func(subdir string, actions *Actions)) {
subdirs := make(sort.StringSlice, 0, len(am))
for subdir := range am {
subdirs = append(subdirs, subdir)
}
subdirs.Sort()
for _, subdir := range subdirs {
cb(subdir, am[subdir])
}
}
// Log prints the pending action to the logger installed in ctx.
func (am ActionMap) Log(ctx context.Context) {
keys := make([]string, 0, len(am))
for key := range am {
keys = append(keys, key)
}
sort.Strings(keys)
for _, subdir := range keys {
actions := am[subdir]
if subdir == "" {
logging.Infof(ctx, "In root:")
} else {
logging.Infof(ctx, "In subdir %q:", subdir)
}
if len(actions.ToInstall) != 0 {
logging.Infof(ctx, " to install:")
for _, pin := range actions.ToInstall {
logging.Infof(ctx, " %s", pin)
}
}
if len(actions.ToUpdate) != 0 {
logging.Infof(ctx, " to update:")
for _, pair := range actions.ToUpdate {
logging.Infof(ctx, " %s (%s -> %s)",
pair.From.PackageName, pair.From.InstanceID, pair.To.InstanceID)
}
}
if len(actions.ToRemove) != 0 {
logging.Infof(ctx, " to remove:")
for _, pin := range actions.ToRemove {
logging.Infof(ctx, " %s", pin)
}
}
}
}
// Actions is returned by EnsurePackages.
//
// It lists pins that were attempted to be installed, updated or removed, as
// well as all errors.
type Actions struct {
ToInstall common.PinSlice `json:"to_install,omitempty"` // pins to be installed
ToUpdate []UpdatedPin `json:"to_update,omitempty"` // pins to be replaced
ToRemove common.PinSlice `json:"to_remove,omitempty"` // pins to be removed
Errors []ActionError `json:"errors,omitempty"` // all individual errors
}
// Empty is true if there are no actions specified.
func (a *Actions) Empty() bool {
return len(a.ToInstall) == 0 && len(a.ToUpdate) == 0 && len(a.ToRemove) == 0
}
// UpdatedPin specifies a pair of pins: old and new version of a package.
type UpdatedPin struct {
From common.Pin `json:"from"`
To common.Pin `json:"to"`
}
// ActionError holds an error that happened when installing or removing the pin.
type ActionError struct {
Action string `json:"action"`
Pin common.Pin `json:"pin"`
Error JSONError `json:"error,omitempty"`
}
// ReadSeekCloser is the interface that groups Reader, Seeker and Closer.
type ReadSeekCloser interface {
io.Reader
io.Seeker
io.Closer
}
// Client provides high-level CIPD client interface. Thread safe.
type Client interface {
// BeginBatch makes the client enter into a "batch mode".
//
// In this mode various cleanup and cache updates, usually performed right
// away, are deferred until 'EndBatch' call.
//
// This is an optimization. Use it if you plan to call a bunch of Client
// methods in a short amount of time (parallel or sequentially).
//
// Batches can be nested.
BeginBatch(ctx context.Context)
// EndBatch ends a batch started with BeginBatch.
//
// EndBatch does various delayed maintenance tasks (like cache updates, trash
// cleanup and so on). This is best-effort operations, and thus this method
// doesn't return an errors.
//
// See also BeginBatch doc for more details.
EndBatch(ctx context.Context)
// FetchACL returns a list of PackageACL objects (parent paths first).
//
// Together they define the access control list for the given package subpath.
FetchACL(ctx context.Context, packagePath string) ([]PackageACL, error)
// ModifyACL applies a set of PackageACLChanges to a package path.
ModifyACL(ctx context.Context, packagePath string, changes []PackageACLChange) error
// UploadToCAS uploads package data blob to Content Addressed Store.
//
// Does nothing if it is already there. The data is addressed by SHA1 hash
// (also known as package's InstanceID). It can be used as a standalone
// function (if 'session' is nil) or as a part of more high level upload
// process (in that case upload session can be opened elsewhere and its
// properties passed here via 'session' argument).
//
// Returns nil on successful upload.
UploadToCAS(ctx context.Context, sha1 string, data io.ReadSeeker, session *UploadSession, timeout time.Duration) error
// ResolveVersion converts an instance ID, a tag or a ref into a concrete Pin.
ResolveVersion(ctx context.Context, packageName, version string) (common.Pin, error)
// MaybeUpdateClient will update `destination` to `targetVersion` if
// `currentHash` doesn't match version's executable hash.
//
// This update is done from the "infra/tools/cipd/${os}-${arch}" package.
MaybeUpdateClient(ctx context.Context, fs local.FileSystem, targetVersion, currentHash, destination string) error
// RegisterInstance makes the package instance available for clients.
//
// It uploads the instance to the storage and registers it in the package
// repository.
RegisterInstance(ctx context.Context, instance local.PackageInstance, timeout time.Duration) error
// DeletePackage removes the package (all its instances) from the backend.
//
// It will delete all package instances, all tags and refs. There's no undo.
DeletePackage(ctx context.Context, packageName string) error
// SetRefWhenReady moves a ref to point to a package instance.
SetRefWhenReady(ctx context.Context, ref string, pin common.Pin) error
// AttachTagsWhenReady attaches tags to an instance.
AttachTagsWhenReady(ctx context.Context, pin common.Pin, tags []string) error
// FetchInstanceInfo returns general information about the instance.
FetchInstanceInfo(ctx context.Context, pin common.Pin) (InstanceInfo, error)
// FetchInstanceTags returns information about tags attached to the instance.
//
// The returned list is sorted by tag key and creation timestamp (newest
// first). If 'tags' is empty, fetches all attached tags, otherwise only
// ones specified.
FetchInstanceTags(ctx context.Context, pin common.Pin, tags []string) ([]TagInfo, error)
// FetchInstanceRefs returns information about refs pointing to the instance.
//
// The returned list is sorted by modification timestamp (newest first). If
// 'refs' is empty, fetches all refs, otherwise only ones specified.
FetchInstanceRefs(ctx context.Context, pin common.Pin, refs []string) ([]RefInfo, error)
// FetchInstance downloads a package instance file from the repository.
//
// It verifies that the package hash matches pin.InstanceID.
//
// It returns an ReadSeekCloser pointing to the raw package data. The caller
// must close it when done.
FetchInstance(ctx context.Context, pin common.Pin) (ReadSeekCloser, error)
// FetchInstanceTo downloads a package instance file into the given writer.
//
// This is roughly the same as getting a reader with 'FetchInstance' and
// copying its data into the writer, except this call skips unnecessary temp
// files if the client is not using cache.
//
// It verifies that the package hash matches pin.InstanceID, but does it while
// writing to 'output', so expect to discard all data there if FetchInstanceTo
// returns an error.
FetchInstanceTo(ctx context.Context, pin common.Pin, output io.WriteSeeker) error
// FetchAndDeployInstance fetches the package instance and deploys it.
//
// Deploys to the given subdir under the site root (see ClientOptions.Root).
// It doesn't check whether the instance is already deployed.
FetchAndDeployInstance(ctx context.Context, subdir string, pin common.Pin) error
// ListPackages returns a list of strings of package names.
ListPackages(ctx context.Context, path string, recursive, showHidden bool) ([]string, error)
// SearchInstances finds all instances with given tag and optionally name.
//
// Returns their concrete Pins.
SearchInstances(ctx context.Context, tag, packageName string) (common.PinSlice, error)
// EnsurePackages installs, removes and updates packages in the site root.
//
// Given a description of what packages (and versions) should be installed it
// will do all necessary actions to bring the state of the site root to the
// desired one.
//
// If dryRun is true, will just check for changes and return them in Actions
// struct, but won't actually perform them.
//
// If the update was only partially applied, returns both Actions and error.
EnsurePackages(ctx context.Context, pkgs common.PinSliceBySubdir, dryRun bool) (ActionMap, error)
// IncrementCounter adds delta to the counter's value and updates its last
// updated timestamp.
//
// delta must be 0 or 1.
IncrementCounter(ctx context.Context, pin common.Pin, counterName string, delta int) error
// ReadCounter returns the current value of the counter.
ReadCounter(ctx context.Context, pin common.Pin, counterName string) (Counter, error)
}
// ClientOptions is passed to NewClient factory function.
type ClientOptions struct {
// ServiceURL is root URL of the backend service.
//
// Default is ServiceURL const.
ServiceURL string
// Root is a site root directory.
//
// It is a directory where packages will be installed to. It also hosts
// .cipd/* directory that tracks internal state of installed packages and
// keeps various cache files. 'Root' can be an empty string if the client is
// not going to be used to deploy or remove local packages.
Root string
// CacheDir is a directory for shared cache.
//
// If empty, instances are not cached and tags are cached inside the site
// root. If both Root and CacheDir are empty, tag cache is disabled.
CacheDir string
// AnonymousClient is http.Client that doesn't attach authentication headers.
//
// Will be used when talking to the Google Storage. We use signed URLs that do
// not require additional authentication.
//
// Default is http.DefaultClient.
AnonymousClient *http.Client
// AuthenticatedClient is http.Client that attaches authentication headers.
//
// Will be used when talking to the backend.
//
// Default is same as AnonymousClient (it will probably not work for most
// packages, since the backend won't authorize an anonymous access).
AuthenticatedClient *http.Client
// UserAgent is put into User-Agent HTTP header with each request.
//
// Default is UserAgent const.
UserAgent string
}
// LoadFromEnv loads supplied default values from an environment into opts.
//
// The supplied getEnv function is used to access named enviornment variables,
// and should return an empty string if the enviornment variable is not defined.
func (opts *ClientOptions) LoadFromEnv(getEnv func(string) string) error {
if opts.CacheDir == "" {
if v := getEnv(EnvCacheDir); v != "" {
if !filepath.IsAbs(v) {
return fmt.Errorf("bad %s: not an absolute path - %s", EnvCacheDir, v)
}
opts.CacheDir = v
}
}
if opts.UserAgent == "" {
if v := getEnv(EnvHTTPUserAgentPrefix); v != "" {
opts.UserAgent = fmt.Sprintf("%s/%s", v, UserAgent)
}
}
return nil
}
// NewClient initializes CIPD client object.
func NewClient(opts ClientOptions) (Client, error) {
if opts.AnonymousClient == nil {
opts.AnonymousClient = http.DefaultClient
}
if opts.AuthenticatedClient == nil {
opts.AuthenticatedClient = opts.AnonymousClient
}
if opts.UserAgent == "" {
opts.UserAgent = UserAgent
}
// Validate and normalize service URL.
if opts.ServiceURL == "" {
return nil, fmt.Errorf("ServiceURL is required")
}
parsed, err := url.Parse(opts.ServiceURL)
if err != nil {
return nil, fmt.Errorf("not a valid URL %q - %s", opts.ServiceURL, err)
}
if parsed.Path != "" && parsed.Path != "/" {
return nil, fmt.Errorf("expecting a root URL, not %q", opts.ServiceURL)
}
opts.ServiceURL = fmt.Sprintf("%s://%s", parsed.Scheme, parsed.Host)
return &clientImpl{
ClientOptions: opts,
remote: &remoteImpl{
serviceURL: opts.ServiceURL,
userAgent: opts.UserAgent,
client: opts.AuthenticatedClient,
},
storage: &storageImpl{
chunkSize: uploadChunkSize,
userAgent: opts.UserAgent,
client: opts.AnonymousClient,
},
deployer: local.NewDeployer(opts.Root),
}, nil
}
type clientImpl struct {
ClientOptions
// batchLock protects guts of by BeginBatch/EndBatch implementation.
batchLock sync.Mutex
batchNesting int
batchPending map[batchAwareOp]struct{}
// remote knows how to call backend REST API.
remote remote
// storage knows how to upload and download raw binaries using signed URLs.
storage storage
// deployer knows how to install packages to local file system. Thread safe.
deployer local.Deployer
// tagCache is a file-system based cache of resolved tags.
tagCache *internal.TagCache
tagCacheInit sync.Once
// instanceCache is a file-system based cache of instances.
instanceCache *internal.InstanceCache
instanceCacheInit sync.Once
}
type batchAwareOp int
const (
batchAwareOpSaveTagCache batchAwareOp = iota
batchAwareOpCleanupTrash
)
// See https://golang.org/ref/spec#Method_expressions
var batchAwareOps = map[batchAwareOp]func(*clientImpl, context.Context){
batchAwareOpSaveTagCache: (*clientImpl).saveTagCache,
batchAwareOpCleanupTrash: (*clientImpl).cleanupTrash,
}
func (client *clientImpl) saveTagCache(ctx context.Context) {
if client.tagCache != nil {
if err := client.tagCache.Save(ctx); err != nil {
logging.Warningf(ctx, "cipd: failed to save tag cache - %s", err)
}
}
}
func (client *clientImpl) cleanupTrash(ctx context.Context) {
if err := client.deployer.CleanupTrash(ctx); err != nil {
logging.Warningf(ctx, "cipd: failed to cleanup trash (this is fine) - %s", err)
}
}
// getTagCache lazy-initializes tagCache and returns it.
//
// May return nil if tag cache is disabled.
func (client *clientImpl) getTagCache() *internal.TagCache {
client.tagCacheInit.Do(func() {
var dir string
switch {
case client.CacheDir != "":
dir = client.CacheDir
case client.Root != "":
dir = filepath.Join(client.Root, local.SiteServiceDir)
default:
return
}
parsed, err := url.Parse(client.ServiceURL)
if err != nil {
panic(err) // the URL has been validated in NewClient already
}
client.tagCache = internal.NewTagCache(local.NewFileSystem(dir, ""), parsed.Host)
})
return client.tagCache
}
// getInstanceCache lazy-initializes instanceCache and returns it.
//
// May return nil if instance cache is disabled.
func (client *clientImpl) getInstanceCache(ctx context.Context) *internal.InstanceCache {
client.instanceCacheInit.Do(func() {
if client.CacheDir == "" {
return
}
path := filepath.Join(client.CacheDir, "instances")
client.instanceCache = internal.NewInstanceCache(local.NewFileSystem(path, ""))
logging.Infof(ctx, "cipd: using instance cache at %q", path)
})
return client.instanceCache
}
func (client *clientImpl) BeginBatch(ctx context.Context) {
client.batchLock.Lock()
defer client.batchLock.Unlock()
client.batchNesting++
}
func (client *clientImpl) EndBatch(ctx context.Context) {
client.batchLock.Lock()
defer client.batchLock.Unlock()
if client.batchNesting <= 0 {
panic("EndBatch called without corresponding BeginBatch")
}
client.batchNesting--
if client.batchNesting == 0 {
// Execute all pending batch aware calls now.
for op := range client.batchPending {
batchAwareOps[op](client, ctx)
}
client.batchPending = nil
}
}
func (client *clientImpl) doBatchAwareOp(ctx context.Context, op batchAwareOp) {
client.batchLock.Lock()
defer client.batchLock.Unlock()
if client.batchNesting == 0 {
// Not inside a batch, execute right now.
batchAwareOps[op](client, ctx)
} else {
// Schedule to execute when 'EndBatch' is called.
if client.batchPending == nil {
client.batchPending = make(map[batchAwareOp]struct{}, 1)
}
client.batchPending[op] = struct{}{}
}
}
func (client *clientImpl) FetchACL(ctx context.Context, packagePath string) ([]PackageACL, error) {
return client.remote.fetchACL(ctx, packagePath)
}
func (client *clientImpl) ModifyACL(ctx context.Context, packagePath string, changes []PackageACLChange) error {
return client.remote.modifyACL(ctx, packagePath, changes)
}
func (client *clientImpl) ListPackages(ctx context.Context, path string, recursive, showHidden bool) ([]string, error) {
pkgs, dirs, err := client.remote.listPackages(ctx, path, recursive, showHidden)
if err != nil {
return nil, err
}
// Add trailing slash to directories.
for k, d := range dirs {
dirs[k] = d + "/"
}
// Merge and sort packages and directories.
allPkgs := append(pkgs, dirs...)
sort.Strings(allPkgs)
return allPkgs, nil
}
func (client *clientImpl) UploadToCAS(ctx context.Context, sha1 string, data io.ReadSeeker, session *UploadSession, timeout time.Duration) error {
// Open new upload session if an existing is not provided.
var err error
if session == nil {
logging.Infof(ctx, "cipd: uploading %s: initiating", sha1)
session, err = client.remote.initiateUpload(ctx, sha1)
if err != nil {
logging.Warningf(ctx, "cipd: can't upload %s - %s", sha1, err)
return err
}
if session == nil {
logging.Infof(ctx, "cipd: %s is already uploaded", sha1)
return nil
}
} else {
if session.ID == "" || session.URL == "" {
return ErrBadUploadSession
}
}
// Upload the file to CAS storage.
err = client.storage.upload(ctx, session.URL, data)
if err != nil {
return err
}
// Finalize the upload, wait until server verifies and publishes the file.
if timeout == 0 {
timeout = CASFinalizationTimeout
}
started := clock.Now(ctx)
delay := time.Second
for {
if err := ctx.Err(); err != nil {
return err
}
published, err := client.remote.finalizeUpload(ctx, session.ID)
if err != nil {
logging.Warningf(ctx, "cipd: upload of %s failed: %s", sha1, err)
return err
}
if published {
logging.Infof(ctx, "cipd: successfully uploaded %s", sha1)
return nil
}
if clock.Now(ctx).Sub(started) > timeout {
logging.Warningf(ctx, "cipd: upload of %s failed: timeout", sha1)
return ErrFinalizationTimeout
}
logging.Infof(ctx, "cipd: uploading - verifying")
clock.Sleep(ctx, delay)
if delay < 4*time.Second {
delay += 500 * time.Millisecond
}
}
}
func (client *clientImpl) ResolveVersion(ctx context.Context, packageName, version string) (common.Pin, error) {
if err := common.ValidatePackageName(packageName); err != nil {
return common.Pin{}, err
}
// Is it instance ID already? Don't bother calling the backend.
if common.ValidateInstanceID(version) == nil {
return common.Pin{PackageName: packageName, InstanceID: version}, nil
}
if err := common.ValidateInstanceVersion(version); err != nil {
return common.Pin{}, err
}
// Use local cache when resolving tags to avoid round trips to backend when
// calling same 'cipd ensure' command again and again.
var cache *internal.TagCache
if common.ValidateInstanceTag(version) == nil {
cache = client.getTagCache()
}
if cache != nil {
cached, err := cache.ResolveTag(ctx, packageName, version)
if err != nil {
logging.Warningf(ctx, "cipd: could not query tag cache - %s", err)
}
if cached.InstanceID != "" {
logging.Debugf(ctx, "cipd: tag cache hit for %s:%s - %s", packageName, version, cached.InstanceID)
return cached, nil
}
}
pin, err := client.remote.resolveVersion(ctx, packageName, version)
if err != nil {
return pin, err
}
if cache != nil {
if err := cache.AddTag(ctx, pin, version); err != nil {
logging.Warningf(ctx, "cipd: could not add tag to the cache")
}
client.doBatchAwareOp(ctx, batchAwareOpSaveTagCache)
}
return pin, nil
}
const clientPackageBase = "infra/tools/cipd"
var clientPackage = ""
var clientFileName = ""
func init() {
clientFileName = "cipd"
if common.CurrentOS() == "windows" {
clientFileName = "cipd.exe"
}
clientPackage = fmt.Sprintf("%s/%s-%s", clientPackageBase,
common.CurrentOS(), common.CurrentArchitecture())
}
func (client *clientImpl) ensureClientVersionInfo(ctx context.Context, fs local.FileSystem, pin common.Pin, exePath string) {
verFile := version.GetVersionFile(exePath)
expect, err := json.Marshal(version.Info{
PackageName: pin.PackageName,
InstanceID: pin.InstanceID,
})
if err != nil {
// should never occur; only error could be if version.Info is not JSON
// serializable.
logging.WithError(err).Errorf(ctx, "Unable to generate version file content")
return
}
if f, err := os.Open(verFile); err == nil {
data, err := ioutil.ReadAll(f)
f.Close()
if err == nil && bytes.Equal(expect, data) {
// up to date
return
}
}
// there was an error reading the existing version file, or its content does
// not match. Proceed with EnsureFile.
err = fs.EnsureFile(ctx, verFile, func(of *os.File) error {
_, err := of.Write(expect)
return err
})
if err != nil {
logging.WithError(err).Warningf(ctx, "Unable to update version info %q", verFile)
}
}
func (client *clientImpl) MaybeUpdateClient(ctx context.Context, fs local.FileSystem, targetVersion, currentHash, destination string) error {
pin, err := client.maybeUpdateClient(ctx, fs, targetVersion, currentHash, destination)
if err == nil {
client.ensureClientVersionInfo(ctx, fs, pin, destination)
}
return err
}
func (client *clientImpl) maybeUpdateClient(ctx context.Context, fs local.FileSystem, targetVersion, currentHash, destination string) (pin common.Pin, err error) {
if err = common.ValidateFileHash(currentHash); err != nil {
return
}
client.BeginBatch(ctx)
defer client.EndBatch(ctx)
if pin, err = client.ResolveVersion(ctx, clientPackage, targetVersion); err != nil {
return
}
cache := client.getTagCache()
exeHash := ""
if cache != nil {
if exeHash, err = cache.ResolveFile(ctx, pin, clientFileName); err != nil {
return
}
}
if exeHash == currentHash {
// already up-to-date. Make sure version file is up to date.
return
}
if targetVersion == pin.InstanceID {
logging.Infof(ctx, "cipd: updating client to %s", pin)
} else {
logging.Infof(ctx, "cipd: updating client to %s (%s)", pin, targetVersion)
}
info, err := client.remote.fetchClientBinaryInfo(ctx, pin)
if err != nil {
return
}
if cache != nil {
if err = cache.AddFile(ctx, pin, clientFileName, info.clientBinary.SHA1); err != nil {
return
}
}
client.doBatchAwareOp(ctx, batchAwareOpSaveTagCache)
if info.clientBinary.SHA1 == currentHash {
// already up-to-date, but the cache didn't know that. Make sure version
// file is update.
return
}
err = client.installClient(ctx, fs, sha1.New(), info.clientBinary.FetchURL, destination, info.clientBinary.SHA1)
if err != nil {
return
}
return
}
func (client *clientImpl) RegisterInstance(ctx context.Context, instance local.PackageInstance, timeout time.Duration) error {
// Attempt to register.
logging.Infof(ctx, "cipd: registering %s", instance.Pin())
result, err := client.remote.registerInstance(ctx, instance.Pin())
if err != nil {
return err
}
// Asked to upload the package file to CAS first?
if result.uploadSession != nil {
err = client.UploadToCAS(
ctx, instance.Pin().InstanceID, instance.DataReader(),
result.uploadSession, timeout)
if err != nil {
return err
}
// Try again, now that file is uploaded.
logging.Infof(ctx, "cipd: registering %s", instance.Pin())
result, err = client.remote.registerInstance(ctx, instance.Pin())
if err != nil {
return err
}
if result.uploadSession != nil {
return ErrBadUpload
}
}
if result.alreadyRegistered {
logging.Infof(
ctx, "cipd: instance %s is already registered by %s on %s",
instance.Pin(), result.registeredBy, result.registeredTs)
} else {
logging.Infof(ctx, "cipd: instance %s was successfully registered", instance.Pin())
}
return nil
}
func (client *clientImpl) DeletePackage(ctx context.Context, packageName string) error {
if err := common.ValidatePackageName(packageName); err != nil {
return err
}
return client.remote.deletePackage(ctx, packageName)
}
func (client *clientImpl) IncrementCounter(ctx context.Context, pin common.Pin, counter string, delta int) error {
return client.remote.incrementCounter(ctx, pin, counter, delta)
}
func (client *clientImpl) ReadCounter(ctx context.Context, pin common.Pin, counter string) (Counter, error) {
return client.remote.readCounter(ctx, pin, counter)
}
func (client *clientImpl) SetRefWhenReady(ctx context.Context, ref string, pin common.Pin) error {
if err := common.ValidatePackageRef(ref); err != nil {
return err
}
if err := common.ValidatePin(pin); err != nil {
return err
}
logging.Infof(ctx, "cipd: setting ref of %q: %q => %q", pin.PackageName, ref, pin.InstanceID)
deadline := clock.Now(ctx).Add(SetRefTimeout)
for clock.Now(ctx).Before(deadline) {
if err := ctx.Err(); err != nil {
return err
}
err := client.remote.setRef(ctx, ref, pin)
if err == nil {
return nil
}
if _, ok := err.(*pendingProcessingError); ok {
logging.Warningf(ctx, "cipd: package instance is not ready yet - %s", err)
clock.Sleep(ctx, 5*time.Second)
} else {
logging.Errorf(ctx, "cipd: failed to set ref - %s", err)
return err
}
}
logging.Errorf(ctx, "cipd: failed set ref - deadline exceeded")
return ErrSetRefTimeout
}
func (client *clientImpl) AttachTagsWhenReady(ctx context.Context, pin common.Pin, tags []string) error {
err := common.ValidatePin(pin)
if err != nil {
return err
}
if len(tags) == 0 {
return nil
}
for _, tag := range tags {
logging.Infof(ctx, "cipd: attaching tag %s", tag)
}
deadline := clock.Now(ctx).Add(TagAttachTimeout)
for clock.Now(ctx).Before(deadline) {
if err := ctx.Err(); err != nil {
return err
}
err = client.remote.attachTags(ctx, pin, tags)
if err == nil {
logging.Infof(ctx, "cipd: all tags attached")
return nil
}
if _, ok := err.(*pendingProcessingError); ok {
logging.Warningf(ctx, "cipd: package instance is not ready yet - %s", err)
clock.Sleep(ctx, 5*time.Second)
} else {
logging.Errorf(ctx, "cipd: failed to attach tags - %s", err)
return err
}
}
logging.Errorf(ctx, "cipd: failed to attach tags - deadline exceeded")
return ErrAttachTagsTimeout
}
func (client *clientImpl) SearchInstances(ctx context.Context, tag, packageName string) (common.PinSlice, error) {
if packageName != "" {
// Don't bother searching if packageName is invalid.
if err := common.ValidatePackageName(packageName); err != nil {
return common.PinSlice{}, err
}
}
return client.remote.searchInstances(ctx, tag, packageName)
}
func (client *clientImpl) FetchInstanceInfo(ctx context.Context, pin common.Pin) (InstanceInfo, error) {
err := common.ValidatePin(pin)
if err != nil {
return InstanceInfo{}, err
}
info, err := client.remote.fetchInstance(ctx, pin)
if err != nil {
return InstanceInfo{}, err
}
return InstanceInfo{
Pin: pin,
RegisteredBy: info.registeredBy,
RegisteredTs: UnixTime(info.registeredTs),
}, nil
}
type sortByTagKey []TagInfo
func (s sortByTagKey) Len() int { return len(s) }
func (s sortByTagKey) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s sortByTagKey) Less(i, j int) bool {
k1 := common.GetInstanceTagKey(s[i].Tag)
k2 := common.GetInstanceTagKey(s[j].Tag)
if k1 == k2 {
// Newest first.
return s[j].RegisteredTs.Before(s[i].RegisteredTs)
}
return k1 < k2
}
func (client *clientImpl) FetchInstanceTags(ctx context.Context, pin common.Pin, tags []string) ([]TagInfo, error) {
err := common.ValidatePin(pin)
if err != nil {
return nil, err
}
fetched, err := client.remote.fetchTags(ctx, pin, tags)
if err != nil {
return nil, err
}
sort.Sort(sortByTagKey(fetched))
return fetched, nil
}
func (client *clientImpl) FetchInstanceRefs(ctx context.Context, pin common.Pin, refs []string) ([]RefInfo, error) {
err := common.ValidatePin(pin)
if err != nil {
return nil, err
}
return client.remote.fetchRefs(ctx, pin, refs)
}
func (client *clientImpl) FetchInstance(ctx context.Context, pin common.Pin) (ReadSeekCloser, error) {
if err := common.ValidatePin(pin); err != nil {
return nil, err
}
if cache := client.getInstanceCache(ctx); cache != nil {
return client.fetchInstanceWithCache(ctx, pin, cache)
}
return client.fetchInstanceNoCache(ctx, pin)
}
func (client *clientImpl) FetchInstanceTo(ctx context.Context, pin common.Pin, output io.WriteSeeker) error {
if err := common.ValidatePin(pin); err != nil {
return err
}
// Deal with no-cache situation first, it is simple - just fetch the instance
// into the 'output'.
cache := client.getInstanceCache(ctx)
if cache == nil {
return client.remoteFetchInstance(ctx, pin, output)
}
// If using the cache, always fetch into the cache first, and then copy data
// from the cache into the output.
input, err := client.fetchInstanceWithCache(ctx, pin, cache)
if err != nil {
return err
}
defer func() {
if err := input.Close(); err != nil {
logging.Warningf(ctx, "cipd: failed to close the package file - %s", err)
}
}()
logging.Infof(ctx, "cipd: copying the instance into the final destination...")
_, err = io.Copy(output, input)
return err
}
func (client *clientImpl) fetchInstanceNoCache(ctx context.Context, pin common.Pin) (ReadSeekCloser, error) {
// Use temp file for storing package data. Delete it when the caller is done
// with it.
f, err := client.deployer.TempFile(ctx, pin.InstanceID)
if err != nil {
return nil, err
}
tmp := deleteOnClose{f}
// Make sure to remove the garbage on errors or panics.
ok := false
defer func() {
if !ok {
if err := tmp.Close(); err != nil {
logging.Warningf(ctx, "cipd: failed to close the temp file - %s", err)
}
}
}()
if err := client.remoteFetchInstance(ctx, pin, tmp); err != nil {
return nil, err
}
// Return exact same file as ReadSeekCloser.
if _, err := tmp.Seek(0, os.SEEK_SET); err != nil {
return nil, err
}
ok = true
return tmp, nil
}
func (client *clientImpl) fetchInstanceWithCache(ctx context.Context, pin common.Pin, cache *internal.InstanceCache) (ReadSeekCloser, error) {
attempt := 0
for {
attempt++
// Try to get the instance from cache.
now := clock.Now(ctx)
switch file, err := cache.Get(ctx, pin, now); {
case os.IsNotExist(err):
// No such package in the cache. This is fine.
case err != nil:
// Some unexpected error. Log and carry on, as if it is a cache miss.
logging.Warningf(ctx, "cipd: could not get %s from cache - %s", pin, err)
default:
logging.Infof(ctx, "cipd: instance cache hit for %s", pin)
return file, nil
}
// Download the package into the cache. 'remoteFetchInstance' verifies the
// hash. When reading from the cache, we can skip the hash check (and we
// indeed do, see 'cipd: instance cache hit' case above).
err := cache.Put(ctx, pin, now, func(f *os.File) error {
return client.remoteFetchInstance(ctx, pin, f)
})
if err != nil {
return nil, err
}
// Try to open it now. There's (very) small chance that it has been evicted
// from the cache already. If this happens, try again. Do it only once.
//
// Note that theoretically we could keep open the handle to the file used in
// 'cache.Put' above, but this file gets renamed at some point, and renaming
// files with open handles on Windows is moot. So instead we close it,
// rename the file (this happens inside cache.Put), and reopen it again
// under the new name.
file, err := cache.Get(ctx, pin, clock.Now(ctx))
if err != nil {
logging.Errorf(ctx, "cipd: %s is unexpectedly missing from cache (%s)", pin, err)
if attempt == 1 {
logging.Infof(ctx, "cipd: retrying...")
continue
}
logging.Errorf(ctx, "cipd: giving up")
return nil, err
}
return file, nil
}
}
// remoteFetchInstance fetches the package file into 'output' and verifies its
// hash along the way.
func (client *clientImpl) remoteFetchInstance(ctx context.Context, pin common.Pin, output io.WriteSeeker) (err error) {
defer func() {
if err != nil {
logging.Errorf(ctx, "cipd: failed to fetch %s - %s", pin, err)
} else {
logging.Infof(ctx, "cipd: successfully fetched %s", pin)
}
}()
logging.Infof(ctx, "cipd: resolving fetch URL for %s", pin)
fetchInfo, err := client.remote.fetchInstance(ctx, pin)
if err != nil {
return
}
hash, err := common.HashForInstanceID(pin.InstanceID)
if err != nil {
return
}
if err = client.storage.download(ctx, fetchInfo.fetchURL, output, hash); err != nil {
return
}
if common.InstanceIDFromHash(hash) != pin.InstanceID {
err = fmt.Errorf("package hash mismatch")
}
return
}
func (client *clientImpl) FetchAndDeployInstance(ctx context.Context, subdir string, pin common.Pin) error {
if err := common.ValidateSubdir(subdir); err != nil {
return err
}
if err := common.ValidatePin(pin); err != nil {
return err
}
// Fetch the package (verifying its hash) and obtain a pointer to its data.
instanceFile, err := client.FetchInstance(ctx, pin)
if err != nil {
return err
}
defer func() {
if err := instanceFile.Close(); err != nil && err != os.ErrClosed {
logging.Warningf(ctx, "cipd: failed to close the package file - %s", err)
}
}()
// Open the instance. This reads its manifest. 'FetchInstance' has verified
// the hash already, so skip verification.
instance, err := local.OpenInstance(ctx, instanceFile, pin.InstanceID, local.SkipHashVerification)
if err != nil {
return err
}
// Opportunistically clean up trashed files.
defer client.doBatchAwareOp(ctx, batchAwareOpCleanupTrash)
// Deploy it. 'defer' will take care of removing the temp file if needed.
_, err = client.deployer.DeployInstance(ctx, subdir, instance)
return err
}
func (client *clientImpl) EnsurePackages(ctx context.Context, allPins common.PinSliceBySubdir, dryRun bool) (aMap ActionMap, err error) {
if err = allPins.Validate(); err != nil {
return
}
client.BeginBatch(ctx)
defer client.EndBatch(ctx)
// Enumerate existing packages.
existing, err := client.deployer.FindDeployed(ctx)
if err != nil {
return
}
// Figure out what needs to be updated and deleted, log it.
aMap = buildActionPlan(allPins, existing)
if len(aMap) == 0 {
logging.Debugf(ctx, "Everything is up-to-date.")
return
}
// TODO(iannucci): ensure that no packages cross root boundaries
aMap.Log(ctx)
if dryRun {
logging.Infof(ctx, "Dry run, not actually doing anything.")
return
}
hasErrors := false
// Remove all unneeded stuff.
aMap.LoopOrdered(func(subdir string, actions *Actions) {
for _, pin := range actions.ToRemove {
err = client.deployer.RemoveDeployed(ctx, subdir, pin.PackageName)
if err != nil {
logging.Errorf(ctx, "Failed to remove %s - %s (subdir %q)", pin.PackageName, err, subdir)
hasErrors = true
actions.Errors = append(actions.Errors, ActionError{
Action: "remove",
Pin: pin,
Error: JSONError{err},
})
}
}
})
// Install all new and updated stuff. Install in the order specified by
// 'pins'. Order matters if multiple packages install same file.
aMap.LoopOrdered(func(subdir string, actions *Actions) {
toDeploy := make(map[string]bool, len(actions.ToInstall)+len(actions.ToUpdate))
for _, p := range actions.ToInstall {
toDeploy[p.PackageName] = true
}
for _, pair := range actions.ToUpdate {
toDeploy[pair.To.PackageName] = true
}
for _, pin := range allPins[subdir] {
if !toDeploy[pin.PackageName] {
continue
}
err = client.FetchAndDeployInstance(ctx, subdir, pin)
if err != nil {
logging.Errorf(ctx, "Failed to install %s - %s", pin, err)
hasErrors = true
actions.Errors = append(actions.Errors, ActionError{
Action: "install",
Pin: pin,
Error: JSONError{err},
})
}
}
})
// Opportunistically cleanup the trash left from previous installs.
client.doBatchAwareOp(ctx, batchAwareOpCleanupTrash)
if !hasErrors {
logging.Infof(ctx, "All changes applied.")
} else {
err = ErrEnsurePackagesFailed
}
return
}
////////////////////////////////////////////////////////////////////////////////
// Private structs and interfaces.
type remote interface {
fetchACL(ctx context.Context, packagePath string) ([]PackageACL, error)
modifyACL(ctx context.Context, packagePath string, changes []PackageACLChange) error
resolveVersion(ctx context.Context, packageName, version string) (common.Pin, error)
initiateUpload(ctx context.Context, sha1 string) (*UploadSession, error)
finalizeUpload(ctx context.Context, sessionID string) (bool, error)
registerInstance(ctx context.Context, pin common.Pin) (*registerInstanceResponse, error)
deletePackage(ctx context.Context, packageName string) error
setRef(ctx context.Context, ref string, pin common.Pin) error
attachTags(ctx context.Context, pin common.Pin, tags []string) error
fetchTags(ctx context.Context, pin common.Pin, tags []string) ([]TagInfo, error)
fetchRefs(ctx context.Context, pin common.Pin, refs []string) ([]RefInfo, error)
fetchInstance(ctx context.Context, pin common.Pin) (*fetchInstanceResponse, error)
fetchClientBinaryInfo(ctx context.Context, pin common.Pin) (*fetchClientBinaryInfoResponse, error)
listPackages(ctx context.Context, path string, recursive, showHidden bool) ([]string, []string, error)
searchInstances(ctx context.Context, tag, packageName string) (common.PinSlice, error)
incrementCounter(ctx context.Context, pin common.Pin, counter string, delta int) error
readCounter(ctx context.Context, pin common.Pin, counter string) (Counter, error)
}
type storage interface {
upload(ctx context.Context, url string, data io.ReadSeeker) error
download(ctx context.Context, url string, output io.WriteSeeker, h hash.Hash) error
}
type registerInstanceResponse struct {
uploadSession *UploadSession
alreadyRegistered bool
registeredBy string
registeredTs time.Time
}
type fetchInstanceResponse struct {
fetchURL string
registeredBy string
registeredTs time.Time
}
type clientBinary struct {
FileName string `json:"file_name"`
SHA1 string `json:"sha1"`
FetchURL string `json:"fetch_url"`
Size int64 `json:"size,string"`
}
type fetchClientBinaryInfoResponse struct {
instance *InstanceInfo
clientBinary *clientBinary
}
// deleteOnClose deletes the file once it is closed.
type deleteOnClose struct {
*os.File
}
func (d deleteOnClose) Close() (err error) {
name := d.File.Name()
defer func() {
if rmErr := os.Remove(name); err == nil && rmErr != nil && !os.IsNotExist(rmErr) {
err = rmErr
}
}()
return d.File.Close()
}
// Private stuff.
// buildActionPlan is used by EnsurePackages to figure out what to install or remove.
func buildActionPlan(desired, existing common.PinSliceBySubdir) (aMap ActionMap) {
desiredSubdirs := stringset.New(len(desired))
for desiredSubdir := range desired {
desiredSubdirs.Add(desiredSubdir)
}
existingSubdirs := stringset.New(len(existing))
for existingSubdir := range existing {
existingSubdirs.Add(existingSubdir)
}
aMap = ActionMap{}
// all newly added subdirs
desiredSubdirs.Difference(existingSubdirs).Iter(func(subdir string) bool {
if want := desired[subdir]; len(want) > 0 {
aMap[subdir] = &Actions{ToInstall: want}
}
return true
})
// all removed subdirs
existingSubdirs.Difference(desiredSubdirs).Iter(func(subdir string) bool {
if have := existing[subdir]; len(have) > 0 {
aMap[subdir] = &Actions{ToRemove: have}
}
return true
})
// all common subdirs
desiredSubdirs.Intersect(existingSubdirs).Iter(func(subdir string) bool {
a := Actions{}
// Figure out what needs to be installed or updated.
haveMap := existing[subdir].ToMap()
for _, want := range desired[subdir] {
if haveID, exists := haveMap[want.PackageName]; !exists {
a.ToInstall = append(a.ToInstall, want)
} else if haveID != want.InstanceID {
a.ToUpdate = append(a.ToUpdate, UpdatedPin{
From: common.Pin{PackageName: want.PackageName, InstanceID: haveID},
To: want,
})
}
}
// Figure out what needs to be removed.
wantMap := desired[subdir].ToMap()
for _, have := range existing[subdir] {
if wantMap[have.PackageName] == "" {
a.ToRemove = append(a.ToRemove, have)
}
}
if !a.Empty() {
aMap[subdir] = &a
}
return true
})
if len(aMap) == 0 {
return nil
}
return
}