stats/opentelemetry: Add OpenTelemetry instrumentation component (#7166)

diff --git a/stats/opentelemetry/client_metrics.go b/stats/opentelemetry/client_metrics.go
new file mode 100644
index 0000000..ca78253
--- /dev/null
+++ b/stats/opentelemetry/client_metrics.go
@@ -0,0 +1,198 @@
+/*
+ * Copyright 2024 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package opentelemetry
+
+import (
+	"context"
+	"sync/atomic"
+	"time"
+
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/stats"
+	"google.golang.org/grpc/status"
+
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/metric"
+)
+
+type clientStatsHandler struct {
+	o Options
+
+	clientMetrics clientMetrics
+}
+
+func (csh *clientStatsHandler) initializeMetrics() {
+	// Will set no metrics to record, logically making this stats handler a
+	// no-op.
+	if csh.o.MetricsOptions.MeterProvider == nil {
+		return
+	}
+
+	meter := csh.o.MetricsOptions.MeterProvider.Meter("grpc-go " + grpc.Version)
+	if meter == nil {
+		return
+	}
+
+	setOfMetrics := csh.o.MetricsOptions.Metrics.metrics
+
+	csh.clientMetrics.attemptStarted = createInt64Counter(setOfMetrics, "grpc.client.attempt.started", meter, metric.WithUnit("attempt"), metric.WithDescription("Number of client call attempts started."))
+	csh.clientMetrics.attemptDuration = createFloat64Histogram(setOfMetrics, "grpc.client.attempt.duration", meter, metric.WithUnit("s"), metric.WithDescription("End-to-end time taken to complete a client call attempt."), metric.WithExplicitBucketBoundaries(DefaultLatencyBounds...))
+	csh.clientMetrics.attemptSentTotalCompressedMessageSize = createInt64Histogram(setOfMetrics, "grpc.client.attempt.sent_total_compressed_message_size", meter, metric.WithUnit("By"), metric.WithDescription("Compressed message bytes sent per client call attempt."), metric.WithExplicitBucketBoundaries(DefaultSizeBounds...))
+	csh.clientMetrics.attemptRcvdTotalCompressedMessageSize = createInt64Histogram(setOfMetrics, "grpc.client.attempt.rcvd_total_compressed_message_size", meter, metric.WithUnit("By"), metric.WithDescription("Compressed message bytes received per call attempt."), metric.WithExplicitBucketBoundaries(DefaultSizeBounds...))
+	csh.clientMetrics.callDuration = createFloat64Histogram(setOfMetrics, "grpc.client.call.duration", meter, metric.WithUnit("s"), metric.WithDescription("Time taken by gRPC to complete an RPC from application's perspective."), metric.WithExplicitBucketBoundaries(DefaultLatencyBounds...))
+}
+
+func (csh *clientStatsHandler) unaryInterceptor(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
+	ci := &callInfo{
+		target: csh.determineTarget(cc),
+		method: csh.determineMethod(method, opts...),
+	}
+	ctx = setCallInfo(ctx, ci)
+
+	startTime := time.Now()
+	err := invoker(ctx, method, req, reply, cc, opts...)
+	csh.perCallMetrics(ctx, err, startTime, ci)
+	return err
+}
+
+// determineTarget determines the target to record attributes with. This will be
+// "other" if target filter is set and specifies, the target name as is
+// otherwise.
+func (csh *clientStatsHandler) determineTarget(cc *grpc.ClientConn) string {
+	target := cc.CanonicalTarget()
+	if f := csh.o.MetricsOptions.TargetAttributeFilter; f != nil && !f(target) {
+		target = "other"
+	}
+	return target
+}
+
+// determineMethod determines the method to record attributes with. This will be
+// "other" if StaticMethod isn't specified or if method filter is set and
+// specifies, the method name as is otherwise.
+func (csh *clientStatsHandler) determineMethod(method string, opts ...grpc.CallOption) string {
+	for _, opt := range opts {
+		if _, ok := opt.(grpc.StaticMethodCallOption); ok {
+			return removeLeadingSlash(method)
+		}
+	}
+	return "other"
+}
+
+func (csh *clientStatsHandler) streamInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
+	ci := &callInfo{
+		target: csh.determineTarget(cc),
+		method: csh.determineMethod(method, opts...),
+	}
+	ctx = setCallInfo(ctx, ci)
+	startTime := time.Now()
+
+	callback := func(err error) {
+		csh.perCallMetrics(ctx, err, startTime, ci)
+	}
+	opts = append([]grpc.CallOption{grpc.OnFinish(callback)}, opts...)
+	return streamer(ctx, desc, cc, method, opts...)
+}
+
+func (csh *clientStatsHandler) perCallMetrics(ctx context.Context, err error, startTime time.Time, ci *callInfo) {
+	s := status.Convert(err)
+	callLatency := float64(time.Since(startTime)) / float64(time.Second)
+	csh.clientMetrics.callDuration.Record(ctx, callLatency, metric.WithAttributes(attribute.String("grpc.method", ci.method), attribute.String("grpc.target", ci.target), attribute.String("grpc.status", canonicalString(s.Code()))))
+}
+
+// TagConn exists to satisfy stats.Handler.
+func (csh *clientStatsHandler) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context {
+	return ctx
+}
+
+// HandleConn exists to satisfy stats.Handler.
+func (csh *clientStatsHandler) HandleConn(context.Context, stats.ConnStats) {}
+
+// TagRPC implements per RPC attempt context management.
+func (csh *clientStatsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
+	mi := &metricsInfo{ // populates information about RPC start.
+		startTime: time.Now(),
+	}
+	ri := &rpcInfo{
+		mi: mi,
+	}
+	return setRPCInfo(ctx, ri)
+}
+
+func (csh *clientStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
+	ri := getRPCInfo(ctx)
+	if ri == nil {
+		logger.Error("ctx passed into client side stats handler metrics event handling has no client attempt data present")
+		return
+	}
+	csh.processRPCEvent(ctx, rs, ri.mi)
+}
+
+func (csh *clientStatsHandler) processRPCEvent(ctx context.Context, s stats.RPCStats, mi *metricsInfo) {
+	switch st := s.(type) {
+	case *stats.Begin:
+		ci := getCallInfo(ctx)
+		if ci == nil {
+			logger.Error("ctx passed into client side stats handler metrics event handling has no metrics data present")
+			return
+		}
+
+		csh.clientMetrics.attemptStarted.Add(ctx, 1, metric.WithAttributes(attribute.String("grpc.method", ci.method), attribute.String("grpc.target", ci.target)))
+	case *stats.OutPayload:
+		atomic.AddInt64(&mi.sentCompressedBytes, int64(st.CompressedLength))
+	case *stats.InPayload:
+		atomic.AddInt64(&mi.recvCompressedBytes, int64(st.CompressedLength))
+	case *stats.End:
+		csh.processRPCEnd(ctx, mi, st)
+	default:
+	}
+}
+
+func (csh *clientStatsHandler) processRPCEnd(ctx context.Context, mi *metricsInfo, e *stats.End) {
+	ci := getCallInfo(ctx)
+	if ci == nil {
+		logger.Error("ctx passed into client side stats handler metrics event handling has no metrics data present")
+		return
+	}
+	latency := float64(time.Since(mi.startTime)) / float64(time.Second)
+	st := "OK"
+	if e.Error != nil {
+		s, _ := status.FromError(e.Error)
+		st = canonicalString(s.Code())
+	}
+
+	clientAttributeOption := metric.WithAttributes(attribute.String("grpc.method", ci.method), attribute.String("grpc.target", ci.target), attribute.String("grpc.status", st))
+	csh.clientMetrics.attemptDuration.Record(ctx, latency, clientAttributeOption)
+	csh.clientMetrics.attemptSentTotalCompressedMessageSize.Record(ctx, atomic.LoadInt64(&mi.sentCompressedBytes), clientAttributeOption)
+	csh.clientMetrics.attemptRcvdTotalCompressedMessageSize.Record(ctx, atomic.LoadInt64(&mi.recvCompressedBytes), clientAttributeOption)
+}
+
+const (
+	// ClientAttemptStarted is the number of client call attempts started.
+	ClientAttemptStarted Metric = "grpc.client.attempt.started"
+	// ClientAttemptDuration is the end-to-end time taken to complete a client
+	// call attempt.
+	ClientAttemptDuration Metric = "grpc.client.attempt.duration"
+	// ClientAttemptSentCompressedTotalMessageSize is the compressed message
+	// bytes sent per client call attempt.
+	ClientAttemptSentCompressedTotalMessageSize Metric = "grpc.client.attempt.sent_total_compressed_message_size"
+	// ClientAttemptRcvdCompressedTotalMessageSize is the compressed message
+	// bytes received per call attempt.
+	ClientAttemptRcvdCompressedTotalMessageSize Metric = "grpc.client.attempt.rcvd_total_compressed_message_size"
+	// ClientCallDuration is the time taken by gRPC to complete an RPC from
+	// application's perspective.
+	ClientCallDuration Metric = "grpc.client.call.duration"
+)
diff --git a/stats/opentelemetry/e2e_test.go b/stats/opentelemetry/e2e_test.go
new file mode 100644
index 0000000..c0850d6
--- /dev/null
+++ b/stats/opentelemetry/e2e_test.go
@@ -0,0 +1,602 @@
+/*
+ * Copyright 2024 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package opentelemetry
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"testing"
+	"time"
+
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/encoding/gzip"
+	"google.golang.org/grpc/internal/grpctest"
+	"google.golang.org/grpc/internal/stubserver"
+	testgrpc "google.golang.org/grpc/interop/grpc_testing"
+	testpb "google.golang.org/grpc/interop/grpc_testing"
+
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/sdk/metric"
+	"go.opentelemetry.io/otel/sdk/metric/metricdata"
+	"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
+)
+
+var defaultTestTimeout = 5 * time.Second
+
+type s struct {
+	grpctest.Tester
+}
+
+func Test(t *testing.T) {
+	grpctest.RunSubTests(t, s{})
+}
+
+// waitForServerCompletedRPCs waits until the unary and streaming stats.End
+// calls are finished processing. It does this by waiting for the expected
+// metric triggered by stats.End to appear through the passed in metrics reader.
+func waitForServerCompletedRPCs(ctx context.Context, reader metric.Reader, wantMetric metricdata.Metrics, t *testing.T) (map[string]metricdata.Metrics, error) {
+	for ; ctx.Err() == nil; <-time.After(time.Millisecond) {
+		rm := &metricdata.ResourceMetrics{}
+		reader.Collect(ctx, rm)
+		gotMetrics := map[string]metricdata.Metrics{}
+		for _, sm := range rm.ScopeMetrics {
+			for _, m := range sm.Metrics {
+				gotMetrics[m.Name] = m
+			}
+		}
+		val, ok := gotMetrics[wantMetric.Name]
+		if !ok {
+			continue
+		}
+		if !metricdatatest.AssertEqual(t, wantMetric, val, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) {
+			continue
+		}
+		return gotMetrics, nil
+	}
+	return nil, fmt.Errorf("error waiting for metric %v: %v", wantMetric, ctx.Err())
+}
+
+// setup creates a stub server with OpenTelemetry component configured on client
+// and server side. It returns a reader for metrics emitted from OpenTelemetry
+// component and the server.
+func setup(t *testing.T, tafOn bool, maf func(string) bool) (*metric.ManualReader, *stubserver.StubServer) {
+	reader := metric.NewManualReader()
+	provider := metric.NewMeterProvider(
+		metric.WithReader(reader),
+	)
+	ss := &stubserver.StubServer{
+		UnaryCallF: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
+			return &testpb.SimpleResponse{Payload: &testpb.Payload{
+				Body: make([]byte, 10000),
+			}}, nil
+		},
+		FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
+			for {
+				_, err := stream.Recv()
+				if err == io.EOF {
+					return nil
+				}
+			}
+		},
+	}
+	var taf func(string) bool
+	if tafOn {
+		taf = func(str string) bool {
+			return str != ss.Target
+		}
+	}
+	if err := ss.Start([]grpc.ServerOption{ServerOption(Options{
+		MetricsOptions: MetricsOptions{
+			MeterProvider:         provider,
+			Metrics:               DefaultMetrics,
+			TargetAttributeFilter: taf,
+			MethodAttributeFilter: maf,
+		}})}, DialOption(Options{
+		MetricsOptions: MetricsOptions{
+			MeterProvider:         provider,
+			Metrics:               DefaultMetrics,
+			TargetAttributeFilter: taf,
+			MethodAttributeFilter: maf,
+		},
+	})); err != nil {
+		t.Fatalf("Error starting endpoint server: %v", err)
+	}
+	return reader, ss
+}
+
+// TestMethodTargetAttributeFilter tests the method and target attribute filter.
+// The method and target filter set should bucket the grpc.method/grpc.target
+// attribute into "other" if filter specifies.
+func (s) TestMethodTargetAttributeFilter(t *testing.T) {
+	maf := func(str string) bool {
+		// Will allow duplex/any other type of RPC.
+		return str != "/grpc.testing.TestService/UnaryCall"
+	}
+	// pull out setup into a helper
+	reader, ss := setup(t, true, maf)
+	defer ss.Stop()
+
+	// make a single RPC (unary rpc), and filter out the target and method
+	// that would correspond.
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+	defer cancel()
+	if _, err := ss.Client.UnaryCall(ctx, &testpb.SimpleRequest{Payload: &testpb.Payload{
+		Body: make([]byte, 10000),
+	}}); err != nil {
+		t.Fatalf("Unexpected error from UnaryCall: %v", err)
+	}
+	stream, err := ss.Client.FullDuplexCall(ctx)
+	if err != nil {
+		t.Fatalf("ss.Client.FullDuplexCall failed: %f", err)
+	}
+
+	stream.CloseSend()
+	if _, err = stream.Recv(); err != io.EOF {
+		t.Fatalf("unexpected error: %v, expected an EOF error", err)
+	}
+	rm := &metricdata.ResourceMetrics{}
+	reader.Collect(ctx, rm)
+
+	wantMetrics := []metricdata.Metrics{
+		{
+			Name:        "grpc.client.attempt.started",
+			Description: "Number of client call attempts started.",
+			Unit:        "attempt",
+			Data: metricdata.Sum[int64]{
+				DataPoints: []metricdata.DataPoint[int64]{
+					{
+						Attributes: attribute.NewSet(attribute.String("grpc.method", "grpc.testing.TestService/UnaryCall"), attribute.String("grpc.target", "other")),
+						Value:      1,
+					},
+					{
+						Attributes: attribute.NewSet(attribute.String("grpc.method", "grpc.testing.TestService/FullDuplexCall"), attribute.String("grpc.target", "other")),
+						Value:      1,
+					},
+				},
+				Temporality: metricdata.CumulativeTemporality,
+				IsMonotonic: true,
+			},
+		},
+	}
+	gotMetrics := map[string]metricdata.Metrics{}
+	for _, sm := range rm.ScopeMetrics {
+		for _, m := range sm.Metrics {
+			gotMetrics[m.Name] = m
+		}
+	}
+
+	for _, metric := range wantMetrics {
+		val, ok := gotMetrics[metric.Name]
+		if !ok {
+			t.Fatalf("metric %v not present in recorded metrics", metric.Name)
+		}
+		if !metricdatatest.AssertEqual(t, metric, val, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) {
+			t.Fatalf("metrics data type not equal for metric: %v", metric.Name)
+		}
+	}
+}
+
+// assertDataPointWithinFiveSeconds asserts the metric passed in contains
+// a histogram with dataPoints that fall within buckets that are <=5.
+func assertDataPointWithinFiveSeconds(metric metricdata.Metrics) error {
+	histo, ok := metric.Data.(metricdata.Histogram[float64])
+	if !ok {
+		return fmt.Errorf("metric data is not histogram")
+	}
+	for _, dataPoint := range histo.DataPoints {
+		var boundWithFive int
+		for i, bucket := range dataPoint.Bounds {
+			if bucket >= 5 {
+				boundWithFive = i
+			}
+		}
+		foundPoint := false
+		for i, bucket := range dataPoint.BucketCounts {
+			if i >= boundWithFive {
+				return fmt.Errorf("data point not found in bucket <=5 seconds")
+			}
+			if bucket == 1 {
+				foundPoint = true
+				break
+			}
+		}
+		if !foundPoint {
+			return fmt.Errorf("no data point found for metric")
+		}
+	}
+	return nil
+}
+
+// TestAllMetricsOneFunction tests emitted metrics from OpenTelemetry
+// instrumentation component. It then configures a system with a gRPC Client and
+// gRPC server with the OpenTelemetry Dial and Server Option configured
+// specifying all the metrics provided by this package, and makes a Unary RPC
+// and a Streaming RPC. These two RPCs should cause certain recording for each
+// registered metric observed through a Manual Metrics Reader on the provided
+// OpenTelemetry SDK's Meter Provider. It then makes an RPC that is unregistered
+// on the Client (no StaticMethodCallOption set) and Server. The method
+// attribute on subsequent metrics should be bucketed in "other".
+func (s) TestAllMetricsOneFunction(t *testing.T) {
+	reader, ss := setup(t, false, nil)
+	defer ss.Stop()
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+	defer cancel()
+	// Make two RPC's, a unary RPC and a streaming RPC. These should cause
+	// certain metrics to be emitted, which should be able to be observed
+	// through the Metric Reader.
+	if _, err := ss.Client.UnaryCall(ctx, &testpb.SimpleRequest{Payload: &testpb.Payload{
+		Body: make([]byte, 10000),
+	}}, grpc.UseCompressor(gzip.Name)); err != nil { // Deterministic compression.
+		t.Fatalf("Unexpected error from UnaryCall: %v", err)
+	}
+	stream, err := ss.Client.FullDuplexCall(ctx)
+	if err != nil {
+		t.Fatalf("ss.Client.FullDuplexCall failed: %f", err)
+	}
+
+	stream.CloseSend()
+	if _, err = stream.Recv(); err != io.EOF {
+		t.Fatalf("unexpected error: %v, expected an EOF error", err)
+	}
+
+	rm := &metricdata.ResourceMetrics{}
+	reader.Collect(ctx, rm)
+
+	gotMetrics := map[string]metricdata.Metrics{}
+	for _, sm := range rm.ScopeMetrics {
+		for _, m := range sm.Metrics {
+			gotMetrics[m.Name] = m
+		}
+	}
+
+	unaryMethodAttr := attribute.String("grpc.method", "grpc.testing.TestService/UnaryCall")
+	duplexMethodAttr := attribute.String("grpc.method", "grpc.testing.TestService/FullDuplexCall")
+
+	targetAttr := attribute.String("grpc.target", ss.Target)
+	statusAttr := attribute.String("grpc.status", "OK")
+
+	wantMetrics := []metricdata.Metrics{
+		{
+			Name:        "grpc.client.attempt.started",
+			Description: "Number of client call attempts started.",
+			Unit:        "attempt",
+			Data: metricdata.Sum[int64]{
+				DataPoints: []metricdata.DataPoint[int64]{
+					{
+						Attributes: attribute.NewSet(unaryMethodAttr, targetAttr),
+						Value:      1,
+					},
+					{
+						Attributes: attribute.NewSet(duplexMethodAttr, targetAttr),
+						Value:      1,
+					},
+				},
+				Temporality: metricdata.CumulativeTemporality,
+				IsMonotonic: true,
+			},
+		},
+		{
+			Name:        "grpc.client.attempt.duration",
+			Description: "End-to-end time taken to complete a client call attempt.",
+			Unit:        "s",
+			Data: metricdata.Histogram[float64]{
+				DataPoints: []metricdata.HistogramDataPoint[float64]{
+					{
+						Attributes: attribute.NewSet(unaryMethodAttr, targetAttr, statusAttr),
+						Count:      1,
+						Bounds:     DefaultLatencyBounds,
+					},
+					{
+						Attributes: attribute.NewSet(duplexMethodAttr, targetAttr, statusAttr),
+						Count:      1,
+						Bounds:     DefaultLatencyBounds,
+					},
+				},
+				Temporality: metricdata.CumulativeTemporality,
+			},
+		},
+		{
+			Name:        "grpc.client.attempt.sent_total_compressed_message_size",
+			Description: "Compressed message bytes sent per client call attempt.",
+			Unit:        "By",
+			Data: metricdata.Histogram[int64]{
+				DataPoints: []metricdata.HistogramDataPoint[int64]{
+					{
+						Attributes:   attribute.NewSet(unaryMethodAttr, targetAttr, statusAttr),
+						Count:        1,
+						Bounds:       DefaultSizeBounds,
+						BucketCounts: []uint64{0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0},
+						Min:          metricdata.NewExtrema(int64(57)),
+						Max:          metricdata.NewExtrema(int64(57)),
+						Sum:          57,
+					},
+					{
+						Attributes:   attribute.NewSet(duplexMethodAttr, targetAttr, statusAttr),
+						Count:        1,
+						Bounds:       DefaultSizeBounds,
+						BucketCounts: []uint64{0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0},
+						Min:          metricdata.NewExtrema(int64(0)),
+						Max:          metricdata.NewExtrema(int64(0)),
+						Sum:          0,
+					},
+				},
+				Temporality: metricdata.CumulativeTemporality,
+			},
+		},
+		{
+			Name:        "grpc.client.attempt.rcvd_total_compressed_message_size",
+			Description: "Compressed message bytes received per call attempt.",
+			Unit:        "By",
+			Data: metricdata.Histogram[int64]{
+				DataPoints: []metricdata.HistogramDataPoint[int64]{
+					{
+						Attributes:   attribute.NewSet(unaryMethodAttr, targetAttr, statusAttr),
+						Count:        1,
+						Bounds:       DefaultSizeBounds,
+						BucketCounts: []uint64{0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0},
+						Min:          metricdata.NewExtrema(int64(57)),
+						Max:          metricdata.NewExtrema(int64(57)),
+						Sum:          57,
+					},
+					{
+						Attributes:   attribute.NewSet(duplexMethodAttr, targetAttr, statusAttr),
+						Count:        1,
+						Bounds:       DefaultSizeBounds,
+						BucketCounts: []uint64{0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0},
+						Min:          metricdata.NewExtrema(int64(0)),
+						Max:          metricdata.NewExtrema(int64(0)),
+						Sum:          0,
+					},
+				},
+				Temporality: metricdata.CumulativeTemporality,
+			},
+		},
+		{
+			Name:        "grpc.client.call.duration",
+			Description: "Time taken by gRPC to complete an RPC from application's perspective.",
+			Unit:        "s",
+			Data: metricdata.Histogram[float64]{
+				DataPoints: []metricdata.HistogramDataPoint[float64]{
+					{
+						Attributes: attribute.NewSet(unaryMethodAttr, targetAttr, statusAttr),
+						Count:      1,
+						Bounds:     DefaultLatencyBounds,
+					},
+					{
+						Attributes: attribute.NewSet(duplexMethodAttr, targetAttr, statusAttr),
+						Count:      1,
+						Bounds:     DefaultLatencyBounds,
+					},
+				},
+				Temporality: metricdata.CumulativeTemporality,
+			},
+		},
+		{
+			Name:        "grpc.server.call.started",
+			Description: "Number of server calls started.",
+			Unit:        "call",
+			Data: metricdata.Sum[int64]{
+				DataPoints: []metricdata.DataPoint[int64]{
+					{
+						Attributes: attribute.NewSet(unaryMethodAttr),
+						Value:      1,
+					},
+					{
+						Attributes: attribute.NewSet(duplexMethodAttr),
+						Value:      1,
+					},
+				},
+				Temporality: metricdata.CumulativeTemporality,
+				IsMonotonic: true,
+			},
+		},
+		{
+			Name:        "grpc.server.call.sent_total_compressed_message_size",
+			Unit:        "By",
+			Description: "Compressed message bytes sent per server call.",
+			Data: metricdata.Histogram[int64]{
+				DataPoints: []metricdata.HistogramDataPoint[int64]{
+					{
+						Attributes:   attribute.NewSet(unaryMethodAttr, statusAttr),
+						Count:        1,
+						Bounds:       DefaultSizeBounds,
+						BucketCounts: []uint64{0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0},
+						Min:          metricdata.NewExtrema(int64(57)),
+						Max:          metricdata.NewExtrema(int64(57)),
+						Sum:          57,
+					},
+					{
+						Attributes:   attribute.NewSet(duplexMethodAttr, statusAttr),
+						Count:        1,
+						Bounds:       DefaultSizeBounds,
+						BucketCounts: []uint64{0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0},
+						Min:          metricdata.NewExtrema(int64(0)),
+						Max:          metricdata.NewExtrema(int64(0)),
+						Sum:          0,
+					},
+				},
+				Temporality: metricdata.CumulativeTemporality,
+			},
+		},
+		{
+			Name:        "grpc.server.call.rcvd_total_compressed_message_size",
+			Unit:        "By",
+			Description: "Compressed message bytes received per server call.",
+			Data: metricdata.Histogram[int64]{
+				DataPoints: []metricdata.HistogramDataPoint[int64]{
+					{
+						Attributes:   attribute.NewSet(unaryMethodAttr, statusAttr),
+						Count:        1,
+						Bounds:       DefaultSizeBounds,
+						BucketCounts: []uint64{0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0},
+						Min:          metricdata.NewExtrema(int64(57)),
+						Max:          metricdata.NewExtrema(int64(57)),
+						Sum:          57,
+					},
+					{
+						Attributes:   attribute.NewSet(duplexMethodAttr, statusAttr),
+						Count:        1,
+						Bounds:       DefaultSizeBounds,
+						BucketCounts: []uint64{0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0},
+						Min:          metricdata.NewExtrema(int64(0)),
+						Max:          metricdata.NewExtrema(int64(0)),
+						Sum:          0,
+					},
+				},
+				Temporality: metricdata.CumulativeTemporality,
+			},
+		},
+		{
+			Name:        "grpc.server.call.duration",
+			Description: "End-to-end time taken to complete a call from server transport's perspective.",
+			Unit:        "s",
+			Data: metricdata.Histogram[float64]{
+				DataPoints: []metricdata.HistogramDataPoint[float64]{
+					{
+						Attributes: attribute.NewSet(unaryMethodAttr, statusAttr),
+						Count:      1,
+						Bounds:     DefaultLatencyBounds,
+					},
+					{
+						Attributes: attribute.NewSet(duplexMethodAttr, statusAttr),
+						Count:      1,
+						Bounds:     DefaultLatencyBounds,
+					},
+				},
+				Temporality: metricdata.CumulativeTemporality,
+			},
+		},
+	}
+
+	for _, metric := range wantMetrics {
+		if metric.Name == "grpc.server.call.sent_total_compressed_message_size" || metric.Name == "grpc.server.call.rcvd_total_compressed_message_size" {
+			// Sync the metric reader to see the event because stats.End is
+			// handled async server side. Thus, poll until metrics created from
+			// stats.End show up.
+			if gotMetrics, err = waitForServerCompletedRPCs(ctx, reader, metric, t); err != nil {
+				t.Fatalf("error waiting for sent total compressed message size for metric: %v", metric.Name)
+			}
+			continue
+		}
+
+		// If one of the duration metrics, ignore the bucket counts, and make
+		// sure it count falls within a bucket <= 5 seconds (maximum duration of
+		// test due to context).
+		val, ok := gotMetrics[metric.Name]
+		if !ok {
+			t.Fatalf("metric %v not present in recorded metrics", metric.Name)
+		}
+		if metric.Name == "grpc.client.attempt.duration" || metric.Name == "grpc.client.call.duration" || metric.Name == "grpc.server.call.duration" {
+			if !metricdatatest.AssertEqual(t, metric, val, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars(), metricdatatest.IgnoreValue()) {
+				t.Fatalf("metrics data type not equal for metric: %v", metric.Name)
+			}
+			if err := assertDataPointWithinFiveSeconds(val); err != nil {
+				t.Fatalf("Data point not within five seconds for metric %v: %v", metric.Name, err)
+			}
+			continue
+		}
+
+		if !metricdatatest.AssertEqual(t, metric, val, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) {
+			t.Fatalf("metrics data type not equal for metric: %v", metric.Name)
+		}
+	}
+
+	stream, err = ss.Client.FullDuplexCall(ctx)
+	if err != nil {
+		t.Fatalf("ss.Client.FullDuplexCall failed: %f", err)
+	}
+
+	stream.CloseSend()
+	if _, err = stream.Recv(); err != io.EOF {
+		t.Fatalf("unexpected error: %v, expected an EOF error", err)
+	}
+	// This Invoke doesn't pass the StaticMethodCallOption. Thus, the method
+	// attribute should become "other" on client side metrics. Since it is also
+	// not registered on the server either, it should also become "other" on the
+	// server metrics method attribute.
+	ss.CC.Invoke(ctx, "/grpc.testing.TestService/UnregisteredCall", nil, nil, []grpc.CallOption{}...)
+	ss.CC.Invoke(ctx, "/grpc.testing.TestService/UnregisteredCall", nil, nil, []grpc.CallOption{}...)
+	ss.CC.Invoke(ctx, "/grpc.testing.TestService/UnregisteredCall", nil, nil, []grpc.CallOption{}...)
+
+	rm = &metricdata.ResourceMetrics{}
+	reader.Collect(ctx, rm)
+	gotMetrics = map[string]metricdata.Metrics{}
+	for _, sm := range rm.ScopeMetrics {
+		for _, m := range sm.Metrics {
+			gotMetrics[m.Name] = m
+		}
+	}
+	otherMethodAttr := attribute.String("grpc.method", "other")
+	wantMetrics = []metricdata.Metrics{
+		{
+			Name:        "grpc.client.attempt.started",
+			Description: "Number of client call attempts started.",
+			Unit:        "attempt",
+			Data: metricdata.Sum[int64]{
+				DataPoints: []metricdata.DataPoint[int64]{
+					{
+						Attributes: attribute.NewSet(unaryMethodAttr, targetAttr),
+						Value:      1,
+					},
+					{
+						Attributes: attribute.NewSet(duplexMethodAttr, targetAttr),
+						Value:      2,
+					},
+					{
+						Attributes: attribute.NewSet(otherMethodAttr, targetAttr),
+						Value:      3,
+					},
+				},
+				Temporality: metricdata.CumulativeTemporality,
+				IsMonotonic: true,
+			},
+		},
+		{
+			Name:        "grpc.server.call.started",
+			Description: "Number of server calls started.",
+			Unit:        "call",
+			Data: metricdata.Sum[int64]{
+				DataPoints: []metricdata.DataPoint[int64]{
+					{
+						Attributes: attribute.NewSet(unaryMethodAttr),
+						Value:      1,
+					},
+					{
+						Attributes: attribute.NewSet(duplexMethodAttr),
+						Value:      2,
+					},
+					{
+						Attributes: attribute.NewSet(otherMethodAttr),
+						Value:      3,
+					},
+				},
+				Temporality: metricdata.CumulativeTemporality,
+				IsMonotonic: true,
+			},
+		},
+	}
+	for _, metric := range wantMetrics {
+		val, ok := gotMetrics[metric.Name]
+		if !ok {
+			t.Fatalf("metric %v not present in recorded metrics", metric.Name)
+		}
+		if !metricdatatest.AssertEqual(t, metric, val, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) {
+			t.Fatalf("metrics data type not equal for metric: %v", metric.Name)
+		}
+	}
+}
diff --git a/stats/opentelemetry/example_test.go b/stats/opentelemetry/example_test.go
new file mode 100644
index 0000000..607bcf5
--- /dev/null
+++ b/stats/opentelemetry/example_test.go
@@ -0,0 +1,134 @@
+/*
+ * Copyright 2024 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package opentelemetry_test
+
+import (
+	"strings"
+
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/credentials/insecure"
+	"google.golang.org/grpc/stats/opentelemetry"
+
+	"go.opentelemetry.io/otel/sdk/metric"
+)
+
+func Example_dialOption() {
+	// This is setting default bounds for a view. Setting these bounds through
+	// meter provider from SDK is recommended, as API calls in this module
+	// provide default bounds, but these calls are not guaranteed to be stable
+	// and API implementors are not required to implement bounds. Setting bounds
+	// through SDK ensures the bounds get picked up. The specific fields in
+	// Aggregation take precedence over defaults from API. For any fields unset
+	// in aggregation, defaults get picked up, so can have a mix of fields from
+	// SDK and fields created from API call. The overridden views themselves
+	// also follow same logic, only the specific views being created in the SDK
+	// use SDK information, the rest are created from API call.
+	reader := metric.NewManualReader()
+	provider := metric.NewMeterProvider(
+		metric.WithReader(reader),
+		metric.WithView(metric.NewView(metric.Instrument{
+			Name: "grpc.client.call.duration",
+		},
+			metric.Stream{
+				Aggregation: metric.AggregationExplicitBucketHistogram{
+					Boundaries: opentelemetry.DefaultSizeBounds, // The specific fields set in SDK take precedence over API.
+				},
+			},
+		)),
+	)
+
+	opts := opentelemetry.Options{
+		MetricsOptions: opentelemetry.MetricsOptions{
+			MeterProvider: provider,
+			Metrics:       opentelemetry.DefaultMetrics, // equivalent to unset - distinct from empty
+			TargetAttributeFilter: func(str string) bool {
+				return !strings.HasPrefix(str, "dns") // Filter out DNS targets.
+			},
+		},
+	}
+	do := opentelemetry.DialOption(opts)
+	cc, err := grpc.NewClient("<target string>", do, grpc.WithTransportCredentials(insecure.NewCredentials()))
+	if err != nil {
+		// Handle err.
+	}
+	defer cc.Close()
+}
+
+func Example_serverOption() {
+	reader := metric.NewManualReader()
+	provider := metric.NewMeterProvider(metric.WithReader(reader))
+	opts := opentelemetry.Options{
+		MetricsOptions: opentelemetry.MetricsOptions{
+			MeterProvider: provider,
+			// Because Metrics is unset, the user will get default metrics.
+			MethodAttributeFilter: func(str string) bool {
+				// Will allow duplex/any other type of RPC.
+				return str != "/grpc.testing.TestService/UnaryCall"
+			},
+		},
+	}
+	cc, err := grpc.NewClient("some-target", opentelemetry.DialOption(opts), grpc.WithTransportCredentials(insecure.NewCredentials()))
+	if err != nil {
+		// Handle err.
+	}
+	defer cc.Close()
+}
+
+func ExampleMetrics_excludeSome() {
+	// To exclude specific metrics, initialize Options as follows:
+	opts := opentelemetry.Options{
+		MetricsOptions: opentelemetry.MetricsOptions{
+			Metrics: opentelemetry.DefaultMetrics.Remove(opentelemetry.ClientAttemptDuration, opentelemetry.ClientAttemptRcvdCompressedTotalMessageSize),
+		},
+	}
+	do := opentelemetry.DialOption(opts)
+	cc, err := grpc.NewClient("<target string>", do, grpc.WithTransportCredentials(insecure.NewCredentials()))
+	if err != nil {
+		// Handle err.
+	}
+	defer cc.Close()
+}
+
+func ExampleMetrics_disableAll() {
+	// To disable all metrics, initialize Options as follows:
+	opts := opentelemetry.Options{
+		MetricsOptions: opentelemetry.MetricsOptions{
+			Metrics: opentelemetry.NewMetrics(), // Distinct to nil, which creates default metrics. This empty set creates no metrics.
+		},
+	}
+	do := opentelemetry.DialOption(opts)
+	cc, err := grpc.NewClient("<target string>", do, grpc.WithTransportCredentials(insecure.NewCredentials()))
+	if err != nil {
+		// Handle err.
+	}
+	defer cc.Close()
+}
+
+func ExampleMetrics_enableSome() {
+	// To only create specific metrics, initialize Options as follows:
+	opts := opentelemetry.Options{
+		MetricsOptions: opentelemetry.MetricsOptions{
+			Metrics: opentelemetry.NewMetrics(opentelemetry.ClientAttemptDuration, opentelemetry.ClientAttemptRcvdCompressedTotalMessageSize), // only create these metrics
+		},
+	}
+	do := opentelemetry.DialOption(opts)
+	cc, err := grpc.NewClient("<target string>", do, grpc.WithTransportCredentials(insecure.NewCredentials()))
+	if err != nil { // might fail vet
+		// Handle err.
+	}
+	defer cc.Close()
+}
diff --git a/stats/opentelemetry/go.mod b/stats/opentelemetry/go.mod
new file mode 100644
index 0000000..1573ad9
--- /dev/null
+++ b/stats/opentelemetry/go.mod
@@ -0,0 +1,24 @@
+module google.golang.org/grpc/stats/opentelemetry
+
+go 1.20
+
+replace google.golang.org/grpc => ../..
+
+require (
+	go.opentelemetry.io/otel v1.24.0
+	go.opentelemetry.io/otel/metric v1.24.0
+	go.opentelemetry.io/otel/sdk/metric v1.24.0
+	google.golang.org/grpc v1.62.1
+)
+
+require (
+	github.com/go-logr/logr v1.4.1 // indirect
+	github.com/go-logr/stdr v1.2.2 // indirect
+	go.opentelemetry.io/otel/sdk v1.24.0 // indirect
+	go.opentelemetry.io/otel/trace v1.24.0 // indirect
+	golang.org/x/net v0.22.0 // indirect
+	golang.org/x/sys v0.18.0 // indirect
+	golang.org/x/text v0.14.0 // indirect
+	google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237 // indirect
+	google.golang.org/protobuf v1.33.0 // indirect
+)
diff --git a/stats/opentelemetry/go.sum b/stats/opentelemetry/go.sum
new file mode 100644
index 0000000..a2ec727
--- /dev/null
+++ b/stats/opentelemetry/go.sum
@@ -0,0 +1,30 @@
+github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
+github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
+github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ=
+github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
+github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
+github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
+github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
+github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
+github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
+go.opentelemetry.io/otel v1.24.0 h1:0LAOdjNmQeSTzGBzduGe/rU4tZhMwL5rWgtp9Ku5Jfo=
+go.opentelemetry.io/otel v1.24.0/go.mod h1:W7b9Ozg4nkF5tWI5zsXkaKKDjdVjpD4oAt9Qi/MArHo=
+go.opentelemetry.io/otel/metric v1.24.0 h1:6EhoGWWK28x1fbpA4tYTOWBkPefTDQnb8WSGXlc88kI=
+go.opentelemetry.io/otel/metric v1.24.0/go.mod h1:VYhLe1rFfxuTXLgj4CBiyz+9WYBA8pNGJgDcSFRKBco=
+go.opentelemetry.io/otel/sdk v1.24.0 h1:YMPPDNymmQN3ZgczicBY3B6sf9n62Dlj9pWD3ucgoDw=
+go.opentelemetry.io/otel/sdk v1.24.0/go.mod h1:KVrIYw6tEubO9E96HQpcmpTKDVn9gdv35HoYiQWGDFg=
+go.opentelemetry.io/otel/sdk/metric v1.24.0 h1:yyMQrPzF+k88/DbH7o4FMAs80puqd+9osbiBrJrz/w8=
+go.opentelemetry.io/otel/sdk/metric v1.24.0/go.mod h1:I6Y5FjH6rvEnTTAYQz3Mmv2kl6Ek5IIrmwTLqMrrOE0=
+go.opentelemetry.io/otel/trace v1.24.0 h1:CsKnnL4dUAr/0llH9FKuc698G04IrpWV0MQA/Y1YELI=
+go.opentelemetry.io/otel/trace v1.24.0/go.mod h1:HPc3Xr/cOApsBI154IU0OI0HJexz+aw5uPdbs3UCjNU=
+golang.org/x/net v0.22.0 h1:9sGLhx7iRIHEiX0oAJ3MRZMUCElJgy7Br1nO+AMN3Tc=
+golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
+golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
+golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
+golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
+golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
+google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237 h1:NnYq6UN9ReLM9/Y01KWNOWyI5xQ9kbIms5GGJVwS/Yc=
+google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY=
+google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI=
+google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
+gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
diff --git a/stats/opentelemetry/opentelemetry.go b/stats/opentelemetry/opentelemetry.go
new file mode 100644
index 0000000..07fbac3
--- /dev/null
+++ b/stats/opentelemetry/opentelemetry.go
@@ -0,0 +1,298 @@
+/*
+ * Copyright 2024 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package opentelemetry implements opentelemetry instrumentation code for
+// gRPC-Go clients and servers.
+package opentelemetry
+
+import (
+	"context"
+	"strings"
+	"time"
+
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/grpclog"
+	"google.golang.org/grpc/internal"
+
+	"go.opentelemetry.io/otel/metric"
+	"go.opentelemetry.io/otel/metric/noop"
+)
+
+var logger = grpclog.Component("otel-plugin")
+
+var canonicalString = internal.CanonicalString.(func(codes.Code) string)
+
+var joinDialOptions = internal.JoinDialOptions.(func(...grpc.DialOption) grpc.DialOption)
+
+// Metric is an identifier for a metric provided by this package.
+type Metric string
+
+// Metrics is a set of metrics to record. Once created, Metrics is immutable,
+// however Add and Remove can make copies with specific metrics added or
+// removed, respectively.
+type Metrics struct {
+	// metrics are the set of metrics to initialize.
+	metrics map[Metric]bool
+}
+
+// NewMetrics returns a Metrics containing Metrics.
+func NewMetrics(metrics ...Metric) *Metrics {
+	newMetrics := make(map[Metric]bool)
+	for _, metric := range metrics {
+		newMetrics[metric] = true
+	}
+	return &Metrics{
+		metrics: newMetrics,
+	}
+}
+
+// Add adds the metrics to the metrics set and returns a new copy with the
+// additional metrics.
+func (m *Metrics) Add(metrics ...Metric) *Metrics {
+	newMetrics := make(map[Metric]bool)
+	for metric := range m.metrics {
+		newMetrics[metric] = true
+	}
+
+	for _, metric := range metrics {
+		newMetrics[metric] = true
+	}
+	return &Metrics{
+		metrics: newMetrics,
+	}
+}
+
+// Remove removes the metrics from the metrics set and returns a new copy with
+// the metrics removed.
+func (m *Metrics) Remove(metrics ...Metric) *Metrics {
+	newMetrics := make(map[Metric]bool)
+	for metric := range m.metrics {
+		newMetrics[metric] = true
+	}
+
+	for _, metric := range metrics {
+		delete(newMetrics, metric)
+	}
+	return &Metrics{
+		metrics: newMetrics,
+	}
+}
+
+// Options are the options for OpenTelemetry instrumentation.
+type Options struct {
+	// MetricsOptions are the metrics options for OpenTelemetry instrumentation.
+	MetricsOptions MetricsOptions
+}
+
+// MetricsOptions are the metrics options for OpenTelemetry instrumentation.
+type MetricsOptions struct {
+	// MeterProvider is the MeterProvider instance that will be used to create
+	// instruments. To enable metrics collection, set a meter provider. If
+	// unset, no metrics will be recorded. Any implementation knobs (i.e. views,
+	// bounds) set in the MeterProvider take precedence over the API calls from
+	// this interface. (i.e. it will create default views for unset views).
+	MeterProvider metric.MeterProvider
+
+	// Metrics are the metrics to instrument. Will create instrument and record telemetry
+	// for corresponding metric supported by the client and server
+	// instrumentation components if applicable. If not set, the default metrics
+	// will be recorded.
+	Metrics *Metrics
+
+	// TargetAttributeFilter is a callback that takes the target string of the
+	// channel and returns a bool representing whether to use target as a label
+	// value or use the string "other". If unset, will use the target string as
+	// is. This only applies for client side metrics.
+	TargetAttributeFilter func(string) bool
+
+	// MethodAttributeFilter is to record the method name of RPCs handled by
+	// grpc.UnknownServiceHandler, but take care to limit the values allowed, as
+	// allowing too many will increase cardinality and could cause severe memory
+	// or performance problems. On Client Side, pass a
+	// grpc.StaticMethodCallOption as a call option into Invoke or NewStream.
+	// This only applies for server side metrics.
+	MethodAttributeFilter func(string) bool
+}
+
+// DialOption returns a dial option which enables OpenTelemetry instrumentation
+// code for a grpc.ClientConn.
+//
+// Client applications interested in instrumenting their grpc.ClientConn should
+// pass the dial option returned from this function as a dial option to
+// grpc.NewClient().
+//
+// For the metrics supported by this instrumentation code, specify the client
+// metrics to record in metrics options. Also provide an implementation of a
+// MeterProvider. If the passed in Meter Provider does not have the view
+// configured for an individual metric turned on, the API call in this component
+// will create a default view for that metric.
+func DialOption(o Options) grpc.DialOption {
+	csh := &clientStatsHandler{o: o}
+	csh.initializeMetrics()
+	return joinDialOptions(grpc.WithChainUnaryInterceptor(csh.unaryInterceptor), grpc.WithChainStreamInterceptor(csh.streamInterceptor), grpc.WithStatsHandler(csh))
+}
+
+// ServerOption returns a server option which enables OpenTelemetry
+// instrumentation code for a grpc.Server.
+//
+// Server applications interested in instrumenting their grpc.Server should pass
+// the server option returned from this function as an argument to
+// grpc.NewServer().
+//
+// For the metrics supported by this instrumentation code, specify the server
+// metrics to record in metrics options. Also provide an implementation of a
+// MeterProvider. If the passed in Meter Provider does not have the view
+// configured for an individual metric turned on, the API call in this component
+// will create a default view for that metric.
+func ServerOption(o Options) grpc.ServerOption {
+	ssh := &serverStatsHandler{o: o}
+	ssh.initializeMetrics()
+	return grpc.StatsHandler(ssh)
+}
+
+// callInfo is information pertaining to the lifespan of the RPC client side.
+type callInfo struct {
+	target string
+
+	method string
+}
+
+type callInfoKey struct{}
+
+func setCallInfo(ctx context.Context, ci *callInfo) context.Context {
+	return context.WithValue(ctx, callInfoKey{}, ci)
+}
+
+// getCallInfo returns the callInfo stored in the context, or nil
+// if there isn't one.
+func getCallInfo(ctx context.Context) *callInfo {
+	ci, _ := ctx.Value(callInfoKey{}).(*callInfo)
+	return ci
+}
+
+// rpcInfo is RPC information scoped to the RPC attempt life span client side,
+// and the RPC life span server side.
+type rpcInfo struct {
+	mi *metricsInfo
+}
+
+type rpcInfoKey struct{}
+
+func setRPCInfo(ctx context.Context, ri *rpcInfo) context.Context {
+	return context.WithValue(ctx, rpcInfoKey{}, ri)
+}
+
+// getRPCInfo returns the rpcInfo stored in the context, or nil
+// if there isn't one.
+func getRPCInfo(ctx context.Context) *rpcInfo {
+	ri, _ := ctx.Value(rpcInfoKey{}).(*rpcInfo)
+	return ri
+}
+
+func removeLeadingSlash(mn string) string {
+	return strings.TrimLeft(mn, "/")
+}
+
+// metricsInfo is RPC information scoped to the RPC attempt life span client
+// side, and the RPC life span server side.
+type metricsInfo struct {
+	// access these counts atomically for hedging in the future:
+	// number of bytes after compression (within each message) from side (client
+	// || server).
+	sentCompressedBytes int64
+	// number of compressed bytes received (within each message) received on
+	// side (client || server).
+	recvCompressedBytes int64
+
+	startTime time.Time
+	method    string
+}
+
+type clientMetrics struct {
+	// "grpc.client.attempt.started"
+	attemptStarted metric.Int64Counter
+	// "grpc.client.attempt.duration"
+	attemptDuration metric.Float64Histogram
+	// "grpc.client.attempt.sent_total_compressed_message_size"
+	attemptSentTotalCompressedMessageSize metric.Int64Histogram
+	// "grpc.client.attempt.rcvd_total_compressed_message_size"
+	attemptRcvdTotalCompressedMessageSize metric.Int64Histogram
+
+	// "grpc.client.call.duration"
+	callDuration metric.Float64Histogram
+}
+
+type serverMetrics struct {
+	// "grpc.server.call.started"
+	callStarted metric.Int64Counter
+	// "grpc.server.call.sent_total_compressed_message_size"
+	callSentTotalCompressedMessageSize metric.Int64Histogram
+	// "grpc.server.call.rcvd_total_compressed_message_size"
+	callRcvdTotalCompressedMessageSize metric.Int64Histogram
+	// "grpc.server.call.duration"
+	callDuration metric.Float64Histogram
+}
+
+func createInt64Counter(setOfMetrics map[Metric]bool, metricName Metric, meter metric.Meter, options ...metric.Int64CounterOption) metric.Int64Counter {
+	if _, ok := setOfMetrics[metricName]; !ok {
+		return noop.Int64Counter{}
+	}
+	ret, err := meter.Int64Counter(string(metricName), options...)
+	if err != nil {
+		logger.Errorf("failed to register metric \"%v\", will not record", metricName)
+		return noop.Int64Counter{}
+	}
+	return ret
+}
+
+func createInt64Histogram(setOfMetrics map[Metric]bool, metricName Metric, meter metric.Meter, options ...metric.Int64HistogramOption) metric.Int64Histogram {
+	if _, ok := setOfMetrics[metricName]; !ok {
+		return noop.Int64Histogram{}
+	}
+	ret, err := meter.Int64Histogram(string(metricName), options...)
+	if err != nil {
+		logger.Errorf("failed to register metric \"%v\", will not record", metricName)
+		return noop.Int64Histogram{}
+	}
+	return ret
+}
+
+func createFloat64Histogram(setOfMetrics map[Metric]bool, metricName Metric, meter metric.Meter, options ...metric.Float64HistogramOption) metric.Float64Histogram {
+	if _, ok := setOfMetrics[metricName]; !ok {
+		return noop.Float64Histogram{}
+	}
+	ret, err := meter.Float64Histogram(string(metricName), options...)
+	if err != nil {
+		logger.Errorf("failed to register metric \"%v\", will not record", metricName)
+		return noop.Float64Histogram{}
+	}
+	return ret
+}
+
+// Users of this component should use these bucket boundaries as part of their
+// SDK MeterProvider passed in. This component sends this as "advice" to the
+// API, which works, however this stability is not guaranteed, so for safety the
+// SDK Meter Provider provided should set these bounds for corresponding
+// metrics.
+var (
+	// DefaultLatencyBounds are the default bounds for latency metrics.
+	DefaultLatencyBounds = []float64{0, 0.00001, 0.00005, 0.0001, 0.0003, 0.0006, 0.0008, 0.001, 0.002, 0.003, 0.004, 0.005, 0.006, 0.008, 0.01, 0.013, 0.016, 0.02, 0.025, 0.03, 0.04, 0.05, 0.065, 0.08, 0.1, 0.13, 0.16, 0.2, 0.25, 0.3, 0.4, 0.5, 0.65, 0.8, 1, 2, 5, 10, 20, 50, 100} // provide "advice" through API, SDK should set this too
+	// DefaultSizeBounds are the default bounds for metrics which record size.
+	DefaultSizeBounds = []float64{0, 1024, 2048, 4096, 16384, 65536, 262144, 1048576, 4194304, 16777216, 67108864, 268435456, 1073741824, 4294967296}
+	// DefaultMetrics are the default metrics provided by this module.
+	DefaultMetrics = NewMetrics(ClientAttemptStarted, ClientAttemptDuration, ClientAttemptSentCompressedTotalMessageSize, ClientAttemptRcvdCompressedTotalMessageSize, ClientCallDuration, ServerCallStarted, ServerCallSentCompressedTotalMessageSize, ServerCallRcvdCompressedTotalMessageSize, ServerCallDuration)
+)
diff --git a/stats/opentelemetry/server_metrics.go b/stats/opentelemetry/server_metrics.go
new file mode 100644
index 0000000..13e8dfc
--- /dev/null
+++ b/stats/opentelemetry/server_metrics.go
@@ -0,0 +1,145 @@
+/*
+ * Copyright 2024 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package opentelemetry
+
+import (
+	"context"
+	"sync/atomic"
+	"time"
+
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/internal"
+	"google.golang.org/grpc/stats"
+	"google.golang.org/grpc/status"
+
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/metric"
+)
+
+type serverStatsHandler struct {
+	o Options
+
+	serverMetrics serverMetrics
+}
+
+func (ssh *serverStatsHandler) initializeMetrics() {
+	// Will set no metrics to record, logically making this stats handler a
+	// no-op.
+	if ssh.o.MetricsOptions.MeterProvider == nil {
+		return
+	}
+
+	meter := ssh.o.MetricsOptions.MeterProvider.Meter("grpc-go " + grpc.Version)
+	if meter == nil {
+		return
+	}
+	setOfMetrics := ssh.o.MetricsOptions.Metrics.metrics
+
+	ssh.serverMetrics.callStarted = createInt64Counter(setOfMetrics, "grpc.server.call.started", meter, metric.WithUnit("call"), metric.WithDescription("Number of server calls started."))
+	ssh.serverMetrics.callSentTotalCompressedMessageSize = createInt64Histogram(setOfMetrics, "grpc.server.call.sent_total_compressed_message_size", meter, metric.WithUnit("By"), metric.WithDescription("Compressed message bytes sent per server call."), metric.WithExplicitBucketBoundaries(DefaultSizeBounds...))
+	ssh.serverMetrics.callRcvdTotalCompressedMessageSize = createInt64Histogram(setOfMetrics, "grpc.server.call.rcvd_total_compressed_message_size", meter, metric.WithUnit("By"), metric.WithDescription("Compressed message bytes received per server call."), metric.WithExplicitBucketBoundaries(DefaultSizeBounds...))
+	ssh.serverMetrics.callDuration = createFloat64Histogram(setOfMetrics, "grpc.server.call.duration", meter, metric.WithUnit("s"), metric.WithDescription("End-to-end time taken to complete a call from server transport's perspective."), metric.WithExplicitBucketBoundaries(DefaultLatencyBounds...))
+}
+
+// TagConn exists to satisfy stats.Handler.
+func (ssh *serverStatsHandler) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context {
+	return ctx
+}
+
+// HandleConn exists to satisfy stats.Handler.
+func (ssh *serverStatsHandler) HandleConn(context.Context, stats.ConnStats) {}
+
+// TagRPC implements per RPC context management.
+func (ssh *serverStatsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
+	method := info.FullMethodName
+	if ssh.o.MetricsOptions.MethodAttributeFilter != nil {
+		if !ssh.o.MetricsOptions.MethodAttributeFilter(method) {
+			method = "other"
+		}
+	}
+	server := internal.ServerFromContext.(func(context.Context) *grpc.Server)(ctx)
+	if server == nil { // Shouldn't happen, defensive programming.
+		logger.Error("ctx passed into server side stats handler has no grpc server ref")
+		method = "other"
+	} else {
+		isRegisteredMethod := internal.IsRegisteredMethod.(func(*grpc.Server, string) bool)
+		if !isRegisteredMethod(server, method) {
+			method = "other"
+		}
+	}
+
+	mi := &metricsInfo{
+		startTime: time.Now(),
+		method:    removeLeadingSlash(method),
+	}
+	ri := &rpcInfo{
+		mi: mi,
+	}
+	return setRPCInfo(ctx, ri)
+}
+
+// HandleRPC implements per RPC tracing and stats implementation.
+func (ssh *serverStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
+	ri := getRPCInfo(ctx)
+	if ri == nil {
+		logger.Error("ctx passed into server side stats handler metrics event handling has no server call data present")
+		return
+	}
+	ssh.processRPCData(ctx, rs, ri.mi)
+}
+
+func (ssh *serverStatsHandler) processRPCData(ctx context.Context, s stats.RPCStats, mi *metricsInfo) {
+	switch st := s.(type) {
+	case *stats.InHeader:
+		ssh.serverMetrics.callStarted.Add(ctx, 1, metric.WithAttributes(attribute.String("grpc.method", mi.method)))
+	case *stats.OutPayload:
+		atomic.AddInt64(&mi.sentCompressedBytes, int64(st.CompressedLength))
+	case *stats.InPayload:
+		atomic.AddInt64(&mi.recvCompressedBytes, int64(st.CompressedLength))
+	case *stats.End:
+		ssh.processRPCEnd(ctx, mi, st)
+	default:
+	}
+}
+
+func (ssh *serverStatsHandler) processRPCEnd(ctx context.Context, mi *metricsInfo, e *stats.End) {
+	latency := float64(time.Since(mi.startTime)) / float64(time.Second)
+	st := "OK"
+	if e.Error != nil {
+		s, _ := status.FromError(e.Error)
+		st = canonicalString(s.Code())
+	}
+	serverAttributeOption := metric.WithAttributes(attribute.String("grpc.method", mi.method), attribute.String("grpc.status", st))
+
+	ssh.serverMetrics.callDuration.Record(ctx, latency, serverAttributeOption)
+	ssh.serverMetrics.callSentTotalCompressedMessageSize.Record(ctx, atomic.LoadInt64(&mi.sentCompressedBytes), serverAttributeOption)
+	ssh.serverMetrics.callRcvdTotalCompressedMessageSize.Record(ctx, atomic.LoadInt64(&mi.recvCompressedBytes), serverAttributeOption)
+}
+
+const (
+	// ServerCallStarted is the number of server calls started.
+	ServerCallStarted Metric = "grpc.server.call.started"
+	// ServerCallSentCompressedTotalMessageSize is the compressed message bytes
+	// sent per server call.
+	ServerCallSentCompressedTotalMessageSize Metric = "grpc.server.call.sent_total_compressed_message_size"
+	// ServerCallRcvdCompressedTotalMessageSize is the compressed message bytes
+	// received per server call.
+	ServerCallRcvdCompressedTotalMessageSize Metric = "grpc.server.call.rcvd_total_compressed_message_size"
+	// ServerCallDuration is the end-to-end time taken to complete a call from
+	// server transport's perspective.
+	ServerCallDuration Metric = "grpc.server.call.duration"
+)