blob: 28670f25c846555d1343fbf011b091602190290b [file] [log] [blame]
#!/usr/bin/env python
# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Tests for net_connections() and Process.connections() APIs."""
import contextlib
import os
import socket
import textwrap
from socket import AF_INET
from socket import AF_INET6
from socket import SOCK_DGRAM
from socket import SOCK_STREAM
import psutil
from psutil import FREEBSD
from psutil import OSX
from psutil import POSIX
from psutil import SUNOS
from psutil import WINDOWS
from psutil._common import supports_ipv6
from psutil._compat import unicode
from psutil.tests import bind_unix_socket
from psutil.tests import check_connection_ntuple
from psutil.tests import pyrun
from psutil.tests import reap_children
from psutil.tests import run_test_module_by_name
from psutil.tests import safe_rmpath
from psutil.tests import skip_on_access_denied
from psutil.tests import TESTFN
from psutil.tests import unix_socket_path
from psutil.tests import wait_for_file
from psutil.tests import unittest
AF_UNIX = getattr(socket, "AF_UNIX", object())
class TestProcessConnections(unittest.TestCase):
"""Tests for Process.connections()."""
def tearDown(self):
safe_rmpath(TESTFN)
reap_children()
def compare_proc_sys_cons(self, pid, proc_cons):
from psutil._common import pconn
try:
syscons = psutil.net_connections(kind='all')
except psutil.AccessDenied:
# On OSX, system-wide connections are retrieved by iterating
# over all processes
if not OSX:
raise
# exclude PIDs from syscons
syscons = [c[:-1] for c in syscons if c.pid == pid]
if FREEBSD:
# on FreeBSD all fds are set to -1 so exclude them
proc_cons = [pconn(*[-1] + list(x[1:])) for x in proc_cons]
self.assertEqual(sorted(proc_cons), sorted(syscons))
@unittest.skipUnless(POSIX, 'POSIX only')
def test_connections_unix(self):
def check(type):
with unix_socket_path() as name:
sock = bind_unix_socket(name, type=type)
with contextlib.closing(sock):
cons = psutil.Process().connections(kind='unix')
self.assertEqual(len(cons), 1)
conn = cons[0]
check_connection_ntuple(conn)
if conn.fd != -1: # != sunos and windows
self.assertEqual(conn.fd, sock.fileno())
self.assertEqual(conn.family, AF_UNIX)
self.assertEqual(conn.type, type)
self.assertEqual(conn.laddr, name)
if not SUNOS:
# XXX Solaris can't retrieve system-wide UNIX
# sockets.
self.compare_proc_sys_cons(os.getpid(), cons)
check(SOCK_STREAM)
check(SOCK_DGRAM)
@unittest.skipUnless(hasattr(socket, "fromfd"),
'socket.fromfd() not supported')
@unittest.skipIf(WINDOWS or SUNOS,
'connection fd not available on this platform')
def test_connection_fromfd(self):
with contextlib.closing(socket.socket()) as sock:
sock.bind(('127.0.0.1', 0))
sock.listen(1)
p = psutil.Process()
for conn in p.connections():
if conn.fd == sock.fileno():
break
else:
self.fail("couldn't find socket fd")
dupsock = socket.fromfd(conn.fd, conn.family, conn.type)
with contextlib.closing(dupsock):
self.assertEqual(dupsock.getsockname(), conn.laddr)
self.assertNotEqual(sock.fileno(), dupsock.fileno())
def test_connection_constants(self):
ints = []
strs = []
for name in dir(psutil):
if name.startswith('CONN_'):
num = getattr(psutil, name)
str_ = str(num)
assert str_.isupper(), str_
assert str_ not in strs, str_
assert num not in ints, num
ints.append(num)
strs.append(str_)
if SUNOS:
psutil.CONN_IDLE
psutil.CONN_BOUND
if WINDOWS:
psutil.CONN_DELETE_TCB
@skip_on_access_denied(only_if=OSX)
def test_connections(self):
def check_conn(proc, conn, family, type, laddr, raddr, status, kinds):
all_kinds = ("all", "inet", "inet4", "inet6", "tcp", "tcp4",
"tcp6", "udp", "udp4", "udp6")
check_connection_ntuple(conn)
self.assertEqual(conn.family, family)
self.assertEqual(conn.type, type)
self.assertEqual(conn.laddr, laddr)
self.assertEqual(conn.raddr, raddr)
self.assertEqual(conn.status, status)
for kind in all_kinds:
cons = proc.connections(kind=kind)
if kind in kinds:
self.assertNotEqual(cons, [])
else:
self.assertEqual(cons, [])
# compare against system-wide connections
# XXX Solaris can't retrieve system-wide UNIX
# sockets.
if not SUNOS:
self.compare_proc_sys_cons(proc.pid, [conn])
tcp_template = textwrap.dedent("""
import socket, time
s = socket.socket($family, socket.SOCK_STREAM)
s.bind(('$addr', 0))
s.listen(1)
with open('$testfn', 'w') as f:
f.write(str(s.getsockname()[:2]))
time.sleep(60)
""")
udp_template = textwrap.dedent("""
import socket, time
s = socket.socket($family, socket.SOCK_DGRAM)
s.bind(('$addr', 0))
with open('$testfn', 'w') as f:
f.write(str(s.getsockname()[:2]))
time.sleep(60)
""")
from string import Template
testfile = os.path.basename(TESTFN)
tcp4_template = Template(tcp_template).substitute(
family=int(AF_INET), addr="127.0.0.1", testfn=testfile)
udp4_template = Template(udp_template).substitute(
family=int(AF_INET), addr="127.0.0.1", testfn=testfile)
tcp6_template = Template(tcp_template).substitute(
family=int(AF_INET6), addr="::1", testfn=testfile)
udp6_template = Template(udp_template).substitute(
family=int(AF_INET6), addr="::1", testfn=testfile)
# launch various subprocess instantiating a socket of various
# families and types to enrich psutil results
tcp4_proc = pyrun(tcp4_template)
tcp4_addr = eval(wait_for_file(testfile))
udp4_proc = pyrun(udp4_template)
udp4_addr = eval(wait_for_file(testfile))
if supports_ipv6():
tcp6_proc = pyrun(tcp6_template)
tcp6_addr = eval(wait_for_file(testfile))
udp6_proc = pyrun(udp6_template)
udp6_addr = eval(wait_for_file(testfile))
else:
tcp6_proc = None
udp6_proc = None
tcp6_addr = None
udp6_addr = None
for p in psutil.Process().children():
cons = p.connections()
self.assertEqual(len(cons), 1)
for conn in cons:
# TCP v4
if p.pid == tcp4_proc.pid:
check_conn(p, conn, AF_INET, SOCK_STREAM, tcp4_addr, (),
psutil.CONN_LISTEN,
("all", "inet", "inet4", "tcp", "tcp4"))
# UDP v4
elif p.pid == udp4_proc.pid:
check_conn(p, conn, AF_INET, SOCK_DGRAM, udp4_addr, (),
psutil.CONN_NONE,
("all", "inet", "inet4", "udp", "udp4"))
# TCP v6
elif p.pid == getattr(tcp6_proc, "pid", None):
check_conn(p, conn, AF_INET6, SOCK_STREAM, tcp6_addr, (),
psutil.CONN_LISTEN,
("all", "inet", "inet6", "tcp", "tcp6"))
# UDP v6
elif p.pid == getattr(udp6_proc, "pid", None):
check_conn(p, conn, AF_INET6, SOCK_DGRAM, udp6_addr, (),
psutil.CONN_NONE,
("all", "inet", "inet6", "udp", "udp6"))
# err
self.assertRaises(ValueError, p.connections, kind='???')
class TestSystemConnections(unittest.TestCase):
"""Tests for net_connections()."""
@skip_on_access_denied()
def test_net_connections(self):
def check(cons, families, types_):
AF_UNIX = getattr(socket, 'AF_UNIX', object())
for conn in cons:
self.assertIn(conn.family, families, msg=conn)
if conn.family != AF_UNIX:
self.assertIn(conn.type, types_, msg=conn)
self.assertIsInstance(conn.status, (str, unicode))
from psutil._common import conn_tmap
for kind, groups in conn_tmap.items():
if SUNOS and kind == 'unix':
continue
families, types_ = groups
cons = psutil.net_connections(kind)
self.assertEqual(len(cons), len(set(cons)))
check(cons, families, types_)
self.assertRaises(ValueError, psutil.net_connections, kind='???')
if __name__ == '__main__':
run_test_module_by_name(__file__)