| // |
| // |
| // Copyright 2025 gRPC authors. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| // |
| // |
| |
| #include <grpcpp/ext/otel_plugin.h> |
| #include <grpcpp/grpcpp.h> |
| |
| #include "opentelemetry/exporters/memory/in_memory_span_exporter_factory.h" |
| #include "opentelemetry/sdk/trace/simple_processor_factory.h" |
| #include "opentelemetry/sdk/trace/tracer.h" |
| #include "opentelemetry/sdk/trace/tracer_provider.h" |
| #include "src/core/config/core_configuration.h" |
| #include "src/core/ext/transport/chttp2/transport/internal.h" |
| #include "src/core/lib/event_engine/posix_engine/event_poller.h" |
| #include "gmock/gmock.h" |
| #include "gtest/gtest.h" |
| #include "absl/synchronization/notification.h" |
| #ifdef GRPC_POSIX_SOCKET_TCP |
| #include "src/core/lib/event_engine/posix_engine/event_poller_posix_default.h" |
| #endif // GRPC_POSIX_SOCKET_TCP |
| #include "src/core/telemetry/call_tracer.h" |
| #include "src/core/util/host_port.h" |
| #include "src/cpp/ext/otel/otel_plugin.h" |
| #include "test/core/test_util/fail_first_call_filter.h" |
| #include "test/core/test_util/fake_stats_plugin.h" |
| #include "test/core/test_util/port.h" |
| #include "test/cpp/end2end/test_service_impl.h" |
| |
| namespace grpc { |
| namespace testing { |
| namespace { |
| |
| using opentelemetry::sdk::trace::SpanData; |
| using opentelemetry::sdk::trace::SpanDataEvent; |
| using ::testing::ElementsAre; |
| using ::testing::FieldsAre; |
| using ::testing::Lt; |
| using ::testing::MatchesRegex; |
| using ::testing::Pair; |
| using ::testing::StrEq; |
| using ::testing::UnorderedElementsAre; |
| using ::testing::VariantWith; |
| |
| // Test fixture for gRPC OpenTelemetry tracing tests. SetUp() registers a |
| // global OpenTelemetry plugin whose spans go to an in-memory exporter, starts |
| // an Echo server on a freshly picked port and creates a stub connected to it. |
| class OTelTracingTest : public ::testing::Test { |
| protected: |
| // Builds the OpenTelemetry plugin with the given tracer provider and the |
| // grpc-trace-bin text map propagator, and registers it globally. Virtual so |
| // that derived fixtures can configure the plugin differently. |
| virtual absl::Status BuildAndRegisterOpenTelemetryPlugin( |
| std::shared_ptr<opentelemetry::sdk::trace::TracerProvider> |
| tracer_provider) { |
| return OpenTelemetryPluginBuilder() |
| .SetTracerProvider(std::move(tracer_provider)) |
| .SetTextMapPropagator( |
| OpenTelemetryPluginBuilder::MakeGrpcTraceBinTextMapPropagator()) |
| .BuildAndRegisterGlobal(); |
| } |
| |
| void SetUp() override { |
| grpc_init(); |
| data_ = |
| std::make_shared<opentelemetry::exporter::memory::InMemorySpanData>(10); |
| // Register OTel plugin for tracing with an in memory exporter |
| auto tracer_provider = |
| std::make_shared<opentelemetry::sdk::trace::TracerProvider>( |
| opentelemetry::sdk::trace::SimpleSpanProcessorFactory::Create( |
| opentelemetry::exporter::memory::InMemorySpanExporterFactory:: |
| Create(data_))); |
| tracer_ = tracer_provider->GetTracer("grpc-test"); |
| ASSERT_TRUE( |
| BuildAndRegisterOpenTelemetryPlugin(std::move(tracer_provider)).ok()); |
| port_ = grpc_pick_unused_port_or_die(); |
| server_address_ = absl::StrCat("localhost:", port_); |
| RestartServer(); |
| auto channel = grpc::CreateChannel(server_address_, |
| grpc::InsecureChannelCredentials()); |
| stub_ = EchoTestService::NewStub(channel); |
| } |
| |
| // Shuts down the current server (if any) with an immediate deadline and |
| // starts a new one listening on port_. |
| void RestartServer() { |
| if (server_ != nullptr) { |
| server_->Shutdown(grpc_timeout_milliseconds_to_deadline(0)); |
| } |
| grpc::ServerBuilder builder; |
| // Use IPv4 here because it's less flaky than IPv6 ("[::]:0") on Travis. |
| builder.AddListeningPort(grpc_core::JoinHostPort("0.0.0.0", port_), |
| grpc::InsecureServerCredentials(), nullptr); |
| // Allow only one stream at a time. |
| builder.AddChannelArgument(GRPC_ARG_MAX_CONCURRENT_STREAMS, 1); |
| builder.AddChannelArgument( |
| GRPC_ARG_MAX_CONCURRENT_STREAMS_OVERLOAD_PROTECTION, false); |
| builder.RegisterService(&service_); |
| server_ = builder.BuildAndStart(); |
| } |
| |
| void TearDown() override { |
| // gtest still runs TearDown() when SetUp() hit a fatal failure (the plugin |
| // registration is asserted before the server is started), and |
| // BuildAndStart() may hand back a null server, so do not assume one exists. |
| if (server_ != nullptr) { |
| server_->Shutdown(); |
| } |
| grpc_shutdown_blocking(); |
| // Reset the global registrations so the next test starts from scratch. |
| grpc_core::ServerCallTracerFactory::TestOnlyReset(); |
| grpc_core::GlobalStatsPluginRegistryTestPeer:: |
| ResetGlobalStatsPluginRegistry(); |
| grpc_core::CoreConfiguration::Reset(); |
| } |
| |
| // Sends a single unary Echo RPC with message "foo" on \a stub. The resulting |
| // status is intentionally not checked here. |
| void SendRPC(EchoTestService::Stub* stub) { |
| EchoRequest request; |
| request.set_message("foo"); |
| EchoResponse response; |
| grpc::ClientContext context; |
| grpc::Status status = stub->Echo(&context, request, &response); |
| } |
| |
| // Waits for \a timeout for \a expected_size number of spans and returns them. |
| // Spans are accumulated across polls of the in-memory exporter; if the |
| // timeout elapses first, whatever has been collected so far is returned. |
| std::vector<std::unique_ptr<opentelemetry::sdk::trace::SpanData>> GetSpans( |
| size_t expected_size, absl::Duration timeout = absl::Seconds(10)) { |
| absl::Time start_time = absl::Now(); |
| std::vector<std::unique_ptr<opentelemetry::sdk::trace::SpanData>> spans; |
| do { |
| auto current_spans = data_->GetSpans(); |
| spans.insert(spans.end(), std::make_move_iterator(current_spans.begin()), |
| std::make_move_iterator(current_spans.end())); |
| if ((spans.size() >= expected_size) || |
| (absl::Now() - start_time > timeout)) { |
| break; |
| } |
| std::this_thread::yield(); |
| } while (true); |
| return spans; |
| } |
| |
| // Tracer obtained from the test's tracer provider; used to start |
| // application-level spans in tests. |
| opentelemetry::nostd::shared_ptr<opentelemetry::trace::Tracer> tracer_; |
| // In-memory storage that the span exporter writes to. |
| std::shared_ptr<opentelemetry::exporter::memory::InMemorySpanData> data_; |
| CallbackTestServiceImpl service_; |
| // Port the test server listens on; assigned in SetUp(). |
| int port_ = 0; |
| std::string server_address_; |
| std::unique_ptr<grpc::Server> server_; |
| std::unique_ptr<EchoTestService::Stub> stub_; |
| }; |
| |
| // Sends one unary RPC and verifies the client, attempt and server spans: their |
| // attributes, message events, status and parent-child relationships. |
| TEST_F(OTelTracingTest, Basic) { |
| SendRPC(stub_.get()); |
| auto spans = GetSpans(3); |
| // Start out null so that a missing span is reported as a test failure below |
| // instead of dereferencing an uninitialized pointer. |
| SpanData* client_span = nullptr; |
| SpanData* attempt_span = nullptr; |
| SpanData* server_span = nullptr; |
| // Verify that we get 3 spans - |
| // 1) Client RPC Span - Sent.grpc.testing.EchoTestService/Echo |
| // 2) Attempt Span - Attempt.grpc.testing.EchoTestService/Echo |
| // 3) Server RPC Span - Recv.grpc.testing.EchoTestService/Echo |
| EXPECT_EQ(spans.size(), 3); |
| for (const auto& span : spans) { |
| EXPECT_TRUE(span->GetSpanContext().IsValid()); |
| if (span->GetName() == "Attempt.grpc.testing.EchoTestService/Echo") { |
| attempt_span = span.get(); |
| EXPECT_THAT(span->GetAttributes(), |
| UnorderedElementsAre( |
| Pair("transparent-retry", VariantWith<bool>(false)), |
| Pair("previous-rpc-attempts", VariantWith<uint64_t>(0)))); |
| // Verify outbound message event |
| const auto outbound_message_event = |
| std::find_if(span->GetEvents().begin(), span->GetEvents().end(), |
| [](const SpanDataEvent& event) { |
| return event.GetName() == "Outbound message"; |
| }); |
| ASSERT_NE(outbound_message_event, span->GetEvents().end()); |
| EXPECT_THAT(outbound_message_event->GetAttributes(), |
| UnorderedElementsAre( |
| Pair("sequence-number", VariantWith<uint64_t>(0)), |
| Pair("message-size", VariantWith<uint64_t>(5)))); |
| // Verify inbound message event |
| const auto inbound_message_event = |
| std::find_if(span->GetEvents().begin(), span->GetEvents().end(), |
| [](const SpanDataEvent& event) { |
| return event.GetName() == "Inbound message"; |
| }); |
| ASSERT_NE(inbound_message_event, span->GetEvents().end()); |
| EXPECT_THAT(inbound_message_event->GetAttributes(), |
| UnorderedElementsAre( |
| Pair("sequence-number", VariantWith<uint64_t>(0)), |
| Pair("message-size", VariantWith<uint64_t>(5)))); |
| EXPECT_EQ(span->GetStatus(), opentelemetry::trace::StatusCode::kOk); |
| } else if (span->GetName() == "Recv.grpc.testing.EchoTestService/Echo") { |
| server_span = span.get(); |
| // Verify outbound message event |
| const auto outbound_message_event = |
| std::find_if(span->GetEvents().begin(), span->GetEvents().end(), |
| [](const SpanDataEvent& event) { |
| return event.GetName() == "Outbound message"; |
| }); |
| ASSERT_NE(outbound_message_event, span->GetEvents().end()); |
| EXPECT_THAT(outbound_message_event->GetAttributes(), |
| UnorderedElementsAre( |
| Pair("sequence-number", VariantWith<uint64_t>(0)), |
| Pair("message-size", VariantWith<uint64_t>(5)))); |
| // Verify inbound message event |
| const auto inbound_message_event = |
| std::find_if(span->GetEvents().begin(), span->GetEvents().end(), |
| [](const SpanDataEvent& event) { |
| return event.GetName() == "Inbound message"; |
| }); |
| ASSERT_NE(inbound_message_event, span->GetEvents().end()); |
| EXPECT_THAT(inbound_message_event->GetAttributes(), |
| UnorderedElementsAre( |
| Pair("sequence-number", VariantWith<uint64_t>(0)), |
| Pair("message-size", VariantWith<uint64_t>(5)))); |
| EXPECT_EQ(span->GetStatus(), opentelemetry::trace::StatusCode::kOk); |
| } else { |
| // Anything that is neither the attempt nor the server span must be the |
| // client RPC span. |
| client_span = span.get(); |
| EXPECT_EQ(span->GetName(), "Sent.grpc.testing.EchoTestService/Echo"); |
| } |
| } |
| // All three spans must have been seen before they are dereferenced. |
| ASSERT_NE(client_span, nullptr); |
| ASSERT_NE(attempt_span, nullptr); |
| ASSERT_NE(server_span, nullptr); |
| // Check parent-child relationship |
| EXPECT_EQ(client_span->GetTraceId(), attempt_span->GetTraceId()); |
| EXPECT_EQ(attempt_span->GetParentSpanId(), client_span->GetSpanId()); |
| EXPECT_EQ(attempt_span->GetTraceId(), server_span->GetTraceId()); |
| EXPECT_EQ(server_span->GetParentSpanId(), attempt_span->GetSpanId()); |
| } |
| |
| // Verifies that a span made active by the application becomes the parent of |
| // the client RPC span created for an RPC sent while it is active. |
| TEST_F(OTelTracingTest, TestApplicationContextFlows) { |
| { |
| auto span = tracer_->StartSpan("TestSpan"); |
| auto scope = opentelemetry::sdk::trace::Tracer::WithActiveSpan(span); |
| SendRPC(stub_.get()); |
| } |
| // Application span + client, attempt and server spans. |
| auto spans = GetSpans(4); |
| EXPECT_EQ(spans.size(), 4); |
| const auto test_span = std::find_if( |
| spans.begin(), spans.end(), [](const std::unique_ptr<SpanData>& span) { |
| return span->GetName() == "TestSpan"; |
| }); |
| ASSERT_NE(test_span, spans.end()); |
| const auto client_span = std::find_if( |
| spans.begin(), spans.end(), [](const std::unique_ptr<SpanData>& span) { |
| return span->GetName() == "Sent.grpc.testing.EchoTestService/Echo"; |
| }); |
| // Guard the client span lookup before it is dereferenced below. |
| ASSERT_NE(client_span, spans.end()); |
| EXPECT_EQ((*test_span)->GetTraceId(), (*client_span)->GetTraceId()); |
| EXPECT_EQ((*client_span)->GetParentSpanId(), (*test_span)->GetSpanId()); |
| } |
| |
| // Sends an uncompressed unary RPC and checks the "Outbound message" and |
| // "Inbound message" events recorded on the attempt span and the server span. |
| TEST_F(OTelTracingTest, MessageEventsWithoutCompression) { |
| { |
| EchoRequest request; |
| request.set_message("AAAAAAAAAAAAAAAAAAAAAAAAAAAAA"); |
| EchoResponse response; |
| grpc::ClientContext context; |
| grpc::Status status = stub_->Echo(&context, request, &response); |
| } |
| auto spans = GetSpans(3); |
| EXPECT_EQ(spans.size(), 3); |
| // Looks up the first collected span with the given name. |
| const auto find_span = [&spans](const char* name) { |
| return std::find_if(spans.begin(), spans.end(), |
| [name](const std::unique_ptr<SpanData>& candidate) { |
| return candidate->GetName() == name; |
| }); |
| }; |
| // Looks up the first event with the given name on a span. |
| const auto find_event = [](const SpanData& span, const char* name) { |
| const auto& events = span.GetEvents(); |
| return std::find_if(events.begin(), events.end(), |
| [name](const SpanDataEvent& candidate) { |
| return candidate.GetName() == name; |
| }); |
| }; |
| const auto attempt_it = |
| find_span("Attempt.grpc.testing.EchoTestService/Echo"); |
| ASSERT_NE(attempt_it, spans.end()); |
| const SpanData& attempt = **attempt_it; |
| // Outbound message on the attempt |
| auto sent_event = find_event(attempt, "Outbound message"); |
| ASSERT_NE(sent_event, attempt.GetEvents().end()); |
| EXPECT_THAT( |
| sent_event->GetAttributes(), |
| UnorderedElementsAre(Pair("sequence-number", VariantWith<uint64_t>(0)), |
| Pair("message-size", VariantWith<uint64_t>(31)))); |
| // Inbound message on the attempt |
| auto received_event = find_event(attempt, "Inbound message"); |
| ASSERT_NE(received_event, attempt.GetEvents().end()); |
| EXPECT_THAT( |
| received_event->GetAttributes(), |
| UnorderedElementsAre(Pair("sequence-number", VariantWith<uint64_t>(0)), |
| Pair("message-size", VariantWith<uint64_t>(31)))); |
| const auto server_it = find_span("Recv.grpc.testing.EchoTestService/Echo"); |
| ASSERT_NE(server_it, spans.end()); |
| const SpanData& server = **server_it; |
| // Inbound message on the server |
| received_event = find_event(server, "Inbound message"); |
| ASSERT_NE(received_event, server.GetEvents().end()); |
| EXPECT_THAT( |
| received_event->GetAttributes(), |
| UnorderedElementsAre(Pair("sequence-number", VariantWith<uint64_t>(0)), |
| Pair("message-size", VariantWith<uint64_t>(31)))); |
| // Outbound message on the server |
| sent_event = find_event(server, "Outbound message"); |
| ASSERT_NE(sent_event, server.GetEvents().end()); |
| EXPECT_THAT( |
| sent_event->GetAttributes(), |
| UnorderedElementsAre(Pair("sequence-number", VariantWith<uint64_t>(0)), |
| Pair("message-size", VariantWith<uint64_t>(31)))); |
| } |
| |
| // Sends a gzip-compressed unary RPC (request and response) and checks the |
| // plain and compressed message events recorded on the attempt span and on the |
| // server span. |
| TEST_F(OTelTracingTest, CompressionMessageEvents) { |
| { |
| EchoRequest request; |
| request.set_message("AAAAAAAAAAAAAAAAAAAAAAAAAAAAA"); |
| request.mutable_param()->set_compression_algorithm(RequestParams::GZIP); |
| EchoResponse response; |
| grpc::ClientContext context; |
| context.set_compression_algorithm(GRPC_COMPRESS_GZIP); |
| grpc::Status status = stub_->Echo(&context, request, &response); |
| } |
| auto spans = GetSpans(3); |
| EXPECT_EQ(spans.size(), 3); |
| // Looks up the first collected span with the given name. |
| const auto find_span = [&spans](const char* name) { |
| return std::find_if(spans.begin(), spans.end(), |
| [name](const std::unique_ptr<SpanData>& candidate) { |
| return candidate->GetName() == name; |
| }); |
| }; |
| // Looks up the first event with the given name on a span. |
| const auto find_event = [](const SpanData& span, const char* name) { |
| const auto& events = span.GetEvents(); |
| return std::find_if(events.begin(), events.end(), |
| [name](const SpanDataEvent& candidate) { |
| return candidate.GetName() == name; |
| }); |
| }; |
| const auto attempt_it = |
| find_span("Attempt.grpc.testing.EchoTestService/Echo"); |
| ASSERT_NE(attempt_it, spans.end()); |
| const SpanData& attempt = **attempt_it; |
| // Outbound messages on the attempt |
| auto sent_event = find_event(attempt, "Outbound message"); |
| ASSERT_NE(sent_event, attempt.GetEvents().end()); |
| EXPECT_THAT( |
| sent_event->GetAttributes(), |
| UnorderedElementsAre(Pair("sequence-number", VariantWith<uint64_t>(0)), |
| Pair("message-size", VariantWith<uint64_t>(36)))); |
| auto sent_compressed_event = |
| find_event(attempt, "Outbound message compressed"); |
| ASSERT_NE(sent_compressed_event, attempt.GetEvents().end()); |
| EXPECT_THAT( |
| sent_compressed_event->GetAttributes(), |
| UnorderedElementsAre( |
| Pair("sequence-number", VariantWith<uint64_t>(0)), |
| Pair("message-size-compressed", VariantWith<uint64_t>(Lt(36))))); |
| // Inbound messages on the attempt |
| auto received_compressed_event = |
| find_event(attempt, "Inbound compressed message"); |
| ASSERT_NE(received_compressed_event, attempt.GetEvents().end()); |
| EXPECT_THAT( |
| received_compressed_event->GetAttributes(), |
| UnorderedElementsAre( |
| Pair("sequence-number", VariantWith<uint64_t>(0)), |
| Pair("message-size-compressed", VariantWith<uint64_t>(Lt(31))))); |
| auto received_event = find_event(attempt, "Inbound message"); |
| ASSERT_NE(received_event, attempt.GetEvents().end()); |
| EXPECT_THAT( |
| received_event->GetAttributes(), |
| UnorderedElementsAre(Pair("sequence-number", VariantWith<uint64_t>(0)), |
| Pair("message-size", VariantWith<uint64_t>(31)))); |
| const auto server_it = find_span("Recv.grpc.testing.EchoTestService/Echo"); |
| ASSERT_NE(server_it, spans.end()); |
| const SpanData& server = **server_it; |
| // Inbound messages on the server |
| received_compressed_event = |
| find_event(server, "Inbound compressed message"); |
| ASSERT_NE(received_compressed_event, server.GetEvents().end()); |
| EXPECT_THAT( |
| received_compressed_event->GetAttributes(), |
| UnorderedElementsAre( |
| Pair("sequence-number", VariantWith<uint64_t>(0)), |
| Pair("message-size-compressed", VariantWith<uint64_t>(Lt(36))))); |
| received_event = find_event(server, "Inbound message"); |
| ASSERT_NE(received_event, server.GetEvents().end()); |
| EXPECT_THAT( |
| received_event->GetAttributes(), |
| UnorderedElementsAre(Pair("sequence-number", VariantWith<uint64_t>(0)), |
| Pair("message-size", VariantWith<uint64_t>(36)))); |
| // Outbound messages on the server |
| sent_event = find_event(server, "Outbound message"); |
| ASSERT_NE(sent_event, server.GetEvents().end()); |
| EXPECT_THAT( |
| sent_event->GetAttributes(), |
| UnorderedElementsAre(Pair("sequence-number", VariantWith<uint64_t>(0)), |
| Pair("message-size", VariantWith<uint64_t>(31)))); |
| sent_compressed_event = find_event(server, "Outbound message compressed"); |
| ASSERT_NE(sent_compressed_event, server.GetEvents().end()); |
| EXPECT_THAT( |
| sent_compressed_event->GetAttributes(), |
| UnorderedElementsAre( |
| Pair("sequence-number", VariantWith<uint64_t>(0)), |
| Pair("message-size-compressed", VariantWith<uint64_t>(Lt(31))))); |
| } |
| |
| // Asks the server to fail the RPC with UNAVAILABLE and checks that both the |
| // attempt span and the server span carry an error status and a description |
| // containing the error message. |
| TEST_F(OTelTracingTest, FailedStatus) { |
| { |
| EchoRequest request; |
| request.set_message("foo"); |
| auto* expected_error = request.mutable_param()->mutable_expected_error(); |
| expected_error->set_code(grpc::StatusCode::UNAVAILABLE); |
| expected_error->set_error_message("test message"); |
| EchoResponse response; |
| grpc::ClientContext context; |
| context.set_compression_algorithm(GRPC_COMPRESS_GZIP); |
| grpc::Status status = stub_->Echo(&context, request, &response); |
| } |
| auto spans = GetSpans(3); |
| EXPECT_EQ(spans.size(), 3); |
| // Looks up the first collected span with the given name. |
| const auto find_span = [&spans](const char* name) { |
| return std::find_if(spans.begin(), spans.end(), |
| [name](const std::unique_ptr<SpanData>& candidate) { |
| return candidate->GetName() == name; |
| }); |
| }; |
| const auto attempt_it = |
| find_span("Attempt.grpc.testing.EchoTestService/Echo"); |
| ASSERT_NE(attempt_it, spans.end()); |
| EXPECT_EQ((*attempt_it)->GetStatus(), |
| opentelemetry::trace::StatusCode::kError); |
| EXPECT_THAT((*attempt_it)->GetDescription(), |
| MatchesRegex("UNAVAILABLE:.*test message.*")); |
| const auto server_it = find_span("Recv.grpc.testing.EchoTestService/Echo"); |
| ASSERT_NE(server_it, spans.end()); |
| EXPECT_EQ((*server_it)->GetStatus(), |
| opentelemetry::trace::StatusCode::kError); |
| EXPECT_THAT((*server_it)->GetDescription(), |
| MatchesRegex("UNAVAILABLE:.*test message.*")); |
| } |
| |
| // Exchanges 10 messages over a bidi stream and verifies that the message |
| // events on the attempt span and on the server span carry sequence numbers |
| // 0 through 9, in order, in both directions. |
| TEST_F(OTelTracingTest, Streaming) { |
| { |
| EchoRequest request; |
| request.set_message("foo"); |
| EchoResponse response; |
| grpc::ClientContext context; |
| auto stream = stub_->BidiStream(&context); |
| for (int i = 0; i < 10; ++i) { |
| EXPECT_TRUE(stream->Write(request)); |
| EXPECT_TRUE(stream->Read(&response)); |
| } |
| stream->WritesDone(); |
| auto status = stream->Finish(); |
| EXPECT_TRUE(status.ok()) << "code=" << status.error_code() |
| << " message=" << status.error_message(); |
| } |
| auto spans = GetSpans(3); |
| EXPECT_EQ(spans.size(), 3); |
| const auto attempt_span = std::find_if( |
| spans.begin(), spans.end(), [](const std::unique_ptr<SpanData>& span) { |
| return span->GetName() == |
| "Attempt.grpc.testing.EchoTestService/BidiStream"; |
| }); |
| ASSERT_NE(attempt_span, spans.end()); |
| // Verify messages on the attempt span |
| std::vector<uint64_t> outbound_seq_nums; |
| std::vector<uint64_t> inbound_seq_nums; |
| for (const auto& event : (*attempt_span)->GetEvents()) { |
| if (event.GetName() == "Outbound message") { |
| outbound_seq_nums.push_back( |
| std::get<uint64_t>(event.GetAttributes().at("sequence-number"))); |
| } |
| if (event.GetName() == "Inbound message") { |
| inbound_seq_nums.push_back( |
| std::get<uint64_t>(event.GetAttributes().at("sequence-number"))); |
| } |
| } |
| EXPECT_THAT(outbound_seq_nums, ElementsAre(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)); |
| EXPECT_THAT(inbound_seq_nums, ElementsAre(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)); |
| const auto server_span = std::find_if( |
| spans.begin(), spans.end(), [](const std::unique_ptr<SpanData>& span) { |
| return span->GetName() == |
| "Recv.grpc.testing.EchoTestService/BidiStream"; |
| }); |
| // The server span must exist before its events are inspected below. |
| ASSERT_NE(server_span, spans.end()); |
| outbound_seq_nums.clear(); |
| inbound_seq_nums.clear(); |
| // Verify messages on the server span |
| for (const auto& event : (*server_span)->GetEvents()) { |
| if (event.GetName() == "Outbound message") { |
| outbound_seq_nums.push_back( |
| std::get<uint64_t>(event.GetAttributes().at("sequence-number"))); |
| } |
| if (event.GetName() == "Inbound message") { |
| inbound_seq_nums.push_back( |
| std::get<uint64_t>(event.GetAttributes().at("sequence-number"))); |
| } |
| } |
| EXPECT_THAT(outbound_seq_nums, ElementsAre(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)); |
| EXPECT_THAT(inbound_seq_nums, ElementsAre(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)); |
| } |
| |
| // Configures a retry policy of up to 3 attempts on ABORTED, makes the server |
| // fail every attempt with ABORTED, and checks the attempt and server spans |
| // that result. |
| TEST_F(OTelTracingTest, Retries) { |
| { |
| ChannelArguments args; |
| args.SetString(GRPC_ARG_SERVICE_CONFIG, |
| "{\n" |
| " \"methodConfig\": [ {\n" |
| " \"name\": [\n" |
| " { \"service\": \"grpc.testing.EchoTestService\" }\n" |
| " ],\n" |
| " \"retryPolicy\": {\n" |
| " \"maxAttempts\": 3,\n" |
| " \"initialBackoff\": \"0.1s\",\n" |
| " \"maxBackoff\": \"120s\",\n" |
| " \"backoffMultiplier\": 1,\n" |
| " \"retryableStatusCodes\": [ \"ABORTED\" ]\n" |
| " }\n" |
| " } ]\n" |
| "}"); |
| auto stub = EchoTestService::NewStub(CreateCustomChannel( |
| server_address_, InsecureChannelCredentials(), args)); |
| EchoRequest request; |
| request.set_message("foo"); |
| request.mutable_param()->mutable_expected_error()->set_code( |
| StatusCode::ABORTED); |
| EchoResponse response; |
| grpc::ClientContext context; |
| grpc::Status status = stub->Echo(&context, request, &response); |
| } |
| // 1 client span, 3 attempt spans, 3 server spans |
| auto spans = GetSpans(7); |
| EXPECT_EQ(spans.size(), 7); |
| std::vector<uint64_t> previous_attempt_counts; |
| uint64_t num_server_spans = 0; |
| for (const auto& span : spans) { |
| if (span->GetName() == "Recv.grpc.testing.EchoTestService/Echo") { |
| ++num_server_spans; |
| continue; |
| } |
| if (span->GetName() != "Attempt.grpc.testing.EchoTestService/Echo") { |
| continue; |
| } |
| const auto& attributes = span->GetAttributes(); |
| previous_attempt_counts.push_back( |
| std::get<uint64_t>(attributes.at("previous-rpc-attempts"))); |
| EXPECT_EQ(std::get<bool>(attributes.at("transparent-retry")), false); |
| } |
| EXPECT_THAT(previous_attempt_counts, ElementsAre(0, 1, 2)); |
| EXPECT_EQ(num_server_spans, 3); |
| } |
| |
| // An Echo Service that propagates an Echo request to another server. |
| // Callback Echo service that forwards each Echo request to another server |
| // through the stub it was constructed with, and finishes the incoming call |
| // with the status of the forwarded call. |
| class PropagatingEchoTestServiceImpl : public EchoTestService::CallbackService { |
| public: |
| explicit PropagatingEchoTestServiceImpl(EchoTestService::Stub* stub) |
| : stub_(stub) {} |
| |
| ServerUnaryReactor* Echo(CallbackServerContext* context, |
| const EchoRequest* request, |
| EchoResponse* response) override { |
| auto* unary_reactor = context->DefaultReactor(); |
| // The client context for the forwarded call is derived from the incoming |
| // server context and heap-allocated; the completion callback deletes it. |
| ClientContext* forwarded_context = |
| ClientContext::FromCallbackServerContext(*context).release(); |
| auto on_forwarded_done = [forwarded_context, unary_reactor](Status s) { |
| EXPECT_TRUE(s.ok()) << "code=" << s.error_code() |
| << " message=" << s.error_message(); |
| unary_reactor->Finish(s); |
| delete forwarded_context; |
| }; |
| stub_->async()->Echo(forwarded_context, request, response, |
| on_forwarded_done); |
| return unary_reactor; |
| } |
| |
| private: |
| // Stub used to forward requests; not owned. |
| EchoTestService::Stub* const stub_; |
| }; |
| |
| // Tests that spans are propagated from parent call to child call. |
| TEST_F(OTelTracingTest, PropagationParentToChild) { |
| { |
| // Bring up an intermediate echo service that forwards every echo request |
| // to the actual server, and send one RPC to it under an active test span. |
| const int port = grpc_pick_unused_port_or_die(); |
| grpc::ServerBuilder builder; |
| // Use IPv4 here because it's less flaky than IPv6 ("[::]:0") on Travis. |
| builder.AddListeningPort(grpc_core::JoinHostPort("0.0.0.0", port), |
| grpc::InsecureServerCredentials(), nullptr); |
| PropagatingEchoTestServiceImpl service(stub_.get()); |
| builder.RegisterService(&service); |
| auto server = builder.BuildAndStart(); |
| auto channel = grpc::CreateChannel(absl::StrCat("localhost:", port), |
| grpc::InsecureChannelCredentials()); |
| auto stub = EchoTestService::NewStub(channel); |
| auto span = tracer_->StartSpan("TestSpan"); |
| auto scope = opentelemetry::sdk::trace::Tracer::WithActiveSpan(span); |
| SendRPC(stub.get()); |
| } |
| // Expected spans: test span, client span, attempt span, server span at |
| // propagating echo service, child client span at propagating echo service, |
| // attempt span at propagating echo service and server span at actual echo |
| // service. |
| auto spans = GetSpans(7); |
| EXPECT_EQ(spans.size(), 7); |
| // Finds the first span with the given name whose parent is `parent_id`. |
| const auto find_child = [&spans]( |
| const char* name, |
| const opentelemetry::trace::SpanId& parent_id) { |
| return std::find_if(spans.begin(), spans.end(), |
| [&](const std::unique_ptr<SpanData>& candidate) { |
| return candidate->GetName() == name && |
| candidate->GetParentSpanId() == parent_id; |
| }); |
| }; |
| const auto test_span = std::find_if( |
| spans.begin(), spans.end(), |
| [](const std::unique_ptr<SpanData>& candidate) { |
| return candidate->GetName() == "TestSpan"; |
| }); |
| ASSERT_NE(test_span, spans.end()); |
| const auto expected_trace_id = (*test_span)->GetTraceId(); |
| // Walk the chain of spans from the test span down to the final server span; |
| // each one must be parented by the previous one and share the trace id. |
| const auto client_span = find_child("Sent.grpc.testing.EchoTestService/Echo", |
| (*test_span)->GetSpanId()); |
| ASSERT_NE(client_span, spans.end()); |
| EXPECT_EQ((*client_span)->GetTraceId(), expected_trace_id); |
| const auto attempt_span = |
| find_child("Attempt.grpc.testing.EchoTestService/Echo", |
| (*client_span)->GetSpanId()); |
| ASSERT_NE(attempt_span, spans.end()); |
| EXPECT_EQ((*attempt_span)->GetTraceId(), expected_trace_id); |
| const auto propagating_server_span = |
| find_child("Recv.grpc.testing.EchoTestService/Echo", |
| (*attempt_span)->GetSpanId()); |
| ASSERT_NE(propagating_server_span, spans.end()); |
| EXPECT_EQ((*propagating_server_span)->GetTraceId(), expected_trace_id); |
| const auto propagating_client_span = |
| find_child("Sent.grpc.testing.EchoTestService/Echo", |
| (*propagating_server_span)->GetSpanId()); |
| ASSERT_NE(propagating_client_span, spans.end()); |
| EXPECT_EQ((*propagating_client_span)->GetTraceId(), expected_trace_id); |
| const auto propagating_attempt_span = |
| find_child("Attempt.grpc.testing.EchoTestService/Echo", |
| (*propagating_client_span)->GetSpanId()); |
| ASSERT_NE(propagating_attempt_span, spans.end()); |
| EXPECT_EQ((*propagating_attempt_span)->GetTraceId(), expected_trace_id); |
| const auto server_span = |
| find_child("Recv.grpc.testing.EchoTestService/Echo", |
| (*propagating_attempt_span)->GetSpanId()); |
| ASSERT_NE(server_span, spans.end()); |
| EXPECT_EQ((*server_span)->GetTraceId(), expected_trace_id); |
| } |
| |
| // Variant of OTelTracingTest whose plugin is built with a tracer provider |
| // only; no text map propagator is set on the builder. |
| class OTelTracingTestNoPropagator : public OTelTracingTest { |
| protected: |
| absl::Status BuildAndRegisterOpenTelemetryPlugin( |
| std::shared_ptr<opentelemetry::sdk::trace::TracerProvider> |
| tracer_provider) override { |
| OpenTelemetryPluginBuilder plugin_builder; |
| plugin_builder.SetTracerProvider(std::move(tracer_provider)); |
| return plugin_builder.BuildAndRegisterGlobal(); |
| } |
| }; |
| |
| // Tests that spans are not propagated from parent call to child call when no |
| // propagator is set. |
| TEST_F(OTelTracingTestNoPropagator, PropagationParentToChildWithoutPropagator) { |
| { |
| // Bring up an intermediate echo service that forwards every echo request |
| // to the actual server, and send one RPC to it under an active test span. |
| const int port = grpc_pick_unused_port_or_die(); |
| grpc::ServerBuilder builder; |
| // Use IPv4 here because it's less flaky than IPv6 ("[::]:0") on Travis. |
| builder.AddListeningPort(grpc_core::JoinHostPort("0.0.0.0", port), |
| grpc::InsecureServerCredentials(), nullptr); |
| PropagatingEchoTestServiceImpl service(stub_.get()); |
| builder.RegisterService(&service); |
| auto server = builder.BuildAndStart(); |
| auto channel = grpc::CreateChannel(absl::StrCat("localhost:", port), |
| grpc::InsecureChannelCredentials()); |
| auto stub = EchoTestService::NewStub(channel); |
| auto span = tracer_->StartSpan("TestSpan"); |
| auto scope = opentelemetry::sdk::trace::Tracer::WithActiveSpan(span); |
| SendRPC(stub.get()); |
| } |
| // Expected spans: test span, client span, attempt span, server span at |
| // propagating echo service, child client span at propagating echo service, |
| // attempt span at propagating echo service and server span at actual echo |
| // service. |
| auto spans = GetSpans(7); |
| EXPECT_EQ(spans.size(), 7); |
| using SpanPtr = std::unique_ptr<SpanData>; |
| // Returns an iterator to the first collected span satisfying `predicate`. |
| const auto find_span = [&spans](const auto& predicate) { |
| return std::find_if(spans.begin(), spans.end(), predicate); |
| }; |
| const auto test_span = find_span( |
| [](const SpanPtr& candidate) { return candidate->GetName() == "TestSpan"; }); |
| ASSERT_NE(test_span, spans.end()); |
| const auto client_span = find_span([&](const SpanPtr& candidate) { |
| return candidate->GetName() == "Sent.grpc.testing.EchoTestService/Echo" && |
| candidate->GetParentSpanId() == (*test_span)->GetSpanId(); |
| }); |
| ASSERT_NE(client_span, spans.end()); |
| EXPECT_EQ((*client_span)->GetTraceId(), (*test_span)->GetTraceId()); |
| const auto attempt_span = find_span([&](const SpanPtr& candidate) { |
| return candidate->GetName() == |
| "Attempt.grpc.testing.EchoTestService/Echo" && |
| candidate->GetParentSpanId() == (*client_span)->GetSpanId(); |
| }); |
| ASSERT_NE(attempt_span, spans.end()); |
| EXPECT_EQ((*attempt_span)->GetTraceId(), (*test_span)->GetTraceId()); |
| // With no propagator configured, the server span at the propagating service |
| // and the server span at the final service both have an empty parent span |
| // ID; only the former has a child (the propagating client span). |
| const auto kZeroSpanId = opentelemetry::trace::SpanId(); |
| const auto propagating_client_span = find_span([&](const SpanPtr& candidate) { |
| return candidate->GetName() == "Sent.grpc.testing.EchoTestService/Echo" && |
| candidate->GetTraceId() != (*client_span)->GetTraceId(); |
| }); |
| ASSERT_NE(propagating_client_span, spans.end()); |
| const auto propagating_server_span = find_span([&](const SpanPtr& candidate) { |
| return candidate->GetName() == "Recv.grpc.testing.EchoTestService/Echo" && |
| candidate->GetParentSpanId() == kZeroSpanId && |
| candidate->GetSpanId() == |
| (*propagating_client_span)->GetParentSpanId(); |
| }); |
| ASSERT_NE(propagating_server_span, spans.end()); |
| EXPECT_NE((*propagating_server_span)->GetTraceId(), |
| (*test_span)->GetTraceId()); |
| EXPECT_EQ((*propagating_client_span)->GetTraceId(), |
| (*propagating_server_span)->GetTraceId()); |
| const auto propagating_attempt_span = |
| find_span([&](const SpanPtr& candidate) { |
| return candidate->GetName() == |
| "Attempt.grpc.testing.EchoTestService/Echo" && |
| candidate->GetParentSpanId() == |
| (*propagating_client_span)->GetSpanId(); |
| }); |
| ASSERT_NE(propagating_attempt_span, spans.end()); |
| EXPECT_EQ((*propagating_attempt_span)->GetTraceId(), |
| (*propagating_client_span)->GetTraceId()); |
| // The final server span likewise has an empty parent span ID, and its trace |
| // ID differs from that of the propagating server span. |
| const auto server_span = find_span([&](const SpanPtr& candidate) { |
| return candidate->GetName() == "Recv.grpc.testing.EchoTestService/Echo" && |
| candidate->GetParentSpanId() == kZeroSpanId && |
| candidate->GetTraceId() != (*propagating_server_span)->GetTraceId(); |
| }); |
| ASSERT_NE(server_span, spans.end()); |
| EXPECT_NE((*server_span)->GetTraceId(), (*test_span)->GetTraceId()); |
| } |
| |
| #ifdef GRPC_LINUX_ERRQUEUE |
| // Test presence of TCP write annotations |
| TEST_F(OTelTracingTest, TcpWriteAnnotations) { |
| if (!grpc_event_engine::experimental::MakeDefaultPoller( |
| /*thread_pool=*/nullptr) |
| ->CanTrackErrors()) { |
| GTEST_SKIP() << "Test disabled if poller doesn't support errqueue"; |
| } |
| |
| SendRPC(stub_.get()); |
| auto spans = GetSpans(3); |
| SpanData* attempt_span; |
| SpanData* server_span; |
| EXPECT_EQ(spans.size(), 3); |
| for (const auto& span : spans) { |
| EXPECT_TRUE(span->GetSpanContext().IsValid()); |
| if (span->GetName() == "Attempt.grpc.testing.EchoTestService/Echo") { |
| attempt_span = span.get(); |
| // Verify TCP sendmsg event |
| const auto sendmsg_event = std::find_if( |
| span->GetEvents().begin(), span->GetEvents().end(), |
| [](const SpanDataEvent& event) { |
| return absl::StrContains(event.GetName(), "TCP: SENDMSG"); |
| }); |
| EXPECT_NE(sendmsg_event, span->GetEvents().end()); |
| // Verify TCP scheduled event |
| const auto scheduled_event = std::find_if( |
| span->GetEvents().begin(), span->GetEvents().end(), |
| [](const SpanDataEvent& event) { |
| return absl::StrContains(event.GetName(), "TCP: SCHEDULED"); |
| }); |
| EXPECT_NE(scheduled_event, span->GetEvents().end()); |
| // Verify TCP sent event |
| const auto sent_event = |
| std::find_if(span->GetEvents().begin(), span->GetEvents().end(), |
| [](const SpanDataEvent& event) { |
| return absl::StrContains(event.GetName(), "TCP: SENT"); |
| }); |
| EXPECT_NE(sent_event, span->GetEvents().end()); |
| // Verify TCP acked event |
| const auto acked_event = std::find_if( |
| span->GetEvents().begin(), span->GetEvents().end(), |
| [](const SpanDataEvent& event) { |
| return absl::StrContains(event.GetName(), "TCP: ACKED"); |
| }); |
| EXPECT_NE(acked_event, span->GetEvents().end()); |
| } else if (span->GetName() == "Recv.grpc.testing.EchoTestService/Echo") { |
| server_span = span.get(); |
| // Verify TCP sendmsg event |
| const auto sendmsg_event = std::find_if( |
| span->GetEvents().begin(), span->GetEvents().end(), |
| [](const SpanDataEvent& event) { |
| return absl::StrContains(event.GetName(), "TCP: SENDMSG"); |
| }); |
| EXPECT_NE(sendmsg_event, span->GetEvents().end()); |
| // Verify TCP scheduled event |
| const auto scheduled_event = std::find_if( |
| span->GetEvents().begin(), span->GetEvents().end(), |
| [](const SpanDataEvent& event) { |
| return absl::StrContains(event.GetName(), "TCP: SCHEDULED"); |
| }); |
| EXPECT_NE(scheduled_event, span->GetEvents().end()); |
| // Verify TCP sent event |
| const auto sent_event = |
| std::find_if(span->GetEvents().begin(), span->GetEvents().end(), |
| [](const SpanDataEvent& event) { |
| return absl::StrContains(event.GetName(), "TCP: SENT"); |
| }); |
| EXPECT_NE(sent_event, span->GetEvents().end()); |
| // Verify TCP acked event |
| const auto acked_event = std::find_if( |
| span->GetEvents().begin(), span->GetEvents().end(), |
| [](const SpanDataEvent& event) { |
| return absl::StrContains(event.GetName(), "TCP: ACKED"); |
| }); |
| EXPECT_NE(acked_event, span->GetEvents().end()); |
| } |
| } |
| EXPECT_NE(attempt_span, nullptr); |
| EXPECT_NE(server_span, nullptr); |
| } |
| #endif // GRPC_LINUX_ERRQUEUE |
| |
| TEST(OTelTracingPluginTest, OTelSpanIdAndTraceIdToStringTest) { |
| char trace_id[] = "0123456789ABCDEF"; |
| char span_id[] = "01234567"; |
| auto span = std::shared_ptr<opentelemetry::trace::Span>( |
| new (std::nothrow) |
| opentelemetry::trace::DefaultSpan(opentelemetry::trace::SpanContext( |
| opentelemetry::trace::TraceId( |
| opentelemetry::nostd::span<const uint8_t, 16>( |
| reinterpret_cast<const uint8_t*>(trace_id), 16)), |
| opentelemetry::trace::SpanId( |
| opentelemetry::nostd::span<const uint8_t, 8>( |
| reinterpret_cast<const uint8_t*>(span_id), 8)), |
| opentelemetry::trace::TraceFlags(1), /*is_remote=*/true))); |
| EXPECT_THAT(grpc::internal::OTelSpanTraceIdToString(span.get()), |
| StrEq("30313233343536373839414243444546")); |
| EXPECT_THAT(grpc::internal::OTelSpanSpanIdToString(span.get()), |
| StrEq("3031323334353637")); |
| } |
| |
| class OTelTracingTestForTransparentRetries : public OTelTracingTest { |
| protected: |
| void SetUp() override { |
| grpc_core::CoreConfiguration::RegisterEphemeralBuilder( |
| [](grpc_core::CoreConfiguration::Builder* builder) { |
| // Register FailFirstCallFilter to simulate transparent retries. |
| builder->channel_init()->RegisterFilter( |
| GRPC_CLIENT_SUBCHANNEL, |
| &grpc_core::testing::FailFirstCallFilter::kFilterVtable); |
| }); |
| OTelTracingTest::SetUp(); |
| } |
| }; |
| |
| TEST_F(OTelTracingTestForTransparentRetries, TransparentRetries) { |
| SendRPC(stub_.get()); |
| auto spans = GetSpans(4); |
| // 1 client spans, 2 attempt spans, 1 server span. |
| EXPECT_EQ(spans.size(), 4); |
| struct AttemptAttributes { |
| std::string PrettyPrint() { |
| return absl::StrFormat( |
| "previous-rpc-attempts: %lu, transparent-retry: %s", |
| previous_rpc_attempts, transparent_retry ? "true" : "false"); |
| } |
| uint64_t previous_rpc_attempts; |
| bool transparent_retry; |
| }; |
| std::vector<AttemptAttributes> attempt_attributes; |
| uint64_t server_span_count = 0; |
| for (const auto& span : spans) { |
| if (span->GetName() == "Attempt.grpc.testing.EchoTestService/Echo") { |
| attempt_attributes.push_back( |
| {std::get<uint64_t>( |
| span->GetAttributes().at("previous-rpc-attempts")), |
| std::get<bool>(span->GetAttributes().at("transparent-retry"))}); |
| } else if (span->GetName() == "Recv.grpc.testing.EchoTestService/Echo") { |
| ++server_span_count; |
| } |
| } |
| EXPECT_EQ(attempt_attributes.size(), 2); |
| EXPECT_THAT(attempt_attributes[0], FieldsAre(/*previous-rpc-attempts=*/0, |
| /*transparent-retry=*/false)) |
| << attempt_attributes[0].PrettyPrint(); |
| for (int i = 1; i < attempt_attributes.size(); ++i) { |
| EXPECT_THAT(attempt_attributes[i], FieldsAre(/*previous-rpc-attempts=*/0, |
| /*transparent-retry=*/true)) |
| << attempt_attributes[i].PrettyPrint(); |
| } |
| EXPECT_EQ(server_span_count, 1); |
| } |
| |
| } // namespace |
| } // namespace testing |
| } // namespace grpc |
| |
| int main(int argc, char** argv) { |
| grpc::testing::TestEnvironment env(&argc, argv); |
| ::testing::InitGoogleTest(&argc, argv); |
| return RUN_ALL_TESTS(); |
| } |