// Copyright 2019 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "remoting/base/grpc_support/grpc_async_executor.h"
#include <algorithm>
#include <utility>
#include "base/bind.h"
#include "base/callback.h"
#include "base/no_destructor.h"
#include "base/threading/sequenced_task_runner_handle.h"
#include "base/threading/thread.h"
#include "base/time/time.h"
#include "remoting/base/grpc_support/grpc_async_request.h"
#include "third_party/grpc/src/include/grpcpp/completion_queue.h"
namespace remoting {
namespace {
using DequeueCallback = base::OnceCallback<void(bool operation_succeeded)>;
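
// A DispatchTask is the tag enqueued on the gRPC completion queue. When the
// corresponding event is dequeued, |callback| is posted to
// |caller_sequence_task_runner| with the result of the operation.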
struct DispatchTask {
scoped_refptr<base::SequencedTaskRunner> caller_sequence_task_runner;
DequeueCallback callback;
};
// Helper class that is shared by all GrpcAsyncExecutors to run the completion
// queue and dispatch tasks back to the right executor.
// To enqueue, the caller creates a DispatchTask and passes it to gRPC as the
// event tag. CompletionQueueDispatcher takes ownership of the task and deletes
// it once the event has been dispatched.
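//
// In rough pseudo-code (all names below refer to code in this file), the flow
// for a single completion-queue event is:
//
//   // On the executor's sequence:
//   auto* task = new DispatchTask{base::SequencedTaskRunnerHandle::Get(),
//                                 std::move(callback)};
//   request->Start(run_task_cb, dispatcher->completion_queue(),
//                  /*event_tag=*/task);
//
//   // On |dispatcher_thread_|, inside RunQueueOnDispatcherThread():
//   completion_queue_.Next(&event_tag, &ok);  // Blocks; pops |task| back out.
//   task->caller_sequence_task_runner->PostTask(
//       FROM_HERE, base::BindOnce(std::move(task->callback), ok));
//   delete task;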
class CompletionQueueDispatcher {
public:
CompletionQueueDispatcher();
~CompletionQueueDispatcher();
static CompletionQueueDispatcher* GetInstance();
grpc::CompletionQueue* completion_queue() { return &completion_queue_; }
private:
void RunQueueOnDispatcherThread();
// TODO(yuweih): Consider using task scheduler instead.
// We need a dedicated thread because reading from the completion queue blocks
// until an event is available. Note that the RPC calls themselves are still
// async: new RPC calls can be made while the queue is blocked on Next(), and
// their completions are what eventually unblock it.
base::Thread dispatcher_thread_{"grpc_completion_queue_dispatcher"};
// Note that the gRPC library is thread-safe.
grpc::CompletionQueue completion_queue_;
DISALLOW_COPY_AND_ASSIGN(CompletionQueueDispatcher);
};
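
// Starts |dispatcher_thread_| and kicks off the loop that drains
// |completion_queue_| on that thread.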
CompletionQueueDispatcher::CompletionQueueDispatcher() {
dispatcher_thread_.Start();
dispatcher_thread_.task_runner()->PostTask(
FROM_HERE,
base::BindOnce(&CompletionQueueDispatcher::RunQueueOnDispatcherThread,
base::Unretained(this)));
}
CompletionQueueDispatcher::~CompletionQueueDispatcher() = default;
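
// Returns the process-wide dispatcher. It is created on first use and never
// destroyed (base::NoDestructor), so the completion queue outlives every
// GrpcAsyncExecutor that enqueues work on it.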
// static
CompletionQueueDispatcher* CompletionQueueDispatcher::GetInstance() {
static base::NoDestructor<CompletionQueueDispatcher> dispatcher;
return dispatcher.get();
}
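
// Runs on |dispatcher_thread_|. Drains |completion_queue_|: every dequeued
// event tag is a DispatchTask whose callback gets posted back to the sequence
// that enqueued it, after which the task is deleted. The loop only exits once
// the queue is shut down and fully drained.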
void CompletionQueueDispatcher::RunQueueOnDispatcherThread() {
void* event_tag;
bool operation_succeeded = false;
// completion_queue_.Next() blocks until the next event is available, and
// returns false only after the queue has been shut down and fully drained.
while (completion_queue_.Next(&event_tag, &operation_succeeded)) {
DispatchTask* task = reinterpret_cast<DispatchTask*>(event_tag);
task->caller_sequence_task_runner->PostTask(
FROM_HERE,
base::BindOnce(std::move(task->callback), operation_succeeded));
delete task;
}
}
} // namespace
GrpcAsyncExecutor::GrpcAsyncExecutor() : weak_factory_(this) {
DETACH_FROM_SEQUENCE(sequence_checker_);
}
GrpcAsyncExecutor::~GrpcAsyncExecutor() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
VLOG(1) << "# of pending RPCs at destruction: " << pending_requests_.size();
CancelPendingRequests();
}
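
// Starts |request| and keeps a weak reference to it in |pending_requests_|
// until its final completion-queue event has been dequeued.
//
// Illustrative call site (a sketch, not code from this file; it assumes the
// request has already been built elsewhere, e.g. by a unary- or
// streaming-request helper in this directory):
//
//   std::unique_ptr<GrpcAsyncRequest> request = ...;  // Wraps stub + callback.
//   executor_.ExecuteRpc(std::move(request));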
void GrpcAsyncExecutor::ExecuteRpc(std::unique_ptr<GrpcAsyncRequest> request) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
auto* unowned_request = request.get();
DCHECK(FindRequest(unowned_request) == pending_requests_.end());
auto task = std::make_unique<DispatchTask>();
task->caller_sequence_task_runner = base::SequencedTaskRunnerHandle::Get();
task->callback =
base::BindOnce(&GrpcAsyncExecutor::OnDequeue, weak_factory_.GetWeakPtr(),
std::move(request));
if (!unowned_request->CanStartRequest()) {
VLOG(1) << "RPC is canceled before execution: " << unowned_request;
return;
}
VLOG(1) << "Enqueuing RPC: " << unowned_request;
// The caller can potentially delete the executor from inside the callback, so
// we post the closure back to the sequence instead of running it directly.
// The closure is also bound to a WeakPtr of this object so that it won't run
// after the executor has been deleted.
auto run_task_cb = base::BindRepeating(
&GrpcAsyncExecutor::PostTaskToRunClosure, weak_factory_.GetWeakPtr());
unowned_request->Start(
run_task_cb, CompletionQueueDispatcher::GetInstance()->completion_queue(),
task.release());
// You might think we could invert the ownership and have GrpcAsyncExecutor own
// the request instead, but that doesn't work: the gRPC completion queue (which
// is drained on a different thread) requires the ClientContext to stay alive
// until the completed/cancelled event has been popped off the queue.
pending_requests_.push_back(unowned_request->GetGrpcAsyncRequestWeakPtr());
}
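
// Cancels every request that is still in flight and drops any dequeue
// callbacks that have already been posted back to this sequence.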
void GrpcAsyncExecutor::CancelPendingRequests() {
VLOG(1) << "Canceling # of pending requests: " << pending_requests_.size();
// Drop pending response callbacks.
weak_factory_.InvalidateWeakPtrs();
for (auto& pending_request : pending_requests_) {
// If the sequence itself is being torn down, pending tasks are dropped in
// arbitrary order without the weak pointers being checked. If the dequeue
// task is destroyed before the executor, |pending_request| will already have
// been invalidated, hence the null check below.
if (pending_request) {
pending_request->CancelRequest();
}
}
pending_requests_.clear();
}
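
// Called on the executor's sequence once |request| has been dequeued from the
// completion queue. If the request reports that it is finished, it is removed
// from |pending_requests_| and destroyed; otherwise a fresh DispatchTask is
// enqueued so that the request's next completion-queue event is dispatched
// back here.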
void GrpcAsyncExecutor::OnDequeue(std::unique_ptr<GrpcAsyncRequest> request,
bool operation_succeeded) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
if (!request->OnDequeue(operation_succeeded)) {
VLOG(1) << "Dequeuing RPC: " << request.get();
auto iter = FindRequest(request.get());
DCHECK(iter != pending_requests_.end());
pending_requests_.erase(iter);
return;
}
VLOG(1) << "Re-enqueuing RPC: " << request.get();
DCHECK(FindRequest(request.get()) != pending_requests_.end());
auto* unowned_request = request.get();
auto task = std::make_unique<DispatchTask>();
task->caller_sequence_task_runner = base::SequencedTaskRunnerHandle::Get();
task->callback =
base::BindOnce(&GrpcAsyncExecutor::OnDequeue, weak_factory_.GetWeakPtr(),
std::move(request));
unowned_request->Reenqueue(task.release());
}
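
// Posts |closure| to the executor's sequence rather than running it inline;
// see the comment in ExecuteRpc(). RunClosure() is bound to a WeakPtr of this
// object, so the closure is silently dropped if the executor has been
// destroyed by the time the task runs.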
void GrpcAsyncExecutor::PostTaskToRunClosure(base::OnceClosure closure) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
base::SequencedTaskRunnerHandle::Get()->PostTask(
FROM_HERE,
base::BindOnce(&GrpcAsyncExecutor::RunClosure, weak_factory_.GetWeakPtr(),
std::move(closure)));
}
void GrpcAsyncExecutor::RunClosure(base::OnceClosure closure) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
std::move(closure).Run();
}
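
// Returns an iterator to |request| within |pending_requests_|, or
// pending_requests_.end() if the request is not being tracked.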
GrpcAsyncExecutor::PendingRequestListIter GrpcAsyncExecutor::FindRequest(
GrpcAsyncRequest* request) {
return std::find_if(
pending_requests_.begin(), pending_requests_.end(),
[request](const base::WeakPtr<GrpcAsyncRequest>& current_request) {
return current_request.get() == request;
});
}
} // namespace remoting