Changeset - c3b6ad713a0f
[Not reviewed]
default
0 1 0
Mads Kiilerich (mads) - 5 years ago 2020-12-03 11:16:44
mads@kiilerich.com
hg: make sure _check_url only accept the protocols accepted by is_valid_repo_uri

Avoid unnecessary flexibility, ambiguity, and complexity.

This is similar to what 3ea3d3a2b3e3 did for Git.
1 file changed with 19 insertions and 5 deletions:
0 comments (0 inline, 0 general)
kallithea/lib/vcs/backends/hg/repository.py
Show inline comments
 
@@ -253,148 +253,162 @@ class MercurialRepository(BaseRepository
 
          shown. Defaults to ``3``. If negative value is passed-in, it will be
 
          set to ``0`` instead.
 
        """
 

	
 
        # Negative context values make no sense, and will result in
 
        # errors. Ensure this does not happen.
 
        if context < 0:
 
            context = 0
 

	
 
        if hasattr(rev1, 'raw_id'):
 
            rev1 = getattr(rev1, 'raw_id')
 

	
 
        if hasattr(rev2, 'raw_id'):
 
            rev2 = getattr(rev2, 'raw_id')
 

	
 
        # Check if given revisions are present at repository (may raise
 
        # ChangesetDoesNotExistError)
 
        if rev1 != self.EMPTY_CHANGESET:
 
            self.get_changeset(rev1)
 
        self.get_changeset(rev2)
 
        if path:
 
            file_filter = mercurial.match.exact([safe_bytes(path)])
 
        else:
 
            file_filter = None
 

	
 
        return b''.join(mercurial.patch.diff(self._repo, rev1, rev2, match=file_filter,
 
                          opts=mercurial.mdiff.diffopts(git=True,
 
                                        showfunc=True,
 
                                        ignorews=ignore_whitespace,
 
                                        context=context)))
 

	
 
    @staticmethod
 
    def _check_url(url, repoui=None):
 
        r"""
 
        Raise URLError if url doesn't seem like a valid safe Hg URL. We
 
        only allow http, https, ssh, and hg-git URLs.
 

	
 
        For http, https and git URLs, make a connection and probe to see if it is valid.
 

	
 
        On failures it'll raise urllib2.HTTPError, exception is also thrown
 
        when the return code is non 200
 

	
 
        >>> MercurialRepository._check_url('file:///repo')
 

	
 
        >>> MercurialRepository._check_url('http://example.com:65537/repo')
 
        Traceback (most recent call last):
 
        ...
 
        urllib.error.URLError: <urlopen error Error parsing URL: 'http://example.com:65537/repo'>
 
        >>> MercurialRepository._check_url('foo')
 
        Traceback (most recent call last):
 
        ...
 
        urllib.error.URLError: <urlopen error Unsupported protocol in URL 'foo'>
 
        >>> MercurialRepository._check_url('git+ssh://example.com/my%20fine repo')
 
        Traceback (most recent call last):
 
        ...
 
        urllib.error.URLError: <urlopen error Unsupported protocol in URL 'git+ssh://example.com/my%20fine repo'>
 
        >>> MercurialRepository._check_url('svn+http://example.com/repo')
 
        Traceback (most recent call last):
 
        ...
 
        urllib.error.URLError: <urlopen error Unsupported protocol in URL 'svn+http://example.com/repo'>
 
        """
 
        try:
 
            parsed_url = urllib.parse.urlparse(url)
 
            parsed_url.port  # trigger netloc parsing which might raise ValueError
 
        except ValueError:
 
            raise urllib.error.URLError("Error parsing URL: %r" % url)
 

	
 
        # check first if it's not an local url
 
        if os.path.isdir(url) or parsed_url.scheme == 'file':
 
        if os.path.isabs(url) and os.path.isdir(url) or parsed_url.scheme == 'file':
 
            # When creating repos, _get_url will use file protocol for local paths
 
            return
 

	
 
        if parsed_url.scheme not in ['http', 'https', 'ssh', 'git+http', 'git+https']:
 
            raise urllib.error.URLError("Unsupported protocol in URL %r" % url)
 

	
 
        url = safe_bytes(url)
 

	
 
        if parsed_url.scheme == 'ssh':
 
            # in case of invalid uri or authentication issues, sshpeer will
 
            # throw an exception.
 
            mercurial.sshpeer.instance(repoui or mercurial.ui.ui(), url, False).lookup(b'tip')
 
            return
 

	
 
        url_prefix = None
 
        if '+' in parsed_url.scheme:
 
            url_prefix, url = url.split(b'+', 1)
 
        if '+' in parsed_url.scheme:  # strip 'git+' for hg-git URLs
 
            url = url.split(b'+', 1)[1]
 

	
 
        url_obj = mercurial.util.url(url)
 
        test_uri, handlers = get_urllib_request_handlers(url_obj)
 

	
 
        url_obj.passwd = b'*****'
 
        cleaned_uri = str(url_obj)
 

	
 
        o = urllib.request.build_opener(*handlers)
 
        o.addheaders = [('Content-Type', 'application/mercurial-0.1'),
 
                        ('Accept', 'application/mercurial-0.1')]
 

	
 
        req = urllib.request.Request(
 
            "%s?%s" % (
 
                safe_str(test_uri),
 
                urllib.parse.urlencode({
 
                    'cmd': 'between',
 
                    'pairs': "%s-%s" % ('0' * 40, '0' * 40),
 
                })
 
            ))
 

	
 
        try:
 
            resp = o.open(req)
 
            if resp.code != 200:
 
                raise Exception('Return Code is not 200')
 
        except Exception as e:
 
            # means it cannot be cloned
 
            raise urllib.error.URLError("[%s] org_exc: %s" % (cleaned_uri, e))
 

	
 
        if not url_prefix: # skip git+http://... etc
 
        if parsed_url.scheme in ['http', 'https']:  # skip git+http://... etc
 
            # now check if it's a proper hg repo
 
            try:
 
                mercurial.httppeer.instance(repoui or mercurial.ui.ui(), url, False).lookup(b'tip')
 
            except Exception as e:
 
                raise urllib.error.URLError(
 
                    "url [%s] does not look like an hg repo org_exc: %s"
 
                    % (cleaned_uri, e))
 

	
 
    def _get_repo(self, create, src_url=None, update_after_clone=False):
 
        """
 
        Function will check for mercurial repository in given path and return
 
        a localrepo object. If there is no repository in that path it will
 
        raise an exception unless ``create`` parameter is set to True - in
 
        that case repository would be created and returned.
 
        If ``src_url`` is given, would try to clone repository from the
 
        location at given clone_point. Additionally it'll make update to
 
        working copy accordingly to ``update_after_clone`` flag
 
        """
 
        try:
 
            if src_url:
 
                url = self._get_url(src_url)
 
                opts = {}
 
                if not update_after_clone:
 
                    opts.update({'noupdate': True})
 
                MercurialRepository._check_url(url, self.baseui)
 
                mercurial.commands.clone(self.baseui, safe_bytes(url), safe_bytes(self.path), **opts)
 

	
 
                # Don't try to create if we've already cloned repo
 
                create = False
 
            return mercurial.localrepo.instance(self.baseui, safe_bytes(self.path), create=create)
 
        except (mercurial.error.Abort, mercurial.error.RepoError) as err:
 
            if create:
 
                msg = "Cannot create repository at %s. Original error was %s" \
 
                    % (self.name, err)
 
            else:
 
                msg = "Not valid repository at %s. Original error was %s" \
 
                    % (self.name, err)
 
            raise RepositoryError(msg)
 

	
 
    @LazyProperty
 
    def in_memory_changeset(self):
 
        return inmemory.MercurialInMemoryChangeset(self)
 

	
 
    @LazyProperty
 
    def description(self):
 
        _desc = self._repo.ui.config(b'web', b'description', None, untrusted=True)
 
        return safe_str(_desc or b'unknown')
 

	
0 comments (0 inline, 0 general)