Changeset - 092e897458fa
[Not reviewed]
default
0 1 0
Mads Kiilerich (mads) - 5 years ago 2021-04-21 14:52:00
mads@kiilerich.com
hg: read hgrc, also in repo_scan

This might make a difference in some real world cases.
1 file changed with 1 insertions and 2 deletions:
0 comments (0 inline, 0 general)
kallithea/model/scm.py
Show inline comments
 
@@ -127,110 +127,109 @@ class UserGroupList(_PermCheckIterator):
 
        super(UserGroupList, self).__init__(obj_list=db_user_group_list,
 
                    obj_attr='users_group_name', perm_set=[perm_level],
 
                    perm_checker=HasUserGroupPermissionLevel,
 
                    extra_kwargs=extra_kwargs)
 

	
 

	
 
class ScmModel(object):
 
    """
 
    Generic Scm Model
 
    """
 

	
 
    def __get_repo(self, instance):
 
        cls = db.Repository
 
        if isinstance(instance, cls):
 
            return instance
 
        elif isinstance(instance, int):
 
            return cls.get(instance)
 
        elif isinstance(instance, str):
 
            if instance.isdigit():
 
                return cls.get(int(instance))
 
            return cls.get_by_repo_name(instance)
 
        raise Exception('given object must be int, basestr or Instance'
 
                        ' of %s got %s' % (type(cls), type(instance)))
 

	
 
    @LazyProperty
 
    def repos_path(self):
 
        """
 
        Gets the repositories root path from database
 
        """
 

	
 
        q = db.Ui.query().filter(db.Ui.ui_key == '/').one()
 

	
 
        return q.ui_value
 

	
 
    def repo_scan(self, repos_path=None):
 
        """
 
        Listing of repositories in given path. This path should not be a
 
        repository itself. Return a dictionary of repository objects mapping to
 
        vcs instances.
 

	
 
        :param repos_path: path to directory containing repositories
 
        """
 

	
 
        if repos_path is None:
 
            repos_path = self.repos_path
 

	
 
        log.info('scanning for repositories in %s', repos_path)
 

	
 
        baseui = make_ui()
 
        repos = {}
 

	
 
        for name, path in get_filesystem_repos(repos_path):
 
            # name need to be decomposed and put back together using the /
 
            # since this is internal storage separator for kallithea
 
            name = db.Repository.normalize_repo_name(name)
 

	
 
            try:
 
                if name in repos:
 
                    raise RepositoryError('Duplicate repository name %s '
 
                                          'found in %s' % (name, path))
 
                else:
 
                    repos[name] = get_repo(path[1], baseui=baseui)
 
                    repos[name] = get_repo(path[1], baseui=make_ui(path[1]))
 
            except (OSError, VCSError):
 
                continue
 
        log.debug('found %s paths with repositories', len(repos))
 
        return repos
 

	
 
    def get_repos(self, repos):
 
        """Return the repos the user has access to"""
 
        return RepoList(repos, perm_level='read')
 

	
 
    def get_repo_groups(self, groups=None):
 
        """Return the repo groups the user has access to
 
        If no groups are specified, use top level groups.
 
        """
 
        if groups is None:
 
            groups = db.RepoGroup.query() \
 
                .filter(db.RepoGroup.parent_group_id == None).all()
 
        return RepoGroupList(groups, perm_level='read')
 

	
 
    def mark_for_invalidation(self, repo_name):
 
        """
 
        Mark caches of this repo invalid in the database.
 

	
 
        :param repo_name: the repo for which caches should be marked invalid
 
        """
 
        log.debug("Marking %s as invalidated and update cache", repo_name)
 
        repo = db.Repository.get_by_repo_name(repo_name)
 
        if repo is not None:
 
            repo.set_invalidate()
 
            repo.update_changeset_cache()
 

	
 
    def toggle_following_repo(self, follow_repo_id, user_id):
 

	
 
        f = db.UserFollowing.query() \
 
            .filter(db.UserFollowing.follows_repository_id == follow_repo_id) \
 
            .filter(db.UserFollowing.user_id == user_id).scalar()
 

	
 
        if f is not None:
 
            try:
 
                meta.Session().delete(f)
 
                userlog.action_logger(UserTemp(user_id),
 
                              'stopped_following_repo',
 
                              RepoTemp(follow_repo_id))
 
                return
 
            except Exception:
 
                log.error(traceback.format_exc())
 
                raise
 

	
 
        try:
0 comments (0 inline, 0 general)