#!/usr/bin/env python
# Copyright 2014 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
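"""Emulate fake slave workers exercising a job-scheduler HTTP API.

Generates a pseudorandom dependency DAG, posts its root nodes to the
scheduler as initial jobs, then forks worker processes that repeatedly
heartbeat, acquire a job, and either complete it or halt it behind newly
posted dependencies.

Example invocation (the script name here is illustrative):
  python fake_slaves.py --slaves 4 --timeout 10 --verbose
"""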
import argparse
import json
import logging
import random
import sys
import time
from multiprocessing import Process, current_process

import requests

LOGGER = logging.getLogger(__name__)


def gen_simple_deps(fanout, depth):
  """Return a small fixed diamond-shaped dependency graph.

  The fanout and depth arguments are accepted for parity with gen_deps()
  but are ignored.
  """
graph = {
'deadbeefdeadbeefdeadbeefdeadbeefdeadbeef':
['baddecafbaddecafbaddecafbaddecafbaddecaf',
'cafebabecafebabecafebabecafebabecafebabe'],
'baddecafbaddecafbaddecafbaddecafbaddecaf':
['f005ba11f005ba11f005ba11f005ba11f005ba11'],
'cafebabecafebabecafebabecafebabecafebabe':
['f005ba11f005ba11f005ba11f005ba11f005ba11'],
}
roots = ['deadbeefdeadbeefdeadbeefdeadbeefdeadbeef']
  return graph, roots


def gen_deps(fanout, depth, density):
  """Create a pseudorandom fake dependency DAG.

  Builds the graph rank by rank, so edges only run from earlier ranks to
  newer ones and the result is acyclic. Returns (graph, roots): graph maps
  each node to a list of its children, and roots holds the nodes with no
  parents.
  """
nodes = set()
roots = set()
graph = {}
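  # Jitter the number of ranks (and each rank's width below) by +/-50% so
  # repeated runs produce differently shaped DAGs.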
ranks = random.randint(int(depth * 0.5), int(depth * 1.5))
for _ in xrange(ranks):
width = random.randint(int(fanout * 0.5), int(fanout * 1.5))
new_nodes = ['%040x' % random.randrange(16**40) for _ in xrange(width)]
roots.update(new_nodes)
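    # Wire edges only from earlier-rank nodes to the new rank, keeping the
    # graph acyclic; a new node that gains a parent is no longer a root.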
for i in nodes:
for j in new_nodes:
if random.random() < density:
graph.setdefault(i, []).append(j)
roots.discard(j)
nodes.update(new_nodes)
  return graph, roots


def post_jobs(url, jobs):
  """Post an initial collection of jobs to the scheduler."""
job_url = '%s/api/v1/job' % url
for job in jobs:
params = {'uid': 'seeder'}
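    # The graph's 40-hex-digit node hash doubles as the job binary; its
    # first 8 digits serve as a short display name.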
data = {'name': job[:8], 'binary': job, 'version': 0}
r = requests.post(job_url, params=params, data=json.dumps(data))
if r.status_code != requests.codes.ok or r.json()['status'] != 'OK':
      LOGGER.error('Failed to post initial job %s: %r', job, r.content)
continue
    LOGGER.debug('Posted initial job %s', job)


def fake_slave(url, graph):
  """Run a fake slave worker.

  Performs the following actions, in order:
* Heartbeat
* Acquire a job
* Sleep
* Either:
* Complete job; or
* Halt job with some number of dependencies
* Rinse (sleep)
* Repeat

  Whether a job completes or halts with dependencies is determined by the
  |graph| input argument: a pre-generated dependency DAG, represented as a
  mapping from each node to the list of its children. When the slave worker
  picks up a job, it emits dependencies based on the children of that job in
  the graph. If the job has no children, the job is completed.

  The slave worker exits after three consecutive failures to heartbeat or
  to acquire a job.
"""
worker_id = current_process().name
LOGGER.debug('Hello from worker %s', worker_id)
uid = {'uid': worker_id}
heartbeat_url = '%s/api/v1/worker/heartbeat' % (url)
acquire_url = '%s/api/v1/worker/acquire_job' % (url)
job_url = '%s/api/v1/job' % (url)
failures_left = 3
while failures_left:
r = requests.post(heartbeat_url, params=uid)
LOGGER.debug('Sent request %s %s', r.request.url, r.request.body)
if r.status_code != requests.codes.ok or r.json()['status'] != 'OK':
      LOGGER.error('%s failed to send heartbeat: %r', worker_id, r.content)
failures_left -= 1
continue
r = requests.post(acquire_url, params=uid)
LOGGER.debug('Sent request %s %s', r.request.url, r.request.body)
if r.status_code != requests.codes.ok or r.json()['status'] != 'OK':
      LOGGER.error('%s failed to acquire job: %r', worker_id, r.content)
failures_left -= 1
continue
job = r.json()['result']['job']
LOGGER.info('%s acquired job %s', worker_id, job['name'])
failures_left = 3
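    # Simulate doing the job's work.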
time.sleep(random.uniform(0, 2))
children = graph.get(job['binary'])
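    # Children in the pre-generated DAG become posted dependencies; a job
    # with no children is a leaf and completes outright.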
if children:
deps = []
for dep in children:
data = {'name': dep[:8], 'binary': dep, 'version': 0}
r = requests.post(job_url, params=uid, data=json.dumps(data))
LOGGER.debug('Sent request %s %s', r.request.url, r.request.body)
if r.status_code != requests.codes.ok or r.json()['status'] != 'OK':
          LOGGER.error('Failed to post dependency %s: %r', dep, r.content)
continue
LOGGER.debug('Posted dependency %s', dep)
deps.append(r.json()['result']['job']['id'])
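      # Halt this job: re-queue it blocked on the dependencies just posted.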
data = {'state': 'queued', 'dependencies': deps}
r = requests.put('%s/%s' % (job_url, job['id']),
params=uid, data=json.dumps(data))
LOGGER.debug('Sent request %s %s', r.request.url, r.request.body)
if r.status_code != requests.codes.ok or r.json()['status'] != 'OK':
        LOGGER.error('%s failed to halt job: %r', worker_id, r.content)
continue
      LOGGER.info('%s halted job %s', worker_id, job['name'])
else:
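      # Leaf job: report completion.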
data = {'state': 'completed'}
r = requests.put('%s/%s' % (job_url, job['id']),
params=uid, data=json.dumps(data))
LOGGER.debug('Sent request %s %s', r.request.url, r.request.body)
if r.status_code != requests.codes.ok or r.json()['status'] != 'OK':
        LOGGER.error('%s failed to complete job: %r', worker_id, r.content)
continue
      LOGGER.info('%s completed job %s', worker_id, job['name'])
    time.sleep(random.uniform(0, 2))


def parse_args(args):
  parser = argparse.ArgumentParser(
      description='Emulate one or more slave workers.')
parser.add_argument('-u', '--url', default='http://localhost:8080',
help='url of scheduler')
parser.add_argument('-j', '--slaves', default=1, metavar='N', type=int,
help='number of slaves to emulate')
parser.add_argument('-f', '--fanout', default=5, metavar='N', type=int,
help='fanout / width of dependency tree')
parser.add_argument('-d', '--depth', default=5, metavar='N', type=int,
help='depth of dependency tree')
parser.add_argument('-e', '--edges', default=0.3, type=float,
help='chance of each edge in dependency tree')
parser.add_argument('-t', '--timeout', default=5, metavar='S', type=int,
help='how long (in seconds) to let slaves run')
loglevel = parser.add_mutually_exclusive_group()
loglevel.add_argument('-v', '--verbose', action='store_const', const='DEBUG',
dest='loglevel', default='INFO')
loglevel.add_argument('-q', '--quiet', action='store_const', const='ERROR',
dest='loglevel', default='INFO')
  return parser.parse_args(args)


def main(args):
opts = parse_args(args)
LOGGER.setLevel(opts.loglevel)
graph, roots = gen_deps(opts.fanout, opts.depth, opts.edges)
post_jobs(opts.url, roots)
children = []
for _ in xrange(opts.slaves):
c = Process(target=fake_slave, args=(opts.url, graph))
children.append(c)
LOGGER.debug('Starting child %s.', c.name)
c.start()
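  # Slaves normally exit on their own after three consecutive scheduler
  # failures; bound the wait with --timeout and terminate any stragglers.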
  for c in children:
    LOGGER.debug('Joining child %s.', c.name)
    c.join(opts.timeout)
    if c.is_alive():
      LOGGER.debug('Terminating child %s.', c.name)
      c.terminate()


if __name__ == '__main__':
logging.basicConfig()
sys.exit(main(sys.argv[1:]))