[cv] add hostname in PubSubRun message

It can be used to distinguish messages between cv hosts,
when an application receives messages from multiple cv hosts,
such as staging and prod.

Bug: 1226144
Change-Id: I216669ed680d875855a2628c610a5a7b0edbb66c
Reviewed-on: https://chromium-review.googlesource.com/c/infra/luci/luci-go/+/3235713
Reviewed-by: Yiwei Zhang <yiwzhang@google.com>
Reviewed-by: Andrii Shyshkalov <tandrii@google.com>
Commit-Queue: Scott Lee <ddoman@chromium.org>
diff --git a/cv/api/v1/pubsub.pb.go b/cv/api/v1/pubsub.pb.go
index b6615e8..2123a03 100644
--- a/cv/api/v1/pubsub.pb.go
+++ b/cv/api/v1/pubsub.pb.go
@@ -54,6 +54,8 @@
 	Status Run_Status `protobuf:"varint,2,opt,name=status,proto3,enum=cv.v1.Run_Status" json:"status,omitempty"`
 	// eversion is the entity version, which is monotonically increasing.
 	Eversion int64 `protobuf:"varint,3,opt,name=eversion,proto3" json:"eversion,omitempty"`
+	// The hostname of the CV service that published the message.
+	Hostname string `protobuf:"bytes,4,opt,name=hostname,proto3" json:"hostname,omitempty"`
 }
 
 func (x *PubSubRun) Reset() {
@@ -109,6 +111,13 @@
 	return 0
 }
 
+func (x *PubSubRun) GetHostname() string {
+	if x != nil {
+		return x.Hostname
+	}
+	return ""
+}
+
 var File_go_chromium_org_luci_cv_api_v1_pubsub_proto protoreflect.FileDescriptor
 
 var file_go_chromium_org_luci_cv_api_v1_pubsub_proto_rawDesc = []byte{
@@ -117,17 +126,18 @@
 	0x2f, 0x70, 0x75, 0x62, 0x73, 0x75, 0x62, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x05, 0x63,
 	0x76, 0x2e, 0x76, 0x31, 0x1a, 0x28, 0x67, 0x6f, 0x2e, 0x63, 0x68, 0x72, 0x6f, 0x6d, 0x69, 0x75,
 	0x6d, 0x2e, 0x6f, 0x72, 0x67, 0x2f, 0x6c, 0x75, 0x63, 0x69, 0x2f, 0x63, 0x76, 0x2f, 0x61, 0x70,
-	0x69, 0x2f, 0x76, 0x31, 0x2f, 0x72, 0x75, 0x6e, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x62,
+	0x69, 0x2f, 0x76, 0x31, 0x2f, 0x72, 0x75, 0x6e, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x7e,
 	0x0a, 0x09, 0x50, 0x75, 0x62, 0x53, 0x75, 0x62, 0x52, 0x75, 0x6e, 0x12, 0x0e, 0x0a, 0x02, 0x69,
 	0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x29, 0x0a, 0x06, 0x73,
 	0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x11, 0x2e, 0x63, 0x76,
 	0x2e, 0x76, 0x31, 0x2e, 0x52, 0x75, 0x6e, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x06,
 	0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x1a, 0x0a, 0x08, 0x65, 0x76, 0x65, 0x72, 0x73, 0x69,
 	0x6f, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x08, 0x65, 0x76, 0x65, 0x72, 0x73, 0x69,
-	0x6f, 0x6e, 0x42, 0x25, 0x5a, 0x23, 0x67, 0x6f, 0x2e, 0x63, 0x68, 0x72, 0x6f, 0x6d, 0x69, 0x75,
-	0x6d, 0x2e, 0x6f, 0x72, 0x67, 0x2f, 0x6c, 0x75, 0x63, 0x69, 0x2f, 0x63, 0x76, 0x2f, 0x61, 0x70,
-	0x69, 0x2f, 0x76, 0x31, 0x3b, 0x63, 0x76, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f,
-	0x33,
+	0x6f, 0x6e, 0x12, 0x1a, 0x0a, 0x08, 0x68, 0x6f, 0x73, 0x74, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x04,
+	0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x68, 0x6f, 0x73, 0x74, 0x6e, 0x61, 0x6d, 0x65, 0x42, 0x25,
+	0x5a, 0x23, 0x67, 0x6f, 0x2e, 0x63, 0x68, 0x72, 0x6f, 0x6d, 0x69, 0x75, 0x6d, 0x2e, 0x6f, 0x72,
+	0x67, 0x2f, 0x6c, 0x75, 0x63, 0x69, 0x2f, 0x63, 0x76, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x76, 0x31,
+	0x3b, 0x63, 0x76, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
 }
 
 var (
diff --git a/cv/api/v1/pubsub.proto b/cv/api/v1/pubsub.proto
index ecb700e..a30d789 100644
--- a/cv/api/v1/pubsub.proto
+++ b/cv/api/v1/pubsub.proto
@@ -36,4 +36,6 @@
   Run.Status status = 2;
   // eversion is the entity version, which is monotonically increasing.
   int64 eversion = 3;
+  // The hostname of the CV service that published the message.
+  string hostname = 4;
 }
diff --git a/cv/internal/run/impl/handler/common_test.go b/cv/internal/run/impl/handler/common_test.go
index fc52482..a78f9cb 100644
--- a/cv/internal/run/impl/handler/common_test.go
+++ b/cv/internal/run/impl/handler/common_test.go
@@ -356,7 +356,7 @@
 		TreeClient: ct.TreeFake.Client(),
 		GFactory:   ct.GFactory(),
 		BQExporter: bq.NewExporter(ct.TQDispatcher, ct.BQFake, ct.Env),
-		Publisher:  pubsub.NewPublisher(ct.TQDispatcher),
+		Publisher:  pubsub.NewPublisher(ct.TQDispatcher, ct.Env),
 	}
 	return impl, deps
 }
diff --git a/cv/internal/run/impl/manager.go b/cv/internal/run/impl/manager.go
index d81b174..370e58c 100644
--- a/cv/internal/run/impl/manager.go
+++ b/cv/internal/run/impl/manager.go
@@ -95,7 +95,7 @@
 			BQExporter: runbq.NewExporter(n.TasksBinding.TQDispatcher, bqc, env),
 			GFactory:   g,
 			TreeClient: tc,
-			Publisher:  pubsub.NewPublisher(n.TasksBinding.TQDispatcher),
+			Publisher:  pubsub.NewPublisher(n.TasksBinding.TQDispatcher, env),
 		},
 	}
 	n.TasksBinding.ManageRun.AttachHandler(
diff --git a/cv/internal/run/pubsub/publisher.go b/cv/internal/run/pubsub/publisher.go
index 5f0f917..3233bfe 100644
--- a/cv/internal/run/pubsub/publisher.go
+++ b/cv/internal/run/pubsub/publisher.go
@@ -41,7 +41,7 @@
 
 // NewPublisher creates a new Publisher and registers TaskClasses for run
 // events.
-func NewPublisher(tqd *tq.Dispatcher) *Publisher {
+func NewPublisher(tqd *tq.Dispatcher, env *common.Env) *Publisher {
 	p := &Publisher{tqd}
 	tqd.RegisterTaskClass(tq.TaskClass{
 		ID:        v1RunEndedTaskClass,
@@ -54,6 +54,7 @@
 				Id:       t.GetPublicId(),
 				Status:   versioning.RunStatusV1(t.GetStatus()),
 				Eversion: t.GetEversion(),
+				Hostname: env.LogicalHostname,
 			})
 			if err != nil {
 				return nil, err
diff --git a/cv/internal/run/pubsub/publisher_test.go b/cv/internal/run/pubsub/publisher_test.go
index eaabe8f..03cc133 100644
--- a/cv/internal/run/pubsub/publisher_test.go
+++ b/cv/internal/run/pubsub/publisher_test.go
@@ -40,7 +40,7 @@
 		ctx, cancel := ct.SetUp()
 		defer cancel()
 
-		publisher := NewPublisher(ct.TQDispatcher)
+		publisher := NewPublisher(ct.TQDispatcher, ct.Env)
 		epoch := ct.Clock.Now().UTC()
 		r := run.Run{
 			ID:            common.MakeRunID("lproject", epoch, 1, []byte("aaa")),
@@ -80,6 +80,7 @@
 					Id:       r.ID.PublicID(),
 					Status:   cvpb.Run_SUCCEEDED,
 					Eversion: int64(r.EVersion),
+					Hostname: ct.Env.LogicalHostname,
 				})
 			})
 		})