| #!/usr/bin/env python | 
 | # | 
 | # Copyright 2007 Google Inc. | 
 | # | 
 | # Licensed under the Apache License, Version 2.0 (the "License"); | 
 | # you may not use this file except in compliance with the License. | 
 | # You may obtain a copy of the License at | 
 | # | 
 | #     http://www.apache.org/licenses/LICENSE-2.0 | 
 | # | 
 | # Unless required by applicable law or agreed to in writing, software | 
 | # distributed under the License is distributed on an "AS IS" BASIS, | 
 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
 | # See the License for the specific language governing permissions and | 
 | # limitations under the License. | 
 | # | 
 |  | 
 |  | 
 |  | 
 |  | 
 |  | 
 |  | 
 |  | 
 |  | 
 |  | 
 |  | 
 |  | 
 |  | 
 |  | 
 |  | 
 | """Define file format root.""" | 
 |  | 
 | from __future__ import with_statement | 
 |  | 
 |  | 
 |  | 
 | __all__ = ['FileFormatRoot', | 
 |            'split'] | 
 |  | 
 | import copy | 
 | from google.appengine.api.files import file as files | 
 | from google.appengine.ext.mapreduce import file_formats | 
 | from google.appengine.ext.mapreduce import json_util | 
 | import google.appengine.ext.mapreduce.file_format_parser as parser | 
 |  | 
 |  | 
 | def split(filenames, format_string, shards): | 
 |   """Get a FileFormatRoot for each shard. | 
 |  | 
 |   This method creates a list of FileFormatRoot and assigns each root | 
 |   some input files. The number of roots is less than or equal to shards. | 
 |  | 
 |   Args: | 
 |     filenames: input filenames | 
 |     format_string: format string from user. | 
 |     shards: number of shards to split inputs. | 
 |  | 
 |   Returns: | 
 |     A list of FileFormatRoot or None if all input files have zero bytes. | 
 |   """ | 
 |   parsed_formats = parser.parse(format_string) | 
 |  | 
 |   sizes = [files.stat(filename).st_size for filename in filenames] | 
 |  | 
 |   size_per_shard = float(sum(sizes)) / shards | 
 |   if not size_per_shard: | 
 |     return | 
 |  | 
 |   if parsed_formats[0].can_split(): | 
 |     return _deep_split(filenames, size_per_shard, parsed_formats) | 
 |   else: | 
 |     return _shallow_split(filenames, size_per_shard, parsed_formats, sizes) | 
 |  | 
 |  | 
 | def _shallow_split(filenames, size_per_shard, parsed_formats, sizes): | 
 |   """Split files into roots only based on top level file sizes. | 
 |  | 
 |   This split does not cross file boundary. | 
 |   """ | 
 |   roots = [] | 
 |   inputs = [] | 
 |   shard_size = 0 | 
 |   for i, size in enumerate(sizes): | 
 |     shard_size += size | 
 |     inputs.append(_FileRange(filenames[i], None)) | 
 |     if shard_size >= size_per_shard: | 
 |       roots.append(FileFormatRoot(copy.deepcopy(parsed_formats), inputs)) | 
 |       inputs = [] | 
 |       shard_size = 0 | 
 |  | 
 |   if inputs: | 
 |     roots.append(FileFormatRoot(copy.deepcopy(parsed_formats), inputs)) | 
 |  | 
 |   return roots | 
 |  | 
 |  | 
 | def _deep_split(filenames, size_per_shard, parsed_formats): | 
 |   """Split files into roots using the first FileFormat. | 
 |  | 
 |   Deep split can split within a file. It tells the first format how big | 
 |   a split it wants and the first format will do the actually splitting | 
 |   because only the first format knows how to operate on this particular | 
 |   format. | 
 |  | 
 |   Args: | 
 |     filenames: a list of input filenames. | 
 |     size_per_shard: size per shard. | 
 |     parsed_format: the parsed FileFormats. | 
 |  | 
 |   Returns: | 
 |     A list of FileFormatRoot. | 
 |   """ | 
 |   roots = [] | 
 |   inputs = [] | 
 |   size_left = size_per_shard | 
 |  | 
 |   for filename in filenames: | 
 |     index = 0 | 
 |     with files.open(filename) as f: | 
 |       cache_for_split = {} | 
 |  | 
 |       while True: | 
 |         if size_left <= 0: | 
 |  | 
 |           roots.append(FileFormatRoot(copy.deepcopy(parsed_formats), inputs)) | 
 |           size_left = size_per_shard | 
 |           inputs = [] | 
 |         start_index = index | 
 |         size_left, index = parsed_formats[0].split(size_left, | 
 |                                                    start_index, | 
 |                                                    f, | 
 |                                                    cache_for_split) | 
 |  | 
 |         if start_index == index: | 
 |           break | 
 |         inputs.append(_FileRange(filename, (start_index, index))) | 
 |  | 
 |   if inputs: | 
 |     roots.append(FileFormatRoot(copy.deepcopy(parsed_formats), inputs)) | 
 |  | 
 |   return roots | 
 |  | 
 |  | 
 | class _FileRange(json_util.JsonMixin): | 
 |   """Describe a range of a file to read. | 
 |  | 
 |   FileFormatRootFactory creates instances of this class and | 
 |   feeds them to different roots. | 
 |   """ | 
 |  | 
 |  | 
 |   FILENAME = 'filename' | 
 |   RANGE = 'range' | 
 |  | 
 |   def __init__(self, filename, file_range=None): | 
 |     """Init. | 
 |  | 
 |     Args: | 
 |       filename: filename in str. | 
 |       file_range: [start_index, end_index) tuple. This only makes sense for | 
 |         _FileFormats that support splitting within a file. | 
 |         It specify the range to read this file. | 
 |         None means reading the entire file. When defined, what it means | 
 |         differ for each format. For example, if a file is of zip format, | 
 |         index specifies the member files to read. If a file is of record | 
 |         format, index specifies the records to read. | 
 |     """ | 
 |     self.filename = filename | 
 |     self.range = file_range | 
 |  | 
 |   def to_json(self): | 
 |     return {self.FILENAME: self.filename, | 
 |             self.RANGE: self.range} | 
 |  | 
 |   @classmethod | 
 |   def from_json(cls, json): | 
 |     return cls(json[cls.FILENAME], json[cls.RANGE]) | 
 |  | 
 |  | 
 | class FileFormatRoot(json_util.JsonMixin): | 
 |   """FileFormatRoot. | 
 |  | 
 |   FileFormatRoot takes a list of FileFormats as processing units and | 
 |   a list of _FileRanges as inputs. It provides an interface to | 
 |   iterate through all the inputs. All inputs will be processed by each | 
 |   processing unit in a cascaded manner before being emitted. | 
 |  | 
 |   The order of the list of FileFormats matters. The last | 
 |   FileFormat's output is returned by FileFormatRoot. | 
 |   Each FileFormat asks FileFormatRoot for inputs, which are either outputs | 
 |   from its previous FileFormat or, in the case of the first FileFormat, | 
 |   outputs directly from FileFormatRoot. | 
 |  | 
 |   FileFormats don't know each other. FileFormatRoot coordinates all | 
 |   their initializations, (de)serialization, and communications. | 
 |   """ | 
 |  | 
 |  | 
 |   _INPUTS = 'inputs' | 
 |   _FORMATS = 'formats' | 
 |   _FILES_STREAMS = 'files_streams' | 
 |  | 
 |   def __init__(self, formats, inputs, files_streams_json=None): | 
 |     """Init. | 
 |  | 
 |     Args: | 
 |       formats: A list of _FileFormats. | 
 |       inputs: A list of _FileRanges. | 
 |       init_files_streams: If to initialize files streams to default value. | 
 |     """ | 
 |     self._inputs = inputs | 
 |     self._formats = formats | 
 |     for i, file_format in enumerate(self._formats): | 
 |       stream_cls = _RootFilesStream if i == 0 else _FilesStream | 
 |       if files_streams_json: | 
 |         file_format._input_files_stream = stream_cls.from_json( | 
 |             files_streams_json[i], self) | 
 |       else: | 
 |         file_format._input_files_stream = stream_cls(i, self) | 
 |  | 
 |   def __repr__(self): | 
 |     return str(self.to_json()) | 
 |  | 
 |   def __iter__(self): | 
 |     return self | 
 |  | 
 |   def to_json(self): | 
 |     return  {self._INPUTS: [_.to_json() for _ in self._inputs], | 
 |              self._FORMATS: [_.to_json() for _ in self._formats], | 
 |              self._FILES_STREAMS: | 
 |              [_._input_files_stream.to_json() for _ in self._formats]} | 
 |  | 
 |   @classmethod | 
 |   def from_json(cls, json): | 
 |     formats = [file_formats.FORMATS[_json[file_formats.FileFormat._FORMAT]]. | 
 |         from_json(_json) for _json in json[cls._FORMATS]] | 
 |  | 
 |     root = cls(formats, | 
 |                [_FileRange.from_json(_) for _ in json[cls._INPUTS]], | 
 |                json[cls._FILES_STREAMS]) | 
 |  | 
 |     return root | 
 |  | 
 |   def next(self): | 
 |     """Iterate over inputs.""" | 
 |     result = self._formats[-1].next() | 
 |     self._formats[-1]._input_files_stream.checkpoint() | 
 |     self._formats[-1].checkpoint() | 
 |     return result | 
 |  | 
 |  | 
 | class _FilesStream(object): | 
 |   """Provide FileFormat with a stream of file-like objects as inputs. | 
 |  | 
 |   Attributes: | 
 |     current: the current file-like object to read from. | 
 |   """ | 
 |  | 
 |  | 
 |   PREVIOUS_OFFSET = 'previous' | 
 |   INDEX = 'index' | 
 |  | 
 |   def __init__(self, | 
 |                index, | 
 |                file_format_root, | 
 |                offset=0, | 
 |                next_func=None): | 
 |     """Init. | 
 |  | 
 |     Args: | 
 |       file_format_root: the FileFormatRoot this stream should talk to. | 
 |       index: the index of this stream within the FileFormatRoot. | 
 |       offset: the offset to start reading current file. | 
 |       next_func: a function that gives back the next file from the stream. | 
 |     """ | 
 |     self._next_file = next_func or file_format_root._formats[index-1].next | 
 |     self._preprocess = file_format_root._formats[index].preprocess | 
 |  | 
 |     self._previous_offset = offset | 
 |     self._index = index | 
 |     self._current = self._preprocess(self._next_file()) | 
 |     self._current.seek(offset) | 
 |  | 
 |   def advance(self): | 
 |     """Advance _current to the next file-like object. | 
 |  | 
 |     _FileStream should call this after consumed the current file-like object. | 
 |     """ | 
 |     self._previous_offset = 0 | 
 |     self._current.close() | 
 |     self._current = self._preprocess(self._next_file()) | 
 |  | 
 |   @property | 
 |   def current(self): | 
 |     return self._current | 
 |  | 
 |   def checkpoint(self): | 
 |     self._previous_offset = self._current.tell() | 
 |  | 
 |   def to_json(self): | 
 |     return {self.PREVIOUS_OFFSET: self._previous_offset, | 
 |             self.INDEX: self._index} | 
 |  | 
 |   @classmethod | 
 |   def from_json(cls, json, file_format_root): | 
 |     return cls(json[cls.INDEX], file_format_root, json[cls.PREVIOUS_OFFSET]) | 
 |  | 
 |  | 
 | class _RootFilesStream(_FilesStream): | 
 |   """Special FilesStream for the first FileFormat""" | 
 |  | 
 |   PREVIOUS_INPUT_INDEX = 'input_index' | 
 |  | 
 |   def __init__(self, | 
 |                index, | 
 |                file_format_root, | 
 |                offset=0, | 
 |                input_index=0): | 
 |     """Init. | 
 |  | 
 |     Args: | 
 |       index: the index of this stream within the FileFormatRoot. | 
 |       file_format_root: the FileFormatRoot this stream should talk to. | 
 |       offset: the offset to start reading current file. | 
 |       input_index: index of the next input file to read. | 
 |     """ | 
 |     self.__inputs = file_format_root._inputs | 
 |     self.__input_index = input_index | 
 |     self.__previous_input_index = input_index | 
 |     self.__file_format_root = file_format_root | 
 |  | 
 |     super(_RootFilesStream, self).__init__(index, | 
 |                                            file_format_root, | 
 |                                            offset, | 
 |                                            self.next_file) | 
 |  | 
 |   def next_file(self): | 
 |     if self.__input_index == len(self.__inputs): | 
 |       raise StopIteration() | 
 |     file_input = self.__inputs[self.__input_index] | 
 |     if file_input.range: | 
 |       first_format = self.__file_format_root._formats[0] | 
 |       if not first_format.can_split(): | 
 |         raise ValueError('Input range specified for a non splitable format %s' | 
 |                          % first_format.NAME) | 
 |       first_format._range = file_input.range | 
 |     self.__previous_input_index = self.__input_index | 
 |     self.__input_index += 1 | 
 |     return files.open(file_input.filename, 'r', buffering=-1) | 
 |  | 
 |   def to_json(self): | 
 |     result = super(_RootFilesStream, self).to_json() | 
 |     result[self.PREVIOUS_INPUT_INDEX] = self.__previous_input_index | 
 |     return result | 
 |  | 
 |   @classmethod | 
 |   def from_json(cls, json, file_format_root): | 
 |     return cls(json[cls.INDEX], | 
 |                file_format_root, | 
 |                json[cls.PREVIOUS_OFFSET], | 
 |                json[cls.PREVIOUS_INPUT_INDEX]) |