Changeset - da519b9782e8
[Not reviewed]
stable
0 3 0
Mads Kiilerich (mads) - 4 years ago 2022-03-07 17:47:13
mads@kiilerich.com
hg: support Mercurial 6.1 without util.url
3 files changed with 15 insertions and 5 deletions:
0 comments (0 inline, 0 general)
kallithea/lib/vcs/backends/git/repository.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
"""
 
    vcs.backends.git.repository
 
    ~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
    Git repository implementation.
 

	
 
    :created_on: Apr 8, 2010
 
    :copyright: (c) 2010-2011 by Marcin Kuzminski, Lukasz Balcerzak.
 
"""
 

	
 
import errno
 
import logging
 
import os
 
import re
 
import time
 
import urllib.error
 
import urllib.parse
 
import urllib.request
 
from collections import OrderedDict
 

	
 
import mercurial.util  # import url as hg_url
 

	
 
try:
 
    from mercurial.utils.urlutil import url as hg_url
 
except ImportError:  # urlutil was introduced in Mercurial 5.8
 
    from mercurial.util import url as hg_url
 

	
 
from dulwich.client import SubprocessGitClient
 
from dulwich.config import ConfigFile
 
from dulwich.objects import Tag
 
from dulwich.repo import NotGitRepository, Repo
 
from dulwich.server import update_server_info
 

	
 
from kallithea.lib.vcs import subprocessio
 
from kallithea.lib.vcs.backends.base import BaseRepository, CollectionGenerator
 
from kallithea.lib.vcs.conf import settings
 
from kallithea.lib.vcs.exceptions import (BranchDoesNotExistError, ChangesetDoesNotExistError, EmptyRepositoryError, RepositoryError, TagAlreadyExistError,
 
                                          TagDoesNotExistError)
 
from kallithea.lib.vcs.utils import ascii_bytes, ascii_str, date_fromtimestamp, makedate, safe_bytes, safe_str
 
from kallithea.lib.vcs.utils.helpers import get_urllib_request_handlers
 
from kallithea.lib.vcs.utils.lazy import LazyProperty
 
from kallithea.lib.vcs.utils.paths import abspath, get_user_home
 

	
 
from . import changeset, inmemory, workdir
 

	
 

	
 
SHA_PATTERN = re.compile(r'^([0-9a-fA-F]{12}|[0-9a-fA-F]{40})$')
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class GitRepository(BaseRepository):
 
    """
 
    Git repository backend.
 
    """
 
    DEFAULT_BRANCH_NAME = 'master'
 
    scm = 'git'
 

	
 
    def __init__(self, repo_path, create=False, src_url=None,
 
                 update_after_clone=False, bare=False, baseui=None):
 
        baseui  # unused
 
        self.path = abspath(repo_path)
 
        self.repo = self._get_repo(create, src_url, update_after_clone, bare)
 
        self.bare = self.repo.bare
 

	
 
    @property
 
    def _config_files(self):
 
        return [
 
            self.bare and abspath(self.path, 'config')
 
                      or abspath(self.path, '.git', 'config'),
 
             abspath(get_user_home(), '.gitconfig'),
 
         ]
 

	
 
    @property
 
    def _repo(self):
 
        return self.repo
 

	
 
    @property
 
    def head(self):
 
        try:
 
            return self._repo.head()
 
        except KeyError:
 
            return None
 

	
 
    @property
 
    def _empty(self):
 
        """
 
        Checks if repository is empty ie. without any changesets
 
        """
 

	
 
        try:
 
            self.revisions[0]
 
        except (KeyError, IndexError):
 
            return True
 
        return False
 

	
 
    @LazyProperty
 
    def revisions(self):
 
        """
 
        Returns list of revisions' ids, in ascending order.  Being lazy
 
        attribute allows external tools to inject shas from cache.
 
        """
 
        return self._get_all_revisions()
 

	
 
    @classmethod
 
    def _run_git_command(cls, cmd, cwd=None):
 
        """
 
        Runs given ``cmd`` as git command and returns output bytes in a tuple
 
        (stdout, stderr) ... or raise RepositoryError.
 

	
 
        :param cmd: git command to be executed
 
        :param cwd: passed directly to subprocess
 
        """
 
        # need to clean fix GIT_DIR !
 
        gitenv = dict(os.environ)
 
        gitenv.pop('GIT_DIR', None)
 
        gitenv['GIT_CONFIG_NOGLOBAL'] = '1'
 

	
 
        assert isinstance(cmd, list), cmd
 
        cmd = [settings.GIT_EXECUTABLE_PATH, '-c', 'core.quotepath=false'] + cmd
 
        try:
 
            p = subprocessio.SubprocessIOChunker(cmd, cwd=cwd, env=gitenv, shell=False)
 
        except (EnvironmentError, OSError) as err:
 
@@ -133,193 +138,193 @@ class GitRepository(BaseRepository):
 
        if stderr:
 
            log.debug('stderr from %s:\n%s', cmd, stderr)
 
        else:
 
            log.debug('stderr from %s: None', cmd)
 
        return stdout, stderr
 

	
 
    def run_git_command(self, cmd):
 
        """
 
        Runs given ``cmd`` as git command with cwd set to current repo.
 
        Returns stdout as unicode str ... or raise RepositoryError.
 
        """
 
        cwd = None
 
        if os.path.isdir(self.path):
 
            cwd = self.path
 
        stdout, _stderr = self._run_git_command(cmd, cwd=cwd)
 
        return safe_str(stdout)
 

	
 
    @staticmethod
 
    def _check_url(url):
 
        r"""
 
        Raise URLError if url doesn't seem like a valid safe Git URL. We
 
        only allow http, https, git, and ssh URLs.
 

	
 
        For http and https URLs, make a connection and probe to see if it is valid.
 

	
 
        >>> GitRepository._check_url('git://example.com/my%20fine repo')
 

	
 
        >>> GitRepository._check_url('http://example.com:65537/repo')
 
        Traceback (most recent call last):
 
        ...
 
        urllib.error.URLError: <urlopen error Error parsing URL: 'http://example.com:65537/repo'>
 
        >>> GitRepository._check_url('foo')
 
        Traceback (most recent call last):
 
        ...
 
        urllib.error.URLError: <urlopen error Unsupported protocol in URL 'foo'>
 
        >>> GitRepository._check_url('file:///repo')
 
        Traceback (most recent call last):
 
        ...
 
        urllib.error.URLError: <urlopen error Unsupported protocol in URL 'file:///repo'>
 
        >>> GitRepository._check_url('git+http://example.com/repo')
 
        Traceback (most recent call last):
 
        ...
 
        urllib.error.URLError: <urlopen error Unsupported protocol in URL 'git+http://example.com/repo'>
 
        >>> GitRepository._check_url('git://example.com/%09')
 
        Traceback (most recent call last):
 
        ...
 
        urllib.error.URLError: <urlopen error Invalid escape character in path: '%'>
 
        >>> GitRepository._check_url('git://example.com/%x00')
 
        Traceback (most recent call last):
 
        ...
 
        urllib.error.URLError: <urlopen error Invalid escape character in path: '%'>
 
        >>> GitRepository._check_url(r'git://example.com/\u0009')
 
        Traceback (most recent call last):
 
        ...
 
        urllib.error.URLError: <urlopen error Invalid escape character in path: '\'>
 
        >>> GitRepository._check_url(r'git://example.com/\t')
 
        Traceback (most recent call last):
 
        ...
 
        urllib.error.URLError: <urlopen error Invalid escape character in path: '\'>
 
        >>> GitRepository._check_url('git://example.com/\t')
 
        Traceback (most recent call last):
 
        ...
 
        urllib.error.URLError: <urlopen error Invalid ...>
 

	
 
        The failure above will be one of, depending on the level of WhatWG support:
 
        urllib.error.URLError: <urlopen error Invalid whitespace character in path: '\t'>
 
        urllib.error.URLError: <urlopen error Invalid url: 'git://example.com/    ' normalizes to 'git://example.com/'>
 
        """
 
        try:
 
            parsed_url = urllib.parse.urlparse(url)
 
            parsed_url.port  # trigger netloc parsing which might raise ValueError
 
        except ValueError:
 
            raise urllib.error.URLError("Error parsing URL: %r" % url)
 

	
 
        # check first if it's not an local url
 
        if os.path.isabs(url) and os.path.isdir(url):
 
            return
 

	
 
        unparsed_url = urllib.parse.urlunparse(parsed_url)
 
        if unparsed_url != url:
 
            raise urllib.error.URLError("Invalid url: '%s' normalizes to '%s'" % (url, unparsed_url))
 

	
 
        if parsed_url.scheme == 'git':
 
            # Mitigate problems elsewhere with incorrect handling of encoded paths.
 
            # Don't trust urllib.parse.unquote but be prepared for more flexible implementations elsewhere.
 
            # Space is the only allowed whitespace character - directly or % encoded. No other % or \ is allowed.
 
            for c in parsed_url.path.replace('%20', ' '):
 
                if c in '%\\':
 
                    raise urllib.error.URLError("Invalid escape character in path: '%s'" % c)
 
                if c.isspace() and c != ' ':
 
                    raise urllib.error.URLError("Invalid whitespace character in path: %r" % c)
 
            return
 

	
 
        if parsed_url.scheme not in ['http', 'https']:
 
            raise urllib.error.URLError("Unsupported protocol in URL %r" % url)
 

	
 
        url_obj = mercurial.util.url(safe_bytes(url))
 
        url_obj = hg_url(safe_bytes(url))
 
        test_uri, handlers = get_urllib_request_handlers(url_obj)
 
        if not test_uri.endswith(b'info/refs'):
 
            test_uri = test_uri.rstrip(b'/') + b'/info/refs'
 

	
 
        url_obj.passwd = b'*****'
 
        cleaned_uri = str(url_obj)
 

	
 
        o = urllib.request.build_opener(*handlers)
 
        o.addheaders = [('User-Agent', 'git/1.7.8.0')]  # fake some git
 

	
 
        req = urllib.request.Request(
 
            "%s?%s" % (
 
                safe_str(test_uri),
 
                urllib.parse.urlencode({"service": 'git-upload-pack'})
 
            ))
 

	
 
        try:
 
            resp = o.open(req)
 
            if resp.code != 200:
 
                raise Exception('Return Code is not 200')
 
        except Exception as e:
 
            # means it cannot be cloned
 
            raise urllib.error.URLError("[%s] org_exc: %s" % (cleaned_uri, e))
 

	
 
        # now detect if it's proper git repo
 
        gitdata = resp.read()
 
        if b'service=git-upload-pack' not in gitdata:
 
            raise urllib.error.URLError(
 
                "url [%s] does not look like an git" % cleaned_uri)
 

	
 
    def _get_repo(self, create, src_url=None, update_after_clone=False,
 
                  bare=False):
 
        if create and os.path.exists(self.path):
 
            raise RepositoryError("Location already exist")
 
        if src_url and not create:
 
            raise RepositoryError("Create should be set to True if src_url is "
 
                                  "given (clone operation creates repository)")
 
        try:
 
            if create and src_url:
 
                GitRepository._check_url(src_url)
 
                self.clone(src_url, update_after_clone, bare)
 
                return Repo(self.path)
 
            elif create:
 
                os.makedirs(self.path)
 
                if bare:
 
                    return Repo.init_bare(self.path)
 
                else:
 
                    return Repo.init(self.path)
 
            else:
 
                return Repo(self.path)
 
        except (NotGitRepository, OSError) as err:
 
            raise RepositoryError(err)
 

	
 
    def _get_all_revisions(self):
 
        # we must check if this repo is not empty, since later command
 
        # fails if it is. And it's cheaper to ask than throw the subprocess
 
        # errors
 
        try:
 
            self._repo.head()
 
        except KeyError:
 
            return []
 

	
 
        rev_filter = settings.GIT_REV_FILTER
 
        cmd = ['rev-list', rev_filter, '--reverse', '--date-order']
 
        try:
 
            so = self.run_git_command(cmd)
 
        except RepositoryError:
 
            # Can be raised for empty repositories
 
            return []
 
        return so.splitlines()
 

	
 
    def _get_all_revisions2(self):
 
        # alternate implementation using dulwich
 
        includes = [ascii_str(sha) for key, (sha, type_) in self._parsed_refs.items()
 
                    if type_ != b'T']
 
        return [c.commit.id for c in self._repo.get_walker(include=includes)]
 

	
 
    def _get_revision(self, revision):
 
        """
 
        Given any revision identifier, returns a 40 char string with revision hash.
 
        """
 
        if self._empty:
 
            raise EmptyRepositoryError("There are no changesets yet")
 

	
 
        if revision in (None, '', 'tip', 'HEAD', 'head', -1):
 
            revision = -1
 

	
 
        if isinstance(revision, int):
 
            try:
 
                return self.revisions[revision]
 
            except IndexError:
 
                msg = "Revision %r does not exist for %s" % (revision, self.name)
 
                raise ChangesetDoesNotExistError(msg)
 

	
 
        if isinstance(revision, str):
 
            if revision.isdigit() and (len(revision) < 12 or len(revision) == revision.count('0')):
kallithea/lib/vcs/backends/hg/repository.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
"""
 
    vcs.backends.hg.repository
 
    ~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
    Mercurial repository implementation.
 

	
 
    :created_on: Apr 8, 2010
 
    :copyright: (c) 2010-2011 by Marcin Kuzminski, Lukasz Balcerzak.
 
"""
 

	
 
import datetime
 
import logging
 
import os
 
import time
 
import urllib.error
 
import urllib.parse
 
import urllib.request
 
from collections import OrderedDict
 

	
 
import mercurial.commands
 
import mercurial.error
 
import mercurial.exchange
 
import mercurial.hg
 
import mercurial.hgweb
 
import mercurial.httppeer
 
import mercurial.localrepo
 
import mercurial.match
 
import mercurial.mdiff
 
import mercurial.node
 
import mercurial.patch
 
import mercurial.scmutil
 
import mercurial.sshpeer
 
import mercurial.tags
 
import mercurial.ui
 
import mercurial.unionrepo
 
import mercurial.util
 

	
 

	
 
try:
 
    from mercurial.utils.urlutil import url as hg_url
 
except ImportError:  # urlutil was introduced in Mercurial 5.8
 
    from mercurial.util import url as hg_url
 

	
 
from kallithea.lib.vcs.backends.base import BaseRepository, CollectionGenerator
 
from kallithea.lib.vcs.exceptions import (BranchDoesNotExistError, ChangesetDoesNotExistError, EmptyRepositoryError, RepositoryError, TagAlreadyExistError,
 
                                          TagDoesNotExistError, VCSError)
 
from kallithea.lib.vcs.utils import ascii_bytes, ascii_str, author_email, author_name, date_fromtimestamp, makedate, safe_bytes, safe_str
 
from kallithea.lib.vcs.utils.helpers import get_urllib_request_handlers
 
from kallithea.lib.vcs.utils.lazy import LazyProperty
 
from kallithea.lib.vcs.utils.paths import abspath
 

	
 
from . import changeset, inmemory, workdir
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class MercurialRepository(BaseRepository):
 
    """
 
    Mercurial repository backend
 
    """
 
    DEFAULT_BRANCH_NAME = 'default'
 
    scm = 'hg'
 

	
 
    def __init__(self, repo_path, create=False, baseui=None, src_url=None,
 
                 update_after_clone=False):
 
        """
 
        Raises RepositoryError if repository could not be find at the given
 
        ``repo_path``.
 

	
 
        :param repo_path: local path of the repository
 
        :param create=False: if set to True, would try to create repository if
 
           it does not exist rather than raising exception
 
        :param baseui=None: user data
 
        :param src_url=None: would try to clone repository from given location
 
        :param update_after_clone=False: sets update of working copy after
 
          making a clone
 
        """
 

	
 
        if not isinstance(repo_path, str):
 
            raise VCSError('Mercurial backend requires repository path to '
 
                           'be instance of <str> got %s instead' %
 
                           type(repo_path))
 
        self.path = abspath(repo_path)
 
        self.baseui = baseui or mercurial.ui.ui()
 
        # We've set path and ui, now we can set _repo itself
 
        self._repo = self._get_repo(create, src_url, update_after_clone)
 

	
 
    @property
 
    def _empty(self):
 
        """
 
        Checks if repository is empty ie. without any changesets
 
        """
 
        # TODO: Following raises errors when using InMemoryChangeset...
 
        # return len(self._repo.changelog) == 0
 
        return len(self.revisions) == 0
 

	
 
    @LazyProperty
 
    def revisions(self):
 
        """
 
        Returns list of revisions' ids, in ascending order.  Being lazy
 
        attribute allows external tools to inject shas from cache.
 
        """
 
        return self._get_all_revisions()
 

	
 
    @LazyProperty
 
    def name(self):
 
        return os.path.basename(self.path)
 

	
 
    @LazyProperty
 
    def branches(self):
 
        return self._get_branches()
 

	
 
    @LazyProperty
 
    def closed_branches(self):
 
        return self._get_branches(normal=False, closed=True)
 

	
 
    @LazyProperty
 
    def allbranches(self):
 
        """
 
        List all branches, including closed branches.
 
        """
 
        return self._get_branches(closed=True)
 

	
 
    def _get_branches(self, normal=True, closed=False):
 
        """
 
        Gets branches for this repository
 
        Returns only not closed branches by default
 

	
 
        :param closed: return also closed branches for mercurial
 
        :param normal: return also normal branches
 
        """
 

	
 
        if self._empty:
 
            return {}
 

	
 
        bt = OrderedDict()
 
        for bn, _heads, node, isclosed in sorted(self._repo.branchmap().iterbranches()):
 
@@ -243,193 +248,193 @@ class MercurialRepository(BaseRepository
 
        Returns (git like) *diff*, as plain text. Shows changes introduced by
 
        ``rev2`` since ``rev1``.
 

	
 
        :param rev1: Entry point from which diff is shown. Can be
 
          ``self.EMPTY_CHANGESET`` - in this case, patch showing all
 
          the changes since empty state of the repository until ``rev2``
 
        :param rev2: Until which revision changes should be shown.
 
        :param ignore_whitespace: If set to ``True``, would not show whitespace
 
          changes. Defaults to ``False``.
 
        :param context: How many lines before/after changed lines should be
 
          shown. Defaults to ``3``. If negative value is passed-in, it will be
 
          set to ``0`` instead.
 
        """
 

	
 
        # Negative context values make no sense, and will result in
 
        # errors. Ensure this does not happen.
 
        if context < 0:
 
            context = 0
 

	
 
        if hasattr(rev1, 'raw_id'):
 
            rev1 = getattr(rev1, 'raw_id')
 

	
 
        if hasattr(rev2, 'raw_id'):
 
            rev2 = getattr(rev2, 'raw_id')
 

	
 
        # Check if given revisions are present at repository (may raise
 
        # ChangesetDoesNotExistError)
 
        if rev1 != self.EMPTY_CHANGESET:
 
            self.get_changeset(rev1)
 
        self.get_changeset(rev2)
 
        if path:
 
            file_filter = mercurial.match.exact([safe_bytes(path)])
 
        else:
 
            file_filter = None
 

	
 
        return b''.join(mercurial.patch.diff(self._repo, rev1, rev2, match=file_filter,
 
                          opts=mercurial.mdiff.diffopts(git=True,
 
                                        showfunc=True,
 
                                        ignorews=ignore_whitespace,
 
                                        context=context)))
 

	
 
    @staticmethod
 
    def _check_url(url, repoui=None):
 
        r"""
 
        Raise URLError if url doesn't seem like a valid safe Hg URL. We
 
        only allow http, https, ssh, and hg-git URLs.
 

	
 
        For http, https and git URLs, make a connection and probe to see if it is valid.
 

	
 
        On failures it'll raise urllib2.HTTPError, exception is also thrown
 
        when the return code is non 200
 

	
 
        >>> MercurialRepository._check_url('file:///repo')
 

	
 
        >>> MercurialRepository._check_url('http://example.com:65537/repo')
 
        Traceback (most recent call last):
 
        ...
 
        urllib.error.URLError: <urlopen error Error parsing URL: 'http://example.com:65537/repo'>
 
        >>> MercurialRepository._check_url('foo')
 
        Traceback (most recent call last):
 
        ...
 
        urllib.error.URLError: <urlopen error Unsupported protocol in URL 'foo'>
 
        >>> MercurialRepository._check_url('git+ssh://example.com/my%20fine repo')
 
        Traceback (most recent call last):
 
        ...
 
        urllib.error.URLError: <urlopen error Unsupported protocol in URL 'git+ssh://example.com/my%20fine repo'>
 
        >>> MercurialRepository._check_url('svn+http://example.com/repo')
 
        Traceback (most recent call last):
 
        ...
 
        urllib.error.URLError: <urlopen error Unsupported protocol in URL 'svn+http://example.com/repo'>
 
        """
 
        try:
 
            parsed_url = urllib.parse.urlparse(url)
 
            parsed_url.port  # trigger netloc parsing which might raise ValueError
 
        except ValueError:
 
            raise urllib.error.URLError("Error parsing URL: %r" % url)
 

	
 
        # check first if it's not an local url
 
        if os.path.isabs(url) and os.path.isdir(url) or parsed_url.scheme == 'file':
 
            # When creating repos, _get_url will use file protocol for local paths
 
            return
 

	
 
        if parsed_url.scheme not in ['http', 'https', 'ssh', 'git+http', 'git+https']:
 
            raise urllib.error.URLError("Unsupported protocol in URL %r" % url)
 

	
 
        url = safe_bytes(url)
 

	
 
        if parsed_url.scheme == 'ssh':
 
            # in case of invalid uri or authentication issues, sshpeer will
 
            # throw an exception.
 
            mercurial.sshpeer.instance(repoui or mercurial.ui.ui(), url, False).lookup(b'tip')
 
            return
 

	
 
        if '+' in parsed_url.scheme:  # strip 'git+' for hg-git URLs
 
            url = url.split(b'+', 1)[1]
 

	
 
        url_obj = mercurial.util.url(url)
 
        url_obj = hg_url(url)
 
        test_uri, handlers = get_urllib_request_handlers(url_obj)
 

	
 
        url_obj.passwd = b'*****'
 
        cleaned_uri = str(url_obj)
 

	
 
        o = urllib.request.build_opener(*handlers)
 
        o.addheaders = [('Content-Type', 'application/mercurial-0.1'),
 
                        ('Accept', 'application/mercurial-0.1')]
 

	
 
        req = urllib.request.Request(
 
            "%s?%s" % (
 
                safe_str(test_uri),
 
                urllib.parse.urlencode({
 
                    'cmd': 'between',
 
                    'pairs': "%s-%s" % ('0' * 40, '0' * 40),
 
                })
 
            ))
 

	
 
        try:
 
            resp = o.open(req)
 
            if resp.code != 200:
 
                raise Exception('Return Code is not 200')
 
        except Exception as e:
 
            # means it cannot be cloned
 
            raise urllib.error.URLError("[%s] org_exc: %s" % (cleaned_uri, e))
 

	
 
        if parsed_url.scheme in ['http', 'https']:  # skip git+http://... etc
 
            # now check if it's a proper hg repo
 
            try:
 
                mercurial.httppeer.instance(repoui or mercurial.ui.ui(), url, False).lookup(b'tip')
 
            except Exception as e:
 
                raise urllib.error.URLError(
 
                    "url [%s] does not look like an hg repo org_exc: %s"
 
                    % (cleaned_uri, e))
 

	
 
    def _get_repo(self, create, src_url=None, update_after_clone=False):
 
        """
 
        Function will check for mercurial repository in given path and return
 
        a localrepo object. If there is no repository in that path it will
 
        raise an exception unless ``create`` parameter is set to True - in
 
        that case repository would be created and returned.
 
        If ``src_url`` is given, would try to clone repository from the
 
        location at given clone_point. Additionally it'll make update to
 
        working copy accordingly to ``update_after_clone`` flag
 
        """
 
        try:
 
            if src_url:
 
                url = self._get_url(src_url)
 
                opts = {}
 
                if not update_after_clone:
 
                    opts.update({'noupdate': True})
 
                MercurialRepository._check_url(url, self.baseui)
 
                mercurial.commands.clone(self.baseui, safe_bytes(url), safe_bytes(self.path), **opts)
 

	
 
                # Don't try to create if we've already cloned repo
 
                create = False
 
            return mercurial.localrepo.instance(self.baseui, safe_bytes(self.path), create=create)
 
        except (mercurial.error.Abort, mercurial.error.RepoError) as err:
 
            if create:
 
                msg = "Cannot create repository at %s. Original error was %s" \
 
                    % (self.name, err)
 
            else:
 
                msg = "Not valid repository at %s. Original error was %s" \
 
                    % (self.name, err)
 
            raise RepositoryError(msg)
 

	
 
    @LazyProperty
 
    def in_memory_changeset(self):
 
        return inmemory.MercurialInMemoryChangeset(self)
 

	
 
    @LazyProperty
 
    def description(self):
 
        _desc = self._repo.ui.config(b'web', b'description', None, untrusted=True)
 
        return safe_str(_desc or b'unknown')
 

	
 
    @LazyProperty
 
    def last_change(self):
 
        """
 
        Returns last change made on this repository as datetime object
 
        """
 
        return date_fromtimestamp(self._get_mtime(), makedate()[1])
 

	
 
    def _get_mtime(self):
 
        try:
 
            return time.mktime(self.get_changeset().date.timetuple())
 
        except RepositoryError:
 
            # fallback to filesystem
 
            cl_path = os.path.join(self.path, '.hg', "00changelog.i")
 
            st_path = os.path.join(self.path, '.hg', "store")
 
            if os.path.exists(cl_path):
 
                return os.stat(cl_path).st_mtime
 
            else:
 
                return os.stat(st_path).st_mtime
 

	
 
    def _get_revision(self, revision):
 
        """
kallithea/lib/vcs/utils/helpers.py
Show inline comments
 
@@ -152,114 +152,114 @@ def parse_changesets(text):
 
        >>> parse_changesets('aaabbb..cccddd')
 
        {'start': 'aaabbb', 'end': 'cccddd', 'main': None}
 

	
 
    """
 
    text = text.strip()
 
    CID_RE = r'[a-zA-Z0-9]+'
 
    if '..' not in text:
 
        m = re.match(r'^(?P<cid>%s)$' % CID_RE, text)
 
        if m:
 
            return {
 
                'start': None,
 
                'main': text,
 
                'end': None,
 
            }
 
    else:
 
        RE = r'^(?P<start>%s)?\.{2,3}(?P<end>%s)?$' % (CID_RE, CID_RE)
 
        m = re.match(RE, text)
 
        if m:
 
            result = m.groupdict()
 
            result['main'] = None
 
            return result
 
    raise ValueError("IDs not recognized")
 

	
 

	
 
def parse_datetime(text):
 
    """
 
    Parses given text and returns ``datetime.datetime`` instance or raises
 
    ``ValueError``.
 

	
 
    :param text: string of desired date/datetime or something more verbose,
 
      like *yesterday*, *2weeks 3days*, etc.
 
    """
 

	
 
    text = text.strip().lower()
 

	
 
    INPUT_FORMATS = (
 
        '%Y-%m-%d %H:%M:%S',
 
        '%Y-%m-%d %H:%M',
 
        '%Y-%m-%d',
 
        '%m/%d/%Y %H:%M:%S',
 
        '%m/%d/%Y %H:%M',
 
        '%m/%d/%Y',
 
        '%m/%d/%y %H:%M:%S',
 
        '%m/%d/%y %H:%M',
 
        '%m/%d/%y',
 
    )
 
    for format in INPUT_FORMATS:
 
        try:
 
            return datetime.datetime(*time.strptime(text, format)[:6])
 
        except ValueError:
 
            pass
 

	
 
    # Try descriptive texts
 
    if text == 'tomorrow':
 
        future = datetime.datetime.now() + datetime.timedelta(days=1)
 
        args = future.timetuple()[:3] + (23, 59, 59)
 
        return datetime.datetime(*args)
 
    elif text == 'today':
 
        return datetime.datetime(*datetime.datetime.today().timetuple()[:3])
 
    elif text == 'now':
 
        return datetime.datetime.now()
 
    elif text == 'yesterday':
 
        past = datetime.datetime.now() - datetime.timedelta(days=1)
 
        return datetime.datetime(*past.timetuple()[:3])
 
    else:
 
        days = 0
 
        matched = re.match(
 
            r'^((?P<weeks>\d+) ?w(eeks?)?)? ?((?P<days>\d+) ?d(ays?)?)?$', text)
 
        if matched:
 
            groupdict = matched.groupdict()
 
            if groupdict['days']:
 
                days += int(matched.groupdict()['days'])
 
            if groupdict['weeks']:
 
                days += int(matched.groupdict()['weeks']) * 7
 
            past = datetime.datetime.now() - datetime.timedelta(days=days)
 
            return datetime.datetime(*past.timetuple()[:3])
 

	
 
    raise ValueError('Wrong date: "%s"' % text)
 

	
 

	
 
def get_dict_for_attrs(obj, attrs):
 
    """
 
    Returns dictionary for each attribute from given ``obj``.
 
    """
 
    data = {}
 
    for attr in attrs:
 
        data[attr] = getattr(obj, attr)
 
    return data
 

	
 
def get_urllib_request_handlers(url_obj):
 
    handlers = []
 
    test_uri, authinfo = url_obj.authinfo()
 

	
 
    if authinfo:
 
        # authinfo is a tuple (realm, uris, user, password) where 'uris' itself
 
        # is a tuple of URIs.
 
        # If url_obj is obtained via mercurial.util.url, the obtained authinfo
 
        # If url_obj is obtained via mercurial urlutil, the obtained authinfo
 
        # values will be bytes, e.g.
 
        #    (None, (b'http://127.0.0.1/repo', b'127.0.0.1'), b'user', b'pass')
 
        # However, urllib expects strings, not bytes, so we must convert them.
 

	
 
        # create a password manager
 
        passmgr = urllib.request.HTTPPasswordMgrWithDefaultRealm()
 
        passmgr.add_password(
 
            safe_str(authinfo[0]) if authinfo[0] else None, # realm
 
            tuple(safe_str(x) for x in authinfo[1]),        # uris
 
            safe_str(authinfo[2]),                          # user
 
            safe_str(authinfo[3]),                          # password
 
        )
 

	
 
        handlers.extend((mercurial.url.httpbasicauthhandler(passmgr),
 
                         mercurial.url.httpdigestauthhandler(passmgr)))
 

	
 
    return test_uri, handlers
0 comments (0 inline, 0 general)