Changeset - f0fbb0fe4462
[Not reviewed]
default
0 1 0
Mads Kiilerich (mads) - 5 years ago 2021-05-18 00:58:06
mads@kiilerich.com
git: update check for invalid URL characters to work with Python versions that include an attempt at fixing the very same problem

With changes like
https://github.com/python/cpython/commit/76cd81d60310d65d01f9d7b48a8985d8ab89c8b4
making it to Python 3.10 and being backported to previous Python versions, the
approach in a8a51a3bdb61 no longer works when combined with
urllib.parse.urlparse in d2f59de17bef: path will never contain the invalid
characters.

To catch this case anyway, add a new check to verify that the parsed URL can
roundtrip back to the original representation with urllib.parse.urlunparse .

The actual exception might vary, but one of them should always fire.

There is a risk that the new check will reject some URLs that somehow isn't
normalized. No such cases have been found yet.
1 file changed with 8 insertions and 0 deletions:
0 comments (0 inline, 0 general)
kallithea/lib/vcs/backends/git/repository.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
"""
 
    vcs.backends.git.repository
 
    ~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
    Git repository implementation.
 

	
 
    :created_on: Apr 8, 2010
 
    :copyright: (c) 2010-2011 by Marcin Kuzminski, Lukasz Balcerzak.
 
"""
 

	
 
import errno
 
import logging
 
import os
 
import re
 
import time
 
import urllib.error
 
import urllib.parse
 
import urllib.request
 
from collections import OrderedDict
 

	
 
import mercurial.util  # import url as hg_url
 
from dulwich.client import SubprocessGitClient
 
from dulwich.config import ConfigFile
 
from dulwich.objects import Tag
 
from dulwich.repo import NotGitRepository, Repo
 
from dulwich.server import update_server_info
 

	
 
from kallithea.lib.vcs import subprocessio
 
from kallithea.lib.vcs.backends.base import BaseRepository, CollectionGenerator
 
from kallithea.lib.vcs.conf import settings
 
from kallithea.lib.vcs.exceptions import (BranchDoesNotExistError, ChangesetDoesNotExistError, EmptyRepositoryError, RepositoryError, TagAlreadyExistError,
 
                                          TagDoesNotExistError)
 
from kallithea.lib.vcs.utils import ascii_bytes, ascii_str, date_fromtimestamp, makedate, safe_bytes, safe_str
 
from kallithea.lib.vcs.utils.helpers import get_urllib_request_handlers
 
from kallithea.lib.vcs.utils.lazy import LazyProperty
 
from kallithea.lib.vcs.utils.paths import abspath, get_user_home
 

	
 
from . import changeset, inmemory, workdir
 

	
 

	
 
SHA_PATTERN = re.compile(r'^([0-9a-fA-F]{12}|[0-9a-fA-F]{40})$')
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class GitRepository(BaseRepository):
 
    """
 
    Git repository backend.
 
    """
 
    DEFAULT_BRANCH_NAME = 'master'
 
    scm = 'git'
 

	
 
    def __init__(self, repo_path, create=False, src_url=None,
 
                 update_after_clone=False, bare=False, baseui=None):
 
        baseui  # unused
 
        self.path = abspath(repo_path)
 
        self.repo = self._get_repo(create, src_url, update_after_clone, bare)
 
        self.bare = self.repo.bare
 

	
 
    @property
 
    def _config_files(self):
 
        return [
 
            self.bare and abspath(self.path, 'config')
 
                      or abspath(self.path, '.git', 'config'),
 
             abspath(get_user_home(), '.gitconfig'),
 
         ]
 

	
 
    @property
 
    def _repo(self):
 
        return self.repo
 

	
 
    @property
 
    def head(self):
 
        try:
 
            return self._repo.head()
 
        except KeyError:
 
            return None
 

	
 
    @property
 
    def _empty(self):
 
        """
 
        Checks if repository is empty ie. without any changesets
 
        """
 

	
 
        try:
 
            self.revisions[0]
 
        except (KeyError, IndexError):
 
            return True
 
        return False
 

	
 
    @LazyProperty
 
    def revisions(self):
 
        """
 
        Returns list of revisions' ids, in ascending order.  Being lazy
 
        attribute allows external tools to inject shas from cache.
 
        """
 
        return self._get_all_revisions()
 

	
 
    @classmethod
 
    def _run_git_command(cls, cmd, cwd=None):
 
        """
 
        Runs given ``cmd`` as git command and returns output bytes in a tuple
 
        (stdout, stderr) ... or raise RepositoryError.
 

	
 
        :param cmd: git command to be executed
 
        :param cwd: passed directly to subprocess
 
        """
 
        # need to clean fix GIT_DIR !
 
        gitenv = dict(os.environ)
 
        gitenv.pop('GIT_DIR', None)
 
        gitenv['GIT_CONFIG_NOGLOBAL'] = '1'
 

	
 
        assert isinstance(cmd, list), cmd
 
        cmd = [settings.GIT_EXECUTABLE_PATH, '-c', 'core.quotepath=false'] + cmd
 
        try:
 
            p = subprocessio.SubprocessIOChunker(cmd, cwd=cwd, env=gitenv, shell=False)
 
        except (EnvironmentError, OSError) as err:
 
            # output from the failing process is in str(EnvironmentError)
 
            msg = ("Couldn't run git command %s.\n"
 
                   "Subprocess failed with '%s': %s\n" %
 
                   (cmd, type(err).__name__, err)
 
            ).strip()
 
            log.error(msg)
 
            raise RepositoryError(msg)
 

	
 
        try:
 
            stdout = b''.join(p.output)
 
            stderr = b''.join(p.error)
 
        finally:
 
            p.close()
 
        # TODO: introduce option to make commands fail if they have any stderr output?
 
        if stderr:
 
            log.debug('stderr from %s:\n%s', cmd, stderr)
 
        else:
 
            log.debug('stderr from %s: None', cmd)
 
        return stdout, stderr
 

	
 
    def run_git_command(self, cmd):
 
        """
 
        Runs given ``cmd`` as git command with cwd set to current repo.
 
        Returns stdout as unicode str ... or raise RepositoryError.
 
        """
 
        cwd = None
 
        if os.path.isdir(self.path):
 
            cwd = self.path
 
        stdout, _stderr = self._run_git_command(cmd, cwd=cwd)
 
        return safe_str(stdout)
 

	
 
    @staticmethod
 
    def _check_url(url):
 
        r"""
 
        Raise URLError if url doesn't seem like a valid safe Git URL. We
 
        only allow http, https, git, and ssh URLs.
 

	
 
        For http and https URLs, make a connection and probe to see if it is valid.
 

	
 
        >>> GitRepository._check_url('git://example.com/my%20fine repo')
 

	
 
        >>> GitRepository._check_url('http://example.com:65537/repo')
 
        Traceback (most recent call last):
 
        ...
 
        urllib.error.URLError: <urlopen error Error parsing URL: 'http://example.com:65537/repo'>
 
        >>> GitRepository._check_url('foo')
 
        Traceback (most recent call last):
 
        ...
 
        urllib.error.URLError: <urlopen error Unsupported protocol in URL 'foo'>
 
        >>> GitRepository._check_url('file:///repo')
 
        Traceback (most recent call last):
 
        ...
 
        urllib.error.URLError: <urlopen error Unsupported protocol in URL 'file:///repo'>
 
        >>> GitRepository._check_url('git+http://example.com/repo')
 
        Traceback (most recent call last):
 
        ...
 
        urllib.error.URLError: <urlopen error Unsupported protocol in URL 'git+http://example.com/repo'>
 
        >>> GitRepository._check_url('git://example.com/%09')
 
        Traceback (most recent call last):
 
        ...
 
        urllib.error.URLError: <urlopen error Invalid escape character in path: '%'>
 
        >>> GitRepository._check_url('git://example.com/%x00')
 
        Traceback (most recent call last):
 
        ...
 
        urllib.error.URLError: <urlopen error Invalid escape character in path: '%'>
 
        >>> GitRepository._check_url(r'git://example.com/\u0009')
 
        Traceback (most recent call last):
 
        ...
 
        urllib.error.URLError: <urlopen error Invalid escape character in path: '\'>
 
        >>> GitRepository._check_url(r'git://example.com/\t')
 
        Traceback (most recent call last):
 
        ...
 
        urllib.error.URLError: <urlopen error Invalid escape character in path: '\'>
 
        >>> GitRepository._check_url('git://example.com/\t')
 
        Traceback (most recent call last):
 
        ...
 
        urllib.error.URLError: <urlopen error Invalid ...>
 

	
 
        The failure above will be one of, depending on the level of WhatWG support:
 
        urllib.error.URLError: <urlopen error Invalid whitespace character in path: '\t'>
 
        urllib.error.URLError: <urlopen error Invalid url: 'git://example.com/	' normalizes to 'git://example.com/'>
 
        """
 
        try:
 
            parsed_url = urllib.parse.urlparse(url)
 
            parsed_url.port  # trigger netloc parsing which might raise ValueError
 
        except ValueError:
 
            raise urllib.error.URLError("Error parsing URL: %r" % url)
 

	
 
        # check first if it's not an local url
 
        if os.path.isabs(url) and os.path.isdir(url):
 
            return
 

	
 
        unparsed_url = urllib.parse.urlunparse(parsed_url)
 
        if unparsed_url != url:
 
            raise urllib.error.URLError("Invalid url: '%s' normalizes to '%s'" % (url, unparsed_url))
 

	
 
        if parsed_url.scheme == 'git':
 
            # Mitigate problems elsewhere with incorrect handling of encoded paths.
 
            # Don't trust urllib.parse.unquote but be prepared for more flexible implementations elsewhere.
 
            # Space is the only allowed whitespace character - directly or % encoded. No other % or \ is allowed.
 
            for c in parsed_url.path.replace('%20', ' '):
 
                if c in '%\\':
 
                    raise urllib.error.URLError("Invalid escape character in path: '%s'" % c)
 
                if c.isspace() and c != ' ':
 
                    raise urllib.error.URLError("Invalid whitespace character in path: %r" % c)
 
            return
 

	
 
        if parsed_url.scheme not in ['http', 'https']:
 
            raise urllib.error.URLError("Unsupported protocol in URL %r" % url)
 

	
 
        url_obj = mercurial.util.url(safe_bytes(url))
 
        test_uri, handlers = get_urllib_request_handlers(url_obj)
 
        if not test_uri.endswith(b'info/refs'):
 
            test_uri = test_uri.rstrip(b'/') + b'/info/refs'
 

	
 
        url_obj.passwd = b'*****'
 
        cleaned_uri = str(url_obj)
 

	
 
        o = urllib.request.build_opener(*handlers)
 
        o.addheaders = [('User-Agent', 'git/1.7.8.0')]  # fake some git
 

	
 
        req = urllib.request.Request(
 
            "%s?%s" % (
 
                safe_str(test_uri),
 
                urllib.parse.urlencode({"service": 'git-upload-pack'})
 
            ))
 

	
 
        try:
 
            resp = o.open(req)
 
            if resp.code != 200:
 
                raise Exception('Return Code is not 200')
 
        except Exception as e:
 
            # means it cannot be cloned
 
            raise urllib.error.URLError("[%s] org_exc: %s" % (cleaned_uri, e))
 

	
 
        # now detect if it's proper git repo
 
        gitdata = resp.read()
 
        if b'service=git-upload-pack' not in gitdata:
 
            raise urllib.error.URLError(
 
                "url [%s] does not look like an git" % cleaned_uri)
 

	
 
    def _get_repo(self, create, src_url=None, update_after_clone=False,
 
                  bare=False):
 
        if create and os.path.exists(self.path):
 
            raise RepositoryError("Location already exist")
 
        if src_url and not create:
 
            raise RepositoryError("Create should be set to True if src_url is "
 
                                  "given (clone operation creates repository)")
 
        try:
 
            if create and src_url:
 
                GitRepository._check_url(src_url)
 
                self.clone(src_url, update_after_clone, bare)
 
                return Repo(self.path)
 
            elif create:
 
                os.makedirs(self.path)
 
                if bare:
 
                    return Repo.init_bare(self.path)
 
                else:
 
                    return Repo.init(self.path)
 
            else:
 
                return Repo(self.path)
 
        except (NotGitRepository, OSError) as err:
 
            raise RepositoryError(err)
 

	
 
    def _get_all_revisions(self):
 
        # we must check if this repo is not empty, since later command
 
        # fails if it is. And it's cheaper to ask than throw the subprocess
 
        # errors
 
        try:
 
            self._repo.head()
 
        except KeyError:
 
            return []
 

	
 
        rev_filter = settings.GIT_REV_FILTER
 
        cmd = ['rev-list', rev_filter, '--reverse', '--date-order']
 
        try:
 
            so = self.run_git_command(cmd)
 
        except RepositoryError:
 
            # Can be raised for empty repositories
 
            return []
 
        return so.splitlines()
 

	
 
    def _get_all_revisions2(self):
 
        # alternate implementation using dulwich
 
        includes = [ascii_str(sha) for key, (sha, type_) in self._parsed_refs.items()
 
                    if type_ != b'T']
 
        return [c.commit.id for c in self._repo.get_walker(include=includes)]
 

	
 
    def _get_revision(self, revision):
 
        """
 
        Given any revision identifier, returns a 40 char string with revision hash.
 
        """
 
        if self._empty:
 
            raise EmptyRepositoryError("There are no changesets yet")
 

	
 
        if revision in (None, '', 'tip', 'HEAD', 'head', -1):
 
            revision = -1
 

	
 
        if isinstance(revision, int):
 
            try:
 
                return self.revisions[revision]
 
            except IndexError:
 
                msg = "Revision %r does not exist for %s" % (revision, self.name)
 
                raise ChangesetDoesNotExistError(msg)
 

	
 
        if isinstance(revision, str):
 
            if revision.isdigit() and (len(revision) < 12 or len(revision) == revision.count('0')):
 
                try:
 
                    return self.revisions[int(revision)]
 
                except IndexError:
 
                    msg = "Revision %r does not exist for %s" % (revision, self)
 
                    raise ChangesetDoesNotExistError(msg)
 

	
 
            # get by branch/tag name
 
            _ref_revision = self._parsed_refs.get(safe_bytes(revision))
 
            if _ref_revision:  # and _ref_revision[1] in [b'H', b'RH', b'T']:
 
                return ascii_str(_ref_revision[0])
 

	
 
            if revision in self.revisions:
 
                return revision
 

	
 
            # maybe it's a tag ? we don't have them in self.revisions
 
            if revision in self.tags.values():
 
                return revision
 

	
 
            if SHA_PATTERN.match(revision):
 
                msg = "Revision %r does not exist for %s" % (revision, self.name)
 
                raise ChangesetDoesNotExistError(msg)
 

	
 
        raise ChangesetDoesNotExistError("Given revision %r not recognized" % revision)
 

	
 
    def get_ref_revision(self, ref_type, ref_name):
 
        """
 
        Returns ``GitChangeset`` object representing repository's
 
        changeset at the given ``revision``.
 
        """
 
        return self._get_revision(ref_name)
 

	
 
    def _get_archives(self, archive_name='tip'):
 

	
 
        for i in [('zip', '.zip'), ('gz', '.tar.gz'), ('bz2', '.tar.bz2')]:
 
            yield {"type": i[0], "extension": i[1], "node": archive_name}
 

	
 
    def _get_url(self, url):
 
        """
 
        Returns normalized url. If schema is not given, would fall to
 
        filesystem (``file:///``) schema.
 
        """
 
        if url != 'default' and '://' not in url:
 
            url = ':///'.join(('file', url))
 
        return url
 

	
 
    @LazyProperty
 
    def name(self):
 
        return os.path.basename(self.path)
 

	
 
    @LazyProperty
 
    def last_change(self):
 
        """
 
        Returns last change made on this repository as datetime object
 
        """
 
        return date_fromtimestamp(self._get_mtime(), makedate()[1])
 

	
 
    def _get_mtime(self):
 
        try:
 
            return time.mktime(self.get_changeset().date.timetuple())
 
        except RepositoryError:
 
            idx_loc = '' if self.bare else '.git'
 
            # fallback to filesystem
 
            in_path = os.path.join(self.path, idx_loc, "index")
 
            he_path = os.path.join(self.path, idx_loc, "HEAD")
 
            if os.path.exists(in_path):
 
                return os.stat(in_path).st_mtime
 
            else:
 
                return os.stat(he_path).st_mtime
 

	
 
    @LazyProperty
 
    def description(self):
 
        return safe_str(self._repo.get_description() or b'unknown')
 

	
 
    @property
 
    def branches(self):
 
        if not self.revisions:
 
            return {}
 
        _branches = [(safe_str(key), ascii_str(sha))
 
                     for key, (sha, type_) in self._parsed_refs.items() if type_ == b'H']
 
        return OrderedDict(sorted(_branches, key=(lambda ctx: ctx[0]), reverse=False))
 

	
 
    @LazyProperty
 
    def closed_branches(self):
 
        return {}
 

	
 
    @LazyProperty
 
    def tags(self):
 
        return self._get_tags()
 

	
 
    def _get_tags(self):
 
        if not self.revisions:
 
            return {}
 
        _tags = [(safe_str(key), ascii_str(sha))
 
                 for key, (sha, type_) in self._parsed_refs.items() if type_ == b'T']
 
        return OrderedDict(sorted(_tags, key=(lambda ctx: ctx[0]), reverse=True))
 

	
 
    def tag(self, name, user, revision=None, message=None, date=None,
 
            **kwargs):
 
        """
 
        Creates and returns a tag for the given ``revision``.
 

	
 
        :param name: name for new tag
 
        :param user: full username, i.e.: "Joe Doe <joe.doe@example.com>"
 
        :param revision: changeset id for which new tag would be created
 
        :param message: message of the tag's commit
 
        :param date: date of tag's commit
 

	
 
        :raises TagAlreadyExistError: if tag with same name already exists
 
        """
 
        if name in self.tags:
 
            raise TagAlreadyExistError("Tag %s already exists" % name)
 
        changeset = self.get_changeset(revision)
 
        message = message or "Added tag %s for commit %s" % (name,
 
            changeset.raw_id)
 
        self._repo.refs[b"refs/tags/%s" % safe_bytes(name)] = changeset._commit.id
 

	
 
        self._parsed_refs = self._get_parsed_refs()
 
        self.tags = self._get_tags()
 
        return changeset
 

	
 
    def remove_tag(self, name, user, message=None, date=None):
 
        """
 
        Removes tag with the given ``name``.
 

	
 
        :param name: name of the tag to be removed
 
        :param user: full username, i.e.: "Joe Doe <joe.doe@example.com>"
 
        :param message: message of the tag's removal commit
 
        :param date: date of tag's removal commit
 

	
 
        :raises TagDoesNotExistError: if tag with given name does not exists
 
        """
 
        if name not in self.tags:
 
            raise TagDoesNotExistError("Tag %s does not exist" % name)
 
        # self._repo.refs is a DiskRefsContainer, and .path gives the full absolute path of '.git'
 
        tagpath = os.path.join(safe_str(self._repo.refs.path), 'refs', 'tags', name)
 
        try:
 
            os.remove(tagpath)
 
            self._parsed_refs = self._get_parsed_refs()
 
            self.tags = self._get_tags()
 
        except OSError as e:
 
            raise RepositoryError(e.strerror)
 

	
 
    @LazyProperty
 
    def bookmarks(self):
 
        """
 
        Gets bookmarks for this repository
 
        """
 
        return {}
 

	
 
    @LazyProperty
 
    def _parsed_refs(self):
 
        return self._get_parsed_refs()
 

	
 
    def _get_parsed_refs(self):
 
        """Return refs as a dict, like:
 
        { b'v0.2.0': [b'599ba911aa24d2981225f3966eb659dfae9e9f30', b'T'] }
 
        """
 
        _repo = self._repo
 
        refs = _repo.get_refs()
 
        keys = [(b'refs/heads/', b'H'),
 
                (b'refs/remotes/origin/', b'RH'),
 
                (b'refs/tags/', b'T')]
 
        _refs = {}
 
        for ref, sha in refs.items():
 
            for k, type_ in keys:
 
                if ref.startswith(k):
 
                    _key = ref[len(k):]
 
                    if type_ == b'T':
 
                        obj = _repo.get_object(sha)
 
                        if isinstance(obj, Tag):
 
                            sha = _repo.get_object(sha).object[1]
 
                    _refs[_key] = [sha, type_]
 
                    break
 
        return _refs
 

	
 
    def _heads(self, reverse=False):
 
        refs = self._repo.get_refs()
 
        heads = {}
 

	
 
        for key, val in refs.items():
 
            for ref_key in [b'refs/heads/', b'refs/remotes/origin/']:
 
                if key.startswith(ref_key):
 
                    n = key[len(ref_key):]
 
                    if n not in [b'HEAD']:
 
                        heads[n] = val
 

	
 
        return heads if reverse else dict((y, x) for x, y in heads.items())
 

	
 
    def get_changeset(self, revision=None):
 
        """
 
        Returns ``GitChangeset`` object representing commit from git repository
 
        at the given revision or head (most recent commit) if None given.
 
        """
 
        if isinstance(revision, changeset.GitChangeset):
 
            return revision
 
        return changeset.GitChangeset(repository=self, revision=self._get_revision(revision))
 

	
 
    def get_changesets(self, start=None, end=None, start_date=None,
 
           end_date=None, branch_name=None, reverse=False, max_revisions=None):
 
        """
 
        Returns iterator of ``GitChangeset`` objects from start to end (both
 
        are inclusive), in ascending date order (unless ``reverse`` is set).
 

	
 
        :param start: changeset ID, as str; first returned changeset
 
        :param end: changeset ID, as str; last returned changeset
 
        :param start_date: if specified, changesets with commit date less than
 
          ``start_date`` would be filtered out from returned set
 
        :param end_date: if specified, changesets with commit date greater than
 
          ``end_date`` would be filtered out from returned set
 
        :param branch_name: if specified, changesets not reachable from given
 
          branch would be filtered out from returned set
 
        :param reverse: if ``True``, returned generator would be reversed
 
          (meaning that returned changesets would have descending date order)
 

	
 
        :raise BranchDoesNotExistError: If given ``branch_name`` does not
 
            exist.
 
        :raise ChangesetDoesNotExistError: If changeset for given ``start`` or
 
          ``end`` could not be found.
 

	
 
        """
 
        if branch_name and branch_name not in self.branches:
 
            raise BranchDoesNotExistError("Branch '%s' not found"
 
                                          % branch_name)
 
        # actually we should check now if it's not an empty repo to not spaw
 
        # subprocess commands
 
        if self._empty:
 
            raise EmptyRepositoryError("There are no changesets yet")
 

	
 
        # %H at format means (full) commit hash, initial hashes are retrieved
 
        # in ascending date order
 
        cmd = ['log', '--date-order', '--reverse', '--pretty=format:%H']
 
        if max_revisions:
 
            cmd += ['--max-count=%s' % max_revisions]
 
        if start_date:
 
            cmd += ['--since', start_date.strftime('%m/%d/%y %H:%M:%S')]
 
        if end_date:
 
            cmd += ['--until', end_date.strftime('%m/%d/%y %H:%M:%S')]
 
        if branch_name:
 
            cmd.append(branch_name)
 
        else:
 
            cmd.append(settings.GIT_REV_FILTER)
 

	
 
        revs = self.run_git_command(cmd).splitlines()
 
        start_pos = 0
 
        end_pos = len(revs)
 
        if start:
 
            _start = self._get_revision(start)
 
            try:
 
                start_pos = revs.index(_start)
 
            except ValueError:
 
                pass
 

	
 
        if end is not None:
 
            _end = self._get_revision(end)
 
            try:
 
                end_pos = revs.index(_end)
 
            except ValueError:
 
                pass
 

	
 
        if None not in [start, end] and start_pos > end_pos:
 
            raise RepositoryError('start cannot be after end')
 

	
 
        if end_pos is not None:
 
            end_pos += 1
 

	
 
        revs = revs[start_pos:end_pos]
 
        if reverse:
 
            revs.reverse()
 

	
 
        return CollectionGenerator(self, revs)
 

	
 
    def get_diff_changesets(self, org_rev, other_repo, other_rev):
 
        """
 
        Returns lists of changesets that can be merged from this repo @org_rev
 
        to other_repo @other_rev
 
        ... and the other way
 
        ... and the ancestors that would be used for merge
 

	
 
        :param org_rev: the revision we want our compare to be made
 
        :param other_repo: repo object, most likely the fork of org_repo. It has
 
            all changesets that we need to obtain
 
        :param other_rev: revision we want out compare to be made on other_repo
 
        """
 
        org_changesets = []
 
        ancestors = None
 
        if org_rev == other_rev:
 
            other_changesets = []
 
        elif self != other_repo:
 
            gitrepo = Repo(self.path)
 
            SubprocessGitClient(thin_packs=False).fetch(other_repo.path, gitrepo)
 

	
 
            gitrepo_remote = Repo(other_repo.path)
 
            SubprocessGitClient(thin_packs=False).fetch(self.path, gitrepo_remote)
 

	
 
            revs = [
 
                ascii_str(x.commit.id)
 
                for x in gitrepo_remote.get_walker(include=[ascii_bytes(other_rev)],
 
                                                   exclude=[ascii_bytes(org_rev)])
 
            ]
 
            other_changesets = [other_repo.get_changeset(rev) for rev in reversed(revs)]
 
            if other_changesets:
 
                ancestors = [other_changesets[0].parents[0].raw_id]
 
            else:
 
                # no changesets from other repo, ancestor is the other_rev
 
                ancestors = [other_rev]
 

	
 
            gitrepo.close()
 
            gitrepo_remote.close()
 

	
 
        else:
 
            so = self.run_git_command(
 
                ['log', '--reverse', '--pretty=format:%H',
 
                 '-s', '%s..%s' % (org_rev, other_rev)]
 
            )
 
            other_changesets = [self.get_changeset(cs)
 
                          for cs in re.findall(r'[0-9a-fA-F]{40}', so)]
 
            so = self.run_git_command(
 
                ['merge-base', org_rev, other_rev]
 
            )
 
            ancestors = [re.findall(r'[0-9a-fA-F]{40}', so)[0]]
 

	
 
        return other_changesets, org_changesets, ancestors
 

	
 
    def get_diff(self, rev1, rev2, path=None, ignore_whitespace=False,
 
                 context=3):
 
        """
 
        Returns (git like) *diff*, as plain bytes text. Shows changes
 
        introduced by ``rev2`` since ``rev1``.
 

	
 
        :param rev1: Entry point from which diff is shown. Can be
 
          ``self.EMPTY_CHANGESET`` - in this case, patch showing all
 
          the changes since empty state of the repository until ``rev2``
 
        :param rev2: Until which revision changes should be shown.
 
        :param ignore_whitespace: If set to ``True``, would not show whitespace
 
          changes. Defaults to ``False``.
 
        :param context: How many lines before/after changed lines should be
 
          shown. Defaults to ``3``. Due to limitations in Git, if
 
          value passed-in is greater than ``2**31-1``
 
          (``2147483647``), it will be set to ``2147483647``
 
          instead. If negative value is passed-in, it will be set to
 
          ``0`` instead.
 
        """
 

	
 
        # Git internally uses a signed long int for storing context
 
        # size (number of lines to show before and after the
 
        # differences). This can result in integer overflow, so we
 
        # ensure the requested context is smaller by one than the
 
        # number that would cause the overflow. It is highly unlikely
 
        # that a single file will contain that many lines, so this
 
        # kind of change should not cause any realistic consequences.
 
        overflowed_long_int = 2**31
 

	
 
        if context >= overflowed_long_int:
 
            context = overflowed_long_int - 1
 

	
 
        # Negative context values make no sense, and will result in
 
        # errors. Ensure this does not happen.
 
        if context < 0:
 
            context = 0
 

	
 
        flags = ['-U%s' % context, '--full-index', '--binary', '-p', '-M', '--abbrev=40']
 
        if ignore_whitespace:
 
            flags.append('-w')
 

	
 
        if hasattr(rev1, 'raw_id'):
 
            rev1 = getattr(rev1, 'raw_id')
 

	
 
        if hasattr(rev2, 'raw_id'):
 
            rev2 = getattr(rev2, 'raw_id')
 

	
 
        if rev1 == self.EMPTY_CHANGESET:
 
            rev2 = self.get_changeset(rev2).raw_id
 
            cmd = ['show'] + flags + [rev2]
 
        else:
 
            rev1 = self.get_changeset(rev1).raw_id
 
            rev2 = self.get_changeset(rev2).raw_id
 
            cmd = ['diff'] + flags + [rev1, rev2]
 

	
 
        if path:
 
            cmd += ['--', path]
 

	
 
        stdout, stderr = self._run_git_command(cmd, cwd=self.path)
 
        # If we used 'show' command, strip first few lines (until actual diff
 
        # starts)
 
        if rev1 == self.EMPTY_CHANGESET:
 
            parts = stdout.split(b'\ndiff ', 1)
 
            if len(parts) > 1:
 
                stdout = b'diff ' + parts[1]
 
        return stdout
 

	
 
    @LazyProperty
 
    def in_memory_changeset(self):
 
        """
 
        Returns ``GitInMemoryChangeset`` object for this repository.
 
        """
 
        return inmemory.GitInMemoryChangeset(self)
 

	
 
    def clone(self, url, update_after_clone=True, bare=False):
 
        """
 
        Tries to clone changes from external location.
 

	
 
        :param update_after_clone: If set to ``False``, git won't checkout
 
          working directory
 
        :param bare: If set to ``True``, repository would be cloned into
 
          *bare* git repository (no working directory at all).
 
        """
 
        url = self._get_url(url)
 
        cmd = ['clone', '-q']
 
        if bare:
 
            cmd.append('--bare')
 
        elif not update_after_clone:
 
            cmd.append('--no-checkout')
 
        cmd += ['--', url, self.path]
 
        # If error occurs run_git_command raises RepositoryError already
 
        self.run_git_command(cmd)
 

	
 
    def pull(self, url):
 
        """
 
        Tries to pull changes from external location.
 
        """
 
        url = self._get_url(url)
 
        cmd = ['pull', '--ff-only', url]
 
        # If error occurs run_git_command raises RepositoryError already
 
        self.run_git_command(cmd)
 

	
 
    def fetch(self, url):
 
        """
 
        Tries to pull changes from external location.
 
        """
 
        url = self._get_url(url)
 
        so = self.run_git_command(['ls-remote', '-h', url])
 
        cmd = ['fetch', url, '--']
 
        for line in so.splitlines():
 
            sha, ref = line.split('\t')
 
            cmd.append('+%s:%s' % (ref, ref))
 
        self.run_git_command(cmd)
 

	
 
    def _update_server_info(self):
 
        """
 
        runs gits update-server-info command in this repo instance
 
        """
 
        try:
 
            update_server_info(self._repo)
 
        except OSError as e:
 
            if e.errno not in [errno.ENOENT, errno.EROFS]:
 
                raise
 
            # Workaround for dulwich crashing on for example its own dulwich/tests/data/repos/simple_merge.git/info/refs.lock
 
            log.error('Ignoring %s running update-server-info: %s', type(e).__name__, e)
 

	
 
    @LazyProperty
 
    def workdir(self):
 
        """
 
        Returns ``Workdir`` instance for this repository.
 
        """
 
        return workdir.GitWorkdir(self)
 

	
 
    def get_config_value(self, section, name, config_file=None):
 
        """
 
        Returns configuration value for a given [``section``] and ``name``.
 

	
 
        :param section: Section we want to retrieve value from
 
        :param name: Name of configuration we want to retrieve
 
        :param config_file: A path to file which should be used to retrieve
 
          configuration from (might also be a list of file paths)
 
        """
 
        if config_file is None:
 
            config_file = []
 
        elif isinstance(config_file, str):
 
            config_file = [config_file]
 

	
 
        def gen_configs():
 
            for path in config_file + self._config_files:
 
                try:
 
                    yield ConfigFile.from_path(path)
 
                except (IOError, OSError, ValueError):
 
                    continue
 

	
 
        for config in gen_configs():
 
            try:
 
                value = config.get(section, name)
 
            except KeyError:
 
                continue
 
            return None if value is None else safe_str(value)
 
        return None
 

	
 
    def get_user_name(self, config_file=None):
 
        """
 
        Returns user's name from global configuration file.
 

	
 
        :param config_file: A path to file which should be used to retrieve
 
          configuration from (might also be a list of file paths)
 
        """
 
        return self.get_config_value('user', 'name', config_file)
 

	
 
    def get_user_email(self, config_file=None):
 
        """
 
        Returns user's email from global configuration file.
 

	
 
        :param config_file: A path to file which should be used to retrieve
 
          configuration from (might also be a list of file paths)
 
        """
 
        return self.get_config_value('user', 'email', config_file)
0 comments (0 inline, 0 general)