Changeset - 2fac3c55f9bc
[Not reviewed]
default
0 2 0
Mads Kiilerich (mads) - 5 years ago 2020-11-14 17:55:36
mads@kiilerich.com
Grafted from: be223214bc17
vcs: slight clean up of _check_url

Make static void function static and without return value.
2 files changed with 16 insertions and 23 deletions:
0 comments (0 inline, 0 general)
kallithea/lib/vcs/backends/git/repository.py
Show inline comments
 
@@ -102,162 +102,157 @@ class GitRepository(BaseRepository):
 
        """
 
        Runs given ``cmd`` as git command and returns output bytes in a tuple
 
        (stdout, stderr) ... or raise RepositoryError.
 

	
 
        :param cmd: git command to be executed
 
        :param cwd: passed directly to subprocess
 
        """
 
        # need to clean fix GIT_DIR !
 
        gitenv = dict(os.environ)
 
        gitenv.pop('GIT_DIR', None)
 
        gitenv['GIT_CONFIG_NOGLOBAL'] = '1'
 

	
 
        assert isinstance(cmd, list), cmd
 
        cmd = [settings.GIT_EXECUTABLE_PATH, '-c', 'core.quotepath=false'] + cmd
 
        try:
 
            p = subprocessio.SubprocessIOChunker(cmd, cwd=cwd, env=gitenv, shell=False)
 
        except (EnvironmentError, OSError) as err:
 
            # output from the failing process is in str(EnvironmentError)
 
            msg = ("Couldn't run git command %s.\n"
 
                   "Subprocess failed with '%s': %s\n" %
 
                   (cmd, type(err).__name__, err)
 
            ).strip()
 
            log.error(msg)
 
            raise RepositoryError(msg)
 

	
 
        try:
 
            stdout = b''.join(p.output)
 
            stderr = b''.join(p.error)
 
        finally:
 
            p.close()
 
        # TODO: introduce option to make commands fail if they have any stderr output?
 
        if stderr:
 
            log.debug('stderr from %s:\n%s', cmd, stderr)
 
        else:
 
            log.debug('stderr from %s: None', cmd)
 
        return stdout, stderr
 

	
 
    def run_git_command(self, cmd):
 
        """
 
        Runs given ``cmd`` as git command with cwd set to current repo.
 
        Returns stdout as unicode str ... or raise RepositoryError.
 
        """
 
        cwd = None
 
        if os.path.isdir(self.path):
 
            cwd = self.path
 
        stdout, _stderr = self._run_git_command(cmd, cwd=cwd)
 
        return safe_str(stdout)
 

	
 
    @classmethod
 
    def _check_url(cls, url):
 
    @staticmethod
 
    def _check_url(url):
 
        """
 
        Function will check given url and try to verify if it's a valid
 
        link. Sometimes it may happened that git will issue basic
 
        auth request that can cause whole API to hang when used from python
 
        or other external calls.
 
        Raise URLError if url doesn't seem like a valid safe Git URL. We
 
        only allow http, https, git, and ssh URLs.
 

	
 
        On failures it'll raise urllib2.HTTPError, exception is also thrown
 
        when the return code is non 200
 
        For http and https URLs, make a connection and probe to see if it is valid.
 
        """
 
        # check first if it's not an local url
 
        if os.path.isabs(url) and os.path.isdir(url):
 
            return True
 
            return
 

	
 
        if url.startswith('git://'):
 
            try:
 
                _git_colon, _empty, _host, path = url.split('/', 3)
 
            except ValueError:
 
                raise urllib.error.URLError("Invalid URL: %r" % url)
 
            # Mitigate problems elsewhere with incorrect handling of encoded paths.
 
            # Don't trust urllib.parse.unquote but be prepared for more flexible implementations elsewhere.
 
            # Space is the only allowed whitespace character - directly or % encoded. No other % or \ is allowed.
 
            for c in path.replace('%20', ' '):
 
                if c in '%\\':
 
                    raise urllib.error.URLError("Invalid escape character in path: '%s'" % c)
 
                if c.isspace() and c != ' ':
 
                    raise urllib.error.URLError("Invalid whitespace character in path: %r" % c)
 
            return True
 
            return
 

	
 
        if not url.startswith('http://') and not url.startswith('https://'):
 
            raise urllib.error.URLError("Unsupported protocol in URL %s" % url)
 

	
 
        url_obj = mercurial.util.url(safe_bytes(url))
 
        test_uri, handlers = get_urllib_request_handlers(url_obj)
 
        if not test_uri.endswith(b'info/refs'):
 
            test_uri = test_uri.rstrip(b'/') + b'/info/refs'
 

	
 
        url_obj.passwd = b'*****'
 
        cleaned_uri = str(url_obj)
 

	
 
        o = urllib.request.build_opener(*handlers)
 
        o.addheaders = [('User-Agent', 'git/1.7.8.0')]  # fake some git
 

	
 
        req = urllib.request.Request(
 
            "%s?%s" % (
 
                safe_str(test_uri),
 
                urllib.parse.urlencode({"service": 'git-upload-pack'})
 
            ))
 

	
 
        try:
 
            resp = o.open(req)
 
            if resp.code != 200:
 
                raise Exception('Return Code is not 200')
 
        except Exception as e:
 
            # means it cannot be cloned
 
            raise urllib.error.URLError("[%s] org_exc: %s" % (cleaned_uri, e))
 

	
 
        # now detect if it's proper git repo
 
        gitdata = resp.read()
 
        if b'service=git-upload-pack' not in gitdata:
 
            raise urllib.error.URLError(
 
                "url [%s] does not look like an git" % cleaned_uri)
 

	
 
        return True
 

	
 
    def _get_repo(self, create, src_url=None, update_after_clone=False,
 
                  bare=False):
 
        if create and os.path.exists(self.path):
 
            raise RepositoryError("Location already exist")
 
        if src_url and not create:
 
            raise RepositoryError("Create should be set to True if src_url is "
 
                                  "given (clone operation creates repository)")
 
        try:
 
            if create and src_url:
 
                GitRepository._check_url(src_url)
 
                self.clone(src_url, update_after_clone, bare)
 
                return Repo(self.path)
 
            elif create:
 
                os.makedirs(self.path)
 
                if bare:
 
                    return Repo.init_bare(self.path)
 
                else:
 
                    return Repo.init(self.path)
 
            else:
 
                return Repo(self.path)
 
        except (NotGitRepository, OSError) as err:
 
            raise RepositoryError(err)
 

	
 
    def _get_all_revisions(self):
 
        # we must check if this repo is not empty, since later command
 
        # fails if it is. And it's cheaper to ask than throw the subprocess
 
        # errors
 
        try:
 
            self._repo.head()
 
        except KeyError:
 
            return []
 

	
 
        rev_filter = settings.GIT_REV_FILTER
 
        cmd = ['rev-list', rev_filter, '--reverse', '--date-order']
 
        try:
 
            so = self.run_git_command(cmd)
 
        except RepositoryError:
 
            # Can be raised for empty repositories
 
            return []
 
        return so.splitlines()
 

	
 
    def _get_all_revisions2(self):
 
        # alternate implementation using dulwich
 
        includes = [ascii_str(sha) for key, (sha, type_) in self._parsed_refs.items()
 
                    if type_ != b'T']
 
        return [c.commit.id for c in self._repo.get_walker(include=includes)]
 

	
 
    def _get_revision(self, revision):
kallithea/lib/vcs/backends/hg/repository.py
Show inline comments
 
@@ -236,160 +236,158 @@ class MercurialRepository(BaseRepository
 

	
 
    def _get_all_revisions(self):
 
        return [ascii_str(self._repo[x].hex()) for x in self._repo.filtered(b'visible').changelog.revs()]
 

	
 
    def get_diff(self, rev1, rev2, path='', ignore_whitespace=False,
 
                  context=3):
 
        """
 
        Returns (git like) *diff*, as plain text. Shows changes introduced by
 
        ``rev2`` since ``rev1``.
 

	
 
        :param rev1: Entry point from which diff is shown. Can be
 
          ``self.EMPTY_CHANGESET`` - in this case, patch showing all
 
          the changes since empty state of the repository until ``rev2``
 
        :param rev2: Until which revision changes should be shown.
 
        :param ignore_whitespace: If set to ``True``, would not show whitespace
 
          changes. Defaults to ``False``.
 
        :param context: How many lines before/after changed lines should be
 
          shown. Defaults to ``3``. If negative value is passed-in, it will be
 
          set to ``0`` instead.
 
        """
 

	
 
        # Negative context values make no sense, and will result in
 
        # errors. Ensure this does not happen.
 
        if context < 0:
 
            context = 0
 

	
 
        if hasattr(rev1, 'raw_id'):
 
            rev1 = getattr(rev1, 'raw_id')
 

	
 
        if hasattr(rev2, 'raw_id'):
 
            rev2 = getattr(rev2, 'raw_id')
 

	
 
        # Check if given revisions are present at repository (may raise
 
        # ChangesetDoesNotExistError)
 
        if rev1 != self.EMPTY_CHANGESET:
 
            self.get_changeset(rev1)
 
        self.get_changeset(rev2)
 
        if path:
 
            file_filter = mercurial.match.exact([safe_bytes(path)])
 
        else:
 
            file_filter = None
 

	
 
        return b''.join(mercurial.patch.diff(self._repo, rev1, rev2, match=file_filter,
 
                          opts=mercurial.mdiff.diffopts(git=True,
 
                                        showfunc=True,
 
                                        ignorews=ignore_whitespace,
 
                                        context=context)))
 

	
 
    @classmethod
 
    def _check_url(cls, url, repoui=None):
 
        """
 
        Function will check given url and try to verify if it's a valid
 
        link. Sometimes it may happened that mercurial will issue basic
 
        auth request that can cause whole API to hang when used from python
 
        or other external calls.
 
    @staticmethod
 
    def _check_url(url, repoui=None):
 
        r"""
 
        Raise URLError if url doesn't seem like a valid safe Hg URL. We
 
        only allow http, https, ssh, and hg-git URLs.
 

	
 
        For http, https and git URLs, make a connection and probe to see if it is valid.
 

	
 
        On failures it'll raise urllib2.HTTPError, exception is also thrown
 
        when the return code is non 200
 
        """
 
        # check first if it's not an local url
 
        url = safe_bytes(url)
 
        if os.path.isdir(url) or url.startswith(b'file:'):
 
            return True
 
            return
 

	
 
        if url.startswith(b'ssh:'):
 
            # in case of invalid uri or authentication issues, sshpeer will
 
            # throw an exception.
 
            mercurial.sshpeer.instance(repoui or mercurial.ui.ui(), url, False).lookup(b'tip')
 
            return True
 
            return
 

	
 
        url_prefix = None
 
        if b'+' in url[:url.find(b'://')]:
 
            url_prefix, url = url.split(b'+', 1)
 

	
 
        url_obj = mercurial.util.url(url)
 
        test_uri, handlers = get_urllib_request_handlers(url_obj)
 

	
 
        url_obj.passwd = b'*****'
 
        cleaned_uri = str(url_obj)
 

	
 
        o = urllib.request.build_opener(*handlers)
 
        o.addheaders = [('Content-Type', 'application/mercurial-0.1'),
 
                        ('Accept', 'application/mercurial-0.1')]
 

	
 
        req = urllib.request.Request(
 
            "%s?%s" % (
 
                safe_str(test_uri),
 
                urllib.parse.urlencode({
 
                    'cmd': 'between',
 
                    'pairs': "%s-%s" % ('0' * 40, '0' * 40),
 
                })
 
            ))
 

	
 
        try:
 
            resp = o.open(req)
 
            if resp.code != 200:
 
                raise Exception('Return Code is not 200')
 
        except Exception as e:
 
            # means it cannot be cloned
 
            raise urllib.error.URLError("[%s] org_exc: %s" % (cleaned_uri, e))
 

	
 
        if not url_prefix: # skip git+http://... etc
 
            # now check if it's a proper hg repo
 
            try:
 
                mercurial.httppeer.instance(repoui or mercurial.ui.ui(), url, False).lookup(b'tip')
 
            except Exception as e:
 
                raise urllib.error.URLError(
 
                    "url [%s] does not look like an hg repo org_exc: %s"
 
                    % (cleaned_uri, e))
 

	
 
        return True
 

	
 
    def _get_repo(self, create, src_url=None, update_after_clone=False):
 
        """
 
        Function will check for mercurial repository in given path and return
 
        a localrepo object. If there is no repository in that path it will
 
        raise an exception unless ``create`` parameter is set to True - in
 
        that case repository would be created and returned.
 
        If ``src_url`` is given, would try to clone repository from the
 
        location at given clone_point. Additionally it'll make update to
 
        working copy accordingly to ``update_after_clone`` flag
 
        """
 
        try:
 
            if src_url:
 
                url = safe_bytes(self._get_url(src_url))
 
                opts = {}
 
                if not update_after_clone:
 
                    opts.update({'noupdate': True})
 
                MercurialRepository._check_url(url, self.baseui)
 
                mercurial.commands.clone(self.baseui, url, safe_bytes(self.path), **opts)
 

	
 
                # Don't try to create if we've already cloned repo
 
                create = False
 
            return mercurial.localrepo.instance(self.baseui, safe_bytes(self.path), create=create)
 
        except (mercurial.error.Abort, mercurial.error.RepoError) as err:
 
            if create:
 
                msg = "Cannot create repository at %s. Original error was %s" \
 
                    % (self.name, err)
 
            else:
 
                msg = "Not valid repository at %s. Original error was %s" \
 
                    % (self.name, err)
 
            raise RepositoryError(msg)
 

	
 
    @LazyProperty
 
    def in_memory_changeset(self):
 
        return inmemory.MercurialInMemoryChangeset(self)
 

	
 
    @LazyProperty
 
    def description(self):
 
        _desc = self._repo.ui.config(b'web', b'description', None, untrusted=True)
 
        return safe_str(_desc or b'unknown')
 

	
 
    @LazyProperty
 
    def contact(self):
 
        return safe_str(mercurial.hgweb.common.get_contact(self._repo.ui.config)
 
                            or b'Unknown')
 

	
 
    @LazyProperty
 
    def last_change(self):
 
        """
0 comments (0 inline, 0 general)