blob: df03968cda7769c1615b50c9f6c7e9f0cf5db1b3 [file] [log] [blame]
# Lint as: python2, python3
# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import logging
import unittest
import re
import csv
import common
import os
from autotest_lib.server.cros import resource_monitor
from autotest_lib.server.hosts import abstract_ssh
from autotest_lib.server import utils
import six
from six.moves import map
from six.moves import range
class HostMock(abstract_ssh.AbstractSSHHost):
"""Mocks a host object."""
TOP_PID = '1234'
def _initialize(self, test_env):
self.top_is_running = False
# Keep track of whether the top raw output file exists on the system,
# and if it does, where it is.
self.top_output_file_path = None
# Keep track of whether the raw top output file is currently being
# written to by top.
self.top_output_file_is_open = False
self.test_env = test_env
def get_file(self, src, dest):
pass
def called_unsupported_command(self, command):
"""Raises assertion error when called.
@param command string the unsupported command called.
"""
raise AssertionError(
"ResourceMonitor called unsupported command %s" % command)
def _process_top(self, cmd_args, cmd_line):
"""Process top command.
@param cmd_args string_list of command line args.
@param cmd_line string the command to run.
"""
self.test_env.assertFalse(self.top_is_running,
msg="Top must not already be running.")
self.test_env.assertFalse(self.top_output_file_is_open,
msg="The top output file should not be being written "
"to before top is started")
self.test_env.assertIsNone(self.top_output_file_path,
msg="The top output file should not exist"
"before top is started")
try:
self.redirect_index = cmd_args.index(">")
self.top_output_file_path = cmd_args[self.redirect_index + 1]
except (ValueError, IndexError):
self.called_unsupported_command(cmd_line)
self.top_is_running = True
self.top_output_file_is_open = True
return HostMock.TOP_PID
def _process_kill(self, cmd_args, cmd_line):
"""Process kill command.
@param cmd_args string_list of command line args.
@param cmd_line string the command to run.
"""
try:
if cmd_args[1].startswith('-'):
pid_to_kill = cmd_args[2]
else:
pid_to_kill = cmd_args[1]
except IndexError:
self.called_unsupported_command(cmd_line)
self.test_env.assertEqual(pid_to_kill, HostMock.TOP_PID,
msg="Wrong pid (%r) killed . Top pid is %r." % (pid_to_kill,
HostMock.TOP_PID))
self.test_env.assertTrue(self.top_is_running,
msg="Top must be running before we try to kill it")
self.top_is_running = False
self.top_output_file_is_open = False
def _process_rm(self, cmd_args, cmd_line):
"""Process rm command.
@param cmd_args string list list of command line args.
@param cmd_line string the command to run.
"""
try:
if cmd_args[1].startswith('-'):
file_to_rm = cmd_args[2]
else:
file_to_rm = cmd_args[1]
except IndexError:
self.called_unsupported_command(cmd_line)
self.test_env.assertEqual(file_to_rm, self.top_output_file_path,
msg="Tried to remove file that is not the top output file.")
self.test_env.assertFalse(self.top_output_file_is_open,
msg="Tried to remove top output file while top is still "
"writing to it.")
self.test_env.assertFalse(self.top_is_running,
msg="Top was still running when we tried to remove"
"the top output file.")
self.test_env.assertIsNotNone(self.top_output_file_path)
self.top_output_file_path = None
def _run_single_cmd(self, cmd_line, *args, **kwargs):
"""Run a single command on host.
@param cmd_line command to run on the host.
"""
# Make the input a little nicer.
cmd_line = cmd_line.strip()
cmd_line = re.sub(">", " > ", cmd_line)
cmd_args = re.split("\s+", cmd_line)
self.test_env.assertTrue(len(cmd_args) >= 1)
command = cmd_args[0]
if (command == "top"):
return self._process_top(cmd_args, cmd_line)
elif (command == "kill"):
return self._process_kill(cmd_args, cmd_line)
elif(command == "rm"):
return self._process_rm(cmd_args, cmd_line)
else:
logging.warning("Called unemulated command %r", cmd_line)
return None
def run(self, cmd_line, *args, **kwargs):
"""Run command(s) on host.
@param cmd_line command to run on the host.
@return CmdResult object.
"""
cmds = re.split("&&", cmd_line)
for cmd in cmds:
self._run_single_cmd(cmd)
return utils.CmdResult(exit_status=0)
def run_background(self, cmd_line, *args, **kwargs):
"""Run command in background on host.
@param cmd_line command to run on the host.
"""
return self._run_single_cmd(cmd_line, args, kwargs)
def is_monitoring(self):
"""Return true iff host is currently running top and writing output
to a file.
"""
return self.top_is_running and self.top_output_file_is_open and (
self.top_output_file_path is not None)
def monitoring_stopped(self):
"""Return true iff host is not running top and all top output files are
closed.
"""
return not self.is_monitoring()
class ResourceMonitorTest(unittest.TestCase):
"""Tests the non-trivial functionality of ResourceMonitor."""
def setUp(self):
self.topoutfile = '/tmp/resourcemonitorunittest-1234'
self.monitor_period = 1
self.rm_conf = resource_monitor.ResourceMonitorConfig(
monitor_period=self.monitor_period,
rawresult_output_filename=self.topoutfile)
self.host = HostMock(self)
def test_normal_operation(self):
"""Checks that normal (i.e. no exceptions, etc.) execution works."""
self.assertFalse(self.host.is_monitoring())
with resource_monitor.ResourceMonitor(self.host, self.rm_conf) as rm:
self.assertFalse(self.host.is_monitoring())
for i in range(3):
rm.start()
self.assertTrue(self.host.is_monitoring())
rm.stop()
self.assertTrue(self.host.monitoring_stopped())
self.assertTrue(self.host.monitoring_stopped())
def test_forgot_to_stop_monitor(self):
"""Checks that resource monitor is cleaned up even if user forgets to
explicitly stop it.
"""
self.assertFalse(self.host.is_monitoring())
with resource_monitor.ResourceMonitor(self.host, self.rm_conf) as rm:
self.assertFalse(self.host.is_monitoring())
rm.start()
self.assertTrue(self.host.is_monitoring())
self.assertTrue(self.host.monitoring_stopped())
def test_unexpected_interruption_while_monitoring(self):
"""Checks that monitor is cleaned up upon unexpected interrupt."""
self.assertFalse(self.host.is_monitoring())
with resource_monitor.ResourceMonitor(self.host, self.rm_conf) as rm:
self.assertFalse(self.host.is_monitoring())
rm.start()
self.assertTrue(self.host.is_monitoring())
raise KeyboardInterrupt
self.assertTrue(self.host.monitoring_stopped())
class ResourceMonitorResultTest(unittest.TestCase):
"""Functional tests for ResourceMonitorParsedResult."""
def setUp(self):
self._res_dir = os.path.join(
os.path.dirname(os.path.realpath(__file__)),
'res_resource_monitor')
def run_with_test_data(self, testdata_file, testans_file):
"""Parses testdata_file with the parses, and checks that results
are the same as those in testans_file.
@param testdata_file string filename containing top output to test.
@param testans_file string filename containing answers to the test.
"""
parsed_results = resource_monitor.ResourceMonitorParsedResult(
testdata_file)
with open(testans_file, "rb") as testans:
csvreader = csv.reader(testans)
columns = next(csvreader)
self.assertEqual(list(columns),
resource_monitor.ResourceMonitorParsedResult._columns)
utils_over_time = []
for util_val in map(
resource_monitor.
ResourceMonitorParsedResult.UtilValues._make,
csvreader):
utils_over_time.append(util_val)
self.assertEqual(utils_over_time, parsed_results._utils_over_time)
def test_full_data(self):
"""General test with many possible changes to input."""
self.run_with_test_data(
os.path.join(self._res_dir, 'top_test_data.txt'),
os.path.join(self._res_dir, 'top_test_data_ans.csv'))
def test_whitespace_ridden(self):
"""Tests resilience to arbitrary whitespace characters between fields"""
self.run_with_test_data(
os.path.join(self._res_dir, 'top_whitespace_ridden.txt'),
os.path.join(self._res_dir, 'top_whitespace_ridden_ans.csv'))
def test_field_order_changed(self):
"""Tests resilience to changes in the order of fields
(for e.g, if the Mem free/used fields change orders in the input).
"""
self.run_with_test_data(
os.path.join(self._res_dir, 'top_field_order_changed.txt'),
os.path.join(self._res_dir, 'top_field_order_changed_ans.csv'))
if __name__ == '__main__':
unittest.main()