| /* ********************************************************** |
| * Copyright (c) 2016-2026 Google, Inc. All rights reserved. |
| * **********************************************************/ |
| |
| /* |
| * Redistribution and use in source and binary forms, with or without |
| * modification, are permitted provided that the following conditions are met: |
| * |
| * * Redistributions of source code must retain the above copyright notice, |
| * this list of conditions and the following disclaimer. |
| * |
| * * Redistributions in binary form must reproduce the above copyright notice, |
| * this list of conditions and the following disclaimer in the documentation |
| * and/or other materials provided with the distribution. |
| * |
| * * Neither the name of Google, Inc. nor the names of its contributors may be |
| * used to endorse or promote products derived from this software without |
| * specific prior written permission. |
| * |
| * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" |
| * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
| * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
| * ARE DISCLAIMED. IN NO EVENT SHALL VMWARE, INC. OR CONTRIBUTORS BE LIABLE |
| * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL |
| * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR |
| * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER |
| * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT |
| * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY |
| * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH |
| * DAMAGE. |
| */ |
| |
| #include "analyzer.h" |
| |
#include <inttypes.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
| |
| #include <algorithm> |
| #include <atomic> |
#include <cassert>
#include <chrono>
| #include <iostream> |
| #include <limits> |
| #include <memory> |
| #include <set> |
| #include <string> |
| #include <thread> |
| #include <unordered_set> |
| #include <utility> |
| #include <vector> |
| |
| #include "memref.h" |
| #include "scheduler.h" |
| #include "analysis_tool.h" |
| #ifdef HAS_ZLIB |
| # include "compressed_file_reader.h" |
| #else |
| # include "file_reader.h" |
| #endif |
| #include "reader.h" |
| #include "record_file_reader.h" |
| #include "noise_generator.h" |
| #include "trace_entry.h" |
| #ifdef HAS_ZIP |
| # include "reader/zipfile_file_reader.h" |
| #endif |
| #ifdef HAS_SNAPPY |
| # include "reader/snappy_file_reader.h" |
| #endif |
| #include "common/utils.h" |
| |
| #undef VPRINT |
// Make printing available in release builds by not using ifdef DEBUG here.
// The usefulness for diagnostics outweighs any overhead from the extra branches.
| #define VPRINT(analyzer, level, ...) \ |
| do { \ |
| if ((analyzer)->verbosity_ >= (level)) { \ |
| fprintf(stderr, "%s ", (analyzer)->output_prefix_); \ |
| fprintf(stderr, __VA_ARGS__); \ |
| } \ |
| } while (0) |
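// Illustrative usage (hypothetical values): with verbosity_ == 2,
//   VPRINT(this, 1, "Worker %d done\n", index);    // Printed: 1 <= 2.
//   VPRINT(this, 3, "Detail: %s\n", name.c_str()); // Skipped: 3 > 2.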
| |
| namespace dynamorio { |
| namespace drmemtrace { |
| |
| #ifdef HAS_ZLIB |
| // Even if the file is uncompressed, zlib's gzip interface is faster than |
| // file_reader_t's fstream in our measurements, so we always use it when |
| // available. |
| typedef compressed_file_reader_t default_file_reader_t; |
| typedef compressed_record_file_reader_t default_record_file_reader_t; |
| #else |
| typedef file_reader_t<std::ifstream *> default_file_reader_t; |
| typedef dynamorio::drmemtrace::record_file_reader_t<std::ifstream> |
| default_record_file_reader_t; |
| #endif |
| |
| /**************************************************************** |
| * Specializations for analyzer_tmpl_t<reader_t>, aka analyzer_t. |
| */ |
| |
| template <> |
| bool |
| analyzer_t::serial_mode_supported() |
| { |
| return true; |
| } |
| |
| template <> |
| bool |
| analyzer_t::record_has_tid(memref_t record, memref_tid_t &tid) |
| { |
| // All memref_t records have tids (after PR #5739 changed the reader). |
| tid = record.marker.tid; |
| return true; |
| } |
| |
| template <> |
| bool |
| analyzer_t::record_is_thread_final(memref_t record) |
| { |
| return record.exit.type == TRACE_TYPE_THREAD_EXIT; |
| } |
| |
| template <> |
| bool |
| analyzer_t::record_is_timestamp(const memref_t &record) |
| { |
| return record.marker.type == TRACE_TYPE_MARKER && |
| record.marker.marker_type == TRACE_MARKER_TYPE_TIMESTAMP; |
| } |
| |
| template <> |
| bool |
| analyzer_t::record_is_instr(const memref_t &record) |
| { |
| return type_is_instr(record.instr.type); |
| } |
| |
| template <> |
| memref_t |
| analyzer_t::create_wait_marker() |
| { |
| memref_t record = {}; // Zero the other fields. |
| record.marker.type = TRACE_TYPE_MARKER; |
| record.marker.marker_type = TRACE_MARKER_TYPE_CORE_WAIT; |
| record.marker.tid = INVALID_THREAD_ID; |
| return record; |
| } |
| |
| template <> |
| memref_t |
| analyzer_t::create_idle_marker() |
| { |
| memref_t record = {}; // Zero the other fields. |
| record.marker.type = TRACE_TYPE_MARKER; |
| record.marker.marker_type = TRACE_MARKER_TYPE_CORE_IDLE; |
| record.marker.tid = IDLE_THREAD_ID; |
| return record; |
| } |
| |
| /****************************************************************************** |
| * Specializations for analyzer_tmpl_t<record_reader_t>, aka record_analyzer_t. |
| */ |
| |
| template <> |
| bool |
| record_analyzer_t::serial_mode_supported() |
| { |
| // TODO i#5727,i#5843: Once we move serial interleaving from file_reader_t into |
| // the scheduler we can support serial mode for record files as we won't need |
| // to implement interleaving inside record_file_reader_t. |
| return false; |
| } |
| |
| template <> |
| bool |
| record_analyzer_t::record_has_tid(trace_entry_t record, memref_tid_t &tid) |
| { |
| if (record.type != TRACE_TYPE_THREAD) |
| return false; |
| tid = static_cast<memref_tid_t>(record.addr); |
| return true; |
| } |
| |
| template <> |
| bool |
| record_analyzer_t::record_is_thread_final(trace_entry_t record) |
| { |
| return record.type == TRACE_TYPE_FOOTER; |
| } |
| |
| template <> |
| bool |
| record_analyzer_t::record_is_timestamp(const trace_entry_t &record) |
| { |
| return record.type == TRACE_TYPE_MARKER && record.size == TRACE_MARKER_TYPE_TIMESTAMP; |
| } |
| |
| template <> |
| bool |
| record_analyzer_t::record_is_instr(const trace_entry_t &record) |
| { |
| return type_is_instr(static_cast<trace_type_t>(record.type)); |
| } |
| |
| template <> |
| trace_entry_t |
| record_analyzer_t::create_wait_marker() |
| { |
| trace_entry_t record; |
| record.type = TRACE_TYPE_MARKER; |
| record.size = TRACE_MARKER_TYPE_CORE_WAIT; |
| record.addr = 0; // Marker value has no meaning so we zero it. |
| return record; |
| } |
| |
| template <> |
| trace_entry_t |
| record_analyzer_t::create_idle_marker() |
| { |
| trace_entry_t record; |
| record.type = TRACE_TYPE_MARKER; |
| record.size = TRACE_MARKER_TYPE_CORE_IDLE; |
| record.addr = 0; // Marker value has no meaning so we zero it. |
| return record; |
| } |
| |
| /******************************************************************** |
| * Other analyzer_tmpl_t routines that do not need to be specialized. |
| */ |
| |
| template <typename RecordType, typename ReaderType> |
| analyzer_tmpl_t<RecordType, ReaderType>::analyzer_tmpl_t() |
| : success_(true) |
| , num_tools_(0) |
    , tools_(nullptr)
| , parallel_(true) |
| , worker_count_(0) |
| { |
| /* Nothing else: child class needs to initialize. */ |
| } |
| |
| template <typename RecordType, typename ReaderType> |
| bool |
| analyzer_tmpl_t<RecordType, ReaderType>::init_scheduler( |
| const std::vector<std::string> &trace_paths, |
| const std::set<memref_tid_t> &only_threads, const std::set<int> &only_shards, |
| int output_limit, int verbosity, typename sched_type_t::scheduler_options_t options) |
| { |
| verbosity_ = verbosity; |
| if (trace_paths.empty()) { |
| ERRMSG("Missing trace path(s)\n"); |
| return false; |
| } |
| std::vector<typename sched_type_t::range_t> regions; |
| if (skip_instrs_ > 0 || exit_after_instrs_ > 0) { |
| // TODO i#5843: For serial mode with multiple inputs this is not doing the |
| // right thing: this is skipping in every input stream, while the documented |
| // behavior is supposed to be an output stream skip. Once we have that |
| // capability in the scheduler we should switch to that. |
| regions.emplace_back(skip_instrs_ + 1, |
| exit_after_instrs_ == 0 ? 0 |
| : skip_instrs_ + exit_after_instrs_); |
| } |
| std::vector<typename sched_type_t::input_workload_t> workloads; |
| for (const std::string &path : trace_paths) { |
| if (path.empty()) { |
| ERRMSG("Trace path is empty\n"); |
| return false; |
| } |
| workloads.emplace_back(path, regions); |
| // As documented and checked in analyzer_multi.cpp, only_threads and only_shards |
| // limits are not supported with -multi_indir. That's already been checked, so |
| // we do not perform additional checks here. |
| workloads.back().only_threads = only_threads; |
| workloads.back().only_shards = only_shards; |
| workloads.back().output_limit = output_limit; |
| if (regions.empty() && skip_to_timestamp_ > 0) { |
| workloads.back().times_of_interest.emplace_back(skip_to_timestamp_, 0); |
| } |
| } |
| return init_scheduler_common(workloads, std::move(options)); |
| } |
| |
| template <typename RecordType, typename ReaderType> |
| bool |
| analyzer_tmpl_t<RecordType, ReaderType>::init_scheduler( |
| std::unique_ptr<ReaderType> reader, std::unique_ptr<ReaderType> reader_end, |
| int verbosity, typename sched_type_t::scheduler_options_t options) |
| { |
| verbosity_ = verbosity; |
| if (!reader || !reader_end) { |
| ERRMSG("Readers are empty\n"); |
| return false; |
| } |
| std::vector<typename sched_type_t::input_reader_t> readers; |
| // Use a sentinel for the tid so the scheduler will use the memref record tid. |
| readers.emplace_back(std::move(reader), std::move(reader_end), |
| /*tid=*/INVALID_THREAD_ID); |
| std::vector<typename sched_type_t::range_t> regions; |
| if (skip_instrs_ > 0) |
| regions.emplace_back(skip_instrs_ + 1, 0); |
| std::vector<typename sched_type_t::input_workload_t> workloads; |
| workloads.emplace_back(std::move(readers), regions); |
| return init_scheduler_common(workloads, std::move(options)); |
| } |
| |
| template <typename RecordType, typename ReaderType> |
| bool |
| analyzer_tmpl_t<RecordType, ReaderType>::init_scheduler_common( |
| std::vector<typename sched_type_t::input_workload_t> &workloads, |
| typename sched_type_t::scheduler_options_t options) |
| { |
| // Add noise generator to input workloads. |
| if (add_noise_generator_) { |
        // TODO i#7216: This can be a good place to analyze the workloads in order to
        // tweak noise_generator_info_t parameters. For now we use the
        // noise_generator_info_t default values.
| noise_generator_info_t noise_generator_info; |
| // TODO i#7216: currently we only create a single-process, single-thread noise |
| // generator. We plan to add more in the future (multi-process and/or |
| // multi-thread noise generators). |
| typename sched_type_t::input_reader_t noise_generator_reader = |
| noise_generator_factory_.create_noise_generator(noise_generator_info); |
| // Check for errors. |
| error_string_ += noise_generator_factory_.get_error_string(); |
| if (!error_string_.empty()) { |
| return false; |
| } |
| // input_workload_t needs a vector of input_reader_t, so we create a vector with |
| // a single input_reader_t (the noise generator). |
| std::vector<typename sched_type_t::input_reader_t> readers; |
| readers.emplace_back(std::move(noise_generator_reader)); |
| // Add the noise generator to the scheduler's input workloads. |
| workloads.emplace_back(std::move(readers)); |
| } |
| |
| for (int i = 0; i < num_tools_; ++i) { |
| if (parallel_ && !tools_[i]->parallel_shard_supported()) { |
| parallel_ = false; |
| break; |
| } |
| } |
| |
| typename sched_type_t::scheduler_options_t sched_ops; |
| int output_count = worker_count_; |
| if (shard_type_ == SHARD_BY_CORE) { |
        // The subclass must pass us options and set worker_count_ to the # of cores.
| if (worker_count_ <= 0) { |
| error_string_ = "For -core_sharded, core count must be > 0"; |
| return false; |
| } |
| sched_ops = std::move(options); |
| if (sched_ops.quantum_unit == sched_type_t::QUANTUM_TIME) |
| sched_by_time_ = true; |
| if (!parallel_) { |
| // output_count remains the # of virtual cores, but we have just |
| // one worker thread. The scheduler multiplexes the output_count output |
| // cores onto a single stream for us with this option: |
| sched_ops.single_lockstep_output = true; |
| worker_count_ = 1; |
| } |
| } else { |
| if (parallel_) { |
| sched_ops = sched_type_t::make_scheduler_parallel_options(verbosity_); |
| if (worker_count_ <= 0) |
| worker_count_ = std::thread::hardware_concurrency(); |
| output_count = worker_count_; |
| } else { |
| sched_ops = sched_type_t::make_scheduler_serial_options(verbosity_); |
| worker_count_ = 1; |
| output_count = 1; |
| } |
| // As noted in the init_scheduler_common() header comment, we preserve only |
| // some select fields. |
| sched_ops.replay_as_traced_istream = options.replay_as_traced_istream; |
| sched_ops.read_inputs_in_init = options.read_inputs_in_init; |
| sched_ops.kernel_syscall_trace_path = options.kernel_syscall_trace_path; |
| } |
| sched_mapping_ = options.mapping; |
| if (sched_mapping_ == sched_type_t::MAP_TO_ANY_OUTPUT && worker_count_ > 0 && |
| max_allowed_imbalance_ >= 1.) { |
| load_balance_ = true; |
| } |
| if (scheduler_.init(workloads, output_count, std::move(sched_ops)) != |
| sched_type_t::STATUS_SUCCESS) { |
| ERRMSG("Failed to initialize scheduler: %s\n", |
| scheduler_.get_error_string().c_str()); |
| return false; |
| } |
| |
| for (int i = 0; i < worker_count_; ++i) { |
| worker_data_.push_back(analyzer_worker_data_t(i, scheduler_.get_stream(i))); |
| if (options.read_inputs_in_init) { |
| // The docs say we can query the filetype up front. |
| uint64_t filetype = scheduler_.get_stream(i)->get_filetype(); |
| VPRINT(this, 2, "Worker %d filetype %" PRIx64 "\n", i, filetype); |
| if (TESTANY(OFFLINE_FILE_TYPE_CORE_SHARDED, filetype)) { |
| if (i == 0 && shard_type_ == SHARD_BY_CORE) { |
| // This is almost certainly user error. |
| // Better to exit than risk user confusion. |
| // XXX i#7045: Ideally this could be reported as an error by the |
| // scheduler, and also detected early in analyzer_multi to auto-fix |
| // (when no mode is specified: if the user specifies core-sharding |
| // there could be config differences and this should be an error), |
| // but neither is simple so today the user has to re-run. |
| error_string_ = |
| "Re-scheduling a core-sharded-on-disk trace is generally a " |
| "mistake; re-run with -no_core_sharded.\n"; |
| return false; |
| } |
| shard_type_ = SHARD_BY_CORE; |
| } |
| } |
| } |
| |
| return true; |
| } |
| |
| template <typename RecordType, typename ReaderType> |
| analyzer_tmpl_t<RecordType, ReaderType>::analyzer_tmpl_t( |
| const std::string &trace_path, analysis_tool_tmpl_t<RecordType> **tools, |
| int num_tools, int worker_count, uint64_t skip_instrs, uint64_t interval_microseconds, |
| uint64_t interval_instr_count, int verbosity) |
| : success_(true) |
| , num_tools_(num_tools) |
| , tools_(tools) |
| , parallel_(true) |
| , worker_count_(worker_count) |
| , skip_instrs_(skip_instrs) |
| , interval_microseconds_(interval_microseconds) |
| , interval_instr_count_(interval_instr_count) |
| , verbosity_(verbosity) |
| { |
| if (interval_microseconds_ > 0 && interval_instr_count_ > 0) { |
| success_ = false; |
| error_string_ = "Cannot enable both kinds of interval analysis"; |
| return; |
| } |
| // The scheduler will call reader_t::init() for each input file. We assume |
| // that won't block (analyzer_multi_t separates out IPC readers). |
| typename sched_type_t::scheduler_options_t sched_ops; |
| if (!init_scheduler({ trace_path }, {}, {}, /*output_limit=*/0, verbosity, |
| std::move(sched_ops))) { |
| success_ = false; |
| error_string_ = "Failed to create scheduler"; |
| return; |
| } |
| for (int i = 0; i < num_tools; ++i) { |
        if (tools_[i] == nullptr || !*tools_[i]) {
            success_ = false;
            error_string_ = "Tool is not successfully initialized";
            if (tools_[i] != nullptr)
| error_string_ += ": " + tools_[i]->get_error_string(); |
| return; |
| } |
| } |
| } |
| |
// Work around clang-format bug: no newline after return type for single-char operator.
// clang-format off
template <typename RecordType, typename ReaderType>
analyzer_tmpl_t<RecordType, ReaderType>::~analyzer_tmpl_t()
// clang-format on
| { |
| // Empty. |
| } |
| |
// Work around clang-format bug: no newline after return type for single-char operator.
// clang-format off
template <typename RecordType, typename ReaderType>
bool
analyzer_tmpl_t<RecordType, ReaderType>::operator!()
// clang-format on
| { |
| return !success_; |
| } |
| |
| template <typename RecordType, typename ReaderType> |
| std::string |
| analyzer_tmpl_t<RecordType, ReaderType>::get_error_string() |
| { |
| return error_string_; |
| } |
| |
| template <typename RecordType, typename ReaderType> |
| uint64_t |
| analyzer_tmpl_t<RecordType, ReaderType>::get_current_microseconds() |
| { |
| return get_microsecond_timestamp(); |
| } |
| |
| template <typename RecordType, typename ReaderType> |
| uint64_t |
| analyzer_tmpl_t<RecordType, ReaderType>::compute_timestamp_interval_id( |
| uint64_t first_timestamp, uint64_t latest_timestamp) |
| { |
| assert(first_timestamp <= latest_timestamp); |
| assert(interval_microseconds_ > 0); |
| // We keep the interval end timestamps independent of the first timestamp of the |
| // trace. For the parallel mode, where we need to merge intervals from different |
| // shards that were active during the same final whole-trace interval, having aligned |
| // interval-end points makes it easier to merge. Note that interval ids are however |
| // still dependent on the first timestamp since we want interval ids to start at a |
| // small number >= 1. |
| return latest_timestamp / interval_microseconds_ - |
| first_timestamp / interval_microseconds_ + 1; |
| } |
| |
| template <typename RecordType, typename ReaderType> |
| uint64_t |
| analyzer_tmpl_t<RecordType, ReaderType>::compute_instr_count_interval_id( |
| uint64_t cur_instr_count) |
| { |
| assert(interval_instr_count_ > 0); |
| if (cur_instr_count == 0) |
| return 1; |
| // We want all memory access entries following an instr to stay in the same |
| // interval as the instr, so we increment interval_id at instr entries. Also, |
| // we want the last instr in each interval to have an ordinal that's a multiple |
| // of interval_instr_count_. |
| return (cur_instr_count - 1) / interval_instr_count_ + 1; |
| } |
| |
| template <typename RecordType, typename ReaderType> |
| uint64_t |
| analyzer_tmpl_t<RecordType, ReaderType>::compute_interval_end_timestamp( |
| uint64_t first_timestamp, uint64_t interval_id) |
| { |
| assert(interval_microseconds_ > 0); |
| assert(interval_id >= 1); |
| uint64_t end_timestamp = |
| (first_timestamp / interval_microseconds_ + interval_id) * interval_microseconds_; |
| // Since the interval's end timestamp is exclusive, the end_timestamp would actually |
| // fall under the next interval. |
| assert(compute_timestamp_interval_id(first_timestamp, end_timestamp) == |
| interval_id + 1); |
| return end_timestamp; |
| } |
| |
| template <typename RecordType, typename ReaderType> |
| bool |
| analyzer_tmpl_t<RecordType, ReaderType>::advance_interval_id( |
| typename scheduler_tmpl_t<RecordType, ReaderType>::stream_t *stream, |
| analyzer_shard_data_t *shard, uint64_t &prev_interval_index, |
| uint64_t &prev_interval_init_instr_count, bool at_instr_record) |
| { |
| uint64_t next_interval_index = 0; |
| if (interval_microseconds_ > 0) { |
| next_interval_index = compute_timestamp_interval_id(stream->get_first_timestamp(), |
| stream->get_last_timestamp()); |
| } else if (interval_instr_count_ > 0) { |
        // The interval callbacks are invoked just prior to the process_memref or
        // parallel_shard_memref callback for the first instr of the new interval; this
        // keeps the instr's memory accesses in the same interval as the instr.
| next_interval_index = |
| compute_instr_count_interval_id(stream->get_instruction_ordinal()); |
| } else { |
| return false; |
| } |
| if (next_interval_index != shard->cur_interval_index) { |
| assert(next_interval_index > shard->cur_interval_index); |
| prev_interval_index = shard->cur_interval_index; |
| prev_interval_init_instr_count = shard->cur_interval_init_instr_count; |
| shard->cur_interval_index = next_interval_index; |
| // If the next record to be presented to the tools is an instr record, we need to |
| // adjust for the fact that the record has already been read from the stream. |
| // Since we know that the next record is a part of the new interval and |
| // cur_interval_init_instr_count is supposed to be the count just prior to the |
| // new interval, we need to subtract one count for the instr. |
| shard->cur_interval_init_instr_count = |
| stream->get_instruction_ordinal() - (at_instr_record ? 1 : 0); |
| return true; |
| } |
| return false; |
| } |
| |
| template <typename RecordType, typename ReaderType> |
| void |
| analyzer_tmpl_t<RecordType, ReaderType>::process_serial(analyzer_worker_data_t &worker) |
| { |
| std::vector<void *> user_worker_data(num_tools_); |
| |
| worker.shard_data[0].tool_data.resize(num_tools_); |
| if (interval_microseconds_ != 0 || interval_instr_count_ != 0) |
| worker.shard_data[0].cur_interval_index = 1; |
| for (int i = 0; i < num_tools_; ++i) { |
| worker.error = tools_[i]->initialize_stream(worker.stream); |
| if (!worker.error.empty()) |
| return; |
| worker.error = tools_[i]->initialize_shard_type(shard_type_); |
| if (!worker.error.empty()) |
| return; |
| } |
| std::unordered_set<int> tool_exited; |
| while (true) { |
| RecordType record; |
| // The current time is used for time quanta; for instr quanta, it's ignored and |
| // we pass 0 and let the scheduler use instruction + idle counts. |
| uint64_t cur_micros = sched_by_time_ ? get_current_microseconds() : 0; |
| typename sched_type_t::stream_status_t status = |
| worker.stream->next_record(record, cur_micros); |
| if (status == sched_type_t::STATUS_WAIT) { |
| record = create_wait_marker(); |
| } else if (status == sched_type_t::STATUS_IDLE) { |
| assert(shard_type_ == SHARD_BY_CORE); |
| record = create_idle_marker(); |
| ++worker.activity_count; |
| } else if (status == sched_type_t::STATUS_OK) { |
| if (record_is_instr(record)) |
| ++worker.activity_count; |
| } else { |
| if (status != sched_type_t::STATUS_EOF) { |
| if (status == sched_type_t::STATUS_REGION_INVALID) { |
| worker.error = |
| "Too-far -skip_instrs for: " + worker.stream->get_stream_name(); |
| } else { |
| worker.error = |
| "Failed to read from trace: " + worker.stream->get_stream_name(); |
| } |
| } else if (interval_microseconds_ != 0 || interval_instr_count_ != 0) { |
| if (!process_interval(worker.shard_data[0].cur_interval_index, |
| worker.shard_data[0].cur_interval_init_instr_count, |
| &worker, |
| /*parallel=*/false, /*at_instr_record=*/false) || |
| !finalize_interval_snapshots(&worker, /*parallel=*/false)) |
| return; |
| } |
| return; |
| } |
        // For zipfiles, we could jump from chunk to chunk and use the record ordinal
        // marker, but this option is rarely used so we do a simple walk here.
        // Users should use -skip_instrs for fast skipping.
| // We also do not present the prior timestamp when we get there. |
| // Nor do we count anything the scheduler doesn't add to the ordinals: |
| // dynamically injected synthetic records. |
| if (skip_records_ > 0 && |
| skip_records_ >= worker.stream->get_output_record_ordinal()) |
| continue; |
| uint64_t prev_interval_index; |
| uint64_t prev_interval_init_instr_count; |
| if ((record_is_timestamp(record) || record_is_instr(record)) && |
| advance_interval_id(worker.stream, &worker.shard_data[0], prev_interval_index, |
| prev_interval_init_instr_count, |
| record_is_instr(record)) && |
| !process_interval(prev_interval_index, prev_interval_init_instr_count, |
| &worker, /*parallel=*/false, record_is_instr(record))) { |
| return; |
| } |
| for (int i = 0; i < num_tools_; ++i) { |
| if (tool_exited.find(i) != tool_exited.end()) |
| continue; |
| if (!tools_[i]->process_memref(record)) { |
| worker.error = tools_[i]->get_error_string(); |
| if (worker.error.empty()) { |
| VPRINT(this, 1, "Worker %d tool %d exiting early on trace shard %s\n", |
| worker.index, i, worker.stream->get_stream_name().c_str()); |
| tool_exited.insert(i); |
| if (static_cast<int>(tool_exited.size()) >= num_tools_) { |
| VPRINT(this, 1, |
| "Worker %d all tools exited early on trace shard %s\n", |
| worker.index, worker.stream->get_stream_name().c_str()); |
| return; |
| } |
| } else { |
| VPRINT(this, 1, "Worker %d hit memref error %s on trace shard %s\n", |
| worker.index, worker.error.c_str(), |
| worker.stream->get_stream_name().c_str()); |
| return; |
| } |
| } |
| } |
| if (exit_after_records_ > 0 && |
| // We can't use get_record_ordinal() because it's the input |
| // ordinal due to SCHEDULER_USE_INPUT_ORDINALS. We do not want to |
| // include skipped records here. |
| worker.stream->get_output_record_ordinal() >= |
| skip_records_ + exit_after_records_) { |
| VPRINT(this, 1, |
| "Worker %d exiting after requested record count on shard %s\n", |
| worker.index, worker.stream->get_stream_name().c_str()); |
| return; |
| } |
| } |
| } |
| |
| template <typename RecordType, typename ReaderType> |
| bool |
| analyzer_tmpl_t<RecordType, ReaderType>::process_shard_exit( |
| analyzer_worker_data_t *worker, int shard_index, bool do_process_final_interval) |
| { |
| VPRINT(this, 1, "Worker %d finished trace shard %s\n", worker->index, |
| worker->stream->get_stream_name().c_str()); |
| worker->shard_data[shard_index].exited = true; |
| if (interval_microseconds_ != 0 || interval_instr_count_ != 0) { |
| if (!do_process_final_interval) { |
| ERRMSG("i#6793: Skipping process_interval for final interval of shard index " |
| "%d\n", |
| shard_index); |
| } else if (!process_interval( |
| worker->shard_data[shard_index].cur_interval_index, |
| worker->shard_data[shard_index].cur_interval_init_instr_count, |
| worker, |
| /*parallel=*/true, /*at_instr_record=*/false, shard_index)) { |
| return false; |
| } |
| if (!finalize_interval_snapshots(worker, /*parallel=*/true, shard_index)) { |
| return false; |
| } |
| } |
| for (int i = 0; i < num_tools_; ++i) { |
| if (!tools_[i]->parallel_shard_exit( |
| worker->shard_data[shard_index].tool_data[i].shard_data)) { |
| worker->error = tools_[i]->parallel_shard_error( |
| worker->shard_data[shard_index].tool_data[i].shard_data); |
| VPRINT(this, 1, "Worker %d hit shard exit error %s on trace shard index %d\n", |
| worker->index, worker->error.c_str(), shard_index); |
| return false; |
| } |
| } |
| return true; |
| } |
| template <typename RecordType, typename ReaderType> |
| void |
| analyzer_tmpl_t<RecordType, ReaderType>::check_load_balance( |
| analyzer_worker_data_t *worker) |
| { |
| // Only update the shared atomic value and read other threads' values every |
| // so often to reduce overhead. |
| if (worker->activity_count - worker->prev_ord_balance_check >= |
| load_balance_cadence_) { |
| worker->shared_activity_count.store(worker->activity_count, |
| std::memory_order_release); |
| worker->prev_ord_balance_check = worker->activity_count; |
| int64_t min_activity = std::numeric_limits<int64_t>::max(); |
| const analyzer_worker_data_t *min_worker = nullptr; |
| for (const auto &worker_data : worker_data_) { |
| // We can't wait for a finished worker (this only happens when |
| // all inputs, or exit_if_fraction_inputs_left inputs, are at EOF). |
| if (worker_data.exited.load(std::memory_order_acquire)) |
| continue; |
| int64_t worker_activity = |
| worker_data.shared_activity_count.load(std::memory_order_acquire); |
| if (worker_activity < min_activity) { |
| min_activity = worker_activity; |
| min_worker = &worker_data; |
| } |
| } |
| if (worker->activity_count <= max_allowed_imbalance_ * min_activity) { |
| VPRINT(this, 3, |
| "Worker %d @%" PRId64 " NOT waiting for slowest %d @%" PRId64 "\n", |
| worker->index, worker->activity_count, min_worker->index, |
| min_activity); |
| return; |
| } |
| constexpr int LOG_EVERY = 100; |
| if (worker->imbalance_wait_count % LOG_EVERY == 0) { |
| VPRINT( |
| this, 1, "Worker %d @%" PRId64 " waiting for slowest %d @%" PRId64 "\n", |
| worker->index, worker->activity_count, min_worker->index, min_activity); |
| } |
| ++worker->imbalance_wait_count; |
| int iters = 0; |
        // Don't stay here too long: return to the main loop to process exits and
        // other conditions, even if the imbalance is not fully restored yet.
        // We'll come back and try again if it's not, or if another worker has
        // become the slowest.
| constexpr int MAX_ITERS = 100; |
| while (worker->activity_count > max_allowed_imbalance_ * min_activity && |
| ++iters < MAX_ITERS) { |
| // A yield is not sufficient when the slower worker is competing on another |
| // core while the faster worker has no competition and will just run again |
| // with a yield: we need to give the slower worker *time*. |
| std::this_thread::sleep_for(std::chrono::milliseconds(1)); |
| min_activity = |
| min_worker->shared_activity_count.load(std::memory_order_acquire); |
| } |
| } |
| } |
| |
| template <typename RecordType, typename ReaderType> |
| bool |
| analyzer_tmpl_t<RecordType, ReaderType>::process_tasks_internal( |
| analyzer_worker_data_t *worker) |
| { |
| std::vector<void *> user_worker_data(num_tools_); |
| |
| for (int i = 0; i < num_tools_; ++i) |
| user_worker_data[i] = tools_[i]->parallel_worker_init(worker->index); |
| |
| RecordType record; |
| // The current time is used for time quanta; for instr quanta, it's ignored and |
| // we pass 0. |
| uint64_t cur_micros = sched_by_time_ ? get_current_microseconds() : 0; |
| std::unordered_set<int> tool_exited; |
| for (typename sched_type_t::stream_status_t status = |
| worker->stream->next_record(record, cur_micros); |
| status != sched_type_t::STATUS_EOF; |
| status = worker->stream->next_record(record, cur_micros)) { |
| if (sched_by_time_) |
| cur_micros = get_current_microseconds(); |
| if (status == sched_type_t::STATUS_WAIT) { |
| // We let tools know about waits so they can analyze the schedule. |
| // We synthesize a record here. If we wanted this to count toward output |
| // stream ordinals we would need to add a scheduler API to inject it. |
| record = create_wait_marker(); |
| if (parallel_) { |
| // Don't spin on this artificial wait; retry later. |
| std::this_thread::sleep_for(std::chrono::milliseconds(1)); |
| } |
| } else if (status == sched_type_t::STATUS_IDLE) { |
| assert(shard_type_ == SHARD_BY_CORE); |
| // We let tools know about idle time so they can analyze cpu usage. |
| // We synthesize a record here. If we wanted this to count toward output |
| // stream ordinals we would need to add a scheduler API to inject it. |
| record = create_idle_marker(); |
| ++worker->activity_count; |
| if (load_balance_) |
| check_load_balance(worker); |
| } else if (status == sched_type_t::STATUS_OK) { |
| if (record_is_instr(record)) { |
| ++worker->activity_count; |
| if (load_balance_) |
| check_load_balance(worker); |
| } |
| } else { |
| if (status == sched_type_t::STATUS_REGION_INVALID) { |
| worker->error = |
| "Too-far -skip_instrs for: " + worker->stream->get_stream_name(); |
| } else { |
| worker->error = |
| "Failed to read from trace: " + worker->stream->get_stream_name(); |
| } |
| return false; |
| } |
| int shard_index = worker->stream->get_shard_index(); |
| if (worker->shard_data.find(shard_index) == worker->shard_data.end()) { |
| VPRINT(this, 1, "Worker %d starting on trace shard %d stream is %p\n", |
| worker->index, shard_index, worker->stream); |
| worker->shard_data[shard_index].tool_data.resize(num_tools_); |
| if (interval_microseconds_ != 0 || interval_instr_count_ != 0) |
| worker->shard_data[shard_index].cur_interval_index = 1; |
| for (int i = 0; i < num_tools_; ++i) { |
| worker->shard_data[shard_index].tool_data[i].shard_data = |
| tools_[i]->parallel_shard_init_stream( |
| shard_index, user_worker_data[i], worker->stream); |
| } |
| worker->shard_data[shard_index].shard_index = shard_index; |
| } |
| memref_tid_t tid; |
| if (worker->shard_data[shard_index].shard_id == 0) { |
| if (shard_type_ == SHARD_BY_CORE) |
| worker->shard_data[shard_index].shard_id = worker->index; |
| else if (record_has_tid(record, tid)) |
| worker->shard_data[shard_index].shard_id = tid; |
| } |
        // See the comment in process_serial() on skip_records.
        // Parallel skipping is not well-supported: we skip in each worker, not in each
        // shard, and even per-shard skipping (as -skip_instrs does today) may not be
        // what the user wants. XXX i#7230: Is there a better usage model for parallel
        // skipping?
| if (skip_records_ > 0 && |
| skip_records_ >= worker->stream->get_output_record_ordinal()) |
| continue; |
| uint64_t prev_interval_index; |
| uint64_t prev_interval_init_instr_count; |
| if ((record_is_timestamp(record) || record_is_instr(record)) && |
| advance_interval_id(worker->stream, &worker->shard_data[shard_index], |
| prev_interval_index, prev_interval_init_instr_count, |
| record_is_instr(record)) && |
| !process_interval(prev_interval_index, prev_interval_init_instr_count, worker, |
| /*parallel=*/true, record_is_instr(record), shard_index)) { |
| return false; |
| } |
| for (int i = 0; i < num_tools_; ++i) { |
| if (tool_exited.find(i) != tool_exited.end()) |
| continue; |
| if (!tools_[i]->parallel_shard_memref( |
| worker->shard_data[shard_index].tool_data[i].shard_data, record)) { |
| worker->error = tools_[i]->parallel_shard_error( |
| worker->shard_data[shard_index].tool_data[i].shard_data); |
| if (worker->error.empty()) { |
| VPRINT(this, 1, "Worker %d tool %d exiting early on trace shard %s\n", |
| worker->index, i, worker->stream->get_stream_name().c_str()); |
| tool_exited.insert(i); |
| if (static_cast<int>(tool_exited.size()) >= num_tools_) { |
| VPRINT(this, 1, |
| "Worker %d all tools exited early on trace shard %s\n", |
| worker->index, worker->stream->get_stream_name().c_str()); |
| return true; |
| } |
| } else { |
| VPRINT(this, 1, |
| "Worker %d hit shard memref error %s on trace shard %s\n", |
| worker->index, worker->error.c_str(), |
| worker->stream->get_stream_name().c_str()); |
| return false; |
| } |
| } |
| } |
| if (record_is_thread_final(record) && shard_type_ != SHARD_BY_CORE) { |
| if (!process_shard_exit(worker, shard_index)) { |
| return false; |
| } |
| } |
| if (exit_after_records_ > 0 && |
| // We can't use get_record_ordinal() because it's the input |
| // ordinal due to SCHEDULER_USE_INPUT_ORDINALS. We do not want to |
| // include skipped records here. |
| worker->stream->get_output_record_ordinal() >= |
| skip_records_ + exit_after_records_) { |
| VPRINT(this, 1, |
| "Worker %d exiting after requested record count on shard %s\n", |
| worker->index, worker->stream->get_stream_name().c_str()); |
| return true; |
| } |
| } |
| if (shard_type_ == SHARD_BY_CORE) { |
| if (worker->shard_data.find(worker->index) != worker->shard_data.end()) { |
| if (!process_shard_exit(worker, worker->index)) { |
| return false; |
| } |
| } |
| } |
| // i#6444: Fallback for cases where there is a missing thread final record in |
| // non-core-sharded traces, in which case we have not yet invoked |
| // process_shard_exit. |
| for (const auto &keyval : worker->shard_data) { |
| if (!keyval.second.exited) { |
| // i#6793: We skip processing the final interval for shards exited here |
| // if the stream has already moved on and cannot provide the state for |
| // the shard anymore. |
| bool do_process_final_interval = |
| keyval.second.shard_index == worker->stream->get_shard_index(); |
| if (!process_shard_exit(worker, keyval.second.shard_index, |
| do_process_final_interval)) { |
| return false; |
| } |
| } |
| } |
| for (int i = 0; i < num_tools_; ++i) { |
| const std::string error = tools_[i]->parallel_worker_exit(user_worker_data[i]); |
| if (!error.empty()) { |
| worker->error = error; |
| VPRINT(this, 1, "Worker %d hit worker exit error %s\n", worker->index, |
| error.c_str()); |
| return false; |
| } |
| } |
| return true; |
| } |
| |
| template <typename RecordType, typename ReaderType> |
| void |
| analyzer_tmpl_t<RecordType, ReaderType>::process_tasks(analyzer_worker_data_t *worker) |
| { |
| if (!process_tasks_internal(worker)) { |
| if (sched_mapping_ == sched_type_t::MAP_TO_ANY_OUTPUT) { |
| // Avoid a hang in the scheduler if we leave our current input stranded. |
| // XXX: Better to just do a global exit and not let the other threads |
| // keep running? That breaks the current model where errors are |
| // propagated to the user to decide what to do. |
            // We could perhaps add thread synch points to have other threads
            // exit earlier: but maybe some use cases consider one shard's error
            // to not affect others and not be fatal?
| if (worker->stream->set_active(false) != sched_type_t::STATUS_OK) { |
| ERRMSG("Failed to set failing worker to inactive; may hang"); |
| } |
| } |
| } |
| VPRINT(this, 1, "Worker %d finished; waited %" PRId64 " times for load balancing\n", |
| worker->index, worker->imbalance_wait_count); |
| worker->exited.store(true, std::memory_order_release); |
| } |
| |
| template <typename RecordType, typename ReaderType> |
| bool |
| analyzer_tmpl_t<RecordType, ReaderType>::combine_interval_snapshots( |
| const std::vector< |
| const typename analysis_tool_tmpl_t<RecordType>::interval_state_snapshot_t *> |
| &latest_shard_snapshots, |
| uint64_t interval_end_timestamp, int tool_idx, |
| typename analysis_tool_tmpl_t<RecordType>::interval_state_snapshot_t *&result) |
| { |
| result = tools_[tool_idx]->combine_interval_snapshots(latest_shard_snapshots, |
| interval_end_timestamp); |
| if (result == nullptr) { |
| error_string_ = "combine_interval_snapshots unexpectedly returned nullptr: " + |
| tools_[tool_idx]->get_error_string(); |
| return false; |
| } |
| result->instr_count_delta_ = 0; |
| result->instr_count_cumulative_ = 0; |
| for (auto snapshot : latest_shard_snapshots) { |
| if (snapshot == nullptr) |
| continue; |
        // As discussed in the doc for analysis_tool_t::combine_interval_snapshots,
        // we combine all shards' latest snapshots for cumulative metrics, whereas
        // we combine only the shards active in the current interval for delta metrics.
| result->instr_count_cumulative_ += snapshot->instr_count_cumulative_; |
| if (snapshot->interval_end_timestamp_ == interval_end_timestamp) |
| result->instr_count_delta_ += snapshot->instr_count_delta_; |
| } |
| return true; |
| } |
| |
| template <typename RecordType, typename ReaderType> |
| bool |
| analyzer_tmpl_t<RecordType, ReaderType>::merge_shard_interval_results( |
| // intervals[shard_idx] is a vector of interval_state_snapshot_t* |
| // representing the interval snapshots for that shard. |
| std::vector<std::vector< |
| typename analysis_tool_tmpl_t<RecordType>::interval_state_snapshot_t *>> |
| &intervals, |
| // This function will write the resulting whole-trace intervals to |
| // merged_intervals. |
| std::vector<typename analysis_tool_tmpl_t<RecordType>::interval_state_snapshot_t *> |
| &merged_intervals, |
| int tool_idx) |
| { |
| assert(!intervals.empty()); |
| assert(merged_intervals.empty()); |
    // Used to recompute the interval_id for the resulting whole-trace intervals,
    // which are numbered relative to the earliest interval-end timestamp across
    // all shards.
| uint64_t earliest_ever_interval_end_timestamp = std::numeric_limits<uint64_t>::max(); |
| size_t shard_count = intervals.size(); |
| std::vector<size_t> at_idx(shard_count, 0); |
| bool any_shard_has_results_left = true; |
| std::vector<typename analysis_tool_tmpl_t<RecordType>::interval_state_snapshot_t *> |
| last_snapshot_per_shard(shard_count, nullptr); |
| while (any_shard_has_results_left) { |
| // Look for the next whole trace interval across all shards, which will be the |
| // one with the earliest interval-end timestamp. |
| uint64_t earliest_interval_end_timestamp = std::numeric_limits<uint64_t>::max(); |
| for (size_t shard_idx = 0; shard_idx < shard_count; ++shard_idx) { |
| if (at_idx[shard_idx] == intervals[shard_idx].size()) |
| continue; |
| earliest_interval_end_timestamp = std::min( |
| earliest_interval_end_timestamp, |
| intervals[shard_idx][at_idx[shard_idx]]->interval_end_timestamp_); |
| } |
| // We're done if no shard has any interval left unprocessed. |
| if (earliest_interval_end_timestamp == std::numeric_limits<uint64_t>::max()) { |
| any_shard_has_results_left = false; |
| continue; |
| } |
| assert(earliest_interval_end_timestamp % interval_microseconds_ == 0); |
| if (earliest_ever_interval_end_timestamp == |
| std::numeric_limits<uint64_t>::max()) { |
| earliest_ever_interval_end_timestamp = earliest_interval_end_timestamp; |
| } |
| // Update last_snapshot_per_shard for shards that were active during this |
| // interval, which have a timestamp == earliest_interval_end_timestamp. |
| for (size_t shard_idx = 0; shard_idx < shard_count; ++shard_idx) { |
| if (at_idx[shard_idx] == intervals[shard_idx].size()) |
| continue; |
| uint64_t cur_interval_end_timestamp = |
| intervals[shard_idx][at_idx[shard_idx]]->interval_end_timestamp_; |
| assert(cur_interval_end_timestamp >= earliest_interval_end_timestamp); |
| if (cur_interval_end_timestamp > earliest_interval_end_timestamp) |
| continue; |
| // This shard was active during this interval. So, we update the current |
| // shard's latest interval snapshot. |
| if (last_snapshot_per_shard[shard_idx] != nullptr) { |
| if (!tools_[tool_idx]->release_interval_snapshot( |
| last_snapshot_per_shard[shard_idx])) { |
| error_string_ = tools_[tool_idx]->get_error_string(); |
| return false; |
| } |
| } |
| last_snapshot_per_shard[shard_idx] = intervals[shard_idx][at_idx[shard_idx]]; |
| ++at_idx[shard_idx]; |
| } |
| // Merge last_snapshot_per_shard to form the result of the current |
| // whole-trace interval. |
| std::vector< |
| const typename analysis_tool_tmpl_t<RecordType>::interval_state_snapshot_t *> |
| const_last_snapshot_per_shard; |
| const_last_snapshot_per_shard.insert(const_last_snapshot_per_shard.end(), |
| last_snapshot_per_shard.begin(), |
| last_snapshot_per_shard.end()); |
| typename analysis_tool_tmpl_t<RecordType>::interval_state_snapshot_t |
| *cur_merged_interval; |
| if (!combine_interval_snapshots(const_last_snapshot_per_shard, |
| earliest_interval_end_timestamp, tool_idx, |
| cur_merged_interval)) |
| return false; |
| // Add the merged interval to the result list of whole trace intervals. |
| cur_merged_interval->shard_id_ = analysis_tool_tmpl_t< |
| RecordType>::interval_state_snapshot_t::WHOLE_TRACE_SHARD_ID; |
| cur_merged_interval->interval_end_timestamp_ = earliest_interval_end_timestamp; |
| cur_merged_interval->interval_id_ = compute_timestamp_interval_id( |
| earliest_ever_interval_end_timestamp, earliest_interval_end_timestamp); |
| merged_intervals.push_back(cur_merged_interval); |
| } |
| for (auto snapshot : last_snapshot_per_shard) { |
| if (snapshot != nullptr && |
| !tools_[tool_idx]->release_interval_snapshot(snapshot)) { |
| error_string_ = tools_[tool_idx]->get_error_string(); |
| return false; |
| } |
| } |
| return true; |
| } |
| |
| template <typename RecordType, typename ReaderType> |
| void |
| analyzer_tmpl_t<RecordType, ReaderType>::populate_unmerged_shard_interval_results() |
| { |
| for (auto &worker : worker_data_) { |
| for (auto &shard_data : worker.shard_data) { |
| assert(static_cast<int>(shard_data.second.tool_data.size()) == num_tools_); |
| for (int tool_idx = 0; tool_idx < num_tools_; ++tool_idx) { |
| key_tool_shard_t tool_shard_key = { tool_idx, |
| shard_data.second.shard_index }; |
| per_shard_interval_snapshots_[tool_shard_key] = std::move( |
| shard_data.second.tool_data[tool_idx].interval_snapshot_data); |
| } |
| } |
| } |
| } |
| |
| template <typename RecordType, typename ReaderType> |
| void |
| analyzer_tmpl_t<RecordType, ReaderType>::populate_serial_interval_results() |
| { |
| assert(whole_trace_interval_snapshots_.empty()); |
| whole_trace_interval_snapshots_.resize(num_tools_); |
| assert(worker_data_.size() == 1); |
| assert(worker_data_[0].shard_data.size() == 1 && |
| worker_data_[0].shard_data.count(0) == 1); |
| assert(static_cast<int>(worker_data_[0].shard_data[0].tool_data.size()) == |
| num_tools_); |
| for (int tool_idx = 0; tool_idx < num_tools_; ++tool_idx) { |
| whole_trace_interval_snapshots_[tool_idx] = std::move( |
| worker_data_[0].shard_data[0].tool_data[tool_idx].interval_snapshot_data); |
| } |
| } |
| |
| template <typename RecordType, typename ReaderType> |
| bool |
| analyzer_tmpl_t<RecordType, ReaderType>::collect_and_maybe_merge_shard_interval_results() |
| { |
| assert(interval_microseconds_ != 0 || interval_instr_count_ != 0); |
| if (!parallel_) { |
| populate_serial_interval_results(); |
| return true; |
| } |
| if (interval_instr_count_ > 0) { |
| // We do not merge interval state snapshots across shards. See comment by |
| // per_shard_interval_snapshots for more details. |
| populate_unmerged_shard_interval_results(); |
| return true; |
| } |
| // all_intervals[tool_idx][shard_idx] contains a vector of the |
| // interval_state_snapshot_t* that were output by that tool for that shard. |
| std::vector<std::vector<std::vector< |
| typename analysis_tool_tmpl_t<RecordType>::interval_state_snapshot_t *>>> |
| all_intervals(num_tools_); |
| for (const auto &worker : worker_data_) { |
| for (const auto &shard_data : worker.shard_data) { |
| assert(static_cast<int>(shard_data.second.tool_data.size()) == num_tools_); |
| for (int tool_idx = 0; tool_idx < num_tools_; ++tool_idx) { |
| all_intervals[tool_idx].emplace_back(std::move( |
| shard_data.second.tool_data[tool_idx].interval_snapshot_data)); |
| } |
| } |
| } |
| assert(whole_trace_interval_snapshots_.empty()); |
| whole_trace_interval_snapshots_.resize(num_tools_); |
| for (int tool_idx = 0; tool_idx < num_tools_; ++tool_idx) { |
        // We need to do this separately per tool because not all tools may
        // generate an interval_state_snapshot_t for the same intervals (even though
        // the framework notifies all tools of all intervals).
| if (!merge_shard_interval_results(all_intervals[tool_idx], |
| whole_trace_interval_snapshots_[tool_idx], |
| tool_idx)) { |
| return false; |
| } |
| } |
| return true; |
| } |
| |
| template <typename RecordType, typename ReaderType> |
| bool |
| analyzer_tmpl_t<RecordType, ReaderType>::run() |
| { |
| // XXX i#3286: Add a %-completed progress message by looking at the file sizes. |
| if (!parallel_) { |
| process_serial(worker_data_[0]); |
| if (!worker_data_[0].error.empty()) { |
| error_string_ = worker_data_[0].error; |
| return false; |
| } |
| } else { |
| if (worker_count_ <= 0) { |
| error_string_ = "Invalid worker count: must be > 0"; |
| return false; |
| } |
| for (int i = 0; i < num_tools_; ++i) { |
| error_string_ = tools_[i]->initialize_stream(nullptr); |
| if (!error_string_.empty()) |
| return false; |
| error_string_ = tools_[i]->initialize_shard_type(shard_type_); |
| if (!error_string_.empty()) |
| return false; |
| } |
| std::vector<std::thread> threads; |
| VPRINT(this, 1, "Creating %d worker threads\n", worker_count_); |
| threads.reserve(worker_count_); |
| for (int i = 0; i < worker_count_; ++i) { |
| threads.emplace_back( |
| std::thread(&analyzer_tmpl_t::process_tasks, this, &worker_data_[i])); |
| } |
| for (std::thread &thread : threads) |
| thread.join(); |
| for (auto &worker : worker_data_) { |
| if (!worker.error.empty()) { |
| error_string_ = worker.error; |
| return false; |
| } |
| } |
| } |
| if (interval_microseconds_ != 0 || interval_instr_count_ != 0) { |
| return collect_and_maybe_merge_shard_interval_results(); |
| } |
| return true; |
| } |
| |
| static void |
| print_output_separator() |
| { |
| |
| std::cerr << "\n==========================================================" |
| "=================\n"; |
| } |
| |
| template <typename RecordType, typename ReaderType> |
| bool |
| analyzer_tmpl_t<RecordType, ReaderType>::print_stats() |
| { |
| for (int i = 0; i < num_tools_; ++i) { |
| // Each tool should reset i/o state, but we reset the format here just in case. |
| std::cerr << std::dec; |
| if (!tools_[i]->print_results()) { |
| error_string_ = tools_[i]->get_error_string(); |
| return false; |
| } |
| if (i + 1 < num_tools_) { |
| // Separate tool output. |
| print_output_separator(); |
| } |
| } |
    // Now print the interval results.
    // We should not have both whole-trace and per-shard interval snapshots.
| assert(whole_trace_interval_snapshots_.empty() || |
| per_shard_interval_snapshots_.empty()); |
    // We may have whole-trace interval snapshots for instr count intervals in serial
    // mode, and for timestamp (microsecond) intervals in both serial and parallel mode.
| if (!whole_trace_interval_snapshots_.empty()) { |
| // Separate non-interval and interval outputs. |
| print_output_separator(); |
| std::cerr << "Printing whole-trace interval results:\n"; |
| for (int i = 0; i < num_tools_; ++i) { |
| // whole_trace_interval_snapshots_[i] may be empty if the corresponding tool |
| // did not produce any interval results. |
| if (!whole_trace_interval_snapshots_[i].empty() && |
| !tools_[i]->print_interval_results(whole_trace_interval_snapshots_[i])) { |
| error_string_ = tools_[i]->get_error_string(); |
| return false; |
| } |
| for (auto snapshot : whole_trace_interval_snapshots_[i]) { |
| if (!tools_[i]->release_interval_snapshot(snapshot)) { |
| error_string_ = tools_[i]->get_error_string(); |
| return false; |
| } |
| } |
| if (i + 1 < num_tools_) { |
| // Separate tool output. |
| print_output_separator(); |
| } |
| } |
| } else if (!per_shard_interval_snapshots_.empty()) { |
| // Separate non-interval and interval outputs. |
| print_output_separator(); |
| std::cerr << "Printing unmerged per-shard interval results:\n"; |
| for (auto &interval_snapshots : per_shard_interval_snapshots_) { |
| int tool_idx = interval_snapshots.first.tool_idx; |
| if (!interval_snapshots.second.empty() && |
| !tools_[tool_idx]->print_interval_results(interval_snapshots.second)) { |
| error_string_ = tools_[tool_idx]->get_error_string(); |
| return false; |
| } |
| for (auto snapshot : interval_snapshots.second) { |
| if (!tools_[tool_idx]->release_interval_snapshot(snapshot)) { |
| error_string_ = tools_[tool_idx]->get_error_string(); |
| return false; |
| } |
| } |
| print_output_separator(); |
| } |
| } |
| return true; |
| } |
| |
| template <typename RecordType, typename ReaderType> |
| bool |
| analyzer_tmpl_t<RecordType, ReaderType>::finalize_interval_snapshots( |
| analyzer_worker_data_t *worker, bool parallel, int shard_idx) |
| { |
| assert(parallel || |
| shard_idx == 0); // Only parallel mode supports a non-zero shard_idx. |
| for (int tool_idx = 0; tool_idx < num_tools_; ++tool_idx) { |
| if (!worker->shard_data[shard_idx] |
| .tool_data[tool_idx] |
| .interval_snapshot_data.empty() && |
| !tools_[tool_idx]->finalize_interval_snapshots(worker->shard_data[shard_idx] |
| .tool_data[tool_idx] |
| .interval_snapshot_data)) { |
| worker->error = tools_[tool_idx]->get_error_string(); |
| VPRINT(this, 1, |
| "Worker %d hit finalize_interval_snapshots error %s during %s " |
| "analysis in trace shard %s\n", |
| worker->index, worker->error.c_str(), parallel ? "parallel" : "serial", |
| worker->stream->get_stream_name().c_str()); |
| return false; |
| } |
| } |
| return true; |
| } |
| |
| template <typename RecordType, typename ReaderType> |
| bool |
| analyzer_tmpl_t<RecordType, ReaderType>::process_interval( |
| uint64_t interval_id, uint64_t interval_init_instr_count, |
| analyzer_worker_data_t *worker, bool parallel, bool at_instr_record, int shard_idx) |
| { |
| assert(parallel || |
| shard_idx == 0); // Only parallel mode supports a non-zero shard_idx. |
| for (int tool_idx = 0; tool_idx < num_tools_; ++tool_idx) { |
| typename analysis_tool_tmpl_t<RecordType>::interval_state_snapshot_t *snapshot; |
| if (parallel) { |
| snapshot = tools_[tool_idx]->generate_shard_interval_snapshot( |
| worker->shard_data[shard_idx].tool_data[tool_idx].shard_data, |
| interval_id); |
| } else { |
| snapshot = tools_[tool_idx]->generate_interval_snapshot(interval_id); |
| } |
| if (tools_[tool_idx]->get_error_string() != "") { |
| worker->error = tools_[tool_idx]->get_error_string(); |
| VPRINT(this, 1, |
| "Worker %d hit process_interval error %s during %s analysis in trace " |
| "shard %s at " |
| "interval %" PRId64 "\n", |
| worker->index, worker->error.c_str(), parallel ? "parallel" : "serial", |
| worker->stream->get_stream_name().c_str(), interval_id); |
| return false; |
| } |
| if (snapshot != nullptr) { |
| snapshot->shard_id_ = parallel |
| ? worker->shard_data[shard_idx].shard_id |
| : analysis_tool_tmpl_t< |
| RecordType>::interval_state_snapshot_t::WHOLE_TRACE_SHARD_ID; |
| snapshot->interval_id_ = interval_id; |
| if (interval_microseconds_ > 0) { |
| // For timestamp intervals, the interval_end_timestamp is the abstract |
| // non-inclusive end timestamp for the interval_id. This is to make it |
| // easier to line up the corresponding shard interval snapshots so that |
| // we can merge them to form the whole-trace interval snapshots. |
| snapshot->interval_end_timestamp_ = compute_interval_end_timestamp( |
| worker->stream->get_first_timestamp(), interval_id); |
| } else { |
| snapshot->interval_end_timestamp_ = worker->stream->get_last_timestamp(); |
| } |
| // instr_count_cumulative for the interval snapshot is supposed to be |
| // inclusive, so if the first record after the interval (that is, the record |
| // we're at right now) is an instr, it must be subtracted. |
| snapshot->instr_count_cumulative_ = |
| worker->stream->get_instruction_ordinal() - (at_instr_record ? 1 : 0); |
| snapshot->instr_count_delta_ = |
| snapshot->instr_count_cumulative_ - interval_init_instr_count; |
| worker->shard_data[shard_idx] |
| .tool_data[tool_idx] |
| .interval_snapshot_data.push_back(snapshot); |
| } |
| } |
| return true; |
| } |
| |
| template class analyzer_tmpl_t<memref_t, reader_t>; |
| template class analyzer_tmpl_t<trace_entry_t, dynamorio::drmemtrace::record_reader_t>; |
| |
| } // namespace drmemtrace |
| } // namespace dynamorio |