blob: 86193a46d02bd8a1a60c1801d254eebf932362ca [file] [log] [blame]
// Copyright 2020 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package host
import (
"context"
"crypto/sha256"
"encoding/base64"
"fmt"
"os"
"sync"
"sync/atomic"
"time"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/emptypb"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/logging"
"go.chromium.org/luci/cipd/client/cipd/plugin"
"go.chromium.org/luci/cipd/client/cipd/plugin/plugins/admission"
"go.chromium.org/luci/cipd/client/cipd/plugin/protocol"
"go.chromium.org/luci/cipd/common"
)
var (
	// ErrAborted is the error reported by CheckAdmission promises when the
	// admission plugin terminates (or is closed) before the check completes.
	ErrAborted = errors.Reason("the admission plugin is terminating").Err()
)
// AdmissionPlugin launches and communicates with an admission plugin.
//
// It is instantiated by the CIPD client if it detects there's an admission
// plugin configured.
//
// Life cycle: the first CheckAdmission call starts launchPlugin in a background
// goroutine. Checks are queued in `pending` and handed to the plugin through its
// ListAdmissions RPC (see dequeue). The plugin reports verdicts through the
// ResolveAdmission RPC. Once `err` is set (by Close, a failed launch, a protocol
// version mismatch or a disconnect), all known and future checks are rejected
// with it.
//
// Implements plugin.AdmissionPlugin interface.
type AdmissionPlugin struct {
	// Configuration, set up by NewAdmissionPlugin.
	ctx             context.Context // for logging from the plugin; also the base context for launching/terminating it
	host            *Host           // the Host to run the plugin in
	args            []string        // plugin's command line
	salt            int             // randomization for generated admission IDs
	protocolVersion int32           // the expected protocol version
	timeout         time.Duration   // how long to wait for the ListAdmissions RPC

	// Synchronized on their own, not by m.
	connects  int32          // incremented (atomically) in ListAdmissions RPC
	connected chan struct{}  // closed in the first ListAdmissions RPC
	wg        sync.WaitGroup // waits for p.launchPlugin to finish

	m         sync.Mutex          // protects everything below
	err       error               // if non-nil, denies all CheckAdmission calls; set by rejectAllLocked
	launching bool                // true if we attempted to launch the plugin
	proc      *PluginProcess      // the running process, if started successfully
	closing   chan struct{}       // closed in Close; launchPlugin receives from it without holding m
	closed    bool                // true if Close was called
	checks    map[string]*Promise // pending and finished checks, keyed by admission ID
	pending   chan *Promise       // pending checks; closed and drained by rejectAllLocked, read by dequeue
}
// Promise is a pending or finished result of an admission check.
//
// A promise is resolved at most once (see resolve). Wait blocks until then.
//
// Implements plugin.Promise interface.
type Promise struct {
	// check is the admission request handed to the plugin. It is nil for
	// promises that are created already resolved (e.g. when the request can't
	// be built, or the plugin is already closed or broken).
	check *protocol.Admission

	resolves int32         // how many times resolve(...) was called; accessed atomically
	done     chan struct{} // closed in `resolve` by its first call
	err      error         // the result of the check (usually a gRPC status); written before done is closed
}
// newPromise returns an unresolved promise for the given admission request.
//
// The request may be nil for promises that are resolved right away.
func newPromise(check *protocol.Admission) *Promise {
	p := &Promise{done: make(chan struct{})}
	p.check = check
	return p
}
// Wait blocks until the promise is fulfilled or the context expires.
//
// Returns nil if the admission check passed and the check's error if it was
// denied or aborted. Returns ctx.Err() if the context expires first. If the
// promise is already resolved, its result is returned even if ctx has expired
// as well.
func (p *Promise) Wait(ctx context.Context) error {
	// Check for an existing result first. When both channels are ready, a
	// single `select` picks one at random, which would make waiting on an
	// already-resolved promise with an expired context nondeterministic.
	select {
	case <-p.done:
		return p.err
	default:
	}

	select {
	case <-p.done:
		return p.err
	case <-ctx.Done():
		return ctx.Err()
	}
}
// resolve records the check result and unblocks all Waits.
//
// Only the first call has any effect. Later calls (e.g. a rejection racing
// with a verdict from the plugin) are ignored. Returns p for chaining.
func (p *Promise) resolve(err error) *Promise {
	if atomic.AddInt32(&p.resolves, 1) != 1 {
		return p // already resolved by an earlier call
	}
	p.err = err
	close(p.done)
	return p
}
// resolved reports whether resolve has been called on p.
//
// NOTE: the counter is bumped before done is closed, so this may briefly
// report true while the first resolve call is still in progress.
func (p *Promise) resolved() bool {
	calls := atomic.LoadInt32(&p.resolves)
	return calls != 0
}
// NewAdmissionPlugin returns a host-side representation of an admission plugin.
//
// The returned *AdmissionPlugin can be used right away to enqueue admission
// checks. The plugin subprocess will lazily be started on the first
// CheckAdmission call. All enqueued checks will eventually be processed by
// the plugin or rejected if the plugin fails to start.
//
// The context is used for logging from the plugin and as the base context for
// launching and terminating the plugin process. `args` is copied.
func NewAdmissionPlugin(ctx context.Context, host *Host, args []string) *AdmissionPlugin {
	p := &AdmissionPlugin{
		ctx:             ctx,
		host:            host,
		args:            append([]string(nil), args...), // defensive copy
		salt:            os.Getpid(),                    // note: predictability is fine
		protocolVersion: admission.ProtocolVersion,
		timeout:         30 * time.Second, // see launchPlugin
		checks:          make(map[string]*Promise),
	}
	p.connected = make(chan struct{})
	p.closing = make(chan struct{})
	// Effectively unbounded: CheckAdmission sends into it while holding p.m, so
	// it must never fill up. NOTE(review): buffered channel storage is likely
	// allocated eagerly (~8 MB of pointers on 64-bit platforms). Consider a
	// growable queue if that matters.
	p.pending = make(chan *Promise, 1000000) // ~infinite
	return p
}
// Executable is a path to this plugin's executable, i.e. the first element of
// the plugin's command line.
//
// Returns an empty string (rather than panicking) if the command line is empty.
func (p *AdmissionPlugin) Executable() string {
	if len(p.args) == 0 {
		return ""
	}
	return p.args[0]
}
// Close terminates the plugin (if it was running) and aborts all pending
// checks.
//
// Tries to gracefully terminate the plugin, killing it with SIGKILL on the
// context timeout or after 5 sec.
//
// Calling Close more than once is safe. Only the first call rejects checks and
// signals launchPlugin; every call waits for launchPlugin to finish.
//
// Note that calling Close is not necessary if the plugin host itself
// terminates. The plugin subprocess will be terminated by the host in this
// case.
func (p *AdmissionPlugin) Close(ctx context.Context) {
	// Don't leak the launchPlugin goroutine past the plugin's lifetime. This runs
	// last, after the process (if any) has been terminated below.
	defer p.wg.Wait()

	var proc *PluginProcess

	p.m.Lock()
	if firstCall := !p.closed; firstCall {
		p.closed = true
		p.rejectAllLocked(ErrAborted) // sets p.err and aborts all pending checks
		close(p.closing)              // tells a still-running launchPlugin to give up
	}
	// Detach the running process (if any) from p while still holding the lock.
	proc, p.proc = p.proc, nil
	p.m.Unlock()

	if proc == nil {
		return
	}
	proc.Terminate(ctx)
}
// CheckAdmission enqueues an admission check to be performed by the plugin.
//
// The plugin will be asked if it's OK to deploy a package with the given pin
// hosted on the CIPD service used by the running CIPD client.
//
// Returns a promise which is resolved when the result is available. If such
// check is already pending (or has been done before), returns an existing
// (perhaps already resolved) promise. If the plugin is already closed or
// broken, the returned promise is already rejected with that error.
//
// The first call lazily launches the plugin process in the background.
func (p *AdmissionPlugin) CheckAdmission(pin common.Pin) plugin.Promise {
	admission, err := p.makeAdmission(pin)
	if err != nil {
		return newPromise(nil).resolve(err)
	}

	p.m.Lock()
	defer p.m.Unlock()

	// Reuse an existing promise (either pending or finished) if available.
	if existing := p.checks[admission.AdmissionId]; existing != nil {
		return existing
	}

	// If already closed or broken, fail the check right away.
	if p.err != nil {
		return newPromise(nil).resolve(p.err)
	}

	// If we haven't tried to launch the plugin process yet, do it now. The
	// wg.Add happens under p.m while p.err is nil, i.e. before Close (which
	// sets p.err under p.m) gets to its wg.Wait.
	if !p.launching {
		p.launching = true
		p.wg.Add(1)
		go p.launchPlugin()
	}

	// Enqueue this request for processing when the plugin process is up.
	//
	// p.pending is closed only by rejectAllLocked, which runs under p.m and
	// records p.err at the same time. The p.err check above therefore keeps
	// this send away from a closed channel. NOTE: the send happens with p.m
	// held, so it would block everyone if the channel's buffer were ever full.
	promise := newPromise(admission)
	p.checks[admission.AdmissionId] = promise
	p.pending <- promise
	return promise
}
// ClearCache drops all resolved promises to free up some memory.
//
// Promises that are still pending are kept, so their checks are still
// deduplicated and can still be resolved by the plugin.
func (p *AdmissionPlugin) ClearCache() {
	p.m.Lock()
	defer p.m.Unlock()
	for id, promise := range p.checks {
		if !promise.resolved() {
			continue // still waiting for a verdict: keep it
		}
		delete(p.checks, id) // deleting during range is allowed in Go
	}
}
// makeAdmission prepares *protocol.Admission, generating its ID.
//
// It hashes the request to make sure plugins do not rely on a particular format
// of the ID. It also randomizes it with some salt, to make sure plugins do not
// try to use it as a key in some persistent cache. Admission IDs are ephemeral.
//
// The salt is fixed per AdmissionPlugin, so identical pins map to identical IDs
// within one process. CheckAdmission relies on this to deduplicate checks.
func (p *AdmissionPlugin) makeAdmission(pin common.Pin) (*protocol.Admission, error) {
	admission := &protocol.Admission{
		ServiceUrl: p.host.Config().ServiceURL,
		Package:    pin.PackageName,
		Instance:   common.InstanceIDToObjectRef(pin.InstanceID),
	}

	// The ID must be a pure function of the request within this process, since
	// it is used as a dedup key. Request deterministic serialization explicitly
	// rather than relying on the default, which protobuf does not promise to be
	// stable (e.g. for map fields).
	blob, err := proto.MarshalOptions{Deterministic: true}.Marshal(admission)
	if err != nil {
		return nil, errors.Annotate(err, "failed to serialize Admission").Err()
	}

	// Mix in the salt to randomize admission IDs across CIPD client processes.
	h := sha256.New()
	fmt.Fprintf(h, "%d\n", p.salt)
	h.Write(blob) // hash.Hash.Write never returns an error
	admission.AdmissionId = base64.RawStdEncoding.EncodeToString(h.Sum(nil))
	return admission, nil
}
// rejectAllLocked rejects all pending and future admission checks.
//
// Only the first call has an effect. It records the error in p.err, resolves
// every known promise with it, and closes (and drains) p.pending, which makes
// dequeue report codes.Aborted. Later calls are no-ops.
//
// A nil err is replaced with ErrAborted. Rejecting with nil would resolve the
// checks as passed and leave p.err nil while p.pending is closed. A later
// CheckAdmission would then send on a closed channel, and a second call here
// would close p.pending twice. Both panic.
//
// Must be called with p.m locked.
func (p *AdmissionPlugin) rejectAllLocked(err error) {
	if p.err != nil {
		return
	}
	if err == nil {
		err = ErrAborted
	}
	p.err = err
	for _, promise := range p.checks {
		promise.resolve(err)
	}
	close(p.pending)
	for range p.pending {
		// Discard queued promises. Every queued promise is also in p.checks,
		// so it was resolved above.
	}
}
// launchPlugin launches the plugin subprocess and waits for it to connect.
//
// It is called from a background goroutine on a first CheckAdmission call and
// signals p.wg when done (Close waits on it).
//
// After launching, it waits for the first of:
//   - the plugin calling ListAdmissions (p.connected is closed);
//   - Close being called (p.closing is closed);
//   - the plugin process exiting;
//   - p.timeout elapsing.
//
// Then, under p.m, it either records the running process in p.proc or, if the
// plugin is not usable or no longer needed, terminates the process. If the
// launch itself failed, it also rejects all checks.
func (p *AdmissionPlugin) launchPlugin() {
	defer p.wg.Done()

	proc, err := p.host.LaunchPlugin(p.ctx, p.args, &Controller{
		Admissions: &admissionsServer{plugin: p},
	})
	if err == nil {
		ctx, cancel := context.WithTimeout(p.ctx, p.timeout)
		defer cancel()
		select {
		case <-p.connected:
			// The plugin called ListAdmissions and is listening for requests now or
			// we asked it to go away due to incompatible protocol version (in which
			// case p.err is already set).
		case <-p.closing:
			// Already closing, p.err is not nil and will be handled below.
		case <-proc.Done():
			const msg = "the admission plugin terminated before making ListAdmissions RPC"
			err = errors.Annotate(proc.Err(), msg).Err()
			if err == nil {
				// luci's errors.Annotate(nil, ...).Err() is nil. A process that exited
				// without an error must still count as a failed launch. Otherwise the
				// dead process would be stored in p.proc below and nothing would ever
				// resolve the pending checks.
				err = errors.Reason(msg).Err()
			}
		case <-ctx.Done():
			err = errors.Annotate(ctx.Err(), "while waiting for ListAdmissions RPC").Err()
		}
	}

	p.m.Lock()
	switch {
	case p.err != nil:
		// We are closing or broken. The plugin process is no longer needed.
		err = p.err
	case err != nil:
		// The plugin failed to start, move us into the "broken" state.
		logging.Warningf(p.ctx, "The admission plugin failed to start: %s", err)
		p.rejectAllLocked(err)
	default:
		// The plugin has started successfully and some processing has begun!
		p.proc = proc
	}
	p.m.Unlock()

	// Kill the plugin if it is no longer needed. NOTE(review): this uses p.ctx,
	// not the context passed to Close, so Close's deadline does not bound this
	// termination (Close still waits for it via p.wg).
	if err != nil && proc != nil {
		proc.Terminate(p.ctx)
	}
}
// onConnected is called when the plugin makes ListAdmissions RPC.
//
// Only the first call is accepted. It verifies the plugin's protocol version;
// on mismatch all checks are rejected and a FailedPrecondition error is
// returned. Either way p.connected is closed (after any rejection), which
// unblocks launchPlugin.
func (p *AdmissionPlugin) onConnected(req *protocol.ListAdmissionsRequest) error {
	// At most one ListAdmissions call per plugin's life cycle is allowed, since
	// we use its completion as a signal that the plugin has disconnected (e.g.
	// unexpectedly died). There's no sudden unexpected disconnects on localhost.
	if calls := atomic.AddInt32(&p.connects, 1); calls != 1 {
		return status.Errorf(codes.FailedPrecondition, "already called ListAdmissions")
	}
	logging.Debugf(p.ctx, "Using deployment admission plugin %q", req.PluginVersion)

	if req.ProtocolVersion == p.protocolVersion {
		close(p.connected)
		return nil
	}

	logging.Errorf(p.ctx, "Unknown admission plugin protocol %d: expecting %d", req.ProtocolVersion, p.protocolVersion)
	mismatch := status.Errorf(codes.FailedPrecondition, "unknown protocol version %d: expecting %d", req.ProtocolVersion, p.protocolVersion)

	// Reject before closing p.connected, so launchPlugin observes p.err once it
	// wakes up.
	p.m.Lock()
	p.rejectAllLocked(mismatch)
	p.m.Unlock()
	close(p.connected)
	return mismatch
}
// onDisconnected is called when the plugin aborts ListAdmissions RPC.
//
// It rejects all pending and future checks. If the plugin process is known and
// exits with an error within a short grace period, that error is used as the
// rejection reason. Otherwise the generic ErrAborted is used.
//
// Note that it can potentially happen even before launchPlugin completes, if
// the plugin crashed particularly fast (p.proc is still nil then).
func (p *AdmissionPlugin) onDisconnected() {
	p.m.Lock()
	defer p.m.Unlock()

	err := ErrAborted

	// Quickly poll for the process status: if it crashed hard, we'd like to know.
	// If 50 ms is not enough for it to terminate after the disconnect, no big
	// deal, a generic error message in ErrAborted will suffice too.
	if p.proc != nil {
		timer := time.NewTimer(50 * time.Millisecond)
		defer timer.Stop()
		select {
		case <-timer.C:
		case <-p.proc.Done():
			// luci's errors.Annotate(nil, ...).Err() is nil, and rejecting with a
			// nil error would resolve every pending check as passed. Only use the
			// exit error when there is one; keep ErrAborted otherwise.
			if procErr := p.proc.Err(); procErr != nil {
				if procErr != ErrTerminated {
					logging.Warningf(p.ctx, "The admission plugin has crashed: %s", procErr)
				}
				err = errors.Annotate(procErr, "the admission plugin terminated").Err()
			}
		}
	}

	p.rejectAllLocked(err)
}
// dequeue blocks until there's an unprocessed admission request available.
//
// Respects context's expiration, returning the context error as a gRPC status.
// Promises that were resolved while still queued are skipped. Once p.pending
// is closed and drained, returns a codes.Aborted error.
func (p *AdmissionPlugin) dequeue(ctx context.Context) (*protocol.Admission, error) {
	for {
		var next *Promise
		select {
		case next = <-p.pending:
		case <-ctx.Done():
			return nil, status.FromContextError(ctx.Err()).Err()
		}

		switch {
		case next == nil:
			// p.pending is closed. Once the host decides to stop the plugin there
			// are two concurrent termination paths: (1) replying with
			// codes.Aborted here, and (2) closing the plugin's stdin.
			//
			// (2) can win the race. The plugin then cancels ListAdmissions on its
			// own before receiving codes.Aborted, which shows up as 'ctx' being
			// canceled above.
			//
			// The stdin path is more general and works for any kind of plugin.
			// Path (1) exists because we need to react to p.pending closing
			// somehow. There's probably a way to get rid of it, but it'd make the
			// code more complicated.
			return nil, status.Errorf(codes.Aborted, "terminating")
		case next.resolved():
			// Likely we are already closing and the promise was resolved in
			// rejectAllLocked. Keep draining the channel until it yields nil.
			continue
		default:
			return next.check, nil
		}
	}
}
// resolve is called when an admission request is resolved by the plugin.
//
// Unknown IDs are ignored. Resolving an already-resolved promise is a no-op.
// The promise is resolved outside of p.m, since resolving it doesn't touch any
// state protected by p.m.
func (p *AdmissionPlugin) resolve(id string, err error) {
	p.m.Lock()
	promise := p.checks[id]
	p.m.Unlock()
	if promise != nil {
		promise.resolve(err)
	}
}
////////////////////////////////////////////////////////////////////////////////
// admissionsServer receives RPCs from some single admission plugin.
//
// It is installed as the Admissions server of the plugin's Controller (see
// launchPlugin) and forwards everything to the owning AdmissionPlugin.
type admissionsServer struct {
	// Presumably the generated gRPC stub that answers RPCs not implemented here
	// (assumed codes.Unimplemented; confirm against the protocol package).
	protocol.UnimplementedAdmissionsServer
	plugin *AdmissionPlugin // the plugin these RPCs are forwarded to
}
// ListAdmissions streams queued admission requests to the plugin.
//
// The plugin may call it only once (see onConnected). After a successful
// onConnected, returning from here (for any reason) triggers onDisconnected,
// which rejects all pending and future checks.
func (s *admissionsServer) ListAdmissions(req *protocol.ListAdmissionsRequest, stream protocol.Admissions_ListAdmissionsServer) error {
	ap := s.plugin
	if err := ap.onConnected(req); err != nil {
		return err
	}
	defer ap.onDisconnected()

	// Keep handing requests to the plugin until dequeue or Send fails.
	for {
		next, err := ap.dequeue(stream.Context())
		if err == nil {
			err = stream.Send(next)
		}
		if err != nil {
			return err
		}
	}
}
// ResolveAdmission records the plugin's verdict for the admission check with
// the given ID.
//
// The status is converted with status.ErrorProto. Per the gRPC status docs, an
// OK status becomes a nil error, i.e. the check passed.
func (s *admissionsServer) ResolveAdmission(ctx context.Context, req *protocol.ResolveAdmissionRequest) (*emptypb.Empty, error) {
	verdict := status.ErrorProto(req.Status)
	s.plugin.resolve(req.AdmissionId, verdict)
	return new(emptypb.Empty), nil
}