blob: 309adcea3e8efd32a3ea6b47b63fa0aa7eb19dac [file] [log] [blame]
# -*- coding: utf-8 -*-
# Copyright 2017 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""The binary search algorithm."""
from __future__ import print_function
import logging
import math
import re
import sys
from bisect_kit import errors
from bisect_kit import math_util
logger = logging.getLogger(__name__)
# If a given version got equal to or more than SKIP_FOREVER times of 'skip'
# evaluation result, ignores the said version forever.
SKIP_FOREVER = 3
# Constant that denotes version is known not noisy.
NOT_NOISY = (None, None)
def trend_score(prev_from, prev_to, now_from, now_to):
"""Calculates "trend score".
Assume we have observed benchmark number changed from `prev_from` to
`prev_to` from a dashboard, say, from 100 to 110. Now, we reproduced the same
test locally and obtained our own benchmark values, from `now_from` to
`now_to`. Because our local test may not be able to reproduce the exact
identical changes (100 to 102) for some reasons, but it may have similar
trend, say, 95 to 97, which is considered as a good reproduction.
Here, we define the "similar trend" for two cases.
1. we can reproduce the delta, but the value shifted, ex. previous values
are 100->102 and our new values are 95->97 (delta=+2).
2. we can reproduce the proportion change, but the value scaled, ex.
previous values are 10->15 and our local values are 8->12 (+50%).
The property of trend score:
- score=0 if our local values stay the same
- score=1 if we have similar change (delta or proportion); it's possible
that score>1.
- between 0 and 1 if we have the same trend but no so significant
- negative if opposite direction, ex. previous values are increasing but our
values are decreasing.
Some cases proportion is inapplicable (ex. zero to non-zero, negative to
positive, etc.), then only delta is used in calculation.
Args:
prev_from: previous value changed from
prev_to: previous value changed to
now_from: our new value changed from
now_to: our new value changed to
Returns:
the score value
"""
assert prev_to != prev_from
abs_score = (now_to - now_from) / (prev_to - prev_from)
if now_from == 0 or prev_from == 0 or prev_to * prev_from < 0:
return abs_score
ratio_score = ((now_to - now_from) / now_from) / (
(prev_to - prev_from) / prev_from)
return max(abs_score, ratio_score)
# Old version of pylint may give false-positive warning.
# pylint: disable=unpacking-non-sequence
class NoisyBinarySearch:
"""Binary search algorithm supporting noisy signal.
Examples:
bsearch = NoisyBinarySearch(rev_info, observation='old=1/10,new=9/10')
while not bsearch.is_done():
bsearch.update(idx, get_result(rev_info[idx]))
print(bsearch.get_best_guess())
"""
# States
# INITED:
# Before both ends are verified.
# Recomputing initial values is also done in this phase.
# Not yet to calculate probabilities for each candidate.
# STARTED:
# Started to bisect.
# There are no error states because exceptions are raised for fatal errors.
# There are no "done" states because the confidence may become lower after
# feeding more data.
INITED = 'inited'
STARTED = 'started'
def __init__(self,
rev_info,
old_idx=0,
new_idx=None,
old_value=None,
new_value=None,
term_map=None,
recompute_init_values=False,
oracle=None,
observation=None,
confidence=0.999,
verify_confidence=0.999):
"""Initializes NoisyBinarySearch.
The probability model could be specified in three ways:
- If oracle=(old_p, new_p) is specified. This means probability
distribution is known.
P(new behavior | old version) = old_p
P(new behavior | new version) = new_p
- If observation is specified, which means although the probability
distribution is unknown, some previous test results are provided. The
algorithm will re-estimate probability overtime once more test results
feed.
For example, prior_observation="old=2/3,new=9/10" means
old version failed 2 times out of 3 times, and
new version failed 9 times out of 10 times,
where "new behavior" is "failed" in this case.
If one of them is known non-noisy, it could be omitted. For example,
"new=9/10" (old omitted) means old versions always have old behavior.
- If both oracle and prior_observation are not specified, it means the
signal is not noisy and this class will fallback to classic binary
search.
If recompute_init_values is True, the state transition is:
1. `old_value` and `new_value` have values
but `threshold` is None
=> `prob` is None
2. _next_idx_for_initial() return None, so
`old_avg`, `new_avg`, and `threshold` could be calculated
=> reclassify 'old' and 'new'
Args:
rev_info: List of RevInfo.
old_idx: Index of rev, which has old behavior.
new_idx: Index of rev, which has new behavior.
old_value: For value bisection, the value of old version.
new_value: For value bisection, the value of new version.
term_map: Alternative term for states
recompute_init_values: For value bisection, recompute values of old and
new versions.
oracle: The oracle model of probability distribution.
observation: Observed test results.
confidence: Required confidence for bisection result.
verify_confidence: Bisect range verification confidence.
"""
assert oracle is None or observation is None
self._state = self.INITED
self.dirty = True
if new_idx is None:
new_idx = len(rev_info) - 1
assert rev_info is not None
self.rev_info = rev_info
self.old_idx = old_idx
self.new_idx = new_idx
self.old_value = old_value
self.new_value = new_value
self.term_map = term_map or {}
self.recompute_init_values = recompute_init_values
self.oracle = oracle
if oracle:
self.old_p, self.new_p = self.oracle
self.prior_observation = None
elif observation:
self.prior_observation = self._parse_observation(observation)
self.observation = None
(left_new, left_trial), (right_new, right_trial) = self.prior_observation
self.old_p, self.new_p = self._observation_to_prob(
left_new, left_trial, right_new, right_trial)
else:
self.prior_observation = None
self.old_p, self.new_p = 0, 1
self.confidence = confidence
self.verify_confidence = verify_confidence
self.prob = None
# The probability of a revision to be the one introduce new behavior.
# Initialize weight to 1.0 for revs in between old_idx and new_idx; and
# to 0.0 otherwise.
# TODO(kcwu): use heuristic to guess initial weight.
self.init_weight = [
1.0 if self.old_idx < i <= self.new_idx else 0.0
for i in range(len(self.rev_info))
]
self.skip_weight = None
self.init_prob = None
self.threshold = None
if old_value is not None:
if self.recompute_init_values:
self.old_avg = None
self.new_avg = None
else:
self.old_avg = old_value
self.new_avg = new_value
self.threshold = (old_value + new_value) / 2
def is_noisy(self):
"""Is noisy case?"""
if self.oracle and self.oracle != (0, 1):
return True
return self.prior_observation is not None
@property
def state(self):
if self._state == self.INITED:
if not self.dirty:
return self._state
idx = self._next_idx_for_initial()
self.dirty = False
if idx is not None:
return self._state
self._state = self.STARTED
if self.is_value_bisection() and self.recompute_init_values:
if self.old_avg is None:
self.old_avg = math_util.average(
self.rev_info[self.old_idx].averages())
if self.new_avg is None:
self.new_avg = math_util.average(
self.rev_info[self.new_idx].averages())
score = trend_score(self.old_value, self.new_value, self.old_avg,
self.new_avg)
if score <= 0:
raise errors.WrongAssumption(
'unable to reproduce the same trend (%f->%f): %f->%f' %
(self.old_value, self.new_value, self.old_avg, self.new_avg))
# Reject if we cannot reproduce the trend even half way.
# TODO(kcwu): make this threshold configurable.
if score < 0.5:
expect_delta = abs(self.new_value - self.old_value)
actual_delta = abs(self.new_avg - self.old_avg)
if self.old_value == 0 or self.old_avg == 0:
raise errors.WrongAssumption('unable to reproduce the value gap: '
'expect delta=%f; actual delta=%f' %
(expect_delta, actual_delta))
raise errors.WrongAssumption(
'unable to reproduce the value gap: '
'expect delta=%f (%f%%); actual delta=%f (%f%%)' %
(expect_delta, abs(expect_delta / self.old_value) * 100,
actual_delta, abs(actual_delta / self.old_avg) * 100))
if self.threshold is None:
self.threshold = (self.old_avg + self.new_avg) / 2
self.rev_info[self.old_idx].reclassify(self.old_avg, self.threshold,
self.new_avg)
self.rev_info[self.new_idx].reclassify(self.old_avg, self.threshold,
self.new_avg)
assert self._state == self.STARTED, 'unexpected state: %s' % self._state
if self.prob is None:
self._rebuild()
self._estimate_model()
assert self.prob
return self._state
def add_sample(self, idx, status=None, **kwargs):
sample = dict(status=status, **kwargs)
self.rev_info[idx].add_sample(**sample)
self.dirty = True
status = sample['status']
self.check_verification_range()
if status == 'skip':
self.check_proceed_with_skip(idx)
if self.state == self.INITED:
pass
elif self.state == self.STARTED:
if status == 'init':
status = self.classify_result_from_values(sample['values'])
assert self.prob
for _ in range(sample.get('times', 1)):
self._update(idx, status)
else:
assert 0, 'unexpected state: ' + self.state
@staticmethod
def _parse_observation(observation):
"""Parses prior from argument.
Args:
observation: A string denoting previous evaluation results. See comments
of __init__ for detail format.
Returns:
((old false-positive, old trials), (new true-positive, new trials))
"""
# default not noisy
old_f = NOT_NOISY
new_t = NOT_NOISY
if not observation:
return old_f, new_t
for term in observation.split(','):
m = re.match(r'^(old|new)=(\d+)/(\d+)$', term)
assert m
new_behavior, trial = int(m.group(2)), int(m.group(3))
assert 0 <= new_behavior <= trial
if m.group(1) == 'new':
new_t = new_behavior, trial
elif m.group(1) == 'old':
old_f = new_behavior, trial
return old_f, new_t
def _gather_noise_count(self):
"""Gathers noise count from current run results.
After few rounds of binary search, the candidates could be split into three
groups.
- left: high confidence to have OLD behavior.
- middle: not enough confidence, need further inspection.
- right: high confidence to have NEW behavior.
Returns:
left_new: how many NEW in left region.
left_trial: how many trials in left region.
right_new: how many NEW in right region.
right_trial: how many trials in right region.
"""
if self.observation:
(left_new, left_trial), (right_new, right_trial) = self.observation
return left_new, left_trial, right_new, right_trial
(left_new, left_trial), (right_new, right_trial) = self.prior_observation
if (left_new, left_trial) != NOT_NOISY:
info = self.rev_info[self.old_idx]
left_trial += info['old'] + info['new']
left_new += info['new']
if (right_new, right_trial) != NOT_NOISY:
info = self.rev_info[self.new_idx]
right_trial += info['old'] + info['new']
right_new += info['new']
return left_new, left_trial, right_new, right_trial
@staticmethod
def _expected_probability_from_observation(trial, num):
"""Expected probability from given observation.
Given `trial` times of Bernoulli experiment, the results are 1 for `num`
times. Calculates the expected probability.
Returns:
prob: the expected probability.
"""
# P(observation)
# = \int_0^1 C(trial, num) * p^num * (1-p)^(trial-num) dp
# = 1 / (trial+1)
# P(p | observation)
# = P(observation | p) * P(p) / P(observation)
# = C(trial, num) * p^num * (1-p)^(trial-num) * P(p) * (trial+1)
# E[P(p | observation)]
# = \int_0^1 P(p | observation) * p dp
# = C(trial, num) * \int_0^1 p^num * (1-p)^(trial-num)*p dp
# = (num+1)/(trial+2)
return float(num + 1) / (trial + 2)
@staticmethod
def _observation_to_prob(left_new, left_trial, right_new, right_trial):
old_p, new_p = 0, 1
if (left_new, left_trial) != NOT_NOISY:
old_p = NoisyBinarySearch._expected_probability_from_observation(
left_trial, left_new)
if (right_new, right_trial) != NOT_NOISY:
new_p = NoisyBinarySearch._expected_probability_from_observation(
right_trial, right_new)
return old_p, new_p
def _estimate_model_from_observation(self):
"""Estimates model from observation.
We don't know the real probability distribution. Assume they are Bernoulli
process and calculates the expected probability from observations.
Because the model will be slightly different after _rebuild(), we will
repeat until it is stable.
"""
assert self.prior_observation
(left_new, left_trial), (right_new, right_trial) = self.prior_observation
seen = set()
while True:
old_idx, new_idx = self.get_range()
if (old_idx, new_idx) in seen:
break
seen.add((old_idx, new_idx))
if (left_new, left_trial) != NOT_NOISY:
for info in self.rev_info[:old_idx + 1]:
left_trial += info['old'] + info['new']
left_new += info['new']
if (right_new, right_trial) != NOT_NOISY:
for info in self.rev_info[new_idx:]:
right_trial += info['old'] + info['new']
right_new += info['new']
self.observation = (left_new, left_trial), (right_new, right_trial)
self.old_p, self.new_p = self._observation_to_prob(
left_new, left_trial, right_new, right_trial)
if (self.old_p, self.new_p) in seen:
break
seen.add((self.old_p, self.new_p))
self._rebuild()
def _estimate_model(self):
"""Estimates probability of old false-positive and new true-positive.
In other words, the rates of new behavior is observed at old revisions
and new revisions respectively.
"""
if self.oracle:
self.old_p, self.new_p = self.oracle
elif self.prior_observation:
self._estimate_model_from_observation()
logger.debug('estimated model: old_p=%s, new_p=%s', self.old_p,
self.new_p)
else:
self.old_p, self.new_p = 0, 1
if 0 < self.old_p < 1.0 - self.confidence:
logger.warning(
'Confidence value (%r) is too low to detect '
'old candidates with new behavior (p=%r)', self.confidence,
self.old_p)
if self.confidence < self.new_p < 1.0:
logger.warning(
'Confidence value (%r) is too low to detect '
'new candidates with old behavior (p=%r)', self.confidence,
1.0 - self.new_p)
def get_noise_observation(self):
"""Gets current noise observation.
This includes the counts of prior observations from __init__.
Returns:
A string indicates eval result noise observation so far. See comments of
__init__ for detail format.
"""
assert not self.oracle
if not self.prior_observation:
return None
left_new, left_trial, right_new, right_trial = self._gather_noise_count()
observation = []
if (left_new, left_trial) != NOT_NOISY:
observation.append('old=%d/%d' % (left_new, left_trial))
if (right_new, right_trial) != NOT_NOISY:
observation.append('new=%d/%d' % (right_new, right_trial))
return ','.join(observation)
@staticmethod
def get_weight_by_skip(rev_info):
"""Gets weighting adjustment according to 'skip' test result.
There are two cases of 'skip':
1. The tested version is bad (say, compile error). In such case, its
neighbors are likely 'skip' as well.
2. Some thing wrong temporarily (say, network error). Needs retry.
Ideal solution to handle 'skip' properly during binary search:
- Avoid (less likely) to waste time to eval the 'skip' versions and
their neighbors.
- If other candidates are ruled out, we still need to retry them.
- If a version keeps 'skip' few times for every retry, it is not
temporarily case 2, we can ignore them completely.
It's easy to implement above idea. The secret is that we can assign prior,
P(x_i is first), of the Bayesian formula.
- We can assign smaller probabilities if we prefer not to try them.
- We can assign zero probabilities if we don't want to try them.
Args:
rev_info: List of RevInfo.
Returns:
list of weighting; 1.0 for not adjusted; otherwise, lower values.
"""
# heuristic, chosen arbitrary for now
impact_range = int(math.sqrt(len(rev_info)))
impact_factor = 0.9
weighting = [1.0] * len(rev_info)
for i, info in enumerate(rev_info):
# If ever non-skip, we don't care their 'skip' record.
if info['new'] or info['old']:
continue
if not info['skip']:
continue
# If retried enough times but still 'skip', don't try them forever.
if info['skip'] >= SKIP_FOREVER:
weighting[i] = 0
else:
# Allow retry, but the more 'skip', the lower weighting.
weighting[i] *= 0.1**info['skip']
# Adjust weighting of neighbors.
# Following formula is chosen arbitrarily. The main idea is "impact" is
# correlated to the distance until reaching non-skip version.
# p.s. e**(-pi*x**2) is borrowed from normal distribution.
for i, info in enumerate(rev_info):
# If ever non-skip, we don't care their 'skip' record.
if info['new'] or info['old']:
continue
if not info['skip']:
continue
# left to right
for j in range(i + 1, min(i + impact_range, len(rev_info))):
if rev_info[j]['new'] or rev_info[j]['old']:
break
x = float(i - j) / impact_range
impact = math.e**(-math.pi * x**2) * impact_factor
weighting[j] = min(weighting[j], 1.0 - impact)
# right to left
for j in reversed(list(range(max(0, i - impact_range + 1), i))):
if rev_info[j]['new'] or rev_info[j]['old']:
break
x = float(i - j) / impact_range
impact = math.e**(-math.pi * x**2) * impact_factor
weighting[j] = min(weighting[j], 1.0 - impact)
return weighting
@staticmethod
def _calculate_probs(old_p, new_p, rev_info, init_prob):
"""Calculates P(x_i is first new | results) for all candidates.
What we want:
prob[i] = P(x_i is first new | results)
= P(x_i is first new) * P(results | x_i is first new) / P(results)
Args:
old_p: P(observe=new | x=old)
new_p: P(observe=new | x=new)
rev_info: List of RevInfo.
init_prob: unnormalized P(x_i is first new)
Returns:
prob, where prob[i] = P(x_i is first new | results)
"""
assert 0 <= old_p < new_p
# First, let prob[i] = P(x_i is first new). Because prob[i] will be
# normalized later, it's okay to use unnormalized init_prob here directly.
prob = list(init_prob)
total_old = sum(info['old'] for info in rev_info)
total_new = sum(info['new'] for info in rev_info)
# Estimate the lower bound of prob[i].
# If the value is less than minimum float value, use ExtendedFloat for
# calculation to prevent underflow. See comments of ExtendedFloat for more
# detail.
min_prob = math_util.least([old_p, 1 - old_p, new_p, 1 - new_p])**(
total_old + total_new) * math_util.least(prob)
if min_prob < sys.float_info.min:
prob = [math_util.ExtendedFloat(p) for p in prob]
old_p = math_util.ExtendedFloat(old_p)
new_p = math_util.ExtendedFloat(new_p)
# These four variables, {left,right}_{old,new}, are corresponding to x_i.
# That is, initialized to corresponding to x_0 and incrementally updated at
# end of each loop iteration.
# left_old means how many observed old behavior left to x_i. So on and so
# forth.
left_old = left_new = 0
right_old = total_old
right_new = total_new
for i, p in enumerate(prob):
# prob[i] *= P(results | x_i is first new)
if p != 0:
if left_new != 0:
p *= old_p**left_new
if left_old != 0:
p *= (1 - old_p)**left_old
if right_new != 0:
p *= new_p**right_new
if right_old != 0:
p *= (1 - new_p)**right_old
prob[i] = p
# Update count for next iteration.
left_old += rev_info[i]['old']
left_new += rev_info[i]['new']
right_old -= rev_info[i]['old']
right_new -= rev_info[i]['new']
# P(results) = \sum_i P(results | x_i is first new) * P(x_i is first new)
# = \sum_i prob[i]
sum_prob = sum(prob)
if old_p == 0 and new_p == 1:
if sum_prob == 0:
raise errors.WrongAssumption(
'Test results conflict with assumption, try --noisy')
else:
assert sum_prob > 0
# prob[i] /= P(results)
prob = [x / sum_prob for x in prob]
# Convert back to float.
prob = [float(p) for p in prob]
return prob
def _rebuild(self):
"""Rebuilds probability.
Raises:
errors.VerificationFailed: Bisection range is not verified and false
events occur too many times.
"""
assert 0 <= self.old_idx < self.new_idx < len(self.rev_info)
# First, P(x_i is first new) = Normalize(init_prob[i])
# = Normalize(init_weight[i] * skip_weight[i])
self.skip_weight = self.get_weight_by_skip(self.rev_info)
self.init_prob = [p * w for p, w in zip(self.init_weight, self.skip_weight)]
self.prob = self._calculate_probs(self.old_p, self.new_p, self.rev_info,
self.init_prob)
def _is_changing_skip_weight(self, idx, result):
if result == 'skip':
# It is skipped, so skip_weight must be changed.
return True
# Not the first non-skip result, skip_weight is unaffected.
if self.rev_info[idx][result] > 1:
return False
assert self.rev_info[idx][result] == 1
# For the first non-skip result, skip_weight is affected if itself or its
# neighbor's skip_weight != 1.
for i in [idx - 1, idx, idx + 1]:
if 0 <= i < len(self.rev_info) and self.skip_weight[i] != 1:
return True
return False
def is_value_bisection(self):
return self.old_value is not None and self.new_value is not None
def classify_result_from_values(self, values):
assert values, 'eval script terminated normally but does not output values?'
if self.state == self.INITED and self.recompute_init_values:
return 'init'
assert self.threshold is not None
assert values
avg = math_util.average(values)
if self.old_avg < self.new_avg:
return 'old' if avg < self.threshold else 'new'
return 'new' if avg < self.threshold else 'old'
def _update(self, idx, result):
"""Incremental update by one more test result.
Args:
idx: index of the new test.
result: result of the new test. 'old', 'new', or 'skip'.
"""
assert self.prob
assert self.rev_info
if self._is_changing_skip_weight(idx, result):
# Incremental update by skip_weight.
new_skip_weight = self.get_weight_by_skip(self.rev_info)
for i, (cur_skip_w,
new_skip_w) in enumerate(zip(self.skip_weight, new_skip_weight)):
if new_skip_w != cur_skip_w:
# A non-skip result after several skips, the weight will jump from
# zero to non-zero and unable to update incrementally.
if cur_skip_w == 0:
self._rebuild()
return
self.prob[i] *= new_skip_w / cur_skip_w
self.skip_weight = new_skip_weight
# If not skip, prob[i] needs to multiply probability of new observation.
# prob[i] *= P(last result | x_i is first new).
# Let's consider the value of multiplier,
# if result=new && i <= idx: P(true-positive)
# if result=new && i > idx: P(false-positive)
# if result=old && i <= idx: P(true-negative) = 1 - P(true-positive)
# if result=old && i > idx: P(false-negative) = 1 - P(false-positive)
if result == 'new':
for i in range(idx + 1):
self.prob[i] *= self.new_p
for i in range(idx + 1, len(self.prob)):
self.prob[i] *= self.old_p
if result == 'old':
for i in range(idx + 1):
self.prob[i] *= 1 - self.new_p
for i in range(idx + 1, len(self.prob)):
self.prob[i] *= 1 - self.old_p
sum_prob = sum(self.prob)
assert sum_prob > 0
self.prob = [x / sum_prob for x in self.prob]
if self.prior_observation:
self._estimate_model()
def get_range(self, confidence=None):
"""Gets narrowed-down bisection range so far.
Using a heuristic algorithm to find a range (left, right) that,
sum(prob[left:right]) > confidence. Some optimizations are applied so the
range may not be the narrowest.
If `confidence` value is bigger, the returned range is bigger.
Args:
confidence: Confidence value. Default is the confidence provided in ctor.
Returns:
(left, right):
left: Index likely before the point of probability change.
right: Index likely after the point of probability change.
Raises:
errors.WrongAssumption: the bisection range is invalid
"""
if self.state == self.INITED:
return self.old_idx, self.new_idx
assert self.prob
if confidence is None:
confidence = self.confidence
def order_by_prob(x):
index, prob = x
return -prob, index
# Non-zero probabilities sorted by prob (big to small), then index (small
# to big)
sorted_prob = sorted(((i, p) for (i, p) in enumerate(self.prob) if p > 0),
key=order_by_prob)
cumulative = 0
cut = None
candidates = set()
# This loop finds a range whose sum of probabilities are bigger than the
# value of confidence. But on the edge of threshold, it will include more
# candidates until the ratio of probability delta from the cut point is
# larger than a predefined gap, which is empirical.
GAP = 0.5
# p is monotonically decreasing.
for i, p in sorted_prob:
cumulative += p
if cut is None:
if cumulative >= confidence:
cut = p
else:
if p < cut * GAP:
break
candidates.add(i)
assert cut is not None or confidence == 1
left = max(min(candidates) - 1, self.old_idx)
# Sometimes left has old probability due to 'skip', not 'old'. Seeks for
# version with 'old'.
while left > self.old_idx and self.rev_info[left]['old'] == 0:
left -= 1
right = max(candidates)
assert right <= self.new_idx
return left, right
def is_range_verified(self):
"""Is bisection range already been verified?"""
return (self.rev_info[self.old_idx]['old'] > 0 and
self.rev_info[self.new_idx]['new'] > 0)
def check_verification_range(self):
"""Checks if bisection range is reasonable.
If the eval results conflict with the bisection range, abort the
bisection if the probability is too low.
Raises:
errors.VerificationFailed: Bisection range is not verified and false
events occur too many times.
"""
if self.state == self.INITED and self.recompute_init_values:
return
if self.rev_info[self.old_idx]['old'] == 0:
# P(try N times and old behavior occurs 0 times)
p = self.old_p**self.rev_info[self.old_idx]['new']
if p < 1.0 - self.verify_confidence:
raise errors.VerifyOldBehaviorFailed(
self.rev_info[self.old_idx].rev,
self.rev_info[self.old_idx]['new'],
term_old=self.term_map.get('old', 'old'),
term_new=self.term_map.get('new', 'new'))
if self.rev_info[self.new_idx]['new'] == 0:
# P(try N times and new behavior occurs 0 times)
p = (1 - self.new_p)**self.rev_info[self.new_idx]['old']
if p < 1.0 - self.verify_confidence:
raise errors.VerifyNewBehaviorFailed(
self.rev_info[self.new_idx].rev,
self.rev_info[self.new_idx]['old'],
term_old=self.term_map.get('old', 'old'),
term_new=self.term_map.get('new', 'new'))
def check_proceed_with_skip(self, idx):
"""Checks if we should proceed after skips.
We just got one more 'skip' result. Should we tolerate it or stop bisecting?
Args:
idx: index of the new skip.
Raises:
errors.UnableToProceed: skip too many times, probably serious issue and
unable to proceed
"""
# we just got skip
assert self.rev_info[idx]['skip'] > 0
non_skip_count = self.rev_info[idx]['old'] + self.rev_info[idx]['new']
if ((idx in (self.old_idx, self.new_idx) and non_skip_count == 0 and
self.rev_info[idx]['skip'] >= SKIP_FOREVER) or
(self.rev_info[idx]['skip'] >=
(non_skip_count + 1) * (SKIP_FOREVER + 2))):
raise errors.UnableToProceed(
'something wrong, '
'unable to evaluate rev=%r (skip) %d times' %
(self.rev_info[idx].rev, self.rev_info[idx]['skip']))
def show_summary(self, more=False):
"""Shows top ranked candidates.
Args:
more: If true, shows versions with any test result as well.
"""
def show_candidate(idx):
if self.state == self.STARTED and self.is_noisy():
logger.info('[%d] %s %.4f%%; %s', idx, self.rev_info[idx].rev,
self.prob[idx] * 100, self.rev_info[idx].summary())
else:
logger.info('[%d] %s; %s', idx, self.rev_info[idx].rev,
self.rev_info[idx].summary())
if self.state == self.INITED:
logger.info('Verifying initial range:')
for i in (self.old_idx, self.new_idx):
show_candidate(i)
return
prob_rank = sorted(((p, i) for i, p in enumerate(self.prob) if p > 0),
reverse=True)
left, right = self.get_range(1.0 - self.confidence)
interesting = set([left, right])
if self.is_noisy():
# show top candidates
interesting.update(i for p, i in prob_rank[:10])
else:
# show all skip within range
for i in range(left + 1, right):
if self.rev_info[i]['skip'] and self.prob[i] > 0:
interesting.add(i)
if more:
# show all revs with test results
for i, info in enumerate(self.rev_info):
if sum(info.result_counter.values()) > 0:
interesting.add(i)
size = right - left
if size > 1 and not self.is_noisy():
steps = math.ceil(math.log(size) / math.log(2))
logger.info('Range: (%s, %s], %d revs left (roughly %.0f steps)',
self.rev_info[left].rev, self.rev_info[right].rev, size,
steps)
else:
logger.info('Range: (%s, %s], %d revs left', self.rev_info[left].rev,
self.rev_info[right].rev, size)
for i in sorted(interesting):
show_candidate(i)
def remaining_steps(self):
# TODO(kcwu): estimate remaining steps for noisy case
if self.is_noisy():
return None
left, right = self.get_range()
return math.ceil(math.log(right - left) / math.log(2))
def is_done(self):
"""If we have an answer with high confidence."""
if self.state == self.INITED:
return False
assert self.prob
# TODO(kcwu): revise to consider the real probability of true-positive is
# far away to estimation.
return max(self.prob) > self.confidence
def get_best_guess(self):
"""Gets current best guess.
If called before binary search done, it just returns one of candidates with
highest probability.
Returns:
Index of the element with max probability.
"""
if self.state == self.INITED:
return None
assert self.prob
return self.prob.index(max(self.prob))
@staticmethod
def _next_idx_by_utility(old_p, new_p, prob, cost_table=None):
"""Determines next index to binary search according to utility."""
assert prob
if cost_table is None:
cost_table = [(1, 1)] * len(prob)
orig_stats = math_util.EntropyStats([float(p) for p in prob])
orig_entropy = orig_stats.entropy()
new_stats = orig_stats.multiply(old_p)
old_stats = orig_stats.multiply(1.0 - old_p)
# Enumerate |i| as next to query, find best |i| to maximize utility.
best = float('-inf')
best_i = None
# |s| = cumulative probability of [p_0,..., p_i]
s = 0.0
for i, p in enumerate(prob):
# If rev[i] is observed with old behavior, the result entropy is
# entropy( [p_0, ... p_i] * (1-new_p) + (p_i, ... p_n) * (1-old_p) )
old_stats.replace(p * (1.0 - old_p), p * (1.0 - new_p))
# If rev[i] is observed with new behavior, the result entropy is
# entropy( [p_0, ... p_i] * new_p + (p_i, ... p_n) * old_p )
new_stats.replace(p * old_p, p * new_p)
# Let s = sum(p[:i+1]).
s += p
# Expected probability to observe new behavior.
r = s * new_p + (1.0 - s) * old_p
# The expected entropy after getting the next result at rev[i].
entropy = r * new_stats.entropy() + (1.0 - r) * old_stats.entropy()
information_gain = orig_entropy - entropy
cost = r * cost_table[i][1] + (1.0 - r) * cost_table[i][0]
utility = information_gain / cost
if utility > best:
best = utility
best_i = i
return best_i
def _next_idx_for_initial(self):
"""Returns index to probe for initial samples
1. Enough sample points for old and new version. By "enough", it means
(weak heuristic),
- At leat two samples.
- If the two samples are different, we need more.
- If eval is fast, we should get more samples.
2. Compare the value distribution between old and new version. Acquire
more samples until they are roughly distinguishable.
- Within reasonable time.
"""
assert self._state == self.INITED
if not self.recompute_init_values:
# Try 'new' before 'old' in order to know whether the issue is
# reproducible earlier
if self.rev_info[self.new_idx]['new'] == 0:
return self.new_idx
if self.rev_info[self.old_idx]['old'] == 0:
return self.old_idx
return None
def is_one_side_enough(info):
count = info['init']
if count > 30:
return True
if count < 2:
return False
averages = info.averages()
if count == 2 and averages[0] != averages[1]:
return False
# Spend 5 minutes for one side sounds reasonable.
return info.eval_time > 300
if not is_one_side_enough(self.rev_info[self.new_idx]):
logger.debug('%s not enough', self.rev_info[self.new_idx].rev)
return self.new_idx
if not is_one_side_enough(self.rev_info[self.old_idx]):
logger.debug('%s not enough', self.rev_info[self.old_idx].rev)
return self.old_idx
old_avg = math_util.average(self.rev_info[self.old_idx].averages())
new_avg = math_util.average(self.rev_info[self.new_idx].averages())
user_diff = self.new_value - self.old_value
assert user_diff != 0
# If the samples have the same (at least 1/2) trend as expected, we can
# stop collect more samples.
if trend_score(self.old_value, self.new_value, old_avg, new_avg) > 0.5:
return None
logger.debug('not distinguishable')
# If the best case values do not help, it probably has no hope.
if user_diff > 0:
new_max = max(self.rev_info[self.new_idx].averages())
old_min = min(self.rev_info[self.old_idx].averages())
if trend_score(self.old_value, self.new_value, old_min, new_max) < 0.5:
return None
else:
new_min = min(self.rev_info[self.new_idx].averages())
old_max = max(self.rev_info[self.old_idx].averages())
if trend_score(self.old_value, self.new_value, old_max, new_min) < 0.5:
return None
# If not obviously distinguishable, spend half hour to collect initial
# samples.
if self.rev_info[self.old_idx].eval_time < 1800:
return self.old_idx
if self.rev_info[self.new_idx].eval_time < 1800:
return self.new_idx
return None
def get_prob(self):
if self.state == self.INITED:
return None
return self.prob
def next_idx(self, cost_table=None):
"""Determines next index to binary search.
Args:
cost_table: list of cost estimation of candidates, optional. Each
estimation is a tuple (estimated_old, estimated_new) where
estimated_old: estimated cost if this candidate has old behavior
estimated_new: estimated cost if this candidate has new behavior
Returns:
Index to try for binary search. The returned index is optimized for
minimal total cost.
"""
if self.state == self.INITED:
idx = self._next_idx_for_initial()
assert idx is not None
return idx
assert self.state == self.STARTED
assert self.prob
return self._next_idx_by_utility(self.old_p, self.new_p, self.prob,
cost_table)