blob: 5b9e46ddc2975c6b54e670307c3717a64df97215 [file] [log] [blame]
# Copyright 2014 Dirk Pranke. All rights reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import io
import logging
import sys
from import _TeedStream
is_python3 = bool(sys.version_info.major == 3)
if is_python3: # pragma: python3
# redefining built-in 'unicode' pylint: disable=W0622
unicode = str
class FakeHost(object):
# "too many instance attributes" pylint: disable=R0902
# "redefining built-in" pylint: disable=W0622
# "unused arg" pylint: disable=W0613
python_interpreter = 'python'
is_python3 = bool(sys.version_info.major == 3)
def __init__(self):
self.logger = logging.getLogger()
self.stdin = io.StringIO()
self.stdout = io.StringIO()
self.stderr = io.StringIO()
self.platform = 'linux2'
self.env = {}
self.sep = '/'
self.dirs = set([])
self.files = {}
self.fetches = []
self.fetch_responses = {}
self.written_files = {}
self.last_tmpdir = None
self.current_tmpno = 0
self.mtimes = {}
self.cmds = []
self.cwd = '/tmp'
self._orig_logging_handlers = []
def __getstate__(self):
d = copy.copy(self.__dict__)
del d['stderr']
del d['stdout']
del d['stdin']
del d['logger']
del d['_orig_logging_handlers']
return d
def __setstate__(self, d):
for k, v in d.items():
setattr(self, k, v)
self.logger = logging.getLogger()
self.stdin = io.StringIO()
self.stdout = io.StringIO()
self.stderr = io.StringIO()
def abspath(self, *comps):
relpath = self.join(*comps)
if relpath.startswith('/'):
return relpath
return self.join(self.cwd, relpath)
def add_to_path(self, *comps):
absolute_path = self.abspath(*comps)
if absolute_path not in sys.path:
def basename(self, path):
return path.split(self.sep)[-1]
def call(self, argv, stdin=None, env=None):
return 0, '', ''
def call_inline(self, argv):
def chdir(self, *comps):
path = self.join(*comps)
if not path.startswith('/'):
path = self.join(self.cwd, path)
self.cwd = path
def cpu_count(self):
return 1
def dirname(self, path):
return '/'.join(path.split('/')[:-1])
def exists(self, *comps):
path = self.abspath(*comps)
return ((path in self.files and self.files[path] is not None) or
path in self.dirs)
def files_under(self, top):
files = []
top = self.abspath(top)
for f in self.files:
if self.files[f] is not None and f.startswith(top):
files.append(self.relpath(f, top))
return files
def for_mp(self):
return self
def getcwd(self):
return self.cwd
def getenv(self, key, default=None):
return self.env.get(key, default)
def getpid(self):
return 1
def isdir(self, *comps):
path = self.abspath(*comps)
return path in self.dirs
def isfile(self, *comps):
path = self.abspath(*comps)
return path in self.files and self.files[path] is not None
def join(self, *comps):
p = ''
for c in comps:
if c in ('', '.'):
elif c.startswith('/'):
p = c
elif p:
p += '/' + c
p = c
# Handle ./
p = p.replace('/./', '/')
# Handle ../
while '/..' in p:
comps = p.split('/')
idx = comps.index('..')
comps = comps[:idx-1] + comps[idx+1:]
p = '/'.join(comps)
return p
def maybe_mkdir(self, *comps):
path = self.abspath(self.join(*comps))
if path not in self.dirs:
def mktempfile(self, delete=True):
curno = self.current_tmpno
self.current_tmpno += 1
f = io.StringIO() = '__im_tmp/tmpfile_%u' % curno
return f
def mkdtemp(self, suffix='', prefix='tmp', dir=None, **_kwargs):
if dir is None:
dir = self.sep + '__im_tmp'
curno = self.current_tmpno
self.current_tmpno += 1
self.last_tmpdir = self.join(dir, '%s_%u_%s' % (prefix, curno, suffix))
return self.last_tmpdir
def mtime(self, *comps):
return self.mtimes.get(self.join(*comps), 0)
def print_(self, msg='', end='\n', stream=None):
stream = stream or self.stdout
stream.write(msg + end)
def read_binary_file(self, *comps):
return self._read(comps)
def read_text_file(self, *comps):
return self._read(comps)
def _read(self, comps):
return self.files[self.abspath(*comps)]
def realpath(self, *comps):
return self.abspath(*comps)
def relpath(self, path, start):
return path.replace(start + '/', '')
def remove(self, *comps):
path = self.abspath(*comps)
self.files[path] = None
self.written_files[path] = None
def rmtree(self, *comps):
path = self.abspath(*comps)
for f in self.files:
if f.startswith(path):
self.files[f] = None
self.written_files[f] = None
def terminal_width(self):
return 80
def splitext(self, path):
idx = path.rfind('.')
if idx == -1:
return (path, '')
return (path[:idx], path[idx:])
def time(self):
return 0
def write_binary_file(self, path, contents):
self._write(path, contents)
def write_text_file(self, path, contents):
self._write(path, contents)
def _write(self, path, contents):
full_path = self.abspath(path)
self.files[full_path] = contents
self.written_files[full_path] = contents
def fetch(self, url, data=None, headers=None):
resp = self.fetch_responses.get(url, FakeResponse(unicode(''), url))
self.fetches.append((url, data, headers, resp))
return resp
def _tap_output(self):
self.stdout = _TeedStream(self.stdout)
self.stderr = _TeedStream(self.stderr)
if True:
sys.stdout = self.stdout
sys.stderr = self.stderr
def _untap_output(self):
assert isinstance(self.stdout, _TeedStream)
self.stdout =
self.stderr =
if True:
sys.stdout = self.stdout
sys.stderr = self.stderr
def capture_output(self, divert=True):
self._orig_logging_handlers = self.logger.handlers
if self._orig_logging_handlers:
self.logger.handlers = [logging.StreamHandler(self.stderr)]
def restore_output(self):
assert isinstance(self.stdout, _TeedStream)
out, err = (self.stdout.restore(), self.stderr.restore())
self.logger.handlers = self._orig_logging_handlers
return out, err
class FakeResponse(io.StringIO):
def __init__(self, response, url, code=200):
io.StringIO.__init__(self, response)
self._url = url
self.code = code
def geturl(self):
return self._url
def getcode(self):
return self.code