"""
filesystem entry abstractions
"""
import fnmatch
import stat
from os.path import abspath, basename, dirname, realpath
from os.path import sep as path_seperator
from snakeoil import klass
from snakeoil.chksum import get_chksums, get_handlers
from snakeoil.compatibility import cmp
from snakeoil.currying import post_curry, pretty_docs
from snakeoil.data_source import local_source
from snakeoil.mappings import LazyFullValLoadDict
from snakeoil.osutils import normpath, pjoin
# goofy set of classes representating the fs objects pkgcore knows of.
__all__ = ["fsFile", "fsDir", "fsSymlink", "fsDev", "fsFifo"]
__all__.extend(f"is{x}" for x in ("dir", "reg", "sym", "fifo", "dev", "fs_obj"))
# following are used to generate appropriate __init__, wiped from the
# namespace at the end of the module
_fs_doc = {
"mode": """:keyword mode: int, the mode of this entry. """
"""required if strict is set""",
"mtime": """:keyword mtime: long, the mtime of this entry. """
"""required if strict is set""",
"uid": """:keyword uid: int, the uid of this entry. """
"""required if strict is set""",
"gid": """:keyword gid: int, the gid of this entry. """
"""required if strict is set""",
}
def gen_doc_additions(init, slots):
if init.__doc__ is None:
d = """
:param location: location (real or intended) for this entry
:param strict: is this fully representative of the entry, or only partially
:raise KeyError: if strict is enabled, and not all args are passed in
""".split(
"\n"
)
else:
d = init.__doc__.split("\n")
init.__doc__ = "\n".join(k.lstrip() for k in d) + "\n".join(
_fs_doc[k] for k in _fs_doc if k in slots
)
class fsBase:
"""base class, all extensions must derive from this class"""
__slots__ = ("location", "mtime", "mode", "uid", "gid")
__attrs__ = __slots__
__default_attrs__ = {}
locals().update(
(x.replace("is", "is_"), False)
for x in __all__
if x.startswith("is") and x.islower() and not x.endswith("fs_obj")
)
klass.inject_richcmp_methods_from_cmp(locals())
klass.inject_immutable_instance(locals())
def __init__(self, location, strict=True, **d):
d["location"] = normpath(location)
s = object.__setattr__
if strict:
for k in self.__attrs__:
s(self, k, d[k])
else:
for k, v in d.items():
s(self, k, v)
gen_doc_additions(__init__, __attrs__)
def change_attributes(self, **kwds):
d = {x: getattr(self, x) for x in self.__attrs__ if hasattr(self, x)}
d.update(kwds)
# split location out
location = d.pop("location")
if not location.startswith(path_seperator):
location = abspath(location)
d["strict"] = False
return self.__class__(location, **d)
def __getattr__(self, attr):
# we would only get called if it doesn't exist.
if attr not in self.__attrs__:
raise AttributeError(self, attr)
obj = self.__default_attrs__.get(attr)
if not callable(obj):
return obj
return obj(self)
def __hash__(self):
return hash(self.location)
def __eq__(self, other):
if not isinstance(other, self.__class__):
return False
return self.location == other.location
def __ne__(self, other):
return not self == other
def realpath(self, cache=None):
"""calculate the abspath/canonicalized path for this entry, returning
a new instance if the path differs.
:keyword cache: Either None (no cache), or a data object of path->
resolved. Currently unused, but left in for forwards compatibility
"""
new_path = realpath(self.location)
if new_path == self.location:
return self
return self.change_attributes(location=new_path)
@property
def basename(self):
return basename(self.location)
@property
def dirname(self):
return dirname(self.location)
def fnmatch(self, pattern):
return fnmatch.fnmatch(self.location, pattern)
def __cmp__(self, other):
return cmp(self.location, other.location)
def __str__(self):
return self.location
class _LazyChksums(LazyFullValLoadDict):
__slots__ = ()
[docs]
class fsFile(fsBase):
"""file class"""
__slots__ = ("chksums", "data", "dev", "inode")
__attrs__ = fsBase.__attrs__ + __slots__
__default_attrs__ = {"mtime": 0, "dev": None, "inode": None}
is_reg = True
def __init__(self, location, chksums=None, data=None, **kwds):
"""
:param chksums: dict of checksums, key chksum_type: val hash val.
See :obj:`snakeoil.chksum`.
"""
assert "data_source" not in kwds
if data is None:
data = local_source(location)
kwds["data"] = data
if chksums is None:
# this can be problematic offhand if the file is modified
# but chksum not triggered
chf_types = kwds.pop("chf_types", None)
if chf_types is None:
chf_types = tuple(get_handlers())
chksums = _LazyChksums(chf_types, self._chksum_callback)
kwds["chksums"] = chksums
fsBase.__init__(self, location, **kwds)
gen_doc_additions(__init__, __slots__)
def __repr__(self):
return f"file:{self.location}"
data_source = klass.alias_attr("data")
def _chksum_callback(self, chfs):
return list(zip(chfs, get_chksums(self.data, *chfs)))
[docs]
def change_attributes(self, **kwds):
if "data" in kwds and (
"chksums" not in kwds and isinstance(self.chksums, _LazyChksums)
):
kwds["chksums"] = None
return fsBase.change_attributes(self, **kwds)
def _can_be_hardlinked(self, other):
if not other.is_reg:
return False
if None in (self.inode, self.dev):
return False
for attr in ("dev", "inode", "uid", "gid", "mode", "mtime"):
if getattr(self, attr) != getattr(other, attr):
return False
return True
[docs]
class fsDir(fsBase):
"""dir class"""
__slots__ = ()
is_dir = True
def __repr__(self):
return f"dir:{self.location}"
class fsLink(fsBase):
"""symlink class"""
__slots__ = ("target",)
__attrs__ = fsBase.__attrs__ + __slots__
is_sym = True
def __init__(self, location, target, **kwargs):
"""
:param target: string, filepath of the symlinks target
"""
kwargs["target"] = target
fsBase.__init__(self, location, **kwargs)
gen_doc_additions(__init__, __slots__)
def change_attributes(self, **kwds):
d = {x: getattr(self, x) for x in self.__attrs__ if hasattr(self, x)}
d.update(kwds)
# split location out
location = d.pop("location")
if not location.startswith(path_seperator):
location = abspath(location)
target = d.pop("target")
d["strict"] = False
return self.__class__(location, target, **d)
@property
def resolved_target(self):
if self.target.startswith("/"):
return self.target
return normpath(pjoin(self.location, "../", self.target))
def __cmp__(self, other):
c = cmp(self.location, other.location)
if c:
return c
if isinstance(other, self.__class__):
return cmp(self.target, other.target)
return 0
def __str__(self):
return f"{self.location} -> {self.target}"
def __repr__(self):
return f"symlink:{self.location}->{self.target}"
fsSymlink = fsLink
[docs]
class fsDev(fsBase):
"""dev class (char/block objects)"""
__slots__ = ("major", "minor")
__attrs__ = fsBase.__attrs__ + __slots__
__default_attrs__ = {"major": -1, "minor": -1}
is_dev = True
def __init__(self, path, major=-1, minor=-1, **kwds):
if kwds.get("strict", True):
if major == -1 or minor == -1:
raise TypeError("major/minor must be specified and positive ints")
if not stat.S_IFMT(kwds["mode"]):
raise TypeError(
"mode %o: must specify the device type (got %o)"
% (kwds["mode"], stat.S_IFMT(kwds["mode"]))
)
kwds["major"] = major
kwds["minor"] = minor
else:
if major != -1:
major = int(major)
if major < 0:
raise TypeError("major/minor must be specified and positive ints")
kwds["major"] = major
if minor != -1:
minor = int(minor)
if minor < 0:
raise TypeError("major/minor must be specified and positive ints")
kwds["minor"] = minor
fsBase.__init__(self, path, **kwds)
def __repr__(self):
return f"device:{self.location}"
def get_major_minor(stat_inst):
"""get major/minor from a stat instance
:return: major,minor tuple of ints
"""
return (stat_inst.st_rdev >> 8) & 0xFF, stat_inst.st_rdev & 0xFF
[docs]
class fsFifo(fsBase):
"""fifo class (socket objects)"""
__slots__ = ()
is_fifo = True
def __repr__(self):
return f"fifo:{self.location}"
def mk_check(name):
return pretty_docs(
post_curry(getattr, "is_" + name, False),
extradocs=("return True if obj is an instance of :obj:`%s`, else False" % name),
name=("is" + name),
)
isdir = mk_check("dir")
isreg = mk_check("reg")
issym = mk_check("sym")
isfifo = mk_check("fifo")
isdev = mk_check("dev")
isfs_obj = pretty_docs(
post_curry(isinstance, fsBase),
name="isfs_obj",
extradocs="return True if obj is an fsBase derived object",
)
del gen_doc_additions, mk_check