blob: 4cc50a980bb5a293f491184653f9d8bb10ecfd3f [file] [edit]
#!/usr/bin/env python3
# Copyright 2026 the V8 project authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from __future__ import annotations
import argparse
import colorsys
import os
import psutil # type: ignore[import-untyped]
import queue
import re
import shlex
import shutil
import statistics
import subprocess
import sys
import threading
import time
from enum import Enum, IntEnum, auto
from pathlib import Path
from typing import IO, Any, Iterator, Optional, cast
COMBINED_TEST_NAME = "JetStream3 (Combined)"
class Engine(Enum):
V8 = auto()
JSC = auto()
SPIDERMONKEY = auto()
UNKNOWN = auto()
@classmethod
def detect(cls, engine_path: Path) -> Engine:
name = engine_path.name.lower()
if "d8" in name or "v8" in name:
return Engine.V8
elif "jsc" in name:
return Engine.JSC
elif "spidermonkey" in name or "js" == name:
return Engine.SPIDERMONKEY
return Engine.UNKNOWN
class DataPointAnnotation(IntEnum):
NONE = 0
MINOR_GC = 1
MAJOR_GC = 2
class DataPoint:
"""Represents a single memory measurement with associated GC and workload annotations."""
def __init__(
self,
rss: float = 0,
annotation: DataPointAnnotation = DataPointAnnotation.NONE,
workload_idx: int = 0,
is_new_workload: bool = False,
):
self.rss = rss
self.annotation = annotation
self.workload_idx = workload_idx
self.is_new_workload = is_new_workload
def merge(self, other: "DataPoint"):
self.rss = max(self.rss, other.rss)
self.annotation = max(self.annotation, other.annotation)
if other.is_new_workload:
if not self.is_new_workload:
self.workload_idx = other.workload_idx
self.is_new_workload = True
elif not self.is_new_workload:
self.workload_idx = max(self.workload_idx, other.workload_idx)
class Terminal:
"""
Provides utility methods for ANSI terminal color constants and cursor control.
"""
RED = "\033[31m"
YELLOW = "\033[33m"
RESET = "\033[0m"
@classmethod
def move_cursor_up(cls, lines: int) -> None:
sys.stdout.write(f"\033[{lines}F")
@classmethod
def erase_line(cls) -> None:
sys.stdout.write("\033[K")
class ResultsTableView:
"""
Handles the formatting and display of benchmark result tables and
aggregate summaries.
"""
PARTS = [
("Test Name", "<33"),
("Score", ">11"),
("Median ⌀", ">9"),
("Peak ▲", ">9"),
]
@classmethod
def get_parts(cls, sparkline: bool = False) -> list[tuple[str, str]]:
parts = list(cls.PARTS)
if sparkline:
parts.append(("History", "<12"))
return parts
@classmethod
def print_header(cls, sparkline: bool = False) -> None:
parts = cls.get_parts(sparkline)
header = " │ ".join(f"{name:{pad}}" for name, pad in parts)
sys.stdout.write(header + "\n")
separator = "─┼─".join("─" * int(pad[1:]) for _, pad in parts)
sys.stdout.write(separator + "\n")
@classmethod
def print_line(
cls,
name_col: str,
score_col: str,
median_col: str,
peak_col: str,
history_col: str = "",
sparkline: bool = False,
end_separator: bool = False,
) -> None:
parts = cls.get_parts(sparkline)
values = [name_col, score_col, median_col, peak_col]
if sparkline:
values.append(history_col)
line = " │ ".join(f"{str(val):{pad}}"
for val, (_, pad) in zip(values, parts))
if end_separator:
line += " │"
sys.stdout.write(f"{line}\n")
@classmethod
def print_summary(cls, workloads: list[Workload]) -> None:
if not workloads:
return
print("")
print("")
# Separate dummy summary workloads from regular ones
summary_workload = None
if workloads[-1].is_summary():
summary_workload = workloads.pop()
regular_workloads = workloads
assert not any(
w.is_summary()
for w in regular_workloads), ("Got more than one summary workload")
cls.print_header()
sorted_workloads = sorted(regular_workloads,
key=lambda w: w.max_rss,
reverse=True)
for workload in sorted_workloads:
workload.print()
all_max_rss_mb = [w.max_rss_mb for w in regular_workloads]
all_median_mb = [w.median_rss_mb for w in regular_workloads]
all_scores = [w.score for w in regular_workloads if w.score is not None]
if not all_max_rss_mb:
return
g_min_max = min(all_max_rss_mb)
g_med_max = statistics.median(all_max_rss_mb)
g_max_max = max(all_max_rss_mb)
g_sum_max = sum(all_max_rss_mb)
g_min_med = min(all_median_mb)
g_med_med = statistics.median(all_median_mb)
g_max_med = max(all_median_mb)
g_sum_med = sum(all_median_mb)
overall_score_str = ""
if summary_workload is not None and summary_workload.score is not None:
overall_score_str = f"{summary_workload.score:.2f} pts"
elif all_scores:
# JetStream uses geometric mean for aggregation
product = 1.0
for s in all_scores:
product *= s
overall_score = product**(1.0 / len(all_scores))
overall_score_str = f"{overall_score:.2f} pts"
print("")
print(f"{'OVERALL Minimum:':<40} {'':>12} {g_min_med:>12} {g_min_max:>12}")
print(
f"{'OVERALL Median:':<40} {'':>12} {g_med_med:>12.0f} {g_med_max:>12.0f}"
)
print(f"{'OVERALL Maximum:':<40} {'':>12} {g_max_med:>12} {g_max_max:>12}")
print(f"{'OVERALL Score:':<40} {overall_score_str:>12}")
print(f"{'OVERALL Sum:':<40} {'':>12} {g_sum_med:>12} {g_sum_max:>12}")
class Workload:
"""Tracks the memory history and score of a single benchmark workload."""
SUMMARY_NAME = "Summary"
@classmethod
def summary(cls, score: float) -> Workload:
return Workload(cls.SUMMARY_NAME, -1, score=score)
def __init__(self, name: str, index: int, score: Optional[float] = None):
self.name = name
self.index = index
self.timeline: list[DataPoint] = []
self.max_rss: float = 0
self.score: Optional[float] = score
if self.is_summary():
assert self.name == self.SUMMARY_NAME, "Invalid summary workload name"
def is_summary(self) -> bool:
return self.index == -1
def add_data_point(self, data_point: DataPoint):
self.timeline.append(data_point)
if data_point.rss > self.max_rss:
self.max_rss = data_point.rss
@property
def median_rss(self) -> float:
if not self.timeline:
return 0
return statistics.median([data_point.rss for data_point in self.timeline])
@property
def median_rss_mb(self) -> int:
return int(self.median_rss // 1024)
@property
def max_rss_mb(self) -> int:
return int(self.max_rss // 1024)
@property
def score_str(self) -> str:
if self.score is None:
return ""
return f"{self.score:.2f} pts"
def print(self, sparkline: bool = False) -> None:
name_col = f"{self.index:02d} {self.name}"
score_col = self.score_str
median_col = f"{self.median_rss_mb} MB ⌀"
peak_col = f"{self.max_rss_mb} MB ▲"
history_col = TimelineView.get_sparkline(
self.timeline) if sparkline else ""
Terminal.erase_line()
ResultsTableView.print_line(name_col, score_col, median_col, peak_col,
history_col, sparkline)
POLL_INTERVAL_SECONDS = 0.1
JS_FLAG_PREFIXES = (
"--tag",
"--test",
"--iteration",
"--worst",
"--dump-test-list",
"--force-gc",
"--no-prefetch",
"--worst-case-count",
"--iteration-count",
)
IGNORED_TEST_PATTERNS = [
"---",
"Profiling",
"block",
"Warning:",
"Try --help",
"Starting JetStream3",
]
class TimelineView:
"""Helper methods to render memory timelines and sparklines."""
HEIGHT = 4
BLOCK_HEIGHT = HEIGHT + 2
LABEL_PADDING = " "
SPARKLINE_CHARS = " ▂▃▄▅▆▇█"
@classmethod
def get_color(cls, v: float, max_v: float = 8 * 1024 * 1024) -> str:
"""Interpolate colors using a piecewise HSV-based spiral."""
t = max(0, min(1.0, v / max_v))
hue = (199 + (t * 5 * 360)) % 360
# Piecewise saturation and value
if t < 0.5:
sat = 0.36 + (t * 2 * 0.64)
val = 0.27 + (t * 2 * 0.73)
else:
sat = 1.0 + ((t - 0.5) * 2 * (-1.00))
val = 1.0
r, g, b = colorsys.hsv_to_rgb(hue / 360, sat, val)
return f"\033[38;2;{int(r * 255)};{int(g * 255)};{int(b * 255)}m"
@classmethod
def compress_timeline(cls, timeline: list[DataPoint],
width: int) -> list[DataPoint]:
"""Compresses a larger timeline into smaller one."""
if not timeline:
return []
if len(timeline) <= width:
return timeline
chunk_size = len(timeline) / width
compressed = []
for i in range(width):
start = int(i * chunk_size)
end = int((i + 1) * chunk_size)
if chunk := timeline[start:end]:
data_point = DataPoint(
chunk[0].rss,
chunk[0].annotation,
chunk[0].workload_idx,
chunk[0].is_new_workload,
)
for other in chunk[1:]:
data_point.merge(other)
compressed.append(data_point)
return compressed
@classmethod
def get_overlay_print_line(cls, timeline: list[DataPoint],
show_markers: bool) -> list[Optional[str]]:
"""
Build horizontal text overlay for workload markers and separators
used for the first graph line."""
overlay: list[Optional[str]] = [None] * len(timeline)
if not show_markers:
return overlay
for i, data_point in enumerate(timeline):
if not data_point.is_new_workload:
continue
idx = data_point.workload_idx
if idx >= 0:
s = f"┌{idx:02d}"
for j, c in enumerate(s):
if i + j < len(timeline):
overlay[i + j] = c
return overlay
@classmethod
def get_graph(
cls,
timeline: list[DataPoint],
width: int,
height: int,
show_labels: bool = False,
compress: bool = False,
show_markers: bool = False,
) -> list[str]:
if not timeline:
return [" " * width] * height
if compress:
timeline = cls.compress_timeline(timeline, width)
else:
timeline = timeline
overlay = cls.get_overlay_print_line(timeline, show_markers)
visible_timeline: list[DataPoint] = timeline[-width:]
visible_overlay: list[str | None] = overlay[-width:]
if len(visible_timeline) < width:
padding_val = visible_timeline[0].rss if visible_timeline else 0
pad_len = width - len(visible_timeline)
visible_timeline = [DataPoint(padding_val)] * pad_len + visible_timeline
padding_overlay: list[str | None] = [None]
visible_overlay = (padding_overlay * pad_len) + visible_overlay
else:
# Add sticky workload index at the start
first_workload_index = visible_timeline[0].workload_idx
if first_workload_index >= 0:
s = f"{first_workload_index:02d}"
visible_overlay[0:len(s)] = list(s)
rss_vals = [data_point.rss for data_point in visible_timeline]
min_val, max_val = min(rss_vals), max(rss_vals)
if max_val == min_val:
max_val = min_val + 1024
# For multi-line overview charts, snap min/max to 100MB boundaries.
if height > 1:
SNAP = 100 * 1024
min_val = (min_val // SNAP) * SNAP
max_val = ((max_val + SNAP - 1) // SNAP) * SNAP
if max_val == min_val:
max_val = min_val + SNAP
val_range = max(1, max_val - min_val)
print_lines = []
for h in range(height, 0, -1):
t_low = min_val + (h - 1) / height * val_range
t_high = min_val + h / height * val_range
line_range = max(1, t_high - t_low)
print_line = ""
if show_labels:
if h == height:
print_line = f"{cls.get_color(max_val)}{max_val // 1024:>4.0f} MB{Terminal.RESET}┐"
elif h == 1:
print_line = f"{cls.get_color(min_val)}{min_val // 1024:>4.0f} MB{Terminal.RESET}┘"
else:
print_line = f"{cls.LABEL_PADDING}│"
for i, data_point in enumerate(visible_timeline):
value = data_point.rss
if show_markers:
if h == height and (overlay_char := visible_overlay[i]):
print_line += f"{Terminal.RESET}{overlay_char}"
continue
if data_point.is_new_workload:
print_line += f"{Terminal.RESET}│{Terminal.RESET}"
continue
if value <= t_low and h > 1:
print_line += " "
continue
if value <= 0:
print_line += " "
continue
if value >= t_high:
char = cls.SPARKLINE_CHARS[-1]
else:
if height == 1:
idx_char = int(((value - min_val) / val_range) *
(len(cls.SPARKLINE_CHARS) - 1))
else:
idx_char = int(((value - t_low) / line_range) *
(len(cls.SPARKLINE_CHARS) - 1))
if h == 1 and idx_char <= 0 and value > 0:
idx_char = 1
char = cls.SPARKLINE_CHARS[idx_char]
if data_point.annotation == DataPointAnnotation.MAJOR_GC:
print_line += f"{Terminal.RED}{char}{Terminal.RESET}"
elif data_point.annotation == DataPointAnnotation.MINOR_GC:
print_line += f"{Terminal.YELLOW}{char}{Terminal.RESET}"
else:
print_line += f"{cls.get_color(value)}{char}{Terminal.RESET}"
print_lines.append(print_line)
if show_labels and show_markers:
print_lines.append(
f"{cls.LABEL_PADDING} {cls.get_annotation_print_line(visible_timeline)}"
)
return print_lines
@classmethod
def get_annotation_print_line(cls, data: list[DataPoint]) -> str:
"""Last line in a graph with annotation symbols."""
annotation_line = ""
for data_point in data:
if data_point.annotation == DataPointAnnotation.MAJOR_GC:
annotation_line += f"{Terminal.RED}♻{Terminal.RESET}"
elif data_point.annotation == DataPointAnnotation.MINOR_GC:
annotation_line += f"{Terminal.YELLOW}♻{Terminal.RESET}"
elif data_point.is_new_workload:
annotation_line += f"{Terminal.RESET}│{Terminal.RESET}"
else:
annotation_line += " "
return annotation_line
@classmethod
def get_sparkline(cls, history: list[DataPoint], width: int = 12) -> str:
"""Single line compressed memory timeline / sparkline."""
# Filter out NEW_WORKLOAD behavior and MINOR GC for sparklines.
clean_history = []
for data_point in history:
annotation = data_point.annotation
if annotation == DataPointAnnotation.MINOR_GC:
annotation = DataPointAnnotation.NONE
clean_history.append(
DataPoint(data_point.rss, annotation, data_point.workload_idx,
False))
return cls.get_graph(clean_history,
width=width,
height=1,
compress=True,
show_markers=False)[0]
@classmethod
def get_multi_line_graph(cls,
history: list[DataPoint],
height: int | None = None) -> list[str]:
"""Large multiline graph with workload markers and annotations."""
height = height or cls.HEIGHT
width = max(
10,
shutil.get_terminal_size().columns - len(cls.LABEL_PADDING) - 1)
return cls.get_graph(
history,
width=width,
height=height,
show_labels=True,
compress=False,
show_markers=True,
)
class MonitorMessageType(Enum):
GC = auto()
WORKLOAD_START = auto()
SCORE = auto()
OVERALL_SCORE = auto()
class MonitorMessage:
"""Container to pass data from the stdout/stderr pipe to the main thread."""
def __init__(self, type: MonitorMessageType, value: Any):
self.type = type
self.value = value
class LineParser:
"""Parses benchmark output and pose MonitorMessages to a message_queue."""
JETSTREAM_TEST_START_RE = re.compile(r"Running (.*):")
JETSTREAM_SCORE_RE = re.compile(
r"^(.*?)\s+(?:Score|Overall.*?-Score)\s+([\d.]+)\s+pts", re.M)
JETSTREAM_OVERALL_SCORE_RE = re.compile(
r"^(?:JetStream3|Overall)\s+Score\s*:\s*([\d.]+)\s+pts", re.I | re.M)
def __init__(self, engine_type: Engine, message_queue: queue.Queue):
self.message_queue = message_queue
if engine_type == Engine.JSC:
self.major_re = re.compile(r"FullCollection", re.I)
self.minor_re = re.compile(r"EdenCollection", re.I)
self.gc_mark = "[GC<"
elif engine_type == Engine.SPIDERMONKEY:
self.major_re = re.compile(r"GC\(T\+", re.I)
self.minor_re = re.compile(r"MinorGCs", re.I)
self.gc_mark = "GC"
else:
self.major_re = re.compile(r"Mark-compact|Mark-sweep|Full GC", re.I)
self.minor_re = re.compile(r"Scavenge|MinorMS|Minor GC", re.I)
self.gc_mark = "ms: "
def post_message(self, message_type: MonitorMessageType, value: Any) -> None:
self.message_queue.put(MonitorMessage(message_type, value))
def parse_line(self, line: str) -> None:
if self.gc_mark in line:
if self.major_re.search(line):
self.post_message(MonitorMessageType.GC, DataPointAnnotation.MAJOR_GC)
elif self.minor_re.search(line):
self.post_message(MonitorMessageType.GC, DataPointAnnotation.MINOR_GC)
if match := self.JETSTREAM_TEST_START_RE.search(line):
self.post_message(MonitorMessageType.WORKLOAD_START, match.group(1))
if match := self.JETSTREAM_OVERALL_SCORE_RE.search(line):
self.post_message(MonitorMessageType.OVERALL_SCORE,
float(match.group(1)))
elif match := self.JETSTREAM_SCORE_RE.search(line):
# (benchmark_name, score)
self.post_message(
MonitorMessageType.SCORE,
(match.group(1).strip(), float(match.group(2))),
)
class IOPipeWatcher(threading.Thread):
"""Background thread to read from a pipe and feed lines to the LineParser."""
def __init__(self, pipe: IO[bytes], parser: LineParser):
super().__init__(daemon=True)
self.pipe = pipe
self.parser = parser
def run(self):
pending = ""
try:
while chunk := self.read_chunk():
lines = (pending + chunk).split("\n")
pending = lines.pop()
for line in lines:
self.parser.parse_line(line)
except Exception:
pass
def read_chunk(self) -> str:
return os.read(self.pipe.fileno(), 4096).decode("utf-8", errors="ignore")
SPINNER_CHARS = ["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]
def spinner() -> Iterator[str]:
while True:
for char in SPINNER_CHARS:
yield char
class Runner:
"""
Executes the benchmark engine and aggregates memory measurements over time.
"""
POLL_INTERVAL_SECONDS: float = 0.1
@classmethod
def get_test_list(cls, engine_path: Path, engine_type: Engine,
extra_args: list[str]) -> list[str]:
separator = ["--"] if engine_type in (Engine.V8, Engine.JSC) else []
cmd = [str(engine_path)]
cmd += ["./cli.js"] + separator + ["--dump-test-list"] + extra_args
raw_lines = (subprocess.run(cmd,
capture_output=True,
text=True,
check=True).stdout.strip().splitlines())
tests = [
t.strip() for t in raw_lines
if t.strip() and not any(p in t for p in IGNORED_TEST_PATTERNS)
]
if not tests:
raise Exception(f"No tests found matching: {shlex.join(cmd)}")
return tests
@classmethod
def measure_test(
cls,
engine_path: Path,
engine_type: Engine,
test_name: str,
engine_flags: list[str],
cli_js_args: list[str],
combined: bool = False,
start_index: int = 1,
) -> list[Workload]:
env = os.environ.copy()
if engine_type == Engine.SPIDERMONKEY:
env["MOZ_GCTIMER"] = "stdout"
engine_flags_copy = list(engine_flags)
if engine_type == Engine.JSC and "--logGC=1" not in engine_flags_copy:
engine_flags_copy.append("--logGC=1")
if engine_type == Engine.V8 and "--force-gc" in cli_js_args:
if "--expose-gc" not in engine_flags_copy:
engine_flags_copy.append("--expose-gc")
separator = ["--"] if engine_type in (Engine.V8, Engine.JSC) else []
cmd = [str(engine_path)] + engine_flags_copy + ["./cli.js"] + separator
if not combined:
cmd.append(f"--test={test_name}")
cmd.extend(cli_js_args)
try:
return cls(cmd, env, engine_type, test_name, start_index).run()
except Exception as e:
print(f"\n ERROR: {e}")
return []
def __init__(
self,
cmd: list[str],
env: dict[str, str],
engine_type: Engine,
test_name: str,
start_index: int,
):
self.process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=False,
bufsize=0,
env=env,
)
self.message_queue: queue.Queue[MonitorMessage] = queue.Queue()
self.global_max_rss = 0.0
self.global_rss_history: list[DataPoint] = []
self.start_index = start_index
if test_name == COMBINED_TEST_NAME:
self.start_index = 0
self.current_workload = Workload(test_name, self.start_index)
self.workloads: list[Workload] = []
self.spinner = spinner()
self.parser = LineParser(engine_type, self.message_queue)
assert self.process.stdout is not None
assert self.process.stderr is not None
self.stdout_watcher = IOPipeWatcher(self.process.stdout, self.parser)
self.stderr_watcher = IOPipeWatcher(self.process.stderr, self.parser)
self.stdout_watcher.start()
self.stderr_watcher.start()
def run(self) -> list[Workload]:
# Allocate graph block (1 status + GRAPH_HEIGHT + 1 markers line)
sys.stdout.write("\n" * TimelineView.BLOCK_HEIGHT)
while self.process.poll() is None:
start_time = time.monotonic()
self._incremental_update()
elapsed = time.monotonic() - start_time
time.sleep(max(0.0, self.POLL_INTERVAL_SECONDS - elapsed))
self.process.communicate()
if self.process.returncode == 0:
self._process_messages()
self._finalize_current_test(is_final=True)
return self.workloads
else:
Terminal.move_cursor_up(TimelineView.BLOCK_HEIGHT)
Terminal.erase_line()
print(f"{Terminal.RED}{self.current_workload.name:<33} FAILED "
f"(code {self.process.returncode}){Terminal.RESET}")
for _ in range(TimelineView.BLOCK_HEIGHT - 1):
Terminal.erase_line()
sys.stdout.write("\n")
Terminal.move_cursor_up(TimelineView.BLOCK_HEIGHT - 1)
sys.stdout.flush()
return self.workloads
def _incremental_update(self):
current_ann, is_new_workload = self._process_messages()
rss = self._get_total_rss_kb(self.process.pid)
if rss <= 0:
return
data_point = DataPoint(rss, current_ann, self.current_workload.index,
is_new_workload)
self.global_max_rss = max(self.global_max_rss, rss)
self.global_rss_history.append(data_point)
self.current_workload.add_data_point(data_point)
self._incremental_update_view(rss)
def _incremental_update_view(self, rss: float) -> None:
spin_char = next(self.spinner)
rss_mb = int(rss) // 1024
name_col = f"{spin_char} {self.current_workload.name}"
score_col = f"{spin_char} pts"
median_col = f"{rss_mb:>4} MB ⌀"
peak_col = f"{int(self.global_max_rss) // 1024:>4} MB ▲"
Terminal.move_cursor_up(TimelineView.BLOCK_HEIGHT)
Terminal.erase_line()
ResultsTableView.print_line(name_col,
score_col,
median_col,
peak_col,
end_separator=True)
for line in TimelineView.get_multi_line_graph(self.global_rss_history):
Terminal.erase_line()
sys.stdout.write(line)
sys.stdout.write("\n")
sys.stdout.flush()
def _get_total_rss_kb(self, parent_pid: int) -> float:
total_rss = 0
try:
parent = psutil.Process(parent_pid)
total_rss += parent.memory_info().rss
for child in parent.children(recursive=True):
total_rss += child.memory_info().rss
except (psutil.NoSuchProcess, psutil.AccessDenied):
pass
return total_rss / 1024
def _process_messages(self) -> tuple[DataPointAnnotation, bool]:
"""Drain messages posted from the IOPipeWatcher threads."""
current_ann = DataPointAnnotation.NONE
is_new_workload = False
while not self.message_queue.empty():
message = self.message_queue.get()
ann, new_wl = self._process_message(message)
current_ann = max(current_ann, ann)
is_new_workload = is_new_workload or new_wl
return current_ann, is_new_workload
def _process_message(
self, message: MonitorMessage) -> tuple[DataPointAnnotation, bool]:
if message.type == MonitorMessageType.GC:
return self._process_gc_message(message)
elif message.type == MonitorMessageType.SCORE:
return self._process_score_message(message)
elif message.type == MonitorMessageType.WORKLOAD_START:
return self._process_workload_start_message(message)
elif message.type == MonitorMessageType.OVERALL_SCORE:
summary = Workload.summary(score=message.value)
self.workloads.append(summary)
return DataPointAnnotation.NONE, False
def _process_gc_message(
self, message: MonitorMessage) -> tuple[DataPointAnnotation, bool]:
return message.value, False
def _process_score_message(
self, message: MonitorMessage) -> tuple[DataPointAnnotation, bool]:
# Search for matching workload by name (or use current if it matches)
score_name, score_val = message.value
if score_name == self.current_workload.name:
self.current_workload.score = score_val
else:
for w in self.workloads:
if w.name == score_name:
w.score = score_val
break
return DataPointAnnotation.NONE, False
def _process_workload_start_message(
self, message: MonitorMessage) -> tuple[DataPointAnnotation, bool]:
new_test_name = message.value
is_new_workload = False
if self.current_workload.name == COMBINED_TEST_NAME and (
new_test_name != self.current_workload.name):
self.current_workload.index = self.start_index
self.current_workload.name = new_test_name
self.current_workload.timeline = []
self.current_workload.max_rss = 0
is_new_workload = True
elif new_test_name != self.current_workload.name:
self._finalize_current_test()
self.current_workload = Workload(new_test_name,
self.current_workload.index + 1)
is_new_workload = True
return DataPointAnnotation.NONE, is_new_workload
def _finalize_current_test(self, is_final: bool = False):
if not self.current_workload.timeline:
return
self.workloads.append(self.current_workload)
self._print_finished_workload(is_final)
def _print_finished_workload(self, is_final):
Terminal.move_cursor_up(TimelineView.BLOCK_HEIGHT)
self.current_workload.print(sparkline=True)
num_lines_to_clear = TimelineView.BLOCK_HEIGHT
# Clear the remaining lines of the graph block
for _ in range(num_lines_to_clear - 1):
Terminal.erase_line()
sys.stdout.write("\n")
# Move cursor back up to the line immediately after the summary
Terminal.move_cursor_up(num_lines_to_clear - 1)
sys.stdout.flush()
if not is_final:
# After print_finished_test, we are at the line immediately after
# the summary. Allocate lines for the NEXT test's graph.
sys.stdout.write("\n" * TimelineView.BLOCK_HEIGHT)
def main() -> None:
parser = argparse.ArgumentParser(
description="Measure memory of JetStream3 benchmarks.",
usage="%(prog)s <engine_path> [engine_flags...] [cli_js_args...]")
parser.add_argument("engine_path",
type=Path,
help="Path to the JS engine executable")
parser.add_argument("--split",
"--separate",
action="store_true",
default=False,
help="Run each benchmark in a separate engine process")
args, unknown_args = parser.parse_known_args()
engine_path = args.engine_path.expanduser()
if not engine_path.exists():
return print(f"Engine not found: {engine_path}")
engine_type = Engine.detect(engine_path)
cli_js_args, engine_flags = _prepare_engine_flags(engine_type, unknown_args)
run_combined = not args.split
if run_combined:
tests = [COMBINED_TEST_NAME]
else:
tests = Runner.get_test_list(engine_path, engine_type, cli_js_args)
print(f"Found {len(tests)} tests.")
print(f"Engine: {engine_path} {shlex.join(engine_flags)}")
print(f"CLI args: {shlex.join(cli_js_args)}\n")
ResultsTableView.print_header(sparkline=True)
results: list[Workload] = []
try:
for idx, test in enumerate(tests, start=1):
if res := Runner.measure_test(
engine_path,
engine_type,
test,
engine_flags,
cli_js_args,
combined=run_combined,
start_index=idx,
):
results.extend(res)
except KeyboardInterrupt:
print("\nInterrupted.")
ResultsTableView.print_summary(results)
def _prepare_engine_flags(
engine_type: Engine, extra_args: list[str]) -> tuple[list[str], list[str]]:
base_engine_flags: list[str] = []
if engine_type == Engine.V8:
base_engine_flags = ["--trace-gc"]
user_engine_flags: list[str] = []
cli_js_args: list[str] = []
for arg in extra_args:
if arg.startswith("--") and not any(
arg.startswith(p) for p in JS_FLAG_PREFIXES):
user_engine_flags.append(arg)
elif arg.startswith("-") and not arg.startswith("--"):
user_engine_flags.append(arg)
else:
cli_js_args.append(arg)
engine_flags = base_engine_flags + user_engine_flags
return cli_js_args, engine_flags
if __name__ == "__main__":
main()