blob: 3a20f3463b03844d61aef2d7ca81242fad391477 [file] [log] [blame]
/******************************************************************************
* Python Remote Debugging Module - Binary Writer Implementation
*
* High-performance binary file writer for profiling data with optional zstd
* streaming compression.
******************************************************************************/
#ifndef Py_BUILD_CORE_MODULE
# define Py_BUILD_CORE_MODULE
#endif
#include "binary_io.h"
#include "_remote_debugging.h"
#include <string.h>
#ifdef HAVE_ZSTD
#include <zstd.h>
#endif
/* ============================================================================
* CONSTANTS FOR BINARY FORMAT SIZES
* ============================================================================ */
/* Sample header sizes */
#define SAMPLE_HEADER_FIXED_SIZE 13 /* thread_id(8) + interpreter_id(4) + encoding(1) */
#define SAMPLE_HEADER_MAX_SIZE 26 /* fixed + max_varint(10) + status(1) + margin */
#define MAX_VARINT_SIZE 10 /* Maximum bytes for a varint64 */
#define MAX_VARINT_SIZE_U32 5 /* Maximum bytes for a varint32 */
/* Frame buffer: depth varint (max 2 bytes for 256) + 256 frames * 5 bytes/varint + margin */
#define MAX_FRAME_BUFFER_SIZE ((MAX_STACK_DEPTH * MAX_VARINT_SIZE_U32) + MAX_VARINT_SIZE_U32 + 16)
/* File structure sizes */
#define FILE_FOOTER_SIZE 32
/* ============================================================================
* WRITER-SPECIFIC UTILITY HELPERS
* ============================================================================ */
/* Grow two parallel arrays together (e.g., strings and string_lengths).
* Returns 0 on success, -1 on error (sets PyErr).
* On error, original arrays are preserved (truly atomic update). */
static inline int
grow_parallel_arrays(void **array1, void **array2, size_t *capacity,
size_t elem_size1, size_t elem_size2)
{
size_t old_cap = *capacity;
if (old_cap > SIZE_MAX / 2) {
PyErr_SetString(PyExc_OverflowError, "Array capacity overflow");
return -1;
}
size_t new_cap = old_cap * 2;
if (new_cap > SIZE_MAX / elem_size1 || new_cap > SIZE_MAX / elem_size2) {
PyErr_SetString(PyExc_OverflowError, "Array allocation size overflow");
return -1;
}
size_t new_size1 = new_cap * elem_size1;
size_t new_size2 = new_cap * elem_size2;
size_t old_size1 = old_cap * elem_size1;
size_t old_size2 = old_cap * elem_size2;
/* Allocate fresh memory blocks (not realloc) to ensure atomicity.
* If either allocation fails, original arrays are completely unchanged. */
void *new_array1 = PyMem_Malloc(new_size1);
if (!new_array1) {
PyErr_NoMemory();
return -1;
}
void *new_array2 = PyMem_Malloc(new_size2);
if (!new_array2) {
/* Second allocation failed - free first and return with no state change */
PyMem_Free(new_array1);
PyErr_NoMemory();
return -1;
}
/* Both allocations succeeded - copy data and update pointers atomically */
memcpy(new_array1, *array1, old_size1);
memcpy(new_array2, *array2, old_size2);
PyMem_Free(*array1);
PyMem_Free(*array2);
*array1 = new_array1;
*array2 = new_array2;
*capacity = new_cap;
return 0;
}
/* Checked fwrite with GIL release - returns 0 on success, -1 on error (sets PyErr).
* This version releases the GIL during the write operation to allow other Python
* threads to run during potentially blocking I/O. */
static inline int
fwrite_checked_allow_threads(const void *data, size_t size, FILE *fp)
{
size_t written;
Py_BEGIN_ALLOW_THREADS
written = fwrite(data, 1, size, fp);
Py_END_ALLOW_THREADS
if (written != size) {
PyErr_SetFromErrno(PyExc_IOError);
return -1;
}
return 0;
}
/* Forward declaration for writer_write_bytes */
static inline int writer_write_bytes(BinaryWriter *writer, const void *data, size_t size);
/* Encode and write a varint u32 - returns 0 on success, -1 on error */
static inline int
writer_write_varint_u32(BinaryWriter *writer, uint32_t value)
{
uint8_t buf[MAX_VARINT_SIZE];
size_t len = encode_varint_u32(buf, value);
return writer_write_bytes(writer, buf, len);
}
/* Encode and write a varint u64 - returns 0 on success, -1 on error */
static inline int
writer_write_varint_u64(BinaryWriter *writer, uint64_t value)
{
uint8_t buf[MAX_VARINT_SIZE];
size_t len = encode_varint_u64(buf, value);
return writer_write_bytes(writer, buf, len);
}
/* ============================================================================
* UTILITY FUNCTIONS
* ============================================================================ */
int
binary_io_zstd_available(void)
{
#ifdef HAVE_ZSTD
return 1;
#else
return 0;
#endif
}
int
binary_io_get_best_compression(void)
{
#ifdef HAVE_ZSTD
return COMPRESSION_ZSTD;
#else
return COMPRESSION_NONE;
#endif
}
/* ============================================================================
* BINARY WRITER IMPLEMENTATION
* ============================================================================ */
static int
writer_init_zstd(BinaryWriter *writer)
{
#ifdef HAVE_ZSTD
writer->zstd.cctx = ZSTD_createCCtx();
if (!writer->zstd.cctx) {
PyErr_SetString(PyExc_MemoryError, "Failed to create zstd compression context");
return -1;
}
/* Compression level 5: better ratio for repetitive profiling data */
size_t result = ZSTD_CCtx_setParameter(writer->zstd.cctx,
ZSTD_c_compressionLevel, 5);
if (ZSTD_isError(result)) {
PyErr_Format(PyExc_RuntimeError, "Failed to set zstd compression level: %s",
ZSTD_getErrorName(result));
ZSTD_freeCCtx(writer->zstd.cctx);
writer->zstd.cctx = NULL;
return -1;
}
/* Use large buffer (512KB) for fewer I/O syscalls */
writer->zstd.compressed_buffer = PyMem_Malloc(COMPRESSED_BUFFER_SIZE);
if (!writer->zstd.compressed_buffer) {
ZSTD_freeCCtx(writer->zstd.cctx);
writer->zstd.cctx = NULL;
PyErr_NoMemory();
return -1;
}
writer->zstd.compressed_buffer_size = COMPRESSED_BUFFER_SIZE;
return 0;
#else
PyErr_SetString(PyExc_RuntimeError,
"zstd compression requested but not available (HAVE_ZSTD not defined)");
return -1;
#endif
}
static int
writer_flush_buffer(BinaryWriter *writer)
{
if (writer->buffer_pos == 0) {
return 0;
}
#ifdef HAVE_ZSTD
if (writer->compression_type == COMPRESSION_ZSTD) {
ZSTD_inBuffer input = { writer->write_buffer, writer->buffer_pos, 0 };
while (input.pos < input.size) {
ZSTD_outBuffer output = {
writer->zstd.compressed_buffer,
writer->zstd.compressed_buffer_size,
0
};
size_t result = ZSTD_compressStream2(
writer->zstd.cctx, &output, &input, ZSTD_e_continue
);
if (ZSTD_isError(result)) {
PyErr_Format(PyExc_IOError, "zstd compression error: %s",
ZSTD_getErrorName(result));
return -1;
}
if (output.pos > 0) {
if (fwrite_checked_allow_threads(writer->zstd.compressed_buffer, output.pos, writer->fp) < 0) {
return -1;
}
}
}
} else
#endif
{
if (fwrite_checked_allow_threads(writer->write_buffer, writer->buffer_pos, writer->fp) < 0) {
return -1;
}
}
writer->buffer_pos = 0;
return 0;
}
static inline int
writer_write_bytes(BinaryWriter *writer, const void *data, size_t size)
{
const uint8_t *src = (const uint8_t *)data;
size_t original_size = size;
while (size > 0) {
size_t space = writer->buffer_size - writer->buffer_pos;
size_t to_copy = (size < space) ? size : space;
memcpy(writer->write_buffer + writer->buffer_pos, src, to_copy);
writer->buffer_pos += to_copy;
src += to_copy;
size -= to_copy;
if (writer->buffer_pos == writer->buffer_size) {
if (writer_flush_buffer(writer) < 0) {
return -1;
}
}
}
writer->stats.bytes_written += original_size;
return 0;
}
/* ============================================================================
* HASH TABLE SUPPORT FUNCTIONS (using _Py_hashtable)
* ============================================================================ */
/* Hash function for Python strings - uses Python's cached hash */
static Py_uhash_t
string_hash_func(const void *key)
{
PyObject *str = (PyObject *)key;
Py_hash_t hash = PyObject_Hash(str);
if (hash == -1) {
PyErr_Clear();
return 0;
}
return (Py_uhash_t)hash;
}
static int
string_compare_func(const void *key1, const void *key2)
{
PyObject *str1 = (PyObject *)key1;
PyObject *str2 = (PyObject *)key2;
if (str1 == str2) {
return 1;
}
int result = PyObject_RichCompareBool(str1, str2, Py_EQ);
if (result == -1) {
PyErr_Clear();
return 0;
}
return result;
}
static void
string_key_destroy(void *key)
{
Py_XDECREF((PyObject *)key);
}
static Py_uhash_t
frame_key_hash_func(const void *key)
{
const FrameKey *fk = (const FrameKey *)key;
/* FNV-1a style hash combining all three values */
Py_uhash_t hash = 2166136261u;
hash ^= fk->filename_idx;
hash *= 16777619u;
hash ^= fk->funcname_idx;
hash *= 16777619u;
hash ^= (uint32_t)fk->lineno;
hash *= 16777619u;
return hash;
}
static int
frame_key_compare_func(const void *key1, const void *key2)
{
const FrameKey *fk1 = (const FrameKey *)key1;
const FrameKey *fk2 = (const FrameKey *)key2;
return (fk1->filename_idx == fk2->filename_idx &&
fk1->funcname_idx == fk2->funcname_idx &&
fk1->lineno == fk2->lineno);
}
static void
frame_key_destroy(void *key)
{
PyMem_Free(key);
}
static inline int
writer_intern_string(BinaryWriter *writer, PyObject *string, uint32_t *index)
{
void *existing = _Py_hashtable_get(writer->string_hash, string);
if (existing != NULL) {
*index = (uint32_t)(uintptr_t)existing - 1; /* index+1 stored to distinguish from NULL */
return 0;
}
if (writer->string_count >= writer->string_capacity) {
if (grow_parallel_arrays((void **)&writer->strings,
(void **)&writer->string_lengths,
&writer->string_capacity,
sizeof(char *), sizeof(size_t)) < 0) {
return -1;
}
}
Py_ssize_t str_len;
const char *str_data = PyUnicode_AsUTF8AndSize(string, &str_len);
if (!str_data) {
return -1;
}
char *str_copy = PyMem_Malloc(str_len + 1);
if (!str_copy) {
PyErr_NoMemory();
return -1;
}
memcpy(str_copy, str_data, str_len + 1);
*index = (uint32_t)writer->string_count;
/* Add to hash table FIRST to ensure atomic rollback on failure */
Py_INCREF(string);
if (_Py_hashtable_set(writer->string_hash, string, (void *)(uintptr_t)(*index + 1)) < 0) {
Py_DECREF(string);
PyMem_Free(str_copy);
PyErr_NoMemory();
return -1;
}
writer->strings[writer->string_count] = str_copy;
writer->string_lengths[writer->string_count] = str_len;
writer->string_count++;
return 0;
}
static inline int
writer_intern_frame(BinaryWriter *writer, uint32_t filename_idx, uint32_t funcname_idx,
int32_t lineno, uint32_t *index)
{
FrameKey lookup_key = {filename_idx, funcname_idx, lineno};
void *existing = _Py_hashtable_get(writer->frame_hash, &lookup_key);
if (existing != NULL) {
*index = (uint32_t)(uintptr_t)existing - 1; /* index+1 stored to distinguish from NULL */
return 0;
}
if (GROW_ARRAY(writer->frame_entries, writer->frame_count,
writer->frame_capacity, FrameEntry) < 0) {
return -1;
}
FrameKey *key = PyMem_Malloc(sizeof(FrameKey));
if (!key) {
PyErr_NoMemory();
return -1;
}
*key = lookup_key;
*index = (uint32_t)writer->frame_count;
FrameEntry *fe = &writer->frame_entries[writer->frame_count];
fe->filename_idx = filename_idx;
fe->funcname_idx = funcname_idx;
fe->lineno = lineno;
if (_Py_hashtable_set(writer->frame_hash, key, (void *)(uintptr_t)(*index + 1)) < 0) {
PyMem_Free(key);
PyErr_NoMemory();
return -1;
}
writer->frame_count++;
return 0;
}
/* Get or create a thread entry for the given thread_id.
* Returns pointer to ThreadEntry, or NULL on allocation failure.
* If is_new is non-NULL, sets it to 1 if this is a new thread, 0 otherwise. */
static ThreadEntry *
writer_get_or_create_thread_entry(BinaryWriter *writer, uint64_t thread_id,
uint32_t interpreter_id, int *is_new)
{
/* Linear search is OK for small number of threads.
* Key is (thread_id, interpreter_id) since same thread_id can exist in different interpreters. */
for (size_t i = 0; i < writer->thread_count; i++) {
if (writer->thread_entries[i].thread_id == thread_id &&
writer->thread_entries[i].interpreter_id == interpreter_id) {
if (is_new) {
*is_new = 0;
}
return &writer->thread_entries[i];
}
}
if (writer->thread_count >= writer->thread_capacity) {
ThreadEntry *new_entries = grow_array(writer->thread_entries,
&writer->thread_capacity,
sizeof(ThreadEntry));
if (!new_entries) {
return NULL;
}
writer->thread_entries = new_entries;
}
ThreadEntry *entry = &writer->thread_entries[writer->thread_count];
memset(entry, 0, sizeof(ThreadEntry));
entry->thread_id = thread_id;
entry->interpreter_id = interpreter_id;
entry->prev_timestamp = writer->start_time_us;
entry->prev_stack_capacity = MAX_STACK_DEPTH;
entry->pending_rle_capacity = INITIAL_RLE_CAPACITY;
entry->prev_stack = PyMem_Malloc(entry->prev_stack_capacity * sizeof(uint32_t));
if (!entry->prev_stack) {
PyErr_NoMemory();
return NULL;
}
entry->pending_rle = PyMem_Malloc(entry->pending_rle_capacity * sizeof(PendingRLESample));
if (!entry->pending_rle) {
PyMem_Free(entry->prev_stack);
PyErr_NoMemory();
return NULL;
}
writer->thread_count++;
if (is_new) {
*is_new = 1;
}
return entry;
}
/* Compare two stacks and return the encoding type and parameters.
* Sets:
* - shared_count: number of frames matching from bottom of stack
* - pop_count: frames to remove from prev stack
* - push_count: new frames to add
*
* Returns the best encoding type to use. */
static int
compare_stacks(const uint32_t *prev_stack, size_t prev_depth,
const uint32_t *curr_stack, size_t curr_depth,
size_t *shared_count, size_t *pop_count, size_t *push_count)
{
if (prev_depth == curr_depth) {
int identical = 1;
for (size_t i = 0; i < prev_depth; i++) {
if (prev_stack[i] != curr_stack[i]) {
identical = 0;
break;
}
}
if (identical) {
*shared_count = prev_depth;
*pop_count = 0;
*push_count = 0;
return STACK_REPEAT;
}
}
/* Find longest common suffix (frames at the bottom/outer part of stack).
* Stacks are stored innermost-first, so suffix is at the end. */
size_t suffix_len = 0;
size_t min_depth = (prev_depth < curr_depth) ? prev_depth : curr_depth;
for (size_t i = 0; i < min_depth; i++) {
size_t prev_idx = prev_depth - 1 - i;
size_t curr_idx = curr_depth - 1 - i;
if (prev_stack[prev_idx] == curr_stack[curr_idx]) {
suffix_len++;
} else {
break;
}
}
*shared_count = suffix_len;
*pop_count = prev_depth - suffix_len;
*push_count = curr_depth - suffix_len;
/* Choose best encoding based on byte cost */
/* STACK_FULL: 1 (type) + 1-2 (depth) + sum(frame varints) */
/* STACK_SUFFIX: 1 (type) + 1-2 (shared) + 1-2 (new_count) + sum(new frame varints) */
/* STACK_POP_PUSH: 1 (type) + 1-2 (pop) + 1-2 (push) + sum(new frame varints) */
/* If no common suffix, use full stack */
if (suffix_len == 0) {
return STACK_FULL;
}
/* If only adding frames (suffix == prev_depth), use SUFFIX */
if (*pop_count == 0 && *push_count > 0) {
return STACK_SUFFIX;
}
/* If popping and/or pushing, use POP_PUSH if it saves bytes */
/* Heuristic: POP_PUSH is better when we're modifying top frames */
if (*pop_count > 0 || *push_count > 0) {
/* Use full stack if sharing less than half the frames */
if (suffix_len < curr_depth / 2) {
return STACK_FULL;
}
return STACK_POP_PUSH;
}
return STACK_FULL;
}
/* Write common sample header: thread_id(8) + interpreter_id(4) + encoding(1).
* Returns 0 on success, -1 on failure. */
static inline int
write_sample_header(BinaryWriter *writer, ThreadEntry *entry, uint8_t encoding)
{
uint8_t header[SAMPLE_HEADER_FIXED_SIZE];
memcpy(header, &entry->thread_id, 8);
memcpy(header + 8, &entry->interpreter_id, 4);
header[12] = encoding;
return writer_write_bytes(writer, header, SAMPLE_HEADER_FIXED_SIZE);
}
/* Flush pending RLE samples for a thread.
* Writes the RLE record to the output buffer.
* Returns 0 on success, -1 on failure. */
static int
flush_pending_rle(BinaryWriter *writer, ThreadEntry *entry)
{
if (!entry->has_pending_rle || entry->pending_rle_count == 0) {
return 0;
}
/* Write RLE record:
* [thread_id: 8] [interpreter_id: 4] [STACK_REPEAT: 1] [count: varint]
* [timestamp_delta_1: varint] [status_1: 1] ... [timestamp_delta_N: varint] [status_N: 1]
*/
if (write_sample_header(writer, entry, STACK_REPEAT) < 0) {
return -1;
}
if (writer_write_varint_u32(writer, (uint32_t)entry->pending_rle_count) < 0) {
return -1;
}
for (size_t i = 0; i < entry->pending_rle_count; i++) {
if (writer_write_varint_u64(writer, entry->pending_rle[i].timestamp_delta) < 0) {
return -1;
}
if (writer_write_bytes(writer, &entry->pending_rle[i].status, 1) < 0) {
return -1;
}
writer->total_samples++;
}
writer->stats.repeat_records++;
writer->stats.repeat_samples += entry->pending_rle_count;
/* Each RLE sample saves writing the entire stack */
writer->stats.frames_saved += entry->pending_rle_count * entry->prev_stack_depth;
entry->pending_rle_count = 0;
entry->has_pending_rle = 0;
return 0;
}
/* Write a single sample with the specified encoding.
* Returns 0 on success, -1 on failure. */
static int
write_sample_with_encoding(BinaryWriter *writer, ThreadEntry *entry,
uint64_t timestamp_delta, uint8_t status,
int encoding_type,
const uint32_t *frame_indices, size_t stack_depth,
size_t shared_count, size_t pop_count, size_t push_count)
{
/* Header: thread_id(8) + interpreter_id(4) + encoding(1) + delta(varint) + status(1) */
uint8_t header_buf[SAMPLE_HEADER_MAX_SIZE];
memcpy(header_buf, &entry->thread_id, 8);
memcpy(header_buf + 8, &entry->interpreter_id, 4);
header_buf[12] = (uint8_t)encoding_type;
size_t varint_len = encode_varint_u64(header_buf + 13, timestamp_delta);
header_buf[13 + varint_len] = status;
if (writer_write_bytes(writer, header_buf, 14 + varint_len) < 0) {
return -1;
}
uint8_t frame_buf[MAX_FRAME_BUFFER_SIZE];
size_t frame_buf_pos = 0;
size_t frames_written = 0;
switch (encoding_type) {
case STACK_FULL:
/* [depth: varint] [frame_idx: varint]... */
frame_buf_pos += encode_varint_u32(frame_buf, (uint32_t)stack_depth);
for (size_t i = 0; i < stack_depth; i++) {
frame_buf_pos += encode_varint_u32(frame_buf + frame_buf_pos, frame_indices[i]);
}
frames_written = stack_depth;
writer->stats.full_records++;
break;
case STACK_SUFFIX:
/* [shared_count: varint] [new_count: varint] [new_frame_idx: varint]... */
frame_buf_pos += encode_varint_u32(frame_buf, (uint32_t)shared_count);
frame_buf_pos += encode_varint_u32(frame_buf + frame_buf_pos, (uint32_t)push_count);
/* New frames are at the top (beginning) of current stack */
for (size_t i = 0; i < push_count; i++) {
frame_buf_pos += encode_varint_u32(frame_buf + frame_buf_pos, frame_indices[i]);
}
frames_written = push_count;
writer->stats.suffix_records++;
/* Saved writing shared_count frames */
writer->stats.frames_saved += shared_count;
break;
case STACK_POP_PUSH:
/* [pop_count: varint] [push_count: varint] [new_frame_idx: varint]... */
frame_buf_pos += encode_varint_u32(frame_buf, (uint32_t)pop_count);
frame_buf_pos += encode_varint_u32(frame_buf + frame_buf_pos, (uint32_t)push_count);
/* New frames are at the top (beginning) of current stack */
for (size_t i = 0; i < push_count; i++) {
frame_buf_pos += encode_varint_u32(frame_buf + frame_buf_pos, frame_indices[i]);
}
frames_written = push_count;
writer->stats.pop_push_records++;
/* Saved writing shared_count frames (stack_depth - push_count if we had written full) */
writer->stats.frames_saved += shared_count;
break;
default:
PyErr_SetString(PyExc_RuntimeError, "Invalid stack encoding type");
return -1;
}
if (writer_write_bytes(writer, frame_buf, frame_buf_pos) < 0) {
return -1;
}
writer->stats.total_frames_written += frames_written;
writer->total_samples++;
return 0;
}
BinaryWriter *
binary_writer_create(const char *filename, uint64_t sample_interval_us, int compression_type,
uint64_t start_time_us)
{
BinaryWriter *writer = PyMem_Calloc(1, sizeof(BinaryWriter));
if (!writer) {
PyErr_NoMemory();
return NULL;
}
writer->filename = PyMem_Malloc(strlen(filename) + 1);
if (!writer->filename) {
PyMem_Free(writer);
PyErr_NoMemory();
return NULL;
}
strcpy(writer->filename, filename);
writer->start_time_us = start_time_us;
writer->sample_interval_us = sample_interval_us;
writer->compression_type = compression_type;
writer->write_buffer = PyMem_Malloc(WRITE_BUFFER_SIZE);
if (!writer->write_buffer) {
goto error;
}
writer->buffer_size = WRITE_BUFFER_SIZE;
writer->string_hash = _Py_hashtable_new_full(
string_hash_func,
string_compare_func,
string_key_destroy, /* Key destroy: decref the Python string */
NULL, /* Value destroy: values are just indices, not pointers */
NULL /* Use default allocator */
);
if (!writer->string_hash) {
goto error;
}
writer->strings = PyMem_Malloc(INITIAL_STRING_CAPACITY * sizeof(char *));
if (!writer->strings) {
goto error;
}
writer->string_lengths = PyMem_Malloc(INITIAL_STRING_CAPACITY * sizeof(size_t));
if (!writer->string_lengths) {
goto error;
}
writer->string_capacity = INITIAL_STRING_CAPACITY;
writer->frame_hash = _Py_hashtable_new_full(
frame_key_hash_func,
frame_key_compare_func,
frame_key_destroy, /* Key destroy: free the FrameKey */
NULL, /* Value destroy: values are just indices, not pointers */
NULL /* Use default allocator */
);
if (!writer->frame_hash) {
goto error;
}
writer->frame_entries = PyMem_Malloc(INITIAL_FRAME_CAPACITY * sizeof(FrameEntry));
if (!writer->frame_entries) {
goto error;
}
writer->frame_capacity = INITIAL_FRAME_CAPACITY;
writer->thread_entries = PyMem_Malloc(INITIAL_THREAD_CAPACITY * sizeof(ThreadEntry));
if (!writer->thread_entries) {
goto error;
}
writer->thread_capacity = INITIAL_THREAD_CAPACITY;
if (compression_type == COMPRESSION_ZSTD) {
if (writer_init_zstd(writer) < 0) {
goto error;
}
}
writer->fp = fopen(filename, "wb");
if (!writer->fp) {
PyErr_SetFromErrnoWithFilename(PyExc_IOError, filename);
goto error;
}
/* Hint sequential write pattern to kernel for better I/O scheduling */
#if defined(__linux__) && defined(POSIX_FADV_SEQUENTIAL)
{
int fd = fileno(writer->fp);
if (fd >= 0) {
(void)posix_fadvise(fd, 0, 0, POSIX_FADV_SEQUENTIAL);
}
}
#endif
uint8_t header[FILE_HEADER_PLACEHOLDER_SIZE] = {0};
if (fwrite_checked_allow_threads(header, FILE_HEADER_PLACEHOLDER_SIZE, writer->fp) < 0) {
goto error;
}
return writer;
error:
binary_writer_destroy(writer);
return NULL;
}
/* Build a frame stack from Python frame list by interning all strings and frames.
* Returns 0 on success, -1 on error. */
static int
build_frame_stack(BinaryWriter *writer, PyObject *frame_list,
uint32_t *curr_stack, size_t *curr_depth)
{
Py_ssize_t stack_depth = PyList_Size(frame_list);
*curr_depth = (stack_depth < MAX_STACK_DEPTH) ? stack_depth : MAX_STACK_DEPTH;
for (Py_ssize_t k = 0; k < (Py_ssize_t)*curr_depth; k++) {
/* Use unchecked accessors since we control the data structures */
PyObject *frame_info = PyList_GET_ITEM(frame_list, k);
/* Get filename, location, funcname from FrameInfo using unchecked access */
PyObject *filename = PyStructSequence_GET_ITEM(frame_info, 0);
PyObject *location = PyStructSequence_GET_ITEM(frame_info, 1);
PyObject *funcname = PyStructSequence_GET_ITEM(frame_info, 2);
/* Extract lineno from location (can be None for synthetic frames) */
int32_t lineno = 0;
if (location != Py_None) {
/* Use unchecked access - first element is lineno */
PyObject *lineno_obj = PyTuple_Check(location) ?
PyTuple_GET_ITEM(location, 0) :
PyStructSequence_GET_ITEM(location, 0);
lineno = (int32_t)PyLong_AsLong(lineno_obj);
if (UNLIKELY(PyErr_Occurred() != NULL)) {
PyErr_Clear();
lineno = 0;
}
}
/* Intern filename */
uint32_t filename_idx;
if (writer_intern_string(writer, filename, &filename_idx) < 0) {
return -1;
}
/* Intern funcname */
uint32_t funcname_idx;
if (writer_intern_string(writer, funcname, &funcname_idx) < 0) {
return -1;
}
/* Intern frame */
uint32_t frame_idx;
if (writer_intern_frame(writer, filename_idx, funcname_idx, lineno, &frame_idx) < 0) {
return -1;
}
curr_stack[k] = frame_idx;
}
return 0;
}
/* Process a single thread's sample.
* Returns 0 on success, -1 on error. */
static int
process_thread_sample(BinaryWriter *writer, PyObject *thread_info,
uint32_t interpreter_id, uint64_t timestamp_us)
{
PyObject *thread_id_obj = PyStructSequence_GET_ITEM(thread_info, 0);
PyObject *status_obj = PyStructSequence_GET_ITEM(thread_info, 1);
PyObject *frame_list = PyStructSequence_GET_ITEM(thread_info, 2);
uint64_t thread_id = PyLong_AsUnsignedLongLong(thread_id_obj);
if (thread_id == (uint64_t)-1 && PyErr_Occurred()) {
return -1;
}
long status_long = PyLong_AsLong(status_obj);
if (status_long == -1 && PyErr_Occurred()) {
return -1;
}
uint8_t status = (uint8_t)status_long;
int is_new_thread = 0;
ThreadEntry *entry = writer_get_or_create_thread_entry(
writer, thread_id, interpreter_id, &is_new_thread);
if (!entry) {
return -1;
}
/* Calculate timestamp delta */
uint64_t delta = timestamp_us - entry->prev_timestamp;
entry->prev_timestamp = timestamp_us;
/* Process frames and build current stack */
uint32_t curr_stack[MAX_STACK_DEPTH];
size_t curr_depth;
if (build_frame_stack(writer, frame_list, curr_stack, &curr_depth) < 0) {
return -1;
}
/* Compare with previous stack to determine encoding */
size_t shared_count, pop_count, push_count;
int encoding = compare_stacks(
entry->prev_stack, entry->prev_stack_depth,
curr_stack, curr_depth,
&shared_count, &pop_count, &push_count);
if (encoding == STACK_REPEAT && !is_new_thread) {
/* Buffer this sample for RLE */
if (GROW_ARRAY(entry->pending_rle, entry->pending_rle_count,
entry->pending_rle_capacity, PendingRLESample) < 0) {
return -1;
}
entry->pending_rle[entry->pending_rle_count].timestamp_delta = delta;
entry->pending_rle[entry->pending_rle_count].status = status;
entry->pending_rle_count++;
entry->has_pending_rle = 1;
} else {
/* Stack changed - flush any pending RLE first */
if (entry->has_pending_rle) {
if (flush_pending_rle(writer, entry) < 0) {
return -1;
}
}
if (write_sample_with_encoding(writer, entry, delta, status, encoding,
curr_stack, curr_depth,
shared_count, pop_count, push_count) < 0) {
return -1;
}
memcpy(entry->prev_stack, curr_stack, curr_depth * sizeof(uint32_t));
entry->prev_stack_depth = curr_depth;
}
return 0;
}
int
binary_writer_write_sample(BinaryWriter *writer, PyObject *stack_frames, uint64_t timestamp_us)
{
if (!PyList_Check(stack_frames)) {
PyErr_SetString(PyExc_TypeError, "stack_frames must be a list");
return -1;
}
Py_ssize_t num_interpreters = PyList_GET_SIZE(stack_frames);
for (Py_ssize_t i = 0; i < num_interpreters; i++) {
PyObject *interp_info = PyList_GET_ITEM(stack_frames, i);
PyObject *interp_id_obj = PyStructSequence_GET_ITEM(interp_info, 0);
PyObject *threads = PyStructSequence_GET_ITEM(interp_info, 1);
unsigned long interp_id_long = PyLong_AsUnsignedLong(interp_id_obj);
if (interp_id_long == (unsigned long)-1 && PyErr_Occurred()) {
return -1;
}
/* Bounds check: interpreter_id is stored as uint32_t in binary format */
if (interp_id_long > UINT32_MAX) {
PyErr_Format(PyExc_OverflowError,
"interpreter_id %lu exceeds maximum value %lu",
interp_id_long, (unsigned long)UINT32_MAX);
return -1;
}
uint32_t interpreter_id = (uint32_t)interp_id_long;
Py_ssize_t num_threads = PyList_GET_SIZE(threads);
for (Py_ssize_t j = 0; j < num_threads; j++) {
PyObject *thread_info = PyList_GET_ITEM(threads, j);
if (process_thread_sample(writer, thread_info, interpreter_id, timestamp_us) < 0) {
return -1;
}
}
}
return 0;
}
int
binary_writer_finalize(BinaryWriter *writer)
{
for (size_t i = 0; i < writer->thread_count; i++) {
if (writer->thread_entries[i].has_pending_rle) {
if (flush_pending_rle(writer, &writer->thread_entries[i]) < 0) {
return -1;
}
}
}
if (writer_flush_buffer(writer) < 0) {
return -1;
}
#ifdef HAVE_ZSTD
/* Finalize compression stream */
if (writer->compression_type == COMPRESSION_ZSTD && writer->zstd.cctx) {
ZSTD_inBuffer input = { NULL, 0, 0 };
size_t remaining;
do {
ZSTD_outBuffer output = {
writer->zstd.compressed_buffer,
writer->zstd.compressed_buffer_size,
0
};
remaining = ZSTD_compressStream2(writer->zstd.cctx, &output, &input, ZSTD_e_end);
if (ZSTD_isError(remaining)) {
PyErr_Format(PyExc_IOError, "zstd finalization error: %s",
ZSTD_getErrorName(remaining));
return -1;
}
if (output.pos > 0) {
if (fwrite_checked_allow_threads(writer->zstd.compressed_buffer, output.pos, writer->fp) < 0) {
return -1;
}
}
} while (remaining > 0);
}
#endif
/* Use 64-bit file position for >2GB files */
file_offset_t string_table_offset = FTELL64(writer->fp);
if (string_table_offset < 0) {
PyErr_SetFromErrno(PyExc_IOError);
return -1;
}
/* Release GIL during potentially large writes */
for (size_t i = 0; i < writer->string_count; i++) {
uint8_t len_buf[10];
size_t len_size = encode_varint_u32(len_buf, (uint32_t)writer->string_lengths[i]);
if (fwrite_checked_allow_threads(len_buf, len_size, writer->fp) < 0 ||
fwrite_checked_allow_threads(writer->strings[i], writer->string_lengths[i], writer->fp) < 0) {
return -1;
}
}
file_offset_t frame_table_offset = FTELL64(writer->fp);
if (frame_table_offset < 0) {
PyErr_SetFromErrno(PyExc_IOError);
return -1;
}
for (size_t i = 0; i < writer->frame_count; i++) {
FrameEntry *entry = &writer->frame_entries[i];
uint8_t buf[30];
size_t pos = encode_varint_u32(buf, entry->filename_idx);
pos += encode_varint_u32(buf + pos, entry->funcname_idx);
pos += encode_varint_i32(buf + pos, entry->lineno);
if (fwrite_checked_allow_threads(buf, pos, writer->fp) < 0) {
return -1;
}
}
/* Footer: string_count(4) + frame_count(4) + file_size(8) + checksum(16) */
file_offset_t footer_offset = FTELL64(writer->fp);
if (footer_offset < 0) {
PyErr_SetFromErrno(PyExc_IOError);
return -1;
}
uint64_t file_size = (uint64_t)footer_offset + 32;
uint8_t footer[32] = {0};
/* Cast size_t to uint32_t before memcpy to ensure correct bytes are copied
* on both little-endian and big-endian systems (size_t is 8 bytes on 64-bit) */
uint32_t string_count_u32 = (uint32_t)writer->string_count;
uint32_t frame_count_u32 = (uint32_t)writer->frame_count;
memcpy(footer + 0, &string_count_u32, 4);
memcpy(footer + 4, &frame_count_u32, 4);
memcpy(footer + 8, &file_size, 8);
/* bytes 16-31: checksum placeholder (zeros) */
if (fwrite_checked_allow_threads(footer, 32, writer->fp) < 0) {
return -1;
}
if (FSEEK64(writer->fp, 0, SEEK_SET) < 0) {
PyErr_SetFromErrno(PyExc_IOError);
return -1;
}
/* Convert file offsets and counts to fixed-width types for portable header format.
* This ensures correct behavior on both little-endian and big-endian systems. */
uint64_t string_table_offset_u64 = (uint64_t)string_table_offset;
uint64_t frame_table_offset_u64 = (uint64_t)frame_table_offset;
uint32_t thread_count_u32 = (uint32_t)writer->thread_count;
uint32_t compression_type_u32 = (uint32_t)writer->compression_type;
uint8_t header[FILE_HEADER_SIZE] = {0};
uint32_t magic = BINARY_FORMAT_MAGIC;
uint32_t version = BINARY_FORMAT_VERSION;
memcpy(header + HDR_OFF_MAGIC, &magic, HDR_SIZE_MAGIC);
memcpy(header + HDR_OFF_VERSION, &version, HDR_SIZE_VERSION);
header[HDR_OFF_PY_MAJOR] = PY_MAJOR_VERSION;
header[HDR_OFF_PY_MINOR] = PY_MINOR_VERSION;
header[HDR_OFF_PY_MICRO] = PY_MICRO_VERSION;
memcpy(header + HDR_OFF_START_TIME, &writer->start_time_us, HDR_SIZE_START_TIME);
memcpy(header + HDR_OFF_INTERVAL, &writer->sample_interval_us, HDR_SIZE_INTERVAL);
memcpy(header + HDR_OFF_SAMPLES, &writer->total_samples, HDR_SIZE_SAMPLES);
memcpy(header + HDR_OFF_THREADS, &thread_count_u32, HDR_SIZE_THREADS);
memcpy(header + HDR_OFF_STR_TABLE, &string_table_offset_u64, HDR_SIZE_STR_TABLE);
memcpy(header + HDR_OFF_FRAME_TABLE, &frame_table_offset_u64, HDR_SIZE_FRAME_TABLE);
memcpy(header + HDR_OFF_COMPRESSION, &compression_type_u32, HDR_SIZE_COMPRESSION);
if (fwrite_checked_allow_threads(header, FILE_HEADER_SIZE, writer->fp) < 0) {
return -1;
}
if (fclose(writer->fp) != 0) {
writer->fp = NULL;
PyErr_SetFromErrno(PyExc_IOError);
return -1;
}
writer->fp = NULL;
return 0;
}
void
binary_writer_destroy(BinaryWriter *writer)
{
if (!writer) {
return;
}
if (writer->fp) {
fclose(writer->fp);
}
PyMem_Free(writer->filename);
PyMem_Free(writer->write_buffer);
#ifdef HAVE_ZSTD
if (writer->zstd.cctx) {
ZSTD_freeCCtx(writer->zstd.cctx);
}
PyMem_Free(writer->zstd.compressed_buffer);
#endif
if (writer->string_hash) {
_Py_hashtable_destroy(writer->string_hash);
}
if (writer->strings) {
for (size_t i = 0; i < writer->string_count; i++) {
PyMem_Free(writer->strings[i]);
}
PyMem_Free(writer->strings);
}
PyMem_Free(writer->string_lengths);
if (writer->frame_hash) {
_Py_hashtable_destroy(writer->frame_hash);
}
PyMem_Free(writer->frame_entries);
if (writer->thread_entries) {
for (size_t i = 0; i < writer->thread_count; i++) {
PyMem_Free(writer->thread_entries[i].prev_stack);
PyMem_Free(writer->thread_entries[i].pending_rle);
}
PyMem_Free(writer->thread_entries);
}
PyMem_Free(writer);
}