Add ThreadPool.abort() to stop processing early.
Make Progress more responsive in ThreadPool.join().
Change update_item() to allow decrement.
R=vadimsh@chromium.org
BUG=
Review URL: https://chromiumcodereview.appspot.com/25478010
git-svn-id: svn://svn.chromium.org/chrome/trunk/tools/swarm_client@226540 0039d316-1c4b-4281-b951-d872f2087c98
diff --git a/googletest/run_test_cases.py b/googletest/run_test_cases.py
index ded1cdd..adddef4 100755
--- a/googletest/run_test_cases.py
+++ b/googletest/run_test_cases.py
@@ -864,7 +864,7 @@
# detect that a test has been successfully retried).
if i['output']:
line += '\n' + i['output']
- self.progress.update_item(line, index=True, size=need_to_retry)
+ self.progress.update_item(line, index=1, size=int(need_to_retry))
if need_to_retry:
priority = self._retry(priority, i['test_case'], try_count)
@@ -889,7 +889,7 @@
self.progress.update_item(output, raw=True)
for i in results:
priority = self._retry(priority, i['test_case'], try_count)
- self.progress.update_item('', size=True)
+ self.progress.update_item('', size=1)
# Only yield once the process completed when there is only one test case as
# a safety precaution.
diff --git a/googletest/trace_test_cases.py b/googletest/trace_test_cases.py
index 05ed245..1808479 100755
--- a/googletest/trace_test_cases.py
+++ b/googletest/trace_test_cases.py
@@ -67,9 +67,9 @@
'Tracing %s done: %d, %.1fs' % (test_case, returncode, duration))
if retry:
self.progress.update_item(
- '%s - %d' % (test_case, retry), index=True, size=not valid)
+ '%s - %d' % (test_case, retry), index=1, size=int(not valid))
else:
- self.progress.update_item(test_case, index=True, size=not valid)
+ self.progress.update_item(test_case, index=1, size=int(not valid))
if valid:
break
return out
diff --git a/tests/threading_utils_test.py b/tests/threading_utils_test.py
index 723d4db..e06dc65 100755
--- a/tests/threading_utils_test.py
+++ b/tests/threading_utils_test.py
@@ -266,6 +266,34 @@
actual = pool.join()
self.assertEqual(['a', 'c', 'b'], actual)
+ @timeout(2)
+ def test_abort(self):
+ # Trigger a ridiculous amount of tasks, and abort the remaining.
+ with threading_utils.ThreadPool(2, 2, 0) as pool:
+ # Allow 10 tasks to run initially.
+ sem = threading.Semaphore(10)
+
+ def grab_and_return(x):
+ sem.acquire()
+ return x
+
+ for i in range(100):
+ pool.add_task(0, grab_and_return, i)
+
+ # Running at 11 would hang.
+ results = [pool.get_one_result() for _ in xrange(10)]
+ # At that point, there's 10 completed tasks and 2 tasks hanging, 88
+ # pending.
+ self.assertEqual(88, pool.abort())
+ # Calling .join() before these 2 .release() would hang.
+ sem.release()
+ sem.release()
+ results.extend(pool.join())
+ # The results *may* be out of order. Even if the calls are processed
+ # strictly in FIFO mode, a thread may preempt another one when returning the
+ # values.
+ self.assertEqual(range(12), sorted(results))
+
class AutoRetryThreadPoolTest(unittest.TestCase):
def test_bad_class(self):
diff --git a/tools/isolateserver_load_test.py b/tools/isolateserver_load_test.py
index f94c74d..a8fdf05 100755
--- a/tools/isolateserver_load_test.py
+++ b/tools/isolateserver_load_test.py
@@ -57,10 +57,10 @@
self.total = 0
def increment_index(self, name):
- self.update_item(name, index=True)
+ self.update_item(name, index=1)
def increment_count(self):
- self.update_item('', size=True)
+ self.update_item('', size=1)
def gen_line(self, name):
"""Generates the line to be printed.
diff --git a/tools/run_swarm_tests_on_swarm.py b/tools/run_swarm_tests_on_swarm.py
index 6916cbe..4e3132f 100755
--- a/tools/run_swarm_tests_on_swarm.py
+++ b/tools/run_swarm_tests_on_swarm.py
@@ -75,11 +75,11 @@
step_name = '%s/%s (%3.2fs)' % (platform, test_name, duration)
if returncode:
self.progress.update_item(
- 'Failed to archive %s\n%s' % (step_name, stdout), index=True)
+ 'Failed to archive %s\n%s' % (step_name, stdout), index=1)
else:
hash_value = hashlib.sha1(open(isolated, 'rb').read()).hexdigest()
logging.info('%s: %s', step_name, hash_value)
- self.progress.update_item('Archived %s' % step_name, index=True)
+ self.progress.update_item('Archived %s' % step_name, index=1)
self.add_task(0, self.trigger, test, platform, hash_value)
finally:
try:
@@ -111,9 +111,9 @@
step_name = '%s/%s (%3.2fs)' % (platform, test_name, duration)
if returncode:
self.progress.update_item(
- 'Failed to trigger %s\n%s' % (step_name, stdout), index=True)
+ 'Failed to trigger %s\n%s' % (step_name, stdout), index=1)
else:
- self.progress.update_item('Triggered %s' % step_name, index=True)
+ self.progress.update_item('Triggered %s' % step_name, index=1)
self.add_task(0, self.get_result, test, platform)
return None
@@ -131,9 +131,9 @@
# Only print the output for failures, successes are unexciting.
if returncode:
self.progress.update_item(
- 'Failed %s:\n%s' % (step_name, stdout), index=True)
+ 'Failed %s:\n%s' % (step_name, stdout), index=1)
return (test_name, platform, stdout)
- self.progress.update_item('Passed %s' % step_name, index=True)
+ self.progress.update_item('Passed %s' % step_name, index=1)
return None
diff --git a/utils/threading_utils.py b/utils/threading_utils.py
index e0ef216..12de304 100644
--- a/utils/threading_utils.py
+++ b/utils/threading_utils.py
@@ -265,6 +265,23 @@
'Thread pool \'%s\' closed: spawned %d threads total',
self._prefix, len(self._workers))
+ def abort(self):
+ """Empties the queue.
+
+ To be used when the pool should stop early, like when Ctrl-C was detected.
+
+ Returns:
+ Number of tasks cancelled.
+ """
+ index = 0
+ while True:
+ try:
+ self.tasks.get_nowait()
+ self.tasks.task_done()
+ index += 1
+ except Queue.Empty:
+ return index
+
def __enter__(self):
"""Enables 'with' statement."""
return self
@@ -369,14 +386,18 @@
# To be used in all threads.
self.queued_lines = Queue.Queue()
- def update_item(self, name, index=False, size=False, raw=False):
+ def update_item(self, name, index=0, size=0, raw=False):
"""Queue information to print out.
Arguments:
- index: index should be incremented.
- size: total size should be incremented.
+ index: increment to add to index. usually 0 or 1.
+ size: increment to add to size. usually 0 or 1.
raw: if True, prints the data without the header.
"""
+ assert isinstance(name, str)
+ assert isinstance(index, int)
+ assert isinstance(size, int)
+ assert isinstance(raw, bool)
self.queued_lines.put((name, index, size, raw))
def print_update(self):
@@ -392,12 +413,9 @@
except Queue.Empty:
break
- if size:
- self.size += 1
- self.value_changed = True
- if index:
- self.index += 1
- self.value_changed = True
+ self.size += size
+ self.index += index
+ self.value_changed = bool(size or index)
if not name:
# Even if raw=True, there's nothing to print.
continue
@@ -487,7 +505,10 @@
with self.all_tasks_done:
while self.unfinished_tasks:
self.progress.print_update()
- self.all_tasks_done.wait(60.)
+ # Use a short wait timeout so updates are printed in a timely manner.
+ # TODO(maruel): Find a way so Progress.queue and self.all_tasks_done
+ # share the same underlying event so no polling is necessary.
+ self.all_tasks_done.wait(0.1)
self.progress.print_update()