cipd: Add -tag option to 'create' and 'pkg-register' subcomands.
R=nodir@chromium.org
BUG=481270
Review URL: https://codereview.chromium.org/1130733007
diff --git a/go/src/infra/tools/cipd/apps/cipd/main.go b/go/src/infra/tools/cipd/apps/cipd/main.go
index f634f74..aa91fcd 100644
--- a/go/src/infra/tools/cipd/apps/cipd/main.go
+++ b/go/src/infra/tools/cipd/apps/cipd/main.go
@@ -222,6 +222,42 @@
}
////////////////////////////////////////////////////////////////////////////////
+// TagsOptions mixin.
+
+// Tags holds array of '-tag' command line options.
+type Tags []string
+
+func (tags *Tags) String() string {
+ // String() for empty vars used in -help output.
+ if len(*tags) == 0 {
+ return "key:value"
+ }
+ return strings.Join(*tags, " ")
+}
+
+// Set is called by 'flag' package when parsing command line options.
+func (tags *Tags) Set(value string) error {
+ err := cipd.ValidateInstanceTag(value)
+ if err != nil {
+ return err
+ }
+ *tags = append(*tags, value)
+ return nil
+}
+
+// TagsOptions defines command line arguments for commands that accept a set
+// of tags.
+type TagsOptions struct {
+ // Set of tags to attach to the package instance.
+ tags Tags
+}
+
+func (opts *TagsOptions) registerFlags(f *flag.FlagSet) {
+ opts.tags = []string{}
+ f.Var(&opts.tags, "tag", "tag to attach to the package instance")
+}
+
+////////////////////////////////////////////////////////////////////////////////
// JSONOutputOptions mixin.
// PackageInfo is put into JSON output by subcommands. It describes a built
@@ -289,6 +325,7 @@
CommandRun: func() subcommands.CommandRun {
c := &createRun{}
c.InputOptions.registerFlags(&c.Flags)
+ c.TagsOptions.registerFlags(&c.Flags)
c.ServiceOptions.registerFlags(&c.Flags)
c.JSONOutputOptions.registerFlags(&c.Flags)
return c
@@ -298,6 +335,7 @@
type createRun struct {
subcommands.CommandRunBase
InputOptions
+ TagsOptions
ServiceOptions
JSONOutputOptions
}
@@ -306,7 +344,7 @@
if !checkCommandLine(args, c.GetFlags(), 0) {
return 1
}
- info, err := buildAndUploadInstance(c.InputOptions, c.ServiceOptions)
+ info, err := buildAndUploadInstance(c.InputOptions, c.TagsOptions, c.ServiceOptions)
err = c.writeJSONOutput(&info, err)
if err != nil {
reportError("Error while uploading the package: %s", err)
@@ -315,7 +353,7 @@
return 0
}
-func buildAndUploadInstance(inputOpts InputOptions, serviceOpts ServiceOptions) (PackageInfo, error) {
+func buildAndUploadInstance(inputOpts InputOptions, tagsOpts TagsOptions, serviceOpts ServiceOptions) (PackageInfo, error) {
f, err := ioutil.TempFile("", "cipd_pkg")
if err != nil {
return PackageInfo{}, err
@@ -328,7 +366,7 @@
if err != nil {
return PackageInfo{}, err
}
- return registerInstanceFile(f.Name(), serviceOpts)
+ return registerInstanceFile(f.Name(), tagsOpts, serviceOpts)
}
////////////////////////////////////////////////////////////////////////////////
@@ -827,6 +865,7 @@
LongDesc: "Uploads and registers package instance in the package repository.",
CommandRun: func() subcommands.CommandRun {
c := ®isterRun{}
+ c.TagsOptions.registerFlags(&c.Flags)
c.ServiceOptions.registerFlags(&c.Flags)
c.JSONOutputOptions.registerFlags(&c.Flags)
return c
@@ -835,6 +874,7 @@
type registerRun struct {
subcommands.CommandRunBase
+ TagsOptions
ServiceOptions
JSONOutputOptions
}
@@ -843,7 +883,7 @@
if !checkCommandLine(args, c.GetFlags(), 1) {
return 1
}
- info, err := registerInstanceFile(args[0], c.ServiceOptions)
+ info, err := registerInstanceFile(args[0], c.TagsOptions, c.ServiceOptions)
err = c.writeJSONOutput(&info, err)
if err != nil {
reportError("Error while registering the package: %s", err)
@@ -852,7 +892,7 @@
return 0
}
-func registerInstanceFile(instanceFile string, serviceOpts ServiceOptions) (PackageInfo, error) {
+func registerInstanceFile(instanceFile string, tagsOpts TagsOptions, serviceOpts ServiceOptions) (PackageInfo, error) {
inst, err := cipd.OpenInstanceFile(instanceFile, "")
if err != nil {
return PackageInfo{}, err
@@ -866,6 +906,7 @@
info := inspectInstance(inst, false)
return info, cipd.RegisterInstance(cipd.RegisterInstanceOptions{
PackageInstance: inst,
+ Tags: tagsOpts.tags,
UploadOptions: cipd.UploadOptions{
ServiceURL: serviceOpts.serviceURL,
Client: client,
diff --git a/go/src/infra/tools/cipd/common.go b/go/src/infra/tools/cipd/common.go
index 3bcc9da..e4c2859 100644
--- a/go/src/infra/tools/cipd/common.go
+++ b/go/src/infra/tools/cipd/common.go
@@ -10,6 +10,7 @@
"io"
"io/ioutil"
"regexp"
+ "strings"
"infra/libs/build"
)
@@ -20,10 +21,13 @@
// Name of the directory inside an installation root reserved for cipd stuff.
const siteServiceDir = ".cipd"
-// packageNameRe is a Regular expression for a package name: <word>/<word/<word>
+// packageNameRe is a regular expression for a package name: <word>/<word/<word>
// Package names must be lower case.
var packageNameRe = regexp.MustCompile(`^([a-z0-9_\-]+/)*[a-z0-9_\-]+$`)
+// instanceTagKeyRe is a regular expression for a tag key.
+var instanceTagKeyRe = regexp.MustCompile(`^[a-z0-9_\-]+$`)
+
// Name of the manifest file inside the package.
const manifestName = packageServiceDir + "/manifest.json"
@@ -58,6 +62,21 @@
return nil
}
+// ValidateInstanceTag returns error if a string doesn't look like a valid tag.
+func ValidateInstanceTag(t string) error {
+ chunks := strings.SplitN(t, ":", 2)
+ if len(chunks) != 2 {
+ return fmt.Errorf("The string %q doesn't look like a tag (a key:value pair)", t)
+ }
+ if len(t) > 400 {
+ return fmt.Errorf("The tag is too long: %q", t)
+ }
+ if !instanceTagKeyRe.MatchString(chunks[0]) {
+ return fmt.Errorf("Invalid tag key in %q. Should be a lowercase word.", t)
+ }
+ return nil
+}
+
// DefaultServiceURL returns URL to a backend to use by default.
func DefaultServiceURL() string {
if build.ReleaseBuild {
diff --git a/go/src/infra/tools/cipd/common_test.go b/go/src/infra/tools/cipd/common_test.go
index 039b208..c0bd2d8 100644
--- a/go/src/infra/tools/cipd/common_test.go
+++ b/go/src/infra/tools/cipd/common_test.go
@@ -72,3 +72,16 @@
So(ValidateInstanceID("AAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"), ShouldNotBeNil)
})
}
+
+func TestValidateInstanceTag(t *testing.T) {
+ Convey("ValidateInstanceTag works", t, func() {
+ So(ValidateInstanceTag(""), ShouldNotBeNil)
+ So(ValidateInstanceTag("notapair"), ShouldNotBeNil)
+ So(ValidateInstanceTag(strings.Repeat("long", 200)+":abc"), ShouldNotBeNil)
+ So(ValidateInstanceTag("BADKEY:value"), ShouldNotBeNil)
+ So(ValidateInstanceTag("good:tag"), ShouldBeNil)
+ So(ValidateInstanceTag("good_tag:"), ShouldBeNil)
+ So(ValidateInstanceTag("good:tag:blah"), ShouldBeNil)
+ So(ValidateInstanceTag("good_tag:asdad/asdad/adad/a\\asdasdad"), ShouldBeNil)
+ })
+}
diff --git a/go/src/infra/tools/cipd/remote.go b/go/src/infra/tools/cipd/remote.go
index 1904342..627fbba 100644
--- a/go/src/infra/tools/cipd/remote.go
+++ b/go/src/infra/tools/cipd/remote.go
@@ -67,6 +67,16 @@
Principal string `json:"principal"`
}
+// pendingProcessingError is returned by attachTags if package instance is not
+// yet ready and the call should be retried later.
+type pendingProcessingError struct {
+ message string
+}
+
+func (e *pendingProcessingError) Error() string {
+ return e.message
+}
+
// newRemoteService is mocked in tests.
var newRemoteService = func(client *http.Client, url string, log logging.Logger) *remoteService {
log.Infof("cipd: service URL is %s", url)
@@ -339,6 +349,46 @@
return fmt.Errorf("Unexpected reply status: %s", reply.Status)
}
+func (r *remoteService) attachTags(packageName, instanceID string, tags []string) error {
+ // Tags will be passed in the request body, not via URL.
+ endpoint, err := tagsEndpoint(packageName, instanceID, nil)
+ if err != nil {
+ return err
+ }
+
+ if len(tags) == 0 {
+ return errors.New("At least one tag must be provided")
+ }
+ for _, tag := range tags {
+ err = ValidateInstanceTag(tag)
+ if err != nil {
+ return err
+ }
+ }
+ var request struct {
+ Tags []string `json:"tags"`
+ }
+ request.Tags = tags
+
+ var reply struct {
+ Status string `json:"status"`
+ ErrorMessage string `json:"error_message"`
+ }
+ err = r.makeRequest(endpoint, "POST", &request, &reply)
+ if err != nil {
+ return err
+ }
+ switch reply.Status {
+ case "SUCCESS":
+ return nil
+ case "PROCESSING_NOT_FINISHED_YET":
+ return &pendingProcessingError{reply.ErrorMessage}
+ case "ERROR", "PROCESSING_FAILED":
+ return errors.New(reply.ErrorMessage)
+ }
+ return fmt.Errorf("Unexpected status when attaching tags: %s", reply.Status)
+}
+
////////////////////////////////////////////////////////////////////////////////
func instanceEndpoint(packageName, instanceID string) (string, error) {
@@ -366,6 +416,30 @@
return "repo/v1/acl?" + params.Encode(), nil
}
+func tagsEndpoint(packageName, instanceID string, tags []string) (string, error) {
+ err := ValidatePackageName(packageName)
+ if err != nil {
+ return "", err
+ }
+ err = ValidateInstanceID(instanceID)
+ if err != nil {
+ return "", err
+ }
+ for _, tag := range tags {
+ err = ValidateInstanceTag(tag)
+ if err != nil {
+ return "", err
+ }
+ }
+ params := url.Values{}
+ params.Add("package_name", packageName)
+ params.Add("instance_id", instanceID)
+ for _, tag := range tags {
+ params.Add("tag", tag)
+ }
+ return "repo/v1/tags?" + params.Encode(), nil
+}
+
// convertTimestamp coverts string with int64 timestamp in microseconds since
// to time.Time
func convertTimestamp(ts string) (time.Time, error) {
diff --git a/go/src/infra/tools/cipd/remote_test.go b/go/src/infra/tools/cipd/remote_test.go
index 427d245..f8e40b4 100644
--- a/go/src/infra/tools/cipd/remote_test.go
+++ b/go/src/infra/tools/cipd/remote_test.go
@@ -84,6 +84,20 @@
return remote.modifyACL("pkgname", changes)
}
+ mockAttachTags := func(c C, tags []string, request, response string) error {
+ remote := mockRemoteService(func(w http.ResponseWriter, r *http.Request) {
+ c.So(r.URL.Path, ShouldEqual, "/_ah/api/repo/v1/tags")
+ c.So(r.URL.Query().Get("package_name"), ShouldEqual, "pkgname")
+ c.So(r.URL.Query().Get("instance_id"), ShouldEqual, "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")
+ c.So(r.Method, ShouldEqual, "POST")
+ body, err := ioutil.ReadAll(r.Body)
+ c.So(err, ShouldBeNil)
+ c.So(string(body), ShouldEqual, request)
+ w.Write([]byte(response))
+ })
+ return remote.attachTags("pkgname", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", tags)
+ }
+
Convey("makeRequest POST works", t, func(c C) {
remote := mockRemoteService(func(w http.ResponseWriter, r *http.Request) {
c.So(r.Method, ShouldEqual, "POST")
@@ -434,6 +448,40 @@
}`)
So(err, ShouldNotBeNil)
})
+
+ Convey("attachTags SUCCESS", t, func(c C) {
+ err := mockAttachTags(
+ c, []string{"tag1:value1", "tag2:value2"},
+ `{"tags":["tag1:value1","tag2:value2"]}`,
+ `{"status":"SUCCESS"}`)
+ So(err, ShouldBeNil)
+ })
+
+ Convey("attachTags no tags", t, func(c C) {
+ err := mockAttachTags(c, nil, "", "")
+ So(err, ShouldNotBeNil)
+ })
+
+ Convey("attachTags bad tag", t, func(c C) {
+ err := mockAttachTags(c, []string{"BADTAG"}, "", "")
+ So(err, ShouldNotBeNil)
+ })
+
+ Convey("attachTags PROCESSING_NOT_FINISHED_YET", t, func(c C) {
+ err := mockAttachTags(
+ c, []string{"tag1:value1", "tag2:value2"},
+ `{"tags":["tag1:value1","tag2:value2"]}`,
+ `{"status":"PROCESSING_NOT_FINISHED_YET", "error_message":"Blah"}`)
+ So(err, ShouldResemble, &pendingProcessingError{message: "Blah"})
+ })
+
+ Convey("attachTags ERROR", t, func(c C) {
+ err := mockAttachTags(
+ c, []string{"tag1:value1", "tag2:value2"},
+ `{"tags":["tag1:value1","tag2:value2"]}`,
+ `{"status":"ERROR", "error_message":"Blah"}`)
+ So(err, ShouldNotBeNil)
+ })
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/go/src/infra/tools/cipd/uploader.go b/go/src/infra/tools/cipd/uploader.go
index 78b83dc..c76bb80 100644
--- a/go/src/infra/tools/cipd/uploader.go
+++ b/go/src/infra/tools/cipd/uploader.go
@@ -18,6 +18,8 @@
var (
// ErrFinalizationTimeout is returned if CAS service can not finalize upload fast enough.
ErrFinalizationTimeout = errors.New("Timeout while waiting for CAS service to finalize the upload")
+ // ErrAttachTagsTimeout is returned when service refuses to accept tags for a long time.
+ ErrAttachTagsTimeout = errors.New("Timeout while attaching tags")
)
// UploadOptions contains upload related parameters shared by UploadToCAS and
@@ -135,8 +137,11 @@
type RegisterInstanceOptions struct {
UploadOptions
- // PackageInstance is a package to upload.
+ // PackageInstance is a package instance to register.
PackageInstance PackageInstance
+ // Tags is a list of tags to attach to an instance. Will be attached even if
+ // the instance already existed before.
+ Tags []string
}
// RegisterInstance makes the package instance available for clients by
@@ -194,7 +199,7 @@
log.Infof("cipd: instance %s:%s was successfully registered", inst.PackageName(), inst.InstanceID())
}
- return nil
+ return attachTagsWhenReady(remote, inst.PackageName(), inst.InstanceID(), options.Tags, log)
}
////////////////////////////////////////////////////////////////////////////////
@@ -324,3 +329,34 @@
}
return
}
+
+////////////////////////////////////////////////////////////////////////////////
+// Tags related functions.
+
+// attachTagsWhenReady attaches tags to an instance retrying when receiving
+// PROCESSING_NOT_FINISHED_YET errors.
+func attachTagsWhenReady(remote *remoteService, packageName, instanceID string, tags []string, log logging.Logger) error {
+ if len(tags) == 0 {
+ return nil
+ }
+ for _, tag := range tags {
+ log.Infof("cipd: attaching tag %s", tag)
+ }
+ deadline := clock.Now().Add(60 * time.Second)
+ for clock.Now().Before(deadline) {
+ err := remote.attachTags(packageName, instanceID, tags)
+ if err == nil {
+ log.Infof("cipd: all tags attached")
+ return nil
+ }
+ if _, ok := err.(*pendingProcessingError); ok {
+ log.Warningf("cipd: package instance is not ready yet - %s", err)
+ clock.Sleep(5 * time.Second)
+ } else {
+ log.Errorf("cipd: failed to attach tags - %s", err)
+ return err
+ }
+ }
+ log.Errorf("cipd: failed to attach tags - deadline exceeded")
+ return ErrAttachTagsTimeout
+}
diff --git a/go/src/infra/tools/cipd/uploader_test.go b/go/src/infra/tools/cipd/uploader_test.go
index 5678d32..11d1128 100644
--- a/go/src/infra/tools/cipd/uploader_test.go
+++ b/go/src/infra/tools/cipd/uploader_test.go
@@ -214,6 +214,59 @@
})
}
+func TestAttachTagsWhenReady(t *testing.T) {
+ Convey("Mocking clock", t, func() {
+ mockClock(time.Now())
+
+ Convey("attachTagsWhenReady works", func() {
+ remote := mockRemoteServiceWithExpectations([]expectedHTTPCall{
+ {
+ Method: "POST",
+ Path: "/_ah/api/repo/v1/tags",
+ Query: url.Values{
+ "instance_id": []string{"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"},
+ "package_name": []string{"pkgname"},
+ },
+ Reply: `{"status": "PROCESSING_NOT_FINISHED_YET"}`,
+ },
+ {
+ Method: "POST",
+ Path: "/_ah/api/repo/v1/tags",
+ Query: url.Values{
+ "instance_id": []string{"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"},
+ "package_name": []string{"pkgname"},
+ },
+ Reply: `{"status": "SUCCESS"}`,
+ },
+ })
+ err := attachTagsWhenReady(
+ remote, "pkgname", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
+ []string{"tag1:value1"}, logging.DefaultLogger)
+ So(err, ShouldBeNil)
+ })
+
+ Convey("attachTagsWhenReady timeout", func() {
+ calls := []expectedHTTPCall{}
+ for i := 0; i < 20; i++ {
+ calls = append(calls, expectedHTTPCall{
+ Method: "POST",
+ Path: "/_ah/api/repo/v1/tags",
+ Query: url.Values{
+ "instance_id": []string{"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"},
+ "package_name": []string{"pkgname"},
+ },
+ Reply: `{"status": "PROCESSING_NOT_FINISHED_YET"}`,
+ })
+ }
+ remote := mockRemoteServiceWithExpectations(calls)
+ err := attachTagsWhenReady(
+ remote, "pkgname", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
+ []string{"tag1:value1"}, logging.DefaultLogger)
+ So(err, ShouldEqual, ErrAttachTagsTimeout)
+ })
+ })
+}
+
func mockResumableUpload() {
prev := resumableUpload
resumableUpload = func(string, int64, UploadToCASOptions) error {