blob: 5b7358d1dae7b3c39f3ca919aaf3fa1f8a4d0170 [file] [log] [blame]
# Copyright 2015 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.
"""Windows specific utility functions."""
import ctypes
import logging
import os
import platform
import re
import string
import subprocess
import sys
from utils import tools
import common
import gpu
## Private stuff.
_WIN32_CLIENT_NAMES = {
u'5.0': u'2000',
u'5.1': u'XP',
u'5.2': u'XP',
u'6.0': u'Vista',
u'6.1': u'7',
u'6.2': u'8',
u'6.3': u'8.1',
u'10.0': u'10',
}
_WIN32_SERVER_NAMES = {
u'5.2': u'2003Server',
u'6.0': u'2008Server',
u'6.1': u'2008ServerR2',
u'6.2': u'2012Server',
u'6.3': u'2012ServerR2',
u'10.0': u'2016Server',
}
@tools.cached
def _get_mount_points():
"""Returns the list of 'fixed' drives in format 'X:\\'."""
ctypes.windll.kernel32.GetDriveTypeW.argtypes = (ctypes.c_wchar_p,)
ctypes.windll.kernel32.GetDriveTypeW.restype = ctypes.c_ulong
DRIVE_FIXED = 3
# https://msdn.microsoft.com/library/windows/desktop/aa364939.aspx
return [
u'%s:\\' % letter
for letter in string.lowercase
if ctypes.windll.kernel32.GetDriveTypeW(letter + ':\\') == DRIVE_FIXED
]
def _get_disk_info(mount_point):
"""Returns total and free space on a mount point in Mb."""
total_bytes = ctypes.c_ulonglong(0)
free_bytes = ctypes.c_ulonglong(0)
ctypes.windll.kernel32.GetDiskFreeSpaceExW(
ctypes.c_wchar_p(mount_point), None, ctypes.pointer(total_bytes),
ctypes.pointer(free_bytes))
return {
u'free_mb': round(free_bytes.value / 1024. / 1024., 1),
u'size_mb': round(total_bytes.value / 1024. / 1024., 1),
}
@tools.cached
def _get_win32com():
"""Returns an uninitialized WMI client."""
try:
import pythoncom
from win32com import client # pylint: disable=F0401
return client, pythoncom
except ImportError:
# win32com is included in pywin32, which is an optional package that is
# installed by Swarming devs. If you find yourself needing it to run without
# pywin32, for example in cygwin, please send us a CL with the
# implementation that doesn't use pywin32.
return None, None
@tools.cached
def _get_wmi_wbem():
"""Returns a WMI client connected to localhost ready to do queries."""
client, _ = _get_win32com()
if not client:
return None
wmi_service = client.Dispatch('WbemScripting.SWbemLocator')
return wmi_service.ConnectServer('.', 'root\\cimv2')
# Regexp for _get_os_numbers()
_CMD_RE = r'\[version (\d+\.\d+)\.(\d+(?:\.\d+|))\]'
@tools.cached
def _get_os_numbers():
"""Returns the normalized OS version and build numbers as strings.
Actively work around AppCompat version lie shim.
Returns:
- 5.1, 6.1, etc. There is no way to distinguish between Windows 7
and Windows Server 2008R2 since they both report 6.1.
- build number, like '10240'. Mostly relevant on Windows 10.
"""
# Windows is lying to us until python adds to its manifest:
# <supportedOS Id="{8e0f7a12-bfb3-4fe8-b9a5-48fd50a15a9a}"/>
# and it doesn't.
# So ask nicely to cmd.exe instead, which will always happily report the right
# version. Here's some sample output:
# - XP: Microsoft Windows XP [Version 5.1.2600]
# - Win10: Microsoft Windows [Version 10.0.10240]
# - Win7 or Win2K8R2: Microsoft Windows [Version 6.1.7601]
# - Win1709: Microsoft Windows [Version 10.0.16299.19]
#
# Some locale (like fr_CA) use a lower case 'version'.
out = subprocess.check_output(['cmd.exe', '/c', 'ver']).strip()
match = re.search(_CMD_RE, out, re.IGNORECASE)
if not match:
# Failed to start cmd.exe, that's really bad. Return a dummy value to not
# crash.
logging.error('Failed to run cmd.exe /c ver:\n%s', out)
return '0.0', '0'
return match.group(1), match.group(2)
def _is_topmost_window(hwnd):
"""Returns True if |hwnd| is a topmost window."""
ctypes.windll.user32.GetWindowLongW.restype = ctypes.c_long # LONG
ctypes.windll.user32.GetWindowLongW.argtypes = [
ctypes.c_void_p, # HWND
ctypes.c_int
]
# -20 is GWL_EXSTYLE
ex_styles = ctypes.windll.user32.GetWindowLongW(hwnd, -20)
# 8 is WS_EX_TOPMOST
return bool(ex_styles & 8)
def _get_window_class(hwnd):
"""Returns the class name of |hwnd|."""
ctypes.windll.user32.GetClassNameW.restype = ctypes.c_int
ctypes.windll.user32.GetClassNameW.argtypes = [
ctypes.c_void_p, # HWND
ctypes.c_wchar_p,
ctypes.c_int
]
name = ctypes.create_unicode_buffer(257)
name_len = ctypes.windll.user32.GetClassNameW(hwnd, name, len(name))
if name_len <= 0 or name_len >= len(name):
raise ctypes.WinError(descr='GetClassNameW failed; %s' %
ctypes.FormatError())
return name.value
## Public API.
def from_cygwin_path(path):
"""Converts an absolute cygwin path to a standard Windows path."""
if not path.startswith('/cygdrive/'):
logging.error('%s is not a cygwin path', path)
return None
# Remove the cygwin path identifier.
path = path[len('/cygdrive/'):]
# Add : after the drive letter.
path = path[:1] + ':' + path[1:]
return path.replace('/', '\\')
def to_cygwin_path(path):
"""Converts an absolute standard Windows path to a cygwin path."""
if len(path) < 2 or path[1] != ':':
# TODO(maruel): Accept \\?\ and \??\ if necessary.
logging.error('%s is not a win32 path', path)
return None
return '/cygdrive/%s/%s' % (path[0].lower(), path[3:].replace('\\', '/'))
@tools.cached
def get_os_version_number():
"""Returns the normalized OS version number as a string.
Returns:
- '5.1', '6.1', '10.0', etc. There is no way to distinguish between Windows
7 and Windows Server 2008R2 since they both report 6.1.
"""
return _get_os_numbers()[0]
@tools.cached
def get_os_version_names():
"""Returns the marketing name of the OS, without and with the service pack.
On Windows 10, use the build number since there will be no service pack.
"""
# Python keeps a local map in platform.py and it is updated at newer python
# release. Since our python release is a bit old, do not rely on it.
is_server = sys.getwindowsversion().product_type == 3
lookup = _WIN32_SERVER_NAMES if is_server else _WIN32_CLIENT_NAMES
version_number, build_number = _get_os_numbers()
marketing_name = lookup.get(version_number, version_number)
if version_number == u'10.0':
# Windows 10 doesn't have service packs, the build number now is the
# reference number.
return marketing_name, u'%s-%s' % (marketing_name, build_number)
service_pack = platform.win32_ver()[2] or u'SP0'
return marketing_name, u'%s-%s' % (marketing_name, service_pack)
def get_startup_dir():
# Do not use environment variables since it wouldn't work reliably on cygwin.
# TODO(maruel): Stop hardcoding the values and use the proper function
# described below. Postponed to a later CL since I'll have to spend quality
# time on Windows to ensure it works well.
# https://msdn.microsoft.com/library/windows/desktop/bb762494.aspx
# CSIDL_STARTUP = 7
# https://msdn.microsoft.com/library/windows/desktop/bb762180.aspx
# shell.SHGetFolderLocation(NULL, CSIDL_STARTUP, NULL, NULL, string)
if get_os_version_number() == u'5.1':
startup = 'Start Menu\\Programs\\Startup'
else:
# Vista+
startup = (
'AppData\\Roaming\\Microsoft\\Windows\\Start Menu\\Programs\\Startup')
# On cygwin 1.5, which is still used on some bots, '~' points inside
# c:\\cygwin\\home so use USERPROFILE.
return '%s\\%s\\' % (
os.environ.get('USERPROFILE', 'DUMMY, ONLY USED IN TESTS'), startup)
def get_disks_info():
"""Returns disk infos on all mount point in Mb."""
return {p: _get_disk_info(p) for p in _get_mount_points()}
@tools.cached
def get_audio():
"""Returns audio device as listed by WMI."""
wbem = _get_wmi_wbem()
if not wbem:
return None
# https://msdn.microsoft.com/library/aa394463.aspx
return [
device.Name
for device in wbem.ExecQuery('SELECT * FROM Win32_SoundDevice')
if device.Status == 'OK'
]
@tools.cached
def get_cpuinfo():
# Ironically, the data returned by WMI is mostly worthless.
# Another option is IsProcessorFeaturePresent().
# https://msdn.microsoft.com/en-us/library/windows/desktop/ms724482.aspx
import _winreg
k = _winreg.OpenKey(
_winreg.HKEY_LOCAL_MACHINE,
'HARDWARE\\DESCRIPTION\\System\\CentralProcessor\\0')
try:
identifier, _ = _winreg.QueryValueEx(k, 'Identifier')
match = re.match(
ur'^.+ Family (\d+) Model (\d+) Stepping (\d+)$', identifier)
name, _ = _winreg.QueryValueEx(k, 'ProcessorNameString')
vendor, _ = _winreg.QueryValueEx(k, 'VendorIdentifier')
return {
u'model': [
int(match.group(1)), int(match.group(2)), int(match.group(3))
],
u'name': name,
u'vendor': vendor,
}
finally:
k.Close()
def get_gpu():
"""Returns video device as listed by WMI.
Not cached as the GPU driver may change underneat.
"""
wbem = _get_wmi_wbem()
if not wbem:
return None, None
_, pythoncom = _get_win32com()
dimensions = set()
state = set()
# https://msdn.microsoft.com/library/aa394512.aspx
try:
for device in wbem.ExecQuery('SELECT * FROM Win32_VideoController'):
# The string looks like:
# PCI\VEN_15AD&DEV_0405&SUBSYS_040515AD&REV_00\3&2B8E0B4B&0&78
pnp_string = device.PNPDeviceID
ven_id = u'UNKNOWN'
dev_id = u'UNKNOWN'
match = re.search(r'VEN_([0-9A-F]{4})', pnp_string)
if match:
ven_id = match.group(1).lower()
match = re.search(r'DEV_([0-9A-F]{4})', pnp_string)
if match:
dev_id = match.group(1).lower()
dev_name = device.VideoProcessor or u''
version = device.DriverVersion or u''
ven_name, dev_name = gpu.ids_to_names(ven_id, u'', dev_id, dev_name)
dimensions.add(unicode(ven_id))
dimensions.add(u'%s:%s' % (ven_id, dev_id))
if version:
dimensions.add(u'%s:%s-%s' % (ven_id, dev_id, version))
state.add(u'%s %s %s' % (ven_name, dev_name, version))
else:
state.add(u'%s %s' % (ven_name, dev_name))
except pythoncom.com_error as e:
# This generally happens when this is called as the host is shutting down.
logging.error('get_gpu(): %s', e)
return sorted(dimensions), sorted(state)
@tools.cached
def get_integrity_level():
"""Returns the integrity level of the current process as a string.
TODO(maruel): It'd be nice to make it work on cygwin. The problem is that
ctypes.windll is unaccessible and it is not known to the author how to use
stdcall convention through ctypes.cdll.
"""
if get_os_version_number() == u'5.1':
# Integrity level is Vista+.
return None
mapping = {
0x0000: u'untrusted',
0x1000: u'low',
0x2000: u'medium',
0x2100: u'medium high',
0x3000: u'high',
0x4000: u'system',
0x5000: u'protected process',
}
# This was specifically written this way to work on cygwin except for the
# windll part. If someone can come up with a way to do stdcall on cygwin, that
# would be appreciated.
BOOL = ctypes.c_long
DWORD = ctypes.c_ulong
HANDLE = ctypes.c_void_p
class SID_AND_ATTRIBUTES(ctypes.Structure):
_fields_ = [
('Sid', ctypes.c_void_p),
('Attributes', DWORD),
]
class TOKEN_MANDATORY_LABEL(ctypes.Structure):
_fields_ = [
('Label', SID_AND_ATTRIBUTES),
]
TOKEN_READ = DWORD(0x20008)
# Use the same casing as in the C declaration:
# https://msdn.microsoft.com/library/windows/desktop/aa379626.aspx
TokenIntegrityLevel = ctypes.c_int(25)
ERROR_INSUFFICIENT_BUFFER = 122
# All the functions used locally. First open the process' token, then query
# the SID to know its integrity level.
ctypes.windll.kernel32.GetLastError.argtypes = ()
ctypes.windll.kernel32.GetLastError.restype = DWORD
ctypes.windll.kernel32.GetCurrentProcess.argtypes = ()
ctypes.windll.kernel32.GetCurrentProcess.restype = ctypes.c_void_p
ctypes.windll.advapi32.OpenProcessToken.argtypes = (
HANDLE, DWORD, ctypes.POINTER(HANDLE))
ctypes.windll.advapi32.OpenProcessToken.restype = BOOL
ctypes.windll.advapi32.GetTokenInformation.argtypes = (
HANDLE, ctypes.c_long, ctypes.c_void_p, DWORD, ctypes.POINTER(DWORD))
ctypes.windll.advapi32.GetTokenInformation.restype = BOOL
ctypes.windll.advapi32.GetSidSubAuthorityCount.argtypes = [ctypes.c_void_p]
ctypes.windll.advapi32.GetSidSubAuthorityCount.restype = ctypes.POINTER(
ctypes.c_ubyte)
ctypes.windll.advapi32.GetSidSubAuthority.argtypes = (ctypes.c_void_p, DWORD)
ctypes.windll.advapi32.GetSidSubAuthority.restype = ctypes.POINTER(DWORD)
# First open the current process token, query it, then close everything.
token = ctypes.c_void_p()
proc_handle = ctypes.windll.kernel32.GetCurrentProcess()
if not ctypes.windll.advapi32.OpenProcessToken(
proc_handle,
TOKEN_READ,
ctypes.byref(token)):
logging.error('Failed to get process\' token')
return None
if token.value == 0:
logging.error('Got a NULL token')
return None
try:
# The size of the structure is dynamic because the TOKEN_MANDATORY_LABEL
# used will have the SID appened right after the TOKEN_MANDATORY_LABEL in
# the heap allocated memory block, with .Label.Sid pointing to it.
info_size = DWORD()
if ctypes.windll.advapi32.GetTokenInformation(
token,
TokenIntegrityLevel,
ctypes.c_void_p(),
info_size,
ctypes.byref(info_size)):
logging.error('GetTokenInformation() failed expectation')
return None
if info_size.value == 0:
logging.error('GetTokenInformation() returned size 0')
return None
if ctypes.windll.kernel32.GetLastError() != ERROR_INSUFFICIENT_BUFFER:
logging.error(
'GetTokenInformation(): Unknown error: %d',
ctypes.windll.kernel32.GetLastError())
return None
token_info = TOKEN_MANDATORY_LABEL()
ctypes.resize(token_info, info_size.value)
if not ctypes.windll.advapi32.GetTokenInformation(
token,
TokenIntegrityLevel,
ctypes.byref(token_info),
info_size,
ctypes.byref(info_size)):
logging.error(
'GetTokenInformation(): Unknown error with buffer size %d: %d',
info_size.value,
ctypes.windll.kernel32.GetLastError())
return None
p_sid_size = ctypes.windll.advapi32.GetSidSubAuthorityCount(
token_info.Label.Sid)
res = ctypes.windll.advapi32.GetSidSubAuthority(
token_info.Label.Sid, p_sid_size.contents.value - 1)
value = res.contents.value
return mapping.get(value) or u'0x%04x' % value
finally:
ctypes.windll.kernel32.CloseHandle(token)
@tools.cached
def get_physical_ram():
"""Returns the amount of installed RAM in Mb, rounded to the nearest number.
"""
# https://msdn.microsoft.com/library/windows/desktop/aa366589.aspx
class MemoryStatusEx(ctypes.Structure):
_fields_ = [
('dwLength', ctypes.c_ulong),
('dwMemoryLoad', ctypes.c_ulong),
('dwTotalPhys', ctypes.c_ulonglong),
('dwAvailPhys', ctypes.c_ulonglong),
('dwTotalPageFile', ctypes.c_ulonglong),
('dwAvailPageFile', ctypes.c_ulonglong),
('dwTotalVirtual', ctypes.c_ulonglong),
('dwAvailVirtual', ctypes.c_ulonglong),
('dwAvailExtendedVirtual', ctypes.c_ulonglong),
]
stat = MemoryStatusEx()
stat.dwLength = ctypes.sizeof(MemoryStatusEx) # pylint: disable=W0201
ctypes.windll.kernel32.GlobalMemoryStatusEx(ctypes.byref(stat))
return int(round(stat.dwTotalPhys / 1024. / 1024.))
def get_uptime():
"""Return uptime for Windows 7 and later.
Excludes sleep time.
"""
val = ctypes.c_ulonglong(0)
if ctypes.windll.kernel32.QueryUnbiasedInterruptTime(ctypes.byref(val)) != 0:
return val.value / 10000000.
return 0.
def get_reboot_required():
"""Returns True if the system should be rebooted to apply updates.
This is not guaranteed to notice all conditions that could require reboot.
"""
# Based on https://stackoverflow.com/a/45717438
k = None
import _winreg
try:
k = _winreg.OpenKey(
_winreg.HKEY_LOCAL_MACHINE,
'SOFTWARE\\Microsoft\\Windows\\CurrentVersion\\WindowsUpdate\\'
'Auto Update\\RebootRequired')
_, num_values, _ = _winreg.QueryInfoKey(k)
return num_values > 0
except WindowsError: # pylint: disable=undefined-variable
# This error very likely means the RebootRequired key does not exist,
# meaning reboot is not required.
return False
finally:
if k:
k.Close()
def list_top_windows():
"""Returns a list of the class names of topmost windows.
Windows owned by the shell are ignored.
"""
# The function prototype of EnumWindowsProc.
window_enum_proc_prototype = ctypes.WINFUNCTYPE(
ctypes.c_long, # BOOL
ctypes.c_void_p, # HWND
ctypes.c_void_p) # LPARAM
# Set up various user32 functions that are needed.
ctypes.windll.user32.EnumWindows.restype = ctypes.c_long # BOOL
ctypes.windll.user32.EnumWindows.argtypes = [
window_enum_proc_prototype,
ctypes.py_object
]
ctypes.windll.user32.IsWindowVisible.restype = ctypes.c_long # BOOL
ctypes.windll.user32.IsWindowVisible.argtypes = [ctypes.c_void_p] # HWND
ctypes.windll.user32.IsIconic.restype = ctypes.c_long # BOOL
ctypes.windll.user32.IsIconic.argtypes = [ctypes.c_void_p] # HWND
out = []
def on_window(hwnd, lparam): # pylint: disable=unused-argument
"""Evaluates |hwnd| to determine whether or not it is a topmost window.
In case |hwnd| is a topmost window, its class name is added to the
collection of topmost window class names to return.
"""
# Dig deeper into visible, non-iconified, topmost windows.
if (ctypes.windll.user32.IsWindowVisible(hwnd) and
not ctypes.windll.user32.IsIconic(hwnd) and
_is_topmost_window(hwnd)):
# Fetch the class name and make sure it's not owned by the Windows shell.
class_name = _get_window_class(hwnd)
if (class_name and
class_name not in ['Button', 'Shell_TrayWnd',
'Shell_SecondaryTrayWnd']):
out.append(class_name)
return 1
ctypes.windll.user32.EnumWindows(window_enum_proc_prototype(on_window), None)
return out
@tools.cached
def get_computer_system_info():
"""Return a named tuple, which lists the following params from the WMI class
Win32_ComputerSystemProduct:
name, vendor, version, uuid
"""
wbem = _get_wmi_wbem()
if not wbem:
return None
info = None
# https://msdn.microsoft.com/en-us/library/aa394105
for device in wbem.ExecQuery('SELECT * FROM Win32_ComputerSystemProduct'):
info = common.ComputerSystemInfo(
name=device.Name,
vendor=device.Vendor,
version=device.Version,
serial=device.IdentifyingNumber)
return info