#!/usr/bin/env python
# Copyright 2018 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
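
"""Polls a Swarming server until at least one full set of tasks has finished.

The --input-json file holds a JSON list of task-id lists; each inner list is
assumed to be the shards of one root task. The script repeatedly queries the
Swarming server (via swarming.py) for the states of the still-unfinished
tasks, sleeping with exponential backoff between rounds, and then writes
--output-json describing how many polling 'attempts' were made and which task
'sets' have fully finished.
"""
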
import argparse
import io
import json
import logging
import os
import subprocess
import sys
import tempfile
import time

# Did some testing, and it looks like the swarming server starts 400-ing when
# you request a URL which has about 320 task ids in it. Stay a bit under that
# to be safe.
TASK_BATCH_SIZE = 300


class TasksToCollect(object):
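  """Tracks which tasks, out of several sets of swarming tasks, have finished.

  task_sets is a list of lists of task ids. As a purely hypothetical example,
  with task_sets [['a1', 'a2'], ['b1']] and finished_tasks {'a1', 'b1'},
  unfinished_tasks is ['a2'] and finished_task_sets is [['b1']].
  """
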
  @classmethod
  def read_from_file(cls, filename):
    with open(filename) as f:
      input_data = json.load(f)
    return TasksToCollect(input_data)

  def __init__(self, input_data):
    self.task_sets = input_data
    self.finished_tasks = set()

  @property
  def unfinished_tasks(self):
    """Which tasks are unfinished.

    Returns a flat list of task ids."""
    tasks = []
    for sublist in self.task_sets:
      for item in sublist:
        if item not in self.finished_tasks:
          tasks.append(item)
    return sorted(tasks)

  @property
  def finished_task_sets(self):
    """Which task sets are ready to be collected."""
    finished = []
    for task_ids in self.task_sets:
      if task_ids and all(task in self.finished_tasks for task in task_ids):
        finished.append(task_ids)
    return finished

  @property
  def task_batches(self):
    """Slices the unfinished task ids into batches of at most TASK_BATCH_SIZE."""
    tasks = self.unfinished_tasks
    return [
        tasks[idx:idx + TASK_BATCH_SIZE]
        for idx in xrange(0, len(tasks), TASK_BATCH_SIZE)
    ]

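  # For a hypothetical batch ['deadbeef1', 'deadbeef2'], the URL built below is
  # 'tasks/get_states?task_id=deadbeef1&task_id=deadbeef2'.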
  def swarming_query_url(self, task_batch):
    """The swarming URL needed to collect task states."""
    return 'tasks/get_states?' + '&'.join(
        'task_id=%s' % task for task in task_batch)

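  # The query result is expected to look like {'states': [...]}, with one state
  # string (e.g. 'PENDING' or 'RUNNING') per task id in the queried batch.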
  def process_result(self, result, task_batch):
    """Handles the result of getting swarming task states for one batch."""
    assert len(result['states']) == len(task_batch)
    for task_id, state in zip(task_batch, result['states']):
      if state not in ('PENDING', 'RUNNING'):
        self.finished_tasks.add(task_id)


def main(argv):
  parser = argparse.ArgumentParser()
  parser.add_argument('--swarming-server', required=True)
  parser.add_argument('--swarming-py-path', required=True)
  parser.add_argument('--auth-service-account-json')
  parser.add_argument('--verbose', action='store_true')
  parser.add_argument('--output-json', required=True,
                      help='Where to output information about the results of '
                      'running this script. Will have two keys: \'attempts\', '
                      'which is the number of times we polled the swarming '
                      'server, and \'sets\', which is a list of finished '
                      'swarming task sets.')
  parser.add_argument('--attempts', default=0, type=int,
                      help='Number of times this script has tried to get'
                      ' results from the swarming server. Used to keep state'
                      ' across runs so as not to reset the exponential'
                      ' backoff.')
  parser.add_argument('--input-json', required=True,
                      help='List of sets of tasks. Each set of tasks is'
                      ' assumed to all be shards of the same root task.')
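  # A hypothetical --input-json payload: [["taskid1", "taskid2"], ["taskid3"]].
  # The matching --output-json could then be
  # {"attempts": 2, "sets": [["taskid3"]]}.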
  args = parser.parse_args(argv[1:])

  logging.basicConfig(level=logging.DEBUG if args.verbose else logging.ERROR)

  tasks = TasksToCollect.read_from_file(args.input_json)
  retcode, output_json = real_main(
      tasks, args.attempts, args.swarming_py_path, args.swarming_server,
      args.auth_service_account_json)

  if output_json:
    with open(args.output_json, 'w') as f:
      json.dump(output_json, f)

  return retcode


def real_main(tasks, attempts, swarming_py_path, swarming_server,
              auth_service_account_json):
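  """Polls the swarming server until at least one task set has finished.

  Returns a (return code, output dict) tuple; on success the dict contains the
  finished task 'sets' and the updated number of polling 'attempts', and on
  failure it is None.
  """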
  fd, tmpfile = tempfile.mkstemp()
  os.close(fd)
  try:
    while True:
      for task_batch in tasks.task_batches:
        url = tasks.swarming_query_url(task_batch)
        cmd = [
            sys.executable,
            swarming_py_path,
            'query',
            '-S', swarming_server,
            '--json=%s' % tmpfile
        ]
        if auth_service_account_json:
          cmd.extend([
              '--auth-service-account-json', auth_service_account_json])
        cmd.append(url)

        logging.info('get_states cmd: %s', ' '.join(cmd))
        get_states_result = subprocess.call(cmd)
        if get_states_result != 0:
          logging.warning(
              'get_states cmd had non-zero return code: %s', get_states_result)
          return 1, None

        with open(tmpfile) as f:
          tasks.process_result(json.load(f), task_batch)

      if tasks.finished_task_sets:
        break

      # Do exponential backoff.
      attempts += 1
      time_to_sleep_sec = 2 ** attempts
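      # As a worked example: the first retry sleeps 2 ** 1 = 2 seconds, the
      # second 4 seconds, and from the 7th attempt on (128 seconds and up) the
      # cap below applies.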
      # Cap the sleep time at 2 minutes. Waiting longer than that could start
      # to impact the actual cycle time of the builder; if we wait for 16
      # minutes, and (potentially) the final task finished one minute into that
      # sleep, we'd waste 15 minutes of time just sitting there. Ideally this
      # would be interrupt driven.
      time_to_sleep_sec = min(time_to_sleep_sec, 2 * 60)
      logging.info('sleeping for %d seconds', time_to_sleep_sec)
      time.sleep(time_to_sleep_sec)
  finally:
    if os.path.exists(tmpfile):
      os.unlink(tmpfile)

  return 0, {
      'sets': tasks.finished_task_sets,
      'attempts': attempts,
  }


if __name__ == '__main__':
  sys.exit(main(sys.argv))