| #!/usr/bin/env python |
| # |
| # Copyright 2007 Google Inc. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| """Mapreduce shuffler implementation.""" |
| |
| from __future__ import with_statement |
| |
| |
| |
| __all__ = [ |
| "ShufflePipeline", |
| ] |
| |
| import gc |
| import heapq |
| import logging |
| import os |
| import time |
| |
| from appengine_pipeline.src import pipeline |
| from appengine_pipeline.src.pipeline import common as pipeline_common |
| from google.appengine.api import files |
| from google.appengine.api import modules |
| from google.appengine.api.files import file_service_pb |
| from google.appengine.ext import db |
| from google.appengine.ext.mapreduce import context |
| from google.appengine.ext.mapreduce import errors |
| from google.appengine.ext.mapreduce import input_readers |
| from google.appengine.ext.mapreduce import mapper_pipeline |
| from google.appengine.ext.mapreduce import operation |
| from google.appengine.ext.mapreduce import output_writers |
| from google.appengine.ext.mapreduce import pipeline_base |
| from google.appengine.ext.mapreduce import records |
| |
| |
| |
| |
| |
| class _OutputFile(db.Model): |
| """Entity to store output filenames of pipelines. |
| |
| These entities are always children of key returned by get_root_key(). |
| """ |
| |
| @classmethod |
| def kind(cls): |
| """Returns entity kind.""" |
| return "_GAE_MR_OutputFile" |
| |
| @classmethod |
| def get_root_key(cls, job_id): |
| """Get root key to store output files. |
| |
| Args: |
| job_id: pipeline's job id. |
| |
| Returns: |
| root key for a given job id to store output file entities. |
| """ |
| return db.Key.from_path(cls.kind(), job_id) |
| |
| |
| def _compare_keys(key_record1, key_record2): |
| """Compare two (key, records) protos by key.""" |
| return cmp(key_record1[0], key_record2[0]) |
| |
| |
| class _BatchRecordsReader(input_readers.RecordsReader): |
| """Records reader that reads in big batches.""" |
| |
| BATCH_SIZE = 1024*1024 * 3 |
| |
| def __iter__(self): |
| records = [] |
| size = 0 |
| for record in input_readers.RecordsReader.__iter__(self): |
| records.append(record) |
| size += len(record) |
| if size > self.BATCH_SIZE: |
| yield records |
| size = 0 |
| records = [] |
| gc.collect() |
| if records: |
| yield records |
| records = [] |
| gc.collect() |
| |
| |
| def _sort_records_map(records): |
| """Map function sorting records. |
| |
| Converts records to KeyValue protos, sorts them by key and writes them |
| into new blobstore file. Creates _OutputFile entity to record resulting |
| file name. |
| |
| Args: |
| records: list of records which are serialized KeyValue protos. |
| """ |
| ctx = context.get() |
| l = len(records) |
| key_records = [None] * l |
| |
| logging.debug("Parsing") |
| for i in range(l): |
| proto = file_service_pb.KeyValue() |
| proto.ParseFromString(records[i]) |
| key_records[i] = (proto.key(), records[i]) |
| |
| logging.debug("Sorting") |
| key_records.sort(cmp=_compare_keys) |
| |
| logging.debug("Writing") |
| blob_file_name = (ctx.mapreduce_spec.name + "-" + |
| ctx.mapreduce_id + "-output") |
| output_path = files.blobstore.create( |
| _blobinfo_uploaded_filename=blob_file_name) |
| with output_writers.RecordsPool(output_path, ctx=ctx) as pool: |
| for key_record in key_records: |
| pool.append(key_record[1]) |
| |
| logging.debug("Finalizing") |
| files.finalize(output_path) |
| output_path = files.blobstore.get_file_name( |
| files.blobstore.get_blob_key(output_path)) |
| |
| entity = _OutputFile(key_name=output_path, |
| parent=_OutputFile.get_root_key(ctx.mapreduce_id)) |
| entity.put() |
| |
| |
| class _SortChunksPipeline(pipeline_base.PipelineBase): |
| """A pipeline to sort multiple key-value files. |
| |
| Args: |
| job_name: root job name. |
| filenames: list of filenames to sort. |
| |
| Returns: |
| The list of lists of sorted filenames. Each list corresponds to one |
| input file. Each filenames contains a chunk of sorted data. |
| """ |
| def run(self, job_name, filenames): |
| sort_mappers = [] |
| for i in range(len(filenames)): |
| filename = filenames[i] |
| sort_mapper = yield mapper_pipeline.MapperPipeline( |
| "%s-shuffle-sort-%s" % (job_name, str(i)), |
| __name__ + "._sort_records_map", |
| __name__ + "._BatchRecordsReader", |
| None, |
| { |
| "files": [filename], |
| "processing_rate": 1000000, |
| }, |
| shards=1) |
| sort_mappers.append(sort_mapper) |
| with pipeline.After(*sort_mappers): |
| job_ids = yield pipeline_common.Append(*[mapper.job_id for mapper in |
| sort_mappers]) |
| result = yield _CollectOutputFiles(job_ids) |
| with pipeline.After(result): |
| yield _CleanupOutputFiles(job_ids) |
| yield pipeline_common.Return(result) |
| |
| |
| class _CollectOutputFiles(pipeline_base.PipelineBase): |
| """Collect output file names from _OutputFile entities for given jobs. |
| |
| Args: |
| job_ids: list of job ids to load filenames. |
| |
| Returns: |
| list of lists of filenames produced by specified job ids. |
| """ |
| |
| def run(self, job_ids): |
| result = [] |
| for job_id in job_ids: |
| entities = _OutputFile.all().ancestor(_OutputFile.get_root_key(job_id)) |
| result.append([entity.key().name() for entity in entities]) |
| return result |
| |
| |
| class _CleanupOutputFiles(pipeline_base.PipelineBase): |
| """Cleanup _OutputFile entities for given job ids. |
| |
| Args: |
| job_ids: list of job ids. |
| """ |
| |
| def run(self, job_ids): |
| result = [] |
| for job_id in job_ids: |
| db.delete(_OutputFile.all().ancestor(_OutputFile.get_root_key(job_id))) |
| |
| |
| class _MergingReader(input_readers.InputReader): |
| """Reader which merge-reads multiple sorted KeyValue files. |
| |
| Reads list of lists of filenames. Each filename list constitutes one shard |
| and is merged together. |
| |
| Yields (key, values) tuple. If none of the max_values_count and |
| max_values_size parameters are not specified, then there will be a single key. |
| Otherwise multiple (key, values) pairs for the same key will be created, |
| according to restrictions. |
| """ |
| |
| expand_parameters = True |
| |
| FILES_PARAM = "files" |
| MAX_VALUES_COUNT_PARAM = "max_values_count" |
| MAX_VALUES_SIZE_PARAM = "max_values_size" |
| |
| def __init__(self, |
| offsets, |
| max_values_count, |
| max_values_size): |
| """Constructor. |
| |
| Args: |
| offsets: offsets for each input file to start from as list of ints. |
| max_values_count: maximum number of values to yield for a single value at |
| a time. Ignored if -1. |
| max_values_size: maximum total size of yielded values. Ignored if -1 |
| """ |
| self._offsets = offsets |
| self._max_values_count = max_values_count |
| self._max_values_size = max_values_size |
| |
| def __iter__(self): |
| """Iterate over records in input files. |
| |
| self._offsets is always correctly updated so that stopping iterations |
| doesn't skip records and doesn't read the same record twice. |
| """ |
| ctx = context.get() |
| mapper_spec = ctx.mapreduce_spec.mapper |
| shard_number = ctx._shard_state.shard_number |
| filenames = mapper_spec.params[self.FILES_PARAM][shard_number] |
| |
| if len(filenames) != len(self._offsets): |
| raise Exception("Files list and offsets do not match.") |
| |
| |
| readers = [] |
| |
| |
| for (i, filename) in enumerate(filenames): |
| offset = self._offsets[i] |
| reader = records.RecordsReader(files.BufferedFile(filename)) |
| reader.seek(offset) |
| readers.append((None, None, i, reader)) |
| |
| |
| |
| |
| |
| current_result = None |
| current_count = 0 |
| current_size = 0 |
| while readers: |
| (key, value, index, reader) = readers[0] |
| |
| if key is not None: |
| current_count += 1 |
| current_size += len(value) |
| |
| should_yield = False |
| if current_result: |
| if key != current_result[0]: |
| |
| should_yield = True |
| elif (self._max_values_count != -1 and |
| current_count >= self._max_values_count): |
| |
| current_result[2] = True |
| should_yield = True |
| elif (self._max_values_size != -1 and |
| current_size >= self._max_values_size): |
| |
| current_result[2] = True |
| should_yield = True |
| |
| if should_yield: |
| |
| yield current_result |
| if not current_result or should_yield: |
| current_result = [key, [], False] |
| current_count = 0 |
| current_size = 0 |
| current_result[1].append(value) |
| |
| |
| try: |
| self._offsets[index] = reader.tell() |
| start_time = time.time() |
| binary_record = reader.read() |
| |
| if context.get(): |
| operation.counters.Increment( |
| input_readers.COUNTER_IO_READ_BYTES, |
| len(binary_record))(context.get()) |
| operation.counters.Increment( |
| input_readers.COUNTER_IO_READ_MSEC, |
| int((time.time() - start_time) * 1000))(context.get()) |
| proto = file_service_pb.KeyValue() |
| proto.ParseFromString(binary_record) |
| |
| heapq.heapreplace(readers, |
| (proto.key(), proto.value(), index, reader)) |
| except EOFError: |
| heapq.heappop(readers) |
| |
| |
| if current_result: |
| yield current_result |
| |
| @classmethod |
| def from_json(cls, json): |
| """Restore reader from json state.""" |
| return cls(json["offsets"], |
| json["max_values_count"], |
| json["max_values_size"]) |
| |
| def to_json(self): |
| """Serialize reader state to json.""" |
| return {"offsets": self._offsets, |
| "max_values_count": self._max_values_count, |
| "max_values_size": self._max_values_size} |
| |
| @classmethod |
| def split_input(cls, mapper_spec): |
| """Split input into multiple shards.""" |
| filelists = mapper_spec.params[cls.FILES_PARAM] |
| max_values_count = mapper_spec.params.get(cls.MAX_VALUES_COUNT_PARAM, -1) |
| max_values_size = mapper_spec.params.get(cls.MAX_VALUES_SIZE_PARAM, -1) |
| return [cls([0] * len(files), max_values_count, max_values_size) |
| for files in filelists] |
| |
| @classmethod |
| def validate(cls, mapper_spec): |
| """Validate reader parameters in mapper_spec.""" |
| if mapper_spec.input_reader_class() != cls: |
| raise errors.BadReaderParamsError("Input reader class mismatch") |
| params = mapper_spec.params |
| if not cls.FILES_PARAM in params: |
| raise errors.BadReaderParamsError("Missing files parameter.") |
| |
| |
| class _HashingBlobstoreOutputWriter(output_writers.BlobstoreOutputWriterBase): |
| """An OutputWriter which outputs data into blobstore in key-value format. |
| |
| The output is tailored towards shuffler needs. It shards key/values using |
| key hash modulo number of output files. |
| """ |
| |
| def __init__(self, filenames): |
| """Constructor. |
| |
| Args: |
| filenames: list of filenames that this writer outputs to. |
| """ |
| self._filenames = filenames |
| |
| @classmethod |
| def validate(cls, mapper_spec): |
| """Validates mapper specification. |
| |
| Args: |
| mapper_spec: an instance of model.MapperSpec to validate. |
| """ |
| if mapper_spec.output_writer_class() != cls: |
| raise errors.BadWriterParamsError("Output writer class mismatch") |
| |
| @classmethod |
| def init_job(cls, mapreduce_state): |
| """Initialize job-level writer state. |
| |
| Args: |
| mapreduce_state: an instance of model.MapreduceState describing current |
| job. State can be modified during initialization. |
| """ |
| shards = mapreduce_state.mapreduce_spec.mapper.shard_count |
| |
| filenames = [] |
| for i in range(shards): |
| blob_file_name = (mapreduce_state.mapreduce_spec.name + |
| "-" + mapreduce_state.mapreduce_spec.mapreduce_id + |
| "-output-" + str(i)) |
| filenames.append( |
| files.blobstore.create( |
| _blobinfo_uploaded_filename=blob_file_name)) |
| mapreduce_state.writer_state = {"filenames": filenames} |
| |
| @classmethod |
| def finalize_job(cls, mapreduce_state): |
| """Finalize job-level writer state. |
| |
| Args: |
| mapreduce_state: an instance of model.MapreduceState describing current |
| job. State can be modified during finalization. |
| """ |
| finalized_filenames = [] |
| for filename in mapreduce_state.writer_state["filenames"]: |
| files.finalize(filename) |
| finalized_filenames.append( |
| files.blobstore.get_file_name( |
| files.blobstore.get_blob_key(filename))) |
| mapreduce_state.writer_state = {"filenames": finalized_filenames} |
| |
| @classmethod |
| def from_json(cls, json): |
| """Creates an instance of the OutputWriter for the given json state. |
| |
| Args: |
| json: The OutputWriter state as a dict-like object. |
| |
| Returns: |
| An instance of the OutputWriter configured using the values of json. |
| """ |
| return cls(json["filenames"]) |
| |
| def to_json(self): |
| """Returns writer state to serialize in json. |
| |
| Returns: |
| A json-izable version of the OutputWriter state. |
| """ |
| return {"filenames": self._filenames} |
| |
| @classmethod |
| def create(cls, mr_spec, shard_number, shard_attempt, _writer_state=None): |
| """Inherit docs.""" |
| return cls(_writer_state["filenames"]) |
| |
| @classmethod |
| def get_filenames(cls, mapreduce_state): |
| """See parent class.""" |
| if mapreduce_state.writer_state: |
| return mapreduce_state.writer_state["filenames"] |
| return [] |
| |
| def finalize(self, ctx, shard_state): |
| pass |
| |
| def write(self, data): |
| """Write data. |
| |
| Args: |
| data: actual data yielded from handler. Type is writer-specific. |
| """ |
| ctx = context.get() |
| if len(data) != 2: |
| logging.error("Got bad tuple of length %d (2-tuple expected): %s", |
| len(data), data) |
| |
| try: |
| key = str(data[0]) |
| value = str(data[1]) |
| except TypeError: |
| logging.error("Expecting a tuple, but got %s: %s", |
| data.__class__.__name__, data) |
| |
| file_index = key.__hash__() % len(self._filenames) |
| pool_name = "kv_pool%d" % file_index |
| filename = self._filenames[file_index] |
| |
| if ctx.get_pool(pool_name) is None: |
| ctx.register_pool(pool_name, |
| output_writers.RecordsPool(filename=filename, ctx=ctx)) |
| proto = file_service_pb.KeyValue() |
| proto.set_key(key) |
| proto.set_value(value) |
| ctx.get_pool(pool_name).append(proto.Encode()) |
| |
| |
| class _ShardOutputs(pipeline_base.PipelineBase): |
| """Takes a flat list of filenames, returns a list of lists, each with |
| one member each. |
| """ |
| |
| def run(self, filenames): |
| result = [] |
| for name in filenames: |
| result.append([name]) |
| return result |
| |
| |
| def _merge_map(key, values, partial): |
| """A map function used in merge phase. |
| |
| Stores (key, values) into KeyValues proto and yields its serialization. |
| |
| Args: |
| key: values key. |
| values: values themselves. |
| partial: True if more values for this key will follow. False otherwise. |
| """ |
| proto = file_service_pb.KeyValues() |
| proto.set_key(key) |
| proto.value_list().extend(values) |
| proto.set_partial(partial) |
| yield proto.Encode() |
| |
| |
| class _MergePipeline(pipeline_base.PipelineBase): |
| """Pipeline to merge sorted chunks. |
| |
| This pipeline merges together individually sorted chunks of each shard. |
| |
| Args: |
| filenames: list of lists of filenames. Each list will correspond to a single |
| shard. Each file in the list should have keys sorted and should contain |
| records with KeyValue serialized entity. |
| |
| Returns: |
| The list of filenames, where each filename is fully merged and will contain |
| records with KeyValues serialized entity. |
| """ |
| |
| |
| _MAX_VALUES_COUNT = 100000 |
| |
| _MAX_VALUES_SIZE = 1000000 |
| |
| def run(self, job_name, filenames): |
| yield mapper_pipeline.MapperPipeline( |
| job_name + "-shuffle-merge", |
| __name__ + "._merge_map", |
| __name__ + "._MergingReader", |
| output_writer_spec= |
| output_writers.__name__ + ".BlobstoreRecordsOutputWriter", |
| params={ |
| _MergingReader.FILES_PARAM: filenames, |
| _MergingReader.MAX_VALUES_COUNT_PARAM: self._MAX_VALUES_COUNT, |
| _MergingReader.MAX_VALUES_SIZE_PARAM: self._MAX_VALUES_SIZE, |
| }, |
| shards=len(filenames)) |
| |
| |
| def _hashing_map(binary_record): |
| """A map function used in hash phase. |
| |
| Reads KeyValue from binary record and yields (key, value). |
| """ |
| proto = file_service_pb.KeyValue() |
| proto.ParseFromString(binary_record) |
| yield (proto.key(), proto.value()) |
| |
| |
| class _HashPipeline(pipeline_base.PipelineBase): |
| """A pipeline to read mapper output and hash by key. |
| |
| Args: |
| job_name: root mapreduce job name. |
| filenames: filenames of mapper output. Should be of records format |
| with serialized KeyValue proto. |
| shards: Optional. Number of output shards to generate. Defaults |
| to the number of input files. |
| |
| Returns: |
| The list of filenames. Each file is of records formad with serialized |
| KeyValue proto. For each proto its output file is decided based on key |
| hash. Thus all equal keys would end up in the same file. |
| """ |
| def run(self, job_name, filenames, shards=None): |
| if shards is None: |
| shards = len(filenames) |
| yield mapper_pipeline.MapperPipeline( |
| job_name + "-shuffle-hash", |
| __name__ + "._hashing_map", |
| input_readers.__name__ + ".RecordsReader", |
| output_writer_spec= __name__ + "._HashingBlobstoreOutputWriter", |
| params={'files': filenames}, |
| shards=shards) |
| |
| |
| class _ShuffleServicePipeline(pipeline_base.PipelineBase): |
| """A pipeline to invoke shuffle service. |
| |
| Args: |
| input_files: list of file names to shuffle. |
| |
| Returns: |
| list of shuffled file names. Empty list if there is no input. |
| """ |
| async = True |
| |
| output_names = [ |
| |
| "_output_files", |
| ] |
| |
| def run(self, job_name, input_files): |
| |
| |
| empty = True |
| for filename in input_files: |
| if files.stat(filename).st_size > 0: |
| empty = False |
| break |
| if empty: |
| self.complete([]) |
| return |
| |
| shard_number = len(input_files) |
| output_files = [] |
| for i in range(shard_number): |
| blob_file_name = (job_name + "-shuffle-output-" + str(i)) |
| file_name = files.blobstore.create( |
| _blobinfo_uploaded_filename=blob_file_name) |
| output_files.append(file_name) |
| self.fill(self.outputs._output_files, output_files) |
| |
| |
| |
| target = modules.get_current_version_name() |
| module_name = modules.get_current_module_name() |
| if module_name != "default": |
| |
| |
| |
| target = "%s.%s." % (target, module_name) |
| |
| files.shuffler.shuffle("%s-%s" % (job_name, int(time.time())), |
| input_files, |
| output_files, |
| { |
| "url": self.get_callback_url(), |
| |
| |
| |
| "method": "GET", |
| "queue": self.queue_name, |
| "version": target, |
| }) |
| |
| def callback(self, **kwargs): |
| if "error" in kwargs: |
| self.retry("Error from shuffle service: %s" % kwargs["error"]) |
| return |
| |
| output_files = self.outputs._output_files.value |
| for filename in output_files: |
| files.finalize(filename) |
| |
| finalized_file_names = [] |
| for filename in output_files: |
| finalized_file_names.append( |
| files.blobstore.get_file_name( |
| files.blobstore.get_blob_key(filename))) |
| self.complete(finalized_file_names) |
| |
| def try_cancel(self): |
| return True |
| |
| |
| class ShufflePipeline(pipeline_base.PipelineBase): |
| """A pipeline to shuffle multiple key-value files. |
| |
| Args: |
| job_name: The descriptive name of the overall job. |
| filenames: list of file names to sort. Files have to be of records format |
| defined by Files API and contain serialized file_service_pb.KeyValue |
| protocol messages. |
| shards: Optional. Number of output shards to generate. Defaults |
| to the number of input files. |
| |
| Returns: |
| default: a list of filenames as string. Resulting files contain |
| serialized file_service_pb.KeyValues protocol messages with |
| all values collated to a single key. When there is no output, |
| an empty list from shuffle service or a list of empty files from |
| in memory shuffler. |
| """ |
| |
| def run(self, job_name, filenames, shards=None): |
| hashed_files = yield _HashPipeline(job_name, filenames, shards=shards) |
| sorted_files = yield _SortChunksPipeline(job_name, hashed_files) |
| temp_files = [hashed_files, sorted_files] |
| |
| merged_files = yield _MergePipeline(job_name, sorted_files) |
| |
| with pipeline.After(merged_files): |
| all_temp_files = yield pipeline_common.Extend(*temp_files) |
| yield mapper_pipeline._CleanupPipeline(all_temp_files) |
| |
| yield pipeline_common.Return(merged_files) |