Changeset - b0ed41df0282
[Not reviewed]
default
0 1 0
Mads Kiilerich (mads) - 7 years ago 2019-06-07 03:37:07
mads@kiilerich.com
db: drop __mapper_args__ that cause SQLAlchemy deprecation warnings

We don't really need them. There were only few uses left, and we seem to be
using .order_by() in all places where we really care about the ordering.

Gets rid of

<string>:2
<string>:2: SADeprecationWarning: Mapper.order_by is deprecated.Use Query.order_by() in order to affect the ordering of ORM result sets.
1 file changed with 0 insertions and 4 deletions:
0 comments (0 inline, 0 general)
kallithea/model/db.py
Show inline comments
 
@@ -517,437 +517,434 @@ class User(Base, BaseDbModel):
 
        return self.username == User.DEFAULT_USER
 

	
 
    @hybrid_property
 
    def user_data(self):
 
        if not self._user_data:
 
            return {}
 

	
 
        try:
 
            return json.loads(self._user_data)
 
        except TypeError:
 
            return {}
 

	
 
    @user_data.setter
 
    def user_data(self, val):
 
        try:
 
            self._user_data = json.dumps(val)
 
        except Exception:
 
            log.error(traceback.format_exc())
 

	
 
    def __unicode__(self):
 
        return u"<%s('id:%s:%s')>" % (self.__class__.__name__,
 
                                      self.user_id, self.username)
 

	
 
    @classmethod
 
    def guess_instance(cls, value):
 
        return super(User, cls).guess_instance(value, User.get_by_username)
 

	
 
    @classmethod
 
    def get_or_404(cls, id_, allow_default=True):
 
        '''
 
        Overridden version of BaseDbModel.get_or_404, with an extra check on
 
        the default user.
 
        '''
 
        user = super(User, cls).get_or_404(id_)
 
        if not allow_default and user.is_default_user:
 
            raise DefaultUserException()
 
        return user
 

	
 
    @classmethod
 
    def get_by_username_or_email(cls, username_or_email, case_insensitive=False, cache=False):
 
        """
 
        For anything that looks like an email address, look up by the email address (matching
 
        case insensitively).
 
        For anything else, try to look up by the user name.
 

	
 
        This assumes no normal username can have '@' symbol.
 
        """
 
        if '@' in username_or_email:
 
            return User.get_by_email(username_or_email, cache=cache)
 
        else:
 
            return User.get_by_username(username_or_email, case_insensitive=case_insensitive, cache=cache)
 

	
 
    @classmethod
 
    def get_by_username(cls, username, case_insensitive=False, cache=False):
 
        if case_insensitive:
 
            q = cls.query().filter(func.lower(cls.username) == func.lower(username))
 
        else:
 
            q = cls.query().filter(cls.username == username)
 

	
 
        if cache:
 
            q = q.options(FromCache(
 
                            "sql_cache_short",
 
                            "get_user_%s" % _hash_key(username)
 
                          )
 
            )
 
        return q.scalar()
 

	
 
    @classmethod
 
    def get_by_api_key(cls, api_key, cache=False, fallback=True):
 
        if len(api_key) != 40 or not api_key.isalnum():
 
            return None
 

	
 
        q = cls.query().filter(cls.api_key == api_key)
 

	
 
        if cache:
 
            q = q.options(FromCache("sql_cache_short",
 
                                    "get_api_key_%s" % api_key))
 
        res = q.scalar()
 

	
 
        if fallback and not res:
 
            # fallback to additional keys
 
            _res = UserApiKeys.query().filter_by(api_key=api_key, is_expired=False).first()
 
            if _res:
 
                res = _res.user
 
        if res is None or not res.active or res.is_default_user:
 
            return None
 
        return res
 

	
 
    @classmethod
 
    def get_by_email(cls, email, cache=False):
 
        q = cls.query().filter(func.lower(cls.email) == func.lower(email))
 

	
 
        if cache:
 
            q = q.options(FromCache("sql_cache_short",
 
                                    "get_email_key_%s" % email))
 

	
 
        ret = q.scalar()
 
        if ret is None:
 
            q = UserEmailMap.query()
 
            # try fetching in alternate email map
 
            q = q.filter(func.lower(UserEmailMap.email) == func.lower(email))
 
            q = q.options(joinedload(UserEmailMap.user))
 
            if cache:
 
                q = q.options(FromCache("sql_cache_short",
 
                                        "get_email_map_key_%s" % email))
 
            ret = getattr(q.scalar(), 'user', None)
 

	
 
        return ret
 

	
 
    @classmethod
 
    def get_from_cs_author(cls, author):
 
        """
 
        Tries to get User objects out of commit author string
 

	
 
        :param author:
 
        """
 
        from kallithea.lib.helpers import email, author_name
 
        # Valid email in the attribute passed, see if they're in the system
 
        _email = email(author)
 
        if _email:
 
            user = cls.get_by_email(_email)
 
            if user is not None:
 
                return user
 
        # Maybe we can match by username?
 
        _author = author_name(author)
 
        user = cls.get_by_username(_author, case_insensitive=True)
 
        if user is not None:
 
            return user
 

	
 
    def update_lastlogin(self):
 
        """Update user lastlogin"""
 
        self.last_login = datetime.datetime.now()
 
        log.debug('updated user %s lastlogin', self.username)
 

	
 
    @classmethod
 
    def get_first_admin(cls):
 
        user = User.query().filter(User.admin == True).first()
 
        if user is None:
 
            raise Exception('Missing administrative account!')
 
        return user
 

	
 
    @classmethod
 
    def get_default_user(cls, cache=False):
 
        user = User.get_by_username(User.DEFAULT_USER, cache=cache)
 
        if user is None:
 
            raise Exception('Missing default account!')
 
        return user
 

	
 
    def get_api_data(self, details=False):
 
        """
 
        Common function for generating user related data for API
 
        """
 
        user = self
 
        data = dict(
 
            user_id=user.user_id,
 
            username=user.username,
 
            firstname=user.name,
 
            lastname=user.lastname,
 
            email=user.email,
 
            emails=user.emails,
 
            active=user.active,
 
            admin=user.admin,
 
        )
 
        if details:
 
            data.update(dict(
 
                extern_type=user.extern_type,
 
                extern_name=user.extern_name,
 
                api_key=user.api_key,
 
                api_keys=user.api_keys,
 
                last_login=user.last_login,
 
                ip_addresses=user.ip_addresses
 
                ))
 
        return data
 

	
 
    def __json__(self):
 
        data = dict(
 
            full_name=self.full_name,
 
            full_name_or_username=self.full_name_or_username,
 
            short_contact=self.short_contact,
 
            full_contact=self.full_contact
 
        )
 
        data.update(self.get_api_data())
 
        return data
 

	
 

	
 
class UserApiKeys(Base, BaseDbModel):
 
    __tablename__ = 'user_api_keys'
 
    __table_args__ = (
 
        Index('uak_api_key_idx', 'api_key'),
 
        Index('uak_api_key_expires_idx', 'api_key', 'expires'),
 
        _table_args_default_dict,
 
    )
 
    __mapper_args__ = {}
 

	
 
    user_api_key_id = Column(Integer(), primary_key=True)
 
    user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=False)
 
    api_key = Column(String(255), nullable=False, unique=True)
 
    description = Column(UnicodeText(), nullable=False)
 
    expires = Column(Float(53), nullable=False)
 
    created_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 

	
 
    user = relationship('User')
 

	
 
    @hybrid_property
 
    def is_expired(self):
 
        return (self.expires != -1) & (time.time() > self.expires)
 

	
 

	
 
class UserEmailMap(Base, BaseDbModel):
 
    __tablename__ = 'user_email_map'
 
    __table_args__ = (
 
        Index('uem_email_idx', 'email'),
 
        _table_args_default_dict,
 
    )
 
    __mapper_args__ = {}
 

	
 
    email_id = Column(Integer(), primary_key=True)
 
    user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=False)
 
    _email = Column("email", String(255), nullable=False, unique=True)
 
    user = relationship('User')
 

	
 
    @validates('_email')
 
    def validate_email(self, key, email):
 
        # check if this email is not main one
 
        main_email = Session().query(User).filter(User.email == email).scalar()
 
        if main_email is not None:
 
            raise AttributeError('email %s is present is user table' % email)
 
        return email
 

	
 
    @hybrid_property
 
    def email(self):
 
        return self._email
 

	
 
    @email.setter
 
    def email(self, val):
 
        self._email = val.lower() if val else None
 

	
 

	
 
class UserIpMap(Base, BaseDbModel):
 
    __tablename__ = 'user_ip_map'
 
    __table_args__ = (
 
        UniqueConstraint('user_id', 'ip_addr'),
 
        _table_args_default_dict,
 
    )
 
    __mapper_args__ = {}
 

	
 
    ip_id = Column(Integer(), primary_key=True)
 
    user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=False)
 
    ip_addr = Column(String(255), nullable=False)
 
    active = Column(Boolean(), nullable=False, default=True)
 
    user = relationship('User')
 

	
 
    @classmethod
 
    def _get_ip_range(cls, ip_addr):
 
        from kallithea.lib import ipaddr
 
        net = ipaddr.IPNetwork(address=ip_addr)
 
        return [str(net.network), str(net.broadcast)]
 

	
 
    def __json__(self):
 
        return dict(
 
          ip_addr=self.ip_addr,
 
          ip_range=self._get_ip_range(self.ip_addr)
 
        )
 

	
 
    def __unicode__(self):
 
        return u"<%s('user_id:%s=>%s')>" % (self.__class__.__name__,
 
                                            self.user_id, self.ip_addr)
 

	
 

	
 
class UserLog(Base, BaseDbModel):
 
    __tablename__ = 'user_logs'
 
    __table_args__ = (
 
        _table_args_default_dict,
 
    )
 

	
 
    user_log_id = Column(Integer(), primary_key=True)
 
    user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=True)
 
    username = Column(String(255), nullable=False)
 
    repository_id = Column(Integer(), ForeignKey('repositories.repo_id'), nullable=True)
 
    repository_name = Column(Unicode(255), nullable=False)
 
    user_ip = Column(String(255), nullable=True)
 
    action = Column(UnicodeText(), nullable=False)
 
    action_date = Column(DateTime(timezone=False), nullable=False)
 

	
 
    def __unicode__(self):
 
        return u"<%s('id:%s:%s')>" % (self.__class__.__name__,
 
                                      self.repository_name,
 
                                      self.action)
 

	
 
    @property
 
    def action_as_day(self):
 
        return datetime.date(*self.action_date.timetuple()[:3])
 

	
 
    user = relationship('User')
 
    repository = relationship('Repository', cascade='')
 

	
 

	
 
class UserGroup(Base, BaseDbModel):
 
    __tablename__ = 'users_groups'
 
    __table_args__ = (
 
        _table_args_default_dict,
 
    )
 

	
 
    users_group_id = Column(Integer(), primary_key=True)
 
    users_group_name = Column(Unicode(255), nullable=False, unique=True)
 
    user_group_description = Column(Unicode(10000), nullable=True) # FIXME: not nullable?
 
    users_group_active = Column(Boolean(), nullable=False)
 
    owner_id = Column('user_id', Integer(), ForeignKey('users.user_id'), nullable=False)
 
    created_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 
    _group_data = Column("group_data", LargeBinary(), nullable=True)  # JSON data # FIXME: not nullable?
 

	
 
    members = relationship('UserGroupMember', cascade="all, delete-orphan")
 
    users_group_to_perm = relationship('UserGroupToPerm', cascade='all')
 
    users_group_repo_to_perm = relationship('UserGroupRepoToPerm', cascade='all')
 
    users_group_repo_group_to_perm = relationship('UserGroupRepoGroupToPerm', cascade='all')
 
    user_user_group_to_perm = relationship('UserUserGroupToPerm ', cascade='all')
 
    user_group_user_group_to_perm = relationship('UserGroupUserGroupToPerm ', primaryjoin="UserGroupUserGroupToPerm.target_user_group_id==UserGroup.users_group_id", cascade='all')
 

	
 
    owner = relationship('User')
 

	
 
    @hybrid_property
 
    def group_data(self):
 
        if not self._group_data:
 
            return {}
 

	
 
        try:
 
            return json.loads(self._group_data)
 
        except TypeError:
 
            return {}
 

	
 
    @group_data.setter
 
    def group_data(self, val):
 
        try:
 
            self._group_data = json.dumps(val)
 
        except Exception:
 
            log.error(traceback.format_exc())
 

	
 
    def __unicode__(self):
 
        return u"<%s('id:%s:%s')>" % (self.__class__.__name__,
 
                                      self.users_group_id,
 
                                      self.users_group_name)
 

	
 
    @classmethod
 
    def guess_instance(cls, value):
 
        return super(UserGroup, cls).guess_instance(value, UserGroup.get_by_group_name)
 

	
 
    @classmethod
 
    def get_by_group_name(cls, group_name, cache=False,
 
                          case_insensitive=False):
 
        if case_insensitive:
 
            q = cls.query().filter(func.lower(cls.users_group_name) == func.lower(group_name))
 
        else:
 
            q = cls.query().filter(cls.users_group_name == group_name)
 
        if cache:
 
            q = q.options(FromCache(
 
                            "sql_cache_short",
 
                            "get_group_%s" % _hash_key(group_name)
 
                          )
 
            )
 
        return q.scalar()
 

	
 
    @classmethod
 
    def get(cls, user_group_id, cache=False):
 
        user_group = cls.query()
 
        if cache:
 
            user_group = user_group.options(FromCache("sql_cache_short",
 
                                    "get_users_group_%s" % user_group_id))
 
        return user_group.get(user_group_id)
 

	
 
    def get_api_data(self, with_members=True):
 
        user_group = self
 

	
 
        data = dict(
 
            users_group_id=user_group.users_group_id,
 
            group_name=user_group.users_group_name,
 
            group_description=user_group.user_group_description,
 
            active=user_group.users_group_active,
 
            owner=user_group.owner.username,
 
        )
 
        if with_members:
 
            data['members'] = [
 
                ugm.user.get_api_data()
 
                for ugm in user_group.members
 
            ]
 

	
 
        return data
 

	
 

	
 
class UserGroupMember(Base, BaseDbModel):
 
    __tablename__ = 'users_groups_members'
 
    __table_args__ = (
 
        _table_args_default_dict,
 
    )
 

	
 
    users_group_member_id = Column(Integer(), primary_key=True)
 
    users_group_id = Column(Integer(), ForeignKey('users_groups.users_group_id'), nullable=False)
 
    user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=False)
 

	
 
    user = relationship('User')
 
    users_group = relationship('UserGroup')
 

	
 
    def __init__(self, gr_id='', u_id=''):
 
        self.users_group_id = gr_id
 
        self.user_id = u_id
 

	
 

	
 
class RepositoryField(Base, BaseDbModel):
 
    __tablename__ = 'repositories_fields'
 
    __table_args__ = (
 
        UniqueConstraint('repository_id', 'field_key'),  # no-multi field
 
        _table_args_default_dict,
 
    )
 

	
 
    PREFIX = 'ex_'  # prefix used in form to not conflict with already existing fields
 

	
 
    repo_field_id = Column(Integer(), primary_key=True)
 
    repository_id = Column(Integer(), ForeignKey('repositories.repo_id'), nullable=False)
 
    field_key = Column(String(250), nullable=False)
 
    field_label = Column(String(1024), nullable=False)
 
    field_value = Column(String(10000), nullable=False)
 
    field_desc = Column(String(1024), nullable=False)
 
    field_type = Column(String(255), nullable=False)
 
    created_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 

	
 
    repository = relationship('Repository')
 

	
 
    @property
 
    def field_key_prefixed(self):
 
        return 'ex_%s' % self.field_key
 

	
 
    @classmethod
 
    def un_prefix_key(cls, key):
 
        if key.startswith(cls.PREFIX):
 
            return key[len(cls.PREFIX):]
 
        return key
 

	
 
    @classmethod
 
@@ -1281,385 +1278,384 @@ class Repository(Base, BaseDbModel):
 
            uri_tmpl = self.DEFAULT_CLONE_URI
 
            try:
 
                from tg import tmpl_context as c
 
                uri_tmpl = c.clone_uri_tmpl
 
            except AttributeError:
 
                # in any case if we call this outside of request context,
 
                # ie, not having tmpl_context set up
 
                pass
 

	
 
        return get_clone_url(uri_tmpl=uri_tmpl,
 
                             qualified_home_url=qualified_home_url,
 
                             repo_name=self.repo_name,
 
                             repo_id=self.repo_id, **override)
 

	
 
    def set_state(self, state):
 
        self.repo_state = state
 

	
 
    #==========================================================================
 
    # SCM PROPERTIES
 
    #==========================================================================
 

	
 
    def get_changeset(self, rev=None):
 
        return get_changeset_safe(self.scm_instance, rev)
 

	
 
    def get_landing_changeset(self):
 
        """
 
        Returns landing changeset, or if that doesn't exist returns the tip
 
        """
 
        _rev_type, _rev = self.landing_rev
 
        cs = self.get_changeset(_rev)
 
        if isinstance(cs, EmptyChangeset):
 
            return self.get_changeset()
 
        return cs
 

	
 
    def update_changeset_cache(self, cs_cache=None):
 
        """
 
        Update cache of last changeset for repository, keys should be::
 

	
 
            short_id
 
            raw_id
 
            revision
 
            message
 
            date
 
            author
 

	
 
        :param cs_cache:
 
        """
 
        from kallithea.lib.vcs.backends.base import BaseChangeset
 
        if cs_cache is None:
 
            cs_cache = EmptyChangeset()
 
            # use no-cache version here
 
            scm_repo = self.scm_instance_no_cache()
 
            if scm_repo:
 
                cs_cache = scm_repo.get_changeset()
 

	
 
        if isinstance(cs_cache, BaseChangeset):
 
            cs_cache = cs_cache.__json__()
 

	
 
        if (not self.changeset_cache or cs_cache['raw_id'] != self.changeset_cache['raw_id']):
 
            _default = datetime.datetime.fromtimestamp(0)
 
            last_change = cs_cache.get('date') or _default
 
            log.debug('updated repo %s with new cs cache %s',
 
                      self.repo_name, cs_cache)
 
            self.updated_on = last_change
 
            self.changeset_cache = cs_cache
 
            Session().commit()
 
        else:
 
            log.debug('changeset_cache for %s already up to date with %s',
 
                      self.repo_name, cs_cache['raw_id'])
 

	
 
    @property
 
    def tip(self):
 
        return self.get_changeset('tip')
 

	
 
    @property
 
    def author(self):
 
        return self.tip.author
 

	
 
    @property
 
    def last_change(self):
 
        return self.scm_instance.last_change
 

	
 
    def get_comments(self, revisions=None):
 
        """
 
        Returns comments for this repository grouped by revisions
 

	
 
        :param revisions: filter query by revisions only
 
        """
 
        cmts = ChangesetComment.query() \
 
            .filter(ChangesetComment.repo == self)
 
        if revisions is not None:
 
            if not revisions:
 
                return {} # don't use sql 'in' on empty set
 
            cmts = cmts.filter(ChangesetComment.revision.in_(revisions))
 
        grouped = collections.defaultdict(list)
 
        for cmt in cmts.all():
 
            grouped[cmt.revision].append(cmt)
 
        return grouped
 

	
 
    def statuses(self, revisions):
 
        """
 
        Returns statuses for this repository.
 
        PRs without any votes do _not_ show up as unreviewed.
 

	
 
        :param revisions: list of revisions to get statuses for
 
        """
 
        if not revisions:
 
            return {}
 

	
 
        statuses = ChangesetStatus.query() \
 
            .filter(ChangesetStatus.repo == self) \
 
            .filter(ChangesetStatus.version == 0) \
 
            .filter(ChangesetStatus.revision.in_(revisions))
 

	
 
        grouped = {}
 
        for stat in statuses.all():
 
            pr_id = pr_nice_id = pr_repo = None
 
            if stat.pull_request:
 
                pr_id = stat.pull_request.pull_request_id
 
                pr_nice_id = PullRequest.make_nice_id(pr_id)
 
                pr_repo = stat.pull_request.other_repo.repo_name
 
            grouped[stat.revision] = [str(stat.status), stat.status_lbl,
 
                                      pr_id, pr_repo, pr_nice_id,
 
                                      stat.author]
 
        return grouped
 

	
 
    def _repo_size(self):
 
        from kallithea.lib import helpers as h
 
        log.debug('calculating repository size...')
 
        return h.format_byte_size(self.scm_instance.size)
 

	
 
    #==========================================================================
 
    # SCM CACHE INSTANCE
 
    #==========================================================================
 

	
 
    def set_invalidate(self):
 
        """
 
        Mark caches of this repo as invalid.
 
        """
 
        CacheInvalidation.set_invalidate(self.repo_name)
 

	
 
    _scm_instance = None
 

	
 
    @property
 
    def scm_instance(self):
 
        if self._scm_instance is None:
 
            self._scm_instance = self.scm_instance_cached()
 
        return self._scm_instance
 

	
 
    def scm_instance_cached(self, valid_cache_keys=None):
 
        @cache_region('long_term', 'scm_instance_cached')
 
        def _c(repo_name): # repo_name is just for the cache key
 
            log.debug('Creating new %s scm_instance and populating cache', repo_name)
 
            return self.scm_instance_no_cache()
 
        rn = self.repo_name
 

	
 
        valid = CacheInvalidation.test_and_set_valid(rn, None, valid_cache_keys=valid_cache_keys)
 
        if not valid:
 
            log.debug('Cache for %s invalidated, getting new object', rn)
 
            region_invalidate(_c, None, 'scm_instance_cached', rn)
 
        else:
 
            log.debug('Trying to get scm_instance of %s from cache', rn)
 
        return _c(rn)
 

	
 
    def scm_instance_no_cache(self):
 
        repo_full_path = safe_str(self.repo_full_path)
 
        alias = get_scm(repo_full_path)[0]
 
        log.debug('Creating instance of %s repository from %s',
 
                  alias, self.repo_full_path)
 
        backend = get_backend(alias)
 

	
 
        if alias == 'hg':
 
            repo = backend(repo_full_path, create=False,
 
                           baseui=self._ui)
 
        else:
 
            repo = backend(repo_full_path, create=False)
 

	
 
        return repo
 

	
 
    def __json__(self):
 
        return dict(
 
            repo_id=self.repo_id,
 
            repo_name=self.repo_name,
 
            landing_rev=self.landing_rev,
 
        )
 

	
 

	
 
class RepoGroup(Base, BaseDbModel):
 
    __tablename__ = 'groups'
 
    __table_args__ = (
 
        _table_args_default_dict,
 
    )
 
    __mapper_args__ = {'order_by': 'group_name'} # TODO: Deprecated as of SQLAlchemy 1.1.
 

	
 
    SEP = ' &raquo; '
 

	
 
    group_id = Column(Integer(), primary_key=True)
 
    group_name = Column(Unicode(255), nullable=False, unique=True) # full path
 
    parent_group_id = Column('group_parent_id', Integer(), ForeignKey('groups.group_id'), nullable=True)
 
    group_description = Column(Unicode(10000), nullable=False)
 
    owner_id = Column('user_id', Integer(), ForeignKey('users.user_id'), nullable=False)
 
    created_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 

	
 
    repo_group_to_perm = relationship('UserRepoGroupToPerm', cascade='all', order_by='UserRepoGroupToPerm.group_to_perm_id')
 
    users_group_to_perm = relationship('UserGroupRepoGroupToPerm', cascade='all')
 
    parent_group = relationship('RepoGroup', remote_side=group_id)
 
    owner = relationship('User')
 

	
 
    @classmethod
 
    def query(cls, sorted=False):
 
        """Add RepoGroup-specific helpers for common query constructs.
 

	
 
        sorted: if True, apply the default ordering (name, case insensitive).
 
        """
 
        q = super(RepoGroup, cls).query()
 

	
 
        if sorted:
 
            q = q.order_by(func.lower(RepoGroup.group_name))
 

	
 
        return q
 

	
 
    def __init__(self, group_name='', parent_group=None):
 
        self.group_name = group_name
 
        self.parent_group = parent_group
 

	
 
    def __unicode__(self):
 
        return u"<%s('id:%s:%s')>" % (self.__class__.__name__, self.group_id,
 
                                      self.group_name)
 

	
 
    @classmethod
 
    def _generate_choice(cls, repo_group):
 
        """Return tuple with group_id and name as html literal"""
 
        from webhelpers.html import literal
 
        if repo_group is None:
 
            return (-1, u'-- %s --' % _('top level'))
 
        return repo_group.group_id, literal(cls.SEP.join(repo_group.full_path_splitted))
 

	
 
    @classmethod
 
    def groups_choices(cls, groups):
 
        """Return tuples with group_id and name as html literal."""
 
        return sorted((cls._generate_choice(g) for g in groups),
 
                      key=lambda c: c[1].split(cls.SEP))
 

	
 
    @classmethod
 
    def url_sep(cls):
 
        return URL_SEP
 

	
 
    @classmethod
 
    def guess_instance(cls, value):
 
        return super(RepoGroup, cls).guess_instance(value, RepoGroup.get_by_group_name)
 

	
 
    @classmethod
 
    def get_by_group_name(cls, group_name, cache=False, case_insensitive=False):
 
        group_name = group_name.rstrip('/')
 
        if case_insensitive:
 
            gr = cls.query() \
 
                .filter(func.lower(cls.group_name) == func.lower(group_name))
 
        else:
 
            gr = cls.query() \
 
                .filter(cls.group_name == group_name)
 
        if cache:
 
            gr = gr.options(FromCache(
 
                            "sql_cache_short",
 
                            "get_group_%s" % _hash_key(group_name)
 
                            )
 
            )
 
        return gr.scalar()
 

	
 
    @property
 
    def parents(self):
 
        groups = []
 
        group = self.parent_group
 
        while group is not None:
 
            groups.append(group)
 
            group = group.parent_group
 
            assert group not in groups, group # avoid recursion on bad db content
 
        groups.reverse()
 
        return groups
 

	
 
    @property
 
    def children(self):
 
        return RepoGroup.query().filter(RepoGroup.parent_group == self)
 

	
 
    @property
 
    def name(self):
 
        return self.group_name.split(RepoGroup.url_sep())[-1]
 

	
 
    @property
 
    def full_path(self):
 
        return self.group_name
 

	
 
    @property
 
    def full_path_splitted(self):
 
        return self.group_name.split(RepoGroup.url_sep())
 

	
 
    @property
 
    def repositories(self):
 
        return Repository.query(sorted=True).filter_by(group=self)
 

	
 
    @property
 
    def repositories_recursive_count(self):
 
        cnt = self.repositories.count()
 

	
 
        def children_count(group):
 
            cnt = 0
 
            for child in group.children:
 
                cnt += child.repositories.count()
 
                cnt += children_count(child)
 
            return cnt
 

	
 
        return cnt + children_count(self)
 

	
 
    def _recursive_objects(self, include_repos=True):
 
        all_ = []
 

	
 
        def _get_members(root_gr):
 
            if include_repos:
 
                for r in root_gr.repositories:
 
                    all_.append(r)
 
            childs = root_gr.children.all()
 
            if childs:
 
                for gr in childs:
 
                    all_.append(gr)
 
                    _get_members(gr)
 

	
 
        _get_members(self)
 
        return [self] + all_
 

	
 
    def recursive_groups_and_repos(self):
 
        """
 
        Recursive return all groups, with repositories in those groups
 
        """
 
        return self._recursive_objects()
 

	
 
    def recursive_groups(self):
 
        """
 
        Returns all children groups for this group including children of children
 
        """
 
        return self._recursive_objects(include_repos=False)
 

	
 
    def get_new_name(self, group_name):
 
        """
 
        returns new full group name based on parent and new name
 

	
 
        :param group_name:
 
        """
 
        path_prefix = (self.parent_group.full_path_splitted if
 
                       self.parent_group else [])
 
        return RepoGroup.url_sep().join(path_prefix + [group_name])
 

	
 
    def get_api_data(self):
 
        """
 
        Common function for generating api data
 

	
 
        """
 
        group = self
 
        data = dict(
 
            group_id=group.group_id,
 
            group_name=group.group_name,
 
            group_description=group.group_description,
 
            parent_group=group.parent_group.group_name if group.parent_group else None,
 
            repositories=[x.repo_name for x in group.repositories],
 
            owner=group.owner.username
 
        )
 
        return data
 

	
 

	
 
class Permission(Base, BaseDbModel):
 
    __tablename__ = 'permissions'
 
    __table_args__ = (
 
        Index('p_perm_name_idx', 'permission_name'),
 
        _table_args_default_dict,
 
    )
 

	
 
    PERMS = (
 
        ('hg.admin', _('Kallithea Administrator')),
 

	
 
        ('repository.none', _('Default user has no access to new repositories')),
 
        ('repository.read', _('Default user has read access to new repositories')),
 
        ('repository.write', _('Default user has write access to new repositories')),
 
        ('repository.admin', _('Default user has admin access to new repositories')),
 

	
 
        ('group.none', _('Default user has no access to new repository groups')),
 
        ('group.read', _('Default user has read access to new repository groups')),
 
        ('group.write', _('Default user has write access to new repository groups')),
0 comments (0 inline, 0 general)