// Copyright 2017 The LUCI Authors.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
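// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.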
// See the License for the specific language governing permissions and
// limitations under the License.
package registration
import (
log ""
ds ""
logdog ""
func getTopicAndExpiration(c context.Context, req *logdog.RegisterPrefixRequest) (pubsub.Topic, time.Duration, error) {
// Load our service and project configurations.
cfg, err := config.Config(c)
if err != nil {
log.WithError(err).Errorf(c, "Failed to load service configuration.")
return "", 0, grpcutil.Internal
pcfg, err := coordinator.ProjectConfig(c)
if err != nil {
log.WithError(err).Errorf(c, "Failed to load project configuration.")
return "", 0, grpcutil.Internal
// Determine our Pub/Sub topic.
cfgTransport := cfg.Transport
if cfgTransport == nil {
log.Errorf(c, "Missing transport configuration.")
return "", 0, grpcutil.Internal
cfgTransportPubSub := cfgTransport.GetPubsub()
if cfgTransportPubSub == nil {
log.Errorf(c, "Missing transport Pub/Sub configuration.")
return "", 0, grpcutil.Internal
pubsubTopic := pubsub.NewTopic(cfgTransportPubSub.Project, cfgTransportPubSub.Topic)
if err := pubsubTopic.Validate(); err != nil {
log.ErrorKey: err,
"topic": pubsubTopic,
}.Errorf(c, "Invalid transport Pub/Sub topic.")
return "", 0, grpcutil.Internal
expiration := endpoints.MinDuration(req.Expiration, cfg.Coordinator.PrefixExpiration, pcfg.PrefixExpiration)
return pubsubTopic, expiration, nil
func (s *server) RegisterPrefix(c context.Context, req *logdog.RegisterPrefixRequest) (*logdog.RegisterPrefixResponse, error) {
"project": req.Project,
"realm": req.Realm,
"prefix": req.Prefix,
"source": req.SourceInfo,
"expiration": google.DurationFromProto(req.Expiration),
}.Debugf(c, "Registering log prefix.")
// Note: if the project has `enforce_realms_in` setting ON and req.Realm is
// empty, CheckPermission below will barf with InvalidArgument error. And when
// not enforcing realm ACLs, it is fine to pass empty "realm" string, it will
// be replaced with @legacy.
var realm string
if req.Realm != "" {
if err := realms.ValidateRealmName(req.Realm, realms.ProjectScope); err != nil {
log.WithError(err).Warningf(c, "Invalid realm.")
return nil, grpcutil.Errf(codes.InvalidArgument, "%s", err)
realm = realms.Join(req.Project, req.Realm)
// Confirm that the Prefix is a valid stream name.
prefix := types.StreamName(req.Prefix)
if err := prefix.Validate(); err != nil {
log.WithError(err).Warningf(c, "Invalid prefix.")
return nil, grpcutil.Errf(codes.InvalidArgument, "invalid prefix")
// Check the caller is allowed to register prefixes in the requested realm.
if err := coordinator.CheckPermission(c, coordinator.PermLogsCreate, prefix, realm); err != nil {
return nil, err
pubsubTopic, expiration, err := getTopicAndExpiration(c, req)
if err != nil {
log.WithError(err).Errorf(c, "Cannot get pubsubTopic")
return nil, grpcutil.Errf(codes.Internal, "unable to get pubsubTopic")
// Determine our prefix expiration. This must be > 0, else there will be no
// window when log streams can be registered and this prefix is useless.
// We will choose the shortest expiration window defined by our request and
// our project and service configurations.
if expiration <= 0 {
log.Errorf(c, "Refusing to register prefix in expired state.")
return nil, grpcutil.Errf(codes.InvalidArgument, "no prefix expiration defined")
// prep our response with the pubsubTopic
resp := &logdog.RegisterPrefixResponse{
LogBundleTopic: string(pubsubTopic),
// Has the prefix already been registered?
pfx := &coordinator.LogPrefix{ID: coordinator.LogPrefixID(prefix)}
// Check for existing prefix registration (non-transactional).
switch err := ds.Get(c, pfx); err {
case ds.ErrNoSuchEntity:
// we'll register it shortly
case nil:
if pfx.IsRetry(c, req.OpNonce) {
log.Infof(c, "The prefix is registered, but we have valid retry (non-transactional).")
resp.Secret = pfx.Secret
return resp, nil
log.Errorf(c, "The prefix is already registered (non-transactional).")
return nil, grpcutil.AlreadyExists
log.WithError(err).Errorf(c, "Failed to check for existing prefix (non-transactional).")
return nil, grpcutil.Internal
// The prefix doesn't appear to be registered. Prepare to transactionally
// register it.
secret := make(types.PrefixSecret, types.PrefixSecretLength)
if _, err := cryptorand.Read(c, []byte(secret)); err != nil {
log.WithError(err).Errorf(c, "Failed to generate prefix secret.")
return nil, grpcutil.Internal
pfx.Created = clock.Now(c).UTC()
pfx.Prefix = string(prefix)
pfx.Realm = realm
pfx.Source = req.SourceInfo
pfx.Secret = []byte(secret)
pfx.Expiration = pfx.Created.Add(expiration)
pfx.OpNonce = req.OpNonce
if err := pfx.Validate(); err != nil {
log.WithError(err).Errorf(c, "Invalid LogPrefix.")
return nil, grpcutil.Errf(codes.InvalidArgument, "LogPrefix definition invalid")
resp.Secret = pfx.Secret
// Transactionally register the prefix.
err = ds.RunInTransaction(c, func(c context.Context) error {
// Get the prefix (if it exists)
switch err := ds.Get(c, pfx); err {
case ds.ErrNoSuchEntity:
// we need to put it
case nil:
if pfx.IsRetry(c, req.OpNonce) {
log.Infof(c, "The prefix is registered, but we have valid retry (transactional).")
return nil
log.Errorf(c, "The prefix is already registered (transactional).")
return grpcutil.AlreadyExists
log.WithError(err).Errorf(c, "Failed to check for existing prefix (transactional).")
return grpcutil.Internal
if err := ds.Put(c, pfx); err != nil {
log.WithError(err).Errorf(c, "Failed to register prefix.")
return grpcutil.Internal
log.Infof(c, "The prefix was successfully registered.")
return nil
}, nil)
if err != nil {
log.WithError(err).Errorf(c, "Failed to register prefix (transactional).")
return nil, err
return resp, nil