Source code for pkgcore.fs.contents

"""
contents set- container of fs objects
"""

import os
import time
from collections import OrderedDict, defaultdict
from functools import partial
from operator import attrgetter
from os.path import join as pjoin
from os.path import normpath

from snakeoil.klass import GenericEquality, alias_method

from . import fs


def change_offset_rewriter(orig_offset, new_offset, iterable):
    """Lazily re-root every entry of ``iterable`` from one offset to another.

    :param orig_offset: path prefix currently carried by each entry's
        ``location``; only its length (minus trailing separators) is used,
        the prefix itself is not verified against the location.
    :param new_offset: path prefix to place in front of the remainder.
    :param iterable: objects exposing ``location`` and
        ``change_attributes(location=...)``.
    :return: generator yielding the result of ``change_attributes`` for each
        entry, with the rewritten, normalized location.
    """
    sep = os.path.sep
    prefix_len = len(orig_offset.rstrip(sep))
    for entry in iterable:
        # Strip the old prefix and any leading separators so the join below
        # cannot be reset by an absolute remainder; the result stays rooted
        # at new_offset.
        remainder = entry.location[prefix_len:].lstrip(sep)
        yield entry.change_attributes(location=normpath(pjoin(new_offset, remainder)))
# Convenience form of change_offset_rewriter with the original offset fixed
# to "/": call as offset_rewriter(new_offset, iterable) to prepend new_offset
# to each entry's location.
offset_rewriter = partial(change_offset_rewriter, "/")
def check_instance(obj):
    """Validate ``obj`` and return it as a ``(location, obj)`` pair.

    The pair form is what ``dict.update`` accepts, which lets callers feed a
    stream of validated objects straight into a location-keyed mapping.

    :param obj: candidate object.
    :return: tuple of ``obj.location`` and ``obj`` itself.
    :raise TypeError: if ``obj`` is not an instance of ``fs.fsBase``.
    """
    if isinstance(obj, fs.fsBase):
        return obj.location, obj
    raise TypeError(f"'{obj}' is not a fs.fsBase deriviative")
class contentsSet(GenericEquality):
    """set of :class:`pkgcore.fs.fs.fsBase` objects

    Entries are held in a mapping keyed by each object's ``location``, so the
    set contains at most one entry per path.  Lookups accept either an fs
    object (its ``location`` is used) or a path string (normalized with
    ``os.path.normpath`` first).
    """

    # presumably consumed by GenericEquality to decide which attributes
    # define equality -- confirm against snakeoil.klass.
    __attr_comparison__ = ("_dict",)
    # Mapping type used for the backing store.
    __dict_kls__ = dict

    def __init__(self, initial=None, mutable=True):
        """
        :param initial: initial fs objs for this set
        :type initial: sequence
        :param mutable: controls if it modifiable after initialization
        :raise TypeError: if an item of ``initial`` is not an fsBase instance
        """
        self._dict = self.__dict_kls__()
        if initial is not None:
            self._dict.update(check_instance(x) for x in initial)
        self.mutable = mutable

    def __str__(self):
        name = self.__class__.__name__
        contents = ", ".join(map(str, self))
        return f"{name}([{contents}])"

    def __repr__(self):
        name = self.__class__.__name__
        contents = ", ".join(map(repr, self))
        # this should include the id among other things
        return f"{name}([{contents}])"

    def add(self, obj):
        """
        add a new fs obj to the set

        An existing entry at the same location is replaced.

        :param obj: must be a derivative of :obj:`pkgcore.fs.fs.fsBase`
        :raise AttributeError: if the instance is frozen
        :raise TypeError: if ``obj`` is not an fs object
        """
        if not self.mutable:
            # weird, but keeping with set.
            raise AttributeError(f"{self.__class__} is frozen; no add functionality")
        if not fs.isfs_obj(obj):
            raise TypeError(f"'{obj}' is not a fs.fsBase class")
        self._dict[obj.location] = obj

    def __delitem__(self, obj):
        """
        remove a fs obj from the set

        :type obj: a derivative of :obj:`pkgcore.fs.fs.fsBase`
            or a string location of an obj in the set.
        :raise AttributeError: if the instance is frozen
        :raise KeyError: if the obj isn't found
        """
        if not self.mutable:
            # weird, but keeping with set.
            raise AttributeError(f"{self.__class__} is frozen; no remove functionality")
        if fs.isfs_obj(obj):
            del self._dict[obj.location]
        else:
            del self._dict[normpath(obj)]

    def remove(self, obj):
        """Remove ``obj`` (fs object or path string); see ``__delitem__``."""
        del self[obj]

    def discard(self, obj):
        """Remove ``obj`` if present; do nothing when it is absent.

        :param obj: an fs object, or a path string which is normalized the
            same way ``remove``, ``__getitem__`` and ``__contains__``
            normalize it.

        NOTE: unlike ``add``/``remove``/``clear`` this does not check
        ``self.mutable``; that existing behaviour is preserved.
        """
        if fs.isfs_obj(obj):
            self._dict.pop(obj.location, None)
        else:
            self._dict.pop(normpath(obj), None)

    def __getitem__(self, obj):
        if fs.isfs_obj(obj):
            return self._dict[obj.location]
        return self._dict[normpath(obj)]

    def __contains__(self, key):
        if fs.isfs_obj(key):
            return key.location in self._dict
        return normpath(key) in self._dict

    def clear(self):
        """
        clear the set

        :raise AttributeError: if the instance is frozen
        """
        if not self.mutable:
            # weird, but keeping with set.
            raise AttributeError(f"{self.__class__} is frozen; no clear functionality")
        self._dict.clear()

    @staticmethod
    def _convert_loc(iterable):
        """Yield ``x.location`` for fs objects, and anything else unchanged."""
        f = fs.isfs_obj
        for x in iterable:
            if f(x):
                yield x.location
            else:
                yield x

    @staticmethod
    def _ensure_fsbase(iterable):
        """Yield items unchanged, raising ValueError on any non-fs object."""
        f = fs.isfs_obj
        for x in iterable:
            if not f(x):
                raise ValueError(f"must be an fsBase derivative: got {x!r}")
            yield x

    def difference(self, other):
        """Return a new contentsSet of entries whose location is not in ``other``.

        An ``other`` lacking ``__contains__`` (e.g. a generator) is first
        collapsed into a set of locations.
        """
        if not hasattr(other, "__contains__"):
            other = set(self._convert_loc(other))
        return contentsSet(
            (x for x in self if x.location not in other), mutable=self.mutable
        )

    def difference_update(self, other):
        """Remove from this set every item of ``other`` that it contains.

        :raise TypeError: if the instance is immutable
        """
        if not self.mutable:
            raise TypeError(f"immutable type {self!r}")
        rem = self.remove
        for x in other:
            if x in self:
                rem(x)

    def intersection(self, other):
        """Return a new contentsSet of the items of ``other`` found in this set.

        Note the yielded objects come from ``other``, not from this set.
        """
        return contentsSet((x for x in other if x in self), mutable=self.mutable)

    def intersection_update(self, other):
        """Drop every entry whose location is not in ``other``.

        :raise TypeError: if the instance is immutable
        """
        if not self.mutable:
            raise TypeError(f"immutable type {self!r}")
        if not hasattr(other, "__contains__"):
            other = set(self._convert_loc(other))
        # Collect first: the backing dict cannot be mutated mid-iteration.
        stale = [x for x in self if x.location not in other]
        for x in stale:
            self.remove(x)

    def issubset(self, other):
        """Return True if every location in this set is in ``other``."""
        if not hasattr(other, "__contains__"):
            other = set(self._convert_loc(other))
        return all(x in other for x in self._dict)

    def issuperset(self, other):
        """Return True if every item of ``other`` is in this set."""
        if not hasattr(other, "__contains__"):
            other = set(self._convert_loc(other))
        return all(x in self for x in other)

    def isdisjoint(self, other):
        """Return True if no location in this set is in ``other``."""
        if not hasattr(other, "__contains__"):
            other = set(self._convert_loc(other))
        return not any(x in other for x in self._dict)

    def union(self, other):
        """Return a new contentsSet holding ``other`` overlaid with this set.

        Entries of this set win when both hold the same location.
        """
        c = contentsSet(other)
        c.update(self)
        return c

    def __iter__(self):
        return iter(self._dict.values())

    def __len__(self):
        return len(self._dict)

    def symmetric_difference(self, other):
        """Return a new contentsSet of entries in exactly one of the two sets.

        The result carries this set's ``mutable`` flag.
        """
        c = contentsSet(mutable=True)
        c.update(self)
        c.symmetric_difference_update(other)
        object.__setattr__(c, "mutable", self.mutable)
        return c

    def symmetric_difference_update(self, other):
        """Keep only entries found in exactly one of this set and ``other``.

        :raise TypeError: if the instance is immutable
        :raise ValueError: if ``other`` lacks ``__contains__`` and yields a
            non-fs object
        """
        if not self.mutable:
            raise TypeError(f"immutable type {self!r}")
        if not hasattr(other, "__contains__"):
            other = contentsSet(self._ensure_fsbase(other))
        # Snapshot the shared entries before adding anything, so items pulled
        # in from `other` below are not mistaken for shared ones.
        shared = [x for x in self if x in other]
        add = self.add
        for x in other:
            if x not in self:
                add(x)
        rem = self.remove
        for x in shared:
            rem(x)

    def update(self, iterable):
        """Insert each item of ``iterable``, keyed by its ``location``.

        No mutability or type check is performed here.
        """
        d = self._dict
        for x in iterable:
            d[x.location] = x

    def iterfiles(self, invert=False):
        """A generator yielding just :obj:`pkgcore.fs.fs.fsFile` instances.

        :param invert: if True, yield everything that isn't a
            fsFile instance, else yields just fsFile instances.
        """
        if invert:
            return (x for x in self if not x.is_reg)
        return filter(attrgetter("is_reg"), self)

    def files(self, invert=False):
        """Returns a list of just :obj:`pkgcore.fs.fs.fsFile` instances.

        :param invert: if True, yield everything that isn't a
            fsFile instance, else yields just fsFile.
        """
        return list(self.iterfiles(invert=invert))

    # The iter<kind>s/<kind>s methods below get their docstrings assigned by
    # the loop after iterfifos/fifos, derived from iterfiles/files.

    def iterdirs(self, invert=False):
        if invert:
            return (x for x in self if not x.is_dir)
        return filter(attrgetter("is_dir"), self)

    def dirs(self, invert=False):
        return list(self.iterdirs(invert=invert))

    # itersymlinks/symlinks must exist: the iterlinks/links aliases and the
    # "symlink" entry of the docstring loop below both look them up by name.
    def itersymlinks(self, invert=False):
        if invert:
            return (x for x in self if not x.is_sym)
        return filter(attrgetter("is_sym"), self)

    def symlinks(self, invert=False):
        return list(self.itersymlinks(invert=invert))

    iterlinks = alias_method("itersymlinks")
    links = alias_method("symlinks")

    def iterdevs(self, invert=False):
        if invert:
            return (x for x in self if not x.is_dev)
        return filter(attrgetter("is_dev"), self)

    def devs(self, invert=False):
        return list(self.iterdevs(invert=invert))

    def iterfifos(self, invert=False):
        if invert:
            return (x for x in self if not x.is_fifo)
        return filter(attrgetter("is_fifo"), self)

    def fifos(self, invert=False):
        return list(self.iterfifos(invert=invert))

    # Stamp a docstring onto every iter<kind>s/<kind>s pair by substituting
    # the kind's class name into the iterfiles/files docstrings.
    for k in ("file", "dir", "symlink", "dev", "fifo"):
        locals()[f"iter{k}s"].__doc__ = iterfiles.__doc__.replace(
            "fsFile", f"fs{k.capitalize()}"
        )
        locals()[f"{k}s"].__doc__ = files.__doc__.replace(
            "fsFile", f"fs{k.capitalize()}"
        )
    del k

    def inode_map(self):
        """Group regular-file entries by their ``(dev, inode)`` pair.

        Entries where either value is None are skipped.

        :return: ``defaultdict(list)`` mapping ``(dev, inode)`` to the entries
            sharing it.
        """
        d = defaultdict(list)
        for obj in self.iterfiles():
            key = (obj.dev, obj.inode)
            if None in key:
                continue
            d[key].append(obj)
        return d

    def clone(self, empty=False):
        """Return a mutable instance of the same class.

        :param empty: if True the clone holds no entries, else it holds the
            same entries as this set.
        """
        if empty:
            return self.__class__([], mutable=True)
        return self.__class__(iter(self._dict.values()), mutable=True)

    def insert_offset(self, offset):
        """Return a clone with ``offset`` prepended to every entry's location."""
        cset = self.clone(empty=True)
        cset.update(offset_rewriter(offset, self))
        return cset

    def change_offset(self, old_offset, new_offset):
        """Return a clone with entries re-rooted from ``old_offset`` to ``new_offset``."""
        cset = self.clone(empty=True)
        cset.update(change_offset_rewriter(old_offset, new_offset, self))
        return cset

    def iter_child_nodes(self, start_point):
        """Yield a stream of nodes that are fs entries contained within the
        passed in start point.

        :param start_point: fs filepath all yielded nodes must be within.
            An fs object is also accepted: its ``target`` is used if it is a
            symlink, else its ``location``.
        """
        if isinstance(start_point, fs.fsBase):
            if start_point.is_sym:
                start_point = start_point.target
            else:
                start_point = start_point.location
        # Prefix ends in exactly one separator so "/usr" does not match
        # "/usrlocal"; it does not depend on the entry, so build it once.
        cn_path = normpath(start_point).rstrip(os.path.sep) + os.path.sep
        for x in self:
            # what about sym targets?
            if x.location.startswith(cn_path):
                yield x

    def child_nodes(self, start_point):
        """Return a clone of this instance, w/ just the child nodes returned
        from `iter_child_nodes`.

        :param start_point: fs filepath all yielded nodes must be within.
        """
        obj = self.clone(empty=True)
        obj.update(self.iter_child_nodes(start_point))
        return obj

    def map_directory_structure(self, other, add_conflicting_sym=True):
        """Resolve the directory structure between this instance, and another contentset,
        collapsing syms of self into directories of other.

        Concretely: wherever this set has a directory at a location that
        ``other`` records as a symlink, the directory entry is dropped and
        everything beneath it is re-rooted under the symlink's
        ``resolved_target``.  This repeats until no such location remains.

        :param other: contents set to compare against.
        :param add_conflicting_sym: if True, for each conflict also add the
            entry ``other`` holds at the symlink's resolved target.
        :return: a new mutable clone; this set is not modified.
        """
        # symlink object from `other` -> where it resolves to
        conflicts_d = {x: x.resolved_target for x in other.iterlinks()}
        # rebuild the targets first; sorted due to the fact that we want to
        # rewrite each node (resolving down the filepath chain)
        # intersection() yields the symlink objects (from conflicts_d) whose
        # location is a directory in this set.
        conflicts = sorted(contentsSet(self.iterdirs()).intersection(conflicts_d))
        obj = self.clone()
        while conflicts:
            for conflict in conflicts:
                # punt the conflict first, since we don't want it getting rewritten
                obj.remove(conflict)
                subset = obj.child_nodes(conflict.location)
                obj.difference_update(subset)
                subset = subset.change_offset(
                    conflict.location, conflict.resolved_target
                )
                obj.update(subset)
                if add_conflicting_sym:
                    obj.add(other[conflicts_d[conflict]])

            # rebuild the targets first; sorted due to the fact that we want to
            # rewrite each node (resolving down the filepath chain)
            # Re-rooting may have created new collisions, so recompute.
            conflicts = sorted(contentsSet(obj.iterdirs()).intersection(conflicts_d))
        return obj

    def add_missing_directories(self, mode=0o775, uid=0, gid=0, mtime=None):
        """Ensure that a directory node exists for each path; add if missing.

        :param mode: mode given to each created directory entry.
        :param uid: uid given to each created directory entry.
        :param gid: gid given to each created directory entry.
        :param mtime: mtime given to each created directory entry; defaults
            to the current time.
        """
        missing = {d for d in (x.dirname for x in self) if d not in self}
        if mtime is None:
            mtime = time.time()
        # have to go recursive since many directories may be missing.
        # Iterate over a snapshot because `missing` grows inside the loop.
        for x in list(missing):
            target = os.path.dirname(x)
            while target not in missing and target not in self:
                missing.add(target)
                target = os.path.dirname(target)
        # The walk above stops at "/" by adding it; no entry is created for it.
        missing.discard("/")
        self.update(
            fs.fsDir(location=x, mode=mode, uid=uid, gid=gid, mtime=mtime)
            for x in missing
        )
class OrderedContentsSet(contentsSet):
    """A :class:`contentsSet` whose backing store is an ``OrderedDict``.

    Unlike the base class, instances default to immutable once constructed.
    """

    def __init__(self, initial=None, mutable=False, add_missing_directories=False):
        """
        :param initial: fs objs to populate the set with, if any
        :param mutable: controls if it modifiable after initialization
        :param add_missing_directories: if True, call
            ``add_missing_directories()`` once ``initial`` has been loaded
        """
        # Run the base initializer as mutable and with no contents; the
        # requested mutability is applied at the very end.
        contentsSet.__init__(self, mutable=True)
        # Swap in the ordered backing store before anything is inserted.
        self._dict = OrderedDict()

        if initial:
            self.update(initial)

        # some sources are a bit stupid, tarballs for example.
        # add missing directories if requested
        if add_missing_directories:
            self.add_missing_directories()

        self.mutable = mutable