blob: 482be75c9ac837f143d8747a34ddb3bee7306101 [file] [log] [blame]
#!/usr/bin/env python
# Copyright (c) 2012 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import json
import logging
import os
import shutil
import subprocess
import sys
import tempfile
import unittest
ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, ROOT_DIR)
import run_test_cases
FILENAME = os.path.basename(__file__)
REL_DATA = os.path.join(u'tests', 'trace_inputs')
VERBOSE = False
class CalledProcessError(subprocess.CalledProcessError):
"""Makes 2.6 version act like 2.7"""
def __init__(self, returncode, cmd, output, cwd):
super(CalledProcessError, self).__init__(returncode, cmd)
self.output = output
self.cwd = cwd
def __str__(self):
return super(CalledProcessError, self).__str__() + (
'\n'
'cwd=%s\n%s') % (self.cwd, self.output)
class TraceInputsBase(unittest.TestCase):
def setUp(self):
self.tempdir = tempfile.mkdtemp(prefix='trace_smoke_test')
self.log = os.path.join(self.tempdir, 'log')
self.trace_inputs_path = os.path.join(ROOT_DIR, 'trace_inputs.py')
# Wraps up all the differences between OSes here.
# - Windows doesn't track initial_cwd.
# - OSX replaces /usr/bin/python with /usr/bin/python2.7.
self.cwd = os.path.join(ROOT_DIR, u'tests')
self.initial_cwd = unicode(self.cwd)
self.expected_cwd = unicode(ROOT_DIR)
if sys.platform == 'win32':
# Not supported on Windows.
self.initial_cwd = None
self.expected_cwd = None
# There's 3 kinds of references to python, self.executable,
# self.real_executable and self.naked_executable. It depends how python was
# started.
self.executable = sys.executable
if sys.platform == 'darwin':
# /usr/bin/python is a thunk executable that decides which version of
# python gets executed.
suffix = '.'.join(map(str, sys.version_info[0:2]))
if os.access(self.executable + suffix, os.X_OK):
# So it'll look like /usr/bin/python2.7
self.executable += suffix
import trace_inputs
self.real_executable = trace_inputs.get_native_path_case(
unicode(self.executable))
trace_inputs = None
# self.naked_executable will only be naked on Windows.
self.naked_executable = unicode(sys.executable)
if sys.platform == 'win32':
self.naked_executable = os.path.basename(sys.executable)
def tearDown(self):
if VERBOSE:
print 'Leaking: %s' % self.tempdir
else:
shutil.rmtree(self.tempdir)
@staticmethod
def get_child_command(from_data):
"""Returns command to run the child1.py."""
cmd = [sys.executable]
if from_data:
# When the gyp argument is specified, the command is started from --cwd
# directory. In this case, 'tests'.
cmd.extend([os.path.join('trace_inputs', 'child1.py'), '--child-gyp'])
else:
# When the gyp argument is not specified, the command is started from
# --root-dir directory.
cmd.extend([os.path.join(REL_DATA, 'child1.py'), '--child'])
return cmd
@staticmethod
def _size(*args):
return os.stat(os.path.join(ROOT_DIR, *args)).st_size
class TraceInputs(TraceInputsBase):
def _execute(self, mode, command, cwd):
cmd = [
sys.executable,
self.trace_inputs_path,
mode,
'--log', self.log,
]
if VERBOSE:
cmd.extend(['-v'] * 3)
cmd.extend(command)
logging.info('Command: %s' % ' '.join(cmd))
p = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
cwd=cwd,
universal_newlines=True)
out, err = p.communicate()
if VERBOSE:
print err
if p.returncode:
raise CalledProcessError(p.returncode, cmd, out + err, cwd)
return out or ''
def _trace(self, from_data):
if from_data:
cwd = os.path.join(ROOT_DIR, 'tests')
else:
cwd = ROOT_DIR
return self._execute('trace', self.get_child_command(from_data), cwd=cwd)
def test_trace(self):
expected = '\n'.join((
'Total: 7',
'Non existent: 0',
'Interesting: 7 reduced to 6',
' tests/trace_inputs/child1.py'.replace('/', os.path.sep),
' tests/trace_inputs/child2.py'.replace('/', os.path.sep),
' tests/trace_inputs/files1/'.replace('/', os.path.sep),
' tests/trace_inputs/test_file.txt'.replace('/', os.path.sep),
(' tests/%s' % FILENAME).replace('/', os.path.sep),
' trace_inputs.py',
)) + '\n'
trace_expected = '\n'.join((
'child from %s' % ROOT_DIR,
'child2',
)) + '\n'
trace_actual = self._trace(False)
actual = self._execute(
'read',
[
'--root-dir', ROOT_DIR,
'--blacklist', '.+\\.pyc',
'--blacklist', '.*\\.svn',
'--blacklist', '.*do_not_care\\.txt',
],
cwd=ROOT_DIR)
self.assertEquals(expected, actual)
self.assertEquals(trace_expected, trace_actual)
def test_trace_json(self):
expected = {
u'root': {
u'children': [
{
u'children': [],
u'command': [u'python', u'child2.py'],
u'executable': self.naked_executable,
u'files': [
{
u'path': os.path.join(REL_DATA, 'child2.py'),
u'size': self._size(REL_DATA, 'child2.py'),
},
{
u'path': os.path.join(REL_DATA, 'files1', 'bar'),
u'size': self._size(REL_DATA, 'files1', 'bar'),
},
{
u'path': os.path.join(REL_DATA, 'files1', 'foo'),
u'size': self._size(REL_DATA, 'files1', 'foo'),
},
{
u'path': os.path.join(REL_DATA, 'test_file.txt'),
u'size': self._size(REL_DATA, 'test_file.txt'),
},
],
u'initial_cwd': self.initial_cwd,
#u'pid': 123,
},
],
u'command': [
unicode(self.executable),
os.path.join(u'trace_inputs', 'child1.py'),
u'--child-gyp',
],
u'executable': self.real_executable,
u'files': [
{
u'path': os.path.join(REL_DATA, 'child1.py'),
u'size': self._size(REL_DATA, 'child1.py'),
},
{
u'path': os.path.join(u'tests', u'trace_inputs_smoke_test.py'),
u'size': self._size('tests', 'trace_inputs_smoke_test.py'),
},
{
u'path': u'trace_inputs.py',
u'size': self._size('trace_inputs.py'),
},
],
u'initial_cwd': self.initial_cwd,
#u'pid': 123,
},
}
trace_expected = '\n'.join((
'child_gyp from %s' % os.path.join(ROOT_DIR, 'tests'),
'child2',
)) + '\n'
trace_actual = self._trace(True)
actual_text = self._execute(
'read',
[
'--root-dir', ROOT_DIR,
'--blacklist', '.+\\.pyc',
'--blacklist', '.*\\.svn',
'--blacklist', '.*do_not_care\\.txt',
'--json',
],
cwd=ROOT_DIR)
actual_json = json.loads(actual_text)
self.assertEquals(list, actual_json.__class__)
self.assertEquals(1, len(actual_json))
actual_json = actual_json[0]
# Removes the pids.
self.assertTrue(actual_json['root'].pop('pid'))
self.assertTrue(actual_json['root']['children'][0].pop('pid'))
self.assertEquals(expected, actual_json)
self.assertEquals(trace_expected, trace_actual)
class TraceInputsImport(TraceInputsBase):
def setUp(self):
super(TraceInputsImport, self).setUp()
import trace_inputs
self.trace_inputs = trace_inputs
def tearDown(self):
del self.trace_inputs
super(TraceInputsImport, self).tearDown()
# Similar to TraceInputs test fixture except that it calls the function
# directly, so the Results instance can be inspected.
# Roughly, make sure the API is stable.
def _execute_trace(self, command):
# Similar to what trace_test_cases.py does.
api = self.trace_inputs.get_api()
_, _ = self.trace_inputs.trace(
self.log, command, self.cwd, api, True)
# TODO(maruel): Check
#self.assertEquals(0, returncode)
#self.assertEquals('', output)
def blacklist(f):
return f.endswith(('.pyc', '.svn', 'do_not_care.txt'))
return self.trace_inputs.load_trace(self.log, ROOT_DIR, api, blacklist)
def _gen_dict_wrong_path(self):
"""Returns the expected flattened Results when child1.py is called with the
wrong relative path.
"""
return {
'root': {
'children': [],
'command': [
self.executable,
os.path.join(REL_DATA, 'child1.py'),
'--child',
],
'executable': self.real_executable,
'files': [],
'initial_cwd': self.initial_cwd,
},
}
def _gen_dict_full(self):
"""Returns the expected flattened Results when child1.py is called with
--child.
"""
return {
'root': {
'children': [
{
'children': [],
'command': ['python', 'child2.py'],
'executable': self.naked_executable,
'files': [
{
'path': os.path.join(REL_DATA, 'child2.py'),
'size': self._size(REL_DATA, 'child2.py'),
},
{
'path': os.path.join(REL_DATA, 'files1', 'bar'),
'size': self._size(REL_DATA, 'files1', 'bar'),
},
{
'path': os.path.join(REL_DATA, 'files1', 'foo'),
'size': self._size(REL_DATA, 'files1', 'foo'),
},
{
'path': os.path.join(REL_DATA, 'test_file.txt'),
'size': self._size(REL_DATA, 'test_file.txt'),
},
],
'initial_cwd': self.expected_cwd,
},
],
'command': [
self.executable,
os.path.join(REL_DATA, 'child1.py'),
'--child',
],
'executable': self.real_executable,
'files': [
{
'path': os.path.join(REL_DATA, 'child1.py'),
'size': self._size(REL_DATA, 'child1.py'),
},
{
u'path': os.path.join(u'tests', u'trace_inputs_smoke_test.py'),
'size': self._size('tests', 'trace_inputs_smoke_test.py'),
},
{
'path': u'trace_inputs.py',
'size': self._size('trace_inputs.py'),
},
],
'initial_cwd': self.expected_cwd,
},
}
def _gen_dict_full_gyp(self):
"""Returns the expected flattened results when child1.py is called with
--child-gyp.
"""
return {
'root': {
'children': [
{
'children': [],
'command': ['python', 'child2.py'],
'executable': self.naked_executable,
'files': [
{
'path': os.path.join(REL_DATA, 'child2.py'),
'size': self._size(REL_DATA, 'child2.py'),
},
{
'path': os.path.join(REL_DATA, 'files1', 'bar'),
'size': self._size(REL_DATA, 'files1', 'bar'),
},
{
'path': os.path.join(REL_DATA, 'files1', 'foo'),
'size': self._size(REL_DATA, 'files1', 'foo'),
},
{
'path': os.path.join(REL_DATA, 'test_file.txt'),
'size': self._size(REL_DATA, 'test_file.txt'),
},
],
'initial_cwd': self.initial_cwd,
},
],
'command': [
self.executable,
os.path.join('trace_inputs', 'child1.py'),
'--child-gyp',
],
'executable': self.real_executable,
'files': [
{
'path': os.path.join(REL_DATA, 'child1.py'),
'size': self._size(REL_DATA, 'child1.py'),
},
{
'path': os.path.join(u'tests', u'trace_inputs_smoke_test.py'),
'size': self._size('tests', 'trace_inputs_smoke_test.py'),
},
{
'path': u'trace_inputs.py',
'size': self._size('trace_inputs.py'),
},
],
'initial_cwd': self.initial_cwd,
},
}
def test_trace_wrong_path(self):
# Deliberately start the trace from the wrong path. Starts it from the
# directory 'tests' so 'tests/tests/trace_inputs/child1.py' is not
# accessible, so child2.py process is not started.
results = self._execute_trace(self.get_child_command(False))
expected = self._gen_dict_wrong_path()
actual = results.flatten()
self.assertTrue(actual['root'].pop('pid'))
self.assertEquals(expected, actual)
def test_trace(self):
expected = self._gen_dict_full_gyp()
results = self._execute_trace(self.get_child_command(True))
actual = results.flatten()
self.assertTrue(actual['root'].pop('pid'))
self.assertTrue(actual['root']['children'][0].pop('pid'))
self.assertEquals(expected, actual)
files = [
u'tests/trace_inputs/child1.py'.replace('/', os.path.sep),
u'tests/trace_inputs/child2.py'.replace('/', os.path.sep),
u'tests/trace_inputs/files1/'.replace('/', os.path.sep),
u'tests/trace_inputs/test_file.txt'.replace('/', os.path.sep),
u'tests/trace_inputs_smoke_test.py'.replace('/', os.path.sep),
u'trace_inputs.py',
]
def blacklist(f):
return f.endswith(('.pyc', 'do_not_care.txt', '.git', '.svn'))
simplified = self.trace_inputs.extract_directories(
ROOT_DIR, results.files, blacklist)
self.assertEquals(files, [f.path for f in simplified])
def test_trace_multiple(self):
# Starts parallel threads and trace parallel child processes simultaneously.
# Some are started from 'tests' directory, others from this script's
# directory. One trace fails. Verify everything still goes one.
parallel = 8
def trace(tracer, cmd, cwd, tracename):
resultcode, output = tracer.trace(
cmd, cwd, tracename, True)
return (tracename, resultcode, output)
with run_test_cases.ThreadPool(parallel) as pool:
api = self.trace_inputs.get_api()
with api.get_tracer(self.log) as tracer:
pool.add_task(
trace, tracer, self.get_child_command(False), ROOT_DIR, 'trace1')
pool.add_task(
trace, tracer, self.get_child_command(True), self.cwd, 'trace2')
pool.add_task(
trace, tracer, self.get_child_command(False), ROOT_DIR, 'trace3')
pool.add_task(
trace, tracer, self.get_child_command(True), self.cwd, 'trace4')
# Have this one fail since it's started from the wrong directory.
pool.add_task(
trace, tracer, self.get_child_command(False), self.cwd, 'trace5')
pool.add_task(
trace, tracer, self.get_child_command(True), self.cwd, 'trace6')
pool.add_task(
trace, tracer, self.get_child_command(False), ROOT_DIR, 'trace7')
pool.add_task(
trace, tracer, self.get_child_command(True), self.cwd, 'trace8')
trace_results = pool.join()
def blacklist(f):
return f.endswith(('.pyc', 'do_not_care.txt', '.git', '.svn'))
actual_results = api.parse_log(self.log, blacklist)
self.assertEquals(8, len(trace_results))
self.assertEquals(8, len(actual_results))
# Convert to dict keyed on the trace name, simpler to verify.
trace_results = dict((i[0], i[1:]) for i in trace_results)
actual_results = dict((x.pop('trace'), x) for x in actual_results)
self.assertEquals(sorted(trace_results), sorted(actual_results))
# It'd be nice to start different kinds of processes.
expected_results = [
self._gen_dict_full(),
self._gen_dict_full_gyp(),
self._gen_dict_full(),
self._gen_dict_full_gyp(),
self._gen_dict_wrong_path(),
self._gen_dict_full_gyp(),
self._gen_dict_full(),
self._gen_dict_full_gyp(),
]
self.assertEquals(len(expected_results), len(trace_results))
# See the comment above about the trace that fails because it's started from
# the wrong directory.
busted = 4
for index, key in enumerate(sorted(actual_results)):
self.assertEquals('trace%d' % (index + 1), key)
self.assertEquals(2, len(trace_results[key]))
# returncode
self.assertEquals(0 if index != busted else 2, trace_results[key][0])
# output
self.assertEquals(actual_results[key]['output'], trace_results[key][1])
self.assertEquals(['output', 'results'], sorted(actual_results[key]))
results = actual_results[key]['results']
results = results.strip_root(ROOT_DIR)
actual = results.flatten()
self.assertTrue(actual['root'].pop('pid'))
if index != busted:
self.assertTrue(actual['root']['children'][0].pop('pid'))
self.assertEquals(expected_results[index], actual)
if sys.platform != 'win32':
def test_trace_symlink(self):
expected = {
'root': {
'children': [],
'command': [
self.executable,
os.path.join('trace_inputs', 'symlink.py'),
],
'executable': self.real_executable,
'files': [
{
'path': os.path.join(REL_DATA, 'files2', 'bar'),
'size': self._size(REL_DATA, 'files2', 'bar'),
},
{
'path': os.path.join(REL_DATA, 'files2', 'foo'),
'size': self._size(REL_DATA, 'files2', 'foo'),
},
{
'path': os.path.join(REL_DATA, 'symlink.py'),
'size': self._size(REL_DATA, 'symlink.py'),
},
],
'initial_cwd': self.initial_cwd,
},
}
cmd = [sys.executable, os.path.join('trace_inputs', 'symlink.py')]
results = self._execute_trace(cmd)
actual = results.flatten()
self.assertTrue(actual['root'].pop('pid'))
self.assertEquals(expected, actual)
files = [
# In particular, the symlink is *not* resolved.
u'tests/trace_inputs/files2/'.replace('/', os.path.sep),
u'tests/trace_inputs/symlink.py'.replace('/', os.path.sep),
]
def blacklist(f):
return f.endswith(('.pyc', '.svn', 'do_not_care.txt'))
simplified = self.trace_inputs.extract_directories(
ROOT_DIR, results.files, blacklist)
self.assertEquals(files, [f.path for f in simplified])
def test_trace_quoted(self):
results = self._execute_trace([sys.executable, '-c', 'print("hi")'])
expected = {
'root': {
'children': [],
'command': [
self.executable,
'-c',
'print("hi")',
],
'executable': self.real_executable,
'files': [],
'initial_cwd': self.initial_cwd,
},
}
actual = results.flatten()
self.assertTrue(actual['root'].pop('pid'))
self.assertEquals(expected, actual)
def _touch_expected(self, command):
# Looks for file that were touched but not opened, using different apis.
results = self._execute_trace(
[sys.executable, os.path.join('trace_inputs', 'touch_only.py'), command])
expected = {
'root': {
'children': [],
'command': [
self.executable,
os.path.join('trace_inputs', 'touch_only.py'),
command,
],
'executable': self.real_executable,
'files': [
{
'path': os.path.join(REL_DATA, 'test_file.txt'),
'size': 0,
},
{
'path': os.path.join(REL_DATA, 'touch_only.py'),
'size': self._size(REL_DATA, 'touch_only.py'),
},
],
'initial_cwd': self.initial_cwd,
},
}
if sys.platform != 'linux2':
# TODO(maruel): Remove once properly implemented.
expected['root']['files'].pop(0)
actual = results.flatten()
self.assertTrue(actual['root'].pop('pid'))
self.assertEquals(expected, actual)
def test_trace_touch_only_access(self):
self._touch_expected('access')
def test_trace_touch_only_isfile(self):
self._touch_expected('isfile')
def test_trace_touch_only_stat(self):
self._touch_expected('stat')
if __name__ == '__main__':
VERBOSE = '-v' in sys.argv
logging.basicConfig(level=logging.DEBUG if VERBOSE else logging.ERROR)
unittest.main()