Source code for pkgcore.sync.tar
__all__ = ("tar_syncer",)
import atexit
import os
import shutil
import subprocess
import tempfile
from functools import partial
from . import base
from .http import http_syncer
[docs]
class tar_syncer(http_syncer, base.ExternalSyncer):
binary = "tar"
supported_uris = (
("tar+http://", 5),
("tar+https://", 5),
)
# TODO: support more of the less used file extensions
supported_protocols = ("http://", "https://")
supported_exts = (".tar.gz", ".tar.bz2", ".tar.xz")
[docs]
@classmethod
def parse_uri(cls, raw_uri):
if raw_uri.startswith(("tar+http://", "tar+https://")):
raw_uri = raw_uri[4:]
if raw_uri.startswith(cls.supported_protocols) and raw_uri.endswith(
cls.supported_exts
):
return raw_uri
else:
raise base.UriError(
raw_uri, "unsupported compression format for tarball archive"
)
raise base.UriError(raw_uri, "unsupported URI")
def _pre_download(self):
# create temp file for downloading
self.tarball = tempfile.NamedTemporaryFile()
# make sure temp file is deleted on exit
atexit.register(partial(self.tarball.close))
# determine names of tempdirs for staging
basedir = self.basedir.rstrip(os.path.sep)
repos_dir = os.path.dirname(basedir)
repo_name = os.path.basename(basedir)
self.tempdir = os.path.join(repos_dir, f".{repo_name}.update")
self.tempdir_old = os.path.join(repos_dir, f".{repo_name}.old")
# remove tempdirs on exit
atexit.register(partial(shutil.rmtree, self.tempdir, ignore_errors=True))
atexit.register(partial(shutil.rmtree, self.tempdir_old, ignore_errors=True))
return self.tarball.name
def _post_download(self, path):
super()._post_download(path)
# create tempdirs for staging
try:
os.makedirs(self.tempdir)
os.makedirs(self.tempdir_old)
except OSError as e:
raise base.SyncError(f"failed creating repo update dirs: {e}")
exts = {"gz": "gzip", "bz2": "bzip2", "xz": "xz"}
compression = exts[self.uri.rsplit(".", 1)[1]]
# use tar instead of tarfile so we can easily strip leading path components
# TODO: programmatically determine how many components to strip?
cmd = [
"tar",
"--extract",
f"--{compression}",
"-f",
self.tarball.name,
"--strip-components=1",
"--no-same-owner",
"-C",
self.tempdir,
]
try:
subprocess.run(cmd, stderr=subprocess.PIPE, check=True, encoding="utf8")
except subprocess.CalledProcessError as e:
error = e.stderr.splitlines()[0]
raise base.SyncError(f"failed to unpack tarball: {error}")
# TODO: verify gpg data if it exists
try:
if os.path.exists(self.basedir):
# move old repo out of the way if it exists
os.rename(self.basedir, self.tempdir_old)
# move new, unpacked repo into place
os.rename(self.tempdir, self.basedir)
except OSError as e:
raise base.SyncError(f"failed to update repo: {e.strerror}") from e