blob: b555263b8baed81f8292717581d0f43d8898f1cb [file] [log] [blame]
/*
 *
 * Copyright 2015 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
// When you update this API, please make the corresponding changes to
// the C++ API in src/cpp/common/channel_filter.{h,cc}
/* A channel filter defines how operations on a channel are implemented.
Channel filters are chained together to create full channels, and if those
chains are linear, then channel stacks provide a mechanism to minimize
allocations for that chain.
Call stacks are created by channel stacks and represent the per-call data
for that stack.
Implementations should take care of the following details for a batch -
1. Synchronization is achieved with a CallCombiner. View
src/core/lib/iomgr/call_combiner.h for more details.
2. If the filter wants to inject an error on the way down, it needs to call
grpc_transport_stream_op_batch_finish_with_failure from within the call
combiner. This will cause any batch callbacks to be called with that error.
3. If the filter wants to inject an error on the way up (from a callback), it
should also inject that error in the recv_trailing_metadata callback so that
it can have an effect on the call status.
*/
#include <grpc/support/port_platform.h>
#include <stddef.h>
#include <functional>
#include <grpc/impl/codegen/gpr_types.h>
#include <grpc/impl/codegen/grpc_types.h>
#include <grpc/slice.h>
#include <grpc/status.h>
#include <grpc/support/log.h>
#include "src/core/lib/channel/channel_fwd.h"
#include "src/core/lib/channel/context.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gpr/time_precise.h"
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/call_combiner.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/promise/arena_promise.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/transport/transport.h"
/* Arguments handed to a filter's init_channel_elem callback. */
struct grpc_channel_element_args {
  grpc_channel_stack* channel_stack;
  const grpc_channel_args* channel_args;
  /* is_first / is_last designate the element's position in the stack and are
     useful for asserting correct configuration by upper layer code. */
  int is_first;
  int is_last;
};
/* Arguments handed to a filter's init_call_elem callback. */
struct grpc_call_element_args {
  grpc_call_stack* call_stack;
  /* Opaque pointer. NULL means the call is on a client; non-NULL means the
     call is on a server and the pointer refers to memory owned by the
     transport. */
  const void* server_transport_data;
  grpc_call_context_element* context;
  /* NOTE(review): held by reference, so the referenced slice presumably has
     to outlive this struct - confirm against callers. */
  const grpc_slice& path;
  gpr_cycle_counter start_time;  // Note: not populated in subchannel stack.
  grpc_core::Timestamp deadline;
  grpc_core::Arena* arena;
  grpc_core::CallCombiner* call_combiner;
};
/* Statistics gathered for one call; embedded in grpc_call_final_info. */
struct grpc_call_stats {
  grpc_transport_stream_stats transport_stream_stats;
  gpr_timespec latency; /* From call creation to enqueuing of received status */
};
/** Information about the call upon completion. Passed to each filter's
    destroy_call_elem callback, mainly for reporting purposes. */
struct grpc_call_final_info {
  grpc_call_stats stats;
  /* Defaults to GRPC_STATUS_OK until a status is recorded. */
  grpc_status_code final_status = GRPC_STATUS_OK;
  /* Defaults to nullptr, i.e. no error text. */
  const char* error_string = nullptr;
};
/* Channel filters specify:
   1. the amount of memory needed in the channel & call (via the sizeof_XXX
      members)
   2. functions to initialize and destroy channel & call data
      (init_XXX, destroy_XXX)
   3. functions to implement call operations and channel operations
      (start_transport_stream_op_batch, start_transport_op)
   4. a name, which is useful when debugging

   Members are laid out in approximate frequency of use order. */
struct grpc_channel_filter {
  /* Called to eg. send/receive data on a call.
     See grpc_call_next_op on how to call the next element in the stack */
  void (*start_transport_stream_op_batch)(grpc_call_element* elem,
                                          grpc_transport_stream_op_batch* op);

  /* Create a promise to execute one call.
     If this is non-null, it may be used in preference to
     start_transport_stream_op_batch.
     If this is used in preference to start_transport_stream_op_batch, the
     following can be omitted also:
       - calling init_call_elem, destroy_call_elem, set_pollset_or_pollset_set
       - allocation of memory for call data
     There is an on-going migration to move all filters to providing this, and
     then to drop start_transport_stream_op_batch. */
  grpc_core::ArenaPromise<grpc_core::ServerMetadataHandle> (*make_call_promise)(
      grpc_channel_element* elem, grpc_core::CallArgs call_args,
      grpc_core::NextPromiseFactory next_promise_factory);

  /* Called to handle channel level operations - e.g. new calls, or other
     transport-level operations.
     See grpc_channel_next_op on how to call the next element in the stack */
  void (*start_transport_op)(grpc_channel_element* elem, grpc_transport_op* op);

  /* sizeof(per call data) */
  size_t sizeof_call_data;

  /* Initialize per call data.
     elem is initialized at the start of the call, and elem->call_data is what
     needs initializing.
     The filter does not need to do any chaining.
     server_transport_data is an opaque pointer. If it is NULL, this call is
     on a client; if it is non-NULL, then it points to memory owned by the
     transport and is on the server. Most filters want to ignore this
     argument.
     Implementations may assume that elem->call_data is all zeros. */
  grpc_error_handle (*init_call_elem)(grpc_call_element* elem,
                                      const grpc_call_element_args* args);

  /* Receive the pollset or pollset_set for the call. Filters that do not care
     about pollsets can point this at
     grpc_call_stack_ignore_set_pollset_or_pollset_set. */
  void (*set_pollset_or_pollset_set)(grpc_call_element* elem,
                                     grpc_polling_entity* pollent);

  /* Destroy per call data.
     The filter does not need to do any chaining.
     The bottom filter of a stack will be passed a non-NULL pointer to
     \a then_schedule_closure that should be passed to GRPC_CLOSURE_SCHED when
     destruction is complete. \a final_info contains data about the completed
     call, mainly for reporting purposes. */
  void (*destroy_call_elem)(grpc_call_element* elem,
                            const grpc_call_final_info* final_info,
                            grpc_closure* then_schedule_closure);

  /* sizeof(per channel data) */
  size_t sizeof_channel_data;

  /* Initialize per-channel data.
     elem is initialized at the creation of the channel, and elem->channel_data
     is what needs initializing.
     is_first, is_last designate this element's position in the stack, and are
     useful for asserting correct configuration by upper layer code.
     The filter does not need to do any chaining.
     Implementations may assume that elem->channel_data is all zeros. */
  grpc_error_handle (*init_channel_elem)(grpc_channel_element* elem,
                                         grpc_channel_element_args* args);

  /* Post init per-channel data.
     Called after all channel elements have been successfully created. */
  void (*post_init_channel_elem)(grpc_channel_stack* stk,
                                 grpc_channel_element* elem);

  /* Destroy per channel data.
     The filter does not need to do any chaining */
  void (*destroy_channel_elem)(grpc_channel_element* elem);

  /* Implement grpc_channel_get_info() */
  void (*get_channel_info)(grpc_channel_element* elem,
                           const grpc_channel_info* channel_info);

  /* The name of this filter */
  const char* name;
};
/* A channel_element tracks its filter and the filter requested memory within
   a channel allocation */
struct grpc_channel_element {
  /* The filter whose callbacks implement this element. */
  const grpc_channel_filter* filter;
  /* Per-channel memory requested by the filter (see
     grpc_channel_filter::sizeof_channel_data). */
  void* channel_data;
};
/* A call_element tracks its filter, the filter requested memory within
   a channel allocation, and the filter requested memory within a call
   allocation */
struct grpc_call_element {
  /* The filter whose callbacks implement this element. */
  const grpc_channel_filter* filter;
  /* Per-channel memory requested by the filter (see
     grpc_channel_filter::sizeof_channel_data). */
  void* channel_data;
  /* Per-call memory requested by the filter (see
     grpc_channel_filter::sizeof_call_data). */
  void* call_data;
};
/* A channel stack tracks a set of related filters for one channel, and
   guarantees they live within a single malloc() allocation */
struct grpc_channel_stack {
  grpc_stream_refcount refcount;
  bool is_client;
  size_t count;
  /* Memory required for a call stack (computed at channel stack
     initialization) */
  size_t call_stack_size;

  // TODO(ctiller): remove this mechanism... it's a hack to allow
  // Channel to be separated from grpc_channel_stack's allocation. As the
  // promise conversion continues, we'll reconsider what grpc_channel_stack
  // should look like and this can go.
  grpc_core::ManualConstructor<std::function<void()>> on_destroy;

  // Minimal infrastructure to act like a RefCounted thing without converting
  // everything.
  // It's likely that we'll want to replace grpc_channel_stack with something
  // less regimented once the promise conversion completes, so avoiding doing a
  // full C++-ification for now.
  void IncrementRefCount();
  void Unref();

  // Returns a new owning smart pointer to this stack.
  // RefCountedPtr adopts the raw pointer it is given (it does not take a ref
  // of its own) and calls Unref() when it is destroyed, so the ref it will
  // later release has to be taken here first. Without the increment every
  // Ref() would drop a reference that was never acquired.
  grpc_core::RefCountedPtr<grpc_channel_stack> Ref() {
    IncrementRefCount();
    return grpc_core::RefCountedPtr<grpc_channel_stack>(this);
  }

  grpc_core::ArenaPromise<grpc_core::ServerMetadataHandle> MakeCallPromise(
      grpc_core::CallArgs call_args);
};
/* A call stack tracks a set of related filters for one call, and guarantees
   they live within a single malloc() allocation */
struct grpc_call_stack {
  /* shared refcount for this call stack.
     MUST be the first element: the underlying code calls destroy
     with the address of the refcount, but higher layers prefer to think
     about the address of the call stack itself. */
  grpc_stream_refcount refcount;
  size_t count;

  // Minimal infrastructure to act like a RefCounted thing without converting
  // everything.
  // grpc_call_stack will be eliminated once the promise conversion completes.
  void IncrementRefCount();
  void Unref();

  // Returns a new owning smart pointer to this stack.
  // RefCountedPtr adopts the raw pointer it is given (it does not take a ref
  // of its own) and calls Unref() when it is destroyed, so the ref it will
  // later release has to be taken here first. Without the increment every
  // Ref() would drop a reference that was never acquired.
  grpc_core::RefCountedPtr<grpc_call_stack> Ref() {
    IncrementRefCount();
    return grpc_core::RefCountedPtr<grpc_call_stack>(this);
  }
};
/* Get a channel element given a channel stack and its index.
   NOTE(review): i is presumably required to be < stack->count - confirm
   against the implementation. */
grpc_channel_element* grpc_channel_stack_element(grpc_channel_stack* stack,
size_t i);
/* Get the last channel element in a channel stack */
grpc_channel_element* grpc_channel_stack_last_element(
grpc_channel_stack* stack);
// A utility function for a filter to determine how many other instances
// of the same filter exist above it in the same stack. Intended to be
// used in the filter's init_channel_elem() method.
// channel_stack is the stack to search and elem is the calling filter's own
// element within it.
size_t grpc_channel_stack_filter_instance_number(
grpc_channel_stack* channel_stack, grpc_channel_element* elem);
/* Get a call stack element given a call stack and an index.
   NOTE(review): i is presumably required to be < stack->count - confirm
   against the implementation. */
grpc_call_element* grpc_call_stack_element(grpc_call_stack* stack, size_t i);
/* Determine memory required for a channel stack containing a set of filters.
   filters is an array of filter_count filter pointers; the result is a size
   suitable for comparing against / allocating a grpc_channel_stack. */
size_t grpc_channel_stack_size(const grpc_channel_filter** filters,
size_t filter_count);
/* Initialize a channel stack given some filters.
   filters is an array of filter_count filter pointers. stack is
   caller-provided memory for the stack; NOTE(review): presumably it must be
   at least grpc_channel_stack_size(filters, filter_count) bytes - confirm.
   destroy / destroy_arg and initial_refs are presumably used to set up the
   stack's refcount - verify against the implementation.
   Returns a grpc_error_handle describing the outcome. */
grpc_error_handle grpc_channel_stack_init(
int initial_refs, grpc_iomgr_cb_func destroy, void* destroy_arg,
const grpc_channel_filter** filters, size_t filter_count,
const grpc_channel_args* args, const char* name, grpc_channel_stack* stack);
/* Destroy a channel stack */
void grpc_channel_stack_destroy(grpc_channel_stack* stack);
/* Initialize a call stack given a channel stack.
   elem_args->server_transport_data is expected to be NULL on a client, or an
   opaque transport owned pointer on the server.
   There is no grpc_call_stack parameter; NOTE(review): the stack being
   initialized is presumably elem_args->call_stack - confirm.
   Returns a grpc_error_handle describing the outcome. */
grpc_error_handle grpc_call_stack_init(grpc_channel_stack* channel_stack,
int initial_refs,
grpc_iomgr_cb_func destroy,
void* destroy_arg,
const grpc_call_element_args* elem_args);
/* Set a pollset or a pollset_set for a call stack: must occur before the first
 * op is started */
void grpc_call_stack_set_pollset_or_pollset_set(grpc_call_stack* call_stack,
grpc_polling_entity* pollent);
/* Ref/unref helpers for call stacks and channel stacks.
   Debug builds (NDEBUG not defined) forward `reason` to grpc_stream_ref /
   grpc_stream_unref; release builds call the one-argument forms and only
   evaluate `reason` to keep it "used".
   Every variant expands to a single statement WITHOUT a trailing semicolon, so
   callers write `GRPC_CALL_STACK_REF(s, "why");` and the macros are safe in
   unbraced if/else bodies in both build modes. */
#ifndef NDEBUG
#define GRPC_CALL_STACK_REF(call_stack, reason) \
  grpc_stream_ref(&(call_stack)->refcount, reason)
#define GRPC_CALL_STACK_UNREF(call_stack, reason) \
  grpc_stream_unref(&(call_stack)->refcount, reason)
#define GRPC_CHANNEL_STACK_REF(channel_stack, reason) \
  grpc_stream_ref(&(channel_stack)->refcount, reason)
#define GRPC_CHANNEL_STACK_UNREF(channel_stack, reason) \
  grpc_stream_unref(&(channel_stack)->refcount, reason)
#else
#define GRPC_CALL_STACK_REF(call_stack, reason) \
  do {                                          \
    grpc_stream_ref(&(call_stack)->refcount);   \
    (void)(reason);                             \
  } while (0)
#define GRPC_CALL_STACK_UNREF(call_stack, reason) \
  do {                                            \
    grpc_stream_unref(&(call_stack)->refcount);   \
    (void)(reason);                               \
  } while (0)
#define GRPC_CHANNEL_STACK_REF(channel_stack, reason) \
  do {                                                \
    grpc_stream_ref(&(channel_stack)->refcount);      \
    (void)(reason);                                   \
  } while (0)
#define GRPC_CHANNEL_STACK_UNREF(channel_stack, reason) \
  do {                                                  \
    grpc_stream_unref(&(channel_stack)->refcount);      \
    (void)(reason);                                     \
  } while (0)
#endif
/* Take one additional ref on this channel stack (tagged "smart_pointer" in
   debug builds). */
inline void grpc_channel_stack::IncrementRefCount() {
  GRPC_CHANNEL_STACK_REF(this, "smart_pointer");
}
/* Release one ref on this channel stack (tagged "smart_pointer" in debug
   builds). */
inline void grpc_channel_stack::Unref() {
  GRPC_CHANNEL_STACK_UNREF(this, "smart_pointer");
}
/* Take one additional ref on this call stack (tagged "smart_pointer" in debug
   builds). */
inline void grpc_call_stack::IncrementRefCount() {
  GRPC_CALL_STACK_REF(this, "smart_pointer");
}
/* Release one ref on this call stack (tagged "smart_pointer" in debug
   builds). */
inline void grpc_call_stack::Unref() {
  GRPC_CALL_STACK_UNREF(this, "smart_pointer");
}
/* Destroy a call stack.
   final_info and then_schedule_closure have the same types as the matching
   parameters of grpc_channel_filter::destroy_call_elem; NOTE(review): they are
   presumably forwarded to each filter's destroy_call_elem - confirm against
   the implementation. */
void grpc_call_stack_destroy(grpc_call_stack* stack,
const grpc_call_final_info* final_info,
grpc_closure* then_schedule_closure);
/* Ignore set pollset{_set} - used by filters if they don't care about pollsets
 * at all. Does nothing.
 * The signature matches grpc_channel_filter::set_pollset_or_pollset_set, so it
 * can be installed directly in a filter's vtable. */
void grpc_call_stack_ignore_set_pollset_or_pollset_set(
grpc_call_element* elem, grpc_polling_entity* pollent);
/* Call the next operation in a call stack.
   elem is the element issuing the call; op is the batch to pass along. */
void grpc_call_next_op(grpc_call_element* elem,
grpc_transport_stream_op_batch* op);
/* Call the next operation (depending on call directionality) in a channel
stack.
   elem is the element issuing the call; op is the transport op to pass
   along. */
void grpc_channel_next_op(grpc_channel_element* elem, grpc_transport_op* op);
/* Pass through a request to get_channel_info() to the next child element */
void grpc_channel_next_get_info(grpc_channel_element* elem,
const grpc_channel_info* channel_info);
/* Given the top element of a channel stack, get the channel stack itself.
   elem must be the TOP element; NOTE(review): behavior for any other element
   is not visible from this header. */
grpc_channel_stack* grpc_channel_stack_from_top_element(
grpc_channel_element* elem);
/* Given the top element of a call stack, get the call stack itself.
   elem must be the TOP element; NOTE(review): behavior for any other element
   is not visible from this header. */
grpc_call_stack* grpc_call_stack_from_top_element(grpc_call_element* elem);
/* Log a stream op batch for a call element. file / line / severity identify
   the log site and level. This is the function GRPC_CALL_LOG_OP invokes when
   the grpc_trace_channel trace flag is enabled. */
void grpc_call_log_op(const char* file, int line, gpr_log_severity severity,
grpc_call_element* elem,
grpc_transport_stream_op_batch* op);
/* Has the same signature as grpc_channel_filter::post_init_channel_elem.
   NOTE(review): judging by the name this is presumably a do-nothing
   implementation for filters that need no post-init step - confirm against
   the definition. */
void grpc_channel_stack_no_post_init(grpc_channel_stack* stk,
grpc_channel_element* elem);
/* Trace flag that gates GRPC_CALL_LOG_OP. */
extern grpc_core::TraceFlag grpc_trace_channel;
/* Log `op` for `elem` via grpc_call_log_op, but only when grpc_trace_channel
   is enabled. Expands to a single do/while(0) statement with no trailing
   semicolon.
   grpc_call_log_op takes (file, line, severity, elem, op) while this macro
   passes only (sev, elem, op), so `sev` has to expand to the leading
   file/line/severity arguments. NOTE(review): presumably one of the
   GPR_DEBUG / GPR_INFO / GPR_ERROR macros from grpc/support/log.h - confirm. */
#define GRPC_CALL_LOG_OP(sev, elem, op) \
do { \
if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_channel)) { \
grpc_call_log_op(sev, elem, op); \
} \
} while (0)