Changeset - 6fbbbd9a627a
[Not reviewed]
default
0 11 0
Mads Kiilerich (mads) - 5 years ago 2020-10-12 11:26:49
mads@kiilerich.com
Grafted from: 1aebb614b3a4
db: move URL_SEP constant from db to top level kallithea module

URL_SEP is used in several places - having it in the db module is too high
level.

__init__ might not be the best place for this, but it does no harm to the
dependencies graph to place constants there.
11 files changed with 29 insertions and 32 deletions:
0 comments (0 inline, 0 general)
kallithea/__init__.py
Show inline comments
 
@@ -36,24 +36,26 @@ if sys.version_info < (3, 6):
 

	
 
VERSION = (0, 6, 99)
 
BACKENDS = {
 
    'hg': 'Mercurial repository',
 
    'git': 'Git repository',
 
}
 

	
 
CELERY_APP = None  # set to Celery app instance if using Celery
 
CELERY_EAGER = False
 

	
 
CONFIG = {}
 

	
 
URL_SEP = '/'
 

	
 
# Linked module for extensions
 
EXTENSIONS = {}
 

	
 
__version__ = '.'.join(str(each) for each in VERSION)
 
__platform__ = platform.system()
 
__license__ = 'GPLv3'
 
__py_version__ = sys.version_info
 
__author__ = "Various Authors"
 
__url__ = 'https://kallithea-scm.org/'
 

	
 
is_windows = __platform__ in ['Windows']
 
is_unix = not is_windows
kallithea/controllers/files.py
Show inline comments
 
@@ -43,25 +43,24 @@ from kallithea.config.routing import url
 
from kallithea.lib import diffs
 
from kallithea.lib import helpers as h
 
from kallithea.lib.auth import HasRepoPermissionLevelDecorator, LoginRequired
 
from kallithea.lib.base import BaseRepoController, jsonify, render
 
from kallithea.lib.exceptions import NonRelativePathError
 
from kallithea.lib.utils import action_logger
 
from kallithea.lib.utils2 import asbool, convert_line_endings, detect_mode, safe_str
 
from kallithea.lib.vcs.backends.base import EmptyChangeset
 
from kallithea.lib.vcs.conf import settings
 
from kallithea.lib.vcs.exceptions import (ChangesetDoesNotExistError, ChangesetError, EmptyRepositoryError, ImproperArchiveTypeError, NodeAlreadyExistsError,
 
                                          NodeDoesNotExistError, NodeError, RepositoryError, VCSError)
 
from kallithea.lib.vcs.nodes import FileNode
 
from kallithea.model import db
 
from kallithea.model.repo import RepoModel
 
from kallithea.model.scm import ScmModel
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class FilesController(BaseRepoController):
 

	
 
    def _before(self, *args, **kwargs):
 
        super(FilesController, self)._before(*args, **kwargs)
 

	
 
@@ -224,25 +223,25 @@ class FilesController(BaseRepoController
 
            c.authors = []
 
            for a in set([x.author for x in _hist]):
 
                c.authors.append((h.email(a), h.person(a)))
 
            return render('files/files_history_box.html')
 

	
 
    @LoginRequired(allow_default_user=True)
 
    @HasRepoPermissionLevelDecorator('read')
 
    def rawfile(self, repo_name, revision, f_path):
 
        cs = self.__get_cs(revision)
 
        file_node = self.__get_filenode(cs, f_path)
 

	
 
        response.content_disposition = \
 
            'attachment; filename=%s' % f_path.split(db.URL_SEP)[-1]
 
            'attachment; filename=%s' % f_path.split(kallithea.URL_SEP)[-1]
 

	
 
        response.content_type = file_node.mimetype
 
        return file_node.content
 

	
 
    @LoginRequired(allow_default_user=True)
 
    @HasRepoPermissionLevelDecorator('read')
 
    def raw(self, repo_name, revision, f_path):
 
        cs = self.__get_cs(revision)
 
        file_node = self.__get_filenode(cs, f_path)
 

	
 
        raw_mimetype_mapping = {
 
            # map original mimetype to a mimetype used for "show as raw"
kallithea/lib/helpers.py
Show inline comments
 
@@ -48,25 +48,25 @@ from kallithea.lib.annotate import annot
 
from kallithea.lib.auth import HasPermissionAny, HasRepoGroupPermissionLevel, HasRepoPermissionLevel
 
from kallithea.lib.diffs import BIN_FILENODE, CHMOD_FILENODE, DEL_FILENODE, MOD_FILENODE, NEW_FILENODE, RENAMED_FILENODE
 
from kallithea.lib.markup_renderer import url_re
 
from kallithea.lib.pygmentsutils import get_custom_lexer
 
from kallithea.lib.utils2 import MENTIONS_REGEX, AttributeDict, age, asbool, credentials_filter, safe_bytes, safe_int, safe_str, time_to_datetime
 
from kallithea.lib.vcs.backends.base import BaseChangeset, EmptyChangeset
 
from kallithea.lib.vcs.exceptions import ChangesetDoesNotExistError
 
#==============================================================================
 
# SCM FILTERS available via h.
 
#==============================================================================
 
from kallithea.lib.vcs.utils import author_email, author_name
 
from kallithea.model.changeset_status import ChangesetStatusModel
 
from kallithea.model.db import URL_SEP, ChangesetStatus, Permission, PullRequest, User, UserIpMap
 
from kallithea.model.db import ChangesetStatus, Permission, PullRequest, User, UserIpMap
 

	
 

	
 
# mute pyflakes "imported but unused"
 
assert Option
 
assert checkbox
 
assert end_form
 
assert password
 
assert radio
 
assert submit
 
assert text
 
assert textarea
 
assert format_byte_size
 
@@ -1233,25 +1233,25 @@ def urlify_issues(newtext, repo_name):
 
                continue
 

	
 
            log.debug('issue pattern %r: %r -> %r %r', k, issue_pat, issue_server_link, issue_sub)
 

	
 
            def issues_replace(match_obj,
 
                               issue_server_link=issue_server_link, issue_sub=issue_sub):
 
                try:
 
                    issue_url = match_obj.expand(issue_server_link)
 
                except (IndexError, re.error) as e:
 
                    log.error('invalid issue_url setting %r -> %r %r. Error: %s', issue_pat, issue_server_link, issue_sub, str(e))
 
                    issue_url = issue_server_link
 
                issue_url = issue_url.replace('{repo}', repo_name)
 
                issue_url = issue_url.replace('{repo_name}', repo_name.split(URL_SEP)[-1])
 
                issue_url = issue_url.replace('{repo_name}', repo_name.split(kallithea.URL_SEP)[-1])
 
                # if issue_sub is empty use the matched issue reference verbatim
 
                if not issue_sub:
 
                    issue_text = match_obj.group()
 
                else:
 
                    try:
 
                        issue_text = match_obj.expand(issue_sub)
 
                    except (IndexError, re.error) as e:
 
                        log.error('invalid issue_sub setting %r -> %r %r. Error: %s', issue_pat, issue_server_link, issue_sub, str(e))
 
                        issue_text = match_obj.group()
 

	
 
                return (
 
                    '<a class="issue-tracker-link" href="%(url)s">'
kallithea/lib/utils.py
Show inline comments
 
@@ -38,25 +38,25 @@ import mercurial.config
 
import mercurial.error
 
import mercurial.ui
 

	
 
import kallithea.config.conf
 
from kallithea.lib.exceptions import InvalidCloneUriException
 
from kallithea.lib.utils2 import ascii_bytes, aslist, get_current_authuser, safe_bytes, safe_str
 
from kallithea.lib.vcs.backends.git.repository import GitRepository
 
from kallithea.lib.vcs.backends.hg.repository import MercurialRepository
 
from kallithea.lib.vcs.conf import settings
 
from kallithea.lib.vcs.exceptions import RepositoryError, VCSError
 
from kallithea.lib.vcs.utils.fakemod import create_module
 
from kallithea.lib.vcs.utils.helpers import get_scm
 
from kallithea.model import db, meta
 
from kallithea.model import meta
 
from kallithea.model.db import RepoGroup, Repository, Setting, Ui, User, UserGroup, UserLog
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 
REMOVED_REPO_PAT = re.compile(r'rm__\d{8}_\d{6}_\d{6}_.*')
 

	
 

	
 
#==============================================================================
 
# PERM DECORATOR HELPERS FOR EXTRACTING NAMES FOR PERM CHECKS
 
#==============================================================================
 
def get_repo_slug(request):
 
@@ -390,25 +390,25 @@ def set_indexer_config(config):
 

	
 

	
 
def map_groups(path):
 
    """
 
    Given a full path to a repository, create all nested groups that this
 
    repo is inside. This function creates parent-child relationships between
 
    groups and creates default perms for all new groups.
 

	
 
    :param paths: full path to repository
 
    """
 
    from kallithea.model.repo_group import RepoGroupModel
 
    sa = meta.Session()
 
    groups = path.split(db.URL_SEP)
 
    groups = path.split(kallithea.URL_SEP)
 
    parent = None
 
    group = None
 

	
 
    # last element is repo in nested groups structure
 
    groups = groups[:-1]
 
    rgm = RepoGroupModel()
 
    owner = User.get_first_admin()
 
    for lvl, group_name in enumerate(groups):
 
        group_name = '/'.join(groups[:lvl] + [group_name])
 
        group = RepoGroup.get_by_group_name(group_name)
 
        desc = '%s group' % group_name
 

	
kallithea/model/db.py
Show inline comments
 
@@ -44,25 +44,24 @@ from tg.i18n import lazy_ugettext as _
 
from webob.exc import HTTPNotFound
 

	
 
import kallithea
 
from kallithea.lib import ext_json, ssh
 
from kallithea.lib.exceptions import DefaultUserException
 
from kallithea.lib.utils2 import asbool, ascii_bytes, aslist, get_changeset_safe, get_clone_url, remove_prefix, safe_bytes, safe_int, safe_str, urlreadable
 
from kallithea.lib.vcs import get_backend
 
from kallithea.lib.vcs.backends.base import EmptyChangeset
 
from kallithea.lib.vcs.utils.helpers import get_scm
 
from kallithea.model.meta import Base, Session
 

	
 

	
 
URL_SEP = '/'
 
log = logging.getLogger(__name__)
 

	
 
#==============================================================================
 
# BASE CLASSES
 
#==============================================================================
 

	
 
class BaseDbModel(object):
 
    """
 
    Base Model for all classes
 
    """
 

	
 
    @classmethod
 
@@ -1016,25 +1015,25 @@ class Repository(Base, BaseDbModel):
 

	
 
        return q
 

	
 
    @classmethod
 
    def normalize_repo_name(cls, repo_name):
 
        """
 
        Normalizes os specific repo_name to the format internally stored inside
 
        database using URL_SEP
 

	
 
        :param cls:
 
        :param repo_name:
 
        """
 
        return URL_SEP.join(repo_name.split(os.sep))
 
        return kallithea.URL_SEP.join(repo_name.split(os.sep))
 

	
 
    @classmethod
 
    def guess_instance(cls, value):
 
        return super(Repository, cls).guess_instance(value, Repository.get_by_repo_name)
 

	
 
    @classmethod
 
    def get_by_repo_name(cls, repo_name, case_insensitive=False):
 
        """Get the repo, defaulting to database case sensitivity.
 
        case_insensitive will be slower and should only be specified if necessary."""
 
        if case_insensitive:
 
            q = Session().query(cls).filter(sqlalchemy.func.lower(cls.repo_name) == sqlalchemy.func.lower(repo_name))
 
        else:
 
@@ -1042,80 +1041,80 @@ class Repository(Base, BaseDbModel):
 
        q = q.options(joinedload(Repository.fork)) \
 
                .options(joinedload(Repository.owner)) \
 
                .options(joinedload(Repository.group))
 
        return q.scalar()
 

	
 
    @classmethod
 
    def get_by_full_path(cls, repo_full_path):
 
        base_full_path = os.path.realpath(kallithea.CONFIG['base_path'])
 
        repo_full_path = os.path.realpath(repo_full_path)
 
        assert repo_full_path.startswith(base_full_path + os.path.sep)
 
        repo_name = repo_full_path[len(base_full_path) + 1:]
 
        repo_name = cls.normalize_repo_name(repo_name)
 
        return cls.get_by_repo_name(repo_name.strip(URL_SEP))
 
        return cls.get_by_repo_name(repo_name.strip(kallithea.URL_SEP))
 

	
 
    @classmethod
 
    def get_repo_forks(cls, repo_id):
 
        return cls.query().filter(Repository.fork_id == repo_id)
 

	
 
    @property
 
    def forks(self):
 
        """
 
        Return forks of this repo
 
        """
 
        return Repository.get_repo_forks(self.repo_id)
 

	
 
    @property
 
    def parent(self):
 
        """
 
        Returns fork parent
 
        """
 
        return self.fork
 

	
 
    @property
 
    def just_name(self):
 
        return self.repo_name.split(URL_SEP)[-1]
 
        return self.repo_name.split(kallithea.URL_SEP)[-1]
 

	
 
    @property
 
    def groups_with_parents(self):
 
        groups = []
 
        group = self.group
 
        while group is not None:
 
            groups.append(group)
 
            group = group.parent_group
 
            assert group not in groups, group # avoid recursion on bad db content
 
        groups.reverse()
 
        return groups
 

	
 
    @property
 
    def repo_full_path(self):
 
        """
 
        Returns base full path for the repository - where it actually
 
        exists on a filesystem.
 
        """
 
        p = [kallithea.CONFIG['base_path']]
 
        # we need to split the name by / since this is how we store the
 
        # names in the database, but that eventually needs to be converted
 
        # into a valid system path
 
        p += self.repo_name.split(URL_SEP)
 
        p += self.repo_name.split(kallithea.URL_SEP)
 
        return os.path.join(*p)
 

	
 
    def get_new_name(self, repo_name):
 
        """
 
        returns new full repository name based on assigned group and new new
 

	
 
        :param group_name:
 
        """
 
        path_prefix = self.group.full_path_splitted if self.group else []
 
        return URL_SEP.join(path_prefix + [repo_name])
 
        return kallithea.URL_SEP.join(path_prefix + [repo_name])
 

	
 
    @property
 
    def _ui(self):
 
        """
 
        Creates an db based ui object for this repository
 
        """
 
        from kallithea.lib.utils import make_ui
 
        return make_ui()
 

	
 
    @classmethod
 
    def is_valid(cls, repo_name):
 
        """
 
@@ -1436,33 +1435,33 @@ class RepoGroup(Base, BaseDbModel):
 
            groups.append(group)
 
            group = group.parent_group
 
            assert group not in groups, group # avoid recursion on bad db content
 
        groups.reverse()
 
        return groups
 

	
 
    @property
 
    def children(self):
 
        return RepoGroup.query().filter(RepoGroup.parent_group == self)
 

	
 
    @property
 
    def name(self):
 
        return self.group_name.split(URL_SEP)[-1]
 
        return self.group_name.split(kallithea.URL_SEP)[-1]
 

	
 
    @property
 
    def full_path(self):
 
        return self.group_name
 

	
 
    @property
 
    def full_path_splitted(self):
 
        return self.group_name.split(URL_SEP)
 
        return self.group_name.split(kallithea.URL_SEP)
 

	
 
    @property
 
    def repositories(self):
 
        return Repository.query(sorted=True).filter_by(group=self)
 

	
 
    @property
 
    def repositories_recursive_count(self):
 
        cnt = self.repositories.count()
 

	
 
        def children_count(group):
 
            cnt = 0
 
            for child in group.children:
 
@@ -1499,25 +1498,25 @@ class RepoGroup(Base, BaseDbModel):
 
        Returns all children groups for this group including children of children
 
        """
 
        return self._recursive_objects(include_repos=False)
 

	
 
    def get_new_name(self, group_name):
 
        """
 
        returns new full group name based on parent and new name
 

	
 
        :param group_name:
 
        """
 
        path_prefix = (self.parent_group.full_path_splitted if
 
                       self.parent_group else [])
 
        return URL_SEP.join(path_prefix + [group_name])
 
        return kallithea.URL_SEP.join(path_prefix + [group_name])
 

	
 
    def get_api_data(self):
 
        """
 
        Common function for generating api data
 

	
 
        """
 
        group = self
 
        data = dict(
 
            group_id=group.group_id,
 
            group_name=group.group_name,
 
            group_description=group.group_description,
 
            parent_group=group.parent_group.group_name if group.parent_group else None,
kallithea/model/repo.py
Show inline comments
 
@@ -31,35 +31,33 @@ import os
 
import shutil
 
import traceback
 
from datetime import datetime
 

	
 
import kallithea.lib.utils2
 
from kallithea.lib import helpers as h
 
from kallithea.lib.auth import HasRepoPermissionLevel, HasUserGroupPermissionLevel
 
from kallithea.lib.exceptions import AttachedForksError
 
from kallithea.lib.hooks import log_delete_repository
 
from kallithea.lib.utils import is_valid_repo_uri, make_ui
 
from kallithea.lib.utils2 import LazyProperty, get_current_authuser, obfuscate_url_pw, remove_prefix
 
from kallithea.lib.vcs.backends import get_backend
 
from kallithea.model.db import (URL_SEP, Permission, RepoGroup, Repository, RepositoryField, Session, Statistics, Ui, User, UserGroup, UserGroupRepoGroupToPerm,
 
from kallithea.model.db import (Permission, RepoGroup, Repository, RepositoryField, Session, Statistics, Ui, User, UserGroup, UserGroupRepoGroupToPerm,
 
                                UserGroupRepoToPerm, UserRepoGroupToPerm, UserRepoToPerm)
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class RepoModel(object):
 

	
 
    URL_SEPARATOR = URL_SEP
 

	
 
    def _create_default_perms(self, repository, private):
 
        # create default permission
 
        default = 'repository.read'
 
        def_user = User.get_default_user()
 
        for p in def_user.user_perms:
 
            if p.permission.permission_name.startswith('repository.'):
 
                default = p.permission.permission_name
 
                break
 

	
 
        default_perm = 'repository.none' if private else default
 

	
 
        repo_to_perm = UserRepoToPerm()
 
@@ -335,25 +333,25 @@ class RepoModel(object):
 
        from kallithea.model.scm import ScmModel
 

	
 
        owner = User.guess_instance(owner)
 
        fork_of = Repository.guess_instance(fork_of)
 
        repo_group = RepoGroup.guess_instance(repo_group)
 
        try:
 
            repo_name = repo_name
 
            description = description
 
            # repo name is just a name of repository
 
            # while repo_name_full is a full qualified name that is combined
 
            # with name and path of group
 
            repo_name_full = repo_name
 
            repo_name = repo_name.split(URL_SEP)[-1]
 
            repo_name = repo_name.split(kallithea.URL_SEP)[-1]
 
            if kallithea.lib.utils2.repo_name_slug(repo_name) != repo_name:
 
                raise Exception('invalid repo name %s' % repo_name)
 

	
 
            new_repo = Repository()
 
            new_repo.repo_state = state
 
            new_repo.enable_statistics = False
 
            new_repo.repo_name = repo_name_full
 
            new_repo.repo_type = repo_type
 
            new_repo.owner = owner
 
            new_repo.group = repo_group
 
            new_repo.description = description or repo_name
 
            new_repo.private = private
kallithea/model/repo_group.py
Show inline comments
 
@@ -25,25 +25,24 @@ Original author and date, and relevant c
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 

	
 
import datetime
 
import logging
 
import os
 
import shutil
 
import traceback
 

	
 
import kallithea.lib.utils2
 
from kallithea.lib.utils2 import LazyProperty
 
from kallithea.model import db
 
from kallithea.model.db import Permission, RepoGroup, Repository, Session, Ui, User, UserGroup, UserGroupRepoGroupToPerm, UserRepoGroupToPerm
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class RepoGroupModel(object):
 

	
 
    @LazyProperty
 
    def repos_path(self):
 
        """
 
        Gets the repositories root path from database
 
@@ -107,25 +106,25 @@ class RepoGroupModel(object):
 
        if os.path.isdir(new_path):
 
            raise Exception('Was trying to rename to already '
 
                            'existing dir %s' % new_path)
 
        shutil.move(old_path, new_path)
 

	
 
    def _delete_group(self, group, force_delete=False):
 
        """
 
        Deletes a group from a filesystem
 

	
 
        :param group: instance of group from database
 
        :param force_delete: use shutil rmtree to remove all objects
 
        """
 
        paths = group.full_path.split(db.URL_SEP)
 
        paths = group.full_path.split(kallithea.URL_SEP)
 
        paths = os.sep.join(paths)
 

	
 
        rm_path = os.path.join(self.repos_path, paths)
 
        log.info("Removing group %s", rm_path)
 
        # delete only if that path really exists
 
        if os.path.isdir(rm_path):
 
            if force_delete:
 
                shutil.rmtree(rm_path)
 
            else:
 
                # archive that group
 
                _now = datetime.datetime.now()
 
                _ms = str(_now.microsecond).rjust(6, '0')
kallithea/model/validators.py
Show inline comments
 
@@ -18,31 +18,31 @@ Set of generic validators
 
import logging
 
import os
 
import re
 
from collections import defaultdict
 

	
 
import formencode
 
import ipaddr
 
import sqlalchemy
 
from formencode.validators import CIDR, Bool, Email, FancyValidator, Int, IPAddress, NotEmpty, Number, OneOf, Regex, Set, String, StringBoolean, UnicodeString
 
from sqlalchemy import func
 
from tg.i18n import ugettext as _
 

	
 
import kallithea
 
from kallithea.config.routing import ADMIN_PREFIX
 
from kallithea.lib.auth import HasPermissionAny, HasRepoGroupPermissionLevel
 
from kallithea.lib.compat import OrderedSet
 
from kallithea.lib.exceptions import InvalidCloneUriException, LdapImportError
 
from kallithea.lib.utils import is_valid_repo_uri
 
from kallithea.lib.utils2 import asbool, aslist, repo_name_slug
 
from kallithea.model import db
 
from kallithea.model.db import RepoGroup, Repository, User, UserGroup
 

	
 

	
 
# silence warnings and pylint
 
UnicodeString, OneOf, Int, Number, Regex, Email, Bool, StringBoolean, Set, \
 
    NotEmpty, IPAddress, CIDR, String, FancyValidator
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
def UniqueListFromString():
 
    class _UniqueListFromString(formencode.FancyValidator):
 
@@ -317,25 +317,25 @@ def ValidRepoName(edit=False, old_data=N
 
        }
 

	
 
        def _convert_to_python(self, value, state):
 
            repo_name = repo_name_slug(value.get('repo_name', ''))
 
            repo_group = value.get('repo_group')
 
            if repo_group:
 
                gr = RepoGroup.get(repo_group)
 
                group_path = gr.full_path
 
                group_name = gr.group_name
 
                # value needs to be aware of group name in order to check
 
                # db key This is an actual just the name to store in the
 
                # database
 
                repo_name_full = group_path + db.URL_SEP + repo_name
 
                repo_name_full = group_path + kallithea.URL_SEP + repo_name
 
            else:
 
                group_name = group_path = ''
 
                repo_name_full = repo_name
 

	
 
            value['repo_name'] = repo_name
 
            value['repo_name_full'] = repo_name_full
 
            value['group_path'] = group_path
 
            value['group_name'] = group_name
 
            return value
 

	
 
        def _validate_python(self, value, state):
 
            repo_name = value.get('repo_name')
kallithea/tests/functional/test_admin_repos.py
Show inline comments
 
# -*- coding: utf-8 -*-
 

	
 
import os
 
import urllib.parse
 

	
 
import mock
 
import pytest
 

	
 
import kallithea
 
from kallithea.lib import vcs
 
from kallithea.model import db
 
from kallithea.model.db import Permission, Repository, Ui, User, UserRepoToPerm
 
from kallithea.model.meta import Session
 
from kallithea.model.repo import RepoModel
 
from kallithea.model.repo_group import RepoGroupModel
 
from kallithea.model.user import UserModel
 
from kallithea.tests import base
 
from kallithea.tests.fixture import Fixture, raise_exception
 

	
 

	
 
fixture = Fixture()
 

	
 

	
 
@@ -106,25 +106,25 @@ class _BaseTestCase(base.TestController)
 

	
 
    def test_create_in_group(self):
 
        self.log_user()
 

	
 
        ## create GROUP
 
        group_name = 'sometest_%s' % self.REPO_TYPE
 
        gr = RepoGroupModel().create(group_name=group_name,
 
                                     group_description='test',
 
                                     owner=base.TEST_USER_ADMIN_LOGIN)
 
        Session().commit()
 

	
 
        repo_name = 'ingroup'
 
        repo_name_full = db.URL_SEP.join([group_name, repo_name])
 
        repo_name_full = kallithea.URL_SEP.join([group_name, repo_name])
 
        description = 'description for newly created repo'
 
        response = self.app.post(base.url('repos'),
 
                        fixture._get_repo_create_params(repo_private=False,
 
                                                repo_name=repo_name,
 
                                                repo_type=self.REPO_TYPE,
 
                                                repo_description=description,
 
                                                repo_group=gr.group_id,
 
                                                _session_csrf_secret_token=self.session_csrf_secret_token()))
 
        ## run the check page that triggers the flash message
 
        response = self.app.get(base.url('repo_check_home', repo_name=repo_name_full))
 
        assert response.json == {'result': True}
 
        self.checkSessionFlash(response,
 
@@ -183,39 +183,39 @@ class _BaseTestCase(base.TestController)
 
        gr = RepoGroupModel().create(group_name=group_name,
 
                                     group_description='test',
 
                                     owner=base.TEST_USER_ADMIN_LOGIN)
 
        Session().commit()
 

	
 
        group_name_allowed = 'reg_sometest_allowed_%s' % self.REPO_TYPE
 
        gr_allowed = RepoGroupModel().create(group_name=group_name_allowed,
 
                                     group_description='test',
 
                                     owner=base.TEST_USER_REGULAR_LOGIN)
 
        Session().commit()
 

	
 
        repo_name = 'ingroup'
 
        repo_name_full = db.URL_SEP.join([group_name, repo_name])
 
        repo_name_full = kallithea.URL_SEP.join([group_name, repo_name])
 
        description = 'description for newly created repo'
 
        response = self.app.post(base.url('repos'),
 
                        fixture._get_repo_create_params(repo_private=False,
 
                                                repo_name=repo_name,
 
                                                repo_type=self.REPO_TYPE,
 
                                                repo_description=description,
 
                                                repo_group=gr.group_id,
 
                                                _session_csrf_secret_token=session_csrf_secret_token))
 

	
 
        response.mustcontain('Invalid value')
 

	
 
        # user is allowed to create in this group
 
        repo_name = 'ingroup'
 
        repo_name_full = db.URL_SEP.join([group_name_allowed, repo_name])
 
        repo_name_full = kallithea.URL_SEP.join([group_name_allowed, repo_name])
 
        description = 'description for newly created repo'
 
        response = self.app.post(base.url('repos'),
 
                        fixture._get_repo_create_params(repo_private=False,
 
                                                repo_name=repo_name,
 
                                                repo_type=self.REPO_TYPE,
 
                                                repo_description=description,
 
                                                repo_group=gr_allowed.group_id,
 
                                                _session_csrf_secret_token=session_csrf_secret_token))
 

	
 
        ## run the check page that triggers the flash message
 
        response = self.app.get(base.url('repo_check_home', repo_name=repo_name_full))
 
        assert response.json == {'result': True}
 
@@ -258,25 +258,25 @@ class _BaseTestCase(base.TestController)
 
        ## create GROUP
 
        group_name = 'sometest_%s' % self.REPO_TYPE
 
        gr = RepoGroupModel().create(group_name=group_name,
 
                                     group_description='test',
 
                                     owner=base.TEST_USER_ADMIN_LOGIN)
 
        perm = Permission.get_by_key('repository.write')
 
        RepoGroupModel().grant_user_permission(gr, base.TEST_USER_REGULAR_LOGIN, perm)
 

	
 
        ## add repo permissions
 
        Session().commit()
 

	
 
        repo_name = 'ingroup_inherited_%s' % self.REPO_TYPE
 
        repo_name_full = db.URL_SEP.join([group_name, repo_name])
 
        repo_name_full = kallithea.URL_SEP.join([group_name, repo_name])
 
        description = 'description for newly created repo'
 
        response = self.app.post(base.url('repos'),
 
                        fixture._get_repo_create_params(repo_private=False,
 
                                                repo_name=repo_name,
 
                                                repo_type=self.REPO_TYPE,
 
                                                repo_description=description,
 
                                                repo_group=gr.group_id,
 
                                                repo_copy_permissions=True,
 
                                                _session_csrf_secret_token=self.session_csrf_secret_token()))
 

	
 
        ## run the check page that triggers the flash message
 
        response = self.app.get(base.url('repo_check_home', repo_name=repo_name_full))
kallithea/tests/models/test_permissions.py
Show inline comments
 
import kallithea
 
from kallithea.lib.auth import AuthUser
 
from kallithea.model import db
 
from kallithea.model.db import Permission, User, UserGroupRepoGroupToPerm, UserToPerm
 
from kallithea.model.meta import Session
 
from kallithea.model.permission import PermissionModel
 
from kallithea.model.repo import RepoModel
 
from kallithea.model.repo_group import RepoGroupModel
 
from kallithea.model.user import UserModel
 
from kallithea.model.user_group import UserGroupModel
 
from kallithea.tests import base
 
from kallithea.tests.fixture import Fixture
 

	
 

	
 
fixture = Fixture()
 
@@ -184,25 +184,25 @@ class TestPermissions(base.TestControlle
 
                                               user=self.anon,
 
                                               perm='group.none')
 

	
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        assert u1_auth.repository_group_permissions.get('group1') == 'group.none'
 
        assert u1_auth.repository_group_permissions.get('group2') == 'group.none'
 

	
 
        a1_auth = AuthUser(user_id=self.anon.user_id)
 
        assert a1_auth.repository_group_permissions.get('group1') == 'group.none'
 
        assert a1_auth.repository_group_permissions.get('group2') == 'group.none'
 

	
 
        # add repo to group
 
        name = db.URL_SEP.join([self.g1.group_name, 'test_perm'])
 
        name = kallithea.URL_SEP.join([self.g1.group_name, 'test_perm'])
 
        self.test_repo = fixture.create_repo(name=name,
 
                                             repo_type='hg',
 
                                             repo_group=self.g1,
 
                                             cur_user=self.u1,)
 

	
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        assert u1_auth.repository_group_permissions.get('group1') == 'group.none'
 
        assert u1_auth.repository_group_permissions.get('group2') == 'group.none'
 

	
 
        a1_auth = AuthUser(user_id=self.anon.user_id)
 
        assert a1_auth.repository_group_permissions.get('group1') == 'group.none'
 
        assert a1_auth.repository_group_permissions.get('group2') == 'group.none'
kallithea/tests/models/test_repo_groups.py
Show inline comments
 
import os
 

	
 
import pytest
 
from sqlalchemy.exc import IntegrityError
 

	
 
from kallithea.model import db
 
import kallithea
 
from kallithea.model.db import RepoGroup
 
from kallithea.model.meta import Session
 
from kallithea.model.repo import RepoModel
 
from kallithea.model.repo_group import RepoGroupModel
 
from kallithea.tests import base
 
from kallithea.tests.fixture import Fixture
 

	
 

	
 
fixture = Fixture()
 

	
 

	
 
def _update_repo_group(id_, group_name, desc='desc', parent_id=None):
 
@@ -124,25 +124,25 @@ class TestRepoGroups(base.TestController
 
        r = fixture.create_repo('john')
 

	
 
        assert r.repo_name == 'john'
 
        # put repo into group
 
        r = _update_repo('john', repo_group=g1.group_id)
 
        Session().commit()
 
        assert r.repo_name == 'g1/john'
 

	
 
        _update_repo_group(g1.group_id, 'g1', parent_id=g2.group_id)
 
        assert self.__check_path('g2', 'g1')
 

	
 
        # test repo
 
        assert r.repo_name == db.URL_SEP.join(['g2', 'g1', r.just_name])
 
        assert r.repo_name == kallithea.URL_SEP.join(['g2', 'g1', r.just_name])
 

	
 
    def test_move_to_root(self):
 
        g1 = fixture.create_repo_group('t11')
 
        g2 = fixture.create_repo_group('t22', parent_group_id=g1.group_id)
 

	
 
        assert g2.full_path == 't11/t22'
 
        assert self.__check_path('t11', 't22')
 

	
 
        g2 = _update_repo_group(g2.group_id, 'g22', parent_id=None)
 
        Session().commit()
 

	
 
        assert g2.group_name == 'g22'
0 comments (0 inline, 0 general)