blob: a82becc45cf2f8c32078e6eca9a66d7072b60267 [file] [log] [blame]
# Copyright 2016 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""API that builds and executes recipes wrapped into a task dependency graph.
A Task consists of a 'recipe' (a closure to be executed) and a list of refs to
tasks that should be executed prior to executing this Task (i.e. dependencies).
A Task can be either 'static' or 'dynamic'. A static task only represents an
existing file on the filesystem; its recipe is a no-op. The responsibility of
the recipe of a dynamic task is to produce the file with the name assigned at
task creation.
A scenario is an ordered list of dynamic tasks to execute such that the
dependencies of a given task are executed before the said task. The scenario is
built from a list of final tasks and a list of frozen tasks:
- A final task is a task to execute ultimately. Therefore the scenario is
composed of final tasks and their required intermediary tasks.
- A frozen task is a dynamic task that must not be executed. This is a
mechanism to morph a dynamic task that may have dependencies into a static task
with no dependency at scenario generation time, injecting what the dynamic task
has already produced before as an input of the smaller task dependency graph
covered by the scenario.
Example:
# -------------------------------------------------- Build my dependency graph
builder = Builder('my/output/dir', None)
input0 = builder.CreateStaticTask('input0', 'path/to/input/file0')
input1 = builder.CreateStaticTask('input1', 'path/to/input/file1')
input2 = builder.CreateStaticTask('input2', 'path/to/input/file2')
input3 = builder.CreateStaticTask('input3', 'path/to/input/file3')
@builder.RegisterTask('out0', dependencies=[input0, input2])
def BuildOut0():
DoStuff(input0.path, input2.path, out=BuildOut0.path)
@builder.RegisterTask('out1', dependencies=[input1, input3])
def BuildOut1():
DoStuff(input1.path, input3.path, out=BuildOut1.path)
@builder.RegisterTask('out2', dependencies=[BuildOut0, BuildOut1])
def BuildOut2():
DoStuff(BuildOut0.path, BuildOut1.path, out=BuildOut2.path)
@builder.RegisterTask('out3', dependencies=[BuildOut0])
def BuildOut3():
DoStuff(BuildOut0.path, out=BuildOut3.path)
# ---------------------------- Case 1: Execute BuildOut3 and its dependencies.
for task in GenerateScenario(final_tasks=[BuildOut3], frozen_tasks=[]):
task.Execute()
# ---------- Case 2: Execute BuildOut2 and its dependencies but not BuildOut1.
# It is required that BuildOut1.path is already existing.
for task in GenerateScenario(final_tasks=[BuildOut2],
frozen_tasks=[BuildOut1])
task.Execute()
"""
import argparse
import collections
import datetime
import errno
import logging
import os
import re
import subprocess
import sys
import common_util
# File names, relative to the output directory, of the dependency graph
# visualization written when --output-graphviz is passed.
_TASK_GRAPH_DOTFILE_NAME = 'tasks_graph.dot'
_TASK_GRAPH_PNG_NAME = 'tasks_graph.png'
# File name, relative to the output directory, of the file holding the extra
# command line arguments to pass in order to resume a run.
_TASK_RESUME_ARGUMENTS_FILE = 'resume.txt'
# strftime() format used to name the log file of one execution.
_TASK_EXECUTION_LOG_NAME_FORMAT = 'task-execution-%Y-%m-%d-%H-%M-%S.log'
# Value the root argument parser must use as fromfile_prefix_chars so that the
# resume file can be given back on the command line.
FROMFILE_PREFIX_CHARS = '@'
class TaskError(Exception):
  """Error raised by this module when a task or a scenario is invalid."""
class Task(object):
  """Task that can be either a static task or dynamic with a recipe."""

  def __init__(self, name, path, dependencies, recipe):
    """Constructor.

    Args:
      name: The name of the task.
      path: Path to the file or directory that this task produces.
      dependencies: List of parent task to execute before.
      recipe: Function to execute if a dynamic task or None if a static task.
    """
    self.name = name
    self.path = path
    self._dependencies = dependencies
    self._recipe = recipe
    # A static task has nothing to run, so it starts out already done.
    # Identity comparison: `recipe == None` would call the recipe's own __eq__
    # and could misclassify a callable object as static.
    self._is_done = recipe is None

  def Execute(self):
    """Executes this task's recipe, at most once.

    A recipe that raises leaves the task not done, so it can be executed again.

    Raises:
      TaskError: If this task is static.
    """
    if self.IsStatic():
      raise TaskError('Task {} is static.'.format(self.name))
    if not self._is_done:
      self._recipe()
    self._is_done = True

  def IsStatic(self):
    """Returns whether this task is a static task (i.e. has no recipe)."""
    return self._recipe is None
class Builder(object):
  """Utilities for creating sub-graphs of tasks with dependencies."""

  def __init__(self, output_directory, output_subdirectory=None):
    """Constructor.

    Args:
      output_directory: Output directory where the dynamic tasks work.
      output_subdirectory: Subdirectory to put all created tasks in or None.
    """
    self.output_directory = output_directory
    self.output_subdirectory = output_subdirectory
    # Maps the (rebased) task name to the Task registered under that name.
    self._tasks = {}

  def CreateStaticTask(self, task_name, path):
    """Creates and returns a new static task.

    Args:
      task_name: The name of the task, rebased into the output subdirectory.
      path: Path of the already existing file or directory the task stands for.

    Returns:
      The newly created static Task.

    Raises:
      TaskError: If path does not exist or the task name is already taken.
    """
    task_name = self._RebaseTaskName(task_name)
    if not os.path.exists(path):
      raise TaskError('Error while creating task {}: File not found: {}'.format(
          task_name, path))
    if task_name in self._tasks:
      raise TaskError('Task {} already exists.'.format(task_name))
    task = Task(task_name, path, [], None)
    self._tasks[task_name] = task
    return task

  # Caution:
  #   This decorator may not create a dynamic task in the case where
  #   merge=True and another dynamic task having the same name has already
  #   been created. In this case, it will just reuse the former task. It is
  #   the user's responsibility to ensure that merged tasks would do the exact
  #   same thing.
  #
  #     @builder.RegisterTask('hello')
  #     def TaskA():
  #       my_object.a = 1
  #
  #     @builder.RegisterTask('hello', merge=True)
  #     def TaskB():
  #       # This function won't be executed ever.
  #       my_object.a = 2 # <------- Wrong because different from what TaskA do.
  #
  #     assert TaskA == TaskB
  #     TaskB.Execute() # Sets my_object.a == 1
  def RegisterTask(self, task_name, dependencies=None, merge=False):
    """Decorator that wraps a function into a dynamic task.

    Args:
      task_name: The name of this new task to register.
      dependencies: List of Task to build before this task.
      merge: If a task already have this name, don't create a new one and
        reuse the existing one.

    Returns:
      A decorator. Applied to a function, it returns the Task created by
      wrapping that function, or the already registered Task of that name when
      merging (which has wrapped a different function).

    Raises:
      TaskError: When the decorator is applied, if the name is already taken
        and merge is False, or if merging with a static task.
    """
    task_name = self._RebaseTaskName(task_name)
    dependencies = dependencies or []

    def InnerAddTaskWithNewPath(recipe):
      if task_name in self._tasks:
        if not merge:
          raise TaskError('Task {} already exists.'.format(task_name))
        task = self._tasks[task_name]
        if task.IsStatic():
          raise TaskError('Should not merge dynamic task {} with the already '
                          'existing static one.'.format(task_name))
        # Merging: the given recipe and dependencies are deliberately dropped.
        return task
      task_path = os.path.join(self.output_directory, task_name)
      task = Task(task_name, task_path, dependencies, recipe)
      self._tasks[task_name] = task
      return task

    return InnerAddTaskWithNewPath

  def _RebaseTaskName(self, task_name):
    """Returns task_name prefixed with the output subdirectory, if any."""
    if self.output_subdirectory:
      return os.path.join(self.output_subdirectory, task_name)
    return task_name
def GenerateScenario(final_tasks, frozen_tasks):
  """Generates a list of tasks to execute in order of dependencies-first.

  Static tasks and frozen tasks are never part of the scenario, and the
  dependencies of a frozen task are not visited.

  Args:
    final_tasks: The final tasks to generate the scenario from.
    frozen_tasks: Iterable of tasks to freeze.

  Returns:
    [Task]

  Raises:
    TaskError: If a frozen task's path does not exist, if a task depends on
      itself, or if two different tasks produce the same path.
  """
  # Snapshot once: membership is tested for every visited task, so this makes
  # it O(1) and also supports one-shot iterables.
  frozen_task_set = set(frozen_tasks)
  scenario = []
  # Maps a task path to the task producing it, or to None while the
  # dependencies of that task are still being visited (cycle detection).
  task_paths = {}

  def InternalAppendTarget(task):
    if task.IsStatic():
      return
    if task in frozen_task_set:
      if not os.path.exists(task.path):
        raise TaskError('Frozen target `{}`\'s path doesn\'t exist.'.format(
            task.name))
      return
    if task.path in task_paths:
      producer = task_paths[task.path]
      if producer is None:
        raise TaskError('Target `{}` depends on itself.'.format(task.name))
      if producer is not task:
        raise TaskError(
            'Tasks `{}` and `{}` produce the same file: `{}`.'.format(
                task.name, producer.name, task.path))
      # Already appended to the scenario.
      return
    task_paths[task.path] = None
    for dependency in task._dependencies:
      InternalAppendTarget(dependency)
    task_paths[task.path] = task
    scenario.append(task)

  for final_task in final_tasks:
    InternalAppendTarget(final_task)
  return scenario
def GenerateDependentSetPerTask(scenario):
  """Maps each task of the scenario to its direct dependents in the scenario.

  Args:
    scenario: The scenario containing the Tasks to map.

  Returns:
    {Task: set(Task)} as a defaultdict, so looking up a task without any
    dependent yields an empty set. Dependencies that are not themselves part
    of the scenario get no entry.
  """
  scenario_members = set(scenario)
  dependents = collections.defaultdict(set)
  for child in scenario:
    for parent in child._dependencies:
      if parent not in scenario_members:
        continue
      dependents[parent].add(child)
  return dependents
def ListResumingTasksToFreeze(scenario, final_tasks, skipped_tasks):
  """Lists the tasks to freeze in order to resume the scenario after failure.

  The graph is walked from the final tasks through skipped tasks only: the
  first non-static task met on each branch that was not skipped (or that is
  not part of the scenario at all) gets frozen, and the walk stops there.

  Args:
    scenario: The scenario (list of Task) to be resumed.
    final_tasks: The list of final Task used to generate the scenario.
    skipped_tasks: Set of Tasks in the scenario that were skipped.

  Returns:
    [Task], without duplicates, in the order the tasks were met.
  """
  scenario_tasks = set(scenario)
  assert skipped_tasks.issubset(scenario_tasks)
  to_freeze = []
  already_frozen = set()
  visited = set()

  def InternalWalk(task):
    if task.IsStatic():
      return
    if task in visited:
      return
    visited.add(task)
    if task in scenario_tasks and task in skipped_tasks:
      # Skipped task: what needs freezing is further up its dependencies.
      for dependency in task._dependencies:
        InternalWalk(dependency)
      return
    if task in already_frozen:
      return
    already_frozen.add(task)
    to_freeze.append(task)

  for final_task in final_tasks:
    InternalWalk(final_task)
  return to_freeze
def OutputGraphViz(scenario, final_tasks, output):
  """Outputs the build dependency graph covered by this scenario.

  Args:
    scenario: The generated scenario.
    final_tasks: The final tasks used to generate the scenario.
    output: A file-like output stream to receive the dot file.

  Graph interpretations:
    - Static tasks are shape less.
    - Final tasks (the ones that were directly appended) are box shaped.
    - Non final dynamic tasks are ellipse shaped.
    - Frozen dynamic tasks have a blue shape.
    - Tasks of the scenario are labelled with their execution rank.
  """
  execution_rank = {}
  for rank, scenario_task in enumerate(scenario):
    execution_rank[scenario_task] = rank
  node_ids = {}

  def GetTaskNodeId(task):
    # Declares the node the first time a task is met, then reuses its id.
    try:
      return node_ids[task]
    except KeyError:
      pass
    node_id = len(node_ids)
    if task.IsStatic():
      label, color, shape = task.name, 'blue', 'plaintext'
    elif task in execution_rank:
      label = str(execution_rank[task]) + ': ' + task.name
      color, shape = 'black', 'ellipse'
    else:
      # Dynamic task that is not part of the scenario.
      label, color, shape = task.name, 'blue', 'ellipse'
    if task in final_tasks:
      shape = 'box'
    output.write(' n{} [label="{}", color={}, shape={}];\n'.format(
        node_id, label, color, shape))
    node_ids[task] = node_id
    return node_id

  output.write('digraph graphname {\n')
  for task in scenario:
    task_node_id = GetTaskNodeId(task)
    for dependency in task._dependencies:
      dependency_node_id = GetTaskNodeId(dependency)
      output.write(' n{} -> n{};\n'.format(dependency_node_id, task_node_id))
  output.write('}\n')
def CommandLineParser():
  """Creates the parent parser of entry points using ExecuteWithCommandLine().

  The root parser must be created with:
    fromfile_prefix_chars=FROMFILE_PREFIX_CHARS.

  Returns:
    The command line arguments parser (created without its own -h/--help).
  """
  graphviz_help = 'Outputs the {} and {} file in the output directory.'.format(
      _TASK_GRAPH_DOTFILE_NAME, _TASK_GRAPH_PNG_NAME)
  # (flags, add_argument() keyword arguments), in declaration order.
  options = [
      (('-d', '--dry-run'),
       dict(action='store_true', help='Only prints the tasks to build.')),
      (('-e', '--to-execute'),
       dict(metavar='REGEX', type=str, action='append', dest='run_regexes',
            default=[], help='Regex selecting tasks to execute.')),
      (('-f', '--to-freeze'),
       dict(metavar='REGEX', type=str, action='append',
            dest='frozen_regexes', default=[],
            help='Regex selecting tasks to not execute.')),
      (('-k', '--keep-going'),
       dict(action='store_true', default=False,
            help='Keep going when some targets can\'t be made.')),
      (('-o', '--output'),
       dict(type=str, required=True, help='Path of the output directory.')),
      (('-v', '--output-graphviz'),
       dict(action='store_true', help=graphviz_help)),
  ]
  parser = argparse.ArgumentParser(add_help=False)
  for flags, settings in options:
    parser.add_argument(*flags, **settings)
  return parser
def _SelectTasksFromCommandLineRegexes(args, default_final_tasks):
  """Resolves the command line regexes into final tasks and frozen tasks.

  Args:
    args: Parsed command line; reads args.run_regexes and args.frozen_regexes.
    default_final_tasks: Final tasks to use when there is no run regex, and
      roots of the graph the run regexes select from otherwise.

  Returns:
    (final_tasks, frozen_tasks) where final_tasks is a list of Task and
    frozen_tasks a set of Task. A task selected for freezing whose path does
    not exist can't be frozen: it is considered impossible, as well as all its
    dependents, and impossible tasks are filtered out of final_tasks.
  """
  compile_regex = common_util.VerboseCompileRegexOrAbort
  frozen_regexes = [compile_regex(e) for e in args.frozen_regexes]
  run_regexes = [compile_regex(e) for e in args.run_regexes]

  # Lists final tasks.
  if run_regexes:
    # Traverse the graph in the normal execution order starting from
    # |default_final_tasks| in case of command line regex selection.
    candidates = GenerateScenario(default_final_tasks, frozen_tasks=set())
    # Order of run regexes prevails on the traversing order of tasks.
    final_tasks = [candidate
                   for run_regex in run_regexes
                   for candidate in candidates
                   if run_regex.search(candidate.name)]
  else:
    final_tasks = default_final_tasks

  # Lists parents of |final_tasks| to freeze.
  frozen_tasks = set()
  impossible_tasks = set()
  if frozen_regexes:
    complete_scenario = GenerateScenario(final_tasks, frozen_tasks=set())
    dependents_per_task = GenerateDependentSetPerTask(complete_scenario)

    def MarkTaskAsImpossible(task):
      if task not in impossible_tasks:
        impossible_tasks.add(task)
        for dependent in dependents_per_task[task]:
          MarkTaskAsImpossible(dependent)

    for task in complete_scenario:
      if not any(r.search(task.name) for r in frozen_regexes):
        continue
      if os.path.exists(task.path):
        frozen_tasks.add(task)
      else:
        MarkTaskAsImpossible(task)

  return [t for t in final_tasks if t not in impossible_tasks], frozen_tasks
class _ResumingFileBuilder(object):
  """Context manager maintaining the file of arguments needed to resume a run.

  The file lives in the output directory and contains one command line
  argument per line ('-f' followed by a regex), so that it can be handed back
  to the parser through FROMFILE_PREFIX_CHARS.
  """

  def __init__(self, args):
    """Creates (or truncates) the resume file and opens it for writing.

    Args:
      args: Parsed command line; reads args.output and args.frozen_regexes.
    """
    resume_path = os.path.join(args.output, _TASK_RESUME_ARGUMENTS_FILE)
    self._resume_output = open(resume_path, 'w')
    # List initial freezing regexes not to lose track of final targets to
    # freeze in case of several resume attempts caused by sudden death.
    for regex in args.frozen_regexes:
      self._resume_output.write('-f\n{}\n'.format(regex))

  def __enter__(self):
    return self

  def __exit__(self, exc_type, exc_value, exc_traceback):
    del exc_type, exc_value, exc_traceback  # unused
    self._resume_output.close()

  def OnTaskSuccess(self, task):
    """Appends the freezing of a task that just succeeded to the resume file.

    Args:
      task: The Task that succeeded.
    """
    # Log the succeeded tasks so that they are ensured to be frozen in case
    # of a sudden death.
    self._resume_output.write('-f\n^{}$\n'.format(re.escape(task.name)))
    # Makes sure the task freezing command line makes it to the disk.
    self._resume_output.flush()
    os.fsync(self._resume_output.fileno())

  def OnScenarioFinish(
      self, scenario, final_tasks, failed_tasks, skipped_tasks):
    """Rewrites the resume file with the tasks to freeze and tells the user.

    Args:
      scenario: The scenario (list of Task) that was run.
      final_tasks: The final tasks used to generate the scenario.
      failed_tasks: The tasks that failed, reported on the standard output.
      skipped_tasks: Set of Tasks of the scenario that were skipped.
    """
    resume_additional_arguments = []
    for task in ListResumingTasksToFreeze(
        scenario, final_tasks, skipped_tasks):
      resume_additional_arguments.extend(
          ['-f', '^{}$'.format(re.escape(task.name))])
    # Replaces everything written incrementally so far.
    self._resume_output.seek(0)
    self._resume_output.truncate()
    self._resume_output.write('\n'.join(resume_additional_arguments))
    # Single-argument print(...) behaves the same as the print statement on
    # Python 2 and is also valid Python 3.
    print('# Looks like something went wrong in tasks:')
    for failed_task in failed_tasks:
      print('# {}'.format(failed_task.name))
    print('#')
    print('# To resume, append the following parameter:')
    print('# ' + FROMFILE_PREFIX_CHARS + self._resume_output.name)
def ExecuteWithCommandLine(args, default_final_tasks):
  """Helper to execute tasks using command line arguments.

  Args:
    args: Command line argument parsed with CommandLineParser().
    default_final_tasks: Default final tasks if there is no -e/--to-execute
        command line arguments.

  Returns:
    0 if success or 1 otherwise

  Raises:
    KeyboardInterrupt: Re-raised, once the resume file is written, when a task
      got interrupted.
    MemoryError, SyntaxError: Propagated untouched from a task.
    EnvironmentError: Propagated untouched from a task when its errno is
      ENOSPC.
  """
  # Builds the scenario.
  final_tasks, frozen_tasks = _SelectTasksFromCommandLineRegexes(
      args, default_final_tasks)
  scenario = GenerateScenario(final_tasks, frozen_tasks)
  if not scenario:
    logging.error('No tasks to build.')
    return 1

  if not os.path.isdir(args.output):
    os.makedirs(args.output)

  # Print the task dependency graph visualization.
  if args.output_graphviz:
    graphviz_path = os.path.join(args.output, _TASK_GRAPH_DOTFILE_NAME)
    png_graph_path = os.path.join(args.output, _TASK_GRAPH_PNG_NAME)
    with open(graphviz_path, 'w') as output:
      OutputGraphViz(scenario, final_tasks, output)
    subprocess.check_call(['dot', '-Tpng', graphviz_path, '-o', png_graph_path])

  # Use the build scenario.
  if args.dry_run:
    for task in scenario:
      print(task.name)
    return 0

  # Run the Scenario while saving intermediate state to be able to resume later.
  failed_tasks = []
  tasks_to_skip = set()
  dependents_per_task = GenerateDependentSetPerTask(scenario)

  def MarkTaskNotToExecute(task):
    # Skips the task and, transitively, everything depending on it.
    if task not in tasks_to_skip:
      logging.warning('can not execute task: %s', task.name)
      tasks_to_skip.add(task)
      for dependent in dependents_per_task[task]:
        MarkTaskNotToExecute(dependent)

  log_filename = datetime.datetime.now().strftime(
      _TASK_EXECUTION_LOG_NAME_FORMAT)
  formatter = logging.Formatter('[%(asctime)s] %(levelname)s: %(message)s')
  handler = logging.FileHandler(
      os.path.join(args.output, log_filename), mode='a')
  handler.setFormatter(formatter)
  logging.getLogger().addHandler(handler)
  logging.info(
      '%s %s', '-' * 60, common_util.GetCommandLineForLogging(sys.argv))
  try:
    with _ResumingFileBuilder(args) as resume_file_builder:
      for task_execute_id, task in enumerate(scenario):
        if task in tasks_to_skip:
          continue
        logging.info('%s %s', '-' * 60, task.name)
        try:
          task.Execute()
        except (MemoryError, SyntaxError):
          raise
        except BaseException as error:
          # The resuming file being incrementally generated by
          # resume_file_builder.OnTaskSuccess() is automatically fsynced().
          # But resume_file_builder.OnScenarioFinish() completely rewrites
          # this file with the minimal subset of tasks to freeze, and in case
          # of an ENOSPC, we don't want to touch the resuming file at all so
          # that it remains uncorrupted.
          # EnvironmentError covers both IOError and OSError: a full disk can
          # be reported through either of them.
          if (isinstance(error, EnvironmentError) and
              error.errno == errno.ENOSPC):
            raise
          logging.exception('%s %s failed', '-' * 60, task.name)
          failed_tasks.append(task)
          interrupted = isinstance(error, KeyboardInterrupt)
          if args.keep_going and not interrupted:
            MarkTaskNotToExecute(task)
          else:
            # Giving up: this task and all the following ones are skipped.
            tasks_to_skip.update(scenario[task_execute_id:])
            resume_file_builder.OnScenarioFinish(
                scenario, final_tasks, failed_tasks, tasks_to_skip)
            if interrupted:
              # Still inside the handler, so the bare raise re-raises the
              # interruption with its original traceback.
              raise
            return 1
        else:
          resume_file_builder.OnTaskSuccess(task)
      if tasks_to_skip:
        # Only reachable with --keep-going after at least one failure.
        assert failed_tasks
        resume_file_builder.OnScenarioFinish(
            scenario, final_tasks, failed_tasks, tasks_to_skip)
        return 1
  finally:
    logging.getLogger().removeHandler(handler)
    # Releases the log file, which removeHandler() alone leaves open.
    handler.close()
  assert not failed_tasks
  return 0