| // Copyright 2015 The LUCI Authors. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package authdbimpl |
| |
| import ( |
| "context" |
| "fmt" |
| "strings" |
| "time" |
| |
| ds "go.chromium.org/luci/gae/service/datastore" |
| |
| "go.chromium.org/luci/common/clock" |
| "go.chromium.org/luci/common/errors" |
| "go.chromium.org/luci/common/logging" |
| "go.chromium.org/luci/common/retry/transient" |
| "go.chromium.org/luci/server/auth/service" |
| "go.chromium.org/luci/server/auth/service/protocol" |
| ) |
| |
| // SnapshotInfo identifies some concrete AuthDB snapshot. |
| // |
| // Singleton entity. Serves as a pointer to a blob with corresponding AuthDB |
| // proto message (stored in separate Snapshot entity). |
| type SnapshotInfo struct { |
| AuthServiceURL string `gae:",noindex"` |
| Rev int64 `gae:",noindex"` |
| |
| _kind string `gae:"$kind,gaeauth.SnapshotInfo"` |
| _id int64 `gae:"$id,1"` |
| } |
| |
| // GetSnapshotID returns datastore ID of the corresponding Snapshot entity. |
| func (si *SnapshotInfo) GetSnapshotID() string { |
| if strings.IndexByte(si.AuthServiceURL, ',') != -1 { |
| panic(fmt.Errorf("forbidden symbol ',' in URL %q", si.AuthServiceURL)) |
| } |
| return fmt.Sprintf("v1,%s,%d", si.AuthServiceURL, si.Rev) |
| } |
| |
| // Snapshot is serialized deflated AuthDB blob with some minimal metadata. |
| // |
| // Root entity. Immutable. Key has the form "v1,<AuthServiceURL>,<Revision>", |
| // it's generated by SnapshotInfo.GetSnapshotID(). It is globally unique |
| // version identifier, since it includes URL of an auth service. AuthServiceURL |
| // should be not very long (~< 250 chars) for this too work. |
| // |
| // Currently does not get garbage collected. |
| type Snapshot struct { |
| ID string `gae:"$id"` |
| |
| // AuthDBDeflated is zlib-compressed serialized AuthDB protobuf message. |
| AuthDBDeflated []byte `gae:",noindex"` |
| |
| CreatedAt time.Time // when it was created on Auth service |
| FetchedAt time.Time // when it was fetched and put into the datastore |
| |
| _kind string `gae:"$kind,gaeauth.Snapshot"` |
| } |
| |
| // GetLatestSnapshotInfo fetches SnapshotInfo singleton entity. |
| // |
| // If no such entity is stored, returns (nil, nil). |
| func GetLatestSnapshotInfo(ctx context.Context) (*SnapshotInfo, error) { |
| report := durationReporter(ctx, latestSnapshotInfoDuration) |
| logging.Debugf(ctx, "Fetching AuthDB snapshot info from the datastore") |
| ctx = ds.WithoutTransaction(defaultNS(ctx)) |
| info := SnapshotInfo{} |
| switch err := ds.Get(ctx, &info); { |
| case err == ds.ErrNoSuchEntity: |
| report("SUCCESS") |
| return nil, nil |
| case err != nil: |
| report("ERROR_TRANSIENT") |
| return nil, transient.Tag.Apply(err) |
| default: |
| report("SUCCESS") |
| return &info, nil |
| } |
| } |
| |
| // deleteSnapshotInfo removes SnapshotInfo entity from the datastore. |
| // |
| // Used to detach the service from auth_service. |
| func deleteSnapshotInfo(ctx context.Context) error { |
| ctx = ds.WithoutTransaction(ctx) |
| return ds.Delete(ctx, ds.KeyForObj(ctx, &SnapshotInfo{})) |
| } |
| |
| // GetAuthDBSnapshot fetches, inflates and deserializes AuthDB snapshot. |
| func GetAuthDBSnapshot(ctx context.Context, id string) (*protocol.AuthDB, error) { |
| report := durationReporter(ctx, getSnapshotDuration) |
| logging.Debugf(ctx, "Fetching AuthDB snapshot from the datastore") |
| defer logging.Debugf(ctx, "AuthDB snapshot fetched") |
| |
| ctx = ds.WithoutTransaction(defaultNS(ctx)) |
| snap := Snapshot{ID: id} |
| switch err := ds.Get(ctx, &snap); { |
| case err == ds.ErrNoSuchEntity: |
| report("ERROR_NO_SNAPSHOT") |
| return nil, err // not transient |
| case err != nil: |
| report("ERROR_TRANSIENT") |
| return nil, transient.Tag.Apply(err) |
| } |
| |
| db, err := service.InflateAuthDB(snap.AuthDBDeflated) |
| if err != nil { |
| report("ERROR_INFLATION") |
| return nil, err |
| } |
| |
| report("SUCCESS") |
| return db, nil |
| } |
| |
| // ConfigureAuthService makes initial fetch of AuthDB snapshot from the auth |
| // service and sets up PubSub subscription. |
| // |
| // `baseURL` is root URL of currently running service, will be used to derive |
| // PubSub push endpoint URL. |
| // |
| // If `authServiceURL` is blank, disables the fetching. |
| func ConfigureAuthService(ctx context.Context, baseURL, authServiceURL string) error { |
| logging.Infof(ctx, "Reconfiguring AuthDB to be fetched from %q", authServiceURL) |
| ctx = defaultNS(ctx) |
| |
| // If switching auth services, need to grab URL of a currently configured |
| // auth service to unsubscribe from its PubSub stream. |
| prevAuthServiceURL := "" |
| switch existing, err := GetLatestSnapshotInfo(ctx); { |
| case err != nil: |
| return err |
| case existing != nil: |
| prevAuthServiceURL = existing.AuthServiceURL |
| } |
| |
| // Stopping synchronization completely? |
| if authServiceURL == "" { |
| if prevAuthServiceURL != "" { |
| if err := killPubSub(ctx, prevAuthServiceURL); err != nil { |
| return err |
| } |
| } |
| return deleteSnapshotInfo(ctx) |
| } |
| |
| // Fetch latest AuthDB snapshot and store it in the datastore, thus verifying |
| // authServiceURL works end-to-end. |
| srv := getAuthService(ctx, authServiceURL) |
| latestRev, err := srv.GetLatestSnapshotRevision(ctx) |
| if err != nil { |
| return err |
| } |
| info := &SnapshotInfo{ |
| AuthServiceURL: authServiceURL, |
| Rev: latestRev, |
| } |
| if err := fetchSnapshot(ctx, info); err != nil { |
| logging.Errorf(ctx, "Failed to fetch latest snapshot from %s - %s", authServiceURL, err) |
| return err |
| } |
| |
| // Configure PubSub subscription to receive future updates. |
| if err := setupPubSub(ctx, baseURL, authServiceURL); err != nil { |
| logging.Errorf(ctx, "Failed to configure pubsub subscription - %s", err) |
| return err |
| } |
| |
| // All is configured. Switch SnapshotInfo entity to point to new snapshot. |
| // It makes syncAuthDB fetch changes from `authServiceURL`, thus promoting |
| // `authServiceURL` to the status of main auth service. |
| if err := ds.Put(ds.WithoutTransaction(ctx), info); err != nil { |
| return transient.Tag.Apply(err) |
| } |
| |
| // Stop getting notifications from previously used auth service. |
| if prevAuthServiceURL != "" && prevAuthServiceURL != authServiceURL { |
| return killPubSub(ctx, prevAuthServiceURL) |
| } |
| |
| return nil |
| } |
| |
| // fetchSnapshot fetches AuthDB snapshot specified by `info` and puts it into |
| // the datastore. |
| // |
| // Idempotent. Doesn't touch SnapshotInfo entity itself, and thus always safe |
| // to call. |
| func fetchSnapshot(ctx context.Context, info *SnapshotInfo) error { |
| srv := getAuthService(ctx, info.AuthServiceURL) |
| snap, err := srv.GetSnapshot(ctx, info.Rev) |
| if err != nil { |
| return err |
| } |
| blob, err := service.DeflateAuthDB(snap.AuthDB) |
| if err != nil { |
| return err |
| } |
| ent := Snapshot{ |
| ID: info.GetSnapshotID(), |
| AuthDBDeflated: blob, |
| CreatedAt: snap.Created.UTC(), |
| FetchedAt: clock.Now(ctx).UTC(), |
| } |
| logging.Infof(ctx, "Lag: %s", ent.FetchedAt.Sub(ent.CreatedAt)) |
| return transient.Tag.Apply(ds.Put(ds.WithoutTransaction(ctx), &ent)) |
| } |
| |
| // syncAuthDB fetches latest AuthDB snapshot from the configured auth service, |
| // puts it into the datastore and updates SnapshotInfo entity to point to it. |
| // |
| // Expects authenticating transport to be in the context. Called when receiving |
| // PubSub notifications. |
| // |
| // Returns SnapshotInfo of the most recent snapshot. |
| func syncAuthDB(ctx context.Context) (*SnapshotInfo, error) { |
| report := durationReporter(ctx, syncAuthDBDuration) |
| |
| // `info` is what we have in the datastore now. |
| info, err := GetLatestSnapshotInfo(ctx) |
| if err != nil { |
| report("ERROR_GET_LATEST_INFO") |
| return nil, err |
| } |
| if info == nil { |
| report("ERROR_NOT_CONFIGURED") |
| return nil, errors.New("auth_service URL is not configured") |
| } |
| |
| // Grab revision number of the latest snapshot on the server. |
| srv := getAuthService(ctx, info.AuthServiceURL) |
| latestRev, err := srv.GetLatestSnapshotRevision(ctx) |
| if err != nil { |
| report("ERROR_GET_LATEST_REVISION") |
| return nil, err |
| } |
| |
| // Nothing new? |
| if info.Rev == latestRev { |
| logging.Infof(ctx, "AuthDB is up-to-date at revision %d", latestRev) |
| report("SUCCESS_UP_TO_DATE") |
| return info, nil |
| } |
| |
| // Auth service traveled back in time? |
| if info.Rev > latestRev { |
| logging.Errorf( |
| ctx, "Latest AuthDB revision on server is %d, we have %d. It should not happen", |
| latestRev, info.Rev) |
| report("SUCCESS_NEWER_ALREADY") |
| return info, nil |
| } |
| |
| // Fetch the actual snapshot from the server and put it into the datastore. |
| info.Rev = latestRev |
| if err = fetchSnapshot(ctx, info); err != nil { |
| logging.Errorf(ctx, "Failed to fetch snapshot %d from %q - %s", info.Rev, info.AuthServiceURL, err) |
| report("ERROR_FETCHING") |
| return nil, err |
| } |
| |
| // Move pointer to the latest snapshot only if it is more recent than what is |
| // already in the datastore. |
| var latest *SnapshotInfo |
| err = ds.RunInTransaction(ds.WithoutTransaction(ctx), func(ctx context.Context) error { |
| latest = &SnapshotInfo{} |
| switch err := ds.Get(ctx, latest); { |
| case err == ds.ErrNoSuchEntity: |
| logging.Warningf(ctx, "No longer need to fetch AuthDB, not configured anymore") |
| return nil |
| case err != nil: |
| return err |
| case latest.AuthServiceURL != info.AuthServiceURL: |
| logging.Warningf( |
| ctx, "No longer need to fetch AuthDB from %q, %q is primary now", |
| info.AuthServiceURL, latest.AuthServiceURL) |
| return nil |
| case latest.Rev >= info.Rev: |
| logging.Warningf(ctx, "Already have rev %d", info.Rev) |
| return nil |
| } |
| latest = info |
| return ds.Put(ctx, info) |
| }, nil) |
| |
| if err != nil { |
| report("ERROR_COMMITTING") |
| return nil, transient.Tag.Apply(err) |
| } |
| |
| report("SUCCESS_UPDATED") |
| return latest, nil |
| } |