Changeset - 7b0aafc6b7ca
[Not reviewed]
default
0 2 0
Mads Kiilerich (mads) - 6 years ago 2020-06-15 17:27:19
mads@kiilerich.com
Grafted from: 5273e61453eb
mysql: create database with explicit UTF-8 character set and collation

A spin-off from Issue #378.

In MySQL, the character sets for server, database, tables, and connection are
set independently. Ideally, they should all use UTF-8, but systems tend to use
latin1 as default encoding, for example:

character_set_server = latin1
collation_server = latin1_swedish_ci

Databases would thus by default be created as:

character_set_database = latin1
collation_database = latin1_swedish_ci

To make things work consistently anyway, we have so far specified the utf8mb4
charset explicitly when creating tables, but there is no corresponding simple
option for specifying the collation for tables. We need a better solution.

If necessary and possible, the system charset and collation should be set to
UTF-8. Some systems already have these defaults default - see
https://mariadb.com/kb/en/differences-in-mariadb-in-debian-and-ubuntu/ .
The defaults can be changed as described on
https://mariadb.com/kb/en/setting-character-sets-and-collations/#example-changing-the-default-character-set-to-utf-8
to give something like:

character_set_server = utf8mb4
collation_server = utf8mb4_unicode_ci

Databases will then by default be created as:

character_set_database = utf8mb4
collation_database = utf8mb4_unicode_ci

and there is thus no longer any need for specifying the charset when creating
tables.

To be reasonably resilient across all systems without relying on system
defaults, we will now start specifying the charset and collation when creating
the database, but drop the specification of charset when creating tables.

For existing databases, it is recommended to change encoding (and collation) by
altering the database and each of the tables inside it as described on
https://stackoverflow.com/questions/6115612/how-to-convert-an-entire-mysql-database-characterset-and-collation-to-utf-8 .

Note the use of utf8mb4_unicode_ci instead of utf8mb4_general_ci - see
https://stackoverflow.com/questions/766809/whats-the-difference-between-utf8-general-ci-and-utf8-unicode-ci .

For investigation of these issues, consider the output from:
show variables like '%char%';
show variables like '%collation%';
show create database `KALLITHEA_DB_NAME`;
SELECT * FROM information_schema.SCHEMATA WHERE schema_name = "KALLITHEA_DB_NAME";
SELECT * FROM information_schema.TABLES T, information_schema.COLLATION_CHARACTER_SET_APPLICABILITY CCSA WHERE CCSA.collation_name = T.table_collation AND T.table_schema = "KALLITHEA_DB_NAME";
2 files changed with 1 insertions and 2 deletions:
0 comments (0 inline, 0 general)
kallithea/lib/db_manage.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.lib.db_manage
 
~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
Database creation, and setup module for Kallithea. Used for creation
 
of database as well as for migration operations
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Apr 10, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import logging
 
import os
 
import sys
 
import uuid
 

	
 
import alembic.command
 
import alembic.config
 
import sqlalchemy
 
from sqlalchemy.engine import create_engine
 

	
 
from kallithea.model.base import init_model
 
from kallithea.model.db import Repository, Setting, Ui, User
 
from kallithea.model.meta import Base, Session
 
from kallithea.model.permission import PermissionModel
 
from kallithea.model.user import UserModel
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class DbManage(object):
 
    def __init__(self, dbconf, root, tests=False, SESSION=None, cli_args=None):
 
        self.dbname = dbconf.split('/')[-1]
 
        self.tests = tests
 
        self.root = root
 
        self.dburi = dbconf
 
        self.cli_args = cli_args or {}
 
        self.init_db(SESSION=SESSION)
 

	
 
    def _ask_ok(self, msg):
 
        """Invoke ask_ok unless the force_ask option provides the answer"""
 
        force_ask = self.cli_args.get('force_ask')
 
        if force_ask is not None:
 
            return force_ask
 
        from kallithea.lib.utils2 import ask_ok
 
        return ask_ok(msg)
 

	
 
    def init_db(self, SESSION=None):
 
        if SESSION:
 
            self.sa = SESSION
 
        else:
 
            # init new sessions
 
            engine = create_engine(self.dburi)
 
            init_model(engine)
 
            self.sa = Session()
 

	
 
    def create_tables(self, override=False):
 
        """
 
        Create a auth database
 
        """
 

	
 
        log.info("Any existing database is going to be destroyed")
 
        if self.tests:
 
            destroy = True
 
        else:
 
            destroy = self._ask_ok('Are you sure to destroy old database ? [y/n]')
 
        if not destroy:
 
            print('Nothing done.')
 
            sys.exit(0)
 
        if destroy:
 
            # drop and re-create old schemas
 

	
 
            url = sqlalchemy.engine.url.make_url(self.dburi)
 
            database = url.database
 

	
 
            # Some databases enforce foreign key constraints and Base.metadata.drop_all() doesn't work
 
            if url.drivername == 'mysql':
 
                url.database = None  # don't connect to the database (it might not exist)
 
                engine = sqlalchemy.create_engine(url)
 
                with engine.connect() as conn:
 
                    conn.execute('DROP DATABASE IF EXISTS `%s`' % database)
 
                    conn.execute('CREATE DATABASE `%s`' % database)
 
                    conn.execute('CREATE DATABASE `%s` CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci' % database)
 
            elif url.drivername == 'postgresql':
 
                from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
 
                url.database = 'postgres'  # connect to the system database (as the real one might not exist)
 
                engine = sqlalchemy.create_engine(url)
 
                with engine.connect() as conn:
 
                    conn.connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
 
                    conn.execute('DROP DATABASE IF EXISTS "%s"' % database)
 
                    conn.execute('CREATE DATABASE "%s"' % database)
 
            else:
 
                # known to work on SQLite - possibly not on other databases with strong referential integrity
 
                Base.metadata.drop_all()
 

	
 
        checkfirst = not override
 
        Base.metadata.create_all(checkfirst=checkfirst)
 

	
 
        # Create an Alembic configuration and generate the version table,
 
        # "stamping" it with the most recent Alembic migration revision, to
 
        # tell Alembic that all the schema upgrades are already in effect.
 
        alembic_cfg = alembic.config.Config()
 
        alembic_cfg.set_main_option('script_location', 'kallithea:alembic')
 
        alembic_cfg.set_main_option('sqlalchemy.url', self.dburi)
 
        # This command will give an error in an Alembic multi-head scenario,
 
        # but in practice, such a scenario should not come up during database
 
        # creation, even during development.
 
        alembic.command.stamp(alembic_cfg, 'head')
 

	
 
        log.info('Created tables for %s', self.dbname)
 

	
 
    def admin_prompt(self, second=False):
 
        if not self.tests:
 
            import getpass
 

	
 
            username = self.cli_args.get('username')
 
            password = self.cli_args.get('password')
 
            email = self.cli_args.get('email')
 

	
 
            def get_password():
 
                password = getpass.getpass('Specify admin password '
 
                                           '(min 6 chars):')
 
                confirm = getpass.getpass('Confirm password:')
 

	
 
                if password != confirm:
 
                    log.error('passwords mismatch')
 
                    return False
 
                if len(password) < 6:
 
                    log.error('password is to short use at least 6 characters')
 
                    return False
 

	
 
                return password
 
            if username is None:
 
                username = input('Specify admin username:')
 
            if password is None:
 
                password = get_password()
 
                if not password:
 
                    # second try
 
                    password = get_password()
 
                    if not password:
 
                        sys.exit()
 
            if email is None:
 
                email = input('Specify admin email:')
 
            self.create_user(username, password, email, True)
 
        else:
 
            log.info('creating admin and regular test users')
 
            from kallithea.tests.base import TEST_USER_ADMIN_LOGIN, \
 
                TEST_USER_ADMIN_PASS, TEST_USER_ADMIN_EMAIL, \
 
                TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS, \
 
                TEST_USER_REGULAR_EMAIL, TEST_USER_REGULAR2_LOGIN, \
 
                TEST_USER_REGULAR2_PASS, TEST_USER_REGULAR2_EMAIL
 

	
 
            self.create_user(TEST_USER_ADMIN_LOGIN, TEST_USER_ADMIN_PASS,
 
                             TEST_USER_ADMIN_EMAIL, True)
 

	
 
            self.create_user(TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS,
 
                             TEST_USER_REGULAR_EMAIL, False)
 

	
 
            self.create_user(TEST_USER_REGULAR2_LOGIN, TEST_USER_REGULAR2_PASS,
 
                             TEST_USER_REGULAR2_EMAIL, False)
 

	
 
    def create_auth_plugin_options(self, skip_existing=False):
 
        """
 
        Create default auth plugin settings, and make it active
 

	
 
        :param skip_existing:
 
        """
 

	
 
        for k, v, t in [('auth_plugins', 'kallithea.lib.auth_modules.auth_internal', 'list'),
 
                        ('auth_internal_enabled', 'True', 'bool')]:
 
            if skip_existing and Setting.get_by_name(k) is not None:
 
                log.debug('Skipping option %s', k)
 
                continue
 
            setting = Setting(k, v, t)
 
            self.sa.add(setting)
 

	
 
    def create_default_options(self, skip_existing=False):
 
        """Creates default settings"""
 

	
 
        for k, v, t in [
 
            ('default_repo_enable_downloads', False, 'bool'),
 
            ('default_repo_enable_statistics', False, 'bool'),
 
            ('default_repo_private', False, 'bool'),
 
            ('default_repo_type', 'hg', 'unicode')
 
        ]:
 
            if skip_existing and Setting.get_by_name(k) is not None:
 
                log.debug('Skipping option %s', k)
 
                continue
 
            setting = Setting(k, v, t)
 
            self.sa.add(setting)
 

	
 
    def prompt_repo_root_path(self, test_repo_path='', retries=3):
 
        _path = self.cli_args.get('repos_location')
 
        if retries == 3:
 
            log.info('Setting up repositories config')
 

	
 
        if _path is not None:
 
            path = _path
 
        elif not self.tests and not test_repo_path:
 
            path = input(
 
                 'Enter a valid absolute path to store repositories. '
 
                 'All repositories in that path will be added automatically:'
 
            )
 
        else:
 
            path = test_repo_path
 
        path_ok = True
 

	
 
        # check proper dir
 
        if not os.path.isdir(path):
 
            path_ok = False
 
            log.error('Given path %s is not a valid directory', path)
 

	
 
        elif not os.path.isabs(path):
 
            path_ok = False
 
            log.error('Given path %s is not an absolute path', path)
 

	
 
        # check if path is at least readable.
 
        if not os.access(path, os.R_OK):
 
            path_ok = False
 
            log.error('Given path %s is not readable', path)
 

	
 
        # check write access, warn user about non writeable paths
 
        elif not os.access(path, os.W_OK) and path_ok:
 
            log.warning('No write permission to given path %s', path)
 
            if not self._ask_ok('Given path %s is not writeable, do you want to '
 
                          'continue with read only mode ? [y/n]' % (path,)):
 
                log.error('Canceled by user')
 
                sys.exit(-1)
 

	
 
        if retries == 0:
 
            sys.exit('max retries reached')
 
        if not path_ok:
 
            if _path is not None:
 
                sys.exit('Invalid repo path: %s' % _path)
 
            retries -= 1
 
            return self.prompt_repo_root_path(test_repo_path, retries) # recursing!!!
 

	
 
        real_path = os.path.normpath(os.path.realpath(path))
 

	
 
        if real_path != os.path.normpath(path):
 
            log.warning('Using normalized path %s instead of %s', real_path, path)
 

	
 
        return real_path
 

	
 
    def create_settings(self, repo_root_path):
 
        ui_config = [
 
            ('paths', '/', repo_root_path, True),
 
            #('phases', 'publish', 'false', False)
 
            ('hooks', Ui.HOOK_UPDATE, 'hg update >&2', False),
 
            ('hooks', Ui.HOOK_REPO_SIZE, 'python:kallithea.lib.hooks.repo_size', True),
 
            ('extensions', 'largefiles', '', True),
 
            ('largefiles', 'usercache', os.path.join(repo_root_path, '.cache', 'largefiles'), True),
 
            ('extensions', 'hgsubversion', '', False),
 
            ('extensions', 'hggit', '', False),
 
        ]
 
        for ui_section, ui_key, ui_value, ui_active in ui_config:
 
            ui_conf = Ui(
 
                ui_section=ui_section,
 
                ui_key=ui_key,
 
                ui_value=ui_value,
 
                ui_active=ui_active)
 
            self.sa.add(ui_conf)
 

	
 
        settings = [
 
            ('realm', 'Kallithea', 'unicode'),
 
            ('title', '', 'unicode'),
 
            ('ga_code', '', 'unicode'),
 
            ('show_public_icon', True, 'bool'),
 
            ('show_private_icon', True, 'bool'),
 
            ('stylify_metalabels', False, 'bool'),
 
            ('dashboard_items', 100, 'int'), # TODO: call it page_size
 
            ('admin_grid_items', 25, 'int'),
 
            ('show_version', True, 'bool'),
 
            ('use_gravatar', True, 'bool'),
 
            ('gravatar_url', User.DEFAULT_GRAVATAR_URL, 'unicode'),
kallithea/model/db.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.model.db
 
~~~~~~~~~~~~~~~~~~
 

	
 
Database Models for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Apr 08, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import base64
 
import collections
 
import datetime
 
import functools
 
import hashlib
 
import logging
 
import os
 
import time
 
import traceback
 

	
 
import ipaddr
 
import sqlalchemy
 
from sqlalchemy import Boolean, Column, DateTime, Float, ForeignKey, Index, Integer, LargeBinary, String, Unicode, UnicodeText, UniqueConstraint
 
from sqlalchemy.ext.hybrid import hybrid_property
 
from sqlalchemy.orm import class_mapper, joinedload, relationship, validates
 
from tg.i18n import lazy_ugettext as _
 
from webob.exc import HTTPNotFound
 

	
 
import kallithea
 
from kallithea.lib import ext_json
 
from kallithea.lib.exceptions import DefaultUserException
 
from kallithea.lib.utils2 import (Optional, asbool, ascii_bytes, aslist, get_changeset_safe, get_clone_url, remove_prefix, safe_bytes, safe_int, safe_str,
 
                                  urlreadable)
 
from kallithea.lib.vcs import get_backend
 
from kallithea.lib.vcs.backends.base import EmptyChangeset
 
from kallithea.lib.vcs.utils.helpers import get_scm
 
from kallithea.model.meta import Base, Session
 

	
 

	
 
URL_SEP = '/'
 
log = logging.getLogger(__name__)
 

	
 
#==============================================================================
 
# BASE CLASSES
 
#==============================================================================
 

	
 
class BaseDbModel(object):
 
    """
 
    Base Model for all classes
 
    """
 

	
 
    @classmethod
 
    def _get_keys(cls):
 
        """return column names for this model """
 
        # Note: not a normal dict - iterator gives "users.firstname", but keys gives "firstname"
 
        return class_mapper(cls).c.keys()
 

	
 
    def get_dict(self):
 
        """
 
        return dict with keys and values corresponding
 
        to this model data """
 

	
 
        d = {}
 
        for k in self._get_keys():
 
            d[k] = getattr(self, k)
 

	
 
        # also use __json__() if present to get additional fields
 
        _json_attr = getattr(self, '__json__', None)
 
        if _json_attr:
 
            # update with attributes from __json__
 
            if callable(_json_attr):
 
                _json_attr = _json_attr()
 
            for k, val in _json_attr.items():
 
                d[k] = val
 
        return d
 

	
 
    def get_appstruct(self):
 
        """return list with keys and values tuples corresponding
 
        to this model data """
 

	
 
        return [
 
            (k, getattr(self, k))
 
            for k in self._get_keys()
 
        ]
 

	
 
    def populate_obj(self, populate_dict):
 
        """populate model with data from given populate_dict"""
 

	
 
        for k in self._get_keys():
 
            if k in populate_dict:
 
                setattr(self, k, populate_dict[k])
 

	
 
    @classmethod
 
    def query(cls):
 
        return Session().query(cls)
 

	
 
    @classmethod
 
    def get(cls, id_):
 
        if id_:
 
            return cls.query().get(id_)
 

	
 
    @classmethod
 
    def guess_instance(cls, value, callback=None):
 
        """Haphazardly attempt to convert `value` to a `cls` instance.
 

	
 
        If `value` is None or already a `cls` instance, return it. If `value`
 
        is a number (or looks like one if you squint just right), assume it's
 
        a database primary key and let SQLAlchemy sort things out. Otherwise,
 
        fall back to resolving it using `callback` (if specified); this could
 
        e.g. be a function that looks up instances by name (though that won't
 
        work if the name begins with a digit). Otherwise, raise Exception.
 
        """
 

	
 
        if value is None:
 
            return None
 
        if isinstance(value, cls):
 
            return value
 
        if isinstance(value, int):
 
            return cls.get(value)
 
        if isinstance(value, str) and value.isdigit():
 
            return cls.get(int(value))
 
        if callback is not None:
 
            return callback(value)
 

	
 
        raise Exception(
 
            'given object must be int, long or Instance of %s '
 
            'got %s, no callback provided' % (cls, type(value))
 
        )
 

	
 
    @classmethod
 
    def get_or_404(cls, id_):
 
        try:
 
            id_ = int(id_)
 
        except (TypeError, ValueError):
 
            raise HTTPNotFound
 

	
 
        res = cls.query().get(id_)
 
        if res is None:
 
            raise HTTPNotFound
 
        return res
 

	
 
    @classmethod
 
    def delete(cls, id_):
 
        obj = cls.query().get(id_)
 
        Session().delete(obj)
 

	
 
    def __repr__(self):
 
        return '<DB:%s>' % (self.__class__.__name__)
 

	
 

	
 
_table_args_default_dict = {'extend_existing': True,
 
                            'mysql_engine': 'InnoDB',
 
                            'mysql_charset': 'utf8mb4',
 
                            'sqlite_autoincrement': True,
 
                           }
 

	
 
class Setting(Base, BaseDbModel):
 
    __tablename__ = 'settings'
 
    __table_args__ = (
 
        _table_args_default_dict,
 
    )
 

	
 
    SETTINGS_TYPES = {
 
        'str': safe_bytes,
 
        'int': safe_int,
 
        'unicode': safe_str,
 
        'bool': asbool,
 
        'list': functools.partial(aslist, sep=',')
 
    }
 
    DEFAULT_UPDATE_URL = ''
 

	
 
    app_settings_id = Column(Integer(), primary_key=True)
 
    app_settings_name = Column(String(255), nullable=False, unique=True)
 
    _app_settings_value = Column("app_settings_value", Unicode(4096), nullable=False)
 
    _app_settings_type = Column("app_settings_type", String(255), nullable=True) # FIXME: not nullable?
 

	
 
    def __init__(self, key='', val='', type='unicode'):
 
        self.app_settings_name = key
 
        self.app_settings_value = val
 
        self.app_settings_type = type
 

	
 
    @validates('_app_settings_value')
 
    def validate_settings_value(self, key, val):
 
        assert isinstance(val, str)
 
        return val
 

	
 
    @hybrid_property
 
    def app_settings_value(self):
 
        v = self._app_settings_value
 
        _type = self.app_settings_type
 
        converter = self.SETTINGS_TYPES.get(_type) or self.SETTINGS_TYPES['unicode']
 
        return converter(v)
 

	
 
    @app_settings_value.setter
 
    def app_settings_value(self, val):
 
        """
 
        Setter that will always make sure we use str in app_settings_value
 
        """
 
        self._app_settings_value = safe_str(val)
 

	
 
    @hybrid_property
 
    def app_settings_type(self):
 
        return self._app_settings_type
 

	
 
    @app_settings_type.setter
 
    def app_settings_type(self, val):
 
        if val not in self.SETTINGS_TYPES:
 
            raise Exception('type must be one of %s got %s'
 
                            % (list(self.SETTINGS_TYPES), val))
 
        self._app_settings_type = val
 

	
 
    def __repr__(self):
 
        return "<%s %s.%s=%r>" % (
 
            self.__class__.__name__,
 
            self.app_settings_name, self.app_settings_type, self.app_settings_value
 
        )
 

	
 
    @classmethod
 
    def get_by_name(cls, key):
 
        return cls.query() \
 
            .filter(cls.app_settings_name == key).scalar()
 

	
 
    @classmethod
 
    def get_by_name_or_create(cls, key, val='', type='unicode'):
 
        res = cls.get_by_name(key)
 
        if res is None:
 
            res = cls(key, val, type)
 
        return res
 

	
 
    @classmethod
 
    def create_or_update(cls, key, val=Optional(''), type=Optional('unicode')):
 
        """
 
        Creates or updates Kallithea setting. If updates are triggered, it will only
 
        update parameters that are explicitly set. Optional instance will be skipped.
 

	
 
        :param key:
 
        :param val:
 
        :param type:
 
        :return:
 
        """
 
        res = cls.get_by_name(key)
 
        if res is None:
 
            val = Optional.extract(val)
 
            type = Optional.extract(type)
 
            res = cls(key, val, type)
 
            Session().add(res)
 
        else:
 
            res.app_settings_name = key
 
            if not isinstance(val, Optional):
 
                # update if set
 
                res.app_settings_value = val
 
            if not isinstance(type, Optional):
 
                # update if set
 
                res.app_settings_type = type
 
        return res
 

	
 
    @classmethod
 
    def get_app_settings(cls):
 

	
 
        ret = cls.query()
 
        if ret is None:
 
            raise Exception('Could not get application settings !')
 
        settings = {}
 
        for each in ret:
 
            settings[each.app_settings_name] = \
 
                each.app_settings_value
 

	
 
        return settings
 

	
 
    @classmethod
 
    def get_auth_settings(cls):
 
        ret = cls.query() \
 
                .filter(cls.app_settings_name.startswith('auth_')).all()
 
        fd = {}
 
        for row in ret:
 
            fd[row.app_settings_name] = row.app_settings_value
 
        return fd
 

	
 
    @classmethod
 
    def get_default_repo_settings(cls, strip_prefix=False):
 
        ret = cls.query() \
 
                .filter(cls.app_settings_name.startswith('default_')).all()
 
        fd = {}
 
        for row in ret:
 
            key = row.app_settings_name
 
            if strip_prefix:
 
                key = remove_prefix(key, prefix='default_')
 
            fd.update({key: row.app_settings_value})
 

	
 
        return fd
 

	
 
    @classmethod
 
    def get_server_info(cls):
 
        import pkg_resources
 
        import platform
 
        from kallithea.lib.utils import check_git_version
 
        mods = [(p.project_name, p.version) for p in pkg_resources.working_set]
 
        info = {
 
            'modules': sorted(mods, key=lambda k: k[0].lower()),
 
            'py_version': platform.python_version(),
 
            'platform': platform.platform(),
 
            'kallithea_version': kallithea.__version__,
 
            'git_version': str(check_git_version()),
 
            'git_path': kallithea.CONFIG.get('git_path')
 
        }
 
        return info
 

	
 

	
 
class Ui(Base, BaseDbModel):
 
    __tablename__ = 'ui'
 
    __table_args__ = (
 
        Index('ui_ui_section_ui_key_idx', 'ui_section', 'ui_key'),
 
        UniqueConstraint('ui_section', 'ui_key'),
 
        _table_args_default_dict,
 
    )
 

	
 
    HOOK_UPDATE = 'changegroup.update'
 
    HOOK_REPO_SIZE = 'changegroup.repo_size'
 

	
 
    ui_id = Column(Integer(), primary_key=True)
 
    ui_section = Column(String(255), nullable=False)
 
    ui_key = Column(String(255), nullable=False)
 
    ui_value = Column(String(255), nullable=True) # FIXME: not nullable?
 
    ui_active = Column(Boolean(), nullable=False, default=True)
 

	
 
    @classmethod
 
    def get_by_key(cls, section, key):
 
        """ Return specified Ui object, or None if not found. """
 
        return cls.query().filter_by(ui_section=section, ui_key=key).scalar()
 

	
 
    @classmethod
 
    def get_or_create(cls, section, key):
 
        """ Return specified Ui object, creating it if necessary. """
 
        setting = cls.get_by_key(section, key)
 
        if setting is None:
 
            setting = cls(ui_section=section, ui_key=key)
 
            Session().add(setting)
 
        return setting
 

	
 
    @classmethod
 
    def get_builtin_hooks(cls):
 
        q = cls.query()
 
        q = q.filter(cls.ui_key.in_([cls.HOOK_UPDATE, cls.HOOK_REPO_SIZE]))
 
        q = q.filter(cls.ui_section == 'hooks')
 
        q = q.order_by(cls.ui_section, cls.ui_key)
0 comments (0 inline, 0 general)