Add more coordination between the watchdog thread and the test thread.

This is to resolve the race between the watchdog failure reporting and test iterations.

PiperOrigin-RevId: 854343112
diff --git a/centipede/runner.cc b/centipede/runner.cc
index d47b2f2..fbf5a2b 100644
--- a/centipede/runner.cc
+++ b/centipede/runner.cc
@@ -48,6 +48,7 @@
 #include <vector>
 
 #include "absl/base/nullability.h"
+#include "absl/base/optimization.h"
 #include "./centipede/byte_array_mutator.h"
 #include "./centipede/dispatcher_flag_helper.h"
 #include "./centipede/execution_metadata.h"
@@ -99,6 +100,26 @@
   return tv.tv_sec * kUsecInSec + tv.tv_usec;
 }
 
+// Atomic flags to make sure that (a) watchdog failure is reported only for
+// the current input, and (b) only one thread is handling watchdog failures.
+
+// True if the watchdog thread is detecting failures, false otherwise.
+static std::atomic<bool> watchdog_thread_busy = false;
+// True if a watchdog failure is found, false otherwise.
+static std::atomic<bool> watchdog_failure_found = false;
+
+static void WaitWatchdogThreadIdle() {
+  while (ABSL_PREDICT_FALSE(watchdog_thread_busy.load())) {
+    if (ABSL_PREDICT_FALSE(watchdog_failure_found.load())) {
+      // A failure is found - wait for the process to terminate.
+      sleep(1);  // NOLINT
+    } else {
+      // Busy-wait for the detection.
+      sleep(0);  // NOLINT
+    }
+  }
+}
+
 static void CheckWatchdogLimits() {
   const uint64_t curr_time = time(nullptr);
   struct Resource {
@@ -142,11 +163,7 @@
   };
   for (const auto &resource : resources) {
     if (resource.limit != 0 && resource.value > resource.limit) {
-      // Allow only one invocation to handle a failure: needed because we call
-      // this function periodically in `WatchdogThread()`, but also call it in
-      // `RunOneInput()` after all the work is done.
-      static std::atomic<bool> already_handling_failure = false;
-      if (!already_handling_failure.exchange(true)) {
+      if (!watchdog_failure_found.exchange(true)) {
         if (resource.ignore_report) {
           fprintf(stderr,
                   "========= %s exceeded: %" PRIu64 " > %" PRIu64
@@ -192,7 +209,9 @@
     // No calls to ResetInputTimer() yet: input execution hasn't started.
     if (state->input_start_time == 0) continue;
 
+    watchdog_thread_busy = true;
     CheckWatchdogLimits();
+    watchdog_thread_busy = false;
   }
 }
 
@@ -376,6 +395,7 @@
   if (fuzztest::internal::state->input_start_time.exchange(0) != 0) {
     PostProcessSancov(target_return_value == -1);
   }
+  WaitWatchdogThreadIdle();
   state->stats.post_time_usec = UsecSinceLast();
   state->stats.peak_rss_mb = GetPeakRSSMb();
 }