blob: 785d8bae204d68eccaffbaba7140fe7e49f43110 [file] [log] [blame]
#!/usr/bin/env python
#
# Copyright 2007 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Output writer interface for map job."""
from google.appengine.ext.mapreduce import errors
from google.appengine.ext.mapreduce import json_util
from google.appengine.ext.mapreduce import shard_life_cycle
COUNTER_IO_WRITE_BYTES = "io-write-bytes"
COUNTER_IO_WRITE_MSEC = "io-write-msec"
class OutputWriter(shard_life_cycle._ShardLifeCycle, json_util.JsonMixin):
  """Abstract base class for output writers.

  OutputWriter's lifecycle:
    0) validate() is called to validate JobConfig.
    1) create() is called, which should create a new instance of output
       writer for the given shard.
    2) begin_shard/end_shard/begin_slice/end_slice are called at the time
       implied by the names.
    3) from_json()/to_json() are used to persist writer's state across
       multiple slices.
    4) write() method is called with data yielded by JobConfig.mapper.
  """

  def __init__(self):
    # Reference to the current slice's context. Set by begin_slice and
    # cleared by end_slice; it is only valid while a slice is executing.
    self._slice_ctx = None

  @classmethod
  def validate(cls, job_config):
    """Validates relevant parameters.

    This method can validate fields which it deems relevant.

    Args:
      job_config: an instance of map_job.JobConfig.

    Raises:
      errors.BadWriterParamsError: required parameters are missing or invalid.
    """
    # A subclass is expected to call super's validate, so the configured
    # writer class must match the class being validated.
    if job_config.output_writer_cls != cls:
      raise errors.BadWriterParamsError(
          "Expect output writer class %r, got %r." %
          (cls, job_config.output_writer_cls))

  @classmethod
  def from_json(cls, state):
    """Creates an instance of the OutputWriter for the given json state.

    No RPC should take place in this method. Use begin_slice/end_slice
    instead.

    Args:
      state: The output writer state as returned by to_json.

    Returns:
      An instance of the OutputWriter that can resume writing.
    """
    raise NotImplementedError("from_json() not implemented in %s" % cls)

  def to_json(self):
    """Returns writer state.

    No RPC should take place in this method. Use begin_slice/end_slice
    instead.

    Returns:
      A json-serializable state for the OutputWriter instance.
    """
    raise NotImplementedError("to_json() not implemented in %s" %
                              type(self))

  @classmethod
  def create(cls, shard_ctx):
    """Create new writer for a shard.

    Args:
      shard_ctx: map_job_context.ShardContext for this shard.
    """
    raise NotImplementedError("create() not implemented in %s" % cls)

  def write(self, data):
    """Write data.

    Args:
      data: actual data yielded from handler. User is responsible to match the
        type expected by this writer to the type yielded by mapper.
    """
    raise NotImplementedError("write() not implemented in %s" %
                              type(self))

  @classmethod
  def commit_output(cls, shard_ctx, iterator):
    """Saves output references when a shard finishes.

    Inside end_shard(), an output writer can optionally use this method
    to persist some references to the outputs from this shard
    (e.g a list of filenames).

    Args:
      shard_ctx: map_job_context.ShardContext for this shard.
      iterator: an iterator that yields json serializable
        references to the outputs from this shard.
        Contents from the iterator can be accessible later via
        map_job.Job.get_outputs.
    """
    # Materialize the iterator so the references survive in the shard's
    # persisted writer state.
    outs = tuple(iterator)
    shard_ctx._state.writer_state["outs"] = outs

  def begin_slice(self, slice_ctx):
    """Keeps an internal reference to slice_ctx.

    Args:
      slice_ctx: SliceContext singleton instance for this slice.
    """
    self._slice_ctx = slice_ctx

  def end_slice(self, slice_ctx):
    """Drops the internal reference to slice_ctx.

    Args:
      slice_ctx: SliceContext singleton instance for this slice.
    """
    self._slice_ctx = None

  def _supports_slice_recovery(self, mapper_spec):
    """Whether this output writer supports slice recovery.

    Args:
      mapper_spec: instance of model.MapperSpec.

    Returns:
      True if it does. False otherwise.
    """
    return False

  def _recover(self, mr_spec, shard_number, shard_attempt):
    """Create a new output writer instance from the old one.

    This method is called when _supports_slice_recovery returns True,
    and when there is a chance the old output writer instance is out of sync
    with its storage medium due to a retry of a slice. _recover should
    create a new instance based on the old one. When end_shard is called
    on the new instance, it could combine valid outputs from all instances
    to generate the final output. How the new instance maintains references
    to previous outputs is up to implementation.

    Any exception during recovery is subject to normal slice/shard retry.
    So recovery logic must be idempotent.

    Args:
      mr_spec: an instance of model.MapreduceSpec describing current job.
      shard_number: int shard number.
      shard_attempt: int shard attempt.

    Returns:
      a new instance of output writer.
    """
    raise NotImplementedError()