Internal change
PiperOrigin-RevId: 369753540
diff --git a/cpp/core/internal/base_pcp_handler.cc b/cpp/core/internal/base_pcp_handler.cc
index ae2dbc9..0fd75f8 100644
--- a/cpp/core/internal/base_pcp_handler.cc
+++ b/cpp/core/internal/base_pcp_handler.cc
@@ -82,34 +82,37 @@
NEARBY_LOG(INFO, "StartAdvertising with supported mediums: %s",
GetStringValueOfSupportedMediums(options).c_str());
ConnectionOptions advertising_options = options.CompatibleOptions();
- RunOnPcpHandlerThread([this, client, &service_id, &info, &advertising_options,
- &response]() RUN_ON_PCP_HANDLER_THREAD() {
- // The endpoint id inside of the advertisement is different to high
- // visibility and low visibility mode. In order to decide if client should
- // grab the high visibility or low visibility id, it needs to tell client
- // which one right now, before client#StartedAdvertising.
- if (ShouldEnterHighVisibilityMode(advertising_options)) {
- client->EnterHighVisibilityMode();
- }
+ RunOnPcpHandlerThread(
+ "start-advertising",
+ [this, client, &service_id, &info, &advertising_options, &response]()
+ RUN_ON_PCP_HANDLER_THREAD() {
+ // The endpoint id inside of the advertisement is different to high
+ // visibility and low visibility mode. In order to decide if client
+ // should grab the high visibility or low visibility id, it needs to
+ // tell client which one right now, before
+ // client#StartedAdvertising.
+ if (ShouldEnterHighVisibilityMode(advertising_options)) {
+ client->EnterHighVisibilityMode();
+ }
- auto result =
- StartAdvertisingImpl(client, service_id, client->GetLocalEndpointId(),
- info.endpoint_info, advertising_options);
- if (!result.status.Ok()) {
- client->ExitHighVisibilityMode();
- response.Set(result.status);
- return;
- }
+ auto result = StartAdvertisingImpl(
+ client, service_id, client->GetLocalEndpointId(),
+ info.endpoint_info, advertising_options);
+ if (!result.status.Ok()) {
+ client->ExitHighVisibilityMode();
+ response.Set(result.status);
+ return;
+ }
- // Now that we've succeeded, mark the client as advertising.
- // Save the advertising options for local reference in later process like
- // upgrading bandwidth.
- advertising_listener_ = info.listener;
- client->StartedAdvertising(service_id, GetStrategy(), info.listener,
- absl::MakeSpan(result.mediums),
- advertising_options);
- response.Set({Status::kSuccess});
- });
+ // Now that we've succeeded, mark the client as advertising.
+ // Save the advertising options for local reference in later process
+ // like upgrading bandwidth.
+ advertising_listener_ = info.listener;
+ client->StartedAdvertising(service_id, GetStrategy(), info.listener,
+ absl::MakeSpan(result.mediums),
+ advertising_options);
+ response.Set({Status::kSuccess});
+ });
return WaitForResult(absl::StrCat("StartAdvertising(", service_id, ")"),
client->GetClientId(), &response);
}
@@ -117,11 +120,12 @@
void BasePcpHandler::StopAdvertising(ClientProxy* client) {
NEARBY_LOGS(INFO) << "StopAdvertising id=" << client->GetLocalEndpointId();
CountDownLatch latch(1);
- RunOnPcpHandlerThread([this, client, &latch]() RUN_ON_PCP_HANDLER_THREAD() {
- StopAdvertisingImpl(client);
- client->StoppedAdvertising();
- latch.CountDown();
- });
+ RunOnPcpHandlerThread("stop-advertising",
+ [this, client, &latch]() RUN_ON_PCP_HANDLER_THREAD() {
+ StopAdvertisingImpl(client);
+ client->StoppedAdvertising();
+ latch.CountDown();
+ });
WaitForLatch("StopAdvertising", &latch);
}
@@ -188,33 +192,36 @@
NEARBY_LOG(INFO, "StartDiscovery with supported mediums: %s",
GetStringValueOfSupportedMediums(options).c_str());
- RunOnPcpHandlerThread([this, client, service_id, discovery_options, &listener,
- &response]() RUN_ON_PCP_HANDLER_THREAD() {
- // Ask the implementation to attempt to start discovery.
- auto result = StartDiscoveryImpl(client, service_id, discovery_options);
- if (!result.status.Ok()) {
- response.Set(result.status);
- return;
- }
+ RunOnPcpHandlerThread(
+ "start-discovery", [this, client, service_id, discovery_options,
+ &listener, &response]() RUN_ON_PCP_HANDLER_THREAD() {
+ // Ask the implementation to attempt to start discovery.
+ auto result = StartDiscoveryImpl(client, service_id, discovery_options);
+ if (!result.status.Ok()) {
+ response.Set(result.status);
+ return;
+ }
- // Now that we've succeeded, mark the client as discovering and clear
- // out any old endpoints we had discovered.
- discovered_endpoints_.clear();
- client->StartedDiscovery(service_id, GetStrategy(), listener,
- absl::MakeSpan(result.mediums), discovery_options);
- response.Set({Status::kSuccess});
- });
+ // Now that we've succeeded, mark the client as discovering and clear
+ // out any old endpoints we had discovered.
+ discovered_endpoints_.clear();
+ client->StartedDiscovery(service_id, GetStrategy(), listener,
+ absl::MakeSpan(result.mediums),
+ discovery_options);
+ response.Set({Status::kSuccess});
+ });
return WaitForResult(absl::StrCat("StartDiscovery(", service_id, ")"),
client->GetClientId(), &response);
}
void BasePcpHandler::StopDiscovery(ClientProxy* client) {
CountDownLatch latch(1);
- RunOnPcpHandlerThread([this, client, &latch]() RUN_ON_PCP_HANDLER_THREAD() {
- StopDiscoveryImpl(client);
- client->StoppedDiscovery();
- latch.CountDown();
- });
+ RunOnPcpHandlerThread("stop-discovery",
+ [this, client, &latch]() RUN_ON_PCP_HANDLER_THREAD() {
+ StopDiscoveryImpl(client);
+ client->StoppedDiscovery();
+ latch.CountDown();
+ });
WaitForLatch("StopDiscovery", &latch);
}
@@ -223,7 +230,8 @@
ClientProxy* client, const std::string& service_id,
const OutOfBandConnectionMetadata& metadata) {
CountDownLatch latch(1);
- RunOnPcpHandlerThread([this, client, service_id, metadata, &latch]()
+ RunOnPcpHandlerThread("inject-endpoint",
+ [this, client, service_id, metadata, &latch]()
RUN_ON_PCP_HANDLER_THREAD() {
InjectEndpointImpl(client, service_id, metadata);
latch.CountDown();
@@ -261,8 +269,9 @@
return result.result();
}
-void BasePcpHandler::RunOnPcpHandlerThread(Runnable runnable) {
- serial_executor_.Execute(std::move(runnable));
+void BasePcpHandler::RunOnPcpHandlerThread(const std::string& name,
+ Runnable runnable) {
+ serial_executor_.Execute(name, std::move(runnable));
}
EncryptionRunner::ResultListener BasePcpHandler::GetResultListener() {
@@ -273,6 +282,7 @@
const std::string& auth_token,
const ByteArray& raw_auth_token) {
RunOnPcpHandlerThread(
+ "encryption-success",
[this, endpoint_id, raw_ukey2 = ukey2.release(), auth_token,
raw_auth_token]() RUN_ON_PCP_HANDLER_THREAD() mutable {
OnEncryptionSuccessRunnable(
@@ -282,12 +292,13 @@
},
.on_failure_cb =
[this](const std::string& endpoint_id, EndpointChannel* channel) {
- RunOnPcpHandlerThread([this, endpoint_id,
- channel]() RUN_ON_PCP_HANDLER_THREAD() {
- NEARBY_LOG(ERROR, "Encryption failed for %s on medium %d",
- endpoint_id.c_str(), channel->GetMedium());
- OnEncryptionFailureRunnable(endpoint_id, channel);
- });
+ RunOnPcpHandlerThread(
+ "encryption-failure",
+ [this, endpoint_id, channel]() RUN_ON_PCP_HANDLER_THREAD() {
+ NEARBY_LOG(ERROR, "Encryption failed for %s on medium %d",
+ endpoint_id.c_str(), channel->GetMedium());
+ OnEncryptionFailureRunnable(endpoint_id, channel);
+ });
},
};
}
@@ -395,119 +406,126 @@
const ConnectionRequestInfo& info,
const ConnectionOptions& options) {
auto result = std::make_shared<Future<Status>>();
- RunOnPcpHandlerThread([this, client, &info, options, endpoint_id,
- result]() RUN_ON_PCP_HANDLER_THREAD() {
- absl::Time start_time = SystemClock::ElapsedRealtime();
+ RunOnPcpHandlerThread(
+ "request-connection", [this, client, &info, options, endpoint_id,
+ result]() RUN_ON_PCP_HANDLER_THREAD() {
+ absl::Time start_time = SystemClock::ElapsedRealtime();
- // If we already have a pending connection, then we shouldn't allow any more
- // outgoing connections to this endpoint.
- if (pending_connections_.count(endpoint_id)) {
- NEARBY_LOG(INFO, "Connection already exists: id=%s", endpoint_id.c_str());
- result->Set({Status::kAlreadyConnectedToEndpoint});
- return;
- }
+ // If we already have a pending connection, then we shouldn't allow any
+ // more outgoing connections to this endpoint.
+ if (pending_connections_.count(endpoint_id)) {
+ NEARBY_LOG(INFO, "Connection already exists: id=%s",
+ endpoint_id.c_str());
+ result->Set({Status::kAlreadyConnectedToEndpoint});
+ return;
+ }
- // If our child class says we can't send any more outgoing connections,
- // listen to them.
- if (ShouldEnforceTopologyConstraints(client->GetAdvertisingOptions()) &&
- !CanSendOutgoingConnection(client)) {
- NEARBY_LOG(INFO, "Outgoing connection not allowed: id=%s",
- endpoint_id.c_str());
- result->Set({Status::kOutOfOrderApiCall});
- return;
- }
+ // If our child class says we can't send any more outgoing connections,
+ // listen to them.
+ if (ShouldEnforceTopologyConstraints(client->GetAdvertisingOptions()) &&
+ !CanSendOutgoingConnection(client)) {
+ NEARBY_LOG(INFO, "Outgoing connection not allowed: id=%s",
+ endpoint_id.c_str());
+ result->Set({Status::kOutOfOrderApiCall});
+ return;
+ }
- DiscoveredEndpoint* endpoint = GetDiscoveredEndpoint(endpoint_id);
- if (endpoint == nullptr) {
- NEARBY_LOG(INFO, "Discovered endpoint not found: id=%s",
- endpoint_id.c_str());
- result->Set({Status::kEndpointUnknown});
- return;
- }
+ DiscoveredEndpoint* endpoint = GetDiscoveredEndpoint(endpoint_id);
+ if (endpoint == nullptr) {
+ NEARBY_LOG(INFO, "Discovered endpoint not found: id=%s",
+ endpoint_id.c_str());
+ result->Set({Status::kEndpointUnknown});
+ return;
+ }
- auto remote_bluetooth_mac_address =
- BluetoothUtils::ToString(options.remote_bluetooth_mac_address);
- if (!remote_bluetooth_mac_address.empty()) {
- if (AppendRemoteBluetoothMacAddressEndpoint(
- endpoint_id, remote_bluetooth_mac_address,
- client->GetDiscoveryOptions()))
- NEARBY_LOGS(INFO) << "Appended remote Bluetooth MAC Address endpoint "
- << "[" << remote_bluetooth_mac_address << "]";
- }
+ auto remote_bluetooth_mac_address =
+ BluetoothUtils::ToString(options.remote_bluetooth_mac_address);
+ if (!remote_bluetooth_mac_address.empty()) {
+ if (AppendRemoteBluetoothMacAddressEndpoint(
+ endpoint_id, remote_bluetooth_mac_address,
+ client->GetDiscoveryOptions()))
+ NEARBY_LOGS(INFO)
+ << "Appended remote Bluetooth MAC Address endpoint "
+ << "[" << remote_bluetooth_mac_address << "]";
+ }
- if (AppendWebRTCEndpoint(endpoint_id, client->GetDiscoveryOptions()))
- NEARBY_LOGS(INFO) << "Appended Web RTC endpoint.";
+ if (AppendWebRTCEndpoint(endpoint_id, client->GetDiscoveryOptions()))
+ NEARBY_LOGS(INFO) << "Appended Web RTC endpoint.";
- auto discovered_endpoints = GetDiscoveredEndpoints(endpoint_id);
- std::unique_ptr<EndpointChannel> channel;
- ConnectImplResult connect_impl_result;
+ auto discovered_endpoints = GetDiscoveredEndpoints(endpoint_id);
+ std::unique_ptr<EndpointChannel> channel;
+ ConnectImplResult connect_impl_result;
- for (auto connect_endpoint : discovered_endpoints) {
- if (!MediumSupportedByClientOptions(connect_endpoint->medium, options))
- continue;
- connect_impl_result = ConnectImpl(client, connect_endpoint);
- if (connect_impl_result.status.Ok()) {
- channel = std::move(connect_impl_result.endpoint_channel);
- break;
- }
- }
+ for (auto connect_endpoint : discovered_endpoints) {
+ if (!MediumSupportedByClientOptions(connect_endpoint->medium,
+ options))
+ continue;
+ connect_impl_result = ConnectImpl(client, connect_endpoint);
+ if (connect_impl_result.status.Ok()) {
+ channel = std::move(connect_impl_result.endpoint_channel);
+ break;
+ }
+ }
- if (channel == nullptr) {
- NEARBY_LOG(INFO, "Endpoint channel not available: id=%s",
- endpoint_id.c_str());
- ProcessPreConnectionInitiationFailure(
- endpoint_id, channel.get(), connect_impl_result.status, result.get());
- return;
- }
+ if (channel == nullptr) {
+ NEARBY_LOG(INFO, "Endpoint channel not available: id=%s",
+ endpoint_id.c_str());
+ ProcessPreConnectionInitiationFailure(endpoint_id, channel.get(),
+ connect_impl_result.status,
+ result.get());
+ return;
+ }
- NEARBY_LOG(INFO, "Sending connection request: id=%s", endpoint_id.c_str());
- // Generate the nonce to use for this connection.
- std::int32_t nonce = prng_.NextInt32();
+ NEARBY_LOG(INFO, "Sending connection request: id=%s",
+ endpoint_id.c_str());
+ // Generate the nonce to use for this connection.
+ std::int32_t nonce = prng_.NextInt32();
- // The first message we have to send, after connecting, is to tell the
- // endpoint about ourselves.
- Exception write_exception = WriteConnectionRequestFrame(
- channel.get(), client->GetLocalEndpointId(), info.endpoint_info, nonce,
- GetSupportedConnectionMediumsByPriority(options));
- if (!write_exception.Ok()) {
- NEARBY_LOG(INFO, "Failed to send connection request: id=%s",
- endpoint_id.c_str());
- ProcessPreConnectionInitiationFailure(
- endpoint_id, channel.get(), {Status::kEndpointIoError}, result.get());
- return;
- }
+ // The first message we have to send, after connecting, is to tell the
+ // endpoint about ourselves.
+ Exception write_exception = WriteConnectionRequestFrame(
+ channel.get(), client->GetLocalEndpointId(), info.endpoint_info,
+ nonce, GetSupportedConnectionMediumsByPriority(options));
+ if (!write_exception.Ok()) {
+ NEARBY_LOG(INFO, "Failed to send connection request: id=%s",
+ endpoint_id.c_str());
+ ProcessPreConnectionInitiationFailure(endpoint_id, channel.get(),
+ {Status::kEndpointIoError},
+ result.get());
+ return;
+ }
- NEARBY_LOG(INFO, "adding connection to pending set: id=%s",
- endpoint_id.c_str());
+ NEARBY_LOG(INFO, "adding connection to pending set: id=%s",
+ endpoint_id.c_str());
- // We've successfully connected to the device, and are now about to jump on
- // to the EncryptionRunner thread to start running our encryption protocol.
- // We'll mark ourselves as pending in case we get another call to
- // RequestConnection or OnIncomingConnection, so that we can cancel the
- // connection if needed.
- EndpointChannel* endpoint_channel =
- pending_connections_
- .emplace(endpoint_id,
- PendingConnectionInfo{
- .client = client,
- .remote_endpoint_info = endpoint->endpoint_info,
- .nonce = nonce,
- .is_incoming = false,
- .start_time = start_time,
- .listener = info.listener,
- .options = options,
- .result = result,
- .channel = std::move(channel),
- })
- .first->second.channel.get();
+ // We've successfully connected to the device, and are now about to jump
+ // on to the EncryptionRunner thread to start running our encryption
+ // protocol. We'll mark ourselves as pending in case we get another call
+ // to RequestConnection or OnIncomingConnection, so that we can cancel
+ // the connection if needed.
+ EndpointChannel* endpoint_channel =
+ pending_connections_
+ .emplace(endpoint_id,
+ PendingConnectionInfo{
+ .client = client,
+ .remote_endpoint_info = endpoint->endpoint_info,
+ .nonce = nonce,
+ .is_incoming = false,
+ .start_time = start_time,
+ .listener = info.listener,
+ .options = options,
+ .result = result,
+ .channel = std::move(channel),
+ })
+ .first->second.channel.get();
- NEARBY_LOG(INFO, "Initiating secure connection: id=%s",
- endpoint_id.c_str());
- // Next, we'll set up encryption. When it's done, our future will return and
- // RequestConnection() will finish.
- encryption_runner_.StartClient(client, endpoint_id, endpoint_channel,
- GetResultListener());
- });
+ NEARBY_LOG(INFO, "Initiating secure connection: id=%s",
+ endpoint_id.c_str());
+ // Next, we'll set up encryption. When it's done, our future will return
+ // and RequestConnection() will finish.
+ encryption_runner_.StartClient(client, endpoint_id, endpoint_channel,
+ GetResultListener());
+ });
NEARBY_LOG(INFO, "Waiting for connection to complete: id=%s",
endpoint_id.c_str());
auto status =
@@ -672,8 +690,8 @@
const PayloadListener& payload_listener) {
Future<Status> response;
RunOnPcpHandlerThread(
- [this, client, endpoint_id, payload_listener,
- &response]() RUN_ON_PCP_HANDLER_THREAD() {
+ "accept-connection", [this, client, endpoint_id, payload_listener,
+ &response]() RUN_ON_PCP_HANDLER_THREAD() {
NEARBY_LOG(INFO, "AcceptConnection: id=%s", endpoint_id.c_str());
if (!pending_connections_.count(endpoint_id)) {
NEARBY_LOG(INFO, "AcceptConnection: no pending connection for id=%s",
@@ -726,51 +744,52 @@
Status BasePcpHandler::RejectConnection(ClientProxy* client,
const std::string& endpoint_id) {
Future<Status> response;
- RunOnPcpHandlerThread([this, client, endpoint_id,
- &response]() RUN_ON_PCP_HANDLER_THREAD() {
- NEARBY_LOG(INFO, "RejectConnection: id=%s", endpoint_id.c_str());
- if (!pending_connections_.count(endpoint_id)) {
- NEARBY_LOG(INFO, "RejectConnection: no pending connection for id=%s",
- endpoint_id.c_str());
- response.Set({Status::kEndpointUnknown});
- return;
- }
- auto& connection_info = pending_connections_[endpoint_id];
+ RunOnPcpHandlerThread(
+ "reject-connection",
+ [this, client, endpoint_id, &response]() RUN_ON_PCP_HANDLER_THREAD() {
+ NEARBY_LOG(INFO, "RejectConnection: id=%s", endpoint_id.c_str());
+ if (!pending_connections_.count(endpoint_id)) {
+ NEARBY_LOG(INFO, "RejectConnection: no pending connection for id=%s",
+ endpoint_id.c_str());
+ response.Set({Status::kEndpointUnknown});
+ return;
+ }
+ auto& connection_info = pending_connections_[endpoint_id];
- // By this point in the flow, connection_info->endpoint_channel_ has been
- // nulled out because ownership of that EndpointChannel was passed on to
- // EndpointChannelManager via a call to
- // EndpointManager::registerEndpoint(), so we now need to get access to the
- // EndpointChannel from the authoritative owner.
- std::shared_ptr<EndpointChannel> channel =
- channel_manager_->GetChannelForEndpoint(endpoint_id);
- if (channel == nullptr) {
- NEARBY_LOG(
- ERROR,
- "Channel destroyed before Reject; bring down connection: id=%s",
- endpoint_id.c_str());
- ProcessPreConnectionResultFailure(client, endpoint_id);
- response.Set({Status::kEndpointUnknown});
- return;
- }
+ // By this point in the flow, connection_info->endpoint_channel_ has
+ // been nulled out because ownership of that EndpointChannel was passed
+ // on to EndpointChannelManager via a call to
+ // EndpointManager::registerEndpoint(), so we now need to get access to
+ // the EndpointChannel from the authoritative owner.
+ std::shared_ptr<EndpointChannel> channel =
+ channel_manager_->GetChannelForEndpoint(endpoint_id);
+ if (channel == nullptr) {
+ NEARBY_LOG(
+ ERROR,
+ "Channel destroyed before Reject; bring down connection: id=%s",
+ endpoint_id.c_str());
+ ProcessPreConnectionResultFailure(client, endpoint_id);
+ response.Set({Status::kEndpointUnknown});
+ return;
+ }
- Exception write_exception = channel->Write(
- parser::ForConnectionResponse(Status::kConnectionRejected));
- if (!write_exception.Ok()) {
- NEARBY_LOG(INFO, "RejectConnection: failed to send response: id=%s",
- endpoint_id.c_str());
- ProcessPreConnectionResultFailure(client, endpoint_id);
- response.Set({Status::kEndpointIoError});
- return;
- }
+ Exception write_exception = channel->Write(
+ parser::ForConnectionResponse(Status::kConnectionRejected));
+ if (!write_exception.Ok()) {
+ NEARBY_LOG(INFO, "RejectConnection: failed to send response: id=%s",
+ endpoint_id.c_str());
+ ProcessPreConnectionResultFailure(client, endpoint_id);
+ response.Set({Status::kEndpointIoError});
+ return;
+ }
- NEARBY_LOG(INFO, "RejectConnection: rejecting locally: id=%s",
- endpoint_id.c_str());
- connection_info.LocalEndpointRejectedConnection(endpoint_id);
- EvaluateConnectionResult(client, endpoint_id,
- false /* can_close_immediately */);
- response.Set({Status::kSuccess});
- });
+ NEARBY_LOG(INFO, "RejectConnection: rejecting locally: id=%s",
+ endpoint_id.c_str());
+ connection_info.LocalEndpointRejectedConnection(endpoint_id);
+ EvaluateConnectionResult(client, endpoint_id,
+ false /* can_close_immediately */);
+ response.Set({Status::kSuccess});
+ });
return WaitForResult(absl::StrCat("RejectConnection(", endpoint_id, ")"),
client->GetClientId(), &response);
@@ -781,45 +800,46 @@
ClientProxy* client,
proto::connections::Medium medium) {
CountDownLatch latch(1);
- RunOnPcpHandlerThread([this, client, endpoint_id, frame,
- &latch]() RUN_ON_PCP_HANDLER_THREAD() {
- NEARBY_LOG(INFO, "OnConnectionResponse: id=%s", endpoint_id.c_str());
+ RunOnPcpHandlerThread(
+ "incoming-frame",
+ [this, client, endpoint_id, frame, &latch]() RUN_ON_PCP_HANDLER_THREAD() {
+ NEARBY_LOG(INFO, "OnConnectionResponse: id=%s", endpoint_id.c_str());
- if (client->HasRemoteEndpointResponded(endpoint_id)) {
- NEARBY_LOG(INFO, "OnConnectionResponse: already handled; id=%s",
- endpoint_id.c_str());
- return;
- }
+ if (client->HasRemoteEndpointResponded(endpoint_id)) {
+ NEARBY_LOG(INFO, "OnConnectionResponse: already handled; id=%s",
+ endpoint_id.c_str());
+ return;
+ }
- const ConnectionResponseFrame& connection_response =
- frame.v1().connection_response();
+ const ConnectionResponseFrame& connection_response =
+ frame.v1().connection_response();
- // For backward compatible, here still check both status and
- // response parameters until the response feature is roll out in all
- // supported devices.
- bool accepted = false;
- if (connection_response.has_response()) {
- accepted =
- connection_response.response() == ConnectionResponseFrame::ACCEPT;
- } else {
- accepted = connection_response.status() == Status::kSuccess;
- }
- if (accepted) {
- NEARBY_LOG(INFO, "OnConnectionResponse: remote accepted; id=%s",
- endpoint_id.c_str());
- client->RemoteEndpointAcceptedConnection(endpoint_id);
- } else {
- NEARBY_LOG(INFO,
- "OnConnectionResponse: remote rejected; id=%s; status=%d",
- endpoint_id.c_str(), connection_response.status());
- client->RemoteEndpointRejectedConnection(endpoint_id);
- }
+ // For backward compatible, here still check both status and
+ // response parameters until the response feature is roll out in all
+ // supported devices.
+ bool accepted = false;
+ if (connection_response.has_response()) {
+ accepted =
+ connection_response.response() == ConnectionResponseFrame::ACCEPT;
+ } else {
+ accepted = connection_response.status() == Status::kSuccess;
+ }
+ if (accepted) {
+ NEARBY_LOG(INFO, "OnConnectionResponse: remote accepted; id=%s",
+ endpoint_id.c_str());
+ client->RemoteEndpointAcceptedConnection(endpoint_id);
+ } else {
+ NEARBY_LOG(INFO,
+ "OnConnectionResponse: remote rejected; id=%s; status=%d",
+ endpoint_id.c_str(), connection_response.status());
+ client->RemoteEndpointRejectedConnection(endpoint_id);
+ }
- EvaluateConnectionResult(client, endpoint_id,
- /* can_close_immediately= */ true);
+ EvaluateConnectionResult(client, endpoint_id,
+ /* can_close_immediately= */ true);
- latch.CountDown();
- });
+ latch.CountDown();
+ });
WaitForLatch("OnIncomingFrame()", &latch);
}
@@ -830,17 +850,19 @@
barrier.CountDown();
return;
}
- RunOnPcpHandlerThread([this, client, endpoint_id,
- barrier]() RUN_ON_PCP_HANDLER_THREAD() mutable {
- auto item = pending_alarms_.find(endpoint_id);
- if (item != pending_alarms_.end()) {
- auto& alarm = item->second;
- alarm.Cancel();
- pending_alarms_.erase(item);
- }
- ProcessPreConnectionResultFailure(client, endpoint_id);
- barrier.CountDown();
- });
+ RunOnPcpHandlerThread("on-endpoint-disconnect",
+ [this, client, endpoint_id, barrier]()
+ RUN_ON_PCP_HANDLER_THREAD() mutable {
+ auto item = pending_alarms_.find(endpoint_id);
+ if (item != pending_alarms_.end()) {
+ auto& alarm = item->second;
+ alarm.Cancel();
+ pending_alarms_.erase(item);
+ }
+ ProcessPreConnectionResultFailure(client,
+ endpoint_id);
+ barrier.CountDown();
+ });
}
BluetoothDevice BasePcpHandler::GetRemoteBluetoothDevice(
diff --git a/cpp/core/internal/base_pcp_handler.h b/cpp/core/internal/base_pcp_handler.h
index 47ac391..3cf834b 100644
--- a/cpp/core/internal/base_pcp_handler.h
+++ b/cpp/core/internal/base_pcp_handler.h
@@ -223,7 +223,7 @@
std::unique_ptr<EndpointChannel> endpoint_channel;
};
- void RunOnPcpHandlerThread(Runnable runnable);
+ void RunOnPcpHandlerThread(const std::string& name, Runnable runnable);
BluetoothDevice GetRemoteBluetoothDevice(
const std::string& remote_bluetooth_mac_address);
diff --git a/cpp/core/internal/bwu_manager.cc b/cpp/core/internal/bwu_manager.cc
index 626ac7a..b5063b8 100644
--- a/cpp/core/internal/bwu_manager.cc
+++ b/cpp/core/internal/bwu_manager.cc
@@ -131,7 +131,7 @@
Medium new_medium) {
NEARBY_LOG(INFO, "InitiateBwuForEndpoint for endpoint %s with medium %d",
endpoint_id.c_str(), new_medium);
- RunOnBwuManagerThread([this, client, endpoint_id, new_medium]() {
+ RunOnBwuManagerThread("bwu-init", [this, client, endpoint_id, new_medium]() {
Medium proposed_medium = ChooseBestUpgradeMedium(
client->GetUpgradeMediums(endpoint_id).GetMediums(true));
if (new_medium != Medium::UNKNOWN_MEDIUM) {
@@ -207,12 +207,14 @@
return;
auto bwu_frame = frame.v1().bandwidth_upgrade_negotiation();
if (FeatureFlags::GetInstance().GetFlags().enable_async_bandwidth_upgrade) {
- RunOnBwuManagerThread([this, client, endpoint_id, bwu_frame]() {
- OnBwuNegotiationFrame(client, bwu_frame, endpoint_id);
- });
+ RunOnBwuManagerThread(
+ "bwu-on-incoming-frame", [this, client, endpoint_id, bwu_frame]() {
+ OnBwuNegotiationFrame(client, bwu_frame, endpoint_id);
+ });
} else {
CountDownLatch latch(1);
- RunOnBwuManagerThread([this, client, endpoint_id, bwu_frame, &latch]() {
+ RunOnBwuManagerThread("bwu-on-incoming-frame", [this, client, endpoint_id,
+ bwu_frame, &latch]() {
OnBwuNegotiationFrame(client, bwu_frame, endpoint_id);
latch.CountDown();
});
@@ -224,38 +226,40 @@
const std::string& endpoint_id,
CountDownLatch barrier) {
NEARBY_LOG(INFO, "OnEndpointDisconnect for endpoint %s", endpoint_id.c_str());
- RunOnBwuManagerThread([this, client, endpoint_id, barrier]() mutable {
- if (medium_ == Medium::UNKNOWN_MEDIUM) {
- barrier.CountDown();
- return;
- }
+ RunOnBwuManagerThread(
+ "bwu-on-endpoint-disconnect",
+ [this, client, endpoint_id, barrier]() mutable {
+ if (medium_ == Medium::UNKNOWN_MEDIUM) {
+ barrier.CountDown();
+ return;
+ }
- if (handler_) {
- handler_->OnEndpointDisconnect(client, endpoint_id);
- }
+ if (handler_) {
+ handler_->OnEndpointDisconnect(client, endpoint_id);
+ }
- auto item = previous_endpoint_channels_.extract(endpoint_id);
+ auto item = previous_endpoint_channels_.extract(endpoint_id);
- if (!item.empty()) {
- auto old_channel = item.mapped();
- if (old_channel != nullptr) {
- old_channel->Close(DisconnectionReason::SHUTDOWN);
- }
- }
- in_progress_upgrades_.erase(endpoint_id);
- CancelRetryUpgradeAlarm(endpoint_id);
+ if (!item.empty()) {
+ auto old_channel = item.mapped();
+ if (old_channel != nullptr) {
+ old_channel->Close(DisconnectionReason::SHUTDOWN);
+ }
+ }
+ in_progress_upgrades_.erase(endpoint_id);
+ CancelRetryUpgradeAlarm(endpoint_id);
- successfully_upgraded_endpoints_.erase(endpoint_id);
+ successfully_upgraded_endpoints_.erase(endpoint_id);
- // If this was our very last endpoint:
- //
- // a) revert all the changes for currentBwuMedium.
- // b) reset currentBwuMedium.
- if (channel_manager_->GetConnectedEndpointsCount() <= 1) {
- Revert();
- }
- barrier.CountDown();
- });
+ // If this was our very last endpoint:
+ //
+ // a) revert all the changes for currentBwuMedium.
+ // b) reset currentBwuMedium.
+ if (channel_manager_->GetConnectedEndpointsCount() <= 1) {
+ Revert();
+ }
+ barrier.CountDown();
+ });
}
BwuHandler* BwuManager::SetCurrentBwuHandler(Medium medium) {
@@ -312,7 +316,8 @@
client->GetServiceId().c_str());
std::shared_ptr<BwuHandler::IncomingSocketConnection> connection(
mutable_connection.release());
- RunOnBwuManagerThread([this, client, connection]() {
+ RunOnBwuManagerThread("bwu-on-incoming-connection", [this, client,
+ connection]() {
EndpointChannel* channel = connection->channel.get();
if (channel == nullptr) {
connection->socket->Close();
@@ -356,8 +361,9 @@
});
}
-void BwuManager::RunOnBwuManagerThread(Runnable runnable) {
- serial_executor_.Execute(std::move(runnable));
+void BwuManager::RunOnBwuManagerThread(const std::string& name,
+ Runnable runnable) {
+ serial_executor_.Execute(name, std::move(runnable));
}
void BwuManager::RunUpgradeProtocol(
@@ -912,7 +918,8 @@
CancelableAlarm alarm(
"BWU alarm",
[this, client, endpoint_id]() {
- RunOnBwuManagerThread([this, client, endpoint_id]() {
+ RunOnBwuManagerThread("bwu-retry-upgrade", [this, client,
+ endpoint_id]() {
if (!client->IsConnectedToEndpoint(endpoint_id)) {
return;
}
diff --git a/cpp/core/internal/bwu_manager.h b/cpp/core/internal/bwu_manager.h
index 2ba3766..f219429 100644
--- a/cpp/core/internal/bwu_manager.h
+++ b/cpp/core/internal/bwu_manager.h
@@ -103,7 +103,8 @@
absl::Seconds(5);
BwuHandler* SetCurrentBwuHandler(Medium medium);
void InitBwuHandlers();
- void RunOnBwuManagerThread(std::function<void()> runnable);
+ void RunOnBwuManagerThread(const std::string& name,
+ std::function<void()> runnable);
std::vector<Medium> StripOutUnavailableMediums(
const std::vector<Medium>& mediums);
Medium ChooseBestUpgradeMedium(const std::vector<Medium>& mediums);
diff --git a/cpp/core/internal/encryption_runner.cc b/cpp/core/internal/encryption_runner.cc
index 24832a6..24833d7 100644
--- a/cpp/core/internal/encryption_runner.cc
+++ b/cpp/core/internal/encryption_runner.cc
@@ -360,6 +360,7 @@
EndpointChannel* endpoint_channel,
EncryptionRunner::ResultListener&& listener) {
server_executor_.Execute(
+ "encryption-server",
[runnable{ServerRunnable(client, &alarm_executor_, endpoint_id,
endpoint_channel, std::move(listener))}]() {
runnable();
@@ -371,6 +372,7 @@
EndpointChannel* endpoint_channel,
EncryptionRunner::ResultListener&& listener) {
client_executor_.Execute(
+ "encryption-client",
[runnable{ClientRunnable(client, &alarm_executor_, endpoint_id,
endpoint_channel, std::move(listener))}]() {
runnable();
diff --git a/cpp/core/internal/endpoint_manager.cc b/cpp/core/internal/endpoint_manager.cc
index d731851..3fa4b48 100644
--- a/cpp/core/internal/endpoint_manager.cc
+++ b/cpp/core/internal/endpoint_manager.cc
@@ -91,6 +91,8 @@
// will retry and attempt to pick another channel.
// If channel is deleted (no mapping), or it is still the same channel
// (same Medium) on which we got the Exception::kIo, we terminate the loop.
+ NEARBY_LOG(INFO, "Started worker loop name=%s, endpoint=%s",
+ runnable_name.c_str(), endpoint_id.c_str());
Medium last_failed_medium = Medium::UNKNOWN_MEDIUM;
while (true) {
// It's important to keep re-fetching the EndpointChannel for an endpoint
@@ -271,7 +273,7 @@
EndpointManager::~EndpointManager() {
NEARBY_LOG(INFO, "EndpointManager going down");
CountDownLatch latch(1);
- RunOnEndpointManagerThread([this, &latch]() {
+ RunOnEndpointManagerThread("bring-down-endpoints", [this, &latch]() {
NEARBY_LOG(INFO, "Bringing down endpoints");
for (auto& item : endpoints_) {
const std::string& endpoint_id = item.first;
@@ -392,9 +394,11 @@
// Instead, we release() a pointer, and pass a raw pointer, which is copyalbe.
// We ignore the risk of job not scheduled (and an associated risk of memory
// leak), because this may only happen during service shutdown.
- RunOnEndpointManagerThread([this, client, channel = channel.release(),
- &endpoint_id, &info, &options, &listener,
- &latch]() {
+ RunOnEndpointManagerThread("register-endpoint", [this, client,
+ channel = channel.release(),
+ &endpoint_id, &info,
+ &options, &listener,
+ &latch]() {
if (endpoints_.contains(endpoint_id)) {
NEARBY_LOG(WARNING, "Registing duplicate endpoint %s",
endpoint_id.c_str());
@@ -473,7 +477,8 @@
const std::string& endpoint_id) {
NEARBY_LOG(ERROR, "UnregisterEndpoint for endpoint %s", endpoint_id.c_str());
CountDownLatch latch(1);
- RunOnEndpointManagerThread([this, client, endpoint_id, &latch]() {
+ RunOnEndpointManagerThread("unregister-endpoint", [this, client, endpoint_id,
+ &latch]() {
RemoveEndpoint(client, endpoint_id,
client->IsConnectedToEndpoint(endpoint_id));
latch.CountDown();
@@ -509,7 +514,7 @@
void EndpointManager::DiscardEndpoint(ClientProxy* client,
const std::string& endpoint_id) {
NEARBY_LOG(ERROR, "DiscardEndpoint for endpoint %s", endpoint_id.c_str());
- RunOnEndpointManagerThread([this, client, endpoint_id]() {
+ RunOnEndpointManagerThread("discard-endpoint", [this, client, endpoint_id]() {
RemoveEndpoint(client, endpoint_id,
/*notify=*/
client->IsConnectedToEndpoint(endpoint_id));
@@ -632,15 +637,16 @@
}
void EndpointManager::StartEndpointReader(Runnable runnable) {
- handlers_executor_.Execute(std::move(runnable));
+ handlers_executor_.Execute("reader", std::move(runnable));
}
void EndpointManager::StartEndpointKeepAliveManager(Runnable runnable) {
- keep_alive_executor_.Execute(std::move(runnable));
+ keep_alive_executor_.Execute("keep-alive", std::move(runnable));
}
-void EndpointManager::RunOnEndpointManagerThread(Runnable runnable) {
- serial_executor_.Execute(std::move(runnable));
+void EndpointManager::RunOnEndpointManagerThread(const std::string& name,
+ Runnable runnable) {
+ serial_executor_.Execute(name, std::move(runnable));
}
} // namespace connections
diff --git a/cpp/core/internal/endpoint_manager.h b/cpp/core/internal/endpoint_manager.h
index 516bea7..d28bf1b 100644
--- a/cpp/core/internal/endpoint_manager.h
+++ b/cpp/core/internal/endpoint_manager.h
@@ -234,7 +234,7 @@
void StartEndpointKeepAliveManager(Runnable runnable);
// Executes all jobs sequentially, on a serial_executor_.
- void RunOnEndpointManagerThread(Runnable runnable);
+ void RunOnEndpointManagerThread(const std::string& name, Runnable runnable);
EndpointChannelManager* channel_manager_;
diff --git a/cpp/core/internal/mediums/bluetooth_classic.cc b/cpp/core/internal/mediums/bluetooth_classic.cc
index 9efff55..6f870b1 100644
--- a/cpp/core/internal/mediums/bluetooth_classic.cc
+++ b/cpp/core/internal/mediums/bluetooth_classic.cc
@@ -271,19 +271,20 @@
// Start the accept loop on a dedicated thread - this stays alive and
// listening for new incoming connections until StopAcceptingConnections() is
// invoked.
- accept_loops_runner_.Execute([callback = std::move(callback),
- server_socket = std::move(owned_socket),
- service_name]() mutable {
- while (true) {
- BluetoothSocket client_socket = server_socket.Accept();
- if (!client_socket.IsValid()) {
- server_socket.Close();
- break;
- }
+ accept_loops_runner_.Execute(
+ "bt-accept",
+ [callback = std::move(callback), server_socket = std::move(owned_socket),
+ service_name]() mutable {
+ while (true) {
+ BluetoothSocket client_socket = server_socket.Accept();
+ if (!client_socket.IsValid()) {
+ server_socket.Close();
+ break;
+ }
- callback.accepted_cb(std::move(client_socket));
- }
- });
+ callback.accepted_cb(std::move(client_socket));
+ }
+ });
return true;
}
diff --git a/cpp/core/internal/mediums/webrtc.cc b/cpp/core/internal/mediums/webrtc.cc
index 047e1af..0d40747 100644
--- a/cpp/core/internal/mediums/webrtc.cc
+++ b/cpp/core/internal/mediums/webrtc.cc
@@ -266,7 +266,7 @@
// This allows a remote device to message us over Tachyon.
auto signaling_complete_callback = [this, &socket_future](bool success) {
if (!success) {
- OffloadFromThread([&socket_future]() {
+ OffloadFromThread("rtc-signaling-fail", [&socket_future]() {
socket_future.SetException({Exception::kFailed});
});
}
@@ -388,7 +388,7 @@
void WebRtc::OnSignalingMessage(const std::string& service_id,
const ByteArray& message) {
- OffloadFromThread([this, service_id, message]() {
+ OffloadFromThread("rtc-on-signaling-message", [this, service_id, message]() {
ProcessTachyonInboxMessage(service_id, message);
});
}
@@ -399,7 +399,7 @@
return;
}
- OffloadFromThread([this, service_id]() {
+ OffloadFromThread("rtc-on-signaling-complete", [this, service_id]() {
MutexLock lock(&mutex_);
const auto& info_entry = accepting_connections_info_.find(service_id);
if (info_entry == accepting_connections_info_.end()) {
@@ -661,8 +661,9 @@
// Transform the DataChannel into a socket.
auto socket = std::make_unique<WebRtcSocket>("WebRtcSocket", data_channel);
socket->SetOnSocketClosedListener({[this, remote_peer_id]() {
- OffloadFromThread(
- [this, remote_peer_id]() { ProcessDataChannelClosed(remote_peer_id); });
+ OffloadFromThread("rtc-socket-closed-cb", [this, remote_peer_id]() {
+ ProcessDataChannelClosed(remote_peer_id);
+ });
}});
WebRtcSocketWrapper wrapper = WebRtcSocketWrapper(std::move(socket));
@@ -745,6 +746,7 @@
::location::nearby::mediums::IceCandidate encoded_ice_candidate =
webrtc_frames::EncodeIceCandidate(*ice_candidate);
OffloadFromThread(
+ "rtc-ice-candidates",
[this, service_id, remote_peer_id, encoded_ice_candidate]() {
ProcessLocalIceCandidate(service_id, remote_peer_id,
encoded_ice_candidate);
@@ -756,6 +758,7 @@
remote_peer_id](rtc::scoped_refptr<webrtc::DataChannelInterface>
data_channel) {
OffloadFromThread(
+ "rtc-channel-created",
[this, service_id, remote_peer_id, data_channel]() {
ProcessDataChannelCreated(service_id, remote_peer_id,
data_channel);
@@ -763,17 +766,19 @@
}},
.data_channel_message_received_cb = {[this, remote_peer_id](
const ByteArray& message) {
- OffloadFromThread([this, remote_peer_id, message]() {
- ProcessDataChannelMessage(remote_peer_id, message);
- });
+ OffloadFromThread(
+ "rtc-channel-messsage", [this, remote_peer_id, message]() {
+ ProcessDataChannelMessage(remote_peer_id, message);
+ });
}},
.data_channel_buffered_amount_changed_cb = {[this, remote_peer_id]() {
- OffloadFromThread([this, remote_peer_id]() {
- ProcessDataChannelBufferAmountChanged(remote_peer_id);
- });
+ OffloadFromThread(
+ "rtc-channel-buffer-change", [this, remote_peer_id]() {
+ ProcessDataChannelBufferAmountChanged(remote_peer_id);
+ });
}},
.data_channel_closed_cb = {[this, remote_peer_id]() {
- OffloadFromThread([this, remote_peer_id]() {
+ OffloadFromThread("rtc-channel-closed", [this, remote_peer_id]() {
ProcessDataChannelClosed(remote_peer_id);
});
}},
@@ -796,8 +801,8 @@
}
}
-void WebRtc::OffloadFromThread(Runnable runnable) {
- single_thread_executor_.Execute(std::move(runnable));
+void WebRtc::OffloadFromThread(const std::string& name, Runnable runnable) {
+ single_thread_executor_.Execute(name, std::move(runnable));
}
} // namespace mediums
diff --git a/cpp/core/internal/mediums/webrtc.h b/cpp/core/internal/mediums/webrtc.h
index ac3ae2c..7772b8e 100644
--- a/cpp/core/internal/mediums/webrtc.h
+++ b/cpp/core/internal/mediums/webrtc.h
@@ -241,7 +241,7 @@
void RestartTachyonReceiveMessages(const std::string& service_id)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_);
- void OffloadFromThread(Runnable runnable);
+ void OffloadFromThread(const std::string& name, Runnable runnable);
Mutex mutex_;
diff --git a/cpp/core/internal/p2p_cluster_pcp_handler.cc b/cpp/core/internal/p2p_cluster_pcp_handler.cc
index f8cef07..de79374 100644
--- a/cpp/core/internal/p2p_cluster_pcp_handler.cc
+++ b/cpp/core/internal/p2p_cluster_pcp_handler.cc
@@ -206,155 +206,168 @@
void P2pClusterPcpHandler::BluetoothDeviceDiscoveredHandler(
ClientProxy* client, const std::string& service_id,
BluetoothDevice device) {
- RunOnPcpHandlerThread([this, client, service_id,
- device]() RUN_ON_PCP_HANDLER_THREAD() {
- // Make sure we are still discovering before proceeding.
- if (!client->IsDiscovering()) {
- NEARBY_LOG(INFO,
- "BT discovery handler (FOUND) [client=%p, service=%s]: not "
- "in discovery mode",
- client, service_id.c_str());
- return;
- }
+ RunOnPcpHandlerThread(
+ "p2p-bt-device-discovered",
+ [this, client, service_id, device]()
+ RUN_ON_PCP_HANDLER_THREAD() {
+ // Make sure we are still discovering before proceeding.
+ if (!client->IsDiscovering()) {
+ NEARBY_LOG(
+ INFO,
+ "BT discovery handler (FOUND) [client=%p, service=%s]: not "
+ "in discovery mode",
+ client, service_id.c_str());
+ return;
+ }
- // Parse the Bluetooth device name.
- const std::string device_name_string = device.GetName();
- BluetoothDeviceName device_name(device_name_string);
+ // Parse the Bluetooth device name.
+ const std::string device_name_string = device.GetName();
+ BluetoothDeviceName device_name(device_name_string);
- // Make sure the Bluetooth device name points to a valid
- // endpoint we're discovering.
- if (!IsRecognizedBluetoothEndpoint(device_name_string, service_id,
- device_name))
- return;
+ // Make sure the Bluetooth device name points to a valid
+ // endpoint we're discovering.
+ if (!IsRecognizedBluetoothEndpoint(device_name_string, service_id,
+ device_name))
+ return;
- // Report the discovered endpoint to the client.
- NEARBY_LOGS(INFO)
- << "Invoking BasePcpHandler::OnEndpointFound() for BT service="
- << service_id << "; id=" << device_name.GetEndpointId() << "; name="
- << absl::BytesToHexString(device_name.GetEndpointInfo().data());
- OnEndpointFound(
- client, std::make_shared<BluetoothEndpoint>(BluetoothEndpoint{
+ // Report the discovered endpoint to the client.
+ NEARBY_LOGS(INFO)
+ << "Invoking BasePcpHandler::OnEndpointFound() for BT service="
+ << service_id << "; id=" << device_name.GetEndpointId()
+ << "; name="
+ << absl::BytesToHexString(device_name.GetEndpointInfo().data());
+ OnEndpointFound(
+ client,
+ std::make_shared<BluetoothEndpoint>(BluetoothEndpoint{
{device_name.GetEndpointId(), device_name.GetEndpointInfo(),
service_id, proto::connections::Medium::BLUETOOTH,
device_name.GetWebRtcState()},
device,
}));
- });
+ });
}
void P2pClusterPcpHandler::BluetoothNameChangedHandler(
ClientProxy* client, const std::string& service_id,
BluetoothDevice device) {
- RunOnPcpHandlerThread([this, client, service_id,
- device]() RUN_ON_PCP_HANDLER_THREAD() {
- // Make sure we are still discovering before proceeding.
- if (!client->IsDiscovering()) {
- NEARBY_LOG(INFO,
- "BT discovery handler (CHANGED) [client=%p, service=%s]: not "
- "in discovery mode",
- client, service_id.c_str());
- return;
- }
+ RunOnPcpHandlerThread(
+ "p2p-bt-name-changed",
+ [this, client, service_id, device]() RUN_ON_PCP_HANDLER_THREAD() {
+ // Make sure we are still discovering before proceeding.
+ if (!client->IsDiscovering()) {
+ NEARBY_LOG(
+ INFO,
+ "BT discovery handler (CHANGED) [client=%p, service=%s]: not "
+ "in discovery mode",
+ client, service_id.c_str());
+ return;
+ }
- // Parse the Bluetooth device name.
- const std::string device_name_string = device.GetName();
- BluetoothDeviceName device_name(device_name_string);
- NEARBY_LOG(INFO,
- "BT discovery handler (CHANGED) [client=%p, service=%s]: "
- "processing new name %s",
- client, service_id.c_str(), device_name_string.c_str());
+ // Parse the Bluetooth device name.
+ const std::string device_name_string = device.GetName();
+ BluetoothDeviceName device_name(device_name_string);
+ NEARBY_LOG(INFO,
+ "BT discovery handler (CHANGED) [client=%p, service=%s]: "
+ "processing new name %s",
+ client, service_id.c_str(), device_name_string.c_str());
- // By this point, the BluetoothDevice passed to us has a different name than
- // what we may have discovered before. We need to iterate over the found
- // BluetoothEndpoints and compare their addresses to see the devices are the
- // same. We are not guaranteed to discover a match, since the old name may
- // not have been formatted for Nearby Connections.
- for (auto endpoint :
- GetDiscoveredEndpoints(proto::connections::Medium::BLUETOOTH)) {
- BluetoothEndpoint* bluetoothEndpoint =
- static_cast<BluetoothEndpoint*>(endpoint);
- NEARBY_LOG(INFO,
- "BT discovery handler (CHANGED) [client=%p, service=%s]: "
- "comparing MAC addresses with existing endpoint %s. They have "
- "MAC address %s and the new endpoint has MAC address %s.",
- client, service_id.c_str(),
- bluetoothEndpoint->bluetooth_device.GetName().c_str(),
- bluetoothEndpoint->bluetooth_device.GetMacAddress().c_str(),
- device.GetMacAddress().c_str());
- if (bluetoothEndpoint->bluetooth_device.GetMacAddress() ==
- device.GetMacAddress()) {
- // Report the BluetoothEndpoint as lost to the client.
- NEARBY_LOG(
- INFO,
- "BT discovery handler (LOST) [client=%p, service=%s]: report "
- "to client",
- client, service_id.c_str());
- OnEndpointLost(client, *endpoint);
- break;
- }
- }
+ // By this point, the BluetoothDevice passed to us has a different name
+ // than what we may have discovered before. We need to iterate over the
+ // found BluetoothEndpoints and compare their addresses to see the
+ // devices are the same. We are not guaranteed to discover a match,
+ // since the old name may not have been formatted for Nearby
+ // Connections.
+ for (auto endpoint :
+ GetDiscoveredEndpoints(proto::connections::Medium::BLUETOOTH)) {
+ BluetoothEndpoint* bluetoothEndpoint =
+ static_cast<BluetoothEndpoint*>(endpoint);
+ NEARBY_LOG(
+ INFO,
+ "BT discovery handler (CHANGED) [client=%p, service=%s]: "
+ "comparing MAC addresses with existing endpoint %s. They have "
+ "MAC address %s and the new endpoint has MAC address %s.",
+ client, service_id.c_str(),
+ bluetoothEndpoint->bluetooth_device.GetName().c_str(),
+ bluetoothEndpoint->bluetooth_device.GetMacAddress().c_str(),
+ device.GetMacAddress().c_str());
+ if (bluetoothEndpoint->bluetooth_device.GetMacAddress() ==
+ device.GetMacAddress()) {
+ // Report the BluetoothEndpoint as lost to the client.
+ NEARBY_LOG(
+ INFO,
+ "BT discovery handler (LOST) [client=%p, service=%s]: report "
+ "to client",
+ client, service_id.c_str());
+ OnEndpointLost(client, *endpoint);
+ break;
+ }
+ }
- // Make sure the Bluetooth device name points to a valid
- // endpoint we're discovering.
- if (!IsRecognizedBluetoothEndpoint(device_name_string, service_id,
- device_name)) {
- NEARBY_LOG(INFO,
- "BT discovery handler (CHANGED) [client=%p, service=%s]: The "
- "new name is not recognized. Ignoring.",
- client, service_id.c_str());
- return;
- }
+ // Make sure the Bluetooth device name points to a valid
+ // endpoint we're discovering.
+ if (!IsRecognizedBluetoothEndpoint(device_name_string, service_id,
+ device_name)) {
+ NEARBY_LOG(
+ INFO,
+ "BT discovery handler (CHANGED) [client=%p, service=%s]: The "
+ "new name is not recognized. Ignoring.",
+ client, service_id.c_str());
+ return;
+ }
- // Report the discovered endpoint to the client.
- NEARBY_LOGS(INFO)
- << "Invoking BasePcpHandler::OnEndpointFound() for BT service="
- << service_id << "; id=" << device_name.GetEndpointId() << "; name="
- << absl::BytesToHexString(device_name.GetEndpointInfo().data());
- OnEndpointFound(
- client, std::make_shared<BluetoothEndpoint>(BluetoothEndpoint{
- {device_name.GetEndpointId(), device_name.GetEndpointInfo(),
- service_id, proto::connections::Medium::BLUETOOTH,
- device_name.GetWebRtcState()},
- device,
- }));
- });
+ // Report the discovered endpoint to the client.
+ NEARBY_LOGS(INFO)
+ << "Invoking BasePcpHandler::OnEndpointFound() for BT service="
+ << service_id << "; id=" << device_name.GetEndpointId() << "; name="
+ << absl::BytesToHexString(device_name.GetEndpointInfo().data());
+ OnEndpointFound(
+ client,
+ std::make_shared<BluetoothEndpoint>(BluetoothEndpoint{
+ {device_name.GetEndpointId(), device_name.GetEndpointInfo(),
+ service_id, proto::connections::Medium::BLUETOOTH,
+ device_name.GetWebRtcState()},
+ device,
+ }));
+ });
}
void P2pClusterPcpHandler::BluetoothDeviceLostHandler(
ClientProxy* client, const std::string& service_id,
BluetoothDevice& device) {
const std::string& device_name_string = device.GetName();
- RunOnPcpHandlerThread([this, client, service_id,
- device_name_string]() RUN_ON_PCP_HANDLER_THREAD() {
- // Make sure we are still discovering before proceeding.
- if (!client->IsDiscovering()) {
- NEARBY_LOG(INFO,
- "BT discovery handler (LOST) [client=%p, service=%s]: not "
- "in discovery mode",
- client, service_id.c_str());
- return;
- }
+ RunOnPcpHandlerThread(
+ "p2p-bt-device-lost", [this, client, service_id,
+ device_name_string]() RUN_ON_PCP_HANDLER_THREAD() {
+ // Make sure we are still discovering before proceeding.
+ if (!client->IsDiscovering()) {
+ NEARBY_LOG(INFO,
+ "BT discovery handler (LOST) [client=%p, service=%s]: not "
+ "in discovery mode",
+ client, service_id.c_str());
+ return;
+ }
- // Parse the Bluetooth device name.
- BluetoothDeviceName device_name(device_name_string);
+ // Parse the Bluetooth device name.
+ BluetoothDeviceName device_name(device_name_string);
- // Make sure the Bluetooth device name points to a valid
- // endpoint we're discovering.
- if (!IsRecognizedBluetoothEndpoint(device_name_string, service_id,
- device_name))
- return;
+ // Make sure the Bluetooth device name points to a valid
+ // endpoint we're discovering.
+ if (!IsRecognizedBluetoothEndpoint(device_name_string, service_id,
+ device_name))
+ return;
- // Report the BluetoothEndpoint as lost to the client.
- NEARBY_LOG(INFO,
- "BT discovery handler (LOST) [client=%p, service=%s]: report "
- "to client",
- client, service_id.c_str());
- OnEndpointLost(client,
- DiscoveredEndpoint{device_name.GetEndpointId(),
- device_name.GetEndpointInfo(), service_id,
- proto::connections::Medium::BLUETOOTH,
- WebRtcState::kUndefined});
- });
+ // Report the BluetoothEndpoint as lost to the client.
+ NEARBY_LOG(
+ INFO,
+ "BT discovery handler (LOST) [client=%p, service=%s]: report "
+ "to client",
+ client, service_id.c_str());
+ OnEndpointLost(client, DiscoveredEndpoint{
+ device_name.GetEndpointId(),
+ device_name.GetEndpointInfo(), service_id,
+ proto::connections::Medium::BLUETOOTH,
+ WebRtcState::kUndefined});
+ });
}
bool P2pClusterPcpHandler::IsRecognizedBleEndpoint(
@@ -408,76 +421,81 @@
ClientProxy* client, BlePeripheral& peripheral,
const std::string& service_id, const ByteArray& advertisement_bytes,
bool fast_advertisement) {
- RunOnPcpHandlerThread([this, client, &peripheral, service_id,
- advertisement_bytes,
- fast_advertisement]() RUN_ON_PCP_HANDLER_THREAD() {
- // Make sure we are still discovering before proceeding.
- if (!client->IsDiscovering()) {
- NEARBY_LOG(INFO,
- "Ble scanning handler (FOUND) [client=%p, service_id=%s]: not "
- "in discovery mode",
- client, service_id.c_str());
- return;
- }
+ RunOnPcpHandlerThread(
+ "p2p-ble-device-discovered",
+ [this, client, &peripheral, service_id, advertisement_bytes,
+ fast_advertisement]() RUN_ON_PCP_HANDLER_THREAD() {
+ // Make sure we are still discovering before proceeding.
+ if (!client->IsDiscovering()) {
+ NEARBY_LOG(
+ INFO,
+ "Ble scanning handler (FOUND) [client=%p, service_id=%s]: not "
+ "in discovery mode",
+ client, service_id.c_str());
+ return;
+ }
- // Parse the BLE advertisement bytes.
- BleAdvertisement advertisement(fast_advertisement, advertisement_bytes);
+ // Parse the BLE advertisement bytes.
+ BleAdvertisement advertisement(fast_advertisement, advertisement_bytes);
- // Make sure the BLE advertisement points to a valid
- // endpoint we're discovering.
- if (!IsRecognizedBleEndpoint(service_id, advertisement)) return;
+ // Make sure the BLE advertisement points to a valid
+ // endpoint we're discovering.
+ if (!IsRecognizedBleEndpoint(service_id, advertisement)) return;
- // Store all the state we need to be able to re-create a BleEndpoint
- // in BlePeripheralLostHandler, since that isn't privy to
- // the bytes of the ble advertisement itself.
- found_ble_endpoints_.emplace(
- peripheral.GetName(),
- BleEndpointState(advertisement.GetEndpointId(),
- advertisement.GetEndpointInfo()));
+ // Store all the state we need to be able to re-create a BleEndpoint
+ // in BlePeripheralLostHandler, since that isn't privy to
+ // the bytes of the ble advertisement itself.
+ found_ble_endpoints_.emplace(
+ peripheral.GetName(),
+ BleEndpointState(advertisement.GetEndpointId(),
+ advertisement.GetEndpointInfo()));
- // Report the discovered endpoint to the client.
- NEARBY_LOGS(INFO)
- << "Invoking BasePcpHandler::OnEndpointFound() for Ble service="
- << service_id << "; id=" << advertisement.GetEndpointId() << "; name="
- << absl::BytesToHexString(advertisement.GetEndpointInfo().data());
- OnEndpointFound(client, std::make_shared<BleEndpoint>(BleEndpoint{
- {advertisement.GetEndpointId(),
- advertisement.GetEndpointInfo(), service_id,
- proto::connections::Medium::BLE,
- advertisement.GetWebRtcState()},
- peripheral,
- }));
+ // Report the discovered endpoint to the client.
+ NEARBY_LOGS(INFO)
+ << "Invoking BasePcpHandler::OnEndpointFound() for Ble service="
+ << service_id << "; id=" << advertisement.GetEndpointId()
+ << "; name="
+ << absl::BytesToHexString(advertisement.GetEndpointInfo().data());
+ OnEndpointFound(
+ client,
+ std::make_shared<BleEndpoint>(BleEndpoint{
+ {advertisement.GetEndpointId(), advertisement.GetEndpointInfo(),
+ service_id, proto::connections::Medium::BLE,
+ advertisement.GetWebRtcState()},
+ peripheral,
+ }));
- // Make sure we can connect to this device via Classic Bluetooth.
- std::string remote_bluetooth_mac_address =
- advertisement.GetBluetoothMacAddress();
- if (remote_bluetooth_mac_address.empty()) {
- NEARBY_LOGS(INFO)
- << "No Bluetooth Classic MAC address found in advertisement";
- return;
- }
+ // Make sure we can connect to this device via Classic Bluetooth.
+ std::string remote_bluetooth_mac_address =
+ advertisement.GetBluetoothMacAddress();
+ if (remote_bluetooth_mac_address.empty()) {
+ NEARBY_LOGS(INFO)
+ << "No Bluetooth Classic MAC address found in advertisement";
+ return;
+ }
- BluetoothDevice remote_bluetooth_device =
- bluetooth_medium_.GetRemoteDevice(remote_bluetooth_mac_address);
- if (!remote_bluetooth_device.IsValid()) {
- NEARBY_LOGS(INFO) << "A valid Bluetooth device could not be derived from "
- "the MAC address "
- << remote_bluetooth_mac_address;
- return;
- }
+ BluetoothDevice remote_bluetooth_device =
+ bluetooth_medium_.GetRemoteDevice(remote_bluetooth_mac_address);
+ if (!remote_bluetooth_device.IsValid()) {
+ NEARBY_LOGS(INFO)
+ << "A valid Bluetooth device could not be derived from "
+ "the MAC address "
+ << remote_bluetooth_mac_address;
+ return;
+ }
- OnEndpointFound(client,
- std::make_shared<BluetoothEndpoint>(BluetoothEndpoint{
- {
- advertisement.GetEndpointId(),
- advertisement.GetEndpointInfo(),
- service_id,
- proto::connections::Medium::BLUETOOTH,
- advertisement.GetWebRtcState(),
- },
- remote_bluetooth_device,
- }));
- });
+ OnEndpointFound(client,
+ std::make_shared<BluetoothEndpoint>(BluetoothEndpoint{
+ {
+ advertisement.GetEndpointId(),
+ advertisement.GetEndpointInfo(),
+ service_id,
+ proto::connections::Medium::BLUETOOTH,
+ advertisement.GetWebRtcState(),
+ },
+ remote_bluetooth_device,
+ }));
+ });
}
void P2pClusterPcpHandler::BlePeripheralLostHandler(
@@ -486,38 +504,40 @@
std::string peripheral_name = peripheral.GetName();
NEARBY_LOG(INFO, "Ble: [LOST, SCHED] peripheral_name=%s",
peripheral_name.c_str());
- RunOnPcpHandlerThread([this, client, service_id,
- &peripheral]() RUN_ON_PCP_HANDLER_THREAD() {
- // Make sure we are still discovering before proceeding.
- if (!client->IsDiscovering()) {
- NEARBY_LOG(INFO,
- "Ble scanning handler (LOST) [client=%p, service_id=%s]: not "
- "in scanning mode",
- client, service_id.c_str());
- return;
- }
+ RunOnPcpHandlerThread(
+ "p2p-ble-device-lost",
+ [this, client, service_id, &peripheral]() RUN_ON_PCP_HANDLER_THREAD() {
+ // Make sure we are still discovering before proceeding.
+ if (!client->IsDiscovering()) {
+ NEARBY_LOG(
+ INFO,
+ "Ble scanning handler (LOST) [client=%p, service_id=%s]: not "
+ "in scanning mode",
+ client, service_id.c_str());
+ return;
+ }
- // Remove this BlePeripheral from found_ble_endpoints_, and
- // report the endpoint as lost to the client.
- auto item = found_ble_endpoints_.find(peripheral.GetName());
- if (item != found_ble_endpoints_.end()) {
- BleEndpointState ble_endpoint_state(item->second);
- found_ble_endpoints_.erase(item);
+ // Remove this BlePeripheral from found_ble_endpoints_, and
+ // report the endpoint as lost to the client.
+ auto item = found_ble_endpoints_.find(peripheral.GetName());
+ if (item != found_ble_endpoints_.end()) {
+ BleEndpointState ble_endpoint_state(item->second);
+ found_ble_endpoints_.erase(item);
- // Report the discovered endpoint to the client.
- NEARBY_LOG(INFO,
- "Ble scanning handler (LOST) [client=%p, "
- "service_id=%s]: report to client",
- client, service_id.c_str());
- OnEndpointLost(client, DiscoveredEndpoint{
- ble_endpoint_state.endpoint_id,
- ble_endpoint_state.endpoint_info,
- service_id,
- proto::connections::Medium::BLE,
- WebRtcState::kUndefined,
- });
- }
- });
+ // Report the discovered endpoint to the client.
+ NEARBY_LOG(INFO,
+ "Ble scanning handler (LOST) [client=%p, "
+ "service_id=%s]: report to client",
+ client, service_id.c_str());
+ OnEndpointLost(client, DiscoveredEndpoint{
+ ble_endpoint_state.endpoint_id,
+ ble_endpoint_state.endpoint_info,
+ service_id,
+ proto::connections::Medium::BLE,
+ WebRtcState::kUndefined,
+ });
+ }
+ });
}
bool P2pClusterPcpHandler::IsRecognizedWifiLanEndpoint(
@@ -557,43 +577,46 @@
void P2pClusterPcpHandler::WifiLanServiceDiscoveredHandler(
ClientProxy* client, WifiLanService& wifi_lan_service,
const std::string& service_id) {
- RunOnPcpHandlerThread([this, client, service_id,
- &wifi_lan_service]() RUN_ON_PCP_HANDLER_THREAD() {
- // Make sure we are still discovering before proceeding.
- if (!client->IsDiscovering()) {
- NEARBY_LOG(
- INFO,
- "WifiLan discovery handler (FOUND) [client=%p, service=%s]: not "
- "in discovery mode",
- client, service_id.c_str());
- return;
- }
+ RunOnPcpHandlerThread(
+ "p2p-wifi-service-discovered",
+ [this, client, service_id,
+ &wifi_lan_service]() RUN_ON_PCP_HANDLER_THREAD() {
+ // Make sure we are still discovering before proceeding.
+ if (!client->IsDiscovering()) {
+ NEARBY_LOG(
+ INFO,
+ "WifiLan discovery handler (FOUND) [client=%p, service=%s]: not "
+ "in discovery mode",
+ client, service_id.c_str());
+ return;
+ }
- // Parse the WifiLanServiceInfo.
- WifiLanServiceInfo service_info(wifi_lan_service.GetServiceInfo());
+ // Parse the WifiLanServiceInfo.
+ WifiLanServiceInfo service_info(wifi_lan_service.GetServiceInfo());
- // Make sure the WifiLan service name points to a valid
- // endpoint we're discovering.
- if (!IsRecognizedWifiLanEndpoint(service_id, service_info)) return;
+ // Make sure the WifiLan service name points to a valid
+ // endpoint we're discovering.
+ if (!IsRecognizedWifiLanEndpoint(service_id, service_info)) return;
- // Report the discovered endpoint to the client.
- NEARBY_LOG(
- INFO,
- "Invoking BasePcpHandler::OnEndpointFound() for WifiLan "
- "service_id=%s; endpoint_id=%s; endpoint_info=%s",
- service_id.c_str(), service_info.GetEndpointId().c_str(),
- absl::BytesToHexString(service_info.GetEndpointInfo().data()).c_str());
- OnEndpointFound(client, std::make_shared<WifiLanEndpoint>(WifiLanEndpoint{
- {
- service_info.GetEndpointId(),
- service_info.GetEndpointInfo(),
- service_id,
- proto::connections::Medium::WIFI_LAN,
- service_info.GetWebRtcState(),
- },
- wifi_lan_service,
- }));
- });
+ // Report the discovered endpoint to the client.
+ NEARBY_LOG(INFO,
+ "Invoking BasePcpHandler::OnEndpointFound() for WifiLan "
+ "service_id=%s; endpoint_id=%s; endpoint_info=%s",
+ service_id.c_str(), service_info.GetEndpointId().c_str(),
+ absl::BytesToHexString(service_info.GetEndpointInfo().data())
+ .c_str());
+ OnEndpointFound(client,
+ std::make_shared<WifiLanEndpoint>(WifiLanEndpoint{
+ {
+ service_info.GetEndpointId(),
+ service_info.GetEndpointInfo(),
+ service_id,
+ proto::connections::Medium::WIFI_LAN,
+ service_info.GetWebRtcState(),
+ },
+ wifi_lan_service,
+ }));
+ });
}
void P2pClusterPcpHandler::WifiLanServiceLostHandler(
@@ -603,39 +626,41 @@
NEARBY_LOG(INFO,
"WifiLan: [LOST, SCHED] wifi_lan_service=%p, service_info_name=%s",
&wifi_lan_service, nsd_service_info.GetServiceInfoName().c_str());
- RunOnPcpHandlerThread([this, client, service_id,
- nsd_service_info]() RUN_ON_PCP_HANDLER_THREAD() {
- // Make sure we are still discovering before proceeding.
- if (!client->IsDiscovering()) {
- NEARBY_LOG(
- INFO,
- "WifiLan discovery handler (LOST) [client=%p, service=%s]: not "
- "in discovery mode",
- client, service_id.c_str());
- return;
- }
+ RunOnPcpHandlerThread(
+ "p2p-wifi-service-lost",
+ [this, client, service_id,
+ nsd_service_info]() RUN_ON_PCP_HANDLER_THREAD() {
+ // Make sure we are still discovering before proceeding.
+ if (!client->IsDiscovering()) {
+ NEARBY_LOG(
+ INFO,
+ "WifiLan discovery handler (LOST) [client=%p, service=%s]: not "
+ "in discovery mode",
+ client, service_id.c_str());
+ return;
+ }
- // Parse the WifiLanServiceInfo.
- WifiLanServiceInfo service_info(nsd_service_info);
+ // Parse the WifiLanServiceInfo.
+ WifiLanServiceInfo service_info(nsd_service_info);
- // Make sure the WifiLan service name points to a valid
- // endpoint we're discovering.
- if (!IsRecognizedWifiLanEndpoint(service_id, service_info)) return;
+ // Make sure the WifiLan service name points to a valid
+ // endpoint we're discovering.
+ if (!IsRecognizedWifiLanEndpoint(service_id, service_info)) return;
- // Report the discovered endpoint to the client.
- NEARBY_LOG(
- INFO,
- "WifiLan discovery handler (LOST) [client=%p, service_id=%s]: report "
- "to client",
- client, service_id.c_str());
- OnEndpointLost(client, DiscoveredEndpoint{
- service_info.GetEndpointId(),
- service_info.GetEndpointInfo(),
- service_id,
- proto::connections::Medium::WIFI_LAN,
- WebRtcState::kUndefined,
- });
- });
+ // Report the discovered endpoint to the client.
+ NEARBY_LOG(INFO,
+ "WifiLan discovery handler (LOST) [client=%p, "
+ "service_id=%s]: report "
+ "to client",
+ client, service_id.c_str());
+ OnEndpointLost(client, DiscoveredEndpoint{
+ service_info.GetEndpointId(),
+ service_info.GetEndpointInfo(),
+ service_id,
+ proto::connections::Medium::WIFI_LAN,
+ WebRtcState::kUndefined,
+ });
+ });
}
BasePcpHandler::StartOperationResult P2pClusterPcpHandler::StartDiscoveryImpl(
@@ -830,6 +855,7 @@
return;
}
RunOnPcpHandlerThread(
+ "p2p-bt-on-incoming-connection",
[this, client, local_endpoint_info,
socket =
std::move(socket)]() RUN_ON_PCP_HANDLER_THREAD() mutable {
@@ -959,6 +985,7 @@
return;
}
RunOnPcpHandlerThread(
+ "p2p-ble-on-incoming-connection",
[this, client, local_endpoint_info, service_id,
socket = std::move(socket)]()
RUN_ON_PCP_HANDLER_THREAD() mutable {
@@ -999,6 +1026,7 @@
return;
}
RunOnPcpHandlerThread(
+ "p2p-bt-on-incoming-connection",
[this, client, local_endpoint_info,
socket = std::move(socket)]()
RUN_ON_PCP_HANDLER_THREAD() mutable {
@@ -1143,6 +1171,7 @@
return;
}
RunOnPcpHandlerThread(
+ "p2p-wifi-on-incoming-connection",
[this, client, local_endpoint_info,
socket = std::move(socket)]()
RUN_ON_PCP_HANDLER_THREAD() mutable {
@@ -1272,6 +1301,7 @@
}
RunOnPcpHandlerThread(
+ "p2p-rtc-on-incoming-connection",
[this, client,
socket = std::move(socket)]() RUN_ON_PCP_HANDLER_THREAD() {
std::string remote_device_name = "WebRtcSocket";
diff --git a/cpp/core/internal/payload_manager.cc b/cpp/core/internal/payload_manager.cc
index 29b12e0..9f28697 100644
--- a/cpp/core/internal/payload_manager.cc
+++ b/cpp/core/internal/payload_manager.cc
@@ -281,6 +281,7 @@
CountDownLatch stop_latch(1);
// Clear our tracked pending payloads.
RunOnStatusUpdateThread(
+ "~payload-manager",
[this, &stop_latch]() RUN_ON_PAYLOAD_STATUS_UPDATE_THREAD() {
NEARBY_LOG(INFO, "PayloadManager: stop tracking payloads; self=%p",
this);
@@ -334,7 +335,8 @@
Payload::Type payload_type = payload.GetType();
Payload::Id payload_id =
CreateOutgoingPayload(std::move(payload), endpoint_ids);
- executor->Execute([this, client, endpoint_ids, payload_id, payload_type]() {
+ executor->Execute("send-payload", [this, client, endpoint_ids, payload_id,
+ payload_type]() {
if (shutdown_.Get()) return;
PendingPayload* pending_payload = GetPayload(payload_id);
if (!pending_payload) {
@@ -355,7 +357,8 @@
should_continue = SendPayloadLoop(client, *pending_payload,
payload_header, next_chunk_offset);
}
- RunOnStatusUpdateThread([this, payload_id]()
+ RunOnStatusUpdateThread("destroy-payload",
+ [this, payload_id]()
RUN_ON_PAYLOAD_STATUS_UPDATE_THREAD() {
DestroyPendingPayload(payload_id);
});
@@ -430,6 +433,7 @@
return;
}
RunOnStatusUpdateThread(
+ "payload-manager-on-disconnect",
[this, client, endpoint_id, barrier]()
RUN_ON_PAYLOAD_STATUS_UPDATE_THREAD() mutable {
// Iterate through all our payloads and look for payloads associated
@@ -586,62 +590,65 @@
const PayloadTransferFrame::PayloadHeader& payload_header,
std::int64_t num_bytes_successfully_transferred,
proto::connections::PayloadStatus status) {
- RunOnStatusUpdateThread([this, client, finished_endpoint_ids, payload_header,
- num_bytes_successfully_transferred,
- status]() RUN_ON_PAYLOAD_STATUS_UPDATE_THREAD() {
- // Make sure we're still tracking this payload.
- PendingPayload* pending_payload = GetPayload(payload_header.id());
- if (!pending_payload) {
- return;
- }
+ RunOnStatusUpdateThread(
+ "outgoing-payload-callbacks",
+ [this, client, finished_endpoint_ids, payload_header,
+ num_bytes_successfully_transferred,
+ status]() RUN_ON_PAYLOAD_STATUS_UPDATE_THREAD() {
+ // Make sure we're still tracking this payload.
+ PendingPayload* pending_payload = GetPayload(payload_header.id());
+ if (!pending_payload) {
+ return;
+ }
- PayloadProgressInfo update{
- payload_header.id(),
- PayloadManager::PayloadStatusToTransferUpdateStatus(status),
- payload_header.total_size(), num_bytes_successfully_transferred};
- for (const auto& endpoint_id : finished_endpoint_ids) {
- // Skip sending notifications if we have stopped tracking this
- // endpoint.
- if (!pending_payload->GetEndpoint(endpoint_id)) {
- continue;
- }
+ PayloadProgressInfo update{
+ payload_header.id(),
+ PayloadManager::PayloadStatusToTransferUpdateStatus(status),
+ payload_header.total_size(), num_bytes_successfully_transferred};
+ for (const auto& endpoint_id : finished_endpoint_ids) {
+ // Skip sending notifications if we have stopped tracking this
+ // endpoint.
+ if (!pending_payload->GetEndpoint(endpoint_id)) {
+ continue;
+ }
- // Notify the client.
- client->OnPayloadProgress(endpoint_id, update);
- }
+ // Notify the client.
+ client->OnPayloadProgress(endpoint_id, update);
+ }
- // Remove these endpoints from our tracking list for this payload.
- pending_payload->RemoveEndpoints(finished_endpoint_ids);
+ // Remove these endpoints from our tracking list for this payload.
+ pending_payload->RemoveEndpoints(finished_endpoint_ids);
- // Close the payload if no endpoints remain.
- if (pending_payload->GetEndpoints().empty()) {
- pending_payload->Close();
- }
- });
+ // Close the payload if no endpoints remain.
+ if (pending_payload->GetEndpoints().empty()) {
+ pending_payload->Close();
+ }
+ });
}
void PayloadManager::SendClientCallbacksForFinishedIncomingPayload(
ClientProxy* client, const std::string& endpoint_id,
const PayloadTransferFrame::PayloadHeader& payload_header,
std::int64_t offset_bytes, proto::connections::PayloadStatus status) {
- RunOnStatusUpdateThread([this, client, endpoint_id, payload_header,
- offset_bytes,
- status]() RUN_ON_PAYLOAD_STATUS_UPDATE_THREAD() {
- // Make sure we're still tracking this payload.
- PendingPayload* pending_payload = GetPayload(payload_header.id());
- if (!pending_payload) {
- return;
- }
+ RunOnStatusUpdateThread(
+ "incoming-payload-callbacks",
+ [this, client, endpoint_id, payload_header, offset_bytes,
+ status]() RUN_ON_PAYLOAD_STATUS_UPDATE_THREAD() {
+ // Make sure we're still tracking this payload.
+ PendingPayload* pending_payload = GetPayload(payload_header.id());
+ if (!pending_payload) {
+ return;
+ }
- // Unless we never started tracking this payload (meaning we failed to
- // even create the InternalPayload), notify the client (and close it).
- PayloadProgressInfo update{
- payload_header.id(),
- PayloadManager::PayloadStatusToTransferUpdateStatus(status),
- payload_header.total_size(), offset_bytes};
- NotifyClientOfIncomingPayloadProgressInfo(client, endpoint_id, update);
- DestroyPendingPayload(payload_header.id());
- });
+ // Unless we never started tracking this payload (meaning we failed to
+ // even create the InternalPayload), notify the client (and close it).
+ PayloadProgressInfo update{
+ payload_header.id(),
+ PayloadManager::PayloadStatusToTransferUpdateStatus(status),
+ payload_header.total_size(), offset_bytes};
+ NotifyClientOfIncomingPayloadProgressInfo(client, endpoint_id, update);
+ DestroyPendingPayload(payload_header.id());
+ });
}
void PayloadManager::SendControlMessage(
@@ -734,6 +741,7 @@
std::int32_t payload_chunk_flags, std::int64_t payload_chunk_offset,
std::int64_t payload_chunk_body_size) {
RunOnStatusUpdateThread(
+ "outgoing-chunk-success",
[this, client, endpoint_id, payload_header, payload_chunk_flags,
payload_chunk_offset,
payload_chunk_body_size]() RUN_ON_PAYLOAD_STATUS_UPDATE_THREAD() {
@@ -799,6 +807,7 @@
std::int32_t payload_chunk_flags, std::int64_t payload_chunk_offset,
std::int64_t payload_chunk_body_size) {
RunOnStatusUpdateThread(
+ "incoming-chunk-success",
[this, client, endpoint_id, payload_header, payload_chunk_flags,
payload_chunk_offset,
payload_chunk_body_size]() RUN_ON_PAYLOAD_STATUS_UPDATE_THREAD() {
@@ -858,6 +867,7 @@
// Also, let the client know of this new incoming payload.
RunOnStatusUpdateThread(
+ "process-data-packet",
[to_client, from_endpoint_id, pending_payload]()
RUN_ON_PAYLOAD_STATUS_UPDATE_THREAD() {
NEARBY_LOG(INFO,
@@ -1128,8 +1138,9 @@
return close_event_.Await(absl::ZeroDuration()).result();
}
-void PayloadManager::RunOnStatusUpdateThread(std::function<void()> runnable) {
- payload_status_update_executor_.Execute(std::move(runnable));
+void PayloadManager::RunOnStatusUpdateThread(const std::string& name,
+ std::function<void()> runnable) {
+ payload_status_update_executor_.Execute(name, std::move(runnable));
}
/////////////////////////////// PendingPayloads ///////////////////////////////
diff --git a/cpp/core/internal/payload_manager.h b/cpp/core/internal/payload_manager.h
index 9c18756..9fb1630 100644
--- a/cpp/core/internal/payload_manager.h
+++ b/cpp/core/internal/payload_manager.h
@@ -277,7 +277,8 @@
SingleThreadExecutor* GetOutgoingPayloadExecutor(Payload::Type payload_type);
- void RunOnStatusUpdateThread(std::function<void()> runnable);
+ void RunOnStatusUpdateThread(const std::string& name,
+ std::function<void()> runnable);
bool NotifyShutdown() ABSL_LOCKS_EXCLUDED(mutex_);
void DestroyPendingPayload(Payload::Id payload_id)
ABSL_LOCKS_EXCLUDED(mutex_);
diff --git a/cpp/core/internal/service_controller_router.cc b/cpp/core/internal/service_controller_router.cc
index fd98901..70a368f 100644
--- a/cpp/core/internal/service_controller_router.cc
+++ b/cpp/core/internal/service_controller_router.cc
@@ -67,28 +67,31 @@
ClientProxy* client, absl::string_view service_id,
const ConnectionOptions& options, const ConnectionRequestInfo& info,
const ResultCallback& callback) {
- RouteToServiceController([this, client, service_id = std::string(service_id),
- options, info, callback]() {
- Status status = AcquireServiceControllerForClient(client, options.strategy);
- if (!status.Ok()) {
- callback.result_cb(status);
- return;
- }
+ RouteToServiceController(
+ "scr-start-advertising",
+ [this, client, service_id = std::string(service_id), options, info,
+ callback]() {
+ Status status =
+ AcquireServiceControllerForClient(client, options.strategy);
+ if (!status.Ok()) {
+ callback.result_cb(status);
+ return;
+ }
- if (client->IsAdvertising()) {
- callback.result_cb({Status::kAlreadyAdvertising});
- return;
- }
+ if (client->IsAdvertising()) {
+ callback.result_cb({Status::kAlreadyAdvertising});
+ return;
+ }
- status = service_controller_->StartAdvertising(client, service_id, options,
- info);
- callback.result_cb(status);
- });
+ status = service_controller_->StartAdvertising(client, service_id,
+ options, info);
+ callback.result_cb(status);
+ });
}
void ServiceControllerRouter::StopAdvertising(ClientProxy* client,
const ResultCallback& callback) {
- RouteToServiceController([this, client, callback]() {
+ RouteToServiceController("scr-stop-advertising", [this, client, callback]() {
if (ClientHasAcquiredServiceController(client) && client->IsAdvertising()) {
service_controller_->StopAdvertising(client);
}
@@ -101,8 +104,11 @@
const ConnectionOptions& options,
const DiscoveryListener& listener,
const ResultCallback& callback) {
- RouteToServiceController([this, client, service_id = std::string(service_id),
- options, listener, callback]() {
+ RouteToServiceController("scr-start-discovery", [this, client,
+ service_id =
+ std::string(service_id),
+ options, listener,
+ callback]() {
Status status = AcquireServiceControllerForClient(client, options.strategy);
if (!status.Ok()) {
callback.result_cb(status);
@@ -122,7 +128,7 @@
void ServiceControllerRouter::StopDiscovery(ClientProxy* client,
const ResultCallback& callback) {
- RouteToServiceController([this, client, callback]() {
+ RouteToServiceController("scr-stop-discovery", [this, client, callback]() {
if (ClientHasAcquiredServiceController(client) && client->IsDiscovering()) {
service_controller_->StopDiscovery(client);
}
@@ -134,8 +140,10 @@
ClientProxy* client, absl::string_view service_id,
const OutOfBandConnectionMetadata& metadata,
const ResultCallback& callback) {
- RouteToServiceController([this, client, service_id = std::string(service_id),
- metadata, callback]() {
+ RouteToServiceController("scr-inject-endpoint", [this, client,
+ service_id =
+ std::string(service_id),
+ metadata, callback]() {
// Currently, Bluetooth is the only supported medium for endpoint injection.
if (metadata.medium != Medium::BLUETOOTH ||
metadata.remote_bluetooth_mac_address.size() != kMacAddressLength) {
@@ -173,36 +181,38 @@
// CancellationListener as soon as possible.
client->AddCancellationFlag(std::string(endpoint_id));
- RouteToServiceController([this, client,
- endpoint_id = std::string(endpoint_id), info,
- options, callback]() {
- if (!ClientHasAcquiredServiceController(client)) {
- callback.result_cb({Status::kOutOfOrderApiCall});
- return;
- }
+ RouteToServiceController(
+ "scr-request-connection",
+ [this, client, endpoint_id = std::string(endpoint_id), info, options,
+ callback]() {
+ if (!ClientHasAcquiredServiceController(client)) {
+ callback.result_cb({Status::kOutOfOrderApiCall});
+ return;
+ }
- if (client->HasPendingConnectionToEndpoint(endpoint_id) ||
- client->IsConnectedToEndpoint(endpoint_id)) {
- callback.result_cb({Status::kAlreadyConnectedToEndpoint});
- return;
- }
+ if (client->HasPendingConnectionToEndpoint(endpoint_id) ||
+ client->IsConnectedToEndpoint(endpoint_id)) {
+ callback.result_cb({Status::kAlreadyConnectedToEndpoint});
+ return;
+ }
- Status status = service_controller_->RequestConnection(client, endpoint_id,
- info, options);
- if (!status.Ok()) {
- client->CancelEndpoint(endpoint_id);
- }
- callback.result_cb(status);
- });
+ Status status = service_controller_->RequestConnection(
+ client, endpoint_id, info, options);
+ if (!status.Ok()) {
+ client->CancelEndpoint(endpoint_id);
+ }
+ callback.result_cb(status);
+ });
}
void ServiceControllerRouter::AcceptConnection(ClientProxy* client,
absl::string_view endpoint_id,
const PayloadListener& listener,
const ResultCallback& callback) {
- RouteToServiceController([this, client,
- endpoint_id = std::string(endpoint_id), listener,
- callback]() {
+ RouteToServiceController("scr-accept-connection", [this, client,
+ endpoint_id = std::string(
+ endpoint_id),
+ listener, callback]() {
if (!ClientHasAcquiredServiceController(client)) {
callback.result_cb({Status::kOutOfOrderApiCall});
return;
@@ -234,6 +244,7 @@
client->CancelEndpoint(std::string(endpoint_id));
RouteToServiceController(
+ "scr-reject-connection",
[this, client, endpoint_id = std::string(endpoint_id), callback]() {
if (!ClientHasAcquiredServiceController(client)) {
callback.result_cb({Status::kOutOfOrderApiCall});
@@ -264,6 +275,7 @@
ClientProxy* client, absl::string_view endpoint_id,
const ResultCallback& callback) {
RouteToServiceController(
+ "scr-init-bwu",
[this, client, endpoint_id = std::string(endpoint_id), callback]() {
if (!ClientHasAcquiredServiceController(client) ||
!client->IsConnectedToEndpoint(endpoint_id)) {
@@ -293,6 +305,7 @@
std::vector<std::string>(endpoint_ids.begin(), endpoint_ids.end());
RouteToServiceController(
+ "scr-send-payload",
[this, client, shared_payload, endpoints, callback]() {
if (!ClientHasAcquiredServiceController(client)) {
callback.result_cb({Status::kOutOfOrderApiCall});
@@ -318,7 +331,8 @@
void ServiceControllerRouter::CancelPayload(ClientProxy* client,
std::uint64_t payload_id,
const ResultCallback& callback) {
- RouteToServiceController([this, client, payload_id, callback]() {
+ RouteToServiceController("scr-cancel-payload", [this, client, payload_id,
+ callback]() {
if (!ClientHasAcquiredServiceController(client)) {
callback.result_cb({Status::kOutOfOrderApiCall});
return;
@@ -336,6 +350,7 @@
client->CancelEndpoint(std::string(endpoint_id));
RouteToServiceController(
+ "scr-disconnect-endpoint",
[this, client, endpoint_id = std::string(endpoint_id), callback]() {
if (ClientHasAcquiredServiceController(client)) {
if (!client->IsConnectedToEndpoint(endpoint_id) &&
@@ -355,11 +370,12 @@
// without further posting it.
client->CancelAllEndpoints();
- RouteToServiceController([this, client, callback]() {
+ RouteToServiceController("scr-stop-all-endpoints", [this, client,
+ callback]() {
NEARBY_LOGS(INFO) << "Client " << client->GetClientId()
<< " has requested us to stop all endpoints. We will now "
"reset the client.";
- if (ClientHasAcquiredServiceController(client)) {
+ if (ClientHasAcquiredServiceController(client)) {
DoneWithStrategySessionForClient(client);
}
callback.result_cb({Status::kSuccess});
@@ -372,7 +388,8 @@
// without further posting it.
client->CancelAllEndpoints();
- RouteToServiceController([this, client, callback]() {
+ RouteToServiceController("scr-client-disconnecting", [this, client,
+ callback]() {
if (ClientHasAcquiredServiceController(client)) {
DoneWithStrategySessionForClient(client);
NEARBY_LOGS(INFO) << "Client " << client->GetClientId()
@@ -465,8 +482,9 @@
ReleaseServiceControllerForClient(client);
}
-void ServiceControllerRouter::RouteToServiceController(Runnable runnable) {
- serializer_.Execute(std::move(runnable));
+void ServiceControllerRouter::RouteToServiceController(const std::string& name,
+ Runnable runnable) {
+ serializer_.Execute(name, std::move(runnable));
}
bool ServiceControllerRouter::ClientHasConnectionToAtLeastOneEndpoint(
diff --git a/cpp/core/internal/service_controller_router.h b/cpp/core/internal/service_controller_router.h
index 7c28246..62f070f 100644
--- a/cpp/core/internal/service_controller_router.h
+++ b/cpp/core/internal/service_controller_router.h
@@ -106,7 +106,7 @@
static bool ClientHasConnectionToAtLeastOneEndpoint(
ClientProxy* client, const std::vector<std::string>& remote_endpoint_ids);
- void RouteToServiceController(Runnable runnable);
+ void RouteToServiceController(const std::string& name, Runnable runnable);
Status AcquireServiceControllerForClient(ClientProxy* client,
Strategy strategy);
diff --git a/cpp/platform/public/BUILD b/cpp/platform/public/BUILD
index 459584d..58dde04 100644
--- a/cpp/platform/public/BUILD
+++ b/cpp/platform/public/BUILD
@@ -32,9 +32,11 @@
"future.h",
"lockable.h",
"logging.h",
+ "monitored_runnable.h",
"multi_thread_executor.h",
"mutex.h",
"mutex_lock.h",
+ "pending_job_registry.h",
"pipe.h",
"scheduled_executor.h",
"settable_future.h",
diff --git a/cpp/platform/public/monitored_runnable.h b/cpp/platform/public/monitored_runnable.h
new file mode 100644
index 0000000..63e3475
--- /dev/null
+++ b/cpp/platform/public/monitored_runnable.h
@@ -0,0 +1,71 @@
+// Copyright 2020 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef PLATFORM_PUBLIC_MONITORED_RUNNABLE_H_
+#define PLATFORM_PUBLIC_MONITORED_RUNNABLE_H_
+
+#include <utility>
+
+#include "platform/base/runnable.h"
+#include "platform/public/logging.h"
+#include "platform/public/pending_job_registry.h"
+#include "platform/public/system_clock.h"
+#include "absl/time/time.h"
+
+namespace location {
+namespace nearby {
+
+// A runnable with extra logging
+// We log if the task has been waiting long on the executor or if it was running
+// for a long time. The latter isn't always an issue - some tasks are expected
+// to run for longer periods of time (minutes).
+class MonitoredRunnable {
+ public:
+ explicit MonitoredRunnable(Runnable&& runnable) : runnable_{runnable} {}
+ MonitoredRunnable(const std::string& name, Runnable&& runnable)
+ : name_{name}, runnable_{runnable} {
+ PendingJobRegistry::GetInstance().AddPendingJob(name_, post_time_);
+ }
+
+ void operator()() const {
+ auto start_time = SystemClock::ElapsedRealtime();
+ auto start_delay = start_time - post_time_;
+ if (start_delay >= kMinReportedStartDelay) {
+ NEARBY_LOGS(INFO) << "Task: \"" << name_ << "\" started after "
+ << absl::ToInt64Seconds(start_delay) << " seconds";
+ }
+ PendingJobRegistry::GetInstance().RemovePendingJob(name_, post_time_);
+ PendingJobRegistry::GetInstance().AddRunningJob(name_, post_time_);
+ runnable_();
+ auto task_duration = SystemClock::ElapsedRealtime() - start_time;
+ if (task_duration >= kMinReportedTaskDuration) {
+ NEARBY_LOGS(INFO) << "Task: \"" << name_ << "\" finished after "
+ << absl::ToInt64Seconds(task_duration) << " seconds";
+ }
+ PendingJobRegistry::GetInstance().RemoveRunningJob(name_, post_time_);
+ PendingJobRegistry::GetInstance().ListJobs();
+ }
+
+ private:
+ static constexpr absl::Duration kMinReportedStartDelay = absl::Seconds(5);
+ static constexpr absl::Duration kMinReportedTaskDuration = absl::Seconds(10);
+ const std::string name_;
+ Runnable runnable_;
+ absl::Time post_time_ = SystemClock::ElapsedRealtime();
+};
+
+} // namespace nearby
+} // namespace location
+
+#endif // PLATFORM_PUBLIC_MONITORED_RUNNABLE_H_
diff --git a/cpp/platform/public/pending_job_registry.h b/cpp/platform/public/pending_job_registry.h
new file mode 100644
index 0000000..1cb79cb
--- /dev/null
+++ b/cpp/platform/public/pending_job_registry.h
@@ -0,0 +1,102 @@
+// Copyright 2020 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef PLATFORM_PUBLIC_PENDING_JOB_REGISTRY_H_
+#define PLATFORM_PUBLIC_PENDING_JOB_REGISTRY_H_
+
+#include "platform/public/logging.h"
+#include "platform/public/mutex.h"
+#include "platform/public/mutex_lock.h"
+#include "platform/public/system_clock.h"
+#include "absl/base/thread_annotations.h"
+#include "absl/time/time.h"
+
+namespace location {
+namespace nearby {
+
+// A global registry of running tasks. The goal is to help us monitor
+// tasks that are either waiting too long for their turn or they never finish
+class PendingJobRegistry {
+ public:
+ static PendingJobRegistry& GetInstance() {
+ static PendingJobRegistry* instance = new PendingJobRegistry();
+ return *instance;
+ }
+
+ void AddPendingJob(const std::string& name, absl::Time post_time) {
+ MutexLock lock(&mutex_);
+ pending_jobs_.emplace(CreateKey(name, post_time), post_time);
+ }
+
+ void RemovePendingJob(const std::string& name, absl::Time post_time) {
+ MutexLock lock(&mutex_);
+ pending_jobs_.erase(CreateKey(name, post_time));
+ }
+
+ void AddRunningJob(const std::string& name, absl::Time post_time) {
+ MutexLock lock(&mutex_);
+ running_jobs_.emplace(CreateKey(name, post_time),
+ SystemClock::ElapsedRealtime());
+ }
+
+ void RemoveRunningJob(const std::string& name, absl::Time post_time) {
+ MutexLock lock(&mutex_);
+ running_jobs_.erase(CreateKey(name, post_time));
+ }
+
+ void ListJobs() {
+ auto current_time = SystemClock::ElapsedRealtime();
+ if (current_time - list_jobs_time_ < kMinReportInterval) return;
+ MutexLock lock(&mutex_);
+ for (auto& job : pending_jobs_) {
+ auto age = current_time - job.second;
+ if (age >= kReportPendingJobsOlderThan) {
+ NEARBY_LOGS(INFO) << "Task \"" << job.first << "\" is waiting for "
+ << absl::ToInt64Seconds(age) << " s";
+ }
+ }
+ for (auto& job : running_jobs_) {
+ auto age = current_time - job.second;
+ if (age >= kReportRunningJobsOlderThan) {
+ NEARBY_LOGS(INFO) << "Task \"" << job.first << "\" is running for "
+ << absl::ToInt64Seconds(age) << " s";
+ }
+ }
+ list_jobs_time_ = current_time;
+ }
+
+ private:
+ PendingJobRegistry() = default;
+ static constexpr absl::Duration kMinReportInterval = absl::Seconds(60);
+ static constexpr absl::Duration kReportPendingJobsOlderThan =
+ absl::Seconds(40);
+ static constexpr absl::Duration kReportRunningJobsOlderThan =
+ absl::Seconds(60);
+
+ std::string CreateKey(const std::string& name, absl::Time post_time) {
+ return name + "." + std::to_string(absl::ToUnixNanos(post_time));
+ }
+
+ Mutex mutex_;
+ absl::flat_hash_map<const std::string, absl::Time> pending_jobs_
+ ABSL_GUARDED_BY(mutex_);
+ absl::flat_hash_map<const std::string, absl::Time> running_jobs_
+ ABSL_GUARDED_BY(mutex_);
+ absl::Time list_jobs_time_ = absl::UnixEpoch();
+};
+
+} // namespace nearby
+} // namespace location
+
+#endif // PLATFORM_PUBLIC_PENDING_JOB_REGISTRY_H_
diff --git a/cpp/platform/public/scheduled_executor.h b/cpp/platform/public/scheduled_executor.h
index bd60388..26dc255 100644
--- a/cpp/platform/public/scheduled_executor.h
+++ b/cpp/platform/public/scheduled_executor.h
@@ -25,6 +25,7 @@
#include "platform/public/cancelable.h"
#include "platform/public/cancellable_task.h"
#include "platform/public/lockable.h"
+#include "platform/public/monitored_runnable.h"
#include "platform/public/mutex.h"
#include "platform/public/mutex_lock.h"
#include "platform/public/thread_check_callable.h"
@@ -59,6 +60,14 @@
}
return *this;
}
+ void Execute(const std::string& name, Runnable&& runnable)
+ ABSL_LOCKS_EXCLUDED(mutex_) {
+ MutexLock lock(&mutex_);
+ if (impl_)
+ impl_->Execute(MonitoredRunnable(
+ name, ThreadCheckRunnable(this, std::move(runnable))));
+ }
+
void Execute(Runnable&& runnable) ABSL_LOCKS_EXCLUDED(mutex_) {
MutexLock lock(&mutex_);
if (impl_) impl_->Execute(ThreadCheckRunnable(this, std::move(runnable)));
diff --git a/cpp/platform/public/single_thread_executor_test.cc b/cpp/platform/public/single_thread_executor_test.cc
index f89773f..e2059ed 100644
--- a/cpp/platform/public/single_thread_executor_test.cc
+++ b/cpp/platform/public/single_thread_executor_test.cc
@@ -16,6 +16,7 @@
#include <atomic>
#include <functional>
+#include <string>
#include "platform/base/exception.h"
#include "gtest/gtest.h"
@@ -47,6 +48,24 @@
EXPECT_TRUE(done);
}
+TEST(SingleThreadExecutorTest, CanExecuteNamedTask) {
+ absl::CondVar cond;
+ std::atomic_bool done = false;
+ SingleThreadExecutor executor;
+ executor.Execute("my task", [&done, &cond]() {
+ done = true;
+ cond.SignalAll();
+ });
+ absl::Mutex mutex;
+ {
+ absl::MutexLock lock(&mutex);
+ if (!done) {
+ cond.WaitWithTimeout(&mutex, absl::Seconds(1));
+ }
+ }
+ EXPECT_TRUE(done);
+}
+
TEST(SingleThreadExecutorTest, JobsExecuteInOrder) {
std::vector<int> results;
SingleThreadExecutor executor;
diff --git a/cpp/platform/public/submittable_executor.h b/cpp/platform/public/submittable_executor.h
index c5c6f3a..8e4e073 100644
--- a/cpp/platform/public/submittable_executor.h
+++ b/cpp/platform/public/submittable_executor.h
@@ -26,6 +26,7 @@
#include "platform/base/runnable.h"
#include "platform/public/future.h"
#include "platform/public/lockable.h"
+#include "platform/public/monitored_runnable.h"
#include "platform/public/mutex.h"
#include "platform/public/mutex_lock.h"
#include "platform/public/thread_check_callable.h"
@@ -57,9 +58,19 @@
}
return *this;
}
+ void Execute(const std::string& name, Runnable&& runnable)
+ ABSL_LOCKS_EXCLUDED(mutex_) {
+ MutexLock lock(&mutex_);
+ if (impl_)
+ impl_->Execute(MonitoredRunnable(
+ name, ThreadCheckRunnable(this, std::move(runnable))));
+ }
+
void Execute(Runnable&& runnable) ABSL_LOCKS_EXCLUDED(mutex_) override {
MutexLock lock(&mutex_);
- if (impl_) impl_->Execute(ThreadCheckRunnable(this, std::move(runnable)));
+ if (impl_)
+ impl_->Execute(
+ MonitoredRunnable(ThreadCheckRunnable(this, std::move(runnable))));
}
int GetTid(int index) const ABSL_LOCKS_EXCLUDED(mutex_) override {