# sqlalchemy/util.py
# Copyright (C) 2005-2011 the SQLAlchemy authors and contributors <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
import inspect
import itertools
import operator
import sys
import warnings
import weakref
import re
# Py2K
import __builtin__
# end Py2K
types = __import__('types')
from sqlalchemy import exc
try:
import threading
except ImportError:
import dummy_threading as threading
py3k = getattr(sys, 'py3kwarning', False) or sys.version_info >= (3, 0)
jython = sys.platform.startswith('java')
win32 = sys.platform.startswith('win')
if py3k:
set_types = set
elif sys.version_info < (2, 6):
import sets
set_types = set, sets.Set
else:
# 2.6 deprecates sets.Set, but we still need to be able to detect them
# in user code and as return values from DB-APIs
ignore = ('ignore', None, DeprecationWarning, None, 0)
try:
warnings.filters.insert(0, ignore)
except Exception:
import sets
else:
import sets
warnings.filters.remove(ignore)
set_types = set, sets.Set
EMPTY_SET = frozenset()
NoneType = type(None)
if py3k:
import pickle
else:
try:
import cPickle as pickle
except ImportError:
import pickle
# Py2K
# a controversial feature, required by MySQLdb currently
def buffer(x):
return x
buffer = getattr(__builtin__, 'buffer', buffer)
# end Py2K
if sys.version_info >= (2, 5):
class PopulateDict(dict):
"""A dict which populates missing values via a creation function.
Note the creation function takes a key, unlike
collections.defaultdict.
"""
def __init__(self, creator):
self.creator = creator
def __missing__(self, key):
self[key] = val = self.creator(key)
return val
else:
class PopulateDict(dict):
"""A dict which populates missing values via a creation function."""
def __init__(self, creator):
self.creator = creator
def __getitem__(self, key):
try:
return dict.__getitem__(self, key)
except KeyError:
self[key] = value = self.creator(key)
return value
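# A minimal usage sketch (illustrative names): the creation function
# receives the missing key itself, unlike collections.defaultdict whose
# factory takes no arguments.
#
#     >>> squares = PopulateDict(lambda key: key * key)
#     >>> squares[4]
#     16
#     >>> sorted(squares.items())
#     [(4, 16)]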
if py3k:
def callable(fn):
return hasattr(fn, '__call__')
def cmp(a, b):
return (a > b) - (a < b)
from functools import reduce
else:
callable = __builtin__.callable
cmp = __builtin__.cmp
reduce = __builtin__.reduce
try:
from collections import defaultdict
except ImportError:
class defaultdict(dict):
def __init__(self, default_factory=None, *a, **kw):
if (default_factory is not None and
not hasattr(default_factory, '__call__')):
raise TypeError('first argument must be callable')
dict.__init__(self, *a, **kw)
self.default_factory = default_factory
def __getitem__(self, key):
try:
return dict.__getitem__(self, key)
except KeyError:
return self.__missing__(key)
def __missing__(self, key):
if self.default_factory is None:
raise KeyError(key)
self[key] = value = self.default_factory()
return value
def __reduce__(self):
if self.default_factory is None:
args = tuple()
else:
args = self.default_factory,
return type(self), args, None, None, self.iteritems()
def copy(self):
return self.__copy__()
def __copy__(self):
return type(self)(self.default_factory, self)
def __deepcopy__(self, memo):
import copy
return type(self)(self.default_factory,
copy.deepcopy(self.items()))
def __repr__(self):
return 'defaultdict(%s, %s)' % (self.default_factory,
dict.__repr__(self))
class frozendict(dict):
@property
def _blocked_attribute(obj):
raise AttributeError, "A frozendict cannot be modified."
__delitem__ = __setitem__ = clear = _blocked_attribute
pop = popitem = setdefault = update = _blocked_attribute
def __new__(cls, *args):
new = dict.__new__(cls)
dict.__init__(new, *args)
return new
def __init__(self, *args):
pass
def __reduce__(self):
return frozendict, (dict(self), )
def union(self, d):
if not self:
return frozendict(d)
else:
d2 = self.copy()
d2.update(d)
return frozendict(d2)
def __repr__(self):
return "frozendict(%s)" % dict.__repr__(self)
# find or create a dict implementation that supports __missing__
class _probe(dict):
def __missing__(self, key):
return 1
try:
try:
_probe()['missing']
py25_dict = dict
except KeyError:
class py25_dict(dict):
def __getitem__(self, key):
try:
return dict.__getitem__(self, key)
except KeyError:
try:
missing = self.__missing__
except AttributeError:
raise KeyError(key)
else:
return missing(key)
finally:
del _probe
def to_list(x, default=None):
if x is None:
return default
if not isinstance(x, (list, tuple)):
return [x]
else:
return x
def to_set(x):
if x is None:
return set()
if not isinstance(x, set):
return set(to_list(x))
else:
return x
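# Usage sketch: these helpers normalize scalar-or-sequence arguments;
# None falls through to the supplied default.
#
#     >>> to_list('col')
#     ['col']
#     >>> to_list(None, default=[])
#     []
#     >>> to_set('col')
#     set(['col'])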
def to_column_set(x):
if x is None:
return column_set()
if not isinstance(x, column_set):
return column_set(to_list(x))
else:
return x
try:
from functools import update_wrapper
except ImportError:
def update_wrapper(wrapper, wrapped,
assigned=('__doc__', '__module__', '__name__'),
updated=('__dict__',)):
for attr in assigned:
setattr(wrapper, attr, getattr(wrapped, attr))
for attr in updated:
getattr(wrapper, attr).update(getattr(wrapped, attr, ()))
return wrapper
try:
from functools import partial
except:
def partial(func, *args, **keywords):
def newfunc(*fargs, **fkeywords):
newkeywords = keywords.copy()
newkeywords.update(fkeywords)
return func(*(args + fargs), **newkeywords)
return newfunc
try:
import hashlib
_md5 = hashlib.md5
except ImportError:
import md5
_md5 = md5.new
def md5_hex(x):
# Py3K
#x = x.encode('utf-8')
m = _md5()
m.update(x)
return m.hexdigest()
def accepts_a_list_as_starargs(list_deprecation=None):
def decorate(fn):
spec = inspect.getargspec(fn)
assert spec[1], 'Decorated function does not accept *args'
def _deprecate():
if list_deprecation:
if list_deprecation == 'pending':
warning_type = exc.SAPendingDeprecationWarning
else:
warning_type = exc.SADeprecationWarning
msg = (
"%s%s now accepts multiple %s arguments as a "
"variable argument list. Supplying %s as a single "
"list is deprecated and support will be removed "
"in a future release." % (
fn.func_name,
inspect.formatargspec(*spec),
spec[1], spec[1]))
warnings.warn(msg, warning_type, stacklevel=3)
def go(fn, *args, **kw):
if isinstance(args[-1], list):
_deprecate()
return fn(*(list(args[0:-1]) + args[-1]), **kw)
else:
return fn(*args, **kw)
return decorator(go)(fn)
return decorate
def unique_symbols(used, *bases):
used = set(used)
for base in bases:
pool = itertools.chain((base,),
itertools.imap(lambda i: base + str(i),
xrange(1000)))
for sym in pool:
if sym not in used:
used.add(sym)
yield sym
break
else:
raise NameError("exhausted namespace for symbol base %s" % base)
def decorator(target):
"""A signature-matching decorator factory."""
def decorate(fn):
spec = inspect.getargspec(fn)
names = tuple(spec[0]) + spec[1:3] + (fn.func_name,)
targ_name, fn_name = unique_symbols(names, 'target', 'fn')
metadata = dict(target=targ_name, fn=fn_name)
metadata.update(format_argspec_plus(spec, grouped=False))
code = 'lambda %(args)s: %(target)s(%(fn)s, %(apply_kw)s)' % (
metadata)
decorated = eval(code, {targ_name:target, fn_name:fn})
decorated.func_defaults = getattr(fn, 'im_func', fn).func_defaults
return update_wrapper(decorated, fn)
return update_wrapper(decorate, target)
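# Usage sketch (illustrative names): decorator() builds signature-preserving
# decorators.  The target receives the wrapped function as its first
# argument, and the generated wrapper keeps the original argspec intact.
#
#     >>> @decorator
#     ... def noisy(fn, *args, **kw):
#     ...     print "calling %s" % fn.__name__
#     ...     return fn(*args, **kw)
#     >>> @noisy
#     ... def add(x, y=1):
#     ...     return x + y
#     >>> add(2)
#     calling add
#     3
#     >>> inspect.getargspec(add)[0]
#     ['x', 'y']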
if sys.version_info >= (2, 5):
def decode_slice(slc):
"""decode a slice object as sent to __getitem__.
takes into account the 2.5 __index__() method, basically.
"""
ret = []
for x in slc.start, slc.stop, slc.step:
if hasattr(x, '__index__'):
x = x.__index__()
ret.append(x)
return tuple(ret)
else:
def decode_slice(slc):
return (slc.start, slc.stop, slc.step)
def update_copy(d, _new=None, **kw):
"""Copy the given dict and update with the given values."""
d = d.copy()
if _new:
d.update(_new)
d.update(**kw)
return d
def flatten_iterator(x):
"""Given an iterator of which further sub-elements may also be
iterators, flatten the sub-elements into a single iterator.
"""
for elem in x:
if not isinstance(elem, basestring) and hasattr(elem, '__iter__'):
for y in flatten_iterator(elem):
yield y
else:
yield elem
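# Usage sketch: nested iterables are flattened depth-first, while strings
# are treated as atoms rather than descended into.
#
#     >>> list(flatten_iterator([1, [2, [3, 'ab']], 'cd']))
#     [1, 2, 3, 'ab', 'cd']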
def get_cls_kwargs(cls):
"""Return the full set of inherited kwargs for the given `cls`.
Probes a class's __init__ method, collecting all named arguments. If the
__init__ defines a \**kwargs catch-all, then the constructor is presumed to
pass along unrecognized keywords to its base classes, and the collection
process is repeated recursively on each of the bases.
"""
for c in cls.__mro__:
if '__init__' in c.__dict__:
stack = set([c])
break
else:
return []
args = set()
while stack:
class_ = stack.pop()
ctr = class_.__dict__.get('__init__', False)
if not ctr or not isinstance(ctr, types.FunctionType):
stack.update(class_.__bases__)
continue
names, _, has_kw, _ = inspect.getargspec(ctr)
args.update(names)
if has_kw:
stack.update(class_.__bases__)
args.discard('self')
return args
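# Usage sketch (illustrative classes): keyword names are collected up the
# hierarchy as long as each __init__ forwards unknown keywords via **kw.
#
#     >>> class Base(object):
#     ...     def __init__(self, a, b=1):
#     ...         pass
#     >>> class Sub(Base):
#     ...     def __init__(self, c=2, **kw):
#     ...         Base.__init__(self, **kw)
#     >>> sorted(get_cls_kwargs(Sub))
#     ['a', 'b', 'c']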
def get_func_kwargs(func):
"""Return the full set of legal kwargs for the given `func`."""
return inspect.getargspec(func)[0]
def format_argspec_plus(fn, grouped=True):
"""Returns a dictionary of formatted, introspected function arguments.
An enhanced variant of inspect.formatargspec to support code generation.
fn
An inspectable callable or tuple of inspect getargspec() results.
grouped
Defaults to True; include (parens, around, argument) lists
Returns:
args
Full inspect.formatargspec for fn
self_arg
The name of the first positional argument, varargs[0], or None
if the function defines no positional arguments.
apply_pos
args, re-written in calling rather than receiving syntax. Arguments are
passed positionally.
apply_kw
Like apply_pos, except keyword-ish args are passed as keywords.
Example::
>>> format_argspec_plus(lambda self, a, b, c=3, **d: 123)
{'args': '(self, a, b, c=3, **d)',
'self_arg': 'self',
'apply_kw': '(self, a, b, c=c, **d)',
'apply_pos': '(self, a, b, c, **d)'}
"""
spec = callable(fn) and inspect.getargspec(fn) or fn
args = inspect.formatargspec(*spec)
if spec[0]:
self_arg = spec[0][0]
elif spec[1]:
self_arg = '%s[0]' % spec[1]
else:
self_arg = None
apply_pos = inspect.formatargspec(spec[0], spec[1], spec[2])
defaulted_vals = spec[3] is not None and spec[0][0-len(spec[3]):] or ()
apply_kw = inspect.formatargspec(spec[0], spec[1], spec[2], defaulted_vals,
formatvalue=lambda x: '=' + x)
if grouped:
return dict(args=args, self_arg=self_arg,
apply_pos=apply_pos, apply_kw=apply_kw)
else:
return dict(args=args[1:-1], self_arg=self_arg,
apply_pos=apply_pos[1:-1], apply_kw=apply_kw[1:-1])
def format_argspec_init(method, grouped=True):
"""format_argspec_plus with considerations for typical __init__ methods
Wraps format_argspec_plus with error handling strategies for typical
__init__ cases::
object.__init__ -> (self)
other unreflectable (usually C) -> (self, *args, **kwargs)
"""
try:
return format_argspec_plus(method, grouped=grouped)
except TypeError:
self_arg = 'self'
if method is object.__init__:
args = grouped and '(self)' or 'self'
else:
args = (grouped and '(self, *args, **kwargs)'
or 'self, *args, **kwargs')
return dict(self_arg='self', args=args, apply_pos=args, apply_kw=args)
def getargspec_init(method):
"""inspect.getargspec with considerations for typical __init__ methods
Wraps inspect.getargspec with error handling for typical __init__ cases::
object.__init__ -> (self)
other unreflectable (usually C) -> (self, *args, **kwargs)
"""
try:
return inspect.getargspec(method)
except TypeError:
if method is object.__init__:
return (['self'], None, None, None)
else:
return (['self'], 'args', 'kwargs', None)
def unbound_method_to_callable(func_or_cls):
"""Adjust the incoming callable such that a 'self' argument is not required."""
if isinstance(func_or_cls, types.MethodType) and not func_or_cls.im_self:
return func_or_cls.im_func
else:
return func_or_cls
class portable_instancemethod(object):
"""Turn an instancemethod into a (parent, name) pair
to produce a serializable callable.
"""
def __init__(self, meth):
self.target = meth.im_self
self.name = meth.__name__
def __call__(self, *arg, **kw):
return getattr(self.target, self.name)(*arg, **kw)
def class_hierarchy(cls):
"""Return an unordered sequence of all classes related to cls.
Traverses diamond hierarchies.
Fibs slightly: subclasses of builtin types are not returned. Thus
class_hierarchy(class A(object)) returns (A, object), not A plus every
class systemwide that derives from object.
Old-style classes are discarded and hierarchies rooted on them
will not be descended.
"""
# Py2K
if isinstance(cls, types.ClassType):
return list()
# end Py2K
hier = set([cls])
process = list(cls.__mro__)
while process:
c = process.pop()
# Py2K
if isinstance(c, types.ClassType):
continue
for b in (_ for _ in c.__bases__
if _ not in hier and not isinstance(_, types.ClassType)):
# end Py2K
# Py3K
#for b in (_ for _ in c.__bases__
# if _ not in hier):
process.append(b)
hier.add(b)
# Py3K
#if c.__module__ == 'builtins' or not hasattr(c, '__subclasses__'):
# continue
# Py2K
if c.__module__ == '__builtin__' or not hasattr(c, '__subclasses__'):
continue
# end Py2K
for s in [_ for _ in c.__subclasses__() if _ not in hier]:
process.append(s)
hier.add(s)
return list(hier)
def iterate_attributes(cls):
"""iterate all the keys and attributes associated
with a class, without using getattr().
Does not use getattr() so that class-sensitive
descriptors (i.e. property.__get__()) are not called.
"""
keys = dir(cls)
for key in keys:
for c in cls.__mro__:
if key in c.__dict__:
yield (key, c.__dict__[key])
break
# from paste.deploy.converters
def asbool(obj):
if isinstance(obj, (str, unicode)):
obj = obj.strip().lower()
if obj in ['true', 'yes', 'on', 'y', 't', '1']:
return True
elif obj in ['false', 'no', 'off', 'n', 'f', '0']:
return False
else:
raise ValueError("String is not true/false: %r" % obj)
return bool(obj)
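# Usage sketch: common string spellings of true/false are recognized
# case-insensitively; non-string values fall back to bool().
#
#     >>> asbool('Yes'), asbool('0'), asbool(1)
#     (True, False, True)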
def bool_or_str(*text):
"""Return a callable that will evaulate a string as
boolean, or one of a set of "alternate" string values.
"""
def bool_or_value(obj):
if obj in text:
return obj
else:
return asbool(obj)
return bool_or_value
def coerce_kw_type(kw, key, type_, flexi_bool=True):
"""If 'key' is present in dict 'kw', coerce its value to type 'type\_' if
necessary. If 'flexi_bool' is True, the string '0' is considered false
when coercing to boolean.
"""
if key in kw and type(kw[key]) is not type_ and kw[key] is not None:
if type_ is bool and flexi_bool:
kw[key] = asbool(kw[key])
else:
kw[key] = type_(kw[key])
def duck_type_collection(specimen, default=None):
"""Given an instance or class, guess if it is or is acting as one of
the basic collection types: list, set and dict. If the __emulates__
property is present, return that preferentially.
"""
if hasattr(specimen, '__emulates__'):
# canonicalize set vs sets.Set to a standard: the builtin set
if (specimen.__emulates__ is not None and
issubclass(specimen.__emulates__, set_types)):
return set
else:
return specimen.__emulates__
isa = isinstance(specimen, type) and issubclass or isinstance
if isa(specimen, list):
return list
elif isa(specimen, set_types):
return set
elif isa(specimen, dict):
return dict
if hasattr(specimen, 'append'):
return list
elif hasattr(specimen, 'add'):
return set
elif hasattr(specimen, 'set'):
return dict
else:
return default
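# Usage sketch (illustrative class): instances, classes and duck-typed
# containers are mapped to one of the basic collection interfaces.
#
#     >>> duck_type_collection([]) is list
#     True
#     >>> duck_type_collection(set()) is set
#     True
#     >>> class Bag(object):
#     ...     def append(self, item): pass
#     >>> duck_type_collection(Bag()) is list
#     True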
def dictlike_iteritems(dictlike):
"""Return a (key, value) iterator for almost any dict-like object."""
# Py3K
#if hasattr(dictlike, 'items'):
# return dictlike.items()
# Py2K
if hasattr(dictlike, 'iteritems'):
return dictlike.iteritems()
elif hasattr(dictlike, 'items'):
return iter(dictlike.items())
# end Py2K
getter = getattr(dictlike, '__getitem__', getattr(dictlike, 'get', None))
if getter is None:
raise TypeError(
"Object '%r' is not dict-like" % dictlike)
if hasattr(dictlike, 'iterkeys'):
def iterator():
for key in dictlike.iterkeys():
yield key, getter(key)
return iterator()
elif hasattr(dictlike, 'keys'):
return iter((key, getter(key)) for key in dictlike.keys())
else:
raise TypeError(
"Object '%r' is not dict-like" % dictlike)
def assert_arg_type(arg, argtype, name):
if isinstance(arg, argtype):
return arg
else:
if isinstance(argtype, tuple):
raise exc.ArgumentError(
"Argument '%s' is expected to be one of type %s, got '%s'" %
(name, ' or '.join("'%s'" % a for a in argtype), type(arg)))
else:
raise exc.ArgumentError(
"Argument '%s' is expected to be of type '%s', got '%s'" %
(name, argtype, type(arg)))
_creation_order = 1
def set_creation_order(instance):
"""Assign a '_creation_order' sequence to the given instance.
This allows multiple instances to be sorted in order of creation
(typically within a single thread; the counter is not particularly
threadsafe).
"""
global _creation_order
instance._creation_order = _creation_order
_creation_order +=1
def warn_exception(func, *args, **kwargs):
"""executes the given function, catches all exceptions and converts to a warning."""
try:
return func(*args, **kwargs)
except:
warn("%s('%s') ignored" % sys.exc_info()[0:2])
def monkeypatch_proxied_specials(into_cls, from_cls, skip=None, only=None,
name='self.proxy', from_instance=None):
"""Automates delegation of __specials__ for a proxying type."""
if only:
dunders = only
else:
if skip is None:
skip = ('__slots__', '__del__', '__getattribute__',
'__metaclass__', '__getstate__', '__setstate__')
dunders = [m for m in dir(from_cls)
if (m.startswith('__') and m.endswith('__') and
not hasattr(into_cls, m) and m not in skip)]
for method in dunders:
try:
fn = getattr(from_cls, method)
if not hasattr(fn, '__call__'):
continue
fn = getattr(fn, 'im_func', fn)
except AttributeError:
continue
try:
spec = inspect.getargspec(fn)
fn_args = inspect.formatargspec(spec[0])
d_args = inspect.formatargspec(spec[0][1:])
except TypeError:
fn_args = '(self, *args, **kw)'
d_args = '(*args, **kw)'
py = ("def %(method)s%(fn_args)s: "
"return %(name)s.%(method)s%(d_args)s" % locals())
env = from_instance is not None and {name: from_instance} or {}
exec py in env
try:
env[method].func_defaults = fn.func_defaults
except AttributeError:
pass
setattr(into_cls, method, env[method])
class NamedTuple(tuple):
"""tuple() subclass that adds labeled names.
Is also pickleable.
"""
def __new__(cls, vals, labels=None):
vals = list(vals)
t = tuple.__new__(cls, vals)
if labels:
t.__dict__ = dict(itertools.izip(labels, vals))
t._labels = labels
return t
def keys(self):
return [l for l in self._labels if l is not None]
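# Usage sketch: values are addressable positionally or by label.
#
#     >>> row = NamedTuple((1, 'ed'), labels=['id', 'name'])
#     >>> row[1], row.name
#     ('ed', 'ed')
#     >>> row.keys()
#     ['id', 'name']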
class OrderedProperties(object):
"""An object that maintains the order in which attributes are set upon it.
Also provides an iterator and a very basic getitem/setitem
interface to those attributes.
(Not really a dict, since it iterates over values, not keys. Not really
a list, either, since each value must have a key associated; hence there is
no append or extend.)
"""
def __init__(self):
self.__dict__['_data'] = OrderedDict()
def __len__(self):
return len(self._data)
def __iter__(self):
return self._data.itervalues()
def __add__(self, other):
return list(self) + list(other)
def __setitem__(self, key, object):
self._data[key] = object
def __getitem__(self, key):
return self._data[key]
def __delitem__(self, key):
del self._data[key]
def __setattr__(self, key, object):
self._data[key] = object
def __getstate__(self):
return {'_data': self.__dict__['_data']}
def __setstate__(self, state):
self.__dict__['_data'] = state['_data']
def __getattr__(self, key):
try:
return self._data[key]
except KeyError:
raise AttributeError(key)
def __contains__(self, key):
return key in self._data
def update(self, value):
self._data.update(value)
def get(self, key, default=None):
if key in self:
return self[key]
else:
return default
def keys(self):
return self._data.keys()
def has_key(self, key):
return key in self._data
def clear(self):
self._data.clear()
class OrderedDict(dict):
"""A dict that returns keys/values/items in the order they were added."""
def __init__(self, ____sequence=None, **kwargs):
self._list = []
if ____sequence is None:
if kwargs:
self.update(**kwargs)
else:
self.update(____sequence, **kwargs)
def clear(self):
self._list = []
dict.clear(self)
def copy(self):
return self.__copy__()
def __copy__(self):
return OrderedDict(self)
def sort(self, *arg, **kw):
self._list.sort(*arg, **kw)
def update(self, ____sequence=None, **kwargs):
if ____sequence is not None:
if hasattr(____sequence, 'keys'):
for key in ____sequence.keys():
self.__setitem__(key, ____sequence[key])
else:
for key, value in ____sequence:
self[key] = value
if kwargs:
self.update(kwargs)
def setdefault(self, key, value):
if key not in self:
self.__setitem__(key, value)
return value
else:
return self.__getitem__(key)
def __iter__(self):
return iter(self._list)
def values(self):
return [self[key] for key in self._list]
def itervalues(self):
return iter(self.values())
def keys(self):
return list(self._list)
def iterkeys(self):
return iter(self.keys())
def items(self):
return [(key, self[key]) for key in self.keys()]
def iteritems(self):
return iter(self.items())
def __setitem__(self, key, object):
if key not in self:
try:
self._list.append(key)
except AttributeError:
# work around Python pickle loads() with
# dict subclass (seems to ignore __setstate__?)
self._list = [key]
dict.__setitem__(self, key, object)
def __delitem__(self, key):
dict.__delitem__(self, key)
self._list.remove(key)
def pop(self, key, *default):
present = key in self
value = dict.pop(self, key, *default)
if present:
self._list.remove(key)
return value
def popitem(self):
item = dict.popitem(self)
self._list.remove(item[0])
return item
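# Usage sketch: unlike a plain Python 2 dict, keys(), values(), items()
# and iteration all follow insertion order.
#
#     >>> od = OrderedDict()
#     >>> od['z'] = 1; od['a'] = 2; od['m'] = 3
#     >>> od.keys()
#     ['z', 'a', 'm']
#     >>> del od['a']
#     >>> od.keys()
#     ['z', 'm']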
class OrderedSet(set):
def __init__(self, d=None):
set.__init__(self)
self._list = []
if d is not None:
self.update(d)
def add(self, element):
if element not in self:
self._list.append(element)
set.add(self, element)
def remove(self, element):
set.remove(self, element)
self._list.remove(element)
def insert(self, pos, element):
if element not in self:
self._list.insert(pos, element)
set.add(self, element)
def discard(self, element):
if element in self:
self._list.remove(element)
set.remove(self, element)
def clear(self):
set.clear(self)
self._list = []
def __getitem__(self, key):
return self._list[key]
def __iter__(self):
return iter(self._list)
def __repr__(self):
return '%s(%r)' % (self.__class__.__name__, self._list)
__str__ = __repr__
def update(self, iterable):
add = self.add
for i in iterable:
add(i)
return self
__ior__ = update
def union(self, other):
result = self.__class__(self)
result.update(other)
return result
__or__ = union
def intersection(self, other):
other = set(other)
return self.__class__(a for a in self if a in other)
__and__ = intersection
def symmetric_difference(self, other):
other = set(other)
result = self.__class__(a for a in self if a not in other)
result.update(a for a in other if a not in self)
return result
__xor__ = symmetric_difference
def difference(self, other):
other = set(other)
return self.__class__(a for a in self if a not in other)
__sub__ = difference
def intersection_update(self, other):
other = set(other)
set.intersection_update(self, other)
self._list = [ a for a in self._list if a in other]
return self
__iand__ = intersection_update
def symmetric_difference_update(self, other):
set.symmetric_difference_update(self, other)
self._list = [ a for a in self._list if a in self]
self._list += [ a for a in other._list if a in self]
return self
__ixor__ = symmetric_difference_update
def difference_update(self, other):
set.difference_update(self, other)
self._list = [ a for a in self._list if a in self]
return self
__isub__ = difference_update
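# Usage sketch: an insertion-ordered set; the set operations return new
# OrderedSets whose ordering follows the left-hand operand.
#
#     >>> os1 = OrderedSet([3, 1, 2])
#     >>> list(os1)
#     [3, 1, 2]
#     >>> list(os1.union([2, 5]))
#     [3, 1, 2, 5]
#     >>> list(os1.intersection([2, 3]))
#     [3, 2]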
class IdentitySet(object):
"""A set that considers only object id() for uniqueness.
This strategy has edge cases for builtin types; it's possible to have
two 'foo' strings in one of these sets, for example. Use sparingly.
"""
_working_set = set
def __init__(self, iterable=None):
self._members = dict()
if iterable:
for o in iterable:
self.add(o)
def add(self, value):
self._members[id(value)] = value
def __contains__(self, value):
return id(value) in self._members
def remove(self, value):
del self._members[id(value)]
def discard(self, value):
try:
self.remove(value)
except KeyError:
pass
def pop(self):
try:
pair = self._members.popitem()
return pair[1]
except KeyError:
raise KeyError('pop from an empty set')
def clear(self):
self._members.clear()
def __cmp__(self, other):
raise TypeError('cannot compare sets using cmp()')
def __eq__(self, other):
if isinstance(other, IdentitySet):
return self._members == other._members
else:
return False
def __ne__(self, other):
if isinstance(other, IdentitySet):
return self._members != other._members
else:
return True
def issubset(self, iterable):
other = type(self)(iterable)
if len(self) > len(other):
return False
for m in itertools.ifilterfalse(other._members.__contains__,
self._members.iterkeys()):
return False
return True
def __le__(self, other):
if not isinstance(other, IdentitySet):
return NotImplemented
return self.issubset(other)
def __lt__(self, other):
if not isinstance(other, IdentitySet):
return NotImplemented
return len(self) < len(other) and self.issubset(other)
def issuperset(self, iterable):
other = type(self)(iterable)
if len(self) < len(other):
return False
for m in itertools.ifilterfalse(self._members.__contains__,
other._members.iterkeys()):
return False
return True
def __ge__(self, other):
if not isinstance(other, IdentitySet):
return NotImplemented
return self.issuperset(other)
def __gt__(self, other):
if not isinstance(other, IdentitySet):
return NotImplemented
return len(self) > len(other) and self.issuperset(other)
def union(self, iterable):
result = type(self)()
# testlib.pragma exempt:__hash__
result._members.update(
self._working_set(self._member_id_tuples()).union(_iter_id(iterable)))
return result
def __or__(self, other):
if not isinstance(other, IdentitySet):
return NotImplemented
return self.union(other)
def update(self, iterable):
self._members = self.union(iterable)._members
def __ior__(self, other):
if not isinstance(other, IdentitySet):
return NotImplemented
self.update(other)
return self
def difference(self, iterable):
result = type(self)()
# testlib.pragma exempt:__hash__
result._members.update(
self._working_set(self._member_id_tuples()).difference(_iter_id(iterable)))
return result
def __sub__(self, other):
if not isinstance(other, IdentitySet):
return NotImplemented
return self.difference(other)
def difference_update(self, iterable):
self._members = self.difference(iterable)._members
def __isub__(self, other):
if not isinstance(other, IdentitySet):
return NotImplemented
self.difference_update(other)
return self
def intersection(self, iterable):
result = type(self)()
# testlib.pragma exempt:__hash__
result._members.update(
self._working_set(self._member_id_tuples()).intersection(_iter_id(iterable)))
return result
def __and__(self, other):
if not isinstance(other, IdentitySet):
return NotImplemented
return self.intersection(other)
def intersection_update(self, iterable):
self._members = self.intersection(iterable)._members
def __iand__(self, other):
if not isinstance(other, IdentitySet):
return NotImplemented
self.intersection_update(other)
return self
def symmetric_difference(self, iterable):
result = type(self)()
# testlib.pragma exempt:__hash__
result._members.update(
self._working_set(self._member_id_tuples()).symmetric_difference(_iter_id(iterable)))
return result
def _member_id_tuples(self):
return ((id(v), v) for v in self._members.itervalues())
def __xor__(self, other):
if not isinstance(other, IdentitySet):
return NotImplemented
return self.symmetric_difference(other)
def symmetric_difference_update(self, iterable):
self._members = self.symmetric_difference(iterable)._members
def __ixor__(self, other):
if not isinstance(other, IdentitySet):
return NotImplemented
self.symmetric_difference_update(other)
return self
def copy(self):
return type(self)(self._members.itervalues())
__copy__ = copy
def __len__(self):
return len(self._members)
def __iter__(self):
return self._members.itervalues()
def __hash__(self):
raise TypeError('set objects are unhashable')
def __repr__(self):
return '%s(%r)' % (type(self).__name__, self._members.values())
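# Usage sketch: membership is keyed on id(), so two equal but distinct
# objects are both retained.
#
#     >>> a, b = [1], [1]
#     >>> ids = IdentitySet([a, b])
#     >>> len(ids), a == b
#     (2, True)
#     >>> a in ids, [1] in ids
#     (True, False)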
class OrderedIdentitySet(IdentitySet):
class _working_set(OrderedSet):
# a testing pragma: exempt the OIDS working set from the test suite's
# "never call the user's __hash__" assertions. this is a big hammer,
# but it's safe here: IDS operates on (id, instance) tuples in the
# working set.
__sa_hash_exempt__ = True
def __init__(self, iterable=None):
IdentitySet.__init__(self)
self._members = OrderedDict()
if iterable:
for o in iterable:
self.add(o)
def _iter_id(iterable):
"""Generator: ((id(o), o) for o in iterable)."""
for item in iterable:
yield id(item), item
# define collections that are capable of storing
# ColumnElement objects as hashable keys/elements.
column_set = set
column_dict = dict
ordered_column_set = OrderedSet
populate_column_dict = PopulateDict
def unique_list(seq, compare_with=set):
seen = compare_with()
return [x for x in seq if x not in seen and not seen.add(x)]
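# Usage sketch: duplicates are dropped while first-seen order is kept;
# the trick is that seen.add() returns None, keeping the filter
# expression truthy only for unseen items.
#
#     >>> unique_list([1, 3, 1, 2, 3])
#     [1, 3, 2]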
class UniqueAppender(object):
"""Appends items to a collection ensuring uniqueness.
Additional appends() of the same object are ignored. Membership is
determined by identity (``is``), not equality (``==``).
"""
def __init__(self, data, via=None):
self.data = data
self._unique = IdentitySet()
if via:
self._data_appender = getattr(data, via)
elif hasattr(data, 'append'):
self._data_appender = data.append
elif hasattr(data, 'add'):
# TODO: we think it's a set here. bypass unneeded uniquing logic?
self._data_appender = data.add
def append(self, item):
if item not in self._unique:
self._data_appender(item)
self._unique.add(item)
def __iter__(self):
return iter(self.data)
class ScopedRegistry(object):
"""A Registry that can store one or multiple instances of a single
class on the basis of a "scope" function.
The object implements ``__call__`` as the "getter", so by
calling ``myregistry()`` the contained object is returned
for the current scope.
:param createfunc:
a callable that returns a new object to be placed in the registry
:param scopefunc:
a callable that will return a key to store/retrieve an object.
"""
def __init__(self, createfunc, scopefunc):
"""Construct a new :class:`.ScopedRegistry`.
:param createfunc: A creation function that will generate
a new value for the current scope, if none is present.
:param scopefunc: A function that returns a hashable
token representing the current scope (such as, current
thread identifier).
"""
self.createfunc = createfunc
self.scopefunc = scopefunc
self.registry = {}
def __call__(self):
key = self.scopefunc()
try:
return self.registry[key]
except KeyError:
return self.registry.setdefault(key, self.createfunc())
def has(self):
"""Return True if an object is present in the current scope."""
return self.scopefunc() in self.registry
def set(self, obj):
"""Set the value forthe current scope."""
self.registry[self.scopefunc()] = obj
def clear(self):
"""Clear the current scope, if any."""
try:
del self.registry[self.scopefunc()]
except KeyError:
pass
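# Usage sketch (illustrative scope function): with a constant scope the
# registry acts as a lazily-created singleton; distinct scope keys
# receive distinct objects.
#
#     >>> reg = ScopedRegistry(createfunc=dict, scopefunc=lambda: 'key')
#     >>> reg.has()
#     False
#     >>> reg() is reg()
#     True
#     >>> reg.has()
#     True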
class ThreadLocalRegistry(ScopedRegistry):
"""A :class:`.ScopedRegistry` that uses a ``threading.local()``
variable for storage.
"""
def __init__(self, createfunc):
self.createfunc = createfunc
self.registry = threading.local()
def __call__(self):
try:
return self.registry.value
except AttributeError:
val = self.registry.value = self.createfunc()
return val
def has(self):
return hasattr(self.registry, "value")
def set(self, obj):
self.registry.value = obj
def clear(self):
try:
del self.registry.value
except AttributeError:
pass
class _symbol(object):
def __init__(self, name):
"""Construct a new named symbol."""
assert isinstance(name, str)
self.name = name
def __reduce__(self):
return symbol, (self.name,)
def __repr__(self):
return "<symbol '%s>" % self.name
_symbol.__name__ = 'symbol'
class symbol(object):
"""A constant symbol.
>>> symbol('foo') is symbol('foo')
True
>>> symbol('foo')
<symbol 'foo>
A slight refinement of the MAGICCOOKIE=object() pattern. The primary
advantage of symbol() is its repr(). They are also singletons.
Repeated calls of symbol('name') will all return the same instance.
"""
symbols = {}
_lock = threading.Lock()
def __new__(cls, name):
cls._lock.acquire()
try:
sym = cls.symbols.get(name)
if sym is None:
cls.symbols[name] = sym = _symbol(name)
return sym
finally:
symbol._lock.release()
def as_interface(obj, cls=None, methods=None, required=None):
"""Ensure basic interface compliance for an instance or dict of callables.
Checks that ``obj`` implements public methods of ``cls`` or has members
listed in ``methods``. If ``required`` is not supplied, implementing at
least one interface method is sufficient. Methods present on ``obj`` that
are not in the interface are ignored.
If ``obj`` is a dict and ``dict`` does not meet the interface
requirements, the keys of the dictionary are inspected. Keys present in
``obj`` that are not in the interface will raise TypeErrors.
Raises TypeError if ``obj`` does not meet the interface criteria.
In all passing cases, an object with callable members is returned. In the
simple case, ``obj`` is returned as-is; if dict processing kicks in then
an anonymous class is returned.
obj
A type, instance, or dictionary of callables.
cls
Optional, a type. All public methods of cls are considered the
interface. An ``obj`` instance of cls will always pass, ignoring
``required``.
methods
Optional, a sequence of method names to consider as the interface.
required
Optional, a sequence of mandatory implementations. If omitted, an
``obj`` that provides at least one interface method is considered
sufficient. As a convenience, required may be a type, in which case
all public methods of the type are required.
"""
if not cls and not methods:
raise TypeError('a class or collection of method names are required')
if isinstance(cls, type) and isinstance(obj, cls):
return obj
interface = set(methods or [m for m in dir(cls) if not m.startswith('_')])
implemented = set(dir(obj))
complies = operator.ge
if isinstance(required, type):
required = interface
elif not required:
required = set()
complies = operator.gt
else:
required = set(required)
if complies(implemented.intersection(interface), required):
return obj
# No dict duck typing here.
if not type(obj) is dict:
qualifier = complies is operator.gt and 'any of' or 'all of'
raise TypeError("%r does not implement %s: %s" % (
obj, qualifier, ', '.join(interface)))
class AnonymousInterface(object):
"""A callable-holding shell."""
if cls:
AnonymousInterface.__name__ = 'Anonymous' + cls.__name__
found = set()
for method, impl in dictlike_iteritems(obj):
if method not in interface:
raise TypeError("%r: unknown in this interface" % method)
if not callable(impl):
raise TypeError("%r=%r is not callable" % (method, impl))
setattr(AnonymousInterface, method, staticmethod(impl))
found.add(method)
if complies(found, required):
return AnonymousInterface
raise TypeError("dictionary does not contain required keys %s" %
', '.join(required - found))
def function_named(fn, name):
"""Return a function with a given __name__.
Will assign to __name__ and return the original function if possible on
the Python implementation, otherwise a new function will be constructed.
"""
try:
fn.__name__ = name
except TypeError:
fn = types.FunctionType(fn.func_code, fn.func_globals, name,
fn.func_defaults, fn.func_closure)
return fn
class memoized_property(object):
"""A read-only @property that is only evaluated once."""
def __init__(self, fget, doc=None):
self.fget = fget
self.__doc__ = doc or fget.__doc__
self.__name__ = fget.__name__
def __get__(self, obj, cls):
if obj is None:
return self
obj.__dict__[self.__name__] = result = self.fget(obj)
return result
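# Usage sketch (illustrative class): the first access runs the getter and
# stores the result in the instance __dict__, which shadows the
# non-data descriptor on subsequent reads.
#
#     >>> class Widget(object):
#     ...     @memoized_property
#     ...     def total(self):
#     ...         print "computing"
#     ...         return 42
#     >>> w = Widget()
#     >>> w.total
#     computing
#     42
#     >>> w.total
#     42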
class memoized_instancemethod(object):
"""Decorate a method memoize its return value.
Best applied to no-arg methods: memoization is not sensitive to
argument values, and will always return the same value even when
called with different arguments.
"""
def __init__(self, fget, doc=None):
self.fget = fget
self.__doc__ = doc or fget.__doc__
self.__name__ = fget.__name__
def __get__(self, obj, cls):
if obj is None:
return self
def oneshot(*args, **kw):
result = self.fget(obj, *args, **kw)
memo = lambda *a, **kw: result
memo.__name__ = self.__name__
memo.__doc__ = self.__doc__
obj.__dict__[self.__name__] = memo
return result
oneshot.__name__ = self.__name__
oneshot.__doc__ = self.__doc__
return oneshot
def reset_memoized(instance, name):
instance.__dict__.pop(name, None)
class group_expirable_memoized_property(object):
"""A family of @memoized_properties that can be expired in tandem."""
def __init__(self):
self.attributes = []
def expire_instance(self, instance):
"""Expire all memoized properties for *instance*."""
stash = instance.__dict__
for attribute in self.attributes:
stash.pop(attribute, None)
def __call__(self, fn):
self.attributes.append(fn.__name__)
return memoized_property(fn)
class importlater(object):
"""Deferred import object.
e.g.::
somesubmod = importlater("mypackage.somemodule", "somesubmod")
is equivalent to::
from mypackage.somemodule import somesubmod
except evaluated upon attribute access to "somesubmod".
"""
def __init__(self, path, addtl=None):
self._il_path = path
self._il_addtl = addtl
@memoized_property
def _il_module(self):
if self._il_addtl:
m = __import__(self._il_path, globals(), locals(),
[self._il_addtl])
try:
return getattr(m, self._il_addtl)
except AttributeError:
raise ImportError(
"Module %s has no attribute '%s'" %
(self._il_path, self._il_addtl)
)
else:
m = __import__(self._il_path)
for token in self._il_path.split(".")[1:]:
m = getattr(m, token)
return m
def __getattr__(self, key):
try:
attr = getattr(self._il_module, key)
except AttributeError:
raise AttributeError(
"Module %s has no attribute '%s'" %
(self._il_path, key)
)
self.__dict__[key] = attr
return attr
class WeakIdentityMapping(weakref.WeakKeyDictionary):
"""A WeakKeyDictionary with an object identity index.
Adds a .by_id dictionary to a regular WeakKeyDictionary. Trades
performance during mutation operations for accelerated lookups by id().
The usual cautions about weak dictionaries and iteration also apply to
this subclass.
"""
_none = symbol('none')
def __init__(self):
weakref.WeakKeyDictionary.__init__(self)
self.by_id = {}
self._weakrefs = {}
def __setitem__(self, object, value):
oid = id(object)
self.by_id[oid] = value
if oid not in self._weakrefs:
self._weakrefs[oid] = self._ref(object)
weakref.WeakKeyDictionary.__setitem__(self, object, value)
def __delitem__(self, object):
del self._weakrefs[id(object)]
del self.by_id[id(object)]
weakref.WeakKeyDictionary.__delitem__(self, object)
def setdefault(self, object, default=None):
value = weakref.WeakKeyDictionary.setdefault(self, object, default)
oid = id(object)
if value is default:
self.by_id[oid] = default
if oid not in self._weakrefs:
self._weakrefs[oid] = self._ref(object)
return value
def pop(self, object, default=_none):
if default is self._none:
value = weakref.WeakKeyDictionary.pop(self, object)
else:
value = weakref.WeakKeyDictionary.pop(self, object, default)
if id(object) in self.by_id:
del self._weakrefs[id(object)]
del self.by_id[id(object)]
return value
def popitem(self):
item = weakref.WeakKeyDictionary.popitem(self)
oid = id(item[0])
del self._weakrefs[oid]
del self.by_id[oid]
return item
def clear(self):
# Py2K
# in 3k, MutableMapping calls popitem()
self._weakrefs.clear()
self.by_id.clear()
# end Py2K
weakref.WeakKeyDictionary.clear(self)
def update(self, *a, **kw):
raise NotImplementedError
def _cleanup(self, wr, key=None):
if key is None:
key = wr.key
try:
del self._weakrefs[key]
except (KeyError, AttributeError): # pragma: no cover
pass # pragma: no cover
try:
del self.by_id[key]
except (KeyError, AttributeError): # pragma: no cover
pass # pragma: no cover
class _keyed_weakref(weakref.ref):
def __init__(self, object, callback):
weakref.ref.__init__(self, object, callback)
self.key = id(object)
def _ref(self, object):
return self._keyed_weakref(object, self._cleanup)
import time
if win32 or jython:
time_func = time.clock
else:
time_func = time.time
class LRUCache(dict):
"""Dictionary with 'squishy' removal of least
recently used items.
"""
def __init__(self, capacity=100, threshold=.5):
self.capacity = capacity
self.threshold = threshold
def __getitem__(self, key):
item = dict.__getitem__(self, key)
item[2] = time_func()
return item[1]
def values(self):
return [i[1] for i in dict.values(self)]
def setdefault(self, key, value):
if key in self:
return self[key]
else:
self[key] = value
return value
def __setitem__(self, key, value):
item = dict.get(self, key)
if item is None:
item = [key, value, time_func()]
dict.__setitem__(self, key, item)
else:
item[1] = value
self._manage_size()
def _manage_size(self):
while len(self) > self.capacity + self.capacity * self.threshold:
bytime = sorted(dict.values(self),
key=operator.itemgetter(2),
reverse=True)
for item in bytime[self.capacity:]:
try:
del self[item[0]]
except KeyError:
# if we couldn't find a key, most
# likely some other thread broke in
# on us. loop around and try again
break
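# Usage sketch: the cache may grow to capacity * (1 + threshold) entries;
# once that mark is exceeded, the least recently used items are pruned
# back down to `capacity`.
#
#     >>> cache = LRUCache(capacity=2, threshold=.5)
#     >>> cache['a'] = 1; cache['b'] = 2; cache['c'] = 3
#     >>> len(cache)
#     3
#     >>> cache['d'] = 4
#     >>> len(cache)
#     2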
def warn(msg, stacklevel=3):
if isinstance(msg, basestring):
warnings.warn(msg, exc.SAWarning, stacklevel=stacklevel)
else:
warnings.warn(msg, stacklevel=stacklevel)
def warn_deprecated(msg, stacklevel=3):
warnings.warn(msg, exc.SADeprecationWarning, stacklevel=stacklevel)
def warn_pending_deprecation(msg, stacklevel=3):
warnings.warn(msg, exc.SAPendingDeprecationWarning, stacklevel=stacklevel)
def deprecated(version, message=None, add_deprecation_to_docstring=True):
"""Decorates a function and issues a deprecation warning on use.
:param message:
If provided, issue message in the warning. A sensible default
is used if not provided.
:param add_deprecation_to_docstring:
Default True. If False, the wrapped function's __doc__ is left
as-is. If True, the 'message' is prepended to the docs if
provided, or sensible default if message is omitted.
"""
if add_deprecation_to_docstring:
header = ".. deprecated:: %s %s" % \
(version, (message or ''))
else:
header = None
if message is None:
message = "Call to deprecated function %(func)s"
def decorate(fn):
return _decorate_with_warning(
fn, exc.SADeprecationWarning,
message % dict(func=fn.__name__), header)
return decorate
def pending_deprecation(version, message=None,
add_deprecation_to_docstring=True):
"""Decorates a function and issues a pending deprecation warning on use.
:param version:
An approximate future version at which point the pending deprecation
will become deprecated. Not used in messaging.
:param message:
If provided, issue message in the warning. A sensible default
is used if not provided.
:param add_deprecation_to_docstring:
Default True. If False, the wrapped function's __doc__ is left
as-is. If True, the 'message' is prepended to the docs if
provided, or sensible default if message is omitted.
"""
if add_deprecation_to_docstring:
header = ".. deprecated:: %s (pending) %s" % \
(version, (message or ''))
else:
header = None
if message is None:
message = "Call to deprecated function %(func)s"
def decorate(fn):
return _decorate_with_warning(
fn, exc.SAPendingDeprecationWarning,
message % dict(func=fn.__name__), header)
return decorate
def _sanitize_rest(text):
def repl(m):
type_, name = m.group(1, 2)
if type_ in ("func", "meth"):
name += "()"
return name
return re.sub(r'\:(\w+)\:`~?\.?(.+?)`', repl, text)
def _decorate_with_warning(func, wtype, message, docstring_header=None):
"""Wrap a function with a warnings.warn and augmented docstring."""
message = _sanitize_rest(message)
@decorator
def warned(fn, *args, **kwargs):
warnings.warn(wtype(message), stacklevel=3)
return fn(*args, **kwargs)
doc = func.__doc__ is not None and func.__doc__ or ''
if docstring_header is not None:
docstring_header %= dict(func=func.__name__)
docs = doc and doc.expandtabs().split('\n') or []
indent = ''
for line in docs[1:]:
text = line.lstrip()
if text:
indent = line[0:len(line) - len(text)]
break
point = min(len(docs), 1)
docs.insert(point, '\n' + indent + docstring_header.rstrip())
doc = '\n'.join(docs)
decorated = warned(func)
decorated.__doc__ = doc
return decorated
class classproperty(property):
"""A decorator that behaves like @property except that operates
on classes rather than instances.
The decorator is currently special when using the declarative
module, but note that the
:class:`~.sqlalchemy.ext.declarative.declared_attr`
decorator should be used for this purpose with declarative.
"""
def __init__(self, fget, *arg, **kw):
super(classproperty, self).__init__(fget, *arg, **kw)
self.__doc__ = fget.__doc__
def __get__(desc, self, cls):
return desc.fget(cls)