blob: 0761ccf5f3971c94f41ce8af243b2b1eb857bd66 [file] [log] [blame]
# Copyright 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Implementation for Agilent ENA Series Network Analyzer (E5071C) device.
"""
# TODO(itspeter): write unittest and verify it on a real E5071C
from __future__ import division
import bisect
import itertools
import logging
import urllib
from six.moves import xrange
import factory_common # pylint: disable=unused-import
from cros.factory.test.rf import agilent_scpi
from cros.factory.test.rf import lan_scpi
from cros.factory.utils import type_utils
def CheckTraceValid(x_values, y_values):
"""Checks validity of trace.
Raises an exception if x_values and y_values cannot form a valid trace.
Args:
x_values: A list of X values.
y_values: A list of Y values.
Raises:
ValueError:
(1) x_values is empty.
(2) x_values is not an increasing sequence.
(3) len(x_values) != len(y_values).
"""
if not x_values:
raise ValueError('Parameter x_values is empty')
if len(x_values) != len(y_values):
raise ValueError('Parameter x_values and y_values are not equal in length')
if not all(x <= y for x, y in zip(x_values, x_values[1:])):
raise ValueError('Parameter x_values is not an increasing sequence')
def Interpolate(x_values, y_values, x_position):
"""Interpolates y-values.
Returns an interpolated (linear) y-value at x_position.
This function is especially designed for interpolating values from a
Network Analyzer. It happens in practice that x_values will have
sorted, duplicated values. In addition, y_values may be different for
identical x value. The function behavior under this situation is as follows:
(1) The function finds a right sentinel for interpolating, which is the
smallest index that less of equal to the x_position.
(2) If it is exactly the x_position, returns the y_value.
(3) Otherwise, interpolate values as the left sentinel is just the
one before right sentinel.
Example used in the unittest elaborates more on this.
Args:
x_values: A list of X values.
y_values: A list of Y values.
x_position: The position where we want to interpolate.
Returns:
Interpolated value. For example:
Interpolate([10, 20], [0, 10], 15) returns 5.0
Raises:
ValueError:
(1) x_position is not in the range of x_values.
(2) Arguments failed to pass CheckTraceValid().
"""
CheckTraceValid(x_values, y_values)
# Check if the x_position is inside some interval in the trace
if x_position < x_values[0] or x_position > x_values[-1]:
raise ValueError(
'x_position is not in the current range of x_values[%s,%s]' %
(x_values[0], x_values[-1]))
# Binary search where to interpolate the x_position
right_index = bisect.bisect_left(x_values, x_position)
if x_position == x_values[right_index]:
return y_values[right_index]
# Interpolate the value according to the x_position
delta_interval = ((x_position - x_values[right_index - 1]) /
(x_values[right_index] - x_values[right_index - 1]))
return (y_values[right_index - 1] +
(y_values[right_index] - y_values[right_index - 1]) * delta_interval)
class Traces(object):
def __init__(self):
self.parameters = None
self.x_axis = None
self.traces = {}
def __repr__(self):
"""Returns a representation of the object, including its properties."""
return (self.__class__.__name__ + '(' +
', '.join('%s=%s' % (k, repr(getattr(self, k)))
for k in sorted(self.__dict__.keys())
if k[0] != '_')
+ ')')
def GetFreqResponse(self, freq, parameter):
"""Returns corresponding frequency response.
Returns corresponding frequency response given the parameter.
If the particular frequency was not sampled, uses linear
interpolation to estimate the response.
Args:
freq: The frequency we want to obtain from the traces.
parameter: One of the parameters provided in
ENASCPI.PARAMETERS.
Returns:
A floating point value in dB at freq.
"""
if parameter not in self.traces:
raise lan_scpi.Error('No trace available for parameter %s' % parameter)
return Interpolate(self.x_axis, self.traces[parameter], freq)
class ENASCPI(agilent_scpi.AgilentSCPI):
"""An Agilent ENA (E5071C) device."""
PARAMETERS = type_utils.Enum(['S11', 'S12', 'S21', 'S22'])
def __init__(self, *args, **kwargs):
# The first few commands need some warm up time in real E5071C based
# on experimental result. Pass the timeout arguement so initialization
# will be a success.
kwargs_copy = dict(kwargs)
kwargs_copy.setdefault('timeout', 10)
super(ENASCPI, self).__init__('E5071C', *args, **kwargs_copy)
def SaveScreen(self, filename):
"""Saves the screenshot.
Saves the current screen to a portable network graphics (PNG) file.
The default store path in E5071C is under disk D.
"""
self.Send(':MMEMory:STORe:IMAGe "%s.png"' % filename)
def SetMarker(self, channel, marker_num, marker_freq):
"""Sets the marker at channel.
The marker will only be showed if it is checked on the ENA already.
This function is used to set marker position, not to enable markers.
Example usage:
Set marker 5 to 600MHz on channel 1.
SetMarker(1, 5, 600*1e6)
"""
# TODO(itspeter): understand why channel doesn't make a difference.
# http://ena.tm.agilent.com/e5061b/manuals/webhelp/eng/
# programming/command_reference/calculate/scpi_calculate
# _ch_selected_marker_mk_x.htm#Syntax
#:CALCulate{[1]-4}[:SELected]:MARKer{[1]-10}:X <numeric>
buffer_str = ':CALCulate%d:SELected:MARKer%d:X %f' % (
channel, marker_num, float(marker_freq))
self.Send(buffer_str)
def SetLinearSweep(self, min_freq, max_freq):
"""Sets linear sweep mode.
Sets the mode to be a linear sweep between min_freq and max_freq.
Args:
min_freq: The minimum frequency in Hz.
max_freq: The maximum frequency in Hz.
"""
self.Send([':SENS:SWEep:TYPE LINear',
':SENS:FREQ:STAR %d' % min_freq,
':SENS:FREQ:STOP %d' % max_freq])
def SetSweepSegments(self, segments):
"""Sets a collection of sweep segments.
Args:
segments: An array of 3-tuples. Each tuple is of the
form (min_freq, max_freq, points) as follows:
min_freq: The segment's minimum frequency in Hz.
max_freq: The segment's maximum frequency in Hz.
points: The number of points in the segment.
The frequencies must be monotonically increasing.
"""
# Check that the segments are all 3-tuples and that they are
# in increasing order of frequency.
for i in xrange(len(segments)):
# pylint: disable=unused-variable
min_freq, max_freq, pts = segments[i]
assert max_freq >= min_freq
if i < len(segments) - 1:
assert segments[i + 1][0] >= min_freq
data = [
5, # Magic number from the device documentation
0, # Stop/stop values
0, # No per-segment IF bandwidth setting
0, # No per-segment sweep delay setting
0, # No per-segment sweep mode setting
0, # No per-segment sweep time setting
len(segments), # Number of segments
] + list(sum(segments, ()))
self.Send([':SENS:SWEep:TYPE SEGMent',
(':SENS:SEGMent:DATA %s' %
','.join(str(x) for x in data))])
def GetTraces(self, parameters):
"""Collects a set of traces based on the current sweep.
Returns:
A Traces object containing the following attributes:
x_axis: An array of X-axis values.
traces: A map from each parameter name to an array
of values for that trace.
Example Usage:
ena.set_linear_sweep(700e6, 2200e6)
data = ena.get_traces(['S11', 'S12', 'S22'])
print zip(data.x_axis, data.traces['S11'])
"""
assert parameters
assert len(parameters) <= 4
commands = [':CALC:PAR:COUN %d' % len(parameters)]
for i, p in zip(itertools.count(1), parameters):
commands.append(':CALC:PAR%d:DEF %s' % (i, p))
self.Send(commands)
ret = Traces()
ret.parameters = parameters
ret.x_axis = self.Query(':CALC:SEL:DATA:XAX?', lan_scpi.FLOATS)
ret.traces = {}
# Force the FDATA to be updated immediatedly.
self.Send(':INITiate1:CONTinuous OFF')
self.Send(':INITiate1:IMMediate')
for i, p in zip(itertools.count(1), parameters):
ret.traces[p] = (
self.Query(':CALC:TRACE%d:DATA:FDAT?' % i, lan_scpi.FLOATS)[0::2])
if len(ret.x_axis) != len(ret.traces[p]):
raise lan_scpi.Error('x_axis has %d elements but trace has %d' %
(len(ret.x_axis), len(ret.traces[p])))
CheckTraceValid(ret.x_axis, ret.traces[p])
# Unfreeze the trace.
self.Send(':INITiate1:CONTinuous ON')
return ret
def CheckCalibration(self, min_frequency, max_frequency, sample_points,
min_threshold, max_threshold):
"""Checks if the trace is as flat as expected.
This function uniformly samples "sample_points" from the ENA between
[min_frequency, max_frequency], and checks if the response values on all
these points are between [min_threshold, max_threshold].
Args:
min_frequency: a Frequency instance indicating the minimum frequency.
max_frequency: a Frequency instance indicating the maximum frequency.
sample_points: an int indicating how many points to sample.
min_threshold: a float indicating the lowest threshold.
max_threshold: a float indicating the highest threshold.
Returns:
A tuple. The 1st element is True if the values of all sample points
between [min_frequency, max_frequency] are between [min_threshold,
max_threshold], otherwise, False. The 2nd element is the trace data
obtained by the ENA.
"""
logging.info(
'Checking calibration from %.2f to %.2f with threshold (%f, %f)...',
min_frequency.Hzf(), max_frequency.Hzf(), min_threshold, max_threshold)
self.SetSweepSegments([(
min_frequency.Hzf(), max_frequency.Hzf(), sample_points)])
TRACES_TO_CHECK = ['S11', 'S22']
traces = self.GetTraces(TRACES_TO_CHECK)
calibration_check_passed = True
for trace_name in TRACES_TO_CHECK:
trace_data = traces.traces[trace_name]
for index, freq in enumerate(traces.x_axis):
if not min_threshold <= trace_data[index] <= max_threshold:
# Do not stop, continue to find all failing parts.
logging.info(
'Calibration check failed at %s-%15.2f', trace_name, freq)
calibration_check_passed = False
if calibration_check_passed:
return (True, traces)
else:
return (False, traces)
def CaptureScreenshot(self):
"""Return the screenshot content in PNG format."""
# Save a screenshot copy in ENA.
self.SaveScreen('screenshot')
# The SaveScreen above has saved a screenshot inside ENA, but it does not
# allow reading that file directly (see SCPI protocol for details). To save
# a copy locally, we need to make another screenshot using ENA's HTTP
# service (image.asp) which always puts the file publicly available as
# "disp.png".
urllib.urlopen('http://%s/image.asp' % self.host).read()
return urllib.urlopen('http://%s/disp.png' % self.host).read()