#!/usr/bin/env python
#
# Copyright 2007 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Parameters to control Mapreduce."""
__all__ = ["CONFIG_NAMESPACE",
"config"]
import pickle

import google

try:
  from appengine_pipeline.src.pipeline import util as pipeline_util
except ImportError:
  pipeline_util = None

from google.appengine.api import lib_config

CONFIG_NAMESPACE = "mapreduce"

class _JobConfigMeta(type):
  """Metaclass that controls class creation."""

  _OPTIONS = "_options"
  _REQUIRED = "_required"

  def __new__(mcs, classname, bases, class_dict):
    """Creates a _Config class and modifies its class dict.

    Args:
      classname: name of the class.
      bases: a list of base classes.
      class_dict: original class dict.

    Returns:
      A new _Config class. The created class has two extra fields:
      _options, a dict mapping option names to _Option objects, and
      _required, a set of required option names.
    """
    options = {}
    required = set()
    for name, option in class_dict.iteritems():
      if isinstance(option, _Option):
        options[name] = option
        if option.required:
          required.add(name)

    # Remove the _Option attributes from the class dict; instances carry
    # the resolved values instead.
    for name in options:
      class_dict.pop(name)
    class_dict[mcs._OPTIONS] = options
    class_dict[mcs._REQUIRED] = required
    cls = type.__new__(mcs, classname, bases, class_dict)

    # Merge in options and required names declared by base classes.
    if object not in bases:
      parent_options = {}
      for c in reversed(cls.__mro__):
        if mcs._OPTIONS in c.__dict__:
          parent_options.update(c.__dict__[mcs._OPTIONS])
        if mcs._REQUIRED in c.__dict__:
          required.update(c.__dict__[mcs._REQUIRED])
      for k, v in parent_options.iteritems():
        if k not in options:
          options[k] = v
    return cls

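# Illustrative sketch (not part of the original module): given a hypothetical
# subclass declaring
#
#   class _SampleConfig(_Config):
#     job_name = _Option(basestring, required=True)
#     shard_count = _Option(int, default_factory=lambda: config.SHARD_COUNT)
#
# the metaclass pops both _Option attributes out of the class dict, leaving
# _SampleConfig._options == {"job_name": ..., "shard_count": ...} and
# _SampleConfig._required == set(["job_name"]).
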
class _Option(object):
  """An option for _Config."""

  def __init__(self, kind, required=False, default_factory=None,
               can_be_none=False):
    """Init.

    Args:
      kind: type of the option.
      required: whether the user is required to supply a value.
      default_factory: a factory that, when called, returns the default value.
      can_be_none: whether the value can be None.

    Raises:
      ValueError: if arguments aren't compatible.
    """
    if required and default_factory is not None:
      raise ValueError(
          "default_factory may not be set when the option is required.")
    self.kind = kind
    self.required = required
    self.default_factory = default_factory
    self.can_be_none = can_be_none

class _Config(object):
  """Root class for all per job configuration."""

  __metaclass__ = _JobConfigMeta

  def __init__(self, _lenient=False, **kwds):
    """Init.

    Args:
      _lenient: When True, no option is required.
      **kwds: keyword arguments for options and their values.
    """
    self._verify_keys(kwds, _lenient)
    self._set_values(kwds, _lenient)

  def _verify_keys(self, kwds, _lenient):
    """Checks only supported options were passed and required ones exist."""
    keys = set()
    for k in kwds:
      if k not in self._options:
        raise ValueError("Option %s is not supported." % k)
      keys.add(k)
    if not _lenient:
      missing = self._required - keys
      if missing:
        raise ValueError("Options %s are required." % tuple(missing))

  def _set_values(self, kwds, _lenient):
    """Sets option values, applying defaults and type checks."""
    for k, option in self._options.iteritems():
      v = kwds.get(k)
      if v is None and option.default_factory:
        v = option.default_factory()
      setattr(self, k, v)
      if _lenient:
        continue
      if v is None and option.can_be_none:
        continue
      # A class-valued option must be a subclass of option.kind; any other
      # value must be an instance of option.kind.
      if isinstance(v, type) and not issubclass(v, option.kind):
        raise TypeError(
            "Expected a subclass of %r for option %s. Got %r" % (
                option.kind, k, v))
      if not isinstance(v, type) and not isinstance(v, option.kind):
        raise TypeError("Expected type %r for option %s. Got %r" % (
            option.kind, k, v))

  def __eq__(self, other):
    if not isinstance(other, self.__class__):
      return False
    return other.__dict__ == self.__dict__

  def __repr__(self):
    return str(self.__dict__)

  def to_json(self):
    return {"config": pickle.dumps(self)}

  @classmethod
  def from_json(cls, json):
    return pickle.loads(json["config"])

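# Illustrative sketch (not part of the original module): instantiating the
# hypothetical _SampleConfig above applies defaults, type-checks each value,
# and round-trips through to_json()/from_json() via pickle:
#
#   conf = _SampleConfig(job_name="word_count")
#   assert conf.shard_count == config.SHARD_COUNT
#   assert _SampleConfig.from_json(conf.to_json()) == conf
#
#   _SampleConfig(job_name=123)    # TypeError: 123 is not a basestring.
#   _SampleConfig(shard_count=4)   # ValueError: job_name is required.
#   _SampleConfig(_lenient=True)   # OK: leniency waives required options.
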
class _ConfigDefaults(object):
  """Default configs.

  Do not change parameters whose names begin with _.

  SHARD_MAX_ATTEMPTS: Max attempts to execute a shard before giving up.

  TASK_MAX_ATTEMPTS: Max attempts to execute a task before dropping it. A
    task is any taskqueue task created by the MR framework. A task is
    dropped when its X-AppEngine-TaskExecutionCount is bigger than this
    number. Dropping a task aborts the entire MR job.

  TASK_MAX_DATA_PROCESSING_ATTEMPTS: Max attempts to execute a task whose
    previous attempts failed during the data processing stage. An MR work
    task has three major stages: initial setup, data processing, and final
    checkpoint. The setup stage should be allowed more retries than the
    data processing stage: setup failures are caused by unavailable GAE
    services, while data processing failures are mostly due to user
    functions erroring out on certain input data. Thus, set
    TASK_MAX_ATTEMPTS higher than this parameter.

  QUEUE_NAME: Default queue for MR.

  SHARD_COUNT: Default shard count.

  PROCESSING_RATE_PER_SEC: Default rate of processed entities per second.

  BASE_PATH: Base path of mapreduce and pipeline handlers.
  """

  SHARD_MAX_ATTEMPTS = 4
  TASK_MAX_ATTEMPTS = 31
  TASK_MAX_DATA_PROCESSING_ATTEMPTS = 11
  QUEUE_NAME = "default"
  SHARD_COUNT = 8
  PROCESSING_RATE_PER_SEC = 1000000
  BASE_PATH = "/_ah/mapreduce"

  # Internal parameters; do not change (see class docstring).
  _SLICE_DURATION_SEC = 15
  _CONTROLLER_PERIOD_SEC = 2

config = lib_config.register(CONFIG_NAMESPACE, _ConfigDefaults.__dict__)

_DEFAULT_PIPELINE_BASE_PATH = config.BASE_PATH + "/pipeline"
_GCS_URLFETCH_TIMEOUT_SEC = 30

# A lease lasts slightly longer than one slice so that an in-flight slice
# can finish before the lease expires.
_LEASE_DURATION_SEC = config._SLICE_DURATION_SEC * 1.1

# Cap on the lease duration; 10 * 60 + 30 is presumably the 10-minute
# taskqueue task deadline plus padding.
_MAX_LEASE_DURATION_SEC = max(10 * 60 + 30, config._SLICE_DURATION_SEC * 1.5)
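
# Illustrative note: because config is registered through lib_config under
# CONFIG_NAMESPACE, an application can override any of the defaults above in
# an appengine_config.py module at its root, using the "<namespace>_<NAME>"
# convention. The values below are made up:
#
#   # appengine_config.py
#   mapreduce_SHARD_COUNT = 16
#   mapreduce_QUEUE_NAME = "mapreduce-queue"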