cipd: Add -tag option to 'create' and 'pkg-register' subcomands.

R=nodir@chromium.org
BUG=481270

Review URL: https://codereview.chromium.org/1130733007
diff --git a/go/src/infra/tools/cipd/apps/cipd/main.go b/go/src/infra/tools/cipd/apps/cipd/main.go
index f634f74..aa91fcd 100644
--- a/go/src/infra/tools/cipd/apps/cipd/main.go
+++ b/go/src/infra/tools/cipd/apps/cipd/main.go
@@ -222,6 +222,42 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+// TagsOptions mixin.
+
+// Tags holds array of '-tag' command line options.
+type Tags []string
+
+func (tags *Tags) String() string {
+	// String() for empty vars used in -help output.
+	if len(*tags) == 0 {
+		return "key:value"
+	}
+	return strings.Join(*tags, " ")
+}
+
+// Set is called by 'flag' package when parsing command line options.
+func (tags *Tags) Set(value string) error {
+	err := cipd.ValidateInstanceTag(value)
+	if err != nil {
+		return err
+	}
+	*tags = append(*tags, value)
+	return nil
+}
+
+// TagsOptions defines command line arguments for commands that accept a set
+// of tags.
+type TagsOptions struct {
+	// Set of tags to attach to the package instance.
+	tags Tags
+}
+
+func (opts *TagsOptions) registerFlags(f *flag.FlagSet) {
+	opts.tags = []string{}
+	f.Var(&opts.tags, "tag", "tag to attach to the package instance")
+}
+
+////////////////////////////////////////////////////////////////////////////////
 // JSONOutputOptions mixin.
 
 // PackageInfo is put into JSON output by subcommands. It describes a built
@@ -289,6 +325,7 @@
 	CommandRun: func() subcommands.CommandRun {
 		c := &createRun{}
 		c.InputOptions.registerFlags(&c.Flags)
+		c.TagsOptions.registerFlags(&c.Flags)
 		c.ServiceOptions.registerFlags(&c.Flags)
 		c.JSONOutputOptions.registerFlags(&c.Flags)
 		return c
@@ -298,6 +335,7 @@
 type createRun struct {
 	subcommands.CommandRunBase
 	InputOptions
+	TagsOptions
 	ServiceOptions
 	JSONOutputOptions
 }
@@ -306,7 +344,7 @@
 	if !checkCommandLine(args, c.GetFlags(), 0) {
 		return 1
 	}
-	info, err := buildAndUploadInstance(c.InputOptions, c.ServiceOptions)
+	info, err := buildAndUploadInstance(c.InputOptions, c.TagsOptions, c.ServiceOptions)
 	err = c.writeJSONOutput(&info, err)
 	if err != nil {
 		reportError("Error while uploading the package: %s", err)
@@ -315,7 +353,7 @@
 	return 0
 }
 
-func buildAndUploadInstance(inputOpts InputOptions, serviceOpts ServiceOptions) (PackageInfo, error) {
+func buildAndUploadInstance(inputOpts InputOptions, tagsOpts TagsOptions, serviceOpts ServiceOptions) (PackageInfo, error) {
 	f, err := ioutil.TempFile("", "cipd_pkg")
 	if err != nil {
 		return PackageInfo{}, err
@@ -328,7 +366,7 @@
 	if err != nil {
 		return PackageInfo{}, err
 	}
-	return registerInstanceFile(f.Name(), serviceOpts)
+	return registerInstanceFile(f.Name(), tagsOpts, serviceOpts)
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -827,6 +865,7 @@
 	LongDesc:  "Uploads and registers package instance in the package repository.",
 	CommandRun: func() subcommands.CommandRun {
 		c := &registerRun{}
+		c.TagsOptions.registerFlags(&c.Flags)
 		c.ServiceOptions.registerFlags(&c.Flags)
 		c.JSONOutputOptions.registerFlags(&c.Flags)
 		return c
@@ -835,6 +874,7 @@
 
 type registerRun struct {
 	subcommands.CommandRunBase
+	TagsOptions
 	ServiceOptions
 	JSONOutputOptions
 }
@@ -843,7 +883,7 @@
 	if !checkCommandLine(args, c.GetFlags(), 1) {
 		return 1
 	}
-	info, err := registerInstanceFile(args[0], c.ServiceOptions)
+	info, err := registerInstanceFile(args[0], c.TagsOptions, c.ServiceOptions)
 	err = c.writeJSONOutput(&info, err)
 	if err != nil {
 		reportError("Error while registering the package: %s", err)
@@ -852,7 +892,7 @@
 	return 0
 }
 
-func registerInstanceFile(instanceFile string, serviceOpts ServiceOptions) (PackageInfo, error) {
+func registerInstanceFile(instanceFile string, tagsOpts TagsOptions, serviceOpts ServiceOptions) (PackageInfo, error) {
 	inst, err := cipd.OpenInstanceFile(instanceFile, "")
 	if err != nil {
 		return PackageInfo{}, err
@@ -866,6 +906,7 @@
 	info := inspectInstance(inst, false)
 	return info, cipd.RegisterInstance(cipd.RegisterInstanceOptions{
 		PackageInstance: inst,
+		Tags:            tagsOpts.tags,
 		UploadOptions: cipd.UploadOptions{
 			ServiceURL: serviceOpts.serviceURL,
 			Client:     client,
diff --git a/go/src/infra/tools/cipd/common.go b/go/src/infra/tools/cipd/common.go
index 3bcc9da..e4c2859 100644
--- a/go/src/infra/tools/cipd/common.go
+++ b/go/src/infra/tools/cipd/common.go
@@ -10,6 +10,7 @@
 	"io"
 	"io/ioutil"
 	"regexp"
+	"strings"
 
 	"infra/libs/build"
 )
@@ -20,10 +21,13 @@
 // Name of the directory inside an installation root reserved for cipd stuff.
 const siteServiceDir = ".cipd"
 
-// packageNameRe is a Regular expression for a package name: <word>/<word/<word>
+// packageNameRe is a regular expression for a package name: <word>/<word/<word>
 // Package names must be lower case.
 var packageNameRe = regexp.MustCompile(`^([a-z0-9_\-]+/)*[a-z0-9_\-]+$`)
 
+// instanceTagKeyRe is a regular expression for a tag key.
+var instanceTagKeyRe = regexp.MustCompile(`^[a-z0-9_\-]+$`)
+
 // Name of the manifest file inside the package.
 const manifestName = packageServiceDir + "/manifest.json"
 
@@ -58,6 +62,21 @@
 	return nil
 }
 
+// ValidateInstanceTag returns error if a string doesn't look like a valid tag.
+func ValidateInstanceTag(t string) error {
+	chunks := strings.SplitN(t, ":", 2)
+	if len(chunks) != 2 {
+		return fmt.Errorf("The string %q doesn't look like a tag (a key:value pair)", t)
+	}
+	if len(t) > 400 {
+		return fmt.Errorf("The tag is too long: %q", t)
+	}
+	if !instanceTagKeyRe.MatchString(chunks[0]) {
+		return fmt.Errorf("Invalid tag key in %q. Should be a lowercase word.", t)
+	}
+	return nil
+}
+
 // DefaultServiceURL returns URL to a backend to use by default.
 func DefaultServiceURL() string {
 	if build.ReleaseBuild {
diff --git a/go/src/infra/tools/cipd/common_test.go b/go/src/infra/tools/cipd/common_test.go
index 039b208..c0bd2d8 100644
--- a/go/src/infra/tools/cipd/common_test.go
+++ b/go/src/infra/tools/cipd/common_test.go
@@ -72,3 +72,16 @@
 		So(ValidateInstanceID("AAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"), ShouldNotBeNil)
 	})
 }
+
+func TestValidateInstanceTag(t *testing.T) {
+	Convey("ValidateInstanceTag works", t, func() {
+		So(ValidateInstanceTag(""), ShouldNotBeNil)
+		So(ValidateInstanceTag("notapair"), ShouldNotBeNil)
+		So(ValidateInstanceTag(strings.Repeat("long", 200)+":abc"), ShouldNotBeNil)
+		So(ValidateInstanceTag("BADKEY:value"), ShouldNotBeNil)
+		So(ValidateInstanceTag("good:tag"), ShouldBeNil)
+		So(ValidateInstanceTag("good_tag:"), ShouldBeNil)
+		So(ValidateInstanceTag("good:tag:blah"), ShouldBeNil)
+		So(ValidateInstanceTag("good_tag:asdad/asdad/adad/a\\asdasdad"), ShouldBeNil)
+	})
+}
diff --git a/go/src/infra/tools/cipd/remote.go b/go/src/infra/tools/cipd/remote.go
index 1904342..627fbba 100644
--- a/go/src/infra/tools/cipd/remote.go
+++ b/go/src/infra/tools/cipd/remote.go
@@ -67,6 +67,16 @@
 	Principal string `json:"principal"`
 }
 
+// pendingProcessingError is returned by attachTags if package instance is not
+// yet ready and the call should be retried later.
+type pendingProcessingError struct {
+	message string
+}
+
+func (e *pendingProcessingError) Error() string {
+	return e.message
+}
+
 // newRemoteService is mocked in tests.
 var newRemoteService = func(client *http.Client, url string, log logging.Logger) *remoteService {
 	log.Infof("cipd: service URL is %s", url)
@@ -339,6 +349,46 @@
 	return fmt.Errorf("Unexpected reply status: %s", reply.Status)
 }
 
+func (r *remoteService) attachTags(packageName, instanceID string, tags []string) error {
+	// Tags will be passed in the request body, not via URL.
+	endpoint, err := tagsEndpoint(packageName, instanceID, nil)
+	if err != nil {
+		return err
+	}
+
+	if len(tags) == 0 {
+		return errors.New("At least one tag must be provided")
+	}
+	for _, tag := range tags {
+		err = ValidateInstanceTag(tag)
+		if err != nil {
+			return err
+		}
+	}
+	var request struct {
+		Tags []string `json:"tags"`
+	}
+	request.Tags = tags
+
+	var reply struct {
+		Status       string `json:"status"`
+		ErrorMessage string `json:"error_message"`
+	}
+	err = r.makeRequest(endpoint, "POST", &request, &reply)
+	if err != nil {
+		return err
+	}
+	switch reply.Status {
+	case "SUCCESS":
+		return nil
+	case "PROCESSING_NOT_FINISHED_YET":
+		return &pendingProcessingError{reply.ErrorMessage}
+	case "ERROR", "PROCESSING_FAILED":
+		return errors.New(reply.ErrorMessage)
+	}
+	return fmt.Errorf("Unexpected status when attaching tags: %s", reply.Status)
+}
+
 ////////////////////////////////////////////////////////////////////////////////
 
 func instanceEndpoint(packageName, instanceID string) (string, error) {
@@ -366,6 +416,30 @@
 	return "repo/v1/acl?" + params.Encode(), nil
 }
 
+func tagsEndpoint(packageName, instanceID string, tags []string) (string, error) {
+	err := ValidatePackageName(packageName)
+	if err != nil {
+		return "", err
+	}
+	err = ValidateInstanceID(instanceID)
+	if err != nil {
+		return "", err
+	}
+	for _, tag := range tags {
+		err = ValidateInstanceTag(tag)
+		if err != nil {
+			return "", err
+		}
+	}
+	params := url.Values{}
+	params.Add("package_name", packageName)
+	params.Add("instance_id", instanceID)
+	for _, tag := range tags {
+		params.Add("tag", tag)
+	}
+	return "repo/v1/tags?" + params.Encode(), nil
+}
+
 // convertTimestamp coverts string with int64 timestamp in microseconds since
 // to time.Time
 func convertTimestamp(ts string) (time.Time, error) {
diff --git a/go/src/infra/tools/cipd/remote_test.go b/go/src/infra/tools/cipd/remote_test.go
index 427d245..f8e40b4 100644
--- a/go/src/infra/tools/cipd/remote_test.go
+++ b/go/src/infra/tools/cipd/remote_test.go
@@ -84,6 +84,20 @@
 		return remote.modifyACL("pkgname", changes)
 	}
 
+	mockAttachTags := func(c C, tags []string, request, response string) error {
+		remote := mockRemoteService(func(w http.ResponseWriter, r *http.Request) {
+			c.So(r.URL.Path, ShouldEqual, "/_ah/api/repo/v1/tags")
+			c.So(r.URL.Query().Get("package_name"), ShouldEqual, "pkgname")
+			c.So(r.URL.Query().Get("instance_id"), ShouldEqual, "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")
+			c.So(r.Method, ShouldEqual, "POST")
+			body, err := ioutil.ReadAll(r.Body)
+			c.So(err, ShouldBeNil)
+			c.So(string(body), ShouldEqual, request)
+			w.Write([]byte(response))
+		})
+		return remote.attachTags("pkgname", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", tags)
+	}
+
 	Convey("makeRequest POST works", t, func(c C) {
 		remote := mockRemoteService(func(w http.ResponseWriter, r *http.Request) {
 			c.So(r.Method, ShouldEqual, "POST")
@@ -434,6 +448,40 @@
 		}`)
 		So(err, ShouldNotBeNil)
 	})
+
+	Convey("attachTags SUCCESS", t, func(c C) {
+		err := mockAttachTags(
+			c, []string{"tag1:value1", "tag2:value2"},
+			`{"tags":["tag1:value1","tag2:value2"]}`,
+			`{"status":"SUCCESS"}`)
+		So(err, ShouldBeNil)
+	})
+
+	Convey("attachTags no tags", t, func(c C) {
+		err := mockAttachTags(c, nil, "", "")
+		So(err, ShouldNotBeNil)
+	})
+
+	Convey("attachTags bad tag", t, func(c C) {
+		err := mockAttachTags(c, []string{"BADTAG"}, "", "")
+		So(err, ShouldNotBeNil)
+	})
+
+	Convey("attachTags PROCESSING_NOT_FINISHED_YET", t, func(c C) {
+		err := mockAttachTags(
+			c, []string{"tag1:value1", "tag2:value2"},
+			`{"tags":["tag1:value1","tag2:value2"]}`,
+			`{"status":"PROCESSING_NOT_FINISHED_YET", "error_message":"Blah"}`)
+		So(err, ShouldResemble, &pendingProcessingError{message: "Blah"})
+	})
+
+	Convey("attachTags ERROR", t, func(c C) {
+		err := mockAttachTags(
+			c, []string{"tag1:value1", "tag2:value2"},
+			`{"tags":["tag1:value1","tag2:value2"]}`,
+			`{"status":"ERROR", "error_message":"Blah"}`)
+		So(err, ShouldNotBeNil)
+	})
 }
 
 ////////////////////////////////////////////////////////////////////////////////
diff --git a/go/src/infra/tools/cipd/uploader.go b/go/src/infra/tools/cipd/uploader.go
index 78b83dc..c76bb80 100644
--- a/go/src/infra/tools/cipd/uploader.go
+++ b/go/src/infra/tools/cipd/uploader.go
@@ -18,6 +18,8 @@
 var (
 	// ErrFinalizationTimeout is returned if CAS service can not finalize upload fast enough.
 	ErrFinalizationTimeout = errors.New("Timeout while waiting for CAS service to finalize the upload")
+	// ErrAttachTagsTimeout is returned when service refuses to accept tags for a long time.
+	ErrAttachTagsTimeout = errors.New("Timeout while attaching tags")
 )
 
 // UploadOptions contains upload related parameters shared by UploadToCAS and
@@ -135,8 +137,11 @@
 type RegisterInstanceOptions struct {
 	UploadOptions
 
-	// PackageInstance is a package to upload.
+	// PackageInstance is a package instance to register.
 	PackageInstance PackageInstance
+	// Tags is a list of tags to attach to an instance. Will be attached even if
+	// the instance already existed before.
+	Tags []string
 }
 
 // RegisterInstance makes the package instance available for clients by
@@ -194,7 +199,7 @@
 		log.Infof("cipd: instance %s:%s was successfully registered", inst.PackageName(), inst.InstanceID())
 	}
 
-	return nil
+	return attachTagsWhenReady(remote, inst.PackageName(), inst.InstanceID(), options.Tags, log)
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -324,3 +329,34 @@
 	}
 	return
 }
+
+////////////////////////////////////////////////////////////////////////////////
+// Tags related functions.
+
+// attachTagsWhenReady attaches tags to an instance retrying when receiving
+// PROCESSING_NOT_FINISHED_YET errors.
+func attachTagsWhenReady(remote *remoteService, packageName, instanceID string, tags []string, log logging.Logger) error {
+	if len(tags) == 0 {
+		return nil
+	}
+	for _, tag := range tags {
+		log.Infof("cipd: attaching tag %s", tag)
+	}
+	deadline := clock.Now().Add(60 * time.Second)
+	for clock.Now().Before(deadline) {
+		err := remote.attachTags(packageName, instanceID, tags)
+		if err == nil {
+			log.Infof("cipd: all tags attached")
+			return nil
+		}
+		if _, ok := err.(*pendingProcessingError); ok {
+			log.Warningf("cipd: package instance is not ready yet - %s", err)
+			clock.Sleep(5 * time.Second)
+		} else {
+			log.Errorf("cipd: failed to attach tags - %s", err)
+			return err
+		}
+	}
+	log.Errorf("cipd: failed to attach tags - deadline exceeded")
+	return ErrAttachTagsTimeout
+}
diff --git a/go/src/infra/tools/cipd/uploader_test.go b/go/src/infra/tools/cipd/uploader_test.go
index 5678d32..11d1128 100644
--- a/go/src/infra/tools/cipd/uploader_test.go
+++ b/go/src/infra/tools/cipd/uploader_test.go
@@ -214,6 +214,59 @@
 	})
 }
 
+func TestAttachTagsWhenReady(t *testing.T) {
+	Convey("Mocking clock", t, func() {
+		mockClock(time.Now())
+
+		Convey("attachTagsWhenReady works", func() {
+			remote := mockRemoteServiceWithExpectations([]expectedHTTPCall{
+				{
+					Method: "POST",
+					Path:   "/_ah/api/repo/v1/tags",
+					Query: url.Values{
+						"instance_id":  []string{"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"},
+						"package_name": []string{"pkgname"},
+					},
+					Reply: `{"status": "PROCESSING_NOT_FINISHED_YET"}`,
+				},
+				{
+					Method: "POST",
+					Path:   "/_ah/api/repo/v1/tags",
+					Query: url.Values{
+						"instance_id":  []string{"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"},
+						"package_name": []string{"pkgname"},
+					},
+					Reply: `{"status": "SUCCESS"}`,
+				},
+			})
+			err := attachTagsWhenReady(
+				remote, "pkgname", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
+				[]string{"tag1:value1"}, logging.DefaultLogger)
+			So(err, ShouldBeNil)
+		})
+
+		Convey("attachTagsWhenReady timeout", func() {
+			calls := []expectedHTTPCall{}
+			for i := 0; i < 20; i++ {
+				calls = append(calls, expectedHTTPCall{
+					Method: "POST",
+					Path:   "/_ah/api/repo/v1/tags",
+					Query: url.Values{
+						"instance_id":  []string{"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"},
+						"package_name": []string{"pkgname"},
+					},
+					Reply: `{"status": "PROCESSING_NOT_FINISHED_YET"}`,
+				})
+			}
+			remote := mockRemoteServiceWithExpectations(calls)
+			err := attachTagsWhenReady(
+				remote, "pkgname", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
+				[]string{"tag1:value1"}, logging.DefaultLogger)
+			So(err, ShouldEqual, ErrAttachTagsTimeout)
+		})
+	})
+}
+
 func mockResumableUpload() {
 	prev := resumableUpload
 	resumableUpload = func(string, int64, UploadToCASOptions) error {