blob: aec93239bbef81875fed721f5b6bdca6f5b8e9ff [file] [log] [blame]
# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Provides tools for packaging, installing and testing firmware."""
from mtlib.util import Path, RequiredRegex, Execute, SafeExecute, GitRepo
from tempfile import NamedTemporaryFile
import tarfile
import os
import io
import time
src_dir = Path("/mnt/host/source/src/")
script_dir = Path(__file__).parent
templates_dir = script_dir / "templates"
ebuild_template_file = templates_dir / "ebuild.template"
class FirmwareException(Exception):
pass
class FirmwareBinary(object):
"""."""
def __init__(self, filename, fileobj=None, symlink=None):
self.hw_version = None
self.fw_version = None
self.symlink_name = None
if not fileobj:
fileobj = open(filename, "rb")
self.data = fileobj.read()
if symlink:
self.symlink_name = symlink
name_regex = RequiredRegex("([0-9.a-zA-Z]+)_([0-9.a-zA-Z]+)\\.bin")
match = name_regex.Match(filename, must_succeed=False)
if match:
self.hw_version = match.group(1)
self.fw_version = match.group(2)
self.filename = filename
self.device_file = Path("/opt/google/touch/firmware", filename)
def UpdateName(self, hw_version, fw_version):
name = "{}_{}.bin".format(hw_version, fw_version)
os.rename(self.filename, name)
self.filename = name
def ForceUpdate(self, device, remote):
remote.RemountWriteable()
symlink = Path("/lib/firmware", device.symlink)
symlink_bak = Path(str(symlink) + ".bak")
target = Path("/opt/google/touch/firmware/force_update.fw")
device.symlink = symlink.basename
remote.Write(str(target), self.data)
try:
remote.SafeExecute(["mv", str(symlink), str(symlink_bak)])
try:
remote.SafeExecute(["ln", "-s", str(target), str(symlink)])
device.ForceFirmwareUpdate(remote)
finally:
remote.SafeExecute(["mv", str(symlink_bak), str(symlink)])
finally:
remote.SafeExecute(["rm", str(target)])
def __str__(self):
symlink = "Unknown"
if self.symlink_name:
symlink = "/lib/firmware/" + self.symlink_name
return "%s @%s" % (self.filename, symlink)
def __repr__(self):
return str(self)
class FirmwarePackage(object):
"""Helper class to deal with firmware installation on devices."""
name = "firmware"
def __init__(self, board, variant):
self.board = board
self.variant = variant
self.binaries = {}
# determine path and name of touch firmware ebuild file
if variant:
overlay_name = "overlay-variant-{}-{}-private".format(
board, variant)
else:
overlay_name = "overlay-{}-private".format(board)
self.overlay = src_dir / "private-overlays" / overlay_name
self.bcsname = "bcs-{}-private".format(variant if variant else board)
self.ebuild_name = "chromeos-touch-firmware-{}".format(
variant if variant else board)
self.ebuild_dir = self.overlay / "chromeos-base" / self.ebuild_name
self.ebuild_repo = GitRepo(self.ebuild_dir)
self.ebuild_file = self.ebuild_dir / "{}-0.0.1.ebuild".format(
self.ebuild_name)
# look for symlink to ebuild file
self.ebuild_symlink = None
self.bcs_url = None
self.ebuild_version = None
if self.ebuild_file.exists:
for symlink in self.ebuild_dir.ListDir():
if symlink.is_link and symlink.basename.startswith(self.ebuild_name):
self.ebuild_symlink = symlink
if self.ebuild_symlink:
# extract ebuild version from symlink name
regex = "{}-([0-9a-zA-Z_\\-\\.]*).ebuild"
regex = RequiredRegex(regex.format(self.ebuild_name))
match = regex.Search(self.ebuild_symlink.basename)
self._UpdateVersion(match.group(1))
def _UpdateVersion(self, version):
self.ebuild_version = version
self.ebuild_symlink = self.ebuild_dir / "{}-{}.ebuild".format(
self.ebuild_name, version)
url = "gs://chromeos-binaries/HOME/{}/{}/chromeos-base/{}/{}-{}.tbz2"
self.bcs_url = url.format(self.bcsname, self.overlay.basename,
self.ebuild_name, self.ebuild_name,
version)
def _ExtractSymlinkfromEbuild(self, firmware):
ebuild = open(str(self.ebuild_file), "r").read()
regex = "dosym \"{}\" \"/lib/firmware/([a-zA-Z0-9_\\-.]+)\""
regex = RequiredRegex(regex.format(firmware.device_file))
match = regex.Search(ebuild, must_succeed=False)
if match:
return match.group(1)
else:
return None
def GetExistingBinaries(self):
if not self.ebuild_version:
return
tar_file = NamedTemporaryFile("rb")
res = Execute(["gsutil", "cp", self.bcs_url, tar_file.name])
if not res:
return
tar_file.seek(0)
tar = tarfile.open(fileobj=tar_file)
for member in tar.getmembers():
if not member.isfile():
continue
name = os.path.basename(member.name)
fileobj = tar.extractfile(member)
binary = FirmwareBinary(name, fileobj)
binary.symlink_name = self._ExtractSymlinkfromEbuild(binary)
yield binary
def AddBinary(self, binary):
self.binaries[binary.hw_version] = binary
def GenerateBCSPackage(self, version):
tar_name = "{}-{}".format(self.ebuild_name, version)
tar_file = "{}.tbz2".format(tar_name)
tar = tarfile.open(tar_file, "w:bz2")
for binary in self.binaries.values():
data = io.BytesIO(binary.data)
path = tar_name + str(binary.device_file)
info = tarfile.TarInfo(path)
info.size = len(binary.data)
info.mode = 0755
info.uid = 0
info.gid = 0
info.mtime = time.time()
info.uname = "root"
info.gname = "root"
tar.addfile(info, data)
return Path(tar_file)
def UpdateVersionSymlink(self, version):
new_symlink = self.ebuild_dir / "{}-{}.ebuild".format(
self.ebuild_name, version)
old_symlink = self.ebuild_symlink
if old_symlink and old_symlink != new_symlink:
self.ebuild_repo.Move(old_symlink, new_symlink)
if not new_symlink.is_link:
SafeExecute(["ln", "-s", self.ebuild_file.basename, str(new_symlink)])
self.ebuild_repo.Add(new_symlink)
self._UpdateVersion(version)
def UpdateBinarySymlinks(self, remote):
device_info = remote.GetDeviceInfo()
devices = device_info.touch_devices
for firmware in self.binaries.values():
if firmware.symlink_name:
continue
if firmware.hw_version not in devices:
msg = "Cannot find device for binary {}"
raise Exception(msg.format(firmware))
device = devices[firmware.hw_version]
firmware.symlink_name = device.symlink
symlink_names = [b.symlink_name for b in self.binaries.values()]
if len(set(symlink_names)) != len(symlink_names):
raise Exception("Duplicate symlink names for firmwares found")
def UpdateSrcInstall(self, remote, dosym_lines):
ebuild = self.ebuild_file.Read()
install_idx = ebuild.find("src_install")
begin = ebuild.find("{", install_idx)
# find closing bracket
brackets = 0
end = len(ebuild)
for end in range(begin, len(ebuild)):
if ebuild[end] == "{":
brackets = brackets + 1
elif ebuild[end] == "}":
brackets = brackets - 1
if brackets == 0:
end = end + 1
break
# write ebuild with new src_install method
out = self.ebuild_file.Open("w")
out.write(ebuild[:begin])
out.write("{\n")
out.write("\tinsinto /\ndoins -r */*\n")
for line in dosym_lines:
out.write("\t{}\n".format(line))
out.write("}\n")
out.write(ebuild[end:].strip())
out.close()
def GenerateEbuildFile(self, remote, dosym_lines):
rdepend = "\tchromeos-base/touch_updater"
if self.variant:
line = "\t!chromeos-base/chromeos-touch-firmware-{}"
rdepend += line.format(self.board)
template = ebuild_template_file.Read()
variables = {
"year": time.strftime("%Y"),
"rdepend": rdepend,
"bcs": self.bcsname,
"overlay": self.overlay.basename,
"dosym_lines": "\n\t".join(dosym_lines)}
ebuild = template.format(**variables)
self.ebuild_file.Write(ebuild)
def UpdateEbuildFile(self, remote, regenerate=False):
self.UpdateBinarySymlinks(remote)
dosym_lines = []
for firmware in self.binaries.values():
line = "dosym \"%s\" \"/lib/firmware/%s\""
dosym_lines.append(line % (firmware.device_file, firmware.symlink_name))
if regenerate or not self.ebuild_file.exists:
self.GenerateEbuildFile(remote, dosym_lines)
else:
self.UpdateSrcInstall(remote, dosym_lines)
def VerifySymlinks(self, remote):
valid = True
for firmware in self.GetExistingBinaries():
cmd = "readlink -f /lib/firmware/{}".format(firmware.symlink_name)
target = remote.Execute(cmd)
if not target:
msg = "Symlink for firmware '{}' does not exist on device"
print msg.format(firmware)
valid = False
target = Path(target)
if target.basename != firmware.device_file.basename:
msg = "Symlink for firmware '{}' does not point to the right file"
print msg.format(firmware)
valid = False
cmd = "ls {}".format(firmware.device_file)
if remote.Execute(cmd) is False:
msg = "Firmware file {} does not exist on device"
print msg.format(firmware.device_file)
valid = False
return valid
def VerifyFirmwareVersions(self, remote):
device_info = remote.GetDeviceInfo(refresh=True)
binaries = dict([(b.hw_version, b) for b in self.GetExistingBinaries()])
valid = True
for device in device_info.touch_devices.values():
if device.hw_version not in binaries:
continue
firmware = binaries[device.hw_version]
if firmware.fw_version != device.fw_version:
print "Device {} did not update correctly:".format(device.hw_version)
print "Device version {} != firmware version {}".format(
device.fw_version, firmware.fw_version)
valid = False
return valid
def __str__(self):
res = " firmwares:\n"
for firmware in self.firmwares.values():
res += " {}".format(firmware)
return res