blob: 8581d252ed6f71d07d826353b749925b60748405 [file] [log] [blame]
// Copyright 2018 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package admin
import (
"context"
"fmt"
"google.golang.org/protobuf/proto"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/gae/service/datastore"
"go.chromium.org/luci/server/dsmapper"
api "go.chromium.org/luci/cipd/api/admin/v1"
)
// A registry of mapping job configurations.
//
// Populated during init time. See other *.go files in this package.
var mappers = map[api.MapperKind]*mapperDef{}
// initMapper is called during init time to register some mapper kind.
func initMapper(d mapperDef) {
if _, ok := mappers[d.Kind]; ok {
panic(fmt.Sprintf("mapper with kind %s has already been initialized", d.Kind))
}
mappers[d.Kind] = &d
}
// mapperDef is some single flavor of mapping jobs.
//
// It contains parameters for the mapper (what entity to map over, number of
// shards, etc), and the actual mapping function.
type mapperDef struct {
Kind api.MapperKind // also used to derive dsmapper.ID
Func func(context.Context, dsmapper.JobID, *api.JobConfig, []*datastore.Key) error
Config dsmapper.JobConfig // note: Params will be overwritten
}
// mapperID returns an identifier for this mapper (to use cross-process).
func (m *mapperDef) mapperID() dsmapper.ID {
return dsmapper.ID(fmt.Sprintf("cipd:v1:%s", m.Kind))
}
// newMapper creates new instance of a mapping function.
func (m *mapperDef) newMapper(ctx context.Context, j *dsmapper.Job, shardIdx int) (dsmapper.Mapper, error) {
cfg := &api.JobConfig{}
if err := proto.Unmarshal(j.Config.Params, cfg); err != nil {
return nil, errors.Annotate(err, "failed to unmarshal JobConfig").Err()
}
return func(ctx context.Context, keys []*datastore.Key) error {
return m.Func(ctx, j.ID, cfg, keys)
}, nil
}