blob: 984bc7bca5296b8468dde94d0621a9abcec5d089 [file] [log] [blame]
# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
r"""Utility functions to mock SCPI device over TCP.
Example Usage:
MockServerHandler.AddLookup(r'\*CLS', None)
SetupLookupTable()
SERVER_PORT = 5025
MockTestServer(('0.0.0.0', SERVER_PORT), MockServerHandler).serve_forever()
"""
import logging
import inspect
import re
import types
import SocketServer
# pylint: disable=W0232
class MockTestServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
allow_reuse_address = True
class MockServerHandler(SocketServer.StreamRequestHandler):
"""A mocking handler for socket.
This handler responses client based on its pre-defined lookup table.
Lookup table is a list of tuple where the first of each tuple is a regular
expression and the second is response. Response could be one of None, string
or function.
Exceptions will be raised if input cannot match any of the regular expression
from keys.
"""
responses_lookup = list()
@classmethod
def AddLookup(cls, input_line, response):
# Check if the response is one of the known types.
is_known_types = False
if (isinstance(response, types.StringType) or
isinstance(response, types.NoneType)):
is_known_types = True
elif (inspect.isfunction(response) or inspect.ismethod(response)):
is_known_types = True
assert is_known_types, (
'type %r of response is not supported' % type(response))
cls.responses_lookup.append((input_line, response))
def __init__(self, *args, **kwargs):
self.lookup = list(self.responses_lookup)
SocketServer.StreamRequestHandler.__init__(self, *args, **kwargs)
def handle(self):
while True:
line = self.rfile.readline().rstrip('\r\n')
if not line:
break
matched = False
for regexp, response in self.lookup:
if not re.search(regexp, line):
continue
matched = True
logging.info('Input %r matched with regexp %r', line, regexp)
if (inspect.isfunction(response) or inspect.ismethod(response)):
response = response(line)
if isinstance(response, types.StringType):
self.wfile.write(response)
elif isinstance(response, types.NoneType):
pass
# Only the first match will be used.
break
if not matched:
raise ValueError('Input %r is not matching any.' % line)