Automated rollback of commit 260c33988f8300b27ebaa9f325e5934778da9589. PiperOrigin-RevId: 912910521
diff --git a/include/grpcpp/impl/server_callback_handlers.h b/include/grpcpp/impl/server_callback_handlers.h index c21f44c..3e9628d 100644 --- a/include/grpcpp/impl/server_callback_handlers.h +++ b/include/grpcpp/impl/server_callback_handlers.h
@@ -18,7 +18,6 @@ #ifndef GRPCPP_IMPL_SERVER_CALLBACK_HANDLERS_H #define GRPCPP_IMPL_SERVER_CALLBACK_HANDLERS_H -#include <grpc/event_engine/event_engine.h> #include <grpc/grpc.h> #include <grpc/impl/call.h> #include <grpcpp/impl/rpc_service_method.h> @@ -1049,14 +1048,7 @@ } void BindInnerServer(grpc::Server* inner_server) override { - grpc::internal::BindSessionToInnerServer(call_.call(), inner_server, - &transport_, &endpoint_); - } - - void InitiateGracefulShutdown( - absl::AnyInvocable<void(absl::Status)> on_shutdown) override { - grpc::internal::InitiateSessionGracefulShutdown(transport_, endpoint_, - std::move(on_shutdown)); + grpc::internal::BindSessionToInnerServer(call_.call(), inner_server); } private: @@ -1117,8 +1109,6 @@ grpc::CallbackServerContext* const ctx_; grpc::internal::Call call_; - grpc_core::Transport* transport_ = nullptr; - grpc_event_engine::experimental::EventEngine::Endpoint* endpoint_ = nullptr; MessageHolder<RequestType, grpc::ByteBuffer>* const allocator_state_; std::function<void()> call_requester_; grpc::Server* inner_server_;
diff --git a/include/grpcpp/support/server_callback.h b/include/grpcpp/support/server_callback.h index 5691230..6595573 100644 --- a/include/grpcpp/support/server_callback.h +++ b/include/grpcpp/support/server_callback.h
@@ -19,7 +19,6 @@ #ifndef GRPCPP_SUPPORT_SERVER_CALLBACK_H #define GRPCPP_SUPPORT_SERVER_CALLBACK_H -#include <grpc/event_engine/event_engine.h> #include <grpc/impl/call.h> #include <grpcpp/impl/call.h> #include <grpcpp/impl/call_op_set.h> @@ -35,11 +34,6 @@ #include "absl/functional/any_invocable.h" -struct grpc_endpoint; -namespace grpc_core { -class Transport; -} - namespace grpc { class Server; @@ -48,14 +42,7 @@ namespace internal { // Forward declarations -void BindSessionToInnerServer( - grpc_call* call, grpc::Server* inner_server, - grpc_core::Transport** out_transport, - grpc_event_engine::experimental::EventEngine::Endpoint** out_endpoint); -void InitiateSessionGracefulShutdown( - grpc_core::Transport* transport, - grpc_event_engine::experimental::EventEngine::Endpoint* endpoint, - absl::AnyInvocable<void(absl::Status)> on_shutdown); +void BindSessionToInnerServer(grpc_call* call, grpc::Server* inner_server); template <class Request, class Response> class CallbackUnaryHandler; template <class Request, class Response> @@ -236,8 +223,6 @@ virtual void Finish(grpc::Status s) = 0; virtual void SendInitialMetadata() = 0; virtual void BindInnerServer(grpc::Server* inner_server) = 0; - virtual void InitiateGracefulShutdown( - absl::AnyInvocable<void(absl::Status)> on_shutdown) = 0; protected: template <class Reactor> @@ -842,27 +827,6 @@ session->Finish(std::move(s)); } - /// Initiate a graceful shutdown of a SESSION_RPC. - /// - /// This will send a GOAWAY frame to the client, telling it to finish active - /// virtual RPCs on this session and stop opening new ones. The provided - /// callback will be invoked with the final status of the inner transport - /// when it completely shuts down (drains). - void InitiateGracefulShutdown( - absl::AnyInvocable<void(absl::Status)> on_shutdown) - ABSL_LOCKS_EXCLUDED(session_mu_) { - ServerCallbackSession* session = session_.load(std::memory_order_acquire); - if (session == nullptr) { - grpc::internal::MutexLock l(&session_mu_); - session = session_.load(std::memory_order_relaxed); - if (session == nullptr) { - backlog_.graceful_shutdown_wanted_callback = std::move(on_shutdown); - return; - } - } - session->InitiateGracefulShutdown(std::move(on_shutdown)); - } - /// The following notifications are exactly like ServerBidiReactor. virtual void OnSendInitialMetadataDone(bool /*ok*/) {} void OnDone() override = 0; @@ -879,10 +843,6 @@ if (GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) { session->SendInitialMetadata(); } - if (GPR_UNLIKELY(backlog_.graceful_shutdown_wanted_callback != nullptr)) { - session->InitiateGracefulShutdown( - std::move(backlog_.graceful_shutdown_wanted_callback)); - } if (GPR_UNLIKELY(backlog_.finish_wanted)) { session->Finish(std::move(backlog_.status_wanted)); } @@ -895,7 +855,6 @@ struct PreBindBacklog { bool send_initial_metadata_wanted = false; bool finish_wanted = false; - absl::AnyInvocable<void(absl::Status)> graceful_shutdown_wanted_callback; grpc::Status status_wanted; };
diff --git a/src/core/transport/session_endpoint.cc b/src/core/transport/session_endpoint.cc index 08b1f8a..a1bd740 100644 --- a/src/core/transport/session_endpoint.cc +++ b/src/core/transport/session_endpoint.cc
@@ -30,7 +30,6 @@ #include <memory> #include <utility> -#include "src/core/lib/iomgr/closure.h" #include "src/core/lib/iomgr/error.h" #include "src/core/lib/iomgr/event_engine_shims/endpoint.h" #include "src/core/lib/iomgr/exec_ctx.h" @@ -242,7 +241,8 @@ Ref(); if (shutdown_ref_.fetch_sub(1, std::memory_order_acq_rel) == kShutdownBit + 1) { - CompleteShutdown(); + grpc_call_cancel_internal(call_); + Unref(); } break; } @@ -251,27 +251,7 @@ Unref(); } - void SetGracefulShutdown() { - if (!ShutdownRef()) return; - graceful_shutdown_ = true; - ShutdownUnref(); - } - - void ClearCall() { call_cleared_.store(true, std::memory_order_release); } - private: - void CompleteShutdown() { - if (graceful_shutdown_ || call_cleared_.load(std::memory_order_acquire)) { - // Just drop our refs so the call can finish normally. - call_cleared_.store(true, std::memory_order_release); - Unref(); - } else { - call_cleared_.store(true, std::memory_order_release); - grpc_call_cancel_internal(call_); - Unref(); - } - } - bool ShutdownRef() { int64_t curr = shutdown_ref_.load(std::memory_order_acquire); while (true) { @@ -289,12 +269,12 @@ void ShutdownUnref() { if (shutdown_ref_.fetch_sub(1, std::memory_order_acq_rel) == kShutdownBit + 1) { - CompleteShutdown(); + grpc_call_cancel_internal(call_); + Unref(); } } grpc_call* const call_; - std::atomic<bool> call_cleared_{false}; const bool is_client_; std::atomic<int64_t> refs_{1}; @@ -306,7 +286,6 @@ SessionEndpointTag write_tag_; std::atomic<bool> write_in_progress_{false}; - bool graceful_shutdown_ = false; }; grpc_endpoint* SessionEndpoint::Create(grpc_call* call, bool is_client) { @@ -326,10 +305,6 @@ return impl_->Read(std::move(on_read), buffer); } -void SessionEndpoint::SetGracefulShutdown() { impl_->SetGracefulShutdown(); } - -void SessionEndpoint::ClearCall() { impl_->ClearCall(); } - bool SessionEndpoint::Write(absl::AnyInvocable<void(absl::Status)> on_writable, grpc_event_engine::experimental::SliceBuffer* data, WriteArgs /*args*/) {
diff --git a/src/core/transport/session_endpoint.h b/src/core/transport/session_endpoint.h index a8df1ac..08e4aa0 100644 --- a/src/core/transport/session_endpoint.h +++ b/src/core/transport/session_endpoint.h
@@ -74,10 +74,6 @@ return nullptr; } - void SetGracefulShutdown(); - - void ClearCall(); - private: SessionEndpointImpl* impl_; grpc_event_engine::experimental::EventEngine::ResolvedAddress local_address_;
diff --git a/src/cpp/server/server_callback.cc b/src/cpp/server/server_callback.cc index 6bc307b..310562c 100644 --- a/src/cpp/server/server_callback.cc +++ b/src/cpp/server/server_callback.cc
@@ -22,7 +22,6 @@ #include "src/core/call/server_call.h" #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" -#include "src/core/lib/iomgr/event_engine_shims/endpoint.h" #include "src/core/lib/surface/call.h" #include "src/core/lib/surface/channel_stack_type.h" #include "src/core/server/server.h" @@ -32,10 +31,7 @@ namespace grpc { namespace internal { -void BindSessionToInnerServer( - grpc_call* call, grpc::Server* inner_server, - grpc_core::Transport** out_transport, - grpc_event_engine::experimental::EventEngine::Endpoint** out_endpoint) { +void BindSessionToInnerServer(grpc_call* call, grpc::Server* inner_server) { grpc_core::ExecCtx exec_ctx; grpc_core::Server* core_inner_server = @@ -44,11 +40,6 @@ // Create ServerSessionEndpoint grpc_endpoint* endpoint = grpc_core::SessionEndpoint::Create(call, /*is_client=*/false); - if (out_endpoint != nullptr) { - *out_endpoint = - grpc_event_engine::experimental::grpc_get_wrapped_event_engine_endpoint( - endpoint); - } grpc_core::ChannelArgs args = core_inner_server->channel_args(); if (args.GetObject<grpc_core::ResourceQuota>() == nullptr) { @@ -68,21 +59,11 @@ args, grpc_core::OrphanablePtr<grpc_endpoint>(endpoint), /*is_client=*/false); - if (out_transport != nullptr) { - *out_transport = transport_ptr; - } - auto status = core_inner_server->SetupTransport( transport_ptr, /*accepting_pollset=*/nullptr, args, GRPC_SERVER_VIRTUAL_CHANNEL); if (!status.ok()) { LOG(ERROR) << "SetupTransport failed: " << status; - if (out_transport != nullptr) { - *out_transport = nullptr; - } - if (out_endpoint != nullptr) { - *out_endpoint = nullptr; - } grpc_core::Call::FromC(call)->CancelWithError(status); } else { // The transport is set up, but we need to start reading from it. @@ -91,57 +72,6 @@ } } -namespace { -class ShutdownWatcher : public grpc_core::Transport::StateWatcher { - public: - explicit ShutdownWatcher(absl::AnyInvocable<void(absl::Status)> on_shutdown) - : on_shutdown_(std::move(on_shutdown)) {} - void OnDisconnect(absl::Status status, - DisconnectInfo disconnect_info) override { - if (on_shutdown_) { - if (disconnect_info.reason == - grpc_core::Transport::StateWatcher::kGoaway) { - on_shutdown_(absl::OkStatus()); - } else { - on_shutdown_(std::move(status)); - } - } - } - void OnPeerMaxConcurrentStreamsUpdate( - uint32_t /*max_concurrent_streams*/, - std::unique_ptr<MaxConcurrentStreamsUpdateDoneHandle> /*on_done*/) - override {} - grpc_pollset_set* interested_parties() const override { return nullptr; } - - private: - absl::AnyInvocable<void(absl::Status)> on_shutdown_; -}; -} // namespace - -void InitiateSessionGracefulShutdown( - grpc_core::Transport* transport, - grpc_event_engine::experimental::EventEngine::Endpoint* endpoint, - absl::AnyInvocable<void(absl::Status)> on_shutdown) { - grpc_core::ExecCtx exec_ctx; - if (endpoint != nullptr) { - auto* session_endpoint = static_cast<grpc_core::SessionEndpoint*>(endpoint); - session_endpoint->SetGracefulShutdown(); - } - if (transport != nullptr) { - transport->StartWatch( - grpc_core::MakeRefCounted<ShutdownWatcher>(std::move(on_shutdown))); - - grpc_transport_op* op = grpc_make_transport_op(nullptr); - op->goaway_error = grpc_error_set_int( - GRPC_ERROR_CREATE("Graceful shutdown"), - grpc_core::StatusIntProperty::kHttp2Error, - static_cast<int>(grpc_core::http2::Http2ErrorCode::kNoError)); - transport->PerformOp(op); - } else if (on_shutdown) { - on_shutdown(absl::UnavailableError("No transport available")); - } -} - void ServerCallbackCall::ScheduleOnDone(bool inline_ondone) { if (inline_ondone) { CallOnDone();