blob: 770de6055aecf94a86cde3b17774a952cfbcb70a [file] [log] [blame]
# Copyright 2022 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from __future__ import annotations
import abc
import json
import logging
import math
import pathlib
from re import A
from typing import TYPE_CHECKING
if TYPE_CHECKING:
import crossbench as cb
import crossbench.probes as probes
class JsonResultProbe(probes.Probe, metaclass=abc.ABCMeta):
  """
  Abstract Probe that stores a JSON result extracted by the `to_json` method.

  The `to_json` method is provided by subclasses. A typical example is just
  running a JS script on the page.
  Multiple JSON result files for RepetitionsRunGroups are merged with the
  JSONMerger. Custom merging for other RunGroups can be defined in the subclass.
  """

  # When True, Scope.write_json() stores the flattened data in the main results
  # file and the unmodified data in a sibling ".raw.json" file.
  FLATTEN = True

  @property
  def results_file_name(self):
    """Name of the JSON results file, derived from the probe name."""
    return f"{self.name}.json"

  @abc.abstractmethod
  def to_json(self, actions):
    """
    Override in subclasses.
    Returns json-serializable data.
    """
    return None

  def flatten_json_data(self, json_data):
    """Flatten hierarchical `json_data` using the module-level `flatten`."""
    return flatten(json_data)

  class Scope(probes.Probe.Scope):
    """
    Per-run scope: extracts the JSON data in stop() and writes it to disk in
    tear_down().
    """

    def __init__(self, probe: JsonResultProbe, run: cb.runner.Runner):
      super().__init__(probe, run)
      # Filled in by stop(), consumed by tear_down().
      self._json_data = None

    @property
    def probe(self) -> JsonResultProbe:
      return super().probe

    def to_json(self, actions):
      """Delegate the data extraction to the owning probe."""
      return self.probe.to_json(actions)

    def start(self, run):
      pass

    def stop(self, run):
      self._json_data = self.extract_json(run)

    def tear_down(self, run):
      return self.write_json(run, self._json_data)

    def extract_json(self, run: cb.runner.Run):
      """
      Run `to_json` inside an actions context of `run` and return its result.
      Asserts that the probe produced data (i.e. not None).
      """
      with run.actions(f"Extracting Probe name={self.probe.name}") as actions:
        json_data = self.to_json(actions)
        # The message must be an f-string, otherwise the placeholder is
        # printed literally instead of the probe name.
        assert json_data is not None, (
            f"Probe name={self.probe.name} produced no data")
        return json_data

    def write_json(self, run: cb.runner.Run, json_data):
      """
      Write `json_data` to the results file(s).

      With FLATTEN enabled the flattened data goes to `self.results_file` and
      the raw data to the same path with a ".raw.json" suffix; the return
      value is then the tuple (flattened_file, raw_file).
      Otherwise the raw data is written to `self.results_file`, which is
      returned on its own.
      """
      with run.actions(f"Writing Probe name={self.probe.name}"):
        assert json_data is not None
        raw_file = self.results_file
        if self.probe.FLATTEN:
          raw_file = raw_file.with_suffix(".raw.json")
          flattened_file = self.results_file
          flat_json_data = self.flatten_json_data(json_data)
          with flattened_file.open("w") as f:
            json.dump(flat_json_data, f, indent=2)
        with raw_file.open("w") as f:
          json.dump(json_data, f, indent=2)
      if self.probe.FLATTEN:
        return (flattened_file, raw_file)
      return raw_file

    def flatten_json_data(self, json_data):
      """Delegate the flattening to the owning probe."""
      return self.probe.flatten_json_data(json_data)

  def merge_repetitions(self, group: cb.runner.RepetitionsRunGroup):
    """
    Merge the result files of all runs in `group` with a JSONMerger and write
    the merged data to the group's result file. Returns the written path.
    """
    merger = JSONMerger()
    for run in group.runs:
      source_file = self.get_mergeable_result_file(run.results[self])
      assert source_file.is_file(), (
          f"Missing probe result file: {source_file}")
      with source_file.open("r") as f:
        merger.add(json.load(f))
    return self.write_group_result(group, merger.to_json())

  def get_mergeable_result_file(self, results):
    """
    Return the path of the file used for merging. `results` is either a
    single path or a tuple whose first entry is used (write_json returns
    (flattened_file, raw_file) when FLATTEN is set).
    """
    if isinstance(results, tuple):
      return pathlib.Path(results[0])
    return pathlib.Path(results)

  def write_group_result(self, group, merged_data):
    """Dump `merged_data` as JSON to the group's probe results file."""
    destination_path = group.get_probe_results_file(self)
    with destination_path.open("w") as f:
      json.dump(merged_data, f, indent=2)
    return destination_path
class Values:
  """
  A collection of values that is used as an accumulator in the JSONMerger.
  Values provides simple statistical getters if the collected values are
  ints or floats only.
  """

  @classmethod
  def from_json(cls, json_data):
    """Create Values from a dict that has a "values" entry."""
    return cls(json_data["values"])

  def __init__(self, values=None):
    self.values = values or []

  def is_numeric(self):
    """Return True if every collected value is an int or a float."""
    return all(isinstance(v, (int, float)) for v in self.values)

  @property
  def min(self):
    return min(self.values)

  @property
  def max(self):
    return max(self.values)

  @property
  def average(self):
    return sum(self.values) / len(self.values)

  @property
  def geomean(self):
    """
    The n-th root of the product of all n values.
    NOTE: a negative product with n > 1 yields a complex number in Python,
    so this is only meaningful for non-negative values.
    """
    product = 1
    for value in self.values:
      product *= value
    return product**(1 / len(self.values))

  @property
  def stddev(self):
    """
    We're ignoring here any actual distribution of the data and use this as a
    rough estimate of the quality of the data.
    The variance is divided by the number of values (not by n - 1).
    """
    average = self.average
    variance = 0
    for value in self.values:
      variance += (average - value)**2
    variance /= len(self.values)
    return math.sqrt(variance)

  def append(self, value):
    self.values.append(value)

  def to_json(self):
    """
    Return a json-serializable representation of the collected values:
    - no values: {"values": []}
    - numeric values: a dict with "values" plus "min", "average", "geomean",
      "max", "stddev" and "stddevPercent"
    - identical non-numeric values: that single value
    - anything else: {"values": [...]}
    """
    json_data = dict(values=self.values)
    # is_numeric() is vacuously true for an empty collection, but none of the
    # statistics are defined without values.
    if not self.values:
      return json_data
    if self.is_numeric():
      json_data["min"] = self.min
      average = json_data["average"] = self.average
      json_data["geomean"] = self.geomean
      json_data["max"] = self.max
      stddev = json_data["stddev"] = self.stddev
      if average == 0:
        json_data["stddevPercent"] = 0
      else:
        json_data["stddevPercent"] = (stddev / average) * 100
      return json_data
    # Simplify repeated non-numeric values.
    # Compare by equality rather than building a set, so unhashable values
    # (e.g. nested lists or dicts) don't raise a TypeError.
    first = self.values[0]
    if all(value == first for value in self.values):
      return first
    return json_data
# ========================================================================
class JSONFlat:
  """
  Creates a flat dict, sorted by key, that maps key-paths to leaf values of
  hierarchical data.

  Input: {"a" : {"aa1":1, "aa2":2}, "b": 12 }
  Output: {
    "a/aa1": 1,
    "a/aa2": 2,
    "b": 12,
  }
  """

  @classmethod
  def flatten(cls, *merged_data, key=None):
    """Flatten all `merged_data` dicts into a single sorted dict."""
    instance = cls(key)
    instance.append(*merged_data)
    return instance.data

  def __init__(self, key=None):
    # Maps the flat key => leaf item.
    self._accumulator = {}
    # Converts a path tuple to the flat key, "/"-joined by default.
    self._key_fn = key or (lambda path: "/".join(path))

  @property
  def data(self):
    """A new dict with all accumulated entries, sorted by key."""
    items = sorted(self._accumulator.items(), key=lambda item: item[0])
    return dict(items)

  def append(self, *args, ignore_toplevel=False):
    """
    Flatten each dict in `args` into the accumulator.
    With `ignore_toplevel` set, leaf items directly on the top level of the
    input dicts are skipped.
    """
    toplevel_path = tuple()
    for merged_data in args:
      self._flatten(toplevel_path, merged_data, ignore_toplevel)

  def _is_leaf_item(self, item):
    # Only dicts can be descended into; every other value (including None,
    # i.e. JSON null) is stored as-is.
    if not isinstance(item, dict):
      return True
    # A dict with a "values" list is kept as a single leaf item.
    if "values" in item and isinstance(item["values"], list):
      return True
    return False

  def _flatten(self, parent_path, data, ignore_toplevel=False):
    for name, item in data.items():
      path = parent_path + (name,)
      if self._is_leaf_item(item):
        if ignore_toplevel and parent_path == ():
          continue
        key = self._key_fn(path)
        assert isinstance(key, str)
        assert key not in self._accumulator, (
            f"Duplicate key='{key}' path={path}")
        self._accumulator[key] = item
      else:
        # ignore_toplevel only affects items whose parent is the root path,
        # so it is not forwarded to nested levels.
        self._flatten(path, item)
def flatten(*merged_data, key=None):
  """
  Flatten all given hierarchical dicts into one dict sorted by key-path.
  `key` optionally converts a path tuple to the flat key string.
  """
  flattener = JSONFlat(key)
  flattener.append(*merged_data)
  return flattener.data
# ========================================================================
class JSONMerger:
  """
  Merges hierarchical data into 1-level aggregated data;

  Input:
  data_1 = {
    "a": {
      "aa": 1.1,
      "ab": 2
    },
    "b": 2.1
  }
  data_2 = {
    "a": {
      "aa": 1.2
    },
    "b": 2.2,
    "c": 2
  }

  The merged data maps pathlib.Path() => Values():
  {
    pathlib.Path("a/aa"): Values(1.1, 1.2)
    pathlib.Path("a/ab"): Values(2)
    pathlib.Path("b"): Values(2.1, 2.2)
    pathlib.Path("c"): Values(2)
  }
  """

  @classmethod
  def from_merged_files(cls, files):
    """Create a merger from files that contain previously merged data."""
    merger = cls()
    for file in files:
      with file.open() as f:
        merger.merge_json_values(json.load(f))
    return merger

  @classmethod
  def merge(cls, *args):
    """Create a merger and add every item of `args` to it."""
    merger = cls()
    for data in args:
      merger.add(data)
    return merger

  def __init__(self):
    # Maps pathlib.Path => Values.
    self._data = {}
    # Paths dropped by merge_json_values() because they occurred repeatedly.
    self._ignored_paths = set()

  @property
  def data(self):
    return self._data

  def merge_json_values(self,
                        json_data,
                        prefix_path=None,
                        merge_duplicate_paths=False):
    """
    Merge a previously serialized data object.

    `json_data` maps key-paths to serialized Values. If `prefix_path` is
    given, it is prepended to every path.
    A path that already exists is either extended with the new values
    (`merge_duplicate_paths=True`), or removed and ignored from then on.
    """
    for path, data in json_data.items():
      if prefix_path:
        path = prefix_path / pathlib.Path(path)
      else:
        path = pathlib.Path(path)
      if path in self._ignored_paths:
        continue
      if path in self._data:
        if merge_duplicate_paths:
          values = self._data[path]
          # Read the values of the current entry, not of the top-level
          # mapping, which is keyed by path.
          for value in data["values"]:
            values.append(value)
        else:
          logging.debug(
              "Removing Values with the same key-path='%s'"
              "from multiple files.", path)
          del self._data[path]
          self._ignored_paths.add(path)
      else:
        self._data[path] = Values.from_json(data)

  def add(self, json_data):
    """Add hierarchical data, either a dict or a list of dicts."""
    if isinstance(json_data, list):
      # Assume that top-level lists are repetitions of the same data
      for item in json_data:
        self._merge(item, pathlib.Path())
    else:
      self._merge(json_data, pathlib.Path())

  def _merge(self, json_data, parent_path):
    assert isinstance(json_data, dict)
    for key, value in json_data.items():
      path = parent_path / key
      if isinstance(value, dict):
        self._merge(value, path)
      else:
        if path in self._data:
          values = self._data[path]
        else:
          values = self._data[path] = Values()
        # The items of a list are added individually.
        if isinstance(value, list):
          for v in value:
            values.append(v)
        else:
          values.append(value)

  def to_json(self, value_fn=None):
    """
    Return a dict that maps str(path) to the serialized Values.
    `value_fn`, if given, is called with each Values object and replaces the
    default `Values.to_json()` serialization.
    """
    json_data = {}
    # Make sure the data is always in the same order, independent of the input
    # order
    for path in sorted(self._data):
      value = self._data[path]
      assert isinstance(value, Values)
      if value_fn is None:
        json_data[str(path)] = value.to_json()
      else:
        json_data[str(path)] = value_fn(value)
    return json_data
def merge(*args, value=None):
  """
  Merge all given hierarchical data objects with a JSONMerger and return the
  merged JSON data. `value` optionally serializes each merged Values object.
  """
  merger = JSONMerger()
  for json_data in args:
    merger.add(json_data)
  return merger.to_json(value_fn=value)