"""
contents set- container of fs objects
"""
import os
import time
from collections import OrderedDict, defaultdict
from functools import partial
from operator import attrgetter
from os.path import join as pjoin
from os.path import normpath
from snakeoil.klass import GenericEquality, alias_method
from . import fs
[docs]
def change_offset_rewriter(orig_offset, new_offset, iterable):
path_sep = os.path.sep
offset_len = len(orig_offset.rstrip(path_sep))
# localize it.
npf = normpath
for x in iterable:
# slip in the '/' default to force it to still generate a
# full path still
yield x.change_attributes(
location=npf(pjoin(new_offset, x.location[offset_len:].lstrip(path_sep)))
)
offset_rewriter = partial(change_offset_rewriter, "/")
[docs]
def check_instance(obj):
if not isinstance(obj, fs.fsBase):
raise TypeError(f"'{obj}' is not a fs.fsBase deriviative")
return obj.location, obj
[docs]
class contentsSet(GenericEquality):
"""set of :class:`pkgcore.fs.fs.fsBase` objects"""
__attr_comparison__ = ("_dict",)
__dict_kls__ = dict
def __init__(self, initial=None, mutable=True):
"""
:param initial: initial fs objs for this set
:type initial: sequence
:param mutable: controls if it modifiable after initialization
"""
self._dict = self.__dict_kls__()
if initial is not None:
self._dict.update(check_instance(x) for x in initial)
self.mutable = mutable
def __str__(self):
name = self.__class__.__name__
contents = ", ".join(map(str, self))
return f"{name}([{contents}])"
def __repr__(self):
name = self.__class__.__name__
contents = ", ".join(map(repr, self))
# this should include the id among other things
return f"{name}([{contents}])"
[docs]
def add(self, obj):
"""
add a new fs obj to the set
:param obj: must be a derivative of :obj:`pkgcore.fs.fs.fsBase`
"""
if not self.mutable:
# weird, but keeping with set.
raise AttributeError(f"{self.__class__} is frozen; no add functionality")
if not fs.isfs_obj(obj):
raise TypeError(f"'{obj}' is not a fs.fsBase class")
self._dict[obj.location] = obj
def __delitem__(self, obj):
"""
remove a fs obj to the set
:type obj: a derivative of :obj:`pkgcore.fs.fs.fsBase`
or a string location of an obj in the set.
:raise KeyError: if the obj isn't found
"""
if not self.mutable:
# weird, but keeping with set.
raise AttributeError(f"{self.__class__} is frozen; no remove functionality")
if fs.isfs_obj(obj):
del self._dict[obj.location]
else:
del self._dict[normpath(obj)]
[docs]
def remove(self, obj):
del self[obj]
[docs]
def discard(self, obj):
if fs.isfs_obj(obj):
self._dict.pop(obj.location, None)
else:
self._dict.pop(obj, None)
def __getitem__(self, obj):
if fs.isfs_obj(obj):
return self._dict[obj.location]
return self._dict[normpath(obj)]
def __contains__(self, key):
if fs.isfs_obj(key):
return key.location in self._dict
return normpath(key) in self._dict
[docs]
def clear(self):
"""
clear the set
:raise ttributeError: if the instance is frozen
"""
if not self.mutable:
# weird, but keeping with set.
raise AttributeError(f"{self.__class__} is frozen; no clear functionality")
self._dict.clear()
@staticmethod
def _convert_loc(iterable):
f = fs.isfs_obj
for x in iterable:
if f(x):
yield x.location
else:
yield x
@staticmethod
def _ensure_fsbase(iterable):
f = fs.isfs_obj
for x in iterable:
if not f(x):
raise ValueError(f"must be an fsBase derivative: got {x!r}")
yield x
[docs]
def difference(self, other):
if not hasattr(other, "__contains__"):
other = set(self._convert_loc(other))
return contentsSet(
(x for x in self if x.location not in other), mutable=self.mutable
)
[docs]
def difference_update(self, other):
if not self.mutable:
raise TypeError(f"immutable type {self!r}")
rem = self.remove
for x in other:
if x in self:
rem(x)
[docs]
def intersection(self, other):
return contentsSet((x for x in other if x in self), mutable=self.mutable)
[docs]
def intersection_update(self, other):
if not self.mutable:
raise TypeError(f"immutable type {self!r}")
if not hasattr(other, "__contains__"):
other = set(self._convert_loc(other))
l = [x for x in self if x.location not in other]
for x in l:
self.remove(x)
[docs]
def issubset(self, other):
if not hasattr(other, "__contains__"):
other = set(self._convert_loc(other))
return all(x in other for x in self._dict)
[docs]
def issuperset(self, other):
if not hasattr(other, "__contains__"):
other = set(self._convert_loc(other))
return all(x in self for x in other)
[docs]
def isdisjoint(self, other):
if not hasattr(other, "__contains__"):
other = set(self._convert_loc(other))
return not any(x in other for x in self._dict)
[docs]
def union(self, other):
c = contentsSet(other)
c.update(self)
return c
def __iter__(self):
return iter(self._dict.values())
def __len__(self):
return len(self._dict)
[docs]
def symmetric_difference(self, other):
c = contentsSet(mutable=True)
c.update(self)
c.symmetric_difference_update(other)
object.__setattr__(c, "mutable", self.mutable)
return c
[docs]
def symmetric_difference_update(self, other):
if not self.mutable:
raise TypeError(f"immutable type {self!r}")
if not hasattr(other, "__contains__"):
other = contentsSet(self._ensure_fsbase(other))
l = []
for x in self:
if x in other:
l.append(x)
add = self.add
for x in other:
if x not in self:
add(x)
rem = self.remove
for x in l:
rem(x)
del l, rem
[docs]
def update(self, iterable):
d = self._dict
for x in iterable:
d[x.location] = x
[docs]
def iterfiles(self, invert=False):
"""A generator yielding just :obj:`pkgcore.fs.fs.fsFile` instances.
:param invert: if True, yield everything that isn't a fsFile instance,
else yields just fsFile instances.
"""
if invert:
return (x for x in self if not x.is_reg)
return filter(attrgetter("is_reg"), self)
[docs]
def files(self, invert=False):
"""Returns a list of just :obj:`pkgcore.fs.fs.fsFile` instances.
:param invert: if True, yield everything that isn't a
fsFile instance, else yields just fsFile.
"""
return list(self.iterfiles(invert=invert))
[docs]
def iterdirs(self, invert=False):
if invert:
return (x for x in self if not x.is_dir)
return filter(attrgetter("is_dir"), self)
[docs]
def dirs(self, invert=False):
return list(self.iterdirs(invert=invert))
[docs]
def itersymlinks(self, invert=False):
if invert:
return (x for x in self if not x.is_sym)
return filter(attrgetter("is_sym"), self)
[docs]
def symlinks(self, invert=False):
return list(self.iterlinks(invert=invert))
iterlinks = alias_method("itersymlinks")
links = alias_method("symlinks")
[docs]
def iterdevs(self, invert=False):
if invert:
return (x for x in self if not x.is_dev)
return filter(attrgetter("is_dev"), self)
[docs]
def devs(self, invert=False):
return list(self.iterdevs(invert=invert))
[docs]
def iterfifos(self, invert=False):
if invert:
return (x for x in self if not x.is_fifo)
return filter(attrgetter("is_fifo"), self)
[docs]
def fifos(self, invert=False):
return list(self.iterfifos(invert=invert))
for k in ("file", "dir", "symlink", "dev", "fifo"):
locals()[f"iter{k}s"].__doc__ = iterfiles.__doc__.replace(
"fsFile", f"fs{k.capitalize()}"
)
locals()[f"{k}s"].__doc__ = files.__doc__.replace(
"fsFile", f"fs{k.capitalize()}"
)
del k
[docs]
def inode_map(self):
d = defaultdict(list)
for obj in self.iterfiles():
key = (obj.dev, obj.inode)
if None in key:
continue
d[key].append(obj)
return d
[docs]
def clone(self, empty=False):
if empty:
return self.__class__([], mutable=True)
return self.__class__(iter(self._dict.values()), mutable=True)
[docs]
def insert_offset(self, offset):
cset = self.clone(empty=True)
cset.update(offset_rewriter(offset, self))
return cset
[docs]
def change_offset(self, old_offset, new_offset):
cset = self.clone(empty=True)
cset.update(change_offset_rewriter(old_offset, new_offset, self))
return cset
[docs]
def iter_child_nodes(self, start_point):
"""Yield a stream of nodes that are fs entries contained within the
passed in start point.
:param start_point: fs filepath all yielded nodes must be within.
"""
if isinstance(start_point, fs.fsBase):
if start_point.is_sym:
start_point = start_point.target
else:
start_point = start_point.location
for x in self:
cn_path = normpath(start_point).rstrip(os.path.sep) + os.path.sep
# what about sym targets?
if x.location.startswith(cn_path):
yield x
[docs]
def child_nodes(self, start_point):
"""Return a clone of this instance, w/ just the child nodes returned
from `iter_child_nodes`.
:param start_point: fs filepath all yielded nodes must be within.
"""
obj = self.clone(empty=True)
obj.update(self.iter_child_nodes(start_point))
return obj
[docs]
def map_directory_structure(self, other, add_conflicting_sym=True):
"""Resolve the directory structure between this instance, and another
contentset, collapsing syms of self into directories of other.
"""
conflicts_d = {x: x.resolved_target for x in other.iterlinks()}
# rebuild the targets first; sorted due to the fact that we want to
# rewrite each node (resolving down the filepath chain)
conflicts = sorted(contentsSet(self.iterdirs()).intersection(conflicts_d))
obj = self.clone()
while conflicts:
for conflict in conflicts:
# punt the conflict first, since we don't want it getting rewritten
obj.remove(conflict)
subset = obj.child_nodes(conflict.location)
obj.difference_update(subset)
subset = subset.change_offset(
conflict.location, conflict.resolved_target
)
obj.update(subset)
if add_conflicting_sym:
obj.add(other[conflicts_d[conflict]])
# rebuild the targets first; sorted due to the fact that we want to
# rewrite each node (resolving down the filepath chain)
conflicts = sorted(contentsSet(obj.iterdirs()).intersection(conflicts_d))
return obj
[docs]
def add_missing_directories(self, mode=0o775, uid=0, gid=0, mtime=None):
"""Ensure that a directory node exists for each path; add if missing."""
missing = (x.dirname for x in self)
missing = set(x for x in missing if x not in self)
if mtime is None:
mtime = time.time()
# have to go recursive since many directories may be missing.
missing_initial = list(missing)
for x in missing_initial:
target = os.path.dirname(x)
while target not in missing and target not in self:
missing.add(target)
target = os.path.dirname(target)
missing.discard("/")
self.update(
fs.fsDir(location=x, mode=mode, uid=uid, gid=gid, mtime=mtime)
for x in missing
)
[docs]
class OrderedContentsSet(contentsSet):
def __init__(self, initial=None, mutable=False, add_missing_directories=False):
contentsSet.__init__(self, mutable=True)
self._dict = OrderedDict()
if initial:
self.update(initial)
# some sources are a bit stupid, tarballs for example.
# add missing directories if requested
if add_missing_directories:
self.add_missing_directories()
self.mutable = mutable