blob: 4a202b10fd289c7f8984901775a2f131b3e9ec62 [file] [log] [blame]
# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""A wrapper for talking with a tty modem."""
import re
import serial
import logging
import factory_common # pylint: disable=W0611
from cros.factory.utils.type_utils import Error
_COMMAND_RETRY_TIMES = 5
_RECEIVE_RETRY_TIMES = 10
class Modem(object):
def __init__(self, port, timeout=2,
cancel_echo=False, disable_operation=False):
"""Initiates a modem serial port communication.
Args:
port: the relative port path starts from '/dev/'
timeout: timeout seconds that passed to pyserial
cancel_echo: AT command to suppress the echo
disable_operation: Put modem into a non-operation mode so it will
not throw unexpected messages.
"""
self.ser = serial.Serial('/dev/%s' % port, timeout=timeout)
if cancel_echo:
self.SendCommandWithCheck('ATE0')
if disable_operation:
self.SendCommandWithCheck('AT+CFUN=0')
# Send an AT command and expect 'OK'
self.SendCommandWithCheck('AT')
def ReadLine(self):
"""Reads a line from the modem."""
line = self.ser.readline()
logging.info('modem[ %r', line)
return line.rstrip('\r\n')
def SendLine(self, line):
"""Sends a line to the modem."""
logging.info('modem] %r', line)
self.ser.write(line + '\r')
def SendCommand(self, command):
"""Sends a line to the modem and discards the echo."""
self.SendLine(command)
self.ReadLine()
def SendCommandWithCheck(self, command, retry_times=_COMMAND_RETRY_TIMES):
"""Sends a command to the modem.
SendCommand function allow retry when response is not OK.
Returns:
response: A list contains all success responses from modem.
"""
retries = 0
while retries < retry_times:
self.SendLine(command)
response = self.GetResponse()
if response[-1] == 'OK':
break
else:
retries += 1
return response
def GetResponse(self, retry_times=_RECEIVE_RETRY_TIMES):
"""Gets response from modem.
A formal response should be OK or ERROR at the end of response.
Returns:
response: A list contains all response from modem.
Raises:
Error when getting response exceeds time limit
(serial timeout * retry_times).
"""
response = []
retries = 0
while retries < retry_times:
line = self.ReadLine()
if len(line):
response.append(line)
# TODO (henryhsu): The response may have "+CME ERROR: <errno>".
# If we will use ME command in the future, we will need handle this
# error type.
if line in ['OK', 'ERROR']:
return response
else:
retries += 1
raise Error('Cannot get entire response: %r' % (response))
def ExpectResponse(self, expected_msg, modem_response):
"""Checks expected messages from modem.
Args:
expected_msg: expected messages can be list or string.
modem_response: A list contains all responses from modem.
Raises:
Error when results mismatch.
"""
if isinstance(expected_msg, str):
expected_msg = [expected_msg]
for msg in expected_msg:
if msg not in modem_response:
raise Error('Expected %r but got %r' % (expected_msg, modem_response))
def ExpectLine(self, expected_line):
'''Expects a line from the modem.'''
line = self.ReadLine()
if line != expected_line:
raise Error('Expected %r but got %r' % (expected_line, line))
def ExpectMultipleLines(self, expected_regex):
'''Expects a multiple line regular expression.'''
lines = self.ser.readlines()
for line in lines:
logging.info('modem[ %r', line)
if not re.search(expected_regex, ''.join(lines), re.MULTILINE | re.DOTALL):
raise Error('Expected %r but got %r' % (expected_regex, lines))