Internal change

PiperOrigin-RevId: 369753540
diff --git a/cpp/core/internal/base_pcp_handler.cc b/cpp/core/internal/base_pcp_handler.cc
index ae2dbc9..0fd75f8 100644
--- a/cpp/core/internal/base_pcp_handler.cc
+++ b/cpp/core/internal/base_pcp_handler.cc
@@ -82,34 +82,37 @@
   NEARBY_LOG(INFO, "StartAdvertising with supported mediums: %s",
              GetStringValueOfSupportedMediums(options).c_str());
   ConnectionOptions advertising_options = options.CompatibleOptions();
-  RunOnPcpHandlerThread([this, client, &service_id, &info, &advertising_options,
-                         &response]() RUN_ON_PCP_HANDLER_THREAD() {
-    // The endpoint id inside of the advertisement is different to high
-    // visibility and low visibility mode. In order to decide if client should
-    // grab the high visibility or low visibility id, it needs to tell client
-    // which one right now, before client#StartedAdvertising.
-    if (ShouldEnterHighVisibilityMode(advertising_options)) {
-      client->EnterHighVisibilityMode();
-    }
+  RunOnPcpHandlerThread(
+      "start-advertising",
+      [this, client, &service_id, &info, &advertising_options, &response]()
+          RUN_ON_PCP_HANDLER_THREAD() {
+            // The endpoint id inside of the advertisement is different to high
+            // visibility and low visibility mode. In order to decide if client
+            // should grab the high visibility or low visibility id, it needs to
+            // tell client which one right now, before
+            // client#StartedAdvertising.
+            if (ShouldEnterHighVisibilityMode(advertising_options)) {
+              client->EnterHighVisibilityMode();
+            }
 
-    auto result =
-        StartAdvertisingImpl(client, service_id, client->GetLocalEndpointId(),
-                             info.endpoint_info, advertising_options);
-    if (!result.status.Ok()) {
-      client->ExitHighVisibilityMode();
-      response.Set(result.status);
-      return;
-    }
+            auto result = StartAdvertisingImpl(
+                client, service_id, client->GetLocalEndpointId(),
+                info.endpoint_info, advertising_options);
+            if (!result.status.Ok()) {
+              client->ExitHighVisibilityMode();
+              response.Set(result.status);
+              return;
+            }
 
-    // Now that we've succeeded, mark the client as advertising.
-    // Save the advertising options for local reference in later process like
-    // upgrading bandwidth.
-    advertising_listener_ = info.listener;
-    client->StartedAdvertising(service_id, GetStrategy(), info.listener,
-                               absl::MakeSpan(result.mediums),
-                               advertising_options);
-    response.Set({Status::kSuccess});
-  });
+            // Now that we've succeeded, mark the client as advertising.
+            // Save the advertising options for local reference in later process
+            // like upgrading bandwidth.
+            advertising_listener_ = info.listener;
+            client->StartedAdvertising(service_id, GetStrategy(), info.listener,
+                                       absl::MakeSpan(result.mediums),
+                                       advertising_options);
+            response.Set({Status::kSuccess});
+          });
   return WaitForResult(absl::StrCat("StartAdvertising(", service_id, ")"),
                        client->GetClientId(), &response);
 }
@@ -117,11 +120,12 @@
 void BasePcpHandler::StopAdvertising(ClientProxy* client) {
   NEARBY_LOGS(INFO) << "StopAdvertising id=" << client->GetLocalEndpointId();
   CountDownLatch latch(1);
-  RunOnPcpHandlerThread([this, client, &latch]() RUN_ON_PCP_HANDLER_THREAD() {
-    StopAdvertisingImpl(client);
-    client->StoppedAdvertising();
-    latch.CountDown();
-  });
+  RunOnPcpHandlerThread("stop-advertising",
+                        [this, client, &latch]() RUN_ON_PCP_HANDLER_THREAD() {
+                          StopAdvertisingImpl(client);
+                          client->StoppedAdvertising();
+                          latch.CountDown();
+                        });
   WaitForLatch("StopAdvertising", &latch);
 }
 
@@ -188,33 +192,36 @@
 
   NEARBY_LOG(INFO, "StartDiscovery with supported mediums: %s",
              GetStringValueOfSupportedMediums(options).c_str());
-  RunOnPcpHandlerThread([this, client, service_id, discovery_options, &listener,
-                         &response]() RUN_ON_PCP_HANDLER_THREAD() {
-    // Ask the implementation to attempt to start discovery.
-    auto result = StartDiscoveryImpl(client, service_id, discovery_options);
-    if (!result.status.Ok()) {
-      response.Set(result.status);
-      return;
-    }
+  RunOnPcpHandlerThread(
+      "start-discovery", [this, client, service_id, discovery_options,
+                          &listener, &response]() RUN_ON_PCP_HANDLER_THREAD() {
+        // Ask the implementation to attempt to start discovery.
+        auto result = StartDiscoveryImpl(client, service_id, discovery_options);
+        if (!result.status.Ok()) {
+          response.Set(result.status);
+          return;
+        }
 
-    // Now that we've succeeded, mark the client as discovering and clear
-    // out any old endpoints we had discovered.
-    discovered_endpoints_.clear();
-    client->StartedDiscovery(service_id, GetStrategy(), listener,
-                             absl::MakeSpan(result.mediums), discovery_options);
-    response.Set({Status::kSuccess});
-  });
+        // Now that we've succeeded, mark the client as discovering and clear
+        // out any old endpoints we had discovered.
+        discovered_endpoints_.clear();
+        client->StartedDiscovery(service_id, GetStrategy(), listener,
+                                 absl::MakeSpan(result.mediums),
+                                 discovery_options);
+        response.Set({Status::kSuccess});
+      });
   return WaitForResult(absl::StrCat("StartDiscovery(", service_id, ")"),
                        client->GetClientId(), &response);
 }
 
 void BasePcpHandler::StopDiscovery(ClientProxy* client) {
   CountDownLatch latch(1);
-  RunOnPcpHandlerThread([this, client, &latch]() RUN_ON_PCP_HANDLER_THREAD() {
-    StopDiscoveryImpl(client);
-    client->StoppedDiscovery();
-    latch.CountDown();
-  });
+  RunOnPcpHandlerThread("stop-discovery",
+                        [this, client, &latch]() RUN_ON_PCP_HANDLER_THREAD() {
+                          StopDiscoveryImpl(client);
+                          client->StoppedDiscovery();
+                          latch.CountDown();
+                        });
 
   WaitForLatch("StopDiscovery", &latch);
 }
@@ -223,7 +230,8 @@
     ClientProxy* client, const std::string& service_id,
     const OutOfBandConnectionMetadata& metadata) {
   CountDownLatch latch(1);
-  RunOnPcpHandlerThread([this, client, service_id, metadata, &latch]()
+  RunOnPcpHandlerThread("inject-endpoint",
+                        [this, client, service_id, metadata, &latch]()
                             RUN_ON_PCP_HANDLER_THREAD() {
                               InjectEndpointImpl(client, service_id, metadata);
                               latch.CountDown();
@@ -261,8 +269,9 @@
   return result.result();
 }
 
-void BasePcpHandler::RunOnPcpHandlerThread(Runnable runnable) {
-  serial_executor_.Execute(std::move(runnable));
+void BasePcpHandler::RunOnPcpHandlerThread(const std::string& name,
+                                           Runnable runnable) {
+  serial_executor_.Execute(name, std::move(runnable));
 }
 
 EncryptionRunner::ResultListener BasePcpHandler::GetResultListener() {
@@ -273,6 +282,7 @@
                  const std::string& auth_token,
                  const ByteArray& raw_auth_token) {
             RunOnPcpHandlerThread(
+                "encryption-success",
                 [this, endpoint_id, raw_ukey2 = ukey2.release(), auth_token,
                  raw_auth_token]() RUN_ON_PCP_HANDLER_THREAD() mutable {
                   OnEncryptionSuccessRunnable(
@@ -282,12 +292,13 @@
           },
       .on_failure_cb =
           [this](const std::string& endpoint_id, EndpointChannel* channel) {
-            RunOnPcpHandlerThread([this, endpoint_id,
-                                   channel]() RUN_ON_PCP_HANDLER_THREAD() {
-              NEARBY_LOG(ERROR, "Encryption failed for %s on medium %d",
-                         endpoint_id.c_str(), channel->GetMedium());
-              OnEncryptionFailureRunnable(endpoint_id, channel);
-            });
+            RunOnPcpHandlerThread(
+                "encryption-failure",
+                [this, endpoint_id, channel]() RUN_ON_PCP_HANDLER_THREAD() {
+                  NEARBY_LOG(ERROR, "Encryption failed for %s on medium %d",
+                             endpoint_id.c_str(), channel->GetMedium());
+                  OnEncryptionFailureRunnable(endpoint_id, channel);
+                });
           },
   };
 }
@@ -395,119 +406,126 @@
                                          const ConnectionRequestInfo& info,
                                          const ConnectionOptions& options) {
   auto result = std::make_shared<Future<Status>>();
-  RunOnPcpHandlerThread([this, client, &info, options, endpoint_id,
-                         result]() RUN_ON_PCP_HANDLER_THREAD() {
-    absl::Time start_time = SystemClock::ElapsedRealtime();
+  RunOnPcpHandlerThread(
+      "request-connection", [this, client, &info, options, endpoint_id,
+                             result]() RUN_ON_PCP_HANDLER_THREAD() {
+        absl::Time start_time = SystemClock::ElapsedRealtime();
 
-    // If we already have a pending connection, then we shouldn't allow any more
-    // outgoing connections to this endpoint.
-    if (pending_connections_.count(endpoint_id)) {
-      NEARBY_LOG(INFO, "Connection already exists: id=%s", endpoint_id.c_str());
-      result->Set({Status::kAlreadyConnectedToEndpoint});
-      return;
-    }
+        // If we already have a pending connection, then we shouldn't allow any
+        // more outgoing connections to this endpoint.
+        if (pending_connections_.count(endpoint_id)) {
+          NEARBY_LOG(INFO, "Connection already exists: id=%s",
+                     endpoint_id.c_str());
+          result->Set({Status::kAlreadyConnectedToEndpoint});
+          return;
+        }
 
-    // If our child class says we can't send any more outgoing connections,
-    // listen to them.
-    if (ShouldEnforceTopologyConstraints(client->GetAdvertisingOptions()) &&
-        !CanSendOutgoingConnection(client)) {
-      NEARBY_LOG(INFO, "Outgoing connection not allowed: id=%s",
-                 endpoint_id.c_str());
-      result->Set({Status::kOutOfOrderApiCall});
-      return;
-    }
+        // If our child class says we can't send any more outgoing connections,
+        // listen to them.
+        if (ShouldEnforceTopologyConstraints(client->GetAdvertisingOptions()) &&
+            !CanSendOutgoingConnection(client)) {
+          NEARBY_LOG(INFO, "Outgoing connection not allowed: id=%s",
+                     endpoint_id.c_str());
+          result->Set({Status::kOutOfOrderApiCall});
+          return;
+        }
 
-    DiscoveredEndpoint* endpoint = GetDiscoveredEndpoint(endpoint_id);
-    if (endpoint == nullptr) {
-      NEARBY_LOG(INFO, "Discovered endpoint not found: id=%s",
-                 endpoint_id.c_str());
-      result->Set({Status::kEndpointUnknown});
-      return;
-    }
+        DiscoveredEndpoint* endpoint = GetDiscoveredEndpoint(endpoint_id);
+        if (endpoint == nullptr) {
+          NEARBY_LOG(INFO, "Discovered endpoint not found: id=%s",
+                     endpoint_id.c_str());
+          result->Set({Status::kEndpointUnknown});
+          return;
+        }
 
-    auto remote_bluetooth_mac_address =
-        BluetoothUtils::ToString(options.remote_bluetooth_mac_address);
-    if (!remote_bluetooth_mac_address.empty()) {
-      if (AppendRemoteBluetoothMacAddressEndpoint(
-              endpoint_id, remote_bluetooth_mac_address,
-              client->GetDiscoveryOptions()))
-        NEARBY_LOGS(INFO) << "Appended remote Bluetooth MAC Address endpoint "
-                          << "[" << remote_bluetooth_mac_address << "]";
-    }
+        auto remote_bluetooth_mac_address =
+            BluetoothUtils::ToString(options.remote_bluetooth_mac_address);
+        if (!remote_bluetooth_mac_address.empty()) {
+          if (AppendRemoteBluetoothMacAddressEndpoint(
+                  endpoint_id, remote_bluetooth_mac_address,
+                  client->GetDiscoveryOptions()))
+            NEARBY_LOGS(INFO)
+                << "Appended remote Bluetooth MAC Address endpoint "
+                << "[" << remote_bluetooth_mac_address << "]";
+        }
 
-    if (AppendWebRTCEndpoint(endpoint_id, client->GetDiscoveryOptions()))
-      NEARBY_LOGS(INFO) << "Appended Web RTC endpoint.";
+        if (AppendWebRTCEndpoint(endpoint_id, client->GetDiscoveryOptions()))
+          NEARBY_LOGS(INFO) << "Appended Web RTC endpoint.";
 
-    auto discovered_endpoints = GetDiscoveredEndpoints(endpoint_id);
-    std::unique_ptr<EndpointChannel> channel;
-    ConnectImplResult connect_impl_result;
+        auto discovered_endpoints = GetDiscoveredEndpoints(endpoint_id);
+        std::unique_ptr<EndpointChannel> channel;
+        ConnectImplResult connect_impl_result;
 
-    for (auto connect_endpoint : discovered_endpoints) {
-      if (!MediumSupportedByClientOptions(connect_endpoint->medium, options))
-        continue;
-      connect_impl_result = ConnectImpl(client, connect_endpoint);
-      if (connect_impl_result.status.Ok()) {
-        channel = std::move(connect_impl_result.endpoint_channel);
-        break;
-      }
-    }
+        for (auto connect_endpoint : discovered_endpoints) {
+          if (!MediumSupportedByClientOptions(connect_endpoint->medium,
+                                              options))
+            continue;
+          connect_impl_result = ConnectImpl(client, connect_endpoint);
+          if (connect_impl_result.status.Ok()) {
+            channel = std::move(connect_impl_result.endpoint_channel);
+            break;
+          }
+        }
 
-    if (channel == nullptr) {
-      NEARBY_LOG(INFO, "Endpoint channel not available: id=%s",
-                 endpoint_id.c_str());
-      ProcessPreConnectionInitiationFailure(
-          endpoint_id, channel.get(), connect_impl_result.status, result.get());
-      return;
-    }
+        if (channel == nullptr) {
+          NEARBY_LOG(INFO, "Endpoint channel not available: id=%s",
+                     endpoint_id.c_str());
+          ProcessPreConnectionInitiationFailure(endpoint_id, channel.get(),
+                                                connect_impl_result.status,
+                                                result.get());
+          return;
+        }
 
-    NEARBY_LOG(INFO, "Sending connection request: id=%s", endpoint_id.c_str());
-    // Generate the nonce to use for this connection.
-    std::int32_t nonce = prng_.NextInt32();
+        NEARBY_LOG(INFO, "Sending connection request: id=%s",
+                   endpoint_id.c_str());
+        // Generate the nonce to use for this connection.
+        std::int32_t nonce = prng_.NextInt32();
 
-    // The first message we have to send, after connecting, is to tell the
-    // endpoint about ourselves.
-    Exception write_exception = WriteConnectionRequestFrame(
-        channel.get(), client->GetLocalEndpointId(), info.endpoint_info, nonce,
-        GetSupportedConnectionMediumsByPriority(options));
-    if (!write_exception.Ok()) {
-      NEARBY_LOG(INFO, "Failed to send connection request: id=%s",
-                 endpoint_id.c_str());
-      ProcessPreConnectionInitiationFailure(
-          endpoint_id, channel.get(), {Status::kEndpointIoError}, result.get());
-      return;
-    }
+        // The first message we have to send, after connecting, is to tell the
+        // endpoint about ourselves.
+        Exception write_exception = WriteConnectionRequestFrame(
+            channel.get(), client->GetLocalEndpointId(), info.endpoint_info,
+            nonce, GetSupportedConnectionMediumsByPriority(options));
+        if (!write_exception.Ok()) {
+          NEARBY_LOG(INFO, "Failed to send connection request: id=%s",
+                     endpoint_id.c_str());
+          ProcessPreConnectionInitiationFailure(endpoint_id, channel.get(),
+                                                {Status::kEndpointIoError},
+                                                result.get());
+          return;
+        }
 
-    NEARBY_LOG(INFO, "adding connection to pending set: id=%s",
-               endpoint_id.c_str());
+        NEARBY_LOG(INFO, "adding connection to pending set: id=%s",
+                   endpoint_id.c_str());
 
-    // We've successfully connected to the device, and are now about to jump on
-    // to the EncryptionRunner thread to start running our encryption protocol.
-    // We'll mark ourselves as pending in case we get another call to
-    // RequestConnection or OnIncomingConnection, so that we can cancel the
-    // connection if needed.
-    EndpointChannel* endpoint_channel =
-        pending_connections_
-            .emplace(endpoint_id,
-                     PendingConnectionInfo{
-                         .client = client,
-                         .remote_endpoint_info = endpoint->endpoint_info,
-                         .nonce = nonce,
-                         .is_incoming = false,
-                         .start_time = start_time,
-                         .listener = info.listener,
-                         .options = options,
-                         .result = result,
-                         .channel = std::move(channel),
-                     })
-            .first->second.channel.get();
+        // We've successfully connected to the device, and are now about to jump
+        // on to the EncryptionRunner thread to start running our encryption
+        // protocol. We'll mark ourselves as pending in case we get another call
+        // to RequestConnection or OnIncomingConnection, so that we can cancel
+        // the connection if needed.
+        EndpointChannel* endpoint_channel =
+            pending_connections_
+                .emplace(endpoint_id,
+                         PendingConnectionInfo{
+                             .client = client,
+                             .remote_endpoint_info = endpoint->endpoint_info,
+                             .nonce = nonce,
+                             .is_incoming = false,
+                             .start_time = start_time,
+                             .listener = info.listener,
+                             .options = options,
+                             .result = result,
+                             .channel = std::move(channel),
+                         })
+                .first->second.channel.get();
 
-    NEARBY_LOG(INFO, "Initiating secure connection: id=%s",
-               endpoint_id.c_str());
-    // Next, we'll set up encryption. When it's done, our future will return and
-    // RequestConnection() will finish.
-    encryption_runner_.StartClient(client, endpoint_id, endpoint_channel,
-                                   GetResultListener());
-  });
+        NEARBY_LOG(INFO, "Initiating secure connection: id=%s",
+                   endpoint_id.c_str());
+        // Next, we'll set up encryption. When it's done, our future will return
+        // and RequestConnection() will finish.
+        encryption_runner_.StartClient(client, endpoint_id, endpoint_channel,
+                                       GetResultListener());
+      });
   NEARBY_LOG(INFO, "Waiting for connection to complete: id=%s",
              endpoint_id.c_str());
   auto status =
@@ -672,8 +690,8 @@
     const PayloadListener& payload_listener) {
   Future<Status> response;
   RunOnPcpHandlerThread(
-      [this, client, endpoint_id, payload_listener,
-       &response]() RUN_ON_PCP_HANDLER_THREAD() {
+      "accept-connection", [this, client, endpoint_id, payload_listener,
+                            &response]() RUN_ON_PCP_HANDLER_THREAD() {
         NEARBY_LOG(INFO, "AcceptConnection: id=%s", endpoint_id.c_str());
         if (!pending_connections_.count(endpoint_id)) {
           NEARBY_LOG(INFO, "AcceptConnection: no pending connection for id=%s",
@@ -726,51 +744,52 @@
 Status BasePcpHandler::RejectConnection(ClientProxy* client,
                                         const std::string& endpoint_id) {
   Future<Status> response;
-  RunOnPcpHandlerThread([this, client, endpoint_id,
-                         &response]() RUN_ON_PCP_HANDLER_THREAD() {
-    NEARBY_LOG(INFO, "RejectConnection: id=%s", endpoint_id.c_str());
-    if (!pending_connections_.count(endpoint_id)) {
-      NEARBY_LOG(INFO, "RejectConnection: no pending connection for id=%s",
-                 endpoint_id.c_str());
-      response.Set({Status::kEndpointUnknown});
-      return;
-    }
-    auto& connection_info = pending_connections_[endpoint_id];
+  RunOnPcpHandlerThread(
+      "reject-connection",
+      [this, client, endpoint_id, &response]() RUN_ON_PCP_HANDLER_THREAD() {
+        NEARBY_LOG(INFO, "RejectConnection: id=%s", endpoint_id.c_str());
+        if (!pending_connections_.count(endpoint_id)) {
+          NEARBY_LOG(INFO, "RejectConnection: no pending connection for id=%s",
+                     endpoint_id.c_str());
+          response.Set({Status::kEndpointUnknown});
+          return;
+        }
+        auto& connection_info = pending_connections_[endpoint_id];
 
-    // By this point in the flow, connection_info->endpoint_channel_ has been
-    // nulled out because ownership of that EndpointChannel was passed on to
-    // EndpointChannelManager via a call to
-    // EndpointManager::registerEndpoint(), so we now need to get access to the
-    // EndpointChannel from the authoritative owner.
-    std::shared_ptr<EndpointChannel> channel =
-        channel_manager_->GetChannelForEndpoint(endpoint_id);
-    if (channel == nullptr) {
-      NEARBY_LOG(
-          ERROR,
-          "Channel destroyed before Reject; bring down connection: id=%s",
-          endpoint_id.c_str());
-      ProcessPreConnectionResultFailure(client, endpoint_id);
-      response.Set({Status::kEndpointUnknown});
-      return;
-    }
+        // By this point in the flow, connection_info->endpoint_channel_ has
+        // been nulled out because ownership of that EndpointChannel was passed
+        // on to EndpointChannelManager via a call to
+        // EndpointManager::registerEndpoint(), so we now need to get access to
+        // the EndpointChannel from the authoritative owner.
+        std::shared_ptr<EndpointChannel> channel =
+            channel_manager_->GetChannelForEndpoint(endpoint_id);
+        if (channel == nullptr) {
+          NEARBY_LOG(
+              ERROR,
+              "Channel destroyed before Reject; bring down connection: id=%s",
+              endpoint_id.c_str());
+          ProcessPreConnectionResultFailure(client, endpoint_id);
+          response.Set({Status::kEndpointUnknown});
+          return;
+        }
 
-    Exception write_exception = channel->Write(
-        parser::ForConnectionResponse(Status::kConnectionRejected));
-    if (!write_exception.Ok()) {
-      NEARBY_LOG(INFO, "RejectConnection: failed to send response: id=%s",
-                 endpoint_id.c_str());
-      ProcessPreConnectionResultFailure(client, endpoint_id);
-      response.Set({Status::kEndpointIoError});
-      return;
-    }
+        Exception write_exception = channel->Write(
+            parser::ForConnectionResponse(Status::kConnectionRejected));
+        if (!write_exception.Ok()) {
+          NEARBY_LOG(INFO, "RejectConnection: failed to send response: id=%s",
+                     endpoint_id.c_str());
+          ProcessPreConnectionResultFailure(client, endpoint_id);
+          response.Set({Status::kEndpointIoError});
+          return;
+        }
 
-    NEARBY_LOG(INFO, "RejectConnection: rejecting locally: id=%s",
-               endpoint_id.c_str());
-    connection_info.LocalEndpointRejectedConnection(endpoint_id);
-    EvaluateConnectionResult(client, endpoint_id,
-                             false /* can_close_immediately */);
-    response.Set({Status::kSuccess});
-  });
+        NEARBY_LOG(INFO, "RejectConnection: rejecting locally: id=%s",
+                   endpoint_id.c_str());
+        connection_info.LocalEndpointRejectedConnection(endpoint_id);
+        EvaluateConnectionResult(client, endpoint_id,
+                                 false /* can_close_immediately */);
+        response.Set({Status::kSuccess});
+      });
 
   return WaitForResult(absl::StrCat("RejectConnection(", endpoint_id, ")"),
                        client->GetClientId(), &response);
@@ -781,45 +800,46 @@
                                      ClientProxy* client,
                                      proto::connections::Medium medium) {
   CountDownLatch latch(1);
-  RunOnPcpHandlerThread([this, client, endpoint_id, frame,
-                         &latch]() RUN_ON_PCP_HANDLER_THREAD() {
-    NEARBY_LOG(INFO, "OnConnectionResponse: id=%s", endpoint_id.c_str());
+  RunOnPcpHandlerThread(
+      "incoming-frame",
+      [this, client, endpoint_id, frame, &latch]() RUN_ON_PCP_HANDLER_THREAD() {
+        NEARBY_LOG(INFO, "OnConnectionResponse: id=%s", endpoint_id.c_str());
 
-    if (client->HasRemoteEndpointResponded(endpoint_id)) {
-      NEARBY_LOG(INFO, "OnConnectionResponse: already handled; id=%s",
-                 endpoint_id.c_str());
-      return;
-    }
+        if (client->HasRemoteEndpointResponded(endpoint_id)) {
+          NEARBY_LOG(INFO, "OnConnectionResponse: already handled; id=%s",
+                     endpoint_id.c_str());
+          return;
+        }
 
-    const ConnectionResponseFrame& connection_response =
-        frame.v1().connection_response();
+        const ConnectionResponseFrame& connection_response =
+            frame.v1().connection_response();
 
-    // For backward compatible, here still check both status and
-    // response parameters until the response feature is roll out in all
-    // supported devices.
-    bool accepted = false;
-    if (connection_response.has_response()) {
-      accepted =
-          connection_response.response() == ConnectionResponseFrame::ACCEPT;
-    } else {
-      accepted = connection_response.status() == Status::kSuccess;
-    }
-    if (accepted) {
-      NEARBY_LOG(INFO, "OnConnectionResponse: remote accepted; id=%s",
-                 endpoint_id.c_str());
-      client->RemoteEndpointAcceptedConnection(endpoint_id);
-    } else {
-      NEARBY_LOG(INFO,
-                 "OnConnectionResponse: remote rejected; id=%s; status=%d",
-                 endpoint_id.c_str(), connection_response.status());
-      client->RemoteEndpointRejectedConnection(endpoint_id);
-    }
+        // For backward compatible, here still check both status and
+        // response parameters until the response feature is roll out in all
+        // supported devices.
+        bool accepted = false;
+        if (connection_response.has_response()) {
+          accepted =
+              connection_response.response() == ConnectionResponseFrame::ACCEPT;
+        } else {
+          accepted = connection_response.status() == Status::kSuccess;
+        }
+        if (accepted) {
+          NEARBY_LOG(INFO, "OnConnectionResponse: remote accepted; id=%s",
+                     endpoint_id.c_str());
+          client->RemoteEndpointAcceptedConnection(endpoint_id);
+        } else {
+          NEARBY_LOG(INFO,
+                     "OnConnectionResponse: remote rejected; id=%s; status=%d",
+                     endpoint_id.c_str(), connection_response.status());
+          client->RemoteEndpointRejectedConnection(endpoint_id);
+        }
 
-    EvaluateConnectionResult(client, endpoint_id,
-                             /* can_close_immediately= */ true);
+        EvaluateConnectionResult(client, endpoint_id,
+                                 /* can_close_immediately= */ true);
 
-    latch.CountDown();
-  });
+        latch.CountDown();
+      });
   WaitForLatch("OnIncomingFrame()", &latch);
 }
 
@@ -830,17 +850,19 @@
     barrier.CountDown();
     return;
   }
-  RunOnPcpHandlerThread([this, client, endpoint_id,
-                         barrier]() RUN_ON_PCP_HANDLER_THREAD() mutable {
-    auto item = pending_alarms_.find(endpoint_id);
-    if (item != pending_alarms_.end()) {
-      auto& alarm = item->second;
-      alarm.Cancel();
-      pending_alarms_.erase(item);
-    }
-    ProcessPreConnectionResultFailure(client, endpoint_id);
-    barrier.CountDown();
-  });
+  RunOnPcpHandlerThread("on-endpoint-disconnect",
+                        [this, client, endpoint_id, barrier]()
+                            RUN_ON_PCP_HANDLER_THREAD() mutable {
+                              auto item = pending_alarms_.find(endpoint_id);
+                              if (item != pending_alarms_.end()) {
+                                auto& alarm = item->second;
+                                alarm.Cancel();
+                                pending_alarms_.erase(item);
+                              }
+                              ProcessPreConnectionResultFailure(client,
+                                                                endpoint_id);
+                              barrier.CountDown();
+                            });
 }
 
 BluetoothDevice BasePcpHandler::GetRemoteBluetoothDevice(
diff --git a/cpp/core/internal/base_pcp_handler.h b/cpp/core/internal/base_pcp_handler.h
index 47ac391..3cf834b 100644
--- a/cpp/core/internal/base_pcp_handler.h
+++ b/cpp/core/internal/base_pcp_handler.h
@@ -223,7 +223,7 @@
     std::unique_ptr<EndpointChannel> endpoint_channel;
   };
 
-  void RunOnPcpHandlerThread(Runnable runnable);
+  void RunOnPcpHandlerThread(const std::string& name, Runnable runnable);
 
   BluetoothDevice GetRemoteBluetoothDevice(
       const std::string& remote_bluetooth_mac_address);
diff --git a/cpp/core/internal/bwu_manager.cc b/cpp/core/internal/bwu_manager.cc
index 626ac7a..b5063b8 100644
--- a/cpp/core/internal/bwu_manager.cc
+++ b/cpp/core/internal/bwu_manager.cc
@@ -131,7 +131,7 @@
                                         Medium new_medium) {
   NEARBY_LOG(INFO, "InitiateBwuForEndpoint for endpoint %s with medium %d",
              endpoint_id.c_str(), new_medium);
-  RunOnBwuManagerThread([this, client, endpoint_id, new_medium]() {
+  RunOnBwuManagerThread("bwu-init", [this, client, endpoint_id, new_medium]() {
     Medium proposed_medium = ChooseBestUpgradeMedium(
         client->GetUpgradeMediums(endpoint_id).GetMediums(true));
     if (new_medium != Medium::UNKNOWN_MEDIUM) {
@@ -207,12 +207,14 @@
     return;
   auto bwu_frame = frame.v1().bandwidth_upgrade_negotiation();
   if (FeatureFlags::GetInstance().GetFlags().enable_async_bandwidth_upgrade) {
-    RunOnBwuManagerThread([this, client, endpoint_id, bwu_frame]() {
-      OnBwuNegotiationFrame(client, bwu_frame, endpoint_id);
-    });
+    RunOnBwuManagerThread(
+        "bwu-on-incoming-frame", [this, client, endpoint_id, bwu_frame]() {
+          OnBwuNegotiationFrame(client, bwu_frame, endpoint_id);
+        });
   } else {
     CountDownLatch latch(1);
-    RunOnBwuManagerThread([this, client, endpoint_id, bwu_frame, &latch]() {
+    RunOnBwuManagerThread("bwu-on-incoming-frame", [this, client, endpoint_id,
+                                                    bwu_frame, &latch]() {
       OnBwuNegotiationFrame(client, bwu_frame, endpoint_id);
       latch.CountDown();
     });
@@ -224,38 +226,40 @@
                                       const std::string& endpoint_id,
                                       CountDownLatch barrier) {
   NEARBY_LOG(INFO, "OnEndpointDisconnect for endpoint %s", endpoint_id.c_str());
-  RunOnBwuManagerThread([this, client, endpoint_id, barrier]() mutable {
-    if (medium_ == Medium::UNKNOWN_MEDIUM) {
-      barrier.CountDown();
-      return;
-    }
+  RunOnBwuManagerThread(
+      "bwu-on-endpoint-disconnect",
+      [this, client, endpoint_id, barrier]() mutable {
+        if (medium_ == Medium::UNKNOWN_MEDIUM) {
+          barrier.CountDown();
+          return;
+        }
 
-    if (handler_) {
-      handler_->OnEndpointDisconnect(client, endpoint_id);
-    }
+        if (handler_) {
+          handler_->OnEndpointDisconnect(client, endpoint_id);
+        }
 
-    auto item = previous_endpoint_channels_.extract(endpoint_id);
+        auto item = previous_endpoint_channels_.extract(endpoint_id);
 
-    if (!item.empty()) {
-      auto old_channel = item.mapped();
-      if (old_channel != nullptr) {
-        old_channel->Close(DisconnectionReason::SHUTDOWN);
-      }
-    }
-    in_progress_upgrades_.erase(endpoint_id);
-    CancelRetryUpgradeAlarm(endpoint_id);
+        if (!item.empty()) {
+          auto old_channel = item.mapped();
+          if (old_channel != nullptr) {
+            old_channel->Close(DisconnectionReason::SHUTDOWN);
+          }
+        }
+        in_progress_upgrades_.erase(endpoint_id);
+        CancelRetryUpgradeAlarm(endpoint_id);
 
-    successfully_upgraded_endpoints_.erase(endpoint_id);
+        successfully_upgraded_endpoints_.erase(endpoint_id);
 
-    // If this was our very last endpoint:
-    //
-    // a) revert all the changes for currentBwuMedium.
-    // b) reset currentBwuMedium.
-    if (channel_manager_->GetConnectedEndpointsCount() <= 1) {
-      Revert();
-    }
-    barrier.CountDown();
-  });
+        // If this was our very last endpoint:
+        //
+        // a) revert all the changes for currentBwuMedium.
+        // b) reset currentBwuMedium.
+        if (channel_manager_->GetConnectedEndpointsCount() <= 1) {
+          Revert();
+        }
+        barrier.CountDown();
+      });
 }
 
 BwuHandler* BwuManager::SetCurrentBwuHandler(Medium medium) {
@@ -312,7 +316,8 @@
              client->GetServiceId().c_str());
   std::shared_ptr<BwuHandler::IncomingSocketConnection> connection(
       mutable_connection.release());
-  RunOnBwuManagerThread([this, client, connection]() {
+  RunOnBwuManagerThread("bwu-on-incoming-connection", [this, client,
+                                                       connection]() {
     EndpointChannel* channel = connection->channel.get();
     if (channel == nullptr) {
       connection->socket->Close();
@@ -356,8 +361,9 @@
   });
 }
 
-void BwuManager::RunOnBwuManagerThread(Runnable runnable) {
-  serial_executor_.Execute(std::move(runnable));
+void BwuManager::RunOnBwuManagerThread(const std::string& name,
+                                       Runnable runnable) {
+  serial_executor_.Execute(name, std::move(runnable));
 }
 
 void BwuManager::RunUpgradeProtocol(
@@ -912,7 +918,8 @@
   CancelableAlarm alarm(
       "BWU alarm",
       [this, client, endpoint_id]() {
-        RunOnBwuManagerThread([this, client, endpoint_id]() {
+        RunOnBwuManagerThread("bwu-retry-upgrade", [this, client,
+                                                    endpoint_id]() {
           if (!client->IsConnectedToEndpoint(endpoint_id)) {
             return;
           }
diff --git a/cpp/core/internal/bwu_manager.h b/cpp/core/internal/bwu_manager.h
index 2ba3766..f219429 100644
--- a/cpp/core/internal/bwu_manager.h
+++ b/cpp/core/internal/bwu_manager.h
@@ -103,7 +103,8 @@
       absl::Seconds(5);
   BwuHandler* SetCurrentBwuHandler(Medium medium);
   void InitBwuHandlers();
-  void RunOnBwuManagerThread(std::function<void()> runnable);
+  void RunOnBwuManagerThread(const std::string& name,
+                             std::function<void()> runnable);
   std::vector<Medium> StripOutUnavailableMediums(
       const std::vector<Medium>& mediums);
   Medium ChooseBestUpgradeMedium(const std::vector<Medium>& mediums);
diff --git a/cpp/core/internal/encryption_runner.cc b/cpp/core/internal/encryption_runner.cc
index 24832a6..24833d7 100644
--- a/cpp/core/internal/encryption_runner.cc
+++ b/cpp/core/internal/encryption_runner.cc
@@ -360,6 +360,7 @@
     EndpointChannel* endpoint_channel,
     EncryptionRunner::ResultListener&& listener) {
   server_executor_.Execute(
+      "encryption-server",
       [runnable{ServerRunnable(client, &alarm_executor_, endpoint_id,
                                endpoint_channel, std::move(listener))}]() {
         runnable();
@@ -371,6 +372,7 @@
     EndpointChannel* endpoint_channel,
     EncryptionRunner::ResultListener&& listener) {
   client_executor_.Execute(
+      "encryption-client",
       [runnable{ClientRunnable(client, &alarm_executor_, endpoint_id,
                                endpoint_channel, std::move(listener))}]() {
         runnable();
diff --git a/cpp/core/internal/endpoint_manager.cc b/cpp/core/internal/endpoint_manager.cc
index d731851..3fa4b48 100644
--- a/cpp/core/internal/endpoint_manager.cc
+++ b/cpp/core/internal/endpoint_manager.cc
@@ -91,6 +91,8 @@
   // will retry and attempt to pick another channel.
   // If channel is deleted (no mapping), or it is still the same channel
   // (same Medium) on which we got the Exception::kIo, we terminate the loop.
+  NEARBY_LOG(INFO, "Started worker loop name=%s, endpoint=%s",
+             runnable_name.c_str(), endpoint_id.c_str());
   Medium last_failed_medium = Medium::UNKNOWN_MEDIUM;
   while (true) {
     // It's important to keep re-fetching the EndpointChannel for an endpoint
@@ -271,7 +273,7 @@
 EndpointManager::~EndpointManager() {
   NEARBY_LOG(INFO, "EndpointManager going down");
   CountDownLatch latch(1);
-  RunOnEndpointManagerThread([this, &latch]() {
+  RunOnEndpointManagerThread("bring-down-endpoints", [this, &latch]() {
     NEARBY_LOG(INFO, "Bringing down endpoints");
     for (auto& item : endpoints_) {
       const std::string& endpoint_id = item.first;
@@ -392,9 +394,11 @@
   // Instead, we release() a pointer, and pass a raw pointer, which is copyalbe.
   // We ignore the risk of job not scheduled (and an associated risk of memory
   // leak), because this may only happen during service shutdown.
-  RunOnEndpointManagerThread([this, client, channel = channel.release(),
-                              &endpoint_id, &info, &options, &listener,
-                              &latch]() {
+  RunOnEndpointManagerThread("register-endpoint", [this, client,
+                                                   channel = channel.release(),
+                                                   &endpoint_id, &info,
+                                                   &options, &listener,
+                                                   &latch]() {
     if (endpoints_.contains(endpoint_id)) {
       NEARBY_LOG(WARNING, "Registing duplicate endpoint %s",
                  endpoint_id.c_str());
@@ -473,7 +477,8 @@
                                          const std::string& endpoint_id) {
   NEARBY_LOG(ERROR, "UnregisterEndpoint for endpoint %s", endpoint_id.c_str());
   CountDownLatch latch(1);
-  RunOnEndpointManagerThread([this, client, endpoint_id, &latch]() {
+  RunOnEndpointManagerThread("unregister-endpoint", [this, client, endpoint_id,
+                                                     &latch]() {
     RemoveEndpoint(client, endpoint_id,
                    client->IsConnectedToEndpoint(endpoint_id));
     latch.CountDown();
@@ -509,7 +514,7 @@
 void EndpointManager::DiscardEndpoint(ClientProxy* client,
                                       const std::string& endpoint_id) {
   NEARBY_LOG(ERROR, "DiscardEndpoint for endpoint %s", endpoint_id.c_str());
-  RunOnEndpointManagerThread([this, client, endpoint_id]() {
+  RunOnEndpointManagerThread("discard-endpoint", [this, client, endpoint_id]() {
     RemoveEndpoint(client, endpoint_id,
                    /*notify=*/
                    client->IsConnectedToEndpoint(endpoint_id));
@@ -632,15 +637,16 @@
 }
 
 void EndpointManager::StartEndpointReader(Runnable runnable) {
-  handlers_executor_.Execute(std::move(runnable));
+  handlers_executor_.Execute("reader", std::move(runnable));
 }
 
 void EndpointManager::StartEndpointKeepAliveManager(Runnable runnable) {
-  keep_alive_executor_.Execute(std::move(runnable));
+  keep_alive_executor_.Execute("keep-alive", std::move(runnable));
 }
 
-void EndpointManager::RunOnEndpointManagerThread(Runnable runnable) {
-  serial_executor_.Execute(std::move(runnable));
+void EndpointManager::RunOnEndpointManagerThread(const std::string& name,
+                                                 Runnable runnable) {
+  serial_executor_.Execute(name, std::move(runnable));
 }
 
 }  // namespace connections
diff --git a/cpp/core/internal/endpoint_manager.h b/cpp/core/internal/endpoint_manager.h
index 516bea7..d28bf1b 100644
--- a/cpp/core/internal/endpoint_manager.h
+++ b/cpp/core/internal/endpoint_manager.h
@@ -234,7 +234,7 @@
   void StartEndpointKeepAliveManager(Runnable runnable);
 
   // Executes all jobs sequentially, on a serial_executor_.
-  void RunOnEndpointManagerThread(Runnable runnable);
+  void RunOnEndpointManagerThread(const std::string& name, Runnable runnable);
 
   EndpointChannelManager* channel_manager_;
 
diff --git a/cpp/core/internal/mediums/bluetooth_classic.cc b/cpp/core/internal/mediums/bluetooth_classic.cc
index 9efff55..6f870b1 100644
--- a/cpp/core/internal/mediums/bluetooth_classic.cc
+++ b/cpp/core/internal/mediums/bluetooth_classic.cc
@@ -271,19 +271,20 @@
   // Start the accept loop on a dedicated thread - this stays alive and
   // listening for new incoming connections until StopAcceptingConnections() is
   // invoked.
-  accept_loops_runner_.Execute([callback = std::move(callback),
-                                server_socket = std::move(owned_socket),
-                                service_name]() mutable {
-    while (true) {
-      BluetoothSocket client_socket = server_socket.Accept();
-      if (!client_socket.IsValid()) {
-        server_socket.Close();
-        break;
-      }
+  accept_loops_runner_.Execute(
+      "bt-accept",
+      [callback = std::move(callback), server_socket = std::move(owned_socket),
+       service_name]() mutable {
+        while (true) {
+          BluetoothSocket client_socket = server_socket.Accept();
+          if (!client_socket.IsValid()) {
+            server_socket.Close();
+            break;
+          }
 
-      callback.accepted_cb(std::move(client_socket));
-    }
-  });
+          callback.accepted_cb(std::move(client_socket));
+        }
+      });
 
   return true;
 }
diff --git a/cpp/core/internal/mediums/webrtc.cc b/cpp/core/internal/mediums/webrtc.cc
index 047e1af..0d40747 100644
--- a/cpp/core/internal/mediums/webrtc.cc
+++ b/cpp/core/internal/mediums/webrtc.cc
@@ -266,7 +266,7 @@
     // This allows a remote device to message us over Tachyon.
     auto signaling_complete_callback = [this, &socket_future](bool success) {
       if (!success) {
-        OffloadFromThread([&socket_future]() {
+        OffloadFromThread("rtc-signaling-fail", [&socket_future]() {
           socket_future.SetException({Exception::kFailed});
         });
       }
@@ -388,7 +388,7 @@
 
 void WebRtc::OnSignalingMessage(const std::string& service_id,
                                 const ByteArray& message) {
-  OffloadFromThread([this, service_id, message]() {
+  OffloadFromThread("rtc-on-signaling-message", [this, service_id, message]() {
     ProcessTachyonInboxMessage(service_id, message);
   });
 }
@@ -399,7 +399,7 @@
     return;
   }
 
-  OffloadFromThread([this, service_id]() {
+  OffloadFromThread("rtc-on-signaling-complete", [this, service_id]() {
     MutexLock lock(&mutex_);
     const auto& info_entry = accepting_connections_info_.find(service_id);
     if (info_entry == accepting_connections_info_.end()) {
@@ -661,8 +661,9 @@
   // Transform the DataChannel into a socket.
   auto socket = std::make_unique<WebRtcSocket>("WebRtcSocket", data_channel);
   socket->SetOnSocketClosedListener({[this, remote_peer_id]() {
-    OffloadFromThread(
-        [this, remote_peer_id]() { ProcessDataChannelClosed(remote_peer_id); });
+    OffloadFromThread("rtc-socket-closed-cb", [this, remote_peer_id]() {
+      ProcessDataChannelClosed(remote_peer_id);
+    });
   }});
   WebRtcSocketWrapper wrapper = WebRtcSocketWrapper(std::move(socket));
 
@@ -745,6 +746,7 @@
              ::location::nearby::mediums::IceCandidate encoded_ice_candidate =
                  webrtc_frames::EncodeIceCandidate(*ice_candidate);
              OffloadFromThread(
+                 "rtc-ice-candidates",
                  [this, service_id, remote_peer_id, encoded_ice_candidate]() {
                    ProcessLocalIceCandidate(service_id, remote_peer_id,
                                             encoded_ice_candidate);
@@ -756,6 +758,7 @@
                 remote_peer_id](rtc::scoped_refptr<webrtc::DataChannelInterface>
                                     data_channel) {
                 OffloadFromThread(
+                    "rtc-channel-created",
                     [this, service_id, remote_peer_id, data_channel]() {
                       ProcessDataChannelCreated(service_id, remote_peer_id,
                                                 data_channel);
@@ -763,17 +766,19 @@
               }},
           .data_channel_message_received_cb = {[this, remote_peer_id](
                                                    const ByteArray& message) {
-            OffloadFromThread([this, remote_peer_id, message]() {
-              ProcessDataChannelMessage(remote_peer_id, message);
-            });
+            OffloadFromThread(
+                "rtc-channel-messsage", [this, remote_peer_id, message]() {
+                  ProcessDataChannelMessage(remote_peer_id, message);
+                });
           }},
           .data_channel_buffered_amount_changed_cb = {[this, remote_peer_id]() {
-            OffloadFromThread([this, remote_peer_id]() {
-              ProcessDataChannelBufferAmountChanged(remote_peer_id);
-            });
+            OffloadFromThread(
+                "rtc-channel-buffer-change", [this, remote_peer_id]() {
+                  ProcessDataChannelBufferAmountChanged(remote_peer_id);
+                });
           }},
           .data_channel_closed_cb = {[this, remote_peer_id]() {
-            OffloadFromThread([this, remote_peer_id]() {
+            OffloadFromThread("rtc-channel-closed", [this, remote_peer_id]() {
               ProcessDataChannelClosed(remote_peer_id);
             });
           }},
@@ -796,8 +801,8 @@
   }
 }
 
-void WebRtc::OffloadFromThread(Runnable runnable) {
-  single_thread_executor_.Execute(std::move(runnable));
+void WebRtc::OffloadFromThread(const std::string& name, Runnable runnable) {
+  single_thread_executor_.Execute(name, std::move(runnable));
 }
 
 }  // namespace mediums
diff --git a/cpp/core/internal/mediums/webrtc.h b/cpp/core/internal/mediums/webrtc.h
index ac3ae2c..7772b8e 100644
--- a/cpp/core/internal/mediums/webrtc.h
+++ b/cpp/core/internal/mediums/webrtc.h
@@ -241,7 +241,7 @@
   void RestartTachyonReceiveMessages(const std::string& service_id)
       ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_);
 
-  void OffloadFromThread(Runnable runnable);
+  void OffloadFromThread(const std::string& name, Runnable runnable);
 
   Mutex mutex_;
 
diff --git a/cpp/core/internal/p2p_cluster_pcp_handler.cc b/cpp/core/internal/p2p_cluster_pcp_handler.cc
index f8cef07..de79374 100644
--- a/cpp/core/internal/p2p_cluster_pcp_handler.cc
+++ b/cpp/core/internal/p2p_cluster_pcp_handler.cc
@@ -206,155 +206,168 @@
 void P2pClusterPcpHandler::BluetoothDeviceDiscoveredHandler(
     ClientProxy* client, const std::string& service_id,
     BluetoothDevice device) {
-  RunOnPcpHandlerThread([this, client, service_id,
-                         device]() RUN_ON_PCP_HANDLER_THREAD() {
-    // Make sure we are still discovering before proceeding.
-    if (!client->IsDiscovering()) {
-      NEARBY_LOG(INFO,
-                 "BT discovery handler (FOUND) [client=%p, service=%s]: not "
-                 "in discovery mode",
-                 client, service_id.c_str());
-      return;
-    }
+  RunOnPcpHandlerThread(
+      "p2p-bt-device-discovered",
+      [this, client, service_id, device]()
+          RUN_ON_PCP_HANDLER_THREAD() {
+            // Make sure we are still discovering before proceeding.
+            if (!client->IsDiscovering()) {
+              NEARBY_LOG(
+                  INFO,
+                  "BT discovery handler (FOUND) [client=%p, service=%s]: not "
+                  "in discovery mode",
+                  client, service_id.c_str());
+              return;
+            }
 
-    // Parse the Bluetooth device name.
-    const std::string device_name_string = device.GetName();
-    BluetoothDeviceName device_name(device_name_string);
+            // Parse the Bluetooth device name.
+            const std::string device_name_string = device.GetName();
+            BluetoothDeviceName device_name(device_name_string);
 
-    // Make sure the Bluetooth device name points to a valid
-    // endpoint we're discovering.
-    if (!IsRecognizedBluetoothEndpoint(device_name_string, service_id,
-                                       device_name))
-      return;
+            // Make sure the Bluetooth device name points to a valid
+            // endpoint we're discovering.
+            if (!IsRecognizedBluetoothEndpoint(device_name_string, service_id,
+                                               device_name))
+              return;
 
-    // Report the discovered endpoint to the client.
-    NEARBY_LOGS(INFO)
-        << "Invoking BasePcpHandler::OnEndpointFound() for BT service="
-        << service_id << "; id=" << device_name.GetEndpointId() << "; name="
-        << absl::BytesToHexString(device_name.GetEndpointInfo().data());
-    OnEndpointFound(
-        client, std::make_shared<BluetoothEndpoint>(BluetoothEndpoint{
+            // Report the discovered endpoint to the client.
+            NEARBY_LOGS(INFO)
+                << "Invoking BasePcpHandler::OnEndpointFound() for BT service="
+                << service_id << "; id=" << device_name.GetEndpointId()
+                << "; name="
+                << absl::BytesToHexString(device_name.GetEndpointInfo().data());
+            OnEndpointFound(
+                client,
+                std::make_shared<BluetoothEndpoint>(BluetoothEndpoint{
                     {device_name.GetEndpointId(), device_name.GetEndpointInfo(),
                      service_id, proto::connections::Medium::BLUETOOTH,
                      device_name.GetWebRtcState()},
                     device,
                 }));
-  });
+          });
 }
 
 void P2pClusterPcpHandler::BluetoothNameChangedHandler(
     ClientProxy* client, const std::string& service_id,
     BluetoothDevice device) {
-  RunOnPcpHandlerThread([this, client, service_id,
-                         device]() RUN_ON_PCP_HANDLER_THREAD() {
-    // Make sure we are still discovering before proceeding.
-    if (!client->IsDiscovering()) {
-      NEARBY_LOG(INFO,
-                 "BT discovery handler (CHANGED) [client=%p, service=%s]: not "
-                 "in discovery mode",
-                 client, service_id.c_str());
-      return;
-    }
+  RunOnPcpHandlerThread(
+      "p2p-bt-name-changed",
+      [this, client, service_id, device]() RUN_ON_PCP_HANDLER_THREAD() {
+        // Make sure we are still discovering before proceeding.
+        if (!client->IsDiscovering()) {
+          NEARBY_LOG(
+              INFO,
+              "BT discovery handler (CHANGED) [client=%p, service=%s]: not "
+              "in discovery mode",
+              client, service_id.c_str());
+          return;
+        }
 
-    // Parse the Bluetooth device name.
-    const std::string device_name_string = device.GetName();
-    BluetoothDeviceName device_name(device_name_string);
-    NEARBY_LOG(INFO,
-               "BT discovery handler (CHANGED) [client=%p, service=%s]: "
-               "processing new name %s",
-               client, service_id.c_str(), device_name_string.c_str());
+        // Parse the Bluetooth device name.
+        const std::string device_name_string = device.GetName();
+        BluetoothDeviceName device_name(device_name_string);
+        NEARBY_LOG(INFO,
+                   "BT discovery handler (CHANGED) [client=%p, service=%s]: "
+                   "processing new name %s",
+                   client, service_id.c_str(), device_name_string.c_str());
 
-    // By this point, the BluetoothDevice passed to us has a different name than
-    // what we may have discovered before. We need to iterate over the found
-    // BluetoothEndpoints and compare their addresses to see the devices are the
-    // same. We are not guaranteed to discover a match, since the old name may
-    // not have been formatted for Nearby Connections.
-    for (auto endpoint :
-         GetDiscoveredEndpoints(proto::connections::Medium::BLUETOOTH)) {
-      BluetoothEndpoint* bluetoothEndpoint =
-          static_cast<BluetoothEndpoint*>(endpoint);
-      NEARBY_LOG(INFO,
-                 "BT discovery handler (CHANGED) [client=%p, service=%s]: "
-                 "comparing MAC addresses with existing endpoint %s. They have "
-                 "MAC address %s and the new endpoint has MAC address %s.",
-                 client, service_id.c_str(),
-                 bluetoothEndpoint->bluetooth_device.GetName().c_str(),
-                 bluetoothEndpoint->bluetooth_device.GetMacAddress().c_str(),
-                 device.GetMacAddress().c_str());
-      if (bluetoothEndpoint->bluetooth_device.GetMacAddress() ==
-          device.GetMacAddress()) {
-        // Report the BluetoothEndpoint as lost to the client.
-        NEARBY_LOG(
-            INFO,
-            "BT discovery handler (LOST) [client=%p, service=%s]: report "
-            "to client",
-            client, service_id.c_str());
-        OnEndpointLost(client, *endpoint);
-        break;
-      }
-    }
+        // By this point, the BluetoothDevice passed to us has a different name
+        // than what we may have discovered before. We need to iterate over the
+        // found BluetoothEndpoints and compare their addresses to see the
+        // devices are the same. We are not guaranteed to discover a match,
+        // since the old name may not have been formatted for Nearby
+        // Connections.
+        for (auto endpoint :
+             GetDiscoveredEndpoints(proto::connections::Medium::BLUETOOTH)) {
+          BluetoothEndpoint* bluetoothEndpoint =
+              static_cast<BluetoothEndpoint*>(endpoint);
+          NEARBY_LOG(
+              INFO,
+              "BT discovery handler (CHANGED) [client=%p, service=%s]: "
+              "comparing MAC addresses with existing endpoint %s. They have "
+              "MAC address %s and the new endpoint has MAC address %s.",
+              client, service_id.c_str(),
+              bluetoothEndpoint->bluetooth_device.GetName().c_str(),
+              bluetoothEndpoint->bluetooth_device.GetMacAddress().c_str(),
+              device.GetMacAddress().c_str());
+          if (bluetoothEndpoint->bluetooth_device.GetMacAddress() ==
+              device.GetMacAddress()) {
+            // Report the BluetoothEndpoint as lost to the client.
+            NEARBY_LOG(
+                INFO,
+                "BT discovery handler (LOST) [client=%p, service=%s]: report "
+                "to client",
+                client, service_id.c_str());
+            OnEndpointLost(client, *endpoint);
+            break;
+          }
+        }
 
-    // Make sure the Bluetooth device name points to a valid
-    // endpoint we're discovering.
-    if (!IsRecognizedBluetoothEndpoint(device_name_string, service_id,
-                                       device_name)) {
-      NEARBY_LOG(INFO,
-                 "BT discovery handler (CHANGED) [client=%p, service=%s]: The "
-                 "new name is not recognized. Ignoring.",
-                 client, service_id.c_str());
-      return;
-    }
+        // Make sure the Bluetooth device name points to a valid
+        // endpoint we're discovering.
+        if (!IsRecognizedBluetoothEndpoint(device_name_string, service_id,
+                                           device_name)) {
+          NEARBY_LOG(
+              INFO,
+              "BT discovery handler (CHANGED) [client=%p, service=%s]: The "
+              "new name is not recognized. Ignoring.",
+              client, service_id.c_str());
+          return;
+        }
 
-    // Report the discovered endpoint to the client.
-    NEARBY_LOGS(INFO)
-        << "Invoking BasePcpHandler::OnEndpointFound() for BT service="
-        << service_id << "; id=" << device_name.GetEndpointId() << "; name="
-        << absl::BytesToHexString(device_name.GetEndpointInfo().data());
-    OnEndpointFound(
-        client, std::make_shared<BluetoothEndpoint>(BluetoothEndpoint{
-                    {device_name.GetEndpointId(), device_name.GetEndpointInfo(),
-                     service_id, proto::connections::Medium::BLUETOOTH,
-                     device_name.GetWebRtcState()},
-                    device,
-                }));
-  });
+        // Report the discovered endpoint to the client.
+        NEARBY_LOGS(INFO)
+            << "Invoking BasePcpHandler::OnEndpointFound() for BT service="
+            << service_id << "; id=" << device_name.GetEndpointId() << "; name="
+            << absl::BytesToHexString(device_name.GetEndpointInfo().data());
+        OnEndpointFound(
+            client,
+            std::make_shared<BluetoothEndpoint>(BluetoothEndpoint{
+                {device_name.GetEndpointId(), device_name.GetEndpointInfo(),
+                 service_id, proto::connections::Medium::BLUETOOTH,
+                 device_name.GetWebRtcState()},
+                device,
+            }));
+      });
 }
 
 void P2pClusterPcpHandler::BluetoothDeviceLostHandler(
     ClientProxy* client, const std::string& service_id,
     BluetoothDevice& device) {
   const std::string& device_name_string = device.GetName();
-  RunOnPcpHandlerThread([this, client, service_id,
-                         device_name_string]() RUN_ON_PCP_HANDLER_THREAD() {
-    // Make sure we are still discovering before proceeding.
-    if (!client->IsDiscovering()) {
-      NEARBY_LOG(INFO,
-                 "BT discovery handler (LOST) [client=%p, service=%s]: not "
-                 "in discovery mode",
-                 client, service_id.c_str());
-      return;
-    }
+  RunOnPcpHandlerThread(
+      "p2p-bt-device-lost", [this, client, service_id,
+                             device_name_string]() RUN_ON_PCP_HANDLER_THREAD() {
+        // Make sure we are still discovering before proceeding.
+        if (!client->IsDiscovering()) {
+          NEARBY_LOG(INFO,
+                     "BT discovery handler (LOST) [client=%p, service=%s]: not "
+                     "in discovery mode",
+                     client, service_id.c_str());
+          return;
+        }
 
-    // Parse the Bluetooth device name.
-    BluetoothDeviceName device_name(device_name_string);
+        // Parse the Bluetooth device name.
+        BluetoothDeviceName device_name(device_name_string);
 
-    // Make sure the Bluetooth device name points to a valid
-    // endpoint we're discovering.
-    if (!IsRecognizedBluetoothEndpoint(device_name_string, service_id,
-                                       device_name))
-      return;
+        // Make sure the Bluetooth device name points to a valid
+        // endpoint we're discovering.
+        if (!IsRecognizedBluetoothEndpoint(device_name_string, service_id,
+                                           device_name))
+          return;
 
-    // Report the BluetoothEndpoint as lost to the client.
-    NEARBY_LOG(INFO,
-               "BT discovery handler (LOST) [client=%p, service=%s]: report "
-               "to client",
-               client, service_id.c_str());
-    OnEndpointLost(client,
-                   DiscoveredEndpoint{device_name.GetEndpointId(),
-                                      device_name.GetEndpointInfo(), service_id,
-                                      proto::connections::Medium::BLUETOOTH,
-                                      WebRtcState::kUndefined});
-  });
+        // Report the BluetoothEndpoint as lost to the client.
+        NEARBY_LOG(
+            INFO,
+            "BT discovery handler (LOST) [client=%p, service=%s]: report "
+            "to client",
+            client, service_id.c_str());
+        OnEndpointLost(client, DiscoveredEndpoint{
+                                   device_name.GetEndpointId(),
+                                   device_name.GetEndpointInfo(), service_id,
+                                   proto::connections::Medium::BLUETOOTH,
+                                   WebRtcState::kUndefined});
+      });
 }
 
 bool P2pClusterPcpHandler::IsRecognizedBleEndpoint(
@@ -408,76 +421,81 @@
     ClientProxy* client, BlePeripheral& peripheral,
     const std::string& service_id, const ByteArray& advertisement_bytes,
     bool fast_advertisement) {
-  RunOnPcpHandlerThread([this, client, &peripheral, service_id,
-                         advertisement_bytes,
-                         fast_advertisement]() RUN_ON_PCP_HANDLER_THREAD() {
-    // Make sure we are still discovering before proceeding.
-    if (!client->IsDiscovering()) {
-      NEARBY_LOG(INFO,
-                 "Ble scanning handler (FOUND) [client=%p, service_id=%s]: not "
-                 "in discovery mode",
-                 client, service_id.c_str());
-      return;
-    }
+  RunOnPcpHandlerThread(
+      "p2p-ble-device-discovered",
+      [this, client, &peripheral, service_id, advertisement_bytes,
+       fast_advertisement]() RUN_ON_PCP_HANDLER_THREAD() {
+        // Make sure we are still discovering before proceeding.
+        if (!client->IsDiscovering()) {
+          NEARBY_LOG(
+              INFO,
+              "Ble scanning handler (FOUND) [client=%p, service_id=%s]: not "
+              "in discovery mode",
+              client, service_id.c_str());
+          return;
+        }
 
-    // Parse the BLE advertisement bytes.
-    BleAdvertisement advertisement(fast_advertisement, advertisement_bytes);
+        // Parse the BLE advertisement bytes.
+        BleAdvertisement advertisement(fast_advertisement, advertisement_bytes);
 
-    // Make sure the BLE advertisement points to a valid
-    // endpoint we're discovering.
-    if (!IsRecognizedBleEndpoint(service_id, advertisement)) return;
+        // Make sure the BLE advertisement points to a valid
+        // endpoint we're discovering.
+        if (!IsRecognizedBleEndpoint(service_id, advertisement)) return;
 
-    // Store all the state we need to be able to re-create a BleEndpoint
-    // in BlePeripheralLostHandler, since that isn't privy to
-    // the bytes of the ble advertisement itself.
-    found_ble_endpoints_.emplace(
-        peripheral.GetName(),
-        BleEndpointState(advertisement.GetEndpointId(),
-                         advertisement.GetEndpointInfo()));
+        // Store all the state we need to be able to re-create a BleEndpoint
+        // in BlePeripheralLostHandler, since that isn't privy to
+        // the bytes of the ble advertisement itself.
+        found_ble_endpoints_.emplace(
+            peripheral.GetName(),
+            BleEndpointState(advertisement.GetEndpointId(),
+                             advertisement.GetEndpointInfo()));
 
-    // Report the discovered endpoint to the client.
-    NEARBY_LOGS(INFO)
-        << "Invoking BasePcpHandler::OnEndpointFound() for Ble service="
-        << service_id << "; id=" << advertisement.GetEndpointId() << "; name="
-        << absl::BytesToHexString(advertisement.GetEndpointInfo().data());
-    OnEndpointFound(client, std::make_shared<BleEndpoint>(BleEndpoint{
-                                {advertisement.GetEndpointId(),
-                                 advertisement.GetEndpointInfo(), service_id,
-                                 proto::connections::Medium::BLE,
-                                 advertisement.GetWebRtcState()},
-                                peripheral,
-                            }));
+        // Report the discovered endpoint to the client.
+        NEARBY_LOGS(INFO)
+            << "Invoking BasePcpHandler::OnEndpointFound() for Ble service="
+            << service_id << "; id=" << advertisement.GetEndpointId()
+            << "; name="
+            << absl::BytesToHexString(advertisement.GetEndpointInfo().data());
+        OnEndpointFound(
+            client,
+            std::make_shared<BleEndpoint>(BleEndpoint{
+                {advertisement.GetEndpointId(), advertisement.GetEndpointInfo(),
+                 service_id, proto::connections::Medium::BLE,
+                 advertisement.GetWebRtcState()},
+                peripheral,
+            }));
 
-    // Make sure we can connect to this device via Classic Bluetooth.
-    std::string remote_bluetooth_mac_address =
-        advertisement.GetBluetoothMacAddress();
-    if (remote_bluetooth_mac_address.empty()) {
-      NEARBY_LOGS(INFO)
-          << "No Bluetooth Classic MAC address found in advertisement";
-      return;
-    }
+        // Make sure we can connect to this device via Classic Bluetooth.
+        std::string remote_bluetooth_mac_address =
+            advertisement.GetBluetoothMacAddress();
+        if (remote_bluetooth_mac_address.empty()) {
+          NEARBY_LOGS(INFO)
+              << "No Bluetooth Classic MAC address found in advertisement";
+          return;
+        }
 
-    BluetoothDevice remote_bluetooth_device =
-        bluetooth_medium_.GetRemoteDevice(remote_bluetooth_mac_address);
-    if (!remote_bluetooth_device.IsValid()) {
-      NEARBY_LOGS(INFO) << "A valid Bluetooth device could not be derived from "
-                           "the MAC address "
-                        << remote_bluetooth_mac_address;
-      return;
-    }
+        BluetoothDevice remote_bluetooth_device =
+            bluetooth_medium_.GetRemoteDevice(remote_bluetooth_mac_address);
+        if (!remote_bluetooth_device.IsValid()) {
+          NEARBY_LOGS(INFO)
+              << "A valid Bluetooth device could not be derived from "
+                 "the MAC address "
+              << remote_bluetooth_mac_address;
+          return;
+        }
 
-    OnEndpointFound(client,
-                    std::make_shared<BluetoothEndpoint>(BluetoothEndpoint{
-                        {
-                            advertisement.GetEndpointId(),
-                            advertisement.GetEndpointInfo(),
-                            service_id,
-                            proto::connections::Medium::BLUETOOTH,
-                            advertisement.GetWebRtcState(),
-                        },
-                        remote_bluetooth_device,
-                    }));
-  });
+        OnEndpointFound(client,
+                        std::make_shared<BluetoothEndpoint>(BluetoothEndpoint{
+                            {
+                                advertisement.GetEndpointId(),
+                                advertisement.GetEndpointInfo(),
+                                service_id,
+                                proto::connections::Medium::BLUETOOTH,
+                                advertisement.GetWebRtcState(),
+                            },
+                            remote_bluetooth_device,
+                        }));
+      });
 }
 
 void P2pClusterPcpHandler::BlePeripheralLostHandler(
@@ -486,38 +504,40 @@
   std::string peripheral_name = peripheral.GetName();
   NEARBY_LOG(INFO, "Ble: [LOST, SCHED] peripheral_name=%s",
              peripheral_name.c_str());
-  RunOnPcpHandlerThread([this, client, service_id,
-                         &peripheral]() RUN_ON_PCP_HANDLER_THREAD() {
-    // Make sure we are still discovering before proceeding.
-    if (!client->IsDiscovering()) {
-      NEARBY_LOG(INFO,
-                 "Ble scanning handler (LOST) [client=%p, service_id=%s]: not "
-                 "in scanning mode",
-                 client, service_id.c_str());
-      return;
-    }
+  RunOnPcpHandlerThread(
+      "p2p-ble-device-lost",
+      [this, client, service_id, &peripheral]() RUN_ON_PCP_HANDLER_THREAD() {
+        // Make sure we are still discovering before proceeding.
+        if (!client->IsDiscovering()) {
+          NEARBY_LOG(
+              INFO,
+              "Ble scanning handler (LOST) [client=%p, service_id=%s]: not "
+              "in scanning mode",
+              client, service_id.c_str());
+          return;
+        }
 
-    // Remove this BlePeripheral from found_ble_endpoints_, and
-    // report the endpoint as lost to the client.
-    auto item = found_ble_endpoints_.find(peripheral.GetName());
-    if (item != found_ble_endpoints_.end()) {
-      BleEndpointState ble_endpoint_state(item->second);
-      found_ble_endpoints_.erase(item);
+        // Remove this BlePeripheral from found_ble_endpoints_, and
+        // report the endpoint as lost to the client.
+        auto item = found_ble_endpoints_.find(peripheral.GetName());
+        if (item != found_ble_endpoints_.end()) {
+          BleEndpointState ble_endpoint_state(item->second);
+          found_ble_endpoints_.erase(item);
 
-      // Report the discovered endpoint to the client.
-      NEARBY_LOG(INFO,
-                 "Ble scanning handler (LOST) [client=%p, "
-                 "service_id=%s]: report to client",
-                 client, service_id.c_str());
-      OnEndpointLost(client, DiscoveredEndpoint{
-                                 ble_endpoint_state.endpoint_id,
-                                 ble_endpoint_state.endpoint_info,
-                                 service_id,
-                                 proto::connections::Medium::BLE,
-                                 WebRtcState::kUndefined,
-                             });
-    }
-  });
+          // Report the discovered endpoint to the client.
+          NEARBY_LOG(INFO,
+                     "Ble scanning handler (LOST) [client=%p, "
+                     "service_id=%s]: report to client",
+                     client, service_id.c_str());
+          OnEndpointLost(client, DiscoveredEndpoint{
+                                     ble_endpoint_state.endpoint_id,
+                                     ble_endpoint_state.endpoint_info,
+                                     service_id,
+                                     proto::connections::Medium::BLE,
+                                     WebRtcState::kUndefined,
+                                 });
+        }
+      });
 }
 
 bool P2pClusterPcpHandler::IsRecognizedWifiLanEndpoint(
@@ -557,43 +577,46 @@
 void P2pClusterPcpHandler::WifiLanServiceDiscoveredHandler(
     ClientProxy* client, WifiLanService& wifi_lan_service,
     const std::string& service_id) {
-  RunOnPcpHandlerThread([this, client, service_id,
-                         &wifi_lan_service]() RUN_ON_PCP_HANDLER_THREAD() {
-    // Make sure we are still discovering before proceeding.
-    if (!client->IsDiscovering()) {
-      NEARBY_LOG(
-          INFO,
-          "WifiLan discovery handler (FOUND) [client=%p, service=%s]: not "
-          "in discovery mode",
-          client, service_id.c_str());
-      return;
-    }
+  RunOnPcpHandlerThread(
+      "p2p-wifi-service-discovered",
+      [this, client, service_id,
+       &wifi_lan_service]() RUN_ON_PCP_HANDLER_THREAD() {
+        // Make sure we are still discovering before proceeding.
+        if (!client->IsDiscovering()) {
+          NEARBY_LOG(
+              INFO,
+              "WifiLan discovery handler (FOUND) [client=%p, service=%s]: not "
+              "in discovery mode",
+              client, service_id.c_str());
+          return;
+        }
 
-    // Parse the WifiLanServiceInfo.
-    WifiLanServiceInfo service_info(wifi_lan_service.GetServiceInfo());
+        // Parse the WifiLanServiceInfo.
+        WifiLanServiceInfo service_info(wifi_lan_service.GetServiceInfo());
 
-    // Make sure the WifiLan service name points to a valid
-    // endpoint we're discovering.
-    if (!IsRecognizedWifiLanEndpoint(service_id, service_info)) return;
+        // Make sure the WifiLan service name points to a valid
+        // endpoint we're discovering.
+        if (!IsRecognizedWifiLanEndpoint(service_id, service_info)) return;
 
-    // Report the discovered endpoint to the client.
-    NEARBY_LOG(
-        INFO,
-        "Invoking BasePcpHandler::OnEndpointFound() for WifiLan "
-        "service_id=%s; endpoint_id=%s; endpoint_info=%s",
-        service_id.c_str(), service_info.GetEndpointId().c_str(),
-        absl::BytesToHexString(service_info.GetEndpointInfo().data()).c_str());
-    OnEndpointFound(client, std::make_shared<WifiLanEndpoint>(WifiLanEndpoint{
-                                {
-                                    service_info.GetEndpointId(),
-                                    service_info.GetEndpointInfo(),
-                                    service_id,
-                                    proto::connections::Medium::WIFI_LAN,
-                                    service_info.GetWebRtcState(),
-                                },
-                                wifi_lan_service,
-                            }));
-  });
+        // Report the discovered endpoint to the client.
+        NEARBY_LOG(INFO,
+                   "Invoking BasePcpHandler::OnEndpointFound() for WifiLan "
+                   "service_id=%s; endpoint_id=%s; endpoint_info=%s",
+                   service_id.c_str(), service_info.GetEndpointId().c_str(),
+                   absl::BytesToHexString(service_info.GetEndpointInfo().data())
+                       .c_str());
+        OnEndpointFound(client,
+                        std::make_shared<WifiLanEndpoint>(WifiLanEndpoint{
+                            {
+                                service_info.GetEndpointId(),
+                                service_info.GetEndpointInfo(),
+                                service_id,
+                                proto::connections::Medium::WIFI_LAN,
+                                service_info.GetWebRtcState(),
+                            },
+                            wifi_lan_service,
+                        }));
+      });
 }
 
 void P2pClusterPcpHandler::WifiLanServiceLostHandler(
@@ -603,39 +626,41 @@
   NEARBY_LOG(INFO,
              "WifiLan: [LOST, SCHED] wifi_lan_service=%p, service_info_name=%s",
              &wifi_lan_service, nsd_service_info.GetServiceInfoName().c_str());
-  RunOnPcpHandlerThread([this, client, service_id,
-                         nsd_service_info]() RUN_ON_PCP_HANDLER_THREAD() {
-    // Make sure we are still discovering before proceeding.
-    if (!client->IsDiscovering()) {
-      NEARBY_LOG(
-          INFO,
-          "WifiLan discovery handler (LOST) [client=%p, service=%s]: not "
-          "in discovery mode",
-          client, service_id.c_str());
-      return;
-    }
+  RunOnPcpHandlerThread(
+      "p2p-wifi-service-lost",
+      [this, client, service_id,
+       nsd_service_info]() RUN_ON_PCP_HANDLER_THREAD() {
+        // Make sure we are still discovering before proceeding.
+        if (!client->IsDiscovering()) {
+          NEARBY_LOG(
+              INFO,
+              "WifiLan discovery handler (LOST) [client=%p, service=%s]: not "
+              "in discovery mode",
+              client, service_id.c_str());
+          return;
+        }
 
-    // Parse the WifiLanServiceInfo.
-    WifiLanServiceInfo service_info(nsd_service_info);
+        // Parse the WifiLanServiceInfo.
+        WifiLanServiceInfo service_info(nsd_service_info);
 
-    // Make sure the WifiLan service name points to a valid
-    // endpoint we're discovering.
-    if (!IsRecognizedWifiLanEndpoint(service_id, service_info)) return;
+        // Make sure the WifiLan service name points to a valid
+        // endpoint we're discovering.
+        if (!IsRecognizedWifiLanEndpoint(service_id, service_info)) return;
 
-    // Report the discovered endpoint to the client.
-    NEARBY_LOG(
-        INFO,
-        "WifiLan discovery handler (LOST) [client=%p, service_id=%s]: report "
-        "to client",
-        client, service_id.c_str());
-    OnEndpointLost(client, DiscoveredEndpoint{
-                               service_info.GetEndpointId(),
-                               service_info.GetEndpointInfo(),
-                               service_id,
-                               proto::connections::Medium::WIFI_LAN,
-                               WebRtcState::kUndefined,
-                           });
-  });
+        // Report the discovered endpoint to the client.
+        NEARBY_LOG(INFO,
+                   "WifiLan discovery handler (LOST) [client=%p, "
+                   "service_id=%s]: report "
+                   "to client",
+                   client, service_id.c_str());
+        OnEndpointLost(client, DiscoveredEndpoint{
+                                   service_info.GetEndpointId(),
+                                   service_info.GetEndpointInfo(),
+                                   service_id,
+                                   proto::connections::Medium::WIFI_LAN,
+                                   WebRtcState::kUndefined,
+                               });
+      });
 }
 
 BasePcpHandler::StartOperationResult P2pClusterPcpHandler::StartDiscoveryImpl(
@@ -830,6 +855,7 @@
               return;
             }
             RunOnPcpHandlerThread(
+                "p2p-bt-on-incoming-connection",
                 [this, client, local_endpoint_info,
                  socket =
                      std::move(socket)]() RUN_ON_PCP_HANDLER_THREAD() mutable {
@@ -959,6 +985,7 @@
                 return;
               }
               RunOnPcpHandlerThread(
+                  "p2p-ble-on-incoming-connection",
                   [this, client, local_endpoint_info, service_id,
                    socket = std::move(socket)]()
                       RUN_ON_PCP_HANDLER_THREAD() mutable {
@@ -999,6 +1026,7 @@
                   return;
                 }
                 RunOnPcpHandlerThread(
+                    "p2p-bt-on-incoming-connection",
                     [this, client, local_endpoint_info,
                      socket = std::move(socket)]()
                         RUN_ON_PCP_HANDLER_THREAD() mutable {
@@ -1143,6 +1171,7 @@
               return;
             }
             RunOnPcpHandlerThread(
+                "p2p-wifi-on-incoming-connection",
                 [this, client, local_endpoint_info,
                  socket = std::move(socket)]()
                     RUN_ON_PCP_HANDLER_THREAD() mutable {
@@ -1272,6 +1301,7 @@
               }
 
               RunOnPcpHandlerThread(
+                  "p2p-rtc-on-incoming-connection",
                   [this, client,
                    socket = std::move(socket)]() RUN_ON_PCP_HANDLER_THREAD() {
                     std::string remote_device_name = "WebRtcSocket";
diff --git a/cpp/core/internal/payload_manager.cc b/cpp/core/internal/payload_manager.cc
index 29b12e0..9f28697 100644
--- a/cpp/core/internal/payload_manager.cc
+++ b/cpp/core/internal/payload_manager.cc
@@ -281,6 +281,7 @@
   CountDownLatch stop_latch(1);
   // Clear our tracked pending payloads.
   RunOnStatusUpdateThread(
+      "~payload-manager",
       [this, &stop_latch]() RUN_ON_PAYLOAD_STATUS_UPDATE_THREAD() {
         NEARBY_LOG(INFO, "PayloadManager: stop tracking payloads; self=%p",
                    this);
@@ -334,7 +335,8 @@
   Payload::Type payload_type = payload.GetType();
   Payload::Id payload_id =
       CreateOutgoingPayload(std::move(payload), endpoint_ids);
-  executor->Execute([this, client, endpoint_ids, payload_id, payload_type]() {
+  executor->Execute("send-payload", [this, client, endpoint_ids, payload_id,
+                                     payload_type]() {
     if (shutdown_.Get()) return;
     PendingPayload* pending_payload = GetPayload(payload_id);
     if (!pending_payload) {
@@ -355,7 +357,8 @@
       should_continue = SendPayloadLoop(client, *pending_payload,
                                         payload_header, next_chunk_offset);
     }
-    RunOnStatusUpdateThread([this, payload_id]()
+    RunOnStatusUpdateThread("destroy-payload",
+                            [this, payload_id]()
                                 RUN_ON_PAYLOAD_STATUS_UPDATE_THREAD() {
                                   DestroyPendingPayload(payload_id);
                                 });
@@ -430,6 +433,7 @@
     return;
   }
   RunOnStatusUpdateThread(
+      "payload-manager-on-disconnect",
       [this, client, endpoint_id, barrier]()
           RUN_ON_PAYLOAD_STATUS_UPDATE_THREAD() mutable {
             // Iterate through all our payloads and look for payloads associated
@@ -586,62 +590,65 @@
     const PayloadTransferFrame::PayloadHeader& payload_header,
     std::int64_t num_bytes_successfully_transferred,
     proto::connections::PayloadStatus status) {
-  RunOnStatusUpdateThread([this, client, finished_endpoint_ids, payload_header,
-                           num_bytes_successfully_transferred,
-                           status]() RUN_ON_PAYLOAD_STATUS_UPDATE_THREAD() {
-    // Make sure we're still tracking this payload.
-    PendingPayload* pending_payload = GetPayload(payload_header.id());
-    if (!pending_payload) {
-      return;
-    }
+  RunOnStatusUpdateThread(
+      "outgoing-payload-callbacks",
+      [this, client, finished_endpoint_ids, payload_header,
+       num_bytes_successfully_transferred,
+       status]() RUN_ON_PAYLOAD_STATUS_UPDATE_THREAD() {
+        // Make sure we're still tracking this payload.
+        PendingPayload* pending_payload = GetPayload(payload_header.id());
+        if (!pending_payload) {
+          return;
+        }
 
-    PayloadProgressInfo update{
-        payload_header.id(),
-        PayloadManager::PayloadStatusToTransferUpdateStatus(status),
-        payload_header.total_size(), num_bytes_successfully_transferred};
-    for (const auto& endpoint_id : finished_endpoint_ids) {
-      // Skip sending notifications if we have stopped tracking this
-      // endpoint.
-      if (!pending_payload->GetEndpoint(endpoint_id)) {
-        continue;
-      }
+        PayloadProgressInfo update{
+            payload_header.id(),
+            PayloadManager::PayloadStatusToTransferUpdateStatus(status),
+            payload_header.total_size(), num_bytes_successfully_transferred};
+        for (const auto& endpoint_id : finished_endpoint_ids) {
+          // Skip sending notifications if we have stopped tracking this
+          // endpoint.
+          if (!pending_payload->GetEndpoint(endpoint_id)) {
+            continue;
+          }
 
-      // Notify the client.
-      client->OnPayloadProgress(endpoint_id, update);
-    }
+          // Notify the client.
+          client->OnPayloadProgress(endpoint_id, update);
+        }
 
-    // Remove these endpoints from our tracking list for this payload.
-    pending_payload->RemoveEndpoints(finished_endpoint_ids);
+        // Remove these endpoints from our tracking list for this payload.
+        pending_payload->RemoveEndpoints(finished_endpoint_ids);
 
-    // Close the payload if no endpoints remain.
-    if (pending_payload->GetEndpoints().empty()) {
-      pending_payload->Close();
-    }
-  });
+        // Close the payload if no endpoints remain.
+        if (pending_payload->GetEndpoints().empty()) {
+          pending_payload->Close();
+        }
+      });
 }
 
 void PayloadManager::SendClientCallbacksForFinishedIncomingPayload(
     ClientProxy* client, const std::string& endpoint_id,
     const PayloadTransferFrame::PayloadHeader& payload_header,
     std::int64_t offset_bytes, proto::connections::PayloadStatus status) {
-  RunOnStatusUpdateThread([this, client, endpoint_id, payload_header,
-                           offset_bytes,
-                           status]() RUN_ON_PAYLOAD_STATUS_UPDATE_THREAD() {
-    // Make sure we're still tracking this payload.
-    PendingPayload* pending_payload = GetPayload(payload_header.id());
-    if (!pending_payload) {
-      return;
-    }
+  RunOnStatusUpdateThread(
+      "incoming-payload-callbacks",
+      [this, client, endpoint_id, payload_header, offset_bytes,
+       status]() RUN_ON_PAYLOAD_STATUS_UPDATE_THREAD() {
+        // Make sure we're still tracking this payload.
+        PendingPayload* pending_payload = GetPayload(payload_header.id());
+        if (!pending_payload) {
+          return;
+        }
 
-    // Unless we never started tracking this payload (meaning we failed to
-    // even create the InternalPayload), notify the client (and close it).
-    PayloadProgressInfo update{
-        payload_header.id(),
-        PayloadManager::PayloadStatusToTransferUpdateStatus(status),
-        payload_header.total_size(), offset_bytes};
-    NotifyClientOfIncomingPayloadProgressInfo(client, endpoint_id, update);
-    DestroyPendingPayload(payload_header.id());
-  });
+        // Unless we never started tracking this payload (meaning we failed to
+        // even create the InternalPayload), notify the client (and close it).
+        PayloadProgressInfo update{
+            payload_header.id(),
+            PayloadManager::PayloadStatusToTransferUpdateStatus(status),
+            payload_header.total_size(), offset_bytes};
+        NotifyClientOfIncomingPayloadProgressInfo(client, endpoint_id, update);
+        DestroyPendingPayload(payload_header.id());
+      });
 }
 
 void PayloadManager::SendControlMessage(
@@ -734,6 +741,7 @@
     std::int32_t payload_chunk_flags, std::int64_t payload_chunk_offset,
     std::int64_t payload_chunk_body_size) {
   RunOnStatusUpdateThread(
+      "outgoing-chunk-success",
       [this, client, endpoint_id, payload_header, payload_chunk_flags,
        payload_chunk_offset,
        payload_chunk_body_size]() RUN_ON_PAYLOAD_STATUS_UPDATE_THREAD() {
@@ -799,6 +807,7 @@
     std::int32_t payload_chunk_flags, std::int64_t payload_chunk_offset,
     std::int64_t payload_chunk_body_size) {
   RunOnStatusUpdateThread(
+      "incoming-chunk-success",
       [this, client, endpoint_id, payload_header, payload_chunk_flags,
        payload_chunk_offset,
        payload_chunk_body_size]() RUN_ON_PAYLOAD_STATUS_UPDATE_THREAD() {
@@ -858,6 +867,7 @@
 
     // Also, let the client know of this new incoming payload.
     RunOnStatusUpdateThread(
+        "process-data-packet",
         [to_client, from_endpoint_id, pending_payload]()
             RUN_ON_PAYLOAD_STATUS_UPDATE_THREAD() {
               NEARBY_LOG(INFO,
@@ -1128,8 +1138,9 @@
   return close_event_.Await(absl::ZeroDuration()).result();
 }
 
-void PayloadManager::RunOnStatusUpdateThread(std::function<void()> runnable) {
-  payload_status_update_executor_.Execute(std::move(runnable));
+void PayloadManager::RunOnStatusUpdateThread(const std::string& name,
+                                             std::function<void()> runnable) {
+  payload_status_update_executor_.Execute(name, std::move(runnable));
 }
 
 /////////////////////////////// PendingPayloads ///////////////////////////////
diff --git a/cpp/core/internal/payload_manager.h b/cpp/core/internal/payload_manager.h
index 9c18756..9fb1630 100644
--- a/cpp/core/internal/payload_manager.h
+++ b/cpp/core/internal/payload_manager.h
@@ -277,7 +277,8 @@
 
   SingleThreadExecutor* GetOutgoingPayloadExecutor(Payload::Type payload_type);
 
-  void RunOnStatusUpdateThread(std::function<void()> runnable);
+  void RunOnStatusUpdateThread(const std::string& name,
+                               std::function<void()> runnable);
   bool NotifyShutdown() ABSL_LOCKS_EXCLUDED(mutex_);
   void DestroyPendingPayload(Payload::Id payload_id)
       ABSL_LOCKS_EXCLUDED(mutex_);
diff --git a/cpp/core/internal/service_controller_router.cc b/cpp/core/internal/service_controller_router.cc
index fd98901..70a368f 100644
--- a/cpp/core/internal/service_controller_router.cc
+++ b/cpp/core/internal/service_controller_router.cc
@@ -67,28 +67,31 @@
     ClientProxy* client, absl::string_view service_id,
     const ConnectionOptions& options, const ConnectionRequestInfo& info,
     const ResultCallback& callback) {
-  RouteToServiceController([this, client, service_id = std::string(service_id),
-                            options, info, callback]() {
-    Status status = AcquireServiceControllerForClient(client, options.strategy);
-    if (!status.Ok()) {
-      callback.result_cb(status);
-      return;
-    }
+  RouteToServiceController(
+      "scr-start-advertising",
+      [this, client, service_id = std::string(service_id), options, info,
+       callback]() {
+        Status status =
+            AcquireServiceControllerForClient(client, options.strategy);
+        if (!status.Ok()) {
+          callback.result_cb(status);
+          return;
+        }
 
-    if (client->IsAdvertising()) {
-      callback.result_cb({Status::kAlreadyAdvertising});
-      return;
-    }
+        if (client->IsAdvertising()) {
+          callback.result_cb({Status::kAlreadyAdvertising});
+          return;
+        }
 
-    status = service_controller_->StartAdvertising(client, service_id, options,
-                                                   info);
-    callback.result_cb(status);
-  });
+        status = service_controller_->StartAdvertising(client, service_id,
+                                                       options, info);
+        callback.result_cb(status);
+      });
 }
 
 void ServiceControllerRouter::StopAdvertising(ClientProxy* client,
                                               const ResultCallback& callback) {
-  RouteToServiceController([this, client, callback]() {
+  RouteToServiceController("scr-stop-advertising", [this, client, callback]() {
     if (ClientHasAcquiredServiceController(client) && client->IsAdvertising()) {
       service_controller_->StopAdvertising(client);
     }
@@ -101,8 +104,11 @@
                                              const ConnectionOptions& options,
                                              const DiscoveryListener& listener,
                                              const ResultCallback& callback) {
-  RouteToServiceController([this, client, service_id = std::string(service_id),
-                            options, listener, callback]() {
+  RouteToServiceController("scr-start-discovery", [this, client,
+                                                   service_id =
+                                                       std::string(service_id),
+                                                   options, listener,
+                                                   callback]() {
     Status status = AcquireServiceControllerForClient(client, options.strategy);
     if (!status.Ok()) {
       callback.result_cb(status);
@@ -122,7 +128,7 @@
 
 void ServiceControllerRouter::StopDiscovery(ClientProxy* client,
                                             const ResultCallback& callback) {
-  RouteToServiceController([this, client, callback]() {
+  RouteToServiceController("scr-stop-discovery", [this, client, callback]() {
     if (ClientHasAcquiredServiceController(client) && client->IsDiscovering()) {
       service_controller_->StopDiscovery(client);
     }
@@ -134,8 +140,10 @@
     ClientProxy* client, absl::string_view service_id,
     const OutOfBandConnectionMetadata& metadata,
     const ResultCallback& callback) {
-  RouteToServiceController([this, client, service_id = std::string(service_id),
-                            metadata, callback]() {
+  RouteToServiceController("scr-inject-endpoint", [this, client,
+                                                   service_id =
+                                                       std::string(service_id),
+                                                   metadata, callback]() {
     // Currently, Bluetooth is the only supported medium for endpoint injection.
     if (metadata.medium != Medium::BLUETOOTH ||
         metadata.remote_bluetooth_mac_address.size() != kMacAddressLength) {
@@ -173,36 +181,38 @@
   // CancellationListener as soon as possible.
   client->AddCancellationFlag(std::string(endpoint_id));
 
-  RouteToServiceController([this, client,
-                            endpoint_id = std::string(endpoint_id), info,
-                            options, callback]() {
-    if (!ClientHasAcquiredServiceController(client)) {
-      callback.result_cb({Status::kOutOfOrderApiCall});
-      return;
-    }
+  RouteToServiceController(
+      "scr-request-connection",
+      [this, client, endpoint_id = std::string(endpoint_id), info, options,
+       callback]() {
+        if (!ClientHasAcquiredServiceController(client)) {
+          callback.result_cb({Status::kOutOfOrderApiCall});
+          return;
+        }
 
-    if (client->HasPendingConnectionToEndpoint(endpoint_id) ||
-        client->IsConnectedToEndpoint(endpoint_id)) {
-      callback.result_cb({Status::kAlreadyConnectedToEndpoint});
-      return;
-    }
+        if (client->HasPendingConnectionToEndpoint(endpoint_id) ||
+            client->IsConnectedToEndpoint(endpoint_id)) {
+          callback.result_cb({Status::kAlreadyConnectedToEndpoint});
+          return;
+        }
 
-    Status status = service_controller_->RequestConnection(client, endpoint_id,
-                                                           info, options);
-    if (!status.Ok()) {
-      client->CancelEndpoint(endpoint_id);
-    }
-    callback.result_cb(status);
-  });
+        Status status = service_controller_->RequestConnection(
+            client, endpoint_id, info, options);
+        if (!status.Ok()) {
+          client->CancelEndpoint(endpoint_id);
+        }
+        callback.result_cb(status);
+      });
 }
 
 void ServiceControllerRouter::AcceptConnection(ClientProxy* client,
                                                absl::string_view endpoint_id,
                                                const PayloadListener& listener,
                                                const ResultCallback& callback) {
-  RouteToServiceController([this, client,
-                            endpoint_id = std::string(endpoint_id), listener,
-                            callback]() {
+  RouteToServiceController("scr-accept-connection", [this, client,
+                                                     endpoint_id = std::string(
+                                                         endpoint_id),
+                                                     listener, callback]() {
     if (!ClientHasAcquiredServiceController(client)) {
       callback.result_cb({Status::kOutOfOrderApiCall});
       return;
@@ -234,6 +244,7 @@
   client->CancelEndpoint(std::string(endpoint_id));
 
   RouteToServiceController(
+      "scr-reject-connection",
       [this, client, endpoint_id = std::string(endpoint_id), callback]() {
         if (!ClientHasAcquiredServiceController(client)) {
           callback.result_cb({Status::kOutOfOrderApiCall});
@@ -264,6 +275,7 @@
     ClientProxy* client, absl::string_view endpoint_id,
     const ResultCallback& callback) {
   RouteToServiceController(
+      "scr-init-bwu",
       [this, client, endpoint_id = std::string(endpoint_id), callback]() {
         if (!ClientHasAcquiredServiceController(client) ||
             !client->IsConnectedToEndpoint(endpoint_id)) {
@@ -293,6 +305,7 @@
       std::vector<std::string>(endpoint_ids.begin(), endpoint_ids.end());
 
   RouteToServiceController(
+      "scr-send-payload",
       [this, client, shared_payload, endpoints, callback]() {
         if (!ClientHasAcquiredServiceController(client)) {
           callback.result_cb({Status::kOutOfOrderApiCall});
@@ -318,7 +331,8 @@
 void ServiceControllerRouter::CancelPayload(ClientProxy* client,
                                             std::uint64_t payload_id,
                                             const ResultCallback& callback) {
-  RouteToServiceController([this, client, payload_id, callback]() {
+  RouteToServiceController("scr-cancel-payload", [this, client, payload_id,
+                                                  callback]() {
     if (!ClientHasAcquiredServiceController(client)) {
       callback.result_cb({Status::kOutOfOrderApiCall});
       return;
@@ -336,6 +350,7 @@
   client->CancelEndpoint(std::string(endpoint_id));
 
   RouteToServiceController(
+      "scr-disconnect-endpoint",
       [this, client, endpoint_id = std::string(endpoint_id), callback]() {
         if (ClientHasAcquiredServiceController(client)) {
           if (!client->IsConnectedToEndpoint(endpoint_id) &&
@@ -355,11 +370,12 @@
   // without further posting it.
   client->CancelAllEndpoints();
 
-  RouteToServiceController([this, client, callback]() {
+  RouteToServiceController("scr-stop-all-endpoints", [this, client,
+                                                      callback]() {
     NEARBY_LOGS(INFO) << "Client " << client->GetClientId()
                       << " has requested us to stop all endpoints. We will now "
                          "reset the client.";
-        if (ClientHasAcquiredServiceController(client)) {
+    if (ClientHasAcquiredServiceController(client)) {
       DoneWithStrategySessionForClient(client);
     }
     callback.result_cb({Status::kSuccess});
@@ -372,7 +388,8 @@
   // without further posting it.
   client->CancelAllEndpoints();
 
-  RouteToServiceController([this, client, callback]() {
+  RouteToServiceController("scr-client-disconnecting", [this, client,
+                                                        callback]() {
     if (ClientHasAcquiredServiceController(client)) {
       DoneWithStrategySessionForClient(client);
       NEARBY_LOGS(INFO) << "Client " << client->GetClientId()
@@ -465,8 +482,9 @@
   ReleaseServiceControllerForClient(client);
 }
 
-void ServiceControllerRouter::RouteToServiceController(Runnable runnable) {
-  serializer_.Execute(std::move(runnable));
+void ServiceControllerRouter::RouteToServiceController(const std::string& name,
+                                                       Runnable runnable) {
+  serializer_.Execute(name, std::move(runnable));
 }
 
 bool ServiceControllerRouter::ClientHasConnectionToAtLeastOneEndpoint(
diff --git a/cpp/core/internal/service_controller_router.h b/cpp/core/internal/service_controller_router.h
index 7c28246..62f070f 100644
--- a/cpp/core/internal/service_controller_router.h
+++ b/cpp/core/internal/service_controller_router.h
@@ -106,7 +106,7 @@
   static bool ClientHasConnectionToAtLeastOneEndpoint(
       ClientProxy* client, const std::vector<std::string>& remote_endpoint_ids);
 
-  void RouteToServiceController(Runnable runnable);
+  void RouteToServiceController(const std::string& name, Runnable runnable);
 
   Status AcquireServiceControllerForClient(ClientProxy* client,
                                            Strategy strategy);
diff --git a/cpp/platform/public/BUILD b/cpp/platform/public/BUILD
index 459584d..58dde04 100644
--- a/cpp/platform/public/BUILD
+++ b/cpp/platform/public/BUILD
@@ -32,9 +32,11 @@
         "future.h",
         "lockable.h",
         "logging.h",
+        "monitored_runnable.h",
         "multi_thread_executor.h",
         "mutex.h",
         "mutex_lock.h",
+        "pending_job_registry.h",
         "pipe.h",
         "scheduled_executor.h",
         "settable_future.h",
diff --git a/cpp/platform/public/monitored_runnable.h b/cpp/platform/public/monitored_runnable.h
new file mode 100644
index 0000000..63e3475
--- /dev/null
+++ b/cpp/platform/public/monitored_runnable.h
@@ -0,0 +1,71 @@
+// Copyright 2020 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef PLATFORM_PUBLIC_MONITORED_RUNNABLE_H_
+#define PLATFORM_PUBLIC_MONITORED_RUNNABLE_H_
+
+#include <utility>
+
+#include "platform/base/runnable.h"
+#include "platform/public/logging.h"
+#include "platform/public/pending_job_registry.h"
+#include "platform/public/system_clock.h"
+#include "absl/time/time.h"
+
+namespace location {
+namespace nearby {
+
+// A runnable with extra logging
+// We log if the task has been waiting long on the executor or if it was running
+// for a long time. The latter isn't always an issue - some tasks are expected
+// to run for longer periods of time (minutes).
+class MonitoredRunnable {
+ public:
+  explicit MonitoredRunnable(Runnable&& runnable) : runnable_{runnable} {}
+  MonitoredRunnable(const std::string& name, Runnable&& runnable)
+      : name_{name}, runnable_{runnable} {
+    PendingJobRegistry::GetInstance().AddPendingJob(name_, post_time_);
+  }
+
+  void operator()() const {
+    auto start_time = SystemClock::ElapsedRealtime();
+    auto start_delay = start_time - post_time_;
+    if (start_delay >= kMinReportedStartDelay) {
+      NEARBY_LOGS(INFO) << "Task: \"" << name_ << "\" started after "
+                        << absl::ToInt64Seconds(start_delay) << " seconds";
+    }
+    PendingJobRegistry::GetInstance().RemovePendingJob(name_, post_time_);
+    PendingJobRegistry::GetInstance().AddRunningJob(name_, post_time_);
+    runnable_();
+    auto task_duration = SystemClock::ElapsedRealtime() - start_time;
+    if (task_duration >= kMinReportedTaskDuration) {
+      NEARBY_LOGS(INFO) << "Task: \"" << name_ << "\" finished after "
+                        << absl::ToInt64Seconds(task_duration) << " seconds";
+    }
+    PendingJobRegistry::GetInstance().RemoveRunningJob(name_, post_time_);
+    PendingJobRegistry::GetInstance().ListJobs();
+  }
+
+ private:
+  static constexpr absl::Duration kMinReportedStartDelay = absl::Seconds(5);
+  static constexpr absl::Duration kMinReportedTaskDuration = absl::Seconds(10);
+  const std::string name_;
+  Runnable runnable_;
+  absl::Time post_time_ = SystemClock::ElapsedRealtime();
+};
+
+}  // namespace nearby
+}  // namespace location
+
+#endif  // PLATFORM_PUBLIC_MONITORED_RUNNABLE_H_
diff --git a/cpp/platform/public/pending_job_registry.h b/cpp/platform/public/pending_job_registry.h
new file mode 100644
index 0000000..1cb79cb
--- /dev/null
+++ b/cpp/platform/public/pending_job_registry.h
@@ -0,0 +1,102 @@
+// Copyright 2020 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef PLATFORM_PUBLIC_PENDING_JOB_REGISTRY_H_
+#define PLATFORM_PUBLIC_PENDING_JOB_REGISTRY_H_
+
+#include "platform/public/logging.h"
+#include "platform/public/mutex.h"
+#include "platform/public/mutex_lock.h"
+#include "platform/public/system_clock.h"
+#include "absl/base/thread_annotations.h"
+#include "absl/time/time.h"
+
+namespace location {
+namespace nearby {
+
+// A global registry of running tasks. The goal is to help us monitor
+// tasks that are either waiting too long for their turn or they never finish
+class PendingJobRegistry {
+ public:
+  static PendingJobRegistry& GetInstance() {
+    static PendingJobRegistry* instance = new PendingJobRegistry();
+    return *instance;
+  }
+
+  void AddPendingJob(const std::string& name, absl::Time post_time) {
+    MutexLock lock(&mutex_);
+    pending_jobs_.emplace(CreateKey(name, post_time), post_time);
+  }
+
+  void RemovePendingJob(const std::string& name, absl::Time post_time) {
+    MutexLock lock(&mutex_);
+    pending_jobs_.erase(CreateKey(name, post_time));
+  }
+
+  void AddRunningJob(const std::string& name, absl::Time post_time) {
+    MutexLock lock(&mutex_);
+    running_jobs_.emplace(CreateKey(name, post_time),
+                          SystemClock::ElapsedRealtime());
+  }
+
+  void RemoveRunningJob(const std::string& name, absl::Time post_time) {
+    MutexLock lock(&mutex_);
+    running_jobs_.erase(CreateKey(name, post_time));
+  }
+
+  void ListJobs() {
+    auto current_time = SystemClock::ElapsedRealtime();
+    if (current_time - list_jobs_time_ < kMinReportInterval) return;
+    MutexLock lock(&mutex_);
+    for (auto& job : pending_jobs_) {
+      auto age = current_time - job.second;
+      if (age >= kReportPendingJobsOlderThan) {
+        NEARBY_LOGS(INFO) << "Task \"" << job.first << "\" is waiting for "
+                          << absl::ToInt64Seconds(age) << " s";
+      }
+    }
+    for (auto& job : running_jobs_) {
+      auto age = current_time - job.second;
+      if (age >= kReportRunningJobsOlderThan) {
+        NEARBY_LOGS(INFO) << "Task \"" << job.first << "\" is running for "
+                          << absl::ToInt64Seconds(age) << " s";
+      }
+    }
+    list_jobs_time_ = current_time;
+  }
+
+ private:
+  PendingJobRegistry() = default;
+  static constexpr absl::Duration kMinReportInterval = absl::Seconds(60);
+  static constexpr absl::Duration kReportPendingJobsOlderThan =
+      absl::Seconds(40);
+  static constexpr absl::Duration kReportRunningJobsOlderThan =
+      absl::Seconds(60);
+
+  std::string CreateKey(const std::string& name, absl::Time post_time) {
+    return name + "." + std::to_string(absl::ToUnixNanos(post_time));
+  }
+
+  Mutex mutex_;
+  absl::flat_hash_map<const std::string, absl::Time> pending_jobs_
+      ABSL_GUARDED_BY(mutex_);
+  absl::flat_hash_map<const std::string, absl::Time> running_jobs_
+      ABSL_GUARDED_BY(mutex_);
+  absl::Time list_jobs_time_ = absl::UnixEpoch();
+};
+
+}  // namespace nearby
+}  // namespace location
+
+#endif  // PLATFORM_PUBLIC_PENDING_JOB_REGISTRY_H_
diff --git a/cpp/platform/public/scheduled_executor.h b/cpp/platform/public/scheduled_executor.h
index bd60388..26dc255 100644
--- a/cpp/platform/public/scheduled_executor.h
+++ b/cpp/platform/public/scheduled_executor.h
@@ -25,6 +25,7 @@
 #include "platform/public/cancelable.h"
 #include "platform/public/cancellable_task.h"
 #include "platform/public/lockable.h"
+#include "platform/public/monitored_runnable.h"
 #include "platform/public/mutex.h"
 #include "platform/public/mutex_lock.h"
 #include "platform/public/thread_check_callable.h"
@@ -59,6 +60,14 @@
     }
     return *this;
   }
+  void Execute(const std::string& name, Runnable&& runnable)
+      ABSL_LOCKS_EXCLUDED(mutex_) {
+    MutexLock lock(&mutex_);
+    if (impl_)
+      impl_->Execute(MonitoredRunnable(
+          name, ThreadCheckRunnable(this, std::move(runnable))));
+  }
+
   void Execute(Runnable&& runnable) ABSL_LOCKS_EXCLUDED(mutex_) {
     MutexLock lock(&mutex_);
     if (impl_) impl_->Execute(ThreadCheckRunnable(this, std::move(runnable)));
diff --git a/cpp/platform/public/single_thread_executor_test.cc b/cpp/platform/public/single_thread_executor_test.cc
index f89773f..e2059ed 100644
--- a/cpp/platform/public/single_thread_executor_test.cc
+++ b/cpp/platform/public/single_thread_executor_test.cc
@@ -16,6 +16,7 @@
 
 #include <atomic>
 #include <functional>
+#include <string>
 
 #include "platform/base/exception.h"
 #include "gtest/gtest.h"
@@ -47,6 +48,24 @@
   EXPECT_TRUE(done);
 }
 
+TEST(SingleThreadExecutorTest, CanExecuteNamedTask) {
+  absl::CondVar cond;
+  std::atomic_bool done = false;
+  SingleThreadExecutor executor;
+  executor.Execute("my task", [&done, &cond]() {
+    done = true;
+    cond.SignalAll();
+  });
+  absl::Mutex mutex;
+  {
+    absl::MutexLock lock(&mutex);
+    if (!done) {
+      cond.WaitWithTimeout(&mutex, absl::Seconds(1));
+    }
+  }
+  EXPECT_TRUE(done);
+}
+
 TEST(SingleThreadExecutorTest, JobsExecuteInOrder) {
   std::vector<int> results;
   SingleThreadExecutor executor;
diff --git a/cpp/platform/public/submittable_executor.h b/cpp/platform/public/submittable_executor.h
index c5c6f3a..8e4e073 100644
--- a/cpp/platform/public/submittable_executor.h
+++ b/cpp/platform/public/submittable_executor.h
@@ -26,6 +26,7 @@
 #include "platform/base/runnable.h"
 #include "platform/public/future.h"
 #include "platform/public/lockable.h"
+#include "platform/public/monitored_runnable.h"
 #include "platform/public/mutex.h"
 #include "platform/public/mutex_lock.h"
 #include "platform/public/thread_check_callable.h"
@@ -57,9 +58,19 @@
     }
     return *this;
   }
+  void Execute(const std::string& name, Runnable&& runnable)
+      ABSL_LOCKS_EXCLUDED(mutex_) {
+    MutexLock lock(&mutex_);
+    if (impl_)
+      impl_->Execute(MonitoredRunnable(
+          name, ThreadCheckRunnable(this, std::move(runnable))));
+  }
+
   void Execute(Runnable&& runnable) ABSL_LOCKS_EXCLUDED(mutex_) override {
     MutexLock lock(&mutex_);
-    if (impl_) impl_->Execute(ThreadCheckRunnable(this, std::move(runnable)));
+    if (impl_)
+      impl_->Execute(
+          MonitoredRunnable(ThreadCheckRunnable(this, std::move(runnable))));
   }
 
   int GetTid(int index) const ABSL_LOCKS_EXCLUDED(mutex_) override {