blob: c9464177552654b21b9f3c6ed3c74fb0f68e8e1c [file] [log] [blame] [edit]
# Copyright 2012 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Driver for board config controls of type=ec.
Provides the following EC controlled functions:
lid_open
kbd_en
kbd_m1_a0
kbd_m1_a1
kbd_m2_a0
kbd_m2_a1
dev_mode (Temporary. See crosbug.com/p/9341)
"""
import logging
import re
import time
import typing
from servo.drv import pty_driver
# Initial simulated-keyboard state, copied into the shared uart state on
# first use. Slot 0 is kbd_en; slots 1-4 are kbd_m1_a0, kbd_m1_a1,
# kbd_m2_a0 and kbd_m2_a1 in that order.
KEY_STATE = [0, 1, 1, 1, 1]

# Key matrix row and column mapped from kbd_m*_a*.
# Indexed as KEY_MATRIX[m][a0][a1]; a None entry means that combination
# selects no key.
KEY_MATRIX = [
    # kbd_m1
    [
        [(0, 4), (11, 4)],
        [(2, 4), None],
    ],
    # kbd_m2
    [
        [(0, 2), (11, 2)],
        [(2, 2), None],
    ],
]

# EC console mask for enabling only command channel
COMMAND_CHANNEL_MASK = 0x1

# EC PD Flags. The PD_* bits are tested against the single "Flags:" value,
# the PE_* / TC_* bits against the "PE State" / "TC State" flag values.
PD_FLAGS_EXPLICIT_CONTRACT = 1 << 6
PD_FLAGS_TS_DTS_PARTNER = 1 << 16
PE_FLAGS_EXPLICIT_CONTRACT = 1 << 9
TC_FLAGS_TS_DTS_PARTNER = 1 << 1
TC_FLAGS_PARTNER_PD_CAPABLE = 1 << 12
class ecError(pty_driver.PtyError):
    """Error raised by the ec driver.

    Subclasses pty_driver.PtyError, so a handler that catches PtyError also
    catches ecError.
    """
class ec(pty_driver.PtyDriver):
    """Object to access drv=ec controls.

    Note, instances of this object get dispatched via base class,
    HwDriver's get/set method. That method ultimately calls:
      "_[GS]et_%s" % params['subtype'] below.

    For example, a control to read kbd_en would be dispatched to
    call _Get_kbd_en.
    """

    def _drv_init(self):
        """Driver specific initializer.

        Reads the optional "role_swap_delay" param (seconds, default 1.0) and
        seeds the keyboard-simulation state on the interface when it is not
        already present.
        """
        super()._drv_init()
        self._role_swap_delay = float(self._params.get("role_swap_delay", 1.0))
        # Add locals to the values dictionary.
        if "kbd" not in self._interface._uart_state:
            # Copy so the module-level default is never mutated.
            self._interface._uart_state["kbd"] = list(KEY_STATE)

    def _limit_channel(self):
        """Save the console channel setting and limit output to commands.

        After this call only output from commands issued on the console is
        printed. Pair every call with _restore_channel().

        Any error raised while issuing the console commands propagates to
        the caller.
        """
        self._issue_cmd("chan save")
        self._issue_cmd("chan %d" % COMMAND_CHANNEL_MASK)

    def _restore_channel(self):
        """Load the channel setting saved by _limit_channel()."""
        self._issue_cmd("chan restore")

    def _set_key_pressed(self, key_rc, pressed):
        """Press/release a key.

        Args:
          key_rc: Tuple containing row and column of the key, or None for
            "no key", in which case nothing is sent.
          pressed: 0=release, 1=press.
        """
        if key_rc is None:
            return
        self._issue_cmd("kbpress %d %d %d" % (key_rc + (pressed,)))

    def _get_mx_ax_index(self, m, a):
        """Get the index of a kbd_mx_ax control in the "kbd" state list.

        Args:
          m: Selection of kbd_m1 or kbd_m2. Note the value is 0/1, not 1/2.
          a: Selection of a0 and a1.

        Returns:
          Index into the state list; slot 0 is kbd_en, hence the +1 offset.
        """
        return m * 2 + a + 1

    def _set_both_keys(self, pressed):
        """Press/release both simulated keys.

        Args:
          pressed: 0=release, 1=press.
        """
        m1_a0, m1_a1, m2_a0, m2_a1 = self._interface._uart_state["kbd"][1:5]
        self._set_key_pressed(KEY_MATRIX[1][m2_a0][m2_a1], pressed)
        self._set_key_pressed(KEY_MATRIX[0][m1_a0][m1_a1], pressed)

    def _Set_kbd_en(self, value):
        """Enable/disable keypress simulation.

        Keys are only pressed/released on an actual 0->1 or 1->0 transition;
        the stored state is updated in every case.

        Args:
          value: 0=disable, 1=enable.
        """
        org_value = self._interface._uart_state["kbd"][0]
        if org_value == 0 and value == 1:
            self._set_both_keys(pressed=1)
        elif org_value == 1 and value == 0:
            self._set_both_keys(pressed=0)
        self._interface._uart_state["kbd"][0] = value

    def _Get_kbd_en(self):
        """Retrieve keypress simulation enabled/disabled.

        Returns:
          0: Keyboard emulation is disabled.
          1: Keyboard emulation is enabled.
        """
        return self._interface._uart_state["kbd"][0]

    def _Set_kbd_mx_ax(self, m, a, value):
        """Implementation of _Set_kbd_m*_a*

        While simulation is enabled, a change releases the key selected by
        the old (a0, a1) pair and presses the key selected by the new pair.

        Args:
          m: Selection of kbd_m1 or kbd_m2. Note the value is 0/1, not 1/2.
          a: Selection of a0 and a1.
          value: The new value to set.
        """
        kbd_state = self._interface._uart_state["kbd"]
        index = self._get_mx_ax_index(m, a)
        if self._Get_kbd_en() == 1 and kbd_state[index] != value:
            old_pair = [
                kbd_state[self._get_mx_ax_index(m, 0)],
                kbd_state[self._get_mx_ax_index(m, 1)],
            ]
            new_pair = list(old_pair)
            new_pair[a] = value
            self._set_key_pressed(KEY_MATRIX[m][old_pair[0]][old_pair[1]], 0)
            self._set_key_pressed(KEY_MATRIX[m][new_pair[0]][new_pair[1]], 1)
        kbd_state[index] = value

    def _Set_kbd_m1_a0(self, value):
        """Setter of kbd_m1_a0."""
        self._Set_kbd_mx_ax(0, 0, value)

    def _Get_kbd_m1_a0(self):
        """Getter of kbd_m1_a0."""
        return self._interface._uart_state["kbd"][self._get_mx_ax_index(0, 0)]

    def _Set_kbd_m1_a1(self, value):
        """Setter of kbd_m1_a1."""
        self._Set_kbd_mx_ax(0, 1, value)

    def _Get_kbd_m1_a1(self):
        """Getter of kbd_m1_a1."""
        return self._interface._uart_state["kbd"][self._get_mx_ax_index(0, 1)]

    def _Set_kbd_m2_a0(self, value):
        """Setter of kbd_m2_a0."""
        self._Set_kbd_mx_ax(1, 0, value)

    def _Get_kbd_m2_a0(self):
        """Getter of kbd_m2_a0."""
        return self._interface._uart_state["kbd"][self._get_mx_ax_index(1, 0)]

    def _Set_kbd_m2_a1(self, value):
        """Setter of kbd_m2_a1."""
        self._Set_kbd_mx_ax(1, 1, value)

    def _Get_kbd_m2_a1(self):
        """Getter of kbd_m2_a1."""
        return self._interface._uart_state["kbd"][self._get_mx_ax_index(1, 1)]

    def _Set_lid_open(self, value):
        """Setter of lid_open.

        Args:
          value: 0=lid closed, 1=lid opened.
        """
        if value == 0:
            self._issue_cmd("lidclose")
        else:
            self._issue_cmd("lidopen")

    def _Get_volume_up(self):
        """Getter of Volup for Ryu

        Returns:
          The integer reported by "btnpress volup".
        """
        result = self._issue_cmd_get_results(
            "btnpress volup", [r"Button volup pressed = (\d+)"]
        )[0]
        return int(result[1])

    def _Set_volume_up(self, value):
        """Setter of Volup for Ryu

        Args:
          value: 1=button pressed, 0=button released
        """
        self._issue_cmd("btnpress volup %d" % int(value))

    def _Get_volume_down(self):
        """Getter of Voldown for Ryu

        Returns:
          The integer reported by "btnpress voldown".
        """
        result = self._issue_cmd_get_results(
            "btnpress voldown", [r"Button voldown pressed = (\d+)"]
        )[0]
        return int(result[1])

    def _Set_volume_down(self, value):
        """Setter of Voldown for Ryu

        Args:
          value: 1=button pressed, 0=button released
        """
        self._issue_cmd("btnpress voldown %d" % int(value))

    def _Set_button_hold(self, value):
        """Setter for a button hold on the ec.

        The console command is taken from the "ec_cmd" param and its
        arguments from the optional "ec_args" param. Blocks for the hold
        duration so the button has been released when this returns.

        Args:
          value: number of ms to hold the button. Has to be at least 1;
            smaller values are logged as an error and replaced by 1.
        """
        if value < 1:
            self._logger.error(
                "Trying to set ec button press to %d ms. Overwriting "
                "the value to be 1ms.",
                value,
            )
            value = 1
        # One of 'button' or 'powerbtn'
        cmd = self._params.get("ec_cmd")
        # 'button' cmd requires vup|vdown or both while 'powerbtn' requires
        # no args.
        argline = self._params.get("ec_args", "")
        ec_cmd = "%s %s %d" % (cmd, argline, value)
        self._issue_cmd(ec_cmd)
        # Issuing a console command is async. Wait the button release.
        time.sleep(value / 1000.0)

    def _Get_warm_reset(self):
        """Getter of warm_reset (active low).

        Query the power sequence state. Return the value as if it is the
        legacy servo warm_reset pin.

        Returns:
          0: warm_reset on (power state is neither S0 nor S3, or the EC
             console did not respond).
          1: warm_reset off.
        """
        self._limit_channel()
        try:
            result = self._issue_cmd_get_results(
                "powerinfo", ["power state [0-9]* = (.*),"]
            )[0][1]
            if "S0" in result or "S3" in result:
                return 1
        except pty_driver.PtyError:
            # If EC UART has no respond, treat it as warm_reset on.
            pass
        finally:
            self._restore_channel()
        return 0

    def _Set_warm_reset(self, value):
        """Setter of warm_reset (active low).

        Use the power sequence to emulate the legacy servo warm_reset pin.
        The servo warm_reset pin is active low.

        Args:
          value: 0=on -> power off the AP
                 1=off -> power on the AP
        """
        if value == 0:
            self._issue_cmd("power off")
        else:
            self._issue_cmd("power on")

    def _Get_milliwatts(self):
        """Retrieves the battery power measurement.

        Battery command in the EC currently exposes the following
        information:
          Temp:      0x0be1 = 304.1 K (31.0 C)
          Manuf:     SUNWODA
          Device:    S1
          Chem:      LION
          Serial:    0x0000
          V:         0x1ef7 = 7927 mV
          V-desired: 0x20d0 = 8400 mV
          V-design:  0x1ce8 = 7400 mV
          I:         0x06a9 = 1705 mA(CHG)
          I-desired: 0x06a4 = 1700 mA
          Mode:      0x7f01
          Charge:    66 %
          Abs:       65 %
          Remaining: 5489 mAh
          Cap-full:  8358 mAh
          Design:    8500 mAh
          Time-full: 2h:47
          Empty:     0h:0

        Only the "V:" and "I:" lines are parsed.

        Returns:
          Battery power in milliwatts as a float: the voltage in mV times
          the negated current in mA, divided by 1000.

        Raises:
          pty_driver.PtyError: if the battery command still fails after all
            retries.
        """
        # The uart often drops some of the output of the battery cmd.
        retries = 3
        while retries > 0:
            retries -= 1
            try:
                self._limit_channel()
                try:
                    results = self._issue_cmd_get_results(
                        "battery",
                        [
                            r"V:[\s0-9a-fx]*= (-*\d+) mV",
                            r"I:[\s0-9a-fx]*= (-*\d+) mA",
                        ],
                    )
                finally:
                    # Always undo the channel limit. Otherwise a failed
                    # attempt leaves the console restricted and the next
                    # "chan save" would overwrite the original setting.
                    self._restore_channel()
                break
            except pty_driver.PtyError as e:
                if retries <= 0:
                    raise
                logging.warning("Battery cmd failed, retrying: %s", e)
        result = {"mv": int(results[0][1], 0), "ma": int(results[1][1], 0) * -1}
        return result["ma"] * result["mv"] / 1000.0

    def _Set_fan_target_rpm(self, value):
        """Set target fan RPM.

        This function sets target fan RPM or turns on auto fan control.

        Args:
          value: Non-negative values for target fan RPM. -1 is treated as
            maximum fan speed. -2 is treated as auto fan speed control.
        """
        if value == -2:
            self._issue_cmd("autofan")
        else:
            # "-1" is treated as max fan RPM in EC, so we don't need to
            # handle that
            self._issue_cmd("fanset %d" % value)

    def _Get_feat(self):  # pylint: disable=invalid-name
        """Retrieves the EC feature flags encoded as a hexadecimal.

        Returns:
          Hex string combining both 32-bit words (bits 32-63 in the upper
          half), or "0x0" when the EC has no 'feat' command.

        Raises:
          ecError: if the console command fails.
        """
        self._limit_channel()
        try:
            result = self._issue_cmd_get_results(
                "feat",
                [
                    r"Command 'feat' not found or ambiguous\.|"
                    r"0-31: (0x[0-9a-f]{8})\s*32-63: (0x[0-9a-f]{8})"
                ],
            )
        except pty_driver.PtyError as e:
            raise ecError("Cannot retrieve the feature flags on EC console.") from e
        finally:
            self._restore_channel()
        if result[0] == "Command 'feat' not found or ambiguous.":
            return "0x0"
        return hex((int(result[0][2], 16) << 32) | int(result[0][1], 16))

    def _read_port_role(self, p: int) -> typing.Tuple[str, int]:
        """Reads the PD state.

        The returned confidence score is to detect which port the servo is
        attached. The port with the highest score is likely to be the servo.

        Args:
          p: The port to read

        Returns: the tuple (data role, confidence score). An invalid port
          yields ("", -1).
        """
        pd_state_cmd = "pd %d state"
        if self._params.get("pdc") == "yes":
            pd_state_cmd = "pdc status %d"
        result = self._issue_cmd_get_results(
            pd_state_cmd % p,
            [
                r"Invalid port|Parameter 2 invalid|Role: ([A-Z]+)-([A-Z]+)(-\S*)? (.*)\n",
            ],
            flush=True,
        )
        port_state = result[0]
        # Handle the "no such port" replies first: those alternatives carry
        # no capture groups, so port_state must not be indexed as one.
        # NOTE(review): relies on result[0] comparing equal to the matched
        # text in that case - confirm against pty_driver.
        if port_state in ("Parameter 2 invalid", "Invalid port"):
            logging.warning(
                "Model doesn't have %d USB-C ports, please edit dut_pd_data_role "
                "port_count in the board overlay.xml file",
                p + 1,
            )
            return "", -1

        data_role = port_state[2]
        flags_text = port_state[4]
        # TC Flags should always be present on TPMCv2
        tc_flags = 0
        pe_flags = 0
        v1_flags = 0
        m = re.match(r"TC State: \S* Flags: (0x\S+)", flags_text)
        if m:
            tc_flags = int(m.group(1), 16)
            # PE Flags are only sometimes present
            m = re.match(r".*PE State: \S* Flags: (0x\S+)", flags_text)
            if m:
                pe_flags = int(m.group(1), 16)
        else:
            # If no TC Flags, must be TPMCv1
            m = re.match(r".*Flags: (0x\S+)", flags_text)
            if m:
                v1_flags = int(m.group(1), 16)

        confidence = 0
        logging.debug(
            "role=%s tc_flags=%x pe_flags=%x v1_flags=%x",
            data_role,
            tc_flags,
            pe_flags,
            v1_flags,
        )
        # Servo usually has DTS enabled, the cc command can enable or disable
        # it.
        if tc_flags & TC_FLAGS_TS_DTS_PARTNER or v1_flags & PD_FLAGS_TS_DTS_PARTNER:
            confidence += 1
        # Servo in src mode will have explicit contract
        if (
            tc_flags & TC_FLAGS_PARTNER_PD_CAPABLE
            or pe_flags & PE_FLAGS_EXPLICIT_CONTRACT
            or v1_flags & PD_FLAGS_EXPLICIT_CONTRACT
        ):
            confidence += 1
        return data_role, confidence

    def _find_servo_port(self) -> typing.Tuple[int, str]:
        """Find the USB-C port that has something (presumably servo) attached.

        Scans ports 0..port_count-1 (from the "port_count" param) and keeps
        the first port with the highest confidence score.

        Returns: the tuple (port number, data role)

        Raises:
          ecError: if no port reported a data role.
        """
        port_count = int(self._params.get("port_count"))
        best_port = None
        best_score = -1
        best_role = None
        for p in range(port_count):
            role, confidence = self._read_port_role(p)
            # Strict comparison: ties keep the lower-numbered port, and
            # invalid ports (score -1) are never selected.
            if confidence > best_score:
                best_score = confidence
                best_port = p
                best_role = role
        if best_role:
            logging.debug("Assuming servo is on port %d role=%s", best_port, best_role)
            return best_port, best_role
        raise ecError("Could not find any connected USB-C port.")

    def _Get_dut_pd_data_role(self) -> str:
        """Getter of dut_pd_data_role.

        Returns:
          The data role reported for the port chosen by _find_servo_port().

        Raises:
          ecError: if no connected USB-C port is found.
        """
        self._limit_channel()
        try:
            _port, role = self._find_servo_port()
            return role
        finally:
            self._restore_channel()

    def _Set_dut_pd_data_role(self, value: str) -> None:
        """Setter of dut_pd_data_role.

        Issues a data-role swap on the port chosen by _find_servo_port()
        when its current role differs, waits role_swap_delay seconds, then
        re-reads the role to verify.

        Args:
          value: "DFP" or "UFP" (case-insensitive).

        Raises:
          ValueError: if value is not a recognised data role.
          ecError: if no port is found or the role did not change.
        """
        value = value.upper()
        if value not in ["DFP", "UFP"]:
            raise ValueError("Bad data role (%s). Try 'DFP' or 'UFP'." % value)
        self._limit_channel()
        try:
            port, role = self._find_servo_port()
            if role == value:
                return
            pd_role_swap_cmd = "pd %d swap data"
            if self._params.get("pdc") == "yes":
                pd_role_swap_cmd = "pdc drs %d"
            self._issue_cmd(pd_role_swap_cmd % port)
            time.sleep(self._role_swap_delay)
            role, _unused = self._read_port_role(port)
            if role != value:
                raise ecError(
                    "Failed to set port %d to PD role %s, got: %s" % (port, value, role)
                )
        finally:
            self._restore_channel()

    def _Set_pdc_ccd_keepalive_en(self, value: int) -> None:
        """Setter for pdc_ccd_keepalive_en

        Args:
          value: 0 for normal mux operation.
                 1 to force the SBU mux into debug (CCD forced on). Needed
                 in conjunction with GSC's rddkeepalive for DUTs using
                 PDC-driven CCD.

        Raises:
          ValueError: if value is neither 0 nor 1.
          ecError: if the console command fails.
        """
        if value == 0:
            cmd = "pdc sbumux normal"
        elif value == 1:
            cmd = "pdc sbumux debug"
        else:
            raise ValueError("Value must be 0 or 1")
        try:
            self._issue_cmd(cmd)
        except pty_driver.PtyError as e:
            raise ecError(
                f"Cannot run `{cmd}`. Is this a DUT with PDC-driven CCD? "
                "(CONFIG_USBC_PDC_DRIVEN_CCD)"
            ) from e

    def _Get_pdc_ccd_keepalive_en(self) -> int:
        """Getter for pdc_ccd_keepalive_en

        Returns:
          0 for normal operation
          1 when SBU mux is forced into debug (CCD forced on)

        Raises:
          ecError: if the console command fails or reports an unknown mode.
        """
        cmd = "pdc sbumux"
        try:
            results = self._issue_cmd_get_results(
                cmd, [r"CCD Port: C\d+, Mode: (\w+) \(\d+\)"]
            )
        except pty_driver.PtyError as e:
            raise ecError(
                f"Cannot run `{cmd}`. Is this a DUT with PDC-driven CCD? "
                "(CONFIG_USBC_PDC_DRIVEN_CCD)"
            ) from e
        if results[0][1] == "NORMAL":
            return 0
        elif results[0][1] == "FORCE_DEBUG":
            return 1
        else:
            raise ecError(f"Unexpected `{cmd}` output: '{results[0][0]}'")