@@ -75,268 +75,271 @@ class RhodeCodeSettings(Base, BaseModel)
app_settings_id = Column("app_settings_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
app_settings_name = Column("app_settings_name", String(length=None, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
app_settings_value = Column("app_settings_value", String(length=None, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
def __init__(self, k='', v=''):
self.app_settings_name = k
self.app_settings_value = v
def __repr__(self):
return "<%s('%s:%s')>" % (self.__class__.__name__,
self.app_settings_name, self.app_settings_value)
class RhodeCodeUi(Base, BaseModel):
__tablename__ = 'rhodecode_ui'
__table_args__ = {'useexisting':True}
ui_id = Column("ui_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
ui_section = Column("ui_section", String(length=None, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
ui_key = Column("ui_key", String(length=None, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
ui_value = Column("ui_value", String(length=None, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
ui_active = Column("ui_active", Boolean(), nullable=True, unique=None, default=True)
class User(Base, BaseModel):
__tablename__ = 'users'
__table_args__ = (UniqueConstraint('username'), UniqueConstraint('email'), {'useexisting':True})
user_id = Column("user_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
username = Column("username", String(length=None, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
password = Column("password", String(length=None, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
active = Column("active", Boolean(), nullable=True, unique=None, default=None)
admin = Column("admin", Boolean(), nullable=True, unique=None, default=False)
name = Column("name", String(length=None, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
lastname = Column("lastname", String(length=None, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
email = Column("email", String(length=None, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
last_login = Column("last_login", DateTime(timezone=False), nullable=True, unique=None, default=None)
ldap_dn = Column("ldap_dn", String(length=None, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
user_log = relationship('UserLog', cascade='all')
user_perms = relationship('UserToPerm', primaryjoin="User.user_id==UserToPerm.user_id", cascade='all')
repositories = relationship('Repository')
user_followers = relationship('UserFollowing', primaryjoin='UserFollowing.follows_user_id==User.user_id', cascade='all')
@property
def full_contact(self):
return '%s %s <%s>' % (self.name, self.lastname, self.email)
def is_admin(self):
return self.admin
return "<%s('id:%s:%s')>" % (self.__class__.__name__,
self.user_id, self.username)
def update_lastlogin(self):
"""Update user lastlogin"""
try:
session = Session.object_session(self)
self.last_login = datetime.datetime.now()
session.add(self)
session.commit()
log.debug('updated user %s lastlogin', self.username)
except (DatabaseError,):
session.rollback()
class UserLog(Base, BaseModel):
__tablename__ = 'user_logs'
user_log_id = Column("user_log_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
user_id = Column("user_id", Integer(), ForeignKey('users.user_id'), nullable=False, unique=None, default=None)
repository_id = Column("repository_id", Integer(length=None, convert_unicode=False, assert_unicode=None), ForeignKey('repositories.repo_id'), nullable=False, unique=None, default=None)
repository_name = Column("repository_name", String(length=None, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
user_ip = Column("user_ip", String(length=None, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
action = Column("action", String(length=None, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
action_date = Column("action_date", DateTime(timezone=False), nullable=True, unique=None, default=None)
def action_as_day(self):
return date(*self.action_date.timetuple()[:3])
user = relationship('User')
repository = relationship('Repository')
class UsersGroup(Base, BaseModel):
__tablename__ = 'users_groups'
users_group_id = Column("users_group_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
users_group_name = Column("users_group_name", String(length=None, convert_unicode=False, assert_unicode=None), nullable=False, unique=True, default=None)
users_group_active = Column("users_group_active", Boolean(), nullable=True, unique=None, default=None)
members = relationship('UsersGroupMember', cascade="all, delete, delete-orphan", lazy="joined")
class UsersGroupMember(Base, BaseModel):
__tablename__ = 'users_groups_members'
users_group_member_id = Column("users_group_member_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
users_group_id = Column("users_group_id", Integer(), ForeignKey('users_groups.users_group_id'), nullable=False, unique=None, default=None)
user = relationship('User', lazy='joined')
users_group = relationship('UsersGroup')
def __init__(self, gr_id, u_id):
self.users_group_id = gr_id
self.user_id = u_id
class Repository(Base, BaseModel):
__tablename__ = 'repositories'
__table_args__ = (UniqueConstraint('repo_name'), {'useexisting':True},)
repo_id = Column("repo_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
repo_name = Column("repo_name", String(length=None, convert_unicode=False, assert_unicode=None), nullable=False, unique=True, default=None)
repo_type = Column("repo_type", String(length=None, convert_unicode=False, assert_unicode=None), nullable=False, unique=False, default='hg')
user_id = Column("user_id", Integer(), ForeignKey('users.user_id'), nullable=False, unique=False, default=None)
private = Column("private", Boolean(), nullable=True, unique=None, default=None)
enable_statistics = Column("statistics", Boolean(), nullable=True, unique=None, default=True)
enable_downloads = Column("downloads", Boolean(), nullable=True, unique=None, default=True)
description = Column("description", String(length=None, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
fork_id = Column("fork_id", Integer(), ForeignKey('repositories.repo_id'), nullable=True, unique=False, default=None)
group_id = Column("group_id", Integer(), ForeignKey('groups.group_id'), nullable=True, unique=False, default=None)
fork = relationship('Repository', remote_side=repo_id)
group = relationship('Group')
repo_to_perm = relationship('RepoToPerm', cascade='all')
stats = relationship('Statistics', cascade='all', uselist=False)
repo_followers = relationship('UserFollowing', primaryjoin='UserFollowing.follows_repo_id==Repository.repo_id', cascade='all')
logs = relationship('UserLog', cascade='all')
self.repo_id, self.repo_name)
class Group(Base, BaseModel):
__tablename__ = 'groups'
__table_args__ = (UniqueConstraint('group_name'), {'useexisting':True},)
group_id = Column("group_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
group_name = Column("group_name", String(length=None, convert_unicode=False, assert_unicode=None), nullable=False, unique=True, default=None)
group_parent_id = Column("group_parent_id", Integer(), ForeignKey('groups.group_id'), nullable=True, unique=None, default=None)
parent_group = relationship('Group', remote_side=group_id)
def __init__(self, group_name='', parent_group=None):
self.group_name = group_name
self.parent_group = parent_group
return "<%s('%s:%s')>" % (self.__class__.__name__, self.group_id,
self.group_name)
class Permission(Base, BaseModel):
__tablename__ = 'permissions'
permission_id = Column("permission_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
permission_name = Column("permission_name", String(length=None, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
permission_longname = Column("permission_longname", String(length=None, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
self.permission_id, self.permission_name)
class RepoToPerm(Base, BaseModel):
__tablename__ = 'repo_to_perm'
__table_args__ = (UniqueConstraint('user_id', 'repository_id'), {'useexisting':True})
repo_to_perm_id = Column("repo_to_perm_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
permission_id = Column("permission_id", Integer(), ForeignKey('permissions.permission_id'), nullable=False, unique=None, default=None)
repository_id = Column("repository_id", Integer(), ForeignKey('repositories.repo_id'), nullable=False, unique=None, default=None)
permission = relationship('Permission')
class UserToPerm(Base, BaseModel):
__tablename__ = 'user_to_perm'
__table_args__ = (UniqueConstraint('user_id', 'permission_id'), {'useexisting':True})
user_to_perm_id = Column("user_to_perm_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
class UsersGroupToPerm(Base, BaseModel):
__tablename__ = 'users_group_to_perm'
__table_args__ = (UniqueConstraint('users_group_id', 'permission_id'), {'useexisting':True})
users_group_to_perm_id = Column("users_group_to_perm_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
class GroupToPerm(Base, BaseModel):
__tablename__ = 'group_to_perm'
__table_args__ = (UniqueConstraint('group_id', 'permission_id'), {'useexisting':True})
group_to_perm_id = Column("group_to_perm_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
group_id = Column("group_id", Integer(), ForeignKey('groups.group_id'), nullable=False, unique=None, default=None)
class Statistics(Base, BaseModel):
__tablename__ = 'statistics'
__table_args__ = (UniqueConstraint('repository_id'), {'useexisting':True})
stat_id = Column("stat_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
repository_id = Column("repository_id", Integer(), ForeignKey('repositories.repo_id'), nullable=False, unique=True, default=None)
stat_on_revision = Column("stat_on_revision", Integer(), nullable=False)
commit_activity = Column("commit_activity", LargeBinary(), nullable=False)#JSON data
commit_activity_combined = Column("commit_activity_combined", LargeBinary(), nullable=False)#JSON data
languages = Column("languages", LargeBinary(), nullable=False)#JSON data
repository = relationship('Repository', single_parent=True)
class UserFollowing(Base, BaseModel):
__tablename__ = 'user_followings'
__table_args__ = (UniqueConstraint('user_id', 'follows_repository_id'),
UniqueConstraint('user_id', 'follows_user_id')
, {'useexisting':True})
user_following_id = Column("user_following_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
follows_repo_id = Column("follows_repository_id", Integer(), ForeignKey('repositories.repo_id'), nullable=True, unique=None, default=None)
follows_user_id = Column("follows_user_id", Integer(), ForeignKey('users.user_id'), nullable=True, unique=None, default=None)
user = relationship('User', primaryjoin='User.user_id==UserFollowing.user_id')
follows_user = relationship('User', primaryjoin='User.user_id==UserFollowing.follows_user_id')
follows_repository = relationship('Repository', order_by='Repository.repo_name')
class CacheInvalidation(Base, BaseModel):
__tablename__ = 'cache_invalidation'
__table_args__ = (UniqueConstraint('cache_key'), {'useexisting':True})
cache_id = Column("cache_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
cache_key = Column("cache_key", String(length=None, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
cache_args = Column("cache_args", String(length=None, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
cache_active = Column("cache_active", Boolean(), nullable=True, unique=None, default=False)
def __init__(self, cache_key, cache_args=''):
self.cache_key = cache_key
self.cache_args = cache_args
self.cache_active = False
self.cache_id, self.cache_key)
class DbMigrateVersion(Base, BaseModel):
__tablename__ = 'db_migrate_version'
repository_id = Column('repository_id', String(250), primary_key=True)
repository_path = Column('repository_path', Text)
version = Column('version', Integer)
@@ -65,482 +65,485 @@ def ValidUsername(edit, old_data):
def validate_python(self, value, state):
if value in ['default', 'new_user']:
raise formencode.Invalid(_('Invalid username'), value, state)
#check if user is unique
old_un = None
if edit:
old_un = UserModel().get(old_data.get('user_id')).username
if old_un != value or not edit:
if UserModel().get_by_username(value, cache=False,
case_insensitive=True):
raise formencode.Invalid(_('This username already exists') ,
value, state)
if re.match(r'^[a-zA-Z0-9]{1}[a-zA-Z0-9\-\_\.]+$', value) is None:
raise formencode.Invalid(_('Username may only contain '
'alphanumeric characters underscores, '
'periods or dashes and must begin with '
'alphanumeric character'),
return _ValidUsername
def ValidUsersGroup(edit, old_data):
class _ValidUsersGroup(formencode.validators.FancyValidator):
if value in ['default']:
raise formencode.Invalid(_('Invalid group name'), value, state)
#check if group is unique
old_ugname = None
old_ugname = UsersGroupModel()\
.get(old_data.get('users_group_id')).users_group_name
if old_ugname != value or not edit:
if UsersGroupModel().get_by_groupname(value, cache=False,
raise formencode.Invalid(_('This users group already exists') ,
raise formencode.Invalid(_('Group name may only contain '
return _ValidUsersGroup
class ValidPassword(formencode.validators.FancyValidator):
def to_python(self, value, state):
if value:
if value.get('password'):
value['password'] = get_crypt_password(value['password'])
except UnicodeEncodeError:
e_dict = {'password':_('Invalid characters in password')}
raise formencode.Invalid('', value, state, error_dict=e_dict)
if value.get('password_confirmation'):
value['password_confirmation'] = \
get_crypt_password(value['password_confirmation'])
e_dict = {'password_confirmation':_('Invalid characters in password')}
if value.get('new_password'):
value['new_password'] = \
get_crypt_password(value['new_password'])
e_dict = {'new_password':_('Invalid characters in password')}
return value
class ValidPasswordsMatch(formencode.validators.FancyValidator):
if value['password'] != value['password_confirmation']:
e_dict = {'password_confirmation':
_('Password do not match')}
class ValidAuth(formencode.validators.FancyValidator):
messages = {
'invalid_password':_('invalid password'),
'invalid_login':_('invalid user name'),
'disabled_account':_('Your account is disabled')
}
#error mapping
e_dict = {'username':messages['invalid_login'],
'password':messages['invalid_password']}
e_dict_disable = {'username':messages['disabled_account']}
password = value['password']
username = value['username']
user = UserModel().get_by_username(username)
if authenticate(username, password):
else:
if user and user.active is False:
log.warning('user %s is disabled', username)
raise formencode.Invalid(self.message('disabled_account',
state=State_obj),
value, state,
error_dict=self.e_dict_disable)
log.warning('user %s not authenticated', username)
raise formencode.Invalid(self.message('invalid_password',
state=State_obj), value, state,
error_dict=self.e_dict)
class ValidRepoUser(formencode.validators.FancyValidator):
sa = meta.Session()
self.user_db = sa.query(User)\
.filter(User.active == True)\
.filter(User.username == value).one()
except Exception:
raise formencode.Invalid(_('This username is not valid'),
finally:
meta.Session.remove()
return self.user_db.user_id
def ValidRepoName(edit, old_data):
class _ValidRepoName(formencode.validators.FancyValidator):
slug = h.repo_name_slug(value)
if slug in ['_admin']:
raise formencode.Invalid(_('This repository name is disallowed'),
if old_data.get('repo_name') != value or not edit:
if RepoModel().get_by_repo_name(slug, cache=False):
raise formencode.Invalid(_('This repository already exists') ,
return slug
return _ValidRepoName
def ValidForkType(old_data):
class _ValidForkType(formencode.validators.FancyValidator):
if old_data['repo_type'] != value:
raise formencode.Invalid(_('Fork have to be the same type as original'),
return _ValidForkType
class ValidPerms(formencode.validators.FancyValidator):
messages = {'perm_new_member_name':_('This username or users group name'
' is not valid')}
perms_update = []
perms_new = []
#build a list of permission to update and new permission to create
for k, v in value.items():
if k.startswith('perm_'):
if k.startswith('perm_new_member'):
#means new added member to permissions
new_perm = value.get('perm_new_member', False)
new_member = value.get('perm_new_member_name', False)
new_type = value.get('perm_new_member_type')
if new_member and new_perm:
if (new_member, new_perm) not in perms_new:
if (new_member, new_perm, new_type) not in perms_new:
perms_new.append((new_member, new_perm, new_type))
usr = k[5:]
t = 'user'
if usr == 'default':
if value['private']:
#set none for default when updating to private repo
v = 'repository.none'
perms_update.append((usr, v, t))
value['perms_updates'] = perms_update
value['perms_new'] = perms_new
#update permissions
sa = meta.Session
for k, v, t in perms_new:
if t is 'user':
.filter(User.username == k).one()
if t is 'users_group':
self.user_db = sa.query(UsersGroup)\
.filter(UsersGroup.users_group_active == True)\
.filter(UsersGroup.users_group_name == k).one()
msg = self.message('perm_new_member_name',
state=State_obj)
raise formencode.Invalid(msg, value, state,
error_dict={'perm_new_member_name':msg})
class ValidSettings(formencode.validators.FancyValidator):
#settings form can't edit user
if value.has_key('user'):
del['value']['user']
class ValidPath(formencode.validators.FancyValidator):
if not os.path.isdir(value):
msg = _('This is not a valid path')
error_dict={'paths_root_path':msg})
def UniqSystemEmail(old_data):
class _UniqSystemEmail(formencode.validators.FancyValidator):
value = value.lower()
if old_data.get('email') != value:
user = sa.query(User).filter(User.email == value).scalar()
if user:
raise formencode.Invalid(_("This e-mail address is already taken") ,
return _UniqSystemEmail
class ValidSystemEmail(formencode.validators.FancyValidator):
if user is None:
raise formencode.Invalid(_("This e-mail address doesn't exist.") ,
class LdapLibValidator(formencode.validators.FancyValidator):
import ldap
except ImportError:
raise LdapImportError
class AttrLoginValidator(formencode.validators.FancyValidator):
if not value or not isinstance(value, (str, unicode)):
raise formencode.Invalid(_("The LDAP Login attribute of the CN must be specified "
"- this is the name of the attribute that is equivalent to 'username'"),
#===============================================================================
# FORMS
class LoginForm(formencode.Schema):
allow_extra_fields = True
filter_extra_fields = True
username = UnicodeString(
strip=True,
min=1,
not_empty=True,
messages={
'empty':_('Please enter a login'),
'tooShort':_('Enter a value %(min)i characters long or more')}
)
password = UnicodeString(
min=6,
'empty':_('Please enter a password'),
'tooShort':_('Enter %(min)i characters or more')}
#chained validators have access to all data
chained_validators = [ValidAuth]
def UserForm(edit=False, old_data={}):
class _UserForm(formencode.Schema):
username = All(UnicodeString(strip=True, min=1, not_empty=True),
ValidUsername(edit, old_data))
new_password = All(UnicodeString(strip=True, min=6, not_empty=False))
admin = StringBoolean(if_missing=False)
password = All(UnicodeString(strip=True, min=6, not_empty=True))
active = StringBoolean(if_missing=False)
name = UnicodeString(strip=True, min=1, not_empty=True)
lastname = UnicodeString(strip=True, min=1, not_empty=True)
email = All(Email(not_empty=True), UniqSystemEmail(old_data))
chained_validators = [ValidPassword]
return _UserForm
def UsersGroupForm(edit=False, old_data={}, available_members=[]):
class _UsersGroupForm(formencode.Schema):
users_group_name = All(UnicodeString(strip=True, min=1, not_empty=True),
ValidUsersGroup(edit, old_data))
users_group_active = StringBoolean(if_missing=False)
users_group_members = OneOf(available_members, hideList=False,
testValueList=True,
if_missing=None, not_empty=False)
return _UsersGroupForm
def RegisterForm(edit=False, old_data={}):
class _RegisterForm(formencode.Schema):
username = All(ValidUsername(edit, old_data),
UnicodeString(strip=True, min=1, not_empty=True))
password_confirmation = All(UnicodeString(strip=True, min=6, not_empty=True))
chained_validators = [ValidPasswordsMatch, ValidPassword]
return _RegisterForm
def PasswordResetForm():
class _PasswordResetForm(formencode.Schema):
email = All(ValidSystemEmail(), Email(not_empty=True))
return _PasswordResetForm
def RepoForm(edit=False, old_data={}, supported_backends=BACKENDS.keys()):
class _RepoForm(formencode.Schema):
filter_extra_fields = False
repo_name = All(UnicodeString(strip=True, min=1, not_empty=True),
ValidRepoName(edit, old_data))
description = UnicodeString(strip=True, min=1, not_empty=True)
private = StringBoolean(if_missing=False)
enable_statistics = StringBoolean(if_missing=False)
enable_downloads = StringBoolean(if_missing=False)
repo_type = OneOf(supported_backends)
#this is repo owner
user = All(Int(not_empty=True), ValidRepoUser)
chained_validators = [ValidPerms]
return _RepoForm
def RepoForkForm(edit=False, old_data={}, supported_backends=BACKENDS.keys()):
class _RepoForkForm(formencode.Schema):
fork_name = All(UnicodeString(strip=True, min=1, not_empty=True),
repo_type = All(ValidForkType(old_data), OneOf(supported_backends))
return _RepoForkForm
def RepoSettingsForm(edit=False, old_data={}):
chained_validators = [ValidPerms, ValidSettings]
def ApplicationSettingsForm():
class _ApplicationSettingsForm(formencode.Schema):
rhodecode_title = UnicodeString(strip=True, min=1, not_empty=True)
rhodecode_realm = UnicodeString(strip=True, min=1, not_empty=True)
rhodecode_ga_code = UnicodeString(strip=True, min=1, not_empty=False)
return _ApplicationSettingsForm
def ApplicationUiSettingsForm():
class _ApplicationUiSettingsForm(formencode.Schema):
web_push_ssl = OneOf(['true', 'false'], if_missing='false')
paths_root_path = All(ValidPath(), UnicodeString(strip=True, min=1, not_empty=True))
hooks_changegroup_update = OneOf(['True', 'False'], if_missing=False)
hooks_changegroup_repo_size = OneOf(['True', 'False'], if_missing=False)
hooks_pretxnchangegroup_push_logger = OneOf(['True', 'False'], if_missing=False)
hooks_preoutgoing_pull_logger = OneOf(['True', 'False'], if_missing=False)
return _ApplicationUiSettingsForm
def DefaultPermissionsForm(perms_choices, register_choices, create_choices):
class _DefaultPermissionsForm(formencode.Schema):
overwrite_default = StringBoolean(if_missing=False)
anonymous = OneOf(['True', 'False'], if_missing=False)
default_perm = OneOf(perms_choices)
default_register = OneOf(register_choices)
default_create = OneOf(create_choices)
return _DefaultPermissionsForm
def LdapSettingsForm(tls_reqcert_choices, search_scope_choices):
class _LdapSettingsForm(formencode.Schema):
pre_validators = [LdapLibValidator]
ldap_active = StringBoolean(if_missing=False)
ldap_host = UnicodeString(strip=True,)
ldap_port = Number(strip=True,)
ldap_ldaps = StringBoolean(if_missing=False)
ldap_tls_reqcert = OneOf(tls_reqcert_choices)
ldap_dn_user = UnicodeString(strip=True,)
ldap_dn_pass = UnicodeString(strip=True,)
ldap_base_dn = UnicodeString(strip=True,)
ldap_filter = UnicodeString(strip=True,)
ldap_search_scope = OneOf(search_scope_choices)
ldap_attr_login = All(AttrLoginValidator, UnicodeString(strip=True,))
ldap_attr_firstname = UnicodeString(strip=True,)
ldap_attr_lastname = UnicodeString(strip=True,)
ldap_attr_email = UnicodeString(strip=True,)
return _LdapSettingsForm
# -*- coding: utf-8 -*-
"""
rhodecode.model.repo
~~~~~~~~~~~~~~~~~~~~
Repository model for rhodecode
:created_on: Jun 5, 2010
:author: marcink
:copyright: (C) 2009-2011 Marcin Kuzminski <marcin@python-works.com>
:license: GPLv3, see COPYING for more details.
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; version 2
# of the License or (at your opinion) any later version of the license.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
# MA 02110-1301, USA.
import os
import shutil
import logging
import traceback
from datetime import datetime
from rhodecode.model import BaseModel
from rhodecode.model.caching_query import FromCache
from rhodecode.model.db import Repository, RepoToPerm, User, Permission, \
Statistics, UsersGroup
Statistics, UsersGroup, UsersGroupToPerm
from rhodecode.model.user import UserModel
from rhodecode.model.users_group import UsersGroupMember, UsersGroupModel
from vcs.backends import get_backend
log = logging.getLogger(__name__)
class RepoModel(BaseModel):
def __init__(self, sa=None):
from pylons import app_globals
self._base_path = app_globals.base_path
except:
self._base_path = None
def base_path():
if self._base_path is None:
raise Exception('Base Path is empty, try set this after'
'class initialization when not having '
'app_globals available')
return self._base_path
super(RepoModel, self).__init__()
def get(self, repo_id, cache=False):
repo = self.sa.query(Repository)\
.filter(Repository.repo_id == repo_id)
if cache:
repo = repo.options(FromCache("sql_cache_short",
"get_repo_%s" % repo_id))
return repo.scalar()
def get_by_repo_name(self, repo_name, cache=False):
.filter(Repository.repo_name == repo_name)
"get_repo_%s" % repo_name))
def get_users_js(self):
users = self.sa.query(User).filter(User.active == True).all()
u_tmpl = '''{id:%s, fname:"%s", lname:"%s", nname:"%s"},'''
users_array = '[%s]' % '\n'.join([u_tmpl % (u.user_id, u.name,
u.lastname, u.username)
for u in users])
return users_array
def get_users_groups_js(self):
users_groups = self.sa.query(UsersGroup)\
.filter(UsersGroup.users_group_active == True).all()
g_tmpl = '''{id:%s, grname:"%s",grmembers:"%s"},'''
users_groups_array = '[%s]' % '\n'.join([g_tmpl % \
(gr.users_group_id, gr.users_group_name,
len(gr.members))
for gr in users_groups])
return users_groups_array
def update(self, repo_name, form_data):
cur_repo = self.get_by_repo_name(repo_name, cache=False)
user_model = UserModel(self.sa)
users_group_model = UsersGroupModel(self.sa)
for username, perm, member_type in form_data['perms_updates']:
r2p = self.sa.query(RepoToPerm)\
.filter(RepoToPerm.user == user_model.get_by_username(username))\
.filter(RepoToPerm.repository == cur_repo)\
.one()
for member, perm, member_type in form_data['perms_updates']:
if member_type == 'user':
.filter(RepoToPerm.user == user_model.get_by_username(member))\
r2p.permission = self.sa.query(Permission)\
.filter(Permission.permission_name == perm)\
.scalar()
self.sa.add(r2p)
g2p = self.sa.query(UsersGroupToPerm)\
.filter(UsersGroupToPerm.users_group == users_group_model.get_by_groupname(member))\
.filter(UsersGroupToPerm.repository == cur_repo)\
g2p.permission = self.sa.query(Permission)\
self.sa.add(g2p)
#set new permissions
for username, perm, member_type in form_data['perms_new']:
r2p = RepoToPerm()
r2p.repository = cur_repo
r2p.user = user_model.get_by_username(username, cache=False)
for member, perm, member_type in form_data['perms_new']:
r2p.user = user_model.get_by_username(member)
g2p = UsersGroupToPerm()
g2p.repository = cur_repo
g2p.users_group = users_group_model.get_by_groupname(member)
#update current repo
for k, v in form_data.items():
if k == 'user':
cur_repo.user = user_model.get(v)
setattr(cur_repo, k, v)
self.sa.add(cur_repo)
if repo_name != form_data['repo_name']:
#rename our data
self.__rename_repo(repo_name, form_data['repo_name'])
self.sa.commit()
log.error(traceback.format_exc())
self.sa.rollback()
raise
def create(self, form_data, cur_user, just_db=False, fork=False):
if fork:
#force str since hg doesn't go with unicode
repo_name = str(form_data['fork_name'])
org_name = str(form_data['repo_name'])
org_name = repo_name = str(form_data['repo_name'])
new_repo = Repository()
new_repo.enable_statistics = True
if k == 'repo_name':
v = repo_name
setattr(new_repo, k, v)
parent_repo = self.sa.query(Repository)\
.filter(Repository.repo_name == org_name).scalar()
new_repo.fork = parent_repo
new_repo.user_id = cur_user.user_id
self.sa.add(new_repo)
#create default permission
repo_to_perm = RepoToPerm()
default = 'repository.read'
for p in UserModel(self.sa).get_by_username('default', cache=False).user_perms:
if p.permission.permission_name.startswith('repository.'):
default = p.permission.permission_name
break
default_perm = 'repository.none' if form_data['private'] else default
repo_to_perm.permission_id = self.sa.query(Permission)\
.filter(Permission.permission_name == default_perm)\
.one().permission_id
repo_to_perm.repository_id = new_repo.repo_id
repo_to_perm.user_id = UserModel(self.sa)\
.get_by_username('default', cache=False).user_id
self.sa.add(repo_to_perm)
#now automatically start following this repository as owner
from rhodecode.model.scm import ScmModel
ScmModel(self.sa).toggle_following_repo(new_repo.repo_id,
cur_user.user_id)
if not just_db:
self.__create_repo(repo_name, form_data['repo_type'])
def create_fork(self, form_data, cur_user):
from rhodecode.lib.celerylib import tasks, run_task
run_task(tasks.create_repo_fork, form_data, cur_user)
def delete(self, repo):
self.sa.delete(repo)
self.__delete_repo(repo)
def delete_perm_user(self, form_data, repo_name):
self.sa.query(RepoToPerm)\
.filter(RepoToPerm.repository \
== self.get_by_repo_name(repo_name))\
.filter(RepoToPerm.user_id == form_data['user_id']).delete()
def delete_stats(self, repo_name):
self.sa.query(Statistics)\
.filter(Statistics.repository == \
self.get_by_repo_name(repo_name)).delete()
def __create_repo(self, repo_name, alias):
makes repository on filesystem
:param repo_name:
:param alias:
from rhodecode.lib.utils import check_repo
repo_path = os.path.join(self.base_path, repo_name)
if check_repo(repo_name, self.base_path):
log.info('creating repo %s in %s', repo_name, repo_path)
backend = get_backend(alias)
backend(repo_path, create=True)
def __rename_repo(self, old, new):
renames repository on filesystem
:param old: old name
:param new: new name
log.info('renaming repo from %s to %s', old, new)
old_path = os.path.join(self.base_path, old)
new_path = os.path.join(self.base_path, new)
if os.path.isdir(new_path):
raise Exception('Was trying to rename to already existing dir %s',
new_path)
shutil.move(old_path, new_path)
def __delete_repo(self, repo):
removes repo from filesystem, the removal is acctually made by
added rm__ prefix into dir, and rename internat .hg/.git dirs so this
repository is no longer valid for rhodecode, can be undeleted later on
by reverting the renames on this repository
:param repo: repo object
rm_path = os.path.join(self.base_path, repo.repo_name)
log.info("Removing %s", rm_path)
#disable hg/git
alias = repo.repo_type
shutil.move(os.path.join(rm_path, '.%s' % alias),
os.path.join(rm_path, 'rm__.%s' % alias))
#disable repo
shutil.move(rm_path, os.path.join(self.base_path, 'rm__%s__%s' \
% (datetime.today(), repo.repo_name)))
Status change: