@@ -1486,387 +1486,388 @@ class ApiController(JSONRPCController):
try:
repo_name_cleaned = repo_name.split('/')[-1]
# create structure of groups and return the last group
repo_group = map_groups(repo_name)
data = dict(
repo_name=repo_name_cleaned,
repo_name_full=repo_name,
repo_type=repo_type,
repo_description=description,
owner=owner,
repo_private=private,
clone_uri=clone_uri,
repo_group=repo_group,
repo_landing_rev=landing_rev,
enable_statistics=enable_statistics,
enable_locking=enable_locking,
enable_downloads=enable_downloads,
repo_copy_permissions=copy_permissions,
)
task = RepoModel().create(form_data=data, cur_user=owner)
from celery.result import BaseAsyncResult
task_id = None
if isinstance(task, BaseAsyncResult):
task_id = task.task_id
# no commit, it's done in RepoModel, or async via celery
return dict(
msg="Created new repository `%s`" % (repo_name,),
success=True, # cannot return the repo data here since fork
# can be done async
task=task_id
except Exception:
log.error(traceback.format_exc())
raise JSONRPCError(
'failed to create repository `%s`' % (repo_name,))
# permission check inside
def update_repo(self, apiuser, repoid, name=Optional(None),
owner=Optional(OAttr('apiuser')),
group=Optional(None),
description=Optional(''), private=Optional(False),
clone_uri=Optional(None), landing_rev=Optional('rev:tip'),
enable_statistics=Optional(False),
enable_locking=Optional(False),
enable_downloads=Optional(False)):
"""
Updates repo
:param apiuser: filled automatically from apikey
:type apiuser: AuthUser
:param repoid: repository name or repository id
:type repoid: str or int
:param name:
:param owner:
:param group:
:param description:
:param private:
:param clone_uri:
:param landing_rev:
:param enable_statistics:
:param enable_locking:
:param enable_downloads:
repo = get_repo_or_error(repoid)
if not HasPermissionAnyApi('hg.admin')(user=apiuser):
# check if we have admin permission for this repo !
if not HasRepoPermissionAnyApi('repository.admin')(user=apiuser,
repo_name=repo.repo_name):
raise JSONRPCError('repository `%s` does not exist' % (repoid,))
if (name != repo.repo_name and
not HasPermissionAnyApi('hg.create.repository')(user=apiuser)
):
raise JSONRPCError('no permission to create (or move) repositories')
if not isinstance(owner, Optional):
#forbid setting owner for non-admins
'Only Kallithea admin can specify `owner` param'
updates = {}
repo_group = group
if not isinstance(repo_group, Optional):
repo_group = get_repo_group_or_error(repo_group)
repo_group = repo_group.group_id
store_update(updates, name, 'repo_name')
store_update(updates, repo_group, 'repo_group')
store_update(updates, owner, 'user')
store_update(updates, description, 'repo_description')
store_update(updates, private, 'repo_private')
store_update(updates, clone_uri, 'clone_uri')
store_update(updates, landing_rev, 'repo_landing_rev')
store_update(updates, enable_statistics, 'repo_enable_statistics')
store_update(updates, enable_locking, 'repo_enable_locking')
store_update(updates, enable_downloads, 'repo_enable_downloads')
RepoModel().update(repo, **updates)
Session().commit()
msg='updated repo ID:%s %s' % (repo.repo_id, repo.repo_name),
repository=repo.get_api_data()
raise JSONRPCError('failed to update repo `%s`' % repoid)
@HasPermissionAnyDecorator('hg.admin', 'hg.fork.repository')
def fork_repo(self, apiuser, repoid, fork_name,
description=Optional(''), copy_permissions=Optional(False),
private=Optional(False), landing_rev=Optional('rev:tip')):
Creates a fork of given repo. In case of using celery this will
immediately return success message, while fork is going to be created
asynchronous. This command can be executed only using api_key belonging to
user with admin rights or regular user that have fork permission, and at least
read access to forking repository. Regular users cannot specify owner parameter.
:param fork_name:
:param copy_permissions:
INPUT::
id : <id_for_response>
api_key : "<api_key>"
args: {
"repoid" : "<reponame or repo_id>",
"fork_name": "<forkname>",
"owner": "<username or user_id = Optional(=apiuser)>",
"description": "<description>",
"copy_permissions": "<bool>",
"private": "<bool>",
"landing_rev": "<landing_rev>"
}
OUTPUT::
id : <id_given_in_input>
result: {
"msg": "Created fork of `<reponame>` as `<forkname>`",
"success": true,
"task": "<celery task id or None if done sync>"
error: null
repo_name = repo.repo_name
_repo = RepoModel().get_by_repo_name(fork_name)
if _repo:
type_ = 'fork' if _repo.fork else 'repo'
raise JSONRPCError("%s `%s` already exist" % (type_, fork_name))
if HasPermissionAnyApi('hg.admin')(user=apiuser):
pass
elif HasRepoPermissionAnyApi('repository.admin',
'repository.write',
'repository.read')(user=apiuser,
if not HasPermissionAnyApi('hg.create.repository')(user=apiuser):
raise JSONRPCError('no permission to create repositories')
else:
if isinstance(owner, Optional):
owner = apiuser.user_id
owner = get_user_or_error(owner)
group = map_groups(fork_name)
fork_base_name = fork_name.rsplit('/', 1)[-1]
form_data = dict(
repo_name=fork_name,
repo_name=fork_base_name,
repo_name_full=fork_name,
repo_group=group,
repo_type=repo.repo_type,
description=Optional.extract(description),
private=Optional.extract(private),
copy_permissions=Optional.extract(copy_permissions),
landing_rev=Optional.extract(landing_rev),
update_after_clone=False,
fork_parent_id=repo.repo_id,
task = RepoModel().create_fork(form_data, cur_user=owner)
msg='Created fork of `%s` as `%s`' % (repo.repo_name,
fork_name),
'failed to fork repository `%s` as `%s`' % (repo_name,
fork_name)
def delete_repo(self, apiuser, repoid, forks=Optional('')):
Deletes a repository. This command can be executed only using api_key belonging
to user with admin rights or regular user that have admin access to repository.
When `forks` param is set it's possible to detach or delete forks of deleting
repository
:param forks: `detach` or `delete`, what do do with attached forks for repo
:type forks: Optional(str)
"msg": "Deleted repository `<reponame>`",
"success": true
handle_forks = Optional.extract(forks)
_forks_msg = ''
_forks = [f for f in repo.forks]
if handle_forks == 'detach':
_forks_msg = ' ' + 'Detached %s forks' % len(_forks)
elif handle_forks == 'delete':
_forks_msg = ' ' + 'Deleted %s forks' % len(_forks)
elif _forks:
'Cannot delete `%s` it still contains attached forks' %
(repo.repo_name,)
RepoModel().delete(repo, forks=forks)
msg='Deleted repository `%s`%s' % (repo.repo_name, _forks_msg),
success=True
'failed to delete repository `%s`' % (repo.repo_name,)
@HasPermissionAllDecorator('hg.admin')
def grant_user_permission(self, apiuser, repoid, userid, perm):
Grant permission for user on given repository, or update existing one
if found. This command can be executed only using api_key belonging to user
with admin rights.
:param userid:
:param perm: (repository.(none|read|write|admin))
:type perm: str
"msg" : "Granted perm: `<perm>` for user: `<username>` in repo: `<reponame>`",
user = get_user_or_error(userid)
perm = get_perm_or_error(perm)
RepoModel().grant_user_permission(repo=repo, user=user, perm=perm)
msg='Granted perm: `%s` for user: `%s` in repo: `%s`' % (
perm.permission_name, user.username, repo.repo_name
),
'failed to edit permission for user: `%s` in repo: `%s`' % (
userid, repoid
def revoke_user_permission(self, apiuser, repoid, userid):
Revoke permission for user on given repository. This command can be executed
only using api_key belonging to user with admin rights.
"msg" : "Revoked perm for user: `<username>` in repo: `<reponame>`",
RepoModel().revoke_user_permission(repo=repo, user=user)
msg='Revoked perm for user: `%s` in repo: `%s`' % (
user.username, repo.repo_name
def grant_user_group_permission(self, apiuser, repoid, usergroupid, perm):
Grant permission for user group on given repository, or update
existing one if found. This command can be executed only using
api_key belonging to user with admin rights.
:param usergroupid: id of usergroup
:type usergroupid: str or int
@@ -1202,386 +1202,389 @@ class _BaseTestApi(object):
repo_name = u'/'.join([updates['group'], repo_name.rsplit('/', 1)[-1]])
expected = {
'msg': 'updated repo ID:%s %s' % (repo.repo_id, repo_name),
'repository': repo.get_api_data()
self._compare_ok(id_, expected, given=response.body)
finally:
fixture.destroy_repo(repo_name)
if changing_attr == 'repo_group':
fixture.destroy_repo_group(updates['group'])
fixture.destroy_repo_group(group_name)
def test_api_update_repo_repo_group_does_not_exist(self):
repo_name = 'admin_owned'
fixture.create_repo(repo_name)
updates = {'group': 'test_group_for_update'}
id_, params = _build_data(self.apikey, 'update_repo',
repoid=repo_name, **updates)
response = api_call(self, params)
expected = 'repository group `%s` does not exist' % updates['group']
self._compare_error(id_, expected, given=response.body)
def test_api_update_repo_regular_user_not_allowed(self):
updates = {'active': False}
id_, params = _build_data(self.apikey_regular, 'update_repo',
expected = 'repository `%s` does not exist' % repo_name
@mock.patch.object(RepoModel, 'update', crash)
def test_api_update_repo_exception_occurred(self):
repo_name = 'api_update_me'
fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
repoid=repo_name, owner=TEST_USER_ADMIN_LOGIN,)
expected = 'failed to update repo `%s`' % repo_name
def test_api_update_repo_regular_user_change_repo_name(self):
new_repo_name = 'new_repo_name'
RepoModel().grant_user_permission(repo=repo_name,
user=self.TEST_USER_LOGIN,
perm='repository.admin')
UserModel().revoke_perm('default', 'hg.create.repository')
UserModel().grant_perm('default', 'hg.create.none')
updates = {'name': new_repo_name}
expected = 'no permission to create (or move) repositories'
fixture.destroy_repo(new_repo_name)
def test_api_update_repo_regular_user_change_repo_name_allowed(self):
repo = fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
UserModel().revoke_perm('default', 'hg.create.none')
UserModel().grant_perm('default', 'hg.create.repository')
'msg': 'updated repo ID:%s %s' % (repo.repo_id, new_repo_name),
def test_api_update_repo_regular_user_change_owner(self):
updates = {'owner': TEST_USER_ADMIN_LOGIN}
expected = 'Only Kallithea admin can specify `owner` param'
def test_api_delete_repo(self):
repo_name = 'api_delete_me'
id_, params = _build_data(self.apikey, 'delete_repo',
repoid=repo_name, )
ret = {
'msg': 'Deleted repository `%s`' % repo_name,
'success': True
expected = ret
def test_api_delete_repo_by_non_admin(self):
fixture.create_repo(repo_name, repo_type=self.REPO_TYPE,
cur_user=self.TEST_USER_LOGIN)
id_, params = _build_data(self.apikey_regular, 'delete_repo',
def test_api_delete_repo_by_non_admin_no_permission(self):
expected = 'repository `%s` does not exist' % (repo_name)
def test_api_delete_repo_exception_occurred(self):
with mock.patch.object(RepoModel, 'delete', crash):
expected = 'failed to delete repository `%s`' % repo_name
def test_api_fork_repo(self):
fork_name = 'api-repo-fork'
id_, params = _build_data(self.apikey, 'fork_repo',
repoid=self.REPO,
fork_name=fork_name,
owner=TEST_USER_ADMIN_LOGIN,
'msg': 'Created fork of `%s` as `%s`' % (self.REPO,
'success': True,
'task': None,
fixture.destroy_repo(fork_name)
def test_api_fork_repo_non_admin(self):
@parameterized.expand([
(u'api-repo-fork',),
(u'%s/api-repo-fork' % TEST_REPO_GROUP,),
])
def test_api_fork_repo_non_admin(self, fork_name):
id_, params = _build_data(self.apikey_regular, 'fork_repo',
def test_api_fork_repo_non_admin_specify_owner(self):
def test_api_fork_repo_non_admin_no_permission_to_fork(self):
RepoModel().grant_user_permission(repo=self.REPO,
perm='repository.none')
expected = 'repository `%s` does not exist' % (self.REPO)
@parameterized.expand([('read', 'repository.read'),
('write', 'repository.write'),
('admin', 'repository.admin')])
def test_api_fork_repo_non_admin_no_create_repo_permission(self, name, perm):
# regardless of base repository permission, forking is disallowed
# when repository creation is disabled
perm=perm)
expected = 'no permission to create repositories'
def test_api_fork_repo_unknown_owner(self):
owner = 'i-dont-exist'
expected = 'user `%s` does not exist' % owner
def test_api_fork_repo_fork_exists(self):
fixture.create_fork(self.REPO, fork_name)
expected = "fork `%s` already exist" % fork_name
def test_api_fork_repo_repo_exists(self):
fork_name = self.REPO
expected = "repo `%s` already exist" % fork_name
@mock.patch.object(RepoModel, 'create_fork', crash)
def test_api_fork_repo_exception_occurred(self):
expected = 'failed to fork repository `%s` as `%s`' % (self.REPO,
def test_api_get_user_group(self):
id_, params = _build_data(self.apikey, 'get_user_group',
usergroupid=TEST_USER_GROUP)
user_group = UserGroupModel().get_group(TEST_USER_GROUP)
members = []
for user in user_group.members:
user = user.user
members.append(user.get_api_data())
ret = user_group.get_api_data()
ret['members'] = members
def test_api_get_user_groups(self):
gr_name = 'test_user_group2'
make_user_group(gr_name)
id_, params = _build_data(self.apikey, 'get_user_groups', )
expected = []
for gr_name in [TEST_USER_GROUP, 'test_user_group2']:
user_group = UserGroupModel().get_group(gr_name)
expected.append(ret)
fixture.destroy_user_group(gr_name)
def test_api_create_user_group(self):
group_name = 'some_new_group'
id_, params = _build_data(self.apikey, 'create_user_group',
group_name=group_name)
'msg': 'created new user group `%s`' % group_name,
'user_group': jsonify(UserGroupModel() \
.get_by_name(group_name) \
.get_api_data())
fixture.destroy_user_group(group_name)
def test_api_get_user_group_that_exist(self):
group_name=TEST_USER_GROUP)
expected = "user group `%s` already exist" % TEST_USER_GROUP
@mock.patch.object(UserGroupModel, 'create', crash)
def test_api_get_user_group_exception_occurred(self):
group_name = 'exception_happens'
expected = 'failed to create group `%s`' % group_name
@parameterized.expand([('group_name', {'group_name': 'new_group_name'}),
('group_name', {'group_name': 'test_group_for_update'}),
('owner', {'owner': TEST_USER_REGULAR_LOGIN}),
Status change: