| // Copyright 2021 gRPC authors. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| #ifndef GRPC_CORE_LIB_PROMISE_ACTIVITY_H |
| #define GRPC_CORE_LIB_PROMISE_ACTIVITY_H |
| |
| #include <grpc/support/port_platform.h> |
| |
| #include <stdint.h> |
| |
| #include <algorithm> |
| #include <atomic> |
| #include <memory> |
| #include <utility> |
| |
| #include "absl/base/thread_annotations.h" |
| #include "absl/status/status.h" |
| #include "absl/types/optional.h" |
| #include "absl/types/variant.h" |
| #include "absl/utility/utility.h" |
| |
| #include <grpc/support/log.h> |
| |
| #include "src/core/lib/gpr/tls.h" |
| #include "src/core/lib/gprpp/construct_destruct.h" |
| #include "src/core/lib/gprpp/orphanable.h" |
| #include "src/core/lib/gprpp/sync.h" |
| #include "src/core/lib/promise/context.h" |
| #include "src/core/lib/promise/detail/promise_factory.h" |
| #include "src/core/lib/promise/detail/status.h" |
| #include "src/core/lib/promise/poll.h" |
| |
| namespace grpc_core { |
| |
| // A Wakeable object is used by queues to wake activities. |
| class Wakeable { |
| public: |
| // Wake up the underlying activity. |
| // After calling, this Wakeable cannot be used again. |
| virtual void Wakeup() = 0; |
| // Drop this wakeable without waking up the underlying activity. |
| virtual void Drop() = 0; |
| |
| protected: |
| inline ~Wakeable() {} |
| }; |
| |
| // An owning reference to a Wakeable. |
| // This type is non-copyable but movable. |
| class Waker { |
| public: |
| explicit Waker(Wakeable* wakeable) : wakeable_(wakeable) {} |
| Waker() : wakeable_(&unwakeable_) {} |
| ~Waker() { wakeable_->Drop(); } |
| Waker(const Waker&) = delete; |
| Waker& operator=(const Waker&) = delete; |
| Waker(Waker&& other) noexcept : wakeable_(other.wakeable_) { |
| other.wakeable_ = &unwakeable_; |
| } |
| Waker& operator=(Waker&& other) noexcept { |
| std::swap(wakeable_, other.wakeable_); |
| return *this; |
| } |
| |
| // Wake the underlying activity. |
| void Wakeup() { |
| wakeable_->Wakeup(); |
| wakeable_ = &unwakeable_; |
| } |
| |
| template <typename H> |
| friend H AbslHashValue(H h, const Waker& w) { |
| return H::combine(std::move(h), w.wakeable_); |
| } |
| |
| bool operator==(const Waker& other) const noexcept { |
| return wakeable_ == other.wakeable_; |
| } |
| |
| private: |
| class Unwakeable final : public Wakeable { |
| public: |
| void Wakeup() final {} |
| void Drop() final {} |
| }; |
| |
| Wakeable* wakeable_; |
| static Unwakeable unwakeable_; |
| }; |
| |
| // An Activity tracks execution of a single promise. |
| // It executes the promise under a mutex. |
| // When the promise stalls, it registers the containing activity to be woken up |
| // later. |
| // The activity takes a callback, which will be called exactly once with the |
| // result of execution. |
| // Activity execution may be cancelled by simply deleting the activity. In such |
| // a case, if execution had not already finished, the done callback would be |
| // called with absl::CancelledError(). |
| class Activity : public Orphanable { |
| public: |
| // Force wakeup from the outside. |
| // This should be rarely needed, and usages should be accompanied with a note |
| // on why it's not possible to wakeup with a Waker object. |
| // Nevertheless, it's sometimes useful for integrations with Activity to force |
| // an Activity to repoll. |
| void ForceWakeup() { MakeOwningWaker().Wakeup(); } |
| |
| // Force the current activity to immediately repoll if it doesn't complete. |
| virtual void ForceImmediateRepoll() = 0; |
| |
| // Return the current activity. |
| // Additionally: |
| // - assert that there is a current activity (and catch bugs if there's not) |
| // - indicate to thread safety analysis that the current activity is indeed |
| // locked |
| // - back up that assertation with a runtime check in debug builds (it's |
| // prohibitively expensive in non-debug builds) |
| static Activity* current() { return g_current_activity_; } |
| |
| // Produce an activity-owning Waker. The produced waker will keep the activity |
| // alive until it's awoken or dropped. |
| virtual Waker MakeOwningWaker() = 0; |
| |
| // Produce a non-owning Waker. The waker will own a small heap allocated weak |
| // pointer to this activity. This is more suitable for wakeups that may not be |
| // delivered until long after the activity should be destroyed. |
| virtual Waker MakeNonOwningWaker() = 0; |
| |
| protected: |
| // Check if this activity is the current activity executing on the current |
| // thread. |
| bool is_current() const { return this == g_current_activity_; } |
| // Check if there is an activity executing on the current thread. |
| static bool have_current() { return g_current_activity_ != nullptr; } |
| // Set the current activity at construction, clean it up at destruction. |
| class ScopedActivity { |
| public: |
| explicit ScopedActivity(Activity* activity) |
| : prior_activity_(g_current_activity_) { |
| g_current_activity_ = activity; |
| } |
| ~ScopedActivity() { g_current_activity_ = prior_activity_; } |
| ScopedActivity(const ScopedActivity&) = delete; |
| ScopedActivity& operator=(const ScopedActivity&) = delete; |
| |
| private: |
| Activity* const prior_activity_; |
| }; |
| |
| private: |
| // Set during RunLoop to the Activity that's executing. |
| // Being set implies that mu_ is held. |
| static GPR_THREAD_LOCAL(Activity*) g_current_activity_; |
| }; |
| |
| // Owned pointer to one Activity. |
| using ActivityPtr = OrphanablePtr<Activity>; |
| |
| namespace promise_detail { |
| |
| template <typename Context> |
| class ContextHolder { |
| public: |
| using ContextType = Context; |
| |
| explicit ContextHolder(Context value) : value_(std::move(value)) {} |
| Context* GetContext() { return &value_; } |
| |
| private: |
| Context value_; |
| }; |
| |
| template <typename Context> |
| class ContextHolder<Context*> { |
| public: |
| using ContextType = Context; |
| |
| explicit ContextHolder(Context* value) : value_(value) {} |
| Context* GetContext() { return value_; } |
| |
| private: |
| Context* value_; |
| }; |
| |
| template <typename Context, typename Deleter> |
| class ContextHolder<std::unique_ptr<Context, Deleter>> { |
| public: |
| using ContextType = Context; |
| |
| explicit ContextHolder(std::unique_ptr<Context, Deleter> value) |
| : value_(std::move(value)) {} |
| Context* GetContext() { return value_.get(); } |
| |
| private: |
| std::unique_ptr<Context, Deleter> value_; |
| }; |
| |
| template <typename HeldContext> |
| using ContextTypeFromHeld = typename ContextHolder<HeldContext>::ContextType; |
| |
| template <typename... Contexts> |
| class ActivityContexts : public ContextHolder<Contexts>... { |
| public: |
| explicit ActivityContexts(Contexts&&... contexts) |
| : ContextHolder<Contexts>(std::forward<Contexts>(contexts))... {} |
| |
| class ScopedContext : public Context<ContextTypeFromHeld<Contexts>>... { |
| public: |
| explicit ScopedContext(ActivityContexts* contexts) |
| : Context<ContextTypeFromHeld<Contexts>>( |
| static_cast<ContextHolder<Contexts>*>(contexts) |
| ->GetContext())... {} |
| }; |
| }; |
| |
| // A free standing activity: an activity that owns its own synchronization and |
| // memory. |
| // The alternative is an activity that's somehow tied into another system, for |
| // instance the type seen in promise_based_filter.h as we're transitioning from |
| // the old filter stack to the new system. |
| // FreestandingActivity is-a Wakeable, but needs to increment a refcount before |
| // returning that Wakeable interface. Additionally, we want to keep |
| // FreestandingActivity as small as is possible, since it will be used |
| // everywhere. So we use inheritance to provide the Wakeable interface: this |
| // makes it zero sized, and we make the inheritance private to prevent |
| // accidental casting. |
| class FreestandingActivity : public Activity, private Wakeable { |
| public: |
| Waker MakeOwningWaker() final { |
| Ref(); |
| return Waker(this); |
| } |
| Waker MakeNonOwningWaker() final; |
| |
| void Orphan() final { |
| Cancel(); |
| Unref(); |
| } |
| |
| void ForceImmediateRepoll() final { |
| mu_.AssertHeld(); |
| SetActionDuringRun(ActionDuringRun::kWakeup); |
| } |
| |
| protected: |
| // Action received during a run, in priority order. |
| // If more than one action is received during a run, we use max() to resolve |
| // which one to report (so Cancel overrides Wakeup). |
| enum class ActionDuringRun : uint8_t { |
| kNone, // No action occured during run. |
| kWakeup, // A wakeup occured during run. |
| kCancel, // Cancel was called during run. |
| }; |
| |
| inline ~FreestandingActivity() override { |
| if (handle_) { |
| DropHandle(); |
| } |
| } |
| |
| // Check if we got an internal wakeup since the last time this function was |
| // called. |
| ActionDuringRun GotActionDuringRun() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { |
| return absl::exchange(action_during_run_, ActionDuringRun::kNone); |
| } |
| |
| // Implementors of Wakeable::Wakeup should call this after the wakeup has |
| // completed. |
| void WakeupComplete() { Unref(); } |
| |
| // Set the action that occured during this run. |
| // We use max to combine actions so that cancellation overrides wakeups. |
| void SetActionDuringRun(ActionDuringRun action) |
| ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { |
| action_during_run_ = std::max(action_during_run_, action); |
| } |
| |
| Mutex* mu() ABSL_LOCK_RETURNED(mu_) { return &mu_; } |
| |
| private: |
| class Handle; |
| |
| // Cancel execution of the underlying promise. |
| virtual void Cancel() = 0; |
| |
| void Ref() { refs_.fetch_add(1, std::memory_order_relaxed); } |
| void Unref() { |
| if (1 == refs_.fetch_sub(1, std::memory_order_acq_rel)) { |
| delete this; |
| } |
| } |
| |
| // Return a Handle instance with a ref so that it can be stored waiting for |
| // some wakeup. |
| Handle* RefHandle() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); |
| // If our refcount is non-zero, ref and return true. |
| // Otherwise, return false. |
| bool RefIfNonzero(); |
| // Drop the (proved existing) wait handle. |
| void DropHandle() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); |
| |
| // All promise execution occurs under this mutex. |
| Mutex mu_; |
| |
| // Current refcount. |
| std::atomic<uint32_t> refs_{1}; |
| // If wakeup is called during Promise polling, we set this to Wakeup and |
| // repoll. If cancel is called during Promise polling, we set this to Cancel |
| // and cancel at the end of polling. |
| ActionDuringRun action_during_run_ ABSL_GUARDED_BY(mu_) = |
| ActionDuringRun::kNone; |
| // Handle for long waits. Allows a very small weak pointer type object to |
| // queue for wakeups while Activity may be deleted earlier. |
| Handle* handle_ ABSL_GUARDED_BY(mu_) = nullptr; |
| }; |
| |
| // Implementation details for an Activity of an arbitrary type of promise. |
| // There should exist a static function: |
| // struct WakeupScheduler { |
| // template <typename ActivityType> |
| // void ScheduleWakeup(ActivityType* activity); |
| // }; |
| // This function should arrange that activity->RunScheduledWakeup() be invoked |
| // at the earliest opportunity. |
| // It can assume that activity will remain live until RunScheduledWakeup() is |
| // invoked, and that a given activity will not be concurrently scheduled again |
| // until its RunScheduledWakeup() has been invoked. |
| // We use private inheritance here as a way of getting private members for |
| // each of the contexts. |
| // TODO(ctiller): We can probably reconsider the private inheritance here |
| // when we move away from C++11 and have more powerful template features. |
| template <class F, class WakeupScheduler, class OnDone, typename... Contexts> |
| class PromiseActivity final : public FreestandingActivity, |
| private ActivityContexts<Contexts...> { |
| public: |
| using Factory = PromiseFactory<void, F>; |
| using ResultType = typename Factory::Promise::Result; |
| |
| PromiseActivity(F promise_factory, WakeupScheduler wakeup_scheduler, |
| OnDone on_done, Contexts&&... contexts) |
| : FreestandingActivity(), |
| ActivityContexts<Contexts...>(std::forward<Contexts>(contexts)...), |
| wakeup_scheduler_(std::move(wakeup_scheduler)), |
| on_done_(std::move(on_done)) { |
| // Lock, construct an initial promise from the factory, and step it. |
| // This may hit a waiter, which could expose our this pointer to other |
| // threads, meaning we do need to hold this mutex even though we're still |
| // constructing. |
| mu()->Lock(); |
| auto status = Start(Factory(std::move(promise_factory))); |
| mu()->Unlock(); |
| // We may complete immediately. |
| if (status.has_value()) { |
| on_done_(std::move(*status)); |
| } |
| } |
| |
| ~PromiseActivity() override { |
| // We shouldn't destruct without calling Cancel() first, and that must get |
| // us to be done_, so we assume that and have no logic to destruct the |
| // promise here. |
| GPR_ASSERT(done_); |
| } |
| |
| void RunScheduledWakeup() { |
| GPR_ASSERT(wakeup_scheduled_.exchange(false, std::memory_order_acq_rel)); |
| Step(); |
| WakeupComplete(); |
| } |
| |
| private: |
| using typename ActivityContexts<Contexts...>::ScopedContext; |
| |
| void Cancel() final { |
| if (Activity::is_current()) { |
| mu()->AssertHeld(); |
| SetActionDuringRun(ActionDuringRun::kCancel); |
| return; |
| } |
| bool was_done; |
| { |
| MutexLock lock(mu()); |
| // Check if we were done, and flag done. |
| was_done = done_; |
| if (!done_) MarkDone(); |
| } |
| // If we were not done, then call the on_done callback. |
| if (!was_done) { |
| on_done_(absl::CancelledError()); |
| } |
| } |
| |
| // Wakeup this activity. Arrange to poll the activity again at a convenient |
| // time: this could be inline if it's deemed safe, or it could be by passing |
| // the activity to an external threadpool to run. If the activity is already |
| // running on this thread, a note is taken of such and the activity is |
| // repolled if it doesn't complete. |
| void Wakeup() final { |
| // If there is an active activity, but hey it's us, flag that and we'll loop |
| // in RunLoop (that's calling from above here!). |
| if (Activity::is_current()) { |
| mu()->AssertHeld(); |
| SetActionDuringRun(ActionDuringRun::kWakeup); |
| WakeupComplete(); |
| return; |
| } |
| if (!wakeup_scheduled_.exchange(true, std::memory_order_acq_rel)) { |
| // Can't safely run, so ask to run later. |
| wakeup_scheduler_.ScheduleWakeup(this); |
| } else { |
| // Already a wakeup scheduled for later, drop ref. |
| WakeupComplete(); |
| } |
| } |
| |
| // Drop a wakeup |
| void Drop() final { this->WakeupComplete(); } |
| |
| // Notification that we're no longer executing - it's ok to destruct the |
| // promise. |
| void MarkDone() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu()) { |
| GPR_ASSERT(!done_); |
| done_ = true; |
| Destruct(&promise_holder_.promise); |
| } |
| |
| // In response to Wakeup, run the Promise state machine again until it |
| // settles. Then check for completion, and if we have completed, call on_done. |
| void Step() ABSL_LOCKS_EXCLUDED(mu()) { |
| // Poll the promise until things settle out under a lock. |
| mu()->Lock(); |
| if (done_) { |
| // We might get some spurious wakeups after finishing. |
| mu()->Unlock(); |
| return; |
| } |
| auto status = RunStep(); |
| mu()->Unlock(); |
| if (status.has_value()) { |
| on_done_(std::move(*status)); |
| } |
| } |
| |
| // The main body of a step: set the current activity, and any contexts, and |
| // then run the main polling loop. Contained in a function by itself in |
| // order to keep the scoping rules a little easier in Step(). |
| absl::optional<ResultType> RunStep() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu()) { |
| ScopedActivity scoped_activity(this); |
| ScopedContext contexts(this); |
| return StepLoop(); |
| } |
| |
| // Similarly to RunStep, but additionally construct the promise from a |
| // promise factory before entering the main loop. Called once from the |
| // constructor. |
| absl::optional<ResultType> Start(Factory promise_factory) |
| ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu()) { |
| ScopedActivity scoped_activity(this); |
| ScopedContext contexts(this); |
| Construct(&promise_holder_.promise, promise_factory.Once()); |
| return StepLoop(); |
| } |
| |
| // Until there are no wakeups from within and the promise is incomplete: |
| // poll the promise. |
| absl::optional<ResultType> StepLoop() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu()) { |
| GPR_ASSERT(is_current()); |
| while (true) { |
| // Run the promise. |
| GPR_ASSERT(!done_); |
| auto r = promise_holder_.promise(); |
| if (auto* status = absl::get_if<kPollReadyIdx>(&r)) { |
| // If complete, destroy the promise, flag done, and exit this loop. |
| MarkDone(); |
| return IntoStatus(status); |
| } |
| // Continue looping til no wakeups occur. |
| switch (GotActionDuringRun()) { |
| case ActionDuringRun::kNone: |
| return {}; |
| case ActionDuringRun::kWakeup: |
| break; |
| case ActionDuringRun::kCancel: |
| MarkDone(); |
| return absl::CancelledError(); |
| } |
| } |
| } |
| |
| using Promise = typename Factory::Promise; |
| // Scheduler for wakeups |
| GPR_NO_UNIQUE_ADDRESS WakeupScheduler wakeup_scheduler_; |
| // Callback on completion of the promise. |
| GPR_NO_UNIQUE_ADDRESS OnDone on_done_; |
| // Has execution completed? |
| GPR_NO_UNIQUE_ADDRESS bool done_ ABSL_GUARDED_BY(mu()) = false; |
| // Is there a wakeup scheduled? |
| GPR_NO_UNIQUE_ADDRESS std::atomic<bool> wakeup_scheduled_{false}; |
| // We wrap the promise in a union to allow control over the construction |
| // simultaneously with annotating mutex requirements and noting that the |
| // promise contained may not use any memory. |
| union PromiseHolder { |
| PromiseHolder() {} |
| ~PromiseHolder() {} |
| GPR_NO_UNIQUE_ADDRESS Promise promise; |
| }; |
| GPR_NO_UNIQUE_ADDRESS PromiseHolder promise_holder_ ABSL_GUARDED_BY(mu()); |
| }; |
| |
| } // namespace promise_detail |
| |
| // Given a functor that returns a promise (a promise factory), a callback for |
| // completion, and a callback scheduler, construct an activity. |
| template <typename Factory, typename WakeupScheduler, typename OnDone, |
| typename... Contexts> |
| ActivityPtr MakeActivity(Factory promise_factory, |
| WakeupScheduler wakeup_scheduler, OnDone on_done, |
| Contexts&&... contexts) { |
| return ActivityPtr( |
| new promise_detail::PromiseActivity<Factory, WakeupScheduler, OnDone, |
| Contexts...>( |
| std::move(promise_factory), std::move(wakeup_scheduler), |
| std::move(on_done), std::forward<Contexts>(contexts)...)); |
| } |
| |
| } // namespace grpc_core |
| |
| #endif // GRPC_CORE_LIB_PROMISE_ACTIVITY_H |