@@ -213,53 +213,53 @@ class ReposController(BaseController):
def new(self, format='html'):
"""GET /repos/new: Form to create a new item"""
new_repo = request.GET.get('repo', '')
c.new_repo = repo_name_slug(new_repo)
self.__load_defaults()
return render('admin/repos/repo_add.html')
@HasPermissionAllDecorator('hg.admin')
def update(self, repo_name):
"""
PUT /repos/repo_name: Update an existing item"""
# Forms posted to this method should contain a hidden field:
# <input type="hidden" name="_method" value="PUT" />
# Or using helpers:
# h.form(url('repo', repo_name=ID),
# method='put')
# url('repo', repo_name=ID)
repo_model = RepoModel()
changed_name = repo_name
_form = RepoForm(edit=True, old_data={'repo_name': repo_name},
repo_groups=c.repo_groups_choices)()
try:
form_result = _form.to_python(dict(request.POST))
repo_model.update(repo_name, form_result)
repo = repo_model.update(repo_name, form_result)
invalidate_cache('get_repo_cached_%s' % repo_name)
h.flash(_('Repository %s updated successfully' % repo_name),
category='success')
changed_name = form_result['repo_name_full']
changed_name = repo.repo_name
action_logger(self.rhodecode_user, 'admin_updated_repo',
changed_name, '', self.sa)
except formencode.Invalid, errors:
defaults = self.__load_data(repo_name)
defaults.update(errors.value)
return htmlfill.render(
render('admin/repos/repo_edit.html'),
defaults=defaults,
errors=errors.error_dict or {},
prefix_error=False,
encoding="UTF-8")
except Exception:
log.error(traceback.format_exc())
h.flash(_('error occurred during update of repository %s') \
% repo_name, category='error')
return redirect(url('edit_repo', repo_name=changed_name))
def delete(self, repo_name):
DELETE /repos/repo_name: Delete an existing item"""
@@ -263,49 +263,49 @@ class User(Base, BaseModel):
group_member = relationship('UsersGroupMember', cascade='all')
@property
def full_contact(self):
return '%s %s <%s>' % (self.name, self.lastname, self.email)
def short_contact(self):
return '%s %s' % (self.name, self.lastname)
def is_admin(self):
return self.admin
def __repr__(self):
return "<%s('id:%s:%s')>" % (self.__class__.__name__,
self.user_id, self.username)
except:
return self.__class__.__name__
@classmethod
def get_by_username(cls, username, case_insensitive=False):
if case_insensitive:
return Session.query(cls).filter(cls.username.like(username)).scalar()
return Session.query(cls).filter(cls.username.ilike(username)).scalar()
else:
return Session.query(cls).filter(cls.username == username).scalar()
def get_by_api_key(cls, api_key):
return Session.query(cls).filter(cls.api_key == api_key).one()
def update_lastlogin(self):
"""Update user lastlogin"""
self.last_login = datetime.datetime.now()
Session.add(self)
Session.commit()
log.debug('updated user %s lastlogin', self.username)
def create(cls, form_data):
from rhodecode.lib.auth import get_crypt_password
new_user = cls()
for k, v in form_data.items():
if k == 'password':
v = get_crypt_password(v)
@@ -534,48 +534,57 @@ class Repository(Base, BaseModel):
def groups_and_repo(self):
return self.groups_with_parents, self.just_name
@LazyProperty
def repo_path(self):
Returns base full path for that repository means where it actually
exists on a filesystem
q = Session.query(RhodeCodeUi).filter(RhodeCodeUi.ui_key == '/')
q.options(FromCache("sql_cache_short", "repository_repo_path"))
return q.one().ui_value
def repo_full_path(self):
p = [self.repo_path]
# we need to split the name by / since this is how we store the
# names in the database, but that eventually needs to be converted
# into a valid system path
p += self.repo_name.split('/')
return os.path.join(*p)
def get_new_name(self, repo_name):
returns new full repository name based on assigned group and new new
:param group_name:
path_prefix = self.group.full_path_splitted if self.group else []
return '/'.join(path_prefix + [repo_name])
def _ui(self):
Creates an db based ui object for this repository
from mercurial import ui
from mercurial import config
baseui = ui.ui()
#clean the baseui object
baseui._ocfg = config.config()
baseui._ucfg = config.config()
baseui._tcfg = config.config()
ret = Session.query(RhodeCodeUi)\
.options(FromCache("sql_cache_short", "repository_repo_ui")).all()
hg_ui = ret
for ui_ in hg_ui:
if ui_.ui_active:
log.debug('settings ui from db[%s]%s:%s', ui_.ui_section,
ui_.ui_key, ui_.ui_value)
baseui.setconfig(ui_.ui_section, ui_.ui_key, ui_.ui_value)
@@ -101,49 +101,49 @@ def ValidUsersGroup(edit, old_data):
if old_ugname != value or not edit:
if UsersGroup.get_by_group_name(value, cache=False,
case_insensitive=True):
raise formencode.Invalid(_('This users group '
'already exists') , value,
state)
if re.match(r'^[a-zA-Z0-9]{1}[a-zA-Z0-9\-\_\.]+$', value) is None:
raise formencode.Invalid(_('Group name may only contain '
'alphanumeric characters '
'underscores, periods or dashes '
'and must begin with alphanumeric '
'character'), value, state)
return _ValidUsersGroup
def ValidReposGroup(edit, old_data):
class _ValidReposGroup(formencode.validators.FancyValidator):
def validate_python(self, value, state):
#TODO WRITE VALIDATIONS
group_name = value.get('group_name')
group_parent_id = int(value.get('group_parent_id') or - 1)
group_parent_id = int(value.get('group_parent_id') or -1)
# slugify repo group just in case :)
slug = repo_name_slug(group_name)
# check for parent of self
if edit and old_data['group_id'] == group_parent_id:
e_dict = {'group_parent_id':_('Cannot assign this group '
'as parent')}
raise formencode.Invalid('', value, state,
error_dict=e_dict)
old_gname = None
if edit:
old_gname = Group.get(
old_data.get('group_id')).group_name
if old_gname != group_name or not edit:
# check filesystem
gr = Group.query().filter(Group.group_name == slug)\
.filter(Group.group_parent_id == group_parent_id).scalar()
if gr:
e_dict = {'group_name':_('This group already exists')}
@@ -77,114 +77,117 @@ class RepoModel(BaseModel):
users = self.sa.query(User).filter(User.active == True).all()
u_tmpl = '''{id:%s, fname:"%s", lname:"%s", nname:"%s"},'''
users_array = '[%s]' % '\n'.join([u_tmpl % (u.user_id, u.name,
u.lastname, u.username)
for u in users])
return users_array
def get_users_groups_js(self):
users_groups = self.sa.query(UsersGroup)\
.filter(UsersGroup.users_group_active == True).all()
g_tmpl = '''{id:%s, grname:"%s",grmembers:"%s"},'''
users_groups_array = '[%s]' % '\n'.join([g_tmpl % \
(gr.users_group_id, gr.users_group_name,
len(gr.members))
for gr in users_groups])
return users_groups_array
def update(self, repo_name, form_data):
cur_repo = self.get_by_repo_name(repo_name, cache=False)
#update permissions
# update permissions
for member, perm, member_type in form_data['perms_updates']:
if member_type == 'user':
r2p = self.sa.query(RepoToPerm)\
.filter(RepoToPerm.user == User.get_by_username(member))\
.filter(RepoToPerm.repository == cur_repo)\
.one()
r2p.permission = self.sa.query(Permission)\
.filter(Permission.permission_name ==
perm).scalar()
self.sa.add(r2p)
g2p = self.sa.query(UsersGroupRepoToPerm)\
.filter(UsersGroupRepoToPerm.users_group ==
UsersGroup.get_by_group_name(member))\
.filter(UsersGroupRepoToPerm.repository ==
cur_repo).one()
g2p.permission = self.sa.query(Permission)\
self.sa.add(g2p)
#set new permissions
# set new permissions
for member, perm, member_type in form_data['perms_new']:
r2p = RepoToPerm()
r2p.repository = cur_repo
r2p.user = User.get_by_username(member)
.filter(Permission.
permission_name == perm)\
.scalar()
g2p = UsersGroupRepoToPerm()
g2p.repository = cur_repo
g2p.users_group = UsersGroup.get_by_group_name(member)
#update current repo
# update current repo
if k == 'user':
cur_repo.user = User.get_by_username(v)
elif k == 'repo_name':
cur_repo.repo_name = form_data['repo_name_full']
pass
elif k == 'repo_group':
cur_repo.group_id = v
setattr(cur_repo, k, v)
new_name = cur_repo.get_new_name(form_data['repo_name'])
cur_repo.repo_name = new_name
self.sa.add(cur_repo)
if repo_name != form_data['repo_name_full']:
if repo_name != new_name:
# rename repository
self.__rename_repo(old=repo_name,
new=form_data['repo_name_full'])
self.__rename_repo(old=repo_name, new=new_name)
self.sa.commit()
return cur_repo
self.sa.rollback()
raise
def create(self, form_data, cur_user, just_db=False, fork=False):
if fork:
repo_name = form_data['fork_name']
org_name = form_data['repo_name']
org_full_name = org_name
org_name = repo_name = form_data['repo_name']
repo_name_full = form_data['repo_name_full']
new_repo = Repository()
new_repo.enable_statistics = False
if k == 'repo_name':
v = repo_name
@@ -214,49 +217,49 @@ class RepoModel(BaseModel):
break
default_perm = 'repository.none' if form_data['private'] else default
repo_to_perm.permission_id = self.sa.query(Permission)\
.filter(Permission.permission_name == default_perm)\
.one().permission_id
repo_to_perm.repository = new_repo
repo_to_perm.user_id = User.get_by_username('default').user_id
self.sa.add(repo_to_perm)
if not just_db:
self.__create_repo(repo_name, form_data['repo_type'],
form_data['repo_group'],
form_data['clone_uri'])
#now automatically start following this repository as owner
from rhodecode.model.scm import ScmModel
ScmModel(self.sa).toggle_following_repo(new_repo.repo_id,
cur_user.user_id)
return new_repo
def create_fork(self, form_data, cur_user):
from rhodecode.lib.celerylib import tasks, run_task
run_task(tasks.create_repo_fork, form_data, cur_user)
def delete(self, repo):
self.sa.delete(repo)
self.__delete_repo(repo)
def delete_perm_user(self, form_data, repo_name):
self.sa.query(RepoToPerm)\
.filter(RepoToPerm.repository \
== self.get_by_repo_name(repo_name))\
@@ -116,41 +116,47 @@ class ReposGroupModel(BaseModel):
return new_repos_group
def update(self, repos_group_id, form_data):
repos_group = Group.get(repos_group_id)
old_path = repos_group.full_path
#change properties
repos_group.group_description = form_data['group_description']
repos_group.parent_group = Group.get(form_data['group_parent_id'])
repos_group.group_name = repos_group.get_new_name(form_data['group_name'])
new_path = repos_group.full_path
self.sa.add(repos_group)
self.__rename_group(old_path, new_path)
# we need to get all repositories from this new group and
# rename them accordingly to new group path
for r in repos_group.repositories:
r.repo_name = r.get_new_name(r.just_name)
self.sa.add(r)
return repos_group
def delete(self, users_group_id):
users_group = Group.get(users_group_id)
self.sa.delete(users_group)
self.__delete_group(users_group)
@@ -124,64 +124,64 @@ class TestLoginController(TestController
{'username':'error user',
'password':'test12',
'password_confirmation':'test12',
'email':'goodmailm',
'name':'test',
'lastname':'test'})
self.assertEqual(response.status , '200 OK')
assert 'An email address must contain a single @' in response.body
assert ('Username may only contain '
'alphanumeric characters underscores, '
'periods or dashes and must begin with '
'alphanumeric character') in response.body
def test_register_err_case_sensitive(self):
response = self.app.post(url(controller='login', action='register'),
{'username':'Test_Admin',
assert 'This username already exists' in response.body
self.assertTrue('An email address must contain a single @' in response.body)
self.assertTrue('This username already exists' in response.body)
def test_register_special_chars(self):
{'username':'xxxaxn',
'password':'ąćźżąśśśś',
'password_confirmation':'ąćźżąśśśś',
'email':'goodmailm@test.plx',
assert 'Invalid characters in password' in response.body
self.assertTrue('Invalid characters in password' in response.body)
def test_register_password_mismatch(self):
{'username':'xs',
'password':'123qwe',
'password_confirmation':'qwe123',
'email':'goodmailm@test.plxa',
assert 'Passwords do not match' in response.body
def test_register_ok(self):
username = 'test_regular4'
password = 'qweqwe'
email = 'marcin@test.com'
name = 'testname'
lastname = 'testlastname'
{'username':username,
'password':password,
import os
import unittest
from rhodecode.tests import *
from rhodecode.model.repos_group import ReposGroupModel
from rhodecode.model.db import Group
from rhodecode.model.repo import RepoModel
from rhodecode.model.db import Group, User
from sqlalchemy.exc import IntegrityError
class TestReposGroups(unittest.TestCase):
def setUp(self):
self.g1 = self.__make_group('test1', skip_if_exists=True)
self.g2 = self.__make_group('test2', skip_if_exists=True)
self.g3 = self.__make_group('test3', skip_if_exists=True)
def tearDown(self):
print 'out'
def __check_path(self, *path):
path = [TESTS_TMP_PATH] + list(path)
path = os.path.join(*path)
return os.path.isdir(path)
def _check_folders(self):
print os.listdir(TESTS_TMP_PATH)
def __make_group(self, path, desc='desc', parent_id=None,
skip_if_exists=False):
gr = Group.get_by_group_name(path)
@@ -92,24 +93,61 @@ class TestReposGroups(unittest.TestCase)
new_sg1 = self.__update_group(sg1.group_id, 'after')
self.assertTrue(self.__check_path('after'))
self.assertEqual(Group.get_by_group_name('initial'), None)
def test_update_group_parent(self):
sg1 = self.__make_group('initial', parent_id=self.g1.group_id)
new_sg1 = self.__update_group(sg1.group_id, 'after', parent_id=self.g1.group_id)
self.assertTrue(self.__check_path('test1', 'after'))
self.assertEqual(Group.get_by_group_name('test1/initial'), None)
new_sg1 = self.__update_group(sg1.group_id, 'after', parent_id=self.g3.group_id)
self.assertTrue(self.__check_path('test3', 'after'))
self.assertEqual(Group.get_by_group_name('test3/initial'), None)
new_sg1 = self.__update_group(sg1.group_id, 'hello')
self.assertTrue(self.__check_path('hello'))
self.assertEqual(Group.get_by_group_name('hello'), new_sg1)
def test_subgrouping_with_repo(self):
g1 = self.__make_group('g1')
g2 = self.__make_group('g2')
# create new repo
form_data = dict(repo_name='john',
repo_name_full='john',
fork_name=None,
description=None,
repo_group=None,
private=False,
repo_type='hg',
clone_uri=None)
cur_user = User.get_by_username(TEST_USER_ADMIN_LOGIN)
r = RepoModel().create(form_data, cur_user)
self.assertEqual(r.repo_name, 'john')
# put repo into group
form_data = form_data
form_data['repo_group'] = g1.group_id
form_data['perms_new'] = []
form_data['perms_updates'] = []
RepoModel().update(r.repo_name, form_data)
self.assertEqual(r.repo_name, 'g1/john')
self.__update_group(g1.group_id, 'g1', parent_id=g2.group_id)
self.assertTrue(self.__check_path('g2', 'g1'))
# test repo
self.assertEqual(r.repo_name, os.path.join('g2', 'g1', r.just_name))
Status change: