# Copyright 2016 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import argparse
import json
import logging
import os
import random
import sys
import time

from googleapiclient import discovery
from oauth2client.client import GoogleCredentials

# NOTE: The parent directory needs to be first in sys.path to avoid conflicts
# with catapult modules that have colliding names, as catapult inserts itself
# into the path as the second element. This is an ugly and fragile hack.
_CLOUD_DIR = os.path.join(
    os.path.dirname(os.path.realpath(__file__)), os.pardir)
sys.path.insert(0, os.path.join(_CLOUD_DIR, os.pardir))
# Add _CLOUD_DIR to the path to access common code through the same path as the
# frontend.
sys.path.append(_CLOUD_DIR)

from common.clovis_task import ClovisTask
import common.google_bigquery_helper
from common.google_instance_helper import GoogleInstanceHelper
from clovis_task_handler import ClovisTaskHandler
from failure_database import FailureDatabase
from google_storage_accessor import GoogleStorageAccessor


class Worker(object):
  def __init__(self, config, logger):
    """See README.md for the config format."""
    self._project_name = config['project_name']
    self._taskqueue_tag = config['taskqueue_tag']
    self._src_path = config['src_path']
    self._instance_name = config.get('instance_name')
    self._worker_log_path = config.get('worker_log_path')
    self._credentials = GoogleCredentials.get_application_default()
    self._logger = logger
    self._self_destruct = config.get('self_destruct')
    if self._self_destruct and not self._instance_name:
      self._logger.error('Self destruction requires an instance name.')

    # Separate the task storage path into the bucket and the base path under
    # the bucket.
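    # For example, 'my-bucket/foo/bar' yields bucket 'my-bucket' and base path
    # 'foo/bar/'.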
    storage_path_components = config['task_storage_path'].split('/')
    self._bucket_name = storage_path_components[0]
    self._base_path_in_bucket = ''
    if len(storage_path_components) > 1:
      self._base_path_in_bucket = '/'.join(storage_path_components[1:])
      if not self._base_path_in_bucket.endswith('/'):
        self._base_path_in_bucket += '/'

    self._google_storage_accessor = GoogleStorageAccessor(
        credentials=self._credentials, project_name=self._project_name,
        bucket_name=self._bucket_name)

    if self._instance_name:
      failure_database_filename = \
          'failure_database_%s.json' % self._instance_name
    else:
      failure_database_filename = 'failure_database.json'
    self._failure_database_path = os.path.join(
        self._base_path_in_bucket, failure_database_filename)

    # Recover any existing failures in case the worker died.
    self._failure_database = self._GetFailureDatabase()
    if self._failure_database.ToJsonDict():
      # Script is restarting after a crash, or there are already files from a
      # previous run in the directory.
      self._failure_database.AddFailure(
          FailureDatabase.DIRTY_STATE_ERROR, 'failure_database')

    bigquery_service = common.google_bigquery_helper.GetBigQueryService(
        self._credentials)
    self._clovis_task_handler = ClovisTaskHandler(
        self._project_name, self._base_path_in_bucket, self._failure_database,
        self._google_storage_accessor, bigquery_service,
        config['binaries_path'], config['ad_rules_filename'],
        config['tracking_rules_filename'], self._logger, self._instance_name)

    self._UploadFailureDatabase()

  def Start(self):
    """Main worker loop.

    Repeatedly pulls tasks from the task queue and processes them. Returns when
    the queue is empty.
    """
    task_api = discovery.build(
        'taskqueue', 'v1beta2', credentials=self._credentials)
    queue_name = 'clovis-queue'
    # Workaround for
    # https://code.google.com/p/googleappengine/issues/detail?id=10199
    project = 's~' + self._project_name

    while True:
      self._logger.debug('Fetching new task.')
      (clovis_task, task_id) = self._FetchClovisTask(
          project, task_api, queue_name)
      if not clovis_task:
        break
      self._logger.info('Processing task %s' % task_id)
      self._clovis_task_handler.Run(clovis_task)
      self._UploadFailureDatabase()
      self._logger.debug('Deleting task %s' % task_id)
      task_api.tasks().delete(
          project=project, taskqueue=queue_name, task=task_id).execute()
      self._logger.info('Finished task %s' % task_id)

    self._Finalize()

  def _GetFailureDatabase(self):
    """Downloads the failure database from CloudStorage."""
    self._logger.info('Downloading failure database')
    failure_database_string = self._google_storage_accessor.DownloadAsString(
        self._failure_database_path)
    return FailureDatabase(failure_database_string)

  def _UploadFailureDatabase(self):
    """Uploads the failure database to CloudStorage."""
    if not self._failure_database.is_dirty:
      return
    self._logger.info('Uploading failure database')
    self._google_storage_accessor.UploadString(
        self._failure_database.ToJsonString(), self._failure_database_path)
    self._failure_database.is_dirty = False

  def _FetchClovisTask(self, project_name, task_api, queue_name):
    """Fetches a ClovisTask from the task queue.

    Params:
      project_name(str): The name of the Google Cloud project.
      task_api: The TaskQueue service.
      queue_name(str): The name of the task queue.

    Returns:
      (ClovisTask, str): The fetched ClovisTask and its task ID, or
        (None, None) if no tasks are found.
    """
    response = task_api.tasks().lease(
        project=project_name, taskqueue=queue_name, numTasks=1, leaseSecs=600,
        groupByTag=True, tag=self._taskqueue_tag).execute()
    if (not response.get('items')) or (len(response['items']) < 1):
      return (None, None)  # The task queue is empty.
    google_task = response['items'][0]
    task_id = google_task['id']

    # Delete the task without processing if it already failed multiple times.
    # TODO(droger): This is a workaround for internal bug b/28442122, revisit
    # once it is fixed.
    retry_count = google_task['retry_count']
    max_retry_count = 3
    skip_task = retry_count >= max_retry_count
    if skip_task:
      task_api.tasks().delete(
          project=project_name, taskqueue=queue_name, task=task_id).execute()

    clovis_task = ClovisTask.FromBase64(google_task['payloadBase64'])
    if retry_count > 0:
      self._failure_database.AddFailure(
          'task_queue_retry', clovis_task.ToJsonString())
      self._UploadFailureDatabase()

    if skip_task:
      return self._FetchClovisTask(project_name, task_api, queue_name)

    return (clovis_task, task_id)

  def _Finalize(self):
    """Called before exiting."""
    self._logger.info('Done')
    self._clovis_task_handler.Finalize()

    # Upload the worker log.
    if self._worker_log_path:
      self._logger.info('Uploading worker log.')
      remote_log_path = os.path.join(self._base_path_in_bucket, 'worker_log')
      if self._instance_name:
        remote_log_path += '_' + self._instance_name
      self._google_storage_accessor.UploadFile(
          self._worker_log_path, remote_log_path)

    # Self destruct.
    if self._self_destruct:
      # Workaround for ComputeEngine internal bug b/28760288.
      random_delay = random.random() * 600.0  # Up to 10 minutes.
      self._logger.info(
          'Wait %.0fs to avoid load spikes on compute engine.' % random_delay)
      time.sleep(random_delay)
      self._logger.info(
          'Starting instance destruction: ' + self._instance_name)
      google_instance_helper = GoogleInstanceHelper(
          self._credentials, self._project_name, self._logger)
      success = google_instance_helper.DeleteInstance(
          self._taskqueue_tag, self._instance_name)
      if not success:
        self._logger.error('Self destruction failed.')
    # Do not add anything after this line, as the instance might be killed at
    # any time.


if __name__ == '__main__':
  parser = argparse.ArgumentParser(
      description='ComputeEngine Worker for Clovis')
  parser.add_argument(
      '--config', required=True, help='Path to the configuration file.')
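  # Example invocation (script name and paths are illustrative):
  #   python worker.py --config /path/to/config.json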
  args = parser.parse_args()

  # Configure logging.
  logging.basicConfig(
      level=logging.WARNING,
      format='[%(asctime)s][%(levelname)s] %(message)s',
      datefmt='%y-%m-%d %H:%M:%S')
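  # Emit log timestamps in UTC.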
  logging.Formatter.converter = time.gmtime
  worker_logger = logging.getLogger('worker')
  worker_logger.setLevel(logging.INFO)

  worker_logger.info('Reading configuration')
  with open(args.config) as config_json:
    worker = Worker(json.load(config_json), worker_logger)
    worker.Start()