// Copyright 2020 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package host

import (
	"context"
	"crypto/sha256"
	"encoding/base64"
	"fmt"
	"os"
	"sync"
	"sync/atomic"
	"time"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/types/known/emptypb"

	"go.chromium.org/luci/common/errors"
	"go.chromium.org/luci/common/logging"

	"go.chromium.org/luci/cipd/client/cipd/plugin"
	"go.chromium.org/luci/cipd/client/cipd/plugin/plugins/admission"
	"go.chromium.org/luci/cipd/client/cipd/plugin/protocol"
	"go.chromium.org/luci/cipd/common"
)

// ErrAborted is the error admission checks are resolved with when the plugin
// is terminating.
//
// Close rejects all pending checks with it. onDisconnected uses it as well when
// the plugin drops the ListAdmissions RPC and there is no more specific process
// error to report.
var ErrAborted = errors.Reason("the admission plugin is terminating").Err()

// AdmissionPlugin launches and communicates with an admission plugin.
//
// It is instantiated by the CIPD client if it detects there's an admission
// plugin configured.
//
// The plugin subprocess is launched in a background goroutine by the first
// CheckAdmission call. Checks are queued in `pending`, handed to the plugin
// through its ListAdmissions RPC and resolved when the plugin calls
// ResolveAdmission. Once `err` is set (by Close, a failed launch, a protocol
// version mismatch or a disconnect) all pending checks are rejected with it and
// new checks fail right away.
//
// Implements plugin.AdmissionPlugin interface.
type AdmissionPlugin struct {
	ctx             context.Context // for launching the plugin and logging from it
	host            *Host           // the Host to run the plugin in
	args            []string        // plugin's command line (a private copy)
	salt            int             // mixed into generated admission IDs
	protocolVersion int32           // the protocol version the plugin must report

	timeout   time.Duration // how long launchPlugin waits for the ListAdmissions RPC
	connects  int32         // number of ListAdmissions calls, updated atomically
	connected chan struct{} // closed in the first ListAdmissions RPC

	wg        sync.WaitGroup      // waits for p.launchPlugin to finish
	m         sync.Mutex          // protects everything below
	err       error               // if non-nil, new checks are rejected with it
	launching bool                // true if we attempted to launch the plugin
	proc      *PluginProcess      // the running process, if started successfully
	closing   chan struct{}       // closed in Close
	closed    bool                // true if Close was called
	checks    map[string]*Promise // pending and finished checks, by admission ID
	pending   chan *Promise       // checks to send to the plugin, closed on rejection
}

// Promise is a pending or finished result of an admission check.
//
// It is resolved at most once: the first resolve call records the result and
// closes `done`, all later calls are ignored.
//
// Implements plugin.Promise interface.
type Promise struct {
	// check is the request to hand to the plugin. It is nil for promises that
	// CheckAdmission returns already resolved with an error.
	check    *protocol.Admission
	resolves int32         // how many times resolve(...) was called
	done     chan struct{} // closed in `resolve`
	err      error         // the result of the check (usually a gRPC status)
}

// newPromise returns a promise for the given check that is not resolved yet.
//
// Its `done` channel stays open until resolve is called.
func newPromise(check *protocol.Admission) *Promise {
	promise := new(Promise)
	promise.check = check
	promise.done = make(chan struct{})
	return promise
}

// Wait blocks until either the promise is resolved or the context is done,
// whichever happens first.
//
// Returns the result of the admission check (nil if it passed) or the context
// error if the context expired before the result became available.
func (p *Promise) Wait(ctx context.Context) error {
	expired := ctx.Done()
	select {
	case <-expired:
		return ctx.Err()
	case <-p.done:
		return p.err
	}
}

// resolve stores the outcome of the check and wakes up everyone blocked in
// Wait.
//
// Only the very first call has an effect, all subsequent calls are ignored.
// Returns the promise itself to allow chaining.
func (p *Promise) resolve(err error) *Promise {
	if atomic.AddInt32(&p.resolves, 1) != 1 {
		return p // already resolved by an earlier call
	}
	p.err = err
	close(p.done)
	return p
}

// resolved reports whether resolve has been called at least once.
func (p *Promise) resolved() bool {
	calls := atomic.LoadInt32(&p.resolves)
	return calls != 0
}

// NewAdmissionPlugin returns a host-side representation of an admission plugin.
//
// No subprocess is started here. The returned *AdmissionPlugin accepts
// admission checks right away and launches the plugin lazily on the first
// CheckAdmission call. All enqueued checks will eventually be processed by the
// plugin or rejected if the plugin fails to start.
//
// `args` is the plugin's command line, it is copied. The context is used for
// logging from the plugin.
func NewAdmissionPlugin(ctx context.Context, host *Host, args []string) *AdmissionPlugin {
	p := &AdmissionPlugin{
		ctx:             ctx,
		host:            host,
		protocolVersion: admission.ProtocolVersion,
	}

	p.args = append([]string(nil), args...)
	p.salt = os.Getpid()         // note: predictability is fine
	p.timeout = 30 * time.Second // see launchPlugin

	p.connected = make(chan struct{})
	p.closing = make(chan struct{})
	p.checks = make(map[string]*Promise)
	p.pending = make(chan *Promise, 1000000) // ~infinite

	return p
}

// Executable is a path to this plugin's executable.
//
// It is the first element of the plugin's command line. Returns an empty string
// if the plugin was constructed with an empty command line, instead of
// panicking on an out-of-range index.
func (p *AdmissionPlugin) Executable() string {
	if len(p.args) == 0 {
		return ""
	}
	return p.args[0]
}

// Close shuts the plugin down (if it was running) and aborts all pending
// checks with ErrAborted.
//
// The first call marks the plugin as closed and tells launchPlugin (if it is
// still in progress) to give up. Before returning, Close waits for launchPlugin
// goroutine to exit.
//
// Tries to gracefully terminate the plugin, killing it with SIGKILL on the
// context timeout or after 5 sec.
//
// Note that calling Close is not necessary if the plugin host itself
// terminates. The plugin subprocess will be terminated by the host in this
// case.
func (p *AdmissionPlugin) Close(ctx context.Context) {
	// Runs last, on any return path: launchPlugin goroutine must not outlive
	// the plugin.
	defer p.wg.Wait()

	p.m.Lock()
	if !p.closed {
		p.closed = true
		// Sets p.err and aborts all pending checks.
		p.rejectAllLocked(ErrAborted)
		// Wakes up launchPlugin if it is still waiting for the plugin to connect.
		close(p.closing)
	}
	running := p.proc
	p.proc = nil
	p.m.Unlock()

	if running == nil {
		return
	}
	running.Terminate(ctx)
}

// CheckAdmission enqueues an admission check to be performed by the plugin.
//
// The plugin will be asked if it's OK to deploy a package with the given pin
// hosted on the CIPD service used by the running CIPD client.
//
// Returns a promise which is resolved when the result is available. Checks are
// deduplicated by their admission ID: if such check is already pending (or has
// been done before and is still cached), the existing (perhaps already
// resolved) promise is returned. If the plugin is closed or broken, or the
// request can't be prepared, returns a promise already resolved with the error.
//
// The very first enqueued check starts the plugin subprocess in a background
// goroutine.
func (p *AdmissionPlugin) CheckAdmission(pin common.Pin) plugin.Promise {
	req, err := p.makeAdmission(pin)
	if err != nil {
		return newPromise(nil).resolve(err)
	}
	id := req.AdmissionId

	p.m.Lock()
	defer p.m.Unlock()

	known := p.checks[id]
	switch {
	case known != nil:
		// Either pending or finished, reuse it as is.
		return known
	case p.err != nil:
		// Already closed or broken, fail the check right away.
		return newPromise(nil).resolve(p.err)
	}

	// This is the first real check: kick off the plugin process.
	if !p.launching {
		p.launching = true
		p.wg.Add(1)
		go p.launchPlugin()
	}

	// The check will be picked up once the plugin process is up.
	queued := newPromise(req)
	p.checks[id] = queued
	p.pending <- queued
	return queued
}

// ClearCache forgets all resolved promises to free up some memory.
//
// Promises that are still pending are kept.
func (p *AdmissionPlugin) ClearCache() {
	p.m.Lock()
	defer p.m.Unlock()
	for id, promise := range p.checks {
		if !promise.resolved() {
			continue
		}
		delete(p.checks, id)
	}
}

// makeAdmission prepares *protocol.Admission for the given pin, generating its
// ID.
//
// The ID is an unpadded base64 encoding of a SHA256 digest of the salt and the
// serialized request. It hashes the request to make sure plugins do not rely on
// a particular format of the ID. It also randomizes it with some salt, to make
// sure plugins do not try to use it as a key in some persistent cache.
// Admission IDs are ephemeral.
//
// CheckAdmission uses the ID to deduplicate checks, so equal requests must
// always produce equal IDs within the process.
//
// Returns an error if the request can't be serialized.
func (p *AdmissionPlugin) makeAdmission(pin common.Pin) (*protocol.Admission, error) {
	admission := &protocol.Admission{
		ServiceUrl: p.host.Config().ServiceURL,
		Package:    pin.PackageName,
		Instance:   common.InstanceIDToObjectRef(pin.InstanceID),
	}

	// The ID is derived from the binary serialization, so it must be stable
	// within the process. Request the deterministic mode explicitly: the default
	// mode makes no such promise (e.g. for map fields).
	blob, err := proto.MarshalOptions{Deterministic: true}.Marshal(admission)
	if err != nil {
		return nil, errors.Annotate(err, "failed to serialize Admission").Err()
	}

	// Mix in the salt to randomize admission IDs across CIPD client processes.
	// Writes into a hash.Hash never fail, so their results are not checked.
	h := sha256.New()
	fmt.Fprintf(h, "%d\n", p.salt)
	h.Write(blob)

	admission.AdmissionId = base64.RawStdEncoding.EncodeToString(h.Sum(nil))
	return admission, nil
}

// rejectAllLocked moves the plugin into the terminal "rejecting" state.
//
// It records the error in p.err (so future checks fail right away), resolves
// all known promises with it, then closes and empties the queue of pending
// checks. Only the first call has an effect.
//
// Must be called with p.m locked.
func (p *AdmissionPlugin) rejectAllLocked(err error) {
	if p.err != nil {
		return // already rejecting
	}
	p.err = err

	// Already resolved promises are not affected by this.
	for _, promise := range p.checks {
		promise.resolve(err)
	}

	// Closing the queue is what dequeue reacts to. Drop whatever is still
	// buffered there.
	close(p.pending)
	for range p.pending {
	}
}

// launchPlugin launches the plugin subprocess and waits for it to connect.
//
// It is called from a background goroutine on a first CheckAdmission call.
// Close waits for it to finish through p.wg.
//
// The wait ends when the plugin makes ListAdmissions RPC, when Close is called,
// when the process exits or when p.timeout expires, whichever happens first.
// Depending on the outcome it either stores the running process in p.proc,
// or rejects all checks with the launch error and terminates the process.
func (p *AdmissionPlugin) launchPlugin() {
	defer p.wg.Done()

	// The plugin talks back to us through admissionsServer.
	proc, err := p.host.LaunchPlugin(p.ctx, p.args, &Controller{
		Admissions: &admissionsServer{plugin: p},
	})

	if err == nil {
		// Bounds only the wait below. Note that `cancel` is deferred till the end
		// of the function.
		ctx, cancel := context.WithTimeout(p.ctx, p.timeout)
		defer cancel()
		select {
		case <-p.connected:
			// The plugin called ListAdmissions and is listening for requests now or
			// we asked it to go away due to incompatible protocol version (in which
			// case p.err is already set).
		case <-p.closing:
			// Already closing, p.err is not nil and will be handled below.
		case <-proc.Done():
			err = errors.Annotate(proc.Err(), "the admission plugin terminated before making ListAdmissions RPC").Err()
		case <-ctx.Done():
			err = errors.Annotate(ctx.Err(), "while waiting for ListAdmissions RPC").Err()
		}
	}

	// Decide what to do with the process under the lock, since p.err can be set
	// concurrently by Close, onConnected and onDisconnected.
	p.m.Lock()
	switch {
	case p.err != nil:
		// We are closing or broken. The plugin process is no longer needed.
		err = p.err
	case err != nil:
		// The plugin failed to start, move us into the "broken" state.
		logging.Warningf(p.ctx, "The admission plugin failed to start: %s", err)
		p.rejectAllLocked(err)
	default:
		// The plugin has started successfully and some processing has begun!
		// From now on Close is responsible for terminating it.
		p.proc = proc
	}
	p.m.Unlock()

	// Kill the plugin if it is no longer needed. `proc` is checked separately
	// since there may be no process at all if LaunchPlugin itself failed.
	if err != nil && proc != nil {
		proc.Terminate(p.ctx)
	}
}

// onConnected is called when the plugin makes ListAdmissions RPC.
//
// It accepts only the first such call and verifies the plugin speaks the
// expected protocol version. On a version mismatch all checks are rejected.
// In either case p.connected is closed to unblock launchPlugin.
//
// Returns a FailedPrecondition gRPC error if the call should be refused.
func (p *AdmissionPlugin) onConnected(req *protocol.ListAdmissionsRequest) error {
	// At most one ListAdmissions call per plugin's life cycle is allowed, since
	// we use its completion as a signal that the plugin has disconnected (e.g.
	// unexpectedly died). There's no sudden unexpected disconnects on localhost.
	if atomic.AddInt32(&p.connects, 1) != 1 {
		return status.Errorf(codes.FailedPrecondition, "already called ListAdmissions")
	}
	logging.Debugf(p.ctx, "Using deployment admission plugin %q", req.PluginVersion)

	var mismatch error
	if got, want := req.ProtocolVersion, p.protocolVersion; got != want {
		logging.Errorf(p.ctx, "Unknown admission plugin protocol %d: expecting %d", got, want)
		mismatch = status.Errorf(codes.FailedPrecondition, "unknown protocol version %d: expecting %d", got, want)
		p.m.Lock()
		p.rejectAllLocked(mismatch)
		p.m.Unlock()
	}

	// Signal launchPlugin only after p.err is (perhaps) set above.
	close(p.connected)
	return mismatch
}

// onDisconnected is called when ListAdmissions RPC handler exits after the
// plugin has connected.
//
// It rejects all pending and future checks, either with the process
// termination error (if the process is known and exits fast enough) or with
// ErrAborted.
//
// Note that it can potentially happen even before launchPlugin completes, if
// the plugin crashed particularly fast. p.proc is not set yet in that case.
func (p *AdmissionPlugin) onDisconnected() {
	p.m.Lock()
	defer p.m.Unlock()

	reason := ErrAborted

	// Give the process a brief moment to exit: if it crashed hard, we'd like to
	// report that. If 50 ms is not enough for it to terminate after the
	// disconnect, no big deal, a generic error message in ErrAborted will
	// suffice too.
	if proc := p.proc; proc != nil {
		grace := time.After(50 * time.Millisecond)
		select {
		case <-proc.Done():
			if proc.Err() != ErrTerminated {
				logging.Warningf(p.ctx, "The admission plugin has crashed: %s", proc.Err())
			}
			reason = errors.Annotate(proc.Err(), "the admission plugin terminated").Err()
		case <-grace:
		}
	}

	p.rejectAllLocked(reason)
}

// dequeue blocks until there's an unprocessed admission request available.
//
// Promises that were resolved while sitting in the queue are skipped.
//
// Returns an Aborted gRPC error once the queue is closed, or a gRPC error
// derived from the context error if the context is done first.
func (p *AdmissionPlugin) dequeue(ctx context.Context) (*protocol.Admission, error) {
	for {
		select {
		case <-ctx.Done():
			return nil, status.FromContextError(ctx.Err()).Err()

		case next := <-p.pending:
			// When the host decides to stop the plugin, two termination paths run
			// concurrently: (1) the Aborted reply below, and (2) the host closing
			// the plugin's stdin.
			//
			// If (2) wins the race, the plugin cancels ListAdmissions on its own
			// before it sees the Aborted reply, which shows up here as `ctx` being
			// canceled.
			//
			// The stdin path is the more general one, it works for any kind of a
			// plugin. Path (1) exists because we need to react to p.pending closing
			// somehow. There's probably a way to get rid of it, but it'll make the
			// code more complicated.
			switch {
			case next == nil:
				// The queue is closed.
				return nil, status.Errorf(codes.Aborted, "terminating")
			case !next.resolved():
				return next.check, nil
			}
			// The promise is already resolved: likely we are closing and it was
			// resolved in rejectAllLocked. Keep reading the channel until it
			// returns nil.
		}
	}
}

// resolve is called when an admission request is resolved by the plugin.
//
// It looks up the promise by the admission ID and resolves it with the given
// result. Unknown IDs are silently ignored.
func (p *AdmissionPlugin) resolve(id string, err error) {
	p.m.Lock()
	promise := p.checks[id]
	p.m.Unlock()

	if promise == nil {
		return
	}
	// Done outside of the lock.
	promise.resolve(err)
}

////////////////////////////////////////////////////////////////////////////////

// admissionsServer receives RPCs from some single admission plugin.
//
// It is a thin adapter that forwards ListAdmissions and ResolveAdmission calls
// to the AdmissionPlugin it is attached to. Other methods of the service fall
// back to the embedded UnimplementedAdmissionsServer.
type admissionsServer struct {
	protocol.UnimplementedAdmissionsServer
	plugin *AdmissionPlugin // the plugin this server receives RPCs for
}

// ListAdmissions streams pending admission requests to the plugin.
//
// The call is refused if the plugin has already called it before or if it
// speaks an unexpected protocol version (see onConnected). Otherwise it keeps
// sending requests as they become available until either dequeue or the stream
// fails, and notifies the plugin host about the disconnect on exit.
func (s *admissionsServer) ListAdmissions(req *protocol.ListAdmissionsRequest, stream protocol.Admissions_ListAdmissionsServer) error {
	if err := s.plugin.onConnected(req); err != nil {
		return err
	}
	defer s.plugin.onDisconnected()

	for {
		next, err := s.plugin.dequeue(stream.Context())
		if err == nil {
			err = stream.Send(next)
		}
		if err != nil {
			return err
		}
	}
}

// ResolveAdmission records the plugin's verdict for an admission request.
//
// The status from the request is converted into an error via status.ErrorProto
// and used to resolve the promise with the matching admission ID. The RPC
// itself always succeeds, even if there's no such promise.
func (s *admissionsServer) ResolveAdmission(ctx context.Context, req *protocol.ResolveAdmissionRequest) (*emptypb.Empty, error) {
	verdict := status.ErrorProto(req.Status)
	s.plugin.resolve(req.AdmissionId, verdict)
	return new(emptypb.Empty), nil
}
