# -*- test-case-name: twisted.python.test.test_util -*-
# Copyright (c) 2001-2010 Twisted Matrix Laboratories.
# See LICENSE for details.
import os, sys, hmac, errno, new, inspect, warnings
import pwd, grp
except ImportError:
pwd = grp = None
from os import setgroups, getgroups
except ImportError:
setgroups = getgroups = None
from UserDict import UserDict
class InsensitiveDict:
"""Dictionary, that has case-insensitive keys.
Normally keys are retained in their original form when queried with
.keys() or .items(). If initialized with preserveCase=0, keys are both
looked up in lowercase and returned in lowercase by .keys() and .items().
Modified recipe at originally
contributed by Sami Hangaslammi.
def __init__(self, dict=None, preserve=1):
"""Create an empty dictionary, or update from 'dict'.""" = {}
if dict:
def __delitem__(self, key):
def _lowerOrReturn(self, key):
if isinstance(key, str) or isinstance(key, unicode):
return key.lower()
return key
def __getitem__(self, key):
"""Retrieve the value associated with 'key' (in any case)."""
k = self._lowerOrReturn(key)
def __setitem__(self, key, value):
"""Associate 'value' with 'key'. If 'key' already exists, but
in different case, it will be replaced."""
k = self._lowerOrReturn(key)[k] = (key, value)
def has_key(self, key):
"""Case insensitive test whether 'key' exists."""
k = self._lowerOrReturn(key)
def _doPreserve(self, key):
if not self.preserve and (isinstance(key, str)
or isinstance(key, unicode)):
return key.lower()
return key
def keys(self):
"""List of keys in their original case."""
return list(self.iterkeys())
def values(self):
"""List of values."""
return list(self.itervalues())
def items(self):
"""List of (key,value) pairs."""
return list(self.iteritems())
def get(self, key, default=None):
"""Retrieve value associated with 'key' or return default value
if 'key' doesn't exist."""
return self[key]
except KeyError:
return default
def setdefault(self, key, default):
"""If 'key' doesn't exists, associate it with the 'default' value.
Return value associated with 'key'."""
if not self.has_key(key):
self[key] = default
return self[key]
def update(self, dict):
"""Copy (key,value) pairs from 'dict'."""
for k,v in dict.items():
self[k] = v
def __repr__(self):
"""String representation of the dictionary."""
items = ", ".join([("%r: %r" % (k,v)) for k,v in self.items()])
return "InsensitiveDict({%s})" % items
def iterkeys(self):
for v in
yield self._doPreserve(v[0])
def itervalues(self):
for v in
yield v[1]
def iteritems(self):
for (k, v) in
yield self._doPreserve(k), v
def popitem(self):
del self[i[0]]
return i
def clear(self):
for k in self.keys():
del self[k]
def copy(self):
return InsensitiveDict(self, self.preserve)
def __len__(self):
return len(
def __eq__(self, other):
for k,v in self.items():
if not (k in other) or not (other[k]==v):
return 0
return len(self)==len(other)
class OrderedDict(UserDict):
"""A UserDict that preserves insert order whenever possible."""
def __init__(self, dict=None, **kwargs):
self._order = [] = {}
if dict is not None:
if hasattr(dict,'keys'):
for k,v in dict: # sequence
self[k] = v
if len(kwargs):
def __repr__(self):
return '{'+', '.join([('%r: %r' % item) for item in self.items()])+'}'
def __setitem__(self, key, value):
if not self.has_key(key):
UserDict.__setitem__(self, key, value)
def copy(self):
return self.__class__(self)
def __delitem__(self, key):
UserDict.__delitem__(self, key)
def iteritems(self):
for item in self._order:
yield (item, self[item])
def items(self):
return list(self.iteritems())
def itervalues(self):
for item in self._order:
yield self[item]
def values(self):
return list(self.itervalues())
def iterkeys(self):
return iter(self._order)
def keys(self):
return list(self._order)
def popitem(self):
key = self._order[-1]
value = self[key]
del self[key]
return (key, value)
def setdefault(self, item, default):
if self.has_key(item):
return self[item]
self[item] = default
return default
def update(self, d):
for k, v in d.items():
self[k] = v
def uniquify(lst):
"""Make the elements of a list unique by inserting them into a dictionary.
This must not change the order of the input lst.
dct = {}
result = []
for k in lst:
if not dct.has_key(k): result.append(k)
dct[k] = 1
return result
def padTo(n, seq, default=None):
"""Pads a sequence out to n elements,
filling in with a default value if it is not long enough.
If the input sequence is longer than n, raises ValueError.
Details, details:
This returns a new list; it does not extend the original sequence.
The new list contains the values of the original sequence, not copies.
if len(seq) > n:
raise ValueError, "%d elements is more than %d." % (len(seq), n)
blank = [default] * n
blank[:len(seq)] = list(seq)
return blank
def getPluginDirs():
import twisted
systemPlugins = os.path.join(os.path.dirname(os.path.dirname(
os.path.abspath(twisted.__file__))), 'plugins')
userPlugins = os.path.expanduser("~/TwistedPlugins")
confPlugins = os.path.expanduser("~/.twisted")
allPlugins = filter(os.path.isdir, [systemPlugins, userPlugins, confPlugins])
return allPlugins
def addPluginDir():
def sibpath(path, sibling):
"""Return the path to a sibling of a file in the filesystem.
This is useful in conjunction with the special __file__ attribute
that Python provides for modules, so modules can load associated
resource files.
return os.path.join(os.path.dirname(os.path.abspath(path)), sibling)
def _getpass(prompt):
"""Helper to turn IOErrors into KeyboardInterrupts"""
import getpass
return getpass.getpass(prompt)
except IOError, e:
if e.errno == errno.EINTR:
raise KeyboardInterrupt
except EOFError:
raise KeyboardInterrupt
def getPassword(prompt = 'Password: ', confirm = 0, forceTTY = 0,
confirmPrompt = 'Confirm password: ',
mismatchMessage = "Passwords don't match."):
"""Obtain a password by prompting or from stdin.
If stdin is a terminal, prompt for a new password, and confirm (if
C{confirm} is true) by asking again to make sure the user typed the same
thing, as keystrokes will not be echoed.
If stdin is not a terminal, and C{forceTTY} is not true, read in a line
and use it as the password, less the trailing newline, if any. If
C{forceTTY} is true, attempt to open a tty and prompt for the password
using it. Raise a RuntimeError if this is not possible.
@returns: C{str}
isaTTY = hasattr(sys.stdin, 'isatty') and sys.stdin.isatty()
old = None
if not isaTTY:
if forceTTY:
old = sys.stdin, sys.stdout
sys.stdin = sys.stdout = open('/dev/tty', 'r+')
raise RuntimeError("Cannot obtain a TTY")
password = sys.stdin.readline()
if password[-1] == '\n':
password = password[:-1]
return password
while 1:
try1 = _getpass(prompt)
if not confirm:
return try1
try2 = _getpass(confirmPrompt)
if try1 == try2:
return try1
sys.stderr.write(mismatchMessage + "\n")
if old:
sys.stdin, sys.stdout = old
def dict(*a, **k):
import __builtin__
warnings.warn('twisted.python.util.dict is deprecated. Use __builtin__.dict instead')
return __builtin__.dict(*a, **k)
def println(*a):
sys.stdout.write(' '.join(map(str, a))+'\n')
# This does not belong here
# But where does it belong?
def str_xor(s, b):
return ''.join([chr(ord(c) ^ b) for c in s])
def keyed_md5(secret, challenge):
Create the keyed MD5 string for the given secret and challenge.
"keyed_md5() is deprecated. Use the stdlib module hmac instead.",
DeprecationWarning, stacklevel=2
return hmac.HMAC(secret, challenge).hexdigest()
def makeStatBar(width, maxPosition, doneChar = '=', undoneChar = '-', currentChar = '>'):
"""Creates a function that will return a string representing a progress bar.
aValue = width / float(maxPosition)
def statBar(position, force = 0, last = ['']):
assert len(last) == 1, "Don't mess with the last parameter."
done = int(aValue * position)
toDo = width - done - 2
result = "[%s%s%s]" % (doneChar * done, currentChar, undoneChar * toDo)
if force:
last[0] = result
return result
if result == last[0]:
return ''
last[0] = result
return result
statBar.__doc__ = """statBar(position, force = 0) -> '[%s%s%s]'-style progress bar
returned string is %d characters long, and the range goes from 0..%d.
The 'position' argument is where the '%s' will be drawn. If force is false,
'' will be returned instead if the resulting progress bar is identical to the
previously returned progress bar.
""" % (doneChar * 3, currentChar, undoneChar * 3, width, maxPosition, currentChar)
return statBar
def spewer(frame, s, ignored):
"""A trace function for sys.settrace that prints every function or method call."""
from twisted.python import reflect
if frame.f_locals.has_key('self'):
se = frame.f_locals['self']
if hasattr(se, '__class__'):
k = reflect.qual(se.__class__)
k = reflect.qual(type(se))
print 'method %s of %s at %s' % (
frame.f_code.co_name, k, id(se)
print 'function %s in %s, line %s' % (
def searchupwards(start, files=[], dirs=[]):
"""Walk upwards from start, looking for a directory containing
all files and directories given as arguments::
>>> searchupwards('.', ['foo.txt'], ['bar', 'bam'])
If not found, return None
exists=os.path.exists; join=os.sep.join; isdir=os.path.isdir
while len(parents):
for f in files:
if not exists("%s%s" % (candidate, f)):
if allpresent:
for d in dirs:
if not isdir("%s%s" % (candidate, d)):
if allpresent: return candidate
return None
class LineLog:
A limited-size line-based log, useful for logging line-based
protocols such as SMTP.
When the log fills up, old entries drop off the end.
def __init__(self, size=10):
Create a new log, with size lines of storage (default 10).
A log size of 0 (or less) means an infinite log.
if size < 0:
size = 0
self.log = [None]*size
self.size = size
def append(self,line):
if self.size:
self.log[:-1] = self.log[1:]
self.log[-1] = line
def str(self):
return '\n'.join(filter(None,self.log))
def __getitem__(self, item):
return filter(None,self.log)[item]
def clear(self):
"""Empty the log"""
self.log = [None]*self.size
def raises(exception, f, *args, **kwargs):
"""Determine whether the given call raises the given exception"""
f(*args, **kwargs)
except exception:
return 1
return 0
class IntervalDifferential:
Given a list of intervals, generate the amount of time to sleep between
For example, given 7, 11 and 13, the three (infinite) sequences::
7 14 21 28 35 ...
11 22 33 44 ...
13 26 39 52 ...
will be generated, merged, and used to produce::
(7, 0) (4, 1) (2, 2) (1, 0) (7, 0) (1, 1) (4, 2) (2, 0) (5, 1) (2, 0)
New intervals may be added or removed as iteration proceeds using the
proper methods.
def __init__(self, intervals, default=60):
@type intervals: C{list} of C{int}, C{long}, or C{float} param
@param intervals: The intervals between instants.
@type default: C{int}, C{long}, or C{float}
@param default: The duration to generate if the intervals list
becomes empty.
self.intervals = intervals[:]
self.default = default
def __iter__(self):
return _IntervalDifferentialIterator(self.intervals, self.default)
class _IntervalDifferentialIterator:
def __init__(self, i, d):
self.intervals = [[e, e, n] for (e, n) in zip(i, range(len(i)))]
self.default = d
self.last = 0
def next(self):
if not self.intervals:
return (self.default, None)
last, index = self.intervals[0][0], self.intervals[0][2]
self.intervals[0][0] += self.intervals[0][1]
result = last - self.last
self.last = last
return result, index
def addInterval(self, i):
if self.intervals:
delay = self.intervals[0][0] - self.intervals[0][1]
self.intervals.append([delay + i, i, len(self.intervals)])
self.intervals.append([i, i, 0])
def removeInterval(self, interval):
for i in range(len(self.intervals)):
if self.intervals[i][1] == interval:
index = self.intervals[i][2]
del self.intervals[i]
for i in self.intervals:
if i[2] > index:
i[2] -= 1
raise ValueError, "Specified interval not in IntervalDifferential"
class FancyStrMixin:
Set showAttributes to a sequence of strings naming attributes, OR
sequences of (attributeName, displayName, formatCharacter)
showAttributes = ()
def __str__(self):
r = ['<', hasattr(self, 'fancybasename') and self.fancybasename or self.__class__.__name__]
for attr in self.showAttributes:
if isinstance(attr, str):
r.append(' %s=%r' % (attr, getattr(self, attr)))
r.append((' %s=' + attr[2]) % (attr[1], getattr(self, attr[0])))
return ''.join(r)
__repr__ = __str__
class FancyEqMixin:
compareAttributes = ()
def __eq__(self, other):
if not self.compareAttributes:
return self is other
if isinstance(self, other.__class__):
return (
[getattr(self, name) for name in self.compareAttributes] ==
[getattr(other, name) for name in self.compareAttributes])
return NotImplemented
def __ne__(self, other):
result = self.__eq__(other)
if result is NotImplemented:
return result
return not result
def dsu(list, key):
decorate-sort-undecorate (aka "Schwartzian transform")
DEPRECATED. Use the built-in C{sorted()} instead.
warnings.warn(("dsu is deprecated since Twisted 10.1. "
"Use the built-in sorted() instead."), DeprecationWarning, stacklevel=2)
L2 = [(key(e), i, e) for (i, e) in zip(range(len(list)), list)]
return [e[2] for e in L2]
from twisted.python._initgroups import initgroups as _c_initgroups
except ImportError:
_c_initgroups = None
if pwd is None or grp is None or setgroups is None or getgroups is None:
def initgroups(uid, primaryGid):
Do nothing.
Underlying platform support require to manipulate groups is missing.
# Fallback to the inefficient Python version
def _setgroups_until_success(l):
# NASTY NASTY HACK (but glibc does it so it must be okay):
# In case sysconfig didn't give the right answer, find the limit
# on max groups by just looping, trying to set fewer and fewer
# groups each time until it succeeds.
except ValueError:
# This exception comes from python itself restricting
# number of groups allowed.
if len(l) > 1:
del l[-1]
except OSError, e:
if e.errno == errno.EINVAL and len(l) > 1:
# This comes from the OS saying too many groups
del l[-1]
# Success, yay!
def initgroups(uid, primaryGid):
Initializes the group access list.
If the C extension is present, we're calling it, which in turn calls
If not, this is done by reading the group database /etc/group and using
all groups of which C{uid} is a member. The additional group
C{primaryGid} is also added to the list.
If the given user is a member of more than C{NGROUPS}, arbitrary
groups will be silently discarded to bring the number below that
@type uid: C{int}
@param uid: The UID for which to look up group information.
@type primaryGid: C{int} or C{NoneType}
@param primaryGid: If provided, an additional GID to include when
setting the groups.
if _c_initgroups is not None:
return _c_initgroups(pwd.getpwuid(uid)[0], primaryGid)
# Try to get the maximum number of groups
max_groups = os.sysconf("SC_NGROUPS_MAX")
# No predefined limit
max_groups = 0
username = pwd.getpwuid(uid)[0]
l = []
if primaryGid is not None:
for groupname, password, gid, userlist in grp.getgrall():
if username in userlist:
if len(l) == max_groups:
break # No more groups, ignore any more
except OSError, e:
# We might be able to remove this code now that we
# don't try to setgid/setuid even when not asked to.
if e.errno == errno.EPERM:
for g in getgroups():
if g not in l:
def switchUID(uid, gid, euid=False):
if euid:
setuid = os.seteuid
setgid = os.setegid
setuid = os.setuid
setgid = os.setgid
if gid is not None:
if uid is not None:
initgroups(uid, gid)
class SubclassableCStringIO(object):
"""A wrapper around cStringIO to allow for subclassing"""
__csio = None
def __init__(self, *a, **kw):
from cStringIO import StringIO
self.__csio = StringIO(*a, **kw)
def __iter__(self):
return self.__csio.__iter__()
def next(self):
def close(self):
return self.__csio.close()
def isatty(self):
return self.__csio.isatty()
def seek(self, pos, mode=0):
return, mode)
def tell(self):
return self.__csio.tell()
def read(self, n=-1):
def readline(self, length=None):
return self.__csio.readline(length)
def readlines(self, sizehint=0):
return self.__csio.readlines(sizehint)
def truncate(self, size=None):
return self.__csio.truncate(size)
def write(self, s):
return self.__csio.write(s)
def writelines(self, list):
return self.__csio.writelines(list)
def flush(self):
return self.__csio.flush()
def getvalue(self):
return self.__csio.getvalue()
def moduleMovedForSplit(origModuleName, newModuleName, moduleDesc,
projectName, projectURL, globDict):
No-op function; only present for backwards compatibility. There is no
reason to call this function.
"moduleMovedForSplit is deprecated since Twisted 9.0.",
DeprecationWarning, stacklevel=2)
def untilConcludes(f, *a, **kw):
while True:
return f(*a, **kw)
except (IOError, OSError), e:
if e.args[0] == errno.EINTR:
_idFunction = id
def setIDFunction(idFunction):
Change the function used by L{unsignedID} to determine the integer id value
of an object. This is largely useful for testing to give L{unsignedID}
deterministic, easily-controlled behavior.
@param idFunction: A function with the signature of L{id}.
@return: The previous function being used by L{unsignedID}.
global _idFunction
oldIDFunction = _idFunction
_idFunction = idFunction
return oldIDFunction
# A value about twice as large as any Python int, to which negative values
# from id() will be added, moving them into a range which should begin just
# above where positive values from id() leave off.
_HUGEINT = (sys.maxint + 1L) * 2L
def unsignedID(obj):
Return the id of an object as an unsigned number so that its hex
representation makes sense.
This is mostly necessary in Python 2.4 which implements L{id} to sometimes
return a negative value. Python 2.3 shares this behavior, but also
implements hex and the %x format specifier to represent negative values as
though they were positive ones, obscuring the behavior of L{id}. Python
2.5's implementation of L{id} always returns positive values.
rval = _idFunction(obj)
if rval < 0:
rval += _HUGEINT
return rval
def mergeFunctionMetadata(f, g):
Overwrite C{g}'s name and docstring with values from C{f}. Update
C{g}'s instance dictionary with C{f}'s.
To use this function safely you must use the return value. In Python 2.3,
L{mergeFunctionMetadata} will create a new function. In later versions of
Python, C{g} will be mutated and returned.
@return: A function that has C{g}'s behavior and metadata merged from
g.__name__ = f.__name__
except TypeError:
merged = new.function(
g.func_code, g.func_globals,
f.__name__, inspect.getargspec(g)[-1],
except TypeError:
merged = g
merged.__doc__ = f.__doc__
except (TypeError, AttributeError):
except (TypeError, AttributeError):
merged.__module__ = f.__module__
return merged
def nameToLabel(mname):
Convert a string like a variable name into a slightly more human-friendly
string with spaces and capitalized letters.
@type mname: C{str}
@param mname: The name to convert to a label. This must be a string
which could be used as a Python identifier. Strings which do not take
this form will result in unpredictable behavior.
@rtype: C{str}
labelList = []
word = ''
lastWasUpper = False
for letter in mname:
if letter.isupper() == lastWasUpper:
# Continuing a word.
word += letter
# breaking a word OR beginning a word
if lastWasUpper:
# could be either
if len(word) == 1:
# keep going
word += letter
# acronym
# we're processing the lowercase letter after the acronym-then-capital
lastWord = word[:-1]
firstLetter = word[-1]
word = firstLetter + letter
# definitely breaking: lower to upper
word = letter
lastWasUpper = letter.isupper()
if labelList:
labelList[0] = labelList[0].capitalize()
return mname.capitalize()
return ' '.join(labelList)
def uidFromString(uidString):
Convert a user identifier, as a string, into an integer UID.
@type uid: C{str}
@param uid: A string giving the base-ten representation of a UID or the
name of a user which can be converted to a UID via L{pwd.getpwnam}.
@rtype: C{int}
@return: The integer UID corresponding to the given string.
@raise ValueError: If the user name is supplied and L{pwd} is not
return int(uidString)
except ValueError:
if pwd is None:
return pwd.getpwnam(uidString)[2]
def gidFromString(gidString):
Convert a group identifier, as a string, into an integer GID.
@type uid: C{str}
@param uid: A string giving the base-ten representation of a GID or the
name of a group which can be converted to a GID via L{grp.getgrnam}.
@rtype: C{int}
@return: The integer GID corresponding to the given string.
@raise ValueError: If the group name is supplied and L{grp} is not
return int(gidString)
except ValueError:
if grp is None:
return grp.getgrnam(gidString)[2]
def runAsEffectiveUser(euid, egid, function, *args, **kwargs):
Run the given function wrapped with seteuid/setegid calls.
This will try to minimize the number of seteuid/setegid calls, comparing
current and wanted permissions
@param euid: effective UID used to call the function.
@type euid: C{int}
@type egid: effective GID used to call the function.
@param egid: C{int}
@param function: the function run with the specific permission.
@type function: any callable
@param *args: arguments passed to C{function}
@param **kwargs: keyword arguments passed to C{function}
uid, gid = os.geteuid(), os.getegid()
if uid == euid and gid == egid:
return function(*args, **kwargs)
if uid != 0 and (uid != euid or gid != egid):
if gid != egid:
if euid != 0 and (euid != uid or gid != egid):
return function(*args, **kwargs)
if euid != 0 and (uid != euid or gid != egid):
if gid != egid:
if uid != 0 and (uid != euid or gid != egid):
__all__ = [
"uniquify", "padTo", "getPluginDirs", "addPluginDir", "sibpath",
"getPassword", "dict", "println", "keyed_md5", "makeStatBar",
"OrderedDict", "InsensitiveDict", "spewer", "searchupwards", "LineLog",
"raises", "IntervalDifferential", "FancyStrMixin", "FancyEqMixin",
"dsu", "switchUID", "SubclassableCStringIO", "moduleMovedForSplit",
"unsignedID", "mergeFunctionMetadata", "nameToLabel", "uidFromString",
"gidFromString", "runAsEffectiveUser", "moduleMovedForSplit",