blob: f1cd363aa57bb731019f1a757d469fbc9eeb951e [file] [log] [blame]
# Copyright 2016 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Utility functions to mock SCPI device over TCP.
Example Usage:
MockServerHandler.AddLookup(r'*CLS', None)
SetupLookupTable()
SERVER_PORT = 5025
MockTestServer(('0.0.0.0', SERVER_PORT), MockServerHandler).serve_forever()
"""
import logging
import inspect
import re
import types
import SocketServer
class MockTestServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
allow_reuse_address = True
class MockServerHandler(SocketServer.StreamRequestHandler):
"""A mocking handler for socket.
This handler responses client based on its pre-defined lookup table.
Lookup table is a list of tuple where the first of each tuple is a regular
expression and the second is response. Response could be one of None, string
or function.
Exceptions will be raised if input cannot match any of the regular expression
from keys.
"""
responses_lookup = list()
@classmethod
def ResetLookup(cls):
cls.responses_lookup = list()
@classmethod
def AddLookup(cls, request, response):
"""Add a request-response pair into the lookup table.
Args:
request: A string or a compiled regular expression object.
response: None, a string or a function.
"""
# Check if the response is one of the known types.
is_known_types = False
if (isinstance(response, types.StringType) or
isinstance(response, types.NoneType)):
is_known_types = True
elif inspect.isfunction(response) or inspect.ismethod(response):
is_known_types = True
assert is_known_types, (
'type %r of response is not supported' % type(response))
cls.responses_lookup.append((request, response))
def __init__(self, *args, **kwargs):
self.lookup = list(self.responses_lookup)
SocketServer.StreamRequestHandler.__init__(self, *args, **kwargs)
def handle(self):
regex_type = type(re.compile(''))
while True:
line = self.rfile.readline().rstrip('\r\n')
if not line:
break
for rule, response in self.lookup:
if isinstance(rule, str) and line == rule:
logging.info('Input %r matched.', line)
elif isinstance(rule, regex_type) and rule.match(line):
logging.info('Input %r matched with regexp %s', line, rule.pattern)
else:
continue
if inspect.isfunction(response) or inspect.ismethod(response):
response = response(line)
if isinstance(response, types.StringType):
self.wfile.write(response)
break # Only the first match will be used.
else:
raise ValueError('Input %r is not matching any.' % line)