#!/usr/bin/env python
#
# Copyright 2007 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Per job config for map jobs."""
from google.appengine.ext.mapreduce import hooks
from google.appengine.ext.mapreduce import input_readers
from google.appengine.ext.mapreduce import output_writers
from google.appengine.ext.mapreduce import parameters
from google.appengine.ext.mapreduce import util
from google.appengine.ext.mapreduce.api.map_job import input_reader
from google.appengine.ext.mapreduce.api.map_job import mapper as mapper_module

_Option = parameters._Option

_API_VERSION = 1


class JobConfig(parameters._Config):
"""Configurations for a map job.
Names started with _ are reserved for internal use.
To create an instance:
all option names can be used as keys to __init__.
If an option is required, the key must be provided.
If an option isn't required and no value is given, the default value
will be used.
"""
job_name = _Option(basestring, required=True)
job_id = _Option(basestring, default_factory=util._get_descending_key)
mapper = _Option(mapper_module.Mapper, required=True)
input_reader_cls = _Option(input_reader.InputReader, required=True)
input_reader_params = _Option(dict, default_factory=lambda: {})
output_writer_cls = _Option(output_writers.OutputWriter,
can_be_none=True)
output_writer_params = _Option(dict, default_factory=lambda: {})
shard_count = _Option(int,
default_factory=lambda: parameters.config.SHARD_COUNT)
user_params = _Option(dict, default_factory=lambda: {})
queue_name = _Option(
basestring, default_factory=lambda: parameters.config.QUEUE_NAME)
shard_max_attempts = _Option(
int, default_factory=lambda: parameters.config.SHARD_MAX_ATTEMPTS)
done_callback_url = _Option(basestring, can_be_none=True)
_force_writes = _Option(bool, default_factory=lambda: False)
_base_path = _Option(basestring,
default_factory=lambda: parameters.config.BASE_PATH)
_task_max_attempts = _Option(
int, default_factory=lambda: parameters.config.TASK_MAX_ATTEMPTS)
_task_max_data_processing_attempts = _Option(
int, default_factory=(
lambda: parameters.config.TASK_MAX_DATA_PROCESSING_ATTEMPTS))
_hooks_cls = _Option(hooks.Hooks, can_be_none=True)
_app = _Option(basestring, can_be_none=True)
  _api_version = _Option(int, default_factory=lambda: _API_VERSION)

  def _get_mapper_params(self):
"""Converts self to model.MapperSpec.params."""
reader_params = self.input_reader_cls.params_to_json(
self.input_reader_params)
return {"input_reader": reader_params,
"output_writer": self.output_writer_params}
def _get_mapper_spec(self):
"""Converts self to model.MapperSpec."""
from google.appengine.ext.mapreduce import model
return model.MapperSpec(
handler_spec=util._obj_to_path(self.mapper),
input_reader_spec=util._obj_to_path(self.input_reader_cls),
params=self._get_mapper_params(),
shard_count=self.shard_count,
        output_writer_spec=util._obj_to_path(self.output_writer_cls))

  def _get_mr_params(self):
"""Converts self to model.MapreduceSpec.params."""
return {"force_writes": self._force_writes,
"done_callback": self.done_callback_url,
"user_params": self.user_params,
"shard_max_attempts": self.shard_max_attempts,
"task_max_attempts": self._task_max_attempts,
"task_max_data_processing_attempts":
self._task_max_data_processing_attempts,
"queue_name": self.queue_name,
"base_path": self._base_path,
"app_id": self._app,
"api_version": self._api_version}
@classmethod
def _get_default_mr_params(cls):
"""Gets default values for old API."""
cfg = cls(_lenient=True)
mr_params = cfg._get_mr_params()
mr_params["api_version"] = 0
    return mr_params

  @classmethod
def _to_map_job_config(cls,
mr_spec,
queue_name):
"""Converts model.MapreduceSpec back to JobConfig.
This method allows our internal methods to use JobConfig directly.
This method also allows us to expose JobConfig as an API during execution,
despite that it is not saved into datastore.
Args:
mr_spec: model.MapreduceSpec.
queue_name: queue name.
Returns:
The JobConfig object for this job.
"""
mapper_spec = mr_spec.mapper
api_version = mr_spec.params.get("api_version", 0)
old_api = api_version == 0
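    # api_version 0 marks jobs started through the pre-map_job API; their
    # specs may lack newer fields, so the config is rebuilt leniently below.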
input_reader_cls = mapper_spec.input_reader_class()
input_reader_params = input_readers._get_params(mapper_spec)
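    # New-style (map_job) readers round-trip their params through
    # params_from_json/params_to_json; legacy readers take the raw dict.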
if issubclass(input_reader_cls, input_reader.InputReader):
input_reader_params = input_reader_cls.params_from_json(
input_reader_params)
output_writer_cls = mapper_spec.output_writer_class()
output_writer_params = output_writers._get_params(mapper_spec)
return cls(_lenient=old_api,
job_name=mr_spec.name,
job_id=mr_spec.mapreduce_id,
mapper=util.for_name(mapper_spec.handler_spec),
input_reader_cls=input_reader_cls,
input_reader_params=input_reader_params,
output_writer_cls=output_writer_cls,
output_writer_params=output_writer_params,
shard_count=mapper_spec.shard_count,
queue_name=queue_name,
user_params=mr_spec.params.get("user_params"),
shard_max_attempts=mr_spec.params.get("shard_max_attempts"),
done_callback_url=mr_spec.params.get("done_callback"),
_force_writes=mr_spec.params.get("force_writes"),
_base_path=mr_spec.params["base_path"],
_task_max_attempts=mr_spec.params.get("task_max_attempts"),
_task_max_data_processing_attempts=(
mr_spec.params.get("task_max_data_processing_attempts")),
_hooks_cls=util.for_name(mr_spec.hooks_class_name),
_app=mr_spec.params.get("app_id"),
_api_version=api_version)
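

# A minimal usage sketch (illustrative, not part of the original module).
# `MyMapper` and `SampleInputReader` are hypothetical user-defined classes,
# and `map_job.Job.submit` is assumed from this package's conventions rather
# than confirmed here. Kept commented out so importing this module stays
# side-effect free.
#
#   from google.appengine.ext.mapreduce.api import map_job
#
#   class MyMapper(map_job.Mapper):
#     def __call__(self, ctx, val):
#       pass  # process one value produced by the input reader
#
#   config = map_job.JobConfig(
#       job_name="demo_map_job",                     # required
#       mapper=MyMapper,                             # required: Mapper subclass
#       input_reader_cls=SampleInputReader,          # required: InputReader
#                                                    # subclass from this API
#       input_reader_params={"entity_kind": "Foo"},  # reader-specific params
#       shard_count=8)                               # optional; defaults to
#                                                    # parameters.config.SHARD_COUNT
#   map_job.Job.submit(config)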