blob: 49a48b032fd5cf19e170d000f4689a9b9751bbac [file] [log] [blame]
// Copyright 2019 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef REMOTING_SIGNALING_GRPC_ASYNC_DISPATCHER_H_
#define REMOTING_SIGNALING_GRPC_ASYNC_DISPATCHER_H_
#include <memory>
#include <utility>
#include "base/callback_forward.h"
#include "base/containers/flat_set.h"
#include "base/macros.h"
#include "base/synchronization/lock.h"
#include "base/threading/thread.h"
#include "remoting/signaling/grpc_async_call_data.h"
#include "third_party/grpc/src/include/grpcpp/completion_queue.h"
#include "third_party/grpc/src/include/grpcpp/support/async_unary_call.h"
namespace remoting {
// This class helps adopting the gRPC async completion queue handling logic into
// Chromium's callback paradigm.
//
// Basic usage looks like this:
//
// class MyClass {
// public:
// MyClass() : weak_factory_(this) {}
// ~MyClass() {}
//
// void SayHello() {
// HelloRequest request;
// dispatcher_->ExecuteAsyncRpc(
// // This is run immediately inside the call stack of
// // |ExecuteAsyncRpc|.
// base::BindOnce(&HelloService::Stub::AsyncSayHello,
// base::Unretained(stub_.get())),
// std::make_unique<grpc::ClientContext>(), request,
// // Callback might be called after the dispatcher is destroyed.
// base::BindOnce(&MyClass::OnHelloResult,
// weak_factory_.GetWeakPtr()));
// }
//
// private:
// void OnHelloResult(grpc::Status status,
// const HelloResponse& response) {
// if (status.error_code() == grpc::StatusCode::CANCELLED) {
// // The request has been canceled because |dispatcher_| is destroyed.
// // If you need to access class members here, make sure to bind a weak
// // pointer in the RpcCallback. Otherwise using base::Unretained() is
// // fine.
// return;
// }
//
// if (!status.ok()) {
// // Handle other error here.
// return;
// }
//
// // Response is received. Use the result here.
// }
//
// std::unique_ptr<HelloService::Stub> stub_;
// GrpcAsyncDispatcher dispatcher_;
// base::WeakPtrFactory<MyClass> weak_factory_;
// };
class GrpcAsyncDispatcher {
public:
template <typename ResponseType>
using RpcCallback =
base::OnceCallback<void(grpc::Status, const ResponseType&)>;
template <typename RequestType, typename ResponseType>
using AsyncRpcFunction = base::OnceCallback<std::unique_ptr<
grpc::ClientAsyncResponseReader<ResponseType>>(grpc::ClientContext*,
const RequestType&,
grpc::CompletionQueue*)>;
GrpcAsyncDispatcher();
~GrpcAsyncDispatcher();
// Immediately executes |rpc_function| inside the call stack of this function,
// and runs |callback| once the server responses. If the dispatcher is
// destroyed before the server responses, |callback| will be called with
// a CANCELLED status *after* the dispatcher is destroyed.
//
// It is safe to bind raw pointer into |rpc_function|, but you might want to
// bind weak pointer in |callback| if you need to access your bound object in
// the cancel case.
template <typename RequestType, typename ResponseType>
void ExecuteAsyncRpc(AsyncRpcFunction<RequestType, ResponseType> rpc_function,
std::unique_ptr<grpc::ClientContext> context,
const RequestType& request,
RpcCallback<ResponseType> callback) {
auto response_reader =
std::move(rpc_function).Run(context.get(), request, &completion_queue_);
auto data = std::make_unique<GrpcAsyncCallData<ResponseType>>(
std::move(context), std::move(response_reader), std::move(callback));
RegisterRpcData(std::move(data));
}
private:
void RunQueueOnDispatcherThread();
void RegisterRpcData(std::unique_ptr<GrpcAsyncCallDataBase> rpc_data);
// We need a dedicated thread because getting response from the completion
// queue will block until any response is received. Note that the RPC call
// itself is still async, meaning any new RPC call when the queue is blocked
// can still be made, and can unblock the queue once the response is ready.
base::Thread dispatcher_thread_{"grpc_async_dispatcher"};
// Note that the gRPC library is thread-safe.
grpc::CompletionQueue completion_queue_;
// Keep the list of pending RPCs so that we can cancel them at destruction.
base::flat_set<GrpcAsyncCallDataBase*> pending_rpcs_;
base::Lock pending_rpcs_lock_;
DISALLOW_COPY_AND_ASSIGN(GrpcAsyncDispatcher);
};
} // namespace remoting
#endif // REMOTING_SIGNALING_GRPC_ASYNC_DISPATCHER_H_