blob: abb4cf9e5182835549500c64933c2235dc498ade [file] [log] [blame]
#!/usr/bin/env python
#
# Copyright 2007 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""User API for controlling Map job execution."""
from google.appengine.api import taskqueue
from google.appengine.datastore import datastore_rpc
from google.appengine.ext import db
from google.appengine.ext.mapreduce import model
from google.appengine.ext.mapreduce import util
from google.appengine.ext.mapreduce.api.map_job import map_job_config
class Job(object):
  """The job submitter's view of the job.

  The class allows user to submit a job, control a submitted job,
  query its state and result.
  """

  # Values returned by get_status(). RUNNING is defined here; the other
  # three reuse the result constants of model.MapreduceState.
  RUNNING = "running"
  FAILED = model.MapreduceState.RESULT_FAILED
  ABORTED = model.MapreduceState.RESULT_ABORTED
  SUCCESS = model.MapreduceState.RESULT_SUCCESS

  STATUS_ENUM = [RUNNING, FAILED, ABORTED, SUCCESS]

  def __init__(self, state=None):
    """Init the job instance from the saved state of a job.

    Do not directly call this method. Use class methods to construct
    new instances.

    Args:
      state: model.MapreduceState. Required despite the None default: its
        mapreduce_spec is read right away to rebuild the job config.
    """
    self._state = state
    self.job_config = map_job_config.JobConfig._to_map_job_config(
        state.mapreduce_spec,
        queue_name=state.mapreduce_spec.params.get("queue_name"))

  @classmethod
  def get_job_by_id(cls, job_id=None):
    """Gets the job instance representing the job with id job_id.

    Args:
      job_id: a job id, job_config.job_id, of a submitted job.

    Returns:
      A Job instance for job_id.

    Raises:
      ValueError: if no state is stored for job_id.
    """
    state = cls.__get_state_by_id(job_id)
    return cls(state)

  def get_status(self):
    """Get status enum.

    The state is refreshed first if the job was still active.

    Returns:
      RUNNING while the state is active, otherwise the result status
      recorded on the state.
    """
    self.__update_state()
    if self._state.active:
      return self.RUNNING
    return self._state.result_status

  def abort(self):
    """Aborts the job."""
    model.MapreduceControl.abort(self.job_config.job_id)

  def get_counters(self):
    """Get counters from this job.

    When a job is running, counter values won't be very accurate.

    Returns:
      An iterator that returns (counter_name, value) pairs of type
      (basestring, int)
    """
    self.__update_state()
    return self._state.counters_map.counters.iteritems()

  def get_outputs(self):
    """Get outputs of this job.

    Should only call if status is SUCCESS.

    Yields:
      Iterators, one for each shard. Each iterator is
      from the argument of map_job.output_writer.commit_output.

    Raises:
      AssertionError: if the job status is not SUCCESS. Since this is a
        generator, the check runs when iteration starts.
    """
    status = self.get_status()
    # Raise explicitly rather than with an assert statement, so the check
    # is not dropped when Python runs with -O.
    if self.SUCCESS != status:
      raise AssertionError(
          "Outputs are only available when status is %r; status is %r." %
          (self.SUCCESS, status))
    shard_states = model.ShardState.find_all_by_mapreduce_state(self._state)
    for shard_state in shard_states:
      # A shard without an "outs" entry yields an empty iterator.
      yield iter(shard_state.writer_state.get("outs", []))

  @classmethod
  def submit(cls, job_config, in_xg_transaction=False):
    """Submit the job to run.

    Args:
      job_config: an instance of map_job.JobConfig.
      in_xg_transaction: controls what transaction scope to use to start this MR
        job. If True, there has to be an already opened cross-group transaction
        scope. MR will use one entity group from it.
        If False, MR will create an independent transaction to start the job
        regardless of any existing transaction scopes.

    Returns:
      a Job instance representing the submitted job.
    """
    cls.__validate_job_config(job_config)
    mapper_spec = job_config._get_mapper_spec()
    mapreduce_params = job_config._get_mr_params()
    mapreduce_spec = model.MapreduceSpec(
        job_config.job_name,
        job_config.job_id,
        mapper_spec.to_json(),
        mapreduce_params,
        util._obj_to_path(job_config._hooks_cls))

    if in_xg_transaction:
      propagation = db.MANDATORY
    else:
      propagation = db.INDEPENDENT

    # Saving the state and enqueueing the kickoff task share one
    # transaction.
    @db.transactional(propagation=propagation)
    def _txn():
      state = cls.__create_and_save_state(job_config, mapreduce_spec)
      cls.__add_kickoff_task(job_config, mapreduce_spec)
      return state

    return cls(_txn())

  def __update_state(self):
    """Fetches most up to date state from db.

    A state that is no longer active is kept as is and not fetched again.
    """
    if self._state.active:
      self._state = self.__get_state_by_id(self.job_config.job_id)

  @classmethod
  def __get_state_by_id(cls, job_id):
    """Get job state by id.

    Args:
      job_id: job id.

    Returns:
      model.MapreduceState for the job.

    Raises:
      ValueError: if the job state is missing.
    """
    state = model.MapreduceState.get_by_job_id(job_id)
    if state is None:
      raise ValueError("Job state for job %s is missing." % job_id)
    return state

  @classmethod
  def __validate_job_config(cls, job_config):
    """Validate the job config with its input reader and output writer.

    The input reader class validates the job config itself. The output
    writer class, when one is set, validates the mapper spec derived from
    the job config.

    Args:
      job_config: map_job.JobConfig.
    """
    job_config.input_reader_cls.validate(job_config)
    if job_config.output_writer_cls:
      job_config.output_writer_cls.validate(job_config._get_mapper_spec())

  @classmethod
  def __create_and_save_state(cls, job_config, mapreduce_spec):
    """Save map job state to datastore.

    Save state to datastore so that UI can see it immediately.

    Args:
      job_config: map_job.JobConfig.
      mapreduce_spec: model.MapreduceSpec.

    Returns:
      model.MapreduceState for this job.
    """
    state = model.MapreduceState.create_new(job_config.job_id)
    state.mapreduce_spec = mapreduce_spec
    state.active = True
    state.active_shards = 0
    state.app_id = job_config._app
    config = datastore_rpc.Configuration(force_writes=job_config._force_writes)
    state.put(config=config)
    return state

  @classmethod
  def __add_kickoff_task(cls, job_config, mapreduce_spec):
    """Add kickoff task to taskqueue.

    If the job config has a hooks class, the hooks get the first chance to
    enqueue the task. The task is added directly, transactionally, when
    there are no hooks or the hooks raise NotImplementedError.

    Args:
      job_config: map_job.JobConfig.
      mapreduce_spec: model.MapreduceSpec.
    """
    params = {"mapreduce_id": job_config.job_id}
    kickoff_task = taskqueue.Task(
        url=job_config._base_path + "/kickoffjob_callback/" + job_config.job_id,
        headers=util._get_task_headers(job_config.job_id),
        params=params)
    if job_config._hooks_cls:
      hooks = job_config._hooks_cls(mapreduce_spec)
      try:
        hooks.enqueue_kickoff_task(kickoff_task, job_config.queue_name)
        return
      except NotImplementedError:
        # The hooks do not override enqueueing; fall through to the
        # default add below.
        pass
    kickoff_task.add(job_config.queue_name, transactional=True)