| #!/usr/bin/env python |
| # |
| # Copyright 2007 Google Inc. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| """Mapreduce shuffler implementation.""" |
| |
| from __future__ import with_statement |
| |
| |
| |
| |
| __all__ = [ |
| "ShufflePipeline", |
| ] |
| |
| |
| |
| import gc |
| import heapq |
| import logging |
| import pickle |
| import time |
| |
| from appengine_pipeline.src import pipeline |
| from appengine_pipeline.src.pipeline import common as pipeline_common |
| from google.appengine.api.files import file_service_pb |
| from google.appengine.ext import db |
| from google.appengine.ext.mapreduce import context |
| from google.appengine.ext.mapreduce import errors |
| from google.appengine.ext.mapreduce import input_readers |
| from google.appengine.ext.mapreduce import mapper_pipeline |
| from google.appengine.ext.mapreduce import model |
| from google.appengine.ext.mapreduce import operation |
| from google.appengine.ext.mapreduce import output_writers |
| from google.appengine.ext.mapreduce import pipeline_base |
| from google.appengine.ext.mapreduce import records |
| from google.appengine.ext.mapreduce import util |
| |
| |
| |
| try: |
| |
| from google.appengine.ext import cloudstorage |
| if hasattr(cloudstorage, "_STUB"): |
| cloudstorage = None |
| except ImportError: |
| pass |
| |
| |
| |
| |
| |
| |
| class _OutputFile(db.Model): |
| """Entity to store output filenames of pipelines. |
| |
| These entities are always children of key returned by get_root_key(). |
| """ |
| |
| @classmethod |
| def kind(cls): |
| """Returns entity kind.""" |
| return "_GAE_MR_OutputFile" |
| |
| @classmethod |
| def get_root_key(cls, job_id): |
| """Get root key to store output files. |
| |
| Args: |
| job_id: pipeline's job id. |
| |
| Returns: |
| root key for a given job id to store output file entities. |
| """ |
| return db.Key.from_path(cls.kind(), job_id) |
| |
| |
| def _compare_keys(key_record1, key_record2): |
| """Compare two (key, records) protos by key.""" |
| return cmp(key_record1[0], key_record2[0]) |
| |
| |
| class _BatchGCSRecordsReader( |
| input_readers._GoogleCloudStorageRecordInputReader): |
| """GCS Records reader that reads in big batches.""" |
| |
  # Records are accumulated until their combined size exceeds ~3 MB.
  BATCH_SIZE = 1024 * 1024 * 3
| |
  def __iter__(self):
    """Iterates over records, yielding them in large batches."""
    records = []
| size = 0 |
| try: |
| while True: |
| record = super(_BatchGCSRecordsReader, self).next() |
| records.append(record) |
| size += len(record) |
| if size > self.BATCH_SIZE: |
| yield records |
| size = 0 |
| records = [] |
| gc.collect() |
| except StopIteration: |
| pass |
| if records: |
| yield records |
| records = [] |
| gc.collect() |
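

# The helper below is an illustrative sketch only and is not part of the
# library: it shows the batching pattern used by _BatchGCSRecordsReader on an
# ordinary record iterator. The function name and its default threshold are
# hypothetical.
def _example_batching_pattern(record_iter, batch_size_bytes=3 * 1024 * 1024):
  """Yields record lists whose combined size exceeds the threshold."""
  batch = []
  size = 0
  for record in record_iter:
    batch.append(record)
    size += len(record)
    if size > batch_size_bytes:
      yield batch
      batch = []
      size = 0
  if batch:
    # Emit the final, possibly undersized, batch.
    yield batch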
| |
| |
| |
| def _sort_records_map(records): |
| """Map function sorting records. |
| |
  Converts records to KeyValue protos, sorts them by key and writes them
  into a new GCS file. Creates an _OutputFile entity to record the resulting
  file name.
| |
| Args: |
| records: list of records which are serialized KeyValue protos. |
| """ |
| ctx = context.get() |
  record_count = len(records)
  key_records = [None] * record_count

  logging.debug("Parsing")
  for i in range(record_count):
| proto = file_service_pb.KeyValue() |
| proto.ParseFromString(records[i]) |
| key_records[i] = (proto.key(), records[i]) |
| |
| logging.debug("Sorting") |
| key_records.sort(cmp=_compare_keys) |
| |
| logging.debug("Writing") |
| mapper_spec = ctx.mapreduce_spec.mapper |
| params = input_readers._get_params(mapper_spec) |
| bucket_name = params.get("bucket_name") |
| filename = (ctx.mapreduce_spec.name + "/" + ctx.mapreduce_id + "/output-" + |
| ctx.shard_id + "-" + str(int(time.time()))) |
| full_filename = "/%s/%s" % (bucket_name, filename) |
| filehandle = cloudstorage.open(full_filename, mode="w") |
| with output_writers.GCSRecordsPool(filehandle, ctx=ctx) as pool: |
| for key_record in key_records: |
| pool.append(key_record[1]) |
| |
| logging.debug("Finalizing") |
| filehandle.close() |
| |
| entity = _OutputFile(key_name=full_filename, |
| parent=_OutputFile.get_root_key(ctx.mapreduce_id)) |
| entity.put() |
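

# The helper below is an illustrative sketch only and is not used by the
# library: it isolates the parse-and-sort core of _sort_records_map, without
# the GCS writing and datastore bookkeeping. The function name is hypothetical.
def _example_sort_serialized_key_values(serialized_records):
  """Returns serialized KeyValue records ordered by their key."""
  keyed = []
  for record in serialized_records:
    proto = file_service_pb.KeyValue()
    proto.ParseFromString(record)
    keyed.append((proto.key(), record))
  keyed.sort(cmp=_compare_keys)
  return [record for (_, record) in keyed]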
| |
| |
| class _SortChunksPipeline(pipeline_base.PipelineBase): |
| """A pipeline to sort multiple key-value files. |
| |
| Args: |
| job_name: root job name. |
| bucket_name: The name of the Google Cloud Storage bucket. |
| filenames: list of a list of filenames (hashed/bucketed) to sort, |
| as produced by _HashingGCSOutputWriter. |
| |
| Returns: |
    The list of lists of sorted filenames. Each inner list corresponds to one
    list of input files. Each file contains a chunk of sorted data.
| """ |
| |
| def run(self, job_name, bucket_name, filenames): |
| sort_mappers = [] |
| for i in range(len(filenames)): |
| filenames_only = util.strip_prefix_from_items("/%s/" % bucket_name, |
| filenames[i]) |
| sort_mapper = yield mapper_pipeline.MapperPipeline( |
| "%s-shuffle-sort-%s" % (job_name, str(i)), |
| __name__ + "._sort_records_map", |
| __name__ + "._BatchGCSRecordsReader", |
| None, |
| { |
| "input_reader": { |
| "bucket_name": bucket_name, |
| "objects": filenames_only, |
| }, |
| }, |
| shards=1) |
| sort_mappers.append(sort_mapper) |
| with pipeline.After(*sort_mappers): |
| job_ids = yield pipeline_common.Append(*[mapper.job_id for mapper in |
| sort_mappers]) |
| result = yield _CollectOutputFiles(job_ids) |
| with pipeline.After(result): |
| yield _CleanupOutputFiles(job_ids) |
| yield pipeline_common.Return(result) |
| |
| |
| class _CollectOutputFiles(pipeline_base.PipelineBase): |
| """Collect output file names from _OutputFile entities for given jobs. |
| |
| Args: |
| job_ids: list of job ids to load filenames. |
| |
| Returns: |
| list of lists of filenames produced by specified job ids. |
| """ |
| |
| def run(self, job_ids): |
| result = [] |
| for job_id in job_ids: |
| entities = _OutputFile.all().ancestor(_OutputFile.get_root_key(job_id)) |
| result.append([entity.key().name() for entity in entities]) |
| return result |
| |
| |
| class _CleanupOutputFiles(pipeline_base.PipelineBase): |
| """Cleanup _OutputFile entities for given job ids. |
| |
| Args: |
| job_ids: list of job ids. |
| """ |
| |
| def run(self, job_ids): |
| for job_id in job_ids: |
| db.delete(_OutputFile.all().ancestor(_OutputFile.get_root_key(job_id))) |
| |
| |
| class _MergingReader(input_readers.InputReader): |
| """Reader which merge-reads multiple sorted KeyValue files. |
| |
  Reads a list of lists of filenames. Each filename list constitutes one shard
  and is merged together.

  Yields (key, values) tuples. If neither the max_values_count nor the
  max_values_size parameter is specified, all values for a key are yielded as
  a single (key, values) pair. Otherwise multiple (key, values) pairs for the
  same key will be created, according to these restrictions.
| """ |
| |
| expand_parameters = True |
| |
| FILES_PARAM = "files" |
| MAX_VALUES_COUNT_PARAM = "max_values_count" |
| MAX_VALUES_SIZE_PARAM = "max_values_size" |
| |
| |
| GCS_BUFFER_SIZE = 256 * 1024 |
| |
| def __init__(self, |
| offsets, |
| max_values_count, |
| max_values_size): |
| """Constructor. |
| |
| Args: |
| offsets: offsets for each input file to start from as list of ints. |
      max_values_count: maximum number of values to yield for a single key at
        a time. Ignored if -1.
      max_values_size: maximum total size of yielded values. Ignored if -1.
    """
| self._offsets = offsets |
| self._max_values_count = max_values_count |
| self._max_values_size = max_values_size |
| |
| def __iter__(self): |
| """Iterate over records in input files. |
| |
| self._offsets is always correctly updated so that stopping iterations |
| doesn't skip records and doesn't read the same record twice. |
| |
| Raises: |
| Exception: when Files list and offsets do not match. |
| |
    Yields:
      [key, values, partial] lists, where partial is True when the values for
      a single key are split across multiple yields.
    """
| ctx = context.get() |
| mapper_spec = ctx.mapreduce_spec.mapper |
| shard_number = ctx._shard_state.shard_number |
| filenames = mapper_spec.params[self.FILES_PARAM][shard_number] |
| |
| if len(filenames) != len(self._offsets): |
| raise Exception("Files list and offsets do not match.") |
| |
| |
    # Heap of (key, value, file_index, reader) tuples, ordered by key.
    readers = []
| |
| |
| for (i, filename) in enumerate(filenames): |
| offset = self._offsets[i] |
| |
| |
      # Wrap each GCS file in a records reader and seek to its saved offset.
      reader = records.RecordsReader(
| cloudstorage.open(filename, read_buffer_size=self.GCS_BUFFER_SIZE)) |
| reader.seek(offset) |
| readers.append((None, None, i, reader)) |
| |
| |
| |
| |
| |
    # current_result is a [key, values, partial] list for the key currently
    # being merged; it is yielded when the key changes or when a count/size
    # limit is reached.
    current_result = None
| current_count = 0 |
| current_size = 0 |
| while readers: |
| (key, value, index, reader) = readers[0] |
| |
| if key is not None: |
| current_count += 1 |
| current_size += len(value) |
| |
| should_yield = False |
| if current_result: |
          if key != current_result[0]:
            # The key changed: emit the values accumulated for the prior key.
            should_yield = True
          elif (self._max_values_count != -1 and
                current_count >= self._max_values_count):
            # Too many values for this key: emit a partial result.
            current_result[2] = True
            should_yield = True
          elif (self._max_values_size != -1 and
                current_size >= self._max_values_size):
            # Accumulated values are too large: emit a partial result.
            current_result[2] = True
            should_yield = True
| |
        if should_yield:
          # Emit the accumulated result for the previous key.
          yield current_result
| if not current_result or should_yield: |
| current_result = [key, [], False] |
| current_count = 0 |
| current_size = 0 |
| current_result[1].append(value) |
| |
| |
      try:
        # Save the reader position so that iteration can resume here.
        self._offsets[index] = reader.tell()
| start_time = time.time() |
| binary_record = reader.read() |
| |
| if context.get(): |
| operation.counters.Increment( |
| input_readers.COUNTER_IO_READ_BYTES, |
| len(binary_record))(context.get()) |
| operation.counters.Increment( |
| input_readers.COUNTER_IO_READ_MSEC, |
| int((time.time() - start_time) * 1000))(context.get()) |
| proto = file_service_pb.KeyValue() |
| proto.ParseFromString(binary_record) |
| |
        # Re-insert this reader into the heap, keyed by its next record.
        heapq.heapreplace(readers,
| (proto.key(), proto.value(), index, reader)) |
      except EOFError:
        # This file is exhausted: remove its reader from the heap.
        heapq.heappop(readers)
| |
| |
| if current_result: |
| yield current_result |
| |
| @classmethod |
| def from_json(cls, json): |
| """Restore reader from json state.""" |
| return cls(json["offsets"], |
| json["max_values_count"], |
| json["max_values_size"]) |
| |
| def to_json(self): |
| """Serialize reader state to json.""" |
| return {"offsets": self._offsets, |
| "max_values_count": self._max_values_count, |
| "max_values_size": self._max_values_size} |
| |
| @classmethod |
| def split_input(cls, mapper_spec): |
| """Split input into multiple shards.""" |
| filelists = mapper_spec.params[cls.FILES_PARAM] |
| max_values_count = mapper_spec.params.get(cls.MAX_VALUES_COUNT_PARAM, -1) |
| max_values_size = mapper_spec.params.get(cls.MAX_VALUES_SIZE_PARAM, -1) |
| return [cls([0] * len(files), max_values_count, max_values_size) |
| for files in filelists] |
| |
| @classmethod |
| def validate(cls, mapper_spec): |
| """Validate reader parameters in mapper_spec.""" |
| if mapper_spec.input_reader_class() != cls: |
| raise errors.BadReaderParamsError("Input reader class mismatch") |
| params = mapper_spec.params |
| if cls.FILES_PARAM not in params: |
| raise errors.BadReaderParamsError("Missing files parameter.") |
| |
| |
| class _HashingGCSOutputWriter(output_writers.OutputWriter): |
| """An OutputWriter which outputs data into GCS in key-value format. |
| |
  The output is tailored towards shuffler needs. It shards key/values by
  key hash modulo the number of output files. Each mapper shard writes to
  shard_count files (buckets) of its own, and a given key always hashes to
  the same bucket index across all shards. The buckets with the same index
  are then assembled into one logical file list, and the list of those
  lists is returned.
| """ |
| |
| |
| BUCKET_NAME_PARAM = "bucket_name" |
| |
| |
| def __init__(self, filehandles): |
| """Constructor. |
| |
| Args: |
| filehandles: list of file handles that this writer outputs to. |
| """ |
| self._filehandles = filehandles |
| self._pools = [None] * len(filehandles) |
| |
| @classmethod |
| def validate(cls, mapper_spec): |
| """Validates mapper specification. |
| |
| Args: |
| mapper_spec: an instance of model.MapperSpec to validate. |
    Raises:
      BadWriterParamsError: when the output writer class mismatches or a
        required parameter is missing.
| """ |
| if mapper_spec.output_writer_class() != cls: |
| raise errors.BadWriterParamsError("Output writer class mismatch") |
| params = output_writers._get_params(mapper_spec) |
| |
| if cls.BUCKET_NAME_PARAM not in params: |
| raise errors.BadWriterParamsError( |
| "%s is required for the _HashingGCSOutputWriter" % |
| cls.BUCKET_NAME_PARAM) |
| |
| @classmethod |
| def finalize_job(cls, mapreduce_state): |
| """Finalize job-level writer state. |
| |
| Args: |
| mapreduce_state: an instance of model.MapreduceState describing current |
| job. State can be modified during finalization. |
| """ |
| shards = mapreduce_state.mapreduce_spec.mapper.shard_count |
| filenames = [] |
| for _ in range(shards): |
| filenames.append([None] * shards) |
| shard_states = model.ShardState.find_all_by_mapreduce_state(mapreduce_state) |
| for x, shard_state in enumerate(shard_states): |
| shard_filenames = shard_state.writer_state["shard_filenames"] |
      for y in range(shards):
        # Transpose: bucket y of shard x becomes entry x of logical file y.
        filenames[y][x] = shard_filenames[y]
| mapreduce_state.writer_state = {"filenames": filenames} |
| |
| @classmethod |
| def from_json(cls, json): |
| """Creates an instance of the OutputWriter for the given json state. |
| |
| Args: |
| json: The OutputWriter state as a dict-like object. |
| |
| Returns: |
| An instance of the OutputWriter configured using the values of json. |
| """ |
| return cls(pickle.loads(json["filehandles"])) |
| |
| def to_json(self): |
| """Returns writer state to serialize in json. |
| |
| Returns: |
| A json-izable version of the OutputWriter state. |
| """ |
| |
| |
| |
    # Flush the pools first so that the pickled file handles reflect
    # everything written so far.
    for pool in self._pools:
| if pool is not None: |
| pool.flush(True) |
| return {"filehandles": pickle.dumps(self._filehandles)} |
| |
| @classmethod |
| def create(cls, mr_spec, shard_number, shard_attempt, _writer_state=None): |
| """Inherit docs.""" |
| mapper_spec = mr_spec.mapper |
| params = output_writers._get_params(mapper_spec) |
| bucket_name = params.get(cls.BUCKET_NAME_PARAM) |
| shards = mapper_spec.shard_count |
| |
| filehandles = [] |
| filename = (mr_spec.name + "/" + mr_spec.mapreduce_id + |
| "/shard-" + str(shard_number) + "-bucket-") |
| for i in range(shards): |
| full_filename = "/%s/%s%d" % (bucket_name, filename, i) |
| filehandles.append(cloudstorage.open(full_filename, mode="w")) |
| return cls(filehandles) |
| |
| @classmethod |
| def get_filenames(cls, mapreduce_state): |
| """See parent class.""" |
| if mapreduce_state.writer_state: |
| return mapreduce_state.writer_state["filenames"] |
| return [] |
| |
| def finalize(self, ctx, shard_state): |
| """See parent class.""" |
| filenames = [] |
| for filehandle in self._filehandles: |
| filenames.append(filehandle.name) |
| filehandle.close() |
| shard_state.writer_state = {"shard_filenames": filenames} |
| |
| def write(self, data): |
| """Write data. |
| |
| Args: |
      data: a (key, value) tuple yielded from the handler.
| """ |
| ctx = context.get() |
| if len(data) != 2: |
| logging.error("Got bad tuple of length %d (2-tuple expected): %s", |
| len(data), data) |
| |
| try: |
| key = str(data[0]) |
| value = str(data[1]) |
| except TypeError: |
| logging.error("Expecting a tuple, but got %s: %s", |
| data.__class__.__name__, data) |
| |
| file_index = key.__hash__() % len(self._filehandles) |
| |
| |
| |
| |
| |
    # Lazily create a records pool for this bucket on first write.
    pool = self._pools[file_index]
| if pool is None: |
| filehandle = self._filehandles[file_index] |
| pool = output_writers.GCSRecordsPool(filehandle=filehandle, ctx=ctx) |
| self._pools[file_index] = pool |
| |
| proto = file_service_pb.KeyValue() |
| proto.set_key(key) |
| proto.set_value(value) |
| pool.append(proto.Encode()) |
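

# The helper below is an illustrative sketch only and is not used by the
# library: it restates the bucket-assignment contract that write() relies on.
# A key is always routed to the same bucket index, in every shard, so all
# values for a key end up in the same logical file. The function name is
# hypothetical.
def _example_bucket_for_key(key, num_buckets):
  """Returns the output file (bucket) index a key is routed to."""
  return str(key).__hash__() % num_buckets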
| |
| |
| class _ShardOutputs(pipeline_base.PipelineBase): |
| """Shards the ouputs. |
| |
| Takes a flat list of filenames, returns a list of lists, each with |
| one member each. |
| """ |
| |
| def run(self, filenames): |
| result = [] |
| for name in filenames: |
| result.append([name]) |
| return result |
| |
| |
| def _merge_map(key, values, partial): |
| """A map function used in merge phase. |
| |
| Stores (key, values) into KeyValues proto and yields its serialization. |
| |
| Args: |
| key: values key. |
| values: values themselves. |
| partial: True if more values for this key will follow. False otherwise. |
| |
  Yields:
    The serialized KeyValues proto.
| """ |
| proto = file_service_pb.KeyValues() |
| proto.set_key(key) |
| proto.value_list().extend(values) |
| proto.set_partial(partial) |
| yield proto.Encode() |
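

# The helper below is an illustrative sketch only and is not used by the
# library: it shows how a consumer might decode one record produced by the
# merge phase, assuming the usual KeyValues accessors (key, value_list,
# partial) generated for file_service_pb. The function name is hypothetical.
def _example_read_merged_record(binary_record):
  """Returns (key, values, partial) decoded from a merge-phase record."""
  proto = file_service_pb.KeyValues()
  proto.ParseFromString(binary_record)
  return proto.key(), list(proto.value_list()), proto.partial()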
| |
| |
| class _MergePipeline(pipeline_base.PipelineBase): |
| """Pipeline to merge sorted chunks. |
| |
| This pipeline merges together individually sorted chunks of each shard. |
| |
| Args: |
    filenames: list of lists of filenames. Each list corresponds to a single
      shard. Each file in a list should have its keys sorted and should
      contain records of serialized KeyValue protos.

  Yields:
    The list of filenames, where each file is fully merged and contains
    records of serialized KeyValues protos.
| """ |
| |
| |
  # Maximum number of values to accumulate in a single KeyValues proto.
  _MAX_VALUES_COUNT = 100000

  # Maximum total byte size of values in a single KeyValues proto.
  _MAX_VALUES_SIZE = 1000000
| |
| def run(self, job_name, bucket_name, filenames): |
| yield mapper_pipeline.MapperPipeline( |
| job_name + "-shuffle-merge", |
| __name__ + "._merge_map", |
| __name__ + "._MergingReader", |
| output_writer_spec= |
| output_writers.__name__ + "._GoogleCloudStorageRecordOutputWriter", |
| params={ |
| _MergingReader.FILES_PARAM: filenames, |
| _MergingReader.MAX_VALUES_COUNT_PARAM: self._MAX_VALUES_COUNT, |
| _MergingReader.MAX_VALUES_SIZE_PARAM: self._MAX_VALUES_SIZE, |
| "output_writer": { |
| "bucket_name": bucket_name, |
| }, |
| }, |
| shards=len(filenames)) |
| |
| |
| def _hashing_map(binary_record): |
| """A map function used in hash phase. |
| |
| Reads KeyValue from binary record. |
| |
| Args: |
| binary_record: The binary record. |
| |
| Yields: |
| The (key, value). |
| """ |
| proto = file_service_pb.KeyValue() |
| proto.ParseFromString(binary_record) |
| yield (proto.key(), proto.value()) |
| |
| |
| class _HashPipeline(pipeline_base.PipelineBase): |
| """A pipeline to read mapper output and hash by key. |
| |
| Args: |
| job_name: root mapreduce job name. |
| bucket_name: The name of the Google Cloud Storage bucket. |
| filenames: filenames of mapper output. Should be of records format |
| with serialized KeyValue proto. |
| shards: Optional. Number of output shards to generate. Defaults |
| to the number of input files. |
| |
| Yields: |
    The list of filenames. Each file is in records format with serialized
    KeyValue protos. The output file for each proto is chosen based on the
    key hash, so all equal keys end up in the same file.
| """ |
| |
| def run(self, job_name, bucket_name, filenames, shards=None): |
| filenames_only = ( |
| util.strip_prefix_from_items("/%s/" % bucket_name, filenames)) |
| if shards is None: |
| shards = len(filenames) |
| yield mapper_pipeline.MapperPipeline( |
| job_name + "-shuffle-hash", |
| __name__ + "._hashing_map", |
| input_readers.__name__ + "._GoogleCloudStorageRecordInputReader", |
| output_writer_spec=__name__ + "._HashingGCSOutputWriter", |
| params={ |
| "input_reader": { |
| "bucket_name": bucket_name, |
| "objects": filenames_only, |
| }, |
| "output_writer": { |
| "bucket_name": bucket_name, |
| }, |
| }, |
| shards=shards) |
| |
| |
| class ShufflePipeline(pipeline_base.PipelineBase): |
| """A pipeline to shuffle multiple key-value files. |
| |
| Args: |
| job_name: The descriptive name of the overall job. |
| mapper_params: parameters to use for mapper phase. |
| filenames: list of file names to sort. Files have to be of records format |
| defined by Files API and contain serialized file_service_pb.KeyValue |
| protocol messages. The filenames may or may not contain the |
| GCS bucket name in their path. |
| shards: Optional. Number of output shards to generate. Defaults |
| to the number of input files. |
| |
| Returns: |
    default: a list of filenames as strings. The resulting files contain
      serialized file_service_pb.KeyValues protocol messages, with all
      values collated under a single key. When there is no output, the
      result is an empty list (shuffle service) or a list of empty files
      (in-memory shuffler).
| """ |
| |
| def run(self, job_name, mapper_params, filenames, shards=None): |
| bucket_name = mapper_params["bucket_name"] |
| hashed_files = yield _HashPipeline(job_name, bucket_name, |
| filenames, shards=shards) |
| sorted_files = yield _SortChunksPipeline(job_name, bucket_name, |
| hashed_files) |
| temp_files = [hashed_files, sorted_files] |
| |
| merged_files = yield _MergePipeline(job_name, bucket_name, sorted_files) |
| |
| with pipeline.After(merged_files): |
| all_temp_files = yield pipeline_common.Extend(*temp_files) |
| yield _GCSCleanupPipeline(all_temp_files) |
| |
| yield pipeline_common.Return(merged_files) |
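

# The snippet below is an illustrative sketch only and is not used by the
# library: it shows how a ShufflePipeline stage could be started on its own.
# The job name, bucket and filenames are hypothetical; in practice the stage
# is usually yielded from a parent pipeline rather than started directly.
def _example_shuffle_usage():
  """Starts a standalone shuffle over mapper output files (sketch only)."""
  shuffle_stage = ShufflePipeline(
      "example-job",
      {"bucket_name": "example-bucket"},
      ["/example-bucket/example-job/mapper-output-0",
       "/example-bucket/example-job/mapper-output-1"])
  shuffle_stage.start()
  return shuffle_stage.pipeline_id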
| |
| |
| class _GCSCleanupPipeline(pipeline_base.PipelineBase): |
| """A pipeline to do a cleanup for mapreduce jobs that use GCS. |
| |
| Args: |
| filename_or_list: list of files or file lists to delete. |
| """ |
| |
| |
  # Minimum number of retries for GCS delete operations.
  _MIN_RETRIES = 5

  # Maximum number of retries for GCS delete operations.
  _MAX_RETRIES = 10
| |
  def delete_file_or_list(self, filename_or_list):
    """Recursively deletes a filename or a nested list of filenames."""
    if isinstance(filename_or_list, list):
| for filename in filename_or_list: |
| self.delete_file_or_list(filename) |
| else: |
| filename = filename_or_list |
| retry_params = cloudstorage.RetryParams(min_retries=self._MIN_RETRIES, |
| max_retries=self._MAX_RETRIES) |
| |
| try: |
| cloudstorage.delete(filename, retry_params) |
      except:
        # Cleanup is best effort: ignore delete failures.
        pass
| |
| def run(self, temp_files): |
| self.delete_file_or_list(temp_files) |