blob: 12f0e399abcd5d34ae9477194cb661ff989fb3d5 [file] [log] [blame]
// Copyright 2021 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
package firmware
import (
"bufio"
"context"
"regexp"
"time"
"github.com/golang/protobuf/ptypes/empty"
"chromiumos/tast/common/servo"
"chromiumos/tast/errors"
"chromiumos/tast/remote/firmware"
"chromiumos/tast/remote/firmware/fixture"
"chromiumos/tast/testing"
"chromiumos/tast/testing/hwdep"
)
func init() {
testing.AddTest(&testing.Test{
Func: ECKeyboard,
Desc: "Test EC Keyboard interface",
Contacts: []string{"tij@google.com", "cros-fw-engprod@google.com"},
Attr: []string{"group:firmware", "firmware_unstable"},
HardwareDeps: hwdep.D(hwdep.ChromeEC(), hwdep.Keyboard()),
Fixture: fixture.NormalMode,
Timeout: 2 * time.Minute,
ServiceDeps: []string{"tast.cros.firmware.UtilsService"},
})
}
const typeTimeout = 250 * time.Millisecond
var testKeyMap = map[string]string{
"0": "KEY_0",
"b": "KEY_B",
"e": "KEY_E",
"o": "KEY_O",
"r": "KEY_R",
"s": "KEY_S",
"t": "KEY_T",
"<enter>": "KEY_ENTER",
"<ctrl_l>": "KEY_LEFTCTRL",
"<alt_l>": "KEY_LEFTALT",
}
func ECKeyboard(ctx context.Context, s *testing.State) {
h := s.FixtValue().(*fixture.Value).Helper
if err := h.RequireServo(ctx); err != nil {
s.Fatal("Failed to connect to servo: ", err)
}
if err := h.RequireRPCUtils(ctx); err != nil {
s.Fatal("Requiring RPC utils: ", err)
}
if hasKb, err := h.Servo.HasControl(ctx, string(servo.USBKeyboard)); err != nil {
s.Fatal("Failed to check for usb keyboard: ", err)
} else if hasKb {
if err := h.Servo.SetOnOff(ctx, servo.USBKeyboard, servo.Off); err != nil {
s.Fatal("Failed to disable usb keyboard: ", err)
}
}
res, err := h.RPCUtils.FindPhysicalKeyboard(ctx, &empty.Empty{})
if err != nil {
s.Fatal("During FindPhysicalKeyboard: ", err)
}
device := res.Path
s.Log("Device path: ", device)
cmd := h.DUT.Conn().CommandContext(ctx, "evtest", device)
stdout, err := cmd.StdoutPipe()
scanner := bufio.NewScanner(stdout)
cmd.Start()
// Read and discard initial info text.
func() {
text := make(chan string)
go func() {
defer close(text)
for scanner.Scan() {
text <- scanner.Text()
}
}()
for {
select {
case <-time.After(1 * time.Second):
// Time out after 1 second so it doesn't get stuck here.
s.Log("Finshed reading preamble")
return
case _ = <-text:
continue
}
}
}()
for key, keyCode := range testKeyMap {
s.Logf("Pressing key %q, expecting to read keycode %q", key, keyCode)
if err = readKeyPress(ctx, h, scanner, key, keyCode); err != nil {
s.Fatal("Failed to read key: ", err)
}
// Wait for reading to complete before entering next key to prevent failing previous read.
if err = testing.Sleep(ctx, typeTimeout); err != nil {
s.Fatalf("Failed to sleep for %s waiting to type next key", typeTimeout)
}
}
}
func readKeyPress(ctx context.Context, h *firmware.Helper, scanner *bufio.Scanner, key, keyCode string) error {
regex := `Event.*time.*code\s(\d*)\s\(` + keyCode + `\)`
expMatch := regexp.MustCompile(regex)
text := make(chan string)
go func() {
defer close(text)
for scanner.Scan() {
text <- scanner.Text()
}
}()
if err := h.Servo.PressKey(ctx, key, servo.DurTab); err != nil {
return errors.Wrap(err, "failed to type key")
}
for {
select {
case <-time.After(typeTimeout):
return errors.New("did not detect keycode within expected time")
case out := <-text:
if match := expMatch.FindStringSubmatch(out); match != nil {
testing.ContextLog(ctx, "key pressed: ", match)
return nil
}
}
}
}