Changeset - 39ba3ee88c7c
[Not reviewed]
default
0 1 0
Mads Kiilerich (mads) - 6 years ago 2019-12-31 15:39:17
mads@kiilerich.com
Grafted from: 4f6b9e6883fd
vcs: use more correct git revision identifiers

It will have the right types and make a difference for py3.
1 file changed with 4 insertions and 4 deletions:
0 comments (0 inline, 0 general)
kallithea/lib/vcs/backends/git/changeset.py
Show inline comments
 
import re
 
from io import BytesIO
 
from itertools import chain
 
from subprocess import PIPE, Popen
 

	
 
from dulwich import objects
 
from dulwich.config import ConfigFile
 

	
 
from kallithea.lib.vcs.backends.base import BaseChangeset, EmptyChangeset
 
from kallithea.lib.vcs.conf import settings
 
from kallithea.lib.vcs.exceptions import ChangesetDoesNotExistError, ChangesetError, ImproperArchiveTypeError, NodeDoesNotExistError, RepositoryError, VCSError
 
from kallithea.lib.vcs.nodes import (
 
    AddedFileNodesGenerator, ChangedFileNodesGenerator, DirNode, FileNode, NodeKind, RemovedFileNodesGenerator, RootNode, SubModuleNode)
 
from kallithea.lib.vcs.utils import date_fromtimestamp, safe_int, safe_str, safe_unicode
 
from kallithea.lib.vcs.utils.lazy import LazyProperty
 

	
 

	
 
class GitChangeset(BaseChangeset):
 
    """
 
    Represents state of the repository at a revision.
 
    """
 

	
 
    def __init__(self, repository, revision):
 
        self._stat_modes = {}
 
        self.repository = repository
 
        revision = safe_str(revision)
 
        try:
 
            commit = self.repository._repo[revision]
 
            if isinstance(commit, objects.Tag):
 
                revision = safe_str(commit.object[1])
 
                commit = self.repository._repo.get_object(commit.object[1])
 
        except KeyError:
 
            raise RepositoryError("Cannot get object with id %s" % revision)
 
        self.raw_id = revision
 
        self.id = self.raw_id
 
        self.short_id = self.raw_id[:12]
 
        self._commit = commit
 
        self._tree_id = commit.tree
 
        self._committer_property = 'committer'
 
        self._author_property = 'author'
 
        self._date_property = 'commit_time'
 
        self._date_tz_property = 'commit_timezone'
 
        self.revision = repository.revisions.index(revision)
 
        self.revision = repository.revisions.index(self.raw_id)
 

	
 
        self.nodes = {}
 
        self._paths = {}
 

	
 
    @LazyProperty
 
    def bookmarks(self):
 
        return ()
 

	
 
    @LazyProperty
 
    def message(self):
 
        return safe_unicode(self._commit.message)
 

	
 
    @LazyProperty
 
    def committer(self):
 
        return safe_unicode(getattr(self._commit, self._committer_property))
 

	
 
    @LazyProperty
 
    def author(self):
 
        return safe_unicode(getattr(self._commit, self._author_property))
 

	
 
    @LazyProperty
 
    def date(self):
 
        return date_fromtimestamp(getattr(self._commit, self._date_property),
 
                                  getattr(self._commit, self._date_tz_property))
 

	
 
    @LazyProperty
 
    def _timestamp(self):
 
        return getattr(self._commit, self._date_property)
 

	
 
    @LazyProperty
 
    def status(self):
 
        """
 
        Returns modified, added, removed, deleted files for current changeset
 
        """
 
        return self.changed, self.added, self.removed
 

	
 
    @LazyProperty
 
    def tags(self):
 
        _tags = []
 
        for tname, tsha in self.repository.tags.iteritems():
 
            if tsha == self.raw_id:
 
                _tags.append(tname)
 
        return _tags
 

	
 
    @LazyProperty
 
    def branch(self):
 
        # Note: This function will return one branch name for the changeset -
 
        # that might not make sense in Git where branches() is a better match
 
        # for the basic model
 
        heads = self.repository._heads(reverse=False)
 
        ref = heads.get(self.raw_id)
 
        ref = heads.get(self._commit.id)
 
        if ref:
 
            return safe_unicode(ref)
 

	
 
    @LazyProperty
 
    def branches(self):
 
        heads = self.repository._heads(reverse=True)
 
        return [b for b in heads if heads[b] == self.raw_id] # FIXME: Inefficient ... and returning None!
 

	
 
    def _fix_path(self, path):
 
        """
 
        Paths are stored without trailing slash so we need to get rid off it if
 
        needed.
 
        """
 
        if path.endswith('/'):
 
            path = path.rstrip('/')
 
        return path
 

	
 
    def _get_id_for_path(self, path):
 
        path = safe_str(path)
 
        # FIXME: Please, spare a couple of minutes and make those codes cleaner;
 
        if path not in self._paths:
 
            path = path.strip('/')
 
            # set root tree
 
            tree = self.repository._repo[self._tree_id]
 
            if path == '':
 
                self._paths[''] = tree.id
 
                return tree.id
 
            splitted = path.split('/')
 
            dirs, name = splitted[:-1], splitted[-1]
 
            curdir = ''
 

	
 
            # initially extract things from root dir
 
            for item, stat, id in tree.iteritems():
 
                if curdir:
 
                    name = '/'.join((curdir, item))
 
                else:
 
                    name = item
 
                self._paths[name] = id
 
                self._stat_modes[name] = stat
 

	
 
            for dir in dirs:
 
                if curdir:
 
                    curdir = '/'.join((curdir, dir))
 
                else:
 
                    curdir = dir
 
                dir_id = None
 
                for item, stat, id in tree.iteritems():
 
                    if dir == item:
 
                        dir_id = id
 
                if dir_id:
 
                    # Update tree
 
                    tree = self.repository._repo[dir_id]
 
                    if not isinstance(tree, objects.Tree):
 
                        raise ChangesetError('%s is not a directory' % curdir)
 
                else:
 
                    raise ChangesetError('%s have not been found' % curdir)
 

	
 
                # cache all items from the given traversed tree
 
                for item, stat, id in tree.iteritems():
 
                    if curdir:
 
                        name = '/'.join((curdir, item))
 
                    else:
 
                        name = item
 
                    self._paths[name] = id
 
                    self._stat_modes[name] = stat
 
            if path not in self._paths:
 
                raise NodeDoesNotExistError("There is no file nor directory "
 
                    "at the given path '%s' at revision %s"
 
                    % (path, safe_str(self.short_id)))
 
        return self._paths[path]
 

	
 
    def _get_kind(self, path):
 
        obj = self.repository._repo[self._get_id_for_path(path)]
 
        if isinstance(obj, objects.Blob):
 
            return NodeKind.FILE
 
        elif isinstance(obj, objects.Tree):
 
            return NodeKind.DIR
 

	
 
    def _get_filectx(self, path):
 
        path = self._fix_path(path)
 
        if self._get_kind(path) != NodeKind.FILE:
 
            raise ChangesetError("File does not exist for revision %s at "
 
                " '%s'" % (self.raw_id, path))
 
        return path
 

	
 
    def _get_file_nodes(self):
 
        return chain(*(t[2] for t in self.walk()))
 

	
 
    @LazyProperty
 
    def parents(self):
 
        """
 
        Returns list of parents changesets.
 
        """
 
        return [self.repository.get_changeset(parent)
 
                for parent in self._commit.parents]
 

	
 
@@ -412,151 +412,151 @@ class GitChangeset(BaseChangeset):
 
        tree = self.repository._repo[id]
 
        dirnodes = []
 
        filenodes = []
 
        als = self.repository.alias
 
        for name, stat, id in tree.iteritems():
 
            if path != '':
 
                obj_path = '/'.join((path, name))
 
            else:
 
                obj_path = name
 
            if objects.S_ISGITLINK(stat):
 
                root_tree = self.repository._repo[self._tree_id]
 
                cf = ConfigFile.from_file(BytesIO(self.repository._repo.get_object(root_tree['.gitmodules'][1]).data))
 
                url = cf.get(('submodule', obj_path), 'url')
 
                dirnodes.append(SubModuleNode(obj_path, url=url, changeset=id,
 
                                              alias=als))
 
                continue
 

	
 
            obj = self.repository._repo.get_object(id)
 
            if obj_path not in self._stat_modes:
 
                self._stat_modes[obj_path] = stat
 
            if isinstance(obj, objects.Tree):
 
                dirnodes.append(DirNode(obj_path, changeset=self))
 
            elif isinstance(obj, objects.Blob):
 
                filenodes.append(FileNode(obj_path, changeset=self, mode=stat))
 
            else:
 
                raise ChangesetError("Requested object should be Tree "
 
                                     "or Blob, is %r" % type(obj))
 
        nodes = dirnodes + filenodes
 
        for node in nodes:
 
            if node.path not in self.nodes:
 
                self.nodes[node.path] = node
 
        nodes.sort()
 
        return nodes
 

	
 
    def get_node(self, path):
 
        """
 
        Returns ``Node`` object from the given ``path``. If there is no node at
 
        the given ``path``, ``ChangesetError`` would be raised.
 
        """
 
        path = self._fix_path(path)
 
        if path not in self.nodes:
 
            try:
 
                id_ = self._get_id_for_path(path)
 
            except ChangesetError:
 
                raise NodeDoesNotExistError("Cannot find one of parents' "
 
                    "directories for a given path: %s" % path)
 

	
 
            _GL = lambda m: m and objects.S_ISGITLINK(m)
 
            if _GL(self._stat_modes.get(path)):
 
                tree = self.repository._repo[self._tree_id]
 
                cf = ConfigFile.from_file(BytesIO(self.repository._repo.get_object(tree['.gitmodules'][1]).data))
 
                url = cf.get(('submodule', path), 'url')
 
                node = SubModuleNode(path, url=url, changeset=id_,
 
                                     alias=self.repository.alias)
 
            else:
 
                obj = self.repository._repo.get_object(id_)
 

	
 
                if isinstance(obj, objects.Tree):
 
                    if path == '':
 
                        node = RootNode(changeset=self)
 
                    else:
 
                        node = DirNode(path, changeset=self)
 
                    node._tree = obj
 
                elif isinstance(obj, objects.Blob):
 
                    node = FileNode(path, changeset=self)
 
                    node._blob = obj
 
                else:
 
                    raise NodeDoesNotExistError("There is no file nor directory "
 
                        "at the given path: '%s' at revision %s"
 
                        % (path, self.short_id))
 
            # cache node
 
            self.nodes[path] = node
 
        return self.nodes[path]
 

	
 
    @LazyProperty
 
    def affected_files(self):
 
        """
 
        Gets a fast accessible file changes for given changeset
 
        """
 
        added, modified, deleted = self._changes_cache
 
        return list(added.union(modified).union(deleted))
 

	
 
    @LazyProperty
 
    def _changes_cache(self):
 
        added = set()
 
        modified = set()
 
        deleted = set()
 
        _r = self.repository._repo
 

	
 
        parents = self.parents
 
        if not self.parents:
 
            parents = [EmptyChangeset()]
 
        for parent in parents:
 
            if isinstance(parent, EmptyChangeset):
 
                oid = None
 
            else:
 
                oid = _r[parent.raw_id].tree
 
            changes = _r.object_store.tree_changes(oid, _r[self.raw_id].tree)
 
                oid = _r[parent._commit.id].tree
 
            changes = _r.object_store.tree_changes(oid, _r[self._commit.id].tree)
 
            for (oldpath, newpath), (_, _), (_, _) in changes:
 
                if newpath and oldpath:
 
                    modified.add(newpath)
 
                elif newpath and not oldpath:
 
                    added.add(newpath)
 
                elif not newpath and oldpath:
 
                    deleted.add(oldpath)
 
        return added, modified, deleted
 

	
 
    def _get_paths_for_status(self, status):
 
        """
 
        Returns sorted list of paths for given ``status``.
 

	
 
        :param status: one of: *added*, *modified* or *deleted*
 
        """
 
        added, modified, deleted = self._changes_cache
 
        return sorted({
 
            'added': list(added),
 
            'modified': list(modified),
 
            'deleted': list(deleted)}[status]
 
        )
 

	
 
    @LazyProperty
 
    def added(self):
 
        """
 
        Returns list of added ``FileNode`` objects.
 
        """
 
        if not self.parents:
 
            return list(self._get_file_nodes())
 
        return AddedFileNodesGenerator([n for n in
 
                                self._get_paths_for_status('added')], self)
 

	
 
    @LazyProperty
 
    def changed(self):
 
        """
 
        Returns list of modified ``FileNode`` objects.
 
        """
 
        if not self.parents:
 
            return []
 
        return ChangedFileNodesGenerator([n for n in
 
                                self._get_paths_for_status('modified')], self)
 

	
 
    @LazyProperty
 
    def removed(self):
 
        """
 
        Returns list of removed ``FileNode`` objects.
 
        """
 
        if not self.parents:
 
            return []
 
        return RemovedFileNodesGenerator([n for n in
 
                                self._get_paths_for_status('deleted')], self)
 

	
 
    extra = {}
0 comments (0 inline, 0 general)