| # Copyright 2016 The Chromium Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| """This module contains util functions that local scripts can use.""" |
| from __future__ import print_function |
| from __future__ import absolute_import |
| |
import atexit
from datetime import datetime
import functools
import logging
import os
import pickle
import queue
import re
import subprocess
import sys
import threading
import time
import traceback
| |
# Size of the daemon worker-thread pool spawned by RunTasks.
MAX_THREAD_NUMBER = 30
# Shared task queue feeding the Worker threads; created lazily by RunTasks.
TASK_QUEUE = None
# Matches a full 40-hex-digit git commit hash.
GIT_HASH_PATTERN = re.compile(r'^[0-9a-fA-F]{40}$')
| |
| |
def SetAppEnginePaths(root_dir=None):
  """Inserts appengine sdk paths of infra to system paths.

  Args:
    root_dir: Project root directory; defaults to the findit root (the parent
      of this file's directory).
  """
  # If no root directory is provided, default to findit root directory.
  root_dir = root_dir or os.path.realpath(
      os.path.join(os.path.dirname(__file__), os.path.pardir))

  # The SDK lives three levels above the root, under gcloud/platform.
  sdk_relative_path = os.path.join(os.path.pardir, os.path.pardir,
                                   os.path.pardir, 'gcloud', 'platform',
                                   'google_appengine')
  appengine_sdk_dir = os.path.realpath(os.path.join(root_dir,
                                                    sdk_relative_path))
  sys.path.insert(1, appengine_sdk_dir)

  # Importable only once the SDK dir above is on sys.path.
  import dev_appserver
  dev_appserver.fix_sys_path()
| |
| |
def SetUpSystemPaths(root_dir=None):  # pragma: no cover
  """Inserts root path, first_party, third_party and appengine sdk paths.

  Args:
    root_dir: Project root directory; defaults to the findit root (the parent
      of this file's directory).
  """
  # If no root directory is provided, default to findit root directory.
  root_dir = root_dir or os.path.realpath(
      os.path.join(os.path.dirname(__file__), os.path.pardir))
  SetAppEnginePaths(root_dir)

  first_party_dir = os.path.join(root_dir, 'first_party')
  third_party_dir = os.path.join(root_dir, 'third_party')

  # Make the project's own modules plus first/third party libraries
  # importable. Each insert goes in at index 1, so the resulting order is
  # first_party, third_party, then root.
  for extra_dir in (root_dir, third_party_dir, first_party_dir):
    if extra_dir not in sys.path:
      sys.path.insert(1, extra_dir)

  import google
  # protobuf and GAE have package name conflict on 'google'.
  # Add this to solve the conflict.
  google.__path__.insert(0, os.path.join(third_party_dir, 'google'))
| |
| |
def SignalWorkerThreads():  # pragma: no cover
  """Pushes one exit sentinel per worker thread onto the task queue."""
  global TASK_QUEUE  # pylint: disable=W0602
  if TASK_QUEUE:
    # Each worker exits after consuming a single None sentinel (see Worker).
    for _ in range(MAX_THREAD_NUMBER):
      TASK_QUEUE.put(None)

    # Give worker threads a chance to exit before interpreter teardown;
    # works around the harmless python 2.7 shutdown bug handled in Worker.
    time.sleep(1)
| |
| |
| atexit.register(SignalWorkerThreads) |
| |
| |
def Worker():  # pragma: no cover
  """Daemon worker loop: runs tasks pulled from the global TASK_QUEUE.

  Exits when it consumes a ``None`` sentinel (pushed by SignalWorkerThreads)
  or when the queue becomes unusable during interpreter shutdown.
  """
  global TASK_QUEUE  # pylint: disable=W0602
  while True:
    try:
      task = TASK_QUEUE.get()
      # ``None`` is the exit sentinel pushed by SignalWorkerThreads.
      if not task:
        return
    except TypeError:
      # According to http://bugs.python.org/issue14623, this is a harmless bug
      # in python 2.7 which won't be fixed.
      # The exception is raised on daemon threads when python interpreter is
      # shutting down.
      return

    # A task is the (function, args, kwargs, semaphore) tuple built by
    # RunTasks.
    function, args, kwargs, result_semaphore = task
    try:
      function(*args, **kwargs)
    except Exception:
      print('Caught exception in thread.')
      print(traceback.format_exc())
      # Continue to process tasks in queue, in case every thread fails, the
      # main thread will be waiting forever.
      continue
    finally:
      # Signal one task is done in case of exception.
      result_semaphore.release()
| |
| |
def RunTasks(tasks):  # pragma: no cover
  """Runs given tasks on a shared daemon worker-thread pool.

  Not thread-safe: no concurrent calls of this function.

  Blocks until all tasks have completed. A task is a dict as below:
  {
    'function': the function to call,
    'args': the positional arguments to pass to the function (default: []),
    'kwargs': the key-value arguments to pass to the function (default: {}),
  }
  """
  if not tasks:
    return

  global TASK_QUEUE
  if not TASK_QUEUE:
    # Lazily create the queue and worker pool on first use; the daemon
    # workers live until process exit (see SignalWorkerThreads).
    TASK_QUEUE = queue.Queue()
    for index in range(MAX_THREAD_NUMBER):
      thread = threading.Thread(target=Worker, name='worker_%s' % index)
      # Set as daemon, so no join is needed.
      thread.daemon = True
      thread.start()

  result_semaphore = threading.Semaphore(0)
  # Push tasks to the queue for execution.
  for task in tasks:
    TASK_QUEUE.put((task['function'], task.get('args', []),
                    task.get('kwargs', {}), result_semaphore))

  # Wait until all tasks are executed; Worker releases the semaphore once
  # per task (even when the task raised).
  for _ in tasks:
    result_semaphore.acquire()
| |
| |
def GetCommandOutput(command):
  """Runs ``command`` in a shell and returns its stdout, with local caching.

  Raises:
    Exception: if the command exits with a non-zero return code.
  """
  SetUpSystemPaths()

  # The lib is in predator/ root dir, and can be imported only when sys.path
  # gets set up.
  from libs.cache_decorator import Cached
  from .local_cache import LocalCache  # pylint: disable=W

  @Cached(LocalCache(), namespace='Command-output')
  def CachedGetCommandOutput(command):
    process = subprocess.Popen(
        command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
    stdoutdata, stderrdata = process.communicate()

    if process.returncode != 0:
      raise Exception('Error running command %s: %s' % (command, stderrdata))

    return stdoutdata

  return CachedGetCommandOutput(command)
| |
| |
def GetLockedMethod(cls, method_name, lock):  # pragma: no cover
  """Returns a class/object method serialized with lock."""
  unlocked_method = getattr(cls, method_name)

  def LockedMethod(cls, *args, **kwargs):  # pylint: disable=W
    # Serialize every invocation through the shared lock.
    with lock:
      return unlocked_method(*args, **kwargs)

  return functools.partial(LockedMethod, cls)
| |
| |
| # TODO(katesonia): Move this to gae_libs. |
| # TODO(crbug.com/662540): Add unittests. |
def GetFilterQuery(query,
                   time_property,
                   start_date,
                   end_date,
                   property_values=None,
                   datetime_pattern='%Y-%m-%d'):  # pragma: no cover.
  """Builds a query restricted by property values and a time range.

  Two kinds of filters are applied:
  (1) A time range filter on ``time_property``, bounded by ``start_date``
  (inclusive) and ``end_date`` (exclusive). Note, the format of
  ``start_date`` and ``end_date`` should be consistent with
  ``datetime_pattern``.
  (2) Per-property filters from ``property_values``: membership for list
  values, equality otherwise.
  """
  range_begin = datetime.strptime(start_date, datetime_pattern)
  range_end = datetime.strptime(end_date, datetime_pattern)

  for cls_property, value in (property_values or {}).items():
    if isinstance(value, list):
      query = query.filter(cls_property.IN(value))
    else:
      query = query.filter(cls_property == value)

  query = query.filter(time_property >= range_begin)
  return query.filter(time_property < range_end)
| |
| |
| # TODO(crbug.com/662540): Add unittests. |
def EnsureDirExists(path):  # pragma: no cover
  """Creates the parent directory of ``path`` if it does not exist yet.

  Note ``path`` is a file path: only its dirname is created, not ``path``
  itself. Tolerates a concurrent creator: a directory that appears between
  the makedirs attempt and the error check is not treated as a failure.
  """
  directory = os.path.dirname(path)
  # Nothing to create for a bare filename or an already-existing directory.
  if not directory or os.path.exists(directory):
    return

  try:
    os.makedirs(directory)
  except OSError:
    # Another process/thread may have created the directory meanwhile;
    # only re-raise when it still does not exist.
    if not os.path.exists(directory):
      raise
| |
| |
| # TODO(crbug.com/662540): Add unittests. |
def FlushResult(result, result_path, serializer=pickle,
                print_path=False):  # pragma: no cover
  """Serializes ``result`` to ``result_path``, creating parent dirs as needed.

  Args:
    result: The object to serialize.
    result_path: Destination file path.
    serializer: Any module/object with a pickle-style ``dump(obj, file)``.
    print_path: If True, echo the destination path to stdout first.
  """
  if print_path:
    print('\nFlushing results to', result_path)

  EnsureDirExists(result_path)
  with open(result_path, 'wb') as result_file:
    serializer.dump(result, result_file)
| |
| |
| # TODO(crbug.com/662540): Add unittests. |
def IsGitHash(revision):  # pragma: no cover
  """Returns truthy if ``revision`` is a full git hash or 'master'.

  ``revision`` is stringified first, so non-string inputs (e.g. ints) are
  handled consistently instead of crashing on ``.lower()``.
  """
  revision_str = str(revision)
  return (GIT_HASH_PATTERN.match(revision_str) or
          revision_str.lower() == 'master')
| |
| |
| # TODO(crbug.com/662540): Add unittests. |
def ParseGitHash(revision, repo_path='.'):  # pragma: no cover
  """Gets git hash of a revision.

  Args:
    revision: A git hash, 'master', or anything ``git rev-parse`` accepts
      (e.g. 'HEAD', 'HEAD~3').
    repo_path: Path of the git repository to resolve the revision in.

  Returns:
    ``revision`` itself if it already looks like a git hash, the resolved
    hash string otherwise, or None when resolution fails.
  """
  if IsGitHash(revision):
    return revision

  try:
    # Can parse revision like 'HEAD', 'HEAD~3'. Pass an argument list plus
    # ``cwd`` rather than interpolating into a shell string, so neither
    # ``revision`` nor ``repo_path`` is interpreted by the shell.
    output = subprocess.check_output(
        ['git', 'rev-parse', revision], cwd=repo_path)
    if not isinstance(output, str):
      # On python 3, check_output returns bytes; decode before string ops
      # (the old ``.replace`` on bytes raised TypeError and yielded None).
      output = output.decode('utf-8')
    return output.replace('\n', '')
  except Exception:  # pylint: disable=W
    logging.error('Failed to parse git hash for %s\nStacktrace:\n%s', revision,
                  traceback.format_exc())
    return None