blob: d1dfe43012fbe05f9742444503a0749a671116d6 [file]
// Copyright 2016 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tsmon
import (
"fmt"
"net/http"
"net/url"
"strings"
"sync"
"time"
"golang.org/x/net/context"
"google.golang.org/appengine"
"github.com/luci/gae/service/info"
"github.com/luci/luci-go/common/clock"
"github.com/luci/luci-go/common/iotools"
"github.com/luci/luci-go/common/logging"
"github.com/luci/luci-go/common/tsmon"
"github.com/luci/luci-go/common/tsmon/metric"
"github.com/luci/luci-go/common/tsmon/monitor"
"github.com/luci/luci-go/common/tsmon/store"
"github.com/luci/luci-go/common/tsmon/target"
"github.com/luci/luci-go/server/auth"
"github.com/luci/luci-go/server/router"
)
// State holds the configuration of the tsmon library for GAE.
//
// Define it as a global variable and inject it in the request contexts using
// State.Middleware().
//
// It will initialize itself from the tsmon state in the passed context on
// a first use, mutating it along the way. Assumes caller is consistently using
// contexts configured with exact same tsmon state (in a vast majority of cases
// it would be global tsmon state that corresponds to context.Background, but
// unit tests may provide its own state).
//
// Will panic if it detects that caller has changed tsmon state in the context
// between the requests.
type State struct {
lock sync.RWMutex
state *tsmon.State
lastSettings tsmonSettings
flushingNow bool
lastFlushed time.Time
// testingMonitor is mocked monitor used in unit tests.
testingMonitor monitor.Monitor
// testingSettings if not nil are used in unit tests.
testingSettings *tsmonSettings
}
// responseWriter wraps a given http.ResponseWriter, records its
// status code and response size.
type responseWriter struct {
http.ResponseWriter
writer iotools.CountingWriter
status int
}
func (rw *responseWriter) Write(buf []byte) (int, error) { return rw.writer.Write(buf) }
func (rw *responseWriter) Size() int64 { return rw.writer.Count }
func (rw *responseWriter) Status() int { return rw.status }
func (rw *responseWriter) WriteHeader(code int) {
rw.status = code
rw.ResponseWriter.WriteHeader(code)
}
func newResponseWriter(rw http.ResponseWriter) *responseWriter {
return &responseWriter{
ResponseWriter: rw,
writer: iotools.CountingWriter{Writer: rw},
status: http.StatusOK,
}
}
// Middleware is a middleware that must be inserted into the middleware
// chain to enable tsmon metrics to be send on AppEngine.
func (s *State) Middleware(c *router.Context, next router.Handler) {
state, settings := s.checkSettings(c.Context)
if settings.Enabled {
started := clock.Now(c.Context)
req := c.Request
userAgent, ok := req.Header["User-Agent"]
if !ok || len(userAgent) == 0 {
userAgent = []string{"Unknown"}
}
ctx := c.Context
contentLength := c.Request.ContentLength
nrw := newResponseWriter(c.Writer)
c.Writer = nrw
defer func() {
dur := clock.Now(ctx).Sub(started)
metric.UpdateServerMetrics(ctx, "/", nrw.Status(), dur,
contentLength, nrw.Size(), userAgent[0])
}()
next(c)
s.flushIfNeeded(ctx, req, state, settings)
} else {
next(c)
}
}
// checkSettings fetches tsmon settings and initializes, reinitializes or
// deinitializes tsmon, as needed.
//
// Returns current tsmon state and settings. Panics if the context is using
// unexpected tsmon state.
func (s *State) checkSettings(c context.Context) (*tsmon.State, *tsmonSettings) {
state := tsmon.GetState(c)
var settings tsmonSettings
if s.testingSettings != nil {
settings = *s.testingSettings
} else {
settings = fetchCachedSettings(c)
}
// Read the values used when handling previous request. In most cases they
// are identical to current ones and we can skip grabbing a heavier write
// lock.
s.lock.RLock()
if s.state == state && s.lastSettings == settings {
s.lock.RUnlock()
return state, &settings
}
s.lock.RUnlock()
// 'settings' or 'state' has changed. Reinitialize tsmon as needed under
// the write lock.
s.lock.Lock()
defer s.lock.Unlock()
// First call to 'checkSettings' ever?
if s.state == nil {
s.state = state
s.state.M = monitor.NewNilMonitor() // doFlush uses its own monitor
s.state.InvokeGlobalCallbacksOnFlush = false
} else if state != s.state {
panic("tsmon state in the context was unexpectedly changed between requests")
}
switch {
case !bool(s.lastSettings.Enabled) && bool(settings.Enabled):
s.enableTsMon(c)
case bool(s.lastSettings.Enabled) && !bool(settings.Enabled):
s.disableTsMon(c)
}
s.lastSettings = settings
return state, &settings
}
// enableTsMon puts in-memory metrics store in the context's tsmon state.
//
// Called with 's.lock' locked.
func (s *State) enableTsMon(c context.Context) {
s.state.SetStore(store.NewInMemory(&target.Task{
DataCenter: targetDataCenter,
ServiceName: info.AppID(c),
JobName: info.ModuleName(c),
HostName: strings.SplitN(info.VersionID(c), ".", 2)[0],
TaskNum: -1,
}))
// Request the flush to be executed ASAP, so it registers (or updates)
// 'Instance' entity in the datastore.
s.lastFlushed = time.Time{}
}
// disableTsMon puts nil metrics store in the context's tsmon state.
//
// Called with 's.lock' locked.
func (s *State) disableTsMon(c context.Context) {
s.state.SetStore(store.NewNilStore())
}
// withGAEContext is replaced in unit tests.
var withGAEContext = appengine.WithContext
// flushIfNeeded periodically flushes the accumulated metrics.
//
// It skips the flush if some other goroutine is already flushing. Logs errors.
func (s *State) flushIfNeeded(c context.Context, req *http.Request, state *tsmon.State, settings *tsmonSettings) {
now := clock.Now(c)
flushTime := now.Add(-time.Duration(settings.FlushIntervalSec) * time.Second)
// Most of the time the flush is not needed and we can get away with
// lightweight RLock.
s.lock.RLock()
skip := s.flushingNow || s.lastFlushed.After(flushTime)
s.lock.RUnlock()
if skip {
return
}
// Need to flush. Update flushingNow. Redo the check under write lock.
s.lock.Lock()
skip = s.flushingNow || s.lastFlushed.After(flushTime)
if !skip {
s.flushingNow = true
}
s.lock.Unlock()
if skip {
return
}
// Report per-process statistic, like memory stats.
collectProcessMetrics(c, settings)
// Unset 'flushingNow' no matter what (even on panics). Update 'lastFlushed'
// only on successful flush. Unsuccessful flush thus will be retried ASAP.
success := false
defer func() {
s.lock.Lock()
s.flushingNow = false
if success {
s.lastFlushed = now
}
s.lock.Unlock()
}()
// The flush must be fast. Limit it with by some timeout. It also needs real
// GAE context for gRPC calls in PubSub guts, so slap it on top of luci-go
// context.
c, cancel := clock.WithTimeout(c, flushTimeout)
defer cancel()
c = withGAEContext(c, req) // TODO(vadimsh): not needed with ProdX
if err := s.updateInstanceEntityAndFlush(c, state, settings); err != nil {
logging.Errorf(c, "Failed to flush tsmon metrics: %s", err)
} else {
success = true
}
}
// updateInstanceEntityAndFlush waits for instance to get assigned a task number and
// flushes the metrics.
func (s *State) updateInstanceEntityAndFlush(c context.Context, state *tsmon.State, settings *tsmonSettings) error {
c = info.MustNamespace(c, instanceNamespace)
defTarget := state.S.DefaultTarget()
task, ok := defTarget.(*target.Task)
if !ok {
return fmt.Errorf("default tsmon target is not a Task (%T): %v", defTarget, defTarget)
}
now := clock.Now(c)
// Grab TaskNum assigned to the currently running instance. It is updated by
// a dedicated cron job, see housekeepingHandler in handler.go.
entity, err := getOrCreateInstanceEntity(c)
if err != nil {
return fmt.Errorf("failed to get instance entity - %s", err)
}
// Don't do the flush if we still haven't get a task number.
if entity.TaskNum < 0 {
if task.TaskNum >= 0 {
// We used to have a task number but we don't any more (we were inactive
// for too long), so clear our state.
logging.Warningf(c, "Instance %s got purged from Datastore, but is still alive. "+
"Clearing cumulative metrics", info.InstanceID(c))
state.ResetCumulativeMetrics(c)
}
task.TaskNum = -1
state.S.SetDefaultTarget(task)
// Complain if we haven't been given a task number yet. This is fine for
// recently started instances. Task numbers are populated by the
// housekeeping cron that MUST be configured for tsmon to work.
logging.Warningf(c, "Skipping the flush: instance %s has no task_num.", info.InstanceID(c))
if now.Sub(entity.LastUpdated) > instanceExpectedToHaveTaskNum {
logging.Errorf(c, "Is /internal/cron/ts_mon/housekeeping running? "+
"The instance %s is %s old and has no task_num yet. This is abnormal.",
info.InstanceID(c), now.Sub(entity.LastUpdated))
}
// Non-assigned TaskNum is expected situation, pretend the flush succeeded,
// so that next try happens after regular flush period, not right away.
return nil
}
task.TaskNum = int32(entity.TaskNum)
state.S.SetDefaultTarget(task)
// Refresh 'entity.LastUpdated'. Ignore errors here since the flush is
// happening already even this operation fails.
putDone := make(chan struct{})
go func() {
defer close(putDone)
if err := refreshLastUpdatedTime(c, now); err != nil {
logging.Errorf(c, "Failed to update instance entity: %s", err)
}
}()
ret := s.doFlush(c, state, settings)
<-putDone
return ret
}
// doFlush actually sends the metrics to the monitor.
func (s *State) doFlush(c context.Context, state *tsmon.State, settings *tsmonSettings) error {
var mon monitor.Monitor
var err error
if s.testingMonitor != nil {
mon = s.testingMonitor
} else if info.IsDevAppServer(c) || settings.ProdXAccount == "" {
mon = monitor.NewDebugMonitor("")
} else {
logging.Infof(c, "Sending metrics to ProdX using %s", settings.ProdXAccount)
transport, err := auth.GetRPCTransport(
c,
auth.AsActor,
auth.WithServiceAccount(settings.ProdXAccount),
auth.WithScopes(monitor.ProdxmonScopes...))
if err != nil {
return err
}
endpoint, err := url.Parse(prodXEndpoint)
if err != nil {
return err
}
mon, err = monitor.NewHTTPMonitor(c, &http.Client{Transport: transport}, endpoint)
if err != nil {
return err
}
}
defer mon.Close()
if err = state.Flush(c, mon); err != nil {
return err
}
return nil
}