"""
functionality related to downloading files
"""
__all__ = ("fetchable", "mirror", "default_mirror", "uri_list")
from itertools import zip_longest
from snakeoil.klass import generic_equality
[docs]
class fetchable(metaclass=generic_equality):
    """Class representing URI sources for a file and its checksum information."""

    __slots__ = ("filename", "uri", "chksums")
    # NOTE(review): generic_equality comes from snakeoil; presumably it derives
    # equality from the attributes listed here -- confirm against snakeoil.
    __attr_comparison__ = __slots__

    def __init__(self, filename, uri=None, chksums=None):
        """
        :param filename: filename...
        :param uri: either None (no uri),
            or a sequence of uri where the file is available
        :param chksums: either None (no chksum data),
            or a dict of chksum_type -> value for this file
        """
        # None is normalised to an empty container so the attributes can
        # always be iterated.
        self.uri = uri if uri is not None else ()
        self.chksums = chksums if chksums is not None else {}
        self.filename = filename

    def __str__(self):
        # Joining the mapping itself lists only its keys (the chksum types).
        chksums = ", ".join(self.chksums)
        return f"({self.filename!r}, {self.uri!r}, {chksums})"

    def __repr__(self):
        return "<%s filename=%r uri=%r chksums=%r @%#8x>" % (
            self.__class__.__name__,
            self.filename,
            self.uri,
            self.chksums,
            id(self),
        )

    def __lt__(self, other):
        """Order fetchables by filename only."""
        return self.filename < other.filename

    def __hash__(self):
        # chksums (a dict by default) is left out of the hash; uri must
        # therefore be hashable for the instance to be hashable.
        return hash((self.filename, self.uri))

    @property
    def upstream(self):
        """Return a new fetchable with all mirror URIs removed.

        When ``uri`` provides ``remove_mirrors()`` (as ``uri_list`` does) that
        method is used.  Otherwise ``uri`` is treated as a plain sequence --
        this includes the default empty tuple -- and bare ``mirror`` instances
        are filtered out of it, instead of failing with ``AttributeError``.

        :return: new instance of the same class sharing ``filename`` and
            ``chksums`` with this one
        """
        strip_mirrors = getattr(self.uri, "remove_mirrors", None)
        if strip_mirrors is None:
            # Same filter uri_list.remove_mirrors() applies to its entries.
            upstream_uris = tuple(x for x in self.uri if not isinstance(x, mirror))
        else:
            upstream_uris = strip_mirrors()
        return self.__class__(
            self.filename, uri=upstream_uris, chksums=self.chksums
        )
[docs]
class mirror(metaclass=generic_equality):
    """URI source representing a mirror tier.

    Instances act as a read-only sequence over the hosts of the tier:
    iteration, ``len()``, truthiness and indexing all delegate to the
    ``mirrors`` tuple.
    """

    # NOTE(review): generic_equality comes from snakeoil; presumably it derives
    # equality from the attributes listed here -- confirm against snakeoil.
    __attr_comparison__ = ("mirror_name", "mirrors")
    __slots__ = ("mirrors", "mirror_name")

    def __init__(self, mirrors, mirror_name):
        """
        :param mirrors: list of hosts that comprise this mirror tier
        :param mirror_name: name of the mirror tier
        """
        # Always store a tuple, whatever iterable was handed in.
        if not isinstance(mirrors, tuple):
            mirrors = tuple(mirrors)
        self.mirrors = mirrors
        self.mirror_name = mirror_name

    def __iter__(self):
        return iter(self.mirrors)

    def __str__(self):
        return f"mirror://{self.mirror_name}"

    def __len__(self):
        return len(self.mirrors)

    def __bool__(self):
        return bool(self.mirrors)

    def __getitem__(self, idx):
        return self.mirrors[idx]

    def __repr__(self):
        # Use the bare class name (as fetchable.__repr__ does) rather than the
        # class object, whose own repr would nest a second "<class '...'>".
        return f"<{self.__class__.__name__} mirror tier={self.mirror_name!r}>"
class unknown_mirror(mirror):
    """Unknown mirror tier: carries a tier name but no hosts."""

    __slots__ = ()

    def __init__(self, mirror_name):
        """
        :param mirror_name: name of the mirror tier
        """
        no_hosts = ()
        super().__init__(no_hosts, mirror_name)
[docs]
class default_mirror(mirror):
    """Subclass of :class:`mirror` that adds no state or behaviour of its own.

    ``uri_list.visit_mirrors`` uses ``isinstance`` checks against this class
    to tell these mirror tiers apart from other :class:`mirror` instances.
    """

    __slots__ = ()
[docs]
class uri_list:
    """Ordered collection of URI sources for a single file.

    Entries of the internal source list come in three shapes:

    * plain URI strings (see :meth:`add_uri`);
    * ``(mirror, sub_uri)`` tuples (see :meth:`add_mirror` with ``sub_uri``);
    * bare mirror objects (see :meth:`add_mirror` without ``sub_uri``), whose
      hosts are combined with ``filename`` on iteration.

    Iterating the object expands every entry into concrete URI strings.
    """

    __slots__ = ("_uri_source", "filename", "__weakref__")

    def __init__(self, filename):
        """
        :param filename: name of the file the URIs point at; appended to each
            host of a bare mirror entry during iteration
        """
        self._uri_source = []
        self.filename = filename

    def add_mirror(self, mirror_inst, sub_uri=None):
        """Append a mirror tier as a URI source.

        :param mirror_inst: :class:`mirror` instance to add
        :param sub_uri: optional path below each mirror host; leading slashes
            are stripped.  When omitted the bare mirror is stored.
        :raises TypeError: if ``mirror_inst`` is not a :class:`mirror`
        """
        if not isinstance(mirror_inst, mirror):
            raise TypeError("mirror must be a pkgcore.fetch.mirror instance")
        if sub_uri is not None:
            self._uri_source.append((mirror_inst, sub_uri.lstrip("/")))
        else:
            self._uri_source.append(mirror_inst)

    def remove_mirrors(self):
        """Return a new URI source list after dropping all mirror-based URIs.

        Only bare :class:`mirror` entries are dropped; ``(mirror, sub_uri)``
        tuples and plain URIs are kept.  The returned list stores its sources
        as a tuple, exactly as if :meth:`finalize` had been called on it.
        """
        stripped = self.__class__(self.filename)
        stripped._uri_source = tuple(
            x for x in self._uri_source if not isinstance(x, mirror)
        )
        return stripped

    def add_uri(self, uri):
        """Append a single URI to the source list."""
        self._uri_source.append(uri)

    def finalize(self):
        """Freeze the source list by converting it to a tuple."""
        self._uri_source = tuple(self._uri_source)

    def __iter__(self):
        """Yield concrete URI strings in source order.

        Strings are yielded unchanged.  A run of consecutive
        ``(mirror, sub_uri)`` tuples is expanded per tuple and the resulting
        groups are interleaved (first URI of every group, then the second of
        every group, ...).  Any other entry is iterated as a collection of
        base URIs, each joined with ``filename``.
        """
        fname = self.filename
        i = 0
        while i < len(self._uri_source):
            entry = self._uri_source[i]
            if isinstance(entry, str):
                yield entry
            elif isinstance(entry, tuple):
                # TODO: rewrite mirror handling to do this more transparently
                # collect all mirrors at the same priority
                mirrored = []
                while True:
                    m, sub_uri = entry
                    # Build the URIs eagerly: a generator expression would
                    # look up ``sub_uri`` only when consumed below, by which
                    # time it holds the value of the *last* tuple in the run.
                    mirrored.append(
                        [f"{base_uri.rstrip('/')}/{sub_uri}" for base_uri in m]
                    )
                    try:
                        entry = self._uri_source[i + 1]
                    except IndexError:
                        break
                    if not isinstance(entry, tuple):
                        break
                    i += 1
                # iterate between different mirror groups; zip_longest pads
                # shorter groups with None, which filter() drops again
                for mirrored_uris in zip_longest(*mirrored):
                    yield from filter(None, mirrored_uris)
            else:
                for base_uri in entry:
                    yield f"{base_uri.rstrip('/')}/{fname}"
            i += 1

    def __str__(self):
        uris = ", ".join(str(x) for x in self._uri_source)
        return f"file: {self.filename}, uri: {uris}"

    def __bool__(self):
        # implemented this way on the off chance an empty sublist is handed in
        for _ in self:
            return True
        return False

    def __len__(self):
        # we do it this way since each item may be a sublist, and to reuse
        # __iter__
        return sum(1 for _ in self)

    def visit_mirrors(self, invert=False, treat_default_as_mirror=True):
        """Yield the raw source entries selected by a mirror predicate.

        The object tested is ``entry[0]`` for tuple entries and the entry
        itself otherwise.  It matches when it is a :class:`mirror` instance
        and whether it is a :class:`default_mirror` equals
        ``treat_default_as_mirror``.

        :param invert: when true, yield the entries that do *not* match
        :param treat_default_as_mirror: when true only ``default_mirror``
            instances match; when false only non-default mirrors match
        """

        # NOTE(review): with treat_default_as_mirror=True, non-default mirrors
        # are not matched -- confirm this is the intended meaning of the flag.
        def is_mirror(item):
            return isinstance(item, mirror) and treat_default_as_mirror == isinstance(
                item, default_mirror
            )

        for item in self._uri_source:
            if isinstance(item, tuple):
                if invert != is_mirror(item[0]):
                    yield item
            elif invert != is_mirror(item):
                yield item