[cv] add hostname in PubSubRun message
It can be used to distinguish messages between cv hosts,
when an application receives messages from multiple cv hosts,
such as staging and prod.
Bug: 1226144
Change-Id: I216669ed680d875855a2628c610a5a7b0edbb66c
Reviewed-on: https://chromium-review.googlesource.com/c/infra/luci/luci-go/+/3235713
Reviewed-by: Yiwei Zhang <yiwzhang@google.com>
Reviewed-by: Andrii Shyshkalov <tandrii@google.com>
Commit-Queue: Scott Lee <ddoman@chromium.org>
diff --git a/cv/api/v1/pubsub.pb.go b/cv/api/v1/pubsub.pb.go
index b6615e8..2123a03 100644
--- a/cv/api/v1/pubsub.pb.go
+++ b/cv/api/v1/pubsub.pb.go
@@ -54,6 +54,8 @@
Status Run_Status `protobuf:"varint,2,opt,name=status,proto3,enum=cv.v1.Run_Status" json:"status,omitempty"`
// eversion is the entity version, which is monotonically increasing.
Eversion int64 `protobuf:"varint,3,opt,name=eversion,proto3" json:"eversion,omitempty"`
+ // The hostname of the CV service that published the message.
+ Hostname string `protobuf:"bytes,4,opt,name=hostname,proto3" json:"hostname,omitempty"`
}
func (x *PubSubRun) Reset() {
@@ -109,6 +111,13 @@
return 0
}
+func (x *PubSubRun) GetHostname() string {
+ if x != nil {
+ return x.Hostname
+ }
+ return ""
+}
+
var File_go_chromium_org_luci_cv_api_v1_pubsub_proto protoreflect.FileDescriptor
var file_go_chromium_org_luci_cv_api_v1_pubsub_proto_rawDesc = []byte{
@@ -117,17 +126,18 @@
0x2f, 0x70, 0x75, 0x62, 0x73, 0x75, 0x62, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x05, 0x63,
0x76, 0x2e, 0x76, 0x31, 0x1a, 0x28, 0x67, 0x6f, 0x2e, 0x63, 0x68, 0x72, 0x6f, 0x6d, 0x69, 0x75,
0x6d, 0x2e, 0x6f, 0x72, 0x67, 0x2f, 0x6c, 0x75, 0x63, 0x69, 0x2f, 0x63, 0x76, 0x2f, 0x61, 0x70,
- 0x69, 0x2f, 0x76, 0x31, 0x2f, 0x72, 0x75, 0x6e, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x62,
+ 0x69, 0x2f, 0x76, 0x31, 0x2f, 0x72, 0x75, 0x6e, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x7e,
0x0a, 0x09, 0x50, 0x75, 0x62, 0x53, 0x75, 0x62, 0x52, 0x75, 0x6e, 0x12, 0x0e, 0x0a, 0x02, 0x69,
0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x29, 0x0a, 0x06, 0x73,
0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x11, 0x2e, 0x63, 0x76,
0x2e, 0x76, 0x31, 0x2e, 0x52, 0x75, 0x6e, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x06,
0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x1a, 0x0a, 0x08, 0x65, 0x76, 0x65, 0x72, 0x73, 0x69,
0x6f, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x08, 0x65, 0x76, 0x65, 0x72, 0x73, 0x69,
- 0x6f, 0x6e, 0x42, 0x25, 0x5a, 0x23, 0x67, 0x6f, 0x2e, 0x63, 0x68, 0x72, 0x6f, 0x6d, 0x69, 0x75,
- 0x6d, 0x2e, 0x6f, 0x72, 0x67, 0x2f, 0x6c, 0x75, 0x63, 0x69, 0x2f, 0x63, 0x76, 0x2f, 0x61, 0x70,
- 0x69, 0x2f, 0x76, 0x31, 0x3b, 0x63, 0x76, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f,
- 0x33,
+ 0x6f, 0x6e, 0x12, 0x1a, 0x0a, 0x08, 0x68, 0x6f, 0x73, 0x74, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x04,
+ 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x68, 0x6f, 0x73, 0x74, 0x6e, 0x61, 0x6d, 0x65, 0x42, 0x25,
+ 0x5a, 0x23, 0x67, 0x6f, 0x2e, 0x63, 0x68, 0x72, 0x6f, 0x6d, 0x69, 0x75, 0x6d, 0x2e, 0x6f, 0x72,
+ 0x67, 0x2f, 0x6c, 0x75, 0x63, 0x69, 0x2f, 0x63, 0x76, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x76, 0x31,
+ 0x3b, 0x63, 0x76, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
diff --git a/cv/api/v1/pubsub.proto b/cv/api/v1/pubsub.proto
index ecb700e..a30d789 100644
--- a/cv/api/v1/pubsub.proto
+++ b/cv/api/v1/pubsub.proto
@@ -36,4 +36,6 @@
Run.Status status = 2;
// eversion is the entity version, which is monotonically increasing.
int64 eversion = 3;
+ // The hostname of the CV service that published the message.
+ string hostname = 4;
}
diff --git a/cv/internal/run/impl/handler/common_test.go b/cv/internal/run/impl/handler/common_test.go
index fc52482..a78f9cb 100644
--- a/cv/internal/run/impl/handler/common_test.go
+++ b/cv/internal/run/impl/handler/common_test.go
@@ -356,7 +356,7 @@
TreeClient: ct.TreeFake.Client(),
GFactory: ct.GFactory(),
BQExporter: bq.NewExporter(ct.TQDispatcher, ct.BQFake, ct.Env),
- Publisher: pubsub.NewPublisher(ct.TQDispatcher),
+ Publisher: pubsub.NewPublisher(ct.TQDispatcher, ct.Env),
}
return impl, deps
}
diff --git a/cv/internal/run/impl/manager.go b/cv/internal/run/impl/manager.go
index d81b174..370e58c 100644
--- a/cv/internal/run/impl/manager.go
+++ b/cv/internal/run/impl/manager.go
@@ -95,7 +95,7 @@
BQExporter: runbq.NewExporter(n.TasksBinding.TQDispatcher, bqc, env),
GFactory: g,
TreeClient: tc,
- Publisher: pubsub.NewPublisher(n.TasksBinding.TQDispatcher),
+ Publisher: pubsub.NewPublisher(n.TasksBinding.TQDispatcher, env),
},
}
n.TasksBinding.ManageRun.AttachHandler(
diff --git a/cv/internal/run/pubsub/publisher.go b/cv/internal/run/pubsub/publisher.go
index 5f0f917..3233bfe 100644
--- a/cv/internal/run/pubsub/publisher.go
+++ b/cv/internal/run/pubsub/publisher.go
@@ -41,7 +41,7 @@
// NewPublisher creates a new Publisher and registers TaskClasses for run
// events.
-func NewPublisher(tqd *tq.Dispatcher) *Publisher {
+func NewPublisher(tqd *tq.Dispatcher, env *common.Env) *Publisher {
p := &Publisher{tqd}
tqd.RegisterTaskClass(tq.TaskClass{
ID: v1RunEndedTaskClass,
@@ -54,6 +54,7 @@
Id: t.GetPublicId(),
Status: versioning.RunStatusV1(t.GetStatus()),
Eversion: t.GetEversion(),
+ Hostname: env.LogicalHostname,
})
if err != nil {
return nil, err
diff --git a/cv/internal/run/pubsub/publisher_test.go b/cv/internal/run/pubsub/publisher_test.go
index eaabe8f..03cc133 100644
--- a/cv/internal/run/pubsub/publisher_test.go
+++ b/cv/internal/run/pubsub/publisher_test.go
@@ -40,7 +40,7 @@
ctx, cancel := ct.SetUp()
defer cancel()
- publisher := NewPublisher(ct.TQDispatcher)
+ publisher := NewPublisher(ct.TQDispatcher, ct.Env)
epoch := ct.Clock.Now().UTC()
r := run.Run{
ID: common.MakeRunID("lproject", epoch, 1, []byte("aaa")),
@@ -80,6 +80,7 @@
Id: r.ID.PublicID(),
Status: cvpb.Run_SUCCEEDED,
Eversion: int64(r.EVersion),
+ Hostname: ct.Env.LogicalHostname,
})
})
})