| #!/usr/bin/env python3 |
| |
| # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """Contracts tests. These tests mainly check API sanity in terms of |
| returned types and APIs availability. |
| Some of these are duplicates of tests test_system.py and test_process.py |
| """ |
| |
| import errno |
| import multiprocessing |
| import os |
| import signal |
| import stat |
| import sys |
| import time |
| import traceback |
| |
| from psutil import AIX |
| from psutil import BSD |
| from psutil import FREEBSD |
| from psutil import LINUX |
| from psutil import MACOS |
| from psutil import NETBSD |
| from psutil import OPENBSD |
| from psutil import OSX |
| from psutil import POSIX |
| from psutil import SUNOS |
| from psutil import WINDOWS |
| from psutil._compat import FileNotFoundError |
| from psutil._compat import long |
| from psutil._compat import range |
| from psutil.tests import create_sockets |
| from psutil.tests import enum |
| from psutil.tests import GITHUB_WHEELS |
| from psutil.tests import HAS_CPU_FREQ |
| from psutil.tests import HAS_NET_IO_COUNTERS |
| from psutil.tests import HAS_SENSORS_FANS |
| from psutil.tests import HAS_SENSORS_TEMPERATURES |
| from psutil.tests import is_namedtuple |
| from psutil.tests import process_namespace |
| from psutil.tests import PsutilTestCase |
| from psutil.tests import PYPY |
| from psutil.tests import serialrun |
| from psutil.tests import SKIP_SYSCONS |
| from psutil.tests import unittest |
| from psutil.tests import VALID_PROC_STATUSES |
| import psutil |
| |
| |
| # =================================================================== |
| # --- APIs availability |
| # =================================================================== |
| |
| # Make sure code reflects what doc promises in terms of APIs |
| # availability. |
| |
| class TestAvailConstantsAPIs(PsutilTestCase): |
| |
| def test_PROCFS_PATH(self): |
| self.assertEqual(hasattr(psutil, "PROCFS_PATH"), |
| LINUX or SUNOS or AIX) |
| |
| def test_win_priority(self): |
| ae = self.assertEqual |
| ae(hasattr(psutil, "ABOVE_NORMAL_PRIORITY_CLASS"), WINDOWS) |
| ae(hasattr(psutil, "BELOW_NORMAL_PRIORITY_CLASS"), WINDOWS) |
| ae(hasattr(psutil, "HIGH_PRIORITY_CLASS"), WINDOWS) |
| ae(hasattr(psutil, "IDLE_PRIORITY_CLASS"), WINDOWS) |
| ae(hasattr(psutil, "NORMAL_PRIORITY_CLASS"), WINDOWS) |
| ae(hasattr(psutil, "REALTIME_PRIORITY_CLASS"), WINDOWS) |
| |
| def test_linux_ioprio_linux(self): |
| ae = self.assertEqual |
| ae(hasattr(psutil, "IOPRIO_CLASS_NONE"), LINUX) |
| ae(hasattr(psutil, "IOPRIO_CLASS_RT"), LINUX) |
| ae(hasattr(psutil, "IOPRIO_CLASS_BE"), LINUX) |
| ae(hasattr(psutil, "IOPRIO_CLASS_IDLE"), LINUX) |
| |
| def test_linux_ioprio_windows(self): |
| ae = self.assertEqual |
| ae(hasattr(psutil, "IOPRIO_HIGH"), WINDOWS) |
| ae(hasattr(psutil, "IOPRIO_NORMAL"), WINDOWS) |
| ae(hasattr(psutil, "IOPRIO_LOW"), WINDOWS) |
| ae(hasattr(psutil, "IOPRIO_VERYLOW"), WINDOWS) |
| |
| @unittest.skipIf(GITHUB_WHEELS, "not exposed via GITHUB_WHEELS") |
| def test_linux_rlimit(self): |
| ae = self.assertEqual |
| ae(hasattr(psutil, "RLIM_INFINITY"), LINUX) |
| ae(hasattr(psutil, "RLIMIT_AS"), LINUX) |
| ae(hasattr(psutil, "RLIMIT_CORE"), LINUX) |
| ae(hasattr(psutil, "RLIMIT_CPU"), LINUX) |
| ae(hasattr(psutil, "RLIMIT_DATA"), LINUX) |
| ae(hasattr(psutil, "RLIMIT_FSIZE"), LINUX) |
| ae(hasattr(psutil, "RLIMIT_LOCKS"), LINUX) |
| ae(hasattr(psutil, "RLIMIT_MEMLOCK"), LINUX) |
| ae(hasattr(psutil, "RLIMIT_NOFILE"), LINUX) |
| ae(hasattr(psutil, "RLIMIT_NPROC"), LINUX) |
| ae(hasattr(psutil, "RLIMIT_RSS"), LINUX) |
| ae(hasattr(psutil, "RLIMIT_STACK"), LINUX) |
| |
| ae(hasattr(psutil, "RLIMIT_MSGQUEUE"), LINUX) # requires Linux 2.6.8 |
| ae(hasattr(psutil, "RLIMIT_NICE"), LINUX) # requires Linux 2.6.12 |
| ae(hasattr(psutil, "RLIMIT_RTPRIO"), LINUX) # requires Linux 2.6.12 |
| ae(hasattr(psutil, "RLIMIT_RTTIME"), LINUX) # requires Linux 2.6.25 |
| ae(hasattr(psutil, "RLIMIT_SIGPENDING"), LINUX) # requires Linux 2.6.8 |
| |
| |
| class TestAvailSystemAPIs(PsutilTestCase): |
| |
| def test_win_service_iter(self): |
| self.assertEqual(hasattr(psutil, "win_service_iter"), WINDOWS) |
| |
| def test_win_service_get(self): |
| self.assertEqual(hasattr(psutil, "win_service_get"), WINDOWS) |
| |
| def test_cpu_freq(self): |
| self.assertEqual(hasattr(psutil, "cpu_freq"), |
| LINUX or MACOS or WINDOWS or FREEBSD) |
| |
| def test_sensors_temperatures(self): |
| self.assertEqual( |
| hasattr(psutil, "sensors_temperatures"), LINUX or FREEBSD) |
| |
| def test_sensors_fans(self): |
| self.assertEqual(hasattr(psutil, "sensors_fans"), LINUX) |
| |
| def test_battery(self): |
| self.assertEqual(hasattr(psutil, "sensors_battery"), |
| LINUX or WINDOWS or FREEBSD or MACOS) |
| |
| |
| class TestAvailProcessAPIs(PsutilTestCase): |
| |
| def test_environ(self): |
| self.assertEqual(hasattr(psutil.Process, "environ"), |
| LINUX or MACOS or WINDOWS or AIX or SUNOS or |
| FREEBSD or OPENBSD or NETBSD) |
| |
| def test_uids(self): |
| self.assertEqual(hasattr(psutil.Process, "uids"), POSIX) |
| |
| def test_gids(self): |
| self.assertEqual(hasattr(psutil.Process, "uids"), POSIX) |
| |
| def test_terminal(self): |
| self.assertEqual(hasattr(psutil.Process, "terminal"), POSIX) |
| |
| def test_ionice(self): |
| self.assertEqual(hasattr(psutil.Process, "ionice"), LINUX or WINDOWS) |
| |
| @unittest.skipIf(GITHUB_WHEELS, "not exposed via GITHUB_WHEELS") |
| def test_rlimit(self): |
| # requires Linux 2.6.36 |
| self.assertEqual(hasattr(psutil.Process, "rlimit"), LINUX) |
| |
| def test_io_counters(self): |
| hasit = hasattr(psutil.Process, "io_counters") |
| self.assertEqual(hasit, False if MACOS or SUNOS else True) |
| |
| def test_num_fds(self): |
| self.assertEqual(hasattr(psutil.Process, "num_fds"), POSIX) |
| |
| def test_num_handles(self): |
| self.assertEqual(hasattr(psutil.Process, "num_handles"), WINDOWS) |
| |
| def test_cpu_affinity(self): |
| self.assertEqual(hasattr(psutil.Process, "cpu_affinity"), |
| LINUX or WINDOWS or FREEBSD) |
| |
| def test_cpu_num(self): |
| self.assertEqual(hasattr(psutil.Process, "cpu_num"), |
| LINUX or FREEBSD or SUNOS) |
| |
| def test_memory_maps(self): |
| hasit = hasattr(psutil.Process, "memory_maps") |
| self.assertEqual( |
| hasit, False if OPENBSD or NETBSD or AIX or MACOS else True) |
| |
| |
| # =================================================================== |
| # --- API types |
| # =================================================================== |
| |
| |
| class TestSystemAPITypes(PsutilTestCase): |
| """Check the return types of system related APIs. |
| Mainly we want to test we never return unicode on Python 2, see: |
| https://github.com/giampaolo/psutil/issues/1039 |
| """ |
| |
| @classmethod |
| def setUpClass(cls): |
| cls.proc = psutil.Process() |
| |
| def assert_ntuple_of_nums(self, nt, type_=float, gezero=True): |
| assert is_namedtuple(nt) |
| for n in nt: |
| self.assertIsInstance(n, type_) |
| if gezero: |
| self.assertGreaterEqual(n, 0) |
| |
| def test_cpu_times(self): |
| self.assert_ntuple_of_nums(psutil.cpu_times()) |
| for nt in psutil.cpu_times(percpu=True): |
| self.assert_ntuple_of_nums(nt) |
| |
| def test_cpu_percent(self): |
| self.assertIsInstance(psutil.cpu_percent(interval=None), float) |
| self.assertIsInstance(psutil.cpu_percent(interval=0.00001), float) |
| |
| def test_cpu_times_percent(self): |
| self.assert_ntuple_of_nums(psutil.cpu_times_percent(interval=None)) |
| self.assert_ntuple_of_nums(psutil.cpu_times_percent(interval=0.0001)) |
| |
| def test_cpu_count(self): |
| self.assertIsInstance(psutil.cpu_count(), int) |
| |
| @unittest.skipIf(not HAS_CPU_FREQ, "not supported") |
| def test_cpu_freq(self): |
| if psutil.cpu_freq() is None: |
| raise self.skipTest("cpu_freq() returns None") |
| self.assert_ntuple_of_nums(psutil.cpu_freq(), type_=(float, int, long)) |
| |
| def test_disk_io_counters(self): |
| # Duplicate of test_system.py. Keep it anyway. |
| for k, v in psutil.disk_io_counters(perdisk=True).items(): |
| self.assertIsInstance(k, str) |
| self.assert_ntuple_of_nums(v, type_=(int, long)) |
| |
| def test_disk_partitions(self): |
| # Duplicate of test_system.py. Keep it anyway. |
| for disk in psutil.disk_partitions(): |
| self.assertIsInstance(disk.device, str) |
| self.assertIsInstance(disk.mountpoint, str) |
| self.assertIsInstance(disk.fstype, str) |
| self.assertIsInstance(disk.opts, str) |
| |
| @unittest.skipIf(SKIP_SYSCONS, "requires root") |
| def test_net_connections(self): |
| with create_sockets(): |
| ret = psutil.net_connections('all') |
| self.assertEqual(len(ret), len(set(ret))) |
| for conn in ret: |
| assert is_namedtuple(conn) |
| |
| def test_net_if_addrs(self): |
| # Duplicate of test_system.py. Keep it anyway. |
| for ifname, addrs in psutil.net_if_addrs().items(): |
| self.assertIsInstance(ifname, str) |
| for addr in addrs: |
| if enum is not None and not PYPY: |
| self.assertIsInstance(addr.family, enum.IntEnum) |
| else: |
| self.assertIsInstance(addr.family, int) |
| self.assertIsInstance(addr.address, str) |
| self.assertIsInstance(addr.netmask, (str, type(None))) |
| self.assertIsInstance(addr.broadcast, (str, type(None))) |
| |
| def test_net_if_stats(self): |
| # Duplicate of test_system.py. Keep it anyway. |
| for ifname, info in psutil.net_if_stats().items(): |
| self.assertIsInstance(ifname, str) |
| self.assertIsInstance(info.isup, bool) |
| if enum is not None: |
| self.assertIsInstance(info.duplex, enum.IntEnum) |
| else: |
| self.assertIsInstance(info.duplex, int) |
| self.assertIsInstance(info.speed, int) |
| self.assertIsInstance(info.mtu, int) |
| |
| @unittest.skipIf(not HAS_NET_IO_COUNTERS, 'not supported') |
| def test_net_io_counters(self): |
| # Duplicate of test_system.py. Keep it anyway. |
| for ifname, _ in psutil.net_io_counters(pernic=True).items(): |
| self.assertIsInstance(ifname, str) |
| |
| @unittest.skipIf(not HAS_SENSORS_FANS, "not supported") |
| def test_sensors_fans(self): |
| # Duplicate of test_system.py. Keep it anyway. |
| for name, units in psutil.sensors_fans().items(): |
| self.assertIsInstance(name, str) |
| for unit in units: |
| self.assertIsInstance(unit.label, str) |
| self.assertIsInstance(unit.current, (float, int, type(None))) |
| |
| @unittest.skipIf(not HAS_SENSORS_TEMPERATURES, "not supported") |
| def test_sensors_temperatures(self): |
| # Duplicate of test_system.py. Keep it anyway. |
| for name, units in psutil.sensors_temperatures().items(): |
| self.assertIsInstance(name, str) |
| for unit in units: |
| self.assertIsInstance(unit.label, str) |
| self.assertIsInstance(unit.current, (float, int, type(None))) |
| self.assertIsInstance(unit.high, (float, int, type(None))) |
| self.assertIsInstance(unit.critical, (float, int, type(None))) |
| |
| def test_boot_time(self): |
| # Duplicate of test_system.py. Keep it anyway. |
| self.assertIsInstance(psutil.boot_time(), float) |
| |
| def test_users(self): |
| # Duplicate of test_system.py. Keep it anyway. |
| for user in psutil.users(): |
| self.assertIsInstance(user.name, str) |
| self.assertIsInstance(user.terminal, (str, type(None))) |
| self.assertIsInstance(user.host, (str, type(None))) |
| self.assertIsInstance(user.pid, (int, type(None))) |
| |
| |
| class TestProcessWaitType(PsutilTestCase): |
| |
| @unittest.skipIf(not POSIX, "not POSIX") |
| def test_negative_signal(self): |
| p = psutil.Process(self.spawn_testproc().pid) |
| p.terminate() |
| code = p.wait() |
| self.assertEqual(code, -signal.SIGTERM) |
| if enum is not None: |
| self.assertIsInstance(code, enum.IntEnum) |
| else: |
| self.assertIsInstance(code, int) |
| |
| |
| # =================================================================== |
| # --- Featch all processes test |
| # =================================================================== |
| |
| |
| def proc_info(pid): |
| tcase = PsutilTestCase() |
| |
| def check_exception(exc, proc, name, ppid): |
| tcase.assertEqual(exc.pid, pid) |
| tcase.assertEqual(exc.name, name) |
| if isinstance(exc, psutil.ZombieProcess): |
| if exc.ppid is not None: |
| tcase.assertGreaterEqual(exc.ppid, 0) |
| tcase.assertEqual(exc.ppid, ppid) |
| elif isinstance(exc, psutil.NoSuchProcess): |
| tcase.assertProcessGone(proc) |
| str(exc) |
| assert exc.msg |
| |
| def do_wait(): |
| if pid != 0: |
| try: |
| proc.wait(0) |
| except psutil.Error as exc: |
| check_exception(exc, proc, name, ppid) |
| |
| try: |
| proc = psutil.Process(pid) |
| d = proc.as_dict(['ppid', 'name']) |
| except psutil.NoSuchProcess: |
| return {} |
| |
| name, ppid = d['name'], d['ppid'] |
| info = {'pid': proc.pid} |
| ns = process_namespace(proc) |
| with proc.oneshot(): |
| for fun, fun_name in ns.iter(ns.getters, clear_cache=False): |
| try: |
| info[fun_name] = fun() |
| except psutil.Error as exc: |
| check_exception(exc, proc, name, ppid) |
| continue |
| do_wait() |
| return info |
| |
| |
| @serialrun |
| class TestFetchAllProcesses(PsutilTestCase): |
| """Test which iterates over all running processes and performs |
| some sanity checks against Process API's returned values. |
| Uses a process pool to get info about all processes. |
| """ |
| |
| def setUp(self): |
| self.pool = multiprocessing.Pool() |
| |
| def tearDown(self): |
| self.pool.terminate() |
| self.pool.join() |
| |
| def iter_proc_info(self): |
| # Fixes "can't pickle <function proc_info>: it's not the |
| # same object as test_contracts.proc_info". |
| from psutil.tests.test_contracts import proc_info |
| return self.pool.imap_unordered(proc_info, psutil.pids()) |
| |
| def test_all(self): |
| failures = [] |
| for info in self.iter_proc_info(): |
| for name, value in info.items(): |
| meth = getattr(self, name) |
| try: |
| meth(value, info) |
| except AssertionError: |
| s = '\n' + '=' * 70 + '\n' |
| s += "FAIL: test_%s pid=%s, ret=%s\n" % ( |
| name, info['pid'], repr(value)) |
| s += '-' * 70 |
| s += "\n%s" % traceback.format_exc() |
| s = "\n".join((" " * 4) + i for i in s.splitlines()) |
| s += '\n' |
| failures.append(s) |
| else: |
| if value not in (0, 0.0, [], None, '', {}): |
| assert value, value |
| if failures: |
| raise self.fail(''.join(failures)) |
| |
| def cmdline(self, ret, info): |
| self.assertIsInstance(ret, list) |
| for part in ret: |
| self.assertIsInstance(part, str) |
| |
| def exe(self, ret, info): |
| self.assertIsInstance(ret, (str, type(None))) |
| if not ret: |
| self.assertEqual(ret, '') |
| else: |
| if WINDOWS and not ret.endswith('.exe'): |
| return # May be "Registry", "MemCompression", ... |
| assert os.path.isabs(ret), ret |
| # Note: os.stat() may return False even if the file is there |
| # hence we skip the test, see: |
| # http://stackoverflow.com/questions/3112546/os-path-exists-lies |
| if POSIX and os.path.isfile(ret): |
| if hasattr(os, 'access') and hasattr(os, "X_OK"): |
| # XXX may fail on MACOS |
| assert os.access(ret, os.X_OK) |
| |
| def pid(self, ret, info): |
| self.assertIsInstance(ret, int) |
| self.assertGreaterEqual(ret, 0) |
| |
| def ppid(self, ret, info): |
| self.assertIsInstance(ret, (int, long)) |
| self.assertGreaterEqual(ret, 0) |
| |
| def name(self, ret, info): |
| self.assertIsInstance(ret, str) |
| # on AIX, "<exiting>" processes don't have names |
| if not AIX: |
| assert ret |
| |
| def create_time(self, ret, info): |
| self.assertIsInstance(ret, float) |
| try: |
| self.assertGreaterEqual(ret, 0) |
| except AssertionError: |
| # XXX |
| if OPENBSD and info['status'] == psutil.STATUS_ZOMBIE: |
| pass |
| else: |
| raise |
| # this can't be taken for granted on all platforms |
| # self.assertGreaterEqual(ret, psutil.boot_time()) |
| # make sure returned value can be pretty printed |
| # with strftime |
| time.strftime("%Y %m %d %H:%M:%S", time.localtime(ret)) |
| |
| def uids(self, ret, info): |
| assert is_namedtuple(ret) |
| for uid in ret: |
| self.assertIsInstance(uid, int) |
| self.assertGreaterEqual(uid, 0) |
| |
| def gids(self, ret, info): |
| assert is_namedtuple(ret) |
| # note: testing all gids as above seems not to be reliable for |
| # gid == 30 (nodoby); not sure why. |
| for gid in ret: |
| self.assertIsInstance(gid, int) |
| if not MACOS and not NETBSD: |
| self.assertGreaterEqual(gid, 0) |
| |
| def username(self, ret, info): |
| self.assertIsInstance(ret, str) |
| assert ret |
| |
| def status(self, ret, info): |
| self.assertIsInstance(ret, str) |
| assert ret |
| self.assertNotEqual(ret, '?') # XXX |
| self.assertIn(ret, VALID_PROC_STATUSES) |
| |
| def io_counters(self, ret, info): |
| assert is_namedtuple(ret) |
| for field in ret: |
| self.assertIsInstance(field, (int, long)) |
| if field != -1: |
| self.assertGreaterEqual(field, 0) |
| |
| def ionice(self, ret, info): |
| if LINUX: |
| self.assertIsInstance(ret.ioclass, int) |
| self.assertIsInstance(ret.value, int) |
| self.assertGreaterEqual(ret.ioclass, 0) |
| self.assertGreaterEqual(ret.value, 0) |
| else: # Windows, Cygwin |
| choices = [ |
| psutil.IOPRIO_VERYLOW, |
| psutil.IOPRIO_LOW, |
| psutil.IOPRIO_NORMAL, |
| psutil.IOPRIO_HIGH] |
| self.assertIsInstance(ret, int) |
| self.assertGreaterEqual(ret, 0) |
| self.assertIn(ret, choices) |
| |
| def num_threads(self, ret, info): |
| self.assertIsInstance(ret, int) |
| self.assertGreaterEqual(ret, 1) |
| |
| def threads(self, ret, info): |
| self.assertIsInstance(ret, list) |
| for t in ret: |
| assert is_namedtuple(t) |
| self.assertGreaterEqual(t.id, 0) |
| self.assertGreaterEqual(t.user_time, 0) |
| self.assertGreaterEqual(t.system_time, 0) |
| for field in t: |
| self.assertIsInstance(field, (int, float)) |
| |
| def cpu_times(self, ret, info): |
| assert is_namedtuple(ret) |
| for n in ret: |
| self.assertIsInstance(n, float) |
| self.assertGreaterEqual(n, 0) |
| # TODO: check ntuple fields |
| |
| def cpu_percent(self, ret, info): |
| self.assertIsInstance(ret, float) |
| assert 0.0 <= ret <= 100.0, ret |
| |
| def cpu_num(self, ret, info): |
| self.assertIsInstance(ret, int) |
| if FREEBSD and ret == -1: |
| return |
| self.assertGreaterEqual(ret, 0) |
| if psutil.cpu_count() == 1: |
| self.assertEqual(ret, 0) |
| self.assertIn(ret, list(range(psutil.cpu_count()))) |
| |
| def memory_info(self, ret, info): |
| assert is_namedtuple(ret) |
| for value in ret: |
| self.assertIsInstance(value, (int, long)) |
| self.assertGreaterEqual(value, 0) |
| if WINDOWS: |
| self.assertGreaterEqual(ret.peak_wset, ret.wset) |
| self.assertGreaterEqual(ret.peak_paged_pool, ret.paged_pool) |
| self.assertGreaterEqual(ret.peak_nonpaged_pool, ret.nonpaged_pool) |
| self.assertGreaterEqual(ret.peak_pagefile, ret.pagefile) |
| |
| def memory_full_info(self, ret, info): |
| assert is_namedtuple(ret) |
| total = psutil.virtual_memory().total |
| for name in ret._fields: |
| value = getattr(ret, name) |
| self.assertIsInstance(value, (int, long)) |
| self.assertGreaterEqual(value, 0, msg=(name, value)) |
| if LINUX or OSX and name in ('vms', 'data'): |
| # On Linux there are processes (e.g. 'goa-daemon') whose |
| # VMS is incredibly high for some reason. |
| continue |
| self.assertLessEqual(value, total, msg=(name, value, total)) |
| |
| if LINUX: |
| self.assertGreaterEqual(ret.pss, ret.uss) |
| |
| def open_files(self, ret, info): |
| self.assertIsInstance(ret, list) |
| for f in ret: |
| self.assertIsInstance(f.fd, int) |
| self.assertIsInstance(f.path, str) |
| if WINDOWS: |
| self.assertEqual(f.fd, -1) |
| elif LINUX: |
| self.assertIsInstance(f.position, int) |
| self.assertIsInstance(f.mode, str) |
| self.assertIsInstance(f.flags, int) |
| self.assertGreaterEqual(f.position, 0) |
| self.assertIn(f.mode, ('r', 'w', 'a', 'r+', 'a+')) |
| self.assertGreater(f.flags, 0) |
| elif BSD and not f.path: |
| # XXX see: https://github.com/giampaolo/psutil/issues/595 |
| continue |
| assert os.path.isabs(f.path), f |
| try: |
| st = os.stat(f.path) |
| except FileNotFoundError: |
| pass |
| else: |
| assert stat.S_ISREG(st.st_mode), f |
| |
| def num_fds(self, ret, info): |
| self.assertIsInstance(ret, int) |
| self.assertGreaterEqual(ret, 0) |
| |
| def connections(self, ret, info): |
| with create_sockets(): |
| self.assertEqual(len(ret), len(set(ret))) |
| for conn in ret: |
| assert is_namedtuple(conn) |
| |
| def cwd(self, ret, info): |
| if ret: # 'ret' can be None or empty |
| self.assertIsInstance(ret, str) |
| assert os.path.isabs(ret), ret |
| try: |
| st = os.stat(ret) |
| except OSError as err: |
| if WINDOWS and err.errno in \ |
| psutil._psplatform.ACCESS_DENIED_SET: |
| pass |
| # directory has been removed in mean time |
| elif err.errno != errno.ENOENT: |
| raise |
| else: |
| assert stat.S_ISDIR(st.st_mode) |
| |
| def memory_percent(self, ret, info): |
| self.assertIsInstance(ret, float) |
| assert 0 <= ret <= 100, ret |
| |
| def is_running(self, ret, info): |
| self.assertIsInstance(ret, bool) |
| |
| def cpu_affinity(self, ret, info): |
| self.assertIsInstance(ret, list) |
| assert ret != [], ret |
| cpus = list(range(psutil.cpu_count())) |
| for n in ret: |
| self.assertIsInstance(n, int) |
| self.assertIn(n, cpus) |
| |
| def terminal(self, ret, info): |
| self.assertIsInstance(ret, (str, type(None))) |
| if ret is not None: |
| assert os.path.isabs(ret), ret |
| assert os.path.exists(ret), ret |
| |
| def memory_maps(self, ret, info): |
| for nt in ret: |
| self.assertIsInstance(nt.addr, str) |
| self.assertIsInstance(nt.perms, str) |
| self.assertIsInstance(nt.path, str) |
| for fname in nt._fields: |
| value = getattr(nt, fname) |
| if fname == 'path': |
| if not value.startswith('['): |
| assert os.path.isabs(nt.path), nt.path |
| # commented as on Linux we might get |
| # '/foo/bar (deleted)' |
| # assert os.path.exists(nt.path), nt.path |
| elif fname == 'addr': |
| assert value, repr(value) |
| elif fname == 'perms': |
| if not WINDOWS: |
| assert value, repr(value) |
| else: |
| self.assertIsInstance(value, (int, long)) |
| self.assertGreaterEqual(value, 0) |
| |
| def num_handles(self, ret, info): |
| self.assertIsInstance(ret, int) |
| self.assertGreaterEqual(ret, 0) |
| |
| def nice(self, ret, info): |
| self.assertIsInstance(ret, int) |
| if POSIX: |
| assert -20 <= ret <= 20, ret |
| else: |
| priorities = [getattr(psutil, x) for x in dir(psutil) |
| if x.endswith('_PRIORITY_CLASS')] |
| self.assertIn(ret, priorities) |
| if sys.version_info > (3, 4): |
| self.assertIsInstance(ret, enum.IntEnum) |
| else: |
| self.assertIsInstance(ret, int) |
| |
| def num_ctx_switches(self, ret, info): |
| assert is_namedtuple(ret) |
| for value in ret: |
| self.assertIsInstance(value, (int, long)) |
| self.assertGreaterEqual(value, 0) |
| |
| def rlimit(self, ret, info): |
| self.assertIsInstance(ret, tuple) |
| self.assertEqual(len(ret), 2) |
| self.assertGreaterEqual(ret[0], -1) |
| self.assertGreaterEqual(ret[1], -1) |
| |
| def environ(self, ret, info): |
| self.assertIsInstance(ret, dict) |
| for k, v in ret.items(): |
| self.assertIsInstance(k, str) |
| self.assertIsInstance(v, str) |
| |
| |
| if __name__ == '__main__': |
| from psutil.tests.runner import run_from_name |
| run_from_name(__file__) |