| // Copyright 2020 The Chromium Authors |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| package main |
| |
| import ( |
| "context" |
| "fmt" |
| "os" |
| |
| "golang.org/x/time/rate" |
| |
| "go.chromium.org/luci/common/logging" |
| gitilesProto "go.chromium.org/luci/common/proto/gitiles" |
| |
| "go.chromium.org/infra/appengine/cr-rev/backend/gitiles" |
| "go.chromium.org/infra/appengine/cr-rev/backend/pubsub" |
| "go.chromium.org/infra/appengine/cr-rev/backend/repoimport" |
| "go.chromium.org/infra/appengine/cr-rev/common" |
| "go.chromium.org/infra/appengine/cr-rev/config" |
| ) |
| |
// rateLimit caps how many requests per second a gitiles client may send to a
// Gitiles server. The same value is also passed as the burst size when the
// limiter is constructed in setupImport.
const rateLimit = 3
| |
| func setupImport(ctx context.Context, cfg *config.Config) error { |
| hosts := cfg.GetHosts() |
| for _, host := range hosts { |
| logging.Infof(ctx, "Scanning host %s for new repositories", host.Name) |
| fullHost := fmt.Sprintf("%s.googlesource.com", host.Name) |
| c, err := gitiles.NewAuthClient(ctx, fullHost) |
| if err != nil { |
| return fmt.Errorf("failed to create gitiles auth client: %w", err) |
| } |
| c = gitiles.NewRetriableClient(gitiles.NewThrottlingClient(c, rate.NewLimiter(rateLimit, rateLimit))) |
| ctx := gitiles.SetClient(ctx, c) |
| |
| // Create import controller for each host. |
| importController := repoimport.NewController(repoimport.NewGitilesImporter) |
| go importController.Start(ctx) |
| |
| initialHostImport(ctx, importController, host) |
| |
| pubsubSubscription := host.GetPubsubSubscription() |
| if pubsubSubscription == "" { |
| logging.Warningf(ctx, "No pubsub subscription found for host: %s", host.GetName()) |
| continue |
| } |
| |
| pubsubClient, err := pubsub.NewClient(ctx, os.Getenv("GOOGLE_CLOUD_PROJECT"), pubsubSubscription) |
| if err != nil { |
| logging.Errorf(ctx, "Couldn't subscribe to host %s, pubsub: %s", host.GetName(), pubsubSubscription) |
| continue |
| } |
| go pubsub.Subscribe(ctx, pubsubClient, pubsub.Processor(host)) |
| } |
| return nil |
| } |
| |
| func initialHostImport(ctx context.Context, importController repoimport.Controller, host *config.Host) error { |
| c := gitiles.GetClient(ctx) |
| req := &gitilesProto.ProjectsRequest{} |
| resp, err := c.Projects(ctx, req, nil) |
| if err != nil { |
| return fmt.Errorf("failed to query gitiles for projects: %w", err) |
| } |
| |
| repoConfigs := map[string]*config.Repository{} |
| for _, repo := range host.GetRepos() { |
| repoConfigs[repo.GetName()] = repo |
| } |
| logging.Infof(ctx, "Found %d repositories in %s", len(resp.GetProjects()), host.Name) |
| for _, repo := range resp.GetProjects() { |
| repoConfig, ok := repoConfigs[repo] |
| |
| if ok { |
| if repoConfig.GetDoNotIndex() { |
| continue |
| } |
| } |
| |
| logging.Debugf(ctx, "scheduling scan for: %s/%s", host.Name, repo) |
| importController.Index(common.GitRepository{ |
| Host: host.Name, |
| Name: repo, |
| Config: repoConfig, |
| }) |
| } |
| return nil |
| } |