Changeset - d727e81e0097
[Not reviewed]
stable
0 3 0
Thomas De Schampheleire - 6 years ago 2020-07-22 21:55:57
thomas.de_schampheleire@nokia.com
vcs: fix cloning remote repository with HTTP authentication (Issue #379)

Using a remote clone URI of
http://user:pass@host/...
triggered an exception:

...
E File ".../kallithea/lib/utils.py", line 256, in is_valid_repo_uri
E GitRepository._check_url(url)
E File ".../kallithea/lib/vcs/backends/git/repository.py", line 183, in _check_url
E passmgr.add_password(*authinfo)
E File "/usr/lib/python3.7/urllib/request.py", line 848, in add_password
E self.reduce_uri(u, default_port) for u in uri)
E File "/usr/lib/python3.7/urllib/request.py", line 848, in <genexpr>
E self.reduce_uri(u, default_port) for u in uri)
E File "/usr/lib/python3.7/urllib/request.py", line 875, in reduce_uri
E host, port = splitport(authority)
E File "/usr/lib/python3.7/urllib/parse.py", line 1022, in splitport
E match = _portprog.fullmatch(host)
E TypeError: cannot use a string pattern on a bytes-like object


The authinfo tuple is obtained via mercurial.util.url, which unfortunately
returns a tuple of bytes whereas urllib expects strings.
It seems that mercurial internally has some more hacking around urllib as
urllibcompat.py, which we don't use.

Therefore, transform the bytes into strings before passing authinfo to
urllib. As the realm can be None, we need to check it specifically otherwise
safe_str would return a string 'None'.

A basic test that catches the mentioned problem is added, even though it
does not actually test that cloning with auth info will actually work (it
only tests that it fails cleanly if the URI is not reachable).

Additionally, one use of 'test_uri' in hg/repository.py still needed to be
transformed from bytes to string. For git this was already ok.
3 files changed with 28 insertions and 2 deletions:
0 comments (0 inline, 0 general)
kallithea/lib/vcs/backends/hg/repository.py
Show inline comments
 
@@ -299,49 +299,49 @@ class MercurialRepository(BaseRepository
 
            return True
 

	
 
        if url.startswith(b'ssh:'):
 
            # in case of invalid uri or authentication issues, sshpeer will
 
            # throw an exception.
 
            mercurial.sshpeer.instance(repoui or mercurial.ui.ui(), url, False).lookup(b'tip')
 
            return True
 

	
 
        url_prefix = None
 
        if b'+' in url[:url.find(b'://')]:
 
            url_prefix, url = url.split(b'+', 1)
 

	
 
        url_obj = mercurial.util.url(url)
 
        test_uri, handlers = get_urllib_request_handlers(url_obj)
 

	
 
        url_obj.passwd = b'*****'
 
        cleaned_uri = str(url_obj)
 

	
 
        o = urllib.request.build_opener(*handlers)
 
        o.addheaders = [('Content-Type', 'application/mercurial-0.1'),
 
                        ('Accept', 'application/mercurial-0.1')]
 

	
 
        req = urllib.request.Request(
 
            "%s?%s" % (
 
                test_uri,
 
                safe_str(test_uri),
 
                urllib.parse.urlencode({
 
                    'cmd': 'between',
 
                    'pairs': "%s-%s" % ('0' * 40, '0' * 40),
 
                })
 
            ))
 

	
 
        try:
 
            resp = o.open(req)
 
            if resp.code != 200:
 
                raise Exception('Return Code is not 200')
 
        except Exception as e:
 
            # means it cannot be cloned
 
            raise urllib.error.URLError("[%s] org_exc: %s" % (cleaned_uri, e))
 

	
 
        if not url_prefix: # skip svn+http://... (and git+... too)
 
            # now check if it's a proper hg repo
 
            try:
 
                mercurial.httppeer.instance(repoui or mercurial.ui.ui(), url, False).lookup(b'tip')
 
            except Exception as e:
 
                raise urllib.error.URLError(
 
                    "url [%s] does not look like an hg repo org_exc: %s"
 
                    % (cleaned_uri, e))
 

	
 
        return True
kallithea/lib/vcs/utils/helpers.py
Show inline comments
 
"""
 
Utilities aimed to help achieve mostly basic tasks.
 
"""
 

	
 
import datetime
 
import os
 
import re
 
import time
 
import urllib.request
 

	
 
import mercurial.url
 

	
 
from kallithea.lib.vcs.exceptions import RepositoryError, VCSError
 
from kallithea.lib.vcs.utils import safe_str
 
from kallithea.lib.vcs.utils.paths import abspath
 

	
 

	
 
ALIASES = ['hg', 'git']
 

	
 

	
 
def get_scm(path, search_up=False, explicit_alias=None):
 
    """
 
    Returns one of alias from ``ALIASES`` (in order of precedence same as
 
    shortcuts given in ``ALIASES``) and top working dir path for the given
 
    argument. If no scm-specific directory is found or more than one scm is
 
    found at that directory, ``VCSError`` is raised.
 

	
 
    :param search_up: if set to ``True``, this function would try to
 
      move up to parent directory every time no scm is recognized for the
 
      currently checked path. Default: ``False``.
 
    :param explicit_alias: can be one of available backend aliases, when given
 
      it will return given explicit alias in repositories under more than one
 
      version control, if explicit_alias is different than found it will raise
 
      VCSError
 
    """
 
    if not os.path.isdir(path):
 
        raise VCSError("Given path %s is not a directory" % path)
 

	
 
@@ -205,32 +206,44 @@ def parse_datetime(text):
 
            if groupdict['days']:
 
                days += int(matched.groupdict()['days'])
 
            if groupdict['weeks']:
 
                days += int(matched.groupdict()['weeks']) * 7
 
            past = datetime.datetime.now() - datetime.timedelta(days=days)
 
            return datetime.datetime(*past.timetuple()[:3])
 

	
 
    raise ValueError('Wrong date: "%s"' % text)
 

	
 

	
 
def get_dict_for_attrs(obj, attrs):
 
    """
 
    Returns dictionary for each attribute from given ``obj``.
 
    """
 
    data = {}
 
    for attr in attrs:
 
        data[attr] = getattr(obj, attr)
 
    return data
 

	
 
def get_urllib_request_handlers(url_obj):
 
    handlers = []
 
    test_uri, authinfo = url_obj.authinfo()
 

	
 
    if authinfo:
 
        # authinfo is a tuple (realm, uris, user, password) where 'uris' itself
 
        # is a tuple of URIs.
 
        # If url_obj is obtained via mercurial.util.url, the obtained authinfo
 
        # values will be bytes, e.g.
 
        #    (None, (b'http://127.0.0.1/repo', b'127.0.0.1'), b'user', b'pass')
 
        # However, urllib expects strings, not bytes, so we must convert them.
 

	
 
        # create a password manager
 
        passmgr = urllib.request.HTTPPasswordMgrWithDefaultRealm()
 
        passmgr.add_password(*authinfo)
 
        passmgr.add_password(
 
            safe_str(authinfo[0]) if authinfo[0] else None, # realm
 
            tuple(safe_str(x) for x in authinfo[1]),        # uris
 
            safe_str(authinfo[2]),                          # user
 
            safe_str(authinfo[3]),                          # password
 
        )
 

	
 
        handlers.extend((mercurial.url.httpbasicauthhandler(passmgr),
 
                         mercurial.url.httpdigestauthhandler(passmgr)))
 

	
 
    return test_uri, handlers
kallithea/tests/functional/test_admin_repos.py
Show inline comments
 
@@ -323,48 +323,61 @@ class _BaseTestCase(base.TestController)
 
        repo_name = self.NEW_REPO
 
        description = 'description for newly created repo'
 
        response = self.app.post(base.url('repos'),
 
                        fixture._get_repo_create_params(repo_private=False,
 
                                                repo_name=repo_name,
 
                                                repo_type=self.REPO_TYPE,
 
                                                repo_description=description,
 
                                                clone_uri='http://127.0.0.1/repo',
 
                                                _session_csrf_secret_token=self.session_csrf_secret_token()))
 
        response.mustcontain('Invalid repository URL')
 

	
 
    def test_create_remote_repo_wrong_clone_uri_hg_svn(self):
 
        self.log_user()
 
        repo_name = self.NEW_REPO
 
        description = 'description for newly created repo'
 
        response = self.app.post(base.url('repos'),
 
                        fixture._get_repo_create_params(repo_private=False,
 
                                                repo_name=repo_name,
 
                                                repo_type=self.REPO_TYPE,
 
                                                repo_description=description,
 
                                                clone_uri='svn+http://127.0.0.1/repo',
 
                                                _session_csrf_secret_token=self.session_csrf_secret_token()))
 
        response.mustcontain('Invalid repository URL')
 

	
 
    def test_create_remote_repo_wrong_clone_uri_http_auth(self):
 
        self.log_user()
 
        repo_name = self.NEW_REPO
 
        description = 'description for newly created repo'
 
        response = self.app.post(base.url('repos'),
 
                        fixture._get_repo_create_params(repo_private=False,
 
                                                repo_name=repo_name,
 
                                                repo_type=self.REPO_TYPE,
 
                                                repo_description=description,
 
                                                clone_uri='http://user:pass@127.0.0.1/repo',
 
                                                _session_csrf_secret_token=self.session_csrf_secret_token()))
 
        response.mustcontain('Invalid repository URL')
 

	
 
    def test_delete(self):
 
        self.log_user()
 
        repo_name = 'vcs_test_new_to_delete_%s' % self.REPO_TYPE
 
        description = 'description for newly created repo'
 
        response = self.app.post(base.url('repos'),
 
                        fixture._get_repo_create_params(repo_private=False,
 
                                                repo_type=self.REPO_TYPE,
 
                                                repo_name=repo_name,
 
                                                repo_description=description,
 
                                                _session_csrf_secret_token=self.session_csrf_secret_token()))
 
        ## run the check page that triggers the flash message
 
        response = self.app.get(base.url('repo_check_home', repo_name=repo_name))
 
        self.checkSessionFlash(response,
 
                               'Created repository <a href="/%s">%s</a>'
 
                               % (repo_name, repo_name))
 
        # test if the repo was created in the database
 
        new_repo = Session().query(Repository) \
 
            .filter(Repository.repo_name == repo_name).one()
 

	
 
        assert new_repo.repo_name == repo_name
 
        assert new_repo.description == description
 

	
 
        # test if the repository is visible in the list ?
 
        response = self.app.get(base.url('summary_home', repo_name=repo_name))
0 comments (0 inline, 0 general)