blob: 35c89198d3eee14c7d0e15bb00bdc47da407c5f8 [file] [log] [blame]
# Copyright 2018 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""This script implements a few IntelPowerGadget related helper functions.
This script only works on Windows with Intel CPU. Intel Power Gadget needs to
be installed on the machine before this script works. The software can be
downloaded from:
https://software.intel.com/en-us/articles/intel-power-gadget-20
An easy way to use the APIs is:
1) Launch your program.
2) Call RunIPG() with no args. It will automatically locate the IPG installed
on the machine.
3) Call AnalyzeIPGLogFile() with no args. It will analyze the default IPG log
file, which is PowerLog.csv at current dir; then it will print out the power
usage summary. If you want to skip a few seconds of the power log data, say,
5 seconds, call AnalyzeIPGLogFile(skip_in_sec=5).
"""
import json
import logging
import os
import subprocess
import datetime
def LocateIPG():
    """Return the path of the Intel Power Gadget logger, or None.

    The install directory is read from the IPG_Dir environment variable and
    the logger is looked up there as PowerLog3.0.exe. None is returned when
    the variable is unset/empty or when that file does not exist.
    """
    install_dir = os.getenv('IPG_Dir')
    if install_dir:
        candidate = os.path.join(install_dir, "PowerLog3.0.exe")
        logging.debug("Try to locale Intel Power Gadget at " + candidate)
        if not os.path.isfile(candidate):
            return None
        logging.debug("Intel Power Gadget Found")
        return candidate
    logging.warning("No env IPG_Dir")
    return None
def GenerateIPGLogFilename(log_prefix='PowerLog', log_dir=None, current_run=1,
                           total_runs=1, timestamp=False):
    """Build the absolute path of an IPG CSV log file.

    With every argument left at its default, the result is IPG's default log
    path: PowerLog.csv in the current working directory.

    Args:
      log_prefix: leading part of the file name.
      log_dir: directory for the log; the current directory when not given.
      current_run: index of this run, used only when total_runs > 1.
      total_runs: when greater than 1, "_<current_run>_<total_runs>" is
        appended to the prefix.
      timestamp: when true, "_<YYYYmmddHHMMSS>" of the current local time is
        appended as well.

    Returns:
      The absolute path "<log_dir>/<decorated prefix>.csv".
    """
    filename = log_prefix
    if total_runs > 1:
        filename = "%s_%d_%d" % (filename, current_run, total_runs)
    if timestamp:
        stamp = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
        filename = "%s_%s" % (filename, stamp)
    filename += '.csv'
    directory = os.path.abspath(log_dir or os.getcwd())
    return os.path.join(directory, filename)
def RunIPG(duration_in_s=60, resolution_in_ms=100, logfile=None):
    """Run the Intel Power Gadget logger once and wait for it to finish.

    Args:
      duration_in_s: value passed to the logger's -duration option.
      resolution_in_ms: value passed to the logger's -resolution option.
      logfile: path passed to the logger's -file option. When not given, the
        path returned by GenerateIPGLogFilename() is used.

    Returns:
      None. If the logger cannot be located a warning is logged and nothing
      is run. A non-zero exit status of the logger is logged as a warning
      rather than raised.
    """
    intel_power_gadget_path = LocateIPG()
    if not intel_power_gadget_path:
        logging.warning("Can't locate Intel Power Gadget")
        return
    if not logfile:
        # Not strictly necessary, but passing the path explicitly lets it
        # show up in the debug log below.
        logfile = GenerateIPGLogFilename()
    # Hand subprocess an argument list instead of one pre-formatted string so
    # that an executable or log path containing spaces reaches the tool as a
    # single argument.
    command = [
        intel_power_gadget_path,
        '-duration', '%d' % duration_in_s,
        '-resolution', '%d' % resolution_in_ms,
        '-file', logfile,
    ]
    logging.debug("Running: %s", ' '.join(command))
    try:
        output = subprocess.check_output(command)
    except subprocess.CalledProcessError as err:
        logging.warning(err)
    else:
        logging.debug("Running: DONE")
        logging.debug(output)
def AnalyzeIPGLogFile(logfile=None, skip_in_sec=0):
    """Average every "(Watt)" column of an IPG CSV log file.

    The first line of the file is treated as the header. Data lines whose
    number of comma-separated fields differs from the header are ignored.

    Args:
      logfile: path of the log to read. When not given, the path returned by
        GenerateIPGLogFilename() is used.
      skip_in_sec: when positive, rows whose 'Elapsed Time' value is smaller
        than this are left out of the averages.

    Returns:
      {} when logfile does not exist. Otherwise a dict holding 'samples' (the
      number of rows used) and, when samples > 0, one entry per "(Watt)"
      column, keyed by the column header with the "(Watt)" suffix removed,
      whose value is the mean of that column over the rows used.

    Raises:
      AssertionError: the header has no 'Elapsed Time' column or no column
        ending in "(Watt)".
    """
    if not logfile:
        logfile = GenerateIPGLogFilename()
    if not os.path.isfile(logfile):
        logging.warning("Can't locate logfile at " + logfile)
        return {}
    watt_suffix = '(Watt)'
    samples = 0
    cols = 0
    indices = []
    labels = []
    sums = []
    col_time = None
    header_seen = False
    with open(logfile) as log:
        for line in log:
            # Drop the line terminator first; otherwise it sticks to the last
            # field and hides a '(Watt)' suffix on the final header column.
            tokens = line.rstrip('\r\n').split(',')
            if not header_seen:
                header_seen = True
                cols = len(tokens)
                for ii, token in enumerate(tokens):
                    if token.startswith('Elapsed Time'):
                        col_time = ii
                    elif token.endswith(watt_suffix):
                        indices.append(ii)
                        labels.append(token[:-len(watt_suffix)])
                        sums.append(0.0)
                # Compare with None: column 0 is a valid position for the
                # elapsed-time column.
                assert col_time is not None
                assert cols > 0
                assert len(indices) > 0
                continue
            if len(tokens) != cols:
                continue
            if skip_in_sec > 0 and float(tokens[col_time]) < skip_in_sec:
                continue
            samples += 1
            for ii, index in enumerate(indices):
                sums[ii] += float(tokens[index])
    results = {'samples': samples}
    if samples > 0:
        for label, total in zip(labels, sums):
            results[label] = total / samples
    return results
def ProcessResultsFromMultipleIPGRuns(logfiles, skip_in_seconds=0,
                                      outliers=0, output_json=None):
    """Summarize the per-run averages of several IPG log files.

    Each log is analyzed with AnalyzeIPGLogFile(). For every metric it
    reports, the per-run values are combined into a mean and a population
    standard deviation.

    Args:
      logfiles: paths of the log files; more than one is required.
      skip_in_seconds: forwarded to AnalyzeIPGLogFile() as its skip argument.
      outliers: when positive, this many of the smallest and this many of the
        largest per-run values are dropped from each metric before the
        statistics are computed.
      output_json: when given, path of a file that receives the per-run
        results plus the summary as JSON.

    Returns:
      A dict mapping each metric name to {'mean': ..., 'stdev': ...}.

    Raises:
      AssertionError: fewer than two logfiles were given, a metric has fewer
        than two values, or outliers would remove every value of a metric.
    """
    assert len(logfiles) > 1
    prefix = 'PowerLog_'
    output = {}
    values_by_key = {}
    for logfile in logfiles:
        results = AnalyzeIPGLogFile(logfile, skip_in_seconds)
        results['log'] = logfile
        # Per-run entries are keyed by the file name without its extension
        # and without a leading 'PowerLog_'.
        core = os.path.splitext(os.path.basename(logfile))[0]
        if core.startswith(prefix):
            core = core[len(prefix):]
        output[core] = results
        for key, value in results.items():
            if key in ('samples', 'log'):
                continue
            values_by_key.setdefault(key, []).append(value)
    summary = {}
    for key, data in values_by_key.items():
        assert data and len(data) > 1
        if outliers > 0:
            assert outliers * 2 < len(data)
            # Trim the same number of values from both ends of the sorted
            # list.
            data = sorted(data)[outliers:len(data) - outliers]
        n = len(data)
        logging.debug('%s: valid samples = %d', key, n)
        mean = sum(data) / float(n)
        ss = sum((x - mean) ** 2 for x in data)
        stdev = (ss / float(n)) ** 0.5
        summary[key] = {
            'mean': mean,
            'stdev': stdev,
        }
    output['summary'] = summary
    if output_json:
        # The context manager closes the file even if serialization or the
        # write fails.
        with open(output_json, 'w') as json_file:
            json_file.write(json.dumps(output, indent=4))
    return summary