blob: bbc3f714160e3a5210fe380ef434776c0e3bea47 [file] [log] [blame]
# Copyright 2022 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from . import customization
from . import helper
from PB.recipes.infra.windows_image_builder import windows_iso as winiso
from PB.recipes.infra.windows_image_builder import windows_image_builder as wib
from PB.recipes.infra.windows_image_builder import sources as src_pb
from PB.recipes.infra.windows_image_builder import dest as dest_pb
class WinISOCustomization(customization.Customization):
  """ Windows iso customization support

  Stages an optional base image, copies the requested files on top of it,
  optionally adds a boot image, packages the staging dir into an iso with
  genisoimage and uploads the result to the configured destinations.
  """

  def __init__(self, **kwargs):
    """ __init__ registers the default args and prepares the work dirs.

    Args:
      * kwargs: forwarded unchanged to customization.Customization
    """
    # register all the default args
    super(WinISOCustomization, self).__init__(**kwargs)
    # ensure that we have the correct customization
    assert self.customization().WhichOneof(
        'customization') == 'windows_iso_customization'
    # use custom work dir
    self._name = self.customization().windows_iso_customization.name
    self._workdir = self.m.path.cleanup_dir.joinpath(self._name, 'workdir')
    self._scratchpad = self.m.path.cleanup_dir.joinpath(self._name,
                                                        'scratchpad')
    # lazily built by get_canonical_cfg
    self._canon_cust = None
    helper.ensure_dirs(self.m.file, [self._workdir, self._scratchpad])

  def _iter_sources(self):
    """ _iter_sources yields every source proto referenced by this config.

    The order is base_image, then boot_image (each only when its 'src' oneof
    is set), followed by the artifact of every copy_files entry. The yielded
    objects are the messages stored in the config, not copies.
    """
    wic = self.customization().windows_iso_customization
    for image in (wic.base_image, wic.boot_image):
      if image.WhichOneof('src'):
        yield image
    for cf in wic.copy_files:
      yield cf.artifact

  def pin_sources(self, ctx):
    """ pins the given config by replacing the sources in the customization

    Args:
      * ctx: dict containing the context for customization
    """
    for src in self._iter_sources():
      # overwrite the source in place with its pinned version
      src.CopyFrom(self._source.pin(src, ctx))

  def download_sources(self):
    """ download_sources downloads the sources in the given config to the
    disk """
    for src in self._iter_sources():
      self._source.download(src)

  def get_canonical_cfg(self):
    """ get_canonical_cfg returns canonical config after removing the name and
    dests

    The result is built once and cached on the instance.

    Example:
      Given a config

        Customization{
          windows_iso_customization: WinISOImage{
            name: "windows_vanilla_gce"
            base_image: Src{...}
            boot_image: Src{...}
            uploads: [...]
            copy_files: [...]
          }
        }

      Returns config

        Customization{
          windows_iso_customization: WinISOImage{
            name: ""
            base_image: Src{...}
            boot_image: Src{...}
            copy_files: [...]
          }
        }
    """
    if self._canon_cust is None:
      wic = self.customization().windows_iso_customization
      self._canon_cust = wib.Customization(
          windows_iso_customization=winiso.WinISOImage(
              base_image=wic.base_image,
              boot_image=wic.boot_image,
              copy_files=wic.copy_files,
          ),)
    return self._canon_cust

  def remove_upload_dests(self):
    """ remove_upload_dests removes the upload_dests specified by a config.

    This is meant to be used by the try builder to avoid uploading to prod
    locations from a try job.
    """
    wic = self.customization().windows_iso_customization
    wic.uploads.clear()
    wic.unpacked_uploads.clear()

  @property
  def outputs(self):
    """ return the output(s) of executing this config. Doesn't guarantee that
    the output(s) exists

    When a key is available the first entry is the default GCS destination
    for the iso. The uploads and unpacked_uploads from the config follow, and
    get the build tags injected (the config entries are modified in place).
    """
    uploads = []
    build_tags = {}
    key = self.get_key()
    if key:
      location = 'WIB-ISO/{}.iso'
      if self.tryrun:
        location = 'WIB-ISO-TRY/{}.iso'  # pragma: nocover
      output = src_pb.GCSSrc(
          bucket='chrome-gce-images', source=location.format(key))
      build_tags = {
          'orig': self._source.get_url(src_pb.Src(gcs_src=output)),
          'build_url': self.m.buildbucket.build_url()
      }
      uploads.append(dest_pb.Dest(
          gcs_src=output,
          tags=build_tags,
      ))
    wic = self.customization().windows_iso_customization
    for dests in (wic.uploads, wic.unpacked_uploads):
      # inject build tags
      if build_tags:
        for up in dests:
          up.tags.update(build_tags)
      uploads.extend(dests)
    return uploads

  @property
  def inputs(self):
    """ inputs returns the input(s) required for this customization.

    inputs here refer to any external refs that might be required for
    this customization
    """
    return list(self._iter_sources())

  @property
  def context(self):
    """ context returns a dict containing the local_src id mapping to output
    src.
    """
    # outputs rebuilds its list on every access, so read it only once
    outputs = self.outputs
    if not outputs:
      return {}  #pragma: nocover
    return {
        '{}-output'.format(self.id): self._source.dest_to_src(outputs[0])
    }

  def execute_customization(self):
    """ execute_customization generates the required iso image.

    The image is staged under the work dir, packaged, and uploaded to the
    first entry of outputs and to every configured upload. The work dir and
    scratchpad are emptied afterwards, even when a step fails.
    """
    output = self.customization().windows_iso_customization
    with self.m.step.nest('Windows iso customization {}'.format(output.name)):
      try:
        # Use staging to unpack the iso for modification
        iso_dir = self._workdir / 'staging'
        boot = None
        # Copy base image
        if output.base_image.WhichOneof('src'):
          self.copy_base_image(output.base_image, iso_dir)
        # Copy all the modifications
        for cf in output.copy_files:
          self.copy_files_to_image(cf, iso_dir)
        if output.boot_image.WhichOneof('src'):
          src = self._source.download(output.boot_image)
          # Copy the boot_image to /boot dir
          self.m.file.copy('Add {}'.format(src), src, iso_dir / 'boot')
          boot = iso_dir.joinpath('boot', self.m.path.basename(src))
        compressed_archive = self._workdir.joinpath('{}.zip'.format(
            output.name))
        # Only archive if gcs upload is needed. CIPD can do its own archiving
        if any(
            package.WhichOneof('dest') == 'gcs_src'
            for package in output.unpacked_uploads):
          # archive the iso staging as we need to upload it
          self.m.archive.package(iso_dir).archive(
              'Compress contents for upload', compressed_archive)
        output_image = self._workdir.joinpath(output.name + '.iso')
        # package everything into an iso
        self.generate_iso_image(
            output.name, boot=boot, directory=iso_dir, output=output_image)
        # Upload the image. outputs is read once here as every access
        # rebuilds the list and re-tags the configured uploads.
        primary = self.outputs[0]
        self._source.upload_package(primary, output_image)
        orig_tag = primary.tags['orig']
        build_url = primary.tags['build_url']
        for package in output.uploads:
          package.tags['orig'] = orig_tag
          package.tags['build_url'] = build_url
          self._source.upload_package(package, output_image)
        for package in output.unpacked_uploads:
          package.tags['orig'] = orig_tag
          package.tags['build_url'] = build_url
          if package.WhichOneof('dest') == 'cipd_src':
            # cipd gets the staging dir contents directly
            self._source.upload_package(package, iso_dir, dir_contents=True)
          else:
            self._source.upload_package(package, compressed_archive)
      finally:
        # cleanup everything
        with self.m.step.nest('Cleanup customization'):
          self.m.file.rmcontents('clean workdir', self._workdir)
          self.m.file.rmcontents('clean scratchpad', self._scratchpad)

  def copy_base_image(self, base_image, iso_staging):
    """ copy_base_image mounts the given iso image and copies the contents to
    given iso_staging dir.

    Args:
      * base_image: sources.Src proto object representing the iso image
      * iso_staging: dir path where we stage the iso to be packaged
    """
    url = self._source.get_url(base_image)
    with self.m.step.nest('Copy {} to staging'.format(url)) as pres:
      base_image_path = self._source.get_local_src(base_image)
      loop, mount_loc = self.m.qemu.mount_disk_image(
          base_image_path, partitions=None)
      try:
        pres.logs['mount_path'] = mount_loc[0]
        pres.logs['loop'] = loop
        # Copy the base image to the staging dir (iso_dir)
        self.m.file.copytree('Copy base image', mount_loc[0], iso_staging)
        # default permissions are 0555
        self.m.step(
            'Set permissions for base image',
            cmd=['chmod', '0755', '-Rv', iso_staging])
      finally:
        # always unmount, even if the copy or chmod failed
        self.m.qemu.unmount_disk_image(loop, partitions=None)

  def copy_files_to_image(self, cf, iso_staging):
    """ copy_files_to_image copies the given file to the image.

    If the given file is an archive, extracts the archive contents. If mount
    is set and there is only one file in the archive or input is just one file
    (that is the image to mount), Mounts the image and then copies the file
    at given source. Otherwise just copies the given file to the destination.

    Args:
      * cf: windows_iso.CopyFile proto object representing the file
      * iso_staging: location to copy the file to

    Raises:
      * StepFailure if mount is set and the extracted archive does not
        contain exactly one entry
    """
    src = self._source.get_local_src(cf.artifact)
    src_url = self._source.get_url(cf.artifact)
    dest = iso_staging / cf.dest
    with self.m.step.nest('Copy {} to staging'.format(src_url)):
      # If src is an archive of sorts
      if str(src).endswith(('.zip', '.tar')):
        # If this is a archive. Extract the required file to scratchpad
        self.m.archive.extract(
            'Unpack {} to {}'.format(src, self._scratchpad),
            archive_file=src,
            output=self._scratchpad)
        if not cf.mount:
          # Copy the given file as is to the target location
          return self.copy(self._scratchpad / cf.source, dest)
        # In the situation that this archive contains an image to extract.
        # Let's figure out the path for the image
        contents = self.m.file.listdir(
            name='Checking the contents of {}'.format(src),
            source=self._scratchpad,
            recursive=False,
            test_data=['image.file'])
        if not contents:  #pragma: nocover
          raise self.m.step.StepFailure('Nothing was extracted to mount')
        if len(contents) > 1:  #pragma: nocover
          raise self.m.step.StepFailure('Cannot determine what to mount')
        image = contents[0]
        # If we are copying from an iso. There are no partitions
        partitions = None if str(image).endswith('iso') else [1]
        return self.mount_copy(image, cf.source, dest, partitions)
      if cf.mount:
        # If we are copying from an iso. There are no partitions
        partitions = None if str(src).endswith('iso') else [1]
        return self.mount_copy(src, cf.source, dest, partitions)
      # Append source if given
      if cf.source:
        src = src / cf.source
      # Copy the given file as is to the target location
      return self.copy(src, dest)

  def mount_copy(self, image, source, dest, partitions=None):
    """ mount_copy mounts the give image[partition] and copies the file given
    by source to dest

    Args:
      * image: path to the image
      * source: file/dir to copy in the said image
      * dest: location to copy source to
      * partitions: If the image is a disk image then partitions to mount
    """
    loop, mount_loc = self.m.qemu.mount_disk_image(image, partitions=partitions)
    try:
      # Copy the file from mounted location
      src = mount_loc[0] + '/' + source
      self.copy(src, dest)
    finally:
      # always unmount, even if the copy failed
      self.m.qemu.unmount_disk_image(loop, partitions=partitions)

  def copy(self, src, dest):
    """ copy is a helper function that unifies the action of copying dir or file

    Args:
      * src: path to the file to be copied
      * dest: destination to copy the file to
    """
    step_name = 'Copy {} to {}'.format(src, dest)
    if self.m.path.isdir(src):
      self.m.file.copytree(step_name, src, dest)
    else:
      self.m.file.copy(step_name, src, dest)

  def generate_iso_image(self, name, boot, directory, output):
    """ generate_iso_image creates an iso image (output) from the given
    directory

    Args:
      * name: The name of the image
      * boot: The bootloader for the iso image
      * directory: The staging directory for the image
      * output: The output image to be generated
    """
    # For info regarding el-torito boot image and genisoimage
    # See: https://wiki.osdev.org/El-Torito
    # See: https://wiki.osdev.org/Genisoimage
    cmd = [
        'genisoimage',  # use genisoimage to generate the iso image
    ]
    if boot:
      cmd.append('-b{}'.format(boot))  # use this as the bootloader image
    elif self._arch == "aarch64":
      cmd.append('-befi/microsoft/boot/efisys_noprompt.bin')  # pragma: nocover
    else:
      # x86_64 or x86 options
      cmd.extend([
          '-bboot/etfsboot.com',
          '-no-emul-boot',
          '--eltorito-alt-boot',
          '-befi/microsoft/boot/efisys_noprompt.bin',
      ])
    cmd.extend([
        '-no-emul-boot',  # dont emulate boot image as floppy
        '--hide',
        '*',
        '--udf',  # include UDF filesystem support
        '-iso-level',
        '3',  # Use level 3
        '-allow-limited-size',  # allow files bigger than 4GB
        '-V',
        '{}'.format(name),  # name for the iso being generated
        '-o',
        output,  # write to this file
        directory  # directory to be used for this image
    ])
    self.m.step('Generate iso image', cmd=cmd)