blob: 7e16666d935a8dada16d75db504dc8ca4e4159ee [file] [log] [blame]
// Copyright 2021 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "chromecast/cast_core/grpc/grpc_server.h"
#include <algorithm>
#include <utility>
#include "base/functional/bind.h"
#include "base/functional/callback.h"
#include "base/logging.h"
#include "base/task/bind_post_task.h"
#include "base/task/thread_pool.h"
#include "chromecast/cast_core/grpc/grpc_call_options.h"
namespace cast {
namespace utils {
namespace {
static const auto kDefaultServerStopTimeoutMs = 100;
// Stops gRPC server.
static void StopGrpcServer(
std::unique_ptr<grpc::Server> server,
std::unique_ptr<ServerReactorTracker> server_reactor_tracker,
int64_t timeout_ms,
base::OnceClosure server_stopped_callback) {
LOG(INFO) << "Shutting down gRPC server with "
<< server_reactor_tracker->active_reactor_count()
<< " active reactors: " << *server_reactor_tracker;
// The gRPC Reactors are owned by the gRPC framework and are 'pending'
// unless Reactor::Finish or similar (StartWriteAndFinish) API is called.
// This Shutdown call makes sure all the finished reactors are deleted via
// Reactor::OnDone API. As the timeout is reached, all pending reactors are
// cancelled via Reactor::OnCancel API. Hence, after Shutdow, all pending
// reactors can be treated as cancelled and manually destroyed.
auto gpr_timeout = GrpcCallOptions::ToGprTimespec(timeout_ms);
server->Shutdown(gpr_timeout);
// As mentioned above, all the pending reactors are now cancelled and must
// be destroyed by the ServerReactorTracker.
DCHECK_EQ(server_reactor_tracker->active_reactor_count(), 0UL)
<< "Not all reactors were cancelled: " << *server_reactor_tracker;
LOG(INFO) << "All active reactors are finished";
// Finish server shutdown.
server->Wait();
server.reset();
LOG(INFO) << "gRPC server is shut down";
std::move(server_stopped_callback).Run();
}
} // namespace
GrpcServer::GrpcServer()
: server_reactor_tracker_(std::make_unique<ServerReactorTracker>()) {}
GrpcServer::~GrpcServer() {
DCHECK(!server_) << "gRPC server must be explicitly stopped";
}
void GrpcServer::Start(const std::string& endpoint) {
DCHECK(!server_) << "Server is already running";
DCHECK(server_reactor_tracker_) << "Server was alreadys shutdown";
server_ = grpc::ServerBuilder()
.AddListeningPort(endpoint, grpc::InsecureServerCredentials())
.RegisterCallbackGenericService(this)
.BuildAndStart();
CHECK(server_) << "Failed to start server at " << endpoint;
}
void GrpcServer::Stop() {
if (!server_) {
LOG(WARNING) << "Grpc server was already stopped";
return;
}
StopGrpcServer(std::move(server_), std::move(server_reactor_tracker_),
kDefaultServerStopTimeoutMs, base::BindOnce([]() {}));
}
void GrpcServer::Stop(int64_t timeout_ms,
base::OnceClosure server_stopped_callback) {
if (!server_) {
LOG(WARNING) << "Grpc server was already stopped";
std::move(server_stopped_callback).Run();
return;
}
// Synchronous requests will block gRPC shutdown unless we post shutdown on
// a different thread.
base::ThreadPool::PostTask(
FROM_HERE, {base::MayBlock()},
base::BindOnce(&StopGrpcServer, std::move(server_),
std::move(server_reactor_tracker_), timeout_ms,
std::move(server_stopped_callback)));
}
grpc::ServerGenericBidiReactor* GrpcServer::CreateReactor(
grpc::GenericCallbackServerContext* ctx) {
auto iter = registered_handlers_.find(ctx->method());
if (iter != registered_handlers_.end()) {
DVLOG(1) << "Found a reactor for " << ctx->method();
return iter->second->CreateReactor(ctx);
}
LOG(WARNING) << "No reactor was specified for " << ctx->method()
<< " - falling back to a default unimplemented reactor";
return grpc::CallbackGenericService::CreateReactor(ctx);
}
} // namespace utils
} // namespace cast