i#6897 sched file: Add schedule file output to record_filter (#6902)

Leverages the new schedule file library in the drmemtrace record_filter
to produce schedule files for derived traces, which were previously
missing.

Adds schedule file checks to the record filter unit tests. The existing
end-to-end tests run invariant_checker, which now validates schedule file
contents since those files are created.

Adds a previously missing check to record_filter_launcher for print_stats() failure.

Fixes #6897
diff --git a/clients/drcachesim/CMakeLists.txt b/clients/drcachesim/CMakeLists.txt
index 13658e9..d81f7ba 100644
--- a/clients/drcachesim/CMakeLists.txt
+++ b/clients/drcachesim/CMakeLists.txt
@@ -201,7 +201,8 @@
   tools/filter/encodings2regdeps_filter.h
   tools/filter/func_id_filter.h
   tools/filter/null_filter.h)
-target_link_libraries(drmemtrace_record_filter drmemtrace_simulator)
+target_link_libraries(drmemtrace_record_filter drmemtrace_simulator
+  drmemtrace_schedule_file)
 configure_DynamoRIO_standalone(drmemtrace_record_filter)
 
 add_exported_library(directory_iterator STATIC common/directory_iterator.cpp)
diff --git a/clients/drcachesim/common/zipfile_ostream.h b/clients/drcachesim/common/zipfile_ostream.h
index 814d00c..fd1a56e 100644
--- a/clients/drcachesim/common/zipfile_ostream.h
+++ b/clients/drcachesim/common/zipfile_ostream.h
@@ -1,5 +1,5 @@
 /* **********************************************************
- * Copyright (c) 2022 Google, Inc.  All rights reserved.
+ * Copyright (c) 2022-2024 Google, Inc.  All rights reserved.
  * **********************************************************/
 
 /*
@@ -116,7 +116,7 @@
         // so it's not 1980 in the file.
         if (zipOpenNewFileInZip(zip_, name.c_str(), nullptr, nullptr, 0, nullptr, 0,
                                 nullptr, Z_DEFLATED, Z_DEFAULT_COMPRESSION) != ZIP_OK) {
-            return "Failed to add new component to zipfile";
+            return "Failed to add new component " + name + " to zipfile";
         }
         return "";
     }
diff --git a/clients/drcachesim/tests/record_filter_unit_tests.cpp b/clients/drcachesim/tests/record_filter_unit_tests.cpp
index 9b09e43..fd6d6cc 100644
--- a/clients/drcachesim/tests/record_filter_unit_tests.cpp
+++ b/clients/drcachesim/tests/record_filter_unit_tests.cpp
@@ -1331,31 +1331,59 @@
     if (!local_create_dir(output_dir.c_str())) {
         FATAL_ERROR("Failed to create filtered trace output dir %s", output_dir.c_str());
     }
-    auto null_filter =
-        std::unique_ptr<record_filter_func_t>(new dynamorio::drmemtrace::null_filter_t());
-    std::vector<std::unique_ptr<record_filter_func_t>> filter_funcs;
-    filter_funcs.push_back(std::move(null_filter));
-    // We use a very small stop_timestamp for the record filter. This is to verify that
-    // we emit the TRACE_MARKER_TYPE_FILTER_ENDPOINT marker for each thread even if it
-    // starts after the given stop_timestamp. Since the stop_timestamp is so small, all
-    // other entries are expected to stay.
-    static constexpr uint64_t stop_timestamp_us = 1;
-    auto record_filter = std::unique_ptr<dynamorio::drmemtrace::record_filter_t>(
-        new dynamorio::drmemtrace::record_filter_t(output_dir, std::move(filter_funcs),
-                                                   stop_timestamp_us,
-                                                   /*verbosity=*/0));
-    std::vector<record_analysis_tool_t *> tools;
-    tools.push_back(record_filter.get());
-    record_analyzer_t record_analyzer(op_trace_dir.get_value(), &tools[0],
-                                      static_cast<int>(tools.size()));
-    if (!record_analyzer) {
-        FATAL_ERROR("Failed to initialize record filter: %s",
-                    record_analyzer.get_error_string().c_str());
+    {
+        // New scope so the record_filter_t destructor flushes schedule files.
+        auto null_filter = std::unique_ptr<record_filter_func_t>(
+            new dynamorio::drmemtrace::null_filter_t());
+        std::vector<std::unique_ptr<record_filter_func_t>> filter_funcs;
+        filter_funcs.push_back(std::move(null_filter));
+        // We use a very small stop_timestamp for the record filter. This is to verify
+        // that we emit the TRACE_MARKER_TYPE_FILTER_ENDPOINT marker for each thread even
+        // if it starts after the given stop_timestamp. Since the stop_timestamp is so
+        // small, all other entries are expected to stay.
+        static constexpr uint64_t stop_timestamp_us = 1;
+        auto record_filter = std::unique_ptr<dynamorio::drmemtrace::record_filter_t>(
+            new dynamorio::drmemtrace::record_filter_t(
+                output_dir, std::move(filter_funcs), stop_timestamp_us,
+                /*verbosity=*/0));
+        std::vector<record_analysis_tool_t *> tools;
+        tools.push_back(record_filter.get());
+        record_analyzer_t record_analyzer(op_trace_dir.get_value(), &tools[0],
+                                          static_cast<int>(tools.size()));
+        if (!record_analyzer) {
+            FATAL_ERROR("Failed to initialize record filter: %s",
+                        record_analyzer.get_error_string().c_str());
+        }
+        if (!record_analyzer.run()) {
+            FATAL_ERROR("Failed to run record filter: %s",
+                        record_analyzer.get_error_string().c_str());
+        }
+        if (!record_analyzer.print_stats()) {
+            FATAL_ERROR("Failed to print record filter stats: %s",
+                        record_analyzer.get_error_string().c_str());
+        }
     }
-    if (!record_analyzer.run()) {
-        FATAL_ERROR("Failed to run record filter: %s",
-                    record_analyzer.get_error_string().c_str());
-    }
+
+    // Ensure schedule files were written out.  We leave validating their contents
+    // to the end-to-end tests which run invariant_checker.
+    std::string serial_path = output_dir + DIRSEP + DRMEMTRACE_SERIAL_SCHEDULE_FILENAME;
+#ifdef HAS_ZLIB
+    serial_path += ".gz";
+#endif
+    CHECK(dr_file_exists(serial_path.c_str()), "Serial schedule file missing\n");
+    file_t fd = dr_open_file(serial_path.c_str(), DR_FILE_READ);
+    CHECK(fd != INVALID_FILE, "Cannot open serial schedule file");
+    uint64 file_size;
+    CHECK(dr_file_size(fd, &file_size) && file_size > 0, "Serial schedule file empty");
+    dr_close_file(fd);
+#ifdef HAS_ZIP
+    std::string cpu_path = output_dir + DIRSEP + DRMEMTRACE_CPU_SCHEDULE_FILENAME;
+    CHECK(dr_file_exists(cpu_path.c_str()), "Cpu schedule file missing\n");
+    fd = dr_open_file(cpu_path.c_str(), DR_FILE_READ);
+    CHECK(fd != INVALID_FILE, "Cannot open cpu schedule file");
+    CHECK(dr_file_size(fd, &file_size) && file_size > 0, "Cpu schedule file empty");
+    dr_close_file(fd);
+#endif
 
     basic_counts_t::counters_t c1 = get_basic_counts(op_trace_dir.get_value());
     // We expect one extra marker (TRACE_MARKER_TYPE_FILTER_ENDPOINT) for each thread.
@@ -1419,11 +1447,13 @@
         FATAL_ERROR("Usage error: %s\nUsage:\n%s", parse_err.c_str(),
                     droption_parser_t::usage_short(DROPTION_SCOPE_ALL).c_str());
     }
+    dr_standalone_init();
     if (!test_cache_and_type_filter() || !test_chunk_update() || !test_trim_filter() ||
         !test_null_filter() || !test_wait_filter() || !test_encodings2regdeps_filter() ||
         !test_func_id_filter())
         return 1;
     fprintf(stderr, "All done!\n");
+    dr_standalone_exit();
     return 0;
 }
 
diff --git a/clients/drcachesim/tools/filter/record_filter.cpp b/clients/drcachesim/tools/filter/record_filter.cpp
index 4717b06..b08c21c 100644
--- a/clients/drcachesim/tools/filter/record_filter.cpp
+++ b/clients/drcachesim/tools/filter/record_filter.cpp
@@ -569,6 +569,15 @@
             if (per_shard->archive_writer &&
                 per_shard->input_entry_count - per_shard->input_count_at_ordinal == 2)
                 output = false;
+            if (output) {
+                uint64_t instr_ord = per_shard->cur_chunk_instrs +
+                    // For archives we increment chunk_ordinal up front.
+                    (per_shard->archive_writer ? per_shard->chunk_ordinal - 1
+                                               : per_shard->chunk_ordinal) *
+                        per_shard->chunk_size;
+                per_shard->sched_info.record_cpu_id(per_shard->tid, entry.addr,
+                                                    per_shard->last_timestamp, instr_ord);
+            }
             break;
         case TRACE_MARKER_TYPE_PHYSICAL_ADDRESS:
         case TRACE_MARKER_TYPE_PHYSICAL_ADDRESS_NOT_AVAILABLE:
@@ -861,6 +870,88 @@
     return false;
 }
 
+std::string
+record_filter_t::open_serial_schedule_file()
+{
+    if (serial_schedule_ostream_ != nullptr)
+        return "Already opened";
+    if (output_dir_.empty())
+        return "No output directory specified";
+    std::string path = output_dir_ + DIRSEP + DRMEMTRACE_SERIAL_SCHEDULE_FILENAME;
+#ifdef HAS_ZLIB
+    path += ".gz";
+    serial_schedule_file_ = std::unique_ptr<std::ostream>(new gzip_ostream_t(path));
+#else
+    serial_schedule_file_ =
+        std::unique_ptr<std::ostream>(new std::ofstream(path, std::ofstream::binary));
+#endif
+    if (!serial_schedule_file_)
+        return "Failed to open serial schedule file " + path;
+    serial_schedule_ostream_ = serial_schedule_file_.get();
+    return "";
+}
+
+std::string
+record_filter_t::open_cpu_schedule_file()
+{
+    if (cpu_schedule_ostream_ != nullptr)
+        return "Already opened";
+    if (output_dir_.empty())
+        return "No output directory specified";
+    std::string path = output_dir_ + DIRSEP + DRMEMTRACE_CPU_SCHEDULE_FILENAME;
+#ifdef HAS_ZIP
+    cpu_schedule_file_ = std::unique_ptr<archive_ostream_t>(new zipfile_ostream_t(path));
+    if (!cpu_schedule_file_)
+        return "Failed to open cpu schedule file " + path;
+    cpu_schedule_ostream_ = cpu_schedule_file_.get();
+    return "";
+#else
+    return "Zipfile support is required for cpu schedule files";
+#endif
+}
+
+std::string
+record_filter_t::write_schedule_files()
+{
+
+    schedule_file_t sched;
+    std::string err;
+    err = open_serial_schedule_file();
+    if (!err.empty())
+        return err;
+    err = open_cpu_schedule_file();
+    if (!err.empty()) {
+#ifdef HAS_ZIP
+        return err;
+#else
+        if (starts_with(err, "Zipfile support")) {
+            // Just skip the cpu file.
+        } else {
+            return err;
+        }
+#endif
+    }
+    for (const auto &shard : shard_map_) {
+        err = sched.merge_shard_data(shard.second->sched_info);
+        if (!err.empty())
+            return err;
+    }
+    if (serial_schedule_ostream_ == nullptr)
+        return "Serial file not opened";
+    err = sched.write_serial_file(serial_schedule_ostream_);
+    if (!err.empty())
+        return err;
+    // Make the cpu file optional for !HAS_ZIP, but don't wrap this inside
+    // HAS_ZIP as some subclasses have non-minizip zip support and don't have
+    // that define.
+    if (cpu_schedule_ostream_ != nullptr) {
+        err = sched.write_cpu_file(cpu_schedule_ostream_);
+        if (!err.empty())
+            return err;
+    }
+    return "";
+}
+
 bool
 record_filter_t::print_results()
 {
@@ -878,6 +969,13 @@
     }
     std::cerr << "Output " << output_entry_count << " entries from " << input_entry_count
               << " entries.\n";
+    if (output_dir_.empty()) {
+        std::cerr << "Not writing schedule files: no output directory was specified.\n";
+        return res;
+    }
+    error_string_ = write_schedule_files();
+    if (!error_string_.empty())
+        res = false;
     return res;
 }
 
diff --git a/clients/drcachesim/tools/filter/record_filter.h b/clients/drcachesim/tools/filter/record_filter.h
index ff17ff6..216b6b8 100644
--- a/clients/drcachesim/tools/filter/record_filter.h
+++ b/clients/drcachesim/tools/filter/record_filter.h
@@ -48,6 +48,7 @@
 #include "memref.h"
 #include "memtrace_stream.h"
 #include "raw2trace_shared.h"
+#include "schedule_file.h"
 #include "trace_entry.h"
 
 namespace dynamorio {
@@ -179,6 +180,12 @@
     std::string
     parallel_shard_error(void *shard_data) override;
 
+    // Automatically called from print_results().
+    // Calls open_serial_schedule_file() and open_cpu_schedule_file() and then
+    // writes out the file contents.
+    std::string
+    write_schedule_files();
+
 protected:
     struct dcontext_cleanup_last_t {
     public:
@@ -237,6 +244,7 @@
         // Cached value updated on context switches.
         per_input_t *per_input = nullptr;
         record_filter_info_t record_filter_info;
+        schedule_file_t::per_shard_t sched_info;
     };
 
     virtual std::string
@@ -279,6 +287,10 @@
     std::string output_ext_;
     uint64_t version_ = 0;
     uint64_t filetype_ = 0;
+    std::unique_ptr<std::ostream> serial_schedule_file_;
+    std::ostream *serial_schedule_ostream_ = nullptr;
+    std::unique_ptr<archive_ostream_t> cpu_schedule_file_;
+    archive_ostream_t *cpu_schedule_ostream_ = nullptr;
 
 private:
     virtual bool
@@ -293,6 +305,14 @@
     virtual std::string
     initialize_shard_output(per_shard_t *per_shard, memtrace_stream_t *shard_stream);
 
+    // Sets serial_schedule_ostream_, optionally using serial_schedule_file_.
+    virtual std::string
+    open_serial_schedule_file();
+
+    // Sets cpu_schedule_ostream_, optionally using cpu_schedule_file_.
+    virtual std::string
+    open_cpu_schedule_file();
+
     bool
     write_trace_entries(per_shard_t *shard, const std::vector<trace_entry_t> &entries);
 
diff --git a/clients/drcachesim/tools/record_filter_launcher.cpp b/clients/drcachesim/tools/record_filter_launcher.cpp
index 9d6fe3e..e1a8bb7 100644
--- a/clients/drcachesim/tools/record_filter_launcher.cpp
+++ b/clients/drcachesim/tools/record_filter_launcher.cpp
@@ -183,7 +183,10 @@
         FATAL_ERROR("Failed to run trace filter: %s",
                     record_analyzer.get_error_string().c_str());
     }
-    record_analyzer.print_stats();
+    if (!record_analyzer.print_stats()) {
+        FATAL_ERROR("Failed to print stats: %s",
+                    record_analyzer.get_error_string().c_str());
+    }
 
     fprintf(stderr, "Done!\n");
     return 0;