Changeset - 3dabbbc9da89
[Not reviewed]
stable
0 4 0
Mads Kiilerich (mads) - 5 months ago 2025-10-05 13:32:36
mads@kiilerich.com
setup: use importlib instead of pkg_resources

Address warnings like:
UserWarning: pkg_resources is deprecated as an API. See https://setuptools.pypa.io/en/latest/pkg_resources.html. The pkg_resources package is slated for removal as early as 2025-11-30. Refrain from using this package or pin to Setuptools<81.
import pkg_resources
4 files changed with 10 insertions and 14 deletions:
0 comments (0 inline, 0 general)
kallithea/bin/kallithea_cli_extensions.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
This file was forked by the Kallithea project in July 2014 and later moved.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Mar 6, 2012
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 
import importlib.resources
 
import os
 

	
 
import click
 
import pkg_resources
 

	
 
import kallithea.bin.kallithea_cli_base as cli_base
 
from kallithea.lib.utils2 import ask_ok
 

	
 

	
 
@cli_base.register_command(needs_config_file=True)
 
def extensions_create(config):
 
    """Write template file for extending Kallithea in Python.
 

	
 
    Create a template `extensions.py` file next to the ini file. Local
 
    customizations in that file will survive upgrades. The file contains
 
    instructions on how it can be customized.
 
    """
 
    here = config['here']
 
    content = pkg_resources.resource_string(
 
        'kallithea', os.path.join('templates', 'py', 'extensions.py')
 
    )
 
    content = importlib.resources.files('kallithea').joinpath(
 
       'templates', 'py', 'extensions.py').read_bytes()
 
    ext_file = os.path.join(here, 'extensions.py')
 
    if os.path.exists(ext_file):
 
        msg = ('Extension file %s already exists, do you want '
 
               'to overwrite it ? [y/n] ') % ext_file
 
        if not ask_ok(msg):
 
            click.echo('Nothing done, exiting...')
 
            return
 

	
 
    dirname = os.path.dirname(ext_file)
 
    if not os.path.isdir(dirname):
 
        os.makedirs(dirname)
 
    with open(ext_file, 'wb') as f:
 
        f.write(content)
 
        click.echo('Wrote new extensions file to %s' % ext_file)
kallithea/model/db.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.model.db
 
~~~~~~~~~~~~~~~~~~
 

	
 
Database Models for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Apr 08, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import base64
 
import collections
 
import datetime
 
import functools
 
import hashlib
 
import importlib.metadata
 
import logging
 
import os
 
import platform
 
import time
 
import traceback
 

	
 
import ipaddr
 
import sqlalchemy
 
import urlobject
 
from sqlalchemy import Boolean, Column, DateTime, Float, ForeignKey, Index, Integer, LargeBinary, String, Unicode, UnicodeText, UniqueConstraint
 
from sqlalchemy.ext.hybrid import hybrid_property
 
from sqlalchemy.orm import class_mapper, joinedload, relationship, validates
 
from tg.i18n import lazy_ugettext as _
 
from webob.exc import HTTPNotFound
 

	
 
import kallithea
 
from kallithea.lib import ext_json, ssh, webutils
 
from kallithea.lib.exceptions import DefaultUserException
 
from kallithea.lib.utils2 import (asbool, ascii_bytes, aslist, check_git_version, get_changeset_safe, get_clone_url, remove_prefix, safe_bytes, safe_int,
 
                                  safe_str, urlreadable)
 
from kallithea.lib.vcs import get_repo
 
from kallithea.lib.vcs.backends.base import BaseChangeset, EmptyChangeset
 
from kallithea.lib.vcs.utils import author_email, author_name
 
from kallithea.model import meta
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 
#==============================================================================
 
# BASE CLASSES
 
#==============================================================================
 

	
 
class BaseDbModel(object):
 
    """
 
    Base Model for all classes
 
    """
 

	
 
    @classmethod
 
    def _get_keys(cls):
 
        """return column names for this model """
 
        # Note: not a normal dict - iterator gives "users.firstname", but keys gives "firstname"
 
        return class_mapper(cls).c.keys()
 

	
 
    def get_dict(self):
 
        """
 
        return dict with keys and values corresponding
 
        to this model data """
 

	
 
        d = {}
 
        for k in self._get_keys():
 
            d[k] = getattr(self, k)
 

	
 
        # also use __json__() if present to get additional fields
 
        _json_attr = getattr(self, '__json__', None)
 
        if _json_attr:
 
            # update with attributes from __json__
 
            if callable(_json_attr):
 
                _json_attr = _json_attr()
 
            for k, val in _json_attr.items():
 
                d[k] = val
 
        return d
 

	
 
    def get_appstruct(self):
 
        """return list with keys and values tuples corresponding
 
        to this model data """
 

	
 
        return [
 
            (k, getattr(self, k))
 
            for k in self._get_keys()
 
        ]
 

	
 
    def populate_obj(self, populate_dict):
 
        """populate model with data from given populate_dict"""
 

	
 
        for k in self._get_keys():
 
            if k in populate_dict:
 
                setattr(self, k, populate_dict[k])
 

	
 
    @classmethod
 
    def query(cls):
 
        return meta.Session().query(cls)
 

	
 
    @classmethod
 
    def get(cls, id_):
 
        if id_:
 
            return cls.query().get(id_)
 

	
 
    @classmethod
 
    def guess_instance(cls, value, callback=None):
 
        """Haphazardly attempt to convert `value` to a `cls` instance.
 

	
 
        If `value` is None or already a `cls` instance, return it. If `value`
 
        is a number (or looks like one if you squint just right), assume it's
 
        a database primary key and let SQLAlchemy sort things out. Otherwise,
 
        fall back to resolving it using `callback` (if specified); this could
 
        e.g. be a function that looks up instances by name (though that won't
 
        work if the name begins with a digit). Otherwise, raise Exception.
 
        """
 

	
 
        if value is None:
 
            return None
 
        if isinstance(value, cls):
 
            return value
 
        if isinstance(value, int):
 
            return cls.get(value)
 
        if isinstance(value, str) and value.isdigit():
 
            return cls.get(int(value))
 
        if callback is not None:
 
            return callback(value)
 

	
 
        raise Exception(
 
            'given object must be int, long or Instance of %s '
 
            'got %s, no callback provided' % (cls, type(value))
 
        )
 

	
 
    @classmethod
 
    def get_or_404(cls, id_):
 
        try:
 
            id_ = int(id_)
 
        except (TypeError, ValueError):
 
            raise HTTPNotFound
 

	
 
        res = cls.query().get(id_)
 
        if res is None:
 
            raise HTTPNotFound
 
        return res
 

	
 
    @classmethod
 
    def delete(cls, id_):
 
        obj = cls.query().get(id_)
 
        meta.Session().delete(obj)
 

	
 
    def __repr__(self):
 
        return '<DB:%s>' % (self.__class__.__name__)
 

	
 

	
 
_table_args_default_dict = {'extend_existing': True,
 
                            'mysql_engine': 'InnoDB',
 
                            'sqlite_autoincrement': True,
 
                           }
 

	
 
class Setting(meta.Base, BaseDbModel):
 
    __tablename__ = 'settings'
 
    __table_args__ = (
 
        _table_args_default_dict,
 
    )
 

	
 
    SETTINGS_TYPES = {
 
        'str': safe_bytes,
 
        'int': safe_int,
 
        'unicode': safe_str,
 
        'bool': asbool,
 
        'list': functools.partial(aslist, sep=',')
 
    }
 

	
 
    app_settings_id = Column(Integer(), primary_key=True)
 
    app_settings_name = Column(String(255), nullable=False, unique=True)
 
    _app_settings_value = Column("app_settings_value", Unicode(4096), nullable=False)
 
    _app_settings_type = Column("app_settings_type", String(255), nullable=True) # FIXME: not nullable?
 

	
 
    def __init__(self, key='', val='', type='unicode'):
 
        self.app_settings_name = key
 
        self.app_settings_value = val
 
        self.app_settings_type = type
 

	
 
    @validates('_app_settings_value')
 
    def validate_settings_value(self, key, val):
 
        assert isinstance(val, str)
 
        return val
 

	
 
    @hybrid_property
 
    def app_settings_value(self):
 
        v = self._app_settings_value
 
        _type = self.app_settings_type
 
        converter = self.SETTINGS_TYPES.get(_type) or self.SETTINGS_TYPES['unicode']
 
        return converter(v)
 

	
 
    @app_settings_value.setter
 
    def app_settings_value(self, val):
 
        """
 
        Setter that will always make sure we use str in app_settings_value
 
        """
 
        self._app_settings_value = safe_str(val)
 

	
 
    @hybrid_property
 
    def app_settings_type(self):
 
        return self._app_settings_type
 

	
 
    @app_settings_type.setter
 
    def app_settings_type(self, val):
 
        if val not in self.SETTINGS_TYPES:
 
            raise Exception('type must be one of %s got %s'
 
                            % (list(self.SETTINGS_TYPES), val))
 
        self._app_settings_type = val
 

	
 
    def __repr__(self):
 
        return "<%s %s.%s=%r>" % (
 
            self.__class__.__name__,
 
            self.app_settings_name, self.app_settings_type, self.app_settings_value
 
        )
 

	
 
    @classmethod
 
    def get_by_name(cls, key):
 
        return cls.query() \
 
            .filter(cls.app_settings_name == key).scalar()
 

	
 
    @classmethod
 
    def get_by_name_or_create(cls, key, val='', type='unicode'):
 
        res = cls.get_by_name(key)
 
        if res is None:
 
            res = cls(key, val, type)
 
        return res
 

	
 
    @classmethod
 
    def create_or_update(cls, key, val=None, type=None):
 
        """
 
        Creates or updates Kallithea setting. If updates are triggered, it will only
 
        update parameters that are explicitly set. 'None' values will be skipped.
 

	
 
        :param key:
 
        :param val:
 
        :param type:
 
        :return:
 
        """
 
        res = cls.get_by_name(key)
 
        if res is None:
 
            # new setting
 
            val = val if val is not None else ''
 
            type = type if type is not None else 'unicode'
 
            res = cls(key, val, type)
 
            meta.Session().add(res)
 
        else:
 
            if val is not None:
 
                # update if set
 
                res.app_settings_value = val
 
            if type is not None:
 
                # update if set
 
                res.app_settings_type = type
 
        return res
 

	
 
    @classmethod
 
    def get_app_settings(cls):
 

	
 
        ret = cls.query()
 
        if ret is None:
 
            raise Exception('Could not get application settings !')
 
        settings = {}
 
        for each in ret:
 
            settings[each.app_settings_name] = \
 
                each.app_settings_value
 

	
 
        return settings
 

	
 
    @classmethod
 
    def get_auth_settings(cls):
 
        ret = cls.query() \
 
                .filter(cls.app_settings_name.startswith('auth_')).all()
 
        fd = {}
 
        for row in ret:
 
            fd[row.app_settings_name] = row.app_settings_value
 
        return fd
 

	
 
    @classmethod
 
    def get_default_repo_settings(cls, strip_prefix=False):
 
        ret = cls.query() \
 
                .filter(cls.app_settings_name.startswith('default_')).all()
 
        fd = {}
 
        for row in ret:
 
            key = row.app_settings_name
 
            if strip_prefix:
 
                key = remove_prefix(key, prefix='default_')
 
            fd.update({key: row.app_settings_value})
 

	
 
        return fd
 

	
 
    @classmethod
 
    def get_server_info(cls):
 
        import platform
 

	
 
        import pkg_resources
 

	
 
        mods = [(p.project_name, p.version) for p in pkg_resources.working_set]
 
        # Python 3.9 PathDistribution doesn't have .name
 
        mods = set((p.metadata['Name'], p.version) for p in importlib.metadata.distributions())
 
        info = {
 
            'modules': sorted(mods, key=lambda k: k[0].lower()),
 
            'py_version': platform.python_version(),
 
            'platform': platform.platform(),
 
            'kallithea_version': kallithea.__version__,
 
            'git_version': str(check_git_version()),
 
            'git_path': kallithea.CONFIG.get('git_path')
 
        }
 
        return info
 

	
 

	
 
class Ui(meta.Base, BaseDbModel):
 
    __tablename__ = 'ui'
 
    __table_args__ = (
 
        Index('ui_ui_section_ui_key_idx', 'ui_section', 'ui_key'),
 
        UniqueConstraint('ui_section', 'ui_key'),
 
        _table_args_default_dict,
 
    )
 

	
 
    HOOK_UPDATE = 'changegroup.kallithea_update'
 
    HOOK_REPO_SIZE = 'changegroup.kallithea_repo_size'
 

	
 
    ui_id = Column(Integer(), primary_key=True)
 
    ui_section = Column(String(255), nullable=False)
 
    ui_key = Column(String(255), nullable=False)
 
    ui_value = Column(String(255), nullable=True) # FIXME: not nullable?
 
    ui_active = Column(Boolean(), nullable=False, default=True)
 

	
 
    @classmethod
 
    def get_by_key(cls, section, key):
 
        """ Return specified Ui object, or None if not found. """
 
        return cls.query().filter_by(ui_section=section, ui_key=key).scalar()
 

	
 
    @classmethod
 
    def get_or_create(cls, section, key):
 
        """ Return specified Ui object, creating it if necessary. """
 
        setting = cls.get_by_key(section, key)
 
        if setting is None:
 
            setting = cls(ui_section=section, ui_key=key)
 
            meta.Session().add(setting)
 
        return setting
 

	
 
    @classmethod
 
    def get_custom_hooks(cls):
 
        q = cls.query()
 
        q = q.filter(cls.ui_section == 'hooks')
 
        q = q.filter(~cls.ui_key.in_([cls.HOOK_UPDATE, cls.HOOK_REPO_SIZE]))
 
        q = q.filter(cls.ui_active)
 
        q = q.order_by(cls.ui_section, cls.ui_key)
 
        return q.all()
 

	
 
    @classmethod
 
    def get_repos_location(cls):
 
        return cls.get_by_key('paths', '/').ui_value
 

	
 
    @classmethod
 
    def create_or_update_hook(cls, key, val):
 
        new_ui = cls.get_or_create('hooks', key)
 
        new_ui.ui_active = True
 
        new_ui.ui_value = val
 

	
 
    def __repr__(self):
 
        return '<%s %s.%s=%r>' % (
 
            self.__class__.__name__,
 
            self.ui_section, self.ui_key, self.ui_value)
 

	
 

	
 
class User(meta.Base, BaseDbModel):
 
    __tablename__ = 'users'
 
    __table_args__ = (
 
        Index('u_username_idx', 'username'),
 
        Index('u_email_idx', 'email'),
 
        _table_args_default_dict,
 
    )
 

	
 
    DEFAULT_USER_NAME = 'default'
 
    DEFAULT_GRAVATAR_URL = 'https://secure.gravatar.com/avatar/{md5email}?d=identicon&s={size}'
 
    # The name of the default auth type in extern_type, 'internal' lives in auth_internal.py
 
    DEFAULT_AUTH_TYPE = 'internal'
 

	
 
    user_id = Column(Integer(), primary_key=True)
 
    username = Column(String(255), nullable=False, unique=True)
 
    password = Column(String(255), nullable=False)
 
    active = Column(Boolean(), nullable=False, default=True)
 
    admin = Column(Boolean(), nullable=False, default=False)
 
    name = Column("firstname", Unicode(255), nullable=False)
 
    lastname = Column(Unicode(255), nullable=False)
 
    _email = Column("email", String(255), nullable=True, unique=True) # FIXME: not nullable?
 
    last_login = Column(DateTime(timezone=False), nullable=True)
 
    extern_type = Column(String(255), nullable=True) # FIXME: not nullable?
 
    extern_name = Column(String(255), nullable=True) # FIXME: not nullable?
 
    api_key = Column(String(255), nullable=False)
 
    created_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 
    _user_data = Column("user_data", LargeBinary(), nullable=True)  # JSON data # FIXME: not nullable?
 

	
 
    user_log = relationship('UserLog')
 
    user_perms = relationship('UserToPerm', primaryjoin="User.user_id==UserToPerm.user_id", cascade='all')
 

	
 
    repositories = relationship('Repository')
 
    repo_groups = relationship('RepoGroup')
 
    user_groups = relationship('UserGroup')
 
    user_followers = relationship('UserFollowing', primaryjoin='UserFollowing.follows_user_id==User.user_id', cascade='all')
 
    followings = relationship('UserFollowing', primaryjoin='UserFollowing.user_id==User.user_id', cascade='all')
 

	
 
    repo_to_perm = relationship('UserRepoToPerm', primaryjoin='UserRepoToPerm.user_id==User.user_id', cascade='all')
 
    repo_group_to_perm = relationship('UserRepoGroupToPerm', primaryjoin='UserRepoGroupToPerm.user_id==User.user_id', cascade='all')
 

	
 
    group_member = relationship('UserGroupMember', cascade='all')
 

	
 
    # comments created by this user
 
    user_comments = relationship('ChangesetComment', cascade='all')
 
    # extra emails for this user
 
    user_emails = relationship('UserEmailMap', cascade='all')
 
    # extra API keys
 
    user_api_keys = relationship('UserApiKeys', cascade='all')
 
    ssh_keys = relationship('UserSshKeys', cascade='all')
 

	
 
    @hybrid_property
 
    def email(self):
 
        return self._email
 

	
 
    @email.setter
 
    def email(self, val):
 
        self._email = val.lower() if val else None
 

	
 
    @property
 
    def firstname(self):
 
        # alias for future
 
        return self.name
 

	
 
    @property
 
    def emails(self):
 
        other = UserEmailMap.query().filter(UserEmailMap.user == self).all()
 
        return [self.email] + [x.email for x in other]
 

	
 
    @property
 
    def api_keys(self):
 
        other = UserApiKeys.query().filter(UserApiKeys.user == self).all()
 
        return [self.api_key] + [x.api_key for x in other]
 

	
 
    @property
 
    def ip_addresses(self):
 
        ret = UserIpMap.query().filter(UserIpMap.user == self).all()
 
        return [x.ip_addr for x in ret]
 

	
 
    @property
 
    def full_name(self):
 
        return '%s %s' % (self.firstname, self.lastname)
 

	
 
    @property
 
    def full_name_or_username(self):
 
        """
 
        Show full name.
 
        If full name is not set, fall back to username.
 
        """
 
        return ('%s %s' % (self.firstname, self.lastname)
 
                if (self.firstname and self.lastname) else self.username)
 

	
 
    @property
 
    def full_name_and_username(self):
 
        """
 
        Show full name and username as 'Firstname Lastname (username)'.
 
        If full name is not set, fall back to username.
 
        """
 
        return ('%s %s (%s)' % (self.firstname, self.lastname, self.username)
 
                if (self.firstname and self.lastname) else self.username)
 

	
 
    @property
 
    def full_contact(self):
 
        return '%s %s <%s>' % (self.firstname, self.lastname, self.email)
 

	
 
    @property
 
    def short_contact(self):
 
        return '%s %s' % (self.firstname, self.lastname)
 

	
 
    @property
 
    def is_admin(self):
 
        return self.admin
 

	
 
    @hybrid_property
 
    def is_default_user(self):
 
        return self.username == User.DEFAULT_USER_NAME
 

	
 
    @hybrid_property
 
    def user_data(self):
 
        if not self._user_data:
 
            return {}
 

	
 
        try:
 
            return ext_json.loads(self._user_data)
 
        except TypeError:
 
            return {}
kallithea/model/scm.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.model.scm
 
~~~~~~~~~~~~~~~~~~~
 

	
 
Scm model for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Apr 9, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import importlib.resources
 
import logging
 
import os
 
import posixpath
 
import re
 
import sys
 
import tempfile
 
import traceback
 

	
 
import pkg_resources
 
from tg.i18n import ugettext as _
 

	
 
import kallithea
 
from kallithea.lib import hooks
 
from kallithea.lib.auth import HasPermissionAny, HasRepoGroupPermissionLevel, HasRepoPermissionLevel, HasUserGroupPermissionLevel
 
from kallithea.lib.exceptions import IMCCommitError, NonRelativePathError
 
from kallithea.lib.utils import get_filesystem_repos, make_ui
 
from kallithea.lib.utils2 import safe_bytes, safe_str, set_hook_environment, umask
 
from kallithea.lib.vcs import get_repo
 
from kallithea.lib.vcs.backends.base import EmptyChangeset
 
from kallithea.lib.vcs.exceptions import RepositoryError, VCSError
 
from kallithea.lib.vcs.nodes import FileNode
 
from kallithea.lib.vcs.utils.lazy import LazyProperty
 
from kallithea.model import db, meta, userlog
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class UserTemp(object):
 
    def __init__(self, user_id):
 
        self.user_id = user_id
 

	
 
    def __repr__(self):
 
        return "<%s('id:%s')>" % (self.__class__.__name__, self.user_id)
 

	
 

	
 
class RepoTemp(object):
 
    def __init__(self, repo_id):
 
        self.repo_id = repo_id
 

	
 
    def __repr__(self):
 
        return "<%s('id:%s')>" % (self.__class__.__name__, self.repo_id)
 

	
 

	
 
class _PermCheckIterator(object):
 
    def __init__(self, obj_list, obj_attr, perm_set, perm_checker, extra_kwargs=None):
 
        """
 
        Creates iterator from given list of objects, additionally
 
        checking permission for them from perm_set var
 

	
 
        :param obj_list: list of db objects
 
        :param obj_attr: attribute of object to pass into perm_checker
 
        :param perm_set: list of permissions to check
 
        :param perm_checker: callable to check permissions against
 
        """
 
        self.obj_list = obj_list
 
        self.obj_attr = obj_attr
 
        self.perm_set = perm_set
 
        self.perm_checker = perm_checker
 
        self.extra_kwargs = extra_kwargs or {}
 

	
 
    def __len__(self):
 
        return len(self.obj_list)
 

	
 
    def __repr__(self):
 
        return '<%s (%s)>' % (self.__class__.__name__, self.__len__())
 

	
 
    def __iter__(self):
 
        for db_obj in self.obj_list:
 
            # check permission at this level
 
            name = getattr(db_obj, self.obj_attr, None)
 
            if not self.perm_checker(*self.perm_set)(
 
                    name, self.__class__.__name__, **self.extra_kwargs):
 
                continue
 

	
 
            yield db_obj
 

	
 

	
 
class RepoList(_PermCheckIterator):
 

	
 
    def __init__(self, db_repo_list, perm_level, extra_kwargs=None):
 
        super(RepoList, self).__init__(obj_list=db_repo_list,
 
                    obj_attr='repo_name', perm_set=[perm_level],
 
                    perm_checker=HasRepoPermissionLevel,
 
                    extra_kwargs=extra_kwargs)
 

	
 

	
 
class RepoGroupList(_PermCheckIterator):
 

	
 
    def __init__(self, db_repo_group_list, perm_level, extra_kwargs=None):
 
        super(RepoGroupList, self).__init__(obj_list=db_repo_group_list,
 
                    obj_attr='group_name', perm_set=[perm_level],
 
                    perm_checker=HasRepoGroupPermissionLevel,
 
                    extra_kwargs=extra_kwargs)
 

	
 

	
 
class UserGroupList(_PermCheckIterator):
 

	
 
    def __init__(self, db_user_group_list, perm_level, extra_kwargs=None):
 
        super(UserGroupList, self).__init__(obj_list=db_user_group_list,
 
                    obj_attr='users_group_name', perm_set=[perm_level],
 
                    perm_checker=HasUserGroupPermissionLevel,
 
                    extra_kwargs=extra_kwargs)
 

	
 

	
 
class ScmModel(object):
 
    """
 
    Generic Scm Model
 
    """
 

	
 
    def __get_repo(self, instance):
 
        cls = db.Repository
 
        if isinstance(instance, cls):
 
            return instance
 
        elif isinstance(instance, int):
 
            return cls.get(instance)
 
        elif isinstance(instance, str):
 
            if instance.isdigit():
 
                return cls.get(int(instance))
 
            return cls.get_by_repo_name(instance)
 
        raise Exception('given object must be int, basestr or Instance'
 
                        ' of %s got %s' % (type(cls), type(instance)))
 

	
 
    @LazyProperty
 
    def repos_path(self):
 
        """
 
        Gets the repositories root path from database
 
        """
 

	
 
        q = db.Ui.query().filter(db.Ui.ui_key == '/').one()
 

	
 
        return q.ui_value
 

	
 
    def repo_scan(self, repos_path=None):
 
        """
 
        Listing of repositories in given path. This path should not be a
 
        repository itself. Return a dictionary of repository objects mapping to
 
        vcs instances.
 

	
 
        :param repos_path: path to directory containing repositories
 
        """
 

	
 
        if repos_path is None:
 
            repos_path = self.repos_path
 

	
 
        log.info('scanning for repositories in %s', repos_path)
 

	
 
        repos = {}
 

	
 
        for name, path in get_filesystem_repos(repos_path):
 
            # name need to be decomposed and put back together using the /
 
            # since this is internal storage separator for kallithea
 
            name = db.Repository.normalize_repo_name(name)
 

	
 
            try:
 
                if name in repos:
 
                    raise RepositoryError('Duplicate repository name %s '
 
                                          'found in %s' % (name, path))
 
                else:
 
                    repos[name] = get_repo(path[1], baseui=make_ui(path[1]))
 
            except (OSError, VCSError):
 
                continue
 
        log.debug('found %s paths with repositories', len(repos))
 
        return repos
 

	
 
    def get_repos(self, repos):
 
        """Return the repos the user has access to"""
 
        return RepoList(repos, perm_level='read')
 

	
 
    def get_repo_groups(self, groups=None):
 
        """Return the repo groups the user has access to
 
        If no groups are specified, use top level groups.
 
        """
 
        if groups is None:
 
            groups = db.RepoGroup.query() \
 
                .filter(db.RepoGroup.parent_group_id == None).all()
 
        return RepoGroupList(groups, perm_level='read')
 

	
 
    def mark_for_invalidation(self, repo_name):
 
        """
 
        Mark caches of this repo invalid in the database.
 

	
 
        :param repo_name: the repo for which caches should be marked invalid
 
        """
 
        log.debug("Marking %s as invalidated and update cache", repo_name)
 
        repo = db.Repository.get_by_repo_name(repo_name)
 
        if repo is not None:
 
            repo.set_invalidate()
 
            repo.update_changeset_cache()
 

	
 
    def toggle_following_repo(self, follow_repo_id, user_id):
 

	
 
        f = db.UserFollowing.query() \
 
            .filter(db.UserFollowing.follows_repository_id == follow_repo_id) \
 
            .filter(db.UserFollowing.user_id == user_id).scalar()
 

	
 
        if f is not None:
 
            try:
 
                meta.Session().delete(f)
 
                userlog.action_logger(UserTemp(user_id),
 
                              'stopped_following_repo',
 
@@ -481,256 +481,255 @@ class ScmModel(object):
 
                              revisions=[tip.raw_id])
 
        else:
 
            self.mark_for_invalidation(repo.repo_name)
 
        return tip
 

	
 
    def update_nodes(self, user, ip_addr, repo, message, nodes, parent_cs=None,
 
                     author=None, trigger_push_hook=True):
 
        """
 
        Commits specified nodes to repo. Again.
 
        """
 
        user = db.User.guess_instance(user)
 
        scm_instance = repo.scm_instance_no_cache()
 

	
 
        committer = user.full_contact
 
        if not author:
 
            author = committer
 

	
 
        if not parent_cs:
 
            parent_cs = EmptyChangeset(alias=scm_instance.alias)
 

	
 
        if isinstance(parent_cs, EmptyChangeset):
 
            # EmptyChangeset means we we're editing empty repository
 
            parents = None
 
        else:
 
            parents = [parent_cs]
 

	
 
        # add multiple nodes
 
        imc = scm_instance.in_memory_changeset
 
        for _filename, data in nodes.items():
 
            # new filename, can be renamed from the old one
 
            filename = self._sanitize_path(data['filename'])
 
            old_filename = self._sanitize_path(_filename)
 
            content = data['content']
 

	
 
            filenode = FileNode(old_filename, content=content)
 
            op = data['op']
 
            if op == 'add':
 
                imc.add(filenode)
 
            elif op == 'del':
 
                imc.remove(filenode)
 
            elif op == 'mod':
 
                if filename != old_filename:
 
                    # TODO: handle renames, needs vcs lib changes
 
                    imc.remove(filenode)
 
                    imc.add(FileNode(filename, content=content))
 
                else:
 
                    imc.change(filenode)
 

	
 
        # commit changes
 
        tip = imc.commit(message=message,
 
                         author=author,
 
                         parents=parents,
 
                         branch=parent_cs.branch)
 

	
 
        if trigger_push_hook:
 
            self._handle_push(scm_instance,
 
                              username=user.username,
 
                              ip_addr=ip_addr,
 
                              action='push_local',
 
                              repo_name=repo.repo_name,
 
                              revisions=[tip.raw_id])
 
        else:
 
            self.mark_for_invalidation(repo.repo_name)
 

	
 
    def delete_nodes(self, user, ip_addr, repo, message, nodes, parent_cs=None,
 
                     author=None, trigger_push_hook=True):
 
        """
 
        Deletes specified nodes from repo.
 

	
 
        :param user: Kallithea User object or user_id, the committer
 
        :param repo: Kallithea Repository object
 
        :param message: commit message
 
        :param nodes: mapping {filename:{'content':content},...}
 
        :param parent_cs: parent changeset, can be empty than it's initial commit
 
        :param author: author of commit, cna be different that committer only for git
 
        :param trigger_push_hook: trigger push hooks
 

	
 
        :returns: new committed changeset after deletion
 
        """
 

	
 
        user = db.User.guess_instance(user)
 
        scm_instance = repo.scm_instance_no_cache()
 

	
 
        processed_nodes = []
 
        for f_path in nodes:
 
            f_path = self._sanitize_path(f_path)
 
            # content can be empty but for compatibility it allows same dicts
 
            # structure as add_nodes
 
            content = nodes[f_path].get('content')
 
            processed_nodes.append((f_path, content))
 

	
 
        committer = user.full_contact
 
        if not author:
 
            author = committer
 

	
 
        if not parent_cs:
 
            parent_cs = EmptyChangeset(alias=scm_instance.alias)
 

	
 
        if isinstance(parent_cs, EmptyChangeset):
 
            # EmptyChangeset means we we're editing empty repository
 
            parents = None
 
        else:
 
            parents = [parent_cs]
 
        # add multiple nodes
 
        imc = scm_instance.in_memory_changeset
 
        for path, content in processed_nodes:
 
            imc.remove(FileNode(path, content=content))
 

	
 
        tip = imc.commit(message=message,
 
                         author=author,
 
                         parents=parents,
 
                         branch=parent_cs.branch)
 

	
 
        if trigger_push_hook:
 
            self._handle_push(scm_instance,
 
                              username=user.username,
 
                              ip_addr=ip_addr,
 
                              action='push_local',
 
                              repo_name=repo.repo_name,
 
                              revisions=[tip.raw_id])
 
        else:
 
            self.mark_for_invalidation(repo.repo_name)
 
        return tip
 

	
 
    def get_unread_journal(self):
 
        return db.UserLog.query().count()
 

	
 
    def get_repo_landing_revs(self, repo=None):
 
        """
 
        Generates select option with tags branches and bookmarks (for hg only)
 
        grouped by type
 

	
 
        :param repo:
 
        """
 

	
 
        hist_l = []
 
        choices = []
 
        hist_l.append(('rev:tip', _('latest tip')))
 
        choices.append('rev:tip')
 
        if repo is None:
 
            return choices, hist_l
 

	
 
        repo = self.__get_repo(repo)
 
        repo = repo.scm_instance
 

	
 
        branches_group = ([('branch:%s' % k, k) for k, v in
 
                           repo.branches.items()], _("Branches"))
 
        hist_l.append(branches_group)
 
        choices.extend([x[0] for x in branches_group[0]])
 

	
 
        if repo.alias == 'hg':
 
            bookmarks_group = ([('book:%s' % k, k) for k, v in
 
                                repo.bookmarks.items()], _("Bookmarks"))
 
            hist_l.append(bookmarks_group)
 
            choices.extend([x[0] for x in bookmarks_group[0]])
 

	
 
        tags_group = ([('tag:%s' % k, k) for k, v in
 
                       repo.tags.items()], _("Tags"))
 
        hist_l.append(tags_group)
 
        choices.extend([x[0] for x in tags_group[0]])
 

	
 
        return choices, hist_l
 

	
 
    def _get_git_hook_interpreter(self):
 
        """Return a suitable interpreter for Git hooks.
 

	
 
        Return a suitable string to be written in the POSIX #! shebang line for
 
        Git hook scripts so they invoke Kallithea code with the right Python
 
        interpreter and in the right environment.
 
        """
 
        # Note: sys.executable might not point at a usable Python interpreter. For
 
        # example, when using uwsgi, it will point at the uwsgi program itself.
 
        # FIXME This may not work on Windows and may need a shell wrapper script.
 
        return (kallithea.CONFIG.get('git_hook_interpreter')
 
                or sys.executable
 
                or '/usr/bin/env python3')
 

	
 
    def install_git_hooks(self, repo, force=False):
 
        """
 
        Creates a kallithea hook inside a git repository
 

	
 
        :param repo: Instance of VCS repo
 
        :param force: Overwrite existing non-Kallithea hooks
 
        """
 

	
 
        hooks_path = os.path.join(repo.path, 'hooks')
 
        if not repo.bare:
 
            hooks_path = os.path.join(repo.path, '.git', 'hooks')
 
        if not os.path.isdir(hooks_path):
 
            os.makedirs(hooks_path)
 

	
 
        tmpl_post = b"#!%s\n" % safe_bytes(self._get_git_hook_interpreter())
 
        tmpl_post += pkg_resources.resource_string(
 
            'kallithea', os.path.join('templates', 'py', 'git_post_receive_hook.py')
 
        )
 
        tmpl_post += importlib.resources.files('kallithea').joinpath(
 
            'templates', 'py', 'git_post_receive_hook.py').read_bytes()
 

	
 
        for h_type, tmpl in [('pre-receive', None), ('post-receive', tmpl_post)]:
 
            hook_file = os.path.join(hooks_path, h_type)
 
            other_hook = False
 
            log.debug('Installing git hook %s in repo %s', h_type, repo.path)
 
            if os.path.islink(hook_file):
 
                log.debug("Found symlink hook at %s", hook_file)
 
                other_hook = True
 
            elif os.path.isfile(hook_file):
 
                log.debug('hook file %s exists, checking if it is from kallithea', hook_file)
 
                with open(hook_file, 'rb') as f:
 
                    data = f.read()
 
                    matches = re.search(br'^KALLITHEA_HOOK_VER\s*=\s*(.*)$', data, flags=re.MULTILINE)
 
                    if matches:
 
                        ver = safe_str(matches.group(1))
 
                        log.debug('Found Kallithea hook - it has KALLITHEA_HOOK_VER %s', ver)
 
                    else:
 
                        log.debug('Found non-Kallithea hook at %s', hook_file)
 
                        other_hook = True
 
            elif os.path.exists(hook_file):
 
                log.debug("Found hook that isn't a regular file at %s", hook_file)
 
                other_hook = True
 
            if other_hook and not force:
 
                log.warning('skipping overwriting hook file %s', hook_file)
 
            elif h_type == 'post-receive':
 
                log.debug('writing hook file %s', hook_file)
 
                if other_hook:
 
                    backup_file = hook_file + '.bak'
 
                    log.warning('moving existing hook to %s', backup_file)
 
                    os.rename(hook_file, backup_file)
 
                try:
 
                    fh, fn = tempfile.mkstemp(prefix=hook_file + '.tmp.')
 
                    os.write(fh, tmpl.replace(b'_TMPL_', safe_bytes(kallithea.__version__)))
 
                    os.close(fh)
 
                    os.chmod(fn, 0o777 & ~umask)
 
                    os.rename(fn, hook_file)
 
                except (OSError, IOError) as e:
 
                    log.error('error writing hook %s: %s', hook_file, e)
 
            elif h_type == 'pre-receive':  # no longer used, so just remove any existing Kallithea hook
 
                if os.path.lexists(hook_file) and not other_hook:
 
                    log.warning('removing existing unused Kallithea hook %s', hook_file)
 
                    os.remove(hook_file)
 

	
 

	
 
def AvailableRepoGroupChoices(repo_group_perm_level, extras=()):
 
    """Return group_id,string tuples with choices for all the repo groups where
 
    the user has the necessary permissions.
 

	
 
    Top level is -1.
 
    """
 
    groups = db.RepoGroup.query().all()
 
    if HasPermissionAny('hg.admin')('available repo groups'):
 
        groups.append(None)
 
    else:
 
        groups = list(RepoGroupList(groups, perm_level=repo_group_perm_level))
 
        if HasPermissionAny('hg.create.repository')('available repo groups'):
 
            groups.append(None)
 
        for extra in extras:
 
            if not any(rg == extra for rg in groups):
 
                groups.append(extra)
 
    return db.RepoGroup.groups_choices(groups=groups)
scripts/deps.py
Show inline comments
 
#!/usr/bin/env python3
 

	
 

	
 
import re
 
import sys
 

	
 

	
 
ignored_modules = set('''
 
argparse
 
base64
 
bcrypt
 
binascii
 
bleach
 
calendar
 
celery
 
celery
 
chardet
 
click
 
collections
 
configparser
 
copy
 
csv
 
ctypes
 
datetime
 
dateutil
 
decimal
 
decorator
 
difflib
 
distutils
 
docutils
 
email
 
errno
 
fileinput
 
functools
 
getpass
 
grp
 
hashlib
 
hmac
 
html
 
http
 
imp
 
importlib
 
inspect
 
io
 
ipaddr
 
IPython
 
isapi_wsgi
 
itertools
 
json
 
kajiki
 
ldap
 
logging
 
mako
 
markdown
 
mimetypes
 
mock
 
msvcrt
 
multiprocessing
 
operator
 
os
 
paginate
 
paginate_sqlalchemy
 
pam
 
paste
 
pathlib
 
pkg_resources
 
platform
 
posixpath
 
pprint
 
pwd
 
pyflakes
 
pytest
 
pytest_localserver
 
random
 
re
 
routes
 
setuptools
 
shlex
 
shutil
 
smtplib
 
socket
 
ssl
 
stat
 
string
 
struct
 
subprocess
 
sys
 
tarfile
 
tempfile
 
textwrap
 
tgext
 
threading
 
time
 
traceback
 
traitlets
 
types
 
typing
 
urllib
 
urlobject
 
uuid
 
warnings
 
webhelpers2
 
webob
 
webtest
 
whoosh
 
win32traceutil
 
zipfile
 
'''.split())
 

	
 
top_modules = set('''
 
kallithea.alembic
 
kallithea.bin
 
kallithea.config
 
kallithea.controllers
 
kallithea.templates.py
 
scripts
 
'''.split())
 

	
 
bottom_external_modules = set('''
 
tg
 
mercurial
 
sqlalchemy
 
alembic
 
formencode
 
pygments
 
dulwich
 
beaker
 
psycopg2
 
docs
 
setup
 
conftest
 
packaging
 
'''.split())
 

	
 
normal_modules = set('''
 
kallithea
 
kallithea.controllers.base
 
kallithea.lib
 
kallithea.lib.auth
 
kallithea.lib.auth_modules
 
kallithea.lib.celerylib
 
kallithea.lib.db_manage
 
kallithea.lib.helpers
 
kallithea.lib.hooks
 
kallithea.lib.indexers
 
kallithea.lib.utils
 
kallithea.lib.utils2
 
kallithea.lib.vcs
 
kallithea.lib.webutils
 
kallithea.model
 
kallithea.model.async_tasks
 
kallithea.model.scm
 
kallithea.templates.py
 
'''.split())
 

	
 
shown_modules = normal_modules | top_modules
 

	
 
# break the chains somehow - this is a cleanup TODO list
 
known_violations = set([
 
('kallithea.lib.auth_modules', 'kallithea.lib.auth'),  # needs base&facade
 
('kallithea.lib.utils', 'kallithea.model'),  # clean up utils
 
('kallithea.lib.utils', 'kallithea.model.db'),
 
('kallithea.lib.utils', 'kallithea.model.scm'),
 
('kallithea.model', 'kallithea.lib.auth'),  # auth.HasXXX
 
('kallithea.model', 'kallithea.lib.auth_modules'),  # validators
 
('kallithea.model', 'kallithea.lib.hooks'),  # clean up hooks
 
('kallithea.model', 'kallithea.model.scm'),
 
('kallithea.model.scm', 'kallithea.lib.hooks'),
 
])
 

	
 
extra_edges = [
 
('kallithea.config', 'kallithea.controllers'),  # through TG
 
('kallithea.lib.auth', 'kallithea.lib.auth_modules'),  # custom loader
 
]
 

	
 

	
 
def normalize(s):
 
    """Given a string with dot path, return the string it should be shown as."""
 
    parts = s.replace('.__init__', '').split('.')
 
    short_2 = '.'.join(parts[:2])
 
    short_3 = '.'.join(parts[:3])
 
    short_4 = '.'.join(parts[:4])
 
    if parts[0] in ['scripts', 'contributor_data', 'i18n_utils']:
 
        return 'scripts'
 
    if short_3 == 'kallithea.model.meta':
 
        return 'kallithea.model.db'
 
    if parts[:4] == ['kallithea', 'lib', 'vcs', 'ssh']:
 
        return 'kallithea.lib.vcs.ssh'
 
    if short_4 in shown_modules:
 
        return short_4
 
    if short_3 in shown_modules:
 
        return short_3
 
    if short_2 in shown_modules:
 
        return short_2
 
    if short_2 == 'kallithea.tests':
 
        return None
 
    if parts[0] in ignored_modules:
 
        return None
 
    assert parts[0] in bottom_external_modules, parts
 
    return parts[0]
 

	
 

	
 
def main(filenames):
 
    if not filenames or filenames[0].startswith('-'):
 
        print('''\
 
Usage:
 
    hg files 'set:!binary()&grep("^#!.*python")' 'set:**.py' | xargs scripts/deps.py
 
    dot -Tsvg deps.dot > deps.svg
 
        ''')
 
        raise SystemExit(1)
 

	
 
    files_imports = dict()  # map filenames to its imports
 
    import_deps = set()  # set of tuples with module name and its imports
 
    for fn in filenames:
 
        with open(fn) as f:
 
            s = f.read()
 

	
 
        dot_name = (fn[:-3] if fn.endswith('.py') else fn).replace('/', '.')
 
        file_imports = set()
 
        for m in re.finditer(r'^ *(?:from ([^ ]*) import (?:([a-zA-Z].*)|\(([^)]*)\))|import (.*))$', s, re.MULTILINE):
 
            m_from, m_from_import, m_from_import2, m_import = m.groups()
 
            if m_from:
 
                pre = m_from + '.'
 
                if pre.startswith('.'):
 
                    pre = dot_name.rsplit('.', 1)[0] + pre
 
                importlist = m_from_import or m_from_import2
 
            else:
 
                pre = ''
 
                importlist = m_import
 
            for imp in importlist.split('#', 1)[0].split(','):
 
                full_imp = pre + imp.strip().split(' as ', 1)[0]
 
                file_imports.add(full_imp)
 
                import_deps.add((dot_name, full_imp))
 
        files_imports[fn] = file_imports
 

	
 
    # dump out all deps for debugging and analysis
 
    with open('deps.txt', 'w') as f:
 
        for fn, file_imports in sorted(files_imports.items()):
 
            for file_import in sorted(file_imports):
 
                if file_import.split('.', 1)[0] in ignored_modules:
 
                    continue
 
                f.write('%s: %s\n' % (fn, file_import))
 

	
 
    # find leafs that haven't been ignored - they are the important external dependencies and shown in the bottom row
 
    only_imported = set(
 
        set(normalize(b) for a, b in import_deps) -
 
        set(normalize(a) for a, b in import_deps) -
 
        set([None, 'kallithea'])
 
    )
 

	
 
    normalized_dep_edges = set()
 
    for dot_name, full_imp in import_deps:
 
        a = normalize(dot_name)
 
        b = normalize(full_imp)
 
        if a is None or b is None or a == b:
 
            continue
 
        normalized_dep_edges.add((a, b))
 
        #print((dot_name, full_imp, a, b))
0 comments (0 inline, 0 general)