/* **********************************************************
 * Copyright (c) 2023-2026 Google, Inc. All rights reserved.
 * **********************************************************/

/*
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * * Redistributions of source code must retain the above copyright notice,
 *   this list of conditions and the following disclaimer.
 *
 * * Redistributions in binary form must reproduce the above copyright notice,
 *   this list of conditions and the following disclaimer in the documentation
 *   and/or other materials provided with the distribution.
 *
 * * Neither the name of Google, Inc. nor the names of its contributors may be
 *   used to endorse or promote products derived from this software without
 *   specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL GOOGLE, INC. OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
 * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
 * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
 * DAMAGE.
 */

/* Unit tests for trace analysis APIs. */

#include "test_helpers.h"
#include <assert.h>

#include <atomic>
#include <chrono>
#include <iostream>
#include <memory>
#include <sstream>
#include <string>
#include <thread>
#include <utility>
#include <vector>

#include "analyzer.h"
#include "mock_reader.h"
#include "scheduler.h"
#ifdef HAS_ZIP
#    include "zipfile_istream.h"
#    include "zipfile_ostream.h"
#endif

namespace dynamorio {
namespace drmemtrace {

// An analyzer that takes in any number of scheduler inputs, plus optional direct
// scheduler options for SHARD_BY_CORE.
template <typename RecordType, typename ReaderType>
class mock_analyzer_tmpl_t : public analyzer_tmpl_t<RecordType, ReaderType> {
public:
    using analyzer_tmpl_t<RecordType, ReaderType>::num_tools_;
    using analyzer_tmpl_t<RecordType, ReaderType>::tools_;
    using analyzer_tmpl_t<RecordType, ReaderType>::parallel_;
    using analyzer_tmpl_t<RecordType, ReaderType>::verbosity_;
    using analyzer_tmpl_t<RecordType, ReaderType>::worker_count_;
    using analyzer_tmpl_t<RecordType, ReaderType>::shard_type_;
    using analyzer_tmpl_t<RecordType, ReaderType>::sched_mapping_;
    using analyzer_tmpl_t<RecordType, ReaderType>::sched_by_time_;
    using analyzer_tmpl_t<RecordType, ReaderType>::scheduler_;
    using analyzer_tmpl_t<RecordType, ReaderType>::success_;
    using analyzer_tmpl_t<RecordType, ReaderType>::worker_data_;
    using analyzer_tmpl_t<RecordType, ReaderType>::load_balance_;
    using analyzer_tmpl_t<RecordType, ReaderType>::load_balance_cadence_;
    using analyzer_tmpl_t<RecordType, ReaderType>::max_allowed_imbalance_;
    using typename analyzer_tmpl_t<RecordType, ReaderType>::analyzer_worker_data_t;
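    // Bypasses analyzer_tmpl_t's file-based initialization, building the scheduler
    // directly from the given inputs and any caller-supplied core-sharded
    // scheduler options.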
    mock_analyzer_tmpl_t(
        std::vector<typename scheduler_tmpl_t<RecordType, ReaderType>::input_workload_t>
            &sched_inputs,
        analysis_tool_tmpl_t<RecordType> **tools, int num_tools, bool parallel,
        int worker_count,
        typename scheduler_tmpl_t<RecordType, ReaderType>::scheduler_options_t
            *sched_ops_in)
        : analyzer_tmpl_t<RecordType, ReaderType>()
    {
        num_tools_ = num_tools;
        tools_ = tools;
        parallel_ = parallel;
        verbosity_ = 1;
        worker_count_ = worker_count;
        typename scheduler_tmpl_t<RecordType, ReaderType>::scheduler_options_t sched_ops;
        if (sched_ops_in != nullptr) {
            shard_type_ = SHARD_BY_CORE;
            sched_ops = std::move(*sched_ops_in);
            // XXX: We could refactor init_scheduler_common() to share a couple of
            // these lines.
            if (sched_ops.quantum_unit ==
                scheduler_tmpl_t<RecordType, ReaderType>::QUANTUM_TIME)
                sched_by_time_ = true;
        } else if (parallel) {
            sched_ops =
                scheduler_tmpl_t<RecordType, ReaderType>::make_scheduler_parallel_options(
                    verbosity_);
        } else {
            sched_ops =
                scheduler_tmpl_t<RecordType, ReaderType>::make_scheduler_serial_options(
                    verbosity_);
        }
        sched_mapping_ = sched_ops.mapping;
        if (scheduler_.init(sched_inputs, worker_count_, std::move(sched_ops)) !=
            scheduler_tmpl_t<RecordType, ReaderType>::STATUS_SUCCESS) {
            assert(false);
            success_ = false;
        }
        for (int i = 0; i < worker_count_; ++i) {
            worker_data_.push_back(analyzer_worker_data_t(i, scheduler_.get_stream(i)));
        }
    }
    void
    set_load_balance(bool on)
    {
        load_balance_ = on;
    }
    void
    set_load_balance_cadence(uint64_t value)
    {
        load_balance_cadence_ = value;
    }
    void
    set_max_allowed_imbalance(double value)
    {
        max_allowed_imbalance_ = value;
    }
};

template class mock_analyzer_tmpl_t<memref_t, reader_t>;
template class mock_analyzer_tmpl_t<trace_entry_t,
                                    dynamorio::drmemtrace::record_reader_t>;

typedef mock_analyzer_tmpl_t<memref_t, reader_t> mock_analyzer_t;

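// Verifies the per-shard stream query APIs (get_output_cpuid(), get_workload_id(),
// get_input_id()) under core-sharded parallel analysis.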
bool
test_queries()
{
    std::cerr << "\n----------------\nTesting queries\n";
    std::vector<trace_entry_t> input_sequence = {
        test_util::make_thread(/*tid=*/1),
        test_util::make_pid(/*pid=*/1),
        test_util::make_instr(/*pc=*/42),
        test_util::make_exit(/*tid=*/1),
    };
    static constexpr int NUM_INPUTS = 3;
    static constexpr int NUM_OUTPUTS = 2;
    static constexpr int BASE_TID = 100;
    std::vector<trace_entry_t> inputs[NUM_INPUTS];
    std::vector<scheduler_t::input_workload_t> sched_inputs;
    for (int i = 0; i < NUM_INPUTS; i++) {
        memref_tid_t tid = BASE_TID + i;
        inputs[i] = input_sequence;
        for (auto &record : inputs[i]) {
            if (record.type == TRACE_TYPE_THREAD || record.type == TRACE_TYPE_THREAD_EXIT)
                record.addr = static_cast<addr_t>(tid);
        }
        std::vector<scheduler_t::input_reader_t> readers;
        readers.emplace_back(
            std::unique_ptr<test_util::mock_reader_t>(
                new test_util::mock_reader_t(inputs[i])),
            std::unique_ptr<test_util::mock_reader_t>(new test_util::mock_reader_t()),
            tid);
        sched_inputs.emplace_back(std::move(readers));
    }
    scheduler_t::scheduler_options_t sched_ops(
        scheduler_t::MAP_TO_ANY_OUTPUT, scheduler_t::DEPENDENCY_IGNORE,
        scheduler_t::SCHEDULER_DEFAULTS, /*verbosity=*/3);

    class test_tool_t : public analysis_tool_t {
    public:
        bool
        process_memref(const memref_t &memref) override
        {
            assert(false); // Only expect parallel mode.
            return false;
        }
        bool
        print_results() override
        {
            return true;
        }
        bool
        parallel_shard_supported() override
        {
            return true;
        }
        void *
        parallel_shard_init_stream(int shard_index, void *worker_data,
                                   memtrace_stream_t *stream) override
        {
            auto per_shard = new per_shard_t;
            per_shard->index = shard_index;
            per_shard->stream = stream;
            return reinterpret_cast<void *>(per_shard);
        }
        bool
        parallel_shard_exit(void *shard_data) override
        {
            per_shard_t *shard = reinterpret_cast<per_shard_t *>(shard_data);
            delete shard;
            return true;
        }
        bool
        parallel_shard_memref(void *shard_data, const memref_t &memref) override
        {
            per_shard_t *shard = reinterpret_cast<per_shard_t *>(shard_data);
            if (memref.marker.type == TRACE_TYPE_MARKER &&
                (memref.marker.marker_type == TRACE_MARKER_TYPE_CORE_WAIT ||
                 memref.marker.marker_type == TRACE_MARKER_TYPE_CORE_IDLE))
                return true;
            // These queries are the focus of this test.
            // We have one thread for each of our NUM_INPUTS workloads.
            assert(shard->stream->get_output_cpuid() == shard->index);
            // We have just one thread per workload, so they're the same.
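            // With multiple workloads the scheduler embeds the workload ordinal
            // in the upper 32 bits of the tid, so mask it off to recover the
            // original tid.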
            memref_tid_t tid_only = memref.instr.tid & 0x00000000ffffffff;
            assert(shard->stream->get_workload_id() == tid_only - BASE_TID);
            assert(shard->stream->get_input_id() == tid_only - BASE_TID);
            return true;
        }

    private:
        struct per_shard_t {
            int index;
            memtrace_stream_t *stream;
        };
    };

    std::vector<analysis_tool_t *> tools;
    auto test_tool = std::unique_ptr<test_tool_t>(new test_tool_t);
    tools.push_back(test_tool.get());
    mock_analyzer_t analyzer(sched_inputs, &tools[0], (int)tools.size(),
                             /*parallel=*/true, NUM_OUTPUTS, &sched_ops);
    assert(!!analyzer);
    bool res = analyzer.run();
    assert(res);
    return true;
}

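// A tool that checks memtrace_stream_t::get_next_trace_pc(): the value queried
// after each record must equal the PC of the next PC-carrying record, must not
// change across non-PC records, and must be 0 once the stream is exhausted.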
template <typename RecordType>
class next_trace_pc_test_tool_t : public analysis_tool_tmpl_t<RecordType> {
public:
    bool
    process_memref(const RecordType &record) override
    {
        assert(false); // Only expect parallel mode.
        return false;
    }
    bool
    print_results() override
    {
        return true;
    }
    bool
    parallel_shard_supported() override
    {
        return true;
    }
    void *
    parallel_shard_init_stream(int shard_index, void *worker_data,
                               memtrace_stream_t *stream) override
    {
        auto per_shard = new per_shard_t;
        per_shard->stream = stream;
        per_shard->expected_next_trace_pc = stream->get_next_trace_pc();
        return reinterpret_cast<void *>(per_shard);
    }
    bool
    parallel_shard_exit(void *shard_data) override
    {
        per_shard_t *shard = reinterpret_cast<per_shard_t *>(shard_data);
        assert(shard->expected_next_trace_pc == 0);
        delete shard;
        return true;
    }
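    // Returns whether the record carries a PC, storing it in "pc" if so.
    // Specialized below for memref_t and trace_entry_t.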
    bool
    record_has_pc(RecordType record, uint64_t &pc);
    bool
    parallel_shard_memref(void *shard_data, const RecordType &record) override
    {
        per_shard_t *shard = reinterpret_cast<per_shard_t *>(shard_data);
        uint64_t pc;
        if (record_has_pc(record, pc)) {
            assert(pc == shard->expected_next_trace_pc);
            shard->expected_next_trace_pc = shard->stream->get_next_trace_pc();
        } else {
            assert(shard->expected_next_trace_pc == shard->stream->get_next_trace_pc());
        }
        return true;
    }

private:
    struct per_shard_t {
        // Just a non-zero init value so zero results do not sneak by us.
        uint64_t expected_next_trace_pc = static_cast<uint64_t>(-1);
        memtrace_stream_t *stream;
    };
};

template <>
bool
next_trace_pc_test_tool_t<memref_t>::record_has_pc(memref_t memref, uint64_t &pc)
{
    return memref_has_pc(memref, pc);
}
template <>
bool
next_trace_pc_test_tool_t<trace_entry_t>::record_has_pc(trace_entry_t entry, uint64_t &pc)
{
    return entry_has_pc(entry, pc);
}

template class next_trace_pc_test_tool_t<memref_t>;
template class next_trace_pc_test_tool_t<trace_entry_t>;

template <typename RecordType, typename ReaderType, typename MockReaderType>
bool
test_next_trace_pc_queries(std::string test_type)
{
    std::cerr << "\n----------------\nTesting next trace pc queries for " << test_type
              << "\n";
    std::vector<trace_entry_t> input_sequence = {
        test_util::make_thread(/*tid=*/1),
        test_util::make_pid(/*pid=*/1),
        test_util::make_timestamp(1),
        test_util::make_instr(/*pc=*/42),
        test_util::make_memref(/*addr=*/420),
        test_util::make_memref(/*addr=*/421),
        test_util::make_instr(/*pc=*/43),
        test_util::make_marker(TRACE_MARKER_TYPE_KERNEL_EVENT, 44),
        test_util::make_timestamp(2),
        test_util::make_instr(/*pc=*/101),
        test_util::make_exit(/*tid=*/1),
    };
    static constexpr int NUM_INPUTS = 3;
    static constexpr int NUM_OUTPUTS = 1;
    static constexpr int BASE_TID = 100;
    std::vector<trace_entry_t> inputs[NUM_INPUTS];
    std::vector<typename scheduler_tmpl_t<RecordType, ReaderType>::input_workload_t>
        sched_inputs;
    for (int i = 0; i < NUM_INPUTS; i++) {
        memref_tid_t tid = BASE_TID + i;
        inputs[i] = input_sequence;
        for (auto &record : inputs[i]) {
            if (record.type == TRACE_TYPE_THREAD || record.type == TRACE_TYPE_THREAD_EXIT)
                record.addr = static_cast<addr_t>(tid);
        }
        std::vector<typename scheduler_tmpl_t<RecordType, ReaderType>::input_reader_t>
            readers;
        readers.emplace_back(
            std::unique_ptr<MockReaderType>(new MockReaderType(inputs[i])),
            std::unique_ptr<MockReaderType>(new MockReaderType()), tid);
        sched_inputs.emplace_back(std::move(readers));
    }
    typename scheduler_tmpl_t<RecordType, ReaderType>::scheduler_options_t sched_ops =
        scheduler_tmpl_t<RecordType, ReaderType>::make_scheduler_parallel_options(
            /*verbosity=*/3);
    std::vector<analysis_tool_tmpl_t<RecordType> *> tools;
    auto test_tool = std::unique_ptr<next_trace_pc_test_tool_t<RecordType>>(
        new next_trace_pc_test_tool_t<RecordType>());
    tools.push_back(test_tool.get());
    mock_analyzer_tmpl_t<RecordType, ReaderType> analyzer(
        sched_inputs, &tools[0], (int)tools.size(),
        /*parallel=*/true, NUM_OUTPUTS, &sched_ops);
    assert(!!analyzer);
    bool res = analyzer.run();
    assert(res);
    return true;
}

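// Tests that replaying an as-traced cpu schedule injects synthetic
// TRACE_MARKER_TYPE_CORE_WAIT records while an output waits for its next input.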
bool
test_wait_records()
{
#ifdef HAS_ZIP
    std::cerr << "\n----------------\nTesting wait records\n";

    static constexpr int NUM_INPUTS = 5;
    static constexpr int NUM_OUTPUTS = 2;
    static constexpr int NUM_INSTRS = 9;
    static constexpr memref_tid_t TID_BASE = 100;
    static constexpr int CPU0 = 6;
    static constexpr int CPU1 = 9;
    std::vector<trace_entry_t> inputs[NUM_INPUTS];
    for (int i = 0; i < NUM_INPUTS; i++) {
        memref_tid_t tid = TID_BASE + i;
        inputs[i].push_back(test_util::make_thread(tid));
        inputs[i].push_back(test_util::make_pid(1));
        // The last input will be earlier than all others. It will execute
        // 3 instrs on each core. This is to test the case when an output
        // begins in the wait state.
        for (int j = 0; j < (i == NUM_INPUTS - 1 ? 6 : NUM_INSTRS); j++)
            inputs[i].push_back(test_util::make_instr(42 + j * 4));
        inputs[i].push_back(test_util::make_exit(tid));
    }

    // Synthesize a cpu-schedule file with some waits in it, if run in lockstep.
    // In pure lockstep it looks like this with a - for a wait and . for a
    // non-instruction record, to help understand the file entries below:
    //   core0: "EEE-AAA-CCCAAACCCBBB.DDD."
    //   core1: "---EEE.BBBDDDBBBDDDAAA.CCC."
    std::string cpu_fname = "tmp_test_wait_records.zip";
    {
        // Instr counts are 1-based, but the first entry lists 0 (it really
        // starts at 1).
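        // Each schedule_entry_t is (tid, timestamp, cpu, start_instruction).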
        std::vector<schedule_entry_t> sched0;
        sched0.emplace_back(TID_BASE + 4, 10, CPU0, 0);
        sched0.emplace_back(TID_BASE, 101, CPU0, 0);
        sched0.emplace_back(TID_BASE + 2, 103, CPU0, 0);
        sched0.emplace_back(TID_BASE, 105, CPU0, 4);
        sched0.emplace_back(TID_BASE + 2, 107, CPU0, 4);
        sched0.emplace_back(TID_BASE + 1, 109, CPU0, 7);
        sched0.emplace_back(TID_BASE + 3, 111, CPU0, 7);
        std::vector<schedule_entry_t> sched1;
        sched1.emplace_back(TID_BASE + 4, 20, CPU1, 4);
        sched1.emplace_back(TID_BASE + 1, 102, CPU1, 0);
        sched1.emplace_back(TID_BASE + 3, 104, CPU1, 0);
        sched1.emplace_back(TID_BASE + 1, 106, CPU1, 4);
        sched1.emplace_back(TID_BASE + 3, 108, CPU1, 4);
        sched1.emplace_back(TID_BASE, 110, CPU1, 7);
        sched1.emplace_back(TID_BASE + 2, 112, CPU1, 7);
        std::ostringstream cpu0_string;
        cpu0_string << CPU0;
        std::ostringstream cpu1_string;
        cpu1_string << CPU1;
        zipfile_ostream_t outfile(cpu_fname);
        std::string err = outfile.open_new_component(cpu0_string.str());
        assert(err.empty());
        if (!outfile.write(reinterpret_cast<char *>(sched0.data()),
                           sched0.size() * sizeof(sched0[0])))
            assert(false);
        err = outfile.open_new_component(cpu1_string.str());
        assert(err.empty());
        if (!outfile.write(reinterpret_cast<char *>(sched1.data()),
                           sched1.size() * sizeof(sched1[0])))
            assert(false);
    }

    // Replay the recorded schedule.
    std::vector<scheduler_t::input_workload_t> sched_inputs;
    for (int i = 0; i < NUM_INPUTS; i++) {
        memref_tid_t tid = TID_BASE + i;
        std::vector<scheduler_t::input_reader_t> readers;
        readers.emplace_back(
            std::unique_ptr<test_util::mock_reader_t>(
                new test_util::mock_reader_t(inputs[i])),
            std::unique_ptr<test_util::mock_reader_t>(new test_util::mock_reader_t()),
            tid);
        sched_inputs.emplace_back(std::move(readers));
    }
    scheduler_t::scheduler_options_t sched_ops(scheduler_t::MAP_TO_RECORDED_OUTPUT,
                                               scheduler_t::DEPENDENCY_TIMESTAMPS,
                                               scheduler_t::SCHEDULER_DEFAULTS,
                                               /*verbosity=*/1);
    zipfile_istream_t infile(cpu_fname);
    sched_ops.replay_as_traced_istream = &infile;

    class test_tool_t : public analysis_tool_t {
    public:
        // Caller must pre-allocate the vector with a slot per output stream.
        test_tool_t(std::vector<std::string> *schedule_strings)
            : schedule_strings_(schedule_strings)
        {
        }
        bool
        process_memref(const memref_t &memref) override
        {
            assert(false); // Only expect parallel mode.
            return false;
        }
        bool
        print_results() override
        {
            return true;
        }
        bool
        parallel_shard_supported() override
        {
            return true;
        }
        void *
        parallel_shard_init_stream(int shard_index, void *worker_data,
                                   memtrace_stream_t *stream) override
        {
            auto per_shard = new per_shard_t;
            per_shard->index = shard_index;
            per_shard->stream = stream;
            return reinterpret_cast<void *>(per_shard);
        }
        bool
        parallel_shard_exit(void *shard_data) override
        {
            per_shard_t *shard = reinterpret_cast<per_shard_t *>(shard_data);
            (*schedule_strings_)[shard->index] = shard->schedule;
            delete shard;
            return true;
        }
        bool
        parallel_shard_memref(void *shard_data, const memref_t &memref) override
        {
            per_shard_t *shard = reinterpret_cast<per_shard_t *>(shard_data);
            // We run in *rough* lockstep to avoid a flaky test: we just need to
            // avoid the 2nd output making it through several initial records
            // before the 1st output runs and sees a STATUS_WAIT.
            static constexpr int MAX_WAITS = 100000;
            int waits = 0;
            while (global_records_ < 3 * shard->records / 2) {
                std::this_thread::yield();
                // Avoid a hang. It shouldn't happen with these inputs though.
                if (++waits > MAX_WAITS)
                    break;
            }
            ++shard->records;
            ++global_records_;
            if (memref.marker.type == TRACE_TYPE_MARKER &&
                memref.marker.marker_type == TRACE_MARKER_TYPE_CORE_WAIT) {
                shard->schedule += '-';
                return true;
            }
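            // Record which input ran as a letter matching the lockstep diagram
            // above.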
            int64_t input = shard->stream->get_input_id();
            shard->schedule += 'A' + static_cast<char>(input % 26);
            return true;
        }

    private:
        struct per_shard_t {
            int index;
            memtrace_stream_t *stream;
            std::string schedule;
            int64_t records = 0;
        };
        std::atomic<int64_t> global_records_ { 0 };
        std::vector<std::string> *schedule_strings_;
    };

    std::vector<std::string> schedule_strings(NUM_OUTPUTS);
    std::vector<analysis_tool_t *> tools;
    auto test_tool = std::unique_ptr<test_tool_t>(new test_tool_t(&schedule_strings));
    tools.push_back(test_tool.get());
    mock_analyzer_t analyzer(sched_inputs, &tools[0], (int)tools.size(),
                             /*parallel=*/true, NUM_OUTPUTS, &sched_ops);
    assert(!!analyzer);
    bool res = analyzer.run();
    assert(res);
    for (const auto &sched : schedule_strings) {
        std::cerr << "Schedule: " << sched << "\n";
    }
    // Due to non-determinism we can't impose too many restrictions here, so we
    // just ensure we saw at least one wait at the start.
    assert(schedule_strings[1][0] == '-');
#endif
    return true;
}

bool
test_tool_errors()
{
    // Tool errors can hang the analyzer if it doesn't tell the scheduler
    // it's giving up on its input. We test that here.
    std::cerr << "\n----------------\nTesting tool errors\n";

    static constexpr int NUM_INPUTS = 5;
    static constexpr int NUM_OUTPUTS = 2;
    static constexpr int NUM_INSTRS = 9;
    static constexpr memref_tid_t TID_BASE = 100;
    std::vector<trace_entry_t> inputs[NUM_INPUTS];
    for (int i = 0; i < NUM_INPUTS; i++) {
        memref_tid_t tid = TID_BASE + i;
        inputs[i].push_back(test_util::make_thread(tid));
        inputs[i].push_back(test_util::make_pid(1));
        for (int j = 0; j < NUM_INSTRS; j++)
            inputs[i].push_back(test_util::make_instr(42 + j * 4));
        if (i == 4) {
            // This one input will trigger an error in our error_tool_t.
            inputs[i].push_back(test_util::make_marker(TRACE_MARKER_TYPE_CPU_ID, 4));
        }
        inputs[i].push_back(test_util::make_exit(tid));
    }

    std::vector<scheduler_t::input_workload_t> sched_inputs;
    for (int i = 0; i < NUM_INPUTS; i++) {
        memref_tid_t tid = TID_BASE + i;
        std::vector<scheduler_t::input_reader_t> readers;
        readers.emplace_back(
            std::unique_ptr<test_util::mock_reader_t>(
                new test_util::mock_reader_t(inputs[i])),
            std::unique_ptr<test_util::mock_reader_t>(new test_util::mock_reader_t()),
            tid);
        sched_inputs.emplace_back(std::move(readers));
    }
    scheduler_t::scheduler_options_t sched_ops(scheduler_t::MAP_TO_ANY_OUTPUT,
                                               scheduler_t::DEPENDENCY_IGNORE,
                                               scheduler_t::SCHEDULER_DEFAULTS,
                                               /*verbosity=*/1);

    static const char *const TOOL_ERROR_STRING = "cpuid not supported";

    class error_tool_t : public analysis_tool_t {
    public:
        bool
        process_memref(const memref_t &memref) override
        {
            assert(false); // Only expect parallel mode.
            return false;
        }
        bool
        print_results() override
        {
            return true;
        }
        bool
        parallel_shard_supported() override
        {
            return true;
        }
        void *
        parallel_shard_init_stream(int shard_index, void *worker_data,
                                   memtrace_stream_t *stream) override
        {
            auto per_shard = new per_shard_t;
            return reinterpret_cast<void *>(per_shard);
        }
        bool
        parallel_shard_exit(void *shard_data) override
        {
            per_shard_t *shard = reinterpret_cast<per_shard_t *>(shard_data);
            delete shard;
            return true;
        }
        std::string
        parallel_shard_error(void *shard_data) override
        {
            per_shard_t *shard = reinterpret_cast<per_shard_t *>(shard_data);
            return shard->error;
        }
        bool
        parallel_shard_memref(void *shard_data, const memref_t &memref) override
        {
            per_shard_t *shard = reinterpret_cast<per_shard_t *>(shard_data);
            // Return an error in one of the inputs.
            if (memref.marker.type == TRACE_TYPE_MARKER &&
                memref.marker.marker_type == TRACE_MARKER_TYPE_CPU_ID) {
                shard->error = TOOL_ERROR_STRING;
                return false;
            }
            return true;
        }

    private:
        struct per_shard_t {
            std::string error;
        };
    };

    std::vector<analysis_tool_t *> tools;
    auto test_tool = std::unique_ptr<error_tool_t>(new error_tool_t);
    tools.push_back(test_tool.get());
    mock_analyzer_t analyzer(sched_inputs, &tools[0], (int)tools.size(),
                             /*parallel=*/true, NUM_OUTPUTS, &sched_ops);
    assert(!!analyzer);
    // If the analyzer doesn't give up the input in the output stream that
    // encounters it, the scheduler will hang waiting for that input,
    // so failure in this test would be a CTest timeout.
    bool res = analyzer.run();
    assert(!res);
    assert(analyzer.get_error_string() == TOOL_ERROR_STRING);
    return true;
}

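// Tests dynamic load balancing: worker 0 is artificially slowed, and with
// rebalancing enabled the other workers should steal inputs from it, keeping
// every worker's instruction count within MAX_RATIO of worker 0's.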
bool
test_load_balance()
{
    std::cerr << "\n----------------\nTesting load balancing\n";

    // We want inputs with tens of thousands of instructions, so we
    // synthesize them on the fly instead of requiring arrays.
    class many_instr_reader_t : public reader_t {
    public:
        many_instr_reader_t() = default;
        many_instr_reader_t(memref_tid_t tid, uint64_t instr_count)
            : reader_t(/*online=*/false, /*verbosity=*/3, "many_instr_reader_t")
            , instr_count_(instr_count)
            , tid_(tid)
        {
        }
        bool
        init() override
        {
            at_eof_ = false;
            ++*this;
            return true;
        }
        trace_entry_t *
        read_next_entry() override
        {
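            // Record layout by index: 0 = thread header, 1 = pid,
            // [2 .. instr_count_ + 1] = instrs, instr_count_ + 2 = thread exit.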
            ++index_;
            if (index_ == 0) {
                entry_ = test_util::make_thread(tid_);
            } else if (index_ == 1) {
                entry_ = test_util::make_pid(/*pid=*/1);
            } else if (index_ < instr_count_ + META_ENTRIES - 1) {
                entry_ = test_util::make_instr(/*pc=*/index_);
            } else if (index_ == instr_count_ + META_ENTRIES - 1) {
                entry_ = test_util::make_exit(tid_);
            } else {
                at_eof_ = true;
                return nullptr;
            }
            return &entry_;
        }
        std::string
        get_stream_name() const override
        {
            return "";
        }

    private:
        uint64_t instr_count_ = 0;
        int index_ = -1;
        trace_entry_t entry_;
        memref_tid_t tid_;
        // We have tid, pid, and exit entries.
        const int META_ENTRIES = 3;
    };

    // We want many inputs so other workers can steal them.
    static constexpr int NUM_INPUTS = 512;
    // We have 4 workers: one slow and 3 fast.
    static constexpr int NUM_OUTPUTS = 4;
    // We need enough total instructions to hit the load balance cadence
    // of 100K.
    static constexpr int NUM_INSTRS = 5000;
    static constexpr int BASE_TID = 100;
    std::vector<trace_entry_t> inputs[NUM_INPUTS];
    std::vector<scheduler_t::input_workload_t> sched_inputs;
    for (int i = 0; i < NUM_INPUTS; i++) {
        memref_tid_t tid = BASE_TID + i;
        std::vector<scheduler_t::input_reader_t> readers;
        readers.emplace_back(
            std::unique_ptr<many_instr_reader_t>(
                new many_instr_reader_t(tid, NUM_INSTRS)),
            std::unique_ptr<many_instr_reader_t>(new many_instr_reader_t()), tid);
        sched_inputs.emplace_back(std::move(readers));
    }
    scheduler_t::scheduler_options_t sched_ops(
        scheduler_t::MAP_TO_ANY_OUTPUT, scheduler_t::DEPENDENCY_IGNORE,
        scheduler_t::SCHEDULER_DEFAULTS, /*verbosity=*/1);
    // Make it easy to migrate so the fast workers will steal from the slow.
    sched_ops.time_units_per_us = 1.;
    sched_ops.migration_threshold_us = 1;
    // Exit early before the slow worker can finish its current input.
    sched_ops.exit_if_fraction_inputs_left = 0.3;

    constexpr double MAX_RATIO = 2.5;

    class test_tool_t : public analysis_tool_t {
    public:
        bool
        process_memref(const memref_t &memref) override
        {
            assert(false); // Only expect parallel mode.
            return false;
        }
        bool
        print_results() override
        {
            for (int i = 0; i < NUM_OUTPUTS; ++i) {
                std::cerr << "shard " << i << " saw " << instr_count_[i]
                          << " instructions\n";
                if (i > 0) {
                    assert(static_cast<double>(instr_count_[0]) * MAX_RATIO >=
                           instr_count_[i]);
                }
            }
            return true;
        }
        bool
        parallel_shard_supported() override
        {
            return true;
        }
        void *
        parallel_shard_init_stream(int shard_index, void *worker_data,
                                   memtrace_stream_t *stream) override
        {
            auto per_shard = new per_shard_t;
            per_shard->index = shard_index;
            per_shard->stream = stream;
            return reinterpret_cast<void *>(per_shard);
        }
        bool
        parallel_shard_exit(void *shard_data) override
        {
            per_shard_t *shard = reinterpret_cast<per_shard_t *>(shard_data);
            instr_count_[shard->index] = shard->instr_count;
            delete shard;
            return true;
        }
        bool
        parallel_shard_memref(void *shard_data, const memref_t &memref) override
        {
            per_shard_t *shard = reinterpret_cast<per_shard_t *>(shard_data);
            if (type_is_instr(memref.instr.type)) {
                ++shard->instr_count;
                // Here is where we make worker#0 slow, by sleeping frequently.
                // Increasing the frequency here makes it more uneven but makes
                // the load balancing take longer to catch up and we want
                // to keep the test time down.
                if (shard->index == 0 && shard->instr_count % 50 == 0) {
                    std::this_thread::sleep_for(std::chrono::milliseconds(1));
                }
            }
            return true;
        }

    private:
        struct per_shard_t {
            int index;
            memtrace_stream_t *stream;
            uint64_t instr_count = 0;
        };
        uint64_t instr_count_[NUM_OUTPUTS] = {};
    };

    std::vector<analysis_tool_t *> tools;
    // This test_tool_t contains our assert that everything is balanced.
    auto test_tool = std::unique_ptr<test_tool_t>(new test_tool_t);
    tools.push_back(test_tool.get());
    mock_analyzer_t analyzer(sched_inputs, &tools[0], (int)tools.size(),
                             /*parallel=*/true, NUM_OUTPUTS, &sched_ops);
    assert(!!analyzer);
    analyzer.set_load_balance(true);
    // Our test is scaled down to make it fast enough for automation.
    analyzer.set_load_balance_cadence(NUM_INSTRS / 5);
    analyzer.set_max_allowed_imbalance(MAX_RATIO);
    bool res = analyzer.run();
    // It would be nice to assert that there was some waiting by having the
    // analyzer give us the worker imbalance_wait_count, but this likely
    // would become a flaky test, being so dependent on timing.
    // For now we live with manually observing some waiting.
    assert(res);
    analyzer.print_stats();
    return true;
}

int
test_main(int argc, const char *argv[])
{
    if (!test_queries() || !test_wait_records() || !test_tool_errors() ||
        !test_next_trace_pc_queries<memref_t, reader_t, test_util::mock_reader_t>(
            "memref_t") ||
        !test_next_trace_pc_queries<trace_entry_t, record_reader_t,
                                    test_util::mock_record_reader_t>("trace_entry_t") ||
        !test_load_balance())
        return 1;
    std::cerr << "All done!\n";
    return 0;
}

} // namespace drmemtrace
} // namespace dynamorio