xds: store server config for LRS server in xdsresource.ClusterUpdate (#7191)

* xds: support LRS server config

* switch to the new bootstrap package in internal/xds
diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer.go b/xds/internal/balancer/cdsbalancer/cdsbalancer.go
index 0d0e9d2..8e97e10 100644
--- a/xds/internal/balancer/cdsbalancer/cdsbalancer.go
+++ b/xds/internal/balancer/cdsbalancer/cdsbalancer.go
@@ -609,21 +609,7 @@
 			Cluster:               cluster.ClusterName,
 			EDSServiceName:        cluster.EDSServiceName,
 			MaxConcurrentRequests: cluster.MaxRequests,
-		}
-		if cluster.LRSServerConfig == xdsresource.ClusterLRSServerSelf {
-			bootstrapConfig := b.xdsClient.BootstrapConfig()
-			parsedName := xdsresource.ParseName(cluster.ClusterName)
-			if parsedName.Scheme == xdsresource.FederationScheme {
-				// Is a federation resource name, find the corresponding
-				// authority server config.
-				if cfg, ok := bootstrapConfig.Authorities[parsedName.Authority]; ok {
-					dm.LoadReportingServer = cfg.XDSServer
-				}
-			} else {
-				// Not a federation resource name, use the default
-				// authority.
-				dm.LoadReportingServer = bootstrapConfig.XDSServer
-			}
+			LoadReportingServer:   cluster.LRSServerConfig,
 		}
 	case xdsresource.ClusterTypeLogicalDNS:
 		dm = clusterresolver.DiscoveryMechanism{
diff --git a/xds/internal/xdsclient/authority.go b/xds/internal/xdsclient/authority.go
index 6c82e70..62d7a17 100644
--- a/xds/internal/xdsclient/authority.go
+++ b/xds/internal/xdsclient/authority.go
@@ -94,12 +94,6 @@
 	// (although the former is part of the latter) is because authorities in the
 	// bootstrap config might contain an empty server config, and in this case,
 	// the top-level server config is to be used.
-	//
-	// There are two code paths from where a new authority struct might be
-	// created. One is when a watch is registered for a resource, and one is
-	// when load reporting needs to be started. We have the authority name in
-	// the first case, but do in the second. We only have the server config in
-	// the second case.
 	serverCfg          *bootstrap.ServerConfig
 	bootstrapCfg       *bootstrap.Config
 	serializer         *grpcsync.CallbackSerializer
@@ -156,7 +150,10 @@
 		return xdsresource.NewErrorf(xdsresource.ErrorTypeResourceTypeUnsupported, "Resource URL %v unknown in response from server", resourceUpdate.URL)
 	}
 
-	opts := &xdsresource.DecodeOptions{BootstrapConfig: a.bootstrapCfg}
+	opts := &xdsresource.DecodeOptions{
+		BootstrapConfig: a.bootstrapCfg,
+		ServerConfig:    a.serverCfg,
+	}
 	updates, md, err := decodeAllResources(opts, rType, resourceUpdate)
 	a.updateResourceStateAndScheduleCallbacks(rType, updates, md)
 	return err
diff --git a/xds/internal/xdsclient/clientimpl_authority.go b/xds/internal/xdsclient/clientimpl_authority.go
index c73e3e2..69db79e 100644
--- a/xds/internal/xdsclient/clientimpl_authority.go
+++ b/xds/internal/xdsclient/clientimpl_authority.go
@@ -36,7 +36,7 @@
 // authority, without holding c.authorityMu.
 //
 // Caller must not hold c.authorityMu.
-func (c *clientImpl) findAuthority(n *xdsresource.Name) (_ *authority, unref func(), _ error) {
+func (c *clientImpl) findAuthority(n *xdsresource.Name) (*authority, func(), error) {
 	scheme, authority := n.Scheme, n.Authority
 
 	c.authorityMu.Lock()
diff --git a/xds/internal/xdsclient/clientimpl_watchers.go b/xds/internal/xdsclient/clientimpl_watchers.go
index 045fe77..f64124d 100644
--- a/xds/internal/xdsclient/clientimpl_watchers.go
+++ b/xds/internal/xdsclient/clientimpl_watchers.go
@@ -48,15 +48,6 @@
 		return func() {}
 	}
 
-	// TODO: replace this with the code does the following when we have
-	// implemented generic watch API on the authority:
-	//  - Parse the resource name and extract the authority.
-	//  - Locate the corresponding authority object and acquire a reference to
-	//    it. If the authority is not found, error out.
-	//  - Call the watchResource() method on the authority.
-	//  - Return a cancel function to cancel the watch on the authority and to
-	//    release the reference.
-
 	// TODO: Make ParseName return an error if parsing fails, and
 	// schedule the OnError callback in that case.
 	n := xdsresource.ParseName(resourceName)
diff --git a/xds/internal/xdsclient/tests/resource_update_test.go b/xds/internal/xdsclient/tests/resource_update_test.go
index 783e9e4..89b5d33 100644
--- a/xds/internal/xdsclient/tests/resource_update_test.go
+++ b/xds/internal/xdsclient/tests/resource_update_test.go
@@ -669,9 +669,8 @@
 				Resources:   []*anypb.Any{testutils.MarshalAny(t, resource1)},
 			},
 			wantUpdate: xdsresource.ClusterUpdate{
-				ClusterName:     "resource-name-1",
-				EDSServiceName:  "eds-service-name",
-				LRSServerConfig: xdsresource.ClusterLRSServerSelf,
+				ClusterName:    "resource-name-1",
+				EDSServiceName: "eds-service-name",
 			},
 			wantUpdateMetadata: map[string]xdsresource.UpdateWithMD{
 				"resource-name-1": {
@@ -689,9 +688,8 @@
 				Resources:   []*anypb.Any{testutils.MarshalAny(t, resource1), testutils.MarshalAny(t, resource2)},
 			},
 			wantUpdate: xdsresource.ClusterUpdate{
-				ClusterName:     "resource-name-1",
-				EDSServiceName:  "eds-service-name",
-				LRSServerConfig: xdsresource.ClusterLRSServerSelf,
+				ClusterName:    "resource-name-1",
+				EDSServiceName: "eds-service-name",
 			},
 			wantUpdateMetadata: map[string]xdsresource.UpdateWithMD{
 				"resource-name-1": {
@@ -763,6 +761,16 @@
 			if gotErr != nil && !strings.Contains(gotErr.Error(), test.wantErr) {
 				t.Fatalf("Got error from handling update: %v, want %v", gotErr, test.wantErr)
 			}
+
+			// For tests expected to succeed, we expect an LRS server config in
+			// the update from the xDS client, because the LRS bit is turned on
+			// in the cluster resource. We *cannot* set the LRS server config in
+			// the test table because we do not have the address of the xDS
+			// server at that point, hence we do it here before verifying the
+			// received update.
+			if test.wantErr == "" {
+				test.wantUpdate.LRSServerConfig = xdstestutils.ServerConfigForAddress(t, mgmtServer.Address)
+			}
 			cmpOpts := []cmp.Option{
 				cmpopts.EquateEmpty(),
 				cmpopts.IgnoreFields(xdsresource.ClusterUpdate{}, "Raw", "LBPolicy"),
diff --git a/xds/internal/xdsclient/xdsresource/cluster_resource_type.go b/xds/internal/xdsclient/xdsresource/cluster_resource_type.go
index 183801c..5ac7f03 100644
--- a/xds/internal/xdsclient/xdsresource/cluster_resource_type.go
+++ b/xds/internal/xdsclient/xdsresource/cluster_resource_type.go
@@ -55,7 +55,7 @@
 // Decode deserializes and validates an xDS resource serialized inside the
 // provided `Any` proto, as received from the xDS management server.
 func (clusterResourceType) Decode(opts *DecodeOptions, resource *anypb.Any) (*DecodeResult, error) {
-	name, cluster, err := unmarshalClusterResource(resource)
+	name, cluster, err := unmarshalClusterResource(resource, opts.ServerConfig)
 	switch {
 	case name == "":
 		// Name is unset only when protobuf deserialization fails.
diff --git a/xds/internal/xdsclient/xdsresource/resource_type.go b/xds/internal/xdsclient/xdsresource/resource_type.go
index cf1dff7..a1e15e2 100644
--- a/xds/internal/xdsclient/xdsresource/resource_type.go
+++ b/xds/internal/xdsclient/xdsresource/resource_type.go
@@ -133,9 +133,13 @@
 // DecodeOptions wraps the options required by ResourceType implementation for
 // decoding configuration received from the xDS management server.
 type DecodeOptions struct {
-	// BootstrapConfig contains the bootstrap configuration passed to the
-	// top-level xdsClient. This contains useful data for resource validation.
+	// BootstrapConfig contains the complete bootstrap configuration passed to
+	// the xDS client. This contains useful data for resource validation.
 	BootstrapConfig *bootstrap.Config
+	// ServerConfig contains the server config (from the above bootstrap
+	// configuration) of the xDS server from which the current resource, for
+	// which Decode() is being invoked, was received.
+	ServerConfig *bootstrap.ServerConfig
 }
 
 // DecodeResult is the result of a decode operation.
diff --git a/xds/internal/xdsclient/xdsresource/tests/unmarshal_cds_test.go b/xds/internal/xdsclient/xdsresource/tests/unmarshal_cds_test.go
index e373034..fff78fe 100644
--- a/xds/internal/xdsclient/xdsresource/tests/unmarshal_cds_test.go
+++ b/xds/internal/xdsclient/xdsresource/tests/unmarshal_cds_test.go
@@ -26,18 +26,20 @@
 	"github.com/google/go-cmp/cmp"
 	"github.com/google/go-cmp/cmp/cmpopts"
 	"google.golang.org/grpc/balancer/leastrequest"
-	_ "google.golang.org/grpc/balancer/roundrobin" // To register round_robin load balancer.
 	"google.golang.org/grpc/internal/balancer/stub"
 	"google.golang.org/grpc/internal/envconfig"
 	"google.golang.org/grpc/internal/grpctest"
 	iserviceconfig "google.golang.org/grpc/internal/serviceconfig"
 	"google.golang.org/grpc/internal/testutils"
 	"google.golang.org/grpc/internal/testutils/xds/e2e"
+	"google.golang.org/grpc/internal/xds/bootstrap"
 	"google.golang.org/grpc/serviceconfig"
-	_ "google.golang.org/grpc/xds" // Register the xDS LB Registry Converters.
 	"google.golang.org/grpc/xds/internal/balancer/ringhash"
 	"google.golang.org/grpc/xds/internal/balancer/wrrlocality"
 	"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
+	"google.golang.org/protobuf/proto"
+	"google.golang.org/protobuf/types/known/anypb"
+	"google.golang.org/protobuf/types/known/structpb"
 	"google.golang.org/protobuf/types/known/wrapperspb"
 
 	v3xdsxdstypepb "github.com/cncf/xds/go/xds/type/v3"
@@ -48,9 +50,9 @@
 	v3ringhashpb "github.com/envoyproxy/go-control-plane/envoy/extensions/load_balancing_policies/ring_hash/v3"
 	v3roundrobinpb "github.com/envoyproxy/go-control-plane/envoy/extensions/load_balancing_policies/round_robin/v3"
 	v3wrrlocalitypb "github.com/envoyproxy/go-control-plane/envoy/extensions/load_balancing_policies/wrr_locality/v3"
-	"google.golang.org/protobuf/proto"
-	"google.golang.org/protobuf/types/known/anypb"
-	"google.golang.org/protobuf/types/known/structpb"
+
+	_ "google.golang.org/grpc/balancer/roundrobin" // To register round_robin load balancer.
+	_ "google.golang.org/grpc/xds"                 // Register the xDS LB Registry Converters.
 )
 
 type s struct {
@@ -66,8 +68,6 @@
 	serviceName = "service"
 )
 
-var emptyUpdate = xdsresource.ClusterUpdate{ClusterName: clusterName, LRSServerConfig: xdsresource.ClusterLRSOff}
-
 func wrrLocality(t *testing.T, m proto.Message) *v3wrrlocalitypb.WrrLocality {
 	return &v3wrrlocalitypb.WrrLocality{
 		EndpointPickingPolicy: &v3clusterpb.LoadBalancingPolicy{
@@ -105,6 +105,7 @@
 	tests := []struct {
 		name         string
 		cluster      *v3clusterpb.Cluster
+		serverCfg    *bootstrap.ServerConfig
 		wantUpdate   xdsresource.ClusterUpdate
 		wantLBConfig *iserviceconfig.BalancerConfig
 	}{
@@ -164,7 +165,8 @@
 				LbPolicy: v3clusterpb.Cluster_ROUND_ROBIN,
 			},
 			wantUpdate: xdsresource.ClusterUpdate{
-				ClusterName: clusterName, LRSServerConfig: xdsresource.ClusterLRSOff, ClusterType: xdsresource.ClusterTypeAggregate,
+				ClusterName:             clusterName,
+				ClusterType:             xdsresource.ClusterTypeAggregate,
 				PrioritizedClusterNames: []string{"a", "b", "c"},
 			},
 			wantLBConfig: &iserviceconfig.BalancerConfig{
@@ -179,7 +181,7 @@
 		{
 			name:       "happy-case-no-service-name-no-lrs",
 			cluster:    e2e.DefaultCluster(clusterName, "", e2e.SecurityLevelNone),
-			wantUpdate: emptyUpdate,
+			wantUpdate: xdsresource.ClusterUpdate{ClusterName: clusterName},
 			wantLBConfig: &iserviceconfig.BalancerConfig{
 				Name: wrrlocality.Name,
 				Config: &wrrlocality.LBConfig{
@@ -206,16 +208,17 @@
 			},
 		},
 		{
-			name: "happiest-case",
+			name: "happiest-case-with-lrs",
 			cluster: e2e.ClusterResourceWithOptions(e2e.ClusterOptions{
 				ClusterName: clusterName,
 				ServiceName: serviceName,
 				EnableLRS:   true,
 			}),
+			serverCfg: &bootstrap.ServerConfig{ServerURI: "test-server-uri"},
 			wantUpdate: xdsresource.ClusterUpdate{
 				ClusterName:     clusterName,
 				EDSServiceName:  serviceName,
-				LRSServerConfig: xdsresource.ClusterLRSServerSelf,
+				LRSServerConfig: &bootstrap.ServerConfig{ServerURI: "test-server-uri"},
 			},
 			wantLBConfig: &iserviceconfig.BalancerConfig{
 				Name: wrrlocality.Name,
@@ -248,10 +251,11 @@
 				}
 				return c
 			}(),
+			serverCfg: &bootstrap.ServerConfig{ServerURI: "test-server-uri"},
 			wantUpdate: xdsresource.ClusterUpdate{
 				ClusterName:     clusterName,
 				EDSServiceName:  serviceName,
-				LRSServerConfig: xdsresource.ClusterLRSServerSelf,
+				LRSServerConfig: &bootstrap.ServerConfig{ServerURI: "test-server-uri"},
 				MaxRequests:     func() *uint32 { i := uint32(512); return &i }(),
 			},
 			wantLBConfig: &iserviceconfig.BalancerConfig{
@@ -298,7 +302,8 @@
 				LbPolicy: v3clusterpb.Cluster_LEAST_REQUEST,
 			},
 			wantUpdate: xdsresource.ClusterUpdate{
-				ClusterName: clusterName, EDSServiceName: serviceName,
+				ClusterName:    clusterName,
+				EDSServiceName: serviceName,
 			},
 			wantLBConfig: &iserviceconfig.BalancerConfig{
 				Name: "least_request_experimental",
@@ -353,7 +358,8 @@
 				},
 			},
 			wantUpdate: xdsresource.ClusterUpdate{
-				ClusterName: clusterName, EDSServiceName: serviceName,
+				ClusterName:    clusterName,
+				EDSServiceName: serviceName,
 			},
 			wantLBConfig: &iserviceconfig.BalancerConfig{
 				Name: "least_request_experimental",
@@ -527,7 +533,7 @@
 
 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
-			update, err := xdsresource.ValidateClusterAndConstructClusterUpdateForTesting(test.cluster)
+			update, err := xdsresource.ValidateClusterAndConstructClusterUpdateForTesting(test.cluster, test.serverCfg)
 			if err != nil {
 				t.Errorf("validateClusterAndConstructClusterUpdate(%+v) failed: %v", test.cluster, err)
 			}
diff --git a/xds/internal/xdsclient/xdsresource/type_cds.go b/xds/internal/xdsclient/xdsresource/type_cds.go
index b782e24..ae4cb1e 100644
--- a/xds/internal/xdsclient/xdsresource/type_cds.go
+++ b/xds/internal/xdsclient/xdsresource/type_cds.go
@@ -20,6 +20,7 @@
 import (
 	"encoding/json"
 
+	"google.golang.org/grpc/internal/xds/bootstrap"
 	"google.golang.org/protobuf/types/known/anypb"
 )
 
@@ -39,18 +40,6 @@
 	ClusterTypeAggregate
 )
 
-// ClusterLRSServerConfigType is the type of LRS server config.
-type ClusterLRSServerConfigType int
-
-const (
-	// ClusterLRSOff indicates LRS is off (loads are not reported for this
-	// cluster).
-	ClusterLRSOff ClusterLRSServerConfigType = iota
-	// ClusterLRSServerSelf indicates loads should be reported to the same
-	// server (the authority) where the CDS resp is received from.
-	ClusterLRSServerSelf
-)
-
 // ClusterUpdate contains information from a received CDS response, which is of
 // interest to the registered CDS watcher.
 type ClusterUpdate struct {
@@ -60,10 +49,10 @@
 	// EDSServiceName is an optional name for EDS. If it's not set, the balancer
 	// should watch ClusterName for the EDS resources.
 	EDSServiceName string
-	// LRSServerConfig contains the server where the load reports should be sent
-	// to. This can be change to an interface, to support other types, e.g. a
-	// ServerConfig with ServerURI, creds.
-	LRSServerConfig ClusterLRSServerConfigType
+	// LRSServerConfig contains configuration about the xDS server that sent
+	// this cluster resource. This is also the server where load reports are to
+	// be sent, for this cluster.
+	LRSServerConfig *bootstrap.ServerConfig
 	// SecurityCfg contains security configuration sent by the control plane.
 	SecurityCfg *SecurityConfig
 	// MaxRequests for circuit breaking, if any (otherwise nil).
diff --git a/xds/internal/xdsclient/xdsresource/unmarshal_cds.go b/xds/internal/xdsclient/xdsresource/unmarshal_cds.go
index 276bf40..de12f47 100644
--- a/xds/internal/xdsclient/xdsresource/unmarshal_cds.go
+++ b/xds/internal/xdsclient/xdsresource/unmarshal_cds.go
@@ -34,6 +34,7 @@
 	"google.golang.org/grpc/internal/envconfig"
 	"google.golang.org/grpc/internal/pretty"
 	iserviceconfig "google.golang.org/grpc/internal/serviceconfig"
+	"google.golang.org/grpc/internal/xds/bootstrap"
 	"google.golang.org/grpc/internal/xds/matcher"
 	"google.golang.org/grpc/xds/internal/xdsclient/xdslbregistry"
 	"google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version"
@@ -50,7 +51,7 @@
 // to this value by the management server.
 const transportSocketName = "envoy.transport_sockets.tls"
 
-func unmarshalClusterResource(r *anypb.Any) (string, ClusterUpdate, error) {
+func unmarshalClusterResource(r *anypb.Any, serverCfg *bootstrap.ServerConfig) (string, ClusterUpdate, error) {
 	r, err := UnwrapResource(r)
 	if err != nil {
 		return "", ClusterUpdate{}, fmt.Errorf("failed to unwrap resource: %v", err)
@@ -64,7 +65,7 @@
 	if err := proto.Unmarshal(r.GetValue(), cluster); err != nil {
 		return "", ClusterUpdate{}, fmt.Errorf("failed to unmarshal resource: %v", err)
 	}
-	cu, err := validateClusterAndConstructClusterUpdate(cluster)
+	cu, err := validateClusterAndConstructClusterUpdate(cluster, serverCfg)
 	if err != nil {
 		return cluster.GetName(), ClusterUpdate{}, err
 	}
@@ -81,7 +82,7 @@
 	defaultLeastRequestChoiceCount = 2
 )
 
-func validateClusterAndConstructClusterUpdate(cluster *v3clusterpb.Cluster) (ClusterUpdate, error) {
+func validateClusterAndConstructClusterUpdate(cluster *v3clusterpb.Cluster, serverCfg *bootstrap.ServerConfig) (ClusterUpdate, error) {
 	telemetryLabels := make(map[string]string)
 	if fmd := cluster.GetMetadata().GetFilterMetadata(); fmd != nil {
 		if val, ok := fmd["com.google.csm.telemetry_labels"]; ok {
@@ -182,21 +183,11 @@
 		TelemetryLabels:  telemetryLabels,
 	}
 
-	// Note that this is different from the gRFC (gRFC A47 says to include the
-	// full ServerConfig{URL,creds,server feature} here). This information is
-	// not available here, because this function doesn't have access to the
-	// xdsclient bootstrap information now (can be added if necessary). The
-	// ServerConfig will be read and populated by the CDS balancer when
-	// processing this field.
-	// According to A27:
-	// If the `lrs_server` field is set, it must have its `self` field set, in
-	// which case the client should use LRS for load reporting. Otherwise
-	// (the `lrs_server` field is not set), LRS load reporting will be disabled.
 	if lrs := cluster.GetLrsServer(); lrs != nil {
 		if lrs.GetSelf() == nil {
 			return ClusterUpdate{}, fmt.Errorf("unsupported config_source_specifier %T in lrs_server field", lrs.ConfigSourceSpecifier)
 		}
-		ret.LRSServerConfig = ClusterLRSServerSelf
+		ret.LRSServerConfig = serverCfg
 	}
 
 	// Validate and set cluster type from the response.
diff --git a/xds/internal/xdsclient/xdsresource/unmarshal_cds_test.go b/xds/internal/xdsclient/xdsresource/unmarshal_cds_test.go
index a345466..d008b25 100644
--- a/xds/internal/xdsclient/xdsresource/unmarshal_cds_test.go
+++ b/xds/internal/xdsclient/xdsresource/unmarshal_cds_test.go
@@ -27,6 +27,7 @@
 	"github.com/google/go-cmp/cmp/cmpopts"
 	"google.golang.org/grpc/internal/pretty"
 	"google.golang.org/grpc/internal/testutils"
+	"google.golang.org/grpc/internal/xds/bootstrap"
 	"google.golang.org/grpc/internal/xds/matcher"
 	"google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version"
 
@@ -50,14 +51,11 @@
 	serviceName = "service"
 )
 
-var emptyUpdate = ClusterUpdate{ClusterName: clusterName, LRSServerConfig: ClusterLRSOff}
-
 func (s) TestValidateCluster_Failure(t *testing.T) {
 	tests := []struct {
-		name       string
-		cluster    *v3clusterpb.Cluster
-		wantUpdate ClusterUpdate
-		wantErr    bool
+		name    string
+		cluster *v3clusterpb.Cluster
+		wantErr bool
 	}{
 		{
 			name: "non-supported-cluster-type-static",
@@ -72,8 +70,7 @@
 				},
 				LbPolicy: v3clusterpb.Cluster_LEAST_REQUEST,
 			},
-			wantUpdate: emptyUpdate,
-			wantErr:    true,
+			wantErr: true,
 		},
 		{
 			name: "non-supported-cluster-type-original-dst",
@@ -88,8 +85,7 @@
 				},
 				LbPolicy: v3clusterpb.Cluster_LEAST_REQUEST,
 			},
-			wantUpdate: emptyUpdate,
-			wantErr:    true,
+			wantErr: true,
 		},
 		{
 			name: "no-eds-config",
@@ -97,8 +93,7 @@
 				ClusterDiscoveryType: &v3clusterpb.Cluster_Type{Type: v3clusterpb.Cluster_EDS},
 				LbPolicy:             v3clusterpb.Cluster_ROUND_ROBIN,
 			},
-			wantUpdate: emptyUpdate,
-			wantErr:    true,
+			wantErr: true,
 		},
 		{
 			name: "no-ads-config-source",
@@ -107,8 +102,7 @@
 				EdsClusterConfig:     &v3clusterpb.Cluster_EdsClusterConfig{},
 				LbPolicy:             v3clusterpb.Cluster_ROUND_ROBIN,
 			},
-			wantUpdate: emptyUpdate,
-			wantErr:    true,
+			wantErr: true,
 		},
 		{
 			name: "non-round-robin-or-ring-hash-lb-policy",
@@ -123,8 +117,7 @@
 				},
 				LbPolicy: v3clusterpb.Cluster_LEAST_REQUEST,
 			},
-			wantUpdate: emptyUpdate,
-			wantErr:    true,
+			wantErr: true,
 		},
 		{
 			name: "logical-dns-multiple-localities",
@@ -140,8 +133,7 @@
 					},
 				},
 			},
-			wantUpdate: emptyUpdate,
-			wantErr:    true,
+			wantErr: true,
 		},
 		{
 			name: "ring-hash-hash-function-not-xx-hash",
@@ -153,8 +145,7 @@
 					},
 				},
 			},
-			wantUpdate: emptyUpdate,
-			wantErr:    true,
+			wantErr: true,
 		},
 		{
 			name: "least-request-choice-count-less-than-two",
@@ -166,8 +157,7 @@
 					},
 				},
 			},
-			wantUpdate: emptyUpdate,
-			wantErr:    true,
+			wantErr: true,
 		},
 		{
 			name: "ring-hash-max-bound-greater-than-upper-bound",
@@ -179,8 +169,7 @@
 					},
 				},
 			},
-			wantUpdate: emptyUpdate,
-			wantErr:    true,
+			wantErr: true,
 		},
 		{
 			name: "ring-hash-max-bound-greater-than-upper-bound-load-balancing-policy",
@@ -209,8 +198,7 @@
 					},
 				},
 			},
-			wantUpdate: emptyUpdate,
-			wantErr:    true,
+			wantErr: true,
 		},
 		{
 			name: "least-request-unsupported-in-converter-since-env-var-unset",
@@ -235,8 +223,7 @@
 					},
 				},
 			},
-			wantUpdate: emptyUpdate,
-			wantErr:    true,
+			wantErr: true,
 		},
 		{
 			name: "aggregate-nil-clusters",
@@ -250,8 +237,7 @@
 				},
 				LbPolicy: v3clusterpb.Cluster_ROUND_ROBIN,
 			},
-			wantUpdate: emptyUpdate,
-			wantErr:    true,
+			wantErr: true,
 		},
 		{
 			name: "aggregate-empty-clusters",
@@ -267,14 +253,13 @@
 				},
 				LbPolicy: v3clusterpb.Cluster_ROUND_ROBIN,
 			},
-			wantUpdate: emptyUpdate,
-			wantErr:    true,
+			wantErr: true,
 		},
 	}
 
 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
-			if update, err := validateClusterAndConstructClusterUpdate(test.cluster); err == nil {
+			if update, err := validateClusterAndConstructClusterUpdate(test.cluster, nil); err == nil {
 				t.Errorf("validateClusterAndConstructClusterUpdate(%+v) = %v, wanted error", test.cluster, update)
 			}
 		})
@@ -882,9 +867,8 @@
 				},
 			},
 			wantUpdate: ClusterUpdate{
-				ClusterName:     clusterName,
-				EDSServiceName:  serviceName,
-				LRSServerConfig: ClusterLRSOff,
+				ClusterName:    clusterName,
+				EDSServiceName: serviceName,
 				SecurityCfg: &SecurityConfig{
 					RootInstanceName: rootPluginInstance,
 					RootCertName:     rootCertName,
@@ -924,9 +908,8 @@
 				},
 			},
 			wantUpdate: ClusterUpdate{
-				ClusterName:     clusterName,
-				EDSServiceName:  serviceName,
-				LRSServerConfig: ClusterLRSOff,
+				ClusterName:    clusterName,
+				EDSServiceName: serviceName,
 				SecurityCfg: &SecurityConfig{
 					RootInstanceName: rootPluginInstance,
 					RootCertName:     rootCertName,
@@ -968,9 +951,8 @@
 				},
 			},
 			wantUpdate: ClusterUpdate{
-				ClusterName:     clusterName,
-				EDSServiceName:  serviceName,
-				LRSServerConfig: ClusterLRSOff,
+				ClusterName:    clusterName,
+				EDSServiceName: serviceName,
 				SecurityCfg: &SecurityConfig{
 					RootInstanceName:     rootPluginInstance,
 					RootCertName:         rootCertName,
@@ -1016,9 +998,8 @@
 				},
 			},
 			wantUpdate: ClusterUpdate{
-				ClusterName:     clusterName,
-				EDSServiceName:  serviceName,
-				LRSServerConfig: ClusterLRSOff,
+				ClusterName:    clusterName,
+				EDSServiceName: serviceName,
 				SecurityCfg: &SecurityConfig{
 					RootInstanceName:     rootPluginInstance,
 					RootCertName:         rootCertName,
@@ -1076,9 +1057,8 @@
 				},
 			},
 			wantUpdate: ClusterUpdate{
-				ClusterName:     clusterName,
-				EDSServiceName:  serviceName,
-				LRSServerConfig: ClusterLRSOff,
+				ClusterName:    clusterName,
+				EDSServiceName: serviceName,
 				SecurityCfg: &SecurityConfig{
 					RootInstanceName:     rootPluginInstance,
 					RootCertName:         rootCertName,
@@ -1143,9 +1123,8 @@
 				},
 			},
 			wantUpdate: ClusterUpdate{
-				ClusterName:     clusterName,
-				EDSServiceName:  serviceName,
-				LRSServerConfig: ClusterLRSOff,
+				ClusterName:    clusterName,
+				EDSServiceName: serviceName,
 				SecurityCfg: &SecurityConfig{
 					RootInstanceName:     rootPluginInstance,
 					RootCertName:         rootCertName,
@@ -1165,7 +1144,7 @@
 
 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
-			update, err := validateClusterAndConstructClusterUpdate(test.cluster)
+			update, err := validateClusterAndConstructClusterUpdate(test.cluster, nil)
 			if (err != nil) != test.wantErr {
 				t.Errorf("validateClusterAndConstructClusterUpdate() returned err %v wantErr %v)", err, test.wantErr)
 			}
@@ -1287,6 +1266,7 @@
 	tests := []struct {
 		name       string
 		resource   *anypb.Any
+		serverCfg  *bootstrap.ServerConfig
 		wantName   string
 		wantUpdate ClusterUpdate
 		wantErr    bool
@@ -1337,43 +1317,50 @@
 			wantErr:  true,
 		},
 		{
-			name:     "v3 cluster",
-			resource: v3ClusterAny,
-			wantName: v3ClusterName,
-			wantUpdate: ClusterUpdate{
-				ClusterName:    v3ClusterName,
-				EDSServiceName: v3Service, LRSServerConfig: ClusterLRSServerSelf,
-				Raw: v3ClusterAny,
-			},
-		},
-		{
-			name:     "v3 cluster wrapped",
-			resource: testutils.MarshalAny(t, &v3discoverypb.Resource{Resource: v3ClusterAny}),
-			wantName: v3ClusterName,
-			wantUpdate: ClusterUpdate{
-				ClusterName:    v3ClusterName,
-				EDSServiceName: v3Service, LRSServerConfig: ClusterLRSServerSelf,
-				Raw: v3ClusterAny,
-			},
-		},
-		{
-			name:     "v3 cluster with EDS config source self",
-			resource: v3ClusterAnyWithEDSConfigSourceSelf,
-			wantName: v3ClusterName,
-			wantUpdate: ClusterUpdate{
-				ClusterName:    v3ClusterName,
-				EDSServiceName: v3Service, LRSServerConfig: ClusterLRSServerSelf,
-				Raw: v3ClusterAnyWithEDSConfigSourceSelf,
-			},
-		},
-		{
-			name:     "v3 cluster with telemetry case",
-			resource: v3ClusterAnyWithTelemetryLabels,
-			wantName: v3ClusterName,
+			name:      "v3 cluster",
+			resource:  v3ClusterAny,
+			serverCfg: &bootstrap.ServerConfig{ServerURI: "test-server-uri"},
+			wantName:  v3ClusterName,
 			wantUpdate: ClusterUpdate{
 				ClusterName:     v3ClusterName,
 				EDSServiceName:  v3Service,
-				LRSServerConfig: ClusterLRSServerSelf,
+				LRSServerConfig: &bootstrap.ServerConfig{ServerURI: "test-server-uri"},
+				Raw:             v3ClusterAny,
+			},
+		},
+		{
+			name:      "v3 cluster wrapped",
+			resource:  testutils.MarshalAny(t, &v3discoverypb.Resource{Resource: v3ClusterAny}),
+			serverCfg: &bootstrap.ServerConfig{ServerURI: "test-server-uri"},
+			wantName:  v3ClusterName,
+			wantUpdate: ClusterUpdate{
+				ClusterName:     v3ClusterName,
+				EDSServiceName:  v3Service,
+				LRSServerConfig: &bootstrap.ServerConfig{ServerURI: "test-server-uri"},
+				Raw:             v3ClusterAny,
+			},
+		},
+		{
+			name:      "v3 cluster with EDS config source self",
+			resource:  v3ClusterAnyWithEDSConfigSourceSelf,
+			serverCfg: &bootstrap.ServerConfig{ServerURI: "test-server-uri"},
+			wantName:  v3ClusterName,
+			wantUpdate: ClusterUpdate{
+				ClusterName:     v3ClusterName,
+				EDSServiceName:  v3Service,
+				LRSServerConfig: &bootstrap.ServerConfig{ServerURI: "test-server-uri"},
+				Raw:             v3ClusterAnyWithEDSConfigSourceSelf,
+			},
+		},
+		{
+			name:      "v3 cluster with telemetry case",
+			resource:  v3ClusterAnyWithTelemetryLabels,
+			serverCfg: &bootstrap.ServerConfig{ServerURI: "test-server-uri"},
+			wantName:  v3ClusterName,
+			wantUpdate: ClusterUpdate{
+				ClusterName:     v3ClusterName,
+				EDSServiceName:  v3Service,
+				LRSServerConfig: &bootstrap.ServerConfig{ServerURI: "test-server-uri"},
 				Raw:             v3ClusterAnyWithTelemetryLabels,
 				TelemetryLabels: map[string]string{
 					"service_name":      "grpc-service",
@@ -1382,13 +1369,14 @@
 			},
 		},
 		{
-			name:     "v3 metadata ignore other types not string and not com.google.csm.telemetry_labels",
-			resource: v3ClusterAnyWithTelemetryLabelsIgnoreSome,
-			wantName: v3ClusterName,
+			name:      "v3 metadata ignore other types not string and not com.google.csm.telemetry_labels",
+			resource:  v3ClusterAnyWithTelemetryLabelsIgnoreSome,
+			serverCfg: &bootstrap.ServerConfig{ServerURI: "test-server-uri"},
+			wantName:  v3ClusterName,
 			wantUpdate: ClusterUpdate{
 				ClusterName:     v3ClusterName,
 				EDSServiceName:  v3Service,
-				LRSServerConfig: ClusterLRSServerSelf,
+				LRSServerConfig: &bootstrap.ServerConfig{ServerURI: "test-server-uri"},
 				Raw:             v3ClusterAnyWithTelemetryLabelsIgnoreSome,
 				TelemetryLabels: map[string]string{
 					"service_name": "grpc-service",
@@ -1415,7 +1403,7 @@
 	}
 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
-			name, update, err := unmarshalClusterResource(test.resource)
+			name, update, err := unmarshalClusterResource(test.resource, test.serverCfg)
 			if (err != nil) != test.wantErr {
 				t.Fatalf("unmarshalClusterResource(%s), got err: %v, wantErr: %v", pretty.ToJSON(test.resource), err, test.wantErr)
 			}
@@ -1584,7 +1572,7 @@
 	}
 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
-			update, err := validateClusterAndConstructClusterUpdate(test.cluster)
+			update, err := validateClusterAndConstructClusterUpdate(test.cluster, nil)
 			if (err != nil) != test.wantErr {
 				t.Errorf("validateClusterAndConstructClusterUpdate() returned err %v wantErr %v)", err, test.wantErr)
 			}