"""
Miscellaneous mapping related classes and functionality
"""
# Public API of this module: the names bound by ``from <module> import *``.
# Helper classes/functions defined below that are not listed here remain
# importable by explicit name.
__all__ = (
"DictMixin",
"LazyValDict",
"LazyFullValLoadDict",
"ProtectedDict",
"ImmutableDict",
"IndeterminantDict",
"defaultdictkey",
"AttrAccessible",
"StackedDict",
"make_SlottedDict_kls",
"ProxiedAttrs",
)
import operator
from collections import defaultdict
from collections.abc import Mapping, MutableSet, Set
from functools import partial
from itertools import chain, filterfalse, islice
from .klass import contains, get, sentinel, steal_docs
[docs]
class DictMixin:
    """
    new style class replacement for :py:func:`UserDict.DictMixin`
    designed around iter* methods rather than forcing lists as DictMixin does

    To use this mixin, you need to define the following methods:

    * __delitem__
    * __setitem__
    * __getitem__
    * keys

    It's suggested for performance reasons, it might be worth defining
    `values` and `items` in addition.
    """

    __slots__ = ()
    # Subclasses set this to False to make the generic mutators below
    # (pop, setdefault, clear, popitem, ...) raise AttributeError.
    __externally_mutable__ = True

    def __init__(self, iterable=None, **kwargs):
        """
        :param iterable: optional, an iterable of (key, value) to initialize this
            instance with
        :param kwargs: optional, key=value form of specifying the keys value tuples to
            store in this instance.
        """
        if iterable is not None:
            self.update(iterable)
        if kwargs:
            self.update(kwargs.items())

    @steal_docs(dict)
    def __iter__(self):
        return self.keys()

    @steal_docs(dict)
    def __str__(self):
        return str(dict(self.items()))

    @steal_docs(dict)
    def items(self):
        for k in self:
            yield k, self[k]

    @steal_docs(dict)
    def keys(self):
        raise NotImplementedError(self, "keys")

    @steal_docs(dict)
    def values(self):
        return map(self.__getitem__, self)

    @steal_docs(dict)
    def update(self, iterable):
        # Only an iterable of (key, value) pairs is accepted here.
        for k, v in iterable:
            self[k] = v

    get = get
    __contains__ = contains

    @steal_docs(dict)
    def __eq__(self, other):
        if len(self) != len(other):
            return False
        # Sizes match, so it is enough to verify that each of our keys is
        # present in ``other`` with an equal value.  Unlike comparing sorted
        # key lists this needs no ordering between keys (mixed key types such
        # as 1 and "a" would make sorted() raise TypeError) and avoids the sort.
        # The membership test comes first so that auto-populating mappings
        # (e.g. defaultdict) are never asked for a key they don't hold.
        for key in self:
            if key not in other or self[key] != other[key]:
                return False
        return True

    @steal_docs(dict)
    def __ne__(self, other):
        return not self.__eq__(other)

    @steal_docs(dict)
    def pop(self, key, default=sentinel):
        if not self.__externally_mutable__:
            raise AttributeError(self, "pop")
        try:
            val = self[key]
            del self[key]
        except KeyError:
            if default is not sentinel:
                return default
            raise
        return val

    @steal_docs(dict)
    def setdefault(self, key, default=None):
        if not self.__externally_mutable__:
            raise AttributeError(self, "setdefault")
        if key in self:
            return self[key]
        self[key] = default
        return default

    def __getitem__(self, key):
        """Abstract; concrete mappings must override this."""
        raise NotImplementedError(self, "__getitem__")

    def __setitem__(self, key, val):
        """Abstract; raises AttributeError when the mapping is not externally mutable."""
        if not self.__externally_mutable__:
            raise AttributeError(self, "__setitem__")
        raise NotImplementedError(self, "__setitem__")

    def __delitem__(self, key):
        """Abstract; raises AttributeError when the mapping is not externally mutable."""
        if not self.__externally_mutable__:
            raise AttributeError(self, "__delitem__")
        raise NotImplementedError(self, "__delitem__")

    @steal_docs(dict)
    def clear(self):
        if not self.__externally_mutable__:
            raise AttributeError(self, "clear")
        # snapshot the keys first; deleting while iterating the live key
        # iterator is not safe
        df = self.__delitem__
        for key in list(self.keys()):
            df(key)

    def __len__(self):
        """Count the keys by iterating; subclasses usually provide a cheaper version."""
        c = 0
        for _ in self:
            c += 1
        return c

    def __bool__(self):
        """Return True as soon as a single key is seen."""
        for _ in self:
            return True
        return False

    @steal_docs(dict)
    def popitem(self):
        if not self.__externally_mutable__:
            raise AttributeError(self, "popitem")
        # do it this way so python handles the stopiteration; faster
        for key, val in self.items():
            del self[key]
            return key, val
        raise KeyError("container is empty")
[docs]
class LazyValDict(DictMixin):
    """Mapping that loads values via a callable.

    given a function to get keys, and to look up the val for those keys, it'll
    lazily load key definitions and values as requested
    """

    __slots__ = ("_keys", "_keys_func", "_vals", "_val_func")
    __externally_mutable__ = False

    def __init__(self, get_keys_func, get_val_func):
        """
        :param get_keys_func: either a container, or func to call to get keys.
        :param get_val_func: a callable that is JIT called
            with the key requested.
        :raises TypeError: if ``get_val_func`` isn't callable, or
            ``get_keys_func`` is neither iterable nor callable.
        """
        if not callable(get_val_func):
            raise TypeError("get_val_func isn't a callable")
        if hasattr(get_keys_func, "__iter__"):
            # an iterable was handed in; use it directly as the key collection
            self._keys = get_keys_func
            self._keys_func = None
        else:
            if not callable(get_keys_func):
                raise TypeError("get_keys_func isn't iterable or callable")
            # ``_keys`` stays unset until the first access resolves it
            self._keys_func = get_keys_func
        self._val_func = get_val_func
        self._vals = {}

    def _load_keys(self):
        """Resolve the key collection on first use and return it.

        The key function is invoked at most once; afterwards it is dropped
        and the resulting set is reused.
        """
        if self._keys_func is not None:
            self._keys = set(self._keys_func())
            self._keys_func = None
        return self._keys

    def __getitem__(self, key):
        """Return the value for ``key``, computing and caching it on first access.

        :raises KeyError: if ``key`` is not one of the known keys.
        """
        known = self._load_keys()
        if key in self._vals:
            return self._vals[key]
        if key in known:
            v = self._vals[key] = self._val_func(key)
            return v
        raise KeyError(key)

    def keys(self):
        """Iterate over the keys, loading them first if necessary."""
        return iter(self._load_keys())

    def values(self):
        """Iterate over the values, computing any that aren't cached yet."""
        return map(self.__getitem__, self.keys())

    def items(self):
        """Iterate over (key, value) pairs."""
        return ((k, self[k]) for k in self.keys())

    def __contains__(self, key):
        return key in self._load_keys()

    def __len__(self):
        return len(self._load_keys())
[docs]
class LazyFullValLoadDict(LazyValDict):
    """Lazily load all values for this mapping in a single load.

    This is essentially the same thing as :py:class:`LazyValDict`, except
    that the val function is called once with the full collection of keys
    and must return something ``dict.update`` accepts (a mapping, or an
    iterable of (key, value) pairs) covering those keys.
    """

    __slots__ = ()

    def __getitem__(self, key):
        """Return the value for ``key``, triggering the one-time bulk load if needed.

        :raises KeyError: if ``key`` is unknown, or the bulk load did not
            supply a value for it.
        """
        if self._keys_func is not None:
            self._keys = set(self._keys_func())
            self._keys_func = None
        if key in self._vals:
            return self._vals[key]
        if key in self._keys:
            if self._val_func is not None:
                self._vals.update(self._val_func(self._keys))
                # Everything has been loaded; drop the loader so a key the
                # load failed to provide doesn't re-run the whole load on
                # every subsequent lookup.  Only cleared after a successful
                # load so that a failing loader can be retried.
                self._val_func = None
                return self._vals[key]
        raise KeyError(key)
[docs]
class ProtectedDict(DictMixin):
    """Overlay mapping that records modifications without touching the wrapped dict.

    Writes land in ``new``; deletions are remembered in ``blacklist`` so the
    wrapped mapping (``orig``) is never modified.
    """

    __slots__ = ("orig", "new", "blacklist")

    def __init__(self, orig):
        """
        :param orig: original dictionary to wrap
        """
        self.orig = orig
        self.new = {}
        self.blacklist = {}

    def __setitem__(self, key, val):
        self.new[key] = val
        # a freshly assigned key is no longer considered deleted
        self.blacklist.pop(key, None)

    def __getitem__(self, key):
        overlay = self.new
        if key in overlay:
            return overlay[key]
        if key in self.blacklist:
            raise KeyError(key)
        return self.orig[key]

    def __delitem__(self, key):
        if key in self.new:
            del self.new[key]
        elif key not in self.orig or key in self.blacklist:
            # unknown key, or one that was already deleted
            raise KeyError(key)
        self.blacklist[key] = True

    def keys(self):
        """Yield overlay keys first, then surviving keys of the wrapped mapping."""
        yield from self.new
        for k in self.orig.keys():
            if not (k in self.blacklist or k in self.new):
                yield k

    def __contains__(self, key):
        if key in self.new:
            return True
        if key in self.blacklist:
            return False
        return key in self.orig
[docs]
class ImmutableDict(Mapping):
    """Immutable dict, unchangeable after instantiating.

    Because this is immutable, it's hashable.
    """

    def __init__(self, data=None):
        """
        :param data: optional initial content; another :py:class:`ImmutableDict`,
            a mapping, a ``DictMixin`` instance, or anything ``dict()`` accepts.
        :raises TypeError: if ``data`` cannot be converted to a dict.
        """
        if isinstance(data, ImmutableDict):
            # safe to share: the other instance never mutates its storage
            mapping = data._dict
        elif isinstance(data, Mapping):
            # take a private copy; holding a reference to the caller's mapping
            # would let later mutations of it change this "immutable" object
            # (and its hash) behind our back
            mapping = dict(data)
        elif isinstance(data, DictMixin):
            mapping = dict(data.items())
        elif data is None:
            mapping = {}
        else:
            try:
                mapping = dict(data)
            except TypeError as exc:
                raise TypeError(f"unsupported data format: {exc}") from exc
        # go through object.__setattr__ so subclasses that redirect attribute
        # assignment to item assignment don't intercept this
        object.__setattr__(self, "_dict", mapping)

    def __getitem__(self, key):
        # hack to avoid recursion exceptions for subclasses that use
        # inject_getitem_as_getattr()
        if key == "_dict":
            return object.__getattribute__(self, "_dict")
        return self._dict[key]

    def __iter__(self):
        return iter(self._dict)

    def __reversed__(self):
        return reversed(self._dict)

    def __len__(self):
        return len(self._dict)

    def __repr__(self):
        return str(self._dict)

    def __str__(self):
        return str(self._dict)

    def __hash__(self):
        # Order-independent, matching the order-independent equality inherited
        # from Mapping, and it works for keys that can't be sorted against
        # each other.
        return hash(frozenset(self._dict.items()))
class OrderedFrozenSet(Set):
    """Ordered, immutable set using guaranteed insertion order dicts in py3.6 onwards."""

    def __init__(self, iterable=()):
        """
        :param iterable: elements to store, kept in first-seen order.
        :raises TypeError: if building the backing dict from ``iterable`` fails
            with a TypeError.
        """
        try:
            self._dict = ImmutableDict({x: None for x in iterable})
        except TypeError as exc:
            raise TypeError("not iterable") from exc

    def __contains__(self, key):
        return key in self._dict

    def __iter__(self):
        return iter(self._dict)

    def __getitem__(self, key):
        """Positional access: an int returns one element, a slice returns a new set.

        Negative integers count from the end, as for sequences.

        :raises IndexError: if an integer index is out of range.
        """
        if isinstance(key, int):
            size = len(self._dict)
            # islice() rejects negative positions, so normalise them first
            index = key + size if key < 0 else key
            if not 0 <= index < size:
                raise IndexError("index out of range")
            return next(islice(self._dict, index, None))
        # handle keys using slice notation
        return self.__class__(list(self._dict)[key])

    def __reversed__(self):
        return reversed(self._dict)

    def __len__(self):
        return len(self._dict)

    def __eq__(self, other):
        # equality ignores ordering
        return set(self._dict) == other

    def __str__(self):
        elements_str = ", ".join(map(repr, self._dict))
        return f"{{{elements_str}}}"

    def __repr__(self):
        return self.__str__()

    def __hash__(self):
        return hash(self._dict)

    # NOTE(review): the keys-view operators below produce a plain ``set``
    # before it is wrapped again, so the element order of the results is not
    # the insertion order of either operand - confirm whether callers care.
    def intersection(self, other):
        return self.__class__(self._dict.keys() & other)

    def union(self, other):
        return self.__class__(self._dict.keys() | other)

    def difference(self, other):
        return self.__class__(self._dict.keys() - other)

    def symmetric_difference(self, other):
        return self.__class__(self._dict.keys() ^ other)
class OrderedSet(OrderedFrozenSet, MutableSet):
    """Ordered, mutable set using guaranteed insertion order dicts in py3.6 onwards."""

    def __init__(self, iterable=()):
        """
        :param iterable: initial elements, kept in first-seen order.
        """
        try:
            self._dict = dict.fromkeys(iterable)
        except TypeError as exc:
            raise TypeError("not iterable") from exc

    def add(self, value):
        self._dict[value] = None

    def discard(self, value):
        # absent values are silently ignored
        self._dict.pop(value, None)

    def remove(self, value):
        # unlike discard(), a missing value raises KeyError
        del self._dict[value]

    def clear(self):
        # rebind rather than empty in place
        self._dict = {}

    def update(self, iterable):
        backing = self._dict
        for element in iterable:
            backing[element] = None

    def __hash__(self):
        raise TypeError(f"unhashable type: {self.__class__.__name__!r}")
[docs]
class IndeterminantDict:
    """A wrapped dict with constant defaults, and a function for other keys.

    The primary use for this class is to make a JIT loaded mapping- for instance, a
    mapping representing the filesystem that loads keys/values as it goes.
    """

    __slots__ = ("__initial", "__pull")

    def __init__(self, pull_func, starter_dict=None):
        """
        :param pull_func: callable invoked with a key whenever that key is not
            present in the starter dict.
        :param starter_dict: optional mapping of preset key/value pairs.
        """
        object.__init__(self)
        self.__initial = {} if starter_dict is None else starter_dict
        self.__pull = pull_func

    def __getitem__(self, key):
        preset = self.__initial
        return preset[key] if key in preset else self.__pull(key)

    def get(self, key, val=None):
        """Return ``self[key]``, or ``val`` if the lookup raises KeyError."""
        try:
            result = self[key]
        except KeyError:
            result = val
        return result

    def __hash__(self):
        raise TypeError("unhashable")

    # pop is just an alias of get
    pop = get

    def __unmodifiable(func, *args):
        raise TypeError(f"indeterminate dict: '{func}()' can't modify {args!r}")

    # Mutating operations are rejected; each name is bound to the error
    # raiser with its own name pre-filled.
    for func in (
        "__delitem__",
        "__setitem__",
        "setdefault",
        "popitem",
        "update",
        "clear",
    ):
        locals()[func] = partial(__unmodifiable, func)

    def __indeterminate(func, *args):
        raise TypeError(f"indeterminate dict: '{func}()' is inaccessible")

    # Anything that would require knowing the full key set is rejected too.
    for func in ("__iter__", "__len__", "keys", "values", "items"):
        locals()[func] = partial(__indeterminate, func)
[docs]
class StackedDict(DictMixin):
    """An unmodifiable dict that makes multiple dicts appear as one"""

    def __init__(self, *dicts):
        """
        :param dicts: the mappings to layer; earlier ones win on lookup.
        """
        self._dicts = dicts

    def __getitem__(self, key):
        for layer in self._dicts:
            if key in layer:
                return layer[key]
        raise KeyError(key)

    def keys(self):
        """Yield every distinct key across the layers, in first-seen order."""
        seen = set()
        for layer in self._dicts:
            for k in layer:
                if k not in seen:
                    seen.add(k)
                    yield k

    def __contains__(self, key):
        return any(key in layer for layer in self._dicts)

    def __setitem__(self, *a):
        raise TypeError("unmodifiable")

    # every mutator shares the same rejecting implementation
    __delitem__ = clear = __setitem__
class PreservingFoldingDict(DictMixin):
    """dict that uses a 'folder' function when looking up keys.

    The most common use for this is to implement a dict with
    case-insensitive key values (by using ``str.lower`` as folder
    function).

    This version returns the original 'unfolded' key.
    """

    def __init__(self, folder, sourcedict=None):
        """
        :param folder: callable mapping a key to its folded form.
        :param sourcedict: optional iterable of (key, value) pairs to preload.
        """
        self._folder = folder
        # folded key -> (original key, value)
        self._dict = {}
        if sourcedict is not None:
            self.update(sourcedict)

    def copy(self):
        return PreservingFoldingDict(self._folder, self.items())

    def refold(self, folder=None):
        """Use the remembered original keys to update to a new folder.

        If folder is None, keep the current folding function (this
        is useful if the folding function uses external data and that
        data changed).
        """
        if folder is not None:
            self._folder = folder
        fold = self._folder
        previous = self._dict
        self._dict = refolded = {}
        for original, value in previous.values():
            refolded[fold(original)] = (original, value)

    def __getitem__(self, key):
        _, value = self._dict[self._folder(key)]
        return value

    def __setitem__(self, key, value):
        self._dict[self._folder(key)] = (key, value)

    def __delitem__(self, key):
        del self._dict[self._folder(key)]

    def items(self):
        # the stored (original key, value) tuples are exactly the items
        return iter(self._dict.values())

    def keys(self):
        for original, _ in self._dict.values():
            yield original

    def values(self):
        for _, value in self._dict.values():
            yield value

    def __contains__(self, key):
        return self._folder(key) in self._dict

    def __len__(self):
        return len(self._dict)

    def clear(self):
        self._dict = {}
class NonPreservingFoldingDict(DictMixin):
    """dict that uses a 'folder' function when looking up keys.

    The most common use for this is to implement a dict with
    case-insensitive key values (by using ``str.lower`` as folder
    function).

    This version returns the 'folded' key.
    """

    def __init__(self, folder, sourcedict=None):
        """
        :param folder: callable mapping a key to its folded form.
        :param sourcedict: optional iterable of (key, value) pairs to preload.
        """
        self._folder = folder
        # folded key -> value
        self._dict = {}
        if sourcedict is not None:
            self.update(sourcedict)

    def copy(self):
        return NonPreservingFoldingDict(self._folder, self.items())

    def __getitem__(self, key):
        folded = self._folder(key)
        return self._dict[folded]

    def __setitem__(self, key, value):
        folded = self._folder(key)
        self._dict[folded] = value

    def __delitem__(self, key):
        folded = self._folder(key)
        del self._dict[folded]

    def keys(self):
        return iter(self._dict)

    def values(self):
        return iter(self._dict.values())

    def items(self):
        return iter(self._dict.items())

    def __contains__(self, key):
        return self._folder(key) in self._dict

    def __len__(self):
        return len(self._dict)

    def clear(self):
        self._dict = {}
[docs]
class defaultdictkey(defaultdict):
    """:py:class:`defaultdict` derivative whose factory receives the missing key.

    When ``instance[missing_key]`` is accessed, ``__missing__`` calls
    ``self.default_factory(missing_key)``, stores the result under that key
    and returns it.
    """

    __slots__ = ()

    def __init__(self, default_factory):
        # a dedicated __init__ makes default_factory a mandatory argument
        defaultdict.__init__(self, default_factory)

    @steal_docs(defaultdict)
    def __missing__(self, key):
        value = self.default_factory(key)
        self[key] = value
        return value
def _KeyError_to_Attr(functor):
def inner(self, *args):
try:
return functor(self, *args)
except KeyError:
raise AttributeError(args[0])
inner.__name__ = functor.__name__
inner.__doc__ = functor.__doc__
return inner
def inject_getitem_as_getattr(scope):
    """Modify a given class scope proxying attr access to dict access.

    If the given scope already has __getattr__, __setattr__, or __delattr__,
    the pre-existing method will not be overridden.

    Example usage:

    >>> class my_options(dict):
    ...     inject_getitem_as_getattr(locals())
    >>>
    >>> d = my_options(asdf=1)
    >>> print(d.asdf)
    1
    >>> d.asdf = 2
    >>> print(d.asdf)
    2
    >>> del d.asdf
    >>> print('asdf' in d)
    False
    >>> print(hasattr(d, 'asdf'))
    False

    :param scope: the scope of a class to modify, adding methods as needed
    """
    # attribute hook -> item operation it is routed to
    for attr_hook, item_op in (
        ("__getattr__", operator.__getitem__),
        ("__delattr__", operator.__delitem__),
        ("__setattr__", operator.__setitem__),
    ):
        scope.setdefault(attr_hook, _KeyError_to_Attr(item_op))
[docs]
class AttrAccessible(dict):
    """Simple dict class allowing instance.x and instance['x'] access.

    Attribute get/set/delete are routed to the matching item operations by
    the hooks that ``inject_getitem_as_getattr`` adds to the class namespace.
    """

    __slots__ = ()
    # add __getattr__/__setattr__/__delattr__ to this class body's namespace
    inject_getitem_as_getattr(locals())
[docs]
class ProxiedAttrs(DictMixin):
    """Proxy mapping protocol to an object's attributes.

    Item get/set/delete are forwarded to ``getattr``/``setattr``/``delattr``
    on the wrapped object; an AttributeError is reported as KeyError.

    Example usage:

    >>> class foo:
    ...     pass
    >>> obj = foo()
    >>> obj.x, obj.y = 1, 2
    >>> d = ProxiedAttrs(obj)
    >>> print(d['x'])
    1
    >>> del d['x']
    >>> print(hasattr(obj, 'x'))
    False

    :param target: The object to wrap.
    """

    __slots__ = ("__target__",)

    def __init__(self, target):
        self.__target__ = target

    def __getitem__(self, key):
        try:
            value = getattr(self.__target__, key)
        except AttributeError:
            raise KeyError(key)
        return value

    def __setitem__(self, key, value):
        try:
            setattr(self.__target__, key, value)
        except AttributeError:
            raise KeyError(key)

    def __delitem__(self, key):
        try:
            delattr(self.__target__, key)
        except AttributeError:
            raise KeyError(key)

    def keys(self):
        """Iterate over the names reported by ``dir()`` for the wrapped object."""
        names = dir(self.__target__)
        return iter(names)
class _SlottedDict(DictMixin):
    """A space efficient mapping class with a limited set of keys.

    Specifically, this class has its __slots__ locked to the passed in keys-
    this eliminates the allocation of a dict for the instance thus avoiding the
    wasted memory common to dictionary overallocation- for small mappings that
    waste is roughly 75%, for 100 item mappings it's roughly 95%, and for 1000
    items it's roughly 84%.  Point is, it's sizable, consistently so.

    The constraint of this is that the resultant mapping has a locked set of
    keys- you cannot add a key that wasn't allowed up front.

    This functionality is primarily useful when you'll be generating many
    dict instances, all with a common set of allowed keys.

    :param keys: iterable/sequence of keys to allow in the resultant mapping

    Example usage:

    >>> from snakeoil.mappings import make_SlottedDict_kls
    >>> import sys
    >>> my_kls = make_SlottedDict_kls(["key1", "key2", "key3"])
    >>> items = (("key1", 1), ("key2", 2), ("key3",3))
    >>> inst = dict(items)
    >>> slotted_inst = my_kls(items)
    >>> print(sys.getsizeof(inst))
    280
    >>> print(sys.getsizeof(slotted_inst))
    72
    >>> # and now for an extreme example:
    >>> raw = {"attribute%i" % (x,): x for x in range(1000)}
    >>> skls = make_SlottedDict_kls(raw.keys())
    >>> print(sys.getsizeof(raw))
    49432
    >>> sraw = skls(raw.items())
    >>> print(sys.getsizeof(sraw))
    8048
    >>> print(sraw["attribute2"], sraw["attribute3"])
    2 3

    Note that those stats are for a 64bit python 2.6.5 VM.  The stats may
    differ for other python implementations or versions, although for cpython
    the stats above should hold +/- a couple of bytes.

    Finally, it's worth noting that the stats above are the minimal savings-
    via a side effect of the __slots__ the keys are automatically interned.
    This means that if you have 100 instances floating around, for dict's
    that costs you sizeof(key) * 100, for slotted dict instances you pay
    sizeof(key) due to the interning.
    """

    __slots__ = ()
    __externally_mutable__ = True

    def __init__(self, iterables=()):
        if iterables:
            self.update(iterables)

    # item assignment is plain attribute assignment
    __setitem__ = object.__setattr__

    def __getitem__(self, key):
        try:
            value = getattr(self, key)
        except AttributeError:
            raise KeyError(key)
        return value

    def __delitem__(self, key):
        try:
            delattr(self, key)
        except AttributeError:
            raise KeyError(key)

    def __contains__(self, key):
        return hasattr(self, key)

    def update(self, iterable):
        for name, value in iterable:
            setattr(self, name, value)

    def pop(self, key, *a):
        # sentinel lookup instead of try/except: faster than the exception form
        if len(a) > 1:
            raise TypeError("pop accepts 1 or 2 args only")
        found = getattr(self, key, sentinel)
        if found is not sentinel:
            object.__delattr__(self, key)
            return found
        if a:
            return a[0]
        raise KeyError(key)

    def get(self, key, default=None):
        return getattr(self, key, default)

    def __iter__(self):
        # only slots that currently hold a value count as keys
        for name in self.__slots__:
            if hasattr(self, name):
                yield name

    def keys(self):
        return iter(self)

    def values(self):
        for name in self:
            yield self[name]

    def clear(self):
        for name in self:
            del self[name]

    def __len__(self):
        return sum(1 for _ in self.keys())
[docs]
def make_SlottedDict_kls(keys):
    """Create a space efficient mapping class with a limited set of keys.

    Classes are cached in this module's globals under a name derived from
    the hash of the sorted key tuple, so repeated requests for the same keys
    return the same class.

    :param keys: iterable of key names the mapping may hold; they become the
        ``__slots__`` of the generated class.
    :return: a ``_SlottedDict`` subclass restricted to ``keys``.
    """
    new_keys = tuple(sorted(keys))
    cls_name = f"SlottedDict_{hash(new_keys)}"
    kls = globals().get(cls_name)
    # The second check guards against a different key tuple that happens to
    # hash to the same value; such a cached class must not be handed back.
    if kls is None or kls.__slots__ != new_keys:
        # __slots__ only takes effect when it is part of the namespace the
        # class is created from; assigning it afterwards merely sets a class
        # attribute and every instance would still carry a __dict__.
        kls = type(cls_name, (_SlottedDict,), {"__slots__": new_keys})
        globals()[cls_name] = kls
    return kls