blob: b461540962a10fa04762d1139ea5eb73b1ceaa15 [file]
#!/usr/bin/env python3
# Copyright 2024 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from contextlib import ExitStack
from textwrap import dedent
import os
import platform
import subprocess
import tempfile
import typing as T
import shlex
import sys
import unittest
_SCRIPT_DIR = os.path.realpath(os.path.dirname(__file__))
_JOBSERVER_POOL_SCRIPT = os.path.join(_SCRIPT_DIR, "jobserver_pool.py")
_JOBSERVER_TEST_HELPER_SCRIPT = os.path.join(_SCRIPT_DIR, "jobserver_test_helper.py")
_PLATFORM_IS_WINDOWS = platform.system() == "Windows"
# Set this to True to debug command invocations.
_DEBUG = False
default_env = dict(os.environ)
default_env.pop("NINJA_STATUS", None)
default_env.pop("MAKEFLAGS", None)
default_env["TERM"] = "dumb"
NINJA_PATH = os.path.abspath("./ninja")
class BuildDir:
def __init__(self, build_ninja: str):
self.build_ninja = dedent(build_ninja)
self.d: T.Optional[tempfile.TemporaryDirectory] = None
def __enter__(self):
self.d = tempfile.TemporaryDirectory()
with open(os.path.join(self.d.name, "build.ninja"), "w") as f:
f.write(self.build_ninja)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.d.cleanup()
@property
def path(self) -> str:
assert self.d
return self.d.name
def run(
self,
cmd_flags: T.Sequence[str] = [],
env: T.Dict[str, str] = default_env,
) -> None:
"""Run a command, raise exception on error. Do not capture outputs."""
ret = subprocess.run(cmd_flags, env=env)
ret.check_returncode()
def ninja_run(
self,
ninja_args: T.List[str],
prefix_args: T.List[str] = [],
extra_env: T.Dict[str, str] = {},
) -> "subprocess.CompletedProcess[str]":
ret = self.ninja_spawn(
ninja_args,
prefix_args=prefix_args,
extra_env=extra_env,
capture_output=False,
)
ret.check_returncode()
return ret
def ninja_clean(self) -> None:
self.ninja_run(["-t", "clean"])
def ninja_spawn(
self,
ninja_args: T.List[str],
prefix_args: T.List[str] = [],
extra_env: T.Dict[str, str] = {},
capture_output: bool = True,
) -> "subprocess.CompletedProcess[str]":
"""Run Ninja command and capture outputs."""
cmd_args = prefix_args + [NINJA_PATH, "-C", self.path] + ninja_args
if _DEBUG:
cmd_str = " ".join(shlex.quote(c) for c in cmd_args)
print(f"CMD [{cmd_str}]", file=sys.stderr)
return subprocess.run(
cmd_args,
text=True,
stdout=subprocess.PIPE if capture_output else None,
stderr=subprocess.PIPE if capture_output else None,
env={**default_env, **extra_env},
)
def ninja_popen(
self,
ninja_args: T.List[str],
prefix_args: T.List[str] = [],
extra_env: T.Dict[str, str] = {},
capture_output: bool = True,
) -> subprocess.Popen[str]:
"""Start Ninja command and return the Popen object (i.e. without blocking)."""
cmd_args = prefix_args + [NINJA_PATH, "-C", self.path] + ninja_args
if _DEBUG:
cmd_str = " ".join(shlex.quote(c) for c in cmd_args)
print(f"CMD [{cmd_str}]", file=sys.stderr)
return subprocess.Popen(
cmd_args,
text=True,
stdout=subprocess.PIPE if capture_output else None,
stderr=subprocess.PIPE if capture_output else None,
env={**default_env, **extra_env},
)
def span_output_file(span_n: int) -> str:
return "out%02d" % span_n
def generate_build_plan(command_count: int) -> str:
"""Generate a Ninja build plan for |command_count| parallel tasks.
Each task calls the test helper script which waits for 50ms
then writes its own start and end time to its output file.
"""
result = f"""
rule span
command = {sys.executable} -S {_JOBSERVER_TEST_HELPER_SCRIPT} --duration-ms=50 $out
"""
for n in range(command_count):
result += "build %s: span\n" % span_output_file(n)
result += "build all: phony %s\n" % " ".join(
[span_output_file(n) for n in range(command_count)]
)
return result
def generate_double_build_plan(
wait_1_ms: int = 500,
wait_2_ms: int = 500,
offset: int = 0,
) -> str:
"""Generate a Ninja build plan that invokes the test helper script in two parallel commands, each one with its own duration argument
- `wait_1_ms` and `wait_2_ms` control task durations.
- `offset`: start index of the output files generated by this plan, allows multiple plans without conflicting filenames.
"""
result = f"""
rule short
command = {sys.executable} -S {_JOBSERVER_TEST_HELPER_SCRIPT} --duration-ms={wait_1_ms} $out
rule long
command = {sys.executable} -S {_JOBSERVER_TEST_HELPER_SCRIPT} --duration-ms={wait_2_ms} $out
"""
result += "build %s: long\n" % span_output_file(offset)
result += "build %s: short\n" % span_output_file(offset + 1)
result += "build all: phony %s\n" % " ".join(
[span_output_file(n + offset) for n in range(2)]
)
return result
def get_spans(build_dir: str, command_count: int) -> T.List[T.Tuple[int, int]]:
"""
Get span outputs for a single build directory.
- `build_dir`: Directory containing span output files.
- `command_count`: Number of commands executed in the build.
Returns a list of (start, end) tuples parsed from the span output files, sorted by start time.
"""
return get_spans_for_multiple_builds([build_dir], [command_count])
def get_spans_for_multiple_builds(build_dirs: T.List[str], command_counts: T.List[int]) -> T.List[T.Tuple[int, int]]:
"""
Get span outputs across multiple build directories. Assumes that the output files
in each build directory are named sequentially according to the `span_output_file` function,
and that the total number of output files across all build directories is equal to the sum of `command_counts`.
- `build_dirs`: Build directories containing span output files.
- `command_counts`: Number of commands executed in each build.
Returns a list of (start, end) span tuples in increasing order by start time.
"""
spans: T.List[T.Tuple[int, int]] = []
offset:int = 0
for b_path, command_count in zip(build_dirs, command_counts):
for n in range(command_count):
out_file = os.path.join(b_path, span_output_file(n + offset))
with open(out_file, "rb") as f:
content = f.read().decode("utf-8")
lines = content.splitlines()
assert len(lines) == 2, f"Unexpected output file content: [{content}]"
spans.append((int(lines[0]), int(lines[1])))
# Update the offset for the next build directory to ensure unique output file names.
offset += command_count
return sorted(spans, key=lambda x: x[0])
def compute_max_overlapped_spans(build_dir: str, command_count: int) -> int:
"""Compute the maximum number of overlapped spanned tasks.
This reads the output files from |build_dir| and look at their start and end times
to compute the maximum number of tasks that were run in parallel.
"""
# Read the output files.
if command_count < 2:
return 0
spans: T.List[T.Tuple[int, int]] = get_spans(build_dir, command_count)
# Stupid but simple, for each span, count the number of other spans that overlap it.
max_overlaps = 1
for n in range(command_count):
cur_start, cur_end = spans[n]
cur_overlaps = 1
for m in range(command_count):
other_start, other_end = spans[m]
if n != m and other_end > cur_start and other_start < cur_end:
cur_overlaps += 1
if cur_overlaps > max_overlaps:
max_overlaps = cur_overlaps
return max_overlaps
class JobserverTest(unittest.TestCase):
def test_no_jobserver_client(self):
task_count = 4
build_plan = generate_build_plan(task_count)
with BuildDir(build_plan) as b:
output = b.run([NINJA_PATH, "-C", b.path, f"-j{task_count}", "all"])
max_overlaps = compute_max_overlapped_spans(b.path, task_count)
self.assertEqual(max_overlaps, task_count)
b.ninja_clean()
output = b.run([NINJA_PATH, "-C", b.path, "-j1", "all"])
max_overlaps = compute_max_overlapped_spans(b.path, task_count)
self.assertEqual(max_overlaps, 1)
def _run_client_test(self, jobserver_args: T.List[str]) -> None:
task_count = 4
build_plan = generate_build_plan(task_count)
with BuildDir(build_plan) as b:
# First, run the full tasks with with {task_count} tokens, this should allow all
# tasks to run in parallel.
ret = b.ninja_run(
ninja_args=["all"],
prefix_args=jobserver_args + [f"--jobs={task_count}"],
)
max_overlaps = compute_max_overlapped_spans(b.path, task_count)
self.assertEqual(max_overlaps, task_count)
# Second, use 2 tokens only, and verify that this was enforced by Ninja.
b.ninja_clean()
b.ninja_run(
["all"],
prefix_args=jobserver_args + ["--jobs=2"],
)
max_overlaps = compute_max_overlapped_spans(b.path, task_count)
self.assertEqual(max_overlaps, 2)
# Third, verify that --jobs=1 serializes all tasks.
b.ninja_clean()
b.ninja_run(
["all"],
prefix_args=jobserver_args + ["--jobs=1"],
)
max_overlaps = compute_max_overlapped_spans(b.path, task_count)
self.assertEqual(max_overlaps, 1)
# Finally, verify that -j1 overrides the pool.
b.ninja_clean()
b.ninja_run(
["-j1", "all"],
prefix_args=jobserver_args + [f"--jobs={task_count}"],
)
max_overlaps = compute_max_overlapped_spans(b.path, task_count)
self.assertEqual(max_overlaps, 1)
# On Linux, use taskset to limit the number of available cores to 1
# and verify that the jobserver overrides the default Ninja parallelism
# and that {task_count} tasks are still spawned in parallel.
if platform.system() == "Linux":
# First, run without a jobserver, with a single CPU, Ninja will
# use a parallelism of 2 in this case (GuessParallelism() in ninja.cc)
b.ninja_clean()
b.ninja_run(
["all"],
prefix_args=["taskset", "-c", "0"],
)
max_overlaps = compute_max_overlapped_spans(b.path, task_count)
self.assertEqual(max_overlaps, 2)
# Now with a jobserver with {task_count} tasks.
b.ninja_clean()
b.ninja_run(
["all"],
prefix_args=jobserver_args
+ [f"--jobs={task_count}"]
+ ["taskset", "-c", "0"],
)
max_overlaps = compute_max_overlapped_spans(b.path, task_count)
self.assertEqual(max_overlaps, task_count)
@unittest.skipIf(_PLATFORM_IS_WINDOWS, "These test methods do not work on Windows")
def test_jobserver_client_with_posix_fifo(self):
self._run_client_test([sys.executable, "-S", _JOBSERVER_POOL_SCRIPT])
@unittest.skipIf(_PLATFORM_IS_WINDOWS, "These test methods do not work on Windows")
def test_jobserver_client_with_posix_pipe(self):
# Verify that setting up a --pipe server does not make Ninja exit with an error.
# Instead, a warning is printed.
task_count = 4
build_plan = generate_build_plan(task_count)
with BuildDir(build_plan) as b:
prefix_args = [
sys.executable,
"-S",
_JOBSERVER_POOL_SCRIPT,
"--pipe",
f"--jobs={task_count}",
]
def run_ninja_with_jobserver_pipe(args):
ret = b.ninja_spawn(args, prefix_args=prefix_args)
ret.check_returncode()
return ret.stdout, ret.stderr
output, error = run_ninja_with_jobserver_pipe(["all"])
if _DEBUG:
print(f"OUTPUT [{output}]\nERROR [{error}]\n", file=sys.stderr)
self.assertTrue(error.find("Pipe-based protocol is not supported!") >= 0)
max_overlaps = compute_max_overlapped_spans(b.path, task_count)
self.assertEqual(max_overlaps, task_count)
# Using an explicit -j<N> ignores the jobserver pool.
b.ninja_clean()
output, error = run_ninja_with_jobserver_pipe(["-j1", "all"])
if _DEBUG:
print(f"OUTPUT [{output}]\nERROR [{error}]\n", file=sys.stderr)
self.assertFalse(error.find("Pipe-based protocol is not supported!") >= 0)
max_overlaps = compute_max_overlapped_spans(b.path, task_count)
self.assertEqual(max_overlaps, 1)
@unittest.skipIf(_PLATFORM_IS_WINDOWS, "These test methods do not work on Windows")
def test_jobserver_client_with_posix_fifo_token_efficiency(self):
# Due to implicit slots, the max number of parallel tasks is task_count + num_clients - 1
task_count = 2
client_tasks = 2
with ExitStack() as stack:
tmp_dir = tempfile.mkdtemp()
stack.callback(os.rmdir, tmp_dir)
fifo_path = os.path.join(tmp_dir, "jobserver_fifo_test")
# Create FIFO for Jobserver - see notes about implicit slots above
os.mkfifo(fifo_path) # type: ignore
stack.callback(lambda: os.path.exists(fifo_path) and os.remove(fifo_path))
read_fd = os.open(fifo_path, os.O_RDONLY | os.O_NONBLOCK)
stack.callback(os.close, read_fd)
write_fd = os.open(fifo_path, os.O_WRONLY | os.O_NONBLOCK)
stack.callback(os.close, write_fd)
os.write(write_fd, (task_count - 1) * b"x")
env = dict(os.environ)
env["MAKEFLAGS"] = f" -j{task_count} --jobserver-auth=fifo:" + fifo_path
# First build plan has 2 short tasks, second build plan has 2 long tasks
build_plan_1 = generate_double_build_plan(wait_1_ms=1000, wait_2_ms=1000, offset=0)
build_plan_2 = generate_double_build_plan(wait_1_ms=2000, wait_2_ms=2000, offset=client_tasks)
with BuildDir(build_plan_1) as b1, BuildDir(build_plan_2) as b2:
# Start all builds in parallel, with the same jobserver environment.
p1 = b1.ninja_popen(["all"], extra_env=env)
p2 = b2.ninja_popen(["all"], extra_env=env)
# Wait for all builds to finish and verify they all succeeded.
p1.communicate()
p2.communicate()
self.assertEqual(p1.returncode, 0, "Build 1 failed")
self.assertEqual(p2.returncode, 0, "Build 2 failed")
"""
As noted above, with implicit slots, the max number of parallel tasks
is task_count + num_clients - 1, which in this case is 3.
The expected behavior is that 3 tasks run in parallel at the beginning as follows:
|---A---| (Client 1, 2000ms)
|---B---| (Client 1, 2000ms)
|-------C-------| (Client 2, 4000ms)
Once first 2 of 3 tasks finishes (i.e. the short sleeps), the 4th task can start,
and this should start before the long task finishes.
|---A---| (Client 1, 2000ms)
|---B---| (Client 1, 2000ms)
|-------C-------| (Client 2, 4000ms)
|-------D-------| (Client 2, 4000ms)
Make sure D starts after A and B finishes, but before C finishes.
"""
spans = get_spans_for_multiple_builds(
[b1.path, b2.path], [client_tasks, client_tasks]
)
spans.sort(key=lambda s: s[0])
initial_tasks = spans[: task_count + client_tasks - 1]
last_task = spans[-1]
earliest_end = min(task[1] for task in initial_tasks)
latest_end = max(task[1] for task in initial_tasks)
# Assert last task starts after at least one initial task finishes,
# but before all initial tasks finish.
self.assertTrue(
last_task[0] >= earliest_end and last_task[0] < latest_end
)
def _test_MAKEFLAGS_value(
self, ninja_args: T.List[str] = [], prefix_args: T.List[str] = []
):
build_plan = r"""
rule print
command = echo MAKEFLAGS="[$$MAKEFLAGS]"
build all: print
"""
with BuildDir(build_plan) as b:
ret = b.ninja_spawn(
ninja_args + ["--quiet", "all"], prefix_args=prefix_args
)
self.assertEqual(ret.returncode, 0)
output = ret.stdout.strip()
pos = output.find("MAKEFLAGS=[")
self.assertNotEqual(pos, -1, "Could not find MAKEFLAGS in output!")
makeflags, sep, _ = output[pos + len("MAKEFLAGS=[") :].partition("]")
self.assertEqual(sep, "]", "Missing ] in output!: " + output)
self.assertTrue(
"--jobserver-auth=" in makeflags,
f"Missing --jobserver-auth from MAKEFLAGS [{makeflags}]\nSTDOUT [{ret.stdout}]\nSTDERR [{ret.stderr}]",
)
def test_client_passes_MAKEFLAGS(self):
self._test_MAKEFLAGS_value(
prefix_args=[sys.executable, "-S", _JOBSERVER_POOL_SCRIPT]
)
if __name__ == "__main__":
unittest.main()