Changeset - d2f59de17bef
[Not reviewed]
default
0 2 0
Mads Kiilerich (mads) - 5 years ago 2020-12-03 11:16:44
mads@kiilerich.com
Grafted from: 4581409d9119
vcs: use urlparse in _check_url

This makes URL checking more standard and more explicit.
2 files changed with 32 insertions and 11 deletions:
0 comments (0 inline, 0 general)
kallithea/lib/vcs/backends/git/repository.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
"""
 
    vcs.backends.git.repository
 
    ~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
    Git repository implementation.
 

	
 
    :created_on: Apr 8, 2010
 
    :copyright: (c) 2010-2011 by Marcin Kuzminski, Lukasz Balcerzak.
 
"""
 

	
 
import errno
 
import logging
 
import os
 
import re
 
import time
 
import urllib.error
 
import urllib.parse
 
import urllib.request
 
from collections import OrderedDict
 

	
 
import mercurial.util  # import url as hg_url
 
from dulwich.client import SubprocessGitClient
 
from dulwich.config import ConfigFile
 
from dulwich.objects import Tag
 
from dulwich.repo import NotGitRepository, Repo
 
from dulwich.server import update_server_info
 

	
 
from kallithea.lib.vcs import subprocessio
 
from kallithea.lib.vcs.backends.base import BaseRepository, CollectionGenerator
 
from kallithea.lib.vcs.conf import settings
 
from kallithea.lib.vcs.exceptions import (BranchDoesNotExistError, ChangesetDoesNotExistError, EmptyRepositoryError, RepositoryError, TagAlreadyExistError,
 
                                          TagDoesNotExistError)
 
from kallithea.lib.vcs.utils import ascii_bytes, ascii_str, date_fromtimestamp, makedate, safe_bytes, safe_str
 
from kallithea.lib.vcs.utils.helpers import get_urllib_request_handlers
 
from kallithea.lib.vcs.utils.lazy import LazyProperty
 
from kallithea.lib.vcs.utils.paths import abspath, get_user_home
 

	
 
from . import changeset, inmemory, workdir
 

	
 

	
 
SHA_PATTERN = re.compile(r'^([0-9a-fA-F]{12}|[0-9a-fA-F]{40})$')
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class GitRepository(BaseRepository):
 
    """
 
    Git repository backend.
 
    """
 
    DEFAULT_BRANCH_NAME = 'master'
 
    scm = 'git'
 

	
 
    def __init__(self, repo_path, create=False, src_url=None,
 
                 update_after_clone=False, bare=False, baseui=None):
 
        baseui  # unused
 
        self.path = abspath(repo_path)
 
        self.repo = self._get_repo(create, src_url, update_after_clone, bare)
 
        self.bare = self.repo.bare
 

	
 
    @property
 
    def _config_files(self):
 
        return [
 
            self.bare and abspath(self.path, 'config')
 
                      or abspath(self.path, '.git', 'config'),
 
             abspath(get_user_home(), '.gitconfig'),
 
         ]
 

	
 
    @property
 
    def _repo(self):
 
        return self.repo
 

	
 
    @property
 
    def head(self):
 
        try:
 
            return self._repo.head()
 
        except KeyError:
 
            return None
 

	
 
    @property
 
    def _empty(self):
 
        """
 
        Checks if repository is empty ie. without any changesets
 
        """
 

	
 
        try:
 
            self.revisions[0]
 
        except (KeyError, IndexError):
 
            return True
 
        return False
 

	
 
    @LazyProperty
 
    def revisions(self):
 
        """
 
        Returns list of revisions' ids, in ascending order.  Being lazy
 
        attribute allows external tools to inject shas from cache.
 
        """
 
        return self._get_all_revisions()
 

	
 
    @classmethod
 
    def _run_git_command(cls, cmd, cwd=None):
 
        """
 
        Runs given ``cmd`` as git command and returns output bytes in a tuple
 
        (stdout, stderr) ... or raise RepositoryError.
 

	
 
        :param cmd: git command to be executed
 
        :param cwd: passed directly to subprocess
 
        """
 
        # need to clean fix GIT_DIR !
 
        gitenv = dict(os.environ)
 
        gitenv.pop('GIT_DIR', None)
 
        gitenv['GIT_CONFIG_NOGLOBAL'] = '1'
 

	
 
        assert isinstance(cmd, list), cmd
 
        cmd = [settings.GIT_EXECUTABLE_PATH, '-c', 'core.quotepath=false'] + cmd
 
        try:
 
            p = subprocessio.SubprocessIOChunker(cmd, cwd=cwd, env=gitenv, shell=False)
 
        except (EnvironmentError, OSError) as err:
 
            # output from the failing process is in str(EnvironmentError)
 
            msg = ("Couldn't run git command %s.\n"
 
                   "Subprocess failed with '%s': %s\n" %
 
                   (cmd, type(err).__name__, err)
 
            ).strip()
 
            log.error(msg)
 
            raise RepositoryError(msg)
 

	
 
        try:
 
            stdout = b''.join(p.output)
 
            stderr = b''.join(p.error)
 
        finally:
 
            p.close()
 
        # TODO: introduce option to make commands fail if they have any stderr output?
 
        if stderr:
 
            log.debug('stderr from %s:\n%s', cmd, stderr)
 
        else:
 
            log.debug('stderr from %s: None', cmd)
 
        return stdout, stderr
 

	
 
    def run_git_command(self, cmd):
 
        """
 
        Runs given ``cmd`` as git command with cwd set to current repo.
 
        Returns stdout as unicode str ... or raise RepositoryError.
 
        """
 
        cwd = None
 
        if os.path.isdir(self.path):
 
            cwd = self.path
 
        stdout, _stderr = self._run_git_command(cmd, cwd=cwd)
 
        return safe_str(stdout)
 

	
 
    @staticmethod
 
    def _check_url(url):
 
        r"""
 
        Raise URLError if url doesn't seem like a valid safe Git URL. We
 
        only allow http, https, git, and ssh URLs.
 

	
 
        For http and https URLs, make a connection and probe to see if it is valid.
 

	
 
        >>> GitRepository._check_url('git://example.com/my%20fine repo')
 

	
 
        >>> GitRepository._check_url('http://example.com:65537/repo')
 
        Traceback (most recent call last):
 
        ...
 
        urllib.error.URLError: <urlopen error Error parsing URL: 'http://example.com:65537/repo'>
 
        >>> GitRepository._check_url('foo')
 
        Traceback (most recent call last):
 
        ...
 
        urllib.error.URLError: <urlopen error Unsupported protocol in URL 'foo'>
 
        >>> GitRepository._check_url('file:///repo')
 
        Traceback (most recent call last):
 
        ...
 
        urllib.error.URLError: <urlopen error Unsupported protocol in URL 'file:///repo'>
 
        >>> GitRepository._check_url('git+http://example.com/repo')
 
        Traceback (most recent call last):
 
        ...
 
        urllib.error.URLError: <urlopen error Unsupported protocol in URL 'git+http://example.com/repo'>
 
        >>> GitRepository._check_url('git://example.com/%09')
 
        Traceback (most recent call last):
 
        ...
 
        urllib.error.URLError: <urlopen error Invalid escape character in path: '%'>
 
        >>> GitRepository._check_url('git://example.com/%x00')
 
        Traceback (most recent call last):
 
        ...
 
        urllib.error.URLError: <urlopen error Invalid escape character in path: '%'>
 
        >>> GitRepository._check_url(r'git://example.com/\u0009')
 
        Traceback (most recent call last):
 
        ...
 
        urllib.error.URLError: <urlopen error Invalid escape character in path: '\'>
 
        >>> GitRepository._check_url(r'git://example.com/\t')
 
        Traceback (most recent call last):
 
        ...
 
        urllib.error.URLError: <urlopen error Invalid escape character in path: '\'>
 
        >>> GitRepository._check_url('git://example.com/\t')
 
        Traceback (most recent call last):
 
        ...
 
        urllib.error.URLError: <urlopen error Invalid whitespace character in path: '\t'>
 
        """
 
        try:
 
            parsed_url = urllib.parse.urlparse(url)
 
            parsed_url.port  # trigger netloc parsing which might raise ValueError
 
        except ValueError:
 
            raise urllib.error.URLError("Error parsing URL: %r" % url)
 

	
 
        # check first if it's not an local url
 
        if os.path.isabs(url) and os.path.isdir(url):
 
            return
 

	
 
        if url.startswith('git://'):
 
            try:
 
                _git_colon, _empty, _host, path = url.split('/', 3)
 
            except ValueError:
 
                raise urllib.error.URLError("Invalid URL: %r" % url)
 
        if parsed_url.scheme == 'git':
 
            # Mitigate problems elsewhere with incorrect handling of encoded paths.
 
            # Don't trust urllib.parse.unquote but be prepared for more flexible implementations elsewhere.
 
            # Space is the only allowed whitespace character - directly or % encoded. No other % or \ is allowed.
 
            for c in path.replace('%20', ' '):
 
            for c in parsed_url.path.replace('%20', ' '):
 
                if c in '%\\':
 
                    raise urllib.error.URLError("Invalid escape character in path: '%s'" % c)
 
                if c.isspace() and c != ' ':
 
                    raise urllib.error.URLError("Invalid whitespace character in path: %r" % c)
 
            return
 

	
 
        if not url.startswith('http://') and not url.startswith('https://'):
 
        if parsed_url.scheme not in ['http', 'https']:
 
            raise urllib.error.URLError("Unsupported protocol in URL %r" % url)
 

	
 
        url_obj = mercurial.util.url(safe_bytes(url))
 
        test_uri, handlers = get_urllib_request_handlers(url_obj)
 
        if not test_uri.endswith(b'info/refs'):
 
            test_uri = test_uri.rstrip(b'/') + b'/info/refs'
 

	
 
        url_obj.passwd = b'*****'
 
        cleaned_uri = str(url_obj)
 

	
 
        o = urllib.request.build_opener(*handlers)
 
        o.addheaders = [('User-Agent', 'git/1.7.8.0')]  # fake some git
 

	
 
        req = urllib.request.Request(
 
            "%s?%s" % (
 
                safe_str(test_uri),
 
                urllib.parse.urlencode({"service": 'git-upload-pack'})
 
            ))
 

	
 
        try:
 
            resp = o.open(req)
 
            if resp.code != 200:
 
                raise Exception('Return Code is not 200')
 
        except Exception as e:
 
            # means it cannot be cloned
 
            raise urllib.error.URLError("[%s] org_exc: %s" % (cleaned_uri, e))
 

	
 
        # now detect if it's proper git repo
 
        gitdata = resp.read()
 
        if b'service=git-upload-pack' not in gitdata:
 
            raise urllib.error.URLError(
 
                "url [%s] does not look like an git" % cleaned_uri)
 

	
 
    def _get_repo(self, create, src_url=None, update_after_clone=False,
 
                  bare=False):
 
        if create and os.path.exists(self.path):
 
            raise RepositoryError("Location already exist")
 
        if src_url and not create:
 
            raise RepositoryError("Create should be set to True if src_url is "
 
                                  "given (clone operation creates repository)")
 
        try:
 
            if create and src_url:
 
                GitRepository._check_url(src_url)
 
                self.clone(src_url, update_after_clone, bare)
 
                return Repo(self.path)
 
            elif create:
 
                os.makedirs(self.path)
 
                if bare:
 
                    return Repo.init_bare(self.path)
 
                else:
 
                    return Repo.init(self.path)
 
            else:
 
                return Repo(self.path)
 
        except (NotGitRepository, OSError) as err:
 
            raise RepositoryError(err)
 

	
 
    def _get_all_revisions(self):
 
        # we must check if this repo is not empty, since later command
 
        # fails if it is. And it's cheaper to ask than throw the subprocess
 
        # errors
 
        try:
 
            self._repo.head()
 
        except KeyError:
 
            return []
 

	
 
        rev_filter = settings.GIT_REV_FILTER
 
        cmd = ['rev-list', rev_filter, '--reverse', '--date-order']
 
        try:
 
            so = self.run_git_command(cmd)
 
        except RepositoryError:
 
            # Can be raised for empty repositories
 
            return []
 
        return so.splitlines()
 

	
 
    def _get_all_revisions2(self):
 
        # alternate implementation using dulwich
 
        includes = [ascii_str(sha) for key, (sha, type_) in self._parsed_refs.items()
 
                    if type_ != b'T']
 
        return [c.commit.id for c in self._repo.get_walker(include=includes)]
 

	
 
    def _get_revision(self, revision):
 
        """
 
        Given any revision identifier, returns a 40 char string with revision hash.
 
        """
 
        if self._empty:
 
            raise EmptyRepositoryError("There are no changesets yet")
 

	
 
        if revision in (None, '', 'tip', 'HEAD', 'head', -1):
 
            revision = -1
 

	
 
        if isinstance(revision, int):
 
            try:
 
                return self.revisions[revision]
 
            except IndexError:
 
                msg = "Revision %r does not exist for %s" % (revision, self.name)
 
                raise ChangesetDoesNotExistError(msg)
 

	
 
        if isinstance(revision, str):
 
            if revision.isdigit() and (len(revision) < 12 or len(revision) == revision.count('0')):
 
                try:
 
                    return self.revisions[int(revision)]
 
                except IndexError:
 
                    msg = "Revision %r does not exist for %s" % (revision, self)
 
                    raise ChangesetDoesNotExistError(msg)
 

	
 
            # get by branch/tag name
 
            _ref_revision = self._parsed_refs.get(safe_bytes(revision))
 
            if _ref_revision:  # and _ref_revision[1] in [b'H', b'RH', b'T']:
 
                return ascii_str(_ref_revision[0])
 

	
 
            if revision in self.revisions:
 
                return revision
 

	
 
            # maybe it's a tag ? we don't have them in self.revisions
 
            if revision in self.tags.values():
 
                return revision
 

	
 
            if SHA_PATTERN.match(revision):
 
                msg = "Revision %r does not exist for %s" % (revision, self.name)
 
                raise ChangesetDoesNotExistError(msg)
 

	
 
        raise ChangesetDoesNotExistError("Given revision %r not recognized" % revision)
 

	
 
    def get_ref_revision(self, ref_type, ref_name):
 
        """
 
        Returns ``GitChangeset`` object representing repository's
 
        changeset at the given ``revision``.
 
        """
 
        return self._get_revision(ref_name)
 

	
 
    def _get_archives(self, archive_name='tip'):
 

	
 
        for i in [('zip', '.zip'), ('gz', '.tar.gz'), ('bz2', '.tar.bz2')]:
 
            yield {"type": i[0], "extension": i[1], "node": archive_name}
 

	
 
    def _get_url(self, url):
 
        """
 
        Returns normalized url. If schema is not given, would fall to
 
        filesystem (``file:///``) schema.
 
        """
 
        if url != 'default' and '://' not in url:
 
            url = ':///'.join(('file', url))
 
        return url
 

	
 
    @LazyProperty
 
    def name(self):
 
        return os.path.basename(self.path)
 

	
 
    @LazyProperty
 
    def last_change(self):
 
        """
 
        Returns last change made on this repository as datetime object
 
        """
 
        return date_fromtimestamp(self._get_mtime(), makedate()[1])
 

	
 
    def _get_mtime(self):
 
        try:
 
            return time.mktime(self.get_changeset().date.timetuple())
 
        except RepositoryError:
 
            idx_loc = '' if self.bare else '.git'
 
            # fallback to filesystem
 
            in_path = os.path.join(self.path, idx_loc, "index")
 
            he_path = os.path.join(self.path, idx_loc, "HEAD")
 
            if os.path.exists(in_path):
 
                return os.stat(in_path).st_mtime
 
            else:
 
                return os.stat(he_path).st_mtime
 

	
 
    @LazyProperty
 
    def description(self):
 
        return safe_str(self._repo.get_description() or b'unknown')
 

	
 
    @LazyProperty
 
    def contact(self):
 
        undefined_contact = 'Unknown'
 
        return undefined_contact
 

	
 
    @property
 
    def branches(self):
 
        if not self.revisions:
 
            return {}
 
        _branches = [(safe_str(key), ascii_str(sha))
 
                     for key, (sha, type_) in self._parsed_refs.items() if type_ == b'H']
 
        return OrderedDict(sorted(_branches, key=(lambda ctx: ctx[0]), reverse=False))
 

	
 
    @LazyProperty
 
    def closed_branches(self):
 
        return {}
 

	
 
    @LazyProperty
 
    def tags(self):
 
        return self._get_tags()
kallithea/lib/vcs/backends/hg/repository.py
Show inline comments
 
@@ -102,398 +102,413 @@ class MercurialRepository(BaseRepository
 
    def name(self):
 
        return os.path.basename(self.path)
 

	
 
    @LazyProperty
 
    def branches(self):
 
        return self._get_branches()
 

	
 
    @LazyProperty
 
    def closed_branches(self):
 
        return self._get_branches(normal=False, closed=True)
 

	
 
    @LazyProperty
 
    def allbranches(self):
 
        """
 
        List all branches, including closed branches.
 
        """
 
        return self._get_branches(closed=True)
 

	
 
    def _get_branches(self, normal=True, closed=False):
 
        """
 
        Gets branches for this repository
 
        Returns only not closed branches by default
 

	
 
        :param closed: return also closed branches for mercurial
 
        :param normal: return also normal branches
 
        """
 

	
 
        if self._empty:
 
            return {}
 

	
 
        bt = OrderedDict()
 
        for bn, _heads, node, isclosed in sorted(self._repo.branchmap().iterbranches()):
 
            if isclosed:
 
                if closed:
 
                    bt[safe_str(bn)] = ascii_str(mercurial.node.hex(node))
 
            else:
 
                if normal:
 
                    bt[safe_str(bn)] = ascii_str(mercurial.node.hex(node))
 
        return bt
 

	
 
    @LazyProperty
 
    def tags(self):
 
        """
 
        Gets tags for this repository
 
        """
 
        return self._get_tags()
 

	
 
    def _get_tags(self):
 
        if self._empty:
 
            return {}
 

	
 
        return OrderedDict(sorted(
 
            ((safe_str(n), ascii_str(mercurial.node.hex(h))) for n, h in self._repo.tags().items()),
 
            reverse=True,
 
            key=lambda x: x[0],  # sort by name
 
        ))
 

	
 
    def tag(self, name, user, revision=None, message=None, date=None,
 
            **kwargs):
 
        """
 
        Creates and returns a tag for the given ``revision``.
 

	
 
        :param name: name for new tag
 
        :param user: full username, i.e.: "Joe Doe <joe.doe@example.com>"
 
        :param revision: changeset id for which new tag would be created
 
        :param message: message of the tag's commit
 
        :param date: date of tag's commit
 

	
 
        :raises TagAlreadyExistError: if tag with same name already exists
 
        """
 
        if name in self.tags:
 
            raise TagAlreadyExistError("Tag %s already exists" % name)
 
        changeset = self.get_changeset(revision)
 
        local = kwargs.setdefault('local', False)
 

	
 
        if message is None:
 
            message = "Added tag %s for changeset %s" % (name,
 
                changeset.short_id)
 

	
 
        if date is None:
 
            date = safe_bytes(datetime.datetime.now().strftime('%a, %d %b %Y %H:%M:%S'))
 

	
 
        try:
 
            mercurial.tags.tag(self._repo, safe_bytes(name), changeset._ctx.node(), safe_bytes(message), local, safe_bytes(user), date)
 
        except mercurial.error.Abort as e:
 
            raise RepositoryError(e.args[0])
 

	
 
        # Reinitialize tags
 
        self.tags = self._get_tags()
 
        tag_id = self.tags[name]
 

	
 
        return self.get_changeset(revision=tag_id)
 

	
 
    def remove_tag(self, name, user, message=None, date=None):
 
        """
 
        Removes tag with the given ``name``.
 

	
 
        :param name: name of the tag to be removed
 
        :param user: full username, i.e.: "Joe Doe <joe.doe@example.com>"
 
        :param message: message of the tag's removal commit
 
        :param date: date of tag's removal commit
 

	
 
        :raises TagDoesNotExistError: if tag with given name does not exists
 
        """
 
        if name not in self.tags:
 
            raise TagDoesNotExistError("Tag %s does not exist" % name)
 
        if message is None:
 
            message = "Removed tag %s" % name
 
        if date is None:
 
            date = safe_bytes(datetime.datetime.now().strftime('%a, %d %b %Y %H:%M:%S'))
 
        local = False
 

	
 
        try:
 
            mercurial.tags.tag(self._repo, safe_bytes(name), mercurial.commands.nullid, safe_bytes(message), local, safe_bytes(user), date)
 
            self.tags = self._get_tags()
 
        except mercurial.error.Abort as e:
 
            raise RepositoryError(e.args[0])
 

	
 
    @LazyProperty
 
    def bookmarks(self):
 
        """
 
        Gets bookmarks for this repository
 
        """
 
        return self._get_bookmarks()
 

	
 
    def _get_bookmarks(self):
 
        if self._empty:
 
            return {}
 

	
 
        return OrderedDict(sorted(
 
            ((safe_str(n), ascii_str(mercurial.node.hex(h))) for n, h in self._repo._bookmarks.items()),
 
            reverse=True,
 
            key=lambda x: x[0],  # sort by name
 
        ))
 

	
 
    def _get_all_revisions(self):
 
        return [ascii_str(self._repo[x].hex()) for x in self._repo.filtered(b'visible').changelog.revs()]
 

	
 
    def get_diff(self, rev1, rev2, path='', ignore_whitespace=False,
 
                  context=3):
 
        """
 
        Returns (git like) *diff*, as plain text. Shows changes introduced by
 
        ``rev2`` since ``rev1``.
 

	
 
        :param rev1: Entry point from which diff is shown. Can be
 
          ``self.EMPTY_CHANGESET`` - in this case, patch showing all
 
          the changes since empty state of the repository until ``rev2``
 
        :param rev2: Until which revision changes should be shown.
 
        :param ignore_whitespace: If set to ``True``, would not show whitespace
 
          changes. Defaults to ``False``.
 
        :param context: How many lines before/after changed lines should be
 
          shown. Defaults to ``3``. If negative value is passed-in, it will be
 
          set to ``0`` instead.
 
        """
 

	
 
        # Negative context values make no sense, and will result in
 
        # errors. Ensure this does not happen.
 
        if context < 0:
 
            context = 0
 

	
 
        if hasattr(rev1, 'raw_id'):
 
            rev1 = getattr(rev1, 'raw_id')
 

	
 
        if hasattr(rev2, 'raw_id'):
 
            rev2 = getattr(rev2, 'raw_id')
 

	
 
        # Check if given revisions are present at repository (may raise
 
        # ChangesetDoesNotExistError)
 
        if rev1 != self.EMPTY_CHANGESET:
 
            self.get_changeset(rev1)
 
        self.get_changeset(rev2)
 
        if path:
 
            file_filter = mercurial.match.exact([safe_bytes(path)])
 
        else:
 
            file_filter = None
 

	
 
        return b''.join(mercurial.patch.diff(self._repo, rev1, rev2, match=file_filter,
 
                          opts=mercurial.mdiff.diffopts(git=True,
 
                                        showfunc=True,
 
                                        ignorews=ignore_whitespace,
 
                                        context=context)))
 

	
 
    @staticmethod
 
    def _check_url(url, repoui=None):
 
        r"""
 
        Raise URLError if url doesn't seem like a valid safe Hg URL. We
 
        only allow http, https, ssh, and hg-git URLs.
 

	
 
        For http, https and git URLs, make a connection and probe to see if it is valid.
 

	
 
        On failures it'll raise urllib2.HTTPError, exception is also thrown
 
        when the return code is non 200
 

	
 
        >>> MercurialRepository._check_url('file:///repo')
 

	
 
        >>> MercurialRepository._check_url('http://example.com:65537/repo')
 
        Traceback (most recent call last):
 
        ...
 
        urllib.error.URLError: <urlopen error Error parsing URL: 'http://example.com:65537/repo'>
 
        """
 
        try:
 
            parsed_url = urllib.parse.urlparse(url)
 
            parsed_url.port  # trigger netloc parsing which might raise ValueError
 
        except ValueError:
 
            raise urllib.error.URLError("Error parsing URL: %r" % url)
 

	
 
        # check first if it's not an local url
 
        url = safe_bytes(url)
 
        if os.path.isdir(url) or url.startswith(b'file:'):
 
        if os.path.isdir(url) or parsed_url.scheme == 'file':
 
            # When creating repos, _get_url will use file protocol for local paths
 
            return
 

	
 
        if url.startswith(b'ssh:'):
 
        url = safe_bytes(url)
 

	
 
        if parsed_url.scheme == 'ssh':
 
            # in case of invalid uri or authentication issues, sshpeer will
 
            # throw an exception.
 
            mercurial.sshpeer.instance(repoui or mercurial.ui.ui(), url, False).lookup(b'tip')
 
            return
 

	
 
        url_prefix = None
 
        if b'+' in url[:url.find(b'://')]:
 
        if '+' in parsed_url.scheme:
 
            url_prefix, url = url.split(b'+', 1)
 

	
 
        url_obj = mercurial.util.url(url)
 
        test_uri, handlers = get_urllib_request_handlers(url_obj)
 

	
 
        url_obj.passwd = b'*****'
 
        cleaned_uri = str(url_obj)
 

	
 
        o = urllib.request.build_opener(*handlers)
 
        o.addheaders = [('Content-Type', 'application/mercurial-0.1'),
 
                        ('Accept', 'application/mercurial-0.1')]
 

	
 
        req = urllib.request.Request(
 
            "%s?%s" % (
 
                safe_str(test_uri),
 
                urllib.parse.urlencode({
 
                    'cmd': 'between',
 
                    'pairs': "%s-%s" % ('0' * 40, '0' * 40),
 
                })
 
            ))
 

	
 
        try:
 
            resp = o.open(req)
 
            if resp.code != 200:
 
                raise Exception('Return Code is not 200')
 
        except Exception as e:
 
            # means it cannot be cloned
 
            raise urllib.error.URLError("[%s] org_exc: %s" % (cleaned_uri, e))
 

	
 
        if not url_prefix: # skip git+http://... etc
 
            # now check if it's a proper hg repo
 
            try:
 
                mercurial.httppeer.instance(repoui or mercurial.ui.ui(), url, False).lookup(b'tip')
 
            except Exception as e:
 
                raise urllib.error.URLError(
 
                    "url [%s] does not look like an hg repo org_exc: %s"
 
                    % (cleaned_uri, e))
 

	
 
    def _get_repo(self, create, src_url=None, update_after_clone=False):
 
        """
 
        Function will check for mercurial repository in given path and return
 
        a localrepo object. If there is no repository in that path it will
 
        raise an exception unless ``create`` parameter is set to True - in
 
        that case repository would be created and returned.
 
        If ``src_url`` is given, would try to clone repository from the
 
        location at given clone_point. Additionally it'll make update to
 
        working copy accordingly to ``update_after_clone`` flag
 
        """
 
        try:
 
            if src_url:
 
                url = self._get_url(src_url)
 
                opts = {}
 
                if not update_after_clone:
 
                    opts.update({'noupdate': True})
 
                MercurialRepository._check_url(url, self.baseui)
 
                mercurial.commands.clone(self.baseui, safe_bytes(url), safe_bytes(self.path), **opts)
 

	
 
                # Don't try to create if we've already cloned repo
 
                create = False
 
            return mercurial.localrepo.instance(self.baseui, safe_bytes(self.path), create=create)
 
        except (mercurial.error.Abort, mercurial.error.RepoError) as err:
 
            if create:
 
                msg = "Cannot create repository at %s. Original error was %s" \
 
                    % (self.name, err)
 
            else:
 
                msg = "Not valid repository at %s. Original error was %s" \
 
                    % (self.name, err)
 
            raise RepositoryError(msg)
 

	
 
    @LazyProperty
 
    def in_memory_changeset(self):
 
        return inmemory.MercurialInMemoryChangeset(self)
 

	
 
    @LazyProperty
 
    def description(self):
 
        _desc = self._repo.ui.config(b'web', b'description', None, untrusted=True)
 
        return safe_str(_desc or b'unknown')
 

	
 
    @LazyProperty
 
    def contact(self):
 
        return safe_str(mercurial.hgweb.common.get_contact(self._repo.ui.config)
 
                            or b'Unknown')
 

	
 
    @LazyProperty
 
    def last_change(self):
 
        """
 
        Returns last change made on this repository as datetime object
 
        """
 
        return date_fromtimestamp(self._get_mtime(), makedate()[1])
 

	
 
    def _get_mtime(self):
 
        try:
 
            return time.mktime(self.get_changeset().date.timetuple())
 
        except RepositoryError:
 
            # fallback to filesystem
 
            cl_path = os.path.join(self.path, '.hg', "00changelog.i")
 
            st_path = os.path.join(self.path, '.hg', "store")
 
            if os.path.exists(cl_path):
 
                return os.stat(cl_path).st_mtime
 
            else:
 
                return os.stat(st_path).st_mtime
 

	
 
    def _get_revision(self, revision):
 
        """
 
        Given any revision identifier, returns a 40 char string with revision hash.
 

	
 
        :param revision: str or int or None
 
        """
 
        if self._empty:
 
            raise EmptyRepositoryError("There are no changesets yet")
 

	
 
        if revision in [-1, None]:
 
            revision = b'tip'
 
        elif isinstance(revision, str):
 
            revision = safe_bytes(revision)
 

	
 
        try:
 
            if isinstance(revision, int):
 
                return ascii_str(self._repo[revision].hex())
 
            return ascii_str(mercurial.scmutil.revsymbol(self._repo, revision).hex())
 
        except (IndexError, ValueError, mercurial.error.RepoLookupError, TypeError):
 
            msg = "Revision %r does not exist for %s" % (safe_str(revision), self.name)
 
            raise ChangesetDoesNotExistError(msg)
 
        except (LookupError, ):
 
            msg = "Ambiguous identifier `%s` for %s" % (safe_str(revision), self.name)
 
            raise ChangesetDoesNotExistError(msg)
 

	
 
    def get_ref_revision(self, ref_type, ref_name):
 
        """
 
        Returns revision number for the given reference.
 
        """
 
        if ref_type == 'rev' and not ref_name.strip('0'):
 
            return self.EMPTY_CHANGESET
 
        # lookup up the exact node id
 
        _revset_predicates = {
 
                'branch': 'branch',
 
                'book': 'bookmark',
 
                'tag': 'tag',
 
                'rev': 'id',
 
            }
 
        # avoid expensive branch(x) iteration over whole repo
 
        rev_spec = "%%s & %s(%%s)" % _revset_predicates[ref_type]
 
        try:
 
            revs = self._repo.revs(rev_spec, ref_name, ref_name)
 
        except LookupError:
 
            msg = "Ambiguous identifier %s:%s for %s" % (ref_type, ref_name, self.name)
 
            raise ChangesetDoesNotExistError(msg)
 
        except mercurial.error.RepoLookupError:
 
            msg = "Revision %s:%s does not exist for %s" % (ref_type, ref_name, self.name)
 
            raise ChangesetDoesNotExistError(msg)
 
        if revs:
 
            revision = revs.last()
 
        else:
 
            # TODO: just report 'not found'?
 
            revision = ref_name
 

	
 
        return self._get_revision(revision)
 

	
 
    def _get_archives(self, archive_name='tip'):
 
        allowed = self.baseui.configlist(b"web", b"allow_archive",
 
                                         untrusted=True)
 
        for name, ext in [(b'zip', '.zip'), (b'gz', '.tar.gz'), (b'bz2', '.tar.bz2')]:
 
            if name in allowed or self._repo.ui.configbool(b"web",
 
                                                           b"allow" + name,
 
                                                           untrusted=True):
 
                yield {"type": safe_str(name), "extension": ext, "node": archive_name}
 

	
 
    def _get_url(self, url):
 
        """
 
        Returns normalized url. If schema is not given, fall back to
 
        filesystem (``file:///``) schema.
 
        """
 
        if url != 'default' and '://' not in url:
 
            url = "file:" + urllib.request.pathname2url(url)
 
        return url
 

	
 
    def get_changeset(self, revision=None):
 
        """
 
        Returns ``MercurialChangeset`` object representing repository's
 
        changeset at the given ``revision``.
 
        """
 
        return changeset.MercurialChangeset(repository=self, revision=self._get_revision(revision))
 

	
 
    def get_changesets(self, start=None, end=None, start_date=None,
 
                       end_date=None, branch_name=None, reverse=False, max_revisions=None):
 
        """
 
        Returns iterator of ``MercurialChangeset`` objects from start to end
 
        (both are inclusive)
 

	
 
        :param start: None, str, int or mercurial lookup format
 
        :param end:  None, str, int or mercurial lookup format
 
        :param start_date:
0 comments (0 inline, 0 general)