blob: cacae69458e8f1f23e747095193fa8b341521054 [file] [log] [blame]
# Copyright 2018 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Servod power measurement module."""
import logging
import os
import re
import threading
import time
from servo import client
from servo.utils import stats_manager
from servo.utils import timelined_stats_manager
SAMPLE_TIME_KEY = "Sample_msecs"
# Default sample rate to query ec for battery power consumption
DEFAULT_VBAT_RATE = 60
# Default sample rate to query accumulator ADCs. Since these support
# accumulation and averaging, they can be queried less frequently.
# b/238674542: Make accum rate more aggressive to secure at least one sample
# for an 1-minute measurement cycle.
DEFAULT_ADC_ACCUM_RATE = 30
# Default sample rate to query ADCs for power consumption
DEFAULT_ADC_RATE = 1
# Powerstate name used when no powerstate is known. The 'default' alias is
# added to make it clear on modules that use this as a library e.g. dut_power,
# that this is the 'default'.
DEFAULT_POWERSTATE = UNKNOWN_POWERSTATE = "S?"
# List of known power states
POWERSTATES = ["S0", "S0ix", "S3", "S5", "G3"]
# Board name to use when no board name provided, or querying it fails
# First attempt to read it from the environment before defaulting to unknown
DEFAULT_BOARD = os.environ.get("BOARD", "unknown")
class PowerTrackerError(Exception):
"""Error class to invoke on PowerTracker errors."""
class NoSourceError(PowerTrackerError):
"""Specific error when no power data source was setup successfully."""
class ServodPowerTracker(threading.Thread):
"""Threaded PowerTracker using servod as power number source.
This PowerTracker uses servod to sample all |_ctrls| at |_rate|.
Attributes:
title: human-readable title of the PowerTracker
"""
# Note: the only suffixes used for us are '[avg_]rail_name[_avg]_mw'
# Refresher when reading this
# - ?: indicates we don't care to retrieve those match groups
# - ?P<rail>: ensures we can access the group by name
# - +?: ensures we do a non-greedy match so that we don't match 'avg' in the
# rail group
RAIL_RE = re.compile(r"(?:avg_)?(?P<rail>[\w._]+?)(?:_avg)?_mw")
def __init__(
self,
host,
port,
stop_signal,
ctrls,
sample_rate,
tag="",
title="unnamed",
suffix="mw",
):
"""Initialize ServodPowerTracker by making servod proxy & storing ctrls.
Args:
host: servod host name
port: servod port number
stop_signal: Event object to flag when to stop measuring power
ctrls: list of servod ctrls to collect power numbers
sample_rate: rate for collecting samples for |ctrls|
tag: string to prepend to summary & raw rail file names
title: human-readable title of the PowerTracker
suffix: what metric is being measured (mw, ma, mv)
"""
super(ServodPowerTracker, self).__init__()
self._sclient = client.ServoClient(host=host, port=port)
self._stop_signal = stop_signal
self._ctrls = ctrls
self._rate = sample_rate
if not title.endswith(suffix):
title = "%s (%s)" % (title, suffix)
self.title = title
self._stats = timelined_stats_manager.TimelinedStatsManager(
smid=tag, title=title, rate=sample_rate
)
self._logger = logging.getLogger(type(self).__name__)
# Flag to indicate whether to skip the first reading. This is used
# for trackers that run averaging, and the first reading is used to reset
# the counters.
self._skip_first = False
self.daemon = True
self._sample_data = []
self._previous_sample_data = []
@property
def empty(self):
"""Whether the tracker has any controls."""
return len(self._ctrls) == 0
def _rail_name(self, ctrl_name):
"""Strip suffix from rail to return core rail name.
Args:
ctrl_name: str, servod control name for a rail's data e.g. pp3300_h1_mw
"""
return self.RAIL_RE.match(ctrl_name).group("rail")
@property
def rails(self):
"""Return all rail-names that this tracker uses."""
return [self._rail_name(c) for c in self._ctrls]
def remove_rail(self, rail):
"""Remove the servod control that would query |rail|.
Note: if |rail| is not in this tracker, this is effectively a noop
Args:
rail: str, rail name to remove e.g. pp3300_h1
"""
# Remove by just rebuilding the list of ctrls.
self._ctrls = [c for c in self._ctrls if self._rail_name(c) != rail]
def prepare(self, fast=False, powerstate=UNKNOWN_POWERSTATE):
"""Do any setup work right before number collection begins.
Args:
fast: flag to indicate if pre-run work should be "fast" (e.g. no UART)
powerstate: powerstate to allow for conditional preps based on powerstate
"""
def verify(self):
"""Verify by trying to query all ctrls once.
Raises:
PowerTrackerError: if verification failed
"""
try:
self._sclient.set_get_all(self._ctrls)
except client.ServoClientError:
msg = "Failed to test servod commands. Tested: %s" % str(self._ctrls)
raise PowerTrackerError(msg)
def run(self):
"""run power collection thread by querying all |_ctrls| at |_rate| rate.
Query all |_ctrls| and take timestamp once at the start of |_rate| interval.
"""
skip = self._skip_first
while not self._stop_signal.is_set():
if not skip:
temp_sample_data = []
sample_tuples, duration_ms = self._sample_ctrls(self._ctrls)
for domain, sample in sample_tuples:
# temp_stats.AddSample(domain, sample)
temp_sample_data.append((domain, sample))
self._stats.AddSamples(sample_tuples)
self.set_sample_data(temp_sample_data)
else:
duration_ms = 0
self._logger.debug("ready bit not set, skipping sample round")
self._stop_signal.wait(max(self._rate - (duration_ms / 1000), 0))
# We only skip the first reading if requested, and no others.
skip = False
def set_sample_data(self, sample_data):
"""Set the value to _sample_data"""
self._sample_data = sample_data
def get_sample_data(self):
"""Passed the collected data which will use for the visualization
If the _sample_data is empty, it means that we have not got the new data yet,
reuse the previous sample data,
otherwise, pass the new sample data and update previous sample data
Returns:
The return object contains the power information
"""
if not self._sample_data:
return self._previous_sample_data
self._previous_sample_data = self._sample_data
return self._sample_data
def clean_sample_data(self):
"""Clean the _sample_data containers when the data has been passed"""
self._sample_data = []
def _sample_ctrls(self, ctrls):
"""Helper to query all servod ctrls, and create (name, value) tuples.
Args:
ctrls: list of servod ctrls to sample
Returns:
tuple (sample_tuples, duration_ms)
sample_tuples: a list of (rail-name, value) tuples
value is a power reading on success, NaN on failure
duration_ms: time it took to collect the sample, in milliseconds
"""
start = time.time()
rails = [self._rail_name(c) for c in ctrls]
try:
samples = self._sclient.set_get_all(ctrls)
except client.ServoClientError:
self._logger.warning(
"Attempt to get commands: %s failed. Recording them all as NaN.",
", ".join(ctrls),
)
samples = [float("nan")] * len(ctrls)
duration_ms = (time.time() - start) * 1000
sample_tuples = list(zip(rails, samples))
sample_tuples.append((SAMPLE_TIME_KEY, duration_ms))
return (sample_tuples, duration_ms)
def process_measurement(self, tstart=None, tend=None):
"""Process the measurement by calculating stats.
Args:
tstart: first timestamp to include. Seconds since epoch
tend: last timestamp to include. Seconds since epoch
Returns:
StatsManager object containing info from the run
"""
self._stats.TrimSamples(tstart, tend)
self._stats.CalculateStats()
return self._stats
def __str__(self):
"""Helper to print out the tracker name."""
return self.title
class HighResServodPowerTracker(ServodPowerTracker):
"""High Resolution implementation of ServodPowerTracker.
The difference here is that while ServodPowerTracker sleeps if it finishes
before |_rate| is up, HighResServodPowerTracker tries to collect as many
samples as it can during |_rate| before recording the mean of those samples
as one data point.
"""
def run(self):
"""run power collection thread.
Query all |_ctrls| as much as possible during |_rate| interval before
reporting the mean of those samples as one row of data. Timestamps are
computed based on the last measurement added.
"""
start_time = time.time()
temp_stats = None
last_row = 0
while not self._stop_signal.is_set():
# Discarding the duration_ms since the difference between the current
# time and the start time are being used.
sample_tuples, _unused = self._sample_ctrls(self._ctrls)
temp_stats = stats_manager.StatsManager()
temp_sample_data = []
for domain, sample in sample_tuples:
temp_stats.AddSample(domain, sample)
temp_sample_data.append((domain, sample))
self.set_sample_data(temp_sample_data)
# If the last timestamp would have been in a new row on the table
# log the current set of measurements as the next row.
current_row = int((time.time() - start_time) / self._rate)
if last_row != current_row:
last_row = current_row
self._record_mean_samples(temp_stats)
temp_stats = None
# Record the last row of data
if temp_stats is not None:
self._record_mean_samples(temp_stats)
def _record_mean_samples(self, temp_stats):
"""Converts a StatsManager object into a row of averaged measurements.
Args:
temp_stats: StatsManager object which may contain a collection
of rail measurements.
"""
temp_stats.CalculateStats()
temp_summary = temp_stats.GetSummary()
samples = [
(measurement, summary["mean"])
for measurement, summary in temp_summary.items()
]
self._stats.AddSamples(samples)
def process_measurement(self, tstart=None, tend=None):
"""Process the measurement by calculating stats.
Args:
tstart: first timestamp to include. Seconds since epoch
tend: last timestamp to include. Seconds since epoch
Returns:
StatsManager object containing info from the run
"""
# Each data point is the mean of all samples taken during |_rate| interval,
# and the timestamp of the data point is taken at the end of the |_rate|
# interval. To ensure that we keep all the data points with at least half of
# their samples within [tstart, tend], add padding = |_rate| / 2.
# tstart: add the padding to discard data points with less than half samples
# within [tstart, tend].
# tend: add the padding to avoid losing data points that have at least half
# of samples within [tstart, tend].
self._stats.TrimSamples(tstart, tend, self._rate / 2)
self._stats.CalculateStats()
return self._stats
class OnboardADCPowerTracker(HighResServodPowerTracker):
"""Off-the-shelf PowerTracker to measure onboard ADCs through servod."""
def __init__(self, host, port, stop_signal, cfilter, sample_rate=DEFAULT_ADC_RATE):
"""Init by finding onboard ADC ctrls."""
super(OnboardADCPowerTracker, self).__init__(
host=host,
port=port,
stop_signal=stop_signal,
ctrls=[],
sample_rate=sample_rate,
tag="onboard",
title="Onboard ADC",
)
self._ctrls = cfilter(self._sclient.get("power_rails"))
if not self._ctrls:
raise PowerTrackerError("No onboard ADCs found.")
self._logger.debug(
"Following power rail commands found: %s", ", ".join(self._ctrls)
)
self._ez_cfg_ctrls = cfilter(self._sclient.get("adc_ez_config_ctrls"))
def prepare(self, fast=False, powerstate=UNKNOWN_POWERSTATE):
"""prepare onboard ADC measurement by configuring ADCs for powerstate."""
cfg_ctrls = ["%s:on" % cfg_cmd for cfg_cmd in self._ez_cfg_ctrls]
try:
self._sclient.set_get_all(cfg_ctrls)
except client.ServoClientError:
self._logger.warning("Power rail configuration failed. Details in DEBUG.")
self._logger.debug("Controls issued: %s", " ".join(cfg_ctrls))
class OnboardADCAccumPowerTracker(ServodPowerTracker):
"""Off-the-shelf PowerTracker to measure onboard ADCs with accumulator."""
def __init__(
self, host, port, stop_signal, cfilter, sample_rate=DEFAULT_ADC_ACCUM_RATE
):
"""Init by finding onboard ADC accum ctrls."""
title = "Onboard ADC (w/ accum)"
super(OnboardADCAccumPowerTracker, self).__init__(
host=host,
port=port,
stop_signal=stop_signal,
ctrls=[],
sample_rate=sample_rate,
tag="onboard.accum",
title=title,
)
self._ctrls = cfilter(self._sclient.get("avg_power_rails"))
self._clear_ctrls = cfilter(self._sclient.get("accum_clear_ctrls"))
if not self._ctrls or not self._clear_ctrls:
raise PowerTrackerError("No support for accum rails detected.")
self._logger.debug(
"Following avg power rail commands found: %s", ", ".join(self._ctrls)
)
self._ez_cfg_ctrls = cfilter(self._sclient.get("adc_ez_config_ctrls"))
# Pre-process the accumulator clearing controls so they can be issued
# at once.
self._clear_ctrls = ["%s:yes" % c for c in self._clear_ctrls]
# The first reading on these has stale, old data. It needs to ignore the
# first reading, and only start at the second reading.
self._skip_first = True
# Filter out the right controls
def prepare(self, fast=False, powerstate=UNKNOWN_POWERSTATE):
"""prepare onboard ADC measurement by configuring ADCs for powerstate."""
# Note: |ez_config| on accumulator supporting ADCs automatically
# clears the accumulators.
cfg_ctrls = ["%s:on" % cfg_cmd for cfg_cmd in self._ez_cfg_ctrls]
try:
self._sclient.set_get_all(cfg_ctrls)
except client.ServoClientError:
self._logger.warning("Power rail configuration failed. Details in DEBUG.")
self._logger.debug("Controls issued: %s", " ".join(cfg_ctrls))
def _clear_accum(self):
"""Issue controls to clear all the accumulators on the ADCs."""
self._sclient.set_get_all(self._clear_ctrls)
def _sample_ctrls(self, ctrls):
"""Overwrite the base implementation to clear accumulator after reading."""
ret = super(OnboardADCAccumPowerTracker, self)._sample_ctrls(ctrls)
self._clear_accum()
return ret
class ECPowerTracker(ServodPowerTracker):
"""Off-the-shelf PowerTracker to measure power-draw as seen by the EC."""
def __init__(self, host, port, stop_signal, cfilter, sample_rate=DEFAULT_VBAT_RATE):
"""Init EC power measurement by setting up ec 'vbat' servod control."""
self._ec_cmd = "ppvar_vbat_mw"
self._avg_ec_cmd = "avg_ppvar_vbat_mw"
self._cfilter = cfilter
super(ECPowerTracker, self).__init__(
host=host,
port=port,
stop_signal=stop_signal,
ctrls=[self._ec_cmd],
sample_rate=sample_rate,
tag="ec",
title="EC",
)
def verify(self):
"""ECPowerTracker verify that also checks if avg_ppvar is available."""
# First verify the normal ctrl.
super(ECPowerTracker, self).verify()
# Then get ambitious and check if the newer avg_ppvar_vbat_mw is also
# available.
self._ctrls = self._cfilter([self._avg_ec_cmd])
try:
super(ECPowerTracker, self).verify()
# This means that avg_ppvar_vbat_mw worked fine.
except PowerTrackerError as e:
# This means that avg_ppvar_vbat_mw is not supported.
# Revert back to ppvar_vbat_mw for main ctrls.
self._logger.info(str(e))
self._logger.info(
"%s not supported, using %r instead.", self._avg_ec_cmd, self._ec_cmd
)
self._ctrls = self._cfilter([self._ec_cmd])
def prepare(self, fast=False, powerstate=UNKNOWN_POWERSTATE):
"""Reduce the time needed to enter deep-sleep after console interaction."""
# Do not check for failure or anything, as its a nice-to-have and increases
# accuracy, but does not impact functionality. Dsleep might also not be
# available on some EC images.
self._sclient.set("ec_uart_cmd", "dsleep 2")
def run(self):
"""EC vbat 'run' to ensure the first reading does not use averaging."""
sample_tuples, duration_ms = self._sample_ctrls([self._ec_cmd])
# Rewrite the name to be whatever actual control is being used: avg &
# regular. This is required so that the output is not split into two
# domains that are really the same: regular & avg.
adjusted_sample_tuples = []
temp_sample_data = []
for name, sample in sample_tuples:
if name == self._ec_cmd:
name = self._ctrls[0]
adjusted_sample_tuples.append((name, sample))
temp_sample_data.append((name, sample))
self.set_sample_data(temp_sample_data)
self._stats.AddSamples(adjusted_sample_tuples)
self._stop_signal.wait(max(self._rate - (duration_ms / 1000), 0))
super(ECPowerTracker, self).run()
class RegexFilter:
"""Filter out control names based on regex."""
def __init__(self, rgx_to_keep, rgx_to_remove):
""""""
self._logger = logging.getLogger(type(self).__name__)
self.rgx_to_keep = rgx_to_keep
self.rgx_to_remove = rgx_to_remove
def __call__(self, control_names):
"""Filter out |control_names|
Args:
control_names: a list of control names
Note: rgx filters
The caller should really only specify one of them, but if both are
specified, only first the |rgx_to_remove| are removed, and then
only those matchin the |rgx_to_keep| are kept
Returns:
filtered controls: a list (potentially empty) after filtering
"""
# Only use ctrls from the main device for now
controls = [ctrl for ctrl in control_names if "." not in ctrl]
if self.rgx_to_remove is not None:
controls = []
for c in control_names:
if re.match(self.rgx_to_remove, c):
self._logger.info("Filtering out control %r", c)
else:
controls.append(c)
if self.rgx_to_keep is not None:
controls = []
for c in control_names:
if not re.match(self.rgx_to_keep, c):
self._logger.info("Filtering out control %r", c)
else:
controls.append(c)
return controls
class PowerMeasurementError(Exception):
"""Error class to invoke on PowerMeasurement errors."""
class PowerMeasurement:
"""Class to perform power measurements using servod.
PowerMeasurement allows the user to perform synchronous and asynchronous
power measurements using servod.
The class performs discovery, configuration, and sampling of power commands
exposed by servod, and allows for configuration of:
- rates to measure
- how to store the data
Attributes:
_sclient: servod proxy
_board: name of board attached to servo
_stats: collection of power measurement stats after run completes
_outdir: default outdir to save data to
After the measurement a new directory is generated
_processing_done: bool flag indicating if measurement data is processed
_setup_done: Event object to indicate power collection is about to start
_stop_signal: Event object to indicate that power collection should stop
_power_trackers: list of PowerTracker objects setup for measurement
_fast: if True measurements will skip explicit powerstate retrieval
_logger: PowerMeasurement logger
Note: PowerMeasurement garbage collection, or any call to Reset(), will
result in an attempt to clean up directories that were created and
left empty.
"""
DEFAULT_OUTDIR_BASE = os.path.join(
os.getenv("TMPDIR", "/tmp"), "power_measurements/"
)
PREMATURE_RETRIEVAL_MSG = (
"Cannot retrieve information before data collection has finished."
)
def __init__(
self,
host,
port,
adc_rate=DEFAULT_ADC_RATE,
adc_accum_rate=DEFAULT_ADC_ACCUM_RATE,
vbat_rate=DEFAULT_VBAT_RATE,
fast=False,
board=DEFAULT_BOARD,
rgx_to_keep=None,
rgx_to_remove=None,
):
"""Init PowerMeasurement class by attempting to create PowerTrackers.
Args:
host: host to reach servod instance
port: port on host to reach servod instance
adc_rate: sample rate for servod ADC controls
adc_accum_rate: sample rate for servod ADC accumulator/avg power controls
vbat_rate: sample rate for servod ec vbat command
fast: if true, no servod control verification is done before measuring
power, nor the powerstate queried from the EC
board: board name to use. If this is not provided, then an attempt
is made to query it from the EC
rgx_to_keep: regex, only keep rails matching this regex
rgx_to_remove: regex, remove all rails that match this regex
Note: rgx filters - read comment on RegexFilter class
Raises:
PowerMeasurementError: if no PowerTracker setup successful
PowerMeasurementError: ADCs cannot be configured correctly
"""
self._fast = fast
self._logger = logging.getLogger(type(self).__name__)
self._outdir = None
self._sclient = client.ServoClient(host=host, port=port)
self._board = board
if not fast and self._board == DEFAULT_BOARD:
try:
board = self._sclient.get("ec_board")
if board != "not_applicable":
self._board = board
except client.ServoClientError:
self._logger.warning(
"Failed to get ec_board, setting to %r.", self._board
)
self._processing_done = False
self._setup_done = threading.Event()
self._stop_signal = threading.Event()
self._power_trackers = []
self._stats = {}
self._sample_data = []
power_trackers = []
# build out the filter
cfilter = RegexFilter(rgx_to_keep, rgx_to_remove)
adc_tracker = adc_accum_tracker = ec_tracker = None
# Setup ADCs on the servo device.
self._sclient.set("servo_adcs_enabled", "on")
if self._sclient.get("servo_adcs_enabled") != "on":
raise PowerMeasurementError("ADCs setup failed.")
if adc_rate > 0:
try:
adc_tracker = OnboardADCPowerTracker(
host, port, self._stop_signal, cfilter, adc_rate
)
except PowerTrackerError:
self._logger.warning("Onboard ADC tracker setup failed.")
if adc_accum_rate > 0:
try:
adc_accum_tracker = OnboardADCAccumPowerTracker(
host, port, self._stop_signal, cfilter, adc_accum_rate
)
except PowerTrackerError:
self._logger.debug(
"Onboard ADC accumulators not supported, or setup failed."
)
if vbat_rate > 0:
try:
ec_tracker = ECPowerTracker(
host, port, self._stop_signal, cfilter, vbat_rate
)
except PowerTrackerError:
self._logger.warning("EC Power tracker setup failed.")
# if an ADC supports accumulator controls, it also supports regular
# controls. In those cases, we do not need two sources of the same data.
# Therefore, we remove all controls from the |adc_tracker| that are also
# present in the |adc_accum_tracker|. If after that, the |adc_tracker|
# is empty, we skip it entirely.
if adc_tracker is not None and adc_accum_tracker is not None:
for rail in adc_accum_tracker.rails:
adc_tracker.remove_rail(rail)
if adc_tracker.empty:
self._logger.info(
"ADC tracker has no controls that are not already "
"covered by the ADC accum tracker. Removing."
)
# After preprocessing is done, append the trackers.
for tracker in [adc_tracker, adc_accum_tracker, ec_tracker]:
if tracker is not None:
if not tracker.empty:
power_trackers.append(tracker)
else:
self._logger.info(
"Will not be using %r tracker (nothing to track)", tracker
)
self.Reset()
for tracker in power_trackers:
if not self._fast:
try:
tracker.verify()
except PowerTrackerError:
self._logger.warning(
"Tracker %s failed verification. Not using it.", tracker.title
)
continue
self._power_trackers.append(tracker)
if not self._power_trackers:
raise NoSourceError("No power measurement source successfully setup.")
def Reset(self):
"""Reset PowerMeasurement object to reuse for a new measurement.
The same PowerMeasurement object can be used for multiple power
collection runs on the same servod instance by calling Reset() on
it. This will wipe the previous run's data to allow for a fresh
reading.
Use this when running multiple tests back to back to simplify the code
and avoid recreating the same PowerMeasurement object again.
"""
self._stats = {}
self._setup_done.clear()
self._stop_signal.clear()
self._processing_done = False
def MeasureTimedPower(self, sample_time=60, wait=0, powerstate=UNKNOWN_POWERSTATE):
"""Measure power in the main thread.
Measure power for |sample_time| seconds before processing the results and
returning to the caller. After this method returns, retrieving measurement
results is safe.
Args:
sample_time: seconds to measure power for
wait: seconds to wait before collecting power
powerstate: (optional) pass the powerstate if known
"""
setup_done = self.MeasurePower(wait=wait, powerstate=powerstate)
setup_done.wait()
time.sleep(sample_time + wait)
self.FinishMeasurement()
def MeasurePower(self, wait=0, powerstate=UNKNOWN_POWERSTATE):
"""Measure power in the background until caller indicates to stop.
Spins up a background measurement thread and then returns events to manage
the power measurement time.
This should be used when the main thread needs to do work
(like an autotest) while power measurements are going on.
Args:
wait: seconds to wait before collecting power
powerstate: (optional) pass the powerstate if known
Returns:
Event - |setup_done|
Caller can wait on |setup_done| to know when setup for measurement is done
"""
self.Reset()
measure_t = threading.Thread(
target=self._MeasurePower, kwargs={"wait": wait, "powerstate": powerstate}
)
measure_t.daemon = True
measure_t.start()
return self._setup_done
def _MeasurePower(self, wait, powerstate=UNKNOWN_POWERSTATE):
"""Power measurement thread method coordinating sampling threads.
Args:
wait: seconds to wait before collecting power
powerstate: (optional) pass the powerstate if known
"""
if not self._fast and powerstate == UNKNOWN_POWERSTATE:
try:
ecpowerstate = self._sclient.get("ec_system_powerstate")
if ecpowerstate != "not_applicable":
# Skip setting the power_state if the ec control does not
# provide real information.
powerstate = ecpowerstate
except client.ServoClientError:
self._logger.warning("Failed to get powerstate from EC.")
for power_tracker in self._power_trackers:
power_tracker.prepare(self._fast, powerstate)
ts = time.strftime("%Y%m%d-%H%M%S", time.localtime(time.time()))
self._outdir = os.path.join(
self.DEFAULT_OUTDIR_BASE, self._board, "%s_%s" % (powerstate, ts)
)
# Signal that setting the measurement is complete
self._setup_done.set()
# Wait on the stop signal for |wait| seconds. Preemptible.
self._stop_signal.wait(wait)
if not self._stop_signal.is_set():
for power_tracker in self._power_trackers:
power_tracker.start()
def FinishMeasurement(self):
"""Signal to stop collection to Trackers before joining their threads."""
self._stop_signal.set()
for tracker in self._power_trackers:
if tracker.is_alive():
tracker.join()
def GetPMStatus(self):
"""Pass the information if the power measurement is finished or not
Returns:
True: power measurement is finished
False: power measurement is still working
"""
return self._stop_signal.is_set()
def ProcessMeasurement(self, tstart=None, tend=None):
"""Trim data to [tstart, tend] before calculating stats.
Call FinishMeasurement internally to ensure that data collection is fully
wrapped up.
Args:
tstart: first timestamp to include. Seconds since epoch
tend: last timestamp to include. Seconds since epoch
"""
# In case the caller did not explicitly call FinishMeasurement yet.
self.FinishMeasurement()
try:
for tracker in self._power_trackers:
self._stats[tracker.title] = tracker.process_measurement(tstart, tend)
finally:
self._processing_done = True
def SaveRawData(self, outdir=None):
"""Save raw data of the PowerMeasurement run.
Files can be read into object by using numpy.loadtxt()
Args:
outdir: output directory to use instead of autogenerated one
Returns:
List of pathnames, where each path has the raw data for a rail on
this run
Raises:
PowerMeasurementError: if called before measurement processing is done
"""
if not self._processing_done:
raise PowerMeasurementError(self.PREMATURE_RETRIEVAL_MSG)
outdir = outdir if outdir else self._outdir
outfiles = []
for stat in self._stats.values():
outfiles.extend(stat.SaveRawData(outdir))
self._logger.info("Storing raw data at:\n%s", "\n".join(outfiles))
return outfiles
def GetRawData(self):
"""Retrieve raw data for current run.
Retrieve a dictionary of each StatsManager object this run used, where
each entry is a dictionary of the rail to raw values.
Returns:
A dictionary of the form:
{ 'EC' : { 'time' : [0.01, 0.02 ... ],
'timeline' : [0.0, 0.01 ...],
'Sample_msecs' : [0.4, 0.2 ...],
'ec_ppvar_vbat_mw' : [52.23, 87.23 ... ]}
'Onboard ADC' : ... }
Possible keys are: 'EC', 'Onboard ADC'
Raises:
PowerMeasurementError: if called before measurement processing is done
"""
if not self._processing_done:
raise PowerMeasurementError(self.PREMATURE_RETRIEVAL_MSG)
return {name: stat.GetRawData() for name, stat in self._stats.items()}
def SaveTrimmedSummary(self, tag, tstart, tend, outdir=None, message=None):
"""Save a formatted summary that only contains [tstart, tend] data.
Args:
tag: string, identifier to tag this subset of data
tstart: start of valid data range, seconds since epoch
tend: end of valid data range, seconds since epoch
outdir: output directory to use instead of autogenerated one
message: message to attach after the summary for each summary
Returns:
List of pathnames, where summaries for this run are stored
Raises:
PowerMeasurementError: if called before measurement processing is done
"""
if not self._processing_done:
raise PowerMeasurementError(self.PREMATURE_RETRIEVAL_MSG)
stats_managers = [
s.TrimmedCopy(tag=tag, tstart=tstart, tend=tend)
for s in self._stats.values()
]
# If trimming produces an 'empty' stats manager i.e. a stats manager with
# no data in it, |TrimmedCopy| will return None, so we need to filter
# those out.
stats_managers = [s for s in stats_managers if s is not None]
# Lastly, we want to modify the titles. This is simply to make sure
# that the tag can help identify the summary
for s in stats_managers:
s._title += "(%s)" % tag
return self._SaveSummary(
stats_managers=stats_managers, outdir=outdir, message=message
)
def SaveSummary(self, outdir=None, message=None):
"""Save summary of the PowerMeasurement run.
Args:
outdir: output directory to use instead of autogenerated one
message: message to attach after the summary for each summary
Returns:
List of pathnames, where summaries for this run are stored
Raises:
PowerMeasurementError: if called before measurement processing is done
"""
if not self._processing_done:
raise PowerMeasurementError(self.PREMATURE_RETRIEVAL_MSG)
return self._SaveSummary(
stats_managers=self._stats.values(), outdir=outdir, message=message
)
def _SaveSummary(self, stats_managers=[], outdir=None, message=None):
"""Save summary of the PowerMeasurement run.
Args:
stats_managers: a list of stats managers from which to save the summaries
outdir: output directory to use instead of autogenerated one
message: message to attach after the summary for each summary
Returns:
List of pathnames, where summaries for this run are stored
Raises:
PowerMeasurementError: if called before measurement processing is done
"""
outdir = outdir if outdir else self._outdir
outfiles = [stat.SaveSummary(outdir) for stat in stats_managers]
if message:
for fname in outfiles:
with open(fname, "a", encoding="utf-8") as f:
f.write("\n%s\n" % message)
# Also, output a markdown version of each summary
md_outfiles = [stat.SaveSummaryMD(outdir) for stat in stats_managers]
self._logger.info("Storing summaries at:\n%s", "\n".join(outfiles))
self._logger.info("Storing .md summaries at:\n%s", "\n".join(md_outfiles))
return outfiles
def GetSummary(self):
"""Retrieve summary of the PowerMeasurement run.
Retrieve a dictionary of each StatsManager object this run used, where
each entry is a dictionary of the rail to its statistics.
Returns:
A dictionary of the form:
{'EC': {'ppvar_vbat_mw': {'count': 1,
'max': 0.0,
'mean': 0.0,
'min': 0.0,
'stddev': 0.0},
'Sample_msecs': {...},
'time': {...},
'timeline': {...}},
'Onboard ADC': {...}}
Possible keys are: 'EC', 'Onboard ADC'
Raises:
PowerMeasurementError: if called before measurement processing is done
"""
if not self._processing_done:
raise PowerMeasurementError(self.PREMATURE_RETRIEVAL_MSG)
return {name: stat.GetSummary() for name, stat in self._stats.items()}
def GetFormattedSummary(self):
"""Retrieve summary of the PowerMeasurement run.
See StatsManager._DisplaySummary() for more details
Returns:
string with all available summaries concatenated
Raises:
PowerMeasurementError: if called before measurement processing is done
"""
if not self._processing_done:
raise PowerMeasurementError(self.PREMATURE_RETRIEVAL_MSG)
summaries = [stat.SummaryToString() for stat in self._stats.values()]
return "\n".join(summaries)
def DisplaySummary(self):
"""Print summary retrieved from GetFormattedSummary() call."""
print("\n%s" % self.GetFormattedSummary())
def SaveSummaryJSON(self, outdir=None):
"""Save summary of the PowerMeasurement run as JSON.
Args:
outdir: output directory to use instead of autogenerated one
Returns:
List of pathnames, where summaries for this run are stored
Raises:
PowerMeasurementError: if called before measurement processing is done
"""
if not self._processing_done:
raise PowerMeasurementError(self.PREMATURE_RETRIEVAL_MSG)
return self._SaveSummaryJSON(stats_managers=self._stats.values(), outdir=outdir)
def _SaveSummaryJSON(self, stats_managers=[], outdir=None):
"""Save summary of the PowerMeasurement run as JSON.
Args:
stats_managers: a list of stats managers from which to save the summaries
outdir: output directory to use instead of autogenerated one
Returns:
List of pathnames, where summaries for this run are stored
Raises:
PowerMeasurementError: if called before measurement processing is done
"""
outdir = outdir if outdir else self._outdir
json_outfiles = [stat.SaveSummaryJSON(outdir) for stat in stats_managers]
self._logger.info("Storing .md summaries at:\n%s", "\n".join(json_outfiles))
return json_outfiles
def GetSampleData(self):
"""This function can pass the latest power information
Collect the data in each tracker and append the data into an array
Returns:
return the latest power information
"""
sampleData = []
for power_data in self._power_trackers:
data = power_data.get_sample_data()
if data:
sampleData += data
return sampleData
def CleanSampleData(self):
"""This function will clean the current data structure which saves
the power data.
"""
for trackers in self._power_trackers:
trackers.clean_sample_data()