| #!/usr/bin/python2 |
| # Copyright 2015 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """Unit tests for the EC-3PO Console interface.""" |
| |
| from __future__ import print_function |
| |
| import binascii |
| # pylint: disable=cros-logging-import |
| import logging |
| import mock |
| import multiprocessing |
| import tempfile |
| import unittest |
| |
| import console |
| import interpreter |
| |
| ESC_STRING = chr(console.ControlKey.ESC) |
| |
| class Keys(object): |
| """A class that contains the escape sequences for special keys.""" |
| LEFT_ARROW = [console.ControlKey.ESC, ord('['), ord('D')] |
| RIGHT_ARROW = [console.ControlKey.ESC, ord('['), ord('C')] |
| UP_ARROW = [console.ControlKey.ESC, ord('['), ord('A')] |
| DOWN_ARROW = [console.ControlKey.ESC, ord('['), ord('B')] |
| HOME = [console.ControlKey.ESC, ord('['), ord('1'), ord('~')] |
| END = [console.ControlKey.ESC, ord('['), ord('8'), ord('~')] |
| DEL = [console.ControlKey.ESC, ord('['), ord('3'), ord('~')] |
| |
| class OutputStream(object): |
| """A class that has methods which return common console output.""" |
| |
| @staticmethod |
| def MoveCursorLeft(count): |
| """Produces what would be printed to the console if the cursor moved left. |
| |
| Args: |
| count: An integer representing how many columns to move left. |
| |
| Returns: |
| string: A string which contains what would be printed to the console if |
| the cursor moved left. |
| """ |
| string = ESC_STRING |
| string += '[' + str(count) + 'D' |
| return string |
| |
| @staticmethod |
| def MoveCursorRight(count): |
| """Produces what would be printed to the console if the cursor moved right. |
| |
| Args: |
| count: An integer representing how many columns to move right. |
| |
| Returns: |
| string: A string which contains what would be printed to the console if |
| the cursor moved right. |
| """ |
| string = ESC_STRING |
| string += '[' + str(count) + 'C' |
| return string |
| |
| BACKSPACE_STRING = '' |
| # Move cursor left 1 column. |
| BACKSPACE_STRING += OutputStream.MoveCursorLeft(1) |
| # Write a space. |
| BACKSPACE_STRING += ' ' |
| # Move cursor left 1 column. |
| BACKSPACE_STRING += OutputStream.MoveCursorLeft(1) |
| |
| def StringToByteList(string): |
| """Converts a string to list of bytes. |
| |
| Args: |
| string: A literal string to turn into a list of bytes. |
| |
| Returns: |
| A list of integers representing the byte value of each character in the |
| string. |
| """ |
| return [ord(c) for c in string] |
| |
| def BadConsoleOutput(expected, got): |
| """Format the console output into readable text. |
| |
| Args: |
| expected: A list of bytes representing the expected output console |
| stream. |
| got: A list of byte representing the actual output console stream. |
| |
| Returns: |
| string: A formatted string which shows the expected console output stream |
| and the actual console output stream. |
| """ |
| esc_state = 0 |
| string = 'Incorrect console output stream.\n' |
| string += 'exp: |' |
| count = 0 |
| for char in expected: |
| if esc_state != 0: |
| if esc_state == console.EscState.ESC_START: |
| if char == '[': |
| esc_state = console.EscState.ESC_BRACKET |
| elif esc_state == console.EscState.ESC_BRACKET: |
| if char == 'D': |
| string += '[cursor left ' + str(count) + ' cols]' |
| esc_state = 0 |
| elif char == 'C': |
| string += '[cursor right ' + str(count) + ' cols]' |
| esc_state = 0 |
| else: |
| count = int(char) |
| # Print if it's printable. |
| elif console.IsPrintable(ord(char)): |
| string += char |
| else: |
| # It might be a sequence of some type. |
| if ord(char) == console.ControlKey.ESC: |
| # Need to look at the following sequence. |
| esc_state = console.EscState.ESC_START |
| else: |
| string += '{' + binascii.hexlify(char) + '}' |
| |
| string += '|\n\ngot: |' |
| for char in got: |
| if esc_state != 0: |
| if esc_state == console.EscState.ESC_START: |
| if char == '[': |
| esc_state = console.EscState.ESC_BRACKET |
| elif esc_state == console.EscState.ESC_BRACKET: |
| if char == 'D': |
| string += '[cursor left ' + str(count) + ' cols]' |
| esc_state = 0 |
| elif char == 'C': |
| string += '[cursor right ' + str(count) + ' cols]' |
| esc_state = 0 |
| else: |
| count = int(char) |
| # Print if it's printable. |
| elif console.IsPrintable(ord(char)): |
| string += char |
| else: |
| # It might be a sequence of some type. |
| if ord(char) == console.ControlKey.ESC: |
| # Need to look at the following sequence. |
| esc_state = console.EscState.ESC_START |
| else: |
| string += '{' + binascii.hexlify(char) + '}' |
| string += '|\n\n' |
| |
| # TODO(aaboagye): It would be nice to replace all those move left 1, ' ', |
| # move left 1, with backspace. |
| |
| return string |
| |
| def CheckConsoleOutput(test_case, exp_console_out): |
| """Verify what was sent out the console matches what we expect. |
| |
| Args: |
| test_case: A unittest.TestCase object representing the current unit test. |
| exp_console_out: A string representing the console output stream. |
| """ |
| # Read what was sent out the console. |
| test_case.tempfile.seek(0) |
| console_out = test_case.tempfile.read() |
| |
| test_case.assertEqual(exp_console_out, |
| console_out, |
| (BadConsoleOutput(exp_console_out, console_out) |
| + str(test_case.console))) |
| |
| def CheckInputBuffer(test_case, exp_input_buffer): |
| """Verify that the input buffer contains what we expect. |
| |
| Args: |
| test_case: A unittest.TestCase object representing the current unit test. |
| exp_input_buffer: A string containing the contents of the current input |
| buffer. |
| """ |
| test_case.assertEqual(exp_input_buffer, test_case.console.input_buffer, |
| ('input buffer does not match expected.\n' |
| 'expected: |' + exp_input_buffer + '|\n' |
| 'got: |' + test_case.console.input_buffer + |
| '|\n' + str(test_case.console))) |
| |
| def CheckInputBufferPosition(test_case, exp_pos): |
| """Verify the input buffer position. |
| |
| Args: |
| test_case: A unittest.TestCase object representing the current unit test. |
| exp_pos: An integer representing the expected input buffer position. |
| """ |
| test_case.assertEqual(exp_pos, test_case.console.input_buffer_pos, |
| 'input buffer position is incorrect.\ngot: ' + |
| str(test_case.console.input_buffer_pos) + '\nexp: ' + |
| str(exp_pos) + '\n' + str(test_case.console)) |
| |
| def CheckHistoryBuffer(test_case, exp_history): |
| """Verify that the items in the history buffer are what we expect. |
| |
| Args: |
| test_case: A unittest.TestCase object representing the current unit test. |
| exp_history: A list of strings representing the expected contents of the |
| history buffer. |
| """ |
| # First, check to see if the length is what we expect. |
| test_case.assertEqual(len(exp_history), len(test_case.console.history), |
| ('The number of items in the history is unexpected.\n' |
| 'exp: ' + str(len(exp_history)) + '\n' |
| 'got: ' + str(len(test_case.console.history)) + '\n' |
| 'internal state:\n' + str(test_case.console))) |
| |
| # Next, check the actual contents of the history buffer. |
| for i in range(len(exp_history)): |
| test_case.assertEqual(exp_history[i], test_case.console.history[i], |
| ('history buffer contents are incorrect.\n' |
| 'exp: ' + exp_history[i] + '\n' |
| 'got: ' + test_case.console.history[i] + '\n' |
| 'internal state:\n' + str(test_case.console))) |
| |
| |
| class TestConsoleEditingMethods(unittest.TestCase): |
| """Test case to verify all console editing methods.""" |
| |
| def setUp(self): |
| """Setup the test harness.""" |
| # Setup logging with a timestamp, the module, and the log level. |
| logging.basicConfig(level=logging.DEBUG, |
| format=('%(asctime)s - %(module)s -' |
| ' %(levelname)s - %(message)s')) |
| |
| # Create a temp file and set both the master and slave PTYs to the file to |
| # create a loopback. |
| self.tempfile = tempfile.TemporaryFile() |
| |
| # Create some dummy pipes. These won't be used since we'll mock out sends |
| # to the interpreter. |
| dummy_pipe_end_0, dummy_pipe_end_1 = multiprocessing.Pipe() |
| self.console = console.Console(self.tempfile.fileno(), self.tempfile, |
| dummy_pipe_end_0, dummy_pipe_end_1) |
| |
| # Console editing methods are only valid for enhanced EC images, therefore |
| # we have to assume that the "EC" we're talking to is enhanced. By default, |
| # the console believes that the EC it's communicating with is NOT enhanced |
| # which is why we have to override it here. |
| self.console.enhanced_ec = True |
| self.console.CheckForEnhancedECImage = mock.MagicMock(return_value=True) |
| |
| # Mock out sends to the interpreter. |
| multiprocessing.Pipe.send = mock.MagicMock() |
| |
| def test_EnteringChars(self): |
| """Verify that characters are echoed onto the console.""" |
| test_str = 'abc' |
| input_stream = StringToByteList(test_str) |
| |
| # Send the characters in. |
| for byte in input_stream: |
| self.console.HandleChar(byte) |
| |
| # Check the input position. |
| exp_pos = len(test_str) |
| CheckInputBufferPosition(self, exp_pos) |
| |
| # Verify that the input buffer is correct. |
| expected_buffer = test_str |
| CheckInputBuffer(self, expected_buffer) |
| |
| # Check console output |
| exp_console_out = test_str |
| CheckConsoleOutput(self, exp_console_out) |
| |
| def test_EnteringDeletingMoreCharsThanEntered(self): |
| """Verify that we can press backspace more than we have entered chars.""" |
| test_str = 'spamspam' |
| input_stream = StringToByteList(test_str) |
| |
| # Send the characters in. |
| for byte in input_stream: |
| self.console.HandleChar(byte) |
| |
| # Now backspace 1 more than what we sent. |
| input_stream = [] |
| for _ in range(len(test_str) + 1): |
| input_stream.append(console.ControlKey.BACKSPACE) |
| |
| # Send that sequence out. |
| for byte in input_stream: |
| self.console.HandleChar(byte) |
| |
| # First, verify that input buffer position is 0. |
| CheckInputBufferPosition(self, 0) |
| |
| # Next, examine the output stream for the correct sequence. |
| exp_console_out = test_str |
| for _ in range(len(test_str)): |
| exp_console_out += BACKSPACE_STRING |
| |
| # Now, verify that we got what we expected. |
| CheckConsoleOutput(self, exp_console_out) |
| |
| def test_EnteringMoreThanCharLimit(self): |
| """Verify that we drop characters when the line is too long.""" |
| test_str = self.console.line_limit * 'o' # All allowed. |
| test_str += 5 * 'x' # All should be dropped. |
| input_stream = StringToByteList(test_str) |
| |
| # Send the characters in. |
| for byte in input_stream: |
| self.console.HandleChar(byte) |
| |
| # First, we expect that input buffer position should be equal to the line |
| # limit. |
| exp_pos = self.console.line_limit |
| CheckInputBufferPosition(self, exp_pos) |
| |
| # The input buffer should only hold until the line limit. |
| exp_buffer = test_str[0:self.console.line_limit] |
| CheckInputBuffer(self, exp_buffer) |
| |
| # Lastly, check that the extra characters are not printed. |
| exp_console_out = exp_buffer |
| CheckConsoleOutput(self, exp_console_out) |
| |
| def test_ValidKeysOnLongLine(self): |
| """Verify that we can still press valid keys if the line is too long.""" |
| # Fill the line. |
| test_str = self.console.line_limit * 'o' |
| exp_console_out = test_str |
| # Try to fill it even more; these should all be dropped. |
| test_str += 5 * 'x' |
| input_stream = StringToByteList(test_str) |
| |
| # We should be able to press the following keys: |
| # - Backspace |
| # - Arrow Keys/CTRL+B/CTRL+F/CTRL+P/CTRL+N |
| # - Delete |
| # - Home/CTRL+A |
| # - End/CTRL+E |
| # - Carriage Return |
| |
| # Backspace 1 character |
| input_stream.append(console.ControlKey.BACKSPACE) |
| exp_console_out += BACKSPACE_STRING |
| # Refill the line. |
| input_stream.extend(StringToByteList('o')) |
| exp_console_out += 'o' |
| |
| # Left arrow key. |
| input_stream.extend(Keys.LEFT_ARROW) |
| exp_console_out += OutputStream.MoveCursorLeft(1) |
| |
| # Right arrow key. |
| input_stream.extend(Keys.RIGHT_ARROW) |
| exp_console_out += OutputStream.MoveCursorRight(1) |
| |
| # CTRL+B |
| input_stream.append(console.ControlKey.CTRL_B) |
| exp_console_out += OutputStream.MoveCursorLeft(1) |
| |
| # CTRL+F |
| input_stream.append(console.ControlKey.CTRL_F) |
| exp_console_out += OutputStream.MoveCursorRight(1) |
| |
| # Let's press enter now so we can test up and down. |
| input_stream.append(console.ControlKey.CARRIAGE_RETURN) |
| exp_console_out += '\r\n' + self.console.prompt |
| |
| # Up arrow key. |
| input_stream.extend(Keys.UP_ARROW) |
| exp_console_out += test_str[:self.console.line_limit] |
| |
| # Down arrow key. |
| input_stream.extend(Keys.DOWN_ARROW) |
| # Since the line was blank, we have to backspace the entire line. |
| exp_console_out += self.console.line_limit * BACKSPACE_STRING |
| |
| # CTRL+P |
| input_stream.append(console.ControlKey.CTRL_P) |
| exp_console_out += test_str[:self.console.line_limit] |
| |
| # CTRL+N |
| input_stream.append(console.ControlKey.CTRL_N) |
| # Since the line was blank, we have to backspace the entire line. |
| exp_console_out += self.console.line_limit * BACKSPACE_STRING |
| |
| # Press the Up arrow key to reprint the long line. |
| input_stream.extend(Keys.UP_ARROW) |
| exp_console_out += test_str[:self.console.line_limit] |
| |
| # Press the Home key to jump to the beginning of the line. |
| input_stream.extend(Keys.HOME) |
| exp_console_out += OutputStream.MoveCursorLeft(self.console.line_limit) |
| |
| # Press the End key to jump to the end of the line. |
| input_stream.extend(Keys.END) |
| exp_console_out += OutputStream.MoveCursorRight(self.console.line_limit) |
| |
| # Press CTRL+A to jump to the beginning of the line. |
| input_stream.append(console.ControlKey.CTRL_A) |
| exp_console_out += OutputStream.MoveCursorLeft(self.console.line_limit) |
| |
| # Press CTRL+E to jump to the end of the line. |
| input_stream.extend(Keys.END) |
| exp_console_out += OutputStream.MoveCursorRight(self.console.line_limit) |
| |
| # Move left one column so we can delete a character. |
| input_stream.extend(Keys.LEFT_ARROW) |
| exp_console_out += OutputStream.MoveCursorLeft(1) |
| |
| # Press the delete key. |
| input_stream.extend(Keys.DEL) |
| # This should look like a space, and then move cursor left 1 column since |
| # we're at the end of line. |
| exp_console_out += ' ' + OutputStream.MoveCursorLeft(1) |
| |
| # Send the sequence out. |
| for byte in input_stream: |
| self.console.HandleChar(byte) |
| |
| # Verify everything happened correctly. |
| CheckConsoleOutput(self, exp_console_out) |
| |
| def test_BackspaceOnEmptyLine(self): |
| """Verify that we can backspace on an empty line with no bad effects.""" |
| # Send a single backspace. |
| test_str = [console.ControlKey.BACKSPACE] |
| |
| # Send the characters in. |
| for byte in test_str: |
| self.console.HandleChar(byte) |
| |
| # Check the input position. |
| exp_pos = 0 |
| CheckInputBufferPosition(self, exp_pos) |
| |
| # Check that buffer is empty. |
| exp_input_buffer = '' |
| CheckInputBuffer(self, exp_input_buffer) |
| |
| # Check that the console output is empty. |
| exp_console_out = '' |
| CheckConsoleOutput(self, exp_console_out) |
| |
| def test_BackspaceWithinLine(self): |
| """Verify that we shift the chars over when backspacing within a line.""" |
| # Misspell 'help' |
| test_str = 'heelp' |
| input_stream = StringToByteList(test_str) |
| # Use the arrow key to go back to fix it. |
| # Move cursor left 1 column. |
| input_stream.extend(2*Keys.LEFT_ARROW) |
| # Backspace once to remove the extra 'e'. |
| input_stream.append(console.ControlKey.BACKSPACE) |
| |
| # Send the sequence out. |
| for byte in input_stream: |
| self.console.HandleChar(byte) |
| |
| # Verify the input buffer |
| exp_input_buffer = 'help' |
| CheckInputBuffer(self, exp_input_buffer) |
| |
| # Verify the input buffer position. It should be at 2 (cursor over the 'l') |
| CheckInputBufferPosition(self, 2) |
| |
| # We expect the console output to be the test string, with two moves to the |
| # left, another move left, and then the rest of the line followed by a |
| # space. |
| exp_console_out = test_str |
| exp_console_out += 2 * OutputStream.MoveCursorLeft(1) |
| |
| # Move cursor left 1 column. |
| exp_console_out += OutputStream.MoveCursorLeft(1) |
| # Rest of the line and a space. (test_str in this case) |
| exp_console_out += 'lp ' |
| # Reset the cursor 2 + 1 to the left. |
| exp_console_out += OutputStream.MoveCursorLeft(3) |
| |
| # Verify console output. |
| CheckConsoleOutput(self, exp_console_out) |
| |
| def test_JumpToBeginningOfLineViaCtrlA(self): |
| """Verify that we can jump to the beginning of a line with Ctrl+A.""" |
| # Enter some chars and press CTRL+A |
| test_str = 'abc' |
| input_stream = StringToByteList(test_str) + [console.ControlKey.CTRL_A] |
| |
| # Send the characters in. |
| for byte in input_stream: |
| self.console.HandleChar(byte) |
| |
| # We expect to see our test string followed by a move cursor left. |
| exp_console_out = test_str |
| exp_console_out += OutputStream.MoveCursorLeft(len(test_str)) |
| |
| # Check to see what whas printed on the console. |
| CheckConsoleOutput(self, exp_console_out) |
| |
| # Check that the input buffer position is now 0. |
| CheckInputBufferPosition(self, 0) |
| |
| # Check input buffer still contains our test string. |
| CheckInputBuffer(self, test_str) |
| |
| def test_JumpToBeginningOfLineViaHomeKey(self): |
| """Jump to beginning of line via HOME key.""" |
| test_str = 'version' |
| input_stream = StringToByteList(test_str) |
| input_stream.extend(Keys.HOME) |
| |
| # Send out the stream. |
| for byte in input_stream: |
| self.console.HandleChar(byte) |
| |
| # First, verify that input buffer position is now 0. |
| CheckInputBufferPosition(self, 0) |
| |
| # Next, verify that the input buffer did not change. |
| CheckInputBuffer(self, test_str) |
| |
| # Lastly, check that the cursor moved correctly. |
| exp_console_out = test_str |
| exp_console_out += OutputStream.MoveCursorLeft(len(test_str)) |
| CheckConsoleOutput(self, exp_console_out) |
| |
| def test_JumpToEndOfLineViaEndKey(self): |
| """Jump to the end of the line using the END key.""" |
| test_str = 'version' |
| input_stream = StringToByteList(test_str) |
| input_stream += [console.ControlKey.CTRL_A] |
| # Now, jump to the end of the line. |
| input_stream.extend(Keys.END) |
| |
| # Send out the stream. |
| for byte in input_stream: |
| self.console.HandleChar(byte) |
| |
| # Verify that the input buffer position is correct. This should be at the |
| # end of the test string. |
| CheckInputBufferPosition(self, len(test_str)) |
| |
| # The expected output should be the test string, followed by a jump to the |
| # beginning of the line, and lastly a jump to the end of the line. |
| exp_console_out = test_str |
| exp_console_out += OutputStream.MoveCursorLeft(len(test_str)) |
| # Now the jump back to the end of the line. |
| exp_console_out += OutputStream.MoveCursorRight(len(test_str)) |
| |
| # Verify console output stream. |
| CheckConsoleOutput(self, exp_console_out) |
| |
| def test_JumpToEndOfLineViaCtrlE(self): |
| """Enter some chars and then try to jump to the end. (Should be a no-op)""" |
| test_str = 'sysinfo' |
| input_stream = StringToByteList(test_str) |
| input_stream.append(console.ControlKey.CTRL_E) |
| |
| # Send out the stream |
| for byte in input_stream: |
| self.console.HandleChar(byte) |
| |
| # Verify that the input buffer position isn't any further than we expect. |
| # At this point, the position should be at the end of the test string. |
| CheckInputBufferPosition(self, len(test_str)) |
| |
| # Now, let's try to jump to the beginning and then jump back to the end. |
| input_stream = [console.ControlKey.CTRL_A, console.ControlKey.CTRL_E] |
| |
| # Send the sequence out. |
| for byte in input_stream: |
| self.console.HandleChar(byte) |
| |
| # Perform the same verification. |
| CheckInputBufferPosition(self, len(test_str)) |
| |
| # Lastly try to jump again, beyond the end. |
| input_stream = [console.ControlKey.CTRL_E] |
| |
| # Send the sequence out. |
| for byte in input_stream: |
| self.console.HandleChar(byte) |
| |
| # Perform the same verification. |
| CheckInputBufferPosition(self, len(test_str)) |
| |
| # We expect to see the test string, a jump to the beginning of the line, and |
| # one jump to the end of the line. |
| exp_console_out = test_str |
| # Jump to beginning. |
| exp_console_out += OutputStream.MoveCursorLeft(len(test_str)) |
| # Jump back to end. |
| exp_console_out += OutputStream.MoveCursorRight(len(test_str)) |
| |
| # Verify the console output. |
| CheckConsoleOutput(self, exp_console_out) |
| |
| def test_MoveLeftWithArrowKey(self): |
| """Move cursor left one column with arrow key.""" |
| test_str = 'tastyspam' |
| input_stream = StringToByteList(test_str) |
| input_stream.extend(Keys.LEFT_ARROW) |
| |
| # Send the sequence out. |
| for byte in input_stream: |
| self.console.HandleChar(byte) |
| |
| # Verify that the input buffer position is 1 less than the length. |
| CheckInputBufferPosition(self, len(test_str) - 1) |
| |
| # Also, verify that the input buffer is not modified. |
| CheckInputBuffer(self, test_str) |
| |
| # We expect the test string, followed by a one column move left. |
| exp_console_out = test_str + OutputStream.MoveCursorLeft(1) |
| |
| # Verify console output. |
| CheckConsoleOutput(self, exp_console_out) |
| |
| def test_MoveLeftWithCtrlB(self): |
| """Move cursor back one column with Ctrl+B.""" |
| test_str = 'tastyspam' |
| input_stream = StringToByteList(test_str) |
| input_stream.append(console.ControlKey.CTRL_B) |
| |
| # Send the sequence out. |
| for byte in input_stream: |
| self.console.HandleChar(byte) |
| |
| # Verify that the input buffer position is 1 less than the length. |
| CheckInputBufferPosition(self, len(test_str) - 1) |
| |
| # Also, verify that the input buffer is not modified. |
| CheckInputBuffer(self, test_str) |
| |
| # We expect the test string, followed by a one column move left. |
| exp_console_out = test_str + OutputStream.MoveCursorLeft(1) |
| |
| # Verify console output. |
| CheckConsoleOutput(self, exp_console_out) |
| |
| def test_MoveRightWithArrowKey(self): |
| """Move cursor one column to the right with the arrow key.""" |
| test_str = 'version' |
| input_stream = StringToByteList(test_str) |
| # Jump to beginning of line. |
| input_stream.append(console.ControlKey.CTRL_A) |
| # Press right arrow key. |
| input_stream.extend(Keys.RIGHT_ARROW) |
| |
| # Send the sequence out. |
| for byte in input_stream: |
| self.console.HandleChar(byte) |
| |
| # Verify that the input buffer position is 1. |
| CheckInputBufferPosition(self, 1) |
| |
| # Also, verify that the input buffer is not modified. |
| CheckInputBuffer(self, test_str) |
| |
| # We expect the test string, followed by a jump to the beginning of the |
| # line, and finally a move right 1. |
| exp_console_out = test_str + OutputStream.MoveCursorLeft(len((test_str))) |
| |
| # A move right 1 column. |
| exp_console_out += OutputStream.MoveCursorRight(1) |
| |
| # Verify console output. |
| CheckConsoleOutput(self, exp_console_out) |
| |
| def test_MoveRightWithCtrlF(self): |
| """Move cursor forward one column with Ctrl+F.""" |
| test_str = 'panicinfo' |
| input_stream = StringToByteList(test_str) |
| input_stream.append(console.ControlKey.CTRL_A) |
| # Now, move right one column. |
| input_stream.append(console.ControlKey.CTRL_F) |
| |
| # Send the sequence out. |
| for byte in input_stream: |
| self.console.HandleChar(byte) |
| |
| # Verify that the input buffer position is 1. |
| CheckInputBufferPosition(self, 1) |
| |
| # Also, verify that the input buffer is not modified. |
| CheckInputBuffer(self, test_str) |
| |
| # We expect the test string, followed by a jump to the beginning of the |
| # line, and finally a move right 1. |
| exp_console_out = test_str + OutputStream.MoveCursorLeft(len((test_str))) |
| |
| # A move right 1 column. |
| exp_console_out += OutputStream.MoveCursorRight(1) |
| |
| # Verify console output. |
| CheckConsoleOutput(self, exp_console_out) |
| |
| def test_ImpossibleMoveLeftWithArrowKey(self): |
| """Verify that we can't move left at the beginning of the line.""" |
| # We shouldn't be able to move left if we're at the beginning of the line. |
| input_stream = Keys.LEFT_ARROW |
| |
| # Send the sequence out. |
| for byte in input_stream: |
| self.console.HandleChar(byte) |
| |
| # Nothing should have been output. |
| exp_console_output = '' |
| CheckConsoleOutput(self, exp_console_output) |
| |
| # The input buffer position should still be 0. |
| CheckInputBufferPosition(self, 0) |
| |
| # The input buffer itself should be empty. |
| CheckInputBuffer(self, '') |
| |
| def test_ImpossibleMoveRightWithArrowKey(self): |
| """Verify that we can't move right at the end of the line.""" |
| # We shouldn't be able to move right if we're at the end of the line. |
| input_stream = Keys.RIGHT_ARROW |
| |
| # Send the sequence out. |
| for byte in input_stream: |
| self.console.HandleChar(byte) |
| |
| # Nothing should have been output. |
| exp_console_output = '' |
| CheckConsoleOutput(self, exp_console_output) |
| |
| # The input buffer position should still be 0. |
| CheckInputBufferPosition(self, 0) |
| |
| # The input buffer itself should be empty. |
| CheckInputBuffer(self, '') |
| |
| def test_KillEntireLine(self): |
| """Verify that we can kill an entire line with Ctrl+K.""" |
| test_str = 'accelinfo on' |
| input_stream = StringToByteList(test_str) |
| # Jump to beginning of line and then kill it with Ctrl+K. |
| input_stream.extend([console.ControlKey.CTRL_A, console.ControlKey.CTRL_K]) |
| |
| # Send the sequence out. |
| for byte in input_stream: |
| self.console.HandleChar(byte) |
| |
| # First, we expect that the input buffer is empty. |
| CheckInputBuffer(self, '') |
| |
| # The buffer position should be 0. |
| CheckInputBufferPosition(self, 0) |
| |
| # What we expect to see on the console stream should be the following. The |
| # test string, a jump to the beginning of the line, then jump back to the |
| # end of the line and replace the line with spaces. |
| exp_console_out = test_str |
| # Jump to beginning of line. |
| exp_console_out += OutputStream.MoveCursorLeft(len(test_str)) |
| # Jump to end of line. |
| exp_console_out += OutputStream.MoveCursorRight(len(test_str)) |
| # Replace line with spaces, which looks like backspaces. |
| for _ in range(len(test_str)): |
| exp_console_out += BACKSPACE_STRING |
| |
| # Verify the console output. |
| CheckConsoleOutput(self, exp_console_out) |
| |
| def test_KillPartialLine(self): |
| """Verify that we can kill a portion of a line.""" |
| test_str = 'accelread 0 1' |
| input_stream = StringToByteList(test_str) |
| len_to_kill = 5 |
| for _ in range(len_to_kill): |
| # Move cursor left |
| input_stream.extend(Keys.LEFT_ARROW) |
| # Now kill |
| input_stream.append(console.ControlKey.CTRL_K) |
| |
| # Send the sequence out. |
| for byte in input_stream: |
| self.console.HandleChar(byte) |
| |
| # First, check that the input buffer was truncated. |
| exp_input_buffer = test_str[:-len_to_kill] |
| CheckInputBuffer(self, exp_input_buffer) |
| |
| # Verify the input buffer position. |
| CheckInputBufferPosition(self, len(test_str) - len_to_kill) |
| |
| # The console output stream that we expect is the test string followed by a |
| # move left of len_to_kill, then a jump to the end of the line and backspace |
| # of len_to_kill. |
| exp_console_out = test_str |
| for _ in range(len_to_kill): |
| # Move left 1 column. |
| exp_console_out += OutputStream.MoveCursorLeft(1) |
| # Then jump to the end of the line |
| exp_console_out += OutputStream.MoveCursorRight(len_to_kill) |
| # Backspace of len_to_kill |
| for _ in range(len_to_kill): |
| exp_console_out += BACKSPACE_STRING |
| |
| # Verify console output. |
| CheckConsoleOutput(self, exp_console_out) |
| |
| def test_InsertingCharacters(self): |
| """Verify that we can insert charcters within the line.""" |
| test_str = 'accel 0 1' # Here we forgot the 'read' part in 'accelread' |
| input_stream = StringToByteList(test_str) |
| # We need to move over to the 'l' and add read. |
| insertion_point = test_str.find('l') + 1 |
| for i in range(len(test_str) - insertion_point): |
| # Move cursor left. |
| input_stream.extend(Keys.LEFT_ARROW) |
| # Now, add in 'read' |
| added_str = 'read' |
| input_stream.extend(StringToByteList(added_str)) |
| |
| # Send the sequence out. |
| for byte in input_stream: |
| self.console.HandleChar(byte) |
| |
| # First, verify that the input buffer is correct. |
| exp_input_buffer = test_str[:insertion_point] + added_str |
| exp_input_buffer += test_str[insertion_point:] |
| CheckInputBuffer(self, exp_input_buffer) |
| |
| # Verify that the input buffer position is correct. |
| exp_input_buffer_pos = insertion_point + len(added_str) |
| CheckInputBufferPosition(self, exp_input_buffer_pos) |
| |
| # The console output stream that we expect is the test string, followed by |
| # move cursor left until the 'l' was found, the added test string while |
| # shifting characters around. |
| exp_console_out = test_str |
| for i in range(len(test_str) - insertion_point): |
| # Move cursor left. |
| exp_console_out += OutputStream.MoveCursorLeft(1) |
| |
| # Now for each character, write the rest of the line will be shifted to the |
| # right one column. |
| for i in range(len(added_str)): |
| # Printed character. |
| exp_console_out += added_str[i] |
| # The rest of the line |
| exp_console_out += test_str[insertion_point:] |
| # Reset the cursor back left |
| reset_dist = len(test_str[insertion_point:]) |
| exp_console_out += OutputStream.MoveCursorLeft(reset_dist) |
| |
| # Verify the console output. |
| CheckConsoleOutput(self, exp_console_out) |
| |
| def test_StoreCommandHistory(self): |
| """Verify that entered commands are stored in the history.""" |
| test_commands = [] |
| test_commands.append('help') |
| test_commands.append('version') |
| test_commands.append('accelread 0 1') |
| input_stream = [] |
| for c in test_commands: |
| input_stream.extend(StringToByteList(c)) |
| input_stream.append(console.ControlKey.CARRIAGE_RETURN) |
| |
| # Send the sequence out. |
| for byte in input_stream: |
| self.console.HandleChar(byte) |
| |
| # We expect to have the test commands in the history buffer. |
| exp_history_buf = test_commands |
| CheckHistoryBuffer(self, exp_history_buf) |
| |
| def test_CycleUpThruCommandHistory(self): |
| """Verify that the UP arrow key will print itmes in the history buffer.""" |
| # Enter some commands. |
| test_commands = ['version', 'accelrange 0', 'battery', 'gettime'] |
| input_stream = [] |
| for command in test_commands: |
| input_stream.extend(StringToByteList(command)) |
| input_stream.append(console.ControlKey.CARRIAGE_RETURN) |
| |
| # Now, hit the UP arrow key to print the previous entries. |
| for i in range(len(test_commands)): |
| input_stream.extend(Keys.UP_ARROW) |
| |
| # Send the sequence out. |
| for byte in input_stream: |
| self.console.HandleChar(byte) |
| |
| # The expected output should be test commands with prompts printed in |
| # between, followed by line kills with the previous test commands printed. |
| exp_console_out = '' |
| for i in range(len(test_commands)): |
| exp_console_out += test_commands[i] + '\r\n' + self.console.prompt |
| |
| # When we press up, the line should be cleared and print the previous buffer |
| # entry. |
| for i in range(len(test_commands)-1, 0, -1): |
| exp_console_out += test_commands[i] |
| # Backspace to the beginning. |
| for i in range(len(test_commands[i])): |
| exp_console_out += BACKSPACE_STRING |
| |
| # The last command should just be printed out with no backspacing. |
| exp_console_out += test_commands[0] |
| |
| # Now, verify. |
| CheckConsoleOutput(self, exp_console_out) |
| |
| def test_UpArrowOnEmptyHistory(self): |
| """Ensure nothing happens if the history is empty.""" |
| # Press the up arrow key twice. |
| input_stream = 2 * Keys.UP_ARROW |
| |
| # Send the sequence out. |
| for byte in input_stream: |
| self.console.HandleChar(byte) |
| |
| # We expect nothing to have happened. |
| exp_console_out = '' |
| exp_input_buffer = '' |
| exp_input_buffer_pos = 0 |
| exp_history_buf = [] |
| |
| # Verify. |
| CheckConsoleOutput(self, exp_console_out) |
| CheckInputBufferPosition(self, exp_input_buffer_pos) |
| CheckInputBuffer(self, exp_input_buffer) |
| CheckHistoryBuffer(self, exp_history_buf) |
| |
| def test_UpArrowDoesNotGoOutOfBounds(self): |
| """Verify that pressing the up arrow many times won't go out of bounds.""" |
| # Enter one command. |
| test_str = 'help version' |
| input_stream = StringToByteList(test_str) |
| input_stream.append(console.ControlKey.CARRIAGE_RETURN) |
| # Then press the up arrow key twice. |
| input_stream.extend(2 * Keys.UP_ARROW) |
| |
| # Send the sequence out. |
| for byte in input_stream: |
| self.console.HandleChar(byte) |
| |
| # Verify that the history buffer is correct. |
| exp_history_buf = [test_str] |
| CheckHistoryBuffer(self, exp_history_buf) |
| |
| # We expect that the console output should only contain our entered command, |
| # a new prompt, and then our command aggain. |
| exp_console_out = test_str + '\r\n' + self.console.prompt |
| # Pressing up should reprint the command we entered. |
| exp_console_out += test_str |
| |
| # Verify. |
| CheckConsoleOutput(self, exp_console_out) |
| |
| def test_CycleDownThruCommandHistory(self): |
| """Verify that we can select entries by hitting the down arrow.""" |
| # Enter at least 4 commands. |
| test_commands = ['version', 'accelrange 0', 'battery', 'gettime'] |
| input_stream = [] |
| for command in test_commands: |
| input_stream.extend(StringToByteList(command)) |
| input_stream.append(console.ControlKey.CARRIAGE_RETURN) |
| |
| # Now, hit the UP arrow key twice to print the previous two entries. |
| for i in range(2): |
| input_stream.extend(Keys.UP_ARROW) |
| |
| # Now, hit the DOWN arrow key twice to print the newer entries. |
| input_stream.extend(2*Keys.DOWN_ARROW) |
| |
| # Send the sequence out. |
| for byte in input_stream: |
| self.console.HandleChar(byte) |
| |
| # The expected output should be commands that we entered, followed by |
| # prompts, then followed by our last two commands in reverse. Then, we |
| # should see the last entry in the list, followed by the saved partial cmd |
| # of a blank line. |
| exp_console_out = '' |
| for i in range(len(test_commands)): |
| exp_console_out += test_commands[i] + '\r\n' + self.console.prompt |
| |
| # When we press up, the line should be cleared and print the previous buffer |
| # entry. |
| for i in range(len(test_commands)-1, 1, -1): |
| exp_console_out += test_commands[i] |
| # Backspace to the beginning. |
| for i in range(len(test_commands[i])): |
| exp_console_out += BACKSPACE_STRING |
| |
| # When we press down, it should have cleared the last command (which we |
| # covered with the previous for loop), and then prints the next command. |
| exp_console_out += test_commands[3] |
| for i in range(len(test_commands[3])): |
| exp_console_out += BACKSPACE_STRING |
| |
| # Verify console output. |
| CheckConsoleOutput(self, exp_console_out) |
| |
| # Verify input buffer. |
| exp_input_buffer = '' # Empty because our partial command was empty. |
| exp_input_buffer_pos = len(exp_input_buffer) |
| CheckInputBuffer(self, exp_input_buffer) |
| CheckInputBufferPosition(self, exp_input_buffer_pos) |
| |
| def test_SavingPartialCommandWhenNavigatingHistory(self): |
| """Verify that partial commands are saved when navigating history.""" |
| # Enter a command. |
| test_str = 'accelinfo' |
| input_stream = StringToByteList(test_str) |
| input_stream.append(console.ControlKey.CARRIAGE_RETURN) |
| |
| # Enter a partial command. |
| partial_cmd = 'ver' |
| input_stream.extend(StringToByteList(partial_cmd)) |
| |
| # Hit the UP arrow key. |
| input_stream.extend(Keys.UP_ARROW) |
| # Then, the DOWN arrow key. |
| input_stream.extend(Keys.DOWN_ARROW) |
| |
| # Send the sequence out. |
| for byte in input_stream: |
| self.console.HandleChar(byte) |
| |
| # The expected output should be the command we entered, a prompt, the |
| # partial command, clearing of the partial command, the command entered, |
| # clearing of the command entered, and then the partial command. |
| exp_console_out = test_str + '\r\n' + self.console.prompt |
| exp_console_out += partial_cmd |
| for _ in range(len(partial_cmd)): |
| exp_console_out += BACKSPACE_STRING |
| exp_console_out += test_str |
| for _ in range(len(test_str)): |
| exp_console_out += BACKSPACE_STRING |
| exp_console_out += partial_cmd |
| |
| # Verify console output. |
| CheckConsoleOutput(self, exp_console_out) |
| |
| # Verify input buffer. |
| exp_input_buffer = partial_cmd |
| exp_input_buffer_pos = len(exp_input_buffer) |
| CheckInputBuffer(self, exp_input_buffer) |
| CheckInputBufferPosition(self, exp_input_buffer_pos) |
| |
| def test_DownArrowOnEmptyHistory(self): |
| """Ensure nothing happens if the history is empty.""" |
| # Then press the up down arrow twice. |
| input_stream = 2 * Keys.DOWN_ARROW |
| |
| # Send the sequence out. |
| for byte in input_stream: |
| self.console.HandleChar(byte) |
| |
| # We expect nothing to have happened. |
| exp_console_out = '' |
| exp_input_buffer = '' |
| exp_input_buffer_pos = 0 |
| exp_history_buf = [] |
| |
| # Verify. |
| CheckConsoleOutput(self, exp_console_out) |
| CheckInputBufferPosition(self, exp_input_buffer_pos) |
| CheckInputBuffer(self, exp_input_buffer) |
| CheckHistoryBuffer(self, exp_history_buf) |
| |
| def test_DeleteCharsUsingDELKey(self): |
| """Verify that we can delete characters using the DEL key.""" |
| test_str = 'version' |
| input_stream = StringToByteList(test_str) |
| |
| # Hit the left arrow key 2 times. |
| input_stream.extend(2 * Keys.LEFT_ARROW) |
| |
| # Press the DEL key. |
| input_stream.extend(Keys.DEL) |
| |
| # Send the sequence out. |
| for byte in input_stream: |
| self.console.HandleChar(byte) |
| |
| # The expected output should be the command we entered, 2 individual cursor |
| # moves to the left, and then removing a char and shifting everything to the |
| # left one column. |
| exp_console_out = test_str |
| exp_console_out += 2 * OutputStream.MoveCursorLeft(1) |
| |
| # Remove the char by shifting everything to the left one, slicing out the |
| # remove char. |
| exp_console_out += test_str[-1:] + ' ' |
| |
| # Reset the cursor by moving back 2 columns because of the 'n' and space. |
| exp_console_out += OutputStream.MoveCursorLeft(2) |
| |
| # Verify console output. |
| CheckConsoleOutput(self, exp_console_out) |
| |
| # Verify input buffer. The input buffer should have the char sliced out and |
| # be positioned where the char was removed. |
| exp_input_buffer = test_str[:-2] + test_str[-1:] |
| exp_input_buffer_pos = len(exp_input_buffer) - 1 |
| CheckInputBuffer(self, exp_input_buffer) |
| CheckInputBufferPosition(self, exp_input_buffer_pos) |
| |
| def test_RepeatedCommandInHistory(self): |
| """Verify that we don't store 2 consecutive identical commands in history""" |
| # Enter a few commands. |
| test_commands = ['version', 'accelrange 0', 'battery', 'gettime'] |
| # Repeat the last command. |
| test_commands.append(test_commands[len(test_commands)-1]) |
| |
| input_stream = [] |
| for command in test_commands: |
| input_stream.extend(StringToByteList(command)) |
| input_stream.append(console.ControlKey.CARRIAGE_RETURN) |
| |
| # Send the sequence out. |
| for byte in input_stream: |
| self.console.HandleChar(byte) |
| |
| # Verify that the history buffer is correct. The last command, since |
| # it was repeated, should not have been added to the history. |
| exp_history_buf = test_commands[0:len(test_commands)-1] |
| CheckHistoryBuffer(self, exp_history_buf) |
| |
| |
| class TestConsoleCompatibility(unittest.TestCase): |
| """Verify that console can speak to enhanced and non-enhanced EC images.""" |
| def setUp(self): |
| """Setup the test harness.""" |
| # Setup logging with a timestamp, the module, and the log level. |
| logging.basicConfig(level=logging.DEBUG, |
| format=('%(asctime)s - %(module)s -' |
| ' %(levelname)s - %(message)s')) |
| # Create a temp file and set both the master and slave PTYs to the file to |
| # create a loopback. |
| self.tempfile = tempfile.TemporaryFile() |
| |
| # Mock out the pipes. |
| dummy_pipe_end_0, dummy_pipe_end_1 = mock.MagicMock(), mock.MagicMock() |
| self.console = console.Console(self.tempfile.fileno(), self.tempfile, |
| dummy_pipe_end_0, dummy_pipe_end_1) |
| |
| @mock.patch('console.Console.CheckForEnhancedECImage') |
| def test_ActAsPassThruInNonEnhancedMode(self, mock_check): |
| """Verify we simply pass everything thru to non-enhanced ECs. |
| |
| Args: |
| mock_check: A MagicMock object replacing the CheckForEnhancedECImage() |
| method. |
| """ |
| # Set the interrogation mode to always so that we actually interrogate. |
| self.console.interrogation_mode = 'always' |
| |
| # Assume EC interrogations indicate that the image is non-enhanced. |
| mock_check.return_value = False |
| |
| # Press enter, followed by the command, and another enter. |
| input_stream = [] |
| input_stream.append(console.ControlKey.CARRIAGE_RETURN) |
| test_command = 'version' |
| input_stream.extend(StringToByteList(test_command)) |
| input_stream.append(console.ControlKey.CARRIAGE_RETURN) |
| |
| # Send the sequence out. |
| for byte in input_stream: |
| self.console.HandleChar(byte) |
| |
| # Expected calls to send down the pipe would be each character of the test |
| # command. |
| expected_calls = [] |
| expected_calls.append(mock.call(chr(console.ControlKey.CARRIAGE_RETURN))) |
| for char in test_command: |
| expected_calls.append(mock.call(char)) |
| expected_calls.append(mock.call(chr(console.ControlKey.CARRIAGE_RETURN))) |
| |
| # Verify that the calls happened. |
| self.console.cmd_pipe.send.assert_has_calls(expected_calls) |
| |
| # Since we're acting as a pass-thru, the input buffer should be empty and |
| # input_buffer_pos is 0. |
| CheckInputBuffer(self, '') |
| CheckInputBufferPosition(self, 0) |
| |
| @mock.patch('console.Console.CheckForEnhancedECImage') |
| def test_TransitionFromNonEnhancedToEnhanced(self, mock_check): |
| """Verify that we transition correctly to enhanced mode. |
| |
| Args: |
| mock_check: A MagicMock object replacing the CheckForEnhancedECImage() |
| method. |
| """ |
| # Set the interrogation mode to always so that we actually interrogate. |
| self.console.interrogation_mode = 'always' |
| |
| # First, assume that the EC interrogations indicate an enhanced EC image. |
| mock_check.return_value = True |
| # But our current knowledge of the EC image (which was actually the |
| # 'previous' EC) was a non-enhanced image. |
| self.console.enhanced_ec = False |
| |
| test_command = 'sysinfo' |
| input_stream = [] |
| input_stream.extend(StringToByteList(test_command)) |
| |
| expected_calls = [] |
| # All keystrokes to the console should be directed straight through to the |
| # EC until we press the enter key. |
| for char in test_command: |
| expected_calls.append(mock.call(char)) |
| |
| # Press the enter key. |
| input_stream.append(console.ControlKey.CARRIAGE_RETURN) |
| # The enter key should not be sent to the pipe since we should negotiate |
| # to an enhanced EC image. |
| |
| # Send the sequence out. |
| for byte in input_stream: |
| self.console.HandleChar(byte) |
| |
| # At this point, we should have negotiated to enhanced. |
| self.assertTrue(self.console.enhanced_ec, msg=('Did not negotiate to ' |
| 'enhanced EC image.')) |
| |
| # The command would have been dropped however, so verify this... |
| CheckInputBuffer(self, '') |
| CheckInputBufferPosition(self, 0) |
| # ...and repeat the command. |
| input_stream = StringToByteList(test_command) |
| input_stream.append(console.ControlKey.CARRIAGE_RETURN) |
| |
| # Send the sequence out. |
| for byte in input_stream: |
| self.console.HandleChar(byte) |
| |
| # Since we're enhanced now, we should have sent the entire command as one |
| # string with no trailing carriage return |
| expected_calls.append(mock.call(test_command)) |
| |
| # Verify all of the calls. |
| self.console.cmd_pipe.send.assert_has_calls(expected_calls) |
| |
| @mock.patch('console.Console.CheckForEnhancedECImage') |
| def test_TransitionFromEnhancedToNonEnhanced(self, mock_check): |
| """Verify that we transition correctly to non-enhanced mode. |
| |
| Args: |
| mock_check: A MagicMock object replacing the CheckForEnhancedECImage() |
| method. |
| """ |
| # Set the interrogation mode to always so that we actually interrogate. |
| self.console.interrogation_mode = 'always' |
| |
| # First, assume that the EC interrogations indicate an non-enhanced EC |
| # image. |
| mock_check.return_value = False |
| # But our current knowledge of the EC image (which was actually the |
| # 'previous' EC) was an enhanced image. |
| self.console.enhanced_ec = True |
| |
| test_command = 'sysinfo' |
| input_stream = [] |
| input_stream.extend(StringToByteList(test_command)) |
| input_stream.append(console.ControlKey.CARRIAGE_RETURN) |
| |
| # Send the sequence out. |
| for byte in input_stream: |
| self.console.HandleChar(byte) |
| |
| # But, we will negotiate to non-enhanced however, dropping this command. |
| # Verify this. |
| self.assertFalse(self.console.enhanced_ec, msg=('Did not negotiate to' |
| 'non-enhanced EC image.')) |
| CheckInputBuffer(self, '') |
| CheckInputBufferPosition(self, 0) |
| |
| # The carriage return should have passed through though. |
| expected_calls = [] |
| expected_calls.append(mock.call(chr(console.ControlKey.CARRIAGE_RETURN))) |
| |
| # Since the command was dropped, repeat the command. |
| input_stream = StringToByteList(test_command) |
| input_stream.append(console.ControlKey.CARRIAGE_RETURN) |
| |
| # Send the sequence out. |
| for byte in input_stream: |
| self.console.HandleChar(byte) |
| |
| # Since we're not enhanced now, we should have sent each character in the |
| # entire command separately and a carriage return. |
| for char in test_command: |
| expected_calls.append(mock.call(char)) |
| expected_calls.append(mock.call(chr(console.ControlKey.CARRIAGE_RETURN))) |
| |
| # Verify all of the calls. |
| self.console.cmd_pipe.send.assert_has_calls(expected_calls) |
| |
| def test_EnhancedCheckIfTimedOut(self): |
| """Verify that the check returns false if it times out.""" |
| # Make the debug pipe "time out". |
| self.console.dbg_pipe.poll.return_value = False |
| self.assertFalse(self.console.CheckForEnhancedECImage()) |
| |
| def test_EnhancedCheckIfACKReceived(self): |
| """Verify that the check returns true if the ACK is received.""" |
| # Make the debug pipe return EC_ACK. |
| self.console.dbg_pipe.poll.return_value = True |
| self.console.dbg_pipe.recv.return_value = interpreter.EC_ACK |
| self.assertTrue(self.console.CheckForEnhancedECImage()) |
| |
| def test_EnhancedCheckIfWrong(self): |
| """Verify that the check returns false if byte received is wrong.""" |
| # Make the debug pipe return the wrong byte. |
| self.console.dbg_pipe.poll.return_value = True |
| self.console.dbg_pipe.recv.return_value = '\xff' |
| self.assertFalse(self.console.CheckForEnhancedECImage()) |
| |
| def test_EnhancedCheckUsingBuffer(self): |
| """Verify that given reboot output, enhanced EC images are detected.""" |
| enhanced_output_stream = """ |
| --- UART initialized after reboot --- |
| [Reset cause: reset-pin soft] |
| [Image: RO, jerry_v1.1.4363-2af8572-dirty 2016-02-23 13:26:20 aaboagye@lithium.mtv.corp.google.com] |
| [0.001695 KB boot key 0] |
| [0.001790 Inits done] |
| [0.001923 not sysjump; forcing AP shutdown] |
| [0.002047 EC triggered warm reboot] |
| [0.002155 assert GPIO_PMIC_WARM_RESET_L for 4 ms] |
| [0.006326 auto_power_on set due to reset_flag 0x22] |
| [0.006477 Wait for battery stabilized during 1000000] |
| [0.007368 battery responded with status c0] |
| [0.009099 hash start 0x00010000 0x0000eb7c] |
| [0.009307 KB init state: -- -- -- -- -- -- -- -- -- -- -- -- --] |
| [0.009531 KB wait] |
| Enhanced Console is enabled (v1.0.0); type HELP for help. |
| > [0.009782 event set 0x00002000] |
| [0.009903 hostcmd init 0x2000] |
| [0.010031 power state 0 = G3, in 0x0000] |
| [0.010173 power state 4 = G3->S5, in 0x0000] |
| [0.010324 power state 1 = S5, in 0x0000] |
| [0.010466 power on 2] |
| [0.010566 power state 5 = S5->S3, in 0x0000] |
| [0.037713 event set 0x00000080] |
| [0.037836 event set 0x00400000] |
| [0.038675 Battery 89% / 1092h:15 to empty] |
| [0.224060 hash done 41dac382e3a6e3d2ea5b4d789c1bc46525cae7cc5ff6758f0de8d8369b506f57] |
| [0.375150 POWER_GOOD seen] |
| """ |
| for line in enhanced_output_stream.split('\n'): |
| self.console.CheckBufferForEnhancedImage(line) |
| |
| # Since the enhanced console string was present in the output, the console |
| # should have caught it. |
| self.assertTrue(self.console.enhanced_ec) |
| |
| # Also should check that the command was sent to the interpreter. |
| self.console.cmd_pipe.send.assert_called_once_with('enhanced True') |
| |
| # Now test the non-enhanced EC image. |
| self.console.cmd_pipe.reset_mock() |
| non_enhanced_output_stream = """ |
| --- UART initialized after reboot --- |
| [Reset cause: reset-pin soft] |
| [Image: RO, jerry_v1.1.4363-2af8572-dirty 2016-02-23 13:03:15 aaboagye@lithium.mtv.corp.google.com] |
| [0.001695 KB boot key 0] |
| [0.001790 Inits done] |
| [0.001923 not sysjump; forcing AP shutdown] |
| [0.002047 EC triggered warm reboot] |
| [0.002156 assert GPIO_PMIC_WARM_RESET_L for 4 ms] |
| [0.006326 auto_power_on set due to reset_flag 0x22] |
| [0.006477 Wait for battery stabilized during 1000000] |
| [0.007368 battery responded with status c0] |
| [0.008951 hash start 0x00010000 0x0000ed78] |
| [0.009159 KB init state: -- -- -- -- -- -- -- -- -- -- -- -- --] |
| [0.009383 KB wait] |
| Console is enabled; type HELP for help. |
| > [0.009602 event set 0x00002000] |
| [0.009722 hostcmd init 0x2000] |
| [0.009851 power state 0 = G3, in 0x0000] |
| [0.009993 power state 4 = G3->S5, in 0x0000] |
| [0.010144 power state 1 = S5, in 0x0000] |
| [0.010285 power on 2] |
| [0.010385 power state 5 = S5->S3, in 0x0000] |
| """ |
| for line in non_enhanced_output_stream.split('\n'): |
| self.console.CheckBufferForEnhancedImage(line) |
| |
| # Since the default console string is present in the output, it should be |
| # determined to be non enhanced now. |
| self.assertFalse(self.console.enhanced_ec) |
| |
| # Check that command was also sent to the interpreter. |
| self.console.cmd_pipe.send.assert_called_once_with('enhanced False') |
| |
| |
| class TestOOBMConsoleCommands(unittest.TestCase): |
| """Verify that OOBM console commands work correctly.""" |
| def setUp(self): |
| """Setup the test harness.""" |
| # Setup logging with a timestamp, the module, and the log level. |
| logging.basicConfig(level=logging.DEBUG, |
| format=('%(asctime)s - %(module)s -' |
| ' %(levelname)s - %(message)s')) |
| # Create a temp file and set both the master and slave PTYs to the file to |
| # create a loopback. |
| self.tempfile = tempfile.TemporaryFile() |
| |
| # Mock out the pipes. |
| dummy_pipe_end_0, dummy_pipe_end_1 = mock.MagicMock(), mock.MagicMock() |
| self.console = console.Console(self.tempfile.fileno(), self.tempfile, |
| dummy_pipe_end_0, dummy_pipe_end_1) |
| self.console.oobm_queue = mock.MagicMock() |
| |
| @mock.patch('console.Console.CheckForEnhancedECImage') |
| def test_InterrogateCommand(self, mock_check): |
| """Verify that 'interrogate' command works as expected. |
| |
| Args: |
| mock_check: A MagicMock object replacing the CheckForEnhancedECIMage() |
| method. |
| """ |
| input_stream = [] |
| expected_calls = [] |
| mock_check.side_effect = [False] |
| |
| # 'interrogate never' should disable the interrogation from happening at |
| # all. |
| cmd = 'interrogate never' |
| # Enter the OOBM prompt. |
| input_stream.extend(StringToByteList('%')) |
| # Type the command |
| input_stream.extend(StringToByteList(cmd)) |
| # Press enter. |
| input_stream.append(console.ControlKey.CARRIAGE_RETURN) |
| |
| # Send the sequence out. |
| for byte in input_stream: |
| self.console.HandleChar(byte) |
| |
| input_stream = [] |
| |
| # The OOBM queue should have been called with the command being put. |
| expected_calls.append(mock.call.put(cmd)) |
| self.console.oobm_queue.assert_has_calls(expected_calls) |
| |
| # Process the OOBM queue. |
| self.console.oobm_queue.get.side_effect = [cmd] |
| self.console.ProcessOOBMQueue() |
| |
| # Type out a few commands. |
| input_stream.extend(StringToByteList('version')) |
| input_stream.append(console.ControlKey.CARRIAGE_RETURN) |
| input_stream.extend(StringToByteList('flashinfo')) |
| input_stream.append(console.ControlKey.CARRIAGE_RETURN) |
| input_stream.extend(StringToByteList('sysinfo')) |
| input_stream.append(console.ControlKey.CARRIAGE_RETURN) |
| |
| # Send the sequence out. |
| for byte in input_stream: |
| self.console.HandleChar(byte) |
| |
| # The Check function should NOT have been called at all. |
| mock_check.assert_not_called() |
| |
| # The EC image should be assumed to be not enhanced. |
| self.assertFalse(self.console.enhanced_ec, 'The image should be assumed to' |
| ' be NOT enhanced.') |
| |
| # Reset the mocks. |
| mock_check.reset_mock() |
| self.console.oobm_queue.reset_mock() |
| |
| # 'interrogate auto' should not interrogate at all. It should only be |
| # scanning the output stream for the 'console is enabled' strings. |
| cmd = 'interrogate auto' |
| # Enter the OOBM prompt. |
| input_stream.extend(StringToByteList('%')) |
| # Type the command |
| input_stream.extend(StringToByteList(cmd)) |
| # Press enter. |
| input_stream.append(console.ControlKey.CARRIAGE_RETURN) |
| |
| # Send the sequence out. |
| for byte in input_stream: |
| self.console.HandleChar(byte) |
| |
| input_stream = [] |
| expected_calls = [] |
| |
| # The OOBM queue should have been called with the command being put. |
| expected_calls.append(mock.call.put(cmd)) |
| self.console.oobm_queue.assert_has_calls(expected_calls) |
| |
| # Process the OOBM queue. |
| self.console.oobm_queue.get.side_effect = [cmd] |
| self.console.ProcessOOBMQueue() |
| |
| # Type out a few commands. |
| input_stream.extend(StringToByteList('version')) |
| input_stream.append(console.ControlKey.CARRIAGE_RETURN) |
| input_stream.extend(StringToByteList('flashinfo')) |
| input_stream.append(console.ControlKey.CARRIAGE_RETURN) |
| input_stream.extend(StringToByteList('sysinfo')) |
| input_stream.append(console.ControlKey.CARRIAGE_RETURN) |
| |
| # Send the sequence out. |
| for byte in input_stream: |
| self.console.HandleChar(byte) |
| |
| # The Check function should NOT have been called at all. |
| mock_check.assert_not_called() |
| |
| # The EC image should be assumed to be not enhanced. |
| self.assertFalse(self.console.enhanced_ec, 'The image should be assumed to' |
| ' be NOT enhanced.') |
| |
| # Reset the mocks. |
| mock_check.reset_mock() |
| self.console.oobm_queue.reset_mock() |
| |
| # 'interrogate always' should, like its name implies, interrogate always |
| # after each press of the enter key. This was the former way of doing |
| # interrogation. |
| cmd = 'interrogate always' |
| # Enter the OOBM prompt. |
| input_stream.extend(StringToByteList('%')) |
| # Type the command |
| input_stream.extend(StringToByteList(cmd)) |
| # Press enter. |
| input_stream.append(console.ControlKey.CARRIAGE_RETURN) |
| |
| # Send the sequence out. |
| for byte in input_stream: |
| self.console.HandleChar(byte) |
| |
| input_stream = [] |
| expected_calls = [] |
| |
| # The OOBM queue should have been called with the command being put. |
| expected_calls.append(mock.call.put(cmd)) |
| self.console.oobm_queue.assert_has_calls(expected_calls) |
| |
| # Process the OOBM queue. |
| self.console.oobm_queue.get.side_effect = [cmd] |
| self.console.ProcessOOBMQueue() |
| |
| # The Check method should be called 3 times here. |
| mock_check.side_effect = [False, False, False] |
| |
| # Type out a few commands. |
| input_stream.extend(StringToByteList('help list')) |
| input_stream.append(console.ControlKey.CARRIAGE_RETURN) |
| input_stream.extend(StringToByteList('taskinfo')) |
| input_stream.append(console.ControlKey.CARRIAGE_RETURN) |
| input_stream.extend(StringToByteList('hibdelay')) |
| input_stream.append(console.ControlKey.CARRIAGE_RETURN) |
| |
| # Send the sequence out. |
| for byte in input_stream: |
| self.console.HandleChar(byte) |
| |
| # The Check method should have been called 3 times here. |
| expected_calls = [mock.call(), mock.call(), mock.call()] |
| mock_check.assert_has_calls(expected_calls) |
| |
| # The EC image should be assumed to be not enhanced. |
| self.assertFalse(self.console.enhanced_ec, 'The image should be assumed to' |
| ' be NOT enhanced.') |
| |
| # Now, let's try to assume that the image is enhanced while still disabling |
| # interrogation. |
| mock_check.reset_mock() |
| self.console.oobm_queue.reset_mock() |
| input_stream = [] |
| cmd = 'interrogate never enhanced' |
| # Enter the OOBM prompt. |
| input_stream.extend(StringToByteList('%')) |
| # Type the command |
| input_stream.extend(StringToByteList(cmd)) |
| # Press enter. |
| input_stream.append(console.ControlKey.CARRIAGE_RETURN) |
| |
| # Send the sequence out. |
| for byte in input_stream: |
| self.console.HandleChar(byte) |
| |
| input_stream = [] |
| expected_calls = [] |
| |
| # The OOBM queue should have been called with the command being put. |
| expected_calls.append(mock.call.put(cmd)) |
| self.console.oobm_queue.assert_has_calls(expected_calls) |
| |
| # Process the OOBM queue. |
| self.console.oobm_queue.get.side_effect = [cmd] |
| self.console.ProcessOOBMQueue() |
| |
| # Type out a few commands. |
| input_stream.extend(StringToByteList('chgstate')) |
| input_stream.append(console.ControlKey.CARRIAGE_RETURN) |
| input_stream.extend(StringToByteList('hash')) |
| input_stream.append(console.ControlKey.CARRIAGE_RETURN) |
| input_stream.extend(StringToByteList('sysjump rw')) |
| input_stream.append(console.ControlKey.CARRIAGE_RETURN) |
| |
| # Send the sequence out. |
| for byte in input_stream: |
| self.console.HandleChar(byte) |
| |
| # The check method should have never been called. |
| mock_check.assert_not_called() |
| |
| # The EC image should be assumed to be enhanced. |
| self.assertTrue(self.console.enhanced_ec, 'The image should be' |
| ' assumed to be enhanced.') |
| |
| |
| if __name__ == '__main__': |
| unittest.main() |