blob: d6e2974b942b4991c2fee9f70105b96b053e5704 [file] [log] [blame]
# Copyright 2013 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Common helper module for working with Chrome's processes and windows."""
import logging
import os
import psutil
import re
import win32gui
import win32process
def GetProcessIDAndPathPairs():
  """Returns a list of 2-tuples of (process id, process path).

  Works with both psutil API generations: releases before 2.0 expose
  |Process.exe| as a property, whereas 2.0 and later expose it as a method.

  Returns:
    A list of (pid, executable path) tuples, one for every process whose
    executable path could be read. Processes for which psutil raises
    psutil.Error are skipped.
  """
  process_id_and_path_pairs = []
  for process in psutil.process_iter():
    try:
      path = process.exe
      if callable(path):
        # psutil >= 2.0: |exe| is a method. Without this call the list would
        # hold bound methods, which never compare equal to a path string.
        path = path()
      process_id_and_path_pairs.append((process.pid, path))
    except psutil.Error:
      # It's normal that some processes are not accessible.
      pass
  return process_id_and_path_pairs
def GetProcessIDs(process_path):
  """Collects the IDs of all processes running the executable |process_path|.

  Args:
    process_path: The executable path to look for. It is compared for exact
        equality against the path reported for each process.

  Returns:
    A list of the IDs of the matching processes, in the order they are
    reported by GetProcessIDAndPathPairs().
  """
  matching_pids = []
  for pid, path in GetProcessIDAndPathPairs():
    if path == process_path:
      matching_pids.append(pid)
  return matching_pids
def WaitForChromeExit(chrome_path):
  """Waits for all |chrome_path| processes to exit.

  Repeatedly enumerates the processes whose executable is |chrome_path| and
  blocks on one of them (preferably the browser process) until no such
  process remains.

  Works with both psutil API generations (|exe|, |name| and |ppid| are
  properties before psutil 2.0 and methods from 2.0 on) and does not rely on
  Python-2-only dict iteration methods.

  Args:
    chrome_path: The path to the chrome.exe on which to wait.

  Raises:
    Exception: If a process is found whose name equals the basename of
        |chrome_path| but whose executable path differs from |chrome_path|.
  """
  def ReadField(process, field):
    """Returns |process|.|field|, calling it first if it is a method."""
    value = getattr(process, field)
    return value() if callable(value) else value

  def GetChromeProcesses(chrome_path):
    """Returns a dict of all |chrome_path| processes indexed by pid."""
    chrome_name = os.path.basename(chrome_path)
    chrome_processes = {}
    for process in psutil.process_iter():
      try:
        exe = ReadField(process, 'exe')
        if exe == chrome_path:
          chrome_processes[process.pid] = process
          logging.info('Found chrome process %s', exe)
        elif ReadField(process, 'name') == chrome_name:
          # Same image name but a different path. This is deliberately not a
          # psutil.Error, so it propagates to the caller.
          raise Exception('Found other chrome process %s' % exe)
      except psutil.Error:
        # Inaccessible or already-gone processes are skipped.
        pass
    return chrome_processes

  def GetBrowserProcess(chrome_processes):
    """Returns a psutil.Process for the browser process in |chrome_processes|.

    Returns None if every process either has a parent in |chrome_processes|
    or could not be queried.
    """
    # Find the one whose parent isn't a chrome.exe process.
    for process in chrome_processes.values():
      try:
        if ReadField(process, 'ppid') not in chrome_processes:
          return process
      except psutil.Error:
        pass
    return None

  chrome_processes = GetChromeProcesses(chrome_path)
  while chrome_processes:
    # Prefer waiting on the browser process.
    process = GetBrowserProcess(chrome_processes)
    if not process:
      # Pick any process to wait on if no top-level parent was found.
      process = next(iter(chrome_processes.values()))
    if process.is_running():
      # Every entry in |chrome_processes| was selected because its executable
      # equals |chrome_path|, so log that instead of querying the process
      # again; a fresh query could raise psutil.Error if it exits right now.
      logging.info('Waiting on PID %s for %s %s processes to exit',
                   process.pid, len(chrome_processes), chrome_path)
      process.wait()
    # Check for stragglers and keep waiting until all are gone.
    chrome_processes = GetChromeProcesses(chrome_path)
def GetWindowHandles(process_ids):
  """Finds the visible top-level windows that belong to |process_ids|.

  Args:
    process_ids: A list of process IDs.

  Returns:
    A list of handles of the visible windows whose owning process ID is in
    |process_ids|, in enumeration order.
  """
  matching_handles = []

  def CollectIfOwnedAndVisible(window_handle, _):
    """Records |window_handle| if it is visible and owned by a listed pid."""
    _thread_id, owner_pid = win32process.GetWindowThreadProcessId(
        window_handle)
    if owner_pid not in process_ids:
      return
    if win32gui.IsWindowVisible(window_handle):
      matching_handles.append(window_handle)

  # EnumWindows walks every top-level window and invokes the callback with
  # the window handle as its first argument.
  win32gui.EnumWindows(CollectIfOwnedAndVisible, None)
  return matching_handles
def WindowExists(process_ids, class_pattern):
  """Tells whether any window matches the given owner and class criteria.

  A window qualifies when it is returned by GetWindowHandles() for
  |process_ids| and its class name matches |class_pattern| (using re.match,
  i.e. anchored at the start of the class name).

  Args:
    process_ids: A list of process IDs.
    class_pattern: The regular expression pattern of the window class name.

  Returns:
    True if at least one qualifying window exists, False otherwise.
  """
  return any(
      re.match(class_pattern, win32gui.GetClassName(window_handle))
      for window_handle in GetWindowHandles(process_ids))