[tools] Replace yield-based output processing with explicit method calls

This also makes processing immediate, i.e. outputs are parsed into results as
soon as each test run finishes, which will allow us to implement logic that
checks whether we have enough runs based on the already-accumulated results.

Since we process each output immediately, we no longer need the Measurement
class; its ConsumeOutput logic is now integrated directly into TraceConfig.
Similarly, AccumulateResults is replaced with RunnableConfig.ProcessOutput, as
we no longer accumulate results.
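
In short, each output is now processed right after its run, e.g. (a simplified
sketch of the new main loop introduced by this change, with retry and failure
handling omitted):

  for i in range(0, max(1, args.run_count or runnable.run_count)):
    output, output_secondary = platform.Run(
        runnable, i, secondary=args.shell_dir_secondary)
    # Results are parsed into the tracker immediately after each run.
    runnable.ProcessOutput(output, result_tracker, i)
    if output_secondary is not NULL_OUTPUT:
      runnable.ProcessOutput(output_secondary, result_tracker_secondary, i)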

R=machenbach@chromium.org

No-Try: true
No-Tree-Checks: true
Bug: chromium:880724
Change-Id: I0fc4846024c43258c10ba8d568312aa4746d746f
Reviewed-on: https://chromium-review.googlesource.com/c/v8/v8/+/1584325
Commit-Queue: Sergiy Belozorov <sergiyb@chromium.org>
Reviewed-by: Michael Achenbach <machenbach@chromium.org>
Cr-Commit-Position: refs/heads/master@{#61048}
diff --git a/tools/run_perf.py b/tools/run_perf.py
index 9bbecf4..973dec5 100755
--- a/tools/run_perf.py
+++ b/tools/run_perf.py
@@ -192,38 +192,41 @@
     self.near_timeouts = []  # > 90% of the max runtime
     self.runnables = {}
 
-  def AddTraceResults(self, trace, results, stddev):
+  def AddTraceResult(self, trace, result, stddev):
     if trace.name not in self.traces:
       self.traces[trace.name] = {
         'graphs': trace.graphs,
         'units': trace.units,
-        'results': results,
+        'results': [result],
         'stddev': stddev or '',
       }
     else:
       existing_entry = self.traces[trace.name]
       assert trace.graphs == existing_entry['graphs']
       assert trace.units == existing_entry['units']
-      assert not (stddev and existing_entry['stddev'])
-      existing_entry['stddev'] = stddev
-      existing_entry['results'].extend(results)
+      if stddev:
+        existing_entry['stddev'] = stddev
+      existing_entry['results'].append(result)
 
-  def AddErrors(self, errors):
-    self.errors.extend(errors)
+  def TraceHasStdDev(self, trace):
+    return trace.name in self.traces and self.traces[trace.name]['stddev'] != ''
 
-  def AddRunnableDurations(self, runnable, durations):
-    """Adds a list of durations of the different runs of the runnable."""
+  def AddError(self, error):
+    self.errors.append(error)
+
+  def AddRunnableDuration(self, runnable, duration):
+    """Records a duration of a specific run of the runnable."""
     if runnable.name not in self.runnables:
       self.runnables[runnable.name] = {
         'graphs': runnable.graphs,
-        'durations': durations,
+        'durations': [duration],
         'timeout': runnable.timeout,
       }
     else:
       existing_entry = self.runnables[runnable.name]
       assert runnable.timeout == existing_entry['timeout']
       assert runnable.graphs == existing_entry['graphs']
-      existing_entry['durations'].extend(durations)
+      existing_entry['durations'].append(duration)
 
   def ToDict(self):
     return {
@@ -242,64 +245,6 @@
     return json.dumps(self.ToDict(), indent=2, separators=(',', ': '))
 
 
-class Measurement(object):
-  """Represents a series of results of one trace.
-
-  The results are from repetitive runs of the same executable. They are
-  gathered by repeated calls to ConsumeOutput.
-  """
-  def __init__(self, trace, results_regexp, stddev_regexp):
-    self.trace = trace
-    self.results_regexp = results_regexp
-    self.stddev_regexp = stddev_regexp
-    self.results = []
-    self.errors = []
-    self.stddev = ''
-
-  def ConsumeOutput(self, output):
-    try:
-      result = re.search(self.results_regexp, output.stdout, re.M).group(1)
-      self.results.append(str(float(result)))
-    except ValueError:
-      self.errors.append('Regexp "%s" returned a non-numeric for test %s.'
-                         % (self.results_regexp, self.trace.name))
-    except:
-      self.errors.append('Regexp "%s" did not match for test %s.'
-                         % (self.results_regexp, self.trace.name))
-
-    try:
-      if self.stddev_regexp and self.stddev:
-        self.errors.append('Test %s should only run once since a stddev '
-                           'is provided by the test.' % self.trace.name)
-      if self.stddev_regexp:
-        self.stddev = re.search(
-            self.stddev_regexp, output.stdout, re.M).group(1)
-    except:
-      self.errors.append('Regexp "%s" did not match for test %s.'
-                         % (self.stddev_regexp, self.trace.name))
-
-  def UpdateResults(self, result_tracker):
-    result_tracker.AddTraceResults(self.trace, self.results, self.stddev)
-    result_tracker.AddErrors(self.errors)
-
-  def GetResults(self):
-    return self.results
-
-
-class NullMeasurement(object):
-  """Null object to avoid having extra logic for configurations that don't
-  require secondary run, e.g. CI bots.
-  """
-  def ConsumeOutput(self, output):
-    pass
-
-  def UpdateResults(self, result_tracker):
-    pass
-
-  def GetResults(self):
-    return []
-
-
 def Unzip(iterable):
   left = []
   right = []
@@ -328,52 +273,6 @@
   return new_output
 
 
-def AccumulateResults(
-    graph, output_iter, perform_measurement, calc_total, result_tracker):
-  """Iterates over the output of multiple benchmark reruns and accumulates
-  results for a configured list of traces.
-
-  Args:
-    graph: Parent GraphConfig for which results are to be accumulated.
-    output_iter: Iterator over the output of each test run.
-    perform_measurement: Whether to actually run tests and perform measurements.
-                         This is needed so that we reuse this script for both CI
-                         and trybot, but want to ignore second run on CI without
-                         having to spread this logic throughout the script.
-    calc_total: Boolean flag to specify the calculation of a summary trace.
-    result_tracker: ResultTracker object to be updated.
-  """
-  measurements = [trace.CreateMeasurement(perform_measurement)
-                  for trace in graph.children]
-  for output in output_iter():
-    for measurement in measurements:
-      measurement.ConsumeOutput(output)
-
-  for measurement in measurements:
-    measurement.UpdateResults(result_tracker)
-
-  raw_results = [m.GetResults() for m in measurements]
-  if not raw_results or not calc_total:
-    return
-
-  # Assume all traces have the same structure.
-  if len(set(map(len, raw_results))) != 1:
-    result_tracker.AddErrors(
-        ['Not all traces have the same number of results. Can not compute '
-         'total for %s' % graph.name])
-    return
-
-  # Calculate the geometric means for all traces. Above we made sure that
-  # there is at least one trace and that the number of results is the same
-  # for each trace.
-  n_results = len(raw_results[0])
-  total_results = [GeometricMean(r[i] for r in raw_results)
-                   for i in range(0, n_results)]
-  total_trace = TraceConfig(
-      {'name': 'Total', 'units': graph.children[0].units}, graph, graph.arch)
-  result_tracker.AddTraceResults(total_trace, total_results, '')
-
-
 class Node(object):
   """Represents a node in the suite tree structure."""
   def __init__(self, *args):
@@ -417,7 +316,6 @@
   def __init__(self, suite, parent, arch):
     super(GraphConfig, self).__init__()
     self._suite = suite
-    self.arch = arch
 
     assert isinstance(suite.get('path', []), list)
     assert isinstance(suite.get('owners', []), list)
@@ -480,11 +378,46 @@
     assert self.results_regexp
     assert self.owners
 
-  def CreateMeasurement(self, perform_measurement):
-    if not perform_measurement:
-      return NullMeasurement()
+  def ConsumeOutput(self, output, result_tracker):
+    """Extracts trace results from the output.
 
-    return Measurement(self, self.results_regexp, self.stddev_regexp)
+    Args:
+      output: Output object from the test run.
+      result_tracker: Result tracker to be updated.
+
+    Returns:
+      The raw extracted result value or None if an error occurred.
+    """
+    result = None
+    stddev = None
+
+    try:
+      result = str(float(
+        re.search(self.results_regexp, output.stdout, re.M).group(1)))
+    except ValueError:
+      result_tracker.AddError(
+          'Regexp "%s" returned a non-numeric for test %s.' %
+          (self.results_regexp, self.name))
+    except:
+      result_tracker.AddError(
+          'Regexp "%s" did not match for test %s.' %
+          (self.results_regexp, self.name))
+
+    try:
+      if self.stddev_regexp:
+        if result_tracker.TraceHasStdDev(self):
+          result_tracker.AddError(
+              'Test %s should only run once since a stddev is provided by the '
+              'test.' % self.name)
+        stddev = re.search(self.stddev_regexp, output.stdout, re.M).group(1)
+    except:
+      result_tracker.AddError(
+          'Regexp "%s" did not match for test %s.' %
+          (self.stddev_regexp, self.name))
+
+    if result:
+      result_tracker.AddTraceResult(self, result, stddev)
+    return result
 
 
 class RunnableConfig(GraphConfig):
@@ -494,20 +427,12 @@
     super(RunnableConfig, self).__init__(suite, parent, arch)
     self.has_timeouts = False
     self.has_near_timeouts = False
+    self.arch = arch
 
   @property
   def main(self):
     return self._suite.get('main', '')
 
-  def PostProcess(self, outputs_iter):
-    if self.results_processor:
-      def it():
-        for i, output in enumerate(outputs_iter()):
-          yield RunResultsProcessor(self.results_processor, output, i + 1)
-      return it
-    else:
-      return outputs_iter
-
   def ChangeCWD(self, suite_path):
     """Changes the cwd to to path defined in the current graph.
 
@@ -539,23 +464,37 @@
         args=self.GetCommandFlags(extra_flags=extra_flags),
         timeout=self.timeout or 60)
 
-  def Run(self, runner, secondary, result_tracker, results_secondary):
-    """Iterates over several runs and handles the output for all traces."""
-    output, output_secondary = Unzip(runner())
-    AccumulateResults(
-        self,
-        output_iter=self.PostProcess(output),
-        perform_measurement=True,
-        calc_total=self.total,
-        result_tracker=result_tracker,
-     )
-    AccumulateResults(
-        self,
-        output_iter=self.PostProcess(output_secondary),
-        perform_measurement=secondary,  # only run second time on trybots
-        calc_total=self.total,
-        result_tracker=results_secondary,
-    )
+  def ProcessOutput(self, output, result_tracker, count):
+    """Processes test run output and updates result tracker.
+
+    Args:
+      output: Output object from the test run.
+      result_tracker: ResultTracker object to be updated.
+      count: Index of the test run (used for better logging).
+    """
+    result_tracker.AddRunnableDuration(self, output.duration)
+    if self.results_processor:
+      output = RunResultsProcessor(self.results_processor, output, count)
+
+    results_for_total = []
+    for trace in self.children:
+      result = trace.ConsumeOutput(output, result_tracker)
+      if result:
+        results_for_total.append(result)
+
+    if self.total:
+      # Produce total metric only when all traces have produced results.
+      if len(self.children) != len(results_for_total):
+        result_tracker.AddError(
+            'Not all traces have produced results. Can not compute total for '
+            '%s.' % self.name)
+        return
+
+      # Calculate total as the geometric mean of results from all traces.
+      total_trace = TraceConfig(
+          {'name': 'Total', 'units': self.children[0].units}, self, self.arch)
+      result_tracker.AddTraceResult(
+          total_trace, GeometricMean(results_for_total), '')
 
 
 class RunnableTraceConfig(TraceConfig, RunnableConfig):
@@ -563,16 +502,9 @@
   def __init__(self, suite, parent, arch):
     super(RunnableTraceConfig, self).__init__(suite, parent, arch)
 
-  def Run(self, runner, secondary, result_tracker, results_secondary):
-    """Iterates over several runs and handles the output."""
-    measurement = self.CreateMeasurement(perform_measurement=True)
-    measurement_secondary = self.CreateMeasurement(
-        perform_measurement=secondary)
-    for output, output_secondary in runner():
-      measurement.ConsumeOutput(output)
-      measurement_secondary.ConsumeOutput(output_secondary)
-    measurement.UpdateResults(result_tracker)
-    measurement_secondary.UpdateResults(results_secondary)
+  def ProcessOutput(self, output, result_tracker, count):
+    result_tracker.AddRunnableDuration(self, output.duration)
+    self.ConsumeOutput(output, result_tracker)
 
 
 def MakeGraphConfig(suite, arch, parent):
@@ -673,19 +605,20 @@
       logging.warning('>>> Test crashed with exit code %d.', output.exit_code)
     return output
 
-  def Run(self, runnable, count):
+  def Run(self, runnable, count, secondary):
     """Execute the benchmark's main file.
 
-    If args.shell_dir_secondary is specified, the benchmark is run twice, e.g.
-    with and without patch.
     Args:
       runnable: A Runnable benchmark instance.
       count: The number of this (repeated) run.
-    Returns: A tuple with the two benchmark outputs. The latter will be None if
-             args.shell_dir_secondary was not specified.
+      secondary: True if secondary run should be executed.
+
+    Returns:
+      A tuple with the two benchmark outputs. The latter will be NULL_OUTPUT if
+      secondary is False.
     """
     output = self._LoggedRun(runnable, count, secondary=False)
-    if self.shell_dir_secondary:
+    if secondary:
       return output, self._LoggedRun(runnable, count, secondary=True)
     else:
       return output, NULL_OUTPUT
@@ -1056,8 +989,7 @@
                                   disable_aslr = args.noaslr) as conf:
     for path in args.suite:
       if not os.path.exists(path):  # pragma: no cover
-        result_tracker.AddErrors([
-            'Configuration file %s does not exist.' % path])
+        result_tracker.AddError('Configuration file %s does not exist.' % path)
         continue
 
       with open(path) as f:
@@ -1087,35 +1019,28 @@
         durations = []
         durations_secondary = []
 
-        def Runner():
-          """Output generator that reruns several times."""
-          for i in range(0, max(1, args.run_count or runnable.run_count)):
-            attempts_left = runnable.retry_count + 1
-            while attempts_left:
-              output, output_secondary = platform.Run(runnable, i)
-              if output.IsSuccess() and output_secondary.IsSuccess():
-                durations.append(output.duration)
-                if output_secondary is not NULL_OUTPUT:
-                  durations_secondary.append(output_secondary.duration)
-                yield output, output_secondary
-                break
-              attempts_left -= 1
-              if not attempts_left:  # ignore failures until last attempt
-                have_failed_tests[0] = True
-              else:
-                logging.info('>>> Retrying suite: %s', runnable_name)
+        for i in range(0, max(1, args.run_count or runnable.run_count)):
+          attempts_left = runnable.retry_count + 1
+          while attempts_left:
+            output, output_secondary = platform.Run(
+                runnable, i, secondary=args.shell_dir_secondary)
+            if output.IsSuccess() and output_secondary.IsSuccess():
+              runnable.ProcessOutput(output, result_tracker, i)
+              if output_secondary is not NULL_OUTPUT:
+                runnable.ProcessOutput(
+                    output_secondary, result_tracker_secondary, i)
+              break
 
-        # Let runnable iterate over all runs and handle output.
-        runnable.Run(Runner, args.shell_dir_secondary, result_tracker,
-                     result_tracker_secondary)
+            attempts_left -= 1
+            if not attempts_left:  # ignore failures until last attempt
+              have_failed_tests[0] = True
+            else:
+              logging.info('>>> Retrying suite: %s', runnable_name)
+
         if runnable.has_timeouts:
           result_tracker.timeouts.append(runnable_name)
         if runnable.has_near_timeouts:
           result_tracker.near_timeouts.append(runnable_name)
-        result_tracker.AddRunnableDurations(runnable, durations)
-        if durations_secondary:
-          result_tracker_secondary.AddRunnableDurations(
-              runnable, durations_secondary)
 
       platform.PostExecution()
 
@@ -1124,10 +1049,11 @@
     else:  # pragma: no cover
       print('Primary results:', result_tracker)
 
-  if args.json_test_results_secondary:
-    result_tracker_secondary.WriteToFile(args.json_test_results_secondary)
-  else:  # pragma: no cover
-    print('Secondary results:', result_tracker_secondary)
+  if args.shell_dir_secondary:
+    if args.json_test_results_secondary:
+      result_tracker_secondary.WriteToFile(args.json_test_results_secondary)
+    else:  # pragma: no cover
+      print('Secondary results:', result_tracker_secondary)
 
   if (result_tracker.errors or result_tracker_secondary.errors or
       have_failed_tests[0]):
diff --git a/tools/unittests/run_perf_test.py b/tools/unittests/run_perf_test.py
index 154cf6f..e18bae2 100755
--- a/tools/unittests/run_perf_test.py
+++ b/tools/unittests/run_perf_test.py
@@ -381,14 +381,13 @@
         mock.MagicMock(return_value={'is_android': False})).start()
     self.assertEqual(1, self._CallMain('--buildbot'))
     self._VerifyResults('test', 'score', [
-      {'name': 'Richards', 'results': [], 'stddev': ''},
       {'name': 'DeltaBlue', 'results': ['10657567.0'], 'stddev': ''},
     ])
     self._VerifyErrors(
         ['Regexp "^Richards: (.+)$" '
          'returned a non-numeric for test test/Richards.',
-         'Not all traces have the same number of results. Can not compute '
-         'total for test'])
+         'Not all traces have produced results. Can not compute total for '
+         'test.'])
     self._VerifyMock(os.path.join('out', 'Release', 'd7'), '--flag', 'run.js')
 
   def testRegexpNoMatch(self):
@@ -396,7 +395,6 @@
     self._MockCommand(['.'], ['x\nRichaards: 1.234\nDeltaBlue: 10657567\ny\n'])
     self.assertEqual(1, self._CallMain())
     self._VerifyResults('test', 'score', [
-      {'name': 'Richards', 'results': [], 'stddev': ''},
       {'name': 'DeltaBlue', 'results': ['10657567.0'], 'stddev': ''},
     ])
     self._VerifyErrors(
@@ -409,10 +407,7 @@
     self._MockCommand(
         ['.'], ['x\nRichards: 1.234\nDeltaBlue: 10657567\ny\n'], exit_code=-1)
     self.assertEqual(1, self._CallMain())
-    self._VerifyResults('test', 'score', [
-      {'name': 'Richards', 'results': [], 'stddev': ''},
-      {'name': 'DeltaBlue', 'results': [], 'stddev': ''},
-    ])
+    self._VerifyResults('test', 'score', [])
     self._VerifyErrors([])
     self._VerifyMock(
         os.path.join('out', 'x64.release', 'd7'), '--flag', 'run.js')
@@ -423,10 +418,7 @@
     self._WriteTestInput(test_input)
     self._MockCommand(['.'], [''], timed_out=True)
     self.assertEqual(1, self._CallMain())
-    self._VerifyResults('test', 'score', [
-      {'name': 'Richards', 'results': [], 'stddev': ''},
-      {'name': 'DeltaBlue', 'results': [], 'stddev': ''},
-    ])
+    self._VerifyResults('test', 'score', [])
     self._VerifyErrors([])
     self._VerifyMock(os.path.join('out', 'x64.release', 'd7'),
                      '--flag', 'run.js', timeout=70)