blob: c42c0ac134a6fcd558de0e6bd7f42bf11ca4f350 [file] [log] [blame]
// Copyright 2018 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package backend
import (
"context"
"fmt"
"net/http"
"regexp"
"strings"
"time"
"github.com/golang/protobuf/proto"
"github.com/google/uuid"
compute "google.golang.org/api/compute/v1"
"google.golang.org/api/googleapi"
"go.chromium.org/luci/appengine/tq"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/logging"
"go.chromium.org/luci/gae/service/datastore"
"go.chromium.org/luci/gce/api/tasks/v1"
"go.chromium.org/luci/gce/appengine/backend/internal/metrics"
"go.chromium.org/luci/gce/appengine/model"
)
var (
// hostnameSuffixMatch matches the trailing "-<digits>-<4 lowercase letters or
// digits>" portion at the end of a hostname. It is used to strip that suffix
// and recover the prefix the hostname was generated from.
// Check createVM in queues.go for the naming logic
hostnameSuffixMatch = regexp.MustCompile(`-[0-9]+-[a-z0-9]{4}$`)
)
// setCreated records the creation details of the given GCE instance on the VM
// entity with the given ID, unless the entity is already marked as created.
//
// The creation timestamp, network interfaces and self link are copied from
// inst. When the entity is updated, the creation time metric is reported.
// Returns an error if the creation timestamp cannot be parsed or the datastore
// transaction fails.
func setCreated(c context.Context, id string, inst *compute.Instance) error {
	created, err := time.Parse(time.RFC3339, inst.CreationTimestamp)
	if err != nil {
		return errors.Annotate(err, "failed to parse instance creation time").Err()
	}

	ifaces := make([]model.NetworkInterface, len(inst.NetworkInterfaces))
	for idx := range inst.NetworkInterfaces {
		nic := inst.NetworkInterfaces[idx]
		ifaces[idx].InternalIP = nic.NetworkIP
		switch count := len(nic.AccessConfigs); {
		case count == 0:
			// No external address to record for this interface.
			continue
		case count > 1:
			logging.Warningf(c, "network interface %q has more than one access config", nic.Name)
		}
		// GCE currently supports at most one access config per network interface.
		ifaces[idx].ExternalIP = nic.AccessConfigs[0].NatIP
	}

	vm := &model.VM{ID: id}
	var stored bool
	txnErr := datastore.RunInTransaction(c, func(c context.Context) error {
		// Reset on entry so the flag reflects only the latest invocation of
		// this function.
		stored = false
		if err := datastore.Get(c, vm); err != nil {
			return errors.Annotate(err, "failed to fetch VM").Err()
		}
		if vm.Created > 0 {
			// Already marked as created; nothing to do.
			return nil
		}
		vm.Created = created.Unix()
		vm.NetworkInterfaces = ifaces
		vm.URL = inst.SelfLink
		if err := datastore.Put(c, vm); err != nil {
			return errors.Annotate(err, "failed to store VM").Err()
		}
		stored = true
		return nil
	}, nil)
	if txnErr != nil {
		return txnErr
	}
	if stored {
		metrics.ReportCreationTime(c, float64(vm.Created-vm.Configured), vm.Prefix, vm.Attributes.GetProject(), vm.Attributes.GetZone())
	}
	return nil
}
// logErrors writes a single error-level log entry for the given
// *googleapi.Error, containing name, the HTTP status code, and the
// comma-joined messages of all the individual errors it carries.
func logErrors(c context.Context, name string, err *googleapi.Error) {
	msgs := make([]string, 0, len(err.Errors))
	for _, item := range err.Errors {
		msgs = append(msgs, item.Message)
	}
	joined := strings.Join(msgs, ",")
	logging.Errorf(c, "failure for %s, HTTP: %d, errors: %s", name, err.Code, joined)
}
// rateLimitExceeded reports whether any of the errors carried by the given
// *googleapi.Error looks like a rate limit error, judged by known substrings
// in the error message or reason.
func rateLimitExceeded(err *googleapi.Error) bool {
	// Substrings of an error message that indicate rate limiting.
	messageMarkers := [...]string{
		"Queries per user per 100 seconds",
		"Rate Limit Exceeded",
		"rateLimitExceeded",
	}
	for _, item := range err.Errors {
		if strings.Contains(item.Reason, "rateLimitExceeded") {
			return true
		}
		for _, marker := range messageMarkers {
			if strings.Contains(item.Message, marker) {
				return true
			}
		}
	}
	return false
}
// checkInstance looks up the GCE instance backing the given VM.
//
// If the instance exists, its creation details are stored via setCreated. If
// GCE reports that the instance is not found, a failure is recorded, the VM is
// deleted, and an error is returned. Any other fetch failure is returned as an
// error without deleting the VM.
func checkInstance(c context.Context, vm *model.VM) error {
	project, zone := vm.Attributes.GetProject(), vm.Attributes.GetZone()
	inst, err := getCompute(c).Stable.Instances.Get(project, zone, vm.Hostname).Context(c).Do()
	if err == nil {
		logging.Debugf(c, "created instance: %s", inst.SelfLink)
		metrics.CreatedInstanceChecked.Add(c, 1, vm.Config, project, zone, vm.Hostname)
		return setCreated(c, vm.ID, inst)
	}

	gerr, isAPIErr := err.(*googleapi.Error)
	switch {
	case !isAPIErr:
		// Not an API error; nothing extra to log.
	case gerr.Code == http.StatusNotFound:
		metrics.UpdateFailures(c, gerr.Code, vm)
		if delErr := deleteVM(c, vm.ID, vm.Hostname); delErr != nil {
			return errors.Annotate(delErr, "instance not found").Err()
		}
		return errors.Annotate(err, "instance not found").Err()
	default:
		logErrors(c, vm.Hostname, gerr)
	}
	return errors.Annotate(err, "failed to fetch instance").Err()
}
// createInstanceQueue is the name of the create instance task handler queue.
// NOTE(review): presumably the queue whose tasks are handled by createInstance
// -- confirm against the handler registration.
const createInstanceQueue = "create-instance"
// createInstance creates a GCE instance for the VM named by the task.
//
// payload must be a *tasks.CreateInstance with a non-empty ID. If the VM
// already has a URL, the instance is considered to exist and nothing is done.
//
// Failure handling:
//   - Rate limit errors, HTTP 429 and HTTP 5xx are treated as transient: an
//     error is returned and the VM is kept.
//   - Other API errors, and errors reported inside the returned operation,
//     cause the VM to be deleted before an error is returned.
//
// If the operation is already DONE the instance is checked immediately via
// checkInstance; otherwise creation is pending and nil is returned.
func createInstance(ctx context.Context, payload proto.Message) error {
	task, ok := payload.(*tasks.CreateInstance)
	switch {
	case !ok:
		// Report the dynamic type, matching the other task handlers in this file.
		return errors.Reason("unexpected payload type %T", payload).Err()
	case task.GetId() == "":
		return errors.Reason("ID is required").Err()
	}
	vm := &model.VM{
		ID: task.Id,
	}
	switch err := datastore.Get(ctx, vm); {
	case err != nil:
		return errors.Annotate(err, "failed to fetch VM").Err()
	case vm.URL != "":
		logging.Debugf(ctx, "instance exists: %s", vm.URL)
		return nil
	}
	logging.Debugf(ctx, "creating instance %q", vm.Hostname)
	instance := vm.GetInstance()
	// Generate a request ID based on the hostname.
	// Ensures duplicate operations aren't created in GCE.
	// Request IDs are valid for 24 hours.
	rID := uuid.NewSHA1(uuid.Nil, []byte(fmt.Sprintf("create-%s", vm.Hostname)))
	srv := getCompute(ctx)
	op, err := srv.InsertInstance(ctx, vm.Attributes.GetProject(), vm.Attributes.GetZone(), instance, rID.String())
	if err != nil {
		if gerr, ok := err.(*googleapi.Error); ok {
			logErrors(ctx, vm.Hostname, gerr)
			metrics.UpdateFailures(ctx, gerr.Code, vm)
			// TODO(b/130826296): Remove this once rate limit returns a transient HTTP error code.
			if rateLimitExceeded(gerr) {
				return errors.Annotate(err, "rate limit exceeded creating instance %s", vm.Hostname).Err()
			}
			if gerr.Code == http.StatusTooManyRequests || gerr.Code >= 500 {
				return errors.Annotate(err, "transiently failed to create instance %s", vm.Hostname).Err()
			}
			// Non-transient API failure: drop the VM record.
			if err := deleteVM(ctx, task.Id, vm.Hostname); err != nil {
				return errors.Annotate(err, "failed to create instance %s", vm.Hostname).Err()
			}
		}
		return errors.Annotate(err, "failed to create instance %s", vm.Hostname).Err()
	}
	// The request itself succeeded, but the operation may still carry errors.
	opErrs := op.GetErrors()
	if len(opErrs) > 0 {
		for _, opErr := range opErrs {
			logging.Errorf(ctx, "for vm %s, %s: %s", vm.Hostname, opErr.Code, opErr.Message)
		}
		metrics.UpdateFailures(ctx, http.StatusOK, vm)
		if err := deleteVM(ctx, task.Id, vm.Hostname); err != nil {
			return errors.Annotate(err, "failed to create instance %s", vm.Hostname).Err()
		}
		return errors.Reason("failed to create instance %s", vm.Hostname).Err()
	}
	if op.GetStatus() == "DONE" {
		return checkInstance(ctx, vm)
	}
	// Instance creation is pending.
	return nil
}
// destroyInstanceAsync schedules a task queue task to destroy a GCE instance.
func destroyInstanceAsync(c context.Context, id, url string) error {
t := &tq.Task{
Payload: &tasks.DestroyInstance{
Id: id,
Url: url,
},
}
if err := getDispatcher(c).AddTask(c, t); err != nil {
return errors.Annotate(err, "failed to schedule destroy task").Err()
}
return nil
}
// destroyInstanceQueue is the name of the destroy instance task handler queue.
// NOTE(review): presumably the queue whose tasks are handled by destroyInstance
// -- confirm against the handler registration.
const destroyInstanceQueue = "destroy-instance"
// destroyInstance destroys the GCE instance for the VM named by the task.
//
// payload must be a *tasks.DestroyInstance with a non-empty ID and URL. If
// the VM no longer exists, or its URL differs from the one in the task,
// nothing is done. When GCE reports the instance as not found, or the delete
// operation is already DONE, bot deletion is scheduled via deleteBotAsync. If
// the operation is still pending, nil is returned.
func destroyInstance(c context.Context, payload proto.Message) error {
	task, ok := payload.(*tasks.DestroyInstance)
	if !ok {
		return errors.Reason("unexpected payload type %T", payload).Err()
	}
	if task.GetId() == "" {
		return errors.Reason("ID is required").Err()
	}
	if task.GetUrl() == "" {
		return errors.Reason("URL is required").Err()
	}

	vm := &model.VM{ID: task.Id}
	if err := datastore.Get(c, vm); err != nil {
		if err == datastore.ErrNoSuchEntity {
			return nil
		}
		return errors.Annotate(err, "failed to fetch VM %s", vm.ID).Err()
	}
	if vm.URL != task.Url {
		// Instance is already destroyed and replaced. Don't destroy the new one.
		logging.Debugf(c, "instance %s does not exist: %s", vm.Hostname, task.Url)
		return nil
	}

	logging.Debugf(c, "destroying instance %q", vm.Hostname)
	// Request ID derived from the hostname, passed along with the delete call.
	requestID := uuid.NewSHA1(uuid.Nil, []byte("destroy-"+vm.Hostname)).String()
	project, zone := vm.Attributes.GetProject(), vm.Attributes.GetZone()
	del := getCompute(c).Stable.Instances.Delete(project, zone, vm.Hostname)
	op, err := del.RequestId(requestID).Context(c).Do()
	// Counted for every attempt, regardless of the outcome.
	metrics.DestroyInstanceUnchecked.Add(c, 1, vm.Config, project, zone, vm.Hostname)
	if err != nil {
		gerr, isAPIErr := err.(*googleapi.Error)
		switch {
		case !isAPIErr:
			// Not an API error; nothing extra to log.
		case gerr.Code == http.StatusNotFound:
			// Instance is already destroyed.
			logging.Debugf(c, "instance does not exist: %s", vm.URL)
			return deleteBotAsync(c, task.Id, vm.Hostname)
		default:
			logErrors(c, vm.Hostname, gerr)
		}
		return errors.Annotate(err, "failed to destroy instance %s", vm.Hostname).Err()
	}

	if op.Error != nil && len(op.Error.Errors) > 0 {
		for _, opErr := range op.Error.Errors {
			logging.Errorf(c, "%s: %s", opErr.Code, opErr.Message)
		}
		return errors.Reason("failed to destroy instance %s", vm.Hostname).Err()
	}
	if op.Status != "DONE" {
		// Instance destruction is pending.
		return nil
	}
	logging.Debugf(c, "destroyed instance: %s", op.TargetLink)
	return deleteBotAsync(c, task.Id, vm.Hostname)
}
// auditInstancesQueue is the "audit-instances" queue name.
// NOTE(review): presumably the task handler queue served by
// auditInstanceInZone -- confirm against the handler registration.
const auditInstancesQueue = "audit-instances"
// auditBatchSize is the number of instances to audit per run. It is passed as
// the maximum number of results to the instance list call in
// auditInstanceInZone.
const auditBatchSize = 100
// auditInstanceInZone attempts to enforce the vm table as the source of truth
// for any given project. It does this by deleting anything that it doesn't
// recognize.
//
// payload must be a *tasks.AuditProject with a non-empty project and zone. One
// page of at most auditBatchSize instances is listed (starting at the task's
// page token, if any). If the listing has a further page, a follow-up task
// carrying the next page token is scheduled when this function returns.
//
// Listed instances without a matching VM record that isLeakHuerestic flags as
// leaks are deleted from GCE. Failures to delete an individual instance are
// logged and do not fail the audit. The number of detected leaks is reported
// via metrics.UpdateLeaks.
//
// Returns an error if the payload is invalid, the instances cannot be listed,
// or the VM records cannot be queried.
func auditInstanceInZone(c context.Context, payload proto.Message) error {
	task, ok := payload.(*tasks.AuditProject)
	switch {
	case !ok:
		return errors.Reason("Unexpected payload type %T", payload).Err()
	case task.GetProject() == "":
		return errors.Reason("Project is required").Err()
	case task.GetZone() == "":
		return errors.Reason("Zone is required").Err()
	}
	proj := task.GetProject()
	zone := task.GetZone()
	logging.Debugf(c, "Auditing %s in %s", proj, zone)
	// List a bunch of instances and validate with the DB
	srv := getCompute(c).Stable.Instances
	call := srv.List(proj, zone).MaxResults(auditBatchSize)
	if task.GetPageToken() != "" {
		// Add the page token if given
		call = call.PageToken(task.GetPageToken())
	}
	op, err := call.Context(c).Do()
	if err != nil {
		if gerr, ok := err.(*googleapi.Error); ok {
			logErrors(c, fmt.Sprintf("%s-%s", proj, zone), gerr)
		}
		return errors.Annotate(err, "failed to list %s-%s", proj, zone).Err()
	}
	if op.Warning != nil {
		logging.Warningf(c, "%s: %s", op.Warning.Code, op.Warning.Message)
	}
	// Assign job to handle the next set of instances. Deferred so that the
	// next page is scheduled regardless of how the rest of this run ends.
	defer func(c context.Context, task *tasks.AuditProject, pageToken string) {
		if pageToken == "" {
			return
		}
		task.PageToken = pageToken
		if err := getDispatcher(c).AddTask(c, &tq.Task{
			Payload: task,
		}); err != nil {
			logging.Errorf(c, "Failed to add audit task for %s-%s: %v", task.GetProject(), task.GetZone(), err)
		}
	}(c, task, op.NextPageToken)
	// Mapping from hostname to VM
	hostToVM := make(map[string]*model.VM, len(op.Items))
	for _, inst := range op.Items {
		// Init all the hostnames. Or as gcloud calls it names. There
		// is no hostname assigned to this VM in gcloud's sense. This is
		// because it has a different meaning. We mean the hostname
		// from swarming's perspective, which in gcloud's sense is just
		// name. The instance struct does have a hostname field, but
		// this is empty for all the VMs
		hostToVM[inst.Name] = nil
	}
	query := datastore.NewQuery(model.VMKind)
	// Queries will contain all the queries to be made to datastore
	queries := make([]*datastore.Query, 0, len(hostToVM))
	for hostname := range hostToVM {
		// Eq creates a new object every time. So collect all the unique
		// queries
		queries = append(queries, query.Eq("hostname", hostname))
	}
	mapVMs := func(vm *model.VM) {
		hostToVM[vm.Hostname] = vm
	}
	// Get all the VM records corresponding to the listed instances
	err = datastore.RunMulti(c, queries, mapVMs)
	if err != nil {
		logging.Errorf(c, "Failed to query for the VMS. %v", err)
		return err
	}
	var countLeaks int64
	// Delete the ones we don't know of
	for hostname, vm := range hostToVM {
		if vm != nil || !isLeakHuerestic(c, hostname, proj, zone) {
			continue
		}
		countLeaks++
		logging.Debugf(c, "plugging the instance leak in %s-%s: %s", proj, zone, hostname)
		// Send a delete request for the instance
		reqID := uuid.NewSHA1(uuid.Nil, []byte(fmt.Sprintf("plug-%s", hostname)))
		del := srv.Delete(proj, zone, hostname)
		delOp, err := del.RequestId(reqID.String()).Context(c).Do()
		if err != nil {
			if gerr, ok := err.(*googleapi.Error); ok {
				if gerr.Code == http.StatusNotFound {
					// Instance is already destroyed, so there is nothing
					// left to plug and nothing to report as an error.
					logging.Debugf(c, "instance does not exist: %s", hostname)
					continue
				}
				logErrors(c, hostname, gerr)
			}
			logging.Errorf(c, "failed to plug the leak %s. %v", hostname, err)
			continue
		}
		if delOp.Error != nil && len(delOp.Error.Errors) > 0 {
			// The request succeeded but the operation carries errors; the
			// details are in the operation, not in err (which is nil here).
			for _, e := range delOp.Error.Errors {
				logging.Errorf(c, "%s: %s", e.Code, e.Message)
			}
			logging.Errorf(c, "failed to plug the leak %s", hostname)
			continue
		}
		if delOp.Status == "DONE" {
			logging.Debugf(c, "plugged the leak of instance: %s", delOp.TargetLink)
		}
	}
	metrics.UpdateLeaks(c, countLeaks, proj, zone)
	return nil
}
// isLeakHuerestic determines if the given hostname is a leak from gce-provider
// using a heuristic.
func isLeakHuerestic(ctx context.Context, hostname, proj, zone string) bool {
// If this was an instance created by gce-provider. There must be a set
// of VM records for the prefix matching the hostname. Checking the
// existence of these will let us determine if this was a leak
id := hostname[:len(hostname)-5] // Remove the last 5 characters
prefix := hostnameSuffixMatch.ReplaceAllString(hostname, "")
q := datastore.NewQuery(model.VMKind)
// Get all the VMs under the prefix
q = q.Eq("prefix", prefix)
// Update the leak var based on the VM records that we find
leak := false
cb := func(vm *model.VM) error {
switch {
case vm.Hostname == hostname:
logging.Debugf(ctx, "%s record exists. Not a leak", hostname)
leak = false
// Stop searing as this is not a leak
return datastore.Stop
case vm.ID == id:
logging.Debugf(ctx, "%s is a leaked instance replaced by %s", hostname, vm.Hostname)
leak = true
// Stop searching as this is a leak
return datastore.Stop
default:
// Prefix matches but we cannot find the id that matches. Maybe the pool
// was resized. Might be a leak unless above conditions are true for
// other matches
leak = true
}
return nil
}
err := datastore.Run(ctx, q, cb)
if err != nil {
logging.Errorf(ctx, "isLeakHuerestic -- [%s]Error querying DB %v", hostname, err)
leak = false
}
if leak {
logging.Debugf(ctx, "%s is probably a leak in %s and %s", hostname, proj, zone)
} else {
logging.Debugf(ctx, "Cannot determine if %s is a leak in %s and %s", hostname, proj, zone)
}
return leak
}