blob: b473b260b7d636b3ea827532cc405610c10124a1 [file] [log] [blame]
# Copyright 2024 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import re
from typing import AnyStr, Callable, Dict, Iterable, List, Optional, Union
from google.protobuf import struct_pb2
from grpc_observability._observability import OptionalLabelType
from grpc_observability._open_telemetry_plugin import OpenTelemetryLabelInjector
from grpc_observability._open_telemetry_plugin import OpenTelemetryPlugin
from grpc_observability._open_telemetry_plugin import OpenTelemetryPluginOption
# pytype: disable=pyi-error
from opentelemetry.metrics import MeterProvider
from opentelemetry.resourcedetector.gcp_resource_detector import (
GoogleCloudResourceDetector,
)
from opentelemetry.sdk.resources import Resource
from opentelemetry.semconv.resource import ResourceAttributes
TRAFFIC_DIRECTOR_AUTHORITY = "traffic-director-global.xds.googleapis.com"
UNKNOWN_VALUE = "unknown"
TYPE_GCE = "gcp_compute_engine"
TYPE_GKE = "gcp_kubernetes_engine"
MESH_ID_PREFIX = "mesh:"
METADATA_EXCHANGE_KEY_FIXED_MAP = {
"type": "csm.remote_workload_type",
"canonical_service": "csm.remote_workload_canonical_service",
}
METADATA_EXCHANGE_KEY_GKE_MAP = {
"workload_name": "csm.remote_workload_name",
"namespace_name": "csm.remote_workload_namespace_name",
"cluster_name": "csm.remote_workload_cluster_name",
"location": "csm.remote_workload_location",
"project_id": "csm.remote_workload_project_id",
}
METADATA_EXCHANGE_KEY_GCE_MAP = {
"workload_name": "csm.remote_workload_name",
"location": "csm.remote_workload_location",
"project_id": "csm.remote_workload_project_id",
}
class CSMOpenTelemetryLabelInjector(OpenTelemetryLabelInjector):
"""
An implementation of OpenTelemetryLabelInjector for CSM.
This injector will fetch labels from GCP resource detector and
environment, it's also responsible for serialize and deserialize
metadata exchange labels.
"""
_exchange_labels: Dict[str, AnyStr]
_additional_exchange_labels: Dict[str, str]
def __init__(self):
fields = {}
self._exchange_labels = {}
self._additional_exchange_labels = {}
# Labels from environment
canonical_service_value = os.getenv(
"CSM_CANONICAL_SERVICE_NAME", UNKNOWN_VALUE
)
workload_name_value = os.getenv("CSM_WORKLOAD_NAME", UNKNOWN_VALUE)
mesh_id = os.getenv("CSM_MESH_ID", UNKNOWN_VALUE)
gcp_resource = GoogleCloudResourceDetector().detect()
resource_type_value = get_resource_type(gcp_resource)
namespace_value = get_str_value_from_resource(
ResourceAttributes.K8S_NAMESPACE_NAME, gcp_resource
)
cluster_name_value = get_str_value_from_resource(
ResourceAttributes.K8S_CLUSTER_NAME, gcp_resource
)
# ResourceAttributes.CLOUD_AVAILABILITY_ZONE are called
# "zones" on Google Cloud.
location_value = get_str_value_from_resource("cloud.zone", gcp_resource)
if location_value == UNKNOWN_VALUE:
location_value = get_str_value_from_resource(
ResourceAttributes.CLOUD_REGION, gcp_resource
)
project_id_value = get_str_value_from_resource(
ResourceAttributes.CLOUD_ACCOUNT_ID, gcp_resource
)
fields["type"] = struct_pb2.Value(string_value=resource_type_value)
fields["canonical_service"] = struct_pb2.Value(
string_value=canonical_service_value
)
if resource_type_value == TYPE_GKE:
fields["workload_name"] = struct_pb2.Value(
string_value=workload_name_value
)
fields["namespace_name"] = struct_pb2.Value(
string_value=namespace_value
)
fields["cluster_name"] = struct_pb2.Value(
string_value=cluster_name_value
)
fields["location"] = struct_pb2.Value(string_value=location_value)
fields["project_id"] = struct_pb2.Value(
string_value=project_id_value
)
elif resource_type_value == TYPE_GCE:
fields["workload_name"] = struct_pb2.Value(
string_value=workload_name_value
)
fields["location"] = struct_pb2.Value(string_value=location_value)
fields["project_id"] = struct_pb2.Value(
string_value=project_id_value
)
serialized_struct = struct_pb2.Struct(fields=fields)
serialized_str = serialized_struct.SerializeToString()
self._exchange_labels = {"XEnvoyPeerMetadata": serialized_str}
self._additional_exchange_labels["csm.workload_canonical_service"] = (
canonical_service_value
)
self._additional_exchange_labels["csm.mesh_id"] = mesh_id
def get_labels_for_exchange(self) -> Dict[str, AnyStr]:
return self._exchange_labels
def get_additional_labels(
self, include_exchange_labels: bool
) -> Dict[str, str]:
if include_exchange_labels:
return self._additional_exchange_labels
return {}
@staticmethod
def deserialize_labels(labels: Dict[str, AnyStr]) -> Dict[str, AnyStr]:
deserialized_labels = {}
for key, value in labels.items():
if key == "XEnvoyPeerMetadata":
pb_struct = struct_pb2.Struct()
pb_struct.ParseFromString(value)
remote_type = get_value_from_struct("type", pb_struct)
for (
local_key,
remote_key,
) in METADATA_EXCHANGE_KEY_FIXED_MAP.items():
deserialized_labels[remote_key] = get_value_from_struct(
local_key, pb_struct
)
if remote_type == TYPE_GKE:
for (
local_key,
remote_key,
) in METADATA_EXCHANGE_KEY_GKE_MAP.items():
deserialized_labels[remote_key] = get_value_from_struct(
local_key, pb_struct
)
elif remote_type == TYPE_GCE:
for (
local_key,
remote_key,
) in METADATA_EXCHANGE_KEY_GCE_MAP.items():
deserialized_labels[remote_key] = get_value_from_struct(
local_key, pb_struct
)
# If CSM label injector is enabled on server side but client didn't send
# XEnvoyPeerMetadata, we'll record remote label as unknown.
else:
for remote_key in METADATA_EXCHANGE_KEY_FIXED_MAP.values():
deserialized_labels[remote_key] = UNKNOWN_VALUE
deserialized_labels[key] = value
return deserialized_labels
class CsmOpenTelemetryPluginOption(OpenTelemetryPluginOption):
"""
An implementation of OpenTelemetryPlugin for CSM.
"""
_label_injector: CSMOpenTelemetryLabelInjector
def __init__(self):
self._label_injector = CSMOpenTelemetryLabelInjector()
@staticmethod
def is_active_on_client_channel(target: str) -> bool:
"""Determines whether this plugin option is active on a channel based on target.
Args:
target: Required. The target for the RPC.
Returns:
True if this plugin option is active on the channel, false otherwise.
"""
# CSM channels should have an "xds" scheme
if not target.startswith("xds:"):
return False
# If scheme is correct, the authority should be TD if exist
authority_pattern = r"^xds:\/\/([^/]+)"
match = re.search(authority_pattern, target)
if match:
return TRAFFIC_DIRECTOR_AUTHORITY in match.group(1)
# Return True if the authority doesn't exist
return True
@staticmethod
def is_active_on_server(
xds: bool, # pylint: disable=unused-argument #noqa: ARG004
) -> bool:
"""Determines whether this plugin option is active on a given server.
Since servers don't need to be xds enabled to work as part of a service
mesh, we're returning True and enable this PluginOption for all servers.
Note: This always returns true because server can be part of the mesh even
if it's not xds-enabled. And we want CSM labels for those servers too.
Args:
xds: Required. if this server is build for xds.
Returns:
True if this plugin option is active on the server, false otherwise.
"""
return True
def get_label_injector(self) -> OpenTelemetryLabelInjector:
return self._label_injector
# pylint: disable=no-self-use
class CsmOpenTelemetryPlugin(OpenTelemetryPlugin):
"""Describes a Plugin for CSM OpenTelemetry observability.
This is class is part of an EXPERIMENTAL API.
"""
plugin_options: Iterable[OpenTelemetryPluginOption]
meter_provider: Optional[MeterProvider]
generic_method_attribute_filter: Callable[[str], bool]
def __init__(
self,
*,
plugin_options: Optional[Iterable[OpenTelemetryPluginOption]] = None,
meter_provider: Optional[MeterProvider] = None,
generic_method_attribute_filter: Optional[Callable[[str], bool]] = None,
):
plugin_options = plugin_options or []
new_options = list(plugin_options) + [CsmOpenTelemetryPluginOption()]
super().__init__(
plugin_options=new_options,
meter_provider=meter_provider,
generic_method_attribute_filter=generic_method_attribute_filter,
)
def _get_enabled_optional_labels(self) -> List[OptionalLabelType]:
return [OptionalLabelType.XDS_SERVICE_LABELS]
def get_value_from_struct(key: str, struct: struct_pb2.Struct) -> str:
value = struct.fields.get(key)
if not value:
return UNKNOWN_VALUE
return value.string_value
def get_str_value_from_resource(
attribute: Union[ResourceAttributes, str], resource: Resource
) -> str:
value = resource.attributes.get(attribute, UNKNOWN_VALUE)
return str(value)
# pylint: disable=line-too-long
def get_resource_type(gcp_resource: Resource) -> str:
# Convert resource type from GoogleCloudResourceDetector to the value we used for
# metadata exchange.
# Reference: https://github.com/GoogleCloudPlatform/opentelemetry-operations-python/blob/cc61f23a5ff2f16f4aa2c38d07e55153828849cc/opentelemetry-resourcedetector-gcp/src/opentelemetry/resourcedetector/gcp_resource_detector/__init__.py#L96
gcp_resource_type = get_str_value_from_resource(
"gcp.resource_type", gcp_resource
)
if gcp_resource_type == "gke_container":
return TYPE_GKE
if gcp_resource_type == "gce_instance":
return TYPE_GCE
return gcp_resource_type