blob: d1589944cb9d10ed7448d65610052da3a30b4eb4 [file] [log] [blame]
#!/usr/bin/env python
# Copyright 2014 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.
"""Automated maintenance tool to run a script on bots.
To use this script, write a self-contained python script (use a .zip if
necessary), specify it on the command line and it will be packaged and triggered
on all the swarming bots corresponding to the --dimension filters specified, or
all the bots if no filter is specified.
"""
__version__ = '0.2'
import json
import os
import shutil
import string
import subprocess
import sys
import tempfile
import threading
CLIENT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(
__file__.decode(sys.getfilesystemencoding()))))
sys.path.insert(0, CLIENT_DIR)
from utils import tools
tools.force_local_third_party()
# third_party/
import colorama
from chromium import natsort
from depot_tools import fix_encoding
# pylint: disable=ungrouped-imports
import parallel_execution
from utils import file_path
from utils import tools
def get_bot_list(swarming_server, dimensions):
"""Returns a list of swarming bots: health, quarantined, dead."""
q = '&'.join(
'dimensions=%s:%s' % (k, v) for k, v in sorted(dimensions.iteritems()))
cmd = [
sys.executable, 'swarming.py', 'query',
'--swarming', swarming_server,
'--limit', '0',
'bots/list?' + q,
]
healthy = []
quarantined = []
dead = []
results = json.loads(subprocess.check_output(cmd, cwd=CLIENT_DIR))
if not results.get('items'):
return (), (), ()
for b in results['items']:
if b['is_dead']:
dead.append(b['bot_id'])
elif b['quarantined']:
quarantined.append(b['bot_id'])
else:
healthy.append(b['bot_id'])
return natsort.natsorted(healthy), quarantined, dead
def archive(isolate_server, script):
"""Archives the tool and return the sha-1."""
base_script = os.path.basename(script)
isolate = {
'variables': {
'command': ['python', base_script],
'files': [base_script],
},
}
tempdir = tempfile.mkdtemp(prefix=u'run_on_bots')
try:
isolate_file = os.path.join(tempdir, 'tool.isolate')
isolated_file = os.path.join(tempdir, 'tool.isolated')
with open(isolate_file, 'wb') as f:
f.write(str(isolate))
shutil.copyfile(script, os.path.join(tempdir, base_script))
cmd = [
sys.executable, 'isolate.py', 'archive',
'--isolate-server', isolate_server,
'-i', isolate_file,
'-s', isolated_file,
]
return subprocess.check_output(cmd, cwd=CLIENT_DIR).split()[0]
finally:
file_path.rmtree(tempdir)
def batched_subprocess(cmd, sem):
def run(cmd, sem):
subprocess.call(cmd, cwd=CLIENT_DIR)
sem.release()
sem.acquire()
thread = threading.Thread(target=run, args=(cmd, sem))
thread.start()
return thread
def run_batches(
swarming_server, isolate_server, dimensions, tags, env, priority, deadline,
batches, repeat, isolated_hash, name, bots, args):
"""Runs the task |batches| at a time.
This will be mainly bound by task scheduling latency, especially if the bots
are busy and the priority is low.
"""
sem = threading.Semaphore(batches)
threads = []
for i in xrange(repeat):
for bot in bots:
suffix = '/%d' % i if repeat > 1 else ''
task_name = parallel_execution.task_to_name(
name, {'id': bot}, isolated_hash) + suffix
cmd = [
sys.executable, 'swarming.py', 'run',
'--swarming', swarming_server,
'--isolate-server', isolate_server,
'--priority', priority,
'--deadline', deadline,
'--dimension', 'id', bot,
'--task-name', task_name,
'-s', isolated_hash,
]
for k, v in sorted(dimensions.iteritems()):
cmd.extend(('-d', k, v))
for t in sorted(tags):
cmd.extend(('--tags', t))
for k, v in env:
cmd.extend(('--env', k, v))
if args:
cmd.append('--')
cmd.extend(args)
threads.append(batched_subprocess(cmd, sem))
for t in threads:
t.join()
def run_serial(
swarming_server, isolate_server, dimensions, tags, env, priority, deadline,
repeat, isolated_hash, name, bots, args):
"""Runs the task one at a time.
This will be mainly bound by task scheduling latency, especially if the bots
are busy and the priority is low.
"""
result = 0
for i in xrange(repeat):
for bot in bots:
suffix = '/%d' % i if repeat > 1 else ''
task_name = parallel_execution.task_to_name(
name, {'id': bot}, isolated_hash) + suffix
cmd = [
sys.executable, 'swarming.py', 'run',
'--swarming', swarming_server,
'--isolate-server', isolate_server,
'--priority', priority,
'--deadline', deadline,
'--dimension', 'id', bot,
'--task-name', task_name,
'-s', isolated_hash,
]
for k, v in sorted(dimensions.iteritems()):
cmd.extend(('-d', k, v))
for t in sorted(tags):
cmd.extend(('--tags', t))
for k, v in env:
cmd.extend(('--env', k, v))
if args:
cmd.append('--')
cmd.extend(args)
r = subprocess.call(cmd, cwd=CLIENT_DIR)
result = max(r, result)
return result
def run_parallel(
swarming_server, isolate_server, dimensions, env, priority, deadline,
repeat, isolated_hash, name, bots, args):
tasks = []
for i in xrange(repeat):
suffix = '/%d' % i if repeat > 1 else ''
for bot in bots:
d = {'id': bot}
tname = parallel_execution.task_to_name(name, d, isolated_hash) + suffix
d.update(dimensions)
tasks.append((tname, isolated_hash, d, env))
extra_args = ['--priority', priority, '--deadline', deadline]
extra_args.extend(args)
print('Using priority %s' % priority)
for failed_task in parallel_execution.run_swarming_tasks_parallel(
swarming_server, isolate_server, extra_args, tasks):
_name, dimensions, stdout = failed_task
print('%sFailure: %s%s\n%s' % (
colorama.Fore.RED, dimensions, colorama.Fore.RESET, stdout))
def main():
parser = parallel_execution.OptionParser(
usage='%prog [options] (script.py|isolated hash) '
'-- [script.py arguments]',
version=__version__)
parser.add_option(
'--serial', action='store_true',
help='Runs the task serially, to be used when debugging problems since '
'it\'s slow')
parser.add_option(
'--batches', type='int', default=0,
help='Runs a task in parallel |batches| at a time.')
parser.add_option(
'--tags', action='append', default=[], metavar='FOO:BAR',
help='Tags to assign to the task.')
parser.add_option(
'--repeat', type='int', default=1,
help='Runs the task multiple time on each bot, meant to be used as a '
'load test')
parser.add_option(
'--name',
help='Name to use when providing an isolated hash')
options, args = parser.parse_args()
if len(args) < 1:
parser.error(
'Must pass one python script to run. Use --help for more details')
if not options.priority:
parser.error(
'Please provide the --priority option. Either use a very low number\n'
'so the task completes as fast as possible, or an high number so the\n'
'task only runs when the bot is idle.')
# 1. Archive the script to run.
if not os.path.exists(args[0]):
if not options.name:
parser.error(
'Please provide --name when using an isolated hash.')
if len(args[0]) not in (40, 64):
parser.error(
'Hash wrong length %d (%r)' % (len(args.hash), args[0]))
for i, c in enumerate(args[0]):
if c not in string.hexdigits:
parser.error(
'Hash character invalid\n'
' %s\n' % args[0] +
' '+'-'*i+'^\n'
)
isolated_hash = args[0]
name = options.name
else:
isolated_hash = archive(options.isolate_server, args[0])
name = os.path.basename(args[0])
print('Running %s' % isolated_hash)
# 2. Query the bots list.
bots, quarantined_bots, dead_bots = get_bot_list(
options.swarming, options.dimensions)
print('Found %d bots to process' % len(bots))
if quarantined_bots:
print('Warning: found %d quarantined bots' % len(quarantined_bots))
if dead_bots:
print('Warning: found %d dead bots' % len(dead_bots))
if not bots:
return 1
# 3. Trigger the tasks.
if options.batches > 0:
return run_batches(
options.swarming,
options.isolate_server,
options.dimensions,
options.tags,
options.env,
str(options.priority),
str(options.deadline),
options.batches,
options.repeat,
isolated_hash,
name,
bots,
args[1:])
if options.serial:
return run_serial(
options.swarming,
options.isolate_server,
options.dimensions,
options.tags,
options.env,
str(options.priority),
str(options.deadline),
options.repeat,
isolated_hash,
name,
bots,
args[1:])
return run_parallel(
options.swarming,
options.isolate_server,
options.dimensions,
options.env,
str(options.priority),
str(options.deadline),
options.repeat,
isolated_hash,
name,
bots,
args[1:])
if __name__ == '__main__':
fix_encoding.fix_encoding()
tools.disable_buffering()
colorama.init()
sys.exit(main())