blob: a03b4f54d357c7a0fb2691dc4331237cc01b8641 [file] [log] [blame]
#!python
# Copyright 2011 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Provides an event consumer that tracks image events."""
from etw import EventConsumer, EventHandler
import etw.descriptors.image as image
import etw.descriptors.process as process
class _Module(object):
"""Store the state for a single loaded module."""
def __init__(self, event):
"""Initializes a module object from a module load event."""
self.module_base = event.ImageBase
self.module_size = event.ImageSize
self.file_name = event.FileName
def __repr__(self):
return '0x%08X(0x%08X): %s' % (self.module_base,
self.module_size,
self.file_name)
class _Process(object):
"""Keeps the module load state of a single process."""
def __init__(self):
self._modules = {}
def OnModuleLoaded(self, module):
"""Update module load state on a newly loaded module.
Args:
module: a _Module object for the newly loaded module.
"""
self._modules[module.module_base] = module
def OnModuleUnloaded(self, module_base):
"""Update module load state on a newly unloaded module.
Args:
module_base: the base address of a module that just unloaded.
"""
self._modules.pop(module_base, None)
def IsEmpty(self):
"""Check whether any modules are loaded.
Returns:
True iff no modules currently loaded in this process.
"""
return not self._modules
def FindModuleAt(self, addr):
"""Find the module loaded at a particular address in the process.
Args:
addr: an address within the process.
Returns:
A _Module object for the module at that address, or None if
no such module.
"""
for module in self._modules.itervalues():
if (addr >= module.module_base and
addr < module.module_base + module.module_size):
return module
return None
class ModuleDatabase(EventConsumer):
"""Keeps a database of the modules loaded in a process."""
def __init__(self):
EventConsumer.__init__(self)
self._processes = {}
def GetProcessModules(self, process_id):
"""Get all modules in a process.
Args:
process_id: the id of the process in question.
Returns:
a list of _Module objects, or None if no modules loaded in the process.
"""
if process_id not in self._processes:
return None
return self._processes[process_id].values()
def GetProcessModuleAt(self, process_id, addr):
"""Get the module loaded at a given address in a process.
Args:
process_id: the id of the process in question.
addr: the address we're interested in.
Returns:
A _Module object for the found module, or None if no module
is loaded at that address.
"""
proc = self._processes.get(process_id)
if proc:
return proc.FindModuleAt(addr)
return None
@EventHandler(image.Event.DCStart, image.Event.Load)
def _OnLoad(self, event):
proc_id = event.process_id
proc = self._processes.get(proc_id, None)
if not proc:
proc = _Process()
self._processes[proc_id] = proc
proc.OnModuleLoaded(_Module(event))
@EventHandler(image.Event.UnLoad)
def _OnUnLoad(self, event):
proc = self._processes.get(event.process_id, None)
if not proc:
return
proc.OnModuleUnloaded(event.ImageBase)
if proc.IsEmpty():
del self._processes[event.process_id]
@EventHandler(process.Event.End)
def _OnProcessEnd(self, event):
# Clean up all modules on process termination.
self._processes.pop(event.process_id, None)