blob: 8a0ad30c4bc770b790edebfb9b01b76d4597e15e [file] [log] [blame]
import codecs
import io
import _pyio as pyio
import threading
from unittest import TestCase
from test.support import threading_helper
from test.support.threading_helper import run_concurrently
from random import randint
from sys import getsizeof
threading_helper.requires_working_threading(module=True)
class ThreadSafetyMixin:
# Test pretty much everything that can break under free-threading.
# Non-deterministic, but at least one of these things will fail if
# BytesIO object is not free-thread safe.
def check(self, funcs, *args):
barrier = threading.Barrier(len(funcs))
threads = []
for func in funcs:
thread = threading.Thread(target=func, args=(barrier, *args))
threads.append(thread)
with threading_helper.start_threads(threads):
pass
@threading_helper.requires_working_threading()
@threading_helper.reap_threads
def test_free_threading(self):
"""Test for segfaults and aborts."""
def write(barrier, b, *ignore):
barrier.wait()
try: b.write(b'0' * randint(100, 1000))
except ValueError: pass # ignore write fail to closed file
def writelines(barrier, b, *ignore):
barrier.wait()
b.write(b'0\n' * randint(100, 1000))
def truncate(barrier, b, *ignore):
barrier.wait()
try: b.truncate(0)
except BufferError: pass # ignore exported buffer
def read(barrier, b, *ignore):
barrier.wait()
b.read()
def read1(barrier, b, *ignore):
barrier.wait()
b.read1()
def readline(barrier, b, *ignore):
barrier.wait()
b.readline()
def readlines(barrier, b, *ignore):
barrier.wait()
b.readlines()
def readinto(barrier, b, into, *ignore):
barrier.wait()
b.readinto(into)
def close(barrier, b, *ignore):
barrier.wait()
b.close()
def getvalue(barrier, b, *ignore):
barrier.wait()
b.getvalue()
def getbuffer(barrier, b, *ignore):
barrier.wait()
b.getbuffer()
def iter(barrier, b, *ignore):
barrier.wait()
list(b)
def getstate(barrier, b, *ignore):
barrier.wait()
b.__getstate__()
def setstate(barrier, b, st, *ignore):
barrier.wait()
b.__setstate__(st)
def sizeof(barrier, b, *ignore):
barrier.wait()
getsizeof(b)
self.check([write] * 10, self.ioclass())
self.check([writelines] * 10, self.ioclass())
self.check([write] * 10 + [truncate] * 10, self.ioclass())
self.check([truncate] + [read] * 10, self.ioclass(b'0\n'*204800))
self.check([truncate] + [read1] * 10, self.ioclass(b'0\n'*204800))
self.check([truncate] + [readline] * 10, self.ioclass(b'0\n'*20480))
self.check([truncate] + [readlines] * 10, self.ioclass(b'0\n'*20480))
self.check([truncate] + [readinto] * 10, self.ioclass(b'0\n'*204800), bytearray(b'0\n'*204800))
self.check([close] + [write] * 10, self.ioclass())
self.check([truncate] + [getvalue] * 10, self.ioclass(b'0\n'*204800))
self.check([truncate] + [getbuffer] * 10, self.ioclass(b'0\n'*204800))
self.check([truncate] + [iter] * 10, self.ioclass(b'0\n'*20480))
self.check([truncate] + [getstate] * 10, self.ioclass(b'0\n'*204800))
state = self.ioclass(b'123').__getstate__()
self.check([truncate] + [setstate] * 10, self.ioclass(b'0\n'*204800), state)
self.check([truncate] + [sizeof] * 10, self.ioclass(b'0\n'*204800))
# no tests for seek or tell because they don't break anything
class CBytesIOTest(ThreadSafetyMixin, TestCase):
ioclass = io.BytesIO
class PyBytesIOTest(ThreadSafetyMixin, TestCase):
ioclass = pyio.BytesIO
class IncrementalNewlineDecoderTest(TestCase):
def make_decoder(self):
utf8_decoder = codecs.getincrementaldecoder('utf-8')()
return io.IncrementalNewlineDecoder(utf8_decoder, translate=True)
def test_concurrent_reset(self):
decoder = self.make_decoder()
def worker():
for _ in range(100):
decoder.reset()
run_concurrently(worker_func=worker, nthreads=2)
def test_concurrent_decode(self):
decoder = self.make_decoder()
def worker():
for _ in range(100):
decoder.decode(b"line\r\n", final=False)
run_concurrently(worker_func=worker, nthreads=2)
def test_concurrent_getstate_setstate(self):
decoder = self.make_decoder()
state = decoder.getstate()
def getstate_worker():
for _ in range(100):
decoder.getstate()
def setstate_worker():
for _ in range(100):
decoder.setstate(state)
run_concurrently([getstate_worker] * 2 + [setstate_worker] * 2)
def test_concurrent_decode_and_reset(self):
decoder = self.make_decoder()
def decode_worker():
for _ in range(100):
decoder.decode(b"line\r\n", final=False)
def reset_worker():
for _ in range(100):
decoder.reset()
run_concurrently([decode_worker] * 2 + [reset_worker] * 2)