@@ -141,197 +141,197 @@ class ReposController(BaseController):
#fill repository groups
for p in c.repo_info.users_group_to_perm:
defaults.update({'g_perm_%s' % p.users_group.users_group_name:
p.permission.permission_name})
return defaults
@HasPermissionAllDecorator('hg.admin')
def index(self, format='html'):
"""GET /repos: All items in the collection"""
# url('repos')
c.repos_list = ScmModel().get_repos(Repository.query()
.order_by(Repository.repo_name)
.all(), sort_key='name_sort')
return render('admin/repos/repos.html')
@HasPermissionAnyDecorator('hg.admin', 'hg.create.repository')
def create(self):
"""
POST /repos: Create a new item"""
repo_model = RepoModel()
self.__load_defaults()
form_result = {}
try:
form_result = RepoForm(repo_groups=c.repo_groups_choices)()\
.to_python(dict(request.POST))
repo_model.create(form_result, self.rhodecode_user)
if form_result['clone_uri']:
h.flash(_('created repository %s from %s') \
% (form_result['repo_name'], form_result['clone_uri']),
category='success')
else:
h.flash(_('created repository %s') % form_result['repo_name'],
if request.POST.get('user_created'):
#created by regular non admin user
action_logger(self.rhodecode_user, 'user_created_repo',
form_result['repo_name_full'], '', self.sa)
action_logger(self.rhodecode_user, 'admin_created_repo',
except formencode.Invalid, errors:
c.new_repo = errors.value['repo_name']
r = render('admin/repos/repo_add_create_repository.html')
r = render('admin/repos/repo_add.html')
return htmlfill.render(
r,
defaults=errors.value,
errors=errors.error_dict or {},
prefix_error=False,
encoding="UTF-8")
except Exception:
log.error(traceback.format_exc())
msg = _('error occurred during creation of repository %s') \
% form_result.get('repo_name')
h.flash(msg, category='error')
return redirect(url('home'))
return redirect(url('repos'))
def new(self, format='html'):
"""GET /repos/new: Form to create a new item"""
new_repo = request.GET.get('repo', '')
c.new_repo = repo_name_slug(new_repo)
return render('admin/repos/repo_add.html')
def update(self, repo_name):
PUT /repos/repo_name: Update an existing item"""
# Forms posted to this method should contain a hidden field:
# <input type="hidden" name="_method" value="PUT" />
# Or using helpers:
# h.form(url('repo', repo_name=ID),
# method='put')
# url('repo', repo_name=ID)
changed_name = repo_name
_form = RepoForm(edit=True, old_data={'repo_name': repo_name},
repo_groups=c.repo_groups_choices)()
form_result = _form.to_python(dict(request.POST))
repo_model.update(repo_name, form_result)
repo = repo_model.update(repo_name, form_result)
invalidate_cache('get_repo_cached_%s' % repo_name)
h.flash(_('Repository %s updated successfully' % repo_name),
changed_name = form_result['repo_name_full']
changed_name = repo.repo_name
action_logger(self.rhodecode_user, 'admin_updated_repo',
changed_name, '', self.sa)
defaults = self.__load_data(repo_name)
defaults.update(errors.value)
render('admin/repos/repo_edit.html'),
defaults=defaults,
h.flash(_('error occurred during update of repository %s') \
% repo_name, category='error')
return redirect(url('edit_repo', repo_name=changed_name))
def delete(self, repo_name):
DELETE /repos/repo_name: Delete an existing item"""
# <input type="hidden" name="_method" value="DELETE" />
# method='delete')
repo = repo_model.get_by_repo_name(repo_name)
if not repo:
h.flash(_('%s repository is not mapped to db perhaps'
' it was moved or renamed from the filesystem'
' please run the application again'
' in order to rescan repositories') % repo_name,
category='error')
action_logger(self.rhodecode_user, 'admin_deleted_repo',
repo_name, '', self.sa)
repo_model.delete(repo)
h.flash(_('deleted repository %s') % repo_name, category='success')
except IntegrityError, e:
if e.message.find('repositories_fork_id_fkey'):
h.flash(_('Cannot delete %s it still contains attached '
'forks') % repo_name,
category='warning')
h.flash(_('An error occurred during '
'deletion of %s') % repo_name,
except Exception, e:
h.flash(_('An error occurred during deletion of %s') % repo_name,
def delete_perm_user(self, repo_name):
DELETE an existing repository permission user
:param repo_name:
repo_model.delete_perm_user(request.POST, repo_name)
h.flash(_('An error occurred during deletion of repository user'),
raise HTTPInternalServerError()
def delete_perm_users_group(self, repo_name):
DELETE an existing repository permission users group
repo_model.delete_perm_users_group(request.POST, repo_name)
h.flash(_('An error occurred during deletion of repository'
' users groups'),
@@ -191,193 +191,193 @@ class RhodeCodeSettings(Base, BaseModel)
class RhodeCodeUi(Base, BaseModel):
__tablename__ = 'rhodecode_ui'
__table_args__ = (UniqueConstraint('ui_key'), {'extend_existing':True})
HOOK_UPDATE = 'changegroup.update'
HOOK_REPO_SIZE = 'changegroup.repo_size'
HOOK_PUSH = 'pretxnchangegroup.push_logger'
HOOK_PULL = 'preoutgoing.pull_logger'
ui_id = Column("ui_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
ui_section = Column("ui_section", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
ui_key = Column("ui_key", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
ui_value = Column("ui_value", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
ui_active = Column("ui_active", Boolean(), nullable=True, unique=None, default=True)
@classmethod
def get_by_key(cls, key):
return Session.query(cls).filter(cls.ui_key == key)
def get_builtin_hooks(cls):
q = cls.query()
q = q.filter(cls.ui_key.in_([cls.HOOK_UPDATE,
cls.HOOK_REPO_SIZE,
cls.HOOK_PUSH, cls.HOOK_PULL]))
return q.all()
def get_custom_hooks(cls):
q = q.filter(~cls.ui_key.in_([cls.HOOK_UPDATE,
q = q.filter(cls.ui_section == 'hooks')
def create_or_update_hook(cls, key, val):
new_ui = cls.get_by_key(key).scalar() or cls()
new_ui.ui_section = 'hooks'
new_ui.ui_active = True
new_ui.ui_key = key
new_ui.ui_value = val
Session.add(new_ui)
Session.commit()
class User(Base, BaseModel):
__tablename__ = 'users'
__table_args__ = (UniqueConstraint('username'), UniqueConstraint('email'), {'extend_existing':True})
user_id = Column("user_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
username = Column("username", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
password = Column("password", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
active = Column("active", Boolean(), nullable=True, unique=None, default=None)
admin = Column("admin", Boolean(), nullable=True, unique=None, default=False)
name = Column("name", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
lastname = Column("lastname", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
email = Column("email", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
last_login = Column("last_login", DateTime(timezone=False), nullable=True, unique=None, default=None)
ldap_dn = Column("ldap_dn", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
api_key = Column("api_key", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
user_log = relationship('UserLog', cascade='all')
user_perms = relationship('UserToPerm', primaryjoin="User.user_id==UserToPerm.user_id", cascade='all')
repositories = relationship('Repository')
user_followers = relationship('UserFollowing', primaryjoin='UserFollowing.follows_user_id==User.user_id', cascade='all')
repo_to_perm = relationship('RepoToPerm', primaryjoin='RepoToPerm.user_id==User.user_id', cascade='all')
group_member = relationship('UsersGroupMember', cascade='all')
@property
def full_contact(self):
return '%s %s <%s>' % (self.name, self.lastname, self.email)
def short_contact(self):
return '%s %s' % (self.name, self.lastname)
def is_admin(self):
return self.admin
def __repr__(self):
return "<%s('id:%s:%s')>" % (self.__class__.__name__,
self.user_id, self.username)
except:
return self.__class__.__name__
def get_by_username(cls, username, case_insensitive=False):
if case_insensitive:
return Session.query(cls).filter(cls.username.like(username)).scalar()
return Session.query(cls).filter(cls.username.ilike(username)).scalar()
return Session.query(cls).filter(cls.username == username).scalar()
def get_by_api_key(cls, api_key):
return Session.query(cls).filter(cls.api_key == api_key).one()
def update_lastlogin(self):
"""Update user lastlogin"""
self.last_login = datetime.datetime.now()
Session.add(self)
log.debug('updated user %s lastlogin', self.username)
def create(cls, form_data):
from rhodecode.lib.auth import get_crypt_password
new_user = cls()
for k, v in form_data.items():
if k == 'password':
v = get_crypt_password(v)
setattr(new_user, k, v)
new_user.api_key = generate_api_key(form_data['username'])
Session.add(new_user)
return new_user
Session.rollback()
raise
class UserLog(Base, BaseModel):
__tablename__ = 'user_logs'
__table_args__ = {'extend_existing':True}
user_log_id = Column("user_log_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
user_id = Column("user_id", Integer(), ForeignKey('users.user_id'), nullable=False, unique=None, default=None)
repository_id = Column("repository_id", Integer(), ForeignKey('repositories.repo_id'), nullable=False, unique=None, default=None)
repository_name = Column("repository_name", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
user_ip = Column("user_ip", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
action = Column("action", UnicodeText(length=1200000, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
action_date = Column("action_date", DateTime(timezone=False), nullable=True, unique=None, default=None)
def action_as_day(self):
return date(*self.action_date.timetuple()[:3])
user = relationship('User')
repository = relationship('Repository')
class UsersGroup(Base, BaseModel):
__tablename__ = 'users_groups'
users_group_id = Column("users_group_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
users_group_name = Column("users_group_name", String(length=255, convert_unicode=False, assert_unicode=None), nullable=False, unique=True, default=None)
users_group_active = Column("users_group_active", Boolean(), nullable=True, unique=None, default=None)
members = relationship('UsersGroupMember', cascade="all, delete, delete-orphan", lazy="joined")
return '<userGroup(%s)>' % (self.users_group_name)
def get_by_group_name(cls, group_name, cache=False, case_insensitive=False):
gr = Session.query(cls)\
.filter(cls.users_group_name.ilike(group_name))
gr = Session.query(UsersGroup)\
.filter(UsersGroup.users_group_name == group_name)
if cache:
gr = gr.options(FromCache("sql_cache_short",
"get_user_%s" % group_name))
return gr.scalar()
def get(cls, users_group_id, cache=False):
users_group = Session.query(cls)
users_group = users_group.options(FromCache("sql_cache_short",
"get_users_group_%s" % users_group_id))
return users_group.get(users_group_id)
new_users_group = cls()
setattr(new_users_group, k, v)
@@ -462,192 +462,201 @@ class Repository(Base, BaseModel):
repo_type = Column("repo_type", String(length=255, convert_unicode=False, assert_unicode=None), nullable=False, unique=False, default='hg')
user_id = Column("user_id", Integer(), ForeignKey('users.user_id'), nullable=False, unique=False, default=None)
private = Column("private", Boolean(), nullable=True, unique=None, default=None)
enable_statistics = Column("statistics", Boolean(), nullable=True, unique=None, default=True)
enable_downloads = Column("downloads", Boolean(), nullable=True, unique=None, default=True)
description = Column("description", String(length=10000, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
created_on = Column('created_on', DateTime(timezone=False), nullable=True, unique=None, default=datetime.datetime.now)
fork_id = Column("fork_id", Integer(), ForeignKey('repositories.repo_id'), nullable=True, unique=False, default=None)
group_id = Column("group_id", Integer(), ForeignKey('groups.group_id'), nullable=True, unique=False, default=None)
fork = relationship('Repository', remote_side=repo_id)
group = relationship('Group')
repo_to_perm = relationship('RepoToPerm', cascade='all', order_by='RepoToPerm.repo_to_perm_id')
users_group_to_perm = relationship('UsersGroupRepoToPerm', cascade='all')
stats = relationship('Statistics', cascade='all', uselist=False)
followers = relationship('UserFollowing', primaryjoin='UserFollowing.follows_repo_id==Repository.repo_id', cascade='all')
logs = relationship('UserLog', cascade='all')
return "<%s('%s:%s')>" % (self.__class__.__name__,
self.repo_id, self.repo_name)
def get_by_repo_name(cls, repo_name):
q = Session.query(cls).filter(cls.repo_name == repo_name)
q = q.options(joinedload(Repository.fork))\
.options(joinedload(Repository.user))\
.options(joinedload(Repository.group))\
return q.one()
def get_repo_forks(cls, repo_id):
return Session.query(cls).filter(Repository.fork_id == repo_id)
def base_path(cls):
Returns base path when all repos are stored
:param cls:
q = Session.query(RhodeCodeUi).filter(RhodeCodeUi.ui_key == '/')
q.options(FromCache("sql_cache_short", "repository_repo_path"))
return q.one().ui_value
def just_name(self):
return self.repo_name.split(os.sep)[-1]
def groups_with_parents(self):
groups = []
if self.group is None:
return groups
cur_gr = self.group
groups.insert(0, cur_gr)
while 1:
gr = getattr(cur_gr, 'parent_group', None)
cur_gr = cur_gr.parent_group
if gr is None:
break
groups.insert(0, gr)
def groups_and_repo(self):
return self.groups_with_parents, self.just_name
@LazyProperty
def repo_path(self):
Returns base full path for that repository means where it actually
exists on a filesystem
def repo_full_path(self):
p = [self.repo_path]
# we need to split the name by / since this is how we store the
# names in the database, but that eventually needs to be converted
# into a valid system path
p += self.repo_name.split('/')
return os.path.join(*p)
def get_new_name(self, repo_name):
returns new full repository name based on assigned group and new new
:param group_name:
path_prefix = self.group.full_path_splitted if self.group else []
return '/'.join(path_prefix + [repo_name])
def _ui(self):
Creates an db based ui object for this repository
from mercurial import ui
from mercurial import config
baseui = ui.ui()
#clean the baseui object
baseui._ocfg = config.config()
baseui._ucfg = config.config()
baseui._tcfg = config.config()
ret = Session.query(RhodeCodeUi)\
.options(FromCache("sql_cache_short", "repository_repo_ui")).all()
hg_ui = ret
for ui_ in hg_ui:
if ui_.ui_active:
log.debug('settings ui from db[%s]%s:%s', ui_.ui_section,
ui_.ui_key, ui_.ui_value)
baseui.setconfig(ui_.ui_section, ui_.ui_key, ui_.ui_value)
return baseui
def is_valid(cls, repo_name):
returns True if given repo name is a valid filesystem repository
@param cls:
@param repo_name:
from rhodecode.lib.utils import is_valid_repo
return is_valid_repo(repo_name, cls.base_path())
#==========================================================================
# SCM PROPERTIES
def get_changeset(self, rev):
return get_changeset_safe(self.scm_instance, rev)
def tip(self):
return self.get_changeset('tip')
def author(self):
return self.tip.author
def last_change(self):
return self.scm_instance.last_change
# SCM CACHE INSTANCE
def invalidate(self):
Returns Invalidation object if this repo should be invalidated
None otherwise. `cache_active = False` means that this cache
state is not valid and needs to be invalidated
return Session.query(CacheInvalidation)\
.filter(CacheInvalidation.cache_key == self.repo_name)\
.filter(CacheInvalidation.cache_active == False)\
.scalar()
def set_invalidate(self):
set a cache for invalidation for this instance
inv = Session.query(CacheInvalidation)\
if inv is None:
inv = CacheInvalidation(self.repo_name)
inv.cache_active = True
Session.add(inv)
def scm_instance(self):
return self.__get_instance()
def scm_instance_cached(self):
@cache_region('long_term')
@@ -29,193 +29,193 @@ from formencode import All
from formencode.validators import UnicodeString, OneOf, Int, Number, Regex, \
Email, Bool, StringBoolean, Set
from pylons.i18n.translation import _
from webhelpers.pylonslib.secure_form import authentication_token
from rhodecode.config.routing import ADMIN_PREFIX
from rhodecode.lib.utils import repo_name_slug
from rhodecode.lib.auth import authenticate, get_crypt_password
from rhodecode.lib.exceptions import LdapImportError
from rhodecode.model.user import UserModel
from rhodecode.model.repo import RepoModel
from rhodecode.model.db import User, UsersGroup, Group
from rhodecode import BACKENDS
log = logging.getLogger(__name__)
#this is needed to translate the messages using _() in validators
class State_obj(object):
_ = staticmethod(_)
#==============================================================================
# VALIDATORS
class ValidAuthToken(formencode.validators.FancyValidator):
messages = {'invalid_token':_('Token mismatch')}
def validate_python(self, value, state):
if value != authentication_token():
raise formencode.Invalid(self.message('invalid_token', state,
search_number=value), value, state)
def ValidUsername(edit, old_data):
class _ValidUsername(formencode.validators.FancyValidator):
if value in ['default', 'new_user']:
raise formencode.Invalid(_('Invalid username'), value, state)
#check if user is unique
old_un = None
if edit:
old_un = UserModel().get(old_data.get('user_id')).username
if old_un != value or not edit:
if User.get_by_username(value, case_insensitive=True):
raise formencode.Invalid(_('This username already '
'exists') , value, state)
if re.match(r'^[a-zA-Z0-9]{1}[a-zA-Z0-9\-\_\.]+$', value) is None:
raise formencode.Invalid(_('Username may only contain '
'alphanumeric characters '
'underscores, periods or dashes '
'and must begin with alphanumeric '
'character'), value, state)
return _ValidUsername
def ValidUsersGroup(edit, old_data):
class _ValidUsersGroup(formencode.validators.FancyValidator):
if value in ['default']:
raise formencode.Invalid(_('Invalid group name'), value, state)
#check if group is unique
old_ugname = None
old_ugname = UsersGroup.get(
old_data.get('users_group_id')).users_group_name
if old_ugname != value or not edit:
if UsersGroup.get_by_group_name(value, cache=False,
case_insensitive=True):
raise formencode.Invalid(_('This users group '
'already exists') , value,
state)
raise formencode.Invalid(_('Group name may only contain '
return _ValidUsersGroup
def ValidReposGroup(edit, old_data):
class _ValidReposGroup(formencode.validators.FancyValidator):
#TODO WRITE VALIDATIONS
group_name = value.get('group_name')
group_parent_id = int(value.get('group_parent_id') or - 1)
group_parent_id = int(value.get('group_parent_id') or -1)
# slugify repo group just in case :)
slug = repo_name_slug(group_name)
# check for parent of self
if edit and old_data['group_id'] == group_parent_id:
e_dict = {'group_parent_id':_('Cannot assign this group '
'as parent')}
raise formencode.Invalid('', value, state,
error_dict=e_dict)
old_gname = None
old_gname = Group.get(
old_data.get('group_id')).group_name
if old_gname != group_name or not edit:
# check filesystem
gr = Group.query().filter(Group.group_name == slug)\
.filter(Group.group_parent_id == group_parent_id).scalar()
if gr:
e_dict = {'group_name':_('This group already exists')}
return _ValidReposGroup
class ValidPassword(formencode.validators.FancyValidator):
def to_python(self, value, state):
if value:
if value.get('password'):
value['password'] = get_crypt_password(value['password'])
except UnicodeEncodeError:
e_dict = {'password':_('Invalid characters in password')}
raise formencode.Invalid('', value, state, error_dict=e_dict)
if value.get('password_confirmation'):
value['password_confirmation'] = \
get_crypt_password(value['password_confirmation'])
e_dict = {'password_confirmation':_('Invalid characters in password')}
if value.get('new_password'):
value['new_password'] = \
get_crypt_password(value['new_password'])
e_dict = {'new_password':_('Invalid characters in password')}
return value
class ValidPasswordsMatch(formencode.validators.FancyValidator):
if value['password'] != value['password_confirmation']:
e_dict = {'password_confirmation':
_('Passwords do not match')}
class ValidAuth(formencode.validators.FancyValidator):
messages = {
'invalid_password':_('invalid password'),
'invalid_login':_('invalid user name'),
'disabled_account':_('Your account is disabled')
}
#error mapping
e_dict = {'username':messages['invalid_login'],
'password':messages['invalid_password']}
e_dict_disable = {'username':messages['disabled_account']}
password = value['password']
username = value['username']
user = User.get_by_username(username)
if authenticate(username, password):
if user and user.active is False:
log.warning('user %s is disabled', username)
raise formencode.Invalid(self.message('disabled_account',
state=State_obj),
value, state,
error_dict=self.e_dict_disable)
log.warning('user %s not authenticated', username)
@@ -5,330 +5,333 @@
Repository model for rhodecode
:created_on: Jun 5, 2010
:author: marcink
:copyright: (C) 2009-2011 Marcin Kuzminski <marcin@python-works.com>
:license: GPLv3, see COPYING for more details.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
import shutil
import logging
import traceback
from datetime import datetime
from sqlalchemy.orm import joinedload, make_transient
from vcs.utils.lazy import LazyProperty
from vcs.backends import get_backend
from rhodecode.lib import safe_str
from rhodecode.model import BaseModel
from rhodecode.model.caching_query import FromCache
from rhodecode.model.db import Repository, RepoToPerm, User, Permission, \
Statistics, UsersGroup, UsersGroupRepoToPerm, RhodeCodeUi, Group
class RepoModel(BaseModel):
def repos_path(self):
"""Get's the repositories root path from database
q = self.sa.query(RhodeCodeUi).filter(RhodeCodeUi.ui_key == '/').one()
return q.ui_value
def get(self, repo_id, cache=False):
repo = self.sa.query(Repository)\
.filter(Repository.repo_id == repo_id)
repo = repo.options(FromCache("sql_cache_short",
"get_repo_%s" % repo_id))
return repo.scalar()
def get_by_repo_name(self, repo_name, cache=False):
.filter(Repository.repo_name == repo_name)
"get_repo_%s" % repo_name))
def get_users_js(self):
users = self.sa.query(User).filter(User.active == True).all()
u_tmpl = '''{id:%s, fname:"%s", lname:"%s", nname:"%s"},'''
users_array = '[%s]' % '\n'.join([u_tmpl % (u.user_id, u.name,
u.lastname, u.username)
for u in users])
return users_array
def get_users_groups_js(self):
users_groups = self.sa.query(UsersGroup)\
.filter(UsersGroup.users_group_active == True).all()
g_tmpl = '''{id:%s, grname:"%s",grmembers:"%s"},'''
users_groups_array = '[%s]' % '\n'.join([g_tmpl % \
(gr.users_group_id, gr.users_group_name,
len(gr.members))
for gr in users_groups])
return users_groups_array
def update(self, repo_name, form_data):
cur_repo = self.get_by_repo_name(repo_name, cache=False)
#update permissions
# update permissions
for member, perm, member_type in form_data['perms_updates']:
if member_type == 'user':
r2p = self.sa.query(RepoToPerm)\
.filter(RepoToPerm.user == User.get_by_username(member))\
.filter(RepoToPerm.repository == cur_repo)\
.one()
r2p.permission = self.sa.query(Permission)\
.filter(Permission.permission_name ==
perm).scalar()
self.sa.add(r2p)
g2p = self.sa.query(UsersGroupRepoToPerm)\
.filter(UsersGroupRepoToPerm.users_group ==
UsersGroup.get_by_group_name(member))\
.filter(UsersGroupRepoToPerm.repository ==
cur_repo).one()
g2p.permission = self.sa.query(Permission)\
self.sa.add(g2p)
#set new permissions
# set new permissions
for member, perm, member_type in form_data['perms_new']:
r2p = RepoToPerm()
r2p.repository = cur_repo
r2p.user = User.get_by_username(member)
.filter(Permission.
permission_name == perm)\
g2p = UsersGroupRepoToPerm()
g2p.repository = cur_repo
g2p.users_group = UsersGroup.get_by_group_name(member)
#update current repo
# update current repo
if k == 'user':
cur_repo.user = User.get_by_username(v)
elif k == 'repo_name':
cur_repo.repo_name = form_data['repo_name_full']
pass
elif k == 'repo_group':
cur_repo.group_id = v
setattr(cur_repo, k, v)
new_name = cur_repo.get_new_name(form_data['repo_name'])
cur_repo.repo_name = new_name
self.sa.add(cur_repo)
if repo_name != form_data['repo_name_full']:
if repo_name != new_name:
# rename repository
self.__rename_repo(old=repo_name,
new=form_data['repo_name_full'])
self.__rename_repo(old=repo_name, new=new_name)
self.sa.commit()
return cur_repo
self.sa.rollback()
def create(self, form_data, cur_user, just_db=False, fork=False):
if fork:
repo_name = form_data['fork_name']
org_name = form_data['repo_name']
org_full_name = org_name
org_name = repo_name = form_data['repo_name']
repo_name_full = form_data['repo_name_full']
new_repo = Repository()
new_repo.enable_statistics = False
if k == 'repo_name':
v = repo_name
v = repo_name_full
if k == 'repo_group':
k = 'group_id'
if k == 'description':
v = v or repo_name
setattr(new_repo, k, v)
parent_repo = self.sa.query(Repository)\
.filter(Repository.repo_name == org_full_name).one()
new_repo.fork = parent_repo
new_repo.user_id = cur_user.user_id
self.sa.add(new_repo)
#create default permission
repo_to_perm = RepoToPerm()
default = 'repository.read'
for p in User.get_by_username('default').user_perms:
if p.permission.permission_name.startswith('repository.'):
default = p.permission.permission_name
default_perm = 'repository.none' if form_data['private'] else default
repo_to_perm.permission_id = self.sa.query(Permission)\
.filter(Permission.permission_name == default_perm)\
.one().permission_id
repo_to_perm.repository = new_repo
repo_to_perm.user_id = User.get_by_username('default').user_id
self.sa.add(repo_to_perm)
if not just_db:
self.__create_repo(repo_name, form_data['repo_type'],
form_data['repo_group'],
form_data['clone_uri'])
#now automatically start following this repository as owner
from rhodecode.model.scm import ScmModel
ScmModel(self.sa).toggle_following_repo(new_repo.repo_id,
cur_user.user_id)
return new_repo
def create_fork(self, form_data, cur_user):
from rhodecode.lib.celerylib import tasks, run_task
run_task(tasks.create_repo_fork, form_data, cur_user)
def delete(self, repo):
self.sa.delete(repo)
self.__delete_repo(repo)
def delete_perm_user(self, form_data, repo_name):
self.sa.query(RepoToPerm)\
.filter(RepoToPerm.repository \
== self.get_by_repo_name(repo_name))\
.filter(RepoToPerm.user_id == form_data['user_id']).delete()
def delete_perm_users_group(self, form_data, repo_name):
self.sa.query(UsersGroupRepoToPerm)\
.filter(UsersGroupRepoToPerm.repository \
.filter(UsersGroupRepoToPerm.users_group_id \
== form_data['users_group_id']).delete()
def delete_stats(self, repo_name):
self.sa.query(Statistics)\
.filter(Statistics.repository == \
self.get_by_repo_name(repo_name)).delete()
def __create_repo(self, repo_name, alias, new_parent_id, clone_uri=False):
makes repository on filesystem. It's group aware means it'll create
a repository within a group, and alter the paths accordingly of
group location
:param alias:
:param parent_id:
:param clone_uri:
if new_parent_id:
paths = Group.get(new_parent_id).full_path.split(Group.url_sep())
new_parent_path = os.sep.join(paths)
new_parent_path = ''
repo_path = os.path.join(*map(lambda x:safe_str(x),
[self.repos_path, new_parent_path, repo_name]))
if is_valid_repo(repo_path, self.repos_path) is False:
log.info('creating repo %s in %s @ %s', repo_name, repo_path,
clone_uri)
backend = get_backend(alias)
backend(repo_path, create=True, src_url=clone_uri)
def __rename_repo(self, old, new):
renames repository on filesystem
:param old: old name
:param new: new name
log.info('renaming repo from %s to %s', old, new)
old_path = os.path.join(self.repos_path, old)
new_path = os.path.join(self.repos_path, new)
@@ -44,113 +44,119 @@ class ReposGroupModel(BaseModel):
Get's the repositories root path from database
q = RhodeCodeUi.get_by_key('/').one()
def __create_group(self, group_name):
makes repositories group on filesystem
create_path = os.path.join(self.repos_path, group_name)
log.debug('creating new group in %s', create_path)
if os.path.isdir(create_path):
raise Exception('That directory already exists !')
os.makedirs(create_path)
def __rename_group(self, old, new):
Renames a group on filesystem
if old == new:
log.debug('skipping group rename')
return
log.debug('renaming repos group from %s to %s', old, new)
log.debug('renaming repos paths from %s to %s', old_path, new_path)
if os.path.isdir(new_path):
raise Exception('Was trying to rename to already '
'existing dir %s' % new_path)
shutil.move(old_path, new_path)
def __delete_group(self, group):
Deletes a group from a filesystem
:param group: instance of group from database
paths = group.full_path.split(Group.url_sep())
paths = os.sep.join(paths)
rm_path = os.path.join(self.repos_path, paths)
os.rmdir(rm_path)
def create(self, form_data):
new_repos_group = Group()
new_repos_group.group_description = form_data['group_description']
new_repos_group.parent_group = Group.get(form_data['group_parent_id'])
new_repos_group.group_name = new_repos_group.get_new_name(form_data['group_name'])
self.sa.add(new_repos_group)
self.__create_group(new_repos_group.group_name)
return new_repos_group
def update(self, repos_group_id, form_data):
repos_group = Group.get(repos_group_id)
old_path = repos_group.full_path
#change properties
repos_group.group_description = form_data['group_description']
repos_group.parent_group = Group.get(form_data['group_parent_id'])
repos_group.group_name = repos_group.get_new_name(form_data['group_name'])
new_path = repos_group.full_path
self.sa.add(repos_group)
self.__rename_group(old_path, new_path)
# we need to get all repositories from this new group and
# rename them accordingly to new group path
for r in repos_group.repositories:
r.repo_name = r.get_new_name(r.just_name)
self.sa.add(r)
return repos_group
def delete(self, users_group_id):
users_group = Group.get(users_group_id)
self.sa.delete(users_group)
self.__delete_group(users_group)
@@ -52,208 +52,208 @@ class TestLoginController(TestController
{'username':'test_admin',
'password':'as'})
self.assertEqual(response.status, '200 OK')
self.assertTrue('Enter 3 characters or more' in response.body)
def test_login_wrong_username_password(self):
response = self.app.post(url(controller='login', action='index'),
{'username':'error',
'password':'test12'})
self.assertEqual(response.status , '200 OK')
self.assertTrue('invalid user name' in response.body)
self.assertTrue('invalid password' in response.body)
# REGISTRATIONS
def test_register(self):
response = self.app.get(url(controller='login', action='register'))
self.assertTrue('Sign Up to RhodeCode' in response.body)
def test_register_err_same_username(self):
response = self.app.post(url(controller='login', action='register'),
'password':'test12',
'password_confirmation':'test12',
'email':'goodmail@domain.com',
'name':'test',
'lastname':'test'})
self.assertTrue('This username already exists' in response.body)
def test_register_err_same_email(self):
{'username':'test_admin_0',
'email':'test_admin@mail.com',
assert 'This e-mail address is already taken' in response.body
def test_register_err_same_email_case_sensitive(self):
{'username':'test_admin_1',
'email':'TesT_Admin@mail.COM',
def test_register_err_wrong_data(self):
{'username':'xs',
'password':'test',
'password_confirmation':'test',
'email':'goodmailm',
assert 'An email address must contain a single @' in response.body
assert 'Enter a value 6 characters long or more' in response.body
def test_register_err_username(self):
{'username':'error user',
assert ('Username may only contain '
'alphanumeric characters underscores, '
'periods or dashes and must begin with '
'alphanumeric character') in response.body
def test_register_err_case_sensitive(self):
{'username':'Test_Admin',
assert 'This username already exists' in response.body
self.assertTrue('An email address must contain a single @' in response.body)
def test_register_special_chars(self):
{'username':'xxxaxn',
'password':'ąćźżąśśśś',
'password_confirmation':'ąćźżąśśśś',
'email':'goodmailm@test.plx',
assert 'Invalid characters in password' in response.body
self.assertTrue('Invalid characters in password' in response.body)
def test_register_password_mismatch(self):
'password':'123qwe',
'password_confirmation':'qwe123',
'email':'goodmailm@test.plxa',
assert 'Passwords do not match' in response.body
def test_register_ok(self):
username = 'test_regular4'
password = 'qweqwe'
email = 'marcin@test.com'
name = 'testname'
lastname = 'testlastname'
{'username':username,
'password':password,
'password_confirmation':password,
'email':email,
'name':name,
'lastname':lastname})
self.assertEqual(response.status , '302 Found')
assert 'You have successfully registered into rhodecode' in response.session['flash'][0], 'No flash message about user registration'
ret = self.sa.query(User).filter(User.username == 'test_regular4').one()
assert ret.username == username , 'field mismatch %s %s' % (ret.username, username)
assert check_password(password, ret.password) == True , 'password mismatch'
assert ret.email == email , 'field mismatch %s %s' % (ret.email, email)
assert ret.name == name , 'field mismatch %s %s' % (ret.name, name)
assert ret.lastname == lastname , 'field mismatch %s %s' % (ret.lastname, lastname)
def test_forgot_password_wrong_mail(self):
response = self.app.post(url(controller='login', action='password_reset'),
{'email':'marcin@wrongmail.org', })
assert "This e-mail address doesn't exist" in response.body, 'Missing error message about wrong email'
def test_forgot_password(self):
response = self.app.get(url(controller='login',
action='password_reset'))
username = 'test_password_reset_1'
email = 'marcin@python-works.com'
name = 'passwd'
lastname = 'reset'
new = User()
new.username = username
new.password = password
new.email = email
new.name = name
new.lastname = lastname
new.api_key = generate_api_key(username)
self.sa.add(new)
response = self.app.post(url(controller='login',
action='password_reset'),
{'email':email, })
self.checkSessionFlash(response, 'Your password reset link was sent')
response = response.follow()
# BAD KEY
key = "bad"
action='password_reset_confirmation',
key=key))
self.assertEqual(response.status, '302 Found')
self.assertTrue(response.location.endswith(url('reset_password')))
# GOOD KEY
key = User.get_by_username(username).api_key
self.assertTrue(response.location.endswith(url('login_home')))
self.checkSessionFlash(response,
('Your password reset was successful, '
'new password has been sent to your email'))
import unittest
from rhodecode.tests import *
from rhodecode.model.repos_group import ReposGroupModel
from rhodecode.model.db import Group
from rhodecode.model.db import Group, User
from sqlalchemy.exc import IntegrityError
class TestReposGroups(unittest.TestCase):
def setUp(self):
self.g1 = self.__make_group('test1', skip_if_exists=True)
self.g2 = self.__make_group('test2', skip_if_exists=True)
self.g3 = self.__make_group('test3', skip_if_exists=True)
def tearDown(self):
print 'out'
def __check_path(self, *path):
path = [TESTS_TMP_PATH] + list(path)
path = os.path.join(*path)
return os.path.isdir(path)
def _check_folders(self):
print os.listdir(TESTS_TMP_PATH)
def __make_group(self, path, desc='desc', parent_id=None,
skip_if_exists=False):
gr = Group.get_by_group_name(path)
if gr and skip_if_exists:
return gr
form_data = dict(group_name=path,
group_description=desc,
group_parent_id=parent_id)
gr = ReposGroupModel().create(form_data)
def __delete_group(self, id_):
ReposGroupModel().delete(id_)
def __update_group(self, id_, path, desc='desc', parent_id=None):
gr = ReposGroupModel().update(id_, form_data)
def test_create_group(self):
g = self.__make_group('newGroup')
self.assertEqual(g.full_path, 'newGroup')
self.assertTrue(self.__check_path('newGroup'))
def test_create_same_name_group(self):
self.assertRaises(IntegrityError, lambda:self.__make_group('newGroup'))
def test_same_subgroup(self):
sg1 = self.__make_group('sub1', parent_id=self.g1.group_id)
self.assertEqual(sg1.parent_group, self.g1)
self.assertEqual(sg1.full_path, 'test1/sub1')
self.assertTrue(self.__check_path('test1', 'sub1'))
ssg1 = self.__make_group('subsub1', parent_id=sg1.group_id)
self.assertEqual(ssg1.parent_group, sg1)
self.assertEqual(ssg1.full_path, 'test1/sub1/subsub1')
self.assertTrue(self.__check_path('test1', 'sub1', 'subsub1'))
def test_remove_group(self):
sg1 = self.__make_group('deleteme')
self.__delete_group(sg1.group_id)
self.assertEqual(Group.get(sg1.group_id), None)
self.assertFalse(self.__check_path('deteteme'))
sg1 = self.__make_group('deleteme', parent_id=self.g1.group_id)
self.assertFalse(self.__check_path('test1', 'deteteme'))
def test_rename_single_group(self):
sg1 = self.__make_group('initial')
new_sg1 = self.__update_group(sg1.group_id, 'after')
self.assertTrue(self.__check_path('after'))
self.assertEqual(Group.get_by_group_name('initial'), None)
def test_update_group_parent(self):
sg1 = self.__make_group('initial', parent_id=self.g1.group_id)
new_sg1 = self.__update_group(sg1.group_id, 'after', parent_id=self.g1.group_id)
self.assertTrue(self.__check_path('test1', 'after'))
self.assertEqual(Group.get_by_group_name('test1/initial'), None)
new_sg1 = self.__update_group(sg1.group_id, 'after', parent_id=self.g3.group_id)
self.assertTrue(self.__check_path('test3', 'after'))
self.assertEqual(Group.get_by_group_name('test3/initial'), None)
new_sg1 = self.__update_group(sg1.group_id, 'hello')
self.assertTrue(self.__check_path('hello'))
self.assertEqual(Group.get_by_group_name('hello'), new_sg1)
def test_subgrouping_with_repo(self):
g1 = self.__make_group('g1')
g2 = self.__make_group('g2')
# create new repo
form_data = dict(repo_name='john',
repo_name_full='john',
fork_name=None,
description=None,
repo_group=None,
private=False,
repo_type='hg',
clone_uri=None)
cur_user = User.get_by_username(TEST_USER_ADMIN_LOGIN)
r = RepoModel().create(form_data, cur_user)
self.assertEqual(r.repo_name, 'john')
# put repo into group
form_data = form_data
form_data['repo_group'] = g1.group_id
form_data['perms_new'] = []
form_data['perms_updates'] = []
RepoModel().update(r.repo_name, form_data)
self.assertEqual(r.repo_name, 'g1/john')
self.__update_group(g1.group_id, 'g1', parent_id=g2.group_id)
self.assertTrue(self.__check_path('g2', 'g1'))
# test repo
self.assertEqual(r.repo_name, os.path.join('g2', 'g1', r.just_name))
Status change: