blob: a7e7b07a37aa0a423b35f9387f95c0787ed99d2c [file] [log] [blame]
#!python
# Copyright 2014 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""A utility script to automatically generate the SyzyASan interceptors for the
functions declared in a header file using SAL annotations.
Here's how this script should be used:
python asan_system_interceptor_parser.py input_header.h
--output-base=$(OutName) --overwrite --def-file=$(DefFile)
3 files will be produced:
- $(OutName)_impl.gen : This will contain the implementation of the new
interceptors.
- $(OutName)_instrumentation_filter.gen : This will contain a list of
AsanInterceptor entries, e.g:
{ "foo", NULL, "foo.dll", NULL, true },
{ "bar", NULL, "bar.dll", NULL, true },
- $(OutName).def.gen : This will contain a copy of the input DEF file followed
by the list of the new interceptors.
As an example, for a definition like this:
MODULE: kernel32.dll
BOOL
WINAPI
WriteFile(
_In_ HANDLE hFile,
_In_reads_bytes_opt_(nNumberOfBytesToWrite) LPCVOID lpBuffer,
_In_ DWORD nNumberOfBytesToWrite,
_Out_opt_ LPDWORD lpNumberOfBytesWritten,
_Inout_opt_ LPOVERLAPPED lpOverlapped
);
This will produce the following interceptor:
BOOL WINAPI asan_WriteFile(
_In_ HANDLE hFile,
_In_reads_bytes_opt_(nNumberOfBytesToWrite) LPCVOID lpBuffer,
_In_ DWORD nNumberOfBytesToWrite,
_Out_opt_ LPDWORD lpNumberOfBytesWritten,
_Inout_opt_ LPOVERLAPPED lpOverlapped
) {
if (lpBuffer != NULL) {
TestMemoryRange(reinterpret_cast<const uint8*>(lpBuffer),
nNumberOfBytesToWrite,
agent::asan::ASAN_READ_ACCESS);
}
if (lpNumberOfBytesWritten != NULL) {
TestMemoryRange(reinterpret_cast<const uint8*>(lpNumberOfBytesWritten),
sizeof(*lpNumberOfBytesWritten),
agent::asan::ASAN_WRITE_ACCESS);
}
if (lpOverlapped != NULL) {
TestMemoryRange(reinterpret_cast<const uint8*>(lpOverlapped),
sizeof(*lpOverlapped),
agent::asan::ASAN_READ_ACCESS);
}
BOOL ret = ::WriteFile(hFile, lpBuffer, nNumberOfBytesToWrite,
lpNumberOfBytesWritten, lpOverlapped);
if (interceptor_tail_callback != NULL)
(*interceptor_tail_callback)();
if (lpNumberOfBytesWritten != NULL) {
TestMemoryRange(reinterpret_cast<const uint8*>(lpNumberOfBytesWritten),
sizeof(*lpNumberOfBytesWritten),
agent::asan::ASAN_WRITE_ACCESS);
}
if (lpBuffer != NULL) {
TestMemoryRange(reinterpret_cast<const uint8*>(lpBuffer),
nNumberOfBytesToWrite,
agent::asan::ASAN_READ_ACCESS);
}
return ret;
}
"""
import logging
import optparse
import os
import re
import sys
from string import Template
# Matches one annotated function declaration in an input header. Each
# declaration is expected to be introduced by a "MODULE:" line naming the module
# that implements the function:
# MODULE: MODULE_NAME
# RETURN_TYPE
# WINAPI
# FUNCTION_NAME(
# ...
# );
#
# Named groups exposed by this regex:
# - module_name: The "name.ext" token following "MODULE:".
# - ret: The return type (a single word).
# - conv: The calling convention keyword (WINAPI or __cdecl).
# - name: The function name.
# - params: The raw text between the opening parenthesis and the final ');'.
# It cannot contain a ';' but may span several lines.
#
# The pattern is compiled in verbose mode, so the whitespace and the '#'
# comments inside the literal are not part of the expression. Matching is case
# insensitive.
_FUNCTION_MATCH_RE = re.compile(r"""
MODULE\:\s*(?P<module_name>(\w+\.\w+))\s+
# Match the name of the module implementing the
# function.
(?P<ret>\w+)\s+ # Match the return type of the function.
(?P<conv>WINAPI|__cdecl)\s+ # Match the calling convention keyword.
(?P<name>\w+)\s*\( # Match the function name.
(?P<params>[^;]+)\)\s*; # Match the functions parameters, terminated by
# ');'. This field can contain embedded
# parenthesis.
""", re.VERBOSE | re.IGNORECASE | re.MULTILINE)
# Match and tokenize an argument in a function declaration using SAL
# annotations. Here are some examples of strings that we need to be able to
# match:
# - _In_ HANDLE hFile
# - _In_reads_bytes_opt_(nNumberOfBytesToWrite) LPCVOID lpBuffer
# - _Out_writes_to_opt_(nBufferLength, return + 1) LPWSTR lpBuffer
# - _Out_writes_bytes_opt_(nNumber) __out_data_source(FILE) LPVOID lpBuffer
# - _Out_writes_to_opt_(cchBufferLength, *lpcchReturnLength) _Post_
# _NullNull_terminated_ LPWCH lpszVolumePathNames
# - _In_ FILE_SEGMENT_ELEMENT aSegmentArray[]
# - _In_reads_bytes_opt_(PropertyBufferSize) CONST PBYTE PropertyBuffer
# - _Inout_ _Interlocked_operand_ LONG volatile *Addend
#
# Here's a description of the different named groups in this regex:
# - SAL_tag corresponds to the SAL tag of the argument.
# - SAL_tag_args (optional) corresponds to the arguments accompanying the
# tag, without the surrounding parentheses.
# - var_type corresponds to the type of the argument.
# - var_keyword (optional) corresponds to the keyword qualifier accompanying
# the variable type ('volatile' or 'const').
# - var_name corresponds to the name of the argument.
#
# The optional groups are None when the corresponding text is absent.
#
# For an argument like:
# _Out_writes_to_opt_(nBufferLength, return + 1) LPWSTR lpBuffer
# we'll get the following values:
# - SAL_tag: _Out_writes_to_opt_
# - SAL_tag_args: nBufferLength, return + 1
# - var_type: LPWSTR
# - var_keyword: None
# - var_name: lpBuffer
#
# See http://msdn.microsoft.com/en-us/library/hh916382.aspx for a complete list
# of the possible annotations.
_ARG_TOKENS_RE = re.compile(r"""
(?P<SAL_tag>(\_\w+\_)) # Match the SAL annotation, it starts and
# ends with an underscore and usually
# contains one or several words separated
# by an underscore.
(\((?P<SAL_tag_args>[^\)]*)\))? # Match the optional arguments
# accompanying a tag.
\s+((\_[^ ]+\s+)*)? # The annotation is sometimes followed by
# one or several other tags, all starting
# with at least one underscore, like:
# - _Post_ _NullNull_terminated_
# - __out_data_source(FILE)
(?P<var_type>(((CONST|FAR)\s*)?[a-zA-Z][a-zA-Z_0-9]+)
# Match the type of the argument.
(\s+(?P<var_keyword>volatile|const))?(\*)?(\s+\*)?)
# Match the optional keyword of the
# argument.
(\*)?\s+(\*\s*)?(?P<var_name>\w+)(\[\])?
# Match the name of the argument.
""", re.VERBOSE | re.IGNORECASE | re.MULTILINE)
# SAL tags that annotate a buffer whose size is given as the first argument of
# the tag. The mapping goes from the tag to the kind of access ('READ' or
# 'WRITE') that should be checked on this buffer. This list is not exhaustive.
_TAGS_TO_INTERCEPT = {
    '_In_reads_bytes_opt_': 'READ',
    '_Out_writes_to_opt_': 'WRITE',
    '_Out_writes_bytes_opt_': 'WRITE',
    '_Out_writes_bytes_to_opt_': 'WRITE',
}

# SAL tags of the arguments that should be checked once the call to the
# intercepted function has returned.
_TAGS_TO_CHECK_POSTCALL = frozenset(('_Out_', '_Out_opt_'))

# SAL tags of the arguments that should be checked before the call to the
# intercepted function. In addition to the post-call tags this includes the tags
# of the parameters that might become invalid once the call to the original
# function has returned (i.e. if the parameter is invalidated by the
# asynchronous callback made by the system call).
_TAGS_TO_CHECK_PRECALL = _TAGS_TO_CHECK_POSTCALL | frozenset(
    ('_Inout_', '_Inout_opt_'))

# Logger shared by everything in this module.
_LOGGER = logging.getLogger(__name__)
# String template for one entry of an ASan instrumentation filter array. The
# rendered entry starts and ends with a newline so that consecutive entries can
# be written back to back.
#
# Here's the description of the different identifiers in this template:
# - function_name: Name of the function.
# - module_name: Name of the module containing this function.
instrumentation_filter_entry_template = Template("""
{ "${function_name}", NULL, "${module_name}", NULL, true },
""")
# String template for the C++ implementation of an ASan interceptor. The
# generated function is named asan_<function_name>, forwards its arguments to
# ::<function_name> and returns the result of that call.
#
# Here's the description of the different identifiers in this template:
# - ret_type: Return type of the function.
# - calling_convention: The calling convention of the function.
# - function_name: Name of the function.
# - function_arguments: Function's arguments, with their types.
# - buffer_check: Optional check on the buffer passed to the function. It
# is emitted twice: once before and once after the call to the
# intercepted function.
# - function_param_names: String containing the name of the arguments to
# pass to the intercepted function.
# - param_checks_precall: Optional parameter check done before the call to
# the intercepted function.
# - param_checks_postcall: Optional parameter check done after the call to
# the intercepted function.
#
# The trailing backslash on the first line of the template is a line
# continuation: the return type, the calling convention and the function name
# end up on a single line in the generated code.
#
# NOTE(review): the result is always stored in a local of type ret_type, so a
# function returning void would render as 'void ret = ...' - confirm that such
# declarations never appear in the input headers.
interceptor_template = Template("""
${ret_type} ${calling_convention} \
asan_${function_name}(${function_arguments}) {
${buffer_check}
${param_checks_precall}
${ret_type} ret = ::${function_name}(${function_param_names});
if (interceptor_tail_callback != NULL)
(*interceptor_tail_callback)();
${param_checks_postcall}
${buffer_check}
return ret;
}
""")
# String template for an ASan check on a parameter. The generated code only
# calls TestMemoryRange when the parameter is not NULL.
#
# Here's the description of the different identifiers in this template:
# - param_to_check: The parameter to check.
# - param_size: Size of the variable that should be checked.
# - access_type: The access type to the parameter (it is spliced into
# agent::asan::ASAN_<access_type>_ACCESS).
# - param_keyword: The optional keyword qualifier accompanying the variable
# type (e.g. 'volatile'), or an empty string.
#
# We need to do a double cast on the parameter to check to convert it to the
# expected type (via a reinterpret_cast) and to lose the optional keyword
# qualifier (via a const_cast).
#
# NOTE(review): when param_keyword is 'const' the inner cast type reads
# 'const uint8 const*' - confirm that the compiler accepts the repeated
# qualifier.
param_checks_template = Template("""
if (${param_to_check} != NULL) {
TestMemoryRange(
const_cast<const uint8*>(
reinterpret_cast<const uint8 ${param_keyword}*>(${param_to_check})),
${param_size},
agent::asan::ASAN_${access_type}_ACCESS);
}
""")
class ASanSystemInterceptorGenerator(object):
  """Implement the ASan system interceptor generator class.

  The instances of this class should be created with a 'with' statement to
  ensure that the output files get correctly closed.
  """

  def __init__(self, output_base, def_file, overwrite=False):
    """Creates the output files and copies the input DEF file.

    The following files are created:
      - output_base + '_impl.gen': This file will contain the implementation
        of the interceptors.
      - output_base + '_instrumentation_filter.gen': This file will contain
        a list of AsanIntercept entries.
      - output_base + '.def.gen': This file will contain a copy of the input
        DEF file followed by the list of the new interceptors.

    Args:
      output_base: The base name of the output files, without extension.
      def_file: The path to the DEF file that should be augmented. This file
          is only read.
      overwrite: If True then existing output files get overwritten.

    Raises:
      IOError: If one of the output files already exists and |overwrite| is
          False, or if one of the files can't be opened.
    """
    output_impl_filename = output_base + '_impl.gen'
    output_instrumentation_filter_filename = (
        output_base + '_instrumentation_filter.gen')
    output_def_filename = output_base + '.def.gen'

    # Initialize every member up front so that the instance is never left in a
    # half-constructed state and so that _CloseFiles is always safe to call.
    self._output_impl_file = None
    self._output_instrumentation_filter_file = None
    self._def_file = None
    # Set of the (function name, raw argument string) pairs already
    # intercepted.
    self._intercepted_functions = set()

    output_filenames = (output_impl_filename,
                        output_instrumentation_filter_filename,
                        output_def_filename)
    if not overwrite and any(os.path.isfile(f) for f in output_filenames):
      # Fail loudly here rather than handing back an unusable instance that
      # would fail later with an unrelated AttributeError.
      raise IOError('Output files already exist, use the --overwrite flag to '
                    'overwrite it.')

    try:
      self._output_impl_file = open(output_impl_filename, 'w')
      self._output_instrumentation_filter_file = open(
          output_instrumentation_filter_filename, 'w')
      self._def_file = open(output_def_filename, 'w')
      # Copy the input DEF file.
      with open(def_file, 'r') as f:
        self._def_file.write(f.read())
    except Exception:
      # Don't leak the handles that have already been opened.
      self._CloseFiles()
      raise

  def _CloseFiles(self):
    """Closes every output file that has been opened so far."""
    for handle in (self._output_impl_file,
                   self._output_instrumentation_filter_file,
                   self._def_file):
      if handle is not None:
        handle.close()

  def __enter__(self):
    """This generator should be instantiated via a 'with' statement to ensure
    that its resources are correctly closed.
    """
    return self

  def __exit__(self, type, value, traceback):
    """Close the handle to the allocated files. This is executed when the
    instances of this generator are created with a 'with' statement.
    """
    self._CloseFiles()

  def GenerateFunctionInterceptor(self, function_name, return_type,
                                  function_arguments, calling_convention,
                                  module_name):
    """Generate the interceptor for a given function if necessary.

    The implementation of the interceptor, its instrumentation filter entry
    and its DEF entry are appended to the corresponding output files. Nothing
    is written if this function has already been intercepted with the same
    argument string.

    Args:
      function_name: The name of the function for which an interceptor should be
          generated.
      return_type: The return type of the function.
      function_arguments: A string representing the functions arguments
          (e.g. 'int foo, bool bar'). It can contain newline characters.
      calling_convention: The calling convention keyword of the function.
      module_name: The name of the module implementing the function.
    """
    # Prevent repeatedly intercepting the same function.
    # TODO(sebmarchand): Only check the argument type (instead of the raw
    #     string).
    signature = (function_name, function_arguments)
    if signature in self._intercepted_functions:
      _LOGGER.error('Trying to intercept the same function twice !')
      return
    self._intercepted_functions.add(signature)

    # Tokenize the arguments once, the result is used several times below.
    arguments = list(_ARG_TOKENS_RE.finditer(function_arguments))

    # Keep a reference to the first argument annotated with one of the buffer
    # tags we're interested in (if any), it gets a dedicated buffer check.
    m_buffer_size_arg = next(
        (m for m in arguments if m.group('SAL_tag') in _TAGS_TO_INTERCEPT),
        None)

    _LOGGER.debug('Function to intercept:')
    _LOGGER.debug(' Function calling convention : %s', calling_convention)
    _LOGGER.debug(' Function name : %s', function_name)
    _LOGGER.debug(' Function type : %s', return_type)
    _LOGGER.debug(' Function module : %s', module_name)
    _LOGGER.debug(' Function args : ')

    # Collect the name of the arguments and the precall and postcall parameter
    # checks.
    param_names = []
    precall_checks = []
    postcall_checks = []
    for m_arg in arguments:
      sal_tag = m_arg.group('SAL_tag')
      var_name = m_arg.group('var_name')
      param_names.append(var_name)

      # Check if this argument should be checked prior to a call to the
      # intercepted function.
      if sal_tag in _TAGS_TO_CHECK_PRECALL:
        param_check_str = param_checks_template.substitute(
            param_to_check=var_name,
            param_size='sizeof(*%s)' % var_name,
            access_type='READ' if 'In' in sal_tag else 'WRITE',
            param_keyword=m_arg.group('var_keyword') or '')
        precall_checks.append(param_check_str)
        # Check if it should also be checked once the function returns.
        if sal_tag in _TAGS_TO_CHECK_POSTCALL:
          postcall_checks.append(param_check_str)

      # Collapse the runs of whitespace (including newlines) so that the
      # argument is logged on a single, still readable, line.
      _LOGGER.debug(' %s', ' '.join(m_arg.group().split()))
      _LOGGER.debug(' SAL tag: %s', sal_tag)
      _LOGGER.debug(' SAL tag arguments: %s', m_arg.group('SAL_tag_args'))
      _LOGGER.debug(' variable type: %s', m_arg.group('var_type'))
      _LOGGER.debug(' variable name: %s', var_name)
      _LOGGER.debug(' variable keyword: %s', m_arg.group('var_keyword'))
    _LOGGER.debug('\n')

    buffer_check = ''
    if m_buffer_size_arg is not None:
      # The size of the buffer is the first argument of the SAL tag. The tag
      # arguments are optional in the regex, so they might be missing.
      buffer_size = (m_buffer_size_arg.group('SAL_tag_args') or '').split(
          ',')[0]
      if buffer_size.strip():
        buffer_check = param_checks_template.substitute(
            param_to_check=m_buffer_size_arg.group('var_name'),
            param_size=buffer_size,
            access_type=_TAGS_TO_INTERCEPT[m_buffer_size_arg.group('SAL_tag')],
            param_keyword=m_buffer_size_arg.group('var_keyword') or '')
      else:
        _LOGGER.warning('No size given for the buffer "%s" of %s, its content '
                        'won\'t be checked.',
                        m_buffer_size_arg.group('var_name'), function_name)

    # Write the function's implementation in the appropriate file.
    self._output_impl_file.write(interceptor_template.substitute(
        ret_type=return_type,
        calling_convention=calling_convention,
        function_name=function_name,
        function_arguments=function_arguments,
        function_param_names=', '.join(param_names),
        param_checks_precall=''.join(precall_checks),
        param_checks_postcall=''.join(postcall_checks),
        buffer_check=buffer_check))

    # Write the entry into the instrumentation filter file.
    self._output_instrumentation_filter_file.write(
        instrumentation_filter_entry_template.substitute(
            function_name=function_name,
            module_name=module_name))

    # Add the new interceptor to the DEF file.
    self._def_file.write('asan_' + function_name + '\n')

  def VisitFunctionsInFiles(self, files, callback):
    """Parse the functions declared in a given list of files and invokes the
    callback per encountered function.

    Args:
      files: The files to parse.
      callback: The callback to invoke per encountered function. It receives,
          in this order, the name of the function, its return type, its raw
          argument string, its calling convention and the name of its module.
    """
    for filename in files:
      with open(filename, 'r') as f:
        f_content = f.read()
      for m_iter in _FUNCTION_MATCH_RE.finditer(f_content):
        callback(m_iter.group('name'), m_iter.group('ret'),
                 m_iter.group('params'), m_iter.group('conv'),
                 m_iter.group('module_name'))
_USAGE = """\
%prog [options] [files to process]
Parse a list of files to find the SAL annotated functions.
"""


def ParseOptions(args, parser):
  """Registers the options of this script on |parser| and parses |args|.

  Args:
    args: The list of command line arguments to parse.
    parser: The optparse.OptionParser instance to populate and to use.

  Returns:
    A tuple (options, positional arguments), as returned by
    parser.parse_args.
  """
  # The options are registered in this order, which is also the order in which
  # they show up in the help message.
  option_specs = (
      ('--verbose', {
          'dest': 'log_level',
          'default': logging.INFO,
          'action': 'store_const',
          'const': logging.DEBUG,
          'help': 'Enable verbose logging.',
      }),
      ('--def-file', {
          'help': "The def file that should be augmented. This file won't be "
                  "modified, instead a new one will be created and will be "
                  "filled with the content of this one followed by the new "
                  "interceptors.",
      }),
      ('--output-base', {
          'help': 'Base name of the output files to produce (without the '
                  'extensions).',
      }),
      ('--overwrite', {
          'default': False,
          'action': 'store_true',
          'help': 'Overwrite the output files if they already exist.',
      }),
  )
  for flag, settings in option_specs:
    parser.add_option(flag, **settings)

  options, positional = parser.parse_args(args)
  return options, positional
def main(args):
  """Parses the command line and generates the interceptors.

  Args:
    args: The command line arguments. The path to this script may be part of
        them (as it is in sys.argv), in which case it is ignored.
  """
  parser = optparse.OptionParser(usage=_USAGE)
  (opts, input_files) = ParseOptions(args, parser)
  logging.basicConfig(level=opts.log_level)
  if not opts.output_base:
    parser.error('You must specify an output base filename.')
  if not opts.def_file:
    parser.error('You must specify a DEF file to update.')

  # The first argument might be the current script name, remove it and make sure
  # that there's at least one input file. The paths are normalized before being
  # compared because __file__ and sys.argv[0] don't necessarily use the same
  # spelling (e.g. one can be absolute and the other relative), in which case
  # this script would end up being parsed as an input file.
  script_path = os.path.normcase(os.path.realpath(__file__))
  for index, candidate in enumerate(input_files):
    if os.path.normcase(os.path.realpath(candidate)) == script_path:
      del input_files[index]
      break
  if not input_files:
    parser.error('You must specify at least one input file.')

  with ASanSystemInterceptorGenerator(opts.output_base,
                                      opts.def_file,
                                      opts.overwrite) as generator:
    generator.VisitFunctionsInFiles(input_files,
                                    generator.GenerateFunctionInterceptor)


if __name__ == '__main__':
  sys.exit(main(sys.argv))