blob: dca08449133f1da24881bd3ae2487c6b2861cb0d [file] [log] [blame]
# Copyright 2021 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from PB.recipes.infra.windows_image_builder import windows_image_builder as wib
from PB.recipes.infra.windows_image_builder import sources
from . import helper
class GCSManager:
"""
GCSManager is used to download required artifacts from google cloud storage
and generate a pinned config for the downloaded artifacts. Also supports
uploading artifacts to GCS.
"""
def __init__(self, module, cache):
""" __init__ copies few module objects and cache dir path into class vars
Args:
* module: module object with all dependencies
* cache: path to cache file dir. Files from gcs will be saved here
"""
self.m = module
self._cache = cache
# dict mapping unpinned url to pinned package
self._pinned_srcs = {}
# dict mapping downloaded package paths to package
self._downloaded_srcs = {}
# set containing all the existing packages
self._existence = set()
def pin_package(self, gcs_src):
""" pin_package takes a gcs_src and returns a pinned gcs_src
Args:
* gcs_src: sources.GCSSrc type referring to a package
"""
pkg_url = self.get_gs_url(gcs_src)
if pkg_url in self._pinned_srcs:
return self._pinned_srcs[pkg_url] # pragma: no cover
else:
pin_url = self.get_orig(pkg_url)
if pin_url:
# This package was linked to another
b, s = self.get_bucket_source(pin_url)
gcs_src.bucket = b
gcs_src.source = s
# Add pkg_url as that's the only one we know that exists. Not the
# resolved pin
self._existence.add(pkg_url)
self._pinned_srcs[pkg_url] = gcs_src
return gcs_src
def get_orig(self, url):
""" get_orig goes through the metadata to determine original object and
returns url for the original GCS object. See upload_packages
Args:
* url: string representing url that describes a gcs object
"""
res = self.m.gsutil.stat(
url,
name='stat {}'.format(url),
stdout=self.m.raw_io.output(),
ok_ret='any')
ret_code = res.exc_result.retcode
if ret_code == 0:
text = res.stdout.decode('utf-8')
# return the given url if not pinned
orig_url = url
for line in text.split('\n'):
if 'orig:' in line:
orig_url = line.replace('orig:', '').strip()
return orig_url
return ''
# TODO(anushruth): Cover this test path
def exists(self, src): #pragma: no cover
""" exists returns True if the given ref exists on GCS
Args:
* src: sources.Src proto object to check for existence
"""
url = self.get_gs_url(src)
if url in self._existence:
return True
src_exists = self.get_orig(url) != ''
if src_exists:
self._existence.add(url)
return src_exists
def download_package(self, gcs_src):
""" download_package downloads the given package if required and returns
local_path to the package.
Args:
* gcs_src: source.GCSSrc object referencing a package
"""
local_path = self.get_local_src(gcs_src)
if not local_path in self._downloaded_srcs:
self.m.gsutil.download(
gcs_src.bucket,
gcs_src.source,
local_path,
name='download {}'.format(self.get_gs_url(gcs_src)))
self._downloaded_srcs[local_path] = gcs_src
return local_path
def get_local_src(self, gcs_src):
""" get_local_src returns the path to the source on disk
Args:
* gcs_src: sources.GCSSrc proto object referencing an artifact in GCS
"""
# source is usually given as a unix path. Ensure this works on windows
source = gcs_src.source.split('/')
return self._cache.joinpath(gcs_src.bucket, *source)
def get_gs_url(self, gcs_src):
""" get_gs_url returns the gcs url for the given gcs src
Args:
* gcs_src: sources.GCSSrc proto object referencing an artifact in GCS
"""
return 'gs://{}/{}'.format(gcs_src.bucket, gcs_src.source)
def get_bucket_source(self, url):
""" get_bucket_source returns bucket and source given gcs url
Args:
* url: gcs url representing a file on GCS
"""
bs = url.replace('gs://', '')
tokens = bs.split('/')
bucket = tokens[0]
source = bs.replace(bucket + '/', '')
return bucket, source
def upload_package(self, dest, source):
""" upload_package uploads the contents of source on disk to dest.
Args:
* dest: dest.Dest proto object representing an upload location
* source: path to the package on disk to be uploaded
"""
self.stage_upload(dest, source)
# upload the package to gcs
self.m.gsutil.upload(
self.get_local_src(dest.gcs_src),
dest.gcs_src.bucket,
dest.gcs_src.source,
metadata=dest.tags,
name='upload {}'.format(self.get_gs_url(dest.gcs_src)))
def stage_upload(self, dest, source):
""" stage_upload copies the contents to be uploaded to local cache.
Doing this allows not having to do the upload, and still make the file
available for downstream customizations.
Args:
* dest: dest.Dest proto object representing an upload location
* source: path to the package on disk to be uploaded
"""
local_path = self.get_local_src(dest.gcs_src)
if local_path in self._downloaded_srcs: # pragma: no cover
# Staging already done.
return
if not self.m.path.exists(source): # pragma: no cover
raise self.m.step.StepFailure('Cannot find {}'.format(source))
up_path = source
if self.m.path.isdir(source):
# package the dir to zip
up_path = source / 'gcs.zip'
self.m.archive.package(source).archive(
'Package {} for upload'.format(source), up_path)
try:
# ensure that the folder exists
self.m.file.ensure_directory(
'Ensure cache path {}'.format(local_path),
dest=self.m.path.dirname(local_path))
# copy the package to cache
self.m.file.copy('Copy {} to cache'.format(up_path), up_path, local_path)
# Record the local copy to avoid downloads
self._downloaded_srcs[local_path] = dest.gcs_src
# Record existence of the src too
self._existence.add(self.get_gs_url(dest.gcs_src))
finally:
if self.m.path.isdir(source):
self.m.file.remove('Delete {}'.format(up_path), up_path)