Changeset - 3b5657fba7f3
[Not reviewed]
default
1 3 0
Mads Kiilerich (mads) - 5 years ago 2021-01-11 14:25:52
mads@kiilerich.com
Grafted from: 7d4e68b4d696
hooks: drop using Git pre-receive hooks - we don't need them
4 files changed with 18 insertions and 61 deletions:
0 comments (0 inline, 0 general)
kallithea/bin/vcs_hooks.py
Show inline comments
 
@@ -36,176 +36,168 @@ import paste.deploy
 
import kallithea
 
import kallithea.config.application
 
from kallithea.lib import hooks, webutils
 
from kallithea.lib.utils2 import HookEnvironmentError, ascii_str, get_hook_environment, safe_bytes, safe_str
 
from kallithea.lib.vcs.backends.base import EmptyChangeset
 
from kallithea.lib.vcs.utils.helpers import get_scm_size
 
from kallithea.model import db
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
def repo_size(ui, repo, hooktype=None, **kwargs):
 
    """Show size of Mercurial repository.
 

	
 
    Called as Mercurial hook changegroup.kallithea_repo_size after push.
 
    """
 
    size_hg, size_root = get_scm_size('.hg', safe_str(repo.root))
 

	
 
    last_cs = repo[len(repo) - 1]
 

	
 
    msg = ('Repository size .hg: %s Checkout: %s Total: %s\n'
 
           'Last revision is now r%s:%s\n') % (
 
        webutils.format_byte_size(size_hg),
 
        webutils.format_byte_size(size_root),
 
        webutils.format_byte_size(size_hg + size_root),
 
        last_cs.rev(),
 
        ascii_str(last_cs.hex())[:12],
 
    )
 
    ui.status(safe_bytes(msg))
 

	
 

	
 
def update(ui, repo, hooktype=None, **kwargs):
 
    """Update repo after push. The equivalent to 'hg update' but using the same
 
    Mercurial as everything else.
 

	
 
    Called as Mercurial hook changegroup.kallithea_update after push.
 
    """
 
    try:
 
        ui.pushbuffer(error=True, subproc=True)
 
        rev = brev = None
 
        mercurial.hg.updatetotally(ui, repo, rev, brev)
 
    finally:
 
        s = ui.popbuffer()  # usually just "x files updated, x files merged, x files removed, x files unresolved"
 
        log.info('%s update hook output: %s', safe_str(repo.root), safe_str(s).rstrip())
 

	
 

	
 
def pull_action(ui, repo, **kwargs):
 
    """Logs user pull action
 

	
 
    Called as Mercurial hook outgoing.kallithea_pull_action.
 
    """
 
    hooks.log_pull_action()
 

	
 

	
 
def push_action(ui, repo, node, node_last, **kwargs):
 
    """
 
    Register that changes have been added to the repo - log the action *and* invalidate caches.
 
    Note: This hook is not only logging, but also the side effect invalidating
 
    caches! The function should perhaps be renamed.
 

	
 
    Called as Mercurial hook changegroup.kallithea_push_action .
 

	
 
    The pushed changesets is given by the revset 'node:node_last'.
 
    """
 
    revs = [ascii_str(repo[r].hex()) for r in mercurial.scmutil.revrange(repo, [b'%s:%s' % (node, node_last)])]
 
    hooks.process_pushed_raw_ids(revs)
 

	
 

	
 
def _git_hook_environment(repo_path):
 
    """
 
    Create a light-weight environment for stand-alone scripts and return an UI and the
 
    db repository.
 

	
 
    Git hooks are executed as subprocess of Git while Kallithea is waiting, and
 
    they thus need enough info to be able to create an app environment and
 
    connect to the database.
 
    """
 
    extras = get_hook_environment()
 

	
 
    path_to_ini_file = extras['config']
 
    config = paste.deploy.appconfig('config:' + path_to_ini_file)
 
    #logging.config.fileConfig(ini_file_path) # Note: we are in a different process - don't use configured logging
 
    kallithea.config.application.make_app(config.global_conf, **config.local_conf)
 

	
 
    # fix if it's not a bare repo
 
    if repo_path.endswith(os.sep + '.git'):
 
        repo_path = repo_path[:-5]
 

	
 
    repo = db.Repository.get_by_full_path(repo_path)
 
    if not repo:
 
        raise OSError('Repository %s not found in database' % repo_path)
 

	
 
    return repo
 

	
 

	
 
def pre_receive(repo_path, git_stdin_lines):
 
    """Called from Git pre-receive hook.
 
    The returned value is used as hook exit code and must be 0.
 
    """
 
    # Currently unused. TODO: remove?
 
    return 0
 

	
 

	
 
def post_receive(repo_path, git_stdin_lines):
 
    """Called from Git post-receive hook.
 
    The returned value is used as hook exit code and must be 0.
 
    """
 
    try:
 
        repo = _git_hook_environment(repo_path)
 
    except HookEnvironmentError as e:
 
        sys.stderr.write("Skipping Kallithea Git post-receive hook %r.\nGit was apparently not invoked by Kallithea: %s\n" % (sys.argv[0], e))
 
        return 0
 

	
 
    # the post push hook should never use the cached instance
 
    scm_repo = repo.scm_instance_no_cache()
 

	
 
    rev_data = []
 
    for l in git_stdin_lines:
 
        old_rev, new_rev, ref = l.strip().split(' ')
 
        _ref_data = ref.split('/')
 
        if _ref_data[1] in ['tags', 'heads']:
 
            rev_data.append({'old_rev': old_rev,
 
                             'new_rev': new_rev,
 
                             'ref': ref,
 
                             'type': _ref_data[1],
 
                             'name': '/'.join(_ref_data[2:])})
 

	
 
    git_revs = []
 
    for push_ref in rev_data:
 
        _type = push_ref['type']
 
        if _type == 'heads':
 
            if push_ref['old_rev'] == EmptyChangeset().raw_id:
 
                # update the symbolic ref if we push new repo
 
                if scm_repo.is_empty():
 
                    scm_repo._repo.refs.set_symbolic_ref(
 
                        b'HEAD',
 
                        b'refs/heads/%s' % safe_bytes(push_ref['name']))
 

	
 
                # build exclude list without the ref
 
                cmd = ['for-each-ref', '--format=%(refname)', 'refs/heads/*']
 
                stdout = scm_repo.run_git_command(cmd)
 
                ref = push_ref['ref']
 
                heads = [head for head in stdout.splitlines() if head != ref]
 
                # now list the git revs while excluding from the list
 
                cmd = ['log', push_ref['new_rev'], '--reverse', '--pretty=format:%H']
 
                cmd.append('--not')
 
                cmd.extend(heads) # empty list is ok
 
                stdout = scm_repo.run_git_command(cmd)
 
                git_revs += stdout.splitlines()
 

	
 
            elif push_ref['new_rev'] == EmptyChangeset().raw_id:
 
                # delete branch case
 
                git_revs += ['delete_branch=>%s' % push_ref['name']]
 
            else:
 
                cmd = ['log', '%(old_rev)s..%(new_rev)s' % push_ref,
 
                       '--reverse', '--pretty=format:%H']
 
                stdout = scm_repo.run_git_command(cmd)
 
                git_revs += stdout.splitlines()
 

	
 
        elif _type == 'tags':
 
            git_revs += ['tag=>%s' % push_ref['name']]
 

	
 
    hooks.process_pushed_raw_ids(git_revs)
 

	
 
    return 0
 

	
 

	
 
# Almost exactly like Mercurial contrib/hg-ssh:
 
def rejectpush(ui, **kwargs):
 
    """Mercurial hook to be installed as pretxnopen and prepushkey for read-only repos.
 
    Return value 1 will make the hook fail and reject the push.
 
    """
 
    ex = get_hook_environment()
 
    ui.warn(safe_bytes("Push access to %r denied\n" % ex.repository))
 
    return 1
kallithea/model/scm.py
Show inline comments
 
@@ -581,157 +581,156 @@ class ScmModel(object):
 
            # EmptyChangeset means we we're editing empty repository
 
            parents = None
 
        else:
 
            parents = [parent_cs]
 
        # add multiple nodes
 
        imc = scm_instance.in_memory_changeset
 
        for path, content in processed_nodes:
 
            imc.remove(FileNode(path, content=content))
 

	
 
        tip = imc.commit(message=message,
 
                         author=author,
 
                         parents=parents,
 
                         branch=parent_cs.branch)
 

	
 
        if trigger_push_hook:
 
            self._handle_push(scm_instance,
 
                              username=user.username,
 
                              ip_addr=ip_addr,
 
                              action='push_local',
 
                              repo_name=repo.repo_name,
 
                              revisions=[tip.raw_id])
 
        else:
 
            self.mark_for_invalidation(repo.repo_name)
 
        return tip
 

	
 
    def get_unread_journal(self):
 
        return db.UserLog.query().count()
 

	
 
    def get_repo_landing_revs(self, repo=None):
 
        """
 
        Generates select option with tags branches and bookmarks (for hg only)
 
        grouped by type
 

	
 
        :param repo:
 
        """
 

	
 
        hist_l = []
 
        choices = []
 
        hist_l.append(('rev:tip', _('latest tip')))
 
        choices.append('rev:tip')
 
        if repo is None:
 
            return choices, hist_l
 

	
 
        repo = self.__get_repo(repo)
 
        repo = repo.scm_instance
 

	
 
        branches_group = ([('branch:%s' % k, k) for k, v in
 
                           repo.branches.items()], _("Branches"))
 
        hist_l.append(branches_group)
 
        choices.extend([x[0] for x in branches_group[0]])
 

	
 
        if repo.alias == 'hg':
 
            bookmarks_group = ([('book:%s' % k, k) for k, v in
 
                                repo.bookmarks.items()], _("Bookmarks"))
 
            hist_l.append(bookmarks_group)
 
            choices.extend([x[0] for x in bookmarks_group[0]])
 

	
 
        tags_group = ([('tag:%s' % k, k) for k, v in
 
                       repo.tags.items()], _("Tags"))
 
        hist_l.append(tags_group)
 
        choices.extend([x[0] for x in tags_group[0]])
 

	
 
        return choices, hist_l
 

	
 
    def _get_git_hook_interpreter(self):
 
        """Return a suitable interpreter for Git hooks.
 

	
 
        Return a suitable string to be written in the POSIX #! shebang line for
 
        Git hook scripts so they invoke Kallithea code with the right Python
 
        interpreter and in the right environment.
 
        """
 
        # Note: sys.executable might not point at a usable Python interpreter. For
 
        # example, when using uwsgi, it will point at the uwsgi program itself.
 
        # FIXME This may not work on Windows and may need a shell wrapper script.
 
        return (kallithea.CONFIG.get('git_hook_interpreter')
 
                or sys.executable
 
                or '/usr/bin/env python3')
 

	
 
    def install_git_hooks(self, repo, force=False):
 
        """
 
        Creates a kallithea hook inside a git repository
 

	
 
        :param repo: Instance of VCS repo
 
        :param force: Overwrite existing non-Kallithea hooks
 
        """
 

	
 
        hooks_path = os.path.join(repo.path, 'hooks')
 
        if not repo.bare:
 
            hooks_path = os.path.join(repo.path, '.git', 'hooks')
 
        if not os.path.isdir(hooks_path):
 
            os.makedirs(hooks_path)
 

	
 
        tmpl_post = b"#!%s\n" % safe_bytes(self._get_git_hook_interpreter())
 
        tmpl_post += pkg_resources.resource_string(
 
            'kallithea', os.path.join('templates', 'py', 'git_post_receive_hook.py')
 
        )
 
        tmpl_pre = b"#!%s\n" % safe_bytes(self._get_git_hook_interpreter())
 
        tmpl_pre += pkg_resources.resource_string(
 
            'kallithea', os.path.join('templates', 'py', 'git_pre_receive_hook.py')
 
        )
 

	
 
        for h_type, tmpl in [('pre-receive', tmpl_pre), ('post-receive', tmpl_post)]:
 
        for h_type, tmpl in [('pre-receive', None), ('post-receive', tmpl_post)]:
 
            hook_file = os.path.join(hooks_path, h_type)
 
            other_hook = False
 
            log.debug('Installing git hook %s in repo %s', h_type, repo.path)
 
            if os.path.islink(hook_file):
 
                log.debug("Found symlink hook at %s", hook_file)
 
                other_hook = True
 
            elif os.path.isfile(hook_file):
 
                log.debug('hook file %s exists, checking if it is from kallithea', hook_file)
 
                with open(hook_file, 'rb') as f:
 
                    data = f.read()
 
                    matches = re.search(br'^KALLITHEA_HOOK_VER\s*=\s*(.*)$', data, flags=re.MULTILINE)
 
                    if matches:
 
                        ver = safe_str(matches.group(1))
 
                        log.debug('Found Kallithea hook - it has KALLITHEA_HOOK_VER %s', ver)
 
                    else:
 
                        log.debug('Found non-Kallithea hook at %s', hook_file)
 
                        other_hook = True
 
            elif os.path.exists(hook_file):
 
                log.debug("Found hook that isn't a regular file at %s", hook_file)
 
                other_hook = True
 
            if other_hook and not force:
 
                log.warning('skipping overwriting hook file %s', hook_file)
 
            else:
 
            elif h_type == 'post-receive':
 
                log.debug('writing hook file %s', hook_file)
 
                if other_hook:
 
                    backup_file = hook_file + '.bak'
 
                    log.warning('moving existing hook to %s', backup_file)
 
                    os.rename(hook_file, backup_file)
 
                try:
 
                    fh, fn = tempfile.mkstemp(prefix=hook_file + '.tmp.')
 
                    os.write(fh, tmpl.replace(b'_TMPL_', safe_bytes(kallithea.__version__)))
 
                    os.close(fh)
 
                    os.chmod(fn, 0o777 & ~umask)
 
                    os.rename(fn, hook_file)
 
                except (OSError, IOError) as e:
 
                    log.error('error writing hook %s: %s', hook_file, e)
 
            elif h_type == 'pre-receive':  # no longer used, so just remove any existing Kallithea hook
 
                if os.path.lexists(hook_file) and not other_hook:
 
                    os.remove(hook_file)
 

	
 

	
 
def AvailableRepoGroupChoices(repo_group_perm_level, extras=()):
 
    """Return group_id,string tuples with choices for all the repo groups where
 
    the user has the necessary permissions.
 

	
 
    Top level is -1.
 
    """
 
    groups = db.RepoGroup.query().all()
 
    if HasPermissionAny('hg.admin')('available repo groups'):
 
        groups.append(None)
 
    else:
 
        groups = list(RepoGroupList(groups, perm_level=repo_group_perm_level))
 
        if HasPermissionAny('hg.create.repository')('available repo groups'):
 
            groups.append(None)
 
        for extra in extras:
 
            if not any(rg == extra for rg in groups):
 
                groups.append(extra)
 
    return db.RepoGroup.groups_choices(groups=groups)
kallithea/templates/py/git_pre_receive_hook.py
Show inline comments
 
deleted file
kallithea/tests/vcs/test_git.py
Show inline comments
 
@@ -687,159 +687,162 @@ class TestGitSpecificWithRepo(_BackendTe
 
    def test_get_diff_does_not_sanitize_valid_context(self):
 
        almost_overflowed_long_int = 2**31-1
 

	
 
        self.repo._run_git_command = mock.Mock(return_value=(b'', b''))
 
        self.repo.get_diff(0, 1, 'foo', context=almost_overflowed_long_int)
 
        self.repo._run_git_command.assert_called_once_with(
 
            ['diff', '-U' + str(almost_overflowed_long_int), '--full-index', '--binary', '-p', '-M', '--abbrev=40',
 
             self.repo._get_revision(0), self.repo._get_revision(1), '--', 'foo'], cwd=self.repo.path)
 

	
 
    def test_get_diff_sanitizes_overflowing_context(self):
 
        overflowed_long_int = 2**31
 
        sanitized_overflowed_long_int = overflowed_long_int-1
 

	
 
        self.repo._run_git_command = mock.Mock(return_value=(b'', b''))
 
        self.repo.get_diff(0, 1, 'foo', context=overflowed_long_int)
 

	
 
        self.repo._run_git_command.assert_called_once_with(
 
            ['diff', '-U' + str(sanitized_overflowed_long_int), '--full-index', '--binary', '-p', '-M', '--abbrev=40',
 
             self.repo._get_revision(0), self.repo._get_revision(1), '--', 'foo'], cwd=self.repo.path)
 

	
 
    def test_get_diff_does_not_sanitize_zero_context(self):
 
        zero_context = 0
 

	
 
        self.repo._run_git_command = mock.Mock(return_value=(b'', b''))
 
        self.repo.get_diff(0, 1, 'foo', context=zero_context)
 

	
 
        self.repo._run_git_command.assert_called_once_with(
 
            ['diff', '-U' + str(zero_context), '--full-index', '--binary', '-p', '-M', '--abbrev=40',
 
             self.repo._get_revision(0), self.repo._get_revision(1), '--', 'foo'], cwd=self.repo.path)
 

	
 
    def test_get_diff_sanitizes_negative_context(self):
 
        negative_context = -10
 

	
 
        self.repo._run_git_command = mock.Mock(return_value=(b'', b''))
 
        self.repo.get_diff(0, 1, 'foo', context=negative_context)
 

	
 
        self.repo._run_git_command.assert_called_once_with(
 
            ['diff', '-U0', '--full-index', '--binary', '-p', '-M', '--abbrev=40',
 
             self.repo._get_revision(0), self.repo._get_revision(1), '--', 'foo'], cwd=self.repo.path)
 

	
 

	
 
class TestGitRegression(_BackendTestMixin):
 
    backend_alias = 'git'
 

	
 
    @classmethod
 
    def _get_commits(cls):
 
        return [
 
            {
 
                'message': 'Initial',
 
                'author': 'Joe Doe <joe.doe@example.com>',
 
                'date': datetime.datetime(2010, 1, 1, 20),
 
                'added': [
 
                    FileNode('bot/__init__.py', content='base'),
 
                    FileNode('bot/templates/404.html', content='base'),
 
                    FileNode('bot/templates/500.html', content='base'),
 
                ],
 
            },
 
            {
 
                'message': 'Second',
 
                'author': 'Joe Doe <joe.doe@example.com>',
 
                'date': datetime.datetime(2010, 1, 1, 22),
 
                'added': [
 
                    FileNode('bot/build/migrations/1.py', content='foo2'),
 
                    FileNode('bot/build/migrations/2.py', content='foo2'),
 
                    FileNode('bot/build/static/templates/f.html', content='foo2'),
 
                    FileNode('bot/build/static/templates/f1.html', content='foo2'),
 
                    FileNode('bot/build/templates/err.html', content='foo2'),
 
                    FileNode('bot/build/templates/err2.html', content='foo2'),
 
                ],
 
            },
 
        ]
 

	
 
    def test_similar_paths(self):
 
        cs = self.repo.get_changeset()
 
        paths = lambda *n: [x.path for x in n]
 
        assert paths(*cs.get_nodes('bot')) == ['bot/build', 'bot/templates', 'bot/__init__.py']
 
        assert paths(*cs.get_nodes('bot/build')) == ['bot/build/migrations', 'bot/build/static', 'bot/build/templates']
 
        assert paths(*cs.get_nodes('bot/build/static')) == ['bot/build/static/templates']
 
        # this get_nodes below causes troubles !
 
        assert paths(*cs.get_nodes('bot/build/static/templates')) == ['bot/build/static/templates/f.html', 'bot/build/static/templates/f1.html']
 
        assert paths(*cs.get_nodes('bot/build/templates')) == ['bot/build/templates/err.html', 'bot/build/templates/err2.html']
 
        assert paths(*cs.get_nodes('bot/templates/')) == ['bot/templates/404.html', 'bot/templates/500.html']
 

	
 

	
 
class TestGitHooks(object):
 
    """
 
    Tests related to hook functionality of Git repositories.
 
    """
 

	
 
    def setup_method(self):
 
        # For each run we want a fresh repo.
 
        self.repo_directory = get_new_dir("githookrepo")
 
        self.repo = GitRepository(self.repo_directory, create=True)
 

	
 
        # Create a dictionary where keys are hook names, and values are paths to
 
        # them in the non-bare repo. Deduplicates code in tests a bit.
 
        self.pre_receive = os.path.join(self.repo.path, '.git', 'hooks', "pre-receive")
 
        self.post_receive = os.path.join(self.repo.path, '.git', 'hooks', "post-receive")
 
        self.kallithea_hooks = {
 
            "pre-receive": os.path.join(self.repo.path, '.git', 'hooks', "pre-receive"),
 
            "post-receive": os.path.join(self.repo.path, '.git', 'hooks', "post-receive"),
 
            "pre-receive": self.pre_receive,
 
            "post-receive": self.post_receive,
 
        }
 

	
 
    def test_hooks_created_if_missing(self):
 
        """
 
        Tests if hooks are installed in repository if they are missing.
 
        """
 

	
 
        for hook, hook_path in self.kallithea_hooks.items():
 
            if os.path.exists(hook_path):
 
                os.remove(hook_path)
 

	
 
        ScmModel().install_git_hooks(self.repo)
 

	
 
        for hook, hook_path in self.kallithea_hooks.items():
 
            assert os.path.exists(hook_path)
 
        assert not os.path.exists(self.pre_receive)
 
        assert os.path.exists(self.post_receive)
 

	
 
    def test_kallithea_hooks_updated(self):
 
        """
 
        Tests if hooks are updated if they are Kallithea hooks already.
 
        """
 

	
 
        for hook, hook_path in self.kallithea_hooks.items():
 
            with open(hook_path, "w") as f:
 
                f.write("KALLITHEA_HOOK_VER=0.0.0\nJUST_BOGUS")
 

	
 
        ScmModel().install_git_hooks(self.repo)
 

	
 
        for hook, hook_path in self.kallithea_hooks.items():
 
            with open(hook_path) as f:
 
                assert "JUST_BOGUS" not in f.read()
 
        assert not os.path.exists(self.pre_receive)
 
        with open(self.post_receive) as f:
 
            assert "JUST_BOGUS" not in f.read()
 

	
 
    def test_custom_hooks_untouched(self):
 
        """
 
        Tests if hooks are left untouched if they are not Kallithea hooks.
 
        """
 

	
 
        for hook, hook_path in self.kallithea_hooks.items():
 
            with open(hook_path, "w") as f:
 
                f.write("#!/bin/bash\n#CUSTOM_HOOK")
 

	
 
        ScmModel().install_git_hooks(self.repo)
 

	
 
        for hook, hook_path in self.kallithea_hooks.items():
 
            with open(hook_path) as f:
 
                assert "CUSTOM_HOOK" in f.read()
 

	
 
    def test_custom_hooks_forced_update(self):
 
        """
 
        Tests if hooks are forcefully updated even though they are custom hooks.
 
        """
 

	
 
        for hook, hook_path in self.kallithea_hooks.items():
 
            with open(hook_path, "w") as f:
 
                f.write("#!/bin/bash\n#CUSTOM_HOOK")
 

	
 
        ScmModel().install_git_hooks(self.repo, force=True)
 

	
 
        for hook, hook_path in self.kallithea_hooks.items():
 
            with open(hook_path) as f:
 
                assert "KALLITHEA_HOOK_VER" in f.read()
 
        with open(self.pre_receive) as f:
 
            assert "KALLITHEA_HOOK_VER" not in f.read()
 
        with open(self.post_receive) as f:
 
            assert "KALLITHEA_HOOK_VER" in f.read()
0 comments (0 inline, 0 general)