blob: f1846b9f9bfa2bc790f7fedcfc8873b0fba8b0a4 [file] [log] [blame]
// Copyright 2019 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package state
import (
"context"
"go.chromium.org/luci/common/tsmon/field"
"go.chromium.org/luci/common/tsmon/metric"
"go.chromium.org/luci/common/tsmon/types"
"infra/appengine/qscheduler-swarming/app/eventlog"
"infra/qscheduler/qslib/protos/metrics"
"infra/qscheduler/qslib/scheduler"
)
var (
counterCompleted = metric.NewCounter(
"qscheduler/state/event/task_completed",
"Task completed by swarming.",
nil,
field.String("scheduler_id"),
field.String("account_id"),
)
counterEnqueued = metric.NewCounter(
"qscheduler/state/event/task_enqueued",
"Task enqueued by swarming.",
nil,
field.String("scheduler_id"),
field.String("account_id"),
)
counterAssigned = metric.NewCounter(
"qscheduler/state/event/task_assigned",
"Task assigned by qscheduler.",
nil,
field.String("scheduler_id"),
field.String("account_id"),
field.Bool("preempting"),
field.Bool("provision_required"),
)
counterPreempted = metric.NewCounter(
"qscheduler/state/event/task_preempted",
"Task preempted by qscheduler.",
nil,
field.String("scheduler_id"),
field.String("account_id"),
)
counterReprioritized = metric.NewCounter(
"qscheduler/state/event/task_reprioritized",
"Task reprioritized by qscheduler.",
nil,
field.String("scheduler_id"),
field.String("account_id"),
)
gaugeQueueSize = metric.NewInt(
"qscheduler/state/queue_size",
"The number of tasks in the queue.",
nil,
field.String("scheduler_id"),
)
gaugeProtoSize = metric.NewInt(
"qscheduler/store/proto_size",
"Size of a loaded store proto.",
&types.MetricMetadata{
Units: types.Bytes,
},
field.String("scheduler_id"),
field.String("type"),
)
)
// metricsBuffer implements scheduler.EventSink.
//
// Metrics are buffered so that they can be sent to bigquery and tsmon upon
// successful datastore transaction.
type metricsBuffer struct {
schedulerID string
taskEvents []*metrics.TaskEvent
queueSize *int
}
// newMetricsBuffer creates a metrics sink for the given scheduler.
func newMetricsBuffer(schedulerID string) *metricsBuffer {
return &metricsBuffer{schedulerID: schedulerID}
}
// reset resets the given metrics sink, erasing any previously added entries.
func (e *metricsBuffer) reset() {
e.taskEvents = nil
e.queueSize = nil
}
// flushToBQ flushes events to bigquery.
//
// This can be called inside of a datastore transaction, in which case events
// will only be flushed if the transaction succeeds.
func (e *metricsBuffer) flushToBQ(ctx context.Context) error {
return eventlog.TaskEvents(ctx, e.taskEvents...)
}
// flushToTsMon flushes events to ts_mon.
func (e *metricsBuffer) flushToTsMon(ctx context.Context) error {
for _, event := range e.taskEvents {
switch event.EventType {
case metrics.TaskEvent_SWARMING_COMPLETED:
counterCompleted.Add(ctx, 1, event.SchedulerId, event.AccountId)
case metrics.TaskEvent_SWARMING_ENQUEUED:
counterEnqueued.Add(ctx, 1, event.SchedulerId, event.AccountId)
case metrics.TaskEvent_QSCHEDULER_ASSIGNED:
details := event.GetAssignedDetails()
counterAssigned.Add(ctx, 1, event.SchedulerId, event.AccountId, details.Preempting, details.ProvisionRequired)
case metrics.TaskEvent_QSCHEDULER_PREEMPTED:
counterPreempted.Add(ctx, 1, event.SchedulerId, event.AccountId)
case metrics.TaskEvent_QSCHEDULER_REPRIORITIZED:
counterReprioritized.Add(ctx, 1, event.SchedulerId, event.AccountId)
}
}
if e.queueSize != nil {
gaugeQueueSize.Set(ctx, int64(*e.queueSize), e.schedulerID)
}
return nil
}
// AddEvent implements scheduler.EventSink.
func (e *metricsBuffer) AddEvent(event *metrics.TaskEvent) {
event.SchedulerId = e.schedulerID
e.taskEvents = append(e.taskEvents, event)
}
// recordStateMetrics records general metrics about the given state.
func (e *metricsBuffer) recordStateMetrics(s *scheduler.Scheduler) {
queueSize := len(s.GetWaitingRequests())
e.queueSize = &queueSize
}
// recordProtoSize records a metric about a given proto's size.
func recordProtoSize(ctx context.Context, bytes int, schedulerID string, protoType string) {
gaugeProtoSize.Set(ctx, int64(bytes), schedulerID, protoType)
}