Changeset - 1ecd6c0e2787
[Not reviewed]
default
0 6 0
Mads Kiilerich (mads) - 6 years ago 2020-08-18 16:40:19
mads@kiilerich.com
auth: refactor permissions

Avoid using complex vague typing in dict-of-dicts.
6 files changed with 108 insertions and 113 deletions:
0 comments (0 inline, 0 general)
kallithea/controllers/admin/my_account.py
Show inline comments
 
@@ -88,13 +88,13 @@ class MyAccountController(BaseController
 

	
 
    def my_account(self):
 
        c.active = 'profile'
 
        self.__load_data()
 
        c.perm_user = AuthUser(user_id=request.authuser.user_id)
 
        managed_fields = auth_modules.get_managed_fields(c.user)
 
        def_user_perms = AuthUser(dbuser=User.get_default_user()).permissions['global']
 
        def_user_perms = AuthUser(dbuser=User.get_default_user()).global_permissions
 
        if 'hg.register.none' in def_user_perms:
 
            managed_fields.extend(['username', 'firstname', 'lastname', 'email'])
 

	
 
        c.readonly = lambda n: 'readonly' if n in managed_fields else None
 

	
 
        defaults = c.user.get_dict()
kallithea/controllers/login.py
Show inline comments
 
@@ -115,13 +115,13 @@ class LoginController(BaseController):
 

	
 
        return render('/login.html')
 

	
 
    @HasPermissionAnyDecorator('hg.admin', 'hg.register.auto_activate',
 
                               'hg.register.manual_activate')
 
    def register(self):
 
        def_user_perms = AuthUser(dbuser=User.get_default_user()).permissions['global']
 
        def_user_perms = AuthUser(dbuser=User.get_default_user()).global_permissions
 
        c.auto_active = 'hg.register.auto_activate' in def_user_perms
 

	
 
        settings = Setting.get_app_settings()
 
        captcha_private_key = settings.get('captcha_private_key')
 
        c.captcha_active = bool(captcha_private_key)
 
        c.captcha_public_key = settings.get('captcha_public_key')
kallithea/lib/auth.py
Show inline comments
 
@@ -40,13 +40,12 @@ from tg.i18n import ugettext as _
 
from webob.exc import HTTPForbidden, HTTPFound
 

	
 
import kallithea
 
from kallithea.config.routing import url
 
from kallithea.lib.utils import get_repo_group_slug, get_repo_slug, get_user_group_slug
 
from kallithea.lib.utils2 import ascii_bytes, ascii_str, safe_bytes
 
from kallithea.lib.vcs.utils.lazy import LazyProperty
 
from kallithea.model.db import (Permission, UserApiKeys, UserGroup, UserGroupMember, UserGroupRepoGroupToPerm, UserGroupRepoToPerm, UserGroupToPerm,
 
                                UserGroupUserGroupToPerm, UserIpMap, UserToPerm)
 
from kallithea.model.meta import Session
 
from kallithea.model.user import UserModel
 

	
 

	
 
@@ -114,30 +113,30 @@ def check_password(password, hashed):
 
        log.error('error from bcrypt checking password: %s', e)
 
        return False
 
    log.error('check_password failed - no method found for hash length %s', len(hashed))
 
    return False
 

	
 

	
 
def _cached_perms_data(user_id, user_is_admin):
 
    RK = 'repositories'
 
    GK = 'repositories_groups'
 
    UK = 'user_groups'
 
    GLOBAL = 'global'
 
def get_user_permissions(user_id, user_is_admin):
 
    PERM_WEIGHTS = Permission.PERM_WEIGHTS
 
    permissions = {RK: {}, GK: {}, UK: {}, GLOBAL: set()}
 
    repository_permissions = {}
 
    repository_group_permissions = {}
 
    user_group_permissions = {}
 
    global_permissions = set()
 

	
 
    def bump_permission(kind, key, new_perm):
 
        """Add a new permission for kind and key.
 

	
 
    def bump_permission(permissions, key, new_perm):
 
        """Add a new permission for key to permissions.
 
        Assuming the permissions are comparable, set the new permission if it
 
        has higher weight, else drop it and keep the old permission.
 
        """
 
        cur_perm = permissions[kind][key]
 
        cur_perm = permissions[key]
 
        new_perm_val = PERM_WEIGHTS[new_perm]
 
        cur_perm_val = PERM_WEIGHTS[cur_perm]
 
        if new_perm_val > cur_perm_val:
 
            permissions[kind][key] = new_perm
 
            permissions[key] = new_perm
 

	
 
    #======================================================================
 
    # fetch default permissions
 
    #======================================================================
 
    default_repo_perms = Permission.get_default_perms(kallithea.DEFAULT_USER_ID)
 
    default_repo_groups_perms = Permission.get_default_group_perms(kallithea.DEFAULT_USER_ID)
 
@@ -145,69 +144,69 @@ def _cached_perms_data(user_id, user_is_
 

	
 
    if user_is_admin:
 
        #==================================================================
 
        # admin users have all rights;
 
        # based on default permissions, just set everything to admin
 
        #==================================================================
 
        permissions[GLOBAL].add('hg.admin')
 
        global_permissions.add('hg.admin')
 

	
 
        # repositories
 
        for perm in default_repo_perms:
 
            r_k = perm.repository.repo_name
 
            p = 'repository.admin'
 
            permissions[RK][r_k] = p
 
            repository_permissions[r_k] = p
 

	
 
        # repository groups
 
        for perm in default_repo_groups_perms:
 
            rg_k = perm.group.group_name
 
            p = 'group.admin'
 
            permissions[GK][rg_k] = p
 
            repository_group_permissions[rg_k] = p
 

	
 
        # user groups
 
        for perm in default_user_group_perms:
 
            u_k = perm.user_group.users_group_name
 
            p = 'usergroup.admin'
 
            permissions[UK][u_k] = p
 
        return permissions
 
            user_group_permissions[u_k] = p
 
        return (repository_permissions, repository_group_permissions, user_group_permissions, global_permissions)
 

	
 
    #==================================================================
 
    # SET DEFAULTS GLOBAL, REPOS, REPOSITORY GROUPS
 
    #==================================================================
 

	
 
    # default global permissions taken from the default user
 
    default_global_perms = UserToPerm.query() \
 
        .filter(UserToPerm.user_id == kallithea.DEFAULT_USER_ID) \
 
        .options(joinedload(UserToPerm.permission))
 

	
 
    for perm in default_global_perms:
 
        permissions[GLOBAL].add(perm.permission.permission_name)
 
        global_permissions.add(perm.permission.permission_name)
 

	
 
    # defaults for repositories, taken from default user
 
    for perm in default_repo_perms:
 
        r_k = perm.repository.repo_name
 
        if perm.repository.owner_id == user_id:
 
            p = 'repository.admin'
 
        elif perm.repository.private:
 
            p = 'repository.none'
 
        else:
 
            p = perm.permission.permission_name
 
        permissions[RK][r_k] = p
 
        repository_permissions[r_k] = p
 

	
 
    # defaults for repository groups taken from default user permission
 
    # on given group
 
    for perm in default_repo_groups_perms:
 
        rg_k = perm.group.group_name
 
        p = perm.permission.permission_name
 
        permissions[GK][rg_k] = p
 
        repository_group_permissions[rg_k] = p
 

	
 
    # defaults for user groups taken from default user permission
 
    # on given user group
 
    for perm in default_user_group_perms:
 
        u_k = perm.user_group.users_group_name
 
        p = perm.permission.permission_name
 
        permissions[UK][u_k] = p
 
        user_group_permissions[u_k] = p
 

	
 
    #======================================================================
 
    # !! Augment GLOBALS with user permissions if any found !!
 
    #======================================================================
 

	
 
    # USER GROUPS comes first
 
@@ -226,28 +225,28 @@ def _cached_perms_data(user_id, user_is_
 
    # one group
 
    _grouped = [[x, list(y)] for x, y in
 
                itertools.groupby(user_perms_from_users_groups,
 
                                  lambda x:x.users_group)]
 
    for gr, perms in _grouped:
 
        for perm in perms:
 
            permissions[GLOBAL].add(perm.permission.permission_name)
 
            global_permissions.add(perm.permission.permission_name)
 

	
 
    # user specific global permissions
 
    user_perms = Session().query(UserToPerm) \
 
            .options(joinedload(UserToPerm.permission)) \
 
            .filter(UserToPerm.user_id == user_id).all()
 

	
 
    for perm in user_perms:
 
        permissions[GLOBAL].add(perm.permission.permission_name)
 
        global_permissions.add(perm.permission.permission_name)
 

	
 
    # for each kind of global permissions, only keep the one with heighest weight
 
    kind_max_perm = {}
 
    for perm in sorted(permissions[GLOBAL], key=lambda n: PERM_WEIGHTS.get(n, -1)):
 
    for perm in sorted(global_permissions, key=lambda n: PERM_WEIGHTS.get(n, -1)):
 
        kind = perm.rsplit('.', 1)[0]
 
        kind_max_perm[kind] = perm
 
    permissions[GLOBAL] = set(kind_max_perm.values())
 
    global_permissions = set(kind_max_perm.values())
 
    ## END GLOBAL PERMISSIONS
 

	
 
    #======================================================================
 
    # !! PERMISSIONS FOR REPOSITORIES !!
 
    #======================================================================
 
    #======================================================================
 
@@ -266,20 +265,20 @@ def _cached_perms_data(user_id, user_is_
 
        .filter(UserGroupMember.user_id == user_id) \
 
        .options(joinedload(UserGroupRepoToPerm.repository)) \
 
        .options(joinedload(UserGroupRepoToPerm.permission)) \
 
        .all()
 

	
 
    for perm in user_repo_perms_from_users_groups:
 
        bump_permission(RK,
 
        bump_permission(repository_permissions,
 
            perm.repository.repo_name,
 
            perm.permission.permission_name)
 

	
 
    # user permissions for repositories
 
    user_repo_perms = Permission.get_default_perms(user_id)
 
    for perm in user_repo_perms:
 
        bump_permission(RK,
 
        bump_permission(repository_permissions,
 
            perm.repository.repo_name,
 
            perm.permission.permission_name)
 

	
 
    #======================================================================
 
    # !! PERMISSIONS FOR REPOSITORY GROUPS !!
 
    #======================================================================
 
@@ -297,20 +296,20 @@ def _cached_perms_data(user_id, user_is_
 
            == UserGroupMember.users_group_id)) \
 
     .filter(UserGroupMember.user_id == user_id) \
 
     .options(joinedload(UserGroupRepoGroupToPerm.permission)) \
 
     .all()
 

	
 
    for perm in user_repo_group_perms_from_users_groups:
 
        bump_permission(GK,
 
        bump_permission(repository_group_permissions,
 
            perm.group.group_name,
 
            perm.permission.permission_name)
 

	
 
    # user explicit permissions for repository groups
 
    user_repo_groups_perms = Permission.get_default_group_perms(user_id)
 
    for perm in user_repo_groups_perms:
 
        bump_permission(GK,
 
        bump_permission(repository_group_permissions,
 
            perm.group.group_name,
 
            perm.permission.permission_name)
 

	
 
    #======================================================================
 
    # !! PERMISSIONS FOR USER GROUPS !!
 
    #======================================================================
 
@@ -326,24 +325,24 @@ def _cached_perms_data(user_id, user_is_
 
            UserGroup.users_group_id), aliased=True, from_joinpoint=True) \
 
     .filter(UserGroup.users_group_active == True) \
 
     .options(joinedload(UserGroupUserGroupToPerm.permission)) \
 
     .all()
 

	
 
    for perm in user_group_user_groups_perms:
 
        bump_permission(UK,
 
        bump_permission(user_group_permissions,
 
            perm.target_user_group.users_group_name,
 
            perm.permission.permission_name)
 

	
 
    # user explicit permission for user groups
 
    user_user_groups_perms = Permission.get_default_user_group_perms(user_id)
 
    for perm in user_user_groups_perms:
 
        bump_permission(UK,
 
        bump_permission(user_group_permissions,
 
            perm.user_group.users_group_name,
 
            perm.permission.permission_name)
 

	
 
    return permissions
 
    return (repository_permissions, repository_group_permissions, user_group_permissions, global_permissions)
 

	
 

	
 
class AuthUser(object):
 
    """
 
    Represents a Kallithea user, including various authentication and
 
    authorization information. Typically used to store the current user,
 
@@ -425,55 +424,53 @@ class AuthUser(object):
 
            for k, v in dbuser.get_dict().items():
 
                assert k not in ['api_keys', 'permissions']
 
                setattr(self, k, v)
 
            self.is_default_user = dbuser.is_default_user
 
        log.debug('Auth User is now %s', self)
 

	
 
    @LazyProperty
 
    def permissions(self):
 
        """
 
        Fills user permission attribute with permissions taken from database
 
        works for permissions given for repositories, and for permissions that
 
        are granted to groups
 

	
 
        :param user: `AuthUser` instance
 
        """
 
        log.debug('Getting PERMISSION tree for %s', self)
 
        return _cached_perms_data(self.user_id, self.is_admin)
 
        (self.repository_permissions, self.repository_group_permissions, self.user_group_permissions, self.global_permissions,
 
        )= get_user_permissions(self.user_id, self.is_admin)
 
        self.permissions = {
 
            'global': self.global_permissions,
 
            'repositories': self.repository_permissions,
 
            'repositories_groups': self.repository_group_permissions,
 
            'user_groups': self.user_group_permissions,
 
        } # backwards compatibility
 

	
 
    def has_repository_permission_level(self, repo_name, level, purpose=None):
 
        required_perms = {
 
            'read': ['repository.read', 'repository.write', 'repository.admin'],
 
            'write': ['repository.write', 'repository.admin'],
 
            'admin': ['repository.admin'],
 
        }[level]
 
        actual_perm = self.permissions['repositories'].get(repo_name)
 
        actual_perm = self.repository_permissions.get(repo_name)
 
        ok = actual_perm in required_perms
 
        log.debug('Checking if user %r can %r repo %r (%s): %s (has %r)',
 
            self.username, level, repo_name, purpose, ok, actual_perm)
 
        return ok
 

	
 
    def has_repository_group_permission_level(self, repo_group_name, level, purpose=None):
 
        required_perms = {
 
            'read': ['group.read', 'group.write', 'group.admin'],
 
            'write': ['group.write', 'group.admin'],
 
            'admin': ['group.admin'],
 
        }[level]
 
        actual_perm = self.permissions['repositories_groups'].get(repo_group_name)
 
        actual_perm = self.repository_group_permissions.get(repo_group_name)
 
        ok = actual_perm in required_perms
 
        log.debug('Checking if user %r can %r repo group %r (%s): %s (has %r)',
 
            self.username, level, repo_group_name, purpose, ok, actual_perm)
 
        return ok
 

	
 
    def has_user_group_permission_level(self, user_group_name, level, purpose=None):
 
        required_perms = {
 
            'read': ['usergroup.read', 'usergroup.write', 'usergroup.admin'],
 
            'write': ['usergroup.write', 'usergroup.admin'],
 
            'admin': ['usergroup.admin'],
 
        }[level]
 
        actual_perm = self.permissions['user_groups'].get(user_group_name)
 
        actual_perm = self.user_group_permissions.get(user_group_name)
 
        ok = actual_perm in required_perms
 
        log.debug('Checking if user %r can %r user group %r (%s): %s (has %r)',
 
            self.username, level, user_group_name, purpose, ok, actual_perm)
 
        return ok
 

	
 
    @property
 
@@ -494,29 +491,29 @@ class AuthUser(object):
 

	
 
    @property
 
    def repositories_admin(self):
 
        """
 
        Returns list of repositories you're an admin of
 
        """
 
        return [x[0] for x in self.permissions['repositories'].items()
 
        return [x[0] for x in self.repository_permissions.items()
 
                if x[1] == 'repository.admin']
 

	
 
    @property
 
    def repository_groups_admin(self):
 
        """
 
        Returns list of repository groups you're an admin of
 
        """
 
        return [x[0] for x in self.permissions['repositories_groups'].items()
 
        return [x[0] for x in self.repository_group_permissions.items()
 
                if x[1] == 'group.admin']
 

	
 
    @property
 
    def user_groups_admin(self):
 
        """
 
        Returns list of user groups you're an admin of
 
        """
 
        return [x[0] for x in self.permissions['user_groups'].items()
 
        return [x[0] for x in self.user_group_permissions.items()
 
                if x[1] == 'usergroup.admin']
 

	
 
    def __repr__(self):
 
        return "<%s %s: %r>" % (self.__class__.__name__, self.user_id, self.username)
 

	
 
    def to_cookie(self):
 
@@ -669,14 +666,13 @@ class _PermsDecorator(object):
 
class HasPermissionAnyDecorator(_PermsDecorator):
 
    """
 
    Checks the user has any of the given global permissions.
 
    """
 

	
 
    def check_permissions(self, user):
 
        global_permissions = user.permissions['global'] # usually very short
 
        return any(p in global_permissions for p in self.required_perms)
 
        return any(p in user.global_permissions for p in self.required_perms)
 

	
 

	
 
class _PermDecorator(_PermsDecorator):
 
    """Base class for controller decorators with a single permission"""
 

	
 
    def __init__(self, required_perm):
 
@@ -736,14 +732,13 @@ class _PermsFunction(object):
 
        raise NotImplementedError()
 

	
 

	
 
class HasPermissionAny(_PermsFunction):
 

	
 
    def __call__(self, purpose=None):
 
        global_permissions = request.authuser.permissions['global'] # usually very short
 
        ok = any(p in global_permissions for p in self.required_perms)
 
        ok = any(p in request.authuser.global_permissions for p in self.required_perms)
 

	
 
        log.debug('Check %s for global %s (%s): %s',
 
            request.authuser.username, self.required_perms, purpose, ok)
 
        return ok
 

	
 

	
 
@@ -780,13 +775,13 @@ class HasUserGroupPermissionLevel(_PermF
 
class HasPermissionAnyMiddleware(object):
 
    def __init__(self, *perms):
 
        self.required_perms = set(perms)
 

	
 
    def __call__(self, authuser, repo_name, purpose=None):
 
        try:
 
            ok = authuser.permissions['repositories'][repo_name] in self.required_perms
 
            ok = authuser.repository_permissions[repo_name] in self.required_perms
 
        except KeyError:
 
            ok = False
 

	
 
        log.debug('Middleware check %s for %s for repo %s (%s): %s', authuser.username, self.required_perms, repo_name, purpose, ok)
 
        return ok
 

	
kallithea/lib/auth_modules/__init__.py
Show inline comments
 
@@ -237,13 +237,13 @@ class KallitheaExternalAuthPlugin(Kallit
 

	
 
    def _authenticate(self, userobj, username, passwd, settings, **kwargs):
 
        user_data = super(KallitheaExternalAuthPlugin, self)._authenticate(
 
            userobj, username, passwd, settings, **kwargs)
 
        if user_data is not None:
 
            if userobj is None: # external authentication of unknown user that will be created soon
 
                def_user_perms = AuthUser(dbuser=User.get_default_user()).permissions['global']
 
                def_user_perms = AuthUser(dbuser=User.get_default_user()).global_permissions
 
                active = 'hg.extern_activate.auto' in def_user_perms
 
            else:
 
                active = userobj.active
 

	
 
            if self.use_fake_password():
 
                # Randomize the PW because we don't need it, but don't want
kallithea/model/repo.py
Show inline comments
 
@@ -99,13 +99,13 @@ class RepoModel(object):
 

	
 
        :param user:
 
        """
 
        from kallithea.lib.auth import AuthUser
 
        auth_user = AuthUser(dbuser=User.guess_instance(user))
 
        repos = [repo_name
 
            for repo_name, perm in auth_user.permissions['repositories'].items()
 
            for repo_name, perm in auth_user.repository_permissions.items()
 
            if perm in ['repository.read', 'repository.write', 'repository.admin']
 
            ]
 
        return Repository.query().filter(Repository.repo_name.in_(repos))
 

	
 
    @classmethod
 
    def _render_datatable(cls, tmpl, *args, **kwargs):
kallithea/tests/models/test_permissions.py
Show inline comments
 
@@ -65,69 +65,69 @@ class TestPermissions(base.TestControlle
 
            UserGroupModel().delete(self.ug1, force=True)
 

	
 
        Session().commit()
 

	
 
    def test_default_perms_set(self):
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        assert u1_auth.permissions['repositories'][base.HG_REPO] == 'repository.read'
 
        assert u1_auth.repository_permissions[base.HG_REPO] == 'repository.read'
 
        new_perm = 'repository.write'
 
        RepoModel().grant_user_permission(repo=base.HG_REPO, user=self.u1,
 
                                          perm=new_perm)
 
        Session().commit()
 

	
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        assert u1_auth.permissions['repositories'][base.HG_REPO] == new_perm
 
        assert u1_auth.repository_permissions[base.HG_REPO] == new_perm
 

	
 
    def test_default_admin_perms_set(self):
 
        a1_auth = AuthUser(user_id=self.a1.user_id)
 
        assert a1_auth.permissions['repositories'][base.HG_REPO] == 'repository.admin'
 
        assert a1_auth.repository_permissions[base.HG_REPO] == 'repository.admin'
 
        new_perm = 'repository.write'
 
        RepoModel().grant_user_permission(repo=base.HG_REPO, user=self.a1,
 
                                          perm=new_perm)
 
        Session().commit()
 
        # cannot really downgrade admins permissions !? they still gets set as
 
        # admin !
 
        u1_auth = AuthUser(user_id=self.a1.user_id)
 
        assert u1_auth.permissions['repositories'][base.HG_REPO] == 'repository.admin'
 
        assert u1_auth.repository_permissions[base.HG_REPO] == 'repository.admin'
 

	
 
    def test_default_group_perms(self):
 
        self.g1 = fixture.create_repo_group('test1', skip_if_exists=True)
 
        self.g2 = fixture.create_repo_group('test2', skip_if_exists=True)
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        assert u1_auth.permissions['repositories'][base.HG_REPO] == 'repository.read'
 
        assert u1_auth.permissions['repositories_groups'].get('test1') == 'group.read'
 
        assert u1_auth.permissions['repositories_groups'].get('test2') == 'group.read'
 
        assert u1_auth.permissions['global'] == set(Permission.DEFAULT_USER_PERMISSIONS)
 
        assert u1_auth.repository_permissions[base.HG_REPO] == 'repository.read'
 
        assert u1_auth.repository_group_permissions.get('test1') == 'group.read'
 
        assert u1_auth.repository_group_permissions.get('test2') == 'group.read'
 
        assert u1_auth.global_permissions == set(Permission.DEFAULT_USER_PERMISSIONS)
 

	
 
    def test_default_admin_group_perms(self):
 
        self.g1 = fixture.create_repo_group('test1', skip_if_exists=True)
 
        self.g2 = fixture.create_repo_group('test2', skip_if_exists=True)
 
        a1_auth = AuthUser(user_id=self.a1.user_id)
 
        assert a1_auth.permissions['repositories'][base.HG_REPO] == 'repository.admin'
 
        assert a1_auth.permissions['repositories_groups'].get('test1') == 'group.admin'
 
        assert a1_auth.permissions['repositories_groups'].get('test2') == 'group.admin'
 
        assert a1_auth.repository_permissions[base.HG_REPO] == 'repository.admin'
 
        assert a1_auth.repository_group_permissions.get('test1') == 'group.admin'
 
        assert a1_auth.repository_group_permissions.get('test2') == 'group.admin'
 

	
 
    def test_propagated_permission_from_users_group_by_explicit_perms_exist(self):
 
        # make group
 
        self.ug1 = fixture.create_user_group('G1')
 
        UserGroupModel().add_user_to_group(self.ug1, self.u1)
 

	
 
        # set user permission none
 
        RepoModel().grant_user_permission(repo=base.HG_REPO, user=self.u1, perm='repository.none')
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        assert u1_auth.permissions['repositories'][base.HG_REPO] == 'repository.read' # inherit from default user
 
        assert u1_auth.repository_permissions[base.HG_REPO] == 'repository.read' # inherit from default user
 

	
 
        # grant perm for group this should override permission from user
 
        RepoModel().grant_user_group_permission(repo=base.HG_REPO,
 
                                                 group_name=self.ug1,
 
                                                 perm='repository.write')
 

	
 
        # verify that user group permissions win
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        assert u1_auth.permissions['repositories'][base.HG_REPO] == 'repository.write'
 
        assert u1_auth.repository_permissions[base.HG_REPO] == 'repository.write'
 

	
 
    def test_propagated_permission_from_users_group(self):
 
        # make group
 
        self.ug1 = fixture.create_user_group('G1')
 
        UserGroupModel().add_user_to_group(self.ug1, self.u3)
 

	
 
@@ -135,13 +135,13 @@ class TestPermissions(base.TestControlle
 
        new_perm_gr = 'repository.write'
 
        RepoModel().grant_user_group_permission(repo=base.HG_REPO,
 
                                                 group_name=self.ug1,
 
                                                 perm=new_perm_gr)
 
        # check perms
 
        u3_auth = AuthUser(user_id=self.u3.user_id)
 
        assert u3_auth.permissions['repositories'][base.HG_REPO] == new_perm_gr
 
        assert u3_auth.repository_permissions[base.HG_REPO] == new_perm_gr
 

	
 
    def test_propagated_permission_from_users_group_lower_weight(self):
 
        # make group
 
        self.ug1 = fixture.create_user_group('G1')
 
        # add user to group
 
        UserGroupModel().add_user_to_group(self.ug1, self.u1)
 
@@ -149,93 +149,93 @@ class TestPermissions(base.TestControlle
 
        # set permission to lower
 
        new_perm_h = 'repository.write'
 
        RepoModel().grant_user_permission(repo=base.HG_REPO, user=self.u1,
 
                                          perm=new_perm_h)
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        assert u1_auth.permissions['repositories'][base.HG_REPO] == new_perm_h
 
        assert u1_auth.repository_permissions[base.HG_REPO] == new_perm_h
 

	
 
        # grant perm for group this should NOT override permission from user
 
        # since it's lower than granted
 
        new_perm_l = 'repository.read'
 
        RepoModel().grant_user_group_permission(repo=base.HG_REPO,
 
                                                 group_name=self.ug1,
 
                                                 perm=new_perm_l)
 
        # check perms
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        assert u1_auth.permissions['repositories'][base.HG_REPO] == new_perm_h
 
        assert u1_auth.repository_permissions[base.HG_REPO] == new_perm_h
 

	
 
    def test_repo_in_group_permissions(self):
 
        self.g1 = fixture.create_repo_group('group1', skip_if_exists=True)
 
        self.g2 = fixture.create_repo_group('group2', skip_if_exists=True)
 
        # both perms should be read !
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        assert u1_auth.permissions['repositories_groups'].get('group1') == 'group.read'
 
        assert u1_auth.permissions['repositories_groups'].get('group2') == 'group.read'
 
        assert u1_auth.repository_group_permissions.get('group1') == 'group.read'
 
        assert u1_auth.repository_group_permissions.get('group2') == 'group.read'
 

	
 
        a1_auth = AuthUser(user_id=self.anon.user_id)
 
        assert a1_auth.permissions['repositories_groups'].get('group1') == 'group.read'
 
        assert a1_auth.permissions['repositories_groups'].get('group2') == 'group.read'
 
        assert a1_auth.repository_group_permissions.get('group1') == 'group.read'
 
        assert a1_auth.repository_group_permissions.get('group2') == 'group.read'
 

	
 
        # Change perms to none for both groups
 
        RepoGroupModel().grant_user_permission(repo_group=self.g1,
 
                                               user=self.anon,
 
                                               perm='group.none')
 
        RepoGroupModel().grant_user_permission(repo_group=self.g2,
 
                                               user=self.anon,
 
                                               perm='group.none')
 

	
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        assert u1_auth.permissions['repositories_groups'].get('group1') == 'group.none'
 
        assert u1_auth.permissions['repositories_groups'].get('group2') == 'group.none'
 
        assert u1_auth.repository_group_permissions.get('group1') == 'group.none'
 
        assert u1_auth.repository_group_permissions.get('group2') == 'group.none'
 

	
 
        a1_auth = AuthUser(user_id=self.anon.user_id)
 
        assert a1_auth.permissions['repositories_groups'].get('group1') == 'group.none'
 
        assert a1_auth.permissions['repositories_groups'].get('group2') == 'group.none'
 
        assert a1_auth.repository_group_permissions.get('group1') == 'group.none'
 
        assert a1_auth.repository_group_permissions.get('group2') == 'group.none'
 

	
 
        # add repo to group
 
        name = db.URL_SEP.join([self.g1.group_name, 'test_perm'])
 
        self.test_repo = fixture.create_repo(name=name,
 
                                             repo_type='hg',
 
                                             repo_group=self.g1,
 
                                             cur_user=self.u1,)
 

	
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        assert u1_auth.permissions['repositories_groups'].get('group1') == 'group.none'
 
        assert u1_auth.permissions['repositories_groups'].get('group2') == 'group.none'
 
        assert u1_auth.repository_group_permissions.get('group1') == 'group.none'
 
        assert u1_auth.repository_group_permissions.get('group2') == 'group.none'
 

	
 
        a1_auth = AuthUser(user_id=self.anon.user_id)
 
        assert a1_auth.permissions['repositories_groups'].get('group1') == 'group.none'
 
        assert a1_auth.permissions['repositories_groups'].get('group2') == 'group.none'
 
        assert a1_auth.repository_group_permissions.get('group1') == 'group.none'
 
        assert a1_auth.repository_group_permissions.get('group2') == 'group.none'
 

	
 
        # grant permission for u2 !
 
        RepoGroupModel().grant_user_permission(repo_group=self.g1, user=self.u2,
 
                                               perm='group.read')
 
        RepoGroupModel().grant_user_permission(repo_group=self.g2, user=self.u2,
 
                                               perm='group.read')
 
        Session().commit()
 
        assert self.u1 != self.u2
 
        # u1 and anon should have not change perms while u2 should !
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        assert u1_auth.permissions['repositories_groups'].get('group1') == 'group.none'
 
        assert u1_auth.permissions['repositories_groups'].get('group2') == 'group.none'
 
        assert u1_auth.repository_group_permissions.get('group1') == 'group.none'
 
        assert u1_auth.repository_group_permissions.get('group2') == 'group.none'
 

	
 
        u2_auth = AuthUser(user_id=self.u2.user_id)
 
        assert u2_auth.permissions['repositories_groups'].get('group1') == 'group.read'
 
        assert u2_auth.permissions['repositories_groups'].get('group2') == 'group.read'
 
        assert u2_auth.repository_group_permissions.get('group1') == 'group.read'
 
        assert u2_auth.repository_group_permissions.get('group2') == 'group.read'
 

	
 
        a1_auth = AuthUser(user_id=self.anon.user_id)
 
        assert a1_auth.permissions['repositories_groups'].get('group1') == 'group.none'
 
        assert a1_auth.permissions['repositories_groups'].get('group2') == 'group.none'
 
        assert a1_auth.repository_group_permissions.get('group1') == 'group.none'
 
        assert a1_auth.repository_group_permissions.get('group2') == 'group.none'
 

	
 
    def test_repo_group_user_as_user_group_member(self):
 
        # create Group1
 
        self.g1 = fixture.create_repo_group('group1', skip_if_exists=True)
 
        a1_auth = AuthUser(user_id=self.anon.user_id)
 

	
 
        assert a1_auth.permissions['repositories_groups'].get('group1') == 'group.read'
 
        assert a1_auth.repository_group_permissions.get('group1') == 'group.read'
 

	
 
        # set default permission to none
 
        RepoGroupModel().grant_user_permission(repo_group=self.g1,
 
                                               user=self.anon,
 
                                               perm='group.none')
 
        # make group
 
@@ -248,16 +248,16 @@ class TestPermissions(base.TestControlle
 
        members = [x.user_id for x in UserGroupModel().get(self.ug1.users_group_id).members]
 
        assert members == [self.u1.user_id]
 
        # add some user to that group
 

	
 
        # check his permissions
 
        a1_auth = AuthUser(user_id=self.anon.user_id)
 
        assert a1_auth.permissions['repositories_groups'].get('group1') == 'group.none'
 
        assert a1_auth.repository_group_permissions.get('group1') == 'group.none'
 

	
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        assert u1_auth.permissions['repositories_groups'].get('group1') == 'group.none'
 
        assert u1_auth.repository_group_permissions.get('group1') == 'group.none'
 

	
 
        # grant ug1 read permissions for
 
        RepoGroupModel().grant_user_group_permission(repo_group=self.g1,
 
                                                      group_name=self.ug1,
 
                                                      perm='group.read')
 
        Session().commit()
 
@@ -267,29 +267,29 @@ class TestPermissions(base.TestControlle
 
            .filter(UserGroupRepoGroupToPerm.users_group == self.ug1) \
 
            .scalar()
 
        assert obj.permission.permission_name == 'group.read'
 

	
 
        a1_auth = AuthUser(user_id=self.anon.user_id)
 

	
 
        assert a1_auth.permissions['repositories_groups'].get('group1') == 'group.none'
 
        assert a1_auth.repository_group_permissions.get('group1') == 'group.none'
 

	
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        assert u1_auth.permissions['repositories_groups'].get('group1') == 'group.read'
 
        assert u1_auth.repository_group_permissions.get('group1') == 'group.read'
 

	
 
    def test_inherit_nice_permissions_from_default_user(self):
 
        user_model = UserModel()
 
        # enable fork and create on default user
 
        usr = 'default'
 
        user_model.revoke_perm(usr, 'hg.create.none')
 
        user_model.grant_perm(usr, 'hg.create.repository')
 
        user_model.revoke_perm(usr, 'hg.fork.none')
 
        user_model.grant_perm(usr, 'hg.fork.repository')
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        # this user will have inherited permissions from default user
 
        assert u1_auth.permissions['global'] == set(['hg.create.repository', 'hg.fork.repository',
 
        assert u1_auth.global_permissions == set(['hg.create.repository', 'hg.fork.repository',
 
                              'hg.register.manual_activate',
 
                              'hg.extern_activate.auto',
 
                              'repository.read', 'group.read',
 
                              'usergroup.read'])
 

	
 
    def test_inherit_sad_permissions_from_default_user(self):
 
@@ -300,13 +300,13 @@ class TestPermissions(base.TestControlle
 
        user_model.grant_perm(usr, 'hg.create.none')
 
        user_model.revoke_perm(usr, 'hg.fork.repository')
 
        user_model.grant_perm(usr, 'hg.fork.none')
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        # this user will have inherited permissions from default user
 
        assert u1_auth.permissions['global'] == set(['hg.create.none', 'hg.fork.none',
 
        assert u1_auth.global_permissions == set(['hg.create.none', 'hg.fork.none',
 
                              'hg.register.manual_activate',
 
                              'hg.extern_activate.auto',
 
                              'repository.read', 'group.read',
 
                              'usergroup.read'])
 

	
 
    def test_inherit_more_permissions_from_default_user(self):
 
@@ -324,13 +324,13 @@ class TestPermissions(base.TestControlle
 
        user_model.revoke_perm(self.u1, 'hg.fork.repository')
 
        user_model.grant_perm(self.u1, 'hg.fork.none')
 

	
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        # this user will have inherited more permissions from default user
 
        assert u1_auth.permissions['global'] == set([
 
        assert u1_auth.global_permissions == set([
 
                              'hg.create.repository',
 
                              'hg.fork.repository',
 
                              'hg.register.manual_activate',
 
                              'hg.extern_activate.auto',
 
                              'repository.read', 'group.read',
 
                              'usergroup.read'])
 
@@ -350,13 +350,13 @@ class TestPermissions(base.TestControlle
 
        user_model.revoke_perm(self.u1, 'hg.fork.none')
 
        user_model.grant_perm(self.u1, 'hg.fork.repository')
 

	
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        # this user will have inherited less permissions from default user
 
        assert u1_auth.permissions['global'] == set([
 
        assert u1_auth.global_permissions == set([
 
                              'hg.create.repository',
 
                              'hg.fork.repository',
 
                              'hg.register.manual_activate',
 
                              'hg.extern_activate.auto',
 
                              'repository.read', 'group.read',
 
                              'usergroup.read'])
 
@@ -383,13 +383,13 @@ class TestPermissions(base.TestControlle
 
        user_model.revoke_perm(usr, 'hg.fork.repository')
 
        user_model.grant_perm(usr, 'hg.fork.none')
 

	
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 

	
 
        assert u1_auth.permissions['global'] == set(['hg.create.none', 'hg.fork.none',
 
        assert u1_auth.global_permissions == set(['hg.create.none', 'hg.fork.none',
 
                              'hg.register.manual_activate',
 
                              'hg.extern_activate.auto',
 
                              'repository.read', 'group.read',
 
                              'usergroup.read',
 
                              ])
 

	
 
@@ -415,13 +415,13 @@ class TestPermissions(base.TestControlle
 
        user_model.revoke_perm(usr, 'hg.fork.none')
 
        user_model.grant_perm(usr, 'hg.fork.repository')
 

	
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 

	
 
        assert u1_auth.permissions['global'] == set(['hg.create.repository', 'hg.fork.repository',
 
        assert u1_auth.global_permissions == set(['hg.create.repository', 'hg.fork.repository',
 
                              'hg.register.manual_activate',
 
                              'hg.extern_activate.auto',
 
                              'repository.read', 'group.read',
 
                              'usergroup.read',
 
                              ])
 

	
 
@@ -444,13 +444,13 @@ class TestPermissions(base.TestControlle
 
        # enable only write access for default user on repo
 
        RepoModel().grant_user_permission(self.test_repo,
 
                                          user='default',
 
                                          perm='repository.write')
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        assert u1_auth.permissions['repositories']['myownrepo'] == 'repository.write'
 
        assert u1_auth.repository_permissions['myownrepo'] == 'repository.write'
 

	
 
    def test_inactive_user_group_does_not_affect_repo_permissions_inverse(self):
 
        self.ug1 = fixture.create_user_group('G1')
 
        user_group_model = UserGroupModel()
 
        user_group_model.add_user_to_group(self.ug1, self.u1)
 
        user_group_model.update(self.ug1, {'users_group_active': False})
 
@@ -468,13 +468,13 @@ class TestPermissions(base.TestControlle
 
        # enable admin access for default user on repo
 
        RepoModel().grant_user_permission(self.test_repo,
 
                                          user='default',
 
                                          perm='repository.admin')
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        assert u1_auth.permissions['repositories']['myownrepo'] == 'repository.admin'
 
        assert u1_auth.repository_permissions['myownrepo'] == 'repository.admin'
 

	
 
    def test_inactive_user_group_does_not_affect_repo_group_permissions(self):
 
        self.ug1 = fixture.create_user_group('G1')
 
        user_group_model = UserGroupModel()
 
        user_group_model.add_user_to_group(self.ug1, self.u1)
 
        user_group_model.update(self.ug1, {'users_group_active': False})
 
@@ -488,13 +488,13 @@ class TestPermissions(base.TestControlle
 
        # enable only write access for default user on repo group
 
        RepoGroupModel().grant_user_permission(self.g1,
 
                                               user='default',
 
                                               perm='group.write')
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        assert u1_auth.permissions['repositories_groups'].get('group1') == 'group.write'
 
        assert u1_auth.repository_group_permissions.get('group1') == 'group.write'
 

	
 
    def test_inactive_user_group_does_not_affect_repo_group_permissions_inverse(self):
 
        self.ug1 = fixture.create_user_group('G1')
 
        user_group_model = UserGroupModel()
 
        user_group_model.add_user_to_group(self.ug1, self.u1)
 
        user_group_model.update(self.ug1, {'users_group_active': False})
 
@@ -508,13 +508,13 @@ class TestPermissions(base.TestControlle
 
        # enable admin access for default user on repo group
 
        RepoGroupModel().grant_user_permission(self.g1,
 
                                               user='default',
 
                                               perm='group.admin')
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        assert u1_auth.permissions['repositories_groups'].get('group1') == 'group.admin'
 
        assert u1_auth.repository_group_permissions.get('group1') == 'group.admin'
 

	
 
    def test_inactive_user_group_does_not_affect_user_group_permissions(self):
 
        self.ug1 = fixture.create_user_group('G1')
 
        user_group_model = UserGroupModel()
 
        user_group_model.add_user_to_group(self.ug1, self.u1)
 
        user_group_model.update(self.ug1, {'users_group_active': False})
 
@@ -528,14 +528,14 @@ class TestPermissions(base.TestControlle
 
        # enable only write access for default user on user group
 
        UserGroupModel().grant_user_permission(self.ug2,
 
                                               user='default',
 
                                               perm='usergroup.write')
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        assert u1_auth.permissions['user_groups']['G1'] == 'usergroup.read'
 
        assert u1_auth.permissions['user_groups']['G2'] == 'usergroup.write'
 
        assert u1_auth.user_group_permissions['G1'] == 'usergroup.read'
 
        assert u1_auth.user_group_permissions['G2'] == 'usergroup.write'
 

	
 
    def test_inactive_user_group_does_not_affect_user_group_permissions_inverse(self):
 
        self.ug1 = fixture.create_user_group('G1')
 
        user_group_model = UserGroupModel()
 
        user_group_model.add_user_to_group(self.ug1, self.u1)
 
        user_group_model.update(self.ug1, {'users_group_active': False})
 
@@ -549,50 +549,50 @@ class TestPermissions(base.TestControlle
 
        # enable admin access for default user on user group
 
        UserGroupModel().grant_user_permission(self.ug2,
 
                                               user='default',
 
                                               perm='usergroup.admin')
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        assert u1_auth.permissions['user_groups']['G1'] == 'usergroup.read'
 
        assert u1_auth.permissions['user_groups']['G2'] == 'usergroup.admin'
 
        assert u1_auth.user_group_permissions['G1'] == 'usergroup.read'
 
        assert u1_auth.user_group_permissions['G2'] == 'usergroup.admin'
 

	
 
    def test_owner_permissions_doesnot_get_overwritten_by_group(self):
 
        # create repo as USER,
 
        self.test_repo = fixture.create_repo(name='myownrepo',
 
                                             repo_type='hg',
 
                                             cur_user=self.u1)
 

	
 
        # he has permissions of admin as owner
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        assert u1_auth.permissions['repositories']['myownrepo'] == 'repository.admin'
 
        assert u1_auth.repository_permissions['myownrepo'] == 'repository.admin'
 
        # set his permission as user group, he should still be admin
 
        self.ug1 = fixture.create_user_group('G1')
 
        UserGroupModel().add_user_to_group(self.ug1, self.u1)
 
        RepoModel().grant_user_group_permission(self.test_repo,
 
                                                 group_name=self.ug1,
 
                                                 perm='repository.none')
 

	
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        assert u1_auth.permissions['repositories']['myownrepo'] == 'repository.admin'
 
        assert u1_auth.repository_permissions['myownrepo'] == 'repository.admin'
 

	
 
    def test_owner_permissions_doesnot_get_overwritten_by_others(self):
 
        # create repo as USER,
 
        self.test_repo = fixture.create_repo(name='myownrepo',
 
                                             repo_type='hg',
 
                                             cur_user=self.u1)
 

	
 
        # he has permissions of admin as owner
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        assert u1_auth.permissions['repositories']['myownrepo'] == 'repository.admin'
 
        assert u1_auth.repository_permissions['myownrepo'] == 'repository.admin'
 
        # set his permission as user, he should still be admin
 
        RepoModel().grant_user_permission(self.test_repo, user=self.u1,
 
                                          perm='repository.none')
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        assert u1_auth.permissions['repositories']['myownrepo'] == 'repository.admin'
 
        assert u1_auth.repository_permissions['myownrepo'] == 'repository.admin'
 

	
 
    def _test_def_perm_equal(self, user, change_factor=0):
 
        perms = UserToPerm.query() \
 
                .filter(UserToPerm.user == user) \
 
                .all()
 
        assert len(perms) == len(Permission.DEFAULT_USER_PERMISSIONS,)+change_factor, perms
0 comments (0 inline, 0 general)