[buildbucket] Config validation - add topics validation logic

add topics validation logic in the project config validation.

Bug:1381210
Change-Id: I5f6ff024d1cf6e8fd22e97dae7b68c45234d09b1
Reviewed-on: https://chromium-review.googlesource.com/c/infra/luci/luci-go/+/4061167
Reviewed-by: Chan Li <chanli@chromium.org>
Commit-Queue: Yuanjun Huang <yuanjunh@google.com>
diff --git a/buildbucket/appengine/internal/clients/fakepubsub.go b/buildbucket/appengine/internal/clients/fakepubsub.go
new file mode 100644
index 0000000..51852bc
--- /dev/null
+++ b/buildbucket/appengine/internal/clients/fakepubsub.go
@@ -0,0 +1,48 @@
+// Copyright 2022 The LUCI Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package clients
+
+import (
+	"context"
+
+	"cloud.google.com/go/pubsub"
+	"cloud.google.com/go/pubsub/pstest"
+	"google.golang.org/api/option"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/credentials/insecure"
+)
+
+// SetupTestPubsub creates a new fake Pub/Sub server and the client connection
+// to the server. It also adds the client into the context.
+func SetupTestPubsub(ctx context.Context, cloudProject string) (context.Context, *pstest.Server, *pubsub.Client, error) {
+	srv := pstest.NewServer()
+	conn, err := grpc.Dial(srv.Addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
+	if err != nil {
+		return ctx, nil, nil, err
+	}
+
+	client, err := pubsub.NewClient(ctx, cloudProject, option.WithGRPCConn(conn))
+	if err != nil {
+		return ctx, nil, nil, err
+	}
+
+	mockClients, ok := ctx.Value(&mockPubsubClientKey).(map[string]*pubsub.Client)
+	if !ok {
+		mockClients = make(map[string]*pubsub.Client)
+	}
+	mockClients[cloudProject] = client
+	ctx = context.WithValue(ctx, &mockPubsubClientKey, mockClients)
+	return ctx, srv, client, nil
+}
\ No newline at end of file
diff --git a/buildbucket/appengine/internal/clients/pubsub.go b/buildbucket/appengine/internal/clients/pubsub.go
new file mode 100644
index 0000000..84fa7b8
--- /dev/null
+++ b/buildbucket/appengine/internal/clients/pubsub.go
@@ -0,0 +1,52 @@
+// Copyright 2022 The LUCI Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package clients
+
+import (
+	"context"
+
+	"cloud.google.com/go/pubsub"
+	"google.golang.org/api/option"
+	"google.golang.org/grpc"
+
+	"go.chromium.org/luci/common/errors"
+	"go.chromium.org/luci/grpc/grpcmon"
+	"go.chromium.org/luci/server/auth"
+)
+
+var mockPubsubClientKey = "mock pubsub clients key for testing only"
+
+func NewPubsubClient(ctx context.Context, cloudProject, luciProject string) (*pubsub.Client, error) {
+	if mockClients, ok := ctx.Value(&mockPubsubClientKey).(map[string]*pubsub.Client); ok {
+		if mockClient, exist := mockClients[cloudProject]; exist {
+			return mockClient, nil
+		}
+		return nil, errors.Reason("couldn't find mock pubsub client for %s", cloudProject).Err()
+	}
+
+	creds, err := auth.GetPerRPCCredentials(ctx, auth.AsProject, auth.WithProject(luciProject), auth.WithScopes(auth.CloudOAuthScopes...))
+	if err != nil {
+		return nil, err
+	}
+	client, err := pubsub.NewClient(
+		ctx, cloudProject,
+		option.WithGRPCDialOption(grpcmon.WithClientRPCStatsMonitor()),
+		option.WithGRPCDialOption(grpc.WithPerRPCCredentials(creds)),
+	)
+	if err != nil {
+		return nil, err
+	}
+	return client, nil
+}
diff --git a/buildbucket/appengine/internal/config/project.go b/buildbucket/appengine/internal/config/project.go
index 3545bcc..4b1fc2f 100644
--- a/buildbucket/appengine/internal/config/project.go
+++ b/buildbucket/appengine/internal/config/project.go
@@ -29,6 +29,7 @@
 	"google.golang.org/protobuf/encoding/prototext"
 	"google.golang.org/protobuf/proto"
 
+	"go.chromium.org/luci/buildbucket/protoutil"
 	"go.chromium.org/luci/common/data/stringset"
 	"go.chromium.org/luci/common/data/strpair"
 	"go.chromium.org/luci/common/errors"
@@ -39,9 +40,9 @@
 	"go.chromium.org/luci/config/validation"
 	"go.chromium.org/luci/gae/service/datastore"
 
+	"go.chromium.org/luci/buildbucket/appengine/internal/clients"
 	"go.chromium.org/luci/buildbucket/appengine/model"
 	pb "go.chromium.org/luci/buildbucket/proto"
-	"go.chromium.org/luci/buildbucket/protoutil"
 )
 
 const CurrentBucketSchemaVersion = 13
@@ -77,6 +78,14 @@
 	cacheNameMaxLength = 4096
 
 	experimentNameRE = regexp.MustCompile(`^[a-z][a-z0-9_]*(?:\.[a-z][a-z0-9_]*)*$`)
+
+	// cloudProjectIDRE is the cloud project identifier regex derived from
+	// https://cloud.google.com/resource-manager/docs/creating-managing-projects#before_you_begin
+	cloudProjectIDRE = regexp.MustCompile(`^[a-z]([a-z0-9-]){4,28}[a-z0-9]$`)
+	// topicNameRE is the full topic name regex derived from https://cloud.google.com/pubsub/docs/admin#resource_names
+	topicNameRE = regexp.MustCompile(`^projects/(.*)/topics/(.*)$`)
+	// topicIDRE is the topic id regex derived from https://cloud.google.com/pubsub/docs/admin#resource_names
+	topicIDRE = regexp.MustCompile(`^[A-Za-z]([0-9A-Za-z\._\-~+%]){3,255}$`)
 )
 
 // changeLog is a temporary struct to track all changes in UpdateProjectCfg.
@@ -413,9 +422,77 @@
 		}
 		ctx.Exit()
 	}
+	validateBuildNotifyTopics(ctx, cfg.BuildsNotificationTopics, project)
 	return nil
 }
 
+// validateBuildNotifyTopics validate `builds_notification_topics` field.
+func validateBuildNotifyTopics(ctx *validation.Context, topics []*pb.BuildbucketCfgTopic, project string) {
+	if len(topics) == 0 {
+		return
+	}
+
+	ctx.Enter("builds_notification_topics")
+	defer ctx.Exit()
+
+	errs := make(errors.MultiError, len(topics))
+	_ = parallel.WorkPool(min(6, len(topics)), func(work chan<- func() error) {
+		for i, topic := range topics {
+			i := i
+			topic := topic
+			cloudProj, topicID, err := validateTopicName(topic.Name)
+			if err != nil {
+				errs[i] = err
+				continue
+			}
+			work <- func() error {
+				client, err := clients.NewPubsubClient(ctx.Context, cloudProj, project)
+				if err != nil {
+					errs[i] = errors.Annotate(err, "failed to create a pubsub client for %q", cloudProj).Err()
+					return nil
+				}
+				cTopic := client.Topic(topicID)
+				switch perms, err := cTopic.IAM().TestPermissions(ctx.Context, []string{"pubsub.topics.publish"}); {
+				case err != nil:
+					errs[i] = errors.Annotate(err, "failed to check luci project account's permission for %s", topic.Name).Err()
+				case len(perms) < 1:
+					errs[i] = errors.Reason("luci project account (%s-scoped@luci-project-accounts.iam.gserviceaccount.com) doesn't have the publish permission for %s", project, topic.Name).Err()
+				}
+				return nil
+			}
+		}
+	})
+
+	for _, err := range errs {
+		if err != nil {
+			ctx.Errorf("builds_notification_topics: %s", err)
+		}
+	}
+}
+
+// validateTopicName validates the format of topic, extract the cloud project and topic id, and return them.
+func validateTopicName(topic string) (string, string, error) {
+	matches := topicNameRE.FindAllStringSubmatch(topic, -1)
+	if matches == nil || len(matches[0]) != 3 {
+		return "", "", errors.Reason("topic %q does not match %q", topic, topicNameRE).Err()
+	}
+
+	cloudProj := matches[0][1]
+	topicID := matches[0][2]
+	// Only internal App Engine projects start "google.com:" with go/gae4g-setup#choosing-the-right-app-engine-version,
+	// all other project ids conform to cloudProjectIDRE.
+	if !strings.HasPrefix(cloudProj, "google.com:") && !cloudProjectIDRE.MatchString(cloudProj) {
+		return "", "", errors.Reason("cloud project id %q does not match %q", cloudProj, cloudProjectIDRE).Err()
+	}
+	if strings.HasPrefix(topicID, "goog") {
+		return "", "", errors.Reason("topic id %q shouldn't begin with the string goog", topicID).Err()
+	}
+	if !topicIDRE.MatchString(topicID) {
+		return "", "", errors.Reason("topic id %q does not match %q", topicID, topicIDRE).Err()
+	}
+	return cloudProj, topicID, nil
+}
+
 // validateProjectSwarming validates project_config.Swarming.
 func validateProjectSwarming(ctx *validation.Context, s *pb.Swarming, wellKnownExperiments stringset.Set) {
 	ctx.Enter("swarming")
diff --git a/buildbucket/appengine/internal/config/project_test.go b/buildbucket/appengine/internal/config/project_test.go
index ba3ddb4..5b4e0fa 100644
--- a/buildbucket/appengine/internal/config/project_test.go
+++ b/buildbucket/appengine/internal/config/project_test.go
@@ -567,6 +567,31 @@
 			So(allErrs, ShouldContainSubstring, `key "is_experimental": reserved key`)
 		})
 	})
+
+	Convey("validateTopicName", t, func() {
+		Convey("wrong topic name", func() {
+			_, _, err := validateTopicName("projects/adsf/")
+			So(err, ShouldErrLike, `topic "projects/adsf/" does not match "^projects/(.*)/topics/(.*)$"`)
+		})
+		Convey("wrong project identifier", func() {
+			_, _, err := validateTopicName("projects/pro/topics/topic1")
+			So(err, ShouldErrLike,`cloud project id "pro" does not match "^[a-z]([a-z0-9-]){4,28}[a-z0-9]$"`)
+		})
+		Convey("wrong topic id prefix", func() {
+			_, _, err := validateTopicName("projects/cloud-project/topics/goog11")
+			So(err, ShouldErrLike, `topic id "goog11" shouldn't begin with the string goog`)
+		})
+		Convey("wrong topic id format", func() {
+			_, _, err := validateTopicName("projects/cloud-project/topics/abc##")
+			So(err, ShouldErrLike,  `topic id "abc##" does not match "^[A-Za-z]([0-9A-Za-z\\._\\-~+%]){3,255}$"`)
+		})
+		Convey("success", func() {
+			cloudProj, topic, err := validateTopicName("projects/cloud-project/topics/mytopic")
+			So(err,ShouldBeNil)
+			So(cloudProj, ShouldEqual, "cloud-project")
+			So(topic, ShouldEqual, "mytopic")
+		})
+	})
 }
 
 func TestUpdateProject(t *testing.T) {