blob: f37d475c90b7b5b8e5d794b79f25fc1dbf8e7423 [file] [log] [blame]
#!/usr/bin/env python
#
# Copyright 2007 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Define file format root."""
from __future__ import with_statement
# Public names exported by this module; everything else here is private.
__all__ = ['FileFormatRoot',
           'split']
import copy
from google.appengine.api.files import file as files
from google.appengine.ext.mapreduce import file_formats
from google.appengine.ext.mapreduce import json_util
import google.appengine.ext.mapreduce.file_format_parser as parser
def split(filenames, format_string, shards):
  """Get a FileFormatRoot for each shard.

  This method creates a list of FileFormatRoot and assigns each root
  some input files. The number of roots is intended to be at most shards.

  If the first parsed FileFormat can split within a file, files are split
  by that format (deep split); otherwise whole files are grouped into roots
  by their sizes (shallow split).

  Args:
    filenames: input filenames.
    format_string: format string from user.
    shards: number of shards to split inputs. Must be positive.

  Returns:
    A list of FileFormatRoot, or None if all input files have zero bytes
    (this includes the case where filenames is empty).

  Raises:
    ValueError: if shards is not positive. Without this check a zero value
      fails with ZeroDivisionError, and a negative value makes every root
      exceed its (negative) size budget immediately.
  """
  if shards <= 0:
    raise ValueError('shards must be positive, got %r' % (shards,))
  parsed_formats = parser.parse(format_string)
  sizes = [files.stat(filename).st_size for filename in filenames]
  size_per_shard = float(sum(sizes)) / shards
  # A zero budget means there is nothing to read at all.
  if not size_per_shard:
    return None
  if parsed_formats[0].can_split():
    return _deep_split(filenames, size_per_shard, parsed_formats)
  return _shallow_split(filenames, size_per_shard, parsed_formats, sizes)
def _shallow_split(filenames, size_per_shard, parsed_formats, sizes):
  """Group whole files into roots using only their top-level sizes.

  Files are never divided: each one becomes a _FileRange with no range, and
  consecutive files are packed into the same root until the accumulated
  size reaches size_per_shard.

  Args:
    filenames: a list of input filenames.
    size_per_shard: target number of bytes per root.
    parsed_formats: the parsed FileFormats; every root gets a deep copy.
    sizes: byte size of each file, parallel to filenames.

  Returns:
    A list of FileFormatRoot.
  """
  def make_root(file_ranges):
    # Each root owns an independent copy of the formats.
    return FileFormatRoot(copy.deepcopy(parsed_formats), file_ranges)

  result = []
  pending = []
  accumulated = 0
  for position, file_size in enumerate(sizes):
    pending.append(_FileRange(filenames[position], None))
    accumulated += file_size
    if accumulated < size_per_shard:
      continue
    # This root has reached its quota; start collecting a fresh one.
    result.append(make_root(pending))
    pending, accumulated = [], 0
  # Files left over after the last full root still need a home.
  if pending:
    result.append(make_root(pending))
  return result
def _deep_split(filenames, size_per_shard, parsed_formats):
  """Split files into roots using the first FileFormat.

  Deep split can split within a file. It tells the first format how big
  a split it wants, and the first format does the actual splitting
  because only the first format knows how to operate on this particular
  format.

  Args:
    filenames: a list of input filenames.
    size_per_shard: size per shard.
    parsed_formats: the parsed FileFormats. Only parsed_formats[0] is asked
      to split; every root receives a deep copy of the whole list.

  Returns:
    A list of FileFormatRoot.
  """
  roots = []
  inputs = []
  # Remaining size budget for the root currently being filled.
  size_left = size_per_shard
  for filename in filenames:
    index = 0
    with files.open(filename) as f:
      # Dict passed to every split() call on this file; presumably lets the
      # format keep state between calls (its contents are up to the format).
      cache_for_split = {}
      while True:
        # The current root's budget is used up: seal it and start a new one
        # before asking the format for more ranges.
        if size_left <= 0:
          roots.append(FileFormatRoot(copy.deepcopy(parsed_formats), inputs))
          size_left = size_per_shard
          inputs = []
        start_index = index
        # split() hands back the updated budget and the end index of the
        # range it covered; [start_index, index) becomes one _FileRange.
        size_left, index = parsed_formats[0].split(size_left,
                                                   start_index,
                                                   f,
                                                   cache_for_split)
        # No progress is treated as the end of this file.
        if start_index == index:
          break
        inputs.append(_FileRange(filename, (start_index, index)))
  # Ranges that did not fill a whole root still form a final root.
  if inputs:
    roots.append(FileFormatRoot(copy.deepcopy(parsed_formats), inputs))
  return roots
class _FileRange(json_util.JsonMixin):
  """An input file, or a slice of one, assigned to a FileFormatRoot.

  Instances are built by _shallow_split and _deep_split, and are rebuilt by
  FileFormatRoot.from_json when a root is deserialized.
  """

  FILENAME = 'filename'
  RANGE = 'range'

  def __init__(self, filename, file_range=None):
    """Init.

    Args:
      filename: name of the file to read, as a str.
      file_range: a [start_index, end_index) pair restricting what is read
        from the file, or None to read the entire file. This only makes
        sense for FileFormats that support splitting within a file, and the
        meaning of the indices differs per format: for a zip file they
        select member files, for a record file they select records.
    """
    self.range = file_range
    self.filename = filename

  def to_json(self):
    """Serialize to a dict keyed by FILENAME and RANGE."""
    serialized = {}
    serialized[self.FILENAME] = self.filename
    serialized[self.RANGE] = self.range
    return serialized

  @classmethod
  def from_json(cls, json):
    """Rebuild a _FileRange from a dict produced by to_json."""
    filename = json[cls.FILENAME]
    file_range = json[cls.RANGE]
    return cls(filename, file_range)
class FileFormatRoot(json_util.JsonMixin):
  """FileFormatRoot.

  FileFormatRoot takes a list of FileFormats as processing units and
  a list of _FileRanges as inputs. It provides an interface to
  iterate through all the inputs. All inputs will be processed by each
  processing unit in a cascaded manner before being emitted.

  The order of the list of FileFormats matters. The last
  FileFormat's output is returned by FileFormatRoot.
  Each FileFormat asks FileFormatRoot for inputs, which are either outputs
  from its previous FileFormat or, in the case of the first FileFormat,
  outputs directly from FileFormatRoot.

  FileFormats don't know each other. FileFormatRoot coordinates all
  their initializations, (de)serialization, and communications.
  """

  _INPUTS = 'inputs'
  _FORMATS = 'formats'
  _FILES_STREAMS = 'files_streams'

  def __init__(self, formats, inputs, files_streams_json=None):
    """Init.

    Args:
      formats: A list of _FileFormats.
      inputs: A list of _FileRanges.
      files_streams_json: optional list of serialized files streams, one per
        format in the same order, as produced by to_json(). When given, each
        format's input stream is restored from it; otherwise fresh streams
        are created.
    """
    self._inputs = inputs
    self._formats = formats
    # Streams are wired in order: the first format reads from the root's
    # inputs, and every later format reads from the next() of the format
    # before it. Creating a stream immediately fetches its first input, so
    # this loop must go front to back.
    for i, file_format in enumerate(self._formats):
      stream_cls = _RootFilesStream if i == 0 else _FilesStream
      if files_streams_json:
        file_format._input_files_stream = stream_cls.from_json(
            files_streams_json[i], self)
      else:
        file_format._input_files_stream = stream_cls(i, self)

  def __repr__(self):
    return str(self.to_json())

  def __iter__(self):
    return self

  def to_json(self):
    """Serialize inputs, formats and each format's input stream to a dict."""
    return {
        self._INPUTS: [file_range.to_json() for file_range in self._inputs],
        self._FORMATS: [fmt.to_json() for fmt in self._formats],
        self._FILES_STREAMS: [fmt._input_files_stream.to_json()
                              for fmt in self._formats],
    }

  @classmethod
  def from_json(cls, json):
    """Rebuild a FileFormatRoot from a dict produced by to_json.

    Each format's class is looked up in file_formats.FORMATS by the format
    name stored in its serialized form.
    """
    formats = [
        file_formats.FORMATS[format_json[file_formats.FileFormat._FORMAT]]
        .from_json(format_json)
        for format_json in json[cls._FORMATS]]
    inputs = [_FileRange.from_json(range_json)
              for range_json in json[cls._INPUTS]]
    return cls(formats, inputs, json[cls._FILES_STREAMS])

  def next(self):
    """Iterate over inputs.

    Returns the next output of the last FileFormat, then checkpoints that
    format's input stream and the format itself.
    """
    result = self._formats[-1].next()
    self._formats[-1]._input_files_stream.checkpoint()
    self._formats[-1].checkpoint()
    return result
class _FilesStream(object):
  """Feed a FileFormat a sequence of file-like objects to read from.

  Every file obtained from the upstream source goes through the owning
  format's preprocess() before it is exposed as `current`.

  Attributes:
    current: the file-like object currently being read.
  """

  PREVIOUS_OFFSET = 'previous'
  INDEX = 'index'

  def __init__(self,
               index,
               file_format_root,
               offset=0,
               next_func=None):
    """Init.

    Args:
      index: position of this stream's FileFormat within file_format_root.
      file_format_root: the FileFormatRoot this stream should talk to.
      offset: position to seek to in the first file.
      next_func: callable returning the next file. If not given, the next()
        method of the FileFormat at index - 1 is used.
    """
    owner_formats = file_format_root._formats
    if not next_func:
      next_func = owner_formats[index - 1].next
    self._next_file = next_func
    self._preprocess = owner_formats[index].preprocess
    self._previous_offset = offset
    self._index = index
    # The first file is fetched eagerly and positioned at the saved offset.
    first_file = self._next_file()
    self._current = self._preprocess(first_file)
    self._current.seek(offset)

  def advance(self):
    """Move `current` on to the next file-like object.

    Call this once `current` has been fully consumed. The consumed file is
    closed, and the saved offset is reset to the start of the new file.
    """
    self._previous_offset = 0
    self._current.close()
    upcoming = self._next_file()
    self._current = self._preprocess(upcoming)

  @property
  def current(self):
    return self._current

  def checkpoint(self):
    """Remember the current read position of `current` for serialization."""
    self._previous_offset = self._current.tell()

  def to_json(self):
    """Serialize the saved offset and the stream index."""
    state = {}
    state[self.PREVIOUS_OFFSET] = self._previous_offset
    state[self.INDEX] = self._index
    return state

  @classmethod
  def from_json(cls, json, file_format_root):
    """Recreate a stream from to_json output, resuming at the saved offset."""
    stream_index = json[cls.INDEX]
    saved_offset = json[cls.PREVIOUS_OFFSET]
    return cls(stream_index, file_format_root, saved_offset)
class _RootFilesStream(_FilesStream):
  """Special FilesStream for the first FileFormat.

  Instead of pulling files from a preceding FileFormat, this stream opens
  the FileFormatRoot's _FileRange inputs one after another.
  """

  PREVIOUS_INPUT_INDEX = 'input_index'

  def __init__(self,
               index,
               file_format_root,
               offset=0,
               input_index=0):
    """Init.

    Args:
      index: the index of this stream within the FileFormatRoot.
      file_format_root: the FileFormatRoot this stream should talk to.
      offset: the offset to start reading the first opened file at.
      input_index: index into the root's inputs of the next file to open.
    """
    self.__inputs = file_format_root._inputs
    self.__input_index = input_index
    # Index of the input currently being read. This is what gets
    # serialized, so a restored stream reopens the same file and seeks back.
    self.__previous_input_index = input_index
    self.__file_format_root = file_format_root
    # All of the above must exist before the base initializer runs, because
    # it calls next_file() right away to open the first file.
    super(_RootFilesStream, self).__init__(index,
                                           file_format_root,
                                           offset,
                                           self.next_file)

  def next_file(self):
    """Open the next input file, raising StopIteration when none remain.

    When the input carries a range, the range is stored on the first
    FileFormat as its _range. A ValueError is raised if that format cannot
    split.
    """
    position = self.__input_index
    if position == len(self.__inputs):
      raise StopIteration()
    current_input = self.__inputs[position]
    if current_input.range:
      head_format = self.__file_format_root._formats[0]
      if not head_format.can_split():
        raise ValueError('Input range specified for a non splitable format %s'
                         % head_format.NAME)
      head_format._range = current_input.range
    self.__previous_input_index = position
    self.__input_index = position + 1
    return files.open(current_input.filename, 'r', buffering=-1)

  def to_json(self):
    """Extend the base serialization with the index of the current input."""
    state = super(_RootFilesStream, self).to_json()
    state[self.PREVIOUS_INPUT_INDEX] = self.__previous_input_index
    return state

  @classmethod
  def from_json(cls, json, file_format_root):
    """Recreate the stream, reopening the saved input at the saved offset."""
    stream_index = json[cls.INDEX]
    saved_offset = json[cls.PREVIOUS_OFFSET]
    saved_input = json[cls.PREVIOUS_INPUT_INDEX]
    return cls(stream_index, file_format_root, saved_offset, saved_input)