#!/usr/bin/env python
# Copyright 2013 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.
"""Triggers a ton of fake jobs to test its handling under high load.
Generates an histogram with the latencies to process the tasks and number of
retries.
"""
import json
import logging
import optparse
import os
import random
import re
import string
import sys
import time
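
# Bootstrap sys.path so that the client directory (two levels up from this
# file) is importable; utils/ and swarming.py live there.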
CLIENT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(
    __file__.decode(sys.getfilesystemencoding()))))
sys.path.insert(0, CLIENT_DIR)

from utils import tools
tools.force_local_third_party()

# third_party/
import colorama

# pylint: disable=ungrouped-imports
import swarming
from utils import graph
from utils import net
from utils import threading_utils

import swarming_load_test_bot


# Number of seconds to subtract from the user-provided timeout when setting
# the task deadline on the Swarming side, leaving the client time to collect
# results.
TIMEOUT_OVERHEAD = 10


def print_results(results, columns, buckets):
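  # Each item in |results| is either a float (the total latency of a
  # successful task, in seconds) or a string naming the failure mode.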
  delays = [i for i in results if isinstance(i, float)]
  failures = [i for i in results if not isinstance(i, float)]

  graph.print_histogram(
      graph.generate_histogram(delays, buckets), columns, '%5.3f')
  print('')
  print('Total items : %d' % len(results))
  average = 0
  if delays:
    average = sum(delays) / len(delays)
  print('Average delay: %.2fs' % average)
  #print('Average overhead: %s' % graph.to_units(total_size / len(sizes)))
  print('')
  if failures:
    print('')
    print('%sFAILURES%s:' % (colorama.Fore.RED, colorama.Fore.RESET))
    print('\n'.join(' %s' % i for i in failures))


def trigger_task(
    swarming_url, dimensions, sleep_time, output_size, progress,
    unique, timeout, index):
  """Triggers a Swarming job and collects results.

  Returns the total amount of time to run a task remotely, including all the
  overhead.
  """
  name = 'load-test-%d-%s' % (index, unique)
  start = time.time()
  logging.info('trigger')
  # TODO(maruel): Broken.
  manifest = swarming.Manifest(
      isolate_server='http://localhost:1',
      namespace='dummy-isolate',
      isolated_hash=1,
      task_name=name,
      extra_args=[],
      env={},
      dimensions=dimensions,
      deadline=int(timeout - TIMEOUT_OVERHEAD),
      verbose=False,
      profile=False,
      priority=100)
  cmd = [
      'python',
      '-c',
      'import time; print(\'1\'*%s); time.sleep(%d); print(\'Back\')' %
          (output_size, sleep_time)
  ]
  manifest.add_task('echo stuff', cmd)
  data = {'request': manifest.to_json()}
  response = net.url_read(swarming_url + '/test', data=data)
  if response is None:
    # Failed to trigger. Return a failure.
    return 'failed_trigger'
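  # The server echoes back the keys of the created test runs; validate the
  # response shape before polling for results.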
  result = json.loads(response)
  # The old API uses a hardcoded config name; the new API has no concept of a
  # config name and uses the task name instead. Ignore this detail.
  test_keys = []
  for key in result['test_keys']:
    key.pop('config_name')
    test_keys.append(key.pop('test_key'))
    assert re.match('[0-9a-f]+', test_keys[-1]), test_keys
  expected = {
      u'priority': 100,
      u'test_case_name': unicode(name),
      u'test_keys': [
        {
          u'num_instances': 1,
          u'instance_index': 0,
        }
      ],
  }
  assert result == expected, '\n%s\n%s' % (result, expected)
  progress.update_item('%5d' % index, processing=1)
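  # Phase 2: collect. Poll the server until the result is available or the
  # timeout expires; always mark the item as processed on exit.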
  try:
    logging.info('collect')
    new_test_keys = swarming.get_task_keys(swarming_url, name)
    if not new_test_keys:
      return 'no_test_keys'
    assert test_keys == new_test_keys, (test_keys, new_test_keys)
    out = [
      output
      for _index, output in swarming.yield_results(
          swarming_url, test_keys, timeout, None, False, None, False, True)
    ]
    if not out:
      return 'no_result'
    for item in out:
      item.pop('machine_tag')
      item.pop('machine_id')
      # TODO(maruel): Assert output even when run on a real bot.
      _out_actual = item.pop('output')
      # assert out_actual == swarming_load_test_bot.TASK_OUTPUT, out_actual
    expected = [
      {
        u'config_instance_index': 0,
        u'exit_codes': u'0',
        u'num_config_instances': 1,
      }
    ]
    assert out == expected, '\n%s\n%s' % (out, expected)
    return time.time() - start
  finally:
    progress.update_item('%5d - done' % index, processing=-1, processed=1)


def main():
  colorama.init()
  parser = optparse.OptionParser(description=sys.modules[__name__].__doc__)
  parser.add_option(
      '-S', '--swarming',
      metavar='URL', default='',
      help='Swarming server to use')
  swarming.add_filter_options(parser)
  parser.set_defaults(dimensions=[('os', swarming_load_test_bot.OS_NAME)])

  group = optparse.OptionGroup(parser, 'Load generation')
  group.add_option(
      '-s', '--send-rate', type='float', default=16., metavar='RATE',
      help='Rate (items/s) of sending requests as a float, '
           'default: %default')
  group.add_option(
      '-D', '--duration', type='float', default=60., metavar='N',
      help='Duration (s) of the sending phase of the load test, '
           'default: %default')
  group.add_option(
      '-m', '--concurrent', type='int', default=200, metavar='N',
      help='Maximum concurrent on-going requests, default: %default')
  group.add_option(
      '-t', '--timeout', type='float', default=15*60., metavar='N',
      help='Task expiration and timeout to get results; the task itself will '
           'have %ds less than the value provided. Default: %%default' %
           TIMEOUT_OVERHEAD)
  group.add_option(
      '-o', '--output-size', type='int', default=100, metavar='N',
      help='Bytes sent to stdout, default: %default')
  group.add_option(
      '--sleep', type='int', default=60, metavar='N',
      help='Amount of time the bot should sleep, e.g. faking work, '
           'default: %default')
  parser.add_option_group(group)

  group = optparse.OptionGroup(parser, 'Display options')
  group.add_option(
      '--columns', type='int', default=graph.get_console_width(), metavar='N',
      help='For histogram display, default: %default')
  group.add_option(
      '--buckets', type='int', default=20, metavar='N',
      help='Number of buckets for histogram display, default: %default')
  parser.add_option_group(group)

  parser.add_option(
      '--dump', metavar='FOO.JSON', help='Dumps the results to a JSON file')
  parser.add_option(
      '-v', '--verbose', action='store_true', help='Enables logging')
  options, args = parser.parse_args()

  logging.basicConfig(level=logging.INFO if options.verbose else logging.FATAL)
  if args:
    parser.error('Unsupported args: %s' % args)
  options.swarming = options.swarming.rstrip('/')
  if not options.swarming:
    parser.error('--swarming is required.')
  if options.duration <= 0:
    parser.error('Needs --duration > 0. 0.01 is a valid value.')
  swarming.process_filter_options(parser, options)

  total = int(round(options.send_rate * options.duration))
  print(
      'Sending %.1f i/s for %ds with max %d parallel requests; timeout %.1fs; '
      'total %d' %
      (options.send_rate, options.duration, options.concurrent,
       options.timeout, total))
  print('[processing/processed/todo]')
  # A unique 8-letter suffix so that separate load test runs don't clash with
  # each other or with real usage.
  unique = ''.join(random.choice(string.ascii_letters) for _ in range(8))
  columns = [('processing', 0), ('processed', 0), ('todo', 0)]
  progress = threading_utils.Progress(columns)
  index = 0
  results = []
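  # The pool runs trigger_task calls in parallel, capped at --concurrent
  # simultaneous requests.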
  with threading_utils.ThreadPoolWithProgress(
      progress, 1, options.concurrent, 0) as pool:
    try:
      start = time.time()
      while True:
        duration = time.time() - start
        if duration > options.duration:
          break
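        # Pace the send rate: after t seconds, round(t * send_rate) tasks
        # should have been triggered; catch up if the loop fell behind.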
        should_have_triggered_so_far = int(round(duration * options.send_rate))
        while index < should_have_triggered_so_far:
          pool.add_task(
              0,
              trigger_task,
              options.swarming,
              options.dimensions,
              options.sleep,
              options.output_size,
              progress,
              unique,
              options.timeout,
              index)
          progress.update_item('', todo=1)
          index += 1
          progress.print_update()
        time.sleep(0.01)
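      # Drain the pool: block until every triggered task has reported either
      # a latency or a failure string.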
      progress.update_item('Getting results for on-going tasks.', raw=True)
      for i in pool.iter_results():
        results.append(i)
        # Rewriting the dump file on every result is a bit excessive, but it
        # ensures partial data is available even if some tasks hang.
        if options.dump:
          results.sort()
          if os.path.exists(options.dump):
            os.rename(options.dump, options.dump + '.old')
          with open(options.dump, 'wb') as f:
            json.dump(results, f, separators=(',', ':'))
      if not options.dump:
        results.sort()
    except KeyboardInterrupt:
      aborted = pool.abort()
      progress.update_item(
          'Got Ctrl-C. Aborted %d unsent tasks.' % aborted,
          raw=True,
          todo=-aborted)
      progress.print_update()
  progress.print_update()
  # At this point, progress is not used anymore.
  print('')
  print(' - Took %.1fs.' % (time.time() - start))
  print('')
  print_results(results, options.columns, options.buckets)
  return 0


if __name__ == '__main__':
  sys.exit(main())