blob: dd7e76cd5e4b5179464c1e4924db7e61ca9562cf [file] [log] [blame]
// Copyright 2018 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package butler
import (
"github.com/golang/protobuf/proto"
"go.chromium.org/luci/logdog/api/logpb"
"go.chromium.org/luci/logdog/common/types"
)
// StreamRegistrationCallback is a callback to invoke when a new stream is
// registered with this butler.
//
// Expects passed *logpb.LogStreamDescriptor reference to be safe to keep, and
// should treat it as read-only.
//
// See streamConfig.callback.
type StreamRegistrationCallback func(*logpb.LogStreamDescriptor) StreamChunkCallback
// StreamChunkCallback is a callback to invoke on a complete LogEntry.
//
// Called once with nil when this stream has come to an end.
//
// See streamConfig.callback.
type StreamChunkCallback func(*logpb.LogEntry)
// AddStreamRegistrationCallback adds a new callback to this Butler.
//
// The callback is called on new streams and returns a chunk callback to attach
// to the stream or nil if you don't want to monitor the stream.
//
// If multiple callbacks are all interested in the same stream, the first one
// wins.
//
// Wrapping
//
// If `wrap` is true, the callback is wrapped internally to buffer LogEntries
// until they're complete.
//
// In wrapped callbacks for text and datagram streams, LogEntry .TimeOffset,
// and .PrefixIndex will be 0. .StreamIndex and .Sequence WILL NOT correspond
// to the values that the logdog service sees. They will, however, be
// internally consistent within the stream.
//
// Wrapped datagram streams never send a partial datagram; If the logdog
// server or stream is shut down while we have a partial datagram buffered,
// the partially buffered datagram will not be observed by the buffered
// callback.
//
// Wrapping a binary stream is a noop (i.e. your callback will see the exact
// same values wrapped and unwrapped).
//
// When the stream ends (either due to EOF from the user, or when the butler
// is stopped), your callback will be invoked exactly once with `nil`.
func (b *Butler) AddStreamRegistrationCallback(cb StreamRegistrationCallback, wrap bool) {
b.streamRegistrationCallbacksMu.Lock()
defer b.streamRegistrationCallbacksMu.Unlock()
b.streamRegistrationCallbacks = append(b.streamRegistrationCallbacks,
registeredCallback{cb, wrap})
}
func (b *Butler) maybeAddStreamCallback(d *logpb.LogStreamDescriptor) {
b.streamRegistrationCallbacksMu.RLock()
defer b.streamRegistrationCallbacksMu.RUnlock()
for _, cb := range b.streamRegistrationCallbacks {
if ret := cb.cb(proto.Clone(d).(*logpb.LogStreamDescriptor)); ret != nil {
if cb.wrap {
switch d.StreamType {
case logpb.StreamType_TEXT:
ret = getWrappedTextCallback(ret)
case logpb.StreamType_DATAGRAM:
ret = getWrappedDatagramCallback(ret)
}
}
b.streamCallbacksMu.Lock()
defer b.streamCallbacksMu.Unlock()
b.streamCallbacks[d.Name] = ret
return
}
}
}
func (b *Butler) runCallbacks(bundle *logpb.ButlerLogBundle) {
b.streamCallbacksMu.RLock()
cbsCopy := make(map[string]StreamChunkCallback, len(b.streamCallbacks))
for k, v := range b.streamCallbacks {
cbsCopy[k] = v
}
b.streamCallbacksMu.RUnlock()
for _, entry := range bundle.Entries {
name := entry.Desc.Name
cb := cbsCopy[name]
if cb != nil {
for _, log := range entry.Logs {
cb(log)
}
}
if entry.Terminal {
b.finalCallback(name)
}
}
}
// finalCallback is called from two places.
//
// Once, from runStreams, in the event that the stream read NO data from the
// user.
//
// The other, from runCallbacks, in the event that the bundler produced the
// terminal index for the stream.
//
// Ideally this would ONLY be used from runCallbacks... however bundler treats
// streams-with-data and streams-without-data asymmetrically.
//
// TODO(iannucci): Make ALL streams appear via bundler, even empty ones.
func (b *Butler) finalCallback(name string) {
b.streamCallbacksMu.Lock()
cb, ok := b.streamCallbacks[name]
if ok {
delete(b.streamCallbacks, name)
}
b.streamCallbacksMu.Unlock()
if cb != nil {
cb(nil)
}
b.streams.DrainedStream(types.StreamName(name))
}