# blob: d765b36a029fb06e44b544e65aaca328251af6ea [file] [log] [blame]
# Copyright 2018 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Implementation of SCPI-over-TCP interface wrapper."""
import socket
import time
import graphyte_common # pylint: disable=unused-import
from graphyte import link
from graphyte.default_setting import logger
from graphyte.utils import sync_utils
class SCPILinkError(Exception):
  """Error raised by the SCPI link, e.g. connect failure or a bad command."""
class SCPILink(link.DeviceLink):
  """A target that is connected via SCPI-over-TCP interface.

  Commands are written to the device as newline-terminated UTF-8 lines. A
  command containing '?' is treated as a query, and one reply line is read
  back for it.

  link_options example:
    {
      'link_class': 'SCPILink',
      'host': '192.168.1.100',
      'port': 5025,
    }
  """

  def __init__(self, host, port=5025, timeout=3, retries=5, initial_delay=1):
    """Stores the connection settings; no I/O happens until Open().

    Args:
      host: Host to connect to.
      port: Port to connect to.
      timeout: Timeout in seconds, applied to connecting and to each read.
          (Uses the ALRM signal.)
      retries: Maximum attempts to connect to the host.
      initial_delay: Delay in seconds before issuing the first command.
    """
    self.timeout = timeout
    self.initial_delay = initial_delay
    self.host = host
    self.port = port
    self.retries = retries
    # Populated by _Connect() and reset to None by Close().
    self.rfile = None
    self.wfile = None
    self.socket = None
    # Reply to '*IDN?' captured right after a successful connection.
    self.id = None

  def Open(self):
    """Connects to the device, trying up to self.retries times.

    Raises:
      SCPILinkError: if sync_utils.Retry reports that no attempt succeeded.
    """
    connected = sync_utils.Retry(
        self.retries, interval=1, target=self._Connect)
    if not connected:
      raise SCPILinkError('Failed to connect %s:%d after %d tries' %
                          (self.host, self.port, self.retries))

  def Push(self, local, remote):
    """Not supported by this link; always raises NotImplementedError."""
    raise NotImplementedError

  def Pull(self, remote, local=None):
    """Not supported by this link; always raises NotImplementedError."""
    raise NotImplementedError

  def Shell(self, command, stdin=None, stdout=None, stderr=None):
    """Not supported by this link; always raises NotImplementedError."""
    raise NotImplementedError

  def IsReady(self):
    """Checks whether the device answers the '*IDN?' query.

    Returns:
      True if the query produced a non-empty reply, False otherwise.
    """
    try:
      info = self.CallOutput('*IDN?')
    except ValueError:
      return False
    # CallOutput is inherited and its failure value is not visible here;
    # treat None like an empty reply instead of reporting the device ready.
    return bool(info)

  # pylint: disable=arguments-differ
  def Call(self, command, stdin=None, stdout=None, stderr=None):
    """Sends command with SCPI interface.

    Write the command to the device. If the command is a query, one reply line
    is read back and, when stdout is given, written into stdout as UTF-8
    bytes. Since it is not a real linux shell, other standard streams are not
    supported.

    Args:
      command: A string or a list, to be written to device. A list is joined
          with single spaces after converting each item to str.
      stdin: Must be None.
      stdout: Optional binary file-like object that receives the query reply.
      stderr: Must be None.

    Returns:
      The return code: 0 on success, 1 if a SCPILinkError was raised.

    Raises:
      NotImplementedError: if stdin or stderr is not None.
    """
    if stdin is not None or stderr is not None:
      raise NotImplementedError(
          'Redirecting standard input and error are not supported.')
    if isinstance(command, list):
      command = ' '.join(str(arg) for arg in command)
    try:
      if self._IsQueryCommand(command):
        output = self._Query(command)
        if stdout:
          stdout.write(output.encode('utf-8'))
      else:
        self._Send(command)
      return 0
    except SCPILinkError:
      logger.exception('Call command %s error', command)
      return 1

  def _Connect(self):
    """Makes one connection attempt and identifies the device.

    Returns:
      True if the socket connected and '*IDN?' was queried without an
      exception; False otherwise (the link is closed again in that case).
    """
    try:
      self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
      with sync_utils.Timeout(self.timeout):
        logger.debug('Connecting to %s:%d...', self.host, self.port)
        self.socket.connect((self.host, self.port))
      self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
      self.rfile = self.socket.makefile('rb', -1)  # Default buffering
      self.wfile = self.socket.makefile('wb', 0)  # No buffering
      logger.info('Connected')
      # Give equipment time to warm up if required so.
      time.sleep(self.initial_delay)
      self.id = self._Query('*IDN?')
    except Exception:
      # Any failure counts as a failed attempt: log it and release whatever
      # was opened so that the next retry starts from a clean state.
      logger.exception('Unable to connect to %s:%d.', self.host, self.port)
      self.Close()
      return False
    return True

  def Close(self):
    """Closes the file objects and the socket, resetting them to None.

    Each handle is closed independently, so an error while closing one of
    them is logged and does not prevent the remaining ones from being closed.
    """
    logger.info('Close the SCPI socket.')
    # Detach the handles first so the link is marked as closed even if one of
    # the close() calls below fails.
    handles = (self.rfile, self.wfile, self.socket)
    self.rfile = None
    self.wfile = None
    self.socket = None
    for handle in handles:
      if handle is None:
        continue
      try:
        handle.close()
      except (IOError, OSError):
        logger.exception('Error while closing the SCPI link.')

  def _Send(self, command):
    """Sends a command to the device without reading a reply.

    Args:
      command: A SCPI command to send to the device (string).
    """
    self._WriteLine(command)

  def _Query(self, command):
    """Issues a query, returning the result.

    Args:
      command: A SCPI command to send to the device (string).

    Returns:
      The single reply line as a string, without its line terminator.
    """
    self._WriteLine(command)
    return self._ReadLine()

  def _ReadLine(self):
    """Reads a single line, timing out in self.timeout seconds.

    Returns:
      The line decoded as UTF-8 with trailing CR/LF characters removed.
    """
    with sync_utils.Timeout(self.timeout):
      if not self.timeout:
        # No timeout is configured, so leave a trace before a read that may
        # wait for a long time.
        logger.debug('Waiting for reading...')
      # Strip '\r' as well as '\n' so that replies terminated with CRLF do
      # not come back with a stray carriage return.
      ret = self.rfile.readline().decode('utf-8').rstrip('\r\n')
      logger.debug('Read: %s', ret)
      return ret

  def _WriteLine(self, command):
    """Writes a single line.

    Args:
      command: The text to send; a newline is appended before sending.

    Raises:
      SCPILinkError: if command itself contains a newline.
    """
    if '\n' in command:
      raise SCPILinkError('Newline in command: %r' % command)
    logger.debug('Write: %s', command)
    self.wfile.write((command + '\n').encode('utf-8'))

  def _IsQueryCommand(self, command):
    """Returns True if command contains '?', i.e. a reply is expected."""
    return '?' in command