blob: ef0e4b47bbad2ad98bf8389fe3765c30e3113c67 [file] [log] [blame]
#!/usr/bin/env python
# Copyright (c) 2012 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Issues sharded slavekill, delete build directory, and reboot commands."""
import multiprocessing
import optparse
import os
import subprocess
import sys
sys.path.append(os.path.join(os.path.dirname(__file__), '..'))
from common import chromium_utils
from master import slaves_list
def get_masters(parser, options):
"""Given parser options, find suitable master directories."""
paths = []
masters_path = chromium_utils.ListMasters()
# Populates by defaults with every masters with a twistd.pid, thus has
# been started.
if not options.master:
for m_p in masters_path:
if os.path.isfile(os.path.join(m_p, 'twistd.pid')):
paths.append(m_p)
elif options.master == 'all':
paths.extend(masters_path)
elif options.master in (os.path.basename(p) for p in masters_path):
full_master = next(
p for p in masters_path if os.path.basename(p) == options.master)
paths.append(full_master)
else:
parser.error('Unknown master \'%s\'.\nChoices are:\n %s' % (
options.master, '\n '.join((
os.path.basename(p) for p in masters_path))))
return paths
def get_slaves(master_paths, slavelist):
"""Return slaves split up by OS.
Takes a list of master paths and an optional slave whitelist."""
slavedict = {}
for path in master_paths:
for slave in chromium_utils.GetSlavesFromMasterPath(path):
if 'hostname' in slave:
slavedict[slave['hostname']] = slave
slaves = slaves_list.BaseSlavesList(slavedict.values())
def F(os_type):
out = slaves.GetSlaves(os=os_type)
named_slaves = [s.get('hostname') for s in out]
if slavelist:
return [s for s in named_slaves if s in slavelist]
else:
return named_slaves
slave_dict = {}
slave_dict['win'] = list(set(F('win')))
slave_dict['linux'] = list(set(F('linux')))
slave_dict['mac'] = list(set(F('mac')))
return slave_dict
def get_commands(slaves):
"""Depending on OS, yield the proper nuke-and-pave command sequence."""
commands = {}
for slave in slaves['win']:
def cmd(command):
return 'cmd.exe /c "%s"' % command
def cygwin(command):
return 'c:\\cygwin\\bin\\bash --login -c "%s"' % (
command.replace('"', '\\"'))
commands[slave] = [
cmd('taskkill /IM python.exe /F'),
cygwin('sleep 3'),
cygwin('rm -r -f /cygdrive/e/b/build/slave/*/build'),
cmd('shutdown -r -f -t 1'),
]
for slave in slaves['mac'] + slaves['linux']:
commands[slave] = [
'make -C /b/build/slave stop',
'sleep 3',
'rm -rf /b/build/slave/*/build',
'sudo shutdown -r now',
]
return commands
def status_writer(queue):
# Send None to kill the status writer.
msg = queue.get()
while msg:
print '\n'.join(msg)
msg = queue.get()
def stdout_writer(queue):
# Send None to kill the stdout writer.
slave = queue.get()
while slave:
print '%s: finished' % slave
slave = queue.get()
def journal_writer(filename, queue):
# Send None to kill the journal writer.
with open(filename, 'a') as f:
slave = queue.get()
while slave:
# pylint: disable=C0323
print >>f, slave
slave = queue.get()
def shard_slaves(slaves, max_per_shard):
"""Shart slaves with no more than max_per_shard in each shard."""
shards = []
for i in xrange(0, len(slaves), max_per_shard):
shards.append(list(slaves.iteritems())[i:i+max_per_shard])
return shards
def run_ssh_command(slavepair, worklog, status, errorlog, options):
"""Execute an ssh command as chrome-bot."""
slave, commands = slavepair
needs_connect = slave.endswith('-c4')
if options.corp:
slave = slave + '.chrome'
if needs_connect:
ssh = ['connect', slave, '-r']
else:
identity = ['chrome-bot@%s' % slave]
ssh = ['ssh', '-o ConnectTimeout=5'] + identity
if options.dry_run:
for command in commands:
status.put(['%s: %s' % (slave, command)])
return
retcode = 0
for command in commands:
status.put(['%s: %s' % (slave, command)])
retcode = subprocess.call(ssh + [command])
if options.verbose:
status.put(['%s: previous command returned code %d' % (slave, retcode)])
if retcode != 0 and command != commands[0]: # Don't fail on slavekill.
break
if retcode == 0:
worklog.put(slave)
else:
errorlog.put(slave)
class Worker(object):
def __init__(self, out_queue, status, errorlog, options):
self.out_queue = out_queue
self.status = status
self.options = options
self.errorlog = errorlog
def __call__(self, slave):
run_ssh_command(slave, self.out_queue, self.status, self.errorlog,
self.options)
def main():
usage = '%prog [options]'
parser = optparse.OptionParser(usage=usage)
parser.add_option('--master',
help=('Master to use to load the slaves list. If omitted, all masters '
'that were started at least once are included. If \'all\', all '
'masters are selected.'))
parser.add_option('--slavelist',
help=('List of slaves to contact, separated by newlines.'))
parser.add_option('--max-per-shard', default=50,
help=('Each shard has no more than max-per-shard slaves.'))
parser.add_option('--max-connections', default=16,
help=('Maximum concurrent SSH sessions.'))
parser.add_option('--journal',
help=('Log completed slaves to a journal file, skipping them'
'on the next run.'))
parser.add_option('--errorlog',
help='Log failed slaves to a file instead out stdout.')
parser.add_option('--dry-run', action='store_true',
help='Don\'t execute commands, only print them.')
parser.add_option('--corp', action='store_true',
help='Connect to bots within the corp network.')
parser.add_option('-v', '--verbose', action='store_true')
options, _ = parser.parse_args(sys.argv)
masters = get_masters(parser, options)
if options.verbose:
print 'reading from:'
for master in masters:
print ' ', master
slavelist = []
if options.slavelist:
with open(options.slavelist) as f:
slavelist = [s.strip() for s in f.readlines()]
slaves = get_slaves(masters, slavelist)
if options.verbose and options.slavelist:
wanted_slaves = set(slavelist)
got_slaves = set()
for _, s in slaves.iteritems():
got_slaves.update(s)
diff = wanted_slaves - got_slaves
if diff:
print 'Following slaves are not on selected masters:'
for s in diff:
print ' ', s
if options.journal and os.path.exists(options.journal):
skipped = set()
with open(options.journal) as f:
finished_slaves = set([s.strip() for s in f.readlines()])
for os_type in slaves:
skipped.update(set(slaves[os_type]) & finished_slaves)
slaves[os_type] = list(set(slaves[os_type]) - finished_slaves)
if options.verbose:
print 'Following slaves have already been processed:'
for s in skipped:
print ' ', s
commands = get_commands(slaves)
shards = shard_slaves(commands, options.max_per_shard)
pool = multiprocessing.Pool(processes=options.max_connections)
m = multiprocessing.Manager()
worklog = m.Queue()
status = m.Queue()
errors = m.Queue()
# Set up the worklog and status writers.
if options.journal:
p = multiprocessing.Process(target=journal_writer,
args=(options.journal, worklog))
else:
p = multiprocessing.Process(target=stdout_writer, args=(worklog,))
s = multiprocessing.Process(target=status_writer, args=(status,))
p.start()
s.start()
# Execute commands.
for shard in shards:
if options.verbose:
print 'Starting next shard with slaves:'
for slave in shard:
print ' ', slave
pool.map_async(Worker(worklog, status, errors, options), shard).get(9999999)
raw_input('Shard finished, press enter to continue...')
# Clean up the worklog and status writers.
worklog.put(None) # Signal worklog writer to stop.
status.put(None) # Signal status writer to stop.
p.join()
s.join()
# Print out errors.
error_list = []
errors.put(None) # Signal end of error list.
e = errors.get()
while e:
error_list.append(e)
e = errors.get()
if error_list:
if options.errorlog:
with open(options.errorlog, 'w') as f:
for error in error_list:
# pylint: disable=C0323
print >>f, error
else:
print 'Following slaves had errors:'
for error in error_list:
print ' ', error
return 0
if __name__ == '__main__':
sys.exit(main())