| /* |
| * |
| * Copyright 2025 gRPC authors. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| * |
| */ |
| |
| package xds_test |
| |
| import ( |
| "context" |
| "io" |
| "strings" |
| "testing" |
| |
| "github.com/google/uuid" |
| "google.golang.org/grpc" |
| "google.golang.org/grpc/codes" |
| "google.golang.org/grpc/connectivity" |
| "google.golang.org/grpc/credentials/insecure" |
| "google.golang.org/grpc/internal/testutils" |
| "google.golang.org/grpc/internal/testutils/xds/e2e" |
| "google.golang.org/grpc/internal/testutils/xds/e2e/setup" |
| "google.golang.org/grpc/internal/xds/bootstrap" |
| "google.golang.org/grpc/internal/xds/xdsclient" |
| "google.golang.org/grpc/status" |
| "google.golang.org/grpc/xds" |
| |
| v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" |
| v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" |
| testgrpc "google.golang.org/grpc/interop/grpc_testing" |
| testpb "google.golang.org/grpc/interop/grpc_testing" |
| ) |
| |
| // Tests the Server's logic as it transitions from NOT_SERVING to SERVING, then |
| // to NOT_SERVING again. Before it goes to SERVING, connections should be |
| // accepted and closed. After it goes SERVING, RPC's should proceed as normal |
| // according to matched route configuration. After it transitions back into |
| // NOT_SERVING, (through an explicit LDS Resource Not Found), previously running |
| // RPC's should be gracefully closed and still work, and new RPC's should fail. |
| func (s) TestServer_ServingModeChanges_SingleServer(t *testing.T) { |
| ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer cancel() |
| managementServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true}) |
| |
| // Create bootstrap configuration pointing to the above management server. |
| nodeID := uuid.New().String() |
| bootstrapContents := e2e.DefaultBootstrapContents(t, nodeID, managementServer.Address) |
| |
| // Setup the management server to respond with a listener resource that |
| // specifies a route name to watch. Due to not having received the full |
| // configuration, this should cause the server to be in mode NOT_SERVING. |
| lis, err := testutils.LocalTCPListener() |
| if err != nil { |
| t.Fatalf("testutils.LocalTCPListener() failed: %v", err) |
| } |
| host, port, err := hostPortFromListener(lis) |
| if err != nil { |
| t.Fatalf("Failed to retrieve host and port of server: %v", err) |
| } |
| listener := e2e.DefaultServerListenerWithRouteConfigName(host, port, e2e.SecurityLevelNone, "routeName") |
| resources := e2e.UpdateOptions{ |
| NodeID: nodeID, |
| Listeners: []*v3listenerpb.Listener{listener}, |
| SkipValidation: true, |
| } |
| if err := managementServer.Update(ctx, resources); err != nil { |
| t.Fatal(err) |
| } |
| |
| // Start an xDS-enabled gRPC server with the above bootstrap configuration. |
| config, err := bootstrap.NewConfigFromContents(bootstrapContents) |
| if err != nil { |
| t.Fatalf("Failed to parse bootstrap contents: %s, %v", string(bootstrapContents), err) |
| } |
| pool := xdsclient.NewPool(config) |
| modeChangeHandler := newServingModeChangeHandler(t) |
| modeChangeOpt := xds.ServingModeCallback(modeChangeHandler.modeChangeCallback) |
| createStubServer(t, lis, modeChangeOpt, xds.ClientPoolForTesting(pool)) |
| |
| // Start a gRPC channel to the above server. The server is yet to receive |
| // route configuration, and therefore RPCs must fail at this time. |
| cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) |
| if err != nil { |
| t.Fatalf("Failed to dial local test server: %v", err) |
| } |
| defer cc.Close() |
| waitForFailedRPCWithStatus(ctx, t, cc, codes.Unavailable, "", "") |
| |
| // Setup the route configuration resource on the management server. This |
| // should cause the xDS-enabled gRPC server to move to SERVING mode. |
| routeConfig := e2e.RouteConfigNonForwardingAction("routeName") |
| resources = e2e.UpdateOptions{ |
| NodeID: nodeID, |
| Listeners: []*v3listenerpb.Listener{listener}, |
| Routes: []*v3routepb.RouteConfiguration{routeConfig}, |
| SkipValidation: true, |
| } |
| defer cancel() |
| if err := managementServer.Update(ctx, resources); err != nil { |
| t.Fatal(err) |
| } |
| select { |
| case <-ctx.Done(): |
| t.Fatal("Timeout waiting for the xDS-enabled gRPC server to go SERVING") |
| case gotMode := <-modeChangeHandler.modeCh: |
| if gotMode != connectivity.ServingModeServing { |
| t.Fatalf("Mode changed to %v, want %v", gotMode, connectivity.ServingModeServing) |
| } |
| } |
| waitForSuccessfulRPC(ctx, t, cc) |
| |
| // Start a stream before switching the server to not serving. Due to the |
| // stream being created before the graceful stop of the underlying |
| // connection, it should be able to continue even after the server switches |
| // to not serving. |
| c := testgrpc.NewTestServiceClient(cc) |
| stream, err := c.FullDuplexCall(ctx) |
| if err != nil { |
| t.Fatalf("cc.FullDuplexCall failed: %f", err) |
| } |
| |
| // Remove the listener resource from the management server. |
| resources.Listeners = nil |
| if err := managementServer.Update(ctx, resources); err != nil { |
| t.Fatal(err) |
| } |
| |
| // Ensure the server is in NOT_SERVING mode. |
| select { |
| case <-ctx.Done(): |
| t.Fatal("Timeout waiting for the xDS-enabled gRPC server to go NOT_SERVING") |
| case gotMode := <-modeChangeHandler.modeCh: |
| if gotMode != connectivity.ServingModeNotServing { |
| t.Fatalf("Mode changed to %v, want %v", gotMode, connectivity.ServingModeNotServing) |
| } |
| gotErr := <-modeChangeHandler.errCh |
| if gotErr == nil || !strings.Contains(gotErr.Error(), nodeID) { |
| t.Fatalf("Unexpected error: %v, want xDS Node id: %s", gotErr, nodeID) |
| } |
| } |
| |
| // Due to graceful stop, any started streams continue to work. |
| if err = stream.Send(&testpb.StreamingOutputCallRequest{}); err != nil { |
| t.Fatalf("stream.Send() failed: %v, should continue to work due to graceful stop", err) |
| } |
| if err = stream.CloseSend(); err != nil { |
| t.Fatalf("stream.CloseSend() failed: %v, should continue to work due to graceful stop", err) |
| } |
| if _, err = stream.Recv(); err != io.EOF { |
| t.Fatalf("stream.Recv() failed with %v, want io.EOF", err) |
| } |
| |
| // New RPCs on that connection should eventually start failing. |
| waitForFailedRPCWithStatus(ctx, t, cc, codes.Unavailable, "", "") |
| } |
| |
| // Tests the serving mode functionality with multiple xDS enabled gRPC servers. |
| func (s) TestServer_ServingModeChanges_MultipleServers(t *testing.T) { |
| ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer cancel() |
| managementServer, nodeID, bootstrapContents, _ := setup.ManagementServerAndResolver(t) |
| |
| // Create two local listeners and pass it to Serve(). |
| lis1, err := testutils.LocalTCPListener() |
| if err != nil { |
| t.Fatalf("testutils.LocalTCPListener() failed: %v", err) |
| } |
| lis2, err := testutils.LocalTCPListener() |
| if err != nil { |
| t.Fatalf("testutils.LocalTCPListener() failed: %v", err) |
| } |
| |
| // Create a server option to get notified about serving mode changes. |
| modeChangeHandler1 := newServingModeChangeHandler(t) |
| modeChangeOpt1 := xds.ServingModeCallback(modeChangeHandler1.modeChangeCallback) |
| modeChangeHandler2 := newServingModeChangeHandler(t) |
| modeChangeOpt2 := xds.ServingModeCallback(modeChangeHandler2.modeChangeCallback) |
| |
| // Start two xDS-enabled gRPC servers with the above bootstrap configuration. |
| config, err := bootstrap.NewConfigFromContents(bootstrapContents) |
| if err != nil { |
| t.Fatalf("Failed to parse bootstrap contents: %s, %v", string(bootstrapContents), err) |
| } |
| pool := xdsclient.NewPool(config) |
| createStubServer(t, lis1, modeChangeOpt1, xds.ClientPoolForTesting(pool)) |
| createStubServer(t, lis2, modeChangeOpt2, xds.ClientPoolForTesting(pool)) |
| |
| // Setup the management server to respond with server-side Listener |
| // resources for both listeners. |
| host1, port1, err := hostPortFromListener(lis1) |
| if err != nil { |
| t.Fatalf("Failed to retrieve host and port of server: %v", err) |
| } |
| listener1 := e2e.DefaultServerListener(host1, port1, e2e.SecurityLevelNone, "routeName") |
| host2, port2, err := hostPortFromListener(lis2) |
| if err != nil { |
| t.Fatalf("Failed to retrieve host and port of server: %v", err) |
| } |
| listener2 := e2e.DefaultServerListener(host2, port2, e2e.SecurityLevelNone, "routeName") |
| resources := e2e.UpdateOptions{ |
| NodeID: nodeID, |
| Listeners: []*v3listenerpb.Listener{listener1, listener2}, |
| } |
| if err := managementServer.Update(ctx, resources); err != nil { |
| t.Fatal(err) |
| } |
| |
| // Wait for both listeners to move to "serving" mode. |
| select { |
| case <-ctx.Done(): |
| t.Fatalf("Timed out waiting for a mode change update: %v", err) |
| case mode := <-modeChangeHandler1.modeCh: |
| if mode != connectivity.ServingModeServing { |
| t.Fatalf("Listener 1 received new mode %v, want %v", mode, connectivity.ServingModeServing) |
| } |
| } |
| select { |
| case <-ctx.Done(): |
| t.Fatalf("Timed out waiting for a mode change update: %v", err) |
| case mode := <-modeChangeHandler2.modeCh: |
| if mode != connectivity.ServingModeServing { |
| t.Fatalf("Listener 2 received new mode %v, want %v", mode, connectivity.ServingModeServing) |
| } |
| } |
| |
| // Create a ClientConn to the first listener and make a successful RPCs. |
| cc1, err := grpc.NewClient(lis1.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) |
| if err != nil { |
| t.Fatalf("grpc.NewClient() failed: %v", err) |
| } |
| defer cc1.Close() |
| waitForSuccessfulRPC(ctx, t, cc1) |
| |
| // Create a ClientConn to the second listener and make a successful RPCs. |
| cc2, err := grpc.NewClient(lis2.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) |
| if err != nil { |
| t.Fatalf("grpc.NewClient() failed: %v", err) |
| } |
| defer cc2.Close() |
| waitForSuccessfulRPC(ctx, t, cc2) |
| |
| // Update the management server to remove the second listener resource. This |
| // should push only the second listener into "not-serving" mode. |
| if err := managementServer.Update(ctx, e2e.UpdateOptions{ |
| NodeID: nodeID, |
| Listeners: []*v3listenerpb.Listener{listener1}, |
| }); err != nil { |
| t.Fatal(err) |
| } |
| |
| // Wait for lis2 to move to "not-serving" mode. |
| select { |
| case <-ctx.Done(): |
| t.Fatalf("Timed out waiting for a mode change update: %v", err) |
| case mode := <-modeChangeHandler2.modeCh: |
| if mode != connectivity.ServingModeNotServing { |
| t.Fatalf("Listener received new mode %v, want %v", mode, connectivity.ServingModeNotServing) |
| } |
| gotErr := <-modeChangeHandler2.errCh |
| if gotErr == nil || !strings.Contains(gotErr.Error(), nodeID) { |
| t.Fatalf("Unexpected error: %v, want xDS Node id: %s", gotErr, nodeID) |
| } |
| } |
| |
| // Make sure RPCs succeed on cc1 and fail on cc2. |
| waitForSuccessfulRPC(ctx, t, cc1) |
| waitForFailedRPCWithStatus(ctx, t, cc2, codes.Unavailable, "", "") |
| |
| // Update the management server to remove the first listener resource as |
| // well. This should push the first listener into "not-serving" mode. Second |
| // listener is already in "not-serving" mode. |
| if err := managementServer.Update(ctx, e2e.UpdateOptions{ |
| NodeID: nodeID, |
| Listeners: []*v3listenerpb.Listener{}, |
| }); err != nil { |
| t.Fatal(err) |
| } |
| |
| // Wait for lis1 to move to "not-serving" mode. lis2 was already removed |
| // from the xdsclient's resource cache. So, lis2's callback will not be |
| // invoked this time around. |
| select { |
| case <-ctx.Done(): |
| t.Fatalf("Timed out waiting for a mode change update: %v", err) |
| case mode := <-modeChangeHandler1.modeCh: |
| if mode != connectivity.ServingModeNotServing { |
| t.Fatalf("Listener received new mode %v, want %v", mode, connectivity.ServingModeNotServing) |
| } |
| gotErr := <-modeChangeHandler1.errCh |
| if gotErr == nil || !strings.Contains(gotErr.Error(), nodeID) { |
| t.Fatalf("Unexpected error: %v, want xDS Node id: %s", gotErr, nodeID) |
| } |
| } |
| |
| // Make sure RPCs fail on both. |
| waitForFailedRPCWithStatus(ctx, t, cc1, codes.Unavailable, "", "") |
| waitForFailedRPCWithStatus(ctx, t, cc2, codes.Unavailable, "", "") |
| |
| // Make sure new connection attempts to "not-serving" servers fail. |
| if cc1, err = grpc.NewClient(lis1.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())); err != nil { |
| t.Fatal("Failed to create clientConn to a server in \"not-serving\" state") |
| } |
| defer cc1.Close() |
| if _, err := testgrpc.NewTestServiceClient(cc1).FullDuplexCall(ctx); status.Code(err) != codes.Unavailable { |
| t.Fatalf("FullDuplexCall failed with status code: %v, want: Unavailable", status.Code(err)) |
| } |
| |
| // Update the management server with both listener resources. |
| if err := managementServer.Update(ctx, e2e.UpdateOptions{ |
| NodeID: nodeID, |
| Listeners: []*v3listenerpb.Listener{listener1, listener2}, |
| }); err != nil { |
| t.Fatal(err) |
| } |
| |
| // Wait for both listeners to move to "serving" mode. |
| select { |
| case <-ctx.Done(): |
| t.Fatalf("Timed out waiting for a mode change update: %v", err) |
| case mode := <-modeChangeHandler1.modeCh: |
| if mode != connectivity.ServingModeServing { |
| t.Fatalf("Listener received new mode %v, want %v", mode, connectivity.ServingModeServing) |
| } |
| } |
| select { |
| case <-ctx.Done(): |
| t.Fatalf("Timed out waiting for a mode change update: %v", err) |
| case mode := <-modeChangeHandler2.modeCh: |
| if mode != connectivity.ServingModeServing { |
| t.Fatalf("Listener received new mode %v, want %v", mode, connectivity.ServingModeServing) |
| } |
| } |
| |
| // The clientConns created earlier should be able to make RPCs now. |
| waitForSuccessfulRPC(ctx, t, cc1) |
| waitForSuccessfulRPC(ctx, t, cc2) |
| } |