blob: 480e783799175a3dc6bc0f5256bfd9b8f7d15e21 [file] [log] [blame]
#!/usr/bin/env python
#
# Copyright 2007 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Status page handler for mapreduce framework."""
import os
import pkgutil
import time
from google.appengine.api import validation
from google.appengine.api import yaml_builder
from google.appengine.api import yaml_errors
from google.appengine.api import yaml_listener
from google.appengine.api import yaml_object
from google.appengine.ext import db
from google.appengine.ext import webapp
from google.appengine.ext.mapreduce import base_handler
from google.appengine.ext.mapreduce import errors
from google.appengine.ext.mapreduce import model
MR_YAML_NAMES = ["mapreduce.yaml", "mapreduce.yml"]
class BadStatusParameterError(Exception):
"""A parameter passed to a status handler was invalid."""
class UserParam(validation.Validated):
"""A user-supplied parameter to a mapreduce job."""
ATTRIBUTES = {
"name": r"[a-zA-Z0-9_\.]+",
"default": validation.Optional(r".*"),
"value": validation.Optional(r".*"),
}
class MapperInfo(validation.Validated):
"""Configuration parameters for the mapper part of the job."""
ATTRIBUTES = {
"handler": r".+",
"input_reader": r".+",
"output_writer": validation.Optional(r".+"),
"params": validation.Optional(validation.Repeated(UserParam)),
"params_validator": validation.Optional(r".+"),
}
class MapreduceInfo(validation.Validated):
"""Mapreduce description in mapreduce.yaml."""
ATTRIBUTES = {
"name": r".+",
"mapper": MapperInfo,
"params": validation.Optional(validation.Repeated(UserParam)),
"params_validator": validation.Optional(r".+"),
}
class MapReduceYaml(validation.Validated):
"""Root class for mapreduce.yaml.
File format:
mapreduce:
- name: <mapreduce_name>
mapper:
- input_reader: google.appengine.ext.mapreduce.DatastoreInputReader
- handler: path_to_my.MapperFunction
- params:
- name: foo
default: bar
- name: blah
default: stuff
- params_validator: path_to_my.ValidatorFunction
Where
mapreduce_name: The name of the mapreduce. Used for UI purposes.
mapper_handler_spec: Full <module_name>.<function_name/class_name> of
mapper handler. See MapreduceSpec class documentation for full handler
specification.
input_reader: Full <module_name>.<function_name/class_name> of the
InputReader sub-class to use for the mapper job.
params: A list of optional parameter names and optional default values
that may be supplied or overridden by the user running the job.
params_validator is full <module_name>.<function_name/class_name> of
a callable to validate the mapper_params after they are input by the
user running the job.
"""
ATTRIBUTES = {
"mapreduce": validation.Optional(validation.Repeated(MapreduceInfo))
}
@staticmethod
def to_dict(mapreduce_yaml):
"""Converts a MapReduceYaml file into a JSON-encodable dictionary.
For use in user-visible UI and internal methods for interfacing with
user code (like param validation). as a list
Args:
mapreduce_yaml: The Pyton representation of the mapreduce.yaml document.
Returns:
A list of configuration dictionaries.
"""
all_configs = []
for config in mapreduce_yaml.mapreduce:
out = {
"name": config.name,
"mapper_input_reader": config.mapper.input_reader,
"mapper_handler": config.mapper.handler,
}
if config.mapper.params_validator:
out["mapper_params_validator"] = config.mapper.params_validator
if config.mapper.params:
param_defaults = {}
for param in config.mapper.params:
param_defaults[param.name] = param.default or param.value
out["mapper_params"] = param_defaults
if config.params:
param_defaults = {}
for param in config.params:
param_defaults[param.name] = param.default or param.value
out["params"] = param_defaults
if config.mapper.output_writer:
out["mapper_output_writer"] = config.mapper.output_writer
all_configs.append(out)
return all_configs
def find_mapreduce_yaml(status_file=__file__):
"""Traverse directory trees to find mapreduce.yaml file.
Begins with the location of status.py and then moves on to check the working
directory.
Args:
status_file: location of status.py, overridable for testing purposes.
Returns:
the path of mapreduce.yaml file or None if not found.
"""
checked = set()
yaml = _find_mapreduce_yaml(os.path.dirname(status_file), checked)
if not yaml:
yaml = _find_mapreduce_yaml(os.getcwd(), checked)
return yaml
def _find_mapreduce_yaml(start, checked):
"""Traverse the directory tree identified by start until a directory already
in checked is encountered or the path of mapreduce.yaml is found.
Checked is present both to make loop termination easy to reason about and so
that the same directories do not get rechecked.
Args:
start: the path to start in and work upward from
checked: the set of already examined directories
Returns:
the path of mapreduce.yaml file or None if not found.
"""
dir = start
while dir not in checked:
checked.add(dir)
for mr_yaml_name in MR_YAML_NAMES:
yaml_path = os.path.join(dir, mr_yaml_name)
if os.path.exists(yaml_path):
return yaml_path
dir = os.path.dirname(dir)
return None
def parse_mapreduce_yaml(contents):
"""Parses mapreduce.yaml file contents.
Args:
contents: mapreduce.yaml file contents.
Returns:
MapReduceYaml object with all the data from original file.
Raises:
errors.BadYamlError: when contents is not a valid mapreduce.yaml file.
"""
try:
builder = yaml_object.ObjectBuilder(MapReduceYaml)
handler = yaml_builder.BuilderHandler(builder)
listener = yaml_listener.EventListener(handler)
listener.Parse(contents)
mr_info = handler.GetResults()
except (ValueError, yaml_errors.EventError), e:
raise errors.BadYamlError(e)
if len(mr_info) < 1:
raise errors.BadYamlError("No configs found in mapreduce.yaml")
if len(mr_info) > 1:
raise errors.MultipleDocumentsInMrYaml("Found %d YAML documents" %
len(mr_info))
jobs = mr_info[0]
job_names = set(j.name for j in jobs.mapreduce)
if len(jobs.mapreduce) != len(job_names):
raise errors.BadYamlError(
"Overlapping mapreduce names; names must be unique")
return jobs
def get_mapreduce_yaml(parse=parse_mapreduce_yaml):
"""Locates mapreduce.yaml, loads and parses its info.
Args:
parse: Used for testing.
Returns:
MapReduceYaml object.
Raises:
errors.BadYamlError: when contents is not a valid mapreduce.yaml file or the
file is missing.
"""
mr_yaml_path = find_mapreduce_yaml()
if not mr_yaml_path:
raise errors.MissingYamlError()
mr_yaml_file = open(mr_yaml_path)
try:
return parse(mr_yaml_file.read())
finally:
mr_yaml_file.close()
class ResourceHandler(webapp.RequestHandler):
"""Handler for static resources."""
_RESOURCE_MAP = {
"status": ("overview.html", "text/html"),
"detail": ("detail.html", "text/html"),
"base.css": ("base.css", "text/css"),
"jquery.js": ("jquery-1.6.1.min.js", "text/javascript"),
"jquery-json.js": ("jquery.json-2.2.min.js", "text/javascript"),
"jquery-url.js": ("jquery.url.js", "text/javascript"),
"status.js": ("status.js", "text/javascript"),
}
def get(self, relative):
if relative not in self._RESOURCE_MAP:
self.response.set_status(404)
self.response.out.write("Resource not found.")
return
real_path, content_type = self._RESOURCE_MAP[relative]
path = os.path.join(os.path.dirname(__file__), "static", real_path)
self.response.headers["Cache-Control"] = "public; max-age=300"
self.response.headers["Content-Type"] = content_type
try:
data = pkgutil.get_data(__name__, "static/" + real_path)
except AttributeError:
data = None
self.response.out.write(data or open(path).read())
class ListConfigsHandler(base_handler.GetJsonHandler):
"""Lists mapreduce configs as JSON for users to start jobs."""
def handle(self):
self.json_response["configs"] = MapReduceYaml.to_dict(get_mapreduce_yaml())
class ListJobsHandler(base_handler.GetJsonHandler):
"""Lists running and completed mapreduce jobs for an overview as JSON."""
def handle(self):
cursor = self.request.get("cursor")
count = int(self.request.get("count", "50"))
query = model.MapreduceState.all()
if cursor:
query.filter("__key__ >=", db.Key(cursor))
query.order("__key__")
jobs_list = query.fetch(count + 1)
if len(jobs_list) == (count + 1):
self.json_response["cursor"] = str(jobs_list[-1].key())
jobs_list = jobs_list[:-1]
all_jobs = []
for job in jobs_list:
out = {
"name": job.mapreduce_spec.name,
"mapreduce_id": job.mapreduce_spec.mapreduce_id,
"active": job.active,
"start_timestamp_ms":
int(time.mktime(job.start_time.utctimetuple()) * 1000),
"updated_timestamp_ms":
int(time.mktime(job.last_poll_time.utctimetuple()) * 1000),
"chart_url": job.sparkline_url,
"chart_width": job.chart_width,
"active_shards": job.active_shards,
"shards": job.mapreduce_spec.mapper.shard_count,
}
if job.result_status:
out["result_status"] = job.result_status
all_jobs.append(out)
self.json_response["jobs"] = all_jobs
class GetJobDetailHandler(base_handler.GetJsonHandler):
"""Retrieves the details of a mapreduce job as JSON."""
def handle(self):
mapreduce_id = self.request.get("mapreduce_id")
if not mapreduce_id:
raise BadStatusParameterError("'mapreduce_id' was invalid")
job = model.MapreduceState.get_by_key_name(mapreduce_id)
if job is None:
raise KeyError("Could not find job with ID %r" % mapreduce_id)
self.json_response.update(job.mapreduce_spec.to_json())
self.json_response.update(job.counters_map.to_json())
self.json_response.update({
"active": job.active,
"start_timestamp_ms":
int(time.mktime(job.start_time.utctimetuple()) * 1000),
"updated_timestamp_ms":
int(time.mktime(job.last_poll_time.utctimetuple()) * 1000),
"chart_url": job.chart_url,
"chart_width": job.chart_width,
})
self.json_response["result_status"] = job.result_status
all_shards = []
for shard in model.ShardState.find_all_by_mapreduce_state(job):
out = {
"active": shard.active,
"result_status": shard.result_status,
"shard_number": shard.shard_number,
"shard_id": shard.shard_id,
"updated_timestamp_ms":
int(time.mktime(shard.update_time.utctimetuple()) * 1000),
"shard_description": shard.shard_description,
"last_work_item": shard.last_work_item,
}
out.update(shard.counters_map.to_json())
all_shards.append(out)
all_shards.sort(key=lambda x: x["shard_number"])
self.json_response["shards"] = all_shards