Source code for pkgcore.fs.livefs

"""
interaction with the livefs: generating fs objects to represent the livefs.
"""

import collections
import errno
import os
from stat import S_IMODE, S_ISDIR, S_ISFIFO, S_ISLNK, S_ISREG

from snakeoil.chksum import get_handlers
from snakeoil.data_source import local_source
from snakeoil.mappings import LazyValDict
from snakeoil.osutils import listdir, normpath, pjoin

from .contents import contentsSet
from .fs import fsBase, fsDev, fsDir, fsFifo, fsFile, fsSymlink, get_major_minor

__all__ = ["gen_obj", "scan", "iter_scan", "sorted_scan"]


def gen_chksums(handlers, location):
    """
    build a lazily evaluated checksum mapping for a location.

    :param handlers: mapping of checksum name -> callable; each callable is
        invoked with ``location`` as its sole argument
    :param location: value handed to whichever handler gets invoked
    :return: a :obj:`LazyValDict` keyed by ``handlers`` whose value function
        is ``handlers[key](location)``
    """
    return LazyValDict(handlers, lambda chf: handlers[chf](location))


def gen_obj(
    path,
    stat=None,
    chksum_handlers=None,
    real_location=None,
    stat_func=os.lstat,
    **overrides,
):
    """
    given a fs path, and an optional stat, create an appropriate fs obj.

    :param path: location the resulting object is created with
    :param stat: stat object to reuse if available; when None,
        ``stat_func`` is invoked on the real location
    :param chksum_handlers: if not None, passed as ``chf_types`` when the
        result is a regular file
    :param real_location: real path to the object if path is the desired
        location, rather than the existent location; defaults to ``path``
    :param stat_func: callable used to stat the real location. If it is not
        ``os.lstat`` and fails with ENOENT, ``os.lstat`` is tried instead.
    :param overrides: extra keyword arguments merged over the attributes
        derived from the stat result
    :raise EnvironmentError: if the stat call fails and no fallback applies
    :return: :obj:`pkgcore.fs.fs.fsBase` derivative
    """
    target_path = path if real_location is None else real_location

    st = stat
    if st is None:
        try:
            st = stat_func(target_path)
        except EnvironmentError as exc:
            # Only a missing target seen through a non-lstat stat function
            # gets a second chance via lstat; everything else propagates.
            retry_with_lstat = stat_func != os.lstat and exc.errno == errno.ENOENT
            if not retry_with_lstat:
                raise
            st = os.lstat(target_path)

    mode = st.st_mode
    attrs = dict(
        mtime=st.st_mtime,
        mode=S_IMODE(mode),
        uid=st.st_uid,
        gid=st.st_gid,
    )

    if S_ISREG(mode):
        attrs.update(
            size=st.st_size,
            data=local_source(target_path),
            dev=st.st_dev,
            inode=st.st_ino,
        )
        if chksum_handlers is not None:
            attrs["chf_types"] = chksum_handlers
        attrs.update(overrides)
        return fsFile(path, **attrs)

    attrs.update(overrides)

    if S_ISDIR(mode):
        return fsDir(path, **attrs)
    if S_ISLNK(mode):
        attrs["target"] = os.readlink(target_path)
        return fsSymlink(path, **attrs)
    if S_ISFIFO(mode):
        return fsFifo(path, **attrs)

    # Anything left is handed to fsDev; these keys are set after the
    # overrides were merged, so they take precedence, and the mode is the
    # full st_mode rather than just the permission bits.
    major, minor = get_major_minor(st)
    attrs.update(minor=minor, major=major, mode=mode)
    return fsDev(path, **attrs)
# hmm. this code is roughly 25x slower than find.
# make it less slow somehow. the obj instantiation is a bit of a
# killer I'm afraid; without obj, looking at 2.3ms roughly best of 3
# 100 iterations, obj instantiation, 58ms.
# also, os.path.join is rather slow.
# in this case, we know it's always pegging one more dir on, so it's
# fine doing it this way (especially since we're relying on
# os.path.sep, not '/' :P)


def _internal_iter_scan(
    path, chksum_handlers, stat_func=os.lstat, hidden=True, backup=True
):
    """
    breadth-first walk of ``path``, yielding an fs obj per entry.

    The object for ``path`` itself is yielded first; if it is not a
    directory the walk ends there.

    :param path: location to scan; normalized via ``normpath``
    :param chksum_handlers: forwarded to :py:func:`gen_obj`
    :param stat_func: stat callable forwarded to :py:func:`gen_obj`
    :param hidden: if False, entries whose name starts with "." are skipped
    :param backup: if False, entries whose name ends with "~" are skipped
    :return: generator of the objects produced by :py:func:`gen_obj`
    """
    dirs = collections.deque([normpath(path)])
    obj = gen_obj(dirs[0], chksum_handlers=chksum_handlers, stat_func=stat_func)
    yield obj
    if not obj.is_dir:
        return
    while dirs:
        base = dirs.popleft()
        for name in listdir(base):
            if not hidden and name.startswith("."):
                continue
            if not backup and name.endswith("~"):
                continue
            entry_path = pjoin(base, name)
            obj = gen_obj(
                entry_path,
                chksum_handlers=chksum_handlers,
                real_location=entry_path,
                stat_func=stat_func,
            )
            yield obj
            if obj.is_dir:
                dirs.append(entry_path)


def _internal_offset_iter_scan(
    path, chksum_handlers, offset, stat_func=os.lstat, hidden=True, backup=True
):
    """
    breadth-first walk of ``path``, yielding objs with ``offset`` stripped.

    Every object is statted at its real on-disk location while its reported
    location has the ``offset`` prefix removed. When ``path`` equals
    ``offset`` no object is yielded for the root itself.

    :param path: location to scan; normalized via ``normpath``
    :param chksum_handlers: forwarded to :py:func:`gen_obj`
    :param offset: prefix removed from each yielded location; ``path`` is
        assumed to start with it (this is not verified)
    :param stat_func: stat callable forwarded to :py:func:`gen_obj`
    :param hidden: if False, entries whose name starts with "." are skipped
    :param backup: if False, entries whose name ends with "~" are skipped
    :return: generator of the objects produced by :py:func:`gen_obj`
    """
    offset = normpath(offset)
    path = normpath(path)
    root = path[len(offset) :]
    dirs = collections.deque([root])
    if root:
        # The root must be statted where it really lives (path), not at the
        # offset-stripped location it is reported under.
        obj = gen_obj(
            root,
            chksum_handlers=chksum_handlers,
            real_location=path,
            stat_func=stat_func,
        )
        yield obj
        if not obj.is_dir:
            # Nothing to descend into; mirrors _internal_iter_scan.
            return

    sep = os.path.sep
    while dirs:
        base = dirs.popleft()
        real_base = pjoin(offset, base.lstrip(sep))
        base = base.rstrip(sep) + sep
        for name in listdir(real_base):
            if not hidden and name.startswith("."):
                continue
            if not backup and name.endswith("~"):
                continue
            entry_path = pjoin(base, name)
            obj = gen_obj(
                entry_path,
                chksum_handlers=chksum_handlers,
                real_location=pjoin(real_base, name),
                # honour the caller's stat function instead of always lstat
                stat_func=stat_func,
            )
            yield obj
            if obj.is_dir:
                dirs.append(entry_path)
def iter_scan(
    path,
    offset=None,
    follow_symlinks=False,
    chksum_types=None,
    hidden=True,
    backup=True,
):
    """
    Recursively scan a path.

    By default entries are inspected with ``os.lstat``, so a symlink
    pointing at a dir is not followed; an obj representing said symlink is
    yielded instead.

    :return: an iterator of :obj:`pkgcore.fs.fs.fsBase` objects.

    :param path: str path of what directory to scan in the livefs
    :type path: str
    :param offset: if not None, prefix to strip from each objects location.
        if offset is /tmp, /tmp/blah becomes /blah
    :type offset: str or None
    :param follow_symlinks: if True, ``os.stat`` rather than ``os.lstat`` is
        handed to the scanner as its stat function
    :type follow_symlinks: bool
    :param chksum_types: passed to :py:func:`snakeoil.chksum.get_handlers`
        to obtain the checksum handlers given to each generated object
    :param hidden: if False, skip entries whose name starts with "."
    :type hidden: bool
    :param backup: if False, skip entries whose name ends with "~"
    :type backup: bool
    """
    chksum_handlers = get_handlers(chksum_types)
    stat_func = os.stat if follow_symlinks else os.lstat
    if offset is None:
        return _internal_iter_scan(
            path, chksum_handlers, stat_func, hidden=hidden, backup=backup
        )
    return _internal_offset_iter_scan(
        path, chksum_handlers, offset, stat_func, hidden=hidden, backup=backup
    )
def sorted_scan(path, nonexistent=False, *args, **kwargs):
    """
    Recursively scan a path for regular files.

    :param path: path to directory to scan in the livefs
    :type path: str
    :param nonexistent: controls the result when the scan fails with ENOENT:
        a list holding just the given path if True, else an empty list
    :type nonexistent: bool
    :return: a sorted list of the locations of regular files found under
        the given path
    :raise EnvironmentError: on any scan error other than ENOENT
        (e.g. permission errors)

    Remaining positional and keyword args are passed to
    :py:func:`iter_scan`; see it for valid args (including hidden and
    backup file filtering).
    """
    try:
        return sorted(
            entry.location
            for entry in iter_scan(path, *args, **kwargs)
            if entry.is_reg
        )
    except EnvironmentError as exc:
        if exc.errno != errno.ENOENT:
            raise
    # Only reached when the scan hit ENOENT.
    return [path] if nonexistent else []
def scan(*a, **kw):
    """Collect the results of iter_scan(*a, **kw) into a contentsSet.

    :param mutable: keyword only; removed from ``kw`` and passed to
        :obj:`contentsSet` as its ``mutable`` argument. Defaults to True.
    :return: :obj:`contentsSet` built from the scan results

    Look at :py:func:`iter_scan` for the remaining valid args.
    """
    is_mutable = kw.pop("mutable", True)
    entries = iter_scan(*a, **kw)
    return contentsSet(entries, mutable=is_mutable)
class _realpath_dir:
    """
    callable mapping a location to one whose directory part is realpath'd.

    Only the directory portion is resolved; the final path component is
    appended unchanged. Resolved directories are cached per instance.
    """

    _realpath_func = staticmethod(os.path.realpath)

    def __init__(self):
        # directory name -> resolved directory name
        self._cache = {}

    def __call__(self, location):
        parent, leaf = location.rsplit("/", 1)
        if not parent:
            # entry sits directly under the root; nothing to resolve
            return location
        resolved = self._cache.get(parent)
        if resolved is None:
            resolved = self._realpath_func(parent)
            self._cache[parent] = resolved
        return pjoin(resolved, leaf)


def intersect(cset, realpath=False):
    """Generate the intersect of a cset and the livefs.

    :param cset: iterable of objects exposing a ``location`` attribute
    :param realpath: if True, the directory part of each location is
        resolved via :obj:`_realpath_dir` before the livefs is consulted
    :return: generator of :py:func:`gen_obj` results for the locations that
        exist; locations failing with ENOENT or ENOTDIR are skipped
    :raise OSError: for any other errno
    """
    make_obj = gen_obj
    if realpath:
        resolve = _realpath_dir()
    else:

        def resolve(location):
            return location

    skippable = (errno.ENOENT, errno.ENOTDIR)
    for entry in cset:
        try:
            yield make_obj(resolve(entry.location))
        except OSError as exc:
            if exc.errno not in skippable:
                raise


def recursively_fill_syms(cset, limiter=fsBase):
    """
    add the livefs targets of a cset's symlinks to that cset, in place.

    Starting from ``cset.links()``, each symlink's ``resolved_target`` that
    is not already in the cset is looked up via :py:func:`gen_obj`. Targets
    that are themselves symlinks are added and processed in a later pass;
    other targets are added only if they are instances of ``limiter``.
    Targets missing from the livefs (ENOENT) are ignored.

    :param cset: contents set to extend; modified in place
    :param limiter: class (or tuple of classes) a non-symlink target must be
        an instance of to be added
    :raise EnvironmentError: for lookup failures other than ENOENT
    """
    pending = [cset.links()]
    while pending:
        batch = pending.pop()
        discovered = []
        for sym in batch:
            target = sym.resolved_target
            if target in cset:
                continue
            try:
                obj = gen_obj(target)
            except EnvironmentError as exc:
                if exc.errno != errno.ENOENT:
                    raise
                continue
            if obj.is_sym:
                cset.add(obj)
                discovered.append(obj)
            elif isinstance(obj, limiter):
                cset.add(obj)
        if discovered:
            pending.append(discovered)