blob: 1889eedcf3f4b5fd509436a7a25752a4b00b885e [file] [log] [blame]
#!/usr/bin/python
# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
#
# Python wrapper script for running tests under QEMU
#
import errno
import imp
import json
import os
import optparse
import re
import signal
import socket
import subprocess
import sys
import threading
import time
QEMU_BINARY="qemu-system-arm"
QEMU_OPTIONS=["-machine","lm4f232h5","-serial","stdio","-display","none"]
def trace(msg):
sys.stdout.write(msg)
class QEMUError(Exception):
def __init__(self, value):
self.value = value
def __str__(self):
return "QEMU Error:" + repr(self.value)
class QEMUInstance:
PORT=3456
QMP_ADDR=("127.0.0.1", PORT)
def __run_qemu(self, cmdline, redirect_stdio=False):
trace("Starting QEMU binary ...\n")
if redirect_stdio:
stdin = subprocess.PIPE
stdout = subprocess.PIPE
else:
stdin = None
stdout = None
self.__qemu = subprocess.Popen(cmdline, shell=False, bufsize=16384,
stdin=stdin, stdout=stdout, close_fds=True)
trace("QEMU started pid:%d\n" % (self.__qemu.pid))
self.__qemu.wait()
trace("QEMU has terminated\n")
def __init__(self, qemu_bin, firmware, romcode = None, testmode = False):
self.__events = []
cmdline = [qemu_bin] + QEMU_OPTIONS + ["-kernel",firmware,"-qmp","tcp:%s:%d" % self.QMP_ADDR]
if romcode:
cmdline += ["-bios",romcode]
self.__sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.__sock.bind(self.QMP_ADDR)
self.__sock.listen(1)
self.__thr = threading.Thread(target=QEMUInstance.__run_qemu,args=(self,cmdline,testmode))
self.__thr.start()
try:
trace("Waiting for QEMU connection ...\n")
self.__sock, _ = self.__sock.accept()
self.__sockfd = self.__sock.makefile()
except socket.error:
raise QEMUError('Cannot connect to QMP server')
version = self.__json_recv()
if version is None or not version.has_key('QMP'):
raise QEMUError('Not QMP support')
# Test basic communication with QMP
resp = self.send_qmp('qmp_capabilities')
if not "return" in resp:
raise QEMUError('QMP not working properly')
trace("QMP connected\n")
def __json_recv(self, only_event=False):
while True:
data = self.__sockfd.readline()
if not data:
return
return json.loads(data)
def send_qmp(self, name, args=None):
qmp_cmd = { 'execute': name }
if args:
qmp_cmd['arguments'] = args
try:
self.__sock.sendall(json.dumps(qmp_cmd))
except socket.error, err:
if err[0] == errno.EPIPE:
return
raise QEMUError("Error on QMP socket:" + err)
return self.__json_recv()
def serial_readline(self):
return self.__qemu.stdout.readline()
def serial_write(self, string):
self.__qemu.stdin.write(string)
self.__qemu.stdin.flush()
def get_event(self, blocking=True):
if not blocking:
self.__sock.setblocking(0)
try:
val = self.__json_recv()
except socket.error, err:
if err[0] == errno.EAGAIN:
# Nothing available
return None
if not blocking:
self.__sock.setblocking(1)
return val
def close(self):
# Try to terminate QEMU gracefully
if self.__qemu.poll() == None:
self.send_qmp("quit")
time.sleep(0.1)
# Force termination if the process is still here :
if self.__qemu.poll() == None:
self.__qemu.terminate()
self.__thr.join()
self.__sock.close()
self.__sockfd.close()
class TestFailure(Exception):
def __init__(self, reason):
self.value = reason
def __str__(self):
return "reason:" + repr(self.value)
class EcTest:
def __init__(self, qemu_bin, firmware, romcode, test):
self.__qemu_bin = qemu_bin
self.__firmware = firmware
self.__romcode = romcode
self.__test = test
def timeout_handler(self, signum, frame):
raise TestFailure("Timeout waiting for %s" % self.__timeout_reason)
def wait_output(self, string, use_re = False, timeout = 5):
self.__timeout_reason = string
old_handler = signal.signal(signal.SIGALRM, lambda
s,f:self.timeout_handler(s,f))
if use_re:
regexp = re.compile(string)
signal.alarm(timeout)
while True:
ln = self.__qemu.serial_readline()
trace("[EC]%s" % ln)
if use_re:
res = regexp.search(ln)
if res:
signal.alarm(0)
signal.signal(signal.SIGALRM, old_handler)
return res.groupdict()
else:
if string in ln:
signal.alarm(0)
signal.signal(signal.SIGALRM, old_handler)
return
def check_no_output(self, string, use_re = False, timeout = 1):
success = False
try:
self.wait_output(string, use_re=use_re, timeout=timeout)
except:
success = True
return success
def wait_prompt(self):
self.wait_output("> ")
def ec_command(self, cmd):
self.__qemu.serial_write(cmd + '\r\n')
def trace(self, msg):
trace(msg)
def report(self, msg):
sys.stderr.write(" === TEST %s ===\n" % msg)
def fail(self, msg):
raise TestFailure(msg)
def run_test(self):
try:
self.__qemu = QEMUInstance(self.__qemu_bin, self.__firmware,
self.__romcode, True)
except QEMUError as e:
self.report("QEMU FATAL ERROR: " + e.value)
return 1
# Set up import path so each test can import other modules inside 'test'
sys.path.insert(0, os.path.dirname(os.path.abspath(self.__test)))
testmod = imp.load_module("testmodule", file(self.__test,"r"),
self.__test, (".py","r",imp.PY_SOURCE))
self.report("RUN: %s" % os.path.basename(self.__test))
try:
res = testmod.test(self)
except TestFailure as e:
res = False
self.report("FAIL: %s" % e.value)
self.__qemu.close()
if res:
self.report("PASS")
return 0
return 1
def run_interactive(qemu_bin, firmware, romcode):
try:
qemu = QEMUInstance(qemu_bin, firmware, romcode, False)
except QEMUError as e:
sys.stderr.write('FATAL: %s\n' % e.value)
return 1
# Dummy testing code : TODO remove
#print qemu.send_qmp("query-commands")
#print qemu.send_qmp("human-monitor-command",
# { 'command-line': "sendkey ctrl-alt-f1 50",'cpu-index': 0 })
while True:
msg = qemu.get_event()
trace("[EVENT]%s\n" % msg)
if msg.has_key("event") and msg["event"] == "RESET":
break
qemu.close()
return 0
def parse_cmdline(basedir):
parser = optparse.OptionParser("usage: %prog [options] [testname]")
parser.add_option("-b", "--board", dest="board", default="bds",
help="board to use")
parser.add_option("-i", "--image", dest="image",
help="firmware image filename")
parser.add_option("-r", "--rom", dest="romcode",
default=os.path.join(basedir,"util","rom_lm4fs1ge5bb.bin"),
help="ROM code image filename")
parser.add_option("-q", "--qemu", dest="qemu_bin",
default=os.path.join(basedir,"util",QEMU_BINARY),
help="Qemu binary path")
(options, args) = parser.parse_args()
if options.image:
image = options.image
else:
image = os.path.join(basedir,"build",options.board,"ec.bin")
return options.qemu_bin, image,options.romcode, args
if __name__ == '__main__':
basedir = os.path.abspath(os.path.join(os.path.dirname(__file__),".."))
qemu_bin, image, romcode, tests = parse_cmdline(basedir)
if len(tests) > 0:
res = EcTest(qemu_bin, image, romcode, tests[0]).run_test()
else:
res = run_interactive(qemu_bin, image, romcode)
sys.exit(res)