blob: 0d76ea62492499272b29f9bae3ae65817d702515 [file] [log] [blame]
# -*- coding: utf-8 -*-
#
# Copyright (c) 2019, Memfault
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code or in binary form must reproduce
# the above copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided with the
# distribution.
#
# 2. Neither the name of Memfault nor the names of its contributors may be
# used to endorse or promote products derived from this software without
# specific prior written permission.
#
# 3. This software, with or without modification, must only be used with
# the Memfault services and integrated with the Memfault server.
#
# 4. Any software provided in binary form under this license must not be
# reverse engineered, decompiled, modified and/or disassembled.
#
# THIS SOFTWARE IS PROVIDED BY MEMFAULT "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
# INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY,
# NONINFRINGEMENT, AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
# SHALL MEMFAULT OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
# OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
# WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
# THE POSSIBILITY OF SUCH DAMAGE.
import argparse
import os
import platform
import re
import sys
import traceback
import uuid
from binascii import b2a_base64
from hashlib import md5
from json import dump, dumps, load, loads
from os.path import expanduser
from struct import pack, unpack
from tempfile import TemporaryFile
from threading import Thread
from time import sleep, time
try:
import gdb
except ImportError:
error_str = """
This script can only be run within gdb!
"""
raise ImportError(error_str) # (no raise-from in Python 2.7)
# Note: not using `requests` but using the built-in http.client instead, so
# there will be no additional dependencies other than Python itself.
try:
from httplib import HTTPConnection, HTTPSConnection
from Queue import Queue
from urlparse import urlparse, urlunparse
except ImportError:
from http.client import HTTPConnection, HTTPSConnection
from queue import Queue
from urllib.parse import urlparse, urlunparse
MEMFAULT_DEFAULT_INGRESS_BASE_URI = "https://ingress.memfault.com"
MEMFAULT_DEFAULT_CHUNKS_BASE_URI = "https://chunks.memfault.com"
MEMFAULT_DEFAULT_API_BASE_URI = "https://api.memfault.com"
try: # noqa: SIM105
# In Python 3.x, raw_input was renamed to input
# NOTE: Python 2.x also had an input() function which eval'd the input...!
input = raw_input
except NameError:
pass
class MemfaultConfig(object):
ingress_uri = MEMFAULT_DEFAULT_INGRESS_BASE_URI
api_uri = MEMFAULT_DEFAULT_API_BASE_URI
email = None
password = None
organization = None
project = None
user_id = None
# indirection so tests can mock this
prompt = input
# Added `json_path` and `input` as attributes on the config to aid unit testing:
json_path = expanduser("~/.memfault/gdb.json")
def can_make_project_api_request(self):
return self.email and self.password and self.project and self.organization
MEMFAULT_CONFIG = MemfaultConfig()
def register_value_to_bytes(gdb_scalar_value, little_endian=True):
"""
This helper is meant to be used with values that are not addressable, i.e. registers.
If you've got an addressable value, it's probably faster/better to use the Inferior.read_memory() API.
"""
# try to get the int representation of the value from gdb via py-value.c valpy_long/ valpy_int
#
# If that fails, fallback to parsing the string. Older versions of the GDB python API
# can't do int conversions on certain types such as pointers which some registers get
# defined as (i.e pc & sp)
try:
value_as_int = int(gdb_scalar_value)
except gdb.error:
# register string representation can look something like: "0x17d8 <__start>"
value_as_str = str(gdb_scalar_value).split()[0]
value_as_int = int(value_as_str, 0)
# if the value we get back from gdb python is negative, use the signed representation instead
# of the unsigned representation
#
# We represent all registers in the kMfltCoredumpBlockType_CurrentRegisters section of the coredump as 32 bit values (MfltCortexMRegs) so try
# to convert to a 4 byte representation regardless of the width reported by the gdb-server
fmt = "i" if value_as_int < 0 else "I"
return pack(fmt, value_as_int)
def _get_register_value(reglist, name):
return unpack("<I", reglist[name])[0]
def _pc_in_vector_table(register_list, exception_number, analytics_props):
try:
# The VTOR is implemented on armv6m and up though on chips like the Cortex-M0,
# it will always read as 0
vtor, _ = _read_register(0xE000ED08)
curr_pc = _get_register_value(register_list, "pc")
exc_handler, _ = _read_register(0x0 + vtor + (exception_number * 4))
exc_handler &= ~0x1 # Clear thumb bit
return exc_handler == curr_pc # noqa: TRY300
except Exception:
analytics_props["pc_in_vtor_check_error"] = {"traceback": traceback.format_exc()}
return False
def check_and_patch_reglist_for_fault(register_list, analytics_props):
# Fault exceptions on armv6m, armv7m, & armv8m will fall between 2 and 10
exception_number = _get_register_value(register_list, "xpsr") & 0xF
analytics_props["exception_number"] = exception_number
if exception_number < 2 or exception_number > 10:
# Not in an exception so keep the register list we already have
return
# If we have faulted it's typically useful to get the backtrace for the code where the fault
# originated. The integrated memfault-firmware-sdk does this by installing into the fault
# handlers and capturing the necessary registers. For the try script, we'll try to apply the
# same logic
fault_start_prompt = """
We see you are trying out Memfault from a Fault handler. That's great!
For the best results and to mirror the behavior of our firmware SDK,
please run "memfault coredump" at exception entry before other code has run
in the exception handler
"""
gdb_how_to_fault_prompt = """
It's easy to halt at exception entry by installing a breakpoint from gdb.
For example,
(gdb) breakpoint HardFault_Handler
"""
exc_return = _get_register_value(register_list, "lr")
if exc_return >> 28 != 0xF:
print("{} {}".format(fault_start_prompt, gdb_how_to_fault_prompt))
raise RuntimeError("LR no longer set to EXC_RETURN value")
# DCRS - armv8m only - only relevant when chaining secure and non-secure exceptions
# so pretty unlikely to be hit in a try test scenario
if exc_return & (1 << 5) == 0:
raise RuntimeError("DCRS exception unwinding unimplemented")
if not _pc_in_vector_table(register_list, exception_number, analytics_props):
analytics_props["displayed_fault_prompt"] = True
# The pc is not at the start of the exception handler. Some firmware implementations
# will redirect the vector table to a software vector table. If that's the case, it's
# hard to detect programmatically, let's check in with the user
y = MEMFAULT_CONFIG.prompt(
"{}\nAre you currently at the start of an exception handler [y/n]?".format(
fault_start_prompt
)
)
if "Y" not in y.upper():
print(gdb_how_to_fault_prompt)
raise RuntimeError("User did not confirm being at beginning of exception")
else:
analytics_props["displayed_fault_prompt"] = False
sp_name = "psp" if (exc_return & 0x4 != 0) else "msp"
sp = _get_register_value(register_list, sp_name)
# restore the register state prior to exception entry so we get an unwind
# from where the exception actually occurred
exception_frame = ("r0", "r1", "r2", "r3", "r12", "lr", "pc", "xpsr")
for idx, r in enumerate(exception_frame):
_, data = _read_register(sp + idx * 4)
register_list[r] = data
orig_sp_offset = 0x68 if (exc_return & (1 << 4) == 0) else 0x20
if _get_register_value(register_list, "xpsr") & (1 << 9) != 0:
orig_sp_offset += 0x4
register_list["sp"] = pack("<I", sp + orig_sp_offset)
def is_debug_info_section(section):
return section.name.startswith(".debug_")
def should_capture_section(section):
if section.size == 0:
return False
# Assuming we never want to grab .text:
if section.name == ".text":
return False
# Assuming we never want to grab debug info:
if is_debug_info_section(section):
return False
# Sometimes these get flagged incorrectly as READONLY:
if any(filter(lambda n: n in section.name, (".heap", ".bss", ".data", ".stack"))):
return True
# Only grab non-readonly stuff:
return not section.read_only
class CoredumpArch(object):
pass
class XtensaCoredumpArch(CoredumpArch):
MACHINE_TYPE = 94 # XTENSA
@property
def num_cores(self):
return 2
@property
def register_collection_list(self):
# The order in which we collect XTENSA registers in a crash
# fmt: off
return (
# We use the first word to convey what registers are collected in the coredump
"collection_type",
# Actual registers
"pc", "ps", "ar0", "ar1", "ar2", "ar3", "ar4", "ar5", "ar6", "ar7", "ar8", "ar9",
"ar10", "ar11", "ar12", "ar13", "ar14", "ar15", "ar16", "ar17", "ar18", "ar19", "ar20",
"ar21", "ar22", "ar23", "ar24", "ar25", "ar26", "ar27", "ar28", "ar29", "ar30", "ar31",
"ar32", "ar33", "ar34", "ar35", "ar36", "ar37", "ar38", "ar39", "ar40", "ar41", "ar42",
"ar43", "ar44", "ar45", "ar46", "ar47", "ar48", "ar49", "ar50", "ar51", "ar52", "ar53",
"ar54", "ar55", "ar56", "ar57", "ar58", "ar59", "ar60", "ar61", "ar62", "ar63", "sar",
# Note: Only enabled for xtensa targets which define XCHAL_HAVE_LOOPS
"lbeg",
"lend",
"lcount",
# Special registers to collect
"windowbase",
"windowstart",
)
# fmt: on
@property
def alternative_register_name_dict(self):
return {}
def add_platform_specific_sections(self, cd_writer, inferior, analytics_props):
pass
def guess_ram_regions(self, elf_sections):
# TODO: support esp32-s2 & esp8266
# For now we just use the memory map from the esp32 for xtensa
# Memory map:
# https://github.com/espressif/esp-idf/blob/v3.3.1/components/soc/esp32/include/soc/soc.h#L286-L304
return (
# SOC_DRAM
(0x3FFAE000, 0x52000),
# SOC_RTC_DATA
(0x50000000, 0x2000),
# SOC_RTC_DRAM
(0x3FF80000, 0x2000),
)
def _read_registers(self, core, gdb_thread, analytics_props):
# NOTE: The only way I could figure out to read raw registers for CPU1 was
# to send the raw gdb command,"g" after explicitly setting the active core
gdb.execute("mon set_core {}".format(core))
registers = gdb.execute("maintenance packet g", to_string=True)
registers = registers.split("received: ")[1].replace('"', "")
# The order GDB sends xtensa registers (for esp32) in:
# https://github.com/espressif/xtensa-overlays/blob/master/xtensa_esp32/gdb/gdb/regformats/reg-xtensa.dat#L3-L107
# fmt: off
xtensa_gdb_idx_regs = (
"pc", "ar0", "ar1", "ar2", "ar3", "ar4", "ar5", "ar6", "ar7",
"ar8", "ar9", "ar10", "ar11", "ar12", "ar13", "ar14", "ar15", "ar16", "ar17", "ar18",
"ar19", "ar20", "ar21", "ar22", "ar23", "ar24", "ar25", "ar26", "ar27", "ar28", "ar29",
"ar30", "ar31", "ar32", "ar33", "ar34", "ar35", "ar36", "ar37", "ar38", "ar39", "ar40",
"ar41", "ar42", "ar43", "ar44", "ar45", "ar46", "ar47", "ar48", "ar49", "ar50", "ar51",
"ar52", "ar53", "ar54", "ar55", "ar56", "ar57", "ar58", "ar59", "ar60", "ar61", "ar62",
"ar63", "lbeg", "lend", "lcount", "sar", "windowbase", "windowstart", "configid0",
"configid1", "ps", "threadptr", "br", "scompare1", "acclo", "acchi", "m0", "m1", "m2",
"m3", "expstate", "f64r_lo", "f64r_hi", "f64s", "f0", "f1", "f2", "f3", "f4", "f5",
"f6", "f7", "f8", "f9", "f10", "f11", "f12", "f13", "f14", "f15", "fcr", "fsr",
)
# fmt: on
# Scoop up all register values
vals = []
for i in range(len(xtensa_gdb_idx_regs)):
start_idx = i * 8
hexstr = registers[start_idx : start_idx + 8]
vals.append(bytearray.fromhex(hexstr))
register_list = {}
for register_name in self.register_collection_list:
# A "special" value we use to convey what ESP32 registers were collected
if register_name == "collection_type":
# A value of 0 means we collected the full set of windows. When a debugger is
# active, some register windows may not have been spilled to the stack so we need
# all the windows.
register_list[register_name] = bytearray.fromhex("00000000")
continue
idx = xtensa_gdb_idx_regs.index(register_name)
register_list[register_name] = vals[idx]
return register_list
def get_current_registers(self, gdb_thread, analytics_props):
result = []
try:
for core_id in range(self.num_cores):
result.append(self._read_registers(core_id, gdb_thread, analytics_props))
except Exception:
analytics_props["core_reg_collection_error"] = {"traceback": traceback.format_exc()}
return result
class ArmCortexMCoredumpArch(CoredumpArch):
MACHINE_TYPE = 40 # ARM
@property
def num_cores(self):
return 1
@property
def register_collection_list(self):
# The order in which we collect ARM registers in a crash
return (
"r0",
"r1",
"r2",
"r3",
"r4",
"r5",
"r6",
"r7",
"r8",
"r9",
"r10",
"r11",
"r12",
"sp",
"lr",
"pc", # 15
"xpsr",
"msp",
"psp",
"primask",
"control",
)
@property
def alternative_register_name_dict(self):
# GDB allows the remote server to provide register names via the target description which can
# be exchanged between client and server:
# https://sourceware.org/gdb/onlinedocs/gdb/Target-Description-Format.html#Target-Description-Format
#
# Different implementations of the gdb server use different names for these custom register sets
# This dictionary holds known mappings between the names we will use and the ones in different GDBDebugContextFacade
# server implementations
#
# NOTE: If the only difference is capitalization, that will be automagically resolved below
return {"xpsr": ("cpsr",)}
@staticmethod
def _try_collect_mpu_settings():
cpuid = 0xE000ED00
reg_val, _ = _read_register(cpuid)
partno = (reg_val >> 4) & 0xFFF
cortex_m_cpuids = {
0xC20: "M0",
0xC21: "M1",
0xC23: "M3",
0xC24: "M4",
0xC27: "M7",
0xC60: "M0+",
}
if partno not in cortex_m_cpuids:
return None
print("Cortex-{} detected".format(cortex_m_cpuids[partno]))
mpu_type = 0xE000ED90
mpu_ctrl = 0xE000ED94
mpu_rnr = 0xE000ED98
mpu_rbar = 0xE000ED9C
mpu_rasr = 0xE000EDA0
result = b""
mpu_type, mpu_type_data = _read_register(mpu_type)
result += mpu_type_data
mpu_ctrl, mpu_ctrl_data = _read_register(mpu_ctrl)
result += mpu_ctrl_data
num_regions = (mpu_type >> 8) & 0xFF
for i in range(num_regions):
_write_register(mpu_rnr, i)
_, data = _read_register(mpu_rbar)
result += data
_, data = _read_register(mpu_rasr)
result += data
return result
def add_platform_specific_sections(self, cd_writer, inferior, analytics_props):
mem_mapped_regs = [
("ictr", "Interrupt Controller Type Register", 0xE000E004, 0xE000E008),
("systick", "ARMv7-M System Timer", 0xE000E010, 0xE000E020),
("scb", "ARMv7-M System Control Block", 0xE000ED00, 0xE000ED8F),
("scs_debug", "ARMv7-M SCS Debug Registers", 0xE000EDFC, 0xE000EE00),
("nvic", "ARMv7-M External Interrupt Controller", 0xE000E100, 0xE000E600),
]
for mem_mapped_reg in mem_mapped_regs:
try:
short_name, desc, base, top = mem_mapped_reg
section = Section(base, top - base, desc)
section.data = inferior.read_memory(section.addr, section.size)
cd_writer.add_section(section)
analytics_props["{}_ok".format(short_name)] = True
except Exception:
analytics_props["{}_collection_error".format(short_name)] = {
"traceback": traceback.format_exc()
}
try:
cd_writer.armv67_mpu = self._try_collect_mpu_settings()
print("Collected MPU config")
except Exception:
analytics_props["mpu_collection_error"] = {"traceback": traceback.format_exc()}
@staticmethod
def _merge_memory_regions(regions):
"""Take a set of memory regions and merge any overlapping regions"""
regions.sort(key=lambda x: x.addr) # Sort the regions based on starting address
merged_regions = []
for region in regions:
if not merged_regions:
merged_regions.append(region)
else:
prev_region = merged_regions[-1]
if region.addr <= prev_region.addr + prev_region.size:
prev_region.size = max(
prev_region.size, region.addr + region.size - prev_region.addr
)
else:
merged_regions.append(region)
return merged_regions
def guess_ram_regions(self, elf_sections):
capturable_elf_sections = list(filter(should_capture_section, elf_sections))
def _is_ram(base_addr):
# See Table B3-1 ARMv7-M address map in "ARMv7-M Architecture Reference Manual"
return (base_addr & 0xF0000000) in (0x20000000, 0x30000000, 0x60000000, 0x80000000)
capture_size = 1024 * 1024 # Capture up to 1MB per section
for section in capturable_elf_sections:
section.size = capture_size
# merge any overlapping sections to make the core a little more efficient
capturable_elf_sections = self._merge_memory_regions(capturable_elf_sections)
filtered_addrs = set(filter(_is_ram, (s.addr for s in capturable_elf_sections)))
# Capture up to 1MB for each region
return [(addr, capture_size) for addr in filtered_addrs]
def get_current_registers(self, gdb_thread, analytics_props):
gdb_thread.switch()
# GDB Doesn't have a convenient way to know all of the registers in Python, so this is the
# best way. Call this, rip out the first element in each row...that's the register name
#
# NOTE: Using the 'all-registers' command below, because on some
# versions of gdb "msp, psp, etc" are not considered part of them core
# set. This will also dump all the fpu registers which we don't collect
# but thats fine. 'info all-registers' is the preferred command, where
# 'info reg [all]' is arch-specific, see:
# https://sourceware.org/gdb/onlinedocs/gdb/Registers.html
try:
info_reg_all_list = gdb.execute("info all-registers", to_string=True)
except gdb.error:
# Some versions of gdb don't support 'all' and return an error, fall
# back to 'info reg'
info_reg_all_list = gdb.execute("info reg", to_string=True)
return (lookup_registers_from_list(self, info_reg_all_list, analytics_props),)
# TODO: De-duplicate with code from rtos_register_stacking.py
def concat_registers_dict_to_bytes(arch, regs):
result = b""
for reg_name in arch.register_collection_list:
assert reg_name.lower() == reg_name
if reg_name not in regs:
result += b"\x00\x00\x00\x00"
continue
result += regs[reg_name]
return result
def _is_expected_reg(arch, reg_name):
return reg_name in arch.register_collection_list
def _add_reg_collection_error_analytic(arch, analytics_props, reg_name, error):
if not _is_expected_reg(arch, reg_name):
return
reg_collection_error = "reg_collection_error"
if reg_collection_error not in analytics_props:
analytics_props[reg_collection_error] = {}
analytics_props[reg_collection_error][reg_name] = error
def _try_read_register(arch, frame, lookup_name, register_list, analytics_props, result_name=None):
# `info reg` will print all registers, even though they are not part of the core.
# If that's the case, doing frame.read_register() will raise a gdb.error.
try:
if hasattr(frame, "read_register"):
value = frame.read_register(lookup_name)
else:
# GCC <= 4.9 doesn't have the read_register API
value = gdb.parse_and_eval("${}".format(lookup_name))
value_str = str(value)
if value_str != "<unavailable>":
name_to_use = lookup_name if result_name is None else result_name
register_list[name_to_use] = register_value_to_bytes(value)
else:
_add_reg_collection_error_analytic(
arch, analytics_props, lookup_name, "<unavailable> value"
)
except Exception:
_add_reg_collection_error_analytic(
arch, analytics_props, lookup_name, traceback.format_exc()
)
def lookup_registers_from_list(arch, info_reg_all_list, analytics_props):
frame = gdb.newest_frame()
frame.select()
register_names = []
for reg_row in info_reg_all_list.strip().split("\n"):
name = reg_row.split()[0]
register_names.append(name)
def _search_list_for_alt_name(reg, found_registers):
# first see if it's just case getting in the way i.e 'CONTROL' instead of 'control'. We
# need to preserve case when we actually issue the read so the gdb API works correctly
for found_reg in found_registers:
if found_reg.lower() == reg:
return found_reg
alt_reg_names = arch.alternative_register_name_dict.get(reg, [])
for alt_reg_name in alt_reg_names:
if alt_reg_name in found_registers:
return alt_reg_name
return None
alt_reg_names = []
for expected_reg in arch.register_collection_list:
if expected_reg in register_names:
continue
alt_reg_name = _search_list_for_alt_name(expected_reg, register_names)
if alt_reg_name:
alt_reg_names.append((alt_reg_name, expected_reg))
continue
_add_reg_collection_error_analytic(
arch, analytics_props, expected_reg, "Not found in register set"
)
# Iterate over all register names and pull the value out of the frame
register_list = {}
# Remove register_names we don't care about before actually looking up values
register_names = filter(lambda n: _is_expected_reg(arch, n), register_names)
for reg_name in register_names:
_try_read_register(arch, frame, reg_name, register_list, analytics_props)
for lookup_reg_name, result_reg_name in alt_reg_names:
_try_read_register(
arch, frame, lookup_reg_name, register_list, analytics_props, result_reg_name
)
# if we can't patch the registers, we'll just fallback to the active state
try:
check_and_patch_reglist_for_fault(register_list, analytics_props)
except Exception:
analytics_props["fault_register_recover_error"] = {"traceback": traceback.format_exc()}
return register_list
# TODO: De-duplicate with code from core_convert.py
MEMFAULT_COREDUMP_MAGIC = 0x45524F43
MEMFAULT_COREDUMP_VERSION = 1
MEMFAULT_COREDUMP_FILE_HEADER_FMT = "<III" # magic, version, file length (incl. file header)
MEMFAULT_COREDUMP_BLOCK_HEADER_FMT = "<bxxxII" # type, address, block payload length
class MemfaultCoredumpBlockType(object): # (IntEnum): # trying to be python2.7 compatible
CURRENT_REGISTERS = 0
MEMORY_REGION = 1
DEVICE_SERIAL = 2
FIRMWARE_VERSION = 3
HARDWARE_REVISION = 4
TRACE_REASON = 5
PADDING_REGION = 6
MACHINE_TYPE = 7
VENDOR_COREDUMP_ESP_IDF_V2_TO_V3_1 = 8
ARM_V7M_MPU = 9
SOFTWARE_VERSION = 10
SOFTWARE_TYPE = 11
class MemfaultCoredumpWriter(object):
def __init__(self, arch, device_serial, software_type, software_version, hardware_revision):
self.device_serial = device_serial
self.software_version = software_version
self.software_type = software_type
self.hardware_revision = hardware_revision
self.trace_reason = 5 # Debugger Halted
self.regs = {}
self.sections = []
self.armv67_mpu = None
self.arch = arch
def add_section(self, section):
self.sections.append(section)
def _write(self, write, file_length=0):
# file header:
write(
pack(
MEMFAULT_COREDUMP_FILE_HEADER_FMT,
MEMFAULT_COREDUMP_MAGIC,
MEMFAULT_COREDUMP_VERSION,
file_length,
)
)
def _write_block(type, payload, address=0): # noqa: A002
write(pack(MEMFAULT_COREDUMP_BLOCK_HEADER_FMT, type, address, len(payload)))
write(payload)
for core_regs in self.regs:
_write_block(
MemfaultCoredumpBlockType.CURRENT_REGISTERS,
concat_registers_dict_to_bytes(self.arch, core_regs),
)
_write_block(MemfaultCoredumpBlockType.DEVICE_SERIAL, self.device_serial.encode("utf8"))
_write_block(
MemfaultCoredumpBlockType.SOFTWARE_VERSION, self.software_version.encode("utf8")
)
_write_block(MemfaultCoredumpBlockType.SOFTWARE_TYPE, self.software_type.encode("utf8"))
_write_block(
MemfaultCoredumpBlockType.HARDWARE_REVISION, self.hardware_revision.encode("utf8")
)
_write_block(MemfaultCoredumpBlockType.MACHINE_TYPE, pack("<I", self.arch.MACHINE_TYPE))
_write_block(MemfaultCoredumpBlockType.TRACE_REASON, pack("<I", self.trace_reason))
# Record the time in a fake memory region. By doing this we guarantee a unique coredump
# will be generated each time "memfault coredump" is run. This makes it easier to discover
# the de-duplication logic get run by the Memfault backend
time_data = pack("<I", int(time()))
_write_block(MemfaultCoredumpBlockType.MEMORY_REGION, time_data, 0xFFFFFFFC)
for section in self.sections:
_write_block(MemfaultCoredumpBlockType.MEMORY_REGION, section.data, section.addr)
if self.armv67_mpu:
_write_block(MemfaultCoredumpBlockType.ARM_V7M_MPU, self.armv67_mpu)
def write(self, out_f):
# Count the total size first:
total_size = {"size": 0}
def _counting_write(data):
# nonlocal total_size # Not python 2.x compatible :(
# total_size += len(data)
total_size["size"] = total_size["size"] + len(data)
self._write(_counting_write)
# Actually write out to the file:
self._write(out_f.write, total_size["size"])
class Section(object): # noqa: PLW1641
def __init__(self, addr, size, name, read_only=True):
self.addr = addr
self.size = size
self.name = name
self.read_only = read_only
self.data = b""
def __eq__(self, other):
return (
self.addr == other.addr
and self.size == other.size
and self.name == other.name
and self.read_only == other.read_only
and self.data == other.data
)
def __str__(self):
return "Section(addr=0x%x, size=0x%x, name='%s', read_only=%s, data_len=%d)" % (
self.addr,
self.size,
self.name,
self.read_only,
len(self.data),
)
def parse_maintenance_info_sections(output):
fn_match = re.search(r"`([^']+)', file type", output)
if fn_match is None:
return None, None
# Using groups() here instead of fn_match[1] for python2.x compatibility
fn = fn_match.groups()[0]
# Grab start addr, end addr, name and flags for each section line:
# [2] 0x6b784->0x6b7a8 at 0x0004b784: .gnu_build_id ALLOC LOAD READONLY DATA HAS_CONTENTS
section_matches = re.findall(
r"\s+\[\d+\]\s+(0x[\da-fA-F]+)[^0]+(0x[\da-fA-F]+)[^:]+: ([^ ]+) (.*)$",
output,
re.MULTILINE,
)
def _tuple_to_section(tpl):
addr = int(tpl[0], base=16)
size = int(tpl[1], base=16) - addr
name = tpl[2]
read_only = "READONLY" in tpl[3]
return Section(addr, size, name, read_only)
sections = map(_tuple_to_section, section_matches)
return fn, list(sections)
def read_memory_until_error(inferior, start, size, read_size=4 * 1024):
data = b""
end = start + size
try:
for addr in range(start, end, read_size):
data += bytes(inferior.read_memory(addr, min(read_size, end - addr)))
except Exception as e:
# Catch gdbserver read exceptions -- not sure what exception classes can get raised here
print(e)
return data
def _create_http_connection(base_uri):
url = urlparse(base_uri)
if url.hostname is None:
raise RuntimeError("Invalid base URI, must be http(s)://hostname")
if url.scheme == "http":
conn_class = HTTPConnection
default_port = 80
else:
conn_class = HTTPSConnection
default_port = 443
port = url.port or default_port
return conn_class(url.hostname, port=port)
def _http(method, base_uri, path, headers=None, body=None):
if headers is None:
headers = {}
conn = _create_http_connection(base_uri)
# Convert to a string/bytes object so 'Content-Length' is set appropriately
# Python 2.7 uses this by default but 3.6 & up were using 'chunked'
if sys.version_info.major >= 3 and hasattr(body, "read"):
body = body.read()
conn.request(method, path, body=body, headers=headers)
response = conn.getresponse()
status = response.status
reason = response.reason
body = response.read()
try:
json_body = loads(body)
except Exception:
json_body = None
conn.close()
return status, reason, json_body
def add_basic_auth(user, password, headers=None):
headers = dict(headers) if headers else {}
headers["Authorization"] = "Basic {}".format(
b2a_base64("{}:{}".format(user, password).encode("utf8")).decode("ascii").strip()
)
return headers
class HttpApiError(Exception):
def __init__(self, status, reason):
super(Exception, self).__init__("{} (HTTP {})".format(reason, status))
def _check_http_response(status, reason):
if status < 200 or status >= 300:
raise HttpApiError(status, reason)
def _http_api(config, method, path, headers=None, body=None, should_raise=False):
headers = add_basic_auth(config.email, config.password, headers=headers)
status, reason, body = _http(method, config.api_uri, path, headers, body)
if should_raise:
_check_http_response(status, reason)
return status, reason, body
def http_post_coredump(coredump_file, project_key, ingress_uri):
headers = {"Content-Type": "application/octet-stream", "Memfault-Project-Key": project_key}
status, reason, _ = _http(
"POST", ingress_uri, "/api/v0/upload/coredump", headers=headers, body=coredump_file
)
return status, reason
def http_post_chunk(chunk_data_file, project_key, chunks_uri, device_serial):
headers = {"Content-Type": "application/octet-stream", "Memfault-Project-Key": project_key}
status, reason, _ = _http(
"POST",
chunks_uri,
"/api/v0/chunks/{}".format(device_serial),
headers=headers,
body=chunk_data_file,
)
return status, reason
def http_get_auth_me(api_uri, email, password):
headers = add_basic_auth(email, password)
return _http("GET", api_uri, "/auth/me", headers=headers)
def http_get_prepared_url(config):
_, _, body = _http_api(
config,
"POST",
"/api/v0/organizations/{organization}/projects/{project}/upload".format(
organization=config.organization,
project=config.project,
),
headers={"Accept": "application/json"},
should_raise=True,
)
data = body["data"]
return data["token"], data["upload_url"]
def http_upload_file(config, file_readable):
token, upload_url = http_get_prepared_url(config)
url_parts = urlparse(upload_url)
base_uri = urlunparse((url_parts[0], url_parts[1], "", "", "", ""))
path = urlunparse(("", "", url_parts[2], url_parts[3], url_parts[4], url_parts[5]))
status, reason, _ = _http(
"PUT",
base_uri,
path,
# NB: Prepared upload API does not expect Content-Type to be set so
# we need to exclude the Content-Type or set it to an empty string
headers={"Content-Type": ""},
body=file_readable,
)
_check_http_response(status, reason)
return token
def http_upload_symbol_file(config, artifact_readable, software_type, software_version):
token = http_upload_file(config, artifact_readable)
_http_api(
config,
"POST",
"/api/v0/organizations/{organization}/projects/{project}/symbols".format(
organization=config.organization,
project=config.project,
),
headers={"Content-Type": "application/json", "Accept": "application/json"},
body=dumps({
"file": {"token": token, "name": "symbols.elf"},
"software_version": {
"version": software_version,
"software_type": software_type,
},
}),
should_raise=True,
)
def http_get_software_version(config, software_type, software_version):
software_version_url = "/api/v0/organizations/{organization}/projects/{project}/software_types/{software_type}/software_versions/{software_version}".format(
organization=config.organization,
project=config.project,
software_type=software_type,
software_version=software_version,
)
status, _, body = _http_api(config, "GET", software_version_url)
if status < 200 or status >= 300:
return None
return body["data"]
def http_get_project_key(config):
status, reason, body = _http_api(
config,
"GET",
"/api/v0/organizations/{organization}/projects/{project}/api_key".format(
organization=config.organization, project=config.project
),
headers={"Accept": "application/json"},
)
if status < 200 or status >= 300:
return None, (status, reason)
return body["data"]["api_key"], None
def get_file_hash(fn):
with open(fn, "rb") as f:
return md5(f.read()).hexdigest()
def has_uploaded_symbols(config, software_type, software_version):
software_version_obj = http_get_software_version(config, software_type, software_version)
if not software_version_obj:
return False
symbol_file = software_version_obj.get("symbol_file")
if not symbol_file:
return False
return bool(symbol_file.get("downloadable"))
def upload_symbols_if_needed(config, elf_fn, software_type, software_version):
has_symbols = has_uploaded_symbols(config, software_type, software_version)
if has_symbols:
print("Symbols have already been uploaded, skipping!")
return
if not has_symbols:
print("Uploading symbols...")
with open(elf_fn, "rb") as elf_f:
try:
http_upload_symbol_file(config, elf_f, software_type, software_version)
# NOTE: upload is processed asynchronously. Give the symbol file
# a little time to be processed. In the future, we could poll here
# for completion
sleep(0.3)
print("Done!")
except HttpApiError as e:
print("Failed to upload symbols: {}".format(e))
# TODO: Duped from mflt.tools/gdb_memfault.py
class MemfaultGdbArgumentParseError(Exception):
pass
class MemfaultGdbArgumentParser(argparse.ArgumentParser):
def exit(self, status=0, message=None):
if message:
self._print_message(message)
# Don't call sys.exit()
raise MemfaultGdbArgumentParseError
def populate_config_args_and_parse_args(parser, unicode_args, config):
parser.add_argument(
"--email",
help="The username (email address) of the user to use",
default=MEMFAULT_CONFIG.email,
)
parser.add_argument(
"--password",
help="The user API key or password of the user to use",
default=MEMFAULT_CONFIG.password,
)
parser.add_argument(
"--organization",
"-o",
help="Default organization (slug) to use",
default=MEMFAULT_CONFIG.organization,
)
parser.add_argument(
"--project", "-p", help="Default project (slug) to use", default=MEMFAULT_CONFIG.project
)
parser.add_argument(
"--ingress-uri",
default=MEMFAULT_CONFIG.ingress_uri,
help="Default ingress base URI to use (default: {})".format(MEMFAULT_CONFIG.ingress_uri),
)
parser.add_argument(
"--api-uri",
help="Default API base URI to use (default: {})".format(MEMFAULT_CONFIG.api_uri),
default=MEMFAULT_CONFIG.api_uri,
)
args = list(filter(None, unicode_args.split(" ")))
parsed_args = parser.parse_args(args)
config.ingress_uri = parsed_args.ingress_uri
config.api_uri = parsed_args.api_uri
config.email = parsed_args.email
config.password = parsed_args.password
config.organization = parsed_args.organization
config.project = parsed_args.project
# If the user overrides the email, we need to re-auth to get the new user ID
if parsed_args.email == MEMFAULT_CONFIG.email:
config.user_id = MEMFAULT_CONFIG.user_id
else:
_, _, json_body = http_get_auth_me(config.api_uri, config.email, config.password)
config.user_id = json_body["id"]
return parsed_args
class MemfaultGdbCommand(gdb.Command):
GDB_CMD = "memfault override_me"
def __init__(self, *args, **kwargs):
super(MemfaultGdbCommand, self).__init__(self.GDB_CMD, gdb.COMMAND_USER, *args, **kwargs)
def invoke(self, arg, from_tty):
# Work-around for GDB-py not printing out the backtrace upon exceptions
analytics_props = {}
start_time = time()
try:
self._invoke(arg, from_tty, analytics_props, MEMFAULT_CONFIG)
except Exception:
print(traceback.format_exc())
ANALYTICS.track("Exception", {"traceback": traceback.format_exc(), "args": arg})
raise # Important, otherwise unit test failures may go undetected!
analytics_props["duration_ms"] = int((time() - start_time) * 1000)
ANALYTICS.track("Command {}".format(self.GDB_CMD), analytics_props, MEMFAULT_CONFIG.user_id)
def _invoke(self, unicode_args, from_tty, analytics_props, config):
pass
class Memfault(MemfaultGdbCommand):
"""Memfault GDB commands"""
GDB_CMD = "memfault"
def __init__(self):
super(Memfault, self).__init__(gdb.COMPLETE_NONE, prefix=True)
def _invoke(self, unicode_args, from_tty, analytics_props, config):
gdb.execute("help memfault")
def settings_load():
try:
with open(MEMFAULT_CONFIG.json_path, "rb") as f:
return load(f)
except Exception:
return {}
def settings_save(settings):
try: # noqa: SIM105
# exist_ok does not exist yet in Python 2.7!
os.makedirs(os.path.dirname(MEMFAULT_CONFIG.json_path))
except OSError:
pass
with open(MEMFAULT_CONFIG.json_path, "w") as f:
dump(settings, f, sort_keys=True)
def _infer_issues_html_url(ingress_uri, config):
if "dev." in ingress_uri or "localhost" in ingress_uri:
html_base_uri = ingress_uri
elif "ingress.try" in ingress_uri:
html_base_uri = ingress_uri.replace("ingress.try.", "try.")
elif "ingress." in ingress_uri:
html_base_uri = ingress_uri.replace("ingress.", "app.")
else:
return None
return "{}/organizations/{}/projects/{}/issues?live".format(
html_base_uri, config.organization, config.project
)
def _read_register(address):
reg_data = gdb.selected_inferior().read_memory(address, 4)
reg_val = unpack("<I", reg_data)[0]
return reg_val, bytes(reg_data)
def _write_register(address, value):
reg_val = pack("<I", value)
gdb.selected_inferior().write_memory(address, reg_val)
def _post_chunk_data(chunk_data, **kwargs):
with TemporaryFile() as f_out:
f_out.write(chunk_data)
f_out.seek(0)
return http_post_chunk(f_out, **kwargs)
class GdbMemfaultPostChunkBreakpoint(gdb.Breakpoint):
def __init__(self, project_key, chunks_uri, verbose, device_serial, *args, **kwargs):
gdb.Breakpoint.__init__(self, *args, **kwargs)
self.project_key = project_key
self.chunks_uri = chunks_uri
self.verbose = verbose
self.chunk_buf_size_tuple = None
self.device_serial = device_serial
print("Memfault GDB Chunk Handler Installed")
def _determine_param_names(self):
if self.chunk_buf_size_tuple is not None:
return self.chunk_buf_size_tuple
frame = gdb.newest_frame()
chunk_buf_param = None
chunk_buf_size_param = None
for symbol in frame.block():
if not symbol.is_argument:
continue
symbol_type = symbol.type.strip_typedefs().code
if symbol_type == gdb.TYPE_CODE_PTR:
chunk_buf_param = symbol.name
if symbol_type == gdb.TYPE_CODE_INT:
chunk_buf_size_param = symbol.name
if chunk_buf_param is None or chunk_buf_size_param is None:
return None
print(
"Params Found! Chunk buffer: '{}' Length: '{}'".format(
chunk_buf_param, chunk_buf_size_param
)
)
self.chunk_buf_size_tuple = (chunk_buf_param, chunk_buf_size_param)
return self.chunk_buf_size_tuple
def stop(self):
params = self._determine_param_names()
if params is None:
print("""ERROR: Could not determine names of parameters holding chunk buffer and size
Disabling Memfault GDB Chunk Handler & Halting
""")
self.enabled = False
return True
buf_param, buf_size_param = params
# NB: Cast packet_buffer pointer to a uint32_t because older versions of GDB don't perform
# the address conversion correctly
packet_buffer_addr = int(gdb.parse_and_eval("(uint32_t){}".format(buf_param)))
packet_buffer_len = gdb.parse_and_eval(buf_size_param)
chunk_data = gdb.selected_inferior().read_memory(packet_buffer_addr, packet_buffer_len)
status, reason = _post_chunk_data(
chunk_data,
project_key=self.project_key,
chunks_uri=self.chunks_uri,
device_serial=self.device_serial,
)
if status != 202:
print("Chunk Post Failed with Http Status Code {}".format(status))
print("Reason: {}".format(reason))
print("ERROR: Disabling Memfault GDB Chunk Handler and Halting")
self.enabled = False
return True
if self.verbose:
print("Successfully posted {} bytes of chunk data".format(len(chunk_data)))
return False
class MemfaultPostChunk(MemfaultGdbCommand):
"""Installs a hook which sends chunks to the Memfault cloud chunk endpoint automagically.
See https://mflt.io/posting-chunks-with-gdb for more details.
"""
GDB_CMD = "memfault install_chunk_handler"
USER_TRANSPORT_SEND_CHUNK_HANDLER = [ # noqa: RUF012
"memfault_data_export_chunk",
"user_transport_send_chunk_data",
]
DEFAULT_CHUNK_DEVICE_SERIAL = "GDB_TESTSERIAL"
FILENAME_FROM_INFO_FUNC_PATTERN = re.compile(r"File\s+([^\s]+):")
def _invoke(self, unicode_args, from_tty, analytics_props, config):
try:
parsed_args = self.parse_args(unicode_args)
except MemfaultGdbArgumentParseError:
return
if isinstance(parsed_args.chunk_handler_func, list):
chunk_handler_funcs = parsed_args.chunk_handler_func
else:
chunk_handler_funcs = [parsed_args.chunk_handler_func]
for chunk_handler_func in chunk_handler_funcs:
output = gdb.execute("info functions {}$".format(chunk_handler_func), to_string=True)
lines = output.splitlines()[1:]
if len(lines) != 0:
break
else:
print(
"""Chunk handler function '{}' does not exist.
A custom handler location can be specified with the --chunk-handler-func argument.
""".format(chunk_handler_func)
)
return
# It's possible to have multiple function declarations when the default weak symbol in
# components/demo is overridden. This is because the gnu linker does not edit the DWARF DIE
# generated by a compilation unit (https://sourceware.org/bugzilla/show_bug.cgi?id=25561)
# To workaround this limitation, we'll use the <file>:<function> syntax to ensure the
# breakpoint is set for the correct function.
chunk_handler_func_filename = None
for line in lines:
file_match = re.search(self.FILENAME_FROM_INFO_FUNC_PATTERN, line)
if not file_match:
continue
filename = file_match.groups()[0]
if chunk_handler_func_filename is None or "components/demo" not in filename:
chunk_handler_func_filename = filename
spec_prefix = (
""
if chunk_handler_func_filename is None
else "'{}':".format(chunk_handler_func_filename)
)
# We always delete any pre-existing breakpoints on the function. This way
# a re-execution of the command can always be used to re-init the setup
#
# NB: Wrapped in a try catch because older versions of gdb.breakpoints() eventually return None
# instead of raising StopIteration
try:
for breakpoint in gdb.breakpoints(): # noqa: A001
if chunk_handler_func in breakpoint.location:
print("Deleting breakpoint for '{}'".format(chunk_handler_func))
breakpoint.delete()
except TypeError:
pass
print("Installing Memfault GDB Chunk Handler for function '{}'".format(chunk_handler_func))
GdbMemfaultPostChunkBreakpoint(
spec=spec_prefix + chunk_handler_func,
project_key=parsed_args.project_key,
chunks_uri=parsed_args.chunks_uri,
verbose=parsed_args.verbose,
device_serial=parsed_args.device_serial,
internal=True,
)
def parse_args(self, unicode_args):
parser = MemfaultGdbArgumentParser(description=MemfaultPostChunk.__doc__)
parser.add_argument(
"--project-key", "-pk", help="Memfault Project Key", required=True, default=None
)
parser.add_argument(
"--chunk-handler-func",
"-ch",
help=(
"Name of function that handles sending Memfault Chunks."
"(default: search for one of {})".format(self.USER_TRANSPORT_SEND_CHUNK_HANDLER)
),
default=self.USER_TRANSPORT_SEND_CHUNK_HANDLER,
)
parser.add_argument(
"--chunks-uri",
help="Default chunks base URI to use (default: {})".format(
MEMFAULT_DEFAULT_CHUNKS_BASE_URI
),
default=MEMFAULT_DEFAULT_CHUNKS_BASE_URI,
)
parser.add_argument(
"--device-serial",
"-ds",
help="Device Serial to post chunks as (default: {})".format(
self.DEFAULT_CHUNK_DEVICE_SERIAL
),
default=self.DEFAULT_CHUNK_DEVICE_SERIAL,
)
parser.add_argument(
"--verbose",
help="Prints a summary every time a chunk is posted instead of just on failures",
action="store_true",
)
args = list(filter(None, unicode_args.split(" ")))
return parser.parse_args(args)
class MemfaultCoredump(MemfaultGdbCommand):
"""Captures a coredump from the target and uploads it to Memfault for analysis"""
ALPHANUM_SLUG_DOTS_COLON_REGEX = r"^[-a-zA-Z0-9_\.\+:]+$"
ALPHANUM_SLUG_DOTS_COLON_SPACES_PARENS_SLASH_COMMA_REGEX = r"^[-a-zA-Z0-9_\.\+: \(\)\[\]/,]+$"
DEFAULT_CORE_DUMP_HARDWARE_REVISION = "DEVBOARD"
DEFAULT_CORE_DUMP_SERIAL_NUMBER = "DEMOSERIALNUMBER"
DEFAULT_CORE_DUMP_SOFTWARE_TYPE = "main"
DEFAULT_CORE_DUMP_SOFTWARE_VERSION = "1.0.0"
GDB_CMD = "memfault coredump"
def _check_permission(self, analytics_props):
settings = settings_load()
key = "coredump.allow"
allowed = settings.get(key, False)
if allowed:
analytics_props["permission"] = "accepted-stored"
return True
y = MEMFAULT_CONFIG.prompt("""
You are about to capture a coredump from the attached target.
This means that memory contents of the target will be captured
and sent to Memfault's web server for analysis. The currently
loaded binary file (.elf) will be sent as well, because it
contains debugging information (symbols) that are needed for
Memfault's analysis to work correctly.
Memfault will never share your data, coredumps, binary files (.elf)
or other proprietary information with other companies or anyone else.
Proceed? [y/n]
""") # This last newline is important! If it's not here, the last line is not shown on Windows!
if "Y" not in y.upper():
print("Aborting...")
analytics_props["user_input"] = y
analytics_props["permission"] = "rejected"
return False
analytics_props["permission"] = "accepted"
settings[key] = True
settings_save(settings)
return True
def _invoke(self, unicode_args, from_tty, analytics_props, config):
if not self._check_permission(analytics_props):
return
try:
parsed_args = self.parse_args(unicode_args, config)
except MemfaultGdbArgumentParseError:
return
can_make_project_api_request = config.can_make_project_api_request()
analytics_props["has_project_key"] = bool(parsed_args.project_key)
analytics_props["can_make_project_api_request"] = bool(can_make_project_api_request)
analytics_props["regions"] = len(parsed_args.region) if parsed_args.region else 0
analytics_props["no_symbols"] = bool(parsed_args.no_symbols)
analytics_props["api_uri"] = config.api_uri
analytics_props["ingress_uri"] = config.ingress_uri
if not can_make_project_api_request and not parsed_args.project_key:
print(
"Cannot post coredump. Please specify either --project-key or"
" use `memfault login` and specify the project and organization.\n"
"See `memfault login --help` for more information"
)
ANALYTICS.error("Missing login or key")
return
cd_writer, elf_fn = self.build_coredump_writer(parsed_args, analytics_props)
if not cd_writer:
return
elf_hash = get_file_hash(elf_fn)
# TODO: try calling memfault_platform_get_device_info() and use returned info if present
# Populate the version based on hash of the .elf for now:
software_version = cd_writer.software_version + "-md5+{}".format(elf_hash[:8])
cd_writer.software_version = software_version
if not parsed_args.no_symbols:
if can_make_project_api_request:
upload_symbols_if_needed(config, elf_fn, cd_writer.software_type, software_version)
else:
print("Skipping symbols upload because not logged in. Hint: use `memfault login`")
if parsed_args.project_key:
project_key = parsed_args.project_key
else:
assert can_make_project_api_request
project_key, status_and_reason = http_get_project_key(config)
if not project_key:
error = "Failed to get Project Key: {}".format(status_and_reason)
analytics_props["error"] = error
ANALYTICS.error(error)
print(error)
return
with TemporaryFile() as f_out:
cd_writer.write(f_out)
f_out.seek(0)
status, reason = http_post_coredump(f_out, project_key, parsed_args.ingress_uri)
analytics_props["http_status"] = status
if status == 409:
print("Coredump already exists!")
elif status >= 200 and status < 300:
print("Coredump uploaded successfully!")
print("Once it has been processed, it will appear here:")
# TODO: Print direct link to trace
# MFLT-461
print(_infer_issues_html_url(parsed_args.ingress_uri, config))
else:
print("Error occurred... HTTP Status {} {}".format(status, reason))
def parse_args(self, unicode_args, config):
parser = MemfaultGdbArgumentParser(description=MemfaultCoredump.__doc__)
parser.add_argument(
"source",
help=(
"Source of coredump: 'live' (default) or 'storage' (target's live RAM or stored"
" coredump)"
),
default="live",
choices=["live", "storage"],
nargs="?",
)
parser.add_argument("--project-key", "-pk", help="Project Key", default=None)
parser.add_argument(
"--no-symbols",
"-ns",
help="Do not upload symbol (.elf) automatically if missing",
default=False,
action="store_true",
)
def _auto_int(x):
try:
return int(x, 0)
except ValueError:
return int(gdb.parse_and_eval(x))
parser.add_argument(
"--region",
"-r",
help=(
"Specify memory region and size in bytes to capture. This option can be passed"
" multiple times. Example: `-r 0x20000000 1024 0x80000000 512` will capture 1024"
" bytes starting at 0x20000000. When no regions are passed, the command will"
" attempt to infer what to capture based on the sections in the loaded .elf."
),
type=_auto_int,
nargs=2,
action="append",
)
def _character_check(regex, name):
def _check_inner(input):
pattern = re.compile(regex)
if not pattern.match(input):
raise argparse.ArgumentTypeError(
"Invalid characters in {}: {}.".format(name, input)
)
return input
return _check_inner
parser.add_argument(
"--device-serial",
type=_character_check(self.ALPHANUM_SLUG_DOTS_COLON_REGEX, "device serial"),
help=(
"Overrides the device serial that will be reported in the core dump. (default: {})".format(
self.DEFAULT_CORE_DUMP_SERIAL_NUMBER
)
),
default=self.DEFAULT_CORE_DUMP_SERIAL_NUMBER,
)
parser.add_argument(
"--software-type",
type=_character_check(self.ALPHANUM_SLUG_DOTS_COLON_REGEX, "software type"),
help=(
"Overrides the software type that will be reported in the core dump. (default: {})".format(
self.DEFAULT_CORE_DUMP_SOFTWARE_TYPE
)
),
default=self.DEFAULT_CORE_DUMP_SOFTWARE_TYPE,
)
parser.add_argument(
"--software-version",
type=_character_check(
self.ALPHANUM_SLUG_DOTS_COLON_SPACES_PARENS_SLASH_COMMA_REGEX, "software version"
),
help=(
"Overrides the software version that will be reported in the core dump."
" (default: {})".format(self.DEFAULT_CORE_DUMP_SOFTWARE_VERSION)
),
default=self.DEFAULT_CORE_DUMP_SOFTWARE_VERSION,
)
parser.add_argument(
"--hardware-revision",
type=_character_check(self.ALPHANUM_SLUG_DOTS_COLON_REGEX, "hardware revision"),
help=(
"Overrides the hardware revision that will be reported in the core dump."
" (default: {})".format(self.DEFAULT_CORE_DUMP_HARDWARE_REVISION)
),
default=self.DEFAULT_CORE_DUMP_HARDWARE_REVISION,
)
return populate_config_args_and_parse_args(parser, unicode_args, config)
@staticmethod
def _get_arch(current_arch, analytics_props):
if "arm" in current_arch:
return ArmCortexMCoredumpArch()
if "xtensa" in current_arch:
target = gdb.execute("monitor target current", to_string=True)
if "esp32" in target:
return XtensaCoredumpArch()
analytics_props["xtensa_target"] = target
return None
def build_coredump_writer(self, parsed_args, analytics_props):
# inferior.architecture() is a relatively new API, so let's use "show arch" instead:
show_arch_output = gdb.execute("show arch", to_string=True).lower()
current_arch_matches = re.search("currently ([^)]+)", show_arch_output)
if current_arch_matches:
# Using groups() here instead of fn_match[1] for python2.x compatibility
current_arch = current_arch_matches.groups()[0]
# Should tell us about different arm flavors:
analytics_props["arch"] = current_arch
arch = self._get_arch(current_arch, analytics_props)
if arch is None:
print("This command is currently only supported for ARM and XTENSA targets!")
analytics_props["error"] = "Unsupported architecture"
return None, None
else:
error = "show arch has unexpected output"
analytics_props["error"] = error
ANALYTICS.error(error, info=show_arch_output)
return None, None
inferior = gdb.selected_inferior()
# Make sure thread info has been requested before doing anything else:
gdb.execute("info threads", to_string=True)
threads = inferior.threads()
if not inferior:
print("No target!? Make sure to attach to a target first.")
analytics_props["error"] = "No target"
return None, None
if len(threads) == 0:
print("No target!? Make sure to attach to a target first.")
analytics_props["error"] = "No threads"
return None, None
print("One moment please, capturing memory...")
# [MT]: How to figure out which thread/context the CPU is actually in?
# JLink nor OpenOCD do not seem to be setting thread.is_running() correctly.
# print("Note: for correct results, do not switch threads!")
try:
thread = gdb.selected_thread()
except SystemError as e:
# Exception can be raised if selected thread has disappeared in the mean time
# SystemError: could not find gdb thread object
print(e)
thread = None
if not thread:
print("Did not find any threads!?")
analytics_props["error"] = "Failed to activate thread"
return None, None
info_sections_output = gdb.execute("maintenance info sections", to_string=True)
elf_fn, sections = parse_maintenance_info_sections(info_sections_output)
if elf_fn is None or sections is None:
print("""Could not find file and sections.
This command requires that you use the 'load' command to load a binary/symbol (.elf) file first""")
error = "Failed to parse sections"
analytics_props["error"] = error
ANALYTICS.error(error, info=info_sections_output)
return None, None
has_debug_info = any(map(is_debug_info_section, sections))
analytics_props["has_debug_info"] = has_debug_info
if not has_debug_info:
print("WARNING: no debug info sections found!")
print(
"Hints: did you compile with -g or similar flags? did you inadvertently strip the"
" binary?"
)
cd_writer = MemfaultCoredumpWriter(
arch,
parsed_args.device_serial,
parsed_args.software_type,
parsed_args.software_version,
parsed_args.hardware_revision,
)
cd_writer.regs = arch.get_current_registers(thread, analytics_props)
arch.add_platform_specific_sections(cd_writer, inferior, analytics_props)
regions = parsed_args.region
if regions is None:
print(
"No capturing regions were specified; will default to capturing the first 1MB for"
" each used RAM address range."
)
print(
"Tip: optionally, you can use `--region <addr> <size>` to manually specify"
" capturing regions and increase capturing speed."
)
regions = arch.guess_ram_regions(sections)
for addr, size in regions:
print("Capturing RAM @ 0x{:x} ({} bytes)...".format(addr, size))
data = read_memory_until_error(inferior, addr, size)
section = Section(addr, len(data), "RAM")
section.data = data
cd_writer.add_section(section)
print("Captured RAM @ 0x{:x} ({} bytes)".format(section.addr, section.size))
return cd_writer, elf_fn
class MemfaultLogin(MemfaultGdbCommand):
"""Authenticate the current GDB session with Memfault"""
GDB_CMD = "memfault login"
def _invoke(self, unicode_args, from_tty, analytics_props, config):
try:
parsed_args = self.parse_args(unicode_args)
except MemfaultGdbArgumentParseError:
return
api_uri = parsed_args.api_uri
ingress_uri = parsed_args.ingress_uri
status, reason, json_body = http_get_auth_me(
api_uri, parsed_args.email, parsed_args.password
)
analytics_props["http_status"] = status
analytics_props["api_uri"] = api_uri
analytics_props["ingress_uri"] = ingress_uri
if status >= 200 and status < 300:
config.api_uri = MEMFAULT_CONFIG.api_uri = api_uri
config.email = MEMFAULT_CONFIG.email = parsed_args.email
config.ingress_uri = MEMFAULT_CONFIG.ingress_uri = ingress_uri
config.organization = MEMFAULT_CONFIG.organization = parsed_args.organization
config.password = MEMFAULT_CONFIG.password = parsed_args.password
config.project = MEMFAULT_CONFIG.project = parsed_args.project
config.user_id = MEMFAULT_CONFIG.user_id = json_body["id"]
print("Authenticated successfully!")
elif status == 404:
print("Login failed. Make sure you have entered the email and password/key correctly.")
else:
print("Error occurred... HTTP Status {} {}".format(status, reason))
def parse_args(self, unicode_args):
parser = MemfaultGdbArgumentParser(description=MemfaultLogin.__doc__)
parser.add_argument(
"email", help="The username (email address) of the user to authenticate"
)
parser.add_argument(
"password", help="The user API key or password of the user to authenticate"
)
parser.add_argument(
"--organization", "-o", help="Default organization (slug) to use", default=None
)
parser.add_argument("--project", "-p", help="Default project (slug) to use", default=None)
parser.add_argument(
"--api-uri",
help="Default API base URI to use (default: {})".format(MEMFAULT_DEFAULT_API_BASE_URI),
default=MEMFAULT_DEFAULT_API_BASE_URI,
)
parser.add_argument(
"--ingress-uri",
help="Default ingress base URI to use (default: {})".format(
MEMFAULT_DEFAULT_INGRESS_BASE_URI
),
default=MEMFAULT_DEFAULT_INGRESS_BASE_URI,
)
args = list(filter(None, unicode_args.split(" ")))
return parser.parse_args(args)
Memfault()
MemfaultCoredump()
MemfaultLogin()
MemfaultPostChunk()
class AnalyticsTracker(Thread):
def __init__(self):
super(AnalyticsTracker, self).__init__()
self._queue = Queue()
def track(self, event_name, event_properties=None, user_id=None):
# Put in queue to offload to background thread, to avoid slowing down the GDB commands
self._queue.put((event_name, event_properties, user_id))
def log(self, level, type, **kwargs): # noqa: A002
props = dict(**kwargs)
props["type"] = type
props["level"] = level
self.track("Log", props)
def error(self, type, info=None): # noqa: A002
self.log("error", type, info=info)
def _is_analytics_disabled(self):
settings = settings_load()
key = "analytics.disabled"
return settings.get(key, False)
def _is_unittest(self):
# Pretty gross, but because importing this script has side effects (hitting the analytics endpoint),
# mocking gets pretty tricky. Use this check to disable analytics for unit and integration tests
return (
hasattr(gdb, "MEMFAULT_MOCK_IMPLEMENTATION")
or os.environ.get("MEMFAULT_DISABLE_ANALYTICS", None) is not None
)
def run(self):
# uuid.getnode() is based on the mac address, so should be stable across multiple sessions on the same machine:
anonymous_id = md5(uuid.UUID(int=uuid.getnode()).bytes).hexdigest()
while True:
try:
event_name, event_properties, user_id = self._queue.get()
if self._is_analytics_disabled() or self._is_unittest():
continue
if event_properties is None:
event_properties = {}
conn = HTTPSConnection("api.segment.io")
body = {
"anonymousId": anonymous_id,
"event": event_name,
"properties": event_properties,
}
if user_id is not None:
body["userId"] = "user{}".format(user_id)
headers = {
"Content-Type": "application/json",
"Authorization": "Basic RFl3Q0E1TENRTW5Uek95ajk5MTJKRjFORTkzMU5yb0o6",
}
conn.request("POST", "/v1/track", body=dumps(body), headers=headers)
response = conn.getresponse()
response.read()
# Throttle a bit
sleep(0.2)
except Exception: # noqa: S110
pass # Never fail due to analytics requests erroring out
ANALYTICS = AnalyticsTracker()
ANALYTICS.daemon = True
ANALYTICS.start()
def _track_script_sourced():
system, _, release, version, machine, processor = platform.uname()
try:
from platform import mac_ver
mac_version = mac_ver()[0]
except Exception:
mac_version = ""
try:
gdb_version = gdb.execute("show version", to_string=True).strip().split("\n")[0]
except Exception:
gdb_version = ""
ANALYTICS.track(
"Script sourced",
# TODO: MFLT-497 -- properly version this
{
"version": "1",
"python": platform.python_version(),
"uname_system": system,
"uname_release": release,
"uname_version": version,
"uname_machine": machine,
"uname_processor": processor,
"mac_version": mac_version,
"gdb_version": gdb_version,
},
)
_track_script_sourced()
print("Memfault Commands Initialized successfully.")