blob: bea49df557aa2c5163182fc0c59c5c56c555e131 [file] [log] [blame]
import functools
import sys
import threading
import unittest
from test.support import threading_helper
threading_helper.requires_working_threading(module=True)
def run_with_frame(funcs, runner=None, iters=10):
"""Run funcs with a frame from another thread that is currently executing.
Args:
funcs: A function or list of functions that take a frame argument
runner: Optional function to run in the executor thread. If provided,
it will be called and should return eventually. The frame
passed to funcs will be the runner's frame.
iters: Number of iterations each func should run
"""
if not isinstance(funcs, list):
funcs = [funcs]
frame_var = None
e = threading.Event()
b = threading.Barrier(len(funcs) + 1)
if runner is None:
def runner():
j = 0
for i in range(100):
j += i
def executor():
nonlocal frame_var
frame_var = sys._getframe()
e.set()
b.wait()
runner()
def func_wrapper(func):
e.wait()
frame = frame_var
b.wait()
for _ in range(iters):
func(frame)
test_funcs = [functools.partial(func_wrapper, f) for f in funcs]
threading_helper.run_concurrently([executor] + test_funcs)
class TestFrameRaces(unittest.TestCase):
def test_concurrent_f_lasti(self):
run_with_frame(lambda frame: frame.f_lasti)
def test_concurrent_f_lineno(self):
run_with_frame(lambda frame: frame.f_lineno)
def test_concurrent_f_code(self):
run_with_frame(lambda frame: frame.f_code)
def test_concurrent_f_back(self):
run_with_frame(lambda frame: frame.f_back)
def test_concurrent_f_globals(self):
run_with_frame(lambda frame: frame.f_globals)
def test_concurrent_f_builtins(self):
run_with_frame(lambda frame: frame.f_builtins)
def test_concurrent_f_locals(self):
run_with_frame(lambda frame: frame.f_locals)
def test_concurrent_f_trace_read(self):
run_with_frame(lambda frame: frame.f_trace)
def test_concurrent_f_trace_opcodes_read(self):
run_with_frame(lambda frame: frame.f_trace_opcodes)
def test_concurrent_repr(self):
run_with_frame(lambda frame: repr(frame))
def test_concurrent_f_trace_write(self):
def trace_func(frame, event, arg):
return trace_func
def writer(frame):
frame.f_trace = trace_func
frame.f_trace = None
run_with_frame(writer)
def test_concurrent_f_trace_read_write(self):
# Test concurrent reads and writes of f_trace on a live frame.
def trace_func(frame, event, arg):
return trace_func
def reader(frame):
_ = frame.f_trace
def writer(frame):
frame.f_trace = trace_func
frame.f_trace = None
run_with_frame([reader, writer, reader, writer])
def test_concurrent_f_trace_opcodes_write(self):
def writer(frame):
frame.f_trace_opcodes = True
frame.f_trace_opcodes = False
run_with_frame(writer)
def test_concurrent_f_trace_opcodes_read_write(self):
# Test concurrent reads and writes of f_trace_opcodes on a live frame.
def reader(frame):
_ = frame.f_trace_opcodes
def writer(frame):
frame.f_trace_opcodes = True
frame.f_trace_opcodes = False
run_with_frame([reader, writer, reader, writer])
def test_concurrent_frame_clear(self):
# Test race between frame.clear() and attribute reads.
def create_frame():
x = 1
y = 2
return sys._getframe()
frame = create_frame()
def reader():
for _ in range(10):
try:
_ = frame.f_locals
_ = frame.f_code
_ = frame.f_lineno
except ValueError:
# Frame may be cleared
pass
def clearer():
frame.clear()
threading_helper.run_concurrently([reader, reader, clearer])
if __name__ == "__main__":
unittest.main()