Files @ b4d1e85265c1
Branch filter:

Location: kallithea/kallithea/controllers/forks.py

Søren Løvborg
auth: simplify repository group permission checks

In practice, Kallithea has the 'group.admin' permission imply the
'group.write' permission, which again implies 'group.read'.

This codifies this practice by replacing the HasRepoGroupPermissionAny
"perm function" with the new HasRepoGroupLevel function, reducing the
risk of errors and saving quite a lot of typing.
# -*- coding: utf-8 -*-
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
"""
kallithea.controllers.forks
~~~~~~~~~~~~~~~~~~~~~~~~~~~

forks controller for Kallithea

This file was forked by the Kallithea project in July 2014.
Original author and date, and relevant copyright and licensing information is below:
:created_on: Apr 23, 2011
:author: marcink
:copyright: (c) 2013 RhodeCode GmbH, and others.
:license: GPLv3, see LICENSE.md for more details.
"""

import logging
import formencode
import traceback
from formencode import htmlfill

from pylons import tmpl_context as c, request
from pylons.i18n.translation import _
from webob.exc import HTTPFound

import kallithea.lib.helpers as h

from kallithea.config.routing import url
from kallithea.lib.auth import LoginRequired, HasRepoPermissionLevelDecorator, \
    NotAnonymous, HasRepoPermissionLevel, HasPermissionAnyDecorator, HasPermissionAny
from kallithea.lib.base import BaseRepoController, render
from kallithea.lib.page import Page
from kallithea.lib.utils2 import safe_int
from kallithea.model.db import Repository, UserFollowing, User, Ui
from kallithea.model.repo import RepoModel
from kallithea.model.forms import RepoForkForm
from kallithea.model.scm import ScmModel, AvailableRepoGroupChoices

# Module-level logger, named after this module's import path.
log = logging.getLogger(__name__)


class ForksController(BaseRepoController):
    """
    Controller for listing the forks of a repository and for creating a
    new fork of it.
    """

    def __before__(self):
        super(ForksController, self).__before__()

    def __load_defaults(self):
        """
        Populate the template context with the choices used by the fork form.

        Sets ``c.repo_groups``, ``c.landing_revs_choices``, ``c.landing_revs``
        and ``c.can_update``.
        """
        # The permission level handed to AvailableRepoGroupChoices depends on
        # whether the user holds 'hg.create.write_on_repogroup.true'.
        if HasPermissionAny('hg.create.write_on_repogroup.true')():
            repo_group_perm_level = 'write'
        else:
            repo_group_perm_level = 'admin'
        c.repo_groups = AvailableRepoGroupChoices(['hg.create.repository'], repo_group_perm_level)

        c.landing_revs_choices, c.landing_revs = ScmModel().get_repo_landing_revs()

        c.can_update = Ui.get_by_key('hooks', Ui.HOOK_UPDATE).ui_active

    def __load_data(self):
        """
        Load defaults settings for edit, and update

        Besides calling ``__load_defaults``, this sets ``c.repo_info``,
        ``c.default_user_id``, ``c.in_public_journal``, ``c.stats_revision``,
        ``c.repo_last_rev`` and ``c.stats_percentage``.

        :returns: the form defaults from ``RepoModel()._get_defaults`` with
            the description and repository name rewritten to indicate a fork.
        :raises HTTPFound: redirect to the 'repos' route when ``c.db_repo``
            is ``None``.
        """
        self.__load_defaults()

        c.repo_info = c.db_repo

        # Check for a missing repository *before* touching any of its
        # attributes; otherwise the redirect below could never be reached.
        if c.repo_info is None:
            h.not_mapped_error(c.repo_name)
            raise HTTPFound(location=url('repos'))

        repo = c.repo_info.scm_instance

        c.default_user_id = User.get_default_user().user_id
        c.in_public_journal = UserFollowing.query() \
            .filter(UserFollowing.user_id == c.default_user_id) \
            .filter(UserFollowing.follows_repository == c.repo_info).scalar()

        if c.repo_info.stats:
            last_rev = c.repo_info.stats.stat_on_revision + 1
        else:
            last_rev = 0
        c.stats_revision = last_rev

        c.repo_last_rev = repo.count() if repo.revisions else 0

        # Avoid dividing by zero when either side has nothing to report.
        if last_rev == 0 or c.repo_last_rev == 0:
            c.stats_percentage = 0
        else:
            c.stats_percentage = '%.2f' % ((float(last_rev) /
                                            c.repo_last_rev) * 100)

        defaults = RepoModel()._get_defaults(c.repo_name)
        # alter the description to indicate a fork
        defaults['description'] = ('fork of repository: %s \n%s'
                                   % (defaults['repo_name'],
                                      defaults['description']))
        # add suffix to fork
        defaults['repo_name'] = '%s-fork' % defaults['repo_name']

        return defaults

    @LoginRequired()
    @HasRepoPermissionLevelDecorator('read')
    def forks(self, repo_name):
        """
        Render a paged list (20 per page) of the forks of the current
        repository, leaving out forks that fail the 'read' permission check.

        The page number comes from the ``page`` GET parameter (default 1).
        Requests carrying ``HTTP_X_PARTIAL_XHR`` get the data-only template.
        """
        p = safe_int(request.GET.get('page'), 1)
        repo_id = c.db_repo.repo_id
        readable_forks = [
            r for r in Repository.get_repo_forks(repo_id)
            if HasRepoPermissionLevel('read')(r.repo_name, 'get forks check')
        ]
        c.forks_pager = Page(readable_forks, page=p, items_per_page=20)

        if request.environ.get('HTTP_X_PARTIAL_XHR'):
            return render('/forks/forks_data.html')

        return render('/forks/forks.html')

    @LoginRequired()
    @NotAnonymous()
    @HasPermissionAnyDecorator('hg.admin', 'hg.fork.repository')
    @HasRepoPermissionLevelDecorator('read')
    def fork(self, repo_name):
        """
        Render the fork form for ``repo_name``, pre-filled with the defaults
        computed by ``__load_data``.

        :raises HTTPFound: redirect to the 'home' route when no repository
            named ``repo_name`` is found.
        """
        c.repo_info = Repository.get_by_repo_name(repo_name)
        if not c.repo_info:
            h.not_mapped_error(repo_name)
            raise HTTPFound(location=url('home'))

        defaults = self.__load_data()

        return htmlfill.render(
            render('forks/fork.html'),
            defaults=defaults,
            encoding="UTF-8",
            force_defaults=False)

    @LoginRequired()
    @NotAnonymous()
    @HasPermissionAnyDecorator('hg.admin', 'hg.fork.repository')
    @HasRepoPermissionLevelDecorator('read')
    def fork_create(self, repo_name):
        """
        Validate the submitted fork form and start creation of the fork.

        On a validation error the form is re-rendered with the error
        messages. Any other error is logged and flashed to the user.

        :raises HTTPFound: redirect to the 'repo_creating_home' route of the
            new fork, or to the 'home' route when the source repository is
            not found or no fork name could be determined.
        """
        self.__load_defaults()
        c.repo_info = Repository.get_by_repo_name(repo_name)
        # Same guard as in fork(): without it a missing repository would
        # fail with an AttributeError on ``repo_type`` below.
        if not c.repo_info:
            h.not_mapped_error(repo_name)
            raise HTTPFound(location=url('home'))

        _form = RepoForkForm(old_data={'repo_type': c.repo_info.repo_type},
                             repo_groups=c.repo_groups,
                             landing_revs=c.landing_revs_choices)()
        form_result = {}
        task_id = None
        try:
            form_result = _form.to_python(dict(request.POST))

            # an approximation that is better than nothing
            if not Ui.get_by_key('hooks', Ui.HOOK_UPDATE).ui_active:
                form_result['update_after_clone'] = False

            # create fork is done sometimes async on celery, db transaction
            # management is handled there.
            task = RepoModel().create_fork(form_result, request.authuser.user_id)
            task_id = task.task_id
        except formencode.Invalid as errors:
            return htmlfill.render(
                render('forks/fork.html'),
                defaults=errors.value,
                errors=errors.error_dict or {},
                prefix_error=False,
                encoding="UTF-8",
                force_defaults=False)
        except Exception:
            log.error(traceback.format_exc())
            h.flash(_('An error occurred during repository forking %s') %
                    repo_name, category='error')

        fork_name = form_result.get('repo_name_full')
        if fork_name is None:
            # Validation failed with an unexpected error, so form_result is
            # still empty. Indexing it would raise KeyError and hide the
            # flashed error message behind a server error.
            raise HTTPFound(location=url('home'))

        raise HTTPFound(location=h.url('repo_creating_home',
                              repo_name=fork_name,
                              task_id=task_id))