blob: d585457e81741acee593e308f0230d04e8463abc [file] [log] [blame]
// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_CORE_LIB_PROMISE_PIPE_H
#define GRPC_CORE_LIB_PROMISE_PIPE_H
#include <grpc/support/port_platform.h>
#include <stdint.h>
#include "absl/types/optional.h"
#include <grpc/support/log.h>
#include "src/core/lib/promise/context.h"
#include "src/core/lib/promise/intra_activity_waiter.h"
#include "src/core/lib/promise/poll.h"
#include "src/core/lib/resource_quota/arena.h"
namespace grpc_core {
template <typename T>
struct Pipe;
namespace pipe_detail {
template <typename T>
class Push;
template <typename T>
class Next;
// Center sits between a sender and a receiver to provide a one-deep buffer of
// Ts
template <typename T>
class Center {
public:
// Initialize with one send ref (held by PipeSender) and one recv ref (held by
// PipeReceiver)
Center() {
send_refs_ = 1;
recv_refs_ = 1;
has_value_ = false;
}
// Add one ref to the send side of this object, and return this.
Center* RefSend() {
send_refs_++;
return this;
}
// Add one ref to the recv side of this object, and return this.
Center* RefRecv() {
recv_refs_++;
return this;
}
// Drop a send side ref
// If no send refs remain, wake due to send closure
// If no refs remain, destroy this object
void UnrefSend() {
GPR_DEBUG_ASSERT(send_refs_ > 0);
send_refs_--;
if (0 == send_refs_) {
on_full_.Wake();
on_empty_.Wake();
if (0 == recv_refs_) {
this->~Center();
}
}
}
// Drop a recv side ref
// If no recv refs remain, wake due to recv closure
// If no refs remain, destroy this object
void UnrefRecv() {
GPR_DEBUG_ASSERT(recv_refs_ > 0);
recv_refs_--;
if (0 == recv_refs_) {
on_full_.Wake();
on_empty_.Wake();
if (0 == send_refs_) {
this->~Center();
} else if (has_value_) {
ResetValue();
}
}
}
// Try to push *value into the pipe.
// Return Pending if there is no space.
// Return true if the value was pushed.
// Return false if the recv end is closed.
Poll<bool> Push(T* value) {
GPR_DEBUG_ASSERT(send_refs_ != 0);
if (recv_refs_ == 0) return false;
if (has_value_) return on_empty_.pending();
has_value_ = true;
value_ = std::move(*value);
on_full_.Wake();
return true;
}
// Try to receive a value from the pipe.
// Return Pending if there is no value.
// Return the value if one was retrieved.
// Return nullopt if the send end is closed and no value had been pushed.
Poll<absl::optional<T>> Next() {
GPR_DEBUG_ASSERT(recv_refs_ != 0);
if (!has_value_) {
if (send_refs_ == 0) return absl::nullopt;
return on_full_.pending();
}
has_value_ = false;
on_empty_.Wake();
return std::move(value_);
}
private:
void ResetValue() {
// Fancy dance to move out of value in the off chance that we reclaim some
// memory earlier.
[](T) {}(std::move(value_));
has_value_ = false;
}
T value_;
// Number of sending objects.
// 0 => send is closed.
// 1 ref each for PipeSender and Push.
uint8_t send_refs_ : 2;
// Number of receiving objects.
// 0 => recv is closed.
// 1 ref each for PipeReceiver and Next.
uint8_t recv_refs_ : 2;
// True iff there is a value in the pipe.
bool has_value_ : 1;
IntraActivityWaiter on_empty_;
IntraActivityWaiter on_full_;
};
} // namespace pipe_detail
// Send end of a Pipe.
template <typename T>
class PipeSender {
public:
PipeSender(const PipeSender&) = delete;
PipeSender& operator=(const PipeSender&) = delete;
PipeSender(PipeSender&& other) noexcept : center_(other.center_) {
other.center_ = nullptr;
}
PipeSender& operator=(PipeSender&& other) noexcept {
if (center_ != nullptr) center_->UnrefSend();
center_ = other.center_;
other.center_ = nullptr;
return *this;
}
~PipeSender() {
if (center_ != nullptr) center_->UnrefSend();
}
// Send a single message along the pipe.
// Returns a promise that will resolve to a bool - true if the message was
// sent, false if it could never be sent. Blocks the promise until the
// receiver is either closed or able to receive another message.
pipe_detail::Push<T> Push(T value);
private:
friend struct Pipe<T>;
explicit PipeSender(pipe_detail::Center<T>* center) : center_(center) {}
pipe_detail::Center<T>* center_;
};
// Receive end of a Pipe.
template <typename T>
class PipeReceiver {
public:
PipeReceiver(const PipeReceiver&) = delete;
PipeReceiver& operator=(const PipeReceiver&) = delete;
PipeReceiver(PipeReceiver&& other) noexcept : center_(other.center_) {
other.center_ = nullptr;
}
PipeReceiver& operator=(PipeReceiver&& other) noexcept {
if (center_ != nullptr) center_->UnrefRecv();
center_ = other.center_;
other.center_ = nullptr;
return *this;
}
~PipeReceiver() {
if (center_ != nullptr) center_->UnrefRecv();
}
// Receive a single message from the pipe.
// Returns a promise that will resolve to an optional<T> - with a value if a
// message was received, or no value if the other end of the pipe was closed.
// Blocks the promise until the receiver is either closed or a message is
// available.
pipe_detail::Next<T> Next();
private:
friend struct Pipe<T>;
explicit PipeReceiver(pipe_detail::Center<T>* center) : center_(center) {}
pipe_detail::Center<T>* center_;
};
namespace pipe_detail {
// Implementation of PipeSender::Push promise.
template <typename T>
class Push {
public:
Push(const Push&) = delete;
Push& operator=(const Push&) = delete;
Push(Push&& other) noexcept
: center_(other.center_), push_(std::move(other.push_)) {
other.center_ = nullptr;
}
Push& operator=(Push&& other) noexcept {
if (center_ != nullptr) center_->UnrefSend();
center_ = other.center_;
other.center_ = nullptr;
push_ = std::move(other.push_);
return *this;
}
~Push() {
if (center_ != nullptr) center_->UnrefSend();
}
Poll<bool> operator()() { return center_->Push(&push_); }
private:
friend class PipeSender<T>;
explicit Push(pipe_detail::Center<T>* center, T push)
: center_(center), push_(std::move(push)) {}
Center<T>* center_;
T push_;
};
// Implementation of PipeReceiver::Next promise.
template <typename T>
class Next {
public:
Next(const Next&) = delete;
Next& operator=(const Next&) = delete;
Next(Next&& other) noexcept : center_(other.center_) {
other.center_ = nullptr;
}
Next& operator=(Next&& other) noexcept {
if (center_ != nullptr) center_->UnrefRecv();
center_ = other.center_;
other.center_ = nullptr;
return *this;
}
~Next() {
if (center_ != nullptr) center_->UnrefRecv();
}
Poll<absl::optional<T>> operator()() { return center_->Next(); }
private:
friend class PipeReceiver<T>;
explicit Next(pipe_detail::Center<T>* center) : center_(center) {}
Center<T>* center_;
};
} // namespace pipe_detail
template <typename T>
pipe_detail::Push<T> PipeSender<T>::Push(T value) {
return pipe_detail::Push<T>(center_->RefSend(), std::move(value));
}
template <typename T>
pipe_detail::Next<T> PipeReceiver<T>::Next() {
return pipe_detail::Next<T>(center_->RefRecv());
}
// A Pipe is an intra-Activity communications channel that transmits T's from
// one end to the other.
// It is only safe to use a Pipe within the context of a single Activity.
// No synchronization is performed internally.
// The primary Pipe data structure is allocated from an arena, so the activity
// must have an arena as part of its context.
// By performing that allocation we can ensure stable pointer to shared data
// allowing PipeSender/PipeReceiver/Push/Next to be relatively simple in their
// implementation.
// This type has been optimized with the expectation that there are relatively
// few pipes per activity. If this assumption does not hold then a design
// allowing inline filtering of pipe contents (instead of connecting pipes with
// polling code) would likely be more appropriate.
template <typename T>
struct Pipe {
Pipe() : Pipe(GetContext<Arena>()->New<pipe_detail::Center<T>>()) {}
Pipe(const Pipe&) = delete;
Pipe& operator=(const Pipe&) = delete;
Pipe(Pipe&&) noexcept = default;
Pipe& operator=(Pipe&&) noexcept = default;
PipeSender<T> sender;
PipeReceiver<T> receiver;
private:
explicit Pipe(pipe_detail::Center<T>* center)
: sender(center), receiver(center) {}
};
} // namespace grpc_core
#endif // GRPC_CORE_LIB_PROMISE_PIPE_H