Merge pull request #19453 from arjunroy/hptbl_init
Faster hpack table creation
diff --git a/src/core/ext/transport/chttp2/transport/hpack_parser.cc b/src/core/ext/transport/chttp2/transport/hpack_parser.cc
index 616d6c5..3284892 100644
--- a/src/core/ext/transport/chttp2/transport/hpack_parser.cc
+++ b/src/core/ext/transport/chttp2/transport/hpack_parser.cc
@@ -779,6 +779,7 @@
const uint8_t* cur, const uint8_t* end) {
p->dynamic_table_update_allowed = 0;
p->index = (*cur) & 0x7f;
+ p->md_for_index.payload = 0; /* Invalidate cached md when index changes. */
return finish_indexed_field(p, cur + 1, end);
}
@@ -791,17 +792,32 @@
p->dynamic_table_update_allowed = 0;
p->next_state = and_then;
p->index = 0x7f;
+ p->md_for_index.payload = 0; /* Invalidate cached md when index changes. */
p->parsing.value = &p->index;
return parse_value0(p, cur + 1, end);
}
+/* When finishing with a header, take the cached md element for this index.
+ It is cached by is_binary_indexed_header() during string parsing and cleared
+ here once read. In debug mode we check it belongs to the current index. */
+static grpc_mdelem get_precomputed_md_for_idx(grpc_chttp2_hpack_parser* p) {
+ GPR_DEBUG_ASSERT(p->md_for_index.payload != 0);
+ GPR_DEBUG_ASSERT(static_cast<int64_t>(p->index) == p->precomputed_md_index);
+ grpc_mdelem md = p->md_for_index;
+ GPR_DEBUG_ASSERT(!GRPC_MDISNULL(md)); /* handled in string parsing */
+ p->md_for_index.payload = 0; /* Invalidate cached md when index changes. */
+#ifndef NDEBUG
+ p->precomputed_md_index = -1;
+#endif
+ return md;
+}
+
/* finish a literal header with incremental indexing */
static grpc_error* finish_lithdr_incidx(grpc_chttp2_hpack_parser* p,
const uint8_t* cur,
const uint8_t* end) {
- grpc_mdelem md = grpc_chttp2_hptbl_lookup(&p->table, p->index);
- GPR_ASSERT(!GRPC_MDISNULL(md)); /* handled in string parsing */
GRPC_STATS_INC_HPACK_RECV_LITHDR_INCIDX();
+ grpc_mdelem md = get_precomputed_md_for_idx(p);
grpc_error* err = on_hdr<true>(
p, grpc_mdelem_from_slices(grpc_slice_ref_internal(GRPC_MDKEY(md)),
take_string(p, &p->value, true)));
@@ -829,6 +845,7 @@
p->dynamic_table_update_allowed = 0;
p->next_state = and_then;
p->index = (*cur) & 0x3f;
+ p->md_for_index.payload = 0; /* Invalidate cached md when index changes. */
return parse_string_prefix(p, cur + 1, end);
}
@@ -842,6 +859,7 @@
p->dynamic_table_update_allowed = 0;
p->next_state = and_then;
p->index = 0x3f;
+ p->md_for_index.payload = 0; /* Invalidate cached md when index changes. */
p->parsing.value = &p->index;
return parse_value0(p, cur + 1, end);
}
@@ -862,9 +880,8 @@
static grpc_error* finish_lithdr_notidx(grpc_chttp2_hpack_parser* p,
const uint8_t* cur,
const uint8_t* end) {
- grpc_mdelem md = grpc_chttp2_hptbl_lookup(&p->table, p->index);
- GPR_ASSERT(!GRPC_MDISNULL(md)); /* handled in string parsing */
GRPC_STATS_INC_HPACK_RECV_LITHDR_NOTIDX();
+ grpc_mdelem md = get_precomputed_md_for_idx(p);
grpc_error* err = on_hdr<false>(
p, grpc_mdelem_from_slices(grpc_slice_ref_internal(GRPC_MDKEY(md)),
take_string(p, &p->value, false)));
@@ -892,6 +909,7 @@
p->dynamic_table_update_allowed = 0;
p->next_state = and_then;
p->index = (*cur) & 0xf;
+ p->md_for_index.payload = 0; /* Invalidate cached md when index changes. */
return parse_string_prefix(p, cur + 1, end);
}
@@ -905,6 +923,7 @@
p->dynamic_table_update_allowed = 0;
p->next_state = and_then;
p->index = 0xf;
+ p->md_for_index.payload = 0; /* Invalidate cached md when index changes. */
p->parsing.value = &p->index;
return parse_value0(p, cur + 1, end);
}
@@ -925,9 +944,8 @@
static grpc_error* finish_lithdr_nvridx(grpc_chttp2_hpack_parser* p,
const uint8_t* cur,
const uint8_t* end) {
- grpc_mdelem md = grpc_chttp2_hptbl_lookup(&p->table, p->index);
- GPR_ASSERT(!GRPC_MDISNULL(md)); /* handled in string parsing */
GRPC_STATS_INC_HPACK_RECV_LITHDR_NVRIDX();
+ grpc_mdelem md = get_precomputed_md_for_idx(p);
grpc_error* err = on_hdr<false>(
p, grpc_mdelem_from_slices(grpc_slice_ref_internal(GRPC_MDKEY(md)),
take_string(p, &p->value, false)));
@@ -955,6 +973,7 @@
p->dynamic_table_update_allowed = 0;
p->next_state = and_then;
p->index = (*cur) & 0xf;
+ p->md_for_index.payload = 0; /* Invalidate cached md when index changes. */
return parse_string_prefix(p, cur + 1, end);
}
@@ -968,6 +987,7 @@
p->dynamic_table_update_allowed = 0;
p->next_state = and_then;
p->index = 0xf;
+ p->md_for_index.payload = 0; /* Invalidate cached md when index changes. */
p->parsing.value = &p->index;
return parse_value0(p, cur + 1, end);
}
@@ -1007,6 +1027,7 @@
}
p->dynamic_table_update_allowed--;
p->index = (*cur) & 0x1f;
+ p->md_for_index.payload = 0; /* Invalidate cached md when index changes. */
return finish_max_tbl_size(p, cur + 1, end);
}
@@ -1025,6 +1046,7 @@
p->dynamic_table_update_allowed--;
p->next_state = and_then;
p->index = 0x1f;
+ p->md_for_index.payload = 0; /* Invalidate cached md when index changes. */
p->parsing.value = &p->index;
return parse_value0(p, cur + 1, end);
}
@@ -1499,6 +1521,23 @@
: p->key.data.referenced);
}
+/* Cache md as the metadata for the current index (p->index) during initial
+ parsing, so it need not be looked up again when finishing the header. It is
+ read back (and cleared) by get_precomputed_md_for_idx(). */
+static void set_precomputed_md_idx(grpc_chttp2_hpack_parser* p,
+ grpc_mdelem md) {
+ GPR_DEBUG_ASSERT(p->md_for_index.payload == 0);
+ GPR_DEBUG_ASSERT(p->precomputed_md_index == -1);
+ p->md_for_index = md;
+#ifndef NDEBUG
+ p->precomputed_md_index = p->index;
+#endif
+}
+
+/* Determines if a metadata element key associated with the current parser index
+ is a binary indexed header during string parsing. We'll need to revisit this
+ metadata when we're done parsing, so we cache the metadata for this index
+ here using set_precomputed_md_idx(). */
static grpc_error* is_binary_indexed_header(grpc_chttp2_hpack_parser* p,
bool* is) {
grpc_mdelem elem = grpc_chttp2_hptbl_lookup(&p->table, p->index);
@@ -1519,6 +1558,7 @@
* interned.
* 4. Both static and interned element slices have non-null refcounts. */
*is = grpc_is_refcounted_slice_binary_header(GRPC_MDKEY(elem));
+ set_precomputed_md_idx(p, elem);
return GRPC_ERROR_NONE;
}
@@ -1557,6 +1597,18 @@
p->value.data.copied.str = nullptr;
p->value.data.copied.capacity = 0;
p->value.data.copied.length = 0;
+ /* Cached metadata for the current index the parser is handling. This is set
+ to 0 initially, invalidated when the index changes, and invalidated when it
+ is read (by get_precomputed_md_for_idx()). It is set during string parsing,
+ by set_precomputed_md_idx(), which is called from is_binary_indexed_header().
+ The goal is to look up the metadata for the index only once (during the
+ initial parse) instead of again when finishing the header. */
+ p->md_for_index.payload = 0;
+#ifndef NDEBUG
+ /* In debug mode, this ensures that the cached metadata we're reading is in
+ * fact correct for the index we are examining. */
+ p->precomputed_md_index = -1;
+#endif
p->dynamic_table_update_allowed = 2;
p->last_error = GRPC_ERROR_NONE;
}
diff --git a/src/core/ext/transport/chttp2/transport/hpack_parser.h b/src/core/ext/transport/chttp2/transport/hpack_parser.h
index 3dc8e13..c569124 100644
--- a/src/core/ext/transport/chttp2/transport/hpack_parser.h
+++ b/src/core/ext/transport/chttp2/transport/hpack_parser.h
@@ -69,6 +69,14 @@
grpc_chttp2_hpack_parser_string value;
/* parsed index */
uint32_t index;
+ /* When we parse a value string, we determine the metadata element for a
+ specific index, which we need again when we're finishing up with that
+ header. To avoid calculating it a second time, we cache (and invalidate) the
+ element here; debug builds also record its index in precomputed_md_index. */
+ grpc_mdelem md_for_index;
+#ifndef NDEBUG
+ int64_t precomputed_md_index;
+#endif
/* length of source bytes for the currently parsing string */
uint32_t strlen;
/* number of source bytes read for the currently parsing string */
diff --git a/src/core/lib/channel/channelz.cc b/src/core/lib/channel/channelz.cc
index 184ba17..b9f6587 100644
--- a/src/core/lib/channel/channelz.cc
+++ b/src/core/lib/channel/channelz.cc
@@ -309,31 +309,44 @@
ServerNode::~ServerNode() {}
+void ServerNode::AddChildSocket(RefCountedPtr<SocketNode> node) {
+ MutexLock lock(&child_mu_);
+ child_sockets_.insert(MakePair(node->uuid(), std::move(node)));
+}
+
+void ServerNode::RemoveChildSocket(intptr_t child_uuid) {
+ MutexLock lock(&child_mu_);
+ child_sockets_.erase(child_uuid);
+}
+
char* ServerNode::RenderServerSockets(intptr_t start_socket_id,
intptr_t max_results) {
- // if user does not set max_results, we choose 500.
+ // If user does not set max_results, we choose 500.
size_t pagination_limit = max_results == 0 ? 500 : max_results;
grpc_json* top_level_json = grpc_json_create(GRPC_JSON_OBJECT);
grpc_json* json = top_level_json;
grpc_json* json_iterator = nullptr;
- ChildSocketsList socket_refs;
- grpc_server_populate_server_sockets(server_, &socket_refs, start_socket_id);
- // declared early so it can be used outside of the loop.
- size_t i = 0;
- if (!socket_refs.empty()) {
- // create list of socket refs
+ MutexLock lock(&child_mu_);
+ size_t sockets_rendered = 0;
+ auto it = child_sockets_.lower_bound(start_socket_id);
+ if (!child_sockets_.empty()) {
+ // Create list of socket refs
grpc_json* array_parent = grpc_json_create_child(
nullptr, json, "socketRef", nullptr, GRPC_JSON_ARRAY, false);
- for (i = 0; i < GPR_MIN(socket_refs.size(), pagination_limit); ++i) {
+ const size_t limit = GPR_MIN(child_sockets_.size(), pagination_limit);
+ for (; it != child_sockets_.end() && sockets_rendered < limit;
+ ++it, ++sockets_rendered) {
grpc_json* socket_ref_json = grpc_json_create_child(
nullptr, array_parent, nullptr, nullptr, GRPC_JSON_OBJECT, false);
json_iterator = grpc_json_add_number_string_child(
- socket_ref_json, nullptr, "socketId", socket_refs[i]->uuid());
+ socket_ref_json, nullptr, "socketId", it->first);
grpc_json_create_child(json_iterator, socket_ref_json, "name",
- socket_refs[i]->remote(), GRPC_JSON_STRING, false);
+ it->second->remote(), GRPC_JSON_STRING, false);
}
}
- if (i == socket_refs.size()) {
+ // "end" marks the final page: compare the iterator, not the rendered count,
+ // because start_socket_id may have skipped sockets with smaller uuids.
+ if (it == child_sockets_.end()) {
json_iterator = grpc_json_create_child(nullptr, json, "end", nullptr,
GRPC_JSON_TRUE, false);
}
diff --git a/src/core/lib/channel/channelz.h b/src/core/lib/channel/channelz.h
index d268962..a02cb82 100644
--- a/src/core/lib/channel/channelz.h
+++ b/src/core/lib/channel/channelz.h
@@ -64,7 +64,6 @@
typedef InlinedVector<intptr_t, 10> ChildRefsList;
class SocketNode;
-typedef InlinedVector<RefCountedPtr<SocketNode>, 10> ChildSocketsList;
namespace testing {
class CallCountingHelperPeer;
@@ -207,12 +206,16 @@
class ServerNode : public BaseNode {
public:
ServerNode(grpc_server* server, size_t channel_tracer_max_nodes);
+
~ServerNode() override;
grpc_json* RenderJson() override;
- char* RenderServerSockets(intptr_t start_socket_id,
- intptr_t pagination_limit);
+ char* RenderServerSockets(intptr_t start_socket_id, intptr_t max_results);
+
+ void AddChildSocket(RefCountedPtr<SocketNode>);
+
+ void RemoveChildSocket(intptr_t child_uuid);
// proxy methods to composed classes.
void AddTraceEvent(ChannelTrace::Severity severity, const grpc_slice& data) {
@@ -232,6 +235,8 @@
grpc_server* server_;
CallCountingHelper call_counter_;
ChannelTrace trace_;
+ Mutex child_mu_; // Guards child_sockets_ below.
+ Map<intptr_t, RefCountedPtr<SocketNode>> child_sockets_;
};
// Handles channelz bookkeeping for sockets
diff --git a/src/core/lib/iomgr/ev_posix.cc b/src/core/lib/iomgr/ev_posix.cc
index d78ac2d..02e4da1 100644
--- a/src/core/lib/iomgr/ev_posix.cc
+++ b/src/core/lib/iomgr/ev_posix.cc
@@ -206,7 +206,8 @@
GPR_ASSERT(false);
}
-/* Call this only after calling grpc_event_engine_init() */
+/* If grpc_event_engine_init() has been called, returns the poll strategy name.
+ * Otherwise, returns nullptr. */
const char* grpc_get_poll_strategy_name() { return g_poll_strategy_name; }
void grpc_event_engine_init(void) {
diff --git a/src/core/lib/iomgr/fork_posix.cc b/src/core/lib/iomgr/fork_posix.cc
index 629b081..e678b4f 100644
--- a/src/core/lib/iomgr/fork_posix.cc
+++ b/src/core/lib/iomgr/fork_posix.cc
@@ -59,8 +59,10 @@
"environment variable GRPC_ENABLE_FORK_SUPPORT=1");
return;
}
- if (strcmp(grpc_get_poll_strategy_name(), "epoll1") != 0 &&
- strcmp(grpc_get_poll_strategy_name(), "poll") != 0) {
+ const char* poll_strategy_name = grpc_get_poll_strategy_name();
+ if (poll_strategy_name == nullptr ||
+ (strcmp(poll_strategy_name, "epoll1") != 0 &&
+ strcmp(poll_strategy_name, "poll") != 0)) {
gpr_log(GPR_INFO,
"Fork support is only compatible with the epoll1 and poll polling "
"strategies");
diff --git a/src/core/lib/surface/server.cc b/src/core/lib/surface/server.cc
index 6cd8600..a1c7d13 100644
--- a/src/core/lib/surface/server.cc
+++ b/src/core/lib/surface/server.cc
@@ -31,6 +31,7 @@
#include <utility>
#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/channel/channelz.h"
#include "src/core/lib/channel/connected_channel.h"
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/gpr/mpscq.h"
@@ -111,7 +112,7 @@
uint32_t registered_method_max_probes;
grpc_closure finish_destroy_channel_closure;
grpc_closure channel_connectivity_changed;
- grpc_core::RefCountedPtr<grpc_core::channelz::SocketNode> socket_node;
+ intptr_t channelz_socket_uuid;
};
typedef struct shutdown_tag {
@@ -941,7 +942,6 @@
static void destroy_channel_elem(grpc_channel_element* elem) {
size_t i;
channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- chand->socket_node.reset();
if (chand->registered_methods) {
for (i = 0; i < chand->registered_method_slots; i++) {
grpc_slice_unref_internal(chand->registered_methods[i].method);
@@ -952,6 +952,11 @@
gpr_free(chand->registered_methods);
}
if (chand->server) {
+ if (chand->server->channelz_server != nullptr &&
+ chand->channelz_socket_uuid != 0) {
+ chand->server->channelz_server->RemoveChildSocket(
+ chand->channelz_socket_uuid);
+ }
gpr_mu_lock(&chand->server->mu_global);
chand->next->prev = chand->prev;
chand->prev->next = chand->next;
@@ -1144,7 +1149,8 @@
void grpc_server_setup_transport(
grpc_server* s, grpc_transport* transport, grpc_pollset* accepting_pollset,
const grpc_channel_args* args,
- grpc_core::RefCountedPtr<grpc_core::channelz::SocketNode> socket_node,
+ const grpc_core::RefCountedPtr<grpc_core::channelz::SocketNode>&
+ socket_node,
grpc_resource_user* resource_user) {
size_t num_registered_methods;
size_t alloc;
@@ -1166,7 +1172,12 @@
chand->server = s;
server_ref(s);
chand->channel = channel;
- chand->socket_node = std::move(socket_node);
+ if (socket_node != nullptr && s->channelz_server != nullptr) {
+ chand->channelz_socket_uuid = socket_node->uuid();
+ s->channelz_server->AddChildSocket(socket_node);
+ } else {
+ chand->channelz_socket_uuid = 0; // 0 = not registered with channelz.
+ }
size_t cq_idx;
for (cq_idx = 0; cq_idx < s->cq_count; cq_idx++) {
@@ -1241,19 +1252,6 @@
grpc_transport_perform_op(transport, op);
}
-void grpc_server_populate_server_sockets(
- grpc_server* s, grpc_core::channelz::ChildSocketsList* server_sockets,
- intptr_t start_idx) {
- gpr_mu_lock(&s->mu_global);
- channel_data* c = nullptr;
- for (c = s->root_channel_data.next; c != &s->root_channel_data; c = c->next) {
- if (c->socket_node != nullptr && c->socket_node->uuid() >= start_idx) {
- server_sockets->push_back(c->socket_node);
- }
- }
- gpr_mu_unlock(&s->mu_global);
-}
-
void grpc_server_populate_listen_sockets(
grpc_server* server, grpc_core::channelz::ChildRefsList* listen_sockets) {
gpr_mu_lock(&server->mu_global);
diff --git a/src/core/lib/surface/server.h b/src/core/lib/surface/server.h
index 393bb24..926a582 100644
--- a/src/core/lib/surface/server.h
+++ b/src/core/lib/surface/server.h
@@ -47,14 +47,10 @@
void grpc_server_setup_transport(
grpc_server* server, grpc_transport* transport,
grpc_pollset* accepting_pollset, const grpc_channel_args* args,
- grpc_core::RefCountedPtr<grpc_core::channelz::SocketNode> socket_node,
+ const grpc_core::RefCountedPtr<grpc_core::channelz::SocketNode>&
+ socket_node,
grpc_resource_user* resource_user = nullptr);
-/* fills in the uuids of all sockets used for connections on this server */
-void grpc_server_populate_server_sockets(
- grpc_server* server, grpc_core::channelz::ChildSocketsList* server_sockets,
- intptr_t start_idx);
-
/* fills in the uuids of all listen sockets on this server */
void grpc_server_populate_listen_sockets(
grpc_server* server, grpc_core::channelz::ChildRefsList* listen_sockets);