blob: 1b8a87b9ae75b845304e6624650146762e6ed27f [file] [log] [blame]
// Copyright 2016 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package coordinator
import (
"time"
gcps "cloud.google.com/go/pubsub"
"github.com/golang/protobuf/proto"
log "github.com/luci/luci-go/common/logging"
"github.com/luci/luci-go/common/retry"
"github.com/luci/luci-go/logdog/api/endpoints/coordinator/services/v1"
"golang.org/x/net/context"
)
// ArchivalPublisher is capable of publishing archival requests.
type ArchivalPublisher interface {
// Close shutdowns this publisher instance, releasing all its resources.
Close() error
// Publish publishes the supplied ArchiveTask.
Publish(context.Context, *logdog.ArchiveTask) error
// NewPublishIndex returns a new publish index. Each publish index is unique
// within its request.
NewPublishIndex() uint64
}
type pubsubArchivalPublisher struct {
// client is Pub/Sub client used by the publisher.
//
// This client is owned by the prodServicesInst that created this instnace,
// and should not be closed on shutdown here.
client *gcps.Client
// topic is the authenticated Pub/Sub topic handle to publish to.
topic *gcps.Topic
// publishIndexFunc is a function that will return a unique publish index
// for this request.
publishIndexFunc func() uint64
}
func (p *pubsubArchivalPublisher) Close() error { return nil }
func (p *pubsubArchivalPublisher) Publish(c context.Context, t *logdog.ArchiveTask) error {
d, err := proto.Marshal(t)
if err != nil {
log.WithError(err).Errorf(c, "Failed to marshal task.")
return err
}
// TODO: Route this through some system (e.g., task queue, tumble) that can
// impose a dispatch delay for the settle period.
msg := gcps.Message{
Data: d,
}
return retry.Retry(c, retry.Default, func() error {
log.Fields{
"project": t.Project,
"hash": t.Id,
"key": t.Key,
}.Infof(c, "Publishing archival message for stream.")
_, err := p.topic.Publish(c, &msg).Get(c)
return err
}, func(err error, d time.Duration) {
log.Fields{
log.ErrorKey: err,
"delay": d,
}.Warningf(c, "Failed to publish task. Retrying...")
})
}
func (p *pubsubArchivalPublisher) NewPublishIndex() uint64 {
return p.publishIndexFunc()
}