Changeset - ae3d9b4c043e
[Not reviewed]
default
0 1 0
Mads Kiilerich (mads) - 6 years ago 2020-08-23 14:26:21
mads@kiilerich.com
Grafted from: f86af7b1cdaa
auth: compute AuthUser.permissions lazily

Prepare for computing each kind of permissions separately so they can be
skipped in the common case.

This will undo a part of 1ecd6c0e2787 where we took the detour of assigning
permissions in __init__ - now we move it back to a LazyProperty, but in a
better way.
1 file changed with 7 insertions and 2 deletions:
0 comments (0 inline, 0 general)
kallithea/lib/auth.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.lib.auth
 
~~~~~~~~~~~~~~~~~~
 

	
 
authentication and permission libraries
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Apr 4, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 
import hashlib
 
import itertools
 
import logging
 
import os
 
import string
 

	
 
import bcrypt
 
import ipaddr
 
from decorator import decorator
 
from sqlalchemy.orm import joinedload
 
from sqlalchemy.orm.exc import ObjectDeletedError
 
from tg import request
 
from tg.i18n import ugettext as _
 
from webob.exc import HTTPForbidden, HTTPFound
 

	
 
import kallithea
 
from kallithea.config.routing import url
 
from kallithea.lib.utils import get_repo_group_slug, get_repo_slug, get_user_group_slug
 
from kallithea.lib.utils2 import ascii_bytes, ascii_str, safe_bytes
 
from kallithea.lib.vcs.utils.lazy import LazyProperty
 
from kallithea.model.db import (Permission, UserApiKeys, UserGroup, UserGroupMember, UserGroupRepoGroupToPerm, UserGroupRepoToPerm, UserGroupToPerm,
 
                                UserGroupUserGroupToPerm, UserIpMap, UserToPerm)
 
from kallithea.model.meta import Session
 
from kallithea.model.user import UserModel
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class PasswordGenerator(object):
 
    """
 
    This is a simple class for generating password from different sets of
 
    characters
 
    usage::
 

	
 
        passwd_gen = PasswordGenerator()
 
        #print 8-letter password containing only big and small letters
 
            of alphabet
 
        passwd_gen.gen_password(8, passwd_gen.ALPHABETS_BIG_SMALL)
 
    """
 
    ALPHABETS_NUM = r'''1234567890'''
 
    ALPHABETS_SMALL = r'''qwertyuiopasdfghjklzxcvbnm'''
 
    ALPHABETS_BIG = r'''QWERTYUIOPASDFGHJKLZXCVBNM'''
 
    ALPHABETS_SPECIAL = r'''`-=[]\;',./~!@#$%^&*()_+{}|:"<>?'''
 
    ALPHABETS_FULL = ALPHABETS_BIG + ALPHABETS_SMALL \
 
        + ALPHABETS_NUM + ALPHABETS_SPECIAL
 
    ALPHABETS_ALPHANUM = ALPHABETS_BIG + ALPHABETS_SMALL + ALPHABETS_NUM
 
    ALPHABETS_BIG_SMALL = ALPHABETS_BIG + ALPHABETS_SMALL
 
    ALPHABETS_ALPHANUM_BIG = ALPHABETS_BIG + ALPHABETS_NUM
 
    ALPHABETS_ALPHANUM_SMALL = ALPHABETS_SMALL + ALPHABETS_NUM
 

	
 
    def gen_password(self, length, alphabet=ALPHABETS_FULL):
 
        assert len(alphabet) <= 256, alphabet
 
        l = []
 
        while len(l) < length:
 
            i = ord(os.urandom(1))
 
            if i < len(alphabet):
 
                l.append(alphabet[i])
 
        return ''.join(l)
 

	
 

	
 
def get_crypt_password(password):
 
    """
 
    Cryptographic function used for bcrypt password hashing.
 

	
 
    :param password: password to hash
 
    """
 
    return ascii_str(bcrypt.hashpw(safe_bytes(password), bcrypt.gensalt(10)))
 

	
 

	
 
def check_password(password, hashed):
 
    """
 
    Checks password match the hashed value using bcrypt.
 
    Remains backwards compatible and accept plain sha256 hashes which used to
 
    be used on Windows.
 

	
 
    :param password: password
 
    :param hashed: password in hashed form
 
    """
 
    # sha256 hashes will always be 64 hex chars
 
    # bcrypt hashes will always contain $ (and be shorter)
 
    if len(hashed) == 64 and all(x in string.hexdigits for x in hashed):
 
        return hashlib.sha256(password).hexdigest() == hashed
 
    try:
 
        return bcrypt.checkpw(safe_bytes(password), ascii_bytes(hashed))
 
    except ValueError as e:
 
        # bcrypt will throw ValueError 'Invalid hashed_password salt' on all password errors
 
        log.error('error from bcrypt checking password: %s', e)
 
        return False
 
    log.error('check_password failed - no method found for hash length %s', len(hashed))
 
    return False
 

	
 

	
 
def get_user_permissions(user_id, user_is_admin):
 
    PERM_WEIGHTS = Permission.PERM_WEIGHTS
 
    repository_permissions = {}
 
    repository_group_permissions = {}
 
    user_group_permissions = {}
 
    global_permissions = set()
 

	
 

	
 
    def bump_permission(permissions, key, new_perm):
 
        """Add a new permission for key to permissions.
 
        Assuming the permissions are comparable, set the new permission if it
 
        has higher weight, else drop it and keep the old permission.
 
        """
 
        cur_perm = permissions[key]
 
        new_perm_val = PERM_WEIGHTS[new_perm]
 
        cur_perm_val = PERM_WEIGHTS[cur_perm]
 
        if new_perm_val > cur_perm_val:
 
            permissions[key] = new_perm
 

	
 
    #======================================================================
 
    # fetch default permissions
 
    #======================================================================
 
    default_repo_perms = Permission.get_default_perms(kallithea.DEFAULT_USER_ID)
 
@@ -337,198 +338,202 @@ def get_user_permissions(user_id, user_i
 
    for perm in user_user_groups_perms:
 
        bump_permission(user_group_permissions,
 
            perm.user_group.users_group_name,
 
            perm.permission.permission_name)
 

	
 
    return (repository_permissions, repository_group_permissions, user_group_permissions, global_permissions)
 

	
 

	
 
class AuthUser(object):
 
    """
 
    Represents a Kallithea user, including various authentication and
 
    authorization information. Typically used to store the current user,
 
    but is also used as a generic user information data structure in
 
    parts of the code, e.g. user management.
 

	
 
    Constructed from a database `User` object, a user ID or cookie dict,
 
    it looks up the user (if needed) and copies all attributes to itself,
 
    adding various non-persistent data. If lookup fails but anonymous
 
    access to Kallithea is enabled, the default user is loaded instead.
 

	
 
    `AuthUser` does not by itself authenticate users. It's up to other parts of
 
    the code to check e.g. if a supplied password is correct, and if so, trust
 
    the AuthUser object as an authenticated user.
 

	
 
    However, `AuthUser` does refuse to load a user that is not `active`.
 

	
 
    Note that Kallithea distinguishes between the default user (an actual
 
    user in the database with username "default") and "no user" (no actual
 
    User object, AuthUser filled with blank values and username "None").
 

	
 
    If the default user is active, that will always be used instead of
 
    "no user". On the other hand, if the default user is disabled (and
 
    there is no login information), we instead get "no user"; this should
 
    only happen on the login page (as all other requests are redirected).
 

	
 
    `is_default_user` specifically checks if the AuthUser is the user named
 
    "default". Use `is_anonymous` to check for both "default" and "no user".
 
    """
 

	
 
    @classmethod
 
    def make(cls, dbuser=None, is_external_auth=False, ip_addr=None):
 
        """Create an AuthUser to be authenticated ... or return None if user for some reason can't be authenticated.
 
        Checks that a non-None dbuser is provided, is active, and that the IP address is ok.
 
        """
 
        assert ip_addr is not None
 
        if dbuser is None:
 
            log.info('No db user for authentication')
 
            return None
 
        if not dbuser.active:
 
            log.info('Db user %s not active', dbuser.username)
 
            return None
 
        allowed_ips = AuthUser.get_allowed_ips(dbuser.user_id)
 
        if not check_ip_access(source_ip=ip_addr, allowed_ips=allowed_ips):
 
            log.info('Access for %s from %s forbidden - not in %s', dbuser.username, ip_addr, allowed_ips)
 
            return None
 
        return cls(dbuser=dbuser, is_external_auth=is_external_auth)
 

	
 
    def __init__(self, user_id=None, dbuser=None, is_external_auth=False):
 
        self.is_external_auth = is_external_auth # container auth - don't show logout option
 

	
 
        # These attributes will be overridden below if the requested user is
 
        # found or anonymous access (using the default user) is enabled.
 
        self.user_id = None
 
        self.username = None
 
        self.api_key = None
 
        self.name = ''
 
        self.lastname = ''
 
        self.email = ''
 
        self.admin = False
 

	
 
        # Look up database user, if necessary.
 
        if user_id is not None:
 
            assert dbuser is None
 
            log.debug('Auth User lookup by USER ID %s', user_id)
 
            dbuser = UserModel().get(user_id)
 
            assert dbuser is not None
 
        else:
 
            assert dbuser is not None
 
            log.debug('Auth User lookup by database user %s', dbuser)
 

	
 
        log.debug('filling %s data', dbuser)
 
        self.is_anonymous = dbuser.is_default_user
 
        if dbuser.is_default_user and not dbuser.active:
 
            self.username = 'None'
 
            self.is_default_user = False
 
        else:
 
            # copy non-confidential database fields from a `db.User` to this `AuthUser`.
 
            for k, v in dbuser.get_dict().items():
 
                assert k not in ['api_keys', 'permissions']
 
                setattr(self, k, v)
 
            self.is_default_user = dbuser.is_default_user
 
        log.debug('Auth User is now %s', self)
 

	
 
        log.debug('Getting PERMISSION tree for %s', self)
 
        (self.repository_permissions, self.repository_group_permissions, self.user_group_permissions, self.global_permissions,
 
        )= get_user_permissions(self.user_id, self.is_admin)
 
        self.permissions = {
 

	
 
    @LazyProperty
 
    def permissions(self):
 
        """dict with all 4 kind of permissions - mainly for backwards compatibility"""
 
        return {
 
            'global': self.global_permissions,
 
            'repositories': self.repository_permissions,
 
            'repositories_groups': self.repository_group_permissions,
 
            'user_groups': self.user_group_permissions,
 
        } # backwards compatibility
 
        }
 

	
 
    def has_repository_permission_level(self, repo_name, level, purpose=None):
 
        required_perms = {
 
            'read': ['repository.read', 'repository.write', 'repository.admin'],
 
            'write': ['repository.write', 'repository.admin'],
 
            'admin': ['repository.admin'],
 
        }[level]
 
        actual_perm = self.repository_permissions.get(repo_name)
 
        ok = actual_perm in required_perms
 
        log.debug('Checking if user %r can %r repo %r (%s): %s (has %r)',
 
            self.username, level, repo_name, purpose, ok, actual_perm)
 
        return ok
 

	
 
    def has_repository_group_permission_level(self, repo_group_name, level, purpose=None):
 
        required_perms = {
 
            'read': ['group.read', 'group.write', 'group.admin'],
 
            'write': ['group.write', 'group.admin'],
 
            'admin': ['group.admin'],
 
        }[level]
 
        actual_perm = self.repository_group_permissions.get(repo_group_name)
 
        ok = actual_perm in required_perms
 
        log.debug('Checking if user %r can %r repo group %r (%s): %s (has %r)',
 
            self.username, level, repo_group_name, purpose, ok, actual_perm)
 
        return ok
 

	
 
    def has_user_group_permission_level(self, user_group_name, level, purpose=None):
 
        required_perms = {
 
            'read': ['usergroup.read', 'usergroup.write', 'usergroup.admin'],
 
            'write': ['usergroup.write', 'usergroup.admin'],
 
            'admin': ['usergroup.admin'],
 
        }[level]
 
        actual_perm = self.user_group_permissions.get(user_group_name)
 
        ok = actual_perm in required_perms
 
        log.debug('Checking if user %r can %r user group %r (%s): %s (has %r)',
 
            self.username, level, user_group_name, purpose, ok, actual_perm)
 
        return ok
 

	
 
    @property
 
    def api_keys(self):
 
        return self._get_api_keys()
 

	
 
    def _get_api_keys(self):
 
        api_keys = [self.api_key]
 
        for api_key in UserApiKeys.query() \
 
                .filter_by(user_id=self.user_id, is_expired=False):
 
            api_keys.append(api_key.api_key)
 

	
 
        return api_keys
 

	
 
    @property
 
    def is_admin(self):
 
        return self.admin
 

	
 
    @property
 
    def repositories_admin(self):
 
        """
 
        Returns list of repositories you're an admin of
 
        """
 
        return [x[0] for x in self.repository_permissions.items()
 
                if x[1] == 'repository.admin']
 

	
 
    @property
 
    def repository_groups_admin(self):
 
        """
 
        Returns list of repository groups you're an admin of
 
        """
 
        return [x[0] for x in self.repository_group_permissions.items()
 
                if x[1] == 'group.admin']
 

	
 
    @property
 
    def user_groups_admin(self):
 
        """
 
        Returns list of user groups you're an admin of
 
        """
 
        return [x[0] for x in self.user_group_permissions.items()
 
                if x[1] == 'usergroup.admin']
 

	
 
    def __repr__(self):
 
        return "<%s %s: %r>" % (self.__class__.__name__, self.user_id, self.username)
 

	
 
    def to_cookie(self):
 
        """ Serializes this login session to a cookie `dict`. """
 
        return {
 
            'user_id': self.user_id,
 
            'is_external_auth': self.is_external_auth,
 
        }
 

	
 
    @staticmethod
 
    def from_cookie(cookie, ip_addr):
 
        """
 
        Deserializes an `AuthUser` from a cookie `dict` ... or return None.
 
        """
 
        return AuthUser.make(
 
            dbuser=UserModel().get(cookie.get('user_id')),
 
            is_external_auth=cookie.get('is_external_auth', False),
 
            ip_addr=ip_addr,
0 comments (0 inline, 0 general)