| import array |
| import os |
| import struct |
| import sys |
| import threading |
| import unittest |
| from test import support |
| from test.support import os_helper, threading_helper |
| from test.support.import_helper import import_module |
| fcntl = import_module('fcntl') |
| termios = import_module('termios') |
| |
class IoctlTestsTty(unittest.TestCase):
    """Check how ``fcntl.ioctl()`` handles its buffer argument on /dev/tty.

    Every test issues ``TIOCGPGRP`` on the controlling terminal and checks
    that the value read back is either this process's group id or its
    session id.
    """

    @classmethod
    def setUpClass(cls):
        """Skip the whole class unless /dev/tty is attached to this process."""
        TIOCGPGRP = support.get_attribute(termios, 'TIOCGPGRP')
        try:
            tty = open("/dev/tty", "rb")
        except OSError:
            raise unittest.SkipTest("Unable to open /dev/tty")
        with tty:
            # Skip if another process is in foreground
            r = fcntl.ioctl(tty, TIOCGPGRP, struct.pack("i", 0))
        rpgrp = struct.unpack("i", r)[0]
        if rpgrp not in (os.getpgrp(), os.getsid(0)):
            raise unittest.SkipTest("Neither the process group nor the session "
                                    "are attached to /dev/tty")

    def test_ioctl_immutable_buf(self):
        """str, bytes and read-only buffers yield the result as new bytes."""
        # If this process has been put into the background, TIOCGPGRP returns
        # the session ID instead of the process group id.
        ids = (os.getpgrp(), os.getsid(0))
        with open("/dev/tty", "rb") as tty:
            # string
            buf = " "*8
            r = fcntl.ioctl(tty, termios.TIOCGPGRP, buf)
            self.assertIsInstance(r, bytes)
            rpgrp = memoryview(r).cast('i')[0]
            self.assertIn(rpgrp, ids)

            # bytes
            buf = b" "*8
            r = fcntl.ioctl(tty, termios.TIOCGPGRP, buf)
            self.assertIsInstance(r, bytes)
            rpgrp = memoryview(r).cast('i')[0]
            self.assertIn(rpgrp, ids)

            # read-only buffer
            r = fcntl.ioctl(tty, termios.TIOCGPGRP, memoryview(buf))
            self.assertIsInstance(r, bytes)
            rpgrp = memoryview(r).cast('i')[0]
            self.assertIn(rpgrp, ids)

    def test_ioctl_mutable_buf(self):
        """A bytearray is updated in place and ioctl() returns 0."""
        ids = (os.getpgrp(), os.getsid(0))
        with open("/dev/tty", "rb") as tty:
            buf = bytearray(b" "*8)
            r = fcntl.ioctl(tty, termios.TIOCGPGRP, buf)
            self.assertEqual(r, 0)
            rpgrp = memoryview(buf).cast('i')[0]
            self.assertIn(rpgrp, ids)

    def test_ioctl_no_mutate_buf(self):
        """With mutate_flag false a bytearray is left untouched."""
        ids = (os.getpgrp(), os.getsid(0))
        with open("/dev/tty", "rb") as tty:
            buf = bytearray(b" "*8)
            save_buf = bytes(buf)
            r = fcntl.ioctl(tty, termios.TIOCGPGRP, buf, False)
            self.assertEqual(bytes(buf), save_buf)
            self.assertIsInstance(r, bytes)
            rpgrp = memoryview(r).cast('i')[0]
            self.assertIn(rpgrp, ids)

    def _create_int_buf(self, nbytes=None):
        """Return an ``array('i')`` pre-filled with a sentinel value.

        With *nbytes* given, the array is exactly *nbytes* bytes long (the
        test fails if *nbytes* is not a multiple of the int size); otherwise
        it holds a single element.
        """
        buf = array.array('i')
        intsize = buf.itemsize
        # A fill value unlikely to be in `ids`
        fill = -12345
        if nbytes is not None:
            # Extend the buffer so that it is exactly `nbytes` bytes long
            buf.extend([fill] * (nbytes // intsize))
            self.assertEqual(len(buf) * intsize, nbytes)   # sanity check
        else:
            buf.append(fill)
        return buf

    def _check_ioctl_mutate_len(self, nbytes=None):
        """Check that a writable int array of *nbytes* bytes is filled in place."""
        ids = (os.getpgrp(), os.getsid(0))
        buf = self._create_int_buf(nbytes)
        with open("/dev/tty", "rb") as tty:
            r = fcntl.ioctl(tty, termios.TIOCGPGRP, buf)
        rpgrp = buf[0]
        self.assertEqual(r, 0)
        self.assertIn(rpgrp, ids)

    def _check_ioctl_not_mutate_len(self, nbytes=None):
        """Check the non-mutating paths with a buffer of *nbytes* bytes.

        First a writable array is passed with mutate_flag false, then an
        immutable bytes copy is passed with mutate_flag true.  In both cases
        the argument must stay unchanged and the result must come back as a
        bytes object of the same length.
        """
        ids = (os.getpgrp(), os.getsid(0))
        buf = self._create_int_buf(nbytes)
        save_buf = bytes(buf)
        with open("/dev/tty", "rb") as tty:
            r = fcntl.ioctl(tty, termios.TIOCGPGRP, buf, False)
        self.assertIsInstance(r, bytes)
        self.assertEqual(len(r), len(save_buf))
        self.assertEqual(bytes(buf), save_buf)
        rpgrp = memoryview(r).cast('i')[0]
        self.assertIn(rpgrp, ids)

        # bytes is immutable, so the mutate flag cannot apply to it.
        buf = bytes(buf)
        with open("/dev/tty", "rb") as tty:
            r = fcntl.ioctl(tty, termios.TIOCGPGRP, buf, True)
        self.assertIsInstance(r, bytes)
        self.assertEqual(len(r), len(save_buf))
        self.assertEqual(buf, save_buf)
        rpgrp = memoryview(r).cast('i')[0]
        self.assertIn(rpgrp, ids)

    def test_ioctl_mutate(self):
        """Both paths work with a single-int buffer."""
        self._check_ioctl_mutate_len()
        self._check_ioctl_not_mutate_len()

    def test_ioctl_mutate_1024(self):
        # Issue #9758: a mutable buffer of exactly 1024 bytes wouldn't be
        # copied back after the system call.
        self._check_ioctl_mutate_len(1024)
        self._check_ioctl_not_mutate_len(1024)

    def test_ioctl_mutate_2048(self):
        # Exercise both paths with a buffer larger than 1024 bytes; the
        # non-mutating check previously repeated the 1024-byte case.
        self._check_ioctl_mutate_len(2048)
        self._check_ioctl_not_mutate_len(2048)
| |
| |
@unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
class IoctlTestsPty(unittest.TestCase):
    """ioctl() tests that run against a freshly opened pseudo-terminal pair."""

    def setUp(self):
        # Open a new pty pair for every test and register both ends for
        # closing once the test is over.
        master, slave = os.openpty()
        self.master_fd = master
        self.slave_fd = slave
        self.addCleanup(os.close, slave)
        self.addCleanup(os.close, master)

    @unittest.skipUnless(hasattr(termios, 'TCFLSH'), 'requires termios.TCFLSH')
    def test_ioctl_clear_input_or_output(self):
        """TCFLSH discards pending data only for the queue that holds it."""
        writer_fd, reader_fd = self.slave_fd, self.master_fd
        # The data is buffered in the input buffer on Linux, and in
        # the output buffer on other platforms.
        if sys.platform in ('linux', 'android'):
            target_fd = reader_fd
            idle_queue, data_queue = termios.TCOFLUSH, termios.TCIFLUSH
        else:
            target_fd = writer_fd
            idle_queue, data_queue = termios.TCIFLUSH, termios.TCOFLUSH

        os.write(writer_fd, b'abcdef')
        self.assertEqual(os.read(reader_fd, 2), b'ab')
        # Flushing the queue that does not hold the data keeps it readable.
        fcntl.ioctl(target_fd, termios.TCFLSH, idle_queue)
        self.assertEqual(os.read(reader_fd, 2), b'cd')
        # Flushing the queue that holds the data drops the unread remainder.
        fcntl.ioctl(target_fd, termios.TCFLSH, data_queue)
        os.write(writer_fd, b'ABCDEF')
        self.assertEqual(os.read(reader_fd, 1024), b'ABCDEF')

    @support.skip_android_selinux('tcflow')
    @unittest.skipUnless(sys.platform in ('linux', 'android'), 'only works on Linux')
    @unittest.skipUnless(hasattr(termios, 'TCXONC'), 'requires termios.TCXONC')
    def test_ioctl_suspend_and_resume_output(self):
        """TCXONC with TCOOFF stalls a writer until TCOON is issued."""
        writer_fd, reader_fd = self.slave_fd, self.master_fd
        suspended = threading.Event()
        finished = threading.Event()

        def write_in_two_steps():
            os.write(writer_fd, b'abc')
            # Wait until the main thread has tried to suspend the output.
            self.assertTrue(suspended.wait(support.SHORT_TIMEOUT))
            os.write(writer_fd, b'def')
            finished.set()

        worker = threading.Thread(target=write_in_two_steps)
        with threading_helper.start_threads([worker]):
            self.assertEqual(os.read(reader_fd, 3), b'abc')
            try:
                try:
                    fcntl.ioctl(writer_fd, termios.TCXONC, termios.TCOOFF)
                finally:
                    # Always release the writer, even if the ioctl failed.
                    suspended.set()
                still_blocked = not finished.wait(0.5)
                self.assertTrue(still_blocked, 'output was not suspended')
            finally:
                # Resume the output no matter what, so the writer can finish.
                fcntl.ioctl(writer_fd, termios.TCXONC, termios.TCOON)
            completed = finished.wait(support.SHORT_TIMEOUT)
            self.assertTrue(completed, 'output was not resumed')
            self.assertEqual(os.read(reader_fd, 1024), b'def')

    def test_ioctl_set_window_size(self):
        """TIOCSWINSZ with a bytes argument returns the packed window size."""
        # Fields are (rows, columns, xpixel, ypixel)
        winsize_format = "HHHH"
        requested = struct.pack(winsize_format, 20, 40, 0, 0)
        returned = fcntl.ioctl(self.master_fd, termios.TIOCSWINSZ, requested)
        rows_and_columns = struct.unpack(winsize_format, returned)[:2]
        self.assertEqual(rows_and_columns, (20, 40))

    @unittest.skipUnless(hasattr(fcntl, 'FICLONE'), 'need fcntl.FICLONE')
    def test_bad_fd(self):
        # gh-134744: Test error handling
        bad_fd = os_helper.make_bad_fd()
        # An int argument, a short buffer and a buffer longer than 1024 bytes.
        for arg in (bad_fd, b'\0' * 10, b'\0' * 2048):
            with self.assertRaises(OSError):
                fcntl.ioctl(bad_fd, fcntl.FICLONE, arg)
| |
| |
# Run this module's tests when the file is executed directly as a script.
if __name__ == '__main__':
    unittest.main()