| // Copyright 2019 The Chromium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #ifndef REMOTING_BASE_GRPC_SUPPORT_GRPC_ASYNC_SERVER_STREAMING_REQUEST_H_ |
| #define REMOTING_BASE_GRPC_SUPPORT_GRPC_ASYNC_SERVER_STREAMING_REQUEST_H_ |
| |
| #include <memory> |
| #include <utility> |
| |
| #include "base/bind.h" |
| #include "base/callback.h" |
| #include "base/macros.h" |
| #include "base/memory/weak_ptr.h" |
| #include "base/sequence_checker.h" |
| #include "remoting/base/grpc_support/grpc_async_request.h" |
| #include "remoting/base/grpc_support/scoped_grpc_server_stream.h" |
| #include "third_party/grpc/src/include/grpcpp/support/async_stream.h" |
| |
| namespace remoting { |
| |
| template <typename RequestType, typename ResponseType> |
| using GrpcAsyncServerStreamingRpcFunction = |
| base::OnceCallback<std::unique_ptr<grpc::ClientAsyncReader<ResponseType>>( |
| grpc::ClientContext*, |
| const RequestType&, |
| grpc::CompletionQueue*, |
| void*)>; |
| |
| // GrpcAsyncRequest implementation for server streaming call. The object is |
| // first enqueued for starting the stream, then kept being re-enqueued to |
| // receive a new message, until it's canceled by calling CancelRequest(). |
| class GrpcAsyncServerStreamingRequestBase : public GrpcAsyncRequest { |
| public: |
| GrpcAsyncServerStreamingRequestBase( |
| std::unique_ptr<grpc::ClientContext> context, |
| base::OnceCallback<void(const grpc::Status&)> on_channel_closed, |
| std::unique_ptr<ScopedGrpcServerStream>* scoped_stream); |
| ~GrpcAsyncServerStreamingRequestBase() override; |
| |
| protected: |
| enum class State { |
| STARTING, |
| STREAMING, |
| |
| // Server has closed the stream and we are getting back the reason. |
| FINISHING, |
| |
| CLOSED, |
| }; |
| |
| void set_run_task_callback(const RunTaskCallback& callback) { |
| run_task_callback_ = callback; |
| } |
| |
| // Schedules a task with |run_task_callback_|. Drops it if the scoped stream |
| // has been deleted right before it is being executed. |
| void RunTask(base::OnceClosure task); |
| |
| virtual void ResolveIncomingMessage() = 0; |
| virtual void WaitForIncomingMessage(void* event_tag) = 0; |
| virtual void FinishStream(void* event_tag) = 0; |
| |
| private: |
| // GrpcAsyncRequest implementations. |
| bool OnDequeue(bool operation_succeeded) override; |
| void Reenqueue(void* event_tag) override; |
| void OnRequestCanceled() override; |
| bool CanStartRequest() const override; |
| void ResolveChannelClosed(); |
| |
| base::OnceCallback<void(const grpc::Status&)> on_channel_closed_; |
| State state_ = State::STARTING; |
| |
| RunTaskCallback run_task_callback_; |
| base::WeakPtr<ScopedGrpcServerStream> scoped_stream_; |
| |
| SEQUENCE_CHECKER(sequence_checker_); |
| |
| base::WeakPtrFactory<GrpcAsyncServerStreamingRequestBase> weak_factory_; |
| DISALLOW_COPY_AND_ASSIGN(GrpcAsyncServerStreamingRequestBase); |
| }; |
| |
| template <typename ResponseType> |
| class GrpcAsyncServerStreamingRequest |
| : public GrpcAsyncServerStreamingRequestBase { |
| public: |
| using OnIncomingMessageCallback = |
| base::RepeatingCallback<void(const ResponseType&)>; |
| using StartAndCreateReaderCallback = |
| base::OnceCallback<std::unique_ptr<grpc::ClientAsyncReader<ResponseType>>( |
| grpc::CompletionQueue* cq, |
| void* event_tag)>; |
| |
| GrpcAsyncServerStreamingRequest( |
| std::unique_ptr<grpc::ClientContext> context, |
| StartAndCreateReaderCallback create_reader_callback, |
| const OnIncomingMessageCallback& on_incoming_msg, |
| base::OnceCallback<void(const grpc::Status&)> on_channel_closed, |
| std::unique_ptr<ScopedGrpcServerStream>* scoped_stream) |
| : GrpcAsyncServerStreamingRequestBase(std::move(context), |
| std::move(on_channel_closed), |
| scoped_stream) { |
| create_reader_callback_ = std::move(create_reader_callback); |
| on_incoming_msg_ = on_incoming_msg; |
| } |
| |
| ~GrpcAsyncServerStreamingRequest() override = default; |
| |
| private: |
| // GrpcAsyncRequest implementations |
| void Start(const RunTaskCallback& run_task_cb, |
| grpc::CompletionQueue* cq, |
| void* event_tag) override { |
| reader_ = std::move(create_reader_callback_).Run(cq, event_tag); |
| set_run_task_callback(run_task_cb); |
| } |
| |
| // GrpcAsyncServerStreamingRequestBase implementations. |
| void ResolveIncomingMessage() override { |
| RunTask(base::BindOnce(on_incoming_msg_, response_)); |
| } |
| |
| void WaitForIncomingMessage(void* event_tag) override { |
| DCHECK(reader_); |
| reader_->Read(&response_, event_tag); |
| } |
| |
| void FinishStream(void* event_tag) override { |
| DCHECK(reader_); |
| reader_->Finish(&status_, event_tag); |
| } |
| |
| StartAndCreateReaderCallback create_reader_callback_; |
| ResponseType response_; |
| std::unique_ptr<grpc::ClientAsyncReader<ResponseType>> reader_; |
| OnIncomingMessageCallback on_incoming_msg_; |
| |
| DISALLOW_COPY_AND_ASSIGN(GrpcAsyncServerStreamingRequest); |
| }; |
| |
| // Creates a server streaming request. |
| // |rpc_function| is called once GrpcExecutor is about to send out the request. |
| // |on_incoming_msg| is called once a message is streamed from the server. |
| // |on_channel_closed| is called once the channel is closed remotely by the |
| // server. |
| // |scoped_stream| is set with an object which upon destruction will cancel the |
| // stream. |
| template <typename RequestType, typename ResponseType> |
| std::unique_ptr<GrpcAsyncServerStreamingRequest<ResponseType>> |
| CreateGrpcAsyncServerStreamingRequest( |
| GrpcAsyncServerStreamingRpcFunction<RequestType, ResponseType> rpc_function, |
| std::unique_ptr<grpc::ClientContext> context, |
| const RequestType& request, |
| const base::RepeatingCallback<void(const ResponseType&)>& on_incoming_msg, |
| base::OnceCallback<void(const grpc::Status&)> on_channel_closed, |
| std::unique_ptr<ScopedGrpcServerStream>* scoped_stream) { |
| auto start_and_create_reader_cb = |
| base::BindOnce(std::move(rpc_function), context.get(), request); |
| return std::make_unique<GrpcAsyncServerStreamingRequest<ResponseType>>( |
| std::move(context), std::move(start_and_create_reader_cb), |
| on_incoming_msg, std::move(on_channel_closed), scoped_stream); |
| } |
| |
| } // namespace remoting |
| |
| #endif // REMOTING_BASE_GRPC_SUPPORT_GRPC_ASYNC_SERVER_STREAMING_REQUEST_H_ |