blob: 347b9ad1e0e451f08bdd7cfc9137865caa6bc971 [file] [log] [blame]
// Copyright 2018 The LUCI Authors.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
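// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.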
// See the License for the specific language governing permissions and
// limitations under the License.
package admin
import (
api ""
// mappers is the registry of mapping job configurations, keyed by mapper kind.
//
// It is populated during init time; see the other *.go files in this package
// for the registrations.
var mappers = make(map[api.MapperKind]*mapperDef)
// initMapper is called during init time to register some mapper kind.
func initMapper(d mapperDef) {
if _, ok := mappers[d.Kind]; ok {
panic(fmt.Sprintf("mapper with kind %s has already been initialized", d.Kind))
mappers[d.Kind] = &d
// mapperDef is some single flavor of mapping jobs.
// It contains parameters for the mapper (what entity to map over, number of
// shards, etc), and the actual mapping function.
type mapperDef struct {
Kind api.MapperKind // also used to derive mapper.ID
Func func(context.Context, mapper.JobID, *api.JobConfig, []*datastore.Key) error
Config mapper.JobConfig // note: Params will be overwritten
// mapperID returns an identifier for this mapper (to use cross-process).
func (m *mapperDef) mapperID() mapper.ID {
return mapper.ID(fmt.Sprintf("cipd:v1:%s", m.Kind))
// newMapper creates new instance of a mapping function.
func (m *mapperDef) newMapper(ctx context.Context, j *mapper.Job, shardIdx int) (mapper.Mapper, error) {
cfg := &api.JobConfig{}
if err := proto.Unmarshal(j.Config.Params, cfg); err != nil {
return nil, errors.Annotate(err, "failed to unmarshal JobConfig").Err()
return func(ctx context.Context, keys []*datastore.Key) error {
return m.Func(ctx, j.ID, cfg, keys)
}, nil