Source code for pkgcore.sync.cvs
__all__ = ("cvs_syncer",)
import os
from . import base
[docs]
class cvs_syncer(base.VcsSyncer):
binary = "cvs"
supported_uris = (
("cvs+", 5),
("cvs://", 5),
)
[docs]
@classmethod
def is_usable_on_filepath(cls, path):
cvs_path = os.path.join(path, "CVS")
if cls.disabled or not os.path.isdir(cvs_path):
return None
return (cls._rewrite_uri_from_stat(cvs_path, "cvs://"),)
@classmethod
def _parse_uri(cls, raw_uri):
if not raw_uri.startswith("cvs") and not raw_uri.startswith("cvs+"):
raise base.UriError(raw_uri, "must be cvs:// or cvs+${RSH}")
if raw_uri.startswith("cvs://"):
return None, raw_uri[len("cvs://") :]
proto = raw_uri[len("cvs+") :].split(":", 1)
if not proto[0]:
raise base.UriError(
raw_uri, "cvs+ requires the rsh alternative to be specified"
)
if proto[0] == "anon":
proto[0] = None
elif proto[0] != "pserver":
try:
proto[0] = cls.require_binary(proto[0])
except base.MissingBinary:
raise base.UriError(raw_uri, f"missing rsh binary: {proto[0]!r}")
return proto[0], proto[1].lstrip("/")
def __init__(self, basedir, raw_uri, **kwargs):
proto, uri = self._parse_uri(raw_uri)
self.rsh = proto
if self.rsh is None:
uri = f":anoncvs:{uri}"
elif self.rsh == "pserver":
uri = f":pserver:{uri}"
self.rsh = None
else:
uri = f":ext:{uri}"
host, self.module = uri.rsplit(":", 1)
super().__init__(basedir, host, **kwargs)
self.env["CVSROOT"] = self.uri
if self.rsh is not None:
self.env["CVS_RSH"] = self.rsh
def _update_existing(self):
return [self.binary_path, "up"]
def _initial_pull(self):
return [self.binary_path, "co", "-d", self.basedir]