blob: 02163d2fc1e9102c1dd34e05f55955e8ce8ad09a [file] [log] [blame]
"""Tests for concurrency libraries."""
import os, os.path, sys, threading
import coverage
from tests.coveragetest import CoverageTest
# These libraries aren't always available, we'll skip tests if they aren't.
try:
import eventlet
except ImportError:
eventlet = None
try:
import gevent
except ImportError:
gevent = None
try:
import greenlet
except ImportError:
greenlet = None
# Are we running with the C tracer or not?
C_TRACER = os.getenv('COVERAGE_TEST_TRACER', 'c') == 'c'
def line_count(s):
"""How many non-blank non-comment lines are in `s`?"""
def code_line(l):
"""Is this a code line? Not blank, and not a full-line comment."""
return l.strip() and not l.strip().startswith('#')
return sum(1 for l in s.splitlines() if code_line(l))
class ConcurrencyTest(CoverageTest):
"""Tests of the concurrency support in coverage.py."""
LIMIT = 1000
# The code common to all the concurrency models.
COMMON = """
class Producer(threading.Thread):
def __init__(self, q):
threading.Thread.__init__(self)
self.q = q
def run(self):
for i in range({LIMIT}):
self.q.put(i)
self.q.put(None)
class Consumer(threading.Thread):
def __init__(self, q):
threading.Thread.__init__(self)
self.q = q
def run(self):
sum = 0
while True:
i = self.q.get()
if i is None:
print(sum)
break
sum += i
q = queue.Queue()
c = Consumer(q)
p = Producer(q)
c.start()
p.start()
p.join()
c.join()
""".format(LIMIT=LIMIT)
# Import the things to use threads.
if sys.version_info < (3, 0):
THREAD = """\
import threading
import Queue as queue
""" + COMMON
else:
THREAD = """\
import threading
import queue
""" + COMMON
# Import the things to use eventlet.
EVENTLET = """\
import eventlet.green.threading as threading
import eventlet.queue as queue
""" + COMMON
# Import the things to use gevent.
GEVENT = """\
from gevent import monkey
monkey.patch_thread()
import threading
import gevent.queue as queue
""" + COMMON
# Uncomplicated code that doesn't use any of the concurrency stuff, to test
# the simple case under each of the regimes.
SIMPLE = """\
total = 0
for i in range({LIMIT}):
total += i
print(total)
""".format(LIMIT=LIMIT)
def try_some_code(self, code, concurrency, the_module, expected_out=None):
"""Run some concurrency testing code and see that it was all covered.
`code` is the Python code to execute. `concurrency` is the name of
the concurrency regime to test it under. `the_module` is the imported
module that must be available for this to work at all. `expected_out`
is the text we expect the code to produce.
"""
self.make_file("try_it.py", code)
cmd = "coverage run --concurrency=%s try_it.py" % concurrency
out = self.run_command(cmd)
if not the_module:
# We don't even have the underlying module installed, we expect
# coverage to alert us to this fact.
expected_out = (
"Couldn't trace with concurrency=%s, "
"the module isn't installed.\n" % concurrency
)
self.assertEqual(out, expected_out)
elif C_TRACER or concurrency == "thread":
# We can fully measure the code if we are using the C tracer, which
# can support all the concurrency, or if we are using threads.
if expected_out is None:
expected_out = "%d\n" % (sum(range(self.LIMIT)))
self.assertEqual(out, expected_out)
# Read the coverage file and see that try_it.py has all its lines
# executed.
data = coverage.CoverageData()
data.read_file(".coverage")
# If the test fails, it's helpful to see this info:
fname = os.path.abspath("try_it.py")
linenos = data.executed_lines(fname).keys()
print("{0}: {1}".format(len(linenos), linenos))
print_simple_annotation(code, linenos)
lines = line_count(code)
self.assertEqual(data.summary()['try_it.py'], lines)
else:
expected_out = (
"Can't support concurrency=%s with PyTracer, "
"only threads are supported\n" % concurrency
)
self.assertEqual(out, expected_out)
def test_threads(self):
self.try_some_code(self.THREAD, "thread", threading)
def test_threads_simple_code(self):
self.try_some_code(self.SIMPLE, "thread", threading)
def test_eventlet(self):
self.try_some_code(self.EVENTLET, "eventlet", eventlet)
def test_eventlet_simple_code(self):
self.try_some_code(self.SIMPLE, "eventlet", eventlet)
def test_gevent(self):
self.try_some_code(self.GEVENT, "gevent", gevent)
def test_gevent_simple_code(self):
self.try_some_code(self.SIMPLE, "gevent", gevent)
def test_greenlet(self):
GREENLET = """\
from greenlet import greenlet
def test1(x, y):
z = gr2.switch(x+y)
print(z)
def test2(u):
print(u)
gr1.switch(42)
gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch("hello", " world")
"""
self.try_some_code(GREENLET, "greenlet", greenlet, "hello world\n42\n")
def test_greenlet_simple_code(self):
self.try_some_code(self.SIMPLE, "greenlet", greenlet)
def test_bug_330(self):
BUG_330 = """\
from weakref import WeakKeyDictionary
import eventlet
def do():
eventlet.sleep(.01)
gts = WeakKeyDictionary()
for _ in range(100):
gts[eventlet.spawn(do)] = True
eventlet.sleep(.005)
eventlet.sleep(.1)
print len(gts)
"""
self.try_some_code(BUG_330, "eventlet", eventlet, "0\n")
def print_simple_annotation(code, linenos):
"""Print the lines in `code` with X for each line number in `linenos`."""
for lineno, line in enumerate(code.splitlines(), start=1):
print(" {0} {1}".format("X" if lineno in linenos else " ", line))