"""
misc. stuff we've not found a spot for yet.
"""
__all__ = (
"ChunkedDataDict",
"IncrementalsDict",
"PayloadDict",
"chunked_data",
"collapsed_restrict_to_data",
"get_relative_dosym_target",
"incremental_chunked",
"incremental_expansion",
"incremental_expansion_license",
"non_incremental_collapsed_restrict_to_data",
"optimize_incrementals",
"sort_keywords",
)
import os.path
from collections import defaultdict, namedtuple
from functools import partial
from itertools import chain
from snakeoil import mappings
from snakeoil.klass import alias_method, generic_equality
from snakeoil.sequences import iflatten_instance
from ..restrictions import boolean, packages, restriction
from . import atom
restrict_payload = namedtuple("restrict_data", ["restrict", "data"])
chunked_data = namedtuple("chunked_data", ("key", "neg", "pos"))
[docs]
def sort_keywords(keywords):
"""Sort keywords in the proper order: i.e. glob-arches, arch, prefix-arches."""
def _sort_kwds(kw):
parts = tuple(reversed(kw.lstrip("~-").partition("-")))
return parts[0], parts[2]
return sorted(keywords, key=_sort_kwds)
[docs]
def optimize_incrementals(sequence):
# roughly the algorithm walks sequences right->left,
# identifying terminal points for incrementals; aka, -x x, 'x'
# is the terminal point- no point in having -x.
finalized = set()
for item in reversed(sequence):
if item[0] == "-":
i = item[1:]
if not i:
raise ValueError("encountered an incomplete negation (just -, no flag)")
if i == "*":
# seen enough.
yield item
return
if i not in finalized:
finalized.add(i)
yield item
else:
if item not in finalized:
yield item
finalized.add(item)
[docs]
def incremental_chunked(orig: set[str], iterables):
for cinst in iterables:
if "*" in cinst.neg: # remove all previous set flags
orig.clear()
for flag in cinst.neg:
if flag.endswith("_*"): # remove previous USE_EXPAND
drop = [f for f in orig if f.startswith(flag[:-2])]
orig.difference_update(drop)
orig.difference_update(cinst.neg)
orig.update(cinst.pos)
[docs]
def incremental_expansion(iterable, orig=None, msg_prefix="", finalize=True):
if orig is None:
orig = set()
for token in iterable:
if token[0] == "-":
i = token[1:]
if not i:
raise ValueError(
f"{msg_prefix} encountered an incomplete negation, '-'"
)
if i == "*":
orig.clear()
else:
orig.discard(i)
if not finalize:
orig.add(token)
else:
orig.discard("-" + token)
orig.add(token)
return orig
[docs]
def incremental_expansion_license(
pkg, licenses, license_groups, iterable, msg_prefix=""
):
seen = set()
for token in iterable:
if token[0] == "-":
i = token[1:]
if not i:
raise ValueError(
f"{pkg}: {msg_prefix}encountered an incomplete negation, '-'"
)
if i == "*":
seen.clear()
else:
if i[0] == "@":
i = i[1:]
if not i:
raise ValueError(
f"{pkg}: {msg_prefix}encountered an incomplete negation"
" of a license group, '-@'"
)
seen.difference_update(license_groups.get(i, ()))
else:
seen.discard(i)
elif token[0] == "@":
i = token[1:]
if not i:
raise ValueError(
f"{pkg}: {msg_prefix}encountered an incomplete license group, '@'"
)
seen.update(license_groups.get(i, ()))
elif token == "*":
seen.update(licenses)
else:
seen.add(token)
return seen
[docs]
class IncrementalsDict(mappings.DictMixin):
disable_py3k_rewriting = True
def __init__(self, incrementals, **kwds):
self._incrementals = incrementals
self._dict = {}
super().__init__(**kwds)
def __setitem__(self, key, value):
if key in self._incrementals:
if key in self._dict:
self._dict[key] += f" {value}"
else:
self._dict[key] = value
else:
self._dict[key] = value
for x in "getitem delitem len iter".split():
x = f"__{x}__"
locals()[x] = alias_method(f"_dict.{x}")
s = "pop clear keys items values"
for x in s.split():
locals()[x] = alias_method(f"_dict.{x}")
del x, s
[docs]
class collapsed_restrict_to_data(metaclass=generic_equality):
__attr_comparison__ = ("defaults", "freeform", "atoms", "__class__")
incremental = True
def __init__(self, *restrict_sources, **kwds):
"""
descriptive, no?
Basically splits an iterable of restrict:data into
level of specificity, repo, cat, pkg, atom (dict) for use
in filters
Finally, a finalize_defaults kwd is supported to control whether
incremental_expansion finalizes the initial defaults list.
defaults to True.
"""
always = []
repo = []
cat = []
pkg = []
multi = []
atom_d = {}
for restrict_pairs in restrict_sources:
for a, data in restrict_pairs:
if not data:
continue
if isinstance(a, restriction.AlwaysBool):
# yes, odd attr name, but negate holds the val to return.
# note also, we're dropping AlwaysFalse; it'll never match.
if a.negate:
always.extend(data)
for atomlist in atom_d.values():
atomlist.append(
(
a,
set(
[flag for flag in data if flag.startswith("-")]
),
)
)
elif isinstance(a, atom.atom):
atom_d.setdefault(a.key, []).append((a, data))
elif isinstance(a, boolean.AndRestriction):
multi.append((a, data))
elif isinstance(a, packages.PackageRestriction):
if a.attr == "category":
cat.append((a, data))
elif a.attr == "package":
pkg.append((a, data))
elif a.attr == "repo.repo_id":
repo.append((a, data))
else:
raise ValueError(
f"{a!r} doesn't operate on "
f"package/category/repo: data {data!r}"
)
else:
raise ValueError(
f"{a!r} is not an AlwaysBool, PackageRestriction, "
f"or atom: data {data!r}"
)
if always:
always = incremental_expansion(
always, finalize=kwds.get("finalize_defaults", True)
)
else:
always = set()
self.defaults = always
self.defaults_finalized = set(x for x in self.defaults if not x.startswith("-"))
self.freeform = tuple(x for x in (repo, cat, pkg, multi) if x)
self.atoms = atom_d
[docs]
def pull_data(self, pkg, force_copy=False, pre_defaults=()):
l = []
for specific in self.freeform:
for restrict, data in specific:
if restrict.match(pkg):
l.append(data)
for atom, data in self.atoms.get(pkg.key, ()):
if atom.match(pkg):
l.append(data)
if pre_defaults:
s = set(pre_defaults)
incremental_expansion(self.defaults, orig=s)
else:
s = set(self.defaults_finalized)
if l:
incremental_expansion(iflatten_instance(l), orig=s)
return s
[docs]
def iter_pull_data(self, pkg, pre_defaults=()):
for item in pre_defaults:
yield item
for item in self.defaults:
yield item
for specific in self.freeform:
for restrict, data in specific:
if restrict.match(pkg):
for item in data:
yield item
for atom, data in self.atoms.get(pkg.key, ()):
if atom.match(pkg):
for item in data:
yield item
[docs]
class non_incremental_collapsed_restrict_to_data(collapsed_restrict_to_data):
[docs]
def pull_data(self, pkg, force_copy=False):
l = []
for specific in self.freeform:
for restrict, data in specific:
if restrict.match(pkg):
l.append(data)
for atom, data in self.atoms.get(pkg.key, ()):
if atom.match(pkg):
l.append(data)
if not l:
if force_copy:
return set(self.defaults)
return self.defaults
s = set(self.defaults)
s.update(iflatten_instance(l))
return s
[docs]
def iter_pull_data(self, pkg):
l = [self.defaults]
for specific in self.freeform:
l.extend(data for restrict, data in specific if restrict.match(pkg))
for atom, data in self.atoms.get(pkg.key, ()):
if atom.match(pkg):
l.append(data)
if len(l) == 1:
return iter(self.defaults)
return iflatten_instance(l)
def _cached_build_cp_atom_payload(cache, sequence, restrict, payload_form=False):
sequence = list(sequence)
key = (payload_form, restrict, tuple(sequence))
val = cache.get(key)
if val is None:
val = cache[key] = _build_cp_atom_payload(
sequence, restrict, payload_form=payload_form
)
return val
def _build_cp_atom_payload(sequence, restrict, payload_form=False):
locked = {}
ldefault = locked.setdefault
l = []
if payload_form:
def f(r, neg, pos):
return restrict_payload(r, tuple(chain(("-" + x for x in neg), pos)))
else:
f = chunked_data
i = list(sequence)
if len(i) <= 1:
if not i:
return ()
return (f(i[0].key, i[0].neg, i[0].pos),)
i = reversed(i)
for data in i:
if data.key == packages.AlwaysTrue or getattr(data.key, "is_simple", False):
for n in data.neg:
ldefault(n, False)
for p in data.pos:
ldefault(p, True)
continue
neg = tuple(x for x in data.neg if x not in locked)
pos = tuple(x for x in data.pos if x not in locked)
if neg or pos:
l.append((data.key, neg, pos))
# thus far we've done essentially a tracing for R->L, of globals,
# this leaves d-u/a X, =d-u/a-1 X # slipping through however,
# since the specific is later. Plus it's reversed from what we want.
# so we rebuild, but apply the same global trick as we go.
if not locked:
# all is specific/non-simple, just reverse and return
return tuple(f(*vals) for vals in reversed(l))
new_l = [
f(
restrict,
tuple(k for k, v in locked.items() if not v), # neg
tuple(k for k, v in locked.items() if v), # pos
)
]
# we exploit a few things this time around in reusing the algo from above
# we know there is only going to be one global (which we just added),
# and that everything is specific.
lget = locked.get
for key, neg, pos in reversed(l):
# only grab the deltas; if a + becomes a specific -
neg = tuple(x for x in neg if lget(x, True))
pos = tuple(x for x in pos if not lget(x, False))
if neg or pos:
new_l.append(f(key, neg, pos))
return tuple(new_l)
[docs]
class ChunkedDataDict(metaclass=generic_equality):
__attr_comparison__ = ("_global_settings", "_dict")
def __init__(self):
self._global_settings = []
self._dict = defaultdict(partial(list, self._global_settings))
@property
def frozen(self):
return isinstance(self._dict, mappings.ImmutableDict)
[docs]
def clone(self, unfreeze=False):
obj = self.__class__()
if self.frozen and not unfreeze:
obj._dict = self._dict
obj._global_settings = self._global_settings
return obj
obj._dict = defaultdict(partial(list, self._global_settings))
for key, values in self._dict.items():
obj._dict[key].extend(values)
obj._global_settings = list(self._global_settings)
return obj
[docs]
def mk_item(self, key, neg, pos):
return chunked_data(key, tuple(neg), tuple(pos))
[docs]
def add_global(self, item):
return self._add_global(item.neg, item.pos, restrict=item.key)
[docs]
def add_bare_global(self, disabled, enabled):
return self._add_global(disabled, enabled)
def _add_global(self, disabled, enabled, restrict=None):
if not disabled and not enabled:
return
# discard current global in the mapping.
disabled = set(disabled)
enabled = set(enabled)
if restrict is None:
restrict = packages.AlwaysTrue
payload = self.mk_item(restrict, tuple(disabled), tuple(enabled))
for vals in self._dict.values():
vals.append(payload)
self._expand_globals([payload])
[docs]
def merge(self, cdict):
if not isinstance(cdict, ChunkedDataDict):
raise TypeError(
"merge expects a ChunkedDataDict instance; "
f"got type {type(cdict)}, {cdict!r}"
)
if isinstance(cdict, PayloadDict) and not isinstance(self, PayloadDict):
raise TypeError(
"merge expects a PayloadDataDict instance; "
f"got type {type(cdict)}, {cdict!r}"
)
# straight extensions for this, rather than update_from_stream.
d = self._dict
for key, values in cdict._dict.items():
d[key].extend(values)
# note the cdict we're merging has the globals layer through it already, ours
# however needs to have the new globals appended to all untouched keys
# (no need to update the merged keys- they already have that global data
# interlaced)
new_globals = cdict._global_settings
if new_globals:
updates = set(d)
updates.difference_update(cdict._dict)
for key in updates:
d[key].extend(new_globals)
self._expand_globals(new_globals)
def _expand_globals(self, new_globals):
# while a chain seems obvious here, reversed is used w/in _build_cp_atom;
# reversed doesn't like chain, so we just modify the list and do it this way.
self._global_settings.extend(new_globals)
restrict = getattr(new_globals[0], "key", packages.AlwaysTrue)
if restrict == packages.AlwaysTrue:
self._global_settings[:] = list(
_build_cp_atom_payload(self._global_settings, restrict)
)
[docs]
def add(self, cinst):
self.update_from_stream([cinst])
[docs]
def update_from_stream(self, stream):
for cinst in stream:
if getattr(cinst.key, "key", None) is not None:
# atom, or something similar. use the key lookup.
# hack also... recreate the restriction; this is due to
# internal idiocy in ChunkedDataDict that will be fixed.
new_globals = (
x
for x in self._global_settings
if x not in self._dict[cinst.key.key]
)
self._dict[cinst.key.key].extend(new_globals)
self._dict[cinst.key.key].append(cinst)
else:
self.add_global(cinst)
[docs]
def freeze(self):
if not isinstance(self._dict, mappings.ImmutableDict):
self._dict = mappings.ImmutableDict(
(k, tuple(v)) for k, v in self._dict.items()
)
self._global_settings = tuple(self._global_settings)
[docs]
def optimize(self, cache=None):
if cache is None:
d_stream = (
(k, _build_cp_atom_payload(v, atom.atom(k), False))
for k, v in self._dict.items()
)
g_stream = _build_cp_atom_payload(
self._global_settings,
packages.AlwaysTrue,
payload_form=isinstance(self, PayloadDict),
)
else:
d_stream = (
(k, _cached_build_cp_atom_payload(cache, v, atom.atom(k), False))
for k, v in self._dict.items()
)
g_stream = _cached_build_cp_atom_payload(
cache,
self._global_settings,
packages.AlwaysTrue,
payload_form=isinstance(self, PayloadDict),
)
if self.frozen:
self._dict = mappings.ImmutableDict(d_stream)
self._global_settings = tuple(g_stream)
else:
self._dict.update(d_stream)
self._global_settings[:] = list(g_stream)
[docs]
def render_to_dict(self):
d = dict(self._dict)
if self._global_settings:
d[packages.AlwaysTrue] = self._global_settings[:]
return d
[docs]
def render_to_payload(self):
d = PayloadDict()
d = {
atom.atom(k): _build_cp_atom_payload(v, atom.atom(k), True)
for k, v in self._dict.items()
}
if self._global_settings:
data = _build_cp_atom_payload(
self._global_settings, packages.AlwaysTrue, payload_form=True
)
d[packages.AlwaysTrue] = tuple(data)
return d
def __bool__(self):
return bool(self._global_settings) or bool(self._dict)
def __str__(self):
return str(self.render_to_dict())
[docs]
def render_pkg(self, pkg, pre_defaults=()):
items = self._dict.get(pkg.key)
if items is None:
items = self._global_settings
s = set(pre_defaults)
incremental_chunked(s, (cinst for cinst in items if cinst.key.match(pkg)))
return s
pull_data = render_pkg
[docs]
class PayloadDict(ChunkedDataDict):
[docs]
def mk_item(self, key, neg, pos):
return restrict_payload(key, tuple(chain(("-" + x for x in neg), pos)))
[docs]
def add_bare_global(self, payload):
neg = [x[1:] for x in payload if x[0] == "-"]
pos = [x for x in payload if x[0] != "-"]
ChunkedDataDict.add_bare_global(self, neg, pos)
[docs]
def add_global(self, pinst):
neg = [x[1:] for x in pinst.data if x[0] == "-"]
pos = [x for x in pinst.data if x[0] != "-"]
return ChunkedDataDict.add_global(self, chunked_data(pinst.restrict, neg, pos))
[docs]
def update_from_stream(self, stream):
for pinst in stream:
if getattr(pinst.restrict, "key", None) is not None:
# atom, or something similar. use the key lookup.
# hack also... recreate the restriction; this is due to
# internal idiocy in ChunkedDataDict that will be fixed.
self._dict[pinst.restrict.key].append(pinst)
else:
self.add_global(pinst)
[docs]
def render_pkg(self, pkg, pre_defaults=()):
items = self._dict.get(atom.atom(pkg.key))
if items is None:
items = self._global_settings
s = set(pre_defaults)
data = chain.from_iterable(
item.data for item in items if item.restrict.match(pkg)
)
return incremental_expansion(data, orig=s)
pull_data = render_pkg
def run_sanity_checks(pkgs, domain, threads=None):
"""Run all sanity checks for a sequence of packages."""
sanity_failures = defaultdict(list)
# TODO: parallelize this across separate processes
for pkg in pkgs:
pkg_ops = domain.get_pkg_operations(pkg)
if pkg_ops.supports("sanity_check") and (failures := pkg_ops.sanity_check()):
sanity_failures[pkg] = failures
return sanity_failures
[docs]
def get_relative_dosym_target(source, target):
"""Get relative path from target to source, for symlink target."""
# NB: as dosym arg, initial slash can be omitted
return os.path.relpath(source, os.path.join("/", os.path.dirname(target)))