blob: 8581d252ed6f71d07d826353b749925b60748405 [file] [log] [blame]
// Copyright 2018 The LUCI Authors.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
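// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and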
// limitations under the License.
package admin
import (
api ""
// mappers is the registry of known mapping job definitions, keyed by kind.
//
// It is filled in through initMapper during init time (see other *.go files
// in this package).
var mappers = make(map[api.MapperKind]*mapperDef)
// initMapper is called during init time to register some mapper kind.
func initMapper(d mapperDef) {
if _, ok := mappers[d.Kind]; ok {
panic(fmt.Sprintf("mapper with kind %s has already been initialized", d.Kind))
mappers[d.Kind] = &d
// mapperDef is some single flavor of mapping jobs.
// It contains parameters for the mapper (what entity to map over, number of
// shards, etc), and the actual mapping function.
type mapperDef struct {
Kind api.MapperKind // also used to derive dsmapper.ID
Func func(context.Context, dsmapper.JobID, *api.JobConfig, []*datastore.Key) error
Config dsmapper.JobConfig // note: Params will be overwritten
// mapperID returns an identifier for this mapper (to use cross-process).
func (m *mapperDef) mapperID() dsmapper.ID {
return dsmapper.ID(fmt.Sprintf("cipd:v1:%s", m.Kind))
// newMapper creates new instance of a mapping function.
func (m *mapperDef) newMapper(ctx context.Context, j *dsmapper.Job, shardIdx int) (dsmapper.Mapper, error) {
cfg := &api.JobConfig{}
if err := proto.Unmarshal(j.Config.Params, cfg); err != nil {
return nil, errors.Annotate(err, "failed to unmarshal JobConfig").Err()
return func(ctx context.Context, keys []*datastore.Key) error {
return m.Func(ctx, j.ID, cfg, keys)
}, nil