Source code for pkgcore.vdb.contents

__all__ = ("LookupFsDev", "ContentsFile")

import os
import stat

from snakeoil import data_source
from snakeoil.chksum import get_handler
from snakeoil.fileutils import AtomicWriteFile, readlines_utf8

from .. import os_data
from ..fs import fs
from ..fs.contents import contentsSet


[docs] class LookupFsDev(fs.fsDev): __slots__ = () def __init__(self, path, **kwds): if any(x not in kwds for x in ("major", "minor", "mode")): try: st = os.lstat(path) except FileNotFoundError: st = None if st is None or any( f(st.st_mode) for f in (stat.S_ISREG, stat.S_ISDIR, stat.S_ISFIFO) ): kwds["strict"] = True else: major, minor = fs.get_major_minor(st) kwds["major"] = major kwds["minor"] = minor kwds["mode"] = st.st_mode super().__init__(path, **kwds)
[docs] class ContentsFile(contentsSet): """class wrapping a contents file""" def __init__(self, source, mutable=False, create=False): if not isinstance(source, (data_source.base, str)): raise TypeError("source must be either data_source, or a filepath") super().__init__(mutable=True) self._source = source if not create: self.update(self._iter_contents()) self.mutable = mutable
[docs] def clone(self, empty=False): # create is used to block it from reading. cset = self.__class__(self._source, mutable=True, create=True) if not empty: cset.update(self) return cset
[docs] def add(self, obj): if obj.is_reg: # strict checks if obj.chksums is None or "md5" not in obj.chksums: raise TypeError("fsFile objects need to be strict") contentsSet.add(self, obj)
def _get_fd(self, write=False): if isinstance(self._source, str): if write: return AtomicWriteFile( self._source, uid=os_data.root_uid, gid=os_data.root_gid, perms=0o644, ) return readlines_utf8(self._source, True) fobj = self._source.text_fileobj(writable=write) if write: fobj.seek(0, 0) fobj.truncate(0) return fobj
[docs] def flush(self): return self._write()
def _iter_contents(self): self.clear() for line in self._get_fd(): if not line: continue s = line.split(" ") if s[0] in ("dir", "dev", "fif"): path = " ".join(s[1:]) if s[0] == "dir": obj = fs.fsDir(path, strict=False) elif s[0] == "dev": obj = LookupFsDev(path, strict=False) else: obj = fs.fsFifo(path, strict=False) elif s[0] == "obj": path = " ".join(s[1:-2]) obj = fs.fsFile( path, chksums={"md5": int(s[-2], 16)}, mtime=int(s[-1]), strict=False, ) elif s[0] == "sym": try: p = s.index("->") obj = fs.fsLink( " ".join(s[1:p]), " ".join(s[p + 1 : -1]), mtime=int(s[-1]), strict=False, ) except ValueError: # XXX throw a corruption error raise else: raise Exception(f"unknown entry type {line!r}") yield obj def _write(self): md5_handler = get_handler("md5") outfile = None try: outfile = self._get_fd(True) for obj in sorted(self): if obj.is_reg: s = " ".join( ( "obj", obj.location, md5_handler.long2str(obj.chksums["md5"]), str(int(obj.mtime)), ) ) elif obj.is_sym: s = " ".join( ("sym", obj.location, "->", obj.target, str(int(obj.mtime))) ) elif obj.is_dir: s = "dir " + obj.location elif obj.is_dev: s = "dev " + obj.location elif obj.is_fifo: s = "fif " + obj.location else: raise Exception(f"unknown type {type(obj)}: {obj}") outfile.write(s + "\n") outfile.close() finally: # if atomic, it forces the update to be wiped. del outfile