Changeset - 5c7b4229503e
[Not reviewed]
stable
0 1 0
Mads Kiilerich (mads) - 3 years ago 2022-12-22 01:02:36
mads@kiilerich.com
repo group: fix logging of rename/move

After renaming a group, it would iterate over all the contained groups and
repos and update their full path while logging the update from the/old/path to
the the/new/path. Doing that, it would also visit the already renamed top level
group, but since the full path of that one already had been updated, it would
log it as renaming from the/new/path to the/new/path.

Fixed by logging when renaming in the first place, and skipping the top level
repo group while iterating.

To avoid redundant logging, only log (and rename) if the name or parent
actually change.

Based on a patch by toras9000.
1 file changed with 10 insertions and 3 deletions:
0 comments (0 inline, 0 general)
kallithea/model/repo_group.py
Show inline comments
 
@@ -97,399 +97,406 @@ class RepoGroupModel(object):
 
            return
 

	
 
        log.debug('renaming repository group from %s to %s', old, new)
 

	
 
        old_path = os.path.join(self.repos_path, old)
 
        new_path = os.path.join(self.repos_path, new)
 

	
 
        log.debug('renaming repos paths from %s to %s', old_path, new_path)
 

	
 
        if os.path.isdir(new_path):
 
            raise Exception('Was trying to rename to already '
 
                            'existing dir %s' % new_path)
 
        shutil.move(old_path, new_path)
 

	
 
    def _delete_group(self, group, force_delete=False):
 
        """
 
        Deletes a group from a filesystem
 

	
 
        :param group: instance of group from database
 
        :param force_delete: use shutil rmtree to remove all objects
 
        """
 
        paths = group.full_path.split(kallithea.URL_SEP)
 
        paths = os.sep.join(paths)
 

	
 
        rm_path = os.path.join(self.repos_path, paths)
 
        log.info("Removing group %s", rm_path)
 
        # delete only if that path really exists
 
        if os.path.isdir(rm_path):
 
            if force_delete:
 
                shutil.rmtree(rm_path)
 
            else:
 
                # archive that group
 
                _now = datetime.datetime.now()
 
                _ms = str(_now.microsecond).rjust(6, '0')
 
                _d = 'rm__%s_GROUP_%s' % (_now.strftime('%Y%m%d_%H%M%S_' + _ms),
 
                                          group.name)
 
                shutil.move(rm_path, os.path.join(self.repos_path, _d))
 

	
 
    def create(self, group_name, group_description, owner, parent=None,
 
               just_db=False, copy_permissions=False):
 
        try:
 
            if kallithea.lib.utils2.repo_name_slug(group_name) != group_name:
 
                raise Exception('invalid repo group name %s' % group_name)
 

	
 
            owner = db.User.guess_instance(owner)
 
            parent_group = db.RepoGroup.guess_instance(parent)
 
            new_repo_group = db.RepoGroup()
 
            new_repo_group.owner = owner
 
            new_repo_group.group_description = group_description or group_name
 
            new_repo_group.parent_group = parent_group
 
            new_repo_group.group_name = new_repo_group.get_new_name(group_name)
 

	
 
            meta.Session().add(new_repo_group)
 

	
 
            # create an ADMIN permission for owner except if we're super admin,
 
            # later owner should go into the owner field of groups
 
            if not owner.is_admin:
 
                self.grant_user_permission(repo_group=new_repo_group,
 
                                           user=owner, perm='group.admin')
 

	
 
            if parent_group and copy_permissions:
 
                # copy permissions from parent
 
                user_perms = db.UserRepoGroupToPerm.query() \
 
                    .filter(db.UserRepoGroupToPerm.group == parent_group).all()
 

	
 
                group_perms = db.UserGroupRepoGroupToPerm.query() \
 
                    .filter(db.UserGroupRepoGroupToPerm.group == parent_group).all()
 

	
 
                for perm in user_perms:
 
                    # don't copy over the permission for user who is creating
 
                    # this group, if he is not super admin he get's admin
 
                    # permission set above
 
                    if perm.user != owner or owner.is_admin:
 
                        db.UserRepoGroupToPerm.create(perm.user, new_repo_group, perm.permission)
 

	
 
                for perm in group_perms:
 
                    db.UserGroupRepoGroupToPerm.create(perm.users_group, new_repo_group, perm.permission)
 
            else:
 
                self._create_default_perms(new_repo_group)
 

	
 
            if not just_db:
 
                # we need to flush here, in order to check if database won't
 
                # throw any exceptions, create filesystem dirs at the very end
 
                meta.Session().flush()
 
                self._create_group(new_repo_group.group_name)
 

	
 
            return new_repo_group
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise
 

	
 
    def _update_permissions(self, repo_group, perms_new=None,
 
                            perms_updates=None, recursive=None,
 
                            check_perms=True):
 
        from kallithea.lib.auth import HasUserGroupPermissionLevel
 

	
 
        if not perms_new:
 
            perms_new = []
 
        if not perms_updates:
 
            perms_updates = []
 

	
 
        def _set_perm_user(obj, user, perm):
 
            if isinstance(obj, db.RepoGroup):
 
                self.grant_user_permission(repo_group=obj, user=user, perm=perm)
 
            elif isinstance(obj, db.Repository):
 
                user = db.User.guess_instance(user)
 

	
 
                # private repos will not allow to change the default permissions
 
                # using recursive mode
 
                if obj.private and user.is_default_user:
 
                    return
 

	
 
                # we set group permission but we have to switch to repo
 
                # permission
 
                perm = perm.replace('group.', 'repository.')
 
                repo.RepoModel().grant_user_permission(
 
                    repo=obj, user=user, perm=perm
 
                )
 

	
 
        def _set_perm_group(obj, users_group, perm):
 
            if isinstance(obj, db.RepoGroup):
 
                self.grant_user_group_permission(repo_group=obj,
 
                                                  group_name=users_group,
 
                                                  perm=perm)
 
            elif isinstance(obj, db.Repository):
 
                # we set group permission but we have to switch to repo
 
                # permission
 
                perm = perm.replace('group.', 'repository.')
 
                repo.RepoModel().grant_user_group_permission(
 
                    repo=obj, group_name=users_group, perm=perm
 
                )
 

	
 
        # start updates
 
        updates = []
 
        log.debug('Now updating permissions for %s in recursive mode:%s',
 
                  repo_group, recursive)
 

	
 
        for obj in repo_group.recursive_groups_and_repos():
 
            # iterated obj is an instance of a repos group or repository in
 
            # that group, recursive option can be: none, repos, groups, all
 
            if recursive == 'all':
 
                pass
 
            elif recursive == 'repos':
 
                # skip groups, other than this one
 
                if isinstance(obj, db.RepoGroup) and not obj == repo_group:
 
                    continue
 
            elif recursive == 'groups':
 
                # skip repos
 
                if isinstance(obj, db.Repository):
 
                    continue
 
            else:  # recursive == 'none': # DEFAULT don't apply to iterated objects
 
                obj = repo_group
 
                # also we do a break at the end of this loop.
 

	
 
            # update permissions
 
            for member, perm, member_type in perms_updates:
 
                ## set for user
 
                if member_type == 'user':
 
                    # this updates also current one if found
 
                    _set_perm_user(obj, user=member, perm=perm)
 
                ## set for user group
 
                else:
 
                    # check if we have permissions to alter this usergroup's access
 
                    if not check_perms or HasUserGroupPermissionLevel('read')(member):
 
                        _set_perm_group(obj, users_group=member, perm=perm)
 
            # set new permissions
 
            for member, perm, member_type in perms_new:
 
                if member_type == 'user':
 
                    _set_perm_user(obj, user=member, perm=perm)
 
                else:
 
                    # check if we have permissions to alter this usergroup's access
 
                    if not check_perms or HasUserGroupPermissionLevel('read')(member):
 
                        _set_perm_group(obj, users_group=member, perm=perm)
 
            updates.append(obj)
 
            # if it's not recursive call for all,repos,groups
 
            # break the loop and don't proceed with other changes
 
            if recursive not in ['all', 'repos', 'groups']:
 
                break
 

	
 
        return updates
 

	
 
    def update(self, repo_group, repo_group_args):
 
        try:
 
            repo_group = db.RepoGroup.guess_instance(repo_group)
 
            old_path = repo_group.full_path  # aka .group_name
 

	
 
            if 'owner' in repo_group_args:
 
                repo_group.owner = db.User.get_by_username(repo_group_args['owner'])
 
            if 'group_description' in repo_group_args:
 
                repo_group.group_description = repo_group_args['group_description']
 
            if 'parent_group_id' in repo_group_args:
 
                assert repo_group_args['parent_group_id'] != '-1', repo_group_args  # RepoGroupForm should have converted to None
 
                repo_group.parent_group = db.RepoGroup.get(repo_group_args['parent_group_id'])
 
                repo_group.group_name = repo_group.get_new_name(repo_group.name)
 
                new_parent_group = db.RepoGroup.get(repo_group_args['parent_group_id'])
 
                if new_parent_group is not repo_group.parent_group:
 
                    repo_group.parent_group = new_parent_group
 
                    repo_group.group_name = repo_group.get_new_name(repo_group.name)
 
                    log.debug('Moving repo group %s to %s', old_path, repo_group.group_name)
 
            if 'group_name' in repo_group_args:
 
                group_name = repo_group_args['group_name']
 
                if kallithea.lib.utils2.repo_name_slug(group_name) != group_name:
 
                    raise Exception('invalid repo group name %s' % group_name)
 
                repo_group.group_name = repo_group.get_new_name(group_name)
 
                if repo_group.name != group_name:
 
                    repo_group.group_name = repo_group.get_new_name(group_name)
 
                    log.debug('Renaming repo group %s to %s', old_path, repo_group.group_name)
 
            new_path = repo_group.full_path
 
            meta.Session().add(repo_group)
 

	
 
            # Iterate over all members of this repo group and update the full
 
            # path (repo_name and group_name) based on the (already updated)
 
            # full path of the parent.
 
            # This can potentially be a heavy operation.
 
            for obj in repo_group.recursive_groups_and_repos():
 
                if obj is repo_group:
 
                    continue  # already updated and logged
 
                if isinstance(obj, db.RepoGroup):
 
                    new_name = obj.get_new_name(obj.name)
 
                    log.debug('Fixing repo group %s to new name %s', obj.group_name, new_name)
 
                    obj.group_name = new_name
 
                elif isinstance(obj, db.Repository):
 
                    new_name = obj.get_new_name(obj.just_name)
 
                    log.debug('Fixing repo %s to new name %s', obj.repo_name, new_name)
 
                    obj.repo_name = new_name
 

	
 
            # Rename in file system
 
            self._rename_group(old_path, new_path)
 

	
 
            return repo_group
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise
 

	
 
    def delete(self, repo_group, force_delete=False):
 
        repo_group = db.RepoGroup.guess_instance(repo_group)
 
        try:
 
            meta.Session().delete(repo_group)
 
            self._delete_group(repo_group, force_delete)
 
        except Exception:
 
            log.error('Error removing repo_group %s', repo_group)
 
            raise
 

	
 
    def add_permission(self, repo_group, obj, obj_type, perm, recursive):
 
        repo_group = db.RepoGroup.guess_instance(repo_group)
 
        perm = db.Permission.guess_instance(perm)
 

	
 
        for el in repo_group.recursive_groups_and_repos():
 
            # iterated obj is an instance of a repos group or repository in
 
            # that group, recursive option can be: none, repos, groups, all
 
            if recursive == 'all':
 
                pass
 
            elif recursive == 'repos':
 
                # skip groups, other than this one
 
                if isinstance(el, db.RepoGroup) and not el == repo_group:
 
                    continue
 
            elif recursive == 'groups':
 
                # skip repos
 
                if isinstance(el, db.Repository):
 
                    continue
 
            else:  # recursive == 'none': # DEFAULT don't apply to iterated objects
 
                el = repo_group
 
                # also we do a break at the end of this loop.
 

	
 
            if isinstance(el, db.RepoGroup):
 
                if obj_type == 'user':
 
                    RepoGroupModel().grant_user_permission(el, user=obj, perm=perm)
 
                elif obj_type == 'user_group':
 
                    RepoGroupModel().grant_user_group_permission(el, group_name=obj, perm=perm)
 
                else:
 
                    raise Exception('undefined object type %s' % obj_type)
 
            elif isinstance(el, db.Repository):
 
                # for repos we need to hotfix the name of permission
 
                _perm = perm.permission_name.replace('group.', 'repository.')
 
                if obj_type == 'user':
 
                    repo.RepoModel().grant_user_permission(el, user=obj, perm=_perm)
 
                elif obj_type == 'user_group':
 
                    repo.RepoModel().grant_user_group_permission(el, group_name=obj, perm=_perm)
 
                else:
 
                    raise Exception('undefined object type %s' % obj_type)
 
            else:
 
                raise Exception('el should be instance of Repository or '
 
                                'RepositoryGroup got %s instead' % type(el))
 

	
 
            # if it's not recursive call for all,repos,groups
 
            # break the loop and don't proceed with other changes
 
            if recursive not in ['all', 'repos', 'groups']:
 
                break
 

	
 
    def delete_permission(self, repo_group, obj, obj_type, recursive):
 
        """
 
        Revokes permission for repo_group for given obj(user or users_group),
 
        obj_type can be user or user group
 

	
 
        :param repo_group:
 
        :param obj: user or user group id
 
        :param obj_type: user or user group type
 
        :param recursive: recurse to all children of group
 
        """
 
        repo_group = db.RepoGroup.guess_instance(repo_group)
 

	
 
        for el in repo_group.recursive_groups_and_repos():
 
            # iterated obj is an instance of a repos group or repository in
 
            # that group, recursive option can be: none, repos, groups, all
 
            if recursive == 'all':
 
                pass
 
            elif recursive == 'repos':
 
                # skip groups, other than this one
 
                if isinstance(el, db.RepoGroup) and not el == repo_group:
 
                    continue
 
            elif recursive == 'groups':
 
                # skip repos
 
                if isinstance(el, db.Repository):
 
                    continue
 
            else:  # recursive == 'none': # DEFAULT don't apply to iterated objects
 
                el = repo_group
 
                # also we do a break at the end of this loop.
 

	
 
            if isinstance(el, db.RepoGroup):
 
                if obj_type == 'user':
 
                    RepoGroupModel().revoke_user_permission(el, user=obj)
 
                elif obj_type == 'user_group':
 
                    RepoGroupModel().revoke_user_group_permission(el, group_name=obj)
 
                else:
 
                    raise Exception('undefined object type %s' % obj_type)
 
            elif isinstance(el, db.Repository):
 
                if obj_type == 'user':
 
                    repo.RepoModel().revoke_user_permission(el, user=obj)
 
                elif obj_type == 'user_group':
 
                    repo.RepoModel().revoke_user_group_permission(el, group_name=obj)
 
                else:
 
                    raise Exception('undefined object type %s' % obj_type)
 
            else:
 
                raise Exception('el should be instance of Repository or '
 
                                'RepositoryGroup got %s instead' % type(el))
 

	
 
            # if it's not recursive call for all,repos,groups
 
            # break the loop and don't proceed with other changes
 
            if recursive not in ['all', 'repos', 'groups']:
 
                break
 

	
 
    def grant_user_permission(self, repo_group, user, perm):
 
        """
 
        Grant permission for user on given repository group, or update
 
        existing one if found
 

	
 
        :param repo_group: Instance of RepoGroup, repositories_group_id,
 
            or repositories_group name
 
        :param user: Instance of User, user_id or username
 
        :param perm: Instance of Permission, or permission_name
 
        """
 

	
 
        repo_group = db.RepoGroup.guess_instance(repo_group)
 
        user = db.User.guess_instance(user)
 
        permission = db.Permission.guess_instance(perm)
 

	
 
        # check if we have that permission already
 
        obj = db.UserRepoGroupToPerm.query() \
 
            .filter(db.UserRepoGroupToPerm.user == user) \
 
            .filter(db.UserRepoGroupToPerm.group == repo_group) \
 
            .scalar()
 
        if obj is None:
 
            # create new !
 
            obj = db.UserRepoGroupToPerm()
 
            meta.Session().add(obj)
 
        obj.group = repo_group
 
        obj.user = user
 
        obj.permission = permission
 
        log.debug('Granted perm %s to %s on %s', perm, user, repo_group)
 
        return obj
 

	
 
    def revoke_user_permission(self, repo_group, user):
 
        """
 
        Revoke permission for user on given repository group
 

	
 
        :param repo_group: Instance of RepoGroup, repositories_group_id,
 
            or repositories_group name
 
        :param user: Instance of User, user_id or username
 
        """
 

	
 
        repo_group = db.RepoGroup.guess_instance(repo_group)
 
        user = db.User.guess_instance(user)
 

	
 
        obj = db.UserRepoGroupToPerm.query() \
 
            .filter(db.UserRepoGroupToPerm.user == user) \
 
            .filter(db.UserRepoGroupToPerm.group == repo_group) \
 
            .scalar()
 
        if obj is not None:
 
            meta.Session().delete(obj)
 
            log.debug('Revoked perm on %s on %s', repo_group, user)
 

	
 
    def grant_user_group_permission(self, repo_group, group_name, perm):
 
        """
 
        Grant permission for user group on given repository group, or update
 
        existing one if found
 

	
 
        :param repo_group: Instance of RepoGroup, repositories_group_id,
 
            or repositories_group name
 
        :param group_name: Instance of UserGroup, users_group_id,
 
            or user group name
 
        :param perm: Instance of Permission, or permission_name
 
        """
 
        repo_group = db.RepoGroup.guess_instance(repo_group)
 
        group_name = db.UserGroup.guess_instance(group_name)
 
        permission = db.Permission.guess_instance(perm)
 

	
 
        # check if we have that permission already
 
        obj = db.UserGroupRepoGroupToPerm.query() \
 
            .filter(db.UserGroupRepoGroupToPerm.group == repo_group) \
0 comments (0 inline, 0 general)