#!/usr/bin/env python3
# Copyright (c) 2021 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import io
import logging
import re
import zipfile
from abc import ABC, abstractmethod
from distutils.version import LooseVersion
from typing import Optional, Set, Tuple

from google.cloud import storage

from config import (CHROME_UNSIGNED_ARTIFACT_PATH, CHROME_UNSIGNED_BUCKET,
                    CHROME_UNSIGNED_ZIP_BASE_DIR, LEGACY_BUCKET, LOCAL_BUCKET)
from pipelines import CONTINUE_SEARCH, DOES_NOT_EXIST, BaseProvider, Pipeline
from storage_helper import download_blob, upload_from_string
from versions import (get_version_from_revision, is_valid_revision,
                      is_valid_version)

LAST_LEGACY_MAJOR = 99


class BaseFileProvider(BaseProvider[Tuple[str, str], bytes], ABC):
  """File providers expect a (revision, filename) str tuple and return the
  file content as bytes."""

  @abstractmethod
  def get_bucketname(self):
    pass

  def __init__(self):
    self.bucket = storage.Client().bucket(self.get_bucketname())


class LocalBucketProvider(BaseFileProvider):
  """Retrieve and store files from and to a local (cache) bucket."""

  def get_bucketname(self):
    return LOCAL_BUCKET

  def get_local_path(self, revision, filename):
    """Return the storage path for a given file within the local bucket."""
    return f"extracted/{revision}/{filename}"

  def retrieve(self, params):
    # Retrieve the blob content from the local bucket.
    path = self.get_local_path(*params)
    blob = download_blob(self.bucket, path)
    if blob is None:
      return CONTINUE_SEARCH
    return blob

  def process_response(self, provider, params, content):
    # Save the file content to the local bucket, unless it came from there.
    if provider == self:
      return
    if content is None:
      return
    path = self.get_local_path(*params)
    blob = self.bucket.blob(path)
    upload_from_string(blob, content)
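
# For illustration (not part of the original file): the cache layout above
# means a file "front_end/demo.js" extracted for revision "deadbeef" would be
# stored at gs://<LOCAL_BUCKET>/extracted/deadbeef/front_end/demo.js. The
# revision and filename here are hypothetical placeholders.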


class ZipFileProvider(BaseFileProvider, ABC):
  ZIP_TOC_PATH = "tocs/%s/%s"

  def __init__(self):
    super().__init__()
    self.local_bucket = storage.Client().bucket(LOCAL_BUCKET)

  @abstractmethod
  def get_blobnames(self, revision, version):
    pass

  @abstractmethod
  def get_zip_base_dir(self):
    pass

  def applies_to_version(self, major_version):  # pylint: disable=W0613
    return True

  def get_zip_toc_path(self, blobname):
    bucketname = self.get_bucketname()
    return self.ZIP_TOC_PATH % (bucketname, blobname)

  def is_file_in_zip(self, blobname, filename) -> Tuple[bool, bool]:
    """Validate that a file is listed in the zip's table of contents.

    Args:
      blobname (str): Name of the zip archive
      filename (str): Expected file within the zip

    Returns:
      Tuple[bool, bool]: (table of contents exists, file exists in toc)
    """
    toc_path = self.get_zip_toc_path(blobname)
    toc_blob = download_blob(self.local_bucket, toc_path)
    if toc_blob is None:
      logging.info("Requested zip toc for %s, but toc does not exist.",
                   toc_path)
      return False, False
    toc_file = toc_blob.decode("utf-8").strip('\n')
    return True, filename in toc_file.split("\n")

  def save_zip_toc(self, blobname, zip_file) -> Set[str]:
    """Store the zip's table of contents in the local bucket and return the
    set of file paths it contains."""
    files = filter(lambda zi: not zi.is_dir(), zip_file.infolist())
    toc = ""
    paths = set()
    for f in files:
      toc += f"{f.orig_filename}\n"
      paths.add(f.orig_filename)
    path = self.get_zip_toc_path(blobname)
    blob = self.local_bucket.blob(path)
    upload_from_string(blob, toc)
    return paths
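
  # For illustration (not part of the original file): the stored toc is a
  # plain newline-separated list of archive paths, e.g.
  #
  #   front_end/devtools_app.html
  #   front_end/entrypoints/main/main.js
  #
  # so later lookups only need is_file_in_zip()'s cheap string split instead
  # of downloading and opening the whole archive. The two paths shown are
  # hypothetical examples.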

  def extract_from_zip_blob(self, blobnames, filename):
    """Extract a file from a list of blobs.

    Searches for the first existing blob in a list of blobs and extracts the
    file.

    Args:
      blobnames (List[str]): List of potentially existing blobs.
      filename (str): File to extract from the blob.

    Returns:
      Union[bytes, DOES_NOT_EXIST, CONTINUE_SEARCH]
    """
    path = f"{self.get_zip_base_dir()}{filename}"
    for blobname in blobnames:
      # Validate that the file is in the zip's table of contents.
      toc_exists, file_in_toc = self.is_file_in_zip(blobname, path)
      if toc_exists and not file_in_toc:
        logging.info("Requested file %s not found in toc of archive %s.", path,
                     blobname)
        return DOES_NOT_EXIST
      # Retrieve the zip archive.
      content = download_blob(self.bucket, blobname)
      if content is None:
        logging.warning("Requested file %s, but the archive %s does not exist",
                        filename, blobname)
        continue
      # Extract the requested file.
      with io.BytesIO(content) as zip_bytes, \
          zipfile.ZipFile(zip_bytes) as zip_file:
        if not toc_exists:
          paths = self.save_zip_toc(blobname, zip_file)
          if path not in paths:
            logging.info("File %s not found in gs://%s/%s", filename,
                         self.get_bucketname(), blobname)
            return DOES_NOT_EXIST
        return zip_file.read(path)
    return CONTINUE_SEARCH

  def retrieve(self, params):
    revision, name = params
    version = get_version_from_revision(revision)
    if version is None:
      logging.info("Skip provider; no version found for revision %s", revision)
      return CONTINUE_SEARCH
    major = int(version.split(".")[0])
    if not self.applies_to_version(major):
      logging.info("Skip provider; major version %s not applicable", major)
      return CONTINUE_SEARCH
    blobnames = self.get_blobnames(revision, version)
    if len(blobnames) == 0:
      logging.info("Skip provider; no zip-archive found for revision %s",
                   revision)
      return CONTINUE_SEARCH
    return self.extract_from_zip_blob(blobnames, name)


class ChromeSignedProvider(ZipFileProvider):
  """Retrieve devtools-frontend.zip artifacts from Chrome's unsigned binary
  bucket.

  Artifacts are available from this bucket for M100 and later.
  """

  def get_bucketname(self):
    return CHROME_UNSIGNED_BUCKET

  def get_zip_base_dir(self):
    return CHROME_UNSIGNED_ZIP_BASE_DIR

  def applies_to_version(self, major_version):
    return major_version > LAST_LEGACY_MAJOR

  def get_blobnames(self, revision, version):  # pylint: disable=W0613
    if version is None:
      return []
    # Generate a list of patch versions down to 0, e.g. 100.0.5911.3 returns
    # ["100.0.5911.3", "100.0.5911.2", "100.0.5911.1", "100.0.5911.0"]
    v = LooseVersion(version)
    patch_versions = [
        ".".join(version.split(".")[:3] + [str(patch)])
        for patch in reversed(range(0, v.version[3] + 1))
    ]
    return [
        CHROME_UNSIGNED_ARTIFACT_PATH % patch_version
        for patch_version in patch_versions
    ]
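
# Hypothetical illustration (not part of the original file): assuming
# CHROME_UNSIGNED_ARTIFACT_PATH were "desktop/%s/devtools-frontend.zip",
# get_blobnames(..., "100.0.5911.2") would yield
#   ["desktop/100.0.5911.2/devtools-frontend.zip",
#    "desktop/100.0.5911.1/devtools-frontend.zip",
#    "desktop/100.0.5911.0/devtools-frontend.zip"],
# letting extract_from_zip_blob() fall back across patch builds.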


class LegacyBucketMixin:

  def get_bucketname(self):
    return LEGACY_BUCKET


class LegacyM99ZipProvider(LegacyBucketMixin, ZipFileProvider, ABC):
  """Retrieve devtools-frontend.zip artifacts from the legacy bucket.

  Artifacts have been uploaded to the legacy bucket up to and including M99.
  """

  LEGACY_M99_REVS_PATH = "revs/@%s"
  LEGACY_M99_ZIPS_PATH = "zips/%s.zip"

  def get_zip_base_dir(self):
    return ''

  def get_meta_filename(self, revision, version):  # pylint: disable=W0613
    return self.LEGACY_M99_REVS_PATH % revision

  def get_blobnames(self, revision, version):
    meta_filename = self.get_meta_filename(revision, version)
    meta_blob = download_blob(self.bucket, meta_filename)
    if meta_blob is None:
      logging.warning("Requested file %s does not exist", meta_filename)
      return []
    zip_file_name = meta_blob.decode("utf-8").strip(' \t\n')
    return [self.LEGACY_M99_ZIPS_PATH % zip_file_name]
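
  # For illustration (not part of the original file): the revs meta blob is a
  # one-line text file mapping a revision to a zip basename. If, say,
  # gs://<LEGACY_BUCKET>/revs/@<revision> contains "abc123", get_blobnames()
  # returns ["zips/abc123.zip"]. The value "abc123" is a hypothetical
  # placeholder.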


class LegacyM99LongRevisionProvider(LegacyM99ZipProvider):
  """Retrieve devtools-frontend.zip artifacts for 40-digit revisions."""

  def applies_to_version(self, major_version):
    return major_version <= LAST_LEGACY_MAJOR


class LegacyM99ShortRevisionProvider(LegacyM99ZipProvider):
  """Retrieve devtools-frontend.zip artifacts for 6-digit revisions.

  Early Chrome revisions were identified by their first 6 digits only; this
  provider serves artifacts for those revisions. 6-digit revisions are
  ambiguous within the Chromium project, so mapping them to a specific version
  is not possible. More recently, Chromium has used full 40-digit revisions.
  This provider assumes that all requested 6-digit revisions originate from
  pre-M100 versions and searches for the corresponding files in the legacy
  bucket.

  The version check is skipped for these short revisions.
  """

  def retrieve(self, params):
    revision, name = params
    if not is_valid_revision(revision, 6):
      logging.info("Skip %s; revision %s not applicable", self.name, revision)
      return CONTINUE_SEARCH
    blobnames = self.get_blobnames(revision, None)
    if len(blobnames) == 0:
      logging.info("Skip %s; no zip-archive found for revision %s", self.name,
                   revision)
      return CONTINUE_SEARCH
    return self.extract_from_zip_blob(blobnames, name)


class LegacyM99StaticVersionProvider(LegacyM99ZipProvider):
  """Retrieve devtools-frontend.zip artifacts for legacy versions."""

  LEGACY_M99_VERS_PATH = "vers/%s"

  def get_meta_filename(self, revision, version):  # pylint: disable=W0613
    return self.LEGACY_M99_VERS_PATH % version

  def retrieve(self, params):
    version, name = params
    if not is_valid_version(version):
      logging.warning("Skip %s; invalid version %s provided", self.name,
                      version)
      return CONTINUE_SEARCH
    # Artifacts are not available for patch versions, so we replace the patch
    # number with 0.
    version = re.sub(r"\.\d+$", ".0", version)
    blobnames = self.get_blobnames(None, version)
    if len(blobnames) == 0:
      logging.info("Skip %s; no zip-archive found for version %s", self.name,
                   version)
      return CONTINUE_SEARCH
    return self.extract_from_zip_blob(blobnames, name)


class LegacyM99FilesProvider(LegacyBucketMixin, BaseFileProvider):
  """Retrieve an already extracted file stored in the legacy bucket.

  This provider returns the same files as the legacy
  /serve_file/<revision>/<filename> endpoint.

  Some of the revisions served via this endpoint are not part of the Chromium
  repository, so we skip the check for a pre-M100 version.

  The legacy bucket has the following structure. For a request (e.g. GET
  /serve_file/@e2206c2e9067be8fc1dea2050e67246228949ff/demo.js), the provider

  1) searches for the file hash of demo.js in gs://legacy-bucket/meta/@e2206c…

  ```
  911feebcaa974b936128173b5ec89115d354223f:logo.ico
  220bcaa974b936128173b5ec89115d354223f8ab:demo.js ◄
  f8ab220bcaa974b936128173b5ec89115d354223:bg.jpg
  ```

  2) serves the file in gs://legacy-bucket/hash/220bca…
  """

  LEGACY_M99_META_PATH = "meta/@%s"
  LEGACY_M99_HASH_PATH = "hash/%s"

  def retrieve(self, params):
    revision, name = params
    # Load the ToC including the file hashes for this revision.
    meta_filename = self.LEGACY_M99_META_PATH % revision
    meta_blob = download_blob(self.bucket, meta_filename)
    if meta_blob is None:
      logging.info("Skip provider; meta file %s does not exist", meta_filename)
      return CONTINUE_SEARCH
    # Find the requested file hash and name in the ToC.
    hash_entries = meta_blob.decode("utf-8").strip("\n").split('\n')
    file_hash = None
    for hash_entry in hash_entries:
      current_file_hash, current_filename = hash_entry.split(":", maxsplit=1)
      if current_filename == name:
        file_hash = current_file_hash
        break
    if file_hash is None:
      logging.info("Skip provider; file %s does not exist in %s", name,
                   meta_filename)
      return CONTINUE_SEARCH
    # Download the file from the hash folder.
    hash_filename = self.LEGACY_M99_HASH_PATH % file_hash
    hash_blob = download_blob(self.bucket, hash_filename)
    if hash_blob is None:
      logging.warning(
          "Skip provider; hash file %s does not exist for revision %s",
          hash_filename, revision)
      return CONTINUE_SEARCH
    return hash_blob


# The order is important: the next provider is only queried if the current
# provider cannot find a matching file. Providers at the top are less complete
# but have lower latency. The pipelines are initialized lazily to avoid
# call-outs when starting the app.
_REVISION_PIPELINE = None
_VERSION_PIPELINE = None


def get_revision_pipeline() -> Pipeline[Tuple[str, str], bytes]:
  global _REVISION_PIPELINE
  if _REVISION_PIPELINE is None:
    _REVISION_PIPELINE = Pipeline[Tuple[str, str], bytes]([
        LocalBucketProvider(),
        ChromeSignedProvider(),
        LegacyM99LongRevisionProvider(),
        LegacyM99ShortRevisionProvider(),
        LegacyM99FilesProvider(),
    ])
  return _REVISION_PIPELINE


def get_version_pipeline() -> Pipeline[Tuple[str, str], bytes]:
  global _VERSION_PIPELINE
  if _VERSION_PIPELINE is None:
    _VERSION_PIPELINE = Pipeline[Tuple[str, str], bytes]([
        LegacyM99StaticVersionProvider(),
    ])
  return _VERSION_PIPELINE


def get_file_from_revision(revision: str, filename: str) -> Optional[bytes]:
  """Return the content of a file for a given revision.

  Args:
    revision (str): Chrome revision
    filename (str): File path without a leading slash

  Returns:
    Optional[bytes]: File content, or None if no file was retrieved
  """
  params = revision, filename
  return get_revision_pipeline().retrieve(params)


def get_file_from_version(version: str, filename: str) -> Optional[bytes]:
  """Return the content of a file for a given version.

  Args:
    version (str): Chrome version <major.minor.build.patch>
    filename (str): File path without a leading slash

  Returns:
    Optional[bytes]: File content, or None if no file was retrieved
  """
  params = version, filename
  return get_version_pipeline().retrieve(params)
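

# A minimal usage sketch, not part of the original module. The revision and
# file path below are hypothetical placeholders; a real caller would pass a
# known Chrome revision (or version) and a path inside the devtools-frontend
# archive.
if __name__ == "__main__":
  logging.basicConfig(level=logging.INFO)
  content = get_file_from_revision(
      "0123456789abcdef0123456789abcdef01234567",  # hypothetical revision
      "front_end/devtools_app.html")  # hypothetical file path
  if content is None:
    print("file not found")
  else:
    print(f"retrieved {len(content)} bytes")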