blob: 754355a4a07b21e5d45c3e6b0aec80f0933ed674 [file] [log] [blame]
// Copyright 2023 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package server
import (
"context"
"fmt"
"net"
"strings"
"sync"
"google.golang.org/grpc"
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/reflection"
"go.chromium.org/luci/grpc/grpcutil"
)
// grpcPort implements servingPort on top of a gRPC server.
type grpcPort struct {
listener net.Listener // the bound socket, always non-nil, doesn't change
m sync.Mutex // protects mutable state below
services []grpcService // services to register when starting
opts []grpc.ServerOption // options to pass to the server when starting
server *grpc.Server // non-nil after the server has started
healthSrv *health.Server // the basic health check service
}
// grpcService is a service registered via registerService.
type grpcService struct {
desc *grpc.ServiceDesc
impl any
}
// registerService exposes the given service over gRPC.
func (p *grpcPort) registerService(desc *grpc.ServiceDesc, impl any) {
p.m.Lock()
defer p.m.Unlock()
if p.server != nil {
panic("the server has already started")
}
p.services = append(p.services, grpcService{desc, impl})
}
// addServerOptions adds an option to pass to NewServer.
func (p *grpcPort) addServerOptions(opts ...grpc.ServerOption) {
p.m.Lock()
defer p.m.Unlock()
if p.server != nil {
panic("the server has already started")
}
p.opts = append(p.opts, opts...)
}
// nameForLog returns a string to identify this port in the server logs.
//
// Part of the servingPort interface.
func (p *grpcPort) nameForLog() string {
return fmt.Sprintf("grpc://%s [grpc]", p.listener.Addr())
}
// serve runs the serving loop until it is gracefully stopped.
//
// Part of the servingPort interface.
func (p *grpcPort) serve(baseCtx func() context.Context) error {
p.m.Lock()
if p.server != nil {
panic("the server has already started")
}
// Install outer-most interceptors that append the root server context values
// to the per-request context (which is basically just a context.Background()
// with its cancellation controlled by the gRPC server). NewServer will append
// other interceptors in `opts` after these root ones.
injectCtx := contextInjector(baseCtx)
p.opts = append(p.opts,
grpc.UnaryInterceptor(injectCtx.Unary()),
grpc.StreamInterceptor(injectCtx.Stream()),
)
p.server = grpc.NewServer(p.opts...)
// Install reflection only into gRPC server (not pRPC one), since it uses
// streaming RPCs not supported by pRPC. pRPC has its own similar service
// called Discovery.
reflection.Register(p.server)
// Services installed into both pRPC and gRPC.
hasHealth := false
for _, svc := range p.services {
p.server.RegisterService(svc.desc, svc.impl)
if strings.HasPrefix(svc.desc.ServiceName, "grpc.health.") {
hasHealth = true
}
}
// If the health check service not installed yet by the server user, install
// our own basic one. The server users might want to install their own
// implementations if they plan to dynamically control the serving status.
if !hasHealth {
p.healthSrv = health.NewServer()
grpc_health_v1.RegisterHealthServer(p.server, p.healthSrv)
}
server := p.server
p.m.Unlock()
return server.Serve(p.listener)
}
// shutdown gracefully stops the server, blocking until it is closed.
//
// Does nothing if the server is not running.
//
// Part of the servingPort interface.
func (p *grpcPort) shutdown(ctx context.Context) {
p.m.Lock()
server := p.server
if p.healthSrv != nil {
p.healthSrv.Shutdown() // announce we are going away
}
p.m.Unlock()
if server != nil {
server.GracefulStop()
}
}
////////////////////////////////////////////////////////////////////////////////
// contextInjector is an interceptor that replaces a context with the one that
// takes values from the request context **and** baseCtx(), but keeps
// cancellation of the request context.
func contextInjector(baseCtx func() context.Context) grpcutil.UnifiedServerInterceptor {
return func(ctx context.Context, fullMethod string, handler func(ctx context.Context) error) error {
return handler(&mergedCtx{ctx, baseCtx()})
}
}
type mergedCtx struct {
context.Context
values context.Context
}
func (m mergedCtx) Value(key any) any {
if v := m.Context.Value(key); v != nil {
return v
}
return m.values.Value(key)
}