blob: 0652c17c969c174b34d19d25905358bb7ffeb3a2 [file] [log] [blame] [edit]
// Copyright 2020 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
package repoimport
import (
"context"
"errors"
"fmt"
"os"
"reflect"
"sync"
"time"
"go.chromium.org/luci/common/clock"
"go.chromium.org/luci/common/logging"
"go.chromium.org/luci/gae/service/datastore"
"go.chromium.org/infra/appengine/cr-rev/common"
"go.chromium.org/infra/appengine/cr-rev/models"
)
const (
leaseUpdateDuration = 10 * time.Minute
)
var errImportNotRequired = errors.New("the repository scan is not required")
type leaser struct {
repo common.GitRepository
doc *models.Repository
}
func newLeaser(repo common.GitRepository) *leaser {
return &leaser{
repo: repo,
doc: &models.Repository{
ID: models.RepoID{
Host: repo.Host,
Repository: repo.Name,
},
},
}
}
// WithLease runs function f if lease can be acquired. It then periodically
// refreshes the lease. In case the lease is broken by external process, it
// cancels context passed to the function.
// Lease document will be removed iff there is error during import and lease
// was not broken.
func (l *leaser) WithLease(ctx context.Context, f func(ctx context.Context) error) error {
err := l.acquireLease(ctx)
if err != nil {
return err
}
logging.Debugf(ctx, "Lease acquired for %s/%s", l.repo.Host, l.repo.Name)
// Wait for go routine to finish before returning result
wg := sync.WaitGroup{}
wg.Add(1)
cctx, cancel := context.WithCancel(ctx)
cleanupOnError := true
go func() {
// Refresh lock periodically, and check ownership.
defer wg.Done()
timer := clock.NewTimer(ctx)
timer.Reset(leaseUpdateDuration)
for {
select {
case <-cctx.Done():
return
case <-timer.GetC():
err := l.refreshLease(cctx)
if err != nil {
logging.WithError(err).Errorf(ctx, "Datastore repository state is not expected")
cancel()
// External process acquired the lease so we shouldn't do anything with it.
cleanupOnError = false
return
}
timer.Reset(leaseUpdateDuration)
}
}
}()
err = f(cctx)
cancel()
wg.Wait()
if err != nil && cleanupOnError {
logging.Debugf(ctx, "Releasing lease %s/%s, error: %s", l.repo.Host, l.repo.Name, err.Error())
datastore.Delete(ctx, l.doc)
return err
}
logging.Debugf(ctx, "Releasing lease %s/%s, no error", l.repo.Host, l.repo.Name)
// Indexing is completed, so stop goroutine for refreshing lock.
l.doc.SetIndexingCompleted(clock.Now(ctx).UTC().Round(time.Millisecond))
return datastore.Put(ctx, l.doc)
}
// acquireLease attempts to acquire a lease which is stored in Datastore. If
// there is already an active lease, such lease may be broken only if stale
// (not renewed in deadline).
func (l *leaser) acquireLease(ctx context.Context) error {
return datastore.RunInTransaction(ctx, func(ctx context.Context) error {
if err := datastore.Get(ctx, l.doc); err != nil && err != datastore.ErrNoSuchEntity {
return fmt.Errorf("error reading from datastore: %w", err)
}
now := clock.Now(ctx).UTC().Round(time.Millisecond)
if !l.doc.IsScanRequired(now) {
logging.Debugf(ctx, "the repository scan is not required (%+v)", l.doc)
return errImportNotRequired
}
l.doc.SetStartIndexing(now, os.Getenv("GAE_INSTANCE"))
if err := datastore.Put(ctx, l.doc); err != nil {
return fmt.Errorf("error writing to datastore: %w", err)
}
return nil
}, nil)
}
// refreshLease attempts to extend the lease. If the document is modified by
// external process, the lease won't be renewed and error will be returned.
func (l *leaser) refreshLease(ctx context.Context) error {
dst := models.Repository{
ID: l.doc.ID,
}
return datastore.RunInTransaction(ctx, func(ctx context.Context) error {
if err := datastore.Get(ctx, &dst); err != nil {
return err
}
if !reflect.DeepEqual(*l.doc, dst) {
return errors.New("some other process claimed the lock, aborting import")
}
l.doc.ExtendLease(clock.Now(ctx).UTC().Round(time.Millisecond))
return datastore.Put(ctx, l.doc)
}, nil)
}