// blob: 6a4b27604fbbdddf7fd5bd706f69fd6e233fed9b [file] [log] [blame]
// Copyright 2016 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tsmon
import (
"context"
"errors"
"sync"
"sync/atomic"
"go.chromium.org/luci/common/tsmon/monitor"
"go.chromium.org/luci/common/tsmon/registry"
"go.chromium.org/luci/common/tsmon/store"
"go.chromium.org/luci/common/tsmon/types"
)
// State holds the configuration of the tsmon library. There is one global
// instance of State, but it can be overridden in a Context by tests.
type State struct {
	// mu is taken by the State accessors and mutators that read or write
	// store, monitor, callbacks and globalCallbacks.
	mu              sync.RWMutex
	store           store.Store
	monitor         monitor.Monitor
	callbacks       []Callback
	globalCallbacks []GlobalCallback

	// flusher is not referenced by the State methods defined next to this
	// type. NOTE(review): confirm where it is managed and how access to it is
	// synchronized.
	flusher *autoFlusher

	// invokeGlobalCallbacksOnFlush is read and written through sync/atomic
	// once the State has been constructed. A non-zero value makes Flush run
	// the registered global callbacks.
	invokeGlobalCallbacksOnFlush int32
}
// NewState builds a fresh State whose store comes from store.NewNilStore and
// whose monitor comes from monitor.NewNilMonitor.
//
// The returned State starts with global callbacks enabled on flush, i.e. Flush
// runs registered global callbacks until InhibitGlobalCallbacksOnFlush is
// called.
func NewState() *State {
	state := &State{invokeGlobalCallbacksOnFlush: 1}
	state.store = store.NewNilStore()
	state.monitor = monitor.NewNilMonitor()
	return state
}
// globalState is the single package-level State instance, created with the
// defaults applied by NewState.
// NOTE(review): presumably this is what stateFromContext falls back to when
// the context carries no State — confirm against its definition.
var globalState = NewState()
// GetState looks up the State to use for ctx: the instance held in the
// context if one is set, otherwise the global State. The lookup itself is
// delegated to stateFromContext.
func GetState(ctx context.Context) *State {
	state := stateFromContext(ctx)
	return state
}
// Callbacks returns a snapshot of all registered Callbacks.
//
// The result is a freshly allocated slice copied under the read lock, so the
// caller may modify it without affecting the State.
func (s *State) Callbacks() []Callback {
	s.mu.RLock()
	snapshot := append([]Callback{}, s.callbacks...)
	s.mu.RUnlock()
	return snapshot
}
// GlobalCallbacks returns a snapshot of all registered GlobalCallbacks.
//
// The result is a freshly allocated slice copied under the read lock, so the
// caller may modify it without affecting the State.
func (s *State) GlobalCallbacks() []GlobalCallback {
	s.mu.RLock()
	snapshot := append([]GlobalCallback{}, s.globalCallbacks...)
	s.mu.RUnlock()
	return snapshot
}
// InhibitGlobalCallbacksOnFlush tells the State not to execute the registered
// global callbacks when flushing registered metrics.
//
// The setting is stored atomically and is consulted by Flush.
func (s *State) InhibitGlobalCallbacksOnFlush() {
	flag := &s.invokeGlobalCallbacksOnFlush
	atomic.StoreInt32(flag, 0)
}
// InvokeGlobalCallbacksOnFlush tells the State to execute the registered
// global callbacks when flushing registered metrics.
//
// The setting is stored atomically and is consulted by Flush.
func (s *State) InvokeGlobalCallbacksOnFlush() {
	flag := &s.invokeGlobalCallbacksOnFlush
	atomic.StoreInt32(flag, 1)
}
// Monitor returns the monitor currently configured on the State, read under
// the State's read lock.
func (s *State) Monitor() monitor.Monitor {
	s.mu.RLock()
	current := s.monitor
	s.mu.RUnlock()
	return current
}
// RegisterCallbacks appends the given Callback(s) to the State's list of
// callbacks, in the order given, while holding the write lock.
func (s *State) RegisterCallbacks(f ...Callback) {
	s.mu.Lock()
	s.callbacks = append(s.callbacks, f...)
	s.mu.Unlock()
}
// RegisterGlobalCallbacks appends the given GlobalCallback(s) to the State's
// list of global callbacks, in the order given, while holding the write lock.
func (s *State) RegisterGlobalCallbacks(f ...GlobalCallback) {
	s.mu.Lock()
	s.globalCallbacks = append(s.globalCallbacks, f...)
	s.mu.Unlock()
}
// Store returns the metric store currently configured on the State, read
// under the State's read lock.
func (s *State) Store() store.Store {
	s.mu.RLock()
	current := s.store
	s.mu.RUnlock()
	return current
}
// SetMonitor sets the State's monitor to m while holding the write lock.
func (s *State) SetMonitor(m monitor.Monitor) {
	s.mu.Lock()
	s.monitor = m
	s.mu.Unlock()
}
// SetStore changes the metric store used by the State to st.
//
// Only the store reference is swapped (under the write lock); this method
// itself does not copy any values from the previous store into st.
func (s *State) SetStore(st store.Store) {
	s.mu.Lock()
	s.store = st
	s.mu.Unlock()
}
// ResetCumulativeMetrics resets only cumulative metrics.
//
// It walks every metric via registry.Iter and calls Reset on the State's
// current store for each metric whose value type reports IsCumulative.
func (s *State) ResetCumulativeMetrics(ctx context.Context) {
	target := s.Store()
	resetIfCumulative := func(m types.Metric) {
		if !m.Info().ValueType.IsCumulative() {
			return
		}
		target.Reset(ctx, m)
	}
	registry.Iter(resetIfCumulative)
}
// RunGlobalCallbacks invokes, in registration order, every registered global
// callback that produces global metrics.
//
// The callbacks are taken from a snapshot, so they run without the State's
// lock being held.
//
// See RegisterGlobalCallback for more info.
func (s *State) RunGlobalCallbacks(ctx context.Context) {
	registered := s.GlobalCallbacks()
	for i := range registered {
		registered[i].Callback(ctx)
	}
}
// Flush sends all the metrics that are registered in the application.
//
// Uses given monitor if not nil, otherwise the State's current monitor. If
// neither is available an error is returned and nothing else happens.
//
// Before collecting cells from the store, Flush runs the registered callbacks
// and, unless inhibited via InhibitGlobalCallbacksOnFlush, the registered
// global callbacks. If the store yields no cells, Flush returns nil right
// away.
//
// Cells are sent in chunks of mon.ChunkSize() cells; a non-positive chunk size
// means everything is sent in a single call. Every chunk is attempted even if
// an earlier one failed, and the error from the last failed Send (if any) is
// returned. After sending, metrics produced by global callbacks are reset.
func (s *State) Flush(ctx context.Context, mon monitor.Monitor) error {
	if mon == nil {
		mon = s.Monitor()
	}
	if mon == nil {
		return errors.New("no tsmon Monitor is configured")
	}

	// Run any callbacks that have been registered to populate values in callback
	// metrics.
	s.runCallbacks(ctx)
	if atomic.LoadInt32(&s.invokeGlobalCallbacksOnFlush) != 0 {
		s.RunGlobalCallbacks(ctx)
	}

	cells := s.Store().GetAll(ctx)
	if len(cells) == 0 {
		return nil
	}

	// Split up the payload into chunks if there are too many cells. A
	// non-positive chunk size is treated as "no chunking": a negative value
	// would otherwise produce an invalid slice range below.
	chunkSize := mon.ChunkSize()
	if chunkSize <= 0 {
		chunkSize = len(cells)
	}

	// Keep going after a failed chunk so one bad Send does not drop the rest
	// of the payload; only the most recent error is reported.
	var lastErr error
	for start := 0; start < len(cells); start += chunkSize {
		end := start + chunkSize
		if end > len(cells) {
			end = len(cells)
		}
		if err := mon.Send(ctx, cells[start:end]); err != nil {
			lastErr = err
		}
	}

	s.resetGlobalCallbackMetrics(ctx)
	return lastErr
}
// resetGlobalCallbackMetrics resets, on the State's current store, every
// metric listed by each registered global callback.
//
// See RegisterGlobalCallback for more info.
func (s *State) resetGlobalCallbackMetrics(ctx context.Context) {
	target := s.Store()
	registered := s.GlobalCallbacks()
	for i := range registered {
		for _, metric := range registered[i].metrics {
			target.Reset(ctx, metric)
		}
	}
}
// runCallbacks invokes, in registration order, every registered callback so
// that callback metrics get their values populated.
//
// The callbacks are taken from a snapshot, so they run without the State's
// lock being held.
func (s *State) runCallbacks(ctx context.Context) {
	registered := s.Callbacks()
	for i := 0; i < len(registered); i++ {
		registered[i](ctx)
	}
}