blob: ce58a387efb500c0a8d9a65bccf40df7abaa961c [file] [log] [blame]
// Copyright 2022 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package model
import (
"archive/tar"
"compress/gzip"
"context"
"fmt"
"io"
"regexp"
"sort"
"strings"
"time"
"google.golang.org/protobuf/encoding/prototext"
"go.chromium.org/luci/auth/identity"
"go.chromium.org/luci/common/clock"
"go.chromium.org/luci/common/data/stringset"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/logging"
"go.chromium.org/luci/gae/service/datastore"
"go.chromium.org/luci/server/auth"
"go.chromium.org/luci/auth_service/api/configspb"
)
// Imports groups from some external tar.gz bundle or plain text list.
// External URL should serve *.tar.gz file with the following file structure:
// <external group system name>/<group name>:
// userid
// userid
// ...
// For example ldap.tar.gz may look like:
// ldap/trusted-users:
// jane
// joe
// ...
// ldap/all:
// jane
// joe
// ...
// Each tarball may have groups from multiple external systems, but groups from
// some external system must not be split between multiple tarballs. When importer
// sees <external group system name>/* in a tarball, it modifies group list from
// that system on the server to match group list in the tarball _exactly_,
// including removal of groups that are on the server, but no longer present in
// the tarball.
// Plain list format should have one userid per line and can only describe a single
// group in a single system. Such groups will be added to 'external/*' groups
// namespace. Removing such group from importer config will remove it from
// service too.
// The service can also be configured to accept tarball uploads (instead of
// fetching them). Fetched and uploaded tarballs are handled in the exact same way,
// in particular all caveats related to external group system names apply.
// GroupImporterConfig is a singleton entity that contains the contents of the imports.cfg file.
type GroupImporterConfig struct {
Kind string `gae:"$kind,GroupImporterConfig"`
ID string `gae:"$id,config"`
// ConfigProto is the plaintext copy of the config found at imports.cfg.
ConfigProto string `gae:"config_proto"`
// ConfigRevision is revision version of the config found at imports.cfg.
ConfigRevision []byte `gae:"config_revision"`
// ModifiedBy is the email of the user who modified the cfg.
ModifiedBy string `gae:"modified_by"`
// ModifiedTS is the time when this entity was last modified.
ModifiedTS time.Time `gae:"modified_ts"`
}
var GroupNameRe = regexp.MustCompile(`^([a-z\-]+/)?[0-9a-z_\-\.@]{1,100}$`)
// GroupBundle is a map where k: groupName, v: list of identities belonging to group k.
type GroupBundle = map[string][]identity.Identity
// GetGroupImporterConfig fetches the GroupImporterConfig entity from the datastore.
//
// Returns GroupImporterConfig entity if present.
// Returns datastore.ErrNoSuchEntity if the entity is not present.
// Returns annotated error for all other errors.
func GetGroupImporterConfig(ctx context.Context) (*GroupImporterConfig, error) {
groupsCfg := &GroupImporterConfig{
Kind: "GroupImporterConfig",
ID: "config",
}
switch err := datastore.Get(ctx, groupsCfg); {
case err == nil:
return groupsCfg, nil
case errors.Is(err, datastore.ErrNoSuchEntity):
return nil, err
default:
return nil, errors.Annotate(err, "error getting GroupImporterConfig").Err()
}
}
// IngestTarball handles upload of tarball's specified in 'tarball_upload' config entries.
// expected to be called in an auth context of the upload PUT request.
//
// returns
//
// []string - list of modified groups
// int64 - authDBRevision
// error
// proto translation error
// entry is nil
// entry not found in tarball upload config
// unauthorized uploader
// bad tarball structure
func IngestTarball(ctx context.Context, name string, content io.Reader) ([]string, int64, error) {
g, err := GetGroupImporterConfig(ctx)
if err != nil {
return nil, 0, err
}
gConfigProto, err := g.ToProto()
if err != nil {
return nil, 0, errors.Annotate(err, "issue getting proto from config entity").Err()
}
caller := auth.CurrentIdentity(ctx)
var entry *configspb.GroupImporterConfig_TarballUploadEntry
// make sure that tarball_upload entry we're looking for is specified in config
for _, tbu := range gConfigProto.GetTarballUpload() {
if tbu.Name == name {
entry = tbu
break
}
}
if entry == nil {
return nil, 0, errors.New("entry is nil")
}
if entry.Name == "" {
return nil, 0, errors.New("entry not found in tarball upload names")
}
if !contains(caller.Email(), entry.AuthorizedUploader) {
return nil, 0, errors.New(fmt.Sprintf("%q is not an authorized uploader", caller.Email()))
}
bundles, err := loadTarball(ctx, content, entry.GetDomain(), entry.GetSystems(), entry.GetGroups())
if err != nil {
return nil, 0, errors.Annotate(err, "bad tarball").Err()
}
return importBundles(ctx, bundles, caller, nil)
}
// loadTarball unzips tarball with groups and deserializes them.
func loadTarball(ctx context.Context, content io.Reader, domain string, systems, groups []string) (map[string]GroupBundle, error) {
// map looks like: K: system, V: { K: groupName, V: []identities }
bundles := make(map[string]GroupBundle)
entries, err := extractTarArchive(content)
if err != nil {
return nil, err
}
// verify system/groupname and then parse blob if valid
for filename, fileobj := range entries {
chunks := strings.Split(filename, "/")
if len(chunks) != 2 || !GroupNameRe.MatchString(chunks[1]) {
logging.Warningf(ctx, "Skipping file %s, not a valid name", filename)
continue
}
if groups != nil && !contains(filename, groups) {
continue
}
system := chunks[0]
if !contains(system, systems) {
logging.Warningf(ctx, "Skipping file %s, not allowed", filename)
continue
}
identities, err := loadGroupFile(string(fileobj), domain)
if err != nil {
return nil, err
}
if _, ok := bundles[system]; !ok {
bundles[system] = make(GroupBundle)
}
bundles[system][filename] = identities
}
return bundles, nil
}
func loadGroupFile(identities string, domain string) ([]identity.Identity, error) {
members := make(map[identity.Identity]bool)
memsSplit := strings.Split(identities, "\n")
for _, uid := range memsSplit {
uid = strings.TrimSpace(uid)
if uid == "" {
continue
}
var ident string
if domain == "" {
ident = fmt.Sprintf("user:%s", uid)
} else {
ident = fmt.Sprintf("user:%s@%s", uid, domain)
}
emailIdent, err := identity.MakeIdentity(ident)
if err != nil {
return nil, err
}
members[emailIdent] = true
}
membersSorted := make([]identity.Identity, 0, len(members))
for mem := range members {
membersSorted = append(membersSorted, mem)
}
sort.Slice(membersSorted, func(i, j int) bool {
return membersSorted[i].Value() < membersSorted[j].Value()
})
return membersSorted, nil
}
// importBundles imports given set of bundles all at once.
// A bundle is a map with groups that is the result of a processing of some tarball.
// A bundle specifies the desired state of all groups under some system, e.g.
// importBundles({'ldap': {}}, ...) will REMOVE all existing 'ldap/*' groups.
//
// Group names in the bundle are specified in their full prefixed form (with
// system name prefix). An example of expected 'bundles':
//
// {
// 'ldap': {
// 'ldap/group': [Identity(...), Identity(...)],
// },
// }
//
// Args:
//
// bundles: map system name -> GroupBundle
// providedBy: auth.Identity to put in modifiedBy or createdBy fields.
//
// Returns:
//
// (list of modified groups,
// new AuthDB revision number or 0 if no changes,
// error if issue with writing entities).
func importBundles(ctx context.Context, bundles map[string]GroupBundle, providedBy identity.Identity, testHook func()) ([]string, int64, error) {
// Nothing to process.
if len(bundles) == 0 {
return []string{}, 0, nil
}
getAuthDBRevision := func(ctx context.Context) (int64, error) {
state, err := GetReplicationState(ctx)
switch {
case errors.Is(err, datastore.ErrNoSuchEntity):
return 0, nil
case err != nil:
return -1, err
default:
return state.AuthDBRev, nil
}
}
// Fetches all existing groups and AuthDB revision number.
groupsSnapshot := func(ctx context.Context) (gMap map[string]*AuthGroup, rev int64, err error) {
err = datastore.RunInTransaction(ctx, func(ctx context.Context) error {
groups, err := GetAllAuthGroups(ctx)
if err != nil {
return err
}
gMap = make(map[string]*AuthGroup, len(groups))
for _, g := range groups {
gMap[g.ID] = g
}
rev, err = getAuthDBRevision(ctx)
if err != nil {
return errors.Annotate(err, "couldn't get AuthDBRev").Err()
}
return nil
}, nil)
return gMap, rev, err
}
// Transactionally puts and deletes a bunch of entities.
applyImport := func(expectedRevision int64, entitiesToPut, entitiesToDelete []*AuthGroup, ts time.Time) error {
// Runs in transaction.
return runAuthDBChange(ctx, "Imported from group bundles", func(ctx context.Context, cae commitAuthEntity) error {
rev, err := getAuthDBRevision(ctx)
if err != nil {
return err
}
// DB changed between transactions try again.
if rev != expectedRevision {
return errors.New("revision numbers don't match")
}
for _, e := range entitiesToPut {
if err := cae(e, ts, providedBy, false); err != nil {
return err
}
}
for _, e := range entitiesToDelete {
if err := cae(e, ts, providedBy, true); err != nil {
return err
}
}
return nil
})
}
updatedGroups := stringset.New(0)
revision := int64(0)
loopCount := 0
var groups map[string]*AuthGroup
var err error
// Try to apply the change in batches until it lands completely or deadline
// happens. Split each batch update into two transactions (assuming AuthDB
// changes infrequently) to avoid reading and writing too much stuff from
// within a single transaction (and to avoid keeping the transaction open while
// calculating the diff).
for {
// Use same timestamp everywhere to reflect that groups were imported
// atomically within a single transaction.
ts := clock.Now(ctx).UTC()
loopCount += 1
groups, revision, err = groupsSnapshot(ctx)
if err != nil {
return nil, revision, err
}
// For testing purposes only.
if testHook != nil && loopCount == 2 {
testHook()
}
entitiesToPut := []*AuthGroup{}
entitiesToDel := []*AuthGroup{}
for sys := range bundles {
iGroups := bundles[sys]
toPut, toDel := prepareImport(ctx, sys, groups, iGroups)
entitiesToPut = append(entitiesToPut, toPut...)
entitiesToDel = append(entitiesToDel, toDel...)
}
if len(entitiesToPut) == 0 && len(entitiesToDel) == 0 {
logging.Infof(ctx, "nothing to do")
break
}
// An `applyImport` transaction can touch at most 500 entities. Cap the
// number of entities we create/delete by 200 each since we attach a historical
// entity to each entity. The rest will be updated on the next cycle of the loop.
// This is safe to do since:
// * Imported groups are "leaf" groups (have no subgroups) and can be added
// in arbitrary order without worrying about referential integrity.
// * Deleted groups are guaranteed to be unreferenced by `prepareImport`
// and can be deleted in arbitrary order as well.
truncated := false
// Both these operations happen in the same transaction so we have
// to trim it to make sure the total is <= 200.
if len(entitiesToPut) > 200 {
entitiesToPut = entitiesToPut[:200]
entitiesToDel = nil
truncated = true
} else if len(entitiesToPut)+len(entitiesToDel) > 200 {
entitiesToDel = entitiesToDel[:200-len(entitiesToPut)]
truncated = true
}
// Log what we are about to do to help debugging transaction errors.
logging.Infof(ctx, "Preparing AuthDB rev %d with %d puts and %d deletes:", revision+1, len(entitiesToPut), len(entitiesToDel))
for _, e := range entitiesToPut {
logging.Infof(ctx, "U %s", e.ID)
updatedGroups.Add(e.ID)
}
for _, e := range entitiesToDel {
logging.Infof(ctx, "D %s", e.ID)
updatedGroups.Add(e.ID)
}
// Land the change iff the current AuthDB revision is still == `revision`.
err := applyImport(revision, entitiesToPut, entitiesToDel, ts)
if err != nil && strings.Contains(err.Error(), "revision numbers don't match") {
logging.Warningf(ctx, "authdb changed between transactions, retrying...")
continue
} else if err != nil {
logging.Errorf(ctx, "couldn't apply changes to datastore entities %s", err.Error())
return nil, revision, err
}
// The new revision has landed
revision += 1
if truncated {
logging.Infof(ctx, "going for another round to push the rest of the groups")
clock.Sleep(ctx, 5*time.Second)
continue
}
logging.Infof(ctx, "Done")
break
}
if len(updatedGroups) > 0 {
return updatedGroups.ToSortedSlice(), int64(revision), nil
}
return nil, 0, nil
}
// prepareImport compares the bundle given to the what is currently present in datastore
// to get the operations for all the groups.
func prepareImport(ctx context.Context, systemName string, existingGroups map[string]*AuthGroup, iGroups GroupBundle) (toPut []*AuthGroup, toDel []*AuthGroup) {
systemGroups := []string{}
iGroupsSet := stringset.New(len(iGroups))
for gID := range existingGroups {
if strings.HasPrefix(gID, fmt.Sprintf("%s/", systemName)) {
systemGroups = append(systemGroups, gID)
}
}
for groupName := range iGroups {
iGroupsSet.Add(groupName)
}
sysGroupsSet := stringset.NewFromSlice(systemGroups...)
toCreate := iGroupsSet.Difference(sysGroupsSet).ToSlice()
for _, g := range toCreate {
group := makeAuthGroup(ctx, g)
group.Members = identitiesToStrings(iGroups[g])
toPut = append(toPut, group)
}
toUpdate := sysGroupsSet.Intersect(iGroupsSet).ToSlice()
for _, g := range toUpdate {
importGMems := stringset.NewFromSlice(identitiesToStrings(iGroups[g])...)
existMems := existingGroups[g].Members
if !(len(importGMems) == len(existMems) && importGMems.HasAll(existMems...)) {
group := makeAuthGroup(ctx, g)
group.Members = importGMems.ToSlice()
toPut = append(toPut, group)
}
}
toDelete := sysGroupsSet.Difference(iGroupsSet).ToSlice()
for _, g := range toDelete {
group := makeAuthGroup(ctx, g)
toDel = append(toDel, group)
}
return toPut, toDel
}
func identitiesToStrings(idents []identity.Identity) []string {
res := make([]string, len(idents))
for i, id := range idents {
res[i] = string(id)
}
return res
}
// extractTarArchive unpacks a tar archive and returns a map
// of filename -> fileobj pairs.
func extractTarArchive(r io.Reader) (map[string][]byte, error) {
entries := make(map[string][]byte)
gzr, err := gzip.NewReader(r)
if err != nil {
return nil, err
}
tr := tar.NewReader(gzr)
for {
header, err := tr.Next()
if errors.Is(err, io.EOF) {
break
}
if err != nil {
return nil, err
}
fileContents, err := io.ReadAll(tr)
if err != nil {
return nil, err
}
entries[header.Name] = fileContents
}
if err := gzr.Close(); err != nil {
return nil, err
}
return entries, nil
}
// TODO(cjacomet): replace with slices.Contains when
// slices package isn't experimental.
func contains(key string, search []string) bool {
for _, val := range search {
if val == key {
return true
}
}
return false
}
// ToProto converts the GroupImporterConfig entity to the proto equivalent.
func (g *GroupImporterConfig) ToProto() (*configspb.GroupImporterConfig, error) {
gConfig := &configspb.GroupImporterConfig{}
if err := prototext.Unmarshal([]byte(g.ConfigProto), gConfig); err != nil {
return nil, err
}
return gConfig, nil
}