Changeset - aa067dbcdc82
[Not reviewed]
default
0 7 1
Mads Kiilerich (mads) - 5 years ago 2020-11-04 21:00:44
mads@kiilerich.com
Grafted from: c71575e5ba9e
hooks: move the vcs hook entry points and setup code out of lib

Mercurial hooks are running in a process that already has been initialized, so
they invoke the hooks lib directly. Git hooks are binaries and need a lot of
initialization before they can do the same. Move this extra setup code
elsewhere.

Having this high level code in bin is perhaps also not ideal, but it also
doesn't seem that bad: that is where other command line entry points invoke
make_app.

(It seems like it could be adventageous to somehow use "real" bin commands for
hooks ... but for now we use the home-made templates.)

Note: As a side effect of this change, all git hooks *must* be re-installed
when upgrading.
8 files changed with 63 insertions and 431 deletions:
0 comments (0 inline, 0 general)
kallithea/bin/vcs_hooks.py
Show inline comments
 
file copied from kallithea/lib/hooks.py to kallithea/bin/vcs_hooks.py
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.lib.hooks
 
kallithea.bin.vcs_hooks
 
~~~~~~~~~~~~~~~~~~~
 

	
 
Hooks run by Kallithea
 
Entry points for Kallithea hooking into Mercurial and Git.
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Aug 6, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import os
 
import sys
 
import time
 

	
 
import mercurial.scmutil
 
import paste.deploy
 

	
 
import kallithea
 
from kallithea.lib import webutils
 
from kallithea.lib.exceptions import UserCreationError
 
from kallithea.lib.utils import action_logger, make_ui
 
import kallithea.config.application
 
from kallithea.lib import hooks, webutils
 
from kallithea.lib.utils2 import HookEnvironmentError, ascii_str, get_hook_environment, safe_bytes, safe_str
 
from kallithea.lib.vcs.backends.base import EmptyChangeset
 
from kallithea.lib.vcs.utils.helpers import get_scm_size
 
from kallithea.model import db
 

	
 

	
 
def _get_scm_size(alias, root_path):
 
    if not alias.startswith('.'):
 
        alias += '.'
 

	
 
    size_scm, size_root = 0, 0
 
    for path, dirs, files in os.walk(root_path):
 
        if path.find(alias) != -1:
 
            for f in files:
 
                try:
 
                    size_scm += os.path.getsize(os.path.join(path, f))
 
                except OSError:
 
                    pass
 
        else:
 
            for f in files:
 
                try:
 
                    size_root += os.path.getsize(os.path.join(path, f))
 
                except OSError:
 
                    pass
 

	
 
    size_scm_f = webutils.format_byte_size(size_scm)
 
    size_root_f = webutils.format_byte_size(size_root)
 
    size_total_f = webutils.format_byte_size(size_root + size_scm)
 

	
 
    return size_scm_f, size_root_f, size_total_f
 

	
 

	
 
def repo_size(ui, repo, hooktype=None, **kwargs):
 
    """Show size of Mercurial repository.
 

	
 
    Called as Mercurial hook changegroup.repo_size after push.
 
    """
 
    size_hg_f, size_root_f, size_total_f = _get_scm_size('.hg', safe_str(repo.root))
 
    size_hg, size_root = get_scm_size('.hg', safe_str(repo.root))
 

	
 
    last_cs = repo[len(repo) - 1]
 

	
 
    msg = ('Repository size .hg: %s Checkout: %s Total: %s\n'
 
           'Last revision is now r%s:%s\n') % (
 
        size_hg_f, size_root_f, size_total_f, last_cs.rev(), ascii_str(last_cs.hex())[:12]
 
        webutils.format_byte_size(size_hg),
 
        webutils.format_byte_size(size_root),
 
        webutils.format_byte_size(size_hg + size_root),
 
        last_cs.rev(),
 
        ascii_str(last_cs.hex())[:12],
 
    )
 
    ui.status(safe_bytes(msg))
 

	
 

	
 
def log_pull_action(*args, **kwargs):
 
    """Logs user last pull action
 

	
 
    Called as Mercurial hook outgoing.kallithea_log_pull_action or from Kallithea before invoking Git.
 

	
 
    Does *not* use the action from the hook environment but is always 'pull'.
 
    """
 
    ex = get_hook_environment()
 
def pull_action(ui, repo, **kwargs):
 
    """Logs user pull action
 

	
 
    action = 'pull'
 
    action_logger(ex.username, action, ex.repository, ex.ip, commit=True)
 
    # extension hook call
 
    callback = getattr(kallithea.EXTENSIONS, 'PULL_HOOK', None)
 
    if callable(callback):
 
        kw = {}
 
        kw.update(ex)
 
        callback(**kw)
 
    Called as Mercurial hook outgoing.kallithea_pull_action.
 
    """
 
    hooks.log_pull_action()
 

	
 

	
 
def log_push_action(ui, repo, node, node_last, **kwargs):
 
def push_action(ui, repo, node, node_last, **kwargs):
 
    """
 
    Register that changes have been added to the repo - log the action *and* invalidate caches.
 
    Note: This hook is not only logging, but also the side effect invalidating
 
    caches! The function should perhaps be renamed.
 

	
 
    Called as Mercurial hook changegroup.kallithea_log_push_action .
 
    Called as Mercurial hook changegroup.kallithea_push_action .
 

	
 
    The pushed changesets is given by the revset 'node:node_last'.
 
    """
 
    revs = [ascii_str(repo[r].hex()) for r in mercurial.scmutil.revrange(repo, [b'%s:%s' % (node, node_last)])]
 
    process_pushed_raw_ids(revs)
 

	
 

	
 
def process_pushed_raw_ids(revs):
 
    """
 
    Register that changes have been added to the repo - log the action *and* invalidate caches.
 

	
 
    Called from Mercurial changegroup.kallithea_log_push_action calling hook log_push_action,
 
    or from the Git post-receive hook calling handle_git_post_receive ...
 
    or from scm _handle_push.
 
    """
 
    ex = get_hook_environment()
 

	
 
    action = '%s:%s' % (ex.action, ','.join(revs))
 
    action_logger(ex.username, action, ex.repository, ex.ip, commit=True)
 

	
 
    from kallithea.model.scm import ScmModel
 
    ScmModel().mark_for_invalidation(ex.repository)
 

	
 
    # extension hook call
 
    callback = getattr(kallithea.EXTENSIONS, 'PUSH_HOOK', None)
 
    if callable(callback):
 
        kw = {'pushed_revs': revs}
 
        kw.update(ex)
 
        callback(**kw)
 

	
 

	
 
def log_create_repository(repository_dict, created_by, **kwargs):
 
    """
 
    Post create repository Hook.
 

	
 
    :param repository: dict dump of repository object
 
    :param created_by: username who created repository
 

	
 
    available keys of repository_dict:
 

	
 
     'repo_type',
 
     'description',
 
     'private',
 
     'created_on',
 
     'enable_downloads',
 
     'repo_id',
 
     'owner_id',
 
     'enable_statistics',
 
     'clone_uri',
 
     'fork_id',
 
     'group_id',
 
     'repo_name'
 

	
 
    """
 
    callback = getattr(kallithea.EXTENSIONS, 'CREATE_REPO_HOOK', None)
 
    if callable(callback):
 
        kw = {}
 
        kw.update(repository_dict)
 
        kw.update({'created_by': created_by})
 
        kw.update(kwargs)
 
        callback(**kw)
 

	
 

	
 
def check_allowed_create_user(user_dict, created_by, **kwargs):
 
    # pre create hooks
 
    callback = getattr(kallithea.EXTENSIONS, 'PRE_CREATE_USER_HOOK', None)
 
    if callable(callback):
 
        allowed, reason = callback(created_by=created_by, **user_dict)
 
        if not allowed:
 
            raise UserCreationError(reason)
 
    hooks.process_pushed_raw_ids(revs)
 

	
 

	
 
def log_create_user(user_dict, created_by, **kwargs):
 
    """
 
    Post create user Hook.
 

	
 
    :param user_dict: dict dump of user object
 

	
 
    available keys for user_dict:
 

	
 
     'username',
 
     'full_name_or_username',
 
     'full_contact',
 
     'user_id',
 
     'name',
 
     'firstname',
 
     'short_contact',
 
     'admin',
 
     'lastname',
 
     'ip_addresses',
 
     'ldap_dn',
 
     'email',
 
     'api_key',
 
     'last_login',
 
     'full_name',
 
     'active',
 
     'password',
 
     'emails',
 

	
 
    """
 
    callback = getattr(kallithea.EXTENSIONS, 'CREATE_USER_HOOK', None)
 
    if callable(callback):
 
        callback(created_by=created_by, **user_dict)
 

	
 

	
 
def log_create_pullrequest(pullrequest_dict, created_by, **kwargs):
 
    """
 
    Post create pull request hook.
 

	
 
    :param pullrequest_dict: dict dump of pull request object
 
    """
 
    callback = getattr(kallithea.EXTENSIONS, 'CREATE_PULLREQUEST_HOOK', None)
 
    if callable(callback):
 
        return callback(created_by=created_by, **pullrequest_dict)
 

	
 
    return 0
 

	
 
def log_delete_repository(repository_dict, deleted_by, **kwargs):
 
    """
 
    Post delete repository Hook.
 

	
 
    :param repository: dict dump of repository object
 
    :param deleted_by: username who deleted the repository
 

	
 
    available keys of repository_dict:
 

	
 
     'repo_type',
 
     'description',
 
     'private',
 
     'created_on',
 
     'enable_downloads',
 
     'repo_id',
 
     'owner_id',
 
     'enable_statistics',
 
     'clone_uri',
 
     'fork_id',
 
     'group_id',
 
     'repo_name'
 

	
 
    """
 
    callback = getattr(kallithea.EXTENSIONS, 'DELETE_REPO_HOOK', None)
 
    if callable(callback):
 
        kw = {}
 
        kw.update(repository_dict)
 
        kw.update({'deleted_by': deleted_by,
 
                   'deleted_on': time.time()})
 
        kw.update(kwargs)
 
        callback(**kw)
 

	
 

	
 
def log_delete_user(user_dict, deleted_by, **kwargs):
 
    """
 
    Post delete user Hook.
 

	
 
    :param user_dict: dict dump of user object
 

	
 
    available keys for user_dict:
 

	
 
     'username',
 
     'full_name_or_username',
 
     'full_contact',
 
     'user_id',
 
     'name',
 
     'firstname',
 
     'short_contact',
 
     'admin',
 
     'lastname',
 
     'ip_addresses',
 
     'ldap_dn',
 
     'email',
 
     'api_key',
 
     'last_login',
 
     'full_name',
 
     'active',
 
     'password',
 
     'emails',
 

	
 
    """
 
    callback = getattr(kallithea.EXTENSIONS, 'DELETE_USER_HOOK', None)
 
    if callable(callback):
 
        callback(deleted_by=deleted_by, **user_dict)
 

	
 

	
 
def _hook_environment(repo_path):
 
def _git_hook_environment(repo_path):
 
    """
 
    Create a light-weight environment for stand-alone scripts and return an UI and the
 
    db repository.
 

	
 
    Git hooks are executed as subprocess of Git while Kallithea is waiting, and
 
    they thus need enough info to be able to create an app environment and
 
    connect to the database.
 
    """
 
    import kallithea.config.application
 

	
 
    extras = get_hook_environment()
 

	
 
    path_to_ini_file = extras['config']
 
    config = paste.deploy.appconfig('config:' + path_to_ini_file)
 
    #logging.config.fileConfig(ini_file_path) # Note: we are in a different process - don't use configured logging
 
    kallithea.config.application.make_app(config.global_conf, **config.local_conf)
 

	
 
    # fix if it's not a bare repo
 
    if repo_path.endswith(os.sep + '.git'):
 
        repo_path = repo_path[:-5]
 

	
 
    repo = db.Repository.get_by_full_path(repo_path)
 
    if not repo:
 
        raise OSError('Repository %s not found in database' % repo_path)
 

	
 
    baseui = make_ui()
 
    return baseui, repo
 
    return repo
 

	
 

	
 
def handle_git_pre_receive(repo_path, git_stdin_lines):
 
def pre_receive(repo_path, git_stdin_lines):
 
    """Called from Git pre-receive hook.
 
    The returned value is used as hook exit code and must be 0.
 
    """
 
    # Currently unused. TODO: remove?
 
    return 0
 

	
 

	
 
def handle_git_post_receive(repo_path, git_stdin_lines):
 
def post_receive(repo_path, git_stdin_lines):
 
    """Called from Git post-receive hook.
 
    The returned value is used as hook exit code and must be 0.
 
    """
 
    try:
 
        baseui, repo = _hook_environment(repo_path)
 
        repo = _git_hook_environment(repo_path)
 
    except HookEnvironmentError as e:
 
        sys.stderr.write("Skipping Kallithea Git post-receive hook %r.\nGit was apparently not invoked by Kallithea: %s\n" % (sys.argv[0], e))
 
        return 0
 

	
 
    # the post push hook should never use the cached instance
 
    scm_repo = repo.scm_instance_no_cache()
 

	
 
    rev_data = []
 
    for l in git_stdin_lines:
 
        old_rev, new_rev, ref = l.strip().split(' ')
 
        _ref_data = ref.split('/')
 
        if _ref_data[1] in ['tags', 'heads']:
 
            rev_data.append({'old_rev': old_rev,
 
                             'new_rev': new_rev,
 
                             'ref': ref,
 
                             'type': _ref_data[1],
 
                             'name': '/'.join(_ref_data[2:])})
 

	
 
    git_revs = []
 
    for push_ref in rev_data:
 
        _type = push_ref['type']
 
        if _type == 'heads':
 
            if push_ref['old_rev'] == EmptyChangeset().raw_id:
 
                # update the symbolic ref if we push new repo
 
                if scm_repo.is_empty():
 
                    scm_repo._repo.refs.set_symbolic_ref(
 
                        b'HEAD',
 
                        b'refs/heads/%s' % safe_bytes(push_ref['name']))
 

	
 
                # build exclude list without the ref
 
                cmd = ['for-each-ref', '--format=%(refname)', 'refs/heads/*']
 
                stdout = scm_repo.run_git_command(cmd)
 
                ref = push_ref['ref']
 
                heads = [head for head in stdout.splitlines() if head != ref]
 
                # now list the git revs while excluding from the list
 
                cmd = ['log', push_ref['new_rev'], '--reverse', '--pretty=format:%H']
 
                cmd.append('--not')
 
                cmd.extend(heads) # empty list is ok
 
                stdout = scm_repo.run_git_command(cmd)
 
                git_revs += stdout.splitlines()
 

	
 
            elif push_ref['new_rev'] == EmptyChangeset().raw_id:
 
                # delete branch case
 
                git_revs += ['delete_branch=>%s' % push_ref['name']]
 
            else:
 
                cmd = ['log', '%(old_rev)s..%(new_rev)s' % push_ref,
 
                       '--reverse', '--pretty=format:%H']
 
                stdout = scm_repo.run_git_command(cmd)
 
                git_revs += stdout.splitlines()
 

	
 
        elif _type == 'tags':
 
            git_revs += ['tag=>%s' % push_ref['name']]
 

	
 
    process_pushed_raw_ids(git_revs)
 
    hooks.process_pushed_raw_ids(git_revs)
 

	
 
    return 0
 

	
 

	
 
# Almost exactly like Mercurial contrib/hg-ssh:
 
def rejectpush(ui, **kwargs):
 
    """Mercurial hook to be installed as pretxnopen and prepushkey for read-only repos.
 
    Return value 1 will make the hook fail and reject the push.
 
    """
 
    ex = get_hook_environment()
 
    ui.warn(safe_bytes("Push access to %r denied\n" % ex.repository))
 
    return 1
kallithea/config/middleware/simplegit.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.lib.middleware.simplegit
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
SimpleGit middleware for handling Git protocol requests (push/clone etc.)
 
It's implemented with basic auth function
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Apr 28, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 

	
 
"""
 

	
 

	
 
import logging
 
import re
 

	
 
from kallithea.config.middleware.pygrack import make_wsgi_app
 
from kallithea.lib import hooks
 
from kallithea.lib.base import BaseVCSController, get_path_info
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
GIT_PROTO_PAT = re.compile(r'^/(.+)/(info/refs|git-upload-pack|git-receive-pack)$')
 

	
 

	
 
cmd_mapping = {
 
    'git-receive-pack': 'push',
 
    'git-upload-pack': 'pull',
 
}
 

	
 

	
 
class SimpleGit(BaseVCSController):
 

	
 
    scm_alias = 'git'
 

	
 
    @classmethod
 
    def parse_request(cls, environ):
 
        path_info = get_path_info(environ)
 
        m = GIT_PROTO_PAT.match(path_info)
 
        if m is None:
 
            return None
 

	
 
        class parsed_request(object):
 
            # See https://git-scm.com/book/en/v2/Git-Internals-Transfer-Protocols#_the_smart_protocol
 
            repo_name = m.group(1).rstrip('/')
 
            cmd = m.group(2)
 

	
 
            query_string = environ['QUERY_STRING']
 
            if cmd == 'info/refs' and query_string.startswith('service='):
 
                service = query_string.split('=', 1)[1]
 
                action = cmd_mapping.get(service)
 
            else:
 
                service = None
 
                action = cmd_mapping.get(cmd)
 

	
 
        return parsed_request
 

	
 
    def _make_app(self, parsed_request):
 
        """
 
        Return a pygrack wsgi application.
 
        """
 
        pygrack_app = make_wsgi_app(parsed_request.repo_name, self.basepath)
 

	
 
        def wrapper_app(environ, start_response):
 
            if (parsed_request.cmd == 'info/refs' and
 
                parsed_request.service == 'git-upload-pack'
 
            ):
 
                # Run hooks like Mercurial outgoing.kallithea_log_pull_action does
 
                # Run hooks like Mercurial outgoing.kallithea_pull_action does
 
                hooks.log_pull_action()
 
            # Note: push hooks are handled by post-receive hook
 

	
 
            return pygrack_app(environ, start_response)
 

	
 
        return wrapper_app
kallithea/lib/hooks.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.lib.hooks
 
~~~~~~~~~~~~~~~~~~~
 

	
 
Hooks run by Kallithea
 
Hooks run by Kallithea. Generally called 'log_*', but will also do important
 
invalidation of caches and run extension hooks.
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Aug 6, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import os
 
import sys
 
import time
 

	
 
import mercurial.scmutil
 
import paste.deploy
 

	
 
import kallithea
 
from kallithea.lib import webutils
 
from kallithea.lib.exceptions import UserCreationError
 
from kallithea.lib.utils import action_logger, make_ui
 
from kallithea.lib.utils2 import HookEnvironmentError, ascii_str, get_hook_environment, safe_bytes, safe_str
 
from kallithea.lib.vcs.backends.base import EmptyChangeset
 
from kallithea.model import db
 
from kallithea.lib.utils import action_logger
 
from kallithea.lib.utils2 import get_hook_environment
 

	
 

	
 
def _get_scm_size(alias, root_path):
 
    if not alias.startswith('.'):
 
        alias += '.'
 

	
 
    size_scm, size_root = 0, 0
 
    for path, dirs, files in os.walk(root_path):
 
        if path.find(alias) != -1:
 
            for f in files:
 
                try:
 
                    size_scm += os.path.getsize(os.path.join(path, f))
 
                except OSError:
 
                    pass
 
        else:
 
            for f in files:
 
                try:
 
                    size_root += os.path.getsize(os.path.join(path, f))
 
                except OSError:
 
                    pass
 

	
 
    size_scm_f = webutils.format_byte_size(size_scm)
 
    size_root_f = webutils.format_byte_size(size_root)
 
    size_total_f = webutils.format_byte_size(size_root + size_scm)
 

	
 
    return size_scm_f, size_root_f, size_total_f
 

	
 

	
 
def repo_size(ui, repo, hooktype=None, **kwargs):
 
    """Show size of Mercurial repository.
 

	
 
    Called as Mercurial hook changegroup.repo_size after push.
 
    """
 
    size_hg_f, size_root_f, size_total_f = _get_scm_size('.hg', safe_str(repo.root))
 

	
 
    last_cs = repo[len(repo) - 1]
 

	
 
    msg = ('Repository size .hg: %s Checkout: %s Total: %s\n'
 
           'Last revision is now r%s:%s\n') % (
 
        size_hg_f, size_root_f, size_total_f, last_cs.rev(), ascii_str(last_cs.hex())[:12]
 
    )
 
    ui.status(safe_bytes(msg))
 

	
 

	
 
def log_pull_action(*args, **kwargs):
 
def log_pull_action():
 
    """Logs user last pull action
 

	
 
    Called as Mercurial hook outgoing.kallithea_log_pull_action or from Kallithea before invoking Git.
 

	
 
    Does *not* use the action from the hook environment but is always 'pull'.
 
    """
 
    ex = get_hook_environment()
 

	
 
    action = 'pull'
 
    action_logger(ex.username, action, ex.repository, ex.ip, commit=True)
 
    # extension hook call
 
    callback = getattr(kallithea.EXTENSIONS, 'PULL_HOOK', None)
 
    if callable(callback):
 
        kw = {}
 
        kw.update(ex)
 
        callback(**kw)
 

	
 

	
 
def log_push_action(ui, repo, node, node_last, **kwargs):
 
    """
 
    Register that changes have been added to the repo - log the action *and* invalidate caches.
 
    Note: This hook is not only logging, but also the side effect invalidating
 
    caches! The function should perhaps be renamed.
 

	
 
    Called as Mercurial hook changegroup.kallithea_log_push_action .
 

	
 
    The pushed changesets is given by the revset 'node:node_last'.
 
    """
 
    revs = [ascii_str(repo[r].hex()) for r in mercurial.scmutil.revrange(repo, [b'%s:%s' % (node, node_last)])]
 
    process_pushed_raw_ids(revs)
 

	
 

	
 
def process_pushed_raw_ids(revs):
 
    """
 
    Register that changes have been added to the repo - log the action *and* invalidate caches.
 

	
 
    Called from Mercurial changegroup.kallithea_log_push_action calling hook log_push_action,
 
    Called from Mercurial changegroup.kallithea_push_action calling hook push_action,
 
    or from the Git post-receive hook calling handle_git_post_receive ...
 
    or from scm _handle_push.
 
    """
 
    ex = get_hook_environment()
 

	
 
    action = '%s:%s' % (ex.action, ','.join(revs))
 
    action_logger(ex.username, action, ex.repository, ex.ip, commit=True)
 

	
 
    from kallithea.model.scm import ScmModel
 
    ScmModel().mark_for_invalidation(ex.repository)
 

	
 
    # extension hook call
 
    callback = getattr(kallithea.EXTENSIONS, 'PUSH_HOOK', None)
 
    if callable(callback):
 
        kw = {'pushed_revs': revs}
 
        kw.update(ex)
 
        callback(**kw)
 

	
 

	
 
def log_create_repository(repository_dict, created_by, **kwargs):
 
    """
 
    Post create repository Hook.
 

	
 
    :param repository: dict dump of repository object
 
    :param created_by: username who created repository
 

	
 
    available keys of repository_dict:
 

	
 
     'repo_type',
 
     'description',
 
     'private',
 
     'created_on',
 
     'enable_downloads',
 
     'repo_id',
 
     'owner_id',
 
     'enable_statistics',
 
     'clone_uri',
 
     'fork_id',
 
     'group_id',
 
     'repo_name'
 

	
 
    """
 
    callback = getattr(kallithea.EXTENSIONS, 'CREATE_REPO_HOOK', None)
 
    if callable(callback):
 
        kw = {}
 
        kw.update(repository_dict)
 
        kw.update({'created_by': created_by})
 
        kw.update(kwargs)
 
        callback(**kw)
 

	
 

	
 
def check_allowed_create_user(user_dict, created_by, **kwargs):
 
    # pre create hooks
 
    callback = getattr(kallithea.EXTENSIONS, 'PRE_CREATE_USER_HOOK', None)
 
    if callable(callback):
 
        allowed, reason = callback(created_by=created_by, **user_dict)
 
        if not allowed:
 
            raise UserCreationError(reason)
 

	
 

	
 
def log_create_user(user_dict, created_by, **kwargs):
 
    """
 
    Post create user Hook.
 

	
 
    :param user_dict: dict dump of user object
 

	
 
    available keys for user_dict:
 

	
 
     'username',
 
     'full_name_or_username',
 
     'full_contact',
 
     'user_id',
 
     'name',
 
     'firstname',
 
     'short_contact',
 
     'admin',
 
     'lastname',
 
     'ip_addresses',
 
     'ldap_dn',
 
     'email',
 
     'api_key',
 
     'last_login',
 
     'full_name',
 
     'active',
 
     'password',
 
     'emails',
 

	
 
    """
 
    callback = getattr(kallithea.EXTENSIONS, 'CREATE_USER_HOOK', None)
 
    if callable(callback):
 
        callback(created_by=created_by, **user_dict)
 

	
 

	
 
def log_create_pullrequest(pullrequest_dict, created_by, **kwargs):
 
    """
 
    Post create pull request hook.
 

	
 
    :param pullrequest_dict: dict dump of pull request object
 
    """
 
    callback = getattr(kallithea.EXTENSIONS, 'CREATE_PULLREQUEST_HOOK', None)
 
    if callable(callback):
 
        return callback(created_by=created_by, **pullrequest_dict)
 

	
 
    return 0
 

	
 
def log_delete_repository(repository_dict, deleted_by, **kwargs):
 
    """
 
    Post delete repository Hook.
 

	
 
    :param repository: dict dump of repository object
 
    :param deleted_by: username who deleted the repository
 

	
 
    available keys of repository_dict:
 

	
 
     'repo_type',
 
     'description',
 
     'private',
 
     'created_on',
 
     'enable_downloads',
 
     'repo_id',
 
     'owner_id',
 
     'enable_statistics',
 
     'clone_uri',
 
     'fork_id',
 
     'group_id',
 
     'repo_name'
 

	
 
    """
 
    callback = getattr(kallithea.EXTENSIONS, 'DELETE_REPO_HOOK', None)
 
    if callable(callback):
 
        kw = {}
 
        kw.update(repository_dict)
 
        kw.update({'deleted_by': deleted_by,
 
                   'deleted_on': time.time()})
 
        kw.update(kwargs)
 
        callback(**kw)
 

	
 

	
 
def log_delete_user(user_dict, deleted_by, **kwargs):
 
    """
 
    Post delete user Hook.
 

	
 
    :param user_dict: dict dump of user object
 

	
 
    available keys for user_dict:
 

	
 
     'username',
 
     'full_name_or_username',
 
     'full_contact',
 
     'user_id',
 
     'name',
 
     'firstname',
 
     'short_contact',
 
     'admin',
 
     'lastname',
 
     'ip_addresses',
 
     'ldap_dn',
 
     'email',
 
     'api_key',
 
     'last_login',
 
     'full_name',
 
     'active',
 
     'password',
 
     'emails',
 

	
 
    """
 
    callback = getattr(kallithea.EXTENSIONS, 'DELETE_USER_HOOK', None)
 
    if callable(callback):
 
        callback(deleted_by=deleted_by, **user_dict)
 

	
 

	
 
def _hook_environment(repo_path):
 
    """
 
    Create a light-weight environment for stand-alone scripts and return an UI and the
 
    db repository.
 

	
 
    Git hooks are executed as subprocess of Git while Kallithea is waiting, and
 
    they thus need enough info to be able to create an app environment and
 
    connect to the database.
 
    """
 
    import kallithea.config.application
 

	
 
    extras = get_hook_environment()
 

	
 
    path_to_ini_file = extras['config']
 
    config = paste.deploy.appconfig('config:' + path_to_ini_file)
 
    #logging.config.fileConfig(ini_file_path) # Note: we are in a different process - don't use configured logging
 
    kallithea.config.application.make_app(config.global_conf, **config.local_conf)
 

	
 
    # fix if it's not a bare repo
 
    if repo_path.endswith(os.sep + '.git'):
 
        repo_path = repo_path[:-5]
 

	
 
    repo = db.Repository.get_by_full_path(repo_path)
 
    if not repo:
 
        raise OSError('Repository %s not found in database' % repo_path)
 

	
 
    baseui = make_ui()
 
    return baseui, repo
 

	
 

	
 
def handle_git_pre_receive(repo_path, git_stdin_lines):
 
    """Called from Git pre-receive hook.
 
    The returned value is used as hook exit code and must be 0.
 
    """
 
    # Currently unused. TODO: remove?
 
    return 0
 

	
 

	
 
def handle_git_post_receive(repo_path, git_stdin_lines):
 
    """Called from Git post-receive hook.
 
    The returned value is used as hook exit code and must be 0.
 
    """
 
    try:
 
        baseui, repo = _hook_environment(repo_path)
 
    except HookEnvironmentError as e:
 
        sys.stderr.write("Skipping Kallithea Git post-receive hook %r.\nGit was apparently not invoked by Kallithea: %s\n" % (sys.argv[0], e))
 
        return 0
 

	
 
    # the post push hook should never use the cached instance
 
    scm_repo = repo.scm_instance_no_cache()
 

	
 
    rev_data = []
 
    for l in git_stdin_lines:
 
        old_rev, new_rev, ref = l.strip().split(' ')
 
        _ref_data = ref.split('/')
 
        if _ref_data[1] in ['tags', 'heads']:
 
            rev_data.append({'old_rev': old_rev,
 
                             'new_rev': new_rev,
 
                             'ref': ref,
 
                             'type': _ref_data[1],
 
                             'name': '/'.join(_ref_data[2:])})
 

	
 
    git_revs = []
 
    for push_ref in rev_data:
 
        _type = push_ref['type']
 
        if _type == 'heads':
 
            if push_ref['old_rev'] == EmptyChangeset().raw_id:
 
                # update the symbolic ref if we push new repo
 
                if scm_repo.is_empty():
 
                    scm_repo._repo.refs.set_symbolic_ref(
 
                        b'HEAD',
 
                        b'refs/heads/%s' % safe_bytes(push_ref['name']))
 

	
 
                # build exclude list without the ref
 
                cmd = ['for-each-ref', '--format=%(refname)', 'refs/heads/*']
 
                stdout = scm_repo.run_git_command(cmd)
 
                ref = push_ref['ref']
 
                heads = [head for head in stdout.splitlines() if head != ref]
 
                # now list the git revs while excluding from the list
 
                cmd = ['log', push_ref['new_rev'], '--reverse', '--pretty=format:%H']
 
                cmd.append('--not')
 
                cmd.extend(heads) # empty list is ok
 
                stdout = scm_repo.run_git_command(cmd)
 
                git_revs += stdout.splitlines()
 

	
 
            elif push_ref['new_rev'] == EmptyChangeset().raw_id:
 
                # delete branch case
 
                git_revs += ['delete_branch=>%s' % push_ref['name']]
 
            else:
 
                cmd = ['log', '%(old_rev)s..%(new_rev)s' % push_ref,
 
                       '--reverse', '--pretty=format:%H']
 
                stdout = scm_repo.run_git_command(cmd)
 
                git_revs += stdout.splitlines()
 

	
 
        elif _type == 'tags':
 
            git_revs += ['tag=>%s' % push_ref['name']]
 

	
 
    process_pushed_raw_ids(git_revs)
 

	
 
    return 0
 

	
 

	
 
# Almost exactly like Mercurial contrib/hg-ssh:
 
def rejectpush(ui, **kwargs):
 
    """Mercurial hook to be installed as pretxnopen and prepushkey for read-only repos.
 
    Return value 1 will make the hook fail and reject the push.
 
    """
 
    ex = get_hook_environment()
 
    ui.warn(safe_bytes("Push access to %r denied\n" % ex.repository))
 
    return 1
kallithea/lib/utils.py
Show inline comments
 
@@ -240,196 +240,196 @@ def is_valid_repo_uri(repo_type, url, ui
 
            raise InvalidCloneUriException('URI %s not allowed' % (url,))
 

	
 
    elif repo_type == 'git':
 
        if url.startswith('http') or url.startswith('git'):
 
            # initially check if it's at least the proper URL
 
            # or does it pass basic auth
 
            try:
 
                GitRepository._check_url(url)
 
            except urllib.error.URLError as e:
 
                raise InvalidCloneUriException('URI %s URLError: %s' % (url, e))
 
        elif url.startswith('hg+http'):
 
            raise InvalidCloneUriException('URI type %s not implemented' % (url,))
 
        else:
 
            raise InvalidCloneUriException('URI %s not allowed' % (url))
 

	
 

	
 
def is_valid_repo(repo_name, base_path, scm=None):
 
    """
 
    Returns True if given path is a valid repository False otherwise.
 
    If scm param is given also compare if given scm is the same as expected
 
    from scm parameter
 

	
 
    :param repo_name:
 
    :param base_path:
 
    :param scm:
 

	
 
    :return True: if given path is a valid repository
 
    """
 
    # TODO: paranoid security checks?
 
    full_path = os.path.join(base_path, repo_name)
 

	
 
    try:
 
        scm_ = get_scm(full_path)
 
        if scm:
 
            return scm_[0] == scm
 
        return True
 
    except VCSError:
 
        return False
 

	
 

	
 
def is_valid_repo_group(repo_group_name, base_path, skip_path_check=False):
 
    """
 
    Returns True if given path is a repository group False otherwise
 

	
 
    :param repo_name:
 
    :param base_path:
 
    """
 
    full_path = os.path.join(base_path, repo_group_name)
 

	
 
    # check if it's not a repo
 
    if is_valid_repo(repo_group_name, base_path):
 
        return False
 

	
 
    try:
 
        # we need to check bare git repos at higher level
 
        # since we might match branches/hooks/info/objects or possible
 
        # other things inside bare git repo
 
        get_scm(os.path.dirname(full_path))
 
        return False
 
    except VCSError:
 
        pass
 

	
 
    # check if it's a valid path
 
    if skip_path_check or os.path.isdir(full_path):
 
        return True
 

	
 
    return False
 

	
 

	
 
def make_ui(repo_path=None):
 
    """
 
    Create an Mercurial 'ui' object based on database Ui settings, possibly
 
    augmenting with content from a hgrc file.
 
    """
 
    baseui = mercurial.ui.ui()
 

	
 
    # clean the baseui object
 
    baseui._ocfg = mercurial.config.config()
 
    baseui._ucfg = mercurial.config.config()
 
    baseui._tcfg = mercurial.config.config()
 

	
 
    sa = meta.Session()
 
    for ui_ in sa.query(db.Ui).order_by(db.Ui.ui_section, db.Ui.ui_key):
 
        if ui_.ui_active:
 
            log.debug('config from db: [%s] %s=%r', ui_.ui_section,
 
                      ui_.ui_key, ui_.ui_value)
 
            baseui.setconfig(ascii_bytes(ui_.ui_section), ascii_bytes(ui_.ui_key),
 
                             b'' if ui_.ui_value is None else safe_bytes(ui_.ui_value))
 

	
 
    # force set push_ssl requirement to False, Kallithea handles that
 
    baseui.setconfig(b'web', b'push_ssl', False)
 
    baseui.setconfig(b'web', b'allow_push', b'*')
 
    # prevent interactive questions for ssh password / passphrase
 
    ssh = baseui.config(b'ui', b'ssh', default=b'ssh')
 
    baseui.setconfig(b'ui', b'ssh', b'%s -oBatchMode=yes -oIdentitiesOnly=yes' % ssh)
 
    # push / pull hooks
 
    baseui.setconfig(b'hooks', b'changegroup.kallithea_log_push_action', b'python:kallithea.lib.hooks.log_push_action')
 
    baseui.setconfig(b'hooks', b'outgoing.kallithea_log_pull_action', b'python:kallithea.lib.hooks.log_pull_action')
 
    baseui.setconfig(b'hooks', b'changegroup.kallithea_push_action', b'python:kallithea.bin.vcs_hooks.push_action')
 
    baseui.setconfig(b'hooks', b'outgoing.kallithea_pull_action', b'python:kallithea.bin.vcs_hooks.pull_action')
 
    if baseui.config(b'hooks', ascii_bytes(db.Ui.HOOK_REPO_SIZE)):  # ignore actual value
 
        baseui.setconfig(b'hooks', ascii_bytes(db.Ui.HOOK_REPO_SIZE), b'python:kallithea.lib.hooks.repo_size')
 
        baseui.setconfig(b'hooks', ascii_bytes(db.Ui.HOOK_REPO_SIZE), b'python:kallithea.bin.vcs_hooks.repo_size')
 

	
 
    if repo_path is not None:
 
        # Note: MercurialRepository / mercurial.localrepo.instance will do this too, so it will always be possible to override db settings or what is hardcoded above
 
        baseui.readconfig(repo_path)
 

	
 
    assert baseui.plain()  # set by hgcompat.monkey_do (invoked from import of vcs.backends.hg) to minimize potential impact of loading config files
 
    return baseui
 

	
 

	
 
def set_app_settings(config):
 
    """
 
    Updates app config with new settings from database
 

	
 
    :param config:
 
    """
 
    settings = db.Setting.get_app_settings()
 
    for k, v in settings.items():
 
        config[k] = v
 
    config['base_path'] = db.Ui.get_repos_location()
 

	
 

	
 
def set_vcs_config(config):
 
    """
 
    Patch VCS config with some Kallithea specific stuff
 

	
 
    :param config: kallithea.CONFIG
 
    """
 
    settings.BACKENDS = {
 
        'hg': 'kallithea.lib.vcs.backends.hg.MercurialRepository',
 
        'git': 'kallithea.lib.vcs.backends.git.GitRepository',
 
    }
 

	
 
    settings.GIT_EXECUTABLE_PATH = config.get('git_path', 'git')
 
    settings.GIT_REV_FILTER = config.get('git_rev_filter', '--all').strip()
 
    settings.DEFAULT_ENCODINGS = aslist(config.get('default_encoding',
 
                                                        'utf-8'), sep=',')
 

	
 

	
 
def set_indexer_config(config):
 
    """
 
    Update Whoosh index mapping
 

	
 
    :param config: kallithea.CONFIG
 
    """
 
    log.debug('adding extra into INDEX_EXTENSIONS')
 
    kallithea.lib.conf.INDEX_EXTENSIONS.extend(re.split(r'\s+', config.get('index.extensions', '')))
 

	
 
    log.debug('adding extra into INDEX_FILENAMES')
 
    kallithea.lib.conf.INDEX_FILENAMES.extend(re.split(r'\s+', config.get('index.filenames', '')))
 

	
 

	
 
def map_groups(path):
 
    """
 
    Given a full path to a repository, create all nested groups that this
 
    repo is inside. This function creates parent-child relationships between
 
    groups and creates default perms for all new groups.
 

	
 
    :param paths: full path to repository
 
    """
 
    from kallithea.model.repo_group import RepoGroupModel
 
    sa = meta.Session()
 
    groups = path.split(kallithea.URL_SEP)
 
    parent = None
 
    group = None
 

	
 
    # last element is repo in nested groups structure
 
    groups = groups[:-1]
 
    rgm = RepoGroupModel()
 
    owner = db.User.get_first_admin()
 
    for lvl, group_name in enumerate(groups):
 
        group_name = '/'.join(groups[:lvl] + [group_name])
 
        group = db.RepoGroup.get_by_group_name(group_name)
 
        desc = '%s group' % group_name
 

	
 
        # skip folders that are now removed repos
 
        if REMOVED_REPO_PAT.match(group_name):
 
            break
 

	
 
        if group is None:
 
            log.debug('creating group level: %s group_name: %s',
 
                      lvl, group_name)
 
            group = db.RepoGroup(group_name, parent)
 
            group.group_description = desc
 
            group.owner = owner
 
            sa.add(group)
 
            rgm._create_default_perms(group)
 
            sa.flush()
 

	
 
        parent = group
 
    return group
 

	
 

	
 
def repo2db_mapper(initial_repo_dict, remove_obsolete=False,
 
                   install_git_hooks=False, user=None, overwrite_git_hooks=False):
 
    """
 
    maps all repos given in initial_repo_dict, non existing repositories
kallithea/lib/vcs/ssh/hg.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
import logging
 

	
 
import mercurial.hg
 
import mercurial.wireprotoserver
 

	
 
from kallithea.lib.utils import make_ui
 
from kallithea.lib.vcs.ssh import base
 
from kallithea.lib.vcs.utils import safe_bytes
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class MercurialSshHandler(base.BaseSshHandler):
 
    vcs_type = 'hg'
 

	
 
    @classmethod
 
    def make(cls, ssh_command_parts):
 
        r"""
 
        >>> import shlex
 

	
 
        >>> MercurialSshHandler.make(shlex.split('hg -R "foo bar" serve --stdio')).repo_name
 
        'foo bar'
 
        >>> MercurialSshHandler.make(shlex.split(' hg -R blåbærgrød serve --stdio ')).repo_name
 
        'bl\xe5b\xe6rgr\xf8d'
 
        >>> MercurialSshHandler.make(shlex.split('''hg -R 'foo"bar' serve --stdio''')).repo_name
 
        'foo"bar'
 

	
 
        >>> MercurialSshHandler.make(shlex.split('/bin/hg -R "foo" serve --stdio'))
 
        >>> MercurialSshHandler.make(shlex.split('''hg -R "foo"bar" serve --stdio''')) # ssh-serve will report: Error parsing SSH command "...": invalid syntax
 
        Traceback (most recent call last):
 
        ValueError: No closing quotation
 
        >>> MercurialSshHandler.make(shlex.split('git-upload-pack "/foo"')) # not handled here
 
        """
 
        if ssh_command_parts[:2] == ['hg', '-R'] and ssh_command_parts[3:] == ['serve', '--stdio']:
 
            return cls(ssh_command_parts[2])
 

	
 
        return None
 

	
 
    def _serve(self):
 
        # Note: we want a repo with config based on .hg/hgrc and can thus not use self.db_repo.scm_instance._repo.ui
 
        baseui = make_ui(repo_path=self.db_repo.repo_full_path)
 
        if not self.allow_push:
 
            baseui.setconfig(b'hooks', b'pretxnopen._ssh_reject', b'python:kallithea.lib.hooks.rejectpush')
 
            baseui.setconfig(b'hooks', b'prepushkey._ssh_reject', b'python:kallithea.lib.hooks.rejectpush')
 
            baseui.setconfig(b'hooks', b'pretxnopen._ssh_reject', b'python:kallithea.bin.vcs_hooks.rejectpush')
 
            baseui.setconfig(b'hooks', b'prepushkey._ssh_reject', b'python:kallithea.bin.vcs_hooks.rejectpush')
 

	
 
        repo = mercurial.hg.repository(baseui, safe_bytes(self.db_repo.repo_full_path))
 
        log.debug("Starting Mercurial sshserver for %s", self.db_repo.repo_full_path)
 
        mercurial.wireprotoserver.sshserver(baseui, repo).serve_forever()
kallithea/lib/vcs/utils/helpers.py
Show inline comments
 
@@ -10,192 +10,214 @@ import time
 
import urllib.request
 

	
 
import mercurial.url
 
from pygments import highlight
 
from pygments.formatters import TerminalFormatter
 
from pygments.lexers import ClassNotFound, guess_lexer_for_filename
 

	
 
from kallithea.lib.vcs import backends
 
from kallithea.lib.vcs.exceptions import RepositoryError, VCSError
 
from kallithea.lib.vcs.utils import safe_str
 
from kallithea.lib.vcs.utils.paths import abspath
 

	
 

	
 
ALIASES = ['hg', 'git']
 

	
 

	
 
def get_scm(path, search_up=False, explicit_alias=None):
 
    """
 
    Returns one of alias from ``ALIASES`` (in order of precedence same as
 
    shortcuts given in ``ALIASES``) and top working dir path for the given
 
    argument. If no scm-specific directory is found or more than one scm is
 
    found at that directory, ``VCSError`` is raised.
 

	
 
    :param search_up: if set to ``True``, this function would try to
 
      move up to parent directory every time no scm is recognized for the
 
      currently checked path. Default: ``False``.
 
    :param explicit_alias: can be one of available backend aliases, when given
 
      it will return given explicit alias in repositories under more than one
 
      version control, if explicit_alias is different than found it will raise
 
      VCSError
 
    """
 
    if not os.path.isdir(path):
 
        raise VCSError("Given path %s is not a directory" % path)
 

	
 
    while True:
 
        found_scms = [(scm, path) for scm in get_scms_for_path(path)]
 
        if found_scms or not search_up:
 
            break
 
        newpath = abspath(path, '..')
 
        if newpath == path:
 
            break
 
        path = newpath
 

	
 
    if len(found_scms) > 1:
 
        for scm in found_scms:
 
            if scm[0] == explicit_alias:
 
                return scm
 
        raise VCSError('More than one [%s] scm found at given path %s'
 
                       % (', '.join((x[0] for x in found_scms)), path))
 

	
 
    if len(found_scms) == 0:
 
        raise VCSError('No scm found at given path %s' % path)
 

	
 
    return found_scms[0]
 

	
 

	
 
def get_scms_for_path(path):
 
    """
 
    Returns all scm's found at the given path. If no scm is recognized
 
    - empty list is returned.
 

	
 
    :param path: path to directory which should be checked. May be callable.
 

	
 
    :raises VCSError: if given ``path`` is not a directory
 
    """
 
    if hasattr(path, '__call__'):
 
        path = path()
 
    if not os.path.isdir(path):
 
        raise VCSError("Given path %r is not a directory" % path)
 

	
 
    result = []
 
    for key in ALIASES:
 
        # find .hg / .git
 
        dirname = os.path.join(path, '.' + key)
 
        if os.path.isdir(dirname):
 
            result.append(key)
 
            continue
 
        # find rm__.hg / rm__.git too - left overs from old method for deleting
 
        dirname = os.path.join(path, 'rm__.' + key)
 
        if os.path.isdir(dirname):
 
            return result
 
        # We still need to check if it's not bare repository as
 
        # bare repos don't have working directories
 
        try:
 
            backends.get_backend(key)(path)
 
            result.append(key)
 
            continue
 
        except RepositoryError:
 
            # Wrong backend
 
            pass
 
        except VCSError:
 
            # No backend at all
 
            pass
 
    return result
 

	
 

	
 
def get_scm_size(alias, root_path):
 
    if not alias.startswith('.'):
 
        alias += '.'
 

	
 
    size_scm, size_root = 0, 0
 
    for path, dirs, files in os.walk(root_path):
 
        if path.find(alias) != -1:
 
            for f in files:
 
                try:
 
                    size_scm += os.path.getsize(os.path.join(path, f))
 
                except OSError:
 
                    pass
 
        else:
 
            for f in files:
 
                try:
 
                    size_root += os.path.getsize(os.path.join(path, f))
 
                except OSError:
 
                    pass
 

	
 
    return size_scm, size_root
 

	
 

	
 
def get_highlighted_code(name, code, type='terminal'):
 
    """
 
    If pygments are available on the system
 
    then returned output is colored. Otherwise
 
    unchanged content is returned.
 
    """
 
    try:
 
        lexer = guess_lexer_for_filename(name, code)
 
        formatter = TerminalFormatter()
 
        content = highlight(code, lexer, formatter)
 
    except ClassNotFound:
 
        logging.debug("Couldn't guess Lexer, will not use pygments.")
 
        content = code
 
    return content
 

	
 

	
 
def parse_changesets(text):
 
    """
 
    Returns dictionary with *start*, *main* and *end* ids.
 

	
 
    Examples::
 

	
 
        >>> parse_changesets('aaabbb')
 
        {'start': None, 'main': 'aaabbb', 'end': None}
 
        >>> parse_changesets('aaabbb..cccddd')
 
        {'start': 'aaabbb', 'end': 'cccddd', 'main': None}
 

	
 
    """
 
    text = text.strip()
 
    CID_RE = r'[a-zA-Z0-9]+'
 
    if '..' not in text:
 
        m = re.match(r'^(?P<cid>%s)$' % CID_RE, text)
 
        if m:
 
            return {
 
                'start': None,
 
                'main': text,
 
                'end': None,
 
            }
 
    else:
 
        RE = r'^(?P<start>%s)?\.{2,3}(?P<end>%s)?$' % (CID_RE, CID_RE)
 
        m = re.match(RE, text)
 
        if m:
 
            result = m.groupdict()
 
            result['main'] = None
 
            return result
 
    raise ValueError("IDs not recognized")
 

	
 

	
 
def parse_datetime(text):
 
    """
 
    Parses given text and returns ``datetime.datetime`` instance or raises
 
    ``ValueError``.
 

	
 
    :param text: string of desired date/datetime or something more verbose,
 
      like *yesterday*, *2weeks 3days*, etc.
 
    """
 

	
 
    text = text.strip().lower()
 

	
 
    INPUT_FORMATS = (
 
        '%Y-%m-%d %H:%M:%S',
 
        '%Y-%m-%d %H:%M',
 
        '%Y-%m-%d',
 
        '%m/%d/%Y %H:%M:%S',
 
        '%m/%d/%Y %H:%M',
 
        '%m/%d/%Y',
 
        '%m/%d/%y %H:%M:%S',
 
        '%m/%d/%y %H:%M',
 
        '%m/%d/%y',
 
    )
 
    for format in INPUT_FORMATS:
 
        try:
 
            return datetime.datetime(*time.strptime(text, format)[:6])
 
        except ValueError:
 
            pass
 

	
 
    # Try descriptive texts
 
    if text == 'tomorrow':
 
        future = datetime.datetime.now() + datetime.timedelta(days=1)
 
        args = future.timetuple()[:3] + (23, 59, 59)
 
        return datetime.datetime(*args)
 
    elif text == 'today':
 
        return datetime.datetime(*datetime.datetime.today().timetuple()[:3])
 
    elif text == 'now':
 
        return datetime.datetime.now()
 
    elif text == 'yesterday':
 
        past = datetime.datetime.now() - datetime.timedelta(days=1)
 
        return datetime.datetime(*past.timetuple()[:3])
 
    else:
 
        days = 0
 
        matched = re.match(
 
            r'^((?P<weeks>\d+) ?w(eeks?)?)? ?((?P<days>\d+) ?d(ays?)?)?$', text)
 
        if matched:
 
            groupdict = matched.groupdict()
 
            if groupdict['days']:
 
                days += int(matched.groupdict()['days'])
kallithea/templates/py/git_post_receive_hook.py
Show inline comments
 
"""Kallithea Git hook
 

	
 
This hook is installed and maintained by Kallithea. It will be overwritten
 
by Kallithea - don't customize it manually!
 

	
 
When Kallithea invokes Git, the KALLITHEA_EXTRAS environment variable will
 
contain additional info like the Kallithea instance and user info that this
 
hook will use.
 
"""
 

	
 
import os
 
import sys
 

	
 
import kallithea.lib.hooks
 
import kallithea.bin.vcs_hooks
 

	
 

	
 
# Set output mode on windows to binary for stderr.
 
# This prevents python (or the windows console) from replacing \n with \r\n.
 
# Git doesn't display remote output lines that contain \r,
 
# and therefore without this modification git would display empty lines
 
# instead of the exception output.
 
if sys.platform == "win32":
 
    import msvcrt
 
    msvcrt.setmode(sys.stderr.fileno(), os.O_BINARY)
 

	
 
KALLITHEA_HOOK_VER = '_TMPL_'
 
os.environ['KALLITHEA_HOOK_VER'] = KALLITHEA_HOOK_VER
 

	
 

	
 
def main():
 
    repo_path = os.path.abspath('.')
 
    git_stdin_lines = sys.stdin.readlines()
 
    sys.exit(kallithea.lib.hooks.handle_git_post_receive(repo_path, git_stdin_lines))
 
    sys.exit(kallithea.bin.vcs_hooks.post_receive(repo_path, git_stdin_lines))
 

	
 

	
 
if __name__ == '__main__':
 
    main()
kallithea/templates/py/git_pre_receive_hook.py
Show inline comments
 
"""Kallithea Git hook
 

	
 
This hook is installed and maintained by Kallithea. It will be overwritten
 
by Kallithea - don't customize it manually!
 

	
 
When Kallithea invokes Git, the KALLITHEA_EXTRAS environment variable will
 
contain additional info like the Kallithea instance and user info that this
 
hook will use.
 
"""
 

	
 
import os
 
import sys
 

	
 
import kallithea.lib.hooks
 
import kallithea.bin.vcs_hooks
 

	
 

	
 
# Set output mode on windows to binary for stderr.
 
# This prevents python (or the windows console) from replacing \n with \r\n.
 
# Git doesn't display remote output lines that contain \r,
 
# and therefore without this modification git would display empty lines
 
# instead of the exception output.
 
if sys.platform == "win32":
 
    import msvcrt
 
    msvcrt.setmode(sys.stderr.fileno(), os.O_BINARY)
 

	
 
KALLITHEA_HOOK_VER = '_TMPL_'
 
os.environ['KALLITHEA_HOOK_VER'] = KALLITHEA_HOOK_VER
 

	
 

	
 
def main():
 
    repo_path = os.path.abspath('.')
 
    git_stdin_lines = sys.stdin.readlines()
 
    sys.exit(kallithea.lib.hooks.handle_git_pre_receive(repo_path, git_stdin_lines))
 
    sys.exit(kallithea.bin.vcs_hooks.pre_receive(repo_path, git_stdin_lines))
 

	
 

	
 
if __name__ == '__main__':
 
    main()
0 comments (0 inline, 0 general)