/* **********************************************************
* Copyright (c) 2022-2024 Google, Inc. All rights reserved.
* **********************************************************/
/*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
*
* * Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* * Neither the name of Google, Inc. nor the names of its contributors may be
* used to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL VMWARE, INC. OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
* DAMAGE.
*/
#include "record_filter.h"
#include <inttypes.h>
#include <stdint.h>
#include <cstdio>
#include <fstream>
#include <iomanip>
#include <iostream>
#include <memory>
#include <mutex>
#include <sstream>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
#ifdef HAS_ZLIB
# include "common/gzip_ostream.h"
#endif
#ifdef HAS_ZIP
# include "common/zipfile_ostream.h"
#endif
#include "memref.h"
#include "memtrace_stream.h"
#include "raw2trace_shared.h"
#include "trace_entry.h"
#include "utils.h"
#include "null_filter.h"
#include "cache_filter.h"
#include "trim_filter.h"
#include "type_filter.h"
#include "encodings2regdeps_filter.h"
#include "func_id_filter.h"
#undef VPRINT
#ifdef DEBUG
# define VPRINT(reader, level, ...) \
do { \
if ((reader)->verbosity_ >= (level)) { \
fprintf(stderr, "%s ", (reader)->output_prefix_); \
fprintf(stderr, __VA_ARGS__); \
} \
} while (0)
// clang-format off
# define UNUSED(x) /* nothing */
// clang-format on
#else
# define VPRINT(reader, level, ...) /* nothing */
# define UNUSED(x) ((void)(x))
#endif
namespace dynamorio {
namespace drmemtrace {
namespace {
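// Splits a separator-delimited list of numbers into a vector of T, e.g.
// parse_string<int>("1,5,7") yields {1, 5, 7}. Each value is parsed with
// std::stoull and cast to T, so values too large for T may be truncated
// (see the XXX below).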
template <typename T>
std::vector<T>
parse_string(const std::string &s, char sep = ',')
{
size_t pos, at = 0;
if (s.empty())
return {};
std::vector<T> vec;
do {
pos = s.find(sep, at);
unsigned long long parsed_number = std::stoull(s.substr(at, pos));
// XXX: parsed_number may be truncated if T is not large enough.
// We could check that parsed_number is within the limits of T using
// std::numeric_limits<>::min()/max(), but those return 0 when T is an enum,
// which is the case when parsing trace_marker_type_t and trace_type_t values
// for type_filter. To make numeric_limits work on these enums we would need
// to give them std::underlying_type support.
// We also need to decide what should happen when T is not large enough to
// hold parsed_number: should we skip that value, output a warning, or output
// an error and abort?
vec.push_back(static_cast<T>(parsed_number));
at = pos + 1;
} while (pos != std::string::npos);
return vec;
}
} // namespace
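// Creates a record_filter_t configured with one filter function per requested
// feature: cache filtering, trace/marker type removal, timestamp-based
// trimming, encoding-to-regdeps conversion, and function-id filtering.
// Illustrative call (the option values here are hypothetical):
//
//   record_analysis_tool_t *tool = record_filter_tool_create(
//       /*output_dir=*/"/tmp/filtered", /*stop_timestamp=*/0,
//       /*cache_filter_size=*/0, /*remove_trace_types=*/"",
//       /*remove_marker_types=*/"", /*trim_before_timestamp=*/0,
//       /*trim_after_timestamp=*/0, /*encodings2regdeps=*/false,
//       /*keep_func_ids=*/"", /*verbose=*/0);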
record_analysis_tool_t *
record_filter_tool_create(const std::string &output_dir, uint64_t stop_timestamp,
int cache_filter_size, const std::string &remove_trace_types,
const std::string &remove_marker_types,
uint64_t trim_before_timestamp, uint64_t trim_after_timestamp,
bool encodings2regdeps, const std::string &keep_func_ids,
unsigned int verbose)
{
std::vector<
std::unique_ptr<dynamorio::drmemtrace::record_filter_t::record_filter_func_t>>
filter_funcs;
if (cache_filter_size > 0) {
filter_funcs.emplace_back(
std::unique_ptr<dynamorio::drmemtrace::record_filter_t::record_filter_func_t>(
// XXX: add more command-line options to allow the user to set these
// parameters.
new dynamorio::drmemtrace::cache_filter_t(
/*cache_associativity=*/1, /*cache_line_size=*/64, cache_filter_size,
/*filter_data=*/true, /*filter_instrs=*/false)));
}
if (!remove_trace_types.empty() || !remove_marker_types.empty()) {
std::vector<trace_type_t> filter_trace_types =
parse_string<trace_type_t>(remove_trace_types);
std::vector<trace_marker_type_t> filter_marker_types =
parse_string<trace_marker_type_t>(remove_marker_types);
filter_funcs.emplace_back(
std::unique_ptr<dynamorio::drmemtrace::record_filter_t::record_filter_func_t>(
new dynamorio::drmemtrace::type_filter_t(filter_trace_types,
filter_marker_types)));
}
if (trim_before_timestamp > 0 || trim_after_timestamp > 0) {
filter_funcs.emplace_back(
std::unique_ptr<dynamorio::drmemtrace::record_filter_t::record_filter_func_t>(
new dynamorio::drmemtrace::trim_filter_t(trim_before_timestamp,
trim_after_timestamp)));
}
if (encodings2regdeps) {
filter_funcs.emplace_back(
std::unique_ptr<dynamorio::drmemtrace::record_filter_t::record_filter_func_t>(
new dynamorio::drmemtrace::encodings2regdeps_filter_t()));
}
if (!keep_func_ids.empty()) {
std::vector<uint64_t> keep_func_ids_list = parse_string<uint64_t>(keep_func_ids);
filter_funcs.emplace_back(
std::unique_ptr<dynamorio::drmemtrace::record_filter_t::record_filter_func_t>(
new dynamorio::drmemtrace::func_id_filter_t(keep_func_ids_list)));
}
// TODO i#5675: Add other filters.
return new dynamorio::drmemtrace::record_filter_t(output_dir, std::move(filter_funcs),
stop_timestamp, verbose);
}
record_filter_t::record_filter_t(
const std::string &output_dir,
std::vector<std::unique_ptr<record_filter_func_t>> filters, uint64_t stop_timestamp,
unsigned int verbose)
: output_dir_(output_dir)
, filters_(std::move(filters))
, stop_timestamp_(stop_timestamp)
, verbosity_(verbose)
{
UNUSED(verbosity_);
UNUSED(output_prefix_);
}
record_filter_t::~record_filter_t()
{
for (auto &iter : shard_map_) {
delete iter.second;
}
}
bool
record_filter_t::parallel_shard_supported()
{
return true;
}
std::string
record_filter_t::initialize_shard_type(shard_type_t shard_type)
{
shard_type_ = shard_type;
return "";
}
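// Computes the output path for a shard. Core-sharded runs synthesize a
// "drmemtrace.core.<index>.trace" name (the compression extension is appended
// later in initialize_shard_output()); otherwise the input stream's own file
// name is reused under output_dir_.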
std::string
record_filter_t::get_output_basename(memtrace_stream_t *shard_stream)
{
if (shard_type_ == SHARD_BY_CORE) {
return output_dir_ + DIRSEP + "drmemtrace.core." +
std::to_string(shard_stream->get_shard_index()) + ".trace";
} else {
return output_dir_ + DIRSEP + shard_stream->get_stream_name();
}
}
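// Fills in per_shard->output_path and per_shard->filetype. For core-sharded
// runs the file extension, version, and filetype come from whichever shard
// first sees a named input; shards that start out with no input wait on
// input_info_cond_var_ until that information has been published.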
std::string
record_filter_t::initialize_shard_output(per_shard_t *per_shard,
memtrace_stream_t *shard_stream)
{
if (shard_type_ == SHARD_BY_CORE) {
// Each output is a mix of inputs, so we do not want to reuse the input
// names, which contain tids.
// Since some shards may have no inputs, we need to synchronize determining
// the file extension.
// First, get our path without the extension, so we can add it later.
per_shard->output_path = get_output_basename(shard_stream);
std::string input_name = shard_stream->get_stream_name();
// Now synchronize determining the extension.
auto lock = std::unique_lock<std::mutex>(input_info_mutex_);
if (!output_ext_.empty()) {
VPRINT(this, 2,
"Shard #%d using pre-set ext=%s, ver=%" PRIu64 ", type=%" PRIu64 "\n",
shard_stream->get_shard_index(), output_ext_.c_str(), version_,
filetype_);
per_shard->output_path += output_ext_;
per_shard->filetype = static_cast<addr_t>(filetype_);
lock.unlock();
} else if (!input_name.empty()) {
size_t last_dot = input_name.rfind('.');
if (last_dot == std::string::npos)
return "Failed to determine filename type from extension";
output_ext_ = input_name.substr(last_dot);
// Set the other key input data.
version_ = shard_stream->get_version();
filetype_ = add_to_filetype(shard_stream->get_filetype());
if (version_ == 0) {
// We give up support for version 0 to have an up-front error check
// rather than having some output files with bad headers (i#6721).
return "Version not available at shard init time";
}
VPRINT(this, 2,
"Shard #%d setting ext=%s, ver=%" PRIu64 ", type=%" PRIu64 "\n",
shard_stream->get_shard_index(), output_ext_.c_str(), version_,
filetype_);
per_shard->output_path += output_ext_;
per_shard->filetype = static_cast<addr_t>(filetype_);
lock.unlock();
input_info_cond_var_.notify_all();
} else {
// We have to wait for another shard with an input to set output_ext_.
input_info_cond_var_.wait(lock, [this] { return !output_ext_.empty(); });
VPRINT(this, 2,
"Shard #%d waited for ext=%s, ver=%" PRIu64 ", type=%" PRIu64 "\n",
shard_stream->get_shard_index(), output_ext_.c_str(), version_,
filetype_);
per_shard->output_path += output_ext_;
per_shard->filetype = static_cast<addr_t>(filetype_);
lock.unlock();
}
} else {
per_shard->output_path = get_output_basename(shard_stream);
}
return "";
}
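// Opens the writer for a shard based on the output path's extension: a gzip
// stream for ".gz" (with HAS_ZLIB), a zipfile archive for ".zip" (with
// HAS_ZIP), and a plain binary std::ofstream otherwise.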
std::string
record_filter_t::get_writer(per_shard_t *per_shard, memtrace_stream_t *shard_stream)
{
if (per_shard->output_path.empty())
return "Error: output_path is empty";
#ifdef HAS_ZLIB
if (ends_with(per_shard->output_path, ".gz")) {
VPRINT(this, 3, "Using the gzip writer for %s\n", per_shard->output_path.c_str());
per_shard->file_writer =
std::unique_ptr<std::ostream>(new gzip_ostream_t(per_shard->output_path));
per_shard->writer = per_shard->file_writer.get();
return "";
}
#endif
#ifdef HAS_ZIP
if (ends_with(per_shard->output_path, ".zip")) {
VPRINT(this, 3, "Using the zip writer for %s\n", per_shard->output_path.c_str());
per_shard->archive_writer = std::unique_ptr<archive_ostream_t>(
new zipfile_ostream_t(per_shard->output_path));
per_shard->writer = per_shard->archive_writer.get();
return open_new_chunk(per_shard);
}
#endif
VPRINT(this, 3, "Using the default writer for %s\n", per_shard->output_path.c_str());
per_shard->file_writer = std::unique_ptr<std::ostream>(
new std::ofstream(per_shard->output_path, std::ofstream::binary));
per_shard->writer = per_shard->file_writer.get();
return "";
}
std::string
record_filter_t::remove_output_file(per_shard_t *per_shard)
{
VPRINT(this, 1, "Removing zero-instruction file %s for tid %" PRId64 "\n",
per_shard->output_path.c_str(), per_shard->tid);
if (std::remove(per_shard->output_path.c_str()) != 0)
return "Failed to remove zero-instruction file " + per_shard->output_path;
return "";
}
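// Writes a single marker record of the given type and value to the shard's
// output.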
std::string
record_filter_t::emit_marker(per_shard_t *shard, unsigned short marker_type,
uint64_t marker_value)
{
trace_entry_t marker;
marker.type = TRACE_TYPE_MARKER;
marker.size = marker_type;
marker.addr = static_cast<addr_t>(marker_value);
if (!write_trace_entry(shard, marker))
return "Failed to write marker";
return "";
}
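// Closes the current archive chunk (if any) with a chunk footer and opens the
// next zip component. For non-initial chunks it also re-emits the
// record-ordinal, timestamp, and cpuid markers that begin a chunk and clears
// cur_chunk_pcs so that encodings are re-emitted in the new chunk.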
std::string
record_filter_t::open_new_chunk(per_shard_t *shard)
{
VPRINT(this, 1, "Opening new chunk #%" PRIu64 "\n", shard->chunk_ordinal);
std::string err;
if (shard->chunk_ordinal > 0) {
err =
emit_marker(shard, TRACE_MARKER_TYPE_CHUNK_FOOTER, shard->chunk_ordinal - 1);
if (!err.empty())
return err;
}
std::ostringstream stream;
stream << TRACE_CHUNK_PREFIX << std::setfill('0') << std::setw(4)
<< shard->chunk_ordinal;
err = shard->archive_writer->open_new_component(stream.str());
if (!err.empty())
return err;
if (shard->chunk_ordinal > 0) {
// XXX i#6593: This sequence is currently duplicated with
// raw2trace_t::emit_new_chunk_header(). Could we share it?
err = emit_marker(shard, TRACE_MARKER_TYPE_RECORD_ORDINAL, shard->cur_refs);
if (!err.empty())
return err;
err = emit_marker(shard, TRACE_MARKER_TYPE_TIMESTAMP, shard->last_timestamp);
if (!err.empty())
return err;
err = emit_marker(shard, TRACE_MARKER_TYPE_CPU_ID, shard->last_cpu_id);
if (!err.empty())
return err;
// We need to re-emit all encodings.
shard->cur_chunk_pcs.clear();
}
++shard->chunk_ordinal;
shard->cur_chunk_instrs = 0;
return "";
}
std::string
record_filter_t::initialize_stream(memtrace_stream_t *serial_stream)
{
dcontext_.dcontext = dr_standalone_init();
return "";
}
void *
record_filter_t::parallel_shard_init_stream(int shard_index, void *worker_data,
memtrace_stream_t *shard_stream)
{
auto per_shard = new per_shard_t;
std::string error = initialize_shard_output(per_shard, shard_stream);
if (!error.empty()) {
per_shard->error = "Failure initializing output: " + error;
success_ = false;
return reinterpret_cast<void *>(per_shard);
}
error = get_writer(per_shard, shard_stream);
if (!error.empty()) {
per_shard->error = "Failure in opening writer: " + error;
success_ = false;
return reinterpret_cast<void *>(per_shard);
}
if (per_shard->writer == nullptr) {
per_shard->error = "Could not open a writer for " + per_shard->output_path;
success_ = false;
return reinterpret_cast<void *>(per_shard);
}
per_shard->shard_stream = shard_stream;
per_shard->enabled = true;
per_shard->input_entry_count = 0;
per_shard->output_entry_count = 0;
per_shard->tid = shard_stream->get_tid();
if (shard_type_ == SHARD_BY_CORE) {
per_shard->memref_counter.set_core_sharded(true);
}
for (auto &f : filters_) {
per_shard->filter_shard_data.push_back(
f->parallel_shard_init(shard_stream, stop_timestamp_ != 0));
if (f->get_error_string() != "") {
per_shard->error =
"Failure in initializing filter function " + f->get_error_string();
success_ = false;
}
}
per_shard->record_filter_info.last_encoding = &per_shard->last_encoding;
per_shard->record_filter_info.dcontext = dcontext_.dcontext;
std::lock_guard<std::mutex> guard(shard_map_mutex_);
shard_map_[shard_index] = per_shard;
return reinterpret_cast<void *>(per_shard);
}
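// Finalizes a shard's output: runs each filter's shard-exit hook, appends a
// footer if the last written record was not already one, destroys the writers
// to flush the file, and marks instruction-free thread-sharded outputs for
// later removal.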
bool
record_filter_t::parallel_shard_exit(void *shard_data)
{
per_shard_t *per_shard = reinterpret_cast<per_shard_t *>(shard_data);
bool res = true;
for (int i = 0; i < static_cast<int>(filters_.size()); ++i) {
if (!filters_[i]->parallel_shard_exit(per_shard->filter_shard_data[i]))
res = false;
}
if (per_shard->last_written_record.type != TRACE_TYPE_FOOTER) {
// When core-sharded some cores can end in TRACE_TYPE_IDLE.
// i#6703: The scheduler should add this footer for us.
trace_entry_t footer = {};
footer.type = TRACE_TYPE_FOOTER;
if (!write_trace_entry(per_shard, footer)) {
per_shard->error = "Failed to write footer";
return false;
}
}
// Destroy the writer since we do not need it anymore. This also makes sure
// that data is written out to the file; curiously, a simple flush doesn't
// do it.
per_shard->file_writer.reset(nullptr);
per_shard->archive_writer.reset(nullptr);
per_shard->writer = nullptr;
// If the shard ended up with no instructions, delete it (otherwise the
// invariant checker complains).
VPRINT(this, 2, "shard %s chunk=%" PRIu64 " cur-instrs=%" PRIu64 "\n",
per_shard->output_path.c_str(), per_shard->chunk_ordinal,
per_shard->cur_chunk_instrs);
if (!TESTANY(OFFLINE_FILE_TYPE_FILTERED | OFFLINE_FILE_TYPE_IFILTERED,
per_shard->filetype) &&
// chunk_ordinal is 1 after the init-time call for archives; it
// remains 0 for non-archives.
per_shard->chunk_ordinal <= 1 && per_shard->cur_chunk_instrs == 0 &&
// Leave a core-sharded completely-idle file.
shard_type_ != SHARD_BY_CORE) {
// Mark for removal. We delay removal in case it involves global
// operations that might race with other workers.
per_shard->now_empty = true;
}
return res;
}
std::string
record_filter_t::parallel_shard_error(void *shard_data)
{
per_shard_t *per_shard = reinterpret_cast<per_shard_t *>(shard_data);
return per_shard->error;
}
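// Writes one record to the shard's output. If the very first record is not a
// header (possible for initially-idle core-sharded outputs), a synthetic
// header with version, filetype, sentinel tid/pid, and placeholder
// timestamp/cpuid markers is emitted first.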
bool
record_filter_t::write_trace_entry(per_shard_t *shard, const trace_entry_t &entry)
{
if (shard->output_entry_count == 0 && entry.type != TRACE_TYPE_HEADER) {
// When core-sharded with initially-idle cores we can start without a header.
// XXX i#6703: The scheduler should insert these headers for us, as this
// issue can affect other tools as well.
// Our own stream's version + filetype are 0 so we use another shard's.
std::lock_guard<std::mutex> guard(input_info_mutex_);
std::vector<trace_entry_t> header;
header.push_back({ TRACE_TYPE_HEADER, 0, { static_cast<addr_t>(version_) } });
header.push_back({ TRACE_TYPE_MARKER,
TRACE_MARKER_TYPE_VERSION,
{ static_cast<addr_t>(version_) } });
header.push_back({ TRACE_TYPE_MARKER,
TRACE_MARKER_TYPE_FILETYPE,
{ static_cast<addr_t>(filetype_) } });
// file_reader_t::open_input_file demands tid+pid so we insert sentinel values.
// We can't use INVALID_THREAD_ID as scheduler_t::open_reader() loops until
// record_type_has_tid() which requires record.marker.tid != INVALID_THREAD_ID.
header.push_back({ TRACE_TYPE_THREAD,
sizeof(thread_id_t),
{ static_cast<addr_t>(IDLE_THREAD_ID) } });
header.push_back({ TRACE_TYPE_PID,
sizeof(process_id_t),
{ static_cast<addr_t>(INVALID_PID) } });
// The scheduler itself demands a timestamp,cpuid pair, but we have no
// good values to use here.
// XXX i#6703: The scheduler should insert these for us.
// As-is, the -1 values can cause confusion, but this is our best-effort
// support until i#6703 is addressed.
header.push_back({ TRACE_TYPE_MARKER,
TRACE_MARKER_TYPE_TIMESTAMP,
{ static_cast<addr_t>(-1) } });
header.push_back(
{ TRACE_TYPE_MARKER, TRACE_MARKER_TYPE_CPU_ID, { static_cast<addr_t>(-1) } });
if (!write_trace_entries(shard, header)) {
shard->error += "Failed to write synthetic header";
return false;
}
}
if (!shard->writer->write((char *)&entry, sizeof(entry))) {
shard->error = "Failed to write to output file " + shard->output_path;
success_ = false;
return false;
}
shard->cur_refs += shard->memref_counter.entry_memref_count(&entry);
++shard->output_entry_count;
shard->last_written_record = entry;
return true;
}
bool
record_filter_t::write_trace_entries(per_shard_t *shard,
const std::vector<trace_entry_t> &entries)
{
for (const trace_entry_t &entry : entries) {
if (!write_trace_entry(shard, entry))
return false;
}
return true;
}
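// Examines marker records, updating per-shard bookkeeping (chunk size,
// filetype, last timestamp and cpuid, schedule info) and setting output to
// false for markers that this tool re-emits itself at chunk boundaries or
// that are purely artificial (TRACE_MARKER_TYPE_CORE_WAIT).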
std::string
record_filter_t::process_markers(per_shard_t *per_shard, trace_entry_t &entry,
bool &output)
{
if (entry.type == TRACE_TYPE_MARKER) {
switch (entry.size) {
case TRACE_MARKER_TYPE_CHUNK_INSTR_COUNT:
per_shard->chunk_size = entry.addr;
break;
case TRACE_MARKER_TYPE_FILETYPE:
entry.addr = static_cast<addr_t>(add_to_filetype(entry.addr));
per_shard->filetype = entry.addr;
break;
case TRACE_MARKER_TYPE_CHUNK_FOOTER:
// We insert ourselves in open_new_chunk().
output = false;
break;
case TRACE_MARKER_TYPE_RECORD_ORDINAL:
// We insert ourselves in open_new_chunk().
per_shard->input_count_at_ordinal = per_shard->input_entry_count;
output = false;
break;
case TRACE_MARKER_TYPE_TIMESTAMP:
if (output)
per_shard->last_timestamp = entry.addr;
// We insert our own start-of-chunk timestamp.
if (per_shard->archive_writer &&
per_shard->input_entry_count - per_shard->input_count_at_ordinal == 1)
output = false;
break;
case TRACE_MARKER_TYPE_CPU_ID:
if (output)
per_shard->last_cpu_id = entry.addr;
// We insert our own start-of-chunk cpuid.
if (per_shard->archive_writer &&
per_shard->input_entry_count - per_shard->input_count_at_ordinal == 2)
output = false;
if (output) {
uint64_t instr_ord = per_shard->cur_chunk_instrs +
// For archives we increment chunk_ordinal up front.
(per_shard->archive_writer ? per_shard->chunk_ordinal - 1
: per_shard->chunk_ordinal) *
per_shard->chunk_size;
per_shard->sched_info.record_cpu_id(per_shard->tid, entry.addr,
per_shard->last_timestamp, instr_ord);
}
break;
case TRACE_MARKER_TYPE_PHYSICAL_ADDRESS:
case TRACE_MARKER_TYPE_PHYSICAL_ADDRESS_NOT_AVAILABLE:
if (!output && per_shard->archive_writer) {
// TODO i#6654: These markers need to be repeated across chunks. Even
// raw2trace doesn't support this yet: once we add it there we can add it
// here or try to share code.
return "Removing physical address markers from archive output is not yet "
"supported";
}
break;
case TRACE_MARKER_TYPE_CORE_WAIT:
// These are artificial timing records: do not output them, nor consider
// them real input records.
output = false;
--per_shard->input_entry_count;
break;
}
}
return "";
}
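// For archive (chunked) output, records each instruction's encoding in the
// per-input pc2encoding map and, for the first appearance of a PC in a chunk
// that arrives without an encoding, re-emits the cached encoding so every
// chunk remains self-contained.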
std::string
record_filter_t::process_chunk_encodings(per_shard_t *per_shard, trace_entry_t &entry,
bool output)
{
if (!per_shard->archive_writer ||
!is_any_instr_type(static_cast<trace_type_t>(entry.type)))
return "";
if (!per_shard->last_encoding.empty()) {
if (per_shard->per_input == nullptr)
return "Invalid input id for instruction";
std::lock_guard<std::mutex> guard(per_shard->per_input->lock);
per_shard->per_input->pc2encoding[entry.addr] = per_shard->last_encoding;
// Disable the just-delayed encoding output in process_delayed_encodings() if
// this is what used to be a new-chunk encoding but is no longer.
if (per_shard->cur_chunk_pcs.find(entry.addr) != per_shard->cur_chunk_pcs.end()) {
VPRINT(this, 3, "clearing new-chunk last encoding @pc=0x%zx\n", entry.addr);
per_shard->last_encoding.clear();
}
} else if (output) {
// Insert the cached encoding if this is the first instance of this PC
// (without an encoding) in this chunk, unless the user is removing all encodings.
// XXX: What if there is a filter removing all encodings but only
// to the stop point, so a partial remove that does not change
// the filetype? For now we do not support that, and we re-add
// encodings at chunk boundaries regardless. Note that filters that modify
// encodings (even if they add or remove trace_entry_t records) do not run into
// this problem, so we do not need partial-removal support for them.
// An example of such a filter is encodings2regdeps_filter_t.
if (TESTANY(OFFLINE_FILE_TYPE_ENCODINGS, per_shard->filetype) &&
per_shard->cur_chunk_pcs.find(entry.addr) == per_shard->cur_chunk_pcs.end()) {
if (per_shard->per_input == nullptr)
return "Invalid input id for instruction";
std::lock_guard<std::mutex> guard(per_shard->per_input->lock);
if (per_shard->per_input->pc2encoding.find(entry.addr) ==
per_shard->per_input->pc2encoding.end()) {
return "Missing encoding for PC " + std::to_string(entry.addr) +
" in shard " + per_shard->shard_stream->get_stream_name() +
" at input entry " + std::to_string(per_shard->input_entry_count);
}
VPRINT(this, 3,
"output new-chunk encoding chunk=%" PRIu64 " ref=%" PRIu64 "\n",
per_shard->chunk_ordinal, per_shard->cur_refs);
// Sanity check that the encoding size is correct.
const auto &enc = per_shard->per_input->pc2encoding[entry.addr];
/* OFFLINE_FILE_TYPE_ARCH_REGDEPS traces have encodings with size != ifetch.
* It's a design choice, not an error, hence we avoid this sanity check.
*/
if (!TESTANY(OFFLINE_FILE_TYPE_ARCH_REGDEPS, per_shard->filetype)) {
size_t enc_sz = 0;
// Since all but the last entry are fixed-size we could avoid a loop
// but the loop is easier to read and we have just 1 or 2 iters.
for (const auto &record : enc)
enc_sz += record.size;
if (enc_sz != entry.size) {
return "New-chunk encoding size " + std::to_string(enc_sz) +
" != instr size " + std::to_string(entry.size);
}
}
if (!write_trace_entries(per_shard, enc)) {
return "Failed to write";
}
// Avoid emitting the encoding twice.
per_shard->delayed_encodings[entry.addr].clear();
}
}
if (output)
per_shard->cur_chunk_pcs.insert(entry.addr);
return "";
}
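// Handles encodings whose instructions may have been filtered out: if the
// instruction is dropped its encoding is saved in delayed_encodings, and when
// a later instance of the same PC is kept, the saved (or just-seen) encoding
// is emitted before it.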
std::string
record_filter_t::process_delayed_encodings(per_shard_t *per_shard, trace_entry_t &entry,
bool output)
{
if (!is_any_instr_type(static_cast<trace_type_t>(entry.type)))
return "";
if (!output) {
if (!per_shard->last_encoding.empty()) {
// Overwrite in case the encoding for this pc was already recorded.
per_shard->delayed_encodings[entry.addr] =
std::move(per_shard->last_encoding);
}
} else if (TESTANY(OFFLINE_FILE_TYPE_ENCODINGS, per_shard->filetype)) {
// Output if we have encodings that haven't yet been output, and
// there is no filter removing all encodings (we don't support
// partial encoding removal). Note that filters that modify encodings (even if
// they add or remove trace_entry_t records) do not run into this problem, so
// we do not need partial-removal support for them. An example of such a
// filter is encodings2regdeps_filter_t.
// We check prev_was_output to rule out filtered-out encodings
// (we record all encodings for new-chunk insertion).
if (!per_shard->last_encoding.empty() && per_shard->prev_was_output) {
// This instruction is accompanied by a preceding encoding. Since
// this instruction is not filtered out, output the encoding now.
VPRINT(this, 3,
"output just-delayed encoding chunk=%" PRIu64 " ref=%" PRIu64
" pc=0x%zx\n",
per_shard->chunk_ordinal, per_shard->cur_refs, entry.addr);
if (!write_trace_entries(per_shard, per_shard->last_encoding)) {
return "Failed to write";
}
// Remove previously delayed encoding that doesn't need to be output
// now that we have a more recent version for this instr.
per_shard->delayed_encodings.erase(entry.addr);
} else if (!per_shard->delayed_encodings[entry.addr].empty()) {
// The previous instance of this instruction was filtered out and
// its encoding was saved. Now that we have an instance of the same
// instruction that is not filtered out, we need to output its
// encoding.
VPRINT(this, 3,
"output long-delayed encoding chunk=%" PRIu64 " ref=%" PRIu64
" pc=0x%zx\n",
per_shard->chunk_ordinal, per_shard->cur_refs, entry.addr);
if (!write_trace_entries(per_shard,
per_shard->delayed_encodings[entry.addr])) {
return "Failed to write";
}
per_shard->delayed_encodings.erase(entry.addr);
}
}
return "";
}
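// Main per-record callback. Tracks input switches, applies the configured
// filter functions (until stop_timestamp_ is reached, when set), splits
// archive output into chunks, fixes up markers and encodings, and writes the
// (possibly modified) record unless a filter dropped it.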
bool
record_filter_t::parallel_shard_memref(void *shard_data, const trace_entry_t &input_entry)
{
if (!success_) {
// Report an error that happened during shard init.
return false;
}
per_shard_t *per_shard = reinterpret_cast<per_shard_t *>(shard_data);
++per_shard->input_entry_count;
trace_entry_t entry = input_entry;
bool output = true;
// XXX: Once we have multi-workload inputs we'll want all our PC keys to become
// pairs <get_workload_ordinal(), PC>.
if (per_shard->shard_stream->get_workload_id() != per_shard->prev_workload_id &&
per_shard->shard_stream->get_workload_id() >= 0 &&
per_shard->prev_workload_id >= 0) {
per_shard->error = "Multi-workload inputs not yet supported";
return false;
}
int64_t input_id = per_shard->shard_stream->get_input_id();
if (per_shard->prev_input_id != input_id) {
VPRINT(this, 3,
"shard %d switch from %" PRId64 " to %" PRId64 " (refs=%" PRIu64
" instrs=%" PRIu64 ")\n",
per_shard->shard_stream->get_shard_index(), per_shard->prev_input_id,
input_id,
per_shard->shard_stream->get_input_interface() == nullptr
? 0
: per_shard->shard_stream->get_input_interface()->get_record_ordinal(),
per_shard->shard_stream->get_input_interface() == nullptr
? 0
: per_shard->shard_stream->get_input_interface()
->get_instruction_ordinal());
std::lock_guard<std::mutex> guard(input2info_mutex_);
auto it = input2info_.find(input_id);
if (it == input2info_.end()) {
input2info_[input_id] = std::unique_ptr<per_input_t>(new per_input_t);
it = input2info_.find(input_id);
}
// It would be nice to assert that this pointer is not in use in other shards
// but that is too expensive.
per_shard->per_input = it->second.get();
// We do not expect to see an input switch that splits an encoding from its
// instruction: that would record an incorrect encoding into pc2encoding.
if (!per_shard->last_encoding.empty()) {
per_shard->error = "Input switch immediately after encoding not supported";
return false;
}
}
if (per_shard->enabled && stop_timestamp_ != 0 &&
per_shard->shard_stream->get_last_timestamp() >= stop_timestamp_) {
per_shard->enabled = false;
trace_entry_t filter_boundary_entry = { TRACE_TYPE_MARKER,
TRACE_MARKER_TYPE_FILTER_ENDPOINT,
{ 0 } };
if (!write_trace_entry(per_shard, filter_boundary_entry)) {
per_shard->error = "Failed to write";
return false;
}
}
if (per_shard->enabled) {
for (int i = 0; i < static_cast<int>(filters_.size()); ++i) {
if (!filters_[i]->parallel_shard_filter(entry,
per_shard->filter_shard_data[i],
per_shard->record_filter_info)) {
output = false;
}
if (!filters_[i]->get_error_string().empty()) {
per_shard->error = "Filter error: " + filters_[i]->get_error_string();
return false;
}
}
}
if (per_shard->archive_writer) {
// Wait until we reach the next instr or timestamp past the threshold to
// insert the new chunk, to ensure we get all associated records with the
// chunk-final instr.
VPRINT(this, 4, "Cur chunk instr count: %" PRIu64 " vs threshold %" PRIu64 "\n",
per_shard->cur_chunk_instrs, per_shard->chunk_size);
if (per_shard->cur_chunk_instrs >= per_shard->chunk_size &&
per_shard->chunk_size > 0 &&
(is_any_instr_type(static_cast<trace_type_t>(entry.type)) ||
(entry.type == TRACE_TYPE_MARKER &&
entry.size == TRACE_MARKER_TYPE_TIMESTAMP) ||
entry.type == TRACE_TYPE_THREAD_EXIT || entry.type == TRACE_TYPE_FOOTER)) {
std::string error = open_new_chunk(per_shard);
if (!error.empty()) {
per_shard->error = error;
return false;
}
}
}
per_shard->error = process_markers(per_shard, entry, output);
if (!per_shard->error.empty())
return false;
per_shard->error = process_chunk_encodings(per_shard, entry, output);
if (!per_shard->error.empty())
return false;
if (output && type_is_instr(static_cast<trace_type_t>(entry.type)) &&
// Do not count PC-only i-filtered instrs.
entry.size > 0)
++per_shard->cur_chunk_instrs;
per_shard->error = process_delayed_encodings(per_shard, entry, output);
if (!per_shard->error.empty())
return false;
per_shard->prev_was_output = output;
if (entry.type == TRACE_TYPE_ENCODING) {
// Delay output until we know whether its instr will be output.
VPRINT(this, 4, "@%" PRIu64 " remembering last encoding %d %d 0x%zx\n",
per_shard->input_entry_count, entry.type, entry.size, entry.addr);
per_shard->last_encoding.push_back(entry);
output = false;
} else if (is_any_instr_type(static_cast<trace_type_t>(entry.type))) {
per_shard->last_encoding.clear();
}
per_shard->prev_input_id = per_shard->shard_stream->get_input_id();
per_shard->prev_workload_id = per_shard->shard_stream->get_workload_id();
if (output) {
// XXX i#5675: Currently we support writing to a single output file, but we may
// want to write to multiple in the same run; e.g. splitting a trace. For now,
// we can simply run the tool multiple times, but it can be made more efficient.
if (!write_trace_entry(per_shard, entry)) {
per_shard->error = "Failed to write";
return false;
}
}
return true;
}
bool
record_filter_t::process_memref(const trace_entry_t &memref)
{
// XXX i#5675: Serial analysis is not yet supported. Each shard is processed
// independently of the others. A cache filter may want to use a global cache.
return false;
}
std::string
record_filter_t::open_serial_schedule_file()
{
if (serial_schedule_ostream_ != nullptr)
return "Already opened";
if (output_dir_.empty())
return "No output directory specified";
std::string path = output_dir_ + DIRSEP + DRMEMTRACE_SERIAL_SCHEDULE_FILENAME;
#ifdef HAS_ZLIB
path += ".gz";
serial_schedule_file_ = std::unique_ptr<std::ostream>(new gzip_ostream_t(path));
#else
serial_schedule_file_ =
std::unique_ptr<std::ostream>(new std::ofstream(path, std::ofstream::binary));
#endif
if (!serial_schedule_file_)
return "Failed to open serial schedule file " + path;
serial_schedule_ostream_ = serial_schedule_file_.get();
return "";
}
std::string
record_filter_t::open_cpu_schedule_file()
{
if (cpu_schedule_ostream_ != nullptr)
return "Already opened";
if (output_dir_.empty())
return "No output directory specified";
std::string path = output_dir_ + DIRSEP + DRMEMTRACE_CPU_SCHEDULE_FILENAME;
#ifdef HAS_ZIP
cpu_schedule_file_ = std::unique_ptr<archive_ostream_t>(new zipfile_ostream_t(path));
if (!cpu_schedule_file_)
return "Failed to open cpu schedule file " + path;
cpu_schedule_ostream_ = cpu_schedule_file_.get();
return "";
#else
return "Zipfile support is required for cpu schedule files";
#endif
}
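// Merges the schedule data recorded by each shard and writes the serial
// schedule file plus, when zipfile support is available, the per-cpu schedule
// file.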
std::string
record_filter_t::write_schedule_files()
{
schedule_file_t sched;
std::string err;
err = open_serial_schedule_file();
if (!err.empty())
return err;
err = open_cpu_schedule_file();
if (!err.empty()) {
#ifdef HAS_ZIP
return err;
#else
if (starts_with(err, "Zipfile support")) {
// Just skip the cpu file.
} else {
return err;
}
#endif
}
for (const auto &shard : shard_map_) {
err = sched.merge_shard_data(shard.second->sched_info);
if (!err.empty())
return err;
}
if (serial_schedule_ostream_ == nullptr)
return "Serial file not opened";
err = sched.write_serial_file(serial_schedule_ostream_);
if (!err.empty())
return err;
// Make the cpu file optional for !HAS_ZIP, but don't wrap this inside
// HAS_ZIP as some subclasses have non-minizip zip support and don't have
// that define.
if (cpu_schedule_ostream_ != nullptr) {
err = sched.write_cpu_file(cpu_schedule_ostream_);
if (!err.empty())
return err;
}
return "";
}
bool
record_filter_t::print_results()
{
bool res = true;
uint64_t input_entry_count = 0;
uint64_t output_entry_count = 0;
for (const auto &shard : shard_map_) {
input_entry_count += shard.second->input_entry_count;
if (shard.second->now_empty) {
error_string_ = remove_output_file(shard.second);
if (!error_string_.empty())
res = false;
} else
output_entry_count += shard.second->output_entry_count;
}
std::cerr << "Output " << output_entry_count << " entries from " << input_entry_count
<< " entries.\n";
if (output_dir_.empty()) {
std::cerr << "Not writing schedule files: no output directory was specified.\n";
return res;
}
error_string_ = write_schedule_files();
if (!error_string_.empty())
res = false;
return res;
}
} // namespace drmemtrace
} // namespace dynamorio