blob: cfb9ccfe04db7262436e71d49858553fce5c84e1 [file] [log] [blame]
// Copyright 2019 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package dump implements loading AuthDB from dumps in Google Storage.
package dump
import (
// Fetcher can fetch AuthDB snapshots from GCS dumps, requesting access through
// Auth Service if necessary.
// It's designed not to depend on Auth Service availability at all if everything
// is already setup (i.e. the access to AuthDB snapshot is granted). For that
// reason it requires the location of GCS dump and name of Auth Service's
// signing account to be provided as static configuration (since we don't want
// to make RPCs to potentially unavailable Auth Service to discover them).
// The only time Auth Service is directly hit is when GCS returns permission
// errors. When this happens, Fetcher tries to authorize itself through the
// Auth Service API call and then retries the fetch.
type Fetcher struct {
StorageDumpPath string // GCS storage path to the dump "<bucket>/<object>"
AuthServiceURL string // URL of the auth service "https://..."
AuthServiceAccount string // service account name that signed the blob
OAuthScopes []string // scopes to use when making OAuth tokens
testRetryPolicy func() retry.Iterator // how to retry, mocked in tests
testStorageURL string // Google Storage URL, mocked in tests
testStorageClient *http.Client // client to access Google Storage, mocked in tests
testSigningCerts *signing.PublicCertificates // certs to use to check signature, mocked in tests
// FetchAuthDB checks whether there's a newer version of AuthDB available in
// GCS and fetches it if so. If 'cur' is already up-to-date, returns it as is.
// Logs and retries errors internally until the context cancellation or timeout.
func (f *Fetcher) FetchAuthDB(ctx context.Context, cur *authdb.SnapshotDB) (fresh *authdb.SnapshotDB, err error) {
client := f.testStorageClient
if client == nil {
t, err := auth.GetRPCTransport(ctx, auth.AsSelf, auth.WithScopes(f.OAuthScopes...))
if err != nil {
return nil, errors.Reason("can't get authenticating transport").Err()
client = &http.Client{Transport: t}
retryPolicy := f.testRetryPolicy
if retryPolicy == nil {
retryPolicy = transient.Only(indefiniteRetry)
err = retry.Retry(ctx, retryPolicy, func() (err error) {
fresh, err = f.doFetchAttempt(ctx, cur, client)
return err
}, func(err error, wait time.Duration) {
logging.Warningf(ctx, "Failed to fetch AuthDB dump, will retry in %s: %s", wait, err)
// indefiniteRetry is retry.Iterator that retries indefinitely.
func indefiniteRetry() retry.Iterator {
return &retry.ExponentialBackoff{
Limited: retry.Limited{
Retries: -1,
Delay: 500 * time.Millisecond,
MaxDelay: 30 * time.Second,
// doFetchAttempt is one iteration of FetchAuthDB retry loop.
func (f *Fetcher) doFetchAttempt(ctx context.Context, cur *authdb.SnapshotDB, client *http.Client) (*authdb.SnapshotDB, error) {
// Fetch a tiny latest.json. In most cases this is the only RPC we'll do.
latestRev, needAccess, err := f.fetchLatestRev(ctx, client)
if err != nil {
return nil, err
// If have no access, ask for it and immediately try again.
if needAccess {
if err := f.requestAccess(ctx); err != nil {
return nil, err
switch latestRev, needAccess, err = f.fetchLatestRev(ctx, client); {
case err != nil:
return nil, err
case needAccess: // this should not be happening
return nil, errors.Reason("still no access to GCS").Tag(transient.Tag).Err()
// Skip the rest if we already have same or more recent revision.
if cur != nil && cur.Rev >= latestRev {
if cur.Rev > latestRev {
logging.Warningf(ctx, "AuthDB dump revision went back in time (we have %d, the dump is %d)", cur.Rev, latestRev)
return cur, nil
// Fetch and validate the new snapshot.
logging.Infof(ctx, "AuthDB rev %d is available, fetching it...", latestRev)
signed, err := f.fetchSignedAuthDB(ctx, client)
if err != nil {
return nil, err
if err := f.checkSignature(ctx, signed); err != nil {
return nil, err
fresh, err := f.deserializeAuthDB(ctx, signed.AuthDbBlob)
if err != nil {
return nil, err
// Make sure we don't switch to an older revisions no matter what. This should
// not be happening.
if cur != nil && fresh.Rev <= cur.Rev {
logging.Errorf(ctx, "Unexpectedly got an older snapshot (%d <= %d), ignoring it", fresh.Rev, cur.Rev)
return cur, nil
return fresh, nil
// fetchLatestRev returns the revision of the latest AuthDB dump in the storage.
// On access errors returns (0, true, nil). All other errors are considered
// transient.
func (f *Fetcher) fetchLatestRev(ctx context.Context, client *http.Client) (rev int64, needAccess bool, err error) {
switch code, resp, err := f.fetchFromGCS(ctx, client, "latest.json"); {
case err != nil:
return 0, false, transient.Tag.Apply(err)
case code == http.StatusOK:
rev := protocol.AuthDBRevision{}
if err := jsonpb.Unmarshal(bytes.NewReader(resp), &rev); err != nil {
return 0, false, errors.Annotate(err, "failed to unmarshal AuthDBRevision").Err()
return rev.AuthDbRev, false, nil
case code == http.StatusForbidden || code == http.StatusNotFound:
logging.Errorf(ctx, "Permission errors when fetching latest.json")
return 0, true, nil
return 0, false, errors.Reason("got HTTP %d when fetching latest.json:\n%s", code, resp).Tag(transient.Tag).Err()
// fetchSignedAuthDB fetches SignedAuthDB from GCS and deserializes it.
func (f *Fetcher) fetchSignedAuthDB(ctx context.Context, client *http.Client) (*protocol.SignedAuthDB, error) {
code, resp, err := f.fetchFromGCS(ctx, client, "latest.db")
switch {
case err != nil:
return nil, transient.Tag.Apply(err)
case code == http.StatusOK:
logging.Infof(ctx, "Fetched AuthDB snapshot (%.1f Kb)", float32(len(resp))/1024)
db := protocol.SignedAuthDB{}
if err := proto.Unmarshal(resp, &db); err != nil {
return nil, errors.Annotate(err, "failed to unmarshal SignedAuthDB").Err()
return &db, nil
return nil, errors.Reason("got HTTP %d when fetching latest.json:\n%s", code, resp).Tag(transient.Tag).Err()
// checkSignature checks the signature in SignedAuthDB.
func (f *Fetcher) checkSignature(ctx context.Context, s *protocol.SignedAuthDB) error {
if s.SignerId != f.AuthServiceAccount {
return errors.Reason("the snapshot is signed by %q, but we accept only %q", s.SignerId, f.AuthServiceAccount).Err()
certs := f.testSigningCerts
if certs == nil {
var err error
if certs, err = signing.FetchCertificatesForServiceAccount(ctx, s.SignerId); err != nil {
return errors.Annotate(err, "failed to fetch certs of %q", s.SignerId).Tag(transient.Tag).Err()
hash := sha512.Sum512(s.AuthDbBlob)
if err := certs.CheckSignature(s.SigningKeyId, hash[:], s.Signature); err != nil {
return errors.Annotate(err, "failed to verify that AuthDB was signed by %q", s.SignerId).Err()
return nil
// deserializeAuthDB unmarshals and validates AuthDB stored in serialized
// ReplicationPushRequest.
func (f *Fetcher) deserializeAuthDB(ctx context.Context, blob []byte) (*authdb.SnapshotDB, error) {
m := protocol.ReplicationPushRequest{}
if err := proto.Unmarshal(blob, &m); err != nil {
return nil, errors.Annotate(err, "failed to unmarshal ReplicationPushRequest").Err()
rev := m.Revision.AuthDbRev
"AuthDB snapshot rev %d generated by %s (using components.auth v%s)",
rev, m.Revision.PrimaryId, m.AuthCodeVersion)
snap, err := authdb.NewSnapshotDB(m.AuthDb, f.AuthServiceURL, rev, true)
if err != nil {
return nil, errors.Annotate(err, "snapshot at rev %d fails validation", rev).Err()
return snap, nil
// requestAccess asks Auth Service to grant us access to the AuthDB dump.
func (f *Fetcher) requestAccess(ctx context.Context) error {
svc := service.AuthService{
URL: f.AuthServiceURL,
OAuthScopes: f.OAuthScopes, // use same scopes as for GCS to reuse the cached token
logging.Warningf(ctx, "Asking %s to grant us access to read %q...", f.AuthServiceURL, f.StorageDumpPath)
switch info, err := svc.RequestAccess(ctx); {
case err != nil:
return transient.Tag.Apply(err)
case info.StorageDumpPath == "":
return errors.Reason("service %s is not configured to upload AuthDB to GCS", f.AuthServiceURL).Err()
case info.StorageDumpPath != f.StorageDumpPath:
// Note: we can't just dynamically "fix" f.StorageDumpPath. It is important
// that original configuration (e.g. CLI flag) is fixed too, otherwise after
// restart we'll resume looking at the wrong place. So treat this situation
// as a fatal error.
return errors.Reason("wrong configuration: service %s uploads AuthDB to %q, but we are looking at %q",
f.AuthServiceURL, info.StorageDumpPath, f.StorageDumpPath).Err()
logging.Infof(ctx, "Access granted")
return nil
// fetchFromGCS fetches gs://<StorageDumpPath>/<rel> file into memory.
func (f *Fetcher) fetchFromGCS(ctx context.Context, client *http.Client, rel string) (statusCode int, body []byte, err error) {
storageURL := ""
if f.testStorageURL != "" {
storageURL = f.testStorageURL
url := fmt.Sprintf("%s/%s/%s", storageURL, f.StorageDumpPath, rel)
req, _ := http.NewRequest("GET", url, nil)
resp, err := client.Do(req.WithContext(ctx))
if err != nil {
return 0, nil, errors.Annotate(err, "GET %s", url).Err()
defer resp.Body.Close()
blob, err := ioutil.ReadAll(resp.Body)
if err != nil {
return 0, nil, errors.Annotate(err, "GET %s", url).Err()
return resp.StatusCode, blob, nil