| #!/usr/bin/env python |
| # |
| # Copyright 2007 Google Inc. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| """Output writers for MapReduce.""" |
| |
| from __future__ import with_statement |
| |
| |
# Public names of this module, i.e. what `from ... import *` exports.
__all__ = [
    "BlobstoreOutputWriter",
    "BlobstoreOutputWriterBase",
    "BlobstoreRecordsOutputWriter",
    "FileOutputWriter",
    "FileOutputWriterBase",
    "FileRecordsOutputWriter",
    "KeyValueBlobstoreOutputWriter",
    "KeyValueFileOutputWriter",
    "COUNTER_IO_WRITE_BYTES",
    "COUNTER_IO_WRITE_MSEC",
    "OutputWriter",
    "RecordsPool",
    ]
| |
| import gc |
| import itertools |
| import logging |
| import time |
| |
| from google.appengine.api import files |
| from google.appengine.api.files import file_service_pb |
| from google.appengine.api.files import records |
| from google.appengine.ext.mapreduce import errors |
| from google.appengine.ext.mapreduce import model |
| from google.appengine.ext.mapreduce import operation |
| |
| |
| |
# Counter name incremented with the number of bytes the write pools in this
# module send to the files API.
COUNTER_IO_WRITE_BYTES = "io-write-bytes"

# Counter name incremented with the wall-clock milliseconds spent writing.
COUNTER_IO_WRITE_MSEC = "io-write-msec"
| |
| |
class OutputWriter(model.JsonMixin):
  """Abstract base class for output writers.

  An output writer receives all mapper handler output that is not an
  operation and persists it.

  Lifecycle of an output writer:
    0) validate() checks the mapper specification.
    1) init_job() sets up any job-level state.
    2) create() builds a new writer instance for one shard.
    3) to_json()/from_json() persist the writer's state across slices.
    4) write() is called with each piece of output data.
    5) finalize() is called when a shard has finished processing.
    6) finalize_job() is called once the whole job has completed.

  Every method here raises NotImplementedError; subclasses override them.
  """

  @classmethod
  def validate(cls, mapper_spec):
    """Checks that a mapper specification can be used with this writer.

    Args:
      mapper_spec: model.MapperSpec instance to validate.

    Raises:
      NotImplementedError: always, in this abstract base.
    """
    message = "validate() not implemented in %s" % (cls,)
    raise NotImplementedError(message)

  @classmethod
  def init_job(cls, mapreduce_state):
    """Sets up job-level writer state.

    Args:
      mapreduce_state: model.MapreduceState describing the current job. It
        may be modified during initialization.

    Raises:
      NotImplementedError: always, in this abstract base.
    """
    message = "init_job() not implemented in %s" % (cls,)
    raise NotImplementedError(message)

  @classmethod
  def finalize_job(cls, mapreduce_state):
    """Tears down job-level writer state once the job is complete.

    Args:
      mapreduce_state: model.MapreduceState describing the current job. It
        may be modified during finalization.

    Raises:
      NotImplementedError: always, in this abstract base.
    """
    message = "finalize_job() not implemented in %s" % (cls,)
    raise NotImplementedError(message)

  @classmethod
  def from_json(cls, state):
    """Rebuilds a writer from state produced by to_json().

    Args:
      state: the writer state as a dict-like object.

    Returns:
      A writer instance configured from state.

    Raises:
      NotImplementedError: always, in this abstract base.
    """
    message = "from_json() not implemented in %s" % (cls,)
    raise NotImplementedError(message)

  def to_json(self):
    """Serializes writer state.

    Returns:
      A json-serializable representation of the writer state.

    Raises:
      NotImplementedError: always, in this abstract base.
    """
    message = "to_json() not implemented in %s" % (self.__class__,)
    raise NotImplementedError(message)

  @classmethod
  def create(cls, mapreduce_state, shard_number):
    """Creates the writer instance for one shard.

    Args:
      mapreduce_state: model.MapreduceState describing the current job. It
        may be modified.
      shard_number: shard number as integer.

    Raises:
      NotImplementedError: always, in this abstract base.
    """
    message = "create() not implemented in %s" % (cls,)
    raise NotImplementedError(message)

  def write(self, data, ctx):
    """Writes one piece of handler output.

    Args:
      data: a value yielded from the handler; its type is writer-specific.
      ctx: an instance of context.Context.

    Raises:
      NotImplementedError: always, in this abstract base.
    """
    message = "write() not implemented in %s" % (self.__class__,)
    raise NotImplementedError(message)

  def finalize(self, ctx, shard_number):
    """Tears down shard-level writer state.

    Args:
      ctx: an instance of context.Context.
      shard_number: shard number as integer.

    Raises:
      NotImplementedError: always, in this abstract base.
    """
    message = "finalize() not implemented in %s" % (self.__class__,)
    raise NotImplementedError(message)

  @classmethod
  def get_filenames(cls, mapreduce_state):
    """Reports which files the writer produced.

    Args:
      mapreduce_state: an instance of model.MapreduceState.

    Returns:
      A list of filenames this writer writes to, or None if the writer does
      not write to files.

    Raises:
      NotImplementedError: always, in this abstract base.
    """
    message = "get_filenames() not implemented in %s" % (cls,)
    raise NotImplementedError(message)
| |
| |
# Default buffered size in bytes; _FilePool and RecordsPool flush once their
# buffer grows past it.
_FILES_API_FLUSH_SIZE = 128*1024

# Largest chunk in bytes that non-exclusive writers send in one files API
# request. Larger writes are rejected to avoid the risk of writes
# interleaving.
_FILES_API_MAX_SIZE = 1000*1024
| |
| |
class _FilePool(object):
  """Pool of file append operations.

  Buffers data per filename and appends every file's buffer through the
  files API once the total buffered size exceeds the flush size.
  """

  def __init__(self, flush_size_chars=_FILES_API_FLUSH_SIZE, ctx=None):
    """Constructor.

    Args:
      flush_size_chars: buffer flush size in bytes as int. The internal buffer
        is flushed once its total size exceeds this value.
      ctx: mapreduce context as context.Context. Can be None; when set, I/O
        counters are incremented on flush.
    """
    self._flush_size = flush_size_chars
    self._append_buffer = {}
    self._size = 0
    self._ctx = ctx

  def __append(self, filename, data):
    """Append data to the filename's buffer without checks and flushes."""
    self._append_buffer[filename] = (
        self._append_buffer.get(filename, "") + data)
    self._size += len(data)

  def append(self, filename, data):
    """Append data to a file.

    Args:
      filename: the name of the file as string.
      data: data as string.

    Raises:
      errors.Error: if data is larger than _FILES_API_MAX_SIZE, the most that
        is sent in a single append request.
    """
    # Flush first if this chunk would push the buffer past the flush size, so
    # it starts a fresh batch.
    if self._size + len(data) > self._flush_size:
      self.flush()

    if len(data) > _FILES_API_MAX_SIZE:
      # Report the limit that is actually enforced.
      raise errors.Error(
          "Can't write more than %s bytes in one request: "
          "risk of writes interleaving." % _FILES_API_MAX_SIZE)
    self.__append(filename, data)

    # A single chunk larger than the flush size is written out right away.
    if self._size > self._flush_size:
      self.flush()

  def flush(self):
    """Flush pool contents.

    Opens each buffered file once in append mode, writes its buffered data,
    and then clears the buffer.

    Raises:
      errors.Error: if any file's buffered data exceeds _FILES_API_MAX_SIZE.
        The check runs before any file is opened, so nothing is written in
        that case.
    """
    # Compare against the API limit, not the flush size: append() accepts
    # chunks up to _FILES_API_MAX_SIZE, so a single chunk between the two
    # sizes must still be writable.
    for data in self._append_buffer.itervalues():
      if len(data) > _FILES_API_MAX_SIZE:
        raise errors.Error("Bad data of length: %s" % len(data))

    start_time = time.time()
    for filename, data in self._append_buffer.iteritems():
      with files.open(filename, "a") as f:
        if self._ctx:
          operation.counters.Increment(
              COUNTER_IO_WRITE_BYTES, len(data))(self._ctx)
        f.write(data)
    if self._ctx:
      operation.counters.Increment(
          COUNTER_IO_WRITE_MSEC,
          int((time.time() - start_time) * 1000))(self._ctx)
    self._append_buffer = {}
    self._size = 0
| |
| |
class _StringWriter(object):
  """Simple writer for records api that writes to an in-memory string buffer."""

  def __init__(self):
    # Chunks are collected in a list and joined on demand. Repeated `+=` on
    # an attribute copies the whole buffer on every write, which is quadratic
    # when a RecordsWriter emits many small chunks.
    self._buffer = []

  def to_string(self):
    """Convert writer buffer to string.

    Returns:
      The concatenation of everything written so far ("" if nothing was
      written).
    """
    return "".join(self._buffer)

  def write(self, data):
    """Write data.

    Args:
      data: data to append to the buffer as string.
    """
    self._buffer.append(data)
| |
| |
class RecordsPool(object):
  """Pool of append operations for a single records file.

  Raw records are buffered in memory. flush() encodes them with
  records.RecordsWriter and appends the encoded bytes to the file. The pool
  is also a context manager that flushes on exit.
  """

  # Presumably an estimate of per-record framing overhead in bytes; it is not
  # used by this class's methods.
  _RECORD_OVERHEAD_BYTES = 10

  def __init__(self, filename,
               flush_size_chars=_FILES_API_FLUSH_SIZE,
               ctx=None,
               exclusive=False):
    """Constructor.

    Args:
      filename: name of the file records are appended to.
      flush_size_chars: number of buffered raw record bytes above which the
        pool flushes.
      ctx: mapreduce context as context.Context, used for I/O counters. May be
        None.
      exclusive: whether the pool has exclusive access to the file. When True
        the file is opened with an exclusive lock and the
        _FILES_API_MAX_SIZE checks are skipped, so bigger chunks can be
        written.
    """
    self._filename = filename
    self._flush_size = flush_size_chars
    self._ctx = ctx
    self._exclusive = exclusive
    self._buffer = []
    self._size = 0

  def append(self, data):
    """Buffers one record, flushing before and/or after as needed.

    Args:
      data: the raw record as string.

    Raises:
      errors.Error: if the pool is not exclusive and the record is larger
        than _FILES_API_MAX_SIZE.
    """
    record_size = len(data)
    if self._size + record_size > self._flush_size:
      self.flush()

    if self._exclusive or record_size <= _FILES_API_MAX_SIZE:
      self._buffer.append(data)
      self._size += record_size
    else:
      raise errors.Error(
          "Too big input %s (%s)." % (record_size, _FILES_API_MAX_SIZE))

    if self._size > self._flush_size:
      self.flush()

  def flush(self):
    """Encodes the buffered records and appends them to the file.

    Afterwards the buffer is reset and a garbage collection pass is run.

    Raises:
      errors.Error: if the pool is not exclusive and the encoded buffer is
        larger than _FILES_API_MAX_SIZE.
    """
    sink = _StringWriter()
    with records.RecordsWriter(sink) as writer:
      for record in self._buffer:
        writer.write(record)
    encoded = sink.to_string()
    encoded_length = len(encoded)

    if encoded_length > _FILES_API_MAX_SIZE and not self._exclusive:
      raise errors.Error(
          "Buffer too big. Can't write more than %s bytes in one request: "
          "risk of writes interleaving. Got: %s" %
          (_FILES_API_MAX_SIZE, encoded_length))

    write_started = time.time()
    with files.open(self._filename, "a",
                    exclusive_lock=self._exclusive) as output:
      output.write(encoded)
      if self._ctx:
        operation.counters.Increment(
            COUNTER_IO_WRITE_BYTES, encoded_length)(self._ctx)
    if self._ctx:
      elapsed_msec = int((time.time() - write_started) * 1000)
      operation.counters.Increment(
          COUNTER_IO_WRITE_MSEC, elapsed_msec)(self._ctx)

    self._buffer = []
    self._size = 0
    gc.collect()

  def __enter__(self):
    """Returns the pool itself."""
    return self

  def __exit__(self, atype, value, traceback):
    """Flushes any buffered records on leaving the with-block."""
    self.flush()
| |
| |
def _get_output_sharding(mapreduce_state=None, mapper_spec=None):
  """Reads the lower-cased output sharding parameter.

  mapper_spec takes precedence. When it is not given, the mapper spec of
  mapreduce_state is used instead.

  Args:
    mapreduce_state: mapreduce state as model.MapreduceState.
    mapper_spec: mapper specification as model.MapperSpec.

  Returns:
    The lower-cased value of FileOutputWriterBase.OUTPUT_SHARDING_PARAM,
    defaulting to FileOutputWriterBase.OUTPUT_SHARDING_NONE.

  Raises:
    errors.Error: if no mapper spec can be obtained from the arguments.
  """
  if not mapper_spec and mapreduce_state:
    mapper_spec = mapreduce_state.mapreduce_spec.mapper
  if not mapper_spec:
    raise errors.Error("Neither mapreduce_state nor mapper_spec specified.")
  sharding = mapper_spec.params.get(
      FileOutputWriterBase.OUTPUT_SHARDING_PARAM,
      FileOutputWriterBase.OUTPUT_SHARDING_NONE)
  return sharding.lower()
| |
| |
class FileOutputWriterBase(OutputWriter):
  """Base class for all file output writers.

  Output files are created once per job in init_job(): either one shared
  file or one file per input shard, depending on the output_sharding
  parameter. Per-shard writer instances write to them. The files are
  finalized per shard in finalize() when sharded by input, and otherwise
  once per job in finalize_job().
  """

  # Mapper parameter controlling how output is split across files.
  OUTPUT_SHARDING_PARAM = "output_sharding"

  # output_sharding value: all shards write into a single file.
  OUTPUT_SHARDING_NONE = "none"

  # output_sharding value: each input shard writes into its own file.
  OUTPUT_SHARDING_INPUT_SHARDS = "input"

  # Mapper parameter naming the files API filesystem to write to.
  OUTPUT_FILESYSTEM_PARAM = "filesystem"

  # Mapper parameter naming the Google Storage bucket. It is only allowed
  # (and required) for the Google Storage filesystem.
  GS_BUCKET_NAME_PARAM = "gs_bucket_name"

  class _State(object):
    """Writer state. Stored in MapreduceState.

    State lists all files which were created for the job.
    """

    def __init__(self, filenames, request_filenames):
      """State initializer.

      Args:
        filenames: writable or finalized filenames as returned by the files api.
        request_filenames: filenames as given to the files create api.
      """
      self.filenames = filenames
      self.request_filenames = request_filenames

    def to_json(self):
      """Returns the state as a json-serializable dict."""
      return {
          "filenames": self.filenames,
          "request_filenames": self.request_filenames
      }

    @classmethod
    def from_json(cls, json):
      """Rebuilds a state from a dict produced by to_json()."""
      return cls(json["filenames"], json["request_filenames"])

  def __init__(self, filename):
    """Constructor.

    Args:
      filename: name of the file this shard's writer writes to.
    """
    self._filename = filename

  @classmethod
  def validate(cls, mapper_spec):
    """Validates mapper specification.

    Args:
      mapper_spec: an instance of model.MapperSpec to validate.

    Raises:
      errors.BadWriterParamsError: on a writer class mismatch, an unknown
        output_sharding value, an unsupported filesystem, or a missing or
        misplaced Google Storage bucket parameter.
    """
    if mapper_spec.output_writer_class() != cls:
      raise errors.BadWriterParamsError("Output writer class mismatch")

    output_sharding = _get_output_sharding(mapper_spec=mapper_spec)
    if output_sharding not in (cls.OUTPUT_SHARDING_NONE,
                               cls.OUTPUT_SHARDING_INPUT_SHARDS):
      raise errors.BadWriterParamsError(
          "Invalid output_sharding value: %s" % output_sharding)

    filesystem = cls._get_filesystem(mapper_spec)
    if filesystem not in files.FILESYSTEMS:
      raise errors.BadWriterParamsError(
          "Filesystem '%s' is not supported. Should be one of %s" %
          (filesystem, files.FILESYSTEMS))
    # Name the actual parameter key in the messages so users know what to set.
    if filesystem == files.GS_FILESYSTEM:
      if cls.GS_BUCKET_NAME_PARAM not in mapper_spec.params:
        raise errors.BadWriterParamsError(
            "%s is required for Google store filesystem" %
            cls.GS_BUCKET_NAME_PARAM)
    elif mapper_spec.params.get(cls.GS_BUCKET_NAME_PARAM) is not None:
      raise errors.BadWriterParamsError(
          "%s can only be provided for Google store filesystem" %
          cls.GS_BUCKET_NAME_PARAM)

  @classmethod
  def init_job(cls, mapreduce_state):
    """Creates the job's output files and records them in writer_state.

    Args:
      mapreduce_state: an instance of model.MapreduceState describing current
        job. Its writer_state is replaced with a serialized _State.
    """
    output_sharding = _get_output_sharding(mapreduce_state=mapreduce_state)
    mapreduce_spec = mapreduce_state.mapreduce_spec
    mapper_spec = mapreduce_spec.mapper
    mime_type = mapper_spec.params.get("mime_type", "application/octet-stream")
    filesystem = cls._get_filesystem(mapper_spec=mapper_spec)
    bucket = mapper_spec.params.get(cls.GS_BUCKET_NAME_PARAM)

    if output_sharding == cls.OUTPUT_SHARDING_INPUT_SHARDS:
      number_of_files = mapper_spec.shard_count
    else:
      number_of_files = 1

    # Every file shares this base name; only the index suffix varies.
    base_filename = (mapreduce_spec.name + "-" +
                     mapreduce_spec.mapreduce_id + "-output")

    filenames = []
    request_filenames = []
    for i in range(number_of_files):
      filename = base_filename
      if number_of_files > 1:
        filename += "-" + str(i)
      if bucket is not None:
        filename = "%s/%s" % (bucket, filename)
      request_filenames.append(filename)
      filenames.append(cls._create_file(filesystem, filename, mime_type))

    mapreduce_state.writer_state = cls._State(
        filenames, request_filenames).to_json()

  @classmethod
  def _get_filesystem(cls, mapper_spec):
    """Returns the lower-cased filesystem parameter ("" if absent)."""
    return mapper_spec.params.get(cls.OUTPUT_FILESYSTEM_PARAM, "").lower()

  @classmethod
  def _create_file(cls, filesystem, filename, mime_type, **kwargs):
    """Creates a file and returns its created filename.

    Raises:
      errors.BadWriterParamsError: if the filesystem is not supported.
    """
    if filesystem == files.BLOBSTORE_FILESYSTEM:
      return files.blobstore.create(mime_type, filename)
    elif filesystem == files.GS_FILESYSTEM:
      return files.gs.create("/gs/%s" % filename, mime_type, **kwargs)
    else:
      raise errors.BadWriterParamsError(
          "Filesystem '%s' is not supported" % filesystem)

  @classmethod
  def _get_finalized_filename(cls, fs, create_filename, request_filename):
    """Returns the finalized filename for the created filename.

    Raises:
      errors.BadWriterParamsError: if the filesystem is not supported.
    """
    # Use the same files API constants as _create_file().
    if fs == files.BLOBSTORE_FILESYSTEM:
      return files.blobstore.get_file_name(
          files.blobstore.get_blob_key(create_filename))
    elif fs == files.GS_FILESYSTEM:
      return "/gs/" + request_filename
    else:
      raise errors.BadWriterParamsError(
          "Filesystem '%s' is not supported" % fs)

  @classmethod
  def finalize_job(cls, mapreduce_state):
    """Finalizes shared output files and records their finalized names.

    Args:
      mapreduce_state: an instance of model.MapreduceState describing current
        job. Its writer_state is rewritten with the finalized filenames.
    """
    state = cls._State.from_json(mapreduce_state.writer_state)
    output_sharding = _get_output_sharding(mapreduce_state=mapreduce_state)
    filesystem = cls._get_filesystem(mapreduce_state.mapreduce_spec.mapper)
    finalized_filenames = []
    for create_filename, request_filename in itertools.izip(
        state.filenames, state.request_filenames):
      # Per-input-shard files were already finalized by each shard's
      # finalize() call.
      if output_sharding != cls.OUTPUT_SHARDING_INPUT_SHARDS:
        files.finalize(create_filename)
      finalized_filenames.append(cls._get_finalized_filename(filesystem,
                                                             create_filename,
                                                             request_filename))

    state.filenames = finalized_filenames
    state.request_filenames = []
    mapreduce_state.writer_state = state.to_json()

  @classmethod
  def from_json(cls, state):
    """Creates an instance of the OutputWriter for the given json state.

    Args:
      state: The OutputWriter state as a json object (dict like).

    Returns:
      An instance of the OutputWriter configured using the values of json.
    """
    return cls(state["filename"])

  def to_json(self):
    """Returns writer state to serialize in json.

    Returns:
      A json-izable version of the OutputWriter state.
    """
    return {"filename": self._filename}

  @classmethod
  def create(cls, mapreduce_state, shard_number):
    """Create new writer for a shard.

    Args:
      mapreduce_state: an instance of model.MapreduceState describing current
        job.
      shard_number: shard number as integer.

    Returns:
      A writer for the shard's own file when sharding by input, otherwise for
      the single shared file.
    """
    file_index = 0
    output_sharding = _get_output_sharding(mapreduce_state=mapreduce_state)
    if output_sharding == cls.OUTPUT_SHARDING_INPUT_SHARDS:
      file_index = shard_number

    state = cls._State.from_json(mapreduce_state.writer_state)
    return cls(state.filenames[file_index])

  def finalize(self, ctx, shard_number):
    """Finalize writer shard-level state.

    Only per-input-shard files are finalized here. A shared file is finalized
    once by finalize_job().

    Args:
      ctx: an instance of context.Context.
      shard_number: shard number as integer.
    """
    mapreduce_spec = ctx.mapreduce_spec
    output_sharding = _get_output_sharding(mapper_spec=mapreduce_spec.mapper)
    if output_sharding == self.OUTPUT_SHARDING_INPUT_SHARDS:
      files.finalize(self._filename)

  @classmethod
  def get_filenames(cls, mapreduce_state):
    """Obtain output filenames from mapreduce state.

    Args:
      mapreduce_state: an instance of model.MapreduceState

    Returns:
      list of filenames this writer writes to (finalized names once
      finalize_job() has run).
    """
    state = cls._State.from_json(mapreduce_state.writer_state)
    return state.filenames
| |
| |
class FileOutputWriter(FileOutputWriterBase):
  """Output writer that appends the str() of every yielded value to a file.

  Writes are batched through a _FilePool registered on the context under the
  "file_pool" key.
  """

  def write(self, data, ctx):
    """Queues str(data) for appending to this shard's file.

    Args:
      data: a value yielded from the handler; it is converted with str().
      ctx: an instance of context.Context.
    """
    pool_key = "file_pool"
    if ctx.get_pool(pool_key) is None:
      ctx.register_pool(pool_key, _FilePool(ctx=ctx))
    pool = ctx.get_pool(pool_key)
    pool.append(self._filename, str(data))
| |
| |
class FileRecordsOutputWriter(FileOutputWriterBase):
  """File output writer that emits data in leveldb log (records) format.

  Output sharding is always per input shard and cannot be chosen by the
  caller.
  """

  @classmethod
  def validate(cls, mapper_spec):
    """Validates mapper specification and forces per-input-shard output.

    Args:
      mapper_spec: an instance of model.MapperSpec to validate. Its params
        are updated with the output sharding setting.

    Raises:
      errors.BadWriterParamsError: if output_sharding was supplied, or if the
        base class validation fails.
    """
    params = mapper_spec.params
    if cls.OUTPUT_SHARDING_PARAM in params:
      raise errors.BadWriterParamsError(
          "output_sharding should not be specified for %s" % cls.__name__)
    # One records file per input shard.
    params[cls.OUTPUT_SHARDING_PARAM] = cls.OUTPUT_SHARDING_INPUT_SHARDS
    super(FileRecordsOutputWriter, cls).validate(mapper_spec)

  def write(self, data, ctx):
    """Queues str(data) as one record in this shard's file.

    Args:
      data: a value yielded from the handler; it is converted with str().
      ctx: an instance of context.Context.
    """
    pool_key = "records_pool"
    if ctx.get_pool(pool_key) is None:
      # Each shard owns its own file (per-input-shard output), so the pool can
      # be exclusive and write bigger chunks.
      ctx.register_pool(pool_key,
                        RecordsPool(self._filename, ctx=ctx, exclusive=True))
    pool = ctx.get_pool(pool_key)
    pool.append(str(data))
| |
| |
class KeyValueFileOutputWriter(FileRecordsOutputWriter):
  """A file output writer for KeyValue records.

  Each yielded (key, value) pair is encoded as a file_service_pb.KeyValue
  proto and written as one record through FileRecordsOutputWriter.
  """

  def write(self, data, ctx):
    """Writes one (key, value) pair as a KeyValue record.

    Args:
      data: the pair yielded from the handler. Both elements are converted
        with str(). A sequence whose length is not 2 is logged as an error,
        and only its first two elements are used.
      ctx: an instance of context.Context.

    Raises:
      TypeError: if data is not a sized, indexable sequence.
      IndexError: if data has fewer than two elements.
    """
    if len(data) != 2:
      logging.error("Got bad tuple of length %d (2-tuple expected): %s",
                    len(data), data)

    try:
      key = str(data[0])
      value = str(data[1])
    except TypeError:
      logging.error("Expecting a tuple, but got %s: %s",
                    data.__class__.__name__, data)
      # key/value are unbound here. Falling through would fail below with a
      # misleading UnboundLocalError, so surface the real error instead.
      raise

    proto = file_service_pb.KeyValue()
    proto.set_key(key)
    proto.set_value(value)
    FileRecordsOutputWriter.write(self, proto.Encode(), ctx)
| |
| |
class BlobstoreOutputWriterBase(FileOutputWriterBase):
  """Base for file output writers that always write into blobstore.

  Used as the second base of the blobstore writer classes so that its
  _get_filesystem() overrides the mapper's filesystem parameter.
  """

  @classmethod
  def _get_filesystem(cls, mapper_spec):
    """Returns "blobstore" whatever the mapper parameters say.

    Args:
      mapper_spec: model.MapperSpec; ignored.
    """
    del mapper_spec  # Unused: the filesystem is fixed.
    return "blobstore"
| |
| |
class BlobstoreOutputWriter(FileOutputWriter, BlobstoreOutputWriterBase):
  """Writes the str() of each output value into a blobstore file.

  Combines FileOutputWriter.write() with the fixed blobstore filesystem from
  BlobstoreOutputWriterBase.
  """
| |
| |
class BlobstoreRecordsOutputWriter(FileRecordsOutputWriter,
                                   BlobstoreOutputWriterBase):
  """Writes output values as records (leveldb log format) into blobstore.

  Combines FileRecordsOutputWriter with the fixed blobstore filesystem from
  BlobstoreOutputWriterBase.
  """
| |
| |
class KeyValueBlobstoreOutputWriter(KeyValueFileOutputWriter,
                                    BlobstoreOutputWriterBase):
  """Writes (key, value) pairs as KeyValue records into blobstore files.

  Combines KeyValueFileOutputWriter with the fixed blobstore filesystem from
  BlobstoreOutputWriterBase.
  """