# -*- coding: utf-8 -*-
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
kallithea.model.db
~~~~~~~~~~~~~~~~~~
Database Models for Kallithea
This file was forked by the Kallithea project in July 2014.
Original author and date, and relevant copyright and licensing information is below:
:created_on: Apr 08, 2010
:author: marcink
:copyright: (c) 2013 RhodeCode GmbH, and others.
:license: GPLv3, see LICENSE.md for more details.
import base64
import collections
import datetime
import functools
import hashlib
import logging
import os
import time
import traceback
import ipaddr
import sqlalchemy
from sqlalchemy import Boolean, Column, DateTime, Float, ForeignKey, Index, Integer, LargeBinary, String, Unicode, UnicodeText, UniqueConstraint
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.orm import class_mapper, joinedload, relationship, validates
from tg.i18n import lazy_ugettext as _
from webob.exc import HTTPNotFound
import kallithea
from kallithea.lib import ext_json, ssh
from kallithea.lib.exceptions import DefaultUserException
from kallithea.lib.utils2 import (Optional, asbool, ascii_bytes, aslist, get_changeset_safe, get_clone_url, remove_prefix, safe_bytes, safe_int, safe_str,
urlreadable)
from kallithea.lib.vcs import get_backend
from kallithea.lib.vcs.backends.base import EmptyChangeset
from kallithea.lib.vcs.utils.helpers import get_scm
from kallithea.model.meta import Base, Session
URL_SEP = '/'
log = logging.getLogger(__name__)
#==============================================================================
# BASE CLASSES
class BaseDbModel(object):
Base Model for all classes
@classmethod
def _get_keys(cls):
"""return column names for this model """
# Note: not a normal dict - iterator gives "users.firstname", but keys gives "firstname"
return class_mapper(cls).c.keys()
def get_dict(self):
return dict with keys and values corresponding
to this model data """
d = {}
for k in self._get_keys():
d[k] = getattr(self, k)
# also use __json__() if present to get additional fields
_json_attr = getattr(self, '__json__', None)
if _json_attr:
# update with attributes from __json__
if callable(_json_attr):
_json_attr = _json_attr()
for k, val in _json_attr.items():
d[k] = val
return d
def get_appstruct(self):
"""return list with keys and values tuples corresponding
return [
(k, getattr(self, k))
for k in self._get_keys()
]
def populate_obj(self, populate_dict):
"""populate model with data from given populate_dict"""
if k in populate_dict:
setattr(self, k, populate_dict[k])
def query(cls):
return Session().query(cls)
def get(cls, id_):
if id_:
return cls.query().get(id_)
def guess_instance(cls, value, callback=None):
"""Haphazardly attempt to convert `value` to a `cls` instance.
If `value` is None or already a `cls` instance, return it. If `value`
is a number (or looks like one if you squint just right), assume it's
a database primary key and let SQLAlchemy sort things out. Otherwise,
fall back to resolving it using `callback` (if specified); this could
e.g. be a function that looks up instances by name (though that won't
work if the name begins with a digit). Otherwise, raise Exception.
if value is None:
return None
if isinstance(value, cls):
return value
if isinstance(value, int):
return cls.get(value)
if isinstance(value, str) and value.isdigit():
return cls.get(int(value))
if callback is not None:
return callback(value)
raise Exception(
'given object must be int, long or Instance of %s '
'got %s, no callback provided' % (cls, type(value))
)
def get_or_404(cls, id_):
try:
id_ = int(id_)
except (TypeError, ValueError):
raise HTTPNotFound
res = cls.query().get(id_)
if res is None:
return res
def delete(cls, id_):
obj = cls.query().get(id_)
Session().delete(obj)
def __repr__(self):
return '<DB:%s>' % (self.__class__.__name__)
_table_args_default_dict = {'extend_existing': True,
'mysql_engine': 'InnoDB',
'sqlite_autoincrement': True,
}
class Setting(Base, BaseDbModel):
__tablename__ = 'settings'
__table_args__ = (
_table_args_default_dict,
SETTINGS_TYPES = {
'str': safe_bytes,
'int': safe_int,
'unicode': safe_str,
'bool': asbool,
'list': functools.partial(aslist, sep=',')
DEFAULT_UPDATE_URL = ''
app_settings_id = Column(Integer(), primary_key=True)
app_settings_name = Column(String(255), nullable=False, unique=True)
_app_settings_value = Column("app_settings_value", Unicode(4096), nullable=False)
_app_settings_type = Column("app_settings_type", String(255), nullable=True) # FIXME: not nullable?
def __init__(self, key='', val='', type='unicode'):
self.app_settings_name = key
self.app_settings_value = val
self.app_settings_type = type
@validates('_app_settings_value')
def validate_settings_value(self, key, val):
assert isinstance(val, str)
return val
@hybrid_property
def app_settings_value(self):
v = self._app_settings_value
_type = self.app_settings_type
converter = self.SETTINGS_TYPES.get(_type) or self.SETTINGS_TYPES['unicode']
return converter(v)
@app_settings_value.setter
def app_settings_value(self, val):
Setter that will always make sure we use str in app_settings_value
self._app_settings_value = safe_str(val)
def app_settings_type(self):
return self._app_settings_type
@app_settings_type.setter
def app_settings_type(self, val):
if val not in self.SETTINGS_TYPES:
raise Exception('type must be one of %s got %s'
% (list(self.SETTINGS_TYPES), val))
self._app_settings_type = val
return "<%s %s.%s=%r>" % (
self.__class__.__name__,
self.app_settings_name, self.app_settings_type, self.app_settings_value
def get_by_name(cls, key):
return cls.query() \
.filter(cls.app_settings_name == key).scalar()
def get_by_name_or_create(cls, key, val='', type='unicode'):
res = cls.get_by_name(key)
res = cls(key, val, type)
def create_or_update(cls, key, val=Optional(''), type=Optional('unicode')):
Creates or updates Kallithea setting. If updates are triggered, it will only
update parameters that are explicitly set. Optional instance will be skipped.
:param key:
:param val:
:param type:
:return:
val = Optional.extract(val)
type = Optional.extract(type)
Session().add(res)
else:
res.app_settings_name = key
if not isinstance(val, Optional):
# update if set
res.app_settings_value = val
if not isinstance(type, Optional):
res.app_settings_type = type
def get_app_settings(cls):
ret = cls.query()
if ret is None:
raise Exception('Could not get application settings !')
settings = {}
for each in ret:
settings[each.app_settings_name] = \
each.app_settings_value
return settings
def get_auth_settings(cls):
ret = cls.query() \
.filter(cls.app_settings_name.startswith('auth_')).all()
fd = {}
for row in ret:
fd[row.app_settings_name] = row.app_settings_value
return fd
def get_default_repo_settings(cls, strip_prefix=False):
.filter(cls.app_settings_name.startswith('default_')).all()
key = row.app_settings_name
if strip_prefix:
key = remove_prefix(key, prefix='default_')
fd.update({key: row.app_settings_value})
def get_server_info(cls):
import platform
import pkg_resources
from kallithea.lib.utils import check_git_version
mods = [(p.project_name, p.version) for p in pkg_resources.working_set]
info = {
'modules': sorted(mods, key=lambda k: k[0].lower()),
'py_version': platform.python_version(),
'platform': platform.platform(),
'kallithea_version': kallithea.__version__,
'git_version': str(check_git_version()),
'git_path': kallithea.CONFIG.get('git_path')
return info
class Ui(Base, BaseDbModel):
__tablename__ = 'ui'
Index('ui_ui_section_ui_key_idx', 'ui_section', 'ui_key'),
UniqueConstraint('ui_section', 'ui_key'),
HOOK_UPDATE = 'changegroup.update'
HOOK_REPO_SIZE = 'changegroup.repo_size'
ui_id = Column(Integer(), primary_key=True)
ui_section = Column(String(255), nullable=False)
ui_key = Column(String(255), nullable=False)
ui_value = Column(String(255), nullable=True) # FIXME: not nullable?
ui_active = Column(Boolean(), nullable=False, default=True)
def get_by_key(cls, section, key):
""" Return specified Ui object, or None if not found. """
return cls.query().filter_by(ui_section=section, ui_key=key).scalar()
def get_or_create(cls, section, key):
""" Return specified Ui object, creating it if necessary. """
setting = cls.get_by_key(section, key)
if setting is None:
setting = cls(ui_section=section, ui_key=key)
Session().add(setting)
return setting
def get_builtin_hooks(cls):
q = cls.query()
q = q.filter(cls.ui_key.in_([cls.HOOK_UPDATE, cls.HOOK_REPO_SIZE]))
q = q.filter(cls.ui_section == 'hooks')
q = q.order_by(cls.ui_section, cls.ui_key)
return q.all()
def get_custom_hooks(cls):
q = q.filter(~cls.ui_key.in_([cls.HOOK_UPDATE, cls.HOOK_REPO_SIZE]))
def get_repos_location(cls):
return cls.get_by_key('paths', '/').ui_value
def create_or_update_hook(cls, key, val):
new_ui = cls.get_or_create('hooks', key)
new_ui.ui_active = True
new_ui.ui_value = val
return '<%s %s.%s=%r>' % (
self.ui_section, self.ui_key, self.ui_value)
class User(Base, BaseDbModel):
__tablename__ = 'users'
Index('u_username_idx', 'username'),
Index('u_email_idx', 'email'),
DEFAULT_USER_NAME = 'default'
DEFAULT_GRAVATAR_URL = 'https://secure.gravatar.com/avatar/{md5email}?d=identicon&s={size}'
# The name of the default auth type in extern_type, 'internal' lives in auth_internal.py
DEFAULT_AUTH_TYPE = 'internal'
user_id = Column(Integer(), primary_key=True)
username = Column(String(255), nullable=False, unique=True)
password = Column(String(255), nullable=False)
active = Column(Boolean(), nullable=False, default=True)
admin = Column(Boolean(), nullable=False, default=False)
name = Column("firstname", Unicode(255), nullable=False)
lastname = Column(Unicode(255), nullable=False)
_email = Column("email", String(255), nullable=True, unique=True) # FIXME: not nullable?
last_login = Column(DateTime(timezone=False), nullable=True)
extern_type = Column(String(255), nullable=True) # FIXME: not nullable?
extern_name = Column(String(255), nullable=True) # FIXME: not nullable?
api_key = Column(String(255), nullable=False)
created_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
_user_data = Column("user_data", LargeBinary(), nullable=True) # JSON data # FIXME: not nullable?
user_log = relationship('UserLog')
user_perms = relationship('UserToPerm', primaryjoin="User.user_id==UserToPerm.user_id", cascade='all')
repositories = relationship('Repository')
repo_groups = relationship('RepoGroup')
user_groups = relationship('UserGroup')
user_followers = relationship('UserFollowing', primaryjoin='UserFollowing.follows_user_id==User.user_id', cascade='all')
followings = relationship('UserFollowing', primaryjoin='UserFollowing.user_id==User.user_id', cascade='all')
repo_to_perm = relationship('UserRepoToPerm', primaryjoin='UserRepoToPerm.user_id==User.user_id', cascade='all')
repo_group_to_perm = relationship('UserRepoGroupToPerm', primaryjoin='UserRepoGroupToPerm.user_id==User.user_id', cascade='all')
group_member = relationship('UserGroupMember', cascade='all')
# comments created by this user
user_comments = relationship('ChangesetComment', cascade='all')
# extra emails for this user
user_emails = relationship('UserEmailMap', cascade='all')
# extra API keys
user_api_keys = relationship('UserApiKeys', cascade='all')
ssh_keys = relationship('UserSshKeys', cascade='all')
def email(self):
return self._email
@email.setter
def email(self, val):
self._email = val.lower() if val else None
@property
def firstname(self):
# alias for future
return self.name
def emails(self):
other = UserEmailMap.query().filter(UserEmailMap.user == self).all()
return [self.email] + [x.email for x in other]
def api_keys(self):
other = UserApiKeys.query().filter(UserApiKeys.user == self).all()
return [self.api_key] + [x.api_key for x in other]
def ip_addresses(self):
ret = UserIpMap.query().filter(UserIpMap.user == self).all()
return [x.ip_addr for x in ret]
def full_name(self):
return '%s %s' % (self.firstname, self.lastname)
def full_name_or_username(self):
Show full name.
If full name is not set, fall back to username.
return ('%s %s' % (self.firstname, self.lastname)
if (self.firstname and self.lastname) else self.username)
def full_name_and_username(self):
Show full name and username as 'Firstname Lastname (username)'.
return ('%s %s (%s)' % (self.firstname, self.lastname, self.username)
def full_contact(self):
return '%s %s <%s>' % (self.firstname, self.lastname, self.email)
def short_contact(self):
def is_admin(self):
return self.admin
def is_default_user(self):
return self.username == User.DEFAULT_USER_NAME
def user_data(self):
if not self._user_data:
return {}
return ext_json.loads(self._user_data)
except TypeError:
@user_data.setter
def user_data(self, val):
self._user_data = ascii_bytes(ext_json.dumps(val))
except Exception:
log.error(traceback.format_exc())
return "<%s %s: %r>" % (self.__class__.__name__, self.user_id, self.username)
def guess_instance(cls, value):
return super(User, cls).guess_instance(value, User.get_by_username)
def get_or_404(cls, id_, allow_default=True):
'''
Overridden version of BaseDbModel.get_or_404, with an extra check on
the default user.
user = super(User, cls).get_or_404(id_)
if not allow_default and user.is_default_user:
raise DefaultUserException()
return user
def get_by_username_or_email(cls, username_or_email, case_insensitive=True):
For anything that looks like an email address, look up by the email address (matching
case insensitively).
For anything else, try to look up by the user name.
This assumes no normal username can have '@' symbol.
if '@' in username_or_email:
return User.get_by_email(username_or_email)
return User.get_by_username(username_or_email, case_insensitive=case_insensitive)
def get_by_username(cls, username, case_insensitive=False):
if case_insensitive:
q = cls.query().filter(sqlalchemy.func.lower(cls.username) == sqlalchemy.func.lower(username))
q = cls.query().filter(cls.username == username)
return q.scalar()
def get_by_api_key(cls, api_key, fallback=True):
if len(api_key) != 40 or not api_key.isalnum():
q = cls.query().filter(cls.api_key == api_key)
res = q.scalar()
Status change: