| // Copyright 2017 The LUCI Authors. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package repo |
| |
| import ( |
| "context" |
| "encoding/json" |
| "fmt" |
| "net/http" |
| "sort" |
| "strings" |
| "sync" |
| |
| "github.com/golang/protobuf/proto" |
| "github.com/golang/protobuf/ptypes/empty" |
| |
| "google.golang.org/grpc" |
| "google.golang.org/grpc/codes" |
| "google.golang.org/grpc/status" |
| |
| "go.chromium.org/luci/gae/service/datastore" |
| |
| "go.chromium.org/luci/appengine/tq" |
| "go.chromium.org/luci/auth/identity" |
| "go.chromium.org/luci/common/clock" |
| "go.chromium.org/luci/common/data/stringset" |
| "go.chromium.org/luci/common/errors" |
| "go.chromium.org/luci/common/iotools" |
| "go.chromium.org/luci/common/logging" |
| "go.chromium.org/luci/common/proto/google" |
| "go.chromium.org/luci/common/retry/transient" |
| "go.chromium.org/luci/common/sync/parallel" |
| "go.chromium.org/luci/grpc/grpcutil" |
| "go.chromium.org/luci/server/auth" |
| "go.chromium.org/luci/server/router" |
| |
| api "go.chromium.org/luci/cipd/api/cipd/v1" |
| "go.chromium.org/luci/cipd/appengine/impl/cas" |
| "go.chromium.org/luci/cipd/appengine/impl/metadata" |
| "go.chromium.org/luci/cipd/appengine/impl/model" |
| "go.chromium.org/luci/cipd/appengine/impl/repo/processing" |
| "go.chromium.org/luci/cipd/appengine/impl/repo/tasks" |
| "go.chromium.org/luci/cipd/common" |
| ) |
| |
| // Public returns publicly exposed implementation of cipd.Repository service. |
| // |
| // It checks ACLs. |
| func Public(internalCAS cas.StorageServer, d *tq.Dispatcher) Server { |
| impl := &repoImpl{ |
| tq: d, |
| meta: metadata.GetStorage(), |
| cas: internalCAS, |
| } |
| impl.registerTasks() |
| impl.registerProcessor(&processing.ClientExtractor{CAS: internalCAS}) |
| return impl |
| } |
| |
| // Server is api.RepositoryServer that can also expose some non-pRPC routes. |
| type Server interface { |
| api.RepositoryServer |
| |
| // InstallHandlers installs non-pRPC HTTP handlers into the router. |
| // |
| // Assumes 'base' middleware chain does OAuth2 authentication already. |
| InstallHandlers(r *router.Router, base router.MiddlewareChain) |
| } |
| |
| // repoImpl implements api.RepositoryServer. |
| type repoImpl struct { |
| tq *tq.Dispatcher |
| |
| meta metadata.Storage // storage for package prefix metadata |
| cas cas.StorageServer // non-ACLed storage for instance package files |
| |
| procs []processing.Processor // in order of registerProcessor calls |
| procsMap map[string]processing.Processor // ID => processing.Processor |
| } |
| |
| // registerTasks adds tasks to the tq Dispatcher. |
| func (impl *repoImpl) registerTasks() { |
| // See queue.yaml for "run-processors" task queue definition. |
| impl.tq.RegisterTask(&tasks.RunProcessors{}, func(c context.Context, m proto.Message) error { |
| return impl.runProcessorsTask(c, m.(*tasks.RunProcessors)) |
| }, "run-processors", nil) |
| } |
| |
| // registerProcessor adds a new processor. |
| func (impl *repoImpl) registerProcessor(p processing.Processor) { |
| if impl.procsMap == nil { |
| impl.procsMap = map[string]processing.Processor{} |
| } |
| |
| id := p.ID() |
| if impl.procsMap[id] != nil { |
| panic(fmt.Sprintf("processor %q has already been registered", id)) |
| } |
| |
| impl.procs = append(impl.procs, p) |
| impl.procsMap[id] = p |
| } |
| |
| // packageReader opens a package instance for reading. |
| func (impl *repoImpl) packageReader(c context.Context, ref *api.ObjectRef) (*processing.PackageReader, error) { |
| // Get slow Google Storage based ReaderAt. |
| rawReader, err := impl.cas.GetReader(c, ref) |
| switch code := grpc.Code(err); { |
| case code == codes.NotFound: |
| return nil, errors.Annotate(err, "package instance is not in the storage").Err() |
| case code != codes.OK: |
| return nil, errors.Annotate(err, "failed to open the object for reading").Tag(transient.Tag).Err() |
| } |
| |
| // Read in 512 Kb chunks, keep 2 of them buffered. |
| pkg, err := processing.NewPackageReader( |
| iotools.NewBufferingReaderAt(rawReader, 512*1024, 2), |
| rawReader.Size()) |
| if err != nil { |
| return nil, errors.Annotate(err, "error when opening the package").Err() |
| } |
| return pkg, nil |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| // Prefix metadata RPC methods + related helpers including ACL checks. |
| |
| // GetPrefixMetadata implements the corresponding RPC method, see the proto doc. |
| func (impl *repoImpl) GetPrefixMetadata(c context.Context, r *api.PrefixRequest) (resp *api.PrefixMetadata, err error) { |
| // It is fine to implement this in terms of GetInheritedPrefixMetadata, since |
| // we need to fetch all inherited metadata anyway to check ACLs. |
| inherited, err := impl.GetInheritedPrefixMetadata(c, r) |
| if err != nil { |
| return nil, err |
| } |
| // Have the metadata for the requested prefix? It should be the last if so. |
| if m := inherited.PerPrefixMetadata; len(m) != 0 && m[len(m)-1].Prefix == r.Prefix { |
| return m[len(m)-1], nil |
| } |
| // Note that GetInheritedPrefixMetadata checked that the caller has permission |
| // to view the requested prefix (via some parent prefix ACL), so sincerely |
| // reply with NotFound. |
| return nil, noMetadataErr(r.Prefix) |
| } |
| |
| // GetInheritedPrefixMetadata implements the corresponding RPC method, see the |
| // proto doc. |
| // |
| // Note: it normalizes Prefix field inside the request. |
| func (impl *repoImpl) GetInheritedPrefixMetadata(c context.Context, r *api.PrefixRequest) (resp *api.InheritedPrefixMetadata, err error) { |
| defer func() { err = grpcutil.GRPCifyAndLogErr(c, err) }() |
| |
| r.Prefix, err = common.ValidatePackagePrefix(r.Prefix) |
| if err != nil { |
| return nil, status.Errorf(codes.InvalidArgument, "bad 'prefix' - %s", err) |
| } |
| |
| metas, err := impl.checkRole(c, r.Prefix, api.Role_OWNER) |
| if err != nil { |
| return nil, err |
| } |
| return &api.InheritedPrefixMetadata{PerPrefixMetadata: metas}, nil |
| } |
| |
| // UpdatePrefixMetadata implements the corresponding RPC method, see the proto doc. |
| func (impl *repoImpl) UpdatePrefixMetadata(c context.Context, r *api.PrefixMetadata) (resp *api.PrefixMetadata, err error) { |
| defer func() { err = grpcutil.GRPCifyAndLogErr(c, err) }() |
| |
| // Fill in server-assigned fields. |
| r.UpdateTime = google.NewTimestamp(clock.Now(c)) |
| r.UpdateUser = string(auth.CurrentIdentity(c)) |
| |
| // Normalize and validate format of the PrefixMetadata. |
| if err := common.NormalizePrefixMetadata(r); err != nil { |
| return nil, status.Errorf(codes.InvalidArgument, "bad prefix metadata - %s", err) |
| } |
| |
| // The root metadata is not modifiable through API. |
| if r.Prefix == "" { |
| return nil, status.Errorf(codes.InvalidArgument, "the root metadata is not modifiable") |
| } |
| |
| // Check ACLs. |
| if _, err := impl.checkRole(c, r.Prefix, api.Role_OWNER); err != nil { |
| return nil, err |
| } |
| |
| // Transactionally check the fingerprint and update the metadata. impl.meta |
| // will recalculate the new fingerprint. Note there's a small chance the |
| // caller no longer has OWNER role to modify the metadata inside the |
| // transaction. We ignore it. It happens when caller's permissions are revoked |
| // by someone else exactly during UpdatePrefixMetadata call. |
| return impl.meta.UpdateMetadata(c, r.Prefix, func(cur *api.PrefixMetadata) error { |
| if cur.Fingerprint != r.Fingerprint { |
| switch { |
| case cur.Fingerprint == "": |
| // The metadata was deleted while the caller was messing with it. |
| return noMetadataErr(r.Prefix) |
| case r.Fingerprint == "": |
| // Caller tries to make a new one, but we already have it. |
| return status.Errorf( |
| codes.AlreadyExists, "metadata for prefix %q already exists and has fingerprint %q, "+ |
| "use combination of GetPrefixMetadata and UpdatePrefixMetadata to "+ |
| "update it", r.Prefix, cur.Fingerprint) |
| default: |
| // The fingerprint has changed while the caller was messing with |
| // the metadata. |
| return status.Errorf( |
| codes.FailedPrecondition, "metadata for prefix %q was updated concurrently "+ |
| "(the fingerprint in the request %q doesn't match the current fingerprint %q), "+ |
| "fetch new metadata with GetPrefixMetadata and reapply your "+ |
| "changes", r.Prefix, r.Fingerprint, cur.Fingerprint) |
| } |
| } |
| prev := *cur |
| *cur = *r |
| return model.EmitMetadataEvents(c, &prev, cur) |
| }) |
| } |
| |
| // GetRolesInPrefix implements the corresponding RPC method, see the proto doc. |
| func (impl *repoImpl) GetRolesInPrefix(c context.Context, r *api.PrefixRequest) (resp *api.RolesInPrefixResponse, err error) { |
| defer func() { err = grpcutil.GRPCifyAndLogErr(c, err) }() |
| |
| r.Prefix, err = common.ValidatePackagePrefix(r.Prefix) |
| if err != nil { |
| return nil, status.Errorf(codes.InvalidArgument, "bad 'prefix' - %s", err) |
| } |
| |
| metas, err := impl.meta.GetMetadata(c, r.Prefix) |
| if err != nil { |
| return nil, err |
| } |
| |
| roles, err := rolesInPrefix(c, metas) |
| if err != nil { |
| return nil, err |
| } |
| |
| resp = &api.RolesInPrefixResponse{ |
| Roles: make([]*api.RolesInPrefixResponse_RoleInPrefix, len(roles)), |
| } |
| for i, r := range roles { |
| resp.Roles[i] = &api.RolesInPrefixResponse_RoleInPrefix{Role: r} |
| } |
| return resp, nil |
| } |
| |
| // checkRole checks where the caller has the given role in the given prefix or |
| // any of its parent prefixes. |
| // |
| // Understands role inheritance. See acl.go for more details. |
| // |
| // Returns grpc PermissionDenied error if the caller doesn't have the requested |
| // role. The error message depends on whether caller has READER role or not |
| // (readers see more details). |
| // |
| // Fetches and returns metadata of the prefix and all parent prefixes as a side |
| // effect. |
| func (impl *repoImpl) checkRole(c context.Context, prefix string, role api.Role) ([]*api.PrefixMetadata, error) { |
| metas, err := impl.meta.GetMetadata(c, prefix) |
| if err != nil { |
| return nil, err |
| } |
| |
| switch yes, err := hasRole(c, metas, role); { |
| case err != nil: |
| return nil, err |
| case yes: |
| return metas, nil |
| case role == api.Role_READER: // was checking for a reader, and caller is not |
| return nil, noAccessErr(c, prefix) |
| } |
| |
| // We end up here if role is something other than READER, and the caller |
| // doesn't have it. Maybe caller IS a reader, then we can give more concrete |
| // error message. |
| switch yes, err := hasRole(c, metas, api.Role_READER); { |
| case err != nil: |
| return nil, err |
| case yes: |
| return nil, status.Errorf( |
| codes.PermissionDenied, "%q has no required %s role in prefix %q", |
| auth.CurrentIdentity(c), role, prefix) |
| default: |
| return nil, noAccessErr(c, prefix) |
| } |
| } |
| |
| // noAccessErr produces a grpc error saying that the given prefix doesn't |
| // exist or the caller has no access to it. This is generic error message that |
| // should not give away prefix presence to non-readers. |
| func noAccessErr(c context.Context, prefix string) error { |
| var msg string |
| if ident := auth.CurrentIdentity(c); ident.Kind() == identity.Anonymous { |
| msg = "not visible to unauthenticated callers" |
| } else { |
| msg = fmt.Sprintf("%q is not allowed to see it", ident) |
| } |
| return status.Errorf(codes.PermissionDenied, "prefix %q doesn't exist or %s", prefix, msg) |
| } |
| |
| // noMetadataErr produces a grpc error saying that the given prefix doesn't have |
| // metadata attached. |
| func noMetadataErr(prefix string) error { |
| return status.Errorf(codes.NotFound, "prefix %q has no metadata", prefix) |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| // Prefix listing. |
| |
| // ListPrefix implement the corresponding RPC method, see the proto doc. |
| func (impl *repoImpl) ListPrefix(c context.Context, r *api.ListPrefixRequest) (resp *api.ListPrefixResponse, err error) { |
| defer func() { err = grpcutil.GRPCifyAndLogErr(c, err) }() |
| |
| r.Prefix, err = common.ValidatePackagePrefix(r.Prefix) |
| if err != nil { |
| return nil, status.Errorf(codes.InvalidArgument, "bad 'prefix' - %s", err) |
| } |
| |
| // Discover prefixes the caller is allowed to see. Note that checking only |
| // r.Prefix ACL is not sufficient, since the caller may not see it, but still |
| // see some deeper prefix, thus we need to enumerate the metadata subtree. |
| var visibleRoots []string // sorted list of prefixes visible to the caller |
| err = impl.meta.VisitMetadata(c, r.Prefix, func(pfx string, md []*api.PrefixMetadata) (cont bool, err error) { |
| switch visible, err := hasRole(c, md, api.Role_READER); { |
| case err != nil: |
| return false, err |
| case visible: |
| // Found a visible root. Everything under 'pfx' is visible to the caller, |
| // no sense in recursing deeper into this subtree. |
| visibleRoots = append(visibleRoots, pfx) |
| return false, nil |
| default: |
| // Continue exploring this subtree until we find something visible there, |
| // if anything. |
| return true, nil |
| } |
| }) |
| if err != nil { |
| return nil, errors.Annotate(err, "failed to enumerate the metadata").Err() |
| } |
| |
| // One of 'visibleRoots' => its full sorted recursive listing. |
| perVisibleRoot := make(map[string][]string, len(visibleRoots)) |
| |
| // ListPackages used below lists only packages that are under the prefix. It |
| // is possible there are packages that match some of visibleRoots directly |
| // (have exact same name as some visible root). We check it here. Note that |
| // this applies only to subprefixes of r.Prefix: we don't want to return a |
| // package named r.Prefix in the result (if any). So if visibleRoots is |
| // [r.Prefix] itself, we skip this check. Note that if r.Prefix is in |
| // visibleRoots, then it is the only item there, by construction. |
| if len(visibleRoots) != 1 || visibleRoots[0] != r.Prefix { |
| rootPkgs, err := model.CheckPackages(c, visibleRoots, r.IncludeHidden) |
| if err != nil { |
| return nil, errors.Annotate(err, "failed to check presence of packages").Err() |
| } |
| for _, name := range rootPkgs { |
| perVisibleRoot[name] = []string{name} |
| } |
| } |
| |
| // Fetch the listing of each visible root in parallel. Note that there's no |
| // intersection between them due to the order of VisitMetadata enumeration. |
| err = parallel.WorkPool(8, func(tasks chan<- func() error) { |
| mu := sync.Mutex{} |
| for _, pfx := range visibleRoots { |
| pfx := pfx |
| tasks <- func() error { |
| // TODO(vadimsh): This is inefficient for non-recursive listings. |
| // Unfortunately we have to do the recursive listing to discover |
| // subprefixes. E.g. when listing 'a', we need to return 'a/b' as a |
| // subprefix if there's a package 'a/b/c', so to discover 'a/b/c' we |
| // need full recursive listing of 'a'. |
| // |
| // One possible improvement is: |
| // 1. Introduce a new parallel entity group just for browsing: |
| // Entry { |
| // Name: <full path to the package or the prefix>, |
| // Children: [<names of all child prefixes and packages>], |
| // HiddenChildren: [<names of hidden prefixes and packages>], |
| // Hidden: <true|false>, // also true if all children are hidden |
| // } |
| // 2. 'Entry' basically represents a browsable item (either a package |
| // or a "directory" aka prefix). |
| // 3. Update this entity group when registering, deleting, hiding or |
| // showing packages. Will require O(<number of path components>) |
| // entity writes in a single transaction, which is acceptable. |
| // 4. In non-recursive listing just look at Children of corresponding |
| // entity. |
| listing, err := model.ListPackages(c, pfx, r.IncludeHidden) |
| if err == nil { |
| mu.Lock() |
| perVisibleRoot[pfx] = append(perVisibleRoot[pfx], listing...) |
| mu.Unlock() |
| } |
| return err |
| } |
| } |
| }) |
| if err != nil { |
| return nil, errors.Annotate(err, "failed to list a prefix").Err() |
| } |
| |
| // searchRoot is lexicographical prefix of packages we are concerned about. |
| searchRoot := r.Prefix |
| if searchRoot != "" { |
| searchRoot += "/" |
| } |
| |
| // Visit all discovered packages (in sorted order). |
| resp = &api.ListPrefixResponse{} |
| dirs := stringset.New(0) |
| for _, pfx := range visibleRoots { |
| for _, pkg := range perVisibleRoot[pfx] { |
| // By construction, ALL packages here must be within the search root. |
| if !strings.HasPrefix(pkg, searchRoot) { |
| panic(fmt.Sprintf( |
| "unexpected package %q in results when listing %q, visible roots are %q", |
| pkg, searchRoot, visibleRoots)) |
| } |
| |
| // E.g. "a/b/c", relative to searchRoot. |
| rel := strings.TrimPrefix(pkg, searchRoot) |
| |
| if r.Recursive { |
| // If listing recursively, add everything to the result. |
| resp.Packages = append(resp.Packages, pkg) |
| // For rel "a/b/c", add [".../a", ".../a/b"] to the directories set. |
| if chunks := strings.Split(rel, "/"); len(chunks) > 1 { |
| for i := 1; i < len(chunks); i++ { |
| dirs.Add(searchRoot + strings.Join(chunks[:i], "/")) |
| } |
| } |
| } else { |
| // Otherwise add only packages that directly reside under searchRoot, |
| // and pick only first path component of directories, since we don't |
| // care about what's deeper. |
| if idx := strings.IndexRune(rel, '/'); idx != -1 { |
| // 'rel' has / => it is a directory, add its first path component. |
| dirs.Add(searchRoot + rel[:idx]) |
| } else { |
| // 'rel' has no / inside => it's a package directly under searchRoot. |
| resp.Packages = append(resp.Packages, pkg) |
| } |
| } |
| } |
| } |
| |
| // Note: resp.Packages are already sorted by construction. |
| resp.Prefixes = dirs.ToSlice() |
| sort.Strings(resp.Prefixes) |
| |
| return resp, nil |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| // Hide/unhide package. |
| |
| // HidePackage implements the corresponding RPC method, see the proto doc. |
| func (impl *repoImpl) HidePackage(c context.Context, r *api.PackageRequest) (*empty.Empty, error) { |
| return impl.setPackageHidden(c, r, model.Hidden) |
| } |
| |
| // UnhidePackage implements the corresponding RPC method, see the proto doc. |
| func (impl *repoImpl) UnhidePackage(c context.Context, r *api.PackageRequest) (*empty.Empty, error) { |
| return impl.setPackageHidden(c, r, model.Visible) |
| } |
| |
| // setPackageHidden is common implementation of HidePackage and UnhidePackage. |
| func (impl *repoImpl) setPackageHidden(c context.Context, r *api.PackageRequest, hidden bool) (resp *empty.Empty, err error) { |
| defer func() { err = grpcutil.GRPCifyAndLogErr(c, err) }() |
| |
| if err := common.ValidatePackageName(r.Package); err != nil { |
| return nil, status.Errorf(codes.InvalidArgument, "bad 'package' - %s", err) |
| } |
| if _, err := impl.checkRole(c, r.Package, api.Role_OWNER); err != nil { |
| return nil, err |
| } |
| |
| switch err := model.SetPackageHidden(c, r.Package, hidden); { |
| case err == datastore.ErrNoSuchEntity: |
| return nil, status.Errorf(codes.NotFound, "no such package") |
| case err != nil: |
| return nil, errors.Annotate(err, "failed to update the package").Err() |
| } |
| return &empty.Empty{}, nil |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| // Package deletion. |
| |
| // DeletePackage implements the corresponding RPC method, see the proto doc. |
| func (impl *repoImpl) DeletePackage(c context.Context, r *api.PackageRequest) (resp *empty.Empty, err error) { |
| defer func() { err = grpcutil.GRPCifyAndLogErr(c, err) }() |
| |
| if err := common.ValidatePackageName(r.Package); err != nil { |
| return nil, status.Errorf(codes.InvalidArgument, "bad 'package' - %s", err) |
| } |
| |
| // DeletePackage RPC, due to its potential severity, is limited only to |
| // administrators, i.e. OWNERs of the repository root (prefix ""). Before |
| // checking this, make sure the caller can otherwise modify the package at |
| // all, to give a nicer error message if they can't. |
| if _, err := impl.checkRole(c, r.Package, api.Role_OWNER); err != nil { |
| return nil, err |
| } |
| if _, err := impl.checkRole(c, "", api.Role_OWNER); err != nil { |
| if grpc.Code(err) == codes.PermissionDenied { |
| return nil, status.Errorf(codes.PermissionDenied, |
| "package deletion is allowed only to service administrators") |
| } |
| return nil, err |
| } |
| |
| return &empty.Empty{}, model.DeletePackage(c, r.Package) |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| // Package instance registration and post-registration processing. |
| |
| // RegisterInstance implements the corresponding RPC method, see the proto doc. |
| func (impl *repoImpl) RegisterInstance(c context.Context, r *api.Instance) (resp *api.RegisterInstanceResponse, err error) { |
| defer func() { err = grpcutil.GRPCifyAndLogErr(c, err) }() |
| |
| // Validate the request format. |
| if err := common.ValidatePackageName(r.Package); err != nil { |
| return nil, status.Errorf(codes.InvalidArgument, "bad 'package' - %s", err) |
| } |
| if err := common.ValidateObjectRef(r.Instance, common.KnownHash); err != nil { |
| return nil, status.Errorf(codes.InvalidArgument, "bad 'instance' - %s", err) |
| } |
| |
| // Check ACLs. |
| if _, err := impl.checkRole(c, r.Package, api.Role_WRITER); err != nil { |
| return nil, err |
| } |
| |
| // Is such instance already registered? |
| instance := (&model.Instance{}).FromProto(c, r) |
| switch err := datastore.Get(c, instance); { |
| case err == nil: |
| return &api.RegisterInstanceResponse{ |
| Status: api.RegistrationStatus_ALREADY_REGISTERED, |
| Instance: instance.Proto(), |
| }, nil |
| case err != datastore.ErrNoSuchEntity: |
| return nil, errors.Annotate(err, "failed to fetch the instance entity").Err() |
| } |
| |
| // Attempt to start a new upload session. This will fail with ALREADY_EXISTS |
| // if such object is already in the storage. This is expected (it means the |
| // client has uploaded the object already and we should just register the |
| // instance right away). |
| uploadOp, err := impl.cas.BeginUpload(c, &api.BeginUploadRequest{ |
| Object: r.Instance, |
| }) |
| switch code := grpc.Code(err); { |
| case code == codes.AlreadyExists: |
| break // the object is already there |
| case code == codes.OK: |
| // The object is not in the storage and we have just started the upload. Let |
| // the client finish it. |
| return &api.RegisterInstanceResponse{ |
| Status: api.RegistrationStatus_NOT_UPLOADED, |
| UploadOp: uploadOp, |
| }, nil |
| default: |
| return nil, errors.Annotate(err, "failed to initiate an upload op (code %s)", code).Err() |
| } |
| |
| // Warn about registering deprecated SHA1 packages. Eventually this will be |
| // forbidden completely. |
| if r.Instance.HashAlgo == api.HashAlgo_SHA1 { |
| logging.Warningf(c, "Deprecated SHA1 instance: %s (%s) from %s", |
| r.Package, common.ObjectRefToInstanceID(r.Instance), auth.CurrentIdentity(c)) |
| } |
| |
| // The instance is already in the CAS storage. Register it in the repository. |
| instance = (&model.Instance{ |
| RegisteredBy: string(auth.CurrentIdentity(c)), |
| RegisteredTs: clock.Now(c).UTC(), |
| }).FromProto(c, r) |
| registered, instance, err := model.RegisterInstance(c, instance, impl.onInstanceRegistration) |
| if err != nil { |
| return nil, errors.Annotate(err, "failed to register the instance").Err() |
| } |
| |
| resp = &api.RegisterInstanceResponse{Instance: instance.Proto()} |
| if registered { |
| resp.Status = api.RegistrationStatus_REGISTERED |
| } else { |
| resp.Status = api.RegistrationStatus_ALREADY_REGISTERED |
| } |
| return |
| } |
| |
| // onInstanceRegistration is called in a txn when registering an instance. |
| func (impl *repoImpl) onInstanceRegistration(c context.Context, inst *model.Instance) error { |
| // Collect IDs of applicable processors. |
| var procs []string |
| for _, p := range impl.procs { |
| if p.Applicable(inst) { |
| procs = append(procs, p.ID()) |
| } |
| } |
| if len(procs) == 0 { |
| return nil |
| } |
| |
| // Mark the instance as being processed now. |
| inst.ProcessorsPending = procs |
| |
| // Launch the TQ task that does the processing (see runProcessorsTask below). |
| return impl.tq.AddTask(c, &tq.Task{ |
| Payload: &tasks.RunProcessors{Instance: inst.Proto()}, |
| Title: inst.InstanceID, |
| }) |
| } |
| |
| // runProcessorsTask executes a post-upload processing step. |
| // |
| // Returning a transient error here causes the task queue service to retry the |
| // task. |
| func (impl *repoImpl) runProcessorsTask(c context.Context, t *tasks.RunProcessors) error { |
| // Fetch the instance to see what processors are still pending. |
| inst := (&model.Instance{}).FromProto(c, t.Instance) |
| switch err := datastore.Get(c, inst); { |
| case err == datastore.ErrNoSuchEntity: |
| return fmt.Errorf("instance %q is unexpectedly gone from the datastore", inst.InstanceID) |
| case err != nil: |
| return transient.Tag.Apply(err) |
| } |
| |
| results := map[string]processing.Result{} |
| |
| // Grab processors we haven't ran yet. |
| var run []processing.Processor |
| for _, id := range inst.ProcessorsPending { |
| if proc := impl.procsMap[id]; proc != nil { |
| run = append(run, proc) |
| } else { |
| logging.Errorf(c, "Skipping unknown processor %q", id) |
| results[id] = processing.Result{Err: fmt.Errorf("unknown processor %q", id)} |
| } |
| } |
| |
| // Exit early if there's nothing to run. |
| if len(run) == 0 { |
| return impl.updateProcessors(c, t.Instance, results) |
| } |
| |
| // Open the package for reading. |
| pkg, err := impl.packageReader(c, t.Instance.Instance) |
| switch { |
| case transient.Tag.In(err): |
| return err // retry the whole thing |
| case err != nil: |
| // The package is fatally broken, give up. |
| logging.WithError(err).Errorf(c, "The package can't be opened, failing all processors") |
| for _, proc := range run { |
| results[proc.ID()] = processing.Result{Err: err} |
| } |
| return impl.updateProcessors(c, t.Instance, results) |
| } |
| |
| // Run the processors sequentially, since PackageReader is not very friendly |
| // to concurrent access. |
| var transientErrs errors.MultiError |
| for _, proc := range run { |
| logging.Infof(c, "Running processor %q", proc.ID()) |
| res, err := proc.Run(c, inst, pkg) |
| if err != nil { |
| logging.WithError(err).Errorf(c, "Processor %q failed transiently", proc.ID()) |
| transientErrs = append(transientErrs, err) |
| } else { |
| if res.Err != nil { |
| logging.WithError(res.Err).Errorf(c, "Processor %q failed fatally", proc.ID()) |
| } |
| results[proc.ID()] = res |
| } |
| } |
| |
| // Store what we've got, even if some processor may have failed to run. |
| updErr := impl.updateProcessors(c, t.Instance, results) |
| |
| // Prefer errors from processors over 'updErr' if both happen. Processor |
| // errors are more interesting. |
| switch { |
| case len(transientErrs) != 0: |
| return transient.Tag.Apply(transientErrs) |
| case updErr != nil: |
| return updErr |
| } |
| return nil |
| } |
| |
| // updateProcessors transactionally creates ProcessingResult entities and |
| // updates Instance.Processors* fields. |
| func (impl *repoImpl) updateProcessors(c context.Context, inst *api.Instance, results map[string]processing.Result) error { |
| if len(results) == 0 { |
| return nil |
| } |
| |
| instEnt := (&model.Instance{}).FromProto(c, inst) |
| instKey := datastore.KeyForObj(c, instEnt) |
| |
| now := clock.Now(c).UTC() |
| |
| // Create ProcessingResult outside the transaction, since this involves slow |
| // zlib compression in WriteResult. |
| procResults := make(map[string]*model.ProcessingResult, len(results)) |
| for procID, res := range results { |
| procRes := &model.ProcessingResult{ |
| ProcID: procID, |
| Instance: instKey, |
| CreatedTs: now, |
| Success: res.Err == nil, |
| } |
| procResults[procID] = procRes |
| |
| // If the result is not serializable, store the serialization error instead. |
| err := res.Err |
| if err == nil { |
| if err = procRes.WriteResult(res.Result); err != nil { |
| err = errors.Annotate(err, "failed to write the processing result").Err() |
| } |
| } |
| if err != nil { |
| procRes.Success = false |
| procRes.Error = err.Error() |
| } |
| } |
| |
| // Mutate Instance entity, storing results that haven't been stored yet. |
| return model.Txn(c, "updateProcessors", func(c context.Context) error { |
| switch err := datastore.Get(c, instEnt); { |
| case err == datastore.ErrNoSuchEntity: |
| return fmt.Errorf("the entity is unexpectedly gone") |
| case err != nil: |
| return errors.Annotate(err, "failed to fetch the entity").Tag(transient.Tag).Err() |
| } |
| |
| var toPut []interface{} |
| |
| // Go over what's is still pending, and move it to either Success or Failure |
| // group if it is done. |
| stillPending := instEnt.ProcessorsPending[:0] |
| for _, procID := range instEnt.ProcessorsPending { |
| res, done := procResults[procID] |
| if !done { |
| stillPending = append(stillPending, procID) |
| continue |
| } |
| toPut = append(toPut, res) |
| if res.Success { |
| instEnt.ProcessorsSuccess = append(instEnt.ProcessorsSuccess, procID) |
| } else { |
| instEnt.ProcessorsFailure = append(instEnt.ProcessorsFailure, procID) |
| } |
| } |
| instEnt.ProcessorsPending = stillPending |
| |
| // Store all the changes (if any). |
| if len(toPut) == 0 { |
| return nil |
| } |
| return transient.Tag.Apply(datastore.Put(c, toPut, instEnt)) |
| }) |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| // Instance listing and querying. |
| |
| type paginatedQueryOpts struct { |
| Package string |
| PageToken string |
| PageSize int32 |
| Validator func() error // validates callback-specific fields in the request |
| Handler func(cur datastore.Cursor, pageSize int32) ([]*model.Instance, datastore.Cursor, error) |
| } |
| |
| // paginatedQuery is a common part of ListInstances and SearchInstances. |
| func (impl *repoImpl) paginatedQuery(c context.Context, opts paginatedQueryOpts) (out []*api.Instance, nextTok string, err error) { |
| // Validate the request, decode the cursor. |
| if err := common.ValidatePackageName(opts.Package); err != nil { |
| return nil, "", status.Errorf(codes.InvalidArgument, "bad 'package' - %s", err) |
| } |
| |
| switch { |
| case opts.PageSize < 0: |
| return nil, "", status.Errorf(codes.InvalidArgument, "bad 'page_size' %d - it should be non-negative", opts.PageSize) |
| case opts.PageSize == 0: |
| opts.PageSize = 100 |
| } |
| |
| var cursor datastore.Cursor |
| if opts.PageToken != "" { |
| if cursor, err = datastore.DecodeCursor(c, opts.PageToken); err != nil { |
| return nil, "", status.Errorf(codes.InvalidArgument, "bad 'page_token' - %s", err) |
| } |
| } |
| |
| if opts.Validator != nil { |
| if err := opts.Validator(); err != nil { |
| return nil, "", err |
| } |
| } |
| |
| // Check ACLs. |
| if _, err := impl.checkRole(c, opts.Package, api.Role_READER); err != nil { |
| return nil, "", err |
| } |
| |
| // Check that the package is registered. |
| if err := model.CheckPackageExists(c, opts.Package); err != nil { |
| return nil, "", err |
| } |
| |
| // Do the actual listing. |
| inst, cursor, err := opts.Handler(cursor, opts.PageSize) |
| if err != nil { |
| return nil, "", errors.Annotate(err, "failed to query instances").Err() |
| } |
| |
| // Convert results to proto. |
| out = make([]*api.Instance, len(inst)) |
| for i, ent := range inst { |
| out[i] = ent.Proto() |
| } |
| if cursor != nil { |
| nextTok = cursor.String() |
| } |
| return |
| } |
| |
| // ListInstances implements the corresponding RPC method, see the proto doc. |
| func (impl *repoImpl) ListInstances(c context.Context, r *api.ListInstancesRequest) (resp *api.ListInstancesResponse, err error) { |
| defer func() { err = grpcutil.GRPCifyAndLogErr(c, err) }() |
| |
| result, nextPage, err := impl.paginatedQuery(c, paginatedQueryOpts{ |
| Package: r.Package, |
| PageSize: r.PageSize, |
| PageToken: r.PageToken, |
| Handler: func(cur datastore.Cursor, pageSize int32) ([]*model.Instance, datastore.Cursor, error) { |
| return model.ListInstances(c, r.Package, pageSize, cur) |
| }, |
| }) |
| if err != nil { |
| return nil, err |
| } |
| |
| return &api.ListInstancesResponse{ |
| Instances: result, |
| NextPageToken: nextPage, |
| }, nil |
| } |
| |
| // SearchInstances implements the corresponding RPC method, see the proto doc. |
| func (impl *repoImpl) SearchInstances(c context.Context, r *api.SearchInstancesRequest) (resp *api.SearchInstancesResponse, err error) { |
| defer func() { err = grpcutil.GRPCifyAndLogErr(c, err) }() |
| |
| who := auth.CurrentIdentity(c) |
| for _, t := range r.Tags { |
| logging.Infof(c, "SearchInstances: %s %s %s:%s", who, r.Package, t.Key, t.Value) |
| } |
| |
| result, nextPage, err := impl.paginatedQuery(c, paginatedQueryOpts{ |
| Package: r.Package, |
| PageSize: r.PageSize, |
| PageToken: r.PageToken, |
| Validator: func() error { return validateTagList(r.Tags) }, |
| Handler: func(cur datastore.Cursor, pageSize int32) ([]*model.Instance, datastore.Cursor, error) { |
| return model.SearchInstances(c, r.Package, r.Tags, pageSize, cur) |
| }, |
| }) |
| if err != nil { |
| return nil, err |
| } |
| |
| return &api.SearchInstancesResponse{ |
| Instances: result, |
| NextPageToken: nextPage, |
| }, nil |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| // Refs support. |
| |
| // CreateRef implements the corresponding RPC method, see the proto doc. |
| func (impl *repoImpl) CreateRef(c context.Context, r *api.Ref) (resp *empty.Empty, err error) { |
| defer func() { err = grpcutil.GRPCifyAndLogErr(c, err) }() |
| |
| // Validate the request. |
| if err := common.ValidatePackageRef(r.Name); err != nil { |
| return nil, status.Errorf(codes.InvalidArgument, "bad 'name' - %s", err) |
| } |
| if err := common.ValidatePackageName(r.Package); err != nil { |
| return nil, status.Errorf(codes.InvalidArgument, "bad 'package' - %s", err) |
| } |
| if err := common.ValidateObjectRef(r.Instance, common.KnownHash); err != nil { |
| return nil, status.Errorf(codes.InvalidArgument, "bad 'instance' - %s", err) |
| } |
| |
| // Check ACLs. |
| if _, err := impl.checkRole(c, r.Package, api.Role_WRITER); err != nil { |
| return nil, err |
| } |
| |
| // Actually create or move the ref. This will also transactionally check the |
| // instance exists and it has passed the processing successfully. |
| inst := &model.Instance{ |
| InstanceID: common.ObjectRefToInstanceID(r.Instance), |
| Package: model.PackageKey(c, r.Package), |
| } |
| if err := model.SetRef(c, r.Name, inst); err != nil { |
| return nil, err |
| } |
| return &empty.Empty{}, nil |
| } |
| |
| // DeleteRef implements the corresponding RPC method, see the proto doc. |
| func (impl *repoImpl) DeleteRef(c context.Context, r *api.DeleteRefRequest) (resp *empty.Empty, err error) { |
| defer func() { err = grpcutil.GRPCifyAndLogErr(c, err) }() |
| |
| // Validate the request. |
| if err := common.ValidatePackageRef(r.Name); err != nil { |
| return nil, status.Errorf(codes.InvalidArgument, "bad 'name' - %s", err) |
| } |
| if err := common.ValidatePackageName(r.Package); err != nil { |
| return nil, status.Errorf(codes.InvalidArgument, "bad 'package' - %s", err) |
| } |
| |
| // Check ACLs. |
| if _, err := impl.checkRole(c, r.Package, api.Role_WRITER); err != nil { |
| return nil, err |
| } |
| |
| // Verify the package actually exists, per DeleteRef contract. |
| if err := model.CheckPackageExists(c, r.Package); err != nil { |
| return nil, err |
| } |
| |
| // Actually delete the ref. |
| if err := model.DeleteRef(c, r.Package, r.Name); err != nil { |
| return nil, err |
| } |
| return &empty.Empty{}, nil |
| } |
| |
| // ListRefs implements the corresponding RPC method, see the proto doc. |
| func (impl *repoImpl) ListRefs(c context.Context, r *api.ListRefsRequest) (resp *api.ListRefsResponse, err error) { |
| defer func() { err = grpcutil.GRPCifyAndLogErr(c, err) }() |
| |
| // Validate the request. |
| if err := common.ValidatePackageName(r.Package); err != nil { |
| return nil, status.Errorf(codes.InvalidArgument, "bad 'package' - %s", err) |
| } |
| |
| // Check ACLs. |
| if _, err := impl.checkRole(c, r.Package, api.Role_READER); err != nil { |
| return nil, err |
| } |
| |
| // Verify the package actually exists, per ListPackageRefs contract. |
| if err := model.CheckPackageExists(c, r.Package); err != nil { |
| return nil, err |
| } |
| |
| // Actually list refs. |
| refs, err := model.ListPackageRefs(c, r.Package) |
| if err != nil { |
| return nil, errors.Annotate(err, "failed to list refs").Err() |
| } |
| resp = &api.ListRefsResponse{Refs: make([]*api.Ref, len(refs))} |
| for i, ref := range refs { |
| resp.Refs[i] = ref.Proto() |
| } |
| return resp, nil |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| // Tags support. |
| |
| func validateTagList(tags []*api.Tag) error { |
| if len(tags) == 0 { |
| return status.Errorf(codes.InvalidArgument, "bad 'tags' - cannot be empty") |
| } |
| for _, t := range tags { |
| kv := common.JoinInstanceTag(t) |
| if err := common.ValidateInstanceTag(kv); err != nil { |
| return status.Errorf(codes.InvalidArgument, "bad tag in 'tags' - %s", err) |
| } |
| } |
| return nil |
| } |
| |
| func validateMultiTagReq(pkg string, inst *api.ObjectRef, tags []*api.Tag) error { |
| if err := common.ValidatePackageName(pkg); err != nil { |
| return status.Errorf(codes.InvalidArgument, "bad 'package' - %s", err) |
| } |
| if err := common.ValidateObjectRef(inst, common.KnownHash); err != nil { |
| return status.Errorf(codes.InvalidArgument, "bad 'instance' - %s", err) |
| } |
| return validateTagList(tags) |
| } |
| |
| // AttachTags implements the corresponding RPC method, see the proto doc. |
| func (impl *repoImpl) AttachTags(c context.Context, r *api.AttachTagsRequest) (resp *empty.Empty, err error) { |
| defer func() { err = grpcutil.GRPCifyAndLogErr(c, err) }() |
| |
| // Validate the request. |
| if err := validateMultiTagReq(r.Package, r.Instance, r.Tags); err != nil { |
| return nil, err |
| } |
| |
| // Check ACLs. |
| if _, err := impl.checkRole(c, r.Package, api.Role_WRITER); err != nil { |
| return nil, err |
| } |
| |
| // Actually attach the tags. This will also transactionally check the instance |
| // exists and it has passed the processing successfully. |
| inst := &model.Instance{ |
| InstanceID: common.ObjectRefToInstanceID(r.Instance), |
| Package: model.PackageKey(c, r.Package), |
| } |
| if err := model.AttachTags(c, inst, r.Tags); err != nil { |
| return nil, err |
| } |
| return &empty.Empty{}, nil |
| } |
| |
| // DetachTags implements the corresponding RPC method, see the proto doc. |
| func (impl *repoImpl) DetachTags(c context.Context, r *api.DetachTagsRequest) (resp *empty.Empty, err error) { |
| defer func() { err = grpcutil.GRPCifyAndLogErr(c, err) }() |
| |
| // Validate the request. |
| if err := validateMultiTagReq(r.Package, r.Instance, r.Tags); err != nil { |
| return nil, err |
| } |
| |
| // Check ACLs. Note this is scoped to OWNERS, see the proto doc. |
| if _, err := impl.checkRole(c, r.Package, api.Role_OWNER); err != nil { |
| return nil, err |
| } |
| |
| // Verify the instance exists, per DetachTags contract. |
| inst := &model.Instance{ |
| InstanceID: common.ObjectRefToInstanceID(r.Instance), |
| Package: model.PackageKey(c, r.Package), |
| } |
| if err := model.CheckInstanceExists(c, inst); err != nil { |
| return nil, err |
| } |
| |
| // Actually detach the tags. |
| if err := model.DetachTags(c, inst, r.Tags); err != nil { |
| return nil, err |
| } |
| return &empty.Empty{}, nil |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| // Version resolution and instance info fetching. |
| |
| // ResolveVersion implements the corresponding RPC method, see the proto doc. |
| func (impl *repoImpl) ResolveVersion(c context.Context, r *api.ResolveVersionRequest) (resp *api.Instance, err error) { |
| defer func() { err = grpcutil.GRPCifyAndLogErr(c, err) }() |
| |
| // Validate the request. |
| if err := common.ValidatePackageName(r.Package); err != nil { |
| return nil, status.Errorf(codes.InvalidArgument, "bad 'package' - %s", err) |
| } |
| if err := common.ValidateInstanceVersion(r.Version); err != nil { |
| return nil, status.Errorf(codes.InvalidArgument, "bad 'version' - %s", err) |
| } |
| |
| // Check ACLs. |
| if _, err := impl.checkRole(c, r.Package, api.Role_READER); err != nil { |
| return nil, err |
| } |
| |
| // Actually resolve the version. This will return an appropriately grpc-tagged |
| // error. |
| inst, err := model.ResolveVersion(c, r.Package, r.Version) |
| if err != nil { |
| return nil, err |
| } |
| return inst.Proto(), nil |
| } |
| |
| // GetInstanceURL implements the corresponding RPC method, see the proto doc. |
| func (impl *repoImpl) GetInstanceURL(c context.Context, r *api.GetInstanceURLRequest) (resp *api.ObjectURL, err error) { |
| defer func() { err = grpcutil.GRPCifyAndLogErr(c, err) }() |
| |
| // Validate the request. |
| if err := common.ValidatePackageName(r.Package); err != nil { |
| return nil, status.Errorf(codes.InvalidArgument, "bad 'package' - %s", err) |
| } |
| if err := common.ValidateObjectRef(r.Instance, common.KnownHash); err != nil { |
| return nil, status.Errorf(codes.InvalidArgument, "bad 'instance' - %s", err) |
| } |
| |
| // Check ACLs. |
| if _, err := impl.checkRole(c, r.Package, api.Role_READER); err != nil { |
| return nil, err |
| } |
| |
| // Make sure this instance actually exists (without this check the caller |
| // would be able to "probe" CAS namespace unrestricted). |
| inst := (&model.Instance{}).FromProto(c, &api.Instance{ |
| Package: r.Package, |
| Instance: r.Instance, |
| }) |
| if err := model.CheckInstanceExists(c, inst); err != nil { |
| return nil, err |
| } |
| |
| // Ask CAS generate an URL for us. Note that CAS does caching internally. |
| return impl.cas.GetObjectURL(c, &api.GetObjectURLRequest{ |
| Object: r.Instance, |
| }) |
| } |
| |
| // DescribeInstance implements the corresponding RPC method, see the proto doc. |
| func (impl *repoImpl) DescribeInstance(c context.Context, r *api.DescribeInstanceRequest) (resp *api.DescribeInstanceResponse, err error) { |
| defer func() { err = grpcutil.GRPCifyAndLogErr(c, err) }() |
| |
| // Validate the request. |
| if err := common.ValidatePackageName(r.Package); err != nil { |
| return nil, status.Errorf(codes.InvalidArgument, "bad 'package' - %s", err) |
| } |
| if err := common.ValidateObjectRef(r.Instance, common.KnownHash); err != nil { |
| return nil, status.Errorf(codes.InvalidArgument, "bad 'instance' - %s", err) |
| } |
| |
| // Check ACLs. |
| if _, err := impl.checkRole(c, r.Package, api.Role_READER); err != nil { |
| return nil, err |
| } |
| |
| // Make sure this instance exists and fetch basic details about it. |
| inst := (&model.Instance{}).FromProto(c, &api.Instance{ |
| Package: r.Package, |
| Instance: r.Instance, |
| }) |
| if err := model.CheckInstanceExists(c, inst); err != nil { |
| return nil, err |
| } |
| |
| // Fetch the rest based on what the client wants. |
| var refs []*model.Ref |
| var tags []*model.Tag |
| var proc []*api.Processor |
| err = parallel.FanOutIn(func(tasks chan<- func() error) { |
| if r.DescribeRefs { |
| tasks <- func() error { |
| var err error |
| refs, err = model.ListInstanceRefs(c, inst) |
| return errors.Annotate(err, "failed to fetch refs").Err() |
| } |
| } |
| if r.DescribeTags { |
| tasks <- func() error { |
| var err error |
| tags, err = model.ListInstanceTags(c, inst) |
| return errors.Annotate(err, "failed to fetch tags").Err() |
| } |
| } |
| if r.DescribeProcessors { |
| tasks <- func() error { |
| var err error |
| proc, err = model.FetchProcessors(c, inst) |
| return errors.Annotate(err, "failed to fetch processors").Err() |
| } |
| } |
| }) |
| if err != nil { |
| return nil, err // note: this is always Internal error |
| } |
| |
| // Assemble the resulting proto. Apparently nil and empty slices are treated |
| // differently by jsonpb, so make sure to keep empty fields as nils. |
| resp = &api.DescribeInstanceResponse{Instance: inst.Proto()} |
| if len(refs) != 0 { |
| resp.Refs = make([]*api.Ref, len(refs)) |
| for i, r := range refs { |
| resp.Refs[i] = r.Proto() |
| } |
| } |
| if len(tags) != 0 { |
| resp.Tags = make([]*api.Tag, len(tags)) |
| for i, t := range tags { |
| resp.Tags[i] = t.Proto() |
| } |
| } |
| if len(proc) != 0 { |
| resp.Processors = proc |
| } |
| return resp, nil |
| } |
| |
| // DescribeClient implements the corresponding RPC method, see the proto doc. |
| func (impl *repoImpl) DescribeClient(c context.Context, r *api.DescribeClientRequest) (resp *api.DescribeClientResponse, err error) { |
| defer func() { err = grpcutil.GRPCifyAndLogErr(c, err) }() |
| |
| // Validate the request. |
| if err := common.ValidatePackageName(r.Package); err != nil { |
| return nil, status.Errorf(codes.InvalidArgument, "bad 'package' - %s", err) |
| } |
| if !processing.IsClientPackage(r.Package) { |
| return nil, status.Errorf(codes.InvalidArgument, "bad 'package' - not a CIPD client package") |
| } |
| if err := common.ValidateObjectRef(r.Instance, common.KnownHash); err != nil { |
| return nil, status.Errorf(codes.InvalidArgument, "bad 'instance' - %s", err) |
| } |
| |
| // Check ACLs. |
| if _, err := impl.checkRole(c, r.Package, api.Role_READER); err != nil { |
| return nil, err |
| } |
| |
| // Make sure this instance exists, has all processors finished and fetch |
| // basic details about it. |
| inst := (&model.Instance{}).FromProto(c, &api.Instance{ |
| Package: r.Package, |
| Instance: r.Instance, |
| }) |
| if err := model.CheckInstanceReady(c, inst); err != nil { |
| return nil, err |
| } |
| |
| // Grab the location of the extracted CIPD client from the post-processor. |
| // This must succeed, since CheckInstanceReady above verified processors have |
| // finished. Thus treat any error here as internal, as it will require an |
| // investigation. |
| proc, err := processing.GetClientExtractorResult(c, inst.Proto()) |
| if err != nil { |
| return nil, errors.Annotate(err, "failed to get client extractor results").Tag(grpcutil.InternalTag).Err() |
| } |
| ref, err := proc.ToObjectRef() |
| if err != nil { |
| return nil, errors.Annotate(err, "malformed or unrecognized ref in the client extractor results").Tag(grpcutil.InternalTag).Err() |
| } |
| |
| // refAliases (and SHA1 in particular, as hash supported by oldest code) is |
| // required to allow older clients to self-update to a newer client. See the |
| // doc for DescribeClientResponse proto message. |
| refAliases := proc.ObjectRefAliases() |
| sha1 := "" |
| for _, ref := range refAliases { |
| if ref.HashAlgo == api.HashAlgo_SHA1 { |
| sha1 = ref.HexDigest |
| break |
| } |
| } |
| if sha1 == "" { |
| return nil, errors.Reason("malformed client extraction results, missing SHA1 digest").Tag(grpcutil.InternalTag).Err() |
| } |
| |
| // Grab the signed URL of the client binary. |
| signedURL, err := impl.cas.GetObjectURL(c, &api.GetObjectURLRequest{ |
| Object: ref, |
| DownloadFilename: processing.GetClientBinaryName(r.Package), // e.g. 'cipd.exe' |
| }) |
| if err != nil { |
| return nil, errors.Annotate(err, "failed to get signed URL to the client binary").Tag(grpcutil.InternalTag).Err() |
| } |
| |
| return &api.DescribeClientResponse{ |
| Instance: inst.Proto(), |
| ClientRef: ref, |
| ClientBinary: signedURL, |
| ClientSize: proc.ClientBinary.Size, |
| LegacySha1: sha1, |
| ClientRefAliases: refAliases, |
| }, nil |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| // Non-pRPC handlers for the client bootstrap and legacy API. |
| |
| // Name of a header that contains resolved CIPD instance IDs in /client and /dl |
| // responses. |
| const cipdInstanceHeader = "X-Cipd-Instance" |
| |
| // legacyInstance is JSON representation of Instance in the legacy API. |
| type legacyInstance struct { |
| PackageName string `json:"package_name,omitempty"` |
| InstanceID string `json:"instance_id,omitempty"` |
| RegisteredBy string `json:"registered_by,omitempty"` |
| RegisteredTs string `json:"registered_ts,omitempty"` // timestamp in microsec |
| } |
| |
| // FromInstance fills in legacyInstance based on data from Instance proto. |
| func (l *legacyInstance) FromInstance(inst *api.Instance) *legacyInstance { |
| l.PackageName = inst.Package |
| l.InstanceID = common.ObjectRefToInstanceID(inst.Instance) |
| l.RegisteredBy = inst.RegisteredBy |
| if ts := inst.RegisteredTs; ts != nil { |
| l.RegisteredTs = fmt.Sprintf("%d", ts.Seconds*1e6+int64(ts.Nanos/1e3)) |
| } else { |
| l.RegisteredTs = "" |
| } |
| return l |
| } |
| |
| // adaptGrpcErr knows how to convert gRPC-style errors to ugly looking HTTP |
| // error pages with appropriate HTTP status codes. |
| // |
| // Recognizes either real gRPC errors (produced with status.Errorf) or |
| // grpc-tagged errors produced via grpcutil. |
| func adaptGrpcErr(h func(*router.Context) error) router.Handler { |
| return func(ctx *router.Context) { |
| err := grpcutil.GRPCifyAndLogErr(ctx.Context, h(ctx)) |
| if code := grpc.Code(err); code != codes.OK { |
| http.Error(ctx.Writer, grpc.ErrorDesc(err), grpcutil.CodeStatus(code)) |
| } |
| } |
| } |
| |
| // replyWithJSON sends StatusOK with JSON body. |
| func replyWithJSON(w http.ResponseWriter, obj interface{}) error { |
| w.Header().Set("Content-Type", "application/json; charset=utf-8") |
| w.WriteHeader(http.StatusOK) |
| enc := json.NewEncoder(w) |
| enc.SetIndent("", " ") |
| enc.SetEscapeHTML(false) |
| return enc.Encode(obj) |
| } |
| |
| // replyWithError sends StatusOK with JSON body containing an error. |
| // |
| // Due to Cloud Endpoints limitations, legacy API used StatusOK for some not-OK |
| // responses and communicated the actual error through 'status' response field. |
| func replyWithError(w http.ResponseWriter, status, message string, args ...interface{}) error { |
| return replyWithJSON(w, map[string]string{ |
| "status": status, |
| "error_message": fmt.Sprintf(message, args...), |
| }) |
| } |
| |
| // InstallHandlers installs non-pRPC HTTP handlers into the router. |
| // |
| // 'base' middleware chain here is assumed to have an authentication middleware |
| // that checks 'Authorization' header (not cookies!). |
| func (impl *repoImpl) InstallHandlers(r *router.Router, base router.MiddlewareChain) { |
| r.GET("/client", base, adaptGrpcErr(impl.handleClientBootstrap)) |
| r.GET("/dl/*path", base, adaptGrpcErr(impl.handlePackageDownload)) |
| |
| r.GET("/_ah/api/repo/v1/client", base, adaptGrpcErr(impl.handleLegacyClientInfo)) |
| r.GET("/_ah/api/repo/v1/instance", base, adaptGrpcErr(impl.handleLegacyInstance)) |
| r.GET("/_ah/api/repo/v1/instance/resolve", base, adaptGrpcErr(impl.handleLegacyResolve)) |
| |
| // All other legacy endpoints (/_ah/api/repo/v1/*) just return an error asking |
| // the client to update. |
| r.NotFound(base, func(ctx *router.Context) { |
| if strings.HasPrefix(ctx.Request.URL.Path, "/_ah/api/repo/v1/") { |
| // Note: can't use http.Error here because it appends '\n' that renders |
| // ugly by the client. |
| ctx.Writer.Header().Set("Content-Type", "text/plain; charset=utf-8") |
| ctx.Writer.WriteHeader(http.StatusMethodNotAllowed) |
| fmt.Fprintf(ctx.Writer, "This version of CIPD client is no longer supported, please upgrade") |
| } else { |
| http.Error(ctx.Writer, "No such page", http.StatusNotFound) |
| } |
| }) |
| } |
| |
| // handleClientBootstrap redirects to a CIPD client binary in Google Storage. |
| // |
| // GET /client?platform=...&version=... |
| // |
| // Where: |
| // platform: linux-amd64, windows-386, etc. |
| // version: a package version identifier (instance ID, a ref or a tag). |
| // |
| // On success issues HTTP 302 redirect to the signed Google Storage URL. |
| // On errors returns HTTP 4** with an error message. |
| func (impl *repoImpl) handleClientBootstrap(ctx *router.Context) error { |
| c, r, w := ctx.Context, ctx.Request, ctx.Writer |
| |
| // Do light validation (the rest is in ResolveVersion), and get a full client |
| // package name. |
| platform := r.FormValue("platform") |
| version := r.FormValue("version") |
| switch { |
| case platform == "": |
| return status.Errorf(codes.InvalidArgument, "no 'platform' specified") |
| case version == "": |
| return status.Errorf(codes.InvalidArgument, "no 'version' specified") |
| } |
| pkg, err := processing.GetClientPackage(platform) |
| if err != nil { |
| return status.Errorf(codes.InvalidArgument, "bad platform name") |
| } |
| |
| // Resolve the version into a concrete instance. This also does rigorous |
| // argument validation, ACL checks and verifies the resulting instance exists. |
| inst, err := impl.ResolveVersion(c, &api.ResolveVersionRequest{ |
| Package: pkg, |
| Version: version, |
| }) |
| if err != nil { |
| return err |
| } |
| |
| // Put resolved instance ID into the response headers. This may be useful when |
| // debugging fetches. |
| w.Header().Set(cipdInstanceHeader, common.ObjectRefToInstanceID(inst.Instance)) |
| |
| // Grab the location of the extracted CIPD client from the post-processor. |
| res, err := processing.GetClientExtractorResult(c, inst) |
| switch { |
| case transient.Tag.In(err): |
| return err |
| case err == datastore.ErrNoSuchEntity: |
| return status.Errorf(codes.NotFound, "the client binary is not extracted yet, try later") |
| case err != nil: // fatal |
| return status.Errorf(codes.NotFound, "the client binary is not available - %s", err) |
| } |
| ref, err := res.ToObjectRef() |
| if err != nil { |
| return status.Errorf(codes.NotFound, "malformed ref to the client binary - %s", err) |
| } |
| |
| // Ask CAS for a signed URL to the client binary and redirect there. |
| url, err := impl.cas.GetObjectURL(c, &api.GetObjectURLRequest{ |
| Object: ref, |
| DownloadFilename: processing.GetClientBinaryName(pkg), // e.g. 'cipd.exe' |
| }) |
| if err == nil { |
| http.Redirect(w, r, url.SignedUrl, http.StatusFound) |
| } |
| return err |
| } |
| |
| // handlePackageDownload redirects to a CIPD package file (raw octet stream |
| // with zipped package data) in Google Storage. |
| // |
| // GET /dl/<package>/+/<version>. |
| // |
| // Where: |
| // package: a CIPD package name (e.g. "a/b/c/linux-amd64"). |
| // version: a package version identifier (instance ID, a ref or a tag). |
| // |
| // On success issues HTTP 302 redirect to the signed Google Storage URL. |
| // On errors returns HTTP 4** with an error message. |
| func (impl *repoImpl) handlePackageDownload(ctx *router.Context) error { |
| c, r, w := ctx.Context, ctx.Request, ctx.Writer |
| |
| // Parse the path. The router is too simplistic to parse such paths. It also |
| // likes to prepend '/' to it. |
| path := strings.TrimPrefix(ctx.Params.ByName("path"), "/") |
| chunks := strings.SplitN(path, "/+/", 2) |
| if len(chunks) != 2 { |
| return status.Errorf(codes.InvalidArgument, "the URL should have form /dl/<package>/+/<version>") |
| } |
| pkg, version := chunks[0], chunks[1] |
| |
| // Resolve the version into a concrete instance. This also does rigorous |
| // argument validation, ACL checks and verifies the resulting instance exists. |
| inst, err := impl.ResolveVersion(c, &api.ResolveVersionRequest{ |
| Package: pkg, |
| Version: version, |
| }) |
| if err != nil { |
| return err |
| } |
| |
| // Put resolved instance ID into the response headers. This may be useful when |
| // debugging fetches. |
| w.Header().Set(cipdInstanceHeader, common.ObjectRefToInstanceID(inst.Instance)) |
| |
| // Generate a name for the file based on the last two components of the |
| // package name. This name is used by browsers when downloading the file. |
| name := "" |
| chunks = strings.Split(pkg, "/") |
| if len(chunks) > 1 { |
| name = fmt.Sprintf("%s-%s", chunks[len(chunks)-2], chunks[len(chunks)-1]) |
| } else { |
| name = chunks[0] |
| } |
| |
| // Ask CAS for a signed URL to the package and redirect there. |
| url, err := impl.cas.GetObjectURL(c, &api.GetObjectURLRequest{ |
| Object: inst.Instance, |
| DownloadFilename: name + ".zip", |
| }) |
| if err == nil { |
| http.Redirect(w, r, url.SignedUrl, http.StatusFound) |
| } |
| return err |
| } |
| |
| // handleLegacyClientInfo is a legacy handler for an RPC replaced by /client |
| // endpoint. |
| // |
| // It returns information about a CIPD client package and a signed URL to fetch |
| // the client binary. |
| // |
| // GET /_ah/api/repo/v1/client?package_name=...&instance_id=... |
| // |
| // Where: |
| // package_name: full name of a CIPD client package. |
| // instance_id: a hex digest with instance ID. |
| // |
| // Returns: |
| // { |
| // "status": "...", |
| // "error_message": "...", |
| // "client_binary": { |
| // "file_name": "cipd", |
| // "sha1": "...", |
| // "fetch_url": "...", |
| // "size": "..." |
| // }, |
| // "instance": { |
| // "package_name": "...", |
| // "instance_id": "...", |
| // "registered_by": "...", |
| // "registered_ts": "<int64 timestamp in microseconds>" |
| // } |
| // } |
| func (impl *repoImpl) handleLegacyClientInfo(ctx *router.Context) error { |
| c, r, w := ctx.Context, ctx.Request, ctx.Writer |
| |
| iid := r.FormValue("instance_id") |
| if err := common.ValidateInstanceID(iid, common.KnownHash); err != nil { |
| return errors.Annotate(err, "bad instance_id").Tag(grpcutil.InvalidArgumentTag).Err() |
| } |
| |
| desc, err := impl.DescribeClient(c, &api.DescribeClientRequest{ |
| Package: r.FormValue("package_name"), |
| Instance: common.InstanceIDToObjectRef(iid), |
| }) |
| |
| switch grpc.Code(err) { |
| case codes.OK: |
| return replyWithJSON(w, map[string]interface{}{ |
| "status": "SUCCESS", |
| "instance": (&legacyInstance{}).FromInstance(desc.Instance), |
| "client_binary": map[string]string{ |
| "file_name": processing.GetClientBinaryName(desc.Instance.Package), |
| "sha1": desc.LegacySha1, |
| "fetch_url": desc.ClientBinary.SignedUrl, |
| "size": fmt.Sprintf("%d", desc.ClientSize), |
| }, |
| }) |
| case codes.NotFound: |
| return replyWithError(w, "INSTANCE_NOT_FOUND", "%s", grpc.ErrorDesc(err)) |
| case codes.FailedPrecondition: |
| return replyWithError(w, "NOT_EXTRACTED_YET", "the client binary is not extracted yet, try later") |
| case codes.Aborted: |
| return replyWithError(w, "ERROR", "the client binary is not available - %s", grpc.ErrorDesc(err)) |
| default: |
| return err // the legacy client recognizes other codes just fine |
| } |
| } |
| |
| // handleLegacyInstance is a legacy handler for an RPC replaced by |
| // DescribeInstance and GetInstanceURL. |
| // |
| // It returns information about an instance and a signed URL to fetch it. |
| // |
| // GET /_ah/api/repo/v1/instance?package_name=...&instance_id=... |
| // |
| // Where: |
| // package_name: full name of a package. |
| // instance_id: a hex digest with instance ID. |
| // |
| // Returns: |
| // { |
| // "status": "...", |
| // "error_message": "...", |
| // "fetch_url": "...", |
| // "instance": { |
| // "package_name": "...", |
| // "instance_id": "...", |
| // "registered_by": "...", |
| // "registered_ts": "<int64 timestamp in microseconds>" |
| // } |
| // } |
| func (impl *repoImpl) handleLegacyInstance(ctx *router.Context) error { |
| c, r, w := ctx.Context, ctx.Request, ctx.Writer |
| |
| iid := r.FormValue("instance_id") |
| if err := common.ValidateInstanceID(iid, common.KnownHash); err != nil { |
| return errors.Annotate(err, "bad instance_id").Tag(grpcutil.InvalidArgumentTag).Err() |
| } |
| |
| // This checks the request format, ACLs, verifies the instance exists and |
| // returns info about it. |
| inst, err := impl.DescribeInstance(c, &api.DescribeInstanceRequest{ |
| Package: r.FormValue("package_name"), |
| Instance: common.InstanceIDToObjectRef(iid), |
| }) |
| |
| var signedURL *api.ObjectURL |
| if err == nil { |
| // Here we know the instance exists and the caller has access to it, so just |
| // ask the CAS for an URL directly instead of using impl.GetInstanceURL, |
| // which will needlessly recheck ACLs and instance presence. |
| signedURL, err = impl.cas.GetObjectURL(c, &api.GetObjectURLRequest{ |
| Object: inst.Instance.Instance, |
| }) |
| } |
| |
| switch grpc.Code(err) { |
| case codes.OK: |
| return replyWithJSON(w, map[string]interface{}{ |
| "status": "SUCCESS", |
| "fetch_url": signedURL.SignedUrl, |
| "instance": (&legacyInstance{}).FromInstance(inst.Instance), |
| }) |
| case codes.NotFound: |
| return replyWithError(w, "INSTANCE_NOT_FOUND", "%s", grpc.ErrorDesc(err)) |
| default: |
| return err // the legacy client recognizes other codes just fine |
| } |
| } |
| |
| // handleLegacyResolve is a legacy handler for ResolveVersion RPC. |
| // |
| // GET /_ah/api/repo/v1/instance/resolve?package_name=...&version=... |
| // |
| // Where: |
| // package_name: full name of a package. |
| // version: a package version identifier (instance ID, a ref or a tag). |
| // |
| // Returns: |
| // { |
| // "status": "...", |
| // "error_message": "...", |
| // "instance_id": "..." |
| // } |
| func (impl *repoImpl) handleLegacyResolve(ctx *router.Context) error { |
| c, r, w := ctx.Context, ctx.Request, ctx.Writer |
| |
| resp, err := impl.ResolveVersion(c, &api.ResolveVersionRequest{ |
| Package: r.FormValue("package_name"), |
| Version: r.FormValue("version"), |
| }) |
| |
| switch grpc.Code(err) { |
| case codes.OK: |
| return replyWithJSON(w, map[string]string{ |
| "status": "SUCCESS", |
| "instance_id": common.ObjectRefToInstanceID(resp.Instance), |
| }) |
| case codes.NotFound: |
| return replyWithError(w, "INSTANCE_NOT_FOUND", "%s", grpc.ErrorDesc(err)) |
| case codes.FailedPrecondition: |
| return replyWithError(w, "AMBIGUOUS_VERSION", "%s", grpc.ErrorDesc(err)) |
| default: |
| return err // the legacy client recognizes other codes just fine |
| } |
| } |