/******************************************************************************
 * Remote Debugging Module - Main Module Implementation
 *
 * This file contains the main module initialization, the RemoteUnwinder
 * class implementation, and utility functions.
 ******************************************************************************/

#include "_remote_debugging.h"
#include "binary_io.h"

/* Forward declarations for clinic-generated code */
typedef struct {
    PyObject_HEAD
    BinaryWriter *writer;
    uint32_t cached_total_samples;  /* Preserved after finalize */
} BinaryWriterObject;

typedef struct {
    PyObject_HEAD
    BinaryReader *reader;
} BinaryReaderObject;

#include "clinic/module.c.h"

/* ============================================================================
 * STRUCTSEQ TYPE DEFINITIONS
 * ============================================================================ */

// TaskInfo structseq type
static PyStructSequence_Field TaskInfo_fields[] = {
    {"task_id", "Task ID (memory address)"},
    {"task_name", "Task name"},
    {"coroutine_stack", "Coroutine call stack"},
    {"awaited_by", "Tasks awaiting this task"},
    {NULL}
};

PyStructSequence_Desc TaskInfo_desc = {
    "_remote_debugging.TaskInfo",
    "Information about an asyncio task",
    TaskInfo_fields,
    4
};

// LocationInfo structseq type
static PyStructSequence_Field LocationInfo_fields[] = {
    {"lineno", "Line number"},
    {"end_lineno", "End line number"},
    {"col_offset", "Column offset"},
    {"end_col_offset", "End column offset"},
    {NULL}
};

PyStructSequence_Desc LocationInfo_desc = {
    "_remote_debugging.LocationInfo",
    "Source location information: (lineno, end_lineno, col_offset, end_col_offset)",
    LocationInfo_fields,
    4
};

// FrameInfo structseq type
static PyStructSequence_Field FrameInfo_fields[] = {
    {"filename", "Source code filename"},
    {"location", "LocationInfo structseq or None for synthetic frames"},
    {"funcname", "Function name"},
    {"opcode", "Opcode being executed (None if not gathered)"},
    {NULL}
};

PyStructSequence_Desc FrameInfo_desc = {
    "_remote_debugging.FrameInfo",
    "Information about a frame",
    FrameInfo_fields,
    4
};

// CoroInfo structseq type
static PyStructSequence_Field CoroInfo_fields[] = {
    {"call_stack", "Coroutine call stack"},
    {"task_name", "Task name"},
    {NULL}
};

PyStructSequence_Desc CoroInfo_desc = {
    "_remote_debugging.CoroInfo",
    "Information about a coroutine",
    CoroInfo_fields,
    2
};

// ThreadInfo structseq type
static PyStructSequence_Field ThreadInfo_fields[] = {
    {"thread_id", "Thread ID"},
    {"status", "Thread status (flags: HAS_GIL, ON_CPU, UNKNOWN or legacy enum)"},
    {"frame_info", "Frame information"},
    {NULL}
};

PyStructSequence_Desc ThreadInfo_desc = {
    "_remote_debugging.ThreadInfo",
    "Information about a thread",
    ThreadInfo_fields,
    3
};

// InterpreterInfo structseq type
static PyStructSequence_Field InterpreterInfo_fields[] = {
    {"interpreter_id", "Interpreter ID"},
    {"threads", "List of threads in this interpreter"},
    {NULL}
};

PyStructSequence_Desc InterpreterInfo_desc = {
    "_remote_debugging.InterpreterInfo",
    "Information about an interpreter",
    InterpreterInfo_fields,
    2
};

// AwaitedInfo structseq type
static PyStructSequence_Field AwaitedInfo_fields[] = {
    {"thread_id", "Thread ID"},
    {"awaited_by", "List of tasks awaited by this thread"},
    {NULL}
};

PyStructSequence_Desc AwaitedInfo_desc = {
    "_remote_debugging.AwaitedInfo",
    "Information about what a thread is awaiting",
    AwaitedInfo_fields,
    2
};

/* ============================================================================
 * UTILITY FUNCTIONS
 * ============================================================================ */

void
cached_code_metadata_destroy(void *ptr)
{
    CachedCodeMetadata *meta = (CachedCodeMetadata *)ptr;
    Py_DECREF(meta->func_name);
    Py_DECREF(meta->file_name);
    Py_DECREF(meta->linetable);
    PyMem_RawFree(meta);
}

RemoteDebuggingState *
RemoteDebugging_GetState(PyObject *module)
{
    void *state = _PyModule_GetState(module);
    assert(state != NULL);
    return (RemoteDebuggingState *)state;
}

RemoteDebuggingState *
RemoteDebugging_GetStateFromType(PyTypeObject *type)
{
    PyObject *module = PyType_GetModule(type);
    assert(module != NULL);
    return RemoteDebugging_GetState(module);
}

RemoteDebuggingState *
RemoteDebugging_GetStateFromObject(PyObject *obj)
{
    RemoteUnwinderObject *unwinder = (RemoteUnwinderObject *)obj;
    if (unwinder->cached_state == NULL) {
        unwinder->cached_state = RemoteDebugging_GetStateFromType(Py_TYPE(obj));
    }
    return unwinder->cached_state;
}

int
RemoteDebugging_InitState(RemoteDebuggingState *st)
{
    return 0;
}

int
is_prerelease_version(uint64_t version)
{
    return (version & 0xF0) != 0xF0;
}

int
validate_debug_offsets(struct _Py_DebugOffsets *debug_offsets)
{
    if (memcmp(debug_offsets->cookie, _Py_Debug_Cookie, sizeof(debug_offsets->cookie)) != 0) {
        // The remote is probably running a Python version predating debug offsets.
        PyErr_SetString(
            PyExc_RuntimeError,
            "Can't determine the Python version of the remote process");
        return -1;
    }

    // Assume debug offsets could change from one pre-release version to another,
    // or one minor version to another, but are stable across patch versions.
    if (is_prerelease_version(Py_Version) && Py_Version != debug_offsets->version) {
        PyErr_SetString(
            PyExc_RuntimeError,
            "Can't attach from a pre-release Python interpreter"
            " to a process running a different Python version");
        return -1;
    }

    if (is_prerelease_version(debug_offsets->version) && Py_Version != debug_offsets->version) {
        PyErr_SetString(
            PyExc_RuntimeError,
            "Can't attach to a pre-release Python interpreter"
            " from a process running a different Python version");
        return -1;
    }

    unsigned int remote_major = (debug_offsets->version >> 24) & 0xFF;
    unsigned int remote_minor = (debug_offsets->version >> 16) & 0xFF;

    if (PY_MAJOR_VERSION != remote_major || PY_MINOR_VERSION != remote_minor) {
        PyErr_Format(
            PyExc_RuntimeError,
            "Can't attach from a Python %d.%d process to a Python %d.%d process",
            PY_MAJOR_VERSION, PY_MINOR_VERSION, remote_major, remote_minor);
        return -1;
    }

    // The debug offsets differ between free threaded and non-free threaded builds.
    if (_Py_Debug_Free_Threaded && !debug_offsets->free_threaded) {
        PyErr_SetString(
            PyExc_RuntimeError,
            "Cannot attach from a free-threaded Python process"
            " to a process running a non-free-threaded version");
        return -1;
    }

    if (!_Py_Debug_Free_Threaded && debug_offsets->free_threaded) {
        PyErr_SetString(
            PyExc_RuntimeError,
            "Cannot attach to a free-threaded Python process"
            " from a process running a non-free-threaded version");
        return -1;
    }

    return 0;
}

/* ============================================================================
 * REMOTEUNWINDER CLASS IMPLEMENTATION
 * ============================================================================ */

/*[clinic input]
module _remote_debugging
class _remote_debugging.RemoteUnwinder "RemoteUnwinderObject *" "&RemoteUnwinder_Type"
[clinic start generated code]*/
/*[clinic end generated code: output=da39a3ee5e6b4b0d input=12b4dce200381115]*/

/*[clinic input]
@permit_long_summary
@permit_long_docstring_body
_remote_debugging.RemoteUnwinder.__init__
    pid: int
    *
    all_threads: bool = False
    only_active_thread: bool = False
    mode: int = 0
    debug: bool = False
    skip_non_matching_threads: bool = True
    native: bool = False
    gc: bool = False
    opcodes: bool = False
    cache_frames: bool = False
    stats: bool = False

Initialize a new RemoteUnwinder object for debugging a remote Python process.

Args:
    pid: Process ID of the target Python process to debug
    all_threads: If True, initialize state for all threads in the process.
                If False, only initialize for the main thread.
    only_active_thread: If True, only sample the thread holding the GIL.
    mode: Profiling mode: 0=WALL (wall-time), 1=CPU (cpu-time), 2=GIL (gil-time).
                       Cannot be used together with all_threads=True.
    debug: If True, chain exceptions to explain the sequence of events that
           lead to the exception.
    skip_non_matching_threads: If True, skip threads that don't match the selected mode.
                              If False, include all threads regardless of mode.
    native: If True, include artificial "<native>" frames to denote calls to
            non-Python code.
    gc: If True, include artificial "<GC>" frames to denote active garbage
        collection.
    opcodes: If True, gather bytecode opcode information for instruction-level
             profiling.
    cache_frames: If True, enable frame caching optimization to avoid re-reading
                 unchanged parent frames between samples.
    stats: If True, collect statistics about cache hits, memory reads, etc.
           Use get_stats() to retrieve the collected statistics.

The RemoteUnwinder provides functionality to inspect and debug a running Python
process, including examining thread states, stack frames and other runtime data.

Raises:
    PermissionError: If access to the target process is denied
    OSError: If unable to attach to the target process or access its memory
    RuntimeError: If unable to read debug information from the target process
    ValueError: If both all_threads and only_active_thread are True
[clinic start generated code]*/

static int
_remote_debugging_RemoteUnwinder___init___impl(RemoteUnwinderObject *self,
                                               int pid, int all_threads,
                                               int only_active_thread,
                                               int mode, int debug,
                                               int skip_non_matching_threads,
                                               int native, int gc,
                                               int opcodes, int cache_frames,
                                               int stats)
/*[clinic end generated code: output=0031f743f4b9ad52 input=8fb61b24102dec6e]*/
{
    // Validate that all_threads and only_active_thread are not both True
    if (all_threads && only_active_thread) {
        PyErr_SetString(PyExc_ValueError,
                       "all_threads and only_active_thread cannot both be True");
        return -1;
    }

#ifdef Py_GIL_DISABLED
    if (only_active_thread) {
        PyErr_SetString(PyExc_ValueError,
                       "only_active_thread is not supported in free-threaded builds");
        return -1;
    }
#endif

    self->native = native;
    self->gc = gc;
    self->opcodes = opcodes;
    self->cache_frames = cache_frames;
    self->collect_stats = stats;
    self->stale_invalidation_counter = 0;
    self->debug = debug;
    self->only_active_thread = only_active_thread;
    self->mode = mode;
    self->skip_non_matching_threads = skip_non_matching_threads;
    self->cached_state = NULL;
    self->frame_cache = NULL;
#ifdef Py_REMOTE_DEBUG_SUPPORTS_BLOCKING
    self->threads_stopped = 0;
#endif
    // Initialize stats to zero
    memset(&self->stats, 0, sizeof(self->stats));
    if (_Py_RemoteDebug_InitProcHandle(&self->handle, pid) < 0) {
        set_exception_cause(self, PyExc_RuntimeError, "Failed to initialize process handle");
        return -1;
    }

    self->runtime_start_address = _Py_RemoteDebug_GetPyRuntimeAddress(&self->handle);
    if (self->runtime_start_address == 0) {
        set_exception_cause(self, PyExc_RuntimeError, "Failed to get Python runtime address");
        return -1;
    }

    if (_Py_RemoteDebug_ReadDebugOffsets(&self->handle,
                                         &self->runtime_start_address,
                                         &self->debug_offsets) < 0)
    {
        set_exception_cause(self, PyExc_RuntimeError, "Failed to read debug offsets");
        return -1;
    }

    // Validate that the debug offsets are valid
    if (validate_debug_offsets(&self->debug_offsets) == -1) {
        set_exception_cause(self, PyExc_RuntimeError, "Invalid debug offsets found");
        return -1;
    }

    // Try to read async debug offsets, but don't fail if they're not available
    self->async_debug_offsets_available = 1;
    if (read_async_debug(self) < 0) {
        PyErr_Clear();
        memset(&self->async_debug_offsets, 0, sizeof(self->async_debug_offsets));
        self->async_debug_offsets_available = 0;
    }

    if (populate_initial_state_data(all_threads, self, self->runtime_start_address,
                    &self->interpreter_addr ,&self->tstate_addr) < 0)
    {
        set_exception_cause(self, PyExc_RuntimeError, "Failed to populate initial state data");
        return -1;
    }

    self->code_object_cache = _Py_hashtable_new_full(
        _Py_hashtable_hash_ptr,
        _Py_hashtable_compare_direct,
        NULL,  // keys are stable pointers, don't destroy
        cached_code_metadata_destroy,
        NULL
    );
    if (self->code_object_cache == NULL) {
        PyErr_NoMemory();
        set_exception_cause(self, PyExc_MemoryError, "Failed to create code object cache");
        return -1;
    }

#ifdef Py_GIL_DISABLED
    // Initialize TLBC cache
    self->tlbc_generation = 0;
    self->tlbc_cache = _Py_hashtable_new_full(
        _Py_hashtable_hash_ptr,
        _Py_hashtable_compare_direct,
        NULL,  // keys are stable pointers, don't destroy
        tlbc_cache_entry_destroy,
        NULL
    );
    if (self->tlbc_cache == NULL) {
        _Py_hashtable_destroy(self->code_object_cache);
        PyErr_NoMemory();
        set_exception_cause(self, PyExc_MemoryError, "Failed to create TLBC cache");
        return -1;
    }
#endif

#if defined(__APPLE__)
    self->thread_id_offset = 0;
#endif

#ifdef MS_WINDOWS
    self->win_process_buffer = NULL;
    self->win_process_buffer_size = 0;
#endif
#ifdef __linux__
    self->thread_tids = NULL;
    self->thread_tids_capacity = 0;
#endif

    if (cache_frames && frame_cache_init(self) < 0) {
        return -1;
    }

    // Clear stale last_profiled_frame values from previous profilers
    // This prevents us from stopping frame walking early due to stale values
    if (cache_frames) {
        clear_last_profiled_frames(self);
    }

    return 0;
}

/*[clinic input]
@permit_long_docstring_body
@critical_section
_remote_debugging.RemoteUnwinder.get_stack_trace

Returns stack traces for all interpreters and threads in process.

Each element in the returned list is a tuple of (interpreter_id, thread_list), where:
- interpreter_id is the interpreter identifier
- thread_list is a list of tuples (thread_id, frame_list) for threads in that interpreter
  - thread_id is the OS thread identifier
  - frame_list is a list of tuples (function_name, filename, line_number) representing
    the Python stack frames for that thread, ordered from most recent to oldest

The threads returned depend on the initialization parameters:
- If only_active_thread was True: returns only the thread holding the GIL across all interpreters
- If all_threads was True: returns all threads across all interpreters
- Otherwise: returns only the main thread of each interpreter

Example:
    [
        (0, [  # Main interpreter
            (1234, [
                ('process_data', 'worker.py', 127),
                ('run_worker', 'worker.py', 45),
                ('main', 'app.py', 23)
            ]),
            (1235, [
                ('handle_request', 'server.py', 89),
                ('serve_forever', 'server.py', 52)
            ])
        ]),
        (1, [  # Sub-interpreter
            (1236, [
                ('sub_worker', 'sub.py', 15)
            ])
        ])
    ]

Raises:
    RuntimeError: If there is an error copying memory from the target process
    OSError: If there is an error accessing the target process
    PermissionError: If access to the target process is denied
    UnicodeDecodeError: If there is an error decoding strings from the target process

[clinic start generated code]*/

static PyObject *
_remote_debugging_RemoteUnwinder_get_stack_trace_impl(RemoteUnwinderObject *self)
/*[clinic end generated code: output=666192b90c69d567 input=bcff01c73cccc1c0]*/
{
    STATS_INC(self, total_samples);

    PyObject* result = PyList_New(0);
    if (!result) {
        set_exception_cause(self, PyExc_MemoryError, "Failed to create stack trace result list");
        return NULL;
    }

    // Iterate over all interpreters
    uintptr_t current_interpreter = self->interpreter_addr;
    while (current_interpreter != 0) {
        // Read interpreter state to get the interpreter ID
        char interp_state_buffer[INTERP_STATE_BUFFER_SIZE];
        if (_Py_RemoteDebug_PagedReadRemoteMemory(
                &self->handle,
                current_interpreter,
                INTERP_STATE_BUFFER_SIZE,
                interp_state_buffer) < 0) {
            set_exception_cause(self, PyExc_RuntimeError, "Failed to read interpreter state buffer");
            Py_CLEAR(result);
            goto exit;
        }

        uintptr_t gc_frame = 0;
        if (self->gc) {
            gc_frame = GET_MEMBER(uintptr_t, interp_state_buffer,
                                  self->debug_offsets.interpreter_state.gc
                                  + self->debug_offsets.gc.frame);
        }

        int64_t interpreter_id = GET_MEMBER(int64_t, interp_state_buffer,
                self->debug_offsets.interpreter_state.id);

        // Get code object generation from buffer
        uint64_t code_object_generation = GET_MEMBER(uint64_t, interp_state_buffer,
                self->debug_offsets.interpreter_state.code_object_generation);

        if (code_object_generation != self->code_object_generation) {
            self->code_object_generation = code_object_generation;
            _Py_hashtable_clear(self->code_object_cache);
        }

#ifdef Py_GIL_DISABLED
        // Check TLBC generation and invalidate cache if needed
        uint32_t current_tlbc_generation = GET_MEMBER(uint32_t, interp_state_buffer,
                                                      self->debug_offsets.interpreter_state.tlbc_generation);
        if (current_tlbc_generation != self->tlbc_generation) {
            self->tlbc_generation = current_tlbc_generation;
            _Py_hashtable_clear(self->tlbc_cache);
        }
#endif

        // Create a list to hold threads for this interpreter
        PyObject *interpreter_threads = PyList_New(0);
        if (!interpreter_threads) {
            set_exception_cause(self, PyExc_MemoryError, "Failed to create interpreter threads list");
            Py_CLEAR(result);
            goto exit;
        }

        // Get the GIL holder for this interpreter (needed for GIL_WAIT logic)
        uintptr_t gil_holder_tstate = 0;
        int gil_locked = GET_MEMBER(int, interp_state_buffer,
            self->debug_offsets.interpreter_state.gil_runtime_state_locked);
        if (gil_locked) {
            gil_holder_tstate = (uintptr_t)GET_MEMBER(PyThreadState*, interp_state_buffer,
                self->debug_offsets.interpreter_state.gil_runtime_state_holder);
        }

        uintptr_t current_tstate;
        if (self->only_active_thread) {
            // Find the GIL holder for THIS interpreter
            if (!gil_locked) {
                // This interpreter's GIL is not locked, skip it
                Py_DECREF(interpreter_threads);
                goto next_interpreter;
            }

            current_tstate = gil_holder_tstate;
        } else if (self->tstate_addr == 0) {
            // Get all threads for this interpreter
            current_tstate = GET_MEMBER(uintptr_t, interp_state_buffer,
                    self->debug_offsets.interpreter_state.threads_head);
        } else {
            // Target specific thread (only process first interpreter)
            current_tstate = self->tstate_addr;
        }

        while (current_tstate != 0) {
            uintptr_t prev_tstate = current_tstate;
            PyObject* frame_info = unwind_stack_for_thread(self, &current_tstate,
                                                           gil_holder_tstate,
                                                           gc_frame);
            if (!frame_info) {
                // Check if this was an intentional skip due to mode-based filtering
                if ((self->mode == PROFILING_MODE_CPU || self->mode == PROFILING_MODE_GIL ||
                     self->mode == PROFILING_MODE_EXCEPTION) && !PyErr_Occurred()) {
                    // Detect cycle: if current_tstate didn't advance, we have corrupted data
                    if (current_tstate == prev_tstate) {
                        Py_DECREF(interpreter_threads);
                        set_exception_cause(self, PyExc_RuntimeError,
                            "Thread list cycle detected (corrupted remote memory)");
                        Py_CLEAR(result);
                        goto exit;
                    }
                    // Thread was skipped due to mode filtering, continue to next thread
                    continue;
                }
                // This was an actual error
                Py_DECREF(interpreter_threads);
                set_exception_cause(self, PyExc_RuntimeError, "Failed to unwind stack for thread");
                Py_CLEAR(result);
                goto exit;
            }

            if (PyList_Append(interpreter_threads, frame_info) == -1) {
                Py_DECREF(frame_info);
                Py_DECREF(interpreter_threads);
                set_exception_cause(self, PyExc_RuntimeError, "Failed to append thread frame info");
                Py_CLEAR(result);
                goto exit;
            }
            Py_DECREF(frame_info);

            // If targeting specific thread or only active thread, process just one
            if (self->tstate_addr || self->only_active_thread) {
                break;
            }
        }

        // Create the InterpreterInfo StructSequence
        RemoteDebuggingState *state = RemoteDebugging_GetStateFromObject((PyObject*)self);
        PyObject *interpreter_info = PyStructSequence_New(state->InterpreterInfo_Type);
        if (!interpreter_info) {
            Py_DECREF(interpreter_threads);
            set_exception_cause(self, PyExc_MemoryError, "Failed to create InterpreterInfo");
            Py_CLEAR(result);
            goto exit;
        }

        PyObject *interp_id = PyLong_FromLongLong(interpreter_id);
        if (!interp_id) {
            Py_DECREF(interpreter_threads);
            Py_DECREF(interpreter_info);
            set_exception_cause(self, PyExc_MemoryError, "Failed to create interpreter ID");
            Py_CLEAR(result);
            goto exit;
        }

        PyStructSequence_SetItem(interpreter_info, 0, interp_id);  // steals reference
        PyStructSequence_SetItem(interpreter_info, 1, interpreter_threads);  // steals reference

        // Add this interpreter to the result list
        if (PyList_Append(result, interpreter_info) == -1) {
            Py_DECREF(interpreter_info);
            set_exception_cause(self, PyExc_RuntimeError, "Failed to append interpreter info");
            Py_CLEAR(result);
            goto exit;
        }
        Py_DECREF(interpreter_info);

next_interpreter:

        // Get the next interpreter address
        current_interpreter = GET_MEMBER(uintptr_t, interp_state_buffer,
                self->debug_offsets.interpreter_state.next);

        // If we're targeting a specific thread, stop after first interpreter
        if (self->tstate_addr != 0) {
            break;
        }
    }

exit:
    // Invalidate cache entries for threads not seen in this sample.
    // Only do this every 1024 iterations to avoid performance overhead.
    if (self->cache_frames && result) {
        if (++self->stale_invalidation_counter >= 1024) {
            self->stale_invalidation_counter = 0;
            frame_cache_invalidate_stale(self, result);
        }
    }
    _Py_RemoteDebug_ClearCache(&self->handle);
    return result;
}

/*[clinic input]
@permit_long_summary
@permit_long_docstring_body
@critical_section
_remote_debugging.RemoteUnwinder.get_all_awaited_by

Get all tasks and their awaited_by relationships from the remote process.

This provides a tree structure showing which tasks are waiting for other tasks.

For each task, returns:
1. The call stack frames leading to where the task is currently executing
2. The name of the task
3. A list of tasks that this task is waiting for, with their own frames/names/etc

Returns a list of [frames, task_name, subtasks] where:
- frames: List of (func_name, filename, lineno) showing the call stack
- task_name: String identifier for the task
- subtasks: List of tasks being awaited by this task, in same format

Raises:
    RuntimeError: If AsyncioDebug section is not available in the remote process
    MemoryError: If memory allocation fails
    OSError: If reading from the remote process fails

Example output:
[
    # Task c2_root waiting for two subtasks
    [
        # Call stack of c2_root
        [("c5", "script.py", 10), ("c4", "script.py", 14)],
        "c2_root",
        [
            # First subtask (sub_main_2) and what it's waiting for
            [
                [("c1", "script.py", 23)],
                "sub_main_2",
                [...]
            ],
            # Second subtask and its waiters
            [...]
        ]
    ]
]
[clinic start generated code]*/

static PyObject *
_remote_debugging_RemoteUnwinder_get_all_awaited_by_impl(RemoteUnwinderObject *self)
/*[clinic end generated code: output=6a49cd345e8aec53 input=307f754cbe38250c]*/
{
    if (ensure_async_debug_offsets(self) < 0) {
        return NULL;
    }

    PyObject *result = PyList_New(0);
    if (result == NULL) {
        set_exception_cause(self, PyExc_MemoryError, "Failed to create awaited_by result list");
        goto result_err;
    }

    // Process all threads
    if (iterate_threads(self, process_thread_for_awaited_by, result) < 0) {
        goto result_err;
    }

    uintptr_t head_addr = self->interpreter_addr
        + (uintptr_t)self->async_debug_offsets.asyncio_interpreter_state.asyncio_tasks_head;

    // On top of a per-thread task lists used by default by asyncio to avoid
    // contention, there is also a fallback per-interpreter list of tasks;
    // any tasks still pending when a thread is destroyed will be moved to the
    // per-interpreter task list.  It's unlikely we'll find anything here, but
    // interesting for debugging.
    if (append_awaited_by(self, 0, head_addr, result))
    {
        set_exception_cause(self, PyExc_RuntimeError, "Failed to append interpreter awaited_by in get_all_awaited_by");
        goto result_err;
    }

    _Py_RemoteDebug_ClearCache(&self->handle);
    return result;

result_err:
    _Py_RemoteDebug_ClearCache(&self->handle);
    Py_XDECREF(result);
    return NULL;
}

/*[clinic input]
@permit_long_summary
@permit_long_docstring_body
@critical_section
_remote_debugging.RemoteUnwinder.get_async_stack_trace

Get the currently running async tasks and their dependency graphs from the remote process.

This returns information about running tasks and all tasks that are waiting for them,
forming a complete dependency graph for each thread's active task.

For each thread with a running task, returns the running task plus all tasks that
transitively depend on it (tasks waiting for the running task, tasks waiting for
those tasks, etc.).

Returns a list of per-thread results, where each thread result contains:
- Thread ID
- List of task information for the running task and all its waiters

Each task info contains:
- Task ID (memory address)
- Task name
- Call stack frames: List of (func_name, filename, lineno)
- List of tasks waiting for this task (recursive structure)

Raises:
    RuntimeError: If AsyncioDebug section is not available in the target process
    MemoryError: If memory allocation fails
    OSError: If reading from the remote process fails

Example output (similar structure to get_all_awaited_by but only for running tasks):
[
    # Thread 140234 results
    (140234, [
        # Running task and its complete waiter dependency graph
        (4345585712, 'main_task',
         [("run_server", "server.py", 127), ("main", "app.py", 23)],
         [
             # Tasks waiting for main_task
             (4345585800, 'worker_1', [...], [...]),
             (4345585900, 'worker_2', [...], [...])
         ])
    ])
]

[clinic start generated code]*/

static PyObject *
_remote_debugging_RemoteUnwinder_get_async_stack_trace_impl(RemoteUnwinderObject *self)
/*[clinic end generated code: output=6433d52b55e87bbe input=6129b7d509a887c9]*/
{
    if (ensure_async_debug_offsets(self) < 0) {
        return NULL;
    }

    PyObject *result = PyList_New(0);
    if (result == NULL) {
        set_exception_cause(self, PyExc_MemoryError, "Failed to create result list in get_async_stack_trace");
        return NULL;
    }

    // Process all threads
    if (iterate_threads(self, process_thread_for_async_stack_trace, result) < 0) {
        goto result_err;
    }

    _Py_RemoteDebug_ClearCache(&self->handle);
    return result;
result_err:
    _Py_RemoteDebug_ClearCache(&self->handle);
    Py_XDECREF(result);
    return NULL;
}

/*[clinic input]
@permit_long_docstring_body
@critical_section
_remote_debugging.RemoteUnwinder.get_stats

Get collected statistics about profiling performance.

Returns a dictionary containing statistics about cache performance,
memory reads, and other profiling metrics. Only available if the
RemoteUnwinder was created with stats=True.

Returns:
    dict: A dictionary containing:
        - total_samples: Total number of get_stack_trace calls
        - frame_cache_hits: Full cache hits (entire stack unchanged)
        - frame_cache_misses: Cache misses requiring full walk
        - frame_cache_partial_hits: Partial hits (stopped at cached frame)
        - frames_read_from_cache: Total frames retrieved from cache
        - frames_read_from_memory: Total frames read from remote memory
        - memory_reads: Total remote memory read operations
        - memory_bytes_read: Total bytes read from remote memory
        - code_object_cache_hits: Code object cache hits
        - code_object_cache_misses: Code object cache misses
        - stale_cache_invalidations: Times stale cache entries were cleared
        - frame_cache_hit_rate: Percentage of samples that hit the cache
        - code_object_cache_hit_rate: Percentage of code object lookups that hit cache

Raises:
    RuntimeError: If stats collection was not enabled (stats=False)
[clinic start generated code]*/

static PyObject *
_remote_debugging_RemoteUnwinder_get_stats_impl(RemoteUnwinderObject *self)
/*[clinic end generated code: output=21e36477122be2a0 input=75fef4134c12a8c9]*/
{
    if (!self->collect_stats) {
        PyErr_SetString(PyExc_RuntimeError,
                       "Statistics collection was not enabled. "
                       "Create RemoteUnwinder with stats=True to collect statistics.");
        return NULL;
    }

    PyObject *result = PyDict_New();
    if (!result) {
        return NULL;
    }

#define ADD_STAT(name) do { \
    PyObject *val = PyLong_FromUnsignedLongLong(self->stats.name); \
    if (!val || PyDict_SetItemString(result, #name, val) < 0) { \
        Py_XDECREF(val); \
        Py_DECREF(result); \
        return NULL; \
    } \
    Py_DECREF(val); \
} while(0)

    ADD_STAT(total_samples);
    ADD_STAT(frame_cache_hits);
    ADD_STAT(frame_cache_misses);
    ADD_STAT(frame_cache_partial_hits);
    ADD_STAT(frames_read_from_cache);
    ADD_STAT(frames_read_from_memory);
    ADD_STAT(memory_reads);
    ADD_STAT(memory_bytes_read);
    ADD_STAT(code_object_cache_hits);
    ADD_STAT(code_object_cache_misses);
    ADD_STAT(stale_cache_invalidations);

#undef ADD_STAT

    // Calculate and add derived statistics
    // Hit rate is calculated as (hits + partial_hits) / total_cache_lookups
    double frame_cache_hit_rate = 0.0;
    uint64_t total_cache_lookups = self->stats.frame_cache_hits + self->stats.frame_cache_partial_hits + self->stats.frame_cache_misses;
    if (total_cache_lookups > 0) {
        frame_cache_hit_rate = 100.0 * (double)(self->stats.frame_cache_hits + self->stats.frame_cache_partial_hits)
                               / (double)total_cache_lookups;
    }
    PyObject *hit_rate = PyFloat_FromDouble(frame_cache_hit_rate);
    if (!hit_rate || PyDict_SetItemString(result, "frame_cache_hit_rate", hit_rate) < 0) {
        Py_XDECREF(hit_rate);
        Py_DECREF(result);
        return NULL;
    }
    Py_DECREF(hit_rate);

    double code_object_hit_rate = 0.0;
    uint64_t total_code_lookups = self->stats.code_object_cache_hits + self->stats.code_object_cache_misses;
    if (total_code_lookups > 0) {
        code_object_hit_rate = 100.0 * (double)self->stats.code_object_cache_hits / (double)total_code_lookups;
    }
    PyObject *code_hit_rate = PyFloat_FromDouble(code_object_hit_rate);
    if (!code_hit_rate || PyDict_SetItemString(result, "code_object_cache_hit_rate", code_hit_rate) < 0) {
        Py_XDECREF(code_hit_rate);
        Py_DECREF(result);
        return NULL;
    }
    Py_DECREF(code_hit_rate);

    return result;
}

/*[clinic input]
@permit_long_docstring_body
@critical_section
_remote_debugging.RemoteUnwinder.pause_threads

Pause all threads in the target process.

This stops all threads in the target process to allow for consistent
memory reads during sampling. Must be paired with a call to resume_threads().

Returns True if threads were successfully paused, False if they were already paused.

Raises:
    RuntimeError: If there is an error stopping the threads
[clinic start generated code]*/

static PyObject *
_remote_debugging_RemoteUnwinder_pause_threads_impl(RemoteUnwinderObject *self)
/*[clinic end generated code: output=aaf2bdc0a725750c input=d8a266f19a81c67e]*/
{
#ifdef Py_REMOTE_DEBUG_SUPPORTS_BLOCKING
    if (self->threads_stopped) {
        Py_RETURN_FALSE;
    }

    _Py_RemoteDebug_InitThreadsState(self, &self->threads_state);
    if (_Py_RemoteDebug_StopAllThreads(self, &self->threads_state) < 0) {
        return NULL;
    }

    self->threads_stopped = 1;
    Py_RETURN_TRUE;
#else
    PyErr_SetString(PyExc_NotImplementedError,
                    "pause_threads is not supported on this platform");
    return NULL;
#endif
}

/*[clinic input]
@permit_long_docstring_body
@critical_section
_remote_debugging.RemoteUnwinder.resume_threads

Resume all threads in the target process.

This resumes threads that were previously paused with pause_threads().

Returns True if threads were successfully resumed, False if they were not paused.
[clinic start generated code]*/

static PyObject *
_remote_debugging_RemoteUnwinder_resume_threads_impl(RemoteUnwinderObject *self)
/*[clinic end generated code: output=8d6781ea37095536 input=16baaaab007f4259]*/
{
#ifdef Py_REMOTE_DEBUG_SUPPORTS_BLOCKING
    if (!self->threads_stopped) {
        Py_RETURN_FALSE;
    }

    _Py_RemoteDebug_ResumeAllThreads(self, &self->threads_state);
    self->threads_stopped = 0;
    Py_RETURN_TRUE;
#else
    PyErr_SetString(PyExc_NotImplementedError,
                    "resume_threads is not supported on this platform");
    return NULL;
#endif
}

static PyMethodDef RemoteUnwinder_methods[] = {
    _REMOTE_DEBUGGING_REMOTEUNWINDER_GET_STACK_TRACE_METHODDEF
    _REMOTE_DEBUGGING_REMOTEUNWINDER_GET_ALL_AWAITED_BY_METHODDEF
    _REMOTE_DEBUGGING_REMOTEUNWINDER_GET_ASYNC_STACK_TRACE_METHODDEF
    _REMOTE_DEBUGGING_REMOTEUNWINDER_GET_STATS_METHODDEF
    _REMOTE_DEBUGGING_REMOTEUNWINDER_PAUSE_THREADS_METHODDEF
    _REMOTE_DEBUGGING_REMOTEUNWINDER_RESUME_THREADS_METHODDEF
    {NULL, NULL}
};

static void
RemoteUnwinder_dealloc(PyObject *op)
{
    RemoteUnwinderObject *self = RemoteUnwinder_CAST(op);
    PyTypeObject *tp = Py_TYPE(self);

#ifdef Py_REMOTE_DEBUG_SUPPORTS_BLOCKING
    if (self->threads_stopped) {
        _Py_RemoteDebug_ResumeAllThreads(self, &self->threads_state);
        self->threads_stopped = 0;
    }
#endif
#ifdef __linux__
    if (self->thread_tids != NULL) {
        PyMem_RawFree(self->thread_tids);
        self->thread_tids = NULL;
    }
#endif

    if (self->code_object_cache) {
        _Py_hashtable_destroy(self->code_object_cache);
    }
#ifdef MS_WINDOWS
    if (self->win_process_buffer != NULL) {
        PyMem_Free(self->win_process_buffer);
    }
#endif

#ifdef Py_GIL_DISABLED
    if (self->tlbc_cache) {
        _Py_hashtable_destroy(self->tlbc_cache);
    }
#endif
    if (self->handle.pid != 0) {
        _Py_RemoteDebug_ClearCache(&self->handle);
        _Py_RemoteDebug_CleanupProcHandle(&self->handle);
    }
    frame_cache_cleanup(self);
    PyObject_Del(self);
    Py_DECREF(tp);
}

static PyType_Slot RemoteUnwinder_slots[] = {
    {Py_tp_doc, (void *)"RemoteUnwinder(pid): Inspect stack of a remote Python process."},
    {Py_tp_methods, RemoteUnwinder_methods},
    {Py_tp_init, _remote_debugging_RemoteUnwinder___init__},
    {Py_tp_dealloc, RemoteUnwinder_dealloc},
    {0, NULL}
};

static PyType_Spec RemoteUnwinder_spec = {
    .name = "_remote_debugging.RemoteUnwinder",
    .basicsize = sizeof(RemoteUnwinderObject),
    .flags = (
        Py_TPFLAGS_DEFAULT
        | Py_TPFLAGS_IMMUTABLETYPE
    ),
    .slots = RemoteUnwinder_slots,
};

/* Forward declarations for type specs defined later */
static PyType_Spec BinaryWriter_spec;
static PyType_Spec BinaryReader_spec;

/* ============================================================================
 * MODULE INITIALIZATION
 * ============================================================================ */

static int
_remote_debugging_exec(PyObject *m)
{
    RemoteDebuggingState *st = RemoteDebugging_GetState(m);
#define CREATE_TYPE(mod, type, spec)                                        \
    do {                                                                    \
        type = (PyTypeObject *)PyType_FromMetaclass(NULL, mod, spec, NULL); \
        if (type == NULL) {                                                 \
            return -1;                                                      \
        }                                                                   \
    } while (0)

    CREATE_TYPE(m, st->RemoteDebugging_Type, &RemoteUnwinder_spec);

    if (PyModule_AddType(m, st->RemoteDebugging_Type) < 0) {
        return -1;
    }

    // Initialize structseq types
    st->TaskInfo_Type = PyStructSequence_NewType(&TaskInfo_desc);
    if (st->TaskInfo_Type == NULL) {
        return -1;
    }
    if (PyModule_AddType(m, st->TaskInfo_Type) < 0) {
        return -1;
    }

    st->LocationInfo_Type = PyStructSequence_NewType(&LocationInfo_desc);
    if (st->LocationInfo_Type == NULL) {
        return -1;
    }
    if (PyModule_AddType(m, st->LocationInfo_Type) < 0) {
        return -1;
    }

    st->FrameInfo_Type = PyStructSequence_NewType(&FrameInfo_desc);
    if (st->FrameInfo_Type == NULL) {
        return -1;
    }
    if (PyModule_AddType(m, st->FrameInfo_Type) < 0) {
        return -1;
    }

    st->CoroInfo_Type = PyStructSequence_NewType(&CoroInfo_desc);
    if (st->CoroInfo_Type == NULL) {
        return -1;
    }
    if (PyModule_AddType(m, st->CoroInfo_Type) < 0) {
        return -1;
    }

    st->ThreadInfo_Type = PyStructSequence_NewType(&ThreadInfo_desc);
    if (st->ThreadInfo_Type == NULL) {
        return -1;
    }
    if (PyModule_AddType(m, st->ThreadInfo_Type) < 0) {
        return -1;
    }

    st->InterpreterInfo_Type = PyStructSequence_NewType(&InterpreterInfo_desc);
    if (st->InterpreterInfo_Type == NULL) {
        return -1;
    }
    if (PyModule_AddType(m, st->InterpreterInfo_Type) < 0) {
        return -1;
    }

    st->AwaitedInfo_Type = PyStructSequence_NewType(&AwaitedInfo_desc);
    if (st->AwaitedInfo_Type == NULL) {
        return -1;
    }
    if (PyModule_AddType(m, st->AwaitedInfo_Type) < 0) {
        return -1;
    }

    // Create BinaryWriter and BinaryReader types
    CREATE_TYPE(m, st->BinaryWriter_Type, &BinaryWriter_spec);
    if (PyModule_AddType(m, st->BinaryWriter_Type) < 0) {
        return -1;
    }

    CREATE_TYPE(m, st->BinaryReader_Type, &BinaryReader_spec);
    if (PyModule_AddType(m, st->BinaryReader_Type) < 0) {
        return -1;
    }

#ifdef Py_GIL_DISABLED
    PyUnstable_Module_SetGIL(m, Py_MOD_GIL_NOT_USED);
#endif
    int rc = PyModule_AddIntConstant(m, "PROCESS_VM_READV_SUPPORTED", HAVE_PROCESS_VM_READV);
    if (rc < 0) {
        return -1;
    }

    // Add thread status flag constants
    if (PyModule_AddIntConstant(m, "THREAD_STATUS_HAS_GIL", THREAD_STATUS_HAS_GIL) < 0) {
        return -1;
    }
    if (PyModule_AddIntConstant(m, "THREAD_STATUS_ON_CPU", THREAD_STATUS_ON_CPU) < 0) {
        return -1;
    }
    if (PyModule_AddIntConstant(m, "THREAD_STATUS_UNKNOWN", THREAD_STATUS_UNKNOWN) < 0) {
        return -1;
    }
    if (PyModule_AddIntConstant(m, "THREAD_STATUS_GIL_REQUESTED", THREAD_STATUS_GIL_REQUESTED) < 0) {
        return -1;
    }
    if (PyModule_AddIntConstant(m, "THREAD_STATUS_HAS_EXCEPTION", THREAD_STATUS_HAS_EXCEPTION) < 0) {
        return -1;
    }

    if (RemoteDebugging_InitState(st) < 0) {
        return -1;
    }
    return 0;
}

static int
remote_debugging_traverse(PyObject *mod, visitproc visit, void *arg)
{
    RemoteDebuggingState *state = RemoteDebugging_GetState(mod);
    Py_VISIT(state->RemoteDebugging_Type);
    Py_VISIT(state->TaskInfo_Type);
    Py_VISIT(state->LocationInfo_Type);
    Py_VISIT(state->FrameInfo_Type);
    Py_VISIT(state->CoroInfo_Type);
    Py_VISIT(state->ThreadInfo_Type);
    Py_VISIT(state->InterpreterInfo_Type);
    Py_VISIT(state->AwaitedInfo_Type);
    Py_VISIT(state->BinaryWriter_Type);
    Py_VISIT(state->BinaryReader_Type);
    return 0;
}

static int
remote_debugging_clear(PyObject *mod)
{
    RemoteDebuggingState *state = RemoteDebugging_GetState(mod);
    Py_CLEAR(state->RemoteDebugging_Type);
    Py_CLEAR(state->TaskInfo_Type);
    Py_CLEAR(state->LocationInfo_Type);
    Py_CLEAR(state->FrameInfo_Type);
    Py_CLEAR(state->CoroInfo_Type);
    Py_CLEAR(state->ThreadInfo_Type);
    Py_CLEAR(state->InterpreterInfo_Type);
    Py_CLEAR(state->AwaitedInfo_Type);
    Py_CLEAR(state->BinaryWriter_Type);
    Py_CLEAR(state->BinaryReader_Type);
    return 0;
}

static void
remote_debugging_free(void *mod)
{
    (void)remote_debugging_clear((PyObject *)mod);
}

/* ============================================================================
 * BINARY WRITER CLASS
 * ============================================================================ */

#define BinaryWriter_CAST(op) ((BinaryWriterObject *)(op))

/*[clinic input]
class _remote_debugging.BinaryWriter "BinaryWriterObject *" "&PyBinaryWriter_Type"
[clinic start generated code]*/
/*[clinic end generated code: output=da39a3ee5e6b4b0d input=e948838b90a2003c]*/

/*[clinic input]
@permit_long_docstring_body
_remote_debugging.BinaryWriter.__init__
    filename: str
    sample_interval_us: unsigned_long_long
    start_time_us: unsigned_long_long
    *
    compression: int = 0

High-performance binary writer for profiling data.

Arguments:
    filename: Path to output file
    sample_interval_us: Sampling interval in microseconds
    start_time_us: Start timestamp in microseconds (from time.monotonic() * 1e6)
    compression: 0=none, 1=zstd (default: 0)

Use as a context manager or call finalize() when done.
[clinic start generated code]*/

static int
_remote_debugging_BinaryWriter___init___impl(BinaryWriterObject *self,
                                             const char *filename,
                                             unsigned long long sample_interval_us,
                                             unsigned long long start_time_us,
                                             int compression)
/*[clinic end generated code: output=014c0306f1bacf4b input=3bdf01c1cc2f5a1d]*/
{
    if (self->writer) {
        binary_writer_destroy(self->writer);
    }

    self->writer = binary_writer_create(filename, sample_interval_us, compression, start_time_us);
    if (!self->writer) {
        return -1;
    }

    return 0;
}

/*[clinic input]
@permit_long_docstring_body
_remote_debugging.BinaryWriter.write_sample
    stack_frames: object
    timestamp_us: unsigned_long_long

Write a sample to the binary file.

Arguments:
    stack_frames: List of InterpreterInfo objects
    timestamp_us: Current timestamp in microseconds (from time.monotonic() * 1e6)
[clinic start generated code]*/

static PyObject *
_remote_debugging_BinaryWriter_write_sample_impl(BinaryWriterObject *self,
                                                 PyObject *stack_frames,
                                                 unsigned long long timestamp_us)
/*[clinic end generated code: output=24d5b86679b4128f input=4e6d832d360bea46]*/
{
    if (!self->writer) {
        PyErr_SetString(PyExc_ValueError, "Writer is closed");
        return NULL;
    }

    if (binary_writer_write_sample(self->writer, stack_frames, timestamp_us) < 0) {
        return NULL;
    }

    Py_RETURN_NONE;
}

/*[clinic input]
_remote_debugging.BinaryWriter.finalize

Finalize and close the binary file.

Writes string/frame tables, footer, and updates header.
[clinic start generated code]*/

static PyObject *
_remote_debugging_BinaryWriter_finalize_impl(BinaryWriterObject *self)
/*[clinic end generated code: output=3534b88c6628de88 input=c02191750682f6a2]*/
{
    if (!self->writer) {
        PyErr_SetString(PyExc_ValueError, "Writer is already closed");
        return NULL;
    }

    /* Save total_samples before finalizing */
    self->cached_total_samples = self->writer->total_samples;

    if (binary_writer_finalize(self->writer) < 0) {
        return NULL;
    }

    binary_writer_destroy(self->writer);
    self->writer = NULL;

    Py_RETURN_NONE;
}

/*[clinic input]
_remote_debugging.BinaryWriter.close

Close the writer without finalizing (discards data).
[clinic start generated code]*/

static PyObject *
_remote_debugging_BinaryWriter_close_impl(BinaryWriterObject *self)
/*[clinic end generated code: output=9571bb2256fd1fd2 input=6e0da206e60daf16]*/
{
    if (self->writer) {
        binary_writer_destroy(self->writer);
        self->writer = NULL;
    }
    Py_RETURN_NONE;
}

/*[clinic input]
_remote_debugging.BinaryWriter.__enter__

Enter context manager.
[clinic start generated code]*/

static PyObject *
_remote_debugging_BinaryWriter___enter___impl(BinaryWriterObject *self)
/*[clinic end generated code: output=8eb95f61daf2d120 input=8ef14ee18da561d2]*/
{
    Py_INCREF(self);
    return (PyObject *)self;
}

/*[clinic input]
_remote_debugging.BinaryWriter.__exit__
    exc_type: object = None
    exc_val: object = None
    exc_tb: object = None

Exit context manager, finalizing the file.
[clinic start generated code]*/

static PyObject *
_remote_debugging_BinaryWriter___exit___impl(BinaryWriterObject *self,
                                             PyObject *exc_type,
                                             PyObject *exc_val,
                                             PyObject *exc_tb)
/*[clinic end generated code: output=61831f47c72a53c6 input=12334ce1009af37f]*/
{
    if (self->writer) {
        /* Only finalize on normal exit (no exception) */
        if (exc_type == Py_None) {
            if (binary_writer_finalize(self->writer) < 0) {
                binary_writer_destroy(self->writer);
                self->writer = NULL;
                return NULL;
            }
        }
        binary_writer_destroy(self->writer);
        self->writer = NULL;
    }
    Py_RETURN_FALSE;
}

/*[clinic input]
@permit_long_docstring_body
_remote_debugging.BinaryWriter.get_stats

Get encoding statistics for the writer.

Returns a dict with encoding statistics including repeat/full/suffix/pop-push
record counts, frames written/saved, and compression ratio.
[clinic start generated code]*/

static PyObject *
_remote_debugging_BinaryWriter_get_stats_impl(BinaryWriterObject *self)
/*[clinic end generated code: output=06522cd52544df89 input=837c874ffdebd24c]*/
{
    if (!self->writer) {
        PyErr_SetString(PyExc_ValueError, "Writer is closed");
        return NULL;
    }
    return binary_writer_get_stats(self->writer);
}

static PyObject *
BinaryWriter_get_total_samples(BinaryWriterObject *self, void *closure)
{
    if (!self->writer) {
        /* Use cached value after finalize/close */
        return PyLong_FromUnsignedLong(self->cached_total_samples);
    }
    return PyLong_FromUnsignedLong(self->writer->total_samples);
}

static PyGetSetDef BinaryWriter_getset[] = {
    {"total_samples", (getter)BinaryWriter_get_total_samples, NULL, "Total samples written", NULL},
    {NULL}
};

static PyMethodDef BinaryWriter_methods[] = {
    _REMOTE_DEBUGGING_BINARYWRITER_WRITE_SAMPLE_METHODDEF
    _REMOTE_DEBUGGING_BINARYWRITER_FINALIZE_METHODDEF
    _REMOTE_DEBUGGING_BINARYWRITER_CLOSE_METHODDEF
    _REMOTE_DEBUGGING_BINARYWRITER___ENTER___METHODDEF
    _REMOTE_DEBUGGING_BINARYWRITER___EXIT___METHODDEF
    _REMOTE_DEBUGGING_BINARYWRITER_GET_STATS_METHODDEF
    {NULL, NULL, 0, NULL}
};

static void
BinaryWriter_dealloc(PyObject *op)
{
    BinaryWriterObject *self = BinaryWriter_CAST(op);
    PyTypeObject *tp = Py_TYPE(self);
    if (self->writer) {
        binary_writer_destroy(self->writer);
    }
    tp->tp_free(self);
    Py_DECREF(tp);
}

static PyType_Slot BinaryWriter_slots[] = {
    {Py_tp_getset, BinaryWriter_getset},
    {Py_tp_methods, BinaryWriter_methods},
    {Py_tp_init, _remote_debugging_BinaryWriter___init__},
    {Py_tp_dealloc, BinaryWriter_dealloc},
    {0, NULL}
};

static PyType_Spec BinaryWriter_spec = {
    .name = "_remote_debugging.BinaryWriter",
    .basicsize = sizeof(BinaryWriterObject),
    .flags = (
        Py_TPFLAGS_DEFAULT
        | Py_TPFLAGS_IMMUTABLETYPE
    ),
    .slots = BinaryWriter_slots,
};

/* ============================================================================
 * BINARY READER CLASS
 * ============================================================================ */

#define BinaryReader_CAST(op) ((BinaryReaderObject *)(op))

/*[clinic input]
class _remote_debugging.BinaryReader "BinaryReaderObject *" "&PyBinaryReader_Type"
[clinic start generated code]*/
/*[clinic end generated code: output=da39a3ee5e6b4b0d input=36400aaf6f53216d]*/

/*[clinic input]
_remote_debugging.BinaryReader.__init__
    filename: str

High-performance binary reader for profiling data.

Arguments:
    filename: Path to input file

Use as a context manager or call close() when done.
[clinic start generated code]*/

static int
_remote_debugging_BinaryReader___init___impl(BinaryReaderObject *self,
                                             const char *filename)
/*[clinic end generated code: output=9699226f7ae052bb input=4201f9cc500ef2f6]*/
{
    if (self->reader) {
        binary_reader_close(self->reader);
    }

    self->reader = binary_reader_open(filename);
    if (!self->reader) {
        return -1;
    }

    return 0;
}

/*[clinic input]
_remote_debugging.BinaryReader.replay
    collector: object
    progress_callback: object = None

Replay samples through a collector.

Arguments:
    collector: Collector object with collect() method
    progress_callback: Optional callable(current, total)

Returns:
    Number of samples replayed
[clinic start generated code]*/

static PyObject *
_remote_debugging_BinaryReader_replay_impl(BinaryReaderObject *self,
                                           PyObject *collector,
                                           PyObject *progress_callback)
/*[clinic end generated code: output=442345562574b61c input=ebb687aed3e0f4f1]*/
{
    if (!self->reader) {
        PyErr_SetString(PyExc_ValueError, "Reader is closed");
        return NULL;
    }

    Py_ssize_t replayed = binary_reader_replay(self->reader, collector, progress_callback);
    if (replayed < 0) {
        return NULL;
    }

    return PyLong_FromSsize_t(replayed);
}

/*[clinic input]
_remote_debugging.BinaryReader.get_info

Get metadata about the binary file.

Returns:
    Dict with file metadata
[clinic start generated code]*/

static PyObject *
_remote_debugging_BinaryReader_get_info_impl(BinaryReaderObject *self)
/*[clinic end generated code: output=7f641fbd39147391 input=02e75e39c8a6cd1f]*/
{
    if (!self->reader) {
        PyErr_SetString(PyExc_ValueError, "Reader is closed");
        return NULL;
    }

    return binary_reader_get_info(self->reader);
}

/*[clinic input]
_remote_debugging.BinaryReader.get_stats

Get reconstruction statistics from replay.

Returns a dict with statistics about record types decoded and samples
reconstructed during replay.
[clinic start generated code]*/

static PyObject *
_remote_debugging_BinaryReader_get_stats_impl(BinaryReaderObject *self)
/*[clinic end generated code: output=628b9ab5e4c4fd36 input=d8dd6654abd6c3c0]*/
{
    if (!self->reader) {
        PyErr_SetString(PyExc_ValueError, "Reader is closed");
        return NULL;
    }
    return binary_reader_get_stats(self->reader);
}

/*[clinic input]
_remote_debugging.BinaryReader.close

Close the reader and free resources.
[clinic start generated code]*/

static PyObject *
_remote_debugging_BinaryReader_close_impl(BinaryReaderObject *self)
/*[clinic end generated code: output=ad0238cf5240b4f8 input=b919a66c737712d5]*/
{
    if (self->reader) {
        binary_reader_close(self->reader);
        self->reader = NULL;
    }
    Py_RETURN_NONE;
}

/*[clinic input]
_remote_debugging.BinaryReader.__enter__

Enter context manager.
[clinic start generated code]*/

static PyObject *
_remote_debugging_BinaryReader___enter___impl(BinaryReaderObject *self)
/*[clinic end generated code: output=fade133538e93817 input=4794844c9efdc4f6]*/
{
    Py_INCREF(self);
    return (PyObject *)self;
}

/*[clinic input]
_remote_debugging.BinaryReader.__exit__
    exc_type: object = None
    exc_val: object = None
    exc_tb: object = None

Exit context manager, closing the file.
[clinic start generated code]*/

static PyObject *
_remote_debugging_BinaryReader___exit___impl(BinaryReaderObject *self,
                                             PyObject *exc_type,
                                             PyObject *exc_val,
                                             PyObject *exc_tb)
/*[clinic end generated code: output=2acdd36cfdc14e4a input=87284243d7935835]*/
{
    if (self->reader) {
        binary_reader_close(self->reader);
        self->reader = NULL;
    }
    Py_RETURN_FALSE;
}

static PyObject *
BinaryReader_get_sample_count(BinaryReaderObject *self, void *closure)
{
    if (!self->reader) {
        return PyLong_FromLong(0);
    }
    return PyLong_FromUnsignedLong(self->reader->sample_count);
}

static PyObject *
BinaryReader_get_sample_interval_us(BinaryReaderObject *self, void *closure)
{
    if (!self->reader) {
        return PyLong_FromLong(0);
    }
    return PyLong_FromUnsignedLongLong(self->reader->sample_interval_us);
}

static PyGetSetDef BinaryReader_getset[] = {
    {"sample_count", (getter)BinaryReader_get_sample_count, NULL, "Number of samples in file", NULL},
    {"sample_interval_us", (getter)BinaryReader_get_sample_interval_us, NULL, "Sample interval in microseconds", NULL},
    {NULL}
};

static PyMethodDef BinaryReader_methods[] = {
    _REMOTE_DEBUGGING_BINARYREADER_REPLAY_METHODDEF
    _REMOTE_DEBUGGING_BINARYREADER_GET_INFO_METHODDEF
    _REMOTE_DEBUGGING_BINARYREADER_GET_STATS_METHODDEF
    _REMOTE_DEBUGGING_BINARYREADER_CLOSE_METHODDEF
    _REMOTE_DEBUGGING_BINARYREADER___ENTER___METHODDEF
    _REMOTE_DEBUGGING_BINARYREADER___EXIT___METHODDEF
    {NULL, NULL, 0, NULL}
};

static void
BinaryReader_dealloc(PyObject *op)
{
    BinaryReaderObject *self = BinaryReader_CAST(op);
    PyTypeObject *tp = Py_TYPE(self);
    if (self->reader) {
        binary_reader_close(self->reader);
    }
    tp->tp_free(self);
    Py_DECREF(tp);
}

static PyType_Slot BinaryReader_slots[] = {
    {Py_tp_getset, BinaryReader_getset},
    {Py_tp_methods, BinaryReader_methods},
    {Py_tp_init, _remote_debugging_BinaryReader___init__},
    {Py_tp_dealloc, BinaryReader_dealloc},
    {0, NULL}
};

static PyType_Spec BinaryReader_spec = {
    .name = "_remote_debugging.BinaryReader",
    .basicsize = sizeof(BinaryReaderObject),
    .flags = (
        Py_TPFLAGS_DEFAULT
        | Py_TPFLAGS_IMMUTABLETYPE
    ),
    .slots = BinaryReader_slots,
};

/* ============================================================================
 * MODULE METHODS
 * ============================================================================ */

/*[clinic input]
_remote_debugging.zstd_available

Check if zstd compression is available.

Returns:
    True if zstd available, False otherwise
[clinic start generated code]*/

static PyObject *
_remote_debugging_zstd_available_impl(PyObject *module)
/*[clinic end generated code: output=55e35a70ef280cdd input=a1b4d41bc09c7cf9]*/
{
    return PyBool_FromLong(binary_io_zstd_available());
}

/* ============================================================================
 * MODULE-LEVEL FUNCTIONS
 * ============================================================================ */

/*[clinic input]
@permit_long_docstring_body
_remote_debugging.get_child_pids

    pid: int
        Process ID of the parent process
    *
    recursive: bool = True
        If True, return all descendants (children, grandchildren, etc.).
        If False, return only direct children.

Get all child process IDs of the given process.

Returns a list of child process IDs. Returns an empty list if no children
are found.

This function provides a snapshot of child processes at a moment in time.
Child processes may exit or new ones may be created after the list is returned.

Raises:
    OSError: If unable to enumerate processes
    NotImplementedError: If not supported on this platform
[clinic start generated code]*/

static PyObject *
_remote_debugging_get_child_pids_impl(PyObject *module, int pid,
                                      int recursive)
/*[clinic end generated code: output=1ae2289c6b953e4b input=19d8d5d6e2b59e6e]*/
{
    return enumerate_child_pids((pid_t)pid, recursive);
}

/*[clinic input]
_remote_debugging.is_python_process

    pid: int

Check if a process is a Python process.
[clinic start generated code]*/

static PyObject *
_remote_debugging_is_python_process_impl(PyObject *module, int pid)
/*[clinic end generated code: output=22947dc8afcac362 input=13488e28c7295d84]*/
{
    proc_handle_t handle;

    if (_Py_RemoteDebug_InitProcHandle(&handle, pid) < 0) {
        PyErr_Clear();
        Py_RETURN_FALSE;
    }

    uintptr_t runtime_start_address = _Py_RemoteDebug_GetPyRuntimeAddress(&handle);
    _Py_RemoteDebug_CleanupProcHandle(&handle);

    if (runtime_start_address == 0) {
        PyErr_Clear();
        Py_RETURN_FALSE;
    }

    Py_RETURN_TRUE;
}

static PyMethodDef remote_debugging_methods[] = {
    _REMOTE_DEBUGGING_ZSTD_AVAILABLE_METHODDEF
    _REMOTE_DEBUGGING_GET_CHILD_PIDS_METHODDEF
    _REMOTE_DEBUGGING_IS_PYTHON_PROCESS_METHODDEF
    {NULL, NULL, 0, NULL},
};

static PyModuleDef_Slot remote_debugging_slots[] = {
    {Py_mod_exec, _remote_debugging_exec},
    {Py_mod_multiple_interpreters, Py_MOD_PER_INTERPRETER_GIL_SUPPORTED},
    {Py_mod_gil, Py_MOD_GIL_NOT_USED},
    {0, NULL},
};

static struct PyModuleDef remote_debugging_module = {
    PyModuleDef_HEAD_INIT,
    .m_name = "_remote_debugging",
    .m_size = sizeof(RemoteDebuggingState),
    .m_methods = remote_debugging_methods,
    .m_slots = remote_debugging_slots,
    .m_traverse = remote_debugging_traverse,
    .m_clear = remote_debugging_clear,
    .m_free = remote_debugging_free,
};

PyMODINIT_FUNC
PyInit__remote_debugging(void)
{
    return PyModuleDef_Init(&remote_debugging_module);
}
