| #!/usr/bin/env python3 |
| # Copyright 2024 The Chromium Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| """This script copies DEPS package information from a source DEPS file onto |
| a destination DEPS file. |
| |
| If the destination doesn't have packages, the script errors out. |
| |
| Example usage: |
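| roll_downstream_gcs_deps.py \ |
| --source-deps some/repo/DEPS \ |
| --destination-deps some/downstream/repo/DEPS \ |
| --source-package src/build/linux/debian_bullseye_amd64-sysroot \ |
| --destination-package src/build/linux/debian_bullseye_amd64-sysroot \ |
| --source-package src/build/linux/debian_bullseye_arm64-sysroot \ |
| --destination-package src/build/linux/debian_bullseye_arm64-sysroot |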
| |
| """ |
| |
| import argparse |
| import ast |
| import sys |
| from typing import Dict, List |
| |
| |
def _get_deps(deps_ast: ast.Module) -> Dict[str, ast.Dict]:
    """Finds the top-level `deps = {...}` assignment in a DEPS file AST.

    Only a plain assignment with exactly one target named `deps` whose value
    is a dict literal qualifies. The first qualifying statement wins. Entries
    whose key is not a constant are left out of the result.

    Args:
        deps_ast: AST of the DEPS file.

    Raises:
        Exception: If the deps dict is not found.

    Returns:
        Mapping from each constant key of the deps dict to the AST node of
        the corresponding value.
    """
    for node in deps_ast.body:
        is_deps_dict = (isinstance(node, ast.Assign)
                        and len(node.targets) == 1
                        and isinstance(node.targets[0], ast.Name)
                        and node.targets[0].id == 'deps'
                        and isinstance(node.value, ast.Dict))
        if not is_deps_dict:
            continue
        return {
            name.value: entry
            for name, entry in zip(node.value.keys, node.value.values)
            if isinstance(name, ast.Constant)
        }
    raise Exception('no deps found')
| |
| |
def _get_gcs_object_list_ast(package_ast: ast.Dict) -> ast.List:
    """Returns the `objects` list node of a GCS package entry.

    The package must be a dict literal containing `'dep_type': 'gcs'` and an
    `'objects'` key whose value is a list literal.

    Args:
        package_ast: AST of the GCS package.

    Raises:
        AssertionError: If the package is not a GCS package, or if it has no
            `objects` list.

    Returns:
        AST of the objects list.
    """
    # Errors are raised explicitly instead of via `assert` statements, which
    # are stripped under `python -O` and would let a non-GCS package through.
    # AssertionError is kept so the exception type seen by callers is
    # unchanged.
    if not isinstance(package_ast, ast.Dict):
        # E.g. a dependency written as a plain URL string instead of a dict.
        raise AssertionError('Not a GCS dependency!')

    is_gcs = False
    result = None
    for key, value in zip(package_ast.keys, package_ast.values):
        if not isinstance(key, ast.Constant):
            continue
        if (key.value == 'dep_type' and isinstance(value, ast.Constant)
                and value.value == 'gcs'):
            is_gcs = True
        if key.value == 'objects' and isinstance(value, ast.List):
            result = value

    if not is_gcs:
        raise AssertionError('Not a GCS dependency!')
    if result is None:
        raise AssertionError('No objects found!')
    return result
| |
| |
def _utf8_head(line: str, byte_col: int) -> str:
    """Returns the part of `line` before UTF-8 byte offset `byte_col`."""
    return line.encode('utf-8')[:byte_col].decode('utf-8')


def _utf8_tail(line: str, byte_col: int) -> str:
    """Returns the part of `line` from UTF-8 byte offset `byte_col` on."""
    return line.encode('utf-8')[byte_col:].decode('utf-8')


def _replace_ast(destination: str, dest_ast: ast.AST, source: str,
                 source_ast: ast.AST) -> str:
    """Replaces the text of dest_ast in destination with the text of
    source_ast taken from source.

    Everything in destination outside the span of dest_ast is kept, including
    text that shares a line with the node. Lines are re-joined with '\\n'.

    Args:
        destination: Destination DEPS file content.
        dest_ast: Node parsed from `destination` whose text is replaced. Its
            `lineno`, `col_offset`, `end_lineno` and `end_col_offset`
            attributes are used.
        source: Source DEPS file content.
        source_ast: Node parsed from `source` whose text is copied. The same
            position attributes are used.

    Returns:
        Content of destination DEPS file with replaced content.
    """
    # NOTE(review): str.splitlines() also breaks on characters such as form
    # feed, which the Python parser does not count as line ends; files
    # containing them would be mis-indexed -- confirm DEPS files never do.
    source_lines = source.splitlines()
    dest_lines = destination.splitlines()

    # Text of the source node, one entry per line. AST column offsets are
    # UTF-8 byte offsets, so the first and last lines are trimmed on encoded
    # bytes. The end is trimmed first so that a single-line node works too:
    # cutting the tail does not move the start offset.
    replacement = source_lines[source_ast.lineno - 1:source_ast.end_lineno]
    replacement[-1] = _utf8_head(replacement[-1], source_ast.end_col_offset)
    replacement[0] = _utf8_tail(replacement[0], source_ast.col_offset)

    # Keep whatever surrounds the destination node on its first and last
    # lines (e.g. the `'objects': ` key before it and the `,` after it).
    prefix = _utf8_head(dest_lines[dest_ast.lineno - 1], dest_ast.col_offset)
    suffix = _utf8_tail(dest_lines[dest_ast.end_lineno - 1],
                        dest_ast.end_col_offset)
    replacement[0] = prefix + replacement[0]
    replacement[-1] += suffix

    # Splice as a list of lines and join once, so that no stray newline is
    # produced when there is nothing before, inside, or after the node.
    new_lines = (dest_lines[:dest_ast.lineno - 1] + replacement +
                 dest_lines[dest_ast.end_lineno:])
    result = '\n'.join(new_lines)
    # Preserve the destination's trailing newline.
    if destination.endswith('\n'):
        result += '\n'
    return result
| |
| |
def copy_packages(source_content: str, destination_content: str,
                  source_packages: List[str],
                  destination_packages: List[str]) -> str:
    """Copies GCS packages from source to destination.

    Only the `objects` list of each package is copied.

    Args:
        source_content: Source DEPS file content.
        destination_content: Destination DEPS file content.
        source_packages: Names of the GCS packages to read from the source.
        destination_packages: Names of the GCS packages to overwrite in the
            destination. The i-th entry receives the objects of the i-th
            entry of `source_packages`.

    Raises:
        Exception: If the two package lists differ in length, or if a
            package is missing from its DEPS file.

    Returns:
        Destination DEPS file content with packages copied.
    """
    # Packages are paired by position, so a length mismatch means at least
    # one package has no counterpart; fail up front with a clear message.
    if len(source_packages) != len(destination_packages):
        raise Exception(
            'Source and destination packages must be of the same length')

    source_deps = _get_deps(ast.parse(source_content, mode='exec'))
    for source_package, destination_package in zip(source_packages,
                                                   destination_packages):
        if source_package not in source_deps:
            raise Exception('Package %s not found in source' % source_package)
        # Re-parse on every iteration: each replacement changes the
        # destination text, so node positions from an earlier parse are
        # stale.
        dest_deps = _get_deps(ast.parse(destination_content, mode='exec'))
        if destination_package not in dest_deps:
            raise Exception('Package %s not found in destination' %
                            destination_package)
        destination_content = _replace_ast(
            destination_content,
            _get_gcs_object_list_ast(dest_deps[destination_package]),
            source_content,
            _get_gcs_object_list_ast(source_deps[source_package]))

    return destination_content
| |
| |
def main():
    """Parses the command line and rolls packages into the destination DEPS.

    Reads both DEPS files, copies each source package onto the destination
    package given at the same position, and overwrites the destination DEPS
    file with the result.

    Returns:
        0, used as the process exit status.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument('--source-deps',
                        required=True,
                        help='Source DEPS file where content will be copied '
                        'from')
    parser.add_argument('--source-package',
                        action='append',
                        required=True,
                        help='List of DEPS packages to update')
    parser.add_argument('--destination-deps',
                        required=True,
                        help='Destination DEPS file, where content will be '
                        'saved')
    parser.add_argument('--destination-package',
                        action='append',
                        required=True,
                        help='List of DEPS packages to update')
    args = parser.parse_args()

    # Both package flags are required and use action='append', so argparse
    # has already rejected a command line that leaves either list empty;
    # only the pairing needs to be validated here.
    if len(args.destination_package) != len(args.source_package):
        parser.error('Source and destination packages must be of the same '
                     'length, aborting...')

    # DEPS files use Python syntax, whose default source encoding is UTF-8;
    # read and write them as UTF-8 rather than the locale's encoding.
    with open(args.source_deps, encoding='utf-8') as f:
        source_content = f.read()

    with open(args.destination_deps, encoding='utf-8') as f:
        destination_content = f.read()

    new_content = copy_packages(source_content, destination_content,
                                args.source_package, args.destination_package)

    with open(args.destination_deps, 'w', encoding='utf-8') as f:
        f.write(new_content)

    print('Run:')
    print(' Destination DEPS file updated. You still need to create and '
          'upload a change.')
    return 0
| |
| |
# Run main() only when this file is executed as a script; main()'s return
# value is passed to sys.exit() as the process exit status.
if __name__ == '__main__':
    sys.exit(main())