blob: f3f6620d95770639aca31683dcf76e1e238b7de8 [file] [log] [blame]
# Copyright 2013 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""A wrapper for talking with a tty modem."""
import logging
from typing import List
# yapf: disable
import serial # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
from cros.factory.utils import sync_utils
from cros.factory.utils import type_utils
# yapf: enable
_COMMAND_RETRY_TIMES = 5
SUCCESSS_RESPONSE_TOKEN = 'OK'
ERROR_RESPONSE_TOKEN = 'ERROR'
class Modem:
def __init__(self, port, timeout=2,
cancel_echo=False, disable_operation=False):
"""Initiates a modem serial port communication.
Args:
port: the relative port path starts from '/dev/'
timeout: timeout seconds that passed to pyserial
cancel_echo: AT command to suppress the echo
disable_operation: Put modem into a non-operation mode so it will
not throw unexpected messages.
"""
self.ser = serial.Serial(f'/dev/{port}', timeout=timeout)
if cancel_echo:
self.SendCommandWithCheck('ATE0')
if disable_operation:
self.SendCommandWithCheck('AT+CFUN=0')
# Send an AT command and expect 'OK'
self.SendCommandWithCheck('AT')
def ReadLine(self) -> str:
"""Reads a line from the modem.
Raises:
MaxRetryError: If the serial didn't return a not `None` object
after `_COMMAND_RETRY_TIMES` times
"""
# yapf: disable
@sync_utils.RetryDecorator(max_attempt_count=_COMMAND_RETRY_TIMES, # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
timeout_sec=float('inf'),
target_condition=lambda x: x)
def _SerialRead() -> str:
return self.ser.readline()
try:
response = _SerialRead()
except type_utils.MaxRetryError:
logging.error('modem cannot get non-empty response')
raise
response = response.rstrip('\r\n')
logging.info('modem[%s]', response)
return response
def SendLine(self, line):
"""Sends a line to the modem."""
logging.info('modem] %r', line)
self.ser.write(line + '\r')
def SendCommand(self, command):
"""Sends a line to the modem and discards the echo."""
self.SendLine(command)
self.ReadLine()
def SendCommandWithCheck(
self, command: str, retry_times: int = _COMMAND_RETRY_TIMES) -> List[str]:
"""Sends a command to the modem.
SendCommand function allow retry when response is not OK.
Returns:
response: A list contains all success responses from modem.
Raises:
MaxRetryError: If the command cannot get non-empty response after
`_COMMAND_RETRY_TIMES`.
"""
# yapf: disable
@sync_utils.RetryDecorator(max_attempt_count=retry_times, # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
timeout_sec=float('inf'),
target_condition=lambda x: x)
def _SendCommand() -> List[str]:
self.SendLine(command)
response = self._GetFullResponse()
if response[-1] == SUCCESSS_RESPONSE_TOKEN:
return response
return []
return _SendCommand()
def _GetFullResponse(self) -> List[str]:
"""Gets response from modem.
A formal response should be OK or ERROR at the end of response.
Returns:
response: A list of str contains all response from modem.
Raises:
MaxRetryError: If the underlying `ReadLine()` cannot get non-empty
response after `_COMMAND_RETRY_TIMES`.
"""
response = []
while True:
line = self.ReadLine()
response.append(line)
# TODO (henryhsu): The response may have "+CME ERROR: <errno>".
# If we will use ME command in the future, we will need handle this
# error type.
if line in {SUCCESSS_RESPONSE_TOKEN, ERROR_RESPONSE_TOKEN}:
return response
def ExpectLine(self, expected_line):
"""Expects a line from the modem."""
line = self.ReadLine()
if line != expected_line:
raise type_utils.Error(f'Expected {expected_line!r} but got {line!r}')