| # -*- coding: utf-8 -*- |
| # Copyright 2017 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| """Test strategy module.""" |
| |
| from __future__ import print_function |
| import collections |
| import random |
| import unittest |
| |
| from bisect_kit import core |
| from bisect_kit import errors |
| from bisect_kit import strategy |
| |
| |
| class TestStrategy(unittest.TestCase): |
| """Test global functions in strategy module.""" |
| |
| def test_trend_score(self): |
| self.assertEqual(strategy.trend_score(1.0, 2.0, 1.0, 2.0), 1.0) |
| self.assertEqual(strategy.trend_score(1.0, 3.0, 100, 101), 0.5) |
| self.assertLess(strategy.trend_score(1.0, 3.0, 101, 100), 0) |
| self.assertEqual(strategy.trend_score(100.0, 300.0, 10.0, 30.0), 1.0) |
| self.assertEqual(strategy.trend_score(100.0, 300.0, 10.0, 20.0), 0.5) |
| self.assertLess(strategy.trend_score(100.0, 300.0, 20.0, 10.0), 0) |
| |
| self.assertEqual(strategy.trend_score(0.0, 300.0, 0.0, 30.0), 0.1) |
| self.assertLess(strategy.trend_score(-10.0, 10.0, 10.0, -10.0), 0) |
| |
| |
| # pylint: disable=protected-access |
| class TestNoisyBinarySearch(unittest.TestCase): |
| """Test NoisyBinarySearch class.""" |
| |
| def setUp(self): |
| self.rev_info = [] |
| for i in range(100): |
| self.rev_info.append(core.RevInfo(str(i))) |
| self.init_prob = [1.0] * len(self.rev_info) |
| |
| def test_parse_observation(self): |
| Strategy = strategy.NoisyBinarySearch |
| self.assertEqual( |
| Strategy._parse_observation('new=9/10'), (strategy.NOT_NOISY, (9, 10))) |
| self.assertEqual( |
| Strategy._parse_observation('old=1/10,new=9/10'), ((1, 10), (9, 10))) |
| |
| def test_calculate_probs_0_1(self): |
| prob = strategy.NoisyBinarySearch._calculate_probs(0.0, 1.0, self.rev_info, |
| self.init_prob) |
| self.assertAlmostEqual(prob[10], 0.01) |
| self.assertAlmostEqual(prob[50], 0.01) |
| |
| self.rev_info[49]['old'] += 1 |
| prob = strategy.NoisyBinarySearch._calculate_probs(0.0, 1.0, self.rev_info, |
| self.init_prob) |
| self.assertAlmostEqual(prob[49], 0.0) |
| self.assertAlmostEqual(prob[50], 0.02) |
| |
| def test_calculate_probs_half(self): |
| self.rev_info[49]['old'] += 1 |
| prob = strategy.NoisyBinarySearch._calculate_probs(0.0, 0.5, self.rev_info, |
| self.init_prob) |
| self.assertAlmostEqual(prob[49], 0.00666667) |
| self.assertAlmostEqual(prob[50], 0.01333333) |
| |
| def test_calculate_probs_1_9(self): |
| self.rev_info[49]['old'] += 1 |
| prob = strategy.NoisyBinarySearch._calculate_probs(0.1, 0.9, self.rev_info, |
| self.init_prob) |
| self.assertAlmostEqual(prob[49], 0.002) |
| self.assertAlmostEqual(prob[50], 0.018) |
| |
| def test_calculate_probs_wrong_assumption(self): |
| self.rev_info[30]['new'] += 1 |
| self.rev_info[40]['old'] += 1 |
| with self.assertRaises(errors.WrongAssumption): |
| strategy.NoisyBinarySearch._calculate_probs(0.0, 1.0, self.rev_info, |
| self.init_prob) |
| |
| def test_calculate_probs_underflow(self): |
| """Test underflow situation if eval too many times. |
| |
| The algorithm may calculate p**n, where n is number of test runs. If n is |
| large enough, the whole calculation may underflow. This test makes sure the |
| algorithm works properly. |
| """ |
| for i in range(0, 50): |
| self.rev_info[i]['old'] += 100 |
| for i in range(50, 100): |
| self.rev_info[i]['new'] += 100 |
| prob = strategy.NoisyBinarySearch._calculate_probs(0.0, 0.5, self.rev_info, |
| self.init_prob) |
| self.assertAlmostEqual(prob[49], 0) |
| self.assertAlmostEqual(prob[50], 1) |
| |
| def test_prob(self): |
| bsearch = strategy.NoisyBinarySearch( |
| self.rev_info, confidence=0.99, oracle=(0, 0.9)) |
| for i in [0, 10, 20, 30, 40, 50]: |
| bsearch.add_sample(i, 'old') |
| for i in [60, 70, 80, 90, 99]: |
| bsearch.add_sample(i, 'new') |
| |
| prob = bsearch.get_prob() |
| self.assertLess(prob[5], prob[15]) |
| self.assertLess(prob[15], prob[25]) |
| self.assertLess(prob[25], prob[35]) |
| self.assertLess(prob[35], prob[45]) |
| self.assertLess(prob[45], prob[55]) |
| self.assertAlmostEqual(prob[55], 0.09, 6) |
| self.assertAlmostEqual(prob[65], 0) |
| self.assertAlmostEqual(prob[75], 0) |
| self.assertAlmostEqual(prob[85], 0) |
| self.assertAlmostEqual(prob[95], 0) |
| |
| self.assertEqual(bsearch.get_range(), (40, 60)) |
| self.assertEqual(bsearch.next_idx(), 54) |
| assert not bsearch.is_done() |
| |
| def test_get_noise_observation(self): |
| bsearch = strategy.NoisyBinarySearch( |
| self.rev_info, observation='old=1/10,new=9/10') |
| self.assertEqual(bsearch.get_noise_observation(), 'old=1/10,new=9/10') |
| |
| bsearch.add_sample(0, 'old') |
| bsearch.add_sample(10, 'new') |
| bsearch.add_sample(20, 'old', times=2) |
| bsearch.add_sample(30, 'old', times=20) |
| bsearch.add_sample(50, 'new', times=20) |
| bsearch.add_sample(60, 'new', times=20) |
| bsearch.add_sample(90, 'old', times=2) |
| bsearch.add_sample(99, 'new') |
| |
| self.assertEqual(bsearch.get_noise_observation(), 'old=2/34,new=50/53') |
| |
| def test_next_idx_arithmetic_error(self): |
| # Larger `n` may produce larger arithmetic error. For example, |
| # n=1e3 could lead to call math.log(-1e-14). |
| # n=1e6 could lead to call math.log(-1e-11). |
| # n=1e7 could lead to call math.log(-1e-10). |
| # Here we only test n=1000 because |
| # - we often run bisect with candidates of the same order of magnitude. |
| # - larger n is too slow as unittest. |
| n = 1000 |
| rev_info = [] |
| for i in range(n): |
| rev_info.append(core.RevInfo(str(i))) |
| |
| bsearch = strategy.NoisyBinarySearch(rev_info, oracle=(0.01, 1)) |
| bsearch.add_sample(n - 1, 'new') |
| bsearch.add_sample(0, 'old') |
| |
| rng = random.Random(0) |
| for _ in range(10): |
| idx = rng.randint(1, n - 2) |
| bsearch.add_sample(idx, 'skip') |
| |
| # Should not raise ValueError (math domain error). |
| bsearch.next_idx() |
| |
| def test_many_skip(self): |
| bsearch = strategy.NoisyBinarySearch(self.rev_info, oracle=(0.1, 0.9)) |
| for _ in range(strategy.SKIP_FOREVER - 1): |
| bsearch.add_sample(99, 'skip') |
| |
| with self.assertRaises(errors.UnableToProceed): |
| bsearch.add_sample(99, 'skip') |
| |
| def test_skip(self): |
| bsearch = strategy.NoisyBinarySearch(self.rev_info, oracle=(0, 0.9)) |
| bsearch.add_sample(0, 'old') |
| bsearch.add_sample(99, 'new') |
| self.assertEqual(bsearch.get_range(), (0, 99)) |
| self.assertEqual(bsearch.next_idx(), 45) |
| |
| bsearch.add_sample(45, 'skip') |
| self.assertEqual(bsearch.get_range(), (0, 99)) |
| self.assertNotEqual(bsearch.next_idx(), 45) |
| |
| def test_skip_then_success(self): |
| bsearch = strategy.NoisyBinarySearch(self.rev_info) |
| bsearch.add_sample(0, 'old') |
| bsearch.add_sample(99, 'new') |
| bsearch.add_sample(98, 'skip', times=strategy.SKIP_FOREVER) |
| # Skip too many times, the probability is set to 0. |
| self.assertEqual(bsearch.get_prob()[98], 0) |
| |
| # Suddenly the result is not 'skip' any more. The probability jump from |
| # zero to non-zero. |
| bsearch.add_sample(98, 'new') |
| self.assertGreater(bsearch.get_prob()[98], 0) |
| |
| def test_get_range_with_skip(self): |
| bsearch = strategy.NoisyBinarySearch(self.rev_info, oracle=(0, 0.9)) |
| bsearch.add_sample(0, 'old') |
| bsearch.add_sample(33, 'old', times=10) |
| bsearch.add_sample(34, 'skip', times=strategy.SKIP_FOREVER) |
| bsearch.add_sample(35, 'new', times=10) |
| bsearch.add_sample(99, 'new') |
| |
| self.assertEqual(bsearch.get_range(), (33, 35)) |
| |
| def test_noisy(self): |
| bsearch = strategy.NoisyBinarySearch( |
| self.rev_info, confidence=0.99, oracle=(0, 0.9)) |
| self.assertTrue(bsearch.is_noisy()) |
| for i in [0, 10, 20, 30, 40, 50]: |
| bsearch.add_sample(i, 'old') |
| for i in [60, 70, 80, 90, 99]: |
| bsearch.add_sample(i, 'new') |
| |
| prob = bsearch.get_prob() |
| self.assertLess(prob[5], prob[15]) |
| self.assertLess(prob[15], prob[25]) |
| self.assertLess(prob[25], prob[35]) |
| self.assertLess(prob[35], prob[45]) |
| self.assertLess(prob[45], prob[55]) |
| self.assertAlmostEqual(prob[55], 0.09, 6) |
| self.assertAlmostEqual(prob[65], 0) |
| self.assertAlmostEqual(prob[75], 0) |
| self.assertAlmostEqual(prob[85], 0) |
| self.assertAlmostEqual(prob[95], 0) |
| |
| self.assertEqual(bsearch.get_range(), (40, 60)) |
| self.assertEqual(bsearch.next_idx(), 54) |
| assert not bsearch.is_done() |
| |
| bsearch.show_summary() |
| |
| def test_classic(self): |
| self.rev_info = [] |
| for i in range(465): |
| self.rev_info.append(core.RevInfo(str(i))) |
| bsearch = strategy.NoisyBinarySearch(self.rev_info, 462, 463) |
| assert not bsearch.is_done() |
| |
| bsearch.add_sample(462, 'old') |
| assert not bsearch.is_done() |
| |
| bsearch.add_sample(463, 'new') |
| assert bsearch.is_done() |
| self.assertAlmostEqual(bsearch.get_prob()[463], 1) |
| self.assertEqual(bsearch.get_range(), (462, 463)) |
| self.assertEqual(bsearch.remaining_steps(), 0) |
| bsearch.show_summary() |
| |
| def perform_search(self, size, ans, old_p, new_p, confidence, random_seed=0): |
| """Performs full noisy binary search. |
| |
| Args: |
| size: Number of candidates. |
| ans: Position of answer. |
| old_p: False-positive probability for old candidates. |
| new_p: True-positive probability for new candidates. |
| confidence: Required confidence. |
| random_seed: Random seed. |
| |
| Returns: |
| (count, guess): |
| count: Number of eval. |
| guess: Best guess. |
| """ |
| rev_info = [core.RevInfo(str(i)) for i in range(size)] |
| bsearch = strategy.NoisyBinarySearch( |
| rev_info, |
| confidence=confidence, |
| oracle=(old_p, new_p), |
| # Verify range until success. |
| verify_confidence=1.0) |
| |
| rng = random.Random(random_seed) |
| count = 0 |
| while not bsearch.is_done(): |
| idx = bsearch.next_idx() |
| if rng.random() < (old_p if idx < ans else new_p): |
| result = 'new' |
| else: |
| result = 'old' |
| count += 1 |
| bsearch.add_sample(idx, result) |
| |
| return count, bsearch.get_best_guess() |
| |
| def test_classic_search(self): |
| # Settings for non-noisy case. |
| old_p = 0 |
| new_p = 1 |
| confidence = 0.99 # Doesn't matter. |
| |
| # Extreme case. |
| count, result = self.perform_search(2, 1, old_p, new_p, confidence) |
| self.assertEqual(count, 2) |
| self.assertEqual(result, 1) |
| |
| # Tests answer positions. |
| for i in range(1, 50): |
| count, result = self.perform_search(50, i, old_p, new_p, confidence) |
| self.assertLessEqual(7, count) |
| self.assertLessEqual(count, 8) |
| self.assertEqual(result, i) |
| |
| # Slightly larger case. |
| count, result = self.perform_search(1000, 42, old_p, new_p, confidence) |
| self.assertEqual(count, 11) |
| self.assertEqual(result, 42) |
| |
| def test_half_noisy_search(self): |
| """Tests noisy search with single side flaky.""" |
| # Extreme case. |
| count, result = self.perform_search(2, 1, 0.0, 0.5, 0.999) |
| self.assertEqual(result, 1) |
| |
| count, result = self.perform_search(3, 2, 0.0, 0.5, 0.999) |
| # 1 - 0.5**10 > 0.999, at least 10 times to have enough confidence. |
| self.assertGreaterEqual(count, 10) |
| self.assertEqual(result, 2) |
| |
| # Tests answer positions. |
| for i in range(1, 10): |
| self.perform_search(10, i, 0, 0.3, 0.999) |
| self.perform_search(10, i, 0.3, 1, 0.999) |
| |
| # Larger case. |
| # Only makes sure it works. Don't verify the values due to randomness. |
| self.perform_search(100, 42, 0, 0.3, 0.999) |
| |
| def test_full_noisy_search(self): |
| """Tests noisy binary search with flaky on two sides.""" |
| # Only makes sure it works. Don't verify the values due to randomness. |
| self.perform_search(1000, 42, 0.05, 0.7, 0.999) |
| |
| # Tests answer positions. |
| for i in range(1, 20): |
| self.perform_search(20, i, 0.1, 0.9, 0.999) |
| |
| def test_confidence(self): |
| counter_guess = collections.Counter() |
| sum_count = 0 |
| n = 1000 |
| ans = 7 |
| for i in range(n): |
| count, guess = self.perform_search( |
| 10, ans, 0.1, 0.9, confidence=0.9, random_seed=i) |
| counter_guess[guess] += 1 |
| sum_count += count |
| |
| self.assertLess(float(sum_count) / n, 9) |
| common_guess = counter_guess.most_common(1)[0] |
| self.assertEqual(common_guess[0], ans) |
| # It's expected that not all guesses are correct. |
| self.assertLess(common_guess[1], n) |
| # With 0.9 confidence, about 0.9 of guesses are correct. |
| self.assertGreater(common_guess[1], n * 0.9) |
| |
| def test_value_bisect(self): |
| bsearch = strategy.NoisyBinarySearch( |
| self.rev_info, 0, 99, old_value=10.0, new_value=20.0) |
| self.assertTrue(bsearch.is_value_bisection()) |
| self.assertEqual(bsearch.classify_result_from_values([100]), 'new') |
| self.assertEqual(bsearch.classify_result_from_values([0]), 'old') |
| |
| # verify the range |
| self.assertEqual(bsearch.next_idx(), 99) |
| bsearch.add_sample(99, 'new', values=[20.0], eval_time=1000) |
| self.assertEqual(bsearch.next_idx(), 0) |
| bsearch.add_sample(0, 'old', values=[11.0], eval_time=1000) |
| |
| # middle point |
| self.assertEqual(bsearch.next_idx(), 49) |
| |
| def test_value_bisect_unreproducible(self): |
| bsearch = strategy.NoisyBinarySearch( |
| self.rev_info, 0, 99, old_value=10.0, new_value=20.0) |
| self.assertEqual(bsearch.next_idx(), 99) |
| |
| with self.assertRaises(errors.VerifyNewBehaviorFailed): |
| bsearch.add_sample(99, 'old', values=[1.0], eval_time=1000) |
| |
| def test_value_bisect_noisy_unreproducible(self): |
| bsearch = strategy.NoisyBinarySearch( |
| self.rev_info, |
| 0, |
| 99, |
| old_value=10.0, |
| new_value=20.0, |
| observation='old=1/100,new=99/100') |
| self.assertEqual(bsearch.next_idx(), 99) |
| |
| # It's acceptable to have opposite status few times due to noise. |
| bsearch.add_sample(99, 'old', values=[1.0], eval_time=1000) |
| |
| # Pretty high confidence that opposite 100 times is almost impossible. |
| with self.assertRaises(errors.VerifyNewBehaviorFailed): |
| for _ in range(100): |
| bsearch.add_sample(99, 'old', values=[1.0], eval_time=1000) |
| |
| def test_recompute_init_values(self): |
| bsearch = strategy.NoisyBinarySearch( |
| self.rev_info, |
| 0, |
| 99, |
| old_value=10.0, |
| new_value=20.0, |
| recompute_init_values=True) |
| self.assertTrue(bsearch.is_value_bisection()) |
| self.assertEqual(bsearch.classify_result_from_values([100]), 'init') |
| self.assertEqual(bsearch.classify_result_from_values([0]), 'init') |
| self.assertEqual(bsearch.next_idx(), 99) |
| self.assertEqual(bsearch.next_idx(), 99) |
| |
| # same value, twice is enough |
| bsearch.add_sample(99, 'init', values=[20.0], eval_time=1000) |
| self.assertEqual(bsearch.next_idx(), 99) |
| bsearch.add_sample(99, 'init', values=[20.0], eval_time=1000) |
| self.assertEqual(bsearch.next_idx(), 0) |
| |
| # different value, at least 3 times |
| bsearch.add_sample(0, 'init', values=[10.0], eval_time=1000) |
| self.assertEqual(bsearch.next_idx(), 0) |
| bsearch.add_sample(0, 'init', values=[11.0], eval_time=1000) |
| self.assertEqual(bsearch.next_idx(), 0) |
| bsearch.add_sample(0, 'init', values=[12.0], eval_time=1000) |
| self.assertEqual(bsearch.next_idx(), 49) |
| |
| def test_recompute_init_values_unreproducible(self): |
| bsearch = strategy.NoisyBinarySearch( |
| self.rev_info, |
| 0, |
| 99, |
| old_value=10.0, |
| new_value=20.0, |
| recompute_init_values=True) |
| |
| # same value, twice is enough |
| bsearch.add_sample(99, 'init', values=[20.0], eval_time=1000) |
| self.assertEqual(bsearch.next_idx(), 99) |
| bsearch.add_sample(99, 'init', values=[20.0], eval_time=1000) |
| self.assertEqual(bsearch.next_idx(), 0) |
| |
| # different value, at least 3 times |
| bsearch.add_sample(0, 'init', values=[19.0], eval_time=1000) |
| self.assertEqual(bsearch.next_idx(), 0) |
| bsearch.add_sample(0, 'init', values=[18.0], eval_time=1000) |
| self.assertEqual(bsearch.next_idx(), 0) |
| with self.assertRaises(errors.WrongAssumption): |
| bsearch.add_sample(0, 'init', values=[21.0], eval_time=1000) |
| |
| def test_recompute_init_values_undecidable(self): |
| bsearch = strategy.NoisyBinarySearch( |
| self.rev_info, |
| 0, |
| 99, |
| old_value=10.0, |
| new_value=20.0, |
| recompute_init_values=True) |
| |
| # same value, twice is enough |
| bsearch.add_sample(99, 'init', values=[20.0], eval_time=1000) |
| bsearch.add_sample(99, 'init', values=[20.0], eval_time=1000) |
| |
| # One sample with good trend |
| bsearch.add_sample(0, 'init', values=[10.0], eval_time=100) |
| self.assertEqual(bsearch.next_idx(), 0) |
| # But others are not, so need more samples |
| for _ in range(10): |
| bsearch.add_sample(0, 'init', values=[17.0], eval_time=100) |
| self.assertEqual(bsearch.next_idx(), 0) |
| |
| |
| if __name__ == '__main__': |
| unittest.main() |