| // Copyright 2018 The LUCI Authors. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package git |
| |
| import ( |
| "container/heap" |
| "context" |
| "fmt" |
| "sync" |
| |
| "github.com/golang/protobuf/proto" |
| |
| "google.golang.org/grpc/codes" |
| "google.golang.org/grpc/status" |
| |
| gitilesapi "go.chromium.org/luci/common/api/gitiles" |
| "go.chromium.org/luci/common/errors" |
| "go.chromium.org/luci/common/logging" |
| gitpb "go.chromium.org/luci/common/proto/git" |
| "go.chromium.org/luci/common/sync/parallel" |
| "go.chromium.org/luci/gae/service/datastore" |
| "go.chromium.org/luci/milo/common" |
| ) |
| |
| // A structure to keep a list of commits for some ref. |
| type refCommits struct { |
| commits []*gitpb.Commit |
| } |
| |
| // The pop method removes and returns first commit. Second return value is true |
| // if this was the last commit. Caller must ensure refCommits has commits when |
| // calling the method. |
| func (rc *refCommits) pop() (commit *gitpb.Commit, empty bool) { |
| commit, rc.commits = rc.commits[0], rc.commits[1:] |
| return commit, len(rc.commits) == 0 |
| } |
| |
| // We use commitHeap to merge slices of commits using max-heap algorithm below. |
| // Only first commit in each slice is used for comparisons. |
| type commitHeap []refCommits |
| |
| func (h commitHeap) Len() int { |
| return len(h) |
| } |
| |
| func (h commitHeap) Swap(i, j int) { |
| h[i], h[j] = h[j], h[i] |
| } |
| |
| func (h commitHeap) Less(i, j int) bool { |
| iTime := h[i].commits[0].Committer.Time.AsTime() |
| jTime := h[j].commits[0].Committer.Time.AsTime() |
| |
| // Ensure consistent ordering based on commit hash when times are identical. |
| if iTime == jTime { |
| return h[i].commits[0].Id > h[j].commits[0].Id |
| } |
| |
| // To make heap behave as max-heap, we consider later time to be smaller than |
| // earlier timer, i.e. latest commit will be the at the root of the heap. |
| return iTime.After(jTime) |
| } |
| |
| func (h *commitHeap) Push(x interface{}) { |
| *h = append(*h, x.(refCommits)) |
| } |
| |
| func (h *commitHeap) Pop() interface{} { |
| old := *h |
| n := len(old) |
| x := old[n-1] |
| *h = old[0 : n-1] |
| return x |
| } |
| |
// logCache is a datastore entity holding a cached list of commits (log) for a
// given ref as of a given tip commit. The Key describes the query that was
// used to retrieve the log and has the following format:
//
//	host|project|ref|exclude_ref|limit
//
// When the ref moves, the entity is overwritten with the new CommitID and the
// new Log.
type logCache struct {
	// Key identifies the query the log was fetched with (see the format above).
	Key string `gae:"$id"`
	// CommitID is the commit the ref was pointing at when the log was stored.
	CommitID string `gae:"commit,noindex"`
	// Log is the encoded list of commits: a varint with the number of commits
	// followed by that many serialized gitpb.Commit messages.
	Log []byte `gae:"log,noindex"`

	// ref is the name of the ref the entity was built for. It is not stored.
	ref string `gae:"-"`
}
| |
| func logCacheFor(host, project, ref, excludeRef string, limit int) logCache { |
| return logCache{ |
| Key: fmt.Sprintf("%s|%s|%s|%s|%d", host, project, ref, excludeRef, limit), |
| ref: ref, |
| } |
| } |
| |
| func loadCacheFromDS(c context.Context, host, project, excludeRef string, limit int, refTips map[string]string) (cachedLogs map[string][]*gitpb.Commit) { |
| items := make([]logCache, 0, len(refTips)) |
| for ref := range refTips { |
| items = append(items, logCacheFor(host, project, ref, excludeRef, limit)) |
| } |
| |
| cachedLogs = map[string][]*gitpb.Commit{} |
| var merr errors.MultiError |
| switch err := datastore.Get(c, items).(type) { |
| case errors.MultiError: |
| merr = err |
| case nil: |
| merr = nil |
| default: |
| return |
| } |
| |
| for i, item := range items { |
| if (merr != nil && merr[i] != nil) || item.CommitID != refTips[item.ref] { |
| continue |
| } |
| |
| buf := proto.NewBuffer(item.Log) |
| numCommits, err := buf.DecodeVarint() |
| if err != nil { |
| continue |
| } |
| |
| log := make([]*gitpb.Commit, 0, numCommits) |
| for j := uint64(0); j < numCommits; j++ { |
| var commit gitpb.Commit |
| if err = buf.DecodeMessage(&commit); err != nil { |
| continue |
| } |
| |
| log = append(log, &commit) |
| } |
| |
| cachedLogs[item.ref] = log |
| } |
| |
| return |
| } |
| |
| func saveCacheToDS(c context.Context, host, project, excludeRef string, limit int, refLogs map[string][]*gitpb.Commit, refTips map[string]string) error { |
| items := make([]logCache, 0, len(refLogs)) |
| totalBytes := 0 |
| for ref, log := range refLogs { |
| buf := proto.NewBuffer([]byte{}) |
| if err := buf.EncodeVarint(uint64(len(log))); err != nil { |
| return err |
| } |
| |
| for _, commit := range log { |
| if err := buf.EncodeMessage(commit); err != nil { |
| return err |
| } |
| } |
| |
| item := logCacheFor(host, project, ref, excludeRef, limit) |
| item.CommitID = refTips[ref] |
| item.Log = buf.Bytes() |
| items = append(items, item) |
| |
| // This logic breaks storing caches into datastore into smaller requests to |
| // avoid exceeding 1MB limit on datastore requests set by AppEngine. |
| totalBytes += len(item.Log) |
| if totalBytes > 512*1024 { // 0.5 MiB |
| if err := datastore.Put(c, items); err != nil { |
| return err |
| } |
| totalBytes = 0 |
| items = items[:0] |
| } |
| } |
| |
| return datastore.Put(c, items) |
| } |
| |
// maxGitilesLogRPCsPerRequest caps the number of Gitiles log requests issued
// while serving a single user request, so that Gitiles quota is not exceeded.
const maxGitilesLogRPCsPerRequest = 50
| |
| func (impl *implementation) loadLogsForRefs(c context.Context, host, project, excludeRef string, limit int, refTips map[string]string) ([][]*gitpb.Commit, error) { |
| cachedLogs := loadCacheFromDS(c, host, project, excludeRef, limit, refTips) |
| logging.Infof(c, "Fetched %d logs from cache, will fetch remaining %d logs from Gitiles", len(cachedLogs), len(refTips)-len(cachedLogs)) |
| |
| // Load missing logs from Gitiles. |
| newLogs := make(map[string][]*gitpb.Commit) |
| lock := sync.Mutex{} // for concurrent writes to the map |
| err := parallel.WorkPool(8, func(ch chan<- func() error) { |
| numRequests := 0 |
| for ref := range refTips { |
| if _, ok := cachedLogs[ref]; ok { |
| continue |
| } |
| |
| if numRequests++; numRequests > maxGitilesLogRPCsPerRequest { |
| ch <- func() error { |
| // TODO(sergiyb,tandrii): if you have genuine need for this many refs |
| // at once, implement a cron job that runs this very function |
| // continuously to avoid bursts of gitiles traffic that will make Milo |
| // not functional for the other projects. |
| return errors.Reason("too many refs are new or changed to be "+ |
| "fetched at once, stopping after %d. Check your config and/or "+ |
| "reload the page", maxGitilesLogRPCsPerRequest).Err() |
| } |
| break |
| } |
| |
| ref := ref |
| ch <- func() error { |
| log, err := impl.log(c, host, project, refTips[ref], excludeRef, &LogOptions{Limit: limit}) |
| if err != nil { |
| return err |
| } |
| |
| lock.Lock() |
| defer lock.Unlock() |
| newLogs[ref] = log |
| return nil |
| } |
| } |
| }) |
| |
| // Try to cache what we've fetched even if some requests failed. |
| if derr := saveCacheToDS(c, host, project, excludeRef, limit, newLogs, refTips); derr != nil { |
| logging.WithError(derr).Warningf(c, "Failed to cache logs fetched from Gitiles") |
| } |
| |
| if err != nil { |
| return nil, errors.Annotate(err, "failed to fetch %d logs from Gitiles", len(refTips)-len(cachedLogs)-len(newLogs)).Err() |
| } |
| |
| // Drop ref names and create a list containing all logs. |
| logs := make([][]*gitpb.Commit, 0, len(cachedLogs)+len(newLogs)) |
| for _, log := range cachedLogs { |
| logs = append(logs, log) |
| } |
| for _, log := range newLogs { |
| logs = append(logs, log) |
| } |
| |
| return logs, nil |
| } |
| |
| // CombinedLogs implements Client interface. |
| func (impl *implementation) CombinedLogs(c context.Context, host, project, excludeRef string, refs []string, limit int) (commits []*gitpb.Commit, err error) { |
| defer func() { err = errors.Annotate(common.TagGRPC(c, err), "gitiles.CombinedLogs").Err() }() |
| |
| // Check if the user is allowed to access this project. |
| allowed, err := impl.acls.IsAllowed(c, host, project) |
| switch { |
| case err != nil: |
| return |
| case !allowed: |
| err = status.Errorf(codes.NotFound, "not found") |
| return |
| } |
| |
| // Prepare Gitiles client. |
| client, err := impl.gitilesClient(c, host) |
| if err != nil { |
| return |
| } |
| |
| // Resolve all refs and commits they are pointing at. |
| refTips, missingRefs, err := gitilesapi.NewRefSet(refs).Resolve(c, client, project) |
| if err != nil { |
| return |
| } |
| if len(missingRefs) > 0 { |
| logging.Warningf(c, "configured refs %s weren't resolved to any ref; either incorrect ACLs or redudant refs", missingRefs) |
| } |
| |
| var logs [][]*gitpb.Commit |
| if logs, err = impl.loadLogsForRefs(c, host, project, excludeRef, limit, refTips); err != nil { |
| return |
| } |
| |
| // We merge commits from all refs sorted by time into a single list up to a |
| // limit. We use max-heap based merging algorithm below. |
| var h commitHeap |
| for _, log := range logs { |
| if len(log) > 0 { |
| h = append(h, refCommits{log}) |
| } |
| } |
| |
| // Keep adding commits to the merged list until we reach the limit or run out |
| // of commits on all refs. |
| heap.Init(&h) |
| commits = make([]*gitpb.Commit, 0, limit) |
| for len(commits) < limit && len(h) != 0 { |
| commit, empty := h[0].pop() |
| // Do not add duplicate commits that come from different refs. |
| if len(commits) == 0 || commits[len(commits)-1].Id != commit.Id { |
| commits = append(commits, commit) |
| } |
| if empty { |
| heap.Remove(&h, 0) |
| } else { |
| heap.Fix(&h, 0) |
| } |
| } |
| |
| return |
| } |