Changeset - 25c51511c8eb
[Not reviewed]
default
0 6 1
Mads Kiilerich (mads) - 5 years ago 2021-01-11 00:23:20
mads@kiilerich.com
hooks: put repo_size and update hooks in kallithea namespace

Keep things separate.

Include missing migration steps for 642847355a10.
7 files changed with 69 insertions and 12 deletions:
0 comments (0 inline, 0 general)
kallithea/alembic/versions/7ba0d2cad930_hooks_migrate_internal_hooks_to_.py
Show inline comments
 
new file 100644
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
"""hooks: migrate internal hooks to kallithea namespace
 

	
 
Revision ID: 7ba0d2cad930
 
Revises: f62826179f39
 
Create Date: 2021-01-11 00:10:13.576586
 

	
 
"""
 

	
 
# The following opaque hexadecimal identifiers ("revisions") are used
 
# by Alembic to track this migration script and its relations to others.
 
revision = '7ba0d2cad930'
 
down_revision = 'f62826179f39'
 
branch_labels = None
 
depends_on = None
 

	
 
from alembic import op
 
from sqlalchemy import MetaData, Table
 

	
 
from kallithea.model import db
 

	
 

	
 
meta = MetaData()
 

	
 

	
 
def upgrade():
 
    meta.bind = op.get_bind()
 
    ui = Table(db.Ui.__tablename__, meta, autoload=True)
 

	
 
    ui.update(values={
 
        'ui_key': 'changegroup.kallithea_update',
 
        'ui_value': 'python:',  # value in db isn't used
 
    }).where(ui.c.ui_key == 'changegroup.update').execute()
 
    ui.update(values={
 
        'ui_key': 'changegroup.kallithea_repo_size',
 
        'ui_value': 'python:',  # value in db isn't used
 
    }).where(ui.c.ui_key == 'changegroup.repo_size').execute()
 

	
 
    # 642847355a10 moved these hooks out of db - remove old entries
 
    ui.delete().where(ui.c.ui_key == 'changegroup.push_logger').execute()
 
    ui.delete().where(ui.c.ui_key == 'outgoing.pull_logger').execute()
 

	
 

	
 
def downgrade():
 
    pass
kallithea/bin/vcs_hooks.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.bin.vcs_hooks
 
~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
Entry points for Kallithea hooking into Mercurial and Git.
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Aug 6, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import logging
 
import os
 
import sys
 

	
 
import mercurial.hg
 
import mercurial.scmutil
 
import paste.deploy
 

	
 
import kallithea
 
import kallithea.config.application
 
from kallithea.lib import hooks, webutils
 
from kallithea.lib.utils2 import HookEnvironmentError, ascii_str, get_hook_environment, safe_bytes, safe_str
 
from kallithea.lib.vcs.backends.base import EmptyChangeset
 
from kallithea.lib.vcs.utils.helpers import get_scm_size
 
from kallithea.model import db
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
def repo_size(ui, repo, hooktype=None, **kwargs):
 
    """Show size of Mercurial repository.
 

	
 
    Called as Mercurial hook changegroup.repo_size after push.
 
    Called as Mercurial hook changegroup.kallithea_repo_size after push.
 
    """
 
    size_hg, size_root = get_scm_size('.hg', safe_str(repo.root))
 

	
 
    last_cs = repo[len(repo) - 1]
 

	
 
    msg = ('Repository size .hg: %s Checkout: %s Total: %s\n'
 
           'Last revision is now r%s:%s\n') % (
 
        webutils.format_byte_size(size_hg),
 
        webutils.format_byte_size(size_root),
 
        webutils.format_byte_size(size_hg + size_root),
 
        last_cs.rev(),
 
        ascii_str(last_cs.hex())[:12],
 
    )
 
    ui.status(safe_bytes(msg))
 

	
 

	
 
def update(ui, repo, hooktype=None, **kwargs):
 
    """Update repo after push. The equivalent to 'hg update' but using the same
 
    Mercurial as everything else.
 

	
 
    Called as Mercurial hook changegroup.update after push.
 
    Called as Mercurial hook changegroup.kallithea_update after push.
 
    """
 
    try:
 
        ui.pushbuffer(error=True, subproc=True)
 
        rev = brev = None
 
        mercurial.hg.updatetotally(ui, repo, rev, brev)
 
    finally:
 
        s = ui.popbuffer()  # usually just "x files updated, x files merged, x files removed, x files unresolved"
 
        log.info('%s update hook output: %s', safe_str(repo.root), safe_str(s).rstrip())
 

	
 

	
 
def pull_action(ui, repo, **kwargs):
 
    """Logs user pull action
 

	
 
    Called as Mercurial hook outgoing.kallithea_pull_action.
 
    """
 
    hooks.log_pull_action()
 

	
 

	
 
def push_action(ui, repo, node, node_last, **kwargs):
 
    """
 
    Register that changes have been added to the repo - log the action *and* invalidate caches.
 
    Note: This hook is not only logging, but also the side effect invalidating
 
    caches! The function should perhaps be renamed.
 

	
 
    Called as Mercurial hook changegroup.kallithea_push_action .
 

	
 
    The pushed changesets is given by the revset 'node:node_last'.
 
    """
 
    revs = [ascii_str(repo[r].hex()) for r in mercurial.scmutil.revrange(repo, [b'%s:%s' % (node, node_last)])]
 
    hooks.process_pushed_raw_ids(revs)
 

	
 

	
 
def _git_hook_environment(repo_path):
 
    """
 
    Create a light-weight environment for stand-alone scripts and return an UI and the
 
    db repository.
 

	
 
    Git hooks are executed as subprocess of Git while Kallithea is waiting, and
 
    they thus need enough info to be able to create an app environment and
 
    connect to the database.
 
    """
 
    extras = get_hook_environment()
 

	
 
    path_to_ini_file = extras['config']
 
    config = paste.deploy.appconfig('config:' + path_to_ini_file)
 
    #logging.config.fileConfig(ini_file_path) # Note: we are in a different process - don't use configured logging
 
    kallithea.config.application.make_app(config.global_conf, **config.local_conf)
 

	
 
    # fix if it's not a bare repo
 
    if repo_path.endswith(os.sep + '.git'):
 
        repo_path = repo_path[:-5]
 

	
 
    repo = db.Repository.get_by_full_path(repo_path)
 
    if not repo:
 
        raise OSError('Repository %s not found in database' % repo_path)
 

	
 
    return repo
 

	
 

	
 
def pre_receive(repo_path, git_stdin_lines):
 
    """Called from Git pre-receive hook.
 
    The returned value is used as hook exit code and must be 0.
 
    """
 
    # Currently unused. TODO: remove?
 
    return 0
 

	
 

	
 
def post_receive(repo_path, git_stdin_lines):
 
    """Called from Git post-receive hook.
 
    The returned value is used as hook exit code and must be 0.
 
    """
 
    try:
 
        repo = _git_hook_environment(repo_path)
 
    except HookEnvironmentError as e:
 
        sys.stderr.write("Skipping Kallithea Git post-receive hook %r.\nGit was apparently not invoked by Kallithea: %s\n" % (sys.argv[0], e))
 
        return 0
 

	
 
    # the post push hook should never use the cached instance
 
    scm_repo = repo.scm_instance_no_cache()
 

	
 
    rev_data = []
 
    for l in git_stdin_lines:
 
        old_rev, new_rev, ref = l.strip().split(' ')
 
        _ref_data = ref.split('/')
 
        if _ref_data[1] in ['tags', 'heads']:
 
            rev_data.append({'old_rev': old_rev,
 
                             'new_rev': new_rev,
 
                             'ref': ref,
 
                             'type': _ref_data[1],
 
                             'name': '/'.join(_ref_data[2:])})
 

	
 
    git_revs = []
 
    for push_ref in rev_data:
 
        _type = push_ref['type']
 
        if _type == 'heads':
 
            if push_ref['old_rev'] == EmptyChangeset().raw_id:
 
                # update the symbolic ref if we push new repo
 
                if scm_repo.is_empty():
 
                    scm_repo._repo.refs.set_symbolic_ref(
 
                        b'HEAD',
 
                        b'refs/heads/%s' % safe_bytes(push_ref['name']))
 

	
 
                # build exclude list without the ref
 
                cmd = ['for-each-ref', '--format=%(refname)', 'refs/heads/*']
 
                stdout = scm_repo.run_git_command(cmd)
 
                ref = push_ref['ref']
 
                heads = [head for head in stdout.splitlines() if head != ref]
 
                # now list the git revs while excluding from the list
 
                cmd = ['log', push_ref['new_rev'], '--reverse', '--pretty=format:%H']
 
                cmd.append('--not')
 
                cmd.extend(heads) # empty list is ok
 
                stdout = scm_repo.run_git_command(cmd)
 
                git_revs += stdout.splitlines()
 

	
 
            elif push_ref['new_rev'] == EmptyChangeset().raw_id:
 
                # delete branch case
 
                git_revs += ['delete_branch=>%s' % push_ref['name']]
 
            else:
 
                cmd = ['log', '%(old_rev)s..%(new_rev)s' % push_ref,
 
                       '--reverse', '--pretty=format:%H']
 
                stdout = scm_repo.run_git_command(cmd)
 
                git_revs += stdout.splitlines()
 

	
 
        elif _type == 'tags':
 
            git_revs += ['tag=>%s' % push_ref['name']]
 

	
 
    hooks.process_pushed_raw_ids(git_revs)
 

	
 
    return 0
 

	
 

	
 
# Almost exactly like Mercurial contrib/hg-ssh:
 
def rejectpush(ui, **kwargs):
 
    """Mercurial hook to be installed as pretxnopen and prepushkey for read-only repos.
 
    Return value 1 will make the hook fail and reject the push.
 
    """
 
    ex = get_hook_environment()
 
    ui.warn(safe_bytes("Push access to %r denied\n" % ex.repository))
 
    return 1
kallithea/controllers/admin/settings.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.controllers.admin.settings
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
settings controller for Kallithea admin
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Jul 14, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import logging
 
import traceback
 

	
 
import formencode
 
from formencode import htmlfill
 
from tg import config, request
 
from tg import tmpl_context as c
 
from tg.i18n import ugettext as _
 
from webob.exc import HTTPFound
 

	
 
import kallithea
 
import kallithea.lib.indexers.daemon
 
from kallithea.controllers import base
 
from kallithea.lib import webutils
 
from kallithea.lib.auth import HasPermissionAnyDecorator, LoginRequired
 
from kallithea.lib.utils import repo2db_mapper, set_app_settings
 
from kallithea.lib.utils2 import safe_str
 
from kallithea.lib.vcs import VCSError
 
from kallithea.lib.webutils import url
 
from kallithea.model import db, meta, notification
 
from kallithea.model.forms import ApplicationSettingsForm, ApplicationUiSettingsForm, ApplicationVisualisationForm
 
from kallithea.model.notification import EmailNotificationModel
 
from kallithea.model.scm import ScmModel
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class SettingsController(base.BaseController):
 

	
 
    @LoginRequired(allow_default_user=True)
 
    def _before(self, *args, **kwargs):
 
        super(SettingsController, self)._before(*args, **kwargs)
 

	
 
    def _get_hg_ui_settings(self):
 
        ret = db.Ui.query().all()
 

	
 
        settings = {}
 
        for each in ret:
 
            k = each.ui_section + '_' + each.ui_key
 
            v = each.ui_value
 
            if k == 'paths_/':
 
                k = 'paths_root_path'
 

	
 
            k = k.replace('.', '_')
 

	
 
            if each.ui_section in ['hooks', 'extensions']:
 
                v = each.ui_active
 

	
 
            settings[k] = v
 
        return settings
 

	
 
    @HasPermissionAnyDecorator('hg.admin')
 
    def settings_vcs(self):
 
        c.active = 'vcs'
 
        if request.POST:
 
            application_form = ApplicationUiSettingsForm()()
 
            try:
 
                form_result = application_form.to_python(dict(request.POST))
 
            except formencode.Invalid as errors:
 
                return htmlfill.render(
 
                     base.render('admin/settings/settings.html'),
 
                     defaults=errors.value,
 
                     errors=errors.error_dict or {},
 
                     prefix_error=False,
 
                     encoding="UTF-8",
 
                     force_defaults=False)
 

	
 
            try:
 
                if c.visual.allow_repo_location_change:
 
                    sett = db.Ui.get_by_key('paths', '/')
 
                    sett.ui_value = form_result['paths_root_path']
 

	
 
                # HOOKS
 
                sett = db.Ui.get_by_key('hooks', db.Ui.HOOK_UPDATE)
 
                sett.ui_active = form_result['hooks_changegroup_update']
 
                sett.ui_active = form_result['hooks_changegroup_kallithea_update']
 

	
 
                sett = db.Ui.get_by_key('hooks', db.Ui.HOOK_REPO_SIZE)
 
                sett.ui_active = form_result['hooks_changegroup_repo_size']
 
                sett.ui_active = form_result['hooks_changegroup_kallithea_repo_size']
 

	
 
                ## EXTENSIONS
 
                sett = db.Ui.get_or_create('extensions', 'largefiles')
 
                sett.ui_active = form_result['extensions_largefiles']
 

	
 
#                sett = db.Ui.get_or_create('extensions', 'hggit')
 
#                sett.ui_active = form_result['extensions_hggit']
 

	
 
                meta.Session().commit()
 

	
 
                webutils.flash(_('Updated VCS settings'), category='success')
 

	
 
            except Exception:
 
                log.error(traceback.format_exc())
 
                webutils.flash(_('Error occurred while updating '
 
                          'application settings'), category='error')
 

	
 
        defaults = db.Setting.get_app_settings()
 
        defaults.update(self._get_hg_ui_settings())
 

	
 
        return htmlfill.render(
 
            base.render('admin/settings/settings.html'),
 
            defaults=defaults,
 
            encoding="UTF-8",
 
            force_defaults=False)
 

	
 
    @HasPermissionAnyDecorator('hg.admin')
 
    def settings_mapping(self):
 
        c.active = 'mapping'
 
        if request.POST:
 
            rm_obsolete = request.POST.get('destroy', False)
 
            install_git_hooks = request.POST.get('hooks', False)
 
            overwrite_git_hooks = request.POST.get('hooks_overwrite', False)
 
            invalidate_cache = request.POST.get('invalidate', False)
 
            log.debug('rescanning repo location with destroy obsolete=%s, '
 
                      'install git hooks=%s and '
 
                      'overwrite git hooks=%s' % (rm_obsolete, install_git_hooks, overwrite_git_hooks))
 

	
 
            filesystem_repos = ScmModel().repo_scan()
 
            added, removed = repo2db_mapper(filesystem_repos, rm_obsolete,
 
                                            install_git_hooks=install_git_hooks,
 
                                            user=request.authuser.username,
 
                                            overwrite_git_hooks=overwrite_git_hooks)
 
            added_msg = webutils.HTML(', ').join(
 
                webutils.link_to(safe_str(repo_name), webutils.url('summary_home', repo_name=repo_name)) for repo_name in added
 
            ) or '-'
 
            removed_msg = webutils.HTML(', ').join(
 
                safe_str(repo_name) for repo_name in removed
 
            ) or '-'
 
            webutils.flash(webutils.HTML(_('Repositories successfully rescanned. Added: %s. Removed: %s.')) %
 
                    (added_msg, removed_msg), category='success')
 

	
 
            if invalidate_cache:
 
                log.debug('invalidating all repositories cache')
 
                i = 0
 
                for repo in db.Repository.query():
 
                    try:
 
                        ScmModel().mark_for_invalidation(repo.repo_name)
 
                        i += 1
 
                    except VCSError as e:
 
                        log.warning('VCS error invalidating %s: %s', repo.repo_name, e)
 
                webutils.flash(_('Invalidated %s repositories') % i, category='success')
 

	
 
            raise HTTPFound(location=url('admin_settings_mapping'))
 

	
 
        defaults = db.Setting.get_app_settings()
 
        defaults.update(self._get_hg_ui_settings())
 

	
 
        return htmlfill.render(
 
            base.render('admin/settings/settings.html'),
 
            defaults=defaults,
 
            encoding="UTF-8",
 
            force_defaults=False)
 

	
 
    @HasPermissionAnyDecorator('hg.admin')
 
    def settings_global(self):
 
        c.active = 'global'
 
        if request.POST:
 
            application_form = ApplicationSettingsForm()()
 
            try:
 
                form_result = application_form.to_python(dict(request.POST))
 
            except formencode.Invalid as errors:
 
                return htmlfill.render(
 
                    base.render('admin/settings/settings.html'),
 
                    defaults=errors.value,
 
                    errors=errors.error_dict or {},
 
                    prefix_error=False,
 
                    encoding="UTF-8",
 
                    force_defaults=False)
 

	
 
            try:
 
                for setting in (
 
                    'title',
 
                    'realm',
 
                    'ga_code',
 
                    'captcha_public_key',
 
                    'captcha_private_key',
 
                ):
 
                    db.Setting.create_or_update(setting, form_result[setting])
 

	
 
                meta.Session().commit()
 
                set_app_settings(config)
 
                webutils.flash(_('Updated application settings'), category='success')
 

	
 
            except Exception:
 
                log.error(traceback.format_exc())
 
                webutils.flash(_('Error occurred while updating '
 
                          'application settings'),
 
                          category='error')
 

	
 
            raise HTTPFound(location=url('admin_settings_global'))
 

	
 
        defaults = db.Setting.get_app_settings()
 
        defaults.update(self._get_hg_ui_settings())
 

	
 
        return htmlfill.render(
 
            base.render('admin/settings/settings.html'),
 
            defaults=defaults,
 
            encoding="UTF-8",
 
            force_defaults=False)
 

	
 
    @HasPermissionAnyDecorator('hg.admin')
 
    def settings_visual(self):
 
        c.active = 'visual'
 
        if request.POST:
 
            application_form = ApplicationVisualisationForm()()
 
            try:
 
                form_result = application_form.to_python(dict(request.POST))
 
            except formencode.Invalid as errors:
 
                return htmlfill.render(
 
                    base.render('admin/settings/settings.html'),
 
                    defaults=errors.value,
 
                    errors=errors.error_dict or {},
 
                    prefix_error=False,
 
                    encoding="UTF-8",
 
                    force_defaults=False)
 

	
 
            try:
 
                settings = [
 
                    ('show_public_icon', 'show_public_icon', 'bool'),
 
                    ('show_private_icon', 'show_private_icon', 'bool'),
 
                    ('stylify_metalabels', 'stylify_metalabels', 'bool'),
 
                    ('repository_fields', 'repository_fields', 'bool'),
 
                    ('dashboard_items', 'dashboard_items', 'int'),
 
                    ('admin_grid_items', 'admin_grid_items', 'int'),
 
                    ('show_version', 'show_version', 'bool'),
 
                    ('use_gravatar', 'use_gravatar', 'bool'),
 
                    ('gravatar_url', 'gravatar_url', 'unicode'),
 
                    ('clone_uri_tmpl', 'clone_uri_tmpl', 'unicode'),
 
                    ('clone_ssh_tmpl', 'clone_ssh_tmpl', 'unicode'),
 
                ]
 
                for setting, form_key, type_ in settings:
 
                    db.Setting.create_or_update(setting, form_result[form_key], type_)
 

	
 
                meta.Session().commit()
 
                set_app_settings(config)
 
                webutils.flash(_('Updated visualisation settings'),
 
                        category='success')
 

	
 
            except Exception:
 
                log.error(traceback.format_exc())
 
                webutils.flash(_('Error occurred during updating '
 
                          'visualisation settings'),
 
                        category='error')
 

	
 
            raise HTTPFound(location=url('admin_settings_visual'))
 

	
 
        defaults = db.Setting.get_app_settings()
 
        defaults.update(self._get_hg_ui_settings())
 

	
 
        return htmlfill.render(
 
            base.render('admin/settings/settings.html'),
 
            defaults=defaults,
 
            encoding="UTF-8",
 
            force_defaults=False)
 

	
 
    @HasPermissionAnyDecorator('hg.admin')
 
    def settings_email(self):
 
        c.active = 'email'
 
        if request.POST:
 
            test_email = request.POST.get('test_email')
 
            test_email_subj = 'Kallithea test email'
 
            test_body = ('Kallithea Email test, '
 
                               'Kallithea version: %s' % c.kallithea_version)
 
            if not test_email:
 
                webutils.flash(_('Please enter email address'), category='error')
 
                raise HTTPFound(location=url('admin_settings_email'))
 

	
 
            test_email_txt_body = EmailNotificationModel() \
 
                .get_email_tmpl(EmailNotificationModel.TYPE_DEFAULT,
 
                                'txt', body=test_body)
 
            test_email_html_body = EmailNotificationModel() \
kallithea/model/db.py
Show inline comments
 
@@ -141,386 +141,386 @@ class BaseDbModel(object):
 

	
 
        raise Exception(
 
            'given object must be int, long or Instance of %s '
 
            'got %s, no callback provided' % (cls, type(value))
 
        )
 

	
 
    @classmethod
 
    def get_or_404(cls, id_):
 
        try:
 
            id_ = int(id_)
 
        except (TypeError, ValueError):
 
            raise HTTPNotFound
 

	
 
        res = cls.query().get(id_)
 
        if res is None:
 
            raise HTTPNotFound
 
        return res
 

	
 
    @classmethod
 
    def delete(cls, id_):
 
        obj = cls.query().get(id_)
 
        meta.Session().delete(obj)
 

	
 
    def __repr__(self):
 
        return '<DB:%s>' % (self.__class__.__name__)
 

	
 

	
 
_table_args_default_dict = {'extend_existing': True,
 
                            'mysql_engine': 'InnoDB',
 
                            'sqlite_autoincrement': True,
 
                           }
 

	
 
class Setting(meta.Base, BaseDbModel):
 
    __tablename__ = 'settings'
 
    __table_args__ = (
 
        _table_args_default_dict,
 
    )
 

	
 
    SETTINGS_TYPES = {
 
        'str': safe_bytes,
 
        'int': safe_int,
 
        'unicode': safe_str,
 
        'bool': asbool,
 
        'list': functools.partial(aslist, sep=',')
 
    }
 

	
 
    app_settings_id = Column(Integer(), primary_key=True)
 
    app_settings_name = Column(String(255), nullable=False, unique=True)
 
    _app_settings_value = Column("app_settings_value", Unicode(4096), nullable=False)
 
    _app_settings_type = Column("app_settings_type", String(255), nullable=True) # FIXME: not nullable?
 

	
 
    def __init__(self, key='', val='', type='unicode'):
 
        self.app_settings_name = key
 
        self.app_settings_value = val
 
        self.app_settings_type = type
 

	
 
    @validates('_app_settings_value')
 
    def validate_settings_value(self, key, val):
 
        assert isinstance(val, str)
 
        return val
 

	
 
    @hybrid_property
 
    def app_settings_value(self):
 
        v = self._app_settings_value
 
        _type = self.app_settings_type
 
        converter = self.SETTINGS_TYPES.get(_type) or self.SETTINGS_TYPES['unicode']
 
        return converter(v)
 

	
 
    @app_settings_value.setter
 
    def app_settings_value(self, val):
 
        """
 
        Setter that will always make sure we use str in app_settings_value
 
        """
 
        self._app_settings_value = safe_str(val)
 

	
 
    @hybrid_property
 
    def app_settings_type(self):
 
        return self._app_settings_type
 

	
 
    @app_settings_type.setter
 
    def app_settings_type(self, val):
 
        if val not in self.SETTINGS_TYPES:
 
            raise Exception('type must be one of %s got %s'
 
                            % (list(self.SETTINGS_TYPES), val))
 
        self._app_settings_type = val
 

	
 
    def __repr__(self):
 
        return "<%s %s.%s=%r>" % (
 
            self.__class__.__name__,
 
            self.app_settings_name, self.app_settings_type, self.app_settings_value
 
        )
 

	
 
    @classmethod
 
    def get_by_name(cls, key):
 
        return cls.query() \
 
            .filter(cls.app_settings_name == key).scalar()
 

	
 
    @classmethod
 
    def get_by_name_or_create(cls, key, val='', type='unicode'):
 
        res = cls.get_by_name(key)
 
        if res is None:
 
            res = cls(key, val, type)
 
        return res
 

	
 
    @classmethod
 
    def create_or_update(cls, key, val=None, type=None):
 
        """
 
        Creates or updates Kallithea setting. If updates are triggered, it will only
 
        update parameters that are explicitly set. 'None' values will be skipped.
 

	
 
        :param key:
 
        :param val:
 
        :param type:
 
        :return:
 
        """
 
        res = cls.get_by_name(key)
 
        if res is None:
 
            # new setting
 
            val = val if val is not None else ''
 
            type = type if type is not None else 'unicode'
 
            res = cls(key, val, type)
 
            meta.Session().add(res)
 
        else:
 
            if val is not None:
 
                # update if set
 
                res.app_settings_value = val
 
            if type is not None:
 
                # update if set
 
                res.app_settings_type = type
 
        return res
 

	
 
    @classmethod
 
    def get_app_settings(cls):
 

	
 
        ret = cls.query()
 
        if ret is None:
 
            raise Exception('Could not get application settings !')
 
        settings = {}
 
        for each in ret:
 
            settings[each.app_settings_name] = \
 
                each.app_settings_value
 

	
 
        return settings
 

	
 
    @classmethod
 
    def get_auth_settings(cls):
 
        ret = cls.query() \
 
                .filter(cls.app_settings_name.startswith('auth_')).all()
 
        fd = {}
 
        for row in ret:
 
            fd[row.app_settings_name] = row.app_settings_value
 
        return fd
 

	
 
    @classmethod
 
    def get_default_repo_settings(cls, strip_prefix=False):
 
        ret = cls.query() \
 
                .filter(cls.app_settings_name.startswith('default_')).all()
 
        fd = {}
 
        for row in ret:
 
            key = row.app_settings_name
 
            if strip_prefix:
 
                key = remove_prefix(key, prefix='default_')
 
            fd.update({key: row.app_settings_value})
 

	
 
        return fd
 

	
 
    @classmethod
 
    def get_server_info(cls):
 
        import platform
 

	
 
        import pkg_resources
 

	
 
        mods = [(p.project_name, p.version) for p in pkg_resources.working_set]
 
        info = {
 
            'modules': sorted(mods, key=lambda k: k[0].lower()),
 
            'py_version': platform.python_version(),
 
            'platform': platform.platform(),
 
            'kallithea_version': kallithea.__version__,
 
            'git_version': str(check_git_version()),
 
            'git_path': kallithea.CONFIG.get('git_path')
 
        }
 
        return info
 

	
 

	
 
class Ui(meta.Base, BaseDbModel):
 
    __tablename__ = 'ui'
 
    __table_args__ = (
 
        Index('ui_ui_section_ui_key_idx', 'ui_section', 'ui_key'),
 
        UniqueConstraint('ui_section', 'ui_key'),
 
        _table_args_default_dict,
 
    )
 

	
 
    HOOK_UPDATE = 'changegroup.update'
 
    HOOK_REPO_SIZE = 'changegroup.repo_size'
 
    HOOK_UPDATE = 'changegroup.kallithea_update'
 
    HOOK_REPO_SIZE = 'changegroup.kallithea_repo_size'
 

	
 
    ui_id = Column(Integer(), primary_key=True)
 
    ui_section = Column(String(255), nullable=False)
 
    ui_key = Column(String(255), nullable=False)
 
    ui_value = Column(String(255), nullable=True) # FIXME: not nullable?
 
    ui_active = Column(Boolean(), nullable=False, default=True)
 

	
 
    @classmethod
 
    def get_by_key(cls, section, key):
 
        """ Return specified Ui object, or None if not found. """
 
        return cls.query().filter_by(ui_section=section, ui_key=key).scalar()
 

	
 
    @classmethod
 
    def get_or_create(cls, section, key):
 
        """ Return specified Ui object, creating it if necessary. """
 
        setting = cls.get_by_key(section, key)
 
        if setting is None:
 
            setting = cls(ui_section=section, ui_key=key)
 
            meta.Session().add(setting)
 
        return setting
 

	
 
    @classmethod
 
    def get_builtin_hooks(cls):
 
        q = cls.query()
 
        q = q.filter(cls.ui_key.in_([cls.HOOK_UPDATE, cls.HOOK_REPO_SIZE]))
 
        q = q.filter(cls.ui_section == 'hooks')
 
        q = q.order_by(cls.ui_section, cls.ui_key)
 
        return q.all()
 

	
 
    @classmethod
 
    def get_custom_hooks(cls):
 
        q = cls.query()
 
        q = q.filter(~cls.ui_key.in_([cls.HOOK_UPDATE, cls.HOOK_REPO_SIZE]))
 
        q = q.filter(cls.ui_section == 'hooks')
 
        q = q.order_by(cls.ui_section, cls.ui_key)
 
        return q.all()
 

	
 
    @classmethod
 
    def get_repos_location(cls):
 
        return cls.get_by_key('paths', '/').ui_value
 

	
 
    @classmethod
 
    def create_or_update_hook(cls, key, val):
 
        new_ui = cls.get_or_create('hooks', key)
 
        new_ui.ui_active = True
 
        new_ui.ui_value = val
 

	
 
    def __repr__(self):
 
        return '<%s %s.%s=%r>' % (
 
            self.__class__.__name__,
 
            self.ui_section, self.ui_key, self.ui_value)
 

	
 

	
 
class User(meta.Base, BaseDbModel):
 
    __tablename__ = 'users'
 
    __table_args__ = (
 
        Index('u_username_idx', 'username'),
 
        Index('u_email_idx', 'email'),
 
        _table_args_default_dict,
 
    )
 

	
 
    DEFAULT_USER_NAME = 'default'
 
    DEFAULT_GRAVATAR_URL = 'https://secure.gravatar.com/avatar/{md5email}?d=identicon&s={size}'
 
    # The name of the default auth type in extern_type, 'internal' lives in auth_internal.py
 
    DEFAULT_AUTH_TYPE = 'internal'
 

	
 
    user_id = Column(Integer(), primary_key=True)
 
    username = Column(String(255), nullable=False, unique=True)
 
    password = Column(String(255), nullable=False)
 
    active = Column(Boolean(), nullable=False, default=True)
 
    admin = Column(Boolean(), nullable=False, default=False)
 
    name = Column("firstname", Unicode(255), nullable=False)
 
    lastname = Column(Unicode(255), nullable=False)
 
    _email = Column("email", String(255), nullable=True, unique=True) # FIXME: not nullable?
 
    last_login = Column(DateTime(timezone=False), nullable=True)
 
    extern_type = Column(String(255), nullable=True) # FIXME: not nullable?
 
    extern_name = Column(String(255), nullable=True) # FIXME: not nullable?
 
    api_key = Column(String(255), nullable=False)
 
    created_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 
    _user_data = Column("user_data", LargeBinary(), nullable=True)  # JSON data # FIXME: not nullable?
 

	
 
    user_log = relationship('UserLog')
 
    user_perms = relationship('UserToPerm', primaryjoin="User.user_id==UserToPerm.user_id", cascade='all')
 

	
 
    repositories = relationship('Repository')
 
    repo_groups = relationship('RepoGroup')
 
    user_groups = relationship('UserGroup')
 
    user_followers = relationship('UserFollowing', primaryjoin='UserFollowing.follows_user_id==User.user_id', cascade='all')
 
    followings = relationship('UserFollowing', primaryjoin='UserFollowing.user_id==User.user_id', cascade='all')
 

	
 
    repo_to_perm = relationship('UserRepoToPerm', primaryjoin='UserRepoToPerm.user_id==User.user_id', cascade='all')
 
    repo_group_to_perm = relationship('UserRepoGroupToPerm', primaryjoin='UserRepoGroupToPerm.user_id==User.user_id', cascade='all')
 

	
 
    group_member = relationship('UserGroupMember', cascade='all')
 

	
 
    # comments created by this user
 
    user_comments = relationship('ChangesetComment', cascade='all')
 
    # extra emails for this user
 
    user_emails = relationship('UserEmailMap', cascade='all')
 
    # extra API keys
 
    user_api_keys = relationship('UserApiKeys', cascade='all')
 
    ssh_keys = relationship('UserSshKeys', cascade='all')
 

	
 
    @hybrid_property
 
    def email(self):
 
        return self._email
 

	
 
    @email.setter
 
    def email(self, val):
 
        self._email = val.lower() if val else None
 

	
 
    @property
 
    def firstname(self):
 
        # alias for future
 
        return self.name
 

	
 
    @property
 
    def emails(self):
 
        other = UserEmailMap.query().filter(UserEmailMap.user == self).all()
 
        return [self.email] + [x.email for x in other]
 

	
 
    @property
 
    def api_keys(self):
 
        other = UserApiKeys.query().filter(UserApiKeys.user == self).all()
 
        return [self.api_key] + [x.api_key for x in other]
 

	
 
    @property
 
    def ip_addresses(self):
 
        ret = UserIpMap.query().filter(UserIpMap.user == self).all()
 
        return [x.ip_addr for x in ret]
 

	
 
    @property
 
    def full_name(self):
 
        return '%s %s' % (self.firstname, self.lastname)
 

	
 
    @property
 
    def full_name_or_username(self):
 
        """
 
        Show full name.
 
        If full name is not set, fall back to username.
 
        """
 
        return ('%s %s' % (self.firstname, self.lastname)
 
                if (self.firstname and self.lastname) else self.username)
 

	
 
    @property
 
    def full_name_and_username(self):
 
        """
 
        Show full name and username as 'Firstname Lastname (username)'.
 
        If full name is not set, fall back to username.
 
        """
 
        return ('%s %s (%s)' % (self.firstname, self.lastname, self.username)
 
                if (self.firstname and self.lastname) else self.username)
 

	
 
    @property
 
    def full_contact(self):
 
        return '%s %s <%s>' % (self.firstname, self.lastname, self.email)
 

	
 
    @property
 
    def short_contact(self):
 
        return '%s %s' % (self.firstname, self.lastname)
 

	
 
    @property
 
    def is_admin(self):
 
        return self.admin
 

	
 
    @hybrid_property
 
    def is_default_user(self):
 
        return self.username == User.DEFAULT_USER_NAME
 

	
 
    @hybrid_property
 
    def user_data(self):
 
        if not self._user_data:
 
            return {}
 

	
 
        try:
 
            return ext_json.loads(self._user_data)
 
        except TypeError:
 
            return {}
 

	
 
    @user_data.setter
 
    def user_data(self, val):
 
        try:
 
            self._user_data = ascii_bytes(ext_json.dumps(val))
 
        except Exception:
 
            log.error(traceback.format_exc())
 

	
 
    def __repr__(self):
 
        return "<%s %s: %r>" % (self.__class__.__name__, self.user_id, self.username)
 

	
 
    @classmethod
 
    def guess_instance(cls, value):
 
        return super(User, cls).guess_instance(value, User.get_by_username)
kallithea/model/forms.py
Show inline comments
 
@@ -195,370 +195,370 @@ def RegisterForm(edit=False, old_data=No
 
            v.ValidUsername(edit, old_data),
 
            v.UnicodeString(strip=True, min=1, not_empty=True)
 
        )
 
        password = All(
 
            v.ValidPassword(),
 
            v.UnicodeString(strip=False, min=6, not_empty=True)
 
        )
 
        password_confirmation = All(
 
            v.ValidPassword(),
 
            v.UnicodeString(strip=False, min=6, not_empty=True)
 
        )
 
        active = v.StringBoolean(if_missing=False)
 
        firstname = v.UnicodeString(strip=True, min=1, not_empty=False)
 
        lastname = v.UnicodeString(strip=True, min=1, not_empty=False)
 
        email = All(v.Email(not_empty=True), v.UniqSystemEmail(old_data))
 

	
 
        chained_validators = [v.ValidPasswordsMatch('password',
 
                                                    'password_confirmation')]
 

	
 
    return _RegisterForm
 

	
 

	
 
def PasswordResetRequestForm():
 
    class _PasswordResetRequestForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = True
 
        email = v.Email(not_empty=True)
 
    return _PasswordResetRequestForm
 

	
 

	
 
def PasswordResetConfirmationForm():
 
    class _PasswordResetConfirmationForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = True
 

	
 
        email = v.UnicodeString(strip=True, not_empty=True)
 
        timestamp = v.Number(strip=True, not_empty=True)
 
        token = v.UnicodeString(strip=True, not_empty=True)
 
        password = All(v.ValidPassword(), v.UnicodeString(strip=False, min=6))
 
        password_confirm = All(v.ValidPassword(), v.UnicodeString(strip=False, min=6))
 

	
 
        chained_validators = [v.ValidPasswordsMatch('password',
 
                                                    'password_confirm')]
 
    return _PasswordResetConfirmationForm
 

	
 

	
 
def RepoForm(edit=False, old_data=None, supported_backends=kallithea.BACKENDS,
 
             repo_groups=None, landing_revs=None):
 
    old_data = old_data or {}
 
    repo_groups = repo_groups or []
 
    landing_revs = landing_revs or []
 
    repo_group_ids = [rg[0] for rg in repo_groups]
 

	
 
    class _RepoForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = False
 
        repo_name = All(v.UnicodeString(strip=True, min=1, not_empty=True),
 
                        v.SlugifyName())
 
        repo_group = All(v.CanWriteGroup(old_data),
 
                         v.OneOf(repo_group_ids, hideList=True),
 
                         v.Int(min=-1, not_empty=True))
 
        repo_type = v.OneOf(supported_backends, required=False,
 
                            if_missing=old_data.get('repo_type'))
 
        repo_description = v.UnicodeString(strip=True, min=1, not_empty=False)
 
        repo_private = v.StringBoolean(if_missing=False)
 
        repo_landing_rev = v.OneOf(landing_revs, hideList=True)
 
        repo_copy_permissions = v.StringBoolean(if_missing=False)
 
        clone_uri = All(v.UnicodeString(strip=True, min=1, not_empty=False))
 

	
 
        repo_enable_statistics = v.StringBoolean(if_missing=False)
 
        repo_enable_downloads = v.StringBoolean(if_missing=False)
 

	
 
        if edit:
 
            owner = All(v.UnicodeString(not_empty=True), v.ValidRepoUser())
 
            # Not a real field - just for reference for validation:
 
            # clone_uri_hidden = v.UnicodeString(if_missing='')
 

	
 
        chained_validators = [v.ValidCloneUri(),
 
                              v.ValidRepoName(edit, old_data)]
 
    return _RepoForm
 

	
 

	
 
def RepoPermsForm():
 
    class _RepoPermsForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = False
 
        chained_validators = [v.ValidPerms(type_='repo')]
 
    return _RepoPermsForm
 

	
 

	
 
def RepoGroupPermsForm(valid_recursive_choices):
 
    class _RepoGroupPermsForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = False
 
        recursive = v.OneOf(valid_recursive_choices)
 
        chained_validators = [v.ValidPerms(type_='repo_group')]
 
    return _RepoGroupPermsForm
 

	
 

	
 
def UserGroupPermsForm():
 
    class _UserPermsForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = False
 
        chained_validators = [v.ValidPerms(type_='user_group')]
 
    return _UserPermsForm
 

	
 

	
 
def RepoFieldForm():
 
    class _RepoFieldForm(formencode.Schema):
 
        filter_extra_fields = True
 
        allow_extra_fields = True
 

	
 
        new_field_key = All(v.FieldKey(),
 
                            v.UnicodeString(strip=True, min=3, not_empty=True))
 
        new_field_value = v.UnicodeString(not_empty=False, if_missing='')
 
        new_field_type = v.OneOf(['str', 'unicode', 'list', 'tuple'],
 
                                 if_missing='str')
 
        new_field_label = v.UnicodeString(not_empty=False)
 
        new_field_desc = v.UnicodeString(not_empty=False)
 

	
 
    return _RepoFieldForm
 

	
 

	
 
def RepoForkForm(edit=False, old_data=None, supported_backends=kallithea.BACKENDS,
 
                 repo_groups=None, landing_revs=None):
 
    old_data = old_data or {}
 
    repo_groups = repo_groups or []
 
    landing_revs = landing_revs or []
 
    repo_group_ids = [rg[0] for rg in repo_groups]
 

	
 
    class _RepoForkForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = False
 
        repo_name = All(v.UnicodeString(strip=True, min=1, not_empty=True),
 
                        v.SlugifyName())
 
        repo_group = All(v.CanWriteGroup(),
 
                         v.OneOf(repo_group_ids, hideList=True),
 
                         v.Int(min=-1, not_empty=True))
 
        repo_type = All(v.ValidForkType(old_data), v.OneOf(supported_backends))
 
        description = v.UnicodeString(strip=True, min=1, not_empty=True)
 
        private = v.StringBoolean(if_missing=False)
 
        copy_permissions = v.StringBoolean(if_missing=False)
 
        update_after_clone = v.StringBoolean(if_missing=False)
 
        fork_parent_id = v.UnicodeString()
 
        chained_validators = [v.ValidForkName(edit, old_data)]
 
        landing_rev = v.OneOf(landing_revs, hideList=True)
 

	
 
    return _RepoForkForm
 

	
 

	
 
def ApplicationSettingsForm():
 
    class _ApplicationSettingsForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = False
 
        title = v.UnicodeString(strip=True, not_empty=False)
 
        realm = v.UnicodeString(strip=True, min=1, not_empty=True)
 
        ga_code = v.UnicodeString(strip=True, min=1, not_empty=False)
 
        captcha_public_key = v.UnicodeString(strip=True, min=1, not_empty=False)
 
        captcha_private_key = v.UnicodeString(strip=True, min=1, not_empty=False)
 

	
 
    return _ApplicationSettingsForm
 

	
 

	
 
def ApplicationVisualisationForm():
 
    class _ApplicationVisualisationForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = False
 
        show_public_icon = v.StringBoolean(if_missing=False)
 
        show_private_icon = v.StringBoolean(if_missing=False)
 
        stylify_metalabels = v.StringBoolean(if_missing=False)
 

	
 
        repository_fields = v.StringBoolean(if_missing=False)
 
        lightweight_journal = v.StringBoolean(if_missing=False)
 
        dashboard_items = v.Int(min=5, not_empty=True)
 
        admin_grid_items = v.Int(min=5, not_empty=True)
 
        show_version = v.StringBoolean(if_missing=False)
 
        use_gravatar = v.StringBoolean(if_missing=False)
 
        gravatar_url = v.UnicodeString(min=3)
 
        clone_uri_tmpl = v.UnicodeString(min=3)
 
        clone_ssh_tmpl = v.UnicodeString()
 

	
 
    return _ApplicationVisualisationForm
 

	
 

	
 
def ApplicationUiSettingsForm():
 
    class _ApplicationUiSettingsForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = False
 
        paths_root_path = All(
 
            v.ValidPath(),
 
            v.UnicodeString(strip=True, min=1, not_empty=True)
 
        )
 
        hooks_changegroup_update = v.StringBoolean(if_missing=False)
 
        hooks_changegroup_repo_size = v.StringBoolean(if_missing=False)
 
        hooks_changegroup_kallithea_update = v.StringBoolean(if_missing=False)
 
        hooks_changegroup_kallithea_repo_size = v.StringBoolean(if_missing=False)
 

	
 
        extensions_largefiles = v.StringBoolean(if_missing=False)
 
        extensions_hggit = v.StringBoolean(if_missing=False)
 

	
 
    return _ApplicationUiSettingsForm
 

	
 

	
 
def DefaultPermissionsForm(repo_perms_choices, group_perms_choices,
 
                           user_group_perms_choices, create_choices,
 
                           user_group_create_choices, fork_choices,
 
                           register_choices, extern_activate_choices):
 
    class _DefaultPermissionsForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = True
 
        overwrite_default_repo = v.StringBoolean(if_missing=False)
 
        overwrite_default_group = v.StringBoolean(if_missing=False)
 
        overwrite_default_user_group = v.StringBoolean(if_missing=False)
 
        anonymous = v.StringBoolean(if_missing=False)
 
        default_repo_perm = v.OneOf(repo_perms_choices)
 
        default_group_perm = v.OneOf(group_perms_choices)
 
        default_user_group_perm = v.OneOf(user_group_perms_choices)
 

	
 
        default_repo_create = v.OneOf(create_choices)
 
        default_user_group_create = v.OneOf(user_group_create_choices)
 
        default_fork = v.OneOf(fork_choices)
 

	
 
        default_register = v.OneOf(register_choices)
 
        default_extern_activate = v.OneOf(extern_activate_choices)
 
    return _DefaultPermissionsForm
 

	
 

	
 
def CustomDefaultPermissionsForm():
 
    class _CustomDefaultPermissionsForm(formencode.Schema):
 
        filter_extra_fields = True
 
        allow_extra_fields = True
 

	
 
        create_repo_perm = v.StringBoolean(if_missing=False)
 
        create_user_group_perm = v.StringBoolean(if_missing=False)
 
        #create_repo_group_perm Impl. later
 

	
 
        fork_repo_perm = v.StringBoolean(if_missing=False)
 

	
 
    return _CustomDefaultPermissionsForm
 

	
 

	
 
def DefaultsForm(edit=False, old_data=None, supported_backends=kallithea.BACKENDS):
 
    class _DefaultsForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = True
 
        default_repo_type = v.OneOf(supported_backends)
 
        default_repo_private = v.StringBoolean(if_missing=False)
 
        default_repo_enable_statistics = v.StringBoolean(if_missing=False)
 
        default_repo_enable_downloads = v.StringBoolean(if_missing=False)
 

	
 
    return _DefaultsForm
 

	
 

	
 
def AuthSettingsForm(current_active_modules):
 
    class _AuthSettingsForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = True
 
        auth_plugins = All(v.ValidAuthPlugins(),
 
                           v.UniqueListFromString()(not_empty=True))
 

	
 
        def __init__(self, *args, **kwargs):
 
            # The auth plugins tell us what form validators they use
 
            if current_active_modules:
 
                import kallithea.lib.auth_modules
 
                from kallithea.lib.auth_modules import LazyFormencode
 
                for module in current_active_modules:
 
                    plugin = kallithea.lib.auth_modules.loadplugin(module)
 
                    plugin_name = plugin.name
 
                    for sv in plugin.plugin_settings():
 
                        newk = "auth_%s_%s" % (plugin_name, sv["name"])
 
                        # can be a LazyFormencode object from plugin settings
 
                        validator = sv["validator"]
 
                        if isinstance(validator, LazyFormencode):
 
                            validator = validator()
 
                        # init all lazy validators from formencode.All
 
                        if isinstance(validator, All):
 
                            init_validators = []
 
                            for validator in validator.validators:
 
                                if isinstance(validator, LazyFormencode):
 
                                    validator = validator()
 
                                init_validators.append(validator)
 
                            validator.validators = init_validators
 

	
 
                        self.add_field(newk, validator)
 
            formencode.Schema.__init__(self, *args, **kwargs)
 

	
 
    return _AuthSettingsForm
 

	
 

	
 
def LdapSettingsForm(tls_reqcert_choices, search_scope_choices,
 
                     tls_kind_choices):
 
    class _LdapSettingsForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = True
 
        #pre_validators = [LdapLibValidator]
 
        ldap_active = v.StringBoolean(if_missing=False)
 
        ldap_host = v.UnicodeString(strip=True,)
 
        ldap_port = v.Number(strip=True,)
 
        ldap_tls_kind = v.OneOf(tls_kind_choices)
 
        ldap_tls_reqcert = v.OneOf(tls_reqcert_choices)
 
        ldap_dn_user = v.UnicodeString(strip=True,)
 
        ldap_dn_pass = v.UnicodeString(strip=True,)
 
        ldap_base_dn = v.UnicodeString(strip=True,)
 
        ldap_filter = v.UnicodeString(strip=True,)
 
        ldap_search_scope = v.OneOf(search_scope_choices)
 
        ldap_attr_login = v.AttrLoginValidator()(not_empty=True)
 
        ldap_attr_firstname = v.UnicodeString(strip=True,)
 
        ldap_attr_lastname = v.UnicodeString(strip=True,)
 
        ldap_attr_email = v.UnicodeString(strip=True,)
 

	
 
    return _LdapSettingsForm
 

	
 

	
 
def UserExtraEmailForm():
 
    class _UserExtraEmailForm(formencode.Schema):
 
        email = All(v.UniqSystemEmail(), v.Email(not_empty=True))
 
    return _UserExtraEmailForm
 

	
 

	
 
def UserExtraIpForm():
 
    class _UserExtraIpForm(formencode.Schema):
 
        ip = v.ValidIp()(not_empty=True)
 
    return _UserExtraIpForm
 

	
 

	
 
def PullRequestForm(repo_id):
 
    class _PullRequestForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = True
 

	
 
        org_repo = v.UnicodeString(strip=True, required=True)
 
        org_ref = v.UnicodeString(strip=True, required=True)
 
        other_repo = v.UnicodeString(strip=True, required=True)
 
        other_ref = v.UnicodeString(strip=True, required=True)
 

	
 
        pullrequest_title = v.UnicodeString(strip=True, required=True)
 
        pullrequest_desc = v.UnicodeString(strip=True, required=False)
 

	
 
    return _PullRequestForm
 

	
 

	
 
def PullRequestPostForm():
 
    class _PullRequestPostForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = True
 

	
 
        pullrequest_title = v.UnicodeString(strip=True, required=True)
 
        pullrequest_desc = v.UnicodeString(strip=True, required=False)
 
        org_review_members = v.Set()
 
        review_members = v.Set()
 
        updaterev = v.UnicodeString(strip=True, required=False, if_missing=None)
 
        owner = All(v.UnicodeString(strip=True, required=True),
 
                    v.ValidRepoUser())
 

	
 
    return _PullRequestPostForm
 

	
 

	
 
def GistForm(lifetime_options):
 
    class _GistForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = True
 

	
 
        filename = All(v.BasePath()(),
 
                       v.UnicodeString(strip=True, required=False))
 
        description = v.UnicodeString(required=False, if_missing='')
 
        lifetime = v.OneOf(lifetime_options)
 
        mimetype = v.UnicodeString(required=False, if_missing=None)
 
        content = v.UnicodeString(required=True, not_empty=True)
 
        public = v.UnicodeString(required=False, if_missing='')
 
        private = v.UnicodeString(required=False, if_missing='')
 

	
 
    return _GistForm
kallithea/templates/admin/settings/settings_vcs.html
Show inline comments
 
${h.form(url('admin_settings'), method='post')}
 
    <div class="form">
 
            <div class="form-group">
 
                <label class="control-label">${_('Hooks')}:</label>
 
                <div>
 
                    <div class="checkbox">
 
                        <label>
 
                            ${h.checkbox('hooks_changegroup_repo_size','True')}
 
                            ${h.checkbox('hooks_changegroup_kallithea_repo_size','True')}
 
                            ${_('Show repository size after push')}
 
                        </label>
 
                    </div>
 
                    <div class="checkbox">
 
                        <label>
 
                            ${h.checkbox('hooks_changegroup_update','True')}
 
                            ${h.checkbox('hooks_changegroup_kallithea_update','True')}
 
                            ${_('Update repository after push (hg update)')}
 
                        </label>
 
                    </div>
 
                </div>
 
            </div>
 
            <div class="form-group">
 
                <label class="control-label">${_('Mercurial extensions')}:</label>
 
                <div>
 
                    <div class="checkbox">
 
                        <label>
 
                            ${h.checkbox('extensions_largefiles','True')}
 
                            ${_('Enable largefiles extension')}
 
                        </label>
 
                    </div>
 
                    ##<div class="checkbox">
 
                    ##    <label>
 
                    ##        ${h.checkbox('extensions_hggit','True')}
 
                    ##        ${_('Enable hg-git extension')}
 
                    ##    </label>
 
                    ##</div>
 
                    ##<span class="help-block">${_('Requires hg-git library to be installed. Enables cloning of remote Git repositories while converting them to Mercurial.')}</span>
 
                </div>
 
            </div>
 
            %if c.visual.allow_repo_location_change:
 
            <div class="form-group">
 
                <label class="control-label" for="paths_root_path">${_('Location of repositories')}:</label>
 
                <div>
 
                    <div class="input-group">
 
                        ${h.text('paths_root_path',size=60,readonly="readonly",class_='form-control')}
 
                        <span id="path_unlock" data-toggle="tooltip" class="input-group-btn"
 
                            title="${_('Click to unlock. You must restart Kallithea in order to make this setting take effect.')}">
 
                            <button type="button" class="btn btn-default btn-sm"><i id="path_unlock_icon" class="icon-lock"></i></button>
 
                        </span>
 
                    </div>
 
                    <span class="help-block">${_('Filesystem location where repositories are stored. After changing this value, a restart and rescan of the repository folder are both required.')}</span>
 
                </div>
 
            </div>
 
            %else:
 
            ## form still requires this but we cannot internally change it anyway
 
            ${h.hidden('paths_root_path',size=30,readonly="readonly")}
 
            %endif
 
            <div class="form-group">
 
                <div class="buttons">
 
                    ${h.submit('save',_('Save Settings'),class_="btn btn-default")}
 
                    ${h.reset('reset',_('Reset'),class_="btn btn-default")}
 
                </div>
 
            </div>
 
    </div>
 
    ${h.end_form()}
 

	
 
    <script>
 
        'use strict';
 
        $(document).ready(function(){
 
            $('#path_unlock').on('click', function(){
 
                $('#path_unlock_icon').removeClass('icon-lock');
 
                $('#path_unlock_icon').addClass('icon-lock-open-alt');
 
                $('#paths_root_path').removeAttr('readonly');
 
            });
 
        });
 
    </script>
kallithea/tests/functional/test_admin_settings.py
Show inline comments
 
# -*- coding: utf-8 -*-
 

	
 
from kallithea.model import db
 
from kallithea.tests import base
 
from kallithea.tests.fixture import Fixture
 

	
 

	
 
fixture = Fixture()
 

	
 

	
 
class TestAdminSettingsController(base.TestController):
 

	
 
    def test_index_main(self):
 
        self.log_user()
 
        response = self.app.get(base.url('admin_settings'))
 

	
 
    def test_index_mapping(self):
 
        self.log_user()
 
        response = self.app.get(base.url('admin_settings_mapping'))
 

	
 
    def test_index_global(self):
 
        self.log_user()
 
        response = self.app.get(base.url('admin_settings_global'))
 

	
 
    def test_index_visual(self):
 
        self.log_user()
 
        response = self.app.get(base.url('admin_settings_visual'))
 

	
 
    def test_index_email(self):
 
        self.log_user()
 
        response = self.app.get(base.url('admin_settings_email'))
 

	
 
    def test_index_hooks(self):
 
        self.log_user()
 
        response = self.app.get(base.url('admin_settings_hooks'))
 

	
 
    def test_create_custom_hook(self):
 
        self.log_user()
 
        response = self.app.post(base.url('admin_settings_hooks'),
 
                                params=dict(new_hook_ui_key='test_hooks_1',
 
                                            new_hook_ui_value='cd %s' % base.TESTS_TMP_PATH,
 
                                            _session_csrf_secret_token=self.session_csrf_secret_token()))
 

	
 
        self.checkSessionFlash(response, 'Added new hook')
 
        response = response.follow()
 
        response.mustcontain('test_hooks_1')
 
        response.mustcontain('cd %s' % base.TESTS_TMP_PATH)
 

	
 
        # test_edit_custom_hook
 
        response = self.app.post(base.url('admin_settings_hooks'),
 
                                params=dict(hook_ui_key='test_hooks_1',
 
                                            hook_ui_value='old_value_of_hook_1',
 
                                            hook_ui_value_new='new_value_of_hook_1',
 
                                            _session_csrf_secret_token=self.session_csrf_secret_token()))
 

	
 
        response = response.follow()
 
        response.mustcontain('test_hooks_1')
 
        response.mustcontain('new_value_of_hook_1')
 

	
 
        # test_add_existing_custom_hook
 
        response = self.app.post(base.url('admin_settings_hooks'),
 
                                params=dict(new_hook_ui_key='test_hooks_1',
 
                                            new_hook_ui_value='attempted_new_value',
 
                                            _session_csrf_secret_token=self.session_csrf_secret_token()))
 

	
 
        self.checkSessionFlash(response, 'Hook already exists')
 
        response = response.follow()
 
        response.mustcontain('test_hooks_1')
 
        response.mustcontain('new_value_of_hook_1')
 

	
 
    def test_create_custom_hook_delete(self):
 
        self.log_user()
 
        response = self.app.post(base.url('admin_settings_hooks'),
 
                                params=dict(new_hook_ui_key='test_hooks_2',
 
                                            new_hook_ui_value='cd %s2' % base.TESTS_TMP_PATH,
 
                                            _session_csrf_secret_token=self.session_csrf_secret_token()))
 

	
 
        self.checkSessionFlash(response, 'Added new hook')
 
        response = response.follow()
 
        response.mustcontain('test_hooks_2')
 
        response.mustcontain('cd %s2' % base.TESTS_TMP_PATH)
 

	
 
        hook_id = db.Ui.get_by_key('hooks', 'test_hooks_2').ui_id
 
        ## delete
 
        self.app.post(base.url('admin_settings_hooks'),
 
                        params=dict(hook_id=hook_id, _session_csrf_secret_token=self.session_csrf_secret_token()))
 
        response = self.app.get(base.url('admin_settings_hooks'))
 
        response.mustcontain(no=['test_hooks_2'])
 
        response.mustcontain(no=['cd %s2' % base.TESTS_TMP_PATH])
 

	
 
    def test_add_existing_builtin_hook(self):
 
        self.log_user()
 
        response = self.app.post(base.url('admin_settings_hooks'),
 
                                params=dict(new_hook_ui_key='changegroup.update',
 
                                params=dict(new_hook_ui_key='changegroup.kallithea_update',
 
                                            new_hook_ui_value='attempted_new_value',
 
                                            _session_csrf_secret_token=self.session_csrf_secret_token()))
 

	
 
        self.checkSessionFlash(response, 'Builtin hooks are read-only')
 
        response = response.follow()
 
        response.mustcontain('changegroup.update')
 
        response.mustcontain('changegroup.kallithea_update')
 

	
 
    def test_index_search(self):
 
        self.log_user()
 
        response = self.app.get(base.url('admin_settings_search'))
 

	
 
    def test_index_system(self):
 
        self.log_user()
 
        response = self.app.get(base.url('admin_settings_system'))
 

	
 
    def test_ga_code_active(self):
 
        self.log_user()
 
        old_title = 'Kallithea'
 
        old_realm = 'Kallithea authentication'
 
        new_ga_code = 'ga-test-123456789'
 
        response = self.app.post(base.url('admin_settings_global'),
 
                        params=dict(title=old_title,
 
                                 realm=old_realm,
 
                                 ga_code=new_ga_code,
 
                                 captcha_private_key='',
 
                                 captcha_public_key='',
 
                                 _session_csrf_secret_token=self.session_csrf_secret_token(),
 
                                 ))
 

	
 
        self.checkSessionFlash(response, 'Updated application settings')
 

	
 
        assert db.Setting.get_app_settings()['ga_code'] == new_ga_code
 

	
 
        response = response.follow()
 
        response.mustcontain("""_gaq.push(['_setAccount', '%s']);""" % new_ga_code)
 

	
 
    def test_ga_code_inactive(self):
 
        self.log_user()
 
        old_title = 'Kallithea'
 
        old_realm = 'Kallithea authentication'
 
        new_ga_code = ''
 
        response = self.app.post(base.url('admin_settings_global'),
 
                        params=dict(title=old_title,
 
                                 realm=old_realm,
 
                                 ga_code=new_ga_code,
 
                                 captcha_private_key='',
 
                                 captcha_public_key='',
 
                                 _session_csrf_secret_token=self.session_csrf_secret_token(),
 
                                 ))
 

	
 
        self.checkSessionFlash(response, 'Updated application settings')
 
        assert db.Setting.get_app_settings()['ga_code'] == new_ga_code
 

	
 
        response = response.follow()
 
        response.mustcontain(no=["_gaq.push(['_setAccount', '%s']);" % new_ga_code])
 

	
 
    def test_captcha_activate(self):
 
        self.log_user()
 
        old_title = 'Kallithea'
 
        old_realm = 'Kallithea authentication'
 
        new_ga_code = ''
 
        response = self.app.post(base.url('admin_settings_global'),
 
                        params=dict(title=old_title,
 
                                 realm=old_realm,
 
                                 ga_code=new_ga_code,
 
                                 captcha_private_key='1234567890',
 
                                 captcha_public_key='1234567890',
 
                                 _session_csrf_secret_token=self.session_csrf_secret_token(),
 
                                 ))
 

	
 
        self.checkSessionFlash(response, 'Updated application settings')
 
        assert db.Setting.get_app_settings()['captcha_private_key'] == '1234567890'
 

	
 
        response = self.app.get(base.url('register'))
 
        response.mustcontain('captcha')
 

	
 
    def test_captcha_deactivate(self):
 
        self.log_user()
 
        old_title = 'Kallithea'
 
        old_realm = 'Kallithea authentication'
 
        new_ga_code = ''
 
        response = self.app.post(base.url('admin_settings_global'),
 
                        params=dict(title=old_title,
 
                                 realm=old_realm,
 
                                 ga_code=new_ga_code,
 
                                 captcha_private_key='',
 
                                 captcha_public_key='1234567890',
 
                                 _session_csrf_secret_token=self.session_csrf_secret_token(),
 
                                 ))
 

	
 
        self.checkSessionFlash(response, 'Updated application settings')
 
        assert db.Setting.get_app_settings()['captcha_private_key'] == ''
 

	
 
        response = self.app.get(base.url('register'))
 
        response.mustcontain(no=['captcha'])
 

	
 
    def test_title_change(self):
 
        self.log_user()
 
        old_title = 'Kallithea'
 
        new_title = old_title + '_changed'
 
        old_realm = 'Kallithea authentication'
 

	
 
        for new_title in ['Changed', 'Żółwik', old_title]:
 
            response = self.app.post(base.url('admin_settings_global'),
 
                        params=dict(title=new_title,
 
                                 realm=old_realm,
 
                                 ga_code='',
 
                                 captcha_private_key='',
 
                                 captcha_public_key='',
 
                                 _session_csrf_secret_token=self.session_csrf_secret_token(),
 
                                ))
 

	
 
            self.checkSessionFlash(response, 'Updated application settings')
 
            assert db.Setting.get_app_settings()['title'] == new_title
 

	
 
            response = response.follow()
 
            response.mustcontain("""<span class="branding">%s</span>""" % new_title)
0 comments (0 inline, 0 general)