| #!/usr/bin/python |
| # Copyright (c) 2010 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| '''Unittest for cgpt_state module |
| |
| In this unit teset, cgpt_table is used as a template to manipulate the change |
| of gpt information. It is initialized as test_cgpt_handler.MOCKED_COMMANDS. |
| cgpt_table is dynamically changed by cgpt_handler.set_partition() over |
| the iterations of cgpt_state.test_loop() so that |
| cgpt_handler.get_device_info() can read kernel properties out of it. |
| |
| ''' |
| |
| import os |
| import re |
| import sys |
| import unittest |
| |
| import cgpt_handler |
| import cgpt_state |
| import test_cgpt_handler |
| |
| from saft_utility import BASE_STORAGE_DEVICE |
| |
| |
| # Global variables to be shared between TestCgptState and MockChromeosInterface |
| cgpt_table = test_cgpt_handler.MOCKED_COMMANDS |
| step = 0 |
| |
| |
| class MockChromeosInterface: |
| def __init__(self, cgpt_choice): |
| # The cgpt step file will be kept under /tmp during this unit test. |
| self.state_dir = '/tmp' |
| cgpt_state_seq_body = cgpt_state.CGPT_STATE_SEQ_BODY[cgpt_choice] |
| cgpt_state_seq_end = cgpt_state.CGPT_STATE_SEQ_END |
| self.cgpt_state_seq = cgpt_state_seq_body + cgpt_state_seq_end |
| self.POS_BOOT_VEC = cgpt_state.CgptState.PARA_POS['BOOT_VEC'] |
| self.POS_EXPECTED = cgpt_state.CgptState.PARA_POS['EXPECTED'] |
| |
| def state_dir_file(self, file_name): |
| return os.path.join(self.state_dir, file_name) |
| |
| def log(self, msg): |
| pass |
| |
| def boot_state_vector(self): |
| '''Mocked to read boot vector from cgpt_state_seq directly.''' |
| global step |
| return self.cgpt_state_seq[step - 1][self.POS_BOOT_VEC] |
| |
| def run_shell_command_get_output(self, command): |
| '''Mocked to return device info for cgpt_handler.get_device_info()''' |
| return [x.rstrip() for x in cgpt_table[command].split('\n')] |
| |
| def run_shell_command(self, command): |
| '''Mocked to perform cgpt_handler.set_partition on cgpt_table()''' |
| global step |
| ATTR_NAME = cgpt_state.CgptState.PROP_NAME |
| PART_POS = {2:0, 4:1} |
| attr_dict = {} |
| # extract partition id from the command line |
| part = re.search(r'-i \d+', command) |
| if part: |
| part_id = int(part.group().split()[1]) |
| # Extract attribute values from cgpt_state_seq |
| expected_props = self.cgpt_state_seq[step][self.POS_EXPECTED] |
| expected_prop = expected_props[PART_POS[part_id]] |
| attr_val_str = expected_prop.split(':') |
| for i, name in enumerate(ATTR_NAME): |
| attr_dict[name] = int(attr_val_str[i]) |
| # scan through every line of cgpt table |
| part_flag = False |
| table_content = cgpt_table['cgpt show /dev/sda'] |
| lines = table_content.split('\n') |
| for i in range(len(lines)): |
| # Proceed to the correct partition |
| if 'Label:' in lines[i] and int(lines[i].split()[2]) == part_id: |
| part_flag = True |
| # Set attributes into the line of the mocked cgpt table |
| if part_flag and 'Attr:' in lines[i]: |
| for j, name in enumerate(ATTR_NAME): |
| if attr_dict[name] is not None: |
| lines[i] = re.sub(r'(%s)=\d*' % ATTR_NAME[j], |
| r'\1=%d' % attr_dict[name], lines[i]) |
| break |
| # reconstruct cgpt table with modified attributes |
| cgpt_table['cgpt show /dev/sda'] = '\n'.join(lines) |
| |
| |
| class TestCgptState(unittest.TestCase): |
| def setUp(self): |
| cgpt_choice = 'COMPLETE' |
| self.chros_if = MockChromeosInterface(cgpt_choice) |
| self.cgpt_st = cgpt_state.CgptState(cgpt_choice, self.chros_if, |
| BASE_STORAGE_DEVICE) |
| # Suppress error messages generated by the program under test. |
| self.stdout = sys.stdout |
| sys.stdout = open('/dev/null', 'w') |
| |
| def test_cgpt_state_test_loop(self): |
| global step |
| |
| # To test set_step() and get_step() work as expected |
| self.cgpt_st.set_step(step) |
| step_got = self.cgpt_st.get_step() |
| self.assertEqual(step, step_got) |
| |
| # to test whether cgpt_state.test_loop() works as expected |
| while self.cgpt_st.test_loop() == 0: |
| step += 1 |
| |
| # To verify the number of steps executed is as expected |
| self.assertEqual(step, self.cgpt_st.num_steps-1) |
| |
| def tearDown(self): |
| sys.stdout = self.stdout |
| # remove the temporary cgpt step file |
| os.remove(self.cgpt_st.step_file) |
| |
| |
| if __name__ == '__main__': |
| unittest.main() |