| #!/usr/bin/env python | 
 | # Copyright 2019 The Chromium Authors | 
 | # Use of this source code is governed by a BSD-style license that can be | 
 | # found in the LICENSE file. | 
 | """Unit tests for xvfb.py functionality. | 
 |  | 
 | Each unit test is launching xvfb_test_script.py | 
 | through xvfb.py as a subprocess, then tests its expected output. | 
 | """ | 
 |  | 
 | import os | 
 | import signal | 
 | import subprocess | 
 | import sys | 
 | import time | 
 | import unittest | 
 |  | 
 | # pylint: disable=super-with-arguments | 
 |  | 
 | TEST_FILE = __file__.replace('.pyc', '.py') | 
 | XVFB = TEST_FILE.replace('_unittest', '') | 
 | XVFB_TEST_SCRIPT = TEST_FILE.replace('_unittest', '_test_script') | 
 |  | 
 |  | 
 | def launch_process(args): | 
 |   """Launches a sub process to run through xvfb.py.""" | 
 |   return subprocess.Popen([XVFB, XVFB_TEST_SCRIPT] + args, | 
 |                           stdout=subprocess.PIPE, | 
 |                           stderr=subprocess.STDOUT, | 
 |                           env=os.environ.copy()) | 
 |  | 
 |  | 
 | # pylint: disable=inconsistent-return-statements | 
 | def read_subprocess_message(proc, starts_with): | 
 |   """Finds the value after first line prefix condition.""" | 
 |   for line in proc.stdout.read().decode('utf-8').splitlines(True): | 
 |     if str(line).startswith(starts_with): | 
 |       return line.rstrip().replace(starts_with, '') | 
 |  | 
 |  | 
 | # pylint: enable=inconsistent-return-statements | 
 |  | 
 |  | 
 | def send_signal(proc, sig, sleep_time=0.3): | 
 |   """Sends a signal to subprocess.""" | 
 |   time.sleep(sleep_time)  # gives process time to launch. | 
 |   os.kill(proc.pid, sig) | 
 |   proc.wait() | 
 |  | 
 |  | 
 | class XvfbLinuxTest(unittest.TestCase): | 
 |  | 
 |   def setUp(self): | 
 |     super(XvfbLinuxTest, self).setUp() | 
 |     if not sys.platform.startswith('linux'): | 
 |       self.skipTest('linux only test') | 
 |     self._procs = [] | 
 |  | 
 |   def test_no_xvfb_display(self): | 
 |     self._procs.append(launch_process(['--no-xvfb'])) | 
 |     self._procs[0].wait() | 
 |     display = read_subprocess_message(self._procs[0], 'Display :') | 
 |     self.assertEqual(display, os.environ.get('DISPLAY', 'None')) | 
 |  | 
 |   def test_xvfb_display(self): | 
 |     self._procs.append(launch_process([])) | 
 |     self._procs[0].wait() | 
 |     display = read_subprocess_message(self._procs[0], 'Display :') | 
 |     self.assertIsNotNone(display)  # Openbox likely failed to open DISPLAY | 
 |     self.assertNotEqual(display, os.environ.get('DISPLAY', 'None')) | 
 |  | 
 |   def test_no_xvfb_flag(self): | 
 |     self._procs.append(launch_process(['--no-xvfb'])) | 
 |     self._procs[0].wait() | 
 |  | 
 |   def test_xvfb_flag(self): | 
 |     self._procs.append(launch_process([])) | 
 |     self._procs[0].wait() | 
 |  | 
 |   @unittest.skip('flaky; crbug.com/1320399') | 
 |   def test_xvfb_race_condition(self): | 
 |     self._procs = [launch_process([]) for _ in range(15)] | 
 |     for proc in self._procs: | 
 |       proc.wait() | 
 |     display_list = [ | 
 |         read_subprocess_message(p, 'Display :') for p in self._procs | 
 |     ] | 
 |     for display in display_list: | 
 |       self.assertIsNotNone(display)  # Openbox likely failed to open DISPLAY | 
 |       self.assertNotEqual(display, os.environ.get('DISPLAY', 'None')) | 
 |  | 
 |   def tearDown(self): | 
 |     super(XvfbLinuxTest, self).tearDown() | 
 |     for proc in self._procs: | 
 |       if proc.stdout: | 
 |         proc.stdout.close() | 
 |  | 
 |  | 
 | class XvfbTest(unittest.TestCase): | 
 |  | 
 |   def setUp(self): | 
 |     super(XvfbTest, self).setUp() | 
 |     if sys.platform == 'win32': | 
 |       self.skipTest('non-win32 test') | 
 |     self._proc = None | 
 |  | 
 |   def test_send_sigint(self): | 
 |     self._proc = launch_process(['--sleep']) | 
 |     # Give time for subprocess to install signal handlers | 
 |     time.sleep(.3) | 
 |     send_signal(self._proc, signal.SIGINT, 1) | 
 |     sig = read_subprocess_message(self._proc, 'Signal :') | 
 |     self.assertIsNotNone(sig)  # OpenBox likely failed to start | 
 |     self.assertEqual(int(sig), int(signal.SIGINT)) | 
 |  | 
 |   def test_send_sigterm(self): | 
 |     self._proc = launch_process(['--sleep']) | 
 |     # Give time for subprocess to install signal handlers | 
 |     time.sleep(.3) | 
 |     send_signal(self._proc, signal.SIGTERM, 1) | 
 |     sig = read_subprocess_message(self._proc, 'Signal :') | 
 |     self.assertIsNotNone(sig)  # OpenBox likely failed to start | 
 |     self.assertEqual(int(sig), int(signal.SIGTERM)) | 
 |  | 
 |   def tearDown(self): | 
 |     super(XvfbTest, self).tearDown() | 
 |     if self._proc.stdout: | 
 |       self._proc.stdout.close() | 
 |  | 
 |  | 
 | if __name__ == '__main__': | 
 |   unittest.main() |