blob: 3323cc3b52c1912827465708d4cb9ffaefae8711 [file] [log] [blame]
# Copyright 2017 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import collections
import threading
import time
class LoggerMock(object):
def __init__(self, test_lib):
self.test_lib = test_lib
self.runtimes = collections.defaultdict(list)
self.exit_codes = collections.defaultdict(list)
self.last_execution_times = collections.defaultdict(list)
self.execution_numbers = collections.defaultdict(list)
def log_exit(self, task):
self.runtimes[task.test_id].append(task.runtime_ms)
self.exit_codes[task.test_id].append(task.exit_code)
self.last_execution_times[task.test_id].append(task.last_execution_time)
self.execution_numbers[task.test_id].append(task.execution_number)
def assertRecorded(self, test_id, expected, retries):
self.test_lib.assertIn(test_id, self.runtimes)
self.test_lib.assertListEqual(expected['runtime_ms'][:retries],
self.runtimes[test_id])
self.test_lib.assertListEqual(expected['exit_code'][:retries],
self.exit_codes[test_id])
self.test_lib.assertListEqual(expected['last_execution_time'][:retries],
self.last_execution_times[test_id])
self.test_lib.assertListEqual(expected['execution_number'][:retries],
self.execution_numbers[test_id])
class TestTimesMock(object):
def __init__(self, test_lib, test_data=None):
self.test_lib = test_lib
self.test_data = test_data or {}
self.last_execution_times = collections.defaultdict(list)
def record_test_time(self, test_binary, test_name, last_execution_time):
test_id = (test_binary, test_name)
self.last_execution_times[test_id].append(last_execution_time)
def get_test_time(self, test_binary, test_name):
test_group, test = test_name.split('.')
return self.test_data.get(
test_binary, {}).get(test_group, {}).get(test, None)
def assertRecorded(self, test_id, expected, retries):
self.test_lib.assertIn(test_id, self.last_execution_times)
self.test_lib.assertListEqual(expected['last_execution_time'][:retries],
self.last_execution_times[test_id])
class TestResultsMock(object):
def __init__(self, test_lib):
self.results = []
self.test_lib = test_lib
def log(self, test_name, runtime_ms, actual_result):
self.results.append((test_name, runtime_ms, actual_result))
def assertRecorded(self, test_id, expected, retries):
test_results = [
(test_id[1], runtime_ms, 'PASS' if exit_code == 0 else 'FAIL')
for runtime_ms, exit_code in zip(expected['runtime_ms'][:retries],
expected['exit_code'][:retries])
]
for test_result in test_results:
self.test_lib.assertIn(test_result, self.results)
class TaskManagerMock(object):
def __init__(self):
self.running_groups = []
self.check_lock = threading.Lock()
self.had_running_parallel_groups = False
self.total_tasks_run = 0
def run_task(self, task):
test_group = task.test_name.split('.')[0]
with self.check_lock:
self.total_tasks_run += 1
if test_group in self.running_groups:
self.had_running_parallel_groups = True
self.running_groups.append(test_group)
# Delay as if real test were run.
time.sleep(0.001)
with self.check_lock:
self.running_groups.remove(test_group)
class TaskMockFactory(object):
def __init__(self, test_data):
self.data = test_data
self.passed = []
self.failed = []
def get_task(self, test_id, execution_number=0):
task = TaskMock(test_id, execution_number, self.data[test_id])
if task.exit_code == 0:
self.passed.append(task)
else:
self.failed.append(task)
return task
def __call__(self, test_binary, test_name, test_command, execution_number,
last_execution_time, output_dir):
return self.get_task((test_binary, test_name), execution_number)
class TaskMock(object):
def __init__(self, test_id, execution_number, test_data):
self.test_id = test_id
self.execution_number = execution_number
self.runtime_ms = test_data['runtime_ms'][execution_number]
self.exit_code = test_data['exit_code'][execution_number]
self.last_execution_time = (
test_data['last_execution_time'][execution_number])
if 'log_file' in test_data:
self.log_file = test_data['log_file'][execution_number]
else:
self.log_file = None
self.test_command = None
self.output_dir = None
self.test_binary = test_id[0]
self.test_name = test_id[1]
self.task_id = (test_id[0], test_id[1], execution_number)
def run(self):
pass
class SubprocessMock(object):
def __init__(self, test_data=None):
self._test_data = test_data
self.last_invocation = None
def __call__(self, command, **kwargs):
self.last_invocation = command
binary = command[0]
test_list = []
tests_for_binary = sorted(self._test_data.get(binary, {}).items())
for test_group, tests in tests_for_binary:
test_list.append(test_group + ".")
for test in sorted(tests):
test_list.append(" " + test)
return '\n'.join(test_list)