blob: f36f0c52cd216874a2c6fbea3d2047e1f3d09ff1 [file] [log] [blame]
// Copyright 2019 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "remoting/base/grpc_support/grpc_async_server_streaming_request.h"
#include "base/bind.h"
namespace remoting {
namespace {
void RunTaskIfScopedStreamIsAlive(
base::WeakPtr<ScopedGrpcServerStream> scoped_stream,
base::OnceClosure task) {
if (scoped_stream) {
std::move(task).Run();
}
}
} // namespace
GrpcAsyncServerStreamingRequestBase::GrpcAsyncServerStreamingRequestBase(
base::OnceCallback<void(const grpc::Status&)> on_channel_closed,
std::unique_ptr<ScopedGrpcServerStream>* scoped_stream)
: weak_factory_(this) {
DCHECK(on_channel_closed);
DCHECK_NE(nullptr, scoped_stream);
on_channel_closed_ = std::move(on_channel_closed);
*scoped_stream =
std::make_unique<ScopedGrpcServerStream>(weak_factory_.GetWeakPtr());
scoped_stream_ = (*scoped_stream)->GetWeakPtr();
}
GrpcAsyncServerStreamingRequestBase::~GrpcAsyncServerStreamingRequestBase() =
default;
void GrpcAsyncServerStreamingRequestBase::RunTask(base::OnceClosure task) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
DCHECK(run_task_callback_);
run_task_callback_.Run(base::BindOnce(&RunTaskIfScopedStreamIsAlive,
scoped_stream_, std::move(task)));
}
bool GrpcAsyncServerStreamingRequestBase::OnDequeue(bool operation_succeeded) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
if (state_ == State::CLOSED) {
return false;
}
if (state_ == State::FINISHING) {
DCHECK(operation_succeeded);
state_ = State::CLOSED;
ResolveChannelClosed();
return false;
}
if (!operation_succeeded) {
VLOG(1) << "Can't read any more data. Figuring out the reason..."
<< " Streaming call: " << this;
state_ = State::FINISHING;
return true;
}
if (state_ == State::STARTING) {
VLOG(1) << "Streaming call started: " << this;
state_ = State::STREAMING;
return true;
}
if (state_ == State::STREAMING) {
VLOG(1) << "Streaming call received message: " << this;
ResolveIncomingMessage();
return true;
}
NOTREACHED();
return false;
}
void GrpcAsyncServerStreamingRequestBase::Reenqueue(void* event_tag) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
switch (state_) {
case State::STREAMING:
WaitForIncomingMessage(event_tag);
break;
case State::FINISHING:
FinishStream(event_tag);
break;
default:
NOTREACHED() << "Unexpected state: " << static_cast<int>(state_);
break;
}
}
void GrpcAsyncServerStreamingRequestBase::OnRequestCanceled() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
state_ = State::CLOSED;
status_ = grpc::Status::CANCELLED;
weak_factory_.InvalidateWeakPtrs();
}
bool GrpcAsyncServerStreamingRequestBase::CanStartRequest() const {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
return state_ == State::STARTING;
}
void GrpcAsyncServerStreamingRequestBase::ResolveChannelClosed() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
RunTask(base::BindOnce(std::move(on_channel_closed_), status_));
}
} // namespace remoting