blob: 8e5ad9eceb6ca73dd188105ce7f0e26f0040f15e [file] [log] [blame]
/*
*
* Copyright 2015 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <grpc/support/port_platform.h>
#include "src/core/lib/transport/connectivity_state.h"
#include <string>
#include <type_traits>
#include <grpc/support/log.h>
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/exec_ctx.h"
namespace grpc_core {
TraceFlag grpc_connectivity_state_trace(false, "connectivity_state");
const char* ConnectivityStateName(grpc_connectivity_state state) {
switch (state) {
case GRPC_CHANNEL_IDLE:
return "IDLE";
case GRPC_CHANNEL_CONNECTING:
return "CONNECTING";
case GRPC_CHANNEL_READY:
return "READY";
case GRPC_CHANNEL_TRANSIENT_FAILURE:
return "TRANSIENT_FAILURE";
case GRPC_CHANNEL_SHUTDOWN:
return "SHUTDOWN";
}
GPR_UNREACHABLE_CODE(return "UNKNOWN");
}
//
// AsyncConnectivityStateWatcherInterface
//
// A fire-and-forget class to asynchronously deliver a connectivity
// state notification to a watcher.
class AsyncConnectivityStateWatcherInterface::Notifier {
public:
Notifier(RefCountedPtr<AsyncConnectivityStateWatcherInterface> watcher,
grpc_connectivity_state state, const absl::Status& status,
const std::shared_ptr<WorkSerializer>& work_serializer)
: watcher_(std::move(watcher)), state_(state), status_(status) {
if (work_serializer != nullptr) {
work_serializer->Run(
[this]() { SendNotification(this, GRPC_ERROR_NONE); },
DEBUG_LOCATION);
} else {
GRPC_CLOSURE_INIT(&closure_, SendNotification, this,
grpc_schedule_on_exec_ctx);
ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE);
}
}
private:
static void SendNotification(void* arg, grpc_error_handle /*ignored*/) {
Notifier* self = static_cast<Notifier*>(arg);
if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) {
gpr_log(GPR_INFO, "watcher %p: delivering async notification for %s (%s)",
self->watcher_.get(), ConnectivityStateName(self->state_),
self->status_.ToString().c_str());
}
self->watcher_->OnConnectivityStateChange(self->state_, self->status_);
delete self;
}
RefCountedPtr<AsyncConnectivityStateWatcherInterface> watcher_;
const grpc_connectivity_state state_;
const absl::Status status_;
grpc_closure closure_;
};
void AsyncConnectivityStateWatcherInterface::Notify(
grpc_connectivity_state state, const absl::Status& status) {
new Notifier(Ref(), state, status,
work_serializer_); // Deletes itself when done.
}
//
// ConnectivityStateTracker
//
ConnectivityStateTracker::~ConnectivityStateTracker() {
grpc_connectivity_state current_state =
state_.load(std::memory_order_relaxed);
if (current_state == GRPC_CHANNEL_SHUTDOWN) return;
for (const auto& p : watchers_) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) {
gpr_log(GPR_INFO,
"ConnectivityStateTracker %s[%p]: notifying watcher %p: %s -> %s",
name_, this, p.first, ConnectivityStateName(current_state),
ConnectivityStateName(GRPC_CHANNEL_SHUTDOWN));
}
p.second->Notify(GRPC_CHANNEL_SHUTDOWN, absl::Status());
}
}
void ConnectivityStateTracker::AddWatcher(
grpc_connectivity_state initial_state,
OrphanablePtr<ConnectivityStateWatcherInterface> watcher) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) {
gpr_log(GPR_INFO, "ConnectivityStateTracker %s[%p]: add watcher %p", name_,
this, watcher.get());
}
grpc_connectivity_state current_state =
state_.load(std::memory_order_relaxed);
if (initial_state != current_state) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) {
gpr_log(GPR_INFO,
"ConnectivityStateTracker %s[%p]: notifying watcher %p: %s -> %s",
name_, this, watcher.get(), ConnectivityStateName(initial_state),
ConnectivityStateName(current_state));
}
watcher->Notify(current_state, status_);
}
// If we're in state SHUTDOWN, don't add the watcher, so that it will
// be orphaned immediately.
if (current_state != GRPC_CHANNEL_SHUTDOWN) {
watchers_.insert(std::make_pair(watcher.get(), std::move(watcher)));
}
}
void ConnectivityStateTracker::RemoveWatcher(
ConnectivityStateWatcherInterface* watcher) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) {
gpr_log(GPR_INFO, "ConnectivityStateTracker %s[%p]: remove watcher %p",
name_, this, watcher);
}
watchers_.erase(watcher);
}
void ConnectivityStateTracker::SetState(grpc_connectivity_state state,
const absl::Status& status,
const char* reason) {
grpc_connectivity_state current_state =
state_.load(std::memory_order_relaxed);
if (state == current_state) return;
if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) {
gpr_log(GPR_INFO, "ConnectivityStateTracker %s[%p]: %s -> %s (%s, %s)",
name_, this, ConnectivityStateName(current_state),
ConnectivityStateName(state), reason, status.ToString().c_str());
}
state_.store(state, std::memory_order_relaxed);
status_ = status;
for (const auto& p : watchers_) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) {
gpr_log(GPR_INFO,
"ConnectivityStateTracker %s[%p]: notifying watcher %p: %s -> %s",
name_, this, p.first, ConnectivityStateName(current_state),
ConnectivityStateName(state));
}
p.second->Notify(state, status);
}
// If the new state is SHUTDOWN, orphan all of the watchers. This
// avoids the need for the callers to explicitly cancel them.
if (state == GRPC_CHANNEL_SHUTDOWN) watchers_.clear();
}
grpc_connectivity_state ConnectivityStateTracker::state() const {
grpc_connectivity_state state = state_.load(std::memory_order_relaxed);
if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) {
gpr_log(GPR_INFO, "ConnectivityStateTracker %s[%p]: get current state: %s",
name_, this, ConnectivityStateName(state));
}
return state;
}
} // namespace grpc_core