| #!/usr/bin/python |
| # Copyright (c) 2011 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| # |
| # Python wrapper script for running tests under QEMU |
| # |
| |
| import errno |
| import imp |
| import json |
| import os |
| import optparse |
| import re |
| import signal |
| import socket |
| import subprocess |
| import sys |
| import threading |
| import time |
| |
| QEMU_BINARY="qemu-system-arm" |
| QEMU_OPTIONS=["-machine","lm4f232h5","-serial","stdio","-display","none"] |
| |
| def trace(msg): |
| sys.stdout.write(msg) |
| |
| class QEMUError(Exception): |
| def __init__(self, value): |
| self.value = value |
| |
| def __str__(self): |
| return "QEMU Error:" + repr(self.value) |
| |
| class QEMUInstance: |
| PORT=3456 |
| QMP_ADDR=("127.0.0.1", PORT) |
| |
| def __run_qemu(self, cmdline, redirect_stdio=False): |
| trace("Starting QEMU binary ...\n") |
| if redirect_stdio: |
| stdin = subprocess.PIPE |
| stdout = subprocess.PIPE |
| else: |
| stdin = None |
| stdout = None |
| self.__qemu = subprocess.Popen(cmdline, shell=False, bufsize=16384, |
| stdin=stdin, stdout=stdout, close_fds=True) |
| trace("QEMU started pid:%d\n" % (self.__qemu.pid)) |
| self.__qemu.wait() |
| trace("QEMU has terminated\n") |
| |
| def __init__(self, qemu_bin, firmware, romcode = None, testmode = False): |
| self.__events = [] |
| cmdline = [qemu_bin] + QEMU_OPTIONS + ["-kernel",firmware,"-qmp","tcp:%s:%d" % self.QMP_ADDR] |
| if romcode: |
| cmdline += ["-bios",romcode] |
| self.__sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
| self.__sock.bind(self.QMP_ADDR) |
| self.__sock.listen(1) |
| |
| self.__thr = threading.Thread(target=QEMUInstance.__run_qemu,args=(self,cmdline,testmode)) |
| self.__thr.start() |
| try: |
| trace("Waiting for QEMU connection ...\n") |
| self.__sock, _ = self.__sock.accept() |
| self.__sockfd = self.__sock.makefile() |
| except socket.error: |
| raise QEMUError('Cannot connect to QMP server') |
| |
| version = self.__json_recv() |
| if version is None or not version.has_key('QMP'): |
| raise QEMUError('Not QMP support') |
| # Test basic communication with QMP |
| resp = self.send_qmp('qmp_capabilities') |
| if not "return" in resp: |
| raise QEMUError('QMP not working properly') |
| trace("QMP connected\n") |
| |
| def __json_recv(self, only_event=False): |
| while True: |
| data = self.__sockfd.readline() |
| if not data: |
| return |
| return json.loads(data) |
| |
| def send_qmp(self, name, args=None): |
| qmp_cmd = { 'execute': name } |
| if args: |
| qmp_cmd['arguments'] = args |
| try: |
| self.__sock.sendall(json.dumps(qmp_cmd)) |
| except socket.error, err: |
| if err[0] == errno.EPIPE: |
| return |
| raise QEMUError("Error on QMP socket:" + err) |
| return self.__json_recv() |
| |
| def serial_readline(self): |
| return self.__qemu.stdout.readline() |
| |
| def serial_write(self, string): |
| self.__qemu.stdin.write(string) |
| self.__qemu.stdin.flush() |
| |
| def get_event(self, blocking=True): |
| if not blocking: |
| self.__sock.setblocking(0) |
| try: |
| val = self.__json_recv() |
| except socket.error, err: |
| if err[0] == errno.EAGAIN: |
| # Nothing available |
| return None |
| if not blocking: |
| self.__sock.setblocking(1) |
| return val |
| |
| def close(self): |
| # Try to terminate QEMU gracefully |
| if self.__qemu.poll() == None: |
| self.send_qmp("quit") |
| time.sleep(0.1) |
| # Force termination if the process is still here : |
| if self.__qemu.poll() == None: |
| self.__qemu.terminate() |
| self.__thr.join() |
| self.__sock.close() |
| self.__sockfd.close() |
| |
| class TestFailure(Exception): |
| def __init__(self, reason): |
| self.value = reason |
| |
| def __str__(self): |
| return "reason:" + repr(self.value) |
| |
| class EcTest: |
| def __init__(self, qemu_bin, firmware, romcode, test): |
| self.__qemu_bin = qemu_bin |
| self.__firmware = firmware |
| self.__romcode = romcode |
| self.__test = test |
| |
| def timeout_handler(self, signum, frame): |
| raise TestFailure("Timeout waiting for %s" % self.__timeout_reason) |
| |
| def wait_output(self, string, use_re = False, timeout = 5): |
| self.__timeout_reason = string |
| old_handler = signal.signal(signal.SIGALRM, lambda |
| s,f:self.timeout_handler(s,f)) |
| if use_re: |
| regexp = re.compile(string) |
| signal.alarm(timeout) |
| while True: |
| ln = self.__qemu.serial_readline() |
| trace("[EC]%s" % ln) |
| if use_re: |
| res = regexp.search(ln) |
| if res: |
| signal.alarm(0) |
| signal.signal(signal.SIGALRM, old_handler) |
| return res.groupdict() |
| else: |
| if string in ln: |
| signal.alarm(0) |
| signal.signal(signal.SIGALRM, old_handler) |
| return |
| |
| def check_no_output(self, string, use_re = False, timeout = 1): |
| success = False |
| try: |
| self.wait_output(string, use_re=use_re, timeout=timeout) |
| except: |
| success = True |
| return success |
| |
| |
| def wait_prompt(self): |
| self.wait_output("> ") |
| |
| def ec_command(self, cmd): |
| self.__qemu.serial_write(cmd + '\r\n') |
| |
| def trace(self, msg): |
| trace(msg) |
| |
| def report(self, msg): |
| sys.stderr.write(" === TEST %s ===\n" % msg) |
| |
| def fail(self, msg): |
| raise TestFailure(msg) |
| |
| def run_test(self): |
| try: |
| self.__qemu = QEMUInstance(self.__qemu_bin, self.__firmware, |
| self.__romcode, True) |
| except QEMUError as e: |
| self.report("QEMU FATAL ERROR: " + e.value) |
| return 1 |
| |
| # Set up import path so each test can import other modules inside 'test' |
| sys.path.insert(0, os.path.dirname(os.path.abspath(self.__test))) |
| |
| testmod = imp.load_module("testmodule", file(self.__test,"r"), |
| self.__test, (".py","r",imp.PY_SOURCE)) |
| self.report("RUN: %s" % os.path.basename(self.__test)) |
| try: |
| res = testmod.test(self) |
| except TestFailure as e: |
| res = False |
| self.report("FAIL: %s" % e.value) |
| self.__qemu.close() |
| if res: |
| self.report("PASS") |
| return 0 |
| return 1 |
| |
| def run_interactive(qemu_bin, firmware, romcode): |
| try: |
| qemu = QEMUInstance(qemu_bin, firmware, romcode, False) |
| except QEMUError as e: |
| sys.stderr.write('FATAL: %s\n' % e.value) |
| return 1 |
| |
| # Dummy testing code : TODO remove |
| #print qemu.send_qmp("query-commands") |
| #print qemu.send_qmp("human-monitor-command", |
| # { 'command-line': "sendkey ctrl-alt-f1 50",'cpu-index': 0 }) |
| while True: |
| msg = qemu.get_event() |
| trace("[EVENT]%s\n" % msg) |
| if msg.has_key("event") and msg["event"] == "RESET": |
| break |
| qemu.close() |
| return 0 |
| |
| def parse_cmdline(basedir): |
| parser = optparse.OptionParser("usage: %prog [options] [testname]") |
| parser.add_option("-b", "--board", dest="board", default="bds", |
| help="board to use") |
| parser.add_option("-i", "--image", dest="image", |
| help="firmware image filename") |
| parser.add_option("-r", "--rom", dest="romcode", |
| default=os.path.join(basedir,"util","rom_lm4fs1ge5bb.bin"), |
| help="ROM code image filename") |
| parser.add_option("-q", "--qemu", dest="qemu_bin", |
| default=os.path.join(basedir,"util",QEMU_BINARY), |
| help="Qemu binary path") |
| (options, args) = parser.parse_args() |
| if options.image: |
| image = options.image |
| else: |
| image = os.path.join(basedir,"build",options.board,"ec.bin") |
| |
| return options.qemu_bin, image,options.romcode, args |
| |
| if __name__ == '__main__': |
| basedir = os.path.abspath(os.path.join(os.path.dirname(__file__),"..")) |
| qemu_bin, image, romcode, tests = parse_cmdline(basedir) |
| if len(tests) > 0: |
| res = EcTest(qemu_bin, image, romcode, tests[0]).run_test() |
| else: |
| res = run_interactive(qemu_bin, image, romcode) |
| sys.exit(res) |