| """JSON Lines (JSONL) collector for the sampling profiler. |
| |
| Emits a normalized newline-delimited JSON record stream suitable for |
| programmatic consumption by external tools, scripts, and agents. Each line |
| is one JSON object; consumers can parse the file incrementally line by |
| line, but the producer writes the whole file at the end of the run (it is |
| not a live/streaming producer). |
| |
| Record schema |
| ============= |
| |
| Every record is a JSON object with at least ``"type"``, ``"v"`` (record |
| schema version), and ``"run_id"`` (UUID4 hex tagging the run; allows |
| demultiplexing concatenated streams). Records appear in this fixed order: |
| |
| 1. ``meta`` (exactly one, first line):: |
| |
| {"type":"meta","v":0,"run_id":"<hex>", |
| "sample_interval_usec":<int>,"mode":"wall|cpu|gil|all|exception"} |
| |
| ``mode`` is omitted when not provided. |
| |
| 2. ``string_table`` (zero or more):: |
| |
| {"type":"string_table","v":0,"run_id":"<hex>", |
| "strings":[{"str_id":<int>,"value":"<str>"}, ...]} |
| |
| Strings (filenames, function names) are interned to keep repeated values |
| compact. IDs are zero-based. Each chunk holds up to ``_CHUNK_SIZE`` |
| entries, and each entry carries its explicit ``str_id`` so consumers do |
| not need to infer offsets across chunks. |
| |
| 3. ``frame_table`` (zero or more):: |
| |
| {"type":"frame_table","v":0,"run_id":"<hex>", |
| "frames":[{"frame_id":<int>,"path_str_id":<int>,"func_str_id":<int>, |
| "line":<int>,"end_line":<int>,"col":<int>, |
| "end_col":<int>}, ...]} |
| |
| ``end_line``/``col``/``end_col`` are *omitted* when source location data |
| is unavailable (a missing key means "not available", not zero or null). |
| ``line`` is ``0`` for synthetic frames (for example, internal marker |
| frames whose source location is None). Frame IDs are zero-based. |
| |
| 4. ``agg`` (zero or more):: |
| |
| {"type":"agg","v":0,"run_id":"<hex>","kind":"frame","scope":"final", |
| "samples_total":<int>, |
| "entries":[{"frame_id":<int>,"self":<int>,"cumulative":<int>}, ...]} |
| |
| ``self`` counts samples where the frame was the leaf (currently |
| executing); ``cumulative`` counts samples where the frame appeared |
| anywhere in the stack (deduped per sample so recursion does not |
| double-count). ``samples_total`` is the run-wide total, repeated on |
| each chunk so a streaming consumer always knows the denominator. |
| |
| 5. ``end`` (exactly one, last line):: |
| |
| {"type":"end","v":0,"run_id":"<hex>","samples_total":<int>} |
| |
| Presence of ``end`` is the consumer's signal that the file is complete. |
| |
| Forward compatibility |
| ===================== |
| |
| Consumers MUST ignore unknown record ``"type"`` values and unknown object |
| fields. New fields will be introduced as optional keys; an incompatible |
| schema change will bump the per-record ``"v"``. |
| """ |
| |
| from collections import Counter |
| import json |
| import uuid |
| from itertools import batched |
| |
| from .constants import PROFILING_MODE_NAMES |
| from .collector import normalize_location |
| from .stack_collector import StackTraceCollector |
| |
| |
| _CHUNK_SIZE = 256 |
| _SCHEMA_VERSION = 0 |
| |
| |
class JsonlCollector(StackTraceCollector):
    """Stack-trace collector that serializes a finished run as JSONL.

    Samples are aggregated in memory (an interned string table, a frame
    table, and per-frame self/cumulative counters); nothing is written
    until ``export()`` is called. See the module docstring for the full
    record schema.
    """

    def __init__(self, sample_interval_usec, *, skip_idle=False, mode=None):
        super().__init__(sample_interval_usec, skip_idle=skip_idle)
        self._mode = mode
        self.run_id = uuid.uuid4().hex

        # String interning: value -> id, plus the exportable entries in
        # id order.
        self._strings = []
        self._string_to_id = {}

        # Frame interning: identity key -> id, plus the exportable
        # records in id order.
        self._frames = []
        self._frame_to_id = {}

        # Aggregated counters, keyed by frame id.
        self._samples_total = 0
        self._frame_self = Counter()
        self._frame_cumulative = Counter()

        # Scratch set reused across samples to dedupe cumulative counts.
        self._seen_frame_ids = set()

    def process_frames(self, frames, _thread_id, weight=1):
        """Fold one sampled stack into the aggregated counters.

        ``frames`` yields ``(filename, location, funcname, opcode)``
        tuples; the first one is treated as the leaf. ``weight`` is added
        to the run total, to the leaf's self count, and once per distinct
        frame to its cumulative count.
        """
        self._samples_total += weight

        seen = self._seen_frame_ids
        seen.clear()

        for depth, (filename, location, funcname, _opcode) in enumerate(
            frames
        ):
            frame_id = self._get_or_create_frame_id(
                filename, location, funcname
            )

            if depth == 0:
                self._frame_self[frame_id] += weight

            # A frame that recurs within the same stack is only counted
            # once towards its cumulative total.
            if frame_id in seen:
                continue
            seen.add(frame_id)
            self._frame_cumulative[frame_id] += weight

    def export(self, filename):
        """Write the whole record stream to ``filename`` (UTF-8 text).

        Records are emitted in the order meta, string_table chunks,
        frame_table chunks, agg chunks, end.
        """
        with open(filename, "w", encoding="utf-8") as stream:
            self._write_message(stream, self._build_meta_record())

            tables = (
                ("string_table", "strings", self._strings),
                ("frame_table", "frames", self._frames),
            )
            for record_type, field_name, rows in tables:
                self._write_chunked_records(
                    stream,
                    self._record_header(record_type),
                    field_name,
                    rows,
                )

            agg_base = self._record_header("agg")
            agg_base.update(
                kind="frame",
                scope="final",
                samples_total=self._samples_total,
            )
            self._write_chunked_records(
                stream,
                agg_base,
                "entries",
                self._iter_final_agg_entries(),
            )

            self._write_message(stream, self._build_end_record())

    def _record_header(self, record_type):
        """Return a new dict with the fields shared by every record."""
        return {
            "type": record_type,
            "v": _SCHEMA_VERSION,
            "run_id": self.run_id,
        }

    def _build_meta_record(self):
        """Build the ``meta`` record; ``mode`` is left out when unset."""
        meta = self._record_header("meta")
        meta["sample_interval_usec"] = self.sample_interval_usec

        mode = self._mode
        if mode is None:
            return meta

        # Modes without a registered name fall back to their str() form.
        meta["mode"] = PROFILING_MODE_NAMES.get(mode, str(mode))
        return meta

    def _build_end_record(self):
        """Build the closing ``end`` record carrying the sample total."""
        return {
            **self._record_header("end"),
            "samples_total": self._samples_total,
        }

    def _iter_final_agg_entries(self):
        """Yield one agg entry per known frame, in frame-table order."""
        self_counts = self._frame_self
        cumulative_counts = self._frame_cumulative

        for frame in self._frames:
            fid = frame["frame_id"]
            yield {
                "frame_id": fid,
                "self": self_counts[fid],
                "cumulative": cumulative_counts[fid],
            }

    def _get_or_create_frame_id(self, filename, location, funcname):
        """Return the id for this frame, registering it on first sight.

        Frames are identified by interned path, interned function name
        and the exported location fields (absent optional fields compare
        as ``None``).
        """
        location_fields = self._location_to_export_fields(location)
        # The function name is interned before the path; string ids are
        # assigned in first-seen order, so this order is part of the
        # output.
        func_str_id = self._intern_string(funcname)
        path_str_id = self._intern_string(filename)

        key = (
            path_str_id,
            func_str_id,
            location_fields["line"],
            location_fields.get("end_line"),
            location_fields.get("col"),
            location_fields.get("end_col"),
        )

        try:
            return self._frame_to_id[key]
        except KeyError:
            pass

        new_id = len(self._frames)
        record = {
            "frame_id": new_id,
            "path_str_id": path_str_id,
            "func_str_id": func_str_id,
        }
        record.update(location_fields)

        self._frames.append(record)
        self._frame_to_id[key] = new_id
        return new_id

    def _intern_string(self, value):
        """Return the string-table id for ``str(value)``, adding it if new."""
        text = str(value)

        known_id = self._string_to_id.get(text)
        if known_id is None:
            known_id = len(self._strings)
            self._string_to_id[text] = known_id
            self._strings.append({"str_id": known_id, "value": text})
        return known_id

    @staticmethod
    def _location_to_export_fields(location):
        """Convert a location into the frame-record location fields.

        ``line`` is always present. ``end_line`` is included only when
        positive; ``col`` and ``end_col`` only when non-negative.
        """
        line, end_line, col, end_col = normalize_location(location)

        exported = {"line": line}
        if end_line > 0:
            exported["end_line"] = end_line
        if col >= 0:
            exported["col"] = col
        if end_col >= 0:
            exported["end_col"] = end_col
        return exported

    def _write_chunked_records(
        self, output, base_record, chunk_field, entries
    ):
        """Emit ``entries`` in batches of at most ``_CHUNK_SIZE``.

        Each batch becomes one record: a copy of ``base_record`` with the
        batch stored under ``chunk_field``. Nothing is written when
        ``entries`` is empty.
        """
        for batch in batched(entries, _CHUNK_SIZE):
            record = dict(base_record)
            record[chunk_field] = batch
            self._write_message(output, record)

    @staticmethod
    def _write_message(output, record):
        """Write ``record`` as one compact JSON line to ``output``."""
        encoded = json.dumps(record, separators=(",", ":"))
        output.write(encoded)
        output.write("\n")