Add ThreadPool.abort() to stop processing early.

Make Progress more responsive in ThreadPool.join().
Change update_item() to allow decrement.

R=vadimsh@chromium.org
BUG=

Review URL: https://chromiumcodereview.appspot.com/25478010

git-svn-id: svn://svn.chromium.org/chrome/trunk/tools/swarm_client@226540 0039d316-1c4b-4281-b951-d872f2087c98
diff --git a/googletest/run_test_cases.py b/googletest/run_test_cases.py
index ded1cdd..adddef4 100755
--- a/googletest/run_test_cases.py
+++ b/googletest/run_test_cases.py
@@ -864,7 +864,7 @@
         #   detect that a test has been successfully retried).
         if i['output']:
           line += '\n' + i['output']
-      self.progress.update_item(line, index=True, size=need_to_retry)
+      self.progress.update_item(line, index=1, size=int(need_to_retry))
 
       if need_to_retry:
         priority = self._retry(priority, i['test_case'], try_count)
@@ -889,7 +889,7 @@
           self.progress.update_item(output, raw=True)
         for i in results:
           priority = self._retry(priority, i['test_case'], try_count)
-          self.progress.update_item('', size=True)
+          self.progress.update_item('', size=1)
 
     # Only yield once the process completed when there is only one test case as
     # a safety precaution.
diff --git a/googletest/trace_test_cases.py b/googletest/trace_test_cases.py
index 05ed245..1808479 100755
--- a/googletest/trace_test_cases.py
+++ b/googletest/trace_test_cases.py
@@ -67,9 +67,9 @@
           'Tracing %s done: %d, %.1fs' % (test_case, returncode,  duration))
       if retry:
         self.progress.update_item(
-            '%s - %d' % (test_case, retry), index=True, size=not valid)
+            '%s - %d' % (test_case, retry), index=1, size=int(not valid))
       else:
-        self.progress.update_item(test_case, index=True, size=not valid)
+        self.progress.update_item(test_case, index=1, size=int(not valid))
       if valid:
         break
     return out
diff --git a/tests/threading_utils_test.py b/tests/threading_utils_test.py
index 723d4db..e06dc65 100755
--- a/tests/threading_utils_test.py
+++ b/tests/threading_utils_test.py
@@ -266,6 +266,34 @@
       actual = pool.join()
     self.assertEqual(['a', 'c', 'b'], actual)
 
+  @timeout(2)
+  def test_abort(self):
+    # Trigger a ridiculous amount of tasks, and abort the remaining.
+    with threading_utils.ThreadPool(2, 2, 0) as pool:
+      # Allow 10 tasks to run initially.
+      sem = threading.Semaphore(10)
+
+      def grab_and_return(x):
+        sem.acquire()
+        return x
+
+      for i in range(100):
+        pool.add_task(0, grab_and_return, i)
+
+      # Running at 11 would hang.
+      results = [pool.get_one_result() for _ in xrange(10)]
+      # At that point, there's 10 completed tasks and 2 tasks hanging, 88
+      # pending.
+      self.assertEqual(88, pool.abort())
+      # Calling .join() before these 2 .release() would hang.
+      sem.release()
+      sem.release()
+      results.extend(pool.join())
+    # The results *may* be out of order. Even if the calls are processed
+    # strictly in FIFO mode, a thread may preempt another one when returning the
+    # values.
+    self.assertEqual(range(12), sorted(results))
+
 
 class AutoRetryThreadPoolTest(unittest.TestCase):
   def test_bad_class(self):
diff --git a/tools/isolateserver_load_test.py b/tools/isolateserver_load_test.py
index f94c74d..a8fdf05 100755
--- a/tools/isolateserver_load_test.py
+++ b/tools/isolateserver_load_test.py
@@ -57,10 +57,10 @@
     self.total = 0
 
   def increment_index(self, name):
-    self.update_item(name, index=True)
+    self.update_item(name, index=1)
 
   def increment_count(self):
-    self.update_item('', size=True)
+    self.update_item('', size=1)
 
   def gen_line(self, name):
     """Generates the line to be printed.
diff --git a/tools/run_swarm_tests_on_swarm.py b/tools/run_swarm_tests_on_swarm.py
index 6916cbe..4e3132f 100755
--- a/tools/run_swarm_tests_on_swarm.py
+++ b/tools/run_swarm_tests_on_swarm.py
@@ -75,11 +75,11 @@
       step_name = '%s/%s (%3.2fs)' % (platform, test_name, duration)
       if returncode:
         self.progress.update_item(
-            'Failed to archive %s\n%s' % (step_name, stdout), index=True)
+            'Failed to archive %s\n%s' % (step_name, stdout), index=1)
       else:
         hash_value = hashlib.sha1(open(isolated, 'rb').read()).hexdigest()
         logging.info('%s: %s', step_name, hash_value)
-        self.progress.update_item('Archived %s' % step_name, index=True)
+        self.progress.update_item('Archived %s' % step_name, index=1)
         self.add_task(0, self.trigger, test, platform, hash_value)
     finally:
       try:
@@ -111,9 +111,9 @@
     step_name = '%s/%s (%3.2fs)' % (platform, test_name, duration)
     if returncode:
       self.progress.update_item(
-          'Failed to trigger %s\n%s' % (step_name, stdout), index=True)
+          'Failed to trigger %s\n%s' % (step_name, stdout), index=1)
     else:
-      self.progress.update_item('Triggered %s' % step_name, index=True)
+      self.progress.update_item('Triggered %s' % step_name, index=1)
       self.add_task(0, self.get_result, test, platform)
     return None
 
@@ -131,9 +131,9 @@
     # Only print the output for failures, successes are unexciting.
     if returncode:
       self.progress.update_item(
-          'Failed %s:\n%s' % (step_name, stdout), index=True)
+          'Failed %s:\n%s' % (step_name, stdout), index=1)
       return (test_name, platform, stdout)
-    self.progress.update_item('Passed %s' % step_name, index=True)
+    self.progress.update_item('Passed %s' % step_name, index=1)
     return None
 
 
diff --git a/utils/threading_utils.py b/utils/threading_utils.py
index e0ef216..12de304 100644
--- a/utils/threading_utils.py
+++ b/utils/threading_utils.py
@@ -265,6 +265,23 @@
       'Thread pool \'%s\' closed: spawned %d threads total',
       self._prefix, len(self._workers))
 
+  def abort(self):
+    """Empties the queue.
+
+    To be used when the pool should stop early, like when Ctrl-C was detected.
+
+    Returns:
+      Number of tasks cancelled.
+    """
+    index = 0
+    while True:
+      try:
+        self.tasks.get_nowait()
+        self.tasks.task_done()
+        index += 1
+      except Queue.Empty:
+        return index
+
   def __enter__(self):
     """Enables 'with' statement."""
     return self
@@ -369,14 +386,18 @@
     # To be used in all threads.
     self.queued_lines = Queue.Queue()
 
-  def update_item(self, name, index=False, size=False, raw=False):
+  def update_item(self, name, index=0, size=0, raw=False):
     """Queue information to print out.
 
     Arguments:
-      index: index should be incremented.
-      size: total size should be incremented.
+      index: increment to add to index. usually 0 or 1.
+      size: increment to add to size. usually 0 or 1.
       raw: if True, prints the data without the header.
     """
+    assert isinstance(name, str)
+    assert isinstance(index, int)
+    assert isinstance(size, int)
+    assert isinstance(raw, bool)
     self.queued_lines.put((name, index, size, raw))
 
   def print_update(self):
@@ -392,12 +413,9 @@
       except Queue.Empty:
         break
 
-      if size:
-        self.size += 1
-        self.value_changed = True
-      if index:
-        self.index += 1
-        self.value_changed = True
+      self.size += size
+      self.index += index
+      self.value_changed = bool(size or index)
       if not name:
         # Even if raw=True, there's nothing to print.
         continue
@@ -487,7 +505,10 @@
     with self.all_tasks_done:
       while self.unfinished_tasks:
         self.progress.print_update()
-        self.all_tasks_done.wait(60.)
+        # Use a short wait timeout so updates are printed in a timely manner.
+        # TODO(maruel): Find a way so Progress.queue and self.all_tasks_done
+        # share the same underlying event so no polling is necessary.
+        self.all_tasks_done.wait(0.1)
       self.progress.print_update()