blob: ffdeb4edcf3163bb257299c77de5833ee2817606 [file] [log] [blame]
// Copyright 2020 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package host
import (
"context"
"fmt"
"os"
"os/exec"
"strings"
"sync"
"testing"
"time"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/emptypb"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/logging"
"go.chromium.org/luci/common/logging/gologger"
"go.chromium.org/luci/common/logging/memlogger"
"go.chromium.org/luci/cipd/client/cipd/plugin"
"go.chromium.org/luci/cipd/client/cipd/plugin/plugins"
"go.chromium.org/luci/cipd/client/cipd/plugin/protocol"
. "github.com/smartystreets/goconvey/convey"
)
// init registers the "PLUGIN_BASIC" plugin main used by the tests in this
// file. The mode string selects which scenario the plugin process plays out.
func init() {
	registerPluginMain("PLUGIN_BASIC", func(ctx context.Context, mode string) error {
		// This scenario dies before the plugin even tries to talk to the host.
		if mode == "CRASHING_ON_START" {
			os.Exit(2)
		}

		scenario := func(ctx context.Context, conn *grpc.ClientConn) error {
			// "ResolveAdmission" doubles as a "the plugin has started" signal.
			// Its result is intentionally not inspected.
			started := &protocol.ResolveAdmissionRequest{AdmissionId: mode}
			protocol.NewAdmissionsClient(conn).ResolveAdmission(ctx, started)

			switch mode {
			case "WAIT":
				// "Run" will wait until the context is canceled by default.
			case "STUCK_IN_TERMINATION":
				// Keep sleeping long after the context has been canceled.
				<-ctx.Done()
				time.Sleep(30 * time.Second)
			case "CRASHING_AFTER_RPC":
				time.Sleep(100 * time.Millisecond)
				os.Exit(2)
			case "LOG_STUFF_AND_EXIT":
				logging.Infof(ctx, "Info")
				logging.Warningf(ctx, "Warning")
				logging.Errorf(ctx, "Error")
				os.Exit(0)
			default:
				// "NUM_<something>" modes just wait; anything else is a typo in
				// the test.
				if !strings.HasPrefix(mode, "NUM_") {
					return errors.Reason("unknown test mode %q", mode).Err()
				}
			}
			return nil
		}

		return plugins.Run(ctx, os.Stdin, scenario)
	})
}
// launchPlugin re-executes the current binary (os.Args[0]) as a "PLUGIN_BASIC"
// plugin in the given mode and waits for its first ResolveAdmission RPC.
//
// Returns the plugin process once the RPC carrying `mode` has been observed.
// Returns an error if LaunchPlugin fails, if the process finishes before
// making the RPC, or if the RPC carries an unexpected admission ID. Panics if
// nothing happens within 30 seconds.
func launchPlugin(ctx context.Context, host *Host, mode string) (*PluginProcess, error) {
	admissions := &fakeAdmissionsServer{calls: make(chan string, 1)}

	cmdLine := []string{os.Args[0], "PLUGIN_BASIC", mode}
	plug, err := host.LaunchPlugin(ctx, cmdLine, &Controller{Admissions: admissions})
	if err != nil {
		return nil, err
	}

	// The plugin is considered connected once it has called an RPC.
	select {
	case got := <-admissions.calls:
		if got == mode {
			return plug, nil
		}
		return nil, errors.Reason("unexpected RPC call %q != %q", got, mode).Err()
	case <-plug.Done():
		return nil, plug.Err()
	case <-time.After(30 * time.Second):
		panic("the test is stuck")
	}
}
// TestPlugins exercises Host against plugin subprocesses started either
// through the launchPlugin helper or directly through host.LaunchPlugin.
//
// All scenarios share a context with a 30 second timeout and run against a
// Host that is created at the top of the enclosing Convey block.
func TestPlugins(t *testing.T) {
	t.Parallel()

	ctx := gologger.StdConfig.Use(context.Background())
	ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()

	Convey("With a host", t, func() {
		host := &Host{}
		host.Initialize(plugin.Config{ServiceURL: "https://example.com"})
		defer host.Close(ctx)

		Convey("Plugin wrong command line", func() {
			_, err := host.LaunchPlugin(ctx, []string{"doesnt_exist"}, &Controller{})
			So(err, ShouldNotBeNil)
		})

		Convey("Plugin exits when host closes", func() {
			proc, err := launchPlugin(ctx, host, "WAIT")
			So(err, ShouldBeNil)
			So(proc.Err(), ShouldBeNil)

			host.Close(ctx)
			So(proc.Err(), ShouldEqual, ErrTerminated)
		})

		Convey("Terminate works", func() {
			proc, err := launchPlugin(ctx, host, "WAIT")
			So(err, ShouldBeNil)
			So(proc.Err(), ShouldBeNil)

			So(proc.Terminate(ctx), ShouldEqual, ErrTerminated)
			So(proc.Err(), ShouldEqual, ErrTerminated)
		})

		Convey("Respects Terminate timeout", func() {
			// In this mode the plugin keeps sleeping for 30s after its context
			// is canceled, far longer than the 50ms given to Terminate below.
			proc, err := launchPlugin(ctx, host, "STUCK_IN_TERMINATION")
			So(err, ShouldBeNil)

			ctx, cancel := context.WithTimeout(ctx, 50*time.Millisecond)
			defer cancel()
			So(proc.Terminate(ctx), ShouldHaveSameTypeAs, &exec.ExitError{})
		})

		Convey("Plugin crashing on start", func() {
			_, err := launchPlugin(ctx, host, "CRASHING_ON_START")
			// This is either an ExitError or a pipe error writing to stdin, depending
			// on how far LaunchPlugin progressed before the plugin process crashed.
			So(err, ShouldNotBeNil)
		})

		Convey("Plugin crashing after RPC", func() {
			proc, err := launchPlugin(ctx, host, "CRASHING_AFTER_RPC")
			So(err, ShouldBeNil)

			select {
			case <-time.After(30 * time.Second):
				panic("the test is stuck")
			case <-proc.Done():
				So(proc.Err(), ShouldHaveSameTypeAs, &exec.ExitError{})
			}
		})

		Convey("Logging from plugin works", func() {
			// Capture log messages in memory so they can be inspected below.
			ctx := memlogger.Use(ctx)

			proc, err := launchPlugin(ctx, host, "LOG_STUFF_AND_EXIT")
			So(err, ShouldBeNil)

			select {
			case <-time.After(30 * time.Second):
				panic("the test is stuck")
			case <-proc.Done():
				So(proc.Err(), ShouldEqual, ErrTerminated)
			}

			msg := logging.Get(ctx).(*memlogger.MemLogger).Messages()
			So(msg, ShouldHaveLength, 3)
			So(msg[0].Level, ShouldEqual, logging.Info)
			So(msg[0].Msg, ShouldEndWith, "Info")
			So(msg[1].Level, ShouldEqual, logging.Warning)
			So(msg[1].Msg, ShouldEndWith, "Warning")
			So(msg[2].Level, ShouldEqual, logging.Error)
			So(msg[2].Msg, ShouldEndWith, "Error")
		})

		Convey("Multiple plugins", func() {
			// Launch all plugins concurrently. Each goroutine writes only to its
			// own slot; assertions happen on the test goroutine after Wait, so a
			// failed launch is reported with its actual error rather than just
			// as a nil process.
			proc := make([]*PluginProcess, 5)
			errs := make([]error, len(proc))
			wg := sync.WaitGroup{}
			for i := range proc {
				wg.Add(1)
				go func(i int) {
					defer wg.Done()
					proc[i], errs[i] = launchPlugin(ctx, host, fmt.Sprintf("NUM_%d", i))
				}(i)
			}
			wg.Wait()

			// All are running.
			for i, p := range proc {
				So(errs[i], ShouldBeNil)
				So(p, ShouldNotBeNil)
				So(p.Err(), ShouldBeNil)
			}

			host.Close(ctx)

			// All are stopped.
			for _, p := range proc {
				So(p.Err(), ShouldEqual, ErrTerminated)
			}
		})

		Convey("Serving error", func() {
			host.testServeErr = fmt.Errorf("simulated grpc startup error")

			// Both the first and a repeated launch attempt are expected to fail.
			_, err := launchPlugin(ctx, host, "WAIT")
			So(err, ShouldNotBeNil)
			_, err = launchPlugin(ctx, host, "WAIT")
			So(err, ShouldNotBeNil)
		})
	})
}
// fakeAdmissionsServer is a test implementation of the Admissions service
// that reports the AdmissionId of incoming ResolveAdmission requests through
// the `calls` channel.
type fakeAdmissionsServer struct {
// Embedded so that only ResolveAdmission needs to be defined on this type.
protocol.UnimplementedAdmissionsServer
// calls receives req.AdmissionId from every ResolveAdmission call.
calls chan string
}
// ResolveAdmission publishes req.AdmissionId on s.calls and returns an empty
// response.
//
// The send is first attempted without blocking, so a free buffer slot or a
// waiting receiver is always used. Only when that would block does the call
// wait for either a receiver or ctx to be done; in the latter case it returns
// ctx.Err() instead of blocking forever.
func (s *fakeAdmissionsServer) ResolveAdmission(ctx context.Context, req *protocol.ResolveAdmissionRequest) (*emptypb.Empty, error) {
	// Fast path: identical to the plain send whenever it can proceed at once.
	select {
	case s.calls <- req.AdmissionId:
		return &emptypb.Empty{}, nil
	default:
	}

	// Slow path: nobody is ready to receive; don't outlive the RPC context.
	select {
	case s.calls <- req.AdmissionId:
		return &emptypb.Empty{}, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}