#!/usr/bin/env python
# Copyright 2013 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.
"""Triggers a ton of fake jobs to test its handling under high load.
Generates an histogram with the latencies to process the tasks and number of
retries.
"""
import hashlib
import json
import logging
import optparse
import os
import Queue
import socket
import StringIO
import sys
import threading
import time
import zipfile
CLIENT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(
__file__.decode(sys.getfilesystemencoding()))))
sys.path.insert(0, CLIENT_DIR)
from utils import tools
tools.force_local_third_party()
# third_party/
import colorama
# pylint: disable=ungrouped-imports
import swarming
from utils import graph
from utils import net
from utils import threading_utils
OS_NAME = 'Comodore64'
TASK_OUTPUT = 'This task ran with great success'
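# Pretty-prints the collected results: a histogram of per-task completion
# latencies (floats), then a histogram of the non-latency events (poll
# failures, retries, etc.) that were recorded as strings.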
def print_results(results, columns, buckets):
delays = [i for i in results if isinstance(i, float)]
failures = [i for i in results if not isinstance(i, float)]
print('%sDELAYS%s:' % (colorama.Fore.RED, colorama.Fore.RESET))
graph.print_histogram(
graph.generate_histogram(delays, buckets), columns, ' %.3f')
print('')
print('Total items : %d' % len(results))
average = 0
if delays:
average = sum(delays) / len(delays)
print('Average delay: %s' % graph.to_units(average))
print('')
if failures:
print('%sEVENTS%s:' % (colorama.Fore.RED, colorama.Fore.RESET))
values = {}
for f in failures:
values.setdefault(f, 0)
values[f] += 1
graph.print_histogram(values, columns, ' %s')
print('')
def generate_version(source):
"""Generates the sha-1 based on the content of this zip.
Copied from ../utils/zip_package.py.
"""
h = hashlib.sha1()
with zipfile.ZipFile(source, 'r') as z:
for name in sorted(z.namelist()):
with z.open(name) as f:
h.update(str(len(name)))
h.update(name)
content = f.read()
h.update(str(len(content)))
h.update(content)
return h.hexdigest()
def calculate_version(url):
"""Retrieves the swarm_bot code and returns the SHA-1 for it."""
# Cannot use url_open() since zipfile requires .seek().
return generate_version(StringIO.StringIO(net.url_read(url)))
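# Short hostname, i.e. the FQDN truncated at the first dot; it is used as the
# prefix for the fake bot ids.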
def get_hostname():
return socket.getfqdn().lower().split('.', 1)[0]
class FakeSwarmBot(object):
"""This is a Fake swarm_bot implementation simulating it is running
Comodore64.
It polls for job, acts as if it was processing them and return the fake
result.
"""
def __init__(
self, swarming_url, dimensions, swarm_bot_version_hash, hostname, index,
progress, duration, events, kill_event):
self._lock = threading.Lock()
self._swarming = swarming_url
self._index = index
self._progress = progress
self._duration = duration
self._events = events
self._kill_event = kill_event
self._bot_id = '%s-%d' % (hostname, index)
self._attributes = {
'dimensions': dimensions,
'id': self._bot_id,
# TODO(maruel): Use os_utilities.py.
'ip': '127.0.0.1',
'try_count': 0,
'version': swarm_bot_version_hash,
}
self._thread = threading.Thread(target=self._run, name='bot%d' % index)
self._thread.daemon = True
self._thread.start()
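# The polling loop runs on the daemon thread started above; join() and
# is_alive() simply proxy to it.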
def join(self):
self._thread.join()
def is_alive(self):
return self._thread.is_alive()
def _run(self):
"""Polls the server and fake execution."""
try:
self._progress.update_item('%d alive' % self._index, bots=1)
while True:
if self._kill_event.is_set():
return
data = {'attributes': json.dumps(self._attributes)}
request = net.url_read(self._swarming + '/poll_for_test', data=data)
if request is None:
self._events.put('poll_for_test_empty')
continue
start = time.time()
try:
manifest = json.loads(request)
except ValueError:
self._progress.update_item('Failed to poll')
self._events.put('poll_for_test_invalid')
continue
commands = [c['function'] for c in manifest.get('commands', [])]
if not commands:
# Nothing to run.
self._events.put('sleep')
time.sleep(manifest['come_back'])
continue
if commands == ['UpdateSlave']:
# Calculate the proper SHA-1 and loop again.
# This could happen if the Swarming server is upgraded while this
# script runs.
self._attributes['version'] = calculate_version(
manifest['commands'][0]['args'])
self._events.put('update_slave')
continue
if commands != ['RunManifest']:
self._progress.update_item(
'Unexpected RPC call %s\n%s' % (commands, manifest))
self._events.put('unknown_rpc')
break
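# A RunManifest command carries the test_run document as a JSON string in
# 'args'; decode it and sanity check that it points back to the same
# result_url before faking the run.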
store_cmd = manifest['commands'][0]
if not isinstance(store_cmd['args'], unicode):
self._progress.update_item('Unexpected RPC manifest\n%s' % manifest)
self._events.put('unknown_args')
break
result_url = manifest['result_url']
test_run = json.loads(store_cmd['args'])
if result_url != test_run['result_url']:
self._progress.update_item(
'Unexpected result url: %s != %s' %
(result_url, test_run['result_url']))
self._events.put('invalid_result_url')
break
ping_url = test_run['ping_url']
ping_delay = test_run['ping_delay']
self._progress.update_item('%d processing' % self._index, processing=1)
# Fake activity and send pings as requested.
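# Sleep out the rest of the fake --consume duration, sending a keep-alive
# ping to ping_url first if more than ping_delay seconds remain.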
while True:
remaining = max(0, (start + self._duration) - time.time())
if remaining > ping_delay:
# Include empty data to ensure the request is a POST request.
result = net.url_read(ping_url, data={})
assert result == 'Success.', result
remaining = max(0, (start + self._duration) - time.time())
if not remaining:
break
time.sleep(remaining)
# In the old API, r=<task_id>&id=<bot_id> is passed as the url.
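# 'o' presumably carries the fake output and 'x' the exit code, mirroring the
# old result upload format.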
data = {
'o': TASK_OUTPUT,
'x': '0',
}
result = net.url_read(manifest['result_url'], data=data)
self._progress.update_item(
'%d processed' % self._index, processing=-1, processed=1)
if not result:
self._events.put('result_url_fail')
else:
assert result == 'Successfully update the runner results.', result
self._events.put(time.time() - start)
finally:
try:
# Unregister itself. Otherwise the server will have tons of fake bots
# that the admin will have to remove manually.
response = net.url_read(
self._swarming + '/delete_machine_stats',
data=[('r', self._bot_id)])
if response is None:
self._events.put('failed_unregister')
finally:
self._progress.update_item('%d quit' % self._index, bots=-1)
def main():
colorama.init()
parser = optparse.OptionParser(description=sys.modules[__name__].__doc__)
parser.add_option(
'-S', '--swarming',
metavar='URL', default='',
help='Swarming server to use')
parser.add_option(
'--suffix', metavar='NAME', default='', help='Bot suffix name to use')
swarming.add_filter_options(parser)
# Use improbable values to reduce the chance of interfering with real bots.
parser.set_defaults(
dimensions=[
('cpu', ['arm36']),
('hostname', socket.getfqdn()),
('os', OS_NAME),
])
group = optparse.OptionGroup(parser, 'Load generation')
group.add_option(
'--bots', type='int', default=300, metavar='N',
help='Number of swarming bots, default: %default')
group.add_option(
'-c', '--consume', type='float', default=60., metavar='N',
help='Duration (s) for consuming a request, default: %default')
parser.add_option_group(group)
group = optparse.OptionGroup(parser, 'Display options')
group.add_option(
'--columns', type='int', default=graph.get_console_width(), metavar='N',
help='Width in columns for histogram display, default: %default')
group.add_option(
'--buckets', type='int', default=20, metavar='N',
help='Number of buckets for histogram display, default: %default')
parser.add_option_group(group)
parser.add_option(
'--dump', metavar='FOO.JSON', help='Dumps to json file')
parser.add_option(
'-v', '--verbose', action='store_true', help='Enables logging')
options, args = parser.parse_args()
logging.basicConfig(level=logging.INFO if options.verbose else logging.FATAL)
if args:
parser.error('Unsupported args: %s' % args)
options.swarming = options.swarming.rstrip('/')
if not options.swarming:
parser.error('--swarming is required.')
if options.consume <= 0:
parser.error('Needs --consume > 0. 0.01 is a valid value.')
swarming.process_filter_options(parser, options)
print(
'Running %d bots, each task lasting %.1fs' % (
options.bots, options.consume))
print('Ctrl-C to exit.')
print('[processing/processed/bots]')
columns = [('processing', 0), ('processed', 0), ('bots', 0)]
progress = threading_utils.Progress(columns)
events = Queue.Queue()
start = time.time()
kill_event = threading.Event()
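# Fetch and hash the bot code currently served by the server so the fake bots
# advertise a matching version and are not immediately asked to self-update.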
swarm_bot_version_hash = calculate_version(options.swarming + '/bot_code')
hostname = get_hostname()
if options.suffix:
hostname += '-' + options.suffix
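# Spin up the requested number of fake bots; each one starts polling from its
# own daemon thread as soon as it is constructed.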
bots = [
FakeSwarmBot(
options.swarming, options.dimensions, swarm_bot_version_hash, hostname, i,
progress, options.consume, events, kill_event)
for i in range(options.bots)
]
try:
# Wait for all the bots to come alive.
while not all(s.is_alive() for s in bots):
time.sleep(0.01)
progress.update_item('Ready to run')
while bots:
progress.print_update()
time.sleep(0.01)
# The bots could be told to die.
bots = [s for s in bots if s.is_alive()]
except KeyboardInterrupt:
kill_event.set()
progress.update_item('Waiting for bots to quit.', raw=True)
progress.update_item('')
while bots:
progress.print_update()
bots = [s for s in bots if s.is_alive()]
# At this point, progress is not used anymore.
print('')
print('Ran for %.1fs.' % (time.time() - start))
print('')
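# The events queue holds floats (per-task latency) and strings (event names);
# print_results() splits them apart.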
results = list(events.queue)
print_results(results, options.columns, options.buckets)
if options.dump:
with open(options.dump, 'w') as f:
json.dump(results, f, separators=(',', ':'))
return 0
if __name__ == '__main__':
sys.exit(main())